From a3960fc875233308976351f185b672e8f01296ec Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 29 Oct 2019 18:15:08 -0700 Subject: [PATCH] Move pipeline write waiting logic into WaitForPendingWrites() (#5716) Summary: In pipelined write mode, switching a memtable must wait for in-flight memtable writes to finish, so that no inserts are still going into a memtable when it is made immutable. This wait is currently done in DBImpl::SwitchMemtable(), which runs after flush_scheduler_.TakeNextColumnFamily() has been called to fetch the list of column families to switch. However, flush_scheduler_.TakeNextColumnFamily() is itself not thread-safe when called concurrently with flush_scheduler_.ScheduleFlush(). This change fixes that by moving the waiting logic to before flush_scheduler_.TakeNextColumnFamily() is called. WaitForPendingWrites() is a natural place for that logic. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5716 Test Plan: Run all tests with ASAN and TSAN. Differential Revision: D18217658 fbshipit-source-id: b9c5e765c9989645bf10afda7c5c726c3f82f6c3 --- db/db_impl/db_impl.h | 12 ++++++++++++ db/db_impl/db_impl_write.cc | 10 ---------- tools/db_crashtest.py | 5 ++--- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index d3bf37b7d..8f1b6e88a 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1409,10 +1409,22 @@ class DBImpl : public DB { bool resuming_from_bg_err); inline void WaitForPendingWrites() { + mutex_.AssertHeld(); + // In case of pipelined write is enabled, wait for all pending memtable + // writers. + if (immutable_db_options_.enable_pipelined_write) { + // Memtable writers may call DB::Get in case max_successive_merges > 0, + // which may lock mutex. Unlocking mutex here to avoid deadlock.
+ mutex_.Unlock(); + write_thread_.WaitForMemTableWriters(); + mutex_.Lock(); + } + if (!immutable_db_options_.unordered_write) { // Then the writes are finished before the next write group starts return; } + // Wait for the ones who already wrote to the WAL to finish their // memtable write. if (pending_memtable_writes_.load() != 0) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 2f6d35d17..4ab9de8c4 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1592,16 +1592,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { return s; } - // In case of pipelined write is enabled, wait for all pending memtable - // writers. - if (immutable_db_options_.enable_pipelined_write) { - // Memtable writers may call DB::Get in case max_successive_merges > 0, - // which may lock mutex. Unlocking mutex here to avoid deadlock. - mutex_.Unlock(); - write_thread_.WaitForMemTableWriters(); - mutex_.Lock(); - } - // Attempt to switch to a new memtable and trigger flush of old. // Do this without holding the dbmutex lock. assert(versions_->prev_log_number() == 0); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 54c3fb9ac..dfc6f760c 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -37,8 +37,7 @@ default_params = { "delpercent": 4, "delrangepercent": 1, "destroy_db_initially": 0, - # Temporarily disable it until its concurrency issue are fixed - "enable_pipelined_write": 0, + "enable_pipelined_write": lambda: random.randint(0, 1), "expected_values_path": expected_values_file.name, "flush_one_in": 1000000, # Temporarily disable hash index @@ -147,7 +146,7 @@ cf_consistency_params = { # more frequently "write_buffer_size": 1024 * 1024, # disable pipelined write when test_atomic_flush is true - "enable_pipelined_write": 0, + "enable_pipelined_write": lambda: random.randint(0, 1), }