From d6aa8c49f849ebb3878e7f1e721dea89a8cf3dcb Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Thu, 16 Sep 2021 17:17:40 -0700 Subject: [PATCH] Expose blob file information through the EventListener interface (#8675) Summary: 1. Extend FlushJobInfo and CompactionJobInfo with information about the blob files generated by flush/compaction jobs. This PR add two structures BlobFileInfo and BlobFileGarbageInfo that contains the required information of blob files. 2. Notify the creation and deletion of blob files through OnBlobFileCreationStarted, OnBlobFileCreated, and OnBlobFileDeleted. 3. Test OnFile*Finish operations notifications with Blob Files. 4. Log the blob file creation/deletion events through EventLogger in Log file. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8675 Test Plan: Add new unit tests in listener_test Reviewed By: ltamasi Differential Revision: D30412613 Pulled By: akankshamahajan15 fbshipit-source-id: ca51b63c6e8c8d0485a38c503572bc5a82bd5d07 --- HISTORY.md | 3 + db/blob/blob_file_builder.cc | 32 +- db/blob/blob_file_builder.h | 6 +- db/blob/blob_file_builder_test.cc | 14 +- db/blob/blob_file_completion_callback.h | 65 +++- db/builder.cc | 17 +- db/builder.h | 1 + db/compaction/compaction_job.cc | 16 +- db/db_impl/db_impl.cc | 3 +- db/db_impl/db_impl_compaction_flush.cc | 37 ++- db/db_impl/db_impl_files.cc | 5 + db/db_impl/db_impl_open.cc | 5 +- db/event_helpers.cc | 106 ++++++- db/event_helpers.h | 24 ++ db/flush_job.cc | 20 +- db/listener_test.cc | 403 ++++++++++++++++++++++++ db/repair.cc | 5 +- include/rocksdb/listener.h | 154 ++++++++- include/rocksdb/types.h | 6 + 19 files changed, 856 insertions(+), 66 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 5d07ebaad..eccbe3c97 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -22,10 +22,13 @@ * `SstFileWriter` now supports `Put`s and `Delete`s with user-defined timestamps. Note that the ingestion logic itself is not timestamp-aware yet. * Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp. * Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first. +* Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file. ### Public API change * Remove obsolete implementation details FullKey and ParseFullKey from public API * Add a public API RateLimiter::GetTotalPendingRequests() for the total number of requests that are pending for bytes in the rate limiter. +* Extended `FlushJobInfo` and `CompactionJobInfo` in listener.h to provide information about the blob files generated by a flush/compaction and garbage collected during compaction in Integrated BlobDB. Added struct members `blob_file_addition_infos` and `blob_file_garbage_infos` that contain this information. +* Extended parameter `output_file_names` of `CompactFiles` API to also include paths of the blob files generated by the compaction in Integrated BlobDB. ## 6.24.0 (2021-08-20) ### Bug Fixes diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 4a3f3d4b0..48817984a 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -12,6 +12,7 @@ #include "db/blob/blob_index.h" #include "db/blob/blob_log_format.h" #include "db/blob/blob_log_writer.h" +#include "db/event_helpers.h" #include "db/version_set.h" #include "file/filename.h" #include "file/read_write_util.h" @@ -36,13 +37,14 @@ BlobFileBuilder::BlobFileBuilder( Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, BlobFileCompletionCallback* blob_callback, + BlobFileCreationReason creation_reason, std::vector* blob_file_paths, std::vector* blob_file_additions) : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs, immutable_options, mutable_cf_options, file_options, job_id, column_family_id, column_family_name, io_priority, - write_hint, io_tracer, blob_callback, blob_file_paths, - blob_file_additions) {} + write_hint, io_tracer, blob_callback, creation_reason, + blob_file_paths, blob_file_additions) {} BlobFileBuilder::BlobFileBuilder( std::function file_number_generator, FileSystem* fs, @@ -53,6 +55,7 @@ BlobFileBuilder::BlobFileBuilder( Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, BlobFileCompletionCallback* blob_callback, + BlobFileCreationReason creation_reason, std::vector* blob_file_paths, std::vector* blob_file_additions) : file_number_generator_(std::move(file_number_generator)), @@ -69,6 +72,7 @@ BlobFileBuilder::BlobFileBuilder( write_hint_(write_hint), io_tracer_(io_tracer), blob_callback_(blob_callback), + creation_reason_(creation_reason), blob_file_paths_(blob_file_paths), blob_file_additions_(blob_file_additions), blob_count_(0), @@ -161,6 +165,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { std::string blob_file_path = BlobFileName(immutable_options_->cf_paths.front().path, blob_file_number); + if (blob_callback_) { + blob_callback_->OnBlobFileCreationStarted( + blob_file_path, column_family_name_, job_id_, creation_reason_); + } + std::unique_ptr file; { @@ -305,6 +314,13 @@ Status BlobFileBuilder::CloseBlobFile() { const uint64_t blob_file_number = writer_->get_log_number(); + if (blob_callback_) { + s = blob_callback_->OnBlobFileCompleted( + blob_file_paths_->back(), column_family_name_, job_id_, + blob_file_number, creation_reason_, s, checksum_value, checksum_method, + blob_count_, blob_bytes_); + } + assert(blob_file_additions_); blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_, std::move(checksum_method), @@ -316,9 +332,6 @@ Status BlobFileBuilder::CloseBlobFile() { " total blobs, %" PRIu64 " total bytes", column_family_name_.c_str(), job_id_, blob_file_number, blob_count_, blob_bytes_); - if (blob_callback_) { - s = blob_callback_->OnBlobFileCompleted(blob_file_paths_->back()); - } writer_.reset(); blob_count_ = 0; @@ -340,15 +353,18 @@ Status BlobFileBuilder::CloseBlobFileIfNeeded() { return CloseBlobFile(); } -void BlobFileBuilder::Abandon() { +void BlobFileBuilder::Abandon(const Status& s) { if (!IsBlobFileOpen()) { return; } - if (blob_callback_) { // BlobFileBuilder::Abandon() is called because of error while writing to // Blob files. So we can ignore the below error. - blob_callback_->OnBlobFileCompleted(blob_file_paths_->back()) + blob_callback_ + ->OnBlobFileCompleted(blob_file_paths_->back(), column_family_name_, + job_id_, writer_->get_log_number(), + creation_reason_, s, "", "", blob_count_, + blob_bytes_) .PermitUncheckedError(); } diff --git a/db/blob/blob_file_builder.h b/db/blob/blob_file_builder.h index 0929b6a77..745af20eb 100644 --- a/db/blob/blob_file_builder.h +++ b/db/blob/blob_file_builder.h @@ -13,6 +13,7 @@ #include "rocksdb/compression_type.h" #include "rocksdb/env.h" #include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/types.h" namespace ROCKSDB_NAMESPACE { @@ -41,6 +42,7 @@ class BlobFileBuilder { Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, BlobFileCompletionCallback* blob_callback, + BlobFileCreationReason creation_reason, std::vector* blob_file_paths, std::vector* blob_file_additions); @@ -54,6 +56,7 @@ class BlobFileBuilder { Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, BlobFileCompletionCallback* blob_callback, + BlobFileCreationReason creation_reason, std::vector* blob_file_paths, std::vector* blob_file_additions); @@ -64,7 +67,7 @@ class BlobFileBuilder { Status Add(const Slice& key, const Slice& value, std::string* blob_index); Status Finish(); - void Abandon(); + void Abandon(const Status& s); private: bool IsBlobFileOpen() const; @@ -89,6 +92,7 @@ class BlobFileBuilder { Env::WriteLifeTimeHint write_hint_; std::shared_ptr io_tracer_; BlobFileCompletionCallback* blob_callback_; + BlobFileCreationReason creation_reason_; std::vector* blob_file_paths_; std::vector* blob_file_additions_; std::unique_ptr writer_; diff --git a/db/blob/blob_file_builder_test.cc b/db/blob/blob_file_builder_test.cc index 08cfac007..1b85d05e8 100644 --- a/db/blob/blob_file_builder_test.cc +++ b/db/blob/blob_file_builder_test.cc @@ -145,7 +145,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); std::vector> expected_key_value_pairs( number_of_blobs); @@ -229,7 +229,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); std::vector> expected_key_value_pairs( number_of_blobs); @@ -315,7 +315,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); for (size_t i = 0; i < number_of_blobs; ++i) { const std::string key = std::to_string(i); @@ -368,7 +368,7 @@ TEST_F(BlobFileBuilderTest, Compression) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); const std::string key("1"); const std::string uncompressed_value(value_size, 'x'); @@ -450,7 +450,7 @@ TEST_F(BlobFileBuilderTest, CompressionError) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue", [](void* arg) { @@ -528,7 +528,7 @@ TEST_F(BlobFileBuilderTest, Checksum) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); const std::string key("1"); const std::string value("deadbeef"); @@ -624,7 +624,7 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { Status* const s = static_cast(arg); diff --git a/db/blob/blob_file_completion_callback.h b/db/blob/blob_file_completion_callback.h index 42b6def89..8c88de212 100644 --- a/db/blob/blob_file_completion_callback.h +++ b/db/blob/blob_file_completion_callback.h @@ -9,6 +9,7 @@ #pragma once #include "db/error_handler.h" +#include "db/event_helpers.h" #include "file/sst_file_manager_impl.h" #include "rocksdb/status.h" @@ -16,23 +17,46 @@ namespace ROCKSDB_NAMESPACE { class BlobFileCompletionCallback { public: -#ifdef ROCKSDB_LITE - BlobFileCompletionCallback(SstFileManager* /*sst_file_manager*/, - InstrumentedMutex* /*mutex*/, - ErrorHandler* /*error_handler*/) {} - Status OnBlobFileCompleted(const std::string& /*file_name*/) { - return Status::OK(); - } -#else - BlobFileCompletionCallback(SstFileManager* sst_file_manager, - InstrumentedMutex* mutex, - ErrorHandler* error_handler) + BlobFileCompletionCallback( + SstFileManager* sst_file_manager, InstrumentedMutex* mutex, + ErrorHandler* error_handler, EventLogger* event_logger, + const std::vector>& listeners, + const std::string& dbname) : sst_file_manager_(sst_file_manager), mutex_(mutex), - error_handler_(error_handler) {} + error_handler_(error_handler), + event_logger_(event_logger), + listeners_(listeners), + dbname_(dbname) {} - Status OnBlobFileCompleted(const std::string& file_name) { + void OnBlobFileCreationStarted(const std::string& file_name, + const std::string& column_family_name, + int job_id, + BlobFileCreationReason creation_reason) { +#ifndef ROCKSDB_LITE + // Notify the listeners. + EventHelpers::NotifyBlobFileCreationStarted(listeners_, dbname_, + column_family_name, file_name, + job_id, creation_reason); +#else + (void)file_name; + (void)column_family_name; + (void)job_id; + (void)creation_reason; +#endif + } + + Status OnBlobFileCompleted(const std::string& file_name, + const std::string& column_family_name, int job_id, + uint64_t file_number, + BlobFileCreationReason creation_reason, + const Status& report_status, + const std::string& checksum_value, + const std::string& checksum_method, + uint64_t blob_count, uint64_t blob_bytes) { Status s; + +#ifndef ROCKSDB_LITE auto sfm = static_cast(sst_file_manager_); if (sfm) { // Report new blob files to SstFileManagerImpl @@ -45,6 +69,17 @@ class BlobFileCompletionCallback { error_handler_->SetBGError(s, BackgroundErrorReason::kFlush); } } +#endif // !ROCKSDB_LITE + + // Notify the listeners. + EventHelpers::LogAndNotifyBlobFileCreationFinished( + event_logger_, listeners_, dbname_, column_family_name, file_name, + job_id, file_number, creation_reason, + (!report_status.ok() ? report_status : s), + (checksum_value.empty() ? kUnknownFileChecksum : checksum_value), + (checksum_method.empty() ? kUnknownFileChecksumFuncName + : checksum_method), + blob_count, blob_bytes); return s; } @@ -52,6 +87,8 @@ class BlobFileCompletionCallback { SstFileManager* sst_file_manager_; InstrumentedMutex* mutex_; ErrorHandler* error_handler_; -#endif // ROCKSDB_LITE + EventLogger* event_logger_; + std::vector> listeners_; + std::string dbname_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/builder.cc b/db/builder.cc index 16e5744d1..0971e9dc0 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -65,7 +65,8 @@ Status BuildTable( SequenceNumber earliest_write_conflict_snapshot, SnapshotChecker* snapshot_checker, bool paranoid_file_checks, InternalStats* internal_stats, IOStatus* io_status, - const std::shared_ptr& io_tracer, EventLogger* event_logger, + const std::shared_ptr& io_tracer, + BlobFileCreationReason blob_creation_reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, TableProperties* table_properties, Env::WriteLifeTimeHint write_hint, const std::string* full_history_ts_low, @@ -178,12 +179,12 @@ Status BuildTable( std::unique_ptr blob_file_builder( (mutable_cf_options.enable_blob_files && blob_file_additions) - ? new BlobFileBuilder(versions, fs, &ioptions, &mutable_cf_options, - &file_options, job_id, - tboptions.column_family_id, - tboptions.column_family_name, io_priority, - write_hint, io_tracer, blob_callback, - &blob_file_paths, blob_file_additions) + ? new BlobFileBuilder( + versions, fs, &ioptions, &mutable_cf_options, &file_options, + job_id, tboptions.column_family_id, + tboptions.column_family_name, io_priority, write_hint, + io_tracer, blob_callback, blob_creation_reason, + &blob_file_paths, blob_file_additions) : nullptr); CompactionIterator c_iter( @@ -311,7 +312,7 @@ Status BuildTable( if (s.ok()) { s = blob_file_builder->Finish(); } else { - blob_file_builder->Abandon(); + blob_file_builder->Abandon(s); } blob_file_builder.reset(); } diff --git a/db/builder.h b/db/builder.h index f8828f5c4..c8f39b237 100644 --- a/db/builder.h +++ b/db/builder.h @@ -60,6 +60,7 @@ extern Status BuildTable( SnapshotChecker* snapshot_checker, bool paranoid_file_checks, InternalStats* internal_stats, IOStatus* io_status, const std::shared_ptr& io_tracer, + BlobFileCreationReason blob_creation_reason, EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, TableProperties* table_properties = nullptr, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 3e2ba08e7..a89fccb99 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1210,13 +1210,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { std::unique_ptr blob_file_builder( mutable_cf_options->enable_blob_files - ? new BlobFileBuilder(versions_, fs_.get(), - sub_compact->compaction->immutable_options(), - mutable_cf_options, &file_options_, job_id_, - cfd->GetID(), cfd->GetName(), - Env::IOPriority::IO_LOW, write_hint_, - io_tracer_, blob_callback_, &blob_file_paths, - &sub_compact->blob_file_additions) + ? new BlobFileBuilder( + versions_, fs_.get(), + sub_compact->compaction->immutable_options(), + mutable_cf_options, &file_options_, job_id_, cfd->GetID(), + cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_, + io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction, + &blob_file_paths, &sub_compact->blob_file_additions) : nullptr); TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); @@ -1427,7 +1427,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (status.ok()) { status = blob_file_builder->Finish(); } else { - blob_file_builder->Abandon(); + blob_file_builder->Abandon(status); } blob_file_builder.reset(); } diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f23502b47..a3d5bb19b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -237,7 +237,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, closed_(false), atomic_flush_install_cv_(&mutex_), blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_, - &error_handler_) { + &error_handler_, &event_logger_, + immutable_db_options_.listeners, dbname_) { // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. assert(batch_per_txn_ || seq_per_batch_); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index b1679d756..590a2be8e 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1353,10 +1353,15 @@ Status DBImpl::CompactFilesImpl( if (output_file_names != nullptr) { for (const auto& newf : c->edit()->GetNewFiles()) { - (*output_file_names) - .push_back(TableFileName(c->immutable_options()->cf_paths, - newf.second.fd.GetNumber(), - newf.second.fd.GetPathId())); + output_file_names->push_back(TableFileName( + c->immutable_options()->cf_paths, newf.second.fd.GetNumber(), + newf.second.fd.GetPathId())); + } + + for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) { + output_file_names->push_back( + BlobFileName(c->immutable_options()->cf_paths.front().path, + blob_file.GetBlobFileNumber())); } } @@ -3461,6 +3466,30 @@ void DBImpl::BuildCompactionJobInfo( compaction_job_info->output_file_infos.push_back(CompactionFileInfo{ newf.first, file_number, meta.oldest_blob_file_number}); } + compaction_job_info->blob_compression_type = + c->mutable_cf_options()->blob_compression_type; + + // Update BlobFilesInfo. + for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) { + BlobFileAdditionInfo blob_file_addition_info( + BlobFileName(c->immutable_options()->cf_paths.front().path, + blob_file.GetBlobFileNumber()) /*blob_file_path*/, + blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(), + blob_file.GetTotalBlobBytes()); + compaction_job_info->blob_file_addition_infos.emplace_back( + std::move(blob_file_addition_info)); + } + + // Update BlobFilesGarbageInfo. + for (const auto& blob_file : c->edit()->GetBlobFileGarbages()) { + BlobFileGarbageInfo blob_file_garbage_info( + BlobFileName(c->immutable_options()->cf_paths.front().path, + blob_file.GetBlobFileNumber()) /*blob_file_path*/, + blob_file.GetBlobFileNumber(), blob_file.GetGarbageBlobCount(), + blob_file.GetGarbageBlobBytes()); + compaction_job_info->blob_file_garbage_infos.emplace_back( + std::move(blob_file_garbage_info)); + } } #endif diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 96aac73b3..4eb46a2c8 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -360,6 +360,11 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, &event_logger_, job_id, number, fname, file_deletion_status, GetName(), immutable_db_options_.listeners); } + if (type == kBlobFile) { + EventHelpers::LogAndNotifyBlobFileDeletion( + &event_logger_, immutable_db_options_.listeners, job_id, number, fname, + file_deletion_status, GetName()); + } } // Diffs the files listed in filenames and those that do not diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 25e4bc5a3..ee40f7e51 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1424,8 +1424,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, std::move(range_del_iters), &meta, &blob_file_additions, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_, - &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */, - write_hint, nullptr /*full_history_ts_low*/, &blob_callback_); + BlobFileCreationReason::kRecovery, &event_logger_, job_id, + Env::IO_HIGH, nullptr /* table_properties */, write_hint, + nullptr /*full_history_ts_low*/, &blob_callback_); LogFlush(immutable_db_options_.info_log); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 0073302a0..fd1c6f013 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -37,6 +37,9 @@ void EventHelpers::NotifyTableFileCreationStarted( const std::vector>& listeners, const std::string& db_name, const std::string& cf_name, const std::string& file_path, int job_id, TableFileCreationReason reason) { + if (listeners.empty()) { + return; + } TableFileCreationBriefInfo info; info.db_name = db_name; info.cf_name = cf_name; @@ -54,7 +57,7 @@ void EventHelpers::NotifyOnBackgroundError( BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex, bool* auto_recovery) { #ifndef ROCKSDB_LITE - if (listeners.size() == 0U) { + if (listeners.empty()) { return; } db_mutex->AssertHeld(); @@ -163,7 +166,7 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( } #ifndef ROCKSDB_LITE - if (listeners.size() == 0) { + if (listeners.empty()) { return; } TableFileCreationInfo info; @@ -210,6 +213,9 @@ void EventHelpers::LogAndNotifyTableFileDeletion( event_logger->Log(jwriter); #ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } TableFileDeletionInfo info; info.db_name = dbname; info.job_id = job_id; @@ -230,7 +236,7 @@ void EventHelpers::NotifyOnErrorRecoveryCompleted( const std::vector>& listeners, Status old_bg_error, InstrumentedMutex* db_mutex) { #ifndef ROCKSDB_LITE - if (listeners.size() > 0) { + if (!listeners.empty()) { db_mutex->AssertHeld(); // release lock while notifying events db_mutex->Unlock(); @@ -247,4 +253,98 @@ void EventHelpers::NotifyOnErrorRecoveryCompleted( #endif // ROCKSDB_LITE } +#ifndef ROCKSDB_LITE +void EventHelpers::NotifyBlobFileCreationStarted( + const std::vector>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, + BlobFileCreationReason creation_reason) { + if (listeners.empty()) { + return; + } + BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id, + creation_reason); + for (const auto& listener : listeners) { + listener->OnBlobFileCreationStarted(info); + } +} +#endif // !ROCKSDB_LITE + +void EventHelpers::LogAndNotifyBlobFileCreationFinished( + EventLogger* event_logger, + const std::vector>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, uint64_t file_number, + BlobFileCreationReason creation_reason, const Status& s, + const std::string& file_checksum, + const std::string& file_checksum_func_name, uint64_t total_blob_count, + uint64_t total_blob_bytes) { + if (s.ok() && event_logger) { + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + jwriter << "cf_name" << cf_name << "job" << job_id << "event" + << "blob_file_creation" + << "file_number" << file_number << "total_blob_count" + << total_blob_count << "total_blob_bytes" << total_blob_bytes + << "file_checksum" << file_checksum << "file_checksum_func_name" + << file_checksum_func_name << "status" << s.ToString(); + + jwriter.EndObject(); + event_logger->Log(jwriter); + } + +#ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } + BlobFileCreationInfo info(db_name, cf_name, file_path, job_id, + creation_reason, total_blob_count, total_blob_bytes, + s, file_checksum, file_checksum_func_name); + for (const auto& listener : listeners) { + listener->OnBlobFileCreated(info); + } + info.status.PermitUncheckedError(); +#else + (void)listeners; + (void)db_name; + (void)file_path; + (void)creation_reason; +#endif +} + +void EventHelpers::LogAndNotifyBlobFileDeletion( + EventLogger* event_logger, + const std::vector>& listeners, int job_id, + uint64_t file_number, const std::string& file_path, const Status& status, + const std::string& dbname) { + if (event_logger) { + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + + jwriter << "job" << job_id << "event" + << "blob_file_deletion" + << "file_number" << file_number; + if (!status.ok()) { + jwriter << "status" << status.ToString(); + } + + jwriter.EndObject(); + event_logger->Log(jwriter); + } +#ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } + BlobFileDeletionInfo info(dbname, file_path, job_id, status); + for (const auto& listener : listeners) { + listener->OnBlobFileDeleted(info); + } + info.status.PermitUncheckedError(); +#else + (void)listeners; + (void)dbname; + (void)file_path; +#endif // !ROCKSDB_LITE +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/event_helpers.h b/db/event_helpers.h index abc00981c..f8b7f1d51 100644 --- a/db/event_helpers.h +++ b/db/event_helpers.h @@ -47,6 +47,30 @@ class EventHelpers { const std::vector>& listeners, Status bg_error, InstrumentedMutex* db_mutex); +#ifndef ROCKSDB_LITE + static void NotifyBlobFileCreationStarted( + const std::vector>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, + BlobFileCreationReason creation_reason); +#endif // !ROCKSDB_LITE + + static void LogAndNotifyBlobFileCreationFinished( + EventLogger* event_logger, + const std::vector>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, uint64_t file_number, + BlobFileCreationReason creation_reason, const Status& s, + const std::string& file_checksum, + const std::string& file_checksum_func_name, uint64_t total_blob_count, + uint64_t total_blob_bytes); + + static void LogAndNotifyBlobFileDeletion( + EventLogger* event_logger, + const std::vector>& listeners, int job_id, + uint64_t file_number, const std::string& file_path, const Status& status, + const std::string& db_name); + private: static void LogAndNotifyTableFileCreation( EventLogger* event_logger, diff --git a/db/flush_job.cc b/db/flush_job.cc index d520b709e..a6b93f2b2 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -900,9 +900,10 @@ Status FlushJob::WriteLevel0Table() { &blob_file_additions, existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), - &io_s, io_tracer_, event_logger_, job_context_->job_id, Env::IO_HIGH, - &table_properties_, write_hint, full_history_ts_low, blob_callback_, - &num_input_entries, &memtable_payload_bytes, &memtable_garbage_bytes); + &io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_, + job_context_->job_id, Env::IO_HIGH, &table_properties_, write_hint, + full_history_ts_low, blob_callback_, &num_input_entries, + &memtable_payload_bytes, &memtable_garbage_bytes); if (!io_s.ok()) { io_status_ = io_s; } @@ -1021,8 +1022,21 @@ std::unique_ptr FlushJob::GetFlushJobInfo() const { info->largest_seqno = meta_.fd.largest_seqno; info->table_properties = table_properties_; info->flush_reason = cfd_->GetFlushReason(); + info->blob_compression_type = mutable_cf_options_.blob_compression_type; + + // Update BlobFilesInfo. + for (const auto& blob_file : edit_->GetBlobFileAdditions()) { + BlobFileAdditionInfo blob_file_addition_info( + BlobFileName(cfd_->ioptions()->cf_paths.front().path, + blob_file.GetBlobFileNumber()) /*blob_file_path*/, + blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(), + blob_file.GetTotalBlobBytes()); + info->blob_file_addition_infos.emplace_back( + std::move(blob_file_addition_info)); + } return info; } + #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/listener_test.cc b/db/listener_test.cc index e9e6bc7ff..ce53d297b 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -1000,6 +1000,12 @@ class TestFileOperationListener : public EventListener { file_syncs_success_.store(0); file_truncates_.store(0); file_truncates_success_.store(0); + blob_file_reads_.store(0); + blob_file_writes_.store(0); + blob_file_flushes_.store(0); + blob_file_closes_.store(0); + blob_file_syncs_.store(0); + blob_file_truncates_.store(0); } void OnFileReadFinish(const FileOperationInfo& info) override { @@ -1007,6 +1013,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_reads_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_reads_; + } ReportDuration(info); } @@ -1015,6 +1024,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_writes_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_writes_; + } ReportDuration(info); } @@ -1023,6 +1035,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_flushes_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_flushes_; + } ReportDuration(info); } @@ -1031,6 +1046,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_closes_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_closes_; + } ReportDuration(info); } @@ -1039,6 +1057,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_syncs_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_syncs_; + } ReportDuration(info); } @@ -1047,6 +1068,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_truncates_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_truncates_; + } ReportDuration(info); } @@ -1064,6 +1088,12 @@ class TestFileOperationListener : public EventListener { std::atomic file_syncs_success_; std::atomic file_truncates_; std::atomic file_truncates_success_; + std::atomic blob_file_reads_; + std::atomic blob_file_writes_; + std::atomic blob_file_flushes_; + std::atomic blob_file_closes_; + std::atomic blob_file_syncs_; + std::atomic blob_file_truncates_; private: void ReportDuration(const FileOperationInfo& info) const { @@ -1113,6 +1143,379 @@ TEST_F(EventListenerTest, OnFileOperationTest) { } } +TEST_F(EventListenerTest, OnBlobFileOperationTest) { + Options options; + options.env = CurrentOptions().env; + options.create_if_missing = true; + TestFileOperationListener* listener = new TestFileOperationListener(); + options.listeners.emplace_back(listener); + options.disable_auto_compactions = true; + options.enable_blob_files = true; + options.min_blob_size = 0; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + + DestroyAndReopen(options); + + ASSERT_OK(Put("Key1", "blob_value1")); + ASSERT_OK(Put("Key2", "blob_value2")); + ASSERT_OK(Put("Key3", "blob_value3")); + ASSERT_OK(Put("Key4", "blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key3", "new_blob_value3")); + ASSERT_OK(Put("Key4", "new_blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + ASSERT_GT(listener->blob_file_writes_.load(), 0U); + ASSERT_GT(listener->blob_file_flushes_.load(), 0U); + Close(); + + Reopen(options); + ASSERT_GT(listener->blob_file_closes_.load(), 0U); + ASSERT_GT(listener->blob_file_syncs_.load(), 0U); + if (true == options.use_direct_io_for_flush_and_compaction) { + ASSERT_GT(listener->blob_file_truncates_.load(), 0U); + } +} + +class BlobDBJobLevelEventListenerTest : public EventListener { + public: + explicit BlobDBJobLevelEventListenerTest(EventListenerTest* test) + : test_(test), call_count_(0) {} + + std::shared_ptr GetBlobFileMetaData( + const VersionStorageInfo::BlobFiles& blob_files, + uint64_t blob_file_number) { + const auto it = blob_files.find(blob_file_number); + + if (it == blob_files.end()) { + return nullptr; + } + + const auto& meta = it->second; + assert(meta); + + return meta; + } + + const VersionStorageInfo::BlobFiles& GetBlobFiles() { + VersionSet* const versions = test_->dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + EXPECT_NE(cfd, nullptr); + + Version* const current = cfd->current(); + EXPECT_NE(current, nullptr); + + const VersionStorageInfo* const storage_info = current->storage_info(); + EXPECT_NE(storage_info, nullptr); + + const auto& blob_files = storage_info->GetBlobFiles(); + return blob_files; + } + + std::vector GetFlushedFiles() { + std::lock_guard lock(mutex_); + std::vector result; + for (const auto& fname : flushed_files_) { + result.push_back(fname); + } + return result; + } + + void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { + call_count_++; + EXPECT_FALSE(info.blob_file_addition_infos.empty()); + const auto& blob_files = GetBlobFiles(); + { + std::lock_guard lock(mutex_); + flushed_files_.push_back(info.file_path); + } + EXPECT_EQ(info.blob_compression_type, kNoCompression); + + for (const auto& blob_file_addition_info : info.blob_file_addition_infos) { + const auto meta = GetBlobFileMetaData( + blob_files, blob_file_addition_info.blob_file_number); + EXPECT_EQ(meta->GetBlobFileNumber(), + blob_file_addition_info.blob_file_number); + EXPECT_EQ(meta->GetTotalBlobBytes(), + blob_file_addition_info.total_blob_bytes); + EXPECT_EQ(meta->GetTotalBlobCount(), + blob_file_addition_info.total_blob_count); + EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty()); + } + } + + void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override { + call_count_++; + EXPECT_FALSE(ci.blob_file_garbage_infos.empty()); + const auto& blob_files = GetBlobFiles(); + EXPECT_EQ(ci.blob_compression_type, kNoCompression); + + for (const auto& blob_file_addition_info : ci.blob_file_addition_infos) { + const auto meta = GetBlobFileMetaData( + blob_files, blob_file_addition_info.blob_file_number); + EXPECT_EQ(meta->GetBlobFileNumber(), + blob_file_addition_info.blob_file_number); + EXPECT_EQ(meta->GetTotalBlobBytes(), + blob_file_addition_info.total_blob_bytes); + EXPECT_EQ(meta->GetTotalBlobCount(), + blob_file_addition_info.total_blob_count); + EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty()); + } + + for (const auto& blob_file_garbage_info : ci.blob_file_garbage_infos) { + EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U); + EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U); + EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U); + EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty()); + } + } + + EventListenerTest* test_; + uint32_t call_count_; + + private: + std::vector flushed_files_; + std::mutex mutex_; +}; + +// Test OnFlushCompleted EventListener called for blob files +TEST_F(EventListenerTest, BlobDBOnFlushCompleted) { + Options options; + options.env = CurrentOptions().env; + options.enable_blob_files = true; + options.create_if_missing = true; + options.disable_auto_compactions = true; + + options.min_blob_size = 0; + BlobDBJobLevelEventListenerTest* blob_event_listener = + new BlobDBJobLevelEventListenerTest(this); + options.listeners.emplace_back(blob_event_listener); + + DestroyAndReopen(options); + + ASSERT_OK(Put("Key1", "blob_value1")); + ASSERT_OK(Put("Key2", "blob_value2")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key3", "blob_value3")); + ASSERT_OK(Flush()); + + ASSERT_EQ(Get("Key1"), "blob_value1"); + ASSERT_EQ(Get("Key2"), "blob_value2"); + ASSERT_EQ(Get("Key3"), "blob_value3"); + + ASSERT_GT(blob_event_listener->call_count_, 0U); +} + +// Test OnCompactionCompleted EventListener called for blob files +TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) { + Options options; + options.env = CurrentOptions().env; + options.enable_blob_files = true; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.min_blob_size = 0; + BlobDBJobLevelEventListenerTest* blob_event_listener = + new BlobDBJobLevelEventListenerTest(this); + options.listeners.emplace_back(blob_event_listener); + + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + + DestroyAndReopen(options); + + ASSERT_OK(Put("Key1", "blob_value1")); + ASSERT_OK(Put("Key2", "blob_value2")); + ASSERT_OK(Put("Key3", "blob_value3")); + ASSERT_OK(Put("Key4", "blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key3", "new_blob_value3")); + ASSERT_OK(Put("Key4", "new_blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + blob_event_listener->call_count_ = 0; + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + // On compaction, because of blob_garbage_collection_age_cutoff, it will + // delete the oldest blob file and create new blob file during compaction. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + // Make sure, OnCompactionCompleted is called. + ASSERT_GT(blob_event_listener->call_count_, 0U); +} + +// Test CompactFiles calls OnCompactionCompleted EventListener for blob files +// and populate the blob files info. +TEST_F(EventListenerTest, BlobDBCompactFiles) { + Options options; + options.env = CurrentOptions().env; + options.enable_blob_files = true; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.min_blob_size = 0; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + + BlobDBJobLevelEventListenerTest* blob_event_listener = + new BlobDBJobLevelEventListenerTest(this); + options.listeners.emplace_back(blob_event_listener); + + DestroyAndReopen(options); + + ASSERT_OK(Put("Key1", "blob_value1")); + ASSERT_OK(Put("Key2", "blob_value2")); + ASSERT_OK(Put("Key3", "blob_value3")); + ASSERT_OK(Put("Key4", "blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key3", "new_blob_value3")); + ASSERT_OK(Put("Key4", "new_blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + std::vector output_file_names; + CompactionJobInfo compaction_job_info; + + // On compaction, because of blob_garbage_collection_age_cutoff, it will + // delete the oldest blob file and create new blob file during compaction + // which will be populated in output_files_names. + ASSERT_OK(dbfull()->CompactFiles( + CompactionOptions(), blob_event_listener->GetFlushedFiles(), 1, -1, + &output_file_names, &compaction_job_info)); + + bool is_blob_in_output = false; + for (const auto& file : output_file_names) { + if (EndsWith(file, ".blob")) { + is_blob_in_output = true; + } + } + ASSERT_TRUE(is_blob_in_output); + + for (const auto& blob_file_addition_info : + compaction_job_info.blob_file_addition_infos) { + EXPECT_GT(blob_file_addition_info.blob_file_number, 0U); + EXPECT_GT(blob_file_addition_info.total_blob_bytes, 0U); + EXPECT_GT(blob_file_addition_info.total_blob_count, 0U); + EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty()); + } + + for (const auto& blob_file_garbage_info : + compaction_job_info.blob_file_garbage_infos) { + EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U); + EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U); + EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U); + EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty()); + } +} + +class BlobDBFileLevelEventListener : public EventListener { + public: + BlobDBFileLevelEventListener() { + files_started_.store(0); + files_created_.store(0); + files_deleted_.store(0); + } + + void OnBlobFileCreationStarted( + const BlobFileCreationBriefInfo& info) override { + files_started_++; + EXPECT_FALSE(info.db_name.empty()); + EXPECT_FALSE(info.cf_name.empty()); + EXPECT_FALSE(info.file_path.empty()); + EXPECT_GT(info.job_id, 0); + } + + void OnBlobFileCreated(const BlobFileCreationInfo& info) override { + files_created_++; + EXPECT_FALSE(info.db_name.empty()); + EXPECT_FALSE(info.cf_name.empty()); + EXPECT_FALSE(info.file_path.empty()); + EXPECT_GT(info.job_id, 0); + EXPECT_GT(info.total_blob_count, 0U); + EXPECT_GT(info.total_blob_bytes, 0U); + EXPECT_EQ(info.file_checksum, kUnknownFileChecksum); + EXPECT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName); + EXPECT_TRUE(info.status.ok()); + } + + void OnBlobFileDeleted(const BlobFileDeletionInfo& info) override { + files_deleted_++; + EXPECT_FALSE(info.db_name.empty()); + EXPECT_FALSE(info.file_path.empty()); + EXPECT_GT(info.job_id, 0); + EXPECT_TRUE(info.status.ok()); + } + + void CheckCounters() { + EXPECT_EQ(files_started_, files_created_); + EXPECT_GT(files_started_, 0U); + EXPECT_GT(files_deleted_, 0U); + EXPECT_LT(files_deleted_, files_created_); + } + + private: + std::atomic files_started_; + std::atomic files_created_; + std::atomic files_deleted_; +}; + +TEST_F(EventListenerTest, BlobDBFileTest) { + Options options; + options.env = CurrentOptions().env; + options.enable_blob_files = true; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.min_blob_size = 0; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + + BlobDBFileLevelEventListener* blob_event_listener = + new BlobDBFileLevelEventListener(); + options.listeners.emplace_back(blob_event_listener); + + DestroyAndReopen(options); + + ASSERT_OK(Put("Key1", "blob_value1")); + ASSERT_OK(Put("Key2", "blob_value2")); + ASSERT_OK(Put("Key3", "blob_value3")); + ASSERT_OK(Put("Key4", "blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key3", "new_blob_value3")); + ASSERT_OK(Put("Key4", "new_blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + // On compaction, because of blob_garbage_collection_age_cutoff, it will + // delete the oldest blob file and create new blob file during compaction. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + blob_event_listener->CheckCounters(); +} + } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/repair.cc b/db/repair.cc index 3efe63dfc..42b874f45 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -450,8 +450,9 @@ class Repairer { std::move(range_del_iters), &meta, nullptr /* blob_file_additions */, {}, kMaxSequenceNumber, snapshot_checker, false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s, - nullptr /*IOTracer*/, nullptr /* event_logger */, 0 /* job_id */, - Env::IO_HIGH, nullptr /* table_properties */, write_hint); + nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery, + nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, + nullptr /* table_properties */, write_hint); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index a70836a27..c79de186d 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -29,8 +29,16 @@ class ColumnFamilyHandle; class Status; struct CompactionJobStats; -struct TableFileCreationBriefInfo { - // the name of the database where the file was created +struct FileCreationBriefInfo { + FileCreationBriefInfo() = default; + FileCreationBriefInfo(const std::string& _db_name, + const std::string& _cf_name, + const std::string& _file_path, int _job_id) + : db_name(_db_name), + cf_name(_cf_name), + file_path(_file_path), + job_id(_job_id) {} + // the name of the database where the file was created. std::string db_name; // the name of the column family where the file was created. std::string cf_name; @@ -38,7 +46,10 @@ struct TableFileCreationBriefInfo { std::string file_path; // the id of the job (which could be flush or compaction) that // created the file. - int job_id; + int job_id = 0; +}; + +struct TableFileCreationBriefInfo : public FileCreationBriefInfo { // reason of creating the table. TableFileCreationReason reason; }; @@ -59,6 +70,44 @@ struct TableFileCreationInfo : public TableFileCreationBriefInfo { std::string file_checksum_func_name; }; +struct BlobFileCreationBriefInfo : public FileCreationBriefInfo { + BlobFileCreationBriefInfo(const std::string& _db_name, + const std::string& _cf_name, + const std::string& _file_path, int _job_id, + BlobFileCreationReason _reason) + : FileCreationBriefInfo(_db_name, _cf_name, _file_path, _job_id), + reason(_reason) {} + // reason of creating the blob file. + BlobFileCreationReason reason; +}; + +struct BlobFileCreationInfo : public BlobFileCreationBriefInfo { + BlobFileCreationInfo(const std::string& _db_name, const std::string& _cf_name, + const std::string& _file_path, int _job_id, + BlobFileCreationReason _reason, + uint64_t _total_blob_count, uint64_t _total_blob_bytes, + Status _status, const std::string& _file_checksum, + const std::string& _file_checksum_func_name) + : BlobFileCreationBriefInfo(_db_name, _cf_name, _file_path, _job_id, + _reason), + total_blob_count(_total_blob_count), + total_blob_bytes(_total_blob_bytes), + status(_status), + file_checksum(_file_checksum), + file_checksum_func_name(_file_checksum_func_name) {} + + // the number of blob in a file. + uint64_t total_blob_count; + // the total bytes in a file. + uint64_t total_blob_bytes; + // The status indicating whether the creation was successful or not. + Status status; + // The checksum of the blob file being created. + std::string file_checksum; + // The checksum function name of checksum generator used for this blob file. + std::string file_checksum_func_name; +}; + enum class CompactionReason : int { kUnknown = 0, // [Level] number of L0 files > level0_file_num_compaction_trigger @@ -150,17 +199,34 @@ struct WriteStallInfo { #ifndef ROCKSDB_LITE -struct TableFileDeletionInfo { +struct FileDeletionInfo { + FileDeletionInfo() = default; + + FileDeletionInfo(const std::string& _db_name, const std::string& _file_path, + int _job_id, Status _status) + : db_name(_db_name), + file_path(_file_path), + job_id(_job_id), + status(_status) {} // The name of the database where the file was deleted. std::string db_name; // The path to the deleted file. std::string file_path; // The id of the job which deleted the file. - int job_id; + int job_id = 0; // The status indicating whether the deletion was successful or not. Status status; }; +struct TableFileDeletionInfo : public FileDeletionInfo {}; + +struct BlobFileDeletionInfo : public FileDeletionInfo { + BlobFileDeletionInfo(const std::string& _db_name, + const std::string& _file_path, int _job_id, + Status _status) + : FileDeletionInfo(_db_name, _file_path, _job_id, _status) {} +}; + enum class FileOperationType { kRead, kWrite, @@ -206,6 +272,39 @@ struct FileOperationInfo { } }; +struct BlobFileInfo { + BlobFileInfo(const std::string& _blob_file_path, + const uint64_t _blob_file_number) + : blob_file_path(_blob_file_path), blob_file_number(_blob_file_number) {} + + std::string blob_file_path; + uint64_t blob_file_number; +}; + +struct BlobFileAdditionInfo : public BlobFileInfo { + BlobFileAdditionInfo(const std::string& _blob_file_path, + const uint64_t _blob_file_number, + const uint64_t _total_blob_count, + const uint64_t _total_blob_bytes) + : BlobFileInfo(_blob_file_path, _blob_file_number), + total_blob_count(_total_blob_count), + total_blob_bytes(_total_blob_bytes) {} + uint64_t total_blob_count; + uint64_t total_blob_bytes; +}; + +struct BlobFileGarbageInfo : public BlobFileInfo { + BlobFileGarbageInfo(const std::string& _blob_file_path, + const uint64_t _blob_file_number, + const uint64_t _garbage_blob_count, + const uint64_t _garbage_blob_bytes) + : BlobFileInfo(_blob_file_path, _blob_file_number), + garbage_blob_count(_garbage_blob_count), + garbage_blob_bytes(_garbage_blob_bytes) {} + uint64_t garbage_blob_count; + uint64_t garbage_blob_bytes; +}; + struct FlushJobInfo { // the id of the column family uint32_t cf_id; @@ -239,6 +338,12 @@ struct FlushJobInfo { TableProperties table_properties; FlushReason flush_reason; + + // Compression algorithm used for blob output files + CompressionType blob_compression_type; + + // Information about blob files created during flush in Integrated BlobDB. + std::vector blob_file_addition_infos; }; struct CompactionFileInfo { @@ -299,6 +404,17 @@ struct CompactionJobInfo { // Statistics and other additional details on the compaction CompactionJobStats stats; + + // Compression algorithm used for blob output files. + CompressionType blob_compression_type; + + // Information about blob files created during compaction in Integrated + // BlobDB. + std::vector blob_file_addition_infos; + + // Information about blob files deleted during compaction in Integrated + // BlobDB. + std::vector blob_file_garbage_infos; }; struct MemTableInfo { @@ -555,6 +671,34 @@ class EventListener : public Customizable { // initiate any further recovery actions needed virtual void OnErrorRecoveryCompleted(Status /* old_bg_error */) {} + // A callback function for RocksDB which will be called before + // a blob file is being created. It will follow by OnBlobFileCreated after + // the creation finishes. + // + // Note that if applications would like to use the passed reference + // outside this function call, they should make copies from these + // returned value. + virtual void OnBlobFileCreationStarted( + const BlobFileCreationBriefInfo& /*info*/) {} + + // A callback function for RocksDB which will be called whenever + // a blob file is created. + // It will be called whether the file is successfully created or not. User can + // check info.status to see if it succeeded or not. + // + // Note that if applications would like to use the passed reference + // outside this function call, they should make copies from these + // returned value. + virtual void OnBlobFileCreated(const BlobFileCreationInfo& /*info*/) {} + + // A callback function for RocksDB which will be called whenever + // a blob file is deleted. + // + // Note that if applications would like to use the passed reference + // outside this function call, they should make copies from these + // returned value. + virtual void OnBlobFileDeleted(const BlobFileDeletionInfo& /*info*/) {} + virtual ~EventListener() {} }; diff --git a/include/rocksdb/types.h b/include/rocksdb/types.h index 14dc39ea3..20cff59b8 100644 --- a/include/rocksdb/types.h +++ b/include/rocksdb/types.h @@ -26,6 +26,12 @@ enum class TableFileCreationReason { kMisc, }; +enum class BlobFileCreationReason { + kFlush, + kCompaction, + kRecovery, +}; + // The types of files RocksDB uses in a DB directory. (Available for // advanced options.) enum FileType {