diff --git a/HISTORY.md b/HISTORY.md index 80aa22263..c0d351550 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -15,6 +15,7 @@ * Fixed a bug in LockWAL() leading to re-locking mutex (#11020). * Fixed a heap use after free bug in async scan prefetching when the scan thread and another thread try to read and load the same seek block into cache. * Fixed a heap use after free in async scan prefetching if dictionary compression is enabled, in which case sync read of the compression dictionary gets mixed with async prefetching +* Fixed a data race bug where `CompactRange()` under `change_level=true` acts on an overlapping range with an ongoing file ingestion for level compaction. This will either result in overlapping file ranges corruption at a certain level caught by `force_consistency_checks=true` or potentially two same keys both with seqno 0 in two different levels (i.e., new data ends up in lower/older level). The latter will be caught by assertion in debug build but go silently and result in reads returning wrong results in release build. This fix is general so it also replaced previous fixes to a similar problem for `CompactFiles()` (#4665), general `CompactRange()` and auto compaction (commit 5c64fb6 and 87dfc1d). ### New Features * When an SstPartitionerFactory is configured, CompactRange() now automatically selects for compaction any files overlapping a partition boundary that is in the compaction range, even if no actual entries are in the requested compaction range. With this feature, manual compaction can be used to (re-)establish SST partition points when SstPartitioner changes, without a full compaction. 
diff --git a/db/column_family.cc b/db/column_family.cc index d9875336c..8124b23cd 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1218,6 +1218,7 @@ Compaction* ColumnFamilyData::CompactRange( if (result != nullptr) { result->SetInputVersion(current_); } + TEST_SYNC_POINT("ColumnFamilyData::CompactRange:Return"); return result; } diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 3d6d334db..894754589 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -235,12 +235,19 @@ Compaction::Compaction( inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))), grandparents_(std::move(_grandparents)), score_(_score), - bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)), + bottommost_level_( + // For simplicity, we don't support the concept of "bottommost level" + // with + // `CompactionReason::kExternalSstIngestion` and + // `CompactionReason::kRefitLevel` + (_compaction_reason == CompactionReason::kExternalSstIngestion || + _compaction_reason == CompactionReason::kRefitLevel) + ? false + : IsBottommostLevel(output_level_, vstorage, inputs_)), is_full_compaction_(IsFullCompaction(vstorage, inputs_)), is_manual_compaction_(_manual_compaction), trim_ts_(_trim_ts), is_trivial_move_(false), - compaction_reason_(_compaction_reason), notify_on_compaction_completion_(false), enable_blob_garbage_collection_( @@ -255,8 +262,15 @@ Compaction::Compaction( _blob_garbage_collection_age_cutoff > 1 ? 
mutable_cf_options()->blob_garbage_collection_age_cutoff : _blob_garbage_collection_age_cutoff), - penultimate_level_(EvaluatePenultimateLevel( - vstorage, immutable_options_, start_level_, output_level_)) { + penultimate_level_( + // For simplicity, we don't support the concept of "penultimate level" + // with `CompactionReason::kExternalSstIngestion` and + // `CompactionReason::kRefitLevel` + _compaction_reason == CompactionReason::kExternalSstIngestion || + _compaction_reason == CompactionReason::kRefitLevel + ? Compaction::kInvalidLevel + : EvaluatePenultimateLevel(vstorage, immutable_options_, + start_level_, output_level_)) { MarkFilesBeingCompacted(true); if (is_manual_compaction_) { compaction_reason_ = CompactionReason::kManualCompaction; diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index a30b21195..0f1dde327 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -99,6 +99,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) { return "ForcedBlobGC"; case CompactionReason::kRoundRobinTtl: return "RoundRobinTtl"; + case CompactionReason::kRefitLevel: + return "RefitLevel"; case CompactionReason::kNumOfReasons: // fall through default: diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc index de2570eee..5fe058b56 100644 --- a/db/compaction/compaction_picker.cc +++ b/db/compaction/compaction_picker.cc @@ -1126,7 +1126,11 @@ void CompactionPicker::RegisterCompaction(Compaction* c) { c->output_level() == 0 || !FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level(), c->GetPenultimateLevel())); - if (c->start_level() == 0 || + // CompactionReason::kExternalSstIngestion's start level is just a placeholder + // number without actual meaning as file ingestion technically does not have + // an input level like other compactions + if ((c->start_level() == 0 && + c->compaction_reason() != CompactionReason::kExternalSstIngestion) || 
ioptions_.compaction_style == kCompactionStyleUniversal) { level0_compactions_in_progress_.insert(c); } diff --git a/db/compaction/compaction_picker_level.cc b/db/compaction/compaction_picker_level.cc index 31987fc52..2162d30a3 100644 --- a/db/compaction/compaction_picker_level.cc +++ b/db/compaction/compaction_picker_level.cc @@ -447,21 +447,21 @@ bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() { compaction_inputs_.push_back(output_level_inputs_); } + // In some edge cases we could pick a compaction that will be compacting + // a key range that overlap with another running compaction, and both + // of them have the same output level. This could happen if + // (1) we are running a non-exclusive manual compaction + // (2) AddFile ingest a new file into the LSM tree + // We need to disallow this from happening. + if (compaction_picker_->FilesRangeOverlapWithCompaction( + compaction_inputs_, output_level_, + Compaction::EvaluatePenultimateLevel( + vstorage_, ioptions_, start_level_, output_level_))) { + // This compaction output could potentially conflict with the output + // of a currently running compaction, we cannot run it. + return false; + } if (!is_l0_trivial_move_) { - // In some edge cases we could pick a compaction that will be compacting - // a key range that overlap with another running compaction, and both - // of them have the same output level. This could happen if - // (1) we are running a non-exclusive manual compaction - // (2) AddFile ingest a new file into the LSM tree - // We need to disallow this from happening. - if (compaction_picker_->FilesRangeOverlapWithCompaction( - compaction_inputs_, output_level_, - Compaction::EvaluatePenultimateLevel( - vstorage_, ioptions_, start_level_, output_level_))) { - // This compaction output could potentially conflict with the output - // of a currently running compaction, we cannot run it. 
- return false; - } compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_, output_level_inputs_, &grandparents_); } diff --git a/db/db_bloom_filter_test.cc b/db/db_bloom_filter_test.cc index d68ab6115..7bf509abe 100644 --- a/db/db_bloom_filter_test.cc +++ b/db/db_bloom_filter_test.cc @@ -1229,7 +1229,7 @@ TEST_P(ChargeFilterConstructionTestWithParam, Basic) { * * The test is designed in a way such that the reservation for (p1 - b') * will trigger at least another dummy entry insertion - * (or equivelantly to saying, creating another peak). + * (or equivalently to saying, creating another peak). * * kStandard128Ribbon + FullFilter + * detect_filter_construct_corruption @@ -2618,8 +2618,7 @@ TEST_F(DBBloomFilterTest, OptimizeFiltersForHits) { BottommostLevelCompaction::kSkip; compact_options.change_level = true; compact_options.target_level = 7; - ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr) - .IsNotSupported()); + ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)); ASSERT_EQ(trivial_move, 1); ASSERT_EQ(non_trivial_move, 0); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 0227c06e4..2fffcae60 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6245,6 +6245,231 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) { Close(); } +class DBCompactionTestWithOngoingFileIngestionParam + : public DBCompactionTest, + public testing::WithParamInterface { + public: + DBCompactionTestWithOngoingFileIngestionParam() : DBCompactionTest() { + compaction_path_to_test_ = GetParam(); + } + void SetupOptions() { + options_ = CurrentOptions(); + options_.create_if_missing = true; + + if (compaction_path_to_test_ == "RefitLevelCompactRange") { + options_.num_levels = 7; + } else { + options_.num_levels = 3; + } + options_.compaction_style = CompactionStyle::kCompactionStyleLevel; + if (compaction_path_to_test_ == "AutoCompaction") { + 
options_.disable_auto_compactions = false; + options_.level0_file_num_compaction_trigger = 1; + } else { + options_.disable_auto_compactions = true; + } + } + + void PauseCompactionThread() { + sleeping_task_.reset(new test::SleepingBackgroundTask()); + env_->SetBackgroundThreads(1, Env::LOW); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + sleeping_task_.get(), Env::Priority::LOW); + sleeping_task_->WaitUntilSleeping(); + } + + void ResumeCompactionThread() { + if (sleeping_task_) { + sleeping_task_->WakeUp(); + sleeping_task_->WaitUntilDone(); + } + } + + void SetupFilesToForceFutureFilesIngestedToCertainLevel() { + SstFileWriter sst_file_writer(EnvOptions(), options_); + std::string dummy = dbname_ + "/dummy.sst"; + ASSERT_OK(sst_file_writer.Open(dummy)); + ASSERT_OK(sst_file_writer.Put("k2", "dummy")); + ASSERT_OK(sst_file_writer.Finish()); + ASSERT_OK(db_->IngestExternalFile({dummy}, IngestExternalFileOptions())); + // L2 is made to contain a file overlapped with files to be ingested in + // later steps on key "k2". This will force future files ingested to L1 or + // above. 
+ ASSERT_EQ("0,0,1", FilesPerLevel(0)); + } + + void SetupSyncPoints() { + if (compaction_path_to_test_ == "AutoCompaction") { + SyncPoint::GetInstance()->SetCallBack( + "ExternalSstFileIngestionJob::Run", [&](void*) { + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BackgroundCompaction():AfterPickCompaction", + "VersionSet::LogAndApply:WriteManifest"}}); + }); + } else if (compaction_path_to_test_ == "NonRefitLevelCompactRange") { + SyncPoint::GetInstance()->SetCallBack( + "ExternalSstFileIngestionJob::Run", [&](void*) { + SyncPoint::GetInstance()->LoadDependency( + {{"ColumnFamilyData::CompactRange:Return", + "VersionSet::LogAndApply:WriteManifest"}}); + }); + } else if (compaction_path_to_test_ == "RefitLevelCompactRange") { + SyncPoint::GetInstance()->SetCallBack( + "ExternalSstFileIngestionJob::Run", [&](void*) { + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::CompactRange:PostRefitLevel", + "VersionSet::LogAndApply:WriteManifest"}}); + }); + } else if (compaction_path_to_test_ == "CompactFiles") { + SyncPoint::GetInstance()->SetCallBack( + "ExternalSstFileIngestionJob::Run", [&](void*) { + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles", + "VersionSet::LogAndApply:WriteManifest"}}); + }); + } else { + assert(false); + } + SyncPoint::GetInstance()->LoadDependency( + {{"ExternalSstFileIngestionJob::Run", "PreCompaction"}}); + SyncPoint::GetInstance()->EnableProcessing(); + } + + void RunCompactionOverlappedWithFileIngestion() { + if (compaction_path_to_test_ == "AutoCompaction") { + TEST_SYNC_POINT("PreCompaction"); + ResumeCompactionThread(); + // Without proper range conflict check, + // this would have been `Status::Corruption` about overlapping ranges + Status s = dbfull()->TEST_WaitForCompact(); + EXPECT_OK(s); + } else if (compaction_path_to_test_ == "NonRefitLevelCompactRange") { + CompactRangeOptions cro; + cro.change_level = false; + std::string start_key = "k1"; + Slice 
start(start_key); + std::string end_key = "k4"; + Slice end(end_key); + TEST_SYNC_POINT("PreCompaction"); + // Without proper range conflict check, + // this would have been `Status::Corruption` about overlapping ranges + Status s = dbfull()->CompactRange(cro, &start, &end); + EXPECT_OK(s); + } else if (compaction_path_to_test_ == "RefitLevelCompactRange") { + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = 5; + std::string start_key = "k1"; + Slice start(start_key); + std::string end_key = "k4"; + Slice end(end_key); + TEST_SYNC_POINT("PreCompaction"); + Status s = dbfull()->CompactRange(cro, &start, &end); + // Without proper range conflict check, + // this would have been `Status::Corruption` about overlapping ranges + // To see this, remove the fix AND replace + // `DBImpl::CompactRange:PostRefitLevel` in sync point dependency with + // `DBImpl::ReFitLevel:PostRegisterCompaction` + EXPECT_TRUE(s.IsNotSupported()); + EXPECT_TRUE(s.ToString().find("some ongoing compaction's output") != + std::string::npos); + } else if (compaction_path_to_test_ == "CompactFiles") { + ColumnFamilyMetaData cf_meta_data; + db_->GetColumnFamilyMetaData(&cf_meta_data); + ASSERT_EQ(cf_meta_data.levels[0].files.size(), 1); + std::vector input_files; + for (const auto& file : cf_meta_data.levels[0].files) { + input_files.push_back(file.name); + } + TEST_SYNC_POINT("PreCompaction"); + Status s = db_->CompactFiles(CompactionOptions(), input_files, 1); + // Without proper range conflict check, + // this would have been `Status::Corruption` about overlapping ranges + EXPECT_TRUE(s.IsAborted()); + EXPECT_TRUE( + s.ToString().find( + "A running compaction is writing to the same output level") != + std::string::npos); + } else { + assert(false); + } + } + + void DisableSyncPoints() { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + } + + protected: + std::string 
compaction_path_to_test_; + Options options_; + std::shared_ptr sleeping_task_; +}; + +INSTANTIATE_TEST_CASE_P(DBCompactionTestWithOngoingFileIngestionParam, + DBCompactionTestWithOngoingFileIngestionParam, + ::testing::Values("AutoCompaction", + "NonRefitLevelCompactRange", + "RefitLevelCompactRange", + "CompactFiles")); + +TEST_P(DBCompactionTestWithOngoingFileIngestionParam, RangeConflictCheck) { + SetupOptions(); + DestroyAndReopen(options_); + + if (compaction_path_to_test_ == "AutoCompaction") { + PauseCompactionThread(); + } + + if (compaction_path_to_test_ != "RefitLevelCompactRange") { + SetupFilesToForceFutureFilesIngestedToCertainLevel(); + } + + // Create s1 + ASSERT_OK(Put("k1", "v")); + ASSERT_OK(Put("k4", "v")); + ASSERT_OK(Flush()); + if (compaction_path_to_test_ == "RefitLevelCompactRange") { + MoveFilesToLevel(6 /* level */); + ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel(0)); + } else { + ASSERT_EQ("1,0,1", FilesPerLevel(0)); + } + + // To coerce following sequence of events + // Timeline Thread 1 (Ingest s2) Thread 2 (Compact s1) + // t0 | Decide to output to Lk + // t1 | Release lock in LogAndApply() + // t2 | Acquire lock + // t3 | Decides to compact to Lk + // | Expected to fail due to range + // | conflict check with file + // | ingestion + // t4 | Release lock in LogAndApply() + // t5 | Acquire lock again and finish + // t6 | Acquire lock again and finish + SetupSyncPoints(); + + // Ingest s2 + port::Thread thread1([&] { + SstFileWriter sst_file_writer(EnvOptions(), options_); + std::string s2 = dbname_ + "/ingested_s2.sst"; + ASSERT_OK(sst_file_writer.Open(s2)); + ASSERT_OK(sst_file_writer.Put("k2", "v2")); + ASSERT_OK(sst_file_writer.Put("k3", "v2")); + ASSERT_OK(sst_file_writer.Finish()); + ASSERT_OK(db_->IngestExternalFile({s2}, IngestExternalFileOptions())); + }); + + // Compact s1. Without proper range conflict check, + // this will encounter overlapping file corruption. 
+ port::Thread thread2([&] { RunCompactionOverlappedWithFileIngestion(); }); + + thread1.join(); + thread2.join(); + DisableSyncPoints(); +} + TEST_F(DBCompactionTest, ConsistencyFailTest) { Options options = CurrentOptions(); options.force_consistency_checks = true; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 411503a6f..f930878de 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -5199,8 +5199,9 @@ Status DBImpl::IngestExternalFiles( for (const auto& arg : args) { auto* cfd = static_cast(arg.column_family)->cfd(); ingestion_jobs.emplace_back(versions_.get(), cfd, immutable_db_options_, - file_options_, &snapshots_, arg.options, - &directories_, &event_logger_, io_tracer_); + mutable_db_options_, file_options_, &snapshots_, + arg.options, &directories_, &event_logger_, + io_tracer_); } // TODO(yanqin) maybe make jobs run in parallel @@ -5333,6 +5334,7 @@ Status DBImpl::IngestExternalFiles( if (!status.ok()) { break; } + ingestion_jobs[i].RegisterRange(); } } if (status.ok()) { @@ -5388,6 +5390,10 @@ Status DBImpl::IngestExternalFiles( } } + for (auto& job : ingestion_jobs) { + job.UnregisterRange(); + } + if (status.ok()) { for (size_t i = 0; i != num_cfs; ++i) { auto* cfd = @@ -5759,13 +5765,6 @@ void DBImpl::NotifyOnExternalFileIngested( } } -void DBImpl::WaitForIngestFile() { - mutex_.AssertHeld(); - while (num_running_ingest_file_ > 0) { - bg_cv_.Wait(); - } -} - Status DBImpl::StartTrace(const TraceOptions& trace_options, std::unique_ptr&& trace_writer) { InstrumentedMutexLock lock(&trace_mutex_); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 0eebef774..9fcd9efea 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2023,14 +2023,6 @@ class DBImpl : public DB { const int output_level, int output_path_id, JobContext* job_context, LogBuffer* log_buffer, CompactionJobInfo* compaction_job_info); - - // Wait for current IngestExternalFile() calls to finish. 
- // REQUIRES: mutex_ held - void WaitForIngestFile(); -#else - // IngestExternalFile is not supported in ROCKSDB_LITE so this function - // will be no-op - void WaitForIngestFile() {} #endif // ROCKSDB_LITE ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 95ee948eb..f84588276 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1249,6 +1249,12 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options, ROCKS_LOG_INFO(immutable_db_options_.info_log, "[RefitLevel] waiting for background threads to stop"); + // TODO(hx235): remove `Enable/DisableManualCompaction` and + // `Continue/PauseBackgroundWork` once we ensure registering RefitLevel()'s + // range is sufficient (if not, what else is needed) for avoiding range + // conflicts with other activities (e.g, compaction, flush) that are + // currently avoided by `Enable/DisableManualCompaction` and + // `Continue/PauseBackgroundWork`. DisableManualCompaction(); s = PauseBackgroundWork(); if (s.ok()) { @@ -1313,13 +1319,6 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options, const_cast*>(&manual_compaction_paused_))); { InstrumentedMutexLock l(&mutex_); - - // This call will unlock/lock the mutex to wait for current running - // IngestExternalFile() calls to finish. 
- WaitForIngestFile(); - - // We need to get current after `WaitForIngestFile`, because - // `IngestExternalFile` may add files that overlap with `input_file_names` auto* current = cfd->current(); current->Ref(); @@ -1398,6 +1397,7 @@ Status DBImpl::CompactFilesImpl( Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles( &input_set, cf_meta, output_level); + TEST_SYNC_POINT("DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles"); if (!s.ok()) { return s; } @@ -1691,6 +1691,10 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { InstrumentedMutexLock guard_lock(&mutex_); + auto* vstorage = cfd->current()->storage_info(); + if (vstorage->LevelFiles(level).empty()) { + return Status::OK(); + } // only allow one thread refitting if (refitting_level_) { ROCKS_LOG_INFO(immutable_db_options_.info_log, @@ -1706,8 +1710,16 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level); } - auto* vstorage = cfd->current()->storage_info(); if (to_level != level) { + std::vector input(1); + input[0].level = level; + for (auto& f : vstorage->LevelFiles(level)) { + input[0].files.push_back(f); + } + InternalKey refit_level_smallest; + InternalKey refit_level_largest; + cfd->compaction_picker()->GetRange(input[0], &refit_level_smallest, + &refit_level_largest); if (to_level > level) { if (level == 0) { refitting_level_ = false; @@ -1721,6 +1733,14 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { return Status::NotSupported( "Levels between source and target are not empty for a move."); } + if (cfd->RangeOverlapWithCompaction(refit_level_smallest.user_key(), + refit_level_largest.user_key(), + l)) { + refitting_level_ = false; + return Status::NotSupported( + "Levels between source and target " + "will have some ongoing compaction's output."); + } } } else { // to_level < level @@ -1731,12 +1751,39 @@ Status 
DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { return Status::NotSupported( "Levels between source and target are not empty for a move."); } + if (cfd->RangeOverlapWithCompaction(refit_level_smallest.user_key(), + refit_level_largest.user_key(), + l)) { + refitting_level_ = false; + return Status::NotSupported( + "Levels between source and target " + "will have some ongoing compaction's output."); + } } } ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] Before refitting:\n%s", cfd->GetName().c_str(), cfd->current()->DebugString().data()); + std::unique_ptr c(new Compaction( + vstorage, *cfd->ioptions(), mutable_cf_options, mutable_db_options_, + {input}, to_level, + MaxFileSizeForLevel( + mutable_cf_options, to_level, + cfd->ioptions() + ->compaction_style) /* output file size limit, not applicable */ + , + LLONG_MAX /* max compaction bytes, not applicable */, + 0 /* output path ID, not applicable */, mutable_cf_options.compression, + mutable_cf_options.compression_opts, Temperature::kUnknown, + 0 /* max_subcompactions, not applicable */, + {} /* grandparents, not applicable */, false /* is manual */, + "" /* trim_ts */, -1 /* score, not applicable */, + false /* is deletion compaction, not applicable */, + false /* l0_files_might_overlap, not applicable */, + CompactionReason::kRefitLevel)); + cfd->compaction_picker()->RegisterCompaction(c.get()); + TEST_SYNC_POINT("DBImpl::ReFitLevel:PostRegisterCompaction"); VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); @@ -1757,6 +1804,9 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, directories_.GetDbDir()); + cfd->compaction_picker()->UnregisterCompaction(c.get()); + c.reset(); + InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n", @@ -1973,9 +2023,6 @@ Status 
DBImpl::RunManualCompaction( manual.begin, manual.end, &manual.manual_end, &manual_conflict, max_file_num_to_ignore, trim_ts)) == nullptr && manual_conflict))) { - // exclusive manual compactions should not see a conflict during - // CompactRange - assert(!exclusive || !manual_conflict); // Running either this or some other manual compaction bg_cv_.Wait(); if (manual_compaction_paused_ > 0 && scheduled && !unscheduled) { @@ -3004,10 +3051,6 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, { InstrumentedMutexLock l(&mutex_); - // This call will unlock/lock the mutex to wait for current running - // IngestExternalFile() calls to finish. - WaitForIngestFile(); - num_running_compactions_++; std::unique_ptr::iterator> @@ -3649,11 +3692,6 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) { } bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) { - if (num_running_ingest_file_ > 0) { - // We need to wait for other IngestExternalFile() calls to finish - // before running a manual compaction. 
- return true; - } if (m->exclusive) { return (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0); diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 665c89869..97b989733 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -800,6 +800,7 @@ TEST_P(ExternalSSTFileBasicTest, IngestFileWithMultipleValueType) { bool verify_checksums_before_ingest = std::get<1>(GetParam()); do { Options options = CurrentOptions(); + options.disable_auto_compactions = true; options.merge_operator.reset(new TestPutOperator()); DestroyAndReopen(options); std::map true_data; diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 849f98e87..6437fdc49 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -477,9 +477,82 @@ Status ExternalSstFileIngestionJob::Run() { f_metadata.temperature = f.file_temperature; edit_.AddFile(f.picked_level, f_metadata); } + + CreateEquivalentFileIngestingCompactions(); return status; } +void ExternalSstFileIngestionJob::CreateEquivalentFileIngestingCompactions() { + // A map from output level to input of compactions equivalent to this + // ingestion job. + // TODO: simplify below logic to creating compaction per ingested file + // instead of per output level, once we figure out how to treat ingested files + // with adjacent range deletion tombstones to same output level in the same + // job as non-overlapping compactions. 
+ std::map + output_level_to_file_ingesting_compaction_input; + + for (const auto& pair : edit_.GetNewFiles()) { + int output_level = pair.first; + const FileMetaData& f_metadata = pair.second; + + CompactionInputFiles& input = + output_level_to_file_ingesting_compaction_input[output_level]; + if (input.files.empty()) { + // Treat the source level of ingested files to be level 0 + input.level = 0; + } + + compaction_input_metdatas_.push_back(new FileMetaData(f_metadata)); + input.files.push_back(compaction_input_metdatas_.back()); + } + + for (const auto& pair : output_level_to_file_ingesting_compaction_input) { + int output_level = pair.first; + const CompactionInputFiles& input = pair.second; + + const auto& mutable_cf_options = *(cfd_->GetLatestMutableCFOptions()); + file_ingesting_compactions_.push_back(new Compaction( + cfd_->current()->storage_info(), *cfd_->ioptions(), mutable_cf_options, + mutable_db_options_, {input}, output_level, + MaxFileSizeForLevel( + mutable_cf_options, output_level, + cfd_->ioptions()->compaction_style) /* output file size + limit, + * not applicable + */ + , + LLONG_MAX /* max compaction bytes, not applicable */, + 0 /* output path ID, not applicable */, mutable_cf_options.compression, + mutable_cf_options.compression_opts, Temperature::kUnknown, + 0 /* max_subcompaction, not applicable */, + {} /* grandparents, not applicable */, false /* is manual */, + "" /* trim_ts */, -1 /* score, not applicable */, + false /* is deletion compaction, not applicable */, + files_overlap_ /* l0_files_might_overlap, not applicable */, + CompactionReason::kExternalSstIngestion)); + } +} + +void ExternalSstFileIngestionJob::RegisterRange() { + for (const auto& c : file_ingesting_compactions_) { + cfd_->compaction_picker()->RegisterCompaction(c); + } +} + +void ExternalSstFileIngestionJob::UnregisterRange() { + for (const auto& c : file_ingesting_compactions_) { + cfd_->compaction_picker()->UnregisterCompaction(c); + delete c; + } + 
file_ingesting_compactions_.clear(); + + for (const auto& f : compaction_input_metdatas_) { + delete f; + } + compaction_input_metdatas_.clear(); +} + void ExternalSstFileIngestionJob::UpdateStats() { // Update internal stats for new ingested files uint64_t total_keys = 0; @@ -798,8 +871,16 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( if (lvl > 0 && lvl < vstorage->base_level()) { continue; } - - if (vstorage->NumLevelFiles(lvl) > 0) { + if (cfd_->RangeOverlapWithCompaction( + file_to_ingest->smallest_internal_key.user_key(), + file_to_ingest->largest_internal_key.user_key(), lvl)) { + // We must use L0 or any level higher than `lvl` to be able to overwrite + // the compaction output keys that we overlap with in this level, We also + // need to assign this file a seqno to overwrite the compaction output + // keys in level `lvl` + overlap_with_db = true; + break; + } else if (vstorage->NumLevelFiles(lvl) > 0) { bool overlap_with_level = false; status = sv->current->OverlapWithLevelIterator( ro, env_options_, file_to_ingest->smallest_internal_key.user_key(), @@ -856,6 +937,7 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( target_level < cfd_->NumberLevels() - 1) { status = Status::TryAgain( "Files cannot be ingested to Lmax. 
Please make sure key range of Lmax " + "and ongoing compaction's output to Lmax" "does not overlap with files to ingest."); return status; } @@ -873,7 +955,7 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile( IngestedFileInfo* file_to_ingest) { auto* vstorage = cfd_->current()->storage_info(); - // first check if new files fit in the bottommost level + // First, check if new files fit in the bottommost level int bottom_lvl = cfd_->NumberLevels() - 1; if (!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) { return Status::InvalidArgument( @@ -881,7 +963,7 @@ Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile( "at the bottommost level!"); } - // second check if despite allow_ingest_behind=true we still have 0 seqnums + // Second, check if despite allow_ingest_behind=true we still have 0 seqnums // at some upper level for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) { for (auto file : vstorage->LevelFiles(lvl)) { @@ -997,14 +1079,8 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( // add it to this level return false; } - if (cfd_->RangeOverlapWithCompaction(file_smallest_user_key, - file_largest_user_key, level)) { - // File overlap with a running compaction output that will be stored - // in this level, we cannot add this file to this level - return false; - } - // File did not overlap with level files, our compaction output + // File did not overlap with level files, nor compaction output return true; } diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index ce50ae86d..49bb1e31e 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -11,6 +11,7 @@ #include "db/column_family.h" #include "db/internal_stats.h" #include "db/snapshot_impl.h" +#include "db/version_edit.h" #include "env/file_system_tracer.h" #include "logging/event_logger.h" #include 
"options/db_options.h" @@ -78,7 +79,8 @@ class ExternalSstFileIngestionJob { public: ExternalSstFileIngestionJob( VersionSet* versions, ColumnFamilyData* cfd, - const ImmutableDBOptions& db_options, const EnvOptions& env_options, + const ImmutableDBOptions& db_options, + const MutableDBOptions& mutable_db_options, const EnvOptions& env_options, SnapshotList* db_snapshots, const IngestExternalFileOptions& ingestion_options, Directories* directories, EventLogger* event_logger, @@ -88,6 +90,7 @@ class ExternalSstFileIngestionJob { versions_(versions), cfd_(cfd), db_options_(db_options), + mutable_db_options_(mutable_db_options), env_options_(env_options), db_snapshots_(db_snapshots), ingestion_options_(ingestion_options), @@ -99,6 +102,17 @@ class ExternalSstFileIngestionJob { assert(directories != nullptr); } + ~ExternalSstFileIngestionJob() { + for (const auto& c : file_ingesting_compactions_) { + cfd_->compaction_picker()->UnregisterCompaction(c); + delete c; + } + + for (const auto& f : compaction_input_metdatas_) { + delete f; + } + } + // Prepare the job by copying external files into the DB. Status Prepare(const std::vector& external_files_paths, const std::vector& files_checksums, @@ -120,6 +134,15 @@ class ExternalSstFileIngestionJob { // REQUIRES: Mutex held Status Run(); + // Register key range involved in this ingestion job + // to prevent key range conflict with other ongoing compaction/file ingestion + // REQUIRES: Mutex held + void RegisterRange(); + + // Unregister key range registered for this ingestion job + // REQUIRES: Mutex held + void UnregisterRange(); + // Update column family stats. // REQUIRES: Mutex held void UpdateStats(); @@ -175,11 +198,17 @@ class ExternalSstFileIngestionJob { template Status SyncIngestedFile(TWritableFile* file); + // Create equivalent `Compaction` objects to this file ingestion job + // , which will be used to check range conflict with other ongoing + // compactions. 
+ void CreateEquivalentFileIngestingCompactions(); + SystemClock* clock_; FileSystemPtr fs_; VersionSet* versions_; ColumnFamilyData* cfd_; const ImmutableDBOptions& db_options_; + const MutableDBOptions& mutable_db_options_; const EnvOptions& env_options_; SnapshotList* db_snapshots_; autovector files_to_ingest_; @@ -196,6 +225,14 @@ class ExternalSstFileIngestionJob { // file_checksum_gen_factory is set, DB will generate checksum each file. bool need_generate_file_checksum_{true}; std::shared_ptr io_tracer_; + + // Below are variables used in (un)registering range for this ingestion job + // + // FileMetaData used in inputs of compactions equivalent to this ingestion + // job + std::vector compaction_input_metdatas_; + // Compactions equivalent to this ingestion job + std::vector file_ingesting_compactions_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index d16f6a58c..fcb2e7c94 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -973,7 +973,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) { do { Options options = CurrentOptions(); - + options.disable_auto_compactions = true; std::atomic thread_num(0); std::function write_file_func = [&]() { int file_idx = thread_num.fetch_add(1); @@ -1249,8 +1249,9 @@ TEST_P(ExternalSSTFileTest, PickedLevel) { // This file overlaps with file 0 (L3), file 1 (L2) and the // output of compaction going to L1 - ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1, false, false, true, - false, false, &true_data)); + ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1, + true /* allow_global_seqno */, false, + true, false, false, &true_data)); EXPECT_EQ(FilesPerLevel(), "5,0,1,1"); // This file does not overlap with any file or with the running compaction @@ -1270,106 +1271,6 @@ TEST_P(ExternalSSTFileTest, PickedLevel) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } -TEST_F(ExternalSSTFileTest, PickedLevelBug) { - 
env_->skip_fsync_ = true; - Options options = CurrentOptions(); - options.disable_auto_compactions = false; - options.level0_file_num_compaction_trigger = 3; - options.num_levels = 2; - DestroyAndReopen(options); - - std::vector file_keys; - - // file #1 in L0 - file_keys = {0, 5, 7}; - for (int k : file_keys) { - ASSERT_OK(Put(Key(k), Key(k))); - } - ASSERT_OK(Flush()); - - // file #2 in L0 - file_keys = {4, 6, 8, 9}; - for (int k : file_keys) { - ASSERT_OK(Put(Key(k), Key(k))); - } - ASSERT_OK(Flush()); - - // We have 2 overlapping files in L0 - EXPECT_EQ(FilesPerLevel(), "2"); - - ASSERT_OK(dbfull()->TEST_WaitForCompact()); - - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( - {{"DBImpl::IngestExternalFile:AfterIncIngestFileCounter", - "ExternalSSTFileTest::PickedLevelBug:0"}, - {"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"}, - {"ExternalSSTFileTest::PickedLevelBug:2", - "DBImpl::RunManualCompaction:0"}, - {"ExternalSSTFileTest::PickedLevelBug:3", - "DBImpl::RunManualCompaction:1"}}); - - std::atomic bg_compact_started(false); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::BackgroundCompaction:Start", - [&](void* /*arg*/) { bg_compact_started.store(true); }); - - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); - - Status bg_compact_status; - Status bg_addfile_status; - - { - // While writing the MANIFEST start a thread that will ask for compaction - ThreadGuard bg_compact(port::Thread([&]() { - bg_compact_status = - db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); - })); - TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2"); - - // Start a thread that will ingest a new file - ThreadGuard bg_addfile(port::Thread([&]() { - file_keys = {1, 2, 3}; - bg_addfile_status = GenerateAndAddExternalFile(options, file_keys, 1); - })); - - // Wait for AddFile to start picking levels and writing MANIFEST - TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0"); - - 
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3"); - - // We need to verify that no compactions can run while AddFile is - // ingesting the files into the levels it find suitable. So we will - // wait for 2 seconds to give a chance for compactions to run during - // this period, and then make sure that no compactions where able to run - env_->SleepForMicroseconds(1000000 * 2); - bool bg_compact_started_tmp = bg_compact_started.load(); - - // Hold AddFile from finishing writing the MANIFEST - TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1"); - - // check the status at the end, so even if the ASSERT fails the threads - // could be joined and return. - ASSERT_FALSE(bg_compact_started_tmp); - } - - ASSERT_OK(bg_addfile_status); - ASSERT_OK(bg_compact_status); - - ASSERT_OK(dbfull()->TEST_WaitForCompact()); - - int total_keys = 0; - Iterator* iter = db_->NewIterator(ReadOptions()); - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - ASSERT_OK(iter->status()); - total_keys++; - } - ASSERT_EQ(total_keys, 10); - - delete iter; - - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); -} - TEST_F(ExternalSSTFileTest, IngestNonExistingFile) { Options options = CurrentOptions(); DestroyAndReopen(options); @@ -1420,7 +1321,8 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { int range_id = 0; std::vector file_keys; std::function bg_addfile = [&]() { - ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id)); + ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id, + true /* allow_global_seqno */)); }; const int num_of_ranges = 1000; @@ -1503,8 +1405,9 @@ TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { // This file overlaps with the output of the compaction (going to L3) // so the file will be added to L0 since L3 is the base level - ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 33, 34}, -1, false, - false, true, false, false, &true_data)); + ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 
33, 34}, -1, + true /* allow_global_seqno */, false, + true, false, false, &true_data)); EXPECT_EQ(FilesPerLevel(), "5"); // This file does not overlap with the current running compactiong @@ -1642,14 +1545,15 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "CompactionJob::Run():Start", [&](void* /*arg*/) { - // fit in L3 but will overlap with compaction so will be added - // to L2 but a compaction will trivially move it to L3 - // and break LSM consistency + // Fit in L3 but will overlap with the compaction output so will be + // added to L2. Prior to the fix, a compaction will then trivially move + // this file to L3 and break LSM consistency static std::atomic called = {false}; if (!called) { called = true; ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); - ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); + ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7, + true /* allow_global_seqno */)); } }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 8644fcf3f..853b58758 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -140,7 +140,10 @@ enum class CompactionReason : int { // According to the comments in flush_job.cc, RocksDB treats flush as // a level 0 compaction in internal stats. kFlush, - // Compaction caused by external sst file ingestion + // [InternalOnly] External sst file ingestion treated as a compaction + // with placeholder input level L0 as file ingestion + // technically does not have an input level like other compactions. 
+ // Used only for internal stats and conflict checking with other compactions kExternalSstIngestion, // Compaction due to SST file being too old kPeriodicCompaction, @@ -151,6 +154,9 @@ enum class CompactionReason : int { // A special TTL compaction for RoundRobin policy, which basically the same as // kLevelMaxLevelSize, but the goal is to compact TTLed files. kRoundRobinTtl, + // [InternalOnly] DBImpl::ReFitLevel is treated as a compaction, + // Used only for internal conflict checking with other compactions kRefitLevel, // total number of compaction reasons, new reasons must be added above this. kNumOfReasons, }; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 7a4d8b5a6..e98328826 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1933,7 +1933,8 @@ struct IngestExternalFileOptions { // that where created before the file was ingested. bool snapshot_consistency = true; // If set to false, IngestExternalFile() will fail if the file key range - // overlaps with existing keys or tombstones in the DB. + // overlaps with existing keys or tombstones or the output of an ongoing compaction + // during file ingestion in the DB. bool allow_global_seqno = true; // If set to false and the file key range overlaps with the memtable key range // (memtable flush required), IngestExternalFile will fail.
diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 340199507..1a72507a9 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -7181,6 +7181,16 @@ class CompactionReasonJni { return 0x0C; case ROCKSDB_NAMESPACE::CompactionReason::kExternalSstIngestion: return 0x0D; + case ROCKSDB_NAMESPACE::CompactionReason::kPeriodicCompaction: + return 0x0E; + case ROCKSDB_NAMESPACE::CompactionReason::kChangeTemperature: + return 0x0F; + case ROCKSDB_NAMESPACE::CompactionReason::kForcedBlobGC: + return 0x11; + case ROCKSDB_NAMESPACE::CompactionReason::kRoundRobinTtl: + return 0x12; + case ROCKSDB_NAMESPACE::CompactionReason::kRefitLevel: + return 0x13; default: return 0x7F; // undefined } @@ -7225,6 +7235,12 @@ class CompactionReasonJni { return ROCKSDB_NAMESPACE::CompactionReason::kPeriodicCompaction; case 0x0F: return ROCKSDB_NAMESPACE::CompactionReason::kChangeTemperature; + case 0x11: + return ROCKSDB_NAMESPACE::CompactionReason::kForcedBlobGC; + case 0x12: + return ROCKSDB_NAMESPACE::CompactionReason::kRoundRobinTtl; + case 0x13: + return ROCKSDB_NAMESPACE::CompactionReason::kRefitLevel; default: // undefined/default return ROCKSDB_NAMESPACE::CompactionReason::kUnknown; diff --git a/java/src/main/java/org/rocksdb/CompactionReason.java b/java/src/main/java/org/rocksdb/CompactionReason.java index 24e234450..46ec33f3f 100644 --- a/java/src/main/java/org/rocksdb/CompactionReason.java +++ b/java/src/main/java/org/rocksdb/CompactionReason.java @@ -88,7 +88,23 @@ public enum CompactionReason { /** * Compaction in order to move files to temperature */ - kChangeTemperature((byte) 0x0F); + kChangeTemperature((byte) 0x0F), + + /** + * Compaction scheduled to force garbage collection of blob files + */ + kForcedBlobGC((byte) 0x11), + + /** + * A special TTL compaction for RoundRobin policy, which basically the same as + * kLevelMaxLevelSize, but the goal is to compact TTLed files. 
+ */ + kRoundRobinTtl((byte) 0x12), + + /** + * Compaction by calling DBImpl::ReFitLevel + */ + kRefitLevel((byte) 0x13); private final byte value;