Add read/modify/write functionality to Put() api

Summary: The application can set a callback function, which is applied on the previous value. And calculates the new value. This new value can be set, either inplace, if the previous value existed in memtable, and new value is smaller than previous value. Otherwise the new value is added normally.

Test Plan: fbmake. Added unit tests. All unit tests pass.

Reviewers: dhruba, haobo

Reviewed By: haobo

CC: sdong, kailiu, xinyaohu, sumeet, leveldb

Differential Revision: https://reviews.facebook.net/D14745
main
Naman Gupta 11 years ago
parent aa0ef6602d
commit 8454cfe569
  1. 150
      db/db_test.cc
  2. 113
      db/memtable.cc
  3. 31
      db/memtable.h
  4. 38
      db/write_batch.cc
  5. 46
      include/rocksdb/options.h
  6. 1
      util/options.cc

@ -555,7 +555,7 @@ class DBTest {
case kTypeDeletion:
result += "DEL";
break;
case kTypeLogData:
default:
assert(false);
break;
}
@ -705,6 +705,44 @@ class DBTest {
ASSERT_EQ(IterStatus(iter), expected_key);
delete iter;
}
// Used to test InplaceUpdate
// If previous value is nullptr or delta is > than previous value,
// sets newValue with delta
// If previous value is not empty,
// updates previous value with 'b' string of previous value size
static bool updateInPlace(char* prevValue, size_t prevSize,
Slice delta, std::string* newValue) {
if (prevValue == nullptr || delta.size() > prevSize) {
*newValue = std::string(delta.size(), 'c');
return false;
} else {
std::string str_b = std::string(prevSize, 'b');
memcpy(prevValue, str_b.c_str(), str_b.size());
return true;
}
}
// Used to test InplaceUpdate
void validateNumberOfEntries(int numValues) {
Iterator* iter = dbfull()->TEST_NewInternalIterator();
iter->SeekToFirst();
ASSERT_EQ(iter->status().ok(), true);
int seq = numValues;
while (iter->Valid()) {
ParsedInternalKey ikey;
ikey.sequence = -1;
ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
// checks sequence number for updates
ASSERT_EQ(ikey.sequence, (unsigned)seq--);
iter->Next();
}
delete iter;
ASSERT_EQ(0, seq);
}
};
std::unique_ptr<const SliceTransform> DBTest::prefix_1_transform(
NewFixedPrefixTransform(1));
@ -2391,9 +2429,9 @@ TEST(DBTest, InPlaceUpdate) {
options.inplace_update_support = true;
options.env = env_;
options.write_buffer_size = 100000;
Reopen(&options);
// Update key with values of smaller size
Reopen(&options);
int numValues = 10;
for (int i = numValues; i > 0; i--) {
std::string value = DummyString(i, 'a');
@ -2401,50 +2439,92 @@ TEST(DBTest, InPlaceUpdate) {
ASSERT_EQ(value, Get("key"));
}
int count = 0;
Iterator* iter = dbfull()->TEST_NewInternalIterator();
iter->SeekToFirst();
ASSERT_EQ(iter->status().ok(), true);
while (iter->Valid()) {
ParsedInternalKey ikey(Slice(), 0, kTypeValue);
ikey.sequence = -1;
ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
count++;
// All updates with the same sequence number.
ASSERT_EQ(ikey.sequence, (unsigned)1);
iter->Next();
}
// Only 1 instance for that key.
ASSERT_EQ(count, 1);
delete iter;
validateNumberOfEntries(1);
} while (ChangeCompactOptions());
}
TEST(DBTest, InPlaceUpdateLargeNewValue) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.inplace_update_support = true;
options.env = env_;
options.write_buffer_size = 100000;
Reopen(&options);
// Update key with values of larger size
DestroyAndReopen(&options);
numValues = 10;
int numValues = 10;
for (int i = 0; i < numValues; i++) {
std::string value = DummyString(i, 'a');
ASSERT_OK(Put("key", value));
ASSERT_EQ(value, Get("key"));
}
count = 0;
iter = dbfull()->TEST_NewInternalIterator();
iter->SeekToFirst();
ASSERT_EQ(iter->status().ok(), true);
int seq = numValues;
while (iter->Valid()) {
ParsedInternalKey ikey(Slice(), 0, kTypeValue);
ikey.sequence = -1;
ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
count++;
// No inplace updates. All updates are puts with new seq number
ASSERT_EQ(ikey.sequence, (unsigned)seq--);
iter->Next();
}
// All 10 updates exist in the internal iterator
ASSERT_EQ(count, numValues);
delete iter;
validateNumberOfEntries(numValues);
} while (ChangeCompactOptions());
}
TEST(DBTest, InPlaceUpdateCallback) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.inplace_update_support = true;
options.env = env_;
options.write_buffer_size = 100000;
options.inplace_callback =
rocksdb::DBTest::updateInPlace;
Reopen(&options);
// Update key with values of smaller size
int numValues = 10;
ASSERT_OK(Put("key", DummyString(numValues, 'a')));
ASSERT_EQ(DummyString(numValues, 'c'), Get("key"));
for (int i = numValues; i > 0; i--) {
ASSERT_OK(Put("key", DummyString(i, 'a')));
ASSERT_EQ(DummyString(numValues, 'b'), Get("key"));
}
// Only 1 instance for that key.
validateNumberOfEntries(1);
} while (ChangeCompactOptions());
}
TEST(DBTest, InPlaceUpdateCallbackNotFound) {
do {
//Test sst get/update/put
} while (ChangeCompactOptions());
}
TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.inplace_update_support = true;
options.env = env_;
options.write_buffer_size = 100000;
options.inplace_callback =
rocksdb::DBTest::updateInPlace;
Reopen(&options);
// Update key with values of larger size
int numValues = 10;
for (int i = 1; i <= numValues; i++) {
ASSERT_OK(Put("key", DummyString(i, 'a')));
ASSERT_EQ(DummyString(i, 'c'), Get("key"));
}
// No inplace updates. All updates are puts with new seq number
// All 10 updates exist in the internal iterator
validateNumberOfEntries(numValues);
} while (ChangeCompactOptions());
}

@ -302,7 +302,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
}
break;
}
case kTypeLogData:
default:
assert(false);
break;
}
@ -322,7 +322,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
return found_final_value;
}
bool MemTable::Update(SequenceNumber seq, ValueType type,
void MemTable::Update(SequenceNumber seq,
const Slice& key,
const Slice& value) {
LookupKey lkey(key, seq);
@ -335,7 +335,7 @@ bool MemTable::Update(SequenceNumber seq, ValueType type,
if (iter->Valid()) {
// entry format is:
// klength varint32
// key_length varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
@ -352,37 +352,114 @@ bool MemTable::Update(SequenceNumber seq, ValueType type,
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
uint32_t vlength;
GetVarint32Ptr(key_ptr + key_length,
key_ptr + key_length+5, &vlength);
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
uint32_t prev_value_size = prev_value.size();
uint32_t new_value_size = value.size();
// Update value, if newValue size <= curValue size
if (value.size() <= vlength) {
if (new_value_size <= prev_value_size ) {
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
value.size());
new_value_size);
WriteLock wl(GetLock(lkey.user_key()));
memcpy(p, value.data(), value.size());
memcpy(p, value.data(), new_value_size);
assert(
(p + value.size()) - entry ==
(p + new_value_size) - entry ==
(unsigned) (VarintLength(key_length) +
key_length +
VarintLength(value.size()) +
value.size())
VarintLength(new_value_size) +
new_value_size)
);
// no need to update bloom, as user key does not change.
return true;
return;
}
}
default:
// If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData
// then we probably don't have enough space to update in-place
// Maybe do something later
// Return false, and do normal Add()
return false;
// we don't have enough space for update inplace
Add(seq, kTypeValue, key, value);
return;
}
}
}
// Key doesn't exist
// key doesn't exist
Add(seq, kTypeValue, key, value);
}
bool MemTable::UpdateCallback(SequenceNumber seq,
const Slice& key,
const Slice& delta,
const Options& options) {
LookupKey lkey(key, seq);
Slice memkey = lkey.memtable_key();
std::shared_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(lkey.user_key()));
iter->Seek(key, memkey.data());
if (iter->Valid()) {
// entry format is:
// key_length varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter->key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
uint32_t prev_value_size = prev_value.size();
WriteLock wl(GetLock(lkey.user_key()));
std::string str_value;
if (options.inplace_callback(const_cast<char*>(prev_value.data()),
prev_value_size, delta, &str_value)) {
// Value already updated by callback.
// TODO: Change size of value in memtable slice.
// This works for leaf, since size is already encoded in the
// value. It doesn't depend on rocksdb buffer size.
return true;
}
Slice slice_value = Slice(str_value);
uint32_t new_value_size = slice_value.size();
// Update value, if newValue size <= curValue size
if (new_value_size <= prev_value_size ) {
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
new_value_size);
memcpy(p, slice_value.data(), new_value_size);
assert(
(p + new_value_size) - entry ==
(unsigned) (VarintLength(key_length) +
key_length +
VarintLength(new_value_size) +
new_value_size)
);
return true;
} else {
// If we don't have enough space to update in-place
// Return as NotUpdatable, and do normal Add()
Add(seq, kTypeValue, key, slice_value);
return true;
}
}
default:
break;
}
}
}
// If the latest value is not kTypeValue
// or key doesn't exist
return false;
}
} // namespace rocksdb

@ -98,16 +98,31 @@ class MemTable {
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options);
// Update the value and return status ok,
// if key exists in current memtable
// if new sizeof(new_value) <= sizeof(old_value) &&
// old_value for that key is a put i.e. kTypeValue
// else return false, and status - NotUpdatable()
// else return false, and status - NotFound()
bool Update(SequenceNumber seq, ValueType type,
// Attempts to update the new_value inplace, else does normal Add
// Pseudocode
// if key exists in current memtable && prev_value is of type kTypeValue
// if new sizeof(new_value) <= sizeof(prev_value)
// update inplace
// else add(key, new_value)
// else add(key, new_value)
void Update(SequenceNumber seq,
const Slice& key,
const Slice& value);
// If prev_value for key exits, attempts to update it inplace.
// else returns false
// Pseudocode
// if key exists in current memtable && prev_value is of type kTypeValue
// new_value = delta(prev_value)
// if sizeof(new_value) <= sizeof(prev_value)
// update inplace
// else add(key, new_value)
// else return false
bool UpdateCallback(SequenceNumber seq,
const Slice& key,
const Slice& delta,
const Options& options);
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }
@ -149,7 +164,7 @@ class MemTable {
bool flush_completed_; // finished the flush
uint64_t file_number_; // filled up after flush is complete
// The udpates to be applied to the transaction log when this
// The updates to be applied to the transaction log when this
// memtable is flushed to storage.
VersionEdit edit_;

@ -115,7 +115,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
@ -194,14 +194,44 @@ class MemTableInserter : public WriteBatch::Handler {
}
virtual void Put(const Slice& key, const Slice& value) {
if (options_->inplace_update_support
&& mem_->Update(sequence_, kTypeValue, key, value)) {
if (!options_->inplace_update_support) {
mem_->Add(sequence_, kTypeValue, key, value);
} else if (options_->inplace_callback == nullptr) {
mem_->Update(sequence_, key, value);
RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED);
} else {
mem_->Add(sequence_, kTypeValue, key, value);
if (mem_->UpdateCallback(sequence_, key, value, *options_)) {
RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED);
} else {
// key not found in memtable. Do sst get/update/add
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
ReadOptions ropts;
ropts.snapshot = &read_from_snapshot;
std::string prev_value;
std::string merged_value;
Status s = db_->Get(ropts, key, &prev_value);
char* prev_buffer = const_cast<char*>(prev_value.c_str());
size_t prev_size = prev_value.size();
if (options_->inplace_callback(s.ok() ? prev_buffer: nullptr,
s.ok() ? prev_size: 0,
value, &merged_value)) {
// prev_value is updated in-place with final value.
mem_->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN);
} else {
// merged_value contains the final value. Only add if not empty.
if (!merged_value.empty()) {
mem_->Add(sequence_, kTypeValue, key, Slice(merged_value));
RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN);
}
}
}
}
sequence_++;
}
virtual void Merge(const Slice& key, const Slice& value) {
mem_->Add(sequence_, kTypeMerge, key, value);
sequence_++;

@ -627,10 +627,13 @@ struct Options {
TablePropertiesCollectors;
TablePropertiesCollectors table_properties_collectors;
// Allows thread-safe inplace updates. Requires Updates iff
// * key exists in current memtable
// * new sizeof(new_value) <= sizeof(old_value)
// * old_value for that key is a put i.e. kTypeValue
// Allows thread-safe inplace updates.
// If inplace_callback function is not set,
// Put(key, new_value) will update inplace the existing_value iff
// * key exists in current memtable
// * new sizeof(new_value) <= sizeof(existing_value)
// * existing_value for that key is a put i.e. kTypeValue
// If inplace_callback function is set, check doc for inplace_callback.
// Default: false.
bool inplace_update_support;
@ -638,13 +641,46 @@ struct Options {
// Default: 10000, if inplace_update_support = true, else 0.
size_t inplace_update_num_locks;
// * existing_value - pointer to previous value (from both memtable and sst).
// nullptr if key doesn't exist
// * existing_value_size - sizeof(existing_value). 0 if key doesn't exist
// * delta_value - Delta value to be merged with the 'existing_value'.
// Stored in transaction logs.
// * merged_value - Set when delta is applied on the previous value.
// Applicable only when inplace_update_support is true,
// this callback function is called at the time of updating the memtable
// as part of a Put operation, lets say Put(key, delta_value). It allows the
// 'delta_value' specified as part of the Put operation to be merged with
// an 'existing_value' of the 'key' in the database.
// If the merged value is smaller in size that the 'existing_value',
// then this function can update the 'existing_value' buffer inplace if it
// wishes to. The callback should return true in this case. (In this case,
// the snapshot-semantics of the rocksdb Iterator is not atomic anymore).
// If the application does not wish to modify the 'existing_value' buffer
// inplace, then it should allocate a new buffer and update it by merging the
// 'existing_value' and the Put 'delta_value' and set the 'merged_value'
// pointer to this buffer. The callback should return false in this case. It
// is upto the calling layer to manage the memory returned in 'merged_value'.
// Please remember that the original call from the application is Put(key,
// delta_value). So the transaction log (if enabled) will still contain
// (key, delta_value). The 'merged_value' is not stored in the transaction log
// Hence the inplace_callback function should be consistent across db reopens.
// Default: nullptr
bool (*inplace_callback)(char* existing_value, size_t existing_value_size,
Slice delta_value, std::string* merged_value);
// if prefix_extractor is set and bloom_bits is not 0, create prefix bloom
// for memtable
uint32_t memtable_prefix_bloom_bits;
// number of hash probes per key
uint32_t memtable_prefix_bloom_probes;
};
//

@ -102,6 +102,7 @@ Options::Options()
std::shared_ptr<TableFactory>(new BlockBasedTableFactory())),
inplace_update_support(false),
inplace_update_num_locks(10000),
inplace_callback(nullptr),
memtable_prefix_bloom_bits(0),
memtable_prefix_bloom_probes(6) {
assert(memtable_factory.get() != nullptr);

Loading…
Cancel
Save