[API Change] Improve EventListener::OnFlushCompleted interface

Summary:
EventListener::OnFlushCompleted() now passes a structure instead
of a list of parameters.  This minimizes the API change in the
future.

Test Plan:
listener_test
compact_files_test
example/compact_files_example

Reviewers: kradhakrishnan, sdong, IslamAbdelRahman, rven, igor

Reviewed By: rven, igor

Subscribers: IslamAbdelRahman, rven, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D39543
main
Yueh-Hsuan Chiang 9 years ago
parent 7322c74012
commit 2e764f06ea
  1. 1
      HISTORY.md
  2. 7
      db/compact_files_test.cc
  3. 25
      db/db_impl.cc
  4. 3
      db/db_impl.h
  5. 34
      db/listener_test.cc
  6. 9
      examples/compact_files_example.cc
  7. 40
      include/rocksdb/listener.h
  8. 9
      tools/db_stress.cc

@ -4,6 +4,7 @@
* Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info. * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info.
### Public API changes ### Public API changes
* EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters.
* DB::GetDbIdentity() is now a const function. If this function is overridden in your application, be sure to also make GetDbIdentity() const to avoid compile error. * DB::GetDbIdentity() is now a const function. If this function is overridden in your application, be sure to also make GetDbIdentity() const to avoid compile error.
* Move listeners from ColumnFamilyOptions to DBOptions. * Move listeners from ColumnFamilyOptions to DBOptions.
* Add max_write_buffer_number_to_maintain option * Add max_write_buffer_number_to_maintain option

@ -31,12 +31,9 @@ class FlushedFileCollector : public EventListener {
~FlushedFileCollector() {} ~FlushedFileCollector() {}
virtual void OnFlushCompleted( virtual void OnFlushCompleted(
DB* db, const std::string& column_family_name, DB* db, const FlushJobInfo& info) override {
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.push_back(file_path); flushed_files_.push_back(info.file_path);
} }
std::vector<std::string> GetFlushedFiles() { std::vector<std::string> GetFlushedFiles() {

@ -1276,7 +1276,8 @@ Status DBImpl::FlushMemTableToOutputFile(
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (s.ok()) { if (s.ok()) {
// may temporarily unlock and lock the mutex. // may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, file_number, mutable_cf_options); NotifyOnFlushCompleted(cfd, file_number, mutable_cf_options,
job_context->job_id);
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
return s; return s;
@ -1284,7 +1285,7 @@ Status DBImpl::FlushMemTableToOutputFile(
void DBImpl::NotifyOnFlushCompleted( void DBImpl::NotifyOnFlushCompleted(
ColumnFamilyData* cfd, uint64_t file_number, ColumnFamilyData* cfd, uint64_t file_number,
const MutableCFOptions& mutable_cf_options) { const MutableCFOptions& mutable_cf_options, int job_id) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (db_options_.listeners.size() == 0U) { if (db_options_.listeners.size() == 0U) {
return; return;
@ -1293,23 +1294,27 @@ void DBImpl::NotifyOnFlushCompleted(
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return; return;
} }
bool triggered_flush_slowdown = bool triggered_writes_slowdown =
(cfd->current()->storage_info()->NumLevelFiles(0) >= (cfd->current()->storage_info()->NumLevelFiles(0) >=
mutable_cf_options.level0_slowdown_writes_trigger); mutable_cf_options.level0_slowdown_writes_trigger);
bool triggered_flush_stop = bool triggered_writes_stop =
(cfd->current()->storage_info()->NumLevelFiles(0) >= (cfd->current()->storage_info()->NumLevelFiles(0) >=
mutable_cf_options.level0_stop_writes_trigger); mutable_cf_options.level0_stop_writes_trigger);
// release lock while notifying events // release lock while notifying events
mutex_.Unlock(); mutex_.Unlock();
{ {
// TODO(yhchiang): make db_paths dynamic. FlushJobInfo info;
auto file_path = MakeTableFileName(db_options_.db_paths[0].path, info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
info.file_path = MakeTableFileName(db_options_.db_paths[0].path,
file_number); file_number);
info.thread_id = ThreadStatusUtil::GetThreadID();
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
for (auto listener : db_options_.listeners) { for (auto listener : db_options_.listeners) {
listener->OnFlushCompleted( listener->OnFlushCompleted(this, info);
this, cfd->GetName(), file_path,
// Use path 0 as fulled memtables are first flushed into path 0.
triggered_flush_slowdown, triggered_flush_stop);
} }
} }
mutex_.Lock(); mutex_.Lock();

@ -350,7 +350,8 @@ class DBImpl : public DB {
SuperVersion* super_version, Arena* arena); SuperVersion* super_version, Arena* arena);
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number, void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number,
const MutableCFOptions& mutable_cf_options); const MutableCFOptions& mutable_cf_options,
int job_id);
void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, void NotifyOnCompactionCompleted(ColumnFamilyData* cfd,
Compaction *c, const Status &st, Compaction *c, const Status &st,

@ -213,9 +213,12 @@ class TestFlushListener : public EventListener {
public: public:
void OnTableFileCreated( void OnTableFileCreated(
const TableFileCreationInfo& info) override { const TableFileCreationInfo& info) override {
db_name_ = info.db_name; // remember the info for later checking the FlushJobInfo.
cf_name_ = info.cf_name; prev_fc_info_ = info;
file_path_ = info.file_path; ASSERT_GT(info.db_name.size(), 0U);
ASSERT_GT(info.cf_name.size(), 0U);
ASSERT_GT(info.file_path.size(), 0U);
ASSERT_GT(info.job_id, 0);
ASSERT_GT(info.table_properties.data_size, 0U); ASSERT_GT(info.table_properties.data_size, 0U);
ASSERT_GT(info.table_properties.raw_key_size, 0U); ASSERT_GT(info.table_properties.raw_key_size, 0U);
ASSERT_GT(info.table_properties.raw_value_size, 0U); ASSERT_GT(info.table_properties.raw_value_size, 0U);
@ -224,32 +227,27 @@ class TestFlushListener : public EventListener {
} }
void OnFlushCompleted( void OnFlushCompleted(
DB* db, const std::string& cf_name, DB* db, const FlushJobInfo& info) override {
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) override {
flushed_dbs_.push_back(db); flushed_dbs_.push_back(db);
flushed_column_family_names_.push_back(cf_name); flushed_column_family_names_.push_back(info.cf_name);
if (triggered_writes_slowdown) { if (info.triggered_writes_slowdown) {
slowdown_count++; slowdown_count++;
} }
if (triggered_writes_stop) { if (info.triggered_writes_stop) {
stop_count++; stop_count++;
} }
// verify the file created matches the flushed file. // verify whether the previously created file matches the flushed file.
ASSERT_EQ(db_name_, db->GetName()); ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
ASSERT_EQ(cf_name_, cf_name); ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
ASSERT_GT(file_path.size(), 0U); ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
ASSERT_EQ(file_path, file_path_); ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
} }
std::vector<std::string> flushed_column_family_names_; std::vector<std::string> flushed_column_family_names_;
std::vector<DB*> flushed_dbs_; std::vector<DB*> flushed_dbs_;
int slowdown_count; int slowdown_count;
int stop_count; int stop_count;
std::string db_name_; TableFileCreationInfo prev_fc_info_;
std::string cf_name_;
std::string file_path_;
}; };
TEST_F(EventListenerTest, OnSingleDBFlushTest) { TEST_F(EventListenerTest, OnSingleDBFlushTest) {

@ -71,13 +71,10 @@ class FullCompactor : public Compactor {
// If triggered_writes_stop is true, it will also set the retry // If triggered_writes_stop is true, it will also set the retry
// flag of compaction-task to true. // flag of compaction-task to true.
void OnFlushCompleted( void OnFlushCompleted(
DB* db, const std::string& cf_name, DB* db, const FlushJobInfo& info) override {
const std::string& file_path, CompactionTask* task = PickCompaction(db, info.cf_name);
bool triggered_writes_slowdown,
bool triggered_writes_stop) override {
CompactionTask* task = PickCompaction(db, cf_name);
if (task != nullptr) { if (task != nullptr) {
if (triggered_writes_stop) { if (info.triggered_writes_stop) {
task->retry_on_fail = true; task->retry_on_fail = true;
} }
// Schedule compaction in a different thread. // Schedule compaction in a different thread.

@ -4,7 +4,6 @@
#pragma once #pragma once
#include <string> #include <string>
#include <vector> #include <vector>
#include "rocksdb/compaction_job_stats.h" #include "rocksdb/compaction_job_stats.h"
@ -50,6 +49,27 @@ struct TableFileDeletionInfo {
Status status; Status status;
}; };
struct FlushJobInfo {
// the name of the column family
std::string cf_name;
// the path to the newly created file
std::string file_path;
// the id of the thread that completed this flush job.
uint64_t thread_id;
// the job id, which is unique in the same thread.
int job_id;
// If true, then rocksdb is currently slowing-down all writes to prevent
// creating too many Level 0 files as compaction seems not able to
// catch up the write request speed. This indicates that there are
// too many files in Level 0.
bool triggered_writes_slowdown;
// If true, then rocksdb is currently blocking any writes to prevent
// creating more L0 files. This indicates that there are too many
// files in level 0. Compactions should try to compact L0 files down
// to lower levels as soon as possible.
bool triggered_writes_stop;
};
struct CompactionJobInfo { struct CompactionJobInfo {
CompactionJobInfo() = default; CompactionJobInfo() = default;
explicit CompactionJobInfo(const CompactionJobStats& _stats) : explicit CompactionJobInfo(const CompactionJobStats& _stats) :
@ -114,24 +134,8 @@ class EventListener {
// Note that the this function must be implemented in a way such that // Note that the this function must be implemented in a way such that
// it should not run for an extended period of time before the function // it should not run for an extended period of time before the function
// returns. Otherwise, RocksDB may be blocked. // returns. Otherwise, RocksDB may be blocked.
//
// @param db a pointer to the rocksdb instance which just flushed
// a memtable to disk.
// @param column_family_id the id of the flushed column family.
// @param file_path the path to the newly created file.
// @param triggered_writes_slowdown true when rocksdb is currently
// slowing-down all writes to prevent creating too many Level 0
// files as compaction seems not able to catch up the write request
// speed. This indicates that there're too many files in Level 0.
// @param triggered_writes_stop true when rocksdb is currently blocking
// any writes to prevent creating more L0 files. This indicates that
// there're too many files in level 0. Compactions should try to
// compact L0 files down to lower levels as soon as possible.
virtual void OnFlushCompleted( virtual void OnFlushCompleted(
DB* db, const std::string& column_family_name, DB* db, const FlushJobInfo& flush_job_info) {}
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) {}
// A call-back function for RocksDB which will be called whenever // A call-back function for RocksDB which will be called whenever
// a SST file is deleted. Different from OnCompactionCompleted and // a SST file is deleted. Different from OnCompactionCompleted and

@ -791,14 +791,11 @@ class DbStressListener : public EventListener {
virtual ~DbStressListener() {} virtual ~DbStressListener() {}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
virtual void OnFlushCompleted( virtual void OnFlushCompleted(
DB* db, const std::string& column_family_name, DB* db, const FlushJobInfo& info) override {
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) override {
assert(db); assert(db);
assert(db->GetName() == db_name_); assert(db->GetName() == db_name_);
assert(IsValidColumnFamilyName(column_family_name)); assert(IsValidColumnFamilyName(info.cf_name));
VerifyFilePath(file_path); VerifyFilePath(info.file_path);
// pretending doing some work here // pretending doing some work here
std::this_thread::sleep_for( std::this_thread::sleep_for(
std::chrono::microseconds(rand_.Uniform(5000))); std::chrono::microseconds(rand_.Uniform(5000)));

Loading…
Cancel
Save