From 3aee4fbd41bd05799feb9da988698ab21b6a5f32 Mon Sep 17 00:00:00 2001 From: mrambacher Date: Tue, 27 Jul 2021 07:46:09 -0700 Subject: [PATCH] Make EventListener into a Customizable Class (#8473) Summary: - Added Type/CreateFromString - Added ability to load EventListeners to DBOptions - Since EventListeners did not previously have a Name(), defaulted to "". If there is no name, the listener cannot be loaded from the ObjectRegistry. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8473 Reviewed By: zhichao-cao Differential Revision: D29901488 Pulled By: mrambacher fbshipit-source-id: 2d3a4aa6db1562ac03e7ad41b360e3521d486254 --- HISTORY.md | 4 +- db/db_test_util.h | 4 ++ db/event_helpers.cc | 11 +++ db_stress_tool/db_stress_listener.h | 4 ++ fuzz/db_fuzzer.cc | 23 ++++--- fuzz/db_map_fuzzer.cc | 12 ++-- include/rocksdb/listener.h | 12 +++- options/customizable.cc | 10 +-- options/customizable_test.cc | 35 +++++++++- options/db_options.cc | 63 ++++++++++++++++- options/options_test.cc | 68 +++++++++++++++++++ tools/db_bench_tool.cc | 3 + utilities/blob_db/blob_db_listener.h | 5 ++ .../lib/portability/toku_external_pthread.h | 5 +- 14 files changed, 236 insertions(+), 23 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 8cdb6d118..2b7ba7b1a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,7 +2,9 @@ ## Unreleased ### Bug Fixes * If the primary's CURRENT file is missing or inaccessible, the secondary instance should not hang repeatedly trying to switch to a new MANIFEST. It should instead return the error code encountered while accessing the file. - +### New Features +* Made the EventListener extend the Customizable class. +* EventListeners that have a non-empty Name() and that are registered with the ObjectRegistry can now be serialized to/from the OPTIONS file. ## 6.23.0 (2021-07-16) ### Behavior Changes * Obsolete keys in the bottommost level that were preserved for a snapshot will now be cleaned upon snapshot release in all cases. This form of compaction (snapshot release triggered compaction) previously had an artificial limitation that multiple tombstones needed to be present. diff --git a/db/db_test_util.h b/db/db_test_util.h index 0086ca821..f2f56e890 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -764,6 +764,8 @@ class SpecialEnv : public EnvWrapper { class OnFileDeletionListener : public EventListener { public: OnFileDeletionListener() : matched_count_(0), expected_file_name_("") {} + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "OnFileDeletionListener"; } void SetExpectedFileName(const std::string file_name) { expected_file_name_ = file_name; @@ -788,6 +790,8 @@ class OnFileDeletionListener : public EventListener { class FlushCounterListener : public EventListener { public: + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "FlushCounterListener"; } std::atomic count{0}; std::atomic expected_flush_reason{FlushReason::kOthers}; diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 6164dde29..25b239019 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -5,7 +5,18 @@ #include "db/event_helpers.h" +#include "rocksdb/convenience.h" +#include "rocksdb/listener.h" +#include "rocksdb/utilities/customizable_util.h" + namespace ROCKSDB_NAMESPACE { +#ifndef ROCKSDB_LITE +Status EventListener::CreateFromString(const ConfigOptions& config_options, + const std::string& id, + std::shared_ptr* result) { + return LoadSharedObject(config_options, id, nullptr, result); +} +#endif // ROCKSDB_LITE namespace { template diff --git a/db_stress_tool/db_stress_listener.h b/db_stress_tool/db_stress_listener.h index bda4ec131..6985a7742 100644 --- a/db_stress_tool/db_stress_listener.h +++ b/db_stress_tool/db_stress_listener.h @@ -21,7 +21,11 @@ class DbStressListener : public EventListener { db_paths_(db_paths), column_families_(column_families), num_pending_file_creations_(0) {} + #ifndef ROCKSDB_LITE + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "DBStressListener"; } + ~DbStressListener() override { assert(num_pending_file_creations_ == 0); } void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { assert(IsValidColumnFamilyName(info.cf_name)); diff --git a/fuzz/db_fuzzer.cc b/fuzz/db_fuzzer.cc index 4a4e1fe3f..3881cab26 100644 --- a/fuzz/db_fuzzer.cc +++ b/fuzz/db_fuzzer.cc @@ -28,7 +28,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { ROCKSDB_NAMESPACE::DB* db; ROCKSDB_NAMESPACE::Options options; options.create_if_missing = true; - ROCKSDB_NAMESPACE::Status status = ROCKSDB_NAMESPACE::DB::Open(options, db_path, &db); + ROCKSDB_NAMESPACE::Status status = + ROCKSDB_NAMESPACE::DB::Open(options, db_path, &db); if (!status.ok()) { return 0; } @@ -64,7 +65,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { break; } case kIterator: { - ROCKSDB_NAMESPACE::Iterator* it = db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions()); + ROCKSDB_NAMESPACE::Iterator* it = + db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { } delete it; @@ -92,8 +94,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { case kColumn: { ROCKSDB_NAMESPACE::ColumnFamilyHandle* cf; ROCKSDB_NAMESPACE::Status s; - s = db->CreateColumnFamily(ROCKSDB_NAMESPACE::ColumnFamilyOptions(), "new_cf", - &cf); + s = db->CreateColumnFamily(ROCKSDB_NAMESPACE::ColumnFamilyOptions(), + "new_cf", &cf); s = db->DestroyColumnFamilyHandle(cf); db->Close(); delete db; @@ -102,21 +104,24 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { std::vector column_families; // have to open default column family column_families.push_back(ROCKSDB_NAMESPACE::ColumnFamilyDescriptor( - ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, ROCKSDB_NAMESPACE::ColumnFamilyOptions())); + ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, + ROCKSDB_NAMESPACE::ColumnFamilyOptions())); // open the new one, too column_families.push_back(ROCKSDB_NAMESPACE::ColumnFamilyDescriptor( "new_cf", ROCKSDB_NAMESPACE::ColumnFamilyOptions())); std::vector handles; - s = ROCKSDB_NAMESPACE::DB::Open(ROCKSDB_NAMESPACE::DBOptions(), db_path, column_families, - &handles, &db); + s = ROCKSDB_NAMESPACE::DB::Open(ROCKSDB_NAMESPACE::DBOptions(), db_path, + column_families, &handles, &db); if (s.ok()) { std::string key1 = fuzzed_data.ConsumeRandomLengthString(); std::string val1 = fuzzed_data.ConsumeRandomLengthString(); std::string key2 = fuzzed_data.ConsumeRandomLengthString(); - s = db->Put(ROCKSDB_NAMESPACE::WriteOptions(), handles[1], key1, val1); + s = db->Put(ROCKSDB_NAMESPACE::WriteOptions(), handles[1], key1, + val1); std::string value; - s = db->Get(ROCKSDB_NAMESPACE::ReadOptions(), handles[1], key2, &value); + s = db->Get(ROCKSDB_NAMESPACE::ReadOptions(), handles[1], key2, + &value); s = db->DropColumnFamily(handles[1]); for (auto handle : handles) { s = db->DestroyColumnFamilyHandle(handle); diff --git a/fuzz/db_map_fuzzer.cc b/fuzz/db_map_fuzzer.cc index d91ebbf98..ed9df8f84 100644 --- a/fuzz/db_map_fuzzer.cc +++ b/fuzz/db_map_fuzzer.cc @@ -16,7 +16,8 @@ protobuf_mutator::libfuzzer::PostProcessorRegistration reg = { [](DBOperations* input, unsigned int /* seed */) { - const ROCKSDB_NAMESPACE::Comparator* comparator = ROCKSDB_NAMESPACE::BytewiseComparator(); + const ROCKSDB_NAMESPACE::Comparator* comparator = + ROCKSDB_NAMESPACE::BytewiseComparator(); auto ops = input->mutable_operations(); // Make sure begin <= end for DELETE_RANGE. for (DBOperation& op : *ops) { @@ -42,7 +43,8 @@ DEFINE_PROTO_FUZZER(DBOperations& input) { const std::string kDbPath = "/tmp/db_map_fuzzer_test"; auto fs = ROCKSDB_NAMESPACE::FileSystem::Default(); - if (fs->FileExists(kDbPath, ROCKSDB_NAMESPACE::IOOptions(), /*dbg=*/nullptr).ok()) { + if (fs->FileExists(kDbPath, ROCKSDB_NAMESPACE::IOOptions(), /*dbg=*/nullptr) + .ok()) { std::cerr << "db path " << kDbPath << " already exists" << std::endl; abort(); } @@ -56,7 +58,8 @@ DEFINE_PROTO_FUZZER(DBOperations& input) { for (const DBOperation& op : input.operations()) { switch (op.type()) { case OpType::PUT: { - CHECK_OK(db->Put(ROCKSDB_NAMESPACE::WriteOptions(), op.key(), op.value())); + CHECK_OK( + db->Put(ROCKSDB_NAMESPACE::WriteOptions(), op.key(), op.value())); kv[op.key()] = op.value(); break; } @@ -88,7 +91,8 @@ DEFINE_PROTO_FUZZER(DBOperations& input) { CHECK_OK(ROCKSDB_NAMESPACE::DB::Open(options, kDbPath, &db)); auto kv_it = kv.begin(); - ROCKSDB_NAMESPACE::Iterator* it = db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions()); + ROCKSDB_NAMESPACE::Iterator* it = + db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next(), kv_it++) { CHECK_TRUE(kv_it != kv.end()); CHECK_EQ(it->key().ToString(), kv_it->first); diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index dec3e4420..f59e395de 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -14,6 +14,7 @@ #include "rocksdb/compaction_job_stats.h" #include "rocksdb/compression_type.h" +#include "rocksdb/customizable.h" #include "rocksdb/status.h" #include "rocksdb/table_properties.h" #include "rocksdb/types.h" @@ -355,8 +356,17 @@ struct ExternalFileIngestionInfo { // the current thread holding any DB mutex. This is to prevent potential // deadlock and performance issue when using EventListener callback // in a complex way. -class EventListener { +class EventListener : public Customizable { public: + static const char* Type() { return "EventListener"; } + static Status CreateFromString(const ConfigOptions& options, + const std::string& id, + std::shared_ptr* result); + const char* Name() const override { + // Since EventListeners did not have a name previously, we will assume + // an empty name. Instances should override this method. + return ""; + } // A callback function to RocksDB which will be called whenever a // registered RocksDB flushes a file. The default implementation is // no-op. diff --git a/options/customizable.cc b/options/customizable.cc index 754eff75c..e9388cb57 100644 --- a/options/customizable.cc +++ b/options/customizable.cc @@ -41,14 +41,16 @@ std::string Customizable::SerializeOptions(const ConfigOptions& config_options, const std::string& prefix) const { std::string result; std::string parent; - if (!config_options.IsShallow()) { + std::string id = GetId(); + if (!config_options.IsShallow() && !id.empty()) { parent = Configurable::SerializeOptions(config_options, ""); } if (parent.empty()) { - result = GetId(); + result = id; } else { - result.append(prefix + ConfigurableHelper::kIdPropName + "=" + GetId() + - config_options.delimiter); + result.append(prefix); + result.append(ConfigurableHelper::kIdPropName).append("="); + result.append(id).append(config_options.delimiter); result.append(parent); } return result; diff --git a/options/customizable_test.cc b/options/customizable_test.cc index ee6ca4b85..814bb686d 100644 --- a/options/customizable_test.cc +++ b/options/customizable_test.cc @@ -14,7 +14,7 @@ #include #include -#include "options/configurable_helper.h" +#include "db/db_test_util.h" #include "options/options_helper.h" #include "options/options_parser.h" #include "rocksdb/convenience.h" @@ -922,6 +922,20 @@ static int RegisterTestObjects(ObjectLibrary& library, guard->reset(new mock::MockTableFactory()); return guard->get(); }); + library.Register( + OnFileDeletionListener::kClassName(), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new OnFileDeletionListener()); + return guard->get(); + }); + library.Register( + FlushCounterListener::kClassName(), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new FlushCounterListener()); + return guard->get(); + }); library.Register( test::SimpleSuffixReverseComparator::kClassName(), [](const std::string& /*uri*/, @@ -1123,6 +1137,25 @@ TEST_F(LoadCustomizableTest, LoadComparatorTest) { } #ifndef ROCKSDB_LITE +TEST_F(LoadCustomizableTest, LoadEventListenerTest) { + std::shared_ptr result; + + ASSERT_NOK(EventListener::CreateFromString( + config_options_, OnFileDeletionListener::kClassName(), &result)); + ASSERT_NOK(EventListener::CreateFromString( + config_options_, FlushCounterListener::kClassName(), &result)); + if (RegisterTests("Test")) { + ASSERT_OK(EventListener::CreateFromString( + config_options_, OnFileDeletionListener::kClassName(), &result)); + ASSERT_NE(result, nullptr); + ASSERT_STREQ(result->Name(), OnFileDeletionListener::kClassName()); + ASSERT_OK(EventListener::CreateFromString( + config_options_, FlushCounterListener::kClassName(), &result)); + ASSERT_NE(result, nullptr); + ASSERT_STREQ(result->Name(), FlushCounterListener::kClassName()); + } +} + TEST_F(LoadCustomizableTest, LoadEncryptionProviderTest) { std::shared_ptr result; ASSERT_NOK( diff --git a/options/db_options.cc b/options/db_options.cc index 9a364d57b..d716d3863 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -15,6 +15,7 @@ #include "rocksdb/configurable.h" #include "rocksdb/env.h" #include "rocksdb/file_system.h" +#include "rocksdb/listener.h" #include "rocksdb/rate_limiter.h" #include "rocksdb/sst_file_manager.h" #include "rocksdb/system_clock.h" @@ -141,7 +142,6 @@ static std::unordered_map std::shared_ptr rate_limiter; std::shared_ptr statistics; std::vector db_paths; - std::vector> listeners; FileTypeSet checksum_handoff_file_types; */ {"advise_random_on_open", @@ -446,6 +446,67 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, allow_data_in_errors), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + // Allow EventListeners that have a non-empty Name() to be read/written + // as options Each listener will either be + // - A simple name (e.g. "MyEventListener") + // - A name with properties (e.g. "{id=MyListener1; timeout=60}" + // Multiple listeners will be separated by a ":": + // - "MyListener0;{id=MyListener1; timeout=60} + {"listeners", + {offsetof(struct ImmutableDBOptions, listeners), OptionType::kVector, + OptionVerificationType::kByNameAllowNull, + OptionTypeFlags::kCompareNever, + [](const ConfigOptions& opts, const std::string& /*name*/, + const std::string& value, void* addr) { + ConfigOptions embedded = opts; + embedded.ignore_unsupported_options = true; + std::vector> listeners; + Status s; + for (size_t start = 0, end = 0; + s.ok() && start < value.size() && end != std::string::npos; + start = end + 1) { + std::string token; + s = OptionTypeInfo::NextToken(value, ':', start, &end, &token); + if (s.ok() && !token.empty()) { + std::shared_ptr listener; + s = EventListener::CreateFromString(embedded, token, &listener); + if (s.ok() && listener != nullptr) { + listeners.push_back(listener); + } + } + } + if (s.ok()) { // It worked + *(static_cast>*>( + addr)) = listeners; + } + return s; + }, + [](const ConfigOptions& opts, const std::string& /*name*/, + const void* addr, std::string* value) { + const auto listeners = + static_cast>*>( + addr); + ConfigOptions embedded = opts; + embedded.delimiter = ";"; + int printed = 0; + for (const auto& listener : *listeners) { + auto id = listener->GetId(); + if (!id.empty()) { + std::string elem_str = listener->ToString(embedded, ""); + if (printed++ == 0) { + value->append("{"); + } else { + value->append(":"); + } + value->append(elem_str); + } + } + if (printed > 0) { + value->append("}"); + } + return Status::OK(); + }, + nullptr}}, }; const std::string OptionsHelper::kDBOptionsName = "DBOptions"; diff --git a/options/options_test.cc b/options/options_test.cc index d3787c475..0d6570494 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -1864,6 +1864,74 @@ TEST_F(OptionsTest, ConvertOptionsTest) { leveldb_opt.block_restart_interval); ASSERT_EQ(table_opt->filter_policy.get(), leveldb_opt.filter_policy); } +#ifndef ROCKSDB_LITE +class TestEventListener : public EventListener { + private: + std::string id_; + + public: + explicit TestEventListener(const std::string& id) : id_("Test" + id) {} + const char* Name() const override { return id_.c_str(); } +}; + +static std::unordered_map + test_listener_option_info = { + {"s", + {0, OptionType::kString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + +}; + +class TestConfigEventListener : public TestEventListener { + private: + std::string s_; + + public: + explicit TestConfigEventListener(const std::string& id) + : TestEventListener("Config" + id) { + s_ = id; + RegisterOptions("Test", &s_, &test_listener_option_info); + } +}; + +static int RegisterTestEventListener(ObjectLibrary& library, + const std::string& arg) { + library.Register( + "Test" + arg, + [](const std::string& name, std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new TestEventListener(name.substr(4))); + return guard->get(); + }); + library.Register( + "TestConfig" + arg, + [](const std::string& name, std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new TestConfigEventListener(name.substr(10))); + return guard->get(); + }); + return 1; +} +TEST_F(OptionsTest, OptionsListenerTest) { + DBOptions orig, copy; + orig.listeners.push_back(std::make_shared("1")); + orig.listeners.push_back(std::make_shared("2")); + orig.listeners.push_back(std::make_shared("")); + orig.listeners.push_back(std::make_shared("1")); + orig.listeners.push_back(std::make_shared("2")); + orig.listeners.push_back(std::make_shared("")); + ConfigOptions config_opts(orig); + config_opts.registry->AddLibrary("listener", RegisterTestEventListener, "1"); + std::string opts_str; + ASSERT_OK(GetStringFromDBOptions(config_opts, orig, &opts_str)); + ASSERT_OK(GetDBOptionsFromString(config_opts, orig, opts_str, ©)); + ASSERT_OK(GetStringFromDBOptions(config_opts, copy, &opts_str)); + ASSERT_EQ( + copy.listeners.size(), + 2); // The Test{Config}1 Listeners could be loaded but not the others + ASSERT_OK(RocksDBOptionsParser::VerifyDBOptions(config_opts, orig, copy)); +} +#endif // ROCKSDB_LITE #ifndef ROCKSDB_LITE const static std::string kCustomEnvName = "Custom"; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 354453c23..19589e54f 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -2582,6 +2582,9 @@ class Benchmark { ~ErrorHandlerListener() override {} + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "ErrorHandlerListener"; } + void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/, Status /*bg_error*/, bool* auto_recovery) override { diff --git a/utilities/blob_db/blob_db_listener.h b/utilities/blob_db/blob_db_listener.h index c26d7bd27..d17d29853 100644 --- a/utilities/blob_db/blob_db_listener.h +++ b/utilities/blob_db/blob_db_listener.h @@ -37,6 +37,9 @@ class BlobDBListener : public EventListener { blob_db_impl_->UpdateLiveSSTSize(); } + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "BlobDBListener"; } + protected: BlobDBImpl* blob_db_impl_; }; @@ -46,6 +49,8 @@ class BlobDBListenerGC : public BlobDBListener { explicit BlobDBListenerGC(BlobDBImpl* blob_db_impl) : BlobDBListener(blob_db_impl) {} + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "BlobDBListenerGC"; } void OnFlushCompleted(DB* db, const FlushJobInfo& info) override { BlobDBListener::OnFlushCompleted(db, info); diff --git a/utilities/transactions/lock/range/range_tree/lib/portability/toku_external_pthread.h b/utilities/transactions/lock/range/range_tree/lib/portability/toku_external_pthread.h index 456d5a94a..eb8291c1d 100644 --- a/utilities/transactions/lock/range/range_tree/lib/portability/toku_external_pthread.h +++ b/utilities/transactions/lock/range/range_tree/lib/portability/toku_external_pthread.h @@ -1,6 +1,7 @@ /* - A wrapper around ROCKSDB_NAMESPACE::TransactionDBMutexFactory-provided condition and - mutex that provides toku_pthread_*-like interface. The functions are named + A wrapper around ROCKSDB_NAMESPACE::TransactionDBMutexFactory-provided + condition and mutex that provides toku_pthread_*-like interface. The functions + are named toku_external_{mutex|cond}_XXX