diff --git a/HISTORY.md b/HISTORY.md index 5d07ebaad..eccbe3c97 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -22,10 +22,13 @@ * `SstFileWriter` now supports `Put`s and `Delete`s with user-defined timestamps. Note that the ingestion logic itself is not timestamp-aware yet. * Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp. * Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first. +* Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file. ### Public API change * Remove obsolete implementation details FullKey and ParseFullKey from public API * Add a public API RateLimiter::GetTotalPendingRequests() for the total number of requests that are pending for bytes in the rate limiter. +* Extended `FlushJobInfo` and `CompactionJobInfo` in listener.h to provide information about the blob files generated by a flush/compaction and garbage collected during compaction in Integrated BlobDB. Added struct members `blob_file_addition_infos` and `blob_file_garbage_infos` that contain this information. +* Extended parameter `output_file_names` of `CompactFiles` API to also include paths of the blob files generated by the compaction in Integrated BlobDB. ## 6.24.0 (2021-08-20) ### Bug Fixes diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 4a3f3d4b0..48817984a 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -12,6 +12,7 @@ #include "db/blob/blob_index.h" #include "db/blob/blob_log_format.h" #include "db/blob/blob_log_writer.h" +#include "db/event_helpers.h" #include "db/version_set.h" #include "file/filename.h" #include "file/read_write_util.h" @@ -36,13 +37,14 @@ BlobFileBuilder::BlobFileBuilder( Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, BlobFileCompletionCallback* blob_callback, + BlobFileCreationReason creation_reason, std::vector* blob_file_paths, std::vector* blob_file_additions) : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs, immutable_options, mutable_cf_options, file_options, job_id, column_family_id, column_family_name, io_priority, - write_hint, io_tracer, blob_callback, blob_file_paths, - blob_file_additions) {} + write_hint, io_tracer, blob_callback, creation_reason, + blob_file_paths, blob_file_additions) {} BlobFileBuilder::BlobFileBuilder( std::function file_number_generator, FileSystem* fs, @@ -53,6 +55,7 @@ BlobFileBuilder::BlobFileBuilder( Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, BlobFileCompletionCallback* blob_callback, + BlobFileCreationReason creation_reason, std::vector* blob_file_paths, std::vector* blob_file_additions) : file_number_generator_(std::move(file_number_generator)), @@ -69,6 +72,7 @@ BlobFileBuilder::BlobFileBuilder( write_hint_(write_hint), io_tracer_(io_tracer), blob_callback_(blob_callback), + creation_reason_(creation_reason), blob_file_paths_(blob_file_paths), blob_file_additions_(blob_file_additions), blob_count_(0), @@ -161,6 +165,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { std::string blob_file_path = BlobFileName(immutable_options_->cf_paths.front().path, blob_file_number); + if (blob_callback_) { + blob_callback_->OnBlobFileCreationStarted( + blob_file_path, column_family_name_, job_id_, creation_reason_); + } + std::unique_ptr file; { @@ -305,6 +314,13 @@ Status BlobFileBuilder::CloseBlobFile() { const uint64_t blob_file_number = writer_->get_log_number(); + if (blob_callback_) { + s = blob_callback_->OnBlobFileCompleted( + blob_file_paths_->back(), column_family_name_, job_id_, + blob_file_number, creation_reason_, s, checksum_value, checksum_method, + blob_count_, blob_bytes_); + } + assert(blob_file_additions_); blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_, std::move(checksum_method), @@ -316,9 +332,6 @@ Status BlobFileBuilder::CloseBlobFile() { " total blobs, %" PRIu64 " total bytes", column_family_name_.c_str(), job_id_, blob_file_number, blob_count_, blob_bytes_); - if (blob_callback_) { - s = blob_callback_->OnBlobFileCompleted(blob_file_paths_->back()); - } writer_.reset(); blob_count_ = 0; @@ -340,15 +353,18 @@ Status BlobFileBuilder::CloseBlobFileIfNeeded() { return CloseBlobFile(); } -void BlobFileBuilder::Abandon() { +void BlobFileBuilder::Abandon(const Status& s) { if (!IsBlobFileOpen()) { return; } - if (blob_callback_) { // BlobFileBuilder::Abandon() is called because of error while writing to // Blob files. So we can ignore the below error. - blob_callback_->OnBlobFileCompleted(blob_file_paths_->back()) + blob_callback_ + ->OnBlobFileCompleted(blob_file_paths_->back(), column_family_name_, + job_id_, writer_->get_log_number(), + creation_reason_, s, "", "", blob_count_, + blob_bytes_) .PermitUncheckedError(); } diff --git a/db/blob/blob_file_builder.h b/db/blob/blob_file_builder.h index 0929b6a77..745af20eb 100644 --- a/db/blob/blob_file_builder.h +++ b/db/blob/blob_file_builder.h @@ -13,6 +13,7 @@ #include "rocksdb/compression_type.h" #include "rocksdb/env.h" #include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/types.h" namespace ROCKSDB_NAMESPACE { @@ -41,6 +42,7 @@ class BlobFileBuilder { Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, BlobFileCompletionCallback* blob_callback, + BlobFileCreationReason creation_reason, std::vector* blob_file_paths, std::vector* blob_file_additions); @@ -54,6 +56,7 @@ class BlobFileBuilder { Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, BlobFileCompletionCallback* blob_callback, + BlobFileCreationReason creation_reason, std::vector* blob_file_paths, std::vector* blob_file_additions); @@ -64,7 +67,7 @@ class BlobFileBuilder { Status Add(const Slice& key, const Slice& value, std::string* blob_index); Status Finish(); - void Abandon(); + void Abandon(const Status& s); private: bool IsBlobFileOpen() const; @@ -89,6 +92,7 @@ class BlobFileBuilder { Env::WriteLifeTimeHint write_hint_; std::shared_ptr io_tracer_; BlobFileCompletionCallback* blob_callback_; + BlobFileCreationReason creation_reason_; std::vector* blob_file_paths_; std::vector* blob_file_additions_; std::unique_ptr writer_; diff --git a/db/blob/blob_file_builder_test.cc b/db/blob/blob_file_builder_test.cc index 08cfac007..1b85d05e8 100644 --- a/db/blob/blob_file_builder_test.cc +++ b/db/blob/blob_file_builder_test.cc @@ -145,7 +145,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); std::vector> expected_key_value_pairs( number_of_blobs); @@ -229,7 +229,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); std::vector> expected_key_value_pairs( number_of_blobs); @@ -315,7 +315,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); for (size_t i = 0; i < number_of_blobs; ++i) { const std::string key = std::to_string(i); @@ -368,7 +368,7 @@ TEST_F(BlobFileBuilderTest, Compression) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); const std::string key("1"); const std::string uncompressed_value(value_size, 'x'); @@ -450,7 +450,7 @@ TEST_F(BlobFileBuilderTest, CompressionError) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue", [](void* arg) { @@ -528,7 +528,7 @@ TEST_F(BlobFileBuilderTest, Checksum) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); const std::string key("1"); const std::string value("deadbeef"); @@ -624,7 +624,7 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { TestFileNumberGenerator(), fs_, &immutable_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, nullptr /*IOTracer*/, nullptr /*BlobFileCompletionCallback*/, - &blob_file_paths, &blob_file_additions); + BlobFileCreationReason::kFlush, &blob_file_paths, &blob_file_additions); SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { Status* const s = static_cast(arg); diff --git a/db/blob/blob_file_completion_callback.h b/db/blob/blob_file_completion_callback.h index 42b6def89..8c88de212 100644 --- a/db/blob/blob_file_completion_callback.h +++ b/db/blob/blob_file_completion_callback.h @@ -9,6 +9,7 @@ #pragma once #include "db/error_handler.h" +#include "db/event_helpers.h" #include "file/sst_file_manager_impl.h" #include "rocksdb/status.h" @@ -16,23 +17,46 @@ namespace ROCKSDB_NAMESPACE { class BlobFileCompletionCallback { public: -#ifdef ROCKSDB_LITE - BlobFileCompletionCallback(SstFileManager* /*sst_file_manager*/, - InstrumentedMutex* /*mutex*/, - ErrorHandler* /*error_handler*/) {} - Status OnBlobFileCompleted(const std::string& /*file_name*/) { - return Status::OK(); - } -#else - BlobFileCompletionCallback(SstFileManager* sst_file_manager, - InstrumentedMutex* mutex, - ErrorHandler* error_handler) + BlobFileCompletionCallback( + SstFileManager* sst_file_manager, InstrumentedMutex* mutex, + ErrorHandler* error_handler, EventLogger* event_logger, + const std::vector>& listeners, + const std::string& dbname) : sst_file_manager_(sst_file_manager), mutex_(mutex), - error_handler_(error_handler) {} + error_handler_(error_handler), + event_logger_(event_logger), + listeners_(listeners), + dbname_(dbname) {} - Status OnBlobFileCompleted(const std::string& file_name) { + void OnBlobFileCreationStarted(const std::string& file_name, + const std::string& column_family_name, + int job_id, + BlobFileCreationReason creation_reason) { +#ifndef ROCKSDB_LITE + // Notify the listeners. + EventHelpers::NotifyBlobFileCreationStarted(listeners_, dbname_, + column_family_name, file_name, + job_id, creation_reason); +#else + (void)file_name; + (void)column_family_name; + (void)job_id; + (void)creation_reason; +#endif + } + + Status OnBlobFileCompleted(const std::string& file_name, + const std::string& column_family_name, int job_id, + uint64_t file_number, + BlobFileCreationReason creation_reason, + const Status& report_status, + const std::string& checksum_value, + const std::string& checksum_method, + uint64_t blob_count, uint64_t blob_bytes) { Status s; + +#ifndef ROCKSDB_LITE auto sfm = static_cast(sst_file_manager_); if (sfm) { // Report new blob files to SstFileManagerImpl @@ -45,6 +69,17 @@ class BlobFileCompletionCallback { error_handler_->SetBGError(s, BackgroundErrorReason::kFlush); } } +#endif // !ROCKSDB_LITE + + // Notify the listeners. + EventHelpers::LogAndNotifyBlobFileCreationFinished( + event_logger_, listeners_, dbname_, column_family_name, file_name, + job_id, file_number, creation_reason, + (!report_status.ok() ? report_status : s), + (checksum_value.empty() ? kUnknownFileChecksum : checksum_value), + (checksum_method.empty() ? kUnknownFileChecksumFuncName + : checksum_method), + blob_count, blob_bytes); return s; } @@ -52,6 +87,8 @@ class BlobFileCompletionCallback { SstFileManager* sst_file_manager_; InstrumentedMutex* mutex_; ErrorHandler* error_handler_; -#endif // ROCKSDB_LITE + EventLogger* event_logger_; + std::vector> listeners_; + std::string dbname_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/builder.cc b/db/builder.cc index 16e5744d1..0971e9dc0 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -65,7 +65,8 @@ Status BuildTable( SequenceNumber earliest_write_conflict_snapshot, SnapshotChecker* snapshot_checker, bool paranoid_file_checks, InternalStats* internal_stats, IOStatus* io_status, - const std::shared_ptr& io_tracer, EventLogger* event_logger, + const std::shared_ptr& io_tracer, + BlobFileCreationReason blob_creation_reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, TableProperties* table_properties, Env::WriteLifeTimeHint write_hint, const std::string* full_history_ts_low, @@ -178,12 +179,12 @@ Status BuildTable( std::unique_ptr blob_file_builder( (mutable_cf_options.enable_blob_files && blob_file_additions) - ? new BlobFileBuilder(versions, fs, &ioptions, &mutable_cf_options, - &file_options, job_id, - tboptions.column_family_id, - tboptions.column_family_name, io_priority, - write_hint, io_tracer, blob_callback, - &blob_file_paths, blob_file_additions) + ? new BlobFileBuilder( + versions, fs, &ioptions, &mutable_cf_options, &file_options, + job_id, tboptions.column_family_id, + tboptions.column_family_name, io_priority, write_hint, + io_tracer, blob_callback, blob_creation_reason, + &blob_file_paths, blob_file_additions) : nullptr); CompactionIterator c_iter( @@ -311,7 +312,7 @@ Status BuildTable( if (s.ok()) { s = blob_file_builder->Finish(); } else { - blob_file_builder->Abandon(); + blob_file_builder->Abandon(s); } blob_file_builder.reset(); } diff --git a/db/builder.h b/db/builder.h index f8828f5c4..c8f39b237 100644 --- a/db/builder.h +++ b/db/builder.h @@ -60,6 +60,7 @@ extern Status BuildTable( SnapshotChecker* snapshot_checker, bool paranoid_file_checks, InternalStats* internal_stats, IOStatus* io_status, const std::shared_ptr& io_tracer, + BlobFileCreationReason blob_creation_reason, EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, TableProperties* table_properties = nullptr, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 3e2ba08e7..a89fccb99 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1210,13 +1210,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { std::unique_ptr blob_file_builder( mutable_cf_options->enable_blob_files - ? new BlobFileBuilder(versions_, fs_.get(), - sub_compact->compaction->immutable_options(), - mutable_cf_options, &file_options_, job_id_, - cfd->GetID(), cfd->GetName(), - Env::IOPriority::IO_LOW, write_hint_, - io_tracer_, blob_callback_, &blob_file_paths, - &sub_compact->blob_file_additions) + ? new BlobFileBuilder( + versions_, fs_.get(), + sub_compact->compaction->immutable_options(), + mutable_cf_options, &file_options_, job_id_, cfd->GetID(), + cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_, + io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction, + &blob_file_paths, &sub_compact->blob_file_additions) : nullptr); TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); @@ -1427,7 +1427,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (status.ok()) { status = blob_file_builder->Finish(); } else { - blob_file_builder->Abandon(); + blob_file_builder->Abandon(status); } blob_file_builder.reset(); } diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f23502b47..a3d5bb19b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -237,7 +237,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, closed_(false), atomic_flush_install_cv_(&mutex_), blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_, - &error_handler_) { + &error_handler_, &event_logger_, + immutable_db_options_.listeners, dbname_) { // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. assert(batch_per_txn_ || seq_per_batch_); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index b1679d756..590a2be8e 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1353,10 +1353,15 @@ Status DBImpl::CompactFilesImpl( if (output_file_names != nullptr) { for (const auto& newf : c->edit()->GetNewFiles()) { - (*output_file_names) - .push_back(TableFileName(c->immutable_options()->cf_paths, - newf.second.fd.GetNumber(), - newf.second.fd.GetPathId())); + output_file_names->push_back(TableFileName( + c->immutable_options()->cf_paths, newf.second.fd.GetNumber(), + newf.second.fd.GetPathId())); + } + + for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) { + output_file_names->push_back( + BlobFileName(c->immutable_options()->cf_paths.front().path, + blob_file.GetBlobFileNumber())); } } @@ -3461,6 +3466,30 @@ void DBImpl::BuildCompactionJobInfo( compaction_job_info->output_file_infos.push_back(CompactionFileInfo{ newf.first, file_number, meta.oldest_blob_file_number}); } + compaction_job_info->blob_compression_type = + c->mutable_cf_options()->blob_compression_type; + + // Update BlobFilesInfo. + for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) { + BlobFileAdditionInfo blob_file_addition_info( + BlobFileName(c->immutable_options()->cf_paths.front().path, + blob_file.GetBlobFileNumber()) /*blob_file_path*/, + blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(), + blob_file.GetTotalBlobBytes()); + compaction_job_info->blob_file_addition_infos.emplace_back( + std::move(blob_file_addition_info)); + } + + // Update BlobFilesGarbageInfo. + for (const auto& blob_file : c->edit()->GetBlobFileGarbages()) { + BlobFileGarbageInfo blob_file_garbage_info( + BlobFileName(c->immutable_options()->cf_paths.front().path, + blob_file.GetBlobFileNumber()) /*blob_file_path*/, + blob_file.GetBlobFileNumber(), blob_file.GetGarbageBlobCount(), + blob_file.GetGarbageBlobBytes()); + compaction_job_info->blob_file_garbage_infos.emplace_back( + std::move(blob_file_garbage_info)); + } } #endif diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 96aac73b3..4eb46a2c8 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -360,6 +360,11 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, &event_logger_, job_id, number, fname, file_deletion_status, GetName(), immutable_db_options_.listeners); } + if (type == kBlobFile) { + EventHelpers::LogAndNotifyBlobFileDeletion( + &event_logger_, immutable_db_options_.listeners, job_id, number, fname, + file_deletion_status, GetName()); + } } // Diffs the files listed in filenames and those that do not diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 25e4bc5a3..ee40f7e51 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1424,8 +1424,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, std::move(range_del_iters), &meta, &blob_file_additions, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_, - &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */, - write_hint, nullptr /*full_history_ts_low*/, &blob_callback_); + BlobFileCreationReason::kRecovery, &event_logger_, job_id, + Env::IO_HIGH, nullptr /* table_properties */, write_hint, + nullptr /*full_history_ts_low*/, &blob_callback_); LogFlush(immutable_db_options_.info_log); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 0073302a0..fd1c6f013 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -37,6 +37,9 @@ void EventHelpers::NotifyTableFileCreationStarted( const std::vector>& listeners, const std::string& db_name, const std::string& cf_name, const std::string& file_path, int job_id, TableFileCreationReason reason) { + if (listeners.empty()) { + return; + } TableFileCreationBriefInfo info; info.db_name = db_name; info.cf_name = cf_name; @@ -54,7 +57,7 @@ void EventHelpers::NotifyOnBackgroundError( BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex, bool* auto_recovery) { #ifndef ROCKSDB_LITE - if (listeners.size() == 0U) { + if (listeners.empty()) { return; } db_mutex->AssertHeld(); @@ -163,7 +166,7 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( } #ifndef ROCKSDB_LITE - if (listeners.size() == 0) { + if (listeners.empty()) { return; } TableFileCreationInfo info; @@ -210,6 +213,9 @@ void EventHelpers::LogAndNotifyTableFileDeletion( event_logger->Log(jwriter); #ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } TableFileDeletionInfo info; info.db_name = dbname; info.job_id = job_id; @@ -230,7 +236,7 @@ void EventHelpers::NotifyOnErrorRecoveryCompleted( const std::vector>& listeners, Status old_bg_error, InstrumentedMutex* db_mutex) { #ifndef ROCKSDB_LITE - if (listeners.size() > 0) { + if (!listeners.empty()) { db_mutex->AssertHeld(); // release lock while notifying events db_mutex->Unlock(); @@ -247,4 +253,98 @@ void EventHelpers::NotifyOnErrorRecoveryCompleted( #endif // ROCKSDB_LITE } +#ifndef ROCKSDB_LITE +void EventHelpers::NotifyBlobFileCreationStarted( + const std::vector>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, + BlobFileCreationReason creation_reason) { + if (listeners.empty()) { + return; + } + BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id, + creation_reason); + for (const auto& listener : listeners) { + listener->OnBlobFileCreationStarted(info); + } +} +#endif // !ROCKSDB_LITE + +void EventHelpers::LogAndNotifyBlobFileCreationFinished( + EventLogger* event_logger, + const std::vector>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, uint64_t file_number, + BlobFileCreationReason creation_reason, const Status& s, + const std::string& file_checksum, + const std::string& file_checksum_func_name, uint64_t total_blob_count, + uint64_t total_blob_bytes) { + if (s.ok() && event_logger) { + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + jwriter << "cf_name" << cf_name << "job" << job_id << "event" + << "blob_file_creation" + << "file_number" << file_number << "total_blob_count" + << total_blob_count << "total_blob_bytes" << total_blob_bytes + << "file_checksum" << file_checksum << "file_checksum_func_name" + << file_checksum_func_name << "status" << s.ToString(); + + jwriter.EndObject(); + event_logger->Log(jwriter); + } + +#ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } + BlobFileCreationInfo info(db_name, cf_name, file_path, job_id, + creation_reason, total_blob_count, total_blob_bytes, + s, file_checksum, file_checksum_func_name); + for (const auto& listener : listeners) { + listener->OnBlobFileCreated(info); + } + info.status.PermitUncheckedError(); +#else + (void)listeners; + (void)db_name; + (void)file_path; + (void)creation_reason; +#endif +} + +void EventHelpers::LogAndNotifyBlobFileDeletion( + EventLogger* event_logger, + const std::vector>& listeners, int job_id, + uint64_t file_number, const std::string& file_path, const Status& status, + const std::string& dbname) { + if (event_logger) { + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + + jwriter << "job" << job_id << "event" + << "blob_file_deletion" + << "file_number" << file_number; + if (!status.ok()) { + jwriter << "status" << status.ToString(); + } + + jwriter.EndObject(); + event_logger->Log(jwriter); + } +#ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } + BlobFileDeletionInfo info(dbname, file_path, job_id, status); + for (const auto& listener : listeners) { + listener->OnBlobFileDeleted(info); + } + info.status.PermitUncheckedError(); +#else + (void)listeners; + (void)dbname; + (void)file_path; +#endif // !ROCKSDB_LITE +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/event_helpers.h b/db/event_helpers.h index abc00981c..f8b7f1d51 100644 --- a/db/event_helpers.h +++ b/db/event_helpers.h @@ -47,6 +47,30 @@ class EventHelpers { const std::vector>& listeners, Status bg_error, InstrumentedMutex* db_mutex); +#ifndef ROCKSDB_LITE + static void NotifyBlobFileCreationStarted( + const std::vector>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, + BlobFileCreationReason creation_reason); +#endif // !ROCKSDB_LITE + + static void LogAndNotifyBlobFileCreationFinished( + EventLogger* event_logger, + const std::vector>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, uint64_t file_number, + BlobFileCreationReason creation_reason, const Status& s, + const std::string& file_checksum, + const std::string& file_checksum_func_name, uint64_t total_blob_count, + uint64_t total_blob_bytes); + + static void LogAndNotifyBlobFileDeletion( + EventLogger* event_logger, + const std::vector>& listeners, int job_id, + uint64_t file_number, const std::string& file_path, const Status& status, + const std::string& db_name); + private: static void LogAndNotifyTableFileCreation( EventLogger* event_logger, diff --git a/db/flush_job.cc b/db/flush_job.cc index d520b709e..a6b93f2b2 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -900,9 +900,10 @@ Status FlushJob::WriteLevel0Table() { &blob_file_additions, existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), - &io_s, io_tracer_, event_logger_, job_context_->job_id, Env::IO_HIGH, - &table_properties_, write_hint, full_history_ts_low, blob_callback_, - &num_input_entries, &memtable_payload_bytes, &memtable_garbage_bytes); + &io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_, + job_context_->job_id, Env::IO_HIGH, &table_properties_, write_hint, + full_history_ts_low, blob_callback_, &num_input_entries, + &memtable_payload_bytes, &memtable_garbage_bytes); if (!io_s.ok()) { io_status_ = io_s; } @@ -1021,8 +1022,21 @@ std::unique_ptr FlushJob::GetFlushJobInfo() const { info->largest_seqno = meta_.fd.largest_seqno; info->table_properties = table_properties_; info->flush_reason = cfd_->GetFlushReason(); + info->blob_compression_type = mutable_cf_options_.blob_compression_type; + + // Update BlobFilesInfo. + for (const auto& blob_file : edit_->GetBlobFileAdditions()) { + BlobFileAdditionInfo blob_file_addition_info( + BlobFileName(cfd_->ioptions()->cf_paths.front().path, + blob_file.GetBlobFileNumber()) /*blob_file_path*/, + blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(), + blob_file.GetTotalBlobBytes()); + info->blob_file_addition_infos.emplace_back( + std::move(blob_file_addition_info)); + } return info; } + #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/listener_test.cc b/db/listener_test.cc index e9e6bc7ff..ce53d297b 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -1000,6 +1000,12 @@ class TestFileOperationListener : public EventListener { file_syncs_success_.store(0); file_truncates_.store(0); file_truncates_success_.store(0); + blob_file_reads_.store(0); + blob_file_writes_.store(0); + blob_file_flushes_.store(0); + blob_file_closes_.store(0); + blob_file_syncs_.store(0); + blob_file_truncates_.store(0); } void OnFileReadFinish(const FileOperationInfo& info) override { @@ -1007,6 +1013,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_reads_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_reads_; + } ReportDuration(info); } @@ -1015,6 +1024,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_writes_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_writes_; + } ReportDuration(info); } @@ -1023,6 +1035,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_flushes_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_flushes_; + } ReportDuration(info); } @@ -1031,6 +1046,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_closes_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_closes_; + } ReportDuration(info); } @@ -1039,6 +1057,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_syncs_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_syncs_; + } ReportDuration(info); } @@ -1047,6 +1068,9 @@ class TestFileOperationListener : public EventListener { if (info.status.ok()) { ++file_truncates_success_; } + if (EndsWith(info.path, ".blob")) { + ++blob_file_truncates_; + } ReportDuration(info); } @@ -1064,6 +1088,12 @@ class TestFileOperationListener : public EventListener { std::atomic file_syncs_success_; std::atomic file_truncates_; std::atomic file_truncates_success_; + std::atomic blob_file_reads_; + std::atomic blob_file_writes_; + std::atomic blob_file_flushes_; + std::atomic blob_file_closes_; + std::atomic blob_file_syncs_; + std::atomic blob_file_truncates_; private: void ReportDuration(const FileOperationInfo& info) const { @@ -1113,6 +1143,379 @@ TEST_F(EventListenerTest, OnFileOperationTest) { } } +TEST_F(EventListenerTest, OnBlobFileOperationTest) { + Options options; + options.env = CurrentOptions().env; + options.create_if_missing = true; + TestFileOperationListener* listener = new TestFileOperationListener(); + options.listeners.emplace_back(listener); + options.disable_auto_compactions = true; + options.enable_blob_files = true; + options.min_blob_size = 0; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + + DestroyAndReopen(options); + + ASSERT_OK(Put("Key1", "blob_value1")); + ASSERT_OK(Put("Key2", "blob_value2")); + ASSERT_OK(Put("Key3", "blob_value3")); + ASSERT_OK(Put("Key4", "blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key3", "new_blob_value3")); + ASSERT_OK(Put("Key4", "new_blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + ASSERT_GT(listener->blob_file_writes_.load(), 0U); + ASSERT_GT(listener->blob_file_flushes_.load(), 0U); + Close(); + + Reopen(options); + ASSERT_GT(listener->blob_file_closes_.load(), 0U); + ASSERT_GT(listener->blob_file_syncs_.load(), 0U); + if (true == options.use_direct_io_for_flush_and_compaction) { + ASSERT_GT(listener->blob_file_truncates_.load(), 0U); + } +} + +class BlobDBJobLevelEventListenerTest : public EventListener { + public: + explicit BlobDBJobLevelEventListenerTest(EventListenerTest* test) + : test_(test), call_count_(0) {} + + std::shared_ptr GetBlobFileMetaData( + const VersionStorageInfo::BlobFiles& blob_files, + uint64_t blob_file_number) { + const auto it = blob_files.find(blob_file_number); + + if (it == blob_files.end()) { + return nullptr; + } + + const auto& meta = it->second; + assert(meta); + + return meta; + } + + const VersionStorageInfo::BlobFiles& GetBlobFiles() { + VersionSet* const versions = test_->dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + EXPECT_NE(cfd, nullptr); + + Version* const current = cfd->current(); + EXPECT_NE(current, nullptr); + + const VersionStorageInfo* const storage_info = current->storage_info(); + EXPECT_NE(storage_info, nullptr); + + const auto& blob_files = storage_info->GetBlobFiles(); + return blob_files; + } + + std::vector GetFlushedFiles() { + std::lock_guard lock(mutex_); + std::vector result; + for (const auto& fname : flushed_files_) { + result.push_back(fname); + } + return result; + } + + void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { + call_count_++; + EXPECT_FALSE(info.blob_file_addition_infos.empty()); + const auto& blob_files = GetBlobFiles(); + { + std::lock_guard lock(mutex_); + flushed_files_.push_back(info.file_path); + } + EXPECT_EQ(info.blob_compression_type, kNoCompression); + + for (const auto& blob_file_addition_info : info.blob_file_addition_infos) { + const auto meta = GetBlobFileMetaData( + blob_files, blob_file_addition_info.blob_file_number); + EXPECT_EQ(meta->GetBlobFileNumber(), + blob_file_addition_info.blob_file_number); + EXPECT_EQ(meta->GetTotalBlobBytes(), + blob_file_addition_info.total_blob_bytes); + EXPECT_EQ(meta->GetTotalBlobCount(), + blob_file_addition_info.total_blob_count); + EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty()); + } + } + + void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override { + call_count_++; + EXPECT_FALSE(ci.blob_file_garbage_infos.empty()); + const auto& blob_files = GetBlobFiles(); + EXPECT_EQ(ci.blob_compression_type, kNoCompression); + + for (const auto& blob_file_addition_info : ci.blob_file_addition_infos) { + const auto meta = GetBlobFileMetaData( + blob_files, blob_file_addition_info.blob_file_number); + EXPECT_EQ(meta->GetBlobFileNumber(), + blob_file_addition_info.blob_file_number); + EXPECT_EQ(meta->GetTotalBlobBytes(), + blob_file_addition_info.total_blob_bytes); + EXPECT_EQ(meta->GetTotalBlobCount(), + blob_file_addition_info.total_blob_count); + EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty()); + } + + for (const auto& blob_file_garbage_info : ci.blob_file_garbage_infos) { + EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U); + EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U); + EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U); + EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty()); + } + } + + EventListenerTest* test_; + uint32_t call_count_; + + private: + std::vector flushed_files_; + std::mutex mutex_; +}; + +// Test OnFlushCompleted EventListener called for blob files +TEST_F(EventListenerTest, BlobDBOnFlushCompleted) { + Options options; + options.env = CurrentOptions().env; + options.enable_blob_files = true; + options.create_if_missing = true; + options.disable_auto_compactions = true; + + options.min_blob_size = 0; + BlobDBJobLevelEventListenerTest* blob_event_listener = + new BlobDBJobLevelEventListenerTest(this); + options.listeners.emplace_back(blob_event_listener); + + DestroyAndReopen(options); + + ASSERT_OK(Put("Key1", "blob_value1")); + ASSERT_OK(Put("Key2", "blob_value2")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key3", "blob_value3")); + ASSERT_OK(Flush()); + + ASSERT_EQ(Get("Key1"), "blob_value1"); + ASSERT_EQ(Get("Key2"), "blob_value2"); + ASSERT_EQ(Get("Key3"), "blob_value3"); + + ASSERT_GT(blob_event_listener->call_count_, 0U); +} + +// Test OnCompactionCompleted EventListener called for blob files +TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) { + Options options; + options.env = CurrentOptions().env; + options.enable_blob_files = true; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.min_blob_size = 0; + BlobDBJobLevelEventListenerTest* blob_event_listener = + new BlobDBJobLevelEventListenerTest(this); + options.listeners.emplace_back(blob_event_listener); + + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + + DestroyAndReopen(options); + + ASSERT_OK(Put("Key1", "blob_value1")); + ASSERT_OK(Put("Key2", "blob_value2")); + ASSERT_OK(Put("Key3", "blob_value3")); + ASSERT_OK(Put("Key4", "blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key3", "new_blob_value3")); + ASSERT_OK(Put("Key4", "new_blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + blob_event_listener->call_count_ = 0; + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + // On compaction, because of blob_garbage_collection_age_cutoff, it will + // delete the oldest blob file and create new blob file during compaction. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + // Make sure, OnCompactionCompleted is called. + ASSERT_GT(blob_event_listener->call_count_, 0U); +} + +// Test CompactFiles calls OnCompactionCompleted EventListener for blob files +// and populate the blob files info. +TEST_F(EventListenerTest, BlobDBCompactFiles) { + Options options; + options.env = CurrentOptions().env; + options.enable_blob_files = true; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.min_blob_size = 0; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + + BlobDBJobLevelEventListenerTest* blob_event_listener = + new BlobDBJobLevelEventListenerTest(this); + options.listeners.emplace_back(blob_event_listener); + + DestroyAndReopen(options); + + ASSERT_OK(Put("Key1", "blob_value1")); + ASSERT_OK(Put("Key2", "blob_value2")); + ASSERT_OK(Put("Key3", "blob_value3")); + ASSERT_OK(Put("Key4", "blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key3", "new_blob_value3")); + ASSERT_OK(Put("Key4", "new_blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + std::vector output_file_names; + CompactionJobInfo compaction_job_info; + + // On compaction, because of blob_garbage_collection_age_cutoff, it will + // delete the oldest blob file and create new blob file during compaction + // which will be populated in output_files_names. + ASSERT_OK(dbfull()->CompactFiles( + CompactionOptions(), blob_event_listener->GetFlushedFiles(), 1, -1, + &output_file_names, &compaction_job_info)); + + bool is_blob_in_output = false; + for (const auto& file : output_file_names) { + if (EndsWith(file, ".blob")) { + is_blob_in_output = true; + } + } + ASSERT_TRUE(is_blob_in_output); + + for (const auto& blob_file_addition_info : + compaction_job_info.blob_file_addition_infos) { + EXPECT_GT(blob_file_addition_info.blob_file_number, 0U); + EXPECT_GT(blob_file_addition_info.total_blob_bytes, 0U); + EXPECT_GT(blob_file_addition_info.total_blob_count, 0U); + EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty()); + } + + for (const auto& blob_file_garbage_info : + compaction_job_info.blob_file_garbage_infos) { + EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U); + EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U); + EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U); + EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty()); + } +} + +class BlobDBFileLevelEventListener : public EventListener { + public: + BlobDBFileLevelEventListener() { + files_started_.store(0); + files_created_.store(0); + files_deleted_.store(0); + } + + void OnBlobFileCreationStarted( + const BlobFileCreationBriefInfo& info) override { + files_started_++; + EXPECT_FALSE(info.db_name.empty()); + EXPECT_FALSE(info.cf_name.empty()); + EXPECT_FALSE(info.file_path.empty()); + EXPECT_GT(info.job_id, 0); + } + + void OnBlobFileCreated(const BlobFileCreationInfo& info) override { + files_created_++; + EXPECT_FALSE(info.db_name.empty()); + EXPECT_FALSE(info.cf_name.empty()); + EXPECT_FALSE(info.file_path.empty()); + EXPECT_GT(info.job_id, 0); + EXPECT_GT(info.total_blob_count, 0U); + EXPECT_GT(info.total_blob_bytes, 0U); + EXPECT_EQ(info.file_checksum, kUnknownFileChecksum); + EXPECT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName); + EXPECT_TRUE(info.status.ok()); + } + + void OnBlobFileDeleted(const BlobFileDeletionInfo& info) override { + files_deleted_++; + EXPECT_FALSE(info.db_name.empty()); + EXPECT_FALSE(info.file_path.empty()); + EXPECT_GT(info.job_id, 0); + EXPECT_TRUE(info.status.ok()); + } + + void CheckCounters() { + EXPECT_EQ(files_started_, files_created_); + EXPECT_GT(files_started_, 0U); + EXPECT_GT(files_deleted_, 0U); + EXPECT_LT(files_deleted_, files_created_); + } + + private: + std::atomic files_started_; + std::atomic files_created_; + std::atomic files_deleted_; +}; + +TEST_F(EventListenerTest, BlobDBFileTest) { + Options options; + options.env = CurrentOptions().env; + options.enable_blob_files = true; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.min_blob_size = 0; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + + BlobDBFileLevelEventListener* blob_event_listener = + new BlobDBFileLevelEventListener(); + options.listeners.emplace_back(blob_event_listener); + + DestroyAndReopen(options); + + ASSERT_OK(Put("Key1", "blob_value1")); + ASSERT_OK(Put("Key2", "blob_value2")); + ASSERT_OK(Put("Key3", "blob_value3")); + ASSERT_OK(Put("Key4", "blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key3", "new_blob_value3")); + ASSERT_OK(Put("Key4", "new_blob_value4")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put("Key5", "blob_value5")); + ASSERT_OK(Put("Key6", "blob_value6")); + ASSERT_OK(Flush()); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + // On compaction, because of blob_garbage_collection_age_cutoff, it will + // delete the oldest blob file and create new blob file during compaction. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + blob_event_listener->CheckCounters(); +} + } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/repair.cc b/db/repair.cc index 3efe63dfc..42b874f45 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -450,8 +450,9 @@ class Repairer { std::move(range_del_iters), &meta, nullptr /* blob_file_additions */, {}, kMaxSequenceNumber, snapshot_checker, false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s, - nullptr /*IOTracer*/, nullptr /* event_logger */, 0 /* job_id */, - Env::IO_HIGH, nullptr /* table_properties */, write_hint); + nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery, + nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, + nullptr /* table_properties */, write_hint); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index a70836a27..c79de186d 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -29,8 +29,16 @@ class ColumnFamilyHandle; class Status; struct CompactionJobStats; -struct TableFileCreationBriefInfo { - // the name of the database where the file was created +struct FileCreationBriefInfo { + FileCreationBriefInfo() = default; + FileCreationBriefInfo(const std::string& _db_name, + const std::string& _cf_name, + const std::string& _file_path, int _job_id) + : db_name(_db_name), + cf_name(_cf_name), + file_path(_file_path), + job_id(_job_id) {} + // the name of the database where the file was created. std::string db_name; // the name of the column family where the file was created. std::string cf_name; @@ -38,7 +46,10 @@ struct TableFileCreationBriefInfo { std::string file_path; // the id of the job (which could be flush or compaction) that // created the file. - int job_id; + int job_id = 0; +}; + +struct TableFileCreationBriefInfo : public FileCreationBriefInfo { // reason of creating the table. TableFileCreationReason reason; }; @@ -59,6 +70,44 @@ struct TableFileCreationInfo : public TableFileCreationBriefInfo { std::string file_checksum_func_name; }; +struct BlobFileCreationBriefInfo : public FileCreationBriefInfo { + BlobFileCreationBriefInfo(const std::string& _db_name, + const std::string& _cf_name, + const std::string& _file_path, int _job_id, + BlobFileCreationReason _reason) + : FileCreationBriefInfo(_db_name, _cf_name, _file_path, _job_id), + reason(_reason) {} + // reason of creating the blob file. + BlobFileCreationReason reason; +}; + +struct BlobFileCreationInfo : public BlobFileCreationBriefInfo { + BlobFileCreationInfo(const std::string& _db_name, const std::string& _cf_name, + const std::string& _file_path, int _job_id, + BlobFileCreationReason _reason, + uint64_t _total_blob_count, uint64_t _total_blob_bytes, + Status _status, const std::string& _file_checksum, + const std::string& _file_checksum_func_name) + : BlobFileCreationBriefInfo(_db_name, _cf_name, _file_path, _job_id, + _reason), + total_blob_count(_total_blob_count), + total_blob_bytes(_total_blob_bytes), + status(_status), + file_checksum(_file_checksum), + file_checksum_func_name(_file_checksum_func_name) {} + + // the number of blob in a file. + uint64_t total_blob_count; + // the total bytes in a file. + uint64_t total_blob_bytes; + // The status indicating whether the creation was successful or not. + Status status; + // The checksum of the blob file being created. + std::string file_checksum; + // The checksum function name of checksum generator used for this blob file. + std::string file_checksum_func_name; +}; + enum class CompactionReason : int { kUnknown = 0, // [Level] number of L0 files > level0_file_num_compaction_trigger @@ -150,17 +199,34 @@ struct WriteStallInfo { #ifndef ROCKSDB_LITE -struct TableFileDeletionInfo { +struct FileDeletionInfo { + FileDeletionInfo() = default; + + FileDeletionInfo(const std::string& _db_name, const std::string& _file_path, + int _job_id, Status _status) + : db_name(_db_name), + file_path(_file_path), + job_id(_job_id), + status(_status) {} // The name of the database where the file was deleted. std::string db_name; // The path to the deleted file. std::string file_path; // The id of the job which deleted the file. - int job_id; + int job_id = 0; // The status indicating whether the deletion was successful or not. Status status; }; +struct TableFileDeletionInfo : public FileDeletionInfo {}; + +struct BlobFileDeletionInfo : public FileDeletionInfo { + BlobFileDeletionInfo(const std::string& _db_name, + const std::string& _file_path, int _job_id, + Status _status) + : FileDeletionInfo(_db_name, _file_path, _job_id, _status) {} +}; + enum class FileOperationType { kRead, kWrite, @@ -206,6 +272,39 @@ struct FileOperationInfo { } }; +struct BlobFileInfo { + BlobFileInfo(const std::string& _blob_file_path, + const uint64_t _blob_file_number) + : blob_file_path(_blob_file_path), blob_file_number(_blob_file_number) {} + + std::string blob_file_path; + uint64_t blob_file_number; +}; + +struct BlobFileAdditionInfo : public BlobFileInfo { + BlobFileAdditionInfo(const std::string& _blob_file_path, + const uint64_t _blob_file_number, + const uint64_t _total_blob_count, + const uint64_t _total_blob_bytes) + : BlobFileInfo(_blob_file_path, _blob_file_number), + total_blob_count(_total_blob_count), + total_blob_bytes(_total_blob_bytes) {} + uint64_t total_blob_count; + uint64_t total_blob_bytes; +}; + +struct BlobFileGarbageInfo : public BlobFileInfo { + BlobFileGarbageInfo(const std::string& _blob_file_path, + const uint64_t _blob_file_number, + const uint64_t _garbage_blob_count, + const uint64_t _garbage_blob_bytes) + : BlobFileInfo(_blob_file_path, _blob_file_number), + garbage_blob_count(_garbage_blob_count), + garbage_blob_bytes(_garbage_blob_bytes) {} + uint64_t garbage_blob_count; + uint64_t garbage_blob_bytes; +}; + struct FlushJobInfo { // the id of the column family uint32_t cf_id; @@ -239,6 +338,12 @@ struct FlushJobInfo { TableProperties table_properties; FlushReason flush_reason; + + // Compression algorithm used for blob output files + CompressionType blob_compression_type; + + // Information about blob files created during flush in Integrated BlobDB. + std::vector blob_file_addition_infos; }; struct CompactionFileInfo { @@ -299,6 +404,17 @@ struct CompactionJobInfo { // Statistics and other additional details on the compaction CompactionJobStats stats; + + // Compression algorithm used for blob output files. + CompressionType blob_compression_type; + + // Information about blob files created during compaction in Integrated + // BlobDB. + std::vector blob_file_addition_infos; + + // Information about blob files deleted during compaction in Integrated + // BlobDB. + std::vector blob_file_garbage_infos; }; struct MemTableInfo { @@ -555,6 +671,34 @@ class EventListener : public Customizable { // initiate any further recovery actions needed virtual void OnErrorRecoveryCompleted(Status /* old_bg_error */) {} + // A callback function for RocksDB which will be called before + // a blob file is being created. It will follow by OnBlobFileCreated after + // the creation finishes. + // + // Note that if applications would like to use the passed reference + // outside this function call, they should make copies from these + // returned value. + virtual void OnBlobFileCreationStarted( + const BlobFileCreationBriefInfo& /*info*/) {} + + // A callback function for RocksDB which will be called whenever + // a blob file is created. + // It will be called whether the file is successfully created or not. User can + // check info.status to see if it succeeded or not. + // + // Note that if applications would like to use the passed reference + // outside this function call, they should make copies from these + // returned value. + virtual void OnBlobFileCreated(const BlobFileCreationInfo& /*info*/) {} + + // A callback function for RocksDB which will be called whenever + // a blob file is deleted. + // + // Note that if applications would like to use the passed reference + // outside this function call, they should make copies from these + // returned value. + virtual void OnBlobFileDeleted(const BlobFileDeletionInfo& /*info*/) {} + virtual ~EventListener() {} }; diff --git a/include/rocksdb/types.h b/include/rocksdb/types.h index 14dc39ea3..20cff59b8 100644 --- a/include/rocksdb/types.h +++ b/include/rocksdb/types.h @@ -26,6 +26,12 @@ enum class TableFileCreationReason { kMisc, }; +enum class BlobFileCreationReason { + kFlush, + kCompaction, + kRecovery, +}; + // The types of files RocksDB uses in a DB directory. (Available for // advanced options.) enum FileType {