Make EventListener into a Customizable Class (#8473)

Summary:
- Added Type/CreateFromString
- Added ability to load EventListeners to DBOptions
- Since EventListeners did not previously have a Name(), defaulted to "".  If there is no name, the listener cannot be loaded from the ObjectRegistry.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8473

Reviewed By: zhichao-cao

Differential Revision: D29901488

Pulled By: mrambacher

fbshipit-source-id: 2d3a4aa6db1562ac03e7ad41b360e3521d486254
main
mrambacher 4 years ago committed by Facebook GitHub Bot
parent 9ddb55a8f6
commit 3aee4fbd41
  1. 4
      HISTORY.md
  2. 4
      db/db_test_util.h
  3. 11
      db/event_helpers.cc
  4. 4
      db_stress_tool/db_stress_listener.h
  5. 23
      fuzz/db_fuzzer.cc
  6. 12
      fuzz/db_map_fuzzer.cc
  7. 12
      include/rocksdb/listener.h
  8. 10
      options/customizable.cc
  9. 35
      options/customizable_test.cc
  10. 63
      options/db_options.cc
  11. 68
      options/options_test.cc
  12. 3
      tools/db_bench_tool.cc
  13. 5
      utilities/blob_db/blob_db_listener.h
  14. 5
      utilities/transactions/lock/range/range_tree/lib/portability/toku_external_pthread.h

@ -2,7 +2,9 @@
## Unreleased
### Bug Fixes
* If the primary's CURRENT file is missing or inaccessible, the secondary instance should not hang repeatedly trying to switch to a new MANIFEST. It should instead return the error code encountered while accessing the file.
### New Features
* Made the EventListener extend the Customizable class.
* EventListeners that have a non-empty Name() and that are registered with the ObjectRegistry can now be serialized to/from the OPTIONS file.
## 6.23.0 (2021-07-16)
### Behavior Changes
* Obsolete keys in the bottommost level that were preserved for a snapshot will now be cleaned upon snapshot release in all cases. This form of compaction (snapshot release triggered compaction) previously had an artificial limitation that multiple tombstones needed to be present.

@ -764,6 +764,8 @@ class SpecialEnv : public EnvWrapper {
class OnFileDeletionListener : public EventListener {
public:
OnFileDeletionListener() : matched_count_(0), expected_file_name_("") {}
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "OnFileDeletionListener"; }
void SetExpectedFileName(const std::string file_name) {
expected_file_name_ = file_name;
@ -788,6 +790,8 @@ class OnFileDeletionListener : public EventListener {
class FlushCounterListener : public EventListener {
public:
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "FlushCounterListener"; }
std::atomic<int> count{0};
std::atomic<FlushReason> expected_flush_reason{FlushReason::kOthers};

@ -5,7 +5,18 @@
#include "db/event_helpers.h"
#include "rocksdb/convenience.h"
#include "rocksdb/listener.h"
#include "rocksdb/utilities/customizable_util.h"
namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE
Status EventListener::CreateFromString(const ConfigOptions& config_options,
const std::string& id,
std::shared_ptr<EventListener>* result) {
return LoadSharedObject<EventListener>(config_options, id, nullptr, result);
}
#endif // ROCKSDB_LITE
namespace {
template <class T>

@ -21,7 +21,11 @@ class DbStressListener : public EventListener {
db_paths_(db_paths),
column_families_(column_families),
num_pending_file_creations_(0) {}
#ifndef ROCKSDB_LITE
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "DBStressListener"; }
~DbStressListener() override { assert(num_pending_file_creations_ == 0); }
void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
assert(IsValidColumnFamilyName(info.cf_name));

@ -28,7 +28,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
ROCKSDB_NAMESPACE::DB* db;
ROCKSDB_NAMESPACE::Options options;
options.create_if_missing = true;
ROCKSDB_NAMESPACE::Status status = ROCKSDB_NAMESPACE::DB::Open(options, db_path, &db);
ROCKSDB_NAMESPACE::Status status =
ROCKSDB_NAMESPACE::DB::Open(options, db_path, &db);
if (!status.ok()) {
return 0;
}
@ -64,7 +65,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
break;
}
case kIterator: {
ROCKSDB_NAMESPACE::Iterator* it = db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions());
ROCKSDB_NAMESPACE::Iterator* it =
db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
}
delete it;
@ -92,8 +94,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
case kColumn: {
ROCKSDB_NAMESPACE::ColumnFamilyHandle* cf;
ROCKSDB_NAMESPACE::Status s;
s = db->CreateColumnFamily(ROCKSDB_NAMESPACE::ColumnFamilyOptions(), "new_cf",
&cf);
s = db->CreateColumnFamily(ROCKSDB_NAMESPACE::ColumnFamilyOptions(),
"new_cf", &cf);
s = db->DestroyColumnFamilyHandle(cf);
db->Close();
delete db;
@ -102,21 +104,24 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
std::vector<ROCKSDB_NAMESPACE::ColumnFamilyDescriptor> column_families;
// have to open default column family
column_families.push_back(ROCKSDB_NAMESPACE::ColumnFamilyDescriptor(
ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, ROCKSDB_NAMESPACE::ColumnFamilyOptions()));
ROCKSDB_NAMESPACE::kDefaultColumnFamilyName,
ROCKSDB_NAMESPACE::ColumnFamilyOptions()));
// open the new one, too
column_families.push_back(ROCKSDB_NAMESPACE::ColumnFamilyDescriptor(
"new_cf", ROCKSDB_NAMESPACE::ColumnFamilyOptions()));
std::vector<ROCKSDB_NAMESPACE::ColumnFamilyHandle*> handles;
s = ROCKSDB_NAMESPACE::DB::Open(ROCKSDB_NAMESPACE::DBOptions(), db_path, column_families,
&handles, &db);
s = ROCKSDB_NAMESPACE::DB::Open(ROCKSDB_NAMESPACE::DBOptions(), db_path,
column_families, &handles, &db);
if (s.ok()) {
std::string key1 = fuzzed_data.ConsumeRandomLengthString();
std::string val1 = fuzzed_data.ConsumeRandomLengthString();
std::string key2 = fuzzed_data.ConsumeRandomLengthString();
s = db->Put(ROCKSDB_NAMESPACE::WriteOptions(), handles[1], key1, val1);
s = db->Put(ROCKSDB_NAMESPACE::WriteOptions(), handles[1], key1,
val1);
std::string value;
s = db->Get(ROCKSDB_NAMESPACE::ReadOptions(), handles[1], key2, &value);
s = db->Get(ROCKSDB_NAMESPACE::ReadOptions(), handles[1], key2,
&value);
s = db->DropColumnFamily(handles[1]);
for (auto handle : handles) {
s = db->DestroyColumnFamilyHandle(handle);

@ -16,7 +16,8 @@
protobuf_mutator::libfuzzer::PostProcessorRegistration<DBOperations> reg = {
[](DBOperations* input, unsigned int /* seed */) {
const ROCKSDB_NAMESPACE::Comparator* comparator = ROCKSDB_NAMESPACE::BytewiseComparator();
const ROCKSDB_NAMESPACE::Comparator* comparator =
ROCKSDB_NAMESPACE::BytewiseComparator();
auto ops = input->mutable_operations();
// Make sure begin <= end for DELETE_RANGE.
for (DBOperation& op : *ops) {
@ -42,7 +43,8 @@ DEFINE_PROTO_FUZZER(DBOperations& input) {
const std::string kDbPath = "/tmp/db_map_fuzzer_test";
auto fs = ROCKSDB_NAMESPACE::FileSystem::Default();
if (fs->FileExists(kDbPath, ROCKSDB_NAMESPACE::IOOptions(), /*dbg=*/nullptr).ok()) {
if (fs->FileExists(kDbPath, ROCKSDB_NAMESPACE::IOOptions(), /*dbg=*/nullptr)
.ok()) {
std::cerr << "db path " << kDbPath << " already exists" << std::endl;
abort();
}
@ -56,7 +58,8 @@ DEFINE_PROTO_FUZZER(DBOperations& input) {
for (const DBOperation& op : input.operations()) {
switch (op.type()) {
case OpType::PUT: {
CHECK_OK(db->Put(ROCKSDB_NAMESPACE::WriteOptions(), op.key(), op.value()));
CHECK_OK(
db->Put(ROCKSDB_NAMESPACE::WriteOptions(), op.key(), op.value()));
kv[op.key()] = op.value();
break;
}
@ -88,7 +91,8 @@ DEFINE_PROTO_FUZZER(DBOperations& input) {
CHECK_OK(ROCKSDB_NAMESPACE::DB::Open(options, kDbPath, &db));
auto kv_it = kv.begin();
ROCKSDB_NAMESPACE::Iterator* it = db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions());
ROCKSDB_NAMESPACE::Iterator* it =
db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next(), kv_it++) {
CHECK_TRUE(kv_it != kv.end());
CHECK_EQ(it->key().ToString(), kv_it->first);

@ -14,6 +14,7 @@
#include "rocksdb/compaction_job_stats.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/customizable.h"
#include "rocksdb/status.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
@ -355,8 +356,17 @@ struct ExternalFileIngestionInfo {
// the current thread holding any DB mutex. This is to prevent potential
// deadlock and performance issue when using EventListener callback
// in a complex way.
class EventListener {
class EventListener : public Customizable {
public:
static const char* Type() { return "EventListener"; }
static Status CreateFromString(const ConfigOptions& options,
const std::string& id,
std::shared_ptr<EventListener>* result);
const char* Name() const override {
// Since EventListeners did not have a name previously, we will assume
// an empty name. Instances should override this method.
return "";
}
// A callback function to RocksDB which will be called whenever a
// registered RocksDB flushes a file. The default implementation is
// no-op.

@ -41,14 +41,16 @@ std::string Customizable::SerializeOptions(const ConfigOptions& config_options,
const std::string& prefix) const {
std::string result;
std::string parent;
if (!config_options.IsShallow()) {
std::string id = GetId();
if (!config_options.IsShallow() && !id.empty()) {
parent = Configurable::SerializeOptions(config_options, "");
}
if (parent.empty()) {
result = GetId();
result = id;
} else {
result.append(prefix + ConfigurableHelper::kIdPropName + "=" + GetId() +
config_options.delimiter);
result.append(prefix);
result.append(ConfigurableHelper::kIdPropName).append("=");
result.append(id).append(config_options.delimiter);
result.append(parent);
}
return result;

@ -14,7 +14,7 @@
#include <cstring>
#include <unordered_map>
#include "options/configurable_helper.h"
#include "db/db_test_util.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "rocksdb/convenience.h"
@ -922,6 +922,20 @@ static int RegisterTestObjects(ObjectLibrary& library,
guard->reset(new mock::MockTableFactory());
return guard->get();
});
library.Register<EventListener>(
OnFileDeletionListener::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<EventListener>* guard,
std::string* /* errmsg */) {
guard->reset(new OnFileDeletionListener());
return guard->get();
});
library.Register<EventListener>(
FlushCounterListener::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<EventListener>* guard,
std::string* /* errmsg */) {
guard->reset(new FlushCounterListener());
return guard->get();
});
library.Register<const Comparator>(
test::SimpleSuffixReverseComparator::kClassName(),
[](const std::string& /*uri*/,
@ -1123,6 +1137,25 @@ TEST_F(LoadCustomizableTest, LoadComparatorTest) {
}
#ifndef ROCKSDB_LITE
TEST_F(LoadCustomizableTest, LoadEventListenerTest) {
std::shared_ptr<EventListener> result;
ASSERT_NOK(EventListener::CreateFromString(
config_options_, OnFileDeletionListener::kClassName(), &result));
ASSERT_NOK(EventListener::CreateFromString(
config_options_, FlushCounterListener::kClassName(), &result));
if (RegisterTests("Test")) {
ASSERT_OK(EventListener::CreateFromString(
config_options_, OnFileDeletionListener::kClassName(), &result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), OnFileDeletionListener::kClassName());
ASSERT_OK(EventListener::CreateFromString(
config_options_, FlushCounterListener::kClassName(), &result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), FlushCounterListener::kClassName());
}
}
TEST_F(LoadCustomizableTest, LoadEncryptionProviderTest) {
std::shared_ptr<EncryptionProvider> result;
ASSERT_NOK(

@ -15,6 +15,7 @@
#include "rocksdb/configurable.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/system_clock.h"
@ -141,7 +142,6 @@ static std::unordered_map<std::string, OptionTypeInfo>
std::shared_ptr<RateLimiter> rate_limiter;
std::shared_ptr<Statistics> statistics;
std::vector<DbPath> db_paths;
std::vector<std::shared_ptr<EventListener>> listeners;
FileTypeSet checksum_handoff_file_types;
*/
{"advise_random_on_open",
@ -446,6 +446,67 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct ImmutableDBOptions, allow_data_in_errors),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
// Allow EventListeners that have a non-empty Name() to be read/written
// as options Each listener will either be
// - A simple name (e.g. "MyEventListener")
// - A name with properties (e.g. "{id=MyListener1; timeout=60}"
// Multiple listeners will be separated by a ":":
// - "MyListener0;{id=MyListener1; timeout=60}
{"listeners",
{offsetof(struct ImmutableDBOptions, listeners), OptionType::kVector,
OptionVerificationType::kByNameAllowNull,
OptionTypeFlags::kCompareNever,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
ConfigOptions embedded = opts;
embedded.ignore_unsupported_options = true;
std::vector<std::shared_ptr<EventListener>> listeners;
Status s;
for (size_t start = 0, end = 0;
s.ok() && start < value.size() && end != std::string::npos;
start = end + 1) {
std::string token;
s = OptionTypeInfo::NextToken(value, ':', start, &end, &token);
if (s.ok() && !token.empty()) {
std::shared_ptr<EventListener> listener;
s = EventListener::CreateFromString(embedded, token, &listener);
if (s.ok() && listener != nullptr) {
listeners.push_back(listener);
}
}
}
if (s.ok()) { // It worked
*(static_cast<std::vector<std::shared_ptr<EventListener>>*>(
addr)) = listeners;
}
return s;
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto listeners =
static_cast<const std::vector<std::shared_ptr<EventListener>>*>(
addr);
ConfigOptions embedded = opts;
embedded.delimiter = ";";
int printed = 0;
for (const auto& listener : *listeners) {
auto id = listener->GetId();
if (!id.empty()) {
std::string elem_str = listener->ToString(embedded, "");
if (printed++ == 0) {
value->append("{");
} else {
value->append(":");
}
value->append(elem_str);
}
}
if (printed > 0) {
value->append("}");
}
return Status::OK();
},
nullptr}},
};
const std::string OptionsHelper::kDBOptionsName = "DBOptions";

@ -1864,6 +1864,74 @@ TEST_F(OptionsTest, ConvertOptionsTest) {
leveldb_opt.block_restart_interval);
ASSERT_EQ(table_opt->filter_policy.get(), leveldb_opt.filter_policy);
}
#ifndef ROCKSDB_LITE
class TestEventListener : public EventListener {
private:
std::string id_;
public:
explicit TestEventListener(const std::string& id) : id_("Test" + id) {}
const char* Name() const override { return id_.c_str(); }
};
static std::unordered_map<std::string, OptionTypeInfo>
test_listener_option_info = {
{"s",
{0, OptionType::kString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
class TestConfigEventListener : public TestEventListener {
private:
std::string s_;
public:
explicit TestConfigEventListener(const std::string& id)
: TestEventListener("Config" + id) {
s_ = id;
RegisterOptions("Test", &s_, &test_listener_option_info);
}
};
static int RegisterTestEventListener(ObjectLibrary& library,
const std::string& arg) {
library.Register<EventListener>(
"Test" + arg,
[](const std::string& name, std::unique_ptr<EventListener>* guard,
std::string* /* errmsg */) {
guard->reset(new TestEventListener(name.substr(4)));
return guard->get();
});
library.Register<EventListener>(
"TestConfig" + arg,
[](const std::string& name, std::unique_ptr<EventListener>* guard,
std::string* /* errmsg */) {
guard->reset(new TestConfigEventListener(name.substr(10)));
return guard->get();
});
return 1;
}
TEST_F(OptionsTest, OptionsListenerTest) {
DBOptions orig, copy;
orig.listeners.push_back(std::make_shared<TestEventListener>("1"));
orig.listeners.push_back(std::make_shared<TestEventListener>("2"));
orig.listeners.push_back(std::make_shared<TestEventListener>(""));
orig.listeners.push_back(std::make_shared<TestConfigEventListener>("1"));
orig.listeners.push_back(std::make_shared<TestConfigEventListener>("2"));
orig.listeners.push_back(std::make_shared<TestConfigEventListener>(""));
ConfigOptions config_opts(orig);
config_opts.registry->AddLibrary("listener", RegisterTestEventListener, "1");
std::string opts_str;
ASSERT_OK(GetStringFromDBOptions(config_opts, orig, &opts_str));
ASSERT_OK(GetDBOptionsFromString(config_opts, orig, opts_str, &copy));
ASSERT_OK(GetStringFromDBOptions(config_opts, copy, &opts_str));
ASSERT_EQ(
copy.listeners.size(),
2); // The Test{Config}1 Listeners could be loaded but not the others
ASSERT_OK(RocksDBOptionsParser::VerifyDBOptions(config_opts, orig, copy));
}
#endif // ROCKSDB_LITE
#ifndef ROCKSDB_LITE
const static std::string kCustomEnvName = "Custom";

@ -2582,6 +2582,9 @@ class Benchmark {
~ErrorHandlerListener() override {}
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "ErrorHandlerListener"; }
void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
Status /*bg_error*/,
bool* auto_recovery) override {

@ -37,6 +37,9 @@ class BlobDBListener : public EventListener {
blob_db_impl_->UpdateLiveSSTSize();
}
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "BlobDBListener"; }
protected:
BlobDBImpl* blob_db_impl_;
};
@ -46,6 +49,8 @@ class BlobDBListenerGC : public BlobDBListener {
explicit BlobDBListenerGC(BlobDBImpl* blob_db_impl)
: BlobDBListener(blob_db_impl) {}
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "BlobDBListenerGC"; }
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
BlobDBListener::OnFlushCompleted(db, info);

@ -1,6 +1,7 @@
/*
A wrapper around ROCKSDB_NAMESPACE::TransactionDBMutexFactory-provided condition and
mutex that provides toku_pthread_*-like interface. The functions are named
A wrapper around ROCKSDB_NAMESPACE::TransactionDBMutexFactory-provided
condition and mutex that provides toku_pthread_*-like interface. The functions
are named
toku_external_{mutex|cond}_XXX

Loading…
Cancel
Save