From e9f91a51761652669ac4a012d339df1c203f7fac Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Thu, 29 Jun 2017 16:57:13 -0700 Subject: [PATCH] Add a fetch_add variation to AddDBStats Summary: AddDBStats is in two steps of load and store, which is more efficient than fetch_add. This is however not thread-safe. Currently we have to protect concurrent access to AddDBStats with a mutex which is less efficient than fetch_add. This patch adds the option to do fetch_add when AddDBStats. The results for my 2pc benchmark on sysbench are: - vanilla: 68618 tps - removing mutex on AddDBStats (unsafe): 69767 tps - fetch_add for all AddDBStats: 69200 tps - fetch_add only for concurrently accessed AddDBStats (this patch): 69579 tps Closes https://github.com/facebook/rocksdb/pull/2505 Differential Revision: D5330656 Pulled By: maysamyabandeh fbshipit-source-id: af64d7bee135b0e86b4fac323a4f9d9113eaa383 --- db/db_impl.h | 2 -- db/db_impl_write.cc | 38 ++++++++++++++++++-------------------- db/internal_stats.h | 11 ++++++++--- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index 285a7c861..5cd7d2cfb 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -843,8 +843,6 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; - // It is used to concurrently update stats in the write threads - InstrumentedMutex stat_mutex_; // It protects the back() of logs_ and alive_log_files_. Any push_back to // these must be under log_write_mutex_ and any access that requires the // back() to remain the same must also lock log_write_mutex_. 
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 86c29c34d..734ec841a 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -197,30 +197,27 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } } - if (concurrent_prepare_) { - stat_mutex_.Lock(); - } + const bool concurrent_update = concurrent_prepare_; // Update stats while we are an exclusive group leader, so we know // that nobody else can be writing to these particular stats. // We're optimistic, updating the stats before we successfully // commit. That lets us release our leader status early. auto stats = default_cf_internal_stats_; - stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); + stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count, + concurrent_update); RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); - stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); + stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size, + concurrent_update); RecordTick(stats_, BYTES_WRITTEN, total_byte_size); - stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); + stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update); RecordTick(stats_, WRITE_DONE_BY_SELF); auto write_done_by_other = write_group.size - 1; if (write_done_by_other > 0) { - stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, - write_done_by_other); + stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other, + concurrent_update); RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); } MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); - if (concurrent_prepare_) { - stat_mutex_.Unlock(); - } if (write_options.disableWAL) { has_unpersisted_data_.store(true, std::memory_order_relaxed); @@ -489,23 +486,24 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, } } - stat_mutex_.Lock(); + const bool concurrent_update = true; // Update stats while we are an exclusive group leader, so we know // that nobody else 
can be writing to these particular stats. // We're optimistic, updating the stats before we successfully // commit. That lets us release our leader status early. auto stats = default_cf_internal_stats_; - stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); + stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size, + concurrent_update); RecordTick(stats_, BYTES_WRITTEN, total_byte_size); - stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); + stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update); RecordTick(stats_, WRITE_DONE_BY_SELF); auto write_done_by_other = write_group.size - 1; if (write_done_by_other > 0) { - stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other); + stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other, + concurrent_update); RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); } MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); - stat_mutex_.Unlock(); PERF_TIMER_STOP(write_pre_and_post_process_time); @@ -783,13 +781,13 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, log_write_mutex_.Unlock(); if (status.ok()) { - stat_mutex_.Lock(); + const bool concurrent = true; auto stats = default_cf_internal_stats_; - stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size); + stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size, concurrent); RecordTick(stats_, WAL_FILE_BYTES, log_size); - stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal); + stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal, + concurrent); RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); - stat_mutex_.Unlock(); } return status; } diff --git a/db/internal_stats.h b/db/internal_stats.h index fcf598f2c..01cc651a6 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -272,10 +272,15 @@ class InternalStats { ++cf_stats_count_[type]; } - void AddDBStats(InternalDBStatsType type, uint64_t value) { + void 
AddDBStats(InternalDBStatsType type, uint64_t value, + bool concurrent = false) { auto& v = db_stats_[type]; - v.store(v.load(std::memory_order_relaxed) + value, - std::memory_order_relaxed); + if (concurrent) { + v.fetch_add(value, std::memory_order_relaxed); + } else { + v.store(v.load(std::memory_order_relaxed) + value, + std::memory_order_relaxed); + } } uint64_t GetDBStats(InternalDBStatsType type) {