diff --git a/HISTORY.md b/HISTORY.md index b16628ad0..0fae8f800 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Public API Change * Options.level0_stop_writes_trigger default value changes from 24 to 32. +* New compaction filter API: CompactionFilter::FilterV2(). Allows to drop ranges of keys. ## 5.0.0 (11/17/2016) ### Public API Change diff --git a/db/compaction_iteration_stats.h b/db/compaction_iteration_stats.h index f11369ec1..212bac587 100644 --- a/db/compaction_iteration_stats.h +++ b/db/compaction_iteration_stats.h @@ -7,7 +7,11 @@ struct CompactionIterationStats { // Compaction statistics + + // Doesn't include records skipped because of + // CompactionFilter::Decision::kRemoveAndSkipUntil. int64_t num_record_drop_user = 0; + int64_t num_record_drop_hidden = 0; int64_t num_record_drop_obsolete = 0; int64_t num_record_drop_range_del = 0; diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index dcaf8ab0e..e46c679bf 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -17,6 +17,21 @@ CompactionIterator::CompactionIterator( bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, const Compaction* compaction, const CompactionFilter* compaction_filter, LogBuffer* log_buffer) + : CompactionIterator( + input, cmp, merge_helper, last_sequence, snapshots, + earliest_write_conflict_snapshot, env, expect_valid_internal_key, + range_del_agg, + std::unique_ptr( + compaction ? 
new CompactionProxy(compaction) : nullptr), + compaction_filter, log_buffer) {} + +CompactionIterator::CompactionIterator( + InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, + SequenceNumber last_sequence, std::vector* snapshots, + SequenceNumber earliest_write_conflict_snapshot, Env* env, + bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, + std::unique_ptr compaction, + const CompactionFilter* compaction_filter, LogBuffer* log_buffer) : input_(input), cmp_(cmp), merge_helper_(merge_helper), @@ -25,7 +40,7 @@ CompactionIterator::CompactionIterator( env_(env), expect_valid_internal_key_(expect_valid_internal_key), range_del_agg_(range_del_agg), - compaction_(compaction), + compaction_(std::move(compaction)), compaction_filter_(compaction_filter), log_buffer_(log_buffer), merge_out_iter_(merge_helper_) { @@ -110,7 +125,7 @@ void CompactionIterator::Next() { } if (valid_) { - // Record that we've ouputted a record for the current key. + // Record that we've outputted a record for the current key. has_outputted_key_ = true; } @@ -151,6 +166,13 @@ void CompactionIterator::NextFromInput() { iter_stats_.total_input_raw_key_bytes += key_.size(); iter_stats_.total_input_raw_value_bytes += value_.size(); + // If need_skip is true, we should seek the input iterator + // to internal key skip_until and continue from there. + bool need_skip = false; + // Points either into compaction_filter_skip_until_ or into + // merge_helper_->compaction_filter_skip_until_. + Slice skip_until; + // Check whether the user key changed. After this if statement current_key_ // is a copy of the current input key (maybe converted to a delete by the // compaction filter). ikey_.user_key is pointing to the copy. @@ -173,26 +195,41 @@ void CompactionIterator::NextFromInput() { // number is greater than any external snapshot, then invoke the // filter. If the return value of the compaction filter is true, // replace the entry with a deletion marker. 
- bool value_changed = false; - bool to_delete = false; + CompactionFilter::Decision filter; compaction_filter_value_.clear(); + compaction_filter_skip_until_.Clear(); { StopWatchNano timer(env_, true); - to_delete = compaction_filter_->Filter( - compaction_->level(), ikey_.user_key, value_, - &compaction_filter_value_, &value_changed); + filter = compaction_filter_->FilterV2( + compaction_->level(), ikey_.user_key, + CompactionFilter::ValueType::kValue, value_, + &compaction_filter_value_, compaction_filter_skip_until_.rep()); iter_stats_.total_filter_time += env_ != nullptr ? timer.ElapsedNanos() : 0; } - if (to_delete) { + + if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil && + cmp_->Compare(*compaction_filter_skip_until_.rep(), + ikey_.user_key) <= 0) { + // Can't skip to a key smaller than the current one. + // Keep the key as per FilterV2 documentation. + filter = CompactionFilter::Decision::kKeep; + } + + if (filter == CompactionFilter::Decision::kRemove) { // convert the current key to a delete ikey_.type = kTypeDeletion; current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion); // no value associated with delete value_.clear(); iter_stats_.num_record_drop_user++; - } else if (value_changed) { + } else if (filter == CompactionFilter::Decision::kChangeValue) { value_ = compaction_filter_value_; + } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { + need_skip = true; + compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, + kValueTypeForSeek); + skip_until = compaction_filter_skip_until_.Encode(); } } } else { @@ -219,7 +256,9 @@ void CompactionIterator::NextFromInput() { ? earliest_snapshot_ : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot); - if (clear_and_output_next_key_) { + if (need_skip) { + // This case is handled below. + } else if (clear_and_output_next_key_) { // In the previous iteration we encountered a single delete that we could // not compact out. 
We will keep this Put, but can drop it's data. // (See Optimization 3, below.) @@ -398,7 +437,9 @@ void CompactionIterator::NextFromInput() { bottommost_level_); merge_out_iter_.SeekToFirst(); - if (merge_out_iter_.Valid()) { + if (merge_helper_->FilteredUntil(&skip_until)) { + need_skip = true; + } else if (merge_out_iter_.Valid()) { // NOTE: key, value, and ikey_ refer to old entries. // These will be correctly set below. key_ = merge_out_iter_.key(); @@ -432,6 +473,10 @@ void CompactionIterator::NextFromInput() { valid_ = true; } } + + if (need_skip) { + input_->Seek(skip_until); + } } } diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index d22ca01b5..8677719f9 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -23,6 +23,36 @@ namespace rocksdb { class CompactionIterator { public: + // A wrapper around Compaction. Has a much smaller interface, only what + // CompactionIterator uses. Tests can override it. + class CompactionProxy { + public: + explicit CompactionProxy(const Compaction* compaction) + : compaction_(compaction) {} + + virtual ~CompactionProxy() = default; + virtual int level(size_t compaction_input_level = 0) const { + return compaction_->level(); + } + virtual bool KeyNotExistsBeyondOutputLevel( + const Slice& user_key, std::vector* level_ptrs) const { + return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs); + } + virtual bool bottommost_level() const { + return compaction_->bottommost_level(); + } + virtual int number_levels() const { return compaction_->number_levels(); } + virtual Slice GetLargestUserKey() const { + return compaction_->GetLargestUserKey(); + } + + protected: + CompactionProxy() = default; + + private: + const Compaction* compaction_; + }; + CompactionIterator(InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, @@ -33,6 +63,17 @@ class CompactionIterator { const CompactionFilter* compaction_filter = 
nullptr, LogBuffer* log_buffer = nullptr); + // Constructor with custom CompactionProxy, used for tests. + CompactionIterator(InternalIterator* input, const Comparator* cmp, + MergeHelper* merge_helper, SequenceNumber last_sequence, + std::vector* snapshots, + SequenceNumber earliest_write_conflict_snapshot, Env* env, + bool expect_valid_internal_key, + RangeDelAggregator* range_del_agg, + std::unique_ptr compaction, + const CompactionFilter* compaction_filter = nullptr, + LogBuffer* log_buffer = nullptr); + ~CompactionIterator(); void ResetRecordCounts(); @@ -82,7 +123,7 @@ class CompactionIterator { Env* env_; bool expect_valid_internal_key_; RangeDelAggregator* range_del_agg_; - const Compaction* compaction_; + std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; LogBuffer* log_buffer_; bool bottommost_level_; @@ -130,6 +171,7 @@ class CompactionIterator { // merge operands and then releasing them after consuming them. PinnedIteratorsManager pinned_iters_mgr_; std::string compaction_filter_value_; + InternalKey compaction_filter_skip_until_; // "level_ptrs" holds indices that remember which file of an associated // level we were last checking during the last call to compaction-> // KeyNotExistsBeyondOutputLevel(). 
This allows future calls to the function diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index 12df6ea83..857f68a35 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -13,6 +13,90 @@ namespace rocksdb { +class LoggingForwardVectorIterator : public InternalIterator { + public: + struct Action { + enum class Type { + SEEK_TO_FIRST, + SEEK, + NEXT, + }; + + Type type; + std::string arg; + + explicit Action(Type _type, std::string _arg = "") + : type(_type), arg(_arg) {} + + bool operator==(const Action& rhs) const { + return std::tie(type, arg) == std::tie(rhs.type, rhs.arg); + } + }; + + LoggingForwardVectorIterator(const std::vector& keys, + const std::vector& values) + : keys_(keys), values_(values), current_(keys.size()) { + assert(keys_.size() == values_.size()); + } + + virtual bool Valid() const override { return current_ < keys_.size(); } + + virtual void SeekToFirst() override { + log.emplace_back(Action::Type::SEEK_TO_FIRST); + current_ = 0; + } + virtual void SeekToLast() override { assert(false); } + + virtual void Seek(const Slice& target) override { + log.emplace_back(Action::Type::SEEK, target.ToString()); + current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) - + keys_.begin(); + } + + virtual void SeekForPrev(const Slice& target) override { assert(false); } + + virtual void Next() override { + assert(Valid()); + log.emplace_back(Action::Type::NEXT); + current_++; + } + virtual void Prev() override { assert(false); } + + virtual Slice key() const override { + assert(Valid()); + return Slice(keys_[current_]); + } + virtual Slice value() const override { + assert(Valid()); + return Slice(values_[current_]); + } + + virtual Status status() const override { return Status::OK(); } + + std::vector log; + + private: + std::vector keys_; + std::vector values_; + size_t current_; +}; + +class FakeCompaction : public CompactionIterator::CompactionProxy { + public: + 
FakeCompaction() = default; + + virtual int level(size_t compaction_input_level) const { return 0; } + virtual bool KeyNotExistsBeyondOutputLevel( + const Slice& user_key, std::vector* level_ptrs) const { + return false; + } + virtual bool bottommost_level() const { return false; } + virtual int number_levels() const { return 1; } + virtual Slice GetLargestUserKey() const { + return "\xff\xff\xff\xff\xff\xff\xff\xff\xff"; + } +}; + class CompactionIteratorTest : public testing::Test { public: CompactionIteratorTest() @@ -22,19 +106,27 @@ class CompactionIteratorTest : public testing::Test { const std::vector& vs, const std::vector& range_del_ks, const std::vector& range_del_vs, - SequenceNumber last_sequence) { + SequenceNumber last_sequence, + MergeOperator* merge_op = nullptr, + CompactionFilter* filter = nullptr) { std::unique_ptr range_del_iter( new test::VectorIterator(range_del_ks, range_del_vs)); range_del_agg_.reset(new RangeDelAggregator(icmp_, snapshots_)); ASSERT_OK(range_del_agg_->AddTombstones(std::move(range_del_iter))); - merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, nullptr, nullptr, + std::unique_ptr compaction; + if (filter) { + compaction.reset(new FakeCompaction()); + } + + merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, 0U, false, 0)); - iter_.reset(new test::VectorIterator(ks, vs)); + iter_.reset(new LoggingForwardVectorIterator(ks, vs)); iter_->SeekToFirst(); c_iter_.reset(new CompactionIterator( iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, - kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get())); + kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(), + std::move(compaction), filter)); } void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); } @@ -43,7 +135,7 @@ class CompactionIteratorTest : public testing::Test { const InternalKeyComparator icmp_; std::vector snapshots_; std::unique_ptr merge_helper_; - std::unique_ptr iter_; 
+ std::unique_ptr iter_; std::unique_ptr c_iter_; std::unique_ptr range_del_agg_; }; @@ -116,6 +208,136 @@ TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) { ASSERT_FALSE(c_iter_->Valid()); } +TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) { + // Expect no merging attempts. + class MergeOp : public MergeOperator { + public: + bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + ADD_FAILURE(); + return false; + } + bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const override { + ADD_FAILURE(); + return false; + } + const char* Name() const override { + return "CompactionIteratorTest.CompactionFilterSkipUntil::MergeOp"; + } + }; + + class Filter : public CompactionFilter { + virtual Decision FilterV2(int level, const Slice& key, ValueType t, + const Slice& existing_value, + std::string* new_value, + std::string* skip_until) const { + std::string k = key.ToString(); + std::string v = existing_value.ToString(); + // See InitIterators() call below for the sequence of keys and their + // filtering decisions. Here we closely assert that compaction filter is + // called with the expected keys and only them, and with the right values. 
+ if (k == "a") { + EXPECT_EQ(ValueType::kValue, t); + EXPECT_EQ("av50", v); + return Decision::kKeep; + } + if (k == "b") { + EXPECT_EQ(ValueType::kValue, t); + EXPECT_EQ("bv60", v); + *skip_until = "d+"; + return Decision::kRemoveAndSkipUntil; + } + if (k == "e") { + EXPECT_EQ(ValueType::kMergeOperand, t); + EXPECT_EQ("em71", v); + return Decision::kKeep; + } + if (k == "f") { + if (v == "fm65") { + EXPECT_EQ(ValueType::kMergeOperand, t); + *skip_until = "f"; + } else { + EXPECT_EQ("fm30", v); + EXPECT_EQ(ValueType::kMergeOperand, t); + *skip_until = "g+"; + } + return Decision::kRemoveAndSkipUntil; + } + if (k == "h") { + EXPECT_EQ(ValueType::kValue, t); + EXPECT_EQ("hv91", v); + return Decision::kKeep; + } + if (k == "i") { + EXPECT_EQ(ValueType::kValue, t); + EXPECT_EQ("iv95", v); + *skip_until = "z"; + return Decision::kRemoveAndSkipUntil; + } + ADD_FAILURE(); + return Decision::kKeep; + } + + const char* Name() const override { + return "CompactionIteratorTest.CompactionFilterSkipUntil::Filter"; + } + }; + + MergeOp merge_op; + Filter filter; + InitIterators( + {test::KeyStr("a", 50, kTypeValue), // keep + test::KeyStr("a", 45, kTypeMerge), + test::KeyStr("b", 60, kTypeValue), // skip to "d+" + test::KeyStr("b", 40, kTypeValue), test::KeyStr("c", 35, kTypeValue), + test::KeyStr("d", 70, kTypeMerge), + test::KeyStr("e", 71, kTypeMerge), // keep + test::KeyStr("f", 65, kTypeMerge), // skip to "f", aka keep + test::KeyStr("f", 30, kTypeMerge), // skip to "g+" + test::KeyStr("f", 25, kTypeValue), test::KeyStr("g", 90, kTypeValue), + test::KeyStr("h", 91, kTypeValue), // keep + test::KeyStr("i", 95, kTypeValue), // skip to "z" + test::KeyStr("j", 99, kTypeValue)}, + {"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30", + "fv25", "gv90", "hv91", "iv95", "jv99"}, + {}, {}, kMaxSequenceNumber, &merge_op, &filter); + + // Compaction should output just "a", "e" and "h" keys. 
+ c_iter_->SeekToFirst(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("a", 50, kTypeValue), c_iter_->key().ToString()); + ASSERT_EQ("av50", c_iter_->value().ToString()); + c_iter_->Next(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("e", 71, kTypeMerge), c_iter_->key().ToString()); + ASSERT_EQ("em71", c_iter_->value().ToString()); + c_iter_->Next(); + ASSERT_TRUE(c_iter_->Valid()); + ASSERT_EQ(test::KeyStr("h", 91, kTypeValue), c_iter_->key().ToString()); + ASSERT_EQ("hv91", c_iter_->value().ToString()); + c_iter_->Next(); + ASSERT_FALSE(c_iter_->Valid()); + + // Check that the compaction iterator did the correct sequence of calls on + // the underlying iterator. + using A = LoggingForwardVectorIterator::Action; + using T = A::Type; + std::vector expected_actions = { + A(T::SEEK_TO_FIRST), + A(T::NEXT), + A(T::NEXT), + A(T::SEEK, test::KeyStr("d+", kMaxSequenceNumber, kValueTypeForSeek)), + A(T::NEXT), + A(T::NEXT), + A(T::SEEK, test::KeyStr("g+", kMaxSequenceNumber, kValueTypeForSeek)), + A(T::NEXT), + A(T::SEEK, test::KeyStr("z", kMaxSequenceNumber, kValueTypeForSeek))}; + ASSERT_EQ(expected_actions, iter_->log); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_compaction_filter_test.cc b/db/db_compaction_filter_test.cc index 8b6f50bfa..db1bd3fb8 100644 --- a/db/db_compaction_filter_test.cc +++ b/db/db_compaction_filter_test.cc @@ -13,6 +13,7 @@ namespace rocksdb { static int cfilter_count = 0; +static int cfilter_skips = 0; // This is a static filter used for filtering // kvs during the compaction process. @@ -65,6 +66,30 @@ class DeleteISFilter : public CompactionFilter { virtual const char* Name() const override { return "DeleteFilter"; } }; +// Skip x if floor(x/10) is even, use range skips. Requires that keys are +// zero-padded to length 10. 
+class SkipEvenFilter : public CompactionFilter { + public: + virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, + const Slice& existing_value, std::string* new_value, + std::string* skip_until) const override { + cfilter_count++; + int i = std::stoi(key.ToString()); + if (i / 10 % 2 == 0) { + char key_str[100]; + snprintf(key_str, sizeof(key_str), "%010d", i / 10 * 10 + 10); + *skip_until = key_str; + ++cfilter_skips; + return Decision::kRemoveAndSkipUntil; + } + return Decision::kKeep; + } + + virtual bool IgnoreSnapshots() const override { return true; } + + virtual const char* Name() const override { return "DeleteFilter"; } +}; + class DelayFilter : public CompactionFilter { public: explicit DelayFilter(DBTestBase* d) : db_test(d) {} @@ -174,6 +199,20 @@ class DeleteISFilterFactory : public CompactionFilterFactory { virtual const char* Name() const override { return "DeleteFilterFactory"; } }; +class SkipEvenFilterFactory : public CompactionFilterFactory { + public: + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override { + if (context.is_manual_compaction) { + return std::unique_ptr(new SkipEvenFilter()); + } else { + return std::unique_ptr(nullptr); + } + } + + virtual const char* Name() const override { return "SkipEvenFilterFactory"; } +}; + class DelayFilterFactory : public CompactionFilterFactory { public: explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {} @@ -719,6 +758,47 @@ TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) { } #endif // ROCKSDB_LITE +TEST_F(DBTestCompactionFilter, SkipUntil) { + Options options = CurrentOptions(); + options.compaction_filter_factory = std::make_shared(); + options.disable_auto_compactions = true; + options.create_if_missing = true; + DestroyAndReopen(options); + + // Write 100K keys, these are written to a few files in L0. 
+ for (int table = 0; table < 4; ++table) { + // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371]. + for (int i = table * 6; i < 39 + table * 11; ++i) { + char key[100]; + snprintf(key, sizeof(key), "%010d", table * 100 + i); + Put(key, std::to_string(table * 1000 + i)); + } + Flush(); + } + + cfilter_skips = 0; + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // Number of skips in tables: 2, 3, 3, 3. + ASSERT_EQ(11, cfilter_skips); + + for (int table = 0; table < 4; ++table) { + for (int i = table * 6; i < 39 + table * 11; ++i) { + int k = table * 100 + i; + char key[100]; + snprintf(key, sizeof(key), "%010d", table * 100 + i); + auto expected = std::to_string(table * 1000 + i); + std::string val; + Status s = db_->Get(ReadOptions(), key, &val); + if (k / 10 % 2 == 0) { + ASSERT_TRUE(s.IsNotFound()); + } else { + ASSERT_OK(s); + ASSERT_EQ(expected, val); + } + } + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/dbformat.cc b/db/dbformat.cc index 5914e6dbf..baa1c9114 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -48,6 +48,11 @@ void AppendInternalKey(std::string* result, const ParsedInternalKey& key) { PutFixed64(result, PackSequenceAndType(key.sequence, key.type)); } +void AppendInternalKeyFooter(std::string* result, SequenceNumber s, + ValueType t) { + PutFixed64(result, PackSequenceAndType(s, t)); +} + std::string ParsedInternalKey::DebugString(bool hex) const { char buf[50]; snprintf(buf, sizeof(buf), "' @ %" PRIu64 ": %d", sequence, diff --git a/db/dbformat.h b/db/dbformat.h index 6aaff1390..06939d923 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -99,6 +99,11 @@ extern void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t); // Append the serialization of "key" to *result. extern void AppendInternalKey(std::string* result, const ParsedInternalKey& key); +// Serialized internal key consists of user key followed by footer. 
+// This function appends the footer to *result, assuming that *result already +// contains the user key at the end. +extern void AppendInternalKeyFooter(std::string* result, SequenceNumber s, + ValueType t); // Attempt to parse an internal key from "internal_key". On success, // stores the parsed data in "*result", and returns true. @@ -197,6 +202,16 @@ class InternalKey { void Clear() { rep_.clear(); } + // The underlying representation. + // Intended only to be used together with ConvertFromUserKey(). + std::string* rep() { return &rep_; } + + // Assuming that *rep() contains a user key, this method makes internal key + // out of it in-place. This saves a memcpy compared to Set()/SetFrom(). + void ConvertFromUserKey(SequenceNumber s, ValueType t) { + AppendInternalKeyFooter(&rep_, s, t); + } + std::string DebugString(bool hex = false) const; }; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index d78c9f237..5a1b047d6 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -83,6 +83,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, assert(HasOperator()); keys_.clear(); merge_context_.Clear(); + has_compaction_filter_skip_until_ = false; assert(user_merge_operator_); bool first_key = true; @@ -145,7 +146,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // TODO(noetzli) If the merge operator returns false, we are currently // (almost) silently dropping the put/delete. That's probably not what we - // want. + // want. Also if we're in compaction and it's a put, it would be nice to + // run compaction filter on it. const Slice val = iter->value(); const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr; std::string merge_result; @@ -185,10 +187,17 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // 1) it's included in one of the snapshots. 
in that case we *must* write // it out, no matter what compaction filter says // 2) it's not filtered by a compaction filter - if ((ikey.sequence <= latest_snapshot_ || - !FilterMerge(orig_ikey.user_key, value_slice)) && - (range_del_agg == nullptr || - !range_del_agg->ShouldDelete(iter->key()))) { + CompactionFilter::Decision filter = + ikey.sequence <= latest_snapshot_ + ? CompactionFilter::Decision::kKeep + : FilterMerge(orig_ikey.user_key, value_slice); + if (range_del_agg != nullptr && + range_del_agg->ShouldDelete(iter->key()) && + filter != CompactionFilter::Decision::kRemoveAndSkipUntil) { + filter = CompactionFilter::Decision::kRemove; + } + if (filter == CompactionFilter::Decision::kKeep || + filter == CompactionFilter::Decision::kChangeValue) { if (original_key_is_iter) { // this is just an optimization that saves us one memcpy keys_.push_front(std::move(original_key)); @@ -200,8 +209,21 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // original_key before ParseInternalKey(keys_.back(), &orig_ikey); } - merge_context_.PushOperand(value_slice, - iter->IsValuePinned() /* operand_pinned */); + if (filter == CompactionFilter::Decision::kKeep) { + merge_context_.PushOperand( + value_slice, iter->IsValuePinned() /* operand_pinned */); + } else { // kChangeValue + // Compaction filter asked us to change the operand from value_slice + // to compaction_filter_value_. + merge_context_.PushOperand(compaction_filter_value_, false); + } + } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { + // Compaction filter asked us to remove this key altogether + // (not just this operand), along with some keys following it. 
+ keys_.clear(); + merge_context_.Clear(); + has_compaction_filter_skip_until_ = true; + return Status::OK(); } } } @@ -305,17 +327,32 @@ void MergeOutputIterator::Next() { ++it_values_; } -bool MergeHelper::FilterMerge(const Slice& user_key, const Slice& value_slice) { +CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key, + const Slice& value_slice) { if (compaction_filter_ == nullptr) { - return false; + return CompactionFilter::Decision::kKeep; } if (stats_ != nullptr) { filter_timer_.Start(); } - bool to_delete = - compaction_filter_->FilterMergeOperand(level_, user_key, value_slice); + compaction_filter_value_.clear(); + compaction_filter_skip_until_.Clear(); + auto ret = compaction_filter_->FilterV2( + level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice, + &compaction_filter_value_, compaction_filter_skip_until_.rep()); + if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) { + if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(), + user_key) <= 0) { + // Invalid skip_until returned from compaction filter. + // Keep the key as per FilterV2 documentation. 
+ ret = CompactionFilter::Decision::kKeep; + } else { + compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, + kValueTypeForSeek); + } + } total_filter_time_ += filter_timer_.ElapsedNanosSafe(); - return to_delete; + return ret; } } // namespace rocksdb diff --git a/db/merge_helper.h b/db/merge_helper.h index de3e3c949..00a8cf1cf 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -69,6 +69,7 @@ class MergeHelper { // - a Put/Delete, // - a different user key, // - a specific sequence number (snapshot boundary), + // - REMOVE_AND_SKIP_UNTIL returned from compaction filter, // or - the end of iteration // iter: (IN) points to the first merge type entry // (OUT) points to the first entry not included in the merge process @@ -92,8 +93,11 @@ class MergeHelper { const bool at_bottom = false); // Filters a merge operand using the compaction filter specified - // in the constructor. Returns true if the operand should be filtered out. - bool FilterMerge(const Slice& user_key, const Slice& value_slice); + // in the constructor. Returns the decision that the filter made. + // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the + // optional outputs of compaction filter. + CompactionFilter::Decision FilterMerge(const Slice& user_key, + const Slice& value_slice); // Query the merge result // These are valid until the next MergeUntil call @@ -127,6 +131,20 @@ class MergeHelper { uint64_t TotalFilterTime() const { return total_filter_time_; } bool HasOperator() const { return user_merge_operator_ != nullptr; } + // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will + // return true and fill *until with the key to which we should skip. + // If true, keys() and values() are empty. 
+ bool FilteredUntil(Slice* skip_until) const { + if (!has_compaction_filter_skip_until_) { + return false; + } + assert(compaction_filter_ != nullptr); + assert(skip_until != nullptr); + assert(compaction_filter_skip_until_.Valid()); + *skip_until = compaction_filter_skip_until_.Encode(); + return true; + } + private: Env* env_; const Comparator* user_comparator_; @@ -149,6 +167,10 @@ class MergeHelper { StopWatchNano filter_timer_; uint64_t total_filter_time_; Statistics* stats_; + + bool has_compaction_filter_skip_until_ = false; + std::string compaction_filter_value_; + InternalKey compaction_filter_skip_until_; }; // MergeOutputIterator can be used to iterate over the result of a merge. diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index d32f37b26..ff6d6e2db 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -9,6 +9,7 @@ #ifndef STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_ #define STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_ +#include #include #include #include @@ -32,6 +33,18 @@ struct CompactionFilterContext { class CompactionFilter { public: + enum ValueType { + kValue, + kMergeOperand, + }; + + enum class Decision { + kKeep, + kRemove, + kChangeValue, + kRemoveAndSkipUntil, + }; + // Context information of a compaction run struct Context { // Does this compaction run include all data files @@ -84,11 +97,10 @@ class CompactionFilter { // The last paragraph is not true if you set max_subcompactions to more than // 1. In that case, subcompaction from multiple threads may call a single // CompactionFilter concurrently. 
- virtual bool Filter(int level, - const Slice& key, - const Slice& existing_value, - std::string* new_value, - bool* value_changed) const = 0; + virtual bool Filter(int level, const Slice& key, const Slice& existing_value, + std::string* new_value, bool* value_changed) const { + return false; + } // The compaction process invokes this method on every merge operand. If this // method returns true, the merge operand will be ignored and not written out @@ -104,6 +116,60 @@ class CompactionFilter { return false; } + // An extended API. Called for both values and merge operands. + // Allows changing value and skipping ranges of keys. + // The default implementation uses Filter() and FilterMergeOperand(). + // If you're overriding this method, no need to override the other two. + // `value_type` indicates whether this key-value corresponds to a normal + // value (e.g. written with Put()) or a merge operand (written with Merge()). + // + // Possible return values: + // * kKeep - keep the key-value pair. + // * kRemove - remove the key-value pair or merge operand. + // * kChangeValue - keep the key and change the value/operand to *new_value. + // * kRemoveAndSkipUntil - remove this key-value pair, and also remove + // all key-value pairs with key in [key, *skip_until). This range + // of keys will be skipped without reading, potentially saving some + // IO operations compared to removing the keys one by one. + // + // *skip_until <= key is treated the same as Decision::kKeep + // (since the range [key, *skip_until) is empty). + // + // The keys are skipped even if there are snapshots containing them, + // as if IgnoreSnapshots() was true; i.e. values removed + // by kRemoveAndSkipUntil can disappear from a snapshot - beware + // if you're using TransactionDB or DB::GetSnapshot(). + // + // If you use kRemoveAndSkipUntil, consider also reducing + // compaction_readahead_size option. 
+ // + // Note: If you are using a TransactionDB, it is not recommended to filter + // out or modify merge operands (ValueType::kMergeOperand). + // If a merge operation is filtered out, TransactionDB may not realize there + // is a write conflict and may allow a Transaction to Commit that should have + // failed. Instead, it is better to implement any Merge filtering inside the + // MergeOperator. + virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, + const Slice& existing_value, std::string* new_value, + std::string* skip_until) const { + switch (value_type) { + case ValueType::kValue: { + bool value_changed = false; + bool rv = Filter(level, key, existing_value, new_value, &value_changed); + if (rv) { + return Decision::kRemove; + } + return value_changed ? Decision::kChangeValue : Decision::kKeep; + } + case ValueType::kMergeOperand: { + bool rv = FilterMergeOperand(level, key, existing_value); + return rv ? Decision::kRemove : Decision::kKeep; + } + } + assert(false); + return Decision::kKeep; + } + // By default, compaction will only call Filter() on keys written after the // most recent call to GetSnapshot(). However, if the compaction filter // overrides IgnoreSnapshots to make it return true, the compaction filter