diff --git a/HISTORY.md b/HISTORY.md index 93f9f1d7f..804673199 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Fixed a regression in iterator performance when the entire DB is a single memtable introduced in #10449. The fix is in #10705 and #10716. * Fixed an optimistic transaction validation bug caused by DBImpl::GetLatestSequenceForKey() returning non-latest seq for merge (#10724). * Fixed a bug in iterator refresh which could segfault for DeleteRange users (#10739). +* Fixed a bug causing manual flush with `flush_opts.wait=false` to stall when database has stopped all writes (#10001). ### Performance Improvements * Try to align the compaction output file boundaries to the next level ones, which can reduce more than 10% compaction load for the default level compaction. The feature is enabled by default, to disable, set `AdvancedColumnFamilyOptions.level_compaction_dynamic_file_size` to false. As a side effect, it can create SSTs larger than the target_file_size (capped at 2x target_file_size) or smaller files. diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index cb1788179..724bf3246 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -3032,6 +3032,44 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) { SyncPoint::GetInstance()->ClearAllCallBacks(); } +TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.atomic_flush = GetParam(); + options.max_write_buffer_number = 2; + options.memtable_factory.reset(test::NewSpecialSkipListFactory(1)); + + Reopen(options); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::DelayWrite:Start", + "DBAtomicFlushTest::NoWaitWhenWritesStopped:0"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(dbfull()->PauseBackgroundWork()); + for (int i = 0; i < options.max_write_buffer_number; ++i) { + ASSERT_OK(Put("k" + std::to_string(i), "v" + std::to_string(i))); + } + std::thread stalled_writer([&]() { ASSERT_OK(Put("k", "v")); }); + + TEST_SYNC_POINT("DBAtomicFlushTest::NoWaitWhenWritesStopped:0"); + + { + FlushOptions flush_opts; + flush_opts.wait = false; + flush_opts.allow_write_stall = true; + ASSERT_TRUE(db_->Flush(flush_opts).IsTryAgain()); + } + + ASSERT_OK(dbfull()->ContinueBackgroundWork()); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + + stalled_writer.join(); + + SyncPoint::GetInstance()->DisableProcessing(); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index a88dac2de..56cc6cde1 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -5281,7 +5281,7 @@ Status DBImpl::IngestExternalFiles( mutex_.Unlock(); status = AtomicFlushMemTables(cfds_to_flush, flush_opts, FlushReason::kExternalFileIngestion, - true /* writes_stopped */); + true /* entered_write_thread */); mutex_.Lock(); } else { for (size_t i = 0; i != num_cfs; ++i) { @@ -5292,7 +5292,7 @@ Status DBImpl::IngestExternalFiles( ->cfd(); status = FlushMemTable(cfd, flush_opts, FlushReason::kExternalFileIngestion, - true /* writes_stopped */); + true /* entered_write_thread */); mutex_.Lock(); if (!status.ok()) { break; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 7047bb58a..dfc701b30 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1880,12 +1880,13 @@ class DBImpl : public DB { // Force current memtable contents to be flushed. Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, - FlushReason flush_reason, bool writes_stopped = false); + FlushReason flush_reason, + bool entered_write_thread = false); Status AtomicFlushMemTables( const autovector& column_family_datas, const FlushOptions& options, FlushReason flush_reason, - bool writes_stopped = false); + bool entered_write_thread = false); // Wait until flushing this column family won't stall writes Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd, diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index b6882df06..7e7305a54 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1055,10 +1055,10 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, SelectColumnFamiliesForAtomicFlush(&cfds); mutex_.Unlock(); s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction, - false /* writes_stopped */); + false /* entered_write_thread */); } else { s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction, - false /* writes_stopped*/); + false /* entered_write_thread */); } if (!s.ok()) { LogFlush(immutable_db_options_.info_log); @@ -2016,9 +2016,16 @@ void DBImpl::GenerateFlushRequest(const autovector& cfds, Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& flush_options, - FlushReason flush_reason, bool writes_stopped) { + FlushReason flush_reason, + bool entered_write_thread) { // This method should not be called if atomic_flush is true. assert(!immutable_db_options_.atomic_flush); + if (!flush_options.wait && write_controller_.IsStopped()) { + std::ostringstream oss; + oss << "Writes have been stopped, thus unable to perform manual flush. " + "Please try again later after writes are resumed"; + return Status::TryAgain(oss.str()); + } Status s; if (!flush_options.allow_write_stall) { bool flush_needed = true; @@ -2029,6 +2036,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } } + const bool needs_to_join_write_thread = !entered_write_thread; autovector flush_reqs; autovector memtable_ids_to_wait; { @@ -2037,7 +2045,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, WriteThread::Writer w; WriteThread::Writer nonmem_w; - if (!writes_stopped) { + if (needs_to_join_write_thread) { write_thread_.EnterUnbatched(&w, &mutex_); if (two_write_queues_) { nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); @@ -2120,7 +2128,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, MaybeScheduleFlushOrCompaction(); } - if (!writes_stopped) { + if (needs_to_join_write_thread) { write_thread_.ExitUnbatched(&w); if (two_write_queues_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); @@ -2156,7 +2164,14 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, Status DBImpl::AtomicFlushMemTables( const autovector& column_family_datas, const FlushOptions& flush_options, FlushReason flush_reason, - bool writes_stopped) { + bool entered_write_thread) { + assert(immutable_db_options_.atomic_flush); + if (!flush_options.wait && write_controller_.IsStopped()) { + std::ostringstream oss; + oss << "Writes have been stopped, thus unable to perform manual flush. " + "Please try again later after writes are resumed"; + return Status::TryAgain(oss.str()); + } Status s; if (!flush_options.allow_write_stall) { int num_cfs_to_flush = 0; @@ -2173,6 +2188,7 @@ Status DBImpl::AtomicFlushMemTables( return s; } } + const bool needs_to_join_write_thread = !entered_write_thread; FlushRequest flush_req; autovector cfds; { @@ -2181,7 +2197,7 @@ Status DBImpl::AtomicFlushMemTables( WriteThread::Writer w; WriteThread::Writer nonmem_w; - if (!writes_stopped) { + if (needs_to_join_write_thread) { write_thread_.EnterUnbatched(&w, &mutex_); if (two_write_queues_) { nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); @@ -2229,7 +2245,7 @@ Status DBImpl::AtomicFlushMemTables( MaybeScheduleFlushOrCompaction(); } - if (!writes_stopped) { + if (needs_to_join_write_thread) { write_thread_.ExitUnbatched(&w); if (two_write_queues_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index e69a83b49..fc7495cf0 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1760,6 +1760,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, &time_delayed); uint64_t delay = write_controller_.GetDelay(immutable_db_options_.clock, num_bytes); + TEST_SYNC_POINT("DBImpl::DelayWrite:Start"); if (delay > 0) { if (write_options.no_slowdown) { return Status::Incomplete("Write stall"); @@ -1769,8 +1770,8 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, // Notify write_thread_ about the stall so it can setup a barrier and // fail any pending writers with no_slowdown write_thread_.BeginWriteStall(); - TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone"); mutex_.Unlock(); + TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone"); // We will delay the write until we have slept for `delay` microseconds // or we don't need a delay anymore. We check for cancellation every 1ms // (slightly longer because WriteController minimum delay is 1ms, in