dedup ReadOptions in iterator hierarchy (#7210)

Summary:
Previously, a `ReadOptions` object was stored in every `BlockBasedTableIterator`
and every `LevelIterator`. This redundancy consumes extra memory,
resulting in the `Arena` making more allocations, and iteration
observing worse cache performance.

This PR migrates callers of `NewInternalIterator()` and
`MakeInputIterator()` to provide a `ReadOptions` object guaranteed to
outlive the returned iterator. When the iterator's lifetime will be managed by the
user, this lifetime guarantee is achieved by storing the `ReadOptions`
value in `ArenaWrappedDBIter`. Then, sub-iterators of `NewInternalIterator()` and
`MakeInputIterator()` can hold a reference-to-const `ReadOptions`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7210

Test Plan:
- `make check` under ASAN and valgrind
- benchmark: on a DB with 2 L0 files and 3 L1+ levels, this PR reduced `Arena` allocation 4792 -> 4160 bytes.

Reviewed By: anand1976

Differential Revision: D22861323

Pulled By: ajkr

fbshipit-source-id: 54aebb3e89c872eeab0f5793b4b6e42878d093ce
main
Andrew Kryczka 4 years ago committed by Facebook GitHub Bot
parent 18efd760c5
commit a4a4a2dabd
  1. 4
      db/arena_wrapped_db_iter.cc
  2. 7
      db/arena_wrapped_db_iter.h
  3. 3
      db/builder.cc
  4. 16
      db/compaction/compaction_job.cc
  5. 33
      db/db_basic_test.cc
  6. 9
      db/db_compaction_filter_test.cc
  7. 22
      db/db_impl/db_impl.cc
  8. 16
      db/db_impl/db_impl.h
  9. 16
      db/db_impl/db_impl_readonly.cc
  10. 8
      db/db_impl/db_impl_secondary.cc
  11. 15
      db/db_test_util.cc
  12. 1
      db/table_cache.h
  13. 15
      db/version_set.cc
  14. 16
      db/version_set.h
  15. 28
      table/block_based/block_based_table_iterator.h
  16. 1
      table/block_based/block_based_table_reader.h
  17. 6
      table/table_reader.h
  18. 33
      table/table_test.cc
  19. 5
      utilities/debug.cc

@ -45,6 +45,7 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
true, max_sequential_skip_in_iteration, true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, allow_blob); read_callback, db_impl, cfd, allow_blob);
sv_number_ = version_number; sv_number_ = version_number;
read_options_ = read_options;
allow_refresh_ = allow_refresh; allow_refresh_ = allow_refresh;
} }
@ -98,8 +99,7 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
max_sequential_skip_in_iterations, version_number, read_callback, max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, allow_blob, allow_refresh); db_impl, cfd, allow_blob, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) { if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback, iter->StoreRefreshInfo(db_impl, cfd, read_callback, allow_blob);
allow_blob);
} }
return iter; return iter;

@ -41,6 +41,7 @@ class ArenaWrappedDBIter : public Iterator {
virtual ReadRangeDelAggregator* GetRangeDelAggregator() { virtual ReadRangeDelAggregator* GetRangeDelAggregator() {
return db_iter_->GetRangeDelAggregator(); return db_iter_->GetRangeDelAggregator();
} }
const ReadOptions& GetReadOptions() { return read_options_; }
// Set the internal iterator wrapped inside the DB Iterator. Usually it is // Set the internal iterator wrapped inside the DB Iterator. Usually it is
// a merging iterator. // a merging iterator.
@ -79,10 +80,8 @@ class ArenaWrappedDBIter : public Iterator {
// Store some parameters so we can refresh the iterator at a later point // Store some parameters so we can refresh the iterator at a later point
// with these same params // with these same params
void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, void StoreRefreshInfo(DBImpl* db_impl, ColumnFamilyData* cfd,
ColumnFamilyData* cfd, ReadCallback* read_callback, ReadCallback* read_callback, bool allow_blob) {
bool allow_blob) {
read_options_ = read_options;
db_impl_ = db_impl; db_impl_ = db_impl;
cfd_ = cfd; cfd_ = cfd;
read_callback_ = read_callback; read_callback_ = read_callback;

@ -247,8 +247,9 @@ Status BuildTable(
// No matter whether use_direct_io_for_flush_and_compaction is true, // No matter whether use_direct_io_for_flush_and_compaction is true,
// we will regrad this verification as user reads since the goal is // we will regrad this verification as user reads since the goal is
// to cache it here for further user reads // to cache it here for further user reads
ReadOptions read_options;
std::unique_ptr<InternalIterator> it(table_cache->NewIterator( std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
ReadOptions(), file_options, internal_comparator, *meta, read_options, file_options, internal_comparator, *meta,
nullptr /* range_del_agg */, nullptr /* range_del_agg */,
mutable_cf_options.prefix_extractor.get(), nullptr, mutable_cf_options.prefix_extractor.get(), nullptr,
(internal_stats == nullptr) ? nullptr (internal_stats == nullptr) ? nullptr

@ -676,8 +676,9 @@ Status CompactionJob::Run() {
// No matter whether use_direct_io_for_flush_and_compaction is true, // No matter whether use_direct_io_for_flush_and_compaction is true,
// we will regard this verification as user reads since the goal is // we will regard this verification as user reads since the goal is
// to cache it here for further user reads // to cache it here for further user reads
ReadOptions read_options;
InternalIterator* iter = cfd->table_cache()->NewIterator( InternalIterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), file_options_, cfd->internal_comparator(), read_options, file_options_, cfd->internal_comparator(),
files_output[file_idx]->meta, /*range_del_agg=*/nullptr, files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
prefix_extractor, prefix_extractor,
/*table_reader_ptr=*/nullptr, /*table_reader_ptr=*/nullptr,
@ -877,11 +878,20 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(), CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
existing_snapshots_); existing_snapshots_);
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.fill_cache = false;
// Compaction iterators shouldn't be confined to a single prefix.
// Compactions use Seek() for
// (a) concurrent compactions,
// (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
read_options.total_order_seek = true;
// Although the v2 aggregator is what the level iterator(s) know about, // Although the v2 aggregator is what the level iterator(s) know about,
// the AddTombstones calls will be propagated down to the v1 aggregator. // the AddTombstones calls will be propagated down to the v1 aggregator.
std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator( std::unique_ptr<InternalIterator> input(
sub_compact->compaction, &range_del_agg, file_options_for_read_)); versions_->MakeInputIterator(read_options, sub_compact->compaction,
&range_del_agg, file_options_for_read_));
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV); ThreadStatus::STAGE_COMPACTION_PROCESS_KV);

@ -101,19 +101,35 @@ TEST_F(DBBasicTest, ReadOnlyDB) {
ASSERT_OK(Put("foo", "v3")); ASSERT_OK(Put("foo", "v3"));
Close(); Close();
auto verify_one_iter = [&](Iterator* iter) {
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
++count;
}
// Always expect two keys: "foo" and "bar"
ASSERT_EQ(count, 2);
};
auto verify_all_iters = [&]() {
Iterator* iter = db_->NewIterator(ReadOptions());
verify_one_iter(iter);
delete iter;
std::vector<Iterator*> iters;
ASSERT_OK(db_->NewIterators(ReadOptions(),
{dbfull()->DefaultColumnFamily()}, &iters));
ASSERT_EQ(static_cast<uint64_t>(1), iters.size());
verify_one_iter(iters[0]);
delete iters[0];
};
auto options = CurrentOptions(); auto options = CurrentOptions();
assert(options.env == env_); assert(options.env == env_);
ASSERT_OK(ReadOnlyReopen(options)); ASSERT_OK(ReadOnlyReopen(options));
ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar")); ASSERT_EQ("v2", Get("bar"));
Iterator* iter = db_->NewIterator(ReadOptions()); verify_all_iters();
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
++count;
}
ASSERT_EQ(count, 2);
delete iter;
Close(); Close();
// Reopen and flush memtable. // Reopen and flush memtable.
@ -124,6 +140,7 @@ TEST_F(DBBasicTest, ReadOnlyDB) {
ASSERT_OK(ReadOnlyReopen(options)); ASSERT_OK(ReadOnlyReopen(options));
ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar")); ASSERT_EQ("v2", Get("bar"));
verify_all_iters();
ASSERT_TRUE(db_->SyncWAL().IsNotSupported()); ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
} }

@ -336,8 +336,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
InternalKeyComparator icmp(options.comparator); InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp, ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */); kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator( ScopedArenaIterator iter(dbfull()->NewInternalIterator(
&arena, &range_del_agg, kMaxSequenceNumber, handles_[1])); read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
iter->SeekToFirst(); iter->SeekToFirst();
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
while (iter->Valid()) { while (iter->Valid()) {
@ -426,8 +427,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
InternalKeyComparator icmp(options.comparator); InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp, ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */); kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator( ScopedArenaIterator iter(dbfull()->NewInternalIterator(
&arena, &range_del_agg, kMaxSequenceNumber, handles_[1])); read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
iter->SeekToFirst(); iter->SeekToFirst();
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
while (iter->Valid()) { while (iter->Valid()) {
@ -644,8 +646,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
InternalKeyComparator icmp(options.comparator); InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp, ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* snapshots */); kMaxSequenceNumber /* snapshots */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator( ScopedArenaIterator iter(dbfull()->NewInternalIterator(
&arena, &range_del_agg, kMaxSequenceNumber)); read_options, &arena, &range_del_agg, kMaxSequenceNumber));
iter->SeekToFirst(); iter->SeekToFirst();
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
while (iter->Valid()) { while (iter->Valid()) {

@ -1364,9 +1364,12 @@ bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
} }
} }
InternalIterator* DBImpl::NewInternalIterator( InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence, Arena* arena,
ColumnFamilyHandle* column_family, bool allow_unprepared_value) { RangeDelAggregator* range_del_agg,
SequenceNumber sequence,
ColumnFamilyHandle* column_family,
bool allow_unprepared_value) {
ColumnFamilyData* cfd; ColumnFamilyData* cfd;
if (column_family == nullptr) { if (column_family == nullptr) {
cfd = default_cf_handle_->cfd(); cfd = default_cf_handle_->cfd();
@ -1378,9 +1381,8 @@ InternalIterator* DBImpl::NewInternalIterator(
mutex_.Lock(); mutex_.Lock();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
mutex_.Unlock(); mutex_.Unlock();
ReadOptions roptions; return NewInternalIterator(read_options, cfd, super_version, arena,
return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg, range_del_agg, sequence, allow_unprepared_value);
sequence, allow_unprepared_value);
} }
void DBImpl::SchedulePurge() { void DBImpl::SchedulePurge() {
@ -2829,10 +2831,10 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
sv->version_number, read_callback, this, cfd, allow_blob, sv->version_number, read_callback, this, cfd, allow_blob,
read_options.snapshot != nullptr ? false : allow_refresh); read_options.snapshot != nullptr ? false : allow_refresh);
InternalIterator* internal_iter = InternalIterator* internal_iter = NewInternalIterator(
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot, db_iter->GetRangeDelAggregator(), snapshot,
/* allow_unprepared_value */ true); /* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter); db_iter->SetIterUnderDBIter(internal_iter);
return db_iter; return db_iter;

@ -594,8 +594,10 @@ class DBImpl : public DB {
// the value and so will require PrepareValue() to be called before value(); // the value and so will require PrepareValue() to be called before value();
// allow_unprepared_value = false is convenient when this optimization is not // allow_unprepared_value = false is convenient when this optimization is not
// useful, e.g. when reading the whole column family. // useful, e.g. when reading the whole column family.
// @param read_options Must outlive the returned iterator.
InternalIterator* NewInternalIterator( InternalIterator* NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence, const ReadOptions& read_options, Arena* arena,
RangeDelAggregator* range_del_agg, SequenceNumber sequence,
ColumnFamilyHandle* column_family = nullptr, ColumnFamilyHandle* column_family = nullptr,
bool allow_unprepared_value = false); bool allow_unprepared_value = false);
@ -721,10 +723,14 @@ class DBImpl : public DB {
const WriteController& write_controller() { return write_controller_; } const WriteController& write_controller() { return write_controller_; }
InternalIterator* NewInternalIterator( // @param read_options Must outlive the returned iterator.
const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version, InternalIterator* NewInternalIterator(const ReadOptions& read_options,
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence, ColumnFamilyData* cfd,
bool allow_unprepared_value); SuperVersion* super_version,
Arena* arena,
RangeDelAggregator* range_del_agg,
SequenceNumber sequence,
bool allow_unprepared_value);
// hollow transactions shell used for recovery. // hollow transactions shell used for recovery.
// these will then be passed to TransactionDB so that // these will then be passed to TransactionDB so that

@ -86,10 +86,10 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
read_seq, read_seq,
super_version->mutable_cf_options.max_sequential_skip_in_iterations, super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback); super_version->version_number, read_callback);
auto internal_iter = auto internal_iter = NewInternalIterator(
NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), read_seq, db_iter->GetRangeDelAggregator(), read_seq,
/* allow_unprepared_value */ true); /* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter); db_iter->SetIterUnderDBIter(internal_iter);
return db_iter; return db_iter;
} }
@ -118,10 +118,10 @@ Status DBImplReadOnly::NewIterators(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq, env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback); sv->version_number, read_callback);
auto* internal_iter = auto* internal_iter = NewInternalIterator(
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), read_seq, db_iter->GetRangeDelAggregator(), read_seq,
/* allow_unprepared_value */ true); /* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter); db_iter->SetIterUnderDBIter(internal_iter);
iterators->push_back(db_iter); iterators->push_back(db_iter);
} }

@ -415,10 +415,10 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
snapshot, snapshot,
super_version->mutable_cf_options.max_sequential_skip_in_iterations, super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback); super_version->version_number, read_callback);
auto internal_iter = auto internal_iter = NewInternalIterator(
NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot, db_iter->GetRangeDelAggregator(), snapshot,
/* allow_unprepared_value */ true); /* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter); db_iter->SetIterUnderDBIter(internal_iter);
return db_iter; return db_iter;
} }

@ -907,12 +907,13 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
InternalKeyComparator icmp(options.comparator); InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp, ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */); kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter; ScopedArenaIterator iter;
if (cf == 0) { if (cf == 0) {
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg, iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
kMaxSequenceNumber)); kMaxSequenceNumber));
} else { } else {
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg, iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
kMaxSequenceNumber, handles_[cf])); kMaxSequenceNumber, handles_[cf]));
} }
InternalKey target(user_key, kMaxSequenceNumber, kTypeValue); InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
@ -1322,12 +1323,13 @@ void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
kMaxSequenceNumber /* upper_bound */); kMaxSequenceNumber /* upper_bound */);
// This should be defined after range_del_agg so that it destructs the // This should be defined after range_del_agg so that it destructs the
// assigned iterator before it range_del_agg is already destructed. // assigned iterator before it range_del_agg is already destructed.
ReadOptions read_options;
ScopedArenaIterator iter; ScopedArenaIterator iter;
if (cf != 0) { if (cf != 0) {
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg, iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
kMaxSequenceNumber, handles_[cf])); kMaxSequenceNumber, handles_[cf]));
} else { } else {
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg, iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
kMaxSequenceNumber)); kMaxSequenceNumber));
} }
iter->SeekToFirst(); iter->SeekToFirst();
@ -1530,8 +1532,9 @@ void DBTestBase::VerifyDBInternal(
InternalKeyComparator icmp(last_options_.comparator); InternalKeyComparator icmp(last_options_.comparator);
ReadRangeDelAggregator range_del_agg(&icmp, ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */); kMaxSequenceNumber /* upper_bound */);
auto iter = ReadOptions read_options;
dbfull()->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber); auto iter = dbfull()->NewInternalIterator(read_options, &arena,
&range_del_agg, kMaxSequenceNumber);
iter->SeekToFirst(); iter->SeekToFirst();
for (auto p : true_data) { for (auto p : true_data) {
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());

@ -60,6 +60,7 @@ class TableCache {
// the returned iterator. The returned "*table_reader_ptr" object is owned // the returned iterator. The returned "*table_reader_ptr" object is owned
// by the cache and should not be deleted, and is valid for as long as the // by the cache and should not be deleted, and is valid for as long as the
// returned iterator is live. // returned iterator is live.
// @param options Must outlive the returned iterator.
// @param range_del_agg If non-nullptr, adds range deletions to the // @param range_del_agg If non-nullptr, adds range deletions to the
// aggregator. If an error occurs, returns it in a NewErrorInternalIterator // aggregator. If an error occurs, returns it in a NewErrorInternalIterator
// @param for_compaction If true, a new TableReader may be allocated (but // @param for_compaction If true, a new TableReader may be allocated (but

@ -876,6 +876,7 @@ namespace {
class LevelIterator final : public InternalIterator { class LevelIterator final : public InternalIterator {
public: public:
// @param read_options Must outlive this iterator.
LevelIterator(TableCache* table_cache, const ReadOptions& read_options, LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
const FileOptions& file_options, const FileOptions& file_options,
const InternalKeyComparator& icomparator, const InternalKeyComparator& icomparator,
@ -1020,7 +1021,7 @@ class LevelIterator final : public InternalIterator {
} }
TableCache* table_cache_; TableCache* table_cache_;
const ReadOptions read_options_; const ReadOptions& read_options_;
const FileOptions& file_options_; const FileOptions& file_options_;
const InternalKeyComparator& icomparator_; const InternalKeyComparator& icomparator_;
const UserComparatorWrapper user_comparator_; const UserComparatorWrapper user_comparator_;
@ -5670,18 +5671,10 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
} }
InternalIterator* VersionSet::MakeInputIterator( InternalIterator* VersionSet::MakeInputIterator(
const Compaction* c, RangeDelAggregator* range_del_agg, const ReadOptions& read_options, const Compaction* c,
RangeDelAggregator* range_del_agg,
const FileOptions& file_options_compactions) { const FileOptions& file_options_compactions) {
auto cfd = c->column_family_data(); auto cfd = c->column_family_data();
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.fill_cache = false;
// Compaction iterators shouldn't be confined to a single prefix.
// Compactions use Seek() for
// (a) concurrent compactions,
// (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
read_options.total_order_seek = true;
// Level-0 files have to be merged together. For other levels, // Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level. // we will make a concatenating iterator per level.
// TODO(opt): use concatenating iterator for level-0 if there is no overlap // TODO(opt): use concatenating iterator for level-0 if there is no overlap

@ -627,13 +627,19 @@ class Version {
public: public:
// Append to *iters a sequence of iterators that will // Append to *iters a sequence of iterators that will
// yield the contents of this Version when merged together. // yield the contents of this Version when merged together.
// REQUIRES: This version has been saved (see VersionSet::SaveTo) // @param read_options Must outlive any iterator built by
void AddIterators(const ReadOptions&, const FileOptions& soptions, // `merger_iter_builder`.
// REQUIRES: This version has been saved (see VersionSet::SaveTo).
void AddIterators(const ReadOptions& read_options,
const FileOptions& soptions,
MergeIteratorBuilder* merger_iter_builder, MergeIteratorBuilder* merger_iter_builder,
RangeDelAggregator* range_del_agg, RangeDelAggregator* range_del_agg,
bool allow_unprepared_value); bool allow_unprepared_value);
void AddIteratorsForLevel(const ReadOptions&, const FileOptions& soptions, // @param read_options Must outlive any iterator built by
// `merger_iter_builder`.
void AddIteratorsForLevel(const ReadOptions& read_options,
const FileOptions& soptions,
MergeIteratorBuilder* merger_iter_builder, MergeIteratorBuilder* merger_iter_builder,
int level, RangeDelAggregator* range_del_agg, int level, RangeDelAggregator* range_del_agg,
bool allow_unprepared_value); bool allow_unprepared_value);
@ -1120,8 +1126,10 @@ class VersionSet {
// Create an iterator that reads over the compaction inputs for "*c". // Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed. // The caller should delete the iterator when no longer needed.
// @param read_options Must outlive the returned iterator.
InternalIterator* MakeInputIterator( InternalIterator* MakeInputIterator(
const Compaction* c, RangeDelAggregator* range_del_agg, const ReadOptions& read_options, const Compaction* c,
RangeDelAggregator* range_del_agg,
const FileOptions& file_options_compactions); const FileOptions& file_options_compactions);
// Add all files listed in any live version to *live_table_files and // Add all files listed in any live version to *live_table_files and

@ -18,6 +18,7 @@ namespace ROCKSDB_NAMESPACE {
class BlockBasedTableIterator : public InternalIteratorBase<Slice> { class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
// compaction_readahead_size: its value will only be used if for_compaction = // compaction_readahead_size: its value will only be used if for_compaction =
// true // true
// @param read_options Must outlive this iterator.
public: public:
BlockBasedTableIterator( BlockBasedTableIterator(
const BlockBasedTable* table, const ReadOptions& read_options, const BlockBasedTable* table, const ReadOptions& read_options,
@ -25,21 +26,20 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
std::unique_ptr<InternalIteratorBase<IndexValue>>&& index_iter, std::unique_ptr<InternalIteratorBase<IndexValue>>&& index_iter,
bool check_filter, bool need_upper_bound_check, bool check_filter, bool need_upper_bound_check,
const SliceTransform* prefix_extractor, TableReaderCaller caller, const SliceTransform* prefix_extractor, TableReaderCaller caller,
size_t compaction_readahead_size = 0, size_t compaction_readahead_size = 0, bool allow_unprepared_value = false)
bool allow_unprepared_value = false)
: table_(table), : table_(table),
read_options_(read_options), read_options_(read_options),
icomp_(icomp), icomp_(icomp),
user_comparator_(icomp.user_comparator()), user_comparator_(icomp.user_comparator()),
allow_unprepared_value_(allow_unprepared_value),
index_iter_(std::move(index_iter)), index_iter_(std::move(index_iter)),
pinned_iters_mgr_(nullptr), pinned_iters_mgr_(nullptr),
block_iter_points_to_real_block_(false),
check_filter_(check_filter),
need_upper_bound_check_(need_upper_bound_check),
prefix_extractor_(prefix_extractor), prefix_extractor_(prefix_extractor),
lookup_context_(caller), lookup_context_(caller),
block_prefetcher_(compaction_readahead_size) {} block_prefetcher_(compaction_readahead_size),
allow_unprepared_value_(allow_unprepared_value),
block_iter_points_to_real_block_(false),
check_filter_(check_filter),
need_upper_bound_check_(need_upper_bound_check) {}
~BlockBasedTableIterator() {} ~BlockBasedTableIterator() {}
@ -151,14 +151,19 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
}; };
const BlockBasedTable* table_; const BlockBasedTable* table_;
const ReadOptions read_options_; const ReadOptions& read_options_;
const InternalKeyComparator& icomp_; const InternalKeyComparator& icomp_;
UserComparatorWrapper user_comparator_; UserComparatorWrapper user_comparator_;
const bool allow_unprepared_value_;
std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter_; std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter_;
PinnedIteratorsManager* pinned_iters_mgr_; PinnedIteratorsManager* pinned_iters_mgr_;
DataBlockIter block_iter_; DataBlockIter block_iter_;
const SliceTransform* prefix_extractor_;
uint64_t prev_block_offset_ = std::numeric_limits<uint64_t>::max();
BlockCacheLookupContext lookup_context_;
BlockPrefetcher block_prefetcher_;
const bool allow_unprepared_value_;
// True if block_iter_ is initialized and points to the same block // True if block_iter_ is initialized and points to the same block
// as index iterator. // as index iterator.
bool block_iter_points_to_real_block_; bool block_iter_points_to_real_block_;
@ -172,11 +177,6 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
bool check_filter_; bool check_filter_;
// TODO(Zhongyi): pick a better name // TODO(Zhongyi): pick a better name
bool need_upper_bound_check_; bool need_upper_bound_check_;
const SliceTransform* prefix_extractor_;
uint64_t prev_block_offset_ = std::numeric_limits<uint64_t>::max();
BlockCacheLookupContext lookup_context_;
BlockPrefetcher block_prefetcher_;
// If `target` is null, seek to first. // If `target` is null, seek to first.
void SeekImpl(const Slice* target); void SeekImpl(const Slice* target);

@ -113,6 +113,7 @@ class BlockBasedTable : public TableReader {
// Returns a new iterator over the table contents. // Returns a new iterator over the table contents.
// The result of NewIterator() is initially invalid (caller must // The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it). // call one of the Seek methods on the iterator before using it).
// @param read_options Must outlive the returned iterator.
// @param skip_filters Disables loading/accessing the filter block // @param skip_filters Disables loading/accessing the filter block
// compaction_readahead_size: its value will only be used if caller = // compaction_readahead_size: its value will only be used if caller =
// kCompaction. // kCompaction.

@ -39,6 +39,8 @@ class TableReader {
// Returns a new iterator over the table contents. // Returns a new iterator over the table contents.
// The result of NewIterator() is initially invalid (caller must // The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it). // call one of the Seek methods on the iterator before using it).
//
// read_options: Must outlive the returned iterator.
// arena: If not null, the arena needs to be used to allocate the Iterator. // arena: If not null, the arena needs to be used to allocate the Iterator.
// When destroying the iterator, the caller will not call "delete" // When destroying the iterator, the caller will not call "delete"
// but Iterator::~Iterator() directly. The destructor needs to destroy // but Iterator::~Iterator() directly. The destructor needs to destroy
@ -48,8 +50,8 @@ class TableReader {
// compaction_readahead_size: its value will only be used if caller = // compaction_readahead_size: its value will only be used if caller =
// kCompaction // kCompaction
virtual InternalIterator* NewIterator( virtual InternalIterator* NewIterator(
const ReadOptions&, const SliceTransform* prefix_extractor, Arena* arena, const ReadOptions& read_options, const SliceTransform* prefix_extractor,
bool skip_filters, TableReaderCaller caller, Arena* arena, bool skip_filters, TableReaderCaller caller,
size_t compaction_readahead_size = 0, size_t compaction_readahead_size = 0,
bool allow_unprepared_value = false) = 0; bool allow_unprepared_value = false) = 0;

@ -394,10 +394,9 @@ class TableConstructor : public Constructor {
InternalIterator* NewIterator( InternalIterator* NewIterator(
const SliceTransform* prefix_extractor) const override { const SliceTransform* prefix_extractor) const override {
ReadOptions ro;
InternalIterator* iter = table_reader_->NewIterator( InternalIterator* iter = table_reader_->NewIterator(
ro, prefix_extractor, /*arena=*/nullptr, /*skip_filters=*/false, read_options_, prefix_extractor, /*arena=*/nullptr,
TableReaderCaller::kUncategorized); /*skip_filters=*/false, TableReaderCaller::kUncategorized);
if (convert_to_internal_key_) { if (convert_to_internal_key_) {
return new KeyConvertingIterator(iter); return new KeyConvertingIterator(iter);
} else { } else {
@ -452,6 +451,7 @@ class TableConstructor : public Constructor {
file_reader_.reset(); file_reader_.reset();
} }
const ReadOptions read_options_;
uint64_t uniq_id_; uint64_t uniq_id_;
std::unique_ptr<WritableFileWriter> file_writer_; std::unique_ptr<WritableFileWriter> file_writer_;
std::unique_ptr<RandomAccessFileReader> file_reader_; std::unique_ptr<RandomAccessFileReader> file_reader_;
@ -1883,8 +1883,9 @@ TEST_P(BlockBasedTableTest, SkipPrefixBloomFilter) {
const MutableCFOptions new_moptions(options); const MutableCFOptions new_moptions(options);
c.Reopen(new_ioptions, new_moptions); c.Reopen(new_ioptions, new_moptions);
auto reader = c.GetTableReader(); auto reader = c.GetTableReader();
ReadOptions read_options;
std::unique_ptr<InternalIterator> db_iter(reader->NewIterator( std::unique_ptr<InternalIterator> db_iter(reader->NewIterator(
ReadOptions(), new_moptions.prefix_extractor.get(), /*arena=*/nullptr, read_options, new_moptions.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kUncategorized)); /*skip_filters=*/false, TableReaderCaller::kUncategorized));
// Test point lookup // Test point lookup
@ -1945,8 +1946,9 @@ void TableTest::IndexTest(BlockBasedTableOptions table_options) {
ASSERT_EQ(5u, props->num_data_blocks); ASSERT_EQ(5u, props->num_data_blocks);
// TODO(Zhongyi): update test to use MutableCFOptions // TODO(Zhongyi): update test to use MutableCFOptions
ReadOptions read_options;
std::unique_ptr<InternalIterator> index_iter(reader->NewIterator( std::unique_ptr<InternalIterator> index_iter(reader->NewIterator(
ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr, read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kUncategorized)); /*skip_filters=*/false, TableReaderCaller::kUncategorized));
// -- Find keys do not exist, but have common prefix. // -- Find keys do not exist, but have common prefix.
@ -2254,8 +2256,9 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) {
auto reader = c.GetTableReader(); auto reader = c.GetTableReader();
auto props = reader->GetTableProperties(); auto props = reader->GetTableProperties();
ASSERT_EQ(4u, props->num_data_blocks); ASSERT_EQ(4u, props->num_data_blocks);
ReadOptions read_options;
std::unique_ptr<InternalIterator> iter(reader->NewIterator( std::unique_ptr<InternalIterator> iter(reader->NewIterator(
ReadOptions(), /*prefix_extractor=*/nullptr, /*arena=*/nullptr, read_options, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kUncategorized, /*skip_filters=*/false, TableReaderCaller::kUncategorized,
/*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true)); /*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true));
@ -2438,8 +2441,9 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKeyGlobalSeqno) {
auto reader = c.GetTableReader(); auto reader = c.GetTableReader();
auto props = reader->GetTableProperties(); auto props = reader->GetTableProperties();
ASSERT_EQ(1u, props->num_data_blocks); ASSERT_EQ(1u, props->num_data_blocks);
ReadOptions read_options;
std::unique_ptr<InternalIterator> iter(reader->NewIterator( std::unique_ptr<InternalIterator> iter(reader->NewIterator(
ReadOptions(), /*prefix_extractor=*/nullptr, /*arena=*/nullptr, read_options, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kUncategorized, /*skip_filters=*/false, TableReaderCaller::kUncategorized,
/*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true)); /*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true));
@ -2668,8 +2672,9 @@ TEST_P(BlockBasedTableTest, TracingIterator) {
GetPlainInternalComparator(options.comparator), &keys, &kvmap); GetPlainInternalComparator(options.comparator), &keys, &kvmap);
for (uint32_t i = 1; i <= 2; i++) { for (uint32_t i = 1; i <= 2; i++) {
ReadOptions read_options;
std::unique_ptr<InternalIterator> iter(c.GetTableReader()->NewIterator( std::unique_ptr<InternalIterator> iter(c.GetTableReader()->NewIterator(
ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr, read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kUserIterator)); /*skip_filters=*/false, TableReaderCaller::kUserIterator));
iter->SeekToFirst(); iter->SeekToFirst();
while (iter->Valid()) { while (iter->Valid()) {
@ -3883,8 +3888,9 @@ TEST_P(IndexBlockRestartIntervalTest, IndexBlockRestartInterval) {
&kvmap); &kvmap);
auto reader = c.GetTableReader(); auto reader = c.GetTableReader();
ReadOptions read_options;
std::unique_ptr<InternalIterator> db_iter(reader->NewIterator( std::unique_ptr<InternalIterator> db_iter(reader->NewIterator(
ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr, read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kUncategorized)); /*skip_filters=*/false, TableReaderCaller::kUncategorized));
// Test point lookup // Test point lookup
@ -4070,6 +4076,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
// Helper function to get the contents of the table InternalIterator // Helper function to get the contents of the table InternalIterator
std::unique_ptr<TableReader> table_reader; std::unique_ptr<TableReader> table_reader;
const ReadOptions read_options;
std::function<InternalIterator*()> GetTableInternalIter = [&]() { std::function<InternalIterator*()> GetTableInternalIter = [&]() {
std::unique_ptr<RandomAccessFileReader> file_reader( std::unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader( test::GetRandomAccessFileReader(
@ -4081,7 +4088,7 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
std::move(file_reader), ss_rw.contents().size(), &table_reader); std::move(file_reader), ss_rw.contents().size(), &table_reader);
return table_reader->NewIterator( return table_reader->NewIterator(
ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr, read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kUncategorized); /*skip_filters=*/false, TableReaderCaller::kUncategorized);
}; };
@ -4252,8 +4259,9 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
GetPlainInternalComparator(options2.comparator)), GetPlainInternalComparator(options2.comparator)),
std::move(file_reader), ss_rw.contents().size(), &table_reader)); std::move(file_reader), ss_rw.contents().size(), &table_reader));
ReadOptions read_options;
std::unique_ptr<InternalIterator> db_iter(table_reader->NewIterator( std::unique_ptr<InternalIterator> db_iter(table_reader->NewIterator(
ReadOptions(), moptions2.prefix_extractor.get(), /*arena=*/nullptr, read_options, moptions2.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kUncategorized)); /*skip_filters=*/false, TableReaderCaller::kUncategorized));
int expected_key = 1; int expected_key = 1;
@ -4548,8 +4556,9 @@ TEST_P(BlockBasedTableTest, DataBlockHashIndex) {
auto reader = c.GetTableReader(); auto reader = c.GetTableReader();
std::unique_ptr<InternalIterator> seek_iter; std::unique_ptr<InternalIterator> seek_iter;
ReadOptions read_options;
seek_iter.reset(reader->NewIterator( seek_iter.reset(reader->NewIterator(
ReadOptions(), moptions.prefix_extractor.get(), /*arena=*/nullptr, read_options, moptions.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kUncategorized)); /*skip_filters=*/false, TableReaderCaller::kUncategorized));
for (int i = 0; i < 2; ++i) { for (int i = 0; i < 2; ++i) {
ReadOptions ro; ReadOptions ro;

@ -37,11 +37,12 @@ Status GetAllKeyVersions(DB* db, ColumnFamilyHandle* cfh, Slice begin_key,
DBImpl* idb = static_cast<DBImpl*>(db->GetRootDB()); DBImpl* idb = static_cast<DBImpl*>(db->GetRootDB());
auto icmp = InternalKeyComparator(idb->GetOptions(cfh).comparator); auto icmp = InternalKeyComparator(idb->GetOptions(cfh).comparator);
ReadOptions read_options;
ReadRangeDelAggregator range_del_agg(&icmp, ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */); kMaxSequenceNumber /* upper_bound */);
Arena arena; Arena arena;
ScopedArenaIterator iter(idb->NewInternalIterator(&arena, &range_del_agg, ScopedArenaIterator iter(idb->NewInternalIterator(
kMaxSequenceNumber, cfh)); read_options, &arena, &range_del_agg, kMaxSequenceNumber, cfh));
if (!begin_key.empty()) { if (!begin_key.empty()) {
InternalKey ikey; InternalKey ikey;

Loading…
Cancel
Save