Move pipeline write waiting logic into WaitForPendingWrites() (#5716)

Summary:
In pipelined write mode, memtable switching needs to wait for in-flight memtable writes to finish, so that no inserts are still going to a memtable at the moment it is made immutable. This wait is currently done in DBImpl::SwitchMemtable(), which runs after flush_scheduler_.TakeNextColumnFamily() has been called to fetch the list of column families to switch. However, flush_scheduler_.TakeNextColumnFamily() is itself not thread-safe when called concurrently with flush_scheduler_.ScheduleFlush().
This change fixes the problem by moving the waiting logic so that it runs before flush_scheduler_.TakeNextColumnFamily() is called. WaitForPendingWrites() is a natural place for this logic.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5716

Test Plan: Run all tests with ASAN and TSAN.

Differential Revision: D18217658

fbshipit-source-id: b9c5e765c9989645bf10afda7c5c726c3f82f6c3
main
sdong 5 years ago committed by Facebook Github Bot
parent f22aaf8b3f
commit a3960fc875
  1. 12
      db/db_impl/db_impl.h
  2. 10
      db/db_impl/db_impl_write.cc
  3. 5
      tools/db_crashtest.py

@ -1409,10 +1409,22 @@ class DBImpl : public DB {
bool resuming_from_bg_err); bool resuming_from_bg_err);
inline void WaitForPendingWrites() { inline void WaitForPendingWrites() {
mutex_.AssertHeld();
// In case of pipelined write is enabled, wait for all pending memtable
// writers.
if (immutable_db_options_.enable_pipelined_write) {
// Memtable writers may call DB::Get in case max_successive_merges > 0,
// which may lock mutex. Unlocking mutex here to avoid deadlock.
mutex_.Unlock();
write_thread_.WaitForMemTableWriters();
mutex_.Lock();
}
if (!immutable_db_options_.unordered_write) { if (!immutable_db_options_.unordered_write) {
// Then the writes are finished before the next write group starts // Then the writes are finished before the next write group starts
return; return;
} }
// Wait for the ones who already wrote to the WAL to finish their // Wait for the ones who already wrote to the WAL to finish their
// memtable write. // memtable write.
if (pending_memtable_writes_.load() != 0) { if (pending_memtable_writes_.load() != 0) {

@ -1592,16 +1592,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
return s; return s;
} }
// In case of pipelined write is enabled, wait for all pending memtable
// writers.
if (immutable_db_options_.enable_pipelined_write) {
// Memtable writers may call DB::Get in case max_successive_merges > 0,
// which may lock mutex. Unlocking mutex here to avoid deadlock.
mutex_.Unlock();
write_thread_.WaitForMemTableWriters();
mutex_.Lock();
}
// Attempt to switch to a new memtable and trigger flush of old. // Attempt to switch to a new memtable and trigger flush of old.
// Do this without holding the dbmutex lock. // Do this without holding the dbmutex lock.
assert(versions_->prev_log_number() == 0); assert(versions_->prev_log_number() == 0);

@ -37,8 +37,7 @@ default_params = {
"delpercent": 4, "delpercent": 4,
"delrangepercent": 1, "delrangepercent": 1,
"destroy_db_initially": 0, "destroy_db_initially": 0,
# Temporarily disable it until its concurrency issue are fixed "enable_pipelined_write": lambda: random.randint(0, 1),
"enable_pipelined_write": 0,
"expected_values_path": expected_values_file.name, "expected_values_path": expected_values_file.name,
"flush_one_in": 1000000, "flush_one_in": 1000000,
# Temporarily disable hash index # Temporarily disable hash index
@ -147,7 +146,7 @@ cf_consistency_params = {
# more frequently # more frequently
"write_buffer_size": 1024 * 1024, "write_buffer_size": 1024 * 1024,
# disable pipelined write when test_atomic_flush is true # disable pipelined write when test_atomic_flush is true
"enable_pipelined_write": 0, "enable_pipelined_write": lambda: random.randint(0, 1),
} }

Loading…
Cancel
Save