Add EventListener::OnTableFileCreated()

Summary:
Add EventListener::OnTableFileCreated(), which will be called
when a table file is created.  This patch is part of the
EventLogger and EventListener integration.

Test Plan: Augment existing test in db/listener_test.cc

Reviewers: anthony, kradhakrishnan, rven, igor, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D38865
main
Yueh-Hsuan Chiang 10 years ago
parent 898e803fc5
commit fc83821270
  1. 20
      db/compaction_job.cc
  2. 4
      db/compaction_job.h
  3. 3
      db/compaction_job_test.cc
  4. 31
      db/db_impl.cc
  5. 2
      db/db_impl.h
  6. 54
      db/event_helpers.cc
  7. 17
      db/event_helpers.h
  8. 17
      db/flush_job.cc
  9. 25
      db/listener_test.cc
  10. 35
      include/rocksdb/listener.h
  11. 2
      include/rocksdb/table_properties.h
  12. 12
      tools/db_stress.cc

@ -205,10 +205,11 @@ CompactionJob::CompactionJob(
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback, EventLogger* event_logger,
bool paranoid_file_checks)
bool paranoid_file_checks, const std::string& dbname)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_stats_(1),
dbname_(dbname),
db_options_(db_options),
env_options_(env_options),
env_(db_options.env),
@ -1020,13 +1021,9 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
} else {
compact_->builder->Abandon();
}
if (s.ok()) {
table_properties = compact_->builder->GetTableProperties();
}
const uint64_t current_bytes = compact_->builder->FileSize();
compact_->current_output()->file_size = current_bytes;
compact_->total_bytes += current_bytes;
compact_->builder.reset();
// Finish and check for file errors
if (s.ok() && !db_options_.disableDataSync) {
@ -1058,16 +1055,23 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
delete iter;
if (s.ok()) {
TableFileCreationInfo info(compact_->builder->GetTableProperties());
info.db_name = dbname_;
info.cf_name = cfd->GetName();
info.file_path = TableFileName(cfd->ioptions()->db_paths,
fd.GetNumber(), fd.GetPathId());
info.file_size = fd.GetFileSize();
info.job_id = job_id_;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
" keys, %" PRIu64 " bytes",
cfd->GetName().c_str(), job_id_, output_number, current_entries,
current_bytes);
EventHelpers::LogTableFileCreation(event_logger_, job_id_,
output_number, current_bytes,
table_properties);
EventHelpers::LogAndNotifyTableFileCreation(
event_logger_, cfd->ioptions()->listeners, fd, info);
}
}
compact_->builder.reset();
return s;
}

@ -58,7 +58,8 @@ class CompactionJob {
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback,
EventLogger* event_logger, bool paranoid_file_checks);
EventLogger* event_logger, bool paranoid_file_checks,
const std::string& dbname);
~CompactionJob();
@ -111,6 +112,7 @@ class CompactionJob {
InternalStats::CompactionStats compaction_stats_;
// DBImpl state
const std::string& dbname_;
const DBOptions& db_options_;
const EnvOptions& env_options_;
Env* env_;

@ -166,7 +166,8 @@ TEST_F(CompactionJobTest, Simple) {
CompactionJob compaction_job(0, compaction.get(), db_options_, env_options_,
versions_.get(), &shutting_down_, &log_buffer,
nullptr, nullptr, nullptr, {}, table_cache_,
std::move(yield_callback), &event_logger, false);
std::move(yield_callback), &event_logger, false,
"dbname");
compaction_job.Prepare();
mutex_.Unlock();

@ -1162,13 +1162,14 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
{
mutex_.Unlock();
TableFileCreationInfo info;
s = BuildTable(
dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
iter.get(), &meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), newest_snapshot,
earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()),
cfd->ioptions()->compression_opts, paranoid_file_checks, Env::IO_HIGH,
&table_properties);
&info.table_properties);
LogFlush(db_options_.info_log);
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
@ -1178,9 +1179,15 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
// output to event logger
if (s.ok()) {
EventHelpers::LogTableFileCreation(
&event_logger_, job_id, meta.fd.GetNumber(), meta.fd.GetFileSize(),
table_properties);
info.db_name = dbname_;
info.cf_name = cfd->GetName();
info.file_path = TableFileName(db_options_.db_paths,
meta.fd.GetNumber(),
meta.fd.GetPathId());
info.file_size = meta.fd.GetFileSize();
info.job_id = job_id;
EventHelpers::LogAndNotifyTableFileCreation(
&event_logger_, db_options_.listeners, meta.fd, info);
}
mutex_.Lock();
}
@ -1222,6 +1229,13 @@ Status DBImpl::FlushMemTableToOutputFile(
&event_logger_);
uint64_t file_number;
// Within flush_job.Run, rocksdb may call event listener to notify
// file creation and deletion.
//
// Note that flush_job.Run will unlock and lock the db_mutex,
// and EventListener callback will be called when the db_mutex
// is unlocked by the current thread.
Status s = flush_job.Run(&file_number);
if (s.ok()) {
@ -1516,12 +1530,14 @@ Status DBImpl::CompactFilesImpl(
&shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->GetOutputPathId()), stats_,
snapshots_.GetAll(), table_cache_, std::move(yield_callback),
&event_logger_, c->mutable_cf_options()->paranoid_file_checks);
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
dbname_);
compaction_job.Prepare();
mutex_.Unlock();
Status status = compaction_job.Run();
mutex_.Lock();
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), job_context,
@ -2439,11 +2455,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->GetOutputPathId()), stats_,
snapshots_.GetAll(), table_cache_, std::move(yield_callback),
&event_logger_, c->mutable_cf_options()->paranoid_file_checks);
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
dbname_);
compaction_job.Prepare();
mutex_.Unlock();
status = compaction_job.Run();
mutex_.Lock();
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), job_context,

@ -22,6 +22,8 @@
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "db/column_family.h"
#include "db/compaction_job.h"
#include "db/flush_job.h"
#include "db/version_edit.h"
#include "db/wal_manager.h"
#include "db/writebuffer.h"

@ -17,15 +17,19 @@ void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
std::chrono::system_clock::now().time_since_epoch()).count();
}
void EventHelpers::LogTableFileCreation(
EventLogger* event_logger, int job_id, uint64_t file_number,
uint64_t file_size, const TableProperties& table_properties) {
// TODO(yhchiang): change the API to directly take TableFileCreationInfo
void EventHelpers::LogAndNotifyTableFileCreation(
EventLogger* event_logger,
const std::vector<std::shared_ptr<EventListener>>& listeners,
const FileDescriptor& fd, const TableFileCreationInfo& info) {
assert(event_logger);
JSONWriter jwriter;
AppendCurrentTime(&jwriter);
jwriter << "job" << job_id
<< "event" << "table_file_creation"
<< "file_number" << file_number
<< "file_size" << file_size;
jwriter << "cf_name" << info.cf_name
<< "job" << info.job_id
<< "event" << "table_file_creation"
<< "file_number" << fd.GetNumber()
<< "file_size" << fd.GetFileSize();
// table_properties
{
@ -33,22 +37,24 @@ void EventHelpers::LogTableFileCreation(
jwriter.StartObject();
// basic properties:
jwriter << "data_size" << table_properties.data_size
<< "index_size" << table_properties.index_size
<< "filter_size" << table_properties.filter_size
<< "raw_key_size" << table_properties.raw_key_size
jwriter << "data_size" << info.table_properties.data_size
<< "index_size" << info.table_properties.index_size
<< "filter_size" << info.table_properties.filter_size
<< "raw_key_size" << info.table_properties.raw_key_size
<< "raw_average_key_size" << SafeDivide(
table_properties.raw_key_size,
table_properties.num_entries)
<< "raw_value_size" << table_properties.raw_value_size
info.table_properties.raw_key_size,
info.table_properties.num_entries)
<< "raw_value_size" << info.table_properties.raw_value_size
<< "raw_average_value_size" << SafeDivide(
table_properties.raw_value_size, table_properties.num_entries)
<< "num_data_blocks" << table_properties.num_data_blocks
<< "num_entries" << table_properties.num_entries
<< "filter_policy_name" << table_properties.filter_policy_name;
info.table_properties.raw_value_size,
info.table_properties.num_entries)
<< "num_data_blocks" << info.table_properties.num_data_blocks
<< "num_entries" << info.table_properties.num_entries
<< "filter_policy_name" <<
info.table_properties.filter_policy_name;
// user collected properties
for (const auto& prop : table_properties.user_collected_properties) {
for (const auto& prop : info.table_properties.user_collected_properties) {
jwriter << prop.first << prop.second;
}
jwriter.EndObject();
@ -56,6 +62,16 @@ void EventHelpers::LogTableFileCreation(
jwriter.EndObject();
event_logger->Log(jwriter);
#ifndef ROCKSDB_LITE
if (listeners.size() == 0) {
return;
}
for (auto listener : listeners) {
listener->OnTableFileCreated(info);
}
#endif // !ROCKSDB_LITE
}
} // namespace rocksdb

@ -4,16 +4,25 @@
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include "util/event_logger.h"
#include <memory>
#include <string>
#include <vector>
#include "db/column_family.h"
#include "db/version_edit.h"
#include "rocksdb/listener.h"
#include "rocksdb/table_properties.h"
#include "util/event_logger.h"
namespace rocksdb {
class EventHelpers {
public:
static void AppendCurrentTime(JSONWriter* json_writer);
static void LogTableFileCreation(EventLogger* event_logger, int job_id,
uint64_t file_number, uint64_t file_size,
const TableProperties& table_properties);
static void LogAndNotifyTableFileCreation(
EventLogger* event_logger,
const std::vector<std::shared_ptr<EventListener>>& listeners,
const FileDescriptor& fd, const TableFileCreationInfo& info);
};
} // namespace rocksdb

@ -222,7 +222,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
<< total_num_entries << "num_deletes"
<< total_num_deletes << "memory_usage"
<< total_memory_usage;
TableProperties table_properties;
TableFileCreationInfo info;
{
ScopedArenaIterator iter(
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
@ -240,7 +240,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
earliest_seqno_in_memtable, output_compression_,
cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, Env::IO_HIGH,
&table_properties);
&info.table_properties);
LogFlush(db_options_.info_log);
}
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
@ -250,9 +250,16 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
// output to event logger
if (s.ok()) {
EventHelpers::LogTableFileCreation(
event_logger_, job_context_->job_id, meta.fd.GetNumber(),
meta.fd.GetFileSize(), table_properties);
info.db_name = dbname_;
info.cf_name = cfd_->GetName();
info.file_path = TableFileName(db_options_.db_paths,
meta.fd.GetNumber(),
meta.fd.GetPathId());
info.file_size = meta.fd.GetFileSize();
info.job_id = job_context_->job_id;
EventHelpers::LogAndNotifyTableFileCreation(
event_logger_, db_options_.listeners,
meta.fd, info);
}
if (!db_options_.disableDataSync && output_file_directory_ != nullptr) {

@ -208,27 +208,48 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
}
}
// This simple Listener can only handle one flush at a time.
class TestFlushListener : public EventListener {
public:
void OnTableFileCreated(
const TableFileCreationInfo& info) {
db_name_ = info.db_name;
cf_name_ = info.cf_name;
file_path_ = info.file_path;
ASSERT_GT(info.table_properties.data_size, 0U);
ASSERT_GT(info.table_properties.raw_key_size, 0U);
ASSERT_GT(info.table_properties.raw_value_size, 0U);
ASSERT_GT(info.table_properties.num_data_blocks, 0U);
ASSERT_GT(info.table_properties.num_entries, 0U);
}
void OnFlushCompleted(
DB* db, const std::string& name,
DB* db, const std::string& cf_name,
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) override {
flushed_dbs_.push_back(db);
flushed_column_family_names_.push_back(name);
flushed_column_family_names_.push_back(cf_name);
if (triggered_writes_slowdown) {
slowdown_count++;
}
if (triggered_writes_stop) {
stop_count++;
}
// verify the file created matches the flushed file.
ASSERT_EQ(db_name_, db->GetName());
ASSERT_EQ(cf_name_, cf_name);
ASSERT_GT(file_path.size(), 0U);
ASSERT_EQ(file_path, file_path_);
}
std::vector<std::string> flushed_column_family_names_;
std::vector<DB*> flushed_dbs_;
int slowdown_count;
int stop_count;
std::string db_name_;
std::string cf_name_;
std::string file_path_;
};
TEST_F(EventListenerTest, OnSingleDBFlushTest) {

@ -9,6 +9,7 @@
#include <string>
#include <vector>
#include "rocksdb/status.h"
#include "rocksdb/table_properties.h"
namespace rocksdb {
@ -28,6 +29,25 @@ struct CompactionJobInfo {
std::vector<std::string> output_files;
};
struct TableFileCreationInfo {
TableFileCreationInfo() = default;
explicit TableFileCreationInfo(TableProperties&& prop) :
table_properties(prop) {}
// the name of the database where the file was created
std::string db_name;
// the name of the column family where the file was created.
std::string cf_name;
// the path to the created file.
std::string file_path;
// the size of the file.
uint64_t file_size;
// the id of the job (which could be flush or compaction) that
// created the file.
int job_id;
// Detailed properties of the created file.
TableProperties table_properties;
};
// EventListener class contains a set of call-back functions that will
// be called when specific RocksDB event happens such as flush. It can
// be used as a building block for developing custom features such as
@ -99,6 +119,21 @@ class EventListener {
// after this function is returned, and must be copied if it is needed
// outside of this function.
virtual void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) {}
// A call-back function for RocksDB which will be called whenever
// a SST file is created. Different from OnCompactionCompleted and
// OnFlushCompleted, this call-back is designed for external logging
// service and thus only provide string parameters instead
// of a pointer to DB. Applications that build logic basic based
// on file creations and deletions is suggested to implement
// OnFlushCompleted and OnCompactionCompleted.
//
// Note that if applications would like to use the passed reference
// outside this function call, they should make copies from these
// returned value.
virtual void OnTableFileCreated(
const TableFileCreationInfo& info) {}
virtual ~EventListener() {}
};

@ -24,7 +24,7 @@ namespace rocksdb {
// ++pos) {
// ...
// }
typedef std::map<const std::string, std::string> UserCollectedProperties;
typedef std::map<std::string, std::string> UserCollectedProperties;
// TableProperties contains a bunch of read-only properties of its associated
// table.

@ -821,6 +821,18 @@ class DbStressListener : public EventListener {
std::chrono::microseconds(rand_.Uniform(5000)));
}
virtual void OnTableFileCreated(
const TableFileCreationInfo& info) override {
assert(info.db_name == db_name_);
assert(IsValidColumnFamilyName(info.cf_name));
VerifyFilePath(info.file_path);
assert(info.file_size > 0);
assert(info.job_id > 0);
assert(info.table_properties.data_size > 0);
assert(info.table_properties.raw_key_size > 0);
assert(info.table_properties.num_entries > 0);
}
protected:
bool IsValidColumnFamilyName(const std::string& cf_name) const {
if (cf_name == kDefaultColumnFamilyName) {

Loading…
Cancel
Save