From 1447bb5919256ac94872821b48baff3cf2b99c26 Mon Sep 17 00:00:00 2001 From: Naman Gupta Date: Thu, 16 Jan 2014 15:11:19 -0800 Subject: [PATCH] Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases. Summary: This diff fixes 2 hacks: * The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint. * The callback function has 3 functionalities 1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1. 2. Generate a new buffer indicating merged value. Returns 2. 3. Fails to do either of above, based on whatever application logic. Returns 0. Test Plan: Just make all for now. I'm adding another unit test to test each scenario. Reviewers: dhruba, haobo Reviewed By: haobo CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo Differential Revision: https://reviews.facebook.net/D15195 --- db/db_test.cc | 100 ++++++++++++++++++++++++++++++++------ db/memtable.cc | 76 ++++++++++++++--------------- db/write_batch.cc | 26 +++++----- include/rocksdb/options.h | 55 +++++++++++++-------- 4 files changed, 170 insertions(+), 87 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 9a5d128df..42bcf8277 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -712,20 +712,49 @@ class DBTest { // If previous value is nullptr or delta is > than previous value, // sets newValue with delta // If previous value is not empty, - // updates previous value with 'b' string of previous value size - static bool updateInPlace(char* prevValue, size_t prevSize, - Slice delta, std::string* newValue) { - if (prevValue == nullptr || delta.size() > prevSize) { + // updates previous value with 'b' string of previous value size - 1. + static UpdateStatus + updateInPlaceSmallerSize(char* prevValue, uint32_t* prevSize, + Slice delta, std::string* newValue) { + if (prevValue == nullptr) { *newValue = std::string(delta.size(), 'c'); - return false; + return UpdateStatus::UPDATED; } else { - std::string str_b = std::string(prevSize, 'b'); + *prevSize = *prevSize - 1; + std::string str_b = std::string(*prevSize, 'b'); memcpy(prevValue, str_b.c_str(), str_b.size()); - return true; + return UpdateStatus::UPDATED_INPLACE; } } - // Used to test InplaceUpdate + static UpdateStatus + updateInPlaceSmallerVarintSize(char* prevValue, uint32_t* prevSize, + Slice delta, std::string* newValue) { + if (prevValue == nullptr) { + *newValue = std::string(delta.size(), 'c'); + return UpdateStatus::UPDATED; + } else { + *prevSize = 1; + std::string str_b = std::string(*prevSize, 'b'); + memcpy(prevValue, str_b.c_str(), str_b.size()); + return UpdateStatus::UPDATED_INPLACE; + } + } + + static UpdateStatus + updateInPlaceLargerSize(char* prevValue, uint32_t* prevSize, + Slice delta, std::string* newValue) { + *newValue = std::string(delta.size(), 'c'); + return UpdateStatus::UPDATED; + } + + static UpdateStatus + updateInPlaceNoAction(char* prevValue, uint32_t* prevSize, + Slice delta, std::string* newValue) { + return UpdateStatus::UPDATE_FAILED; + } + + // Utility method to test InplaceUpdate void validateNumberOfEntries(int numValues) { Iterator* iter = dbfull()->TEST_NewInternalIterator(); iter->SeekToFirst(); @@ -2619,7 +2648,7 @@ TEST(DBTest, InPlaceUpdateLargeNewValue) { } -TEST(DBTest, InPlaceUpdateCallback) { +TEST(DBTest, InPlaceUpdateCallbackSmallerSize) { do { Options options = CurrentOptions(); options.create_if_missing = true; @@ -2628,7 +2657,7 @@ TEST(DBTest, InPlaceUpdateCallback) { options.env = env_; options.write_buffer_size = 100000; options.inplace_callback = - rocksdb::DBTest::updateInPlace; + rocksdb::DBTest::updateInPlaceSmallerSize; Reopen(&options); // Update key with values of smaller size @@ -2638,7 +2667,7 @@ TEST(DBTest, InPlaceUpdateCallback) { for (int i = numValues; i > 0; i--) { ASSERT_OK(Put("key", DummyString(i, 'a'))); - ASSERT_EQ(DummyString(numValues, 'b'), Get("key")); + ASSERT_EQ(DummyString(i - 1, 'b'), Get("key")); } // Only 1 instance for that key. @@ -2647,9 +2676,31 @@ TEST(DBTest, InPlaceUpdateCallback) { } while (ChangeCompactOptions()); } -TEST(DBTest, InPlaceUpdateCallbackNotFound) { +TEST(DBTest, InPlaceUpdateCallbackSmallerVarintSize) { do { - //Test sst get/update/put + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + + options.env = env_; + options.write_buffer_size = 100000; + options.inplace_callback = + rocksdb::DBTest::updateInPlaceSmallerVarintSize; + Reopen(&options); + + // Update key with values of smaller varint size + int numValues = 265; + ASSERT_OK(Put("key", DummyString(numValues, 'a'))); + ASSERT_EQ(DummyString(numValues, 'c'), Get("key")); + + for (int i = numValues; i > 0; i--) { + ASSERT_OK(Put("key", DummyString(i, 'a'))); + ASSERT_EQ(DummyString(1, 'b'), Get("key")); + } + + // Only 1 instance for that key. + validateNumberOfEntries(1); + } while (ChangeCompactOptions()); } @@ -2662,12 +2713,12 @@ TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) { options.env = env_; options.write_buffer_size = 100000; options.inplace_callback = - rocksdb::DBTest::updateInPlace; + rocksdb::DBTest::updateInPlaceLargerSize; Reopen(&options); // Update key with values of larger size int numValues = 10; - for (int i = 1; i <= numValues; i++) { + for (int i = 0; i < numValues; i++) { ASSERT_OK(Put("key", DummyString(i, 'a'))); ASSERT_EQ(DummyString(i, 'c'), Get("key")); } @@ -2679,6 +2730,25 @@ TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) { } while (ChangeCompactOptions()); } +TEST(DBTest, InPlaceUpdateCallbackNoAction) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + + options.env = env_; + options.write_buffer_size = 100000; + options.inplace_callback = + rocksdb::DBTest::updateInPlaceNoAction; + Reopen(&options); + + // Callback function requests no actions from db + ASSERT_OK(Put("key", DummyString(1, 'a'))); + ASSERT_EQ(Get("key"), "NOT_FOUND"); + + } while (ChangeCompactOptions()); +} + // This is a static filter used for filtering // kvs during the compaction process. static int cfilter_count; diff --git a/db/memtable.cc b/db/memtable.cc index 430c3589b..73a24f8d7 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -346,21 +346,21 @@ void MemTable::Update(SequenceNumber seq, switch (static_cast(tag & 0xff)) { case kTypeValue: { Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); - uint32_t prev_value_size = prev_value.size(); - uint32_t new_value_size = value.size(); + uint32_t prev_size = prev_value.size(); + uint32_t new_size = value.size(); - // Update value, if newValue size <= curValue size - if (new_value_size <= prev_value_size ) { + // Update value, if new value size <= previous value size + if (new_size <= prev_size ) { char* p = EncodeVarint32(const_cast(key_ptr) + key_length, - new_value_size); + new_size); WriteLock wl(GetLock(lkey.user_key())); - memcpy(p, value.data(), new_value_size); + memcpy(p, value.data(), new_size); assert( - (p + new_value_size) - entry == + (p + new_size) - entry == (unsigned) (VarintLength(key_length) + key_length + - VarintLength(new_value_size) + - new_value_size) + VarintLength(new_size) + + new_size) ); // no need to update bloom, as user key does not change. return; @@ -380,9 +380,9 @@ void MemTable::Update(SequenceNumber seq, } bool MemTable::UpdateCallback(SequenceNumber seq, - const Slice& key, - const Slice& delta, - const Options& options) { + const Slice& key, + const Slice& delta, + const Options& options) { LookupKey lkey(key, seq); Slice memkey = lkey.memtable_key(); @@ -410,39 +410,35 @@ bool MemTable::UpdateCallback(SequenceNumber seq, switch (static_cast(tag & 0xff)) { case kTypeValue: { Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); - uint32_t prev_value_size = prev_value.size(); + uint32_t prev_size = prev_value.size(); + + char* prev_buffer = const_cast(prev_value.data()); + uint32_t new_prev_size = prev_size; - WriteLock wl(GetLock(lkey.user_key())); std::string str_value; - if (options.inplace_callback(const_cast(prev_value.data()), - prev_value_size, delta, &str_value)) { + WriteLock wl(GetLock(lkey.user_key())); + auto status = options.inplace_callback(prev_buffer, &new_prev_size, + delta, &str_value); + if (status == UpdateStatus::UPDATED_INPLACE) { // Value already updated by callback. - // TODO: Change size of value in memtable slice. - // This works for leaf, since size is already encoded in the - // value. It doesn't depend on rocksdb buffer size. + assert(new_prev_size <= prev_size); + if (new_prev_size < prev_size) { + // overwrite the new prev_size + char* p = EncodeVarint32(const_cast(key_ptr) + key_length, + new_prev_size); + if (VarintLength(new_prev_size) < VarintLength(prev_size)) { + // shift the value buffer as well. + memcpy(p, prev_buffer, new_prev_size); + } + } + RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED); return true; - } - Slice slice_value = Slice(str_value); - uint32_t new_value_size = slice_value.size(); - - // Update value, if newValue size <= curValue size - if (new_value_size <= prev_value_size ) { - char* p = EncodeVarint32(const_cast(key_ptr) + key_length, - new_value_size); - - memcpy(p, slice_value.data(), new_value_size); - assert( - (p + new_value_size) - entry == - (unsigned) (VarintLength(key_length) + - key_length + - VarintLength(new_value_size) + - new_value_size) - ); + } else if (status == UpdateStatus::UPDATED) { + Add(seq, kTypeValue, key, Slice(str_value)); + RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN); return true; - } else { - // If we don't have enough space to update in-place - // Return as NotUpdatable, and do normal Add() - Add(seq, kTypeValue, key, slice_value); + } else if (status == UpdateStatus::UPDATE_FAILED) { + // No action required. Return. return true; } } diff --git a/db/write_batch.cc b/db/write_batch.cc index 6a427f1a6..72fd2a9ea 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -203,9 +203,8 @@ class MemTableInserter : public WriteBatch::Handler { RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); } else { if (mem_->UpdateCallback(sequence_, key, value, *options_)) { - RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); } else { - // key not found in memtable. Do sst get/update/add + // key not found in memtable. Do sst get, update, add SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; ReadOptions ropts; @@ -215,22 +214,25 @@ class MemTableInserter : public WriteBatch::Handler { std::string merged_value; Status s = db_->Get(ropts, key, &prev_value); char* prev_buffer = const_cast(prev_value.c_str()); - size_t prev_size = prev_value.size(); - if (options_->inplace_callback(s.ok() ? prev_buffer: nullptr, - s.ok() ? prev_size: 0, - value, &merged_value)) { + uint32_t prev_size = prev_value.size(); + auto status = + options_->inplace_callback(s.ok() ? prev_buffer: nullptr, + s.ok() ? &prev_size: nullptr, + value, &merged_value); + if (status == UpdateStatus::UPDATED_INPLACE) { // prev_value is updated in-place with final value. mem_->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size)); RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); - } else { - // merged_value contains the final value. Only add if not empty. - if (!merged_value.empty()) { - mem_->Add(sequence_, kTypeValue, key, Slice(merged_value)); - RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); - } + } else if (status == UpdateStatus::UPDATED) { + // merged_value contains the final value. + mem_->Add(sequence_, kTypeValue, key, Slice(merged_value)); + RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); } } } + // Since all Puts are logged in trasaction logs (if enabled), always bump + // sequence number. Even if the update eventually fails and does not result + // in memtable add/update. sequence_++; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 8499f6025..cf2daa819 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -65,6 +65,12 @@ struct CompressionOptions { : window_bits(wbits), level(lev), strategy(strategy) {} }; +enum UpdateStatus { // Return status For inplace update callback + UPDATE_FAILED = 0, // Nothing to update + UPDATED_INPLACE = 1, // Value updated inplace + UPDATED = 2, // No inplace update. Merged value set +}; + // Options to control the behavior of a database (passed to DB::Open) struct Options { // ------------------- @@ -650,38 +656,47 @@ struct Options { // Default: 10000, if inplace_update_support = true, else 0. size_t inplace_update_num_locks; - // * existing_value - pointer to previous value (from both memtable and sst). - // nullptr if key doesn't exist - // * existing_value_size - sizeof(existing_value). 0 if key doesn't exist - // * delta_value - Delta value to be merged with the 'existing_value'. - // Stored in transaction logs. - // * merged_value - Set when delta is applied on the previous value. + // existing_value - pointer to previous value (from both memtable and sst). + // nullptr if key doesn't exist + // existing_value_size - pointer to size of existing_value). + // nullptr if key doesn't exist + // delta_value - Delta value to be merged with the existing_value. + // Stored in transaction logs. + // merged_value - Set when delta is applied on the previous value. // Applicable only when inplace_update_support is true, // this callback function is called at the time of updating the memtable // as part of a Put operation, lets say Put(key, delta_value). It allows the // 'delta_value' specified as part of the Put operation to be merged with - // an 'existing_value' of the 'key' in the database. + // an 'existing_value' of the key in the database. // If the merged value is smaller in size that the 'existing_value', - // then this function can update the 'existing_value' buffer inplace if it - // wishes to. The callback should return true in this case. (In this case, - // the snapshot-semantics of the rocksdb Iterator is not atomic anymore). - - // If the application does not wish to modify the 'existing_value' buffer - // inplace, then it should allocate a new buffer and update it by merging the - // 'existing_value' and the Put 'delta_value' and set the 'merged_value' - // pointer to this buffer. The callback should return false in this case. It - // is upto the calling layer to manage the memory returned in 'merged_value'. + // then this function can update the 'existing_value' buffer inplace and + // the corresponding 'existing_value'_size pointer, if it wishes to. + // The callback should return UpdateStatus::UPDATED_INPLACE. + // In this case. (In this case, the snapshot-semantics of the rocksdb + // Iterator is not atomic anymore). + + // If the merged value is larger in size than the 'existing_value' or the + // application does not wish to modify the 'existing_value' buffer inplace, + // then the merged value should be returned via *merge_value. It is set by + // merging the 'existing_value' and the Put 'delta_value'. The callback should + // return UpdateStatus::UPDATED in this case. This merged value will be added + // to the memtable. + + // If merging fails or the application does not wish to take any action, + // then the callback should return UpdateStatus::UPDATE_FAILED. // Please remember that the original call from the application is Put(key, - // delta_value). So the transaction log (if enabled) will still contain - // (key, delta_value). The 'merged_value' is not stored in the transaction log + // delta_value). So the transaction log (if enabled) will still contain (key, + // delta_value). The 'merged_value' is not stored in the transaction log. // Hence the inplace_callback function should be consistent across db reopens. // Default: nullptr - bool (*inplace_callback)(char* existing_value, size_t existing_value_size, - Slice delta_value, std::string* merged_value); + UpdateStatus (*inplace_callback)(char* existing_value, + uint32_t* existing_value_size, + Slice delta_value, + std::string* merged_value); // if prefix_extractor is set and bloom_bits is not 0, create prefix bloom // for memtable