MultiGet parallel IO (#5464)

Summary:
Enhancement to MultiGet batching to read data blocks required for keys in a batch in parallel from disk. It uses Env::MultiRead() API to read multiple blocks and reduce latency.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5464

Test Plan:
1. make check
2. make asan_check
3. make asan_crash

Differential Revision: D15911771

Pulled By: anand1976

fbshipit-source-id: 605036b9af0f90ca0020dc87c3a86b4da6e83394
main
anand76 6 years ago committed by Facebook Github Bot
parent 68b46a2e36
commit 7259e28d91
  1. 1
      HISTORY.md
  2. 289
      db/db_basic_test.cc
  3. 461
      table/block_based/block_based_table_reader.cc
  4. 23
      table/block_based/block_based_table_reader.h
  5. 2
      table/format.h
  6. 43
      util/file_reader_writer.cc
  7. 2
      util/file_reader_writer.h

@ -25,6 +25,7 @@
* DBIter::Next() can skip user key checking if previous entry's seqnum is 0.
* Merging iterator to avoid child iterator reseek for some cases
* Log Writer will flush after finishing the whole record, rather than a fragment.
* Lower MultiGet batching API latency by reading data blocks from disk in parallel
### General Improvements
* Added new status code kColumnFamilyDropped to distinguish between Column Family Dropped and DB Shutdown in progress.

@ -10,6 +10,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/perf_context.h"
#include "table/block_based/block_builder.h"
#include "test_util/fault_injection_test_env.h"
#if !defined(ROCKSDB_LITE)
#include "test_util/sync_point.h"
@ -1285,6 +1286,294 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
}
}
class DBBasicTestWithParallelIO
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool,bool,bool,bool>> {
public:
DBBasicTestWithParallelIO()
: DBTestBase("/db_basic_test_with_parallel_io") {
bool compressed_cache = std::get<0>(GetParam());
bool uncompressed_cache = std::get<1>(GetParam());
compression_enabled_ = std::get<2>(GetParam());
fill_cache_ = std::get<3>(GetParam());
if (compressed_cache) {
std::shared_ptr<Cache> cache = NewLRUCache(1048576);
compressed_cache_ = std::make_shared<MyBlockCache>(cache);
}
if (uncompressed_cache) {
std::shared_ptr<Cache> cache = NewLRUCache(1048576);
uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
}
env_->count_random_reads_ = true;
Options options = CurrentOptions();
Random rnd(301);
BlockBasedTableOptions table_options;
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
table_options.block_cache = uncompressed_cache_;
table_options.block_cache_compressed = compressed_cache_;
table_options.flush_block_policy_factory.reset(
new MyFlushBlockPolicyFactory());
options.table_factory.reset(new BlockBasedTableFactory(table_options));
if (!compression_enabled_) {
options.compression = kNoCompression;
}
Reopen(options);
std::string zero_str(128, '\0');
for (int i = 0; i < 100; ++i) {
// Make the value compressible. A purely random string doesn't compress
// and the resultant data block will not be compressed
values_.emplace_back(RandomString(&rnd, 128) + zero_str);
assert(Put(Key(i), values_[i]) == Status::OK());
}
Flush();
}
bool CheckValue(int i, const std::string& value) {
if (values_[i].compare(value) == 0) {
return true;
}
return false;
}
int num_lookups() { return uncompressed_cache_->num_lookups(); }
int num_found() { return uncompressed_cache_->num_found(); }
int num_inserts() { return uncompressed_cache_->num_inserts(); }
int num_lookups_compressed() {
return compressed_cache_->num_lookups();
}
int num_found_compressed() {
return compressed_cache_->num_found();
}
int num_inserts_compressed() {
return compressed_cache_->num_inserts();
}
bool fill_cache() { return fill_cache_; }
static void SetUpTestCase() {}
static void TearDownTestCase() {}
private:
class MyFlushBlockPolicyFactory
: public FlushBlockPolicyFactory {
public:
MyFlushBlockPolicyFactory() {}
virtual const char* Name() const override {
return "MyFlushBlockPolicyFactory";
}
virtual FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& /*table_options*/,
const BlockBuilder& data_block_builder) const override {
return new MyFlushBlockPolicy(data_block_builder);
}
};
class MyFlushBlockPolicy
: public FlushBlockPolicy {
public:
explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
: num_keys_(0), data_block_builder_(data_block_builder) {}
bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
if (data_block_builder_.empty()) {
// First key in this block
num_keys_ = 1;
return false;
}
// Flush every 10 keys
if (num_keys_ == 10) {
num_keys_ = 1;
return true;
}
num_keys_++;
return false;
}
private:
int num_keys_;
const BlockBuilder& data_block_builder_;
};
class MyBlockCache
: public Cache {
public:
explicit MyBlockCache(std::shared_ptr<Cache>& target)
: target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {}
virtual const char* Name() const override { return "MyBlockCache"; }
virtual Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
num_inserts_++;
return target_->Insert(key, value, charge, deleter, handle, priority);
}
virtual Handle* Lookup(const Slice& key,
Statistics* stats = nullptr) override {
num_lookups_++;
Handle* handle = target_->Lookup(key, stats);
if (handle != nullptr) {
num_found_++;
}
return handle;
}
virtual bool Ref(Handle* handle) override {
return target_->Ref(handle);
}
virtual bool Release(Handle* handle, bool force_erase = false) override {
return target_->Release(handle, force_erase);
}
virtual void* Value(Handle* handle) override {
return target_->Value(handle);
}
virtual void Erase(const Slice& key) override {
target_->Erase(key);
}
virtual uint64_t NewId() override {
return target_->NewId();
}
virtual void SetCapacity(size_t capacity) override {
target_->SetCapacity(capacity);
}
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
target_->SetStrictCapacityLimit(strict_capacity_limit);
}
virtual bool HasStrictCapacityLimit() const override {
return target_->HasStrictCapacityLimit();
}
virtual size_t GetCapacity() const override {
return target_->GetCapacity();
}
virtual size_t GetUsage() const override {
return target_->GetUsage();
}
virtual size_t GetUsage(Handle* handle) const override {
return target_->GetUsage(handle);
}
virtual size_t GetPinnedUsage() const override {
return target_->GetPinnedUsage();
}
virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; }
virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
bool thread_safe) override {
return target_->ApplyToAllCacheEntries(callback, thread_safe);
}
virtual void EraseUnRefEntries() override {
return target_->EraseUnRefEntries();
}
int num_lookups() { return num_lookups_; }
int num_found() { return num_found_; }
int num_inserts() { return num_inserts_; }
private:
std::shared_ptr<Cache> target_;
int num_lookups_;
int num_found_;
int num_inserts_;
};
std::shared_ptr<MyBlockCache> compressed_cache_;
std::shared_ptr<MyBlockCache> uncompressed_cache_;
bool compression_enabled_;
std::vector<std::string> values_;
bool fill_cache_;
};
TEST_P(DBBasicTestWithParallelIO, MultiGet) {
std::vector<std::string> key_data(10);
std::vector<Slice> keys;
// We cannot resize a PinnableSlice vector, so just set initial size to
// largest we think we will need
std::vector<PinnableSlice> values(10);
std::vector<Status> statuses;
ReadOptions ro;
ro.fill_cache = fill_cache();
// Warm up the cache first
key_data.emplace_back(Key(0));
keys.emplace_back(Slice(key_data.back()));
key_data.emplace_back(Key(50));
keys.emplace_back(Slice(key_data.back()));
statuses.resize(keys.size());
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(0, values[0].ToString()));
ASSERT_TRUE(CheckValue(50, values[1].ToString()));
int random_reads = env_->random_read_counter_.Read();
key_data[0] = Key(1);
key_data[1] = Key(51);
keys[0] = Slice(key_data[0]);
keys[1] = Slice(key_data[1]);
values[0].Reset();
values[1].Reset();
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(1, values[0].ToString()));
ASSERT_TRUE(CheckValue(51, values[1].ToString()));
int expected_reads = random_reads + (fill_cache() ? 0 : 2);
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
keys.resize(10);
statuses.resize(10);
std::vector<int> key_ints{1,2,15,16,55,81,82,83,84,85};
for (size_t i = 0; i < key_ints.size(); ++i) {
key_data[i] = Key(key_ints[i]);
keys[i] = Slice(key_data[i]);
statuses[i] = Status::OK();
values[i].Reset();
}
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
for (size_t i = 0; i < key_ints.size(); ++i) {
ASSERT_OK(statuses[i]);
ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
}
expected_reads += (fill_cache() ? 2 : 4);
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
}
INSTANTIATE_TEST_CASE_P(
ParallelIO, DBBasicTestWithParallelIO,
// Params are as follows -
// Param 0 - Compressed cache enabled
// Param 1 - Uncompressed cache enabled
// Param 2 - Data compression enabled
// Param 3 - ReadOptions::fill_cache
::testing::Values(std::make_tuple(false, true, true, true),
std::make_tuple(true, true, true, true),
std::make_tuple(false, true, false, true),
std::make_tuple(false, true, true, false),
std::make_tuple(true, true, true, false),
std::make_tuple(false, true, false, false)));
class DBBasicTestWithTimestampWithParam
: public DBTestBase,
public testing::WithParamInterface<bool> {

@ -160,6 +160,13 @@ bool PrefixExtractorChanged(const TableProperties* table_properties,
}
}
CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) {
CacheAllocationPtr heap_buf;
heap_buf = AllocateBlock(buf.size(), allocator);
memcpy(heap_buf.get(), buf.data(), buf.size());
return heap_buf;
}
} // namespace
// Encapsulates common functionality for the various index reader
@ -421,7 +428,8 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
// filter blocks
s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
&block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context);
&block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context,
/*contents=*/nullptr);
assert(s.ok() || block.GetValue() == nullptr);
if (s.ok() && block.GetValue() != nullptr) {
@ -1745,8 +1753,6 @@ Status BlockBasedTable::PutDataBlockToCache(
: Cache::Priority::LOW;
assert(cached_block);
assert(cached_block->IsEmpty());
assert(raw_block_comp_type == kNoCompression ||
block_cache_compressed != nullptr);
Status s;
Statistics* statistics = ioptions.statistics;
@ -2195,11 +2201,105 @@ IndexBlockIter* BlockBasedTable::InitBlockIterator<IndexBlockIter>(
rep->index_value_is_full, block_contents_pinned);
}
// Convert an uncompressed data block (i.e CachableEntry<Block>)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, new a iterator
// If input_iter is not null, update this iter and return it
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(
const ReadOptions& ro, CachableEntry<Block>& block, TBlockIter* input_iter,
Status s) const {
PERF_TIMER_GUARD(new_table_block_iter_nanos);
TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
if (!s.ok()) {
iter->Invalidate(s);
return iter;
}
assert(block.GetValue() != nullptr);
// Block contents are pinned and it is still pinned after the iterator
// is destroyed as long as cleanup functions are moved to another object,
// when:
// 1. block cache handle is set to be released in cleanup function, or
// 2. it's pointing to immortal source. If own_bytes is true then we are
// not reading data from the original source, whether immortal or not.
// Otherwise, the block is pinned iff the source is immortal.
const bool block_contents_pinned =
block.IsCached() ||
(!block.GetValue()->own_bytes() && rep_->immortal_table);
iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), iter,
block_contents_pinned);
if (!block.IsCached()) {
if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
// insert a dummy record to block cache to track the memory usage
Cache* const block_cache = rep_->table_options.block_cache.get();
Cache::Handle* cache_handle = nullptr;
// There are two other types of cache keys: 1) SST cache key added in
// `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
// `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
// from SST cache key(31 bytes), and use non-zero prefix to
// differentiate from `write_buffer_manager`
const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
// Prefix: use rep_->cache_key_prefix padded by 0s
memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
assert(rep_->cache_key_prefix_size != 0);
assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
next_cache_key_id_++);
assert(end - cache_key <=
static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
s = block_cache->Insert(unique_key, nullptr,
block.GetValue()->ApproximateMemoryUsage(),
nullptr, &cache_handle);
if (s.ok()) {
assert(cache_handle != nullptr);
iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
cache_handle);
}
}
} else {
iter->SetCacheHandle(block.GetCacheHandle());
}
block.TransferTo(iter);
return iter;
}
// Lookup the cache for the given data block referenced by an index iterator
// value (i.e BlockHandle). If it exists in the cache, initialize block to
// the contents of the data block.
Status BlockBasedTable::GetDataBlockFromCache(
const ReadOptions& ro, const BlockHandle& handle,
const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block, BlockType block_type,
GetContext* get_context) const {
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet);
Status s = RetrieveBlock(nullptr, ro, handle, uncompression_dict, block,
block_type, get_context, &lookup_data_block_context);
if (s.IsIncomplete()) {
s = Status::OK();
}
return s;
}
// If contents is nullptr, this function looks up the block caches for the
// data block referenced by handle, and read the block from disk if necessary.
// If contents is non-null, it skips the cache lookup and disk read, since
// the caller has already read it. In both cases, if ro.fill_cache is true,
// it inserts the block into the block cache.
Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context) const {
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents) const {
assert(block_entry != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep_->table_options.block_cache.get();
@ -2231,6 +2331,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
compressed_cache_key);
}
if (!contents) {
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
ro, block_entry, uncompression_dict, block_type,
get_context);
@ -2239,6 +2340,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// compressed block cache.
is_cache_hit = true;
}
}
// Can't find the block from the cache. If I/O is allowed, read from the
// file.
if (block_entry->GetValue() == nullptr && !no_io && ro.fill_cache) {
@ -2248,7 +2351,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
block_cache_compressed == nullptr && rep_->blocks_maybe_compressed;
CompressionType raw_block_comp_type;
BlockContents raw_block_contents;
{
if (!contents) {
StopWatch sw(rep_->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
BlockFetcher block_fetcher(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle,
@ -2259,6 +2362,9 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
GetMemoryAllocatorForCompressedBlock(rep_->table_options));
s = block_fetcher.ReadBlockContents();
raw_block_comp_type = block_fetcher.get_compression_type();
contents = &raw_block_contents;
} else {
raw_block_comp_type = contents->get_compression_type();
}
if (s.ok()) {
@ -2266,7 +2372,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// If filling cache is allowed and a cache is configured, try to put the
// block to the cache.
s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
block_entry, &raw_block_contents,
block_entry, contents,
raw_block_comp_type, uncompression_dict, seq_no,
GetMemoryAllocator(rep_->table_options),
block_type, get_context);
@ -2331,6 +2437,172 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
return s;
}
// This function reads multiple data blocks from disk using Env::MultiRead()
// and optionally inserts them into the block cache. It uses the scratch
// buffer provided by the caller, which is contiguous. If scratch is a nullptr
// it allocates a separate buffer for each block. Typically, if the blocks
// need to be uncompressed and there is no compressed block cache, callers
// can allocate a temporary scratch buffer in order to minimize memory
// allocations.
// If options.fill_cache is true, it inserts the blocks into cache. If its
// false and scratch is non-null and the blocks are uncompressed, it copies
// the buffers to heap. In any case, the CachableEntry<Block> returned will
// own the data bytes.
// batch - A MultiGetRange with only those keys with unique data blocks not
// found in cache
// handles - A vector of block handles. Some of them me be NULL handles
// scratch - An optional contiguous buffer to read compressed blocks into
void BlockBasedTable::MaybeLoadBlocksToCache(
const ReadOptions& options,
const MultiGetRange* batch,
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
autovector<
CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
char* scratch,
const UncompressionDict& uncompression_dict) const {
RandomAccessFileReader* file = rep_->file.get();
const Footer& footer = rep_->footer;
const ImmutableCFOptions& ioptions = rep_->ioptions;
SequenceNumber global_seqno = rep_->get_global_seqno(BlockType::kData);
size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit;
MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options);
if (file->use_direct_io() || ioptions.allow_mmap_reads) {
size_t idx_in_batch = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet);
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}
(*statuses)[idx_in_batch] = RetrieveBlock(nullptr, options, handle,
uncompression_dict, &(*results)[idx_in_batch], BlockType::kData,
mget_iter->get_context, &lookup_data_block_context);
}
return;
}
autovector<ReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
size_t buf_offset = 0;
size_t idx_in_batch = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}
ReadRequest req;
req.len = handle.size() + kBlockTrailerSize;
if (scratch == nullptr) {
req.scratch = new char[req.len];
} else {
req.scratch = scratch + buf_offset;
buf_offset += req.len;
}
req.offset = handle.offset();
req.status = Status::OK();
read_reqs.emplace_back(req);
}
file->MultiRead(&read_reqs[0], read_reqs.size());
size_t read_req_idx = 0;
idx_in_batch = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
if (handle.IsNull()) {
continue;
}
ReadRequest& req = read_reqs[read_req_idx++];
Status s = req.status;
if (s.ok()) {
if (req.result.size() != handle.size() + kBlockTrailerSize) {
s = Status::Corruption("truncated block read from " +
rep_->file->file_name() + " offset " +
ToString(handle.offset()) + ", expected " +
ToString(handle.size() + kBlockTrailerSize) +
" bytes, got " + ToString(req.result.size()));
}
}
BlockContents raw_block_contents;
if (s.ok()) {
if (scratch == nullptr) {
// We allocated a buffer for this block. Give ownership of it to
// BlockContents so it can free the memory
assert(req.result.data() == req.scratch);
std::unique_ptr<char[]> raw_block(req.scratch);
raw_block_contents = BlockContents(std::move(raw_block),
handle.size());
} else {
// We used the scratch buffer, so no need to free anything
raw_block_contents = BlockContents(Slice(req.scratch,
handle.size()));
}
#ifndef NDEBUG
raw_block_contents.is_raw_block = true;
#endif
if (options.verify_checksums) {
PERF_TIMER_GUARD(block_checksum_time);
const char* data = req.result.data();
uint32_t expected = DecodeFixed32(data + handle.size() + 1);
s = rocksdb::VerifyChecksum(footer.checksum(), req.result.data(),
handle.size() + 1, expected);
}
}
if (s.ok()) {
if (options.fill_cache) {
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet);
CachableEntry<Block>* block_entry = &(*results)[idx_in_batch];
// MaybeReadBlockAndLoadToCache will insert into the block caches if
// necessary. Since we're passing the raw block contents, it will
// avoid looking up the block cache
s = MaybeReadBlockAndLoadToCache(nullptr, options, handle,
uncompression_dict, block_entry, BlockType::kData,
mget_iter->get_context, &lookup_data_block_context,
&raw_block_contents);
} else {
CompressionType compression_type =
raw_block_contents.get_compression_type();
BlockContents contents;
if (compression_type != kNoCompression) {
UncompressionContext context(compression_type);
UncompressionInfo info(context, uncompression_dict, compression_type);
s = UncompressBlockContents(info, req.result.data(), handle.size(),
&contents, footer.version(), rep_->ioptions,
memory_allocator);
} else {
if (scratch != nullptr) {
// If we used the scratch buffer, then the contents need to be
// copied to heap
Slice raw = Slice(req.result.data(), handle.size());
contents = BlockContents(CopyBufferToHeap(
GetMemoryAllocator(rep_->table_options), raw),
handle.size());
} else {
contents = std::move(raw_block_contents);
}
}
if (s.ok()) {
(*results)[idx_in_batch].SetOwnedValue(new Block(std::move(contents),
global_seqno, read_amp_bytes_per_bit, ioptions.statistics));
}
}
}
(*statuses)[idx_in_batch] = s;
}
}
Status BlockBasedTable::RetrieveBlock(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
@ -2347,7 +2619,8 @@ Status BlockBasedTable::RetrieveBlock(
block_type != BlockType::kIndex)) {
s = MaybeReadBlockAndLoadToCache(prefetch_buffer, ro, handle,
uncompression_dict, block_entry,
block_type, get_context, lookup_context);
block_type, get_context, lookup_context,
/*contents=*/nullptr);
if (!s.ok()) {
return s;
@ -3248,8 +3521,101 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
iiter_unique_ptr.reset(iiter);
}
DataBlockIter biter;
uint64_t offset = std::numeric_limits<uint64_t>::max();
autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE> block_handles;
autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE> results;
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
static const size_t kMultiGetReadStackBufSize = 8192;
char stack_buf[kMultiGetReadStackBufSize];
std::unique_ptr<char[]> block_buf;
{
MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
sst_file_range.end());
BlockCacheLookupContext lookup_compression_dict_context(
TableReaderCaller::kUserMultiGet);
auto uncompression_dict_storage = GetUncompressionDict(nullptr, no_io,
sst_file_range.begin()->get_context,
&lookup_compression_dict_context);
const UncompressionDict& uncompression_dict =
uncompression_dict_storage.GetValue() == nullptr
? UncompressionDict::GetEmptyDict()
: *uncompression_dict_storage.GetValue();
size_t total_len = 0;
ReadOptions ro = read_options;
ro.read_tier = kBlockCacheTier;
for (auto miter = data_block_range.begin();
miter != data_block_range.end(); ++miter) {
const Slice& key = miter->ikey;
iiter->Seek(miter->ikey);
IndexValue v;
if (iiter->Valid()) {
v = iiter->value();
}
if (!iiter->Valid() ||
(!v.first_internal_key.empty() && !skip_filters &&
UserComparatorWrapper(rep_->internal_comparator.user_comparator())
.Compare(ExtractUserKey(key),
ExtractUserKey(v.first_internal_key)) < 0)) {
// The requested key falls between highest key in previous block and
// lowest key in current block.
*(miter->s) = iiter->status();
data_block_range.SkipKey(miter);
sst_file_range.SkipKey(miter);
continue;
}
statuses.emplace_back();
results.emplace_back();
if (v.handle.offset() == offset) {
// We're going to reuse the block for this key later on. No need to
// look it up now. Place a null handle
block_handles.emplace_back(BlockHandle::NullBlockHandle());
continue;
}
offset = v.handle.offset();
BlockHandle handle = v.handle;
Status s = GetDataBlockFromCache(ro, handle, uncompression_dict,
&(results.back()), BlockType::kData, miter->get_context);
if (s.ok() && !results.back().IsEmpty()) {
// Found it in the cache. Add NULL handle to indicate there is
// nothing to read from disk
block_handles.emplace_back(BlockHandle::NullBlockHandle());
} else {
block_handles.emplace_back(handle);
total_len += handle.size();
}
}
if (total_len) {
char* scratch = nullptr;
// If the blocks need to be uncompressed and we don't need the
// compressed blocks, then we can use a contiguous block of
// memory to read in all the blocks as it will be temporary
// storage
// 1. If blocks are compressed and compressed block cache is there,
// alloc heap bufs
// 2. If blocks are uncompressed, alloc heap bufs
// 3. If blocks are compressed and no compressed block cache, use
// stack buf
if (rep_->table_options.block_cache_compressed == nullptr &&
rep_->blocks_maybe_compressed) {
if (total_len <= kMultiGetReadStackBufSize) {
scratch = stack_buf;
} else {
scratch = new char[total_len];
block_buf.reset(scratch);
}
}
MaybeLoadBlocksToCache(read_options,
&data_block_range, &block_handles, &statuses, &results,
scratch, uncompression_dict);
}
}
DataBlockIter first_biter;
DataBlockIter next_biter;
size_t idx_in_batch = 0;
for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
++miter) {
Status s;
@ -3257,7 +3623,26 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
const Slice& key = miter->ikey;
bool matched = false; // if such user key matched a key in SST
bool done = false;
for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
bool first_block = true;
do {
DataBlockIter* biter = nullptr;
bool reusing_block = true;
uint64_t referenced_data_size = 0;
bool does_referenced_key_exist = false;
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet);
if (first_block) {
if (!block_handles[idx_in_batch].IsNull() ||
!results[idx_in_batch].IsEmpty()) {
first_biter.Invalidate(Status::OK());
NewDataBlockIterator<DataBlockIter>(
read_options, results[idx_in_batch], &first_biter,
statuses[idx_in_batch]);
reusing_block = false;
}
biter = &first_biter;
idx_in_batch++;
} else {
IndexValue v = iiter->value();
if (!v.first_internal_key.empty() && !skip_filters &&
UserComparatorWrapper(rep_->internal_comparator.user_comparator())
@ -3268,72 +3653,67 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
break;
}
bool reusing_block = true;
uint64_t referenced_data_size = 0;
bool does_referenced_key_exist = false;
BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet);
if (iiter->value().handle.offset() != offset) {
offset = iiter->value().handle.offset();
biter.Invalidate(Status::OK());
next_biter.Invalidate(Status::OK());
NewDataBlockIterator<DataBlockIter>(
read_options, v.handle, &biter, BlockType::kData, get_context,
&lookup_data_block_context, Status(), nullptr);
read_options, iiter->value().handle, &next_biter,
BlockType::kData, get_context, &lookup_data_block_context,
Status(), nullptr);
biter = &next_biter;
reusing_block = false;
}
if (read_options.read_tier == kBlockCacheTier &&
biter.status().IsIncomplete()) {
biter->status().IsIncomplete()) {
// couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for
// whether we can guarantee the key is not there when "no_io" is set
get_context->MarkKeyMayExist();
break;
}
if (!biter.status().ok()) {
s = biter.status();
if (!biter->status().ok()) {
s = biter->status();
break;
}
bool may_exist = biter.SeekForGet(key);
bool may_exist = biter->SeekForGet(key);
if (!may_exist) {
// HashSeek cannot find the key this block and the the iter is not
// the end of the block, i.e. cannot be in the following blocks
// either. In this case, the seek_key cannot be found, so we break
// from the top level for-loop.
done = true;
} else {
break;
}
// Call the *saver function on each entry/block until it returns false
for (; biter.Valid(); biter.Next()) {
for (; biter->Valid(); biter->Next()) {
ParsedInternalKey parsed_key;
Cleanable dummy;
Cleanable* value_pinner = nullptr;
if (!ParseInternalKey(biter.key(), &parsed_key)) {
if (!ParseInternalKey(biter->key(), &parsed_key)) {
s = Status::Corruption(Slice());
}
if (biter.IsValuePinned()) {
if (biter->IsValuePinned()) {
if (reusing_block) {
Cache* block_cache = rep_->table_options.block_cache.get();
assert(biter.cache_handle() != nullptr);
block_cache->Ref(biter.cache_handle());
assert(biter->cache_handle() != nullptr);
block_cache->Ref(biter->cache_handle());
dummy.RegisterCleanup(&ReleaseCachedEntry, block_cache,
biter.cache_handle());
biter->cache_handle());
value_pinner = &dummy;
} else {
value_pinner = &biter;
value_pinner = biter;
}
}
if (!get_context->SaveValue(parsed_key, biter.value(), &matched,
value_pinner)) {
if (!get_context->SaveValue(
parsed_key, biter->value(), &matched, value_pinner)) {
does_referenced_key_exist = true;
referenced_data_size = biter.key().size() + biter.value().size();
referenced_data_size = biter->key().size() + biter->value().size();
done = true;
break;
}
}
s = biter.status();
s = biter->status();
}
// Write the block cache access.
if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) {
@ -3354,11 +3734,18 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
access_record, lookup_data_block_context.block_key,
rep_->cf_name_for_tracing(), key);
}
s = biter->status();
if (done) {
// Avoid the extra Next which is expensive in two-level indexes
break;
}
if (first_block) {
iiter->Seek(key);
}
first_block = false;
iiter->Next();
} while (iiter->Valid());
if (matched && filter != nullptr && !filter->IsBlockBased()) {
RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_TRUE_POSITIVE);
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,

@ -233,6 +233,12 @@ class BlockBasedTable : public TableReader {
BlockCacheLookupContext* lookup_context, Status s,
FilePrefetchBuffer* prefetch_buffer, bool for_compaction = false) const;
// input_iter: if it is not null, update this one and return it as Iterator
template <typename TBlockIter>
TBlockIter* NewDataBlockIterator(const ReadOptions& ro,
CachableEntry<Block>& block,
TBlockIter* input_iter, Status s) const;
class PartitionedIndexIteratorState;
friend class PartitionIndexReader;
@ -276,7 +282,8 @@ class BlockBasedTable : public TableReader {
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context) const;
GetContext* get_context, BlockCacheLookupContext* lookup_context,
BlockContents* contents) const;
// Similar to the above, with one crucial difference: it will retrieve the
// block from the file even if there are no caches configured (assuming the
@ -289,6 +296,20 @@ class BlockBasedTable : public TableReader {
BlockCacheLookupContext* lookup_context,
bool for_compaction = false) const;
Status GetDataBlockFromCache(
const ReadOptions& ro, const BlockHandle& handle,
const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context) const;
void MaybeLoadBlocksToCache(
const ReadOptions& options, const MultiGetRange* batch,
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
autovector<
CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
char* scratch, const UncompressionDict& uncompression_dict) const;
// For the following two functions:
// if `no_io == true`, we will not try to read filter/index from sst file
// were they not present in cache yet.

@ -26,7 +26,9 @@
#include "options/cf_options.h"
#include "port/port.h" // noexcept
#include "table/persistent_cache_options.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/xxhash.h"
namespace rocksdb {

@ -192,6 +192,49 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
return s;
}
Status RandomAccessFileReader::MultiRead(ReadRequest* read_reqs,
size_t num_reqs) const {
Status s;
uint64_t elapsed = 0;
assert(!use_direct_io());
assert(!for_compaction_);
{
StopWatch sw(env_, stats_, hist_type_,
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
IOSTATS_TIMER_GUARD(read_nanos);
#ifndef ROCKSDB_LITE
FileOperationInfo::TimePoint start_ts;
if (ShouldNotifyListeners()) {
start_ts = std::chrono::system_clock::now();
}
#endif // ROCKSDB_LITE
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->MultiRead(read_reqs, num_reqs);
}
for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileReadFinish(read_reqs[i].offset,
read_reqs[i].result.size(), start_ts, finish_ts,
read_reqs[i].status);
}
#endif // ROCKSDB_LITE
IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
}
SetPerfLevel(prev_perf_level);
}
if (stats_ != nullptr && file_read_hist_ != nullptr) {
file_read_hist_->Add(elapsed);
}
return s;
}
Status WritableFileWriter::Append(const Slice& data) {
const char* src = data.data();
size_t left = data.size();

@ -161,6 +161,8 @@ class RandomAccessFileReader {
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch,
bool for_compaction = false) const;
Status MultiRead(ReadRequest* reqs, size_t num_reqs) const;
Status Prefetch(uint64_t offset, size_t n) const {
return file_->Prefetch(offset, n);
}

Loading…
Cancel
Save