From a4a4a2dabdb9fc8dbd8567abaa50042de84a44f4 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Mon, 3 Aug 2020 15:21:56 -0700 Subject: [PATCH] dedup ReadOptions in iterator hierarchy (#7210) Summary: Previously, a `ReadOptions` object was stored in every `BlockBasedTableIterator` and every `LevelIterator`. This redundancy consumes extra memory, resulting in the `Arena` making more allocations, and iteration observing worse cache performance. This PR migrates callers of `NewInternalIterator()` and `MakeInputIterator()` to provide a `ReadOptions` object guaranteed to outlive the returned iterator. When the iterator's lifetime will be managed by the user, this lifetime guarantee is achieved by storing the `ReadOptions` value in `ArenaWrappedDBIter`. Then, sub-iterators of `NewInternalIterator()` and `MakeInputIterator()` can hold a reference-to-const `ReadOptions`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7210 Test Plan: - `make check` under ASAN and valgrind - benchmark: on a DB with 2 L0 files and 3 L1+ levels, this PR reduced `Arena` allocation 4792 -> 4160 bytes. Reviewed By: anand1976 Differential Revision: D22861323 Pulled By: ajkr fbshipit-source-id: 54aebb3e89c872eeab0f5793b4b6e42878d093ce --- db/arena_wrapped_db_iter.cc | 4 +-- db/arena_wrapped_db_iter.h | 7 ++-- db/builder.cc | 3 +- db/compaction/compaction_job.cc | 16 +++++++-- db/db_basic_test.cc | 33 ++++++++++++++----- db/db_compaction_filter_test.cc | 9 +++-- db/db_impl/db_impl.cc | 22 +++++++------ db/db_impl/db_impl.h | 16 ++++++--- db/db_impl/db_impl_readonly.cc | 16 ++++----- db/db_impl/db_impl_secondary.cc | 8 ++--- db/db_test_util.cc | 15 +++++---- db/table_cache.h | 1 + db/version_set.cc | 15 +++------ db/version_set.h | 16 ++++++--- .../block_based/block_based_table_iterator.h | 28 ++++++++-------- table/block_based/block_based_table_reader.h | 1 + table/table_reader.h | 6 ++-- table/table_test.cc | 33 ++++++++++++------- utilities/debug.cc | 5 +-- 19 files changed, 155 insertions(+), 99 deletions(-) diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc index f5a95f6df..a1a1be3a2 100644 --- a/db/arena_wrapped_db_iter.cc +++ b/db/arena_wrapped_db_iter.cc @@ -45,6 +45,7 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options, true, max_sequential_skip_in_iteration, read_callback, db_impl, cfd, allow_blob); sv_number_ = version_number; + read_options_ = read_options; allow_refresh_ = allow_refresh; } @@ -98,8 +99,7 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( max_sequential_skip_in_iterations, version_number, read_callback, db_impl, cfd, allow_blob, allow_refresh); if (db_impl != nullptr && cfd != nullptr && allow_refresh) { - iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback, - allow_blob); + iter->StoreRefreshInfo(db_impl, cfd, read_callback, allow_blob); } return iter; diff --git a/db/arena_wrapped_db_iter.h b/db/arena_wrapped_db_iter.h index af6eb68f7..80422f63a 100644 --- a/db/arena_wrapped_db_iter.h +++ b/db/arena_wrapped_db_iter.h @@ -41,6 +41,7 @@ class ArenaWrappedDBIter : public Iterator { virtual ReadRangeDelAggregator* GetRangeDelAggregator() { return db_iter_->GetRangeDelAggregator(); } + const ReadOptions& GetReadOptions() { return read_options_; } // Set the internal iterator wrapped inside the DB Iterator. Usually it is // a merging iterator. @@ -79,10 +80,8 @@ class ArenaWrappedDBIter : public Iterator { // Store some parameters so we can refresh the iterator at a later point // with these same params - void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, - ColumnFamilyData* cfd, ReadCallback* read_callback, - bool allow_blob) { - read_options_ = read_options; + void StoreRefreshInfo(DBImpl* db_impl, ColumnFamilyData* cfd, + ReadCallback* read_callback, bool allow_blob) { db_impl_ = db_impl; cfd_ = cfd; read_callback_ = read_callback; diff --git a/db/builder.cc b/db/builder.cc index 510eb36fb..b61f57a14 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -247,8 +247,9 @@ Status BuildTable( // No matter whether use_direct_io_for_flush_and_compaction is true, // we will regrad this verification as user reads since the goal is // to cache it here for further user reads + ReadOptions read_options; std::unique_ptr it(table_cache->NewIterator( - ReadOptions(), file_options, internal_comparator, *meta, + read_options, file_options, internal_comparator, *meta, nullptr /* range_del_agg */, mutable_cf_options.prefix_extractor.get(), nullptr, (internal_stats == nullptr) ? nullptr diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 933524d1c..3467f6749 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -676,8 +676,9 @@ Status CompactionJob::Run() { // No matter whether use_direct_io_for_flush_and_compaction is true, // we will regard this verification as user reads since the goal is // to cache it here for further user reads + ReadOptions read_options; InternalIterator* iter = cfd->table_cache()->NewIterator( - ReadOptions(), file_options_, cfd->internal_comparator(), + read_options, file_options_, cfd->internal_comparator(), files_output[file_idx]->meta, /*range_del_agg=*/nullptr, prefix_extractor, /*table_reader_ptr=*/nullptr, @@ -877,11 +878,20 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(), existing_snapshots_); + ReadOptions read_options; + read_options.verify_checksums = true; + read_options.fill_cache = false; + // Compaction iterators shouldn't be confined to a single prefix. + // Compactions use Seek() for + // (a) concurrent compactions, + // (b) CompactionFilter::Decision::kRemoveAndSkipUntil. + read_options.total_order_seek = true; // Although the v2 aggregator is what the level iterator(s) know about, // the AddTombstones calls will be propagated down to the v1 aggregator. - std::unique_ptr input(versions_->MakeInputIterator( - sub_compact->compaction, &range_del_agg, file_options_for_read_)); + std::unique_ptr input( + versions_->MakeInputIterator(read_options, sub_compact->compaction, + &range_del_agg, file_options_for_read_)); AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PROCESS_KV); diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 3795a08b7..2bffbc6bb 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -101,19 +101,35 @@ TEST_F(DBBasicTest, ReadOnlyDB) { ASSERT_OK(Put("foo", "v3")); Close(); + auto verify_one_iter = [&](Iterator* iter) { + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + ++count; + } + // Always expect two keys: "foo" and "bar" + ASSERT_EQ(count, 2); + }; + + auto verify_all_iters = [&]() { + Iterator* iter = db_->NewIterator(ReadOptions()); + verify_one_iter(iter); + delete iter; + + std::vector iters; + ASSERT_OK(db_->NewIterators(ReadOptions(), + {dbfull()->DefaultColumnFamily()}, &iters)); + ASSERT_EQ(static_cast(1), iters.size()); + verify_one_iter(iters[0]); + delete iters[0]; + }; + auto options = CurrentOptions(); assert(options.env == env_); ASSERT_OK(ReadOnlyReopen(options)); ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v2", Get("bar")); - Iterator* iter = db_->NewIterator(ReadOptions()); - int count = 0; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - ASSERT_OK(iter->status()); - ++count; - } - ASSERT_EQ(count, 2); - delete iter; + verify_all_iters(); Close(); // Reopen and flush memtable. @@ -124,6 +140,7 @@ TEST_F(DBBasicTest, ReadOnlyDB) { ASSERT_OK(ReadOnlyReopen(options)); ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v2", Get("bar")); + verify_all_iters(); ASSERT_TRUE(db_->SyncWAL().IsNotSupported()); } diff --git a/db/db_compaction_filter_test.cc b/db/db_compaction_filter_test.cc index a708c0b1a..74c624ad1 100644 --- a/db/db_compaction_filter_test.cc +++ b/db/db_compaction_filter_test.cc @@ -336,8 +336,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) { InternalKeyComparator icmp(options.comparator); ReadRangeDelAggregator range_del_agg(&icmp, kMaxSequenceNumber /* upper_bound */); + ReadOptions read_options; ScopedArenaIterator iter(dbfull()->NewInternalIterator( - &arena, &range_del_agg, kMaxSequenceNumber, handles_[1])); + read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1])); iter->SeekToFirst(); ASSERT_OK(iter->status()); while (iter->Valid()) { @@ -426,8 +427,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) { InternalKeyComparator icmp(options.comparator); ReadRangeDelAggregator range_del_agg(&icmp, kMaxSequenceNumber /* upper_bound */); + ReadOptions read_options; ScopedArenaIterator iter(dbfull()->NewInternalIterator( - &arena, &range_del_agg, kMaxSequenceNumber, handles_[1])); + read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1])); iter->SeekToFirst(); ASSERT_OK(iter->status()); while (iter->Valid()) { @@ -644,8 +646,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) { InternalKeyComparator icmp(options.comparator); ReadRangeDelAggregator range_del_agg(&icmp, kMaxSequenceNumber /* snapshots */); + ReadOptions read_options; ScopedArenaIterator iter(dbfull()->NewInternalIterator( - &arena, &range_del_agg, kMaxSequenceNumber)); + read_options, &arena, &range_del_agg, kMaxSequenceNumber)); iter->SeekToFirst(); ASSERT_OK(iter->status()); while (iter->Valid()) { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 36fe87912..b95d85a44 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1364,9 +1364,12 @@ bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) { } } -InternalIterator* DBImpl::NewInternalIterator( - Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence, - ColumnFamilyHandle* column_family, bool allow_unprepared_value) { +InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, + Arena* arena, + RangeDelAggregator* range_del_agg, + SequenceNumber sequence, + ColumnFamilyHandle* column_family, + bool allow_unprepared_value) { ColumnFamilyData* cfd; if (column_family == nullptr) { cfd = default_cf_handle_->cfd(); @@ -1378,9 +1381,8 @@ InternalIterator* DBImpl::NewInternalIterator( mutex_.Lock(); SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); mutex_.Unlock(); - ReadOptions roptions; - return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg, - sequence, allow_unprepared_value); + return NewInternalIterator(read_options, cfd, super_version, arena, + range_del_agg, sequence, allow_unprepared_value); } void DBImpl::SchedulePurge() { @@ -2829,10 +2831,10 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, sv->version_number, read_callback, this, cfd, allow_blob, read_options.snapshot != nullptr ? false : allow_refresh); - InternalIterator* internal_iter = - NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), - db_iter->GetRangeDelAggregator(), snapshot, - /* allow_unprepared_value */ true); + InternalIterator* internal_iter = NewInternalIterator( + db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), + db_iter->GetRangeDelAggregator(), snapshot, + /* allow_unprepared_value */ true); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index a6b42f7b4..13dc575cc 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -594,8 +594,10 @@ class DBImpl : public DB { // the value and so will require PrepareValue() to be called before value(); // allow_unprepared_value = false is convenient when this optimization is not // useful, e.g. when reading the whole column family. + // @param read_options Must outlive the returned iterator. InternalIterator* NewInternalIterator( - Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence, + const ReadOptions& read_options, Arena* arena, + RangeDelAggregator* range_del_agg, SequenceNumber sequence, ColumnFamilyHandle* column_family = nullptr, bool allow_unprepared_value = false); @@ -721,10 +723,14 @@ class DBImpl : public DB { const WriteController& write_controller() { return write_controller_; } - InternalIterator* NewInternalIterator( - const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version, - Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence, - bool allow_unprepared_value); + // @param read_options Must outlive the returned iterator. + InternalIterator* NewInternalIterator(const ReadOptions& read_options, + ColumnFamilyData* cfd, + SuperVersion* super_version, + Arena* arena, + RangeDelAggregator* range_del_agg, + SequenceNumber sequence, + bool allow_unprepared_value); // hollow transactions shell used for recovery. // these will then be passed to TransactionDB so that diff --git a/db/db_impl/db_impl_readonly.cc b/db/db_impl/db_impl_readonly.cc index 5ddde3891..99c6c9e6d 100644 --- a/db/db_impl/db_impl_readonly.cc +++ b/db/db_impl/db_impl_readonly.cc @@ -86,10 +86,10 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options, read_seq, super_version->mutable_cf_options.max_sequential_skip_in_iterations, super_version->version_number, read_callback); - auto internal_iter = - NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), - db_iter->GetRangeDelAggregator(), read_seq, - /* allow_unprepared_value */ true); + auto internal_iter = NewInternalIterator( + db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(), + db_iter->GetRangeDelAggregator(), read_seq, + /* allow_unprepared_value */ true); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; } @@ -118,10 +118,10 @@ Status DBImplReadOnly::NewIterators( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, sv->version_number, read_callback); - auto* internal_iter = - NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), - db_iter->GetRangeDelAggregator(), read_seq, - /* allow_unprepared_value */ true); + auto* internal_iter = NewInternalIterator( + db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), + db_iter->GetRangeDelAggregator(), read_seq, + /* allow_unprepared_value */ true); db_iter->SetIterUnderDBIter(internal_iter); iterators->push_back(db_iter); } diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 31dcf75bc..d66732c08 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -415,10 +415,10 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( snapshot, super_version->mutable_cf_options.max_sequential_skip_in_iterations, super_version->version_number, read_callback); - auto internal_iter = - NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), - db_iter->GetRangeDelAggregator(), snapshot, - /* allow_unprepared_value */ true); + auto internal_iter = NewInternalIterator( + db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(), + db_iter->GetRangeDelAggregator(), snapshot, + /* allow_unprepared_value */ true); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; } diff --git a/db/db_test_util.cc b/db/db_test_util.cc index b40eac672..902361caa 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -907,12 +907,13 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) { InternalKeyComparator icmp(options.comparator); ReadRangeDelAggregator range_del_agg(&icmp, kMaxSequenceNumber /* upper_bound */); + ReadOptions read_options; ScopedArenaIterator iter; if (cf == 0) { - iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg, + iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg, kMaxSequenceNumber)); } else { - iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg, + iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[cf])); } InternalKey target(user_key, kMaxSequenceNumber, kTypeValue); @@ -1322,12 +1323,13 @@ void DBTestBase::validateNumberOfEntries(int numValues, int cf) { kMaxSequenceNumber /* upper_bound */); // This should be defined after range_del_agg so that it destructs the // assigned iterator before it range_del_agg is already destructed. + ReadOptions read_options; ScopedArenaIterator iter; if (cf != 0) { - iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg, + iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[cf])); } else { - iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg, + iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg, kMaxSequenceNumber)); } iter->SeekToFirst(); @@ -1530,8 +1532,9 @@ void DBTestBase::VerifyDBInternal( InternalKeyComparator icmp(last_options_.comparator); ReadRangeDelAggregator range_del_agg(&icmp, kMaxSequenceNumber /* upper_bound */); - auto iter = - dbfull()->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber); + ReadOptions read_options; + auto iter = dbfull()->NewInternalIterator(read_options, &arena, + &range_del_agg, kMaxSequenceNumber); iter->SeekToFirst(); for (auto p : true_data) { ASSERT_TRUE(iter->Valid()); diff --git a/db/table_cache.h b/db/table_cache.h index 5c5ce1e78..2fdb251ac 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -60,6 +60,7 @@ class TableCache { // the returned iterator. The returned "*table_reader_ptr" object is owned // by the cache and should not be deleted, and is valid for as long as the // returned iterator is live. + // @param options Must outlive the returned iterator. // @param range_del_agg If non-nullptr, adds range deletions to the // aggregator. If an error occurs, returns it in a NewErrorInternalIterator // @param for_compaction If true, a new TableReader may be allocated (but diff --git a/db/version_set.cc b/db/version_set.cc index 67d1be3ad..5bbed8d18 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -876,6 +876,7 @@ namespace { class LevelIterator final : public InternalIterator { public: + // @param read_options Must outlive this iterator. LevelIterator(TableCache* table_cache, const ReadOptions& read_options, const FileOptions& file_options, const InternalKeyComparator& icomparator, @@ -1020,7 +1021,7 @@ class LevelIterator final : public InternalIterator { } TableCache* table_cache_; - const ReadOptions read_options_; + const ReadOptions& read_options_; const FileOptions& file_options_; const InternalKeyComparator& icomparator_; const UserComparatorWrapper user_comparator_; @@ -5670,18 +5671,10 @@ void VersionSet::AddLiveFiles(std::vector* live_table_files, } InternalIterator* VersionSet::MakeInputIterator( - const Compaction* c, RangeDelAggregator* range_del_agg, + const ReadOptions& read_options, const Compaction* c, + RangeDelAggregator* range_del_agg, const FileOptions& file_options_compactions) { auto cfd = c->column_family_data(); - ReadOptions read_options; - read_options.verify_checksums = true; - read_options.fill_cache = false; - // Compaction iterators shouldn't be confined to a single prefix. - // Compactions use Seek() for - // (a) concurrent compactions, - // (b) CompactionFilter::Decision::kRemoveAndSkipUntil. - read_options.total_order_seek = true; - // Level-0 files have to be merged together. For other levels, // we will make a concatenating iterator per level. // TODO(opt): use concatenating iterator for level-0 if there is no overlap diff --git a/db/version_set.h b/db/version_set.h index 63f91e441..b36e88620 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -627,13 +627,19 @@ class Version { public: // Append to *iters a sequence of iterators that will // yield the contents of this Version when merged together. - // REQUIRES: This version has been saved (see VersionSet::SaveTo) - void AddIterators(const ReadOptions&, const FileOptions& soptions, + // @param read_options Must outlive any iterator built by + // `merger_iter_builder`. + // REQUIRES: This version has been saved (see VersionSet::SaveTo). + void AddIterators(const ReadOptions& read_options, + const FileOptions& soptions, MergeIteratorBuilder* merger_iter_builder, RangeDelAggregator* range_del_agg, bool allow_unprepared_value); - void AddIteratorsForLevel(const ReadOptions&, const FileOptions& soptions, + // @param read_options Must outlive any iterator built by + // `merger_iter_builder`. + void AddIteratorsForLevel(const ReadOptions& read_options, + const FileOptions& soptions, MergeIteratorBuilder* merger_iter_builder, int level, RangeDelAggregator* range_del_agg, bool allow_unprepared_value); @@ -1120,8 +1126,10 @@ class VersionSet { // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. + // @param read_options Must outlive the returned iterator. InternalIterator* MakeInputIterator( - const Compaction* c, RangeDelAggregator* range_del_agg, + const ReadOptions& read_options, const Compaction* c, + RangeDelAggregator* range_del_agg, const FileOptions& file_options_compactions); // Add all files listed in any live version to *live_table_files and diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index a5fe22875..026ac9ee7 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -18,6 +18,7 @@ namespace ROCKSDB_NAMESPACE { class BlockBasedTableIterator : public InternalIteratorBase { // compaction_readahead_size: its value will only be used if for_compaction = // true + // @param read_options Must outlive this iterator. public: BlockBasedTableIterator( const BlockBasedTable* table, const ReadOptions& read_options, @@ -25,21 +26,20 @@ class BlockBasedTableIterator : public InternalIteratorBase { std::unique_ptr>&& index_iter, bool check_filter, bool need_upper_bound_check, const SliceTransform* prefix_extractor, TableReaderCaller caller, - size_t compaction_readahead_size = 0, - bool allow_unprepared_value = false) + size_t compaction_readahead_size = 0, bool allow_unprepared_value = false) : table_(table), read_options_(read_options), icomp_(icomp), user_comparator_(icomp.user_comparator()), - allow_unprepared_value_(allow_unprepared_value), index_iter_(std::move(index_iter)), pinned_iters_mgr_(nullptr), - block_iter_points_to_real_block_(false), - check_filter_(check_filter), - need_upper_bound_check_(need_upper_bound_check), prefix_extractor_(prefix_extractor), lookup_context_(caller), - block_prefetcher_(compaction_readahead_size) {} + block_prefetcher_(compaction_readahead_size), + allow_unprepared_value_(allow_unprepared_value), + block_iter_points_to_real_block_(false), + check_filter_(check_filter), + need_upper_bound_check_(need_upper_bound_check) {} ~BlockBasedTableIterator() {} @@ -151,14 +151,19 @@ class BlockBasedTableIterator : public InternalIteratorBase { }; const BlockBasedTable* table_; - const ReadOptions read_options_; + const ReadOptions& read_options_; const InternalKeyComparator& icomp_; UserComparatorWrapper user_comparator_; - const bool allow_unprepared_value_; std::unique_ptr> index_iter_; PinnedIteratorsManager* pinned_iters_mgr_; DataBlockIter block_iter_; + const SliceTransform* prefix_extractor_; + uint64_t prev_block_offset_ = std::numeric_limits::max(); + BlockCacheLookupContext lookup_context_; + + BlockPrefetcher block_prefetcher_; + const bool allow_unprepared_value_; // True if block_iter_ is initialized and points to the same block // as index iterator. bool block_iter_points_to_real_block_; @@ -172,11 +177,6 @@ class BlockBasedTableIterator : public InternalIteratorBase { bool check_filter_; // TODO(Zhongyi): pick a better name bool need_upper_bound_check_; - const SliceTransform* prefix_extractor_; - uint64_t prev_block_offset_ = std::numeric_limits::max(); - BlockCacheLookupContext lookup_context_; - - BlockPrefetcher block_prefetcher_; // If `target` is null, seek to first. void SeekImpl(const Slice* target); diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 889a4a29f..bf47dd34d 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -113,6 +113,7 @@ class BlockBasedTable : public TableReader { // Returns a new iterator over the table contents. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). + // @param read_options Must outlive the returned iterator. // @param skip_filters Disables loading/accessing the filter block // compaction_readahead_size: its value will only be used if caller = // kCompaction. diff --git a/table/table_reader.h b/table/table_reader.h index 2459ba078..3631705c4 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -39,6 +39,8 @@ class TableReader { // Returns a new iterator over the table contents. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). + // + // read_options: Must outlive the returned iterator. // arena: If not null, the arena needs to be used to allocate the Iterator. // When destroying the iterator, the caller will not call "delete" // but Iterator::~Iterator() directly. The destructor needs to destroy @@ -48,8 +50,8 @@ class TableReader { // compaction_readahead_size: its value will only be used if caller = // kCompaction virtual InternalIterator* NewIterator( - const ReadOptions&, const SliceTransform* prefix_extractor, Arena* arena, - bool skip_filters, TableReaderCaller caller, + const ReadOptions& read_options, const SliceTransform* prefix_extractor, + Arena* arena, bool skip_filters, TableReaderCaller caller, size_t compaction_readahead_size = 0, bool allow_unprepared_value = false) = 0; diff --git a/table/table_test.cc b/table/table_test.cc index 3554f3463..760154822 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -394,10 +394,9 @@ class TableConstructor : public Constructor { InternalIterator* NewIterator( const SliceTransform* prefix_extractor) const override { - ReadOptions ro; InternalIterator* iter = table_reader_->NewIterator( - ro, prefix_extractor, /*arena=*/nullptr, /*skip_filters=*/false, - TableReaderCaller::kUncategorized); + read_options_, prefix_extractor, /*arena=*/nullptr, + /*skip_filters=*/false, TableReaderCaller::kUncategorized); if (convert_to_internal_key_) { return new KeyConvertingIterator(iter); } else { @@ -452,6 +451,7 @@ class TableConstructor : public Constructor { file_reader_.reset(); } + const ReadOptions read_options_; uint64_t uniq_id_; std::unique_ptr file_writer_; std::unique_ptr file_reader_; @@ -1883,8 +1883,9 @@ TEST_P(BlockBasedTableTest, SkipPrefixBloomFilter) { const MutableCFOptions new_moptions(options); c.Reopen(new_ioptions, new_moptions); auto reader = c.GetTableReader(); + ReadOptions read_options; std::unique_ptr db_iter(reader->NewIterator( - ReadOptions(), new_moptions.prefix_extractor.get(), /*arena=*/nullptr, + read_options, new_moptions.prefix_extractor.get(), /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kUncategorized)); // Test point lookup @@ -1945,8 +1946,9 @@ void TableTest::IndexTest(BlockBasedTableOptions table_options) { ASSERT_EQ(5u, props->num_data_blocks); // TODO(Zhongyi): update test to use MutableCFOptions + ReadOptions read_options; std::unique_ptr index_iter(reader->NewIterator( - ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr, + read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kUncategorized)); // -- Find keys do not exist, but have common prefix. @@ -2254,8 +2256,9 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) { auto reader = c.GetTableReader(); auto props = reader->GetTableProperties(); ASSERT_EQ(4u, props->num_data_blocks); + ReadOptions read_options; std::unique_ptr iter(reader->NewIterator( - ReadOptions(), /*prefix_extractor=*/nullptr, /*arena=*/nullptr, + read_options, /*prefix_extractor=*/nullptr, /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kUncategorized, /*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true)); @@ -2438,8 +2441,9 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKeyGlobalSeqno) { auto reader = c.GetTableReader(); auto props = reader->GetTableProperties(); ASSERT_EQ(1u, props->num_data_blocks); + ReadOptions read_options; std::unique_ptr iter(reader->NewIterator( - ReadOptions(), /*prefix_extractor=*/nullptr, /*arena=*/nullptr, + read_options, /*prefix_extractor=*/nullptr, /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kUncategorized, /*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true)); @@ -2668,8 +2672,9 @@ TEST_P(BlockBasedTableTest, TracingIterator) { GetPlainInternalComparator(options.comparator), &keys, &kvmap); for (uint32_t i = 1; i <= 2; i++) { + ReadOptions read_options; std::unique_ptr iter(c.GetTableReader()->NewIterator( - ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr, + read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kUserIterator)); iter->SeekToFirst(); while (iter->Valid()) { @@ -3883,8 +3888,9 @@ TEST_P(IndexBlockRestartIntervalTest, IndexBlockRestartInterval) { &kvmap); auto reader = c.GetTableReader(); + ReadOptions read_options; std::unique_ptr db_iter(reader->NewIterator( - ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr, + read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kUncategorized)); // Test point lookup @@ -4070,6 +4076,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) { // Helper function to get the contents of the table InternalIterator std::unique_ptr table_reader; + const ReadOptions read_options; std::function GetTableInternalIter = [&]() { std::unique_ptr file_reader( test::GetRandomAccessFileReader( @@ -4081,7 +4088,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) { std::move(file_reader), ss_rw.contents().size(), &table_reader); return table_reader->NewIterator( - ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr, + read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kUncategorized); }; @@ -4252,8 +4259,9 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) { GetPlainInternalComparator(options2.comparator)), std::move(file_reader), ss_rw.contents().size(), &table_reader)); + ReadOptions read_options; std::unique_ptr db_iter(table_reader->NewIterator( - ReadOptions(), moptions2.prefix_extractor.get(), /*arena=*/nullptr, + read_options, moptions2.prefix_extractor.get(), /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kUncategorized)); int expected_key = 1; @@ -4548,8 +4556,9 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) { auto reader = c.GetTableReader(); std::unique_ptr seek_iter; + ReadOptions read_options; seek_iter.reset(reader->NewIterator( - ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr, + read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kUncategorized)); for (int i = 0; i < 2; ++i) { ReadOptions ro; diff --git a/utilities/debug.cc b/utilities/debug.cc index b51f9da0d..12fab0f08 100644 --- a/utilities/debug.cc +++ b/utilities/debug.cc @@ -37,11 +37,12 @@ Status GetAllKeyVersions(DB* db, ColumnFamilyHandle* cfh, Slice begin_key, DBImpl* idb = static_cast(db->GetRootDB()); auto icmp = InternalKeyComparator(idb->GetOptions(cfh).comparator); + ReadOptions read_options; ReadRangeDelAggregator range_del_agg(&icmp, kMaxSequenceNumber /* upper_bound */); Arena arena; - ScopedArenaIterator iter(idb->NewInternalIterator(&arena, &range_del_agg, - kMaxSequenceNumber, cfh)); + ScopedArenaIterator iter(idb->NewInternalIterator( + read_options, &arena, &range_del_agg, kMaxSequenceNumber, cfh)); if (!begin_key.empty()) { InternalKey ikey;