In-place updates for equal keys and similar sized values

Summary:
Currently for each put, a fresh memory is allocated, and a new entry is added to the memtable with a new sequence number irrespective of whether the key already exists in the memtable. This diff is an attempt to update the value inplace for existing keys. It currently handles a very simple case:
1. Key already exists in the current memtable. Does not inplace update values in immutable memtable or snapshot
2. Latest value type is a 'put' ie kTypeValue
3. New value size is less than existing value, to avoid reallocating memory

TODO: For a put of an existing key, deallocate memory take by values, for other value types till a kTypeValue is found, ie. remove kTypeMerge.
TODO: Update the transaction log, to allow consistent reload of the memtable.

Test Plan: Added a unit test verifying the inplace update. But some other unit tests broken due to invalid sequence number checks. WIll fix them next.

Reviewers: xinyaohu, sumeet, haobo, dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D12423

Automatic commit by arc
main
Naman Gupta 12 years ago
parent 17991cd5a0
commit fe25070242
  1. 71
      db/db_test.cc
  2. 91
      db/memtable.cc
  3. 16
      db/memtable.h
  4. 2
      db/repair.cc
  5. 7
      db/write_batch.cc
  6. 2
      db/write_batch_internal.h
  7. 3
      db/write_batch_test.cc
  8. 4
      include/rocksdb/db.h
  9. 11
      include/rocksdb/options.h
  10. 7
      include/rocksdb/statistics.h
  11. 3
      table/table_test.cc
  12. 16
      util/options.cc

@ -438,11 +438,11 @@ class DBTest {
return DB::Open(opts, dbname_, &db_);
}
Status Put(const Slice& k, const Slice& v) {
Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
if (kMergePut == option_config_ ) {
return db_->Merge(WriteOptions(), k, v);
return db_->Merge(wo, k, v);
} else {
return db_->Put(WriteOptions(), k, v);
return db_->Put(wo, k, v);
}
}
@ -2306,6 +2306,71 @@ TEST(DBTest, RepeatedWritesToSameKey) {
} while (ChangeCompactOptions());
}
TEST(DBTest, InPlaceUpdate) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.inplace_update_support = true;
options.env = env_;
options.write_buffer_size = 100000;
// Update key with values of smaller size
Reopen(&options);
int numValues = 10;
for (int i = numValues; i > 0; i--) {
std::string value = DummyString(i, 'a');
ASSERT_OK(Put("key", value));
ASSERT_EQ(value, Get("key"));
}
int count = 0;
Iterator* iter = dbfull()->TEST_NewInternalIterator();
iter->SeekToFirst();
ASSERT_EQ(iter->status().ok(), true);
while (iter->Valid()) {
ParsedInternalKey ikey;
ikey.sequence = -1;
ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
count++;
// All updates with the same sequence number.
ASSERT_EQ(ikey.sequence, (unsigned)1);
iter->Next();
}
// Only 1 instance for that key.
ASSERT_EQ(count, 1);
delete iter;
// Update key with values of larger size
DestroyAndReopen(&options);
numValues = 10;
for (int i = 0; i < numValues; i++) {
std::string value = DummyString(i, 'a');
ASSERT_OK(Put("key", value));
ASSERT_EQ(value, Get("key"));
}
count = 0;
iter = dbfull()->TEST_NewInternalIterator();
iter->SeekToFirst();
ASSERT_EQ(iter->status().ok(), true);
int seq = numValues;
while (iter->Valid()) {
ParsedInternalKey ikey;
ikey.sequence = -1;
ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
count++;
// No inplace updates. All updates are puts with new seq number
ASSERT_EQ(ikey.sequence, (unsigned)seq--);
iter->Next();
}
// All 10 updates exist in the internal iterator
ASSERT_EQ(count, numValues);
delete iter;
} while (ChangeCompactOptions());
}
// This is a static filter used for filtering
// kvs during the compaction process.
static int cfilter_count;

@ -17,8 +17,18 @@
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/murmurhash.h"
namespace std {
template <>
struct hash<rocksdb::Slice> {
size_t operator()(const rocksdb::Slice& slice) const {
return MurmurHash(slice.data(), slice.size(), 0);
}
};
}
namespace rocksdb {
MemTable::MemTable(const InternalKeyComparator& cmp,
@ -35,7 +45,10 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
edit_(numlevel),
first_seqno_(0),
mem_next_logfile_number_(0),
mem_logfile_number_(0) { }
mem_logfile_number_(0),
locks_(options.inplace_update_support
? options.inplace_update_num_locks
: 0) { }
MemTable::~MemTable() {
assert(refs_ == 0);
@ -110,6 +123,10 @@ Iterator* MemTable::NewIterator(const Slice* prefix) {
}
}
port::RWMutex* MemTable::GetLock(const Slice& key) {
return &locks_[std::hash<Slice>()(key) % locks_.size()];
}
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key,
const Slice& value) {
@ -169,14 +186,16 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
// all entries with overly large sequence numbers.
const char* entry = iter->key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0) {
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
if (options.inplace_update_support) {
GetLock(key.user_key())->ReadLock();
}
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*s = Status::OK();
if (merge_in_progress) {
@ -189,6 +208,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
} else {
value->assign(v.data(), v.size());
}
if (options.inplace_update_support) {
GetLock(key.user_key())->Unlock();
}
return true;
}
case kTypeDeletion: {
@ -243,4 +265,65 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
return false;
}
bool MemTable::Update(SequenceNumber seq, ValueType type,
const Slice& key,
const Slice& value) {
LookupKey lkey(key, seq);
Slice memkey = lkey.memtable_key();
std::shared_ptr<MemTableRep::Iterator> iter(
table_.get()->GetIterator(lkey.user_key()));
iter->Seek(memkey.data());
if (iter->Valid()) {
// entry format is:
// klength varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter->key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
uint32_t vlength;
GetVarint32Ptr(key_ptr + key_length,
key_ptr + key_length+5, &vlength);
// Update value, if newValue size <= curValue size
if (value.size() <= vlength) {
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
value.size());
WriteLock wl(GetLock(lkey.user_key()));
memcpy(p, value.data(), value.size());
assert(
(p + value.size()) - entry ==
(unsigned) (VarintLength(key_length) +
key_length +
VarintLength(value.size()) +
value.size())
);
return true;
}
}
default:
// If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData
// then we probably don't have enough space to update in-place
// Maybe do something later
// Return false, and do normal Add()
return false;
}
}
}
// Key doesn't exist
return false;
}
} // namespace rocksdb

@ -88,6 +88,16 @@ class MemTable {
bool Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands, const Options& options);
// Update the value and return status ok,
// if key exists in current memtable
// if new sizeof(new_value) <= sizeof(old_value) &&
// old_value for that key is a put i.e. kTypeValue
// else return false, and status - NotUpdatable()
// else return false, and status - NotFound()
bool Update(SequenceNumber seq, ValueType type,
const Slice& key,
const Slice& value);
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }
@ -144,9 +154,15 @@ class MemTable {
// memtable flush is done)
uint64_t mem_logfile_number_;
// rw locks for inplace updates
std::vector<port::RWMutex> locks_;
// No copying allowed
MemTable(const MemTable&);
void operator=(const MemTable&);
// Get the lock associated for the key
port::RWMutex* GetLock(const Slice& key);
};
} // namespace rocksdb

@ -208,7 +208,7 @@ class Repairer {
continue;
}
WriteBatchInternal::SetContents(&batch, record);
status = WriteBatchInternal::InsertInto(&batch, mem);
status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {

@ -188,7 +188,12 @@ class MemTableInserter : public WriteBatch::Handler {
}
virtual void Put(const Slice& key, const Slice& value) {
mem_->Add(sequence_, kTypeValue, key, value);
if (options_->inplace_update_support
&& mem_->Update(sequence_, kTypeValue, key, value)) {
RecordTick(options_->statistics, NUMBER_KEYS_UPDATED);
} else {
mem_->Add(sequence_, kTypeValue, key, value);
}
sequence_++;
}
virtual void Merge(const Slice& key, const Slice& value) {

@ -48,7 +48,7 @@ class WriteBatchInternal {
// Drops deletes in batch if filter_del is set to true and
// db->KeyMayExist returns false
static Status InsertInto(const WriteBatch* batch, MemTable* memtable,
const Options* opts = nullptr, DB* db = nullptr,
const Options* opts, DB* db = nullptr,
const bool filter_del = false);
static void Append(WriteBatch* dst, const WriteBatch* src);

@ -25,7 +25,8 @@ static std::string PrintContents(WriteBatch* b) {
MemTable* mem = new MemTable(cmp, factory);
mem->Ref();
std::string state;
Status s = WriteBatchInternal::InsertInto(b, mem);
Options options;
Status s = WriteBatchInternal::InsertInto(b, mem, &options);
int count = 0;
Iterator* iter = mem->NewIterator();
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {

@ -81,8 +81,8 @@ class DB {
DB() { }
virtual ~DB();
// Set the database entry for "key" to "value". Returns OK on success,
// and a non-OK status on error.
// Set the database entry for "key" to "value".
// Returns OK on success, and a non-OK status on error.
// Note: consider setting options.sync = true.
virtual Status Put(const WriteOptions& options,
const Slice& key,

@ -594,6 +594,17 @@ struct Options {
// Default: emtpy vector -- no user-defined statistics collection will be
// performed.
std::vector<std::shared_ptr<TableStatsCollector>> table_stats_collectors;
// Allows thread-safe inplace updates. Requires Updates iff
// * key exists in current memtable
// * new sizeof(new_value) <= sizeof(old_value)
// * old_value for that key is a put i.e. kTypeValue
// Default: false.
bool inplace_update_support;
// Number of locks used for inplace update
// Default: 10000, if inplace_update_support = true, else 0.
size_t inplace_update_num_locks;
};
//

@ -39,6 +39,8 @@ enum Tickers {
NUMBER_KEYS_WRITTEN,
// Number of Keys read,
NUMBER_KEYS_READ,
// Number keys updated, if inplace update is enabled
NUMBER_KEYS_UPDATED,
// Bytes written / read
BYTES_WRITTEN,
BYTES_READ,
@ -94,6 +96,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{ COMPACTION_KEY_DROP_USER, "rocksdb.compaction.key.drop.user" },
{ NUMBER_KEYS_WRITTEN, "rocksdb.number.keys.written" },
{ NUMBER_KEYS_READ, "rocksdb.number.keys.read" },
{ NUMBER_KEYS_UPDATED, "rocksdb.number.keys.updated" },
{ BYTES_WRITTEN, "rocksdb.bytes.written" },
{ BYTES_READ, "rocksdb.bytes.read" },
{ NO_FILE_CLOSES, "rocksdb.no.file.closes" },
@ -144,7 +147,7 @@ enum Histograms {
HARD_RATE_LIMIT_DELAY_COUNT,
SOFT_RATE_LIMIT_DELAY_COUNT,
NUM_FILES_IN_SINGLE_COMPACTION,
HISTOGRAM_ENUM_MAX
HISTOGRAM_ENUM_MAX,
};
const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
@ -165,7 +168,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{ STALL_L0_NUM_FILES_COUNT, "rocksdb.num.files.stall.count"},
{ HARD_RATE_LIMIT_DELAY_COUNT, "rocksdb.hard.rate.limit.delay.count"},
{ SOFT_RATE_LIMIT_DELAY_COUNT, "rocksdb.soft.rate.limit.delay.count"},
{ NUM_FILES_IN_SINGLE_COMPACTION, "rocksdb.numfiles.in.singlecompaction" }
{ NUM_FILES_IN_SINGLE_COMPACTION, "rocksdb.numfiles.in.singlecompaction" },
};
struct HistogramData {

@ -821,12 +821,13 @@ TEST(MemTableTest, Simple) {
MemTable* memtable = new MemTable(cmp, table_factory);
memtable->Ref();
WriteBatch batch;
Options options;
WriteBatchInternal::SetSequence(&batch, 100);
batch.Put(std::string("k1"), std::string("v1"));
batch.Put(std::string("k2"), std::string("v2"));
batch.Put(std::string("k3"), std::string("v3"));
batch.Put(std::string("largekey"), std::string("vlarge"));
ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable).ok());
ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable, &options).ok());
Iterator* iter = memtable->NewIterator();
iter->SeekToFirst();

@ -94,7 +94,9 @@ Options::Options()
compaction_filter_factory(
std::shared_ptr<CompactionFilterFactory>(
new DefaultCompactionFilterFactory())),
purge_log_after_memtable_flush(true) {
purge_log_after_memtable_flush(true),
inplace_update_support(false),
inplace_update_num_locks(10000) {
assert(memtable_factory.get() != nullptr);
}
@ -253,11 +255,11 @@ Options::Dump(Logger* log) const
filter_deletes);
Log(log," Options.compaction_style: %d",
compaction_style);
Log(log," Options.compaction_options_universal.size_ratio: %u",
Log(log," Options.compaction_options_universal.size_ratio: %u",
compaction_options_universal.size_ratio);
Log(log," Options.compaction_options_universal.min_merge_width: %u",
Log(log,"Options.compaction_options_universal.min_merge_width: %u",
compaction_options_universal.min_merge_width);
Log(log," Options.compaction_options_universal.max_merge_width: %u",
Log(log,"Options.compaction_options_universal.max_merge_width: %u",
compaction_options_universal.max_merge_width);
Log(log,"Options.compaction_options_universal."
"max_size_amplification_percent: %u",
@ -269,8 +271,12 @@ Options::Dump(Logger* log) const
collector_names.append(collector->Name());
collector_names.append("; ");
}
Log(log," Options.table_stats_collectors: %s",
Log(log," Options.table_stats_collectors: %s",
collector_names.c_str());
Log(log," Options.inplace_update_support: %d",
inplace_update_support);
Log(log," Options.inplace_update_num_locks: %zd",
inplace_update_num_locks);
} // Options::Dump
//

Loading…
Cancel
Save