diff --git a/db/corruption_test.cc b/db/corruption_test.cc index f835edc68..ccea4cb3f 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -324,6 +324,7 @@ TEST(CorruptionTest, CompactionInputErrorParanoid) { Build(10); dbi->TEST_FlushMemTable(); + dbi->TEST_WaitForCompact(); ASSERT_EQ(1, Property("rocksdb.num-files-at-level0")); Corrupt(kTableFile, 100, 1); diff --git a/db/db_impl.cc b/db/db_impl.cc index f3e3b4d32..7ce02a752 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1436,24 +1436,25 @@ void DBImpl::BGWorkCompaction(void* db) { reinterpret_cast(db)->BackgroundCallCompaction(); } -Status DBImpl::BackgroundFlush() { +Status DBImpl::BackgroundFlush(bool* madeProgress) { Status stat; while (stat.ok() && imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", options_.max_background_flushes - bg_flush_scheduled_); - stat = FlushMemTableToOutputFile(); + stat = FlushMemTableToOutputFile(madeProgress); } return stat; } void DBImpl::BackgroundCallFlush() { + bool madeProgress = false; assert(bg_flush_scheduled_); MutexLock l(&mutex_); if (!shutting_down_.Acquire_Load()) { - Status s = BackgroundFlush(); + Status s = BackgroundFlush(&madeProgress); if (!s.ok()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to @@ -1469,7 +1470,9 @@ void DBImpl::BackgroundCallFlush() { } bg_flush_scheduled_--; - + if (madeProgress) { + MaybeScheduleFlushOrCompaction(); + } bg_cv_.SignalAll(); } diff --git a/db/db_impl.h b/db/db_impl.h index ef2ef5944..ca9c755a3 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -186,7 +186,7 @@ class DBImpl : public DB { void BackgroundCallCompaction(); void BackgroundCallFlush(); Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state); - Status BackgroundFlush(); + Status BackgroundFlush(bool* madeProgress); void CleanupCompaction(CompactionState* compact); Status DoCompactionWork(CompactionState* compact); diff --git a/db/db_test.cc b/db/db_test.cc index 340590cdd..59113c7df 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1850,6 +1850,8 @@ TEST(DBTest, UniversalCompactionSizeAmplification) { // but will instead trigger size amplification. dbfull()->Flush(FlushOptions()); + dbfull()->TEST_WaitForCompact(); + // Verify that size amplification did occur ASSERT_EQ(NumTableFilesAtLevel(0), 1); } diff --git a/db/version_set.cc b/db/version_set.cc index ebd138046..62309d336 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1645,7 +1645,12 @@ void VersionSet::Finalize(Version* v, double max_score = 0; int max_score_level = 0; - for (int level = 0; level < NumberLevels()-1; level++) { + int num_levels_to_check = + (options_->compaction_style != kCompactionStyleUniversal) ? + NumberLevels() - 1 : 1; + + for (int level = 0; level < num_levels_to_check; level++) { + double score; if (level == 0) { // We treat level-0 specially by bounding the number of files diff --git a/db/version_set.h b/db/version_set.h index f95a89a4f..8617e0491 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -312,7 +312,17 @@ class VersionSet { // Returns true iff some level needs a compaction because it has // exceeded its target size. bool NeedsSizeCompaction() const { - for (int i = 0; i < NumberLevels()-1; i++) { + // In universal compaction case, this check doesn't really + // check the compaction condition, but checks num of files threshold + // only. We are not going to miss any compaction opportunity + // but it's likely that more compactions are scheduled but + // ending up with nothing to do. We can improve it later. + // TODO: improve this function to be accurate for universal + // compactions. + int num_levels_to_check = + (options_->compaction_style != kCompactionStyleUniversal) ? + NumberLevels() - 1 : 1; + for (int i = 0; i < num_levels_to_check; i++) { if (current_->compaction_score_[i] >= 1) { return true; }