Implement assocDelete.

Summary: Implement assocDelete.

Test Plan: unit test attached

Reviewers: heyongqiang

Reviewed By: heyongqiang

CC: MarkCallaghan

Differential Revision: https://reviews.facebook.net/D5721
main
Dhruba Borthakur 12 years ago
parent 72c45c66c6
commit a321d5be9e
  1. 272
      thrift/assoc.h
  2. 5
      thrift/test/simpletest.cpp

@ -35,6 +35,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
AssocServiceHandler(OpenHandles* openhandles) { AssocServiceHandler(OpenHandles* openhandles) {
openhandles_ = openhandles; openhandles_ = openhandles;
woptions_.sync = true;
} }
int64_t taoAssocPut(const Text& tableName, int64_t assocType, int64_t id1, int64_t taoAssocPut(const Text& tableName, int64_t assocType, int64_t id1,
@ -56,7 +57,12 @@ class AssocServiceHandler : virtual public AssocServiceIf {
int64_t taoAssocDelete(const Text& tableName, int64_t assocType, int64_t id1, int64_t taoAssocDelete(const Text& tableName, int64_t assocType, int64_t id1,
int64_t id2, AssocVisibility visibility, bool update_count, int64_t id2, AssocVisibility visibility, bool update_count,
const Text& wormhole_comment) { const Text& wormhole_comment) {
printf("taoAssocDelete not implemented yet\n"); leveldb::DB* db = openhandles_->get(tableName, NULL);
if (db == NULL) {
return Code::kNotFound;
}
return assocDeleteInternal(tableName, db, assocType, id1, id2, visibility,
update_count, wormhole_comment);
return 0; return 0;
} }
@ -96,6 +102,8 @@ class AssocServiceHandler : virtual public AssocServiceIf {
private: private:
OpenHandles* openhandles_; OpenHandles* openhandles_;
leveldb::ReadOptions roptions_;
leveldb::WriteOptions woptions_;
// the maximum values returned in a rangeget/multiget call. // the maximum values returned in a rangeget/multiget call.
const static unsigned int MAX_RANGE_SIZE = 10000; const static unsigned int MAX_RANGE_SIZE = 10000;
@ -103,8 +111,8 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// //
// Inserts an assoc // Inserts an assoc
// If update_count, returns the updated count of the assoc. // If update_count, returns the updated count of the assoc.
// If update_count, return zero. // If !update_count, return zero.
// On failure, return negative number. // On failure, throws exception
// //
int64_t assocPutInternal(const Text& tableName, leveldb::DB* db, int64_t assocPutInternal(const Text& tableName, leveldb::DB* db,
int64_t assocType, int64_t id1, int64_t assocType, int64_t id1,
@ -112,8 +120,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
int64_t ts, AssocVisibility vis, int64_t ts, AssocVisibility vis,
bool update_count, int64_t dataVersion, const Text& data, bool update_count, int64_t dataVersion, const Text& data,
const Text& wormhole_comment) { const Text& wormhole_comment) {
leveldb::WriteOptions woptions; leveldb::WriteBatch batch;
woptions.sync = true;
ts = convertTime(ts); // change time to numberofmillis till MAXLONG ts = convertTime(ts); // change time to numberofmillis till MAXLONG
// create the payload for this assoc // create the payload for this assoc
@ -126,62 +133,65 @@ class AssocServiceHandler : virtual public AssocServiceIf {
payload.resize(payloadsize); payload.resize(payloadsize);
makePayload(&payload[0], id1Type, id2Type, dataVersion, data, makePayload(&payload[0], id1Type, id2Type, dataVersion, data,
wormhole_comment); wormhole_comment);
leveldb::Slice payload_slice(payload);
// create RowKey with plenty of space at the end to query
// all columns 'c', 'm', 'p, etc.
int maxkeysize = sizeof(id1) + sizeof(assocType) +
1 + // 'c', 'p' or 'm'
sizeof(ts) +
sizeof(id2);
std::string dummy;
dummy.reserve(maxkeysize);
dummy.resize(maxkeysize);
char* keybuf = &dummy[0];
int rowkeysize = makeRowKey(keybuf, id1, assocType);
leveldb::ReadOptions roptions;
leveldb::Status status;
std::string value;
int keysize;
int64_t count = 0; int64_t count = 0;
int64_t oldts; int64_t oldts;
int8_t oldvis; int8_t oldvis;
bool newassoc = false; // is this assoc new or an overwrite bool newassoc = false; // is this assoc new or an overwrite
leveldb::Status status;
std::string value;
// make a key for count // create RowKey for 'c'
keysize = appendRowKeyForCount(rowkeysize, keybuf); int maxkeysize = sizeof(id1) + sizeof(assocType) + 1;
std::string dummy1;
dummy1.reserve(maxkeysize);
dummy1.resize(maxkeysize);
char* keybuf = &dummy1[0];
int rowkeysize = makeRowKey(keybuf, id1, assocType);
int keysize = appendRowKeyForCount(rowkeysize, keybuf);
leveldb::Slice ckey(keybuf, keysize); leveldb::Slice ckey(keybuf, keysize);
// Scan 'c' to get $count if $update_count == true // Scan 'c' to get $count if $update_count == true
if (update_count) { if (update_count) {
status = db->Get(roptions, ckey, &value); status = db->Get(roptions_, ckey, &value);
if (status.IsNotFound()) { if (status.IsNotFound()) {
// nothing to do // nothing to do
} else if (!status.ok() || (value.size() != sizeof(int64_t))) { } else if (!status.ok() || (value.size() != sizeof(int64_t))) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"Unable to find count ", "AssocPut Unable to extract count ",
assocType, id1, id2, id1Type, id2Type, ts, vis); assocType, id1, id2, id1Type, id2Type, ts, vis);
} else { } else {
extract_int64(&count, (char *)value.c_str()); extract_int64(&count, (char *)value.c_str());
} }
} }
// Scan 'm'$id2 to get $ts and $vis // Scan 'm'$id2 to get $ts and $vis
maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(id2);
std::string dummy2;
dummy2.reserve(maxkeysize);
dummy2.resize(maxkeysize);
keybuf = &dummy2[0];
rowkeysize = makeRowKey(keybuf, id1, assocType);
keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2); keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2);
leveldb::Slice mkey(keybuf, keysize); leveldb::Slice mkey(keybuf, keysize);
status = db->Get(roptions, mkey, &value); status = db->Get(roptions_, mkey, &value);
if (status.IsNotFound()) { if (status.IsNotFound()) {
newassoc = true; newassoc = true;
} else if (!status.ok() || } else if (!status.ok() ||
(value.size() != sizeof(int64_t) + sizeof(int8_t))) { (value.size() != sizeof(int64_t) + sizeof(int8_t))) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"Unable to find m$id2 ", "AssocPut Unable to extract m$id2 ",
assocType, id1, id2, id1Type, id2Type, ts, vis); assocType, id1, id2, id1Type, id2Type, ts, vis);
} }
// make the key 'p'$old_ts$id2 // make the key 'p'$old_ts$id2
maxkeysize = sizeof(id1) + sizeof(assocType) + 1 +
sizeof(ts) + sizeof(id2);
std::string dummy3;
dummy3.reserve(maxkeysize);
dummy3.resize(maxkeysize);
keybuf = &dummy3[0];
rowkeysize = makeRowKey(keybuf, id1, assocType);
keysize = appendRowKeyForPayload(rowkeysize, keybuf, oldts, id2); keysize = appendRowKeyForPayload(rowkeysize, keybuf, oldts, id2);
leveldb::Slice pkey(keybuf, keysize); leveldb::Slice pkey(keybuf, keysize);
@ -189,51 +199,39 @@ class AssocServiceHandler : virtual public AssocServiceIf {
if (!newassoc) { if (!newassoc) {
extractTsVisString(&oldts, &oldvis, (char *)value.c_str()); extractTsVisString(&oldts, &oldvis, (char *)value.c_str());
if (ts != oldts) { if (ts != oldts) {
if (!db->Delete(woptions, pkey).ok()) { batch.Delete(pkey);
throw generate_exception(tableName, Code::kNotFound,
"Unable to delete from p$oldts$id2 ",
assocType, id1, id2, id1Type, id2Type, ts, vis);
}
} }
} }
// store in m$id2 the value of $ts$vis // store in m$id2 the value of $ts$vis
std::string myvalue; std::string myvalue;
appendRowKeyForMeta(rowkeysize, keybuf, id2);
myvalue.reserve(sizeof(int64_t) + sizeof(int8_t)); myvalue.reserve(sizeof(int64_t) + sizeof(int8_t));
myvalue.resize(sizeof(int64_t) + sizeof(int8_t)); myvalue.resize(sizeof(int64_t) + sizeof(int8_t));
makeTsVisString(&myvalue[0], ts, vis); makeTsVisString(&myvalue[0], ts, vis);
leveldb::Slice sl(myvalue); leveldb::Slice sl(myvalue);
if (!db->Put(woptions, mkey, sl).ok()) { batch.Put(mkey, leveldb::Slice(myvalue));
// throw exception;
throw generate_exception(tableName, Code::kNotFound,
"Unable to put into m$id2",
assocType, id1, id2, id1Type, id2Type, ts, vis);
}
// store in p$ts$id2 the payload // store in p$ts$id2 the payload
appendRowKeyForPayload(rowkeysize, keybuf, ts, id2); keybuf = &dummy3[0];
if (!db->Put(woptions, pkey, payload_slice).ok()) { keysize = appendRowKeyForPayload(rowkeysize, keybuf, ts, id2);
throw generate_exception(tableName, Code::kNotFound, leveldb::Slice pkeynew(keybuf, keysize);
"Unable to put into p$ts$id2", batch.Put(pkeynew, leveldb::Slice(payload));
assocType, id1, id2, id1Type, id2Type, ts, vis);
}
// increment count // increment count
if (update_count && (newassoc || oldvis != VISIBLE)) { if (update_count && (newassoc || oldvis != VISIBLE)) {
assert(count >= 0); assert(count >= 0);
count++; count++;
appendRowKeyForCount(rowkeysize, keybuf);
myvalue.reserve(sizeof(int64_t)); myvalue.reserve(sizeof(int64_t));
myvalue.resize(sizeof(int64_t)); myvalue.resize(sizeof(int64_t));
makeCountString(&myvalue[0], count); makeCountString(&myvalue[0], count);
leveldb::Slice count_slice(myvalue); batch.Put(ckey, leveldb::Slice(myvalue));
if (!db->Put(woptions, ckey, count_slice).ok()) { }
status = db->Write(woptions_, &batch);
if (!status.ok()) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"Unable to insert into count", "AssocPut Unable to batch write ",
assocType, id1, id2, id1Type, id2Type, ts, vis); assocType, id1, id2, id1Type, id2Type, ts, vis);
} }
}
if (update_count) { if (update_count) {
assert(count > 0); assert(count > 0);
return count; return count;
@ -241,6 +239,152 @@ class AssocServiceHandler : virtual public AssocServiceIf {
return 0; return 0;
} }
//
// Deletes an assoc
// If count changes return 1, else returns zero
// On failure, thrws exception
//
int64_t assocDeleteInternal(const Text& tableName, leveldb::DB* db,
int64_t assocType, int64_t id1,
int64_t id2, AssocVisibility vis,
bool update_count, const Text& wormhole_comment) {
leveldb::WriteBatch batch;
int64_t count = 0;
int64_t oldts;
int8_t oldvis;
std::string value;
// make a key for count
int maxkeysize = sizeof(id1) + sizeof(assocType) + 1;
std::string dummy;
dummy.reserve(maxkeysize);
dummy.resize(maxkeysize);
char* keybuf = &dummy[0];
int rowkeysize = makeRowKey(keybuf, id1, assocType);
leveldb::Status status;
int keysize = appendRowKeyForCount(rowkeysize, keybuf);
leveldb::Slice ckey(keybuf, keysize);
// Scan 'c' to get $count if $update_count == true
if (update_count) {
status = db->Get(roptions_, ckey, &value);
if (status.IsNotFound()) {
throw generate_exception(tableName, Code::kNotFound,
"assocDelete: Unable to find count ",
assocType, id1, id2, 0, 0, 0, vis);
} else if (!status.ok() || (value.size() != sizeof(int64_t))) {
throw generate_exception(tableName, Code::kNotFound,
"assocDelete: Unable to extract count ",
assocType, id1, id2, 0, 0, 0, vis);
} else {
extract_int64(&count, (char *)value.c_str());
}
}
// Scan 'm'$id2 to get $ts and $vis
maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(id2);
std::string dummy2;
dummy2.reserve(maxkeysize);
dummy2.resize(maxkeysize);
keybuf = &dummy2[0];
rowkeysize = makeRowKey(keybuf, id1, assocType);
keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2);
leveldb::Slice mkey(keybuf, keysize);
status = db->Get(roptions_, mkey, &value);
if (status.IsNotFound()) {
throw generate_exception(tableName, Code::kNotFound,
"assocDelete Unable to find column m ",
assocType, id1, id2, 0, 0, 0, vis);
} else if (!status.ok() ||
(value.size() != sizeof(int64_t) + sizeof(int8_t))) {
throw generate_exception(tableName, Code::kNotFound,
"assocDelete Unable to extract m$id2 ",
assocType, id1, id2, 0, 0, 0, vis);
}
extractTsVisString(&oldts, &oldvis, (char *)value.c_str());
// Create d'$id2
maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(id2);
std::string dummy3;
dummy3.reserve(maxkeysize);
dummy3.resize(maxkeysize);
keybuf = &dummy3[0];
rowkeysize = makeRowKey(keybuf, id1, assocType);
keysize = appendRowKeyForDelete(rowkeysize, keybuf, id2);
leveldb::Slice dkey(keybuf, keysize);
// create key for 'p'
maxkeysize = sizeof(id1) + sizeof(assocType) + 1 +
sizeof(oldts) + sizeof(id2);
std::string dummy4;
dummy4.reserve(maxkeysize);
dummy4.resize(maxkeysize);
keybuf = &dummy4[0];
rowkeysize = makeRowKey(keybuf, id1, assocType);
keysize = appendRowKeyForPayload(rowkeysize, keybuf, oldts, id2);
leveldb::Slice pkey(keybuf, keysize);
// if this is a hard delete, then delete all columns
if (vis == AssocVisibility::HARD_DELETE) {
batch.Delete(ckey);
batch.Delete(mkey);
batch.Delete(dkey);
batch.Delete(pkey);
} else if (vis == AssocVisibility::DELETED) {
if (oldvis != AssocVisibility::DELETED) {
// change vis in m$id2
std::string mvalue;
mvalue.reserve(sizeof(int64_t) + sizeof(int8_t));
mvalue.resize(sizeof(int64_t) + sizeof(int8_t));
makeTsVisString(&mvalue[0], oldts, vis);
batch.Put(mkey, leveldb::Slice(mvalue));
}
// scan p$tsid2 to get payload
// do we need to modify payload with new wormhole comments?
std::string pvalue;
status = db->Get(roptions_, pkey, &pvalue);
if (status.IsNotFound()) {
throw generate_exception(tableName, Code::kNotFound,
"assocDelete Unable to find p ",
assocType, id1, id2, 0, 0, oldts, vis);
} else if (!status.ok() ||
(value.size() != sizeof(int64_t) + sizeof(int8_t))) {
throw generate_exception(tableName, Code::kNotFound,
"assocDelete Unable to extract p ",
assocType, id1, id2, 0, 0, oldts, vis);
}
// store payload in d$id2
batch.Put(dkey, leveldb::Slice(pvalue));
// delete p$ts$id2
batch.Delete(pkey);
}
int return_value = 0;
if (update_count && oldvis == AssocVisibility::VISIBLE) {
return_value = 1;
assert(count >= 1);
count--;
std::string myvalue;
myvalue.reserve(sizeof(int64_t));
myvalue.resize(sizeof(int64_t));
makeCountString(&myvalue[0], count);
batch.Put(ckey, leveldb::Slice(myvalue));
}
status = db->Write(woptions_, &batch);
if (!status.ok()) {
throw generate_exception(tableName, Code::kNotFound,
"assocDelete Unable to Batch Write ",
assocType, id1, id2, 0, 0, oldts, vis);
}
if (update_count) {
assert(count >= 0);
return count;
}
return return_value;
}
int64_t assocCountInternal(const Text& tableName, leveldb::DB* db, int64_t assocCountInternal(const Text& tableName, leveldb::DB* db,
int64_t assocType, int64_t id1) { int64_t assocType, int64_t id1) {
// create key to query // create key to query
@ -254,24 +398,23 @@ class AssocServiceHandler : virtual public AssocServiceIf {
leveldb::Slice ckey(keybuf, keysize); leveldb::Slice ckey(keybuf, keysize);
// query database to find value // query database to find value
leveldb::ReadOptions roptions;
leveldb::Status status; leveldb::Status status;
std::string value; std::string value;
int64_t count; int64_t count;
status = db->Get(roptions, ckey, &value); status = db->Get(roptions_, ckey, &value);
// parse results retrieved from database // parse results retrieved from database
if (status.IsNotFound()) { if (status.IsNotFound()) {
return 0; // non existant assoc return 0; // non existant assoc
} else if (!status.ok()) { } else if (!status.ok()) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"Unable to find count ", "assocCountInternal Unable to find count ",
assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1); assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1);
} }
if (value.size() != sizeof(int64_t)) { if (value.size() != sizeof(int64_t)) {
printf("expected %ld got %ld\n", sizeof(int64_t), value.size()); printf("expected %ld got %ld\n", sizeof(int64_t), value.size());
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"Bad sizes for count ", "assocCountInternal Bad sizes for count ",
assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1); assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1);
} }
extract_int64(&count, (char *)value.c_str()); extract_int64(&count, (char *)value.c_str());
@ -320,7 +463,6 @@ class AssocServiceHandler : virtual public AssocServiceIf {
offset--; offset--;
continue; continue;
} }
printf("XXX server key found %s\n", iter->key().ToString().c_str());
ASSERT_GE(iter->key().size_, (unsigned int)rowkeysize); ASSERT_GE(iter->key().size_, (unsigned int)rowkeysize);
// extract the timestamp and id1 from the key // extract the timestamp and id1 from the key
@ -353,7 +495,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
if (id2s.size() > MAX_RANGE_SIZE) { if (id2s.size() > MAX_RANGE_SIZE) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"Ids2 cannot be gteater than 10K.", "assocGetInternal Ids2 cannot be gteater than 10K.",
assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1); assocType, id1, 0, 0, 0, 0, Tleveldb::UNUSED1);
} }
// allocate the entire result buffer. // allocate the entire result buffer.
@ -373,7 +515,6 @@ class AssocServiceHandler : virtual public AssocServiceIf {
for (unsigned int index = 0; index < id2s.size(); index++) { for (unsigned int index = 0; index < id2s.size(); index++) {
int64_t ts; int64_t ts;
int8_t oldvis; int8_t oldvis;
leveldb::ReadOptions roptions;
leveldb::Status status; leveldb::Status status;
std::string value, wormhole; std::string value, wormhole;
@ -381,7 +522,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
id2 = id2s[index]; id2 = id2s[index];
int keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2); int keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2);
leveldb::Slice ckey(keybuf, keysize); leveldb::Slice ckey(keybuf, keysize);
status = db->Get(roptions, ckey, &value); status = db->Get(roptions_, ckey, &value);
// parse results retrieved from database // parse results retrieved from database
if (status.IsNotFound()) { if (status.IsNotFound()) {
@ -402,7 +543,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// this assoc is visible, scan 'p'$ts$id2 to retrieve payload. // this assoc is visible, scan 'p'$ts$id2 to retrieve payload.
keysize = appendRowKeyForPayload(rowkeysize, keybuf, ts, id2); keysize = appendRowKeyForPayload(rowkeysize, keybuf, ts, id2);
leveldb::Slice pkey(keybuf, keysize); leveldb::Slice pkey(keybuf, keysize);
status = db->Get(roptions, pkey, &value); status = db->Get(roptions_, pkey, &value);
// parse results retrieved from database // parse results retrieved from database
if (status.IsNotFound()) { if (status.IsNotFound()) {
@ -469,6 +610,15 @@ class AssocServiceHandler : virtual public AssocServiceIf {
return rowkeysize + sizeof(id2) + 1; return rowkeysize + sizeof(id2) + 1;
} }
// fill the row key +'d' + id2 and returns the size of the key
inline int appendRowKeyForDelete(int rowkeysize, char* dest,
int64_t id2) {
dest += rowkeysize;
*dest++ = 'd';
dest = copy_int64_switch_endian(dest, id2);
return rowkeysize + sizeof(id2) + 1;
}
// encode id1Type, id2Type, dataversion, etc into the payload // encode id1Type, id2Type, dataversion, etc into the payload
void makePayload(char* dest, int64_t id1Type, int64_t id2Type, void makePayload(char* dest, int64_t id1Type, int64_t id2Type,
int64_t dataVersion, const Text& data, int64_t dataVersion, const Text& data,

@ -298,6 +298,11 @@ static void testAssocs() {
id1, LONG_MAX, 0, id1, LONG_MAX, 0,
offset, limit); offset, limit);
ASSERT_EQ((unsigned int)2, readback.size()); ASSERT_EQ((unsigned int)2, readback.size());
// Delete the most recent assoc
int c = aclient->taoAssocDelete(dbname, assocType,
id1, id2+2, AssocVisibility::HIDDEN, true, "");
ASSERT_EQ(c, 1);
} }
// //

Loading…
Cancel
Save