diff --git a/db/merge_test.cc b/db/merge_test.cc index 47fad025d..903a824a5 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -8,20 +8,29 @@ #include "leveldb/env.h" #include "leveldb/merge_operator.h" #include "db/dbformat.h" +#include "db/db_impl.h" #include "utilities/merge_operators.h" #include "util/testharness.h" +#include "utilities/utility_db.h" using namespace std; using namespace leveldb; auto mergeOperator = MergeOperators::CreateUInt64AddOperator(); -std::shared_ptr OpenDb() { +std::shared_ptr OpenDb(const string& dbname, const bool ttl = false) { DB* db; Options options; options.create_if_missing = true; options.merge_operator = mergeOperator.get(); - Status s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db); + Status s; + DestroyDB(dbname, Options()); + if (ttl) { + cout << "Opening database with TTL\n"; + s = UtilityDB::OpenTtlDB(options, test::TmpDir() + "/merge_testdbttl", &db); + } else { + s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db); + } if (!s.ok()) { cerr << s.ToString() << endl; assert(false); @@ -228,9 +237,8 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) { } } -int main(int argc, char *argv[]) { - - auto db = OpenDb(); +void runTest(int argc, const string& dbname, const bool use_ttl = false) { + auto db = OpenDb(dbname, use_ttl); { cout << "Test read-modify-write counters... \n"; @@ -250,5 +258,12 @@ int main(int argc, char *argv[]) { testCounters(counters, db.get(), compact); } + DestroyDB(dbname, Options()); +} + +int main(int argc, char *argv[]) { + //TODO: Make this test like a general rocksdb unit-test + runTest(argc, "/tmp/testdb"); + runTest(argc, "/tmp/testdbttl", true); // Run test on TTL database return 0; } diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 90cc1ec4c..a4a7134de 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -21,6 +21,10 @@ DBWithTTL::DBWithTTL(const int32_t ttl, assert(options.compaction_filter == nullptr); Options options_to_open = options; options_to_open.compaction_filter = this; + if (options.merge_operator) { + ttl_merge_op_.reset(new TtlMergeOperator(options.merge_operator)); + options_to_open.merge_operator = ttl_merge_op_.get(); + } if (read_only) { st = DB::OpenForReadOnly(options_to_open, dbname, &db_); } else { @@ -125,15 +129,12 @@ Status DBWithTTL::StripTS(std::string* str) { } Status DBWithTTL::Put( - const WriteOptions& o, + const WriteOptions& opt, const Slice& key, const Slice& val) { - std::string value_with_ts; - Status st = AppendTS(val, value_with_ts); - if (!st.ok()) { - return st; - } - return db_->Put(o, key, value_with_ts); + WriteBatch batch; + batch.Put(key, val); + return Write(opt, &batch); } Status DBWithTTL::Get(const ReadOptions& options, @@ -169,10 +170,12 @@ Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) { return db_->Delete(wopts, key); } -Status DBWithTTL::Merge(const WriteOptions& options, +Status DBWithTTL::Merge(const WriteOptions& opt, const Slice& key, const Slice& value) { - return Status::NotSupported("Merge operation not supported."); + WriteBatch batch; + batch.Merge(key, value); + return Write(opt, &batch); } Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { @@ -190,8 +193,13 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { } } virtual void Merge(const Slice& key, const Slice& value) { - // TTL doesn't support merge operation - batch_rewrite_status = Status::NotSupported("TTL doesn't support Merge"); + std::string value_with_ts; + Status st = AppendTS(value, value_with_ts); + if (!st.ok()) { + batch_rewrite_status = st; + } else { + updates_ttl.Merge(key, value_with_ts); + } } virtual void Delete(const Slice& key) { updates_ttl.Delete(key); diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 078a069ba..3b8ba8e95 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -5,8 +5,10 @@ #ifndef LEVELDB_UTILITIES_TTL_DB_TTL_H_ #define LEVELDB_UTILITIES_TTL_DB_TTL_H_ -#include "include/leveldb/db.h" -#include "include/leveldb/compaction_filter.h" +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/compaction_filter.h" +#include "leveldb/merge_operator.h" #include "db/db_impl.h" namespace leveldb { @@ -110,6 +112,7 @@ class DBWithTTL : public DB, CompactionFilter { private: DB* db_; int32_t ttl_; + unique_ptr ttl_merge_op_; }; class TtlIterator : public Iterator { @@ -173,5 +176,56 @@ class TtlIterator : public Iterator { Iterator* iter_; }; +class TtlMergeOperator : public MergeOperator { + + public: + explicit TtlMergeOperator(const MergeOperator* merge_op) + : user_merge_op_(merge_op) { + assert(merge_op); + } + + virtual void Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const { + const uint32_t& ts_len = DBWithTTL::kTSLength; + if ((existing_value && existing_value->size() < ts_len) || + value.size() < ts_len) { + Log(logger, "Error: Could not remove timestamp correctly from value."); + assert(false); + //TODO: Remove assert and make this function return false. + //TODO: Change Merge semantics and add a counter here + } + Slice value_without_ts(value.data(), value.size() - ts_len); + if (existing_value) { + Slice existing_value_without_ts(existing_value->data(), + existing_value->size() - ts_len); + user_merge_op_->Merge(key, &existing_value_without_ts, value_without_ts, + new_value, logger); + } else { + user_merge_op_->Merge(key, nullptr, value_without_ts, new_value, logger); + } + int32_t curtime; + if (!DBWithTTL::GetCurrentTime(curtime).ok()) { + Log(logger, "Error: Could not get current time to be attached internally " + "to the new value."); + assert(false); + //TODO: Remove assert and make this function return false. + } else { + char ts_string[ts_len]; + EncodeFixed32(ts_string, curtime); + new_value->append(ts_string, ts_len); + } + } + + virtual const char* Name() const { + return "Merge By TTL"; + } + + private: + const MergeOperator* user_merge_op_; +}; + } #endif // LEVELDB_UTILITIES_TTL_DB_TTL_H_