Avoiding write stall caused by manual flushes (#4297)

Summary:
Basically at the moment it seems it's possible to cause write stall by calling flush (either manually vis DB::Flush(), or from Backup Engine directly calling FlushMemTable() while background flush may be already happening.

One of the ways to fix it is that in DBImpl::CompactRange() we already check for possible stall and delay flush if needed before we actually proceed to call FlushMemTable(). We can simply move this delay logic to separate method and call it from FlushMemTable.

This is draft patch, for first look; need to check tests/update SyncPoints and most certainly would need to add allow_write_stall method to FlushOptions().
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4297

Differential Revision: D9420705

Pulled By: mikhail-antonov

fbshipit-source-id: f81d206b55e1d7b39e4dc64242fdfbceeea03fcc
main
Mikhail Antonov 6 years ago committed by Facebook Github Bot
parent 5f63a89b35
commit 927f274939
  1. 3
      HISTORY.md
  2. 26
      db/db_compaction_test.cc
  3. 1
      db/db_flush_test.cc
  4. 6
      db/db_impl.h
  5. 127
      db/db_impl_compaction_flush.cc
  6. 4
      db/db_impl_debug.cc
  7. 16
      db/db_test.cc
  8. 4
      db/listener_test.cc
  9. 9
      include/rocksdb/options.h
  10. 30
      utilities/transactions/transaction_test.cc

@ -2,6 +2,7 @@
## Unreleased ## Unreleased
### Public API Change ### Public API Change
* `OnTableFileCreated` will now be called for empty files generated during compaction. In that case, `TableFileCreationInfo::file_path` will be "(nil)" and `TableFileCreationInfo::file_size` will be zero. * `OnTableFileCreated` will now be called for empty files generated during compaction. In that case, `TableFileCreationInfo::file_path` will be "(nil)" and `TableFileCreationInfo::file_size` will be zero.
* Add `FlushOptions::allow_write_stall`, which controls whether Flush calls start working immediately, even if it causes user writes to stall, or will wait until flush can be performed without causing write stall (similar to `CompactRangeOptions::allow_write_stall`). Note that the default value is false, meaning we add delay to Flush calls until stalling can be avoided when possible. This is behavior change compared to previous RocksDB versions, where Flush calls didn't check if they might cause stall or not.
### New Features ### New Features
### Bug Fixes ### Bug Fixes
@ -10,7 +11,7 @@
## 5.16.0 (8/21/2018) ## 5.16.0 (8/21/2018)
### Public API Change ### Public API Change
* The merge operands are passed to `MergeOperator::ShouldMerge` in the reversed order relative to how they were merged (passed to FullMerge or FullMergeV2) for performance reasons * The merge operands are passed to `MergeOperator::ShouldMerge` in the reversed order relative to how they were merged (passed to FullMerge or FullMergeV2) for performance reasons
* GetAllKeyVersions() to take an extra argument of `max_num_ikeys`. * GetAllKeyVersions() to take an extra argument of `max_num_ikeys`.
* Using ZSTD dictionary trainer (i.e., setting `CompressionOptions::zstd_max_train_bytes` to a nonzero value) now requires ZSTD version 1.1.3 or later. * Using ZSTD dictionary trainer (i.e., setting `CompressionOptions::zstd_max_train_bytes` to a nonzero value) now requires ZSTD version 1.1.3 or later.

@ -3485,12 +3485,13 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
// ensure the auto compaction doesn't finish until manual compaction has // ensure the auto compaction doesn't finish until manual compaction has
// had a chance to be delayed. // had a chance to be delayed.
rocksdb::SyncPoint::GetInstance()->LoadDependency( rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait", "CompactionJob::Run():End"}}); {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"CompactionJob::Run():End"}});
} else { } else {
// ensure the auto-compaction doesn't finish until manual compaction has // ensure the auto-compaction doesn't finish until manual compaction has
// continued without delay. // continued without delay.
rocksdb::SyncPoint::GetInstance()->LoadDependency( rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}}); {{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
} }
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -3538,12 +3539,13 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
// ensure the flush doesn't finish until manual compaction has had a // ensure the flush doesn't finish until manual compaction has had a
// chance to be delayed. // chance to be delayed.
rocksdb::SyncPoint::GetInstance()->LoadDependency( rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait", "FlushJob::WriteLevel0Table"}}); {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"FlushJob::WriteLevel0Table"}});
} else { } else {
// ensure the flush doesn't finish until manual compaction has continued // ensure the flush doesn't finish until manual compaction has continued
// without delay. // without delay.
rocksdb::SyncPoint::GetInstance()->LoadDependency( rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWaitDone", {{"DBImpl::FlushMemTable:StallWaitDone",
"FlushJob::WriteLevel0Table"}}); "FlushJob::WriteLevel0Table"}});
} }
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -3553,6 +3555,7 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024))); ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
FlushOptions flush_opts; FlushOptions flush_opts;
flush_opts.wait = false; flush_opts.wait = false;
flush_opts.allow_write_stall = true;
dbfull()->Flush(flush_opts); dbfull()->Flush(flush_opts);
} }
@ -3588,7 +3591,7 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
// The auto-compaction waits until the manual compaction finishes to ensure // The auto-compaction waits until the manual compaction finishes to ensure
// the signal comes from closing CF/DB, not from compaction making progress. // the signal comes from closing CF/DB, not from compaction making progress.
rocksdb::SyncPoint::GetInstance()->LoadDependency( rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait", {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"}, "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
{"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual", {"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
"CompactionJob::Run():End"}}); "CompactionJob::Run():End"}});
@ -3639,18 +3642,21 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
// began. So it unblocks CompactRange and precludes its flush. Throughout the // began. So it unblocks CompactRange and precludes its flush. Throughout the
// test, stall conditions are upheld via high L0 file count. // test, stall conditions are upheld via high L0 file count.
rocksdb::SyncPoint::GetInstance()->LoadDependency( rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait", {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"}, "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
{"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush", {"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
"DBImpl::CompactRange:StallWaitDone"}, "DBImpl::FlushMemTable:StallWaitDone"},
{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}}); {"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
//used for the delayable flushes
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
for (int i = 0; i < kNumL0FilesLimit - 1; ++i) { for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
for (int j = 0; j < 2; ++j) { for (int j = 0; j < 2; ++j) {
ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024))); ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
} }
Flush(); dbfull()->Flush(flush_opts);
} }
auto manual_compaction_thread = port::Thread([this]() { auto manual_compaction_thread = port::Thread([this]() {
CompactRangeOptions cro; CompactRangeOptions cro;
@ -3660,7 +3666,7 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"); TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
Put(ToString(0), RandomString(&rnd, 1024)); Put(ToString(0), RandomString(&rnd, 1024));
Flush(); dbfull()->Flush(flush_opts);
Put(ToString(0), RandomString(&rnd, 1024)); Put(ToString(0), RandomString(&rnd, 1024));
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush"); TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
manual_compaction_thread.join(); manual_compaction_thread.join();

@ -35,6 +35,7 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
Reopen(options); Reopen(options);
FlushOptions no_wait; FlushOptions no_wait;
no_wait.wait = false; no_wait.wait = false;
no_wait.allow_write_stall=true;
SyncPoint::GetInstance()->LoadDependency( SyncPoint::GetInstance()->LoadDependency(
{{"VersionSet::LogAndApply:WriteManifest", {{"VersionSet::LogAndApply:WriteManifest",

@ -396,7 +396,7 @@ class DBImpl : public DB {
Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr); Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
// Force current memtable contents to be flushed. // Force current memtable contents to be flushed.
Status TEST_FlushMemTable(bool wait = true, Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
ColumnFamilyHandle* cfh = nullptr); ColumnFamilyHandle* cfh = nullptr);
// Wait for memtable compaction // Wait for memtable compaction
@ -946,6 +946,10 @@ class DBImpl : public DB {
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
FlushReason flush_reason, bool writes_stopped = false); FlushReason flush_reason, bool writes_stopped = false);
// Wait until flushing this column family won't stall writes
Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
bool* flush_needed);
// Wait for memtable flushed. // Wait for memtable flushed.
// If flush_memtable_id is non-null, wait until the memtable with the ID // If flush_memtable_id is non-null, wait until the memtable with the ID
// gets flush. Otherwise, wait until the column family don't have any // gets flush. Otherwise, wait until the column family don't have any

@ -344,60 +344,12 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
CleanupSuperVersion(super_version); CleanupSuperVersion(super_version);
} }
if (!options.allow_write_stall && flush_needed) {
InstrumentedMutexLock l(&mutex_);
uint64_t orig_active_memtable_id = cfd->mem()->GetID();
WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
do {
if (write_stall_condition != WriteStallCondition::kNormal) {
TEST_SYNC_POINT("DBImpl::CompactRange:StallWait");
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] CompactRange waiting on stall conditions to clear",
cfd->GetName().c_str());
bg_cv_.Wait();
}
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
uint64_t earliest_memtable_id =
std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
if (earliest_memtable_id > orig_active_memtable_id) {
// We waited so long that the memtable we were originally waiting on was
// flushed.
flush_needed = false;
break;
}
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
const auto* vstorage = cfd->current()->storage_info();
// Skip stalling check if we're below auto-flush and auto-compaction
// triggers. If it stalled in these conditions, that'd mean the stall
// triggers are so low that stalling is needed for any background work. In
// that case we shouldn't wait since background work won't be scheduled.
if (cfd->imm()->NumNotFlushed() <
cfd->ioptions()->min_write_buffer_number_to_merge &&
vstorage->l0_delay_trigger_count() <
mutable_cf_options.level0_file_num_compaction_trigger) {
break;
}
// check whether one extra immutable memtable or an extra L0 file would
// cause write stalling mode to be entered. It could still enter stall
// mode due to pending compaction bytes, but that's less common
write_stall_condition =
ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->imm()->NumNotFlushed() + 1,
vstorage->l0_delay_trigger_count() + 1,
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
.first;
} while (write_stall_condition != WriteStallCondition::kNormal);
}
TEST_SYNC_POINT("DBImpl::CompactRange:StallWaitDone");
Status s; Status s;
if (flush_needed) { if (flush_needed) {
s = FlushMemTable(cfd, FlushOptions(), FlushReason::kManualCompaction); FlushOptions fo;
fo.allow_write_stall = options.allow_write_stall;
s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
false /* writes_stopped*/);
if (!s.ok()) { if (!s.ok()) {
LogFlush(immutable_db_options_.info_log); LogFlush(immutable_db_options_.info_log);
return s; return s;
@ -1097,6 +1049,14 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
FlushReason flush_reason, bool writes_stopped) { FlushReason flush_reason, bool writes_stopped) {
Status s; Status s;
uint64_t flush_memtable_id = 0; uint64_t flush_memtable_id = 0;
if (!flush_options.allow_write_stall) {
bool flush_needed = true;
s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
if (!s.ok() || !flush_needed) {
return s;
}
}
FlushRequest flush_req; FlushRequest flush_req;
{ {
WriteContext context; WriteContext context;
@ -1141,6 +1101,69 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
return s; return s;
} }
// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
// cause write stall, for example if one memtable is being flushed already.
// This method tries to avoid write stall (similar to CompactRange() behavior)
// it emulates how the SuperVersion / LSM would change if flush happens, checks
// it against various constrains and delays flush if it'd cause write stall.
// Called should check status and flush_needed to see if flush already happened.
Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
bool* flush_needed) {
{
*flush_needed = true;
InstrumentedMutexLock l(&mutex_);
uint64_t orig_active_memtable_id = cfd->mem()->GetID();
WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
do {
if (write_stall_condition != WriteStallCondition::kNormal) {
TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] WaitUntilFlushWouldNotStallWrites"
" waiting on stall conditions to clear",
cfd->GetName().c_str());
bg_cv_.Wait();
}
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
uint64_t earliest_memtable_id =
std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
if (earliest_memtable_id > orig_active_memtable_id) {
// We waited so long that the memtable we were originally waiting on was
// flushed.
*flush_needed = false;
return Status::OK();
}
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
const auto* vstorage = cfd->current()->storage_info();
// Skip stalling check if we're below auto-flush and auto-compaction
// triggers. If it stalled in these conditions, that'd mean the stall
// triggers are so low that stalling is needed for any background work. In
// that case we shouldn't wait since background work won't be scheduled.
if (cfd->imm()->NumNotFlushed() <
cfd->ioptions()->min_write_buffer_number_to_merge &&
vstorage->l0_delay_trigger_count() <
mutable_cf_options.level0_file_num_compaction_trigger) {
break;
}
// check whether one extra immutable memtable or an extra L0 file would
// cause write stalling mode to be entered. It could still enter stall
// mode due to pending compaction bytes, but that's less common
write_stall_condition =
ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->imm()->NumNotFlushed() + 1,
vstorage->l0_delay_trigger_count() + 1,
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
.first;
} while (write_stall_condition != WriteStallCondition::kNormal);
}
return Status::OK();
}
// Wait for memtables to be flushed for multiple column families. // Wait for memtables to be flushed for multiple column families.
// let N = cfds.size() // let N = cfds.size()
// for i in [0, N), // for i in [0, N),

@ -100,9 +100,11 @@ Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {
return SwitchMemtable(cfd, &write_context); return SwitchMemtable(cfd, &write_context);
} }
Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) { Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
ColumnFamilyHandle* cfh) {
FlushOptions fo; FlushOptions fo;
fo.wait = wait; fo.wait = wait;
fo.allow_write_stall = allow_write_stall;
ColumnFamilyData* cfd; ColumnFamilyData* cfd;
if (cfh == nullptr) { if (cfh == nullptr) {
cfd = default_cf_handle_->cfd(); cfd = default_cf_handle_->cfd();

@ -4331,7 +4331,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
// Clean up memtable and L0. Block compaction threads. If continue to write // Clean up memtable and L0. Block compaction threads. If continue to write
// and flush memtables. We should see put stop after 8 memtable flushes // and flush memtables. We should see put stop after 8 memtable flushes
// since level0_stop_writes_trigger = 8 // since level0_stop_writes_trigger = 8
dbfull()->TEST_FlushMemTable(true); dbfull()->TEST_FlushMemTable(true, true);
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Block compaction // Block compaction
test::SleepingBackgroundTask sleeping_task_low; test::SleepingBackgroundTask sleeping_task_low;
@ -4344,7 +4344,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
WriteOptions wo; WriteOptions wo;
while (count < 64) { while (count < 64) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo)); ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
dbfull()->TEST_FlushMemTable(true); dbfull()->TEST_FlushMemTable(true, true);
count++; count++;
if (dbfull()->TEST_write_controler().IsStopped()) { if (dbfull()->TEST_write_controler().IsStopped()) {
sleeping_task_low.WakeUp(); sleeping_task_low.WakeUp();
@ -4372,7 +4372,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
count = 0; count = 0;
while (count < 64) { while (count < 64) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo)); ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
dbfull()->TEST_FlushMemTable(true); dbfull()->TEST_FlushMemTable(true, true);
count++; count++;
if (dbfull()->TEST_write_controler().IsStopped()) { if (dbfull()->TEST_write_controler().IsStopped()) {
sleeping_task_low.WakeUp(); sleeping_task_low.WakeUp();
@ -5512,7 +5512,7 @@ TEST_F(DBTest, SoftLimit) {
for (int i = 0; i < 72; i++) { for (int i = 0; i < 72; i++) {
Put(Key(i), std::string(5000, 'x')); Put(Key(i), std::string(5000, 'x'));
if (i % 10 == 0) { if (i % 10 == 0) {
Flush(); dbfull()->TEST_FlushMemTable(true, true);
} }
} }
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -5522,7 +5522,7 @@ TEST_F(DBTest, SoftLimit) {
for (int i = 0; i < 72; i++) { for (int i = 0; i < 72; i++) {
Put(Key(i), std::string(5000, 'x')); Put(Key(i), std::string(5000, 'x'));
if (i % 10 == 0) { if (i % 10 == 0) {
Flush(); dbfull()->TEST_FlushMemTable(true, true);
} }
} }
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -5541,7 +5541,7 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(i), std::string(5000, 'x')); Put(Key(i), std::string(5000, 'x'));
Put(Key(100 - i), std::string(5000, 'x')); Put(Key(100 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB. // Flush the file. File size is around 30KB.
Flush(); dbfull()->TEST_FlushMemTable(true, true);
} }
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
@ -5576,7 +5576,7 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(10 + i), std::string(5000, 'x')); Put(Key(10 + i), std::string(5000, 'x'));
Put(Key(90 - i), std::string(5000, 'x')); Put(Key(90 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB. // Flush the file. File size is around 30KB.
Flush(); dbfull()->TEST_FlushMemTable(true, true);
} }
// Wake up sleep task to enable compaction to run and waits // Wake up sleep task to enable compaction to run and waits
@ -5597,7 +5597,7 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(20 + i), std::string(5000, 'x')); Put(Key(20 + i), std::string(5000, 'x'));
Put(Key(80 - i), std::string(5000, 'x')); Put(Key(80 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB. // Flush the file. File size is around 30KB.
Flush(); dbfull()->TEST_FlushMemTable(true, true);
} }
// Wake up sleep task to enable compaction to run and waits // Wake up sleep task to enable compaction to run and waits
// for it to go to sleep state again to make sure one compaction // for it to go to sleep state again to make sure one compaction

@ -417,7 +417,9 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10; for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
++i) { ++i) {
Put(1, ToString(i), std::string(10000, 'x'), WriteOptions()); Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
db_->Flush(FlushOptions(), handles_[1]); FlushOptions fo;
fo.allow_write_stall = true;
db_->Flush(fo, handles_[1]);
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
} }
ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9); ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);

@ -1183,8 +1183,13 @@ struct FlushOptions {
// If true, the flush will wait until the flush is done. // If true, the flush will wait until the flush is done.
// Default: true // Default: true
bool wait; bool wait;
// If true, the flush would proceed immediately even it means writes will
FlushOptions() : wait(true) {} // stall for the duration of the flush; if false the operation will wait
// until it's possible to do flush w/o causing stall or until required flush
// is performed by someone else (foreground call or background thread).
// Default: false
bool allow_write_stall;
FlushOptions() : wait(true), allow_write_stall(false) {}
}; };
// Create a Logger from provided DBOptions // Create a Logger from provided DBOptions

@ -1719,7 +1719,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
} }
// flush only cfa memtable // flush only cfa memtable
s = db_impl->TEST_FlushMemTable(true, cfa); s = db_impl->TEST_FlushMemTable(true, false, cfa);
ASSERT_OK(s); ASSERT_OK(s);
switch (txn_db_options.write_policy) { switch (txn_db_options.write_policy) {
@ -1738,7 +1738,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
} }
// flush only cfb memtable // flush only cfb memtable
s = db_impl->TEST_FlushMemTable(true, cfb); s = db_impl->TEST_FlushMemTable(true, false, cfb);
ASSERT_OK(s); ASSERT_OK(s);
// should show not dependency on logs // should show not dependency on logs
@ -3788,7 +3788,7 @@ TEST_P(TransactionTest, SavepointTest3) {
s = txn1->Put("A", ""); s = txn1->Put("A", "");
ASSERT_OK(s); ASSERT_OK(s);
s = txn1->PopSavePoint(); // Still no SavePoint present s = txn1->PopSavePoint(); // Still no SavePoint present
ASSERT_TRUE(s.IsNotFound()); ASSERT_TRUE(s.IsNotFound());
@ -3798,21 +3798,21 @@ TEST_P(TransactionTest, SavepointTest3) {
ASSERT_OK(s); ASSERT_OK(s);
s = txn1->PopSavePoint(); // Remove 1 s = txn1->PopSavePoint(); // Remove 1
ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound()); ASSERT_TRUE(txn1->RollbackToSavePoint().IsNotFound());
// Verify that "A" is still locked // Verify that "A" is still locked
Transaction* txn2 = db->BeginTransaction(write_options, txn_options); Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn2); ASSERT_TRUE(txn2);
s = txn2->Put("A", "a2"); s = txn2->Put("A", "a2");
ASSERT_TRUE(s.IsTimedOut()); ASSERT_TRUE(s.IsTimedOut());
delete txn2; delete txn2;
txn1->SetSavePoint(); // 2 txn1->SetSavePoint(); // 2
s = txn1->Put("B", "b"); s = txn1->Put("B", "b");
ASSERT_OK(s); ASSERT_OK(s);
txn1->SetSavePoint(); // 3 txn1->SetSavePoint(); // 3
s = txn1->Put("B", "b2"); s = txn1->Put("B", "b2");
@ -3822,7 +3822,7 @@ TEST_P(TransactionTest, SavepointTest3) {
s = txn1->PopSavePoint(); s = txn1->PopSavePoint();
ASSERT_OK(s); ASSERT_OK(s);
s = txn1->PopSavePoint(); s = txn1->PopSavePoint();
ASSERT_TRUE(s.IsNotFound()); ASSERT_TRUE(s.IsNotFound());
@ -3836,12 +3836,12 @@ TEST_P(TransactionTest, SavepointTest3) {
s = db->Get(read_options, "A", &value); s = db->Get(read_options, "A", &value);
ASSERT_OK(s); ASSERT_OK(s);
ASSERT_EQ("a", value); ASSERT_EQ("a", value);
// tnx1 should have set "B" to just "b" // tnx1 should have set "B" to just "b"
s = db->Get(read_options, "B", &value); s = db->Get(read_options, "B", &value);
ASSERT_OK(s); ASSERT_OK(s);
ASSERT_EQ("b", value); ASSERT_EQ("b", value);
s = db->Get(read_options, "C", &value); s = db->Get(read_options, "C", &value);
ASSERT_TRUE(s.IsNotFound()); ASSERT_TRUE(s.IsNotFound());
} }
@ -5513,7 +5513,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
db->FlushWAL(true); db->FlushWAL(true);
// Flush only cf 1 // Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB()) reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]); ->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles)); ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid"); txn0 = db->GetTransactionByName("xid");
@ -5551,7 +5551,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true)); ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1 // Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB()) reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]); ->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles)); ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid"); txn0 = db->GetTransactionByName("xid");
@ -5584,7 +5584,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true)); ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1 // Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB()) reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]); ->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles)); ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid"); txn0 = db->GetTransactionByName("xid");
@ -5611,7 +5611,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true)); ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1 // Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB()) reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]); ->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles)); ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid"); txn0 = db->GetTransactionByName("xid");
@ -5638,7 +5638,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true)); ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1 // Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB()) reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]); ->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash(); reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles)); ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid"); txn0 = db->GetTransactionByName("xid");

Loading…
Cancel
Save