New WriteImpl to pipeline WAL/memtable write

Summary:
PipelineWriteImpl is an alternative approach to WriteImpl. In WriteImpl, only one thread is allow to write at the same time. This thread will do both WAL and memtable writes for all write threads in the write group. Pending writers wait in queue until the current writer finishes. In the pipeline write approach, two queue is maintained: one WAL writer queue and one memtable writer queue. All writers (regardless of whether they need to write WAL) will still need to first join the WAL writer queue, and after the house keeping work and WAL writing, they will need to join memtable writer queue if needed. The benefit of this approach is that
1. Writers without memtable writes (e.g. the prepare phase of two phase commit) can exit write thread once WAL write is finish. They don't need to wait for memtable writes in case of group commit.
2. Pending writers only need to wait for previous WAL writer finish to be able to join the write thread, instead of wait also for previous memtable writes.

Merging #2056 and #2058 into this PR.
Closes https://github.com/facebook/rocksdb/pull/2286

Differential Revision: D5054606

Pulled By: yiwu-arbug

fbshipit-source-id: ee5b11efd19d3e39d6b7210937b11cefdd4d1c8d
main
Yi Wu 8 years ago committed by Facebook Github Bot
parent d746aead1a
commit 07bdcb91fe
  1. 1
      HISTORY.md
  2. 5
      db/db_impl.cc
  3. 19
      db/db_impl.h
  4. 240
      db/db_impl_write.cc
  5. 28
      db/flush_scheduler.cc
  6. 15
      db/write_batch.cc
  7. 2
      db/write_batch_internal.h
  8. 280
      db/write_callback_test.cc
  9. 414
      db/write_thread.cc
  10. 194
      db/write_thread.h
  11. 15
      include/rocksdb/options.h
  12. 3
      options/db_options.cc
  13. 1
      options/db_options.h
  14. 1
      options/options.cc
  15. 3
      options/options_helper.h
  16. 1
      options/options_settable_test.cc

@ -13,6 +13,7 @@
* Add debugging function `GetAllKeyVersions` to see internal versions of a range of keys.
* Support file ingestion with universal compaction style
* Support file ingestion behind with option `allow_ingest_behind`
* New option enable_pipelined_write which may improve write throughput in case writing from multiple threads and WAL enabled .
## 5.4.0 (04/11/2017)
### Public API Change

@ -159,10 +159,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
max_total_in_memory_state_(0),
is_snapshot_supported_(true),
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
write_thread_(immutable_db_options_.enable_write_thread_adaptive_yield
? immutable_db_options_.write_thread_max_yield_usec
: 0,
immutable_db_options_.write_thread_slow_yield_usec),
write_thread_(immutable_db_options_),
write_controller_(mutable_db_options_.delayed_write_rate),
last_batch_group_size_(0),
unscheduled_flushes_(0),

@ -607,6 +607,11 @@ class DBImpl : public DB {
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false);
Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false);
uint64_t FindMinLogContainingOutstandingPrep();
uint64_t FindMinPrepLogReferencedByMemTable();
@ -726,16 +731,18 @@ class DBImpl : public DB {
Status HandleWriteBufferFull(WriteContext* write_context);
// REQUIRES: mutex locked
Status PreprocessWrite(const WriteOptions& write_options, bool need_log_sync,
bool* logs_getting_syned, WriteContext* write_context);
Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
WriteContext* write_context);
Status WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
Status WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, bool need_log_sync,
bool need_log_dir_sync, SequenceNumber sequence);
// Used by WriteImpl to update bg_error_ when encountering memtable insert
// error.
void UpdateBackgroundError(const Status& memtable_insert_status);
// Used by WriteImpl to update bg_error_ if paranoid check is enabled.
void ParanoidCheck(const Status& status);
// Used by WriteImpl to update bg_error_ in case of memtable insert error.
void MemTableInsertStatusCheck(const Status& memtable_insert_status);
#ifndef ROCKSDB_LITE

@ -66,6 +66,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::Corruption("Batch is nullptr!");
}
if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable);
}
Status status;
PERF_TIMER_GUARD(write_pre_and_post_process_time);
@ -79,7 +84,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
write_thread_.JoinBatchGroup(&w);
if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) {
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group
PERF_TIMER_GUARD(write_memtable_time);
@ -93,11 +98,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
true /*concurrent_memtable_writes*/);
}
if (write_thread_.CompleteParallelWorker(&w)) {
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// we're responsible for exit batch group
auto last_sequence = w.parallel_group->last_sequence;
auto last_sequence = w.write_group->last_sequence;
versions_->SetLastSequence(last_sequence);
UpdateBackgroundError(w.status);
MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupFollower(&w);
}
assert(w.state == WriteThread::STATE_COMPLETED);
@ -120,10 +125,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// when it finds suitable, and finish them in the same write batch.
// This is how a write job could be done by the other writer.
WriteContext write_context;
WriteThread::Writer* last_writer = &w; // Dummy intial value
autovector<WriteThread::Writer*> write_group;
WriteThread::ParallelGroup pg;
bool logs_getting_synced = false;
WriteThread::WriteGroup write_group;
bool in_parallel_group = false;
uint64_t last_sequence = versions_->LastSequence();
@ -131,8 +133,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
status = PreprocessWrite(write_options, need_log_sync, &logs_getting_synced,
&write_context);
status = PreprocessWrite(write_options, &need_log_sync, &write_context);
log::Writer* cur_log_writer = logs_.back().writer;
mutex_.Unlock();
@ -143,7 +144,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// into memtables
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group);
write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
if (status.ok()) {
// Rules for when we can update the memtable concurrently
@ -158,10 +159,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// relax rules 2 if we could prevent write batches from referring
// more than once to a particular key.
bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
write_group.size() > 1;
write_group.size > 1;
int total_count = 0;
uint64_t total_byte_size = 0;
for (auto writer : write_group) {
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch);
@ -187,7 +188,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
RecordTick(stats_, WRITE_DONE_BY_SELF);
auto write_done_by_other = write_group.size() - 1;
auto write_done_by_other = write_group.size - 1;
if (write_done_by_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
write_done_by_other);
@ -219,12 +220,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this);
} else {
pg.leader = &w;
pg.last_writer = last_writer;
pg.last_sequence = last_sequence;
pg.running.store(static_cast<uint32_t>(write_group.size()),
std::memory_order_relaxed);
write_thread_.LaunchParallelFollowers(&pg, current_sequence);
SequenceNumber next_sequence = current_sequence;
for (auto* writer : write_group) {
if (writer->ShouldWriteToMemtable()) {
writer->sequence = next_sequence;
next_sequence += WriteBatchInternal::Count(writer->batch);
}
}
write_group.last_sequence = last_sequence;
write_group.running.store(static_cast<uint32_t>(write_group.size),
std::memory_order_relaxed);
write_thread_.LaunchParallelMemTableWriters(&write_group);
in_parallel_group = true;
// Each parallel follower is doing each own writes. The leader should
@ -244,19 +250,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
PERF_TIMER_START(write_pre_and_post_process_time);
//
// Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes.
if (immutable_db_options_.paranoid_checks && !status.ok() &&
!w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) {
mutex_.Lock();
if (bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes
}
mutex_.Unlock();
if (!w.CallbackFailed()) {
ParanoidCheck(status);
}
if (logs_getting_synced) {
if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
mutex_.Unlock();
@ -266,40 +264,180 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (in_parallel_group) {
// CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did
should_exit_batch_group = write_thread_.CompleteParallelWorker(&w);
should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
}
if (should_exit_batch_group) {
versions_->SetLastSequence(last_sequence);
UpdateBackgroundError(w.status);
write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
}
if (status.ok()) {
status = w.FinalStatus();
}
return status;
}
void DBImpl::UpdateBackgroundError(const Status& memtable_insert_status) {
Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
WriteContext write_context;
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable);
write_thread_.JoinBatchGroup(&w);
if (w.state == WriteThread::STATE_GROUP_LEADER) {
WriteThread::WriteGroup wal_write_group;
if (w.callback && !w.callback->AllowWriteBatching()) {
write_thread_.WaitForMemTableWriters();
}
mutex_.Lock();
bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
log::Writer* cur_log_writer = logs_.back().writer;
mutex_.Unlock();
// This can set non-OK status if callback fail.
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
const SequenceNumber current_sequence =
write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
size_t total_count = 0;
size_t total_byte_size = 0;
if (w.status.ok()) {
SequenceNumber next_sequence = current_sequence;
for (auto writer : wal_write_group) {
if (writer->CheckCallback(this)) {
if (writer->ShouldWriteToMemtable()) {
writer->sequence = next_sequence;
size_t count = WriteBatchInternal::Count(writer->batch);
next_sequence += count;
total_count += count;
}
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
}
}
if (w.disable_wal) {
has_unpersisted_data_.store(true, std::memory_order_relaxed);
}
write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
}
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
PERF_TIMER_STOP(write_pre_and_post_process_time);
if (w.ShouldWriteToWAL()) {
PERF_TIMER_GUARD(write_wal_time);
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
if (wal_write_group.size > 1) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
wal_write_group.size - 1);
RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
}
w.status = WriteToWAL(wal_write_group, cur_log_writer, need_log_sync,
need_log_dir_sync, current_sequence);
}
if (!w.CallbackFailed()) {
ParanoidCheck(w.status);
}
if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
mutex_.Unlock();
}
write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
}
WriteThread::WriteGroup memtable_write_group;
if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_GUARD(write_memtable_time);
assert(w.status.ok());
write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
if (memtable_write_group.size > 1 &&
immutable_db_options_.allow_concurrent_memtable_write) {
write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
} else {
memtable_write_group.status = WriteBatchInternal::InsertInto(
memtable_write_group, w.sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
}
}
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
assert(w.ShouldWriteToMemtable());
WriteBatchInternal::SetSequence(w.batch, w.sequence);
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/);
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
MemTableInsertStatusCheck(w.status);
versions_->SetLastSequence(w.write_group->last_sequence);
write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
}
}
assert(w.state == WriteThread::STATE_COMPLETED);
if (log_used != nullptr) {
*log_used = w.log_used;
}
return w.FinalStatus();
}
void DBImpl::ParanoidCheck(const Status& status) {
// Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes.
if (immutable_db_options_.paranoid_checks && !status.ok() &&
!status.IsBusy() && !status.IsIncomplete()) {
mutex_.Lock();
if (bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes
}
mutex_.Unlock();
}
}
void DBImpl::MemTableInsertStatusCheck(const Status& status) {
// A non-OK status here indicates that the state implied by the
// WAL has diverged from the in-memory state. This could be
// because of a corrupt write_batch (very bad), or because the
// client specified an invalid column family and didn't specify
// ignore_missing_column_families.
if (!memtable_insert_status.ok()) {
if (!status.ok()) {
mutex_.Lock();
assert(bg_error_.ok());
bg_error_ = memtable_insert_status;
bg_error_ = status;
mutex_.Unlock();
}
}
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
bool need_log_sync, bool* logs_getting_synced,
bool* need_log_sync,
WriteContext* write_context) {
mutex_.AssertHeld();
assert(write_context != nullptr && logs_getting_synced != nullptr);
assert(write_context != nullptr && need_log_sync != nullptr);
Status status;
assert(!single_column_family_mode_ ||
@ -336,7 +474,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
status = DelayWrite(last_batch_group_size_, write_options);
}
if (status.ok() && need_log_sync) {
if (status.ok() && *need_log_sync) {
// Wait until the parallel syncs are finished. Any sync process has to sync
// the front log too so it is enough to check the status of front()
// We do a while loop since log_sync_cv_ is signalled when any sync is
@ -356,26 +494,28 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
// actually write to the WAL
log.getting_synced = true;
}
*logs_getting_synced = true;
} else {
*need_log_sync = false;
}
return status;
}
Status DBImpl::WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, bool need_log_sync,
bool need_log_dir_sync, SequenceNumber sequence) {
Status status;
WriteBatch* merged_batch = nullptr;
size_t write_with_wal = 0;
if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
auto* leader = write_group.leader;
if (write_group.size == 1 && leader->ShouldWriteToWAL() &&
leader->batch->GetWalTerminationPoint().is_cleared()) {
// we simply write the first WriteBatch to WAL if the group only
// contains one batch, that batch should be written to the WAL,
// and the batch is not wanting to be truncated
merged_batch = write_group[0]->batch;
write_group[0]->log_used = logfile_number_;
merged_batch = leader->batch;
leader->log_used = logfile_number_;
write_with_wal = 1;
} else {
// WAL needs all of the batches flattened into a single batch.
@ -643,6 +783,12 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr;
// In case of pipelined write is enabled, wait for all pending memtable
// writers.
if (immutable_db_options_.enable_pipelined_write) {
write_thread_.WaitForMemTableWriters();
}
// Attempt to switch to a new memtable and trigger flush of old.
// Do this without holding the dbmutex lock.
assert(versions_->prev_log_number() == 0);

@ -15,11 +15,9 @@ namespace rocksdb {
void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
#ifndef NDEBUG
{
std::lock_guard<std::mutex> lock(checking_mutex_);
assert(checking_set_.count(cfd) == 0);
checking_set_.insert(cfd);
}
std::lock_guard<std::mutex> lock(checking_mutex_);
assert(checking_set_.count(cfd) == 0);
checking_set_.insert(cfd);
#endif // NDEBUG
cfd->Ref();
// Suppress false positive clang analyzer warnings.
@ -36,8 +34,11 @@ void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
}
ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
#ifndef NDEBUG
std::lock_guard<std::mutex> lock(checking_mutex_);
#endif // NDEBUG
while (true) {
if (Empty()) {
if (head_.load(std::memory_order_relaxed) == nullptr) {
return nullptr;
}
@ -48,11 +49,9 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
delete node;
#ifndef NDEBUG
{
auto iter = checking_set_.find(cfd);
assert(iter != checking_set_.end());
checking_set_.erase(iter);
}
auto iter = checking_set_.find(cfd);
assert(iter != checking_set_.end());
checking_set_.erase(iter);
#endif // NDEBUG
if (!cfd->IsDropped()) {
@ -68,8 +67,13 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
}
bool FlushScheduler::Empty() {
#ifndef NDEBUG
std::lock_guard<std::mutex> lock(checking_mutex_);
#endif // NDEBUG
auto rv = head_.load(std::memory_order_relaxed) == nullptr;
#ifndef NDEBUG
assert(rv == checking_set_.empty());
#endif // NDEBUG
return rv;
}
@ -80,7 +84,7 @@ void FlushScheduler::Clear() {
delete cfd;
}
}
assert(Empty());
assert(head_.load(std::memory_order_relaxed) == nullptr);
}
} // namespace rocksdb

@ -1290,16 +1290,17 @@ public:
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables has been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache
Status WriteBatchInternal::InsertInto(
const autovector<WriteThread::Writer*>& writers, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
bool concurrent_memtable_writes) {
Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group,
SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t recovery_log_number, DB* db,
bool concurrent_memtable_writes) {
MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, recovery_log_number,
db, concurrent_memtable_writes);
for (size_t i = 0; i < writers.size(); i++) {
auto w = writers[i];
for (auto w : write_group) {
if (!w->ShouldWriteToMemtable()) {
continue;
}

@ -153,7 +153,7 @@ class WriteBatchInternal {
//
// Under concurrent use, the caller is responsible for making sure that
// the memtables object itself is thread-local.
static Status InsertInto(const autovector<WriteThread::Writer*>& batches,
static Status InsertInto(WriteThread::WriteGroup& write_group,
SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,

@ -119,161 +119,169 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
for (auto& allow_parallel : {true, false}) {
for (auto& allow_batching : {true, false}) {
for (auto& enable_WAL : {true, false}) {
for (auto& write_group : write_scenarios) {
Options options;
options.create_if_missing = true;
options.allow_concurrent_memtable_write = allow_parallel;
ReadOptions read_options;
DB* db;
DBImpl* db_impl;
DestroyDB(dbname, options);
ASSERT_OK(DB::Open(options, dbname, &db));
db_impl = dynamic_cast<DBImpl*>(db);
ASSERT_TRUE(db_impl);
for (auto& enable_pipelined_write : {true, false}) {
for (auto& write_group : write_scenarios) {
Options options;
options.create_if_missing = true;
options.allow_concurrent_memtable_write = allow_parallel;
options.enable_pipelined_write = enable_pipelined_write;
ReadOptions read_options;
DB* db;
DBImpl* db_impl;
DestroyDB(dbname, options);
ASSERT_OK(DB::Open(options, dbname, &db));
db_impl = dynamic_cast<DBImpl*>(db);
ASSERT_TRUE(db_impl);
std::atomic<uint64_t> threads_waiting(0);
std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
uint64_t cur_threads_waiting = 0;
bool is_leader = false;
bool is_last = false;
// who am i
do {
cur_threads_waiting = threads_waiting.load();
is_leader = (cur_threads_waiting == 0);
is_last = (cur_threads_waiting == write_group.size() - 1);
} while (!threads_waiting.compare_exchange_strong(
cur_threads_waiting, cur_threads_waiting + 1));
// check my state
auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
if (is_leader) {
ASSERT_TRUE(writer->state ==
WriteThread::State::STATE_GROUP_LEADER);
} else {
ASSERT_TRUE(writer->state ==
WriteThread::State::STATE_INIT);
}
// (meta test) the first WriteOP should indeed be the first
// and the last should be the last (all others can be out of
// order)
if (is_leader) {
ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
!write_group.front().callback_.should_fail_);
} else if (is_last) {
ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
!write_group.back().callback_.should_fail_);
}
// wait for friends
while (threads_waiting.load() < write_group.size()) {
}
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
// check my state
auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
if (!allow_batching) {
// no batching so everyone should be a leader
ASSERT_TRUE(writer->state ==
WriteThread::State::STATE_GROUP_LEADER);
} else if (!allow_parallel) {
ASSERT_TRUE(
writer->state == WriteThread::State::STATE_COMPLETED ||
(enable_pipelined_write &&
writer->state ==
WriteThread::State::STATE_MEMTABLE_WRITER_LEADER));
}
});
std::atomic<uint32_t> thread_num(0);
std::atomic<char> dummy_key(0);
std::function<void()> write_with_callback_func = [&]() {
uint32_t i = thread_num.fetch_add(1);
Random rnd(i);
// leaders gotta lead
while (i > 0 && threads_waiting.load() < 1) {
}
std::atomic<uint64_t> threads_waiting(0);
std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
// loser has to lose
while (i == write_group.size() - 1 &&
threads_waiting.load() < write_group.size() - 1) {
}
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
uint64_t cur_threads_waiting = 0;
bool is_leader = false;
bool is_last = false;
auto& write_op = write_group.at(i);
write_op.Clear();
write_op.callback_.allow_batching_ = allow_batching;
// who am i
// insert some keys
for (uint32_t j = 0; j < rnd.Next() % 50; j++) {
// grab unique key
char my_key = 0;
do {
cur_threads_waiting = threads_waiting.load();
is_leader = (cur_threads_waiting == 0);
is_last = (cur_threads_waiting == write_group.size() - 1);
} while (!threads_waiting.compare_exchange_strong(
cur_threads_waiting, cur_threads_waiting + 1));
// check my state
auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
if (is_leader) {
ASSERT_TRUE(writer->state ==
WriteThread::State::STATE_GROUP_LEADER);
} else {
ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT);
}
my_key = dummy_key.load();
} while (
!dummy_key.compare_exchange_strong(my_key, my_key + 1));
// (meta test) the first WriteOP should indeed be the first
// and the last should be the last (all others can be out of
// order)
if (is_leader) {
ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
!write_group.front().callback_.should_fail_);
} else if (is_last) {
ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
!write_group.back().callback_.should_fail_);
}
string skey(5, my_key);
string sval(10, my_key);
write_op.Put(skey, sval);
// wait for friends
while (threads_waiting.load() < write_group.size()) {
if (!write_op.callback_.should_fail_) {
seq.fetch_add(1);
}
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
// check my state
auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
if (!allow_batching) {
// no batching so everyone should be a leader
ASSERT_TRUE(writer->state ==
WriteThread::State::STATE_GROUP_LEADER);
} else if (!allow_parallel) {
ASSERT_TRUE(writer->state ==
WriteThread::State::STATE_COMPLETED);
}
});
std::atomic<uint32_t> thread_num(0);
std::atomic<char> dummy_key(0);
std::function<void()> write_with_callback_func = [&]() {
uint32_t i = thread_num.fetch_add(1);
Random rnd(i);
// leaders gotta lead
while (i > 0 && threads_waiting.load() < 1) {
}
// loser has to lose
while (i == write_group.size() - 1 &&
threads_waiting.load() < write_group.size() - 1) {
}
auto& write_op = write_group.at(i);
write_op.Clear();
write_op.callback_.allow_batching_ = allow_batching;
// insert some keys
for (uint32_t j = 0; j < rnd.Next() % 50; j++) {
// grab unique key
char my_key = 0;
do {
my_key = dummy_key.load();
} while (!dummy_key.compare_exchange_strong(my_key, my_key + 1));
}
string skey(5, my_key);
string sval(10, my_key);
write_op.Put(skey, sval);
WriteOptions woptions;
woptions.disableWAL = !enable_WAL;
woptions.sync = enable_WAL;
Status s = db_impl->WriteWithCallback(
woptions, &write_op.write_batch_, &write_op.callback_);
if (!write_op.callback_.should_fail_) {
seq.fetch_add(1);
if (write_op.callback_.should_fail_) {
ASSERT_TRUE(s.IsBusy());
} else {
ASSERT_OK(s);
}
}
};
WriteOptions woptions;
woptions.disableWAL = !enable_WAL;
woptions.sync = enable_WAL;
Status s = db_impl->WriteWithCallback(
woptions, &write_op.write_batch_, &write_op.callback_);
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
if (write_op.callback_.should_fail_) {
ASSERT_TRUE(s.IsBusy());
} else {
ASSERT_OK(s);
// do all the writes
std::vector<port::Thread> threads;
for (uint32_t i = 0; i < write_group.size(); i++) {
threads.emplace_back(write_with_callback_func);
}
for (auto& t : threads) {
t.join();
}
};
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// do all the writes
std::vector<port::Thread> threads;
for (uint32_t i = 0; i < write_group.size(); i++) {
threads.emplace_back(write_with_callback_func);
}
for (auto& t : threads) {
t.join();
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// check for keys
string value;
for (auto& w : write_group) {
ASSERT_TRUE(w.callback_.was_called_);
for (auto& kvp : w.kvs_) {
if (w.callback_.should_fail_) {
ASSERT_TRUE(
db->Get(read_options, kvp.first, &value).IsNotFound());
} else {
ASSERT_OK(db->Get(read_options, kvp.first, &value));
ASSERT_EQ(value, kvp.second);
// check for keys
string value;
for (auto& w : write_group) {
ASSERT_TRUE(w.callback_.was_called_);
for (auto& kvp : w.kvs_) {
if (w.callback_.should_fail_) {
ASSERT_TRUE(
db->Get(read_options, kvp.first, &value).IsNotFound());
} else {
ASSERT_OK(db->Get(read_options, kvp.first, &value));
ASSERT_EQ(value, kvp.second);
}
}
}
}
ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber());
ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber());
delete db;
DestroyDB(dbname, options);
delete db;
DestroyDB(dbname, options);
}
}
}
}

@ -15,10 +15,17 @@
namespace rocksdb {
WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec)
: max_yield_usec_(max_yield_usec),
slow_yield_usec_(slow_yield_usec),
newest_writer_(nullptr) {}
WriteThread::WriteThread(const ImmutableDBOptions& db_options)
: max_yield_usec_(db_options.enable_write_thread_adaptive_yield
? db_options.write_thread_max_yield_usec
: 0),
slow_yield_usec_(db_options.write_thread_slow_yield_usec),
allow_concurrent_memtable_write_(
db_options.allow_concurrent_memtable_write),
enable_pipelined_write_(db_options.enable_pipelined_write),
newest_writer_(nullptr),
newest_memtable_writer_(nullptr),
last_sequence_(0) {}
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
// We're going to block. Lazily create the mutex. We guarantee
@ -184,22 +191,39 @@ void WriteThread::SetState(Writer* w, uint8_t new_state) {
}
}
void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) {
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
assert(newest_writer != nullptr);
assert(w->state == STATE_INIT);
Writer* writers = newest_writer->load(std::memory_order_relaxed);
while (true) {
Writer* writers = newest_writer_.load(std::memory_order_relaxed);
w->link_older = writers;
if (newest_writer_.compare_exchange_strong(writers, w)) {
if (writers == nullptr) {
// this isn't part of the WriteThread machinery, but helps with
// debugging and is checked by an assert in WriteImpl
w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed);
}
// Then we are the head of the queue and hence definiltly the leader
*linked_as_leader = (writers == nullptr);
// Otherwise we will wait for previous leader to define our status
return;
if (newest_writer->compare_exchange_weak(writers, w)) {
return (writers == nullptr);
}
}
}
bool WriteThread::LinkGroup(WriteGroup& write_group,
std::atomic<Writer*>* newest_writer) {
assert(newest_writer != nullptr);
Writer* leader = write_group.leader;
Writer* last_writer = write_group.last_writer;
Writer* w = last_writer;
while (true) {
// Unset link_newer pointers to make sure when we call
// CreateMissingNewerLinks later it create all missing links.
w->link_newer = nullptr;
w->write_group = nullptr;
if (w == leader) {
break;
}
w = w->link_older;
}
Writer* newest = newest_writer->load(std::memory_order_relaxed);
while (true) {
leader->link_older = newest;
if (newest_writer->compare_exchange_weak(newest, last_writer)) {
return (newest == nullptr);
}
}
}
@ -216,12 +240,43 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) {
}
}
void WriteThread::CompleteLeader(WriteGroup& write_group) {
assert(write_group.size > 0);
Writer* leader = write_group.leader;
if (write_group.size == 1) {
write_group.leader = nullptr;
write_group.last_writer = nullptr;
} else {
assert(leader->link_newer != nullptr);
leader->link_newer->link_older = nullptr;
write_group.leader = leader->link_newer;
}
write_group.size -= 1;
SetState(leader, STATE_COMPLETED);
}
void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) {
assert(write_group.size > 1);
assert(w != write_group.leader);
if (w == write_group.last_writer) {
w->link_older->link_newer = nullptr;
write_group.last_writer = w->link_older;
} else {
w->link_older->link_newer = w->link_newer;
w->link_newer->link_older = w->link_older;
}
write_group.size -= 1;
SetState(w, STATE_COMPLETED);
}
void WriteThread::JoinBatchGroup(Writer* w) {
static AdaptationContext ctx("JoinBatchGroup");
assert(w->batch != nullptr);
bool linked_as_leader;
LinkOne(w, &linked_as_leader);
bool linked_as_leader = LinkOne(w, &newest_writer_);
if (linked_as_leader) {
SetState(w, STATE_GROUP_LEADER);
}
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
@ -231,23 +286,28 @@ void WriteThread::JoinBatchGroup(Writer* w) {
* 1) An existing leader pick us as the new leader when it finishes
* 2) An existing leader pick us as its follewer and
* 2.1) finishes the memtable writes on our behalf
* 2.2) Or tell us to finish the memtable writes it in pralallel
* 2.2) Or tell us to finish the memtable writes in pralallel
* 3) (pipelined write) An existing leader pick us as its follower and
* finish book-keeping and WAL write for us, enqueue us as pending
* memtable writer, and
* 3.1) we become memtable writer group leader, or
* 3.2) an existing memtable writer group leader tell us to finish memtable
* writes in parallel.
*/
AwaitState(w,
STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED,
AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&ctx);
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
}
}
size_t WriteThread::EnterAsBatchGroupLeader(
Writer* leader, WriteThread::Writer** last_writer,
autovector<WriteThread::Writer*>* write_batch_group) {
size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
WriteGroup* write_group) {
assert(leader->link_older == nullptr);
assert(leader->batch != nullptr);
assert(write_group != nullptr);
size_t size = WriteBatchInternal::ByteSize(leader->batch);
write_batch_group->push_back(leader);
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
@ -257,8 +317,10 @@ size_t WriteThread::EnterAsBatchGroupLeader(
max_size = size + (128 << 10);
}
*last_writer = leader;
leader->write_group = write_group;
write_group->leader = leader;
write_group->last_writer = leader;
write_group->size = 1;
Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
// This is safe regardless of any db mutex status of the caller. Previous
@ -308,136 +370,268 @@ size_t WriteThread::EnterAsBatchGroupLeader(
break;
}
w->write_group = write_group;
size += batch_size;
write_batch_group->push_back(w);
w->in_batch_group = true;
*last_writer = w;
write_group->last_writer = w;
write_group->size++;
}
return size;
}
void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
SequenceNumber sequence) {
// EnterAsBatchGroupLeader already created the links from leader to
// newer writers in the group
void WriteThread::EnterAsMemTableWriter(Writer* leader,
WriteGroup* write_group) {
assert(leader != nullptr);
assert(leader->link_older == nullptr);
assert(leader->batch != nullptr);
assert(write_group != nullptr);
size_t size = WriteBatchInternal::ByteSize(leader->batch);
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128 << 10)) {
max_size = size + (128 << 10);
}
leader->write_group = write_group;
write_group->leader = leader;
write_group->size = 1;
Writer* last_writer = leader;
if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
Writer* newest_writer = newest_memtable_writer_.load();
CreateMissingNewerLinks(newest_writer);
Writer* w = leader;
while (w != newest_writer) {
w = w->link_newer;
if (w->batch == nullptr) {
break;
}
if (w->batch->HasMerge()) {
break;
}
pg->leader->parallel_group = pg;
if (!allow_concurrent_memtable_write_) {
auto batch_size = WriteBatchInternal::ByteSize(w->batch);
if (size + batch_size > max_size) {
// Do not make batch too big
break;
}
size += batch_size;
}
Writer* w = pg->leader;
w->sequence = sequence;
w->write_group = write_group;
last_writer = w;
write_group->size++;
}
}
write_group->last_writer = last_writer;
write_group->last_sequence =
last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
}
// Initialize and wake up the others
while (w != pg->last_writer) {
// Writers that won't write don't get sequence allotment
if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) {
// There is a sequence number of each written key
sequence += WriteBatchInternal::Count(w->batch);
void WriteThread::ExitAsMemTableWriter(Writer* self, WriteGroup& write_group) {
Writer* leader = write_group.leader;
Writer* last_writer = write_group.last_writer;
Writer* newest_writer = last_writer;
if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
nullptr)) {
CreateMissingNewerLinks(newest_writer);
Writer* next_leader = last_writer->link_newer;
assert(next_leader != nullptr);
next_leader->link_older = nullptr;
SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
}
Writer* w = leader;
while (true) {
if (!write_group.status.ok()) {
w->status = write_group.status;
}
w = w->link_newer;
Writer* next = w->link_newer;
if (w != leader) {
SetState(w, STATE_COMPLETED);
}
if (w == last_writer) {
break;
}
w = next;
}
// Note that leader has to exit last, since it owns the write group.
SetState(leader, STATE_COMPLETED);
}
w->sequence = sequence; // sequence number for the first key in the batch
w->parallel_group = pg;
SetState(w, STATE_PARALLEL_FOLLOWER);
void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
assert(write_group != nullptr);
write_group->running.store(write_group->size);
for (auto w : *write_group) {
SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
}
}
// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelWorker(Writer* w) {
static AdaptationContext ctx("CompleteParallelWorker");
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
static AdaptationContext ctx("CompleteParallelMemTableWriter");
auto* pg = w->parallel_group;
auto* write_group = w->write_group;
if (!w->status.ok()) {
std::lock_guard<std::mutex> guard(pg->leader->StateMutex());
pg->status = w->status;
std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
write_group->status = w->status;
}
if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) {
if (write_group->running-- > 1) {
// we're not the last one
AwaitState(w, STATE_COMPLETED, &ctx);
return false;
}
// else we're the last parallel worker and should perform exit duties.
w->status = pg->status;
w->status = write_group->status;
return true;
}
void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
auto* pg = w->parallel_group;
auto* write_group = w->write_group;
assert(w->state == STATE_PARALLEL_FOLLOWER);
assert(pg->status.ok());
ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status);
assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER);
assert(write_group->status.ok());
ExitAsBatchGroupLeader(*write_group, write_group->status);
assert(w->status.ok());
assert(w->state == STATE_COMPLETED);
SetState(pg->leader, STATE_COMPLETED);
SetState(write_group->leader, STATE_COMPLETED);
}
void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
Status status) {
static AdaptationContext ctx("ExitAsBatchGroupLeader");
Writer* leader = write_group.leader;
Writer* last_writer = write_group.last_writer;
assert(leader->link_older == nullptr);
Writer* head = newest_writer_.load(std::memory_order_acquire);
if (head != last_writer ||
!newest_writer_.compare_exchange_strong(head, nullptr)) {
// Either w wasn't the head during the load(), or it was the head
// during the load() but somebody else pushed onto the list before
// we did the compare_exchange_strong (causing it to fail). In the
// latter case compare_exchange_strong has the effect of re-reading
// its first param (head). No need to retry a failing CAS, because
// only a departing leader (which we are at the moment) can remove
// nodes from the list.
assert(head != last_writer);
// After walking link_older starting from head (if not already done)
// we will be able to traverse w->link_newer below. This function
// can only be called from an active leader, only a leader can
// clear newest_writer_, we didn't, and only a clear newest_writer_
// could cause the next leader to start their work without a call
// to MarkJoined, so we can definitely conclude that no other leader
// work is going on here (with or without db mutex).
CreateMissingNewerLinks(head);
assert(last_writer->link_newer->link_older == last_writer);
last_writer->link_newer->link_older = nullptr;
// Next leader didn't self-identify, because newest_writer_ wasn't
// nullptr when they enqueued (we were definitely enqueued before them
// and are still in the list). That means leader handoff occurs when
// we call MarkJoined
SetState(last_writer->link_newer, STATE_GROUP_LEADER);
}
// else nobody else was waiting, although there might already be a new
// leader now
while (last_writer != leader) {
last_writer->status = status;
// we need to read link_older before calling SetState, because as soon
// as it is marked committed the other thread's Await may return and
// deallocate the Writer.
auto next = last_writer->link_older;
SetState(last_writer, STATE_COMPLETED);
last_writer = next;
if (enable_pipelined_write_) {
// Notify writers don't write to memtable to exit.
for (Writer* w = last_writer; w != leader;) {
Writer* next = w->link_older;
w->status = status;
if (!w->ShouldWriteToMemtable()) {
CompleteFollower(w, write_group);
}
w = next;
}
if (!leader->ShouldWriteToMemtable()) {
CompleteLeader(write_group);
}
// Link the ramaining of the group to memtable writer list.
if (write_group.size > 0) {
if (LinkGroup(write_group, &newest_memtable_writer_)) {
// The leader can now be different from current writer.
SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
}
}
// Reset newest_writer_ and wake up the next leader.
Writer* newest_writer = last_writer;
if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
Writer* next_leader = newest_writer;
while (next_leader->link_older != last_writer) {
next_leader = next_leader->link_older;
assert(next_leader != nullptr);
}
next_leader->link_older = nullptr;
SetState(next_leader, STATE_GROUP_LEADER);
}
AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&ctx);
} else {
Writer* head = newest_writer_.load(std::memory_order_acquire);
if (head != last_writer ||
!newest_writer_.compare_exchange_strong(head, nullptr)) {
// Either w wasn't the head during the load(), or it was the head
// during the load() but somebody else pushed onto the list before
// we did the compare_exchange_strong (causing it to fail). In the
// latter case compare_exchange_strong has the effect of re-reading
// its first param (head). No need to retry a failing CAS, because
// only a departing leader (which we are at the moment) can remove
// nodes from the list.
assert(head != last_writer);
// After walking link_older starting from head (if not already done)
// we will be able to traverse w->link_newer below. This function
// can only be called from an active leader, only a leader can
// clear newest_writer_, we didn't, and only a clear newest_writer_
// could cause the next leader to start their work without a call
// to MarkJoined, so we can definitely conclude that no other leader
// work is going on here (with or without db mutex).
CreateMissingNewerLinks(head);
assert(last_writer->link_newer->link_older == last_writer);
last_writer->link_newer->link_older = nullptr;
// Next leader didn't self-identify, because newest_writer_ wasn't
// nullptr when they enqueued (we were definitely enqueued before them
// and are still in the list). That means leader handoff occurs when
// we call MarkJoined
SetState(last_writer->link_newer, STATE_GROUP_LEADER);
}
// else nobody else was waiting, although there might already be a new
// leader now
while (last_writer != leader) {
last_writer->status = status;
// we need to read link_older before calling SetState, because as soon
// as it is marked committed the other thread's Await may return and
// deallocate the Writer.
auto next = last_writer->link_older;
SetState(last_writer, STATE_COMPLETED);
last_writer = next;
}
}
}
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
static AdaptationContext ctx("EnterUnbatched");
assert(w->batch == nullptr);
bool linked_as_leader;
LinkOne(w, &linked_as_leader);
assert(w != nullptr && w->batch == nullptr);
mu->Unlock();
bool linked_as_leader = LinkOne(w, &newest_writer_);
if (!linked_as_leader) {
mu->Unlock();
TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
// Last leader will not pick us as a follower since our batch is nullptr
AwaitState(w, STATE_GROUP_LEADER, &ctx);
mu->Lock();
}
if (enable_pipelined_write_) {
WaitForMemTableWriters();
}
mu->Lock();
}
void WriteThread::ExitUnbatched(Writer* w) {
Status dummy_status;
ExitAsBatchGroupLeader(w, w, dummy_status);
assert(w != nullptr);
Writer* newest_writer = w;
if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
CreateMissingNewerLinks(newest_writer);
Writer* next_leader = w->link_newer;
assert(next_leader != nullptr);
next_leader->link_older = nullptr;
SetState(next_leader, STATE_GROUP_LEADER);
}
}
void WriteThread::WaitForMemTableWriters() {
static AdaptationContext ctx("WaitForMemTableWriters");
assert(enable_pipelined_write_);
if (newest_memtable_writer_.load() == nullptr) {
return;
}
Writer w;
if (!LinkOne(&w, &newest_memtable_writer_)) {
AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &ctx);
}
newest_memtable_writer_.store(nullptr);
}
} // namespace rocksdb

@ -49,32 +49,66 @@ class WriteThread {
// the leader to STATE_COMPLETED.
STATE_GROUP_LEADER = 2,
// A Writer that has returned as a follower in a parallel group.
// It should apply its batch to the memtable and then call
// CompleteParallelWorker. When someone calls ExitAsBatchGroupLeader
// or EarlyExitParallelGroup this state will get transitioned to
// STATE_COMPLETED.
STATE_PARALLEL_FOLLOWER = 4,
// The state used to inform a waiting writer that it has become the
// leader of memtable writer group. The leader will either write
// memtable for the whole group, or launch a parallel group write
// to memtable by calling LaunchParallelMemTableWrite.
STATE_MEMTABLE_WRITER_LEADER = 4,
// The state used to inform a waiting writer that it has become a
// parallel memtable writer. It can be the group leader who launch the
// praallel writer group, or one of the followers. The writer should then
// apply its batch to the memtable concurrently and call
// CompleteParallelMemTableWriter.
STATE_PARALLEL_MEMTABLE_WRITER = 8,
// A follower whose writes have been applied, or a parallel leader
// whose followers have all finished their work. This is a terminal
// state.
STATE_COMPLETED = 8,
STATE_COMPLETED = 16,
// A state indicating that the thread may be waiting using StateMutex()
// and StateCondVar()
STATE_LOCKED_WAITING = 16,
STATE_LOCKED_WAITING = 32,
};
struct Writer;
struct ParallelGroup {
Writer* leader;
Writer* last_writer;
struct WriteGroup {
Writer* leader = nullptr;
Writer* last_writer = nullptr;
SequenceNumber last_sequence;
// before running goes to zero, status needs leader->StateMutex()
Status status;
std::atomic<uint32_t> running;
std::atomic<size_t> running;
size_t size = 0;
struct Iterator {
Writer* writer;
Writer* last_writer;
explicit Iterator(Writer* w, Writer* last)
: writer(w), last_writer(last) {}
Writer* operator*() const { return writer; }
Iterator& operator++() {
assert(writer != nullptr);
if (writer == last_writer) {
writer = nullptr;
} else {
writer = writer->link_newer;
}
return *this;
}
bool operator!=(const Iterator& other) const {
return writer != other.writer;
}
};
Iterator begin() const { return Iterator(leader, last_writer); }
Iterator end() const { return Iterator(nullptr, nullptr); }
};
// Information kept for every waiting writer.
@ -86,11 +120,10 @@ class WriteThread {
bool disable_memtable;
uint64_t log_used; // log number that this batch was inserted into
uint64_t log_ref; // log number that memtable insert should reference
bool in_batch_group;
WriteCallback* callback;
bool made_waitable; // records lazy construction of mutex and cv
std::atomic<uint8_t> state; // write under StateMutex() or pre-link
ParallelGroup* parallel_group;
WriteGroup* write_group;
SequenceNumber sequence; // the sequence number to use for the first key
Status status; // status of memtable inserter
Status callback_status; // status returned by callback->Callback()
@ -107,11 +140,10 @@ class WriteThread {
disable_memtable(false),
log_used(0),
log_ref(0),
in_batch_group(false),
callback(nullptr),
made_waitable(false),
state(STATE_INIT),
parallel_group(nullptr),
write_group(nullptr),
link_older(nullptr),
link_newer(nullptr) {}
@ -124,11 +156,10 @@ class WriteThread {
disable_memtable(_disable_memtable),
log_used(0),
log_ref(_log_ref),
in_batch_group(false),
callback(_callback),
made_waitable(false),
state(STATE_INIT),
parallel_group(nullptr),
write_group(nullptr),
link_older(nullptr),
link_newer(nullptr) {}
@ -182,10 +213,12 @@ class WriteThread {
}
bool ShouldWriteToMemtable() {
return !CallbackFailed() && !disable_memtable;
return status.ok() && !CallbackFailed() && !disable_memtable;
}
bool ShouldWriteToWAL() { return !CallbackFailed() && !disable_wal; }
bool ShouldWriteToWAL() {
return status.ok() && !CallbackFailed() && !disable_wal;
}
// No other mutexes may be acquired while holding StateMutex(), it is
// always last in the order
@ -201,7 +234,16 @@ class WriteThread {
}
};
WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec);
struct AdaptationContext {
const char* name;
std::atomic<int32_t> value;
explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
};
explicit WriteThread(const ImmutableDBOptions& db_options);
virtual ~WriteThread() = default;
// IMPORTANT: None of the methods in this class rely on the db mutex
// for correctness. All of the methods except JoinBatchGroup and
@ -226,40 +268,45 @@ class WriteThread {
// Constructs a write batch group led by leader, which should be a
// Writer passed to JoinBatchGroup on the current thread.
//
// Writer* leader: Writer that is STATE_GROUP_LEADER
// Writer** last_writer: Out-param that identifies the last follower
// autovector<WriteBatch*>* write_batch_group: Out-param of group members
// returns: Total batch group byte size
size_t EnterAsBatchGroupLeader(
Writer* leader, Writer** last_writer,
autovector<WriteThread::Writer*>* write_batch_group);
// Writer* leader: Writer that is STATE_GROUP_LEADER
// WriteGroup* write_group: Out-param of group members
// returns: Total batch group byte size
size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);
// Unlinks the Writer-s in a batch group, wakes up the non-leaders,
// and wakes up the next leader (if any).
//
// WriteGroup* write_group: the write group
// Status status: Status of write operation
void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status);
// Exit batch group on behalf of batch group leader.
void ExitAsBatchGroupFollower(Writer* w);
// Constructs a write batch group led by leader from newest_memtable_writers_
// list. The leader should either write memtable for the whole group and
// call ExitAsMemTableWriter, or launch parallel memtable write through
// LaunchParallelMemTableWriters.
void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_grup);
// Memtable writer group leader, or the last finished writer in a parallel
// write group, exit from the newest_memtable_writers_ list, and wake up
// the next leader if needed.
void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);
// Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the
// non-leader members of this write batch group. Sets Writer::sequence
// before waking them up.
//
// ParallalGroup* pg: Extra state used to coordinate the parallel add
// SequenceNumber sequence: Starting sequence number to assign to Writer-s
void LaunchParallelFollowers(ParallelGroup* pg, SequenceNumber sequence);
// WriteGroup* write_group: Extra state used to coordinate the parallel add
void LaunchParallelMemTableWriters(WriteGroup* write_group);
// Reports the completion of w's batch to the parallel group leader, and
// waits for the rest of the parallel batch to complete. Returns true
// if this thread is the last to complete, and hence should advance
// the sequence number and then call EarlyExitParallelGroup, false if
// someone else has already taken responsibility for that.
bool CompleteParallelWorker(Writer* w);
// Exit batch group on behalf of batch group leader.
void ExitAsBatchGroupFollower(Writer* w);
// Unlinks the Writer-s in a batch group, wakes up the non-leaders,
// and wakes up the next leader (if any).
//
// Writer* leader: From EnterAsBatchGroupLeader
// Writer* last_writer: Value of out-param of EnterAsBatchGroupLeader
// Status status: Status of write operation
void ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
Status status);
bool CompleteParallelMemTableWriter(Writer* w);
// Waits for all preceding writers (unlocking mu while waiting), then
// registers w as the currently proceeding writer.
@ -273,21 +320,40 @@ class WriteThread {
// writers.
void ExitUnbatched(Writer* w);
struct AdaptationContext {
const char* name;
std::atomic<int32_t> value;
// Wait for all parallel memtable writers to finish, in case pipelined
// write is enabled.
void WaitForMemTableWriters();
explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
};
SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
if (sequence > last_sequence_) {
last_sequence_ = sequence;
}
return last_sequence_;
}
private:
uint64_t max_yield_usec_;
uint64_t slow_yield_usec_;
// See AwaitState.
const uint64_t max_yield_usec_;
const uint64_t slow_yield_usec_;
// Allow multiple writers write to memtable concurrently.
const bool allow_concurrent_memtable_write_;
// Enable pipelined write to WAL and memtable.
const bool enable_pipelined_write_;
// Points to the newest pending Writer. Only leader can remove
// elements, adding can be done lock-free by anybody
// Points to the newest pending writer. Only leader can remove
// elements, adding can be done lock-free by anybody.
std::atomic<Writer*> newest_writer_;
// Points to the newest pending memtable writer. Used only when pipelined
// write is enabled.
std::atomic<Writer*> newest_memtable_writer_;
// The last sequence that have been consumed by a writer. The sequence
// is not necessary visible to reads because the writer can be ongoing.
SequenceNumber last_sequence_;
// Waits for w->state & goal_mask using w->StateMutex(). Returns
// the state that satisfies goal_mask.
uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);
@ -298,16 +364,30 @@ class WriteThread {
// a context-dependent static.
uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);
// Set writer state and wake the writer up if it is waiting.
void SetState(Writer* w, uint8_t new_state);
// Links w into the newest_writer_ list. Sets *linked_as_leader to
// true if w was linked directly into the leader position. Safe to
// call from multiple threads without external locking.
void LinkOne(Writer* w, bool* linked_as_leader);
// Links w into the newest_writer list. Return true if w was linked directly
// into the leader position. Safe to call from multiple threads without
// external locking.
bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);
// Link write group into the newest_writer list as a whole, while keeping the
// order of the writers unchanged. Return true if the group was linked
// directly into the leader position.
bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);
// Computes any missing link_newer links. Should not be called
// concurrently with itself.
void CreateMissingNewerLinks(Writer* head);
// Set the leader in write_group to completed state and remove it from the
// write group.
void CompleteLeader(WriteGroup& write_group);
// Set a follower in write_group to completed state and remove it from the
// write group.
void CompleteFollower(Writer* w, WriteGroup& write_group);
};
} // namespace rocksdb

@ -751,6 +751,21 @@ struct DBOptions {
// Default: 16MB/s
uint64_t delayed_write_rate = 16 * 1024U * 1024U;
// By default, a single write thread queue is maintained. The thread gets
// to the head of the queue becomes write batch group leader and responsible
// for writing to WAL and memtable for the batch group.
//
// If enable_pipelined_write is true, separate write thread queue is
// maintained for WAL write and memtable write. A write thread first enter WAL
// writer queue and then memtable writer queue. Pending thread on the WAL
// writer queue thus only have to wait for previous writers to finish thier
// WAL writing but not the memtable writing. Enabling the feature may improve
// write throughput and reduce latency of the prepare phase of two-phase
// commit.
//
// Default: false
bool enable_pipelined_write = false;
// If true, allow multi-writers to update mem tables in parallel.
// Only some memtable_factory-s support concurrent writes; currently it
// is implemented only for SkipListFactory. Concurrent memtable writes

@ -70,6 +70,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
wal_bytes_per_sync(options.wal_bytes_per_sync),
listeners(options.listeners),
enable_thread_tracking(options.enable_thread_tracking),
enable_pipelined_write(options.enable_pipelined_write),
allow_concurrent_memtable_write(options.allow_concurrent_memtable_write),
enable_write_thread_adaptive_yield(
options.enable_write_thread_adaptive_yield),
@ -189,6 +190,8 @@ void ImmutableDBOptions::Dump(Logger* log) const {
wal_recovery_mode);
ROCKS_LOG_HEADER(log, " Options.enable_thread_tracking: %d",
enable_thread_tracking);
ROCKS_LOG_HEADER(log, " Options.enable_pipelined_write: %d",
enable_pipelined_write);
ROCKS_LOG_HEADER(log, " Options.allow_concurrent_memtable_write: %d",
allow_concurrent_memtable_write);
ROCKS_LOG_HEADER(log, " Options.enable_write_thread_adaptive_yield: %d",

@ -63,6 +63,7 @@ struct ImmutableDBOptions {
uint64_t wal_bytes_per_sync;
std::vector<std::shared_ptr<EventListener>> listeners;
bool enable_thread_tracking;
bool enable_pipelined_write;
bool allow_concurrent_memtable_write;
bool enable_write_thread_adaptive_yield;
uint64_t write_thread_max_yield_usec;

@ -176,6 +176,7 @@ DBOptions::DBOptions(const Options& options)
listeners(options.listeners),
enable_thread_tracking(options.enable_thread_tracking),
delayed_write_rate(options.delayed_write_rate),
enable_pipelined_write(options.enable_pipelined_write),
allow_concurrent_memtable_write(options.allow_concurrent_memtable_write),
enable_write_thread_adaptive_yield(
options.enable_write_thread_adaptive_yield),

@ -305,6 +305,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"fail_if_options_file_error",
{offsetof(struct DBOptions, fail_if_options_file_error),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
{"enable_pipelined_write",
{offsetof(struct DBOptions, enable_pipelined_write), OptionType::kBoolean,
OptionVerificationType::kNormal, false, 0}},
{"allow_concurrent_memtable_write",
{offsetof(struct DBOptions, allow_concurrent_memtable_write),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},

@ -281,6 +281,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"random_access_max_buffer_size=1048576;"
"advise_random_on_open=true;"
"fail_if_options_file_error=false;"
"enable_pipelined_write=false;"
"allow_concurrent_memtable_write=true;"
"wal_recovery_mode=kPointInTimeRecovery;"
"enable_write_thread_adaptive_yield=true;"

Loading…
Cancel
Save