Add insert hints for each writebatch (#5728)

Summary:
Add insert hints for each writebatch so that they can be used in concurrent write, and add write option to enable it.

Bench result (qps):

`./db_bench --benchmarks=fillseq -allow_concurrent_memtable_write=true -num=4000000 -batch-size=1 -threads=1 -db=/data3/ylj/tmp -write_buffer_size=536870912 -num_column_families=4`

master:

| batch size \ thread num | 1       | 2       | 4       | 8       |
| ----------------------- | ------- | ------- | ------- | ------- |
| 1                       | 387883  | 220790  | 308294  | 490998  |
| 10                      | 1397208 | 978911  | 1275684 | 1733395 |
| 100                     | 2045414 | 1589927 | 1798782 | 2681039 |
| 1000                    | 2228038 | 1698252 | 1839877 | 2863490 |

fillseq with writebatch hint:

| batch size \ thread num | 1       | 2       | 4       | 8       |
| ----------------------- | ------- | ------- | ------- | ------- |
| 1                       | 286005  | 223570  | 300024  | 466981  |
| 10                      | 970374  | 813308  | 1399299 | 1753588 |
| 100                     | 1962768 | 1983023 | 2676577 | 3086426 |
| 1000                    | 2195853 | 2676782 | 3231048 | 3638143 |
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5728

Differential Revision: D17297240

fbshipit-source-id: b053590a6d77871f1ef2f911a7bd013b3899b26c
main
Lingjing You 5 years ago committed by Facebook Github Bot
parent a378a4c2ac
commit 1a928c22a0
  1. 5
      db/c.cc
  2. 12
      db/db_impl/db_impl_write.cc
  3. 6
      db/memtable.cc
  4. 3
      db/memtable.h
  5. 39
      db/write_batch.cc
  6. 3
      db/write_batch_internal.h
  7. 3
      include/rocksdb/c.h
  8. 22
      include/rocksdb/memtablerep.h
  9. 10
      include/rocksdb/options.h
  10. 33
      memtable/inlineskiplist.h
  11. 24
      memtable/inlineskiplist_test.cc
  12. 9
      memtable/skiplistrep.cc
  13. 4
      tools/db_bench_tool.cc

@ -3224,6 +3224,11 @@ void rocksdb_writeoptions_set_low_pri(
opt->rep.low_pri = v;
}
void rocksdb_writeoptions_set_memtable_insert_hint_per_batch(
rocksdb_writeoptions_t* opt, unsigned char v) {
opt->rep.memtable_insert_hint_per_batch = v;
}
rocksdb_compactoptions_t* rocksdb_compactoptions_create() {
return new rocksdb_compactoptions_t;
}

@ -174,7 +174,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
&trim_history_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
batch_per_txn_);
batch_per_txn_, write_options.memtable_insert_hint_per_batch);
PERF_TIMER_START(write_pre_and_post_process_time);
}
@ -397,7 +397,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
&trim_history_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/, seq_per_batch_,
w.batch_cnt, batch_per_txn_);
w.batch_cnt, batch_per_txn_,
write_options.memtable_insert_hint_per_batch);
}
}
if (seq_used != nullptr) {
@ -564,7 +565,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this, true /*concurrent_memtable_writes*/);
0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
write_options.memtable_insert_hint_per_batch);
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
MemTableInsertStatusCheck(w.status);
versions_->SetLastSequence(w.write_group->last_sequence);
@ -603,7 +606,8 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
seq_per_batch_, sub_batch_cnt);
seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
write_options.memtable_insert_hint_per_batch);
WriteStatusCheck(w.status);
if (write_options.disableWAL) {

@ -469,7 +469,7 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
bool MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value, bool allow_concurrent,
MemTablePostProcessInfo* post_process_info) {
MemTablePostProcessInfo* post_process_info, void** hint) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
@ -547,7 +547,9 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
assert(post_process_info == nullptr);
UpdateFlushState();
} else {
bool res = table->InsertKeyConcurrently(handle);
bool res = (hint == nullptr)
? table->InsertKeyConcurrently(handle)
: table->InsertKeyWithHintConcurrently(handle, hint);
if (UNLIKELY(!res)) {
return res;
}

@ -182,7 +182,8 @@ class MemTable {
// the <key, seq> already exists.
bool Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value, bool allow_concurrent = false,
MemTablePostProcessInfo* post_process_info = nullptr);
MemTablePostProcessInfo* post_process_info = nullptr,
void** hint = nullptr);
// Used to Get value associated with key or Get Merge Operands associated
// with key.

@ -39,6 +39,7 @@
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <vector>
#include "db/column_family.h"
@ -1225,6 +1226,22 @@ class MemTableInserter : public WriteBatch::Handler {
DupDetector duplicate_detector_;
bool dup_dectector_on_;
bool hint_per_batch_;
bool hint_created_;
// Hints for this batch
using HintMap = std::unordered_map<MemTable*, void*>;
using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
HintMapType hint_;
HintMap& GetHintMap() {
assert(hint_per_batch_);
if (!hint_created_) {
new (&hint_) HintMap();
hint_created_ = true;
}
return *reinterpret_cast<HintMap*>(&hint_);
}
MemPostInfoMap& GetPostMap() {
assert(concurrent_memtable_writes_);
if(!post_info_created_) {
@ -1258,7 +1275,7 @@ class MemTableInserter : public WriteBatch::Handler {
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes,
bool* has_valid_writes = nullptr, bool seq_per_batch = false,
bool batch_per_txn = true)
bool batch_per_txn = true, bool hint_per_batch = false)
: sequence_(_sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
@ -1282,7 +1299,9 @@ class MemTableInserter : public WriteBatch::Handler {
write_before_prepare_(!batch_per_txn),
unprepared_batch_(false),
duplicate_detector_(),
dup_dectector_on_(false) {
dup_dectector_on_(false),
hint_per_batch_(hint_per_batch),
hint_created_(false) {
assert(cf_mems_);
}
@ -1295,6 +1314,12 @@ class MemTableInserter : public WriteBatch::Handler {
reinterpret_cast<MemPostInfoMap*>
(&mem_post_info_map_)->~MemPostInfoMap();
}
if (hint_created_) {
for (auto iter : GetHintMap()) {
delete[] reinterpret_cast<char*>(iter.second);
}
reinterpret_cast<HintMap*>(&hint_)->~HintMap();
}
delete rebuilding_trx_;
}
@ -1404,7 +1429,8 @@ class MemTableInserter : public WriteBatch::Handler {
if (!moptions->inplace_update_support) {
bool mem_res =
mem->Add(sequence_, value_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem));
concurrent_memtable_writes_, get_post_process_info(mem),
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
@ -1487,7 +1513,8 @@ class MemTableInserter : public WriteBatch::Handler {
MemTable* mem = cf_mems_->GetMemTable();
bool mem_res =
mem->Add(sequence_, delete_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem));
concurrent_memtable_writes_, get_post_process_info(mem),
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
if (UNLIKELY(!mem_res)) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
@ -1962,7 +1989,7 @@ Status WriteBatchInternal::InsertInto(
TrimHistoryScheduler* trim_history_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
bool batch_per_txn) {
bool batch_per_txn, bool hint_per_batch) {
#ifdef NDEBUG
(void)batch_cnt;
#endif
@ -1971,7 +1998,7 @@ Status WriteBatchInternal::InsertInto(
sequence, memtables, flush_scheduler, trim_history_scheduler,
ignore_missing_column_families, log_number, db,
concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
batch_per_txn);
batch_per_txn, hint_per_batch);
SetSequence(writer->batch, sequence);
inserter.set_log_number_ref(writer->log_ref);
Status s = writer->batch->Iterate(&inserter);

@ -188,7 +188,8 @@ class WriteBatchInternal {
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false,
bool seq_per_batch = false, size_t batch_cnt = 0,
bool batch_per_txn = true);
bool batch_per_txn = true,
bool hint_per_batch = false);
static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);

@ -1263,6 +1263,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_writeoptions_set_no_slowdown(
rocksdb_writeoptions_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_writeoptions_set_low_pri(
rocksdb_writeoptions_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_writeoptions_set_memtable_insert_hint_per_batch(rocksdb_writeoptions_t*,
unsigned char);
/* Compact range options */

@ -120,6 +120,28 @@ class MemTableRep {
return true;
}
// Same as ::InsertWithHint, but allow concurrnet write
//
// If hint points to nullptr, a new hint will be allocated on heap, otherwise
// the hint will be updated to reflect the last insert location. The hint is
// owned by the caller and it is the caller's responsibility to delete the
// hint later.
//
// Currently only skip-list based memtable implement the interface. Other
// implementations will fallback to InsertConcurrently() by default.
virtual void InsertWithHintConcurrently(KeyHandle handle, void** /*hint*/) {
// Ignore the hint by default.
InsertConcurrently(handle);
}
// Same as ::InsertWithHintConcurrently
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
// the <key, seq> already exists.
virtual bool InsertKeyWithHintConcurrently(KeyHandle handle, void** hint) {
InsertWithHintConcurrently(handle, hint);
return true;
}
// Like Insert(handle), but may be called concurrent with other calls
// to InsertConcurrently for other handles.
//

@ -1356,6 +1356,15 @@ struct WriteOptions {
// Default: false
bool low_pri;
// If true, this writebatch will maintain the last insert positions of each
// memtable as hints in concurrent write. It can improve write performance
// in concurrent writes if keys in one writebatch are sequential. In
// non-concurrent writes (when concurrent_memtable_writes is false) this
// option will be ignored.
//
// Default: false
bool memtable_insert_hint_per_batch;
// Timestamp of write operation, e.g. Put. All timestamps of the same
// database must share the same length and format. The user is also
// responsible for providing a customized compare function via Comparator to
@ -1373,6 +1382,7 @@ struct WriteOptions {
ignore_missing_column_families(false),
no_slowdown(false),
low_pri(false),
memtable_insert_hint_per_batch(false),
timestamp(nullptr) {}
};

@ -86,6 +86,9 @@ class InlineSkipList {
// Allocate a splice using allocator.
Splice* AllocateSplice();
// Allocate a splice on heap.
Splice* AllocateSpliceOnHeap();
// Inserts a key allocated by AllocateKey, after the actual key value
// has been filled in.
//
@ -105,6 +108,12 @@ class InlineSkipList {
// REQUIRES: no concurrent calls to any of inserts.
bool InsertWithHint(const char* key, void** hint);
// Like InsertConcurrently, but with a hint
//
// REQUIRES: nothing that compares equal to key is currently in the list.
// REQUIRES: no concurrent calls that use same hint
bool InsertWithHintConcurrently(const char* key, void** hint);
// Like Insert, but external synchronization is not required.
bool InsertConcurrently(const char* key);
@ -642,6 +651,18 @@ InlineSkipList<Comparator>::AllocateSplice() {
return splice;
}
template <class Comparator>
typename InlineSkipList<Comparator>::Splice*
InlineSkipList<Comparator>::AllocateSpliceOnHeap() {
size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1);
char* raw = new char[sizeof(Splice) + array_size * 2];
Splice* splice = reinterpret_cast<Splice*>(raw);
splice->height_ = 0;
splice->prev_ = reinterpret_cast<Node**>(raw + sizeof(Splice));
splice->next_ = reinterpret_cast<Node**>(raw + sizeof(Splice) + array_size);
return splice;
}
template <class Comparator>
bool InlineSkipList<Comparator>::Insert(const char* key) {
return Insert<false>(key, seq_splice_, false);
@ -668,6 +689,18 @@ bool InlineSkipList<Comparator>::InsertWithHint(const char* key, void** hint) {
return Insert<false>(key, splice, true);
}
template <class Comparator>
bool InlineSkipList<Comparator>::InsertWithHintConcurrently(const char* key,
void** hint) {
assert(hint != nullptr);
Splice* splice = reinterpret_cast<Splice*>(*hint);
if (splice == nullptr) {
splice = AllocateSpliceOnHeap();
*hint = reinterpret_cast<void*>(splice);
}
return Insert<true>(key, splice, true);
}
template <class Comparator>
template <bool prefetch_before>
void InlineSkipList<Comparator>::FindSpliceForLevel(const DecodedKey& key,

@ -412,12 +412,18 @@ class ConcurrentTest {
}
// REQUIRES: No concurrent calls for the same k
void ConcurrentWriteStep(uint32_t k) {
void ConcurrentWriteStep(uint32_t k, bool use_hint = false) {
const int g = current_.Get(k) + 1;
const Key new_key = MakeKey(k, g);
char* buf = list_.AllocateKey(sizeof(Key));
memcpy(buf, &new_key, sizeof(Key));
if (use_hint) {
void* hint = nullptr;
list_.InsertWithHintConcurrently(buf, &hint);
delete[] reinterpret_cast<char*>(hint);
} else {
list_.InsertConcurrently(buf);
}
ASSERT_EQ(g, current_.Get(k) + 1);
current_.Set(k, g);
}
@ -508,6 +514,7 @@ TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) {
class TestState {
public:
ConcurrentTest t_;
bool use_hint_;
int seed_;
std::atomic<bool> quit_flag_;
std::atomic<uint32_t> next_writer_;
@ -575,7 +582,7 @@ static void ConcurrentReader(void* arg) {
static void ConcurrentWriter(void* arg) {
TestState* state = reinterpret_cast<TestState*>(arg);
uint32_t k = state->next_writer_++ % ConcurrentTest::K;
state->t_.ConcurrentWriteStep(k);
state->t_.ConcurrentWriteStep(k, state->use_hint_);
state->AdjustPendingWriters(-1);
}
@ -600,7 +607,8 @@ static void RunConcurrentRead(int run) {
}
}
static void RunConcurrentInsert(int run, int write_parallelism = 4) {
static void RunConcurrentInsert(int run, bool use_hint = false,
int write_parallelism = 4) {
Env::Default()->SetBackgroundThreads(1 + write_parallelism,
Env::Priority::LOW);
const int seed = test::RandomSeed() + (run * 100);
@ -612,6 +620,7 @@ static void RunConcurrentInsert(int run, int write_parallelism = 4) {
fprintf(stderr, "Run %d of %d\n", i, N);
}
TestState state(seed + 1);
state.use_hint_ = use_hint;
Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; k += write_parallelism) {
@ -635,6 +644,15 @@ TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); }
TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); }
TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); }
TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); }
TEST_F(InlineSkipTest, ConcurrentInsertWithHint1) {
RunConcurrentInsert(1, true);
}
TEST_F(InlineSkipTest, ConcurrentInsertWithHint2) {
RunConcurrentInsert(2, true);
}
TEST_F(InlineSkipTest, ConcurrentInsertWithHint3) {
RunConcurrentInsert(3, true);
}
#endif // ROCKSDB_VALGRIND_RUN
} // namespace rocksdb

@ -50,6 +50,15 @@ public:
return skip_list_.InsertWithHint(static_cast<char*>(handle), hint);
}
void InsertWithHintConcurrently(KeyHandle handle, void** hint) override {
skip_list_.InsertWithHintConcurrently(static_cast<char*>(handle), hint);
}
bool InsertKeyWithHintConcurrently(KeyHandle handle, void** hint) override {
return skip_list_.InsertWithHintConcurrently(static_cast<char*>(handle),
hint);
}
void InsertConcurrently(KeyHandle handle) override {
skip_list_.InsertConcurrently(static_cast<char*>(handle));
}

@ -4677,8 +4677,8 @@ class Benchmark {
}
char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
found, read);
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", found,
read);
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);

Loading…
Cancel
Save