Fix mutex unlock issue between scheduled compaction and ReleaseCompactionFiles()

Summary:
NotifyOnCompactionCompleted can unlock the mutex.
That mean that we can schedule a background compaction that will start before we ReleaseCompactionFiles().

Test Plan:
added unittest
existing unittest

Reviewers: yhchiang, sdong

Reviewed By: sdong

Subscribers: yoshinorim, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D58065
main
Islam AbdelRahman 9 years ago
parent 05c5c39a7c
commit c70a9335de
  1. 5
      db/db_impl.cc
  2. 69
      db/db_test2.cc

@ -2159,6 +2159,7 @@ void DBImpl::NotifyOnCompactionCompleted(
} }
// release lock while notifying events // release lock while notifying events
mutex_.Unlock(); mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
{ {
CompactionJobInfo info; CompactionJobInfo info;
info.cf_name = cfd->GetName(); info.cf_name = cfd->GetName();
@ -3267,11 +3268,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
*made_progress = true; *made_progress = true;
} }
if (c != nullptr) { if (c != nullptr) {
c->ReleaseCompactionFiles(status);
*made_progress = true;
NotifyOnCompactionCompleted( NotifyOnCompactionCompleted(
c->column_family_data(), c.get(), status, c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id); compaction_job_stats, job_context->job_id);
c->ReleaseCompactionFiles(status);
*made_progress = true;
} }
// this will unref its input_version and column_family_data // this will unref its input_version and column_family_data
c.reset(); c.reset();

@ -897,6 +897,75 @@ TEST_F(DBTest2, CompressionOptions) {
ASSERT_EQ(listener->max_level_checked, 6); ASSERT_EQ(listener->max_level_checked, 6);
} }
} }
class CompactionStallTestListener : public EventListener {
public:
CompactionStallTestListener() : compacted_files_cnt_(0) {}
void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
ASSERT_EQ(ci.cf_name, "default");
ASSERT_EQ(ci.base_input_level, 0);
ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
compacted_files_cnt_ += ci.input_files.size();
}
std::atomic<size_t> compacted_files_cnt_;
};
TEST_F(DBTest2, CompactionStall) {
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"},
{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"},
{"DBTest2::CompactionStall:2",
"DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 4;
options.max_background_compactions = 40;
CompactionStallTestListener* listener = new CompactionStallTestListener();
options.listeners.emplace_back(listener);
DestroyAndReopen(options);
Random rnd(301);
// 4 Files in L0
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 10; j++) {
ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
}
ASSERT_OK(Flush());
}
// Wait for compaction to be triggered
TEST_SYNC_POINT("DBTest2::CompactionStall:0");
// Clear "DBImpl::BGWorkCompaction" SYNC_POINT since we want to hold it again
// at DBTest2::CompactionStall::1
rocksdb::SyncPoint::GetInstance()->ClearTrace();
// Another 6 L0 files to trigger compaction again
for (int i = 0; i < 6; i++) {
for (int j = 0; j < 10; j++) {
ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
}
ASSERT_OK(Flush());
}
// Wait for another compaction to be triggered
TEST_SYNC_POINT("DBTest2::CompactionStall:1");
// Hold NotifyOnCompactionCompleted in the unlock mutex section
TEST_SYNC_POINT("DBTest2::CompactionStall:2");
dbfull()->TEST_WaitForCompact();
ASSERT_LT(NumTableFilesAtLevel(0),
options.level0_file_num_compaction_trigger);
ASSERT_GT(listener->compacted_files_cnt_.load(),
10 - options.level0_file_num_compaction_trigger);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
TEST_F(DBTest2, FirstSnapshotTest) { TEST_F(DBTest2, FirstSnapshotTest) {

Loading…
Cancel
Save