Support for range skips in compaction filter

Summary:
This adds the ability for compaction filter to say "drop this key-value, and also drop everything up to key x". This will cause the compaction to seek input iterator to x, without reading the data. This can make compaction much faster when large consecutive chunks of data are filtered out. See the changes in include/rocksdb/compaction_filter.h for the new API.

Along the way this diff also adds ability for compaction filter changing merge operands, similar to how it can change values; we're not going to use this feature, it just seemed easier and cleaner to implement it than to document that it's not implemented :)

The diff is not as big as it may seem, about half of the lines are a test.
Closes https://github.com/facebook/rocksdb/pull/1599

Differential Revision: D4252092

Pulled By: al13n321

fbshipit-source-id: 41e1e48
main
Mike Kolupaev 8 years ago committed by Facebook Github Bot
parent 96fcefbf1d
commit 247d0979aa
  1. 1
      HISTORY.md
  2. 4
      db/compaction_iteration_stats.h
  3. 67
      db/compaction_iterator.cc
  4. 44
      db/compaction_iterator.h
  5. 232
      db/compaction_iterator_test.cc
  6. 80
      db/db_compaction_filter_test.cc
  7. 5
      db/dbformat.cc
  8. 15
      db/dbformat.h
  9. 61
      db/merge_helper.cc
  10. 26
      db/merge_helper.h
  11. 76
      include/rocksdb/compaction_filter.h

@ -2,6 +2,7 @@
## Unreleased
### Public API Change
* Options.level0_stop_writes_trigger default value changes from 24 to 32.
* New compaction filter API: CompactionFilter::FilterV2(). Allows to drop ranges of keys.
## 5.0.0 (11/17/2016)
### Public API Change

@ -7,7 +7,11 @@
struct CompactionIterationStats {
// Compaction statistics
// Doesn't include records skipped because of
// CompactionFilter::Decision::kRemoveAndSkipUntil.
int64_t num_record_drop_user = 0;
int64_t num_record_drop_hidden = 0;
int64_t num_record_drop_obsolete = 0;
int64_t num_record_drop_range_del = 0;

@ -17,6 +17,21 @@ CompactionIterator::CompactionIterator(
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
const Compaction* compaction, const CompactionFilter* compaction_filter,
LogBuffer* log_buffer)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, env, expect_valid_internal_key,
range_del_agg,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
compaction_filter, log_buffer) {}
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter, LogBuffer* log_buffer)
: input_(input),
cmp_(cmp),
merge_helper_(merge_helper),
@ -25,7 +40,7 @@ CompactionIterator::CompactionIterator(
env_(env),
expect_valid_internal_key_(expect_valid_internal_key),
range_del_agg_(range_del_agg),
compaction_(compaction),
compaction_(std::move(compaction)),
compaction_filter_(compaction_filter),
log_buffer_(log_buffer),
merge_out_iter_(merge_helper_) {
@ -110,7 +125,7 @@ void CompactionIterator::Next() {
}
if (valid_) {
// Record that we've ouputted a record for the current key.
// Record that we've outputted a record for the current key.
has_outputted_key_ = true;
}
@ -151,6 +166,13 @@ void CompactionIterator::NextFromInput() {
iter_stats_.total_input_raw_key_bytes += key_.size();
iter_stats_.total_input_raw_value_bytes += value_.size();
// If need_skip is true, we should seek the input iterator
// to internal key skip_until and continue from there.
bool need_skip = false;
// Points either into compaction_filter_skip_until_ or into
// merge_helper_->compaction_filter_skip_until_.
Slice skip_until;
// Check whether the user key changed. After this if statement current_key_
// is a copy of the current input key (maybe converted to a delete by the
// compaction filter). ikey_.user_key is pointing to the copy.
@ -173,26 +195,41 @@ void CompactionIterator::NextFromInput() {
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
// replace the entry with a deletion marker.
bool value_changed = false;
bool to_delete = false;
CompactionFilter::Decision filter;
compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear();
{
StopWatchNano timer(env_, true);
to_delete = compaction_filter_->Filter(
compaction_->level(), ikey_.user_key, value_,
&compaction_filter_value_, &value_changed);
filter = compaction_filter_->FilterV2(
compaction_->level(), ikey_.user_key,
CompactionFilter::ValueType::kValue, value_,
&compaction_filter_value_, compaction_filter_skip_until_.rep());
iter_stats_.total_filter_time +=
env_ != nullptr ? timer.ElapsedNanos() : 0;
}
if (to_delete) {
if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
cmp_->Compare(*compaction_filter_skip_until_.rep(),
ikey_.user_key) <= 0) {
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2 documentation.
filter = CompactionFilter::Decision::kKeep;
}
if (filter == CompactionFilter::Decision::kRemove) {
// convert the current key to a delete
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (value_changed) {
} else if (filter == CompactionFilter::Decision::kChangeValue) {
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
need_skip = true;
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
kValueTypeForSeek);
skip_until = compaction_filter_skip_until_.Encode();
}
}
} else {
@ -219,7 +256,9 @@ void CompactionIterator::NextFromInput() {
? earliest_snapshot_
: findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
if (clear_and_output_next_key_) {
if (need_skip) {
// This case is handled below.
} else if (clear_and_output_next_key_) {
// In the previous iteration we encountered a single delete that we could
// not compact out. We will keep this Put, but can drop it's data.
// (See Optimization 3, below.)
@ -398,7 +437,9 @@ void CompactionIterator::NextFromInput() {
bottommost_level_);
merge_out_iter_.SeekToFirst();
if (merge_out_iter_.Valid()) {
if (merge_helper_->FilteredUntil(&skip_until)) {
need_skip = true;
} else if (merge_out_iter_.Valid()) {
// NOTE: key, value, and ikey_ refer to old entries.
// These will be correctly set below.
key_ = merge_out_iter_.key();
@ -432,6 +473,10 @@ void CompactionIterator::NextFromInput() {
valid_ = true;
}
}
if (need_skip) {
input_->Seek(skip_until);
}
}
}

@ -23,6 +23,36 @@ namespace rocksdb {
class CompactionIterator {
public:
// A wrapper around Compaction. Has a much smaller interface, only what
// CompactionIterator uses. Tests can override it.
class CompactionProxy {
public:
explicit CompactionProxy(const Compaction* compaction)
: compaction_(compaction) {}
virtual ~CompactionProxy() = default;
virtual int level(size_t compaction_input_level = 0) const {
return compaction_->level();
}
virtual bool KeyNotExistsBeyondOutputLevel(
const Slice& user_key, std::vector<size_t>* level_ptrs) const {
return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
}
virtual bool bottommost_level() const {
return compaction_->bottommost_level();
}
virtual int number_levels() const { return compaction_->number_levels(); }
virtual Slice GetLargestUserKey() const {
return compaction_->GetLargestUserKey();
}
protected:
CompactionProxy() = default;
private:
const Compaction* compaction_;
};
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
@ -33,6 +63,17 @@ class CompactionIterator {
const CompactionFilter* compaction_filter = nullptr,
LogBuffer* log_buffer = nullptr);
// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
bool expect_valid_internal_key,
RangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
LogBuffer* log_buffer = nullptr);
~CompactionIterator();
void ResetRecordCounts();
@ -82,7 +123,7 @@ class CompactionIterator {
Env* env_;
bool expect_valid_internal_key_;
RangeDelAggregator* range_del_agg_;
const Compaction* compaction_;
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
LogBuffer* log_buffer_;
bool bottommost_level_;
@ -130,6 +171,7 @@ class CompactionIterator {
// merge operands and then releasing them after consuming them.
PinnedIteratorsManager pinned_iters_mgr_;
std::string compaction_filter_value_;
InternalKey compaction_filter_skip_until_;
// "level_ptrs" holds indices that remember which file of an associated
// level we were last checking during the last call to compaction->
// KeyNotExistsBeyondOutputLevel(). This allows future calls to the function

@ -13,6 +13,90 @@
namespace rocksdb {
class LoggingForwardVectorIterator : public InternalIterator {
public:
struct Action {
enum class Type {
SEEK_TO_FIRST,
SEEK,
NEXT,
};
Type type;
std::string arg;
explicit Action(Type _type, std::string _arg = "")
: type(_type), arg(_arg) {}
bool operator==(const Action& rhs) const {
return std::tie(type, arg) == std::tie(rhs.type, rhs.arg);
}
};
LoggingForwardVectorIterator(const std::vector<std::string>& keys,
const std::vector<std::string>& values)
: keys_(keys), values_(values), current_(keys.size()) {
assert(keys_.size() == values_.size());
}
virtual bool Valid() const override { return current_ < keys_.size(); }
virtual void SeekToFirst() override {
log.emplace_back(Action::Type::SEEK_TO_FIRST);
current_ = 0;
}
virtual void SeekToLast() override { assert(false); }
virtual void Seek(const Slice& target) override {
log.emplace_back(Action::Type::SEEK, target.ToString());
current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
keys_.begin();
}
virtual void SeekForPrev(const Slice& target) override { assert(false); }
virtual void Next() override {
assert(Valid());
log.emplace_back(Action::Type::NEXT);
current_++;
}
virtual void Prev() override { assert(false); }
virtual Slice key() const override {
assert(Valid());
return Slice(keys_[current_]);
}
virtual Slice value() const override {
assert(Valid());
return Slice(values_[current_]);
}
virtual Status status() const override { return Status::OK(); }
std::vector<Action> log;
private:
std::vector<std::string> keys_;
std::vector<std::string> values_;
size_t current_;
};
class FakeCompaction : public CompactionIterator::CompactionProxy {
public:
FakeCompaction() = default;
virtual int level(size_t compaction_input_level) const { return 0; }
virtual bool KeyNotExistsBeyondOutputLevel(
const Slice& user_key, std::vector<size_t>* level_ptrs) const {
return false;
}
virtual bool bottommost_level() const { return false; }
virtual int number_levels() const { return 1; }
virtual Slice GetLargestUserKey() const {
return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
}
};
class CompactionIteratorTest : public testing::Test {
public:
CompactionIteratorTest()
@ -22,19 +106,27 @@ class CompactionIteratorTest : public testing::Test {
const std::vector<std::string>& vs,
const std::vector<std::string>& range_del_ks,
const std::vector<std::string>& range_del_vs,
SequenceNumber last_sequence) {
SequenceNumber last_sequence,
MergeOperator* merge_op = nullptr,
CompactionFilter* filter = nullptr) {
std::unique_ptr<InternalIterator> range_del_iter(
new test::VectorIterator(range_del_ks, range_del_vs));
range_del_agg_.reset(new RangeDelAggregator(icmp_, snapshots_));
ASSERT_OK(range_del_agg_->AddTombstones(std::move(range_del_iter)));
merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, nullptr, nullptr,
std::unique_ptr<CompactionIterator::CompactionProxy> compaction;
if (filter) {
compaction.reset(new FakeCompaction());
}
merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter,
nullptr, 0U, false, 0));
iter_.reset(new test::VectorIterator(ks, vs));
iter_.reset(new LoggingForwardVectorIterator(ks, vs));
iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get()));
kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(),
std::move(compaction), filter));
}
void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }
@ -43,7 +135,7 @@ class CompactionIteratorTest : public testing::Test {
const InternalKeyComparator icmp_;
std::vector<SequenceNumber> snapshots_;
std::unique_ptr<MergeHelper> merge_helper_;
std::unique_ptr<test::VectorIterator> iter_;
std::unique_ptr<LoggingForwardVectorIterator> iter_;
std::unique_ptr<CompactionIterator> c_iter_;
std::unique_ptr<RangeDelAggregator> range_del_agg_;
};
@ -116,6 +208,136 @@ TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) {
ASSERT_FALSE(c_iter_->Valid());
}
TEST_F(CompactionIteratorTest, CompactionFilterSkipUntil) {
// Expect no merging attempts.
class MergeOp : public MergeOperator {
public:
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
ADD_FAILURE();
return false;
}
bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const override {
ADD_FAILURE();
return false;
}
const char* Name() const override {
return "CompactionIteratorTest.CompactionFilterSkipUntil::MergeOp";
}
};
class Filter : public CompactionFilter {
virtual Decision FilterV2(int level, const Slice& key, ValueType t,
const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const {
std::string k = key.ToString();
std::string v = existing_value.ToString();
// See InitIterators() call below for the sequence of keys and their
// filtering decisions. Here we closely assert that compaction filter is
// called with the expected keys and only them, and with the right values.
if (k == "a") {
EXPECT_EQ(ValueType::kValue, t);
EXPECT_EQ("av50", v);
return Decision::kKeep;
}
if (k == "b") {
EXPECT_EQ(ValueType::kValue, t);
EXPECT_EQ("bv60", v);
*skip_until = "d+";
return Decision::kRemoveAndSkipUntil;
}
if (k == "e") {
EXPECT_EQ(ValueType::kMergeOperand, t);
EXPECT_EQ("em71", v);
return Decision::kKeep;
}
if (k == "f") {
if (v == "fm65") {
EXPECT_EQ(ValueType::kMergeOperand, t);
*skip_until = "f";
} else {
EXPECT_EQ("fm30", v);
EXPECT_EQ(ValueType::kMergeOperand, t);
*skip_until = "g+";
}
return Decision::kRemoveAndSkipUntil;
}
if (k == "h") {
EXPECT_EQ(ValueType::kValue, t);
EXPECT_EQ("hv91", v);
return Decision::kKeep;
}
if (k == "i") {
EXPECT_EQ(ValueType::kValue, t);
EXPECT_EQ("iv95", v);
*skip_until = "z";
return Decision::kRemoveAndSkipUntil;
}
ADD_FAILURE();
return Decision::kKeep;
}
const char* Name() const override {
return "CompactionIteratorTest.CompactionFilterSkipUntil::Filter";
}
};
MergeOp merge_op;
Filter filter;
InitIterators(
{test::KeyStr("a", 50, kTypeValue), // keep
test::KeyStr("a", 45, kTypeMerge),
test::KeyStr("b", 60, kTypeValue), // skip to "d+"
test::KeyStr("b", 40, kTypeValue), test::KeyStr("c", 35, kTypeValue),
test::KeyStr("d", 70, kTypeMerge),
test::KeyStr("e", 71, kTypeMerge), // keep
test::KeyStr("f", 65, kTypeMerge), // skip to "f", aka keep
test::KeyStr("f", 30, kTypeMerge), // skip to "g+"
test::KeyStr("f", 25, kTypeValue), test::KeyStr("g", 90, kTypeValue),
test::KeyStr("h", 91, kTypeValue), // keep
test::KeyStr("i", 95, kTypeValue), // skip to "z"
test::KeyStr("j", 99, kTypeValue)},
{"av50", "am45", "bv60", "bv40", "cv35", "dm70", "em71", "fm65", "fm30",
"fv25", "gv90", "hv91", "iv95", "jv99"},
{}, {}, kMaxSequenceNumber, &merge_op, &filter);
// Compaction should output just "a", "e" and "h" keys.
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("a", 50, kTypeValue), c_iter_->key().ToString());
ASSERT_EQ("av50", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("e", 71, kTypeMerge), c_iter_->key().ToString());
ASSERT_EQ("em71", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("h", 91, kTypeValue), c_iter_->key().ToString());
ASSERT_EQ("hv91", c_iter_->value().ToString());
c_iter_->Next();
ASSERT_FALSE(c_iter_->Valid());
// Check that the compaction iterator did the correct sequence of calls on
// the underlying iterator.
using A = LoggingForwardVectorIterator::Action;
using T = A::Type;
std::vector<A> expected_actions = {
A(T::SEEK_TO_FIRST),
A(T::NEXT),
A(T::NEXT),
A(T::SEEK, test::KeyStr("d+", kMaxSequenceNumber, kValueTypeForSeek)),
A(T::NEXT),
A(T::NEXT),
A(T::SEEK, test::KeyStr("g+", kMaxSequenceNumber, kValueTypeForSeek)),
A(T::NEXT),
A(T::SEEK, test::KeyStr("z", kMaxSequenceNumber, kValueTypeForSeek))};
ASSERT_EQ(expected_actions, iter_->log);
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -13,6 +13,7 @@
namespace rocksdb {
static int cfilter_count = 0;
static int cfilter_skips = 0;
// This is a static filter used for filtering
// kvs during the compaction process.
@ -65,6 +66,30 @@ class DeleteISFilter : public CompactionFilter {
virtual const char* Name() const override { return "DeleteFilter"; }
};
// Skip x if floor(x/10) is even, use range skips. Requires that keys are
// zero-padded to length 10.
class SkipEvenFilter : public CompactionFilter {
public:
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override {
cfilter_count++;
int i = std::stoi(key.ToString());
if (i / 10 % 2 == 0) {
char key_str[100];
snprintf(key_str, sizeof(key), "%010d", i / 10 * 10 + 10);
*skip_until = key_str;
++cfilter_skips;
return Decision::kRemoveAndSkipUntil;
}
return Decision::kKeep;
}
virtual bool IgnoreSnapshots() const override { return true; }
virtual const char* Name() const override { return "DeleteFilter"; }
};
class DelayFilter : public CompactionFilter {
public:
explicit DelayFilter(DBTestBase* d) : db_test(d) {}
@ -174,6 +199,20 @@ class DeleteISFilterFactory : public CompactionFilterFactory {
virtual const char* Name() const override { return "DeleteFilterFactory"; }
};
class SkipEvenFilterFactory : public CompactionFilterFactory {
public:
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
if (context.is_manual_compaction) {
return std::unique_ptr<CompactionFilter>(new SkipEvenFilter());
} else {
return std::unique_ptr<CompactionFilter>(nullptr);
}
}
virtual const char* Name() const override { return "SkipEvenFilterFactory"; }
};
class DelayFilterFactory : public CompactionFilterFactory {
public:
explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
@ -719,6 +758,47 @@ TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) {
}
#endif // ROCKSDB_LITE
TEST_F(DBTestCompactionFilter, SkipUntil) {
Options options = CurrentOptions();
options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>();
options.disable_auto_compactions = true;
options.create_if_missing = true;
DestroyAndReopen(options);
// Write 100K keys, these are written to a few files in L0.
for (int table = 0; table < 4; ++table) {
// Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371].
for (int i = table * 6; i < 39 + table * 11; ++i) {
char key[100];
snprintf(key, sizeof(key), "%010d", table * 100 + i);
Put(key, std::to_string(table * 1000 + i));
}
Flush();
}
cfilter_skips = 0;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Numberof skips in tables: 2, 3, 3, 3.
ASSERT_EQ(11, cfilter_skips);
for (int table = 0; table < 4; ++table) {
for (int i = table * 6; i < 39 + table * 11; ++i) {
int k = table * 100 + i;
char key[100];
snprintf(key, sizeof(key), "%010d", table * 100 + i);
auto expected = std::to_string(table * 1000 + i);
std::string val;
Status s = db_->Get(ReadOptions(), key, &val);
if (k / 10 % 2 == 0) {
ASSERT_TRUE(s.IsNotFound());
} else {
ASSERT_OK(s);
ASSERT_EQ(expected, val);
}
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -48,6 +48,11 @@ void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
ValueType t) {
PutFixed64(result, PackSequenceAndType(s, t));
}
std::string ParsedInternalKey::DebugString(bool hex) const {
char buf[50];
snprintf(buf, sizeof(buf), "' @ %" PRIu64 ": %d", sequence,

@ -99,6 +99,11 @@ extern void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t);
// Append the serialization of "key" to *result.
extern void AppendInternalKey(std::string* result,
const ParsedInternalKey& key);
// Serialized internal key consists of user key followed by footer.
// This function appends the footer to *result, assuming that *result already
// contains the user key at the end.
extern void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
ValueType t);
// Attempt to parse an internal key from "internal_key". On success,
// stores the parsed data in "*result", and returns true.
@ -197,6 +202,16 @@ class InternalKey {
void Clear() { rep_.clear(); }
// The underlying representation.
// Intended only to be used together with ConvertFromUserKey().
std::string* rep() { return &rep_; }
// Assuming that *rep() contains a user key, this method makes internal key
// out of it in-place. This saves a memcpy compared to Set()/SetFrom().
void ConvertFromUserKey(SequenceNumber s, ValueType t) {
AppendInternalKeyFooter(&rep_, s, t);
}
std::string DebugString(bool hex = false) const;
};

@ -83,6 +83,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
assert(HasOperator());
keys_.clear();
merge_context_.Clear();
has_compaction_filter_skip_until_ = false;
assert(user_merge_operator_);
bool first_key = true;
@ -145,7 +146,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// TODO(noetzli) If the merge operator returns false, we are currently
// (almost) silently dropping the put/delete. That's probably not what we
// want.
// want. Also if we're in compaction and it's a put, it would be nice to
// run compaction filter on it.
const Slice val = iter->value();
const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr;
std::string merge_result;
@ -185,10 +187,17 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// 1) it's included in one of the snapshots. in that case we *must* write
// it out, no matter what compaction filter says
// 2) it's not filtered by a compaction filter
if ((ikey.sequence <= latest_snapshot_ ||
!FilterMerge(orig_ikey.user_key, value_slice)) &&
(range_del_agg == nullptr ||
!range_del_agg->ShouldDelete(iter->key()))) {
CompactionFilter::Decision filter =
ikey.sequence <= latest_snapshot_
? CompactionFilter::Decision::kKeep
: FilterMerge(orig_ikey.user_key, value_slice);
if (range_del_agg != nullptr &&
range_del_agg->ShouldDelete(iter->key()) &&
filter != CompactionFilter::Decision::kRemoveAndSkipUntil) {
filter = CompactionFilter::Decision::kRemove;
}
if (filter == CompactionFilter::Decision::kKeep ||
filter == CompactionFilter::Decision::kChangeValue) {
if (original_key_is_iter) {
// this is just an optimization that saves us one memcpy
keys_.push_front(std::move(original_key));
@ -200,8 +209,21 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// original_key before
ParseInternalKey(keys_.back(), &orig_ikey);
}
merge_context_.PushOperand(value_slice,
iter->IsValuePinned() /* operand_pinned */);
if (filter == CompactionFilter::Decision::kKeep) {
merge_context_.PushOperand(
value_slice, iter->IsValuePinned() /* operand_pinned */);
} else { // kChangeValue
// Compaction filter asked us to change the operand from value_slice
// to compaction_filter_value_.
merge_context_.PushOperand(compaction_filter_value_, false);
}
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
// Compaction filter asked us to remove this key altogether
// (not just this operand), along with some keys following it.
keys_.clear();
merge_context_.Clear();
has_compaction_filter_skip_until_ = true;
return Status::OK();
}
}
}
@ -305,17 +327,32 @@ void MergeOutputIterator::Next() {
++it_values_;
}
bool MergeHelper::FilterMerge(const Slice& user_key, const Slice& value_slice) {
CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key,
const Slice& value_slice) {
if (compaction_filter_ == nullptr) {
return false;
return CompactionFilter::Decision::kKeep;
}
if (stats_ != nullptr) {
filter_timer_.Start();
}
bool to_delete =
compaction_filter_->FilterMergeOperand(level_, user_key, value_slice);
compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear();
auto ret = compaction_filter_->FilterV2(
level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice,
&compaction_filter_value_, compaction_filter_skip_until_.rep());
if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) {
if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(),
user_key) <= 0) {
// Invalid skip_until returned from compaction filter.
// Keep the key as per FilterV2 documentation.
ret = CompactionFilter::Decision::kKeep;
} else {
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
kValueTypeForSeek);
}
}
total_filter_time_ += filter_timer_.ElapsedNanosSafe();
return to_delete;
return ret;
}
} // namespace rocksdb

@ -69,6 +69,7 @@ class MergeHelper {
// - a Put/Delete,
// - a different user key,
// - a specific sequence number (snapshot boundary),
// - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
// or - the end of iteration
// iter: (IN) points to the first merge type entry
// (OUT) points to the first entry not included in the merge process
@ -92,8 +93,11 @@ class MergeHelper {
const bool at_bottom = false);
// Filters a merge operand using the compaction filter specified
// in the constructor. Returns true if the operand should be filtered out.
bool FilterMerge(const Slice& user_key, const Slice& value_slice);
// in the constructor. Returns the decision that the filter made.
// Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
// optional outputs of compaction filter.
CompactionFilter::Decision FilterMerge(const Slice& user_key,
const Slice& value_slice);
// Query the merge result
// These are valid until the next MergeUntil call
@ -127,6 +131,20 @@ class MergeHelper {
uint64_t TotalFilterTime() const { return total_filter_time_; }
bool HasOperator() const { return user_merge_operator_ != nullptr; }
// If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
// return true and fill *until with the key to which we should skip.
// If true, keys() and values() are empty.
bool FilteredUntil(Slice* skip_until) const {
if (!has_compaction_filter_skip_until_) {
return false;
}
assert(compaction_filter_ != nullptr);
assert(skip_until != nullptr);
assert(compaction_filter_skip_until_.Valid());
*skip_until = compaction_filter_skip_until_.Encode();
return true;
}
private:
Env* env_;
const Comparator* user_comparator_;
@ -149,6 +167,10 @@ class MergeHelper {
StopWatchNano filter_timer_;
uint64_t total_filter_time_;
Statistics* stats_;
bool has_compaction_filter_skip_until_ = false;
std::string compaction_filter_value_;
InternalKey compaction_filter_skip_until_;
};
// MergeOutputIterator can be used to iterate over the result of a merge.

@ -9,6 +9,7 @@
#ifndef STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_
#define STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_
#include <cassert>
#include <memory>
#include <string>
#include <vector>
@ -32,6 +33,18 @@ struct CompactionFilterContext {
class CompactionFilter {
public:
enum ValueType {
kValue,
kMergeOperand,
};
enum class Decision {
kKeep,
kRemove,
kChangeValue,
kRemoveAndSkipUntil,
};
// Context information of a compaction run
struct Context {
// Does this compaction run include all data files
@ -84,11 +97,10 @@ class CompactionFilter {
// The last paragraph is not true if you set max_subcompactions to more than
// 1. In that case, subcompaction from multiple threads may call a single
// CompactionFilter concurrently.
virtual bool Filter(int level,
const Slice& key,
const Slice& existing_value,
std::string* new_value,
bool* value_changed) const = 0;
virtual bool Filter(int level, const Slice& key, const Slice& existing_value,
std::string* new_value, bool* value_changed) const {
return false;
}
// The compaction process invokes this method on every merge operand. If this
// method returns true, the merge operand will be ignored and not written out
@ -104,6 +116,60 @@ class CompactionFilter {
return false;
}
// An extended API. Called for both values and merge operands.
// Allows changing value and skipping ranges of keys.
// The default implementation uses Filter() and FilterMergeOperand().
// If you're overriding this method, no need to override the other two.
// `value_type` indicates whether this key-value corresponds to a normal
// value (e.g. written with Put()) or a merge operand (written with Merge()).
//
// Possible return values:
// * kKeep - keep the key-value pair.
// * kRemove - remove the key-value pair or merge operand.
// * kChangeValue - keep the key and change the value/operand to *new_value.
// * kRemoveAndSkipUntil - remove this key-value pair, and also remove
// all key-value pairs with key in [key, *skip_until). This range
// of keys will be skipped without reading, potentially saving some
// IO operations compared to removing the keys one by one.
//
// *skip_until <= key is treated the same as Decision::kKeep
// (since the range [key, *skip_until) is empty).
//
// The keys are skipped even if there are snapshots containing them,
// as if IgnoreSnapshots() was true; i.e. values removed
// by kRemoveAndSkipUntil can disappear from a snapshot - beware
// if you're using TransactionDB or DB::GetSnapshot().
//
// If you use kRemoveAndSkipUntil, consider also reducing
// compaction_readahead_size option.
//
// Note: If you are using a TransactionDB, it is not recommended to filter
// out or modify merge operands (ValueType::kMergeOperand).
// If a merge operation is filtered out, TransactionDB may not realize there
// is a write conflict and may allow a Transaction to Commit that should have
// failed. Instead, it is better to implement any Merge filtering inside the
// MergeOperator.
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const {
switch (value_type) {
case ValueType::kValue: {
bool value_changed = false;
bool rv = Filter(level, key, existing_value, new_value, &value_changed);
if (rv) {
return Decision::kRemove;
}
return value_changed ? Decision::kChangeValue : Decision::kKeep;
}
case ValueType::kMergeOperand: {
bool rv = FilterMergeOperand(level, key, existing_value);
return rv ? Decision::kRemove : Decision::kKeep;
}
}
assert(false);
return Decision::kKeep;
}
// By default, compaction will only call Filter() on keys written after the
// most recent call to GetSnapshot(). However, if the compaction filter
// overrides IgnoreSnapshots to make it return true, the compaction filter

Loading…
Cancel
Save