Merge operator for ttl

Summary: Implemented a TtlMergeOperator class which inherits from MergeOperator and is TTL aware. It strips out timestamp from existing_value and attaches timestamp to new_value, calling user-provided-Merge in between.

Test Plan: make all check

Reviewers: haobo, dhruba

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D11775
main
Mayank Agarwal 12 years ago
parent 59d0b02f8b
commit c42485f67c
  1. 25
      db/merge_test.cc
  2. 30
      utilities/ttl/db_ttl.cc
  3. 58
      utilities/ttl/db_ttl.h

@ -8,20 +8,29 @@
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/merge_operator.h" #include "leveldb/merge_operator.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/db_impl.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "utilities/utility_db.h"
using namespace std; using namespace std;
using namespace leveldb; using namespace leveldb;
auto mergeOperator = MergeOperators::CreateUInt64AddOperator(); auto mergeOperator = MergeOperators::CreateUInt64AddOperator();
std::shared_ptr<DB> OpenDb() { std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false) {
DB* db; DB* db;
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.merge_operator = mergeOperator.get(); options.merge_operator = mergeOperator.get();
Status s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db); Status s;
DestroyDB(dbname, Options());
if (ttl) {
cout << "Opening database with TTL\n";
s = UtilityDB::OpenTtlDB(options, test::TmpDir() + "/merge_testdbttl", &db);
} else {
s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db);
}
if (!s.ok()) { if (!s.ok()) {
cerr << s.ToString() << endl; cerr << s.ToString() << endl;
assert(false); assert(false);
@ -228,9 +237,8 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
} }
} }
int main(int argc, char *argv[]) { void runTest(int argc, const string& dbname, const bool use_ttl = false) {
auto db = OpenDb(dbname, use_ttl);
auto db = OpenDb();
{ {
cout << "Test read-modify-write counters... \n"; cout << "Test read-modify-write counters... \n";
@ -250,5 +258,12 @@ int main(int argc, char *argv[]) {
testCounters(counters, db.get(), compact); testCounters(counters, db.get(), compact);
} }
DestroyDB(dbname, Options());
}
int main(int argc, char *argv[]) {
//TODO: Make this test like a general rocksdb unit-test
runTest(argc, "/tmp/testdb");
runTest(argc, "/tmp/testdbttl", true); // Run test on TTL database
return 0; return 0;
} }

@ -21,6 +21,10 @@ DBWithTTL::DBWithTTL(const int32_t ttl,
assert(options.compaction_filter == nullptr); assert(options.compaction_filter == nullptr);
Options options_to_open = options; Options options_to_open = options;
options_to_open.compaction_filter = this; options_to_open.compaction_filter = this;
if (options.merge_operator) {
ttl_merge_op_.reset(new TtlMergeOperator(options.merge_operator));
options_to_open.merge_operator = ttl_merge_op_.get();
}
if (read_only) { if (read_only) {
st = DB::OpenForReadOnly(options_to_open, dbname, &db_); st = DB::OpenForReadOnly(options_to_open, dbname, &db_);
} else { } else {
@ -125,15 +129,12 @@ Status DBWithTTL::StripTS(std::string* str) {
} }
Status DBWithTTL::Put( Status DBWithTTL::Put(
const WriteOptions& o, const WriteOptions& opt,
const Slice& key, const Slice& key,
const Slice& val) { const Slice& val) {
std::string value_with_ts; WriteBatch batch;
Status st = AppendTS(val, value_with_ts); batch.Put(key, val);
if (!st.ok()) { return Write(opt, &batch);
return st;
}
return db_->Put(o, key, value_with_ts);
} }
Status DBWithTTL::Get(const ReadOptions& options, Status DBWithTTL::Get(const ReadOptions& options,
@ -169,10 +170,12 @@ Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) {
return db_->Delete(wopts, key); return db_->Delete(wopts, key);
} }
Status DBWithTTL::Merge(const WriteOptions& options, Status DBWithTTL::Merge(const WriteOptions& opt,
const Slice& key, const Slice& key,
const Slice& value) { const Slice& value) {
return Status::NotSupported("Merge operation not supported."); WriteBatch batch;
batch.Merge(key, value);
return Write(opt, &batch);
} }
Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
@ -190,8 +193,13 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
} }
} }
virtual void Merge(const Slice& key, const Slice& value) { virtual void Merge(const Slice& key, const Slice& value) {
// TTL doesn't support merge operation std::string value_with_ts;
batch_rewrite_status = Status::NotSupported("TTL doesn't support Merge"); Status st = AppendTS(value, value_with_ts);
if (!st.ok()) {
batch_rewrite_status = st;
} else {
updates_ttl.Merge(key, value_with_ts);
}
} }
virtual void Delete(const Slice& key) { virtual void Delete(const Slice& key) {
updates_ttl.Delete(key); updates_ttl.Delete(key);

@ -5,8 +5,10 @@
#ifndef LEVELDB_UTILITIES_TTL_DB_TTL_H_ #ifndef LEVELDB_UTILITIES_TTL_DB_TTL_H_
#define LEVELDB_UTILITIES_TTL_DB_TTL_H_ #define LEVELDB_UTILITIES_TTL_DB_TTL_H_
#include "include/leveldb/db.h" #include "leveldb/db.h"
#include "include/leveldb/compaction_filter.h" #include "leveldb/env.h"
#include "leveldb/compaction_filter.h"
#include "leveldb/merge_operator.h"
#include "db/db_impl.h" #include "db/db_impl.h"
namespace leveldb { namespace leveldb {
@ -110,6 +112,7 @@ class DBWithTTL : public DB, CompactionFilter {
private: private:
DB* db_; DB* db_;
int32_t ttl_; int32_t ttl_;
unique_ptr<MergeOperator> ttl_merge_op_;
}; };
class TtlIterator : public Iterator { class TtlIterator : public Iterator {
@ -173,5 +176,56 @@ class TtlIterator : public Iterator {
Iterator* iter_; Iterator* iter_;
}; };
class TtlMergeOperator : public MergeOperator {
public:
explicit TtlMergeOperator(const MergeOperator* merge_op)
: user_merge_op_(merge_op) {
assert(merge_op);
}
virtual void Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const {
const uint32_t& ts_len = DBWithTTL::kTSLength;
if ((existing_value && existing_value->size() < ts_len) ||
value.size() < ts_len) {
Log(logger, "Error: Could not remove timestamp correctly from value.");
assert(false);
//TODO: Remove assert and make this function return false.
//TODO: Change Merge semantics and add a counter here
}
Slice value_without_ts(value.data(), value.size() - ts_len);
if (existing_value) {
Slice existing_value_without_ts(existing_value->data(),
existing_value->size() - ts_len);
user_merge_op_->Merge(key, &existing_value_without_ts, value_without_ts,
new_value, logger);
} else {
user_merge_op_->Merge(key, nullptr, value_without_ts, new_value, logger);
}
int32_t curtime;
if (!DBWithTTL::GetCurrentTime(curtime).ok()) {
Log(logger, "Error: Could not get current time to be attached internally "
"to the new value.");
assert(false);
//TODO: Remove assert and make this function return false.
} else {
char ts_string[ts_len];
EncodeFixed32(ts_string, curtime);
new_value->append(ts_string, ts_len);
}
}
virtual const char* Name() const {
return "Merge By TTL";
}
private:
const MergeOperator* user_merge_op_;
};
} }
#endif // LEVELDB_UTILITIES_TTL_DB_TTL_H_ #endif // LEVELDB_UTILITIES_TTL_DB_TTL_H_

Loading…
Cancel
Save