From e45673deceedc96b9b715679129ec67d3d1c7cdc Mon Sep 17 00:00:00 2001 From: Mike Kolupaev Date: Wed, 15 Apr 2020 17:37:23 -0700 Subject: [PATCH] Properly report IO errors when IndexType::kBinarySearchWithFirstKey is used (#6621) Summary: Context: Index type `kBinarySearchWithFirstKey` added the ability for sst file iterator to sometimes report a key from index without reading the corresponding data block. This is useful when sst blocks are cut at some meaningful boundaries (e.g. one block per key prefix), and many seeks land between blocks (e.g. for each prefix, the ranges of keys in different sst files are nearly disjoint, so a typical seek needs to read a data block from only one file even if all files have the prefix). But this added a new error condition, which rocksdb code was really not equipped to deal with: `InternalIterator::value()` may fail with an IO error or Status::Incomplete, but it's just a method returning a Slice, with no way to report error instead. Before this PR, this type of error wasn't handled at all (an empty slice was returned), and kBinarySearchWithFirstKey implementation was considered a prototype. Now that we (LogDevice) have experimented with kBinarySearchWithFirstKey for a while and confirmed that it's really useful, this PR is adding the missing error handling. It's a pretty inconvenient situation implementation-wise. The error needs to be reported from InternalIterator when trying to access value. But there are ~700 call sites of `InternalIterator::value()`, most of which either can't hit the error condition (because the iterator is reading from memtable or from index or something) or wouldn't benefit from the deferred loading of the value (e.g. compaction iterator that reads all values anyway). Adding error handling to all these call sites would needlessly bloat the code. 
So instead I made the deferred value loading optional: only the call sites that may use deferred loading have to call the new method `PrepareValue()` before calling `value()`. The feature is enabled with a new bool argument `allow_unprepared_value` to a bunch of methods that create iterators (it wouldn't make sense to put it in ReadOptions because it's completely internal to iterators, with virtually no user-visible effect). Lmk if you have better ideas. Note that the deferred value loading only happens for *internal* iterators. The user-visible iterator (DBIter) always prepares the value before returning from Seek/Next/etc. We could go further and add an API to defer that value loading too, but that's most likely not useful for LogDevice, so it doesn't seem worth the complexity for now. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6621 Test Plan: make -j5 check . Will also deploy to some logdevice test clusters and look at stats. Reviewed By: siying Differential Revision: D20786930 Pulled By: al13n321 fbshipit-source-id: 6da77d918bad3780522e918f17f4d5513d3e99ee --- HISTORY.md | 1 + db/arena_wrapped_db_iter.cc | 2 +- db/builder.cc | 3 +- db/compaction/compaction_job.cc | 3 +- db/db_impl/db_impl.cc | 19 ++++--- db/db_impl/db_impl.h | 10 +++- db/db_impl/db_impl_readonly.cc | 6 ++- db/db_impl/db_impl_secondary.cc | 3 +- db/db_iter.cc | 35 +++++++++++-- db/db_iterator_test.cc | 50 ++++++++++++++---- db/forward_iterator.cc | 52 ++++++++++++++++--- db/forward_iterator.h | 5 +- db/repair.cc | 3 +- db/table_cache.cc | 5 +- db/table_cache.h | 2 +- db/version_set.cc | 32 ++++++++---- db/version_set.h | 6 ++- include/rocksdb/table.h | 5 -- .../block_based/block_based_table_iterator.cc | 14 ++--- .../block_based/block_based_table_iterator.h | 33 +++++++----- table/block_based/block_based_table_reader.cc | 6 +-- table/block_based/block_based_table_reader.h | 3 +- table/cuckoo/cuckoo_table_reader.cc | 3 +- table/cuckoo/cuckoo_table_reader.h | 3 +- 
table/internal_iterator.h | 18 +++++++ table/iterator_wrapper.h | 19 +++++++ table/merging_iterator.cc | 12 +++++ table/mock_table.cc | 2 +- table/mock_table.h | 3 +- table/plain/plain_table_reader.cc | 3 +- table/plain/plain_table_reader.h | 3 +- table/table_reader.h | 3 +- table/table_test.cc | 15 +++++- tools/sst_dump_tool.cc | 8 +-- 34 files changed, 295 insertions(+), 95 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index bc7b2784f..815b5e422 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Bug Fixes * Fix wrong result being read from ingested file. May happen when a key in the file happen to be prefix of another key also in the file. The issue can further cause more data corruption. The issue exists with rocksdb >= 5.0.0 since DB::IngestExternalFile() was introduced. +* Finish implementation of BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey. It's now ready for use. Significantly reduces read amplification in some setups, especially for iterator seeks. ### Public API Change * Add NewFileChecksumGenCrc32cFactory to the file checksum public API, such that the builtin Crc32c based file checksum generator factory can be used by applications. 
diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc index f43282a75..72cc42904 100644 --- a/db/arena_wrapped_db_iter.cc +++ b/db/arena_wrapped_db_iter.cc @@ -75,7 +75,7 @@ Status ArenaWrappedDBIter::Refresh() { InternalIterator* internal_iter = db_impl_->NewInternalIterator( read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(), - latest_seq); + latest_seq, /* allow_unprepared_value */ true); SetIterUnderDBIter(internal_iter); } else { db_iter_->set_sequence(latest_seq); diff --git a/db/builder.cc b/db/builder.cc index 7d6b36d71..8568f8139 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -240,7 +240,8 @@ Status BuildTable( : internal_stats->GetFileReadHist(0), TableReaderCaller::kFlush, /*arena=*/nullptr, /*skip_filter=*/false, level, /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key*/ nullptr)); + /*largest_compaction_key*/ nullptr, + /*allow_unprepared_value*/ false)); s = it->status(); if (s.ok() && paranoid_file_checks) { for (it->SeekToFirst(); it->Valid(); it->Next()) { diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 8ab390799..06dd520fc 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -655,7 +655,8 @@ Status CompactionJob::Run() { TableReaderCaller::kCompactionRefill, /*arena=*/nullptr, /*skip_filters=*/false, compact_->compaction->output_level(), /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr); + /*largest_compaction_key=*/nullptr, + /*allow_unprepared_value=*/false); auto s = iter->status(); if (s.ok() && paranoid_file_checks_) { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index e65ccff03..8b4e21c6c 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1313,7 +1313,7 @@ bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) { InternalIterator* DBImpl::NewInternalIterator( Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence, - ColumnFamilyHandle* 
column_family) { + ColumnFamilyHandle* column_family, bool allow_unprepared_value) { ColumnFamilyData* cfd; if (column_family == nullptr) { cfd = default_cf_handle_->cfd(); @@ -1327,7 +1327,7 @@ InternalIterator* DBImpl::NewInternalIterator( mutex_.Unlock(); ReadOptions roptions; return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg, - sequence); + sequence, allow_unprepared_value); } void DBImpl::SchedulePurge() { @@ -1450,7 +1450,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, SuperVersion* super_version, Arena* arena, RangeDelAggregator* range_del_agg, - SequenceNumber sequence) { + SequenceNumber sequence, + bool allow_unprepared_value) { InternalIterator* internal_iter; assert(arena != nullptr); assert(range_del_agg != nullptr); @@ -1482,7 +1483,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, // Collect iterators for files in L0 - Ln if (read_options.read_tier != kMemtableTier) { super_version->current->AddIterators(read_options, file_options_, - &merge_iter_builder, range_del_agg); + &merge_iter_builder, range_del_agg, + allow_unprepared_value); } internal_iter = merge_iter_builder.Finish(); IterState* cleanup = @@ -2548,7 +2550,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, #else SuperVersion* sv = cfd->GetReferencedSuperVersion(this); - auto iter = new ForwardIterator(this, read_options, cfd, sv); + auto iter = new ForwardIterator(this, read_options, cfd, sv, + /* allow_unprepared_value */ true); result = NewDBIterator( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, cfd->user_comparator(), iter, kMaxSequenceNumber, @@ -2625,7 +2628,8 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, InternalIterator* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), - db_iter->GetRangeDelAggregator(), snapshot); + db_iter->GetRangeDelAggregator(), snapshot, + /* allow_unprepared_value 
*/ true); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; @@ -2653,7 +2657,8 @@ Status DBImpl::NewIterators( for (auto cfh : column_families) { auto cfd = reinterpret_cast(cfh)->cfd(); SuperVersion* sv = cfd->GetReferencedSuperVersion(this); - auto iter = new ForwardIterator(this, read_options, cfd, sv); + auto iter = new ForwardIterator(this, read_options, cfd, sv, + /* allow_unprepared_value */ true); iterators->push_back(NewDBIterator( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, cfd->user_comparator(), iter, kMaxSequenceNumber, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 167bc81e9..9e33955d6 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -581,9 +581,14 @@ class DBImpl : public DB { // Return an internal iterator over the current state of the database. // The keys of this iterator are internal keys (see format.h). // The returned iterator should be deleted when no longer needed. + // If allow_unprepared_value is true, the returned iterator may defer reading + // the value and so will require PrepareValue() to be called before value(); + // allow_unprepared_value = false is convenient when this optimization is not + // useful, e.g. when reading the whole column family. InternalIterator* NewInternalIterator( Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence, - ColumnFamilyHandle* column_family = nullptr); + ColumnFamilyHandle* column_family = nullptr, + bool allow_unprepared_value = false); LogsWithPrepTracker* logs_with_prep_tracker() { return &logs_with_prep_tracker_; @@ -709,7 +714,8 @@ class DBImpl : public DB { InternalIterator* NewInternalIterator( const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version, - Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence); + Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence, + bool allow_unprepared_value); // hollow transactions shell used for recovery. 
// these will then be passed to TransactionDB so that diff --git a/db/db_impl/db_impl_readonly.cc b/db/db_impl/db_impl_readonly.cc index adb88bf23..d728a4f68 100644 --- a/db/db_impl/db_impl_readonly.cc +++ b/db/db_impl/db_impl_readonly.cc @@ -87,7 +87,8 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options, super_version->version_number, read_callback); auto internal_iter = NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), - db_iter->GetRangeDelAggregator(), read_seq); + db_iter->GetRangeDelAggregator(), read_seq, + /* allow_unprepared_value */ true); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; } @@ -118,7 +119,8 @@ Status DBImplReadOnly::NewIterators( sv->version_number, read_callback); auto* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), - db_iter->GetRangeDelAggregator(), read_seq); + db_iter->GetRangeDelAggregator(), read_seq, + /* allow_unprepared_value */ true); db_iter->SetIterUnderDBIter(internal_iter); iterators->push_back(db_iter); } diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index e12721399..e0c351f90 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -417,7 +417,8 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( super_version->version_number, read_callback); auto internal_iter = NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), - db_iter->GetRangeDelAggregator(), snapshot); + db_iter->GetRangeDelAggregator(), snapshot, + /* allow_unprepared_value */ true); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; } diff --git a/db/db_iter.cc b/db/db_iter.cc index 64388e797..ba179ade8 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -267,6 +267,11 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, } else { assert(!skipping_saved_key || CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0); + if (!iter_.PrepareValue()) { + 
assert(!iter_.status().ok()); + valid_ = false; + return false; + } num_skipped = 0; reseek_done = false; switch (ikey_.type) { @@ -452,6 +457,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, // Scan from the newer entries to older entries. // PRE: iter_.key() points to the first merge type entry // saved_key_ stores the user key +// iter_.PrepareValue() has been called // POST: saved_value_ has the merged value for the user key // iter_ points to the next entry (or invalid) bool DBIter::MergeValuesNewToOld() { @@ -481,14 +487,21 @@ bool DBIter::MergeValuesNewToOld() { if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) { // hit the next user key, stop right here break; - } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type || + } + if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type || range_del_agg_.ShouldDelete( ikey, RangeDelPositioningMode::kForwardTraversal)) { // hit a delete with the same user key, stop right here // iter_ is positioned after delete iter_.Next(); break; - } else if (kTypeValue == ikey.type) { + } + if (!iter_.PrepareValue()) { + valid_ = false; + return false; + } + + if (kTypeValue == ikey.type) { // hit a put, merge the put value with operands and store the // final result in saved_value_. We are done! 
const Slice val = iter_.value(); @@ -760,6 +773,11 @@ bool DBIter::FindValueForCurrentKey() { return FindValueForCurrentKeyUsingSeek(); } + if (!iter_.PrepareValue()) { + valid_ = false; + return false; + } + last_key_entry_type = ikey.type; switch (last_key_entry_type) { case kTypeValue: @@ -937,6 +955,10 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { valid_ = false; return false; } + if (!iter_.PrepareValue()) { + valid_ = false; + return false; + } if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) { assert(iter_.iter()->IsValuePinned()); pinned_value_ = iter_.value(); @@ -968,12 +990,17 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) { break; } - if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || range_del_agg_.ShouldDelete( ikey, RangeDelPositioningMode::kForwardTraversal)) { break; - } else if (ikey.type == kTypeValue) { + } + if (!iter_.PrepareValue()) { + valid_ = false; + return false; + } + + if (ikey.type == kTypeValue) { const Slice val = iter_.value(); Status s = MergeHelper::TimedFullMerge( merge_operator_, saved_key_.GetUserKey(), &val, diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index f8bf9fd1d..dcef1f897 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -1167,32 +1167,62 @@ TEST_P(DBIteratorTest, IndexWithFirstKey) { ropt.tailing = tailing; std::unique_ptr iter(NewIterator(ropt)); + ropt.read_tier = ReadTier::kBlockCacheTier; + std::unique_ptr nonblocking_iter(NewIterator(ropt)); + iter->Seek("b10"); ASSERT_TRUE(iter->Valid()); EXPECT_EQ("b2", iter->key().ToString()); EXPECT_EQ("y2", iter->value().ToString()); EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); + // The cache-only iterator should succeed too, using the blocks pulled into + // the cache by the previous iterator. 
+ nonblocking_iter->Seek("b10"); + ASSERT_TRUE(nonblocking_iter->Valid()); + EXPECT_EQ("b2", nonblocking_iter->key().ToString()); + EXPECT_EQ("y2", nonblocking_iter->value().ToString()); + EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + + // ... but it shouldn't be able to step forward since the next block is + // not in cache yet. + nonblocking_iter->Next(); + ASSERT_FALSE(nonblocking_iter->Valid()); + ASSERT_TRUE(nonblocking_iter->status().IsIncomplete()); + + // ... nor should a seek to the next key succeed. + nonblocking_iter->Seek("b20"); + ASSERT_FALSE(nonblocking_iter->Valid()); + ASSERT_TRUE(nonblocking_iter->status().IsIncomplete()); + iter->Next(); ASSERT_TRUE(iter->Valid()); EXPECT_EQ("b3", iter->key().ToString()); EXPECT_EQ("y3", iter->value().ToString()); - EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); - EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + EXPECT_EQ(4, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); + EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + + // After the blocking iterator loaded the next block, the nonblocking + // iterator's seek should succeed. 
+ nonblocking_iter->Seek("b20"); + ASSERT_TRUE(nonblocking_iter->Valid()); + EXPECT_EQ("b3", nonblocking_iter->key().ToString()); + EXPECT_EQ("y3", nonblocking_iter->value().ToString()); + EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); iter->Seek("c0"); ASSERT_TRUE(iter->Valid()); EXPECT_EQ("c0", iter->key().ToString()); EXPECT_EQ("z1,z2", iter->value().ToString()); - EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); - EXPECT_EQ(4, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); + EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + EXPECT_EQ(6, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); iter->Next(); ASSERT_TRUE(iter->Valid()); EXPECT_EQ("c3", iter->key().ToString()); EXPECT_EQ("z3", iter->value().ToString()); - EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); - EXPECT_EQ(5, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); + EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + EXPECT_EQ(7, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); iter.reset(); @@ -1207,13 +1237,13 @@ TEST_P(DBIteratorTest, IndexWithFirstKey) { ASSERT_TRUE(iter->Valid()); EXPECT_EQ("b2", iter->key().ToString()); EXPECT_EQ("y2", iter->value().ToString()); - EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); - EXPECT_EQ(5, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); + EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + EXPECT_EQ(7, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); iter->Next(); ASSERT_FALSE(iter->Valid()); - EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); - EXPECT_EQ(5, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); + EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + EXPECT_EQ(7, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); } } diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index f2b882549..156a23a45 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -36,7 +36,8 @@ class ForwardLevelIterator : public InternalIterator { ForwardLevelIterator(const 
ColumnFamilyData* const cfd, const ReadOptions& read_options, const std::vector& files, - const SliceTransform* prefix_extractor) + const SliceTransform* prefix_extractor, + bool allow_unprepared_value) : cfd_(cfd), read_options_(read_options), files_(files), @@ -44,7 +45,8 @@ class ForwardLevelIterator : public InternalIterator { file_index_(std::numeric_limits::max()), file_iter_(nullptr), pinned_iters_mgr_(nullptr), - prefix_extractor_(prefix_extractor) {} + prefix_extractor_(prefix_extractor), + allow_unprepared_value_(allow_unprepared_value) {} ~ForwardLevelIterator() override { // Reset current pointer @@ -83,7 +85,8 @@ class ForwardLevelIterator : public InternalIterator { /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator, /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1, /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr); + /*largest_compaction_key=*/nullptr, + allow_unprepared_value_); file_iter_->SetPinnedItersMgr(pinned_iters_mgr_); valid_ = false; if (!range_del_agg.IsEmpty()) { @@ -171,6 +174,16 @@ class ForwardLevelIterator : public InternalIterator { } return Status::OK(); } + bool PrepareValue() override { + assert(valid_); + if (file_iter_->PrepareValue()) { + return true; + } + + assert(!file_iter_->Valid()); + valid_ = false; + return false; + } bool IsKeyPinned() const override { return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && file_iter_->IsKeyPinned(); @@ -197,16 +210,19 @@ class ForwardLevelIterator : public InternalIterator { InternalIterator* file_iter_; PinnedIteratorsManager* pinned_iters_mgr_; const SliceTransform* prefix_extractor_; + const bool allow_unprepared_value_; }; ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, ColumnFamilyData* cfd, - SuperVersion* current_sv) + SuperVersion* current_sv, + bool allow_unprepared_value) : db_(db), read_options_(read_options), cfd_(cfd), 
prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()), user_comparator_(cfd->user_comparator()), + allow_unprepared_value_(allow_unprepared_value), immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())), sv_(current_sv), mutable_iter_(nullptr), @@ -560,6 +576,22 @@ Status ForwardIterator::status() const { return immutable_status_; } +bool ForwardIterator::PrepareValue() { + assert(valid_); + if (current_->PrepareValue()) { + return true; + } + + assert(!current_->Valid()); + assert(!current_->status().ok()); + assert(current_ != mutable_iter_); // memtable iterator can't fail + assert(immutable_status_.ok()); + + valid_ = false; + immutable_status_ = current_->status(); + return false; +} + Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) { assert(prop != nullptr); if (prop_name == "rocksdb.iterator.super-version-number") { @@ -655,7 +687,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { TableReaderCaller::kUserIterator, /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1, /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr)); + /*largest_compaction_key=*/nullptr, + allow_unprepared_value_)); } BuildLevelIterators(vstorage); current_ = nullptr; @@ -732,7 +765,8 @@ void ForwardIterator::RenewIterators() { TableReaderCaller::kUserIterator, /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1, /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr)); + /*largest_compaction_key=*/nullptr, + allow_unprepared_value_)); } for (auto* f : l0_iters_) { @@ -775,7 +809,8 @@ void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) { } else { level_iters_.push_back(new ForwardLevelIterator( cfd_, read_options_, level_files, - sv_->mutable_cf_options.prefix_extractor.get())); + sv_->mutable_cf_options.prefix_extractor.get(), + allow_unprepared_value_)); } } } @@ -796,7 +831,8 @@ void ForwardIterator::ResetIncompleteIterators() { 
TableReaderCaller::kUserIterator, /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1, /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr); + /*largest_compaction_key=*/nullptr, + allow_unprepared_value_); l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_); } diff --git a/db/forward_iterator.h b/db/forward_iterator.h index 8c671c75f..e094c6695 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -52,7 +52,8 @@ typedef std::priority_queue, class ForwardIterator : public InternalIterator { public: ForwardIterator(DBImpl* db, const ReadOptions& read_options, - ColumnFamilyData* cfd, SuperVersion* current_sv = nullptr); + ColumnFamilyData* cfd, SuperVersion* current_sv = nullptr, + bool allow_unprepared_value = false); virtual ~ForwardIterator(); void SeekForPrev(const Slice& /*target*/) override { @@ -75,6 +76,7 @@ class ForwardIterator : public InternalIterator { virtual Slice key() const override; virtual Slice value() const override; virtual Status status() const override; + virtual bool PrepareValue() override; virtual Status GetProperty(std::string prop_name, std::string* prop) override; virtual void SetPinnedItersMgr( PinnedIteratorsManager* pinned_iters_mgr) override; @@ -120,6 +122,7 @@ class ForwardIterator : public InternalIterator { ColumnFamilyData* const cfd_; const SliceTransform* const prefix_extractor_; const Comparator* user_comparator_; + const bool allow_unprepared_value_; MinIterHeap immutable_min_heap_; SuperVersion* sv_; diff --git a/db/repair.cc b/db/repair.cc index 5155be985..e964f6884 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -529,7 +529,8 @@ class Repairer { /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr, TableReaderCaller::kRepair, /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1, /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr); + /*largest_compaction_key=*/nullptr, + /*allow_unprepared_value=*/false); ParsedInternalKey parsed; for (iter->SeekToFirst(); 
iter->Valid(); iter->Next()) { Slice key = iter->key(); diff --git a/db/table_cache.cc b/db/table_cache.cc index 411959a33..4628a3735 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -184,7 +184,7 @@ InternalIterator* TableCache::NewIterator( TableReader** table_reader_ptr, HistogramImpl* file_read_hist, TableReaderCaller caller, Arena* arena, bool skip_filters, int level, const InternalKey* smallest_compaction_key, - const InternalKey* largest_compaction_key) { + const InternalKey* largest_compaction_key, bool allow_unprepared_value) { PERF_TIMER_GUARD(new_table_iterator_nanos); Status s; @@ -213,7 +213,8 @@ InternalIterator* TableCache::NewIterator( } else { result = table_reader->NewIterator(options, prefix_extractor, arena, skip_filters, caller, - file_options.compaction_readahead_size); + file_options.compaction_readahead_size, + allow_unprepared_value); } if (handle != nullptr) { result->RegisterCleanup(&UnrefEntry, cache_, handle); diff --git a/db/table_cache.h b/db/table_cache.h index b9de824ee..45d4f2998 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -73,7 +73,7 @@ class TableCache { const SliceTransform* prefix_extractor, TableReader** table_reader_ptr, HistogramImpl* file_read_hist, TableReaderCaller caller, Arena* arena, bool skip_filters, int level, const InternalKey* smallest_compaction_key, - const InternalKey* largest_compaction_key); + const InternalKey* largest_compaction_key, bool allow_unprepared_value); // If a seek to internal key "k" in specified file finds an entry, // call get_context->SaveValue() repeatedly until diff --git a/db/version_set.cc b/db/version_set.cc index 4712f0694..c2674dcae 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -861,7 +861,8 @@ class LevelIterator final : public InternalIterator { HistogramImpl* file_read_hist, TableReaderCaller caller, bool skip_filters, int level, RangeDelAggregator* range_del_agg, const std::vector* - compaction_boundaries = nullptr) + compaction_boundaries = 
nullptr, + bool allow_unprepared_value = false) : table_cache_(table_cache), read_options_(read_options), file_options_(file_options), @@ -873,6 +874,7 @@ class LevelIterator final : public InternalIterator { should_sample_(should_sample), caller_(caller), skip_filters_(skip_filters), + allow_unprepared_value_(allow_unprepared_value), file_index_(flevel_->num_files), level_(level), range_del_agg_(range_del_agg), @@ -907,6 +909,10 @@ class LevelIterator final : public InternalIterator { return file_iter_.iter() ? file_iter_.status() : Status::OK(); } + bool PrepareValue() override { + return file_iter_.PrepareValue(); + } + inline bool MayBeOutOfLowerBound() override { assert(Valid()); return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound(); @@ -979,7 +985,7 @@ class LevelIterator final : public InternalIterator { range_del_agg_, prefix_extractor_, nullptr /* don't need reference to table */, file_read_hist_, caller_, /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key, - largest_compaction_key); + largest_compaction_key, allow_unprepared_value_); } // Check if current file being fully within iterate_lower_bound. 
@@ -1012,6 +1018,7 @@ class LevelIterator final : public InternalIterator { bool should_sample_; TableReaderCaller caller_; bool skip_filters_; + bool allow_unprepared_value_; bool may_be_out_of_lower_bound_ = true; size_t file_index_; int level_; @@ -1117,6 +1124,7 @@ bool LevelIterator::NextAndGetResult(IterateResult* result) { if (is_valid) { result->key = key(); result->may_be_out_of_upper_bound = MayBeOutOfUpperBound(); + result->value_prepared = !allow_unprepared_value_; } return is_valid; } @@ -1533,12 +1541,13 @@ double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel( void Version::AddIterators(const ReadOptions& read_options, const FileOptions& soptions, MergeIteratorBuilder* merge_iter_builder, - RangeDelAggregator* range_del_agg) { + RangeDelAggregator* range_del_agg, + bool allow_unprepared_value) { assert(storage_info_.finalized_); for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) { AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level, - range_del_agg); + range_del_agg, allow_unprepared_value); } } @@ -1546,7 +1555,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, const FileOptions& soptions, MergeIteratorBuilder* merge_iter_builder, int level, - RangeDelAggregator* range_del_agg) { + RangeDelAggregator* range_del_agg, + bool allow_unprepared_value) { assert(storage_info_.finalized_); if (level >= storage_info_.num_non_empty_levels()) { // This is an empty level @@ -1571,7 +1581,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, TableReaderCaller::kUserIterator, arena, /*skip_filters=*/false, /*level=*/0, /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr)); + /*largest_compaction_key=*/nullptr, + allow_unprepared_value)); } if (should_sample) { // Count ones for every L0 files. 
This is done per iterator creation @@ -1593,7 +1604,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(), cfd_->internal_stats()->GetFileReadHist(level), TableReaderCaller::kUserIterator, IsFilterSkipped(level), level, - range_del_agg, /*largest_compaction_key=*/nullptr)); + range_del_agg, /*largest_compaction_key=*/nullptr, + allow_unprepared_value)); } } @@ -1629,7 +1641,8 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options, TableReaderCaller::kUserIterator, &arena, /*skip_filters=*/false, /*level=*/0, /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr)); + /*largest_compaction_key=*/nullptr, + /*allow_unprepared_value=*/false)); status = OverlapWithIterator( ucmp, smallest_user_key, largest_user_key, iter.get(), overlap); if (!status.ok() || *overlap) { @@ -5572,7 +5585,8 @@ InternalIterator* VersionSet::MakeInputIterator( /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/static_cast(which), /*smallest_compaction_key=*/nullptr, - /*largest_compaction_key=*/nullptr); + /*largest_compaction_key=*/nullptr, + /*allow_unprepared_value=*/false); } } else { // Create concatenating iterator for the files from this level diff --git a/db/version_set.h b/db/version_set.h index 8e29a6d94..dc9b26289 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -573,11 +573,13 @@ class Version { // REQUIRES: This version has been saved (see VersionSet::SaveTo) void AddIterators(const ReadOptions&, const FileOptions& soptions, MergeIteratorBuilder* merger_iter_builder, - RangeDelAggregator* range_del_agg); + RangeDelAggregator* range_del_agg, + bool allow_unprepared_value); void AddIteratorsForLevel(const ReadOptions&, const FileOptions& soptions, MergeIteratorBuilder* merger_iter_builder, - int level, RangeDelAggregator* range_del_agg); + int level, RangeDelAggregator* range_del_agg, + bool allow_unprepared_value); Status 
OverlapWithLevelIterator(const ReadOptions&, const FileOptions&, const Slice& smallest_user_key, diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index 2263c765c..7963513e0 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -113,11 +113,6 @@ struct BlockBasedTableOptions { // e.g. when prefix changes. // Makes the index significantly bigger (2x or more), especially when keys // are long. - // - // IO errors are not handled correctly in this mode right now: if an error - // happens when lazily reading a block in value(), value() returns empty - // slice, and you need to call Valid()/status() afterwards. - // TODO(kolmike): Fix it. kBinarySearchWithFirstKey = 0x03, }; diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index 603c5af38..aa398766b 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -61,10 +61,9 @@ void BlockBasedTableIterator::SeekImpl(const Slice* target) { const bool same_block = block_iter_points_to_real_block_ && v.handle.offset() == prev_block_offset_; - // TODO(kolmike): Remove the != kBlockCacheTier condition. if (!v.first_internal_key.empty() && !same_block && (!target || icomp_.Compare(*target, v.first_internal_key) <= 0) && - read_options_.read_tier != kBlockCacheTier) { + allow_unprepared_value_) { // Index contains the first key of the block, and it's >= target. // We can defer reading the block. 
is_at_first_key_from_index_ = true; @@ -191,6 +190,7 @@ bool BlockBasedTableIterator::NextAndGetResult(IterateResult* result) { if (is_valid) { result->key = key(); result->may_be_out_of_upper_bound = MayBeOutOfUpperBound(); + result->value_prepared = !is_at_first_key_from_index_; } return is_valid; } @@ -255,12 +255,16 @@ bool BlockBasedTableIterator::MaterializeCurrentBlock() { is_at_first_key_from_index_ = false; InitDataBlock(); assert(block_iter_points_to_real_block_); + + if (!block_iter_.status().ok()) { + return false; + } + block_iter_.SeekToFirst(); if (!block_iter_.Valid() || icomp_.Compare(block_iter_.key(), index_iter_->value().first_internal_key) != 0) { - // Uh oh. block_iter_.Invalidate(Status::Corruption( "first key in index doesn't match first key in block")); return false; @@ -321,9 +325,7 @@ void BlockBasedTableIterator::FindBlockForward() { IndexValue v = index_iter_->value(); - // TODO(kolmike): Remove the != kBlockCacheTier condition. - if (!v.first_internal_key.empty() && - read_options_.read_tier != kBlockCacheTier) { + if (!v.first_internal_key.empty() && allow_unprepared_value_) { // Index contains the first key of the block. Defer reading the block. 
is_at_first_key_from_index_ = true; return; diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index 0ad8cb99f..a5fe22875 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -25,11 +25,13 @@ class BlockBasedTableIterator : public InternalIteratorBase { std::unique_ptr>&& index_iter, bool check_filter, bool need_upper_bound_check, const SliceTransform* prefix_extractor, TableReaderCaller caller, - size_t compaction_readahead_size = 0) + size_t compaction_readahead_size = 0, + bool allow_unprepared_value = false) : table_(table), read_options_(read_options), icomp_(icomp), user_comparator_(icomp.user_comparator()), + allow_unprepared_value_(allow_unprepared_value), index_iter_(std::move(index_iter)), pinned_iters_mgr_(nullptr), block_iter_points_to_real_block_(false), @@ -69,18 +71,21 @@ class BlockBasedTableIterator : public InternalIteratorBase { return block_iter_.user_key(); } } - Slice value() const override { + bool PrepareValue() override { assert(Valid()); - // Load current block if not loaded. - if (is_at_first_key_from_index_ && - !const_cast(this) - ->MaterializeCurrentBlock()) { - // Oops, index is not consistent with block contents, but we have - // no good way to report error at this point. Let's return empty value. - return Slice(); + if (!is_at_first_key_from_index_) { + return true; } + return const_cast(this) + ->MaterializeCurrentBlock(); + } + Slice value() const override { + // PrepareValue() must have been called. + assert(!is_at_first_key_from_index_); + assert(Valid()); + return block_iter_.value(); } Status status() const override { @@ -113,10 +118,9 @@ class BlockBasedTableIterator : public InternalIteratorBase { (block_iter_points_to_real_block_ && block_iter_.IsKeyPinned())); } bool IsValuePinned() const override { - // Load current block if not loaded. 
- if (is_at_first_key_from_index_) { - const_cast(this)->MaterializeCurrentBlock(); - } + assert(!is_at_first_key_from_index_); + assert(Valid()); + // BlockIter::IsValuePinned() is always true. No need to check return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && block_iter_points_to_real_block_; @@ -150,6 +154,7 @@ class BlockBasedTableIterator : public InternalIteratorBase { const ReadOptions read_options_; const InternalKeyComparator& icomp_; UserComparatorWrapper user_comparator_; + const bool allow_unprepared_value_; std::unique_ptr> index_iter_; PinnedIteratorsManager* pinned_iters_mgr_; DataBlockIter block_iter_; @@ -162,7 +167,7 @@ class BlockBasedTableIterator : public InternalIteratorBase { // Whether current data block being fully within iterate upper bound. bool data_block_within_upper_bound_ = false; // True if we're standing at the first key of a block, and we haven't loaded - // that block yet. A call to value() will trigger loading the block. + // that block yet. A call to PrepareValue() will trigger loading the block. 
bool is_at_first_key_from_index_ = false; bool check_filter_; // TODO(Zhongyi): pick a better name diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index f6a5068fa..1dc2ae080 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -2027,7 +2027,7 @@ bool BlockBasedTable::PrefixMayMatch( InternalIterator* BlockBasedTable::NewIterator( const ReadOptions& read_options, const SliceTransform* prefix_extractor, Arena* arena, bool skip_filters, TableReaderCaller caller, - size_t compaction_readahead_size) { + size_t compaction_readahead_size, bool allow_unprepared_value) { BlockCacheLookupContext lookup_context{caller}; bool need_upper_bound_check = read_options.auto_prefix_mode || @@ -2043,7 +2043,7 @@ InternalIterator* BlockBasedTable::NewIterator( !skip_filters && !read_options.total_order_seek && prefix_extractor != nullptr, need_upper_bound_check, prefix_extractor, caller, - compaction_readahead_size); + compaction_readahead_size, allow_unprepared_value); } else { auto* mem = arena->AllocateAligned(sizeof(BlockBasedTableIterator)); return new (mem) BlockBasedTableIterator( @@ -2051,7 +2051,7 @@ InternalIterator* BlockBasedTable::NewIterator( !skip_filters && !read_options.total_order_seek && prefix_extractor != nullptr, need_upper_bound_check, prefix_extractor, caller, - compaction_readahead_size); + compaction_readahead_size, allow_unprepared_value); } } diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index de914d337..bc73f2117 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -116,7 +116,8 @@ class BlockBasedTable : public TableReader { const SliceTransform* prefix_extractor, Arena* arena, bool skip_filters, TableReaderCaller caller, - size_t compaction_readahead_size = 0) override; + size_t compaction_readahead_size = 0, + bool 
allow_unprepared_value = false) override; FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator( const ReadOptions& read_options) override; diff --git a/table/cuckoo/cuckoo_table_reader.cc b/table/cuckoo/cuckoo_table_reader.cc index 930204fb2..1c068b08e 100644 --- a/table/cuckoo/cuckoo_table_reader.cc +++ b/table/cuckoo/cuckoo_table_reader.cc @@ -380,7 +380,8 @@ InternalIterator* CuckooTableReader::NewIterator( const ReadOptions& /*read_options*/, const SliceTransform* /* prefix_extractor */, Arena* arena, bool /*skip_filters*/, TableReaderCaller /*caller*/, - size_t /*compaction_readahead_size*/) { + size_t /*compaction_readahead_size*/, + bool /* allow_unprepared_value */) { if (!status().ok()) { return NewErrorInternalIterator( Status::Corruption("CuckooTableReader status is not okay."), arena); diff --git a/table/cuckoo/cuckoo_table_reader.h b/table/cuckoo/cuckoo_table_reader.h index 65bd13e1a..5a7c8b72d 100644 --- a/table/cuckoo/cuckoo_table_reader.h +++ b/table/cuckoo/cuckoo_table_reader.h @@ -52,7 +52,8 @@ class CuckooTableReader: public TableReader { const SliceTransform* prefix_extractor, Arena* arena, bool skip_filters, TableReaderCaller caller, - size_t compaction_readahead_size = 0) override; + size_t compaction_readahead_size = 0, + bool allow_unprepared_value = false) override; void Prepare(const Slice& target) override; // Report an approximation of how much memory has been used. diff --git a/table/internal_iterator.h b/table/internal_iterator.h index fef6afe70..829cb2fc8 100644 --- a/table/internal_iterator.h +++ b/table/internal_iterator.h @@ -20,6 +20,8 @@ class PinnedIteratorsManager; struct IterateResult { Slice key; bool may_be_out_of_upper_bound; + // If false, PrepareValue() needs to be called before value(). + bool value_prepared = true; }; template @@ -78,6 +80,7 @@ class InternalIteratorBase : public Cleanable { // call. If an implementation has non-trivial MayBeOutOfUpperBound(), // it should also override NextAndGetResult(). 
result->may_be_out_of_upper_bound = true; + result->value_prepared = false; assert(MayBeOutOfUpperBound()); } return is_valid; } @@ -102,6 +105,7 @@ // the returned slice is valid only until the next modification of // the iterator. // REQUIRES: Valid() + // REQUIRES: PrepareValue() has been called if needed (see PrepareValue()). virtual TValue value() const = 0; // If an error has occurred, return it. Else return an ok status. @@ -109,6 +113,19 @@ // satisfied without doing some IO, then this returns Status::Incomplete(). virtual Status status() const = 0; + // For some types of iterators, sometimes Seek()/Next()/SeekForPrev()/etc may + // load key but not value (to avoid the IO cost of reading the value from disk + // if it won't be needed). This method loads the value in such a situation. + // + // Needs to be called before value() at least once after each iterator + // movement (except if IterateResult::value_prepared = true), for iterators + // created with allow_unprepared_value = true. + // + // Returns false if an error occurred; in this case Valid() is also changed + // to false, and status() is changed to non-ok. + // REQUIRES: Valid() + virtual bool PrepareValue() { return true; } + // True if the iterator is invalidated because it reached a key that is above // the iterator upper bound. Used by LevelIterator to decide whether it should // stop or move on to the next file. @@ -144,6 +161,7 @@ // If true, this means that the Slice returned by value() is valid as long as // PinnedIteratorsManager::ReleasePinnedData is not called and the // Iterator is not deleted. + // REQUIRES: Same as for value().
virtual bool IsValuePinned() const { return false; } virtual Status GetProperty(std::string /*prop_name*/, std::string* /*prop*/) { diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h index c13359e99..b4b051ff6 100644 --- a/table/iterator_wrapper.h +++ b/table/iterator_wrapper.h @@ -70,6 +70,20 @@ class IteratorWrapperBase { assert(iter_); return iter_->status(); } + bool PrepareValue() { + assert(Valid()); + if (result_.value_prepared) { + return true; + } + if (iter_->PrepareValue()) { + result_.value_prepared = true; + return true; + } + + assert(!iter_->Valid()); + valid_ = false; + return false; + } void Next() { assert(iter_); valid_ = iter_->NextAndGetResult(&result_); @@ -124,6 +138,10 @@ class IteratorWrapperBase { return iter_->IsValuePinned(); } + bool IsValuePrepared() const { + return result_.value_prepared; + } + private: void Update() { valid_ = iter_->Valid(); @@ -131,6 +149,7 @@ class IteratorWrapperBase { assert(iter_->status().ok()); result_.key = iter_->key(); result_.may_be_out_of_upper_bound = true; + result_.value_prepared = false; } } diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index 47fa048f3..695a03013 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -195,6 +195,7 @@ class MergingIterator : public InternalIterator { if (is_valid) { result->key = key(); result->may_be_out_of_upper_bound = MayBeOutOfUpperBound(); + result->value_prepared = current_->IsValuePrepared(); } return is_valid; } @@ -240,6 +241,17 @@ class MergingIterator : public InternalIterator { return current_->value(); } + bool PrepareValue() override { + assert(Valid()); + if (current_->PrepareValue()) { + return true; + } + + considerStatus(current_->status()); + assert(!status_.ok()); + return false; + } + // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result // from current child iterator. 
Potentially as long as one of child iterator // report out of bound is not possible, we know current key is within bound. diff --git a/table/mock_table.cc b/table/mock_table.cc index ca085a198..aabde104b 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -30,7 +30,7 @@ stl_wrappers::KVMap MakeMockFile( InternalIterator* MockTableReader::NewIterator( const ReadOptions&, const SliceTransform* /* prefix_extractor */, Arena* /*arena*/, bool /*skip_filters*/, TableReaderCaller /*caller*/, - size_t /*compaction_readahead_size*/) { + size_t /*compaction_readahead_size*/, bool /* allow_unprepared_value */) { return new MockTableIterator(table_); } diff --git a/table/mock_table.h b/table/mock_table.h index 798acb025..1e77288e2 100644 --- a/table/mock_table.h +++ b/table/mock_table.h @@ -44,7 +44,8 @@ class MockTableReader : public TableReader { const SliceTransform* prefix_extractor, Arena* arena, bool skip_filters, TableReaderCaller caller, - size_t compaction_readahead_size = 0) override; + size_t compaction_readahead_size = 0, + bool allow_unprepared_value = false) override; Status Get(const ReadOptions& readOptions, const Slice& key, GetContext* get_context, const SliceTransform* prefix_extractor, diff --git a/table/plain/plain_table_reader.cc b/table/plain/plain_table_reader.cc index 6d560477d..484a1105b 100644 --- a/table/plain/plain_table_reader.cc +++ b/table/plain/plain_table_reader.cc @@ -201,7 +201,8 @@ void PlainTableReader::SetupForCompaction() { InternalIterator* PlainTableReader::NewIterator( const ReadOptions& options, const SliceTransform* /* prefix_extractor */, Arena* arena, bool /*skip_filters*/, TableReaderCaller /*caller*/, - size_t /*compaction_readahead_size*/) { + size_t /*compaction_readahead_size*/, + bool /* allow_unprepared_value */) { // Not necessarily used here, but make sure this has been initialized assert(table_properties_); diff --git a/table/plain/plain_table_reader.h b/table/plain/plain_table_reader.h index 
db7b0626f..e3b12a9c3 100644 --- a/table/plain/plain_table_reader.h +++ b/table/plain/plain_table_reader.h @@ -84,7 +84,8 @@ class PlainTableReader: public TableReader { const SliceTransform* prefix_extractor, Arena* arena, bool skip_filters, TableReaderCaller caller, - size_t compaction_readahead_size = 0) override; + size_t compaction_readahead_size = 0, + bool allow_unprepared_value = false) override; void Prepare(const Slice& target) override; diff --git a/table/table_reader.h b/table/table_reader.h index 4a08e3883..f54ada09f 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -50,7 +50,8 @@ class TableReader { virtual InternalIterator* NewIterator( const ReadOptions&, const SliceTransform* prefix_extractor, Arena* arena, bool skip_filters, TableReaderCaller caller, - size_t compaction_readahead_size = 0) = 0; + size_t compaction_readahead_size = 0, + bool allow_unprepared_value = false) = 0; virtual FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator( const ReadOptions& /*read_options*/) { diff --git a/table/table_test.cc b/table/table_test.cc index e45c67e35..17f8d1b9c 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -2252,7 +2252,8 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) { ASSERT_EQ(4u, props->num_data_blocks); std::unique_ptr iter(reader->NewIterator( ReadOptions(), /*prefix_extractor=*/nullptr, /*arena=*/nullptr, - /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + /*skip_filters=*/false, TableReaderCaller::kUncategorized, + /*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true)); // Shouldn't have read data blocks before iterator is seeked. EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); @@ -2269,6 +2270,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) { EXPECT_EQ(keys[2], iter->key().ToString()); EXPECT_EQ(use_first_key ? 
0 : 1, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); + ASSERT_TRUE(iter->PrepareValue()); EXPECT_EQ("v2", iter->value().ToString()); EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); @@ -2279,6 +2281,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) { EXPECT_EQ(keys[4], iter->key().ToString()); EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + ASSERT_TRUE(iter->PrepareValue()); EXPECT_EQ("v4", iter->value().ToString()); EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); @@ -2294,6 +2297,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) { ASSERT_TRUE(iter->Valid()); EXPECT_EQ(keys[5], iter->key().ToString()); EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + ASSERT_TRUE(iter->PrepareValue()); EXPECT_EQ("v5", iter->value().ToString()); EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); @@ -2311,6 +2315,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) { ASSERT_TRUE(iter->Valid()); EXPECT_EQ(keys[7], iter->key().ToString()); EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); + ASSERT_TRUE(iter->PrepareValue()); EXPECT_EQ("v7", iter->value().ToString()); EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); @@ -2332,6 +2337,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) { EXPECT_EQ(keys[3], iter->key().ToString()); EXPECT_EQ(use_first_key ? 1 : 2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + ASSERT_TRUE(iter->PrepareValue()); EXPECT_EQ("v3", iter->value().ToString()); EXPECT_EQ(2, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); EXPECT_EQ(3, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); @@ -2351,6 +2357,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) { stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); // All blocks are in cache now, there'll be no more misses ever. 
EXPECT_EQ(4, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); + ASSERT_TRUE(iter->PrepareValue()); EXPECT_EQ("v1", iter->value().ToString()); // Next into the next block again. @@ -2378,6 +2385,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) { EXPECT_EQ(keys[4], iter->key().ToString()); EXPECT_EQ(use_first_key ? 3 : 6, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + ASSERT_TRUE(iter->PrepareValue()); EXPECT_EQ("v4", iter->value().ToString()); EXPECT_EQ(use_first_key ? 3 : 6, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); @@ -2387,6 +2395,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKey2) { EXPECT_EQ(keys[7], iter->key().ToString()); EXPECT_EQ(use_first_key ? 4 : 7, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); + ASSERT_TRUE(iter->PrepareValue()); EXPECT_EQ("v7", iter->value().ToString()); EXPECT_EQ(use_first_key ? 4 : 7, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); @@ -2427,7 +2436,8 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKeyGlobalSeqno) { ASSERT_EQ(1u, props->num_data_blocks); std::unique_ptr iter(reader->NewIterator( ReadOptions(), /*prefix_extractor=*/nullptr, /*arena=*/nullptr, - /*skip_filters=*/false, TableReaderCaller::kUncategorized)); + /*skip_filters=*/false, TableReaderCaller::kUncategorized, + /*compaction_readahead_size=*/0, /*allow_unprepared_value=*/true)); iter->Seek(InternalKey("a", 0, kTypeValue).Encode().ToString()); ASSERT_TRUE(iter->Valid()); @@ -2437,6 +2447,7 @@ TEST_P(BlockBasedTableTest, BinaryIndexWithFirstKeyGlobalSeqno) { // Key should have been served from index, without reading data blocks. 
EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); + ASSERT_TRUE(iter->PrepareValue()); EXPECT_EQ("x", iter->value().ToString()); EXPECT_EQ(1, stats->getTickerCount(BLOCK_CACHE_DATA_MISS)); EXPECT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_HIT)); diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc index 45be045b7..5f907c6f4 100644 --- a/tools/sst_dump_tool.cc +++ b/tools/sst_dump_tool.cc @@ -186,12 +186,12 @@ uint64_t SstFileDumper::CalculateCompressedTableSize( ReadOptions(), moptions_.prefix_extractor.get(), /*arena=*/nullptr, /*skip_filters=*/false, TableReaderCaller::kSSTDumpTool)); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - if (!iter->status().ok()) { - fputs(iter->status().ToString().c_str(), stderr); - exit(1); - } table_builder->Add(iter->key(), iter->value()); } + if (!iter->status().ok()) { + fputs(iter->status().ToString().c_str(), stderr); + exit(1); + } Status s = table_builder->Finish(); if (!s.ok()) { fputs(s.ToString().c_str(), stderr);