diff --git a/HISTORY.md b/HISTORY.md index aa728f412..a424099e3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Fixed an issue where `OnFlushCompleted` was not called for atomic flush. * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`. * Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction. +* Fixed passing of BlobFileCompletionCallback to Compaction job and Atomic flush job which was default paramter (nullptr). BlobFileCompletitionCallback is internal callback that manages addition of blob files to SSTFileManager. ### New Features * Made the EventListener extend the Customizable class. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index df849d519..b1679d756 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -411,7 +411,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, thread_pri, io_tracer_, db_id_, db_session_id_, - cfd->GetFullHistoryTsLow())); + cfd->GetFullHistoryTsLow(), &blob_callback_)); } std::vector file_meta(num_cfs); @@ -1280,7 +1280,7 @@ Status DBImpl::CompactFilesImpl( c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, &manual_compaction_paused_, nullptr, db_id_, db_session_id_, - c->column_family_data()->GetFullHistoryTsLow()); + c->column_family_data()->GetFullHistoryTsLow(), &blob_callback_); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -3193,7 +3193,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, &compaction_job_stats, thread_pri, io_tracer_, is_manual ? &manual_compaction_paused_ : nullptr, is_manual ? manual_compaction->canceled : nullptr, db_id_, - db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); + db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), + &blob_callback_); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 77984758f..cba6fb0b7 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -1569,6 +1569,96 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) { ASSERT_EQ(total_sst_files_size, 0); } +// This test if blob files are recorded by SST File Manager when Compaction job +// creates/delete them and in case of AtomicFlush. +TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.enable_blob_files = true; + options.min_blob_size = 0; + options.disable_auto_compactions = true; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + options.atomic_flush = true; + + int files_added = 0; + int files_deleted = 0; + int files_scheduled_to_delete = 0; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnAddFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + files_added++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnDeleteFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + files_deleted++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) { + assert(arg); + const std::string* const file_path = + static_cast(arg); + if (EndsWith(*file_path, ".blob")) { + ++files_scheduled_to_delete; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + DestroyAndReopen(options); + Random rnd(301); + + ASSERT_OK(Put("key_1", "value_1")); + ASSERT_OK(Put("key_2", "value_2")); + ASSERT_OK(Put("key_3", "value_3")); + ASSERT_OK(Put("key_4", "value_4")); + ASSERT_OK(Flush()); + + // Overwrite will create the garbage data. + ASSERT_OK(Put("key_3", "new_value_3")); + ASSERT_OK(Put("key_4", "new_value_4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + ASSERT_EQ(files_added, 3); + ASSERT_EQ(files_deleted, 0); + ASSERT_EQ(files_scheduled_to_delete, 0); + files_added = 0; + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + // Compaction job will create a new file and delete the older files. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + ASSERT_EQ(files_added, 1); + ASSERT_EQ(files_deleted, 1); + ASSERT_EQ(files_scheduled_to_delete, 1); + + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + sfm->WaitForEmptyTrash(); + ASSERT_EQ(files_deleted, 4); + ASSERT_EQ(files_scheduled_to_delete, 4); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + #endif // ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE