From 6fbe96baf8a4539d800e7c124ca85443a0ca7876 Mon Sep 17 00:00:00 2001
From: Andrew Kryczka
Date: Tue, 18 Oct 2016 12:04:56 -0700
Subject: [PATCH] Compaction Support for Range Deletion

Summary:
This diff introduces RangeDelAggregator, which takes ownership of iterators
provided to it via AddTombstones(). The tombstones are organized in a
two-level map (snapshot stripe -> begin key -> tombstone). Tombstone creation
avoids copying data by holding Slices returned by the iterator, which remain
valid thanks to pinning.

For compaction, we create a hierarchical range tombstone iterator whose
structure matches that of the iterator over compaction input data. An
aggregator based on that iterator is used by CompactionIterator to determine
which keys are covered by range tombstones. In the case of merge operands, the
same aggregator is used by MergeHelper. Upon finishing each file in the
compaction, relevant range tombstones are added to the output file's range
tombstone meta-block and the file's boundaries are updated accordingly.

To check whether a key is covered by a range tombstone,
RangeDelAggregator::ShouldDelete() considers tombstones in the key's snapshot
stripe. When this function is used outside of compaction, it also checks newer
stripes, which can contain covering tombstones. Currently the intra-stripe
check involves a linear scan; in the future we plan to collapse ranges within
a stripe such that binary search can be used.

RangeDelAggregator::AddToBuilder() adds all range tombstones in the table's
key-range to a new table's range tombstone meta-block. Since range tombstones
may fall in the gap between files, we may need to extend some files'
key-ranges. The strategy is (1) the first file extends as far left as possible
and other files do not extend left, (2) all files extend right until either
the start of the next file or the end of the last range tombstone in the gap,
whichever comes first.

One other notable change is adding release/move semantics to
ScopedArenaIterator such that it can be used to transfer ownership of an
arena-allocated iterator, similar to how unique_ptr is used for malloc'd data.
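To make the stripe organization concrete, here is a minimal sketch of the
lookup that ShouldDelete() performs. This is illustrative only, not the code
in this diff: the types are simplified stand-ins (plain strings instead of
Slices and InternalKeys, a default-ordered map instead of one built on the
user comparator), and ShouldDeleteSketch is a made-up name.

#include <cstdint>
#include <map>
#include <string>

using SequenceNumber = uint64_t;

struct Tombstone {
  std::string start_key;
  std::string end_key;
  SequenceNumber seq;
};

// Inner level: tombstone begin key -> tombstone.
using TombstoneMap = std::map<std::string, Tombstone>;
// Outer level: upper seqnum bound of each snapshot stripe -> its tombstones.
// A kMaxSequenceNumber entry would catch data newer than any snapshot.
using StripeMap = std::map<SequenceNumber, TombstoneMap>;

// Returns true if (user_key, seq) is covered by a tombstone in its stripe.
bool ShouldDeleteSketch(const StripeMap& stripes, const std::string& user_key,
                        SequenceNumber seq) {
  // A stripe includes the seqnum of the snapshot above it and excludes the
  // seqnum of the snapshot below it, hence the upper_bound on seq - 1.
  auto stripe = seq > 0 ? stripes.upper_bound(seq - 1) : stripes.begin();
  if (stripe == stripes.end()) {
    return false;
  }
  // The intra-stripe check is a linear scan for now, as noted above.
  for (const auto& kv : stripe->second) {
    const Tombstone& t = kv.second;
    if (user_key < t.start_key) {
      break;  // later tombstones in the map start even further right
    }
    if (seq < t.seq && user_key <= t.end_key) {
      return true;  // key is older than the tombstone and inside its range
    }
  }
  return false;
}

During compaction only this one stripe is consulted; outside of compaction the
loop would continue into the newer stripes (the higher map keys) as described
above.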
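Similarly, a toy model of the key-range extension strategy, under the same
caveats: FileRange and ExtendForTombstone are invented for this illustration,
keys are plain strings, and the real code works with InternalKeys and
FileMetaData (including a fake kMaxSequenceNumber seqnum when two files come
to share a boundary user key).

#include <algorithm>
#include <string>
#include <vector>

struct FileRange {
  std::string smallest;
  std::string largest;
};

// Extend the key-ranges of ordered, non-overlapping output files to cover the
// tombstone [ts_start, ts_end]: (1) only the first file may extend left;
// (2) each file extends right no further than the next file's start.
void ExtendForTombstone(std::vector<FileRange>& files,
                        const std::string& ts_start,
                        const std::string& ts_end) {
  for (size_t i = 0; i < files.size(); ++i) {
    if (i == 0) {
      files[i].smallest = std::min(files[i].smallest, ts_start);
    }
    std::string right = std::max(files[i].largest, ts_end);
    if (i + 1 < files.size()) {
      right = std::min(right, files[i + 1].smallest);
    }
    files[i].largest = right;
  }
}

For example, with files [a, c] and [f, h] and a tombstone covering [b, z],
the first file becomes [a, f] and the second [f, z].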
Depends on D61473 Test Plan: compaction_iterator_test, mock_table, end-to-end tests in D63927 Reviewers: sdong, IslamAbdelRahman, wanning, yhchiang, lightmark Reviewed By: lightmark Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D62205 --- CMakeLists.txt | 1 + db/builder.cc | 10 +- db/compaction_iterator.cc | 22 +++- db/compaction_iterator.h | 3 + db/compaction_iterator_test.cc | 77 ++++++++++-- db/compaction_job.cc | 45 +++++-- db/compaction_job.h | 5 +- db/db_compaction_test.cc | 3 +- db/dbformat.h | 21 ++-- db/merge_helper.cc | 8 +- db/merge_helper.h | 3 + db/merge_helper_test.cc | 3 +- db/range_del_aggregator.cc | 195 ++++++++++++++++++++++++++++++ db/range_del_aggregator.h | 53 ++++++++ db/table_cache.cc | 64 ++++++---- db/table_cache.h | 9 +- db/version_set.cc | 32 +++-- db/version_set.h | 3 +- src.mk | 1 + table/block_based_table_builder.h | 7 +- table/plain_table_builder.cc | 5 +- table/scoped_arena_iterator.h | 8 +- 22 files changed, 494 insertions(+), 84 deletions(-) create mode 100644 db/range_del_aggregator.cc create mode 100644 db/range_del_aggregator.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 7949d39d1..ea19aba25 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -272,6 +272,7 @@ set(SOURCES db/memtable_list.cc db/merge_helper.cc db/merge_operator.cc + db/range_del_aggregator.cc db/repair.cc db/snapshot_impl.cc db/table_cache.cc diff --git a/db/builder.cc b/db/builder.cc index 29eda01e5..0cda4e536 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -112,16 +112,18 @@ Status BuildTable( compression_opts, level); } + std::unique_ptr range_del_agg; + range_del_agg.reset(new RangeDelAggregator(internal_comparator, snapshots)); MergeHelper merge(env, internal_comparator.user_comparator(), ioptions.merge_operator, nullptr, ioptions.info_log, mutable_cf_options.min_partial_merge_operands, true /* internal key corruption is not ok */, snapshots.empty() ? 
0 : snapshots.back()); - CompactionIterator c_iter(iter, internal_comparator.user_comparator(), - &merge, kMaxSequenceNumber, &snapshots, - earliest_write_conflict_snapshot, env, - true /* internal key corruption is not ok */); + CompactionIterator c_iter( + iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, + &snapshots, earliest_write_conflict_snapshot, env, + true /* internal key corruption is not ok */, range_del_agg.get()); c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { const Slice& key = c_iter.key(); diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 0333bf3a2..f6725cbe3 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -14,8 +14,9 @@ CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, SequenceNumber earliest_write_conflict_snapshot, Env* env, - bool expect_valid_internal_key, const Compaction* compaction, - const CompactionFilter* compaction_filter, LogBuffer* log_buffer) + bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, + const Compaction* compaction, const CompactionFilter* compaction_filter, + LogBuffer* log_buffer) : input_(input), cmp_(cmp), merge_helper_(merge_helper), @@ -23,6 +24,7 @@ CompactionIterator::CompactionIterator( earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), env_(env), expect_valid_internal_key_(expect_valid_internal_key), + range_del_agg_(range_del_agg), compaction_(compaction), compaction_filter_(compaction_filter), log_buffer_(log_buffer), @@ -153,6 +155,7 @@ void CompactionIterator::NextFromInput() { if (!has_current_user_key_ || !cmp_->Equal(ikey_.user_key, current_user_key_)) { // First occurrence of this user key + // Copy key for output key_ = current_key_.SetKey(key_, &ikey_); current_user_key_ = ikey_.user_key; has_current_user_key_ = true; @@ -321,7 +324,7 @@ void CompactionIterator::NextFromInput() { } } else { // We are at the end of the input, could not parse the next key, or hit - // the next key. The iterator returns the single delete if the key + // a different key. The iterator returns the single delete if the key // possibly exists beyond the current output level. We set // has_current_user_key to false so that if the iterator is at the next // key, we do not compare it again against the previous key at the next @@ -390,7 +393,8 @@ void CompactionIterator::NextFromInput() { // have hit (A) // We encapsulate the merge related state machine in a different // object to minimize change to the existing flow. - merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_); + merge_helper_->MergeUntil(input_, range_del_agg_, prev_snapshot, + bottommost_level_); merge_out_iter_.SeekToFirst(); if (merge_out_iter_.Valid()) { @@ -416,7 +420,15 @@ void CompactionIterator::NextFromInput() { pinned_iters_mgr_.ReleasePinnedData(); } } else { - valid_ = true; + // 1. new user key -OR- + // 2. 
different snapshot stripe
+      bool should_delete =
+          range_del_agg_->ShouldDelete(key_, true /* for_compaction */);
+      if (should_delete) {
+        input_->Next();
+      } else {
+        valid_ = true;
+      }
     }
   }
 }
diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h
index 04a24fdfe..6ede0d66d 100644
--- a/db/compaction_iterator.h
+++ b/db/compaction_iterator.h
@@ -14,6 +14,7 @@
 #include "db/compaction.h"
 #include "db/merge_helper.h"
 #include "db/pinned_iterators_manager.h"
+#include "db/range_del_aggregator.h"
 #include "rocksdb/compaction_filter.h"
 #include "util/log_buffer.h"
@@ -47,6 +48,7 @@ class CompactionIterator {
                      std::vector<SequenceNumber>* snapshots,
                      SequenceNumber earliest_write_conflict_snapshot, Env* env,
                      bool expect_valid_internal_key,
+                     RangeDelAggregator* range_del_agg,
                      const Compaction* compaction = nullptr,
                      const CompactionFilter* compaction_filter = nullptr,
                      LogBuffer* log_buffer = nullptr);
@@ -99,6 +101,7 @@ class CompactionIterator {
   const SequenceNumber earliest_write_conflict_snapshot_;
   Env* env_;
   bool expect_valid_internal_key_;
+  RangeDelAggregator* range_del_agg_;
   const Compaction* compaction_;
   const CompactionFilter* compaction_filter_;
   LogBuffer* log_buffer_;
diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc
index 4cbccca55..61b8ba970 100644
--- a/db/compaction_iterator_test.cc
+++ b/db/compaction_iterator_test.cc
@@ -4,6 +4,10 @@
 // of patent rights can be found in the PATENTS file in the same directory.
 
 #include "db/compaction_iterator.h"
+
+#include <string>
+#include <vector>
+
 #include "util/testharness.h"
 #include "util/testutil.h"
 
@@ -11,33 +15,45 @@
 namespace rocksdb {
 
 class CompactionIteratorTest : public testing::Test {
  public:
-  CompactionIteratorTest() : cmp_(BytewiseComparator()), snapshots_({}) {}
+  CompactionIteratorTest()
+      : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
+
+  void InitIterators(const std::vector<std::string>& ks,
+                     const std::vector<std::string>& vs,
+                     const std::vector<std::string>& range_del_ks,
+                     const std::vector<std::string>& range_del_vs,
+                     SequenceNumber last_sequence) {
+    std::unique_ptr<InternalIterator> range_del_iter(
+        new test::VectorIterator(range_del_ks, range_del_vs));
+    range_del_agg_.reset(new RangeDelAggregator(icmp_, snapshots_));
+    range_del_agg_->AddTombstones(std::move(range_del_iter));
 
-  void InitIterator(const std::vector<std::string>& ks,
-                    const std::vector<std::string>& vs,
-                    SequenceNumber last_sequence) {
     merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, nullptr, nullptr,
                                         nullptr, 0U, false, 0));
     iter_.reset(new test::VectorIterator(ks, vs));
     iter_->SeekToFirst();
     c_iter_.reset(new CompactionIterator(
         iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
-        kMaxSequenceNumber, Env::Default(), false));
+        kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get()));
   }
 
+  void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }
+
   const Comparator* cmp_;
+  const InternalKeyComparator icmp_;
   std::vector<SequenceNumber> snapshots_;
   std::unique_ptr<MergeHelper> merge_helper_;
   std::unique_ptr<test::VectorIterator> iter_;
   std::unique_ptr<CompactionIterator> c_iter_;
+  std::unique_ptr<RangeDelAggregator> range_del_agg_;
 };
 
 // It is possible that the output of the compaction iterator is empty even if
 // the input is not.
TEST_F(CompactionIteratorTest, EmptyResult) {
-  InitIterator({test::KeyStr("a", 5, kTypeSingleDeletion),
-                test::KeyStr("a", 3, kTypeValue)},
-               {"", "val"}, 5);
+  InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
+                 test::KeyStr("a", 3, kTypeValue)},
+                {"", "val"}, {}, {}, 5);
   c_iter_->SeekToFirst();
   ASSERT_FALSE(c_iter_->Valid());
 }
@@ -45,10 +61,10 @@ TEST_F(CompactionIteratorTest, EmptyResult) {
 // If there is a corruption after a single deletion, the corrupted key should
 // be preserved.
 TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
-  InitIterator({test::KeyStr("a", 5, kTypeSingleDeletion),
-                test::KeyStr("a", 3, kTypeValue, true),
-                test::KeyStr("b", 10, kTypeValue)},
-               {"", "val", "val2"}, 10);
+  InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
+                 test::KeyStr("a", 3, kTypeValue, true),
+                 test::KeyStr("b", 10, kTypeValue)},
+                {"", "val", "val2"}, {}, {}, 10);
   c_iter_->SeekToFirst();
   ASSERT_TRUE(c_iter_->Valid());
   ASSERT_EQ(test::KeyStr("a", 5, kTypeSingleDeletion),
@@ -63,6 +79,43 @@ TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
   ASSERT_FALSE(c_iter_->Valid());
 }
 
+TEST_F(CompactionIteratorTest, SimpleRangeDeletion) {
+  InitIterators({test::KeyStr("morning", 5, kTypeValue),
+                 test::KeyStr("morning", 2, kTypeValue),
+                 test::KeyStr("night", 3, kTypeValue)},
+                {"zao", "zao", "wan"},
+                {test::KeyStr("ma", 4, kTypeRangeDeletion)}, {"mz"}, 5);
+  c_iter_->SeekToFirst();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue), c_iter_->key().ToString());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("night", 3, kTypeValue), c_iter_->key().ToString());
+  c_iter_->Next();
+  ASSERT_FALSE(c_iter_->Valid());
+}
+
+TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) {
+  AddSnapshot(10);
+  std::vector<std::string> ks1;
+  ks1.push_back(test::KeyStr("ma", 28, kTypeRangeDeletion));
+  std::vector<std::string> vs1{"mz"};
+  std::vector<std::string> ks2{test::KeyStr("morning", 15, kTypeValue),
+                               test::KeyStr("morning", 5, kTypeValue),
+                               test::KeyStr("night", 40, kTypeValue),
+                               test::KeyStr("night", 20, kTypeValue)};
+  std::vector<std::string> vs2{"zao 15", "zao 5", "wan 40", "wan 20"};
+  InitIterators(ks2, vs2, ks1, vs1, 40);
+  c_iter_->SeekToFirst();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue), c_iter_->key().ToString());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("night", 40, kTypeValue), c_iter_->key().ToString());
+  c_iter_->Next();
+  ASSERT_FALSE(c_iter_->Valid());
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 3931becb9..22f376f90 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -658,8 +658,11 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
 
 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   assert(sub_compact != nullptr);
-  std::unique_ptr<InternalIterator> input(
-      versions_->MakeInputIterator(sub_compact->compaction));
+  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
+  std::unique_ptr<RangeDelAggregator> range_del_agg(
+      new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_));
+  std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
+      sub_compact->compaction, range_del_agg.get()));
 
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
@@ -680,7 +683,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     prev_prepare_write_nanos =
IOSTATS(prepare_write_nanos); } - ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); const MutableCFOptions* mutable_cf_options = sub_compact->compaction->mutable_cf_options(); @@ -737,7 +739,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { sub_compact->c_iter.reset(new CompactionIterator( input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), &existing_snapshots_, earliest_write_conflict_snapshot_, env_, false, - sub_compact->compaction, compaction_filter)); + range_del_agg.get(), sub_compact->compaction, compaction_filter)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); const auto& c_iter_stats = c_iter->iter_stats(); @@ -766,7 +768,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { sub_compact->ShouldStopBefore( key, sub_compact->current_output_file_size) && sub_compact->builder != nullptr) { - status = FinishCompactionOutputFile(input->status(), sub_compact); + status = FinishCompactionOutputFile(input->status(), sub_compact, + range_del_agg.get()); if (!status.ok()) { break; } @@ -843,6 +846,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } } + Status input_status = input->status(); + c_iter->Next(); + // Close output file if it is big enough // TODO(aekmekji): determine if file should be closed earlier than this // during subcompactions (i.e. if output size, estimated by input size, is @@ -850,14 +856,18 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // and 0.6MB instead of 1MB and 0.2MB) if (sub_compact->current_output_file_size >= sub_compact->compaction->max_output_file_size()) { - status = FinishCompactionOutputFile(input->status(), sub_compact); + const Slice* next_key = nullptr; + if (c_iter->Valid()) { + next_key = &c_iter->key(); + } + status = FinishCompactionOutputFile(input_status, sub_compact, + range_del_agg.get(), next_key); if (sub_compact->outputs.size() == 1) { // Use dictionary from first output file for compression of subsequent // files. 
sub_compact->compression_dict = std::move(compression_dict); } } - c_iter->Next(); } sub_compact->num_input_records = c_iter_stats.num_input_records; @@ -884,8 +894,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { status = Status::ShutdownInProgress( "Database shutdown or Column family drop during compaction"); } + if (status.ok() && sub_compact->builder == nullptr && + range_del_agg->ShouldAddTombstones(bottommost_level_)) { + status = OpenCompactionOutputFile(sub_compact); + } if (status.ok() && sub_compact->builder != nullptr) { - status = FinishCompactionOutputFile(input->status(), sub_compact); + status = FinishCompactionOutputFile(input->status(), sub_compact, + range_del_agg.get()); } if (status.ok()) { status = input->status(); @@ -936,7 +951,9 @@ void CompactionJob::RecordDroppedKeys( } Status CompactionJob::FinishCompactionOutputFile( - const Status& input_status, SubcompactionState* sub_compact) { + const Status& input_status, SubcompactionState* sub_compact, + RangeDelAggregator* range_del_agg, + const Slice* next_table_min_key /* = nullptr */) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_SYNC_FILE); assert(sub_compact != nullptr); @@ -951,6 +968,16 @@ Status CompactionJob::FinishCompactionOutputFile( // Check for iterator errors Status s = input_status; auto meta = &sub_compact->current_output()->meta; + if (s.ok()) { + // For the first output table, include range tombstones before the min key + // boundary. For subsequent output tables, this is unnecessary because we + // extend each file's max key boundary up until the next file's min key when + // range tombstones fall in the gap. + range_del_agg->AddToBuilder( + sub_compact->builder.get(), + sub_compact->outputs.size() == 1 /* extend_before_min_key */, + next_table_min_key, meta, bottommost_level_); + } const uint64_t current_entries = sub_compact->builder->NumEntries(); meta->marked_for_compaction = sub_compact->builder->NeedCompact(); if (s.ok()) { diff --git a/db/compaction_job.h b/db/compaction_job.h index a8ace56c6..12a57d71b 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -25,6 +25,7 @@ #include "db/job_context.h" #include "db/log_writer.h" #include "db/memtable_list.h" +#include "db/range_del_aggregator.h" #include "db/version_edit.h" #include "db/write_controller.h" #include "db/write_thread.h" @@ -96,7 +97,9 @@ class CompactionJob { void ProcessKeyValueCompaction(SubcompactionState* sub_compact); Status FinishCompactionOutputFile(const Status& input_status, - SubcompactionState* sub_compact); + SubcompactionState* sub_compact, + RangeDelAggregator* range_del_agg = nullptr, + const Slice* next_table_min_key = nullptr); Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options); void RecordCompactionIOStats(); Status OpenCompactionOutputFile(SubcompactionState* sub_compact); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 6f7d78c06..518c63c50 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -305,7 +305,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { dbfull()->TEST_WaitForCompact(); // Preloading iterator issues one table cache lookup and creates // a new table reader. One file is created for flush and one for compaction. - // Compaction inputs make no table cache look-up. + // Compaction inputs make no table cache look-up for data iterators or + // range tombstone iterators since they're already cached. 
ASSERT_EQ(num_table_cache_lookup, 2); // Create new iterator for: // (1) 1 for verifying flush results diff --git a/db/dbformat.h b/db/dbformat.h index 29d17b0ec..a59b15992 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -507,23 +507,30 @@ struct RangeTombstone { explicit RangeTombstone(Slice internal_key, Slice value) { ParsedInternalKey parsed_key; - if (ParseInternalKey(internal_key, &parsed_key)) { - start_key_ = parsed_key.user_key; - seq_ = parsed_key.sequence; - end_key_ = value; + if (!ParseInternalKey(internal_key, &parsed_key)) { + assert(false); } + start_key_ = parsed_key.user_key; + seq_ = parsed_key.sequence; + end_key_ = value; } - // be careful to use Serialize(); InternalKey() allocates new memory - std::pair Serialize() { + // be careful to use Serialize(), allocates new memory + std::pair Serialize() const { auto key = InternalKey(start_key_, seq_, kTypeRangeDeletion); Slice value = end_key_; return std::make_pair(std::move(key), std::move(value)); } - InternalKey SerializeKey() { + // be careful to use SerializeKey(), allocates new memory + InternalKey SerializeKey() const { return InternalKey(start_key_, seq_, kTypeRangeDeletion); } + + // be careful to use SerializeEndKey(), allocates new memory + InternalKey SerializeEndKey() const { + return InternalKey(end_key_, seq_, kTypeRangeDeletion); + } }; } // namespace rocksdb diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 9eefd402c..d78c9f237 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -75,6 +75,7 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, // operands_ stores the list of merge operands encountered while merging. // keys_[i] corresponds to operands_[i] for each i. Status MergeHelper::MergeUntil(InternalIterator* iter, + RangeDelAggregator* range_del_agg, const SequenceNumber stop_before, const bool at_bottom) { // Get a copy of the internal key, before it's invalidated by iter->Next() @@ -171,6 +172,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, } else { // hit a merge // => if there is a compaction filter, apply it. + // => check for range tombstones covering the operand // => merge the operand into the front of the operands_ list // if not filtered // => then continue because we haven't yet seen a Put/Delete. @@ -183,8 +185,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // 1) it's included in one of the snapshots. in that case we *must* write // it out, no matter what compaction filter says // 2) it's not filtered by a compaction filter - if (ikey.sequence <= latest_snapshot_ || - !FilterMerge(orig_ikey.user_key, value_slice)) { + if ((ikey.sequence <= latest_snapshot_ || + !FilterMerge(orig_ikey.user_key, value_slice)) && + (range_del_agg == nullptr || + !range_del_agg->ShouldDelete(iter->key()))) { if (original_key_is_iter) { // this is just an optimization that saves us one memcpy keys_.push_front(std::move(original_key)); diff --git a/db/merge_helper.h b/db/merge_helper.h index 7cd992f34..de3e3c949 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -12,6 +12,7 @@ #include "db/dbformat.h" #include "db/merge_context.h" +#include "db/range_del_aggregator.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" #include "rocksdb/slice.h" @@ -71,6 +72,7 @@ class MergeHelper { // or - the end of iteration // iter: (IN) points to the first merge type entry // (OUT) points to the first entry not included in the merge process + // range_del_agg: (IN) filters merge operands covered by range tombstones. 
// stop_before: (IN) a sequence number that merge should not cross.
 //                  0 means no restriction
 // at_bottom:  (IN) true if the iterator covers the bottom level, which means
@@ -85,6 +87,7 @@
 //
 // REQUIRED: The first key in the input is not corrupted.
 Status MergeUntil(InternalIterator* iter,
+                  RangeDelAggregator* range_del_agg = nullptr,
                   const SequenceNumber stop_before = 0,
                   const bool at_bottom = false);
diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc
index b21f56078..e468f7ed2 100644
--- a/db/merge_helper_test.cc
+++ b/db/merge_helper_test.cc
@@ -29,7 +29,8 @@ class MergeHelperTest : public testing::Test {
     merge_helper_.reset(new MergeHelper(env_, BytewiseComparator(),
                                         merge_op_.get(), filter_.get(), nullptr,
                                         2U, false, latest_snapshot));
-    return merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom);
+    return merge_helper_->MergeUntil(iter_.get(), nullptr /* range_del_agg */,
+                                     stop_before, at_bottom);
   }
 
   void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,
diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc
new file mode 100644
index 000000000..1515a63f6
--- /dev/null
+++ b/db/range_del_aggregator.cc
@@ -0,0 +1,195 @@
+// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include "db/range_del_aggregator.h"
+
+#include <algorithm>
+
+namespace rocksdb {
+
+RangeDelAggregator::RangeDelAggregator(
+    const InternalKeyComparator& icmp,
+    const std::vector<SequenceNumber>& snapshots)
+    : icmp_(icmp) {
+  pinned_iters_mgr_.StartPinning();
+  for (auto snapshot : snapshots) {
+    stripe_map_.emplace(
+        snapshot,
+        TombstoneMap(stl_wrappers::LessOfComparator(icmp_.user_comparator())));
+  }
+  // Data newer than any snapshot falls in this catch-all stripe
+  stripe_map_.emplace(kMaxSequenceNumber, TombstoneMap());
+}
+
+bool RangeDelAggregator::ShouldDelete(const Slice& internal_key,
+                                      bool for_compaction /* = false */) {
+  ParsedInternalKey parsed;
+  if (!ParseInternalKey(internal_key, &parsed)) {
+    assert(false);
+  }
+  assert(IsValueType(parsed.type));
+
+  // Starting point is the snapshot stripe in which the key lives, then need to
+  // search all earlier stripes too, unless it's for compaction.
+  for (auto stripe_map_iter = GetStripeMapIter(parsed.sequence);
+       stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) {
+    const auto& tombstone_map = stripe_map_iter->second;
+    for (const auto& start_key_and_tombstone : tombstone_map) {
+      const auto& tombstone = start_key_and_tombstone.second;
+      if (icmp_.user_comparator()->Compare(parsed.user_key,
+                                           tombstone.start_key_) < 0) {
+        break;
+      }
+      if (parsed.sequence < tombstone.seq_ &&
+          icmp_.user_comparator()->Compare(parsed.user_key,
+                                           tombstone.end_key_) <= 0) {
+        return true;
+      }
+    }
+    if (for_compaction) {
+      break;
+    }
+  }
+  return false;
+}
+
+bool RangeDelAggregator::ShouldAddTombstones(
+    bool bottommost_level /* = false */) {
+  auto stripe_map_iter = stripe_map_.begin();
+  assert(stripe_map_iter != stripe_map_.end());
+  if (bottommost_level) {
+    // For the bottommost level, keys covered by tombstones in the first
+    // (oldest) stripe have been compacted away, so the tombstones are obsolete.
+    ++stripe_map_iter;
+  }
+  while (stripe_map_iter != stripe_map_.end()) {
+    if (!stripe_map_iter->second.empty()) {
+      return true;
+    }
+    ++stripe_map_iter;
+  }
+  return false;
+}
+
+void RangeDelAggregator::AddTombstones(ScopedArenaIterator input) {
+  AddTombstones(input.release(), true /* arena */);
+}
+
+void RangeDelAggregator::AddTombstones(
+    std::unique_ptr<InternalIterator> input) {
+  AddTombstones(input.release(), false /* arena */);
+}
+
+void RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) {
+  pinned_iters_mgr_.PinIterator(input, arena);
+  input->SeekToFirst();
+  while (input->Valid()) {
+    RangeTombstone tombstone(input->key(), input->value());
+    auto& tombstone_map = GetStripeMapIter(tombstone.seq_)->second;
+    tombstone_map.emplace(tombstone.start_key_.ToString(),
+                          std::move(tombstone));
+    input->Next();
+  }
+}
+
+RangeDelAggregator::StripeMap::iterator RangeDelAggregator::GetStripeMapIter(
+    SequenceNumber seq) {
+  // The stripe includes seqnum for the snapshot above and excludes seqnum for
+  // the snapshot below.
+  StripeMap::iterator iter;
+  if (seq > 0) {
+    // upper_bound() checks strict inequality so need to subtract one
+    iter = stripe_map_.upper_bound(seq - 1);
+  } else {
+    iter = stripe_map_.begin();
+  }
+  // catch-all stripe justifies this assertion in either of above cases
+  assert(iter != stripe_map_.end());
+  return iter;
+}
+
+// TODO(andrewkr): We should implement an iterator over range tombstones in our
+// map. It'd enable compaction to open tables on-demand, i.e., only once range
+// tombstones are known to be available, without the code duplication we have
+// in ShouldAddTombstones(). It'll also allow us to move the table-modifying
+// code into more coherent places: CompactionJob and BuildTable().
+void RangeDelAggregator::AddToBuilder(TableBuilder* builder,
+                                      bool extend_before_min_key,
+                                      const Slice* next_table_min_key,
+                                      FileMetaData* meta,
+                                      bool bottommost_level /* = false */) {
+  auto stripe_map_iter = stripe_map_.begin();
+  assert(stripe_map_iter != stripe_map_.end());
+  if (bottommost_level) {
+    // For the bottommost level, keys covered by tombstones in the first
+    // (oldest) stripe have been compacted away, so the tombstones are obsolete.
+    ++stripe_map_iter;
+  }
+
+  // Note the order in which tombstones are stored is insignificant since we
+  // insert them into a std::map on the read path.
+  bool first_added = false;
+  while (stripe_map_iter != stripe_map_.end()) {
+    for (const auto& start_key_and_tombstone : stripe_map_iter->second) {
+      const auto& tombstone = start_key_and_tombstone.second;
+      if (next_table_min_key != nullptr &&
+          icmp_.user_comparator()->Compare(*next_table_min_key,
+                                           tombstone.start_key_) < 0) {
+        // Tombstones starting after next_table_min_key only need to be included
+        // in the next table.
+        break;
+      }
+      if (!extend_before_min_key && meta->smallest.size() != 0 &&
+          icmp_.user_comparator()->Compare(tombstone.end_key_,
+                                           meta->smallest.user_key()) < 0) {
+        // Tombstones ending before this table's smallest key can conditionally
+        // be excluded, e.g., when this table is a non-first compaction output,
+        // we know such tombstones are included in the previous table. In that
+        // case extend_before_min_key would be false.
+        continue;
+      }
+
+      auto ikey_and_end_key = tombstone.Serialize();
+      builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
+      if (!first_added) {
+        first_added = true;
+        if (extend_before_min_key &&
+            (meta->smallest.size() == 0 ||
+             icmp_.Compare(ikey_and_end_key.first, meta->smallest) < 0)) {
+          meta->smallest = ikey_and_end_key.first;
+        }
+      }
+      auto end_ikey = tombstone.SerializeEndKey();
+      if (meta->largest.size() == 0 ||
+          icmp_.Compare(meta->largest, end_ikey) < 0) {
+        if (next_table_min_key != nullptr &&
+            icmp_.Compare(*next_table_min_key, end_ikey.Encode()) < 0) {
+          // Pretend the largest key has the same user key as the min key in the
+          // following table in order for files to appear key-space partitioned.
+          // Choose highest seqnum so this file's largest comes before the next
+          // file's smallest. The fake seqnum is OK because the read path's
+          // file-picking code only considers the user key portion.
+          //
+          // Note Seek() also creates InternalKey with (user_key,
+          // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
+          // kTypeRangeDeletion (0xF), so the range tombstone comes before the
+          // Seek() key in InternalKey's ordering. So Seek() will look in the
+          // next file for the user key.
+          ParsedInternalKey parsed;
+          ParseInternalKey(*next_table_min_key, &parsed);
+          meta->largest = InternalKey(parsed.user_key, kMaxSequenceNumber,
+                                      kTypeRangeDeletion);
+        } else {
+          meta->largest = std::move(end_ikey);
+        }
+      }
+      meta->smallest_seqno = std::min(meta->smallest_seqno, tombstone.seq_);
+      meta->largest_seqno = std::max(meta->largest_seqno, tombstone.seq_);
+    }
+    ++stripe_map_iter;
+  }
+}
+
+}  // namespace rocksdb
diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h
new file mode 100644
index 000000000..97a76e484
--- /dev/null
+++ b/db/range_del_aggregator.h
@@ -0,0 +1,53 @@
+// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "db/dbformat.h"
+#include "db/pinned_iterators_manager.h"
+#include "db/version_edit.h"
+#include "include/rocksdb/comparator.h"
+#include "include/rocksdb/types.h"
+#include "table/internal_iterator.h"
+#include "table/scoped_arena_iterator.h"
+#include "table/table_builder.h"
+#include "util/kv_map.h"
+
+namespace rocksdb {
+class RangeDelAggregator {
+ public:
+  RangeDelAggregator(const InternalKeyComparator& icmp,
+                     const std::vector<SequenceNumber>& snapshots);
+
+  bool ShouldDelete(const Slice& internal_key, bool for_compaction = false);
+  bool ShouldAddTombstones(bool bottommost_level = false);
+  void AddTombstones(ScopedArenaIterator input);
+  void AddTombstones(std::unique_ptr<InternalIterator> input);
+  // write tombstones covering a range to a table builder
+  // usually don't add to a max-level table builder
+  void AddToBuilder(TableBuilder* builder, bool extend_before_min_key,
+                    const Slice* next_table_min_key, FileMetaData* meta,
+                    bool bottommost_level = false);
+
+ private:
+  // Maps tombstone start key -> tombstone object
+  typedef std::map<std::string, RangeTombstone, stl_wrappers::LessOfComparator>
+      TombstoneMap;
+  // Maps snapshot seqnum -> map of tombstones that fall in that stripe, i.e.,
+  // their seqnums are greater than the next smaller snapshot's seqnum.
+  typedef std::map<SequenceNumber, TombstoneMap> StripeMap;
+
+  void AddTombstones(InternalIterator* input, bool arena);
+  StripeMap::iterator GetStripeMapIter(SequenceNumber seq);
+
+  PinnedIteratorsManager pinned_iters_mgr_;
+  StripeMap stripe_map_;
+  const InternalKeyComparator icmp_;
+};
+}  // namespace rocksdb
diff --git a/db/table_cache.cc b/db/table_cache.cc
index ff7997566..1ae95d925 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -170,7 +170,10 @@ InternalIterator* TableCache::NewIterator(
     const ReadOptions& options, const EnvOptions& env_options,
     const InternalKeyComparator& icomparator, const FileDescriptor& fd,
     TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
-    bool for_compaction, Arena* arena, bool skip_filters, int level) {
+    bool for_compaction, Arena* arena, bool skip_filters, int level,
+    RangeDelAggregator* range_del_agg /* = nullptr */,
+    bool is_range_del_only /* = false */) {
+  assert(!is_range_del_only || range_del_agg != nullptr);
   PERF_TIMER_GUARD(new_table_iterator_nanos);
 
   if (table_reader_ptr != nullptr) {
@@ -182,14 +185,18 @@ InternalIterator* TableCache::NewIterator(
   size_t readahead = 0;
   bool create_new_table_reader = false;
-  if (for_compaction) {
-    if (ioptions_.new_table_reader_for_compaction_inputs) {
-      readahead = ioptions_.compaction_readahead_size;
-      create_new_table_reader = true;
+  // pointless to create a new table reader for range tombstones only since the
+  // reader isn't reused
+  if (!is_range_del_only) {
+    if (for_compaction) {
+      if (ioptions_.new_table_reader_for_compaction_inputs) {
+        readahead = ioptions_.compaction_readahead_size;
+        create_new_table_reader = true;
+      }
+    } else {
+      readahead = options.readahead_size;
+      create_new_table_reader = readahead > 0;
     }
-  } else {
-    readahead = options.readahead_size;
-    create_new_table_reader = readahead > 0;
   }
 
   if (create_new_table_reader) {
@@ -216,23 +223,36 @@ InternalIterator* TableCache::NewIterator(
     }
   }
 
-  InternalIterator* result =
-      table_reader->NewIterator(options, arena, skip_filters);
-
-  if (create_new_table_reader) {
-    assert(handle == nullptr);
-    result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
-  } else if (handle != nullptr) {
-    result->RegisterCleanup(&UnrefEntry, cache_, handle);
+  if (range_del_agg != nullptr) {
+    std::unique_ptr<InternalIterator> iter(
+        table_reader->NewRangeTombstoneIterator(options));
+    range_del_agg->AddTombstones(std::move(iter));
   }
 
-  if (for_compaction) {
-    table_reader->SetupForCompaction();
-  }
-  if (table_reader_ptr != nullptr) {
-    *table_reader_ptr = table_reader;
-  }
+  InternalIterator* result = nullptr;
+  if (!is_range_del_only) {
+    result = table_reader->NewIterator(options, arena, skip_filters);
+    if (create_new_table_reader) {
+      assert(handle == nullptr);
+      result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
+    } else if (handle != nullptr) {
+      result->RegisterCleanup(&UnrefEntry, cache_, handle);
+    }
+    if (for_compaction) {
+      table_reader->SetupForCompaction();
+    }
+    if (table_reader_ptr != nullptr) {
+      *table_reader_ptr = table_reader;
+    }
+  } else {
+    assert(!create_new_table_reader);
+    // don't need the table reader at all since the iterator over the meta-block
+    // doesn't require it
+    if (handle != nullptr) {
+      UnrefEntry(cache_, handle);
+    }
+  }
 
   return result;
 }
diff --git a/db/table_cache.h b/db/table_cache.h
index 402b94fe5..2bf7f3c77 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -15,6 +15,7 @@
 #include
 
 #include "db/dbformat.h"
+#include "db/range_del_aggregator.h"
 #include "port/port.h"
 #include "rocksdb/cache.h"
#include "rocksdb/env.h" @@ -47,12 +48,18 @@ class TableCache { // returned iterator is live. // @param skip_filters Disables loading/accessing the filter block // @param level The level this table is at, -1 for "not set / don't know" + // @param range_del_agg When non-nullptr, creates a range tombstone iterator + // over this file's meta-block and gives it to this object + // @param is_range_del_only When set, this function only gives a range + // tombstone iterator to range_del_agg and then returns nullptr InternalIterator* NewIterator( const ReadOptions& options, const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr, HistogramImpl* file_read_hist = nullptr, bool for_compaction = false, - Arena* arena = nullptr, bool skip_filters = false, int level = -1); + Arena* arena = nullptr, bool skip_filters = false, int level = -1, + RangeDelAggregator* range_del_agg = nullptr, + bool is_range_del_only = false); // If a seek to internal key "k" in specified file finds an entry, // call (*handle_result)(arg, found_key, found_value) repeatedly until diff --git a/db/version_set.cc b/db/version_set.cc index b58eab415..730c797d5 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -497,7 +497,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState { const EnvOptions& env_options, const InternalKeyComparator& icomparator, HistogramImpl* file_read_hist, bool for_compaction, - bool prefix_enabled, bool skip_filters, int level) + bool prefix_enabled, bool skip_filters, int level, + RangeDelAggregator* range_del_agg) : TwoLevelIteratorState(prefix_enabled), table_cache_(table_cache), read_options_(read_options), @@ -506,7 +507,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState { file_read_hist_(file_read_hist), for_compaction_(for_compaction), skip_filters_(skip_filters), - level_(level) {} + level_(level), + range_del_agg_(range_del_agg) {} InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override { if (meta_handle.size() != sizeof(FileDescriptor)) { @@ -518,7 +520,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState { return table_cache_->NewIterator( read_options_, env_options_, icomparator_, *fd, nullptr /* don't need reference to table*/, file_read_hist_, - for_compaction_, nullptr /* arena */, skip_filters_, level_); + for_compaction_, nullptr /* arena */, skip_filters_, level_, + range_del_agg_, false /* is_range_del_only */); } } @@ -535,6 +538,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { bool for_compaction_; bool skip_filters_; int level_; + RangeDelAggregator* range_del_agg_; }; // A wrapper of version builder which references the current version in @@ -826,13 +830,13 @@ void Version::AddIterators(const ReadOptions& read_options, for (int level = 1; level < storage_info_.num_non_empty_levels(); level++) { if (storage_info_.LevelFilesBrief(level).num_files != 0) { auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState)); - auto* state = new (mem) - LevelFileIteratorState(cfd_->table_cache(), read_options, soptions, - cfd_->internal_comparator(), - cfd_->internal_stats()->GetFileReadHist(level), - false /* for_compaction */, - cfd_->ioptions()->prefix_extractor != nullptr, - IsFilterSkipped(level), level); + auto* state = new (mem) LevelFileIteratorState( + cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), + cfd_->internal_stats()->GetFileReadHist(level), + false /* for_compaction */, + 
cfd_->ioptions()->prefix_extractor != nullptr, IsFilterSkipped(level), + level, nullptr /* range_del_agg */); mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); auto* first_level_iter = new (mem) LevelFileNumIterator( cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level)); @@ -3343,7 +3347,8 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { } } -InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) { +InternalIterator* VersionSet::MakeInputIterator( + const Compaction* c, RangeDelAggregator* range_del_agg) { auto cfd = c->column_family_data(); ReadOptions read_options; read_options.verify_checksums = @@ -3371,7 +3376,7 @@ InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) { cfd->internal_comparator(), flevel->files[i].fd, nullptr, nullptr, /* no per level latency histogram*/ true /* for_compaction */, nullptr /* arena */, - false /* skip_filters */, (int)which /* level */); + false /* skip_filters */, (int)which /* level */, range_del_agg); } } else { // Create concatenating iterator for the files from this level @@ -3381,7 +3386,8 @@ InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) { cfd->internal_comparator(), nullptr /* no per level latency histogram */, true /* for_compaction */, false /* prefix enabled */, - false /* skip_filters */, (int)which /* level */), + false /* skip_filters */, (int)which /* level */, + range_del_agg), new LevelFileNumIterator(cfd->internal_comparator(), c->input_levels(which))); } diff --git a/db/version_set.h b/db/version_set.h index 2e995227d..f84d21f77 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -695,7 +695,8 @@ class VersionSet { // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. - InternalIterator* MakeInputIterator(const Compaction* c); + InternalIterator* MakeInputIterator(const Compaction* c, + RangeDelAggregator* range_del_agg); // Add all files listed in any live version to *live. void AddLiveFiles(std::vector* live_list); diff --git a/src.mk b/src.mk index 2fe89f479..6c21b557d 100644 --- a/src.mk +++ b/src.mk @@ -10,6 +10,7 @@ LIB_SOURCES = \ db/compaction_job.cc \ db/compaction_picker.cc \ db/convenience.cc \ + db/range_del_aggregator.cc \ db/db_filesnapshot.cc \ db/dbformat.cc \ db/db_impl.cc \ diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index d1d488653..6a062424a 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -86,13 +86,14 @@ class BlockBasedTableBuilder : public TableBuilder { private: bool ok() const { return status().ok(); } - // Call block's Finish() method and then write the finalize block contents to - // file. + // Call block's Finish() method + // and then write the compressed block contents to file. void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block); - // Directly write block content to the file. + // Compress and write block content to the file. void WriteBlock(const Slice& block_contents, BlockHandle* handle, bool is_data_block); + // Directly write data to the file. 
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle); Status InsertBlockInCache(const Slice& block_contents, const CompressionType type, diff --git a/table/plain_table_builder.cc b/table/plain_table_builder.cc index 9f81c262e..fa3c7874e 100644 --- a/table/plain_table_builder.cc +++ b/table/plain_table_builder.cc @@ -121,7 +121,10 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) { size_t meta_bytes_buf_size = 0; ParsedInternalKey internal_key; - ParseInternalKey(key, &internal_key); + if (!ParseInternalKey(key, &internal_key)) { + assert(false); + return; + } if (internal_key.type == kTypeRangeDeletion) { status_ = Status::NotSupported("Range deletion unsupported"); return; diff --git a/table/scoped_arena_iterator.h b/table/scoped_arena_iterator.h index 68cc144ec..d6183b0c0 100644 --- a/table/scoped_arena_iterator.h +++ b/table/scoped_arena_iterator.h @@ -40,10 +40,16 @@ class ScopedArenaIterator { } InternalIterator* operator->() { return iter_; } + InternalIterator* get() { return iter_; } void set(InternalIterator* iter) { reset(iter); } - InternalIterator* get() { return iter_; } + InternalIterator* release() { + assert(iter_ != nullptr); + auto* res = iter_; + iter_ = nullptr; + return res; + } ~ScopedArenaIterator() { reset(nullptr);