diff --git a/CMakeLists.txt b/CMakeLists.txt index ca7c7a0b9..ca570ccd2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -191,6 +191,7 @@ set(SOURCES util/coding.cc util/compaction_job_stats_impl.cc util/comparator.cc + util/concurrent_arena.cc util/crc32c.cc util/db_info_dumper.cc util/delete_scheduler_impl.cc diff --git a/db/column_family.cc b/db/column_family.cc index 2ef5a907f..0b74e4ebe 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -110,6 +110,20 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) { return Status::OK(); } +Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) { + if (cf_options.inplace_update_support) { + return Status::InvalidArgument( + "In-place memtable updates (inplace_update_support) are not compatible " + "with concurrent writes (allow_concurrent_memtable_write)"); + } + if (cf_options.filter_deletes) { + return Status::InvalidArgument( + "Delete filtering (filter_deletes) is not compatible with concurrent " + "memtable writes (allow_concurrent_memtable_write)"); + } + return Status::OK(); +} + ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, const InternalKeyComparator* icmp, const ColumnFamilyOptions& src) { @@ -916,13 +930,6 @@ ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() { return &handle_; } -void ColumnFamilyMemTablesImpl::CheckMemtableFull() { - if (current_ != nullptr && current_->mem()->ShouldScheduleFlush()) { - flush_scheduler_->ScheduleFlush(current_); - current_->mem()->MarkFlushScheduled(); - } -} - uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) { uint32_t column_family_id = 0; if (column_family != nullptr) { diff --git a/db/column_family.h b/db/column_family.h index 64bb1c9a1..4ba154779 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -19,12 +19,10 @@ #include "db/write_controller.h" #include "db/table_cache.h" #include "db/table_properties_collector.h" -#include "db/flush_scheduler.h" #include "rocksdb/compaction_job_stats.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/options.h" -#include "util/instrumented_mutex.h" #include "util/mutable_cf_options.h" #include "util/thread_local.h" @@ -134,6 +132,9 @@ struct SuperVersion { extern Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options); +extern Status CheckConcurrentWritesSupported( + const ColumnFamilyOptions& cf_options); + extern ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, const InternalKeyComparator* icmp, const ColumnFamilyOptions& src); @@ -158,14 +159,16 @@ class ColumnFamilyData { // thread-safe const std::string& GetName() const { return name_; } - // Ref() can only be called whily holding a DB mutex or during a - // single-threaded write. + // Ref() can only be called from a context where the caller can guarantee + // that ColumnFamilyData is alive (while holding a non-zero ref already, + // holding a DB mutex, or as the leader in a write batch group). void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); } - // will just decrease reference count to 0, but will not delete it. returns - // true if the ref count was decreased to zero. in that case, it can be - // deleted by the caller immediately, or later, by calling - // FreeDeadColumnFamilies() - // Unref() can only be called while holding a DB mutex + + // Unref decreases the reference count, but does not handle deletion + // when the count goes to 0.
If this method returns true then the + // caller should delete the instance immediately, or later, by calling + // FreeDeadColumnFamilies(). Unref() can only be called while holding + // a DB mutex, or during single-threaded recovery. bool Unref() { int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed); assert(old_refs > 0); @@ -497,15 +500,18 @@ class ColumnFamilySet { // memtables of different column families (specified by ID in the write batch) class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { public: - explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set, - FlushScheduler* flush_scheduler) - : column_family_set_(column_family_set), - current_(nullptr), - flush_scheduler_(flush_scheduler) {} + explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set) : column_family_set_(column_family_set), current_(nullptr) {} + + // Constructs a ColumnFamilyMemTablesImpl equivalent to one constructed + // with the arguments used to construct *orig. + explicit ColumnFamilyMemTablesImpl(ColumnFamilyMemTablesImpl* orig) + : column_family_set_(orig->column_family_set_), current_(nullptr) {} // sets current_ to ColumnFamilyData with column_family_id // returns false if column family doesn't exist - // REQUIRES: under a DB mutex OR from a write thread + // REQUIRES: use of this function through DBImpl::column_family_memtables_ + // must be under a DB mutex OR from a write thread bool Seek(uint32_t column_family_id) override; // Returns log number of the selected column family @@ -513,20 +519,23 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { uint64_t GetLogNumber() const override; // REQUIRES: Seek() called first - // REQUIRES: under a DB mutex OR from a write thread + // REQUIRES: use of this function through DBImpl::column_family_memtables_ + // must be under a DB mutex OR from a write thread virtual MemTable* GetMemTable() const override; // Returns column family handle for the selected column family - // REQUIRES: under a DB mutex OR from a write thread + // REQUIRES: use of this function through DBImpl::column_family_memtables_ + // must be under a DB mutex OR from a write thread virtual ColumnFamilyHandle* GetColumnFamilyHandle() override; - // REQUIRES: under a DB mutex OR from a write thread - virtual void CheckMemtableFull() override; + // Cannot be called while another thread is calling Seek().
+ // REQUIRES: use of this function through DBImpl::column_family_memtables_ + // must be under a DB mutex OR from a write thread + virtual ColumnFamilyData* current() { return current_; } private: ColumnFamilySet* column_family_set_; ColumnFamilyData* current_; - FlushScheduler* flush_scheduler_; ColumnFamilyHandleInternal handle_; }; diff --git a/db/db_bench.cc b/db/db_bench.cc index 42667b362..0f455d9c7 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -646,6 +646,20 @@ DEFINE_uint64(delayed_write_rate, 8388608u, "Limited bytes allowed to DB when soft_rate_limit or " "level0_slowdown_writes_trigger triggers"); +DEFINE_bool(allow_concurrent_memtable_write, false, + "Allow multi-writers to update mem tables in parallel."); + +DEFINE_bool(enable_write_thread_adaptive_yield, false, + "Use a yielding spin loop for brief writer thread waits."); + +DEFINE_uint64( + write_thread_max_yield_usec, 100, + "Maximum microseconds for enable_write_thread_adaptive_yield operation."); + +DEFINE_uint64(write_thread_slow_yield_usec, 3, + "The threshold at which a slow yield is considered a signal that " + "other processes or threads want the core."); + DEFINE_int32(rate_limit_delay_max_milliseconds, 1000, "When hard_rate_limit is set then this is the max time a put will" " be stalled."); @@ -2552,6 +2566,12 @@ class Benchmark { options.hard_pending_compaction_bytes_limit = FLAGS_hard_pending_compaction_bytes_limit; options.delayed_write_rate = FLAGS_delayed_write_rate; + options.allow_concurrent_memtable_write = + FLAGS_allow_concurrent_memtable_write; + options.enable_write_thread_adaptive_yield = + FLAGS_enable_write_thread_adaptive_yield; + options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec; + options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec; options.rate_limit_delay_max_milliseconds = FLAGS_rate_limit_delay_max_milliseconds; options.table_cache_numshardbits = FLAGS_table_cache_numshardbits; diff --git a/db/db_impl.cc b/db/db_impl.cc index 25f0fa066..fb179e04b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -247,6 +247,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) max_total_in_memory_state_(0), is_snapshot_supported_(true), write_buffer_(options.db_write_buffer_size), + write_thread_(options.enable_write_thread_adaptive_yield + ? options.write_thread_max_yield_usec + : 0, + options.write_thread_slow_yield_usec), write_controller_(options.delayed_write_rate), last_batch_group_size_(0), unscheduled_flushes_(0), @@ -282,8 +286,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_, &write_controller_)); - column_family_memtables_.reset(new ColumnFamilyMemTablesImpl( - versions_->GetColumnFamilySet(), &flush_scheduler_)); + column_family_memtables_.reset( + new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); DumpRocksDBBuildVersion(db_options_.info_log.get()); DumpDBFileSummary(db_options_, dbname_); @@ -1235,8 +1239,9 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // insert. We don't want to fail the whole write batch in that case -- // we just ignore the update.
// That's why we set ignore missing column families to true - status = WriteBatchInternal::InsertInto( - &batch, column_family_memtables_.get(), true, log_number); + status = + WriteBatchInternal::InsertInto(&batch, column_family_memtables_.get(), + &flush_scheduler_, true, log_number); MaybeIgnoreError(&status); if (!status.ok()) { @@ -1257,7 +1262,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // DB and there is only a single thread operating on DB ColumnFamilyData* cfd; - while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { + while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { cfd->Unref(); // If this asserts, it means that InsertInto failed in // filtering updates to already-flushed column families @@ -3623,6 +3628,9 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, *handle = nullptr; s = CheckCompressionSupported(cf_options); + if (s.ok() && db_options_.allow_concurrent_memtable_write) { + s = CheckConcurrentWritesSupported(cf_options); + } if (!s.ok()) { return s; } @@ -4071,7 +4079,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, w.sync = write_options.sync; w.disableWAL = write_options.disableWAL; w.in_batch_group = false; - w.done = false; w.has_callback = (callback != nullptr) ? true : false; if (!write_options.disableWAL) { @@ -4081,12 +4088,35 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, StopWatch write_sw(env_, db_options_.statistics.get(), DB_WRITE); write_thread_.JoinBatchGroup(&w); - if (w.done) { - // write was done by someone else, no need to grab mutex + if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) { + // we are a non-leader in a parallel group + PERF_TIMER_GUARD(write_memtable_time); + + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + WriteBatchInternal::SetSequence(w.batch, w.sequence); + w.status = WriteBatchInternal::InsertInto( + w.batch, &column_family_memtables, &flush_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/); + + if (write_thread_.CompleteParallelWorker(&w)) { + // we're responsible for early exit + auto last_sequence = w.parallel_group->last_writer->sequence; + SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); + versions_->SetLastSequence(last_sequence); + write_thread_.EarlyExitParallelGroup(&w); + } + assert(w.state == WriteThread::STATE_COMPLETED); + // STATE_COMPLETED conditional below handles exit + } + if (w.state == WriteThread::STATE_COMPLETED) { + // write is complete and leader has updated sequence RecordTick(stats_, WRITE_DONE_BY_OTHER); return w.status; } // else we are the leader of the write batch group + assert(w.state == WriteThread::STATE_GROUP_LEADER); WriteContext context; mutex_.Lock(); @@ -4108,9 +4138,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0) ? 
4 * max_total_in_memory_state_ : db_options_.max_total_wal_size; - if (UNLIKELY(!single_column_family_mode_) && - alive_log_files_.begin()->getting_flushed == false && - total_log_size_ > max_total_wal_size) { + if (UNLIKELY(!single_column_family_mode_ && + alive_log_files_.begin()->getting_flushed == false && + total_log_size_ > max_total_wal_size)) { uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number; alive_log_files_.begin()->getting_flushed = true; Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, @@ -4175,8 +4205,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, status = ScheduleFlushes(&context); } - if (UNLIKELY(status.ok()) && - (write_controller_.IsStopped() || write_controller_.NeedsDelay())) { + if (UNLIKELY(status.ok() && (write_controller_.IsStopped() || + write_controller_.NeedsDelay()))) { PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_delay_time); // We don't know size of curent batch so that we always use the size @@ -4194,9 +4224,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, bool need_log_dir_sync = need_log_sync && !log_dir_synced_; if (status.ok()) { - last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader( - &w, &last_writer, &write_batch_group); - if (need_log_sync) { while (logs_.front().getting_synced) { log_sync_cv_.Wait(); @@ -4226,154 +4253,204 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // At this point the mutex is unlocked + bool exit_completed_early = false; + last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader( + &w, &last_writer, &write_batch_group); + if (status.ok()) { - int total_count = 0; - uint64_t total_byte_size = 0; - for (auto b : write_batch_group) { - total_count += WriteBatchInternal::Count(b); - total_byte_size = WriteBatchInternal::AppendedByteSize( - total_byte_size, WriteBatchInternal::ByteSize(b)); - } + // Rules for when we can update the memtable concurrently + // 1. supported by memtable + // 2. Puts are not okay if inplace_update_support + // 3. Deletes or SingleDeletes are not okay if filtering deletes + // (controlled by both batch and memtable setting) + // 4. Merges are not okay + // + // Rules 1..3 are enforced by checking the options + // during startup (CheckConcurrentWritesSupported), so if + // options.allow_concurrent_memtable_write is true then they can be + // assumed to be true. Rule 4 is checked for each batch. We could + // relax rules 2 and 3 if we could prevent write batches from referring + // more than once to a particular key. 
+ bool parallel = db_options_.allow_concurrent_memtable_write && + write_batch_group.size() > 1; + int total_count = 0; + uint64_t total_byte_size = 0; + for (auto b : write_batch_group) { + total_count += WriteBatchInternal::Count(b); + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(b)); + parallel = parallel && !b->HasMerge(); + } + + const SequenceNumber current_sequence = last_sequence + 1; + last_sequence += total_count; + + // Record statistics + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + PERF_TIMER_STOP(write_pre_and_post_process_time); - const SequenceNumber current_sequence = last_sequence + 1; - last_sequence += total_count; + if (write_options.disableWAL) { + flush_on_destroy_ = true; + } - // Record statistics - RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); - RecordTick(stats_, BYTES_WRITTEN, total_byte_size); - PERF_TIMER_STOP(write_pre_and_post_process_time); + uint64_t log_size = 0; + if (!write_options.disableWAL) { + PERF_TIMER_GUARD(write_wal_time); - if (write_options.disableWAL) { - flush_on_destroy_ = true; + WriteBatch* merged_batch = nullptr; + if (write_batch_group.size() == 1) { + merged_batch = write_batch_group[0]; + } else { + // WAL needs all of the batches flattened into a single batch. + // We could avoid copying here with an iov-like AddRecord + // interface + merged_batch = &tmp_batch_; + for (auto b : write_batch_group) { + WriteBatchInternal::Append(merged_batch, b); + } + } + WriteBatchInternal::SetSequence(merged_batch, current_sequence); + + assert(WriteBatchInternal::Count(merged_batch) == total_count); + assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size); + + Slice log_entry = WriteBatchInternal::Contents(merged_batch); + status = logs_.back().writer->AddRecord(log_entry); + total_log_size_ += log_entry.size(); + alive_log_files_.back().AddSize(log_entry.size()); + log_empty_ = false; + log_size = log_entry.size(); + RecordTick(stats_, WAL_FILE_BYTES, log_size); + if (status.ok() && need_log_sync) { + RecordTick(stats_, WAL_FILE_SYNCED); + StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); + // It's safe to access logs_ with unlocked mutex_ here because: + // - we've set getting_synced=true for all logs, + // so other threads won't pop from logs_ while we're here, + // - only writer thread can push to logs_, and we're in + // writer thread, so no one will push to logs_, + // - as long as other threads don't modify it, it's safe to read + // from std::deque from multiple threads concurrently. + for (auto& log : logs_) { + status = log.writer->file()->Sync(db_options_.use_fsync); + if (!status.ok()) { + break; + } + } + if (status.ok() && need_log_dir_sync) { + // We only sync WAL directory the first time WAL syncing is + // requested, so that in case users never turn on WAL sync, + // we can avoid the disk I/O in the write code path. + status = directories_.GetWalDir()->Fsync(); + } } - uint64_t log_size = 0; - if (!write_options.disableWAL) { - PERF_TIMER_GUARD(write_wal_time); + if (merged_batch == &tmp_batch_) { + tmp_batch_.Clear(); + } + } + if (status.ok()) { + PERF_TIMER_GUARD(write_memtable_time); - WriteBatch* merged_batch = nullptr; - if (write_batch_group.size() == 1) { - merged_batch = write_batch_group[0]; - } else { - // WAL needs all of the batches flattened into a single batch. 
- // We could avoid copying here with an iov-like AddRecord - // interface - merged_batch = &tmp_batch_; - for (auto b : write_batch_group) { - WriteBatchInternal::Append(merged_batch, b); + { + // Update stats while we are an exclusive group leader, so we know + // that nobody else can be writing to these particular stats. + // We're optimistic, updating the stats before we successfully + // commit. That lets us release our leader status early in + // some cases. + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); + stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); + if (!write_options.disableWAL) { + if (write_options.sync) { + stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1); } + stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size); } - WriteBatchInternal::SetSequence(merged_batch, current_sequence); - - assert(WriteBatchInternal::Count(merged_batch) == total_count); - assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size); - - Slice log_entry = WriteBatchInternal::Contents(merged_batch); - status = logs_.back().writer->AddRecord(log_entry); - total_log_size_ += log_entry.size(); - alive_log_files_.back().AddSize(log_entry.size()); - log_empty_ = false; - log_size = log_entry.size(); - RecordTick(stats_, WAL_FILE_BYTES, log_size); - if (status.ok() && need_log_sync) { - RecordTick(stats_, WAL_FILE_SYNCED); - StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); - // It's safe to access logs_ with unlocked mutex_ here because: - // - we've set getting_synced=true for all logs, - // so other threads won't pop from logs_ while we're here, - // - only writer thread can push to logs_, and we're in - // writer thread, so no one will push to logs_, - // - as long as other threads don't modify it, it's safe to read - // from std::deque from multiple threads concurrently. - for (auto& log : logs_) { - status = log.writer->file()->Sync(db_options_.use_fsync); - if (!status.ok()) { - break; - } + uint64_t for_other = write_batch_group.size() - 1; + if (for_other > 0) { + stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, for_other); + if (!write_options.disableWAL) { + stats->AddDBStats(InternalStats::WRITE_WITH_WAL, for_other); } - if (status.ok() && need_log_dir_sync) { - // We only sync WAL directory the first time WAL syncing is - // requested, so that in case users never turn on WAL sync, - // we can avoid the disk I/O in the write code path. - status = directories_.GetWalDir()->Fsync(); - } - } - - if (merged_batch == &tmp_batch_) { - tmp_batch_.Clear(); } } - if (status.ok()) { - PERF_TIMER_GUARD(write_memtable_time); + if (!parallel) { status = WriteBatchInternal::InsertInto( write_batch_group, current_sequence, column_family_memtables_.get(), - write_options.ignore_missing_column_families, - /*log_number*/ 0, this, /*dont_filter_deletes*/ false); - - // A non-OK status here indicates that the state implied by the - // WAL has diverged from the in-memory state. This could be - // because of a corrupt write_batch (very bad), or because the - // client specified an invalid column family and didn't specify - // ignore_missing_column_families. - // - // Is setting bg_error_ enough here? This will at least stop - // compaction and fail any further writes. 
- if (!status.ok() && bg_error_.ok()) { - bg_error_ = status; - } - - SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); + &flush_scheduler_, write_options.ignore_missing_column_families, + 0 /*log_number*/, this, false /*dont_filter_deletes*/); + } else { + WriteThread::ParallelGroup pg{}; + pg.leader = &w; + pg.last_writer = last_writer; + pg.early_exit_allowed = !need_log_sync; + pg.running.store(write_batch_group.size(), std::memory_order_relaxed); + write_thread_.LaunchParallelFollowers(&pg, current_sequence); + + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + assert(w.sequence == current_sequence); + WriteBatchInternal::SetSequence(w.batch, w.sequence); + w.status = WriteBatchInternal::InsertInto( + w.batch, &column_family_memtables, &flush_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, + this, true /*dont_filter_deletes*/, + true /*concurrent_memtable_writes*/); + + assert(last_writer->sequence == last_sequence); + // CompleteParallelWorker returns true if this thread should + // handle exit, false means somebody else did + exit_completed_early = !write_thread_.CompleteParallelWorker(&w); + status = w.status; + assert(status.ok() || !exit_completed_early); } - PERF_TIMER_START(write_pre_and_post_process_time); - mutex_.Lock(); - // internal stats - default_cf_internal_stats_->AddDBStats(InternalStats::BYTES_WRITTEN, - total_byte_size); - default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, - total_count); - if (!write_options.disableWAL) { - if (write_options.sync) { - default_cf_internal_stats_->AddDBStats(InternalStats::WAL_FILE_SYNCED, - 1); + if (status.ok() && !exit_completed_early) { + SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); + versions_->SetLastSequence(last_sequence); + if (!need_log_sync) { + write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status); + exit_completed_early = true; } - default_cf_internal_stats_->AddDBStats( - InternalStats::WAL_FILE_BYTES, log_size); } - if (status.ok()) { - versions_->SetLastSequence(last_sequence); + + // A non-OK status here indicates that the state implied by the + // WAL has diverged from the in-memory state. This could be + // because of a corrupt write_batch (very bad), or because the + // client specified an invalid column family and didn't specify + // ignore_missing_column_families. + // + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. + if (!status.ok() && bg_error_.ok()) { + bg_error_ = status; } - } else { - // Operation failed. Make sure sure mutex is held for cleanup code below. 
- mutex_.Lock(); + } } + PERF_TIMER_START(write_pre_and_post_process_time); if (db_options_.paranoid_checks && !status.ok() && !callback_failed && - !status.IsBusy() && bg_error_.ok()) { - bg_error_ = status; // stop compaction & fail any further writes + !status.IsBusy()) { + mutex_.Lock(); + if (bg_error_.ok()) { + bg_error_ = status; // stop compaction & fail any further writes + } + mutex_.Unlock(); } - mutex_.AssertHeld(); - if (need_log_sync) { + mutex_.Lock(); MarkLogsSynced(logfile_number_, need_log_dir_sync, status); + mutex_.Unlock(); } - uint64_t writes_for_other = write_batch_group.size() - 1; - if (writes_for_other > 0) { - default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, - writes_for_other); - if (!write_options.disableWAL) { - default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, - writes_for_other); - } + if (!exit_completed_early) { + write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status); } - mutex_.Unlock(); - - write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status); - return status; } @@ -4411,7 +4488,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes) { Status DBImpl::ScheduleFlushes(WriteContext* context) { ColumnFamilyData* cfd; - while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { + while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { auto status = SwitchMemtable(cfd, context); if (cfd->Unref()) { delete cfd; @@ -5084,6 +5161,9 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, for (auto& cfd : column_families) { s = CheckCompressionSupported(cfd.options); + if (s.ok() && db_options.allow_concurrent_memtable_write) { + s = CheckConcurrentWritesSupported(cfd.options); + } if (!s.ok()) { return s; } diff --git a/db/flush_scheduler.cc b/db/flush_scheduler.cc index 56816159e..f970f1ca8 100644 --- a/db/flush_scheduler.cc +++ b/db/flush_scheduler.cc @@ -13,51 +13,69 @@ namespace rocksdb { void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) { #ifndef NDEBUG - assert(column_families_set_.find(cfd) == column_families_set_.end()); - column_families_set_.insert(cfd); + { + std::lock_guard lock(checking_mutex_); + assert(checking_set_.count(cfd) == 0); + checking_set_.insert(cfd); + } #endif // NDEBUG cfd->Ref(); - column_families_.push_back(cfd); + Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)}; + while (!head_.compare_exchange_strong( + node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) { + // failing CAS updates the first param, so we are already set for + // retry. 
TakeNextColumnFamily won't happen until after another + // inter-thread synchronization, so we don't even need release + // semantics for this CAS + } } -ColumnFamilyData* FlushScheduler::GetNextColumnFamily() { - ColumnFamilyData* cfd = nullptr; - while (column_families_.size() > 0) { - cfd = column_families_.front(); - column_families_.pop_front(); - if (cfd->IsDropped()) { - if (cfd->Unref()) { - delete cfd; - cfd = nullptr; - } - } else { - break; +ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { + while (true) { + if (Empty()) { + return nullptr; } - } + + // dequeue the head + Node* node = head_.load(std::memory_order_relaxed); + head_.store(node->next, std::memory_order_relaxed); + ColumnFamilyData* cfd = node->column_family; + delete node; + #ifndef NDEBUG - if (cfd != nullptr) { - auto itr = column_families_set_.find(cfd); - assert(itr != column_families_set_.end()); - column_families_set_.erase(itr); - } + { + auto iter = checking_set_.find(cfd); + assert(iter != checking_set_.end()); + checking_set_.erase(iter); + } #endif // NDEBUG - return cfd; + + if (!cfd->IsDropped()) { + // success + return cfd; + } + + // no longer relevant, retry + if (cfd->Unref()) { + delete cfd; + } + } } -bool FlushScheduler::Empty() { return column_families_.empty(); } +bool FlushScheduler::Empty() { + auto rv = head_.load(std::memory_order_relaxed) == nullptr; + assert(rv == checking_set_.empty()); + return rv; +} void FlushScheduler::Clear() { - for (auto cfd : column_families_) { -#ifndef NDEBUG - auto itr = column_families_set_.find(cfd); - assert(itr != column_families_set_.end()); - column_families_set_.erase(itr); -#endif // NDEBUG + ColumnFamilyData* cfd; + while ((cfd = TakeNextColumnFamily()) != nullptr) { if (cfd->Unref()) { delete cfd; } } - column_families_.clear(); + assert(Empty()); } } // namespace rocksdb diff --git a/db/flush_scheduler.h b/db/flush_scheduler.h index 0c96709b9..dd439e410 100644 --- a/db/flush_scheduler.h +++ b/db/flush_scheduler.h @@ -6,34 +6,42 @@ #pragma once #include -#include +#include +#include #include -#include namespace rocksdb { class ColumnFamilyData; -// This class is thread-compatible. It's should only be accessed from single -// write thread (between BeginWrite() and EndWrite()) +// Unless otherwise noted, all methods on FlushScheduler should be called +// only with the DB mutex held or from a single-threaded recovery context. class FlushScheduler { public: - FlushScheduler() = default; - ~FlushScheduler() = default; + FlushScheduler() : head_(nullptr) {} + // May be called from multiple threads at once, but not concurrent with + // any other method calls on this instance void ScheduleFlush(ColumnFamilyData* cfd); - // Returns Ref()-ed column family. Client needs to Unref() - // REQUIRES: db mutex is held (exception is single-threaded recovery) - ColumnFamilyData* GetNextColumnFamily(); + + // Removes and returns Ref()-ed column family. Client needs to Unref(). + // Filters column families that have been dropped. 
+ ColumnFamilyData* TakeNextColumnFamily(); bool Empty(); void Clear(); private: - std::deque column_families_; + struct Node { + ColumnFamilyData* column_family; + Node* next; + }; + + std::atomic head_; #ifndef NDEBUG - std::set column_families_set_; + std::mutex checking_mutex_; + std::set checking_set_; #endif // NDEBUG }; diff --git a/db/inlineskiplist.h b/db/inlineskiplist.h index a4fb5ab89..201580b10 100644 --- a/db/inlineskiplist.h +++ b/db/inlineskiplist.h @@ -20,10 +20,12 @@ // // Thread safety ------------- // -// Writes require external synchronization, most likely a mutex. Reads -// require a guarantee that the InlineSkipList will not be destroyed while -// the read is in progress. Apart from that, reads progress without any -// internal locking or synchronization. +// Writes via Insert require external synchronization, most likely a mutex. +// InsertConcurrently can be safely called concurrently with reads and +// with other concurrent inserts. Reads require a guarantee that the +// InlineSkipList will not be destroyed while the read is in progress. +// Apart from that, reads progress without any internal locking or +// synchronization. // // Invariants: // @@ -63,16 +65,21 @@ class InlineSkipList { int32_t max_height = 12, int32_t branching_factor = 4); - // Allocates a key and a skip-list node, returning a pointer to the - // key portion of the node. + // Allocates a key and a skip-list node, returning a pointer to the key + // portion of the node. This method is thread-safe if the allocator + // is thread-safe. char* AllocateKey(size_t key_size); // Inserts a key allocated by AllocateKey, after the actual key value // has been filled in. // // REQUIRES: nothing that compares equal to key is currently in the list. + // REQUIRES: no concurrent calls to INSERT void Insert(const char* key); + // Like Insert, but external synchronization is not required. + void InsertConcurrently(const char* key); + // Returns true iff an entry that compares equal to key is in the list. bool Contains(const char* key) const; @@ -124,6 +131,8 @@ class InlineSkipList { }; private: + enum MaxPossibleHeightEnum : uint16_t { kMaxPossibleHeight = 32 }; + const uint16_t kMaxHeight_; const uint16_t kBranching_; const uint32_t kScaledInverseBranching_; @@ -139,11 +148,11 @@ class InlineSkipList { std::atomic max_height_; // Height of the entire list // Used for optimizing sequential insert patterns. Tricky. prev_[i] for - // i up to max_height_ is the predecessor of prev_[0] and prev_height_ - // is the height of prev_[0]. prev_[0] can only be equal to head before - // insertion, in which case max_height_ and prev_height_ are 1. + // i up to max_height_ - 1 (inclusive) is the predecessor of prev_[0]. + // prev_height_ is the height of prev_[0]. prev_[0] can only be equal + // to head when max_height_ and prev_height_ are both 1. Node** prev_; - int32_t prev_height_; + std::atomic prev_height_; inline int GetMaxHeight() const { return max_height_.load(std::memory_order_relaxed); @@ -175,6 +184,15 @@ class InlineSkipList { // Return head_ if list is empty. Node* FindLast() const; + // Traverses a single level of the list, setting *out_prev to the last + // node before the key and *out_next to the first node after. Assumes + // that the key is not present in the skip list. On entry, before should + // point to a node that is before the key, and after should point to + // a node that is after the key. after should be nullptr if a good after + // node isn't conveniently available. 
+ void FindLevelSplice(const char* key, Node* before, Node* after, int level, + Node** out_prev, Node** out_next); + // No copying allowed InlineSkipList(const InlineSkipList&); InlineSkipList& operator=(const InlineSkipList&); @@ -223,6 +241,11 @@ struct InlineSkipList::Node { next_[-n].store(x, std::memory_order_release); } + bool CASNext(int n, Node* expected, Node* x) { + assert(n >= 0); + return next_[-n].compare_exchange_strong(expected, x); + } + // No-barrier variants that can be safely used in a few locations. Node* NoBarrier_Next(int n) { assert(n >= 0); @@ -305,11 +328,13 @@ int InlineSkipList::RandomHeight() { // Increase height with probability 1 in kBranching int height = 1; - while (height < kMaxHeight_ && rnd->Next() < kScaledInverseBranching_) { + while (height < kMaxHeight_ && height < kMaxPossibleHeight && + rnd->Next() < kScaledInverseBranching_) { height++; } assert(height > 0); assert(height <= kMaxHeight_); + assert(height <= kMaxPossibleHeight); return height; } @@ -440,7 +465,7 @@ InlineSkipList::InlineSkipList(const Comparator cmp, max_height_(1), prev_height_(1) { assert(max_height > 0 && kMaxHeight_ == static_cast(max_height)); - assert(branching_factor > 0 && + assert(branching_factor > 1 && kBranching_ == static_cast(branching_factor)); assert(kScaledInverseBranching_ > 0); // Allocate the prev_ Node* array, directly from the passed-in allocator. @@ -485,16 +510,23 @@ InlineSkipList::AllocateNode(size_t key_size, int height) { template void InlineSkipList::Insert(const char* key) { + // InsertConcurrently can't maintain the prev_ invariants when it needs + // to increase max_height_. In that case it sets prev_height_ to zero, + // letting us know that we should ignore it. A relaxed load suffices + // here because write thread synchronization separates Insert calls + // from InsertConcurrently calls. + auto prev_height = prev_height_.load(std::memory_order_relaxed); + // fast path for sequential insertion - if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) && + if (prev_height > 0 && !KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) && (prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) { - assert(prev_[0] != head_ || (prev_height_ == 1 && GetMaxHeight() == 1)); + assert(prev_[0] != head_ || (prev_height == 1 && GetMaxHeight() == 1)); // Outside of this method prev_[1..max_height_] is the predecessor // of prev_[0], and prev_height_ refers to prev_[0]. Inside Insert // prev_[0..max_height - 1] is the predecessor of key. 
Switch from // the external state to the internal - for (int i = 1; i < prev_height_; i++) { + for (int i = 1; i < prev_height; i++) { prev_[i] = prev_[0]; } } else { @@ -534,7 +566,73 @@ void InlineSkipList::Insert(const char* key) { prev_[i]->SetNext(i, x); } prev_[0] = x; - prev_height_ = height; + prev_height_.store(height, std::memory_order_relaxed); +} + +template +void InlineSkipList::FindLevelSplice(const char* key, Node* before, + Node* after, int level, + Node** out_prev, + Node** out_next) { + while (true) { + Node* next = before->Next(level); + assert(before == head_ || next == nullptr || + KeyIsAfterNode(next->Key(), before)); + assert(before == head_ || KeyIsAfterNode(key, before)); + if (next == after || !KeyIsAfterNode(key, next)) { + // found it + *out_prev = before; + *out_next = next; + return; + } + before = next; + } +} + +template +void InlineSkipList::InsertConcurrently(const char* key) { + Node* x = reinterpret_cast(const_cast(key)) - 1; + int height = x->UnstashHeight(); + assert(height >= 1 && height <= kMaxHeight_); + + int max_height = max_height_.load(std::memory_order_relaxed); + while (height > max_height) { + if (max_height_.compare_exchange_strong(max_height, height)) { + // successfully updated it + max_height = height; + + // we dont have a lock-free algorithm for fixing up prev_, so just + // mark it invalid + prev_height_.store(0, std::memory_order_relaxed); + break; + } + // else retry, possibly exiting the loop because somebody else + // increased it + } + assert(max_height <= kMaxPossibleHeight); + + Node* prev[kMaxPossibleHeight + 1]; + Node* next[kMaxPossibleHeight + 1]; + prev[max_height] = head_; + next[max_height] = nullptr; + for (int i = max_height - 1; i >= 0; --i) { + FindLevelSplice(key, prev[i + 1], next[i + 1], i, &prev[i], &next[i]); + } + for (int i = 0; i < height; ++i) { + while (true) { + x->NoBarrier_SetNext(i, next[i]); + if (prev[i]->CASNext(i, next[i], x)) { + // success + break; + } + // CAS failed, we need to recompute prev and next. It is unlikely + // to be helpful to try to use a different level as we redo the + // search, because it should be unlikely that lots of nodes have + // been inserted between prev[i] and next[i]. No point in using + // next[i] as the after hint, because we know it is stale. + FindLevelSplice(key, prev[i], nullptr, i, &prev[i], &next[i]); + } + } } template diff --git a/db/inlineskiplist_test.cc b/db/inlineskiplist_test.cc index 70fd97a88..5c2dd6fa5 100644 --- a/db/inlineskiplist_test.cc +++ b/db/inlineskiplist_test.cc @@ -10,7 +10,7 @@ #include "db/inlineskiplist.h" #include #include "rocksdb/env.h" -#include "util/arena.h" +#include "util/concurrent_arena.h" #include "util/hash.h" #include "util/random.h" #include "util/testharness.h" @@ -67,7 +67,7 @@ TEST_F(InlineSkipTest, InsertAndLookup) { const int R = 5000; Random rnd(1000); std::set keys; - Arena arena; + ConcurrentArena arena; TestComparator cmp; InlineSkipList list(cmp, &arena); for (int i = 0; i < N; i++) { @@ -167,9 +167,10 @@ TEST_F(InlineSkipTest, InsertAndLookup) { // check that it is either expected given the initial snapshot or has // been concurrently added since the iterator started. 
class ConcurrentTest { - private: - static const uint32_t K = 4; + public: + static const uint32_t K = 8; + private: static uint64_t key(Key key) { return (key >> 40); } static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; } static uint64_t hash(Key key) { return key & 0xff; } @@ -222,7 +223,7 @@ class ConcurrentTest { // Current state of the test State current_; - Arena arena_; + ConcurrentArena arena_; // InlineSkipList is not protected by mu_. We just use a single writer // thread to modify it. @@ -231,7 +232,7 @@ class ConcurrentTest { public: ConcurrentTest() : list_(TestComparator(), &arena_) {} - // REQUIRES: External synchronization + // REQUIRES: No concurrent calls to WriteStep or ConcurrentWriteStep void WriteStep(Random* rnd) { const uint32_t k = rnd->Next() % K; const int g = current_.Get(k) + 1; @@ -242,6 +243,17 @@ class ConcurrentTest { current_.Set(k, g); } + // REQUIRES: No concurrent calls for the same k + void ConcurrentWriteStep(uint32_t k) { + const int g = current_.Get(k) + 1; + const Key new_key = MakeKey(k, g); + char* buf = list_.AllocateKey(sizeof(Key)); + memcpy(buf, &new_key, sizeof(Key)); + list_.InsertConcurrently(buf); + ASSERT_EQ(g, current_.Get(k) + 1); + current_.Set(k, g); + } + void ReadStep(Random* rnd) { // Remember the initial committed state of the skiplist. State initial_state; @@ -304,7 +316,7 @@ const uint32_t ConcurrentTest::K; // Simple test that does single-threaded testing of the ConcurrentTest // scaffolding. -TEST_F(InlineSkipTest, ConcurrentWithoutThreads) { +TEST_F(InlineSkipTest, ConcurrentReadWithoutThreads) { ConcurrentTest test; Random rnd(test::RandomSeed()); for (int i = 0; i < 10000; i++) { @@ -313,16 +325,33 @@ TEST_F(InlineSkipTest, ConcurrentWithoutThreads) { } } +TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) { + ConcurrentTest test; + Random rnd(test::RandomSeed()); + for (int i = 0; i < 10000; i++) { + test.ReadStep(&rnd); + uint32_t base = rnd.Next(); + for (int j = 0; j < 4; ++j) { + test.ConcurrentWriteStep((base + j) % ConcurrentTest::K); + } + } +} + class TestState { public: ConcurrentTest t_; int seed_; std::atomic quit_flag_; + std::atomic next_writer_; enum ReaderState { STARTING, RUNNING, DONE }; explicit TestState(int s) - : seed_(s), quit_flag_(false), state_(STARTING), state_cv_(&mu_) {} + : seed_(s), + quit_flag_(false), + state_(STARTING), + pending_writers_(0), + state_cv_(&mu_) {} void Wait(ReaderState s) { mu_.Lock(); @@ -339,9 +368,27 @@ class TestState { mu_.Unlock(); } + void AdjustPendingWriters(int delta) { + mu_.Lock(); + pending_writers_ += delta; + if (pending_writers_ == 0) { + state_cv_.Signal(); + } + mu_.Unlock(); + } + + void WaitForPendingWriters() { + mu_.Lock(); + while (pending_writers_ != 0) { + state_cv_.Wait(); + } + mu_.Unlock(); + } + private: port::Mutex mu_; ReaderState state_; + int pending_writers_; port::CondVar state_cv_; }; @@ -357,7 +404,14 @@ static void ConcurrentReader(void* arg) { state->Change(TestState::DONE); } -static void RunConcurrent(int run) { +static void ConcurrentWriter(void* arg) { + TestState* state = reinterpret_cast(arg); + uint32_t k = state->next_writer_++ % ConcurrentTest::K; + state->t_.ConcurrentWriteStep(k); + state->AdjustPendingWriters(-1); +} + +static void RunConcurrentRead(int run) { const int seed = test::RandomSeed() + (run * 100); Random rnd(seed); const int N = 1000; @@ -369,7 +423,7 @@ static void RunConcurrent(int run) { TestState state(seed + 1); Env::Default()->Schedule(ConcurrentReader, &state); 
state.Wait(TestState::RUNNING); - for (int k = 0; k < kSize; k++) { + for (int k = 0; k < kSize; ++k) { state.t_.WriteStep(&rnd); } state.quit_flag_.store(true, std::memory_order_release); @@ -377,11 +431,41 @@ static void RunConcurrent(int run) { } } -TEST_F(InlineSkipTest, Concurrent1) { RunConcurrent(1); } -TEST_F(InlineSkipTest, Concurrent2) { RunConcurrent(2); } -TEST_F(InlineSkipTest, Concurrent3) { RunConcurrent(3); } -TEST_F(InlineSkipTest, Concurrent4) { RunConcurrent(4); } -TEST_F(InlineSkipTest, Concurrent5) { RunConcurrent(5); } +static void RunConcurrentInsert(int run, int write_parallelism = 4) { + Env::Default()->SetBackgroundThreads(1 + write_parallelism, + Env::Priority::LOW); + const int seed = test::RandomSeed() + (run * 100); + Random rnd(seed); + const int N = 1000; + const int kSize = 1000; + for (int i = 0; i < N; i++) { + if ((i % 100) == 0) { + fprintf(stderr, "Run %d of %d\n", i, N); + } + TestState state(seed + 1); + Env::Default()->Schedule(ConcurrentReader, &state); + state.Wait(TestState::RUNNING); + for (int k = 0; k < kSize; k += write_parallelism) { + state.next_writer_ = rnd.Next(); + state.AdjustPendingWriters(write_parallelism); + for (int p = 0; p < write_parallelism; ++p) { + Env::Default()->Schedule(ConcurrentWriter, &state); + } + state.WaitForPendingWriters(); + } + state.quit_flag_.store(true, std::memory_order_release); + state.Wait(TestState::DONE); + } +} + +TEST_F(InlineSkipTest, ConcurrentRead1) { RunConcurrentRead(1); } +TEST_F(InlineSkipTest, ConcurrentRead2) { RunConcurrentRead(2); } +TEST_F(InlineSkipTest, ConcurrentRead3) { RunConcurrentRead(3); } +TEST_F(InlineSkipTest, ConcurrentRead4) { RunConcurrentRead(4); } +TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); } +TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); } +TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); } +TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); } } // namespace rocksdb diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 3ee0eb931..74aac3649 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -485,14 +485,14 @@ void InternalStats::DumpDBStats(std::string* value) { seconds_up, interval_seconds_up); value->append(buf); // Cumulative - uint64_t user_bytes_written = db_stats_[InternalStats::BYTES_WRITTEN]; - uint64_t num_keys_written = db_stats_[InternalStats::NUMBER_KEYS_WRITTEN]; - uint64_t write_other = db_stats_[InternalStats::WRITE_DONE_BY_OTHER]; - uint64_t write_self = db_stats_[InternalStats::WRITE_DONE_BY_SELF]; - uint64_t wal_bytes = db_stats_[InternalStats::WAL_FILE_BYTES]; - uint64_t wal_synced = db_stats_[InternalStats::WAL_FILE_SYNCED]; - uint64_t write_with_wal = db_stats_[InternalStats::WRITE_WITH_WAL]; - uint64_t write_stall_micros = db_stats_[InternalStats::WRITE_STALL_MICROS]; + uint64_t user_bytes_written = GetDBStats(InternalStats::BYTES_WRITTEN); + uint64_t num_keys_written = GetDBStats(InternalStats::NUMBER_KEYS_WRITTEN); + uint64_t write_other = GetDBStats(InternalStats::WRITE_DONE_BY_OTHER); + uint64_t write_self = GetDBStats(InternalStats::WRITE_DONE_BY_SELF); + uint64_t wal_bytes = GetDBStats(InternalStats::WAL_FILE_BYTES); + uint64_t wal_synced = GetDBStats(InternalStats::WAL_FILE_SYNCED); + uint64_t write_with_wal = GetDBStats(InternalStats::WRITE_WITH_WAL); + uint64_t write_stall_micros = GetDBStats(InternalStats::WRITE_STALL_MICROS); uint64_t compact_bytes_read = 0; uint64_t compact_bytes_write = 0; uint64_t compact_micros = 0; diff --git 
a/db/internal_stats.h b/db/internal_stats.h index 16aee45a8..9c4414ef1 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -109,24 +109,16 @@ class InternalStats { }; InternalStats(int num_levels, Env* env, ColumnFamilyData* cfd) - : db_stats_(INTERNAL_DB_STATS_ENUM_MAX), - cf_stats_value_(INTERNAL_CF_STATS_ENUM_MAX), - cf_stats_count_(INTERNAL_CF_STATS_ENUM_MAX), + : db_stats_{}, + cf_stats_value_{}, + cf_stats_count_{}, comp_stats_(num_levels), file_read_latency_(num_levels), bg_error_count_(0), number_levels_(num_levels), env_(env), cfd_(cfd), - started_at_(env->NowMicros()) { - for (int i = 0; i< INTERNAL_DB_STATS_ENUM_MAX; ++i) { - db_stats_[i] = 0; - } - for (int i = 0; i< INTERNAL_CF_STATS_ENUM_MAX; ++i) { - cf_stats_value_[i] = 0; - cf_stats_count_[i] = 0; - } - } + started_at_(env->NowMicros()) {} // Per level compaction stats. comp_stats_[level] stores the stats for // compactions that produced data for the specified "level". @@ -239,7 +231,13 @@ class InternalStats { } void AddDBStats(InternalDBStatsType type, uint64_t value) { - db_stats_[type] += value; + auto& v = db_stats_[type]; + v.store(v.load(std::memory_order_relaxed) + value, + std::memory_order_relaxed); + } + + uint64_t GetDBStats(InternalDBStatsType type) { + return db_stats_[type].load(std::memory_order_relaxed); } HistogramImpl* GetFileReadHist(int level) { @@ -264,10 +262,10 @@ class InternalStats { void DumpCFStats(std::string* value); // Per-DB stats - std::vector db_stats_; + std::atomic db_stats_[INTERNAL_DB_STATS_ENUM_MAX]; // Per-ColumnFamily stats - std::vector cf_stats_value_; - std::vector cf_stats_count_; + uint64_t cf_stats_value_[INTERNAL_CF_STATS_ENUM_MAX]; + uint64_t cf_stats_count_[INTERNAL_CF_STATS_ENUM_MAX]; // Per-ColumnFamily/level compaction stats std::vector comp_stats_; std::vector file_read_latency_; diff --git a/db/memtable.cc b/db/memtable.cc index 484e18e91..120a39d1b 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -60,7 +60,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, moptions_(ioptions, mutable_cf_options), refs_(0), kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)), - arena_(moptions_.arena_block_size), + arena_(moptions_.arena_block_size, 0), allocator_(&arena_, write_buffer), table_(ioptions.memtable_factory->CreateMemTableRep( comparator_, &allocator_, ioptions.prefix_extractor, @@ -78,12 +78,12 @@ MemTable::MemTable(const InternalKeyComparator& cmp, ? moptions_.inplace_update_num_locks : 0), prefix_extractor_(ioptions.prefix_extractor), - should_flush_(ShouldFlushNow()), - flush_scheduled_(false), + flush_state_(FLUSH_NOT_REQUESTED), env_(ioptions.env) { - // if should_flush_ == true without an entry inserted, something must have - // gone wrong already. 
- assert(!should_flush_); + UpdateFlushState(); + // something went wrong if we need to flush before inserting anything + assert(!ShouldScheduleFlush()); + if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) { prefix_bloom_.reset(new DynamicBloom( &allocator_, @@ -167,6 +167,17 @@ bool MemTable::ShouldFlushNow() const { return arena_.AllocatedAndUnused() < kArenaBlockSize / 4; } +void MemTable::UpdateFlushState() { + auto state = flush_state_.load(std::memory_order_relaxed); + if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) { + // ignore CAS failure, because that means somebody else requested + // a flush + flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED, + std::memory_order_relaxed, + std::memory_order_relaxed); + } +} + int MemTable::KeyComparator::operator()(const char* prefix_len_key1, const char* prefix_len_key2) const { // Internal keys are encoded as length-prefixed strings. @@ -335,7 +346,7 @@ uint64_t MemTable::ApproximateSize(const Slice& start_ikey, void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, /* user key */ - const Slice& value) { + const Slice& value, bool allow_concurrent) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] @@ -349,7 +360,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, val_size; char* buf = nullptr; KeyHandle handle = table_->Allocate(encoded_len, &buf); - assert(buf != nullptr); + char* p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); p += key_size; @@ -359,32 +370,64 @@ void MemTable::Add(SequenceNumber s, ValueType type, p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len); - table_->Insert(handle); - num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1, + if (!allow_concurrent) { + table_->Insert(handle); + + // this is a bit ugly, but is the way to avoid locked instructions + // when incrementing an atomic + num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1, + std::memory_order_relaxed); + data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len, std::memory_order_relaxed); - data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len, - std::memory_order_relaxed); - if (type == kTypeDeletion) { - num_deletes_++; - } + if (type == kTypeDeletion) { + num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1, + std::memory_order_relaxed); + } - if (prefix_bloom_) { - assert(prefix_extractor_); - prefix_bloom_->Add(prefix_extractor_->Transform(key)); - } + if (prefix_bloom_) { + assert(prefix_extractor_); + prefix_bloom_->Add(prefix_extractor_->Transform(key)); + } - // The first sequence number inserted into the memtable - assert(first_seqno_ == 0 || s > first_seqno_); - if (first_seqno_ == 0) { - first_seqno_ = s; + // The first sequence number inserted into the memtable + assert(first_seqno_ == 0 || s > first_seqno_); + if (first_seqno_ == 0) { + first_seqno_.store(s, std::memory_order_relaxed); - if (earliest_seqno_ == kMaxSequenceNumber) { - earliest_seqno_ = first_seqno_; + if (earliest_seqno_ == kMaxSequenceNumber) { + earliest_seqno_.store(GetFirstSequenceNumber(), + std::memory_order_relaxed); + } + assert(first_seqno_.load() >= earliest_seqno_.load()); + } + } else { + table_->InsertConcurrently(handle); + + num_entries_.fetch_add(1, std::memory_order_relaxed); + data_size_.fetch_add(encoded_len, std::memory_order_relaxed); 
+ if (type == kTypeDeletion) { + num_deletes_.fetch_add(1, std::memory_order_relaxed); + } + + if (prefix_bloom_) { + assert(prefix_extractor_); + prefix_bloom_->AddConcurrently(prefix_extractor_->Transform(key)); + } + + // atomically update first_seqno_ and earliest_seqno_. + uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed); + while ((cur_seq_num == 0 || s < cur_seq_num) && + !first_seqno_.compare_exchange_weak(cur_seq_num, s)) { + } + uint64_t cur_earliest_seqno = + earliest_seqno_.load(std::memory_order_relaxed); + while ( + (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) && + !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) { } - assert(first_seqno_ >= earliest_seqno_); } - should_flush_ = ShouldFlushNow(); + UpdateFlushState(); } // Callback from MemTable::Get() @@ -685,16 +728,16 @@ bool MemTable::UpdateCallback(SequenceNumber seq, } } RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); - should_flush_ = ShouldFlushNow(); + UpdateFlushState(); return true; } else if (status == UpdateStatus::UPDATED) { Add(seq, kTypeValue, key, Slice(str_value)); RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); - should_flush_ = ShouldFlushNow(); + UpdateFlushState(); return true; } else if (status == UpdateStatus::UPDATE_FAILED) { // No action required. Return. - should_flush_ = ShouldFlushNow(); + UpdateFlushState(); return true; } } diff --git a/db/memtable.h b/db/memtable.h index af3b1f945..110985620 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -8,10 +8,11 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once -#include -#include -#include +#include #include +#include +#include +#include #include #include "db/dbformat.h" #include "db/skiplist.h" @@ -21,8 +22,9 @@ #include "rocksdb/memtablerep.h" #include "rocksdb/immutable_options.h" #include "db/memtable_allocator.h" -#include "util/arena.h" +#include "util/concurrent_arena.h" #include "util/dynamic_bloom.h" +#include "util/instrumented_mutex.h" #include "util/mutable_cf_options.h" namespace rocksdb { @@ -124,10 +126,17 @@ class MemTable { // This method heuristically determines if the memtable should continue to // host more data. bool ShouldScheduleFlush() const { - return flush_scheduled_ == false && should_flush_; + return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED; } - void MarkFlushScheduled() { flush_scheduled_ = true; } + // Returns true if a flush should be scheduled and the caller should + // be the one to schedule it + bool MarkFlushScheduled() { + auto before = FLUSH_REQUESTED; + return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED, + std::memory_order_relaxed, + std::memory_order_relaxed); + } // Return an iterator that yields the contents of the memtable. // @@ -147,11 +156,10 @@ class MemTable { // specified sequence number and with the specified type. // Typically value will be empty if type==kTypeDeletion. // - // REQUIRES: external synchronization to prevent simultaneous - // operations on the same MemTable. - void Add(SequenceNumber seq, ValueType type, - const Slice& key, - const Slice& value); + // REQUIRES: if allow_concurrent = false, external synchronization to prevent + // simultaneous operations on the same MemTable. + void Add(SequenceNumber seq, ValueType type, const Slice& key, + const Slice& value, bool allow_concurrent = false); // If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error @@ -220,7 +228,9 @@ class MemTable { // Get total number of deletes in the mem table. // REQUIRES: external synchronization to prevent simultaneous // operations on the same MemTable (unless this Memtable is immutable). - uint64_t num_deletes() const { return num_deletes_; } + uint64_t num_deletes() const { + return num_deletes_.load(std::memory_order_relaxed); + } // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } @@ -234,7 +244,9 @@ class MemTable { // into the memtable. // REQUIRES: external synchronization to prevent simultaneous // operations on the same MemTable (unless this Memtable is immutable). - SequenceNumber GetFirstSequenceNumber() { return first_seqno_; } + SequenceNumber GetFirstSequenceNumber() { + return first_seqno_.load(std::memory_order_relaxed); + } // Returns the sequence number that is guaranteed to be smaller than or equal // to the sequence number of any key that could be inserted into this @@ -243,7 +255,9 @@ class MemTable { // // If the earliest sequence number could not be determined, // kMaxSequenceNumber will be returned. - SequenceNumber GetEarliestSequenceNumber() { return earliest_seqno_; } + SequenceNumber GetEarliestSequenceNumber() { + return earliest_seqno_.load(std::memory_order_relaxed); + } // Returns the next active logfile number when this memtable is about to // be flushed to storage @@ -290,8 +304,7 @@ class MemTable { const MemTableOptions* GetMemTableOptions() const { return &moptions_; } private: - // Dynamically check if we can add more incoming entries - bool ShouldFlushNow() const; + enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; friend class MemTableIterator; friend class MemTableBackwardIterator; @@ -301,14 +314,14 @@ class MemTable { const MemTableOptions moptions_; int refs_; const size_t kArenaBlockSize; - Arena arena_; + ConcurrentArena arena_; MemTableAllocator allocator_; unique_ptr table_; // Total data size of all data inserted std::atomic data_size_; std::atomic num_entries_; - uint64_t num_deletes_; + std::atomic num_deletes_; // These are used to manage memtable flushes to storage bool flush_in_progress_; // started the flush @@ -320,11 +333,11 @@ class MemTable { VersionEdit edit_; // The sequence number of the kv that was inserted first - SequenceNumber first_seqno_; + std::atomic first_seqno_; // The db sequence number at the time of creation or kMaxSequenceNumber // if not set. - SequenceNumber earliest_seqno_; + std::atomic earliest_seqno_; // The log files earlier than this number can be deleted. 
uint64_t mem_next_logfile_number_; @@ -332,19 +345,22 @@ class MemTable { // rw locks for inplace updates std::vector locks_; - // No copying allowed - MemTable(const MemTable&); - void operator=(const MemTable&); - const SliceTransform* const prefix_extractor_; std::unique_ptr prefix_bloom_; - // a flag indicating if a memtable has met the criteria to flush - bool should_flush_; + std::atomic flush_state_; - // a flag indicating if flush has been scheduled - bool flush_scheduled_; Env* env_; + + // Returns a heuristic flush decision + bool ShouldFlushNow() const; + + // Updates flush_state_ using ShouldFlushNow() + void UpdateFlushState(); + + // No copying allowed + MemTable(const MemTable&); + MemTable& operator=(const MemTable&); }; extern const char* EncodeKey(std::string* scratch, const Slice& target); diff --git a/db/memtable_allocator.cc b/db/memtable_allocator.cc index d3ecea2fd..1ed2019b6 100644 --- a/db/memtable_allocator.cc +++ b/db/memtable_allocator.cc @@ -7,46 +7,42 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include - #include "db/memtable_allocator.h" + +#include #include "db/writebuffer.h" #include "util/arena.h" namespace rocksdb { -MemTableAllocator::MemTableAllocator(Arena* arena, WriteBuffer* write_buffer) - : arena_(arena), write_buffer_(write_buffer), bytes_allocated_(0) { -} +MemTableAllocator::MemTableAllocator(Allocator* allocator, + WriteBuffer* write_buffer) + : allocator_(allocator), write_buffer_(write_buffer), bytes_allocated_(0) {} -MemTableAllocator::~MemTableAllocator() { - DoneAllocating(); -} +MemTableAllocator::~MemTableAllocator() { DoneAllocating(); } char* MemTableAllocator::Allocate(size_t bytes) { assert(write_buffer_ != nullptr); - bytes_allocated_ += bytes; + bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); write_buffer_->ReserveMem(bytes); - return arena_->Allocate(bytes); + return allocator_->Allocate(bytes); } char* MemTableAllocator::AllocateAligned(size_t bytes, size_t huge_page_size, Logger* logger) { assert(write_buffer_ != nullptr); - bytes_allocated_ += bytes; + bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); write_buffer_->ReserveMem(bytes); - return arena_->AllocateAligned(bytes, huge_page_size, logger); + return allocator_->AllocateAligned(bytes, huge_page_size, logger); } void MemTableAllocator::DoneAllocating() { if (write_buffer_ != nullptr) { - write_buffer_->FreeMem(bytes_allocated_); + write_buffer_->FreeMem(bytes_allocated_.load(std::memory_order_relaxed)); write_buffer_ = nullptr; } } -size_t MemTableAllocator::BlockSize() const { - return arena_->BlockSize(); -} +size_t MemTableAllocator::BlockSize() const { return allocator_->BlockSize(); } } // namespace rocksdb diff --git a/db/memtable_allocator.h b/db/memtable_allocator.h index fa8ee1287..c2cf130cc 100644 --- a/db/memtable_allocator.h +++ b/db/memtable_allocator.h @@ -11,17 +11,18 @@ // to WriteBuffer so we can track and enforce overall write buffer limits. 
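With MemTableAllocator now parameterized on the Allocator interface and tracking bytes_allocated_ with a relaxed atomic, a ConcurrentArena can be dropped in where a plain Arena used to sit. A rough sketch of the wiring, using only constructors visible in this patch (these are internal headers, assumed available in-tree; this is not a supported public API):

#include "db/memtable_allocator.h"
#include "db/writebuffer.h"
#include "util/concurrent_arena.h"

void Example() {
  rocksdb::WriteBuffer write_buffer(64u << 20);  // shared 64 MB budget
  rocksdb::ConcurrentArena arena;                // thread-safe allocator
  rocksdb::MemTableAllocator allocator(&arena, &write_buffer);

  // Allocate() may now be reached from several writer threads; each call
  // charges the shared WriteBuffer and delegates to the concurrent arena.
  char* buf = allocator.Allocate(128);
  (void)buf;
}
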
#pragma once + +#include #include "util/allocator.h" namespace rocksdb { -class Arena; class Logger; class WriteBuffer; class MemTableAllocator : public Allocator { public: - explicit MemTableAllocator(Arena* arena, WriteBuffer* write_buffer); + explicit MemTableAllocator(Allocator* allocator, WriteBuffer* write_buffer); ~MemTableAllocator(); // Allocator interface @@ -35,9 +36,9 @@ class MemTableAllocator : public Allocator { void DoneAllocating(); private: - Arena* arena_; + Allocator* allocator_; WriteBuffer* write_buffer_; - size_t bytes_allocated_; + std::atomic bytes_allocated_; // No copying allowed MemTableAllocator(const MemTableAllocator&); diff --git a/db/repair.cc b/db/repair.cc index db8650e18..1805059a7 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -270,7 +270,7 @@ class Repairer { continue; } WriteBatchInternal::SetContents(&batch, record); - status = WriteBatchInternal::InsertInto(&batch, cf_mems_default); + status = WriteBatchInternal::InsertInto(&batch, cf_mems_default, nullptr); if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { diff --git a/db/write_batch.cc b/db/write_batch.cc index ade89aa31..0565c0599 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -28,10 +28,12 @@ #include #include +#include #include "db/column_family.h" #include "db/db_impl.h" #include "db/dbformat.h" +#include "db/flush_scheduler.h" #include "db/memtable.h" #include "db/snapshot_impl.h" #include "db/write_batch_internal.h" @@ -536,35 +538,42 @@ Status WriteBatch::RollbackToSavePoint() { } namespace { -// This class can *only* be used from a single-threaded write thread, because it -// calls ColumnFamilyMemTablesImpl::Seek() class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; - ColumnFamilyMemTables* cf_mems_; - bool ignore_missing_column_families_; - uint64_t log_number_; + ColumnFamilyMemTables* const cf_mems_; + FlushScheduler* const flush_scheduler_; + const bool ignore_missing_column_families_; + const uint64_t log_number_; DBImpl* db_; const bool dont_filter_deletes_; + const bool concurrent_memtable_writes_; + // cf_mems should not be shared with concurrent inserters MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, + FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t log_number, - DB* db, const bool dont_filter_deletes) + DB* db, const bool dont_filter_deletes, + bool concurrent_memtable_writes) : sequence_(sequence), cf_mems_(cf_mems), + flush_scheduler_(flush_scheduler), ignore_missing_column_families_(ignore_missing_column_families), log_number_(log_number), db_(reinterpret_cast(db)), - dont_filter_deletes_(dont_filter_deletes) { - assert(cf_mems); + dont_filter_deletes_(dont_filter_deletes), + concurrent_memtable_writes_(concurrent_memtable_writes) { + assert(cf_mems_); if (!dont_filter_deletes_) { assert(db_); } } bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { - // We are only allowed to call this from a single-threaded write thread - // (or while holding DB mutex) + // If we are in a concurrent mode, it is the caller's responsibility + // to clone the original ColumnFamilyMemTables so that each thread + // has its own instance.
Otherwise, it must be guaranteed that there + // is no concurrent access bool found = cf_mems_->Seek(column_family_id); if (!found) { if (ignore_missing_column_families_) { @@ -598,11 +607,13 @@ class MemTableInserter : public WriteBatch::Handler { MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!moptions->inplace_update_support) { - mem->Add(sequence_, kTypeValue, key, value); + mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_); } else if (moptions->inplace_callback == nullptr) { + assert(!concurrent_memtable_writes_); mem->Update(sequence_, key, value); RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED); } else { + assert(!concurrent_memtable_writes_); if (mem->UpdateCallback(sequence_, key, value)) { } else { // key not found in memtable. Do sst get, update, add @@ -640,7 +651,7 @@ class MemTableInserter : public WriteBatch::Handler { // sequence number. Even if the update eventually fails and does not result // in memtable add/update. sequence_++; - cf_mems_->CheckMemtableFull(); + CheckMemtableFull(); return Status::OK(); } @@ -654,6 +665,7 @@ class MemTableInserter : public WriteBatch::Handler { MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!dont_filter_deletes_ && moptions->filter_deletes) { + assert(!concurrent_memtable_writes_); SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; ReadOptions ropts; @@ -668,9 +680,9 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } } - mem->Add(sequence_, delete_type, key, Slice()); + mem->Add(sequence_, delete_type, key, Slice(), concurrent_memtable_writes_); sequence_++; - cf_mems_->CheckMemtableFull(); + CheckMemtableFull(); return Status::OK(); } @@ -686,6 +698,7 @@ class MemTableInserter : public WriteBatch::Handler { virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { + assert(!concurrent_memtable_writes_); Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; @@ -760,24 +773,38 @@ class MemTableInserter : public WriteBatch::Handler { } sequence_++; - cf_mems_->CheckMemtableFull(); + CheckMemtableFull(); return Status::OK(); } + + void CheckMemtableFull() { + if (flush_scheduler_ != nullptr) { + auto* cfd = cf_mems_->current(); + assert(cfd != nullptr); + if (cfd->mem()->ShouldScheduleFlush() && + cfd->mem()->MarkFlushScheduled()) { + // MarkFlushScheduled only returns true if we are the one that + // should take action, so no need to dedup further + flush_scheduler_->ScheduleFlush(cfd); + } + } + } }; } // namespace // This function can only be called in these conditions: // 1) During Recovery() // 2) During Write(), in a single-threaded write thread +// 3) During Write(), in a concurrent context where memtables has been cloned // The reason is that it calls memtables->Seek(), which has a stateful cache -Status WriteBatchInternal::InsertInto(const autovector& batches, - SequenceNumber sequence, - ColumnFamilyMemTables* memtables, - bool ignore_missing_column_families, - uint64_t log_number, DB* db, - const bool dont_filter_deletes) { - MemTableInserter inserter(sequence, memtables, ignore_missing_column_families, - log_number, db, dont_filter_deletes); +Status WriteBatchInternal::InsertInto( + const autovector& batches, SequenceNumber sequence, + ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, + bool ignore_missing_column_families, uint64_t log_number, DB* db, + const bool 
dont_filter_deletes, bool concurrent_memtable_writes) { + MemTableInserter inserter(sequence, memtables, flush_scheduler, + ignore_missing_column_families, log_number, db, + dont_filter_deletes, concurrent_memtable_writes); Status rv = Status::OK(); for (size_t i = 0; i < batches.size() && rv.ok(); ++i) { rv = batches[i]->Iterate(&inserter); @@ -787,12 +814,15 @@ Status WriteBatchInternal::InsertInto(const autovector& batches, Status WriteBatchInternal::InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t log_number, DB* db, - const bool dont_filter_deletes) { + const bool dont_filter_deletes, + bool concurrent_memtable_writes) { MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables, - ignore_missing_column_families, log_number, db, - dont_filter_deletes); + flush_scheduler, ignore_missing_column_families, + log_number, db, dont_filter_deletes, + concurrent_memtable_writes); return batch->Iterate(&inserter); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 3ae4edc7a..d75d2ef65 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include "rocksdb/types.h" #include "rocksdb/write_batch.h" #include "rocksdb/db.h" @@ -17,6 +18,8 @@ namespace rocksdb { class MemTable; +class FlushScheduler; +class ColumnFamilyData; class ColumnFamilyMemTables { public: @@ -28,7 +31,7 @@ class ColumnFamilyMemTables { virtual uint64_t GetLogNumber() const = 0; virtual MemTable* GetMemTable() const = 0; virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0; - virtual void CheckMemtableFull() = 0; + virtual ColumnFamilyData* current() { return nullptr; } }; class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { @@ -50,8 +53,6 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; } - void CheckMemtableFull() override {} - private: bool ok_; MemTable* mem_; @@ -127,19 +128,29 @@ class WriteBatchInternal { // // If log_number is non-zero, the memtable will be updated only if // memtables->GetLogNumber() >= log_number. + // + // If flush_scheduler is non-null, it will be invoked if the memtable + // should be flushed. + // + // Under concurrent use, the caller is responsible for making sure that + // the memtables object itself is thread-local. 
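Putting the new parameters together, a parallel follower would apply its own batch roughly as in the sketch below: a ColumnFamilyMemTablesImpl cloned from the leader's instance (so Seek()'s cached state stays private to the thread), the shared FlushScheduler, and concurrent_memtable_writes set to true. This is a hedged illustration of the calling convention only, not a copy of DBImpl's actual write path; ApplyBatchInParallel is an invented name and the internal headers are assumed available in-tree:

#include "db/column_family.h"
#include "db/flush_scheduler.h"
#include "db/write_batch_internal.h"

rocksdb::Status ApplyBatchInParallel(
    rocksdb::WriteBatch* my_batch, rocksdb::SequenceNumber my_seq,
    rocksdb::ColumnFamilyMemTablesImpl* leader_memtables,
    rocksdb::FlushScheduler* flush_scheduler, rocksdb::DB* db) {
  // Each thread gets its own memtables view; the clone constructor added in
  // this patch copies the column family set but not the cached current_.
  rocksdb::ColumnFamilyMemTablesImpl memtables(leader_memtables);
  rocksdb::WriteBatchInternal::SetSequence(my_batch, my_seq);
  return rocksdb::WriteBatchInternal::InsertInto(
      my_batch, &memtables, flush_scheduler,
      false /* ignore_missing_column_families */, 0 /* log_number */, db,
      true /* dont_filter_deletes */, true /* concurrent_memtable_writes */);
}
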
static Status InsertInto(const autovector& batches, SequenceNumber sequence, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, - const bool dont_filter_deletes = true); + const bool dont_filter_deletes = true, + bool concurrent_memtable_writes = false); // Convenience form of InsertInto when you have only one batch static Status InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, - const bool dont_filter_deletes = true); + const bool dont_filter_deletes = true, + bool concurrent_memtable_writes = false); static void Append(WriteBatch* dst, const WriteBatch* src); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 62830da48..5d008b3a4 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -37,7 +37,7 @@ static std::string PrintContents(WriteBatch* b) { mem->Ref(); std::string state; ColumnFamilyMemTablesDefault cf_mems_default(mem); - Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default); + Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr); int count = 0; int put_count = 0; int delete_count = 0; diff --git a/db/write_thread.cc b/db/write_thread.cc index cbfd2646f..a5285ce99 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -4,34 +4,197 @@ // of patent rights can be found in the PATENTS file in the same directory. #include "db/write_thread.h" +#include +#include +#include +#include "db/column_family.h" +#include "port/port.h" #include "util/sync_point.h" namespace rocksdb { -void WriteThread::Await(Writer* w) { - std::unique_lock guard(w->JoinMutex()); - w->JoinCV().wait(guard, [w] { return w->joined; }); +WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec) + : max_yield_usec_(max_yield_usec), + slow_yield_usec_(slow_yield_usec), + newest_writer_(nullptr) {} + +uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) { + // We're going to block. Lazily create the mutex. We guarantee + // propagation of this construction to the waker via the + // STATE_LOCKED_WAITING state. The waker won't try to touch the mutex + // or the condvar unless they CAS away the STATE_LOCKED_WAITING that + // we install below. + w->CreateMutex(); + + auto state = w->state.load(std::memory_order_acquire); + assert(state != STATE_LOCKED_WAITING); + if ((state & goal_mask) == 0 && + w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) { + // we have permission (and an obligation) to use StateMutex + std::unique_lock guard(w->StateMutex()); + w->StateCV().wait(guard, [w] { + return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING; + }); + state = w->state.load(std::memory_order_relaxed); + } + // else tricky. Goal is met or CAS failed. In the latter case the waker + // must have changed the state, and compare_exchange_strong has updated + // our local variable with the new one. At the moment WriteThread never + // waits for a transition across intermediate states, so we know that + // since a state change has occurred the goal must have been met. 
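Because the Writer states introduced later in this diff are distinct bits, a waiter can accept any of several outcomes with a single mask and a single AND, which is exactly how JoinBatchGroup waits for STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED. A small sketch of that convention (the busy-wait helper is illustrative; the real AwaitState adds yielding and the mutex/condvar fallback shown above):

#include <atomic>
#include <cstdint>

enum : uint8_t {
  STATE_INIT = 1,
  STATE_GROUP_LEADER = 2,
  STATE_PARALLEL_FOLLOWER = 4,
  STATE_COMPLETED = 8,
};

// Spin until the state matches any bit in goal_mask, then return it.
uint8_t BusyWaitForState(const std::atomic<uint8_t>& state,
                         uint8_t goal_mask) {
  uint8_t s;
  do {
    s = state.load(std::memory_order_acquire);
  } while ((s & goal_mask) == 0);
  return s;
}
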
+ assert((state & goal_mask) != 0); + return state; +} + +uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask, + AdaptationContext* ctx) { + uint8_t state; + + // On a modern Xeon each loop takes about 7 nanoseconds (most of which + // is the effect of the pause instruction), so 200 iterations is a bit + // more than a microsecond. This is long enough that waits longer than + // this can amortize the cost of accessing the clock and yielding. + for (uint32_t tries = 0; tries < 200; ++tries) { + state = w->state.load(std::memory_order_acquire); + if ((state & goal_mask) != 0) { + return state; + } + port::AsmVolatilePause(); + } + + // If we're only going to end up waiting a short period of time, + // it can be a lot more efficient to call std::this_thread::yield() + // in a loop than to block in StateMutex(). For reference, on my 4.0 + // SELinux test server with support for syscall auditing enabled, the + // minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is + // 2.7 usec, and the average is more like 10 usec. That can be a big + // drag on RockDB's single-writer design. Of course, spinning is a + // bad idea if other threads are waiting to run or if we're going to + // wait for a long time. How do we decide? + // + // We break waiting into 3 categories: short-uncontended, + // short-contended, and long. If we had an oracle, then we would always + // spin for short-uncontended, always block for long, and our choice for + // short-contended might depend on whether we were trying to optimize + // RocksDB throughput or avoid being greedy with system resources. + // + // Bucketing into short or long is easy by measuring elapsed time. + // Differentiating short-uncontended from short-contended is a bit + // trickier, but not too bad. We could look for involuntary context + // switches using getrusage(RUSAGE_THREAD, ..), but it's less work + // (portability code and CPU) to just look for yield calls that take + // longer than we expect. sched_yield() doesn't actually result in any + // context switch overhead if there are no other runnable processes + // on the current core, in which case it usually takes less than + // a microsecond. + // + // There are two primary tunables here: the threshold between "short" + // and "long" waits, and the threshold at which we suspect that a yield + // is slow enough to indicate we should probably block. If these + // thresholds are chosen well then CPU-bound workloads that don't + // have more threads than cores will experience few context switches + // (voluntary or involuntary), and the total number of context switches + // (voluntary and involuntary) will not be dramatically larger (maybe + // 2x) than the number of voluntary context switches that occur when + // --max_yield_wait_micros=0. + // + // There's another constant, which is the number of slow yields we will + // tolerate before reversing our previous decision. Solitary slow + // yields are pretty common (low-priority small jobs ready to run), + // so this should be at least 2. We set this conservatively to 3 so + // that we can also immediately schedule a ctx adaptation, rather than + // waiting for the next update_ctx. 
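The three-tier wait described above (a bounded busy spin, then a timed yield loop, then blocking on a mutex/condvar) reduces to the following skeleton. The constants and the block_on_condvar callback are placeholders, and the patch additionally gates the yield phase on the per-callsite AdaptationContext statistics:

#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Simplified three-tier wait: returns once done becomes true.
void TieredWait(const std::atomic<bool>& done,
                std::chrono::microseconds max_yield,
                const std::function<void()>& block_on_condvar) {
  // Tier 1: a short busy spin (~1 microsecond) amortizes the cost of
  // touching the clock at all.
  for (int i = 0; i < 200; ++i) {
    if (done.load(std::memory_order_acquire)) return;
  }
  // Tier 2: yield repeatedly, but only for a bounded wall-clock budget.
  auto begin = std::chrono::steady_clock::now();
  while (std::chrono::steady_clock::now() - begin <= max_yield) {
    std::this_thread::yield();
    if (done.load(std::memory_order_acquire)) return;
  }
  // Tier 3: give up the CPU for real.
  block_on_condvar();
}
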
+ + const size_t kMaxSlowYieldsWhileSpinning = 3; + + bool update_ctx = false; + bool would_spin_again = false; + + if (max_yield_usec_ > 0) { + update_ctx = Random::GetTLSInstance()->OneIn(256); + + if (update_ctx || ctx->value.load(std::memory_order_relaxed) >= 0) { + // we're updating the adaptation statistics, or spinning has > + // 50% chance of being shorter than max_yield_usec_ and causing no + // involuntary context switches + auto spin_begin = std::chrono::steady_clock::now(); + + // this variable doesn't include the final yield (if any) that + // causes the goal to be met + size_t slow_yield_count = 0; + + auto iter_begin = spin_begin; + while ((iter_begin - spin_begin) <= + std::chrono::microseconds(max_yield_usec_)) { + std::this_thread::yield(); + + state = w->state.load(std::memory_order_acquire); + if ((state & goal_mask) != 0) { + // success + would_spin_again = true; + break; + } + + auto now = std::chrono::steady_clock::now(); + if (now == iter_begin || + now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) { + // conservatively count it as a slow yield if our clock isn't + // accurate enough to measure the yield duration + ++slow_yield_count; + if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) { + // Not just one ivcsw, but several. Immediately update ctx + // and fall back to blocking + update_ctx = true; + break; + } + } + iter_begin = now; + } + } + } + + if ((state & goal_mask) == 0) { + state = BlockingAwaitState(w, goal_mask); + } + + if (update_ctx) { + auto v = ctx->value.load(std::memory_order_relaxed); + // fixed point exponential decay with decay constant 1/1024, with +1 + // and -1 scaled to avoid overflow for int32_t + v = v + (v / 1024) + (would_spin_again ? 1 : -1) * 16384; + ctx->value.store(v, std::memory_order_relaxed); + } + + assert((state & goal_mask) != 0); + return state; } -void WriteThread::MarkJoined(Writer* w) { - std::lock_guard guard(w->JoinMutex()); - assert(!w->joined); - w->joined = true; - w->JoinCV().notify_one(); +void WriteThread::SetState(Writer* w, uint8_t new_state) { + auto state = w->state.load(std::memory_order_acquire); + if (state == STATE_LOCKED_WAITING || + !w->state.compare_exchange_strong(state, new_state)) { + assert(state == STATE_LOCKED_WAITING); + + std::lock_guard guard(w->StateMutex()); + assert(w->state.load(std::memory_order_relaxed) != new_state); + w->state.store(new_state, std::memory_order_relaxed); + w->StateCV().notify_one(); + } } -void WriteThread::LinkOne(Writer* w, bool* wait_needed) { - assert(!w->joined && !w->done); +void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) { + assert(w->state == STATE_INIT); Writer* writers = newest_writer_.load(std::memory_order_relaxed); while (true) { w->link_older = writers; - if (writers != nullptr) { - w->CreateMutex(); - } if (newest_writer_.compare_exchange_strong(writers, w)) { - // Success. 
- *wait_needed = (writers != nullptr); + if (writers == nullptr) { + // this isn't part of the WriteThread machinery, but helps with + // debugging and is checked by an assert in WriteImpl + w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed); + } + *linked_as_leader = (writers == nullptr); return; } } @@ -50,11 +213,15 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) { } void WriteThread::JoinBatchGroup(Writer* w) { + static AdaptationContext ctx{"JoinBatchGroup"}; + assert(w->batch != nullptr); - bool wait_needed; - LinkOne(w, &wait_needed); - if (wait_needed) { - Await(w); + bool linked_as_leader; + LinkOne(w, &linked_as_leader); + if (!linked_as_leader) { + AwaitState(w, + STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED, + &ctx); } } @@ -88,7 +255,7 @@ size_t WriteThread::EnterAsBatchGroupLeader( // This is safe regardless of any db mutex status of the caller. Previous // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks // (they emptied the list and then we added ourself as leader) or had to - // explicitly wake up us (the list was non-empty when we added ourself, + // explicitly wake us up (the list was non-empty when we added ourself, // so we have already received our MarkJoined). CreateMissingNewerLinks(newest_writer); @@ -135,6 +302,73 @@ size_t WriteThread::EnterAsBatchGroupLeader( return size; } +void WriteThread::LaunchParallelFollowers(ParallelGroup* pg, + SequenceNumber sequence) { + // EnterAsBatchGroupLeader already created the links from leader to + // newer writers in the group + + pg->leader->parallel_group = pg; + + Writer* w = pg->leader; + w->sequence = sequence; + + while (w != pg->last_writer) { + sequence += WriteBatchInternal::Count(w->batch); + w = w->link_newer; + + w->sequence = sequence; + w->parallel_group = pg; + SetState(w, STATE_PARALLEL_FOLLOWER); + } +} + +bool WriteThread::CompleteParallelWorker(Writer* w) { + static AdaptationContext ctx{"CompleteParallelWorker"}; + + auto* pg = w->parallel_group; + if (!w->status.ok()) { + std::lock_guard guard(w->StateMutex()); + pg->status = w->status; + } + auto leader = pg->leader; + auto early_exit_allowed = pg->early_exit_allowed; + + if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) { + // we're not the last one + AwaitState(w, STATE_COMPLETED, &ctx); + + // Caller only needs to perform exit duties if early exit doesn't + // apply and this is the leader. Can't touch pg here. Whoever set + // our state to STATE_COMPLETED copied pg->status to w.status for us. + return w == leader && !(early_exit_allowed && w->status.ok()); + } + // else we're the last parallel worker + + if (w == leader || (early_exit_allowed && pg->status.ok())) { + // this thread should perform exit duties + w->status = pg->status; + return true; + } else { + // We're the last parallel follower but early commit is not + // applicable. Wake up the leader and then wait for it to exit. 
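The pg->running counter above decides which thread performs the group's exit duties: each finished worker decrements it, and only the thread whose decrement takes the count to zero does the shared cleanup. A stripped-down sketch of that idiom, not the patch's exact branch structure (LastWorkerOut is an illustrative name):

#include <atomic>

// Returns true for exactly one of the N workers that call it: the one whose
// decrement drops the running count from 1 to 0. That caller performs the
// shared cleanup; everyone else only reports completion.
bool LastWorkerOut(std::atomic<int>* running) {
  return running->fetch_sub(1, std::memory_order_acq_rel) == 1;
}
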
+ assert(w->state == STATE_PARALLEL_FOLLOWER); + SetState(leader, STATE_COMPLETED); + AwaitState(w, STATE_COMPLETED, &ctx); + return false; + } +} + +void WriteThread::EarlyExitParallelGroup(Writer* w) { + auto* pg = w->parallel_group; + + assert(w->state == STATE_PARALLEL_FOLLOWER); + assert(pg->status.ok()); + ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status); + assert(w->state == STATE_COMPLETED); + assert(w->status.ok()); + SetState(pg->leader, STATE_COMPLETED); +} + void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer, Status status) { assert(leader->link_older == nullptr); @@ -166,31 +400,35 @@ void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer, // nullptr when they enqueued (we were definitely enqueued before them // and are still in the list). That means leader handoff occurs when // we call MarkJoined - MarkJoined(last_writer->link_newer); + SetState(last_writer->link_newer, STATE_GROUP_LEADER); } // else nobody else was waiting, although there might already be a new // leader now while (last_writer != leader) { last_writer->status = status; - last_writer->done = true; - // We must read link_older before calling MarkJoined, because as - // soon as it is marked the other thread's AwaitJoined may return - // and deallocate the Writer. + + // we need to read link_older before calling SetState, because as soon + // as it is marked committed the other thread's Await may return and + // deallocate the Writer. auto next = last_writer->link_older; - MarkJoined(last_writer); + SetState(last_writer, STATE_COMPLETED); + last_writer = next; } } void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { + static AdaptationContext ctx{"EnterUnbatched"}; + static std::atomic adaptation_history{}; + assert(w->batch == nullptr); - bool wait_needed; - LinkOne(w, &wait_needed); - if (wait_needed) { + bool linked_as_leader; + LinkOne(w, &linked_as_leader); + if (!linked_as_leader) { mu->Unlock(); TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait"); - Await(w); + AwaitState(w, STATE_GROUP_LEADER, &ctx); mu->Lock(); } } diff --git a/db/write_thread.h b/db/write_thread.h index 3a15ea847..2fdd4a02f 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -8,11 +8,13 @@ #include #include #include +#include #include #include +#include #include -#include "rocksdb/status.h" #include "db/write_batch_internal.h" +#include "rocksdb/status.h" #include "util/autovector.h" #include "util/instrumented_mutex.h" @@ -20,19 +22,69 @@ namespace rocksdb { class WriteThread { public: + enum State : uint8_t { + // The initial state of a writer. This is a Writer that is + // waiting in JoinBatchGroup. This state can be left when another + // thread informs the waiter that it has become a group leader + // (-> STATE_GROUP_LEADER), when a leader that has chosen to be + // non-parallel informs a follower that its writes have been committed + // (-> STATE_COMPLETED), or when a leader that has chosen to perform + // updates in parallel and needs this Writer to apply its batch (-> + // STATE_PARALLEL_FOLLOWER). + STATE_INIT = 1, + + // The state used to inform a waiting Writer that it has become the + // leader, and it should now build a write batch group. Tricky: + // this state is not used if newest_writer_ is empty when a writer + // enqueues itself, because there is no need to wait (or even to + // create the mutex and condvar used to wait) in that case. 
This is + // a terminal state unless the leader chooses to make this a parallel + // batch, in which case the last parallel worker to finish will move + // the leader to STATE_COMPLETED. + STATE_GROUP_LEADER = 2, + + // A Writer that has returned as a follower in a parallel group. + // It should apply its batch to the memtable and then call + // CompleteParallelWorker. When someone calls ExitAsBatchGroupLeader + // or EarlyExitParallelGroup this state will get transitioned to + // STATE_COMPLETED. + STATE_PARALLEL_FOLLOWER = 4, + + // A follower whose writes have been applied, or a parallel leader + // whose followers have all finished their work. This is a terminal + // state. + STATE_COMPLETED = 8, + + // A state indicating that the thread may be waiting using StateMutex() + // and StateCondVar() + STATE_LOCKED_WAITING = 16, + }; + + struct Writer; + + struct ParallelGroup { + Writer* leader; + Writer* last_writer; + bool early_exit_allowed; + // before running goes to zero, status needs leader->StateMutex() + Status status; + std::atomic running; + }; + // Information kept for every waiting writer. struct Writer { WriteBatch* batch; bool sync; bool disableWAL; bool in_batch_group; - bool done; bool has_callback; + bool made_waitable; // records lazy construction of mutex and cv + std::atomic state; // write under StateMutex() or pre-link + ParallelGroup* parallel_group; + SequenceNumber sequence; // the sequence number to use Status status; - bool made_waitable; // records lazy construction of mutex and cv - bool joined; // read/write only under JoinMutex() (or pre-link) - std::aligned_storage::type join_mutex_bytes; - std::aligned_storage::type join_cv_bytes; + std::aligned_storage::type state_mutex_bytes; + std::aligned_storage::type state_cv_bytes; Writer* link_older; // read/write only before linking, or as leader Writer* link_newer; // lazy, read/write only before linking, or as leader @@ -41,44 +93,45 @@ class WriteThread { sync(false), disableWAL(false), in_batch_group(false), - done(false), has_callback(false), made_waitable(false), - joined(false), + state(STATE_INIT), link_older(nullptr), link_newer(nullptr) {} ~Writer() { if (made_waitable) { - JoinMutex().~mutex(); - JoinCV().~condition_variable(); + StateMutex().~mutex(); + StateCV().~condition_variable(); } } void CreateMutex() { - assert(!joined); if (!made_waitable) { + // Note that made_waitable is tracked separately from state + // transitions, because we can't atomically create the mutex and + // link into the list. made_waitable = true; - new (&join_mutex_bytes) std::mutex; - new (&join_cv_bytes) std::condition_variable; + new (&state_mutex_bytes) std::mutex; + new (&state_cv_bytes) std::condition_variable; } } - // No other mutexes may be acquired while holding JoinMutex(), it is + // No other mutexes may be acquired while holding StateMutex(), it is // always last in the order - std::mutex& JoinMutex() { + std::mutex& StateMutex() { assert(made_waitable); - return *static_cast(static_cast(&join_mutex_bytes)); + return *static_cast(static_cast(&state_mutex_bytes)); } - std::condition_variable& JoinCV() { + std::condition_variable& StateCV() { assert(made_waitable); return *static_cast( - static_cast(&join_cv_bytes)); + static_cast(&state_cv_bytes)); } }; - WriteThread() : newest_writer_(nullptr) {} + WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec); // IMPORTANT: None of the methods in this class rely on the db mutex // for correctness. 
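The Writer struct above reserves space for its mutex and condition variable with std::aligned_storage and only constructs them (with placement new) if the thread actually has to block, destroying them manually if they were ever created. A self-contained sketch of that lazy-construction pattern outside of Writer, with illustrative names:

#include <condition_variable>
#include <mutex>
#include <new>
#include <type_traits>

class LazyWaiter {
 public:
  LazyWaiter() : made_waitable_(false) {}
  ~LazyWaiter() {
    if (made_waitable_) {
      Mu().~mutex();
      Cv().~condition_variable();
    }
  }

  // Called only on the slow path, right before the first block.
  void MakeWaitable() {
    if (!made_waitable_) {
      made_waitable_ = true;
      new (&mu_bytes_) std::mutex;
      new (&cv_bytes_) std::condition_variable;
    }
  }

  std::mutex& Mu() { return *reinterpret_cast<std::mutex*>(&mu_bytes_); }
  std::condition_variable& Cv() {
    return *reinterpret_cast<std::condition_variable*>(&cv_bytes_);
  }

 private:
  bool made_waitable_;
  std::aligned_storage<sizeof(std::mutex), alignof(std::mutex)>::type
      mu_bytes_;
  std::aligned_storage<sizeof(std::condition_variable),
                       alignof(std::condition_variable)>::type cv_bytes_;
};
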
All of the methods except JoinBatchGroup and
@@ -86,13 +139,16 @@
   // Correctness is maintained by ensuring that only a single thread is
   // a leader at a time.

-  // Registers w as ready to become part of a batch group, and blocks
-  // until some other thread has completed the write (in which case
-  // w->done will be set to true) or this write has become the leader
-  // of a batch group (w->done will remain unset). The db mutex SHOULD
-  // NOT be held when calling this function, because it will block.
-  // If !w->done then JoinBatchGroup should be followed by a call to
-  // EnterAsBatchGroupLeader and ExitAsBatchGroupLeader.
+  // Registers w as ready to become part of a batch group, waits until the
+  // caller should perform some work, and returns the current state of the
+  // writer. If w has become the leader of a write batch group, returns
+  // STATE_GROUP_LEADER. If w has been made part of a sequential batch
+  // group and the leader has performed the write, returns STATE_COMPLETED.
+  // If w has been made part of a parallel batch group and is responsible
+  // for updating the memtable, returns STATE_PARALLEL_FOLLOWER.
+  //
+  // The db mutex SHOULD NOT be held when calling this function, because
+  // it will block.
   //
   // Writer* w: Writer to be executed as part of a batch group
   void JoinBatchGroup(Writer* w);
@@ -100,15 +156,35 @@
   // Constructs a write batch group led by leader, which should be a
   // Writer passed to JoinBatchGroup on the current thread.
   //
-  // Writer* leader: Writer passed to JoinBatchGroup, but !done
-  // Writer** last_writer: Out-param for use by ExitAsBatchGroupLeader
+  // Writer* leader: Writer that is STATE_GROUP_LEADER
+  // Writer** last_writer: Out-param that identifies the last follower
   // autovector* write_batch_group: Out-param of group members
-  // returns: Total batch group size
+  // returns: Total batch group byte size
   size_t EnterAsBatchGroupLeader(Writer* leader, Writer** last_writer,
                                  autovector* write_batch_group);

-  // Unlinks the Writer-s in a batch group, wakes up the non-leaders, and
-  // wakes up the next leader (if any).
+  // Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the
+  // non-leader members of this write batch group. Sets Writer::sequence
+  // before waking them up.
+  //
+  // ParallelGroup* pg: Extra state used to coordinate the parallel add
+  // SequenceNumber sequence: Starting sequence number to assign to Writer-s
+  void LaunchParallelFollowers(ParallelGroup* pg, SequenceNumber sequence);
+
+  // Reports the completion of w's batch to the parallel group leader, and
+  // waits for the rest of the parallel batch to complete. Returns true
+  // if this thread is the last to complete, and hence should advance
+  // the sequence number and then call EarlyExitParallelGroup, false if
+  // someone else has already taken responsibility for that.
+  bool CompleteParallelWorker(Writer* w);
+
+  // This method performs an early completion of a parallel write group,
+  // where the cleanup work of the leader is performed by a follower who
+  // happens to be the last parallel worker to complete.
+  void EarlyExitParallelGroup(Writer* w);
+
+  // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
+  // and wakes up the next leader (if any).
   //
   // Writer* leader: From EnterAsBatchGroupLeader
   // Writer* last_writer: Value of out-param of EnterAsBatchGroupLeader
@@ -128,18 +204,35 @@
   // writers.
void ExitUnbatched(Writer* w); + struct AdaptationContext { + const char* name; + std::atomic value; + }; + private: + uint64_t max_yield_usec_; + uint64_t slow_yield_usec_; + // Points to the newest pending Writer. Only leader can remove // elements, adding can be done lock-free by anybody std::atomic newest_writer_; - void Await(Writer* w); - void MarkJoined(Writer* w); + // Waits for w->state & goal_mask using w->StateMutex(). Returns + // the state that satisfies goal_mask. + uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask); + + // Blocks until w->state & goal_mask, returning the state value + // that satisfied the predicate. Uses ctx to adaptively use + // std::this_thread::yield() to avoid mutex overheads. ctx should be + // a context-dependent static. + uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx); + + void SetState(Writer* w, uint8_t new_state); - // Links w into the newest_writer_ list. Sets *wait_needed to false - // if w was linked directly into the leader position, true otherwise. - // Safe to call from multiple threads without external locking. - void LinkOne(Writer* w, bool* wait_needed); + // Links w into the newest_writer_ list. Sets *linked_as_leader to + // true if w was linked directly into the leader position. Safe to + // call from multiple threads without external locking. + void LinkOne(Writer* w, bool* linked_as_leader); // Computes any missing link_newer links. Should not be called // concurrently with itself. diff --git a/db/writebuffer.h b/db/writebuffer.h index 7047a9244..4fe51d8a7 100644 --- a/db/writebuffer.h +++ b/db/writebuffer.h @@ -11,16 +11,20 @@ #pragma once +#include + namespace rocksdb { class WriteBuffer { public: explicit WriteBuffer(size_t _buffer_size) - : buffer_size_(_buffer_size), memory_used_(0) {} + : buffer_size_(_buffer_size), memory_used_(0) {} ~WriteBuffer() {} - size_t memory_usage() const { return memory_used_; } + size_t memory_usage() const { + return memory_used_.load(std::memory_order_relaxed); + } size_t buffer_size() const { return buffer_size_; } // Should only be called from write thread @@ -29,12 +33,16 @@ class WriteBuffer { } // Should only be called from write thread - void ReserveMem(size_t mem) { memory_used_ += mem; } - void FreeMem(size_t mem) { memory_used_ -= mem; } + void ReserveMem(size_t mem) { + memory_used_.fetch_add(mem, std::memory_order_relaxed); + } + void FreeMem(size_t mem) { + memory_used_.fetch_sub(mem, std::memory_order_relaxed); + } private: const size_t buffer_size_; - size_t memory_used_; + std::atomic memory_used_; // No copying allowed WriteBuffer(const WriteBuffer&); diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index f02c2d094..6cd92d823 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -36,7 +36,9 @@ #pragma once #include +#include #include +#include namespace rocksdb { @@ -68,25 +70,36 @@ class MemTableRep { explicit MemTableRep(MemTableAllocator* allocator) : allocator_(allocator) {} - // Allocate a buf of len size for storing key. The idea is that a specific - // memtable representation knows its underlying data structure better. By - // allowing it to allocate memory, it can possibly put correlated stuff - // in consecutive memory area to make processor prefetching more efficient. + // Allocate a buf of len size for storing key. The idea is that a + // specific memtable representation knows its underlying data structure + // better. 
By allowing it to allocate memory, it can possibly put + // correlated stuff in consecutive memory area to make processor + // prefetching more efficient. virtual KeyHandle Allocate(const size_t len, char** buf); // Insert key into the collection. (The caller will pack key and value into a // single buffer and pass that in as the parameter to Insert). // REQUIRES: nothing that compares equal to key is currently in the - // collection. + // collection, and no concurrent modifications to the table in progress virtual void Insert(KeyHandle handle) = 0; + // Like Insert(handle), but may be called concurrent with other calls + // to InsertConcurrently for other handles + virtual void InsertConcurrently(KeyHandle handle) { +#ifndef ROCKSDB_LITE + throw std::runtime_error("concurrent insert not supported"); +#else + abort(); +#endif + } + // Returns true iff an entry that compares equal to key is in the collection. virtual bool Contains(const char* key) const = 0; - // Notify this table rep that it will no longer be added to. By default, does - // nothing. After MarkReadOnly() is called, this table rep will not be - // written to (ie No more calls to Allocate(), Insert(), or any writes done - // directly to entries accessed through the iterator.) + // Notify this table rep that it will no longer be added to. By default, + // does nothing. After MarkReadOnly() is called, this table rep will + // not be written to (ie No more calls to Allocate(), Insert(), + // or any writes done directly to entries accessed through the iterator.) virtual void MarkReadOnly() { } // Look up key from the mem table, since the first key in the mem table whose @@ -94,6 +107,7 @@ class MemTableRep { // callback_args directly forwarded as the first parameter, and the mem table // key as the second parameter. If the return value is false, then terminates. // Otherwise, go through the next key. + // // It's safe for Get() to terminate after having finished all the potential // key for the k.user_key(), or not. // @@ -109,7 +123,7 @@ class MemTableRep { } // Report an approximation of how much memory has been used other than memory - // that was allocated through the allocator. + // that was allocated through the allocator. Safe to call from any thread. virtual size_t ApproximateMemoryUsage() = 0; virtual ~MemTableRep() { } @@ -174,6 +188,10 @@ class MemTableRep { // Default: true virtual bool IsSnapshotSupported() const { return true; } + // Return true if the current MemTableRep supports concurrent inserts + // Default: false + virtual bool IsInsertConcurrentlySupported() const { return false; } + protected: // When *key is an internal key concatenated with the value, returns the // user key. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index fb35b4108..e7064b3cb 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1173,6 +1173,47 @@ struct DBOptions { // Default: 2MB/s uint64_t delayed_write_rate; + // If true, allow multi-writers to update mem tables in parallel. + // Only some memtable_factory-s support concurrent writes; currently it + // is implemented only for SkipListFactory. Concurrent memtable writes + // are not compatible with inplace_update_support or filter_deletes. + // It is strongly recommended to set enable_write_thread_adaptive_yield + // if you are going to use this feature. + // + // THIS FEATURE IS NOT STABLE YET. 
+ // + // Default: false + bool allow_concurrent_memtable_write; + + // If true, threads synchronizing with the write batch group leader will + // wait for up to write_thread_max_yield_usec before blocking on a mutex. + // This can substantially improve throughput for concurrent workloads, + // regardless of whether allow_concurrent_memtable_write is enabled. + // + // THIS FEATURE IS NOT STABLE YET. + // + // Default: false + bool enable_write_thread_adaptive_yield; + + // The maximum number of microseconds that a write operation will use + // a yielding spin loop to coordinate with other write threads before + // blocking on a mutex. (Assuming write_thread_slow_yield_usec is + // set properly) increasing this value is likely to increase RocksDB + // throughput at the expense of increased CPU usage. + // + // Default: 100 + uint64_t write_thread_max_yield_usec; + + // The latency in microseconds after which a std::this_thread::yield + // call (sched_yield on Linux) is considered to be a signal that + // other processes or threads would like to use the current core. + // Increasing this makes writer threads more likely to take CPU + // by spinning, which will show up as an increase in the number of + // involuntary context switches. + // + // Default: 3 + uint64_t write_thread_slow_yield_usec; + // If true, then DB::Open() will not update the statistics used to optimize // compaction decision by loading table properties from many files. // Turning off this feature will improve DBOpen time especially in diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 9a21fe174..15c49439c 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -145,7 +145,7 @@ enum Tickers : uint32_t { // Writes can be processed by requesting thread or by the thread at the // head of the writers queue. WRITE_DONE_BY_SELF, - WRITE_DONE_BY_OTHER, + WRITE_DONE_BY_OTHER, // Equivalent to writes done for others WRITE_TIMEDOUT, // Number of writes ending up with timed-out. 
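From the application side, the two DBOptions flags introduced in the hunk above are all that is needed to opt in. A minimal sketch of enabling them together (the database path and tuning values are placeholders, and the option comments above note the feature is not yet stable):

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  // Opt in to parallel memtable inserts; requires a memtable factory that
  // supports them (the default SkipListFactory does) and is incompatible
  // with inplace_update_support and filter_deletes.
  options.allow_concurrent_memtable_write = true;

  // Strongly recommended alongside concurrent writes: lets writers spin
  // for up to write_thread_max_yield_usec before blocking on a mutex.
  options.enable_write_thread_adaptive_yield = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());
  delete db;
  return 0;
}
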
WRITE_WITH_WAL, // Number of Write calls that request WAL COMPACT_READ_BYTES, // Bytes read during compaction diff --git a/port/port_posix.cc b/port/port_posix.cc index 773c6f1c3..73ad3caf1 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -10,6 +10,9 @@ #include "port/port_posix.h" #include +#if defined(__i386__) || defined(__x86_64__) +#include +#endif #include #include #include @@ -132,6 +135,19 @@ void RWMutex::ReadUnlock() { PthreadCall("read unlock", pthread_rwlock_unlock(&m void RWMutex::WriteUnlock() { PthreadCall("write unlock", pthread_rwlock_unlock(&mu_)); } +int PhysicalCoreID() { +#if defined(__i386__) || defined(__x86_64__) + // if you ever find that this function is hot on Linux, you can go from + // ~200 nanos to ~20 nanos by adding the machinery to use __vdso_getcpu + unsigned eax, ebx = 0, ecx, edx; + __get_cpuid(1, &eax, &ebx, &ecx, &edx); + return ebx >> 24; +#else + // getcpu or sched_getcpu could work here + return -1; +#endif +} + void InitOnce(OnceType* once, void (*initializer)()) { PthreadCall("once", pthread_once(once, initializer)); } diff --git a/port/port_posix.h b/port/port_posix.h index 8854e1c3f..efcd1aa8e 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -43,8 +43,9 @@ #include #include -#include #include +#include +#include #ifndef PLATFORM_IS_LITTLE_ENDIAN #define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN) @@ -71,8 +72,6 @@ #define fdatasync fsync #endif -#include - namespace rocksdb { namespace port { @@ -142,6 +141,20 @@ class CondVar { Mutex* mu_; }; +static inline void AsmVolatilePause() { +#if defined(__i386__) || defined(__x86_64__) + asm volatile("pause"); +#elif defined(__aarch64__) + asm volatile("wfe"); +#elif defined(__powerpc64__) + asm volatile("or 27,27,27"); +#endif + // it's okay for other platforms to be no-ops +} + +// Returns -1 if not available on this platform +extern int PhysicalCoreID(); + typedef pthread_once_t OnceType; #define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT extern void InitOnce(OnceType* once, void (*initializer)()); diff --git a/port/win/port_win.cc b/port/win/port_win.cc index 91c83b8c9..e08f0ec22 100644 --- a/port/win/port_win.cc +++ b/port/win/port_win.cc @@ -100,6 +100,8 @@ void CondVar::Signal() { cv_.notify_one(); } void CondVar::SignalAll() { cv_.notify_all(); } +int PhysicalCoreID() { return GetCurrentProcessorNumber(); } + void InitOnce(OnceType* once, void (*initializer)()) { std::call_once(once->flag_, initializer); } diff --git a/port/win/port_win.h b/port/win/port_win.h index 6b7920023..9ee7d96be 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -243,6 +243,15 @@ extern void InitOnce(OnceType* once, void (*initializer)()); #define CACHE_LINE_SIZE 64U +static inline void AsmVolatilePause() { +#if defined(_M_IX86) || defined(_M_X64) + ::_mm_pause(); +#endif + // it would be nice to get "wfe" on ARM here +} + +extern int PhysicalCoreID(); + // For Thread Local Storage abstraction typedef DWORD pthread_key_t; diff --git a/src.mk b/src.mk index 40f7f0d7a..369890258 100644 --- a/src.mk +++ b/src.mk @@ -88,6 +88,7 @@ LIB_SOURCES = \ util/coding.cc \ util/comparator.cc \ util/compaction_job_stats_impl.cc \ + util/concurrent_arena.cc \ util/crc32c.cc \ util/db_info_dumper.cc \ util/delete_scheduler_impl.cc \ diff --git a/table/table_test.cc b/table/table_test.cc index 58607bbb2..cf58a2477 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -2041,7 +2041,8 @@ TEST_F(MemTableTest, Simple) { batch.Put(std::string("k3"), std::string("v3")); 
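PhysicalCoreID() and AsmVolatilePause(), added in the port changes above, are the two portability hooks this patch leans on: the first feeds ConcurrentArena's shard choice, the second makes spin loops polite. A sketch of the shard-picking arithmetic for a power-of-two shard count (PickShardIndex is an illustrative name; the real Repick() also caches its result in a thread-local slot and falls back to a random pick when the core id is unavailable):

#include <cstdint>
#include <cstdlib>
#include "port/port.h"

// index_mask is (number_of_shards - 1), with the shard count a power of two,
// so "& index_mask" maps any core id onto a valid shard.
uint32_t PickShardIndex(uint32_t index_mask) {
  int cpuid = rocksdb::port::PhysicalCoreID();
  if (cpuid < 0) {
    // Core id unavailable on this platform: any value works here, since the
    // choice only affects contention, not correctness.
    cpuid = static_cast<int>(std::rand());
  }
  return static_cast<uint32_t>(cpuid) & index_mask;
}
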
batch.Put(std::string("largekey"), std::string("vlarge")); ColumnFamilyMemTablesDefault cf_mems_default(memtable); - ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, &cf_mems_default).ok()); + ASSERT_TRUE( + WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr).ok()); Arena arena; ScopedArenaIterator iter(memtable->NewIterator(ReadOptions(), &arena)); diff --git a/util/arena.cc b/util/arena.cc index 1fe455af5..1d292ec01 100644 --- a/util/arena.cc +++ b/util/arena.cc @@ -167,7 +167,7 @@ char* Arena::AllocateAligned(size_t bytes, size_t huge_page_size, aligned_alloc_ptr_ += needed; alloc_bytes_remaining_ -= needed; } else { - // AllocateFallback always returned aligned memory + // AllocateFallback always returns aligned memory result = AllocateFallback(bytes, true /* aligned */); } assert((reinterpret_cast(result) & (kAlignUnit - 1)) == 0); diff --git a/util/arena.h b/util/arena.h index 9149498c8..db2150a8f 100644 --- a/util/arena.h +++ b/util/arena.h @@ -21,6 +21,7 @@ #include #include #include "util/allocator.h" +#include "util/mutexlock.h" namespace rocksdb { @@ -76,7 +77,7 @@ class Arena : public Allocator { size_t BlockSize() const override { return kBlockSize; } private: - char inline_block_[kInlineSize]; + char inline_block_[kInlineSize] __attribute__((__aligned__(sizeof(void*)))); // Number of bytes allocated in one block const size_t kBlockSize; // Array of new[] allocated memory blocks diff --git a/util/concurrent_arena.cc b/util/concurrent_arena.cc new file mode 100644 index 000000000..027124871 --- /dev/null +++ b/util/concurrent_arena.cc @@ -0,0 +1,49 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "util/concurrent_arena.h" +#include +#include "port/likely.h" +#include "port/port.h" +#include "util/random.h" + +namespace rocksdb { + +#if ROCKSDB_SUPPORT_THREAD_LOCAL +__thread uint32_t ConcurrentArena::tls_cpuid = 0; +#endif + +ConcurrentArena::ConcurrentArena(size_t block_size, size_t huge_page_size) + : shard_block_size_(block_size / 8), arena_(block_size, huge_page_size) { + // find a power of two >= num_cpus and >= 8 + auto num_cpus = std::thread::hardware_concurrency(); + index_mask_ = 7; + while (index_mask_ + 1 < num_cpus) { + index_mask_ = index_mask_ * 2 + 1; + } + + shards_.reset(new Shard[index_mask_ + 1]); + Fixup(); +} + +ConcurrentArena::Shard* ConcurrentArena::Repick() { + int cpuid = port::PhysicalCoreID(); + if (UNLIKELY(cpuid < 0)) { + // cpu id unavailable, just pick randomly + cpuid = Random::GetTLSInstance()->Uniform(index_mask_ + 1); + } +#if ROCKSDB_SUPPORT_THREAD_LOCAL + // even if we are cpu 0, use a non-zero tls_cpuid so we can tell we + // have repicked + tls_cpuid = cpuid | (index_mask_ + 1); +#endif + return &shards_[cpuid & index_mask_]; +} + +} // namespace rocksdb diff --git a/util/concurrent_arena.h b/util/concurrent_arena.h new file mode 100644 index 000000000..e3e1a3eb3 --- /dev/null +++ b/util/concurrent_arena.h @@ -0,0 +1,192 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. 
+// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include +#include +#include "port/likely.h" +#include "util/allocator.h" +#include "util/arena.h" +#include "util/mutexlock.h" +#include "util/thread_local.h" + +namespace rocksdb { + +class Logger; + +// ConcurrentArena wraps an Arena. It makes it thread safe using a fast +// inlined spinlock, and adds small per-core allocation caches to avoid +// contention for small allocations. To avoid any memory waste from the +// per-core shards, they are kept small, they are lazily instantiated +// only if ConcurrentArena actually notices concurrent use, and they +// adjust their size so that there is no fragmentation waste when the +// shard blocks are allocated from the underlying main arena. +class ConcurrentArena : public Allocator { + public: + // block_size and huge_page_size are the same as for Arena (and are + // in fact just passed to the constructor of arena_. The core-local + // shards compute their shard_block_size as a fraction of block_size + // that varies according to the hardware concurrency level. + explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize, + size_t huge_page_size = 0); + + char* Allocate(size_t bytes) override { + return AllocateImpl(bytes, false /*force_arena*/, + [=]() { return arena_.Allocate(bytes); }); + } + + char* AllocateAligned(size_t bytes, size_t huge_page_size = 0, + Logger* logger = nullptr) override { + size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1; + assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) && + (rounded_up % sizeof(void*)) == 0); + + return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() { + return arena_.AllocateAligned(rounded_up, huge_page_size, logger); + }); + } + + size_t ApproximateMemoryUsage() const { + std::unique_lock lock(arena_mutex_, std::defer_lock); + if (index_mask_ != 0) { + lock.lock(); + } + return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused(); + } + + size_t MemoryAllocatedBytes() const { + return memory_allocated_bytes_.load(std::memory_order_relaxed); + } + + size_t AllocatedAndUnused() const { + return arena_allocated_and_unused_.load(std::memory_order_relaxed) + + ShardAllocatedAndUnused(); + } + + size_t IrregularBlockNum() const { + return irregular_block_num_.load(std::memory_order_relaxed); + } + + size_t BlockSize() const override { return arena_.BlockSize(); } + + private: + struct Shard { + char padding[40]; + mutable SpinMutex mutex; + char* free_begin_; + std::atomic allocated_and_unused_; + + Shard() : allocated_and_unused_(0) {} + }; + +#if ROCKSDB_SUPPORT_THREAD_LOCAL + static __thread uint32_t tls_cpuid; +#else + enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 }; +#endif + + char padding0[56]; + + size_t shard_block_size_; + + // shards_[i & index_mask_] is valid + size_t index_mask_; + std::unique_ptr shards_; + + Arena arena_; + mutable SpinMutex arena_mutex_; + std::atomic arena_allocated_and_unused_; + std::atomic memory_allocated_bytes_; + std::atomic irregular_block_num_; + + char padding1[56]; + + Shard* Repick(); + + size_t ShardAllocatedAndUnused() 
const { + size_t total = 0; + for (size_t i = 0; i <= index_mask_; ++i) { + total += shards_[i].allocated_and_unused_.load(std::memory_order_relaxed); + } + return total; + } + + template + char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) { + uint32_t cpu; + + // Go directly to the arena if the allocation is too large, or if + // we've never needed to Repick() and the arena mutex is available + // with no waiting. This keeps the fragmentation penalty of + // concurrency zero unless it might actually confer an advantage. + std::unique_lock arena_lock(arena_mutex_, std::defer_lock); + if (bytes > shard_block_size_ / 4 || force_arena || + ((cpu = tls_cpuid) == 0 && + !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) && + arena_lock.try_lock())) { + if (!arena_lock.owns_lock()) { + arena_lock.lock(); + } + auto rv = func(); + Fixup(); + return rv; + } + + // pick a shard from which to allocate + Shard* s = &shards_[cpu & index_mask_]; + if (!s->mutex.try_lock()) { + s = Repick(); + s->mutex.lock(); + } + std::unique_lock lock(s->mutex, std::adopt_lock); + + size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed); + if (avail < bytes) { + // reload + std::lock_guard reload_lock(arena_mutex_); + + // If the arena's current block is within a factor of 2 of the right + // size, we adjust our request to avoid arena waste. + auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed); + assert(exact == arena_.AllocatedAndUnused()); + avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2 + ? exact + : shard_block_size_; + s->free_begin_ = arena_.AllocateAligned(avail); + Fixup(); + } + s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed); + + char* rv; + if ((bytes % sizeof(void*)) == 0) { + // aligned allocation from the beginning + rv = s->free_begin_; + s->free_begin_ += bytes; + } else { + // unaligned from the end + rv = s->free_begin_ + avail - bytes; + } + return rv; + } + + void Fixup() { + arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(), + std::memory_order_relaxed); + memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(), + std::memory_order_relaxed); + irregular_block_num_.store(arena_.IrregularBlockNum(), + std::memory_order_relaxed); + } + + ConcurrentArena(const ConcurrentArena&) = delete; + ConcurrentArena& operator=(const ConcurrentArena&) = delete; +}; + +} // namespace rocksdb diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc index ffe8157cc..4df81d527 100644 --- a/util/dynamic_bloom.cc +++ b/util/dynamic_bloom.cc @@ -48,7 +48,7 @@ DynamicBloom::DynamicBloom(uint32_t num_probes, void DynamicBloom::SetRawData(unsigned char* raw_data, uint32_t total_bits, uint32_t num_blocks) { - data_ = raw_data; + data_ = reinterpret_cast*>(raw_data); kTotalBits = total_bits; kNumBlocks = num_blocks; } @@ -69,15 +69,14 @@ void DynamicBloom::SetTotalBits(Allocator* allocator, sz += CACHE_LINE_SIZE - 1; } assert(allocator); - raw_ = reinterpret_cast( - allocator->AllocateAligned(sz, huge_page_tlb_size, logger)); - memset(raw_, 0, sz); - if (kNumBlocks > 0 && (reinterpret_cast(raw_) % CACHE_LINE_SIZE)) { - data_ = raw_ + CACHE_LINE_SIZE - - reinterpret_cast(raw_) % CACHE_LINE_SIZE; - } else { - data_ = raw_; + + char* raw = allocator->AllocateAligned(sz, huge_page_tlb_size, logger); + memset(raw, 0, sz); + auto cache_line_offset = reinterpret_cast(raw) % CACHE_LINE_SIZE; + if (kNumBlocks > 0 && cache_line_offset > 0) { + raw += CACHE_LINE_SIZE - cache_line_offset; } + 
diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc
index ffe8157cc..4df81d527 100644
--- a/util/dynamic_bloom.cc
+++ b/util/dynamic_bloom.cc
@@ -48,7 +48,7 @@ DynamicBloom::DynamicBloom(uint32_t num_probes,
 
 void DynamicBloom::SetRawData(unsigned char* raw_data, uint32_t total_bits,
                               uint32_t num_blocks) {
-  data_ = raw_data;
+  data_ = reinterpret_cast<std::atomic<uint8_t>*>(raw_data);
   kTotalBits = total_bits;
   kNumBlocks = num_blocks;
 }
@@ -69,15 +69,14 @@ void DynamicBloom::SetTotalBits(Allocator* allocator,
     sz += CACHE_LINE_SIZE - 1;
   }
   assert(allocator);
-  raw_ = reinterpret_cast<unsigned char*>(
-      allocator->AllocateAligned(sz, huge_page_tlb_size, logger));
-  memset(raw_, 0, sz);
-  if (kNumBlocks > 0 && (reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE)) {
-    data_ = raw_ + CACHE_LINE_SIZE -
-            reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE;
-  } else {
-    data_ = raw_;
+
+  char* raw = allocator->AllocateAligned(sz, huge_page_tlb_size, logger);
+  memset(raw, 0, sz);
+  auto cache_line_offset = reinterpret_cast<uintptr_t>(raw) % CACHE_LINE_SIZE;
+  if (kNumBlocks > 0 && cache_line_offset > 0) {
+    raw += CACHE_LINE_SIZE - cache_line_offset;
   }
+  data_ = reinterpret_cast<std::atomic<uint8_t>*>(raw);
 }
 
 }  // rocksdb
diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h
index e2ac56e76..8d1b7b4af 100644
--- a/util/dynamic_bloom.h
+++ b/util/dynamic_bloom.h
@@ -51,9 +51,15 @@ class DynamicBloom {
   // Assuming single threaded access to this function.
   void Add(const Slice& key);
 
+  // Like Add, but may be called concurrent with other functions.
+  void AddConcurrently(const Slice& key);
+
   // Assuming single threaded access to this function.
   void AddHash(uint32_t hash);
 
+  // Like AddHash, but may be called concurrent with other functions.
+  void AddHashConcurrently(uint32_t hash);
+
   // Multithreaded access to this function is OK
   bool MayContain(const Slice& key) const;
 
@@ -81,12 +87,40 @@ class DynamicBloom {
   const uint32_t kNumProbes;
 
   uint32_t (*hash_func_)(const Slice& key);
-  unsigned char* data_;
-  unsigned char* raw_;
+  std::atomic<uint8_t>* data_;
+
+  // or_func(ptr, mask) should effect *ptr |= mask with the appropriate
+  // concurrency safety, working with bytes.
+  template <typename OrFunc>
+  void AddHash(uint32_t hash, const OrFunc& or_func);
 };
 
 inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); }
 
+inline void DynamicBloom::AddConcurrently(const Slice& key) {
+  AddHashConcurrently(hash_func_(key));
+}
+
+inline void DynamicBloom::AddHash(uint32_t hash) {
+  AddHash(hash, [](std::atomic<uint8_t>* ptr, uint8_t mask) {
+    ptr->store(ptr->load(std::memory_order_relaxed) | mask,
+               std::memory_order_relaxed);
+  });
+}
+
+inline void DynamicBloom::AddHashConcurrently(uint32_t hash) {
+  AddHash(hash, [](std::atomic<uint8_t>* ptr, uint8_t mask) {
+    // Happens-before between AddHash and MaybeContains is handled by
+    // access to versions_->LastSequence(), so all we have to do here is
+    // avoid races (so we don't give the compiler a license to mess up
+    // our code) and not lose bits. std::memory_order_relaxed is enough
+    // for that.
+    if ((mask & ptr->load(std::memory_order_relaxed)) != mask) {
+      ptr->fetch_or(mask, std::memory_order_relaxed);
+    }
+  });
+}
+
 inline bool DynamicBloom::MayContain(const Slice& key) const {
   return (MayContainHash(hash_func_(key)));
 }
@@ -107,7 +141,8 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) const {
       // Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized
       // to a simple and operation by compiler.
       const uint32_t bitpos = b + (h % (CACHE_LINE_SIZE * 8));
-      if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
+      uint8_t byteval = data_[bitpos / 8].load(std::memory_order_relaxed);
+      if ((byteval & (1 << (bitpos % 8))) == 0) {
         return false;
       }
       // Rotate h so that we don't reuse the same bytes.
@@ -118,7 +153,8 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) const {
   } else {
     for (uint32_t i = 0; i < kNumProbes; ++i) {
       const uint32_t bitpos = h % kTotalBits;
-      if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
+      uint8_t byteval = data_[bitpos / 8].load(std::memory_order_relaxed);
+      if ((byteval & (1 << (bitpos % 8))) == 0) {
         return false;
       }
       h += delta;
@@ -127,7 +163,8 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) const {
   return true;
 }
 
-inline void DynamicBloom::AddHash(uint32_t h) {
+template <typename OrFunc>
+inline void DynamicBloom::AddHash(uint32_t h, const OrFunc& or_func) {
   assert(IsInitialized());
   const uint32_t delta = (h >> 17) | (h << 15);  // Rotate right 17 bits
   if (kNumBlocks != 0) {
@@ -136,7 +173,7 @@ inline void DynamicBloom::AddHash(uint32_t h) {
       // Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized
      // to a simple and operation by compiler.
       const uint32_t bitpos = b + (h % (CACHE_LINE_SIZE * 8));
-      data_[bitpos / 8] |= (1 << (bitpos % 8));
+      or_func(&data_[bitpos / 8], (1 << (bitpos % 8)));
       // Rotate h so that we don't reuse the same bytes.
       h = h / (CACHE_LINE_SIZE * 8) +
           (h % (CACHE_LINE_SIZE * 8)) * (0x20000000U / CACHE_LINE_SIZE);
@@ -145,7 +182,7 @@ inline void DynamicBloom::AddHash(uint32_t h) {
   } else {
     for (uint32_t i = 0; i < kNumProbes; ++i) {
       const uint32_t bitpos = h % kTotalBits;
-      data_[bitpos / 8] |= (1 << (bitpos % 8));
+      or_func(&data_[bitpos / 8], (1 << (bitpos % 8)));
       h += delta;
     }
   }
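Because data_ is now an array of std::atomic<uint8_t> and AddConcurrently uses only relaxed operations, writers may populate a filter in parallel while it is being probed, provided the caller supplies whatever happens-before ordering it needs (the memtable path gets that from the sequence number, per the comment above). Below is a small usage sketch under those assumptions; the thread count, key count, and key encoding are arbitrary, and only the DynamicBloom calls themselves come from this patch.

```cpp
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

#include "rocksdb/slice.h"
#include "util/arena.h"
#include "util/dynamic_bloom.h"

void FillBloomConcurrently() {
  rocksdb::Arena arena;
  // 10 bits per key for 1M keys, locality disabled, 6 probes (the same
  // constructor shape the tests below use).
  rocksdb::DynamicBloom bloom(&arena, 10 * 1000000, 0 /* locality */,
                              6 /* num_probes */);

  std::vector<std::thread> writers;
  for (int t = 0; t < 4; ++t) {
    writers.emplace_back([&bloom, t] {
      for (uint64_t i = t; i < 1000000; i += 4) {
        // No external locking: AddConcurrently is safe against other adders
        // and against concurrent MayContain probes.
        bloom.AddConcurrently(
            rocksdb::Slice(reinterpret_cast<const char*>(&i), sizeof(i)));
      }
    });
  }
  for (auto& w : writers) {
    w.join();
  }

  // After joining, every inserted key must be reported as possibly present.
  uint64_t probe = 8;
  assert(bloom.MayContain(
      rocksdb::Slice(reinterpret_cast<const char*>(&probe), sizeof(probe))));
}
```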
diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc
index cb3836661..e5ef4aab7 100644
--- a/util/dynamic_bloom_test.cc
+++ b/util/dynamic_bloom_test.cc
@@ -17,6 +17,10 @@ int main() {
 #include <algorithm>
 #include <vector>
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <thread>
 #include <gflags/gflags.h>
 #include "dynamic_bloom.h"
@@ -72,6 +76,25 @@ TEST_F(DynamicBloomTest, Small) {
   ASSERT_TRUE(!bloom2.MayContain("foo"));
 }
 
+TEST_F(DynamicBloomTest, SmallConcurrentAdd) {
+  Arena arena;
+  DynamicBloom bloom1(&arena, 100, 0, 2);
+  bloom1.AddConcurrently("hello");
+  bloom1.AddConcurrently("world");
+  ASSERT_TRUE(bloom1.MayContain("hello"));
+  ASSERT_TRUE(bloom1.MayContain("world"));
+  ASSERT_TRUE(!bloom1.MayContain("x"));
+  ASSERT_TRUE(!bloom1.MayContain("foo"));
+
+  DynamicBloom bloom2(&arena, CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2);
+  bloom2.AddConcurrently("hello");
+  bloom2.AddConcurrently("world");
+  ASSERT_TRUE(bloom2.MayContain("hello"));
+  ASSERT_TRUE(bloom2.MayContain("world"));
+  ASSERT_TRUE(!bloom2.MayContain("x"));
+  ASSERT_TRUE(!bloom2.MayContain("foo"));
+}
+
 static uint32_t NextNum(uint32_t num) {
   if (num < 10) {
     num += 1;
@@ -93,8 +116,8 @@ TEST_F(DynamicBloomTest, VaryingLengths) {
   int good_filters = 0;
   uint32_t num_probes = static_cast<uint32_t>(FLAGS_num_probes);
 
-  fprintf(stderr, "bits_per_key: %d num_probes: %d\n",
-          FLAGS_bits_per_key, num_probes);
+  fprintf(stderr, "bits_per_key: %d num_probes: %d\n", FLAGS_bits_per_key,
+          num_probes);
 
   for (uint32_t enable_locality = 0; enable_locality < 2; ++enable_locality) {
     for (uint32_t num = 1; num <= 10000; num = NextNum(num)) {
@@ -114,8 +137,8 @@ TEST_F(DynamicBloomTest, VaryingLengths) {
 
       // All added keys must match
       for (uint64_t i = 0; i < num; i++) {
-        ASSERT_TRUE(bloom.MayContain(Key(i, buffer)))
-            << "Num " << num << "; key " << i;
+        ASSERT_TRUE(bloom.MayContain(Key(i, buffer))) << "Num " << num
+                                                      << "; key " << i;
       }
 
       // Check false positive rate
@@ -139,9 +162,9 @@ TEST_F(DynamicBloomTest, VaryingLengths) {
         good_filters++;
     }
 
-    fprintf(stderr, "Filters: %d good, %d mediocre\n",
-            good_filters, mediocre_filters);
-    ASSERT_LE(mediocre_filters, good_filters/5);
+    fprintf(stderr, "Filters: %d good, %d mediocre\n", good_filters,
+            mediocre_filters);
+    ASSERT_LE(mediocre_filters, good_filters / 5);
   }
 }
 
@@ -161,7 +184,7 @@ TEST_F(DynamicBloomTest, perf) {
     DynamicBloom std_bloom(&arena, num_keys * 10, 0, num_probes);
 
     timer.Start();
-    for (uint32_t i = 1; i <= num_keys; ++i) {
+    for (uint64_t i = 1; i <= num_keys; ++i) {
      std_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
     }
 
@@ -171,7 +194,7 @@ TEST_F(DynamicBloomTest, perf) {
     uint32_t count = 0;
     timer.Start();
-    for (uint32_t i = 1; i <= num_keys; ++i) {
+    for (uint64_t i = 1; i <= num_keys; ++i) {
       if (std_bloom.MayContain(Slice(reinterpret_cast<const char*>(&i), 8))) {
         ++count;
       }
@@ -184,31 +207,125 @@ TEST_F(DynamicBloomTest, perf) {
     // Locality enabled version
     DynamicBloom blocked_bloom(&arena, num_keys * 10, 1, num_probes);
+
+    timer.Start();
+    for (uint64_t i = 1; i <= num_keys; ++i) {
+      blocked_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
+    }
+
+    elapsed = timer.ElapsedNanos();
+    fprintf(stderr,
+            "blocked bloom(enable locality), avg add latency %" PRIu64 "\n",
+            elapsed / num_keys);
+
+    count = 0;
+    timer.Start();
+    for (uint64_t i = 1; i <= num_keys; ++i) {
+      if (blocked_bloom.MayContain(
+              Slice(reinterpret_cast<const char*>(&i), 8))) {
+        ++count;
+      }
+    }
+
+    elapsed = timer.ElapsedNanos();
+    fprintf(stderr,
+            "blocked bloom(enable locality), avg query latency %" PRIu64 "\n",
+            elapsed / count);
+    ASSERT_TRUE(count == num_keys);
+  }
+}
+
+TEST_F(DynamicBloomTest, concurrent_with_perf) {
+  StopWatchNano timer(Env::Default());
+  uint32_t num_probes = static_cast<uint32_t>(FLAGS_num_probes);
+
+  uint32_t m_limit = FLAGS_enable_perf ? 8 : 1;
+  uint32_t locality_limit = FLAGS_enable_perf ? 1 : 0;
+
+  uint32_t num_threads = 4;
+  std::vector<std::thread> threads;
+
+  for (uint32_t m = 1; m <= m_limit; ++m) {
+    for (uint32_t locality = 0; locality <= locality_limit; ++locality) {
+      Arena arena;
+      const uint32_t num_keys = m * 8 * 1024 * 1024;
+      fprintf(stderr, "testing %" PRIu32 "M keys with %" PRIu32 " locality\n",
+              m * 8, locality);
+
+      DynamicBloom std_bloom(&arena, num_keys * 10, locality, num_probes);
+
       timer.Start();
-      for (uint32_t i = 1; i <= num_keys; ++i) {
-        blocked_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
+
+      auto adder = [&](size_t t) {
+        for (uint64_t i = 1 + t; i <= num_keys; i += num_threads) {
+          std_bloom.AddConcurrently(
+              Slice(reinterpret_cast<const char*>(&i), 8));
+        }
+      };
+      for (size_t t = 0; t < num_threads; ++t) {
+        // TSAN currently complains of a race between an allocation
+        // made by this thread and the eventual shutdown of the thread.
+        // It is a false positive.
+        threads.emplace_back(adder, t);
+      }
+      while (threads.size() > 0) {
+        threads.back().join();
+        threads.pop_back();
+      }
+
+      uint64_t elapsed = timer.ElapsedNanos();
+      fprintf(stderr, "standard bloom, avg parallel add latency %" PRIu64
+                      " nanos/key\n",
+              elapsed / num_keys);
+
+      timer.Start();
+
+      auto hitter = [&](size_t t) {
+        for (uint64_t i = 1 + t; i <= num_keys; i += num_threads) {
+          bool f =
+              std_bloom.MayContain(Slice(reinterpret_cast<const char*>(&i), 8));
+          ASSERT_TRUE(f);
+        }
+      };
+      for (size_t t = 0; t < num_threads; ++t) {
+        threads.emplace_back(hitter, t);
+      }
+      while (threads.size() > 0) {
+        threads.back().join();
+        threads.pop_back();
       }
 
       elapsed = timer.ElapsedNanos();
-      fprintf(stderr,
-              "blocked bloom(enable locality), avg add latency %" PRIu64 "\n",
+      fprintf(stderr, "standard bloom, avg parallel hit latency %" PRIu64
+                      " nanos/key\n",
              elapsed / num_keys);
 
-      count = 0;
       timer.Start();
-      for (uint32_t i = 1; i <= num_keys; ++i) {
-        if (blocked_bloom.MayContain(
-                Slice(reinterpret_cast<const char*>(&i), 8))) {
-          ++count;
+
+      std::atomic<uint32_t> false_positives(0);
+      auto misser = [&](size_t t) {
+        for (uint64_t i = num_keys + 1 + t; i <= 2 * num_keys;
+             i += num_threads) {
+          bool f =
+              std_bloom.MayContain(Slice(reinterpret_cast<const char*>(&i), 8));
+          if (f) {
+            ++false_positives;
+          }
         }
+      };
+      for (size_t t = 0; t < num_threads; ++t) {
+        threads.emplace_back(misser, t);
+      }
+      while (threads.size() > 0) {
+        threads.back().join();
+        threads.pop_back();
       }
 
       elapsed = timer.ElapsedNanos();
-      fprintf(stderr,
-              "blocked bloom(enable locality), avg query latency %" PRIu64 "\n",
-              elapsed / count);
-      ASSERT_TRUE(count == num_keys);
+      fprintf(stderr, "standard bloom, avg parallel miss latency %" PRIu64
+                      " nanos/key, %f%% false positive rate\n",
+              elapsed / num_keys, false_positives.load() * 100.0 / num_keys);
     }
+  }
 }
 
 }  // namespace rocksdb
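The miss phase of concurrent_with_perf prints an empirical false positive rate. As a sanity check, the classical Bloom filter estimate p = (1 - e^(-k/b))^k for b bits per key and k probes can be evaluated directly. This is a side calculation for reference, not part of the patch; it assumes the test's sizing of 10 bits per key and a num_probes value of 6.

```cpp
#include <cmath>
#include <cstdio>

// p = (1 - e^(-k/b))^k: probability that all k probed bits of an absent key
// happen to be set, for an ideal (non-blocked) filter with b bits per key.
double TheoreticalFalsePositiveRate(double bits_per_key, int num_probes) {
  double probe_bit_set = 1.0 - std::exp(-num_probes / bits_per_key);
  return std::pow(probe_bit_set, num_probes);
}

int main() {
  // With 10 bits/key and 6 probes this prints roughly 0.0084 (~0.8%).
  // The locality-enabled (blocked) variant typically gives up a little of
  // that accuracy in exchange for touching a single cache line per key.
  std::printf("%.4f\n", TheoreticalFalsePositiveRate(10.0, 6));
  return 0;
}
```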
diff --git a/util/mutexlock.h b/util/mutexlock.h
index 6121ec1ec..63a0f5ce1 100644
--- a/util/mutexlock.h
+++ b/util/mutexlock.h
@@ -8,6 +8,10 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #pragma once
+#include <assert.h>
+#include <atomic>
+#include <mutex>
+#include <thread>
 #include "port/port.h"
 
 namespace rocksdb {
@@ -75,4 +79,39 @@ class WriteLock {
   void operator=(const WriteLock&);
 };
 
+//
+// SpinMutex has very low overhead for low-contention cases. Method names
+// are chosen so you can use std::unique_lock or std::lock_guard with it.
+//
+class SpinMutex {
+ public:
+  SpinMutex() : locked_(false) {}
+
+  bool try_lock() {
+    auto currently_locked = locked_.load(std::memory_order_relaxed);
+    return !currently_locked &&
+           locked_.compare_exchange_weak(currently_locked, true,
+                                         std::memory_order_acquire,
+                                         std::memory_order_relaxed);
+  }
+
+  void lock() {
+    for (size_t tries = 0;; ++tries) {
+      if (try_lock()) {
+        // success
+        break;
+      }
+      port::AsmVolatilePause();
+      if (tries > 100) {
+        std::this_thread::yield();
+      }
+    }
+  }
+
+  void unlock() { locked_.store(false, std::memory_order_release); }
+
+ private:
+  std::atomic<bool> locked_;
+};
+
 }  // namespace rocksdb
diff --git a/util/options.cc b/util/options.cc
index 09ae10360..c925153fd 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -262,6 +262,10 @@ DBOptions::DBOptions()
       listeners(),
       enable_thread_tracking(false),
       delayed_write_rate(2 * 1024U * 1024U),
+      allow_concurrent_memtable_write(false),
+      enable_write_thread_adaptive_yield(false),
+      write_thread_max_yield_usec(100),
+      write_thread_slow_yield_usec(3),
       skip_stats_update_on_db_open(false),
       wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords),
       row_cache(nullptr),
@@ -325,6 +329,11 @@ DBOptions::DBOptions(const Options& options)
       listeners(options.listeners),
      enable_thread_tracking(options.enable_thread_tracking),
       delayed_write_rate(options.delayed_write_rate),
+      allow_concurrent_memtable_write(options.allow_concurrent_memtable_write),
+      enable_write_thread_adaptive_yield(
+          options.enable_write_thread_adaptive_yield),
+      write_thread_max_yield_usec(options.write_thread_max_yield_usec),
+      write_thread_slow_yield_usec(options.write_thread_slow_yield_usec),
       skip_stats_update_on_db_open(options.skip_stats_update_on_db_open),
       wal_recovery_mode(options.wal_recovery_mode),
       row_cache(options.row_cache),
@@ -435,6 +444,14 @@ void DBOptions::Dump(Logger* log) const {
         wal_recovery_mode);
     Header(log, "             Options.enable_thread_tracking: %d",
            enable_thread_tracking);
+    Header(log, "    Options.allow_concurrent_memtable_write: %d",
+           allow_concurrent_memtable_write);
+    Header(log, " Options.enable_write_thread_adaptive_yield: %d",
+           enable_write_thread_adaptive_yield);
+    Header(log, "        Options.write_thread_max_yield_usec: %" PRIu64,
+           write_thread_max_yield_usec);
+    Header(log, "       Options.write_thread_slow_yield_usec: %" PRIu64,
+           write_thread_slow_yield_usec);
     if (row_cache) {
       Header(log, "                          Options.row_cache: %" PRIu64,
              row_cache->GetCapacity());
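The four DBOptions fields added above are the application-facing switches for this feature. Below is a minimal sketch of opening a database with them set; the option names and the 100/3 microsecond defaults come from this diff, while the function name, path handling, and everything else in the snippet is illustrative.

```cpp
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Sketch: opt in to concurrent memtable writes plus the adaptive-yield
// write-thread behavior that pairs with it.
rocksdb::Status OpenWithConcurrentMemtableWrites(const std::string& path,
                                                 rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.allow_concurrent_memtable_write = true;     // default is false
  options.enable_write_thread_adaptive_yield = true;  // default is false
  options.write_thread_max_yield_usec = 100;          // default from the diff
  options.write_thread_slow_yield_usec = 3;           // default from the diff
  return rocksdb::DB::Open(options, path, db);
}
```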
diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc
index 83472c699..7108008a8 100644
--- a/util/skiplistrep.cc
+++ b/util/skiplistrep.cc
@@ -25,6 +25,8 @@ public:
         transform_(transform),
         lookahead_(lookahead) {
   }
 
+  virtual bool IsInsertConcurrentlySupported() const override { return true; }
+
   virtual KeyHandle Allocate(const size_t len, char** buf) override {
     *buf = skip_list_.AllocateKey(len);
     return static_cast<KeyHandle>(*buf);
@@ -36,6 +38,10 @@ public:
     skip_list_.Insert(static_cast<char*>(handle));
   }
 
+  virtual void InsertConcurrently(KeyHandle handle) override {
+    skip_list_.InsertConcurrently(static_cast<char*>(handle));
+  }
+
   // Returns true iff an entry that compares equal to key is in the list.
   virtual bool Contains(const char* key) const override {
     return skip_list_.Contains(key);
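One closing illustration: SpinMutex in util/mutexlock.h deliberately exposes lock()/try_lock()/unlock(), so it drops into std::lock_guard and std::unique_lock exactly as ConcurrentArena relies on. The stand-alone sketch below demonstrates that property; TinySpinMutex is written for this example and substitutes std::this_thread::yield() for port::AsmVolatilePause() so it builds outside the tree.

```cpp
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

// Minimal stand-in for rocksdb::SpinMutex: same interface, simplified backoff.
class TinySpinMutex {
 public:
  bool try_lock() {
    bool expected = false;
    return locked_.compare_exchange_weak(expected, true,
                                         std::memory_order_acquire,
                                         std::memory_order_relaxed);
  }
  void lock() {
    while (!try_lock()) {
      std::this_thread::yield();  // back off instead of pure spinning
    }
  }
  void unlock() { locked_.store(false, std::memory_order_release); }

 private:
  std::atomic<bool> locked_{false};
};

int main() {
  TinySpinMutex mu;
  long counter = 0;
  std::vector<std::thread> threads;
  for (int t = 0; t < 4; ++t) {
    threads.emplace_back([&] {
      for (int i = 0; i < 100000; ++i) {
        // Because lock()/unlock()/try_lock() follow the standard naming,
        // std::lock_guard works unchanged, which is the property the
        // SpinMutex comment in util/mutexlock.h calls out.
        std::lock_guard<TinySpinMutex> guard(mu);
        ++counter;
      }
    });
  }
  for (auto& th : threads) th.join();
  return counter == 400000 ? 0 : 1;
}
```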