diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index d67896c2c..2cf80b473 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -25,6 +25,7 @@ set(JNI_NATIVE_SOURCES rocksjni/ratelimiterjni.cc rocksjni/remove_emptyvalue_compactionfilterjni.cc rocksjni/cassandra_compactionfilterjni.cc + rocksjni/cassandra_value_operator.cc rocksjni/restorejni.cc rocksjni/rocksdb_exception_test.cc rocksjni/rocksjni.cc diff --git a/java/rocksjni/cassandra_value_operator.cc b/java/rocksjni/cassandra_value_operator.cc index aa58eccc2..5ba13cfe5 100644 --- a/java/rocksjni/cassandra_value_operator.cc +++ b/java/rocksjni/cassandra_value_operator.cc @@ -23,13 +23,14 @@ /* * Class: org_rocksdb_CassandraValueMergeOperator * Method: newSharedCassandraValueMergeOperator - * Signature: ()J + * Signature: (I)J */ -jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeOperator -(JNIEnv* env, jclass jclazz) { - auto* sptr_string_append_op = new std::shared_ptr( - rocksdb::CassandraValueMergeOperator::CreateSharedInstance()); - return reinterpret_cast(sptr_string_append_op); +jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeOperator( + JNIEnv* env, jclass jclazz, jint gcGracePeriodInSeconds) { + auto* op = new std::shared_ptr( + new rocksdb::cassandra::CassandraValueMergeOperator( + gcGracePeriodInSeconds)); + return reinterpret_cast(op); } /* @@ -39,7 +40,7 @@ jlong Java_org_rocksdb_CassandraValueMergeOperator_newSharedCassandraValueMergeO */ void Java_org_rocksdb_CassandraValueMergeOperator_disposeInternal( JNIEnv* env, jobject jobj, jlong jhandle) { - auto* sptr_string_append_op = - reinterpret_cast* >(jhandle); - delete sptr_string_append_op; // delete std::shared_ptr + auto* op = + reinterpret_cast*>(jhandle); + delete op; } diff --git a/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java b/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java index a09556a2b..310a6e8ff 100644 --- a/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java +++ b/java/src/main/java/org/rocksdb/CassandraValueMergeOperator.java @@ -10,11 +10,11 @@ package org.rocksdb; * values. */ public class CassandraValueMergeOperator extends MergeOperator { - public CassandraValueMergeOperator() { - super(newSharedCassandraValueMergeOperator()); + public CassandraValueMergeOperator(int gcGracePeriodInSeconds) { + super(newSharedCassandraValueMergeOperator(gcGracePeriodInSeconds)); } - private native static long newSharedCassandraValueMergeOperator(); + private native static long newSharedCassandraValueMergeOperator(int gcGracePeriodInSeconds); @Override protected final native void disposeInternal(final long handle); } diff --git a/src.mk b/src.mk index 90fecee5b..443070ea2 100644 --- a/src.mk +++ b/src.mk @@ -383,6 +383,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/ratelimiterjni.cc \ java/rocksjni/remove_emptyvalue_compactionfilterjni.cc \ java/rocksjni/cassandra_compactionfilterjni.cc \ + java/rocksjni/cassandra_value_operator.cc \ java/rocksjni/restorejni.cc \ java/rocksjni/rocksjni.cc \ java/rocksjni/rocksdb_exception_test.cc \ diff --git a/utilities/cassandra/cassandra_compaction_filter.cc b/utilities/cassandra/cassandra_compaction_filter.cc index e817972ee..a33c64211 100644 --- a/utilities/cassandra/cassandra_compaction_filter.cc +++ b/utilities/cassandra/cassandra_compaction_filter.cc @@ -27,9 +27,10 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2( bool value_changed = false; RowValue row_value = RowValue::Deserialize( existing_value.data(), existing_value.size()); - RowValue compacted = purge_ttl_on_expiration_ ? - row_value.PurgeTtl(&value_changed) : - row_value.ExpireTtl(&value_changed); + RowValue compacted = + purge_ttl_on_expiration_ + ? row_value.RemoveExpiredColumns(&value_changed) + : row_value.ConvertExpiredColumnsToTombstones(&value_changed); if(compacted.Empty()) { return Decision::kRemove; diff --git a/utilities/cassandra/cassandra_format_test.cc b/utilities/cassandra/cassandra_format_test.cc index 0cf124d0c..1cb114843 100644 --- a/utilities/cassandra/cassandra_format_test.cc +++ b/utilities/cassandra/cassandra_format_test.cc @@ -311,14 +311,14 @@ TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) { }); bool changed = false; - auto purged = row_value.PurgeTtl(&changed); + auto purged = row_value.RemoveExpiredColumns(&changed); EXPECT_TRUE(changed); EXPECT_EQ(purged.columns_.size(), 3); VerifyRowValueColumns(purged.columns_, 0, kColumn, 0, ToMicroSeconds(now)); VerifyRowValueColumns(purged.columns_, 1, kExpiringColumn, 2, ToMicroSeconds(now)); VerifyRowValueColumns(purged.columns_, 2, kTombstone, 3, ToMicroSeconds(now)); - purged.PurgeTtl(&changed); + purged.RemoveExpiredColumns(&changed); EXPECT_FALSE(changed); } @@ -333,7 +333,7 @@ TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) { }); bool changed = false; - auto compacted = row_value.ExpireTtl(&changed); + auto compacted = row_value.ConvertExpiredColumnsToTombstones(&changed); EXPECT_TRUE(changed); EXPECT_EQ(compacted.columns_.size(), 4); VerifyRowValueColumns(compacted.columns_, 0, kColumn, 0, ToMicroSeconds(now)); @@ -341,7 +341,7 @@ TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) { VerifyRowValueColumns(compacted.columns_, 2, kExpiringColumn, 2, ToMicroSeconds(now)); VerifyRowValueColumns(compacted.columns_, 3, kTombstone, 3, ToMicroSeconds(now)); - compacted.ExpireTtl(&changed); + compacted.ConvertExpiredColumnsToTombstones(&changed); EXPECT_FALSE(changed); } } // namespace cassandra diff --git a/utilities/cassandra/cassandra_functional_test.cc b/utilities/cassandra/cassandra_functional_test.cc index 0c02228a7..28fb5c555 100644 --- a/utilities/cassandra/cassandra_functional_test.cc +++ b/utilities/cassandra/cassandra_functional_test.cc @@ -112,7 +112,7 @@ public: DB* db; Options options; options.create_if_missing = true; - options.merge_operator.reset(new CassandraValueMergeOperator()); + options.merge_operator.reset(new CassandraValueMergeOperator(gc_grace_period_in_seconds_)); auto* cf_factory = new TestCompactionFilterFactory(purge_ttl_on_expiration_); options.compaction_filter_factory.reset(cf_factory); EXPECT_OK(DB::Open(options, kDbName, &db)); @@ -120,29 +120,31 @@ public: } bool purge_ttl_on_expiration_ = false; + int32_t gc_grace_period_in_seconds_ = 100; }; // THE TEST CASES BEGIN HERE TEST_F(CassandraFunctionalTest, SimpleMergeTest) { CassandraStore store(OpenDb()); + int64_t now = time(nullptr); store.Append("k1", CreateTestRowValue({ - std::make_tuple(kTombstone, 0, 5), - std::make_tuple(kColumn, 1, 8), - std::make_tuple(kExpiringColumn, 2, 5), + std::make_tuple(kTombstone, 0, ToMicroSeconds(now + 5)), + std::make_tuple(kColumn, 1, ToMicroSeconds(now + 8)), + std::make_tuple(kExpiringColumn, 2, ToMicroSeconds(now + 5)), })); store.Append("k1",CreateTestRowValue({ - std::make_tuple(kColumn, 0, 2), - std::make_tuple(kExpiringColumn, 1, 5), - std::make_tuple(kTombstone, 2, 7), - std::make_tuple(kExpiringColumn, 7, 17), + std::make_tuple(kColumn, 0, ToMicroSeconds(now + 2)), + std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now + 5)), + std::make_tuple(kTombstone, 2, ToMicroSeconds(now + 7)), + std::make_tuple(kExpiringColumn, 7, ToMicroSeconds(now + 17)), })); store.Append("k1", CreateTestRowValue({ - std::make_tuple(kExpiringColumn, 0, 6), - std::make_tuple(kTombstone, 1, 5), - std::make_tuple(kColumn, 2, 4), - std::make_tuple(kTombstone, 11, 11), + std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now + 6)), + std::make_tuple(kTombstone, 1, ToMicroSeconds(now + 5)), + std::make_tuple(kColumn, 2, ToMicroSeconds(now + 4)), + std::make_tuple(kTombstone, 11, ToMicroSeconds(now + 11)), })); auto ret = store.Get("k1"); @@ -150,11 +152,11 @@ TEST_F(CassandraFunctionalTest, SimpleMergeTest) { ASSERT_TRUE(std::get<0>(ret)); RowValue& merged = std::get<1>(ret); EXPECT_EQ(merged.columns_.size(), 5); - VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6); - VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8); - VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7); - VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17); - VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11); + VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, ToMicroSeconds(now + 6)); + VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, ToMicroSeconds(now + 8)); + VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, ToMicroSeconds(now + 7)); + VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, ToMicroSeconds(now + 17)); + VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, ToMicroSeconds(now + 11)); } TEST_F(CassandraFunctionalTest, @@ -242,6 +244,38 @@ TEST_F(CassandraFunctionalTest, ASSERT_FALSE(std::get<0>(store.Get("k1"))); } +TEST_F(CassandraFunctionalTest, + CompactionShouldRemoveTombstoneExceedingGCGracePeriod) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append("k1", CreateTestRowValue({ + std::make_tuple(kTombstone, 0, ToMicroSeconds(now - gc_grace_period_in_seconds_ - 1)), + std::make_tuple(kColumn, 1, ToMicroSeconds(now)) + })); + + store.Append("k2", CreateTestRowValue({ + std::make_tuple(kColumn, 0, ToMicroSeconds(now)) + })); + + store.Flush(); + + store.Append("k1",CreateTestRowValue({ + std::make_tuple(kColumn, 1, ToMicroSeconds(now)), + })); + + store.Flush(); + store.Compact(); + + auto ret = store.Get("k1"); + ASSERT_TRUE(std::get<0>(ret)); + RowValue& gced = std::get<1>(ret); + EXPECT_EQ(gced.columns_.size(), 1); + VerifyRowValueColumns(gced.columns_, 0, kColumn, 1, ToMicroSeconds(now)); +} + + } // namespace cassandra } // namespace rocksdb diff --git a/utilities/cassandra/format.cc b/utilities/cassandra/format.cc index 2b096cdbb..c5657e280 100644 --- a/utilities/cassandra/format.cc +++ b/utilities/cassandra/format.cc @@ -176,6 +176,13 @@ void Tombstone::Serialize(std::string* dest) const { rocksdb::cassandra::Serialize(marked_for_delete_at_, dest); } +bool Tombstone::Collectable(int32_t gc_grace_period_in_seconds) const { + auto local_deleted_at = std::chrono::time_point( + std::chrono::seconds(local_deletion_time_)); + auto gc_grace_period = std::chrono::seconds(gc_grace_period_in_seconds); + return local_deleted_at + gc_grace_period < std::chrono::system_clock::now(); +} + std::shared_ptr Tombstone::Deserialize(const char *src, std::size_t offset) { int8_t mask = rocksdb::cassandra::Deserialize(src, offset); @@ -231,7 +238,7 @@ void RowValue::Serialize(std::string* dest) const { } } -RowValue RowValue::PurgeTtl(bool* changed) const { +RowValue RowValue::RemoveExpiredColumns(bool* changed) const { *changed = false; Columns new_columns; for (auto& column : columns_) { @@ -250,7 +257,7 @@ RowValue RowValue::PurgeTtl(bool* changed) const { return RowValue(std::move(new_columns), last_modified_time_); } -RowValue RowValue::ExpireTtl(bool* changed) const { +RowValue RowValue::ConvertExpiredColumnsToTombstones(bool* changed) const { *changed = false; Columns new_columns; for (auto& column : columns_) { @@ -270,6 +277,23 @@ RowValue RowValue::ExpireTtl(bool* changed) const { return RowValue(std::move(new_columns), last_modified_time_); } +RowValue RowValue::RemoveTombstones(int32_t gc_grace_period) const { + Columns new_columns; + for (auto& column : columns_) { + if (column->Mask() == ColumnTypeMask::DELETION_MASK) { + std::shared_ptr tombstone = + std::static_pointer_cast(column); + + if (tombstone->Collectable(gc_grace_period)) { + continue; + } + } + + new_columns.push_back(column); + } + return RowValue(std::move(new_columns), last_modified_time_); +} + bool RowValue::Empty() const { return columns_.empty(); } diff --git a/utilities/cassandra/format.h b/utilities/cassandra/format.h index d8f51df14..6e743b0d4 100644 --- a/utilities/cassandra/format.h +++ b/utilities/cassandra/format.h @@ -115,7 +115,7 @@ public: virtual int64_t Timestamp() const override; virtual std::size_t Size() const override; virtual void Serialize(std::string* dest) const override; - + bool Collectable(int32_t gc_grace_period) const; static std::shared_ptr Deserialize(const char* src, std::size_t offset); @@ -163,8 +163,9 @@ public: // otherwise it returns the max timestamp of containing columns. int64_t LastModifiedTime() const; void Serialize(std::string* dest) const; - RowValue PurgeTtl(bool* changed) const; - RowValue ExpireTtl(bool* changed) const; + RowValue RemoveExpiredColumns(bool* changed) const; + RowValue ConvertExpiredColumnsToTombstones(bool* changed) const; + RowValue RemoveTombstones(int32_t gc_grace_period) const; bool Empty() const; static RowValue Deserialize(const char* src, std::size_t size); @@ -188,6 +189,8 @@ private: CassandraFunctionalTest, CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn); FRIEND_TEST( CassandraFunctionalTest, CompactionShouldRemoveRowWhenAllColumnExpiredIfPurgeTtlIsOn); + FRIEND_TEST(CassandraFunctionalTest, + CompactionShouldRemoveTombstoneExceedingGCGracePeriod); }; } // namepsace cassandrda diff --git a/utilities/cassandra/merge_operator.cc b/utilities/cassandra/merge_operator.cc index 715ef8586..470190380 100644 --- a/utilities/cassandra/merge_operator.cc +++ b/utilities/cassandra/merge_operator.cc @@ -41,6 +41,7 @@ bool CassandraValueMergeOperator::FullMergeV2( } RowValue merged = RowValue::Merge(std::move(row_values)); + merged = merged.RemoveTombstones(gc_grace_period_in_seconds_); merge_out->new_value.reserve(merged.Size()); merged.Serialize(&(merge_out->new_value)); @@ -72,9 +73,4 @@ const char* CassandraValueMergeOperator::Name() const { } // namespace cassandra -std::shared_ptr - MergeOperators::CreateCassandraMergeOperator() { - return std::make_shared(); -} - } // namespace rocksdb diff --git a/utilities/cassandra/merge_operator.h b/utilities/cassandra/merge_operator.h index 28066ca05..272bfc21e 100644 --- a/utilities/cassandra/merge_operator.h +++ b/utilities/cassandra/merge_operator.h @@ -15,19 +15,23 @@ namespace cassandra { */ class CassandraValueMergeOperator : public MergeOperator { public: - static std::shared_ptr CreateSharedInstance(); + explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds) + : gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {} - virtual bool FullMergeV2(const MergeOperationInput& merge_in, - MergeOperationOutput* merge_out) const override; + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override; - virtual bool PartialMergeMulti(const Slice& key, - const std::deque& operand_list, - std::string* new_value, - Logger* logger) const override; + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const override; - virtual const char* Name() const override; + virtual const char* Name() const override; - virtual bool AllowSingleOperand() const override { return true; } + virtual bool AllowSingleOperand() const override { return true; } + +private: + int32_t gc_grace_period_in_seconds_; }; } // namespace cassandra } // namespace rocksdb diff --git a/utilities/cassandra/test_utils.cc b/utilities/cassandra/test_utils.cc index 61f53b2d3..9ee1115d2 100644 --- a/utilities/cassandra/test_utils.cc +++ b/utilities/cassandra/test_utils.cc @@ -9,7 +9,6 @@ namespace rocksdb { namespace cassandra { const char kData[] = {'d', 'a', 't', 'a'}; const char kExpiringData[] = {'e', 'd', 'a', 't', 'a'}; -const int32_t kLocalDeletionTime = 1; const int32_t kTtl = 86400; const int8_t kColumn = 0; const int8_t kTombstone = 1; @@ -19,8 +18,8 @@ std::shared_ptr CreateTestColumn(int8_t mask, int8_t index, int64_t timestamp) { if ((mask & ColumnTypeMask::DELETION_MASK) != 0) { - return std::shared_ptr(new Tombstone( - mask, index, kLocalDeletionTime, timestamp)); + return std::shared_ptr( + new Tombstone(mask, index, ToSeconds(timestamp), timestamp)); } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) { return std::shared_ptr(new ExpiringColumn( mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl)); @@ -44,7 +43,7 @@ RowValue CreateTestRowValue( } RowValue CreateRowTombstone(int64_t timestamp) { - return RowValue(kLocalDeletionTime, timestamp); + return RowValue(ToSeconds(timestamp), timestamp); } void VerifyRowValueColumns( @@ -63,5 +62,8 @@ int64_t ToMicroSeconds(int64_t seconds) { return seconds * (int64_t)1000000; } +int32_t ToSeconds(int64_t microseconds) { + return (int32_t)(microseconds / (int64_t)1000000); +} } } diff --git a/utilities/cassandra/test_utils.h b/utilities/cassandra/test_utils.h index 463b12bf2..a65b34430 100644 --- a/utilities/cassandra/test_utils.h +++ b/utilities/cassandra/test_utils.h @@ -13,7 +13,6 @@ namespace rocksdb { namespace cassandra { extern const char kData[]; extern const char kExpiringData[]; -extern const int32_t kLocalDeletionTime; extern const int32_t kTtl; extern const int8_t kColumn; extern const int8_t kTombstone; @@ -38,6 +37,6 @@ void VerifyRowValueColumns( ); int64_t ToMicroSeconds(int64_t seconds); - +int32_t ToSeconds(int64_t microseconds); } } diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h index 72f805a86..602a4d01a 100644 --- a/utilities/merge_operators.h +++ b/utilities/merge_operators.h @@ -21,7 +21,6 @@ class MergeOperators { static std::shared_ptr CreateStringAppendOperator(); static std::shared_ptr CreateStringAppendTESTOperator(); static std::shared_ptr CreateMaxOperator(); - static std::shared_ptr CreateCassandraMergeOperator(); // Will return a different merge operator depending on the string. // TODO: Hook the "name" up to the actual Name() of the MergeOperators? @@ -39,8 +38,6 @@ class MergeOperators { return CreateStringAppendTESTOperator(); } else if (name == "max") { return CreateMaxOperator(); - } else if (name == "cassandra") { - return CreateCassandraMergeOperator(); } else { // Empty or unknown, just return nullptr return nullptr;