garbage collect tombstones in merge operator

Summary:
Remove cassandra tombstone when reaching the max compaction level (full merge). if all columns collected key will be removed in next compaction via compaction filter
Closes https://github.com/facebook/rocksdb/pull/2791

Reviewed By: sagar0

Differential Revision: D5722465

Pulled By: wpc

fbshipit-source-id: 61e9898a5686551653a16383255aeaab3197e65e
main
Pengchao Wang 7 years ago committed by Facebook Github Bot
parent 26ac24f199
commit 825a22c00c
  1. 1
      java/CMakeLists.txt
  2. 17
      java/rocksjni/cassandra_value_operator.cc
  3. 6
      java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java
  4. 1
      src.mk
  5. 7
      utilities/cassandra/cassandra_compaction_filter.cc
  6. 8
      utilities/cassandra/cassandra_format_test.cc
  7. 68
      utilities/cassandra/cassandra_functional_test.cc
  8. 28
      utilities/cassandra/format.cc
  9. 9
      utilities/cassandra/format.h
  10. 6
      utilities/cassandra/merge_operator.cc
  11. 6
      utilities/cassandra/merge_operator.h
  12. 10
      utilities/cassandra/test_utils.cc
  13. 3
      utilities/cassandra/test_utils.h
  14. 3
      utilities/merge_operators.h

@ -25,6 +25,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/ratelimiterjni.cc rocksjni/ratelimiterjni.cc
rocksjni/remove_emptyvalue_compactionfilterjni.cc rocksjni/remove_emptyvalue_compactionfilterjni.cc
rocksjni/cassandra_compactionfilterjni.cc rocksjni/cassandra_compactionfilterjni.cc
rocksjni/cassandra_value_operator.cc
rocksjni/restorejni.cc rocksjni/restorejni.cc
rocksjni/rocksdb_exception_test.cc rocksjni/rocksdb_exception_test.cc
rocksjni/rocksjni.cc rocksjni/rocksjni.cc

@ -23,13 +23,14 @@
/* /*
* Class: org_rocksdb_CassandraValueMergeOperator * Class: org_rocksdb_CassandraValueMergeOperator
* Method: newSharedCassandraValueMergeOperator * Method: newSharedCassandraValueMergeOperator
* Signature: ()J * Signature: (I)J
*/ */
jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeOperator jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeOperator(
(JNIEnv* env, jclass jclazz) { JNIEnv* env, jclass jclazz, jint gcGracePeriodInSeconds) {
auto* sptr_string_append_op = new std::shared_ptr<rocksdb::MergeOperator>( auto* op = new std::shared_ptr<rocksdb::MergeOperator>(
rocksdb::CassandraValueMergeOperator::CreateSharedInstance()); new rocksdb::cassandra::CassandraValueMergeOperator(
return reinterpret_cast<jlong>(sptr_string_append_op); gcGracePeriodInSeconds));
return reinterpret_cast<jlong>(op);
} }
/* /*
@ -39,7 +40,7 @@ jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeO
*/ */
void Java_org_rocksdb_CassandraValueMergeOperator_disposeInternal( void Java_org_rocksdb_CassandraValueMergeOperator_disposeInternal(
JNIEnv* env, jobject jobj, jlong jhandle) { JNIEnv* env, jobject jobj, jlong jhandle) {
auto* sptr_string_append_op = auto* op =
reinterpret_cast<std::shared_ptr<rocksdb::MergeOperator>*>(jhandle); reinterpret_cast<std::shared_ptr<rocksdb::MergeOperator>*>(jhandle);
delete sptr_string_append_op; // delete std::shared_ptr delete op;
} }

@ -10,11 +10,11 @@ package org.rocksdb;
* values. * values.
*/ */
public class CassandraValueMergeOperator extends MergeOperator { public class CassandraValueMergeOperator extends MergeOperator {
public CassandraValueMergeOperator() { public CassandraValueMergeOperator(int gcGracePeriodInSeconds) {
super(newSharedCassandraValueMergeOperator()); super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds));
} }
private native static long newSharedCassandraValueMergeOperator(); private native static long newSharedCassandraValueMergeOperator(int gcGracePeriodInSeconds);
@Override protected final native void disposeInternal(final long handle); @Override protected final native void disposeInternal(final long handle);
} }

@ -383,6 +383,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/ratelimiterjni.cc \ java/rocksjni/ratelimiterjni.cc \
java/rocksjni/remove_emptyvalue_compactionfilterjni.cc \ java/rocksjni/remove_emptyvalue_compactionfilterjni.cc \
java/rocksjni/cassandra_compactionfilterjni.cc \ java/rocksjni/cassandra_compactionfilterjni.cc \
java/rocksjni/cassandra_value_operator.cc \
java/rocksjni/restorejni.cc \ java/rocksjni/restorejni.cc \
java/rocksjni/rocksjni.cc \ java/rocksjni/rocksjni.cc \
java/rocksjni/rocksdb_exception_test.cc \ java/rocksjni/rocksdb_exception_test.cc \

@ -27,9 +27,10 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
bool value_changed = false; bool value_changed = false;
RowValue row_value = RowValue::Deserialize( RowValue row_value = RowValue::Deserialize(
existing_value.data(), existing_value.size()); existing_value.data(), existing_value.size());
RowValue compacted = purge_ttl_on_expiration_ ? RowValue compacted =
row_value.PurgeTtl(&value_changed) : purge_ttl_on_expiration_
row_value.ExpireTtl(&value_changed); ? row_value.RemoveExpiredColumns(&value_changed)
: row_value.ConvertExpiredColumnsToTombstones(&value_changed);
if(compacted.Empty()) { if(compacted.Empty()) {
return Decision::kRemove; return Decision::kRemove;

@ -311,14 +311,14 @@ TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) {
}); });
bool changed = false; bool changed = false;
auto purged = row_value.PurgeTtl(&changed); auto purged = row_value.RemoveExpiredColumns(&changed);
EXPECT_TRUE(changed); EXPECT_TRUE(changed);
EXPECT_EQ(purged.columns_.size(), 3); EXPECT_EQ(purged.columns_.size(), 3);
VerifyRowValueColumns(purged.columns_, 0, kColumn, 0, ToMicroSeconds(now)); VerifyRowValueColumns(purged.columns_, 0, kColumn, 0, ToMicroSeconds(now));
VerifyRowValueColumns(purged.columns_, 1, kExpiringColumn, 2, ToMicroSeconds(now)); VerifyRowValueColumns(purged.columns_, 1, kExpiringColumn, 2, ToMicroSeconds(now));
VerifyRowValueColumns(purged.columns_, 2, kTombstone, 3, ToMicroSeconds(now)); VerifyRowValueColumns(purged.columns_, 2, kTombstone, 3, ToMicroSeconds(now));
purged.PurgeTtl(&changed); purged.RemoveExpiredColumns(&changed);
EXPECT_FALSE(changed); EXPECT_FALSE(changed);
} }
@ -333,7 +333,7 @@ TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) {
}); });
bool changed = false; bool changed = false;
auto compacted = row_value.ExpireTtl(&changed); auto compacted = row_value.ConvertExpiredColumnsToTombstones(&changed);
EXPECT_TRUE(changed); EXPECT_TRUE(changed);
EXPECT_EQ(compacted.columns_.size(), 4); EXPECT_EQ(compacted.columns_.size(), 4);
VerifyRowValueColumns(compacted.columns_, 0, kColumn, 0, ToMicroSeconds(now)); VerifyRowValueColumns(compacted.columns_, 0, kColumn, 0, ToMicroSeconds(now));
@ -341,7 +341,7 @@ TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) {
VerifyRowValueColumns(compacted.columns_, 2, kExpiringColumn, 2, ToMicroSeconds(now)); VerifyRowValueColumns(compacted.columns_, 2, kExpiringColumn, 2, ToMicroSeconds(now));
VerifyRowValueColumns(compacted.columns_, 3, kTombstone, 3, ToMicroSeconds(now)); VerifyRowValueColumns(compacted.columns_, 3, kTombstone, 3, ToMicroSeconds(now));
compacted.ExpireTtl(&changed); compacted.ConvertExpiredColumnsToTombstones(&changed);
EXPECT_FALSE(changed); EXPECT_FALSE(changed);
} }
} // namespace cassandra } // namespace cassandra

@ -112,7 +112,7 @@ public:
DB* db; DB* db;
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.merge_operator.reset(new CassandraValueMergeOperator()); options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_));
auto* cf_factory = new TestCompactionFilterFactory(purge_ttl_on_expiration_); auto* cf_factory = new TestCompactionFilterFactory(purge_ttl_on_expiration_);
options.compaction_filter_factory.reset(cf_factory); options.compaction_filter_factory.reset(cf_factory);
EXPECT_OK(DB::Open(options, kDbName, &db)); EXPECT_OK(DB::Open(options, kDbName, &db));
@ -120,29 +120,31 @@ public:
} }
bool purge_ttl_on_expiration_ = false; bool purge_ttl_on_expiration_ = false;
int32_t gc_grace_period_in_seconds_ = 100;
}; };
// THE TEST CASES BEGIN HERE // THE TEST CASES BEGIN HERE
TEST_F(CassandraFunctionalTest, SimpleMergeTest) { TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
CassandraStore store(OpenDb()); CassandraStore store(OpenDb());
int64_t now = time(nullptr);
store.Append("k1", CreateTestRowValue({ store.Append("k1", CreateTestRowValue({
std::make_tuple(kTombstone, 0, 5), std::make_tuple(kTombstone, 0, ToMicroSeconds(now + 5)),
std::make_tuple(kColumn, 1, 8), std::make_tuple(kColumn, 1, ToMicroSeconds(now + 8)),
std::make_tuple(kExpiringColumn, 2, 5), std::make_tuple(kExpiringColumn, 2, ToMicroSeconds(now + 5)),
})); }));
store.Append("k1",CreateTestRowValue({ store.Append("k1",CreateTestRowValue({
std::make_tuple(kColumn, 0, 2), std::make_tuple(kColumn, 0, ToMicroSeconds(now + 2)),
std::make_tuple(kExpiringColumn, 1, 5), std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now + 5)),
std::make_tuple(kTombstone, 2, 7), std::make_tuple(kTombstone, 2, ToMicroSeconds(now + 7)),
std::make_tuple(kExpiringColumn, 7, 17), std::make_tuple(kExpiringColumn, 7, ToMicroSeconds(now + 17)),
})); }));
store.Append("k1", CreateTestRowValue({ store.Append("k1", CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, 6), std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now + 6)),
std::make_tuple(kTombstone, 1, 5), std::make_tuple(kTombstone, 1, ToMicroSeconds(now + 5)),
std::make_tuple(kColumn, 2, 4), std::make_tuple(kColumn, 2, ToMicroSeconds(now + 4)),
std::make_tuple(kTombstone, 11, 11), std::make_tuple(kTombstone, 11, ToMicroSeconds(now + 11)),
})); }));
auto ret = store.Get("k1"); auto ret = store.Get("k1");
@ -150,11 +152,11 @@ TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
ASSERT_TRUE(std::get<0>(ret)); ASSERT_TRUE(std::get<0>(ret));
RowValue& merged = std::get<1>(ret); RowValue& merged = std::get<1>(ret);
EXPECT_EQ(merged.columns_.size(), 5); EXPECT_EQ(merged.columns_.size(), 5);
VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6); VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, ToMicroSeconds(now + 6));
VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8); VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, ToMicroSeconds(now + 8));
VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7); VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, ToMicroSeconds(now + 7));
VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17); VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, ToMicroSeconds(now + 17));
VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11); VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, ToMicroSeconds(now + 11));
} }
TEST_F(CassandraFunctionalTest, TEST_F(CassandraFunctionalTest,
@ -242,6 +244,38 @@ TEST_F(CassandraFunctionalTest,
ASSERT_FALSE(std::get<0>(store.Get("k1"))); ASSERT_FALSE(std::get<0>(store.Get("k1")));
} }
TEST_F(CassandraFunctionalTest,
CompactionShouldRemoveTombstoneExceedingGCGracePeriod) {
purge_ttl_on_expiration_ = true;
CassandraStore store(OpenDb());
int64_t now = time(nullptr);
store.Append("k1", CreateTestRowValue({
std::make_tuple(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)),
std::make_tuple(kColumn, 1, ToMicroSeconds(now))
}));
store.Append("k2", CreateTestRowValue({
std::make_tuple(kColumn, 0, ToMicroSeconds(now))
}));
store.Flush();
store.Append("k1",CreateTestRowValue({
std::make_tuple(kColumn, 1, ToMicroSeconds(now)),
}));
store.Flush();
store.Compact();
auto ret = store.Get("k1");
ASSERT_TRUE(std::get<0>(ret));
RowValue& gced = std::get<1>(ret);
EXPECT_EQ(gced.columns_.size(), 1);
VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now));
}
} // namespace cassandra } // namespace cassandra
} // namespace rocksdb } // namespace rocksdb

@ -176,6 +176,13 @@ void Tombstone::Serialize(std::string* dest) const {
rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest); rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
} }
bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const {
auto local_deleted_at = std::chrono::time_point<std::chrono::system_clock>(
std::chrono::seconds(local_deletion_time_));
auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds);
return local_deleted_at + gc_grace_period < std::chrono::system_clock::now();
}
std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src, std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src,
std::size_t offset) { std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset); int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
@ -231,7 +238,7 @@ void RowValue::Serialize(std::string* dest) const {
} }
} }
RowValue RowValue::PurgeTtl(bool* changed) const { RowValue RowValue::RemoveExpiredColumns(bool* changed) const {
*changed = false; *changed = false;
Columns new_columns; Columns new_columns;
for (auto& column : columns_) { for (auto& column : columns_) {
@ -250,7 +257,7 @@ RowValue RowValue::PurgeTtl(bool* changed) const {
return RowValue(std::move(new_columns), last_modified_time_); return RowValue(std::move(new_columns), last_modified_time_);
} }
RowValue RowValue::ExpireTtl(bool* changed) const { RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const {
*changed = false; *changed = false;
Columns new_columns; Columns new_columns;
for (auto& column : columns_) { for (auto& column : columns_) {
@ -270,6 +277,23 @@ RowValue RowValue::ExpireTtl(bool* changed) const {
return RowValue(std::move(new_columns), last_modified_time_); return RowValue(std::move(new_columns), last_modified_time_);
} }
RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const {
Columns new_columns;
for (auto& column : columns_) {
if (column->Mask() == ColumnTypeMask::DELETION_MASK) {
std::shared_ptr<Tombstone> tombstone =
std::static_pointer_cast<Tombstone>(column);
if (tombstone->Collectable(gc_grace_period)) {
continue;
}
}
new_columns.push_back(column);
}
return RowValue(std::move(new_columns), last_modified_time_);
}
bool RowValue::Empty() const { bool RowValue::Empty() const {
return columns_.empty(); return columns_.empty();
} }

@ -115,7 +115,7 @@ public:
virtual int64_t Timestamp() const override; virtual int64_t Timestamp() const override;
virtual std::size_t Size() const override; virtual std::size_t Size() const override;
virtual void Serialize(std::string* dest) const override; virtual void Serialize(std::string* dest) const override;
bool Collectable(int32_t gc_grace_period) const;
static std::shared_ptr<Tombstone> Deserialize(const char* src, static std::shared_ptr<Tombstone> Deserialize(const char* src,
std::size_t offset); std::size_t offset);
@ -163,8 +163,9 @@ public:
// otherwise it returns the max timestamp of containing columns. // otherwise it returns the max timestamp of containing columns.
int64_t LastModifiedTime() const; int64_t LastModifiedTime() const;
void Serialize(std::string* dest) const; void Serialize(std::string* dest) const;
RowValue PurgeTtl(bool* changed) const; RowValue RemoveExpiredColumns(bool* changed) const;
RowValue ExpireTtl(bool* changed) const; RowValue ConvertExpiredColumnsToTombstones(bool* changed) const;
RowValue RemoveTombstones(int32_t gc_grace_period) const;
bool Empty() const; bool Empty() const;
static RowValue Deserialize(const char* src, std::size_t size); static RowValue Deserialize(const char* src, std::size_t size);
@ -188,6 +189,8 @@ private:
CassandraFunctionalTest, CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn); CassandraFunctionalTest, CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn);
FRIEND_TEST( FRIEND_TEST(
CassandraFunctionalTest, CompactionShouldRemoveRowWhenAllColumnExpiredIfPurgeTtlIsOn); CassandraFunctionalTest, CompactionShouldRemoveRowWhenAllColumnExpiredIfPurgeTtlIsOn);
FRIEND_TEST(CassandraFunctionalTest,
CompactionShouldRemoveTombstoneExceedingGCGracePeriod);
}; };
} // namepsace cassandrda } // namepsace cassandrda

@ -41,6 +41,7 @@ bool CassandraValueMergeOperator::FullMergeV2(
} }
RowValue merged = RowValue::Merge(std::move(row_values)); RowValue merged = RowValue::Merge(std::move(row_values));
merged = merged.RemoveTombstones(gc_grace_period_in_seconds_);
merge_out->new_value.reserve(merged.Size()); merge_out->new_value.reserve(merged.Size());
merged.Serialize(&(merge_out->new_value)); merged.Serialize(&(merge_out->new_value));
@ -72,9 +73,4 @@ const char* CassandraValueMergeOperator::Name() const {
} // namespace cassandra } // namespace cassandra
std::shared_ptr<MergeOperator>
MergeOperators::CreateCassandraMergeOperator() {
return std::make_shared<rocksdb::cassandra::CassandraValueMergeOperator>();
}
} // namespace rocksdb } // namespace rocksdb

@ -15,7 +15,8 @@ namespace cassandra {
*/ */
class CassandraValueMergeOperator : public MergeOperator { class CassandraValueMergeOperator : public MergeOperator {
public: public:
static std::shared_ptr<MergeOperator> CreateSharedInstance(); explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds)
: gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
virtual bool FullMergeV2(const MergeOperationInput& merge_in, virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override; MergeOperationOutput* merge_out) const override;
@ -28,6 +29,9 @@ public:
virtual const char* Name() const override; virtual const char* Name() const override;
virtual bool AllowSingleOperand() const override { return true; } virtual bool AllowSingleOperand() const override { return true; }
private:
int32_t gc_grace_period_in_seconds_;
}; };
} // namespace cassandra } // namespace cassandra
} // namespace rocksdb } // namespace rocksdb

@ -9,7 +9,6 @@ namespace rocksdb {
namespace cassandra { namespace cassandra {
const char kData[] = {'d', 'a', 't', 'a'}; const char kData[] = {'d', 'a', 't', 'a'};
const char kExpiringData[] = {'e', 'd', 'a', 't', 'a'}; const char kExpiringData[] = {'e', 'd', 'a', 't', 'a'};
const int32_t kLocalDeletionTime = 1;
const int32_t kTtl = 86400; const int32_t kTtl = 86400;
const int8_t kColumn = 0; const int8_t kColumn = 0;
const int8_t kTombstone = 1; const int8_t kTombstone = 1;
@ -19,8 +18,8 @@ std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask,
int8_t index, int8_t index,
int64_t timestamp) { int64_t timestamp) {
if ((mask & ColumnTypeMask::DELETION_MASK) != 0) { if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
return std::shared_ptr<Tombstone>(new Tombstone( return std::shared_ptr<Tombstone>(
mask, index, kLocalDeletionTime, timestamp)); new Tombstone(mask, index, ToSeconds(timestamp), timestamp));
} else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) { } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
return std::shared_ptr<ExpiringColumn>(new ExpiringColumn( return std::shared_ptr<ExpiringColumn>(new ExpiringColumn(
mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl)); mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl));
@ -44,7 +43,7 @@ RowValue CreateTestRowValue(
} }
RowValue CreateRowTombstone(int64_t timestamp) { RowValue CreateRowTombstone(int64_t timestamp) {
return RowValue(kLocalDeletionTime, timestamp); return RowValue(ToSeconds(timestamp), timestamp);
} }
void VerifyRowValueColumns( void VerifyRowValueColumns(
@ -63,5 +62,8 @@ int64_t ToMicroSeconds(int64_t seconds) {
return seconds * (int64_t)1000000; return seconds * (int64_t)1000000;
} }
int32_t ToSeconds(int64_t microseconds) {
return (int32_t)(microseconds / (int64_t)1000000);
}
} }
} }

@ -13,7 +13,6 @@ namespace rocksdb {
namespace cassandra { namespace cassandra {
extern const char kData[]; extern const char kData[];
extern const char kExpiringData[]; extern const char kExpiringData[];
extern const int32_t kLocalDeletionTime;
extern const int32_t kTtl; extern const int32_t kTtl;
extern const int8_t kColumn; extern const int8_t kColumn;
extern const int8_t kTombstone; extern const int8_t kTombstone;
@ -38,6 +37,6 @@ void VerifyRowValueColumns(
); );
int64_t ToMicroSeconds(int64_t seconds); int64_t ToMicroSeconds(int64_t seconds);
int32_t ToSeconds(int64_t microseconds);
} }
} }

@ -21,7 +21,6 @@ class MergeOperators {
static std::shared_ptr<MergeOperator> CreateStringAppendOperator(); static std::shared_ptr<MergeOperator> CreateStringAppendOperator();
static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator(); static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator();
static std::shared_ptr<MergeOperator> CreateMaxOperator(); static std::shared_ptr<MergeOperator> CreateMaxOperator();
static std::shared_ptr<MergeOperator> CreateCassandraMergeOperator();
// Will return a different merge operator depending on the string. // Will return a different merge operator depending on the string.
// TODO: Hook the "name" up to the actual Name() of the MergeOperators? // TODO: Hook the "name" up to the actual Name() of the MergeOperators?
@ -39,8 +38,6 @@ class MergeOperators {
return CreateStringAppendTESTOperator(); return CreateStringAppendTESTOperator();
} else if (name == "max") { } else if (name == "max") {
return CreateMaxOperator(); return CreateMaxOperator();
} else if (name == "cassandra") {
return CreateCassandraMergeOperator();
} else { } else {
// Empty or unknown, just return nullptr // Empty or unknown, just return nullptr
return nullptr; return nullptr;

Loading…
Cancel
Save