diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7949d39d1..ea19aba25 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -272,6 +272,7 @@ set(SOURCES
         db/memtable_list.cc
         db/merge_helper.cc
         db/merge_operator.cc
+        db/range_del_aggregator.cc
         db/repair.cc
         db/snapshot_impl.cc
         db/table_cache.cc
diff --git a/db/builder.cc b/db/builder.cc
index 29eda01e5..0cda4e536 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -112,16 +112,18 @@ Status BuildTable(
         compression_opts, level);
   }
 
+  std::unique_ptr<RangeDelAggregator> range_del_agg;
+  range_del_agg.reset(new RangeDelAggregator(internal_comparator, snapshots));
   MergeHelper merge(env, internal_comparator.user_comparator(),
                     ioptions.merge_operator, nullptr, ioptions.info_log,
                     mutable_cf_options.min_partial_merge_operands,
                     true /* internal key corruption is not ok */,
                     snapshots.empty() ? 0 : snapshots.back());

-  CompactionIterator c_iter(iter, internal_comparator.user_comparator(),
-                            &merge, kMaxSequenceNumber, &snapshots,
-                            earliest_write_conflict_snapshot, env,
-                            true /* internal key corruption is not ok */);
+  CompactionIterator c_iter(
+      iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,
+      &snapshots, earliest_write_conflict_snapshot, env,
+      true /* internal key corruption is not ok */, range_del_agg.get());
   c_iter.SeekToFirst();
   for (; c_iter.Valid(); c_iter.Next()) {
     const Slice& key = c_iter.key();
diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc
index 0333bf3a2..f6725cbe3 100644
--- a/db/compaction_iterator.cc
+++ b/db/compaction_iterator.cc
@@ -14,8 +14,9 @@ CompactionIterator::CompactionIterator(
     InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
     SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
     SequenceNumber earliest_write_conflict_snapshot, Env* env,
-    bool expect_valid_internal_key, const Compaction* compaction,
-    const CompactionFilter* compaction_filter, LogBuffer* log_buffer)
+    bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
+    const Compaction* compaction, const CompactionFilter* compaction_filter,
+    LogBuffer* log_buffer)
     : input_(input),
       cmp_(cmp),
       merge_helper_(merge_helper),
@@ -23,6 +24,7 @@ CompactionIterator::CompactionIterator(
       earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
       env_(env),
       expect_valid_internal_key_(expect_valid_internal_key),
+      range_del_agg_(range_del_agg),
       compaction_(compaction),
       compaction_filter_(compaction_filter),
       log_buffer_(log_buffer),
@@ -153,6 +155,7 @@ void CompactionIterator::NextFromInput() {
     if (!has_current_user_key_ ||
         !cmp_->Equal(ikey_.user_key, current_user_key_)) {
       // First occurrence of this user key
+      // Copy key for output
       key_ = current_key_.SetKey(key_, &ikey_);
       current_user_key_ = ikey_.user_key;
       has_current_user_key_ = true;
@@ -321,7 +324,7 @@ void CompactionIterator::NextFromInput() {
       }
     } else {
       // We are at the end of the input, could not parse the next key, or hit
-      // the next key. The iterator returns the single delete if the key
+      // a different key. The iterator returns the single delete if the key
       // possibly exists beyond the current output level. We set
       // has_current_user_key to false so that if the iterator is at the next
       // key, we do not compare it again against the previous key at the next
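[Editor's note] Illustrative, runnable sketch (not part of the patch) of the representation the new plumbing moves around: a range tombstone travels as an ordinary key-value entry whose internal key is (start_user_key, seqnum, kTypeRangeDeletion) and whose value is the end user key. The string suffix stands in for the real 8-byte packed trailer.

#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

struct TombstoneEntry {
  std::string start_user_key;  // where the deleted range begins
  uint64_t seq;                // seqnum at which the deletion happened
  std::string end_user_key;    // where the deleted range ends
};

// Flatten to the (key, value) shape that table builders and iterators see.
std::pair<std::string, std::string> ToKeyValue(const TombstoneEntry& t) {
  // Real code appends a packed (seq, kTypeRangeDeletion) trailer; a readable
  // suffix stands in for it here.
  return {t.start_user_key + "@" + std::to_string(t.seq), t.end_user_key};
}

int main() {
  TombstoneEntry t{"ma", 4, "mz"};
  auto kv = ToKeyValue(t);
  assert(kv.first == "ma@4" && kv.second == "mz");
  return 0;
}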
@@ -390,7 +393,8 @@
       // have hit (A)
       // We encapsulate the merge related state machine in a different
       // object to minimize change to the existing flow.
-      merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_);
+      merge_helper_->MergeUntil(input_, range_del_agg_, prev_snapshot,
+                                bottommost_level_);
       merge_out_iter_.SeekToFirst();

       if (merge_out_iter_.Valid()) {
@@ -416,7 +420,15 @@
         pinned_iters_mgr_.ReleasePinnedData();
       }
     } else {
-      valid_ = true;
+      // 1. new user key -OR-
+      // 2. different snapshot stripe
+      bool should_delete =
+          range_del_agg_->ShouldDelete(key_, true /* for_compaction */);
+      if (should_delete) {
+        input_->Next();
+      } else {
+        valid_ = true;
+      }
     }
   }
 }
diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h
index 04a24fdfe..6ede0d66d 100644
--- a/db/compaction_iterator.h
+++ b/db/compaction_iterator.h
@@ -14,6 +14,7 @@
 #include "db/compaction.h"
 #include "db/merge_helper.h"
 #include "db/pinned_iterators_manager.h"
+#include "db/range_del_aggregator.h"
 #include "rocksdb/compaction_filter.h"
 #include "util/log_buffer.h"

@@ -47,6 +48,7 @@ class CompactionIterator {
                      std::vector<SequenceNumber>* snapshots,
                      SequenceNumber earliest_write_conflict_snapshot, Env* env,
                      bool expect_valid_internal_key,
+                     RangeDelAggregator* range_del_agg,
                      const Compaction* compaction = nullptr,
                      const CompactionFilter* compaction_filter = nullptr,
                      LogBuffer* log_buffer = nullptr);
@@ -99,6 +101,7 @@ class CompactionIterator {
   const SequenceNumber earliest_write_conflict_snapshot_;
   Env* env_;
   bool expect_valid_internal_key_;
+  RangeDelAggregator* range_del_agg_;
   const Compaction* compaction_;
   const CompactionFilter* compaction_filter_;
   LogBuffer* log_buffer_;
diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc
index 4cbccca55..61b8ba970 100644
--- a/db/compaction_iterator_test.cc
+++ b/db/compaction_iterator_test.cc
@@ -4,6 +4,10 @@
 // of patent rights can be found in the PATENTS file in the same directory.

 #include "db/compaction_iterator.h"
+
+#include <string>
+#include <vector>
+
 #include "util/testharness.h"
 #include "util/testutil.h"

@@ -11,33 +15,45 @@ namespace rocksdb {

 class CompactionIteratorTest : public testing::Test {
  public:
-  CompactionIteratorTest() : cmp_(BytewiseComparator()), snapshots_({}) {}
+  CompactionIteratorTest()
+      : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {}
+
+  void InitIterators(const std::vector<std::string>& ks,
+                     const std::vector<std::string>& vs,
+                     const std::vector<std::string>& range_del_ks,
+                     const std::vector<std::string>& range_del_vs,
+                     SequenceNumber last_sequence) {
+    std::unique_ptr<InternalIterator> range_del_iter(
+        new test::VectorIterator(range_del_ks, range_del_vs));
+    range_del_agg_.reset(new RangeDelAggregator(icmp_, snapshots_));
+    range_del_agg_->AddTombstones(std::move(range_del_iter));

-  void InitIterator(const std::vector<std::string>& ks,
-                    const std::vector<std::string>& vs,
-                    SequenceNumber last_sequence) {
     merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, nullptr, nullptr,
                                         nullptr, 0U, false, 0));
     iter_.reset(new test::VectorIterator(ks, vs));
     iter_->SeekToFirst();
     c_iter_.reset(new CompactionIterator(
         iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
-        kMaxSequenceNumber, Env::Default(), false));
+        kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get()));
   }

+  void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }
+
   const Comparator* cmp_;
+  const InternalKeyComparator icmp_;
   std::vector<SequenceNumber> snapshots_;
   std::unique_ptr<MergeHelper> merge_helper_;
   std::unique_ptr<test::VectorIterator> iter_;
   std::unique_ptr<CompactionIterator> c_iter_;
+  std::unique_ptr<RangeDelAggregator> range_del_agg_;
 };
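[Editor's note] Illustrative, runnable sketch of the covering check behind ShouldDelete(), using std types only and mirroring the SimpleRangeDeletion test below. Note this version of the patch treats the tombstone's end key as inclusive (Compare(user_key, end_key_) <= 0).

#include <cassert>
#include <cstdint>
#include <string>

bool Covers(const std::string& start, const std::string& end, uint64_t tseq,
            const std::string& user_key, uint64_t kseq) {
  // Deleted iff the key sorts within [start, end] and is older than the
  // tombstone's seqnum.
  return kseq < tseq && start <= user_key && user_key <= end;
}

int main() {
  // Tombstone ["ma", "mz"] at seqnum 4, as in the test's {"ma", 4} / "mz".
  assert(Covers("ma", "mz", 4, "morning", 2));   // dropped by the iterator
  assert(!Covers("ma", "mz", 4, "morning", 5));  // newer than the tombstone
  assert(!Covers("ma", "mz", 4, "night", 3));    // outside the key range
  return 0;
}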

 // It is possible that the output of the compaction iterator is empty even if
 // the input is not.
 TEST_F(CompactionIteratorTest, EmptyResult) {
-  InitIterator({test::KeyStr("a", 5, kTypeSingleDeletion),
-                test::KeyStr("a", 3, kTypeValue)},
-               {"", "val"}, 5);
+  InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
+                 test::KeyStr("a", 3, kTypeValue)},
+                {"", "val"}, {}, {}, 5);
   c_iter_->SeekToFirst();
   ASSERT_FALSE(c_iter_->Valid());
 }
@@ -45,10 +61,10 @@ TEST_F(CompactionIteratorTest, EmptyResult) {
 // If there is a corruption after a single deletion, the corrupted key should
 // be preserved.
 TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
-  InitIterator({test::KeyStr("a", 5, kTypeSingleDeletion),
-                test::KeyStr("a", 3, kTypeValue, true),
-                test::KeyStr("b", 10, kTypeValue)},
-               {"", "val", "val2"}, 10);
+  InitIterators({test::KeyStr("a", 5, kTypeSingleDeletion),
+                 test::KeyStr("a", 3, kTypeValue, true),
+                 test::KeyStr("b", 10, kTypeValue)},
+                {"", "val", "val2"}, {}, {}, 10);
   c_iter_->SeekToFirst();
   ASSERT_TRUE(c_iter_->Valid());
   ASSERT_EQ(test::KeyStr("a", 5, kTypeSingleDeletion),
@@ -63,6 +79,43 @@ TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
   ASSERT_FALSE(c_iter_->Valid());
 }

+TEST_F(CompactionIteratorTest, SimpleRangeDeletion) {
+  InitIterators({test::KeyStr("morning", 5, kTypeValue),
+                 test::KeyStr("morning", 2, kTypeValue),
+                 test::KeyStr("night", 3, kTypeValue)},
+                {"zao", "zao", "wan"},
+                {test::KeyStr("ma", 4, kTypeRangeDeletion)}, {"mz"}, 5);
+  c_iter_->SeekToFirst();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue), c_iter_->key().ToString());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("night", 3, kTypeValue), c_iter_->key().ToString());
+  c_iter_->Next();
+  ASSERT_FALSE(c_iter_->Valid());
+}
+
+TEST_F(CompactionIteratorTest, RangeDeletionWithSnapshots) {
+  AddSnapshot(10);
+  std::vector<std::string> ks1;
+  ks1.push_back(test::KeyStr("ma", 28, kTypeRangeDeletion));
+  std::vector<std::string> vs1{"mz"};
+  std::vector<std::string> ks2{test::KeyStr("morning", 15, kTypeValue),
+                               test::KeyStr("morning", 5, kTypeValue),
+                               test::KeyStr("night", 40, kTypeValue),
+                               test::KeyStr("night", 20, kTypeValue)};
+  std::vector<std::string> vs2{"zao 15", "zao 5", "wan 40", "wan 20"};
+  InitIterators(ks2, vs2, ks1, vs1, 40);
+  c_iter_->SeekToFirst();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("morning", 5, kTypeValue), c_iter_->key().ToString());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("night", 40, kTypeValue), c_iter_->key().ToString());
+  c_iter_->Next();
+  ASSERT_FALSE(c_iter_->Valid());
+}
+
 }  // namespace rocksdb

 int main(int argc, char** argv) {
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 3931becb9..22f376f90 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -658,8 +658,11 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {

 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   assert(sub_compact != nullptr);
-  std::unique_ptr<InternalIterator> input(
-      versions_->MakeInputIterator(sub_compact->compaction));
+  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
+  std::unique_ptr<RangeDelAggregator> range_del_agg(
+      new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_));
+  std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
+      sub_compact->compaction, range_del_agg.get()));

   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
@@ -680,7 +683,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     prev_prepare_write_nanos =
        IOSTATS(prepare_write_nanos);
   }

-  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
   const MutableCFOptions* mutable_cf_options =
       sub_compact->compaction->mutable_cf_options();

@@ -737,7 +739,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   sub_compact->c_iter.reset(new CompactionIterator(
       input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
       &existing_snapshots_, earliest_write_conflict_snapshot_, env_, false,
-      sub_compact->compaction, compaction_filter));
+      range_del_agg.get(), sub_compact->compaction, compaction_filter));
   auto c_iter = sub_compact->c_iter.get();
   c_iter->SeekToFirst();
   const auto& c_iter_stats = c_iter->iter_stats();
@@ -766,7 +768,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
         sub_compact->ShouldStopBefore(
             key, sub_compact->current_output_file_size) &&
         sub_compact->builder != nullptr) {
-      status = FinishCompactionOutputFile(input->status(), sub_compact);
+      status = FinishCompactionOutputFile(input->status(), sub_compact,
+                                          range_del_agg.get());
       if (!status.ok()) {
         break;
       }
@@ -843,6 +846,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       }
     }

+    Status input_status = input->status();
+    c_iter->Next();
+
     // Close output file if it is big enough
     // TODO(aekmekji): determine if file should be closed earlier than this
     // during subcompactions (i.e. if output size, estimated by input size, is
@@ -850,14 +856,18 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     // and 0.6MB instead of 1MB and 0.2MB)
     if (sub_compact->current_output_file_size >=
         sub_compact->compaction->max_output_file_size()) {
-      status = FinishCompactionOutputFile(input->status(), sub_compact);
+      const Slice* next_key = nullptr;
+      if (c_iter->Valid()) {
+        next_key = &c_iter->key();
+      }
+      status = FinishCompactionOutputFile(input_status, sub_compact,
+                                          range_del_agg.get(), next_key);
       if (sub_compact->outputs.size() == 1) {
         // Use dictionary from first output file for compression of subsequent
         // files.
        sub_compact->compression_dict = std::move(compression_dict);
       }
     }
-    c_iter->Next();
   }

   sub_compact->num_input_records = c_iter_stats.num_input_records;
@@ -884,8 +894,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     status = Status::ShutdownInProgress(
         "Database shutdown or Column family drop during compaction");
   }
+  if (status.ok() && sub_compact->builder == nullptr &&
+      range_del_agg->ShouldAddTombstones(bottommost_level_)) {
+    status = OpenCompactionOutputFile(sub_compact);
+  }
   if (status.ok() && sub_compact->builder != nullptr) {
-    status = FinishCompactionOutputFile(input->status(), sub_compact);
+    status = FinishCompactionOutputFile(input->status(), sub_compact,
+                                        range_del_agg.get());
   }
   if (status.ok()) {
     status = input->status();
@@ -936,7 +951,9 @@ void CompactionJob::RecordDroppedKeys(
 }

 Status CompactionJob::FinishCompactionOutputFile(
-    const Status& input_status, SubcompactionState* sub_compact) {
+    const Status& input_status, SubcompactionState* sub_compact,
+    RangeDelAggregator* range_del_agg,
+    const Slice* next_table_min_key /* = nullptr */) {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
   assert(sub_compact != nullptr);
@@ -951,6 +968,16 @@ Status CompactionJob::FinishCompactionOutputFile(
   // Check for iterator errors
   Status s = input_status;
   auto meta = &sub_compact->current_output()->meta;
+  if (s.ok()) {
+    // For the first output table, include range tombstones before the min key
+    // boundary. For subsequent output tables, this is unnecessary because we
+    // extend each file's max key boundary up until the next file's min key when
+    // range tombstones fall in the gap.
+    range_del_agg->AddToBuilder(
+        sub_compact->builder.get(),
+        sub_compact->outputs.size() == 1 /* extend_before_min_key */,
+        next_table_min_key, meta, bottommost_level_);
+  }
   const uint64_t current_entries = sub_compact->builder->NumEntries();
   meta->marked_for_compaction = sub_compact->builder->NeedCompact();
   if (s.ok()) {
diff --git a/db/compaction_job.h b/db/compaction_job.h
index a8ace56c6..12a57d71b 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -25,6 +25,7 @@
 #include "db/job_context.h"
 #include "db/log_writer.h"
 #include "db/memtable_list.h"
+#include "db/range_del_aggregator.h"
 #include "db/version_edit.h"
 #include "db/write_controller.h"
 #include "db/write_thread.h"
@@ -96,7 +97,9 @@ class CompactionJob {
   void ProcessKeyValueCompaction(SubcompactionState* sub_compact);

   Status FinishCompactionOutputFile(const Status& input_status,
-                                    SubcompactionState* sub_compact);
+                                    SubcompactionState* sub_compact,
+                                    RangeDelAggregator* range_del_agg = nullptr,
+                                    const Slice* next_table_min_key = nullptr);
   Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
   void RecordCompactionIOStats();
   Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
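[Editor's note] Runnable sketch (an assumed simplification of AddToBuilder's filtering, called from FinishCompactionOutputFile above): decide whether one tombstone belongs in the current output file, given the file's smallest user key and the next file's min user key. User-key comparisons are simplified to std::string ordering.

#include <cassert>
#include <string>

bool BelongsInCurrentOutput(const std::string& tomb_start,
                            const std::string& tomb_end,
                            const std::string& file_smallest,
                            bool extend_before_min_key,
                            const std::string* next_table_min_key) {
  if (next_table_min_key != nullptr && *next_table_min_key < tomb_start) {
    // Starts after the next file's min key: defer it to the next output.
    return false;
  }
  if (!extend_before_min_key && tomb_end < file_smallest) {
    // Ends before this file's range; a previous output already carries it.
    return false;
  }
  return true;
}

int main() {
  std::string next_min = "k";
  // First output file: tombstones before its min key are pulled in.
  assert(BelongsInCurrentOutput("a", "b", "c", true, &next_min));
  // Later output file: the same tombstone stays with the previous file.
  assert(!BelongsInCurrentOutput("a", "b", "c", false, &next_min));
  // Starts beyond the next file's min key: written to the next file instead.
  assert(!BelongsInCurrentOutput("m", "z", "c", false, &next_min));
  return 0;
}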
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index 6f7d78c06..518c63c50 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -305,7 +305,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
   dbfull()->TEST_WaitForCompact();
   // Preloading iterator issues one table cache lookup and creates
   // a new table reader. One file is created for flush and one for compaction.
-  // Compaction inputs make no table cache look-up.
+  // Compaction inputs make no table cache look-up for data iterators or
+  // range tombstone iterators since they're already cached.
   ASSERT_EQ(num_table_cache_lookup, 2);
   // Create new iterator for:
   // (1) 1 for verifying flush results
diff --git a/db/dbformat.h b/db/dbformat.h
index 29d17b0ec..a59b15992 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -507,23 +507,30 @@ struct RangeTombstone {

   explicit RangeTombstone(Slice internal_key, Slice value) {
     ParsedInternalKey parsed_key;
-    if (ParseInternalKey(internal_key, &parsed_key)) {
-      start_key_ = parsed_key.user_key;
-      seq_ = parsed_key.sequence;
-      end_key_ = value;
+    if (!ParseInternalKey(internal_key, &parsed_key)) {
+      assert(false);
     }
+    start_key_ = parsed_key.user_key;
+    seq_ = parsed_key.sequence;
+    end_key_ = value;
   }

-  // be careful to use Serialize(); InternalKey() allocates new memory
-  std::pair<InternalKey, Slice> Serialize() {
+  // be careful when using Serialize(); it allocates new memory
+  std::pair<InternalKey, Slice> Serialize() const {
     auto key = InternalKey(start_key_, seq_, kTypeRangeDeletion);
     Slice value = end_key_;
     return std::make_pair(std::move(key), std::move(value));
   }

-  InternalKey SerializeKey() {
+  // be careful when using SerializeKey(); it allocates new memory
+  InternalKey SerializeKey() const {
     return InternalKey(start_key_, seq_, kTypeRangeDeletion);
   }
+
+  // be careful when using SerializeEndKey(); it allocates new memory
+  InternalKey SerializeEndKey() const {
+    return InternalKey(end_key_, seq_, kTypeRangeDeletion);
+  }
 };

 }  // namespace rocksdb
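[Editor's note] Sketch of how the helpers above are used (a fragment against the RocksDB headers, not standalone; `internal_key`, `end_user_key`, and `builder` are assumed context). Serialize() rebuilds the internal key/value pair for writing into a table, and SerializeEndKey() gives the end boundary used when extending the file's largest key; all three allocate fresh InternalKey memory, hence the warnings in the comments.

RangeTombstone t(internal_key /* (start, seq, kTypeRangeDeletion) */,
                 end_user_key /* value slice */);
std::pair<InternalKey, Slice> kv = t.Serialize();
builder->Add(kv.first.Encode(), kv.second);  // as done in AddToBuilder()
InternalKey end_ikey = t.SerializeEndKey();  // candidate for meta->largest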
diff --git a/db/merge_helper.cc b/db/merge_helper.cc
index 9eefd402c..d78c9f237 100644
--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -75,6 +75,7 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
 // operands_ stores the list of merge operands encountered while merging.
 // keys_[i] corresponds to operands_[i] for each i.
 Status MergeHelper::MergeUntil(InternalIterator* iter,
+                               RangeDelAggregator* range_del_agg,
                                const SequenceNumber stop_before,
                                const bool at_bottom) {
   // Get a copy of the internal key, before it's invalidated by iter->Next()
@@ -171,6 +172,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
     } else {
       // hit a merge
       // => if there is a compaction filter, apply it.
+      // => check for range tombstones covering the operand
       // => merge the operand into the front of the operands_ list
       //    if not filtered
       // => then continue because we haven't yet seen a Put/Delete.
@@ -183,8 +185,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
       //  1) it's included in one of the snapshots. in that case we *must* write
       //     it out, no matter what compaction filter says
       //  2) it's not filtered by a compaction filter
-      if (ikey.sequence <= latest_snapshot_ ||
-          !FilterMerge(orig_ikey.user_key, value_slice)) {
+      if ((ikey.sequence <= latest_snapshot_ ||
+           !FilterMerge(orig_ikey.user_key, value_slice)) &&
+          (range_del_agg == nullptr ||
+           !range_del_agg->ShouldDelete(iter->key()))) {
         if (original_key_is_iter) {
           // this is just an optimization that saves us one memcpy
           keys_.push_front(std::move(original_key));
diff --git a/db/merge_helper.h b/db/merge_helper.h
index 7cd992f34..de3e3c949 100644
--- a/db/merge_helper.h
+++ b/db/merge_helper.h
@@ -12,6 +12,7 @@

 #include "db/dbformat.h"
 #include "db/merge_context.h"
+#include "db/range_del_aggregator.h"
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/env.h"
 #include "rocksdb/slice.h"
@@ -71,6 +72,7 @@ class MergeHelper {
   //           or - the end of iteration
   // iter: (IN)  points to the first merge type entry
   //       (OUT) points to the first entry not included in the merge process
+  // range_del_agg: (IN) filters merge operands covered by range tombstones.
   // stop_before: (IN) a sequence number that merge should not cross.
   //                   0 means no restriction
   // at_bottom:   (IN) true if the iterator covers the bottom level, which means
@@ -85,6 +87,7 @@
   //
   // REQUIRED: The first key in the input is not corrupted.
   Status MergeUntil(InternalIterator* iter,
+                    RangeDelAggregator* range_del_agg = nullptr,
                     const SequenceNumber stop_before = 0,
                     const bool at_bottom = false);

diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc
index b21f56078..e468f7ed2 100644
--- a/db/merge_helper_test.cc
+++ b/db/merge_helper_test.cc
@@ -29,7 +29,8 @@ class MergeHelperTest : public testing::Test {
     merge_helper_.reset(new MergeHelper(env_, BytewiseComparator(),
                                         merge_op_.get(), filter_.get(), nullptr,
                                         2U, false, latest_snapshot));
-    return merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom);
+    return merge_helper_->MergeUntil(iter_.get(), nullptr /* range_del_agg */,
+                                     stop_before, at_bottom);
   }

   void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,
diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc
new file mode 100644
index 000000000..1515a63f6
--- /dev/null
+++ b/db/range_del_aggregator.cc
@@ -0,0 +1,195 @@
+// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include "db/range_del_aggregator.h"
+
+#include <algorithm>
+
+namespace rocksdb {
+
+RangeDelAggregator::RangeDelAggregator(
+    const InternalKeyComparator& icmp,
+    const std::vector<SequenceNumber>& snapshots)
+    : icmp_(icmp) {
+  pinned_iters_mgr_.StartPinning();
+  for (auto snapshot : snapshots) {
+    stripe_map_.emplace(
+        snapshot,
+        TombstoneMap(stl_wrappers::LessOfComparator(icmp_.user_comparator())));
+  }
+  // Data newer than any snapshot falls in this catch-all stripe
+  stripe_map_.emplace(kMaxSequenceNumber, TombstoneMap());
+}
+
+bool RangeDelAggregator::ShouldDelete(const Slice& internal_key,
+                                      bool for_compaction /* = false */) {
+  ParsedInternalKey parsed;
+  if (!ParseInternalKey(internal_key, &parsed)) {
+    assert(false);
+  }
+  assert(IsValueType(parsed.type));
+
+  // Starting point is the snapshot stripe in which the key lives, then need to
+  // search all earlier stripes too, unless it's for compaction.
+  for (auto stripe_map_iter = GetStripeMapIter(parsed.sequence);
+       stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) {
+    const auto& tombstone_map = stripe_map_iter->second;
+    for (const auto& start_key_and_tombstone : tombstone_map) {
+      const auto& tombstone = start_key_and_tombstone.second;
+      if (icmp_.user_comparator()->Compare(parsed.user_key,
+                                           tombstone.start_key_) < 0) {
+        break;
+      }
+      if (parsed.sequence < tombstone.seq_ &&
+          icmp_.user_comparator()->Compare(parsed.user_key,
+                                           tombstone.end_key_) <= 0) {
+        return true;
+      }
+    }
+    if (for_compaction) {
+      break;
+    }
+  }
+  return false;
+}
+
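[Editor's note] Runnable sketch of the stripe scheme set up in the constructor above: one bucket per snapshot seqnum plus a kMaxSequenceNumber catch-all, with a key's stripe found via upper_bound(seq - 1), i.e., the oldest snapshot that can still see it.

#include <cassert>
#include <cstdint>
#include <map>
#include <string>

int main() {
  const uint64_t kMax = UINT64_MAX;  // stand-in for kMaxSequenceNumber
  std::map<uint64_t, std::string> stripes;
  for (uint64_t snap : {10ULL, 40ULL}) stripes[snap] = "stripe";
  stripes[kMax] = "catch-all";  // data newer than any snapshot

  auto stripe_for = [&](uint64_t seq) {
    // upper_bound() checks strict inequality, hence the subtraction by one.
    return seq > 0 ? stripes.upper_bound(seq - 1) : stripes.begin();
  };
  assert(stripe_for(5)->first == 10);     // visible to snapshot 10
  assert(stripe_for(10)->first == 10);    // a stripe includes its own snapshot
  assert(stripe_for(15)->first == 40);    // only snapshot 40 and newer see it
  assert(stripe_for(50)->first == kMax);  // newer than all snapshots
  return 0;
}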
+bool RangeDelAggregator::ShouldAddTombstones(
+    bool bottommost_level /* = false */) {
+  auto stripe_map_iter = stripe_map_.begin();
+  assert(stripe_map_iter != stripe_map_.end());
+  if (bottommost_level) {
+    // For the bottommost level, keys covered by tombstones in the first
+    // (oldest) stripe have been compacted away, so the tombstones are obsolete.
+    ++stripe_map_iter;
+  }
+  while (stripe_map_iter != stripe_map_.end()) {
+    if (!stripe_map_iter->second.empty()) {
+      return true;
+    }
+    ++stripe_map_iter;
+  }
+  return false;
+}
+
+void RangeDelAggregator::AddTombstones(ScopedArenaIterator input) {
+  AddTombstones(input.release(), true /* arena */);
+}
+
+void RangeDelAggregator::AddTombstones(
+    std::unique_ptr<InternalIterator> input) {
+  AddTombstones(input.release(), false /* arena */);
+}
+
+void RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) {
+  pinned_iters_mgr_.PinIterator(input, arena);
+  input->SeekToFirst();
+  while (input->Valid()) {
+    RangeTombstone tombstone(input->key(), input->value());
+    auto& tombstone_map = GetStripeMapIter(tombstone.seq_)->second;
+    tombstone_map.emplace(tombstone.start_key_.ToString(),
+                          std::move(tombstone));
+    input->Next();
+  }
+}
+
+RangeDelAggregator::StripeMap::iterator RangeDelAggregator::GetStripeMapIter(
+    SequenceNumber seq) {
+  // The stripe includes seqnum for the snapshot above and excludes seqnum for
+  // the snapshot below.
+  StripeMap::iterator iter;
+  if (seq > 0) {
+    // upper_bound() checks strict inequality so need to subtract one
+    iter = stripe_map_.upper_bound(seq - 1);
+  } else {
+    iter = stripe_map_.begin();
+  }
+  // catch-all stripe justifies this assertion in either of above cases
+  assert(iter != stripe_map_.end());
+  return iter;
+}
+
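[Editor's note] Runnable sketch of why ShouldDelete() above stops at the key's own stripe during compaction (a simplification, reusing the stripe scheme from the previous sketch): dropping a key is only safe when a covering tombstone lands in the same stripe, i.e., no snapshot separates the key from the tombstone.

#include <cassert>
#include <cstdint>
#include <vector>

// Returns the stripe (upper snapshot bound) that a seqnum belongs to.
uint64_t StripeOf(uint64_t seq, const std::vector<uint64_t>& snapshots) {
  for (uint64_t snap : snapshots) {
    if (seq <= snap) return snap;
  }
  return UINT64_MAX;  // catch-all stripe
}

int main() {
  std::vector<uint64_t> snapshots = {10};  // as in RangeDeletionWithSnapshots
  // Tombstone at seqnum 28 lands in the catch-all stripe; "morning"@15 does
  // too, so compaction may drop it. "morning"@5 sits below snapshot 10 in an
  // older stripe, so it must survive for readers at that snapshot.
  assert(StripeOf(28, snapshots) == StripeOf(15, snapshots));
  assert(StripeOf(28, snapshots) != StripeOf(5, snapshots));
  return 0;
}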
+// TODO(andrewkr): We should implement an iterator over range tombstones in our
+// map. It'd enable compaction to open tables on-demand, i.e., only once range
+// tombstones are known to be available, without the code duplication we have
+// in ShouldAddTombstones(). It'll also allow us to move the table-modifying
+// code into more coherent places: CompactionJob and BuildTable().
+void RangeDelAggregator::AddToBuilder(TableBuilder* builder,
+                                      bool extend_before_min_key,
+                                      const Slice* next_table_min_key,
+                                      FileMetaData* meta,
+                                      bool bottommost_level /* = false */) {
+  auto stripe_map_iter = stripe_map_.begin();
+  assert(stripe_map_iter != stripe_map_.end());
+  if (bottommost_level) {
+    // For the bottommost level, keys covered by tombstones in the first
+    // (oldest) stripe have been compacted away, so the tombstones are obsolete.
+    ++stripe_map_iter;
+  }
+
+  // Note the order in which tombstones are stored is insignificant since we
+  // insert them into a std::map on the read path.
+  bool first_added = false;
+  while (stripe_map_iter != stripe_map_.end()) {
+    for (const auto& start_key_and_tombstone : stripe_map_iter->second) {
+      const auto& tombstone = start_key_and_tombstone.second;
+      if (next_table_min_key != nullptr &&
+          icmp_.user_comparator()->Compare(*next_table_min_key,
+                                           tombstone.start_key_) < 0) {
+        // Tombstones starting after next_table_min_key only need to be included
+        // in the next table.
+        break;
+      }
+      if (!extend_before_min_key && meta->smallest.size() != 0 &&
+          icmp_.user_comparator()->Compare(tombstone.end_key_,
+                                           meta->smallest.user_key()) < 0) {
+        // Tombstones ending before this table's smallest key can conditionally
+        // be excluded, e.g., when this table is a non-first compaction output,
+        // we know such tombstones are included in the previous table. In that
+        // case extend_before_min_key would be false.
+        continue;
+      }
+
+      auto ikey_and_end_key = tombstone.Serialize();
+      builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
+      if (!first_added) {
+        first_added = true;
+        if (extend_before_min_key &&
+            (meta->smallest.size() == 0 ||
+             icmp_.Compare(ikey_and_end_key.first, meta->smallest) < 0)) {
+          meta->smallest = ikey_and_end_key.first;
+        }
+      }
+      auto end_ikey = tombstone.SerializeEndKey();
+      if (meta->largest.size() == 0 ||
+          icmp_.Compare(meta->largest, end_ikey) < 0) {
+        if (next_table_min_key != nullptr &&
+            icmp_.Compare(*next_table_min_key, end_ikey.Encode()) < 0) {
+          // Pretend the largest key has the same user key as the min key in the
+          // following table in order for files to appear key-space partitioned.
+          // Choose highest seqnum so this file's largest comes before the next
+          // file's smallest. The fake seqnum is OK because the read path's
+          // file-picking code only considers the user key portion.
+          //
+          // Note Seek() also creates InternalKey with (user_key,
+          // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
+          // kTypeRangeDeletion (0xF), so the range tombstone comes before the
+          // Seek() key in InternalKey's ordering. So Seek() will look in the
+          // next file for the user key.
+          ParsedInternalKey parsed;
+          ParseInternalKey(*next_table_min_key, &parsed);
+          meta->largest = InternalKey(parsed.user_key, kMaxSequenceNumber,
+                                      kTypeRangeDeletion);
+        } else {
+          meta->largest = std::move(end_ikey);
+        }
+      }
+      meta->smallest_seqno = std::min(meta->smallest_seqno, tombstone.seq_);
+      meta->largest_seqno = std::max(meta->largest_seqno, tombstone.seq_);
+    }
+    ++stripe_map_iter;
+  }
+}
+
+}  // namespace rocksdb
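[Editor's note] Runnable sketch of the internal-key ordering fact the comment above relies on: for equal user keys, entries sort by descending packed (seq << 8) | type. Per the patch's own comment, kTypeRangeDeletion (0xF) packs higher than the kTypeDeletion (0x7) tag used by Seek() keys, so the fake boundary key sorts ahead of any Seek() probe at kMaxSequenceNumber.

#include <cassert>
#include <cstdint>

uint64_t Pack(uint64_t seq, uint8_t type) { return (seq << 8) | type; }

int main() {
  const uint64_t kMaxSeq = (1ULL << 56) - 1;  // RocksDB's kMaxSequenceNumber
  const uint8_t kTypeDeletion = 0x7, kTypeRangeDeletion = 0xF;
  // Bigger packed value == earlier in internal-key order (descending).
  assert(Pack(kMaxSeq, kTypeRangeDeletion) > Pack(kMaxSeq, kTypeDeletion));
  // So file_largest = (user_key, kMaxSeq, kTypeRangeDeletion) precedes a
  // Seek() key (user_key, kMaxSeq, kTypeDeletion): the read path moves on to
  // the next file, whose smallest has the same user key.
  return 0;
}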
diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h
new file mode 100644
index 000000000..97a76e484
--- /dev/null
+++ b/db/range_del_aggregator.h
@@ -0,0 +1,53 @@
+// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "db/dbformat.h"
+#include "db/pinned_iterators_manager.h"
+#include "db/version_edit.h"
+#include "include/rocksdb/comparator.h"
+#include "include/rocksdb/types.h"
+#include "table/internal_iterator.h"
+#include "table/scoped_arena_iterator.h"
+#include "table/table_builder.h"
+#include "util/kv_map.h"
+
+namespace rocksdb {
+class RangeDelAggregator {
+ public:
+  RangeDelAggregator(const InternalKeyComparator& icmp,
+                     const std::vector<SequenceNumber>& snapshots);
+
+  bool ShouldDelete(const Slice& internal_key, bool for_compaction = false);
+  bool ShouldAddTombstones(bool bottommost_level = false);
+  void AddTombstones(ScopedArenaIterator input);
+  void AddTombstones(std::unique_ptr<InternalIterator> input);
+  // write tombstones covering a range to a table builder
+  // usually don't add to a max-level table builder
+  void AddToBuilder(TableBuilder* builder, bool extend_before_min_key,
+                    const Slice* next_table_min_key, FileMetaData* meta,
+                    bool bottommost_level = false);
+
+ private:
+  // Maps tombstone start key -> tombstone object
+  typedef std::map<std::string, RangeTombstone, stl_wrappers::LessOfComparator>
+      TombstoneMap;
+  // Maps snapshot seqnum -> map of tombstones that fall in that stripe, i.e.,
+  // their seqnums are greater than the next smaller snapshot's seqnum.
+  typedef std::map<SequenceNumber, TombstoneMap> StripeMap;
+
+  void AddTombstones(InternalIterator* input, bool arena);
+  StripeMap::iterator GetStripeMapIter(SequenceNumber seq);
+
+  PinnedIteratorsManager pinned_iters_mgr_;
+  StripeMap stripe_map_;
+  const InternalKeyComparator icmp_;
+};
+}  // namespace rocksdb
diff --git a/db/table_cache.cc b/db/table_cache.cc
index ff7997566..1ae95d925 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -170,7 +170,10 @@ InternalIterator* TableCache::NewIterator(
     const ReadOptions& options, const EnvOptions& env_options,
     const InternalKeyComparator& icomparator, const FileDescriptor& fd,
     TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
-    bool for_compaction, Arena* arena, bool skip_filters, int level) {
+    bool for_compaction, Arena* arena, bool skip_filters, int level,
+    RangeDelAggregator* range_del_agg /* = nullptr */,
+    bool is_range_del_only /* = false */) {
+  assert(!is_range_del_only || range_del_agg != nullptr);
   PERF_TIMER_GUARD(new_table_iterator_nanos);

   if (table_reader_ptr != nullptr) {
@@ -182,14 +185,18 @@ InternalIterator* TableCache::NewIterator(
   size_t readahead = 0;
   bool create_new_table_reader = false;
-  if (for_compaction) {
-    if (ioptions_.new_table_reader_for_compaction_inputs) {
-      readahead = ioptions_.compaction_readahead_size;
-      create_new_table_reader = true;
+  // pointless to create a new table reader for range tombstones only since the
+  // reader isn't reused
+  if (!is_range_del_only) {
+    if (for_compaction) {
+      if (ioptions_.new_table_reader_for_compaction_inputs) {
+        readahead = ioptions_.compaction_readahead_size;
+        create_new_table_reader = true;
+      }
+    } else {
+      readahead = options.readahead_size;
+      create_new_table_reader = readahead > 0;
     }
-  } else {
-    readahead = options.readahead_size;
-    create_new_table_reader = readahead > 0;
   }

   if (create_new_table_reader) {
@@ -216,23 +223,36 @@ InternalIterator* TableCache::NewIterator(
     }
   }

-  InternalIterator* result =
-      table_reader->NewIterator(options, arena, skip_filters);
-
-  if (create_new_table_reader) {
-    assert(handle == nullptr);
-    result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
-  } else if (handle != nullptr) {
-    result->RegisterCleanup(&UnrefEntry, cache_, handle);
+  if (range_del_agg != nullptr) {
+    std::unique_ptr<InternalIterator> iter(
+        table_reader->NewRangeTombstoneIterator(options));
+    range_del_agg->AddTombstones(std::move(iter));
   }

-  if (for_compaction) {
-    table_reader->SetupForCompaction();
-  }
-  if (table_reader_ptr != nullptr) {
-    *table_reader_ptr = table_reader;
-  }
+  InternalIterator* result = nullptr;
+  if (!is_range_del_only) {
+    result = table_reader->NewIterator(options, arena, skip_filters);
+    if (create_new_table_reader) {
+      assert(handle == nullptr);
+      result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
+    } else if (handle != nullptr) {
+      result->RegisterCleanup(&UnrefEntry, cache_, handle);
+    }
+    if (for_compaction) {
+      table_reader->SetupForCompaction();
+    }
+    if (table_reader_ptr != nullptr) {
+      *table_reader_ptr = table_reader;
+    }
+  } else {
+    assert(!create_new_table_reader);
+    // don't need the table reader at all since the iterator over the meta-block
+    // doesn't require it
+    if (handle != nullptr) {
+      UnrefEntry(cache_, handle);
+    }
+  }
   return result;
 }
diff --git a/db/table_cache.h b/db/table_cache.h
index 402b94fe5..2bf7f3c77 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -15,6 +15,7 @@
 #include <stdint.h>

 #include "db/dbformat.h"
+#include "db/range_del_aggregator.h"
 #include "port/port.h"
 #include "rocksdb/cache.h"
#include "rocksdb/env.h" @@ -47,12 +48,18 @@ class TableCache { // returned iterator is live. // @param skip_filters Disables loading/accessing the filter block // @param level The level this table is at, -1 for "not set / don't know" + // @param range_del_agg When non-nullptr, creates a range tombstone iterator + // over this file's meta-block and gives it to this object + // @param is_range_del_only When set, this function only gives a range + // tombstone iterator to range_del_agg and then returns nullptr InternalIterator* NewIterator( const ReadOptions& options, const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr, HistogramImpl* file_read_hist = nullptr, bool for_compaction = false, - Arena* arena = nullptr, bool skip_filters = false, int level = -1); + Arena* arena = nullptr, bool skip_filters = false, int level = -1, + RangeDelAggregator* range_del_agg = nullptr, + bool is_range_del_only = false); // If a seek to internal key "k" in specified file finds an entry, // call (*handle_result)(arg, found_key, found_value) repeatedly until diff --git a/db/version_set.cc b/db/version_set.cc index b58eab415..730c797d5 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -497,7 +497,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState { const EnvOptions& env_options, const InternalKeyComparator& icomparator, HistogramImpl* file_read_hist, bool for_compaction, - bool prefix_enabled, bool skip_filters, int level) + bool prefix_enabled, bool skip_filters, int level, + RangeDelAggregator* range_del_agg) : TwoLevelIteratorState(prefix_enabled), table_cache_(table_cache), read_options_(read_options), @@ -506,7 +507,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState { file_read_hist_(file_read_hist), for_compaction_(for_compaction), skip_filters_(skip_filters), - level_(level) {} + level_(level), + range_del_agg_(range_del_agg) {} InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override { if (meta_handle.size() != sizeof(FileDescriptor)) { @@ -518,7 +520,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState { return table_cache_->NewIterator( read_options_, env_options_, icomparator_, *fd, nullptr /* don't need reference to table*/, file_read_hist_, - for_compaction_, nullptr /* arena */, skip_filters_, level_); + for_compaction_, nullptr /* arena */, skip_filters_, level_, + range_del_agg_, false /* is_range_del_only */); } } @@ -535,6 +538,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { bool for_compaction_; bool skip_filters_; int level_; + RangeDelAggregator* range_del_agg_; }; // A wrapper of version builder which references the current version in @@ -826,13 +830,13 @@ void Version::AddIterators(const ReadOptions& read_options, for (int level = 1; level < storage_info_.num_non_empty_levels(); level++) { if (storage_info_.LevelFilesBrief(level).num_files != 0) { auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState)); - auto* state = new (mem) - LevelFileIteratorState(cfd_->table_cache(), read_options, soptions, - cfd_->internal_comparator(), - cfd_->internal_stats()->GetFileReadHist(level), - false /* for_compaction */, - cfd_->ioptions()->prefix_extractor != nullptr, - IsFilterSkipped(level), level); + auto* state = new (mem) LevelFileIteratorState( + cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), + cfd_->internal_stats()->GetFileReadHist(level), + false /* for_compaction */, + 
+          cfd_->ioptions()->prefix_extractor != nullptr, IsFilterSkipped(level),
+          level, nullptr /* range_del_agg */);
       mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
       auto* first_level_iter = new (mem) LevelFileNumIterator(
           cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level));
@@ -3343,7 +3347,8 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
   }
 }

-InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) {
+InternalIterator* VersionSet::MakeInputIterator(
+    const Compaction* c, RangeDelAggregator* range_del_agg) {
   auto cfd = c->column_family_data();
   ReadOptions read_options;
   read_options.verify_checksums =
@@ -3371,7 +3376,7 @@ InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) {
             cfd->internal_comparator(), flevel->files[i].fd, nullptr,
             nullptr, /* no per level latency histogram*/
             true /* for_compaction */, nullptr /* arena */,
-            false /* skip_filters */, (int)which /* level */);
+            false /* skip_filters */, (int)which /* level */, range_del_agg);
       }
     } else {
       // Create concatenating iterator for the files from this level
@@ -3381,7 +3386,8 @@
             cfd->internal_comparator(),
             nullptr /* no per level latency histogram */,
             true /* for_compaction */, false /* prefix enabled */,
-            false /* skip_filters */, (int)which /* level */),
+            false /* skip_filters */, (int)which /* level */,
+            range_del_agg),
         new LevelFileNumIterator(cfd->internal_comparator(),
                                  c->input_levels(which)));
     }
diff --git a/db/version_set.h b/db/version_set.h
index 2e995227d..f84d21f77 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -695,7 +695,8 @@ class VersionSet {

   // Create an iterator that reads over the compaction inputs for "*c".
   // The caller should delete the iterator when no longer needed.
-  InternalIterator* MakeInputIterator(const Compaction* c);
+  InternalIterator* MakeInputIterator(const Compaction* c,
+                                      RangeDelAggregator* range_del_agg);

   // Add all files listed in any live version to *live.
   void AddLiveFiles(std::vector<FileDescriptor>* live_list);
diff --git a/src.mk b/src.mk
index 2fe89f479..6c21b557d 100644
--- a/src.mk
+++ b/src.mk
@@ -10,6 +10,7 @@ LIB_SOURCES = \
   db/compaction_job.cc \
   db/compaction_picker.cc \
   db/convenience.cc \
+  db/range_del_aggregator.cc \
   db/db_filesnapshot.cc \
   db/dbformat.cc \
   db/db_impl.cc \
diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h
index d1d488653..6a062424a 100644
--- a/table/block_based_table_builder.h
+++ b/table/block_based_table_builder.h
@@ -86,13 +86,14 @@ class BlockBasedTableBuilder : public TableBuilder {
  private:
   bool ok() const { return status().ok(); }

-  // Call block's Finish() method and then write the finalize block contents to
-  // file.
+  // Call block's Finish() method
+  // and then write the compressed block contents to file.
   void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block);
-  // Directly write block content to the file.
+  // Compress and write block content to the file.
   void WriteBlock(const Slice& block_contents, BlockHandle* handle,
                   bool is_data_block);
+  // Directly write data to the file.
  void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);
   Status InsertBlockInCache(const Slice& block_contents,
                             const CompressionType type,
diff --git a/table/plain_table_builder.cc b/table/plain_table_builder.cc
index 9f81c262e..fa3c7874e 100644
--- a/table/plain_table_builder.cc
+++ b/table/plain_table_builder.cc
@@ -121,7 +121,10 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) {
   size_t meta_bytes_buf_size = 0;

   ParsedInternalKey internal_key;
-  ParseInternalKey(key, &internal_key);
+  if (!ParseInternalKey(key, &internal_key)) {
+    assert(false);
+    return;
+  }
   if (internal_key.type == kTypeRangeDeletion) {
     status_ = Status::NotSupported("Range deletion unsupported");
     return;
diff --git a/table/scoped_arena_iterator.h b/table/scoped_arena_iterator.h
index 68cc144ec..d6183b0c0 100644
--- a/table/scoped_arena_iterator.h
+++ b/table/scoped_arena_iterator.h
@@ -40,10 +40,16 @@ class ScopedArenaIterator {
   }

   InternalIterator* operator->() { return iter_; }
+  InternalIterator* get() { return iter_; }

   void set(InternalIterator* iter) { reset(iter); }

-  InternalIterator* get() { return iter_; }
+  InternalIterator* release() {
+    assert(iter_ != nullptr);
+    auto* res = iter_;
+    iter_ = nullptr;
+    return res;
+  }

   ~ScopedArenaIterator() { reset(nullptr);
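[Editor's note] Runnable sketch of the release() contract added above, using a simplified stand-in wrapper: release() hands the raw pointer to a new owner (in the patch, the aggregator's PinnedIteratorsManager) and nulls the wrapper so its destructor no longer touches the iterator.

#include <cassert>

struct Iter { bool alive = true; };

class ScopedIter {
 public:
  explicit ScopedIter(Iter* it) : it_(it) {}
  ~ScopedIter() { if (it_ != nullptr) it_->alive = false; }  // "frees" it
  Iter* release() {
    assert(it_ != nullptr);
    Iter* res = it_;
    it_ = nullptr;  // destructor will no longer touch it
    return res;
  }
 private:
  Iter* it_;
};

int main() {
  Iter it;
  Iter* raw = nullptr;
  {
    ScopedIter scoped(&it);
    raw = scoped.release();  // ownership moves out before scope exit
  }
  assert(raw == &it && it.alive);  // survived the wrapper's destruction
  return 0;
}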