From 7a959388995f19607310ea6fd8efc5cace65c20a Mon Sep 17 00:00:00 2001 From: Changyu Bi Date: Tue, 25 Oct 2022 11:33:04 -0700 Subject: [PATCH] Improve FragmentTombstones() speed by lazily initializing `seq_set_` (#10848) Summary: FragmentedRangeTombstoneList has a member variable `seq_set_` that contains the sequence numbers of all range tombstones in a set. The set is constructed in `FragmentTombstones()` and is used only in `FragmentedRangeTombstoneList::ContainsRange()` which only happens during compaction. This PR moves the initialization of `seq_set_` to `FragmentedRangeTombstoneList::ContainsRange()`. This should speed up `FragmentTombstones()` when the range tombstone list is used for read/scan requests. Microbench shows the speed improvement to be ~45%. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10848 Test Plan: - Existing tests and stress test: `python3 tools/db_crashtest.py whitebox --simple --verify_iterator_with_expected_state_one_in=5`. - Microbench: update `range_del_aggregator_bench` to benchmark speed of `FragmentTombstones()`: ``` ./range_del_aggregator_bench --num_range_tombstones=1000 --tombstone_start_upper_bound=50000000 --num_runs=10000 --tombstone_width_mean=200 --should_deletes_per_run=100 --use_compaction_range_del_aggregator=true Before this PR: ========================= Fragment Tombstones: 270.286 us AddTombstones: 1.28933 us ShouldDelete (first): 0.525528 us ShouldDelete (rest): 0.0797519 us After this PR: time to fragment tombstones is pushed to AddTombstones() which only happen during compaction. ========================= Fragment Tombstones: 149.879 us AddTombstones: 102.131 us ShouldDelete (first): 0.565871 us ShouldDelete (rest): 0.0729444 us ``` - db_bench: this should improve speed for fragmenting range tombstones for mutable memtable: ``` ./db_bench --benchmarks=readwhilewriting --writes_per_range_tombstone=100 --max_write_buffer_number=100 --min_write_buffer_number_to_merge=100 --writes=500000 --reads=250000 --disable_auto_compactions --max_num_range_tombstones=100000 --finish_after_writes --write_buffer_size=1073741824 --threads=25 Before this PR: readwhilewriting : 18.301 micros/op 1310445 ops/sec 4.769 seconds 6250000 operations; 28.1 MB/s (41001 of 250000 found) After this PR: readwhilewriting : 16.943 micros/op 1439376 ops/sec 4.342 seconds 6250000 operations; 23.8 MB/s (28977 of 250000 found) ``` Reviewed By: ajkr Differential Revision: D40646227 Pulled By: cbi42 fbshipit-source-id: ea471667edb258f67d01cfd828588e80a89e4083 --- db/memtable.cc | 7 ++-- db/range_del_aggregator_bench.cc | 38 +++++++++++++++----- db/range_tombstone_fragmenter.cc | 17 +++++---- db/range_tombstone_fragmenter.h | 20 ++++++----- table/block_based/block_based_table_reader.h | 2 +- 5 files changed, 55 insertions(+), 29 deletions(-) diff --git a/db/memtable.cc b/db/memtable.cc index a1c061932..2dac88dd3 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -587,10 +587,9 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal( auto* unfragmented_iter = new MemTableIterator(*this, read_options, nullptr /* arena */, true /* use_range_del_table */); - cache->tombstones = std::make_unique( - FragmentedRangeTombstoneList( - std::unique_ptr(unfragmented_iter), - comparator_.comparator)); + cache->tombstones.reset(new FragmentedRangeTombstoneList( + std::unique_ptr(unfragmented_iter), + comparator_.comparator)); cache->initialized.store(true, std::memory_order_release); } cache->reader_mutex.unlock(); diff --git a/db/range_del_aggregator_bench.cc b/db/range_del_aggregator_bench.cc index 651999bd8..9dca707e5 100644 --- a/db/range_del_aggregator_bench.cc +++ b/db/range_del_aggregator_bench.cc @@ -54,12 +54,17 @@ DEFINE_int32(should_deletes_per_run, 1, "number of ShouldDelete calls per run"); DEFINE_int32(add_tombstones_per_run, 1, "number of AddTombstones calls per run"); +DEFINE_bool(use_compaction_range_del_aggregator, false, + "Whether to use CompactionRangeDelAggregator. Default is to use " + "ReadRangeDelAggregator."); + namespace { struct Stats { uint64_t time_add_tombstones = 0; uint64_t time_first_should_delete = 0; uint64_t time_rest_should_delete = 0; + uint64_t time_fragment_tombstones = 0; }; std::ostream& operator<<(std::ostream& os, const Stats& s) { @@ -67,6 +72,10 @@ std::ostream& operator<<(std::ostream& os, const Stats& s) { fmt_holder.copyfmt(os); os << std::left; + os << std::setw(25) << "Fragment Tombstones: " + << s.time_fragment_tombstones / + (FLAGS_add_tombstones_per_run * FLAGS_num_runs * 1.0e3) + << " us\n"; os << std::setw(25) << "AddTombstones: " << s.time_add_tombstones / (FLAGS_add_tombstones_per_run * FLAGS_num_runs * 1.0e3) @@ -186,10 +195,17 @@ int main(int argc, char** argv) { FLAGS_num_range_tombstones); } auto mode = ROCKSDB_NAMESPACE::RangeDelPositioningMode::kForwardTraversal; - + std::vector snapshots{0}; for (int i = 0; i < FLAGS_num_runs; i++) { - ROCKSDB_NAMESPACE::ReadRangeDelAggregator range_del_agg( - &icmp, ROCKSDB_NAMESPACE::kMaxSequenceNumber /* upper_bound */); + std::unique_ptr range_del_agg = + nullptr; + if (FLAGS_use_compaction_range_del_aggregator) { + range_del_agg.reset(new ROCKSDB_NAMESPACE::CompactionRangeDelAggregator( + &icmp, snapshots)); + } else { + range_del_agg.reset(new ROCKSDB_NAMESPACE::ReadRangeDelAggregator( + &icmp, ROCKSDB_NAMESPACE::kMaxSequenceNumber /* upper_bound */)); + } std::vector< std::unique_ptr > @@ -207,12 +223,16 @@ int main(int argc, char** argv) { ROCKSDB_NAMESPACE::PersistentRangeTombstone( ROCKSDB_NAMESPACE::Key(start), ROCKSDB_NAMESPACE::Key(end), j); } - + auto iter = + ROCKSDB_NAMESPACE::MakeRangeDelIterator(persistent_range_tombstones); + ROCKSDB_NAMESPACE::StopWatchNano stop_watch_fragment_tombstones( + clock, true /* auto_start */); fragmented_range_tombstone_lists.emplace_back( new ROCKSDB_NAMESPACE::FragmentedRangeTombstoneList( - ROCKSDB_NAMESPACE::MakeRangeDelIterator( - persistent_range_tombstones), - icmp)); + std::move(iter), icmp, FLAGS_use_compaction_range_del_aggregator, + snapshots)); + stats.time_fragment_tombstones += + stop_watch_fragment_tombstones.ElapsedNanos(); std::unique_ptr fragmented_range_del_iter( new ROCKSDB_NAMESPACE::FragmentedRangeTombstoneIterator( @@ -221,7 +241,7 @@ int main(int argc, char** argv) { ROCKSDB_NAMESPACE::StopWatchNano stop_watch_add_tombstones( clock, true /* auto_start */); - range_del_agg.AddTombstones(std::move(fragmented_range_del_iter)); + range_del_agg->AddTombstones(std::move(fragmented_range_del_iter)); stats.time_add_tombstones += stop_watch_add_tombstones.ElapsedNanos(); } @@ -238,7 +258,7 @@ int main(int argc, char** argv) { ROCKSDB_NAMESPACE::StopWatchNano stop_watch_should_delete( clock, true /* auto_start */); - range_del_agg.ShouldDelete(parsed_key, mode); + range_del_agg->ShouldDelete(parsed_key, mode); uint64_t call_time = stop_watch_should_delete.ElapsedNanos(); if (j == 0) { diff --git a/db/range_tombstone_fragmenter.cc b/db/range_tombstone_fragmenter.cc index 925b4ed33..7e7cedeca 100644 --- a/db/range_tombstone_fragmenter.cc +++ b/db/range_tombstone_fragmenter.cc @@ -156,7 +156,6 @@ void FragmentedRangeTombstoneList::FragmentTombstones( if (seq <= next_snapshot) { // This seqnum is visible by a lower snapshot. tombstone_seqs_.push_back(seq); - seq_set_.insert(seq); auto upper_bound_it = std::lower_bound(snapshots.begin(), snapshots.end(), seq); if (upper_bound_it == snapshots.begin()) { @@ -173,7 +172,6 @@ void FragmentedRangeTombstoneList::FragmentTombstones( // The fragmentation is being done for reads, so preserve all seqnums. tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(), seqnums_to_flush.end()); - seq_set_.insert(seqnums_to_flush.begin(), seqnums_to_flush.end()); if (ts_sz) { tombstone_timestamps_.insert(tombstone_timestamps_.end(), timestamps_to_flush.begin(), @@ -258,15 +256,20 @@ void FragmentedRangeTombstoneList::FragmentTombstones( } bool FragmentedRangeTombstoneList::ContainsRange(SequenceNumber lower, - SequenceNumber upper) const { + SequenceNumber upper) { + std::call_once(seq_set_init_once_flag_, [this]() { + for (auto s : tombstone_seqs_) { + seq_set_.insert(s); + } + }); auto seq_it = seq_set_.lower_bound(lower); return seq_it != seq_set_.end() && *seq_it <= upper; } FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( - const FragmentedRangeTombstoneList* tombstones, - const InternalKeyComparator& icmp, SequenceNumber _upper_bound, - const Slice* ts_upper_bound, SequenceNumber _lower_bound) + FragmentedRangeTombstoneList* tombstones, const InternalKeyComparator& icmp, + SequenceNumber _upper_bound, const Slice* ts_upper_bound, + SequenceNumber _lower_bound) : tombstone_start_cmp_(icmp.user_comparator()), tombstone_end_cmp_(icmp.user_comparator()), icmp_(&icmp), @@ -280,7 +283,7 @@ FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( } FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( - const std::shared_ptr& tombstones, + const std::shared_ptr& tombstones, const InternalKeyComparator& icmp, SequenceNumber _upper_bound, const Slice* ts_upper_bound, SequenceNumber _lower_bound) : tombstone_start_cmp_(icmp.user_comparator()), diff --git a/db/range_tombstone_fragmenter.h b/db/range_tombstone_fragmenter.h index f4b0eab42..df07fa894 100644 --- a/db/range_tombstone_fragmenter.h +++ b/db/range_tombstone_fragmenter.h @@ -84,7 +84,9 @@ struct FragmentedRangeTombstoneList { // Returns true if the stored tombstones contain with one with a sequence // number in [lower, upper]. - bool ContainsRange(SequenceNumber lower, SequenceNumber upper) const; + // This method is not const as it internally lazy initialize a set of + // sequence numbers (`seq_set_`). + bool ContainsRange(SequenceNumber lower, SequenceNumber upper); uint64_t num_unfragmented_tombstones() const { return num_unfragmented_tombstones_; @@ -113,6 +115,7 @@ struct FragmentedRangeTombstoneList { std::vector tombstones_; std::vector tombstone_seqs_; std::vector tombstone_timestamps_; + std::once_flag seq_set_init_once_flag_; std::set seq_set_; std::list pinned_slices_; PinnedIteratorsManager pinned_iters_mgr_; @@ -131,12 +134,13 @@ struct FragmentedRangeTombstoneList { // tombstone collapsing is always O(n log n). class FragmentedRangeTombstoneIterator : public InternalIterator { public: + FragmentedRangeTombstoneIterator(FragmentedRangeTombstoneList* tombstones, + const InternalKeyComparator& icmp, + SequenceNumber upper_bound, + const Slice* ts_upper_bound = nullptr, + SequenceNumber lower_bound = 0); FragmentedRangeTombstoneIterator( - const FragmentedRangeTombstoneList* tombstones, - const InternalKeyComparator& icmp, SequenceNumber upper_bound, - const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0); - FragmentedRangeTombstoneIterator( - const std::shared_ptr& tombstones, + const std::shared_ptr& tombstones, const InternalKeyComparator& icmp, SequenceNumber upper_bound, const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0); FragmentedRangeTombstoneIterator( @@ -311,9 +315,9 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { const RangeTombstoneStackEndComparator tombstone_end_cmp_; const InternalKeyComparator* icmp_; const Comparator* ucmp_; - std::shared_ptr tombstones_ref_; + std::shared_ptr tombstones_ref_; std::shared_ptr tombstones_cache_ref_; - const FragmentedRangeTombstoneList* tombstones_; + FragmentedRangeTombstoneList* tombstones_; SequenceNumber upper_bound_; SequenceNumber lower_bound_; // Only consider timestamps <= ts_upper_bound_. diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 7054a2dd4..89de891c9 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -597,7 +597,7 @@ struct BlockBasedTable::Rep { bool prefix_filtering; std::shared_ptr table_prefix_extractor; - std::shared_ptr fragmented_range_dels; + std::shared_ptr fragmented_range_dels; // If global_seqno is used, all Keys in this file will have the same // seqno with value `global_seqno`.