Add signalall after removing item from manual_compaction deque

Summary:
When there are waiting manual compactions, we need to signal
them after removing the current manual compaction from the deque.

Test Plan: ColumnFamilytTest.SameCFManualManualCommaction

Reviewers: anthony, IslamAbdelRahman, kradhakrishnan, sdong

Reviewed By: sdong

Subscribers: dhruba, yoshinorim

Differential Revision: https://reviews.facebook.net/D52119
main
Venkatesh Radhakrishnan 9 years ago
parent d72b31774e
commit 7b12ae97d4
  1. 98
      db/column_family_test.cc
  2. 9
      db/db_impl.cc
  3. 2
      db/db_impl.h

@ -1216,6 +1216,104 @@ TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) {
Close(); Close();
} }
TEST_F(ColumnFamilyTest, SameCFManualManualCompactions) {
Open();
CreateColumnFamilies({"one"});
ColumnFamilyOptions default_cf, one;
db_options_.max_open_files = 20; // only 10 files in file cache
db_options_.disableDataSync = true;
db_options_.max_background_compactions = 3;
default_cf.compaction_style = kCompactionStyleLevel;
default_cf.num_levels = 3;
default_cf.write_buffer_size = 64 << 10; // 64KB
default_cf.target_file_size_base = 30 << 10;
default_cf.source_compaction_factor = 100;
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleUniversal;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.write_buffer_size = 120000;
Reopen({default_cf, one});
// SETUP column family "one" -- universal style
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(i + 1), 1);
}
bool cf_1_1 = true;
bool cf_1_2 = true;
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:2"},
{"ColumnFamilyTest::ManualManual:4", "ColumnFamilyTest::ManualManual:5"},
{"ColumnFamilyTest::ManualManual:1", "ColumnFamilyTest::ManualManual:2"},
{"ColumnFamilyTest::ManualManual:1",
"ColumnFamilyTest::ManualManual:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (cf_1_1) {
TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:4");
cf_1_1 = false;
TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:3");
} else if (cf_1_2) {
TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:2");
cf_1_2 = false;
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] {
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = true;
ASSERT_OK(
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
});
TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:5");
WaitForFlush(1);
// Add more L0 files and force another manual compaction
for (int i = 0; i < one.level0_file_num_compaction_trigger - 2; ++i) {
PutRandomData(1, 10, 12000, true);
PutRandomData(1, 1, 10, true);
WaitForFlush(1);
AssertFilesPerLevel(ToString(one.level0_file_num_compaction_trigger + i),
1);
}
std::thread threads1([&] {
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false;
ASSERT_OK(
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
});
TEST_SYNC_POINT("ColumnFamilyTest::ManualManual:1");
threads.join();
threads1.join();
WaitForCompaction();
// VERIFY compaction "one"
ASSERT_LE(NumTableFilesAtLevel(0, 1), 2);
// Compare against saved keys
std::set<std::string>::iterator key_iter = keys_.begin();
while (key_iter != keys_.end()) {
ASSERT_NE("NOT_FOUND", Get(1, *key_iter));
key_iter++;
}
Close();
}
TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) { TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) {
Open(); Open();
CreateColumnFamilies({"one"}); CreateColumnFamilies({"one"});

@ -2279,7 +2279,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
while (!manual.done) { while (!manual.done) {
assert(HasPendingManualCompaction()); assert(HasPendingManualCompaction());
manual_conflict = false; manual_conflict = false;
if (ShouldRunManualCompaction(&manual) || (manual.in_progress == true) || if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
scheduled || scheduled ||
((manual.manual_end = &manual.tmp_storage1)&&( ((manual.manual_end = &manual.tmp_storage1)&&(
(manual.compaction = manual.cfd->CompactRange( (manual.compaction = manual.cfd->CompactRange(
@ -2318,6 +2318,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
assert(!manual.in_progress); assert(!manual.in_progress);
assert(HasPendingManualCompaction()); assert(HasPendingManualCompaction());
RemoveManualCompaction(&manual); RemoveManualCompaction(&manual);
bg_cv_.SignalAll();
return manual.status; return manual.status;
} }
@ -3057,9 +3058,9 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
return; return;
} }
bool DBImpl::ShouldRunManualCompaction(ManualCompaction* m) { bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
if ((m->exclusive) && (bg_compaction_scheduled_ > 0)) { if (m->exclusive) {
return true; return (bg_compaction_scheduled_ > 0);
} }
std::deque<ManualCompaction*>::iterator it = std::deque<ManualCompaction*>::iterator it =
manual_compaction_dequeue_.begin(); manual_compaction_dequeue_.begin();

@ -897,7 +897,7 @@ class DBImpl : public DB {
bool HasExclusiveManualCompaction(); bool HasExclusiveManualCompaction();
void AddManualCompaction(ManualCompaction* m); void AddManualCompaction(ManualCompaction* m);
void RemoveManualCompaction(ManualCompaction* m); void RemoveManualCompaction(ManualCompaction* m);
bool ShouldRunManualCompaction(ManualCompaction* m); bool ShouldntRunManualCompaction(ManualCompaction* m);
bool HaveManualCompaction(ColumnFamilyData* cfd); bool HaveManualCompaction(ColumnFamilyData* cfd);
bool MCOverlap(ManualCompaction* m, ManualCompaction* m1); bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
}; };

Loading…
Cancel
Save