From 0d1e0722efc6a946e24a490e897218441b7f6bcd Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Mon, 27 Jun 2022 16:37:09 -0700 Subject: [PATCH] Fix in-place updates for value types other than kTypeValue (#10254) Summary: The patch fixes a couple of issues related to in-place updates: 1) the value type was not passed from `MemTableInserter::PutCFImpl` to `MemTable::Update` and 2) `MemTable::UpdateCallback` was called for any value type (with the callee's logic assuming `kTypeValue`) even though the callback mechanism is only safe for plain values. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10254 Test Plan: `make check` Reviewed By: riversand963 Differential Revision: D37463644 Pulled By: ltamasi fbshipit-source-id: 33802477dac0691681f416ae84c4d9742c6fe41a --- db/db_inplace_update_test.cc | 54 ++++++++++++++++ db/memtable.cc | 118 +++++++++++++++++------------------ db/memtable.h | 6 +- db/write_batch.cc | 6 +- 4 files changed, 118 insertions(+), 66 deletions(-) diff --git a/db/db_inplace_update_test.cc b/db/db_inplace_update_test.cc index 2012cabf1..3921a3b00 100644 --- a/db/db_inplace_update_test.cc +++ b/db/db_inplace_update_test.cc @@ -65,6 +65,60 @@ TEST_F(DBTestInPlaceUpdate, InPlaceUpdateLargeNewValue) { } while (ChangeCompactOptions()); } +TEST_F(DBTestInPlaceUpdate, InPlaceUpdateEntitySmallerNewValue) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + options.env = env_; + options.allow_concurrent_memtable_write = false; + + Reopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + // Update key with values of smaller size + constexpr int num_values = 10; + for (int i = num_values; i > 0; --i) { + constexpr char key[] = "key"; + const std::string value = DummyString(i, 'a'); + WideColumns wide_columns{{"attr", value}}; + + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[1], key, wide_columns)); + // TODO: use Get to check entity once it's supported + } 
+ + // Only 1 instance for that key. + validateNumberOfEntries(1, 1); + } while (ChangeCompactOptions()); +} + +TEST_F(DBTestInPlaceUpdate, InPlaceUpdateEntityLargerNewValue) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + options.env = env_; + options.allow_concurrent_memtable_write = false; + + Reopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + // Update key with values of larger size + constexpr int num_values = 10; + for (int i = 0; i < num_values; ++i) { + constexpr char key[] = "key"; + const std::string value = DummyString(i, 'a'); + WideColumns wide_columns{{"attr", value}}; + + ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[1], key, wide_columns)); + // TODO: use Get to check entity once it's supported + } + + // All 10 updates exist in the internal iterator + validateNumberOfEntries(num_values, 1); + } while (ChangeCompactOptions()); +} + TEST_F(DBTestInPlaceUpdate, InPlaceUpdateCallbackSmallerSize) { do { Options options = CurrentOptions(); diff --git a/db/memtable.cc b/db/memtable.cc index e01164188..ab8c6e2ac 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -1058,8 +1058,8 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, PERF_COUNTER_ADD(get_from_memtable_count, 1); } -Status MemTable::Update(SequenceNumber seq, const Slice& key, - const Slice& value, +Status MemTable::Update(SequenceNumber seq, ValueType value_type, + const Slice& key, const Slice& value, const ProtectionInfoKVOS64* kv_prot_info) { LookupKey lkey(key, seq); Slice mem_key = lkey.memtable_key(); @@ -1089,7 +1089,7 @@ Status MemTable::Update(SequenceNumber seq, const Slice& key, SequenceNumber existing_seq; UnPackSequenceAndType(tag, &existing_seq, &type); assert(existing_seq != seq); - if (type == kTypeValue) { + if (type == value_type) { Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
uint32_t new_size = static_cast<uint32_t>(value.size()); @@ -1117,8 +1117,8 @@ Status MemTable::Update(SequenceNumber seq, const Slice& key, } } - // The latest value is not `kTypeValue` or key doesn't exist - return Add(seq, kTypeValue, key, value, kv_prot_info); + // The latest value is not value_type or key doesn't exist + return Add(seq, value_type, key, value, kv_prot_info); } Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key, @@ -1151,66 +1151,62 @@ Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key, ValueType type; uint64_t existing_seq; UnPackSequenceAndType(tag, &existing_seq, &type); - switch (type) { - case kTypeValue: { - Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); - uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); - - char* prev_buffer = const_cast<char*>(prev_value.data()); - uint32_t new_prev_size = prev_size; + if (type == kTypeValue) { + Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); + uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); - std::string str_value; - WriteLock wl(GetLock(lkey.user_key())); - auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size, - delta, &str_value); - if (status == UpdateStatus::UPDATED_INPLACE) { - // Value already updated by callback. - assert(new_prev_size <= prev_size); - if (new_prev_size < prev_size) { - // overwrite the new prev_size - char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length, - new_prev_size); - if (VarintLength(new_prev_size) < VarintLength(prev_size)) { - // shift the value buffer as well. - memcpy(p, prev_buffer, new_prev_size); - prev_buffer = p; - } - } - RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); - UpdateFlushState(); - if (kv_prot_info != nullptr) { - ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); - // `seq` is swallowed and `existing_seq` prevails.
- updated_kv_prot_info.UpdateS(seq, existing_seq); - updated_kv_prot_info.UpdateV(delta, - Slice(prev_buffer, new_prev_size)); - Slice encoded(entry, prev_buffer + new_prev_size - entry); - return VerifyEncodedEntry(encoded, updated_kv_prot_info); - } - return Status::OK(); - } else if (status == UpdateStatus::UPDATED) { - Status s; - if (kv_prot_info != nullptr) { - ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); - updated_kv_prot_info.UpdateV(delta, str_value); - s = Add(seq, kTypeValue, key, Slice(str_value), - &updated_kv_prot_info); - } else { - s = Add(seq, kTypeValue, key, Slice(str_value), - nullptr /* kv_prot_info */); + char* prev_buffer = const_cast<char*>(prev_value.data()); + uint32_t new_prev_size = prev_size; + + std::string str_value; + WriteLock wl(GetLock(lkey.user_key())); + auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size, + delta, &str_value); + if (status == UpdateStatus::UPDATED_INPLACE) { + // Value already updated by callback. + assert(new_prev_size <= prev_size); + if (new_prev_size < prev_size) { + // overwrite the new prev_size + char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length, + new_prev_size); + if (VarintLength(new_prev_size) < VarintLength(prev_size)) { + // shift the value buffer as well. + memcpy(p, prev_buffer, new_prev_size); + prev_buffer = p; } - RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); - UpdateFlushState(); - return s; - } else if (status == UpdateStatus::UPDATE_FAILED) { - // `UPDATE_FAILED` is named incorrectly. It indicates no update - // happened. It does not indicate a failure happened. - UpdateFlushState(); - return Status::OK(); } + RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); + UpdateFlushState(); + if (kv_prot_info != nullptr) { + ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); + // `seq` is swallowed and `existing_seq` prevails.
+ updated_kv_prot_info.UpdateS(seq, existing_seq); + updated_kv_prot_info.UpdateV(delta, + Slice(prev_buffer, new_prev_size)); + Slice encoded(entry, prev_buffer + new_prev_size - entry); + return VerifyEncodedEntry(encoded, updated_kv_prot_info); + } + return Status::OK(); + } else if (status == UpdateStatus::UPDATED) { + Status s; + if (kv_prot_info != nullptr) { + ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); + updated_kv_prot_info.UpdateV(delta, str_value); + s = Add(seq, kTypeValue, key, Slice(str_value), + &updated_kv_prot_info); + } else { + s = Add(seq, kTypeValue, key, Slice(str_value), + nullptr /* kv_prot_info */); + } + RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); + UpdateFlushState(); + return s; + } else if (status == UpdateStatus::UPDATE_FAILED) { + // `UPDATE_FAILED` is named incorrectly. It indicates no update + // happened. It does not indicate a failure happened. + UpdateFlushState(); + return Status::OK(); } - default: - break; } } } diff --git a/db/memtable.h b/db/memtable.h index 965404d25..fe038a90b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -274,7 +274,7 @@ class MemTable { void MultiGet(const ReadOptions& read_options, MultiGetRange* range, ReadCallback* callback); - // If `key` exists in current memtable with type `kTypeValue` and the existing + // If `key` exists in current memtable with type value_type and the existing // value is at least as large as the new value, updates it in-place. Otherwise // adds the new value to the memtable out-of-place. // @@ -284,8 +284,8 @@ class MemTable { // // REQUIRES: external synchronization to prevent simultaneous // operations on the same MemTable. 
- Status Update(SequenceNumber seq, const Slice& key, const Slice& value, - const ProtectionInfoKVOS64* kv_prot_info); + Status Update(SequenceNumber seq, ValueType value_type, const Slice& key, + const Slice& value, const ProtectionInfoKVOS64* kv_prot_info); // If `key` exists in current memtable with type `kTypeValue` and the existing // value is at least as large as the new value, updates it in-place. Otherwise diff --git a/db/write_batch.cc b/db/write_batch.cc index 5a71ad782..90e72b751 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1987,11 +1987,13 @@ class MemTableInserter : public WriteBatch::Handler { mem->Add(sequence_, value_type, key, value, kv_prot_info, concurrent_memtable_writes_, get_post_process_info(mem), hint_per_batch_ ? &GetHintMap()[mem] : nullptr); - } else if (moptions->inplace_callback == nullptr) { + } else if (moptions->inplace_callback == nullptr || + value_type != kTypeValue) { assert(!concurrent_memtable_writes_); - ret_status = mem->Update(sequence_, key, value, kv_prot_info); + ret_status = mem->Update(sequence_, value_type, key, value, kv_prot_info); } else { assert(!concurrent_memtable_writes_); + assert(value_type == kTypeValue); ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info); if (ret_status.IsNotFound()) { // key not found in memtable. Do sst get, update, add