You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
rocksdb/db/blob/blob_source.cc

446 lines
14 KiB

// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_source.h"
#include <cassert>
#include <string>
#include "cache/cache_reservation_manager.h"
#include "cache/charged_cache.h"
#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "monitoring/statistics.h"
#include "options/cf_options.h"
#include "table/get_context.h"
#include "table/multiget_context.h"
namespace ROCKSDB_NAMESPACE {
BlobSource::BlobSource(const ImmutableOptions* immutable_options,
const std::string& db_id,
const std::string& db_session_id,
BlobFileCache* blob_file_cache)
: db_id_(db_id),
db_session_id_(db_session_id),
statistics_(immutable_options->statistics.get()),
blob_file_cache_(blob_file_cache),
blob_cache_(immutable_options->blob_cache),
lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {
#ifndef ROCKSDB_LITE
auto bbto =
immutable_options->table_factory->GetOptions<BlockBasedTableOptions>();
if (bbto &&
bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache)
.charged == CacheEntryRoleOptions::Decision::kEnabled) {
blob_cache_ = std::make_shared<ChargedCache>(immutable_options->blob_cache,
bbto->block_cache);
}
#endif // ROCKSDB_LITE
}
BlobSource::~BlobSource() = default;
Status BlobSource::GetBlobFromCache(
const Slice& cache_key, CacheHandleGuard<BlobContents>* blob) const {
assert(blob);
assert(blob->IsEmpty());
assert(blob_cache_);
assert(!cache_key.empty());
Cache::Handle* cache_handle = nullptr;
cache_handle = GetEntryFromCache(cache_key);
if (cache_handle != nullptr) {
*blob = CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);
PERF_COUNTER_ADD(blob_cache_hit_count, 1);
RecordTick(statistics_, BLOB_DB_CACHE_HIT);
RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ, blob->GetValue()->size());
return Status::OK();
}
assert(blob->IsEmpty());
RecordTick(statistics_, BLOB_DB_CACHE_MISS);
return Status::NotFound("Blob not found in cache");
}
Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
CacheHandleGuard<BlobContents>* cached_blob,
PinnableSlice* blob) const {
assert(blob);
assert(!cache_key.empty());
assert(blob_cache_);
Status s;
const Cache::Priority priority = Cache::Priority::BOTTOM;
// Objects to be put into the cache have to be heap-allocated and
// self-contained, i.e. own their contents. The Cache has to be able to take
// unique ownership of them.
CacheAllocationPtr allocation =
AllocateBlock(blob->size(), blob_cache_->memory_allocator());
memcpy(allocation.get(), blob->data(), blob->size());
std::unique_ptr<BlobContents> buf =
BlobContents::Create(std::move(allocation), blob->size());
Cache::Handle* cache_handle = nullptr;
s = InsertEntryIntoCache(cache_key, buf.get(), buf->ApproximateMemoryUsage(),
&cache_handle, priority);
if (s.ok()) {
buf.release();
assert(cache_handle != nullptr);
*cached_blob =
CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);
RecordTick(statistics_, BLOB_DB_CACHE_ADD);
RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE, blob->size());
} else {
RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES);
}
return s;
}
Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const {
Cache::Handle* cache_handle = nullptr;
if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) {
Cache::CreateCallback create_cb =
[allocator = blob_cache_->memory_allocator()](
const void* buf, size_t size, void** out_obj,
size_t* charge) -> Status {
return BlobContents::CreateCallback(AllocateBlock(size, allocator), buf,
size, out_obj, charge);
};
cache_handle = blob_cache_->Lookup(key, BlobContents::GetCacheItemHelper(),
create_cb, Cache::Priority::BOTTOM,
true /* wait_for_cache */, statistics_);
} else {
cache_handle = blob_cache_->Lookup(key, statistics_);
}
return cache_handle;
}
void BlobSource::PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob,
PinnableSlice* value) {
assert(cached_blob);
assert(cached_blob->GetValue());
assert(value);
// To avoid copying the cached blob into the buffer provided by the
// application, we can simply transfer ownership of the cache handle to
// the target PinnableSlice. This has the potential to save a lot of
// CPU, especially with large blob values.
value->Reset();
constexpr Cleanable* cleanable = nullptr;
value->PinSlice(cached_blob->GetValue()->data(), cleanable);
cached_blob->TransferTo(value);
}
Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value,
size_t charge,
Cache::Handle** cache_handle,
Cache::Priority priority) const {
Status s;
Cache::CacheItemHelper* const cache_item_helper =
BlobContents::GetCacheItemHelper();
assert(cache_item_helper);
if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) {
s = blob_cache_->Insert(key, value, cache_item_helper, charge, cache_handle,
priority);
} else {
s = blob_cache_->Insert(key, value, charge, cache_item_helper->del_cb,
cache_handle, priority);
}
return s;
}
Status BlobSource::GetBlob(const ReadOptions& read_options,
const Slice& user_key, uint64_t file_number,
uint64_t offset, uint64_t file_size,
uint64_t value_size,
CompressionType compression_type,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* value, uint64_t* bytes_read) {
assert(value);
Status s;
const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
CacheHandleGuard<BlobContents> blob_handle;
// First, try to get the blob from the cache
//
// If blob cache is enabled, we'll try to read from it.
if (blob_cache_) {
Slice key = cache_key.AsSlice();
s = GetBlobFromCache(key, &blob_handle);
if (s.ok() && blob_handle.GetValue()) {
PinCachedBlob(&blob_handle, value);
// For consistency, the size of on-disk (possibly compressed) blob record
// is assigned to bytes_read.
uint64_t adjustment =
read_options.verify_checksums
? BlobLogRecord::CalculateAdjustmentForRecordHeader(
user_key.size())
: 0;
assert(offset >= adjustment);
uint64_t record_size = value_size + adjustment;
if (bytes_read) {
*bytes_read = record_size;
}
return s;
}
}
assert(blob_handle.IsEmpty());
const bool no_io = read_options.read_tier == kBlockCacheTier;
if (no_io) {
s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
return s;
}
// Can't find the blob from the cache. Since I/O is allowed, read from the
// file.
{
CacheHandleGuard<BlobFileReader> blob_file_reader;
s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
if (!s.ok()) {
return s;
}
assert(blob_file_reader.GetValue());
if (compression_type != blob_file_reader.GetValue()->GetCompressionType()) {
return Status::Corruption("Compression type mismatch when reading blob");
}
uint64_t read_size = 0;
s = blob_file_reader.GetValue()->GetBlob(
read_options, user_key, offset, value_size, compression_type,
prefetch_buffer, value, &read_size);
if (!s.ok()) {
return s;
}
if (bytes_read) {
*bytes_read = read_size;
}
}
if (blob_cache_ && read_options.fill_cache) {
// If filling cache is allowed and a cache is configured, try to put the
// blob to the cache.
Slice key = cache_key.AsSlice();
s = PutBlobIntoCache(key, &blob_handle, value);
if (!s.ok()) {
return s;
}
PinCachedBlob(&blob_handle, value);
}
assert(s.ok());
return s;
}
void BlobSource::MultiGetBlob(const ReadOptions& read_options,
autovector<BlobFileReadRequests>& blob_reqs,
uint64_t* bytes_read) {
assert(blob_reqs.size() > 0);
uint64_t total_bytes_read = 0;
uint64_t bytes_read_in_file = 0;
for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) {
// sort blob_reqs_in_file by file offset.
std::sort(
blob_reqs_in_file.begin(), blob_reqs_in_file.end(),
[](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
return lhs.offset < rhs.offset;
});
MultiGetBlobFromOneFile(read_options, file_number, file_size,
blob_reqs_in_file, &bytes_read_in_file);
total_bytes_read += bytes_read_in_file;
}
if (bytes_read) {
*bytes_read = total_bytes_read;
}
}
void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
uint64_t file_number,
uint64_t /*file_size*/,
autovector<BlobReadRequest>& blob_reqs,
uint64_t* bytes_read) {
const size_t num_blobs = blob_reqs.size();
assert(num_blobs > 0);
assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
#ifndef NDEBUG
for (size_t i = 0; i < num_blobs - 1; ++i) {
assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset);
}
#endif // !NDEBUG
using Mask = uint64_t;
Mask cache_hit_mask = 0;
uint64_t total_bytes = 0;
const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number);
if (blob_cache_) {
size_t cached_blob_count = 0;
for (size_t i = 0; i < num_blobs; ++i) {
auto& req = blob_reqs[i];
CacheHandleGuard<BlobContents> blob_handle;
const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
const Slice key = cache_key.AsSlice();
const Status s = GetBlobFromCache(key, &blob_handle);
if (s.ok() && blob_handle.GetValue()) {
assert(req.status);
*req.status = s;
PinCachedBlob(&blob_handle, req.result);
// Update the counter for the number of valid blobs read from the cache.
++cached_blob_count;
// For consistency, the size of each on-disk (possibly compressed) blob
// record is accumulated to total_bytes.
uint64_t adjustment =
read_options.verify_checksums
? BlobLogRecord::CalculateAdjustmentForRecordHeader(
req.user_key->size())
: 0;
assert(req.offset >= adjustment);
total_bytes += req.len + adjustment;
cache_hit_mask |= (Mask{1} << i); // cache hit
}
}
// All blobs were read from the cache.
if (cached_blob_count == num_blobs) {
if (bytes_read) {
*bytes_read = total_bytes;
}
return;
}
}
const bool no_io = read_options.read_tier == kBlockCacheTier;
if (no_io) {
for (size_t i = 0; i < num_blobs; ++i) {
if (!(cache_hit_mask & (Mask{1} << i))) {
assert(blob_reqs[i].status);
*blob_reqs[i].status =
Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
}
}
return;
}
{
// Find the rest of blobs from the file since I/O is allowed.
autovector<BlobReadRequest*> _blob_reqs;
uint64_t _bytes_read = 0;
for (size_t i = 0; i < num_blobs; ++i) {
if (!(cache_hit_mask & (Mask{1} << i))) {
_blob_reqs.push_back(&blob_reqs[i]);
}
}
CacheHandleGuard<BlobFileReader> blob_file_reader;
Status s =
blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
if (!s.ok()) {
for (size_t i = 0; i < _blob_reqs.size(); ++i) {
assert(_blob_reqs[i]->status);
*_blob_reqs[i]->status = s;
}
return;
}
assert(blob_file_reader.GetValue());
blob_file_reader.GetValue()->MultiGetBlob(read_options, _blob_reqs,
&_bytes_read);
if (blob_cache_ && read_options.fill_cache) {
// If filling cache is allowed and a cache is configured, try to put
// the blob(s) to the cache.
for (BlobReadRequest* req : _blob_reqs) {
assert(req);
if (req->status->ok()) {
CacheHandleGuard<BlobContents> blob_handle;
const CacheKey cache_key = base_cache_key.WithOffset(req->offset);
const Slice key = cache_key.AsSlice();
s = PutBlobIntoCache(key, &blob_handle, req->result);
if (!s.ok()) {
*req->status = s;
} else {
PinCachedBlob(&blob_handle, req->result);
}
}
}
}
total_bytes += _bytes_read;
if (bytes_read) {
*bytes_read = total_bytes;
}
}
}
bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
uint64_t offset, size_t* charge) const {
const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
const Slice key = cache_key.AsSlice();
CacheHandleGuard<BlobContents> blob_handle;
const Status s = GetBlobFromCache(key, &blob_handle);
if (s.ok() && blob_handle.GetValue() != nullptr) {
if (charge) {
const Cache* const cache = blob_handle.GetCache();
assert(cache);
Cache::Handle* const handle = blob_handle.GetCacheHandle();
assert(handle);
*charge = cache->GetUsage(handle);
}
return true;
}
return false;
}
} // namespace ROCKSDB_NAMESPACE