Eliminate some allocations/copies around the blob cache (#10647)

Summary:
Historically, `BlobFileReader` has returned the blob(s) read from the file
in the `PinnableSlice` provided by the client. This interface was
preserved when caching was implemented for blobs, which meant that
the blob data was copied multiple times when caching was in use: first,
into the client-provided `PinnableSlice` (by `BlobFileReader::SaveValue`),
and then, into the object stored in the cache (by `BlobSource::PutBlobIntoCache`).
The patch eliminates these copies and the related allocations by changing
`BlobFileReader` so it returns its results in the form of heap-allocated `BlobContents`
objects that can be directly inserted into the cache. The allocations backing
these `BlobContents` objects are made using the blob cache's allocator if the
blobs are to be inserted into the cache (i.e. if a cache is configured and the
`fill_cache` read option is set). Note: this PR focuses on the common case when
blobs are compressed; some further small optimizations are possible for uncompressed
blobs.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10647

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D39335185

Pulled By: ltamasi

fbshipit-source-id: 464503d60a5520d654c8273ffb8efd5d1bcd7b36
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent 6de7081cf3
commit fe56cb9aa0
  1. 123
      db/blob/blob_file_reader.cc
  2. 19
      db/blob/blob_file_reader.h
  3. 154
      db/blob/blob_file_reader_test.cc
  4. 126
      db/blob/blob_source.cc
  5. 10
      db/blob/blob_source.h
  6. 2
      db/blob/blob_source_test.cc

@ -8,6 +8,7 @@
#include <cassert>
#include <string>
#include "db/blob/blob_contents.h"
#include "db/blob/blob_log_format.h"
#include "file/file_prefetch_buffer.h"
#include "file/filename.h"
@ -283,14 +284,12 @@ BlobFileReader::BlobFileReader(
BlobFileReader::~BlobFileReader() = default;
Status BlobFileReader::GetBlob(const ReadOptions& read_options,
const Slice& user_key, uint64_t offset,
uint64_t value_size,
CompressionType compression_type,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* value,
uint64_t* bytes_read) const {
assert(value);
Status BlobFileReader::GetBlob(
const ReadOptions& read_options, const Slice& user_key, uint64_t offset,
uint64_t value_size, CompressionType compression_type,
FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator,
std::unique_ptr<BlobContents>* result, uint64_t* bytes_read) const {
assert(result);
const uint64_t key_size = user_key.size();
@ -361,8 +360,8 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
const Slice value_slice(record_slice.data() + adjustment, value_size);
{
const Status s = UncompressBlobIfNeeded(value_slice, compression_type,
clock_, statistics_, value);
const Status s = UncompressBlobIfNeeded(
value_slice, compression_type, allocator, clock_, statistics_, result);
if (!s.ok()) {
return s;
}
@ -375,16 +374,18 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
return Status::OK();
}
void BlobFileReader::MultiGetBlob(const ReadOptions& read_options,
autovector<BlobReadRequest*>& blob_reqs,
uint64_t* bytes_read) const {
void BlobFileReader::MultiGetBlob(
const ReadOptions& read_options, MemoryAllocator* allocator,
autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>&
blob_reqs,
uint64_t* bytes_read) const {
const size_t num_blobs = blob_reqs.size();
assert(num_blobs > 0);
assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
#ifndef NDEBUG
for (size_t i = 0; i < num_blobs - 1; ++i) {
assert(blob_reqs[i]->offset <= blob_reqs[i + 1]->offset);
assert(blob_reqs[i].first->offset <= blob_reqs[i + 1].first->offset);
}
#endif // !NDEBUG
@ -393,16 +394,21 @@ void BlobFileReader::MultiGetBlob(const ReadOptions& read_options,
uint64_t total_len = 0;
read_reqs.reserve(num_blobs);
for (size_t i = 0; i < num_blobs; ++i) {
const size_t key_size = blob_reqs[i]->user_key->size();
const uint64_t offset = blob_reqs[i]->offset;
const uint64_t value_size = blob_reqs[i]->len;
BlobReadRequest* const req = blob_reqs[i].first;
assert(req);
assert(req->user_key);
assert(req->status);
const size_t key_size = req->user_key->size();
const uint64_t offset = req->offset;
const uint64_t value_size = req->len;
if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
*blob_reqs[i]->status = Status::Corruption("Invalid blob offset");
*req->status = Status::Corruption("Invalid blob offset");
continue;
}
if (blob_reqs[i]->compression != compression_type_) {
*blob_reqs[i]->status =
if (req->compression != compression_type_) {
*req->status =
Status::Corruption("Compression type mismatch when reading a blob");
continue;
}
@ -411,12 +417,12 @@ void BlobFileReader::MultiGetBlob(const ReadOptions& read_options,
read_options.verify_checksums
? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
: 0;
assert(blob_reqs[i]->offset >= adjustment);
assert(req->offset >= adjustment);
adjustments.push_back(adjustment);
FSReadRequest read_req = {};
read_req.offset = blob_reqs[i]->offset - adjustment;
read_req.len = blob_reqs[i]->len + adjustment;
read_req.offset = req->offset - adjustment;
read_req.len = req->len + adjustment;
read_reqs.emplace_back(read_req);
total_len += read_req.len;
}
@ -450,8 +456,11 @@ void BlobFileReader::MultiGetBlob(const ReadOptions& read_options,
for (auto& req : read_reqs) {
req.status.PermitUncheckedError();
}
for (auto& req : blob_reqs) {
for (auto& blob_req : blob_reqs) {
BlobReadRequest* const req = blob_req.first;
assert(req);
assert(req->status);
if (!req->status->IsCorruption()) {
// Avoid overwriting corruption status.
*req->status = s;
@ -464,38 +473,42 @@ void BlobFileReader::MultiGetBlob(const ReadOptions& read_options,
uint64_t total_bytes = 0;
for (size_t i = 0, j = 0; i < num_blobs; ++i) {
assert(blob_reqs[i]->status);
if (!blob_reqs[i]->status->ok()) {
BlobReadRequest* const req = blob_reqs[i].first;
assert(req);
assert(req->user_key);
assert(req->status);
if (!req->status->ok()) {
continue;
}
assert(j < read_reqs.size());
auto& req = read_reqs[j++];
const auto& record_slice = req.result;
if (req.status.ok() && record_slice.size() != req.len) {
req.status = IOStatus::Corruption("Failed to read data from blob file");
auto& read_req = read_reqs[j++];
const auto& record_slice = read_req.result;
if (read_req.status.ok() && record_slice.size() != read_req.len) {
read_req.status =
IOStatus::Corruption("Failed to read data from blob file");
}
*blob_reqs[i]->status = req.status;
if (!blob_reqs[i]->status->ok()) {
*req->status = read_req.status;
if (!req->status->ok()) {
continue;
}
// Verify checksums if enabled
if (read_options.verify_checksums) {
*blob_reqs[i]->status =
VerifyBlob(record_slice, *blob_reqs[i]->user_key, blob_reqs[i]->len);
if (!blob_reqs[i]->status->ok()) {
*req->status = VerifyBlob(record_slice, *req->user_key, req->len);
if (!req->status->ok()) {
continue;
}
}
// Uncompress blob if needed
Slice value_slice(record_slice.data() + adjustments[i], blob_reqs[i]->len);
*blob_reqs[i]->status =
UncompressBlobIfNeeded(value_slice, compression_type_, clock_,
statistics_, blob_reqs[i]->result);
if (blob_reqs[i]->status->ok()) {
Slice value_slice(record_slice.data() + adjustments[i], req->len);
*req->status =
UncompressBlobIfNeeded(value_slice, compression_type_, allocator,
clock_, statistics_, &blob_reqs[i].second);
if (req->status->ok()) {
total_bytes += record_slice.size();
}
}
@ -549,15 +562,18 @@ Status BlobFileReader::VerifyBlob(const Slice& record_slice,
return Status::OK();
}
Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice,
CompressionType compression_type,
SystemClock* clock,
Statistics* statistics,
PinnableSlice* value) {
assert(value);
Status BlobFileReader::UncompressBlobIfNeeded(
const Slice& value_slice, CompressionType compression_type,
MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics,
std::unique_ptr<BlobContents>* result) {
assert(result);
if (compression_type == kNoCompression) {
SaveValue(value_slice, value);
CacheAllocationPtr allocation =
AllocateBlock(value_slice.size(), allocator);
memcpy(allocation.get(), value_slice.data(), value_slice.size());
*result = BlobContents::Create(std::move(allocation), value_slice.size());
return Status::OK();
}
@ -568,7 +584,6 @@ Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice,
size_t uncompressed_size = 0;
constexpr uint32_t compression_format_version = 2;
constexpr MemoryAllocator* allocator = nullptr;
CacheAllocationPtr output;
@ -587,19 +602,9 @@ Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice,
return Status::Corruption("Unable to uncompress blob");
}
SaveValue(Slice(output.get(), uncompressed_size), value);
*result = BlobContents::Create(std::move(output), uncompressed_size);
return Status::OK();
}
void BlobFileReader::SaveValue(const Slice& src, PinnableSlice* dst) {
assert(dst);
if (dst->IsPinned()) {
dst->Reset();
}
dst->PinSelf(src);
}
} // namespace ROCKSDB_NAMESPACE

@ -23,7 +23,7 @@ class HistogramImpl;
struct ReadOptions;
class Slice;
class FilePrefetchBuffer;
class PinnableSlice;
class BlobContents;
class Statistics;
class BlobFileReader {
@ -44,13 +44,17 @@ class BlobFileReader {
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
uint64_t offset, uint64_t value_size,
CompressionType compression_type,
FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
FilePrefetchBuffer* prefetch_buffer,
MemoryAllocator* allocator,
std::unique_ptr<BlobContents>* result,
uint64_t* bytes_read) const;
// offsets must be sorted in ascending order by caller.
void MultiGetBlob(const ReadOptions& read_options,
autovector<BlobReadRequest*>& blob_reqs,
uint64_t* bytes_read) const;
void MultiGetBlob(
const ReadOptions& read_options, MemoryAllocator* allocator,
autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>&
blob_reqs,
uint64_t* bytes_read) const;
CompressionType GetCompressionType() const { return compression_type_; }
@ -89,11 +93,10 @@ class BlobFileReader {
static Status UncompressBlobIfNeeded(const Slice& value_slice,
CompressionType compression_type,
MemoryAllocator* allocator,
SystemClock* clock,
Statistics* statistics,
PinnableSlice* value);
static void SaveValue(const Slice& src, PinnableSlice* dst);
std::unique_ptr<BlobContents>* result);
std::unique_ptr<RandomAccessFileReader> file_reader_;
uint64_t file_size_;

@ -8,6 +8,7 @@
#include <cassert>
#include <string>
#include "db/blob/blob_contents.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
#include "env/mock_env.h"
@ -180,15 +181,17 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
read_options.verify_checksums = false;
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr MemoryAllocator* allocator = nullptr;
{
PinnableSlice value;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0],
blob_sizes[0], kNoCompression, prefetch_buffer,
&value, &bytes_read));
ASSERT_EQ(value, blobs[0]);
allocator, &value, &bytes_read));
ASSERT_NE(value, nullptr);
ASSERT_EQ(value->data(), blobs[0]);
ASSERT_EQ(bytes_read, blob_sizes[0]);
// MultiGetBlob
@ -196,22 +199,25 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
size_t total_size = 0;
std::array<Status, num_blobs> statuses_buf;
std::array<PinnableSlice, num_blobs> value_buf;
std::array<BlobReadRequest, num_blobs> requests_buf;
autovector<BlobReadRequest*> blob_reqs;
autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
blob_reqs;
for (size_t i = 0; i < num_blobs; ++i) {
requests_buf[i] =
BlobReadRequest(keys[i], blob_offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
blob_reqs.push_back(&requests_buf[i]);
kNoCompression, nullptr, &statuses_buf[i]);
blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
}
reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
const auto& result = blob_reqs[i].second;
ASSERT_OK(statuses_buf[i]);
ASSERT_EQ(value_buf[i], blobs[i]);
ASSERT_NE(result, nullptr);
ASSERT_EQ(result->data(), blobs[i]);
total_size += blob_sizes[i];
}
ASSERT_EQ(bytes_read, total_size);
@ -220,13 +226,14 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
read_options.verify_checksums = true;
{
PinnableSlice value;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1],
blob_sizes[1], kNoCompression, prefetch_buffer,
&value, &bytes_read));
ASSERT_EQ(value, blobs[1]);
allocator, &value, &bytes_read));
ASSERT_NE(value, nullptr);
ASSERT_EQ(value->data(), blobs[1]);
const uint64_t key_size = keys[1].size();
ASSERT_EQ(bytes_read,
@ -236,47 +243,50 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
// Invalid offset (too close to start of file)
{
PinnableSlice value;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(read_options, keys[0], blob_offsets[0] - 1,
blob_sizes[0], kNoCompression, prefetch_buffer,
&value, &bytes_read)
allocator, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(value, nullptr);
ASSERT_EQ(bytes_read, 0);
}
// Invalid offset (too close to end of file)
{
PinnableSlice value;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(read_options, keys[2], blob_offsets[2] + 1,
blob_sizes[2], kNoCompression, prefetch_buffer,
&value, &bytes_read)
allocator, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(value, nullptr);
ASSERT_EQ(bytes_read, 0);
}
// Incorrect compression type
{
PinnableSlice value;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(read_options, keys[0], blob_offsets[0],
blob_sizes[0], kZSTD, prefetch_buffer, &value,
&bytes_read)
blob_sizes[0], kZSTD, prefetch_buffer, allocator,
&value, &bytes_read)
.IsCorruption());
ASSERT_EQ(value, nullptr);
ASSERT_EQ(bytes_read, 0);
}
// Incorrect key size
{
constexpr char shorter_key[] = "k";
PinnableSlice value;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
@ -284,8 +294,9 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
blob_offsets[0] -
(keys[0].size() - sizeof(shorter_key) + 1),
blob_sizes[0], kNoCompression, prefetch_buffer,
&value, &bytes_read)
allocator, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(value, nullptr);
ASSERT_EQ(bytes_read, 0);
// MultiGetBlob
@ -302,18 +313,18 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
blob_offsets[2]};
std::array<Status, num_blobs> statuses_buf;
std::array<PinnableSlice, num_blobs> value_buf;
std::array<BlobReadRequest, num_blobs> requests_buf;
autovector<BlobReadRequest*> blob_reqs;
autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
blob_reqs;
for (size_t i = 0; i < num_blobs; ++i) {
requests_buf[i] =
BlobReadRequest(key_refs[i], offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
blob_reqs.push_back(&requests_buf[i]);
kNoCompression, nullptr, &statuses_buf[i]);
blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
}
reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
if (i == 1) {
@ -327,14 +338,15 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
// Incorrect key
{
constexpr char incorrect_key[] = "foo1";
PinnableSlice value;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(read_options, incorrect_key, blob_offsets[0],
blob_sizes[0], kNoCompression, prefetch_buffer,
&value, &bytes_read)
allocator, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(value, nullptr);
ASSERT_EQ(bytes_read, 0);
// MultiGetBlob
@ -346,19 +358,18 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
key_refs[2] = std::cref(wrong_key_slice);
std::array<Status, num_blobs> statuses_buf;
std::array<PinnableSlice, num_blobs> value_buf;
std::array<BlobReadRequest, num_blobs> requests_buf;
autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
blob_reqs;
for (size_t i = 0; i < num_blobs; ++i) {
requests_buf[i] =
BlobReadRequest(key_refs[i], blob_offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
kNoCompression, nullptr, &statuses_buf[i]);
blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
}
autovector<BlobReadRequest*> blob_reqs = {
&requests_buf[0], &requests_buf[1], &requests_buf[2]};
reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
if (i == num_blobs - 1) {
@ -371,14 +382,15 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
// Incorrect value size
{
PinnableSlice value;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(read_options, keys[1], blob_offsets[1],
blob_sizes[1] + 1, kNoCompression,
prefetch_buffer, &value, &bytes_read)
prefetch_buffer, allocator, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(value, nullptr);
ASSERT_EQ(bytes_read, 0);
// MultiGetBlob
@ -388,23 +400,26 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
}
std::array<Status, num_blobs> statuses_buf;
std::array<PinnableSlice, num_blobs> value_buf;
std::array<BlobReadRequest, num_blobs> requests_buf;
requests_buf[0] =
BlobReadRequest(key_refs[0], blob_offsets[0], blob_sizes[0],
kNoCompression, &value_buf[0], &statuses_buf[0]);
kNoCompression, nullptr, &statuses_buf[0]);
requests_buf[1] =
BlobReadRequest(key_refs[1], blob_offsets[1], blob_sizes[1] + 1,
kNoCompression, &value_buf[1], &statuses_buf[1]);
kNoCompression, nullptr, &statuses_buf[1]);
requests_buf[2] =
BlobReadRequest(key_refs[2], blob_offsets[2], blob_sizes[2],
kNoCompression, &value_buf[2], &statuses_buf[2]);
kNoCompression, nullptr, &statuses_buf[2]);
autovector<BlobReadRequest*> blob_reqs = {
&requests_buf[0], &requests_buf[1], &requests_buf[2]};
autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
blob_reqs;
reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
}
reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
for (size_t i = 0; i < num_blobs; ++i) {
if (i != 1) {
@ -665,14 +680,17 @@ TEST_F(BlobFileReaderTest, BlobCRCError) {
SyncPoint::GetInstance()->EnableProcessing();
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
PinnableSlice value;
constexpr MemoryAllocator* allocator = nullptr;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, prefetch_buffer, &value,
kNoCompression, prefetch_buffer, allocator, &value,
&bytes_read)
.IsCorruption());
ASSERT_EQ(value, nullptr);
ASSERT_EQ(bytes_read, 0);
SyncPoint::GetInstance()->DisableProcessing();
@ -720,28 +738,31 @@ TEST_F(BlobFileReaderTest, Compression) {
read_options.verify_checksums = false;
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr MemoryAllocator* allocator = nullptr;
{
PinnableSlice value;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kSnappyCompression, prefetch_buffer, &value,
&bytes_read));
ASSERT_EQ(value, blob);
kSnappyCompression, prefetch_buffer, allocator,
&value, &bytes_read));
ASSERT_NE(value, nullptr);
ASSERT_EQ(value->data(), blob);
ASSERT_EQ(bytes_read, blob_size);
}
read_options.verify_checksums = true;
{
PinnableSlice value;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kSnappyCompression, prefetch_buffer, &value,
&bytes_read));
ASSERT_EQ(value, blob);
kSnappyCompression, prefetch_buffer, allocator,
&value, &bytes_read));
ASSERT_NE(value, nullptr);
ASSERT_EQ(value->data(), blob);
constexpr uint64_t key_size = sizeof(key) - 1;
ASSERT_EQ(bytes_read,
@ -799,14 +820,17 @@ TEST_F(BlobFileReaderTest, UncompressionError) {
SyncPoint::GetInstance()->EnableProcessing();
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
PinnableSlice value;
constexpr MemoryAllocator* allocator = nullptr;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kSnappyCompression, prefetch_buffer, &value,
&bytes_read)
kSnappyCompression, prefetch_buffer, allocator,
&value, &bytes_read)
.IsCorruption());
ASSERT_EQ(value, nullptr);
ASSERT_EQ(bytes_read, 0);
SyncPoint::GetInstance()->DisableProcessing();
@ -885,14 +909,17 @@ TEST_P(BlobFileReaderIOErrorTest, IOError) {
ASSERT_OK(s);
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
PinnableSlice value;
constexpr MemoryAllocator* allocator = nullptr;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, prefetch_buffer, &value,
&bytes_read)
kNoCompression, prefetch_buffer, allocator,
&value, &bytes_read)
.IsIOError());
ASSERT_EQ(value, nullptr);
ASSERT_EQ(bytes_read, 0);
}
@ -970,14 +997,17 @@ TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) {
ASSERT_OK(s);
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
PinnableSlice value;
constexpr MemoryAllocator* allocator = nullptr;
std::unique_ptr<BlobContents> value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, prefetch_buffer, &value,
&bytes_read)
kNoCompression, prefetch_buffer, allocator,
&value, &bytes_read)
.IsCorruption());
ASSERT_EQ(value, nullptr);
ASSERT_EQ(bytes_read, 0);
}

@ -45,61 +45,59 @@ BlobSource::BlobSource(const ImmutableOptions* immutable_options,
BlobSource::~BlobSource() = default;
Status BlobSource::GetBlobFromCache(
const Slice& cache_key, CacheHandleGuard<BlobContents>* blob) const {
assert(blob);
assert(blob->IsEmpty());
const Slice& cache_key, CacheHandleGuard<BlobContents>* cached_blob) const {
assert(blob_cache_);
assert(!cache_key.empty());
assert(cached_blob);
assert(cached_blob->IsEmpty());
Cache::Handle* cache_handle = nullptr;
cache_handle = GetEntryFromCache(cache_key);
if (cache_handle != nullptr) {
*blob = CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);
*cached_blob =
CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);
assert(cached_blob->GetValue());
PERF_COUNTER_ADD(blob_cache_hit_count, 1);
RecordTick(statistics_, BLOB_DB_CACHE_HIT);
RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ, blob->GetValue()->size());
RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ,
cached_blob->GetValue()->size());
return Status::OK();
}
assert(blob->IsEmpty());
RecordTick(statistics_, BLOB_DB_CACHE_MISS);
return Status::NotFound("Blob not found in cache");
}
Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
CacheHandleGuard<BlobContents>* cached_blob,
PinnableSlice* blob) const {
assert(blob);
assert(!cache_key.empty());
Status BlobSource::PutBlobIntoCache(
const Slice& cache_key, std::unique_ptr<BlobContents>* blob,
CacheHandleGuard<BlobContents>* cached_blob) const {
assert(blob_cache_);
Status s;
const Cache::Priority priority = Cache::Priority::BOTTOM;
// Objects to be put into the cache have to be heap-allocated and
// self-contained, i.e. own their contents. The Cache has to be able to take
// unique ownership of them.
CacheAllocationPtr allocation =
AllocateBlock(blob->size(), blob_cache_->memory_allocator());
memcpy(allocation.get(), blob->data(), blob->size());
std::unique_ptr<BlobContents> buf =
BlobContents::Create(std::move(allocation), blob->size());
assert(!cache_key.empty());
assert(blob);
assert(*blob);
assert(cached_blob);
assert(cached_blob->IsEmpty());
Cache::Handle* cache_handle = nullptr;
s = InsertEntryIntoCache(cache_key, buf.get(), buf->ApproximateMemoryUsage(),
&cache_handle, priority);
const Status s = InsertEntryIntoCache(cache_key, blob->get(),
(*blob)->ApproximateMemoryUsage(),
&cache_handle, Cache::Priority::BOTTOM);
if (s.ok()) {
buf.release();
blob->release();
assert(cache_handle != nullptr);
*cached_blob =
CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);
assert(cached_blob->GetValue());
RecordTick(statistics_, BLOB_DB_CACHE_ADD);
RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE, blob->size());
RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE,
cached_blob->GetValue()->size());
} else {
RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES);
@ -149,6 +147,24 @@ void BlobSource::PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob,
cached_blob->TransferTo(value);
}
void BlobSource::PinOwnedBlob(std::unique_ptr<BlobContents>* owned_blob,
PinnableSlice* value) {
assert(owned_blob);
assert(*owned_blob);
assert(value);
BlobContents* const blob = owned_blob->release();
assert(blob);
value->Reset();
value->PinSlice(
blob->data(),
[](void* arg1, void* /* arg2 */) {
delete static_cast<BlobContents*>(arg1);
},
blob, nullptr);
}
Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value,
size_t charge,
Cache::Handle** cache_handle,
@ -191,7 +207,7 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
if (blob_cache_) {
Slice key = cache_key.AsSlice();
s = GetBlobFromCache(key, &blob_handle);
if (s.ok() && blob_handle.GetValue()) {
if (s.ok()) {
PinCachedBlob(&blob_handle, value);
// For consistency, the size of on-disk (possibly compressed) blob record
@ -221,6 +237,8 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
// Can't find the blob from the cache. Since I/O is allowed, read from the
// file.
std::unique_ptr<BlobContents> blob_contents;
{
CacheHandleGuard<BlobFileReader> blob_file_reader;
s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
@ -234,10 +252,14 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
return Status::Corruption("Compression type mismatch when reading blob");
}
MemoryAllocator* const allocator = (blob_cache_ && read_options.fill_cache)
? blob_cache_->memory_allocator()
: nullptr;
uint64_t read_size = 0;
s = blob_file_reader.GetValue()->GetBlob(
read_options, user_key, offset, value_size, compression_type,
prefetch_buffer, value, &read_size);
prefetch_buffer, allocator, &blob_contents, &read_size);
if (!s.ok()) {
return s;
}
@ -250,12 +272,14 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
// If filling cache is allowed and a cache is configured, try to put the
// blob to the cache.
Slice key = cache_key.AsSlice();
s = PutBlobIntoCache(key, &blob_handle, value);
s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
if (!s.ok()) {
return s;
}
PinCachedBlob(&blob_handle, value);
} else {
PinOwnedBlob(&blob_contents, value);
}
assert(s.ok());
@ -321,7 +345,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
const Status s = GetBlobFromCache(key, &blob_handle);
if (s.ok() && blob_handle.GetValue()) {
if (s.ok()) {
assert(req.status);
*req.status = s;
@ -356,8 +380,10 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
if (no_io) {
for (size_t i = 0; i < num_blobs; ++i) {
if (!(cache_hit_mask & (Mask{1} << i))) {
assert(blob_reqs[i].status);
*blob_reqs[i].status =
BlobReadRequest& req = blob_reqs[i];
assert(req.status);
*req.status =
Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
}
}
@ -366,12 +392,13 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
{
// Find the rest of blobs from the file since I/O is allowed.
autovector<BlobReadRequest*> _blob_reqs;
autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
_blob_reqs;
uint64_t _bytes_read = 0;
for (size_t i = 0; i < num_blobs; ++i) {
if (!(cache_hit_mask & (Mask{1} << i))) {
_blob_reqs.push_back(&blob_reqs[i]);
_blob_reqs.emplace_back(&blob_reqs[i], std::unique_ptr<BlobContents>());
}
}
@ -380,28 +407,35 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
if (!s.ok()) {
for (size_t i = 0; i < _blob_reqs.size(); ++i) {
assert(_blob_reqs[i]->status);
*_blob_reqs[i]->status = s;
BlobReadRequest* const req = _blob_reqs[i].first;
assert(req);
assert(req->status);
*req->status = s;
}
return;
}
assert(blob_file_reader.GetValue());
blob_file_reader.GetValue()->MultiGetBlob(read_options, _blob_reqs,
&_bytes_read);
MemoryAllocator* const allocator = (blob_cache_ && read_options.fill_cache)
? blob_cache_->memory_allocator()
: nullptr;
blob_file_reader.GetValue()->MultiGetBlob(read_options, allocator,
_blob_reqs, &_bytes_read);
if (blob_cache_ && read_options.fill_cache) {
// If filling cache is allowed and a cache is configured, try to put
// the blob(s) to the cache.
for (BlobReadRequest* req : _blob_reqs) {
for (auto& [req, blob_contents] : _blob_reqs) {
assert(req);
if (req->status->ok()) {
CacheHandleGuard<BlobContents> blob_handle;
const CacheKey cache_key = base_cache_key.WithOffset(req->offset);
const Slice key = cache_key.AsSlice();
s = PutBlobIntoCache(key, &blob_handle, req->result);
s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
if (!s.ok()) {
*req->status = s;
} else {
@ -409,6 +443,14 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
}
}
}
} else {
for (auto& [req, blob_contents] : _blob_reqs) {
assert(req);
if (req->status->ok()) {
PinOwnedBlob(&blob_contents, req->result);
}
}
}
total_bytes += _bytes_read;

@ -6,6 +6,7 @@
#pragma once
#include <cinttypes>
#include <memory>
#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
@ -107,15 +108,18 @@ class BlobSource {
private:
Status GetBlobFromCache(const Slice& cache_key,
CacheHandleGuard<BlobContents>* blob) const;
CacheHandleGuard<BlobContents>* cached_blob) const;
Status PutBlobIntoCache(const Slice& cache_key,
CacheHandleGuard<BlobContents>* cached_blob,
PinnableSlice* blob) const;
std::unique_ptr<BlobContents>* blob,
CacheHandleGuard<BlobContents>* cached_blob) const;
static void PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob,
PinnableSlice* value);
static void PinOwnedBlob(std::unique_ptr<BlobContents>* owned_blob,
PinnableSlice* value);
Cache::Handle* GetEntryFromCache(const Slice& key) const;
Status InsertEntryIntoCache(const Slice& key, BlobContents* value,

@ -219,7 +219,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
kNoCompression, prefetch_buffer, &values[i],
&bytes_read));
ASSERT_EQ(values[i], blobs[i]);
ASSERT_FALSE(values[i].IsPinned());
ASSERT_TRUE(values[i].IsPinned());
ASSERT_EQ(bytes_read,
BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

Loading…
Cancel
Save