Get() to fail with underlying failures in PartitionIndexReader::CacheDependencies() (#7297)

Summary:
Right now, all I/O failures under PartitionIndexReader::CacheDependencies() are swallowed. This doesn't impact correctness, but we've made the decision that any I/O error on the read path should now be returned to users for awareness. Return errors in those cases instead.
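
The user-visible effect is that a read which needs to load partitioned index blocks can now surface the underlying IOError through Get() instead of silently succeeding. A minimal sketch of what a caller sees after this change (the DB path and key are illustrative):

#include <iostream>
#include <string>

#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/pidx_demo", &db);
  if (!s.ok()) {
    std::cerr << "open failed: " << s.ToString() << std::endl;
    return 1;
  }

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "some-key", &value);
  // Before this change, an I/O failure during index-partition prefetch was
  // swallowed (correctness was unaffected); now that IOError is returned
  // here so the caller is aware of it.
  if (s.IsIOError()) {
    std::cerr << "read failed: " << s.ToString() << std::endl;
  }

  delete db;
  return 0;
}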

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7297

Test Plan: Add a new unit test that injects errors into this code path and verifies that Get() fails. Only one I/O path is hit in PartitionIndexReader::CacheDependencies(). Several option changes were attempted but did not trigger the other pread paths; it is unclear whether those failure cases are even possible. We will rely on the continuous stress test to validate them.

Reviewed By: anand1976

Differential Revision: D23257950

fbshipit-source-id: 859dbc92fa239996e1bb378329344d3d54168c03
Branch: main
Author: sdong, committed by Facebook GitHub Bot
Parent: cecdd5d2ab
Commit: 722814e357
Files changed (lines changed):
1. HISTORY.md (3)
2. db/db_test2.cc (39)
3. db/db_test_util.h (40)
4. table/block_based/block_based_table_reader.cc (5)
5. table/block_based/block_based_table_reader.h (5)
6. table/block_based/partitioned_index_reader.cc (26)
7. table/block_based/partitioned_index_reader.h (2)

@@ -21,6 +21,9 @@
* Added file_checksum and file_checksum_func_name to TableFileCreationInfo, which can pass the table file checksum information through the OnTableFileCreated callback during flush and compaction.
### Others
* Errors in prefetching partitioned index blocks will not be swallowed. They will fail the query and return the IOError to users.
## 6.12 (2020-07-28)
### Public API Change
* Encryption file classes now exposed for inheritance in env_encryption.h

@@ -4805,6 +4805,45 @@ TEST_F(DBTest2, BlockBasedTablePrefixIndexSeekForPrev) {
}
}
TEST_F(DBTest2, PartitionedIndexPrefetchFailure) {
Options options = last_options_;
options.max_open_files = 20;
BlockBasedTableOptions bbto;
bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
bbto.metadata_block_size = 128;
bbto.block_size = 128;
bbto.block_cache = NewLRUCache(16777216);
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
// Force no table cache so every read will preload the SST file.
dbfull()->TEST_table_cache()->SetCapacity(0);
bbto.block_cache->SetCapacity(0);
Random rnd(301);
for (int i = 0; i < 4096; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(32)));
}
ASSERT_OK(Flush());
// Try different random failures in table open for 300 times.
for (int i = 0; i < 300; i++) {
env_->num_reads_fails_ = 0;
env_->rand_reads_fail_odd_ = 8;
std::string value;
Status s = dbfull()->Get(ReadOptions(), Key(1), &value);
if (env_->num_reads_fails_ > 0) {
ASSERT_NOK(s);
} else {
ASSERT_OK(s);
}
}
env_->rand_reads_fail_odd_ = 0;
}
TEST_F(DBTest2, ChangePrefixExtractor) {
for (bool use_partitioned_filter : {true, false}) {
// create a DB with block prefix index

@@ -488,12 +488,44 @@ class SpecialEnv : public EnvWrapper {
std::atomic<size_t>* bytes_read_;
};
class RandomFailureFile : public RandomAccessFile {
public:
RandomFailureFile(std::unique_ptr<RandomAccessFile>&& target,
std::atomic<uint64_t>* failure_cnt, uint32_t fail_odd)
: target_(std::move(target)),
fail_cnt_(failure_cnt),
fail_odd_(fail_odd) {}
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
if (Random::GetTLSInstance()->OneIn(fail_odd_)) {
fail_cnt_->fetch_add(1);
return Status::IOError("random error");
}
return target_->Read(offset, n, result, scratch);
}
virtual Status Prefetch(uint64_t offset, size_t n) override {
return target_->Prefetch(offset, n);
}
private:
std::unique_ptr<RandomAccessFile> target_;
std::atomic<uint64_t>* fail_cnt_;
uint32_t fail_odd_;
};
Status s = target()->NewRandomAccessFile(f, r, soptions);
random_file_open_counter_++;
- if (s.ok() && count_random_reads_) {
- r->reset(new CountingFile(std::move(*r), &random_read_counter_,
- &random_read_bytes_counter_));
+ if (s.ok()) {
+ if (count_random_reads_) {
+ r->reset(new CountingFile(std::move(*r), &random_read_counter_,
+ &random_read_bytes_counter_));
+ } else if (rand_reads_fail_odd_ > 0) {
+ r->reset(new RandomFailureFile(std::move(*r), &num_reads_fails_,
+ rand_reads_fail_odd_));
+ }
}
if (s.ok() && soptions.compaction_readahead_size > 0) {
compaction_readahead_size_ = soptions.compaction_readahead_size;
}
@@ -636,6 +668,8 @@ class SpecialEnv : public EnvWrapper {
std::atomic<int> num_open_wal_file_;
bool count_random_reads_;
uint32_t rand_reads_fail_odd_ = 0;
std::atomic<uint64_t> num_reads_fails_;
anon::AtomicCounter random_read_counter_;
std::atomic<size_t> random_read_bytes_counter_;
std::atomic<int> random_file_open_counter_;
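
The RandomFailureFile above is a standard decorator-style fault injector: wrap the real file, fail a read with probability roughly 1/fail_odd, and count injected failures so the test can tell whether a given operation actually hit one. A self-contained sketch of the same idea outside RocksDB (all types here are illustrative stand-ins, not RocksDB APIs):

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <random>
#include <string>

// Illustrative stand-in for a Status type; not the RocksDB Status.
struct Status {
  bool ok_ = true;
  std::string msg_;
  bool ok() const { return ok_; }
  static Status OK() { return Status{}; }
  static Status IOError(std::string m) { return Status{false, std::move(m)}; }
};

// Illustrative stand-in for a random-access file interface.
struct Reader {
  virtual ~Reader() = default;
  virtual Status Read(uint64_t offset, size_t n, std::string* out) const = 0;
};

// Decorator that forwards to the wrapped reader but fails roughly one in
// fail_odd reads, bumping a shared counter so callers can distinguish
// "operation hit an injected error" from "operation ran clean".
class FailingReader : public Reader {
 public:
  FailingReader(std::unique_ptr<Reader> target, uint32_t fail_odd,
                std::atomic<uint64_t>* fail_cnt)
      : target_(std::move(target)), fail_odd_(fail_odd), fail_cnt_(fail_cnt) {}

  Status Read(uint64_t offset, size_t n, std::string* out) const override {
    thread_local std::mt19937 rng{std::random_device{}()};
    if (fail_odd_ > 0 &&
        std::uniform_int_distribution<uint32_t>(0, fail_odd_ - 1)(rng) == 0) {
      fail_cnt_->fetch_add(1);
      return Status::IOError("injected random error");
    }
    return target_->Read(offset, n, out);
  }

 private:
  std::unique_ptr<Reader> target_;
  uint32_t fail_odd_;
  std::atomic<uint64_t>* fail_cnt_;
};

In the test above, env_->rand_reads_fail_odd_ = 8 plays the fail_odd role and env_->num_reads_fails_ is the counter, which is why the assertion branches on whether any failure was actually injected during the Get().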

@@ -1015,7 +1015,10 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
// are hence follow the configuration for pin and prefetch regardless of
// the value of cache_index_and_filter_blocks
if (prefetch_all) {
- rep_->index_reader->CacheDependencies(ro, pin_all);
+ s = rep_->index_reader->CacheDependencies(ro, pin_all);
}
+ if (!s.ok()) {
+ return s;
+ }
// prefetch the first level of filter

@@ -206,7 +206,10 @@ class BlockBasedTable : public TableReader {
virtual size_t ApproximateMemoryUsage() const = 0;
// Cache the dependencies of the index reader (e.g. the partitions
// of a partitioned index).
- virtual void CacheDependencies(const ReadOptions& /*ro*/, bool /* pin */) {}
+ virtual Status CacheDependencies(const ReadOptions& /*ro*/,
+ bool /* pin */) {
+ return Status::OK();
+ }
};
class IndexReaderCommon;

@@ -104,7 +104,8 @@ InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
// the first level iter is always on heap and will attempt to delete it
// in its destructor.
}
- void PartitionIndexReader::CacheDependencies(const ReadOptions& ro, bool pin) {
+ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
+ bool pin) {
// Before read partitions, prefetch them to avoid lots of IOs
BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
const BlockBasedTable::Rep* rep = table()->rep_;
@@ -116,12 +117,7 @@ void PartitionIndexReader::CacheDependencies(const ReadOptions& ro, bool pin) {
Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */,
&lookup_context, &index_block);
if (!s.ok()) {
- ROCKS_LOG_WARN(rep->ioptions.info_log,
- "Error retrieving top-level index block while trying to "
- "cache index partitions: %s",
- s.ToString().c_str());
- IGNORE_STATUS_IF_ERROR(s);
- return;
+ return s;
}
// We don't return pinned data from index blocks, so no need
@@ -135,7 +131,7 @@ void PartitionIndexReader::CacheDependencies(const ReadOptions& ro, bool pin) {
biter.SeekToFirst();
if (!biter.Valid()) {
// Empty index.
- return;
+ return biter.status();
}
handle = biter.value().handle;
uint64_t prefetch_off = handle.offset();
@@ -144,7 +140,7 @@ void PartitionIndexReader::CacheDependencies(const ReadOptions& ro, bool pin) {
biter.SeekToLast();
if (!biter.Valid()) {
// Empty index.
- return;
+ return biter.status();
}
handle = biter.value().handle;
uint64_t last_off = handle.offset() + block_size(handle);
@@ -157,6 +153,9 @@ void PartitionIndexReader::CacheDependencies(const ReadOptions& ro, bool pin) {
s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
static_cast<size_t>(prefetch_len));
}
if (!s.ok()) {
return s;
}
// After prefetch, read the partitions one by one
biter.SeekToFirst();
@@ -170,10 +169,10 @@ void PartitionIndexReader::CacheDependencies(const ReadOptions& ro, bool pin) {
&block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context,
/*contents=*/nullptr);
- IGNORE_STATUS_IF_ERROR(s);
- assert(s.ok() || block.GetValue() == nullptr);
- if (s.ok() && block.GetValue() != nullptr) {
+ if (!s.ok()) {
+ return s;
+ }
+ if (block.GetValue() != nullptr) {
if (block.IsCached()) {
if (pin) {
partition_map_[handle.offset()] = std::move(block);
@@ -181,6 +180,7 @@ void PartitionIndexReader::CacheDependencies(const ReadOptions& ro, bool pin) {
}
}
}
return biter.status();
}
} // namespace ROCKSDB_NAMESPACE
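
Taken together, the hunks above convert CacheDependencies() to the usual early-return Status convention: each fallible step (top-level index read, range prefetch, per-partition retrieval) is checked and its error propagated to PrefetchIndexAndFilterBlocks(), which in turn fails the table open and ultimately the Get(). A compressed, self-contained sketch of that control-flow shape (helper names and types are illustrative, not the RocksDB internals):

#include <string>

// Illustrative stand-in for rocksdb::Status; not the real type.
struct Status {
  bool ok_ = true;
  std::string msg_;
  bool ok() const { return ok_; }
  static Status OK() { return Status{}; }
  static Status IOError(std::string m) { return Status{false, std::move(m)}; }
};

// Hypothetical fallible steps standing in for the real I/O calls.
Status ReadTopLevelIndexBlock() { return Status::OK(); }
Status PrefetchPartitionRange() { return Status::OK(); }
Status RetrievePartitionBlock(int /*partition*/) { return Status::OK(); }

// Every step's Status is now checked and propagated, replacing the old
// log-and-ignore (IGNORE_STATUS_IF_ERROR) behavior.
Status CacheDependenciesSketch(int num_partitions) {
  Status s = ReadTopLevelIndexBlock();  // was: log a warning, swallow error
  if (!s.ok()) return s;
  s = PrefetchPartitionRange();         // was: status silently dropped
  if (!s.ok()) return s;
  for (int p = 0; p < num_partitions; ++p) {
    s = RetrievePartitionBlock(p);      // was: IGNORE_STATUS_IF_ERROR
    if (!s.ok()) return s;
  }
  return Status::OK();                  // real code returns biter.status() here
}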

@@ -29,7 +29,7 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) override;
- void CacheDependencies(const ReadOptions& ro, bool pin) override;
+ Status CacheDependencies(const ReadOptions& ro, bool pin) override;
size_t ApproximateMemoryUsage() const override {
size_t usage = ApproximateIndexBlockMemoryUsage();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
