diff --git a/CMakeLists.txt b/CMakeLists.txt index 7b587f54c..43b3f6275 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -849,6 +849,7 @@ set(SOURCES utilities/cassandra/format.cc utilities/cassandra/merge_operator.cc utilities/checkpoint/checkpoint_impl.cc + utilities/compaction_filters.cc utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc utilities/debug.cc utilities/env_mirror.cc @@ -857,6 +858,7 @@ set(SOURCES utilities/fault_injection_fs.cc utilities/leveldb_options/leveldb_options.cc utilities/memory/memory_util.cc + utilities/merge_operators.cc utilities/merge_operators/bytesxor.cc utilities/merge_operators/max.cc utilities/merge_operators/put.cc diff --git a/TARGETS b/TARGETS index c73bd84ab..d96845938 100644 --- a/TARGETS +++ b/TARGETS @@ -368,6 +368,7 @@ cpp_library( "utilities/cassandra/format.cc", "utilities/cassandra/merge_operator.cc", "utilities/checkpoint/checkpoint_impl.cc", + "utilities/compaction_filters.cc", "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", "utilities/convenience/info_log_finder.cc", "utilities/debug.cc", @@ -377,6 +378,7 @@ cpp_library( "utilities/fault_injection_fs.cc", "utilities/leveldb_options/leveldb_options.cc", "utilities/memory/memory_util.cc", + "utilities/merge_operators.cc", "utilities/merge_operators/bytesxor.cc", "utilities/merge_operators/max.cc", "utilities/merge_operators/put.cc", @@ -681,6 +683,7 @@ cpp_library( "utilities/cassandra/format.cc", "utilities/cassandra/merge_operator.cc", "utilities/checkpoint/checkpoint_impl.cc", + "utilities/compaction_filters.cc", "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", "utilities/convenience/info_log_finder.cc", "utilities/debug.cc", @@ -690,6 +693,7 @@ cpp_library( "utilities/fault_injection_fs.cc", "utilities/leveldb_options/leveldb_options.cc", "utilities/memory/memory_util.cc", + "utilities/merge_operators.cc", "utilities/merge_operators/bytesxor.cc", "utilities/merge_operators/max.cc", "utilities/merge_operators/put.cc", diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 9128bba98..e20c6f865 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -337,8 +337,9 @@ class PartialDeleteCompactionFilter : public CompactionFilter { TEST_F(CompactionServiceTest, CompactionFilter) { Options options = CurrentOptions(); options.env = env_; - auto delete_comp_filter = PartialDeleteCompactionFilter(); - options.compaction_filter = &delete_comp_filter; + std::unique_ptr delete_comp_filter( + new PartialDeleteCompactionFilter()); + options.compaction_filter = delete_comp_filter.get(); options.compaction_service = std::make_shared(dbname_, options); diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 14515976e..400f3388e 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -13,6 +13,7 @@ #include #include +#include "rocksdb/customizable.h" #include "rocksdb/rocksdb_namespace.h" #include "rocksdb/types.h" @@ -24,7 +25,7 @@ class SliceTransform; // CompactionFilter allows an application to modify/delete a key-value during // table file creation. -class CompactionFilter { +class CompactionFilter : public Customizable { public: enum ValueType { kValue, @@ -59,6 +60,10 @@ class CompactionFilter { }; virtual ~CompactionFilter() {} + static const char* Type() { return "CompactionFilter"; } + static Status CreateFromString(const ConfigOptions& config_options, + const std::string& name, + const CompactionFilter** result); // The table file creation process invokes this method before adding a kv to // the table file. A return value of false indicates that the kv should be @@ -193,7 +198,7 @@ class CompactionFilter { // Returns a name that identifies this `CompactionFilter`. // The name will be printed to LOG file on start up for diagnosis. - virtual const char* Name() const = 0; + const char* Name() const override = 0; // Internal (BlobDB) use only. Do not override in application code. virtual bool IsStackedBlobDbInternalCompactionFilter() const { return false; } @@ -214,9 +219,13 @@ class CompactionFilter { // `CompactionFilter` according to `ShouldFilterTableFileCreation()`. This // allows the application to know about the different ongoing threads of work // and makes it unnecessary for `CompactionFilter` to provide thread-safety. -class CompactionFilterFactory { +class CompactionFilterFactory : public Customizable { public: virtual ~CompactionFilterFactory() {} + static const char* Type() { return "CompactionFilterFactory"; } + static Status CreateFromString( + const ConfigOptions& config_options, const std::string& name, + std::shared_ptr* result); // Returns whether a thread creating table files for the specified `reason` // should invoke `CreateCompactionFilter()` and pass KVs through the returned diff --git a/include/rocksdb/customizable.h b/include/rocksdb/customizable.h index c04e71360..b168f472e 100644 --- a/include/rocksdb/customizable.h +++ b/include/rocksdb/customizable.h @@ -87,7 +87,18 @@ class Customizable : public Configurable { // @param name The name of the instance to find. // Returns true if the class is an instance of the input name. virtual bool IsInstanceOf(const std::string& name) const { - return name == Name(); + if (name.empty()) { + return false; + } else if (name == Name()) { + return true; + } else { + const char* nickname = NickName(); + if (nickname != nullptr && name == nickname) { + return true; + } else { + return false; + } + } } // Returns the named instance of the Customizable as a T*, or nullptr if not @@ -179,6 +190,10 @@ class Customizable : public Configurable { virtual const Customizable* Inner() const { return nullptr; } protected: + // Some classes have both a class name (e.g. PutOperator) and a nickname + // (e.g. put). Classes can override this method to return a + // nickname. Nicknames can be used by InstanceOf and object creation. + virtual const char* NickName() const { return ""; } // Given a name (e.g. rocksdb.my.type.opt), returns the short name (opt) std::string GetOptionName(const std::string& long_name) const override; #ifndef ROCKSDB_LITE diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h index 0b04ec310..68065454a 100644 --- a/include/rocksdb/merge_operator.h +++ b/include/rocksdb/merge_operator.h @@ -10,6 +10,7 @@ #include #include +#include "rocksdb/customizable.h" #include "rocksdb/slice.h" namespace ROCKSDB_NAMESPACE { @@ -43,10 +44,13 @@ class Logger; // // Refer to rocksdb-merge wiki for more details and example implementations. // -class MergeOperator { +class MergeOperator : public Customizable { public: virtual ~MergeOperator() {} static const char* Type() { return "MergeOperator"; } + static Status CreateFromString(const ConfigOptions& opts, + const std::string& id, + std::shared_ptr* result); // Gives the client a way to express the read -> modify -> write semantics // key: (IN) The key that's associated with this merge operation. diff --git a/include/rocksdb/utilities/options_type.h b/include/rocksdb/utilities/options_type.h index a4e36752b..ac3bfcba2 100644 --- a/include/rocksdb/utilities/options_type.h +++ b/include/rocksdb/utilities/options_type.h @@ -35,10 +35,7 @@ enum class OptionType { kCompactionPri, kSliceTransform, kCompressionType, - kCompactionFilter, - kCompactionFilterFactory, kCompactionStopStyle, - kMergeOperator, kMemTableRepFactory, kFilterPolicy, kChecksumType, diff --git a/options/cf_options.cc b/options/cf_options.cc index 89b6ed43f..830f820ef 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -15,6 +15,7 @@ #include "options/options_helper.h" #include "options/options_parser.h" #include "port/port.h" +#include "rocksdb/compaction_filter.h" #include "rocksdb/concurrent_task_limiter.h" #include "rocksdb/configurable.h" #include "rocksdb/convenience.h" @@ -656,30 +657,18 @@ static std::unordered_map } }}}, {"compaction_filter", - {offset_of(&ImmutableCFOptions::compaction_filter), - OptionType::kCompactionFilter, OptionVerificationType::kByName, - OptionTypeFlags::kNone}}, + OptionTypeInfo::AsCustomRawPtr( + offset_of(&ImmutableCFOptions::compaction_filter), + OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)}, {"compaction_filter_factory", - {offset_of(&ImmutableCFOptions::compaction_filter_factory), - OptionType::kCompactionFilterFactory, OptionVerificationType::kByName, - OptionTypeFlags::kNone}}, + OptionTypeInfo::AsCustomSharedPtr( + offset_of(&ImmutableCFOptions::compaction_filter_factory), + OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)}, {"merge_operator", - {offset_of(&ImmutableCFOptions::merge_operator), - OptionType::kMergeOperator, - OptionVerificationType::kByNameAllowFromNull, - OptionTypeFlags::kCompareLoose, - // Parses the input value as a MergeOperator, updating the value - [](const ConfigOptions& opts, const std::string& /*name*/, - const std::string& value, void* addr) { - auto mop = static_cast*>(addr); - Status status = - opts.registry->NewSharedObject(value, mop); - // Only support static comparator for now. - if (status.ok()) { - return status; - } - return Status::OK(); - }}}, + OptionTypeInfo::AsCustomSharedPtr( + offset_of(&ImmutableCFOptions::merge_operator), + OptionVerificationType::kByNameAllowFromNull, + OptionTypeFlags::kCompareLoose | OptionTypeFlags::kAllowNull)}, {"compaction_style", {offset_of(&ImmutableCFOptions::compaction_style), OptionType::kCompactionStyle, OptionVerificationType::kNormal, diff --git a/options/customizable_test.cc b/options/customizable_test.cc index 814bb686d..5eae06175 100644 --- a/options/customizable_test.cc +++ b/options/customizable_test.cc @@ -29,6 +29,7 @@ #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/string_util.h" +#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h" #ifndef GFLAGS bool FLAGS_enable_print = false; @@ -944,6 +945,26 @@ static int RegisterTestObjects(ObjectLibrary& library, static test::SimpleSuffixReverseComparator ssrc; return &ssrc; }); + library.Register( + "Changling", + [](const std::string& uri, std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new test::ChanglingMergeOperator(uri)); + return guard->get(); + }); + library.Register( + "Changling", + [](const std::string& uri, std::unique_ptr* /*guard*/, + std::string* /* errmsg */) { + return new test::ChanglingCompactionFilter(uri); + }); + library.Register( + "Changling", [](const std::string& uri, + std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new test::ChanglingCompactionFilterFactory(uri)); + return guard->get(); + }); return static_cast(library.GetFactoryCount(&num_types)); } @@ -1136,6 +1157,58 @@ TEST_F(LoadCustomizableTest, LoadComparatorTest) { } } +TEST_F(LoadCustomizableTest, LoadMergeOperatorTest) { + std::shared_ptr result; + + ASSERT_NOK( + MergeOperator::CreateFromString(config_options_, "Changling", &result)); + ASSERT_OK(MergeOperator::CreateFromString(config_options_, "put", &result)); + ASSERT_NE(result, nullptr); + ASSERT_STREQ(result->Name(), "PutOperator"); + if (RegisterTests("Test")) { + ASSERT_OK( + MergeOperator::CreateFromString(config_options_, "Changling", &result)); + ASSERT_NE(result, nullptr); + ASSERT_STREQ(result->Name(), "ChanglingMergeOperator"); + } +} + +TEST_F(LoadCustomizableTest, LoadCompactionFilterFactoryTest) { + std::shared_ptr result; + + ASSERT_NOK(CompactionFilterFactory::CreateFromString(config_options_, + "Changling", &result)); + if (RegisterTests("Test")) { + ASSERT_OK(CompactionFilterFactory::CreateFromString(config_options_, + "Changling", &result)); + ASSERT_NE(result, nullptr); + ASSERT_STREQ(result->Name(), "ChanglingCompactionFilterFactory"); + } +} + +TEST_F(LoadCustomizableTest, LoadCompactionFilterTest) { + const CompactionFilter* result = nullptr; + + ASSERT_NOK(CompactionFilter::CreateFromString(config_options_, "Changling", + &result)); +#ifndef ROCKSDB_LITE + ASSERT_OK(CompactionFilter::CreateFromString( + config_options_, RemoveEmptyValueCompactionFilter::kClassName(), + &result)); + ASSERT_NE(result, nullptr); + ASSERT_STREQ(result->Name(), RemoveEmptyValueCompactionFilter::kClassName()); + delete result; + result = nullptr; + if (RegisterTests("Test")) { + ASSERT_OK(CompactionFilter::CreateFromString(config_options_, "Changling", + &result)); + ASSERT_NE(result, nullptr); + ASSERT_STREQ(result->Name(), "ChanglingCompactionFilter"); + delete result; + } +#endif // ROCKSDB_LITE +} + #ifndef ROCKSDB_LITE TEST_F(LoadCustomizableTest, LoadEventListenerTest) { std::shared_ptr result; diff --git a/options/options_helper.cc b/options/options_helper.cc index c11984ff9..157537923 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -562,32 +562,12 @@ bool SerializeSingleOptionHelper(const void* opt_address, : kNullptrString; break; } - case OptionType::kCompactionFilter: { - // it's a const pointer of const CompactionFilter* - const auto* ptr = - static_cast(opt_address); - *value = *ptr ? (*ptr)->Name() : kNullptrString; - break; - } - case OptionType::kCompactionFilterFactory: { - const auto* ptr = - static_cast*>( - opt_address); - *value = ptr->get() ? ptr->get()->Name() : kNullptrString; - break; - } case OptionType::kMemTableRepFactory: { const auto* ptr = static_cast*>(opt_address); *value = ptr->get() ? ptr->get()->Name() : kNullptrString; break; } - case OptionType::kMergeOperator: { - const auto* ptr = - static_cast*>(opt_address); - *value = ptr->get() ? ptr->get()->Name() : kNullptrString; - break; - } case OptionType::kFilterPolicy: { const auto* ptr = static_cast*>(opt_address); diff --git a/options/options_test.cc b/options/options_test.cc index 0d6570494..3905a9577 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -30,6 +30,9 @@ #include "util/stderr_logger.h" #include "util/string_util.h" #include "utilities/merge_operators/bytesxor.h" +#include "utilities/merge_operators/sortlist.h" +#include "utilities/merge_operators/string_append/stringappend.h" +#include "utilities/merge_operators/string_append/stringappend2.h" #ifndef GFLAGS bool FLAGS_enable_print = false; @@ -395,13 +398,6 @@ TEST_F(OptionsTest, GetColumnFamilyOptionsFromStringTest) { // MergeOperator from object registry std::unique_ptr bxo(new BytesXOROperator()); std::string kMoName = bxo->Name(); - ObjectLibrary::Default()->Register( - kMoName, - [](const std::string& /*name*/, std::unique_ptr* guard, - std::string* /* errmsg */) { - guard->reset(new BytesXOROperator()); - return guard->get(); - }); ASSERT_OK(GetColumnFamilyOptionsFromString(config_options, base_cf_opt, "merge_operator=" + kMoName + ";", @@ -2281,14 +2277,6 @@ TEST_F(OptionsOldApiTest, GetColumnFamilyOptionsFromStringTest) { // MergeOperator from object registry std::unique_ptr bxo(new BytesXOROperator()); std::string kMoName = bxo->Name(); - ObjectLibrary::Default()->Register( - kMoName, - [](const std::string& /*name*/, std::unique_ptr* guard, - std::string* /* errmsg */) { - guard->reset(new BytesXOROperator()); - return guard->get(); - }); - ASSERT_OK(GetColumnFamilyOptionsFromString( base_cf_opt, "merge_operator=" + kMoName + ";", &new_cf_opt)); ASSERT_EQ(kMoName, std::string(new_cf_opt.merge_operator->Name())); @@ -3096,8 +3084,8 @@ void VerifyCFPointerTypedOptions( // change the name of merge operator back-and-forth { - auto* merge_operator = dynamic_cast( - base_cf_opt->merge_operator.get()); + auto* merge_operator = base_cf_opt->merge_operator + ->CheckedCast(); if (merge_operator != nullptr) { name_buffer = merge_operator->Name(); // change the name and expect non-ok status @@ -3114,8 +3102,8 @@ void VerifyCFPointerTypedOptions( // change the name of the compaction filter factory back-and-forth { auto* compaction_filter_factory = - dynamic_cast( - base_cf_opt->compaction_filter_factory.get()); + base_cf_opt->compaction_filter_factory + ->CheckedCast(); if (compaction_filter_factory != nullptr) { name_buffer = compaction_filter_factory->Name(); // change the name and expect non-ok status @@ -4186,6 +4174,90 @@ TEST_F(ConfigOptionsTest, EnvFromConfigOptions) { delete mem_env; } +TEST_F(ConfigOptionsTest, MergeOperatorFromString) { + ConfigOptions config_options; + std::shared_ptr merge_op; + + ASSERT_OK(MergeOperator::CreateFromString(config_options, "put", &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("put")); + ASSERT_STREQ(merge_op->Name(), "PutOperator"); + + ASSERT_OK( + MergeOperator::CreateFromString(config_options, "put_v1", &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("PutOperator")); + + ASSERT_OK( + MergeOperator::CreateFromString(config_options, "uint64add", &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("uint64add")); + ASSERT_STREQ(merge_op->Name(), "UInt64AddOperator"); + + ASSERT_OK(MergeOperator::CreateFromString(config_options, "max", &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("max")); + ASSERT_STREQ(merge_op->Name(), "MaxOperator"); + + ASSERT_OK( + MergeOperator::CreateFromString(config_options, "bytesxor", &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("bytesxor")); + ASSERT_STREQ(merge_op->Name(), BytesXOROperator::kClassName()); + + ASSERT_OK( + MergeOperator::CreateFromString(config_options, "sortlist", &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("sortlist")); + ASSERT_STREQ(merge_op->Name(), SortList::kClassName()); + + ASSERT_OK(MergeOperator::CreateFromString(config_options, "stringappend", + &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("stringappend")); + ASSERT_STREQ(merge_op->Name(), StringAppendOperator::kClassName()); + auto delimiter = merge_op->GetOptions("Delimiter"); + ASSERT_NE(delimiter, nullptr); + ASSERT_EQ(*delimiter, ","); + + ASSERT_OK(MergeOperator::CreateFromString(config_options, "stringappendtest", + &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("stringappendtest")); + ASSERT_STREQ(merge_op->Name(), StringAppendTESTOperator::kClassName()); + delimiter = merge_op->GetOptions("Delimiter"); + ASSERT_NE(delimiter, nullptr); + ASSERT_EQ(*delimiter, ","); + + ASSERT_OK(MergeOperator::CreateFromString( + config_options, "id=stringappend; delimiter=||", &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("stringappend")); + ASSERT_STREQ(merge_op->Name(), StringAppendOperator::kClassName()); + delimiter = merge_op->GetOptions("Delimiter"); + ASSERT_NE(delimiter, nullptr); + ASSERT_EQ(*delimiter, "||"); + + ASSERT_OK(MergeOperator::CreateFromString( + config_options, "id=stringappendtest; delimiter=&&", &merge_op)); + ASSERT_NE(merge_op, nullptr); + ASSERT_TRUE(merge_op->IsInstanceOf("stringappendtest")); + ASSERT_STREQ(merge_op->Name(), StringAppendTESTOperator::kClassName()); + delimiter = merge_op->GetOptions("Delimiter"); + ASSERT_NE(delimiter, nullptr); + ASSERT_EQ(*delimiter, "&&"); + + std::shared_ptr copy; + std::string mismatch; + std::string opts_str = merge_op->ToString(config_options); + + ASSERT_OK(MergeOperator::CreateFromString(config_options, opts_str, ©)); + ASSERT_TRUE(merge_op->AreEquivalent(config_options, copy.get(), &mismatch)); + ASSERT_NE(copy, nullptr); + delimiter = copy->GetOptions("Delimiter"); + ASSERT_NE(delimiter, nullptr); + ASSERT_EQ(*delimiter, "&&"); +} #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index 2a36f9507..053181715 100644 --- a/src.mk +++ b/src.mk @@ -232,6 +232,7 @@ LIB_SOURCES = \ utilities/cassandra/format.cc \ utilities/cassandra/merge_operator.cc \ utilities/checkpoint/checkpoint_impl.cc \ + utilities/compaction_filters.cc \ utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \ utilities/convenience/info_log_finder.cc \ utilities/debug.cc \ @@ -241,6 +242,7 @@ LIB_SOURCES = \ utilities/fault_injection_fs.cc \ utilities/leveldb_options/leveldb_options.cc \ utilities/memory/memory_util.cc \ + utilities/merge_operators.cc \ utilities/merge_operators/max.cc \ utilities/merge_operators/put.cc \ utilities/merge_operators/sortlist.cc \ diff --git a/test_util/testutil.h b/test_util/testutil.h index c1dc09b0c..e688b61c6 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -772,6 +772,15 @@ class ChanglingMergeOperator : public MergeOperator { Logger* /*logger*/) const override { return false; } + static const char* kClassName() { return "ChanglingMergeOperator"; } + virtual bool IsInstanceOf(const std::string& id) const override { + if (id == kClassName()) { + return true; + } else { + return MergeOperator::IsInstanceOf(id); + } + } + virtual const char* Name() const override { return name_.c_str(); } protected: @@ -796,6 +805,15 @@ class ChanglingCompactionFilter : public CompactionFilter { return false; } + static const char* kClassName() { return "ChanglingCompactionFilter"; } + virtual bool IsInstanceOf(const std::string& id) const override { + if (id == kClassName()) { + return true; + } else { + return CompactionFilter::IsInstanceOf(id); + } + } + const char* Name() const override { return name_.c_str(); } private: @@ -821,6 +839,14 @@ class ChanglingCompactionFilterFactory : public CompactionFilterFactory { // Returns a name that identifies this compaction filter factory. const char* Name() const override { return name_.c_str(); } + static const char* kClassName() { return "ChanglingCompactionFilterFactory"; } + virtual bool IsInstanceOf(const std::string& id) const override { + if (id == kClassName()) { + return true; + } else { + return CompactionFilterFactory::IsInstanceOf(id); + } + } protected: std::string name_; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index c4437092f..1b2ad6988 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -3929,6 +3929,7 @@ class Benchmark { void InitializeOptionsFromFlags(Options* opts) { printf("Initializing RocksDB Options from command-line flags\n"); Options& options = *opts; + ConfigOptions config_options(options); assert(db_.db == nullptr); @@ -4294,12 +4295,14 @@ class Benchmark { options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync; // merge operator options - options.merge_operator = MergeOperators::CreateFromStringId( - FLAGS_merge_operator); - if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) { - fprintf(stderr, "invalid merge operator: %s\n", - FLAGS_merge_operator.c_str()); - exit(1); + if (!FLAGS_merge_operator.empty()) { + Status s = MergeOperator::CreateFromString( + config_options, FLAGS_merge_operator, &options.merge_operator); + if (!s.ok()) { + fprintf(stderr, "invalid merge operator[%s]: %s\n", + FLAGS_merge_operator.c_str(), s.ToString().c_str()); + exit(1); + } } options.max_successive_merges = FLAGS_max_successive_merges; options.report_bg_io_stats = FLAGS_report_bg_io_stats; diff --git a/utilities/cassandra/cassandra_compaction_filter.cc b/utilities/cassandra/cassandra_compaction_filter.cc index f0a00e4d1..3d49ea0ab 100644 --- a/utilities/cassandra/cassandra_compaction_filter.cc +++ b/utilities/cassandra/cassandra_compaction_filter.cc @@ -4,15 +4,35 @@ // (found in the LICENSE.Apache file in the root directory). #include "utilities/cassandra/cassandra_compaction_filter.h" + #include + #include "rocksdb/slice.h" +#include "rocksdb/utilities/object_registry.h" +#include "rocksdb/utilities/options_type.h" #include "utilities/cassandra/format.h" +#include "utilities/cassandra/merge_operator.h" namespace ROCKSDB_NAMESPACE { namespace cassandra { +static std::unordered_map + cassandra_filter_type_info = { +#ifndef ROCKSDB_LITE + {"purge_ttl_on_expiration", + {offsetof(struct CassandraOptions, purge_ttl_on_expiration), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"gc_grace_period_in_seconds", + {offsetof(struct CassandraOptions, gc_grace_period_in_seconds), + OptionType::kUInt32T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, +#endif // ROCKSDB_LITE +}; -const char* CassandraCompactionFilter::Name() const { - return "CassandraCompactionFilter"; +CassandraCompactionFilter::CassandraCompactionFilter( + bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds) + : options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) { + RegisterOptions(&options_, &cassandra_filter_type_info); } CompactionFilter::Decision CassandraCompactionFilter::FilterV2( @@ -23,12 +43,12 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2( RowValue row_value = RowValue::Deserialize( existing_value.data(), existing_value.size()); RowValue compacted = - purge_ttl_on_expiration_ + options_.purge_ttl_on_expiration ? row_value.RemoveExpiredColumns(&value_changed) : row_value.ConvertExpiredColumnsToTombstones(&value_changed); if (value_type == ValueType::kValue) { - compacted = compacted.RemoveTombstones(gc_grace_period_in_seconds_); + compacted = compacted.RemoveTombstones(options_.gc_grace_period_in_seconds); } if(compacted.Empty()) { @@ -43,5 +63,48 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2( return Decision::kKeep; } +CassandraCompactionFilterFactory::CassandraCompactionFilterFactory( + bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds) + : options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) { + RegisterOptions(&options_, &cassandra_filter_type_info); +} + +std::unique_ptr +CassandraCompactionFilterFactory::CreateCompactionFilter( + const CompactionFilter::Context&) { + std::unique_ptr result(new CassandraCompactionFilter( + options_.purge_ttl_on_expiration, options_.gc_grace_period_in_seconds)); + return result; +} + +#ifndef ROCKSDB_LITE +int RegisterCassandraObjects(ObjectLibrary& library, + const std::string& /*arg*/) { + library.Register( + CassandraValueMergeOperator::kClassName(), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new CassandraValueMergeOperator(0)); + return guard->get(); + }); + library.Register( + CassandraCompactionFilter::kClassName(), + [](const std::string& /*uri*/, + std::unique_ptr* /*guard */, + std::string* /* errmsg */) { + return new CassandraCompactionFilter(false, 0); + }); + library.Register( + CassandraCompactionFilterFactory::kClassName(), + [](const std::string& /*uri*/, + std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new CassandraCompactionFilterFactory(false, 0)); + return guard->get(); + }); + size_t num_types; + return static_cast(library.GetFactoryCount(&num_types)); +} +#endif // ROCKSDB_LITE } // namespace cassandra } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/cassandra/cassandra_compaction_filter.h b/utilities/cassandra/cassandra_compaction_filter.h index ac2588106..becadde32 100644 --- a/utilities/cassandra/cassandra_compaction_filter.h +++ b/utilities/cassandra/cassandra_compaction_filter.h @@ -5,8 +5,10 @@ #pragma once #include + #include "rocksdb/compaction_filter.h" #include "rocksdb/slice.h" +#include "utilities/cassandra/cassandra_options.h" namespace ROCKSDB_NAMESPACE { namespace cassandra { @@ -25,18 +27,31 @@ namespace cassandra { class CassandraCompactionFilter : public CompactionFilter { public: explicit CassandraCompactionFilter(bool purge_ttl_on_expiration, - int32_t gc_grace_period_in_seconds) - : purge_ttl_on_expiration_(purge_ttl_on_expiration), - gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} + int32_t gc_grace_period_in_seconds); + static const char* kClassName() { return "CassandraCompactionFilter"; } + const char* Name() const override { return kClassName(); } - const char* Name() const override; virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, const Slice& existing_value, std::string* new_value, std::string* skip_until) const override; private: - bool purge_ttl_on_expiration_; - int32_t gc_grace_period_in_seconds_; + CassandraOptions options_; +}; + +class CassandraCompactionFilterFactory : public CompactionFilterFactory { + public: + explicit CassandraCompactionFilterFactory(bool purge_ttl_on_expiration, + int32_t gc_grace_period_in_seconds); + ~CassandraCompactionFilterFactory() override {} + + std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override; + static const char* kClassName() { return "CassandraCompactionFilterFactory"; } + const char* Name() const override { return kClassName(); } + + private: + CassandraOptions options_; }; } // namespace cassandra } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/cassandra/cassandra_functional_test.cc b/utilities/cassandra/cassandra_functional_test.cc index cd06acc92..bde20340a 100644 --- a/utilities/cassandra/cassandra_functional_test.cc +++ b/utilities/cassandra/cassandra_functional_test.cc @@ -6,9 +6,10 @@ #include #include "db/db_impl/db_impl.h" +#include "rocksdb/convenience.h" #include "rocksdb/db.h" #include "rocksdb/merge_operator.h" -#include "rocksdb/utilities/db_ttl.h" +#include "rocksdb/utilities/object_registry.h" #include "test_util/testharness.h" #include "util/cast_util.h" #include "util/random.h" @@ -318,6 +319,99 @@ TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) { ASSERT_FALSE(std::get<0>(store.Get("k1"))); } +#ifndef ROCKSDB_LITE +TEST_F(CassandraFunctionalTest, LoadMergeOperator) { + ConfigOptions config_options; + std::shared_ptr mo; + config_options.ignore_unsupported_options = false; + + ASSERT_NOK(MergeOperator::CreateFromString( + config_options, CassandraValueMergeOperator::kClassName(), &mo)); + + config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects, + "cassandra"); + + ASSERT_OK(MergeOperator::CreateFromString( + config_options, CassandraValueMergeOperator::kClassName(), &mo)); + ASSERT_NE(mo, nullptr); + ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName()); + mo.reset(); + ASSERT_OK(MergeOperator::CreateFromString( + config_options, + std::string("operands_limit=20;gc_grace_period_in_seconds=42;id=") + + CassandraValueMergeOperator::kClassName(), + &mo)); + ASSERT_NE(mo, nullptr); + ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName()); + const auto* opts = mo->GetOptions(); + ASSERT_NE(opts, nullptr); + ASSERT_EQ(opts->gc_grace_period_in_seconds, 42); + ASSERT_EQ(opts->operands_limit, 20); +} + +TEST_F(CassandraFunctionalTest, LoadCompactionFilter) { + ConfigOptions config_options; + const CompactionFilter* filter = nullptr; + config_options.ignore_unsupported_options = false; + + ASSERT_NOK(CompactionFilter::CreateFromString( + config_options, CassandraCompactionFilter::kClassName(), &filter)); + config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects, + "cassandra"); + + ASSERT_OK(CompactionFilter::CreateFromString( + config_options, CassandraCompactionFilter::kClassName(), &filter)); + ASSERT_NE(filter, nullptr); + ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName()); + delete filter; + filter = nullptr; + ASSERT_OK(CompactionFilter::CreateFromString( + config_options, + std::string( + "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") + + CassandraCompactionFilter::kClassName(), + &filter)); + ASSERT_NE(filter, nullptr); + ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName()); + const auto* opts = filter->GetOptions(); + ASSERT_NE(opts, nullptr); + ASSERT_EQ(opts->gc_grace_period_in_seconds, 42); + ASSERT_TRUE(opts->purge_ttl_on_expiration); + delete filter; +} + +TEST_F(CassandraFunctionalTest, LoadCompactionFilterFactory) { + ConfigOptions config_options; + std::shared_ptr factory; + + config_options.ignore_unsupported_options = false; + ASSERT_NOK(CompactionFilterFactory::CreateFromString( + config_options, CassandraCompactionFilterFactory::kClassName(), + &factory)); + config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects, + "cassandra"); + + ASSERT_OK(CompactionFilterFactory::CreateFromString( + config_options, CassandraCompactionFilterFactory::kClassName(), + &factory)); + ASSERT_NE(factory, nullptr); + ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName()); + factory.reset(); + ASSERT_OK(CompactionFilterFactory::CreateFromString( + config_options, + std::string( + "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") + + CassandraCompactionFilterFactory::kClassName(), + &factory)); + ASSERT_NE(factory, nullptr); + ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName()); + const auto* opts = factory->GetOptions(); + ASSERT_NE(opts, nullptr); + ASSERT_EQ(opts->gc_grace_period_in_seconds, 42); + ASSERT_TRUE(opts->purge_ttl_on_expiration); +} +#endif // ROCKSDB_LITE + } // namespace cassandra } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/cassandra/cassandra_options.h b/utilities/cassandra/cassandra_options.h new file mode 100644 index 000000000..8cf756a98 --- /dev/null +++ b/utilities/cassandra/cassandra_options.h @@ -0,0 +1,40 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include + +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { +class ObjectLibrary; +namespace cassandra { +struct CassandraOptions { + static const char* kName() { return "CassandraOptions"; } + CassandraOptions(int32_t _gc_grace_period_in_seconds, size_t _operands_limit, + bool _purge_ttl_on_expiration = false) + : operands_limit(_operands_limit), + gc_grace_period_in_seconds(_gc_grace_period_in_seconds), + purge_ttl_on_expiration(_purge_ttl_on_expiration) {} + // Limit on the number of merge operands. + size_t operands_limit; + + // How long (in seconds) tombstoned data remains before it is purged + int32_t gc_grace_period_in_seconds; + + // If is set to true, expired data will be directly purged. + // Otherwise expired data will be converted tombstones first, + // then be eventually removed after gc grace period. This value should + // only true if all writes have same ttl setting, otherwise it could bring old + // data back. + bool purge_ttl_on_expiration; +}; +#ifndef ROCKSDB_LITE +extern "C" { +int RegisterCassandraObjects(ObjectLibrary& library, const std::string& arg); +} // extern "C" +#endif // ROCKSDB_LITE +} // namespace cassandra +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/cassandra/merge_operator.cc b/utilities/cassandra/merge_operator.cc index 82fe5d661..9d0cdd385 100644 --- a/utilities/cassandra/merge_operator.cc +++ b/utilities/cassandra/merge_operator.cc @@ -5,16 +5,36 @@ #include "merge_operator.h" -#include #include -#include "rocksdb/slice.h" +#include + #include "rocksdb/merge_operator.h" -#include "utilities/merge_operators.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/options_type.h" #include "utilities/cassandra/format.h" +#include "utilities/merge_operators.h" namespace ROCKSDB_NAMESPACE { namespace cassandra { +static std::unordered_map + merge_operator_options_info = { +#ifndef ROCKSDB_LITE + {"gc_grace_period_in_seconds", + {offsetof(struct CassandraOptions, gc_grace_period_in_seconds), + OptionType::kUInt32T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, + {"operands_limit", + {offsetof(struct CassandraOptions, operands_limit), OptionType::kSizeT, + OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, +#endif // ROCKSDB_LITE +}; + +CassandraValueMergeOperator::CassandraValueMergeOperator( + int32_t gc_grace_period_in_seconds, size_t operands_limit) + : options_(gc_grace_period_in_seconds, operands_limit) { + RegisterOptions(&options_, &merge_operator_options_info); +} // Implementation for the merge operation (merges two Cassandra values) bool CassandraValueMergeOperator::FullMergeV2( @@ -34,7 +54,7 @@ bool CassandraValueMergeOperator::FullMergeV2( } RowValue merged = RowValue::Merge(std::move(row_values)); - merged = merged.RemoveTombstones(gc_grace_period_in_seconds_); + merged = merged.RemoveTombstones(options_.gc_grace_period_in_seconds); merge_out->new_value.reserve(merged.Size()); merged.Serialize(&(merge_out->new_value)); @@ -58,10 +78,6 @@ bool CassandraValueMergeOperator::PartialMergeMulti( return true; } -const char* CassandraValueMergeOperator::Name() const { - return "CassandraValueMergeOperator"; -} - } // namespace cassandra } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/cassandra/merge_operator.h b/utilities/cassandra/merge_operator.h index b5bf7c520..4bf912809 100644 --- a/utilities/cassandra/merge_operator.h +++ b/utilities/cassandra/merge_operator.h @@ -6,6 +6,7 @@ #pragma once #include "rocksdb/merge_operator.h" #include "rocksdb/slice.h" +#include "utilities/cassandra/cassandra_options.h" namespace ROCKSDB_NAMESPACE { namespace cassandra { @@ -16,9 +17,7 @@ namespace cassandra { class CassandraValueMergeOperator : public MergeOperator { public: explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds, - size_t operands_limit = 0) - : gc_grace_period_in_seconds_(gc_grace_period_in_seconds), - operands_limit_(operands_limit) {} + size_t operands_limit = 0); virtual bool FullMergeV2(const MergeOperationInput& merge_in, MergeOperationOutput* merge_out) const override; @@ -28,17 +27,18 @@ public: std::string* new_value, Logger* logger) const override; - virtual const char* Name() const override; + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "CassandraValueMergeOperator"; } virtual bool AllowSingleOperand() const override { return true; } virtual bool ShouldMerge(const std::vector& operands) const override { - return operands_limit_ > 0 && operands.size() >= operands_limit_; + return options_.operands_limit > 0 && + operands.size() >= options_.operands_limit; } private: - int32_t gc_grace_period_in_seconds_; - size_t operands_limit_; + CassandraOptions options_; }; } // namespace cassandra } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/compaction_filters.cc b/utilities/compaction_filters.cc new file mode 100644 index 000000000..d527ee5b8 --- /dev/null +++ b/utilities/compaction_filters.cc @@ -0,0 +1,56 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include + +#include "rocksdb/compaction_filter.h" +#include "rocksdb/options.h" +#include "rocksdb/utilities/customizable_util.h" +#include "rocksdb/utilities/options_type.h" +#include "utilities/compaction_filters/layered_compaction_filter_base.h" +#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h" + +namespace ROCKSDB_NAMESPACE { +#ifndef ROCKSDB_LITE +static int RegisterBuiltinCompactionFilters(ObjectLibrary& library, + const std::string& /*arg*/) { + library.Register( + RemoveEmptyValueCompactionFilter::kClassName(), + [](const std::string& /*uri*/, + std::unique_ptr* /*guard*/, + std::string* /*errmsg*/) { + return new RemoveEmptyValueCompactionFilter(); + }); + return 1; +} +#endif // ROCKSDB_LITE +Status CompactionFilter::CreateFromString(const ConfigOptions& config_options, + const std::string& value, + const CompactionFilter** result) { +#ifndef ROCKSDB_LITE + static std::once_flag once; + std::call_once(once, [&]() { + RegisterBuiltinCompactionFilters(*(ObjectLibrary::Default().get()), ""); + }); +#endif // ROCKSDB_LITE + CompactionFilter* filter = const_cast(*result); + Status status = LoadStaticObject(config_options, value, + nullptr, &filter); + if (status.ok()) { + *result = const_cast(filter); + } + return status; +} + +Status CompactionFilterFactory::CreateFromString( + const ConfigOptions& config_options, const std::string& value, + std::shared_ptr* result) { + // Currently there are no builtin CompactionFilterFactories. + // If any are introduced, they need to be registered here. + Status status = LoadSharedObject( + config_options, value, nullptr, result); + return status; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/compaction_filters/layered_compaction_filter_base.h b/utilities/compaction_filters/layered_compaction_filter_base.h index abde64952..803fa94ae 100644 --- a/utilities/compaction_filters/layered_compaction_filter_base.h +++ b/utilities/compaction_filters/layered_compaction_filter_base.h @@ -10,7 +10,7 @@ namespace ROCKSDB_NAMESPACE { -// Abstract base class for building layered compation filter on top of +// Abstract base class for building layered compaction filter on top of // user compaction filter. // See BlobIndexCompactionFilter or TtlCompactionFilter for a basic usage. class LayeredCompactionFilterBase : public CompactionFilter { @@ -29,8 +29,12 @@ class LayeredCompactionFilterBase : public CompactionFilter { // Return a pointer to user compaction filter const CompactionFilter* user_comp_filter() const { return user_comp_filter_; } - private: + const Customizable* Inner() const override { return user_comp_filter_; } + + protected: const CompactionFilter* user_comp_filter_; + + private: std::unique_ptr user_comp_filter_from_factory_; }; diff --git a/utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc b/utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc index c97eef41d..f4dbce100 100644 --- a/utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc +++ b/utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc @@ -12,10 +12,6 @@ namespace ROCKSDB_NAMESPACE { -const char* RemoveEmptyValueCompactionFilter::Name() const { - return "RemoveEmptyValueCompactionFilter"; -} - bool RemoveEmptyValueCompactionFilter::Filter(int /*level*/, const Slice& /*key*/, const Slice& existing_value, diff --git a/utilities/compaction_filters/remove_emptyvalue_compactionfilter.h b/utilities/compaction_filters/remove_emptyvalue_compactionfilter.h index f5dbec900..864ad15ff 100644 --- a/utilities/compaction_filters/remove_emptyvalue_compactionfilter.h +++ b/utilities/compaction_filters/remove_emptyvalue_compactionfilter.h @@ -16,12 +16,13 @@ namespace ROCKSDB_NAMESPACE { class RemoveEmptyValueCompactionFilter : public CompactionFilter { public: - const char* Name() const override; - bool Filter(int level, - const Slice& key, - const Slice& existing_value, - std::string* new_value, - bool* value_changed) const override; + static const char* kClassName() { return "RemoveEmptyValueCompactionFilter"; } + + const char* Name() const override { return kClassName(); } + + bool Filter(int level, const Slice& key, const Slice& existing_value, + std::string* new_value, bool* value_changed) const override; }; + } // namespace ROCKSDB_NAMESPACE #endif // !ROCKSDB_LITE diff --git a/utilities/merge_operators.cc b/utilities/merge_operators.cc new file mode 100644 index 000000000..7fe0abfaf --- /dev/null +++ b/utilities/merge_operators.cc @@ -0,0 +1,125 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "utilities/merge_operators.h" + +#include + +#include "rocksdb/merge_operator.h" +#include "rocksdb/options.h" +#include "rocksdb/utilities/customizable_util.h" +#include "rocksdb/utilities/object_registry.h" +#include "utilities/merge_operators/bytesxor.h" +#include "utilities/merge_operators/sortlist.h" +#include "utilities/merge_operators/string_append/stringappend.h" +#include "utilities/merge_operators/string_append/stringappend2.h" + +namespace ROCKSDB_NAMESPACE { +static bool LoadMergeOperator(const std::string& id, + std::shared_ptr* result) { + bool success = true; + // TODO: Hook the "name" up to the actual Name() of the MergeOperators? + // Requires these classes be moved into a header file... + if (id == "put" || id == "PutOperator") { + *result = MergeOperators::CreatePutOperator(); + } else if (id == "put_v1") { + *result = MergeOperators::CreateDeprecatedPutOperator(); + } else if (id == "uint64add" || id == "UInt64AddOperator") { + *result = MergeOperators::CreateUInt64AddOperator(); + } else if (id == "max" || id == "MaxOperator") { + *result = MergeOperators::CreateMaxOperator(); +#ifdef ROCKSDB_LITE + // The remainder of the classes are handled by the ObjectRegistry in + // non-LITE mode + } else if (id == StringAppendOperator::kNickName() || + id == StringAppendOperator::kClassName()) { + *result = MergeOperators::CreateStringAppendOperator(); + } else if (id == StringAppendTESTOperator::kNickName() || + id == StringAppendTESTOperator::kClassName()) { + *result = MergeOperators::CreateStringAppendTESTOperator(); + } else if (id == BytesXOROperator::kNickName() || + id == BytesXOROperator::kClassName()) { + *result = MergeOperators::CreateBytesXOROperator(); + } else if (id == SortList::kNickName() || id == SortList::kClassName()) { + *result = MergeOperators::CreateSortOperator(); +#endif // ROCKSDB_LITE + } else { + success = false; + } + return success; +} + +#ifndef ROCKSDB_LITE +static int RegisterBuiltinMergeOperators(ObjectLibrary& library, + const std::string& /*arg*/) { + size_t num_types; + auto AsRegex = [](const std::string& name, const std::string& alt) { + std::string regex; + regex.append("(").append(name); + regex.append("|").append(alt).append(")"); + return regex; + }; + + library.Register( + AsRegex(StringAppendOperator::kClassName(), + StringAppendOperator::kNickName()), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /*errmsg*/) { + guard->reset(new StringAppendOperator(",")); + return guard->get(); + }); + library.Register( + AsRegex(StringAppendTESTOperator::kClassName(), + StringAppendTESTOperator::kNickName()), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /*errmsg*/) { + guard->reset(new StringAppendTESTOperator(",")); + return guard->get(); + }); + library.Register( + AsRegex(SortList::kClassName(), SortList::kNickName()), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /*errmsg*/) { + guard->reset(new SortList()); + return guard->get(); + }); + library.Register( + AsRegex(BytesXOROperator::kClassName(), BytesXOROperator::kNickName()), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /*errmsg*/) { + guard->reset(new BytesXOROperator()); + return guard->get(); + }); + + return static_cast(library.GetFactoryCount(&num_types)); +} +#endif // ROCKSDB_LITE + +Status MergeOperator::CreateFromString(const ConfigOptions& config_options, + const std::string& value, + std::shared_ptr* result) { +#ifndef ROCKSDB_LITE + static std::once_flag once; + std::call_once(once, [&]() { + RegisterBuiltinMergeOperators(*(ObjectLibrary::Default().get()), ""); + }); +#endif // ROCKSDB_LITE + return LoadSharedObject(config_options, value, + LoadMergeOperator, result); +} + +std::shared_ptr MergeOperators::CreateFromStringId( + const std::string& id) { + std::shared_ptr result; + Status s = MergeOperator::CreateFromString(ConfigOptions(), id, &result); + if (s.ok()) { + return result; + } else { + // Empty or unknown, just return nullptr + return nullptr; + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h index ded5e2ee8..37535cdc5 100644 --- a/utilities/merge_operators.h +++ b/utilities/merge_operators.h @@ -28,30 +28,8 @@ class MergeOperators { static std::shared_ptr CreateSortOperator(); // Will return a different merge operator depending on the string. - // TODO: Hook the "name" up to the actual Name() of the MergeOperators? static std::shared_ptr CreateFromStringId( - const std::string& name) { - if (name == "put") { - return CreatePutOperator(); - } else if (name == "put_v1") { - return CreateDeprecatedPutOperator(); - } else if ( name == "uint64add") { - return CreateUInt64AddOperator(); - } else if (name == "stringappend") { - return CreateStringAppendOperator(); - } else if (name == "stringappendtest") { - return CreateStringAppendTESTOperator(); - } else if (name == "max") { - return CreateMaxOperator(); - } else if (name == "bytesxor") { - return CreateBytesXOROperator(); - } else if (name == "sortlist") { - return CreateSortOperator(); - } else { - // Empty or unknown, just return nullptr - return nullptr; - } - } + const std::string& name); }; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/merge_operators/bytesxor.h b/utilities/merge_operators/bytesxor.h index ab0c5aecc..f05b6ca98 100644 --- a/utilities/merge_operators/bytesxor.h +++ b/utilities/merge_operators/bytesxor.h @@ -28,9 +28,11 @@ class BytesXOROperator : public AssociativeMergeOperator { std::string* new_value, Logger* logger) const override; - virtual const char* Name() const override { - return "BytesXOR"; - } + static const char* kClassName() { return "BytesXOR"; } + static const char* kNickName() { return "bytesxor"; } + + const char* NickName() const override { return kNickName(); } + const char* Name() const override { return kClassName(); } void XOR(const Slice* existing_value, const Slice& value, std::string* new_value) const; diff --git a/utilities/merge_operators/max.cc b/utilities/merge_operators/max.cc index 2270c1f03..de4abfa6f 100644 --- a/utilities/merge_operators/max.cc +++ b/utilities/merge_operators/max.cc @@ -64,7 +64,10 @@ class MaxOperator : public MergeOperator { return true; } - const char* Name() const override { return "MaxOperator"; } + static const char* kClassName() { return "MaxOperator"; } + static const char* kNickName() { return "max"; } + const char* Name() const override { return kClassName(); } + const char* NickName() const override { return kNickName(); } }; } // end of anonymous namespace diff --git a/utilities/merge_operators/put.cc b/utilities/merge_operators/put.cc index 901d69e94..2133f1b57 100644 --- a/utilities/merge_operators/put.cc +++ b/utilities/merge_operators/put.cc @@ -48,7 +48,10 @@ class PutOperator : public MergeOperator { return true; } - const char* Name() const override { return "PutOperator"; } + static const char* kClassName() { return "PutOperator"; } + static const char* kNickName() { return "put_v1"; } + const char* Name() const override { return kClassName(); } + const char* NickName() const override { return kNickName(); } }; class PutOperatorV2 : public PutOperator { @@ -67,6 +70,9 @@ class PutOperatorV2 : public PutOperator { merge_out->existing_operand = merge_in.operand_list.back(); return true; } + + static const char* kNickName() { return "put"; } + const char* NickName() const override { return kNickName(); } }; } // end of anonymous namespace diff --git a/utilities/merge_operators/sortlist.cc b/utilities/merge_operators/sortlist.cc index 078b83e19..fae33e2fd 100644 --- a/utilities/merge_operators/sortlist.cc +++ b/utilities/merge_operators/sortlist.cc @@ -49,8 +49,6 @@ bool SortList::PartialMergeMulti(const Slice& /*key*/, return true; } -const char* SortList::Name() const { return "MergeSortOperator"; } - void SortList::MakeVector(std::vector& operand, Slice slice) const { do { const char* begin = slice.data_; diff --git a/utilities/merge_operators/sortlist.h b/utilities/merge_operators/sortlist.h index 5e08bd583..eaa4e76fb 100644 --- a/utilities/merge_operators/sortlist.h +++ b/utilities/merge_operators/sortlist.h @@ -27,7 +27,11 @@ class SortList : public MergeOperator { const std::deque& operand_list, std::string* new_value, Logger* logger) const override; - const char* Name() const override; + static const char* kClassName() { return "MergeSortOperator"; } + static const char* kNickName() { return "sortlist"; } + + const char* Name() const override { return kClassName(); } + const char* NickName() const override { return kNickName(); } void MakeVector(std::vector& operand, Slice slice) const; diff --git a/utilities/merge_operators/string_append/stringappend.cc b/utilities/merge_operators/string_append/stringappend.cc index cd963b5b1..c20d415e7 100644 --- a/utilities/merge_operators/string_append/stringappend.cc +++ b/utilities/merge_operators/string_append/stringappend.cc @@ -6,21 +6,36 @@ #include "stringappend.h" -#include #include -#include "rocksdb/slice.h" +#include + #include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/options_type.h" #include "utilities/merge_operators.h" namespace ROCKSDB_NAMESPACE { - +namespace { +static std::unordered_map + stringappend_merge_type_info = { +#ifndef ROCKSDB_LITE + {"delimiter", + {0, OptionType::kString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, +#endif // ROCKSDB_LITE +}; +} // namespace // Constructor: also specify the delimiter character. StringAppendOperator::StringAppendOperator(char delim_char) - : delim_(1, delim_char) {} + : delim_(1, delim_char) { + RegisterOptions("Delimiter", &delim_, &stringappend_merge_type_info); +} StringAppendOperator::StringAppendOperator(const std::string& delim) - : delim_(delim) {} + : delim_(delim) { + RegisterOptions("Delimiter", &delim_, &stringappend_merge_type_info); +} // Implementation for the merge operation (concatenates two strings) bool StringAppendOperator::Merge(const Slice& /*key*/, @@ -46,9 +61,6 @@ bool StringAppendOperator::Merge(const Slice& /*key*/, return true; } -const char* StringAppendOperator::Name() const { - return "StringAppendOperator"; -} std::shared_ptr MergeOperators::CreateStringAppendOperator() { return std::make_shared(','); diff --git a/utilities/merge_operators/string_append/stringappend.h b/utilities/merge_operators/string_append/stringappend.h index 98fc6c998..3c2bb1907 100644 --- a/utilities/merge_operators/string_append/stringappend.h +++ b/utilities/merge_operators/string_append/stringappend.h @@ -22,7 +22,10 @@ class StringAppendOperator : public AssociativeMergeOperator { std::string* new_value, Logger* logger) const override; - virtual const char* Name() const override; + static const char* kClassName() { return "StringAppendOperator"; } + static const char* kNickName() { return "stringappend"; } + virtual const char* Name() const override { return kClassName(); } + virtual const char* NickName() const override { return kNickName(); } private: std::string delim_; // The delimiter is inserted between elements diff --git a/utilities/merge_operators/string_append/stringappend2.cc b/utilities/merge_operators/string_append/stringappend2.cc index 699697c43..36cb9ee34 100644 --- a/utilities/merge_operators/string_append/stringappend2.cc +++ b/utilities/merge_operators/string_append/stringappend2.cc @@ -5,22 +5,38 @@ #include "stringappend2.h" +#include + #include #include -#include -#include "rocksdb/slice.h" #include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/options_type.h" #include "utilities/merge_operators.h" namespace ROCKSDB_NAMESPACE { +namespace { +static std::unordered_map + stringappend2_merge_type_info = { +#ifndef ROCKSDB_LITE + {"delimiter", + {0, OptionType::kString, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, +#endif // ROCKSDB_LITE +}; +} // namespace // Constructor: also specify the delimiter character. StringAppendTESTOperator::StringAppendTESTOperator(char delim_char) - : delim_(1, delim_char) {} + : delim_(1, delim_char) { + RegisterOptions("Delimiter", &delim_, &stringappend2_merge_type_info); +} StringAppendTESTOperator::StringAppendTESTOperator(const std::string& delim) - : delim_(delim) {} + : delim_(delim) { + RegisterOptions("Delimiter", &delim_, &stringappend2_merge_type_info); +} // Implementation for the merge operation (concatenates two strings) bool StringAppendTESTOperator::FullMergeV2( @@ -37,6 +53,7 @@ bool StringAppendTESTOperator::FullMergeV2( // Compute the space needed for the final result. size_t numBytes = 0; + for (auto it = merge_in.operand_list.begin(); it != merge_in.operand_list.end(); ++it) { numBytes += it->size() + delim_.size(); @@ -107,11 +124,6 @@ bool StringAppendTESTOperator::_AssocPartialMergeMulti( return true; } -const char* StringAppendTESTOperator::Name() const { - return "StringAppendTESTOperator"; -} - - std::shared_ptr MergeOperators::CreateStringAppendTESTOperator() { return std::make_shared(','); diff --git a/utilities/merge_operators/string_append/stringappend2.h b/utilities/merge_operators/string_append/stringappend2.h index 2d4b554f3..339c760bd 100644 --- a/utilities/merge_operators/string_append/stringappend2.h +++ b/utilities/merge_operators/string_append/stringappend2.h @@ -34,7 +34,10 @@ class StringAppendTESTOperator : public MergeOperator { std::string* new_value, Logger* logger) const override; - virtual const char* Name() const override; + static const char* kClassName() { return "StringAppendTESTOperator"; } + static const char* kNickName() { return "stringappendtest"; } + const char* Name() const override { return kClassName(); } + const char* NickName() const override { return kNickName(); } private: // A version of PartialMerge that actually performs "partial merging". diff --git a/utilities/merge_operators/uint64add.cc b/utilities/merge_operators/uint64add.cc index 3ef240928..d8e39615e 100644 --- a/utilities/merge_operators/uint64add.cc +++ b/utilities/merge_operators/uint64add.cc @@ -36,7 +36,10 @@ class UInt64AddOperator : public AssociativeMergeOperator { return true; // Return true always since corruption will be treated as 0 } - const char* Name() const override { return "UInt64AddOperator"; } + static const char* kClassName() { return "UInt64AddOperator"; } + static const char* kNickName() { return "uint64add"; } + const char* Name() const override { return kClassName(); } + const char* NickName() const override { return kNickName(); } private: // Takes the string and decodes it into a uint64_t diff --git a/utilities/ttl/db_ttl_impl.cc b/utilities/ttl/db_ttl_impl.cc index 917130ca1..c59cc93cc 100644 --- a/utilities/ttl/db_ttl_impl.cc +++ b/utilities/ttl/db_ttl_impl.cc @@ -13,9 +13,146 @@ #include "rocksdb/iterator.h" #include "rocksdb/system_clock.h" #include "rocksdb/utilities/db_ttl.h" +#include "rocksdb/utilities/object_registry.h" +#include "rocksdb/utilities/options_type.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { +static std::unordered_map ttl_merge_op_type_info = + {{"user_operator", + OptionTypeInfo::AsCustomSharedPtr( + 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)}}; + +TtlMergeOperator::TtlMergeOperator( + const std::shared_ptr& merge_op, SystemClock* clock) + : user_merge_op_(merge_op), clock_(clock) { + RegisterOptions("TtlMergeOptions", &user_merge_op_, &ttl_merge_op_type_info); +} + +bool TtlMergeOperator::FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + const uint32_t ts_len = DBWithTTLImpl::kTSLength; + if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) { + ROCKS_LOG_ERROR(merge_in.logger, + "Error: Could not remove timestamp from existing value."); + return false; + } + + // Extract time-stamp from each operand to be passed to user_merge_op_ + std::vector operands_without_ts; + for (const auto& operand : merge_in.operand_list) { + if (operand.size() < ts_len) { + ROCKS_LOG_ERROR(merge_in.logger, + "Error: Could not remove timestamp from operand value."); + return false; + } + operands_without_ts.push_back(operand); + operands_without_ts.back().remove_suffix(ts_len); + } + + // Apply the user merge operator (store result in *new_value) + bool good = true; + MergeOperationOutput user_merge_out(merge_out->new_value, + merge_out->existing_operand); + if (merge_in.existing_value) { + Slice existing_value_without_ts(merge_in.existing_value->data(), + merge_in.existing_value->size() - ts_len); + good = user_merge_op_->FullMergeV2( + MergeOperationInput(merge_in.key, &existing_value_without_ts, + operands_without_ts, merge_in.logger), + &user_merge_out); + } else { + good = user_merge_op_->FullMergeV2( + MergeOperationInput(merge_in.key, nullptr, operands_without_ts, + merge_in.logger), + &user_merge_out); + } + + // Return false if the user merge operator returned false + if (!good) { + return false; + } + + if (merge_out->existing_operand.data()) { + merge_out->new_value.assign(merge_out->existing_operand.data(), + merge_out->existing_operand.size()); + merge_out->existing_operand = Slice(nullptr, 0); + } + + // Augment the *new_value with the ttl time-stamp + int64_t curtime; + if (!clock_->GetCurrentTime(&curtime).ok()) { + ROCKS_LOG_ERROR( + merge_in.logger, + "Error: Could not get current time to be attached internally " + "to the new value."); + return false; + } else { + char ts_string[ts_len]; + EncodeFixed32(ts_string, (int32_t)curtime); + merge_out->new_value.append(ts_string, ts_len); + return true; + } +} + +bool TtlMergeOperator::PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const { + const uint32_t ts_len = DBWithTTLImpl::kTSLength; + std::deque operands_without_ts; + + for (const auto& operand : operand_list) { + if (operand.size() < ts_len) { + ROCKS_LOG_ERROR(logger, "Error: Could not remove timestamp from value."); + return false; + } + + operands_without_ts.push_back( + Slice(operand.data(), operand.size() - ts_len)); + } + + // Apply the user partial-merge operator (store result in *new_value) + assert(new_value); + if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value, + logger)) { + return false; + } + + // Augment the *new_value with the ttl time-stamp + int64_t curtime; + if (!clock_->GetCurrentTime(&curtime).ok()) { + ROCKS_LOG_ERROR( + logger, + "Error: Could not get current time to be attached internally " + "to the new value."); + return false; + } else { + char ts_string[ts_len]; + EncodeFixed32(ts_string, (int32_t)curtime); + new_value->append(ts_string, ts_len); + return true; + } +} + +Status TtlMergeOperator::PrepareOptions(const ConfigOptions& config_options) { + if (clock_ == nullptr) { + clock_ = config_options.env->GetSystemClock().get(); + } + return MergeOperator::PrepareOptions(config_options); +} + +Status TtlMergeOperator::ValidateOptions( + const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const { + if (user_merge_op_ == nullptr) { + return Status::InvalidArgument( + "UserMergeOperator required by TtlMergeOperator"); + } else if (clock_ == nullptr) { + return Status::InvalidArgument("SystemClock required by TtlMergeOperator"); + } else { + return MergeOperator::ValidateOptions(db_opts, cf_opts); + } +} void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, SystemClock* clock) { @@ -34,6 +171,139 @@ void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, } } +static std::unordered_map ttl_type_info = { + {"ttl", {0, OptionType::kInt32T}}, +}; + +static std::unordered_map ttl_cff_type_info = { + {"user_filter_factory", + OptionTypeInfo::AsCustomSharedPtr( + 0, OptionVerificationType::kByNameAllowFromNull, + OptionTypeFlags::kNone)}}; +static std::unordered_map user_cf_type_info = { + {"user_filter", + OptionTypeInfo::AsCustomRawPtr( + 0, OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)}}; + +TtlCompactionFilter::TtlCompactionFilter( + int32_t ttl, SystemClock* clock, const CompactionFilter* _user_comp_filter, + std::unique_ptr _user_comp_filter_from_factory) + : LayeredCompactionFilterBase(_user_comp_filter, + std::move(_user_comp_filter_from_factory)), + ttl_(ttl), + clock_(clock) { + RegisterOptions("TTL", &ttl_, &ttl_type_info); + RegisterOptions("UserFilter", &user_comp_filter_, &user_cf_type_info); +} + +bool TtlCompactionFilter::Filter(int level, const Slice& key, + const Slice& old_val, std::string* new_val, + bool* value_changed) const { + if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) { + return true; + } + if (user_comp_filter() == nullptr) { + return false; + } + assert(old_val.size() >= DBWithTTLImpl::kTSLength); + Slice old_val_without_ts(old_val.data(), + old_val.size() - DBWithTTLImpl::kTSLength); + if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val, + value_changed)) { + return true; + } + if (*value_changed) { + new_val->append(old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength, + DBWithTTLImpl::kTSLength); + } + return false; +} + +Status TtlCompactionFilter::PrepareOptions( + const ConfigOptions& config_options) { + if (clock_ == nullptr) { + clock_ = config_options.env->GetSystemClock().get(); + } + return LayeredCompactionFilterBase::PrepareOptions(config_options); +} + +Status TtlCompactionFilter::ValidateOptions( + const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const { + if (clock_ == nullptr) { + return Status::InvalidArgument( + "SystemClock required by TtlCompactionFilter"); + } else { + return LayeredCompactionFilterBase::ValidateOptions(db_opts, cf_opts); + } +} + +TtlCompactionFilterFactory::TtlCompactionFilterFactory( + int32_t ttl, SystemClock* clock, + std::shared_ptr comp_filter_factory) + : ttl_(ttl), clock_(clock), user_comp_filter_factory_(comp_filter_factory) { + RegisterOptions("UserOptions", &user_comp_filter_factory_, + &ttl_cff_type_info); + RegisterOptions("TTL", &ttl_, &ttl_type_info); +} + +std::unique_ptr +TtlCompactionFilterFactory::CreateCompactionFilter( + const CompactionFilter::Context& context) { + std::unique_ptr user_comp_filter_from_factory = + nullptr; + if (user_comp_filter_factory_) { + user_comp_filter_from_factory = + user_comp_filter_factory_->CreateCompactionFilter(context); + } + + return std::unique_ptr(new TtlCompactionFilter( + ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory))); +} + +Status TtlCompactionFilterFactory::PrepareOptions( + const ConfigOptions& config_options) { + if (clock_ == nullptr) { + clock_ = config_options.env->GetSystemClock().get(); + } + return CompactionFilterFactory::PrepareOptions(config_options); +} + +Status TtlCompactionFilterFactory::ValidateOptions( + const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const { + if (clock_ == nullptr) { + return Status::InvalidArgument( + "SystemClock required by TtlCompactionFilterFactory"); + } else { + return CompactionFilterFactory::ValidateOptions(db_opts, cf_opts); + } +} + +int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/) { + library.Register( + TtlMergeOperator::kClassName(), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new TtlMergeOperator(nullptr, nullptr)); + return guard->get(); + }); + library.Register( + TtlCompactionFilterFactory::kClassName(), + [](const std::string& /*uri*/, + std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new TtlCompactionFilterFactory(0, nullptr, nullptr)); + return guard->get(); + }); + library.Register( + TtlCompactionFilter::kClassName(), + [](const std::string& /*uri*/, + std::unique_ptr* /*guard*/, + std::string* /* errmsg */) { + return new TtlCompactionFilter(0, nullptr, nullptr); + }); + size_t num_types; + return static_cast(library.GetFactoryCount(&num_types)); +} // Open the db inside DBWithTTLImpl because options needs pointer to its ttl DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {} @@ -68,9 +338,15 @@ Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname, return s; } +void DBWithTTLImpl::RegisterTtlClasses() { + static std::once_flag once; + std::call_once(once, [&]() { + ObjectRegistry::Default()->AddLibrary("TTL", RegisterTtlObjects, ""); + }); +} + Status DBWithTTL::Open(const Options& options, const std::string& dbname, DBWithTTL** dbptr, int32_t ttl, bool read_only) { - DBOptions db_options(options); ColumnFamilyOptions cf_options(options); std::vector column_families; @@ -93,6 +369,7 @@ Status DBWithTTL::Open( const std::vector& column_families, std::vector* handles, DBWithTTL** dbptr, const std::vector& ttls, bool read_only) { + DBWithTTLImpl::RegisterTtlClasses(); if (ttls.size() != column_families.size()) { return Status::InvalidArgument( "ttls size has to be the same as number of column families"); @@ -128,6 +405,7 @@ Status DBWithTTL::Open( Status DBWithTTLImpl::CreateColumnFamilyWithTtl( const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle, int ttl) { + RegisterTtlClasses(); ColumnFamilyOptions sanitized_options = options; DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv()->GetSystemClock().get()); diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index ab3ff3729..8f53bc497 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -25,12 +25,15 @@ #endif namespace ROCKSDB_NAMESPACE { - +struct ConfigOptions; +class ObjectLibrary; +class ObjectRegistry; class DBWithTTLImpl : public DBWithTTL { public: static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, SystemClock* clock); + static void RegisterTtlClasses(); explicit DBWithTTLImpl(DB* db); virtual ~DBWithTTLImpl(); @@ -155,37 +158,24 @@ class TtlCompactionFilter : public LayeredCompactionFilterBase { TtlCompactionFilter(int32_t ttl, SystemClock* clock, const CompactionFilter* _user_comp_filter, std::unique_ptr - _user_comp_filter_from_factory = nullptr) - : LayeredCompactionFilterBase(_user_comp_filter, - std::move(_user_comp_filter_from_factory)), - ttl_(ttl), - clock_(clock) {} + _user_comp_filter_from_factory = nullptr); virtual bool Filter(int level, const Slice& key, const Slice& old_val, - std::string* new_val, bool* value_changed) const - override { - if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) { - return true; - } - if (user_comp_filter() == nullptr) { - return false; - } - assert(old_val.size() >= DBWithTTLImpl::kTSLength); - Slice old_val_without_ts(old_val.data(), - old_val.size() - DBWithTTLImpl::kTSLength); - if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val, - value_changed)) { + std::string* new_val, bool* value_changed) const override; + + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "TtlCompactionFilter"; } + bool IsInstanceOf(const std::string& name) const override { + if (name == "Delete By TTL") { return true; + } else { + return LayeredCompactionFilterBase::IsInstanceOf(name); } - if (*value_changed) { - new_val->append( - old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength, - DBWithTTLImpl::kTSLength); - } - return false; } - virtual const char* Name() const override { return "Delete By TTL"; } + Status PrepareOptions(const ConfigOptions& config_options) override; + Status ValidateOptions(const DBOptions& db_opts, + const ColumnFamilyOptions& cf_opts) const override; private: int32_t ttl_; @@ -196,30 +186,21 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory { public: TtlCompactionFilterFactory( int32_t ttl, SystemClock* clock, - std::shared_ptr comp_filter_factory) - : ttl_(ttl), - clock_(clock), - user_comp_filter_factory_(comp_filter_factory) {} - - virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& context) override { - std::unique_ptr user_comp_filter_from_factory = - nullptr; - if (user_comp_filter_factory_) { - user_comp_filter_from_factory = - user_comp_filter_factory_->CreateCompactionFilter(context); - } - - return std::unique_ptr(new TtlCompactionFilter( - ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory))); - } + std::shared_ptr comp_filter_factory); + std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override; void SetTtl(int32_t ttl) { ttl_ = ttl; } - virtual const char* Name() const override { - return "TtlCompactionFilterFactory"; + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "TtlCompactionFilterFactory"; } + Status PrepareOptions(const ConfigOptions& config_options) override; + Status ValidateOptions(const DBOptions& db_opts, + const ColumnFamilyOptions& cf_opts) const override; + const Customizable* Inner() const override { + return user_comp_filter_factory_.get(); } private: @@ -232,125 +213,38 @@ class TtlMergeOperator : public MergeOperator { public: explicit TtlMergeOperator(const std::shared_ptr& merge_op, - SystemClock* clock) - : user_merge_op_(merge_op), clock_(clock) { - assert(merge_op); - assert(clock); - } + SystemClock* clock); - virtual bool FullMergeV2(const MergeOperationInput& merge_in, - MergeOperationOutput* merge_out) const override { - const uint32_t ts_len = DBWithTTLImpl::kTSLength; - if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) { - ROCKS_LOG_ERROR(merge_in.logger, - "Error: Could not remove timestamp from existing value."); - return false; - } + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; - // Extract time-stamp from each operand to be passed to user_merge_op_ - std::vector operands_without_ts; - for (const auto& operand : merge_in.operand_list) { - if (operand.size() < ts_len) { - ROCKS_LOG_ERROR( - merge_in.logger, - "Error: Could not remove timestamp from operand value."); - return false; - } - operands_without_ts.push_back(operand); - operands_without_ts.back().remove_suffix(ts_len); - } + bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const override; - // Apply the user merge operator (store result in *new_value) - bool good = true; - MergeOperationOutput user_merge_out(merge_out->new_value, - merge_out->existing_operand); - if (merge_in.existing_value) { - Slice existing_value_without_ts(merge_in.existing_value->data(), - merge_in.existing_value->size() - ts_len); - good = user_merge_op_->FullMergeV2( - MergeOperationInput(merge_in.key, &existing_value_without_ts, - operands_without_ts, merge_in.logger), - &user_merge_out); - } else { - good = user_merge_op_->FullMergeV2( - MergeOperationInput(merge_in.key, nullptr, operands_without_ts, - merge_in.logger), - &user_merge_out); - } + static const char* kClassName() { return "TtlMergeOperator"; } - // Return false if the user merge operator returned false - if (!good) { - return false; - } - - if (merge_out->existing_operand.data()) { - merge_out->new_value.assign(merge_out->existing_operand.data(), - merge_out->existing_operand.size()); - merge_out->existing_operand = Slice(nullptr, 0); - } - - // Augment the *new_value with the ttl time-stamp - int64_t curtime; - if (!clock_->GetCurrentTime(&curtime).ok()) { - ROCKS_LOG_ERROR( - merge_in.logger, - "Error: Could not get current time to be attached internally " - "to the new value."); - return false; - } else { - char ts_string[ts_len]; - EncodeFixed32(ts_string, (int32_t)curtime); - merge_out->new_value.append(ts_string, ts_len); + const char* Name() const override { return kClassName(); } + bool IsInstanceOf(const std::string& name) const override { + if (name == "Merge By TTL") { return true; - } - } - - virtual bool PartialMergeMulti(const Slice& key, - const std::deque& operand_list, - std::string* new_value, Logger* logger) const - override { - const uint32_t ts_len = DBWithTTLImpl::kTSLength; - std::deque operands_without_ts; - - for (const auto& operand : operand_list) { - if (operand.size() < ts_len) { - ROCKS_LOG_ERROR(logger, - "Error: Could not remove timestamp from value."); - return false; - } - - operands_without_ts.push_back( - Slice(operand.data(), operand.size() - ts_len)); - } - - // Apply the user partial-merge operator (store result in *new_value) - assert(new_value); - if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value, - logger)) { - return false; - } - - // Augment the *new_value with the ttl time-stamp - int64_t curtime; - if (!clock_->GetCurrentTime(&curtime).ok()) { - ROCKS_LOG_ERROR( - logger, - "Error: Could not get current time to be attached internally " - "to the new value."); - return false; } else { - char ts_string[ts_len]; - EncodeFixed32(ts_string, (int32_t)curtime); - new_value->append(ts_string, ts_len); - return true; + return MergeOperator::IsInstanceOf(name); } } - virtual const char* Name() const override { return "Merge By TTL"; } + Status PrepareOptions(const ConfigOptions& config_options) override; + Status ValidateOptions(const DBOptions& db_opts, + const ColumnFamilyOptions& cf_opts) const override; + const Customizable* Inner() const override { return user_merge_op_.get(); } private: std::shared_ptr user_merge_op_; SystemClock* clock_; }; +extern "C" { +int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/); +} // extern "C" + } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index 8e60fbc76..c657dfe2b 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -7,10 +7,16 @@ #include #include + #include "rocksdb/compaction_filter.h" +#include "rocksdb/convenience.h" +#include "rocksdb/merge_operator.h" #include "rocksdb/utilities/db_ttl.h" +#include "rocksdb/utilities/object_registry.h" #include "test_util/testharness.h" #include "util/string_util.h" +#include "utilities/merge_operators/bytesxor.h" +#include "utilities/ttl/db_ttl_impl.h" #ifndef OS_WIN #include #endif @@ -719,6 +725,171 @@ TEST_F(TtlTest, DeleteRangeTest) { CloseTtl(); } +class DummyFilter : public CompactionFilter { + public: + bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, + std::string* /*new_value*/, + bool* /*value_changed*/) const override { + return false; + } + + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "DummyFilter"; } +}; + +class DummyFilterFactory : public CompactionFilterFactory { + public: + const char* Name() const override { return kClassName(); } + static const char* kClassName() { return "DummyFilterFactory"; } + + std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context&) override { + std::unique_ptr f(new DummyFilter()); + return f; + } +}; + +static int RegisterTestObjects(ObjectLibrary& library, + const std::string& /*arg*/) { + library.Register( + "DummyFilter", [](const std::string& /*uri*/, + std::unique_ptr* /*guard*/, + std::string* /* errmsg */) { + static DummyFilter dummy; + return &dummy; + }); + library.Register( + "DummyFilterFactory", [](const std::string& /*uri*/, + std::unique_ptr* guard, + std::string* /* errmsg */) { + guard->reset(new DummyFilterFactory()); + return guard->get(); + }); + return 2; +} + +class TtlOptionsTest : public testing::Test { + public: + TtlOptionsTest() { + config_options_.registry->AddLibrary("RegisterTtlObjects", + RegisterTtlObjects, ""); + config_options_.registry->AddLibrary("RegisterTtlTestObjects", + RegisterTestObjects, ""); + } + ConfigOptions config_options_; +}; + +TEST_F(TtlOptionsTest, LoadTtlCompactionFilter) { + const CompactionFilter* filter = nullptr; + + ASSERT_OK(CompactionFilter::CreateFromString( + config_options_, TtlCompactionFilter::kClassName(), &filter)); + ASSERT_NE(filter, nullptr); + ASSERT_STREQ(filter->Name(), TtlCompactionFilter::kClassName()); + auto ttl = filter->GetOptions("TTL"); + ASSERT_NE(ttl, nullptr); + ASSERT_EQ(*ttl, 0); + ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions())); + delete filter; + filter = nullptr; + + ASSERT_OK(CompactionFilter::CreateFromString( + config_options_, "id=TtlCompactionFilter; ttl=123", &filter)); + ASSERT_NE(filter, nullptr); + ttl = filter->GetOptions("TTL"); + ASSERT_NE(ttl, nullptr); + ASSERT_EQ(*ttl, 123); + ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions())); + delete filter; + filter = nullptr; + + ASSERT_OK(CompactionFilter::CreateFromString( + config_options_, + "id=TtlCompactionFilter; ttl=456; user_filter=DummyFilter;", &filter)); + ASSERT_NE(filter, nullptr); + auto inner = filter->CheckedCast(); + ASSERT_NE(inner, nullptr); + ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions())); + std::string mismatch; + std::string opts_str = filter->ToString(config_options_); + const CompactionFilter* copy = nullptr; + ASSERT_OK( + CompactionFilter::CreateFromString(config_options_, opts_str, ©)); + ASSERT_TRUE(filter->AreEquivalent(config_options_, copy, &mismatch)); + delete filter; + delete copy; +} + +TEST_F(TtlOptionsTest, LoadTtlCompactionFilterFactory) { + std::shared_ptr cff; + + ASSERT_OK(CompactionFilterFactory::CreateFromString( + config_options_, TtlCompactionFilterFactory::kClassName(), &cff)); + ASSERT_NE(cff.get(), nullptr); + ASSERT_STREQ(cff->Name(), TtlCompactionFilterFactory::kClassName()); + auto ttl = cff->GetOptions("TTL"); + ASSERT_NE(ttl, nullptr); + ASSERT_EQ(*ttl, 0); + ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions())); + + ASSERT_OK(CompactionFilterFactory::CreateFromString( + config_options_, "id=TtlCompactionFilterFactory; ttl=123", &cff)); + ASSERT_NE(cff.get(), nullptr); + ASSERT_STREQ(cff->Name(), TtlCompactionFilterFactory::kClassName()); + ttl = cff->GetOptions("TTL"); + ASSERT_NE(ttl, nullptr); + ASSERT_EQ(*ttl, 123); + ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions())); + + ASSERT_OK(CompactionFilterFactory::CreateFromString( + config_options_, + "id=TtlCompactionFilterFactory; ttl=456; " + "user_filter_factory=DummyFilterFactory;", + &cff)); + ASSERT_NE(cff.get(), nullptr); + auto filter = cff->CreateCompactionFilter(CompactionFilter::Context()); + ASSERT_NE(filter.get(), nullptr); + auto ttlf = filter->CheckedCast(); + ASSERT_EQ(filter.get(), ttlf); + auto user = filter->CheckedCast(); + ASSERT_NE(user, nullptr); + ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions())); + + std::string opts_str = cff->ToString(config_options_); + std::string mismatch; + std::shared_ptr copy; + ASSERT_OK(CompactionFilterFactory::CreateFromString(config_options_, opts_str, + ©)); + ASSERT_TRUE(cff->AreEquivalent(config_options_, copy.get(), &mismatch)); +} + +TEST_F(TtlOptionsTest, LoadTtlMergeOperator) { + std::shared_ptr mo; + + config_options_.invoke_prepare_options = false; + ASSERT_OK(MergeOperator::CreateFromString( + config_options_, TtlMergeOperator::kClassName(), &mo)); + ASSERT_NE(mo.get(), nullptr); + ASSERT_STREQ(mo->Name(), TtlMergeOperator::kClassName()); + ASSERT_NOK(mo->ValidateOptions(DBOptions(), ColumnFamilyOptions())); + + config_options_.invoke_prepare_options = true; + ASSERT_OK(MergeOperator::CreateFromString( + config_options_, "id=TtlMergeOperator; user_operator=bytesxor", &mo)); + ASSERT_NE(mo.get(), nullptr); + ASSERT_STREQ(mo->Name(), TtlMergeOperator::kClassName()); + ASSERT_OK(mo->ValidateOptions(DBOptions(), ColumnFamilyOptions())); + auto ttl_mo = mo->CheckedCast(); + ASSERT_EQ(mo.get(), ttl_mo); + auto user = ttl_mo->CheckedCast(); + ASSERT_NE(user, nullptr); + + std::string mismatch; + std::string opts_str = mo->ToString(config_options_); + std::shared_ptr copy; + ASSERT_OK(MergeOperator::CreateFromString(config_options_, opts_str, ©)); + ASSERT_TRUE(mo->AreEquivalent(config_options_, copy.get(), &mismatch)); +} } // namespace ROCKSDB_NAMESPACE // A black-box test for the ttl wrapper around rocksdb