Manual flush with `wait=false` should not stall when writes stopped (#10001)

Summary:
When `FlushOptions::wait` is set to false, manual flush should not stall forever.

If the database has already stopped writes, then the thread calling `DB::Flush()` with
`FlushOptions::wait=false` should not enter the `DBImpl::write_thread_`.

To prevent this, we should do a check at the beginning and return `TryAgain()`

Resolves: https://github.com/facebook/rocksdb/issues/9892

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10001

Reviewed By: siying

Differential Revision: D36422303

Pulled By: siying

fbshipit-source-id: 723bd3065e8edc4f17c82449d0d6b95a2381ac0a
main
Yanqin Jin 2 years ago committed by Facebook GitHub Bot
parent f007ad8b4f
commit edda219fc3
  1. 1
      HISTORY.md
  2. 38
      db/db_flush_test.cc
  3. 4
      db/db_impl/db_impl.cc
  4. 5
      db/db_impl/db_impl.h
  5. 32
      db/db_impl/db_impl_compaction_flush.cc
  6. 3
      db/db_impl/db_impl_write.cc

@ -8,6 +8,7 @@
* Fixed a regression in iterator performance when the entire DB is a single memtable introduced in #10449. The fix is in #10705 and #10716.
* Fixed an optimistic transaction validation bug caused by DBImpl::GetLatestSequenceForKey() returning non-latest seq for merge (#10724).
* Fixed a bug in iterator refresh which could segfault for DeleteRange users (#10739).
* Fixed a bug causing manual flush with `flush_opts.wait=false` to stall when database has stopped all writes (#10001).
### Performance Improvements
* Try to align the compaction output file boundaries to the next level ones, which can reduce more than 10% compaction load for the default level compaction. The feature is enabled by default, to disable, set `AdvancedColumnFamilyOptions.level_compaction_dynamic_file_size` to false. As a side effect, it can create SSTs larger than the target_file_size (capped at 2x target_file_size) or smaller files.

@ -3032,6 +3032,44 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.atomic_flush = GetParam();
options.max_write_buffer_number = 2;
options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
Reopen(options);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::DelayWrite:Start",
"DBAtomicFlushTest::NoWaitWhenWritesStopped:0"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->PauseBackgroundWork());
for (int i = 0; i < options.max_write_buffer_number; ++i) {
ASSERT_OK(Put("k" + std::to_string(i), "v" + std::to_string(i)));
}
std::thread stalled_writer([&]() { ASSERT_OK(Put("k", "v")); });
TEST_SYNC_POINT("DBAtomicFlushTest::NoWaitWhenWritesStopped:0");
{
FlushOptions flush_opts;
flush_opts.wait = false;
flush_opts.allow_write_stall = true;
ASSERT_TRUE(db_->Flush(flush_opts).IsTryAgain());
}
ASSERT_OK(dbfull()->ContinueBackgroundWork());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
stalled_writer.join();
SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());

@ -5281,7 +5281,7 @@ Status DBImpl::IngestExternalFiles(
mutex_.Unlock();
status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
FlushReason::kExternalFileIngestion,
true /* writes_stopped */);
true /* entered_write_thread */);
mutex_.Lock();
} else {
for (size_t i = 0; i != num_cfs; ++i) {
@ -5292,7 +5292,7 @@ Status DBImpl::IngestExternalFiles(
->cfd();
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
true /* writes_stopped */);
true /* entered_write_thread */);
mutex_.Lock();
if (!status.ok()) {
break;

@ -1880,12 +1880,13 @@ class DBImpl : public DB {
// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
FlushReason flush_reason, bool writes_stopped = false);
FlushReason flush_reason,
bool entered_write_thread = false);
Status AtomicFlushMemTables(
const autovector<ColumnFamilyData*>& column_family_datas,
const FlushOptions& options, FlushReason flush_reason,
bool writes_stopped = false);
bool entered_write_thread = false);
// Wait until flushing this column family won't stall writes
Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,

@ -1055,10 +1055,10 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
false /* writes_stopped */);
false /* entered_write_thread */);
} else {
s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
false /* writes_stopped*/);
false /* entered_write_thread */);
}
if (!s.ok()) {
LogFlush(immutable_db_options_.info_log);
@ -2016,9 +2016,16 @@ void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options,
FlushReason flush_reason, bool writes_stopped) {
FlushReason flush_reason,
bool entered_write_thread) {
// This method should not be called if atomic_flush is true.
assert(!immutable_db_options_.atomic_flush);
if (!flush_options.wait && write_controller_.IsStopped()) {
std::ostringstream oss;
oss << "Writes have been stopped, thus unable to perform manual flush. "
"Please try again later after writes are resumed";
return Status::TryAgain(oss.str());
}
Status s;
if (!flush_options.allow_write_stall) {
bool flush_needed = true;
@ -2029,6 +2036,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}
const bool needs_to_join_write_thread = !entered_write_thread;
autovector<FlushRequest> flush_reqs;
autovector<uint64_t> memtable_ids_to_wait;
{
@ -2037,7 +2045,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
WriteThread::Writer w;
WriteThread::Writer nonmem_w;
if (!writes_stopped) {
if (needs_to_join_write_thread) {
write_thread_.EnterUnbatched(&w, &mutex_);
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
@ -2120,7 +2128,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
MaybeScheduleFlushOrCompaction();
}
if (!writes_stopped) {
if (needs_to_join_write_thread) {
write_thread_.ExitUnbatched(&w);
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
@ -2156,7 +2164,14 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
Status DBImpl::AtomicFlushMemTables(
const autovector<ColumnFamilyData*>& column_family_datas,
const FlushOptions& flush_options, FlushReason flush_reason,
bool writes_stopped) {
bool entered_write_thread) {
assert(immutable_db_options_.atomic_flush);
if (!flush_options.wait && write_controller_.IsStopped()) {
std::ostringstream oss;
oss << "Writes have been stopped, thus unable to perform manual flush. "
"Please try again later after writes are resumed";
return Status::TryAgain(oss.str());
}
Status s;
if (!flush_options.allow_write_stall) {
int num_cfs_to_flush = 0;
@ -2173,6 +2188,7 @@ Status DBImpl::AtomicFlushMemTables(
return s;
}
}
const bool needs_to_join_write_thread = !entered_write_thread;
FlushRequest flush_req;
autovector<ColumnFamilyData*> cfds;
{
@ -2181,7 +2197,7 @@ Status DBImpl::AtomicFlushMemTables(
WriteThread::Writer w;
WriteThread::Writer nonmem_w;
if (!writes_stopped) {
if (needs_to_join_write_thread) {
write_thread_.EnterUnbatched(&w, &mutex_);
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
@ -2229,7 +2245,7 @@ Status DBImpl::AtomicFlushMemTables(
MaybeScheduleFlushOrCompaction();
}
if (!writes_stopped) {
if (needs_to_join_write_thread) {
write_thread_.ExitUnbatched(&w);
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);

@ -1760,6 +1760,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
&time_delayed);
uint64_t delay =
write_controller_.GetDelay(immutable_db_options_.clock, num_bytes);
TEST_SYNC_POINT("DBImpl::DelayWrite:Start");
if (delay > 0) {
if (write_options.no_slowdown) {
return Status::Incomplete("Write stall");
@ -1769,8 +1770,8 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
// Notify write_thread_ about the stall so it can setup a barrier and
// fail any pending writers with no_slowdown
write_thread_.BeginWriteStall();
TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
// We will delay the write until we have slept for `delay` microseconds
// or we don't need a delay anymore. We check for cancellation every 1ms
// (slightly longer because WriteController minimum delay is 1ms, in

Loading…
Cancel
Save