diff --git a/HISTORY.md b/HISTORY.md index 9897744a7..2d8da1a65 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info. ### Public API changes +* EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters. * DB::GetDbIdentity() is now a const function. If this function is overridden in your application, be sure to also make GetDbIdentity() const to avoid compile error. * Move listeners from ColumnFamilyOptions to DBOptions. * Add max_write_buffer_number_to_maintain option diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index b7255c2e9..3208b264b 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -31,12 +31,9 @@ class FlushedFileCollector : public EventListener { ~FlushedFileCollector() {} virtual void OnFlushCompleted( - DB* db, const std::string& column_family_name, - const std::string& file_path, - bool triggered_writes_slowdown, - bool triggered_writes_stop) { + DB* db, const FlushJobInfo& info) override { std::lock_guard lock(mutex_); - flushed_files_.push_back(file_path); + flushed_files_.push_back(info.file_path); } std::vector GetFlushedFiles() { diff --git a/db/db_impl.cc b/db/db_impl.cc index 39e6fb4d2..fb70ff0b0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1276,7 +1276,8 @@ Status DBImpl::FlushMemTableToOutputFile( #ifndef ROCKSDB_LITE if (s.ok()) { // may temporarily unlock and lock the mutex. - NotifyOnFlushCompleted(cfd, file_number, mutable_cf_options); + NotifyOnFlushCompleted(cfd, file_number, mutable_cf_options, + job_context->job_id); } #endif // ROCKSDB_LITE return s; @@ -1284,7 +1285,7 @@ Status DBImpl::FlushMemTableToOutputFile( void DBImpl::NotifyOnFlushCompleted( ColumnFamilyData* cfd, uint64_t file_number, - const MutableCFOptions& mutable_cf_options) { + const MutableCFOptions& mutable_cf_options, int job_id) { #ifndef ROCKSDB_LITE if (db_options_.listeners.size() == 0U) { return; @@ -1293,23 +1294,27 @@ void DBImpl::NotifyOnFlushCompleted( if (shutting_down_.load(std::memory_order_acquire)) { return; } - bool triggered_flush_slowdown = + bool triggered_writes_slowdown = (cfd->current()->storage_info()->NumLevelFiles(0) >= mutable_cf_options.level0_slowdown_writes_trigger); - bool triggered_flush_stop = + bool triggered_writes_stop = (cfd->current()->storage_info()->NumLevelFiles(0) >= mutable_cf_options.level0_stop_writes_trigger); // release lock while notifying events mutex_.Unlock(); { - // TODO(yhchiang): make db_paths dynamic. - auto file_path = MakeTableFileName(db_options_.db_paths[0].path, + FlushJobInfo info; + info.cf_name = cfd->GetName(); + // TODO(yhchiang): make db_paths dynamic in case flush does not + // go to L0 in the future. + info.file_path = MakeTableFileName(db_options_.db_paths[0].path, file_number); + info.thread_id = ThreadStatusUtil::GetThreadID(); + info.job_id = job_id; + info.triggered_writes_slowdown = triggered_writes_slowdown; + info.triggered_writes_stop = triggered_writes_stop; for (auto listener : db_options_.listeners) { - listener->OnFlushCompleted( - this, cfd->GetName(), file_path, - // Use path 0 as fulled memtables are first flushed into path 0. - triggered_flush_slowdown, triggered_flush_stop); + listener->OnFlushCompleted(this, info); } } mutex_.Lock(); diff --git a/db/db_impl.h b/db/db_impl.h index 88c653649..5ff349926 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -350,7 +350,8 @@ class DBImpl : public DB { SuperVersion* super_version, Arena* arena); void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number, - const MutableCFOptions& mutable_cf_options); + const MutableCFOptions& mutable_cf_options, + int job_id); void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, Compaction *c, const Status &st, diff --git a/db/listener_test.cc b/db/listener_test.cc index 94219381b..79db2235f 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -213,9 +213,12 @@ class TestFlushListener : public EventListener { public: void OnTableFileCreated( const TableFileCreationInfo& info) override { - db_name_ = info.db_name; - cf_name_ = info.cf_name; - file_path_ = info.file_path; + // remember the info for later checking the FlushJobInfo. + prev_fc_info_ = info; + ASSERT_GT(info.db_name.size(), 0U); + ASSERT_GT(info.cf_name.size(), 0U); + ASSERT_GT(info.file_path.size(), 0U); + ASSERT_GT(info.job_id, 0); ASSERT_GT(info.table_properties.data_size, 0U); ASSERT_GT(info.table_properties.raw_key_size, 0U); ASSERT_GT(info.table_properties.raw_value_size, 0U); @@ -224,32 +227,27 @@ class TestFlushListener : public EventListener { } void OnFlushCompleted( - DB* db, const std::string& cf_name, - const std::string& file_path, - bool triggered_writes_slowdown, - bool triggered_writes_stop) override { + DB* db, const FlushJobInfo& info) override { flushed_dbs_.push_back(db); - flushed_column_family_names_.push_back(cf_name); - if (triggered_writes_slowdown) { + flushed_column_family_names_.push_back(info.cf_name); + if (info.triggered_writes_slowdown) { slowdown_count++; } - if (triggered_writes_stop) { + if (info.triggered_writes_stop) { stop_count++; } - // verify the file created matches the flushed file. - ASSERT_EQ(db_name_, db->GetName()); - ASSERT_EQ(cf_name_, cf_name); - ASSERT_GT(file_path.size(), 0U); - ASSERT_EQ(file_path, file_path_); + // verify whether the previously created file matches the flushed file. + ASSERT_EQ(prev_fc_info_.db_name, db->GetName()); + ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name); + ASSERT_EQ(prev_fc_info_.job_id, info.job_id); + ASSERT_EQ(prev_fc_info_.file_path, info.file_path); } std::vector flushed_column_family_names_; std::vector flushed_dbs_; int slowdown_count; int stop_count; - std::string db_name_; - std::string cf_name_; - std::string file_path_; + TableFileCreationInfo prev_fc_info_; }; TEST_F(EventListenerTest, OnSingleDBFlushTest) { diff --git a/examples/compact_files_example.cc b/examples/compact_files_example.cc index 0ea58c8f7..b9e75562a 100644 --- a/examples/compact_files_example.cc +++ b/examples/compact_files_example.cc @@ -71,13 +71,10 @@ class FullCompactor : public Compactor { // If triggered_writes_stop is true, it will also set the retry // flag of compaction-task to true. void OnFlushCompleted( - DB* db, const std::string& cf_name, - const std::string& file_path, - bool triggered_writes_slowdown, - bool triggered_writes_stop) override { - CompactionTask* task = PickCompaction(db, cf_name); + DB* db, const FlushJobInfo& info) override { + CompactionTask* task = PickCompaction(db, info.cf_name); if (task != nullptr) { - if (triggered_writes_stop) { + if (info.triggered_writes_stop) { task->retry_on_fail = true; } // Schedule compaction in a different thread. diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index dfda708af..1cd0e46d1 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -4,7 +4,6 @@ #pragma once - #include #include #include "rocksdb/compaction_job_stats.h" @@ -50,6 +49,27 @@ struct TableFileDeletionInfo { Status status; }; +struct FlushJobInfo { + // the name of the column family + std::string cf_name; + // the path to the newly created file + std::string file_path; + // the id of the thread that completed this flush job. + uint64_t thread_id; + // the job id, which is unique in the same thread. + int job_id; + // If true, then rocksdb is currently slowing-down all writes to prevent + // creating too many Level 0 files as compaction seems not able to + // catch up the write request speed. This indicates that there are + // too many files in Level 0. + bool triggered_writes_slowdown; + // If true, then rocksdb is currently blocking any writes to prevent + // creating more L0 files. This indicates that there are too many + // files in level 0. Compactions should try to compact L0 files down + // to lower levels as soon as possible. + bool triggered_writes_stop; +}; + struct CompactionJobInfo { CompactionJobInfo() = default; explicit CompactionJobInfo(const CompactionJobStats& _stats) : @@ -114,24 +134,8 @@ class EventListener { // Note that the this function must be implemented in a way such that // it should not run for an extended period of time before the function // returns. Otherwise, RocksDB may be blocked. - // - // @param db a pointer to the rocksdb instance which just flushed - // a memtable to disk. - // @param column_family_id the id of the flushed column family. - // @param file_path the path to the newly created file. - // @param triggered_writes_slowdown true when rocksdb is currently - // slowing-down all writes to prevent creating too many Level 0 - // files as compaction seems not able to catch up the write request - // speed. This indicates that there're too many files in Level 0. - // @param triggered_writes_stop true when rocksdb is currently blocking - // any writes to prevent creating more L0 files. This indicates that - // there're too many files in level 0. Compactions should try to - // compact L0 files down to lower levels as soon as possible. virtual void OnFlushCompleted( - DB* db, const std::string& column_family_name, - const std::string& file_path, - bool triggered_writes_slowdown, - bool triggered_writes_stop) {} + DB* db, const FlushJobInfo& flush_job_info) {} // A call-back function for RocksDB which will be called whenever // a SST file is deleted. Different from OnCompactionCompleted and diff --git a/tools/db_stress.cc b/tools/db_stress.cc index c4bb9cc3d..a3fa02ca3 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -791,14 +791,11 @@ class DbStressListener : public EventListener { virtual ~DbStressListener() {} #ifndef ROCKSDB_LITE virtual void OnFlushCompleted( - DB* db, const std::string& column_family_name, - const std::string& file_path, - bool triggered_writes_slowdown, - bool triggered_writes_stop) override { + DB* db, const FlushJobInfo& info) override { assert(db); assert(db->GetName() == db_name_); - assert(IsValidColumnFamilyName(column_family_name)); - VerifyFilePath(file_path); + assert(IsValidColumnFamilyName(info.cf_name)); + VerifyFilePath(info.file_path); // pretending doing some work here std::this_thread::sleep_for( std::chrono::microseconds(rand_.Uniform(5000)));