Ensure that MultiGet works properly with compressed cache (#7756)

Summary:
Ensure that when direct IO is enabled and a compressed block cache is
configured, MultiGet inserts compressed data blocks into the compressed
block cache.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7756

Test Plan: Add unit test to db_basic_test

Reviewed By: cheng-chang

Differential Revision: D25416240

Pulled By: anand1976

fbshipit-source-id: 75d57526370c9c0a45ff72651f3278dbd8a9086f
main
anand76 4 years ago committed by Facebook GitHub Bot
parent 3c2a448856
commit 8a1488efbf
  1. 1
      HISTORY.md
  2. 120
      db/db_basic_test.cc
  3. 18
      table/block_based/block_based_table_reader.cc

@ -5,6 +5,7 @@
### Bug Fixes
* Truncated WALs ending in incomplete records can no longer produce gaps in the recovered data when `WALRecoveryMode::kPointInTimeRecovery` is used. Gaps are still possible when WALs are truncated exactly on record boundaries; for complete protection, users should enable `track_and_verify_wals_in_manifest`.
* Fix a bug where compressed blocks read by MultiGet are not inserted into the compressed block cache when use_direct_reads = true.
### Bug Fixes
* Fixed the logic of populating native data structure for `read_amp_bytes_per_bit` during OPTIONS file parsing on big-endian architecture. Without this fix, original code introduced in PR7659, when running on big-endian machine, can mistakenly store read_amp_bytes_per_bit (an uint32) in little endian format. Future access to `read_amp_bytes_per_bit` will give wrong values. Little endian architecture is not affected.

@ -2693,6 +2693,7 @@ class DBBasicTestMultiGet : public DBTestBase {
} else {
options.compression_opts.parallel_threads = compression_parallel_threads;
}
options_ = options;
Reopen(options);
if (num_cfs > 1) {
@ -2762,6 +2763,7 @@ class DBBasicTestMultiGet : public DBTestBase {
bool compression_enabled() { return compression_enabled_; }
bool has_compressed_cache() { return compressed_cache_ != nullptr; }
bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
Options get_options() { return options_; }
static void SetUpTestCase() {}
static void TearDownTestCase() {}
@ -2847,6 +2849,7 @@ class DBBasicTestMultiGet : public DBTestBase {
std::shared_ptr<MyBlockCache> compressed_cache_;
std::shared_ptr<MyBlockCache> uncompressed_cache_;
Options options_;
bool compression_enabled_;
std::vector<std::string> values_;
std::vector<std::string> uncompressable_values_;
@ -2989,6 +2992,123 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
}
}
#ifndef ROCKSDB_LITE
TEST_P(DBBasicTestWithParallelIO, MultiGetDirectIO) {
class FakeDirectIOEnv : public EnvWrapper {
class FakeDirectIOSequentialFile;
class FakeDirectIORandomAccessFile;
public:
FakeDirectIOEnv(Env* env) : EnvWrapper(env) {}
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) override {
std::unique_ptr<RandomAccessFile> file;
assert(options.use_direct_reads);
EnvOptions opts = options;
opts.use_direct_reads = false;
Status s = target()->NewRandomAccessFile(fname, &file, opts);
if (!s.ok()) {
return s;
}
result->reset(new FakeDirectIORandomAccessFile(std::move(file)));
return s;
}
private:
class FakeDirectIOSequentialFile : public SequentialFileWrapper {
public:
FakeDirectIOSequentialFile(std::unique_ptr<SequentialFile>&& file)
: SequentialFileWrapper(file.get()), file_(std::move(file)) {}
~FakeDirectIOSequentialFile() {}
bool use_direct_io() const override { return true; }
size_t GetRequiredBufferAlignment() const override { return 1; }
private:
std::unique_ptr<SequentialFile> file_;
};
class FakeDirectIORandomAccessFile : public RandomAccessFileWrapper {
public:
FakeDirectIORandomAccessFile(std::unique_ptr<RandomAccessFile>&& file)
: RandomAccessFileWrapper(file.get()), file_(std::move(file)) {}
~FakeDirectIORandomAccessFile() {}
bool use_direct_io() const override { return true; }
size_t GetRequiredBufferAlignment() const override { return 1; }
private:
std::unique_ptr<RandomAccessFile> file_;
};
};
std::unique_ptr<FakeDirectIOEnv> env(new FakeDirectIOEnv(env_));
Options opts = get_options();
opts.env = env.get();
opts.use_direct_reads = true;
Reopen(opts);
std::vector<std::string> key_data(10);
std::vector<Slice> keys;
// We cannot resize a PinnableSlice vector, so just set initial size to
// largest we think we will need
std::vector<PinnableSlice> values(10);
std::vector<Status> statuses;
ReadOptions ro;
ro.fill_cache = fill_cache();
// Warm up the cache first
key_data.emplace_back(Key(0));
keys.emplace_back(Slice(key_data.back()));
key_data.emplace_back(Key(50));
keys.emplace_back(Slice(key_data.back()));
statuses.resize(keys.size());
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(0, values[0].ToString()));
ASSERT_TRUE(CheckValue(50, values[1].ToString()));
int random_reads = env_->random_read_counter_.Read();
key_data[0] = Key(1);
key_data[1] = Key(51);
keys[0] = Slice(key_data[0]);
keys[1] = Slice(key_data[1]);
values[0].Reset();
values[1].Reset();
if (uncompressed_cache_) {
uncompressed_cache_->SetCapacity(0);
uncompressed_cache_->SetCapacity(1048576);
}
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data(), true);
ASSERT_TRUE(CheckValue(1, values[0].ToString()));
ASSERT_TRUE(CheckValue(51, values[1].ToString()));
bool read_from_cache = false;
if (fill_cache()) {
if (has_uncompressed_cache()) {
read_from_cache = true;
} else if (has_compressed_cache() && compression_enabled()) {
read_from_cache = true;
}
}
int expected_reads = random_reads;
if (!compression_enabled() || !has_compressed_cache()) {
expected_reads += 2;
} else {
expected_reads += (read_from_cache ? 0 : 2);
}
if (env_->random_read_counter_.Read() != expected_reads) {
ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
}
Close();
}
#endif // ROCKSDB_LITE
TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
std::vector<std::string> key_data(10);
std::vector<Slice> keys;

@ -1821,13 +1821,21 @@ void BlockBasedTable::RetrieveMultipleBlocks(
if (s.ok()) {
// When the blocks share the same underlying buffer (scratch or direct io
// buffer), if the block is compressed, the shared buffer will be
// uncompressed into heap during uncompressing; otherwise, we need to
// manually copy the block into heap before inserting the block to block
// cache.
// buffer), we may need to manually copy the block into heap if the raw
// block has to be inserted into a cache. That falls into th following
// cases -
// 1. Raw block is not compressed, it needs to be inserted into the
// uncompressed block cache if there is one
// 2. If the raw block is compressed, it needs to be inserted into the
// compressed block cache if there is one
//
// In all other cases, the raw block is either uncompressed into a heap
// buffer or there is no cache at all.
CompressionType compression_type =
raw_block_contents.get_compression_type();
if (use_shared_buffer && compression_type == kNoCompression) {
if (use_shared_buffer && (compression_type == kNoCompression ||
(compression_type != kNoCompression &&
rep_->table_options.block_cache_compressed))) {
Slice raw = Slice(req.result.data() + req_offset, block_size(handle));
raw_block_contents = BlockContents(
CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),

Loading…
Cancel
Save