Clarify caching behavior for index and filter partitions (#9068)

Summary:
Somewhat confusingly, index and filter partition blocks are
never owned by table readers, even with
cache_index_and_filter_blocks=false. They still go into block cache
(possibly pinned by table reader) if there is a block cache. If no block
cache, they are only loaded transiently on demand.

This PR primarily clarifies the options APIs and some internal code
comments.

Also, this closes a hypothetical data corruption vulnerability where
some but not all index partitions are pinned. I haven't been able to
reproduce a case where it can happen (the failure seems to propagate
to abort table open) but it's worth patching nonetheless.

Fixes https://github.com/facebook/rocksdb/issues/8979

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9068

Test Plan:
existing tests :-/  I could cover the new code using sync
points, but then I'd have to very carefully relax my `assert(false)`

Reviewed By: ajkr

Differential Revision: D31898284

Pulled By: pdillinger

fbshipit-source-id: f2511a7d3a36bc04b627935d8e6cfea6422f98be
main
Peter Dillinger 3 years ago committed by Facebook GitHub Bot
parent 82846f41d3
commit 5bf9a7d5ee
  1. 12
      include/rocksdb/table.h
  2. 35
      table/block_based/block_based_table_reader.cc
  3. 2
      table/block_based/partitioned_filter_block.h
  4. 45
      table/block_based/partitioned_index_reader.cc
  5. 3
      table/block_based/partitioned_index_reader.h
  6. 4
      table/two_level_iterator.cc

@ -117,9 +117,10 @@ struct BlockBasedTableOptions {
// caching as they should now apply to range tombstone and compression // caching as they should now apply to range tombstone and compression
// dictionary meta-blocks, in addition to index and filter meta-blocks. // dictionary meta-blocks, in addition to index and filter meta-blocks.
// //
// Indicating if we'd put index/filter blocks to the block cache. // Whether to put index/filter blocks in the block cache. When false,
// If not specified, each "table reader" object will pre-load index/filter // each "table reader" object will pre-load index/filter blocks during
// block during table initialization. // table initialization. Index and filter partition blocks always use
// block cache regardless of this option.
bool cache_index_and_filter_blocks = false; bool cache_index_and_filter_blocks = false;
// If cache_index_and_filter_blocks is enabled, cache index and filter // If cache_index_and_filter_blocks is enabled, cache index and filter
@ -190,6 +191,8 @@ struct BlockBasedTableOptions {
kHashSearch = 0x01, kHashSearch = 0x01,
// A two-level index implementation. Both levels are binary search indexes. // A two-level index implementation. Both levels are binary search indexes.
// Second level index blocks ("partitions") use block cache even when
// cache_index_and_filter_blocks=false.
kTwoLevelIndexSearch = 0x02, kTwoLevelIndexSearch = 0x02,
// Like kBinarySearch, but index also contains first key of each block. // Like kBinarySearch, but index also contains first key of each block.
@ -285,7 +288,8 @@ struct BlockBasedTableOptions {
// well. // well.
// TODO(myabandeh): remove the note above once the limitation is lifted // TODO(myabandeh): remove the note above once the limitation is lifted
// Use partitioned full filters for each SST file. This option is // Use partitioned full filters for each SST file. This option is
// incompatible with block-based filters. // incompatible with block-based filters. Filter partition blocks use
// block cache even when cache_index_and_filter_blocks=false.
bool partition_filters = false; bool partition_filters = false;
// Option to generate Bloom/Ribbon filters that minimize memory // Option to generate Bloom/Ribbon filters that minimize memory

@ -2063,24 +2063,23 @@ BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator(
const BlockHandle& handle) { const BlockHandle& handle) {
// Return a block iterator on the index partition // Return a block iterator on the index partition
auto block = block_map_->find(handle.offset()); auto block = block_map_->find(handle.offset());
// This is a possible scenario since block cache might not have had space // block_map_ must be exhaustive
// for the partition if (block == block_map_->end()) {
if (block != block_map_->end()) { assert(false);
const Rep* rep = table_->get_rep(); // Signal problem to caller
assert(rep); return nullptr;
}
Statistics* kNullStats = nullptr; const Rep* rep = table_->get_rep();
// We don't return pinned data from index blocks, so no need assert(rep);
// to set `block_contents_pinned`.
return block->second.GetValue()->NewIndexIterator( Statistics* kNullStats = nullptr;
rep->internal_comparator.user_comparator(), // We don't return pinned data from index blocks, so no need
rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true, // to set `block_contents_pinned`.
rep->index_has_first_key, rep->index_key_includes_seq, return block->second.GetValue()->NewIndexIterator(
rep->index_value_is_full); rep->internal_comparator.user_comparator(),
} rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
// Create an empty iterator rep->index_has_first_key, rep->index_key_includes_seq,
// TODO(ajkr): this is not the right way to handle an unpinned partition. rep->index_value_is_full);
return new IndexBlockIter();
} }
// This will be broken if the user specifies an unusual implementation // This will be broken if the user specifies an unusual implementation

@ -142,6 +142,8 @@ class PartitionedFilterBlockReader : public FilterBlockReaderCommon<Block> {
bool index_value_is_full() const; bool index_value_is_full() const;
protected: protected:
// For partition blocks pinned in cache. Can be a subset of blocks
// in case some fail insertion on attempt to pin.
std::unordered_map<uint64_t, CachableEntry<ParsedFullFilterBlock>> std::unordered_map<uint64_t, CachableEntry<ParsedFullFilterBlock>>
filter_map_; filter_map_;
}; };

@ -114,10 +114,12 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
Statistics* kNullStats = nullptr; Statistics* kNullStats = nullptr;
CachableEntry<Block> index_block; CachableEntry<Block> index_block;
Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */, {
&lookup_context, &index_block); Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */,
if (!s.ok()) { &lookup_context, &index_block);
return s; if (!s.ok()) {
return s;
}
} }
// We don't return pinned data from index blocks, so no need // We don't return pinned data from index blocks, so no need
@ -149,23 +151,30 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer, rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
false /*Implicit auto readahead*/); false /*Implicit auto readahead*/);
IOOptions opts; IOOptions opts;
s = rep->file->PrepareIOOptions(ro, opts); {
if (s.ok()) { Status s = rep->file->PrepareIOOptions(ro, opts);
s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off, if (s.ok()) {
static_cast<size_t>(prefetch_len)); s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
} static_cast<size_t>(prefetch_len));
if (!s.ok()) { }
return s; if (!s.ok()) {
return s;
}
} }
// For saving "all or nothing" to partition_map_
std::unordered_map<uint64_t, CachableEntry<Block>> map_in_progress;
// After prefetch, read the partitions one by one // After prefetch, read the partitions one by one
biter.SeekToFirst(); biter.SeekToFirst();
size_t partition_count = 0;
for (; biter.Valid(); biter.Next()) { for (; biter.Valid(); biter.Next()) {
handle = biter.value().handle; handle = biter.value().handle;
CachableEntry<Block> block; CachableEntry<Block> block;
++partition_count;
// TODO: Support counter batch update for partitioned index and // TODO: Support counter batch update for partitioned index and
// filter blocks // filter blocks
s = table()->MaybeReadBlockAndLoadToCache( Status s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(), prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
/*wait=*/true, &block, BlockType::kIndex, /*get_context=*/nullptr, /*wait=*/true, &block, BlockType::kIndex, /*get_context=*/nullptr,
&lookup_context, /*contents=*/nullptr); &lookup_context, /*contents=*/nullptr);
@ -174,14 +183,22 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
return s; return s;
} }
if (block.GetValue() != nullptr) { if (block.GetValue() != nullptr) {
// Might need to "pin" some mmap-read blocks (GetOwnValue) if some
// partitions are successfully compressed (cached) and some are not
// compressed (mmap eligible)
if (block.IsCached() || block.GetOwnValue()) { if (block.IsCached() || block.GetOwnValue()) {
if (pin) { if (pin) {
partition_map_[handle.offset()] = std::move(block); map_in_progress[handle.offset()] = std::move(block);
} }
} }
} }
} }
return biter.status(); Status s = biter.status();
// Save (pin) them only if everything checks out
if (map_in_progress.size() == partition_count && s.ok()) {
std::swap(partition_map_, map_in_progress);
}
return s;
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -46,6 +46,9 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
CachableEntry<Block>&& index_block) CachableEntry<Block>&& index_block)
: IndexReaderCommon(t, std::move(index_block)) {} : IndexReaderCommon(t, std::move(index_block)) {}
// For partition blocks pinned in cache. This is expected to be "all or
// none" so that !partition_map_.empty() can use an iterator expecting
// all partitions to be saved here.
std::unordered_map<uint64_t, CachableEntry<Block>> partition_map_; std::unordered_map<uint64_t, CachableEntry<Block>> partition_map_;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -201,6 +201,10 @@ void TwoLevelIndexIterator::InitDataBlock() {
state_->NewSecondaryIterator(handle); state_->NewSecondaryIterator(handle);
data_block_handle_ = handle; data_block_handle_ = handle;
SetSecondLevelIterator(iter); SetSecondLevelIterator(iter);
if (iter == nullptr) {
status_ = Status::Corruption("Missing block for partition " +
handle.ToString());
}
} }
} }
} }

Loading…
Cancel
Save