From cacd812fb27ba110d1fb261c52031af7459bfa37 Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Sat, 3 Aug 2013 00:40:22 -0700 Subject: [PATCH] Support user's compaction filter in TTL logic Summary: TTL uses compaction filter to purge key-values and required the user to not pass one. This diff makes it accommodating of user's compaction filter. Added test to ttl_test Test Plan: make; ./ttl_test Reviewers: dhruba, haobo, vamsi Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D11973 --- include/utilities/utility_db.h | 1 - utilities/ttl/db_ttl.cc | 38 +++---- utilities/ttl/db_ttl.h | 62 +++++++--- utilities/ttl/ttl_test.cc | 201 ++++++++++++++++++++++++--------- 4 files changed, 211 insertions(+), 91 deletions(-) diff --git a/include/utilities/utility_db.h b/include/utilities/utility_db.h index 0bbfd7dc0..8331ccef6 100644 --- a/include/utilities/utility_db.h +++ b/include/utilities/utility_db.h @@ -34,7 +34,6 @@ class UtilityDB { // triggered(neither manual nor automatic), so no expired entries removed // // CONSTRAINTS: - // The caller must not specify any compaction-filter in options // Not specifying/passing or non-positive TTL behaves like TTL = infinity // // !!!WARNING!!!: diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index a4a7134de..308b9eb35 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -18,13 +18,17 @@ DBWithTTL::DBWithTTL(const int32_t ttl, Status& st, bool read_only) : ttl_(ttl) { - assert(options.compaction_filter == nullptr); Options options_to_open = options; - options_to_open.compaction_filter = this; + + ttl_comp_filter_.reset(new TtlCompactionFilter(ttl, + options.compaction_filter)); + options_to_open.compaction_filter = ttl_comp_filter_.get(); + if (options.merge_operator) { ttl_merge_op_.reset(new TtlMergeOperator(options.merge_operator)); options_to_open.merge_operator = ttl_merge_op_.get(); } + if (read_only) { st = DB::OpenForReadOnly(options_to_open, dbname, &db_); 
} else { @@ -50,21 +54,6 @@ Status UtilityDB::OpenTtlDB( return st; } -// returns true(i.e. key-value to be deleted) if its TS has expired based on ttl -bool DBWithTTL::Filter( - int level, - const Slice& key, - const Slice& old_val, - std::string* new_val, - bool* value_changed) const { - return IsStale(old_val, ttl_); -} - -const char* DBWithTTL::Name() const { - return "Delete By TTL"; -} - - // Gives back the current time Status DBWithTTL::GetCurrentTime(int32_t& curtime) { return Env::Default()->GetCurrentTime((int64_t*)&curtime); @@ -89,7 +78,7 @@ Status DBWithTTL::AppendTS(const Slice& val, std::string& val_with_ts) { // Returns corruption if the length of the string is lesser than timestamp, or // timestamp refers to a time lesser than ttl-feature release time Status DBWithTTL::SanityCheckTimestamp(const Slice& str) { - if (str.size() < (unsigned)kTSLength) { + if (str.size() < kTSLength) { return Status::Corruption("Error: value's length less than timestamp's\n"); } // Checks that TS is not lesser than kMinTimestamp @@ -110,19 +99,18 @@ bool DBWithTTL::IsStale(const Slice& value, int32_t ttl) { int32_t curtime; if (!GetCurrentTime(curtime).ok()) { return false; // Treat the data as fresh if could not get current time - } else { - int32_t timestamp_value = - DecodeFixed32(value.data() + value.size() - kTSLength); - if ((timestamp_value + ttl) < curtime) { - return true; // Data is stale - } } - return false; + int32_t timestamp_value = + DecodeFixed32(value.data() + value.size() - kTSLength); + return (timestamp_value + ttl) < curtime; } // Strips the TS from the end of the string Status DBWithTTL::StripTS(std::string* str) { Status st; + if (str->length() < kTSLength) { + return Status::Corruption("Bad timestamp in key-value"); + } // Erasing characters which hold the TS str->erase(str->length() - kTSLength, kTSLength); return st; diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 3b8ba8e95..122b34d4c 100644 --- a/utilities/ttl/db_ttl.h +++ 
b/utilities/ttl/db_ttl.h @@ -13,7 +13,7 @@ namespace leveldb { -class DBWithTTL : public DB, CompactionFilter { +class DBWithTTL : public DB { public: DBWithTTL(const int32_t ttl, const Options& options, @@ -84,15 +84,6 @@ class DBWithTTL : public DB, CompactionFilter { // Simulate a db crash, no elegant closing of database. void TEST_Destroy_DBWithTtl(); - // The following two methods are for CompactionFilter - virtual bool Filter(int level, - const Slice& key, - const Slice& old_val, - std::string* new_val, - bool* value_changed) const override; - - virtual const char* Name() const override; - static bool IsStale(const Slice& value, int32_t ttl); static Status AppendTS(const Slice& val, std::string& val_with_ts); @@ -103,7 +94,7 @@ class DBWithTTL : public DB, CompactionFilter { static Status GetCurrentTime(int32_t& curtime); - static const int32_t kTSLength = sizeof(int32_t); // size of timestamp + static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8 @@ -113,6 +104,7 @@ class DBWithTTL : public DB, CompactionFilter { DB* db_; int32_t ttl_; unique_ptr ttl_merge_op_; + unique_ptr ttl_comp_filter_; }; class TtlIterator : public Iterator { @@ -176,6 +168,50 @@ class TtlIterator : public Iterator { Iterator* iter_; }; +class TtlCompactionFilter : public CompactionFilter { + + public: + TtlCompactionFilter(int32_t ttl, const CompactionFilter* comp_filter) + : ttl_(ttl), + user_comp_filter_(comp_filter) { + // Unlike the merge operator, compaction filter is necessary for TTL, hence + // this would be called even if user doesn't specify any compaction-filter + } + + virtual bool Filter(int level, + const Slice& key, + const Slice& old_val, + std::string* new_val, + bool* value_changed) const override { + if (DBWithTTL::IsStale(old_val, ttl_)) { + return true; + } + if (user_comp_filter_ == nullptr) { + return false; + } + assert(old_val.size() >= DBWithTTL::kTSLength); + Slice 
old_val_without_ts(old_val.data(), + old_val.size() - DBWithTTL::kTSLength); + if (user_comp_filter_->Filter(level, key, old_val_without_ts, new_val, + value_changed)) { + return true; + } + if (*value_changed) { + new_val->append(old_val.data() + old_val.size() - DBWithTTL::kTSLength, + DBWithTTL::kTSLength); + } + return false; + } + + virtual const char* Name() const override { + return "Delete By TTL"; + } + + private: + int32_t ttl_; + const CompactionFilter* user_comp_filter_; +}; + class TtlMergeOperator : public MergeOperator { public: @@ -188,7 +224,7 @@ class TtlMergeOperator : public MergeOperator { const Slice* existing_value, const Slice& value, std::string* new_value, - Logger* logger) const { + Logger* logger) const override { const uint32_t& ts_len = DBWithTTL::kTSLength; if ((existing_value && existing_value->size() < ts_len) || value.size() < ts_len) { @@ -219,7 +255,7 @@ class TtlMergeOperator : public MergeOperator { } } - virtual const char* Name() const { + virtual const char* Name() const override { return "Merge By TTL"; } diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index 745042566..da037c8a7 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -2,7 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. 
-#include "include/utilities/utility_db.h" +#include "leveldb/compaction_filter.h" +#include "utilities/utility_db.h" #include "util/testharness.h" #include "util/logging.h" #include @@ -11,11 +12,14 @@ namespace leveldb { namespace { + typedef std::map KVMap; + enum BatchOperation { PUT = 0, DELETE = 1 }; + } class TtlTest { @@ -48,6 +52,13 @@ class TtlTest { ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl)); } + // Open with TestFilter compaction filter + void OpenTtlWithTestCompaction(int32_t ttl) { + test_comp_filter_.reset(new TestFilter(kSampleSize_, kNewValue_)); + options_.compaction_filter = test_comp_filter_.get(); + OpenTtl(ttl); + } + // Open database with TTL support in read_only mode void OpenReadOnlyTtl(int32_t ttl) { assert(db_ttl_ == nullptr); @@ -62,10 +73,19 @@ class TtlTest { // Populates and returns a kv-map void MakeKVMap(int64_t num_entries) { kvmap_.clear(); - + int digits = 1; + for (int dummy = num_entries; dummy /= 10 ; ++digits); + int digits_in_i = 1; for (int64_t i = 0; i < num_entries; i++) { std::string key = "key"; std::string value = "value"; + if (i % 10 == 0) { + digits_in_i++; + } + for(int j = digits_in_i; j < digits; j++) { + key.append("0"); + value.append("0"); + } AppendNumberTo(&key, i); AppendNumberTo(&value, i); kvmap_[key] = value; @@ -120,8 +140,10 @@ class TtlTest { // Sleeps for slp_tim then runs a manual compaction // Checks span starting from st_pos from kvmap_ in the db and // Gets should return true if check is true and false otherwise - // Also checks that value that we got is the same as inserted - void SleepCompactCheck(int slp_tim, int st_pos, int span, bool check = true) { + // Also checks that value that we got is the same as inserted; and =kNewValue + // if test_compaction_change is true + void SleepCompactCheck(int slp_tim, int st_pos, int span, bool check = true, + bool test_compaction_change = false) { assert(db_ttl_); sleep(slp_tim); ManualCompact(); @@ -132,19 +154,25 @@ class TtlTest 
{ for (int i = 0; kv_it_ != kvmap_.end(), i < span; i++, kv_it_++) { Status s = db_ttl_->Get(ropts, kv_it_->first, &v); if (s.ok() != check) { - fprintf(stderr, "key=%s ", - kv_it_->first.c_str()); + fprintf(stderr, "key=%s ", kv_it_->first.c_str()); if (!s.ok()) { fprintf(stderr, "is absent from db but was expected to be present\n"); } else { fprintf(stderr, "is present in db but was expected to be absent\n"); } assert(false); - } else if (s.ok() && (v.compare(kv_it_->second) != 0)) { - fprintf(stderr, " value for key=%s present in database is %s but " - " should be %s\n", kv_it_->first.c_str(), v.c_str(), - kv_it_->second.c_str()); - assert(false); + } else if (s.ok()) { + if (test_compaction_change && v.compare(kNewValue_) != 0) { + fprintf(stderr, " value for key=%s present in database is %s but " + " should be %s\n", kv_it_->first.c_str(), v.c_str(), + kNewValue_.c_str()); + assert(false); + } else if (!test_compaction_change && v.compare(kv_it_->second) !=0) { + fprintf(stderr, " value for key=%s present in database is %s but " + " should be %s\n", kv_it_->first.c_str(), v.c_str(), + kv_it_->second.c_str()); + assert(false); + } } } } @@ -176,8 +204,56 @@ class TtlTest { delete dbiter; } + class TestFilter : public CompactionFilter { + public: + TestFilter(const int64_t kSampleSize, const std::string kNewValue) + : kSampleSize_(kSampleSize), + kNewValue_(kNewValue) { + } + + // Works on keys of the form "key" + // Drops key if number at the end of key is in [0, kSampleSize_/3), + // Keeps key if it is in [kSampleSize_/3, 2*kSampleSize_/3), + // Change value if it is in [2*kSampleSize_/3, kSampleSize_) + // Eg. kSampleSize_=6. Drop:key0-1...Keep:key2-3...Change:key4-5... 
+ virtual bool Filter(int level, const Slice& key, + const Slice& value, std::string* new_value, + bool* value_changed) const override { + assert(new_value != nullptr); + + std::string search_str = "0123456789"; + std::string key_string = key.ToString(); + size_t pos = key_string.find_first_of(search_str); + int num_key_end; + if (pos != std::string::npos) { + num_key_end = stoi(key_string.substr(pos, key.size() - pos)); + } else { + return false; // Keep keys not matching the format "key" + } + + int partition = kSampleSize_ / 3; + if (num_key_end < partition) { + return true; + } else if (num_key_end < partition * 2) { + return false; + } else { + *new_value = kNewValue_; + *value_changed = true; + return false; + } + } + + virtual const char* Name() const override { + return "TestFilter"; + } + + private: + const int64_t kSampleSize_; + const std::string kNewValue_; + }; + // Choose carefully so that Put, Gets & Compaction complete in 1 second buffer - const int64_t kSampleSize = 100; + const int64_t kSampleSize_ = 100; private: std::string dbname_; @@ -185,6 +261,8 @@ class TtlTest { Options options_; KVMap kvmap_; KVMap::iterator kv_it_; + const std::string kNewValue_ = "new_value"; + unique_ptr test_comp_filter_; }; // class TtlTest // If TTL is non positive or not provided, the behaviour is TTL = infinity @@ -192,8 +270,8 @@ class TtlTest { // bunch of kvs each time. 
All kvs should accummulate in the db till the end // Partitions the sample-size provided into 3 sets over boundary1 and boundary2 TEST(TtlTest, NoEffect) { - MakeKVMap(kSampleSize); - int boundary1 = kSampleSize / 3; + MakeKVMap(kSampleSize_); + int boundary1 = kSampleSize_ / 3; int boundary2 = 2 * boundary1; OpenTtl(); @@ -207,135 +285,154 @@ TEST(TtlTest, NoEffect) { CloseTtl(); OpenTtl(-1); - PutValues(boundary2, kSampleSize - boundary2); //T=3: Set3 never deleted - SleepCompactCheck(1, 0, kSampleSize, true); //T=4: Sets 1,2,3 still there + PutValues(boundary2, kSampleSize_ - boundary2); //T=3: Set3 never deleted + SleepCompactCheck(1, 0, kSampleSize_, true); //T=4: Sets 1,2,3 still there CloseTtl(); } // Puts a set of values and checks its presence using Get during ttl TEST(TtlTest, PresentDuringTTL) { - MakeKVMap(kSampleSize); + MakeKVMap(kSampleSize_); OpenTtl(2); // T=0:Open the db with ttl = 2 - PutValues(0, kSampleSize); // T=0:Insert Set1. Delete at t=2 - SleepCompactCheck(1, 0, kSampleSize, true); // T=1:Set1 should still be there + PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2 + SleepCompactCheck(1, 0, kSampleSize_, true); // T=1:Set1 should still be there CloseTtl(); } // Puts a set of values and checks its absence using Get after ttl TEST(TtlTest, AbsentAfterTTL) { - MakeKVMap(kSampleSize); + MakeKVMap(kSampleSize_); OpenTtl(1); // T=0:Open the db with ttl = 2 - PutValues(0, kSampleSize); // T=0:Insert Set1. Delete at t=2 - SleepCompactCheck(2, 0, kSampleSize, false); // T=2:Set1 should not be there + PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2 + SleepCompactCheck(2, 0, kSampleSize_, false); // T=2:Set1 should not be there CloseTtl(); } // Resets the timestamp of a set of kvs by updating them and checks that they // are not deleted according to the old timestamp TEST(TtlTest, ResetTimestamp) { - MakeKVMap(kSampleSize); + MakeKVMap(kSampleSize_); OpenTtl(3); - PutValues(0, kSampleSize); // T=0: Insert Set1. 
Delete at t=3 + PutValues(0, kSampleSize_); // T=0: Insert Set1. Delete at t=3 sleep(2); // T=2 - PutValues(0, kSampleSize); // T=2: Insert Set1. Delete at t=5 - SleepCompactCheck(2, 0, kSampleSize); // T=4: Set1 should still be there + PutValues(0, kSampleSize_); // T=2: Insert Set1. Delete at t=5 + SleepCompactCheck(2, 0, kSampleSize_); // T=4: Set1 should still be there CloseTtl(); } // Similar to PresentDuringTTL but uses Iterator TEST(TtlTest, IterPresentDuringTTL) { - MakeKVMap(kSampleSize); + MakeKVMap(kSampleSize_); OpenTtl(2); - PutValues(0, kSampleSize); // T=0: Insert. Delete at t=2 - SleepCompactCheckIter(1, 0, kSampleSize); // T=1: Set should be there + PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=2 + SleepCompactCheckIter(1, 0, kSampleSize_); // T=1: Set should be there CloseTtl(); } // Similar to AbsentAfterTTL but uses Iterator TEST(TtlTest, IterAbsentAfterTTL) { - MakeKVMap(kSampleSize); + MakeKVMap(kSampleSize_); OpenTtl(1); - PutValues(0, kSampleSize); // T=0: Insert. Delete at t=1 - SleepCompactCheckIter(2, 0, kSampleSize, false); // T=2: Should not be there + PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1 + SleepCompactCheckIter(2, 0, kSampleSize_, false); // T=2: Should not be there CloseTtl(); } // Checks presence while opening the same db more than once with the same ttl // Note: The second open will open the same db TEST(TtlTest, MultiOpenSamePresent) { - MakeKVMap(kSampleSize); + MakeKVMap(kSampleSize_); OpenTtl(2); - PutValues(0, kSampleSize); // T=0: Insert. Delete at t=2 + PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=2 CloseTtl(); OpenTtl(2); // T=0. 
Delete at t=2 - SleepCompactCheck(1, 0, kSampleSize); // T=1: Set should be there + SleepCompactCheck(1, 0, kSampleSize_); // T=1: Set should be there CloseTtl(); } // Checks absence while opening the same db more than once with the same ttl // Note: The second open will open the same db TEST(TtlTest, MultiOpenSameAbsent) { - MakeKVMap(kSampleSize); + MakeKVMap(kSampleSize_); OpenTtl(1); - PutValues(0, kSampleSize); // T=0: Insert. Delete at t=1 + PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1 CloseTtl(); OpenTtl(1); // T=0.Delete at t=1 - SleepCompactCheck(2, 0, kSampleSize, false); // T=2: Set should not be there + SleepCompactCheck(2, 0, kSampleSize_, false); // T=2: Set should not be there CloseTtl(); } // Checks presence while opening the same db more than once with bigger ttl TEST(TtlTest, MultiOpenDifferent) { - MakeKVMap(kSampleSize); + MakeKVMap(kSampleSize_); OpenTtl(1); - PutValues(0, kSampleSize); // T=0: Insert. Delete at t=1 + PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1 CloseTtl(); OpenTtl(3); // T=0: Set deleted at t=3 - SleepCompactCheck(2, 0, kSampleSize); // T=2: Set should be there + SleepCompactCheck(2, 0, kSampleSize_); // T=2: Set should be there CloseTtl(); } // Checks presence during ttl in read_only mode TEST(TtlTest, ReadOnlyPresentForever) { - MakeKVMap(kSampleSize); + MakeKVMap(kSampleSize_); OpenTtl(1); // T=0:Open the db normally - PutValues(0, kSampleSize); // T=0:Insert Set1. Delete at t=1 + PutValues(0, kSampleSize_); // T=0:Insert Set1. 
Delete at t=1 CloseTtl(); OpenReadOnlyTtl(1); - SleepCompactCheck(2, 0, kSampleSize); // T=2:Set1 should still be there + SleepCompactCheck(2, 0, kSampleSize_); // T=2:Set1 should still be there CloseTtl(); } // Checks whether WriteBatch works well with TTL // Puts all kvs in kvmap_ in a batch and writes first, then deletes first half TEST(TtlTest, WriteBatchTest) { - MakeKVMap(kSampleSize); - BatchOperation batch_ops[kSampleSize]; - for (int i = 0; i < kSampleSize; i++) { + MakeKVMap(kSampleSize_); + BatchOperation batch_ops[kSampleSize_]; + for (int i = 0; i < kSampleSize_; i++) { batch_ops[i] = PUT; } OpenTtl(2); - MakePutWriteBatch(batch_ops, kSampleSize); - for (int i = 0; i < kSampleSize / 2; i++) { + MakePutWriteBatch(batch_ops, kSampleSize_); + for (int i = 0; i < kSampleSize_ / 2; i++) { batch_ops[i] = DELETE; } - MakePutWriteBatch(batch_ops, kSampleSize / 2); - SleepCompactCheck(0, 0, kSampleSize / 2, false); - SleepCompactCheck(0, kSampleSize / 2, kSampleSize - kSampleSize / 2); + MakePutWriteBatch(batch_ops, kSampleSize_ / 2); + SleepCompactCheck(0, 0, kSampleSize_ / 2, false); + SleepCompactCheck(0, kSampleSize_ / 2, kSampleSize_ - kSampleSize_ / 2); + CloseTtl(); +} + +// Checks user's compaction filter for correctness with TTL logic +TEST(TtlTest, CompactionFilter) { + MakeKVMap(kSampleSize_); + + OpenTtlWithTestCompaction(1); + PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=1 + // T=2: TTL logic takes precedence over TestFilter:-Set1 should not be there + SleepCompactCheck(2, 0, kSampleSize_, false); + CloseTtl(); + + OpenTtlWithTestCompaction(3); + PutValues(0, kSampleSize_); // T=0:Insert Set1. + int partition = kSampleSize_ / 3; + SleepCompactCheck(1, 0, partition, false); // Part dropped + SleepCompactCheck(0, partition, partition); // Part kept + SleepCompactCheck(0, 2 * partition, partition, true, true); // Part changed CloseTtl(); }