Allow flushes to run in parallel with manual compaction

Summary: As title. I spent some time thinking about it and I don't think there should be any issue with running manual compaction and flushes in parallel

Test Plan: make check works

Reviewers: rven, yhchiang, sdong

Reviewed By: yhchiang, sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D38355
main
Igor Canadi 10 years ago
parent 74f3832d85
commit b0fdda4ff0
  1. 11
      db/db_impl.cc
  2. 64
      db/db_test.cc

@ -1338,6 +1338,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
} }
} }
if (!s.ok()) { if (!s.ok()) {
@ -1865,9 +1867,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} else if (shutting_down_.load(std::memory_order_acquire)) { } else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions // DB is being deleted; no more background compactions
return; return;
} else if (bg_manual_only_) {
// manual only
return;
} }
while (unscheduled_flushes_ > 0 && while (unscheduled_flushes_ > 0 &&
@ -1877,6 +1876,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this); env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
} }
if (bg_manual_only_) {
// only manual compactions are allowed to run. don't schedule automatic
// compactions
return;
}
if (db_options_.max_background_flushes == 0 && if (db_options_.max_background_flushes == 0 &&
bg_compaction_scheduled_ < db_options_.max_background_compactions && bg_compaction_scheduled_ < db_options_.max_background_compactions &&
unscheduled_flushes_ > 0) { unscheduled_flushes_ > 0) {

@ -12881,6 +12881,70 @@ TEST_F(DBTest, LargeBatchWithColumnFamilies) {
ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
} }
// Make sure that Flushes can proceed in parallel with CompactRange()
TEST_F(DBTest, FlushesInParallelWithCompactRange) {
// iter == 0 -- leveled
// iter == 1 -- leveled, but throw in a flush between two levels compacting
// iter == 2 -- universal
for (int iter = 0; iter < 3; ++iter) {
printf("iter %d\n", iter);
Options options = CurrentOptions();
if (iter < 2) {
options.compaction_style = kCompactionStyleLevel;
} else {
options.compaction_style = kCompactionStyleUniversal;
}
options.write_buffer_size = 110 << 10;
options.level0_file_num_compaction_trigger = 4;
options.num_levels = 4;
options.compression = kNoCompression;
options.max_bytes_for_level_base = 450 << 10;
options.target_file_size_base = 98 << 10;
options.max_write_buffer_number = 2;
DestroyAndReopen(options);
Random rnd(301);
for (int num = 0; num < 14; num++) {
GenerateNewRandomFile(&rnd);
}
if (iter == 1) {
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::RunManualCompaction()::1",
"DBTest::FlushesInParallelWithCompactRange:1"},
{"DBTest::FlushesInParallelWithCompactRange:2",
"DBImpl::RunManualCompaction()::2"}});
} else {
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"CompactionJob::Run():Start",
"DBTest::FlushesInParallelWithCompactRange:1"},
{"DBTest::FlushesInParallelWithCompactRange:2",
"CompactionJob::Run():End"}});
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::vector<std::thread> threads;
threads.emplace_back([&]() { Compact("a", "z"); });
TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1");
// this has to start a flush. if flushes are blocked, this will try to
// create
// 3 memtables, and that will fail because max_write_buffer_number is 2
for (int num = 0; num < 3; num++) {
GenerateNewRandomFile(&rnd, /* nowait */ true);
}
TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:2");
for (auto& t : threads) {
t.join();
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save