diff --git a/db/db_test.cc b/db/db_test.cc
index d35a9c7da..a99e056e3 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -438,11 +438,11 @@ class DBTest {
     return DB::Open(opts, dbname_, &db_);
   }

-  Status Put(const Slice& k, const Slice& v) {
+  Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
     if (kMergePut == option_config_ ) {
-      return db_->Merge(WriteOptions(), k, v);
+      return db_->Merge(wo, k, v);
     } else {
-      return db_->Put(WriteOptions(), k, v);
+      return db_->Put(wo, k, v);
     }
   }

@@ -2306,6 +2306,71 @@ TEST(DBTest, RepeatedWritesToSameKey) {
   } while (ChangeCompactOptions());
 }

+TEST(DBTest, InPlaceUpdate) {
+  do {
+    Options options = CurrentOptions();
+    options.create_if_missing = true;
+    options.inplace_update_support = true;
+    options.env = env_;
+    options.write_buffer_size = 100000;
+
+    // Update key with values of smaller size
+    Reopen(&options);
+    int numValues = 10;
+    for (int i = numValues; i > 0; i--) {
+      std::string value = DummyString(i, 'a');
+      ASSERT_OK(Put("key", value));
+      ASSERT_EQ(value, Get("key"));
+    }
+
+    int count = 0;
+    Iterator* iter = dbfull()->TEST_NewInternalIterator();
+    iter->SeekToFirst();
+    ASSERT_EQ(iter->status().ok(), true);
+    while (iter->Valid()) {
+      ParsedInternalKey ikey;
+      ikey.sequence = -1;
+      ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
+      count++;
+      // All updates with the same sequence number.
+      ASSERT_EQ(ikey.sequence, (unsigned)1);
+      iter->Next();
+    }
+    // Only 1 instance for that key.
+    ASSERT_EQ(count, 1);
+    delete iter;
+
+    // Update key with values of larger size
+    DestroyAndReopen(&options);
+    numValues = 10;
+    for (int i = 0; i < numValues; i++) {
+      std::string value = DummyString(i, 'a');
+      ASSERT_OK(Put("key", value));
+      ASSERT_EQ(value, Get("key"));
+    }
+
+    count = 0;
+    iter = dbfull()->TEST_NewInternalIterator();
+    iter->SeekToFirst();
+    ASSERT_EQ(iter->status().ok(), true);
+    int seq = numValues;
+    while (iter->Valid()) {
+      ParsedInternalKey ikey;
+      ikey.sequence = -1;
+      ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
+      count++;
+      // No inplace updates. All updates are puts with new seq number
+      ASSERT_EQ(ikey.sequence, (unsigned)seq--);
+      iter->Next();
+    }
+    // All 10 updates exist in the internal iterator
+    ASSERT_EQ(count, numValues);
+    delete iter;
+
+  } while (ChangeCompactOptions());
+}
+
 // This is a static filter used for filtering
 // kvs during the compaction process.
 static int cfilter_count;
diff --git a/db/memtable.cc b/db/memtable.cc
index 6cb09b4de..ab9ef46e3 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -17,8 +17,18 @@
 #include "rocksdb/iterator.h"
 #include "rocksdb/merge_operator.h"
 #include "util/coding.h"
+#include "util/mutexlock.h"
 #include "util/murmurhash.h"

+namespace std {
+template <>
+struct hash<rocksdb::Slice> {
+  size_t operator()(const rocksdb::Slice& slice) const {
+    return MurmurHash(slice.data(), slice.size(), 0);
+  }
+};
+}
+
 namespace rocksdb {

 MemTable::MemTable(const InternalKeyComparator& cmp,
@@ -35,7 +45,10 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
       edit_(numlevel),
       first_seqno_(0),
       mem_next_logfile_number_(0),
-      mem_logfile_number_(0) { }
+      mem_logfile_number_(0),
+      locks_(options.inplace_update_support
+                 ? options.inplace_update_num_locks
+                 : 0) { }

 MemTable::~MemTable() {
   assert(refs_ == 0);
@@ -110,6 +123,10 @@ Iterator* MemTable::NewIterator(const Slice* prefix) {
   }
 }

+port::RWMutex* MemTable::GetLock(const Slice& key) {
+  return &locks_[std::hash<Slice>()(key) % locks_.size()];
+}
+
 void MemTable::Add(SequenceNumber s, ValueType type,
                    const Slice& key,
                    const Slice& value) {
@@ -169,14 +186,16 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
     // all entries with overly large sequence numbers.
     const char* entry = iter->key();
     uint32_t key_length;
-    const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
+    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
     if (comparator_.comparator.user_comparator()->Compare(
-        Slice(key_ptr, key_length - 8),
-        key.user_key()) == 0) {
+            Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
       // Correct user key
       const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
       switch (static_cast<ValueType>(tag & 0xff)) {
         case kTypeValue: {
+          if (options.inplace_update_support) {
+            GetLock(key.user_key())->ReadLock();
+          }
           Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
           *s = Status::OK();
           if (merge_in_progress) {
@@ -189,6 +208,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
           } else {
             value->assign(v.data(), v.size());
           }
+          if (options.inplace_update_support) {
+            GetLock(key.user_key())->Unlock();
+          }
           return true;
         }
         case kTypeDeletion: {
@@ -243,4 +265,65 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
   return false;
 }

+bool MemTable::Update(SequenceNumber seq, ValueType type,
+                      const Slice& key,
+                      const Slice& value) {
+  LookupKey lkey(key, seq);
+  Slice memkey = lkey.memtable_key();
+
+  std::shared_ptr<MemTableRep::Iterator> iter(
+      table_.get()->GetIterator(lkey.user_key()));
+  iter->Seek(memkey.data());
+
+  if (iter->Valid()) {
+    // entry format is:
+    //    klength  varint32
+    //    userkey  char[klength-8]
+    //    tag      uint64
+    //    vlength  varint32
+    //    value    char[vlength]
+    // Check that it belongs to same user key.  We do not check the
+    // sequence number since the Seek() call above should have skipped
+    // all entries with overly large sequence numbers.
+    const char* entry = iter->key();
+    uint32_t key_length;
+    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
+    if (comparator_.comparator.user_comparator()->Compare(
+            Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) {
+      // Correct user key
+      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
+      switch (static_cast<ValueType>(tag & 0xff)) {
+        case kTypeValue: {
+          uint32_t vlength;
+          GetVarint32Ptr(key_ptr + key_length,
+                         key_ptr + key_length + 5, &vlength);
+          // Update value, if newValue size <= curValue size
+          if (value.size() <= vlength) {
+            char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
+                                     value.size());
+            WriteLock wl(GetLock(lkey.user_key()));
+            memcpy(p, value.data(), value.size());
+            assert(
+              (p + value.size()) - entry ==
+              (unsigned) (VarintLength(key_length) +
+                          key_length +
+                          VarintLength(value.size()) +
+                          value.size())
+            );
+            return true;
+          }
+        }
+        default:
+          // If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData
+          // then we probably don't have enough space to update in-place
+          // Maybe do something later
+          // Return false, and do normal Add()
+          return false;
+      }
+    }
+  }
+
+  // Key doesn't exist
+  return false;
+}
 }  // namespace rocksdb
diff --git a/db/memtable.h b/db/memtable.h
index e92bf46ce..1a10f8088 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -88,6 +88,16 @@ class MemTable {
   bool Get(const LookupKey& key, std::string* value, Status* s,
            std::deque<std::string>* operands, const Options& options);

+  // Update the value and return status ok,
+  //   if key exists in current memtable
+  //     if new sizeof(new_value) <= sizeof(old_value) &&
+  //        old_value for that key is a put i.e. kTypeValue
+  //     else return false, and status - NotUpdatable()
+  //   else return false, and status - NotFound()
+  bool Update(SequenceNumber seq, ValueType type,
+              const Slice& key,
+              const Slice& value);
+
   // Returns the edits area that is needed for flushing the memtable
   VersionEdit* GetEdits() { return &edit_; }

@@ -144,9 +154,15 @@ class MemTable {
   // memtable flush is done)
   uint64_t mem_logfile_number_;

+  // rw locks for inplace updates
+  std::vector<port::RWMutex> locks_;
+
   // No copying allowed
   MemTable(const MemTable&);
   void operator=(const MemTable&);
+
+  // Get the lock associated for the key
+  port::RWMutex* GetLock(const Slice& key);
 };

 }  // namespace rocksdb
diff --git a/db/repair.cc b/db/repair.cc
index 252f24e45..1a6eae06c 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -208,7 +208,7 @@ class Repairer {
         continue;
       }
       WriteBatchInternal::SetContents(&batch, record);
-      status = WriteBatchInternal::InsertInto(&batch, mem);
+      status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
       if (status.ok()) {
         counter += WriteBatchInternal::Count(&batch);
       } else {
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 66324d676..57ced9480 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -188,7 +188,12 @@ class MemTableInserter : public WriteBatch::Handler {
   }

   virtual void Put(const Slice& key, const Slice& value) {
-    mem_->Add(sequence_, kTypeValue, key, value);
+    if (options_->inplace_update_support
+        && mem_->Update(sequence_, kTypeValue, key, value)) {
+      RecordTick(options_->statistics, NUMBER_KEYS_UPDATED);
+    } else {
+      mem_->Add(sequence_, kTypeValue, key, value);
+    }
     sequence_++;
   }
   virtual void Merge(const Slice& key, const Slice& value) {
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index 50bbda8a7..b8991732f 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -48,7 +48,7 @@ class WriteBatchInternal {
   // Drops deletes in batch if filter_del is set to true and
   // db->KeyMayExist returns false
   static Status InsertInto(const WriteBatch* batch, MemTable* memtable,
-                           const Options* opts = nullptr, DB* db = nullptr,
+                           const Options* opts, DB* db = nullptr,
                            const bool filter_del = false);

   static void Append(WriteBatch* dst, const WriteBatch* src);
diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc
index b532a75db..c97201973 100644
--- a/db/write_batch_test.cc
+++ b/db/write_batch_test.cc
@@ -25,7 +25,8 @@ static std::string PrintContents(WriteBatch* b) {
   MemTable* mem = new MemTable(cmp, factory);
   mem->Ref();
   std::string state;
-  Status s = WriteBatchInternal::InsertInto(b, mem);
+  Options options;
+  Status s = WriteBatchInternal::InsertInto(b, mem, &options);
   int count = 0;
   Iterator* iter = mem->NewIterator();
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 7d62ccd6c..06e6b9bb3 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -81,8 +81,8 @@ class DB {
   DB() { }
   virtual ~DB();

-  // Set the database entry for "key" to "value". Returns OK on success,
-  // and a non-OK status on error.
+  // Set the database entry for "key" to "value".
+  // Returns OK on success, and a non-OK status on error.
   // Note: consider setting options.sync = true.
   virtual Status Put(const WriteOptions& options,
                      const Slice& key,
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 79abacb78..9f7c87f31 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -594,6 +594,17 @@ struct Options {
   // Default: emtpy vector -- no user-defined statistics collection will be
   // performed.
   std::vector<std::shared_ptr<TableStatsCollector>> table_stats_collectors;
+
+  // Allows thread-safe inplace updates. Requires Updates iff
+  // * key exists in current memtable
+  // * new sizeof(new_value) <= sizeof(old_value)
+  // * old_value for that key is a put i.e. kTypeValue
+  // Default: false.
+  bool inplace_update_support;
+
+  // Number of locks used for inplace update
+  // Default: 10000, if inplace_update_support = true, else 0.
+  size_t inplace_update_num_locks;
 };

 //
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index e11127ce7..e9302b973 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -39,6 +39,8 @@ enum Tickers {
   NUMBER_KEYS_WRITTEN,
   // Number of Keys read,
   NUMBER_KEYS_READ,
+  // Number keys updated, if inplace update is enabled
+  NUMBER_KEYS_UPDATED,
   // Bytes written / read
   BYTES_WRITTEN,
   BYTES_READ,
@@ -94,6 +96,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
   { COMPACTION_KEY_DROP_USER, "rocksdb.compaction.key.drop.user" },
   { NUMBER_KEYS_WRITTEN, "rocksdb.number.keys.written" },
   { NUMBER_KEYS_READ, "rocksdb.number.keys.read" },
+  { NUMBER_KEYS_UPDATED, "rocksdb.number.keys.updated" },
   { BYTES_WRITTEN, "rocksdb.bytes.written" },
   { BYTES_READ, "rocksdb.bytes.read" },
   { NO_FILE_CLOSES, "rocksdb.no.file.closes" },
@@ -144,7 +147,7 @@ enum Histograms {
   HARD_RATE_LIMIT_DELAY_COUNT,
   SOFT_RATE_LIMIT_DELAY_COUNT,
   NUM_FILES_IN_SINGLE_COMPACTION,
-  HISTOGRAM_ENUM_MAX
+  HISTOGRAM_ENUM_MAX,
 };

 const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
@@ -165,7 +168,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
   { STALL_L0_NUM_FILES_COUNT, "rocksdb.num.files.stall.count"},
   { HARD_RATE_LIMIT_DELAY_COUNT, "rocksdb.hard.rate.limit.delay.count"},
   { SOFT_RATE_LIMIT_DELAY_COUNT, "rocksdb.soft.rate.limit.delay.count"},
-  { NUM_FILES_IN_SINGLE_COMPACTION, "rocksdb.numfiles.in.singlecompaction" }
+  { NUM_FILES_IN_SINGLE_COMPACTION, "rocksdb.numfiles.in.singlecompaction" },
 };

 struct HistogramData {
diff --git a/table/table_test.cc b/table/table_test.cc
index 5ed8c7070..b8e25877a 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -821,12 +821,13 @@ TEST(MemTableTest, Simple) {
   MemTable* memtable = new MemTable(cmp, table_factory);
   memtable->Ref();
   WriteBatch batch;
+  Options options;
   WriteBatchInternal::SetSequence(&batch, 100);
   batch.Put(std::string("k1"), std::string("v1"));
   batch.Put(std::string("k2"), std::string("v2"));
   batch.Put(std::string("k3"), std::string("v3"));
   batch.Put(std::string("largekey"), std::string("vlarge"));
-  ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable).ok());
+  ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable, &options).ok());

   Iterator* iter = memtable->NewIterator();
   iter->SeekToFirst();
diff --git a/util/options.cc b/util/options.cc
index 795b08d77..df1a82e48 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -94,7 +94,9 @@ Options::Options()
       compaction_filter_factory(
           std::shared_ptr<CompactionFilterFactory>(
               new DefaultCompactionFilterFactory())),
-      purge_log_after_memtable_flush(true) {
+      purge_log_after_memtable_flush(true),
+      inplace_update_support(false),
+      inplace_update_num_locks(10000) {
   assert(memtable_factory.get() != nullptr);
 }

@@ -253,11 +255,11 @@ Options::Dump(Logger* log) const
         filter_deletes);
     Log(log," Options.compaction_style: %d",
        compaction_style);
-    Log(log," Options.compaction_options_universal.size_ratio: %u",
+    Log(log," Options.compaction_options_universal.size_ratio: %u",
        compaction_options_universal.size_ratio);
-    Log(log," Options.compaction_options_universal.min_merge_width: %u",
+    Log(log,"Options.compaction_options_universal.min_merge_width: %u",
        compaction_options_universal.min_merge_width);
-    Log(log," Options.compaction_options_universal.max_merge_width: %u",
+    Log(log,"Options.compaction_options_universal.max_merge_width: %u",
        compaction_options_universal.max_merge_width);
     Log(log,"Options.compaction_options_universal."
"max_size_amplification_percent: %u", @@ -269,8 +271,12 @@ Options::Dump(Logger* log) const collector_names.append(collector->Name()); collector_names.append("; "); } - Log(log," Options.table_stats_collectors: %s", + Log(log," Options.table_stats_collectors: %s", collector_names.c_str()); + Log(log," Options.inplace_update_support: %d", + inplace_update_support); + Log(log," Options.inplace_update_num_locks: %zd", + inplace_update_num_locks); } // Options::Dump //