From 07bdcb91fe1fb6fcbed885580dcded8f5eac36f5 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 19 May 2017 14:24:23 -0700 Subject: [PATCH] New WriteImpl to pipeline WAL/memtable write Summary: PipelineWriteImpl is an alternative approach to WriteImpl. In WriteImpl, only one thread is allow to write at the same time. This thread will do both WAL and memtable writes for all write threads in the write group. Pending writers wait in queue until the current writer finishes. In the pipeline write approach, two queue is maintained: one WAL writer queue and one memtable writer queue. All writers (regardless of whether they need to write WAL) will still need to first join the WAL writer queue, and after the house keeping work and WAL writing, they will need to join memtable writer queue if needed. The benefit of this approach is that 1. Writers without memtable writes (e.g. the prepare phase of two phase commit) can exit write thread once WAL write is finish. They don't need to wait for memtable writes in case of group commit. 2. Pending writers only need to wait for previous WAL writer finish to be able to join the write thread, instead of wait also for previous memtable writes. Merging #2056 and #2058 into this PR. Closes https://github.com/facebook/rocksdb/pull/2286 Differential Revision: D5054606 Pulled By: yiwu-arbug fbshipit-source-id: ee5b11efd19d3e39d6b7210937b11cefdd4d1c8d --- HISTORY.md | 1 + db/db_impl.cc | 5 +- db/db_impl.h | 19 +- db/db_impl_write.cc | 240 ++++++++++++++---- db/flush_scheduler.cc | 28 ++- db/write_batch.cc | 15 +- db/write_batch_internal.h | 2 +- db/write_callback_test.cc | 280 +++++++++++---------- db/write_thread.cc | 414 +++++++++++++++++++++++-------- db/write_thread.h | 194 ++++++++++----- include/rocksdb/options.h | 15 ++ options/db_options.cc | 3 + options/db_options.h | 1 + options/options.cc | 1 + options/options_helper.h | 3 + options/options_settable_test.cc | 1 + 16 files changed, 842 insertions(+), 380 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index d5a6cd9b4..51133dc34 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ * Add debugging function `GetAllKeyVersions` to see internal versions of a range of keys. * Support file ingestion with universal compaction style * Support file ingestion behind with option `allow_ingest_behind` +* New option enable_pipelined_write which may improve write throughput in case writing from multiple threads and WAL enabled . ## 5.4.0 (04/11/2017) ### Public API Change diff --git a/db/db_impl.cc b/db/db_impl.cc index 45467c6ff..765e9d29c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -159,10 +159,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) max_total_in_memory_state_(0), is_snapshot_supported_(true), write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), - write_thread_(immutable_db_options_.enable_write_thread_adaptive_yield - ? immutable_db_options_.write_thread_max_yield_usec - : 0, - immutable_db_options_.write_thread_slow_yield_usec), + write_thread_(immutable_db_options_), write_controller_(mutable_db_options_.delayed_write_rate), last_batch_group_size_(0), unscheduled_flushes_(0), diff --git a/db/db_impl.h b/db/db_impl.h index 37aaf1ca5..68da91491 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -607,6 +607,11 @@ class DBImpl : public DB { uint64_t* log_used = nullptr, uint64_t log_ref = 0, bool disable_memtable = false); + Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates, + WriteCallback* callback = nullptr, + uint64_t* log_used = nullptr, uint64_t log_ref = 0, + bool disable_memtable = false); + uint64_t FindMinLogContainingOutstandingPrep(); uint64_t FindMinPrepLogReferencedByMemTable(); @@ -726,16 +731,18 @@ class DBImpl : public DB { Status HandleWriteBufferFull(WriteContext* write_context); // REQUIRES: mutex locked - Status PreprocessWrite(const WriteOptions& write_options, bool need_log_sync, - bool* logs_getting_syned, WriteContext* write_context); + Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync, + WriteContext* write_context); - Status WriteToWAL(const autovector& write_group, + Status WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, bool need_log_sync, bool need_log_dir_sync, SequenceNumber sequence); - // Used by WriteImpl to update bg_error_ when encountering memtable insert - // error. - void UpdateBackgroundError(const Status& memtable_insert_status); + // Used by WriteImpl to update bg_error_ if paranoid check is enabled. + void ParanoidCheck(const Status& status); + + // Used by WriteImpl to update bg_error_ in case of memtable insert error. + void MemTableInsertStatusCheck(const Status& memtable_insert_status); #ifndef ROCKSDB_LITE diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 0ee368613..20b6efd09 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -66,6 +66,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::Corruption("Batch is nullptr!"); } + if (immutable_db_options_.enable_pipelined_write) { + return PipelinedWriteImpl(write_options, my_batch, callback, log_used, + log_ref, disable_memtable); + } + Status status; PERF_TIMER_GUARD(write_pre_and_post_process_time); @@ -79,7 +84,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); write_thread_.JoinBatchGroup(&w); - if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) { + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { // we are a non-leader in a parallel group PERF_TIMER_GUARD(write_memtable_time); @@ -93,11 +98,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, true /*concurrent_memtable_writes*/); } - if (write_thread_.CompleteParallelWorker(&w)) { + if (write_thread_.CompleteParallelMemTableWriter(&w)) { // we're responsible for exit batch group - auto last_sequence = w.parallel_group->last_sequence; + auto last_sequence = w.write_group->last_sequence; versions_->SetLastSequence(last_sequence); - UpdateBackgroundError(w.status); + MemTableInsertStatusCheck(w.status); write_thread_.ExitAsBatchGroupFollower(&w); } assert(w.state == WriteThread::STATE_COMPLETED); @@ -120,10 +125,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // when it finds suitable, and finish them in the same write batch. // This is how a write job could be done by the other writer. WriteContext write_context; - WriteThread::Writer* last_writer = &w; // Dummy intial value - autovector write_group; - WriteThread::ParallelGroup pg; - bool logs_getting_synced = false; + WriteThread::WriteGroup write_group; bool in_parallel_group = false; uint64_t last_sequence = versions_->LastSequence(); @@ -131,8 +133,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_dir_sync = need_log_sync && !log_dir_synced_; - status = PreprocessWrite(write_options, need_log_sync, &logs_getting_synced, - &write_context); + status = PreprocessWrite(write_options, &need_log_sync, &write_context); log::Writer* cur_log_writer = logs_.back().writer; mutex_.Unlock(); @@ -143,7 +144,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // into memtables last_batch_group_size_ = - write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group); + write_thread_.EnterAsBatchGroupLeader(&w, &write_group); if (status.ok()) { // Rules for when we can update the memtable concurrently @@ -158,10 +159,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // relax rules 2 if we could prevent write batches from referring // more than once to a particular key. bool parallel = immutable_db_options_.allow_concurrent_memtable_write && - write_group.size() > 1; + write_group.size > 1; int total_count = 0; uint64_t total_byte_size = 0; - for (auto writer : write_group) { + for (auto* writer : write_group) { if (writer->CheckCallback(this)) { if (writer->ShouldWriteToMemtable()) { total_count += WriteBatchInternal::Count(writer->batch); @@ -187,7 +188,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, RecordTick(stats_, BYTES_WRITTEN, total_byte_size); stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); RecordTick(stats_, WRITE_DONE_BY_SELF); - auto write_done_by_other = write_group.size() - 1; + auto write_done_by_other = write_group.size - 1; if (write_done_by_other > 0) { stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other); @@ -219,12 +220,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*recovery_log_number*/, this); } else { - pg.leader = &w; - pg.last_writer = last_writer; - pg.last_sequence = last_sequence; - pg.running.store(static_cast(write_group.size()), - std::memory_order_relaxed); - write_thread_.LaunchParallelFollowers(&pg, current_sequence); + SequenceNumber next_sequence = current_sequence; + for (auto* writer : write_group) { + if (writer->ShouldWriteToMemtable()) { + writer->sequence = next_sequence; + next_sequence += WriteBatchInternal::Count(writer->batch); + } + } + write_group.last_sequence = last_sequence; + write_group.running.store(static_cast(write_group.size), + std::memory_order_relaxed); + write_thread_.LaunchParallelMemTableWriters(&write_group); in_parallel_group = true; // Each parallel follower is doing each own writes. The leader should @@ -244,19 +250,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } PERF_TIMER_START(write_pre_and_post_process_time); - // - // Is setting bg_error_ enough here? This will at least stop - // compaction and fail any further writes. - if (immutable_db_options_.paranoid_checks && !status.ok() && - !w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) { - mutex_.Lock(); - if (bg_error_.ok()) { - bg_error_ = status; // stop compaction & fail any further writes - } - mutex_.Unlock(); + if (!w.CallbackFailed()) { + ParanoidCheck(status); } - if (logs_getting_synced) { + if (need_log_sync) { mutex_.Lock(); MarkLogsSynced(logfile_number_, need_log_dir_sync, status); mutex_.Unlock(); @@ -266,40 +264,180 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (in_parallel_group) { // CompleteParallelWorker returns true if this thread should // handle exit, false means somebody else did - should_exit_batch_group = write_thread_.CompleteParallelWorker(&w); + should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w); } if (should_exit_batch_group) { versions_->SetLastSequence(last_sequence); - UpdateBackgroundError(w.status); - write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status); + MemTableInsertStatusCheck(w.status); + write_thread_.ExitAsBatchGroupLeader(write_group, w.status); } if (status.ok()) { status = w.FinalStatus(); } - return status; } -void DBImpl::UpdateBackgroundError(const Status& memtable_insert_status) { +Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, + uint64_t* log_used, uint64_t log_ref, + bool disable_memtable) { + PERF_TIMER_GUARD(write_pre_and_post_process_time); + StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); + + WriteContext write_context; + + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + disable_memtable); + write_thread_.JoinBatchGroup(&w); + if (w.state == WriteThread::STATE_GROUP_LEADER) { + WriteThread::WriteGroup wal_write_group; + if (w.callback && !w.callback->AllowWriteBatching()) { + write_thread_.WaitForMemTableWriters(); + } + mutex_.Lock(); + bool need_log_sync = !write_options.disableWAL && write_options.sync; + bool need_log_dir_sync = need_log_sync && !log_dir_synced_; + w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); + log::Writer* cur_log_writer = logs_.back().writer; + mutex_.Unlock(); + + // This can set non-OK status if callback fail. + last_batch_group_size_ = + write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group); + const SequenceNumber current_sequence = + write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1; + size_t total_count = 0; + size_t total_byte_size = 0; + + if (w.status.ok()) { + SequenceNumber next_sequence = current_sequence; + for (auto writer : wal_write_group) { + if (writer->CheckCallback(this)) { + if (writer->ShouldWriteToMemtable()) { + writer->sequence = next_sequence; + size_t count = WriteBatchInternal::Count(writer->batch); + next_sequence += count; + total_count += count; + } + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); + } + } + if (w.disable_wal) { + has_unpersisted_data_.store(true, std::memory_order_relaxed); + } + write_thread_.UpdateLastSequence(current_sequence + total_count - 1); + } + + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + + PERF_TIMER_STOP(write_pre_and_post_process_time); + + if (w.ShouldWriteToWAL()) { + PERF_TIMER_GUARD(write_wal_time); + stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); + RecordTick(stats_, WRITE_DONE_BY_SELF, 1); + if (wal_write_group.size > 1) { + stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, + wal_write_group.size - 1); + RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); + } + w.status = WriteToWAL(wal_write_group, cur_log_writer, need_log_sync, + need_log_dir_sync, current_sequence); + } + + if (!w.CallbackFailed()) { + ParanoidCheck(w.status); + } + + if (need_log_sync) { + mutex_.Lock(); + MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status); + mutex_.Unlock(); + } + + write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); + } + + WriteThread::WriteGroup memtable_write_group; + if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { + PERF_TIMER_GUARD(write_memtable_time); + assert(w.status.ok()); + write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group); + if (memtable_write_group.size > 1 && + immutable_db_options_.allow_concurrent_memtable_write) { + write_thread_.LaunchParallelMemTableWriters(&memtable_write_group); + } else { + memtable_write_group.status = WriteBatchInternal::InsertInto( + memtable_write_group, w.sequence, column_family_memtables_.get(), + &flush_scheduler_, write_options.ignore_missing_column_families, + 0 /*log_number*/, this); + versions_->SetLastSequence(memtable_write_group.last_sequence); + write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); + } + } + + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { + assert(w.ShouldWriteToMemtable()); + WriteBatchInternal::SetSequence(w.batch, w.sequence); + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + w.status = WriteBatchInternal::InsertInto( + &w, &column_family_memtables, &flush_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + true /*concurrent_memtable_writes*/); + if (write_thread_.CompleteParallelMemTableWriter(&w)) { + MemTableInsertStatusCheck(w.status); + versions_->SetLastSequence(w.write_group->last_sequence); + write_thread_.ExitAsMemTableWriter(&w, *w.write_group); + } + } + + assert(w.state == WriteThread::STATE_COMPLETED); + if (log_used != nullptr) { + *log_used = w.log_used; + } + + return w.FinalStatus(); +} + +void DBImpl::ParanoidCheck(const Status& status) { + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. + if (immutable_db_options_.paranoid_checks && !status.ok() && + !status.IsBusy() && !status.IsIncomplete()) { + mutex_.Lock(); + if (bg_error_.ok()) { + bg_error_ = status; // stop compaction & fail any further writes + } + mutex_.Unlock(); + } +} + +void DBImpl::MemTableInsertStatusCheck(const Status& status) { // A non-OK status here indicates that the state implied by the // WAL has diverged from the in-memory state. This could be // because of a corrupt write_batch (very bad), or because the // client specified an invalid column family and didn't specify // ignore_missing_column_families. - if (!memtable_insert_status.ok()) { + if (!status.ok()) { mutex_.Lock(); assert(bg_error_.ok()); - bg_error_ = memtable_insert_status; + bg_error_ = status; mutex_.Unlock(); } } Status DBImpl::PreprocessWrite(const WriteOptions& write_options, - bool need_log_sync, bool* logs_getting_synced, + bool* need_log_sync, WriteContext* write_context) { mutex_.AssertHeld(); - assert(write_context != nullptr && logs_getting_synced != nullptr); + assert(write_context != nullptr && need_log_sync != nullptr); Status status; assert(!single_column_family_mode_ || @@ -336,7 +474,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, status = DelayWrite(last_batch_group_size_, write_options); } - if (status.ok() && need_log_sync) { + if (status.ok() && *need_log_sync) { // Wait until the parallel syncs are finished. Any sync process has to sync // the front log too so it is enough to check the status of front() // We do a while loop since log_sync_cv_ is signalled when any sync is @@ -356,26 +494,28 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, // actually write to the WAL log.getting_synced = true; } - *logs_getting_synced = true; + } else { + *need_log_sync = false; } return status; } -Status DBImpl::WriteToWAL(const autovector& write_group, +Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, bool need_log_sync, bool need_log_dir_sync, SequenceNumber sequence) { Status status; WriteBatch* merged_batch = nullptr; size_t write_with_wal = 0; - if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() && - write_group[0]->batch->GetWalTerminationPoint().is_cleared()) { + auto* leader = write_group.leader; + if (write_group.size == 1 && leader->ShouldWriteToWAL() && + leader->batch->GetWalTerminationPoint().is_cleared()) { // we simply write the first WriteBatch to WAL if the group only // contains one batch, that batch should be written to the WAL, // and the batch is not wanting to be truncated - merged_batch = write_group[0]->batch; - write_group[0]->log_used = logfile_number_; + merged_batch = leader->batch; + leader->log_used = logfile_number_; write_with_wal = 1; } else { // WAL needs all of the batches flattened into a single batch. @@ -643,6 +783,12 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log::Writer* new_log = nullptr; MemTable* new_mem = nullptr; + // In case of pipelined write is enabled, wait for all pending memtable + // writers. + if (immutable_db_options_.enable_pipelined_write) { + write_thread_.WaitForMemTableWriters(); + } + // Attempt to switch to a new memtable and trigger flush of old. // Do this without holding the dbmutex lock. assert(versions_->prev_log_number() == 0); diff --git a/db/flush_scheduler.cc b/db/flush_scheduler.cc index 08c29d598..eb24efb9c 100644 --- a/db/flush_scheduler.cc +++ b/db/flush_scheduler.cc @@ -15,11 +15,9 @@ namespace rocksdb { void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) { #ifndef NDEBUG - { - std::lock_guard lock(checking_mutex_); - assert(checking_set_.count(cfd) == 0); - checking_set_.insert(cfd); - } + std::lock_guard lock(checking_mutex_); + assert(checking_set_.count(cfd) == 0); + checking_set_.insert(cfd); #endif // NDEBUG cfd->Ref(); // Suppress false positive clang analyzer warnings. @@ -36,8 +34,11 @@ void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) { } ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { +#ifndef NDEBUG + std::lock_guard lock(checking_mutex_); +#endif // NDEBUG while (true) { - if (Empty()) { + if (head_.load(std::memory_order_relaxed) == nullptr) { return nullptr; } @@ -48,11 +49,9 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { delete node; #ifndef NDEBUG - { - auto iter = checking_set_.find(cfd); - assert(iter != checking_set_.end()); - checking_set_.erase(iter); - } + auto iter = checking_set_.find(cfd); + assert(iter != checking_set_.end()); + checking_set_.erase(iter); #endif // NDEBUG if (!cfd->IsDropped()) { @@ -68,8 +67,13 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { } bool FlushScheduler::Empty() { +#ifndef NDEBUG + std::lock_guard lock(checking_mutex_); +#endif // NDEBUG auto rv = head_.load(std::memory_order_relaxed) == nullptr; +#ifndef NDEBUG assert(rv == checking_set_.empty()); +#endif // NDEBUG return rv; } @@ -80,7 +84,7 @@ void FlushScheduler::Clear() { delete cfd; } } - assert(Empty()); + assert(head_.load(std::memory_order_relaxed) == nullptr); } } // namespace rocksdb diff --git a/db/write_batch.cc b/db/write_batch.cc index b1bb6b06c..0cc48b087 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1290,16 +1290,17 @@ public: // 2) During Write(), in a single-threaded write thread // 3) During Write(), in a concurrent context where memtables has been cloned // The reason is that it calls memtables->Seek(), which has a stateful cache -Status WriteBatchInternal::InsertInto( - const autovector& writers, SequenceNumber sequence, - ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, - bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db, - bool concurrent_memtable_writes) { +Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group, + SequenceNumber sequence, + ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, + bool ignore_missing_column_families, + uint64_t recovery_log_number, DB* db, + bool concurrent_memtable_writes) { MemTableInserter inserter(sequence, memtables, flush_scheduler, ignore_missing_column_families, recovery_log_number, db, concurrent_memtable_writes); - for (size_t i = 0; i < writers.size(); i++) { - auto w = writers[i]; + for (auto w : write_group) { if (!w->ShouldWriteToMemtable()) { continue; } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 97292a969..730edca75 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -153,7 +153,7 @@ class WriteBatchInternal { // // Under concurrent use, the caller is responsible for making sure that // the memtables object itself is thread-local. - static Status InsertInto(const autovector& batches, + static Status InsertInto(WriteThread::WriteGroup& write_group, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index 745135789..7e90a2fb8 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -119,161 +119,169 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { for (auto& allow_parallel : {true, false}) { for (auto& allow_batching : {true, false}) { for (auto& enable_WAL : {true, false}) { - for (auto& write_group : write_scenarios) { - Options options; - options.create_if_missing = true; - options.allow_concurrent_memtable_write = allow_parallel; - - ReadOptions read_options; - DB* db; - DBImpl* db_impl; - - DestroyDB(dbname, options); - ASSERT_OK(DB::Open(options, dbname, &db)); - - db_impl = dynamic_cast(db); - ASSERT_TRUE(db_impl); + for (auto& enable_pipelined_write : {true, false}) { + for (auto& write_group : write_scenarios) { + Options options; + options.create_if_missing = true; + options.allow_concurrent_memtable_write = allow_parallel; + options.enable_pipelined_write = enable_pipelined_write; + + ReadOptions read_options; + DB* db; + DBImpl* db_impl; + + DestroyDB(dbname, options); + ASSERT_OK(DB::Open(options, dbname, &db)); + + db_impl = dynamic_cast(db); + ASSERT_TRUE(db_impl); + + std::atomic threads_waiting(0); + std::atomic seq(db_impl->GetLatestSequenceNumber()); + ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + uint64_t cur_threads_waiting = 0; + bool is_leader = false; + bool is_last = false; + + // who am i + do { + cur_threads_waiting = threads_waiting.load(); + is_leader = (cur_threads_waiting == 0); + is_last = (cur_threads_waiting == write_group.size() - 1); + } while (!threads_waiting.compare_exchange_strong( + cur_threads_waiting, cur_threads_waiting + 1)); + + // check my state + auto* writer = reinterpret_cast(arg); + + if (is_leader) { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_INIT); + } + + // (meta test) the first WriteOP should indeed be the first + // and the last should be the last (all others can be out of + // order) + if (is_leader) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.front().callback_.should_fail_); + } else if (is_last) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.back().callback_.should_fail_); + } + + // wait for friends + while (threads_waiting.load() < write_group.size()) { + } + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { + // check my state + auto* writer = reinterpret_cast(arg); + + if (!allow_batching) { + // no batching so everyone should be a leader + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else if (!allow_parallel) { + ASSERT_TRUE( + writer->state == WriteThread::State::STATE_COMPLETED || + (enable_pipelined_write && + writer->state == + WriteThread::State::STATE_MEMTABLE_WRITER_LEADER)); + } + }); + + std::atomic thread_num(0); + std::atomic dummy_key(0); + std::function write_with_callback_func = [&]() { + uint32_t i = thread_num.fetch_add(1); + Random rnd(i); + + // leaders gotta lead + while (i > 0 && threads_waiting.load() < 1) { + } - std::atomic threads_waiting(0); - std::atomic seq(db_impl->GetLatestSequenceNumber()); - ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); + // loser has to lose + while (i == write_group.size() - 1 && + threads_waiting.load() < write_group.size() - 1) { + } - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { - uint64_t cur_threads_waiting = 0; - bool is_leader = false; - bool is_last = false; + auto& write_op = write_group.at(i); + write_op.Clear(); + write_op.callback_.allow_batching_ = allow_batching; - // who am i + // insert some keys + for (uint32_t j = 0; j < rnd.Next() % 50; j++) { + // grab unique key + char my_key = 0; do { - cur_threads_waiting = threads_waiting.load(); - is_leader = (cur_threads_waiting == 0); - is_last = (cur_threads_waiting == write_group.size() - 1); - } while (!threads_waiting.compare_exchange_strong( - cur_threads_waiting, cur_threads_waiting + 1)); - - // check my state - auto* writer = reinterpret_cast(arg); - - if (is_leader) { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else { - ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT); - } + my_key = dummy_key.load(); + } while ( + !dummy_key.compare_exchange_strong(my_key, my_key + 1)); - // (meta test) the first WriteOP should indeed be the first - // and the last should be the last (all others can be out of - // order) - if (is_leader) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.front().callback_.should_fail_); - } else if (is_last) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.back().callback_.should_fail_); - } + string skey(5, my_key); + string sval(10, my_key); + write_op.Put(skey, sval); - // wait for friends - while (threads_waiting.load() < write_group.size()) { + if (!write_op.callback_.should_fail_) { + seq.fetch_add(1); } - }); - - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { - // check my state - auto* writer = reinterpret_cast(arg); - - if (!allow_batching) { - // no batching so everyone should be a leader - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else if (!allow_parallel) { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_COMPLETED); - } - }); - - std::atomic thread_num(0); - std::atomic dummy_key(0); - std::function write_with_callback_func = [&]() { - uint32_t i = thread_num.fetch_add(1); - Random rnd(i); - - // leaders gotta lead - while (i > 0 && threads_waiting.load() < 1) { - } - - // loser has to lose - while (i == write_group.size() - 1 && - threads_waiting.load() < write_group.size() - 1) { - } - - auto& write_op = write_group.at(i); - write_op.Clear(); - write_op.callback_.allow_batching_ = allow_batching; - - // insert some keys - for (uint32_t j = 0; j < rnd.Next() % 50; j++) { - // grab unique key - char my_key = 0; - do { - my_key = dummy_key.load(); - } while (!dummy_key.compare_exchange_strong(my_key, my_key + 1)); + } - string skey(5, my_key); - string sval(10, my_key); - write_op.Put(skey, sval); + WriteOptions woptions; + woptions.disableWAL = !enable_WAL; + woptions.sync = enable_WAL; + Status s = db_impl->WriteWithCallback( + woptions, &write_op.write_batch_, &write_op.callback_); - if (!write_op.callback_.should_fail_) { - seq.fetch_add(1); + if (write_op.callback_.should_fail_) { + ASSERT_TRUE(s.IsBusy()); + } else { + ASSERT_OK(s); } - } + }; - WriteOptions woptions; - woptions.disableWAL = !enable_WAL; - woptions.sync = enable_WAL; - Status s = db_impl->WriteWithCallback( - woptions, &write_op.write_batch_, &write_op.callback_); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - if (write_op.callback_.should_fail_) { - ASSERT_TRUE(s.IsBusy()); - } else { - ASSERT_OK(s); + // do all the writes + std::vector threads; + for (uint32_t i = 0; i < write_group.size(); i++) { + threads.emplace_back(write_with_callback_func); + } + for (auto& t : threads) { + t.join(); } - }; - - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - - // do all the writes - std::vector threads; - for (uint32_t i = 0; i < write_group.size(); i++) { - threads.emplace_back(write_with_callback_func); - } - for (auto& t : threads) { - t.join(); - } - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - // check for keys - string value; - for (auto& w : write_group) { - ASSERT_TRUE(w.callback_.was_called_); - for (auto& kvp : w.kvs_) { - if (w.callback_.should_fail_) { - ASSERT_TRUE( - db->Get(read_options, kvp.first, &value).IsNotFound()); - } else { - ASSERT_OK(db->Get(read_options, kvp.first, &value)); - ASSERT_EQ(value, kvp.second); + // check for keys + string value; + for (auto& w : write_group) { + ASSERT_TRUE(w.callback_.was_called_); + for (auto& kvp : w.kvs_) { + if (w.callback_.should_fail_) { + ASSERT_TRUE( + db->Get(read_options, kvp.first, &value).IsNotFound()); + } else { + ASSERT_OK(db->Get(read_options, kvp.first, &value)); + ASSERT_EQ(value, kvp.second); + } } } - } - ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); + ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); - delete db; - DestroyDB(dbname, options); + delete db; + DestroyDB(dbname, options); + } } } } diff --git a/db/write_thread.cc b/db/write_thread.cc index ca3f9b36d..0938ad28c 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -15,10 +15,17 @@ namespace rocksdb { -WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec) - : max_yield_usec_(max_yield_usec), - slow_yield_usec_(slow_yield_usec), - newest_writer_(nullptr) {} +WriteThread::WriteThread(const ImmutableDBOptions& db_options) + : max_yield_usec_(db_options.enable_write_thread_adaptive_yield + ? db_options.write_thread_max_yield_usec + : 0), + slow_yield_usec_(db_options.write_thread_slow_yield_usec), + allow_concurrent_memtable_write_( + db_options.allow_concurrent_memtable_write), + enable_pipelined_write_(db_options.enable_pipelined_write), + newest_writer_(nullptr), + newest_memtable_writer_(nullptr), + last_sequence_(0) {} uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) { // We're going to block. Lazily create the mutex. We guarantee @@ -184,22 +191,39 @@ void WriteThread::SetState(Writer* w, uint8_t new_state) { } } -void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) { +bool WriteThread::LinkOne(Writer* w, std::atomic* newest_writer) { + assert(newest_writer != nullptr); assert(w->state == STATE_INIT); - + Writer* writers = newest_writer->load(std::memory_order_relaxed); while (true) { - Writer* writers = newest_writer_.load(std::memory_order_relaxed); w->link_older = writers; - if (newest_writer_.compare_exchange_strong(writers, w)) { - if (writers == nullptr) { - // this isn't part of the WriteThread machinery, but helps with - // debugging and is checked by an assert in WriteImpl - w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed); - } - // Then we are the head of the queue and hence definiltly the leader - *linked_as_leader = (writers == nullptr); - // Otherwise we will wait for previous leader to define our status - return; + if (newest_writer->compare_exchange_weak(writers, w)) { + return (writers == nullptr); + } + } +} + +bool WriteThread::LinkGroup(WriteGroup& write_group, + std::atomic* newest_writer) { + assert(newest_writer != nullptr); + Writer* leader = write_group.leader; + Writer* last_writer = write_group.last_writer; + Writer* w = last_writer; + while (true) { + // Unset link_newer pointers to make sure when we call + // CreateMissingNewerLinks later it create all missing links. + w->link_newer = nullptr; + w->write_group = nullptr; + if (w == leader) { + break; + } + w = w->link_older; + } + Writer* newest = newest_writer->load(std::memory_order_relaxed); + while (true) { + leader->link_older = newest; + if (newest_writer->compare_exchange_weak(newest, last_writer)) { + return (newest == nullptr); } } } @@ -216,12 +240,43 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) { } } +void WriteThread::CompleteLeader(WriteGroup& write_group) { + assert(write_group.size > 0); + Writer* leader = write_group.leader; + if (write_group.size == 1) { + write_group.leader = nullptr; + write_group.last_writer = nullptr; + } else { + assert(leader->link_newer != nullptr); + leader->link_newer->link_older = nullptr; + write_group.leader = leader->link_newer; + } + write_group.size -= 1; + SetState(leader, STATE_COMPLETED); +} + +void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) { + assert(write_group.size > 1); + assert(w != write_group.leader); + if (w == write_group.last_writer) { + w->link_older->link_newer = nullptr; + write_group.last_writer = w->link_older; + } else { + w->link_older->link_newer = w->link_newer; + w->link_newer->link_older = w->link_older; + } + write_group.size -= 1; + SetState(w, STATE_COMPLETED); +} + void WriteThread::JoinBatchGroup(Writer* w) { static AdaptationContext ctx("JoinBatchGroup"); assert(w->batch != nullptr); - bool linked_as_leader; - LinkOne(w, &linked_as_leader); + bool linked_as_leader = LinkOne(w, &newest_writer_); + if (linked_as_leader) { + SetState(w, STATE_GROUP_LEADER); + } TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w); @@ -231,23 +286,28 @@ void WriteThread::JoinBatchGroup(Writer* w) { * 1) An existing leader pick us as the new leader when it finishes * 2) An existing leader pick us as its follewer and * 2.1) finishes the memtable writes on our behalf - * 2.2) Or tell us to finish the memtable writes it in pralallel + * 2.2) Or tell us to finish the memtable writes in pralallel + * 3) (pipelined write) An existing leader pick us as its follower and + * finish book-keeping and WAL write for us, enqueue us as pending + * memtable writer, and + * 3.1) we become memtable writer group leader, or + * 3.2) an existing memtable writer group leader tell us to finish memtable + * writes in parallel. */ - AwaitState(w, - STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED, + AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | + STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, &ctx); TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); } } -size_t WriteThread::EnterAsBatchGroupLeader( - Writer* leader, WriteThread::Writer** last_writer, - autovector* write_batch_group) { +size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, + WriteGroup* write_group) { assert(leader->link_older == nullptr); assert(leader->batch != nullptr); + assert(write_group != nullptr); size_t size = WriteBatchInternal::ByteSize(leader->batch); - write_batch_group->push_back(leader); // Allow the group to grow up to a maximum size, but if the // original write is small, limit the growth so we do not slow @@ -257,8 +317,10 @@ size_t WriteThread::EnterAsBatchGroupLeader( max_size = size + (128 << 10); } - *last_writer = leader; - + leader->write_group = write_group; + write_group->leader = leader; + write_group->last_writer = leader; + write_group->size = 1; Writer* newest_writer = newest_writer_.load(std::memory_order_acquire); // This is safe regardless of any db mutex status of the caller. Previous @@ -308,136 +370,268 @@ size_t WriteThread::EnterAsBatchGroupLeader( break; } + w->write_group = write_group; size += batch_size; - write_batch_group->push_back(w); - w->in_batch_group = true; - *last_writer = w; + write_group->last_writer = w; + write_group->size++; } return size; } -void WriteThread::LaunchParallelFollowers(ParallelGroup* pg, - SequenceNumber sequence) { - // EnterAsBatchGroupLeader already created the links from leader to - // newer writers in the group +void WriteThread::EnterAsMemTableWriter(Writer* leader, + WriteGroup* write_group) { + assert(leader != nullptr); + assert(leader->link_older == nullptr); + assert(leader->batch != nullptr); + assert(write_group != nullptr); + + size_t size = WriteBatchInternal::ByteSize(leader->batch); + + // Allow the group to grow up to a maximum size, but if the + // original write is small, limit the growth so we do not slow + // down the small write too much. + size_t max_size = 1 << 20; + if (size <= (128 << 10)) { + max_size = size + (128 << 10); + } + + leader->write_group = write_group; + write_group->leader = leader; + write_group->size = 1; + Writer* last_writer = leader; + + if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) { + Writer* newest_writer = newest_memtable_writer_.load(); + CreateMissingNewerLinks(newest_writer); + + Writer* w = leader; + while (w != newest_writer) { + w = w->link_newer; + + if (w->batch == nullptr) { + break; + } + + if (w->batch->HasMerge()) { + break; + } - pg->leader->parallel_group = pg; + if (!allow_concurrent_memtable_write_) { + auto batch_size = WriteBatchInternal::ByteSize(w->batch); + if (size + batch_size > max_size) { + // Do not make batch too big + break; + } + size += batch_size; + } - Writer* w = pg->leader; - w->sequence = sequence; + w->write_group = write_group; + last_writer = w; + write_group->size++; + } + } + + write_group->last_writer = last_writer; + write_group->last_sequence = + last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1; +} - // Initialize and wake up the others - while (w != pg->last_writer) { - // Writers that won't write don't get sequence allotment - if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) { - // There is a sequence number of each written key - sequence += WriteBatchInternal::Count(w->batch); +void WriteThread::ExitAsMemTableWriter(Writer* self, WriteGroup& write_group) { + Writer* leader = write_group.leader; + Writer* last_writer = write_group.last_writer; + + Writer* newest_writer = last_writer; + if (!newest_memtable_writer_.compare_exchange_strong(newest_writer, + nullptr)) { + CreateMissingNewerLinks(newest_writer); + Writer* next_leader = last_writer->link_newer; + assert(next_leader != nullptr); + next_leader->link_older = nullptr; + SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER); + } + Writer* w = leader; + while (true) { + if (!write_group.status.ok()) { + w->status = write_group.status; } - w = w->link_newer; + Writer* next = w->link_newer; + if (w != leader) { + SetState(w, STATE_COMPLETED); + } + if (w == last_writer) { + break; + } + w = next; + } + // Note that leader has to exit last, since it owns the write group. + SetState(leader, STATE_COMPLETED); +} - w->sequence = sequence; // sequence number for the first key in the batch - w->parallel_group = pg; - SetState(w, STATE_PARALLEL_FOLLOWER); +void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) { + assert(write_group != nullptr); + write_group->running.store(write_group->size); + for (auto w : *write_group) { + SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); } } // This method is called by both the leader and parallel followers -bool WriteThread::CompleteParallelWorker(Writer* w) { - static AdaptationContext ctx("CompleteParallelWorker"); +bool WriteThread::CompleteParallelMemTableWriter(Writer* w) { + static AdaptationContext ctx("CompleteParallelMemTableWriter"); - auto* pg = w->parallel_group; + auto* write_group = w->write_group; if (!w->status.ok()) { - std::lock_guard guard(pg->leader->StateMutex()); - pg->status = w->status; + std::lock_guard guard(write_group->leader->StateMutex()); + write_group->status = w->status; } - if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) { + if (write_group->running-- > 1) { // we're not the last one AwaitState(w, STATE_COMPLETED, &ctx); return false; } // else we're the last parallel worker and should perform exit duties. - w->status = pg->status; + w->status = write_group->status; return true; } void WriteThread::ExitAsBatchGroupFollower(Writer* w) { - auto* pg = w->parallel_group; + auto* write_group = w->write_group; - assert(w->state == STATE_PARALLEL_FOLLOWER); - assert(pg->status.ok()); - ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status); + assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER); + assert(write_group->status.ok()); + ExitAsBatchGroupLeader(*write_group, write_group->status); assert(w->status.ok()); assert(w->state == STATE_COMPLETED); - SetState(pg->leader, STATE_COMPLETED); + SetState(write_group->leader, STATE_COMPLETED); } -void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer, +void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, Status status) { + static AdaptationContext ctx("ExitAsBatchGroupLeader"); + Writer* leader = write_group.leader; + Writer* last_writer = write_group.last_writer; assert(leader->link_older == nullptr); - Writer* head = newest_writer_.load(std::memory_order_acquire); - if (head != last_writer || - !newest_writer_.compare_exchange_strong(head, nullptr)) { - // Either w wasn't the head during the load(), or it was the head - // during the load() but somebody else pushed onto the list before - // we did the compare_exchange_strong (causing it to fail). In the - // latter case compare_exchange_strong has the effect of re-reading - // its first param (head). No need to retry a failing CAS, because - // only a departing leader (which we are at the moment) can remove - // nodes from the list. - assert(head != last_writer); - - // After walking link_older starting from head (if not already done) - // we will be able to traverse w->link_newer below. This function - // can only be called from an active leader, only a leader can - // clear newest_writer_, we didn't, and only a clear newest_writer_ - // could cause the next leader to start their work without a call - // to MarkJoined, so we can definitely conclude that no other leader - // work is going on here (with or without db mutex). - CreateMissingNewerLinks(head); - assert(last_writer->link_newer->link_older == last_writer); - last_writer->link_newer->link_older = nullptr; - - // Next leader didn't self-identify, because newest_writer_ wasn't - // nullptr when they enqueued (we were definitely enqueued before them - // and are still in the list). That means leader handoff occurs when - // we call MarkJoined - SetState(last_writer->link_newer, STATE_GROUP_LEADER); - } - // else nobody else was waiting, although there might already be a new - // leader now - - while (last_writer != leader) { - last_writer->status = status; - // we need to read link_older before calling SetState, because as soon - // as it is marked committed the other thread's Await may return and - // deallocate the Writer. - auto next = last_writer->link_older; - SetState(last_writer, STATE_COMPLETED); - - last_writer = next; + if (enable_pipelined_write_) { + // Notify writers don't write to memtable to exit. + for (Writer* w = last_writer; w != leader;) { + Writer* next = w->link_older; + w->status = status; + if (!w->ShouldWriteToMemtable()) { + CompleteFollower(w, write_group); + } + w = next; + } + if (!leader->ShouldWriteToMemtable()) { + CompleteLeader(write_group); + } + // Link the ramaining of the group to memtable writer list. + if (write_group.size > 0) { + if (LinkGroup(write_group, &newest_memtable_writer_)) { + // The leader can now be different from current writer. + SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER); + } + } + // Reset newest_writer_ and wake up the next leader. + Writer* newest_writer = last_writer; + if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) { + Writer* next_leader = newest_writer; + while (next_leader->link_older != last_writer) { + next_leader = next_leader->link_older; + assert(next_leader != nullptr); + } + next_leader->link_older = nullptr; + SetState(next_leader, STATE_GROUP_LEADER); + } + AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER | + STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, + &ctx); + } else { + Writer* head = newest_writer_.load(std::memory_order_acquire); + if (head != last_writer || + !newest_writer_.compare_exchange_strong(head, nullptr)) { + // Either w wasn't the head during the load(), or it was the head + // during the load() but somebody else pushed onto the list before + // we did the compare_exchange_strong (causing it to fail). In the + // latter case compare_exchange_strong has the effect of re-reading + // its first param (head). No need to retry a failing CAS, because + // only a departing leader (which we are at the moment) can remove + // nodes from the list. + assert(head != last_writer); + + // After walking link_older starting from head (if not already done) + // we will be able to traverse w->link_newer below. This function + // can only be called from an active leader, only a leader can + // clear newest_writer_, we didn't, and only a clear newest_writer_ + // could cause the next leader to start their work without a call + // to MarkJoined, so we can definitely conclude that no other leader + // work is going on here (with or without db mutex). + CreateMissingNewerLinks(head); + assert(last_writer->link_newer->link_older == last_writer); + last_writer->link_newer->link_older = nullptr; + + // Next leader didn't self-identify, because newest_writer_ wasn't + // nullptr when they enqueued (we were definitely enqueued before them + // and are still in the list). That means leader handoff occurs when + // we call MarkJoined + SetState(last_writer->link_newer, STATE_GROUP_LEADER); + } + // else nobody else was waiting, although there might already be a new + // leader now + + while (last_writer != leader) { + last_writer->status = status; + // we need to read link_older before calling SetState, because as soon + // as it is marked committed the other thread's Await may return and + // deallocate the Writer. + auto next = last_writer->link_older; + SetState(last_writer, STATE_COMPLETED); + + last_writer = next; + } } } void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { static AdaptationContext ctx("EnterUnbatched"); - - assert(w->batch == nullptr); - bool linked_as_leader; - LinkOne(w, &linked_as_leader); + assert(w != nullptr && w->batch == nullptr); + mu->Unlock(); + bool linked_as_leader = LinkOne(w, &newest_writer_); if (!linked_as_leader) { - mu->Unlock(); TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait"); // Last leader will not pick us as a follower since our batch is nullptr AwaitState(w, STATE_GROUP_LEADER, &ctx); - mu->Lock(); } + if (enable_pipelined_write_) { + WaitForMemTableWriters(); + } + mu->Lock(); } void WriteThread::ExitUnbatched(Writer* w) { - Status dummy_status; - ExitAsBatchGroupLeader(w, w, dummy_status); + assert(w != nullptr); + Writer* newest_writer = w; + if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) { + CreateMissingNewerLinks(newest_writer); + Writer* next_leader = w->link_newer; + assert(next_leader != nullptr); + next_leader->link_older = nullptr; + SetState(next_leader, STATE_GROUP_LEADER); + } +} + +void WriteThread::WaitForMemTableWriters() { + static AdaptationContext ctx("WaitForMemTableWriters"); + assert(enable_pipelined_write_); + if (newest_memtable_writer_.load() == nullptr) { + return; + } + Writer w; + if (!LinkOne(&w, &newest_memtable_writer_)) { + AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &ctx); + } + newest_memtable_writer_.store(nullptr); } } // namespace rocksdb diff --git a/db/write_thread.h b/db/write_thread.h index a7dae3019..c72c95a11 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -49,32 +49,66 @@ class WriteThread { // the leader to STATE_COMPLETED. STATE_GROUP_LEADER = 2, - // A Writer that has returned as a follower in a parallel group. - // It should apply its batch to the memtable and then call - // CompleteParallelWorker. When someone calls ExitAsBatchGroupLeader - // or EarlyExitParallelGroup this state will get transitioned to - // STATE_COMPLETED. - STATE_PARALLEL_FOLLOWER = 4, + // The state used to inform a waiting writer that it has become the + // leader of memtable writer group. The leader will either write + // memtable for the whole group, or launch a parallel group write + // to memtable by calling LaunchParallelMemTableWrite. + STATE_MEMTABLE_WRITER_LEADER = 4, + + // The state used to inform a waiting writer that it has become a + // parallel memtable writer. It can be the group leader who launch the + // praallel writer group, or one of the followers. The writer should then + // apply its batch to the memtable concurrently and call + // CompleteParallelMemTableWriter. + STATE_PARALLEL_MEMTABLE_WRITER = 8, // A follower whose writes have been applied, or a parallel leader // whose followers have all finished their work. This is a terminal // state. - STATE_COMPLETED = 8, + STATE_COMPLETED = 16, // A state indicating that the thread may be waiting using StateMutex() // and StateCondVar() - STATE_LOCKED_WAITING = 16, + STATE_LOCKED_WAITING = 32, }; struct Writer; - struct ParallelGroup { - Writer* leader; - Writer* last_writer; + struct WriteGroup { + Writer* leader = nullptr; + Writer* last_writer = nullptr; SequenceNumber last_sequence; // before running goes to zero, status needs leader->StateMutex() Status status; - std::atomic running; + std::atomic running; + size_t size = 0; + + struct Iterator { + Writer* writer; + Writer* last_writer; + + explicit Iterator(Writer* w, Writer* last) + : writer(w), last_writer(last) {} + + Writer* operator*() const { return writer; } + + Iterator& operator++() { + assert(writer != nullptr); + if (writer == last_writer) { + writer = nullptr; + } else { + writer = writer->link_newer; + } + return *this; + } + + bool operator!=(const Iterator& other) const { + return writer != other.writer; + } + }; + + Iterator begin() const { return Iterator(leader, last_writer); } + Iterator end() const { return Iterator(nullptr, nullptr); } }; // Information kept for every waiting writer. @@ -86,11 +120,10 @@ class WriteThread { bool disable_memtable; uint64_t log_used; // log number that this batch was inserted into uint64_t log_ref; // log number that memtable insert should reference - bool in_batch_group; WriteCallback* callback; bool made_waitable; // records lazy construction of mutex and cv std::atomic state; // write under StateMutex() or pre-link - ParallelGroup* parallel_group; + WriteGroup* write_group; SequenceNumber sequence; // the sequence number to use for the first key Status status; // status of memtable inserter Status callback_status; // status returned by callback->Callback() @@ -107,11 +140,10 @@ class WriteThread { disable_memtable(false), log_used(0), log_ref(0), - in_batch_group(false), callback(nullptr), made_waitable(false), state(STATE_INIT), - parallel_group(nullptr), + write_group(nullptr), link_older(nullptr), link_newer(nullptr) {} @@ -124,11 +156,10 @@ class WriteThread { disable_memtable(_disable_memtable), log_used(0), log_ref(_log_ref), - in_batch_group(false), callback(_callback), made_waitable(false), state(STATE_INIT), - parallel_group(nullptr), + write_group(nullptr), link_older(nullptr), link_newer(nullptr) {} @@ -182,10 +213,12 @@ class WriteThread { } bool ShouldWriteToMemtable() { - return !CallbackFailed() && !disable_memtable; + return status.ok() && !CallbackFailed() && !disable_memtable; } - bool ShouldWriteToWAL() { return !CallbackFailed() && !disable_wal; } + bool ShouldWriteToWAL() { + return status.ok() && !CallbackFailed() && !disable_wal; + } // No other mutexes may be acquired while holding StateMutex(), it is // always last in the order @@ -201,7 +234,16 @@ class WriteThread { } }; - WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec); + struct AdaptationContext { + const char* name; + std::atomic value; + + explicit AdaptationContext(const char* name0) : name(name0), value(0) {} + }; + + explicit WriteThread(const ImmutableDBOptions& db_options); + + virtual ~WriteThread() = default; // IMPORTANT: None of the methods in this class rely on the db mutex // for correctness. All of the methods except JoinBatchGroup and @@ -226,40 +268,45 @@ class WriteThread { // Constructs a write batch group led by leader, which should be a // Writer passed to JoinBatchGroup on the current thread. // - // Writer* leader: Writer that is STATE_GROUP_LEADER - // Writer** last_writer: Out-param that identifies the last follower - // autovector* write_batch_group: Out-param of group members - // returns: Total batch group byte size - size_t EnterAsBatchGroupLeader( - Writer* leader, Writer** last_writer, - autovector* write_batch_group); + // Writer* leader: Writer that is STATE_GROUP_LEADER + // WriteGroup* write_group: Out-param of group members + // returns: Total batch group byte size + size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group); + + // Unlinks the Writer-s in a batch group, wakes up the non-leaders, + // and wakes up the next leader (if any). + // + // WriteGroup* write_group: the write group + // Status status: Status of write operation + void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status); + + // Exit batch group on behalf of batch group leader. + void ExitAsBatchGroupFollower(Writer* w); + + // Constructs a write batch group led by leader from newest_memtable_writers_ + // list. The leader should either write memtable for the whole group and + // call ExitAsMemTableWriter, or launch parallel memtable write through + // LaunchParallelMemTableWriters. + void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_grup); + + // Memtable writer group leader, or the last finished writer in a parallel + // write group, exit from the newest_memtable_writers_ list, and wake up + // the next leader if needed. + void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group); // Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the // non-leader members of this write batch group. Sets Writer::sequence // before waking them up. // - // ParallalGroup* pg: Extra state used to coordinate the parallel add - // SequenceNumber sequence: Starting sequence number to assign to Writer-s - void LaunchParallelFollowers(ParallelGroup* pg, SequenceNumber sequence); + // WriteGroup* write_group: Extra state used to coordinate the parallel add + void LaunchParallelMemTableWriters(WriteGroup* write_group); // Reports the completion of w's batch to the parallel group leader, and // waits for the rest of the parallel batch to complete. Returns true // if this thread is the last to complete, and hence should advance // the sequence number and then call EarlyExitParallelGroup, false if // someone else has already taken responsibility for that. - bool CompleteParallelWorker(Writer* w); - - // Exit batch group on behalf of batch group leader. - void ExitAsBatchGroupFollower(Writer* w); - - // Unlinks the Writer-s in a batch group, wakes up the non-leaders, - // and wakes up the next leader (if any). - // - // Writer* leader: From EnterAsBatchGroupLeader - // Writer* last_writer: Value of out-param of EnterAsBatchGroupLeader - // Status status: Status of write operation - void ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer, - Status status); + bool CompleteParallelMemTableWriter(Writer* w); // Waits for all preceding writers (unlocking mu while waiting), then // registers w as the currently proceeding writer. @@ -273,21 +320,40 @@ class WriteThread { // writers. void ExitUnbatched(Writer* w); - struct AdaptationContext { - const char* name; - std::atomic value; + // Wait for all parallel memtable writers to finish, in case pipelined + // write is enabled. + void WaitForMemTableWriters(); - explicit AdaptationContext(const char* name0) : name(name0), value(0) {} - }; + SequenceNumber UpdateLastSequence(SequenceNumber sequence) { + if (sequence > last_sequence_) { + last_sequence_ = sequence; + } + return last_sequence_; + } private: - uint64_t max_yield_usec_; - uint64_t slow_yield_usec_; + // See AwaitState. + const uint64_t max_yield_usec_; + const uint64_t slow_yield_usec_; + + // Allow multiple writers write to memtable concurrently. + const bool allow_concurrent_memtable_write_; + + // Enable pipelined write to WAL and memtable. + const bool enable_pipelined_write_; - // Points to the newest pending Writer. Only leader can remove - // elements, adding can be done lock-free by anybody + // Points to the newest pending writer. Only leader can remove + // elements, adding can be done lock-free by anybody. std::atomic newest_writer_; + // Points to the newest pending memtable writer. Used only when pipelined + // write is enabled. + std::atomic newest_memtable_writer_; + + // The last sequence that have been consumed by a writer. The sequence + // is not necessary visible to reads because the writer can be ongoing. + SequenceNumber last_sequence_; + // Waits for w->state & goal_mask using w->StateMutex(). Returns // the state that satisfies goal_mask. uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask); @@ -298,16 +364,30 @@ class WriteThread { // a context-dependent static. uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx); + // Set writer state and wake the writer up if it is waiting. void SetState(Writer* w, uint8_t new_state); - // Links w into the newest_writer_ list. Sets *linked_as_leader to - // true if w was linked directly into the leader position. Safe to - // call from multiple threads without external locking. - void LinkOne(Writer* w, bool* linked_as_leader); + // Links w into the newest_writer list. Return true if w was linked directly + // into the leader position. Safe to call from multiple threads without + // external locking. + bool LinkOne(Writer* w, std::atomic* newest_writer); + + // Link write group into the newest_writer list as a whole, while keeping the + // order of the writers unchanged. Return true if the group was linked + // directly into the leader position. + bool LinkGroup(WriteGroup& write_group, std::atomic* newest_writer); // Computes any missing link_newer links. Should not be called // concurrently with itself. void CreateMissingNewerLinks(Writer* head); + + // Set the leader in write_group to completed state and remove it from the + // write group. + void CompleteLeader(WriteGroup& write_group); + + // Set a follower in write_group to completed state and remove it from the + // write group. + void CompleteFollower(Writer* w, WriteGroup& write_group); }; } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 40b917c6d..2dd8e94f5 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -751,6 +751,21 @@ struct DBOptions { // Default: 16MB/s uint64_t delayed_write_rate = 16 * 1024U * 1024U; + // By default, a single write thread queue is maintained. The thread gets + // to the head of the queue becomes write batch group leader and responsible + // for writing to WAL and memtable for the batch group. + // + // If enable_pipelined_write is true, separate write thread queue is + // maintained for WAL write and memtable write. A write thread first enter WAL + // writer queue and then memtable writer queue. Pending thread on the WAL + // writer queue thus only have to wait for previous writers to finish thier + // WAL writing but not the memtable writing. Enabling the feature may improve + // write throughput and reduce latency of the prepare phase of two-phase + // commit. + // + // Default: false + bool enable_pipelined_write = false; + // If true, allow multi-writers to update mem tables in parallel. // Only some memtable_factory-s support concurrent writes; currently it // is implemented only for SkipListFactory. Concurrent memtable writes diff --git a/options/db_options.cc b/options/db_options.cc index 55f87a7bf..df048d403 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -70,6 +70,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) wal_bytes_per_sync(options.wal_bytes_per_sync), listeners(options.listeners), enable_thread_tracking(options.enable_thread_tracking), + enable_pipelined_write(options.enable_pipelined_write), allow_concurrent_memtable_write(options.allow_concurrent_memtable_write), enable_write_thread_adaptive_yield( options.enable_write_thread_adaptive_yield), @@ -189,6 +190,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { wal_recovery_mode); ROCKS_LOG_HEADER(log, " Options.enable_thread_tracking: %d", enable_thread_tracking); + ROCKS_LOG_HEADER(log, " Options.enable_pipelined_write: %d", + enable_pipelined_write); ROCKS_LOG_HEADER(log, " Options.allow_concurrent_memtable_write: %d", allow_concurrent_memtable_write); ROCKS_LOG_HEADER(log, " Options.enable_write_thread_adaptive_yield: %d", diff --git a/options/db_options.h b/options/db_options.h index e0d4a823e..befa2daa3 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -63,6 +63,7 @@ struct ImmutableDBOptions { uint64_t wal_bytes_per_sync; std::vector> listeners; bool enable_thread_tracking; + bool enable_pipelined_write; bool allow_concurrent_memtable_write; bool enable_write_thread_adaptive_yield; uint64_t write_thread_max_yield_usec; diff --git a/options/options.cc b/options/options.cc index 780e86532..198d4bbca 100644 --- a/options/options.cc +++ b/options/options.cc @@ -176,6 +176,7 @@ DBOptions::DBOptions(const Options& options) listeners(options.listeners), enable_thread_tracking(options.enable_thread_tracking), delayed_write_rate(options.delayed_write_rate), + enable_pipelined_write(options.enable_pipelined_write), allow_concurrent_memtable_write(options.allow_concurrent_memtable_write), enable_write_thread_adaptive_yield( options.enable_write_thread_adaptive_yield), diff --git a/options/options_helper.h b/options/options_helper.h index e331416a0..8c48ae6ea 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -305,6 +305,9 @@ static std::unordered_map db_options_type_info = { {"fail_if_options_file_error", {offsetof(struct DBOptions, fail_if_options_file_error), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, + {"enable_pipelined_write", + {offsetof(struct DBOptions, enable_pipelined_write), OptionType::kBoolean, + OptionVerificationType::kNormal, false, 0}}, {"allow_concurrent_memtable_write", {offsetof(struct DBOptions, allow_concurrent_memtable_write), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 57256bb32..ce76093c5 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -281,6 +281,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "random_access_max_buffer_size=1048576;" "advise_random_on_open=true;" "fail_if_options_file_error=false;" + "enable_pipelined_write=false;" "allow_concurrent_memtable_write=true;" "wal_recovery_mode=kPointInTimeRecovery;" "enable_write_thread_adaptive_yield=true;"