Implement RowLocks for assoc schema

Summary:
Each assoc is identified by (id1, assocType). This is the rowkey.
Each row has a read/write rowlock. There is statically allocated array
of 2000 read/write locks. A rowkey is murmur-hashed to one of the
read/write locks.

assocPut and assocDelete acquires the rowlock in Write mode.
The key-updates are done within the rowlock with a atomic nosync
batch write to leveldb. Then the rowlock is released and
a write-with-sync is done to sync leveldb transaction log.

Test Plan: added unit test

Reviewers: heyongqiang

Reviewed By: heyongqiang

Differential Revision: https://reviews.facebook.net/D5859
main
Dhruba Borthakur 12 years ago
parent c1006d4276
commit f7975ac733
  1. 110
      thrift/assoc.h
  2. 9
      thrift/test/simpletest.cpp
  3. 178
      util/murmurhash.cc
  4. 32
      util/murmurhash.h

@ -15,6 +15,9 @@
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "port/port.h"
#include "util/mutexlock.h"
#include "util/murmurhash.h"
using namespace apache::thrift; using namespace apache::thrift;
using namespace apache::thrift::protocol; using namespace apache::thrift::protocol;
@ -35,7 +38,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
AssocServiceHandler(OpenHandles* openhandles) { AssocServiceHandler(OpenHandles* openhandles) {
openhandles_ = openhandles; openhandles_ = openhandles;
woptions_.sync = true; woptions_sync_.sync = true;
} }
int64_t taoAssocPut(const Text& tableName, int64_t assocType, int64_t id1, int64_t taoAssocPut(const Text& tableName, int64_t assocType, int64_t id1,
@ -103,11 +106,25 @@ class AssocServiceHandler : virtual public AssocServiceIf {
private: private:
OpenHandles* openhandles_; OpenHandles* openhandles_;
leveldb::ReadOptions roptions_; leveldb::ReadOptions roptions_;
leveldb::WriteOptions woptions_; leveldb::WriteOptions woptions_; // write with no sync
leveldb::WriteOptions woptions_sync_; // write with sync
// the maximum values returned in a rangeget/multiget call. // the maximum values returned in a rangeget/multiget call.
const static unsigned int MAX_RANGE_SIZE = 10000; const static unsigned int MAX_RANGE_SIZE = 10000;
// the seed for murmur hash (copied from Hadoop)
const static unsigned int HASHSEED = 0x5bd1e995;
// A bunch of rowlocks, sharded over the entire rowkey range
// Each rowkey is deterministically mapped to one of these locks.
leveldb::port::RWMutex rowlocks_[1000];
// A helper method that hashes the row key to a lock
leveldb::port::RWMutex* findRowLock(char* str, int size) {
int index = MurmurHash(str, size, HASHSEED) % sizeof(rowlocks_);
return &rowlocks_[index];
}
// //
// Inserts an assoc // Inserts an assoc
// If update_count, returns the updated count of the assoc. // If update_count, returns the updated count of the assoc.
@ -151,6 +168,12 @@ class AssocServiceHandler : virtual public AssocServiceIf {
int keysize = appendRowKeyForCount(rowkeysize, keybuf); int keysize = appendRowKeyForCount(rowkeysize, keybuf);
leveldb::Slice ckey(keybuf, keysize); leveldb::Slice ckey(keybuf, keysize);
// find the row lock
leveldb::port::RWMutex* rowlock = findRowLock(keybuf, rowkeysize);
{
// acquire the row lock
leveldb::WriteLock l(rowlock);
// Scan 'c' to get $count if $update_count == true // Scan 'c' to get $count if $update_count == true
if (update_count) { if (update_count) {
status = db->Get(roptions_, ckey, &value); status = db->Get(roptions_, ckey, &value);
@ -177,6 +200,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
status = db->Get(roptions_, mkey, &value); status = db->Get(roptions_, mkey, &value);
if (status.IsNotFound()) { if (status.IsNotFound()) {
newassoc = true; newassoc = true;
oldvis = UNUSED1;
} else if (!status.ok() || } else if (!status.ok() ||
(value.size() != sizeof(int64_t) + sizeof(int8_t))) { (value.size() != sizeof(int64_t) + sizeof(int8_t))) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
@ -192,12 +216,12 @@ class AssocServiceHandler : virtual public AssocServiceIf {
dummy3.resize(maxkeysize); dummy3.resize(maxkeysize);
keybuf = &dummy3[0]; keybuf = &dummy3[0];
rowkeysize = makeRowKey(keybuf, id1, assocType); rowkeysize = makeRowKey(keybuf, id1, assocType);
keysize = appendRowKeyForPayload(rowkeysize, keybuf, oldts, id2);
leveldb::Slice pkey(keybuf, keysize);
// if ts != oldts, then delete 'p'$old_ts$id2 // if ts != oldts, then delete 'p'$old_ts$id2
if (!newassoc) { if (!newassoc) {
extractTsVisString(&oldts, &oldvis, (char *)value.c_str()); extractTsVisString(&oldts, &oldvis, (char *)value.c_str());
keysize = appendRowKeyForPayload(rowkeysize, keybuf, oldts, id2);
leveldb::Slice pkey(keybuf, keysize);
if (ts != oldts) { if (ts != oldts) {
batch.Delete(pkey); batch.Delete(pkey);
} }
@ -226,12 +250,29 @@ class AssocServiceHandler : virtual public AssocServiceIf {
makeCountString(&myvalue[0], count); makeCountString(&myvalue[0], count);
batch.Put(ckey, leveldb::Slice(myvalue)); batch.Put(ckey, leveldb::Slice(myvalue));
} }
// We do a write here without sync. This writes it to the
// transaction log but does not sync it. It also makes these
// changes readable by other threads.
status = db->Write(woptions_, &batch); status = db->Write(woptions_, &batch);
if (!status.ok()) { if (!status.ok()) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"AssocPut Unable to batch write ", "AssocPut Unable to batch write ",
assocType, id1, id2, id1Type, id2Type, ts, vis); assocType, id1, id2, id1Type, id2Type, ts, vis);
} }
} // release rowlock
// Do a sync to the transaction log without holding the rowlock.
// This improves updates for hotrows. The disadvantage is that
// uncommiitted reads might be read by other threads, but that
// should be ok.
batch.Clear();
status = db->Write(woptions_sync_, &batch);
if (!status.ok()) {
throw generate_exception(tableName, Code::kNotFound,
"AssocPut Unable to batch sync write ",
assocType, id1, id2, id1Type, id2Type, ts, vis);
}
if (update_count) { if (update_count) {
assert(count > 0); assert(count > 0);
return count; return count;
@ -249,6 +290,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
int64_t id2, AssocVisibility vis, int64_t id2, AssocVisibility vis,
bool update_count, const Text& wormhole_comment) { bool update_count, const Text& wormhole_comment) {
leveldb::WriteBatch batch; leveldb::WriteBatch batch;
int return_value = 0;
int64_t count = 0; int64_t count = 0;
int64_t oldts; int64_t oldts;
int8_t oldvis; int8_t oldvis;
@ -265,6 +307,12 @@ class AssocServiceHandler : virtual public AssocServiceIf {
int keysize = appendRowKeyForCount(rowkeysize, keybuf); int keysize = appendRowKeyForCount(rowkeysize, keybuf);
leveldb::Slice ckey(keybuf, keysize); leveldb::Slice ckey(keybuf, keysize);
// find the row lock
leveldb::port::RWMutex* rowlock = findRowLock(keybuf, rowkeysize);
{
// acquire the row lock
leveldb::WriteLock l(rowlock);
// Scan 'c' to get $count if $update_count == true // Scan 'c' to get $count if $update_count == true
if (update_count) { if (update_count) {
status = db->Get(roptions_, ckey, &value); status = db->Get(roptions_, ckey, &value);
@ -361,7 +409,6 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// delete p$ts$id2 // delete p$ts$id2
batch.Delete(pkey); batch.Delete(pkey);
} }
int return_value = 0;
if (update_count && oldvis == AssocVisibility::VISIBLE) { if (update_count && oldvis == AssocVisibility::VISIBLE) {
return_value = 1; return_value = 1;
assert(count >= 1); assert(count >= 1);
@ -372,12 +419,23 @@ class AssocServiceHandler : virtual public AssocServiceIf {
makeCountString(&myvalue[0], count); makeCountString(&myvalue[0], count);
batch.Put(ckey, leveldb::Slice(myvalue)); batch.Put(ckey, leveldb::Slice(myvalue));
} }
status = db->Write(woptions_, &batch); status = db->Write(woptions_, &batch); // write with no sync
if (!status.ok()) { if (!status.ok()) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"assocDelete Unable to Batch Write ", "assocDelete Unable to Batch Write ",
assocType, id1, id2, 0, 0, oldts, vis); assocType, id1, id2, 0, 0, oldts, vis);
} }
} // release rowlock
// Do a sync write after releasing the rowlock. This
// improves performance for hotrow updates.
batch.Clear();
status = db->Write(woptions_sync_, &batch);
if (!status.ok()) {
throw generate_exception(tableName, Code::kNotFound,
"assocDelete Unable to Batch sync Write ",
assocType, id1, id2, 0, 0, oldts, vis);
}
if (update_count) { if (update_count) {
assert(count >= 0); assert(count >= 0);
return count; return count;
@ -397,7 +455,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
int keysize = appendRowKeyForCount(rowkeysize, keybuf); // column 'c' int keysize = appendRowKeyForCount(rowkeysize, keybuf); // column 'c'
leveldb::Slice ckey(keybuf, keysize); leveldb::Slice ckey(keybuf, keysize);
// query database to find value // Query database to find value
leveldb::Status status; leveldb::Status status;
std::string value; std::string value;
int64_t count; int64_t count;
@ -433,7 +491,6 @@ class AssocServiceHandler : virtual public AssocServiceIf {
} }
int64_t ts, id2; int64_t ts, id2;
const leveldb::ReadOptions options;
std::string wormhole; std::string wormhole;
// convert times to time-till-LONGMAX // convert times to time-till-LONGMAX
@ -453,9 +510,9 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// Position scan at 'p'$ts$id2 where ts = startTime and id2 = 0 // Position scan at 'p'$ts$id2 where ts = startTime and id2 = 0
id2 = 0; id2 = 0;
leveldb::Iterator* iter = db->NewIterator(options);
int keysize = appendRowKeyForPayload(rowkeysize, keybuf, startTime, id2); int keysize = appendRowKeyForPayload(rowkeysize, keybuf, startTime, id2);
leveldb::Slice pkey(keybuf, keysize); leveldb::Slice pkey(keybuf, keysize);
leveldb::Iterator* iter = db->NewIterator(roptions_);
for (iter->Seek(pkey); iter->Valid() && limit > 0 ; iter->Next()) { for (iter->Seek(pkey); iter->Valid() && limit > 0 ; iter->Next()) {
// skip over records that the caller is not interested in // skip over records that the caller is not interested in
@ -511,30 +568,35 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// create rowkey // create rowkey
char* keybuf = &dummy[0]; char* keybuf = &dummy[0];
int rowkeysize = makeRowKey(keybuf, id1, assocType); int rowkeysize = makeRowKey(keybuf, id1, assocType);
leveldb::Iterator* iter = db->NewIterator(roptions_);
for (unsigned int index = 0; index < id2s.size(); index++) { for (unsigned int index = 0; index < id2s.size(); index++) {
int64_t ts; int64_t ts;
int8_t oldvis; int8_t oldvis;
leveldb::Status status; leveldb::Status status;
std::string value, wormhole; std::string wormhole;
// query column 'm'$id2 // query column 'm'$id2
id2 = id2s[index]; id2 = id2s[index];
int keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2); int keysize = appendRowKeyForMeta(rowkeysize, keybuf, id2);
leveldb::Slice ckey(keybuf, keysize); leveldb::Slice ckey(keybuf, keysize);
status = db->Get(roptions_, ckey, &value); iter->Seek(ckey);
if (!iter->Valid()) {
// parse results retrieved from database throw generate_exception(tableName, Code::kNotFound,
if (status.IsNotFound()) { "Unable to find m$id2 ",
assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1);
}
if (ckey != iter->key()) {
continue; // non existant assoc continue; // non existant assoc
} else if (!status.ok() || }
value.size() != sizeof(int64_t) + sizeof(int8_t)) { leveldb::Slice value = iter->value();
if (value.size() != sizeof(int64_t) + sizeof(int8_t)) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"Unable to find m$id2 ", "Unable to find m$id2 ",
assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1); assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1);
} }
extractTsVisString(&ts, &oldvis, &value[0]); extractTsVisString(&ts, &oldvis, (char*)value.data_);
if(oldvis != AssocVisibility::VISIBLE) { if(oldvis != AssocVisibility::VISIBLE) {
continue; continue;
} }
@ -543,15 +605,11 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// this assoc is visible, scan 'p'$ts$id2 to retrieve payload. // this assoc is visible, scan 'p'$ts$id2 to retrieve payload.
keysize = appendRowKeyForPayload(rowkeysize, keybuf, ts, id2); keysize = appendRowKeyForPayload(rowkeysize, keybuf, ts, id2);
leveldb::Slice pkey(keybuf, keysize); leveldb::Slice pkey(keybuf, keysize);
status = db->Get(roptions_, pkey, &value); iter->Seek(pkey);
if (!iter->Valid() || (pkey != iter->key())) {
// parse results retrieved from database
if (status.IsNotFound()) {
continue; // non existant assoc
} else if (!status.ok()) {
throw generate_exception(tableName, Code::kNotFound, throw generate_exception(tableName, Code::kNotFound,
"Unable to find m$id2 ", "Unable to find p$ts$id2 ",
assocType, id1, id2, 0, 0, 0, Tleveldb::UNUSED1); assocType, id1, id2, 0, 0, ts, Tleveldb::UNUSED1);
} }
// allocate a new slot in the result set. // allocate a new slot in the result set.
@ -561,7 +619,7 @@ class AssocServiceHandler : virtual public AssocServiceIf {
// Fill up new element in result set. // Fill up new element in result set.
result->id2 = id2; result->id2 = id2;
result->time = convertTime(ts); result->time = convertTime(ts);
extractPayload((char*)value.c_str(), &result->id1Type, extractPayload((char *)iter->value().data_, &result->id1Type,
&result->id2Type, &result->id2Type,
&result->dataVersion, result->data, wormhole); &result->dataVersion, result->data, wormhole);
} }

@ -289,6 +289,10 @@ static void testAssocs() {
ASSERT_EQ(count, 2); ASSERT_EQ(count, 2);
printf("AssocPut second record suceeded.\n"); printf("AssocPut second record suceeded.\n");
// verify assoc count is 2
cnt = aclient->taoAssocCount(dbname, assocType, id1);
ASSERT_EQ(cnt, 2);
// do a range get for id1+type and verify that there // do a range get for id1+type and verify that there
// are two assocs. // are two assocs.
readback.clear(); readback.clear();
@ -303,6 +307,11 @@ static void testAssocs() {
int c = aclient->taoAssocDelete(dbname, assocType, int c = aclient->taoAssocDelete(dbname, assocType,
id1, id2+2, AssocVisibility::HIDDEN, true, ""); id1, id2+2, AssocVisibility::HIDDEN, true, "");
ASSERT_EQ(c, 1); ASSERT_EQ(c, 1);
// verify assoc falls back to 1.
cnt = aclient->taoAssocCount(dbname, assocType, id1);
ASSERT_EQ(cnt, 1);
printf("AssocCount suceeded.\n");
} }
// //

@ -0,0 +1,178 @@
/*
Murmurhash from http://sites.google.com/site/murmurhash/
All code is released to the public domain. For business purposes, Murmurhash is
under the MIT license.
*/
#include "murmurhash.h"
#if defined(__x86_64__)
// -------------------------------------------------------------------
//
// The same caveats as 32-bit MurmurHash2 apply here - beware of alignment
// and endian-ness issues if used across multiple platforms.
//
// 64-bit hash for 64-bit platforms
uint64_t MurmurHash64A ( const void * key, int len, unsigned int seed )
{
const uint64_t m = 0xc6a4a7935bd1e995;
const int r = 47;
uint64_t h = seed ^ (len * m);
const uint64_t * data = (const uint64_t *)key;
const uint64_t * end = data + (len/8);
while(data != end)
{
uint64_t k = *data++;
k *= m;
k ^= k >> r;
k *= m;
h ^= k;
h *= m;
}
const unsigned char * data2 = (const unsigned char*)data;
switch(len & 7)
{
case 7: h ^= ((uint64_t)data2[6]) << 48;
case 6: h ^= ((uint64_t)data2[5]) << 40;
case 5: h ^= ((uint64_t)data2[4]) << 32;
case 4: h ^= ((uint64_t)data2[3]) << 24;
case 3: h ^= ((uint64_t)data2[2]) << 16;
case 2: h ^= ((uint64_t)data2[1]) << 8;
case 1: h ^= ((uint64_t)data2[0]);
h *= m;
};
h ^= h >> r;
h *= m;
h ^= h >> r;
return h;
}
#elif defined(__i386__)
// -------------------------------------------------------------------
//
// Note - This code makes a few assumptions about how your machine behaves -
//
// 1. We can read a 4-byte value from any address without crashing
// 2. sizeof(int) == 4
//
// And it has a few limitations -
//
// 1. It will not work incrementally.
// 2. It will not produce the same results on little-endian and big-endian
// machines.
unsigned int MurmurHash2 ( const void * key, int len, unsigned int seed )
{
// 'm' and 'r' are mixing constants generated offline.
// They're not really 'magic', they just happen to work well.
const unsigned int m = 0x5bd1e995;
const int r = 24;
// Initialize the hash to a 'random' value
unsigned int h = seed ^ len;
// Mix 4 bytes at a time into the hash
const unsigned char * data = (const unsigned char *)key;
while(len >= 4)
{
unsigned int k = *(unsigned int *)data;
k *= m;
k ^= k >> r;
k *= m;
h *= m;
h ^= k;
data += 4;
len -= 4;
}
// Handle the last few bytes of the input array
switch(len)
{
case 3: h ^= data[2] << 16;
case 2: h ^= data[1] << 8;
case 1: h ^= data[0];
h *= m;
};
// Do a few final mixes of the hash to ensure the last few
// bytes are well-incorporated.
h ^= h >> 13;
h *= m;
h ^= h >> 15;
return h;
}
#else
// -------------------------------------------------------------------
//
// Same as MurmurHash2, but endian- and alignment-neutral.
// Half the speed though, alas.
unsigned int MurmurHashNeutral2 ( const void * key, int len, unsigned int seed )
{
const unsigned int m = 0x5bd1e995;
const int r = 24;
unsigned int h = seed ^ len;
const unsigned char * data = (const unsigned char *)key;
while(len >= 4)
{
unsigned int k;
k = data[0];
k |= data[1] << 8;
k |= data[2] << 16;
k |= data[3] << 24;
k *= m;
k ^= k >> r;
k *= m;
h *= m;
h ^= k;
data += 4;
len -= 4;
}
switch(len)
{
case 3: h ^= data[2] << 16;
case 2: h ^= data[1] << 8;
case 1: h ^= data[0];
h *= m;
};
h ^= h >> 13;
h *= m;
h ^= h >> 15;
return h;
}
#endif

@ -0,0 +1,32 @@
/*
Murmurhash from http://sites.google.com/site/murmurhash/
All code is released to the public domain. For business purposes, Murmurhash is
under the MIT license.
*/
#ifndef MURMURHASH_H
#define MURMURHASH_H
#include <stdint.h>
#if defined(__x86_64__)
#define MURMUR_HASH MurmurHash64A
uint64_t MurmurHash64A ( const void * key, int len, unsigned int seed );
#define MurmurHash MurmurHash64A
typedef uint64_t murmur_t;
#elif defined(__i386__)
#define MURMUR_HASH MurmurHash2
unsigned int MurmurHash2 ( const void * key, int len, unsigned int seed );
#define MurmurHash MurmurHash2
typedef unsigned int murmur_t;
#else
#define MURMUR_HASH MurmurHashNeutral2
unsigned int MurmurHashNeutral2 ( const void * key, int len, unsigned int seed );
#define MurmurHash MurmurHashNeutral2
typedef unsigned int murmur_t;
#endif
#endif /* MURMURHASH_H */
Loading…
Cancel
Save