diff --git a/db/c.cc b/db/c.cc index 66e3892af..b1fe76019 100644 --- a/db/c.cc +++ b/db/c.cc @@ -3224,6 +3224,11 @@ void rocksdb_writeoptions_set_low_pri( opt->rep.low_pri = v; } +void rocksdb_writeoptions_set_memtable_insert_hint_per_batch( + rocksdb_writeoptions_t* opt, unsigned char v) { + opt->rep.memtable_insert_hint_per_batch = v; +} + rocksdb_compactoptions_t* rocksdb_compactoptions_create() { return new rocksdb_compactoptions_t; } diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index fcf34f83f..2f6d35d17 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -174,7 +174,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt, - batch_per_txn_); + batch_per_txn_, write_options.memtable_insert_hint_per_batch); PERF_TIMER_START(write_pre_and_post_process_time); } @@ -397,7 +397,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, seq_per_batch_, - w.batch_cnt, batch_per_txn_); + w.batch_cnt, batch_per_txn_, + write_options.memtable_insert_hint_per_batch); } } if (seq_used != nullptr) { @@ -564,7 +565,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, w.status = WriteBatchInternal::InsertInto( &w, w.sequence, &column_family_memtables, &flush_scheduler_, &trim_history_scheduler_, write_options.ignore_missing_column_families, - 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); + 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, + false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/, + write_options.memtable_insert_hint_per_batch); if (write_thread_.CompleteParallelMemTableWriter(&w)) { MemTableInsertStatusCheck(w.status); versions_->SetLastSequence(w.write_group->last_sequence); @@ -603,7 +606,8 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, &w, w.sequence, &column_family_memtables, &flush_scheduler_, &trim_history_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, - seq_per_batch_, sub_batch_cnt); + seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/, + write_options.memtable_insert_hint_per_batch); WriteStatusCheck(w.status); if (write_options.disableWAL) { diff --git a/db/memtable.cc b/db/memtable.cc index 33036ad98..dd6604514 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -469,7 +469,7 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey, bool MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, /* user key */ const Slice& value, bool allow_concurrent, - MemTablePostProcessInfo* post_process_info) { + MemTablePostProcessInfo* post_process_info, void** hint) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] @@ -547,7 +547,9 @@ bool MemTable::Add(SequenceNumber s, ValueType type, assert(post_process_info == nullptr); UpdateFlushState(); } else { - bool res = table->InsertKeyConcurrently(handle); + bool res = (hint == nullptr) + ? table->InsertKeyConcurrently(handle) + : table->InsertKeyWithHintConcurrently(handle, hint); if (UNLIKELY(!res)) { return res; } diff --git a/db/memtable.h b/db/memtable.h index ed837e945..f316ab8e2 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -182,7 +182,8 @@ class MemTable { // the already exists. bool Add(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value, bool allow_concurrent = false, - MemTablePostProcessInfo* post_process_info = nullptr); + MemTablePostProcessInfo* post_process_info = nullptr, + void** hint = nullptr); // Used to Get value associated with key or Get Merge Operands associated // with key. diff --git a/db/write_batch.cc b/db/write_batch.cc index 225e3e947..1b878f3b0 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -39,6 +39,7 @@ #include #include #include +#include #include #include "db/column_family.h" @@ -1225,6 +1226,22 @@ class MemTableInserter : public WriteBatch::Handler { DupDetector duplicate_detector_; bool dup_dectector_on_; + bool hint_per_batch_; + bool hint_created_; + // Hints for this batch + using HintMap = std::unordered_map; + using HintMapType = std::aligned_storage::type; + HintMapType hint_; + + HintMap& GetHintMap() { + assert(hint_per_batch_); + if (!hint_created_) { + new (&hint_) HintMap(); + hint_created_ = true; + } + return *reinterpret_cast(&hint_); + } + MemPostInfoMap& GetPostMap() { assert(concurrent_memtable_writes_); if(!post_info_created_) { @@ -1258,7 +1275,7 @@ class MemTableInserter : public WriteBatch::Handler { uint64_t recovering_log_number, DB* db, bool concurrent_memtable_writes, bool* has_valid_writes = nullptr, bool seq_per_batch = false, - bool batch_per_txn = true) + bool batch_per_txn = true, bool hint_per_batch = false) : sequence_(_sequence), cf_mems_(cf_mems), flush_scheduler_(flush_scheduler), @@ -1282,7 +1299,9 @@ class MemTableInserter : public WriteBatch::Handler { write_before_prepare_(!batch_per_txn), unprepared_batch_(false), duplicate_detector_(), - dup_dectector_on_(false) { + dup_dectector_on_(false), + hint_per_batch_(hint_per_batch), + hint_created_(false) { assert(cf_mems_); } @@ -1295,6 +1314,12 @@ class MemTableInserter : public WriteBatch::Handler { reinterpret_cast (&mem_post_info_map_)->~MemPostInfoMap(); } + if (hint_created_) { + for (auto iter : GetHintMap()) { + delete[] reinterpret_cast(iter.second); + } + reinterpret_cast(&hint_)->~HintMap(); + } delete rebuilding_trx_; } @@ -1404,7 +1429,8 @@ class MemTableInserter : public WriteBatch::Handler { if (!moptions->inplace_update_support) { bool mem_res = mem->Add(sequence_, value_type, key, value, - concurrent_memtable_writes_, get_post_process_info(mem)); + concurrent_memtable_writes_, get_post_process_info(mem), + hint_per_batch_ ? &GetHintMap()[mem] : nullptr); if (UNLIKELY(!mem_res)) { assert(seq_per_batch_); ret_status = Status::TryAgain("key+seq exists"); @@ -1487,7 +1513,8 @@ class MemTableInserter : public WriteBatch::Handler { MemTable* mem = cf_mems_->GetMemTable(); bool mem_res = mem->Add(sequence_, delete_type, key, value, - concurrent_memtable_writes_, get_post_process_info(mem)); + concurrent_memtable_writes_, get_post_process_info(mem), + hint_per_batch_ ? &GetHintMap()[mem] : nullptr); if (UNLIKELY(!mem_res)) { assert(seq_per_batch_); ret_status = Status::TryAgain("key+seq exists"); @@ -1962,7 +1989,7 @@ Status WriteBatchInternal::InsertInto( TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families, uint64_t log_number, DB* db, bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt, - bool batch_per_txn) { + bool batch_per_txn, bool hint_per_batch) { #ifdef NDEBUG (void)batch_cnt; #endif @@ -1971,7 +1998,7 @@ Status WriteBatchInternal::InsertInto( sequence, memtables, flush_scheduler, trim_history_scheduler, ignore_missing_column_families, log_number, db, concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch, - batch_per_txn); + batch_per_txn, hint_per_batch); SetSequence(writer->batch, sequence); inserter.set_log_number_ref(writer->log_ref); Status s = writer->batch->Iterate(&inserter); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 1d742fee1..3810c6722 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -188,7 +188,8 @@ class WriteBatchInternal { uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false, bool seq_per_batch = false, size_t batch_cnt = 0, - bool batch_per_txn = true); + bool batch_per_txn = true, + bool hint_per_batch = false); static Status Append(WriteBatch* dst, const WriteBatch* src, const bool WAL_only = false); diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index d7f13f8ed..525e38138 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -1263,6 +1263,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_writeoptions_set_no_slowdown( rocksdb_writeoptions_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_writeoptions_set_low_pri( rocksdb_writeoptions_t*, unsigned char); +extern ROCKSDB_LIBRARY_API void +rocksdb_writeoptions_set_memtable_insert_hint_per_batch(rocksdb_writeoptions_t*, + unsigned char); /* Compact range options */ diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 328422f57..7f18a581e 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -120,6 +120,28 @@ class MemTableRep { return true; } + // Same as ::InsertWithHint, but allow concurrnet write + // + // If hint points to nullptr, a new hint will be allocated on heap, otherwise + // the hint will be updated to reflect the last insert location. The hint is + // owned by the caller and it is the caller's responsibility to delete the + // hint later. + // + // Currently only skip-list based memtable implement the interface. Other + // implementations will fallback to InsertConcurrently() by default. + virtual void InsertWithHintConcurrently(KeyHandle handle, void** /*hint*/) { + // Ignore the hint by default. + InsertConcurrently(handle); + } + + // Same as ::InsertWithHintConcurrently + // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and + // the already exists. + virtual bool InsertKeyWithHintConcurrently(KeyHandle handle, void** hint) { + InsertWithHintConcurrently(handle, hint); + return true; + } + // Like Insert(handle), but may be called concurrent with other calls // to InsertConcurrently for other handles. // diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a7e8af16e..8c08b50d1 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1356,6 +1356,15 @@ struct WriteOptions { // Default: false bool low_pri; + // If true, this writebatch will maintain the last insert positions of each + // memtable as hints in concurrent write. It can improve write performance + // in concurrent writes if keys in one writebatch are sequential. In + // non-concurrent writes (when concurrent_memtable_writes is false) this + // option will be ignored. + // + // Default: false + bool memtable_insert_hint_per_batch; + // Timestamp of write operation, e.g. Put. All timestamps of the same // database must share the same length and format. The user is also // responsible for providing a customized compare function via Comparator to @@ -1373,6 +1382,7 @@ struct WriteOptions { ignore_missing_column_families(false), no_slowdown(false), low_pri(false), + memtable_insert_hint_per_batch(false), timestamp(nullptr) {} }; diff --git a/memtable/inlineskiplist.h b/memtable/inlineskiplist.h index faebad63e..91ab3d754 100644 --- a/memtable/inlineskiplist.h +++ b/memtable/inlineskiplist.h @@ -86,6 +86,9 @@ class InlineSkipList { // Allocate a splice using allocator. Splice* AllocateSplice(); + // Allocate a splice on heap. + Splice* AllocateSpliceOnHeap(); + // Inserts a key allocated by AllocateKey, after the actual key value // has been filled in. // @@ -105,6 +108,12 @@ class InlineSkipList { // REQUIRES: no concurrent calls to any of inserts. bool InsertWithHint(const char* key, void** hint); + // Like InsertConcurrently, but with a hint + // + // REQUIRES: nothing that compares equal to key is currently in the list. + // REQUIRES: no concurrent calls that use same hint + bool InsertWithHintConcurrently(const char* key, void** hint); + // Like Insert, but external synchronization is not required. bool InsertConcurrently(const char* key); @@ -642,6 +651,18 @@ InlineSkipList::AllocateSplice() { return splice; } +template +typename InlineSkipList::Splice* +InlineSkipList::AllocateSpliceOnHeap() { + size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1); + char* raw = new char[sizeof(Splice) + array_size * 2]; + Splice* splice = reinterpret_cast(raw); + splice->height_ = 0; + splice->prev_ = reinterpret_cast(raw + sizeof(Splice)); + splice->next_ = reinterpret_cast(raw + sizeof(Splice) + array_size); + return splice; +} + template bool InlineSkipList::Insert(const char* key) { return Insert(key, seq_splice_, false); @@ -668,6 +689,18 @@ bool InlineSkipList::InsertWithHint(const char* key, void** hint) { return Insert(key, splice, true); } +template +bool InlineSkipList::InsertWithHintConcurrently(const char* key, + void** hint) { + assert(hint != nullptr); + Splice* splice = reinterpret_cast(*hint); + if (splice == nullptr) { + splice = AllocateSpliceOnHeap(); + *hint = reinterpret_cast(splice); + } + return Insert(key, splice, true); +} + template template void InlineSkipList::FindSpliceForLevel(const DecodedKey& key, diff --git a/memtable/inlineskiplist_test.cc b/memtable/inlineskiplist_test.cc index 9670f3fc6..a3ae41498 100644 --- a/memtable/inlineskiplist_test.cc +++ b/memtable/inlineskiplist_test.cc @@ -412,12 +412,18 @@ class ConcurrentTest { } // REQUIRES: No concurrent calls for the same k - void ConcurrentWriteStep(uint32_t k) { + void ConcurrentWriteStep(uint32_t k, bool use_hint = false) { const int g = current_.Get(k) + 1; const Key new_key = MakeKey(k, g); char* buf = list_.AllocateKey(sizeof(Key)); memcpy(buf, &new_key, sizeof(Key)); - list_.InsertConcurrently(buf); + if (use_hint) { + void* hint = nullptr; + list_.InsertWithHintConcurrently(buf, &hint); + delete[] reinterpret_cast(hint); + } else { + list_.InsertConcurrently(buf); + } ASSERT_EQ(g, current_.Get(k) + 1); current_.Set(k, g); } @@ -508,6 +514,7 @@ TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) { class TestState { public: ConcurrentTest t_; + bool use_hint_; int seed_; std::atomic quit_flag_; std::atomic next_writer_; @@ -575,7 +582,7 @@ static void ConcurrentReader(void* arg) { static void ConcurrentWriter(void* arg) { TestState* state = reinterpret_cast(arg); uint32_t k = state->next_writer_++ % ConcurrentTest::K; - state->t_.ConcurrentWriteStep(k); + state->t_.ConcurrentWriteStep(k, state->use_hint_); state->AdjustPendingWriters(-1); } @@ -600,7 +607,8 @@ static void RunConcurrentRead(int run) { } } -static void RunConcurrentInsert(int run, int write_parallelism = 4) { +static void RunConcurrentInsert(int run, bool use_hint = false, + int write_parallelism = 4) { Env::Default()->SetBackgroundThreads(1 + write_parallelism, Env::Priority::LOW); const int seed = test::RandomSeed() + (run * 100); @@ -612,6 +620,7 @@ static void RunConcurrentInsert(int run, int write_parallelism = 4) { fprintf(stderr, "Run %d of %d\n", i, N); } TestState state(seed + 1); + state.use_hint_ = use_hint; Env::Default()->Schedule(ConcurrentReader, &state); state.Wait(TestState::RUNNING); for (int k = 0; k < kSize; k += write_parallelism) { @@ -635,6 +644,15 @@ TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); } TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); } TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); } TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); } +TEST_F(InlineSkipTest, ConcurrentInsertWithHint1) { + RunConcurrentInsert(1, true); +} +TEST_F(InlineSkipTest, ConcurrentInsertWithHint2) { + RunConcurrentInsert(2, true); +} +TEST_F(InlineSkipTest, ConcurrentInsertWithHint3) { + RunConcurrentInsert(3, true); +} #endif // ROCKSDB_VALGRIND_RUN } // namespace rocksdb diff --git a/memtable/skiplistrep.cc b/memtable/skiplistrep.cc index 3955217cc..55d3cd7a6 100644 --- a/memtable/skiplistrep.cc +++ b/memtable/skiplistrep.cc @@ -50,6 +50,15 @@ public: return skip_list_.InsertWithHint(static_cast(handle), hint); } + void InsertWithHintConcurrently(KeyHandle handle, void** hint) override { + skip_list_.InsertWithHintConcurrently(static_cast(handle), hint); + } + + bool InsertKeyWithHintConcurrently(KeyHandle handle, void** hint) override { + return skip_list_.InsertWithHintConcurrently(static_cast(handle), + hint); + } + void InsertConcurrently(KeyHandle handle) override { skip_list_.InsertConcurrently(static_cast(handle)); } diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 715fd842a..caf73e7f7 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -4677,8 +4677,8 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", - found, read); + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", found, + read); thread->stats.AddBytes(bytes); thread->stats.AddMessage(msg);