Fix blob callback in compaction and atomic flush (#8681)

Summary:
Pass BlobFileCompletionCallback  in case of atomic flush and
compaction job which is currently nullptr(default parameter).
BlobFileCompletionCallback is used in case of IntegratedBlobDB to report new blob files to
SstFileManager.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8681

Test Plan: CircleCI jobs

Reviewed By: ltamasi

Differential Revision: D30445998

Pulled By: akankshamahajan15

fbshipit-source-id: ba48093843864faec57f1f365cce7b5a569c4021
main
Akanksha Mahajan 3 years ago committed by Facebook GitHub Bot
parent ff8953380f
commit 5efec84c60
  1. 1
      HISTORY.md
  2. 7
      db/db_impl/db_impl_compaction_flush.cc
  3. 90
      db/db_sst_test.cc

@ -8,6 +8,7 @@
* Fixed an issue where `OnFlushCompleted` was not called for atomic flush. * Fixed an issue where `OnFlushCompleted` was not called for atomic flush.
* Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`. * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`.
* Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction. * Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction.
* Fixed passing of BlobFileCompletionCallback to Compaction job and Atomic flush job which was default paramter (nullptr). BlobFileCompletitionCallback is internal callback that manages addition of blob files to SSTFileManager.
### New Features ### New Features
* Made the EventListener extend the Customizable class. * Made the EventListener extend the Customizable class.

@ -411,7 +411,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */, false /* sync_output_directory */, false /* write_manifest */,
thread_pri, io_tracer_, db_id_, db_session_id_, thread_pri, io_tracer_, db_id_, db_session_id_,
cfd->GetFullHistoryTsLow())); cfd->GetFullHistoryTsLow(), &blob_callback_));
} }
std::vector<FileMetaData> file_meta(num_cfs); std::vector<FileMetaData> file_meta(num_cfs);
@ -1280,7 +1280,7 @@ Status DBImpl::CompactFilesImpl(
c->mutable_cf_options()->report_bg_io_stats, dbname_, c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, io_tracer_, &compaction_job_stats, Env::Priority::USER, io_tracer_,
&manual_compaction_paused_, nullptr, db_id_, db_session_id_, &manual_compaction_paused_, nullptr, db_id_, db_session_id_,
c->column_family_data()->GetFullHistoryTsLow()); c->column_family_data()->GetFullHistoryTsLow(), &blob_callback_);
// Creating a compaction influences the compaction score because the score // Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already // takes running compactions into account (by skipping files that are already
@ -3193,7 +3193,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&compaction_job_stats, thread_pri, io_tracer_, &compaction_job_stats, thread_pri, io_tracer_,
is_manual ? &manual_compaction_paused_ : nullptr, is_manual ? &manual_compaction_paused_ : nullptr,
is_manual ? manual_compaction->canceled : nullptr, db_id_, is_manual ? manual_compaction->canceled : nullptr, db_id_,
db_session_id_, c->column_family_data()->GetFullHistoryTsLow()); db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
&blob_callback_);
compaction_job.Prepare(); compaction_job.Prepare();
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,

@ -1569,6 +1569,96 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
ASSERT_EQ(total_sst_files_size, 0); ASSERT_EQ(total_sst_files_size, 0);
} }
// This test if blob files are recorded by SST File Manager when Compaction job
// creates/delete them and in case of AtomicFlush.
TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.enable_blob_files = true;
options.min_blob_size = 0;
options.disable_auto_compactions = true;
options.enable_blob_garbage_collection = true;
options.blob_garbage_collection_age_cutoff = 0.5;
options.atomic_flush = true;
int files_added = 0;
int files_deleted = 0;
int files_scheduled_to_delete = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnAddFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
files_added++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
files_deleted++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
DestroyAndReopen(options);
Random rnd(301);
ASSERT_OK(Put("key_1", "value_1"));
ASSERT_OK(Put("key_2", "value_2"));
ASSERT_OK(Put("key_3", "value_3"));
ASSERT_OK(Put("key_4", "value_4"));
ASSERT_OK(Flush());
// Overwrite will create the garbage data.
ASSERT_OK(Put("key_3", "new_value_3"));
ASSERT_OK(Put("key_4", "new_value_4"));
ASSERT_OK(Flush());
ASSERT_OK(Put("Key5", "blob_value5"));
ASSERT_OK(Put("Key6", "blob_value6"));
ASSERT_OK(Flush());
ASSERT_EQ(files_added, 3);
ASSERT_EQ(files_deleted, 0);
ASSERT_EQ(files_scheduled_to_delete, 0);
files_added = 0;
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
// Compaction job will create a new file and delete the older files.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
ASSERT_EQ(files_added, 1);
ASSERT_EQ(files_deleted, 1);
ASSERT_EQ(files_scheduled_to_delete, 1);
Close();
ASSERT_OK(DestroyDB(dbname_, options));
sfm->WaitForEmptyTrash();
ASSERT_EQ(files_deleted, 4);
ASSERT_EQ(files_scheduled_to_delete, 4);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save