diff --git a/CMakeLists.txt b/CMakeLists.txt index 2d56c8d7d..1eb98b226 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -482,6 +482,9 @@ set(SOURCES utilities/blob_db/blob_log_reader.cc utilities/blob_db/blob_log_writer.cc utilities/blob_db/blob_log_format.cc + utilities/cassandra/cassandra_compaction_filter.cc + utilities/cassandra/format.cc + utilities/cassandra/merge_operator.cc utilities/checkpoint/checkpoint_impl.cc utilities/col_buf_decoder.cc utilities/col_buf_encoder.cc @@ -500,8 +503,6 @@ set(SOURCES utilities/memory/memory_util.cc utilities/merge_operators/max.cc utilities/merge_operators/put.cc - utilities/merge_operators/cassandra/format.cc - utilities/merge_operators/cassandra/merge_operator.cc utilities/merge_operators/string_append/stringappend.cc utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/uint64add.cc @@ -705,6 +706,10 @@ set(TESTS util/thread_local_test.cc utilities/backupable/backupable_db_test.cc utilities/blob_db/blob_db_test.cc + utilities/cassandra/cassandra_functional_test.cc + utilities/cassandra/cassandra_format_test.cc + utilities/cassandra/cassandra_row_merge_test.cc + utilities/cassandra/cassandra_serialize_test.cc utilities/checkpoint/checkpoint_test.cc utilities/column_aware_encoding_test.cc utilities/date_tiered/date_tiered_test.cc @@ -713,10 +718,6 @@ set(TESTS utilities/geodb/geodb_test.cc utilities/lua/rocks_lua_test.cc utilities/memory/memory_test.cc - utilities/merge_operators/cassandra/cassandra_merge_test.cc - utilities/merge_operators/cassandra/cassandra_format_test.cc - utilities/merge_operators/cassandra/cassandra_row_merge_test.cc - utilities/merge_operators/cassandra/cassandra_serialize_test.cc utilities/merge_operators/string_append/stringappend_test.cc utilities/object_registry_test.cc utilities/option_change_migration/option_change_migration_test.cc @@ -757,7 +758,7 @@ set(TESTUTIL_SOURCE monitoring/thread_status_updater_debug.cc table/mock_table.cc 
util/fault_injection_test_env.cc - utilities/merge_operators/cassandra/test_utils.cc + utilities/cassandra/test_utils.cc ) # test utilities are only build in debug enable_testing() diff --git a/Makefile b/Makefile index 1b273224b..c40d741d7 100644 --- a/Makefile +++ b/Makefile @@ -405,7 +405,7 @@ TESTS = \ write_buffer_manager_test \ stringappend_test \ cassandra_format_test \ - cassandra_merge_test \ + cassandra_functional_test \ cassandra_row_merge_test \ cassandra_serialize_test \ ttl_test \ @@ -1000,16 +1000,16 @@ option_change_migration_test: utilities/option_change_migration/option_change_mi stringappend_test: utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) -cassandra_format_test: utilities/merge_operators/cassandra/cassandra_format_test.o $(LIBOBJECTS) $(TESTHARNESS) +cassandra_format_test: utilities/cassandra/cassandra_format_test.o utilities/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) -cassandra_merge_test: utilities/merge_operators/cassandra/cassandra_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS) +cassandra_functional_test: utilities/cassandra/cassandra_functional_test.o utilities/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) -cassandra_row_merge_test: utilities/merge_operators/cassandra/cassandra_row_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS) +cassandra_row_merge_test: utilities/cassandra/cassandra_row_merge_test.o utilities/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) -cassandra_serialize_test: utilities/merge_operators/cassandra/cassandra_serialize_test.o $(LIBOBJECTS) $(TESTHARNESS) +cassandra_serialize_test: utilities/cassandra/cassandra_serialize_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) redis_test: utilities/redis/redis_lists_test.o $(LIBOBJECTS) $(TESTHARNESS) diff --git a/TARGETS b/TARGETS index 1bafb01ca..134bb5081 100644 --- a/TARGETS +++ 
b/TARGETS @@ -212,6 +212,9 @@ cpp_library( "utilities/blob_db/blob_log_reader.cc", "utilities/blob_db/blob_log_writer.cc", "utilities/blob_db/blob_log_format.cc", + "utilities/cassandra/cassandra_compaction_filter.cc", + "utilities/cassandra/format.cc", + "utilities/cassandra/merge_operator.cc", "utilities/checkpoint/checkpoint_impl.cc", "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", "utilities/convenience/info_log_finder.cc", @@ -226,8 +229,6 @@ cpp_library( "utilities/leveldb_options/leveldb_options.cc", "utilities/lua/rocks_lua_compaction_filter.cc", "utilities/memory/memory_util.cc", - "utilities/merge_operators/cassandra/format.cc", - "utilities/merge_operators/cassandra/merge_operator.cc", "utilities/merge_operators/max.cc", "utilities/merge_operators/put.cc", "utilities/merge_operators/string_append/stringappend.cc", @@ -275,7 +276,7 @@ cpp_library( "util/testharness.cc", "util/testutil.cc", "db/db_test_util.cc", - "utilities/merge_operators/cassandra/test_utils.cc", + "utilities/cassandra/test_utils.cc", "utilities/col_buf_encoder.cc", "utilities/col_buf_decoder.cc", "utilities/column_aware_encoding_util.cc", @@ -325,16 +326,16 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'], ['c_test', 'db/c_test.c', 'serial'], ['cache_test', 'cache/cache_test.cc', 'serial'], ['cassandra_format_test', - 'utilities/merge_operators/cassandra/cassandra_format_test.cc', + 'utilities/cassandra/cassandra_format_test.cc', 'serial'], - ['cassandra_merge_test', - 'utilities/merge_operators/cassandra/cassandra_merge_test.cc', + ['cassandra_functional_test', + 'utilities/cassandra/cassandra_functional_test.cc', 'serial'], ['cassandra_row_merge_test', - 'utilities/merge_operators/cassandra/cassandra_row_merge_test.cc', + 'utilities/cassandra/cassandra_row_merge_test.cc', 'serial'], ['cassandra_serialize_test', - 'utilities/merge_operators/cassandra/cassandra_serialize_test.cc', + 'utilities/cassandra/cassandra_serialize_test.cc', 'serial'], 
['checkpoint_test', 'utilities/checkpoint/checkpoint_test.cc', 'serial'], ['cleanable_test', 'table/cleanable_test.cc', 'serial'], diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 6a22cee26..a34cda6ca 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -24,6 +24,7 @@ set(JNI_NATIVE_SOURCES rocksjni/options.cc rocksjni/ratelimiterjni.cc rocksjni/remove_emptyvalue_compactionfilterjni.cc + rocksjni/cassandra_compactionfilterjni.cc rocksjni/restorejni.cc rocksjni/rocksdb_exception_test.cc rocksjni/rocksjni.cc @@ -55,6 +56,8 @@ set(NATIVE_JAVA_CLASSES org.rocksdb.BlockBasedTableConfig org.rocksdb.BloomFilter org.rocksdb.Cache + org.rocksdb.CassandraCompactionFilter + org.rocksdb.CassandraValueMergeOperator org.rocksdb.Checkpoint org.rocksdb.ClockCache org.rocksdb.ColumnFamilyHandle diff --git a/java/Makefile b/java/Makefile index 1210159af..b29447bd8 100644 --- a/java/Makefile +++ b/java/Makefile @@ -7,6 +7,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\ org.rocksdb.BloomFilter\ org.rocksdb.Checkpoint\ org.rocksdb.ClockCache\ + org.rocksdb.CassandraCompactionFilter\ org.rocksdb.CassandraValueMergeOperator\ org.rocksdb.ColumnFamilyHandle\ org.rocksdb.ColumnFamilyOptions\ diff --git a/java/rocksjni/cassandra_compactionfilterjni.cc b/java/rocksjni/cassandra_compactionfilterjni.cc new file mode 100644 index 000000000..9d77559ab --- /dev/null +++ b/java/rocksjni/cassandra_compactionfilterjni.cc @@ -0,0 +1,22 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include + +#include "include/org_rocksdb_CassandraCompactionFilter.h" +#include "utilities/cassandra/cassandra_compaction_filter.h" + +/* + * Class: org_rocksdb_CassandraCompactionFilter + * Method: createNewCassandraCompactionFilter0 + * Signature: (Z)J + */ +jlong Java_org_rocksdb_CassandraCompactionFilter_createNewCassandraCompactionFilter0( + JNIEnv* env, jclass jcls, jboolean purge_ttl_on_expiration) { + auto* compaction_filter = + new rocksdb::cassandra::CassandraCompactionFilter(purge_ttl_on_expiration); + // set the native handle to our native compaction filter + return reinterpret_cast(compaction_filter); +} diff --git a/java/rocksjni/cassandra_value_operator.cc b/java/rocksjni/cassandra_value_operator.cc index 889213b9c..6be661407 100644 --- a/java/rocksjni/cassandra_value_operator.cc +++ b/java/rocksjni/cassandra_value_operator.cc @@ -20,7 +20,7 @@ #include "rocksdb/table.h" #include "rocksdb/slice_transform.h" #include "rocksdb/merge_operator.h" -#include "utilities/merge_operators/cassandra/merge_operator.h" +#include "utilities/cassandra/merge_operator.h" /* * Class: org_rocksdb_CassandraValueMergeOperator diff --git a/java/src/main/java/org/rocksdb/CassandraCompactionFilter.java b/java/src/main/java/org/rocksdb/CassandraCompactionFilter.java new file mode 100644 index 000000000..05d9aabcf --- /dev/null +++ b/java/src/main/java/org/rocksdb/CassandraCompactionFilter.java @@ -0,0 +1,18 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +package org.rocksdb; + +/** + * Just a Java wrapper around CassandraCompactionFilter implemented in C++ + */ +public class CassandraCompactionFilter + extends AbstractCompactionFilter { + public CassandraCompactionFilter(boolean purgeTtlOnExpiration) { + super(createNewCassandraCompactionFilter0(purgeTtlOnExpiration)); + } + + private native static long createNewCassandraCompactionFilter0(boolean purgeTtlOnExpiration); +} diff --git a/src.mk b/src.mk index 8250947f5..fb7f97939 100644 --- a/src.mk +++ b/src.mk @@ -159,6 +159,9 @@ LIB_SOURCES = \ utilities/blob_db/blob_log_reader.cc \ utilities/blob_db/blob_log_writer.cc \ utilities/blob_db/blob_log_format.cc \ + utilities/cassandra/cassandra_compaction_filter.cc \ + utilities/cassandra/format.cc \ + utilities/cassandra/merge_operator.cc \ utilities/checkpoint/checkpoint_impl.cc \ utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \ utilities/convenience/info_log_finder.cc \ @@ -173,8 +176,6 @@ LIB_SOURCES = \ utilities/leveldb_options/leveldb_options.cc \ utilities/lua/rocks_lua_compaction_filter.cc \ utilities/memory/memory_util.cc \ - utilities/merge_operators/cassandra/format.cc \ - utilities/merge_operators/cassandra/merge_operator.cc \ utilities/merge_operators/max.cc \ utilities/merge_operators/put.cc \ utilities/merge_operators/string_append/stringappend.cc \ @@ -225,7 +226,7 @@ TEST_LIB_SOURCES = \ util/testharness.cc \ util/testutil.cc \ db/db_test_util.cc \ - utilities/merge_operators/cassandra/test_utils.cc \ + utilities/cassandra/test_utils.cc \ MAIN_SOURCES = \ cache/cache_bench.cc \ @@ -329,6 +330,10 @@ MAIN_SOURCES = \ util/thread_local_test.cc \ utilities/backupable/backupable_db_test.cc \ utilities/blob_db/blob_db_test.cc \ + utilities/cassandra/cassandra_format_test.cc \ + utilities/cassandra/cassandra_functional_test.cc \ + utilities/cassandra/cassandra_row_merge_test.cc \ + utilities/cassandra/cassandra_serialize_test.cc \ utilities/checkpoint/checkpoint_test.cc \ 
utilities/column_aware_encoding_exp.cc \ utilities/column_aware_encoding_test.cc \ @@ -339,10 +344,6 @@ MAIN_SOURCES = \ utilities/lua/rocks_lua_test.cc \ utilities/memory/memory_test.cc \ utilities/merge_operators/string_append/stringappend_test.cc \ - utilities/merge_operators/cassandra/cassandra_merge_test.cc \ - utilities/merge_operators/cassandra/cassandra_format_test.cc \ - utilities/merge_operators/cassandra/cassandra_row_merge_test.cc \ - utilities/merge_operators/cassandra/cassandra_serialize_test.cc \ utilities/object_registry_test.cc \ utilities/option_change_migration/option_change_migration_test.cc \ utilities/options/options_util_test.cc \ @@ -379,6 +380,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/options.cc \ java/rocksjni/ratelimiterjni.cc \ java/rocksjni/remove_emptyvalue_compactionfilterjni.cc \ + java/rocksjni/cassandra_compactionfilterjni.cc \ java/rocksjni/restorejni.cc \ java/rocksjni/rocksjni.cc \ java/rocksjni/rocksdb_exception_test.cc \ diff --git a/utilities/cassandra/cassandra_compaction_filter.cc b/utilities/cassandra/cassandra_compaction_filter.cc new file mode 100644 index 000000000..e817972ee --- /dev/null +++ b/utilities/cassandra/cassandra_compaction_filter.cc @@ -0,0 +1,47 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "utilities/cassandra/cassandra_compaction_filter.h" +#include +#include "rocksdb/slice.h" +#include "utilities/cassandra/format.h" + + +namespace rocksdb { +namespace cassandra { + +const char* CassandraCompactionFilter::Name() const { + return "CassandraCompactionFilter"; +} + +CompactionFilter::Decision CassandraCompactionFilter::FilterV2( + int level, + const Slice& key, + ValueType value_type, + const Slice& existing_value, + std::string* new_value, + std::string* skip_until) const { + + bool value_changed = false; + RowValue row_value = RowValue::Deserialize( + existing_value.data(), existing_value.size()); + RowValue compacted = purge_ttl_on_expiration_ ? + row_value.PurgeTtl(&value_changed) : + row_value.ExpireTtl(&value_changed); + + if(compacted.Empty()) { + return Decision::kRemove; + } + + if (value_changed) { + compacted.Serialize(new_value); + return Decision::kChangeValue; + } + + return Decision::kKeep; +} + +} // namespace cassandra +} // namespace rocksdb diff --git a/utilities/cassandra/cassandra_compaction_filter.h b/utilities/cassandra/cassandra_compaction_filter.h new file mode 100644 index 000000000..c09b8e74a --- /dev/null +++ b/utilities/cassandra/cassandra_compaction_filter.h @@ -0,0 +1,39 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include +#include "rocksdb/compaction_filter.h" +#include "rocksdb/slice.h" + +namespace rocksdb { +namespace cassandra { + +/** + * Compaction filter for removing expired Cassandra data with ttl. + * If option `purge_ttl_on_expiration` is set to true, expired data + * will be directly purged. Otherwise expired data will be converted to + * tombstones first, then be eventually removed after gc grace period. 
+ * `purge_ttl_on_expiration` should only be on in the case all the + * writes have same ttl setting, otherwise it could bring old data back. + */ +class CassandraCompactionFilter : public CompactionFilter { +public: + explicit CassandraCompactionFilter(bool purge_ttl_on_expiration) + : purge_ttl_on_expiration_(purge_ttl_on_expiration) {} + + const char* Name() const override; + virtual Decision FilterV2(int level, + const Slice& key, + ValueType value_type, + const Slice& existing_value, + std::string* new_value, + std::string* skip_until) const override; + +private: + bool purge_ttl_on_expiration_; +}; +} // namespace cassandra +} // namespace rocksdb diff --git a/utilities/merge_operators/cassandra/cassandra_format_test.cc b/utilities/cassandra/cassandra_format_test.cc similarity index 80% rename from utilities/merge_operators/cassandra/cassandra_format_test.cc rename to utilities/cassandra/cassandra_format_test.cc index 866098a1b..0cf124d0c 100644 --- a/utilities/merge_operators/cassandra/cassandra_format_test.cc +++ b/utilities/cassandra/cassandra_format_test.cc @@ -2,14 +2,13 @@ // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -// This source code is also licensed under the GPLv2 license found in the -// COPYING file in the root directory of this source tree. #include #include #include "util/testharness.h" -#include "utilities/merge_operators/cassandra/format.h" -#include "utilities/merge_operators/cassandra/serialize.h" +#include "utilities/cassandra/format.h" +#include "utilities/cassandra/serialize.h" +#include "utilities/cassandra/test_utils.h" using namespace rocksdb::cassandra; @@ -46,7 +45,7 @@ TEST(ColumnTest, Column) { // Verify the deserialization. 
std::string saved_dest = dest; - std::unique_ptr c1 = Column::Deserialize(saved_dest.c_str(), 0); + std::shared_ptr c1 = Column::Deserialize(saved_dest.c_str(), 0); EXPECT_EQ(c1->Index(), index); EXPECT_EQ(c1->Timestamp(), timestamp); EXPECT_EQ(c1->Size(), 14 + sizeof(data)); @@ -58,7 +57,7 @@ TEST(ColumnTest, Column) { // Verify the ColumnBase::Deserialization. saved_dest = dest; - std::unique_ptr c2 = + std::shared_ptr c2 = ColumnBase::Deserialize(saved_dest.c_str(), c.Size()); c2->Serialize(&dest); EXPECT_EQ(dest.size(), 3 * c.Size()); @@ -101,7 +100,7 @@ TEST(ExpiringColumnTest, ExpiringColumn) { // Verify the deserialization. std::string saved_dest = dest; - std::unique_ptr c1 = + std::shared_ptr c1 = ExpiringColumn::Deserialize(saved_dest.c_str(), 0); EXPECT_EQ(c1->Index(), index); EXPECT_EQ(c1->Timestamp(), timestamp); @@ -114,7 +113,7 @@ TEST(ExpiringColumnTest, ExpiringColumn) { // Verify the ColumnBase::Deserialization. saved_dest = dest; - std::unique_ptr c2 = + std::shared_ptr c2 = ColumnBase::Deserialize(saved_dest.c_str(), c.Size()); c2->Serialize(&dest); EXPECT_EQ(dest.size(), 3 * c.Size()); @@ -151,7 +150,7 @@ TEST(TombstoneTest, Tombstone) { EXPECT_EQ(Deserialize(dest.c_str(), offset), marked_for_delete_at); // Verify the deserialization. - std::unique_ptr c1 = Tombstone::Deserialize(dest.c_str(), 0); + std::shared_ptr c1 = Tombstone::Deserialize(dest.c_str(), 0); EXPECT_EQ(c1->Index(), index); EXPECT_EQ(c1->Timestamp(), marked_for_delete_at); EXPECT_EQ(c1->Size(), 14); @@ -162,7 +161,7 @@ TEST(TombstoneTest, Tombstone) { std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0); // Verify the ColumnBase::Deserialization. 
- std::unique_ptr c2 = + std::shared_ptr c2 = ColumnBase::Deserialize(dest.c_str(), c.Size()); c2->Serialize(&dest); EXPECT_EQ(dest.size(), 3 * c.Size()); @@ -204,7 +203,7 @@ TEST(RowValueTest, RowTombstone) { } TEST(RowValueTest, RowWithColumns) { - std::vector> columns; + std::vector> columns; int64_t last_modified_time = 1494022807048; std::size_t columns_data_size = 0; @@ -212,7 +211,7 @@ TEST(RowValueTest, RowWithColumns) { int8_t e_index = 0; int64_t e_timestamp = 1494022807044; int32_t e_ttl = 3600; - columns.push_back(std::unique_ptr( + columns.push_back(std::shared_ptr( new ExpiringColumn(ColumnTypeMask::EXPIRATION_MASK, e_index, e_timestamp, sizeof(e_data), e_data, e_ttl))); columns_data_size += columns[0]->Size(); @@ -220,14 +219,14 @@ TEST(RowValueTest, RowWithColumns) { char c_data[4] = {'d', 'a', 't', 'a'}; int8_t c_index = 1; int64_t c_timestamp = 1494022807048; - columns.push_back(std::unique_ptr( + columns.push_back(std::shared_ptr( new Column(0, c_index, c_timestamp, sizeof(c_data), c_data))); columns_data_size += columns[1]->Size(); int8_t t_index = 2; int32_t t_local_deletion_time = 1494022801; int64_t t_marked_for_delete_at = 1494022807043; - columns.push_back(std::unique_ptr( + columns.push_back(std::shared_ptr( new Tombstone(ColumnTypeMask::DELETION_MASK, t_index, t_local_deletion_time, t_marked_for_delete_at))); columns_data_size += columns[2]->Size(); @@ -301,6 +300,50 @@ TEST(RowValueTest, RowWithColumns) { std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0); } +TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) { + int64_t now = time(nullptr); + + auto row_value = CreateTestRowValue({ + std::make_tuple(kColumn, 0, ToMicroSeconds(now)), + std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), //expired + std::make_tuple(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired + std::make_tuple(kTombstone, 3, ToMicroSeconds(now)) + }); + + bool changed = false; + auto purged = 
row_value.PurgeTtl(&changed); + EXPECT_TRUE(changed); + EXPECT_EQ(purged.columns_.size(), 3); + VerifyRowValueColumns(purged.columns_, 0, kColumn, 0, ToMicroSeconds(now)); + VerifyRowValueColumns(purged.columns_, 1, kExpiringColumn, 2, ToMicroSeconds(now)); + VerifyRowValueColumns(purged.columns_, 2, kTombstone, 3, ToMicroSeconds(now)); + + purged.PurgeTtl(&changed); + EXPECT_FALSE(changed); +} + +TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) { + int64_t now = time(nullptr); + + auto row_value = CreateTestRowValue({ + std::make_tuple(kColumn, 0, ToMicroSeconds(now)), + std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), //expired + std::make_tuple(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired + std::make_tuple(kTombstone, 3, ToMicroSeconds(now)) + }); + + bool changed = false; + auto compacted = row_value.ExpireTtl(&changed); + EXPECT_TRUE(changed); + EXPECT_EQ(compacted.columns_.size(), 4); + VerifyRowValueColumns(compacted.columns_, 0, kColumn, 0, ToMicroSeconds(now)); + VerifyRowValueColumns(compacted.columns_, 1, kTombstone, 1, ToMicroSeconds(now - 10)); + VerifyRowValueColumns(compacted.columns_, 2, kExpiringColumn, 2, ToMicroSeconds(now)); + VerifyRowValueColumns(compacted.columns_, 3, kTombstone, 3, ToMicroSeconds(now)); + + compacted.ExpireTtl(&changed); + EXPECT_FALSE(changed); +} } // namespace cassandra } // namespace rocksdb diff --git a/utilities/cassandra/cassandra_functional_test.cc b/utilities/cassandra/cassandra_functional_test.cc new file mode 100644 index 000000000..0c02228a7 --- /dev/null +++ b/utilities/cassandra/cassandra_functional_test.cc @@ -0,0 +1,251 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include +#include "rocksdb/db.h" +#include "db/db_impl.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/utilities/db_ttl.h" +#include "util/testharness.h" +#include "util/random.h" +#include "utilities/merge_operators.h" +#include "utilities/cassandra/cassandra_compaction_filter.h" +#include "utilities/cassandra/merge_operator.h" +#include "utilities/cassandra/test_utils.h" + +using namespace rocksdb; + +namespace rocksdb { +namespace cassandra { + +// Path to the database on file system +const std::string kDbName = test::TmpDir() + "/cassandra_functional_test"; + +class CassandraStore { + public: + explicit CassandraStore(std::shared_ptr db) + : db_(db), + merge_option_(), + get_option_() { + assert(db); + } + + bool Append(const std::string& key, const RowValue& val){ + std::string result; + val.Serialize(&result); + Slice valSlice(result.data(), result.size()); + auto s = db_->Merge(merge_option_, key, valSlice); + + if (s.ok()) { + return true; + } else { + std::cerr << "ERROR " << s.ToString() << std::endl; + return false; + } + } + + void Flush() { + dbfull()->TEST_FlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + + void Compact() { + dbfull()->TEST_CompactRange( + 0, nullptr, nullptr, db_->DefaultColumnFamily()); + } + + std::tuple Get(const std::string& key){ + std::string result; + auto s = db_->Get(get_option_, key, &result); + + if (s.ok()) { + return std::make_tuple(true, + RowValue::Deserialize(result.data(), + result.size())); + } + + if (!s.IsNotFound()) { + std::cerr << "ERROR " << s.ToString() << std::endl; + } + + return std::make_tuple(false, RowValue(0, 0)); + } + + private: + std::shared_ptr db_; + WriteOptions merge_option_; + ReadOptions get_option_; + + DBImpl* dbfull() { return reinterpret_cast(db_.get()); } + +}; + +class TestCompactionFilterFactory : public CompactionFilterFactory { +public: + explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration) + : purge_ttl_on_expiration_(purge_ttl_on_expiration) {} + 
+ virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override { + return unique_ptr(new CassandraCompactionFilter(purge_ttl_on_expiration_)); + } + + virtual const char* Name() const override { + return "TestCompactionFilterFactory"; + } + +private: + bool purge_ttl_on_expiration_; +}; + + +// The class for unit-testing +class CassandraFunctionalTest : public testing::Test { +public: + CassandraFunctionalTest() { + DestroyDB(kDbName, Options()); // Start each test with a fresh DB + } + + std::shared_ptr OpenDb() { + DB* db; + Options options; + options.create_if_missing = true; + options.merge_operator.reset(new CassandraValueMergeOperator()); + auto* cf_factory = new TestCompactionFilterFactory(purge_ttl_on_expiration_); + options.compaction_filter_factory.reset(cf_factory); + EXPECT_OK(DB::Open(options, kDbName, &db)); + return std::shared_ptr(db); + } + + bool purge_ttl_on_expiration_ = false; +}; + +// THE TEST CASES BEGIN HERE + +TEST_F(CassandraFunctionalTest, SimpleMergeTest) { + CassandraStore store(OpenDb()); + + store.Append("k1", CreateTestRowValue({ + std::make_tuple(kTombstone, 0, 5), + std::make_tuple(kColumn, 1, 8), + std::make_tuple(kExpiringColumn, 2, 5), + })); + store.Append("k1",CreateTestRowValue({ + std::make_tuple(kColumn, 0, 2), + std::make_tuple(kExpiringColumn, 1, 5), + std::make_tuple(kTombstone, 2, 7), + std::make_tuple(kExpiringColumn, 7, 17), + })); + store.Append("k1", CreateTestRowValue({ + std::make_tuple(kExpiringColumn, 0, 6), + std::make_tuple(kTombstone, 1, 5), + std::make_tuple(kColumn, 2, 4), + std::make_tuple(kTombstone, 11, 11), + })); + + auto ret = store.Get("k1"); + + ASSERT_TRUE(std::get<0>(ret)); + RowValue& merged = std::get<1>(ret); + EXPECT_EQ(merged.columns_.size(), 5); + VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6); + VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8); + VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7); + 
VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17); + VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11); +} + +TEST_F(CassandraFunctionalTest, + CompactionShouldConvertExpiredColumnsToTombstone) { + CassandraStore store(OpenDb()); + int64_t now= time(nullptr); + + store.Append("k1", CreateTestRowValue({ + std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired + std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired + std::make_tuple(kTombstone, 3, ToMicroSeconds(now)) + })); + + store.Flush(); + + store.Append("k1",CreateTestRowValue({ + std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired + std::make_tuple(kColumn, 2, ToMicroSeconds(now)) + })); + + store.Flush(); + store.Compact(); + + auto ret = store.Get("k1"); + ASSERT_TRUE(std::get<0>(ret)); + RowValue& merged = std::get<1>(ret); + EXPECT_EQ(merged.columns_.size(), 4); + VerifyRowValueColumns(merged.columns_, 0, kTombstone, 0, ToMicroSeconds(now - 10)); + VerifyRowValueColumns(merged.columns_, 1, kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)); + VerifyRowValueColumns(merged.columns_, 2, kColumn, 2, ToMicroSeconds(now)); + VerifyRowValueColumns(merged.columns_, 3, kTombstone, 3, ToMicroSeconds(now)); +} + + +TEST_F(CassandraFunctionalTest, + CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append("k1", CreateTestRowValue({ + std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired + std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired + std::make_tuple(kTombstone, 3, ToMicroSeconds(now)) + })); + + store.Flush(); + + store.Append("k1",CreateTestRowValue({ + std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired + std::make_tuple(kColumn, 2, ToMicroSeconds(now)) + })); + + store.Flush(); + store.Compact(); + + auto ret 
= store.Get("k1"); + ASSERT_TRUE(std::get<0>(ret)); + RowValue& merged = std::get<1>(ret); + EXPECT_EQ(merged.columns_.size(), 3); + VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 1, ToMicroSeconds(now)); + VerifyRowValueColumns(merged.columns_, 1, kColumn, 2, ToMicroSeconds(now)); + VerifyRowValueColumns(merged.columns_, 2, kTombstone, 3, ToMicroSeconds(now)); +} + +TEST_F(CassandraFunctionalTest, + CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) { + purge_ttl_on_expiration_ = true; + CassandraStore store(OpenDb()); + int64_t now = time(nullptr); + + store.Append("k1", CreateTestRowValue({ + std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), + std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)), + })); + + store.Flush(); + + store.Append("k1",CreateTestRowValue({ + std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), + })); + + store.Flush(); + store.Compact(); + ASSERT_FALSE(std::get<0>(store.Get("k1"))); +} + +} // namespace cassandra +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/utilities/merge_operators/cassandra/cassandra_row_merge_test.cc b/utilities/cassandra/cassandra_row_merge_test.cc similarity index 92% rename from utilities/merge_operators/cassandra/cassandra_row_merge_test.cc rename to utilities/cassandra/cassandra_row_merge_test.cc index 76d112c7b..78c7d8e57 100644 --- a/utilities/merge_operators/cassandra/cassandra_row_merge_test.cc +++ b/utilities/cassandra/cassandra_row_merge_test.cc @@ -2,13 +2,11 @@ // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -// This source code is also licensed under the GPLv2 license found in the -// COPYING file in the root directory of this source tree. 
#include #include "util/testharness.h" -#include "utilities/merge_operators/cassandra/format.h" -#include "utilities/merge_operators/cassandra/test_utils.h" +#include "utilities/cassandra/format.h" +#include "utilities/cassandra/test_utils.h" namespace rocksdb { namespace cassandra { diff --git a/utilities/merge_operators/cassandra/cassandra_serialize_test.cc b/utilities/cassandra/cassandra_serialize_test.cc similarity index 96% rename from utilities/merge_operators/cassandra/cassandra_serialize_test.cc rename to utilities/cassandra/cassandra_serialize_test.cc index 978878b64..68d2c163d 100644 --- a/utilities/merge_operators/cassandra/cassandra_serialize_test.cc +++ b/utilities/cassandra/cassandra_serialize_test.cc @@ -2,11 +2,9 @@ // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -// This source code is also licensed under the GPLv2 license found in the -// COPYING file in the root directory of this source tree. #include "util/testharness.h" -#include "utilities/merge_operators/cassandra/serialize.h" +#include "utilities/cassandra/serialize.h" using namespace rocksdb::cassandra; diff --git a/utilities/merge_operators/cassandra/format.cc b/utilities/cassandra/format.cc similarity index 75% rename from utilities/merge_operators/cassandra/format.cc rename to utilities/cassandra/format.cc index 01eff67e3..2b096cdbb 100644 --- a/utilities/merge_operators/cassandra/format.cc +++ b/utilities/cassandra/format.cc @@ -2,8 +2,6 @@ // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -// This source code is also licensed under the GPLv2 license found in the -// COPYING file in the root directory of this source tree. 
#include "format.h" @@ -11,7 +9,7 @@ #include #include -#include "utilities/merge_operators/cassandra/serialize.h" +#include "utilities/cassandra/serialize.h" namespace rocksdb { namespace cassandra { @@ -42,7 +40,7 @@ void ColumnBase::Serialize(std::string* dest) const { rocksdb::cassandra::Serialize(index_, dest); } -std::unique_ptr ColumnBase::Deserialize(const char* src, +std::shared_ptr ColumnBase::Deserialize(const char* src, std::size_t offset) { int8_t mask = rocksdb::cassandra::Deserialize(src, offset); if ((mask & ColumnTypeMask::DELETION_MASK) != 0) { @@ -79,7 +77,7 @@ void Column::Serialize(std::string* dest) const { dest->append(value_, value_size_); } -std::unique_ptr Column::Deserialize(const char *src, +std::shared_ptr Column::Deserialize(const char *src, std::size_t offset) { int8_t mask = rocksdb::cassandra::Deserialize(src, offset); offset += sizeof(mask); @@ -89,8 +87,8 @@ std::unique_ptr Column::Deserialize(const char *src, offset += sizeof(timestamp); int32_t value_size = rocksdb::cassandra::Deserialize(src, offset); offset += sizeof(value_size); - return std::unique_ptr( - new Column(mask, index, timestamp, value_size, src + offset)); + return std::make_shared( + mask, index, timestamp, value_size, src + offset); } ExpiringColumn::ExpiringColumn( @@ -112,7 +110,32 @@ void ExpiringColumn::Serialize(std::string* dest) const { rocksdb::cassandra::Serialize(ttl_, dest); } -std::unique_ptr ExpiringColumn::Deserialize( +std::chrono::time_point ExpiringColumn::TimePoint() const { + return std::chrono::time_point(std::chrono::microseconds(Timestamp())); +} + +std::chrono::seconds ExpiringColumn::Ttl() const { + return std::chrono::seconds(ttl_); +} + +bool ExpiringColumn::Expired() const { + return TimePoint() + Ttl() < std::chrono::system_clock::now(); +} + +std::shared_ptr ExpiringColumn::ToTombstone() const { + auto expired_at = (TimePoint() + Ttl()).time_since_epoch(); + int32_t local_deletion_time = static_cast( + 
std::chrono::duration_cast(expired_at).count()); + int64_t marked_for_delete_at = + std::chrono::duration_cast(expired_at).count(); + return std::make_shared( + ColumnTypeMask::DELETION_MASK, + Index(), + local_deletion_time, + marked_for_delete_at); +} + +std::shared_ptr ExpiringColumn::Deserialize( const char *src, std::size_t offset) { int8_t mask = rocksdb::cassandra::Deserialize(src, offset); @@ -126,8 +149,8 @@ std::unique_ptr ExpiringColumn::Deserialize( const char* value = src + offset; offset += value_size; int32_t ttl = rocksdb::cassandra::Deserialize(src, offset); - return std::unique_ptr( - new ExpiringColumn(mask, index, timestamp, value_size, value, ttl)); + return std::make_shared( + mask, index, timestamp, value_size, value, ttl); } Tombstone::Tombstone( @@ -153,7 +176,7 @@ void Tombstone::Serialize(std::string* dest) const { rocksdb::cassandra::Serialize(marked_for_delete_at_, dest); } -std::unique_ptr Tombstone::Deserialize(const char *src, +std::shared_ptr Tombstone::Deserialize(const char *src, std::size_t offset) { int8_t mask = rocksdb::cassandra::Deserialize(src, offset); offset += sizeof(mask); @@ -164,8 +187,8 @@ std::unique_ptr Tombstone::Deserialize(const char *src, offset += sizeof(int32_t); int64_t marked_for_delete_at = rocksdb::cassandra::Deserialize(src, offset); - return std::unique_ptr( - new Tombstone(mask, index, local_deletion_time, marked_for_delete_at)); + return std::make_shared( + mask, index, local_deletion_time, marked_for_delete_at); } RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at) @@ -173,7 +196,7 @@ RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at) marked_for_delete_at_(marked_for_delete_at), columns_(), last_modified_time_(0) {} -RowValue::RowValue(std::vector> columns, +RowValue::RowValue(Columns columns, int64_t last_modified_time) : local_deletion_time_(kDefaultLocalDeletionTime), marked_for_delete_at_(kDefaultMarkedForDeleteAt), @@ -208,6 +231,49 @@ void 
RowValue::Serialize(std::string* dest) const { } } +RowValue RowValue::PurgeTtl(bool* changed) const { + *changed = false; + Columns new_columns; + for (auto& column : columns_) { + if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) { + std::shared_ptr expiring_column = + std::static_pointer_cast(column); + + if(expiring_column->Expired()){ + *changed = true; + continue; + } + } + + new_columns.push_back(column); + } + return RowValue(std::move(new_columns), last_modified_time_); +} + +RowValue RowValue::ExpireTtl(bool* changed) const { + *changed = false; + Columns new_columns; + for (auto& column : columns_) { + if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) { + std::shared_ptr expiring_column = + std::static_pointer_cast(column); + + if(expiring_column->Expired()) { + shared_ptr tombstone = expiring_column->ToTombstone(); + new_columns.push_back(tombstone); + *changed = true; + continue; + } + } + new_columns.push_back(column); + } + return RowValue(std::move(new_columns), last_modified_time_); +} + +bool RowValue::Empty() const { + return columns_.empty(); +} + RowValue RowValue::Deserialize(const char *src, std::size_t size) { std::size_t offset = 0; assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_)); @@ -223,7 +289,7 @@ RowValue RowValue::Deserialize(const char *src, std::size_t size) { assert(local_deletion_time == kDefaultLocalDeletionTime); assert(marked_for_delete_at == kDefaultMarkedForDeleteAt); - std::vector> columns; + Columns columns; int64_t last_modified_time = 0; while (offset < size) { auto c = ColumnBase::Deserialize(src, offset); @@ -254,7 +320,7 @@ RowValue RowValue::Merge(std::vector&& values) { return r1.LastModifiedTime() > r2.LastModifiedTime(); }); - std::map> merged_columns; + std::map> merged_columns; int64_t tombstone_timestamp = 0; for (auto& value : values) { @@ -268,17 +334,17 @@ RowValue RowValue::Merge(std::vector&& values) { for (auto& column : value.columns_) { int8_t index = column->Index(); 
if (merged_columns.find(index) == merged_columns.end()) { - merged_columns[index] = std::move(column); + merged_columns[index] = column; } else { if (column->Timestamp() > merged_columns[index]->Timestamp()) { - merged_columns[index] = std::move(column); + merged_columns[index] = column; } } } } int64_t last_modified_time = 0; - std::vector> columns; + Columns columns; for (auto& pair: merged_columns) { // For some row, its last_modified_time > row tombstone_timestamp, but // it might have rows whose timestamp is ealier than tombstone, so we diff --git a/utilities/merge_operators/cassandra/format.h b/utilities/cassandra/format.h similarity index 80% rename from utilities/merge_operators/cassandra/format.h rename to utilities/cassandra/format.h index 0ffd9a5bb..d8f51df14 100644 --- a/utilities/merge_operators/cassandra/format.h +++ b/utilities/cassandra/format.h @@ -2,8 +2,6 @@ // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -// This source code is also licensed under the GPLv2 license found in the -// COPYING file in the root directory of this source tree. /** * The encoding of Cassandra Row Value. 
@@ -57,6 +55,7 @@ */ #pragma once +#include #include #include #include "rocksdb/merge_operator.h" @@ -72,6 +71,7 @@ enum ColumnTypeMask { EXPIRATION_MASK = 0x02, }; + class ColumnBase { public: ColumnBase(int8_t mask, int8_t index); @@ -82,8 +82,7 @@ public: virtual int8_t Index() const; virtual std::size_t Size() const; virtual void Serialize(std::string* dest) const; - - static std::unique_ptr Deserialize(const char* src, + static std::shared_ptr Deserialize(const char* src, std::size_t offset); private: @@ -99,8 +98,7 @@ public: virtual int64_t Timestamp() const override; virtual std::size_t Size() const override; virtual void Serialize(std::string* dest) const override; - - static std::unique_ptr Deserialize(const char* src, + static std::shared_ptr Deserialize(const char* src, std::size_t offset); private: @@ -109,44 +107,50 @@ private: const char* value_; }; -class ExpiringColumn : public Column { +class Tombstone : public ColumnBase { public: - ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp, - int32_t value_size, const char* value, int32_t ttl); + Tombstone(int8_t mask, int8_t index, + int32_t local_deletion_time, int64_t marked_for_delete_at); + virtual int64_t Timestamp() const override; virtual std::size_t Size() const override; virtual void Serialize(std::string* dest) const override; - static std::unique_ptr Deserialize(const char* src, - std::size_t offset); + static std::shared_ptr Deserialize(const char* src, + std::size_t offset); private: - int32_t ttl_; + int32_t local_deletion_time_; + int64_t marked_for_delete_at_; }; -class Tombstone : public ColumnBase { +class ExpiringColumn : public Column { public: - Tombstone(int8_t mask, int8_t index, - int32_t local_deletion_time, int64_t marked_for_delete_at); + ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp, + int32_t value_size, const char* value, int32_t ttl); - virtual int64_t Timestamp() const override; virtual std::size_t Size() const override; virtual void 
Serialize(std::string* dest) const override; + bool Expired() const; + std::shared_ptr ToTombstone() const; - static std::unique_ptr Deserialize(const char* src, - std::size_t offset); + static std::shared_ptr Deserialize(const char* src, + std::size_t offset); private: - int32_t local_deletion_time_; - int64_t marked_for_delete_at_; + int32_t ttl_; + std::chrono::time_point TimePoint() const; + std::chrono::seconds Ttl() const; }; +typedef std::vector> Columns; + class RowValue { public: // Create a Row Tombstone. RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at); // Create a Row containing columns. - RowValue(std::vector> columns, + RowValue(Columns columns, int64_t last_modified_time); RowValue(const RowValue& that) = delete; RowValue(RowValue&& that) noexcept = default; @@ -159,6 +163,9 @@ public: // otherwise it returns the max timestamp of containing columns. int64_t LastModifiedTime() const; void Serialize(std::string* dest) const; + RowValue PurgeTtl(bool* changed) const; + RowValue ExpireTtl(bool* changed) const; + bool Empty() const; static RowValue Deserialize(const char* src, std::size_t size); // Merge multiple rows according to their timestamp. 
@@ -167,12 +174,20 @@ public: private: int32_t local_deletion_time_; int64_t marked_for_delete_at_; - std::vector> columns_; + Columns columns_; int64_t last_modified_time_; + FRIEND_TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired); + FRIEND_TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones); FRIEND_TEST(RowValueMergeTest, Merge); FRIEND_TEST(RowValueMergeTest, MergeWithRowTombstone); - FRIEND_TEST(CassandraMergeTest, SimpleTest); + FRIEND_TEST(CassandraFunctionalTest, SimpleMergeTest); + FRIEND_TEST( + CassandraFunctionalTest, CompactionShouldConvertExpiredColumnsToTombstone); + FRIEND_TEST( + CassandraFunctionalTest, CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn); + FRIEND_TEST( + CassandraFunctionalTest, CompactionShouldRemoveRowWhenAllColumnExpiredIfPurgeTtlIsOn); }; } // namepsace cassandrda diff --git a/utilities/merge_operators/cassandra/merge_operator.cc b/utilities/cassandra/merge_operator.cc similarity index 98% rename from utilities/merge_operators/cassandra/merge_operator.cc rename to utilities/cassandra/merge_operator.cc index 03b4ec2e3..75817a78b 100644 --- a/utilities/merge_operators/cassandra/merge_operator.cc +++ b/utilities/cassandra/merge_operator.cc @@ -13,7 +13,7 @@ #include "rocksdb/slice.h" #include "rocksdb/merge_operator.h" #include "utilities/merge_operators.h" -#include "utilities/merge_operators/cassandra/format.h" +#include "utilities/cassandra/format.h" namespace rocksdb { namespace cassandra { diff --git a/utilities/merge_operators/cassandra/merge_operator.h b/utilities/cassandra/merge_operator.h similarity index 100% rename from utilities/merge_operators/cassandra/merge_operator.h rename to utilities/cassandra/merge_operator.h diff --git a/utilities/merge_operators/cassandra/serialize.h b/utilities/cassandra/serialize.h similarity index 100% rename from utilities/merge_operators/cassandra/serialize.h rename to utilities/cassandra/serialize.h diff --git a/utilities/merge_operators/cassandra/test_utils.cc 
b/utilities/cassandra/test_utils.cc similarity index 73% rename from utilities/merge_operators/cassandra/test_utils.cc rename to utilities/cassandra/test_utils.cc index 91b9e6349..68d0381e0 100644 --- a/utilities/merge_operators/cassandra/test_utils.cc +++ b/utilities/cassandra/test_utils.cc @@ -1,7 +1,7 @@ // Copyright (c) 2017-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. // This source code is also licensed under the GPLv2 license found in the // COPYING file in the root directory of this source tree. @@ -12,29 +12,29 @@ namespace cassandra { const char kData[] = {'d', 'a', 't', 'a'}; const char kExpiringData[] = {'e', 'd', 'a', 't', 'a'}; const int32_t kLocalDeletionTime = 1; -const int32_t kTtl = 100; +const int32_t kTtl = 86400; const int8_t kColumn = 0; const int8_t kTombstone = 1; const int8_t kExpiringColumn = 2; -std::unique_ptr CreateTestColumn(int8_t mask, +std::shared_ptr CreateTestColumn(int8_t mask, int8_t index, int64_t timestamp) { if ((mask & ColumnTypeMask::DELETION_MASK) != 0) { - return std::unique_ptr(new Tombstone( + return std::shared_ptr(new Tombstone( mask, index, kLocalDeletionTime, timestamp)); } else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) { - return std::unique_ptr(new ExpiringColumn( + return std::shared_ptr(new ExpiringColumn( mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl)); } else { - return std::unique_ptr( + return std::shared_ptr( new Column(mask, index, timestamp, sizeof(kData), kData)); } } RowValue CreateTestRowValue( std::vector> column_specs) { - std::vector> columns; + 
std::vector> columns; int64_t last_modified_time = 0; for (auto spec: column_specs) { auto c = CreateTestColumn(std::get<0>(spec), std::get<1>(spec), @@ -50,7 +50,7 @@ RowValue CreateRowTombstone(int64_t timestamp) { } void VerifyRowValueColumns( - std::vector> &columns, + std::vector> &columns, std::size_t index_of_vector, int8_t expected_mask, int8_t expected_index, @@ -61,5 +61,9 @@ void VerifyRowValueColumns( EXPECT_EQ(expected_index, columns[index_of_vector]->Index()); } +int64_t ToMicroSeconds(int64_t seconds) { + return seconds * (int64_t)1000000; +} + } } diff --git a/utilities/merge_operators/cassandra/test_utils.h b/utilities/cassandra/test_utils.h similarity index 82% rename from utilities/merge_operators/cassandra/test_utils.h rename to utilities/cassandra/test_utils.h index 4025b2a3f..7ca6cfd61 100644 --- a/utilities/merge_operators/cassandra/test_utils.h +++ b/utilities/cassandra/test_utils.h @@ -8,8 +8,8 @@ #pragma once #include #include "util/testharness.h" -#include "utilities/merge_operators/cassandra/format.h" -#include "utilities/merge_operators/cassandra/serialize.h" +#include "utilities/cassandra/format.h" +#include "utilities/cassandra/serialize.h" namespace rocksdb { namespace cassandra { @@ -22,7 +22,7 @@ extern const int8_t kTombstone; extern const int8_t kExpiringColumn; -std::unique_ptr CreateTestColumn(int8_t mask, +std::shared_ptr CreateTestColumn(int8_t mask, int8_t index, int64_t timestamp); @@ -32,12 +32,14 @@ RowValue CreateTestRowValue( RowValue CreateRowTombstone(int64_t timestamp); void VerifyRowValueColumns( - std::vector> &columns, + std::vector> &columns, std::size_t index_of_vector, int8_t expected_mask, int8_t expected_index, int64_t expected_timestamp ); +int64_t ToMicroSeconds(int64_t seconds); + } } diff --git a/utilities/merge_operators/cassandra/cassandra_merge_test.cc b/utilities/merge_operators/cassandra/cassandra_merge_test.cc deleted file mode 100644 index 84886161e..000000000 --- 
a/utilities/merge_operators/cassandra/cassandra_merge_test.cc +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). -// This source code is also licensed under the GPLv2 license found in the -// COPYING file in the root directory of this source tree. - -#include - -#include "rocksdb/db.h" -#include "rocksdb/merge_operator.h" -#include "rocksdb/utilities/db_ttl.h" -#include "util/testharness.h" -#include "util/random.h" -#include "utilities/merge_operators.h" -#include "utilities/merge_operators/cassandra/merge_operator.h" -#include "utilities/merge_operators/cassandra/test_utils.h" - -using namespace rocksdb; - -namespace rocksdb { -namespace cassandra { - -// Path to the database on file system -const std::string kDbName = test::TmpDir() + "/cassandramerge_test"; - -class CassandraStore { - public: - explicit CassandraStore(std::shared_ptr db) - : db_(db), - merge_option_(), - get_option_() { - assert(db); - } - - bool Append(const std::string& key, const RowValue& val){ - std::string result; - val.Serialize(&result); - Slice valSlice(result.data(), result.size()); - auto s = db_->Merge(merge_option_, key, valSlice); - - if (s.ok()) { - return true; - } else { - std::cerr << "ERROR " << s.ToString() << std::endl; - return false; - } - } - - std::tuple Get(const std::string& key){ - std::string result; - auto s = db_->Get(get_option_, key, &result); - - if (s.ok()) { - return std::make_tuple(true, - RowValue::Deserialize(result.data(), - result.size())); - } - - if (!s.IsNotFound()) { - std::cerr << "ERROR " << s.ToString() << std::endl; - } - - return std::make_tuple(false, RowValue(0, 0)); - } - - private: - std::shared_ptr db_; - WriteOptions merge_option_; - ReadOptions get_option_; -}; - - -// The class for unit-testing 
-class CassandraMergeTest : public testing::Test { - public: - CassandraMergeTest() { - DestroyDB(kDbName, Options()); // Start each test with a fresh DB - } - - std::shared_ptr OpenDb() { - DB* db; - Options options; - options.create_if_missing = true; - options.merge_operator.reset(new CassandraValueMergeOperator()); - EXPECT_OK(DB::Open(options, kDbName, &db)); - return std::shared_ptr(db); - } -}; - -// THE TEST CASES BEGIN HERE - -TEST_F(CassandraMergeTest, SimpleTest) { - auto db = OpenDb(); - CassandraStore store(db); - - store.Append("k1", CreateTestRowValue({ - std::make_tuple(kTombstone, 0, 5), - std::make_tuple(kColumn, 1, 8), - std::make_tuple(kExpiringColumn, 2, 5), - })); - store.Append("k1",CreateTestRowValue({ - std::make_tuple(kColumn, 0, 2), - std::make_tuple(kExpiringColumn, 1, 5), - std::make_tuple(kTombstone, 2, 7), - std::make_tuple(kExpiringColumn, 7, 17), - })); - store.Append("k1", CreateTestRowValue({ - std::make_tuple(kExpiringColumn, 0, 6), - std::make_tuple(kTombstone, 1, 5), - std::make_tuple(kColumn, 2, 4), - std::make_tuple(kTombstone, 11, 11), - })); - - auto ret = store.Get("k1"); - - ASSERT_TRUE(std::get<0>(ret)); - RowValue& merged = std::get<1>(ret); - EXPECT_EQ(merged.columns_.size(), 5); - VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6); - VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8); - VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7); - VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17); - VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11); -} - - -} // namespace cassandra -} // namespace rocksdb - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -}