Fix in-place updates for value types other than kTypeValue (#10254)

Summary:
The patch fixes a couple of issues related to in-place updates: 1) the value type was not passed from
`MemTableInserter::PutCFImpl` to `MemTable::Update` and 2) `MemTable::UpdateCallback` was called
for any value type (with the callee's logic assuming `kTypeValue`) even though the callback mechanism
is only safe for plain values.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10254

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D37463644

Pulled By: ltamasi

fbshipit-source-id: 33802477dac0691681f416ae84c4d9742c6fe41a
main
Levi Tamasi 3 years ago committed by Facebook GitHub Bot
parent d3de59255a
commit 0d1e0722ef
  1. 54
      db/db_inplace_update_test.cc
  2. 118
      db/memtable.cc
  3. 6
      db/memtable.h
  4. 6
      db/write_batch.cc

@ -65,6 +65,60 @@ TEST_F(DBTestInPlaceUpdate, InPlaceUpdateLargeNewValue) {
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
TEST_F(DBTestInPlaceUpdate, InPlaceUpdateEntitySmallerNewValue) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.inplace_update_support = true;
options.env = env_;
options.allow_concurrent_memtable_write = false;
Reopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Update key with values of smaller size
constexpr int num_values = 10;
for (int i = num_values; i > 0; --i) {
constexpr char key[] = "key";
const std::string value = DummyString(i, 'a');
WideColumns wide_columns{{"attr", value}};
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[1], key, wide_columns));
// TODO: use Get to check entity once it's supported
}
// Only 1 instance for that key.
validateNumberOfEntries(1, 1);
} while (ChangeCompactOptions());
}
TEST_F(DBTestInPlaceUpdate, InPlaceUpdateEntityLargerNewValue) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.inplace_update_support = true;
options.env = env_;
options.allow_concurrent_memtable_write = false;
Reopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Update key with values of larger size
constexpr int num_values = 10;
for (int i = 0; i < num_values; ++i) {
constexpr char key[] = "key";
const std::string value = DummyString(i, 'a');
WideColumns wide_columns{{"attr", value}};
ASSERT_OK(db_->PutEntity(WriteOptions(), handles_[1], key, wide_columns));
// TODO: use Get to check entity once it's supported
}
// All 10 updates exist in the internal iterator
validateNumberOfEntries(num_values, 1);
} while (ChangeCompactOptions());
}
TEST_F(DBTestInPlaceUpdate, InPlaceUpdateCallbackSmallerSize) { TEST_F(DBTestInPlaceUpdate, InPlaceUpdateCallbackSmallerSize) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();

@ -1058,8 +1058,8 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
PERF_COUNTER_ADD(get_from_memtable_count, 1); PERF_COUNTER_ADD(get_from_memtable_count, 1);
} }
Status MemTable::Update(SequenceNumber seq, const Slice& key, Status MemTable::Update(SequenceNumber seq, ValueType value_type,
const Slice& value, const Slice& key, const Slice& value,
const ProtectionInfoKVOS64* kv_prot_info) { const ProtectionInfoKVOS64* kv_prot_info) {
LookupKey lkey(key, seq); LookupKey lkey(key, seq);
Slice mem_key = lkey.memtable_key(); Slice mem_key = lkey.memtable_key();
@ -1089,7 +1089,7 @@ Status MemTable::Update(SequenceNumber seq, const Slice& key,
SequenceNumber existing_seq; SequenceNumber existing_seq;
UnPackSequenceAndType(tag, &existing_seq, &type); UnPackSequenceAndType(tag, &existing_seq, &type);
assert(existing_seq != seq); assert(existing_seq != seq);
if (type == kTypeValue) { if (type == value_type) {
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
uint32_t new_size = static_cast<uint32_t>(value.size()); uint32_t new_size = static_cast<uint32_t>(value.size());
@ -1117,8 +1117,8 @@ Status MemTable::Update(SequenceNumber seq, const Slice& key,
} }
} }
// The latest value is not `kTypeValue` or key doesn't exist // The latest value is not value_type or key doesn't exist
return Add(seq, kTypeValue, key, value, kv_prot_info); return Add(seq, value_type, key, value, kv_prot_info);
} }
Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key, Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
@ -1151,66 +1151,62 @@ Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
ValueType type; ValueType type;
uint64_t existing_seq; uint64_t existing_seq;
UnPackSequenceAndType(tag, &existing_seq, &type); UnPackSequenceAndType(tag, &existing_seq, &type);
switch (type) { if (type == kTypeValue) {
case kTypeValue: { Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
char* prev_buffer = const_cast<char*>(prev_value.data());
uint32_t new_prev_size = prev_size;
std::string str_value; char* prev_buffer = const_cast<char*>(prev_value.data());
WriteLock wl(GetLock(lkey.user_key())); uint32_t new_prev_size = prev_size;
auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
delta, &str_value); std::string str_value;
if (status == UpdateStatus::UPDATED_INPLACE) { WriteLock wl(GetLock(lkey.user_key()));
// Value already updated by callback. auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
assert(new_prev_size <= prev_size); delta, &str_value);
if (new_prev_size < prev_size) { if (status == UpdateStatus::UPDATED_INPLACE) {
// overwrite the new prev_size // Value already updated by callback.
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length, assert(new_prev_size <= prev_size);
new_prev_size); if (new_prev_size < prev_size) {
if (VarintLength(new_prev_size) < VarintLength(prev_size)) { // overwrite the new prev_size
// shift the value buffer as well. char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
memcpy(p, prev_buffer, new_prev_size); new_prev_size);
prev_buffer = p; if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
} // shift the value buffer as well.
} memcpy(p, prev_buffer, new_prev_size);
RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); prev_buffer = p;
UpdateFlushState();
if (kv_prot_info != nullptr) {
ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
// `seq` is swallowed and `existing_seq` prevails.
updated_kv_prot_info.UpdateS(seq, existing_seq);
updated_kv_prot_info.UpdateV(delta,
Slice(prev_buffer, new_prev_size));
Slice encoded(entry, prev_buffer + new_prev_size - entry);
return VerifyEncodedEntry(encoded, updated_kv_prot_info);
}
return Status::OK();
} else if (status == UpdateStatus::UPDATED) {
Status s;
if (kv_prot_info != nullptr) {
ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
updated_kv_prot_info.UpdateV(delta, str_value);
s = Add(seq, kTypeValue, key, Slice(str_value),
&updated_kv_prot_info);
} else {
s = Add(seq, kTypeValue, key, Slice(str_value),
nullptr /* kv_prot_info */);
} }
RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
UpdateFlushState();
return s;
} else if (status == UpdateStatus::UPDATE_FAILED) {
// `UPDATE_FAILED` is named incorrectly. It indicates no update
// happened. It does not indicate a failure happened.
UpdateFlushState();
return Status::OK();
} }
RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
UpdateFlushState();
if (kv_prot_info != nullptr) {
ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
// `seq` is swallowed and `existing_seq` prevails.
updated_kv_prot_info.UpdateS(seq, existing_seq);
updated_kv_prot_info.UpdateV(delta,
Slice(prev_buffer, new_prev_size));
Slice encoded(entry, prev_buffer + new_prev_size - entry);
return VerifyEncodedEntry(encoded, updated_kv_prot_info);
}
return Status::OK();
} else if (status == UpdateStatus::UPDATED) {
Status s;
if (kv_prot_info != nullptr) {
ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
updated_kv_prot_info.UpdateV(delta, str_value);
s = Add(seq, kTypeValue, key, Slice(str_value),
&updated_kv_prot_info);
} else {
s = Add(seq, kTypeValue, key, Slice(str_value),
nullptr /* kv_prot_info */);
}
RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
UpdateFlushState();
return s;
} else if (status == UpdateStatus::UPDATE_FAILED) {
// `UPDATE_FAILED` is named incorrectly. It indicates no update
// happened. It does not indicate a failure happened.
UpdateFlushState();
return Status::OK();
} }
default:
break;
} }
} }
} }

@ -274,7 +274,7 @@ class MemTable {
void MultiGet(const ReadOptions& read_options, MultiGetRange* range, void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback); ReadCallback* callback);
// If `key` exists in current memtable with type `kTypeValue` and the existing // If `key` exists in current memtable with type value_type and the existing
// value is at least as large as the new value, updates it in-place. Otherwise // value is at least as large as the new value, updates it in-place. Otherwise
// adds the new value to the memtable out-of-place. // adds the new value to the memtable out-of-place.
// //
@ -284,8 +284,8 @@ class MemTable {
// //
// REQUIRES: external synchronization to prevent simultaneous // REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable. // operations on the same MemTable.
Status Update(SequenceNumber seq, const Slice& key, const Slice& value, Status Update(SequenceNumber seq, ValueType value_type, const Slice& key,
const ProtectionInfoKVOS64* kv_prot_info); const Slice& value, const ProtectionInfoKVOS64* kv_prot_info);
// If `key` exists in current memtable with type `kTypeValue` and the existing // If `key` exists in current memtable with type `kTypeValue` and the existing
// value is at least as large as the new value, updates it in-place. Otherwise // value is at least as large as the new value, updates it in-place. Otherwise

@ -1987,11 +1987,13 @@ class MemTableInserter : public WriteBatch::Handler {
mem->Add(sequence_, value_type, key, value, kv_prot_info, mem->Add(sequence_, value_type, key, value, kv_prot_info,
concurrent_memtable_writes_, get_post_process_info(mem), concurrent_memtable_writes_, get_post_process_info(mem),
hint_per_batch_ ? &GetHintMap()[mem] : nullptr); hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
} else if (moptions->inplace_callback == nullptr) { } else if (moptions->inplace_callback == nullptr ||
value_type != kTypeValue) {
assert(!concurrent_memtable_writes_); assert(!concurrent_memtable_writes_);
ret_status = mem->Update(sequence_, key, value, kv_prot_info); ret_status = mem->Update(sequence_, value_type, key, value, kv_prot_info);
} else { } else {
assert(!concurrent_memtable_writes_); assert(!concurrent_memtable_writes_);
assert(value_type == kTypeValue);
ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info); ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info);
if (ret_status.IsNotFound()) { if (ret_status.IsNotFound()) {
// key not found in memtable. Do sst get, update, add // key not found in memtable. Do sst get, update, add

Loading…
Cancel
Save