You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

781 lines
32 KiB

// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "util/async_file_reader.h"
#include "util/coro_utils.h"
#if defined(WITHOUT_COROUTINES) || \
// This function reads multiple data blocks from disk using Env::MultiRead()
// and optionally inserts them into the block cache. It uses the scratch
// buffer provided by the caller, which is contiguous. If scratch is a nullptr
// it allocates a separate buffer for each block. Typically, if the blocks
// need to be uncompressed and there is no compressed block cache, callers
// can allocate a temporary scratch buffer in order to minimize memory
// allocations.
// If options.fill_cache is true, it inserts the blocks into cache. If its
// false and scratch is non-null and the blocks are uncompressed, it copies
// the buffers to heap. In any case, the CachableEntry<Block> returned will
// own the data bytes.
// If compression is enabled and also there is no compressed block cache,
// the adjacent blocks are read out in one IO (combined read)
// batch - A MultiGetRange with only those keys with unique data blocks not
// found in cache
// handles - A vector of block handles. Some of them me be NULL handles
// scratch - An optional contiguous buffer to read compressed blocks into
DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
(const ReadOptions& options, const MultiGetRange* batch,
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
Status* statuses, CachableEntry<Block_kData>* results, char* scratch,
const UncompressionDict& uncompression_dict, bool use_fs_scratch) const {
RandomAccessFileReader* file = rep_->file.get();
const Footer& footer = rep_->footer;
const ImmutableOptions& ioptions = rep_->ioptions;
size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);
if (ioptions.allow_mmap_reads) {
size_t idx_in_batch = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
// XXX: use_cache=true means double cache query?
statuses[idx_in_batch] =
RetrieveBlock(nullptr, options, handle, uncompression_dict,
mget_iter->get_context, /* lookup_context */ nullptr,
/* for_compaction */ false, /* use_cache */ true,
/* async_read */ false);
assert(idx_in_batch == handles->size());
// In direct IO mode, blocks share the direct io buffer.
// Otherwise, blocks share the scratch buffer.
const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr;
autovector<FSReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
size_t buf_offset = 0;
size_t idx_in_batch = 0;
uint64_t prev_offset = 0;
size_t prev_len = 0;
autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_idx_for_block;
autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_offset_for_block;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
size_t prev_end = static_cast<size_t>(prev_offset) + prev_len;
// If current block is adjacent to the previous one, at the same time,
// compression is enabled and there is no compressed cache, we combine
// the two block read as one.
// We don't combine block reads here in direct IO mode, because when doing
// direct IO read, the block requests will be realigned and merged when
// necessary.
if ((use_shared_buffer || use_fs_scratch) && !file->use_direct_io() &&
prev_end == handle.offset()) {
prev_len += BlockSizeWithTrailer(handle);
} else {
// No compression or current block and previous one is not adjacent:
// Step 1, create a new request for previous blocks
if (prev_len != 0) {
FSReadRequest req;
req.offset = prev_offset;
req.len = prev_len;
if (file->use_direct_io() || use_fs_scratch) {
req.scratch = nullptr;
} else if (use_shared_buffer) {
req.scratch = scratch + buf_offset;
buf_offset += req.len;
} else {
req.scratch = new char[req.len];
// Step 2, remember the previous block info
prev_offset = handle.offset();
prev_len = BlockSizeWithTrailer(handle);
PERF_COUNTER_ADD(block_read_count, 1);
PERF_COUNTER_ADD(block_read_byte, BlockSizeWithTrailer(handle));
// Handle the last block and process the pending last request
if (prev_len != 0) {
FSReadRequest req;
req.offset = prev_offset;
req.len = prev_len;
if (file->use_direct_io() || use_fs_scratch) {
req.scratch = nullptr;
} else if (use_shared_buffer) {
req.scratch = scratch + buf_offset;
} else {
req.scratch = new char[req.len];
AlignedBuf direct_io_buf;
IOOptions opts;
IOStatus s = file->PrepareIOOptions(options, opts);
if (s.ok()) {
#if defined(WITH_COROUTINES)
if (file->use_direct_io()) {
s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(),
&direct_io_buf, options.rate_limiter_priority);
#if defined(WITH_COROUTINES)
} else {
co_await batch->context()->reader().MultiReadAsync(
file, opts, &read_reqs[0], read_reqs.size(), &direct_io_buf);
if (!s.ok()) {
// Discard all the results in this batch if there is any time out
// or overall MultiRead error
for (FSReadRequest& req : read_reqs) {
req.status = s;
idx_in_batch = 0;
size_t valid_batch_idx = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
assert(valid_batch_idx < req_idx_for_block.size());
assert(valid_batch_idx < req_offset_for_block.size());
assert(req_idx_for_block[valid_batch_idx] < read_reqs.size());
size_t& req_idx = req_idx_for_block[valid_batch_idx];
size_t& req_offset = req_offset_for_block[valid_batch_idx];
FSReadRequest& req = read_reqs[req_idx];
Status s = req.status;
if (s.ok()) {
if ((req.result.size() != req.len) ||
(req_offset + BlockSizeWithTrailer(handle) > req.result.size())) {
s = Status::Corruption("truncated block read from " +
rep_->file->file_name() + " offset " +
std::to_string(handle.offset()) + ", expected " +
std::to_string(req.len) + " bytes, got " +
BlockContents serialized_block;
if (s.ok()) {
if (use_fs_scratch) {
serialized_block =
BlockContents(Slice( + req_offset, handle.size()));
} else if (!use_shared_buffer) {
// We allocated a buffer for this block. Give ownership of it to
// BlockContents so it can free the memory
assert( == req.scratch);
assert(req.result.size() == BlockSizeWithTrailer(handle));
assert(req_offset == 0);
serialized_block =
BlockContents(std::unique_ptr<char[]>(req.scratch), handle.size());
} else {
// We used the scratch buffer or direct io buffer
// which are shared by the blocks.
// serialized_block does not have the ownership.
serialized_block =
BlockContents(Slice( + req_offset, handle.size()));
#ifndef NDEBUG
serialized_block.has_trailer = true;
if (options.verify_checksums) {
const char* data =;
// Since the scratch might be shared, the offset of the data block in
// the buffer might not be 0. only point to the
// begin address of each read request, we need to add the offset
// in each read request. Checksum is stored in the block trailer,
// beyond the payload size.
s = VerifyBlockChecksum(footer, data + req_offset, handle.size(),
rep_->file->file_name(), handle.offset());
TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
} else if (!use_shared_buffer) {
// Free the allocated scratch buffer.
delete[] req.scratch;
if (s.ok()) {
// When the blocks share the same underlying buffer (scratch or direct io
// buffer), we may need to manually copy the block into heap if the
// serialized block has to be inserted into a cache. That falls into the
// following cases -
// 1. serialized block is not compressed, it needs to be inserted into
// the uncompressed block cache if there is one
// 2. If the serialized block is compressed, it needs to be inserted
// into the compressed block cache if there is one
// In all other cases, the serialized block is either uncompressed into a
// heap buffer or there is no cache at all.
CompressionType compression_type =
if ((use_fs_scratch || use_shared_buffer) &&
compression_type == kNoCompression) {
Slice serialized =
Slice( + req_offset, BlockSizeWithTrailer(handle));
serialized_block = BlockContents(
#ifndef NDEBUG
serialized_block.has_trailer = true;
if (s.ok()) {
if (options.fill_cache) {
CachableEntry<Block_kData>* block_entry = &results[idx_in_batch];
// MaybeReadBlockAndLoadToCache will insert into the block caches if
// necessary. Since we're passing the serialized block contents, it
// will avoid looking up the block cache
s = MaybeReadBlockAndLoadToCache(
nullptr, options, handle, uncompression_dict,
/*for_compaction=*/false, block_entry, mget_iter->get_context,
/*lookup_context=*/nullptr, &serialized_block,
// block_entry value could be null if no block cache is present, i.e
// BlockBasedTableOptions::no_block_cache is true and no compressed
// block cache is configured. In that case, fall
// through and set up the block explicitly
if (block_entry->GetValue() != nullptr) {
CompressionType compression_type =
BlockContents contents;
if (compression_type != kNoCompression) {
UncompressionContext context(compression_type);
UncompressionInfo info(context, uncompression_dict, compression_type);
s = UncompressSerializedBlock(
info, + req_offset, handle.size(), &contents,
footer.format_version(), rep_->ioptions, memory_allocator);
} else {
// There are two cases here:
// 1) caller uses the shared buffer (scratch or direct io buffer);
// 2) we use the requst buffer.
// If scratch buffer or direct io buffer is used, we ensure that
// all serialized blocks are copyed to the heap as single blocks. If
// scratch buffer is not used, we also have no combined read, so the
// serialized block can be used directly.
contents = std::move(serialized_block);
if (s.ok()) {
std::move(contents), read_amp_bytes_per_bit, ioptions.stats));
statuses[idx_in_batch] = s;
if (use_fs_scratch) {
// Free the allocated scratch buffer by fs here as read requests might have
// been combined into one.
for (FSReadRequest& req : read_reqs) {
if (req.fs_scratch != nullptr) {
req.fs_scratch = nullptr;
using MultiGetRange = MultiGetContext::Range;
DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
(const ReadOptions& read_options, const MultiGetRange* mget_range,
const SliceTransform* prefix_extractor, bool skip_filters) {
if (mget_range->empty()) {
// Caller should ensure non-empty (performance bug)
CO_RETURN; // Nothing to do
FilterBlockReader* const filter =
!skip_filters ? rep_->filter.get() : nullptr;
MultiGetRange sst_file_range(*mget_range, mget_range->begin(),
// First check the full filter
// If full filter not useful, Then go into each block
const bool no_io = read_options.read_tier == kBlockCacheTier;
uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
if (sst_file_range.begin()->get_context) {
tracing_mget_id = sst_file_range.begin()->get_context->get_tracing_get_id();
// TODO: need more than one lookup_context here to track individual filter
// and index partition hits and misses.
BlockCacheLookupContext metadata_lookup_context{
TableReaderCaller::kUserMultiGet, tracing_mget_id,
/*_get_from_user_specified_snapshot=*/read_options.snapshot != nullptr};
FullFilterKeysMayMatch(filter, &sst_file_range, no_io, prefix_extractor,
&metadata_lookup_context, read_options);
if (!sst_file_range.empty()) {
IndexBlockIter iiter_on_stack;
// if prefix_extractor found in block differs from options, disable
// BlockPrefixIndex. Only do this check when index_type is kHashSearch.
bool need_upper_bound_check = false;
if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
need_upper_bound_check = PrefixExtractorChanged(prefix_extractor);
auto iiter = NewIndexIterator(
read_options, need_upper_bound_check, &iiter_on_stack,
sst_file_range.begin()->get_context, &metadata_lookup_context);
std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
if (iiter != &iiter_on_stack) {
uint64_t prev_offset = std::numeric_limits<uint64_t>::max();
autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
std::array<CachableEntry<Block_kData>, MultiGetContext::MAX_BATCH_SIZE>
std::array<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
// Empty data_lookup_contexts means "unused," when block cache tracing is
// disabled. (Limited options as element type is not default contructible.)
std::vector<BlockCacheLookupContext> data_lookup_contexts;
MultiGetContext::Mask reused_mask = 0;
char stack_buf[kMultiGetReadStackBufSize];
std::unique_ptr<char[]> block_buf;
if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
// Awkward because BlockCacheLookupContext is not CopyAssignable
for (size_t i = 0; i < MultiGetContext::MAX_BATCH_SIZE; ++i) {
MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
CachableEntry<UncompressionDict> uncompression_dict;
Status uncompression_dict_status;
bool uncompression_dict_inited = false;
size_t total_len = 0;
// GetContext for any key will do, as the stats will be aggregated
// anyway
GetContext* get_context = sst_file_range.begin()->get_context;
using BCI = BlockCacheInterface<Block_kData>;
BCI block_cache{rep_->table_options.block_cache.get()};
std::array<BCI::TypedAsyncLookupHandle, MultiGetContext::MAX_BATCH_SIZE>
std::array<CacheKey, MultiGetContext::MAX_BATCH_SIZE> cache_keys;
size_t cache_lookup_count = 0;
for (auto miter = data_block_range.begin();
miter != data_block_range.end(); ++miter) {
const Slice& key = miter->ikey;
IndexValue v;
if (iiter->Valid()) {
v = iiter->value();
if (!iiter->Valid() ||
(!v.first_internal_key.empty() && !skip_filters &&
ExtractUserKey(v.first_internal_key)) < 0)) {
// The requested key falls between highest key in previous block and
// lowest key in current block.
if (!iiter->status().IsNotFound()) {
*(miter->s) = iiter->status();
if (!uncompression_dict_inited && rep_->uncompression_dict_reader) {
uncompression_dict_status =
nullptr /* prefetch_buffer */, read_options, no_io,
read_options.verify_checksums, get_context,
&metadata_lookup_context, &uncompression_dict);
uncompression_dict_inited = true;
if (!uncompression_dict_status.ok()) {
*(miter->s) = uncompression_dict_status;
if (v.handle.offset() == prev_offset) {
// This key can reuse the previous block (later on).
// Mark previous as "reused"
reused_mask |= MultiGetContext::Mask{1}
<< (block_handles.size() - 1);
// Use null handle to indicate this one reuses same block as
// previous.
prev_offset = v.handle.offset();
if (block_cache) {
// Lookup the cache for the given data block referenced by an index
// iterator value (i.e BlockHandle). If it exists in the cache,
// initialize block to the contents of the data block.
// An async version of MaybeReadBlockAndLoadToCache /
// GetDataBlockFromCache
BCI::TypedAsyncLookupHandle& async_handle =
cache_keys[cache_lookup_count] =
GetCacheKey(rep_->base_cache_key, v.handle);
async_handle.key = cache_keys[cache_lookup_count].AsSlice();
// NB: StartAsyncLookupFull populates async_handle.helper
async_handle.create_context = &rep_->create_context;
async_handle.priority = GetCachePriority<Block_kData>();
async_handle.stats = rep_->ioptions.statistics.get();
async_handle, rep_->ioptions.lowest_used_cache_tier);
// TODO: stats?
if (block_cache) {
block_cache.get()->WaitAll(&async_handles[0], cache_lookup_count);
size_t lookup_idx = 0;
for (size_t i = 0; i < block_handles.size(); ++i) {
// If this block was a success or failure or not needed because
// the corresponding key is in the same block as a prior key, skip
if (block_handles[i] == BlockHandle::NullBlockHandle()) {
if (!block_cache) {
total_len += BlockSizeWithTrailer(block_handles[i]);
} else {
BCI::TypedHandle* h = async_handles[lookup_idx].Result();
if (h) {
// Cache hit
results[i].SetCachedValue(block_cache.Value(h), block_cache.get(),
// Don't need to fetch
block_handles[i] = BlockHandle::NullBlockHandle();
UpdateCacheHitMetrics(BlockType::kData, get_context,
} else {
// Cache miss
total_len += BlockSizeWithTrailer(block_handles[i]);
UpdateCacheMissMetrics(BlockType::kData, get_context);
if (!data_lookup_contexts.empty()) {
// Populate cache key before it's discarded
data_lookup_contexts[i].block_key =
assert(lookup_idx == cache_lookup_count);
if (total_len) {
char* scratch = nullptr;
bool use_fs_scratch = false;
const UncompressionDict& dict = uncompression_dict.GetValue()
? *uncompression_dict.GetValue()
: UncompressionDict::GetEmptyDict();
assert(uncompression_dict_inited || !rep_->uncompression_dict_reader);
if (!rep_->file->use_direct_io()) {
if (CheckFSFeatureSupport(rep_->ioptions.fs.get(),
FSSupportedOps::kFSBuffer)) {
use_fs_scratch = true;
// If using direct IO, then scratch is not used, so keep it nullptr.
// If the blocks need to be uncompressed and we don't need the
// compressed blocks, then we can use a contiguous block of
// memory to read in all the blocks as it will be temporary
// storage
// 1. If blocks are compressed and compressed block cache is there,
// alloc heap bufs
// 2. If blocks are uncompressed, alloc heap bufs
// 3. If blocks are compressed and no compressed block cache, use
// stack buf
if (!use_fs_scratch && !rep_->file->use_direct_io() &&
rep_->blocks_maybe_compressed) {
if (total_len <= kMultiGetReadStackBufSize) {
scratch = stack_buf;
} else {
scratch = new char[total_len];
(read_options, &data_block_range, &block_handles, &statuses[0],
&results[0], scratch, dict, use_fs_scratch);
if (get_context) {
DataBlockIter first_biter;
DataBlockIter next_biter;
size_t idx_in_batch = 0;
SharedCleanablePtr shared_cleanable;
for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
++miter) {
Status s;
GetContext* get_context = miter->get_context;
const Slice& key = miter->ikey;
bool matched = false; // if such user key matched a key in SST
bool done = false;
bool first_block = true;
do {
DataBlockIter* biter = nullptr;
uint64_t referenced_data_size = 0;
Block_kData* parsed_block_value = nullptr;
bool reusing_prev_block;
bool later_reused;
bool does_referenced_key_exist = false;
bool handle_present = false;
BlockCacheLookupContext* lookup_data_block_context =
data_lookup_contexts.empty() ? nullptr
: &data_lookup_contexts[idx_in_batch];
if (first_block) {
handle_present = !block_handles[idx_in_batch].IsNull();
parsed_block_value = results[idx_in_batch].GetValue();
if (handle_present || parsed_block_value) {
read_options, results[idx_in_batch].As<Block>(), &first_biter,
reusing_prev_block = false;
} else {
// If handle is null and result is empty, then the status is never
// set, which should be the initial value: ok().
reusing_prev_block = true;
biter = &first_biter;
later_reused =
(reused_mask & (MultiGetContext::Mask{1} << idx_in_batch)) != 0;
} else {
IndexValue v = iiter->value();
if (!v.first_internal_key.empty() && !skip_filters &&
ExtractUserKey(v.first_internal_key)) < 0) {
// The requested key falls between highest key in previous block and
// lowest key in current block.
Status tmp_s;
read_options, iiter->value().handle, &next_biter,
BlockType::kData, get_context, lookup_data_block_context,
/* prefetch_buffer= */ nullptr, /* for_compaction = */ false,
/*async_read = */ false, tmp_s);
biter = &next_biter;
reusing_prev_block = false;
later_reused = false;
if (read_options.read_tier == kBlockCacheTier &&
biter->status().IsIncomplete()) {
// couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for
// whether we can guarantee the key is not there when "no_io" is set
if (!biter->status().ok()) {
s = biter->status();
// Reusing blocks complicates pinning/Cleanable, because the cache
// entry referenced by biter can only be released once all returned
// pinned values are released. This code previously did an extra
// block_cache Ref for each reuse, but that unnecessarily increases
// block cache contention. Instead we can use a variant of shared_ptr
// to release in block cache only once.
// Although the biter loop below might SaveValue multiple times for
// merges, just one value_pinner suffices, as MultiGet will merge
// the operands before returning to the API user.
Cleanable* value_pinner;
if (biter->IsValuePinned()) {
if (reusing_prev_block) {
// Note that we don't yet know if the MultiGet results will need
// to pin this block, so we might wrap a block for sharing and
// still end up with 1 (or 0) pinning ref. Not ideal but OK.
// Here we avoid adding redundant cleanups if we didn't end up
// delegating the cleanup from last time around.
if (!biter->HasCleanups()) {
if (later_reused) {
} else {
} else if (later_reused) {
// Make the existing cleanups on `biter` sharable:
// Move existing `biter` cleanup(s) to `shared_cleanable`
// Reference `shared_cleanable` as new cleanup for `biter`
value_pinner = biter;
} else {
value_pinner = nullptr;
bool may_exist = biter->SeekForGet(key);
if (!may_exist) {
// HashSeek cannot find the key this block and the the iter is not
// the end of the block, i.e. cannot be in the following blocks
// either. In this case, the seek_key cannot be found, so we break
// from the top level for-loop.
// Call the *saver function on each entry/block until it returns false
for (; biter->Valid(); biter->Next()) {
ParsedInternalKey parsed_key;
Status pik_status = ParseInternalKey(
biter->key(), &parsed_key, false /* log_err_key */); // TODO
if (!pik_status.ok()) {
s = pik_status;
if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
value_pinner)) {
if (get_context->State() == GetContext::GetState::kFound) {
does_referenced_key_exist = true;
referenced_data_size =
biter->key().size() + biter->value().size();
done = true;
s = biter->status();
// Write the block cache access.
// XXX: There appear to be 'break' statements above that bypass this
// writing of the block cache trace record
if (lookup_data_block_context && !reusing_prev_block && first_block) {
Slice referenced_key;
if (does_referenced_key_exist) {
referenced_key = biter->key();
} else {
referenced_key = key;
// block_key is self-assigned here (previously assigned from
// cache_keys / async_handles, now out of scope)
read_options, parsed_block_value,
*lookup_data_block_context, lookup_data_block_context->block_key,
referenced_key, does_referenced_key_exist, referenced_data_size);
s = biter->status();
if (done) {
// Avoid the extra Next which is expensive in two-level indexes
if (first_block) {
if (!iiter->Valid()) {
first_block = false;
} while (iiter->Valid());
if (matched && filter != nullptr) {
if (rep_->whole_key_filtering) {
RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE);
} else {
RecordTick(rep_->ioptions.stats, BLOOM_FILTER_PREFIX_TRUE_POSITIVE);
// Includes prefix stats
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
if (s.ok() && !iiter->status().IsNotFound()) {
s = iiter->status();
*(miter->s) = s;
// Not sure why we need to do it. Should investigate more.
for (auto& st : statuses) {
} // namespace ROCKSDB_NAMESPACE