Added EventListener::OnTableFileCreationStarted() callback

Summary: Added EventListener::OnTableFileCreationStarted. EventListener::OnTableFileCreated will now also be called when table file creation fails; users can check the creation status via TableFileCreationInfo::status.

Test Plan: unit test.

Reviewers: dhruba, yhchiang, ott, sdong

Reviewed By: sdong

Subscribers: sdong, kradhakrishnan, IslamAbdelRahman, andrewkr, yhchiang, leveldb, ott, dhruba

Differential Revision: https://reviews.facebook.net/D56337
main
Yi Wu 9 years ago
parent e8115cea45
commit a92049e3e7
  1. 1
      HISTORY.md
  2. 25
      db/builder.cc
  3. 11
      db/builder.h
  4. 40
      db/compaction_job.cc
  5. 17
      db/db_impl.cc
  6. 3
      db/db_test.cc
  7. 108
      db/event_helpers.cc
  8. 19
      db/event_helpers.h
  9. 18
      db/flush_job.cc
  10. 141
      db/listener_test.cc
  11. 2
      db/repair.cc
  12. 39
      include/rocksdb/listener.h

@ -4,6 +4,7 @@
* Allow preset compression dictionary for improved compression of block-based tables. This is supported for zlib, zstd, and lz4. The compression dictionary's size is configurable via CompressionOptions::max_dict_bytes. * Allow preset compression dictionary for improved compression of block-based tables. This is supported for zlib, zstd, and lz4. The compression dictionary's size is configurable via CompressionOptions::max_dict_bytes.
* Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F * Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F
* Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN". * Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN".
* Added EventListener::OnTableFileCreationStarted. EventListener::OnTableFileCreated will now also be called when table file creation fails; users can check the creation status via TableFileCreationInfo::status.
## 4.7.0 (4/8/2016) ## 4.7.0 (4/8/2016)
### Public API Change ### Public API Change

@ -15,6 +15,7 @@
#include "db/compaction_iterator.h" #include "db/compaction_iterator.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/filename.h" #include "db/filename.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/merge_helper.h" #include "db/merge_helper.h"
@ -68,7 +69,8 @@ Status BuildTable(
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
const CompressionType compression, const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks, const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, const Env::IOPriority io_priority, InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level) { TableProperties* table_properties, int level) {
assert((column_family_id == assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
@ -81,6 +83,12 @@ Status BuildTable(
std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(), std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId()); meta->fd.GetPathId());
#ifndef ROCKSDB_LITE
EventHelpers::NotifyTableFileCreationStarted(
ioptions.listeners, dbname, column_family_name, fname, job_id, reason);
#endif // !ROCKSDB_LITE
TableProperties tp;
if (iter->Valid()) { if (iter->Valid()) {
TableBuilder* builder; TableBuilder* builder;
unique_ptr<WritableFileWriter> file_writer; unique_ptr<WritableFileWriter> file_writer;
@ -88,6 +96,9 @@ Status BuildTable(
unique_ptr<WritableFile> file; unique_ptr<WritableFile> file;
s = NewWritableFile(env, fname, &file, env_options); s = NewWritableFile(env, fname, &file, env_options);
if (!s.ok()) { if (!s.ok()) {
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name, fname,
job_id, meta->fd, tp, reason, s);
return s; return s;
} }
file->SetIOPriority(io_priority); file->SetIOPriority(io_priority);
@ -135,11 +146,13 @@ Status BuildTable(
} }
if (s.ok() && !empty) { if (s.ok() && !empty) {
meta->fd.file_size = builder->FileSize(); uint64_t file_size = builder->FileSize();
meta->fd.file_size = file_size;
meta->marked_for_compaction = builder->NeedCompact(); meta->marked_for_compaction = builder->NeedCompact();
assert(meta->fd.GetFileSize() > 0); assert(meta->fd.GetFileSize() > 0);
tp = builder->GetTableProperties();
if (table_properties) { if (table_properties) {
*table_properties = builder->GetTableProperties(); *table_properties = tp;
} }
} }
delete builder; delete builder;
@ -178,6 +191,12 @@ Status BuildTable(
if (!s.ok() || meta->fd.GetFileSize() == 0) { if (!s.ok() || meta->fd.GetFileSize() == 0) {
env->DeleteFile(fname); env->DeleteFile(fname);
} }
// Output to event logger and fire events.
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger, ioptions.listeners, dbname, column_family_name, fname,
job_id, meta->fd, tp, reason, s);
return s; return s;
} }

@ -12,11 +12,13 @@
#include "db/table_properties_collector.h" #include "db/table_properties_collector.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/options.h"
#include "rocksdb/immutable_options.h" #include "rocksdb/immutable_options.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
#include "util/event_logger.h"
namespace rocksdb { namespace rocksdb {
@ -69,7 +71,8 @@ extern Status BuildTable(
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
const CompressionType compression, const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks, const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH, const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1); TableProperties* table_properties = nullptr, int level = -1);

@ -953,9 +953,10 @@ Status CompactionJob::FinishCompactionOutputFile(
} }
sub_compact->outfile.reset(); sub_compact->outfile.reset();
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
TableProperties tp;
if (s.ok() && current_entries > 0) { if (s.ok() && current_entries > 0) {
// Verify that the table is usable // Verify that the table is usable
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
InternalIterator* iter = cfd->table_cache()->NewIterator( InternalIterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd, ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
nullptr, cfd->internal_stats()->GetFileReadHist( nullptr, cfd->internal_stats()->GetFileReadHist(
@ -969,34 +970,30 @@ Status CompactionJob::FinishCompactionOutputFile(
} }
delete iter; delete iter;
// Output to event logger and fire events.
if (s.ok()) { if (s.ok()) {
auto tp = sub_compact->builder->GetTableProperties(); tp = sub_compact->builder->GetTableProperties();
sub_compact->current_output()->table_properties = sub_compact->current_output()->table_properties =
std::make_shared<TableProperties>(tp); std::make_shared<TableProperties>(tp);
TableFileCreationInfo info(std::move(tp));
info.db_name = dbname_;
info.cf_name = cfd->GetName();
info.file_path =
TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
info.file_size = meta->fd.GetFileSize();
info.job_id = job_id_;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
" keys, %" PRIu64 " bytes%s", " keys, %" PRIu64 " bytes%s",
cfd->GetName().c_str(), job_id_, output_number, current_entries, cfd->GetName().c_str(), job_id_, output_number, current_entries,
current_bytes, current_bytes,
meta->marked_for_compaction ? " (need compaction)" : ""); meta->marked_for_compaction ? " (need compaction)" : "");
EventHelpers::LogAndNotifyTableFileCreation(
event_logger_, cfd->ioptions()->listeners, meta->fd, info);
} }
} }
std::string fname = TableFileName(db_options_.db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
job_id_, meta->fd, tp, TableFileCreationReason::kCompaction, s);
// Report new file to SstFileManagerImpl // Report new file to SstFileManagerImpl
auto sfm = auto sfm =
static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get()); static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
if (sfm && meta->fd.GetPathId() == 0) { if (sfm && meta->fd.GetPathId() == 0) {
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(), auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId()); meta->fd.GetPathId());
sfm->OnAddFile(fn); sfm->OnAddFile(fn);
@ -1072,10 +1069,17 @@ Status CompactionJob::OpenCompactionOutputFile(
assert(sub_compact->builder == nullptr); assert(sub_compact->builder == nullptr);
// no need to lock because VersionSet::next_file_number_ is atomic // no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber(); uint64_t file_number = versions_->NewFileNumber();
// Make the output file
unique_ptr<WritableFile> writable_file;
std::string fname = TableFileName(db_options_.db_paths, file_number, std::string fname = TableFileName(db_options_.db_paths, file_number,
sub_compact->compaction->output_path_id()); sub_compact->compaction->output_path_id());
// Fire events.
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
#ifndef ROCKSDB_LITE
EventHelpers::NotifyTableFileCreationStarted(
cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_,
TableFileCreationReason::kCompaction);
#endif // !ROCKSDB_LITE
// Make the output file
unique_ptr<WritableFile> writable_file;
Status s = NewWritableFile(env_, fname, &writable_file, env_options_); Status s = NewWritableFile(env_, fname, &writable_file, env_options_);
if (!s.ok()) { if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
@ -1084,8 +1088,13 @@ Status CompactionJob::OpenCompactionOutputFile(
sub_compact->compaction->column_family_data()->GetName().c_str(), sub_compact->compaction->column_family_data()->GetName().c_str(),
job_id_, file_number, s.ToString().c_str()); job_id_, file_number, s.ToString().c_str());
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
fname, job_id_, FileDescriptor(), TableProperties(),
TableFileCreationReason::kCompaction, s);
return s; return s;
} }
SubcompactionState::Output out; SubcompactionState::Output out;
out.meta.fd = out.meta.fd =
FileDescriptor(file_number, sub_compact->compaction->output_path_id(), 0); FileDescriptor(file_number, sub_compact->compaction->output_path_id(), 0);
@ -1098,7 +1107,6 @@ Status CompactionJob::OpenCompactionOutputFile(
sub_compact->outfile.reset( sub_compact->outfile.reset(
new WritableFileWriter(std::move(writable_file), env_options_)); new WritableFileWriter(std::move(writable_file), env_options_));
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
// If the Column family flag is to only optimize filters for hits, // If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where // we can skip creating filters if this is the bottommost_level where
// data is going to be found // data is going to be found

@ -1534,7 +1534,6 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
cfd->GetLatestMutableCFOptions()->paranoid_file_checks; cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
{ {
mutex_.Unlock(); mutex_.Unlock();
TableFileCreationInfo info;
SequenceNumber earliest_write_conflict_snapshot; SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs = std::vector<SequenceNumber> snapshot_seqs =
@ -1547,26 +1546,14 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_seqs, earliest_write_conflict_snapshot,
GetCompressionFlush(*cfd->ioptions()), GetCompressionFlush(*cfd->ioptions()),
cfd->ioptions()->compression_opts, paranoid_file_checks, cfd->ioptions()->compression_opts, paranoid_file_checks,
cfd->internal_stats(), Env::IO_HIGH, &info.table_properties); cfd->internal_stats(), TableFileCreationReason::kRecovery,
&event_logger_, job_id);
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]" "[%s] [WriteLevel0TableForRecovery]"
" Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s", " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
s.ToString().c_str()); s.ToString().c_str());
// output to event logger
if (s.ok()) {
info.db_name = dbname_;
info.cf_name = cfd->GetName();
info.file_path = TableFileName(db_options_.db_paths,
meta.fd.GetNumber(),
meta.fd.GetPathId());
info.file_size = meta.fd.GetFileSize();
info.job_id = job_id;
EventHelpers::LogAndNotifyTableFileCreation(
&event_logger_, db_options_.listeners, meta.fd, info);
}
mutex_.Lock(); mutex_.Lock();
} }
} }

@ -4049,8 +4049,7 @@ TEST_F(DBTest, ThreadStatusFlush) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({ rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"FlushJob::FlushJob()", "DBTest::ThreadStatusFlush:1"}, {"FlushJob::FlushJob()", "DBTest::ThreadStatusFlush:1"},
{"DBTest::ThreadStatusFlush:2", {"DBTest::ThreadStatusFlush:2", "FlushJob::WriteLevel0Table"},
"FlushJob::LogAndNotifyTableFileCreation()"},
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();

@ -20,57 +20,83 @@ void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
std::chrono::system_clock::now().time_since_epoch()).count(); std::chrono::system_clock::now().time_since_epoch()).count();
} }
void EventHelpers::LogAndNotifyTableFileCreation( #ifndef ROCKSDB_LITE
void EventHelpers::NotifyTableFileCreationStarted(
const std::vector<std::shared_ptr<EventListener>>& listeners,
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, TableFileCreationReason reason) {
TableFileCreationBriefInfo info;
info.db_name = db_name;
info.cf_name = cf_name;
info.file_path = file_path;
info.job_id = job_id;
info.reason = reason;
for (auto& listener : listeners) {
listener->OnTableFileCreationStarted(info);
}
}
#endif // !ROCKSDB_LITE
void EventHelpers::LogAndNotifyTableFileCreationFinished(
EventLogger* event_logger, EventLogger* event_logger,
const std::vector<std::shared_ptr<EventListener>>& listeners, const std::vector<std::shared_ptr<EventListener>>& listeners,
const FileDescriptor& fd, const TableFileCreationInfo& info) { const std::string& db_name, const std::string& cf_name,
assert(event_logger); const std::string& file_path, int job_id, const FileDescriptor& fd,
JSONWriter jwriter; const TableProperties& table_properties, TableFileCreationReason reason,
AppendCurrentTime(&jwriter); const Status& s) {
jwriter << "cf_name" << info.cf_name if (s.ok() && event_logger) {
<< "job" << info.job_id JSONWriter jwriter;
<< "event" << "table_file_creation" AppendCurrentTime(&jwriter);
<< "file_number" << fd.GetNumber() jwriter << "cf_name" << cf_name << "job" << job_id << "event"
<< "file_size" << fd.GetFileSize(); << "table_file_creation"
<< "file_number" << fd.GetNumber() << "file_size"
// table_properties << fd.GetFileSize();
{
jwriter << "table_properties"; // table_properties
jwriter.StartObject(); {
jwriter << "table_properties";
// basic properties: jwriter.StartObject();
jwriter << "data_size" << info.table_properties.data_size
<< "index_size" << info.table_properties.index_size // basic properties:
<< "filter_size" << info.table_properties.filter_size jwriter << "data_size" << table_properties.data_size << "index_size"
<< "raw_key_size" << info.table_properties.raw_key_size << table_properties.index_size << "filter_size"
<< "raw_average_key_size" << SafeDivide( << table_properties.filter_size << "raw_key_size"
info.table_properties.raw_key_size, << table_properties.raw_key_size << "raw_average_key_size"
info.table_properties.num_entries) << SafeDivide(table_properties.raw_key_size,
<< "raw_value_size" << info.table_properties.raw_value_size table_properties.num_entries)
<< "raw_average_value_size" << SafeDivide( << "raw_value_size" << table_properties.raw_value_size
info.table_properties.raw_value_size, << "raw_average_value_size"
info.table_properties.num_entries) << SafeDivide(table_properties.raw_value_size,
<< "num_data_blocks" << info.table_properties.num_data_blocks table_properties.num_entries)
<< "num_entries" << info.table_properties.num_entries << "num_data_blocks" << table_properties.num_data_blocks
<< "filter_policy_name" << << "num_entries" << table_properties.num_entries
info.table_properties.filter_policy_name; << "filter_policy_name" << table_properties.filter_policy_name;
// user collected properties // user collected properties
for (const auto& prop : info.table_properties.readable_properties) { for (const auto& prop : table_properties.readable_properties) {
jwriter << prop.first << prop.second; jwriter << prop.first << prop.second;
}
jwriter.EndObject();
} }
jwriter.EndObject(); jwriter.EndObject();
}
jwriter.EndObject();
event_logger->Log(jwriter); event_logger->Log(jwriter);
}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (listeners.size() == 0) { if (listeners.size() == 0) {
return; return;
} }
TableFileCreationInfo info;
for (auto listener : listeners) { info.db_name = db_name;
info.cf_name = cf_name;
info.file_path = file_path;
info.file_size = fd.file_size;
info.job_id = job_id;
info.table_properties = table_properties;
info.reason = reason;
info.status = s;
for (auto& listener : listeners) {
listener->OnTableFileCreated(info); listener->OnTableFileCreated(info);
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE

@ -19,15 +19,30 @@ namespace rocksdb {
class EventHelpers { class EventHelpers {
public: public:
static void AppendCurrentTime(JSONWriter* json_writer); static void AppendCurrentTime(JSONWriter* json_writer);
static void LogAndNotifyTableFileCreation( #ifndef ROCKSDB_LITE
static void NotifyTableFileCreationStarted(
const std::vector<std::shared_ptr<EventListener>>& listeners,
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, TableFileCreationReason reason);
#endif // !ROCKSDB_LITE
static void LogAndNotifyTableFileCreationFinished(
EventLogger* event_logger, EventLogger* event_logger,
const std::vector<std::shared_ptr<EventListener>>& listeners, const std::vector<std::shared_ptr<EventListener>>& listeners,
const FileDescriptor& fd, const TableFileCreationInfo& info); const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, const FileDescriptor& fd,
const TableProperties& table_properties, TableFileCreationReason reason,
const Status& s);
static void LogAndNotifyTableFileDeletion( static void LogAndNotifyTableFileDeletion(
EventLogger* event_logger, int job_id, EventLogger* event_logger, int job_id,
uint64_t file_number, const std::string& file_path, uint64_t file_number, const std::string& file_path,
const Status& status, const std::string& db_name, const Status& status, const std::string& db_name,
const std::vector<std::shared_ptr<EventListener>>& listeners); const std::vector<std::shared_ptr<EventListener>>& listeners);
private:
static void LogAndNotifyTableFileCreation(
EventLogger* event_logger,
const std::vector<std::shared_ptr<EventListener>>& listeners,
const FileDescriptor& fd, const TableFileCreationInfo& info);
}; };
} // namespace rocksdb } // namespace rocksdb

@ -253,7 +253,6 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
<< total_num_deletes << "memory_usage" << total_num_deletes << "memory_usage"
<< total_memory_usage; << total_memory_usage;
TableFileCreationInfo info;
{ {
ScopedArenaIterator iter( ScopedArenaIterator iter(
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0], NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
@ -272,8 +271,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
earliest_write_conflict_snapshot_, output_compression_, earliest_write_conflict_snapshot_, output_compression_,
cfd_->ioptions()->compression_opts, cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */); Env::IO_HIGH, &table_properties_, 0 /* level */);
info.table_properties = table_properties_;
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
} }
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
@ -284,21 +283,6 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
meta->fd.GetFileSize(), s.ToString().c_str(), meta->fd.GetFileSize(), s.ToString().c_str(),
meta->marked_for_compaction ? " (needs compaction)" : ""); meta->marked_for_compaction ? " (needs compaction)" : "");
// output to event logger
if (s.ok()) {
info.db_name = dbname_;
info.cf_name = cfd_->GetName();
info.file_path = TableFileName(db_options_.db_paths,
meta->fd.GetNumber(),
meta->fd.GetPathId());
info.file_size = meta->fd.GetFileSize();
info.job_id = job_context_->job_id;
EventHelpers::LogAndNotifyTableFileCreation(
event_logger_, db_options_.listeners,
meta->fd, info);
TEST_SYNC_POINT("FlushJob::LogAndNotifyTableFileCreation()");
}
if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { if (!db_options_.disableDataSync && output_file_directory_ != nullptr) {
output_file_directory_->Fsync(); output_file_directory_->Fsync();
} }

@ -582,6 +582,147 @@ TEST_F(EventListenerTest, CompactionReasonFIFO) {
ASSERT_EQ(compaction_reason, CompactionReason::kFIFOMaxSize); ASSERT_EQ(compaction_reason, CompactionReason::kFIFOMaxSize);
} }
} }
// Test listener that counts table file creation callbacks, bucketed by
// creation reason (index 0 = flush, index 1 = compaction), and sanity-checks
// the info object passed to each callback.
class TableFileCreationListener : public EventListener {
 public:
  // Env wrapper that can be told to fail the creation of ".sst" files, so the
  // failure path of the creation callbacks can be exercised.
  class TestEnv : public EnvWrapper {
   public:
    TestEnv() : EnvWrapper(Env::Default()) {}

    // Sets the status consulted by NewWritableFile() for ".sst" files. A
    // non-OK status makes those creations fail with that status.
    void SetStatus(Status s) { status_ = s; }

    // Fails with the injected status when `fname` ends in ".sst" and the
    // injected status is not OK; otherwise forwards to the default Env.
    // Marked `override` so a signature mismatch with the base class becomes a
    // compile error instead of silently disabling the fault injection.
    Status NewWritableFile(const std::string& fname,
                           std::unique_ptr<WritableFile>* result,
                           const EnvOptions& options) override {
      const bool is_sst_file =
          fname.size() > 4 && fname.compare(fname.size() - 4, 4, ".sst") == 0;
      if (is_sst_file && !status_.ok()) {
        return status_;
      }
      return Env::Default()->NewWritableFile(fname, result, options);
    }

   private:
    Status status_;
  };

  TableFileCreationListener() { ResetCounters(); }

  // Maps a creation reason to its counter slot: 0 for flush, 1 for
  // compaction, -1 for any other reason (which is not counted).
  int Index(TableFileCreationReason reason) {
    switch (reason) {
      case TableFileCreationReason::kFlush:
        return 0;
      case TableFileCreationReason::kCompaction:
        return 1;
      default:
        return -1;
    }
  }

  // Asserts that the counters match the expected values, then zeroes them.
  // If an assertion fails the function returns early and the counters are
  // left untouched.
  void CheckAndResetCounters(int flush_started, int flush_finished,
                             int flush_failure, int compaction_started,
                             int compaction_finished, int compaction_failure) {
    ASSERT_EQ(started_[0], flush_started);
    ASSERT_EQ(finished_[0], flush_finished);
    ASSERT_EQ(failure_[0], flush_failure);
    ASSERT_EQ(started_[1], compaction_started);
    ASSERT_EQ(finished_[1], compaction_finished);
    ASSERT_EQ(failure_[1], compaction_failure);
    ResetCounters();
  }

  // Counts the "started" event and checks that the brief info is populated.
  void OnTableFileCreationStarted(
      const TableFileCreationBriefInfo& info) override {
    int idx = Index(info.reason);
    if (idx >= 0) {
      started_[idx]++;
    }
    ASSERT_GT(info.db_name.size(), 0U);
    ASSERT_GT(info.cf_name.size(), 0U);
    ASSERT_GT(info.file_path.size(), 0U);
    ASSERT_GT(info.job_id, 0);
  }

  // Counts the "finished" event. On success the table properties must be
  // non-trivial; on failure the per-reason failure counter is bumped instead.
  void OnTableFileCreated(const TableFileCreationInfo& info) override {
    int idx = Index(info.reason);
    if (idx >= 0) {
      finished_[idx]++;
    }
    ASSERT_GT(info.db_name.size(), 0U);
    ASSERT_GT(info.cf_name.size(), 0U);
    ASSERT_GT(info.file_path.size(), 0U);
    ASSERT_GT(info.job_id, 0);
    if (info.status.ok()) {
      ASSERT_GT(info.table_properties.data_size, 0U);
      ASSERT_GT(info.table_properties.raw_key_size, 0U);
      ASSERT_GT(info.table_properties.raw_value_size, 0U);
      ASSERT_GT(info.table_properties.num_data_blocks, 0U);
      ASSERT_GT(info.table_properties.num_entries, 0U);
    } else {
      if (idx >= 0) {
        failure_[idx]++;
      }
    }
  }

  TestEnv test_env;
  // Per-reason counters; slot 0 is flush, slot 1 is compaction.
  int started_[2];
  int finished_[2];
  int failure_[2];

 private:
  // Zeroes every counter slot.
  void ResetCounters() {
    for (int i = 0; i < 2; i++) {
      started_[i] = finished_[i] = failure_[i] = 0;
    }
  }
};
// Checks that the table file creation callbacks fire for flushes and
// compactions, both when the file is created successfully and when creation
// fails. The six arguments of CheckAndResetCounters() are, in order: flush
// started / finished / failed, then compaction started / finished / failed.
TEST_F(EventListenerTest, TableFileCreationListenersTest) {
auto listener = std::make_shared<TableFileCreationListener>();
Options options;
options.create_if_missing = true;
options.listeners.push_back(listener);
// Route file creation through the listener's TestEnv so that ".sst" creation
// can be made to fail on demand via SetStatus().
options.env = &listener->test_env;
DestroyAndReopen(options);
// Successful flush: one flush started, one finished, no failure.
ASSERT_OK(Put("foo", "aaa"));
ASSERT_OK(Put("bar", "bbb"));
ASSERT_OK(Flush());
dbfull()->TEST_WaitForFlushMemTable();
listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
// Failing flush: ".sst" creation returns NotSupported, so the flush fails but
// both the started and the finished callbacks are still expected, with the
// finished callback reporting a failure.
ASSERT_OK(Put("foo", "aaa1"));
ASSERT_OK(Put("bar", "bbb1"));
listener->test_env.SetStatus(Status::NotSupported("not supported"));
ASSERT_NOK(Flush());
listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
// Stop injecting failures and reopen the DB.
// NOTE(review): the reopen is presumably needed to clear the error state left
// by the failed flush - confirm.
listener->test_env.SetStatus(Status::OK());
Reopen(options);
ASSERT_OK(Put("foo", "aaa2"));
ASSERT_OK(Put("bar", "bbb2"));
ASSERT_OK(Flush());
dbfull()->TEST_WaitForFlushMemTable();
listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
// Successful manual compaction over ["a", "z"]: only compaction counters move.
const Slice kRangeStart = "a";
const Slice kRangeEnd = "z";
dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
dbfull()->TEST_WaitForCompact();
listener->CheckAndResetCounters(0, 0, 0, 1, 1, 0);
// One more successful flush, then a compaction whose output file creation is
// made to fail: expect one successful flush plus one failed compaction.
ASSERT_OK(Put("foo", "aaa3"));
ASSERT_OK(Put("bar", "bbb3"));
ASSERT_OK(Flush());
listener->test_env.SetStatus(Status::NotSupported("not supported"));
dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
dbfull()->TEST_WaitForCompact();
listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -297,7 +297,7 @@ class Repairer {
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
std::string() /* column_family_name */, {}, kMaxSequenceNumber, std::string() /* column_family_name */, {}, kMaxSequenceNumber,
kNoCompression, CompressionOptions(), false, kNoCompression, CompressionOptions(), false,
nullptr /* internal_stats */); nullptr /* internal_stats */, TableFileCreationReason::kRecovery);
} }
delete mem->Unref(); delete mem->Unref();
delete cf_mems_default; delete cf_mems_default;

@ -21,23 +21,36 @@ class DB;
class Status; class Status;
struct CompactionJobStats; struct CompactionJobStats;
struct TableFileCreationInfo { enum class TableFileCreationReason {
TableFileCreationInfo() = default; kFlush,
explicit TableFileCreationInfo(TableProperties&& prop) : kCompaction,
table_properties(prop) {} kRecovery,
};
struct TableFileCreationBriefInfo {
// the name of the database where the file was created // the name of the database where the file was created
std::string db_name; std::string db_name;
// the name of the column family where the file was created. // the name of the column family where the file was created.
std::string cf_name; std::string cf_name;
// the path to the created file. // the path to the created file.
std::string file_path; std::string file_path;
// the size of the file.
uint64_t file_size;
// the id of the job (which could be flush or compaction) that // the id of the job (which could be flush or compaction) that
// created the file. // created the file.
int job_id; int job_id;
// reason of creating the table.
TableFileCreationReason reason;
};
struct TableFileCreationInfo : public TableFileCreationBriefInfo {
TableFileCreationInfo() = default;
explicit TableFileCreationInfo(TableProperties&& prop)
: table_properties(prop) {}
// the size of the file.
uint64_t file_size;
// Detailed properties of the created file. // Detailed properties of the created file.
TableProperties table_properties; TableProperties table_properties;
// The status indicating whether the creation was successful or not.
Status status;
}; };
enum class CompactionReason { enum class CompactionReason {
@ -212,11 +225,25 @@ class EventListener {
// on file creations and deletions is suggested to implement // on file creations and deletions is suggested to implement
// OnFlushCompleted and OnCompactionCompleted. // OnFlushCompleted and OnCompactionCompleted.
// //
// Historically, it was only called if the file was successfully created.
// It is now also called when file creation fails. Users can check
// info.status to see whether the creation succeeded.
//
// Note that if applications would like to use the passed reference // Note that if applications would like to use the passed reference
// outside this function call, they should make copies from these // outside this function call, they should make copies from these
// returned value. // returned value.
virtual void OnTableFileCreated(const TableFileCreationInfo& /*info*/) {} virtual void OnTableFileCreated(const TableFileCreationInfo& /*info*/) {}
// A callback function for RocksDB which will be called before
// an SST file is created. It will be followed by a call to
// OnTableFileCreated after the creation finishes.
//
// Note that if applications would like to use the passed reference
// outside this function call, they should make copies of the
// passed value.
virtual void OnTableFileCreationStarted(
const TableFileCreationBriefInfo& /*info*/) {}
virtual ~EventListener() {} virtual ~EventListener() {}
}; };

Loading…
Cancel
Save