diff --git a/HISTORY.md b/HISTORY.md
index d5a6cd9b4..51133dc34 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -13,6 +13,7 @@
 * Add debugging function `GetAllKeyVersions` to see internal versions of a range of keys.
 * Support file ingestion with universal compaction style
 * Support file ingestion behind with option `allow_ingest_behind`
+* New option `enable_pipelined_write`, which may improve write throughput when writing from multiple threads with the WAL enabled.
 
 ## 5.4.0 (04/11/2017)
 ### Public API Change
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 45467c6ff..765e9d29c 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -159,10 +159,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       max_total_in_memory_state_(0),
       is_snapshot_supported_(true),
       write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
-      write_thread_(immutable_db_options_.enable_write_thread_adaptive_yield
-                        ? immutable_db_options_.write_thread_max_yield_usec
-                        : 0,
-                    immutable_db_options_.write_thread_slow_yield_usec),
+      write_thread_(immutable_db_options_),
       write_controller_(mutable_db_options_.delayed_write_rate),
       last_batch_group_size_(0),
       unscheduled_flushes_(0),
diff --git a/db/db_impl.h b/db/db_impl.h
index 37aaf1ca5..68da91491 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -607,6 +607,11 @@ class DBImpl : public DB {
                    uint64_t* log_used = nullptr, uint64_t log_ref = 0,
                    bool disable_memtable = false);
 
+  Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
+                            WriteCallback* callback = nullptr,
+                            uint64_t* log_used = nullptr, uint64_t log_ref = 0,
+                            bool disable_memtable = false);
+
   uint64_t FindMinLogContainingOutstandingPrep();
   uint64_t FindMinPrepLogReferencedByMemTable();
 
@@ -726,16 +731,18 @@ class DBImpl : public DB {
   Status HandleWriteBufferFull(WriteContext* write_context);
 
   // REQUIRES: mutex locked
-  Status PreprocessWrite(const WriteOptions& write_options, bool need_log_sync,
-                         bool* logs_getting_syned, WriteContext* write_context);
+  Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
+                         WriteContext* write_context);
 
-  Status WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
+  Status WriteToWAL(const WriteThread::WriteGroup& write_group,
                     log::Writer* log_writer, bool need_log_sync,
                     bool need_log_dir_sync, SequenceNumber sequence);
 
-  // Used by WriteImpl to update bg_error_ when encountering memtable insert
-  // error.
-  void UpdateBackgroundError(const Status& memtable_insert_status);
+  // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
+  void ParanoidCheck(const Status& status);
+
+  // Used by WriteImpl to update bg_error_ in case of memtable insert error.
+ void MemTableInsertStatusCheck(const Status& memtable_insert_status); #ifndef ROCKSDB_LITE diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 0ee368613..20b6efd09 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -66,6 +66,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::Corruption("Batch is nullptr!"); } + if (immutable_db_options_.enable_pipelined_write) { + return PipelinedWriteImpl(write_options, my_batch, callback, log_used, + log_ref, disable_memtable); + } + Status status; PERF_TIMER_GUARD(write_pre_and_post_process_time); @@ -79,7 +84,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); write_thread_.JoinBatchGroup(&w); - if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) { + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { // we are a non-leader in a parallel group PERF_TIMER_GUARD(write_memtable_time); @@ -93,11 +98,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, true /*concurrent_memtable_writes*/); } - if (write_thread_.CompleteParallelWorker(&w)) { + if (write_thread_.CompleteParallelMemTableWriter(&w)) { // we're responsible for exit batch group - auto last_sequence = w.parallel_group->last_sequence; + auto last_sequence = w.write_group->last_sequence; versions_->SetLastSequence(last_sequence); - UpdateBackgroundError(w.status); + MemTableInsertStatusCheck(w.status); write_thread_.ExitAsBatchGroupFollower(&w); } assert(w.state == WriteThread::STATE_COMPLETED); @@ -120,10 +125,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // when it finds suitable, and finish them in the same write batch. // This is how a write job could be done by the other writer. WriteContext write_context; - WriteThread::Writer* last_writer = &w; // Dummy intial value - autovector write_group; - WriteThread::ParallelGroup pg; - bool logs_getting_synced = false; + WriteThread::WriteGroup write_group; bool in_parallel_group = false; uint64_t last_sequence = versions_->LastSequence(); @@ -131,8 +133,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_dir_sync = need_log_sync && !log_dir_synced_; - status = PreprocessWrite(write_options, need_log_sync, &logs_getting_synced, - &write_context); + status = PreprocessWrite(write_options, &need_log_sync, &write_context); log::Writer* cur_log_writer = logs_.back().writer; mutex_.Unlock(); @@ -143,7 +144,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // into memtables last_batch_group_size_ = - write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group); + write_thread_.EnterAsBatchGroupLeader(&w, &write_group); if (status.ok()) { // Rules for when we can update the memtable concurrently @@ -158,10 +159,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // relax rules 2 if we could prevent write batches from referring // more than once to a particular key. 
bool parallel = immutable_db_options_.allow_concurrent_memtable_write && - write_group.size() > 1; + write_group.size > 1; int total_count = 0; uint64_t total_byte_size = 0; - for (auto writer : write_group) { + for (auto* writer : write_group) { if (writer->CheckCallback(this)) { if (writer->ShouldWriteToMemtable()) { total_count += WriteBatchInternal::Count(writer->batch); @@ -187,7 +188,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, RecordTick(stats_, BYTES_WRITTEN, total_byte_size); stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); RecordTick(stats_, WRITE_DONE_BY_SELF); - auto write_done_by_other = write_group.size() - 1; + auto write_done_by_other = write_group.size - 1; if (write_done_by_other > 0) { stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other); @@ -219,12 +220,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*recovery_log_number*/, this); } else { - pg.leader = &w; - pg.last_writer = last_writer; - pg.last_sequence = last_sequence; - pg.running.store(static_cast(write_group.size()), - std::memory_order_relaxed); - write_thread_.LaunchParallelFollowers(&pg, current_sequence); + SequenceNumber next_sequence = current_sequence; + for (auto* writer : write_group) { + if (writer->ShouldWriteToMemtable()) { + writer->sequence = next_sequence; + next_sequence += WriteBatchInternal::Count(writer->batch); + } + } + write_group.last_sequence = last_sequence; + write_group.running.store(static_cast(write_group.size), + std::memory_order_relaxed); + write_thread_.LaunchParallelMemTableWriters(&write_group); in_parallel_group = true; // Each parallel follower is doing each own writes. The leader should @@ -244,19 +250,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } PERF_TIMER_START(write_pre_and_post_process_time); - // - // Is setting bg_error_ enough here? This will at least stop - // compaction and fail any further writes. 
- if (immutable_db_options_.paranoid_checks && !status.ok() && - !w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) { - mutex_.Lock(); - if (bg_error_.ok()) { - bg_error_ = status; // stop compaction & fail any further writes - } - mutex_.Unlock(); + if (!w.CallbackFailed()) { + ParanoidCheck(status); } - if (logs_getting_synced) { + if (need_log_sync) { mutex_.Lock(); MarkLogsSynced(logfile_number_, need_log_dir_sync, status); mutex_.Unlock(); @@ -266,40 +264,180 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (in_parallel_group) { // CompleteParallelWorker returns true if this thread should // handle exit, false means somebody else did - should_exit_batch_group = write_thread_.CompleteParallelWorker(&w); + should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w); } if (should_exit_batch_group) { versions_->SetLastSequence(last_sequence); - UpdateBackgroundError(w.status); - write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status); + MemTableInsertStatusCheck(w.status); + write_thread_.ExitAsBatchGroupLeader(write_group, w.status); } if (status.ok()) { status = w.FinalStatus(); } - return status; } -void DBImpl::UpdateBackgroundError(const Status& memtable_insert_status) { +Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, + uint64_t* log_used, uint64_t log_ref, + bool disable_memtable) { + PERF_TIMER_GUARD(write_pre_and_post_process_time); + StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); + + WriteContext write_context; + + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + disable_memtable); + write_thread_.JoinBatchGroup(&w); + if (w.state == WriteThread::STATE_GROUP_LEADER) { + WriteThread::WriteGroup wal_write_group; + if (w.callback && !w.callback->AllowWriteBatching()) { + write_thread_.WaitForMemTableWriters(); + } + mutex_.Lock(); + bool need_log_sync = !write_options.disableWAL && write_options.sync; + bool need_log_dir_sync = need_log_sync && !log_dir_synced_; + w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); + log::Writer* cur_log_writer = logs_.back().writer; + mutex_.Unlock(); + + // This can set non-OK status if callback fail. 
+ last_batch_group_size_ = + write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group); + const SequenceNumber current_sequence = + write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1; + size_t total_count = 0; + size_t total_byte_size = 0; + + if (w.status.ok()) { + SequenceNumber next_sequence = current_sequence; + for (auto writer : wal_write_group) { + if (writer->CheckCallback(this)) { + if (writer->ShouldWriteToMemtable()) { + writer->sequence = next_sequence; + size_t count = WriteBatchInternal::Count(writer->batch); + next_sequence += count; + total_count += count; + } + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); + } + } + if (w.disable_wal) { + has_unpersisted_data_.store(true, std::memory_order_relaxed); + } + write_thread_.UpdateLastSequence(current_sequence + total_count - 1); + } + + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + + PERF_TIMER_STOP(write_pre_and_post_process_time); + + if (w.ShouldWriteToWAL()) { + PERF_TIMER_GUARD(write_wal_time); + stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); + RecordTick(stats_, WRITE_DONE_BY_SELF, 1); + if (wal_write_group.size > 1) { + stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, + wal_write_group.size - 1); + RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); + } + w.status = WriteToWAL(wal_write_group, cur_log_writer, need_log_sync, + need_log_dir_sync, current_sequence); + } + + if (!w.CallbackFailed()) { + ParanoidCheck(w.status); + } + + if (need_log_sync) { + mutex_.Lock(); + MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status); + mutex_.Unlock(); + } + + write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); + } + + WriteThread::WriteGroup memtable_write_group; + if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { + PERF_TIMER_GUARD(write_memtable_time); + assert(w.status.ok()); + write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group); + if (memtable_write_group.size > 1 && + immutable_db_options_.allow_concurrent_memtable_write) { + write_thread_.LaunchParallelMemTableWriters(&memtable_write_group); + } else { + memtable_write_group.status = WriteBatchInternal::InsertInto( + memtable_write_group, w.sequence, column_family_memtables_.get(), + &flush_scheduler_, write_options.ignore_missing_column_families, + 0 /*log_number*/, this); + versions_->SetLastSequence(memtable_write_group.last_sequence); + write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); + } + } + + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { + assert(w.ShouldWriteToMemtable()); + WriteBatchInternal::SetSequence(w.batch, w.sequence); + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + w.status = WriteBatchInternal::InsertInto( + &w, &column_family_memtables, &flush_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + true /*concurrent_memtable_writes*/); + if (write_thread_.CompleteParallelMemTableWriter(&w)) { + MemTableInsertStatusCheck(w.status); + versions_->SetLastSequence(w.write_group->last_sequence); + write_thread_.ExitAsMemTableWriter(&w, *w.write_group); + } + } + + assert(w.state == WriteThread::STATE_COMPLETED); + if (log_used != nullptr) { + *log_used = 
w.log_used; + } + + return w.FinalStatus(); +} + +void DBImpl::ParanoidCheck(const Status& status) { + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. + if (immutable_db_options_.paranoid_checks && !status.ok() && + !status.IsBusy() && !status.IsIncomplete()) { + mutex_.Lock(); + if (bg_error_.ok()) { + bg_error_ = status; // stop compaction & fail any further writes + } + mutex_.Unlock(); + } +} + +void DBImpl::MemTableInsertStatusCheck(const Status& status) { // A non-OK status here indicates that the state implied by the // WAL has diverged from the in-memory state. This could be // because of a corrupt write_batch (very bad), or because the // client specified an invalid column family and didn't specify // ignore_missing_column_families. - if (!memtable_insert_status.ok()) { + if (!status.ok()) { mutex_.Lock(); assert(bg_error_.ok()); - bg_error_ = memtable_insert_status; + bg_error_ = status; mutex_.Unlock(); } } Status DBImpl::PreprocessWrite(const WriteOptions& write_options, - bool need_log_sync, bool* logs_getting_synced, + bool* need_log_sync, WriteContext* write_context) { mutex_.AssertHeld(); - assert(write_context != nullptr && logs_getting_synced != nullptr); + assert(write_context != nullptr && need_log_sync != nullptr); Status status; assert(!single_column_family_mode_ || @@ -336,7 +474,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, status = DelayWrite(last_batch_group_size_, write_options); } - if (status.ok() && need_log_sync) { + if (status.ok() && *need_log_sync) { // Wait until the parallel syncs are finished. Any sync process has to sync // the front log too so it is enough to check the status of front() // We do a while loop since log_sync_cv_ is signalled when any sync is @@ -356,26 +494,28 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, // actually write to the WAL log.getting_synced = true; } - *logs_getting_synced = true; + } else { + *need_log_sync = false; } return status; } -Status DBImpl::WriteToWAL(const autovector& write_group, +Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, bool need_log_sync, bool need_log_dir_sync, SequenceNumber sequence) { Status status; WriteBatch* merged_batch = nullptr; size_t write_with_wal = 0; - if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() && - write_group[0]->batch->GetWalTerminationPoint().is_cleared()) { + auto* leader = write_group.leader; + if (write_group.size == 1 && leader->ShouldWriteToWAL() && + leader->batch->GetWalTerminationPoint().is_cleared()) { // we simply write the first WriteBatch to WAL if the group only // contains one batch, that batch should be written to the WAL, // and the batch is not wanting to be truncated - merged_batch = write_group[0]->batch; - write_group[0]->log_used = logfile_number_; + merged_batch = leader->batch; + leader->log_used = logfile_number_; write_with_wal = 1; } else { // WAL needs all of the batches flattened into a single batch. @@ -643,6 +783,12 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log::Writer* new_log = nullptr; MemTable* new_mem = nullptr; + // In case of pipelined write is enabled, wait for all pending memtable + // writers. + if (immutable_db_options_.enable_pipelined_write) { + write_thread_.WaitForMemTableWriters(); + } + // Attempt to switch to a new memtable and trigger flush of old. // Do this without holding the dbmutex lock. 
assert(versions_->prev_log_number() == 0); diff --git a/db/flush_scheduler.cc b/db/flush_scheduler.cc index 08c29d598..eb24efb9c 100644 --- a/db/flush_scheduler.cc +++ b/db/flush_scheduler.cc @@ -15,11 +15,9 @@ namespace rocksdb { void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) { #ifndef NDEBUG - { - std::lock_guard lock(checking_mutex_); - assert(checking_set_.count(cfd) == 0); - checking_set_.insert(cfd); - } + std::lock_guard lock(checking_mutex_); + assert(checking_set_.count(cfd) == 0); + checking_set_.insert(cfd); #endif // NDEBUG cfd->Ref(); // Suppress false positive clang analyzer warnings. @@ -36,8 +34,11 @@ void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) { } ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { +#ifndef NDEBUG + std::lock_guard lock(checking_mutex_); +#endif // NDEBUG while (true) { - if (Empty()) { + if (head_.load(std::memory_order_relaxed) == nullptr) { return nullptr; } @@ -48,11 +49,9 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { delete node; #ifndef NDEBUG - { - auto iter = checking_set_.find(cfd); - assert(iter != checking_set_.end()); - checking_set_.erase(iter); - } + auto iter = checking_set_.find(cfd); + assert(iter != checking_set_.end()); + checking_set_.erase(iter); #endif // NDEBUG if (!cfd->IsDropped()) { @@ -68,8 +67,13 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { } bool FlushScheduler::Empty() { +#ifndef NDEBUG + std::lock_guard lock(checking_mutex_); +#endif // NDEBUG auto rv = head_.load(std::memory_order_relaxed) == nullptr; +#ifndef NDEBUG assert(rv == checking_set_.empty()); +#endif // NDEBUG return rv; } @@ -80,7 +84,7 @@ void FlushScheduler::Clear() { delete cfd; } } - assert(Empty()); + assert(head_.load(std::memory_order_relaxed) == nullptr); } } // namespace rocksdb diff --git a/db/write_batch.cc b/db/write_batch.cc index b1bb6b06c..0cc48b087 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1290,16 +1290,17 @@ public: // 2) During Write(), in a single-threaded write thread // 3) During Write(), in a concurrent context where memtables has been cloned // The reason is that it calls memtables->Seek(), which has a stateful cache -Status WriteBatchInternal::InsertInto( - const autovector& writers, SequenceNumber sequence, - ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, - bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db, - bool concurrent_memtable_writes) { +Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group, + SequenceNumber sequence, + ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, + bool ignore_missing_column_families, + uint64_t recovery_log_number, DB* db, + bool concurrent_memtable_writes) { MemTableInserter inserter(sequence, memtables, flush_scheduler, ignore_missing_column_families, recovery_log_number, db, concurrent_memtable_writes); - for (size_t i = 0; i < writers.size(); i++) { - auto w = writers[i]; + for (auto w : write_group) { if (!w->ShouldWriteToMemtable()) { continue; } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 97292a969..730edca75 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -153,7 +153,7 @@ class WriteBatchInternal { // // Under concurrent use, the caller is responsible for making sure that // the memtables object itself is thread-local. 
- static Status InsertInto(const autovector& batches, + static Status InsertInto(WriteThread::WriteGroup& write_group, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index 745135789..7e90a2fb8 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -119,161 +119,169 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { for (auto& allow_parallel : {true, false}) { for (auto& allow_batching : {true, false}) { for (auto& enable_WAL : {true, false}) { - for (auto& write_group : write_scenarios) { - Options options; - options.create_if_missing = true; - options.allow_concurrent_memtable_write = allow_parallel; - - ReadOptions read_options; - DB* db; - DBImpl* db_impl; - - DestroyDB(dbname, options); - ASSERT_OK(DB::Open(options, dbname, &db)); - - db_impl = dynamic_cast(db); - ASSERT_TRUE(db_impl); + for (auto& enable_pipelined_write : {true, false}) { + for (auto& write_group : write_scenarios) { + Options options; + options.create_if_missing = true; + options.allow_concurrent_memtable_write = allow_parallel; + options.enable_pipelined_write = enable_pipelined_write; + + ReadOptions read_options; + DB* db; + DBImpl* db_impl; + + DestroyDB(dbname, options); + ASSERT_OK(DB::Open(options, dbname, &db)); + + db_impl = dynamic_cast(db); + ASSERT_TRUE(db_impl); + + std::atomic threads_waiting(0); + std::atomic seq(db_impl->GetLatestSequenceNumber()); + ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + uint64_t cur_threads_waiting = 0; + bool is_leader = false; + bool is_last = false; + + // who am i + do { + cur_threads_waiting = threads_waiting.load(); + is_leader = (cur_threads_waiting == 0); + is_last = (cur_threads_waiting == write_group.size() - 1); + } while (!threads_waiting.compare_exchange_strong( + cur_threads_waiting, cur_threads_waiting + 1)); + + // check my state + auto* writer = reinterpret_cast(arg); + + if (is_leader) { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_INIT); + } + + // (meta test) the first WriteOP should indeed be the first + // and the last should be the last (all others can be out of + // order) + if (is_leader) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.front().callback_.should_fail_); + } else if (is_last) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.back().callback_.should_fail_); + } + + // wait for friends + while (threads_waiting.load() < write_group.size()) { + } + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { + // check my state + auto* writer = reinterpret_cast(arg); + + if (!allow_batching) { + // no batching so everyone should be a leader + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else if (!allow_parallel) { + ASSERT_TRUE( + writer->state == WriteThread::State::STATE_COMPLETED || + (enable_pipelined_write && + writer->state == + WriteThread::State::STATE_MEMTABLE_WRITER_LEADER)); + } + }); + + std::atomic thread_num(0); + std::atomic dummy_key(0); + std::function write_with_callback_func = [&]() { + uint32_t i = thread_num.fetch_add(1); + Random rnd(i); + + // leaders gotta lead + while (i > 0 && threads_waiting.load() < 1) { + } - std::atomic 
threads_waiting(0); - std::atomic seq(db_impl->GetLatestSequenceNumber()); - ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); + // loser has to lose + while (i == write_group.size() - 1 && + threads_waiting.load() < write_group.size() - 1) { + } - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { - uint64_t cur_threads_waiting = 0; - bool is_leader = false; - bool is_last = false; + auto& write_op = write_group.at(i); + write_op.Clear(); + write_op.callback_.allow_batching_ = allow_batching; - // who am i + // insert some keys + for (uint32_t j = 0; j < rnd.Next() % 50; j++) { + // grab unique key + char my_key = 0; do { - cur_threads_waiting = threads_waiting.load(); - is_leader = (cur_threads_waiting == 0); - is_last = (cur_threads_waiting == write_group.size() - 1); - } while (!threads_waiting.compare_exchange_strong( - cur_threads_waiting, cur_threads_waiting + 1)); - - // check my state - auto* writer = reinterpret_cast(arg); - - if (is_leader) { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else { - ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT); - } + my_key = dummy_key.load(); + } while ( + !dummy_key.compare_exchange_strong(my_key, my_key + 1)); - // (meta test) the first WriteOP should indeed be the first - // and the last should be the last (all others can be out of - // order) - if (is_leader) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.front().callback_.should_fail_); - } else if (is_last) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.back().callback_.should_fail_); - } + string skey(5, my_key); + string sval(10, my_key); + write_op.Put(skey, sval); - // wait for friends - while (threads_waiting.load() < write_group.size()) { + if (!write_op.callback_.should_fail_) { + seq.fetch_add(1); } - }); - - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { - // check my state - auto* writer = reinterpret_cast(arg); - - if (!allow_batching) { - // no batching so everyone should be a leader - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else if (!allow_parallel) { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_COMPLETED); - } - }); - - std::atomic thread_num(0); - std::atomic dummy_key(0); - std::function write_with_callback_func = [&]() { - uint32_t i = thread_num.fetch_add(1); - Random rnd(i); - - // leaders gotta lead - while (i > 0 && threads_waiting.load() < 1) { - } - - // loser has to lose - while (i == write_group.size() - 1 && - threads_waiting.load() < write_group.size() - 1) { - } - - auto& write_op = write_group.at(i); - write_op.Clear(); - write_op.callback_.allow_batching_ = allow_batching; - - // insert some keys - for (uint32_t j = 0; j < rnd.Next() % 50; j++) { - // grab unique key - char my_key = 0; - do { - my_key = dummy_key.load(); - } while (!dummy_key.compare_exchange_strong(my_key, my_key + 1)); + } - string skey(5, my_key); - string sval(10, my_key); - write_op.Put(skey, sval); + WriteOptions woptions; + woptions.disableWAL = !enable_WAL; + woptions.sync = enable_WAL; + Status s = db_impl->WriteWithCallback( + woptions, &write_op.write_batch_, &write_op.callback_); - if (!write_op.callback_.should_fail_) { - seq.fetch_add(1); + if (write_op.callback_.should_fail_) { + ASSERT_TRUE(s.IsBusy()); + } else { + ASSERT_OK(s); } - } + }; - WriteOptions woptions; - woptions.disableWAL = !enable_WAL; - 
woptions.sync = enable_WAL; - Status s = db_impl->WriteWithCallback( - woptions, &write_op.write_batch_, &write_op.callback_); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - if (write_op.callback_.should_fail_) { - ASSERT_TRUE(s.IsBusy()); - } else { - ASSERT_OK(s); + // do all the writes + std::vector threads; + for (uint32_t i = 0; i < write_group.size(); i++) { + threads.emplace_back(write_with_callback_func); + } + for (auto& t : threads) { + t.join(); } - }; - - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - - // do all the writes - std::vector threads; - for (uint32_t i = 0; i < write_group.size(); i++) { - threads.emplace_back(write_with_callback_func); - } - for (auto& t : threads) { - t.join(); - } - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - // check for keys - string value; - for (auto& w : write_group) { - ASSERT_TRUE(w.callback_.was_called_); - for (auto& kvp : w.kvs_) { - if (w.callback_.should_fail_) { - ASSERT_TRUE( - db->Get(read_options, kvp.first, &value).IsNotFound()); - } else { - ASSERT_OK(db->Get(read_options, kvp.first, &value)); - ASSERT_EQ(value, kvp.second); + // check for keys + string value; + for (auto& w : write_group) { + ASSERT_TRUE(w.callback_.was_called_); + for (auto& kvp : w.kvs_) { + if (w.callback_.should_fail_) { + ASSERT_TRUE( + db->Get(read_options, kvp.first, &value).IsNotFound()); + } else { + ASSERT_OK(db->Get(read_options, kvp.first, &value)); + ASSERT_EQ(value, kvp.second); + } } } - } - ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); + ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); - delete db; - DestroyDB(dbname, options); + delete db; + DestroyDB(dbname, options); + } } } } diff --git a/db/write_thread.cc b/db/write_thread.cc index ca3f9b36d..0938ad28c 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -15,10 +15,17 @@ namespace rocksdb { -WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec) - : max_yield_usec_(max_yield_usec), - slow_yield_usec_(slow_yield_usec), - newest_writer_(nullptr) {} +WriteThread::WriteThread(const ImmutableDBOptions& db_options) + : max_yield_usec_(db_options.enable_write_thread_adaptive_yield + ? db_options.write_thread_max_yield_usec + : 0), + slow_yield_usec_(db_options.write_thread_slow_yield_usec), + allow_concurrent_memtable_write_( + db_options.allow_concurrent_memtable_write), + enable_pipelined_write_(db_options.enable_pipelined_write), + newest_writer_(nullptr), + newest_memtable_writer_(nullptr), + last_sequence_(0) {} uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) { // We're going to block. Lazily create the mutex. 
We guarantee @@ -184,22 +191,39 @@ void WriteThread::SetState(Writer* w, uint8_t new_state) { } } -void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) { +bool WriteThread::LinkOne(Writer* w, std::atomic* newest_writer) { + assert(newest_writer != nullptr); assert(w->state == STATE_INIT); - + Writer* writers = newest_writer->load(std::memory_order_relaxed); while (true) { - Writer* writers = newest_writer_.load(std::memory_order_relaxed); w->link_older = writers; - if (newest_writer_.compare_exchange_strong(writers, w)) { - if (writers == nullptr) { - // this isn't part of the WriteThread machinery, but helps with - // debugging and is checked by an assert in WriteImpl - w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed); - } - // Then we are the head of the queue and hence definiltly the leader - *linked_as_leader = (writers == nullptr); - // Otherwise we will wait for previous leader to define our status - return; + if (newest_writer->compare_exchange_weak(writers, w)) { + return (writers == nullptr); + } + } +} + +bool WriteThread::LinkGroup(WriteGroup& write_group, + std::atomic* newest_writer) { + assert(newest_writer != nullptr); + Writer* leader = write_group.leader; + Writer* last_writer = write_group.last_writer; + Writer* w = last_writer; + while (true) { + // Unset link_newer pointers to make sure when we call + // CreateMissingNewerLinks later it create all missing links. + w->link_newer = nullptr; + w->write_group = nullptr; + if (w == leader) { + break; + } + w = w->link_older; + } + Writer* newest = newest_writer->load(std::memory_order_relaxed); + while (true) { + leader->link_older = newest; + if (newest_writer->compare_exchange_weak(newest, last_writer)) { + return (newest == nullptr); } } } @@ -216,12 +240,43 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) { } } +void WriteThread::CompleteLeader(WriteGroup& write_group) { + assert(write_group.size > 0); + Writer* leader = write_group.leader; + if (write_group.size == 1) { + write_group.leader = nullptr; + write_group.last_writer = nullptr; + } else { + assert(leader->link_newer != nullptr); + leader->link_newer->link_older = nullptr; + write_group.leader = leader->link_newer; + } + write_group.size -= 1; + SetState(leader, STATE_COMPLETED); +} + +void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) { + assert(write_group.size > 1); + assert(w != write_group.leader); + if (w == write_group.last_writer) { + w->link_older->link_newer = nullptr; + write_group.last_writer = w->link_older; + } else { + w->link_older->link_newer = w->link_newer; + w->link_newer->link_older = w->link_older; + } + write_group.size -= 1; + SetState(w, STATE_COMPLETED); +} + void WriteThread::JoinBatchGroup(Writer* w) { static AdaptationContext ctx("JoinBatchGroup"); assert(w->batch != nullptr); - bool linked_as_leader; - LinkOne(w, &linked_as_leader); + bool linked_as_leader = LinkOne(w, &newest_writer_); + if (linked_as_leader) { + SetState(w, STATE_GROUP_LEADER); + } TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w); @@ -231,23 +286,28 @@ void WriteThread::JoinBatchGroup(Writer* w) { * 1) An existing leader pick us as the new leader when it finishes * 2) An existing leader pick us as its follewer and * 2.1) finishes the memtable writes on our behalf - * 2.2) Or tell us to finish the memtable writes it in pralallel + * 2.2) Or tell us to finish the memtable writes in pralallel + * 3) (pipelined write) An existing leader pick us as its follower and + * finish book-keeping and WAL write 
for us, enqueue us as pending + * memtable writer, and + * 3.1) we become memtable writer group leader, or + * 3.2) an existing memtable writer group leader tell us to finish memtable + * writes in parallel. */ - AwaitState(w, - STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED, + AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | + STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, &ctx); TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); } } -size_t WriteThread::EnterAsBatchGroupLeader( - Writer* leader, WriteThread::Writer** last_writer, - autovector* write_batch_group) { +size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, + WriteGroup* write_group) { assert(leader->link_older == nullptr); assert(leader->batch != nullptr); + assert(write_group != nullptr); size_t size = WriteBatchInternal::ByteSize(leader->batch); - write_batch_group->push_back(leader); // Allow the group to grow up to a maximum size, but if the // original write is small, limit the growth so we do not slow @@ -257,8 +317,10 @@ size_t WriteThread::EnterAsBatchGroupLeader( max_size = size + (128 << 10); } - *last_writer = leader; - + leader->write_group = write_group; + write_group->leader = leader; + write_group->last_writer = leader; + write_group->size = 1; Writer* newest_writer = newest_writer_.load(std::memory_order_acquire); // This is safe regardless of any db mutex status of the caller. Previous @@ -308,136 +370,268 @@ size_t WriteThread::EnterAsBatchGroupLeader( break; } + w->write_group = write_group; size += batch_size; - write_batch_group->push_back(w); - w->in_batch_group = true; - *last_writer = w; + write_group->last_writer = w; + write_group->size++; } return size; } -void WriteThread::LaunchParallelFollowers(ParallelGroup* pg, - SequenceNumber sequence) { - // EnterAsBatchGroupLeader already created the links from leader to - // newer writers in the group +void WriteThread::EnterAsMemTableWriter(Writer* leader, + WriteGroup* write_group) { + assert(leader != nullptr); + assert(leader->link_older == nullptr); + assert(leader->batch != nullptr); + assert(write_group != nullptr); + + size_t size = WriteBatchInternal::ByteSize(leader->batch); + + // Allow the group to grow up to a maximum size, but if the + // original write is small, limit the growth so we do not slow + // down the small write too much. 
+ size_t max_size = 1 << 20; + if (size <= (128 << 10)) { + max_size = size + (128 << 10); + } + + leader->write_group = write_group; + write_group->leader = leader; + write_group->size = 1; + Writer* last_writer = leader; + + if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) { + Writer* newest_writer = newest_memtable_writer_.load(); + CreateMissingNewerLinks(newest_writer); + + Writer* w = leader; + while (w != newest_writer) { + w = w->link_newer; + + if (w->batch == nullptr) { + break; + } + + if (w->batch->HasMerge()) { + break; + } - pg->leader->parallel_group = pg; + if (!allow_concurrent_memtable_write_) { + auto batch_size = WriteBatchInternal::ByteSize(w->batch); + if (size + batch_size > max_size) { + // Do not make batch too big + break; + } + size += batch_size; + } - Writer* w = pg->leader; - w->sequence = sequence; + w->write_group = write_group; + last_writer = w; + write_group->size++; + } + } + + write_group->last_writer = last_writer; + write_group->last_sequence = + last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1; +} - // Initialize and wake up the others - while (w != pg->last_writer) { - // Writers that won't write don't get sequence allotment - if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) { - // There is a sequence number of each written key - sequence += WriteBatchInternal::Count(w->batch); +void WriteThread::ExitAsMemTableWriter(Writer* self, WriteGroup& write_group) { + Writer* leader = write_group.leader; + Writer* last_writer = write_group.last_writer; + + Writer* newest_writer = last_writer; + if (!newest_memtable_writer_.compare_exchange_strong(newest_writer, + nullptr)) { + CreateMissingNewerLinks(newest_writer); + Writer* next_leader = last_writer->link_newer; + assert(next_leader != nullptr); + next_leader->link_older = nullptr; + SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER); + } + Writer* w = leader; + while (true) { + if (!write_group.status.ok()) { + w->status = write_group.status; } - w = w->link_newer; + Writer* next = w->link_newer; + if (w != leader) { + SetState(w, STATE_COMPLETED); + } + if (w == last_writer) { + break; + } + w = next; + } + // Note that leader has to exit last, since it owns the write group. + SetState(leader, STATE_COMPLETED); +} - w->sequence = sequence; // sequence number for the first key in the batch - w->parallel_group = pg; - SetState(w, STATE_PARALLEL_FOLLOWER); +void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) { + assert(write_group != nullptr); + write_group->running.store(write_group->size); + for (auto w : *write_group) { + SetState(w, STATE_PARALLEL_MEMTABLE_WRITER); } } // This method is called by both the leader and parallel followers -bool WriteThread::CompleteParallelWorker(Writer* w) { - static AdaptationContext ctx("CompleteParallelWorker"); +bool WriteThread::CompleteParallelMemTableWriter(Writer* w) { + static AdaptationContext ctx("CompleteParallelMemTableWriter"); - auto* pg = w->parallel_group; + auto* write_group = w->write_group; if (!w->status.ok()) { - std::lock_guard guard(pg->leader->StateMutex()); - pg->status = w->status; + std::lock_guard guard(write_group->leader->StateMutex()); + write_group->status = w->status; } - if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) { + if (write_group->running-- > 1) { // we're not the last one AwaitState(w, STATE_COMPLETED, &ctx); return false; } // else we're the last parallel worker and should perform exit duties. 
- w->status = pg->status; + w->status = write_group->status; return true; } void WriteThread::ExitAsBatchGroupFollower(Writer* w) { - auto* pg = w->parallel_group; + auto* write_group = w->write_group; - assert(w->state == STATE_PARALLEL_FOLLOWER); - assert(pg->status.ok()); - ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status); + assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER); + assert(write_group->status.ok()); + ExitAsBatchGroupLeader(*write_group, write_group->status); assert(w->status.ok()); assert(w->state == STATE_COMPLETED); - SetState(pg->leader, STATE_COMPLETED); + SetState(write_group->leader, STATE_COMPLETED); } -void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer, +void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, Status status) { + static AdaptationContext ctx("ExitAsBatchGroupLeader"); + Writer* leader = write_group.leader; + Writer* last_writer = write_group.last_writer; assert(leader->link_older == nullptr); - Writer* head = newest_writer_.load(std::memory_order_acquire); - if (head != last_writer || - !newest_writer_.compare_exchange_strong(head, nullptr)) { - // Either w wasn't the head during the load(), or it was the head - // during the load() but somebody else pushed onto the list before - // we did the compare_exchange_strong (causing it to fail). In the - // latter case compare_exchange_strong has the effect of re-reading - // its first param (head). No need to retry a failing CAS, because - // only a departing leader (which we are at the moment) can remove - // nodes from the list. - assert(head != last_writer); - - // After walking link_older starting from head (if not already done) - // we will be able to traverse w->link_newer below. This function - // can only be called from an active leader, only a leader can - // clear newest_writer_, we didn't, and only a clear newest_writer_ - // could cause the next leader to start their work without a call - // to MarkJoined, so we can definitely conclude that no other leader - // work is going on here (with or without db mutex). - CreateMissingNewerLinks(head); - assert(last_writer->link_newer->link_older == last_writer); - last_writer->link_newer->link_older = nullptr; - - // Next leader didn't self-identify, because newest_writer_ wasn't - // nullptr when they enqueued (we were definitely enqueued before them - // and are still in the list). That means leader handoff occurs when - // we call MarkJoined - SetState(last_writer->link_newer, STATE_GROUP_LEADER); - } - // else nobody else was waiting, although there might already be a new - // leader now - - while (last_writer != leader) { - last_writer->status = status; - // we need to read link_older before calling SetState, because as soon - // as it is marked committed the other thread's Await may return and - // deallocate the Writer. - auto next = last_writer->link_older; - SetState(last_writer, STATE_COMPLETED); - - last_writer = next; + if (enable_pipelined_write_) { + // Notify writers don't write to memtable to exit. + for (Writer* w = last_writer; w != leader;) { + Writer* next = w->link_older; + w->status = status; + if (!w->ShouldWriteToMemtable()) { + CompleteFollower(w, write_group); + } + w = next; + } + if (!leader->ShouldWriteToMemtable()) { + CompleteLeader(write_group); + } + // Link the ramaining of the group to memtable writer list. + if (write_group.size > 0) { + if (LinkGroup(write_group, &newest_memtable_writer_)) { + // The leader can now be different from current writer. 
+ SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER); + } + } + // Reset newest_writer_ and wake up the next leader. + Writer* newest_writer = last_writer; + if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) { + Writer* next_leader = newest_writer; + while (next_leader->link_older != last_writer) { + next_leader = next_leader->link_older; + assert(next_leader != nullptr); + } + next_leader->link_older = nullptr; + SetState(next_leader, STATE_GROUP_LEADER); + } + AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER | + STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, + &ctx); + } else { + Writer* head = newest_writer_.load(std::memory_order_acquire); + if (head != last_writer || + !newest_writer_.compare_exchange_strong(head, nullptr)) { + // Either w wasn't the head during the load(), or it was the head + // during the load() but somebody else pushed onto the list before + // we did the compare_exchange_strong (causing it to fail). In the + // latter case compare_exchange_strong has the effect of re-reading + // its first param (head). No need to retry a failing CAS, because + // only a departing leader (which we are at the moment) can remove + // nodes from the list. + assert(head != last_writer); + + // After walking link_older starting from head (if not already done) + // we will be able to traverse w->link_newer below. This function + // can only be called from an active leader, only a leader can + // clear newest_writer_, we didn't, and only a clear newest_writer_ + // could cause the next leader to start their work without a call + // to MarkJoined, so we can definitely conclude that no other leader + // work is going on here (with or without db mutex). + CreateMissingNewerLinks(head); + assert(last_writer->link_newer->link_older == last_writer); + last_writer->link_newer->link_older = nullptr; + + // Next leader didn't self-identify, because newest_writer_ wasn't + // nullptr when they enqueued (we were definitely enqueued before them + // and are still in the list). That means leader handoff occurs when + // we call MarkJoined + SetState(last_writer->link_newer, STATE_GROUP_LEADER); + } + // else nobody else was waiting, although there might already be a new + // leader now + + while (last_writer != leader) { + last_writer->status = status; + // we need to read link_older before calling SetState, because as soon + // as it is marked committed the other thread's Await may return and + // deallocate the Writer. 
+ auto next = last_writer->link_older; + SetState(last_writer, STATE_COMPLETED); + + last_writer = next; + } } } void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { static AdaptationContext ctx("EnterUnbatched"); - - assert(w->batch == nullptr); - bool linked_as_leader; - LinkOne(w, &linked_as_leader); + assert(w != nullptr && w->batch == nullptr); + mu->Unlock(); + bool linked_as_leader = LinkOne(w, &newest_writer_); if (!linked_as_leader) { - mu->Unlock(); TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait"); // Last leader will not pick us as a follower since our batch is nullptr AwaitState(w, STATE_GROUP_LEADER, &ctx); - mu->Lock(); } + if (enable_pipelined_write_) { + WaitForMemTableWriters(); + } + mu->Lock(); } void WriteThread::ExitUnbatched(Writer* w) { - Status dummy_status; - ExitAsBatchGroupLeader(w, w, dummy_status); + assert(w != nullptr); + Writer* newest_writer = w; + if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) { + CreateMissingNewerLinks(newest_writer); + Writer* next_leader = w->link_newer; + assert(next_leader != nullptr); + next_leader->link_older = nullptr; + SetState(next_leader, STATE_GROUP_LEADER); + } +} + +void WriteThread::WaitForMemTableWriters() { + static AdaptationContext ctx("WaitForMemTableWriters"); + assert(enable_pipelined_write_); + if (newest_memtable_writer_.load() == nullptr) { + return; + } + Writer w; + if (!LinkOne(&w, &newest_memtable_writer_)) { + AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &ctx); + } + newest_memtable_writer_.store(nullptr); } } // namespace rocksdb diff --git a/db/write_thread.h b/db/write_thread.h index a7dae3019..c72c95a11 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -49,32 +49,66 @@ class WriteThread { // the leader to STATE_COMPLETED. STATE_GROUP_LEADER = 2, - // A Writer that has returned as a follower in a parallel group. - // It should apply its batch to the memtable and then call - // CompleteParallelWorker. When someone calls ExitAsBatchGroupLeader - // or EarlyExitParallelGroup this state will get transitioned to - // STATE_COMPLETED. - STATE_PARALLEL_FOLLOWER = 4, + // The state used to inform a waiting writer that it has become the + // leader of memtable writer group. The leader will either write + // memtable for the whole group, or launch a parallel group write + // to memtable by calling LaunchParallelMemTableWrite. + STATE_MEMTABLE_WRITER_LEADER = 4, + + // The state used to inform a waiting writer that it has become a + // parallel memtable writer. It can be the group leader who launch the + // praallel writer group, or one of the followers. The writer should then + // apply its batch to the memtable concurrently and call + // CompleteParallelMemTableWriter. + STATE_PARALLEL_MEMTABLE_WRITER = 8, // A follower whose writes have been applied, or a parallel leader // whose followers have all finished their work. This is a terminal // state. 
- STATE_COMPLETED = 8, + STATE_COMPLETED = 16, // A state indicating that the thread may be waiting using StateMutex() // and StateCondVar() - STATE_LOCKED_WAITING = 16, + STATE_LOCKED_WAITING = 32, }; struct Writer; - struct ParallelGroup { - Writer* leader; - Writer* last_writer; + struct WriteGroup { + Writer* leader = nullptr; + Writer* last_writer = nullptr; SequenceNumber last_sequence; // before running goes to zero, status needs leader->StateMutex() Status status; - std::atomic running; + std::atomic running; + size_t size = 0; + + struct Iterator { + Writer* writer; + Writer* last_writer; + + explicit Iterator(Writer* w, Writer* last) + : writer(w), last_writer(last) {} + + Writer* operator*() const { return writer; } + + Iterator& operator++() { + assert(writer != nullptr); + if (writer == last_writer) { + writer = nullptr; + } else { + writer = writer->link_newer; + } + return *this; + } + + bool operator!=(const Iterator& other) const { + return writer != other.writer; + } + }; + + Iterator begin() const { return Iterator(leader, last_writer); } + Iterator end() const { return Iterator(nullptr, nullptr); } }; // Information kept for every waiting writer. @@ -86,11 +120,10 @@ class WriteThread { bool disable_memtable; uint64_t log_used; // log number that this batch was inserted into uint64_t log_ref; // log number that memtable insert should reference - bool in_batch_group; WriteCallback* callback; bool made_waitable; // records lazy construction of mutex and cv std::atomic state; // write under StateMutex() or pre-link - ParallelGroup* parallel_group; + WriteGroup* write_group; SequenceNumber sequence; // the sequence number to use for the first key Status status; // status of memtable inserter Status callback_status; // status returned by callback->Callback() @@ -107,11 +140,10 @@ class WriteThread { disable_memtable(false), log_used(0), log_ref(0), - in_batch_group(false), callback(nullptr), made_waitable(false), state(STATE_INIT), - parallel_group(nullptr), + write_group(nullptr), link_older(nullptr), link_newer(nullptr) {} @@ -124,11 +156,10 @@ class WriteThread { disable_memtable(_disable_memtable), log_used(0), log_ref(_log_ref), - in_batch_group(false), callback(_callback), made_waitable(false), state(STATE_INIT), - parallel_group(nullptr), + write_group(nullptr), link_older(nullptr), link_newer(nullptr) {} @@ -182,10 +213,12 @@ class WriteThread { } bool ShouldWriteToMemtable() { - return !CallbackFailed() && !disable_memtable; + return status.ok() && !CallbackFailed() && !disable_memtable; } - bool ShouldWriteToWAL() { return !CallbackFailed() && !disable_wal; } + bool ShouldWriteToWAL() { + return status.ok() && !CallbackFailed() && !disable_wal; + } // No other mutexes may be acquired while holding StateMutex(), it is // always last in the order @@ -201,7 +234,16 @@ class WriteThread { } }; - WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec); + struct AdaptationContext { + const char* name; + std::atomic value; + + explicit AdaptationContext(const char* name0) : name(name0), value(0) {} + }; + + explicit WriteThread(const ImmutableDBOptions& db_options); + + virtual ~WriteThread() = default; // IMPORTANT: None of the methods in this class rely on the db mutex // for correctness. All of the methods except JoinBatchGroup and @@ -226,40 +268,45 @@ class WriteThread { // Constructs a write batch group led by leader, which should be a // Writer passed to JoinBatchGroup on the current thread. 
// - // Writer* leader: Writer that is STATE_GROUP_LEADER - // Writer** last_writer: Out-param that identifies the last follower - // autovector* write_batch_group: Out-param of group members - // returns: Total batch group byte size - size_t EnterAsBatchGroupLeader( - Writer* leader, Writer** last_writer, - autovector* write_batch_group); + // Writer* leader: Writer that is STATE_GROUP_LEADER + // WriteGroup* write_group: Out-param of group members + // returns: Total batch group byte size + size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group); + + // Unlinks the Writer-s in a batch group, wakes up the non-leaders, + // and wakes up the next leader (if any). + // + // WriteGroup* write_group: the write group + // Status status: Status of write operation + void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status); + + // Exit batch group on behalf of batch group leader. + void ExitAsBatchGroupFollower(Writer* w); + + // Constructs a write batch group led by leader from newest_memtable_writers_ + // list. The leader should either write memtable for the whole group and + // call ExitAsMemTableWriter, or launch parallel memtable write through + // LaunchParallelMemTableWriters. + void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_grup); + + // Memtable writer group leader, or the last finished writer in a parallel + // write group, exit from the newest_memtable_writers_ list, and wake up + // the next leader if needed. + void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group); // Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the // non-leader members of this write batch group. Sets Writer::sequence // before waking them up. // - // ParallalGroup* pg: Extra state used to coordinate the parallel add - // SequenceNumber sequence: Starting sequence number to assign to Writer-s - void LaunchParallelFollowers(ParallelGroup* pg, SequenceNumber sequence); + // WriteGroup* write_group: Extra state used to coordinate the parallel add + void LaunchParallelMemTableWriters(WriteGroup* write_group); // Reports the completion of w's batch to the parallel group leader, and // waits for the rest of the parallel batch to complete. Returns true // if this thread is the last to complete, and hence should advance // the sequence number and then call EarlyExitParallelGroup, false if // someone else has already taken responsibility for that. - bool CompleteParallelWorker(Writer* w); - - // Exit batch group on behalf of batch group leader. - void ExitAsBatchGroupFollower(Writer* w); - - // Unlinks the Writer-s in a batch group, wakes up the non-leaders, - // and wakes up the next leader (if any). - // - // Writer* leader: From EnterAsBatchGroupLeader - // Writer* last_writer: Value of out-param of EnterAsBatchGroupLeader - // Status status: Status of write operation - void ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer, - Status status); + bool CompleteParallelMemTableWriter(Writer* w); // Waits for all preceding writers (unlocking mu while waiting), then // registers w as the currently proceeding writer. @@ -273,21 +320,40 @@ class WriteThread { // writers. void ExitUnbatched(Writer* w); - struct AdaptationContext { - const char* name; - std::atomic value; + // Wait for all parallel memtable writers to finish, in case pipelined + // write is enabled. 
+ void WaitForMemTableWriters(); - explicit AdaptationContext(const char* name0) : name(name0), value(0) {} - }; + SequenceNumber UpdateLastSequence(SequenceNumber sequence) { + if (sequence > last_sequence_) { + last_sequence_ = sequence; + } + return last_sequence_; + } private: - uint64_t max_yield_usec_; - uint64_t slow_yield_usec_; + // See AwaitState. + const uint64_t max_yield_usec_; + const uint64_t slow_yield_usec_; + + // Allow multiple writers write to memtable concurrently. + const bool allow_concurrent_memtable_write_; + + // Enable pipelined write to WAL and memtable. + const bool enable_pipelined_write_; - // Points to the newest pending Writer. Only leader can remove - // elements, adding can be done lock-free by anybody + // Points to the newest pending writer. Only leader can remove + // elements, adding can be done lock-free by anybody. std::atomic newest_writer_; + // Points to the newest pending memtable writer. Used only when pipelined + // write is enabled. + std::atomic newest_memtable_writer_; + + // The last sequence that have been consumed by a writer. The sequence + // is not necessary visible to reads because the writer can be ongoing. + SequenceNumber last_sequence_; + // Waits for w->state & goal_mask using w->StateMutex(). Returns // the state that satisfies goal_mask. uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask); @@ -298,16 +364,30 @@ class WriteThread { // a context-dependent static. uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx); + // Set writer state and wake the writer up if it is waiting. void SetState(Writer* w, uint8_t new_state); - // Links w into the newest_writer_ list. Sets *linked_as_leader to - // true if w was linked directly into the leader position. Safe to - // call from multiple threads without external locking. - void LinkOne(Writer* w, bool* linked_as_leader); + // Links w into the newest_writer list. Return true if w was linked directly + // into the leader position. Safe to call from multiple threads without + // external locking. + bool LinkOne(Writer* w, std::atomic* newest_writer); + + // Link write group into the newest_writer list as a whole, while keeping the + // order of the writers unchanged. Return true if the group was linked + // directly into the leader position. + bool LinkGroup(WriteGroup& write_group, std::atomic* newest_writer); // Computes any missing link_newer links. Should not be called // concurrently with itself. void CreateMissingNewerLinks(Writer* head); + + // Set the leader in write_group to completed state and remove it from the + // write group. + void CompleteLeader(WriteGroup& write_group); + + // Set a follower in write_group to completed state and remove it from the + // write group. + void CompleteFollower(Writer* w, WriteGroup& write_group); }; } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 40b917c6d..2dd8e94f5 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -751,6 +751,21 @@ struct DBOptions { // Default: 16MB/s uint64_t delayed_write_rate = 16 * 1024U * 1024U; + // By default, a single write thread queue is maintained. The thread gets + // to the head of the queue becomes write batch group leader and responsible + // for writing to WAL and memtable for the batch group. + // + // If enable_pipelined_write is true, separate write thread queue is + // maintained for WAL write and memtable write. A write thread first enter WAL + // writer queue and then memtable writer queue. 
Pending thread on the WAL + // writer queue thus only have to wait for previous writers to finish thier + // WAL writing but not the memtable writing. Enabling the feature may improve + // write throughput and reduce latency of the prepare phase of two-phase + // commit. + // + // Default: false + bool enable_pipelined_write = false; + // If true, allow multi-writers to update mem tables in parallel. // Only some memtable_factory-s support concurrent writes; currently it // is implemented only for SkipListFactory. Concurrent memtable writes diff --git a/options/db_options.cc b/options/db_options.cc index 55f87a7bf..df048d403 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -70,6 +70,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) wal_bytes_per_sync(options.wal_bytes_per_sync), listeners(options.listeners), enable_thread_tracking(options.enable_thread_tracking), + enable_pipelined_write(options.enable_pipelined_write), allow_concurrent_memtable_write(options.allow_concurrent_memtable_write), enable_write_thread_adaptive_yield( options.enable_write_thread_adaptive_yield), @@ -189,6 +190,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { wal_recovery_mode); ROCKS_LOG_HEADER(log, " Options.enable_thread_tracking: %d", enable_thread_tracking); + ROCKS_LOG_HEADER(log, " Options.enable_pipelined_write: %d", + enable_pipelined_write); ROCKS_LOG_HEADER(log, " Options.allow_concurrent_memtable_write: %d", allow_concurrent_memtable_write); ROCKS_LOG_HEADER(log, " Options.enable_write_thread_adaptive_yield: %d", diff --git a/options/db_options.h b/options/db_options.h index e0d4a823e..befa2daa3 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -63,6 +63,7 @@ struct ImmutableDBOptions { uint64_t wal_bytes_per_sync; std::vector> listeners; bool enable_thread_tracking; + bool enable_pipelined_write; bool allow_concurrent_memtable_write; bool enable_write_thread_adaptive_yield; uint64_t write_thread_max_yield_usec; diff --git a/options/options.cc b/options/options.cc index 780e86532..198d4bbca 100644 --- a/options/options.cc +++ b/options/options.cc @@ -176,6 +176,7 @@ DBOptions::DBOptions(const Options& options) listeners(options.listeners), enable_thread_tracking(options.enable_thread_tracking), delayed_write_rate(options.delayed_write_rate), + enable_pipelined_write(options.enable_pipelined_write), allow_concurrent_memtable_write(options.allow_concurrent_memtable_write), enable_write_thread_adaptive_yield( options.enable_write_thread_adaptive_yield), diff --git a/options/options_helper.h b/options/options_helper.h index e331416a0..8c48ae6ea 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -305,6 +305,9 @@ static std::unordered_map db_options_type_info = { {"fail_if_options_file_error", {offsetof(struct DBOptions, fail_if_options_file_error), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, + {"enable_pipelined_write", + {offsetof(struct DBOptions, enable_pipelined_write), OptionType::kBoolean, + OptionVerificationType::kNormal, false, 0}}, {"allow_concurrent_memtable_write", {offsetof(struct DBOptions, allow_concurrent_memtable_write), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 57256bb32..ce76093c5 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -281,6 +281,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { 
"random_access_max_buffer_size=1048576;" "advise_random_on_open=true;" "fail_if_options_file_error=false;" + "enable_pipelined_write=false;" "allow_concurrent_memtable_write=true;" "wal_recovery_mode=kPointInTimeRecovery;" "enable_write_thread_adaptive_yield=true;"