Add a fetch_add variation to AddDBStats

Summary:
AddDBStats is implemented in two steps, a load followed by a store, which is more efficient than fetch_add. This is, however, not thread-safe. Currently we have to protect concurrent access to AddDBStats with a mutex, which is less efficient than fetch_add.

This patch adds the option to use fetch_add in AddDBStats. The results for my 2pc benchmark on sysbench are:
- vanilla: 68618 tps
- removing mutex on AddDBStats (unsafe): 69767 tps
- fetch_add for all AddDBStats: 69200 tps
- fetch_add only for concurrently accessed AddDBStats calls (this patch): 69579 tps
Closes https://github.com/facebook/rocksdb/pull/2505

Differential Revision: D5330656

Pulled By: maysamyabandeh

fbshipit-source-id: af64d7bee135b0e86b4fac323a4f9d9113eaa383
main
Maysam Yabandeh 8 years ago committed by Facebook Github Bot
parent c1b375e96a
commit e9f91a5176
  1. 2
      db/db_impl.h
  2. 38
      db/db_impl_write.cc
  3. 7
      db/internal_stats.h

@ -843,8 +843,6 @@ class DBImpl : public DB {
// Lock over the persistent DB state. Non-nullptr iff successfully acquired. // Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_; FileLock* db_lock_;
// It is used to concurrently update stats in the write threads
InstrumentedMutex stat_mutex_;
// It protects the back() of logs_ and alive_log_files_. Any push_back to // It protects the back() of logs_ and alive_log_files_. Any push_back to
// these must be under log_write_mutex_ and any access that requires the // these must be under log_write_mutex_ and any access that requires the
// back() to remain the same must also lock log_write_mutex_. // back() to remain the same must also lock log_write_mutex_.

@ -197,30 +197,27 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} }
} }
if (concurrent_prepare_) { const bool concurrent_update = concurrent_prepare_;
stat_mutex_.Lock();
}
// Update stats while we are an exclusive group leader, so we know // Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats. // that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully // We're optimistic, updating the stats before we successfully
// commit. That lets us release our leader status early. // commit. That lets us release our leader status early.
auto stats = default_cf_internal_stats_; auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count,
concurrent_update);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size,
concurrent_update);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size); RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_SELF); RecordTick(stats_, WRITE_DONE_BY_SELF);
auto write_done_by_other = write_group.size - 1; auto write_done_by_other = write_group.size - 1;
if (write_done_by_other > 0) { if (write_done_by_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
write_done_by_other); concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
} }
MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
if (concurrent_prepare_) {
stat_mutex_.Unlock();
}
if (write_options.disableWAL) { if (write_options.disableWAL) {
has_unpersisted_data_.store(true, std::memory_order_relaxed); has_unpersisted_data_.store(true, std::memory_order_relaxed);
@ -489,23 +486,24 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
} }
} }
stat_mutex_.Lock(); const bool concurrent_update = true;
// Update stats while we are an exclusive group leader, so we know // Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats. // that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully // We're optimistic, updating the stats before we successfully
// commit. That lets us release our leader status early. // commit. That lets us release our leader status early.
auto stats = default_cf_internal_stats_; auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size,
concurrent_update);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size); RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_SELF); RecordTick(stats_, WRITE_DONE_BY_SELF);
auto write_done_by_other = write_group.size - 1; auto write_done_by_other = write_group.size - 1;
if (write_done_by_other > 0) { if (write_done_by_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other); stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
} }
MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
stat_mutex_.Unlock();
PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_STOP(write_pre_and_post_process_time);
@ -783,13 +781,13 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
log_write_mutex_.Unlock(); log_write_mutex_.Unlock();
if (status.ok()) { if (status.ok()) {
stat_mutex_.Lock(); const bool concurrent = true;
auto stats = default_cf_internal_stats_; auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size); stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size, concurrent);
RecordTick(stats_, WAL_FILE_BYTES, log_size); RecordTick(stats_, WAL_FILE_BYTES, log_size);
stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal); stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal,
concurrent);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
stat_mutex_.Unlock();
} }
return status; return status;
} }

@ -272,11 +272,16 @@ class InternalStats {
++cf_stats_count_[type]; ++cf_stats_count_[type];
} }
void AddDBStats(InternalDBStatsType type, uint64_t value) { void AddDBStats(InternalDBStatsType type, uint64_t value,
bool concurrent = false) {
auto& v = db_stats_[type]; auto& v = db_stats_[type];
if (concurrent) {
v.fetch_add(value, std::memory_order_relaxed);
} else {
v.store(v.load(std::memory_order_relaxed) + value, v.store(v.load(std::memory_order_relaxed) + value,
std::memory_order_relaxed); std::memory_order_relaxed);
} }
}
uint64_t GetDBStats(InternalDBStatsType type) { uint64_t GetDBStats(InternalDBStatsType type) {
return db_stats_[type].load(std::memory_order_relaxed); return db_stats_[type].load(std::memory_order_relaxed);

Loading…
Cancel
Save