support for concurrent adds to memtable

Summary:
This diff adds support for concurrent adds to the skiplist memtable
implementations.  Memory allocation is made thread-safe by the addition of
a spinlock, with small per-core buffers to avoid contention.  Concurrent
memtable writes are made via an additional method and don't impose a
performance overhead on the non-concurrent case, so parallelism can be
selected on a per-batch basis.

Write thread synchronization is an increasing bottleneck for higher levels
of concurrency, so this diff adds --enable_write_thread_adaptive_yield
(default off).  This feature causes threads joining a write batch
group to spin for a short time (default 100 usec) using sched_yield,
rather than going to sleep on a mutex.  If the timing of the yield calls
indicates that another thread has actually run during the yield then
spinning is avoided.  This option improves performance for concurrent
situations even without parallel adds, although it has the potential to
increase CPU usage (and the heuristic adaptation is not yet mature).

Parallel writes are not currently compatible with
inplace updates, update callbacks, or delete filtering.
Enable it with --allow_concurrent_memtable_write (and
--enable_write_thread_adaptive_yield).  Parallel memtable writes
are performance neutral when there is no actual parallelism, and in
my experiments (SSD server-class Linux and varying contention and key
sizes for fillrandom) they are always a performance win when there is
more than one thread.

Statistics are updated earlier in the write path, dropping the number
of DB mutex acquisitions from 2 to 1 for almost all cases.

This diff was motivated and inspired by Yahoo's cLSM work.  It is more
conservative than cLSM: RocksDB's write batch group leader role is
preserved (along with all of the existing flush and write throttling
logic) and concurrent writers are blocked until all memtable insertions
have completed and the sequence number has been advanced, to preserve
linearizability.

My test config is "db_bench -benchmarks=fillrandom -threads=$T
-batch_size=1 -memtablerep=skip_list -value_size=100 --num=1000000/$T
-level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999
-disable_auto_compactions --max_write_buffer_number=8
-max_background_flushes=8 --disable_wal --write_buffer_size=160000000
--block_size=16384 --allow_concurrent_memtable_write" on a two-socket
Xeon E5-2660 @ 2.2Ghz with lots of memory and an SSD hard drive.  With 1
thread I get ~440Kops/sec.  Peak performance for 1 socket (numactl
-N1) is slightly more than 1Mops/sec, at 16 threads.  Peak performance
across both sockets happens at 30 threads, and is ~900Kops/sec, although
with fewer threads there is less performance loss when the system has
background work.

Test Plan:
1. concurrent stress tests for InlineSkipList and DynamicBloom
2. make clean; make check
3. make clean; DISABLE_JEMALLOC=1 make valgrind_check; valgrind db_bench
4. make clean; COMPILE_WITH_TSAN=1 make all check; db_bench
5. make clean; COMPILE_WITH_ASAN=1 make all check; db_bench
6. make clean; OPT=-DROCKSDB_LITE make check
7. verify no perf regressions when disabled

Reviewers: igor, sdong

Reviewed By: sdong

Subscribers: MarkCallaghan, IslamAbdelRahman, anthony, yhchiang, rven, sdong, guyg8, kradhakrishnan, dhruba

Differential Revision: https://reviews.facebook.net/D50589
main
Nathan Bronson 10 years ago
parent 5b2587b5cb
commit 7d87f02799
  1. 1
      CMakeLists.txt
  2. 21
      db/column_family.cc
  3. 49
      db/column_family.h
  4. 20
      db/db_bench.cc
  5. 348
      db/db_impl.cc
  6. 78
      db/flush_scheduler.cc
  7. 30
      db/flush_scheduler.h
  8. 130
      db/inlineskiplist.h
  9. 114
      db/inlineskiplist_test.cc
  10. 16
      db/internal_stats.cc
  11. 30
      db/internal_stats.h
  12. 103
      db/memtable.cc
  13. 72
      db/memtable.h
  14. 28
      db/memtable_allocator.cc
  15. 9
      db/memtable_allocator.h
  16. 2
      db/repair.cc
  17. 82
      db/write_batch.cc
  18. 21
      db/write_batch_internal.h
  19. 2
      db/write_batch_test.cc
  20. 298
      db/write_thread.cc
  21. 167
      db/write_thread.h
  22. 18
      db/writebuffer.h
  23. 38
      include/rocksdb/memtablerep.h
  24. 41
      include/rocksdb/options.h
  25. 2
      include/rocksdb/statistics.h
  26. 16
      port/port_posix.cc
  27. 19
      port/port_posix.h
  28. 2
      port/win/port_win.cc
  29. 9
      port/win/port_win.h
  30. 1
      src.mk
  31. 3
      table/table_test.cc
  32. 2
      util/arena.cc
  33. 3
      util/arena.h
  34. 49
      util/concurrent_arena.cc
  35. 192
      util/concurrent_arena.h
  36. 17
      util/dynamic_bloom.cc
  37. 51
      util/dynamic_bloom.h
  38. 161
      util/dynamic_bloom_test.cc
  39. 39
      util/mutexlock.h
  40. 17
      util/options.cc
  41. 6
      util/skiplistrep.cc

@ -191,6 +191,7 @@ set(SOURCES
util/coding.cc
util/compaction_job_stats_impl.cc
util/comparator.cc
util/concurrent_arena.cc
util/crc32c.cc
util/db_info_dumper.cc
util/delete_scheduler_impl.cc

@ -110,6 +110,20 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
return Status::OK();
}
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
if (cf_options.inplace_update_support) {
return Status::InvalidArgument(
"In-place memtable updates (inplace_update_support) is not compatible "
"with concurrent writes (allow_concurrent_memtable_write)");
}
if (cf_options.filter_deletes) {
return Status::InvalidArgument(
"Delete filtering (filter_deletes) is not compatible with concurrent "
"memtable writes (allow_concurrent_memtable_writes)");
}
return Status::OK();
}
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
const InternalKeyComparator* icmp,
const ColumnFamilyOptions& src) {
@ -916,13 +930,6 @@ ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
return &handle_;
}
void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
if (current_ != nullptr && current_->mem()->ShouldScheduleFlush()) {
flush_scheduler_->ScheduleFlush(current_);
current_->mem()->MarkFlushScheduled();
}
}
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {

@ -19,12 +19,10 @@
#include "db/write_controller.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/flush_scheduler.h"
#include "rocksdb/compaction_job_stats.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "util/instrumented_mutex.h"
#include "util/mutable_cf_options.h"
#include "util/thread_local.h"
@ -134,6 +132,9 @@ struct SuperVersion {
extern Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options);
extern Status CheckConcurrentWritesSupported(
const ColumnFamilyOptions& cf_options);
extern ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
const InternalKeyComparator* icmp,
const ColumnFamilyOptions& src);
@ -158,14 +159,16 @@ class ColumnFamilyData {
// thread-safe
const std::string& GetName() const { return name_; }
// Ref() can only be called whily holding a DB mutex or during a
// single-threaded write.
// Ref() can only be called from a context where the caller can guarantee
// that ColumnFamilyData is alive (while holding a non-zero ref already,
// holding a DB mutex, or as the leader in a write batch group).
void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
// will just decrease reference count to 0, but will not delete it. returns
// true if the ref count was decreased to zero. in that case, it can be
// deleted by the caller immediately, or later, by calling
// FreeDeadColumnFamilies()
// Unref() can only be called while holding a DB mutex
// Unref decreases the reference count, but does not handle deletion
// when the count goes to 0. If this method returns true then the
// caller should delete the instance immediately, or later, by calling
// FreeDeadColumnFamilies(). Unref() can only be called while holding
// a DB mutex, or during single-threaded recovery.
bool Unref() {
int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
assert(old_refs > 0);
@ -497,15 +500,18 @@ class ColumnFamilySet {
// memtables of different column families (specified by ID in the write batch)
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
public:
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set,
FlushScheduler* flush_scheduler)
: column_family_set_(column_family_set),
current_(nullptr),
flush_scheduler_(flush_scheduler) {}
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
: column_family_set_(column_family_set), current_(nullptr) {}
// Constructs a ColumnFamilyMemTablesImpl equivalent to one constructed
// with the arguments used to construct *orig.
explicit ColumnFamilyMemTablesImpl(ColumnFamilyMemTablesImpl* orig)
: column_family_set_(orig->column_family_set_), current_(nullptr) {}
// sets current_ to ColumnFamilyData with column_family_id
// returns false if column family doesn't exist
// REQUIRES: under a DB mutex OR from a write thread
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
// under a DB mutex OR from a write thread
bool Seek(uint32_t column_family_id) override;
// Returns log number of the selected column family
@ -513,20 +519,23 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
uint64_t GetLogNumber() const override;
// REQUIRES: Seek() called first
// REQUIRES: under a DB mutex OR from a write thread
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
// under a DB mutex OR from a write thread
virtual MemTable* GetMemTable() const override;
// Returns column family handle for the selected column family
// REQUIRES: under a DB mutex OR from a write thread
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
// under a DB mutex OR from a write thread
virtual ColumnFamilyHandle* GetColumnFamilyHandle() override;
// REQUIRES: under a DB mutex OR from a write thread
virtual void CheckMemtableFull() override;
// Cannot be called while another thread is calling Seek().
// REQUIRES: use this function of DBImpl::column_family_memtables_ should be
// under a DB mutex OR from a write thread
virtual ColumnFamilyData* current() { return current_; }
private:
ColumnFamilySet* column_family_set_;
ColumnFamilyData* current_;
FlushScheduler* flush_scheduler_;
ColumnFamilyHandleInternal handle_;
};

@ -646,6 +646,20 @@ DEFINE_uint64(delayed_write_rate, 8388608u,
"Limited bytes allowed to DB when soft_rate_limit or "
"level0_slowdown_writes_trigger triggers");
DEFINE_bool(allow_concurrent_memtable_write, false,
"Allow multi-writers to update mem tables in parallel.");
DEFINE_bool(enable_write_thread_adaptive_yield, false,
"Use a yielding spin loop for brief writer thread waits.");
DEFINE_uint64(
write_thread_max_yield_usec, 100,
"Maximum microseconds for enable_write_thread_adaptive_yield operation.");
DEFINE_uint64(write_thread_slow_yield_usec, 3,
"The threshold at which a slow yield is considered a signal that "
"other processes or threads want the core.");
DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
"When hard_rate_limit is set then this is the max time a put will"
" be stalled.");
@ -2552,6 +2566,12 @@ class Benchmark {
options.hard_pending_compaction_bytes_limit =
FLAGS_hard_pending_compaction_bytes_limit;
options.delayed_write_rate = FLAGS_delayed_write_rate;
options.allow_concurrent_memtable_write =
FLAGS_allow_concurrent_memtable_write;
options.enable_write_thread_adaptive_yield =
FLAGS_enable_write_thread_adaptive_yield;
options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
options.rate_limit_delay_max_milliseconds =
FLAGS_rate_limit_delay_max_milliseconds;
options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;

@ -247,6 +247,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
max_total_in_memory_state_(0),
is_snapshot_supported_(true),
write_buffer_(options.db_write_buffer_size),
write_thread_(options.enable_write_thread_adaptive_yield
? options.write_thread_max_yield_usec
: 0,
options.write_thread_slow_yield_usec),
write_controller_(options.delayed_write_rate),
last_batch_group_size_(0),
unscheduled_flushes_(0),
@ -282,8 +286,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_,
&write_controller_));
column_family_memtables_.reset(new ColumnFamilyMemTablesImpl(
versions_->GetColumnFamilySet(), &flush_scheduler_));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
DumpRocksDBBuildVersion(db_options_.info_log.get());
DumpDBFileSummary(db_options_, dbname_);
@ -1235,8 +1239,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// insert. We don't want to fail the whole write batch in that case --
// we just ignore the update.
// That's why we set ignore missing column families to true
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), true, log_number);
status =
WriteBatchInternal::InsertInto(&batch, column_family_memtables_.get(),
&flush_scheduler_, true, log_number);
MaybeIgnoreError(&status);
if (!status.ok()) {
@ -1257,7 +1262,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// DB and there is only a single thread operating on DB
ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
cfd->Unref();
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
@ -3623,6 +3628,9 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
*handle = nullptr;
s = CheckCompressionSupported(cf_options);
if (s.ok() && db_options_.allow_concurrent_memtable_write) {
s = CheckConcurrentWritesSupported(cf_options);
}
if (!s.ok()) {
return s;
}
@ -4071,7 +4079,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
w.sync = write_options.sync;
w.disableWAL = write_options.disableWAL;
w.in_batch_group = false;
w.done = false;
w.has_callback = (callback != nullptr) ? true : false;
if (!write_options.disableWAL) {
@ -4081,12 +4088,35 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
StopWatch write_sw(env_, db_options_.statistics.get(), DB_WRITE);
write_thread_.JoinBatchGroup(&w);
if (w.done) {
// write was done by someone else, no need to grab mutex
if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) {
// we are a non-leader in a parallel group
PERF_TIMER_GUARD(write_memtable_time);
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
WriteBatchInternal::SetSequence(w.batch, w.sequence);
w.status = WriteBatchInternal::InsertInto(
w.batch, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/);
if (write_thread_.CompleteParallelWorker(&w)) {
// we're responsible for early exit
auto last_sequence = w.parallel_group->last_writer->sequence;
SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
versions_->SetLastSequence(last_sequence);
write_thread_.EarlyExitParallelGroup(&w);
}
assert(w.state == WriteThread::STATE_COMPLETED);
// STATE_COMPLETED conditional below handles exit
}
if (w.state == WriteThread::STATE_COMPLETED) {
// write is complete and leader has updated sequence
RecordTick(stats_, WRITE_DONE_BY_OTHER);
return w.status;
}
// else we are the leader of the write batch group
assert(w.state == WriteThread::STATE_GROUP_LEADER);
WriteContext context;
mutex_.Lock();
@ -4108,9 +4138,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0)
? 4 * max_total_in_memory_state_
: db_options_.max_total_wal_size;
if (UNLIKELY(!single_column_family_mode_) &&
alive_log_files_.begin()->getting_flushed == false &&
total_log_size_ > max_total_wal_size) {
if (UNLIKELY(!single_column_family_mode_ &&
alive_log_files_.begin()->getting_flushed == false &&
total_log_size_ > max_total_wal_size)) {
uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
alive_log_files_.begin()->getting_flushed = true;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
@ -4175,8 +4205,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
status = ScheduleFlushes(&context);
}
if (UNLIKELY(status.ok()) &&
(write_controller_.IsStopped() || write_controller_.NeedsDelay())) {
if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
write_controller_.NeedsDelay()))) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_delay_time);
// We don't know size of curent batch so that we always use the size
@ -4194,9 +4224,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
if (status.ok()) {
last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(
&w, &last_writer, &write_batch_group);
if (need_log_sync) {
while (logs_.front().getting_synced) {
log_sync_cv_.Wait();
@ -4226,154 +4253,204 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// At this point the mutex is unlocked
bool exit_completed_early = false;
last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(
&w, &last_writer, &write_batch_group);
if (status.ok()) {
int total_count = 0;
uint64_t total_byte_size = 0;
for (auto b : write_batch_group) {
total_count += WriteBatchInternal::Count(b);
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(b));
}
// Rules for when we can update the memtable concurrently
// 1. supported by memtable
// 2. Puts are not okay if inplace_update_support
// 3. Deletes or SingleDeletes are not okay if filtering deletes
// (controlled by both batch and memtable setting)
// 4. Merges are not okay
//
// Rules 1..3 are enforced by checking the options
// during startup (CheckConcurrentWritesSupported), so if
// options.allow_concurrent_memtable_write is true then they can be
// assumed to be true. Rule 4 is checked for each batch. We could
// relax rules 2 and 3 if we could prevent write batches from referring
// more than once to a particular key.
bool parallel = db_options_.allow_concurrent_memtable_write &&
write_batch_group.size() > 1;
int total_count = 0;
uint64_t total_byte_size = 0;
for (auto b : write_batch_group) {
total_count += WriteBatchInternal::Count(b);
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(b));
parallel = parallel && !b->HasMerge();
}
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += total_count;
// Record statistics
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
PERF_TIMER_STOP(write_pre_and_post_process_time);
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += total_count;
if (write_options.disableWAL) {
flush_on_destroy_ = true;
}
// Record statistics
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
PERF_TIMER_STOP(write_pre_and_post_process_time);
uint64_t log_size = 0;
if (!write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
if (write_options.disableWAL) {
flush_on_destroy_ = true;
WriteBatch* merged_batch = nullptr;
if (write_batch_group.size() == 1) {
merged_batch = write_batch_group[0];
} else {
// WAL needs all of the batches flattened into a single batch.
// We could avoid copying here with an iov-like AddRecord
// interface
merged_batch = &tmp_batch_;
for (auto b : write_batch_group) {
WriteBatchInternal::Append(merged_batch, b);
}
}
WriteBatchInternal::SetSequence(merged_batch, current_sequence);
assert(WriteBatchInternal::Count(merged_batch) == total_count);
assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size);
Slice log_entry = WriteBatchInternal::Contents(merged_batch);
status = logs_.back().writer->AddRecord(log_entry);
total_log_size_ += log_entry.size();
alive_log_files_.back().AddSize(log_entry.size());
log_empty_ = false;
log_size = log_entry.size();
RecordTick(stats_, WAL_FILE_BYTES, log_size);
if (status.ok() && need_log_sync) {
RecordTick(stats_, WAL_FILE_SYNCED);
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
// It's safe to access logs_ with unlocked mutex_ here because:
// - we've set getting_synced=true for all logs,
// so other threads won't pop from logs_ while we're here,
// - only writer thread can push to logs_, and we're in
// writer thread, so no one will push to logs_,
// - as long as other threads don't modify it, it's safe to read
// from std::deque from multiple threads concurrently.
for (auto& log : logs_) {
status = log.writer->file()->Sync(db_options_.use_fsync);
if (!status.ok()) {
break;
}
}
if (status.ok() && need_log_dir_sync) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
// we can avoid the disk I/O in the write code path.
status = directories_.GetWalDir()->Fsync();
}
}
uint64_t log_size = 0;
if (!write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
if (merged_batch == &tmp_batch_) {
tmp_batch_.Clear();
}
}
if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);
WriteBatch* merged_batch = nullptr;
if (write_batch_group.size() == 1) {
merged_batch = write_batch_group[0];
} else {
// WAL needs all of the batches flattened into a single batch.
// We could avoid copying here with an iov-like AddRecord
// interface
merged_batch = &tmp_batch_;
for (auto b : write_batch_group) {
WriteBatchInternal::Append(merged_batch, b);
{
// Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully
// commit. That lets us release our leader status early in
// some cases.
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
if (!write_options.disableWAL) {
if (write_options.sync) {
stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
}
stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
}
WriteBatchInternal::SetSequence(merged_batch, current_sequence);
assert(WriteBatchInternal::Count(merged_batch) == total_count);
assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size);
Slice log_entry = WriteBatchInternal::Contents(merged_batch);
status = logs_.back().writer->AddRecord(log_entry);
total_log_size_ += log_entry.size();
alive_log_files_.back().AddSize(log_entry.size());
log_empty_ = false;
log_size = log_entry.size();
RecordTick(stats_, WAL_FILE_BYTES, log_size);
if (status.ok() && need_log_sync) {
RecordTick(stats_, WAL_FILE_SYNCED);
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
// It's safe to access logs_ with unlocked mutex_ here because:
// - we've set getting_synced=true for all logs,
// so other threads won't pop from logs_ while we're here,
// - only writer thread can push to logs_, and we're in
// writer thread, so no one will push to logs_,
// - as long as other threads don't modify it, it's safe to read
// from std::deque from multiple threads concurrently.
for (auto& log : logs_) {
status = log.writer->file()->Sync(db_options_.use_fsync);
if (!status.ok()) {
break;
}
uint64_t for_other = write_batch_group.size() - 1;
if (for_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, for_other);
if (!write_options.disableWAL) {
stats->AddDBStats(InternalStats::WRITE_WITH_WAL, for_other);
}
if (status.ok() && need_log_dir_sync) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
// we can avoid the disk I/O in the write code path.
status = directories_.GetWalDir()->Fsync();
}
}
if (merged_batch == &tmp_batch_) {
tmp_batch_.Clear();
}
}
if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);
if (!parallel) {
status = WriteBatchInternal::InsertInto(
write_batch_group, current_sequence, column_family_memtables_.get(),
write_options.ignore_missing_column_families,
/*log_number*/ 0, this, /*dont_filter_deletes*/ false);
// A non-OK status here indicates that the state implied by the
// WAL has diverged from the in-memory state. This could be
// because of a corrupt write_batch (very bad), or because the
// client specified an invalid column family and didn't specify
// ignore_missing_column_families.
//
// Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes.
if (!status.ok() && bg_error_.ok()) {
bg_error_ = status;
}
SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this, false /*dont_filter_deletes*/);
} else {
WriteThread::ParallelGroup pg{};
pg.leader = &w;
pg.last_writer = last_writer;
pg.early_exit_allowed = !need_log_sync;
pg.running.store(write_batch_group.size(), std::memory_order_relaxed);
write_thread_.LaunchParallelFollowers(&pg, current_sequence);
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
assert(w.sequence == current_sequence);
WriteBatchInternal::SetSequence(w.batch, w.sequence);
w.status = WriteBatchInternal::InsertInto(
w.batch, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*dont_filter_deletes*/,
true /*concurrent_memtable_writes*/);
assert(last_writer->sequence == last_sequence);
// CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did
exit_completed_early = !write_thread_.CompleteParallelWorker(&w);
status = w.status;
assert(status.ok() || !exit_completed_early);
}
PERF_TIMER_START(write_pre_and_post_process_time);
mutex_.Lock();
// internal stats
default_cf_internal_stats_->AddDBStats(InternalStats::BYTES_WRITTEN,
total_byte_size);
default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN,
total_count);
if (!write_options.disableWAL) {
if (write_options.sync) {
default_cf_internal_stats_->AddDBStats(InternalStats::WAL_FILE_SYNCED,
1);
if (status.ok() && !exit_completed_early) {
SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
versions_->SetLastSequence(last_sequence);
if (!need_log_sync) {
write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status);
exit_completed_early = true;
}
default_cf_internal_stats_->AddDBStats(
InternalStats::WAL_FILE_BYTES, log_size);
}
if (status.ok()) {
versions_->SetLastSequence(last_sequence);
// A non-OK status here indicates that the state implied by the
// WAL has diverged from the in-memory state. This could be
// because of a corrupt write_batch (very bad), or because the
// client specified an invalid column family and didn't specify
// ignore_missing_column_families.
//
// Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes.
if (!status.ok() && bg_error_.ok()) {
bg_error_ = status;
}
} else {
// Operation failed. Make sure sure mutex is held for cleanup code below.
mutex_.Lock();
}
}
PERF_TIMER_START(write_pre_and_post_process_time);
if (db_options_.paranoid_checks && !status.ok() && !callback_failed &&
!status.IsBusy() && bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes
!status.IsBusy()) {
mutex_.Lock();
if (bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes
}
mutex_.Unlock();
}
mutex_.AssertHeld();
if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
mutex_.Unlock();
}
uint64_t writes_for_other = write_batch_group.size() - 1;
if (writes_for_other > 0) {
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
writes_for_other);
if (!write_options.disableWAL) {
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL,
writes_for_other);
}
if (!exit_completed_early) {
write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status);
}
mutex_.Unlock();
write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status);
return status;
}
@ -4411,7 +4488,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes) {
Status DBImpl::ScheduleFlushes(WriteContext* context) {
ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
auto status = SwitchMemtable(cfd, context);
if (cfd->Unref()) {
delete cfd;
@ -5084,6 +5161,9 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
for (auto& cfd : column_families) {
s = CheckCompressionSupported(cfd.options);
if (s.ok() && db_options.allow_concurrent_memtable_write) {
s = CheckConcurrentWritesSupported(cfd.options);
}
if (!s.ok()) {
return s;
}

@ -13,51 +13,69 @@ namespace rocksdb {
void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
#ifndef NDEBUG
assert(column_families_set_.find(cfd) == column_families_set_.end());
column_families_set_.insert(cfd);
{
std::lock_guard<std::mutex> lock(checking_mutex_);
assert(checking_set_.count(cfd) == 0);
checking_set_.insert(cfd);
}
#endif // NDEBUG
cfd->Ref();
column_families_.push_back(cfd);
Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
while (!head_.compare_exchange_strong(
node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
// failing CAS updates the first param, so we are already set for
// retry. TakeNextColumnFamily won't happen until after another
// inter-thread synchronization, so we don't even need release
// semantics for this CAS
}
}
ColumnFamilyData* FlushScheduler::GetNextColumnFamily() {
ColumnFamilyData* cfd = nullptr;
while (column_families_.size() > 0) {
cfd = column_families_.front();
column_families_.pop_front();
if (cfd->IsDropped()) {
if (cfd->Unref()) {
delete cfd;
cfd = nullptr;
}
} else {
break;
ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
while (true) {
if (Empty()) {
return nullptr;
}
}
// dequeue the head
Node* node = head_.load(std::memory_order_relaxed);
head_.store(node->next, std::memory_order_relaxed);
ColumnFamilyData* cfd = node->column_family;
delete node;
#ifndef NDEBUG
if (cfd != nullptr) {
auto itr = column_families_set_.find(cfd);
assert(itr != column_families_set_.end());
column_families_set_.erase(itr);
}
{
auto iter = checking_set_.find(cfd);
assert(iter != checking_set_.end());
checking_set_.erase(iter);
}
#endif // NDEBUG
return cfd;
if (!cfd->IsDropped()) {
// success
return cfd;
}
// no longer relevant, retry
if (cfd->Unref()) {
delete cfd;
}
}
}
bool FlushScheduler::Empty() { return column_families_.empty(); }
bool FlushScheduler::Empty() {
auto rv = head_.load(std::memory_order_relaxed) == nullptr;
assert(rv == checking_set_.empty());
return rv;
}
void FlushScheduler::Clear() {
for (auto cfd : column_families_) {
#ifndef NDEBUG
auto itr = column_families_set_.find(cfd);
assert(itr != column_families_set_.end());
column_families_set_.erase(itr);
#endif // NDEBUG
ColumnFamilyData* cfd;
while ((cfd = TakeNextColumnFamily()) != nullptr) {
if (cfd->Unref()) {
delete cfd;
}
}
column_families_.clear();
assert(Empty());
}
} // namespace rocksdb

@ -6,34 +6,42 @@
#pragma once
#include <stdint.h>
#include <deque>
#include <atomic>
#include <mutex>
#include <set>
#include <vector>
namespace rocksdb {
class ColumnFamilyData;
// This class is thread-compatible. It's should only be accessed from single
// write thread (between BeginWrite() and EndWrite())
// Unless otherwise noted, all methods on FlushScheduler should be called
// only with the DB mutex held or from a single-threaded recovery context.
class FlushScheduler {
public:
FlushScheduler() = default;
~FlushScheduler() = default;
FlushScheduler() : head_(nullptr) {}
// May be called from multiple threads at once, but not concurrent with
// any other method calls on this instance
void ScheduleFlush(ColumnFamilyData* cfd);
// Returns Ref()-ed column family. Client needs to Unref()
// REQUIRES: db mutex is held (exception is single-threaded recovery)
ColumnFamilyData* GetNextColumnFamily();
// Removes and returns Ref()-ed column family. Client needs to Unref().
// Filters column families that have been dropped.
ColumnFamilyData* TakeNextColumnFamily();
bool Empty();
void Clear();
private:
std::deque<ColumnFamilyData*> column_families_;
struct Node {
ColumnFamilyData* column_family;
Node* next;
};
std::atomic<Node*> head_;
#ifndef NDEBUG
std::set<ColumnFamilyData*> column_families_set_;
std::mutex checking_mutex_;
std::set<ColumnFamilyData*> checking_set_;
#endif // NDEBUG
};

@ -20,10 +20,12 @@
//
// Thread safety -------------
//
// Writes require external synchronization, most likely a mutex. Reads
// require a guarantee that the InlineSkipList will not be destroyed while
// the read is in progress. Apart from that, reads progress without any
// internal locking or synchronization.
// Writes via Insert require external synchronization, most likely a mutex.
// InsertConcurrently can be safely called concurrently with reads and
// with other concurrent inserts. Reads require a guarantee that the
// InlineSkipList will not be destroyed while the read is in progress.
// Apart from that, reads progress without any internal locking or
// synchronization.
//
// Invariants:
//
@ -63,16 +65,21 @@ class InlineSkipList {
int32_t max_height = 12,
int32_t branching_factor = 4);
// Allocates a key and a skip-list node, returning a pointer to the
// key portion of the node.
// Allocates a key and a skip-list node, returning a pointer to the key
// portion of the node. This method is thread-safe if the allocator
// is thread-safe.
char* AllocateKey(size_t key_size);
// Inserts a key allocated by AllocateKey, after the actual key value
// has been filled in.
//
// REQUIRES: nothing that compares equal to key is currently in the list.
// REQUIRES: no concurrent calls to INSERT
void Insert(const char* key);
// Like Insert, but external synchronization is not required.
void InsertConcurrently(const char* key);
// Returns true iff an entry that compares equal to key is in the list.
bool Contains(const char* key) const;
@ -124,6 +131,8 @@ class InlineSkipList {
};
private:
enum MaxPossibleHeightEnum : uint16_t { kMaxPossibleHeight = 32 };
const uint16_t kMaxHeight_;
const uint16_t kBranching_;
const uint32_t kScaledInverseBranching_;
@ -139,11 +148,11 @@ class InlineSkipList {
std::atomic<int> max_height_; // Height of the entire list
// Used for optimizing sequential insert patterns. Tricky. prev_[i] for
// i up to max_height_ is the predecessor of prev_[0] and prev_height_
// is the height of prev_[0]. prev_[0] can only be equal to head before
// insertion, in which case max_height_ and prev_height_ are 1.
// i up to max_height_ - 1 (inclusive) is the predecessor of prev_[0].
// prev_height_ is the height of prev_[0]. prev_[0] can only be equal
// to head when max_height_ and prev_height_ are both 1.
Node** prev_;
int32_t prev_height_;
std::atomic<int32_t> prev_height_;
inline int GetMaxHeight() const {
return max_height_.load(std::memory_order_relaxed);
@ -175,6 +184,15 @@ class InlineSkipList {
// Return head_ if list is empty.
Node* FindLast() const;
// Traverses a single level of the list, setting *out_prev to the last
// node before the key and *out_next to the first node after. Assumes
// that the key is not present in the skip list. On entry, before should
// point to a node that is before the key, and after should point to
// a node that is after the key. after should be nullptr if a good after
// node isn't conveniently available.
void FindLevelSplice(const char* key, Node* before, Node* after, int level,
Node** out_prev, Node** out_next);
// No copying allowed
InlineSkipList(const InlineSkipList&);
InlineSkipList& operator=(const InlineSkipList&);
@ -223,6 +241,11 @@ struct InlineSkipList<Comparator>::Node {
next_[-n].store(x, std::memory_order_release);
}
bool CASNext(int n, Node* expected, Node* x) {
assert(n >= 0);
return next_[-n].compare_exchange_strong(expected, x);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
@ -305,11 +328,13 @@ int InlineSkipList<Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
int height = 1;
while (height < kMaxHeight_ && rnd->Next() < kScaledInverseBranching_) {
while (height < kMaxHeight_ && height < kMaxPossibleHeight &&
rnd->Next() < kScaledInverseBranching_) {
height++;
}
assert(height > 0);
assert(height <= kMaxHeight_);
assert(height <= kMaxPossibleHeight);
return height;
}
@ -440,7 +465,7 @@ InlineSkipList<Comparator>::InlineSkipList(const Comparator cmp,
max_height_(1),
prev_height_(1) {
assert(max_height > 0 && kMaxHeight_ == static_cast<uint32_t>(max_height));
assert(branching_factor > 0 &&
assert(branching_factor > 1 &&
kBranching_ == static_cast<uint32_t>(branching_factor));
assert(kScaledInverseBranching_ > 0);
// Allocate the prev_ Node* array, directly from the passed-in allocator.
@ -485,16 +510,23 @@ InlineSkipList<Comparator>::AllocateNode(size_t key_size, int height) {
template <class Comparator>
void InlineSkipList<Comparator>::Insert(const char* key) {
// InsertConcurrently can't maintain the prev_ invariants when it needs
// to increase max_height_. In that case it sets prev_height_ to zero,
// letting us know that we should ignore it. A relaxed load suffices
// here because write thread synchronization separates Insert calls
// from InsertConcurrently calls.
auto prev_height = prev_height_.load(std::memory_order_relaxed);
// fast path for sequential insertion
if (!KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) &&
if (prev_height > 0 && !KeyIsAfterNode(key, prev_[0]->NoBarrier_Next(0)) &&
(prev_[0] == head_ || KeyIsAfterNode(key, prev_[0]))) {
assert(prev_[0] != head_ || (prev_height_ == 1 && GetMaxHeight() == 1));
assert(prev_[0] != head_ || (prev_height == 1 && GetMaxHeight() == 1));
// Outside of this method prev_[1..max_height_] is the predecessor
// of prev_[0], and prev_height_ refers to prev_[0]. Inside Insert
// prev_[0..max_height - 1] is the predecessor of key. Switch from
// the external state to the internal
for (int i = 1; i < prev_height_; i++) {
for (int i = 1; i < prev_height; i++) {
prev_[i] = prev_[0];
}
} else {
@ -534,7 +566,73 @@ void InlineSkipList<Comparator>::Insert(const char* key) {
prev_[i]->SetNext(i, x);
}
prev_[0] = x;
prev_height_ = height;
prev_height_.store(height, std::memory_order_relaxed);
}
template <class Comparator>
void InlineSkipList<Comparator>::FindLevelSplice(const char* key, Node* before,
Node* after, int level,
Node** out_prev,
Node** out_next) {
while (true) {
Node* next = before->Next(level);
assert(before == head_ || next == nullptr ||
KeyIsAfterNode(next->Key(), before));
assert(before == head_ || KeyIsAfterNode(key, before));
if (next == after || !KeyIsAfterNode(key, next)) {
// found it
*out_prev = before;
*out_next = next;
return;
}
before = next;
}
}
template <class Comparator>
void InlineSkipList<Comparator>::InsertConcurrently(const char* key) {
Node* x = reinterpret_cast<Node*>(const_cast<char*>(key)) - 1;
int height = x->UnstashHeight();
assert(height >= 1 && height <= kMaxHeight_);
int max_height = max_height_.load(std::memory_order_relaxed);
while (height > max_height) {
if (max_height_.compare_exchange_strong(max_height, height)) {
// successfully updated it
max_height = height;
// we dont have a lock-free algorithm for fixing up prev_, so just
// mark it invalid
prev_height_.store(0, std::memory_order_relaxed);
break;
}
// else retry, possibly exiting the loop because somebody else
// increased it
}
assert(max_height <= kMaxPossibleHeight);
Node* prev[kMaxPossibleHeight + 1];
Node* next[kMaxPossibleHeight + 1];
prev[max_height] = head_;
next[max_height] = nullptr;
for (int i = max_height - 1; i >= 0; --i) {
FindLevelSplice(key, prev[i + 1], next[i + 1], i, &prev[i], &next[i]);
}
for (int i = 0; i < height; ++i) {
while (true) {
x->NoBarrier_SetNext(i, next[i]);
if (prev[i]->CASNext(i, next[i], x)) {
// success
break;
}
// CAS failed, we need to recompute prev and next. It is unlikely
// to be helpful to try to use a different level as we redo the
// search, because it should be unlikely that lots of nodes have
// been inserted between prev[i] and next[i]. No point in using
// next[i] as the after hint, because we know it is stale.
FindLevelSplice(key, prev[i], nullptr, i, &prev[i], &next[i]);
}
}
}
template <class Comparator>

@ -10,7 +10,7 @@
#include "db/inlineskiplist.h"
#include <set>
#include "rocksdb/env.h"
#include "util/arena.h"
#include "util/concurrent_arena.h"
#include "util/hash.h"
#include "util/random.h"
#include "util/testharness.h"
@ -67,7 +67,7 @@ TEST_F(InlineSkipTest, InsertAndLookup) {
const int R = 5000;
Random rnd(1000);
std::set<Key> keys;
Arena arena;
ConcurrentArena arena;
TestComparator cmp;
InlineSkipList<TestComparator> list(cmp, &arena);
for (int i = 0; i < N; i++) {
@ -167,9 +167,10 @@ TEST_F(InlineSkipTest, InsertAndLookup) {
// check that it is either expected given the initial snapshot or has
// been concurrently added since the iterator started.
class ConcurrentTest {
private:
static const uint32_t K = 4;
public:
static const uint32_t K = 8;
private:
static uint64_t key(Key key) { return (key >> 40); }
static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
static uint64_t hash(Key key) { return key & 0xff; }
@ -222,7 +223,7 @@ class ConcurrentTest {
// Current state of the test
State current_;
Arena arena_;
ConcurrentArena arena_;
// InlineSkipList is not protected by mu_. We just use a single writer
// thread to modify it.
@ -231,7 +232,7 @@ class ConcurrentTest {
public:
ConcurrentTest() : list_(TestComparator(), &arena_) {}
// REQUIRES: External synchronization
// REQUIRES: No concurrent calls to WriteStep or ConcurrentWriteStep
void WriteStep(Random* rnd) {
const uint32_t k = rnd->Next() % K;
const int g = current_.Get(k) + 1;
@ -242,6 +243,17 @@ class ConcurrentTest {
current_.Set(k, g);
}
// REQUIRES: No concurrent calls for the same k
void ConcurrentWriteStep(uint32_t k) {
const int g = current_.Get(k) + 1;
const Key new_key = MakeKey(k, g);
char* buf = list_.AllocateKey(sizeof(Key));
memcpy(buf, &new_key, sizeof(Key));
list_.InsertConcurrently(buf);
ASSERT_EQ(g, current_.Get(k) + 1);
current_.Set(k, g);
}
void ReadStep(Random* rnd) {
// Remember the initial committed state of the skiplist.
State initial_state;
@ -304,7 +316,7 @@ const uint32_t ConcurrentTest::K;
// Simple test that does single-threaded testing of the ConcurrentTest
// scaffolding.
TEST_F(InlineSkipTest, ConcurrentWithoutThreads) {
TEST_F(InlineSkipTest, ConcurrentReadWithoutThreads) {
ConcurrentTest test;
Random rnd(test::RandomSeed());
for (int i = 0; i < 10000; i++) {
@ -313,16 +325,33 @@ TEST_F(InlineSkipTest, ConcurrentWithoutThreads) {
}
}
TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) {
ConcurrentTest test;
Random rnd(test::RandomSeed());
for (int i = 0; i < 10000; i++) {
test.ReadStep(&rnd);
uint32_t base = rnd.Next();
for (int j = 0; j < 4; ++j) {
test.ConcurrentWriteStep((base + j) % ConcurrentTest::K);
}
}
}
class TestState {
public:
ConcurrentTest t_;
int seed_;
std::atomic<bool> quit_flag_;
std::atomic<uint32_t> next_writer_;
enum ReaderState { STARTING, RUNNING, DONE };
explicit TestState(int s)
: seed_(s), quit_flag_(false), state_(STARTING), state_cv_(&mu_) {}
: seed_(s),
quit_flag_(false),
state_(STARTING),
pending_writers_(0),
state_cv_(&mu_) {}
void Wait(ReaderState s) {
mu_.Lock();
@ -339,9 +368,27 @@ class TestState {
mu_.Unlock();
}
void AdjustPendingWriters(int delta) {
mu_.Lock();
pending_writers_ += delta;
if (pending_writers_ == 0) {
state_cv_.Signal();
}
mu_.Unlock();
}
void WaitForPendingWriters() {
mu_.Lock();
while (pending_writers_ != 0) {
state_cv_.Wait();
}
mu_.Unlock();
}
private:
port::Mutex mu_;
ReaderState state_;
int pending_writers_;
port::CondVar state_cv_;
};
@ -357,7 +404,14 @@ static void ConcurrentReader(void* arg) {
state->Change(TestState::DONE);
}
static void RunConcurrent(int run) {
static void ConcurrentWriter(void* arg) {
TestState* state = reinterpret_cast<TestState*>(arg);
uint32_t k = state->next_writer_++ % ConcurrentTest::K;
state->t_.ConcurrentWriteStep(k);
state->AdjustPendingWriters(-1);
}
static void RunConcurrentRead(int run) {
const int seed = test::RandomSeed() + (run * 100);
Random rnd(seed);
const int N = 1000;
@ -369,7 +423,7 @@ static void RunConcurrent(int run) {
TestState state(seed + 1);
Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; k++) {
for (int k = 0; k < kSize; ++k) {
state.t_.WriteStep(&rnd);
}
state.quit_flag_.store(true, std::memory_order_release);
@ -377,11 +431,41 @@ static void RunConcurrent(int run) {
}
}
TEST_F(InlineSkipTest, Concurrent1) { RunConcurrent(1); }
TEST_F(InlineSkipTest, Concurrent2) { RunConcurrent(2); }
TEST_F(InlineSkipTest, Concurrent3) { RunConcurrent(3); }
TEST_F(InlineSkipTest, Concurrent4) { RunConcurrent(4); }
TEST_F(InlineSkipTest, Concurrent5) { RunConcurrent(5); }
static void RunConcurrentInsert(int run, int write_parallelism = 4) {
Env::Default()->SetBackgroundThreads(1 + write_parallelism,
Env::Priority::LOW);
const int seed = test::RandomSeed() + (run * 100);
Random rnd(seed);
const int N = 1000;
const int kSize = 1000;
for (int i = 0; i < N; i++) {
if ((i % 100) == 0) {
fprintf(stderr, "Run %d of %d\n", i, N);
}
TestState state(seed + 1);
Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; k += write_parallelism) {
state.next_writer_ = rnd.Next();
state.AdjustPendingWriters(write_parallelism);
for (int p = 0; p < write_parallelism; ++p) {
Env::Default()->Schedule(ConcurrentWriter, &state);
}
state.WaitForPendingWriters();
}
state.quit_flag_.store(true, std::memory_order_release);
state.Wait(TestState::DONE);
}
}
TEST_F(InlineSkipTest, ConcurrentRead1) { RunConcurrentRead(1); }
TEST_F(InlineSkipTest, ConcurrentRead2) { RunConcurrentRead(2); }
TEST_F(InlineSkipTest, ConcurrentRead3) { RunConcurrentRead(3); }
TEST_F(InlineSkipTest, ConcurrentRead4) { RunConcurrentRead(4); }
TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); }
TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); }
TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); }
TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); }
} // namespace rocksdb

@ -485,14 +485,14 @@ void InternalStats::DumpDBStats(std::string* value) {
seconds_up, interval_seconds_up);
value->append(buf);
// Cumulative
uint64_t user_bytes_written = db_stats_[InternalStats::BYTES_WRITTEN];
uint64_t num_keys_written = db_stats_[InternalStats::NUMBER_KEYS_WRITTEN];
uint64_t write_other = db_stats_[InternalStats::WRITE_DONE_BY_OTHER];
uint64_t write_self = db_stats_[InternalStats::WRITE_DONE_BY_SELF];
uint64_t wal_bytes = db_stats_[InternalStats::WAL_FILE_BYTES];
uint64_t wal_synced = db_stats_[InternalStats::WAL_FILE_SYNCED];
uint64_t write_with_wal = db_stats_[InternalStats::WRITE_WITH_WAL];
uint64_t write_stall_micros = db_stats_[InternalStats::WRITE_STALL_MICROS];
uint64_t user_bytes_written = GetDBStats(InternalStats::BYTES_WRITTEN);
uint64_t num_keys_written = GetDBStats(InternalStats::NUMBER_KEYS_WRITTEN);
uint64_t write_other = GetDBStats(InternalStats::WRITE_DONE_BY_OTHER);
uint64_t write_self = GetDBStats(InternalStats::WRITE_DONE_BY_SELF);
uint64_t wal_bytes = GetDBStats(InternalStats::WAL_FILE_BYTES);
uint64_t wal_synced = GetDBStats(InternalStats::WAL_FILE_SYNCED);
uint64_t write_with_wal = GetDBStats(InternalStats::WRITE_WITH_WAL);
uint64_t write_stall_micros = GetDBStats(InternalStats::WRITE_STALL_MICROS);
uint64_t compact_bytes_read = 0;
uint64_t compact_bytes_write = 0;
uint64_t compact_micros = 0;

@ -109,24 +109,16 @@ class InternalStats {
};
InternalStats(int num_levels, Env* env, ColumnFamilyData* cfd)
: db_stats_(INTERNAL_DB_STATS_ENUM_MAX),
cf_stats_value_(INTERNAL_CF_STATS_ENUM_MAX),
cf_stats_count_(INTERNAL_CF_STATS_ENUM_MAX),
: db_stats_{},
cf_stats_value_{},
cf_stats_count_{},
comp_stats_(num_levels),
file_read_latency_(num_levels),
bg_error_count_(0),
number_levels_(num_levels),
env_(env),
cfd_(cfd),
started_at_(env->NowMicros()) {
for (int i = 0; i< INTERNAL_DB_STATS_ENUM_MAX; ++i) {
db_stats_[i] = 0;
}
for (int i = 0; i< INTERNAL_CF_STATS_ENUM_MAX; ++i) {
cf_stats_value_[i] = 0;
cf_stats_count_[i] = 0;
}
}
started_at_(env->NowMicros()) {}
// Per level compaction stats. comp_stats_[level] stores the stats for
// compactions that produced data for the specified "level".
@ -239,7 +231,13 @@ class InternalStats {
}
void AddDBStats(InternalDBStatsType type, uint64_t value) {
db_stats_[type] += value;
auto& v = db_stats_[type];
v.store(v.load(std::memory_order_relaxed) + value,
std::memory_order_relaxed);
}
uint64_t GetDBStats(InternalDBStatsType type) {
return db_stats_[type].load(std::memory_order_relaxed);
}
HistogramImpl* GetFileReadHist(int level) {
@ -264,10 +262,10 @@ class InternalStats {
void DumpCFStats(std::string* value);
// Per-DB stats
std::vector<uint64_t> db_stats_;
std::atomic<uint64_t> db_stats_[INTERNAL_DB_STATS_ENUM_MAX];
// Per-ColumnFamily stats
std::vector<uint64_t> cf_stats_value_;
std::vector<uint64_t> cf_stats_count_;
uint64_t cf_stats_value_[INTERNAL_CF_STATS_ENUM_MAX];
uint64_t cf_stats_count_[INTERNAL_CF_STATS_ENUM_MAX];
// Per-ColumnFamily/level compaction stats
std::vector<CompactionStats> comp_stats_;
std::vector<HistogramImpl> file_read_latency_;

@ -60,7 +60,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
moptions_(ioptions, mutable_cf_options),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
arena_(moptions_.arena_block_size),
arena_(moptions_.arena_block_size, 0),
allocator_(&arena_, write_buffer),
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &allocator_, ioptions.prefix_extractor,
@ -78,12 +78,12 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
? moptions_.inplace_update_num_locks
: 0),
prefix_extractor_(ioptions.prefix_extractor),
should_flush_(ShouldFlushNow()),
flush_scheduled_(false),
flush_state_(FLUSH_NOT_REQUESTED),
env_(ioptions.env) {
// if should_flush_ == true without an entry inserted, something must have
// gone wrong already.
assert(!should_flush_);
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(new DynamicBloom(
&allocator_,
@ -167,6 +167,17 @@ bool MemTable::ShouldFlushNow() const {
return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}
void MemTable::UpdateFlushState() {
auto state = flush_state_.load(std::memory_order_relaxed);
if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
// ignore CAS failure, because that means somebody else requested
// a flush
flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
}
int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
const char* prefix_len_key2) const {
// Internal keys are encoded as length-prefixed strings.
@ -335,7 +346,7 @@ uint64_t MemTable::ApproximateSize(const Slice& start_ikey,
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key, /* user key */
const Slice& value) {
const Slice& value, bool allow_concurrent) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
@ -349,7 +360,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
val_size;
char* buf = nullptr;
KeyHandle handle = table_->Allocate(encoded_len, &buf);
assert(buf != nullptr);
char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
p += key_size;
@ -359,32 +370,64 @@ void MemTable::Add(SequenceNumber s, ValueType type,
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
table_->Insert(handle);
num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1,
if (!allow_concurrent) {
table_->Insert(handle);
// this is a bit ugly, but is the way to avoid locked instructions
// when incrementing an atomic
num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
std::memory_order_relaxed);
data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
std::memory_order_relaxed);
if (type == kTypeDeletion) {
num_deletes_++;
}
if (type == kTypeDeletion) {
num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
std::memory_order_relaxed);
}
if (prefix_bloom_) {
assert(prefix_extractor_);
prefix_bloom_->Add(prefix_extractor_->Transform(key));
}
if (prefix_bloom_) {
assert(prefix_extractor_);
prefix_bloom_->Add(prefix_extractor_->Transform(key));
}
// The first sequence number inserted into the memtable
assert(first_seqno_ == 0 || s > first_seqno_);
if (first_seqno_ == 0) {
first_seqno_ = s;
// The first sequence number inserted into the memtable
assert(first_seqno_ == 0 || s > first_seqno_);
if (first_seqno_ == 0) {
first_seqno_.store(s, std::memory_order_relaxed);
if (earliest_seqno_ == kMaxSequenceNumber) {
earliest_seqno_ = first_seqno_;
if (earliest_seqno_ == kMaxSequenceNumber) {
earliest_seqno_.store(GetFirstSequenceNumber(),
std::memory_order_relaxed);
}
assert(first_seqno_.load() >= earliest_seqno_.load());
}
} else {
table_->InsertConcurrently(handle);
num_entries_.fetch_add(1, std::memory_order_relaxed);
data_size_.fetch_add(encoded_len, std::memory_order_relaxed);
if (type == kTypeDeletion) {
num_deletes_.fetch_add(1, std::memory_order_relaxed);
}
if (prefix_bloom_) {
assert(prefix_extractor_);
prefix_bloom_->AddConcurrently(prefix_extractor_->Transform(key));
}
// atomically update first_seqno_ and earliest_seqno_.
uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
while ((cur_seq_num == 0 || s < cur_seq_num) &&
!first_seqno_.compare_exchange_weak(cur_seq_num, s)) {
}
uint64_t cur_earliest_seqno =
earliest_seqno_.load(std::memory_order_relaxed);
while (
(cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
!first_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
}
assert(first_seqno_ >= earliest_seqno_);
}
should_flush_ = ShouldFlushNow();
UpdateFlushState();
}
// Callback from MemTable::Get()
@ -685,16 +728,16 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
}
}
RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
should_flush_ = ShouldFlushNow();
UpdateFlushState();
return true;
} else if (status == UpdateStatus::UPDATED) {
Add(seq, kTypeValue, key, Slice(str_value));
RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
should_flush_ = ShouldFlushNow();
UpdateFlushState();
return true;
} else if (status == UpdateStatus::UPDATE_FAILED) {
// No action required. Return.
should_flush_ = ShouldFlushNow();
UpdateFlushState();
return true;
}
}

@ -8,10 +8,11 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <string>
#include <memory>
#include <functional>
#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "db/skiplist.h"
@ -21,8 +22,9 @@
#include "rocksdb/memtablerep.h"
#include "rocksdb/immutable_options.h"
#include "db/memtable_allocator.h"
#include "util/arena.h"
#include "util/concurrent_arena.h"
#include "util/dynamic_bloom.h"
#include "util/instrumented_mutex.h"
#include "util/mutable_cf_options.h"
namespace rocksdb {
@ -124,10 +126,17 @@ class MemTable {
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldScheduleFlush() const {
return flush_scheduled_ == false && should_flush_;
return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
}
void MarkFlushScheduled() { flush_scheduled_ = true; }
// Returns true if a flush should be scheduled and the caller should
// be the one to schedule it
bool MarkFlushScheduled() {
auto before = FLUSH_REQUESTED;
return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
// Return an iterator that yields the contents of the memtable.
//
@ -147,11 +156,10 @@ class MemTable {
// specified sequence number and with the specified type.
// Typically value will be empty if type==kTypeDeletion.
//
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable.
void Add(SequenceNumber seq, ValueType type,
const Slice& key,
const Slice& value);
// REQUIRES: if allow_concurrent = false, external synchronization to prevent
// simultaneous operations on the same MemTable.
void Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value, bool allow_concurrent = false);
// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
@ -220,7 +228,9 @@ class MemTable {
// Get total number of deletes in the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
uint64_t num_deletes() const { return num_deletes_; }
uint64_t num_deletes() const {
return num_deletes_.load(std::memory_order_relaxed);
}
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }
@ -234,7 +244,9 @@ class MemTable {
// into the memtable.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
SequenceNumber GetFirstSequenceNumber() { return first_seqno_; }
SequenceNumber GetFirstSequenceNumber() {
return first_seqno_.load(std::memory_order_relaxed);
}
// Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into this
@ -243,7 +255,9 @@ class MemTable {
//
// If the earliest sequence number could not be determined,
// kMaxSequenceNumber will be returned.
SequenceNumber GetEarliestSequenceNumber() { return earliest_seqno_; }
SequenceNumber GetEarliestSequenceNumber() {
return earliest_seqno_.load(std::memory_order_relaxed);
}
// Returns the next active logfile number when this memtable is about to
// be flushed to storage
@ -290,8 +304,7 @@ class MemTable {
const MemTableOptions* GetMemTableOptions() const { return &moptions_; }
private:
// Dynamically check if we can add more incoming entries
bool ShouldFlushNow() const;
enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
friend class MemTableIterator;
friend class MemTableBackwardIterator;
@ -301,14 +314,14 @@ class MemTable {
const MemTableOptions moptions_;
int refs_;
const size_t kArenaBlockSize;
Arena arena_;
ConcurrentArena arena_;
MemTableAllocator allocator_;
unique_ptr<MemTableRep> table_;
// Total data size of all data inserted
std::atomic<uint64_t> data_size_;
std::atomic<uint64_t> num_entries_;
uint64_t num_deletes_;
std::atomic<uint64_t> num_deletes_;
// These are used to manage memtable flushes to storage
bool flush_in_progress_; // started the flush
@ -320,11 +333,11 @@ class MemTable {
VersionEdit edit_;
// The sequence number of the kv that was inserted first
SequenceNumber first_seqno_;
std::atomic<SequenceNumber> first_seqno_;
// The db sequence number at the time of creation or kMaxSequenceNumber
// if not set.
SequenceNumber earliest_seqno_;
std::atomic<SequenceNumber> earliest_seqno_;
// The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_;
@ -332,19 +345,22 @@ class MemTable {
// rw locks for inplace updates
std::vector<port::RWMutex> locks_;
// No copying allowed
MemTable(const MemTable&);
void operator=(const MemTable&);
const SliceTransform* const prefix_extractor_;
std::unique_ptr<DynamicBloom> prefix_bloom_;
// a flag indicating if a memtable has met the criteria to flush
bool should_flush_;
std::atomic<FlushStateEnum> flush_state_;
// a flag indicating if flush has been scheduled
bool flush_scheduled_;
Env* env_;
// Returns a heuristic flush decision
bool ShouldFlushNow() const;
// Updates flush_state_ using ShouldFlushNow()
void UpdateFlushState();
// No copying allowed
MemTable(const MemTable&);
MemTable& operator=(const MemTable&);
};
extern const char* EncodeKey(std::string* scratch, const Slice& target);

@ -7,46 +7,42 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <assert.h>
#include "db/memtable_allocator.h"
#include <assert.h>
#include "db/writebuffer.h"
#include "util/arena.h"
namespace rocksdb {
MemTableAllocator::MemTableAllocator(Arena* arena, WriteBuffer* write_buffer)
: arena_(arena), write_buffer_(write_buffer), bytes_allocated_(0) {
}
MemTableAllocator::MemTableAllocator(Allocator* allocator,
WriteBuffer* write_buffer)
: allocator_(allocator), write_buffer_(write_buffer), bytes_allocated_(0) {}
MemTableAllocator::~MemTableAllocator() {
DoneAllocating();
}
MemTableAllocator::~MemTableAllocator() { DoneAllocating(); }
char* MemTableAllocator::Allocate(size_t bytes) {
assert(write_buffer_ != nullptr);
bytes_allocated_ += bytes;
bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
write_buffer_->ReserveMem(bytes);
return arena_->Allocate(bytes);
return allocator_->Allocate(bytes);
}
char* MemTableAllocator::AllocateAligned(size_t bytes, size_t huge_page_size,
Logger* logger) {
assert(write_buffer_ != nullptr);
bytes_allocated_ += bytes;
bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
write_buffer_->ReserveMem(bytes);
return arena_->AllocateAligned(bytes, huge_page_size, logger);
return allocator_->AllocateAligned(bytes, huge_page_size, logger);
}
void MemTableAllocator::DoneAllocating() {
if (write_buffer_ != nullptr) {
write_buffer_->FreeMem(bytes_allocated_);
write_buffer_->FreeMem(bytes_allocated_.load(std::memory_order_relaxed));
write_buffer_ = nullptr;
}
}
size_t MemTableAllocator::BlockSize() const {
return arena_->BlockSize();
}
size_t MemTableAllocator::BlockSize() const { return allocator_->BlockSize(); }
} // namespace rocksdb

@ -11,17 +11,18 @@
// to WriteBuffer so we can track and enforce overall write buffer limits.
#pragma once
#include <atomic>
#include "util/allocator.h"
namespace rocksdb {
class Arena;
class Logger;
class WriteBuffer;
class MemTableAllocator : public Allocator {
public:
explicit MemTableAllocator(Arena* arena, WriteBuffer* write_buffer);
explicit MemTableAllocator(Allocator* allocator, WriteBuffer* write_buffer);
~MemTableAllocator();
// Allocator interface
@ -35,9 +36,9 @@ class MemTableAllocator : public Allocator {
void DoneAllocating();
private:
Arena* arena_;
Allocator* allocator_;
WriteBuffer* write_buffer_;
size_t bytes_allocated_;
std::atomic<size_t> bytes_allocated_;
// No copying allowed
MemTableAllocator(const MemTableAllocator&);

@ -270,7 +270,7 @@ class Repairer {
continue;
}
WriteBatchInternal::SetContents(&batch, record);
status = WriteBatchInternal::InsertInto(&batch, cf_mems_default);
status = WriteBatchInternal::InsertInto(&batch, cf_mems_default, nullptr);
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {

@ -28,10 +28,12 @@
#include <stack>
#include <stdexcept>
#include <vector>
#include "db/column_family.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/memtable.h"
#include "db/snapshot_impl.h"
#include "db/write_batch_internal.h"
@ -536,35 +538,42 @@ Status WriteBatch::RollbackToSavePoint() {
}
namespace {
// This class can *only* be used from a single-threaded write thread, because it
// calls ColumnFamilyMemTablesImpl::Seek()
class MemTableInserter : public WriteBatch::Handler {
public:
SequenceNumber sequence_;
ColumnFamilyMemTables* cf_mems_;
bool ignore_missing_column_families_;
uint64_t log_number_;
ColumnFamilyMemTables* const cf_mems_;
FlushScheduler* const flush_scheduler_;
const bool ignore_missing_column_families_;
const uint64_t log_number_;
DBImpl* db_;
const bool dont_filter_deletes_;
const bool concurrent_memtable_writes_;
// cf_mems should not be shared with concurrent inserters
MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t log_number,
DB* db, const bool dont_filter_deletes)
DB* db, const bool dont_filter_deletes,
bool concurrent_memtable_writes)
: sequence_(sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
ignore_missing_column_families_(ignore_missing_column_families),
log_number_(log_number),
db_(reinterpret_cast<DBImpl*>(db)),
dont_filter_deletes_(dont_filter_deletes) {
assert(cf_mems);
dont_filter_deletes_(dont_filter_deletes),
concurrent_memtable_writes_(concurrent_memtable_writes) {
assert(cf_mems_);
if (!dont_filter_deletes_) {
assert(db_);
}
}
bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
// We are only allowed to call this from a single-threaded write thread
// (or while holding DB mutex)
// If we are in a concurrent mode, it is the caller's responsibility
// to clone the original ColumnFamilyMemTables so that each thread
// has its own instance. Otherwise, it must be guaranteed that there
// is no concurrent access
bool found = cf_mems_->Seek(column_family_id);
if (!found) {
if (ignore_missing_column_families_) {
@ -598,11 +607,13 @@ class MemTableInserter : public WriteBatch::Handler {
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
if (!moptions->inplace_update_support) {
mem->Add(sequence_, kTypeValue, key, value);
mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_);
} else if (moptions->inplace_callback == nullptr) {
assert(!concurrent_memtable_writes_);
mem->Update(sequence_, key, value);
RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
} else {
assert(!concurrent_memtable_writes_);
if (mem->UpdateCallback(sequence_, key, value)) {
} else {
// key not found in memtable. Do sst get, update, add
@ -640,7 +651,7 @@ class MemTableInserter : public WriteBatch::Handler {
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
sequence_++;
cf_mems_->CheckMemtableFull();
CheckMemtableFull();
return Status::OK();
}
@ -654,6 +665,7 @@ class MemTableInserter : public WriteBatch::Handler {
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
if (!dont_filter_deletes_ && moptions->filter_deletes) {
assert(!concurrent_memtable_writes_);
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
ReadOptions ropts;
@ -668,9 +680,9 @@ class MemTableInserter : public WriteBatch::Handler {
return Status::OK();
}
}
mem->Add(sequence_, delete_type, key, Slice());
mem->Add(sequence_, delete_type, key, Slice(), concurrent_memtable_writes_);
sequence_++;
cf_mems_->CheckMemtableFull();
CheckMemtableFull();
return Status::OK();
}
@ -686,6 +698,7 @@ class MemTableInserter : public WriteBatch::Handler {
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
assert(!concurrent_memtable_writes_);
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
@ -760,24 +773,38 @@ class MemTableInserter : public WriteBatch::Handler {
}
sequence_++;
cf_mems_->CheckMemtableFull();
CheckMemtableFull();
return Status::OK();
}
void CheckMemtableFull() {
if (flush_scheduler_ != nullptr) {
auto* cfd = cf_mems_->current();
assert(cfd != nullptr);
if (cfd->mem()->ShouldScheduleFlush() &&
cfd->mem()->MarkFlushScheduled()) {
// MarkFlushScheduled only returns true if we are the one that
// should take action, so no need to dedup further
flush_scheduler_->ScheduleFlush(cfd);
}
}
}
};
} // namespace
// This function can only be called in these conditions:
// 1) During Recovery()
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables has been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache
Status WriteBatchInternal::InsertInto(const autovector<WriteBatch*>& batches,
SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
bool ignore_missing_column_families,
uint64_t log_number, DB* db,
const bool dont_filter_deletes) {
MemTableInserter inserter(sequence, memtables, ignore_missing_column_families,
log_number, db, dont_filter_deletes);
Status WriteBatchInternal::InsertInto(
const autovector<WriteBatch*>& batches, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db,
const bool dont_filter_deletes, bool concurrent_memtable_writes) {
MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, log_number, db,
dont_filter_deletes, concurrent_memtable_writes);
Status rv = Status::OK();
for (size_t i = 0; i < batches.size() && rv.ok(); ++i) {
rv = batches[i]->Iterate(&inserter);
@ -787,12 +814,15 @@ Status WriteBatchInternal::InsertInto(const autovector<WriteBatch*>& batches,
Status WriteBatchInternal::InsertInto(const WriteBatch* batch,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t log_number, DB* db,
const bool dont_filter_deletes) {
const bool dont_filter_deletes,
bool concurrent_memtable_writes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables,
ignore_missing_column_families, log_number, db,
dont_filter_deletes);
flush_scheduler, ignore_missing_column_families,
log_number, db, dont_filter_deletes,
concurrent_memtable_writes);
return batch->Iterate(&inserter);
}

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <vector>
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/db.h"
@ -17,6 +18,8 @@
namespace rocksdb {
class MemTable;
class FlushScheduler;
class ColumnFamilyData;
class ColumnFamilyMemTables {
public:
@ -28,7 +31,7 @@ class ColumnFamilyMemTables {
virtual uint64_t GetLogNumber() const = 0;
virtual MemTable* GetMemTable() const = 0;
virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;
virtual void CheckMemtableFull() = 0;
virtual ColumnFamilyData* current() { return nullptr; }
};
class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
@ -50,8 +53,6 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }
void CheckMemtableFull() override {}
private:
bool ok_;
MemTable* mem_;
@ -127,19 +128,29 @@ class WriteBatchInternal {
//
// If log_number is non-zero, the memtable will be updated only if
// memtables->GetLogNumber() >= log_number.
//
// If flush_scheduler is non-null, it will be invoked if the memtable
// should be flushed.
//
// Under concurrent use, the caller is responsible for making sure that
// the memtables object itself is thread-local.
static Status InsertInto(const autovector<WriteBatch*>& batches,
SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
const bool dont_filter_deletes = true);
const bool dont_filter_deletes = true,
bool concurrent_memtable_writes = false);
// Convenience form of InsertInto when you have only one batch
static Status InsertInto(const WriteBatch* batch,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
const bool dont_filter_deletes = true);
const bool dont_filter_deletes = true,
bool concurrent_memtable_writes = false);
static void Append(WriteBatch* dst, const WriteBatch* src);

@ -37,7 +37,7 @@ static std::string PrintContents(WriteBatch* b) {
mem->Ref();
std::string state;
ColumnFamilyMemTablesDefault cf_mems_default(mem);
Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default);
Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default, nullptr);
int count = 0;
int put_count = 0;
int delete_count = 0;

@ -4,34 +4,197 @@
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/write_thread.h"
#include <chrono>
#include <limits>
#include <thread>
#include "db/column_family.h"
#include "port/port.h"
#include "util/sync_point.h"
namespace rocksdb {
void WriteThread::Await(Writer* w) {
std::unique_lock<std::mutex> guard(w->JoinMutex());
w->JoinCV().wait(guard, [w] { return w->joined; });
WriteThread::WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec)
: max_yield_usec_(max_yield_usec),
slow_yield_usec_(slow_yield_usec),
newest_writer_(nullptr) {}
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
// We're going to block. Lazily create the mutex. We guarantee
// propagation of this construction to the waker via the
// STATE_LOCKED_WAITING state. The waker won't try to touch the mutex
// or the condvar unless they CAS away the STATE_LOCKED_WAITING that
// we install below.
w->CreateMutex();
auto state = w->state.load(std::memory_order_acquire);
assert(state != STATE_LOCKED_WAITING);
if ((state & goal_mask) == 0 &&
w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) {
// we have permission (and an obligation) to use StateMutex
std::unique_lock<std::mutex> guard(w->StateMutex());
w->StateCV().wait(guard, [w] {
return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
});
state = w->state.load(std::memory_order_relaxed);
}
// else tricky. Goal is met or CAS failed. In the latter case the waker
// must have changed the state, and compare_exchange_strong has updated
// our local variable with the new one. At the moment WriteThread never
// waits for a transition across intermediate states, so we know that
// since a state change has occurred the goal must have been met.
assert((state & goal_mask) != 0);
return state;
}
uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
AdaptationContext* ctx) {
uint8_t state;
// On a modern Xeon each loop takes about 7 nanoseconds (most of which
// is the effect of the pause instruction), so 200 iterations is a bit
// more than a microsecond. This is long enough that waits longer than
// this can amortize the cost of accessing the clock and yielding.
for (uint32_t tries = 0; tries < 200; ++tries) {
state = w->state.load(std::memory_order_acquire);
if ((state & goal_mask) != 0) {
return state;
}
port::AsmVolatilePause();
}
// If we're only going to end up waiting a short period of time,
// it can be a lot more efficient to call std::this_thread::yield()
// in a loop than to block in StateMutex(). For reference, on my 4.0
// SELinux test server with support for syscall auditing enabled, the
// minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is
// 2.7 usec, and the average is more like 10 usec. That can be a big
// drag on RockDB's single-writer design. Of course, spinning is a
// bad idea if other threads are waiting to run or if we're going to
// wait for a long time. How do we decide?
//
// We break waiting into 3 categories: short-uncontended,
// short-contended, and long. If we had an oracle, then we would always
// spin for short-uncontended, always block for long, and our choice for
// short-contended might depend on whether we were trying to optimize
// RocksDB throughput or avoid being greedy with system resources.
//
// Bucketing into short or long is easy by measuring elapsed time.
// Differentiating short-uncontended from short-contended is a bit
// trickier, but not too bad. We could look for involuntary context
// switches using getrusage(RUSAGE_THREAD, ..), but it's less work
// (portability code and CPU) to just look for yield calls that take
// longer than we expect. sched_yield() doesn't actually result in any
// context switch overhead if there are no other runnable processes
// on the current core, in which case it usually takes less than
// a microsecond.
//
// There are two primary tunables here: the threshold between "short"
// and "long" waits, and the threshold at which we suspect that a yield
// is slow enough to indicate we should probably block. If these
// thresholds are chosen well then CPU-bound workloads that don't
// have more threads than cores will experience few context switches
// (voluntary or involuntary), and the total number of context switches
// (voluntary and involuntary) will not be dramatically larger (maybe
// 2x) than the number of voluntary context switches that occur when
// --max_yield_wait_micros=0.
//
// There's another constant, which is the number of slow yields we will
// tolerate before reversing our previous decision. Solitary slow
// yields are pretty common (low-priority small jobs ready to run),
// so this should be at least 2. We set this conservatively to 3 so
// that we can also immediately schedule a ctx adaptation, rather than
// waiting for the next update_ctx.
const size_t kMaxSlowYieldsWhileSpinning = 3;
bool update_ctx = false;
bool would_spin_again = false;
if (max_yield_usec_ > 0) {
update_ctx = Random::GetTLSInstance()->OneIn(256);
if (update_ctx || ctx->value.load(std::memory_order_relaxed) >= 0) {
// we're updating the adaptation statistics, or spinning has >
// 50% chance of being shorter than max_yield_usec_ and causing no
// involuntary context switches
auto spin_begin = std::chrono::steady_clock::now();
// this variable doesn't include the final yield (if any) that
// causes the goal to be met
size_t slow_yield_count = 0;
auto iter_begin = spin_begin;
while ((iter_begin - spin_begin) <=
std::chrono::microseconds(max_yield_usec_)) {
std::this_thread::yield();
state = w->state.load(std::memory_order_acquire);
if ((state & goal_mask) != 0) {
// success
would_spin_again = true;
break;
}
auto now = std::chrono::steady_clock::now();
if (now == iter_begin ||
now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
// conservatively count it as a slow yield if our clock isn't
// accurate enough to measure the yield duration
++slow_yield_count;
if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
// Not just one ivcsw, but several. Immediately update ctx
// and fall back to blocking
update_ctx = true;
break;
}
}
iter_begin = now;
}
}
}
if ((state & goal_mask) == 0) {
state = BlockingAwaitState(w, goal_mask);
}
if (update_ctx) {
auto v = ctx->value.load(std::memory_order_relaxed);
// fixed point exponential decay with decay constant 1/1024, with +1
// and -1 scaled to avoid overflow for int32_t
v = v + (v / 1024) + (would_spin_again ? 1 : -1) * 16384;
ctx->value.store(v, std::memory_order_relaxed);
}
assert((state & goal_mask) != 0);
return state;
}
void WriteThread::MarkJoined(Writer* w) {
std::lock_guard<std::mutex> guard(w->JoinMutex());
assert(!w->joined);
w->joined = true;
w->JoinCV().notify_one();
void WriteThread::SetState(Writer* w, uint8_t new_state) {
auto state = w->state.load(std::memory_order_acquire);
if (state == STATE_LOCKED_WAITING ||
!w->state.compare_exchange_strong(state, new_state)) {
assert(state == STATE_LOCKED_WAITING);
std::lock_guard<std::mutex> guard(w->StateMutex());
assert(w->state.load(std::memory_order_relaxed) != new_state);
w->state.store(new_state, std::memory_order_relaxed);
w->StateCV().notify_one();
}
}
void WriteThread::LinkOne(Writer* w, bool* wait_needed) {
assert(!w->joined && !w->done);
void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) {
assert(w->state == STATE_INIT);
Writer* writers = newest_writer_.load(std::memory_order_relaxed);
while (true) {
w->link_older = writers;
if (writers != nullptr) {
w->CreateMutex();
}
if (newest_writer_.compare_exchange_strong(writers, w)) {
// Success.
*wait_needed = (writers != nullptr);
if (writers == nullptr) {
// this isn't part of the WriteThread machinery, but helps with
// debugging and is checked by an assert in WriteImpl
w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed);
}
*linked_as_leader = (writers == nullptr);
return;
}
}
@ -50,11 +213,15 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) {
}
void WriteThread::JoinBatchGroup(Writer* w) {
static AdaptationContext ctx{"JoinBatchGroup"};
assert(w->batch != nullptr);
bool wait_needed;
LinkOne(w, &wait_needed);
if (wait_needed) {
Await(w);
bool linked_as_leader;
LinkOne(w, &linked_as_leader);
if (!linked_as_leader) {
AwaitState(w,
STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED,
&ctx);
}
}
@ -88,7 +255,7 @@ size_t WriteThread::EnterAsBatchGroupLeader(
// This is safe regardless of any db mutex status of the caller. Previous
// calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
// (they emptied the list and then we added ourself as leader) or had to
// explicitly wake up us (the list was non-empty when we added ourself,
// explicitly wake us up (the list was non-empty when we added ourself,
// so we have already received our MarkJoined).
CreateMissingNewerLinks(newest_writer);
@ -135,6 +302,73 @@ size_t WriteThread::EnterAsBatchGroupLeader(
return size;
}
void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
SequenceNumber sequence) {
// EnterAsBatchGroupLeader already created the links from leader to
// newer writers in the group
pg->leader->parallel_group = pg;
Writer* w = pg->leader;
w->sequence = sequence;
while (w != pg->last_writer) {
sequence += WriteBatchInternal::Count(w->batch);
w = w->link_newer;
w->sequence = sequence;
w->parallel_group = pg;
SetState(w, STATE_PARALLEL_FOLLOWER);
}
}
bool WriteThread::CompleteParallelWorker(Writer* w) {
static AdaptationContext ctx{"CompleteParallelWorker"};
auto* pg = w->parallel_group;
if (!w->status.ok()) {
std::lock_guard<std::mutex> guard(w->StateMutex());
pg->status = w->status;
}
auto leader = pg->leader;
auto early_exit_allowed = pg->early_exit_allowed;
if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) {
// we're not the last one
AwaitState(w, STATE_COMPLETED, &ctx);
// Caller only needs to perform exit duties if early exit doesn't
// apply and this is the leader. Can't touch pg here. Whoever set
// our state to STATE_COMPLETED copied pg->status to w.status for us.
return w == leader && !(early_exit_allowed && w->status.ok());
}
// else we're the last parallel worker
if (w == leader || (early_exit_allowed && pg->status.ok())) {
// this thread should perform exit duties
w->status = pg->status;
return true;
} else {
// We're the last parallel follower but early commit is not
// applicable. Wake up the leader and then wait for it to exit.
assert(w->state == STATE_PARALLEL_FOLLOWER);
SetState(leader, STATE_COMPLETED);
AwaitState(w, STATE_COMPLETED, &ctx);
return false;
}
}
void WriteThread::EarlyExitParallelGroup(Writer* w) {
auto* pg = w->parallel_group;
assert(w->state == STATE_PARALLEL_FOLLOWER);
assert(pg->status.ok());
ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status);
assert(w->state == STATE_COMPLETED);
assert(w->status.ok());
SetState(pg->leader, STATE_COMPLETED);
}
void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
Status status) {
assert(leader->link_older == nullptr);
@ -166,31 +400,35 @@ void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
// nullptr when they enqueued (we were definitely enqueued before them
// and are still in the list). That means leader handoff occurs when
// we call MarkJoined
MarkJoined(last_writer->link_newer);
SetState(last_writer->link_newer, STATE_GROUP_LEADER);
}
// else nobody else was waiting, although there might already be a new
// leader now
while (last_writer != leader) {
last_writer->status = status;
last_writer->done = true;
// We must read link_older before calling MarkJoined, because as
// soon as it is marked the other thread's AwaitJoined may return
// and deallocate the Writer.
// we need to read link_older before calling SetState, because as soon
// as it is marked committed the other thread's Await may return and
// deallocate the Writer.
auto next = last_writer->link_older;
MarkJoined(last_writer);
SetState(last_writer, STATE_COMPLETED);
last_writer = next;
}
}
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
static AdaptationContext ctx{"EnterUnbatched"};
static std::atomic<uint32_t> adaptation_history{};
assert(w->batch == nullptr);
bool wait_needed;
LinkOne(w, &wait_needed);
if (wait_needed) {
bool linked_as_leader;
LinkOne(w, &linked_as_leader);
if (!linked_as_leader) {
mu->Unlock();
TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
Await(w);
AwaitState(w, STATE_GROUP_LEADER, &ctx);
mu->Lock();
}
}

@ -8,11 +8,13 @@
#include <assert.h>
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <vector>
#include <type_traits>
#include "rocksdb/status.h"
#include "db/write_batch_internal.h"
#include "rocksdb/status.h"
#include "util/autovector.h"
#include "util/instrumented_mutex.h"
@ -20,19 +22,69 @@ namespace rocksdb {
class WriteThread {
public:
enum State : uint8_t {
// The initial state of a writer. This is a Writer that is
// waiting in JoinBatchGroup. This state can be left when another
// thread informs the waiter that it has become a group leader
// (-> STATE_GROUP_LEADER), when a leader that has chosen to be
// non-parallel informs a follower that its writes have been committed
// (-> STATE_COMPLETED), or when a leader that has chosen to perform
// updates in parallel and needs this Writer to apply its batch (->
// STATE_PARALLEL_FOLLOWER).
STATE_INIT = 1,
// The state used to inform a waiting Writer that it has become the
// leader, and it should now build a write batch group. Tricky:
// this state is not used if newest_writer_ is empty when a writer
// enqueues itself, because there is no need to wait (or even to
// create the mutex and condvar used to wait) in that case. This is
// a terminal state unless the leader chooses to make this a parallel
// batch, in which case the last parallel worker to finish will move
// the leader to STATE_COMPLETED.
STATE_GROUP_LEADER = 2,
// A Writer that has returned as a follower in a parallel group.
// It should apply its batch to the memtable and then call
// CompleteParallelWorker. When someone calls ExitAsBatchGroupLeader
// or EarlyExitParallelGroup this state will get transitioned to
// STATE_COMPLETED.
STATE_PARALLEL_FOLLOWER = 4,
// A follower whose writes have been applied, or a parallel leader
// whose followers have all finished their work. This is a terminal
// state.
STATE_COMPLETED = 8,
// A state indicating that the thread may be waiting using StateMutex()
// and StateCondVar()
STATE_LOCKED_WAITING = 16,
};
struct Writer;
struct ParallelGroup {
Writer* leader;
Writer* last_writer;
bool early_exit_allowed;
// before running goes to zero, status needs leader->StateMutex()
Status status;
std::atomic<uint32_t> running;
};
// Information kept for every waiting writer.
struct Writer {
WriteBatch* batch;
bool sync;
bool disableWAL;
bool in_batch_group;
bool done;
bool has_callback;
bool made_waitable; // records lazy construction of mutex and cv
std::atomic<uint8_t> state; // write under StateMutex() or pre-link
ParallelGroup* parallel_group;
SequenceNumber sequence; // the sequence number to use
Status status;
bool made_waitable; // records lazy construction of mutex and cv
bool joined; // read/write only under JoinMutex() (or pre-link)
std::aligned_storage<sizeof(std::mutex)>::type join_mutex_bytes;
std::aligned_storage<sizeof(std::condition_variable)>::type join_cv_bytes;
std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
Writer* link_older; // read/write only before linking, or as leader
Writer* link_newer; // lazy, read/write only before linking, or as leader
@ -41,44 +93,45 @@ class WriteThread {
sync(false),
disableWAL(false),
in_batch_group(false),
done(false),
has_callback(false),
made_waitable(false),
joined(false),
state(STATE_INIT),
link_older(nullptr),
link_newer(nullptr) {}
~Writer() {
if (made_waitable) {
JoinMutex().~mutex();
JoinCV().~condition_variable();
StateMutex().~mutex();
StateCV().~condition_variable();
}
}
void CreateMutex() {
assert(!joined);
if (!made_waitable) {
// Note that made_waitable is tracked separately from state
// transitions, because we can't atomically create the mutex and
// link into the list.
made_waitable = true;
new (&join_mutex_bytes) std::mutex;
new (&join_cv_bytes) std::condition_variable;
new (&state_mutex_bytes) std::mutex;
new (&state_cv_bytes) std::condition_variable;
}
}
// No other mutexes may be acquired while holding JoinMutex(), it is
// No other mutexes may be acquired while holding StateMutex(), it is
// always last in the order
std::mutex& JoinMutex() {
std::mutex& StateMutex() {
assert(made_waitable);
return *static_cast<std::mutex*>(static_cast<void*>(&join_mutex_bytes));
return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
}
std::condition_variable& JoinCV() {
std::condition_variable& StateCV() {
assert(made_waitable);
return *static_cast<std::condition_variable*>(
static_cast<void*>(&join_cv_bytes));
static_cast<void*>(&state_cv_bytes));
}
};
WriteThread() : newest_writer_(nullptr) {}
WriteThread(uint64_t max_yield_usec, uint64_t slow_yield_usec);
// IMPORTANT: None of the methods in this class rely on the db mutex
// for correctness. All of the methods except JoinBatchGroup and
@ -86,13 +139,16 @@ class WriteThread {
// Correctness is maintained by ensuring that only a single thread is
// a leader at a time.
// Registers w as ready to become part of a batch group, and blocks
// until some other thread has completed the write (in which case
// w->done will be set to true) or this write has become the leader
// of a batch group (w->done will remain unset). The db mutex SHOULD
// NOT be held when calling this function, because it will block.
// If !w->done then JoinBatchGroup should be followed by a call to
// EnterAsBatchGroupLeader and ExitAsBatchGroupLeader.
// Registers w as ready to become part of a batch group, waits until the
// caller should perform some work, and returns the current state of the
// writer. If w has become the leader of a write batch group, returns
// STATE_GROUP_LEADER. If w has been made part of a sequential batch
// group and the leader has performed the write, returns STATE_DONE.
// If w has been made part of a parallel batch group and is reponsible
// for updating the memtable, returns STATE_PARALLEL_FOLLOWER.
//
// The db mutex SHOULD NOT be held when calling this function, because
// it will block.
//
// Writer* w: Writer to be executed as part of a batch group
void JoinBatchGroup(Writer* w);
@ -100,15 +156,35 @@ class WriteThread {
// Constructs a write batch group led by leader, which should be a
// Writer passed to JoinBatchGroup on the current thread.
//
// Writer* leader: Writer passed to JoinBatchGroup, but !done
// Writer** last_writer: Out-param for use by ExitAsBatchGroupLeader
// Writer* leader: Writer that is STATE_GROUP_LEADER
// Writer** last_writer: Out-param that identifies the last follower
// autovector<WriteBatch*>* write_batch_group: Out-param of group members
// returns: Total batch group size
// returns: Total batch group byte size
size_t EnterAsBatchGroupLeader(Writer* leader, Writer** last_writer,
autovector<WriteBatch*>* write_batch_group);
// Unlinks the Writer-s in a batch group, wakes up the non-leaders, and
// wakes up the next leader (if any).
// Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the
// non-leader members of this write batch group. Sets Writer::sequence
// before waking them up.
//
// ParallalGroup* pg: Extra state used to coordinate the parallel add
// SequenceNumber sequence: Starting sequence number to assign to Writer-s
void LaunchParallelFollowers(ParallelGroup* pg, SequenceNumber sequence);
// Reports the completion of w's batch to the parallel group leader, and
// waits for the rest of the parallel batch to complete. Returns true
// if this thread is the last to complete, and hence should advance
// the sequence number and then call EarlyExitParallelGroup, false if
// someone else has already taken responsibility for that.
bool CompleteParallelWorker(Writer* w);
// This method performs an early completion of a parallel write group,
// where the cleanup work of the leader is performed by a follower who
// happens to be the last parallel worker to complete.
void EarlyExitParallelGroup(Writer* w);
// Unlinks the Writer-s in a batch group, wakes up the non-leaders,
// and wakes up the next leader (if any).
//
// Writer* leader: From EnterAsBatchGroupLeader
// Writer* last_writer: Value of out-param of EnterAsBatchGroupLeader
@ -128,18 +204,35 @@ class WriteThread {
// writers.
void ExitUnbatched(Writer* w);
struct AdaptationContext {
const char* name;
std::atomic<int32_t> value;
};
private:
uint64_t max_yield_usec_;
uint64_t slow_yield_usec_;
// Points to the newest pending Writer. Only leader can remove
// elements, adding can be done lock-free by anybody
std::atomic<Writer*> newest_writer_;
void Await(Writer* w);
void MarkJoined(Writer* w);
// Waits for w->state & goal_mask using w->StateMutex(). Returns
// the state that satisfies goal_mask.
uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);
// Blocks until w->state & goal_mask, returning the state value
// that satisfied the predicate. Uses ctx to adaptively use
// std::this_thread::yield() to avoid mutex overheads. ctx should be
// a context-dependent static.
uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);
void SetState(Writer* w, uint8_t new_state);
// Links w into the newest_writer_ list. Sets *wait_needed to false
// if w was linked directly into the leader position, true otherwise.
// Safe to call from multiple threads without external locking.
void LinkOne(Writer* w, bool* wait_needed);
// Links w into the newest_writer_ list. Sets *linked_as_leader to
// true if w was linked directly into the leader position. Safe to
// call from multiple threads without external locking.
void LinkOne(Writer* w, bool* linked_as_leader);
// Computes any missing link_newer links. Should not be called
// concurrently with itself.

@ -11,16 +11,20 @@
#pragma once
#include <atomic>
namespace rocksdb {
class WriteBuffer {
public:
explicit WriteBuffer(size_t _buffer_size)
: buffer_size_(_buffer_size), memory_used_(0) {}
: buffer_size_(_buffer_size), memory_used_(0) {}
~WriteBuffer() {}
size_t memory_usage() const { return memory_used_; }
size_t memory_usage() const {
return memory_used_.load(std::memory_order_relaxed);
}
size_t buffer_size() const { return buffer_size_; }
// Should only be called from write thread
@ -29,12 +33,16 @@ class WriteBuffer {
}
// Should only be called from write thread
void ReserveMem(size_t mem) { memory_used_ += mem; }
void FreeMem(size_t mem) { memory_used_ -= mem; }
void ReserveMem(size_t mem) {
memory_used_.fetch_add(mem, std::memory_order_relaxed);
}
void FreeMem(size_t mem) {
memory_used_.fetch_sub(mem, std::memory_order_relaxed);
}
private:
const size_t buffer_size_;
size_t memory_used_;
std::atomic<size_t> memory_used_;
// No copying allowed
WriteBuffer(const WriteBuffer&);

@ -36,7 +36,9 @@
#pragma once
#include <memory>
#include <stdexcept>
#include <stdint.h>
#include <stdlib.h>
namespace rocksdb {
@ -68,25 +70,36 @@ class MemTableRep {
explicit MemTableRep(MemTableAllocator* allocator) : allocator_(allocator) {}
// Allocate a buf of len size for storing key. The idea is that a specific
// memtable representation knows its underlying data structure better. By
// allowing it to allocate memory, it can possibly put correlated stuff
// in consecutive memory area to make processor prefetching more efficient.
// Allocate a buf of len size for storing key. The idea is that a
// specific memtable representation knows its underlying data structure
// better. By allowing it to allocate memory, it can possibly put
// correlated stuff in consecutive memory area to make processor
// prefetching more efficient.
virtual KeyHandle Allocate(const size_t len, char** buf);
// Insert key into the collection. (The caller will pack key and value into a
// single buffer and pass that in as the parameter to Insert).
// REQUIRES: nothing that compares equal to key is currently in the
// collection.
// collection, and no concurrent modifications to the table in progress
virtual void Insert(KeyHandle handle) = 0;
// Like Insert(handle), but may be called concurrent with other calls
// to InsertConcurrently for other handles
virtual void InsertConcurrently(KeyHandle handle) {
#ifndef ROCKSDB_LITE
throw std::runtime_error("concurrent insert not supported");
#else
abort();
#endif
}
// Returns true iff an entry that compares equal to key is in the collection.
virtual bool Contains(const char* key) const = 0;
// Notify this table rep that it will no longer be added to. By default, does
// nothing. After MarkReadOnly() is called, this table rep will not be
// written to (ie No more calls to Allocate(), Insert(), or any writes done
// directly to entries accessed through the iterator.)
// Notify this table rep that it will no longer be added to. By default,
// does nothing. After MarkReadOnly() is called, this table rep will
// not be written to (ie No more calls to Allocate(), Insert(),
// or any writes done directly to entries accessed through the iterator.)
virtual void MarkReadOnly() { }
// Look up key from the mem table, since the first key in the mem table whose
@ -94,6 +107,7 @@ class MemTableRep {
// callback_args directly forwarded as the first parameter, and the mem table
// key as the second parameter. If the return value is false, then terminates.
// Otherwise, go through the next key.
//
// It's safe for Get() to terminate after having finished all the potential
// key for the k.user_key(), or not.
//
@ -109,7 +123,7 @@ class MemTableRep {
}
// Report an approximation of how much memory has been used other than memory
// that was allocated through the allocator.
// that was allocated through the allocator. Safe to call from any thread.
virtual size_t ApproximateMemoryUsage() = 0;
virtual ~MemTableRep() { }
@ -174,6 +188,10 @@ class MemTableRep {
// Default: true
virtual bool IsSnapshotSupported() const { return true; }
// Return true if the current MemTableRep supports concurrent inserts
// Default: false
virtual bool IsInsertConcurrentlySupported() const { return false; }
protected:
// When *key is an internal key concatenated with the value, returns the
// user key.

@ -1173,6 +1173,47 @@ struct DBOptions {
// Default: 2MB/s
uint64_t delayed_write_rate;
// If true, allow multi-writers to update mem tables in parallel.
// Only some memtable_factory-s support concurrent writes; currently it
// is implemented only for SkipListFactory. Concurrent memtable writes
// are not compatible with inplace_update_support or filter_deletes.
// It is strongly recommended to set enable_write_thread_adaptive_yield
// if you are going to use this feature.
//
// THIS FEATURE IS NOT STABLE YET.
//
// Default: false
bool allow_concurrent_memtable_write;
// If true, threads synchronizing with the write batch group leader will
// wait for up to write_thread_max_yield_usec before blocking on a mutex.
// This can substantially improve throughput for concurrent workloads,
// regardless of whether allow_concurrent_memtable_write is enabled.
//
// THIS FEATURE IS NOT STABLE YET.
//
// Default: false
bool enable_write_thread_adaptive_yield;
// The maximum number of microseconds that a write operation will use
// a yielding spin loop to coordinate with other write threads before
// blocking on a mutex. (Assuming write_thread_slow_yield_usec is
// set properly) increasing this value is likely to increase RocksDB
// throughput at the expense of increased CPU usage.
//
// Default: 100
uint64_t write_thread_max_yield_usec;
// The latency in microseconds after which a std::this_thread::yield
// call (sched_yield on Linux) is considered to be a signal that
// other processes or threads would like to use the current core.
// Increasing this makes writer threads more likely to take CPU
// by spinning, which will show up as an increase in the number of
// involuntary context switches.
//
// Default: 3
uint64_t write_thread_slow_yield_usec;
// If true, then DB::Open() will not update the statistics used to optimize
// compaction decision by loading table properties from many files.
// Turning off this feature will improve DBOpen time especially in

@ -145,7 +145,7 @@ enum Tickers : uint32_t {
// Writes can be processed by requesting thread or by the thread at the
// head of the writers queue.
WRITE_DONE_BY_SELF,
WRITE_DONE_BY_OTHER,
WRITE_DONE_BY_OTHER, // Equivalent to writes done for others
WRITE_TIMEDOUT, // Number of writes ending up with timed-out.
WRITE_WITH_WAL, // Number of Write calls that request WAL
COMPACT_READ_BYTES, // Bytes read during compaction

@ -10,6 +10,9 @@
#include "port/port_posix.h"
#include <assert.h>
#if defined(__i386__) || defined(__x86_64__)
#include <cpuid.h>
#endif
#include <errno.h>
#include <signal.h>
#include <stdio.h>
@ -132,6 +135,19 @@ void RWMutex::ReadUnlock() { PthreadCall("read unlock", pthread_rwlock_unlock(&m
void RWMutex::WriteUnlock() { PthreadCall("write unlock", pthread_rwlock_unlock(&mu_)); }
int PhysicalCoreID() {
#if defined(__i386__) || defined(__x86_64__)
// if you ever find that this function is hot on Linux, you can go from
// ~200 nanos to ~20 nanos by adding the machinery to use __vdso_getcpu
unsigned eax, ebx = 0, ecx, edx;
__get_cpuid(1, &eax, &ebx, &ecx, &edx);
return ebx >> 24;
#else
// getcpu or sched_getcpu could work here
return -1;
#endif
}
void InitOnce(OnceType* once, void (*initializer)()) {
PthreadCall("once", pthread_once(once, initializer));
}

@ -43,8 +43,9 @@
#include <pthread.h>
#include <stdint.h>
#include <string>
#include <string.h>
#include <limits>
#include <string>
#ifndef PLATFORM_IS_LITTLE_ENDIAN
#define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN)
@ -71,8 +72,6 @@
#define fdatasync fsync
#endif
#include <limits>
namespace rocksdb {
namespace port {
@ -142,6 +141,20 @@ class CondVar {
Mutex* mu_;
};
static inline void AsmVolatilePause() {
#if defined(__i386__) || defined(__x86_64__)
asm volatile("pause");
#elif defined(__aarch64__)
asm volatile("wfe");
#elif defined(__powerpc64__)
asm volatile("or 27,27,27");
#endif
// it's okay for other platforms to be no-ops
}
// Returns -1 if not available on this platform
extern int PhysicalCoreID();
typedef pthread_once_t OnceType;
#define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT
extern void InitOnce(OnceType* once, void (*initializer)());

@ -100,6 +100,8 @@ void CondVar::Signal() { cv_.notify_one(); }
void CondVar::SignalAll() { cv_.notify_all(); }
int PhysicalCoreID() { return GetCurrentProcessorNumber(); }
void InitOnce(OnceType* once, void (*initializer)()) {
std::call_once(once->flag_, initializer);
}

@ -243,6 +243,15 @@ extern void InitOnce(OnceType* once, void (*initializer)());
#define CACHE_LINE_SIZE 64U
static inline void AsmVolatilePause() {
#if defined(_M_IX86) || defined(_M_X64)
::_mm_pause();
#endif
// it would be nice to get "wfe" on ARM here
}
extern int PhysicalCoreID();
// For Thread Local Storage abstraction
typedef DWORD pthread_key_t;

@ -88,6 +88,7 @@ LIB_SOURCES = \
util/coding.cc \
util/comparator.cc \
util/compaction_job_stats_impl.cc \
util/concurrent_arena.cc \
util/crc32c.cc \
util/db_info_dumper.cc \
util/delete_scheduler_impl.cc \

@ -2041,7 +2041,8 @@ TEST_F(MemTableTest, Simple) {
batch.Put(std::string("k3"), std::string("v3"));
batch.Put(std::string("largekey"), std::string("vlarge"));
ColumnFamilyMemTablesDefault cf_mems_default(memtable);
ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, &cf_mems_default).ok());
ASSERT_TRUE(
WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr).ok());
Arena arena;
ScopedArenaIterator iter(memtable->NewIterator(ReadOptions(), &arena));

@ -167,7 +167,7 @@ char* Arena::AllocateAligned(size_t bytes, size_t huge_page_size,
aligned_alloc_ptr_ += needed;
alloc_bytes_remaining_ -= needed;
} else {
// AllocateFallback always returned aligned memory
// AllocateFallback always returns aligned memory
result = AllocateFallback(bytes, true /* aligned */);
}
assert((reinterpret_cast<uintptr_t>(result) & (kAlignUnit - 1)) == 0);

@ -21,6 +21,7 @@
#include <assert.h>
#include <stdint.h>
#include "util/allocator.h"
#include "util/mutexlock.h"
namespace rocksdb {
@ -76,7 +77,7 @@ class Arena : public Allocator {
size_t BlockSize() const override { return kBlockSize; }
private:
char inline_block_[kInlineSize];
char inline_block_[kInlineSize] __attribute__((__aligned__(sizeof(void*))));
// Number of bytes allocated in one block
const size_t kBlockSize;
// Array of new[] allocated memory blocks

@ -0,0 +1,49 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/concurrent_arena.h"
#include <thread>
#include "port/likely.h"
#include "port/port.h"
#include "util/random.h"
namespace rocksdb {
#if ROCKSDB_SUPPORT_THREAD_LOCAL
__thread uint32_t ConcurrentArena::tls_cpuid = 0;
#endif
ConcurrentArena::ConcurrentArena(size_t block_size, size_t huge_page_size)
: shard_block_size_(block_size / 8), arena_(block_size, huge_page_size) {
// find a power of two >= num_cpus and >= 8
auto num_cpus = std::thread::hardware_concurrency();
index_mask_ = 7;
while (index_mask_ + 1 < num_cpus) {
index_mask_ = index_mask_ * 2 + 1;
}
shards_.reset(new Shard[index_mask_ + 1]);
Fixup();
}
ConcurrentArena::Shard* ConcurrentArena::Repick() {
int cpuid = port::PhysicalCoreID();
if (UNLIKELY(cpuid < 0)) {
// cpu id unavailable, just pick randomly
cpuid = Random::GetTLSInstance()->Uniform(index_mask_ + 1);
}
#if ROCKSDB_SUPPORT_THREAD_LOCAL
// even if we are cpu 0, use a non-zero tls_cpuid so we can tell we
// have repicked
tls_cpuid = cpuid | (index_mask_ + 1);
#endif
return &shards_[cpuid & index_mask_];
}
} // namespace rocksdb

@ -0,0 +1,192 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <memory>
#include <utility>
#include "port/likely.h"
#include "util/allocator.h"
#include "util/arena.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"
namespace rocksdb {
class Logger;
// ConcurrentArena wraps an Arena. It makes it thread safe using a fast
// inlined spinlock, and adds small per-core allocation caches to avoid
// contention for small allocations. To avoid any memory waste from the
// per-core shards, they are kept small, they are lazily instantiated
// only if ConcurrentArena actually notices concurrent use, and they
// adjust their size so that there is no fragmentation waste when the
// shard blocks are allocated from the underlying main arena.
class ConcurrentArena : public Allocator {
public:
// block_size and huge_page_size are the same as for Arena (and are
// in fact just passed to the constructor of arena_. The core-local
// shards compute their shard_block_size as a fraction of block_size
// that varies according to the hardware concurrency level.
explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
size_t huge_page_size = 0);
char* Allocate(size_t bytes) override {
return AllocateImpl(bytes, false /*force_arena*/,
[=]() { return arena_.Allocate(bytes); });
}
char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
Logger* logger = nullptr) override {
size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
(rounded_up % sizeof(void*)) == 0);
return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() {
return arena_.AllocateAligned(rounded_up, huge_page_size, logger);
});
}
size_t ApproximateMemoryUsage() const {
std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
if (index_mask_ != 0) {
lock.lock();
}
return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
}
size_t MemoryAllocatedBytes() const {
return memory_allocated_bytes_.load(std::memory_order_relaxed);
}
size_t AllocatedAndUnused() const {
return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
ShardAllocatedAndUnused();
}
size_t IrregularBlockNum() const {
return irregular_block_num_.load(std::memory_order_relaxed);
}
size_t BlockSize() const override { return arena_.BlockSize(); }
private:
struct Shard {
char padding[40];
mutable SpinMutex mutex;
char* free_begin_;
std::atomic<size_t> allocated_and_unused_;
Shard() : allocated_and_unused_(0) {}
};
#if ROCKSDB_SUPPORT_THREAD_LOCAL
static __thread uint32_t tls_cpuid;
#else
enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 };
#endif
char padding0[56];
size_t shard_block_size_;
// shards_[i & index_mask_] is valid
size_t index_mask_;
std::unique_ptr<Shard[]> shards_;
Arena arena_;
mutable SpinMutex arena_mutex_;
std::atomic<size_t> arena_allocated_and_unused_;
std::atomic<size_t> memory_allocated_bytes_;
std::atomic<size_t> irregular_block_num_;
char padding1[56];
Shard* Repick();
size_t ShardAllocatedAndUnused() const {
size_t total = 0;
for (size_t i = 0; i <= index_mask_; ++i) {
total += shards_[i].allocated_and_unused_.load(std::memory_order_relaxed);
}
return total;
}
template <typename Func>
char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
uint32_t cpu;
// Go directly to the arena if the allocation is too large, or if
// we've never needed to Repick() and the arena mutex is available
// with no waiting. This keeps the fragmentation penalty of
// concurrency zero unless it might actually confer an advantage.
std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
if (bytes > shard_block_size_ / 4 || force_arena ||
((cpu = tls_cpuid) == 0 &&
!shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
arena_lock.try_lock())) {
if (!arena_lock.owns_lock()) {
arena_lock.lock();
}
auto rv = func();
Fixup();
return rv;
}
// pick a shard from which to allocate
Shard* s = &shards_[cpu & index_mask_];
if (!s->mutex.try_lock()) {
s = Repick();
s->mutex.lock();
}
std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
if (avail < bytes) {
// reload
std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
// If the arena's current block is within a factor of 2 of the right
// size, we adjust our request to avoid arena waste.
auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
assert(exact == arena_.AllocatedAndUnused());
avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
? exact
: shard_block_size_;
s->free_begin_ = arena_.AllocateAligned(avail);
Fixup();
}
s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
char* rv;
if ((bytes % sizeof(void*)) == 0) {
// aligned allocation from the beginning
rv = s->free_begin_;
s->free_begin_ += bytes;
} else {
// unaligned from the end
rv = s->free_begin_ + avail - bytes;
}
return rv;
}
void Fixup() {
arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
std::memory_order_relaxed);
memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
std::memory_order_relaxed);
irregular_block_num_.store(arena_.IrregularBlockNum(),
std::memory_order_relaxed);
}
ConcurrentArena(const ConcurrentArena&) = delete;
ConcurrentArena& operator=(const ConcurrentArena&) = delete;
};
} // namespace rocksdb

@ -48,7 +48,7 @@ DynamicBloom::DynamicBloom(uint32_t num_probes,
void DynamicBloom::SetRawData(unsigned char* raw_data, uint32_t total_bits,
uint32_t num_blocks) {
data_ = raw_data;
data_ = reinterpret_cast<std::atomic<uint8_t>*>(raw_data);
kTotalBits = total_bits;
kNumBlocks = num_blocks;
}
@ -69,15 +69,14 @@ void DynamicBloom::SetTotalBits(Allocator* allocator,
sz += CACHE_LINE_SIZE - 1;
}
assert(allocator);
raw_ = reinterpret_cast<unsigned char*>(
allocator->AllocateAligned(sz, huge_page_tlb_size, logger));
memset(raw_, 0, sz);
if (kNumBlocks > 0 && (reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE)) {
data_ = raw_ + CACHE_LINE_SIZE -
reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE;
} else {
data_ = raw_;
char* raw = allocator->AllocateAligned(sz, huge_page_tlb_size, logger);
memset(raw, 0, sz);
auto cache_line_offset = reinterpret_cast<uintptr_t>(raw) % CACHE_LINE_SIZE;
if (kNumBlocks > 0 && cache_line_offset > 0) {
raw += CACHE_LINE_SIZE - cache_line_offset;
}
data_ = reinterpret_cast<std::atomic<uint8_t>*>(raw);
}
} // rocksdb

@ -51,9 +51,15 @@ class DynamicBloom {
// Assuming single threaded access to this function.
void Add(const Slice& key);
// Like Add, but may be called concurrent with other functions.
void AddConcurrently(const Slice& key);
// Assuming single threaded access to this function.
void AddHash(uint32_t hash);
// Like AddHash, but may be called concurrent with other functions.
void AddHashConcurrently(uint32_t hash);
// Multithreaded access to this function is OK
bool MayContain(const Slice& key) const;
@ -81,12 +87,40 @@ class DynamicBloom {
const uint32_t kNumProbes;
uint32_t (*hash_func_)(const Slice& key);
unsigned char* data_;
unsigned char* raw_;
std::atomic<uint8_t>* data_;
// or_func(ptr, mask) should effect *ptr |= mask with the appropriate
// concurrency safety, working with bytes.
template <typename OrFunc>
void AddHash(uint32_t hash, const OrFunc& or_func);
};
inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); }
inline void DynamicBloom::AddConcurrently(const Slice& key) {
AddHashConcurrently(hash_func_(key));
}
inline void DynamicBloom::AddHash(uint32_t hash) {
AddHash(hash, [](std::atomic<uint8_t>* ptr, uint8_t mask) {
ptr->store(ptr->load(std::memory_order_relaxed) | mask,
std::memory_order_relaxed);
});
}
inline void DynamicBloom::AddHashConcurrently(uint32_t hash) {
AddHash(hash, [](std::atomic<uint8_t>* ptr, uint8_t mask) {
// Happens-before between AddHash and MaybeContains is handled by
// access to versions_->LastSequence(), so all we have to do here is
// avoid races (so we don't give the compiler a license to mess up
// our code) and not lose bits. std::memory_order_relaxed is enough
// for that.
if ((mask & ptr->load(std::memory_order_relaxed)) != mask) {
ptr->fetch_or(mask, std::memory_order_relaxed);
}
});
}
inline bool DynamicBloom::MayContain(const Slice& key) const {
return (MayContainHash(hash_func_(key)));
}
@ -107,7 +141,8 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) const {
// Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized
// to a simple and operation by compiler.
const uint32_t bitpos = b + (h % (CACHE_LINE_SIZE * 8));
if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
uint8_t byteval = data_[bitpos / 8].load(std::memory_order_relaxed);
if ((byteval & (1 << (bitpos % 8))) == 0) {
return false;
}
// Rotate h so that we don't reuse the same bytes.
@ -118,7 +153,8 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) const {
} else {
for (uint32_t i = 0; i < kNumProbes; ++i) {
const uint32_t bitpos = h % kTotalBits;
if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
uint8_t byteval = data_[bitpos / 8].load(std::memory_order_relaxed);
if ((byteval & (1 << (bitpos % 8))) == 0) {
return false;
}
h += delta;
@ -127,7 +163,8 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) const {
return true;
}
inline void DynamicBloom::AddHash(uint32_t h) {
template <typename OrFunc>
inline void DynamicBloom::AddHash(uint32_t h, const OrFunc& or_func) {
assert(IsInitialized());
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
if (kNumBlocks != 0) {
@ -136,7 +173,7 @@ inline void DynamicBloom::AddHash(uint32_t h) {
// Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized
// to a simple and operation by compiler.
const uint32_t bitpos = b + (h % (CACHE_LINE_SIZE * 8));
data_[bitpos / 8] |= (1 << (bitpos % 8));
or_func(&data_[bitpos / 8], (1 << (bitpos % 8)));
// Rotate h so that we don't reuse the same bytes.
h = h / (CACHE_LINE_SIZE * 8) +
(h % (CACHE_LINE_SIZE * 8)) * (0x20000000U / CACHE_LINE_SIZE);
@ -145,7 +182,7 @@ inline void DynamicBloom::AddHash(uint32_t h) {
} else {
for (uint32_t i = 0; i < kNumProbes; ++i) {
const uint32_t bitpos = h % kTotalBits;
data_[bitpos / 8] |= (1 << (bitpos % 8));
or_func(&data_[bitpos / 8], (1 << (bitpos % 8)));
h += delta;
}
}

@ -17,6 +17,10 @@ int main() {
#include <inttypes.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <thread>
#include <vector>
#include <gflags/gflags.h>
#include "dynamic_bloom.h"
@ -72,6 +76,25 @@ TEST_F(DynamicBloomTest, Small) {
ASSERT_TRUE(!bloom2.MayContain("foo"));
}
TEST_F(DynamicBloomTest, SmallConcurrentAdd) {
Arena arena;
DynamicBloom bloom1(&arena, 100, 0, 2);
bloom1.AddConcurrently("hello");
bloom1.AddConcurrently("world");
ASSERT_TRUE(bloom1.MayContain("hello"));
ASSERT_TRUE(bloom1.MayContain("world"));
ASSERT_TRUE(!bloom1.MayContain("x"));
ASSERT_TRUE(!bloom1.MayContain("foo"));
DynamicBloom bloom2(&arena, CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2);
bloom2.AddConcurrently("hello");
bloom2.AddConcurrently("world");
ASSERT_TRUE(bloom2.MayContain("hello"));
ASSERT_TRUE(bloom2.MayContain("world"));
ASSERT_TRUE(!bloom2.MayContain("x"));
ASSERT_TRUE(!bloom2.MayContain("foo"));
}
static uint32_t NextNum(uint32_t num) {
if (num < 10) {
num += 1;
@ -93,8 +116,8 @@ TEST_F(DynamicBloomTest, VaryingLengths) {
int good_filters = 0;
uint32_t num_probes = static_cast<uint32_t>(FLAGS_num_probes);
fprintf(stderr, "bits_per_key: %d num_probes: %d\n",
FLAGS_bits_per_key, num_probes);
fprintf(stderr, "bits_per_key: %d num_probes: %d\n", FLAGS_bits_per_key,
num_probes);
for (uint32_t enable_locality = 0; enable_locality < 2; ++enable_locality) {
for (uint32_t num = 1; num <= 10000; num = NextNum(num)) {
@ -114,8 +137,8 @@ TEST_F(DynamicBloomTest, VaryingLengths) {
// All added keys must match
for (uint64_t i = 0; i < num; i++) {
ASSERT_TRUE(bloom.MayContain(Key(i, buffer)))
<< "Num " << num << "; key " << i;
ASSERT_TRUE(bloom.MayContain(Key(i, buffer))) << "Num " << num
<< "; key " << i;
}
// Check false positive rate
@ -139,9 +162,9 @@ TEST_F(DynamicBloomTest, VaryingLengths) {
good_filters++;
}
fprintf(stderr, "Filters: %d good, %d mediocre\n",
good_filters, mediocre_filters);
ASSERT_LE(mediocre_filters, good_filters/5);
fprintf(stderr, "Filters: %d good, %d mediocre\n", good_filters,
mediocre_filters);
ASSERT_LE(mediocre_filters, good_filters / 5);
}
}
@ -161,7 +184,7 @@ TEST_F(DynamicBloomTest, perf) {
DynamicBloom std_bloom(&arena, num_keys * 10, 0, num_probes);
timer.Start();
for (uint32_t i = 1; i <= num_keys; ++i) {
for (uint64_t i = 1; i <= num_keys; ++i) {
std_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
}
@ -171,7 +194,7 @@ TEST_F(DynamicBloomTest, perf) {
uint32_t count = 0;
timer.Start();
for (uint32_t i = 1; i <= num_keys; ++i) {
for (uint64_t i = 1; i <= num_keys; ++i) {
if (std_bloom.MayContain(Slice(reinterpret_cast<const char*>(&i), 8))) {
++count;
}
@ -184,31 +207,125 @@ TEST_F(DynamicBloomTest, perf) {
// Locality enabled version
DynamicBloom blocked_bloom(&arena, num_keys * 10, 1, num_probes);
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
blocked_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
}
elapsed = timer.ElapsedNanos();
fprintf(stderr,
"blocked bloom(enable locality), avg add latency %" PRIu64 "\n",
elapsed / num_keys);
count = 0;
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
if (blocked_bloom.MayContain(
Slice(reinterpret_cast<const char*>(&i), 8))) {
++count;
}
}
elapsed = timer.ElapsedNanos();
fprintf(stderr,
"blocked bloom(enable locality), avg query latency %" PRIu64 "\n",
elapsed / count);
ASSERT_TRUE(count == num_keys);
}
}
TEST_F(DynamicBloomTest, concurrent_with_perf) {
StopWatchNano timer(Env::Default());
uint32_t num_probes = static_cast<uint32_t>(FLAGS_num_probes);
uint32_t m_limit = FLAGS_enable_perf ? 8 : 1;
uint32_t locality_limit = FLAGS_enable_perf ? 1 : 0;
uint32_t num_threads = 4;
std::vector<std::thread> threads;
for (uint32_t m = 1; m <= m_limit; ++m) {
for (uint32_t locality = 0; locality <= locality_limit; ++locality) {
Arena arena;
const uint32_t num_keys = m * 8 * 1024 * 1024;
fprintf(stderr, "testing %" PRIu32 "M keys with %" PRIu32 " locality\n",
m * 8, locality);
DynamicBloom std_bloom(&arena, num_keys * 10, locality, num_probes);
timer.Start();
for (uint32_t i = 1; i <= num_keys; ++i) {
blocked_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
auto adder = [&](size_t t) {
for (uint64_t i = 1 + t; i <= num_keys; i += num_threads) {
std_bloom.AddConcurrently(
Slice(reinterpret_cast<const char*>(&i), 8));
}
};
for (size_t t = 0; t < num_threads; ++t) {
// TSAN currently complains of a race between an allocation
// made bythis race and the eventual shutdown of the thread.
// It is a false positive.
threads.emplace_back(adder, t);
}
while (threads.size() > 0) {
threads.back().join();
threads.pop_back();
}
uint64_t elapsed = timer.ElapsedNanos();
fprintf(stderr, "standard bloom, avg parallel add latency %" PRIu64
" nanos/key\n",
elapsed / num_keys);
timer.Start();
auto hitter = [&](size_t t) {
for (uint64_t i = 1 + t; i <= num_keys; i += num_threads) {
bool f =
std_bloom.MayContain(Slice(reinterpret_cast<const char*>(&i), 8));
ASSERT_TRUE(f);
}
};
for (size_t t = 0; t < num_threads; ++t) {
threads.emplace_back(hitter, t);
}
while (threads.size() > 0) {
threads.back().join();
threads.pop_back();
}
elapsed = timer.ElapsedNanos();
fprintf(stderr,
"blocked bloom(enable locality), avg add latency %" PRIu64 "\n",
fprintf(stderr, "standard bloom, avg parallel hit latency %" PRIu64
" nanos/key\n",
elapsed / num_keys);
count = 0;
timer.Start();
for (uint32_t i = 1; i <= num_keys; ++i) {
if (blocked_bloom.MayContain(
Slice(reinterpret_cast<const char*>(&i), 8))) {
++count;
std::atomic<uint32_t> false_positives(0);
auto misser = [&](size_t t) {
for (uint64_t i = num_keys + 1 + t; i <= 2 * num_keys;
i += num_threads) {
bool f =
std_bloom.MayContain(Slice(reinterpret_cast<const char*>(&i), 8));
if (f) {
++false_positives;
}
}
};
for (size_t t = 0; t < num_threads; ++t) {
threads.emplace_back(misser, t);
}
while (threads.size() > 0) {
threads.back().join();
threads.pop_back();
}
elapsed = timer.ElapsedNanos();
fprintf(stderr,
"blocked bloom(enable locality), avg query latency %" PRIu64 "\n",
elapsed / count);
ASSERT_TRUE(count == num_keys);
fprintf(stderr, "standard bloom, avg parallel miss latency %" PRIu64
" nanos/key, %f%% false positive rate\n",
elapsed / num_keys, false_positives.load() * 100.0 / num_keys);
}
}
}
} // namespace rocksdb

@ -8,6 +8,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <assert.h>
#include <atomic>
#include <mutex>
#include <thread>
#include "port/port.h"
namespace rocksdb {
@ -75,4 +79,39 @@ class WriteLock {
void operator=(const WriteLock&);
};
//
// SpinMutex has very low overhead for low-contention cases. Method names
// are chosen so you can use std::unique_lock or std::lock_guard with it.
//
class SpinMutex {
public:
SpinMutex() : locked_(false) {}
bool try_lock() {
auto currently_locked = locked_.load(std::memory_order_relaxed);
return !currently_locked &&
locked_.compare_exchange_weak(currently_locked, true,
std::memory_order_acquire,
std::memory_order_relaxed);
}
void lock() {
for (size_t tries = 0;; ++tries) {
if (try_lock()) {
// success
break;
}
port::AsmVolatilePause();
if (tries > 100) {
std::this_thread::yield();
}
}
}
void unlock() { locked_.store(false, std::memory_order_release); }
private:
std::atomic<bool> locked_;
};
} // namespace rocksdb

@ -262,6 +262,10 @@ DBOptions::DBOptions()
listeners(),
enable_thread_tracking(false),
delayed_write_rate(2 * 1024U * 1024U),
allow_concurrent_memtable_write(false),
enable_write_thread_adaptive_yield(false),
write_thread_max_yield_usec(100),
write_thread_slow_yield_usec(3),
skip_stats_update_on_db_open(false),
wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords),
row_cache(nullptr),
@ -325,6 +329,11 @@ DBOptions::DBOptions(const Options& options)
listeners(options.listeners),
enable_thread_tracking(options.enable_thread_tracking),
delayed_write_rate(options.delayed_write_rate),
allow_concurrent_memtable_write(options.allow_concurrent_memtable_write),
enable_write_thread_adaptive_yield(
options.enable_write_thread_adaptive_yield),
write_thread_max_yield_usec(options.write_thread_max_yield_usec),
write_thread_slow_yield_usec(options.write_thread_slow_yield_usec),
skip_stats_update_on_db_open(options.skip_stats_update_on_db_open),
wal_recovery_mode(options.wal_recovery_mode),
row_cache(options.row_cache),
@ -435,6 +444,14 @@ void DBOptions::Dump(Logger* log) const {
wal_recovery_mode);
Header(log, " Options.enable_thread_tracking: %d",
enable_thread_tracking);
Header(log, " Options.allow_concurrent_memtable_write: %d",
allow_concurrent_memtable_write);
Header(log, " Options.enable_write_thread_adaptive_yield: %d",
enable_write_thread_adaptive_yield);
Header(log, " Options.write_thread_max_yield_usec: %" PRIu64,
write_thread_max_yield_usec);
Header(log, " Options.write_thread_slow_yield_usec: %" PRIu64,
write_thread_slow_yield_usec);
if (row_cache) {
Header(log, " Options.row_cache: %" PRIu64,
row_cache->GetCapacity());

@ -25,6 +25,8 @@ public:
transform_(transform), lookahead_(lookahead) {
}
virtual bool IsInsertConcurrentlySupported() const override { return true; }
virtual KeyHandle Allocate(const size_t len, char** buf) override {
*buf = skip_list_.AllocateKey(len);
return static_cast<KeyHandle>(*buf);
@ -36,6 +38,10 @@ public:
skip_list_.Insert(static_cast<char*>(handle));
}
virtual void InsertConcurrently(KeyHandle handle) override {
skip_list_.InsertConcurrently(static_cast<char*>(handle));
}
// Returns true iff an entry that compares equal to key is in the list.
virtual bool Contains(const char* key) const override {
return skip_list_.Contains(key);

Loading…
Cancel
Save