From db325a5904e8df0c80b324c510335735adef6558 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Wed, 9 Jun 2021 15:40:16 -0700 Subject: [PATCH] Add a clipping internal iterator (#8327) Summary: Logically, subcompactions process a key range [start, end); however, the way this is currently implemented is that the `CompactionIterator` for any given subcompaction keeps processing key-values until it actually outputs a key that is out of range, which is then discarded. Instead of doing this, the patch introduces a new type of internal iterator called `ClippingIterator` which wraps another internal iterator and "clips" its range of key-values so that any KVs returned are strictly in the [start, end) interval. This does eliminate a (minor) inefficiency by stopping processing in subcompactions exactly at the limit; however, the main motivation is related to BlobDB: namely, we need this to be able to measure the amount of garbage generated by a subcompaction precisely and prevent off-by-one errors. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8327 Test Plan: `make check` Reviewed By: siying Differential Revision: D28761541 Pulled By: ltamasi fbshipit-source-id: ee0e7229f04edabbc7bed5adb51771fbdc287f69 --- CMakeLists.txt | 1 + Makefile | 4 + TARGETS | 7 + db/compaction/clipping_iterator.h | 275 ++++++++++++++++++++++++ db/compaction/clipping_iterator_test.cc | 256 ++++++++++++++++++++++ db/compaction/compaction_job.cc | 58 +++-- src.mk | 1 + 7 files changed, 583 insertions(+), 19 deletions(-) create mode 100644 db/compaction/clipping_iterator.h create mode 100644 db/compaction/clipping_iterator_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 61d94a709..3a2dc7ed8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1133,6 +1133,7 @@ if(WITH_TESTS) db/blob/db_blob_index_test.cc db/column_family_test.cc db/compact_files_test.cc + db/compaction/clipping_iterator_test.cc db/compaction/compaction_job_stats_test.cc db/compaction/compaction_job_test.cc db/compaction/compaction_iterator_test.cc diff --git a/Makefile b/Makefile index c0e3cc969..a4e04a79b 100644 --- a/Makefile +++ b/Makefile @@ -1866,6 +1866,10 @@ db_blob_corruption_test: $(OBJ_DIR)/db/blob/db_blob_corruption_test.o $(TEST_LIB db_write_buffer_manager_test: $(OBJ_DIR)/db/db_write_buffer_manager_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) + +clipping_iterator_test: $(OBJ_DIR)/db/compaction/clipping_iterator_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + #------------------------------------------------- # make install related stuff PREFIX ?= /usr/local diff --git a/TARGETS b/TARGETS index 1f8c57f65..75f53a733 100644 --- a/TARGETS +++ b/TARGETS @@ -1051,6 +1051,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "clipping_iterator_test", + "db/compaction/clipping_iterator_test.cc", + "parallel", + [], + [], + ], [ "coding_test", "util/coding_test.cc", diff --git a/db/compaction/clipping_iterator.h b/db/compaction/clipping_iterator.h new file mode 100644 index 000000000..b287b653e --- 
/dev/null +++ b/db/compaction/clipping_iterator.h @@ -0,0 +1,275 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <cassert> + +#include "table/internal_iterator.h" + +namespace ROCKSDB_NAMESPACE { + +// An internal iterator that wraps another one and ensures that any keys +// returned are strictly within a range [start, end). If the underlying +// iterator has already performed the bounds checking, it relies on that result; +// otherwise, it performs the necessary key comparisons itself. Both bounds +// are optional. +class ClippingIterator : public InternalIterator { + public: + ClippingIterator(InternalIterator* iter, const Slice* start, const Slice* end, + const Comparator* cmp) + : iter_(iter), start_(start), end_(end), cmp_(cmp), valid_(false) { + assert(iter_); + assert(cmp_); + assert(!start_ || !end_ || cmp_->Compare(*start_, *end_) <= 0); + + UpdateAndEnforceBounds(); + } + + bool Valid() const override { return valid_; } + + void SeekToFirst() override { + if (start_) { + iter_->Seek(*start_); + } else { + iter_->SeekToFirst(); + } + + UpdateAndEnforceUpperBound(); + } + + void SeekToLast() override { + if (end_) { + iter_->SeekForPrev(*end_); + + // Upper bound is exclusive, so we need a key which is strictly smaller + if (iter_->Valid() && cmp_->Compare(iter_->key(), *end_) == 0) { + iter_->Prev(); + } + } else { + iter_->SeekToLast(); + } + + UpdateAndEnforceLowerBound(); + } + + void Seek(const Slice& target) override { + if (start_ && cmp_->Compare(target, *start_) < 0) { + iter_->Seek(*start_); + UpdateAndEnforceUpperBound(); + return; + } + + if (end_ && cmp_->Compare(target, *end_) >= 0) { + valid_ = false; + return; + } + + iter_->Seek(target); + UpdateAndEnforceUpperBound(); + } + + void SeekForPrev(const Slice& 
target) override { + if (start_ && cmp_->Compare(target, *start_) < 0) { + valid_ = false; + return; + } + + if (end_ && cmp_->Compare(target, *end_) >= 0) { + iter_->SeekForPrev(*end_); + + // Upper bound is exclusive, so we need a key which is strictly smaller + if (iter_->Valid() && cmp_->Compare(iter_->key(), *end_) == 0) { + iter_->Prev(); + } + + UpdateAndEnforceLowerBound(); + return; + } + + iter_->SeekForPrev(target); + UpdateAndEnforceLowerBound(); + } + + void Next() override { + assert(valid_); + iter_->Next(); + UpdateAndEnforceUpperBound(); + } + + bool NextAndGetResult(IterateResult* result) override { + assert(valid_); + assert(result); + + IterateResult res; + valid_ = iter_->NextAndGetResult(&res); + + if (!valid_) { + return false; + } + + if (end_) { + EnforceUpperBoundImpl(res.bound_check_result); + + if (!valid_) { + return false; + } + } + + res.bound_check_result = IterBoundCheck::kInbound; + *result = res; + + return true; + } + + void Prev() override { + assert(valid_); + iter_->Prev(); + UpdateAndEnforceLowerBound(); + } + + Slice key() const override { + assert(valid_); + return iter_->key(); + } + + Slice user_key() const override { + assert(valid_); + return iter_->user_key(); + } + + Slice value() const override { + assert(valid_); + return iter_->value(); + } + + Status status() const override { return iter_->status(); } + + bool PrepareValue() override { + assert(valid_); + + if (iter_->PrepareValue()) { + return true; + } + + assert(!iter_->Valid()); + valid_ = false; + return false; + } + + bool MayBeOutOfLowerBound() override { + assert(valid_); + return false; + } + + IterBoundCheck UpperBoundCheckResult() override { + assert(valid_); + return IterBoundCheck::kInbound; + } + + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { + iter_->SetPinnedItersMgr(pinned_iters_mgr); + } + + bool IsKeyPinned() const override { + assert(valid_); + return iter_->IsKeyPinned(); + } + + bool IsValuePinned() const 
override { + assert(valid_); + return iter_->IsValuePinned(); + } + + Status GetProperty(std::string prop_name, std::string* prop) override { + return iter_->GetProperty(prop_name, prop); + } + + private: + void UpdateValid() { + assert(!iter_->Valid() || iter_->status().ok()); + + valid_ = iter_->Valid(); + } + + void EnforceUpperBoundImpl(IterBoundCheck bound_check_result) { + if (bound_check_result == IterBoundCheck::kInbound) { + return; + } + + if (bound_check_result == IterBoundCheck::kOutOfBound) { + valid_ = false; + return; + } + + assert(bound_check_result == IterBoundCheck::kUnknown); + + if (cmp_->Compare(key(), *end_) >= 0) { + valid_ = false; + } + } + + void EnforceUpperBound() { + if (!valid_) { + return; + } + + if (!end_) { + return; + } + + EnforceUpperBoundImpl(iter_->UpperBoundCheckResult()); + } + + void EnforceLowerBound() { + if (!valid_) { + return; + } + + if (!start_) { + return; + } + + if (!iter_->MayBeOutOfLowerBound()) { + return; + } + + if (cmp_->Compare(key(), *start_) < 0) { + valid_ = false; + } + } + + void AssertBounds() { + assert(!valid_ || !start_ || cmp_->Compare(key(), *start_) >= 0); + assert(!valid_ || !end_ || cmp_->Compare(key(), *end_) < 0); + } + + void UpdateAndEnforceBounds() { + UpdateValid(); + EnforceUpperBound(); + EnforceLowerBound(); + AssertBounds(); + } + + void UpdateAndEnforceUpperBound() { + UpdateValid(); + EnforceUpperBound(); + AssertBounds(); + } + + void UpdateAndEnforceLowerBound() { + UpdateValid(); + EnforceLowerBound(); + AssertBounds(); + } + + InternalIterator* iter_; + const Slice* start_; + const Slice* end_; + const Comparator* cmp_; + bool valid_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/clipping_iterator_test.cc b/db/compaction/clipping_iterator_test.cc new file mode 100644 index 000000000..3a31b61eb --- /dev/null +++ b/db/compaction/clipping_iterator_test.cc @@ -0,0 +1,256 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/compaction/clipping_iterator.h" + +#include <algorithm> +#include <memory> +#include <string> +#include <vector> + +#include "rocksdb/comparator.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +// A vector iterator which does its own bounds checking. This is for testing the +// optimizations in the clipping iterator where we bypass the bounds checking if +// the input iterator has already performed it. +class BoundsCheckingVectorIterator : public test::VectorIterator { + public: + BoundsCheckingVectorIterator(const std::vector<std::string>& keys, + const std::vector<std::string>& values, + const Slice* start, const Slice* end, + const Comparator* cmp) + : VectorIterator(keys, values), start_(start), end_(end), cmp_(cmp) { + assert(cmp_); + } + + bool NextAndGetResult(IterateResult* result) override { + assert(Valid()); + assert(result); + + Next(); + + if (!Valid()) { + return false; + } + + result->key = key(); + result->bound_check_result = UpperBoundCheckResult(); + result->value_prepared = true; + + return true; + } + + bool MayBeOutOfLowerBound() override { + assert(Valid()); + + if (!start_) { + return false; + } + + return cmp_->Compare(key(), *start_) < 0; + } + + IterBoundCheck UpperBoundCheckResult() override { + assert(Valid()); + + if (!end_) { + return IterBoundCheck::kInbound; + } + + return cmp_->Compare(key(), *end_) >= 0 ? 
IterBoundCheck::kOutOfBound + : IterBoundCheck::kInbound; + } + + private: + const Slice* start_; + const Slice* end_; + const Comparator* cmp_; +}; + +class ClippingIteratorTest + : public ::testing::Test, + public ::testing::WithParamInterface<std::tuple<bool, size_t, size_t>> {}; + +TEST_P(ClippingIteratorTest, Clip) { + const std::vector<std::string> keys{"key0", "key1", "key2", "key3", "key4", + "key5", "key6", "key7", "key8", "key9"}; + const std::vector<std::string> values{ + "unused0", "value1", "value2", "value3", "unused4", + "unused5", "unused6", "unused7", "unused8", "unused9"}; + + assert(keys.size() == values.size()); + + // Note: the input always contains key1, key2, and key3; however, the clipping + // window is based on the test parameters: its left edge is a value in the + // range [0, 4], and its size is a value in the range [0, 5] + const std::vector<std::string> input_keys{keys[1], keys[2], keys[3]}; + const std::vector<std::string> input_values{values[1], values[2], values[3]}; + + const bool use_bounds_checking_vec_it = std::get<0>(GetParam()); + + const size_t clip_start_idx = std::get<1>(GetParam()); + const size_t clip_window_size = std::get<2>(GetParam()); + const size_t clip_end_idx = clip_start_idx + clip_window_size; + + const Slice start(keys[clip_start_idx]); + const Slice end(keys[clip_end_idx]); + + std::unique_ptr<InternalIterator> input( + use_bounds_checking_vec_it + ? new BoundsCheckingVectorIterator(input_keys, input_values, &start, + &end, BytewiseComparator()) + : new test::VectorIterator(input_keys, input_values)); + + ClippingIterator clip(input.get(), &start, &end, BytewiseComparator()); + + // The range the clipping iterator should return values from. 
This is + // essentially the intersection of the input range [1, 4) and the clipping + // window [clip_start_idx, clip_end_idx) + const size_t data_start_idx = + std::max(clip_start_idx, static_cast<size_t>(1)); + const size_t data_end_idx = std::min(clip_end_idx, static_cast<size_t>(4)); + + // Range is empty; all Seeks should fail + if (data_start_idx >= data_end_idx) { + clip.SeekToFirst(); + ASSERT_FALSE(clip.Valid()); + + clip.SeekToLast(); + ASSERT_FALSE(clip.Valid()); + + for (size_t i = 0; i < keys.size(); ++i) { + clip.Seek(keys[i]); + ASSERT_FALSE(clip.Valid()); + + clip.SeekForPrev(keys[i]); + ASSERT_FALSE(clip.Valid()); + } + + return; + } + + // Range is non-empty; call SeekToFirst and iterate forward + clip.SeekToFirst(); + ASSERT_TRUE(clip.Valid()); + ASSERT_EQ(clip.key(), keys[data_start_idx]); + ASSERT_EQ(clip.value(), values[data_start_idx]); + ASSERT_FALSE(clip.MayBeOutOfLowerBound()); + ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound); + + for (size_t i = data_start_idx + 1; i < data_end_idx; ++i) { + clip.Next(); + ASSERT_TRUE(clip.Valid()); + ASSERT_EQ(clip.key(), keys[i]); + ASSERT_EQ(clip.value(), values[i]); + ASSERT_FALSE(clip.MayBeOutOfLowerBound()); + ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound); + } + + clip.Next(); + ASSERT_FALSE(clip.Valid()); + + // Do it again using NextAndGetResult + clip.SeekToFirst(); + ASSERT_TRUE(clip.Valid()); + ASSERT_EQ(clip.key(), keys[data_start_idx]); + ASSERT_EQ(clip.value(), values[data_start_idx]); + ASSERT_FALSE(clip.MayBeOutOfLowerBound()); + ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound); + + for (size_t i = data_start_idx + 1; i < data_end_idx; ++i) { + IterateResult result; + ASSERT_TRUE(clip.NextAndGetResult(&result)); + ASSERT_EQ(result.key, keys[i]); + ASSERT_EQ(result.bound_check_result, IterBoundCheck::kInbound); + ASSERT_TRUE(clip.Valid()); + ASSERT_EQ(clip.key(), keys[i]); + ASSERT_EQ(clip.value(), values[i]); + 
ASSERT_FALSE(clip.MayBeOutOfLowerBound()); + ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound); + } + + IterateResult result; + ASSERT_FALSE(clip.NextAndGetResult(&result)); + ASSERT_FALSE(clip.Valid()); + + // Call SeekToLast and iterate backward + clip.SeekToLast(); + ASSERT_TRUE(clip.Valid()); + ASSERT_EQ(clip.key(), keys[data_end_idx - 1]); + ASSERT_EQ(clip.value(), values[data_end_idx - 1]); + ASSERT_FALSE(clip.MayBeOutOfLowerBound()); + ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound); + + for (size_t i = data_end_idx - 2; i >= data_start_idx; --i) { + clip.Prev(); + ASSERT_TRUE(clip.Valid()); + ASSERT_EQ(clip.key(), keys[i]); + ASSERT_EQ(clip.value(), values[i]); + ASSERT_FALSE(clip.MayBeOutOfLowerBound()); + ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound); + } + + clip.Prev(); + ASSERT_FALSE(clip.Valid()); + + // Call Seek/SeekForPrev for all keys; Seek should return the smallest key + // which is >= the target; SeekForPrev should return the largest key which is + // <= the target + for (size_t i = 0; i < keys.size(); ++i) { + clip.Seek(keys[i]); + + if (i < data_start_idx) { + ASSERT_TRUE(clip.Valid()); + ASSERT_EQ(clip.key(), keys[data_start_idx]); + ASSERT_EQ(clip.value(), values[data_start_idx]); + ASSERT_FALSE(clip.MayBeOutOfLowerBound()); + ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound); + } else if (i < data_end_idx) { + ASSERT_TRUE(clip.Valid()); + ASSERT_EQ(clip.key(), keys[i]); + ASSERT_EQ(clip.value(), values[i]); + ASSERT_FALSE(clip.MayBeOutOfLowerBound()); + ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound); + } else { + ASSERT_FALSE(clip.Valid()); + } + + clip.SeekForPrev(keys[i]); + + if (i < data_start_idx) { + ASSERT_FALSE(clip.Valid()); + } else if (i < data_end_idx) { + ASSERT_TRUE(clip.Valid()); + ASSERT_EQ(clip.key(), keys[i]); + ASSERT_EQ(clip.value(), values[i]); + ASSERT_FALSE(clip.MayBeOutOfLowerBound()); + ASSERT_EQ(clip.UpperBoundCheckResult(), 
IterBoundCheck::kInbound); + } else { + ASSERT_TRUE(clip.Valid()); + ASSERT_EQ(clip.key(), keys[data_end_idx - 1]); + ASSERT_EQ(clip.value(), values[data_end_idx - 1]); + ASSERT_FALSE(clip.MayBeOutOfLowerBound()); + ASSERT_EQ(clip.UpperBoundCheckResult(), IterBoundCheck::kInbound); + } + } +} + +INSTANTIATE_TEST_CASE_P( + ClippingIteratorTest, ClippingIteratorTest, + ::testing::Combine( + ::testing::Bool(), + ::testing::Range(static_cast<size_t>(0), static_cast<size_t>(5)), + ::testing::Range(static_cast<size_t>(0), static_cast<size_t>(6)))); + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 1fb2c63e2..550db7d8a 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -23,6 +23,7 @@ #include "db/blob/blob_file_addition.h" #include "db/blob/blob_file_builder.h" #include "db/builder.h" +#include "db/compaction/clipping_iterator.h" #include "db/db_impl/db_impl.h" #include "db/db_iter.h" #include "db/dbformat.h" @@ -1086,6 +1087,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(), existing_snapshots_); + + const Slice* const start = sub_compact->start; + const Slice* const end = sub_compact->end; + ReadOptions read_options; read_options.verify_checksums = true; read_options.fill_cache = false; @@ -1094,12 +1099,39 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // (a) concurrent compactions, // (b) CompactionFilter::Decision::kRemoveAndSkipUntil. read_options.total_order_seek = true; + read_options.iterate_lower_bound = start; + read_options.iterate_upper_bound = end; // Although the v2 aggregator is what the level iterator(s) know about, // the AddTombstones calls will be propagated down to the v1 aggregator. 
- std::unique_ptr<InternalIterator> input( + std::unique_ptr<InternalIterator> raw_input( versions_->MakeInputIterator(read_options, sub_compact->compaction, &range_del_agg, file_options_for_read_)); + InternalIterator* input = raw_input.get(); + + IterKey start_ikey; + IterKey end_ikey; + Slice start_slice; + Slice end_slice; + + if (start) { + start_ikey.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); + start_slice = start_ikey.GetInternalKey(); + } + if (end) { + end_ikey.SetInternalKey(*end, kMaxSequenceNumber, kValueTypeForSeek); + end_slice = end_ikey.GetInternalKey(); + } + + std::unique_ptr<InternalIterator> clip; + if (start || end) { + clip.reset(new ClippingIterator( + raw_input.get(), start ? &start_slice : nullptr, + end ? &end_slice : nullptr, &cfd->internal_comparator())); + input = clip.get(); + } + + input->SeekToFirst(); AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PROCESS_KV); @@ -1154,21 +1186,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { reinterpret_cast<void*>( const_cast<std::atomic<int>*>(manual_compaction_paused_))); - Slice* start = sub_compact->start; - Slice* end = sub_compact->end; - if (start != nullptr) { - IterKey start_iter; - start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); - input->Seek(start_iter.GetInternalKey()); - } else { - input->SeekToFirst(); - } - Status status; const std::string* const full_history_ts_low = full_history_ts_low_.empty() ? 
nullptr : &full_history_ts_low_; sub_compact->c_iter.reset(new CompactionIterator( - input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), + input, cfd->user_comparator(), &merge, versions_->LastSequence(), &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), /*expect_valid_internal_key=*/true, &range_del_agg, @@ -1199,12 +1221,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { const Slice& key = c_iter->key(); const Slice& value = c_iter->value(); - // If an end key (exclusive) is specified, check if the current key is - // >= than it and exit if it is because the iterator is out of its range - if (end != nullptr && - cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) { - break; - } + assert(!end || + cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0); + if (c_iter_stats.num_input_records % kRecordStatsEvery == kRecordStatsEvery - 1) { RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); @@ -1391,7 +1410,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { #endif // ROCKSDB_ASSERT_STATUS_CHECKED sub_compact->c_iter.reset(); - input.reset(); + clip.reset(); + raw_input.reset(); sub_compact->status = status; } diff --git a/src.mk b/src.mk index 1a99b55a8..e419498bb 100644 --- a/src.mk +++ b/src.mk @@ -388,6 +388,7 @@ TEST_MAIN_SOURCES = \ db/blob/db_blob_index_test.cc \ db/column_family_test.cc \ db/compact_files_test.cc \ + db/compaction/clipping_iterator_test.cc \ db/compaction/compaction_iterator_test.cc \ db/compaction/compaction_job_test.cc \ db/compaction/compaction_job_stats_test.cc \