Cassandra compaction filter for purge expired columns and rows

Summary:
Major changes in this PR:
* Implement CassandraCompactionFilter to remove expired columns and rows (if all column expired)
* Move cassandra related code from utilities/merge_operators/cassandra to utilities/cassandra/*
* Switch to use shared_ptr<> from uniqu_ptr for Column membership management in RowValue. Since columns do have multiple owners in Merge and GC process, use shared_ptr helps make RowValue immutable.
* Rename cassandra_merge_test to cassandra_functional_test and add two TTL compaction related tests there.
Closes https://github.com/facebook/rocksdb/pull/2588

Differential Revision: D5430010

Pulled By: wpc

fbshipit-source-id: 9566c21e06de17491d486a68c70f52d501f27687
main
Pengchao Wang 7 years ago committed by Facebook Github Bot
parent 63163a8c6e
commit 534c255c7a
  1. 15
      CMakeLists.txt
  2. 10
      Makefile
  3. 17
      TARGETS
  4. 3
      java/CMakeLists.txt
  5. 1
      java/Makefile
  6. 22
      java/rocksjni/cassandra_compactionfilterjni.cc
  7. 2
      java/rocksjni/cassandra_value_operator.cc
  8. 18
      java/src/main/java/org/rocksdb/CassandraCompactionFilter.java
  9. 16
      src.mk
  10. 47
      utilities/cassandra/cassandra_compaction_filter.cc
  11. 39
      utilities/cassandra/cassandra_compaction_filter.h
  12. 71
      utilities/cassandra/cassandra_format_test.cc
  13. 251
      utilities/cassandra/cassandra_functional_test.cc
  14. 6
      utilities/cassandra/cassandra_row_merge_test.cc
  15. 4
      utilities/cassandra/cassandra_serialize_test.cc
  16. 104
      utilities/cassandra/format.cc
  17. 61
      utilities/cassandra/format.h
  18. 2
      utilities/cassandra/merge_operator.cc
  19. 0
      utilities/cassandra/merge_operator.h
  20. 0
      utilities/cassandra/serialize.h
  21. 24
      utilities/cassandra/test_utils.cc
  22. 10
      utilities/cassandra/test_utils.h
  23. 134
      utilities/merge_operators/cassandra/cassandra_merge_test.cc

@ -482,6 +482,9 @@ set(SOURCES
utilities/blob_db/blob_log_reader.cc
utilities/blob_db/blob_log_writer.cc
utilities/blob_db/blob_log_format.cc
utilities/cassandra/cassandra_compaction_filter.cc
utilities/cassandra/format.cc
utilities/cassandra/merge_operator.cc
utilities/checkpoint/checkpoint_impl.cc
utilities/col_buf_decoder.cc
utilities/col_buf_encoder.cc
@ -500,8 +503,6 @@ set(SOURCES
utilities/memory/memory_util.cc
utilities/merge_operators/max.cc
utilities/merge_operators/put.cc
utilities/merge_operators/cassandra/format.cc
utilities/merge_operators/cassandra/merge_operator.cc
utilities/merge_operators/string_append/stringappend.cc
utilities/merge_operators/string_append/stringappend2.cc
utilities/merge_operators/uint64add.cc
@ -705,6 +706,10 @@ set(TESTS
util/thread_local_test.cc
utilities/backupable/backupable_db_test.cc
utilities/blob_db/blob_db_test.cc
utilities/cassandra/cassandra_functional_test.cc
utilities/cassandra/cassandra_format_test.cc
utilities/cassandra/cassandra_row_merge_test.cc
utilities/cassandra/cassandra_serialize_test.cc
utilities/checkpoint/checkpoint_test.cc
utilities/column_aware_encoding_test.cc
utilities/date_tiered/date_tiered_test.cc
@ -713,10 +718,6 @@ set(TESTS
utilities/geodb/geodb_test.cc
utilities/lua/rocks_lua_test.cc
utilities/memory/memory_test.cc
utilities/merge_operators/cassandra/cassandra_merge_test.cc
utilities/merge_operators/cassandra/cassandra_format_test.cc
utilities/merge_operators/cassandra/cassandra_row_merge_test.cc
utilities/merge_operators/cassandra/cassandra_serialize_test.cc
utilities/merge_operators/string_append/stringappend_test.cc
utilities/object_registry_test.cc
utilities/option_change_migration/option_change_migration_test.cc
@ -757,7 +758,7 @@ set(TESTUTIL_SOURCE
monitoring/thread_status_updater_debug.cc
table/mock_table.cc
util/fault_injection_test_env.cc
utilities/merge_operators/cassandra/test_utils.cc
utilities/cassandra/test_utils.cc
)
# test utilities are only build in debug
enable_testing()

@ -405,7 +405,7 @@ TESTS = \
write_buffer_manager_test \
stringappend_test \
cassandra_format_test \
cassandra_merge_test \
cassandra_functional_test \
cassandra_row_merge_test \
cassandra_serialize_test \
ttl_test \
@ -1000,16 +1000,16 @@ option_change_migration_test: utilities/option_change_migration/option_change_mi
stringappend_test: utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
cassandra_format_test: utilities/merge_operators/cassandra/cassandra_format_test.o $(LIBOBJECTS) $(TESTHARNESS)
cassandra_format_test: utilities/cassandra/cassandra_format_test.o utilities/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
cassandra_merge_test: utilities/merge_operators/cassandra/cassandra_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
cassandra_functional_test: utilities/cassandra/cassandra_functional_test.o utilities/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
cassandra_row_merge_test: utilities/merge_operators/cassandra/cassandra_row_merge_test.o utilities/merge_operators/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
cassandra_row_merge_test: utilities/cassandra/cassandra_row_merge_test.o utilities/cassandra/test_utils.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
cassandra_serialize_test: utilities/merge_operators/cassandra/cassandra_serialize_test.o $(LIBOBJECTS) $(TESTHARNESS)
cassandra_serialize_test: utilities/cassandra/cassandra_serialize_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
redis_test: utilities/redis/redis_lists_test.o $(LIBOBJECTS) $(TESTHARNESS)

@ -212,6 +212,9 @@ cpp_library(
"utilities/blob_db/blob_log_reader.cc",
"utilities/blob_db/blob_log_writer.cc",
"utilities/blob_db/blob_log_format.cc",
"utilities/cassandra/cassandra_compaction_filter.cc",
"utilities/cassandra/format.cc",
"utilities/cassandra/merge_operator.cc",
"utilities/checkpoint/checkpoint_impl.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc",
@ -226,8 +229,6 @@ cpp_library(
"utilities/leveldb_options/leveldb_options.cc",
"utilities/lua/rocks_lua_compaction_filter.cc",
"utilities/memory/memory_util.cc",
"utilities/merge_operators/cassandra/format.cc",
"utilities/merge_operators/cassandra/merge_operator.cc",
"utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc",
"utilities/merge_operators/string_append/stringappend.cc",
@ -275,7 +276,7 @@ cpp_library(
"util/testharness.cc",
"util/testutil.cc",
"db/db_test_util.cc",
"utilities/merge_operators/cassandra/test_utils.cc",
"utilities/cassandra/test_utils.cc",
"utilities/col_buf_encoder.cc",
"utilities/col_buf_decoder.cc",
"utilities/column_aware_encoding_util.cc",
@ -325,16 +326,16 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'],
['c_test', 'db/c_test.c', 'serial'],
['cache_test', 'cache/cache_test.cc', 'serial'],
['cassandra_format_test',
'utilities/merge_operators/cassandra/cassandra_format_test.cc',
'utilities/cassandra/cassandra_format_test.cc',
'serial'],
['cassandra_merge_test',
'utilities/merge_operators/cassandra/cassandra_merge_test.cc',
['cassandra_functional_test',
'utilities/cassandra/cassandra_functional_test.cc',
'serial'],
['cassandra_row_merge_test',
'utilities/merge_operators/cassandra/cassandra_row_merge_test.cc',
'utilities/cassandra/cassandra_row_merge_test.cc',
'serial'],
['cassandra_serialize_test',
'utilities/merge_operators/cassandra/cassandra_serialize_test.cc',
'utilities/cassandra/cassandra_serialize_test.cc',
'serial'],
['checkpoint_test', 'utilities/checkpoint/checkpoint_test.cc', 'serial'],
['cleanable_test', 'table/cleanable_test.cc', 'serial'],

@ -24,6 +24,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/options.cc
rocksjni/ratelimiterjni.cc
rocksjni/remove_emptyvalue_compactionfilterjni.cc
rocksjni/cassandra_compactionfilterjni.cc
rocksjni/restorejni.cc
rocksjni/rocksdb_exception_test.cc
rocksjni/rocksjni.cc
@ -55,6 +56,8 @@ set(NATIVE_JAVA_CLASSES
org.rocksdb.BlockBasedTableConfig
org.rocksdb.BloomFilter
org.rocksdb.Cache
org.rocksdb.CassandraCompactionFilter
org.rocksdb.CassandraValueMergeOperator
org.rocksdb.Checkpoint
org.rocksdb.ClockCache
org.rocksdb.ColumnFamilyHandle

@ -7,6 +7,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
org.rocksdb.BloomFilter\
org.rocksdb.Checkpoint\
org.rocksdb.ClockCache\
org.rocksdb.CassandraCompactionFilter\
org.rocksdb.CassandraValueMergeOperator\
org.rocksdb.ColumnFamilyHandle\
org.rocksdb.ColumnFamilyOptions\

@ -0,0 +1,22 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <jni.h>
#include "include/org_rocksdb_CassandraCompactionFilter.h"
#include "utilities/cassandra/cassandra_compaction_filter.h"
/*
* Class: org_rocksdb_CassandraCompactionFilter
* Method: createNewCassandraCompactionFilter0
* Signature: ()J
*/
jlong Java_org_rocksdb_CassandraCompactionFilter_createNewCassandraCompactionFilter0(
JNIEnv* env, jclass jcls, jboolean purge_ttl_on_expiration) {
auto* compaction_filter =
new rocksdb::cassandra::CassandraCompactionFilter(purge_ttl_on_expiration);
// set the native handle to our native compaction filter
return reinterpret_cast<jlong>(compaction_filter);
}

@ -20,7 +20,7 @@
#include "rocksdb/table.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators/cassandra/merge_operator.h"
#include "utilities/cassandra/merge_operator.h"
/*
* Class: org_rocksdb_CassandraValueMergeOperator

@ -0,0 +1,18 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* Just a Java wrapper around CassandraCompactionFilter implemented in C++
*/
public class CassandraCompactionFilter
extends AbstractCompactionFilter<Slice> {
public CassandraCompactionFilter(boolean purgeTtlOnExpiration) {
super(createNewCassandraCompactionFilter0(purgeTtlOnExpiration));
}
private native static long createNewCassandraCompactionFilter0(boolean purgeTtlOnExpiration);
}

@ -159,6 +159,9 @@ LIB_SOURCES = \
utilities/blob_db/blob_log_reader.cc \
utilities/blob_db/blob_log_writer.cc \
utilities/blob_db/blob_log_format.cc \
utilities/cassandra/cassandra_compaction_filter.cc \
utilities/cassandra/format.cc \
utilities/cassandra/merge_operator.cc \
utilities/checkpoint/checkpoint_impl.cc \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
utilities/convenience/info_log_finder.cc \
@ -173,8 +176,6 @@ LIB_SOURCES = \
utilities/leveldb_options/leveldb_options.cc \
utilities/lua/rocks_lua_compaction_filter.cc \
utilities/memory/memory_util.cc \
utilities/merge_operators/cassandra/format.cc \
utilities/merge_operators/cassandra/merge_operator.cc \
utilities/merge_operators/max.cc \
utilities/merge_operators/put.cc \
utilities/merge_operators/string_append/stringappend.cc \
@ -225,7 +226,7 @@ TEST_LIB_SOURCES = \
util/testharness.cc \
util/testutil.cc \
db/db_test_util.cc \
utilities/merge_operators/cassandra/test_utils.cc \
utilities/cassandra/test_utils.cc \
MAIN_SOURCES = \
cache/cache_bench.cc \
@ -329,6 +330,10 @@ MAIN_SOURCES = \
util/thread_local_test.cc \
utilities/backupable/backupable_db_test.cc \
utilities/blob_db/blob_db_test.cc \
utilities/cassandra/cassandra_format_test.cc \
utilities/cassandra/cassandra_functional_test.cc \
utilities/cassandra/cassandra_row_merge_test.cc \
utilities/cassandra/cassandra_serialize_test.cc \
utilities/checkpoint/checkpoint_test.cc \
utilities/column_aware_encoding_exp.cc \
utilities/column_aware_encoding_test.cc \
@ -339,10 +344,6 @@ MAIN_SOURCES = \
utilities/lua/rocks_lua_test.cc \
utilities/memory/memory_test.cc \
utilities/merge_operators/string_append/stringappend_test.cc \
utilities/merge_operators/cassandra/cassandra_merge_test.cc \
utilities/merge_operators/cassandra/cassandra_format_test.cc \
utilities/merge_operators/cassandra/cassandra_row_merge_test.cc \
utilities/merge_operators/cassandra/cassandra_serialize_test.cc \
utilities/object_registry_test.cc \
utilities/option_change_migration/option_change_migration_test.cc \
utilities/options/options_util_test.cc \
@ -379,6 +380,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/options.cc \
java/rocksjni/ratelimiterjni.cc \
java/rocksjni/remove_emptyvalue_compactionfilterjni.cc \
java/rocksjni/cassandra_compactionfilterjni.cc \
java/rocksjni/restorejni.cc \
java/rocksjni/rocksjni.cc \
java/rocksjni/rocksdb_exception_test.cc \

@ -0,0 +1,47 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/cassandra/cassandra_compaction_filter.h"
#include <string>
#include "rocksdb/slice.h"
#include "utilities/cassandra/format.h"
namespace rocksdb {
namespace cassandra {
const char* CassandraCompactionFilter::Name() const {
return "CassandraCompactionFilter";
}
CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
int level,
const Slice& key,
ValueType value_type,
const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const {
bool value_changed = false;
RowValue row_value = RowValue::Deserialize(
existing_value.data(), existing_value.size());
RowValue compacted = purge_ttl_on_expiration_ ?
row_value.PurgeTtl(&value_changed) :
row_value.ExpireTtl(&value_changed);
if(compacted.Empty()) {
return Decision::kRemove;
}
if (value_changed) {
compacted.Serialize(new_value);
return Decision::kChangeValue;
}
return Decision::kKeep;
}
} // namespace cassandra
} // namespace rocksdb

@ -0,0 +1,39 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <string>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/slice.h"
namespace rocksdb {
namespace cassandra {
/**
* Compaction filter for removing expired Cassandra data with ttl.
* If option `purge_ttl_on_expiration` is set to true, expired data
* will be directly purged. Otherwise expired data will be converted
* tombstones first, then be eventally removed after gc grace period.
* `purge_ttl_on_expiration` should only be on in the case all the
* writes have same ttl setting, otherwise it could bring old data back.
*/
class CassandraCompactionFilter : public CompactionFilter {
public:
explicit CassandraCompactionFilter(bool purge_ttl_on_expiration)
: purge_ttl_on_expiration_(purge_ttl_on_expiration) {}
const char* Name() const override;
virtual Decision FilterV2(int level,
const Slice& key,
ValueType value_type,
const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const override;
private:
bool purge_ttl_on_expiration_;
};
} // namespace cassandra
} // namespace rocksdb

@ -2,14 +2,13 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include <cstring>
#include <memory>
#include "util/testharness.h"
#include "utilities/merge_operators/cassandra/format.h"
#include "utilities/merge_operators/cassandra/serialize.h"
#include "utilities/cassandra/format.h"
#include "utilities/cassandra/serialize.h"
#include "utilities/cassandra/test_utils.h"
using namespace rocksdb::cassandra;
@ -46,7 +45,7 @@ TEST(ColumnTest, Column) {
// Verify the deserialization.
std::string saved_dest = dest;
std::unique_ptr<Column> c1 = Column::Deserialize(saved_dest.c_str(), 0);
std::shared_ptr<Column> c1 = Column::Deserialize(saved_dest.c_str(), 0);
EXPECT_EQ(c1->Index(), index);
EXPECT_EQ(c1->Timestamp(), timestamp);
EXPECT_EQ(c1->Size(), 14 + sizeof(data));
@ -58,7 +57,7 @@ TEST(ColumnTest, Column) {
// Verify the ColumnBase::Deserialization.
saved_dest = dest;
std::unique_ptr<ColumnBase> c2 =
std::shared_ptr<ColumnBase> c2 =
ColumnBase::Deserialize(saved_dest.c_str(), c.Size());
c2->Serialize(&dest);
EXPECT_EQ(dest.size(), 3 * c.Size());
@ -101,7 +100,7 @@ TEST(ExpiringColumnTest, ExpiringColumn) {
// Verify the deserialization.
std::string saved_dest = dest;
std::unique_ptr<ExpiringColumn> c1 =
std::shared_ptr<ExpiringColumn> c1 =
ExpiringColumn::Deserialize(saved_dest.c_str(), 0);
EXPECT_EQ(c1->Index(), index);
EXPECT_EQ(c1->Timestamp(), timestamp);
@ -114,7 +113,7 @@ TEST(ExpiringColumnTest, ExpiringColumn) {
// Verify the ColumnBase::Deserialization.
saved_dest = dest;
std::unique_ptr<ColumnBase> c2 =
std::shared_ptr<ColumnBase> c2 =
ColumnBase::Deserialize(saved_dest.c_str(), c.Size());
c2->Serialize(&dest);
EXPECT_EQ(dest.size(), 3 * c.Size());
@ -151,7 +150,7 @@ TEST(TombstoneTest, Tombstone) {
EXPECT_EQ(Deserialize<int64_t>(dest.c_str(), offset), marked_for_delete_at);
// Verify the deserialization.
std::unique_ptr<Tombstone> c1 = Tombstone::Deserialize(dest.c_str(), 0);
std::shared_ptr<Tombstone> c1 = Tombstone::Deserialize(dest.c_str(), 0);
EXPECT_EQ(c1->Index(), index);
EXPECT_EQ(c1->Timestamp(), marked_for_delete_at);
EXPECT_EQ(c1->Size(), 14);
@ -162,7 +161,7 @@ TEST(TombstoneTest, Tombstone) {
std::memcmp(dest.c_str(), dest.c_str() + c.Size(), c.Size()) == 0);
// Verify the ColumnBase::Deserialization.
std::unique_ptr<ColumnBase> c2 =
std::shared_ptr<ColumnBase> c2 =
ColumnBase::Deserialize(dest.c_str(), c.Size());
c2->Serialize(&dest);
EXPECT_EQ(dest.size(), 3 * c.Size());
@ -204,7 +203,7 @@ TEST(RowValueTest, RowTombstone) {
}
TEST(RowValueTest, RowWithColumns) {
std::vector<std::unique_ptr<ColumnBase>> columns;
std::vector<std::shared_ptr<ColumnBase>> columns;
int64_t last_modified_time = 1494022807048;
std::size_t columns_data_size = 0;
@ -212,7 +211,7 @@ TEST(RowValueTest, RowWithColumns) {
int8_t e_index = 0;
int64_t e_timestamp = 1494022807044;
int32_t e_ttl = 3600;
columns.push_back(std::unique_ptr<ExpiringColumn>(
columns.push_back(std::shared_ptr<ExpiringColumn>(
new ExpiringColumn(ColumnTypeMask::EXPIRATION_MASK, e_index,
e_timestamp, sizeof(e_data), e_data, e_ttl)));
columns_data_size += columns[0]->Size();
@ -220,14 +219,14 @@ TEST(RowValueTest, RowWithColumns) {
char c_data[4] = {'d', 'a', 't', 'a'};
int8_t c_index = 1;
int64_t c_timestamp = 1494022807048;
columns.push_back(std::unique_ptr<Column>(
columns.push_back(std::shared_ptr<Column>(
new Column(0, c_index, c_timestamp, sizeof(c_data), c_data)));
columns_data_size += columns[1]->Size();
int8_t t_index = 2;
int32_t t_local_deletion_time = 1494022801;
int64_t t_marked_for_delete_at = 1494022807043;
columns.push_back(std::unique_ptr<Tombstone>(
columns.push_back(std::shared_ptr<Tombstone>(
new Tombstone(ColumnTypeMask::DELETION_MASK,
t_index, t_local_deletion_time, t_marked_for_delete_at)));
columns_data_size += columns[2]->Size();
@ -301,6 +300,50 @@ TEST(RowValueTest, RowWithColumns) {
std::memcmp(dest.c_str(), dest.c_str() + r.Size(), r.Size()) == 0);
}
TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired) {
int64_t now = time(nullptr);
auto row_value = CreateTestRowValue({
std::make_tuple(kColumn, 0, ToMicroSeconds(now)),
std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), //expired
std::make_tuple(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired
std::make_tuple(kTombstone, 3, ToMicroSeconds(now))
});
bool changed = false;
auto purged = row_value.PurgeTtl(&changed);
EXPECT_TRUE(changed);
EXPECT_EQ(purged.columns_.size(), 3);
VerifyRowValueColumns(purged.columns_, 0, kColumn, 0, ToMicroSeconds(now));
VerifyRowValueColumns(purged.columns_, 1, kExpiringColumn, 2, ToMicroSeconds(now));
VerifyRowValueColumns(purged.columns_, 2, kTombstone, 3, ToMicroSeconds(now));
purged.PurgeTtl(&changed);
EXPECT_FALSE(changed);
}
TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones) {
int64_t now = time(nullptr);
auto row_value = CreateTestRowValue({
std::make_tuple(kColumn, 0, ToMicroSeconds(now)),
std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 10)), //expired
std::make_tuple(kExpiringColumn, 2, ToMicroSeconds(now)), // not expired
std::make_tuple(kTombstone, 3, ToMicroSeconds(now))
});
bool changed = false;
auto compacted = row_value.ExpireTtl(&changed);
EXPECT_TRUE(changed);
EXPECT_EQ(compacted.columns_.size(), 4);
VerifyRowValueColumns(compacted.columns_, 0, kColumn, 0, ToMicroSeconds(now));
VerifyRowValueColumns(compacted.columns_, 1, kTombstone, 1, ToMicroSeconds(now - 10));
VerifyRowValueColumns(compacted.columns_, 2, kExpiringColumn, 2, ToMicroSeconds(now));
VerifyRowValueColumns(compacted.columns_, 3, kTombstone, 3, ToMicroSeconds(now));
compacted.ExpireTtl(&changed);
EXPECT_FALSE(changed);
}
} // namespace cassandra
} // namespace rocksdb

@ -0,0 +1,251 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <iostream>
#include "rocksdb/db.h"
#include "db/db_impl.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "util/testharness.h"
#include "util/random.h"
#include "utilities/merge_operators.h"
#include "utilities/cassandra/cassandra_compaction_filter.h"
#include "utilities/cassandra/merge_operator.h"
#include "utilities/cassandra/test_utils.h"
using namespace rocksdb;
namespace rocksdb {
namespace cassandra {
// Path to the database on file system
const std::string kDbName = test::TmpDir() + "/cassandra_functional_test";
class CassandraStore {
public:
explicit CassandraStore(std::shared_ptr<DB> db)
: db_(db),
merge_option_(),
get_option_() {
assert(db);
}
bool Append(const std::string& key, const RowValue& val){
std::string result;
val.Serialize(&result);
Slice valSlice(result.data(), result.size());
auto s = db_->Merge(merge_option_, key, valSlice);
if (s.ok()) {
return true;
} else {
std::cerr << "ERROR " << s.ToString() << std::endl;
return false;
}
}
void Flush() {
dbfull()->TEST_FlushMemTable();
dbfull()->TEST_WaitForCompact();
}
void Compact() {
dbfull()->TEST_CompactRange(
0, nullptr, nullptr, db_->DefaultColumnFamily());
}
std::tuple<bool, RowValue> Get(const std::string& key){
std::string result;
auto s = db_->Get(get_option_, key, &result);
if (s.ok()) {
return std::make_tuple(true,
RowValue::Deserialize(result.data(),
result.size()));
}
if (!s.IsNotFound()) {
std::cerr << "ERROR " << s.ToString() << std::endl;
}
return std::make_tuple(false, RowValue(0, 0));
}
private:
std::shared_ptr<DB> db_;
WriteOptions merge_option_;
ReadOptions get_option_;
DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_.get()); }
};
class TestCompactionFilterFactory : public CompactionFilterFactory {
public:
explicit TestCompactionFilterFactory(bool purge_ttl_on_expiration)
: purge_ttl_on_expiration_(purge_ttl_on_expiration) {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
return unique_ptr<CompactionFilter>(new CassandraCompactionFilter(purge_ttl_on_expiration_));
}
virtual const char* Name() const override {
return "TestCompactionFilterFactory";
}
private:
bool purge_ttl_on_expiration_;
};
// The class for unit-testing
class CassandraFunctionalTest : public testing::Test {
public:
CassandraFunctionalTest() {
DestroyDB(kDbName, Options()); // Start each test with a fresh DB
}
std::shared_ptr<DB> OpenDb() {
DB* db;
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new CassandraValueMergeOperator());
auto* cf_factory = new TestCompactionFilterFactory(purge_ttl_on_expiration_);
options.compaction_filter_factory.reset(cf_factory);
EXPECT_OK(DB::Open(options, kDbName, &db));
return std::shared_ptr<DB>(db);
}
bool purge_ttl_on_expiration_ = false;
};
// THE TEST CASES BEGIN HERE
TEST_F(CassandraFunctionalTest, SimpleMergeTest) {
CassandraStore store(OpenDb());
store.Append("k1", CreateTestRowValue({
std::make_tuple(kTombstone, 0, 5),
std::make_tuple(kColumn, 1, 8),
std::make_tuple(kExpiringColumn, 2, 5),
}));
store.Append("k1",CreateTestRowValue({
std::make_tuple(kColumn, 0, 2),
std::make_tuple(kExpiringColumn, 1, 5),
std::make_tuple(kTombstone, 2, 7),
std::make_tuple(kExpiringColumn, 7, 17),
}));
store.Append("k1", CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, 6),
std::make_tuple(kTombstone, 1, 5),
std::make_tuple(kColumn, 2, 4),
std::make_tuple(kTombstone, 11, 11),
}));
auto ret = store.Get("k1");
ASSERT_TRUE(std::get<0>(ret));
RowValue& merged = std::get<1>(ret);
EXPECT_EQ(merged.columns_.size(), 5);
VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6);
VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8);
VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7);
VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17);
VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11);
}
TEST_F(CassandraFunctionalTest,
CompactionShouldConvertExpiredColumnsToTombstone) {
CassandraStore store(OpenDb());
int64_t now= time(nullptr);
store.Append("k1", CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10)), // not expired
std::make_tuple(kTombstone, 3, ToMicroSeconds(now))
}));
store.Flush();
store.Append("k1",CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
std::make_tuple(kColumn, 2, ToMicroSeconds(now))
}));
store.Flush();
store.Compact();
auto ret = store.Get("k1");
ASSERT_TRUE(std::get<0>(ret));
RowValue& merged = std::get<1>(ret);
EXPECT_EQ(merged.columns_.size(), 4);
VerifyRowValueColumns(merged.columns_, 0, kTombstone, 0, ToMicroSeconds(now - 10));
VerifyRowValueColumns(merged.columns_, 1, kExpiringColumn, 1, ToMicroSeconds(now - kTtl + 10));
VerifyRowValueColumns(merged.columns_, 2, kColumn, 2, ToMicroSeconds(now));
VerifyRowValueColumns(merged.columns_, 3, kTombstone, 3, ToMicroSeconds(now));
}
TEST_F(CassandraFunctionalTest,
CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn) {
purge_ttl_on_expiration_ = true;
CassandraStore store(OpenDb());
int64_t now = time(nullptr);
store.Append("k1", CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)), //expired
std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now)), // not expired
std::make_tuple(kTombstone, 3, ToMicroSeconds(now))
}));
store.Flush();
store.Append("k1",CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)), //expired
std::make_tuple(kColumn, 2, ToMicroSeconds(now))
}));
store.Flush();
store.Compact();
auto ret = store.Get("k1");
ASSERT_TRUE(std::get<0>(ret));
RowValue& merged = std::get<1>(ret);
EXPECT_EQ(merged.columns_.size(), 3);
VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 1, ToMicroSeconds(now));
VerifyRowValueColumns(merged.columns_, 1, kColumn, 2, ToMicroSeconds(now));
VerifyRowValueColumns(merged.columns_, 2, kTombstone, 3, ToMicroSeconds(now));
}
TEST_F(CassandraFunctionalTest,
CompactionShouldRemoveRowWhenAllColumnsExpiredIfPurgeTtlIsOn) {
purge_ttl_on_expiration_ = true;
CassandraStore store(OpenDb());
int64_t now = time(nullptr);
store.Append("k1", CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 20)),
std::make_tuple(kExpiringColumn, 1, ToMicroSeconds(now - kTtl - 20)),
}));
store.Flush();
store.Append("k1",CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, ToMicroSeconds(now - kTtl - 10)),
}));
store.Flush();
store.Compact();
ASSERT_FALSE(std::get<0>(store.Get("k1")));
}
} // namespace cassandra
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -2,13 +2,11 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include <memory>
#include "util/testharness.h"
#include "utilities/merge_operators/cassandra/format.h"
#include "utilities/merge_operators/cassandra/test_utils.h"
#include "utilities/cassandra/format.h"
#include "utilities/cassandra/test_utils.h"
namespace rocksdb {
namespace cassandra {

@ -2,11 +2,9 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include "util/testharness.h"
#include "utilities/merge_operators/cassandra/serialize.h"
#include "utilities/cassandra/serialize.h"
using namespace rocksdb::cassandra;

@ -2,8 +2,6 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include "format.h"
@ -11,7 +9,7 @@
#include <map>
#include <memory>
#include "utilities/merge_operators/cassandra/serialize.h"
#include "utilities/cassandra/serialize.h"
namespace rocksdb {
namespace cassandra {
@ -42,7 +40,7 @@ void ColumnBase::Serialize(std::string* dest) const {
rocksdb::cassandra::Serialize<int8_t>(index_, dest);
}
std::unique_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
std::shared_ptr<ColumnBase> ColumnBase::Deserialize(const char* src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
@ -79,7 +77,7 @@ void Column::Serialize(std::string* dest) const {
dest->append(value_, value_size_);
}
std::unique_ptr<Column> Column::Deserialize(const char *src,
std::shared_ptr<Column> Column::Deserialize(const char *src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
@ -89,8 +87,8 @@ std::unique_ptr<Column> Column::Deserialize(const char *src,
offset += sizeof(timestamp);
int32_t value_size = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
offset += sizeof(value_size);
return std::unique_ptr<Column>(
new Column(mask, index, timestamp, value_size, src + offset));
return std::make_shared<Column>(
mask, index, timestamp, value_size, src + offset);
}
ExpiringColumn::ExpiringColumn(
@ -112,7 +110,32 @@ void ExpiringColumn::Serialize(std::string* dest) const {
rocksdb::cassandra::Serialize<int32_t>(ttl_, dest);
}
std::unique_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
std::chrono::time_point<std::chrono::system_clock> ExpiringColumn::TimePoint() const {
return std::chrono::time_point<std::chrono::system_clock>(std::chrono::microseconds(Timestamp()));
}
std::chrono::seconds ExpiringColumn::Ttl() const {
return std::chrono::seconds(ttl_);
}
bool ExpiringColumn::Expired() const {
return TimePoint() + Ttl() < std::chrono::system_clock::now();
}
std::shared_ptr<Tombstone> ExpiringColumn::ToTombstone() const {
auto expired_at = (TimePoint() + Ttl()).time_since_epoch();
int32_t local_deletion_time = static_cast<int32_t>(
std::chrono::duration_cast<std::chrono::seconds>(expired_at).count());
int64_t marked_for_delete_at =
std::chrono::duration_cast<std::chrono::microseconds>(expired_at).count();
return std::make_shared<Tombstone>(
ColumnTypeMask::DELETION_MASK,
Index(),
local_deletion_time,
marked_for_delete_at);
}
std::shared_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
const char *src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
@ -126,8 +149,8 @@ std::unique_ptr<ExpiringColumn> ExpiringColumn::Deserialize(
const char* value = src + offset;
offset += value_size;
int32_t ttl = rocksdb::cassandra::Deserialize<int32_t>(src, offset);
return std::unique_ptr<ExpiringColumn>(
new ExpiringColumn(mask, index, timestamp, value_size, value, ttl));
return std::make_shared<ExpiringColumn>(
mask, index, timestamp, value_size, value, ttl);
}
Tombstone::Tombstone(
@ -153,7 +176,7 @@ void Tombstone::Serialize(std::string* dest) const {
rocksdb::cassandra::Serialize<int64_t>(marked_for_delete_at_, dest);
}
std::unique_ptr<Tombstone> Tombstone::Deserialize(const char *src,
std::shared_ptr<Tombstone> Tombstone::Deserialize(const char *src,
std::size_t offset) {
int8_t mask = rocksdb::cassandra::Deserialize<int8_t>(src, offset);
offset += sizeof(mask);
@ -164,8 +187,8 @@ std::unique_ptr<Tombstone> Tombstone::Deserialize(const char *src,
offset += sizeof(int32_t);
int64_t marked_for_delete_at =
rocksdb::cassandra::Deserialize<int64_t>(src, offset);
return std::unique_ptr<Tombstone>(
new Tombstone(mask, index, local_deletion_time, marked_for_delete_at));
return std::make_shared<Tombstone>(
mask, index, local_deletion_time, marked_for_delete_at);
}
RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
@ -173,7 +196,7 @@ RowValue::RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at)
marked_for_delete_at_(marked_for_delete_at), columns_(),
last_modified_time_(0) {}
RowValue::RowValue(std::vector<std::unique_ptr<ColumnBase>> columns,
RowValue::RowValue(Columns columns,
int64_t last_modified_time)
: local_deletion_time_(kDefaultLocalDeletionTime),
marked_for_delete_at_(kDefaultMarkedForDeleteAt),
@ -208,6 +231,49 @@ void RowValue::Serialize(std::string* dest) const {
}
}
RowValue RowValue::PurgeTtl(bool* changed) const {
*changed = false;
Columns new_columns;
for (auto& column : columns_) {
if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
std::shared_ptr<ExpiringColumn> expiring_column =
std::static_pointer_cast<ExpiringColumn>(column);
if(expiring_column->Expired()){
*changed = true;
continue;
}
}
new_columns.push_back(column);
}
return RowValue(std::move(new_columns), last_modified_time_);
}
RowValue RowValue::ExpireTtl(bool* changed) const {
*changed = false;
Columns new_columns;
for (auto& column : columns_) {
if(column->Mask() == ColumnTypeMask::EXPIRATION_MASK) {
std::shared_ptr<ExpiringColumn> expiring_column =
std::static_pointer_cast<ExpiringColumn>(column);
if(expiring_column->Expired()) {
shared_ptr<Tombstone> tombstone = expiring_column->ToTombstone();
new_columns.push_back(tombstone);
*changed = true;
continue;
}
}
new_columns.push_back(column);
}
return RowValue(std::move(new_columns), last_modified_time_);
}
bool RowValue::Empty() const {
return columns_.empty();
}
RowValue RowValue::Deserialize(const char *src, std::size_t size) {
std::size_t offset = 0;
assert(size >= sizeof(local_deletion_time_) + sizeof(marked_for_delete_at_));
@ -223,7 +289,7 @@ RowValue RowValue::Deserialize(const char *src, std::size_t size) {
assert(local_deletion_time == kDefaultLocalDeletionTime);
assert(marked_for_delete_at == kDefaultMarkedForDeleteAt);
std::vector<std::unique_ptr<ColumnBase>> columns;
Columns columns;
int64_t last_modified_time = 0;
while (offset < size) {
auto c = ColumnBase::Deserialize(src, offset);
@ -254,7 +320,7 @@ RowValue RowValue::Merge(std::vector<RowValue>&& values) {
return r1.LastModifiedTime() > r2.LastModifiedTime();
});
std::map<int8_t, std::unique_ptr<ColumnBase>> merged_columns;
std::map<int8_t, std::shared_ptr<ColumnBase>> merged_columns;
int64_t tombstone_timestamp = 0;
for (auto& value : values) {
@ -268,17 +334,17 @@ RowValue RowValue::Merge(std::vector<RowValue>&& values) {
for (auto& column : value.columns_) {
int8_t index = column->Index();
if (merged_columns.find(index) == merged_columns.end()) {
merged_columns[index] = std::move(column);
merged_columns[index] = column;
} else {
if (column->Timestamp() > merged_columns[index]->Timestamp()) {
merged_columns[index] = std::move(column);
merged_columns[index] = column;
}
}
}
}
int64_t last_modified_time = 0;
std::vector<std::unique_ptr<ColumnBase>> columns;
Columns columns;
for (auto& pair: merged_columns) {
// For some row, its last_modified_time > row tombstone_timestamp, but
// it might have rows whose timestamp is ealier than tombstone, so we

@ -2,8 +2,6 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
/**
* The encoding of Cassandra Row Value.
@ -57,6 +55,7 @@
*/
#pragma once
#include <chrono>
#include <vector>
#include <memory>
#include "rocksdb/merge_operator.h"
@ -72,6 +71,7 @@ enum ColumnTypeMask {
EXPIRATION_MASK = 0x02,
};
class ColumnBase {
public:
ColumnBase(int8_t mask, int8_t index);
@ -82,8 +82,7 @@ public:
virtual int8_t Index() const;
virtual std::size_t Size() const;
virtual void Serialize(std::string* dest) const;
static std::unique_ptr<ColumnBase> Deserialize(const char* src,
static std::shared_ptr<ColumnBase> Deserialize(const char* src,
std::size_t offset);
private:
@ -99,8 +98,7 @@ public:
virtual int64_t Timestamp() const override;
virtual std::size_t Size() const override;
virtual void Serialize(std::string* dest) const override;
static std::unique_ptr<Column> Deserialize(const char* src,
static std::shared_ptr<Column> Deserialize(const char* src,
std::size_t offset);
private:
@ -109,44 +107,50 @@ private:
const char* value_;
};
class ExpiringColumn : public Column {
class Tombstone : public ColumnBase {
public:
ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
int32_t value_size, const char* value, int32_t ttl);
Tombstone(int8_t mask, int8_t index,
int32_t local_deletion_time, int64_t marked_for_delete_at);
virtual int64_t Timestamp() const override;
virtual std::size_t Size() const override;
virtual void Serialize(std::string* dest) const override;
static std::unique_ptr<ExpiringColumn> Deserialize(const char* src,
std::size_t offset);
static std::shared_ptr<Tombstone> Deserialize(const char* src,
std::size_t offset);
private:
int32_t ttl_;
int32_t local_deletion_time_;
int64_t marked_for_delete_at_;
};
class Tombstone : public ColumnBase {
class ExpiringColumn : public Column {
public:
Tombstone(int8_t mask, int8_t index,
int32_t local_deletion_time, int64_t marked_for_delete_at);
ExpiringColumn(int8_t mask, int8_t index, int64_t timestamp,
int32_t value_size, const char* value, int32_t ttl);
virtual int64_t Timestamp() const override;
virtual std::size_t Size() const override;
virtual void Serialize(std::string* dest) const override;
bool Expired() const;
std::shared_ptr<Tombstone> ToTombstone() const;
static std::unique_ptr<Tombstone> Deserialize(const char* src,
std::size_t offset);
static std::shared_ptr<ExpiringColumn> Deserialize(const char* src,
std::size_t offset);
private:
int32_t local_deletion_time_;
int64_t marked_for_delete_at_;
int32_t ttl_;
std::chrono::time_point<std::chrono::system_clock> TimePoint() const;
std::chrono::seconds Ttl() const;
};
typedef std::vector<std::shared_ptr<ColumnBase>> Columns;
class RowValue {
public:
// Create a Row Tombstone.
RowValue(int32_t local_deletion_time, int64_t marked_for_delete_at);
// Create a Row containing columns.
RowValue(std::vector<std::unique_ptr<ColumnBase>> columns,
RowValue(Columns columns,
int64_t last_modified_time);
RowValue(const RowValue& that) = delete;
RowValue(RowValue&& that) noexcept = default;
@ -159,6 +163,9 @@ public:
// otherwise it returns the max timestamp of containing columns.
int64_t LastModifiedTime() const;
void Serialize(std::string* dest) const;
RowValue PurgeTtl(bool* changed) const;
RowValue ExpireTtl(bool* changed) const;
bool Empty() const;
static RowValue Deserialize(const char* src, std::size_t size);
// Merge multiple rows according to their timestamp.
@ -167,12 +174,20 @@ public:
private:
int32_t local_deletion_time_;
int64_t marked_for_delete_at_;
std::vector<std::unique_ptr<ColumnBase>> columns_;
Columns columns_;
int64_t last_modified_time_;
FRIEND_TEST(RowValueTest, PurgeTtlShouldRemvoeAllColumnsExpired);
FRIEND_TEST(RowValueTest, ExpireTtlShouldConvertExpiredColumnsToTombstones);
FRIEND_TEST(RowValueMergeTest, Merge);
FRIEND_TEST(RowValueMergeTest, MergeWithRowTombstone);
FRIEND_TEST(CassandraMergeTest, SimpleTest);
FRIEND_TEST(CassandraFunctionalTest, SimpleMergeTest);
FRIEND_TEST(
CassandraFunctionalTest, CompactionShouldConvertExpiredColumnsToTombstone);
FRIEND_TEST(
CassandraFunctionalTest, CompactionShouldPurgeExpiredColumnsIfPurgeTtlIsOn);
FRIEND_TEST(
CassandraFunctionalTest, CompactionShouldRemoveRowWhenAllColumnExpiredIfPurgeTtlIsOn);
};
} // namepsace cassandrda

@ -13,7 +13,7 @@
#include "rocksdb/slice.h"
#include "rocksdb/merge_operator.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/cassandra/format.h"
#include "utilities/cassandra/format.h"
namespace rocksdb {
namespace cassandra {

@ -1,7 +1,7 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
@ -12,29 +12,29 @@ namespace cassandra {
const char kData[] = {'d', 'a', 't', 'a'};
const char kExpiringData[] = {'e', 'd', 'a', 't', 'a'};
const int32_t kLocalDeletionTime = 1;
const int32_t kTtl = 100;
const int32_t kTtl = 86400;
const int8_t kColumn = 0;
const int8_t kTombstone = 1;
const int8_t kExpiringColumn = 2;
std::unique_ptr<ColumnBase> CreateTestColumn(int8_t mask,
std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask,
int8_t index,
int64_t timestamp) {
if ((mask & ColumnTypeMask::DELETION_MASK) != 0) {
return std::unique_ptr<Tombstone>(new Tombstone(
return std::shared_ptr<Tombstone>(new Tombstone(
mask, index, kLocalDeletionTime, timestamp));
} else if ((mask & ColumnTypeMask::EXPIRATION_MASK) != 0) {
return std::unique_ptr<ExpiringColumn>(new ExpiringColumn(
return std::shared_ptr<ExpiringColumn>(new ExpiringColumn(
mask, index, timestamp, sizeof(kExpiringData), kExpiringData, kTtl));
} else {
return std::unique_ptr<Column>(
return std::shared_ptr<Column>(
new Column(mask, index, timestamp, sizeof(kData), kData));
}
}
RowValue CreateTestRowValue(
std::vector<std::tuple<int8_t, int8_t, int64_t>> column_specs) {
std::vector<std::unique_ptr<ColumnBase>> columns;
std::vector<std::shared_ptr<ColumnBase>> columns;
int64_t last_modified_time = 0;
for (auto spec: column_specs) {
auto c = CreateTestColumn(std::get<0>(spec), std::get<1>(spec),
@ -50,7 +50,7 @@ RowValue CreateRowTombstone(int64_t timestamp) {
}
void VerifyRowValueColumns(
std::vector<std::unique_ptr<ColumnBase>> &columns,
std::vector<std::shared_ptr<ColumnBase>> &columns,
std::size_t index_of_vector,
int8_t expected_mask,
int8_t expected_index,
@ -61,5 +61,9 @@ void VerifyRowValueColumns(
EXPECT_EQ(expected_index, columns[index_of_vector]->Index());
}
int64_t ToMicroSeconds(int64_t seconds) {
return seconds * (int64_t)1000000;
}
}
}

@ -8,8 +8,8 @@
#pragma once
#include <memory>
#include "util/testharness.h"
#include "utilities/merge_operators/cassandra/format.h"
#include "utilities/merge_operators/cassandra/serialize.h"
#include "utilities/cassandra/format.h"
#include "utilities/cassandra/serialize.h"
namespace rocksdb {
namespace cassandra {
@ -22,7 +22,7 @@ extern const int8_t kTombstone;
extern const int8_t kExpiringColumn;
std::unique_ptr<ColumnBase> CreateTestColumn(int8_t mask,
std::shared_ptr<ColumnBase> CreateTestColumn(int8_t mask,
int8_t index,
int64_t timestamp);
@ -32,12 +32,14 @@ RowValue CreateTestRowValue(
RowValue CreateRowTombstone(int64_t timestamp);
void VerifyRowValueColumns(
std::vector<std::unique_ptr<ColumnBase>> &columns,
std::vector<std::shared_ptr<ColumnBase>> &columns,
std::size_t index_of_vector,
int8_t expected_mask,
int8_t expected_index,
int64_t expected_timestamp
);
int64_t ToMicroSeconds(int64_t seconds);
}
}

@ -1,134 +0,0 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
#include <iostream>
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "util/testharness.h"
#include "util/random.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/cassandra/merge_operator.h"
#include "utilities/merge_operators/cassandra/test_utils.h"
using namespace rocksdb;
namespace rocksdb {
namespace cassandra {
// Path to the database on file system
const std::string kDbName = test::TmpDir() + "/cassandramerge_test";
class CassandraStore {
public:
explicit CassandraStore(std::shared_ptr<DB> db)
: db_(db),
merge_option_(),
get_option_() {
assert(db);
}
bool Append(const std::string& key, const RowValue& val){
std::string result;
val.Serialize(&result);
Slice valSlice(result.data(), result.size());
auto s = db_->Merge(merge_option_, key, valSlice);
if (s.ok()) {
return true;
} else {
std::cerr << "ERROR " << s.ToString() << std::endl;
return false;
}
}
std::tuple<bool, RowValue> Get(const std::string& key){
std::string result;
auto s = db_->Get(get_option_, key, &result);
if (s.ok()) {
return std::make_tuple(true,
RowValue::Deserialize(result.data(),
result.size()));
}
if (!s.IsNotFound()) {
std::cerr << "ERROR " << s.ToString() << std::endl;
}
return std::make_tuple(false, RowValue(0, 0));
}
private:
std::shared_ptr<DB> db_;
WriteOptions merge_option_;
ReadOptions get_option_;
};
// The class for unit-testing
class CassandraMergeTest : public testing::Test {
public:
CassandraMergeTest() {
DestroyDB(kDbName, Options()); // Start each test with a fresh DB
}
std::shared_ptr<DB> OpenDb() {
DB* db;
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new CassandraValueMergeOperator());
EXPECT_OK(DB::Open(options, kDbName, &db));
return std::shared_ptr<DB>(db);
}
};
// THE TEST CASES BEGIN HERE
TEST_F(CassandraMergeTest, SimpleTest) {
auto db = OpenDb();
CassandraStore store(db);
store.Append("k1", CreateTestRowValue({
std::make_tuple(kTombstone, 0, 5),
std::make_tuple(kColumn, 1, 8),
std::make_tuple(kExpiringColumn, 2, 5),
}));
store.Append("k1",CreateTestRowValue({
std::make_tuple(kColumn, 0, 2),
std::make_tuple(kExpiringColumn, 1, 5),
std::make_tuple(kTombstone, 2, 7),
std::make_tuple(kExpiringColumn, 7, 17),
}));
store.Append("k1", CreateTestRowValue({
std::make_tuple(kExpiringColumn, 0, 6),
std::make_tuple(kTombstone, 1, 5),
std::make_tuple(kColumn, 2, 4),
std::make_tuple(kTombstone, 11, 11),
}));
auto ret = store.Get("k1");
ASSERT_TRUE(std::get<0>(ret));
RowValue& merged = std::get<1>(ret);
EXPECT_EQ(merged.columns_.size(), 5);
VerifyRowValueColumns(merged.columns_, 0, kExpiringColumn, 0, 6);
VerifyRowValueColumns(merged.columns_, 1, kColumn, 1, 8);
VerifyRowValueColumns(merged.columns_, 2, kTombstone, 2, 7);
VerifyRowValueColumns(merged.columns_, 3, kExpiringColumn, 7, 17);
VerifyRowValueColumns(merged.columns_, 4, kTombstone, 11, 11);
}
} // namespace cassandra
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Loading…
Cancel
Save