Merge adjacent file block reads in RocksDB MultiGet() and Add uncompressed block to cache (#6089)

Summary:
In the current MultiGet, if the KV-pairs do not belong to the data blocks in the block cache, multiple blocks are read from a SST. It will trigger one block read for each block request and read them in parallel. In some cases, if some data blocks are adjacent in the SST, the reads for these blocks can be combined to a single large read, which can reduce the system calls and reduce the read latency if possible.

Considering to fill the block cache, if multiple data blocks are in the same memory buffer, we need to copy them to the heap separately. Therefore, only in the case that 1) data block compression is enabled, and 2) compressed block cache is null, we can do combined read. Otherwise, extra memory copy is needed, which may cause extra overhead. In the current case, data blocks will be uncompressed to a new memory space.

Also, in the case that 1) data block compression is enabled, and 2) compressed block cache is null, it is possible the data block is actually not compressed. In the current logic, these data blocks will not be added to the uncompressed_cache. So if memory buffer is shared and the data block is not compressed, the data block are copied to the head and fill the cache.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6089

Test Plan: Added test case to ParallelIO.MultiGet. Pass make asan_check

Differential Revision: D18734668

Pulled By: zhichao-cao

fbshipit-source-id: 67c5615ed373e51e42635fd74b36f8f3a66d5da4
main
Zhichao Cao 5 years ago committed by Facebook Github Bot
parent bcc372c0c3
commit cddd637997
  1. 77
      db/db_basic_test.cc
  2. 1
      table/block_based/block_based_table_builder.cc
  3. 141
      table/block_based/block_based_table_reader.cc

@ -6,7 +6,6 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// #include <iostream>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/perf_context.h"
@ -1706,6 +1705,14 @@ class DBBasicTestWithParallelIO
assert(Put(Key(i), values_[i]) == Status::OK());
}
Flush();
for (int i = 0; i < 100; ++i) {
// block cannot gain space by compression
uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
std::string tmp_key = "a" + Key(i);
assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK());
}
Flush();
}
bool CheckValue(int i, const std::string& value) {
@ -1715,6 +1722,13 @@ class DBBasicTestWithParallelIO
return false;
}
bool CheckUncompressableValue(int i, const std::string& value) {
if (uncompressable_values_[i].compare(value) == 0) {
return true;
}
return false;
}
int num_lookups() { return uncompressed_cache_->num_lookups(); }
int num_found() { return uncompressed_cache_->num_found(); }
int num_inserts() { return uncompressed_cache_->num_inserts(); }
@ -1864,6 +1878,7 @@ class DBBasicTestWithParallelIO
std::shared_ptr<MyBlockCache> uncompressed_cache_;
bool compression_enabled_;
std::vector<std::string> values_;
std::vector<std::string> uncompressable_values_;
bool fill_cache_;
};
@ -1928,8 +1943,66 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
ASSERT_OK(statuses[i]);
ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
}
expected_reads += (read_from_cache ? 2 : 4);
if (compression_enabled() && !has_compressed_cache()) {
expected_reads += (read_from_cache ? 2 : 3);
} else {
expected_reads += (read_from_cache ? 2 : 4);
}
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
keys.resize(10);
statuses.resize(10);
std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
for (size_t i = 0; i < key_uncmp.size(); ++i) {
key_data[i] = "a" + Key(key_uncmp[i]);
keys[i] = Slice(key_data[i]);
statuses[i] = Status::OK();
values[i].Reset();
}
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
for (size_t i = 0; i < key_uncmp.size(); ++i) {
ASSERT_OK(statuses[i]);
ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
}
if (compression_enabled() && !has_compressed_cache()) {
expected_reads += (read_from_cache ? 3 : 3);
} else {
expected_reads += (read_from_cache ? 4 : 4);
}
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
keys.resize(5);
statuses.resize(5);
std::vector<int> key_tr{1, 2, 15, 16, 55};
for (size_t i = 0; i < key_tr.size(); ++i) {
key_data[i] = "a" + Key(key_tr[i]);
keys[i] = Slice(key_data[i]);
statuses[i] = Status::OK();
values[i].Reset();
}
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
for (size_t i = 0; i < key_tr.size(); ++i) {
ASSERT_OK(statuses[i]);
ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
}
if (compression_enabled() && !has_compressed_cache()) {
expected_reads += (read_from_cache ? 0 : 2);
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
} else {
if (has_uncompressed_cache()) {
expected_reads += (read_from_cache ? 0 : 3);
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
} else {
// A rare case, even we enable the block compression but some of data
// blocks are not compressed due to content. If user only enable the
// compressed cache, the uncompressed blocks will not tbe cached, and
// block reads will be triggered. The number of reads is related to
// the compression algorithm.
ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
}
}
}
INSTANTIATE_TEST_CASE_P(

@ -11,7 +11,6 @@
#include <assert.h>
#include <stdio.h>
#include <list>
#include <map>
#include <memory>

@ -7,7 +7,6 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/block_based_table_reader.h"
#include <algorithm>
#include <array>
#include <limits>
@ -2280,6 +2279,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// false and scratch is non-null and the blocks are uncompressed, it copies
// the buffers to heap. In any case, the CachableEntry<Block> returned will
// own the data bytes.
// If compression is enabled and also there is no compressed block cache,
// the adjacent blocks are read out in one IO (combined read)
// batch - A MultiGetRange with only those keys with unique data blocks not
// found in cache
// handles - A vector of block handles. Some of them me be NULL handles
@ -2320,6 +2321,11 @@ void BlockBasedTable::RetrieveMultipleBlocks(
autovector<FSReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
size_t buf_offset = 0;
size_t idx_in_batch = 0;
uint64_t prev_offset = 0;
size_t prev_len = 0;
autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_idx_for_block;
autovector<size_t, MultiGetContext::MAX_BATCH_SIZE> req_offset_for_block;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
@ -2327,23 +2333,57 @@ void BlockBasedTable::RetrieveMultipleBlocks(
continue;
}
size_t prev_end = static_cast<size_t>(prev_offset) + prev_len;
// If current block is adjacent to the previous one, at the same time,
// compression is enabled and there is no compressed cache, we combine
// the two block read as one.
if (scratch != nullptr && prev_end == handle.offset()) {
req_offset_for_block.emplace_back(prev_len);
prev_len += block_size(handle);
} else {
// No compression or current block and previous one is not adjacent:
// Step 1, create a new request for previous blocks
if (prev_len != 0) {
FSReadRequest req;
req.offset = prev_offset;
req.len = prev_len;
if (scratch == nullptr) {
req.scratch = new char[req.len];
} else {
req.scratch = scratch + buf_offset;
buf_offset += req.len;
}
req.status = IOStatus::OK();
read_reqs.emplace_back(req);
}
// Step 2, remeber the previous block info
prev_offset = handle.offset();
prev_len = block_size(handle);
req_offset_for_block.emplace_back(0);
}
req_idx_for_block.emplace_back(read_reqs.size());
}
// Handle the last block and process the pending last request
if (prev_len != 0) {
FSReadRequest req;
req.len = block_size(handle);
req.offset = prev_offset;
req.len = prev_len;
if (scratch == nullptr) {
req.scratch = new char[req.len];
} else {
req.scratch = scratch + buf_offset;
buf_offset += req.len;
}
req.offset = handle.offset();
req.status = IOStatus::OK();
read_reqs.emplace_back(req);
}
file->MultiRead(&read_reqs[0], read_reqs.size());
size_t read_req_idx = 0;
idx_in_batch = 0;
size_t valid_batch_idx = 0;
for (auto mget_iter = batch->begin(); mget_iter != batch->end();
++mget_iter, ++idx_in_batch) {
const BlockHandle& handle = (*handles)[idx_in_batch];
@ -2352,41 +2392,86 @@ void BlockBasedTable::RetrieveMultipleBlocks(
continue;
}
FSReadRequest& req = read_reqs[read_req_idx++];
assert(valid_batch_idx < req_idx_for_block.size());
assert(valid_batch_idx < req_offset_for_block.size());
assert(req_idx_for_block[valid_batch_idx] < read_reqs.size());
size_t& req_idx = req_idx_for_block[valid_batch_idx];
size_t& req_offset = req_offset_for_block[valid_batch_idx];
valid_batch_idx++;
FSReadRequest& req = read_reqs[req_idx];
Status s = req.status;
if (s.ok()) {
if (req.result.size() != req.len) {
s = Status::Corruption("truncated block read from " +
rep_->file->file_name() + " offset " +
ToString(handle.offset()) + ", expected " +
ToString(req.len) +
" bytes, got " + ToString(req.result.size()));
s = Status::Corruption(
"truncated block read from " + rep_->file->file_name() +
" offset " + ToString(handle.offset()) + ", expected " +
ToString(req.len) + " bytes, got " + ToString(req.result.size()));
}
}
BlockContents raw_block_contents;
size_t cur_read_end = req_offset + block_size(handle);
if (cur_read_end > req.result.size()) {
s = Status::Corruption(
"truncated block read from " + rep_->file->file_name() + " offset " +
ToString(handle.offset()) + ", expected " + ToString(req.len) +
" bytes, got " + ToString(req.result.size()));
}
bool blocks_share_read_buffer = (req.result.size() != block_size(handle));
if (s.ok()) {
if (scratch == nullptr) {
if (scratch == nullptr && !blocks_share_read_buffer) {
// We allocated a buffer for this block. Give ownership of it to
// BlockContents so it can free the memory
assert(req.result.data() == req.scratch);
std::unique_ptr<char[]> raw_block(req.scratch);
std::unique_ptr<char[]> raw_block(req.scratch + req_offset);
raw_block_contents = BlockContents(std::move(raw_block), handle.size());
} else {
// We used the scratch buffer, so no need to free anything
raw_block_contents = BlockContents(Slice(req.scratch, handle.size()));
// We used the scratch buffer which are shared by the blocks.
// raw_block_contents does not have the ownership.
raw_block_contents =
BlockContents(Slice(req.scratch + req_offset, handle.size()));
}
#ifndef NDEBUG
raw_block_contents.is_raw_block = true;
#endif
if (options.verify_checksums) {
PERF_TIMER_GUARD(block_checksum_time);
const char* data = req.result.data();
uint32_t expected = DecodeFixed32(data + handle.size() + 1);
s = rocksdb::VerifyChecksum(footer.checksum(), req.result.data(),
uint32_t expected =
DecodeFixed32(data + req_offset + handle.size() + 1);
// Since the scratch might be shared. the offset of the data block in
// the buffer might not be 0. req.result.data() only point to the
// begin address of each read request, we need to add the offset
// in each read request. Checksum is stored in the block trailer,
// which is handle.size() + 1.
s = rocksdb::VerifyChecksum(footer.checksum(),
req.result.data() + req_offset,
handle.size() + 1, expected);
}
}
if (s.ok()) {
// It handles a rare case: compression is set and these is no compressed
// cache (enable combined read). In this case, the scratch != nullptr.
// At the same time, some blocks are actually not compressed,
// since its compression space saving is smaller than the threshold. In
// this case, if the block shares the scratch memory, we need to copy it
// to the heap such that it can be added to the regular block cache.
CompressionType compression_type =
raw_block_contents.get_compression_type();
if (scratch != nullptr && compression_type == kNoCompression) {
Slice raw = Slice(req.scratch + req_offset, block_size(handle));
raw_block_contents = BlockContents(
CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
handle.size());
#ifndef NDEBUG
raw_block_contents.is_raw_block = true;
#endif
}
}
if (s.ok()) {
if (options.fill_cache) {
BlockCacheLookupContext lookup_data_block_context(
@ -2415,25 +2500,21 @@ void BlockBasedTable::RetrieveMultipleBlocks(
if (compression_type != kNoCompression) {
UncompressionContext context(compression_type);
UncompressionInfo info(context, uncompression_dict, compression_type);
s = UncompressBlockContents(info, req.result.data(), handle.size(),
&contents, footer.version(),
s = UncompressBlockContents(info, req.result.data() + req_offset,
handle.size(), &contents, footer.version(),
rep_->ioptions, memory_allocator);
} else {
if (scratch != nullptr) {
// If we used the scratch buffer, then the contents need to be
// copied to heap
Slice raw = Slice(req.result.data(), handle.size());
contents = BlockContents(
CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
handle.size());
} else {
contents = std::move(raw_block_contents);
}
// There are two cases here: 1) caller uses the scratch buffer; 2) we
// use the requst buffer. If scratch buffer is used, we ensure that
// all raw blocks are copyed to the heap as single blocks. If scratch
// buffer is not used, we also have no combined read, so the raw
// block can be used directly.
contents = std::move(raw_block_contents);
}
if (s.ok()) {
(*results)[idx_in_batch].SetOwnedValue(
new Block(std::move(contents), global_seqno,
read_amp_bytes_per_bit, ioptions.statistics));
new Block(std::move(contents), global_seqno, read_amp_bytes_per_bit,
ioptions.statistics));
}
}
(*statuses)[idx_in_batch] = s;

Loading…
Cancel
Save