Enable blob caching for MultiGetBlob in RocksDB (#10272)

Summary:
- [x] Enabled blob caching for MultiGetBlob in RocksDB
- [x] Refactored MultiGetBlob logic and interface in RocksDB
- [x] Cleaned up Version::MultiGetBlob() and moved 'blob'-related code snippets into BlobSource
- [x] Add End-to-end test cases in db_blob_basic_test and also add unit tests in blob_source_test

This task is a part of https://github.com/facebook/rocksdb/issues/10156

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10272

Reviewed By: ltamasi

Differential Revision: D37558112

Pulled By: gangliao

fbshipit-source-id: a73a6a94ffdee0024d5b2a39e6d1c1a7d38664db
main
Gang Liao 3 years ago committed by Facebook GitHub Bot
parent 20754b3654
commit 056e08d6c4
  1. 90
      db/blob/blob_file_reader.cc
  2. 10
      db/blob/blob_file_reader.h
  3. 95
      db/blob/blob_file_reader_test.cc
  4. 58
      db/blob/blob_read_request.h
  5. 112
      db/blob/blob_source.cc
  6. 46
      db/blob/blob_source.h
  7. 283
      db/blob/blob_source_test.cc
  8. 204
      db/blob/db_blob_basic_test.cc
  9. 140
      db/version_set.cc
  10. 8
      db/version_set.h
  11. 4
      db/version_set_sync_and_async.h

@ -16,6 +16,7 @@
#include "rocksdb/file_system.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "table/multiget_context.h"
#include "test_util/sync_point.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -374,40 +375,50 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
return Status::OK();
}
void BlobFileReader::MultiGetBlob(
const ReadOptions& read_options,
const autovector<std::reference_wrapper<const Slice>>& user_keys,
const autovector<uint64_t>& offsets,
const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
autovector<PinnableSlice*>& values, uint64_t* bytes_read) const {
const size_t num_blobs = user_keys.size();
void BlobFileReader::MultiGetBlob(const ReadOptions& read_options,
autovector<BlobReadRequest*>& blob_reqs,
uint64_t* bytes_read) const {
const size_t num_blobs = blob_reqs.size();
assert(num_blobs > 0);
assert(num_blobs == offsets.size());
assert(num_blobs == value_sizes.size());
assert(num_blobs == statuses.size());
assert(num_blobs == values.size());
assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
#ifndef NDEBUG
for (size_t i = 0; i < offsets.size() - 1; ++i) {
assert(offsets[i] <= offsets[i + 1]);
for (size_t i = 0; i < num_blobs - 1; ++i) {
assert(blob_reqs[i]->offset <= blob_reqs[i + 1]->offset);
}
#endif // !NDEBUG
std::vector<FSReadRequest> read_reqs(num_blobs);
std::vector<FSReadRequest> read_reqs;
autovector<uint64_t> adjustments;
uint64_t total_len = 0;
read_reqs.reserve(num_blobs);
for (size_t i = 0; i < num_blobs; ++i) {
const size_t key_size = user_keys[i].get().size();
assert(IsValidBlobOffset(offsets[i], key_size, value_sizes[i], file_size_));
const size_t key_size = blob_reqs[i]->user_key->size();
const uint64_t offset = blob_reqs[i]->offset;
const uint64_t value_size = blob_reqs[i]->len;
if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
*blob_reqs[i]->status = Status::Corruption("Invalid blob offset");
continue;
}
if (blob_reqs[i]->compression != compression_type_) {
*blob_reqs[i]->status =
Status::Corruption("Compression type mismatch when reading a blob");
continue;
}
const uint64_t adjustment =
read_options.verify_checksums
? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
: 0;
assert(offsets[i] >= adjustment);
assert(blob_reqs[i]->offset >= adjustment);
adjustments.push_back(adjustment);
read_reqs[i].offset = offsets[i] - adjustment;
read_reqs[i].len = value_sizes[i] + adjustment;
total_len += read_reqs[i].len;
FSReadRequest read_req;
read_req.offset = blob_reqs[i]->offset - adjustment;
read_req.len = blob_reqs[i]->len + adjustment;
read_reqs.emplace_back(read_req);
total_len += read_req.len;
}
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len);
@ -439,9 +450,12 @@ void BlobFileReader::MultiGetBlob(
for (auto& req : read_reqs) {
req.status.PermitUncheckedError();
}
for (size_t i = 0; i < num_blobs; ++i) {
assert(statuses[i]);
*statuses[i] = s;
for (auto& req : blob_reqs) {
assert(req->status);
if (!req->status->IsCorruption()) {
// Avoid overwriting corruption status.
*req->status = s;
}
}
return;
}
@ -449,33 +463,39 @@ void BlobFileReader::MultiGetBlob(
assert(s.ok());
uint64_t total_bytes = 0;
for (size_t i = 0; i < num_blobs; ++i) {
auto& req = read_reqs[i];
const auto& record_slice = req.result;
for (size_t i = 0, j = 0; i < num_blobs; ++i) {
assert(blob_reqs[i]->status);
if (!blob_reqs[i]->status->ok()) {
continue;
}
assert(statuses[i]);
assert(j < read_reqs.size());
auto& req = read_reqs[j++];
const auto& record_slice = req.result;
if (req.status.ok() && record_slice.size() != req.len) {
req.status = IOStatus::Corruption("Failed to read data from blob file");
}
*statuses[i] = req.status;
if (!statuses[i]->ok()) {
*blob_reqs[i]->status = req.status;
if (!blob_reqs[i]->status->ok()) {
continue;
}
// Verify checksums if enabled
if (read_options.verify_checksums) {
*statuses[i] = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
if (!statuses[i]->ok()) {
*blob_reqs[i]->status =
VerifyBlob(record_slice, *blob_reqs[i]->user_key, blob_reqs[i]->len);
if (!blob_reqs[i]->status->ok()) {
continue;
}
}
// Uncompress blob if needed
Slice value_slice(record_slice.data() + adjustments[i], value_sizes[i]);
*statuses[i] = UncompressBlobIfNeeded(value_slice, compression_type_,
clock_, statistics_, values[i]);
if (statuses[i]->ok()) {
Slice value_slice(record_slice.data() + adjustments[i], blob_reqs[i]->len);
*blob_reqs[i]->status =
UncompressBlobIfNeeded(value_slice, compression_type_, clock_,
statistics_, blob_reqs[i]->result);
if (blob_reqs[i]->status->ok()) {
total_bytes += record_slice.size();
}
}

@ -8,6 +8,7 @@
#include <cinttypes>
#include <memory>
#include "db/blob/blob_read_request.h"
#include "file/random_access_file_reader.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/rocksdb_namespace.h"
@ -47,12 +48,9 @@ class BlobFileReader {
uint64_t* bytes_read) const;
// offsets must be sorted in ascending order by caller.
void MultiGetBlob(
const ReadOptions& read_options,
const autovector<std::reference_wrapper<const Slice>>& user_keys,
const autovector<uint64_t>& offsets,
const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
autovector<PinnableSlice*>& values, uint64_t* bytes_read) const;
void MultiGetBlob(const ReadOptions& read_options,
autovector<BlobReadRequest*>& blob_reqs,
uint64_t* bytes_read) const;
CompressionType GetCompressionType() const { return compression_type_; }

@ -194,21 +194,21 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
// MultiGetBlob
bytes_read = 0;
size_t total_size = 0;
autovector<std::reference_wrapper<const Slice>> key_refs;
for (const auto& key_ref : keys) {
key_refs.emplace_back(std::cref(key_ref));
}
autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
blob_offsets[2]};
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
&statuses_buf[2]};
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
&value_buf[2]};
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
values, &bytes_read);
std::array<BlobReadRequest, num_blobs> requests_buf;
autovector<BlobReadRequest*> blob_reqs;
for (size_t i = 0; i < num_blobs; ++i) {
requests_buf[i] =
BlobReadRequest(keys[i], blob_offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
blob_reqs.push_back(&requests_buf[i]);
}
reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_OK(statuses_buf[i]);
ASSERT_EQ(value_buf[i], blobs[i]);
@ -300,15 +300,21 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
blob_offsets[0],
blob_offsets[1] - (keys[1].size() - key_refs[1].get().size()),
blob_offsets[2]};
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
&statuses_buf[2]};
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
&value_buf[2]};
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
values, &bytes_read);
std::array<BlobReadRequest, num_blobs> requests_buf;
autovector<BlobReadRequest*> blob_reqs;
for (size_t i = 0; i < num_blobs; ++i) {
requests_buf[i] =
BlobReadRequest(key_refs[i], offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
blob_reqs.push_back(&requests_buf[i]);
}
reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
if (i == 1) {
ASSERT_TRUE(statuses_buf[i].IsCorruption());
@ -339,17 +345,21 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
Slice wrong_key_slice(incorrect_key, sizeof(incorrect_key) - 1);
key_refs[2] = std::cref(wrong_key_slice);
autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
blob_offsets[2]};
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
&statuses_buf[2]};
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
&value_buf[2]};
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
values, &bytes_read);
std::array<BlobReadRequest, num_blobs> requests_buf;
for (size_t i = 0; i < num_blobs; ++i) {
requests_buf[i] =
BlobReadRequest(key_refs[i], blob_offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
}
autovector<BlobReadRequest*> blob_reqs = {
&requests_buf[0], &requests_buf[1], &requests_buf[2]};
reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
if (i == num_blobs - 1) {
ASSERT_TRUE(statuses_buf[i].IsCorruption());
@ -376,17 +386,26 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
for (const auto& key_ref : keys) {
key_refs.emplace_back(std::cref(key_ref));
}
autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
blob_offsets[2]};
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1] + 1, blob_sizes[2]};
std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
&statuses_buf[2]};
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
&value_buf[2]};
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
values, &bytes_read);
std::array<BlobReadRequest, num_blobs> requests_buf;
requests_buf[0] =
BlobReadRequest(key_refs[0], blob_offsets[0], blob_sizes[0],
kNoCompression, &value_buf[0], &statuses_buf[0]);
requests_buf[1] =
BlobReadRequest(key_refs[1], blob_offsets[1], blob_sizes[1] + 1,
kNoCompression, &value_buf[1], &statuses_buf[1]);
requests_buf[2] =
BlobReadRequest(key_refs[2], blob_offsets[2], blob_sizes[2],
kNoCompression, &value_buf[2], &statuses_buf[2]);
autovector<BlobReadRequest*> blob_reqs = {
&requests_buf[0], &requests_buf[1], &requests_buf[2]};
reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
if (i != 1) {
ASSERT_OK(statuses_buf[i]);

@ -0,0 +1,58 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cinttypes>
#include "rocksdb/compression_type.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/autovector.h"
namespace ROCKSDB_NAMESPACE {
// A read Blob request structure for use in BlobSource::MultiGetBlob and
// BlobFileReader::MultiGetBlob.
struct BlobReadRequest {
// User key to lookup the paired blob
const Slice* user_key = nullptr;
// File offset in bytes
uint64_t offset = 0;
// Length to read in bytes
size_t len = 0;
// Blob compression type
CompressionType compression = kNoCompression;
// Output parameter set by MultiGetBlob() to point to the data buffer, and
// the number of valid bytes
PinnableSlice* result = nullptr;
// Status of read
Status* status = nullptr;
BlobReadRequest(const Slice& _user_key, uint64_t _offset, size_t _len,
CompressionType _compression, PinnableSlice* _result,
Status* _status)
: user_key(&_user_key),
offset(_offset),
len(_len),
compression(_compression),
result(_result),
status(_status) {}
BlobReadRequest() = default;
BlobReadRequest(const BlobReadRequest& other) = default;
BlobReadRequest& operator=(const BlobReadRequest& other) = default;
};
using BlobFileReadRequests =
std::tuple<uint64_t /* file_number */, uint64_t /* file_size */,
autovector<BlobReadRequest>>;
} // namespace ROCKSDB_NAMESPACE

@ -202,31 +202,51 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
return s;
}
void BlobSource::MultiGetBlob(
const ReadOptions& read_options,
const autovector<std::reference_wrapper<const Slice>>& user_keys,
uint64_t file_number, uint64_t file_size,
const autovector<uint64_t>& offsets,
const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
autovector<PinnableSlice*>& blobs, uint64_t* bytes_read) {
size_t num_blobs = user_keys.size();
void BlobSource::MultiGetBlob(const ReadOptions& read_options,
autovector<BlobFileReadRequests>& blob_reqs,
uint64_t* bytes_read) {
assert(blob_reqs.size() > 0);
uint64_t total_bytes_read = 0;
uint64_t bytes_read_in_file = 0;
for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) {
// sort blob_reqs_in_file by file offset.
std::sort(
blob_reqs_in_file.begin(), blob_reqs_in_file.end(),
[](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
return lhs.offset < rhs.offset;
});
MultiGetBlobFromOneFile(read_options, file_number, file_size,
blob_reqs_in_file, &bytes_read_in_file);
total_bytes_read += bytes_read_in_file;
}
if (bytes_read) {
*bytes_read = total_bytes_read;
}
}
void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
uint64_t file_number,
uint64_t file_size,
autovector<BlobReadRequest>& blob_reqs,
uint64_t* bytes_read) {
const size_t num_blobs = blob_reqs.size();
assert(num_blobs > 0);
assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
assert(num_blobs == offsets.size());
assert(num_blobs == value_sizes.size());
assert(num_blobs == statuses.size());
assert(num_blobs == blobs.size());
#ifndef NDEBUG
for (size_t i = 0; i < offsets.size() - 1; ++i) {
assert(offsets[i] <= offsets[i + 1]);
for (size_t i = 0; i < num_blobs - 1; ++i) {
assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset);
}
#endif // !NDEBUG
using Mask = uint64_t;
Mask cache_hit_mask = 0;
Status s;
uint64_t total_bytes = 0;
const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number,
file_size);
@ -234,15 +254,18 @@ void BlobSource::MultiGetBlob(
if (blob_cache_) {
size_t cached_blob_count = 0;
for (size_t i = 0; i < num_blobs; ++i) {
auto& req = blob_reqs[i];
CachableEntry<std::string> blob_entry;
const CacheKey cache_key = base_cache_key.WithOffset(offsets[i]);
const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
const Slice key = cache_key.AsSlice();
s = GetBlobFromCache(key, &blob_entry);
const Status s = GetBlobFromCache(key, &blob_entry);
if (s.ok() && blob_entry.GetValue()) {
assert(statuses[i]);
*statuses[i] = s;
blobs[i]->PinSelf(*blob_entry.GetValue());
assert(req.status);
*req.status = s;
req.result->PinSelf(*blob_entry.GetValue());
// Update the counter for the number of valid blobs read from the cache.
++cached_blob_count;
@ -251,10 +274,10 @@ void BlobSource::MultiGetBlob(
uint64_t adjustment =
read_options.verify_checksums
? BlobLogRecord::CalculateAdjustmentForRecordHeader(
user_keys[i].get().size())
req.user_key->size())
: 0;
assert(offsets[i] >= adjustment);
total_bytes += value_sizes[i] + adjustment;
assert(req.offset >= adjustment);
total_bytes += req.len + adjustment;
cache_hit_mask |= (Mask{1} << i); // cache hit
}
}
@ -272,8 +295,8 @@ void BlobSource::MultiGetBlob(
if (no_io) {
for (size_t i = 0; i < num_blobs; ++i) {
if (!(cache_hit_mask & (Mask{1} << i))) {
assert(statuses[i]);
*statuses[i] =
assert(blob_reqs[i].status);
*blob_reqs[i].status =
Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
}
}
@ -282,50 +305,43 @@ void BlobSource::MultiGetBlob(
{
// Find the rest of blobs from the file since I/O is allowed.
autovector<std::reference_wrapper<const Slice>> _user_keys;
autovector<uint64_t> _offsets;
autovector<uint64_t> _value_sizes;
autovector<Status*> _statuses;
autovector<PinnableSlice*> _blobs;
autovector<BlobReadRequest*> _blob_reqs;
uint64_t _bytes_read = 0;
for (size_t i = 0; i < num_blobs; ++i) {
if (!(cache_hit_mask & (Mask{1} << i))) {
_user_keys.emplace_back(user_keys[i]);
_offsets.push_back(offsets[i]);
_value_sizes.push_back(value_sizes[i]);
_statuses.push_back(statuses[i]);
_blobs.push_back(blobs[i]);
_blob_reqs.push_back(&blob_reqs[i]);
}
}
CacheHandleGuard<BlobFileReader> blob_file_reader;
s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
Status s =
blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
if (!s.ok()) {
for (size_t i = 0; i < _blobs.size(); ++i) {
assert(_statuses[i]);
*_statuses[i] = s;
for (size_t i = 0; i < _blob_reqs.size(); ++i) {
assert(_blob_reqs[i]->status);
*_blob_reqs[i]->status = s;
}
return;
}
assert(blob_file_reader.GetValue());
blob_file_reader.GetValue()->MultiGetBlob(read_options, _user_keys,
_offsets, _value_sizes, _statuses,
_blobs, &_bytes_read);
blob_file_reader.GetValue()->MultiGetBlob(read_options, _blob_reqs,
&_bytes_read);
if (read_options.fill_cache) {
if (blob_cache_ && read_options.fill_cache) {
// If filling cache is allowed and a cache is configured, try to put
// the blob(s) to the cache.
for (size_t i = 0; i < _blobs.size(); ++i) {
if (_statuses[i]->ok()) {
for (size_t i = 0; i < _blob_reqs.size(); ++i) {
if (_blob_reqs[i]->status->ok()) {
CachableEntry<std::string> blob_entry;
const CacheKey cache_key = base_cache_key.WithOffset(_offsets[i]);
const CacheKey cache_key =
base_cache_key.WithOffset(_blob_reqs[i]->offset);
const Slice key = cache_key.AsSlice();
s = PutBlobIntoCache(key, &blob_entry, _blobs[i]);
s = PutBlobIntoCache(key, &blob_entry, _blob_reqs[i]->result);
if (!s.ok()) {
*_statuses[i] = s;
*_blob_reqs[i]->status = s;
}
}
}

@ -10,6 +10,7 @@
#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_read_request.h"
#include "rocksdb/cache.h"
#include "rocksdb/rocksdb_namespace.h"
#include "table/block_based/cachable_entry.h"
@ -37,7 +38,7 @@ class BlobSource {
~BlobSource();
// Read a blob from the underlying cache or storage.
// Read a blob from the underlying cache or one blob file.
//
// If successful, returns ok and sets "*value" to the newly retrieved
// uncompressed blob. If there was an error while fetching the blob, sets
@ -52,25 +53,44 @@ class BlobSource {
FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
uint64_t* bytes_read);
// Read multiple blobs from the underlying cache or storage.
// Read multiple blobs from the underlying cache or blob file(s).
//
// If successful, returns ok and sets the elements of blobs to the newly
// retrieved uncompressed blobs. If there was an error while fetching one of
// blobs, sets its corresponding "blobs[i]" to empty and sets "statuses[i]" to
// a non-ok status.
// If successful, returns ok and sets "result" in the elements of "blob_reqs"
// to the newly retrieved uncompressed blobs. If there was an error while
// fetching one of blobs, sets its "result" to empty and sets its
// corresponding "status" to a non-ok status.
//
// Note:
// - Offsets must be sorted in ascending order by caller.
// - The main difference between this function and MultiGetBlobFromOneFile is
// that this function can read multiple blobs from multiple blob files.
//
// - For consistency, whether the blob is found in the cache or on disk, sets
// "*bytes_read" to the total size of on-disk (possibly compressed) blob
// records.
void MultiGetBlob(
const ReadOptions& read_options,
const autovector<std::reference_wrapper<const Slice>>& user_keys,
void MultiGetBlob(const ReadOptions& read_options,
autovector<BlobFileReadRequests>& blob_reqs,
uint64_t* bytes_read);
// Read multiple blobs from the underlying cache or one blob file.
//
// If successful, returns ok and sets "result" in the elements of "blob_reqs"
// to the newly retrieved uncompressed blobs. If there was an error while
// fetching one of blobs, sets its "result" to empty and sets its
// corresponding "status" to a non-ok status.
//
// Note:
// - The main difference between this function and MultiGetBlob is that this
// function is only used for the case where the demanded blobs are stored in
// one blob file. MultiGetBlob will call this function multiple times if the
// demanded blobs are stored in multiple blob files.
//
// - For consistency, whether the blob is found in the cache or on disk, sets
// "*bytes_read" to the total size of on-disk (possibly compressed) blob
// records.
void MultiGetBlobFromOneFile(const ReadOptions& read_options,
uint64_t file_number, uint64_t file_size,
const autovector<uint64_t>& offsets,
const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
autovector<PinnableSlice*>& blobs, uint64_t* bytes_read);
autovector<BlobReadRequest>& blob_reqs,
uint64_t* bytes_read);
inline Status GetBlobFileReader(
uint64_t blob_file_number,

@ -561,6 +561,199 @@ TEST_F(BlobSourceTest, GetCompressedBlobs) {
}
}
TEST_F(BlobSourceTest, MultiGetBlobsFromMultiFiles) {
options_.cf_paths.emplace_back(
test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromMultiFiles"),
0);
options_.statistics = CreateDBStatistics();
Statistics* statistics = options_.statistics.get();
assert(statistics);
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
constexpr uint64_t blob_files = 2;
constexpr size_t num_blobs = 32;
std::vector<std::string> key_strs;
std::vector<std::string> blob_strs;
for (size_t i = 0; i < num_blobs; ++i) {
key_strs.push_back("key" + std::to_string(i));
blob_strs.push_back("blob" + std::to_string(i));
}
std::vector<Slice> keys;
std::vector<Slice> blobs;
uint64_t file_size = BlobLogHeader::kSize;
uint64_t blob_value_bytes = 0;
for (size_t i = 0; i < num_blobs; ++i) {
keys.push_back({key_strs[i]});
blobs.push_back({blob_strs[i]});
blob_value_bytes += blobs[i].size();
file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size();
}
file_size += BlobLogFooter::kSize;
const uint64_t blob_records_bytes =
file_size - BlobLogHeader::kSize - BlobLogFooter::kSize;
std::vector<uint64_t> blob_offsets(keys.size());
std::vector<uint64_t> blob_sizes(keys.size());
{
// Write key/blob pairs to multiple blob files.
for (size_t i = 0; i < blob_files; ++i) {
const uint64_t file_number = i + 1;
WriteBlobFile(immutable_options, column_family_id, has_ttl,
expiration_range, expiration_range, file_number, keys,
blobs, kNoCompression, blob_offsets, blob_sizes);
}
}
constexpr size_t capacity = 10;
std::shared_ptr<Cache> backing_cache =
NewLRUCache(capacity); // Blob file cache
FileOptions file_options;
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
backing_cache.get(), &immutable_options, &file_options, column_family_id,
blob_file_read_hist, nullptr /*IOTracer*/));
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
ReadOptions read_options;
read_options.verify_checksums = true;
uint64_t bytes_read = 0;
{
// MultiGetBlob
read_options.fill_cache = true;
read_options.read_tier = ReadTier::kReadAllTier;
autovector<BlobFileReadRequests> blob_reqs;
std::array<autovector<BlobReadRequest>, blob_files> blob_reqs_in_file;
std::array<PinnableSlice, num_blobs * blob_files> value_buf;
std::array<Status, num_blobs * blob_files> statuses_buf;
for (size_t i = 0; i < blob_files; ++i) {
const uint64_t file_number = i + 1;
for (size_t j = 0; j < num_blobs; ++j) {
blob_reqs_in_file[i].emplace_back(
keys[j], blob_offsets[j], blob_sizes[j], kNoCompression,
&value_buf[i * num_blobs + j], &statuses_buf[i * num_blobs + j]);
}
blob_reqs.emplace_back(file_number, file_size, blob_reqs_in_file[i]);
}
get_perf_context()->Reset();
statistics->Reset().PermitUncheckedError();
blob_source.MultiGetBlob(read_options, blob_reqs, &bytes_read);
for (size_t i = 0; i < blob_files; ++i) {
const uint64_t file_number = i + 1;
for (size_t j = 0; j < num_blobs; ++j) {
ASSERT_OK(statuses_buf[i * num_blobs + j]);
ASSERT_EQ(value_buf[i * num_blobs + j], blobs[j]);
ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
blob_offsets[j]));
}
}
// Retrieved all blobs from 2 blob files twice via MultiGetBlob and
// TEST_BlobInCache.
ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count,
num_blobs * blob_files);
ASSERT_EQ((int)get_perf_context()->blob_read_count,
num_blobs * blob_files); // blocking i/o
ASSERT_EQ((int)get_perf_context()->blob_read_byte,
blob_records_bytes * blob_files); // blocking i/o
ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS),
num_blobs * blob_files); // MultiGetBlob
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT),
num_blobs * blob_files); // TEST_BlobInCache
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD),
num_blobs * blob_files); // MultiGetBlob
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
blob_value_bytes * blob_files); // TEST_BlobInCache
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE),
blob_value_bytes * blob_files); // MultiGetBlob
get_perf_context()->Reset();
statistics->Reset().PermitUncheckedError();
autovector<BlobReadRequest> fake_blob_reqs_in_file;
std::array<PinnableSlice, num_blobs> fake_value_buf;
std::array<Status, num_blobs> fake_statuses_buf;
const uint64_t fake_file_number = 100;
for (size_t i = 0; i < num_blobs; ++i) {
fake_blob_reqs_in_file.emplace_back(
keys[i], blob_offsets[i], blob_sizes[i], kNoCompression,
&fake_value_buf[i], &fake_statuses_buf[i]);
}
// Add a fake multi-get blob request.
blob_reqs.emplace_back(fake_file_number, file_size, fake_blob_reqs_in_file);
blob_source.MultiGetBlob(read_options, blob_reqs, &bytes_read);
// Check the real blob read requests.
for (size_t i = 0; i < blob_files; ++i) {
const uint64_t file_number = i + 1;
for (size_t j = 0; j < num_blobs; ++j) {
ASSERT_OK(statuses_buf[i * num_blobs + j]);
ASSERT_EQ(value_buf[i * num_blobs + j], blobs[j]);
ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
blob_offsets[j]));
}
}
// Check the fake blob request.
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_TRUE(fake_statuses_buf[i].IsIOError());
ASSERT_TRUE(fake_value_buf[i].empty());
ASSERT_FALSE(blob_source.TEST_BlobInCache(fake_file_number, file_size,
blob_offsets[i]));
}
// Retrieved all blobs from 3 blob files (including the fake one) twice
// via MultiGetBlob and TEST_BlobInCache.
ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count,
num_blobs * blob_files * 2);
ASSERT_EQ((int)get_perf_context()->blob_read_count,
0); // blocking i/o
ASSERT_EQ((int)get_perf_context()->blob_read_byte,
0); // blocking i/o
ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);
// Fake blob requests: MultiGetBlob and TEST_BlobInCache
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 2);
// Real blob requests: MultiGetBlob and TEST_BlobInCache
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT),
num_blobs * blob_files * 2);
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
// Real blob requests: MultiGetBlob and TEST_BlobInCache
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
blob_value_bytes * blob_files * 2);
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
}
}
TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
options_.cf_paths.emplace_back(
test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);
@ -625,23 +818,15 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
{
// MultiGetBlob
// MultiGetBlobFromOneFile
uint64_t bytes_read = 0;
autovector<std::reference_wrapper<const Slice>> key_refs;
autovector<uint64_t> offsets;
autovector<uint64_t> sizes;
std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses;
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values;
autovector<BlobReadRequest> blob_reqs;
for (size_t i = 0; i < num_blobs; i += 2) { // even index
key_refs.emplace_back(std::cref(keys[i]));
offsets.push_back(blob_offsets[i]);
sizes.push_back(blob_sizes[i]);
statuses.push_back(&statuses_buf[i]);
values.push_back(&value_buf[i]);
blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
}
@ -652,9 +837,8 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
statistics->Reset().PermitUncheckedError();
// Get half of blobs
blob_source.MultiGetBlob(read_options, key_refs, blob_file_number,
file_size, offsets, sizes, statuses, values,
&bytes_read);
blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
file_size, blob_reqs, &bytes_read);
uint64_t fs_read_bytes = 0;
uint64_t ca_read_bytes = 0;
@ -709,27 +893,19 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
blob_offsets[i]));
}
// Cache-only MultiGetBlob
// Cache-only MultiGetBlobFromOneFile
read_options.read_tier = ReadTier::kBlockCacheTier;
get_perf_context()->Reset();
statistics->Reset().PermitUncheckedError();
key_refs.clear();
offsets.clear();
sizes.clear();
statuses.clear();
values.clear();
blob_reqs.clear();
for (size_t i = 0; i < num_blobs; ++i) {
key_refs.emplace_back(std::cref(keys[i]));
offsets.push_back(blob_offsets[i]);
sizes.push_back(blob_sizes[i]);
statuses.push_back(&statuses_buf[i]);
values.push_back(&value_buf[i]);
blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
}
blob_source.MultiGetBlob(read_options, key_refs, blob_file_number,
file_size, offsets, sizes, statuses, values,
&bytes_read);
blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
file_size, blob_reqs, &bytes_read);
uint64_t blob_bytes = 0;
for (size_t i = 0; i < num_blobs; ++i) {
@ -759,24 +935,17 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
options_.blob_cache->EraseUnRefEntries();
{
// Cache-only MultiGetBlob
// Cache-only MultiGetBlobFromOneFile
uint64_t bytes_read = 0;
read_options.read_tier = ReadTier::kBlockCacheTier;
autovector<std::reference_wrapper<const Slice>> key_refs;
autovector<uint64_t> offsets;
autovector<uint64_t> sizes;
std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses;
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values;
autovector<BlobReadRequest> blob_reqs;
for (size_t i = 0; i < num_blobs; i++) {
key_refs.emplace_back(std::cref(keys[i]));
offsets.push_back(blob_offsets[i]);
sizes.push_back(blob_sizes[i]);
statuses.push_back(&statuses_buf[i]);
values.push_back(&value_buf[i]);
blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
}
@ -784,9 +953,8 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
get_perf_context()->Reset();
statistics->Reset().PermitUncheckedError();
blob_source.MultiGetBlob(read_options, key_refs, blob_file_number,
file_size, offsets, sizes, statuses, values,
&bytes_read);
blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
file_size, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_TRUE(statuses_buf[i].IsIncomplete());
@ -809,40 +977,33 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
}
{
// MultiGetBlob from non-existing file
// MultiGetBlobFromOneFile from non-existing file
uint64_t bytes_read = 0;
uint64_t file_number = 100; // non-existing file
uint64_t non_existing_file_number = 100;
read_options.read_tier = ReadTier::kReadAllTier;
autovector<std::reference_wrapper<const Slice>> key_refs;
autovector<uint64_t> offsets;
autovector<uint64_t> sizes;
std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses;
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values;
autovector<BlobReadRequest> blob_reqs;
for (size_t i = 0; i < num_blobs; i++) {
key_refs.emplace_back(std::cref(keys[i]));
offsets.push_back(blob_offsets[i]);
sizes.push_back(blob_sizes[i]);
statuses.push_back(&statuses_buf[i]);
values.push_back(&value_buf[i]);
ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
blob_offsets[i]));
blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
ASSERT_FALSE(blob_source.TEST_BlobInCache(non_existing_file_number,
file_size, blob_offsets[i]));
}
get_perf_context()->Reset();
statistics->Reset().PermitUncheckedError();
blob_source.MultiGetBlob(read_options, key_refs, file_number, file_size,
offsets, sizes, statuses, values, &bytes_read);
blob_source.MultiGetBlobFromOneFile(read_options, non_existing_file_number,
file_size, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_TRUE(statuses_buf[i].IsIOError());
ASSERT_TRUE(value_buf[i].empty());
ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
blob_offsets[i]));
ASSERT_FALSE(blob_source.TEST_BlobInCache(non_existing_file_number,
file_size, blob_offsets[i]));
}
ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);

@ -299,6 +299,141 @@ TEST_F(DBBlobBasicTest, MultiGetBlobs) {
}
}
TEST_F(DBBlobBasicTest, MultiGetBlobsFromCache) {
Options options = GetDefaultOptions();
LRUCacheOptions co;
co.capacity = 2048;
co.num_shard_bits = 2;
co.metadata_charge_policy = kDontChargeCacheMetadata;
auto backing_cache = NewLRUCache(co);
constexpr size_t min_blob_size = 6;
options.min_blob_size = min_blob_size;
options.create_if_missing = true;
options.enable_blob_files = true;
options.blob_cache = backing_cache;
BlockBasedTableOptions block_based_options;
block_based_options.no_block_cache = false;
block_based_options.block_cache = backing_cache;
block_based_options.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
DestroyAndReopen(options);
// Put then retrieve three key-values. The first value is below the size limit
// and is thus stored inline; the other two are stored separately as blobs.
constexpr size_t num_keys = 3;
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "short";
static_assert(sizeof(first_value) - 1 < min_blob_size,
"first_value too long to be inlined");
ASSERT_OK(Put(first_key, first_value));
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "long_value";
static_assert(sizeof(second_value) - 1 >= min_blob_size,
"second_value too short to be stored as blob");
ASSERT_OK(Put(second_key, second_value));
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "other_long_value";
static_assert(sizeof(third_value) - 1 >= min_blob_size,
"third_value too short to be stored as blob");
ASSERT_OK(Put(third_key, third_value));
ASSERT_OK(Flush());
ReadOptions read_options;
read_options.fill_cache = false;
std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_value);
ASSERT_OK(statuses[1]);
ASSERT_EQ(values[1], second_value);
ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], third_value);
}
// Try again with no I/O allowed. The first (inlined) value should be
// successfully read; however, the two blob values could only be read from the
// blob file, so for those the read should return Incomplete.
read_options.read_tier = kBlockCacheTier;
{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_value);
ASSERT_TRUE(statuses[1].IsIncomplete());
ASSERT_TRUE(statuses[2].IsIncomplete());
}
// Fill the cache when reading blobs from the blob file.
read_options.read_tier = kReadAllTier;
read_options.fill_cache = true;
{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_value);
ASSERT_OK(statuses[1]);
ASSERT_EQ(values[1], second_value);
ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], third_value);
}
// Try again with no I/O allowed. All blobs should be successfully read from
// the cache.
read_options.read_tier = kBlockCacheTier;
{
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
&values[0], &statuses[0]);
ASSERT_OK(statuses[0]);
ASSERT_EQ(values[0], first_value);
ASSERT_OK(statuses[1]);
ASSERT_EQ(values[1], second_value);
ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], third_value);
}
}
#ifndef ROCKSDB_LITE
TEST_F(DBBlobBasicTest, MultiGetWithDirectIO) {
Options options = GetDefaultOptions();
@ -492,8 +627,23 @@ TEST_F(DBBlobBasicTest, MultiGetWithDirectIO) {
TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
LRUCacheOptions co;
co.capacity = 2 << 20; // 2MB
co.num_shard_bits = 2;
co.metadata_charge_policy = kDontChargeCacheMetadata;
auto backing_cache = NewLRUCache(co);
options.min_blob_size = 0;
options.create_if_missing = true;
options.enable_blob_files = true;
options.blob_cache = backing_cache;
BlockBasedTableOptions block_based_options;
block_based_options.no_block_cache = false;
block_based_options.block_cache = backing_cache;
block_based_options.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
Reopen(options);
@ -519,9 +669,15 @@ TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
for (size_t i = 0; i < keys.size(); ++i) {
keys[i] = key_strs[i];
}
ReadOptions read_options;
read_options.read_tier = kReadAllTier;
read_options.fill_cache = false;
{
std::array<PinnableSlice, kNumKeys> values;
std::array<Status, kNumKeys> statuses;
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), kNumKeys, &keys[0],
db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
&values[0], &statuses[0]);
for (size_t i = 0; i < kNumKeys; ++i) {
@ -530,6 +686,50 @@ TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
}
}
read_options.read_tier = kBlockCacheTier;
{
std::array<PinnableSlice, kNumKeys> values;
std::array<Status, kNumKeys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
&values[0], &statuses[0]);
for (size_t i = 0; i < kNumKeys; ++i) {
ASSERT_TRUE(statuses[i].IsIncomplete());
ASSERT_TRUE(values[i].empty());
}
}
read_options.read_tier = kReadAllTier;
read_options.fill_cache = true;
{
std::array<PinnableSlice, kNumKeys> values;
std::array<Status, kNumKeys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
&values[0], &statuses[0]);
for (size_t i = 0; i < kNumKeys; ++i) {
ASSERT_OK(statuses[i]);
ASSERT_EQ(value_strs[i], values[i]);
}
}
read_options.read_tier = kBlockCacheTier;
{
std::array<PinnableSlice, kNumKeys> values;
std::array<Status, kNumKeys> statuses;
db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
&values[0], &statuses[0]);
for (size_t i = 0; i < kNumKeys; ++i) {
ASSERT_OK(statuses[i]);
ASSERT_EQ(value_strs[i], values[i]);
}
}
}
TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;

@ -1907,114 +1907,62 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
void Version::MultiGetBlob(
const ReadOptions& read_options, MultiGetRange& range,
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs) {
if (read_options.read_tier == kBlockCacheTier) {
Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
for (const auto& elem : blob_rqs) {
for (const auto& blob_rq : elem.second) {
const KeyContext& key_context = blob_rq.second;
assert(key_context.s);
assert(key_context.s->ok());
*(key_context.s) = s;
assert(key_context.get_context);
auto& get_context = *(key_context.get_context);
get_context.MarkKeyMayExist();
}
}
return;
}
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs) {
assert(!blob_ctxs.empty());
assert(!blob_rqs.empty());
Status status;
autovector<BlobFileReadRequests> blob_reqs;
for (auto& elem : blob_rqs) {
const uint64_t blob_file_number = elem.first;
for (auto& ctx : blob_ctxs) {
const auto file_number = ctx.first;
const auto blob_file_meta = storage_info_.GetBlobFileMetaData(file_number);
if (!storage_info_.GetBlobFileMetaData(blob_file_number)) {
auto& blobs_in_file = elem.second;
autovector<BlobReadRequest> blob_reqs_in_file;
BlobReadContexts& blobs_in_file = ctx.second;
for (const auto& blob : blobs_in_file) {
const BlobIndex& blob_index = blob.first;
const KeyContext& key_context = blob.second;
*(key_context.s) = Status::Corruption("Invalid blob file number");
}
continue;
}
CacheHandleGuard<BlobFileReader> blob_file_reader;
assert(blob_source_);
status =
blob_source_->GetBlobFileReader(blob_file_number, &blob_file_reader);
assert(!status.ok() || blob_file_reader.GetValue());
auto& blobs_in_file = elem.second;
if (!status.ok()) {
for (const auto& blob : blobs_in_file) {
const KeyContext& key_context = blob.second;
*(key_context.s) = status;
}
if (!blob_file_meta) {
*key_context.s = Status::Corruption("Invalid blob file number");
continue;
}
assert(blob_file_reader.GetValue());
const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
const CompressionType compression =
blob_file_reader.GetValue()->GetCompressionType();
// sort blobs_in_file by file offset.
std::sort(
blobs_in_file.begin(), blobs_in_file.end(),
[](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
assert(lhs.first.file_number() == rhs.first.file_number());
return lhs.first.offset() < rhs.first.offset();
});
autovector<std::reference_wrapper<const KeyContext>> blob_read_key_contexts;
autovector<std::reference_wrapper<const Slice>> user_keys;
autovector<uint64_t> offsets;
autovector<uint64_t> value_sizes;
autovector<Status*> statuses;
autovector<PinnableSlice*> values;
for (const auto& blob : blobs_in_file) {
const auto& blob_index = blob.first;
const KeyContext& key_context = blob.second;
if (blob_index.HasTTL() || blob_index.IsInlined()) {
*(key_context.s) =
*key_context.s =
Status::Corruption("Unexpected TTL/inlined blob index");
continue;
}
const uint64_t key_size = key_context.ukey_with_ts.size();
const uint64_t offset = blob_index.offset();
const uint64_t value_size = blob_index.size();
if (!IsValidBlobOffset(offset, key_size, value_size, file_size)) {
*(key_context.s) = Status::Corruption("Invalid blob offset");
continue;
key_context.value->Reset();
blob_reqs_in_file.emplace_back(
key_context.ukey_with_ts, blob_index.offset(), blob_index.size(),
blob_index.compression(), key_context.value, key_context.s);
}
if (blob_reqs_in_file.size() > 0) {
const auto file_size = blob_file_meta->GetBlobFileSize();
blob_reqs.emplace_back(file_number, file_size, blob_reqs_in_file);
}
if (blob_index.compression() != compression) {
*(key_context.s) =
Status::Corruption("Compression type mismatch when reading a blob");
continue;
}
blob_read_key_contexts.emplace_back(std::cref(key_context));
user_keys.emplace_back(std::cref(key_context.ukey_with_ts));
offsets.push_back(blob_index.offset());
value_sizes.push_back(blob_index.size());
statuses.push_back(key_context.s);
values.push_back(key_context.value);
}
blob_file_reader.GetValue()->MultiGetBlob(read_options, user_keys, offsets,
value_sizes, statuses, values,
/*bytes_read=*/nullptr);
size_t num = blob_read_key_contexts.size();
assert(num == user_keys.size());
assert(num == offsets.size());
assert(num == value_sizes.size());
assert(num == statuses.size());
assert(num == values.size());
for (size_t i = 0; i < num; ++i) {
if (statuses[i]->ok()) {
range.AddValueSize(blob_read_key_contexts[i].get().value->size());
if (blob_reqs.size() > 0) {
blob_source_->MultiGetBlob(read_options, blob_reqs, /*bytes_read=*/nullptr);
}
for (auto& ctx : blob_ctxs) {
BlobReadContexts& blobs_in_file = ctx.second;
for (const auto& blob : blobs_in_file) {
const KeyContext& key_context = blob.second;
if (key_context.s->ok()) {
range.AddValueSize(key_context.value->size());
if (range.GetValueSize() > read_options.value_size_soft_limit) {
*(blob_read_key_contexts[i].get().s) = Status::Aborted();
*key_context.s = Status::Aborted();
}
} else if (key_context.s->IsIncomplete()) {
// read_options.read_tier == kBlockCacheTier
// Cannot read blob(s): no disk I/O allowed
assert(key_context.get_context);
auto& get_context = *(key_context.get_context);
get_context.MarkKeyMayExist();
}
}
}
@ -2253,7 +2201,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
// blob_file => [[blob_idx, it], ...]
std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;
std::unordered_map<uint64_t, BlobReadContexts> blob_ctxs;
int prev_level = -1;
while (!fp.IsSearchEnded()) {
@ -2270,7 +2218,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
// Call MultiGetFromSST for looking up a single file
s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
fp.GetHitFileLevel(), fp.IsHitFileLastInLevel(), f,
blob_rqs, num_filter_read, num_index_read,
blob_ctxs, num_filter_read, num_index_read,
num_sst_read);
if (fp.GetHitFileLevel() == 0) {
dump_stats_for_l0_file = true;
@ -2285,7 +2233,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
while (f != nullptr) {
mget_tasks.emplace_back(MultiGetFromSSTCoroutine(
read_options, fp.CurrentFileRange(), fp.GetHitFileLevel(),
fp.IsHitFileLastInLevel(), f, blob_rqs, num_filter_read,
fp.IsHitFileLastInLevel(), f, blob_ctxs, num_filter_read,
num_index_read, num_sst_read));
if (fp.KeyMaySpanNextFile()) {
break;
@ -2358,8 +2306,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
num_level_read);
}
if (s.ok() && !blob_rqs.empty()) {
MultiGetBlob(read_options, keys_with_blobs_range, blob_rqs);
if (s.ok() && !blob_ctxs.empty()) {
MultiGetBlob(read_options, keys_with_blobs_range, blob_ctxs);
}
// Process any left over keys

@ -860,11 +860,11 @@ class Version {
FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
uint64_t* bytes_read) const;
using BlobReadRequest =
using BlobReadContext =
std::pair<BlobIndex, std::reference_wrapper<const KeyContext>>;
using BlobReadRequests = std::vector<BlobReadRequest>;
using BlobReadContexts = std::vector<BlobReadContext>;
void MultiGetBlob(const ReadOptions& read_options, MultiGetRange& range,
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs);
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs);
// Loads some stats information from files (if update_stats is set) and
// populates derived data structures. Call without mutex held. It needs to be
@ -989,7 +989,7 @@ class Version {
/* ret_type */ Status, /* func_name */ MultiGetFromSST,
const ReadOptions& read_options, MultiGetRange file_range,
int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
uint64_t& num_filter_read, uint64_t& num_index_read,
uint64_t& num_sst_read);

@ -14,7 +14,7 @@ namespace ROCKSDB_NAMESPACE {
DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
(const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level,
bool is_hit_file_last_in_level, FdWithKeyRange* f,
std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
uint64_t& num_filter_read, uint64_t& num_index_read, uint64_t& num_sst_read) {
bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
get_perf_context()->per_level_perf_context_enabled;
@ -110,7 +110,7 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
if (tmp_s.ok()) {
const uint64_t blob_file_num = blob_index.file_number();
blob_rqs[blob_file_num].emplace_back(
blob_ctxs[blob_file_num].emplace_back(
std::make_pair(blob_index, std::cref(*iter)));
} else {
*(iter->s) = tmp_s;

Loading…
Cancel
Save