From a92049e3e7c3c14e9c246ecfeaeb19ba175f0d99 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 29 Apr 2016 11:35:00 -0700 Subject: [PATCH] Added EventListener::OnTableFileCreationStarted() callback Summary: Added EventListener::OnTableFileCreationStarted. EventListener::OnTableFileCreated will be called on failure case. User can check creation status via TableFileCreationInfo::status. Test Plan: unit test. Reviewers: dhruba, yhchiang, ott, sdong Reviewed By: sdong Subscribers: sdong, kradhakrishnan, IslamAbdelRahman, andrewkr, yhchiang, leveldb, ott, dhruba Differential Revision: https://reviews.facebook.net/D56337 --- HISTORY.md | 1 + db/builder.cc | 25 ++++++- db/builder.h | 11 +-- db/compaction_job.cc | 40 ++++++----- db/db_impl.cc | 17 +---- db/db_test.cc | 3 +- db/event_helpers.cc | 108 +++++++++++++++++----------- db/event_helpers.h | 19 ++++- db/flush_job.cc | 18 +---- db/listener_test.cc | 141 +++++++++++++++++++++++++++++++++++++ db/repair.cc | 2 +- include/rocksdb/listener.h | 39 ++++++++-- 12 files changed, 317 insertions(+), 107 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 261ce4ef4..0eaac9a45 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ * Allow preset compression dictionary for improved compression of block-based tables. This is supported for zlib, zstd, and lz4. The compression dictionary's size is configurable via CompressionOptions::max_dict_bytes. * Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F * Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN". +* Added EventListener::OnTableFileCreationStarted. EventListener::OnTableFileCreated will be called on failure case. User can check creation status via TableFileCreationInfo::status. ## 4.7.0 (4/8/2016) ### Public API Change diff --git a/db/builder.cc b/db/builder.cc index ceb765862..a932fe6fd 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -15,6 +15,7 @@ #include "db/compaction_iterator.h" #include "db/dbformat.h" +#include "db/event_helpers.h" #include "db/filename.h" #include "db/internal_stats.h" #include "db/merge_helper.h" @@ -68,7 +69,8 @@ Status BuildTable( SequenceNumber earliest_write_conflict_snapshot, const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, - InternalStats* internal_stats, const Env::IOPriority io_priority, + InternalStats* internal_stats, TableFileCreationReason reason, + EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, TableProperties* table_properties, int level) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == @@ -81,6 +83,12 @@ Status BuildTable( std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(), meta->fd.GetPathId()); +#ifndef ROCKSDB_LITE + EventHelpers::NotifyTableFileCreationStarted( + ioptions.listeners, dbname, column_family_name, fname, job_id, reason); +#endif // !ROCKSDB_LITE + TableProperties tp; + if (iter->Valid()) { TableBuilder* builder; unique_ptr file_writer; @@ -88,6 +96,9 @@ Status BuildTable( unique_ptr file; s = NewWritableFile(env, fname, &file, env_options); if (!s.ok()) { + EventHelpers::LogAndNotifyTableFileCreationFinished( + event_logger, ioptions.listeners, dbname, column_family_name, fname, + job_id, meta->fd, tp, reason, s); return s; } file->SetIOPriority(io_priority); @@ -135,11 +146,13 @@ Status BuildTable( } if (s.ok() && !empty) { - meta->fd.file_size = builder->FileSize(); + uint64_t file_size = builder->FileSize(); + meta->fd.file_size = file_size; meta->marked_for_compaction = builder->NeedCompact(); assert(meta->fd.GetFileSize() > 0); + tp = builder->GetTableProperties(); if (table_properties) { - *table_properties = builder->GetTableProperties(); + *table_properties = tp; } } delete builder; @@ -178,6 +191,12 @@ Status BuildTable( if (!s.ok() || meta->fd.GetFileSize() == 0) { env->DeleteFile(fname); } + + // Output to event logger and fire events. + EventHelpers::LogAndNotifyTableFileCreationFinished( + event_logger, ioptions.listeners, dbname, column_family_name, fname, + job_id, meta->fd, tp, reason, s); + return s; } diff --git a/db/builder.h b/db/builder.h index 01353ed8d..7ef1a29d8 100644 --- a/db/builder.h +++ b/db/builder.h @@ -12,11 +12,13 @@ #include "db/table_properties_collector.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" -#include "rocksdb/status.h" -#include "rocksdb/types.h" -#include "rocksdb/options.h" #include "rocksdb/immutable_options.h" +#include "rocksdb/listener.h" +#include "rocksdb/options.h" +#include "rocksdb/status.h" #include "rocksdb/table_properties.h" +#include "rocksdb/types.h" +#include "util/event_logger.h" namespace rocksdb { @@ -69,7 +71,8 @@ extern Status BuildTable( SequenceNumber earliest_write_conflict_snapshot, const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, - InternalStats* internal_stats, + InternalStats* internal_stats, TableFileCreationReason reason, + EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, TableProperties* table_properties = nullptr, int level = -1); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index f8f471b58..90f155869 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -953,9 +953,10 @@ Status CompactionJob::FinishCompactionOutputFile( } sub_compact->outfile.reset(); + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); + TableProperties tp; if (s.ok() && current_entries > 0) { // Verify that the table is usable - ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); InternalIterator* iter = cfd->table_cache()->NewIterator( ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd, nullptr, cfd->internal_stats()->GetFileReadHist( @@ -969,34 +970,30 @@ Status CompactionJob::FinishCompactionOutputFile( } delete iter; + + // Output to event logger and fire events. if (s.ok()) { - auto tp = sub_compact->builder->GetTableProperties(); + tp = sub_compact->builder->GetTableProperties(); sub_compact->current_output()->table_properties = std::make_shared(tp); - TableFileCreationInfo info(std::move(tp)); - info.db_name = dbname_; - info.cf_name = cfd->GetName(); - info.file_path = - TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(), - meta->fd.GetPathId()); - info.file_size = meta->fd.GetFileSize(); - info.job_id = job_id_; Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 " keys, %" PRIu64 " bytes%s", cfd->GetName().c_str(), job_id_, output_number, current_entries, current_bytes, meta->marked_for_compaction ? " (need compaction)" : ""); - EventHelpers::LogAndNotifyTableFileCreation( - event_logger_, cfd->ioptions()->listeners, meta->fd, info); } } + std::string fname = TableFileName(db_options_.db_paths, meta->fd.GetNumber(), + meta->fd.GetPathId()); + EventHelpers::LogAndNotifyTableFileCreationFinished( + event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, + job_id_, meta->fd, tp, TableFileCreationReason::kCompaction, s); // Report new file to SstFileManagerImpl auto sfm = static_cast(db_options_.sst_file_manager.get()); if (sfm && meta->fd.GetPathId() == 0) { - ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(), meta->fd.GetPathId()); sfm->OnAddFile(fn); @@ -1072,10 +1069,17 @@ Status CompactionJob::OpenCompactionOutputFile( assert(sub_compact->builder == nullptr); // no need to lock because VersionSet::next_file_number_ is atomic uint64_t file_number = versions_->NewFileNumber(); - // Make the output file - unique_ptr writable_file; std::string fname = TableFileName(db_options_.db_paths, file_number, sub_compact->compaction->output_path_id()); + // Fire events. + ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); +#ifndef ROCKSDB_LITE + EventHelpers::NotifyTableFileCreationStarted( + cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_, + TableFileCreationReason::kCompaction); +#endif // !ROCKSDB_LITE + // Make the output file + unique_ptr writable_file; Status s = NewWritableFile(env_, fname, &writable_file, env_options_); if (!s.ok()) { Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, @@ -1084,8 +1088,13 @@ Status CompactionJob::OpenCompactionOutputFile( sub_compact->compaction->column_family_data()->GetName().c_str(), job_id_, file_number, s.ToString().c_str()); LogFlush(db_options_.info_log); + EventHelpers::LogAndNotifyTableFileCreationFinished( + event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), + fname, job_id_, FileDescriptor(), TableProperties(), + TableFileCreationReason::kCompaction, s); return s; } + SubcompactionState::Output out; out.meta.fd = FileDescriptor(file_number, sub_compact->compaction->output_path_id(), 0); @@ -1098,7 +1107,6 @@ Status CompactionJob::OpenCompactionOutputFile( sub_compact->outfile.reset( new WritableFileWriter(std::move(writable_file), env_options_)); - ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); // If the Column family flag is to only optimize filters for hits, // we can skip creating filters if this is the bottommost_level where // data is going to be found diff --git a/db/db_impl.cc b/db/db_impl.cc index b93166df3..fdb924dea 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1534,7 +1534,6 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, cfd->GetLatestMutableCFOptions()->paranoid_file_checks; { mutex_.Unlock(); - TableFileCreationInfo info; SequenceNumber earliest_write_conflict_snapshot; std::vector snapshot_seqs = @@ -1547,26 +1546,14 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, snapshot_seqs, earliest_write_conflict_snapshot, GetCompressionFlush(*cfd->ioptions()), cfd->ioptions()->compression_opts, paranoid_file_checks, - cfd->internal_stats(), Env::IO_HIGH, &info.table_properties); + cfd->internal_stats(), TableFileCreationReason::kRecovery, + &event_logger_, job_id); LogFlush(db_options_.info_log); Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s", cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), s.ToString().c_str()); - - // output to event logger - if (s.ok()) { - info.db_name = dbname_; - info.cf_name = cfd->GetName(); - info.file_path = TableFileName(db_options_.db_paths, - meta.fd.GetNumber(), - meta.fd.GetPathId()); - info.file_size = meta.fd.GetFileSize(); - info.job_id = job_id; - EventHelpers::LogAndNotifyTableFileCreation( - &event_logger_, db_options_.listeners, meta.fd, info); - } mutex_.Lock(); } } diff --git a/db/db_test.cc b/db/db_test.cc index bf82b5a28..4d466c1e1 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4049,8 +4049,7 @@ TEST_F(DBTest, ThreadStatusFlush) { rocksdb::SyncPoint::GetInstance()->LoadDependency({ {"FlushJob::FlushJob()", "DBTest::ThreadStatusFlush:1"}, - {"DBTest::ThreadStatusFlush:2", - "FlushJob::LogAndNotifyTableFileCreation()"}, + {"DBTest::ThreadStatusFlush:2", "FlushJob::WriteLevel0Table"}, }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 1a591dc91..9249837c2 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -20,57 +20,83 @@ void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) { std::chrono::system_clock::now().time_since_epoch()).count(); } -void EventHelpers::LogAndNotifyTableFileCreation( +#ifndef ROCKSDB_LITE +void EventHelpers::NotifyTableFileCreationStarted( + const std::vector>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, TableFileCreationReason reason) { + TableFileCreationBriefInfo info; + info.db_name = db_name; + info.cf_name = cf_name; + info.file_path = file_path; + info.job_id = job_id; + info.reason = reason; + for (auto& listener : listeners) { + listener->OnTableFileCreationStarted(info); + } +} +#endif // !ROCKSDB_LITE + +void EventHelpers::LogAndNotifyTableFileCreationFinished( EventLogger* event_logger, const std::vector>& listeners, - const FileDescriptor& fd, const TableFileCreationInfo& info) { - assert(event_logger); - JSONWriter jwriter; - AppendCurrentTime(&jwriter); - jwriter << "cf_name" << info.cf_name - << "job" << info.job_id - << "event" << "table_file_creation" - << "file_number" << fd.GetNumber() - << "file_size" << fd.GetFileSize(); - - // table_properties - { - jwriter << "table_properties"; - jwriter.StartObject(); - - // basic properties: - jwriter << "data_size" << info.table_properties.data_size - << "index_size" << info.table_properties.index_size - << "filter_size" << info.table_properties.filter_size - << "raw_key_size" << info.table_properties.raw_key_size - << "raw_average_key_size" << SafeDivide( - info.table_properties.raw_key_size, - info.table_properties.num_entries) - << "raw_value_size" << info.table_properties.raw_value_size - << "raw_average_value_size" << SafeDivide( - info.table_properties.raw_value_size, - info.table_properties.num_entries) - << "num_data_blocks" << info.table_properties.num_data_blocks - << "num_entries" << info.table_properties.num_entries - << "filter_policy_name" << - info.table_properties.filter_policy_name; - - // user collected properties - for (const auto& prop : info.table_properties.readable_properties) { - jwriter << prop.first << prop.second; + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, const FileDescriptor& fd, + const TableProperties& table_properties, TableFileCreationReason reason, + const Status& s) { + if (s.ok() && event_logger) { + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + jwriter << "cf_name" << cf_name << "job" << job_id << "event" + << "table_file_creation" + << "file_number" << fd.GetNumber() << "file_size" + << fd.GetFileSize(); + + // table_properties + { + jwriter << "table_properties"; + jwriter.StartObject(); + + // basic properties: + jwriter << "data_size" << table_properties.data_size << "index_size" + << table_properties.index_size << "filter_size" + << table_properties.filter_size << "raw_key_size" + << table_properties.raw_key_size << "raw_average_key_size" + << SafeDivide(table_properties.raw_key_size, + table_properties.num_entries) + << "raw_value_size" << table_properties.raw_value_size + << "raw_average_value_size" + << SafeDivide(table_properties.raw_value_size, + table_properties.num_entries) + << "num_data_blocks" << table_properties.num_data_blocks + << "num_entries" << table_properties.num_entries + << "filter_policy_name" << table_properties.filter_policy_name; + + // user collected properties + for (const auto& prop : table_properties.readable_properties) { + jwriter << prop.first << prop.second; + } + jwriter.EndObject(); } jwriter.EndObject(); - } - jwriter.EndObject(); - event_logger->Log(jwriter); + event_logger->Log(jwriter); + } #ifndef ROCKSDB_LITE if (listeners.size() == 0) { return; } - - for (auto listener : listeners) { + TableFileCreationInfo info; + info.db_name = db_name; + info.cf_name = cf_name; + info.file_path = file_path; + info.file_size = fd.file_size; + info.job_id = job_id; + info.table_properties = table_properties; + info.reason = reason; + info.status = s; + for (auto& listener : listeners) { listener->OnTableFileCreated(info); } #endif // !ROCKSDB_LITE diff --git a/db/event_helpers.h b/db/event_helpers.h index a36010e16..e9c111f20 100644 --- a/db/event_helpers.h +++ b/db/event_helpers.h @@ -19,15 +19,30 @@ namespace rocksdb { class EventHelpers { public: static void AppendCurrentTime(JSONWriter* json_writer); - static void LogAndNotifyTableFileCreation( +#ifndef ROCKSDB_LITE + static void NotifyTableFileCreationStarted( + const std::vector>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, TableFileCreationReason reason); +#endif // !ROCKSDB_LITE + static void LogAndNotifyTableFileCreationFinished( EventLogger* event_logger, const std::vector>& listeners, - const FileDescriptor& fd, const TableFileCreationInfo& info); + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, const FileDescriptor& fd, + const TableProperties& table_properties, TableFileCreationReason reason, + const Status& s); static void LogAndNotifyTableFileDeletion( EventLogger* event_logger, int job_id, uint64_t file_number, const std::string& file_path, const Status& status, const std::string& db_name, const std::vector>& listeners); + + private: + static void LogAndNotifyTableFileCreation( + EventLogger* event_logger, + const std::vector>& listeners, + const FileDescriptor& fd, const TableFileCreationInfo& info); }; } // namespace rocksdb diff --git a/db/flush_job.cc b/db/flush_job.cc index ad0427367..949c02622 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -253,7 +253,6 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, << total_num_deletes << "memory_usage" << total_memory_usage; - TableFileCreationInfo info; { ScopedArenaIterator iter( NewMergingIterator(&cfd_->internal_comparator(), &memtables[0], @@ -272,8 +271,8 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, earliest_write_conflict_snapshot_, output_compression_, cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), + TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */); - info.table_properties = table_properties_; LogFlush(db_options_.info_log); } Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, @@ -284,21 +283,6 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, meta->fd.GetFileSize(), s.ToString().c_str(), meta->marked_for_compaction ? " (needs compaction)" : ""); - // output to event logger - if (s.ok()) { - info.db_name = dbname_; - info.cf_name = cfd_->GetName(); - info.file_path = TableFileName(db_options_.db_paths, - meta->fd.GetNumber(), - meta->fd.GetPathId()); - info.file_size = meta->fd.GetFileSize(); - info.job_id = job_context_->job_id; - EventHelpers::LogAndNotifyTableFileCreation( - event_logger_, db_options_.listeners, - meta->fd, info); - TEST_SYNC_POINT("FlushJob::LogAndNotifyTableFileCreation()"); - } - if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { output_file_directory_->Fsync(); } diff --git a/db/listener_test.cc b/db/listener_test.cc index 26a5cf101..c9e1589a4 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -582,6 +582,147 @@ TEST_F(EventListenerTest, CompactionReasonFIFO) { ASSERT_EQ(compaction_reason, CompactionReason::kFIFOMaxSize); } } + +class TableFileCreationListener : public EventListener { + public: + class TestEnv : public EnvWrapper { + public: + TestEnv() : EnvWrapper(Env::Default()) {} + + void SetStatus(Status s) { status_ = s; } + + Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) { + if (fname.size() > 4 && fname.substr(fname.size() - 4) == ".sst") { + if (!status_.ok()) { + return status_; + } + } + return Env::Default()->NewWritableFile(fname, result, options); + } + + private: + Status status_; + }; + + TableFileCreationListener() { + for (int i = 0; i < 2; i++) { + started_[i] = finished_[i] = failure_[i] = 0; + } + } + + int Index(TableFileCreationReason reason) { + int idx; + switch (reason) { + case TableFileCreationReason::kFlush: + idx = 0; + break; + case TableFileCreationReason::kCompaction: + idx = 1; + break; + default: + idx = -1; + } + return idx; + } + + void CheckAndResetCounters(int flush_started, int flush_finished, + int flush_failure, int compaction_started, + int compaction_finished, int compaction_failure) { + ASSERT_EQ(started_[0], flush_started); + ASSERT_EQ(finished_[0], flush_finished); + ASSERT_EQ(failure_[0], flush_failure); + ASSERT_EQ(started_[1], compaction_started); + ASSERT_EQ(finished_[1], compaction_finished); + ASSERT_EQ(failure_[1], compaction_failure); + for (int i = 0; i < 2; i++) { + started_[i] = finished_[i] = failure_[i] = 0; + } + } + + void OnTableFileCreationStarted( + const TableFileCreationBriefInfo& info) override { + int idx = Index(info.reason); + if (idx >= 0) { + started_[idx]++; + } + ASSERT_GT(info.db_name.size(), 0U); + ASSERT_GT(info.cf_name.size(), 0U); + ASSERT_GT(info.file_path.size(), 0U); + ASSERT_GT(info.job_id, 0); + } + + void OnTableFileCreated(const TableFileCreationInfo& info) override { + int idx = Index(info.reason); + if (idx >= 0) { + finished_[idx]++; + } + ASSERT_GT(info.db_name.size(), 0U); + ASSERT_GT(info.cf_name.size(), 0U); + ASSERT_GT(info.file_path.size(), 0U); + ASSERT_GT(info.job_id, 0); + if (info.status.ok()) { + ASSERT_GT(info.table_properties.data_size, 0U); + ASSERT_GT(info.table_properties.raw_key_size, 0U); + ASSERT_GT(info.table_properties.raw_value_size, 0U); + ASSERT_GT(info.table_properties.num_data_blocks, 0U); + ASSERT_GT(info.table_properties.num_entries, 0U); + } else { + if (idx >= 0) { + failure_[idx]++; + } + } + } + + TestEnv test_env; + int started_[2]; + int finished_[2]; + int failure_[2]; +}; + +TEST_F(EventListenerTest, TableFileCreationListenersTest) { + auto listener = std::make_shared(); + Options options; + options.create_if_missing = true; + options.listeners.push_back(listener); + options.env = &listener->test_env; + DestroyAndReopen(options); + + ASSERT_OK(Put("foo", "aaa")); + ASSERT_OK(Put("bar", "bbb")); + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForFlushMemTable(); + listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0); + + ASSERT_OK(Put("foo", "aaa1")); + ASSERT_OK(Put("bar", "bbb1")); + listener->test_env.SetStatus(Status::NotSupported("not supported")); + ASSERT_NOK(Flush()); + listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0); + listener->test_env.SetStatus(Status::OK()); + + Reopen(options); + ASSERT_OK(Put("foo", "aaa2")); + ASSERT_OK(Put("bar", "bbb2")); + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForFlushMemTable(); + listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0); + + const Slice kRangeStart = "a"; + const Slice kRangeEnd = "z"; + dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd); + dbfull()->TEST_WaitForCompact(); + listener->CheckAndResetCounters(0, 0, 0, 1, 1, 0); + + ASSERT_OK(Put("foo", "aaa3")); + ASSERT_OK(Put("bar", "bbb3")); + ASSERT_OK(Flush()); + listener->test_env.SetStatus(Status::NotSupported("not supported")); + dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd); + dbfull()->TEST_WaitForCompact(); + listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1); +} } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/db/repair.cc b/db/repair.cc index 68fef0783..1dcffe7e1 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -297,7 +297,7 @@ class Repairer { TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, std::string() /* column_family_name */, {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, - nullptr /* internal_stats */); + nullptr /* internal_stats */, TableFileCreationReason::kRecovery); } delete mem->Unref(); delete cf_mems_default; diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 3d54f2788..6b228b5c9 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -21,23 +21,36 @@ class DB; class Status; struct CompactionJobStats; -struct TableFileCreationInfo { - TableFileCreationInfo() = default; - explicit TableFileCreationInfo(TableProperties&& prop) : - table_properties(prop) {} +enum class TableFileCreationReason { + kFlush, + kCompaction, + kRecovery, +}; + +struct TableFileCreationBriefInfo { // the name of the database where the file was created std::string db_name; // the name of the column family where the file was created. std::string cf_name; // the path to the created file. std::string file_path; - // the size of the file. - uint64_t file_size; // the id of the job (which could be flush or compaction) that // created the file. int job_id; + // reason of creating the table. + TableFileCreationReason reason; +}; + +struct TableFileCreationInfo : public TableFileCreationBriefInfo { + TableFileCreationInfo() = default; + explicit TableFileCreationInfo(TableProperties&& prop) + : table_properties(prop) {} + // the size of the file. + uint64_t file_size; // Detailed properties of the created file. TableProperties table_properties; + // The status indicating whether the creation was successful or not. + Status status; }; enum class CompactionReason { @@ -212,11 +225,25 @@ class EventListener { // on file creations and deletions is suggested to implement // OnFlushCompleted and OnCompactionCompleted. // + // Historically it will only be called if the file is successfully created. + // Now it will also be called on failure case. User can check info.status + // to see if it succeeded or not. + // // Note that if applications would like to use the passed reference // outside this function call, they should make copies from these // returned value. virtual void OnTableFileCreated(const TableFileCreationInfo& /*info*/) {} + // A call-back function for RocksDB which will be called before + // a SST file is being created. It will follow by OnTableFileCreated after + // the creation finishes. + // + // Note that if applications would like to use the passed reference + // outside this function call, they should make copies from these + // returned value. + virtual void OnTableFileCreationStarted( + const TableFileCreationBriefInfo& /*info*/) {} + virtual ~EventListener() {} };