The ObjectRegistry class replaces the Registrar and NewCustomObjects.… (#5293)

Summary:
The ObjectRegistry class replaces the Registrar and NewCustomObjects.  Objects are registered with the registry by Type (the class must implement the static const char *Type() method).

This change is necessary for a few reasons:
- By having a class (rather than static template instances), the class can be passed between compilation units, meaning that objects could be registered and shared from a dynamic library with an executable.
- By having a class with instances, different units could have different objects registered.  This could be useful if, for example, one Option allowed for a dynamic library and one did not.

When combined with some other PRs (being able to load shared libraries, a Configurable interface to configure objects to/from string), this code will allow objects in external shared libraries to be added to a RocksDB image at run-time, rather than requiring every new extension to be built into the main library and called explicitly by every program.

Test plan (on riversand963's  devserver)
```
$COMPILE_WITH_ASAN=1 make -j32 all && sleep 1 && make check
```
All tests pass.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5293

Differential Revision: D16363396

Pulled By: riversand963

fbshipit-source-id: fbe4acb615bfc11103eef40a0b288845791c0180
main
Mark Rambacher 5 years ago committed by Facebook Github Bot
parent 092f417037
commit cfcf045acc
  1. 3
      CMakeLists.txt
  2. 1
      HISTORY.md
  3. 26
      TARGETS
  4. 15
      env/env.cc
  5. 4
      env/env_basic_test.cc
  6. 1
      include/rocksdb/comparator.h
  7. 5
      include/rocksdb/env.h
  8. 1
      include/rocksdb/merge_operator.h
  9. 2
      include/rocksdb/statistics.h
  10. 225
      include/rocksdb/utilities/object_registry.h
  11. 26
      options/options_helper.cc
  12. 33
      options/options_test.cc
  13. 1
      src.mk
  14. 2
      tools/block_cache_trace_analyzer.cc
  15. 17
      tools/db_bench_tool.cc
  16. 11
      tools/ldb_cmd.cc
  17. 87
      utilities/object_registry.cc
  18. 137
      utilities/object_registry_test.cc

@ -520,7 +520,7 @@ set(SOURCES
db/flush_job.cc db/flush_job.cc
db/flush_scheduler.cc db/flush_scheduler.cc
db/forward_iterator.cc db/forward_iterator.cc
db/import_column_family_job.cc db/import_column_family_job.cc
db/internal_stats.cc db/internal_stats.cc
db/logs_with_prep_tracker.cc db/logs_with_prep_tracker.cc
db/log_reader.cc db/log_reader.cc
@ -681,6 +681,7 @@ set(SOURCES
utilities/merge_operators/string_append/stringappend.cc utilities/merge_operators/string_append/stringappend.cc
utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/string_append/stringappend2.cc
utilities/merge_operators/uint64add.cc utilities/merge_operators/uint64add.cc
utilities/object_registry.cc
utilities/option_change_migration/option_change_migration.cc utilities/option_change_migration/option_change_migration.cc
utilities/options/options_util.cc utilities/options/options_util.cc
utilities/persistent_cache/block_cache_tier.cc utilities/persistent_cache/block_cache_tier.cc

@ -20,6 +20,7 @@
* Overload GetAllKeyVersions() to support non-default column family. * Overload GetAllKeyVersions() to support non-default column family.
* Added new APIs ExportColumnFamily() and CreateColumnFamilyWithImport() to support export and import of a Column Family. https://github.com/facebook/rocksdb/issues/3469 * Added new APIs ExportColumnFamily() and CreateColumnFamilyWithImport() to support export and import of a Column Family. https://github.com/facebook/rocksdb/issues/3469
* ldb sometimes uses a string-append merge operator if no merge operator is passed in. This is to allow users to print keys from a DB with a merge operator. * ldb sometimes uses a string-append merge operator if no merge operator is passed in. This is to allow users to print keys from a DB with a merge operator.
* Replaces old Registra with ObjectRegistry to allow user to create custom object from string, also add LoadEnv() to Env.
### New Features ### New Features
* Add an option `snap_refresh_nanos` (default to 0) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature. * Add an option `snap_refresh_nanos` (default to 0) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.

@ -276,6 +276,7 @@ cpp_library(
"utilities/merge_operators/string_append/stringappend.cc", "utilities/merge_operators/string_append/stringappend.cc",
"utilities/merge_operators/string_append/stringappend2.cc", "utilities/merge_operators/string_append/stringappend2.cc",
"utilities/merge_operators/uint64add.cc", "utilities/merge_operators/uint64add.cc",
"utilities/object_registry.cc",
"utilities/option_change_migration/option_change_migration.cc", "utilities/option_change_migration/option_change_migration.cc",
"utilities/options/options_util.cc", "utilities/options/options_util.cc",
"utilities/persistent_cache/block_cache_tier.cc", "utilities/persistent_cache/block_cache_tier.cc",
@ -371,11 +372,6 @@ ROCKS_TESTS = [
"logging/auto_roll_logger_test.cc", "logging/auto_roll_logger_test.cc",
"serial", "serial",
], ],
[
"env_logger_test",
"logging/env_logger_test.cc",
"serial",
],
[ [
"autovector_test", "autovector_test",
"util/autovector_test.cc", "util/autovector_test.cc",
@ -422,13 +418,13 @@ ROCKS_TESTS = [
"serial", "serial",
], ],
[ [
"cache_test", "cache_simulator_test",
"cache/cache_test.cc", "utilities/simulator_cache/cache_simulator_test.cc",
"serial", "serial",
], ],
[ [
"cache_simulator_test", "cache_test",
"utilities/simulator_cache/cache_simulator_test.cc", "cache/cache_test.cc",
"serial", "serial",
], ],
[ [
@ -554,7 +550,7 @@ ROCKS_TESTS = [
[ [
"db_bloom_filter_test", "db_bloom_filter_test",
"db/db_bloom_filter_test.cc", "db/db_bloom_filter_test.cc",
"parallel", "serial",
], ],
[ [
"db_compaction_filter_test", "db_compaction_filter_test",
@ -711,6 +707,11 @@ ROCKS_TESTS = [
"env/env_basic_test.cc", "env/env_basic_test.cc",
"serial", "serial",
], ],
[
"env_logger_test",
"logging/env_logger_test.cc",
"serial",
],
[ [
"env_test", "env_test",
"env/env_test.cc", "env/env_test.cc",
@ -796,6 +797,11 @@ ROCKS_TESTS = [
"monitoring/histogram_test.cc", "monitoring/histogram_test.cc",
"serial", "serial",
], ],
[
"import_column_family_test",
"db/import_column_family_test.cc",
"parallel",
],
[ [
"inlineskiplist_test", "inlineskiplist_test",
"memtable/inlineskiplist_test.cc", "memtable/inlineskiplist_test.cc",

15
env/env.cc vendored

@ -16,6 +16,7 @@
#include "port/port.h" #include "port/port.h"
#include "port/sys_time.h" #include "port/sys_time.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/utilities/object_registry.h"
#include "util/autovector.h" #include "util/autovector.h"
namespace rocksdb { namespace rocksdb {
@ -28,6 +29,20 @@ Status Env::NewLogger(const std::string& fname,
return NewEnvLogger(fname, this, result); return NewEnvLogger(fname, this, result);
} }
Status Env::LoadEnv(const std::string& value, Env** result) {
Env* env = *result;
Status s;
#ifndef ROCKSDB_LITE
s = ObjectRegistry::NewInstance()->NewStaticObject<Env>(value, &env);
#else
s = Status::NotSupported("Cannot load environment in LITE mode: ", value);
#endif
if (s.ok()) {
*result = env;
}
return s;
}
std::string Env::PriorityToString(Env::Priority priority) { std::string Env::PriorityToString(Env::Priority priority) {
switch (priority) { switch (priority) {
case Env::Priority::BOTTOM: case Env::Priority::BOTTOM:

@ -11,7 +11,6 @@
#include "env/mock_env.h" #include "env/mock_env.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/utilities/object_registry.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
namespace rocksdb { namespace rocksdb {
@ -104,13 +103,12 @@ namespace {
// ValuesIn() will skip running tests when given an empty collection. // ValuesIn() will skip running tests when given an empty collection.
std::vector<Env*> GetCustomEnvs() { std::vector<Env*> GetCustomEnvs() {
static Env* custom_env; static Env* custom_env;
static std::unique_ptr<Env> custom_env_guard;
static bool init = false; static bool init = false;
if (!init) { if (!init) {
init = true; init = true;
const char* uri = getenv("TEST_ENV_URI"); const char* uri = getenv("TEST_ENV_URI");
if (uri != nullptr) { if (uri != nullptr) {
custom_env = NewCustomObject<Env>(uri, &custom_env_guard); Env::LoadEnv(uri, &custom_env);
} }
} }

@ -35,6 +35,7 @@ class Comparator {
virtual ~Comparator() {} virtual ~Comparator() {}
static const char* Type() { return "Comparator"; }
// Three-way comparison. Returns value: // Three-way comparison. Returns value:
// < 0 iff "a" < "b", // < 0 iff "a" < "b",
// == 0 iff "a" == "b", // == 0 iff "a" == "b",

@ -144,6 +144,11 @@ class Env {
virtual ~Env(); virtual ~Env();
static const char* Type() { return "Environment"; }
// Loads the environment specified by the input value into the result
static Status LoadEnv(const std::string& value, Env** result);
// Return a default environment suitable for the current operating // Return a default environment suitable for the current operating
// system. Sophisticated users may wish to provide their own Env // system. Sophisticated users may wish to provide their own Env
// implementation instead of relying on this default environment. // implementation instead of relying on this default environment.

@ -46,6 +46,7 @@ class Logger;
class MergeOperator { class MergeOperator {
public: public:
virtual ~MergeOperator() {} virtual ~MergeOperator() {}
static const char* Type() { return "MergeOperator"; }
// Gives the client a way to express the read -> modify -> write semantics // Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation. // key: (IN) The key that's associated with this merge operation.

@ -480,7 +480,7 @@ enum StatsLevel : uint8_t {
class Statistics { class Statistics {
public: public:
virtual ~Statistics() {} virtual ~Statistics() {}
static const char* Type() { return "Statistics"; }
virtual uint64_t getTickerCount(uint32_t tickerType) const = 0; virtual uint64_t getTickerCount(uint32_t tickerType) const = 0;
virtual void histogramData(uint32_t type, virtual void histogramData(uint32_t type,
HistogramData* const data) const = 0; HistogramData* const data) const = 0;

@ -11,80 +11,195 @@
#include <memory> #include <memory>
#include <regex> #include <regex>
#include <string> #include <string>
#include <unordered_map>
#include <vector> #include <vector>
#include "rocksdb/status.h"
#include "rocksdb/env.h"
namespace rocksdb { namespace rocksdb {
class Logger;
// Creates a new T using the factory function that was registered with a pattern
// that matches the provided "target" string according to std::regex_match.
//
// If no registered functions match, returns nullptr. If multiple functions
// match, the factory function used is unspecified.
//
// Populates res_guard with result pointer if caller is granted ownership.
template <typename T>
T* NewCustomObject(const std::string& target, std::unique_ptr<T>* res_guard);
// Returns a new T when called with a string. Populates the std::unique_ptr // Returns a new T when called with a string. Populates the std::unique_ptr
// argument if granting ownership to caller. // argument if granting ownership to caller.
template <typename T> template <typename T>
using FactoryFunc = std::function<T*(const std::string&, std::unique_ptr<T>*)>; using FactoryFunc =
std::function<T*(const std::string&, std::unique_ptr<T>*, std::string*)>;
// To register a factory function for a type T, initialize a Registrar<T> object
// with static storage duration. For example: class ObjectLibrary {
//
// static Registrar<Env> hdfs_reg("hdfs://.*", &CreateHdfsEnv);
//
// Then, calling NewCustomObject<Env>("hdfs://some_path", ...) will match the
// regex provided above, so it returns the result of invoking CreateHdfsEnv.
template <typename T>
class Registrar {
public: public:
explicit Registrar(std::string pattern, FactoryFunc<T> factory); // Base class for an Entry in the Registry.
}; class Entry {
public:
virtual ~Entry() {}
Entry(const std::string& name) : name_(std::move(name)) {}
// Checks to see if the target matches this entry
virtual bool matches(const std::string& target) const {
return name_ == target;
}
const std::string& Name() const { return name_; }
private:
const std::string name_; // The name of the Entry
}; // End class Entry
// An Entry containing a FactoryFunc for creating new Objects
template <typename T>
class FactoryEntry : public Entry {
public:
FactoryEntry(const std::string& name, FactoryFunc<T> f)
: Entry(name), pattern_(std::move(name)), factory_(std::move(f)) {}
~FactoryEntry() override {}
bool matches(const std::string& target) const override {
return std::regex_match(target, pattern_);
}
// Creates a new T object.
T* NewFactoryObject(const std::string& target, std::unique_ptr<T>* guard,
std::string* msg) const {
return factory_(target, guard, msg);
}
// Implementation details follow. private:
std::regex pattern_; // The pattern for this entry
FactoryFunc<T> factory_;
}; // End class FactoryEntry
public:
// Finds the entry matching the input name and type
const Entry* FindEntry(const std::string& type,
const std::string& name) const;
void Dump(Logger* logger) const;
// Registers the factory with the library for the pattern.
// If the pattern matches, the factory may be used to create a new object.
template <typename T>
const FactoryFunc<T>& Register(const std::string& pattern,
const FactoryFunc<T>& factory) {
std::unique_ptr<Entry> entry(new FactoryEntry<T>(pattern, factory));
AddEntry(T::Type(), entry);
return factory;
}
// Returns the default ObjectLibrary
static std::shared_ptr<ObjectLibrary>& Default();
namespace internal { private:
// Adds the input entry to the list for the given type
void AddEntry(const std::string& type, std::unique_ptr<Entry>& entry);
template <typename T> // ** FactoryFunctions for this loader, organized by type
struct RegistryEntry { std::unordered_map<std::string, std::vector<std::unique_ptr<Entry>>> entries_;
std::regex pattern;
FactoryFunc<T> factory;
}; };
template <typename T> // The ObjectRegistry is used to register objects that can be created by a
struct Registry { // name/pattern at run-time where the specific implementation of the object may
static Registry* Get() { // not be known in advance.
static Registry<T> instance; class ObjectRegistry {
return &instance; public:
static std::shared_ptr<ObjectRegistry> NewInstance();
ObjectRegistry();
void AddLibrary(const std::shared_ptr<ObjectLibrary>& library) {
libraries_.emplace_back(library);
} }
std::vector<RegistryEntry<T>> entries;
private: // Creates a new T using the factory function that was registered with a
Registry() = default; // pattern that matches the provided "target" string according to
}; // std::regex_match.
//
// If no registered functions match, returns nullptr. If multiple functions
// match, the factory function used is unspecified.
//
// Populates res_guard with result pointer if caller is granted ownership.
template <typename T>
T* NewObject(const std::string& target, std::unique_ptr<T>* guard,
std::string* errmsg) {
guard->reset();
const auto* basic = FindEntry(T::Type(), target);
if (basic != nullptr) {
const auto* factory =
static_cast<const ObjectLibrary::FactoryEntry<T>*>(basic);
return factory->NewFactoryObject(target, guard, errmsg);
} else {
*errmsg = std::string("Could not load ") + T::Type();
return nullptr;
}
}
// Creates a new unique T using the input factory functions.
// Returns OK if a new unique T was successfully created
// Returns NotFound if the type/target could not be created
// Returns InvalidArgument if the factory return an unguarded object
// (meaning it cannot be managed by a unique ptr)
template <typename T>
Status NewUniqueObject(const std::string& target,
std::unique_ptr<T>* result) {
std::string errmsg;
T* ptr = NewObject(target, result, &errmsg);
if (ptr == nullptr) {
return Status::NotFound(errmsg, target);
} else if (*result) {
return Status::OK();
} else {
return Status::InvalidArgument(std::string("Cannot make a unique ") +
T::Type() + " from unguarded one ",
target);
}
}
} // namespace internal // Creates a new shared T using the input factory functions.
// Returns OK if a new shared T was successfully created
// Returns NotFound if the type/target could not be created
// Returns InvalidArgument if the factory return an unguarded object
// (meaning it cannot be managed by a shared ptr)
template <typename T>
Status NewSharedObject(const std::string& target,
std::shared_ptr<T>* result) {
std::string errmsg;
std::unique_ptr<T> guard;
T* ptr = NewObject(target, &guard, &errmsg);
if (ptr == nullptr) {
return Status::NotFound(errmsg, target);
} else if (guard) {
result->reset(guard.release());
return Status::OK();
} else {
return Status::InvalidArgument(std::string("Cannot make a shared ") +
T::Type() + " from unguarded one ",
target);
}
}
template <typename T> // Creates a new static T using the input factory functions.
T* NewCustomObject(const std::string& target, std::unique_ptr<T>* res_guard) { // Returns OK if a new static T was successfully created
res_guard->reset(); // Returns NotFound if the type/target could not be created
for (const auto& entry : internal::Registry<T>::Get()->entries) { // Returns InvalidArgument if the factory return a guarded object
if (std::regex_match(target, entry.pattern)) { // (meaning it is managed by a unique ptr)
return entry.factory(target, res_guard); template <typename T>
Status NewStaticObject(const std::string& target, T** result) {
std::string errmsg;
std::unique_ptr<T> guard;
T* ptr = NewObject(target, &guard, &errmsg);
if (ptr == nullptr) {
return Status::NotFound(errmsg, target);
} else if (guard.get()) {
return Status::InvalidArgument(std::string("Cannot make a static ") +
T::Type() + " from a guarded one ",
target);
} else {
*result = ptr;
return Status::OK();
} }
} }
return nullptr;
}
template <typename T> // Dump the contents of the registry to the logger
Registrar<T>::Registrar(std::string pattern, FactoryFunc<T> factory) { void Dump(Logger* logger) const;
internal::Registry<T>::Get()->entries.emplace_back(internal::RegistryEntry<T>{
std::regex(std::move(pattern)), std::move(factory)}); private:
} const ObjectLibrary::Entry* FindEntry(const std::string& type,
const std::string& name) const;
// The set of libraries to search for factories for this registry.
// The libraries are searched in reverse order (back to front) when
// searching for entries.
std::vector<std::shared_ptr<ObjectLibrary>> libraries_;
};
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -1045,21 +1045,21 @@ Status ParseColumnFamilyOption(const std::string& name,
} else { } else {
if (name == kNameComparator) { if (name == kNameComparator) {
// Try to get comparator from object registry first. // Try to get comparator from object registry first.
std::unique_ptr<const Comparator> comp_guard;
const Comparator* comp =
NewCustomObject<const Comparator>(value, &comp_guard);
// Only support static comparator for now. // Only support static comparator for now.
if (comp != nullptr && !comp_guard) { Status status = ObjectRegistry::NewInstance()->NewStaticObject(
new_options->comparator = comp; value, &new_options->comparator);
if (status.ok()) {
return status;
} }
} else if (name == kNameMergeOperator) { } else if (name == kNameMergeOperator) {
// Try to get merge operator from object registry first. // Try to get merge operator from object registry first.
std::unique_ptr<std::shared_ptr<MergeOperator>> mo_guard; std::shared_ptr<MergeOperator> mo;
std::shared_ptr<MergeOperator>* mo = Status status =
NewCustomObject<std::shared_ptr<MergeOperator>>(value, &mo_guard); ObjectRegistry::NewInstance()->NewSharedObject<MergeOperator>(
value, &new_options->merge_operator);
// Only support static comparator for now. // Only support static comparator for now.
if (mo != nullptr) { if (status.ok()) {
new_options->merge_operator = *mo; return status;
} }
} }
@ -1191,10 +1191,10 @@ Status ParseDBOption(const std::string& name,
NewGenericRateLimiter(static_cast<int64_t>(ParseUint64(value)))); NewGenericRateLimiter(static_cast<int64_t>(ParseUint64(value))));
} else if (name == kNameEnv) { } else if (name == kNameEnv) {
// Currently `Env` can be deserialized from object registry only. // Currently `Env` can be deserialized from object registry only.
std::unique_ptr<Env> env_guard; Env* env = new_options->env;
Env* env = NewCustomObject<Env>(value, &env_guard); Status status = Env::LoadEnv(value, &env);
// Only support static env for now. // Only support static env for now.
if (env != nullptr && !env_guard) { if (status.ok()) {
new_options->env = env; new_options->env = env;
} }
} else { } else {

@ -341,11 +341,11 @@ TEST_F(OptionsTest, GetColumnFamilyOptionsFromStringTest) {
// Comparator from object registry // Comparator from object registry
std::string kCompName = "reverse_comp"; std::string kCompName = "reverse_comp";
static Registrar<const Comparator> test_reg_a( ObjectLibrary::Default()->Register<const Comparator>(
kCompName, [](const std::string& /*name*/, kCompName,
std::unique_ptr<const Comparator>* /*comparator_guard*/) { [](const std::string& /*name*/,
return ReverseBytewiseComparator(); std::unique_ptr<const Comparator>* /*guard*/,
}); std::string* /* errmsg */) { return ReverseBytewiseComparator(); });
ASSERT_OK(GetColumnFamilyOptionsFromString( ASSERT_OK(GetColumnFamilyOptionsFromString(
base_cf_opt, "comparator=" + kCompName + ";", &new_cf_opt)); base_cf_opt, "comparator=" + kCompName + ";", &new_cf_opt));
@ -354,13 +354,12 @@ TEST_F(OptionsTest, GetColumnFamilyOptionsFromStringTest) {
// MergeOperator from object registry // MergeOperator from object registry
std::unique_ptr<BytesXOROperator> bxo(new BytesXOROperator()); std::unique_ptr<BytesXOROperator> bxo(new BytesXOROperator());
std::string kMoName = bxo->Name(); std::string kMoName = bxo->Name();
static Registrar<std::shared_ptr<MergeOperator>> test_reg_b( ObjectLibrary::Default()->Register<MergeOperator>(
kMoName, [](const std::string& /*name*/, kMoName,
std::unique_ptr<std::shared_ptr<MergeOperator>>* [](const std::string& /*name*/, std::unique_ptr<MergeOperator>* guard,
merge_operator_guard) { std::string* /* errmsg */) {
merge_operator_guard->reset( guard->reset(new BytesXOROperator());
new std::shared_ptr<MergeOperator>(new BytesXOROperator())); return guard->get();
return merge_operator_guard->get();
}); });
ASSERT_OK(GetColumnFamilyOptionsFromString( ASSERT_OK(GetColumnFamilyOptionsFromString(
@ -770,9 +769,10 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) {
explicit CustomEnv(Env* _target) : EnvWrapper(_target) {} explicit CustomEnv(Env* _target) : EnvWrapper(_target) {}
}; };
static Registrar<Env> test_reg_env( ObjectLibrary::Default()->Register<Env>(
kCustomEnvName, kCustomEnvName,
[](const std::string& /*name*/, std::unique_ptr<Env>* /*env_guard*/) { [](const std::string& /*name*/, std::unique_ptr<Env>* /*env_guard*/,
std::string* /* errmsg */) {
static CustomEnv env(Env::Default()); static CustomEnv env(Env::Default());
return &env; return &env;
}); });
@ -813,8 +813,9 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) {
ASSERT_EQ(new_options.create_if_missing, true); ASSERT_EQ(new_options.create_if_missing, true);
ASSERT_EQ(new_options.max_open_files, 1); ASSERT_EQ(new_options.max_open_files, 1);
ASSERT_TRUE(new_options.rate_limiter.get() != nullptr); ASSERT_TRUE(new_options.rate_limiter.get() != nullptr);
std::unique_ptr<Env> env_guard; Env* newEnv = new_options.env;
ASSERT_EQ(NewCustomObject<Env>(kCustomEnvName, &env_guard), new_options.env); ASSERT_OK(Env::LoadEnv(kCustomEnvName, &newEnv));
ASSERT_EQ(newEnv, new_options.env);
} }
TEST_F(OptionsTest, DBOptionsSerialization) { TEST_F(OptionsTest, DBOptionsSerialization) {

@ -195,6 +195,7 @@ LIB_SOURCES = \
utilities/merge_operators/string_append/stringappend2.cc \ utilities/merge_operators/string_append/stringappend2.cc \
utilities/merge_operators/uint64add.cc \ utilities/merge_operators/uint64add.cc \
utilities/merge_operators/bytesxor.cc \ utilities/merge_operators/bytesxor.cc \
utilities/object_registry.cc \
utilities/option_change_migration/option_change_migration.cc \ utilities/option_change_migration/option_change_migration.cc \
utilities/options/options_util.cc \ utilities/options/options_util.cc \
utilities/persistent_cache/block_cache_tier.cc \ utilities/persistent_cache/block_cache_tier.cc \

@ -1637,7 +1637,7 @@ void BlockCacheTraceAnalyzer::PrintAccessCountStats(bool user_access_only,
} }
fprintf(stdout, fprintf(stdout,
"Bottom %" PRIu32 " access count. Access count=%" PRIu64 "Bottom %" PRIu32 " access count. Access count=%" PRIu64
" nblocks=%" PRIu64 " %s\n", " nblocks=%" ROCKSDB_PRIszt " %s\n",
bottom_k, naccess_it->first, naccess_it->second.size(), bottom_k, naccess_it->first, naccess_it->second.size(),
statistics.c_str()); statistics.c_str());
} }

@ -3049,8 +3049,9 @@ class Benchmark {
std::shared_ptr<TimestampEmulator> timestamp_emulator_; std::shared_ptr<TimestampEmulator> timestamp_emulator_;
std::unique_ptr<port::Thread> secondary_update_thread_; std::unique_ptr<port::Thread> secondary_update_thread_;
std::atomic<int> secondary_update_stopped_{0}; std::atomic<int> secondary_update_stopped_{0};
#ifndef ROCKSDB_LITE
uint64_t secondary_db_updates_ = 0; uint64_t secondary_db_updates_ = 0;
#endif // ROCKSDB_LITE
struct ThreadArg { struct ThreadArg {
Benchmark* bm; Benchmark* bm;
SharedState* shared; SharedState* shared;
@ -6366,13 +6367,12 @@ int db_bench_tool(int argc, char** argv) {
exit(1); exit(1);
} }
if (!FLAGS_statistics_string.empty()) { if (!FLAGS_statistics_string.empty()) {
std::unique_ptr<Statistics> custom_stats_guard; Status s = ObjectRegistry::NewInstance()->NewSharedObject<Statistics>(
dbstats.reset(NewCustomObject<Statistics>(FLAGS_statistics_string, FLAGS_statistics_string, &dbstats);
&custom_stats_guard));
custom_stats_guard.release();
if (dbstats == nullptr) { if (dbstats == nullptr) {
fprintf(stderr, "No Statistics registered matching string: %s\n", fprintf(stderr,
FLAGS_statistics_string.c_str()); "No Statistics registered matching string: %s status=%s\n",
FLAGS_statistics_string.c_str(), s.ToString().c_str());
exit(1); exit(1);
} }
} }
@ -6400,12 +6400,11 @@ int db_bench_tool(int argc, char** argv) {
StringToCompressionType(FLAGS_compression_type.c_str()); StringToCompressionType(FLAGS_compression_type.c_str());
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
std::unique_ptr<Env> custom_env_guard;
if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) { if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n"); fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
exit(1); exit(1);
} else if (!FLAGS_env_uri.empty()) { } else if (!FLAGS_env_uri.empty()) {
FLAGS_env = NewCustomObject<Env>(FLAGS_env_uri, &custom_env_guard); Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env);
if (FLAGS_env == nullptr) { if (FLAGS_env == nullptr) {
fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str()); fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
exit(1); exit(1);

@ -20,7 +20,6 @@
#include "rocksdb/utilities/backupable_db.h" #include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/checkpoint.h" #include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/debug.h" #include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_util.h" #include "rocksdb/utilities/options_util.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "rocksdb/write_buffer_manager.h" #include "rocksdb/write_buffer_manager.h"
@ -2854,8 +2853,9 @@ void BackupCommand::DoCommand() {
return; return;
} }
printf("open db OK\n"); printf("open db OK\n");
std::unique_ptr<Env> custom_env_guard; Env* custom_env = nullptr;
Env* custom_env = NewCustomObject<Env>(backup_env_uri_, &custom_env_guard); Env::LoadEnv(backup_env_uri_, &custom_env);
BackupableDBOptions backup_options = BackupableDBOptions backup_options =
BackupableDBOptions(backup_dir_, custom_env); BackupableDBOptions(backup_dir_, custom_env);
backup_options.info_log = logger_.get(); backup_options.info_log = logger_.get();
@ -2889,8 +2889,9 @@ void RestoreCommand::Help(std::string& ret) {
} }
void RestoreCommand::DoCommand() { void RestoreCommand::DoCommand() {
std::unique_ptr<Env> custom_env_guard; Env* custom_env = nullptr;
Env* custom_env = NewCustomObject<Env>(backup_env_uri_, &custom_env_guard); Env::LoadEnv(backup_env_uri_, &custom_env);
std::unique_ptr<BackupEngineReadOnly> restore_engine; std::unique_ptr<BackupEngineReadOnly> restore_engine;
Status status; Status status;
{ {

@ -0,0 +1,87 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/utilities/object_registry.h"
#include "logging/logging.h"
#include "rocksdb/env.h"
namespace rocksdb {
#ifndef ROCKSDB_LITE
// Looks through the "type" factories for one that matches "name".
// If found, returns the pointer to the Entry matching this name.
// Otherwise, nullptr is returned
const ObjectLibrary::Entry *ObjectLibrary::FindEntry(
const std::string &type, const std::string &name) const {
auto entries = entries_.find(type);
if (entries != entries_.end()) {
for (const auto &entry : entries->second) {
if (entry->matches(name)) {
return entry.get();
}
}
}
return nullptr;
}
void ObjectLibrary::AddEntry(const std::string &type,
std::unique_ptr<Entry> &entry) {
auto &entries = entries_[type];
entries.emplace_back(std::move(entry));
}
void ObjectLibrary::Dump(Logger *logger) const {
for (const auto &iter : entries_) {
ROCKS_LOG_HEADER(logger, " Registered factories for type[%s] ",
iter.first.c_str());
bool printed_one = false;
for (const auto &e : iter.second) {
ROCKS_LOG_HEADER(logger, "%c %s", (printed_one) ? ',' : ':',
e->Name().c_str());
printed_one = true;
}
}
ROCKS_LOG_HEADER(logger, "\n");
}
// Returns the Default singleton instance of the ObjectLibrary
// This instance will contain most of the "standard" registered objects
std::shared_ptr<ObjectLibrary> &ObjectLibrary::Default() {
static std::shared_ptr<ObjectLibrary> instance =
std::make_shared<ObjectLibrary>();
return instance;
}
std::shared_ptr<ObjectRegistry> ObjectRegistry::NewInstance() {
std::shared_ptr<ObjectRegistry> instance = std::make_shared<ObjectRegistry>();
return instance;
}
ObjectRegistry::ObjectRegistry() {
libraries_.push_back(ObjectLibrary::Default());
}
// Searches (from back to front) the libraries looking for the
// an entry that matches this pattern.
// Returns the entry if it is found, and nullptr otherwise
const ObjectLibrary::Entry *ObjectRegistry::FindEntry(
const std::string &type, const std::string &name) const {
for (auto iter = libraries_.crbegin(); iter != libraries_.crend(); ++iter) {
const auto *entry = iter->get()->FindEntry(type, name);
if (entry != nullptr) {
return entry;
}
}
return nullptr;
}
void ObjectRegistry::Dump(Logger *logger) const {
for (auto iter = libraries_.crbegin(); iter != libraries_.crend(); ++iter) {
iter->get()->Dump(logger);
}
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

@ -17,44 +17,145 @@ class EnvRegistryTest : public testing::Test {
int EnvRegistryTest::num_a = 0; int EnvRegistryTest::num_a = 0;
int EnvRegistryTest::num_b = 0; int EnvRegistryTest::num_b = 0;
static FactoryFunc<Env> test_reg_a = ObjectLibrary::Default()->Register<Env>(
"a://.*",
[](const std::string& /*uri*/, std::unique_ptr<Env>* /*env_guard*/,
std::string* /* errmsg */) {
++EnvRegistryTest::num_a;
return Env::Default();
});
static Registrar<Env> test_reg_a("a://.*", static FactoryFunc<Env> test_reg_b = ObjectLibrary::Default()->Register<Env>(
[](const std::string& /*uri*/, "b://.*", [](const std::string& /*uri*/, std::unique_ptr<Env>* env_guard,
std::unique_ptr<Env>* /*env_guard*/) { std::string* /* errmsg */) {
++EnvRegistryTest::num_a; ++EnvRegistryTest::num_b;
return Env::Default(); // Env::Default() is a singleton so we can't grant ownership directly to
}); // the caller - we must wrap it first.
env_guard->reset(new EnvWrapper(Env::Default()));
static Registrar<Env> test_reg_b("b://.*", [](const std::string& /*uri*/, return env_guard->get();
std::unique_ptr<Env>* env_guard) { });
++EnvRegistryTest::num_b;
// Env::Default() is a singleton so we can't grant ownership directly to the
// caller - we must wrap it first.
env_guard->reset(new EnvWrapper(Env::Default()));
return env_guard->get();
});
TEST_F(EnvRegistryTest, Basics) { TEST_F(EnvRegistryTest, Basics) {
std::string msg;
std::unique_ptr<Env> env_guard; std::unique_ptr<Env> env_guard;
auto res = NewCustomObject<Env>("a://test", &env_guard); auto registry = ObjectRegistry::NewInstance();
auto res = registry->NewObject<Env>("a://test", &env_guard, &msg);
ASSERT_NE(res, nullptr); ASSERT_NE(res, nullptr);
ASSERT_EQ(env_guard, nullptr); ASSERT_EQ(env_guard, nullptr);
ASSERT_EQ(1, num_a); ASSERT_EQ(1, num_a);
ASSERT_EQ(0, num_b); ASSERT_EQ(0, num_b);
res = NewCustomObject<Env>("b://test", &env_guard); res = registry->NewObject<Env>("b://test", &env_guard, &msg);
ASSERT_NE(res, nullptr); ASSERT_NE(res, nullptr);
ASSERT_NE(env_guard, nullptr); ASSERT_NE(env_guard, nullptr);
ASSERT_EQ(1, num_a); ASSERT_EQ(1, num_a);
ASSERT_EQ(1, num_b); ASSERT_EQ(1, num_b);
res = NewCustomObject<Env>("c://test", &env_guard); res = registry->NewObject<Env>("c://test", &env_guard, &msg);
ASSERT_EQ(res, nullptr); ASSERT_EQ(res, nullptr);
ASSERT_EQ(env_guard, nullptr); ASSERT_EQ(env_guard, nullptr);
ASSERT_EQ(1, num_a); ASSERT_EQ(1, num_a);
ASSERT_EQ(1, num_b); ASSERT_EQ(1, num_b);
} }
TEST_F(EnvRegistryTest, LocalRegistry) {
std::string msg;
std::unique_ptr<Env> guard;
auto registry = ObjectRegistry::NewInstance();
std::shared_ptr<ObjectLibrary> library = std::make_shared<ObjectLibrary>();
registry->AddLibrary(library);
library->Register<Env>(
"test-local",
[](const std::string& /*uri*/, std::unique_ptr<Env>* /*guard */,
std::string* /* errmsg */) { return Env::Default(); });
ObjectLibrary::Default()->Register<Env>(
"test-global",
[](const std::string& /*uri*/, std::unique_ptr<Env>* /*guard */,
std::string* /* errmsg */) { return Env::Default(); });
ASSERT_EQ(
ObjectRegistry::NewInstance()->NewObject<Env>("test-local", &guard, &msg),
nullptr);
ASSERT_NE(
ObjectRegistry::NewInstance()->NewObject("test-global", &guard, &msg),
nullptr);
ASSERT_NE(registry->NewObject<Env>("test-local", &guard, &msg), nullptr);
ASSERT_NE(registry->NewObject<Env>("test-global", &guard, &msg), nullptr);
}
TEST_F(EnvRegistryTest, CheckShared) {
std::shared_ptr<Env> shared;
std::shared_ptr<ObjectRegistry> registry = ObjectRegistry::NewInstance();
std::shared_ptr<ObjectLibrary> library = std::make_shared<ObjectLibrary>();
registry->AddLibrary(library);
library->Register<Env>(
"unguarded",
[](const std::string& /*uri*/, std::unique_ptr<Env>* /*guard */,
std::string* /* errmsg */) { return Env::Default(); });
library->Register<Env>(
"guarded", [](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new EnvWrapper(Env::Default()));
return guard->get();
});
ASSERT_OK(registry->NewSharedObject<Env>("guarded", &shared));
ASSERT_NE(shared, nullptr);
shared.reset();
ASSERT_NOK(registry->NewSharedObject<Env>("unguarded", &shared));
ASSERT_EQ(shared, nullptr);
}
TEST_F(EnvRegistryTest, CheckStatic) {
Env* env = nullptr;
std::shared_ptr<ObjectRegistry> registry = ObjectRegistry::NewInstance();
std::shared_ptr<ObjectLibrary> library = std::make_shared<ObjectLibrary>();
registry->AddLibrary(library);
library->Register<Env>(
"unguarded",
[](const std::string& /*uri*/, std::unique_ptr<Env>* /*guard */,
std::string* /* errmsg */) { return Env::Default(); });
library->Register<Env>(
"guarded", [](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new EnvWrapper(Env::Default()));
return guard->get();
});
ASSERT_NOK(registry->NewStaticObject<Env>("guarded", &env));
ASSERT_EQ(env, nullptr);
env = nullptr;
ASSERT_OK(registry->NewStaticObject<Env>("unguarded", &env));
ASSERT_NE(env, nullptr);
}
TEST_F(EnvRegistryTest, CheckUnique) {
std::unique_ptr<Env> unique;
std::shared_ptr<ObjectRegistry> registry = ObjectRegistry::NewInstance();
std::shared_ptr<ObjectLibrary> library = std::make_shared<ObjectLibrary>();
registry->AddLibrary(library);
library->Register<Env>(
"unguarded",
[](const std::string& /*uri*/, std::unique_ptr<Env>* /*guard */,
std::string* /* errmsg */) { return Env::Default(); });
library->Register<Env>(
"guarded", [](const std::string& /*uri*/, std::unique_ptr<Env>* guard,
std::string* /* errmsg */) {
guard->reset(new EnvWrapper(Env::Default()));
return guard->get();
});
ASSERT_OK(registry->NewUniqueObject<Env>("guarded", &unique));
ASSERT_NE(unique, nullptr);
unique.reset();
ASSERT_NOK(registry->NewUniqueObject<Env>("unguarded", &unique));
ASSERT_EQ(unique, nullptr);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save