diff --git a/HISTORY.md b/HISTORY.md index 72a7fd588..388c101ac 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,6 +10,7 @@ * Fixed a bug in iterator refresh which could segfault for DeleteRange users (#10739). * Fixed a bug causing manual flush with `flush_opts.wait=false` to stall when database has stopped all writes (#10001). * Fixed a bug in iterator refresh that was not freeing up SuperVersion, which could cause excessive resource pinniung (#10770). +* Fixed a bug where RocksDB could be doing compaction endlessly when allow_ingest_behind is true and the bottommost level is not filled (#10767). ### Performance Improvements * Try to align the compaction output file boundaries to the next level ones, which can reduce more than 10% compaction load for the default level compaction. The feature is enabled by default, to disable, set `AdvancedColumnFamilyOptions.level_compaction_dynamic_file_size` to false. As a side effect, it can create SSTs larger than the target_file_size (capped at 2x target_file_size) or smaller files. diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 15a7ce9a6..0594825d6 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -8194,6 +8194,42 @@ TEST_F(DBCompactionTest, BottomPriCompactionCountsTowardConcurrencyLimit) { compact_range_thread.join(); } +TEST_F(DBCompactionTest, BottommostFileCompactionAllowIngestBehind) { + // allow_ingest_behind prevents seqnum zeroing, and could cause + // compaction loop with reason kBottommostFiles. + Options options = CurrentOptions(); + options.env = env_; + options.compaction_style = kCompactionStyleLevel; + options.allow_ingest_behind = true; + options.comparator = BytewiseComparator(); + DestroyAndReopen(options); + + WriteOptions write_opts; + ASSERT_OK(db_->Put(write_opts, "infinite", "compaction loop")); + ASSERT_OK(db_->Put(write_opts, "infinite", "loop")); + + ASSERT_OK(Flush()); + MoveFilesToLevel(1); + ASSERT_OK(db_->Put(write_opts, "bumpseqnum", "")); + ASSERT_OK(Flush()); + auto snapshot = db_->GetSnapshot(); + // Bump up oldest_snapshot_seqnum_ in VersionStorageInfo. + db_->ReleaseSnapshot(snapshot); + bool compacted = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "LevelCompactionPicker::PickCompaction:Return", [&](void* /* arg */) { + // There should not be a compaction. + compacted = true; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + // Wait for compaction to be scheduled. + env_->SleepForMicroseconds(2000000); + ASSERT_FALSE(compacted); + // The following assert can be used to check for compaction loop: + // it used to wait forever before the fix. + // ASSERT_OK(dbfull()->TEST_WaitForCompact(true /* wait_unscheduled */)); +} + #endif // !defined(ROCKSDB_LITE) } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 56cc6cde1..5aa4b2a3d 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3674,14 +3674,16 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { if (oldest_snapshot > bottommost_files_mark_threshold_) { CfdList cf_scheduled; for (auto* cfd : *versions_->GetColumnFamilySet()) { - cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot); - if (!cfd->current() - ->storage_info() - ->BottommostFilesMarkedForCompaction() - .empty()) { - SchedulePendingCompaction(cfd); - MaybeScheduleFlushOrCompaction(); - cf_scheduled.push_back(cfd); + if (!cfd->ioptions()->allow_ingest_behind) { + cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot); + if (!cfd->current() + ->storage_info() + ->BottommostFilesMarkedForCompaction() + .empty()) { + SchedulePendingCompaction(cfd); + MaybeScheduleFlushOrCompaction(); + cf_scheduled.push_back(cfd); + } } } @@ -3690,7 +3692,8 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { // mutex might be unlocked during the loop, making the result inaccurate. SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber; for (auto* cfd : *versions_->GetColumnFamilySet()) { - if (CfdListContains(cf_scheduled, cfd)) { + if (CfdListContains(cf_scheduled, cfd) || + cfd->ioptions()->allow_ingest_behind) { continue; } new_bottommost_files_mark_threshold = std::min( diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 7e7305a54..aed66f291 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -3772,9 +3772,11 @@ void DBImpl::InstallSuperVersionAndScheduleWork( // triggered soon anyway. bottommost_files_mark_threshold_ = kMaxSequenceNumber; for (auto* my_cfd : *versions_->GetColumnFamilySet()) { - bottommost_files_mark_threshold_ = std::min( - bottommost_files_mark_threshold_, - my_cfd->current()->storage_info()->bottommost_files_mark_threshold()); + if (!my_cfd->ioptions()->allow_ingest_behind) { + bottommost_files_mark_threshold_ = std::min( + bottommost_files_mark_threshold_, + my_cfd->current()->storage_info()->bottommost_files_mark_threshold()); + } } // Whenever we install new SuperVersion, we might need to issue new flushes or diff --git a/db/version_set.cc b/db/version_set.cc index ae8a30a28..009c917a0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2915,7 +2915,9 @@ void VersionStorageInfo::PrepareForVersionAppend( GenerateFileIndexer(); GenerateLevelFilesBrief(); GenerateLevel0NonOverlapping(); - GenerateBottommostFiles(); + if (!immutable_options.allow_ingest_behind) { + GenerateBottommostFiles(); + } GenerateFileLocationIndex(); } @@ -3355,7 +3357,9 @@ void VersionStorageInfo::ComputeCompactionScore( } } ComputeFilesMarkedForCompaction(); - ComputeBottommostFilesMarkedForCompaction(); + if (!immutable_options.allow_ingest_behind) { + ComputeBottommostFilesMarkedForCompaction(); + } if (mutable_cf_options.ttl > 0) { ComputeExpiredTtlFiles(immutable_options, mutable_cf_options.ttl); }