[API Change] Move listeners from ColumnFamilyOptions to DBOptions

Summary: Move listeners from ColumnFamilyOptions to DBOptions

Test Plan:
listener_test
compact_files_test

Reviewers: rven, anthony, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D39087
main
Yueh-Hsuan Chiang 10 years ago
parent 3ab8ffd4dd
commit 672dda9b3b
  1. 1
      HISTORY.md
  2. 45
      db/column_family.cc
  3. 7
      db/column_family.h
  4. 42
      db/db_impl.cc
  5. 12
      include/rocksdb/options.h
  6. 22
      util/options.cc

@ -2,6 +2,7 @@
## Public API changes ## Public API changes
* DB::GetDbIdentity() is now a const function. If this function is overridden in your application, be sure to also make GetDbIdentity() const to avoid compile error. * DB::GetDbIdentity() is now a const function. If this function is overridden in your application, be sure to also make GetDbIdentity() const to avoid compile error.
* Move listeners from ColumnFamilyOptions to DBOptions.
## 3.11.0 (5/19/2015) ## 3.11.0 (5/19/2015)

@ -634,51 +634,6 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
return false; return false;
} }
void ColumnFamilyData::NotifyOnCompactionCompleted(
DB* db, Compaction* c, const Status& status) {
#ifndef ROCKSDB_LITE
auto listeners = ioptions()->listeners;
assert(listeners.size() > 0U);
CompactionJobInfo info;
info.cf_name = c->column_family_data()->GetName();
info.status = status;
info.output_level = c->output_level();
for (size_t i = 0; i < c->num_input_levels(); ++i) {
for (const auto fmd : *c->inputs(i)) {
info.input_files.push_back(
TableFileName(options_.db_paths,
fmd->fd.GetNumber(),
fmd->fd.GetPathId()));
}
}
for (const auto newf : c->edit()->GetNewFiles()) {
info.output_files.push_back(
TableFileName(options_.db_paths,
newf.second.fd.GetNumber(),
newf.second.fd.GetPathId()));
}
for (auto listener : listeners) {
listener->OnCompactionCompleted(db, info);
}
#endif // ROCKSDB_LITE
}
void ColumnFamilyData::NotifyOnFlushCompleted(
DB* db, const std::string& file_path,
bool triggered_flush_slowdown,
bool triggered_flush_stop) {
#ifndef ROCKSDB_LITE
auto listeners = ioptions()->listeners;
for (auto listener : listeners) {
listener->OnFlushCompleted(
db, GetName(), file_path,
// Use path 0 as fulled memtables are first flushed into path 0.
triggered_flush_slowdown, triggered_flush_stop);
}
#endif // ROCKSDB_LITE
}
SuperVersion* ColumnFamilyData::InstallSuperVersion( SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, InstrumentedMutex* db_mutex) { SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
db_mutex->AssertHeld(); db_mutex->AssertHeld();

@ -292,13 +292,6 @@ class ColumnFamilyData {
void ResetThreadLocalSuperVersions(); void ResetThreadLocalSuperVersions();
void NotifyOnCompactionCompleted(DB* db, Compaction* c, const Status& status);
void NotifyOnFlushCompleted(
DB* db, const std::string& file_path,
bool triggered_flush_slowdown,
bool triggered_flush_stop);
// Protected by DB mutex // Protected by DB mutex
void set_pending_flush(bool value) { pending_flush_ = value; } void set_pending_flush(bool value) { pending_flush_ = value; }
void set_pending_compaction(bool value) { pending_compaction_ = value; } void set_pending_compaction(bool value) { pending_compaction_ = value; }

@ -1255,7 +1255,7 @@ void DBImpl::NotifyOnFlushCompleted(
ColumnFamilyData* cfd, uint64_t file_number, ColumnFamilyData* cfd, uint64_t file_number,
const MutableCFOptions& mutable_cf_options) { const MutableCFOptions& mutable_cf_options) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (cfd->ioptions()->listeners.size() == 0U) { if (db_options_.listeners.size() == 0U) {
return; return;
} }
mutex_.AssertHeld(); mutex_.AssertHeld();
@ -1271,11 +1271,17 @@ void DBImpl::NotifyOnFlushCompleted(
notifying_events_++; notifying_events_++;
// release lock while notifying events // release lock while notifying events
mutex_.Unlock(); mutex_.Unlock();
{
// TODO(yhchiang): make db_paths dynamic. // TODO(yhchiang): make db_paths dynamic.
cfd->NotifyOnFlushCompleted( auto file_path = MakeTableFileName(db_options_.db_paths[0].path,
this, MakeTableFileName(db_options_.db_paths[0].path, file_number), file_number);
triggered_flush_slowdown, for (auto listener : db_options_.listeners) {
triggered_flush_stop); listener->OnFlushCompleted(
this, cfd->GetName(), file_path,
// Use path 0 as fulled memtables are first flushed into path 0.
triggered_flush_slowdown, triggered_flush_stop);
}
}
mutex_.Lock(); mutex_.Lock();
notifying_events_--; notifying_events_--;
assert(notifying_events_ >= 0); assert(notifying_events_ >= 0);
@ -1540,7 +1546,7 @@ Status DBImpl::CompactFilesImpl(
void DBImpl::NotifyOnCompactionCompleted( void DBImpl::NotifyOnCompactionCompleted(
ColumnFamilyData* cfd, Compaction *c, const Status &st) { ColumnFamilyData* cfd, Compaction *c, const Status &st) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (cfd->ioptions()->listeners.size() == 0U) { if (db_options_.listeners.size() == 0U) {
return; return;
} }
mutex_.AssertHeld(); mutex_.AssertHeld();
@ -1550,7 +1556,29 @@ void DBImpl::NotifyOnCompactionCompleted(
notifying_events_++; notifying_events_++;
// release lock while notifying events // release lock while notifying events
mutex_.Unlock(); mutex_.Unlock();
cfd->NotifyOnCompactionCompleted(this, c, st); {
CompactionJobInfo info;
info.cf_name = cfd->GetName();
info.status = st;
info.output_level = c->output_level();
for (size_t i = 0; i < c->num_input_levels(); ++i) {
for (const auto fmd : *c->inputs(i)) {
info.input_files.push_back(
TableFileName(db_options_.db_paths,
fmd->fd.GetNumber(),
fmd->fd.GetPathId()));
}
}
for (const auto newf : c->edit()->GetNewFiles()) {
info.output_files.push_back(
TableFileName(db_options_.db_paths,
newf.second.fd.GetNumber(),
newf.second.fd.GetPathId()));
}
for (auto listener : db_options_.listeners) {
listener->OnCompactionCompleted(this, info);
}
}
mutex_.Lock(); mutex_.Lock();
notifying_events_--; notifying_events_--;
assert(notifying_events_ >= 0); assert(notifying_events_ >= 0);

@ -709,12 +709,6 @@ struct ColumnFamilyOptions {
// Default: false // Default: false
bool paranoid_file_checks; bool paranoid_file_checks;
#ifndef ROCKSDB_LITE
// A vector of EventListeners which call-back functions will be called
// when specific RocksDB event happens.
std::vector<std::shared_ptr<EventListener>> listeners;
#endif // ROCKSDB_LITE
// Create ColumnFamilyOptions with default values for all fields // Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions(); ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options // Create ColumnFamilyOptions from Options
@ -1009,6 +1003,12 @@ struct DBOptions {
// Default: 0, turned off // Default: 0, turned off
uint64_t wal_bytes_per_sync; uint64_t wal_bytes_per_sync;
#ifndef ROCKSDB_LITE
// A vector of EventListeners which call-back functions will be called
// when specific RocksDB event happens.
std::vector<std::shared_ptr<EventListener>> listeners;
#endif // ROCKSDB_LITE
// If true, then the status of the threads involved in this DB will // If true, then the status of the threads involved in this DB will
// be tracked and available via GetThreadList() API. // be tracked and available via GetThreadList() API.
// //

@ -129,13 +129,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
max_successive_merges(0), max_successive_merges(0),
min_partial_merge_operands(2), min_partial_merge_operands(2),
optimize_filters_for_hits(false), optimize_filters_for_hits(false),
paranoid_file_checks(false) paranoid_file_checks(false) {
#ifndef ROCKSDB_LITE
,
listeners() {
#else // ROCKSDB_LITE
{
#endif // ROCKSDB_LITE
assert(memtable_factory.get() != nullptr); assert(memtable_factory.get() != nullptr);
} }
@ -199,13 +193,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
max_successive_merges(options.max_successive_merges), max_successive_merges(options.max_successive_merges),
min_partial_merge_operands(options.min_partial_merge_operands), min_partial_merge_operands(options.min_partial_merge_operands),
optimize_filters_for_hits(options.optimize_filters_for_hits), optimize_filters_for_hits(options.optimize_filters_for_hits),
paranoid_file_checks(options.paranoid_file_checks) paranoid_file_checks(options.paranoid_file_checks) {
#ifndef ROCKSDB_LITE
,
listeners(options.listeners) {
#else // ROCKSDB_LITE
{
#endif // ROCKSDB_LITE
assert(memtable_factory.get() != nullptr); assert(memtable_factory.get() != nullptr);
if (max_bytes_for_level_multiplier_additional.size() < if (max_bytes_for_level_multiplier_additional.size() <
static_cast<unsigned int>(num_levels)) { static_cast<unsigned int>(num_levels)) {
@ -256,6 +244,9 @@ DBOptions::DBOptions()
use_adaptive_mutex(false), use_adaptive_mutex(false),
bytes_per_sync(0), bytes_per_sync(0),
wal_bytes_per_sync(0), wal_bytes_per_sync(0),
#ifndef ROCKSDB_LITE
listeners(),
#endif
enable_thread_tracking(false) { enable_thread_tracking(false) {
} }
@ -300,6 +291,9 @@ DBOptions::DBOptions(const Options& options)
use_adaptive_mutex(options.use_adaptive_mutex), use_adaptive_mutex(options.use_adaptive_mutex),
bytes_per_sync(options.bytes_per_sync), bytes_per_sync(options.bytes_per_sync),
wal_bytes_per_sync(options.wal_bytes_per_sync), wal_bytes_per_sync(options.wal_bytes_per_sync),
#ifndef ROCKSDB_LITE
listeners(options.listeners),
#endif
enable_thread_tracking(options.enable_thread_tracking) {} enable_thread_tracking(options.enable_thread_tracking) {}
static const char* const access_hints[] = { static const char* const access_hints[] = {

Loading…
Cancel
Save