Make FlushBlockPolicyFactory into a Customizable class (#8432)

Summary:
Add ability to treat FlushBlockPolicyFactory as a Customizable.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8432

Reviewed By: zhichao-cao

Differential Revision: D29558941

Pulled By: mrambacher

fbshipit-source-id: 4a791af941ea4a65fc2f1fdfb1d7a95f42ca6774
main
mrambacher 3 years ago committed by Facebook GitHub Bot
parent 5afd1e309c
commit c8665611bc
  1. 6
      include/rocksdb/configurable.h
  2. 20
      include/rocksdb/flush_block_policy.h
  3. 1
      include/rocksdb/utilities/options_type.h
  4. 4
      options/configurable.cc
  5. 92
      options/customizable_test.cc
  6. 7
      options/options_helper.cc
  7. 7
      table/block_based/block_based_table_factory.cc
  8. 61
      table/block_based/flush_block_policy.cc
  9. 5
      table/block_based/flush_block_policy.h

@ -56,7 +56,7 @@ class Configurable {
}; };
public: public:
Configurable() : prepared_(false) {} Configurable();
virtual ~Configurable() {} virtual ~Configurable() {}
// Returns the raw pointer of the named options that is used by this // Returns the raw pointer of the named options that is used by this
@ -265,12 +265,12 @@ class Configurable {
// Returns true if this object has been initialized via PrepareOptions, false // Returns true if this object has been initialized via PrepareOptions, false
// otherwise. Once an object has been prepared, only mutable options may be // otherwise. Once an object has been prepared, only mutable options may be
// changed. // changed.
virtual bool IsPrepared() const { return prepared_; } virtual bool IsPrepared() const { return is_prepared_; }
protected: protected:
// True once the object is prepared. Once the object is prepared, only // True once the object is prepared. Once the object is prepared, only
// mutable options can be configured. // mutable options can be configured.
std::atomic<bool> prepared_; std::atomic<bool> is_prepared_;
// Returns the raw pointer for the associated named option. // Returns the raw pointer for the associated named option.
// The name is typically the name of an option registered via the // The name is typically the name of an option registered via the

@ -6,12 +6,15 @@
#pragma once #pragma once
#include <string> #include <string>
#include "rocksdb/customizable.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class Slice; class Slice;
class BlockBuilder; class BlockBuilder;
struct ConfigOptions;
struct Options; struct Options;
// FlushBlockPolicy provides a configurable way to determine when to flush a // FlushBlockPolicy provides a configurable way to determine when to flush a
@ -25,10 +28,16 @@ class FlushBlockPolicy {
virtual ~FlushBlockPolicy() {} virtual ~FlushBlockPolicy() {}
}; };
class FlushBlockPolicyFactory { class FlushBlockPolicyFactory : public Customizable {
public: public:
// Return the name of the flush block policy. static const char* Type() { return "FlushBlockPolicyFactory"; }
virtual const char* Name() const = 0;
// Creates a FlushBlockPolicyFactory based on the input value.
// By default, this method can create EveryKey or BySize PolicyFactory,
// which take now config_options.
static Status CreateFromString(
const ConfigOptions& config_options, const std::string& value,
std::shared_ptr<FlushBlockPolicyFactory>* result);
// Return a new block flush policy that flushes data blocks by data size. // Return a new block flush policy that flushes data blocks by data size.
// FlushBlockPolicy may need to access the metadata of the data block // FlushBlockPolicy may need to access the metadata of the data block
@ -45,9 +54,10 @@ class FlushBlockPolicyFactory {
class FlushBlockBySizePolicyFactory : public FlushBlockPolicyFactory { class FlushBlockBySizePolicyFactory : public FlushBlockPolicyFactory {
public: public:
FlushBlockBySizePolicyFactory() {} FlushBlockBySizePolicyFactory();
const char* Name() const override { return "FlushBlockBySizePolicyFactory"; } static const char* kClassName() { return "FlushBlockBySizePolicyFactory"; }
const char* Name() const override { return kClassName(); }
FlushBlockPolicy* NewFlushBlockPolicy( FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& table_options, const BlockBasedTableOptions& table_options,

@ -41,7 +41,6 @@ enum class OptionType {
kMergeOperator, kMergeOperator,
kMemTableRepFactory, kMemTableRepFactory,
kFilterPolicy, kFilterPolicy,
kFlushBlockPolicyFactory,
kChecksumType, kChecksumType,
kEncodingType, kEncodingType,
kEnv, kEnv,

@ -17,6 +17,8 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
Configurable::Configurable() : is_prepared_(false) {}
void Configurable::RegisterOptions( void Configurable::RegisterOptions(
const std::string& name, void* opt_ptr, const std::string& name, void* opt_ptr,
const std::unordered_map<std::string, OptionTypeInfo>* type_map) { const std::unordered_map<std::string, OptionTypeInfo>* type_map) {
@ -64,7 +66,7 @@ Status Configurable::PrepareOptions(const ConfigOptions& opts) {
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
if (status.ok()) { if (status.ok()) {
prepared_ = true; is_prepared_ = true;
} }
} }
return status; return status;

@ -18,10 +18,12 @@
#include "options/options_helper.h" #include "options/options_helper.h"
#include "options/options_parser.h" #include "options/options_parser.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/secondary_cache.h" #include "rocksdb/secondary_cache.h"
#include "rocksdb/utilities/customizable_util.h" #include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/object_registry.h" #include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h" #include "rocksdb/utilities/options_type.h"
#include "table/block_based/flush_block_policy.h"
#include "table/mock_table.h" #include "table/mock_table.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
@ -56,7 +58,6 @@ class TestCustomizable : public Customizable {
// Method to allow CheckedCast to work for this class // Method to allow CheckedCast to work for this class
static const char* kClassName() { static const char* kClassName() {
return "TestCustomizable"; return "TestCustomizable";
;
} }
const char* Name() const override { return name_.c_str(); } const char* Name() const override { return name_.c_str(); }
@ -873,8 +874,8 @@ TEST_F(CustomizableTest, MutableOptionsTest) {
class TestSecondaryCache : public SecondaryCache { class TestSecondaryCache : public SecondaryCache {
public: public:
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "Test"; } static const char* kClassName() { return "Test"; }
const char* Name() const override { return kClassName(); }
Status Insert(const Slice& /*key*/, void* /*value*/, Status Insert(const Slice& /*key*/, void* /*value*/,
const Cache::CacheItemHelper* /*helper*/) override { const Cache::CacheItemHelper* /*helper*/) override {
return Status::NotSupported(); return Status::NotSupported();
@ -916,10 +917,33 @@ static int RegisterTestObjects(ObjectLibrary& library,
return static_cast<int>(library.GetFactoryCount(&num_types)); return static_cast<int>(library.GetFactoryCount(&num_types));
} }
class TestFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
public:
TestFlushBlockPolicyFactory() {}
static const char* kClassName() { return "TestFlushBlockPolicyFactory"; }
const char* Name() const override { return kClassName(); }
FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& /*table_options*/,
const BlockBuilder& /*data_block_builder*/) const override {
return nullptr;
}
};
static int RegisterLocalObjects(ObjectLibrary& library, static int RegisterLocalObjects(ObjectLibrary& library,
const std::string& /*arg*/) { const std::string& /*arg*/) {
size_t num_types; size_t num_types;
// Load any locally defined objects here // Load any locally defined objects here
library.Register<FlushBlockPolicyFactory>(
TestFlushBlockPolicyFactory::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<FlushBlockPolicyFactory>* guard,
std::string* /* errmsg */) {
guard->reset(new TestFlushBlockPolicyFactory());
return guard->get();
});
library.Register<SecondaryCache>( library.Register<SecondaryCache>(
TestSecondaryCache::kClassName(), TestSecondaryCache::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<SecondaryCache>* guard, [](const std::string& /*uri*/, std::unique_ptr<SecondaryCache>* guard,
@ -954,6 +978,7 @@ class LoadCustomizableTest : public testing::Test {
}; };
TEST_F(LoadCustomizableTest, LoadTableFactoryTest) { TEST_F(LoadCustomizableTest, LoadTableFactoryTest) {
ColumnFamilyOptions cf_opts;
std::shared_ptr<TableFactory> factory; std::shared_ptr<TableFactory> factory;
ASSERT_NOK( ASSERT_NOK(
TableFactory::CreateFromString(config_options_, "MockTable", &factory)); TableFactory::CreateFromString(config_options_, "MockTable", &factory));
@ -961,12 +986,27 @@ TEST_F(LoadCustomizableTest, LoadTableFactoryTest) {
config_options_, TableFactory::kBlockBasedTableName(), &factory)); config_options_, TableFactory::kBlockBasedTableName(), &factory));
ASSERT_NE(factory, nullptr); ASSERT_NE(factory, nullptr);
ASSERT_STREQ(factory->Name(), TableFactory::kBlockBasedTableName()); ASSERT_STREQ(factory->Name(), TableFactory::kBlockBasedTableName());
#ifndef ROCKSDB_LITE
std::string opts_str = "table_factory=";
ASSERT_OK(GetColumnFamilyOptionsFromString(
config_options_, ColumnFamilyOptions(),
opts_str + TableFactory::kBlockBasedTableName(), &cf_opts));
ASSERT_NE(cf_opts.table_factory.get(), nullptr);
ASSERT_STREQ(cf_opts.table_factory->Name(),
TableFactory::kBlockBasedTableName());
#endif // ROCKSDB_LITE
if (RegisterTests("Test")) { if (RegisterTests("Test")) {
ASSERT_OK( ASSERT_OK(
TableFactory::CreateFromString(config_options_, "MockTable", &factory)); TableFactory::CreateFromString(config_options_, "MockTable", &factory));
ASSERT_NE(factory, nullptr); ASSERT_NE(factory, nullptr);
ASSERT_STREQ(factory->Name(), "MockTable"); ASSERT_STREQ(factory->Name(), "MockTable");
#ifndef ROCKSDB_LITE
ASSERT_OK(
GetColumnFamilyOptionsFromString(config_options_, ColumnFamilyOptions(),
opts_str + "MockTable", &cf_opts));
ASSERT_NE(cf_opts.table_factory.get(), nullptr);
ASSERT_STREQ(cf_opts.table_factory->Name(), "MockTable");
#endif // ROCKSDB_LITE
} }
} }
@ -1007,6 +1047,52 @@ TEST_F(LoadCustomizableTest, LoadComparatorTest) {
} }
} }
TEST_F(LoadCustomizableTest, LoadFlushBlockPolicyFactoryTest) {
std::shared_ptr<TableFactory> table;
std::shared_ptr<FlushBlockPolicyFactory> result;
ASSERT_NOK(FlushBlockPolicyFactory::CreateFromString(
config_options_, "TestFlushBlockPolicyFactory", &result));
ASSERT_OK(
FlushBlockPolicyFactory::CreateFromString(config_options_, "", &result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), FlushBlockBySizePolicyFactory::kClassName());
ASSERT_OK(FlushBlockPolicyFactory::CreateFromString(
config_options_, FlushBlockEveryKeyPolicyFactory::kClassName(), &result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), FlushBlockEveryKeyPolicyFactory::kClassName());
ASSERT_OK(FlushBlockPolicyFactory::CreateFromString(
config_options_, FlushBlockBySizePolicyFactory::kClassName(), &result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), FlushBlockBySizePolicyFactory::kClassName());
#ifndef ROCKSDB_LITE
std::string table_opts = "id=BlockBasedTable; flush_block_policy_factory=";
ASSERT_OK(TableFactory::CreateFromString(
config_options_,
table_opts + FlushBlockEveryKeyPolicyFactory::kClassName(), &table));
auto bbto = table->GetOptions<BlockBasedTableOptions>();
ASSERT_NE(bbto, nullptr);
ASSERT_NE(bbto->flush_block_policy_factory.get(), nullptr);
ASSERT_STREQ(bbto->flush_block_policy_factory->Name(),
FlushBlockEveryKeyPolicyFactory::kClassName());
if (RegisterTests("Test")) {
ASSERT_OK(FlushBlockPolicyFactory::CreateFromString(
config_options_, "TestFlushBlockPolicyFactory", &result));
ASSERT_NE(result, nullptr);
ASSERT_STREQ(result->Name(), "TestFlushBlockPolicyFactory");
ASSERT_OK(TableFactory::CreateFromString(
config_options_, table_opts + "TestFlushBlockPolicyFactory", &table));
bbto = table->GetOptions<BlockBasedTableOptions>();
ASSERT_NE(bbto, nullptr);
ASSERT_NE(bbto->flush_block_policy_factory.get(), nullptr);
ASSERT_STREQ(bbto->flush_block_policy_factory->Name(),
"TestFlushBlockPolicyFactory");
}
#endif // ROCKSDB_LITE
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);

@ -598,13 +598,6 @@ bool SerializeSingleOptionHelper(const void* opt_address,
return SerializeEnum<ChecksumType>( return SerializeEnum<ChecksumType>(
checksum_type_string_map, checksum_type_string_map,
*static_cast<const ChecksumType*>(opt_address), value); *static_cast<const ChecksumType*>(opt_address), value);
case OptionType::kFlushBlockPolicyFactory: {
const auto* ptr =
static_cast<const std::shared_ptr<FlushBlockPolicyFactory>*>(
opt_address);
*value = ptr->get() ? ptr->get()->Name() : kNullptrString;
break;
}
case OptionType::kEncodingType: case OptionType::kEncodingType:
return SerializeEnum<EncodingType>( return SerializeEnum<EncodingType>(
encoding_type_string_map, encoding_type_string_map,

@ -230,9 +230,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
std::shared_ptr<Cache> block_cache_compressed = nullptr; std::shared_ptr<Cache> block_cache_compressed = nullptr;
*/ */
{"flush_block_policy_factory", {"flush_block_policy_factory",
{offsetof(struct BlockBasedTableOptions, flush_block_policy_factory), OptionTypeInfo::AsCustomSharedPtr<FlushBlockPolicyFactory>(
OptionType::kFlushBlockPolicyFactory, OptionVerificationType::kByName, offsetof(struct BlockBasedTableOptions,
OptionTypeFlags::kCompareNever}}, flush_block_policy_factory),
OptionVerificationType::kByName, OptionTypeFlags::kCompareNever)},
{"cache_index_and_filter_blocks", {"cache_index_and_filter_blocks",
{offsetof(struct BlockBasedTableOptions, {offsetof(struct BlockBasedTableOptions,
cache_index_and_filter_blocks), cache_index_and_filter_blocks),

@ -4,12 +4,17 @@
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/flush_block_policy.h" #include "rocksdb/flush_block_policy.h"
#include <cassert>
#include <mutex>
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/utilities/customizable_util.h"
#include "table/block_based/block_builder.h" #include "table/block_based/block_builder.h"
#include "table/block_based/flush_block_policy.h"
#include "table/format.h" #include "table/format.h"
#include <cassert>
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -85,4 +90,58 @@ FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
return new FlushBlockBySizePolicy(size, deviation, false, data_block_builder); return new FlushBlockBySizePolicy(size, deviation, false, data_block_builder);
} }
#ifndef ROCKSDB_LITE
static int RegisterFlushBlockPolicyFactories(ObjectLibrary& library,
const std::string& /*arg*/) {
library.Register<FlushBlockPolicyFactory>(
FlushBlockBySizePolicyFactory::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<FlushBlockPolicyFactory>* guard,
std::string* /* errmsg */) {
guard->reset(new FlushBlockBySizePolicyFactory());
return guard->get();
});
library.Register<FlushBlockPolicyFactory>(
FlushBlockEveryKeyPolicyFactory::kClassName(),
[](const std::string& /*uri*/,
std::unique_ptr<FlushBlockPolicyFactory>* guard,
std::string* /* errmsg */) {
guard->reset(new FlushBlockEveryKeyPolicyFactory());
return guard->get();
});
return 2;
}
#endif // ROCKSDB_LITE
static bool LoadFlushPolicyFactory(
const std::string& id, std::shared_ptr<FlushBlockPolicyFactory>* result) {
if (id.empty()) {
result->reset(new FlushBlockBySizePolicyFactory());
#ifdef ROCKSDB_LITE
} else if (id == FlushBlockBySizePolicyFactory::kClassName()) {
result->reset(new FlushBlockBySizePolicyFactory());
} else if (id == FlushBlockEveryKeyPolicyFactory::kClassName()) {
result->reset(new FlushBlockEveryKeyPolicyFactory());
#endif // ROCKSDB_LITE
} else {
return false;
}
return true;
}
FlushBlockBySizePolicyFactory::FlushBlockBySizePolicyFactory()
: FlushBlockPolicyFactory() {}
Status FlushBlockPolicyFactory::CreateFromString(
const ConfigOptions& config_options, const std::string& value,
std::shared_ptr<FlushBlockPolicyFactory>* factory) {
#ifndef ROCKSDB_LITE
static std::once_flag once;
std::call_once(once, [&]() {
RegisterFlushBlockPolicyFactories(*(ObjectLibrary::Default().get()), "");
});
#endif // ROCKSDB_LITE
return LoadSharedObject<FlushBlockPolicyFactory>(
config_options, value, LoadFlushPolicyFactory, factory);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -27,9 +27,8 @@ class FlushBlockEveryKeyPolicyFactory : public FlushBlockPolicyFactory {
public: public:
explicit FlushBlockEveryKeyPolicyFactory() {} explicit FlushBlockEveryKeyPolicyFactory() {}
const char* Name() const override { static const char* kClassName() { return "FlushBlockEveryKeyPolicyFactory"; }
return "FlushBlockEveryKeyPolicyFactory"; const char* Name() const override { return kClassName(); }
}
FlushBlockPolicy* NewFlushBlockPolicy( FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& /*table_options*/, const BlockBasedTableOptions& /*table_options*/,

Loading…
Cancel
Save