From 073cbfc8f080a09c539ed16ff3b945bf20e0009f Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Wed, 16 Oct 2013 13:32:53 -0700 Subject: [PATCH] Enable background flush thread by default and fix issues related to it Summary: Enable background flush thread in this patch and fix unit tests with: (1) After background flush, schedule a background compaction if condition satisfied; (2) Fix a bug that if universal compaction is enabled and number of levels are set to be 0, compaction will not be automatically triggered (3) Fix unit tests to wait for compaction to finish instead of flush, before checking the compaction results. Test Plan: pass all unit tests Reviewers: haobo, xjin, dhruba Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D13461 --- db/corruption_test.cc | 1 + db/db_impl.cc | 11 +++++++---- db/db_impl.h | 2 +- db/db_test.cc | 2 ++ db/version_set.cc | 7 ++++++- db/version_set.h | 12 +++++++++++- 6 files changed, 28 insertions(+), 7 deletions(-) diff --git a/db/corruption_test.cc b/db/corruption_test.cc index f835edc68..ccea4cb3f 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -324,6 +324,7 @@ TEST(CorruptionTest, CompactionInputErrorParanoid) { Build(10); dbi->TEST_FlushMemTable(); + dbi->TEST_WaitForCompact(); ASSERT_EQ(1, Property("rocksdb.num-files-at-level0")); Corrupt(kTableFile, 100, 1); diff --git a/db/db_impl.cc b/db/db_impl.cc index f3e3b4d32..7ce02a752 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1436,24 +1436,25 @@ void DBImpl::BGWorkCompaction(void* db) { reinterpret_cast(db)->BackgroundCallCompaction(); } -Status DBImpl::BackgroundFlush() { +Status DBImpl::BackgroundFlush(bool* madeProgress) { Status stat; while (stat.ok() && imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", options_.max_background_flushes - bg_flush_scheduled_); - stat = FlushMemTableToOutputFile(); + stat = FlushMemTableToOutputFile(madeProgress); } return stat; } void DBImpl::BackgroundCallFlush() { + bool madeProgress = false; assert(bg_flush_scheduled_); MutexLock l(&mutex_); if (!shutting_down_.Acquire_Load()) { - Status s = BackgroundFlush(); + Status s = BackgroundFlush(&madeProgress); if (!s.ok()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to @@ -1469,7 +1470,9 @@ void DBImpl::BackgroundCallFlush() { } bg_flush_scheduled_--; - + if (madeProgress) { + MaybeScheduleFlushOrCompaction(); + } bg_cv_.SignalAll(); } diff --git a/db/db_impl.h b/db/db_impl.h index ef2ef5944..ca9c755a3 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -186,7 +186,7 @@ class DBImpl : public DB { void BackgroundCallCompaction(); void BackgroundCallFlush(); Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state); - Status BackgroundFlush(); + Status BackgroundFlush(bool* madeProgress); void CleanupCompaction(CompactionState* compact); Status DoCompactionWork(CompactionState* compact); diff --git a/db/db_test.cc b/db/db_test.cc index 340590cdd..59113c7df 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1850,6 +1850,8 @@ TEST(DBTest, UniversalCompactionSizeAmplification) { // but will instead trigger size amplification. dbfull()->Flush(FlushOptions()); + dbfull()->TEST_WaitForCompact(); + // Verify that size amplification did occur ASSERT_EQ(NumTableFilesAtLevel(0), 1); } diff --git a/db/version_set.cc b/db/version_set.cc index ebd138046..62309d336 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1645,7 +1645,12 @@ void VersionSet::Finalize(Version* v, double max_score = 0; int max_score_level = 0; - for (int level = 0; level < NumberLevels()-1; level++) { + int num_levels_to_check = + (options_->compaction_style != kCompactionStyleUniversal) ? + NumberLevels() - 1 : 1; + + for (int level = 0; level < num_levels_to_check; level++) { + double score; if (level == 0) { // We treat level-0 specially by bounding the number of files diff --git a/db/version_set.h b/db/version_set.h index f95a89a4f..8617e0491 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -312,7 +312,17 @@ class VersionSet { // Returns true iff some level needs a compaction because it has // exceeded its target size. bool NeedsSizeCompaction() const { - for (int i = 0; i < NumberLevels()-1; i++) { + // In universal compaction case, this check doesn't really + // check the compaction condition, but checks num of files threshold + // only. We are not going to miss any compaction opportunity + // but it's likely that more compactions are scheduled but + // ending up with nothing to do. We can improve it later. + // TODO: improve this function to be accurate for universal + // compactions. + int num_levels_to_check = + (options_->compaction_style != kCompactionStyleUniversal) ? + NumberLevels() - 1 : 1; + for (int i = 0; i < num_levels_to_check; i++) { if (current_->compaction_score_[i] >= 1) { return true; }