From f7975ac733f5ec475ef4d016807b48211e162979 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Wed, 3 Oct 2012 16:35:53 -0700 Subject: [PATCH] Implement RowLocks for assoc schema Summary: Each assoc is identified by (id1, assocType). This is the rowkey. Each row has a read/write rowlock. There is statically allocated array of 2000 read/write locks. A rowkey is murmur-hashed to one of the read/write locks. assocPut and assocDelete acquires the rowlock in Write mode. The key-updates are done within the rowlock with a atomic nosync batch write to leveldb. Then the rowlock is released and a write-with-sync is done to sync leveldb transaction log. Test Plan: added unit test Reviewers: heyongqiang Reviewed By: heyongqiang Differential Revision: https://reviews.facebook.net/D5859 --- thrift/assoc.h | 444 +++++++++++++++++++++---------------- thrift/test/simpletest.cpp | 9 + util/murmurhash.cc | 178 +++++++++++++++ util/murmurhash.h | 32 +++ 4 files changed, 470 insertions(+), 193 deletions(-) create mode 100644 util/murmurhash.cc create mode 100644 util/murmurhash.h diff --git a/thrift/assoc.h b/thrift/assoc.h index 2882eb9c0..af944bd10 100644 --- a/thrift/assoc.h +++ b/thrift/assoc.h @@ -15,6 +15,9 @@ #include "leveldb/db.h" #include "leveldb/write_batch.h" #include "util/testharness.h" +#include "port/port.h" +#include "util/mutexlock.h" +#include "util/murmurhash.h" using namespace apache::thrift; using namespace apache::thrift::protocol; @@ -35,7 +38,7 @@ class AssocServiceHandler : virtual public AssocServiceIf { AssocServiceHandler(OpenHandles* openhandles) { openhandles_ = openhandles; - woptions_.sync = true; + woptions_sync_.sync = true; } int64_t taoAssocPut(const Text& tableName, int64_t assocType, int64_t id1, @@ -103,11 +106,25 @@ class AssocServiceHandler : virtual public AssocServiceIf { private: OpenHandles* openhandles_; leveldb::ReadOptions roptions_; - leveldb::WriteOptions woptions_; + leveldb::WriteOptions woptions_; // write with no sync + leveldb::WriteOptions woptions_sync_; // write with sync // the maximum values returned in a rangeget/multiget call. const static unsigned int MAX_RANGE_SIZE = 10000; + // the seed for murmur hash (copied from Hadoop) + const static unsigned int HASHSEED = 0x5bd1e995; + + // A bunch of rowlocks, sharded over the entire rowkey range + // Each rowkey is deterministically mapped to one of these locks. + leveldb::port::RWMutex rowlocks_[1000]; + + // A helper method that hashes the row key to a lock + leveldb::port::RWMutex* findRowLock(char* str, int size) { + int index = MurmurHash(str, size, HASHSEED) % sizeof(rowlocks_); + return &rowlocks_[index]; + } + // // Inserts an assoc // If update_count, returns the updated count of the assoc. @@ -151,86 +168,110 @@ class AssocServiceHandler : virtual public AssocServiceIf { int keysize = appendRowKeyForCount(rowkeysize, keybuf); leveldb::Slice ckey(keybuf, keysize); - // Scan 'c' to get $count if $update_count == true - if (update_count) { - status = db->Get(roptions_, ckey, &value); + // find the row lock + leveldb::port::RWMutex* rowlock = findRowLock(keybuf, rowkeysize); + { + // acquire the row lock + leveldb::WriteLock l(rowlock); + + // Scan 'c' to get $count if $update_count == true + if (update_count) { + status = db->Get(roptions_, ckey, &value); + if (status.IsNotFound()) { + // nothing to do + } else if (!status.ok() || (value.size() != sizeof(int64_t))) { + throw generate_exception(tableName, Code::kNotFound, + "AssocPut Unable to extract count ", + assocType, id1, id2, id1Type, id2Type, ts, vis); + } else { + extract_int64(&count, (char *)value.c_str()); + } + } + + // Scan 'm'$id2 to get $ts and $vis + maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(id2); + std::string dummy2; + dummy2.reserve(maxkeysize); + dummy2.resize(maxkeysize); + keybuf = &dummy2[0]; + rowkeysize = makeRowKey(keybuf, id1, assocType); + keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2); + leveldb::Slice mkey(keybuf, keysize); + status = db->Get(roptions_, mkey, &value); if (status.IsNotFound()) { - // nothing to do - } else if (!status.ok() || (value.size() != sizeof(int64_t))) { + newassoc = true; + oldvis = UNUSED1; + } else if (!status.ok() || + (value.size() != sizeof(int64_t) + sizeof(int8_t))) { throw generate_exception(tableName, Code::kNotFound, - "AssocPut Unable to extract count ", - assocType, id1, id2, id1Type, id2Type, ts, vis); - } else { - extract_int64(&count, (char *)value.c_str()); + "AssocPut Unable to extract m$id2 ", + assocType, id1, id2, id1Type, id2Type, ts, vis); } - } - // Scan 'm'$id2 to get $ts and $vis - maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(id2); - std::string dummy2; - dummy2.reserve(maxkeysize); - dummy2.resize(maxkeysize); - keybuf = &dummy2[0]; - rowkeysize = makeRowKey(keybuf, id1, assocType); - keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2); - leveldb::Slice mkey(keybuf, keysize); - status = db->Get(roptions_, mkey, &value); - if (status.IsNotFound()) { - newassoc = true; - } else if (!status.ok() || - (value.size() != sizeof(int64_t) + sizeof(int8_t))) { - throw generate_exception(tableName, Code::kNotFound, - "AssocPut Unable to extract m$id2 ", - assocType, id1, id2, id1Type, id2Type, ts, vis); - } - - // make the key 'p'$old_ts$id2 - maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + - sizeof(ts) + sizeof(id2); - std::string dummy3; - dummy3.reserve(maxkeysize); - dummy3.resize(maxkeysize); - keybuf = &dummy3[0]; - rowkeysize = makeRowKey(keybuf, id1, assocType); - keysize = appendRowKeyForPayload(rowkeysize, keybuf, oldts, id2); - leveldb::Slice pkey(keybuf, keysize); + // make the key 'p'$old_ts$id2 + maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + + sizeof(ts) + sizeof(id2); + std::string dummy3; + dummy3.reserve(maxkeysize); + dummy3.resize(maxkeysize); + keybuf = &dummy3[0]; + rowkeysize = makeRowKey(keybuf, id1, assocType); + + // if ts != oldts, then delete 'p'$old_ts$id2 + if (!newassoc) { + extractTsVisString(&oldts, &oldvis, (char *)value.c_str()); + keysize = appendRowKeyForPayload(rowkeysize, keybuf, oldts, id2); + leveldb::Slice pkey(keybuf, keysize); + if (ts != oldts) { + batch.Delete(pkey); + } + } - // if ts != oldts, then delete 'p'$old_ts$id2 - if (!newassoc) { - extractTsVisString(&oldts, &oldvis, (char *)value.c_str()); - if (ts != oldts) { - batch.Delete(pkey); + // store in m$id2 the value of $ts$vis + std::string myvalue; + myvalue.reserve(sizeof(int64_t) + sizeof(int8_t)); + myvalue.resize(sizeof(int64_t) + sizeof(int8_t)); + makeTsVisString(&myvalue[0], ts, vis); + leveldb::Slice sl(myvalue); + batch.Put(mkey, leveldb::Slice(myvalue)); + + // store in p$ts$id2 the payload + keybuf = &dummy3[0]; + keysize = appendRowKeyForPayload(rowkeysize, keybuf, ts, id2); + leveldb::Slice pkeynew(keybuf, keysize); + batch.Put(pkeynew, leveldb::Slice(payload)); + + // increment count + if (update_count && (newassoc || oldvis != VISIBLE)) { + assert(count >= 0); + count++; + myvalue.reserve(sizeof(int64_t)); + myvalue.resize(sizeof(int64_t)); + makeCountString(&myvalue[0], count); + batch.Put(ckey, leveldb::Slice(myvalue)); } - } - // store in m$id2 the value of $ts$vis - std::string myvalue; - myvalue.reserve(sizeof(int64_t) + sizeof(int8_t)); - myvalue.resize(sizeof(int64_t) + sizeof(int8_t)); - makeTsVisString(&myvalue[0], ts, vis); - leveldb::Slice sl(myvalue); - batch.Put(mkey, leveldb::Slice(myvalue)); - - // store in p$ts$id2 the payload - keybuf = &dummy3[0]; - keysize = appendRowKeyForPayload(rowkeysize, keybuf, ts, id2); - leveldb::Slice pkeynew(keybuf, keysize); - batch.Put(pkeynew, leveldb::Slice(payload)); - - // increment count - if (update_count && (newassoc || oldvis != VISIBLE)) { - assert(count >= 0); - count++; - myvalue.reserve(sizeof(int64_t)); - myvalue.resize(sizeof(int64_t)); - makeCountString(&myvalue[0], count); - batch.Put(ckey, leveldb::Slice(myvalue)); - } - status = db->Write(woptions_, &batch); + // We do a write here without sync. This writes it to the + // transaction log but does not sync it. It also makes these + // changes readable by other threads. + status = db->Write(woptions_, &batch); + if (!status.ok()) { + throw generate_exception(tableName, Code::kNotFound, + "AssocPut Unable to batch write ", + assocType, id1, id2, id1Type, id2Type, ts, vis); + } + } // release rowlock + + // Do a sync to the transaction log without holding the rowlock. + // This improves updates for hotrows. The disadvantage is that + // uncommiitted reads might be read by other threads, but that + // should be ok. + batch.Clear(); + status = db->Write(woptions_sync_, &batch); if (!status.ok()) { throw generate_exception(tableName, Code::kNotFound, - "AssocPut Unable to batch write ", - assocType, id1, id2, id1Type, id2Type, ts, vis); + "AssocPut Unable to batch sync write ", + assocType, id1, id2, id1Type, id2Type, ts, vis); } if (update_count) { assert(count > 0); @@ -249,6 +290,7 @@ class AssocServiceHandler : virtual public AssocServiceIf { int64_t id2, AssocVisibility vis, bool update_count, const Text& wormhole_comment) { leveldb::WriteBatch batch; + int return_value = 0; int64_t count = 0; int64_t oldts; int8_t oldvis; @@ -265,119 +307,135 @@ class AssocServiceHandler : virtual public AssocServiceIf { int keysize = appendRowKeyForCount(rowkeysize, keybuf); leveldb::Slice ckey(keybuf, keysize); - // Scan 'c' to get $count if $update_count == true - if (update_count) { - status = db->Get(roptions_, ckey, &value); - if (status.IsNotFound()) { - throw generate_exception(tableName, Code::kNotFound, - "assocDelete: Unable to find count ", - assocType, id1, id2, 0, 0, 0, vis); - } else if (!status.ok() || (value.size() != sizeof(int64_t))) { - throw generate_exception(tableName, Code::kNotFound, - "assocDelete: Unable to extract count ", - assocType, id1, id2, 0, 0, 0, vis); - } else { - extract_int64(&count, (char *)value.c_str()); + // find the row lock + leveldb::port::RWMutex* rowlock = findRowLock(keybuf, rowkeysize); + { + // acquire the row lock + leveldb::WriteLock l(rowlock); + + // Scan 'c' to get $count if $update_count == true + if (update_count) { + status = db->Get(roptions_, ckey, &value); + if (status.IsNotFound()) { + throw generate_exception(tableName, Code::kNotFound, + "assocDelete: Unable to find count ", + assocType, id1, id2, 0, 0, 0, vis); + } else if (!status.ok() || (value.size() != sizeof(int64_t))) { + throw generate_exception(tableName, Code::kNotFound, + "assocDelete: Unable to extract count ", + assocType, id1, id2, 0, 0, 0, vis); + } else { + extract_int64(&count, (char *)value.c_str()); + } } - } - // Scan 'm'$id2 to get $ts and $vis - maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(id2); - std::string dummy2; - dummy2.reserve(maxkeysize); - dummy2.resize(maxkeysize); - keybuf = &dummy2[0]; - rowkeysize = makeRowKey(keybuf, id1, assocType); - keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2); - leveldb::Slice mkey(keybuf, keysize); - status = db->Get(roptions_, mkey, &value); - if (status.IsNotFound()) { - throw generate_exception(tableName, Code::kNotFound, - "assocDelete Unable to find column m ", - assocType, id1, id2, 0, 0, 0, vis); - } else if (!status.ok() || - (value.size() != sizeof(int64_t) + sizeof(int8_t))) { - throw generate_exception(tableName, Code::kNotFound, - "assocDelete Unable to extract m$id2 ", - assocType, id1, id2, 0, 0, 0, vis); - } - extractTsVisString(&oldts, &oldvis, (char *)value.c_str()); - - // Create d'$id2 - maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(id2); - std::string dummy3; - dummy3.reserve(maxkeysize); - dummy3.resize(maxkeysize); - keybuf = &dummy3[0]; - rowkeysize = makeRowKey(keybuf, id1, assocType); - keysize = appendRowKeyForDelete(rowkeysize, keybuf, id2); - leveldb::Slice dkey(keybuf, keysize); - - // create key for 'p' - maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + - sizeof(oldts) + sizeof(id2); - std::string dummy4; - dummy4.reserve(maxkeysize); - dummy4.resize(maxkeysize); - keybuf = &dummy4[0]; - rowkeysize = makeRowKey(keybuf, id1, assocType); - keysize = appendRowKeyForPayload(rowkeysize, keybuf, oldts, id2); - leveldb::Slice pkey(keybuf, keysize); - - // if this is a hard delete, then delete all columns - if (vis == AssocVisibility::HARD_DELETE) { - batch.Delete(ckey); - batch.Delete(mkey); - batch.Delete(dkey); - batch.Delete(pkey); - } else if (vis == AssocVisibility::DELETED) { - if (oldvis != AssocVisibility::DELETED) { - // change vis in m$id2 - std::string mvalue; - mvalue.reserve(sizeof(int64_t) + sizeof(int8_t)); - mvalue.resize(sizeof(int64_t) + sizeof(int8_t)); - makeTsVisString(&mvalue[0], oldts, vis); - batch.Put(mkey, leveldb::Slice(mvalue)); - } - - // scan p$tsid2 to get payload - // do we need to modify payload with new wormhole comments? - std::string pvalue; - status = db->Get(roptions_, pkey, &pvalue); + // Scan 'm'$id2 to get $ts and $vis + maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(id2); + std::string dummy2; + dummy2.reserve(maxkeysize); + dummy2.resize(maxkeysize); + keybuf = &dummy2[0]; + rowkeysize = makeRowKey(keybuf, id1, assocType); + keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2); + leveldb::Slice mkey(keybuf, keysize); + status = db->Get(roptions_, mkey, &value); if (status.IsNotFound()) { throw generate_exception(tableName, Code::kNotFound, - "assocDelete Unable to find p ", - assocType, id1, id2, 0, 0, oldts, vis); + "assocDelete Unable to find column m ", + assocType, id1, id2, 0, 0, 0, vis); } else if (!status.ok() || (value.size() != sizeof(int64_t) + sizeof(int8_t))) { throw generate_exception(tableName, Code::kNotFound, - "assocDelete Unable to extract p ", - assocType, id1, id2, 0, 0, oldts, vis); + "assocDelete Unable to extract m$id2 ", + assocType, id1, id2, 0, 0, 0, vis); } + extractTsVisString(&oldts, &oldvis, (char *)value.c_str()); - // store payload in d$id2 - batch.Put(dkey, leveldb::Slice(pvalue)); + // Create d'$id2 + maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + sizeof(id2); + std::string dummy3; + dummy3.reserve(maxkeysize); + dummy3.resize(maxkeysize); + keybuf = &dummy3[0]; + rowkeysize = makeRowKey(keybuf, id1, assocType); + keysize = appendRowKeyForDelete(rowkeysize, keybuf, id2); + leveldb::Slice dkey(keybuf, keysize); + + // create key for 'p' + maxkeysize = sizeof(id1) + sizeof(assocType) + 1 + + sizeof(oldts) + sizeof(id2); + std::string dummy4; + dummy4.reserve(maxkeysize); + dummy4.resize(maxkeysize); + keybuf = &dummy4[0]; + rowkeysize = makeRowKey(keybuf, id1, assocType); + keysize = appendRowKeyForPayload(rowkeysize, keybuf, oldts, id2); + leveldb::Slice pkey(keybuf, keysize); + + // if this is a hard delete, then delete all columns + if (vis == AssocVisibility::HARD_DELETE) { + batch.Delete(ckey); + batch.Delete(mkey); + batch.Delete(dkey); + batch.Delete(pkey); + } else if (vis == AssocVisibility::DELETED) { + if (oldvis != AssocVisibility::DELETED) { + // change vis in m$id2 + std::string mvalue; + mvalue.reserve(sizeof(int64_t) + sizeof(int8_t)); + mvalue.resize(sizeof(int64_t) + sizeof(int8_t)); + makeTsVisString(&mvalue[0], oldts, vis); + batch.Put(mkey, leveldb::Slice(mvalue)); + } + + // scan p$tsid2 to get payload + // do we need to modify payload with new wormhole comments? + std::string pvalue; + status = db->Get(roptions_, pkey, &pvalue); + if (status.IsNotFound()) { + throw generate_exception(tableName, Code::kNotFound, + "assocDelete Unable to find p ", + assocType, id1, id2, 0, 0, oldts, vis); + } else if (!status.ok() || + (value.size() != sizeof(int64_t) + sizeof(int8_t))) { + throw generate_exception(tableName, Code::kNotFound, + "assocDelete Unable to extract p ", + assocType, id1, id2, 0, 0, oldts, vis); + } + + // store payload in d$id2 + batch.Put(dkey, leveldb::Slice(pvalue)); + + // delete p$ts$id2 + batch.Delete(pkey); + } + if (update_count && oldvis == AssocVisibility::VISIBLE) { + return_value = 1; + assert(count >= 1); + count--; + std::string myvalue; + myvalue.reserve(sizeof(int64_t)); + myvalue.resize(sizeof(int64_t)); + makeCountString(&myvalue[0], count); + batch.Put(ckey, leveldb::Slice(myvalue)); + } + status = db->Write(woptions_, &batch); // write with no sync + if (!status.ok()) { + throw generate_exception(tableName, Code::kNotFound, + "assocDelete Unable to Batch Write ", + assocType, id1, id2, 0, 0, oldts, vis); + } + } // release rowlock - // delete p$ts$id2 - batch.Delete(pkey); - } - int return_value = 0; - if (update_count && oldvis == AssocVisibility::VISIBLE) { - return_value = 1; - assert(count >= 1); - count--; - std::string myvalue; - myvalue.reserve(sizeof(int64_t)); - myvalue.resize(sizeof(int64_t)); - makeCountString(&myvalue[0], count); - batch.Put(ckey, leveldb::Slice(myvalue)); - } - status = db->Write(woptions_, &batch); + // Do a sync write after releasing the rowlock. This + // improves performance for hotrow updates. + batch.Clear(); + status = db->Write(woptions_sync_, &batch); if (!status.ok()) { throw generate_exception(tableName, Code::kNotFound, - "assocDelete Unable to Batch Write ", + "assocDelete Unable to Batch sync Write ", assocType, id1, id2, 0, 0, oldts, vis); - } + } if (update_count) { assert(count >= 0); return count; @@ -397,7 +455,7 @@ class AssocServiceHandler : virtual public AssocServiceIf { int keysize = appendRowKeyForCount(rowkeysize, keybuf); // column 'c' leveldb::Slice ckey(keybuf, keysize); - // query database to find value + // Query database to find value leveldb::Status status; std::string value; int64_t count; @@ -433,7 +491,6 @@ class AssocServiceHandler : virtual public AssocServiceIf { } int64_t ts, id2; - const leveldb::ReadOptions options; std::string wormhole; // convert times to time-till-LONGMAX @@ -453,9 +510,9 @@ class AssocServiceHandler : virtual public AssocServiceIf { // Position scan at 'p'$ts$id2 where ts = startTime and id2 = 0 id2 = 0; - leveldb::Iterator* iter = db->NewIterator(options); int keysize = appendRowKeyForPayload(rowkeysize, keybuf, startTime, id2); leveldb::Slice pkey(keybuf, keysize); + leveldb::Iterator* iter = db->NewIterator(roptions_); for (iter->Seek(pkey); iter->Valid() && limit > 0 ; iter->Next()) { // skip over records that the caller is not interested in @@ -511,30 +568,35 @@ class AssocServiceHandler : virtual public AssocServiceIf { // create rowkey char* keybuf = &dummy[0]; int rowkeysize = makeRowKey(keybuf, id1, assocType); + leveldb::Iterator* iter = db->NewIterator(roptions_); for (unsigned int index = 0; index < id2s.size(); index++) { int64_t ts; int8_t oldvis; leveldb::Status status; - std::string value, wormhole; + std::string wormhole; // query column 'm'$id2 id2 = id2s[index]; int keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2); leveldb::Slice ckey(keybuf, keysize); - status = db->Get(roptions_, ckey, &value); - - // parse results retrieved from database - if (status.IsNotFound()) { + iter->Seek(ckey); + if (!iter->Valid()) { + throw generate_exception(tableName, Code::kNotFound, + "Unable to find m$id2 ", + assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1); + } + if (ckey != iter->key()) { continue; // non existant assoc - } else if (!status.ok() || - value.size() != sizeof(int64_t) + sizeof(int8_t)) { + } + leveldb::Slice value = iter->value(); + if (value.size() != sizeof(int64_t) + sizeof(int8_t)) { throw generate_exception(tableName, Code::kNotFound, "Unable to find m$id2 ", assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1); } - extractTsVisString(&ts, &oldvis, &value[0]); + extractTsVisString(&ts, &oldvis, (char*)value.data_); if(oldvis != AssocVisibility::VISIBLE) { continue; } @@ -543,15 +605,11 @@ class AssocServiceHandler : virtual public AssocServiceIf { // this assoc is visible, scan 'p'$ts$id2 to retrieve payload. keysize = appendRowKeyForPayload(rowkeysize, keybuf, ts, id2); leveldb::Slice pkey(keybuf, keysize); - status = db->Get(roptions_, pkey, &value); - - // parse results retrieved from database - if (status.IsNotFound()) { - continue; // non existant assoc - } else if (!status.ok()) { + iter->Seek(pkey); + if (!iter->Valid() || (pkey != iter->key())) { throw generate_exception(tableName, Code::kNotFound, - "Unable to find m$id2 ", - assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1); + "Unable to find p$ts$id2 ", + assocType, id1, id2, 0, 0, ts, Tleveldb::UNUSED1); } // allocate a new slot in the result set. @@ -561,7 +619,7 @@ class AssocServiceHandler : virtual public AssocServiceIf { // Fill up new element in result set. result->id2 = id2; result->time = convertTime(ts); - extractPayload((char*)value.c_str(), &result->id1Type, + extractPayload((char *)iter->value().data_, &result->id1Type, &result->id2Type, &result->dataVersion, result->data, wormhole); } diff --git a/thrift/test/simpletest.cpp b/thrift/test/simpletest.cpp index bdb5b344f..daf74edbb 100644 --- a/thrift/test/simpletest.cpp +++ b/thrift/test/simpletest.cpp @@ -289,6 +289,10 @@ static void testAssocs() { ASSERT_EQ(count, 2); printf("AssocPut second record suceeded.\n"); + // verify assoc count is 2 + cnt = aclient->taoAssocCount(dbname, assocType, id1); + ASSERT_EQ(cnt, 2); + // do a range get for id1+type and verify that there // are two assocs. readback.clear(); @@ -303,6 +307,11 @@ static void testAssocs() { int c = aclient->taoAssocDelete(dbname, assocType, id1, id2+2, AssocVisibility::HIDDEN, true, ""); ASSERT_EQ(c, 1); + + // verify assoc falls back to 1. + cnt = aclient->taoAssocCount(dbname, assocType, id1); + ASSERT_EQ(cnt, 1); + printf("AssocCount suceeded.\n"); } // diff --git a/util/murmurhash.cc b/util/murmurhash.cc new file mode 100644 index 000000000..2c650d8bd --- /dev/null +++ b/util/murmurhash.cc @@ -0,0 +1,178 @@ +/* + Murmurhash from http://sites.google.com/site/murmurhash/ + + All code is released to the public domain. For business purposes, Murmurhash is + under the MIT license. +*/ +#include "murmurhash.h" + +#if defined(__x86_64__) + +// ------------------------------------------------------------------- +// +// The same caveats as 32-bit MurmurHash2 apply here - beware of alignment +// and endian-ness issues if used across multiple platforms. +// +// 64-bit hash for 64-bit platforms + +uint64_t MurmurHash64A ( const void * key, int len, unsigned int seed ) +{ + const uint64_t m = 0xc6a4a7935bd1e995; + const int r = 47; + + uint64_t h = seed ^ (len * m); + + const uint64_t * data = (const uint64_t *)key; + const uint64_t * end = data + (len/8); + + while(data != end) + { + uint64_t k = *data++; + + k *= m; + k ^= k >> r; + k *= m; + + h ^= k; + h *= m; + } + + const unsigned char * data2 = (const unsigned char*)data; + + switch(len & 7) + { + case 7: h ^= ((uint64_t)data2[6]) << 48; + case 6: h ^= ((uint64_t)data2[5]) << 40; + case 5: h ^= ((uint64_t)data2[4]) << 32; + case 4: h ^= ((uint64_t)data2[3]) << 24; + case 3: h ^= ((uint64_t)data2[2]) << 16; + case 2: h ^= ((uint64_t)data2[1]) << 8; + case 1: h ^= ((uint64_t)data2[0]); + h *= m; + }; + + h ^= h >> r; + h *= m; + h ^= h >> r; + + return h; +} + +#elif defined(__i386__) + +// ------------------------------------------------------------------- +// +// Note - This code makes a few assumptions about how your machine behaves - +// +// 1. We can read a 4-byte value from any address without crashing +// 2. sizeof(int) == 4 +// +// And it has a few limitations - +// +// 1. It will not work incrementally. +// 2. It will not produce the same results on little-endian and big-endian +// machines. + +unsigned int MurmurHash2 ( const void * key, int len, unsigned int seed ) +{ + // 'm' and 'r' are mixing constants generated offline. + // They're not really 'magic', they just happen to work well. + + const unsigned int m = 0x5bd1e995; + const int r = 24; + + // Initialize the hash to a 'random' value + + unsigned int h = seed ^ len; + + // Mix 4 bytes at a time into the hash + + const unsigned char * data = (const unsigned char *)key; + + while(len >= 4) + { + unsigned int k = *(unsigned int *)data; + + k *= m; + k ^= k >> r; + k *= m; + + h *= m; + h ^= k; + + data += 4; + len -= 4; + } + + // Handle the last few bytes of the input array + + switch(len) + { + case 3: h ^= data[2] << 16; + case 2: h ^= data[1] << 8; + case 1: h ^= data[0]; + h *= m; + }; + + // Do a few final mixes of the hash to ensure the last few + // bytes are well-incorporated. + + h ^= h >> 13; + h *= m; + h ^= h >> 15; + + return h; +} + +#else + +// ------------------------------------------------------------------- +// +// Same as MurmurHash2, but endian- and alignment-neutral. +// Half the speed though, alas. + +unsigned int MurmurHashNeutral2 ( const void * key, int len, unsigned int seed ) +{ + const unsigned int m = 0x5bd1e995; + const int r = 24; + + unsigned int h = seed ^ len; + + const unsigned char * data = (const unsigned char *)key; + + while(len >= 4) + { + unsigned int k; + + k = data[0]; + k |= data[1] << 8; + k |= data[2] << 16; + k |= data[3] << 24; + + k *= m; + k ^= k >> r; + k *= m; + + h *= m; + h ^= k; + + data += 4; + len -= 4; + } + + switch(len) + { + case 3: h ^= data[2] << 16; + case 2: h ^= data[1] << 8; + case 1: h ^= data[0]; + h *= m; + }; + + h ^= h >> 13; + h *= m; + h ^= h >> 15; + + return h; +} + +#endif diff --git a/util/murmurhash.h b/util/murmurhash.h new file mode 100644 index 000000000..1f476b664 --- /dev/null +++ b/util/murmurhash.h @@ -0,0 +1,32 @@ +/* + Murmurhash from http://sites.google.com/site/murmurhash/ + + All code is released to the public domain. For business purposes, Murmurhash is + under the MIT license. +*/ +#ifndef MURMURHASH_H +#define MURMURHASH_H + +#include + +#if defined(__x86_64__) +#define MURMUR_HASH MurmurHash64A +uint64_t MurmurHash64A ( const void * key, int len, unsigned int seed ); +#define MurmurHash MurmurHash64A +typedef uint64_t murmur_t; + +#elif defined(__i386__) +#define MURMUR_HASH MurmurHash2 +unsigned int MurmurHash2 ( const void * key, int len, unsigned int seed ); +#define MurmurHash MurmurHash2 +typedef unsigned int murmur_t; + +#else +#define MURMUR_HASH MurmurHashNeutral2 +unsigned int MurmurHashNeutral2 ( const void * key, int len, unsigned int seed ); +#define MurmurHash MurmurHashNeutral2 +typedef unsigned int murmur_t; + +#endif + +#endif /* MURMURHASH_H */