diff --git a/HISTORY.md b/HISTORY.md index c7a9055b8..0cf2efc06 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,16 +5,18 @@ * Added single delete operation as a more efficient way to delete keys that have not been overwritten. * Added experimental AddFile() to DB interface that allow users to add files created by SstFileWriter into an empty Database, see include/rocksdb/sst_file_writer.h and DB::AddFile() for more info. * Added support for opening SST files with .ldb suffix which enables opening LevelDB databases. +* CompactionFilter now supports filtering of merge operands and merge results. ### Public API Changes * Added SingleDelete() to the DB interface. * Added AddFile() to DB interface. * Added SstFileWriter class. +* CompactionFilter has a new method FilterMergeOperand() that RocksDB applies to every merge operand during compaction to decide whether to filter the operand. ## 4.0.0 (9/9/2015) ### New Features * Added support for transactions. See include/rocksdb/utilities/transaction.h for more info. -* DB::GetProperty() now accept "rocksdb.aggregated-table-properties" and "rocksdb.aggregated-table-properties-at-levelN", in which case it returns aggregated table properties of the target column family, or the aggregated table properties of the specified level N if the "at-level" version is used. +* DB::GetProperty() now accepts "rocksdb.aggregated-table-properties" and "rocksdb.aggregated-table-properties-at-levelN", in which case it returns aggregated table properties of the target column family, or the aggregated table properties of the specified level N if the "at-level" version is used. * Add compression option kZSTDNotFinalCompression for people to experiment ZSTD although its format is not finalized. * We removed the need for LATEST_BACKUP file in BackupEngine. We still keep writing it when we create new backups (because of backward compatibility), but we don't read it anymore. diff --git a/db/builder.cc b/db/builder.cc index 388c94126..3d07a0f30 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -85,10 +85,11 @@ Status BuildTable( file_writer.get(), compression, compression_opts); } - MergeHelper merge(internal_comparator.user_comparator(), - ioptions.merge_operator, ioptions.info_log, + MergeHelper merge(env, internal_comparator.user_comparator(), + ioptions.merge_operator, nullptr, ioptions.info_log, ioptions.min_partial_merge_operands, - true /* internal key corruption is not ok */); + true /* internal key corruption is not ok */, + snapshots.empty() ? 0 : snapshots.back()); CompactionIterator c_iter(iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, &snapshots, env, diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index e6f1bb60a..d242291dd 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -12,16 +12,14 @@ namespace rocksdb { CompactionIterator::CompactionIterator( Iterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, - Env* env, bool expect_valid_internal_key, Statistics* stats, - Compaction* compaction, const CompactionFilter* compaction_filter, - LogBuffer* log_buffer) + Env* env, bool expect_valid_internal_key, Compaction* compaction, + const CompactionFilter* compaction_filter, LogBuffer* log_buffer) : input_(input), cmp_(cmp), merge_helper_(merge_helper), snapshots_(snapshots), env_(env), expect_valid_internal_key_(expect_valid_internal_key), - stats_(stats), compaction_(compaction), compaction_filter_(compaction_filter), log_buffer_(log_buffer), @@ -277,24 +275,30 @@ void CompactionIterator::NextFromInput() { // have hit (A) // We encapsulate the merge related state machine in a different // object to minimize change to the existing flow. - merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_, - stats_, env_); + merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_); merge_out_iter_.SeekToFirst(); - // NOTE: key, value, and ikey_ refer to old entries. - // These will be correctly set below. - key_ = merge_out_iter_.key(); - value_ = merge_out_iter_.value(); - bool valid_key __attribute__((__unused__)) = - ParseInternalKey(key_, &ikey_); - // MergeUntil stops when it encounters a corrupt key and does not - // include them in the result, so we expect the keys here to valid. - assert(valid_key); - // Keep current_key_ in sync. - current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); - key_ = current_key_.GetKey(); - ikey_.user_key = current_key_.GetUserKey(); - valid_ = true; + if (merge_out_iter_.Valid()) { + // NOTE: key, value, and ikey_ refer to old entries. + // These will be correctly set below. + key_ = merge_out_iter_.key(); + value_ = merge_out_iter_.value(); + bool valid_key __attribute__((__unused__)) = + ParseInternalKey(key_, &ikey_); + // MergeUntil stops when it encounters a corrupt key and does not + // include them in the result, so we expect the keys here to valid. + assert(valid_key); + // Keep current_key_ in sync. + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + key_ = current_key_.GetKey(); + ikey_.user_key = current_key_.GetUserKey(); + valid_ = true; + } else { + // all merge operands were filtered out. reset the user key, since the + // batch consumed by the merge operator should not shadow any keys + // coming after the merges + has_current_user_key_ = false; + } } else { valid_ = true; } diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 8229f2f00..da242f6aa 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -41,7 +41,6 @@ class CompactionIterator { MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, Env* env, bool expect_valid_internal_key, - Statistics* stats = nullptr, Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, LogBuffer* log_buffer = nullptr); @@ -91,7 +90,6 @@ class CompactionIterator { const std::vector* snapshots_; Env* env_; bool expect_valid_internal_key_; - Statistics* stats_; Compaction* compaction_; const CompactionFilter* compaction_filter_; LogBuffer* log_buffer_; diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index 8b8a3c9a4..1148c2ac7 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -16,7 +16,8 @@ class CompactionIteratorTest : public testing::Test { void InitIterator(const std::vector& ks, const std::vector& vs, SequenceNumber last_sequence) { - merge_helper_.reset(new MergeHelper(cmp_, nullptr, nullptr, 0U, false)); + merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, nullptr, nullptr, + nullptr, 0U, false, 0)); iter_.reset(new test::VectorIterator(ks, vs)); iter_->SeekToFirst(); c_iter_.reset(new CompactionIterator(iter_.get(), cmp_, merge_helper_.get(), diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 691aa2f62..4d6656d4e 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -597,10 +597,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); - MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator, - db_options_.info_log.get(), - cfd->ioptions()->min_partial_merge_operands, - false /* internal key corruption is expected */); auto compaction_filter = cfd->ioptions()->compaction_filter; std::unique_ptr compaction_filter_from_factory = nullptr; if (compaction_filter == nullptr) { @@ -608,6 +604,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { sub_compact->compaction->CreateCompactionFilter(); compaction_filter = compaction_filter_from_factory.get(); } + MergeHelper merge( + env_, cfd->user_comparator(), cfd->ioptions()->merge_operator, + compaction_filter, db_options_.info_log.get(), + cfd->ioptions()->min_partial_merge_operands, + false /* internal key corruption is expected */, + existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), + compact_->compaction->level(), db_options_.statistics.get()); TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); @@ -624,8 +627,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { Status status; sub_compact->c_iter.reset(new CompactionIterator( input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), - &existing_snapshots_, env_, false, db_options_.statistics.get(), - sub_compact->compaction, compaction_filter)); + &existing_snapshots_, env_, false, sub_compact->compaction, + compaction_filter)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); const auto& c_iter_stats = c_iter->iter_stats(); diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 795607afe..1e57c9c36 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -182,7 +182,7 @@ class CompactionJobTest : public testing::Test { return expected_results; } - void NewDB(std::shared_ptr merge_operator = nullptr) { + void NewDB() { VersionEdit new_db; new_db.SetLogNumber(0); new_db.SetNextFile(2); @@ -207,7 +207,8 @@ class CompactionJobTest : public testing::Test { std::vector column_families; cf_options_.table_factory = mock_table_factory_; - cf_options_.merge_operator = merge_operator; + cf_options_.merge_operator = merge_op_; + cf_options_.compaction_filter = compaction_filter_.get(); column_families.emplace_back(kDefaultColumnFamilyName, cf_options_); EXPECT_OK(versions_->Recover(column_families, false)); @@ -258,10 +259,16 @@ class CompactionJobTest : public testing::Test { &mutex_)); mutex_.Unlock(); - ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U); - ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files); - ASSERT_EQ(compaction_job_stats_.num_output_files, 1U); - mock_table_factory_->AssertLatestFile(expected_results); + if (expected_results.size() == 0) { + ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U); + ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files); + ASSERT_EQ(compaction_job_stats_.num_output_files, 0U); + } else { + ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U); + ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files); + ASSERT_EQ(compaction_job_stats_.num_output_files, 1U); + mock_table_factory_->AssertLatestFile(expected_results); + } } Env* env_; @@ -279,6 +286,8 @@ class CompactionJobTest : public testing::Test { std::shared_ptr mock_table_factory_; CompactionJobStats compaction_job_stats_; ColumnFamilyData* cfd_; + std::unique_ptr compaction_filter_; + std::shared_ptr merge_op_; }; TEST_F(CompactionJobTest, Simple) { @@ -297,7 +306,7 @@ TEST_F(CompactionJobTest, SimpleCorrupted) { auto expected_results = CreateTwoFiles(true); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); auto files = cfd->current()->storage_info()->LevelFiles(0); - RunCompaction({ files }, expected_results); + RunCompaction({files}, expected_results); ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U); } @@ -317,7 +326,7 @@ TEST_F(CompactionJobTest, SimpleDeletion) { SetLastSequence(4U); auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({ files }, expected_results); + RunCompaction({files}, expected_results); } TEST_F(CompactionJobTest, SimpleOverwrite) { @@ -339,7 +348,7 @@ TEST_F(CompactionJobTest, SimpleOverwrite) { SetLastSequence(4U); auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({ files }, expected_results); + RunCompaction({files}, expected_results); } TEST_F(CompactionJobTest, SimpleNonLastLevel) { @@ -368,12 +377,12 @@ TEST_F(CompactionJobTest, SimpleNonLastLevel) { SetLastSequence(6U); auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0); auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1); - RunCompaction({ lvl0_files, lvl1_files }, expected_results); + RunCompaction({lvl0_files, lvl1_files}, expected_results); } TEST_F(CompactionJobTest, SimpleMerge) { - auto merge_op = MergeOperators::CreateStringAppendOperator(); - NewDB(merge_op); + merge_op_ = MergeOperators::CreateStringAppendOperator(); + NewDB(); auto file1 = mock::MakeMockFile({ {KeyStr("a", 5U, kTypeMerge), "5"}, @@ -392,12 +401,12 @@ TEST_F(CompactionJobTest, SimpleMerge) { SetLastSequence(5U); auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({ files }, expected_results); + RunCompaction({files}, expected_results); } TEST_F(CompactionJobTest, NonAssocMerge) { - auto merge_op = MergeOperators::CreateStringAppendTESTOperator(); - NewDB(merge_op); + merge_op_ = MergeOperators::CreateStringAppendTESTOperator(); + NewDB(); auto file1 = mock::MakeMockFile({ {KeyStr("a", 5U, kTypeMerge), "5"}, @@ -417,7 +426,104 @@ TEST_F(CompactionJobTest, NonAssocMerge) { SetLastSequence(5U); auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({ files }, expected_results); + RunCompaction({files}, expected_results); +} + +// Filters merge operands with value 10. +TEST_F(CompactionJobTest, MergeOperandFilter) { + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + compaction_filter_.reset(new test::FilterNumber(10U)); + NewDB(); + + auto file1 = mock::MakeMockFile( + {{KeyStr("a", 5U, kTypeMerge), test::EncodeInt(5U)}, + {KeyStr("a", 4U, kTypeMerge), test::EncodeInt(10U)}, // Filtered + {KeyStr("a", 3U, kTypeMerge), test::EncodeInt(3U)}}); + AddMockFile(file1); + + auto file2 = mock::MakeMockFile({ + {KeyStr("b", 2U, kTypeMerge), test::EncodeInt(2U)}, + {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)} // Filtered + }); + AddMockFile(file2); + + auto expected_results = + mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), test::EncodeInt(8U)}, + {KeyStr("b", 2U, kTypeMerge), test::EncodeInt(2U)}}); + + SetLastSequence(5U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({files}, expected_results); +} + +TEST_F(CompactionJobTest, FilterSomeMergeOperands) { + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + compaction_filter_.reset(new test::FilterNumber(10U)); + NewDB(); + + auto file1 = mock::MakeMockFile( + {{KeyStr("a", 5U, kTypeMerge), test::EncodeInt(5U)}, + {KeyStr("a", 4U, kTypeMerge), test::EncodeInt(10U)}, // Filtered + {KeyStr("a", 3U, kTypeValue), test::EncodeInt(5U)}, + {KeyStr("d", 8U, kTypeMerge), test::EncodeInt(10U)}}); + AddMockFile(file1); + + auto file2 = + mock::MakeMockFile({{KeyStr("b", 2U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("c", 2U, kTypeMerge), test::EncodeInt(3U)}, + {KeyStr("c", 1U, kTypeValue), test::EncodeInt(7U)}, + {KeyStr("d", 1U, kTypeValue), test::EncodeInt(6U)}}); + AddMockFile(file2); + + auto file3 = + mock::MakeMockFile({{KeyStr("a", 1U, kTypeMerge), test::EncodeInt(3U)}}); + AddMockFile(file3, 2); + + auto expected_results = mock::MakeMockFile({ + {KeyStr("a", 5U, kTypeValue), test::EncodeInt(10U)}, + {KeyStr("c", 2U, kTypeValue), test::EncodeInt(10U)}, + {KeyStr("d", 1U, kTypeValue), test::EncodeInt(6U)} + // b does not appear because the operands are filtered + }); + + SetLastSequence(5U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({files}, expected_results); +} + +// Test where all operands/merge results are filtered out. +TEST_F(CompactionJobTest, FilterAllMergeOperands) { + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + compaction_filter_.reset(new test::FilterNumber(10U)); + NewDB(); + + auto file1 = + mock::MakeMockFile({{KeyStr("a", 11U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("a", 10U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("a", 9U, kTypeMerge), test::EncodeInt(10U)}}); + AddMockFile(file1); + + auto file2 = + mock::MakeMockFile({{KeyStr("b", 8U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("b", 7U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("b", 6U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("b", 5U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("b", 4U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("b", 3U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("b", 2U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("c", 2U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("c", 1U, kTypeMerge), test::EncodeInt(10U)}}); + AddMockFile(file2); + + auto file3 = + mock::MakeMockFile({{KeyStr("a", 2U, kTypeMerge), test::EncodeInt(10U)}, + {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)}}); + AddMockFile(file3, 2); + + SetLastSequence(11U); + auto files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({files}, {}); } TEST_F(CompactionJobTest, SimpleSingleDelete) { diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 325fc33fa..f9cb67e9c 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -14,7 +14,6 @@ #include "rocksdb/merge_operator.h" #include "util/perf_context_imp.h" #include "util/statistics.h" -#include "util/stop_watch.h" namespace rocksdb { @@ -41,8 +40,7 @@ Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value, bool success = merge_operator->FullMerge(key, value, operands, result, logger); - RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, - env != nullptr ? timer.ElapsedNanos() : 0); + RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, timer.ElapsedNanosSafe()); if (!success) { RecordTick(statistics, NUMBER_MERGE_FAILURES); @@ -59,30 +57,33 @@ Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value, // operands_ stores the list of merge operands encountered while merging. // keys_[i] corresponds to operands_[i] for each i. Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, - const bool at_bottom, Statistics* stats, - Env* env_) { + const bool at_bottom) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. assert(HasOperator()); keys_.clear(); operands_.clear(); - keys_.push_front(iter->key().ToString()); - operands_.push_front(iter->value().ToString()); assert(user_merge_operator_); + bool first_key = true; // We need to parse the internal key again as the parsed key is // backed by the internal key! // Assume no internal key corruption as it has been successfully parsed // by the caller. - // Invariant: keys_.back() will not change. Hence, orig_ikey is always valid. + // original_key_is_iter variable is just caching the information: + // original_key_is_iter == (iter->key().ToString() == original_key) + bool original_key_is_iter = true; + std::string original_key = iter->key().ToString(); + // Important: + // orig_ikey is backed by original_key if keys_.empty() + // orig_ikey is backed by keys_.back() if !keys_.empty() ParsedInternalKey orig_ikey; - ParseInternalKey(keys_.back(), &orig_ikey); + ParseInternalKey(original_key, &orig_ikey); Status s; bool hit_the_next_user_key = false; - for (iter->Next(); iter->Valid(); iter->Next()) { + for (; iter->Valid(); iter->Next(), original_key_is_iter = false) { ParsedInternalKey ikey; - assert(operands_.size() >= 1); // Should be invariants! assert(keys_.size() == operands_.size()); if (!ParseInternalKey(iter->key(), &ikey)) { @@ -92,6 +93,9 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, return Status::Corruption("Corrupted internal key not expected."); } break; + } else if (first_key) { + assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)); + first_key = false; } else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) { // hit a different user key, stop right here hit_the_next_user_key = true; @@ -105,16 +109,29 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, assert(IsValueType(ikey.type)); if (ikey.type != kTypeMerge) { - // Merges operands can only be used with puts and deletions, single - // deletions are not supported. - assert(ikey.type == kTypeValue || ikey.type == kTypeDeletion); + if (ikey.type != kTypeValue && ikey.type != kTypeDeletion) { + // Merges operands can only be used with puts and deletions, single + // deletions are not supported. + assert(false); + // release build doesn't have asserts, so we return error status + return Status::InvalidArgument( + " Merges operands can only be used with puts and deletions, single " + "deletions are not supported."); + } // hit a put/delete // => merge the put value or a nullptr with operands_ // => store result in operands_.back() (and update keys_.back()) // => change the entry type to kTypeValue for keys_.back() // We are done! Success! - // + + // If there are no operands, just return the Status::OK(). That will cause + // the compaction iterator to write out the key we're currently at, which + // is the put/delete we just encountered. + if (keys_.empty()) { + return Status::OK(); + } + // TODO(noetzli) If the merge operator returns false, we are currently // (almost) silently dropping the put/delete. That's probably not what we // want. @@ -122,14 +139,14 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr; std::string merge_result; s = TimedFullMerge(ikey.user_key, val_ptr, operands_, - user_merge_operator_, stats, env_, logger_, + user_merge_operator_, stats_, env_, logger_, &merge_result); // We store the result in keys_.back() and operands_.back() // if nothing went wrong (i.e.: no operand corruption on disk) if (s.ok()) { // The original key encountered - std::string original_key = std::move(keys_.back()); + original_key = std::move(keys_.back()); orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); keys_.clear(); @@ -143,18 +160,42 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, return s; } else { // hit a merge + // => if there is a compaction filter, apply it. // => merge the operand into the front of the operands_ list - // => use the user's associative merge function to determine how. + // if not filtered // => then continue because we haven't yet seen a Put/Delete. - assert(!operands_.empty()); // Should have at least one element in it - - // keep queuing keys and operands until we either meet a put / delete + // + // Keep queuing keys and operands until we either meet a put / delete // request or later did a partial merge. - keys_.push_front(iter->key().ToString()); - operands_.push_front(iter->value().ToString()); + + Slice value_slice = iter->value(); + // add an operand to the list if: + // 1) it's included in one of the snapshots. in that case we *must* write + // it out, no matter what compaction filter says + // 2) it's not filtered by a compaction filter + if (ikey.sequence <= latest_snapshot_ || + !FilterMerge(orig_ikey.user_key, value_slice)) { + if (original_key_is_iter) { + // this is just an optimization that saves us one memcpy + keys_.push_front(std::move(original_key)); + } else { + keys_.push_front(iter->key().ToString()); + } + if (keys_.size() == 1) { + // we need to re-anchor the orig_ikey because it was anchored by + // original_key before + ParseInternalKey(keys_.back(), &orig_ikey); + } + operands_.push_front(value_slice.ToString()); + } } } + if (operands_.size() == 0) { + // we filtered out all the merge operands + return Status::OK(); + } + // We are sure we have seen this key's entire history if we are at the // last level and exhausted all internal keys of this user key. // NOTE: !iter->Valid() does not necessarily mean we hit the @@ -179,11 +220,13 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, assert(operands_.size() == keys_.size()); std::string merge_result; s = TimedFullMerge(orig_ikey.user_key, nullptr, operands_, - user_merge_operator_, stats, env_, logger_, + user_merge_operator_, stats_, env_, logger_, &merge_result); if (s.ok()) { // The original key encountered - std::string original_key = std::move(keys_.back()); + // We are certain that keys_ is not empty here (see assertions couple of + // lines before). + original_key = std::move(keys_.back()); orig_ikey.type = kTypeValue; UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); keys_.clear(); @@ -205,14 +248,14 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, bool merge_success = false; std::string merge_result; { - StopWatchNano timer(env_, stats != nullptr); + StopWatchNano timer(env_, stats_ != nullptr); PERF_TIMER_GUARD(merge_operator_time_nanos); merge_success = user_merge_operator_->PartialMergeMulti( orig_ikey.user_key, std::deque(operands_.begin(), operands_.end()), &merge_result, logger_); - RecordTick(stats, MERGE_OPERATION_TOTAL_TIME, - env_ != nullptr ? timer.ElapsedNanos() : 0); + RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME, + timer.ElapsedNanosSafe()); } if (merge_success) { // Merging of operands (associative merge) was successful. @@ -236,7 +279,6 @@ MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper) void MergeOutputIterator::SeekToFirst() { const auto& keys = merge_helper_->keys(); const auto& values = merge_helper_->values(); - assert(keys.size() > 0); assert(keys.size() == values.size()); it_keys_ = keys.rbegin(); it_values_ = values.rbegin(); @@ -247,4 +289,17 @@ void MergeOutputIterator::Next() { ++it_values_; } +bool MergeHelper::FilterMerge(const Slice& user_key, const Slice& value_slice) { + if (compaction_filter_ == nullptr) { + return false; + } + if (stats_ != nullptr) { + filter_timer_.Start(); + } + bool to_delete = + compaction_filter_->FilterMergeOperand(level_, user_key, value_slice); + total_filter_time_ += filter_timer_.ElapsedNanosSafe(); + return to_delete; +} + } // namespace rocksdb diff --git a/db/merge_helper.h b/db/merge_helper.h index 39c7126de..ade3d71a6 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -10,8 +10,10 @@ #include #include "db/dbformat.h" +#include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" #include "rocksdb/slice.h" +#include "util/stop_watch.h" namespace rocksdb { @@ -23,17 +25,26 @@ class Statistics; class MergeHelper { public: - MergeHelper(const Comparator* user_comparator, - const MergeOperator* user_merge_operator, Logger* logger, + MergeHelper(Env* env, const Comparator* user_comparator, + const MergeOperator* user_merge_operator, + const CompactionFilter* compaction_filter, Logger* logger, unsigned min_partial_merge_operands, - bool assert_valid_internal_key) - : user_comparator_(user_comparator), + bool assert_valid_internal_key, SequenceNumber latest_snapshot, + int level = 0, Statistics* stats = nullptr) + : env_(env), + user_comparator_(user_comparator), user_merge_operator_(user_merge_operator), + compaction_filter_(compaction_filter), logger_(logger), min_partial_merge_operands_(min_partial_merge_operands), assert_valid_internal_key_(assert_valid_internal_key), + latest_snapshot_(latest_snapshot), + level_(level), keys_(), - operands_() { + operands_(), + filter_timer_(env_), + total_filter_time_(0U), + stats_(stats) { assert(user_comparator_ != nullptr); } @@ -62,6 +73,7 @@ class MergeHelper { // 0 means no restriction // at_bottom: (IN) true if the iterator covers the bottem level, which means // we could reach the start of the history of this user key. + // // Returns one of the following statuses: // - OK: Entries were successfully merged. // - MergeInProgress: Put/Delete not encountered and unable to merge operands. @@ -71,8 +83,11 @@ class MergeHelper { // // REQUIRED: The first key in the input is not corrupted. Status MergeUntil(Iterator* iter, const SequenceNumber stop_before = 0, - const bool at_bottom = false, Statistics* stats = nullptr, - Env* env_ = nullptr); + const bool at_bottom = false); + + // Filters a merge operand using the compaction filter specified + // in the constructor. Returns true if the operand should be filtered out. + bool FilterMerge(const Slice& user_key, const Slice& value_slice); // Query the merge result // These are valid until the next MergeUntil call @@ -101,19 +116,28 @@ class MergeHelper { // TODO: Re-style this comment to be like the first one const std::deque& keys() const { return keys_; } const std::deque& values() const { return operands_; } + uint64_t TotalFilterTime() const { return total_filter_time_; } bool HasOperator() const { return user_merge_operator_ != nullptr; } private: + Env* env_; const Comparator* user_comparator_; const MergeOperator* user_merge_operator_; + const CompactionFilter* compaction_filter_; Logger* logger_; unsigned min_partial_merge_operands_; bool assert_valid_internal_key_; // enforce no internal key corruption? + SequenceNumber latest_snapshot_; + int level_; // the scratch area that holds the result of MergeUntil // valid up to the next MergeUntil call std::deque keys_; // Keeps track of the sequence of keys seen std::deque operands_; // Parallel with keys_; stores the values + + StopWatchNano filter_timer_; + uint64_t total_filter_time_; + Statistics* stats_; }; // MergeOutputIterator can be used to iterate over the result of a merge. diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc index 209ede01f..2ef0d39e4 100644 --- a/db/merge_helper_test.cc +++ b/db/merge_helper_test.cc @@ -18,26 +18,18 @@ namespace rocksdb { class MergeHelperTest : public testing::Test { public: - MergeHelperTest() = default; - ~MergeHelperTest() = default; + MergeHelperTest() { env_ = Env::Default(); } - Status RunUInt64MergeHelper(SequenceNumber stop_before, bool at_bottom) { - InitIterator(); - merge_op_ = MergeOperators::CreateUInt64AddOperator(); - merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(), - nullptr, 2U, false)); - return merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, - nullptr, Env::Default()); - } + ~MergeHelperTest() = default; - Status RunStringAppendMergeHelper(SequenceNumber stop_before, - bool at_bottom) { - InitIterator(); - merge_op_ = MergeOperators::CreateStringAppendTESTOperator(); - merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(), - nullptr, 2U, false)); - return merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, - nullptr, Env::Default()); + Status Run(SequenceNumber stop_before, bool at_bottom, + SequenceNumber latest_snapshot = 0) { + iter_.reset(new test::VectorIterator(ks_, vs_)); + iter_->SeekToFirst(); + merge_helper_.reset(new MergeHelper(env_, BytewiseComparator(), + merge_op_.get(), filter_.get(), nullptr, + 2U, false, latest_snapshot)); + return merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom); } void AddKeyVal(const std::string& user_key, const SequenceNumber& seq, @@ -51,66 +43,63 @@ class MergeHelperTest : public testing::Test { vs_.push_back(val); } - void InitIterator() { - iter_.reset(new test::VectorIterator(ks_, vs_)); - iter_->SeekToFirst(); - } - - std::string EncodeInt(uint64_t x) { - std::string result; - PutFixed64(&result, x); - return result; - } - + Env* env_; std::unique_ptr iter_; std::shared_ptr merge_op_; std::unique_ptr merge_helper_; std::vector ks_; std::vector vs_; + std::unique_ptr filter_; }; // If MergeHelper encounters a new key on the last level, we know that // the key has no more history and it can merge keys. TEST_F(MergeHelperTest, MergeAtBottomSuccess) { - AddKeyVal("a", 20, kTypeMerge, EncodeInt(1U)); - AddKeyVal("a", 10, kTypeMerge, EncodeInt(3U)); - AddKeyVal("b", 10, kTypeMerge, EncodeInt(4U)); // <- Iterator after merge + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + + AddKeyVal("a", 20, kTypeMerge, test::EncodeInt(1U)); + AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(3U)); + AddKeyVal("b", 10, kTypeMerge, test::EncodeInt(4U)); // <- iter_ after merge - ASSERT_TRUE(RunUInt64MergeHelper(0, true).ok()); + ASSERT_TRUE(Run(0, true).ok()); ASSERT_EQ(ks_[2], iter_->key()); ASSERT_EQ(test::KeyStr("a", 20, kTypeValue), merge_helper_->keys()[0]); - ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); + ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); } // Merging with a value results in a successful merge. TEST_F(MergeHelperTest, MergeValue) { - AddKeyVal("a", 40, kTypeMerge, EncodeInt(1U)); - AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U)); - AddKeyVal("a", 20, kTypeValue, EncodeInt(4U)); // <- Iterator after merge - AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U)); + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + + AddKeyVal("a", 40, kTypeMerge, test::EncodeInt(1U)); + AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U)); + AddKeyVal("a", 20, kTypeValue, test::EncodeInt(4U)); // <- iter_ after merge + AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(1U)); - ASSERT_TRUE(RunUInt64MergeHelper(0, false).ok()); + ASSERT_TRUE(Run(0, false).ok()); ASSERT_EQ(ks_[3], iter_->key()); ASSERT_EQ(test::KeyStr("a", 40, kTypeValue), merge_helper_->keys()[0]); - ASSERT_EQ(EncodeInt(8U), merge_helper_->values()[0]); + ASSERT_EQ(test::EncodeInt(8U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); } // Merging stops before a snapshot. TEST_F(MergeHelperTest, SnapshotBeforeValue) { - AddKeyVal("a", 50, kTypeMerge, EncodeInt(1U)); - AddKeyVal("a", 40, kTypeMerge, EncodeInt(3U)); // <- Iterator after merge - AddKeyVal("a", 30, kTypeMerge, EncodeInt(1U)); - AddKeyVal("a", 20, kTypeValue, EncodeInt(4U)); - AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U)); + merge_op_ = MergeOperators::CreateUInt64AddOperator(); - ASSERT_TRUE(RunUInt64MergeHelper(31, true).IsMergeInProgress()); + AddKeyVal("a", 50, kTypeMerge, test::EncodeInt(1U)); + AddKeyVal("a", 40, kTypeMerge, test::EncodeInt(3U)); // <- iter_ after merge + AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(1U)); + AddKeyVal("a", 20, kTypeValue, test::EncodeInt(4U)); + AddKeyVal("a", 10, kTypeMerge, test::EncodeInt(1U)); + + ASSERT_TRUE(Run(31, true).IsMergeInProgress()); ASSERT_EQ(ks_[2], iter_->key()); ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]); - ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); + ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); } @@ -118,11 +107,13 @@ TEST_F(MergeHelperTest, SnapshotBeforeValue) { // MergeHelper preserves the operand stack for merge operators that // cannot do a partial merge. TEST_F(MergeHelperTest, NoPartialMerge) { + merge_op_ = MergeOperators::CreateStringAppendTESTOperator(); + AddKeyVal("a", 50, kTypeMerge, "v2"); - AddKeyVal("a", 40, kTypeMerge, "v"); // <- Iterator after merge + AddKeyVal("a", 40, kTypeMerge, "v"); // <- iter_ after merge AddKeyVal("a", 30, kTypeMerge, "v"); - ASSERT_TRUE(RunStringAppendMergeHelper(31, true).IsMergeInProgress()); + ASSERT_TRUE(Run(31, true).IsMergeInProgress()); ASSERT_EQ(ks_[2], iter_->key()); ASSERT_EQ(test::KeyStr("a", 40, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ("v", merge_helper_->values()[0]); @@ -134,44 +125,162 @@ TEST_F(MergeHelperTest, NoPartialMerge) { // A single operand can not be merged. TEST_F(MergeHelperTest, SingleOperand) { - AddKeyVal("a", 50, kTypeMerge, EncodeInt(1U)); + merge_op_ = MergeOperators::CreateUInt64AddOperator(); - ASSERT_TRUE(RunUInt64MergeHelper(31, true).IsMergeInProgress()); + AddKeyVal("a", 50, kTypeMerge, test::EncodeInt(1U)); + + ASSERT_TRUE(Run(31, true).IsMergeInProgress()); ASSERT_FALSE(iter_->Valid()); ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]); - ASSERT_EQ(EncodeInt(1U), merge_helper_->values()[0]); + ASSERT_EQ(test::EncodeInt(1U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); } // Merging with a deletion turns the deletion into a value TEST_F(MergeHelperTest, MergeDeletion) { - AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U)); + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + + AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U)); AddKeyVal("a", 20, kTypeDeletion, ""); - ASSERT_TRUE(RunUInt64MergeHelper(15, false).ok()); + ASSERT_TRUE(Run(15, false).ok()); ASSERT_FALSE(iter_->Valid()); ASSERT_EQ(test::KeyStr("a", 30, kTypeValue), merge_helper_->keys()[0]); - ASSERT_EQ(EncodeInt(3U), merge_helper_->values()[0]); + ASSERT_EQ(test::EncodeInt(3U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); } // The merge helper stops upon encountering a corrupt key TEST_F(MergeHelperTest, CorruptKey) { - AddKeyVal("a", 30, kTypeMerge, EncodeInt(3U)); - AddKeyVal("a", 25, kTypeMerge, EncodeInt(1U)); + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + + AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U)); + AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(1U)); // Corrupt key - AddKeyVal("a", 20, kTypeDeletion, "", true); // <- Iterator after merge + AddKeyVal("a", 20, kTypeDeletion, "", true); // <- iter_ after merge - ASSERT_TRUE(RunUInt64MergeHelper(15, false).IsMergeInProgress()); + ASSERT_TRUE(Run(15, false).IsMergeInProgress()); ASSERT_EQ(ks_[2], iter_->key()); ASSERT_EQ(test::KeyStr("a", 30, kTypeMerge), merge_helper_->keys()[0]); - ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); + ASSERT_EQ(test::EncodeInt(4U), merge_helper_->values()[0]); ASSERT_EQ(1U, merge_helper_->keys().size()); ASSERT_EQ(1U, merge_helper_->values().size()); } +// The compaction filter is called on every merge operand +TEST_F(MergeHelperTest, FilterMergeOperands) { + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + filter_.reset(new test::FilterNumber(5U)); + + AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(3U)); + AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(5U)); // Filtered + AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(3U)); + AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(1U)); + AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); // Filtered + AddKeyVal("a", 25, kTypeValue, test::EncodeInt(1U)); + + ASSERT_TRUE(Run(15, false).ok()); + ASSERT_FALSE(iter_->Valid()); + MergeOutputIterator merge_output_iter(merge_helper_.get()); + merge_output_iter.SeekToFirst(); + ASSERT_EQ(test::KeyStr("a", 30, kTypeValue), + merge_output_iter.key().ToString()); + ASSERT_EQ(test::EncodeInt(8U), merge_output_iter.value().ToString()); + merge_output_iter.Next(); + ASSERT_FALSE(merge_output_iter.Valid()); +} + +TEST_F(MergeHelperTest, FilterAllMergeOperands) { + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + filter_.reset(new test::FilterNumber(5U)); + + AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U)); + AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(5U)); + AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(5U)); + AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(5U)); + AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); + AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U)); + + // filtered out all + ASSERT_TRUE(Run(15, false).ok()); + ASSERT_FALSE(iter_->Valid()); + MergeOutputIterator merge_output_iter(merge_helper_.get()); + merge_output_iter.SeekToFirst(); + ASSERT_FALSE(merge_output_iter.Valid()); + + // we have one operand that will survive because it's a delete + AddKeyVal("a", 24, kTypeDeletion, test::EncodeInt(5U)); + AddKeyVal("b", 23, kTypeValue, test::EncodeInt(5U)); + ASSERT_TRUE(Run(15, true).ok()); + merge_output_iter = MergeOutputIterator(merge_helper_.get()); + ASSERT_TRUE(iter_->Valid()); + merge_output_iter.SeekToFirst(); + ASSERT_FALSE(merge_output_iter.Valid()); + + // when all merge operands are filtered out, we leave the iterator pointing to + // the Put/Delete that survived + ASSERT_EQ(test::KeyStr("a", 24, kTypeDeletion), iter_->key().ToString()); + ASSERT_EQ(test::EncodeInt(5U), iter_->value().ToString()); +} + +// Make sure that merge operands are filtered at the beginning +TEST_F(MergeHelperTest, FilterFirstMergeOperand) { + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + filter_.reset(new test::FilterNumber(5U)); + + AddKeyVal("a", 31, kTypeMerge, test::EncodeInt(5U)); // Filtered + AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U)); // Filtered + AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(2U)); + AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(1U)); + AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(3U)); + AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); // Filtered + AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U)); // Filtered + AddKeyVal("b", 24, kTypeValue, test::EncodeInt(5U)); // next user key + + ASSERT_OK(Run(15, true)); + ASSERT_TRUE(iter_->Valid()); + MergeOutputIterator merge_output_iter(merge_helper_.get()); + merge_output_iter.SeekToFirst(); + // sequence number is 29 here, because the first merge operand got filtered + // out + ASSERT_EQ(test::KeyStr("a", 29, kTypeValue), + merge_output_iter.key().ToString()); + ASSERT_EQ(test::EncodeInt(6U), merge_output_iter.value().ToString()); + merge_output_iter.Next(); + ASSERT_FALSE(merge_output_iter.Valid()); + + // make sure that we're passing user keys into the filter + ASSERT_EQ("a", filter_->last_merge_operand_key()); +} + +// Make sure that merge operands are not filtered out if there's a snapshot +// pointing at them +TEST_F(MergeHelperTest, DontFilterMergeOperandsBeforeSnapshotTest) { + merge_op_ = MergeOperators::CreateUInt64AddOperator(); + filter_.reset(new test::FilterNumber(5U)); + + AddKeyVal("a", 31, kTypeMerge, test::EncodeInt(5U)); + AddKeyVal("a", 30, kTypeMerge, test::EncodeInt(5U)); + AddKeyVal("a", 29, kTypeMerge, test::EncodeInt(2U)); + AddKeyVal("a", 28, kTypeMerge, test::EncodeInt(1U)); + AddKeyVal("a", 27, kTypeMerge, test::EncodeInt(3U)); + AddKeyVal("a", 26, kTypeMerge, test::EncodeInt(5U)); + AddKeyVal("a", 25, kTypeMerge, test::EncodeInt(5U)); + AddKeyVal("b", 24, kTypeValue, test::EncodeInt(5U)); + + ASSERT_OK(Run(15, true, 32)); + ASSERT_TRUE(iter_->Valid()); + MergeOutputIterator merge_output_iter(merge_helper_.get()); + merge_output_iter.SeekToFirst(); + ASSERT_EQ(test::KeyStr("a", 31, kTypeValue), + merge_output_iter.key().ToString()); + ASSERT_EQ(test::EncodeInt(26U), merge_output_iter.value().ToString()); + merge_output_iter.Next(); + ASSERT_FALSE(merge_output_iter.Valid()); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/examples/.gitignore b/examples/.gitignore index 1c63561a6..8c06e7972 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -1,6 +1,7 @@ -column_families_example -simple_example c_simple_example +column_families_example compact_files_example -transaction_example +compaction_filter_example optimistic_transaction_example +simple_example +transaction_example diff --git a/examples/Makefile b/examples/Makefile index 0882f83e6..fe82d11cd 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -10,6 +10,9 @@ simple_example: librocksdb simple_example.cc column_families_example: librocksdb column_families_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) +compaction_filter_example: librocksdb compaction_filter_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + compact_files_example: librocksdb compact_files_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) @@ -26,7 +29,7 @@ transaction_example: librocksdb transaction_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) clean: - rm -rf ./simple_example ./column_families_example ./compact_files_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example + rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example librocksdb: cd .. && $(MAKE) librocksdb.a diff --git a/examples/compaction_filter_example.cc b/examples/compaction_filter_example.cc new file mode 100644 index 000000000..050f4611a --- /dev/null +++ b/examples/compaction_filter_example.cc @@ -0,0 +1,84 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include +#include +#include + +class MyMerge : public rocksdb::MergeOperator { + public: + bool FullMerge(const rocksdb::Slice& key, + const rocksdb::Slice* existing_value, + const std::deque& operand_list, + std::string* new_value, + rocksdb::Logger* logger) const override { + new_value->clear(); + if (existing_value != nullptr) { + new_value->assign(existing_value->data(), existing_value->size()); + } + for (const std::string& m : operand_list) { + fprintf(stderr, "Merge(%s)\n", m.c_str()); + assert(m != "bad"); // the compaction filter filters out bad values + new_value->assign(m); + } + return true; + } + + const char* Name() const override { return "MyMerge"; } +}; + +class MyFilter : public rocksdb::CompactionFilter { + public: + bool Filter(int level, const rocksdb::Slice& key, + const rocksdb::Slice& existing_value, std::string* new_value, + bool* value_changed) const override { + fprintf(stderr, "Filter(%s)\n", key.ToString().c_str()); + ++count_; + assert(*value_changed == false); + return false; + } + + bool FilterMergeOperand(int level, const rocksdb::Slice& key, + const rocksdb::Slice& existing_value) const override { + fprintf(stderr, "FilterMerge(%s)\n", key.ToString().c_str()); + ++merge_count_; + return existing_value == "bad"; + } + + const char* Name() const override { return "MyFilter"; } + + mutable int count_ = 0; + mutable int merge_count_ = 0; +}; + +int main() { + rocksdb::DB* raw_db; + rocksdb::Status status; + + MyFilter filter; + + system("rm -rf /tmp/rocksmergetest"); + rocksdb::Options options; + options.create_if_missing = true; + options.merge_operator.reset(new MyMerge); + options.compaction_filter = &filter; + status = rocksdb::DB::Open(options, "/tmp/rocksmergetest", &raw_db); + assert(status.ok()); + std::unique_ptr db(raw_db); + + rocksdb::WriteOptions wopts; + db->Merge(wopts, "0", "bad"); // This is filtered out + db->Merge(wopts, "1", "data1"); + db->Merge(wopts, "1", "bad"); + db->Merge(wopts, "1", "data2"); + db->Merge(wopts, "1", "bad"); + db->Merge(wopts, "3", "data3"); + db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr); + fprintf(stderr, "filter.count_ = %d\n", filter.count_); + assert(filter.count_ == 1); + fprintf(stderr, "filter.merge_count_ = %d\n", filter.merge_count_); + assert(filter.merge_count_ == 5); +} diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index dce69d2d7..da809f544 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -51,10 +51,24 @@ class CompactionFilter { // output of the compaction. The application can inspect // the existing value of the key and make decision based on it. // + // Key-Values that are results of merge operation during compaction are not + // passed into this function. Currently, when you have a mix of Put()s and + // Merge()s on a same key, we only guarantee to process the merge operands + // through the compaction filters. Put()s might be processed, or might not. + // // When the value is to be preserved, the application has the option // to modify the existing_value and pass it back through new_value. // value_changed needs to be set to true in this case. // + // If you use snapshot feature of RocksDB (i.e. call GetSnapshot() API on a + // DB* object), CompactionFilter might not be very useful for you. Due to + // guarantees we need to maintain, compaction process will not call Filter() + // on any keys that were written before the latest snapshot. In other words, + // compaction will only call Filter() on keys written after your most recent + // call to GetSnapshot(). In most cases, Filter() will not be called very + // often. This is something we're fixing. See the discussion at: + // https://www.facebook.com/groups/mysqlonrocksdb/permalink/999723240091865/ + // // If multithreaded compaction is being used *and* a single CompactionFilter // instance was supplied via Options::compaction_filter, this method may be // called from different threads concurrently. The application must ensure @@ -70,6 +84,14 @@ class CompactionFilter { std::string* new_value, bool* value_changed) const = 0; + // The compaction process invokes this method on every merge operand. If this + // method returns true, the merge operand will be ignored and not written out + // in the compaction output + virtual bool FilterMergeOperand(int level, const Slice& key, + const Slice& operand) const { + return false; + } + // Returns a name that identifies this compaction filter. // The name will be printed to LOG file on start up for diagnosis. virtual const char* Name() const = 0; diff --git a/table/get_context.cc b/table/get_context.cc index bfa53d7a4..609ca3083 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -92,7 +92,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, user_key_, &value, merge_context_->GetOperands(), value_, logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, - env_ != nullptr ? timer.ElapsedNanos() : 0); + timer.ElapsedNanosSafe()); } if (!merge_success) { RecordTick(statistics_, NUMBER_MERGE_FAILURES); @@ -118,7 +118,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, user_key_, nullptr, merge_context_->GetOperands(), value_, logger_); RecordTick(statistics_, MERGE_OPERATION_TOTAL_TIME, - env_ != nullptr ? timer.ElapsedNanos() : 0); + timer.ElapsedNanosSafe()); } if (!merge_success) { RecordTick(statistics_, NUMBER_MERGE_FAILURES); diff --git a/util/stop_watch.h b/util/stop_watch.h index 3637533bb..86cb2653c 100644 --- a/util/stop_watch.h +++ b/util/stop_watch.h @@ -67,6 +67,10 @@ class StopWatchNano { return elapsed; } + uint64_t ElapsedNanosSafe(bool reset = false) { + return (env_ != nullptr) ? ElapsedNanos(reset) : 0U; + } + private: Env* const env_; uint64_t start_; diff --git a/util/testutil.h b/util/testutil.h index 1729b3ee0..29806285e 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -13,6 +13,7 @@ #include #include "db/dbformat.h" +#include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/slice.h" @@ -337,5 +338,42 @@ class SleepingBackgroundTask { bool sleeping_; }; +// Filters merge operands and values that are equal to `num`. +class FilterNumber : public CompactionFilter { + public: + explicit FilterNumber(uint64_t num) : num_(num) {} + + std::string last_merge_operand_key() { return last_merge_operand_key_; } + + bool Filter(int level, const rocksdb::Slice& key, const rocksdb::Slice& value, + std::string* new_value, bool* value_changed) const override { + if (value.size() == sizeof(uint64_t)) { + return num_ == DecodeFixed64(value.data()); + } + return true; + } + + bool FilterMergeOperand(int level, const rocksdb::Slice& key, + const rocksdb::Slice& value) const override { + last_merge_operand_key_ = key.ToString(); + if (value.size() == sizeof(uint64_t)) { + return num_ == DecodeFixed64(value.data()); + } + return true; + } + + const char* Name() const override { return "FilterBadMergeOperand"; } + + private: + mutable std::string last_merge_operand_key_; + uint64_t num_; +}; + +inline std::string EncodeInt(uint64_t x) { + std::string result; + PutFixed64(&result, x); + return result; +} + } // namespace test } // namespace rocksdb