Support for adding TTL-ed column family

Summary:
This enables user to add a TTL column family to normal DB.

Next step should be to expand StackableDB and create StackableColumnFamily, such that users can for example add geo-spatial column families to normal DB.

Test Plan: added a test

Reviewers: dhruba, haobo, ljin

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D18201
main
Igor Canadi 10 years ago
parent 72ff275e3c
commit f868dcbbed
  1. 68
      include/utilities/db_ttl.h
  2. 10
      include/utilities/stackable_db.h
  3. 55
      include/utilities/utility_db.h
  4. 9
      util/ldb_cmd.cc
  5. 12
      util/ldb_cmd.h
  6. 144
      utilities/ttl/db_ttl_impl.cc
  7. 193
      utilities/ttl/db_ttl_impl.h
  8. 22
      utilities/ttl/ttl_test.cc

@ -0,0 +1,68 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
#include "utilities/stackable_db.h"
#include "rocksdb/db.h"
namespace rocksdb {
// Database with TTL support.
//
// USE-CASES:
// This API should be used to open the db when key-values inserted are
// meant to be removed from the db in a non-strict 'ttl' amount of time
// Therefore, this guarantees that key-values inserted will remain in the
// db for >= ttl amount of time and the db will make efforts to remove the
// key-values as soon as possible after ttl seconds of their insertion.
//
// BEHAVIOUR:
// TTL is accepted in seconds
// (int32_t)Timestamp(creation) is suffixed to values in Put internally
// Expired TTL values deleted in compaction only:(Timestamp+ttl<time_now)
// Get/Iterator may return expired entries(compaction not run on them yet)
// Different TTL may be used during different Opens
// Example: Open1 at t=0 with ttl=4 and insert k1,k2, close at t=2
// Open2 at t=3 with ttl=5. Now k1,k2 should be deleted at t>=5
// read_only=true opens in the usual read-only mode. Compactions will not be
// triggered(neither manual nor automatic), so no expired entries removed
//
// CONSTRAINTS:
// Not specifying/passing or non-positive TTL behaves like TTL = infinity
//
// !!!WARNING!!!:
// Calling DB::Open directly to re-open a db created by this API will get
// corrupt values(timestamp suffixed) and no ttl effect will be there
// during the second Open, so use this API consistently to open the db
// Be careful when passing ttl with a small positive value because the
// whole database may be deleted in a small amount of time
class DBWithTTL : public StackableDB {
public:
virtual Status CreateColumnFamilyWithTtl(
const ColumnFamilyOptions& options, const std::string& column_family_name,
ColumnFamilyHandle** handle, int ttl) = 0;
static Status Open(const Options& options, const std::string& dbname,
DBWithTTL** dbptr, int32_t ttl = 0,
bool read_only = false);
static Status Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
DBWithTTL** dbptr, std::vector<int32_t> ttls,
bool read_only = false);
protected:
explicit DBWithTTL(DB* db) : StackableDB(db) {}
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -21,6 +21,16 @@ class StackableDB : public DB {
return db_;
}
virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) {
return db_->CreateColumnFamily(options, column_family_name, handle);
}
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) {
return db_->DropColumnFamily(column_family);
}
using DB::Put;
virtual Status Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,

@ -8,55 +8,22 @@
#include <string>
#include "utilities/stackable_db.h"
#include "utilities/db_ttl.h"
#include "rocksdb/db.h"
namespace rocksdb {
// This class contains APIs to open rocksdb with specific support eg. TTL
// Please don't use this class. It's deprecated
class UtilityDB {
public:
// Open the database with TTL support.
//
// USE-CASES:
// This API should be used to open the db when key-values inserted are
// meant to be removed from the db in a non-strict 'ttl' amount of time
// Therefore, this guarantees that key-values inserted will remain in the
// db for >= ttl amount of time and the db will make efforts to remove the
// key-values as soon as possible after ttl seconds of their insertion.
//
// BEHAVIOUR:
// TTL is accepted in seconds
// (int32_t)Timestamp(creation) is suffixed to values in Put internally
// Expired TTL values deleted in compaction only:(Timestamp+ttl<time_now)
// Get/Iterator may return expired entries(compaction not run on them yet)
// Different TTL may be used during different Opens
// Example: Open1 at t=0 with ttl=4 and insert k1,k2, close at t=2
// Open2 at t=3 with ttl=5. Now k1,k2 should be deleted at t>=5
// read_only=true opens in the usual read-only mode. Compactions will not be
// triggered(neither manual nor automatic), so no expired entries removed
//
// CONSTRAINTS:
// Not specifying/passing or non-positive TTL behaves like TTL = infinity
//
// !!!WARNING!!!:
// Calling DB::Open directly to re-open a db created by this API will get
// corrupt values(timestamp suffixed) and no ttl effect will be there
// during the second Open, so use this API consistently to open the db
// Be careful when passing ttl with a small positive value because the
// whole database may be deleted in a small amount of time
static Status OpenTtlDB(const Options& options,
const std::string& name,
StackableDB** dbptr,
int32_t ttl = 0,
bool read_only = false);
// OpenTtlDB with column family support
static Status OpenTtlDB(
const DBOptions& db_options, const std::string& name,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, StackableDB** dbptr,
std::vector<int32_t> ttls, bool read_only = false);
public:
// This function is here only for backwards compatibility. Please use the
// functions defined in DBWithTTl (utilities/db_ttl.h)
// (deprecated)
__attribute__((deprecated)) static Status OpenTtlDB(const Options& options,
const std::string& name,
StackableDB** dbptr,
int32_t ttl = 0,
bool read_only = false);
};
} // namespace rocksdb

@ -14,6 +14,7 @@
#include "rocksdb/write_batch.h"
#include "rocksdb/cache.h"
#include "util/coding.h"
#include "utilities/ttl/db_ttl_impl.h"
#include <ctime>
#include <dirent.h>
@ -909,11 +910,11 @@ void DBDumperCommand::DoCommand() {
int max_keys = max_keys_;
int ttl_start;
if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) {
ttl_start = DBWithTTL::kMinTimestamp; // TTL introduction time
ttl_start = DBWithTTLImpl::kMinTimestamp; // TTL introduction time
}
int ttl_end;
if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) {
ttl_end = DBWithTTL::kMaxTimestamp; // Max time allowed by TTL feature
ttl_end = DBWithTTLImpl::kMaxTimestamp; // Max time allowed by TTL feature
}
if (ttl_end < ttl_start) {
fprintf(stderr, "Error: End time can't be less than start time\n");
@ -1600,11 +1601,11 @@ void ScanCommand::DoCommand() {
}
int ttl_start;
if (!ParseIntOption(option_map_, ARG_TTL_START, ttl_start, exec_state_)) {
ttl_start = DBWithTTL::kMinTimestamp; // TTL introduction time
ttl_start = DBWithTTLImpl::kMinTimestamp; // TTL introduction time
}
int ttl_end;
if (!ParseIntOption(option_map_, ARG_TTL_END, ttl_end, exec_state_)) {
ttl_end = DBWithTTL::kMaxTimestamp; // Max time allowed by TTL feature
ttl_end = DBWithTTLImpl::kMaxTimestamp; // Max time allowed by TTL feature
}
if (ttl_end < ttl_start) {
fprintf(stderr, "Error: End time can't be less than start time\n");

@ -19,8 +19,8 @@
#include "util/logging.h"
#include "util/ldb_cmd_execute_result.h"
#include "util/string_util.h"
#include "utilities/utility_db.h"
#include "utilities/ttl/db_ttl.h"
#include "utilities/db_ttl.h"
#include "utilities/ttl/db_ttl_impl.h"
using std::string;
using std::map;
@ -149,7 +149,7 @@ protected:
LDBCommandExecuteResult exec_state_;
string db_path_;
DB* db_;
StackableDB* sdb_;
DBWithTTL* db_ttl_;
/**
* true implies that this command can work if the db is opened in read-only
@ -217,11 +217,11 @@ protected:
Status st;
if (is_db_ttl_) {
if (is_read_only_) {
st = UtilityDB::OpenTtlDB(opt, db_path_, &sdb_, 0, true);
st = DBWithTTL::Open(opt, db_path_, &db_ttl_, 0, true);
} else {
st = UtilityDB::OpenTtlDB(opt, db_path_, &sdb_);
st = DBWithTTL::Open(opt, db_path_, &db_ttl_);
}
db_ = sdb_;
db_ = db_ttl_;
} else if (is_read_only_) {
st = DB::OpenForReadOnly(opt, db_path_, &db_);
} else {

@ -3,16 +3,18 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#include "utilities/ttl/db_ttl.h"
#include "utilities/ttl/db_ttl_impl.h"
#include "utilities/db_ttl.h"
#include "db/filename.h"
#include "db/write_batch_internal.h"
#include "util/coding.h"
#include "include/rocksdb/env.h"
#include "include/rocksdb/iterator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
namespace rocksdb {
void DBWithTTL::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) {
void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) {
if (options->compaction_filter) {
options->compaction_filter =
new TtlCompactionFilter(ttl, options->compaction_filter);
@ -28,19 +30,25 @@ void DBWithTTL::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) {
}
}
// Open the db inside DBWithTTL because options needs pointer to its ttl
DBWithTTL::DBWithTTL(DB* db) : StackableDB(db) {}
// Open the db inside DBWithTTLImpl because options needs pointer to its ttl
DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db) {}
DBWithTTLImpl::~DBWithTTLImpl() { delete GetOptions().compaction_filter; }
DBWithTTL::~DBWithTTL() {
delete GetOptions().compaction_filter;
Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
StackableDB** dbptr, int32_t ttl, bool read_only) {
DBWithTTL* db;
Status s = DBWithTTL::Open(options, dbname, &db, ttl, read_only);
if (s.ok()) {
*dbptr = db;
} else {
*dbptr = nullptr;
}
return s;
}
Status UtilityDB::OpenTtlDB(
const Options& options,
const std::string& dbname,
StackableDB** dbptr,
int32_t ttl,
bool read_only) {
Status DBWithTTL::Open(const Options& options, const std::string& dbname,
DBWithTTL** dbptr, int32_t ttl, bool read_only) {
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
@ -48,8 +56,8 @@ Status UtilityDB::OpenTtlDB(
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
std::vector<ColumnFamilyHandle*> handles;
Status s = UtilityDB::OpenTtlDB(db_options, dbname, column_families, &handles,
dbptr, {ttl}, read_only);
Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
dbptr, {ttl}, read_only);
if (s.ok()) {
assert(handles.size() == 1);
// i can delete the handle since DBImpl is always holding a reference to
@ -59,10 +67,10 @@ Status UtilityDB::OpenTtlDB(
return s;
}
Status UtilityDB::OpenTtlDB(
Status DBWithTTL::Open(
const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, StackableDB** dbptr,
std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
std::vector<int32_t> ttls, bool read_only) {
if (ttls.size() != column_families.size()) {
@ -73,7 +81,8 @@ Status UtilityDB::OpenTtlDB(
std::vector<ColumnFamilyDescriptor> column_families_sanitized =
column_families;
for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
DBWithTTL::SanitizeOptions(ttls[i], &column_families_sanitized[i].options);
DBWithTTLImpl::SanitizeOptions(ttls[i],
&column_families_sanitized[i].options);
}
DB* db;
@ -85,66 +94,81 @@ Status UtilityDB::OpenTtlDB(
st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
}
if (st.ok()) {
*dbptr = new DBWithTTL(db);
*dbptr = new DBWithTTLImpl(db);
} else {
*dbptr = nullptr;
}
return st;
}
Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
const ColumnFamilyOptions& options, const std::string& column_family_name,
ColumnFamilyHandle** handle, int ttl) {
ColumnFamilyOptions sanitized_options = options;
DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options);
return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
handle);
}
Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) {
return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
}
// Gives back the current time
Status DBWithTTL::GetCurrentTime(int64_t& curtime) {
return Env::Default()->GetCurrentTime(&curtime);
Status DBWithTTLImpl::GetCurrentTime(int64_t* curtime) {
return Env::Default()->GetCurrentTime(curtime);
}
// Appends the current timestamp to the string.
// Returns false if could not get the current_time, true if append succeeds
Status DBWithTTL::AppendTS(const Slice& val, std::string& val_with_ts) {
val_with_ts.reserve(kTSLength + val.size());
Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts) {
val_with_ts->reserve(kTSLength + val.size());
char ts_string[kTSLength];
int64_t curtime;
Status st = GetCurrentTime(curtime);
Status st = GetCurrentTime(&curtime);
if (!st.ok()) {
return st;
}
EncodeFixed32(ts_string, (int32_t)curtime);
val_with_ts.append(val.data(), val.size());
val_with_ts.append(ts_string, kTSLength);
val_with_ts->append(val.data(), val.size());
val_with_ts->append(ts_string, kTSLength);
return st;
}
// Returns corruption if the length of the string is lesser than timestamp, or
// timestamp refers to a time lesser than ttl-feature release time
Status DBWithTTL::SanityCheckTimestamp(const Slice& str) {
Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
if (str.size() < kTSLength) {
return Status::Corruption("Error: value's length less than timestamp's\n");
}
// Checks that TS is not lesser than kMinTimestamp
// Gaurds against corruption & normal database opened incorrectly in ttl mode
int32_t timestamp_value =
DecodeFixed32(str.data() + str.size() - kTSLength);
if (timestamp_value < kMinTimestamp){
int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
if (timestamp_value < kMinTimestamp) {
return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
}
return Status::OK();
}
// Checks if the string is stale or not according to TTl provided
bool DBWithTTL::IsStale(const Slice& value, int32_t ttl) {
if (ttl <= 0) { // Data is fresh if TTL is non-positive
bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl) {
if (ttl <= 0) { // Data is fresh if TTL is non-positive
return false;
}
int64_t curtime;
if (!GetCurrentTime(curtime).ok()) {
return false; // Treat the data as fresh if could not get current time
if (!GetCurrentTime(&curtime).ok()) {
return false; // Treat the data as fresh if could not get current time
}
int32_t timestamp_value =
DecodeFixed32(value.data() + value.size() - kTSLength);
DecodeFixed32(value.data() + value.size() - kTSLength);
return (timestamp_value + ttl) < curtime;
}
// Strips the TS from the end of the string
Status DBWithTTL::StripTS(std::string* str) {
Status DBWithTTLImpl::StripTS(std::string* str) {
Status st;
if (str->length() < kTSLength) {
return Status::Corruption("Bad timestamp in key-value");
@ -154,17 +178,17 @@ Status DBWithTTL::StripTS(std::string* str) {
return st;
}
Status DBWithTTL::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& val) {
Status DBWithTTLImpl::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& val) {
WriteBatch batch;
batch.Put(column_family, key, val);
return Write(options, &batch);
}
Status DBWithTTL::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
Status DBWithTTLImpl::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
Status st = db_->Get(options, column_family, key, value);
if (!st.ok()) {
return st;
@ -176,18 +200,18 @@ Status DBWithTTL::Get(const ReadOptions& options,
return StripTS(value);
}
std::vector<Status> DBWithTTL::MultiGet(
std::vector<Status> DBWithTTLImpl::MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
return std::vector<Status>(keys.size(),
Status::NotSupported("MultiGet not\
supported with TTL"));
return std::vector<Status>(
keys.size(), Status::NotSupported("MultiGet not supported with TTL"));
}
bool DBWithTTL::KeyMayExist(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) {
bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value,
bool* value_found) {
bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
if (ret && value != nullptr && value_found != nullptr && *value_found) {
if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
@ -197,15 +221,15 @@ bool DBWithTTL::KeyMayExist(const ReadOptions& options,
return ret;
}
Status DBWithTTL::Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
Status DBWithTTLImpl::Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
WriteBatch batch;
batch.Merge(column_family, key, value);
return Write(options, &batch);
}
Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
class Handler : public WriteBatch::Handler {
public:
WriteBatch updates_ttl;
@ -213,7 +237,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
std::string value_with_ts;
Status st = AppendTS(value, value_with_ts);
Status st = AppendTS(value, &value_with_ts);
if (!st.ok()) {
batch_rewrite_status = st;
} else {
@ -225,7 +249,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
std::string value_with_ts;
Status st = AppendTS(value, value_with_ts);
Status st = AppendTS(value, &value_with_ts);
if (!st.ok()) {
batch_rewrite_status = st;
} else {
@ -238,9 +262,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
return Status::OK();
}
virtual void LogData(const Slice& blob) {
updates_ttl.PutLogData(blob);
}
virtual void LogData(const Slice& blob) { updates_ttl.PutLogData(blob); }
};
Handler handler;
updates->Iterate(&handler);
@ -251,8 +273,8 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
}
}
Iterator* DBWithTTL::NewIterator(const ReadOptions& opts,
ColumnFamilyHandle* column_family) {
Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
ColumnFamilyHandle* column_family) {
return new TtlIterator(db_->NewIterator(opts, column_family));
}

@ -14,17 +14,27 @@
#include "rocksdb/compaction_filter.h"
#include "rocksdb/merge_operator.h"
#include "utilities/utility_db.h"
#include "utilities/db_ttl.h"
#include "db/db_impl.h"
namespace rocksdb {
class DBWithTTL : public StackableDB {
class DBWithTTLImpl : public DBWithTTL {
public:
static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options);
explicit DBWithTTL(DB* db);
explicit DBWithTTLImpl(DB* db);
virtual ~DBWithTTL();
virtual ~DBWithTTLImpl();
Status CreateColumnFamilyWithTtl(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle** handle,
int ttl) override;
Status CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) override;
using StackableDB::Put;
virtual Status Put(const WriteOptions& options,
@ -60,83 +70,60 @@ class DBWithTTL : public StackableDB {
virtual Iterator* NewIterator(const ReadOptions& opts,
ColumnFamilyHandle* column_family) override;
virtual DB* GetBaseDB() {
return db_;
}
virtual DB* GetBaseDB() { return db_; }
static bool IsStale(const Slice& value, int32_t ttl);
static Status AppendTS(const Slice& val, std::string& val_with_ts);
static Status AppendTS(const Slice& val, std::string* val_with_ts);
static Status SanityCheckTimestamp(const Slice& str);
static Status StripTS(std::string* str);
static Status GetCurrentTime(int64_t& curtime);
static Status GetCurrentTime(int64_t* curtime);
static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp
static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp
static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8
static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8
};
class TtlIterator : public Iterator {
public:
explicit TtlIterator(Iterator* iter)
: iter_(iter) {
assert(iter_);
}
explicit TtlIterator(Iterator* iter) : iter_(iter) { assert(iter_); }
~TtlIterator() {
delete iter_;
}
~TtlIterator() { delete iter_; }
bool Valid() const {
return iter_->Valid();
}
bool Valid() const { return iter_->Valid(); }
void SeekToFirst() {
iter_->SeekToFirst();
}
void SeekToFirst() { iter_->SeekToFirst(); }
void SeekToLast() {
iter_->SeekToLast();
}
void SeekToLast() { iter_->SeekToLast(); }
void Seek(const Slice& target) {
iter_->Seek(target);
}
void Seek(const Slice& target) { iter_->Seek(target); }
void Next() {
iter_->Next();
}
void Next() { iter_->Next(); }
void Prev() {
iter_->Prev();
}
void Prev() { iter_->Prev(); }
Slice key() const {
return iter_->key();
}
Slice key() const { return iter_->key(); }
int32_t timestamp() const {
return DecodeFixed32(
iter_->value().data() + iter_->value().size() - DBWithTTL::kTSLength);
return DecodeFixed32(iter_->value().data() + iter_->value().size() -
DBWithTTLImpl::kTSLength);
}
Slice value() const {
//TODO: handle timestamp corruption like in general iterator semantics
assert(DBWithTTL::SanityCheckTimestamp(iter_->value()).ok());
// TODO: handle timestamp corruption like in general iterator semantics
assert(DBWithTTLImpl::SanityCheckTimestamp(iter_->value()).ok());
Slice trimmed_value = iter_->value();
trimmed_value.size_ -= DBWithTTL::kTSLength;
trimmed_value.size_ -= DBWithTTLImpl::kTSLength;
return trimmed_value;
}
Status status() const {
return iter_->status();
}
Status status() const { return iter_->status(); }
private:
Iterator* iter_;
@ -146,13 +133,13 @@ class TtlCompactionFilter : public CompactionFilter {
public:
TtlCompactionFilter(
int32_t ttl,
const CompactionFilter* user_comp_filter,
std::unique_ptr<const CompactionFilter>
user_comp_filter_from_factory = nullptr)
: ttl_(ttl),
user_comp_filter_(user_comp_filter),
user_comp_filter_from_factory_(std::move(user_comp_filter_from_factory)) {
int32_t ttl, const CompactionFilter* user_comp_filter,
std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
nullptr)
: ttl_(ttl),
user_comp_filter_(user_comp_filter),
user_comp_filter_from_factory_(
std::move(user_comp_filter_from_factory)) {
// Unlike the merge operator, compaction filter is necessary for TTL, hence
// this would be called even if user doesn't specify any compaction-filter
if (!user_comp_filter_) {
@ -160,34 +147,31 @@ class TtlCompactionFilter : public CompactionFilter {
}
}
virtual bool Filter(int level,
const Slice& key,
const Slice& old_val,
std::string* new_val,
bool* value_changed) const override {
if (DBWithTTL::IsStale(old_val, ttl_)) {
virtual bool Filter(int level, const Slice& key, const Slice& old_val,
std::string* new_val, bool* value_changed) const
override {
if (DBWithTTLImpl::IsStale(old_val, ttl_)) {
return true;
}
if (user_comp_filter_ == nullptr) {
return false;
}
assert(old_val.size() >= DBWithTTL::kTSLength);
assert(old_val.size() >= DBWithTTLImpl::kTSLength);
Slice old_val_without_ts(old_val.data(),
old_val.size() - DBWithTTL::kTSLength);
old_val.size() - DBWithTTLImpl::kTSLength);
if (user_comp_filter_->Filter(level, key, old_val_without_ts, new_val,
value_changed)) {
return true;
}
if (*value_changed) {
new_val->append(old_val.data() + old_val.size() - DBWithTTL::kTSLength,
DBWithTTL::kTSLength);
new_val->append(
old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
DBWithTTLImpl::kTSLength);
}
return false;
}
virtual const char* Name() const override {
return "Delete By TTL";
}
virtual const char* Name() const override { return "Delete By TTL"; }
private:
int32_t ttl_;
@ -196,47 +180,40 @@ class TtlCompactionFilter : public CompactionFilter {
};
class TtlCompactionFilterFactory : public CompactionFilterFactory {
public:
TtlCompactionFilterFactory(
int32_t ttl,
std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
: ttl_(ttl),
user_comp_filter_factory_(comp_filter_factory) { }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) {
return std::unique_ptr<TtlCompactionFilter>(
new TtlCompactionFilter(
ttl_,
nullptr,
std::move(user_comp_filter_factory_->CreateCompactionFilter(context))
)
);
}
public:
TtlCompactionFilterFactory(
int32_t ttl, std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
: ttl_(ttl), user_comp_filter_factory_(comp_filter_factory) {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) {
return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
ttl_, nullptr,
std::move(user_comp_filter_factory_->CreateCompactionFilter(context))));
}
virtual const char* Name() const override {
return "TtlCompactionFilterFactory";
}
virtual const char* Name() const override {
return "TtlCompactionFilterFactory";
}
private:
int32_t ttl_;
std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
private:
int32_t ttl_;
std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
};
class TtlMergeOperator : public MergeOperator {
public:
explicit TtlMergeOperator(const std::shared_ptr<MergeOperator> merge_op)
: user_merge_op_(merge_op) {
: user_merge_op_(merge_op) {
assert(merge_op);
}
virtual bool FullMerge(const Slice& key,
const Slice* existing_value,
virtual bool FullMerge(const Slice& key, const Slice* existing_value,
const std::deque<std::string>& operands,
std::string* new_value,
Logger* logger) const override {
const uint32_t ts_len = DBWithTTL::kTSLength;
std::string* new_value, Logger* logger) const
override {
const uint32_t ts_len = DBWithTTLImpl::kTSLength;
if (existing_value && existing_value->size() < ts_len) {
Log(logger, "Error: Could not remove timestamp from existing value.");
return false;
@ -244,7 +221,7 @@ class TtlMergeOperator : public MergeOperator {
// Extract time-stamp from each operand to be passed to user_merge_op_
std::deque<std::string> operands_without_ts;
for (const auto &operand : operands) {
for (const auto& operand : operands) {
if (operand.size() < ts_len) {
Log(logger, "Error: Could not remove timestamp from operand value.");
return false;
@ -271,9 +248,10 @@ class TtlMergeOperator : public MergeOperator {
// Augment the *new_value with the ttl time-stamp
int64_t curtime;
if (!DBWithTTL::GetCurrentTime(curtime).ok()) {
Log(logger, "Error: Could not get current time to be attached internally "
"to the new value.");
if (!DBWithTTLImpl::GetCurrentTime(&curtime).ok()) {
Log(logger,
"Error: Could not get current time to be attached internally "
"to the new value.");
return false;
} else {
char ts_string[ts_len];
@ -287,7 +265,7 @@ class TtlMergeOperator : public MergeOperator {
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const
override {
const uint32_t ts_len = DBWithTTL::kTSLength;
const uint32_t ts_len = DBWithTTLImpl::kTSLength;
std::deque<Slice> operands_without_ts;
for (const auto& operand : operand_list) {
@ -309,9 +287,10 @@ class TtlMergeOperator : public MergeOperator {
// Augment the *new_value with the ttl time-stamp
int64_t curtime;
if (!DBWithTTL::GetCurrentTime(curtime).ok()) {
Log(logger, "Error: Could not get current time to be attached internally "
"to the new value.");
if (!DBWithTTLImpl::GetCurrentTime(&curtime).ok()) {
Log(logger,
"Error: Could not get current time to be attached internally "
"to the new value.");
return false;
} else {
char ts_string[ts_len];
@ -319,16 +298,12 @@ class TtlMergeOperator : public MergeOperator {
new_value->append(ts_string, ts_len);
return true;
}
}
virtual const char* Name() const override {
return "Merge By TTL";
}
virtual const char* Name() const override { return "Merge By TTL"; }
private:
std::shared_ptr<MergeOperator> user_merge_op_;
};
}
#endif // ROCKSDB_LITE

@ -4,7 +4,7 @@
#include <memory>
#include "rocksdb/compaction_filter.h"
#include "utilities/utility_db.h"
#include "utilities/db_ttl.h"
#include "util/testharness.h"
#include "util/logging.h"
#include <map>
@ -45,13 +45,13 @@ class TtlTest {
void OpenTtl() {
ASSERT_TRUE(db_ttl_ ==
nullptr); // db should be closed before opening again
ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_));
ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_));
}
// Open database with TTL support when TTL provided with db_ttl_ pointer
void OpenTtl(int32_t ttl) {
ASSERT_TRUE(db_ttl_ == nullptr);
ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl));
ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_, ttl));
}
// Open with TestFilter compaction filter
@ -65,7 +65,7 @@ class TtlTest {
// Open database with TTL support in read_only mode
void OpenReadOnlyTtl(int32_t ttl) {
ASSERT_TRUE(db_ttl_ == nullptr);
ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl, true));
ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_, ttl, true));
}
void CloseTtl() {
@ -317,7 +317,7 @@ class TtlTest {
// Choose carefully so that Put, Gets & Compaction complete in 1 second buffer
const int64_t kSampleSize_ = 100;
std::string dbname_;
StackableDB* db_ttl_;
DBWithTTL* db_ttl_;
private:
Options options_;
@ -532,25 +532,33 @@ TEST(TtlTest, ColumnFamiliesTest) {
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(UtilityDB::OpenTtlDB(DBOptions(options), dbname_, column_families,
&handles, &db_ttl_, {2, 4}, false));
ASSERT_OK(DBWithTTL::Open(DBOptions(options), dbname_, column_families,
&handles, &db_ttl_, {2, 4}, false));
ASSERT_EQ(handles.size(), 2U);
ColumnFamilyHandle* new_handle;
ASSERT_OK(db_ttl_->CreateColumnFamilyWithTtl(options, "ttl_column_family_2",
&new_handle, 2));
handles.push_back(new_handle);
MakeKVMap(kSampleSize_);
PutValues(0, kSampleSize_, false, handles[0]);
PutValues(0, kSampleSize_, false, handles[1]);
PutValues(0, kSampleSize_, false, handles[2]);
// everything should be there after 1 second
SleepCompactCheck(1, 0, kSampleSize_, true, false, handles[0]);
SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]);
SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[2]);
// only column family 1 should be alive after 3 seconds
SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]);
SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]);
SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[2]);
// nothing should be there after 5 seconds
SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]);
SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[1]);
SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[2]);
for (auto h : handles) {
delete h;

Loading…
Cancel
Save