fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
452 lines
15 KiB
452 lines
15 KiB
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/blob/blob_source.h"

#include <cassert>
#include <memory>
#include <string>

#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "monitoring/statistics.h"
#include "options/cf_options.h"
#include "table/get_context.h"
#include "table/multiget_context.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// Builds a BlobSource on top of the given blob file cache. The statistics
// object, the blob cache (which may be unset), and the lowest cache tier in
// use are all read from `immutable_options`, which is dereferenced
// unconditionally and therefore must not be null.
BlobSource::BlobSource(const ImmutableOptions* immutable_options,
                       const std::string& db_id,
                       const std::string& db_session_id,
                       BlobFileCache* blob_file_cache)
    : db_id_(db_id),
      db_session_id_(db_session_id),
      statistics_(immutable_options->statistics.get()),
      blob_file_cache_(blob_file_cache),
      blob_cache_(immutable_options->blob_cache),
      lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {}

// No custom teardown logic: the compiler-generated destructor is used.
BlobSource::~BlobSource() = default;
|
|
|
|
// Looks up `cache_key` in the blob cache.
//
// On a hit, `*blob` takes over the cache handle and OK is returned. On a
// miss, `*blob` is left empty and NotFound is returned.
//
// Preconditions (asserted): a blob cache is configured, `cache_key` is
// non-empty, and `blob` is a non-null, empty guard.
Status BlobSource::GetBlobFromCache(const Slice& cache_key,
                                    CacheHandleGuard<std::string>* blob) const {
  assert(blob);
  assert(blob->IsEmpty());
  assert(blob_cache_);
  assert(!cache_key.empty());

  Cache::Handle* const handle = GetEntryFromCache(cache_key);
  if (handle == nullptr) {
    // Miss: the guard must not have been touched.
    assert(blob->IsEmpty());
    return Status::NotFound("Blob not found in cache");
  }

  *blob = CacheHandleGuard<std::string>(blob_cache_.get(), handle);
  return Status::OK();
}
|
|
|
|
// Copies `blob` into a freshly allocated std::string and inserts it into the
// blob cache under `cache_key` with low priority.
//
// On success, `*cached_blob` guards the handle of the new cache entry and OK
// is returned. On failure, the status from the insertion is returned,
// `*cached_blob` is left untouched, and the temporary copy is freed.
//
// Preconditions (asserted): `blob` and `cached_blob` are non-null,
// `cache_key` is non-empty, and a blob cache is configured.
Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
                                    CacheHandleGuard<std::string>* cached_blob,
                                    PinnableSlice* blob) const {
  assert(blob);
  assert(cached_blob);
  assert(!cache_key.empty());
  assert(blob_cache_);

  const Cache::Priority priority = Cache::Priority::LOW;

  // Objects to be put into the cache have to be heap-allocated and
  // self-contained, i.e. own their contents. The Cache has to be able to take
  // unique ownership of them. Therefore, we copy the blob into a string
  // directly, and insert that into the cache.
  //
  // The copy is held by a unique_ptr until the insertion has succeeded: when
  // a handle out-parameter is supplied and the insertion fails, the cache
  // does not run the deleter, so the value would otherwise be leaked.
  std::unique_ptr<std::string> buf = std::make_unique<std::string>();
  buf->assign(blob->data(), blob->size());

  // TODO: support custom allocators and provide a better estimated memory
  // usage using malloc_usable_size.
  Cache::Handle* cache_handle = nullptr;
  Status s = InsertEntryIntoCache(cache_key, buf.get(), buf->size(),
                                  &cache_handle, priority);
  if (s.ok()) {
    // Ownership of the string now rests with the cache entry.
    buf.release();

    assert(cache_handle != nullptr);
    *cached_blob =
        CacheHandleGuard<std::string>(blob_cache_.get(), cache_handle);
  }

  return s;
}
|
|
|
|
// Performs the raw blob cache lookup for `key` and records the outcome in the
// cache hit/miss statistics. Returns the cache handle on a hit and nullptr on
// a miss.
Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const {
  Cache::Handle* handle = nullptr;

  if (lowest_used_cache_tier_ != CacheTier::kNonVolatileBlockTier) {
    handle = blob_cache_->Lookup(key, statistics_);
  } else {
    // The non-volatile tier is in use: hand the cache a callback that turns a
    // raw byte buffer into a heap-allocated std::string, charged at its size.
    const Cache::CreateCallback create_cb =
        [](const void* buf, size_t size, void** out_obj,
           size_t* charge) -> Status {
      *out_obj = new std::string(static_cast<const char*>(buf), size);
      *charge = size;
      return Status::OK();
    };

    handle = blob_cache_->Lookup(key, GetCacheItemHelper(), create_cb,
                                 Cache::Priority::LOW,
                                 true /* wait_for_cache */, statistics_);
  }

  if (handle == nullptr) {
    RecordTick(statistics_, BLOB_DB_CACHE_MISS);
    return nullptr;
  }

  PERF_COUNTER_ADD(blob_cache_hit_count, 1);
  RecordTick(statistics_, BLOB_DB_CACHE_HIT);
  RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ,
             blob_cache_->GetUsage(handle));
  return handle;
}
|
|
|
|
// Inserts `value` into the blob cache under `key` with the given `charge` and
// `priority`, and records the outcome in the cache add/add-failure
// statistics. On success, `*cache_handle` refers to the new entry. Returns
// the status of the insertion.
Status BlobSource::InsertEntryIntoCache(const Slice& key, std::string* value,
                                        size_t charge,
                                        Cache::Handle** cache_handle,
                                        Cache::Priority priority) const {
  // With the non-volatile tier in use, the entry is inserted together with
  // the item helper; otherwise only a plain deleter is registered.
  const bool use_item_helper =
      lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier;

  Status s = use_item_helper
                 ? blob_cache_->Insert(key, value, GetCacheItemHelper(),
                                       charge, cache_handle, priority)
                 : blob_cache_->Insert(key, value, charge,
                                       &DeleteCacheEntry<std::string>,
                                       cache_handle, priority);

  if (!s.ok()) {
    RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES);
    return s;
  }

  assert(*cache_handle != nullptr);
  RecordTick(statistics_, BLOB_DB_CACHE_ADD);
  RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE,
             blob_cache_->GetUsage(*cache_handle));
  return s;
}
|
|
|
|
// Retrieves a single blob, preferring the blob cache over the blob file.
//
// 1. If a blob cache is configured and holds the blob, `value` is pinned to
//    the cached entry (no copy) and OK is returned. `*bytes_read` is set to
//    `value_size`, plus the record header adjustment when checksum
//    verification is requested.
// 2. Otherwise, if the read tier forbids I/O, Incomplete is returned.
// 3. Otherwise the blob is read from the file identified by `file_number`.
//    A mismatch between `compression_type` and the file's compression type
//    yields Corruption. `*bytes_read` is set to the size reported by the
//    file reader.
// 4. If a cache is configured and `read_options.fill_cache` is set, the blob
//    is then inserted into the cache; an insertion failure is returned to the
//    caller.
//
// `value` must be non-null; `bytes_read` may be null.
Status BlobSource::GetBlob(const ReadOptions& read_options,
                           const Slice& user_key, uint64_t file_number,
                           uint64_t offset, uint64_t file_size,
                           uint64_t value_size,
                           CompressionType compression_type,
                           FilePrefetchBuffer* prefetch_buffer,
                           PinnableSlice* value, uint64_t* bytes_read) {
  assert(value);

  Status s;
  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
  CacheHandleGuard<std::string> blob_handle;

  // Cache lookup, only attempted when a blob cache is configured.
  if (blob_cache_) {
    s = GetBlobFromCache(cache_key.AsSlice(), &blob_handle);

    if (s.ok() && blob_handle.GetValue()) {
      // Rather than copying the cached blob into the caller's buffer, hand
      // the cache handle over to the PinnableSlice; it releases the handle
      // when it is reset or destroyed. This can save a lot of CPU for large
      // blob values.
      const auto release_handle = [](void* arg1, void* arg2) {
        Cache* const cache = static_cast<Cache*>(arg1);
        Cache::Handle* const handle = static_cast<Cache::Handle*>(arg2);
        cache->Release(handle);
      };

      value->Reset();
      value->PinSlice(*blob_handle.GetValue(), release_handle,
                      blob_handle.GetCache(), blob_handle.GetCacheHandle());

      // The guard must no longer release the handle itself.
      blob_handle.TransferTo(nullptr);

      // For consistency with the file path, report the size of the on-disk
      // (possibly compressed) blob record.
      const uint64_t adjustment =
          read_options.verify_checksums
              ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                    user_key.size())
              : 0;
      assert(offset >= adjustment);

      if (bytes_read) {
        *bytes_read = value_size + adjustment;
      }
      return s;
    }
  }

  assert(blob_handle.IsEmpty());

  // Cache miss (or no cache): bail out if the caller disallowed disk reads.
  if (read_options.read_tier == kBlockCacheTier) {
    s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
    return s;
  }

  // I/O is allowed, so fetch the blob from its file.
  {
    CacheHandleGuard<BlobFileReader> reader_guard;
    s = blob_file_cache_->GetBlobFileReader(file_number, &reader_guard);
    if (!s.ok()) {
      return s;
    }

    BlobFileReader* const reader = reader_guard.GetValue();
    assert(reader);

    if (reader->GetCompressionType() != compression_type) {
      return Status::Corruption("Compression type mismatch when reading blob");
    }

    uint64_t read_size = 0;
    s = reader->GetBlob(read_options, user_key, offset, value_size,
                        compression_type, prefetch_buffer, value, &read_size);
    if (!s.ok()) {
      return s;
    }

    if (bytes_read) {
      *bytes_read = read_size;
    }
  }

  // Populate the cache when one is configured and filling it is permitted.
  if (blob_cache_ && read_options.fill_cache) {
    s = PutBlobIntoCache(cache_key.AsSlice(), &blob_handle, value);
    if (!s.ok()) {
      return s;
    }
  }

  assert(s.ok());
  return s;
}
|
|
|
|
void BlobSource::MultiGetBlob(const ReadOptions& read_options,
|
|
autovector<BlobFileReadRequests>& blob_reqs,
|
|
uint64_t* bytes_read) {
|
|
assert(blob_reqs.size() > 0);
|
|
|
|
uint64_t total_bytes_read = 0;
|
|
uint64_t bytes_read_in_file = 0;
|
|
|
|
for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) {
|
|
// sort blob_reqs_in_file by file offset.
|
|
std::sort(
|
|
blob_reqs_in_file.begin(), blob_reqs_in_file.end(),
|
|
[](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
|
|
return lhs.offset < rhs.offset;
|
|
});
|
|
|
|
MultiGetBlobFromOneFile(read_options, file_number, file_size,
|
|
blob_reqs_in_file, &bytes_read_in_file);
|
|
|
|
total_bytes_read += bytes_read_in_file;
|
|
}
|
|
|
|
if (bytes_read) {
|
|
*bytes_read = total_bytes_read;
|
|
}
|
|
}
|
|
|
|
void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
|
|
uint64_t file_number,
|
|
uint64_t file_size,
|
|
autovector<BlobReadRequest>& blob_reqs,
|
|
uint64_t* bytes_read) {
|
|
const size_t num_blobs = blob_reqs.size();
|
|
assert(num_blobs > 0);
|
|
assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
|
|
|
|
#ifndef NDEBUG
|
|
for (size_t i = 0; i < num_blobs - 1; ++i) {
|
|
assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset);
|
|
}
|
|
#endif // !NDEBUG
|
|
|
|
using Mask = uint64_t;
|
|
Mask cache_hit_mask = 0;
|
|
|
|
uint64_t total_bytes = 0;
|
|
const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number,
|
|
file_size);
|
|
|
|
if (blob_cache_) {
|
|
size_t cached_blob_count = 0;
|
|
for (size_t i = 0; i < num_blobs; ++i) {
|
|
auto& req = blob_reqs[i];
|
|
|
|
CacheHandleGuard<std::string> blob_handle;
|
|
const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
|
|
const Slice key = cache_key.AsSlice();
|
|
|
|
const Status s = GetBlobFromCache(key, &blob_handle);
|
|
|
|
if (s.ok() && blob_handle.GetValue()) {
|
|
assert(req.status);
|
|
*req.status = s;
|
|
|
|
{
|
|
req.result->Reset();
|
|
// To avoid copying the cached blob into the buffer provided by the
|
|
// application, we can simply transfer ownership of the cache handle
|
|
// to the target PinnableSlice. This has the potential to save a lot
|
|
// of CPU, especially with large blob values.
|
|
req.result->PinSlice(
|
|
*blob_handle.GetValue(),
|
|
[](void* arg1, void* arg2) {
|
|
Cache* const cache = static_cast<Cache*>(arg1);
|
|
Cache::Handle* const handle = static_cast<Cache::Handle*>(arg2);
|
|
cache->Release(handle);
|
|
},
|
|
blob_handle.GetCache(), blob_handle.GetCacheHandle());
|
|
// Make the CacheHandleGuard relinquish ownership of the handle.
|
|
blob_handle.TransferTo(nullptr);
|
|
}
|
|
|
|
// Update the counter for the number of valid blobs read from the cache.
|
|
++cached_blob_count;
|
|
|
|
// For consistency, the size of each on-disk (possibly compressed) blob
|
|
// record is accumulated to total_bytes.
|
|
uint64_t adjustment =
|
|
read_options.verify_checksums
|
|
? BlobLogRecord::CalculateAdjustmentForRecordHeader(
|
|
req.user_key->size())
|
|
: 0;
|
|
assert(req.offset >= adjustment);
|
|
total_bytes += req.len + adjustment;
|
|
cache_hit_mask |= (Mask{1} << i); // cache hit
|
|
}
|
|
}
|
|
|
|
// All blobs were read from the cache.
|
|
if (cached_blob_count == num_blobs) {
|
|
if (bytes_read) {
|
|
*bytes_read = total_bytes;
|
|
}
|
|
return;
|
|
}
|
|
}
|
|
|
|
const bool no_io = read_options.read_tier == kBlockCacheTier;
|
|
if (no_io) {
|
|
for (size_t i = 0; i < num_blobs; ++i) {
|
|
if (!(cache_hit_mask & (Mask{1} << i))) {
|
|
assert(blob_reqs[i].status);
|
|
*blob_reqs[i].status =
|
|
Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
|
|
{
|
|
// Find the rest of blobs from the file since I/O is allowed.
|
|
autovector<BlobReadRequest*> _blob_reqs;
|
|
uint64_t _bytes_read = 0;
|
|
|
|
for (size_t i = 0; i < num_blobs; ++i) {
|
|
if (!(cache_hit_mask & (Mask{1} << i))) {
|
|
_blob_reqs.push_back(&blob_reqs[i]);
|
|
}
|
|
}
|
|
|
|
CacheHandleGuard<BlobFileReader> blob_file_reader;
|
|
Status s =
|
|
blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
|
|
if (!s.ok()) {
|
|
for (size_t i = 0; i < _blob_reqs.size(); ++i) {
|
|
assert(_blob_reqs[i]->status);
|
|
*_blob_reqs[i]->status = s;
|
|
}
|
|
return;
|
|
}
|
|
|
|
assert(blob_file_reader.GetValue());
|
|
|
|
blob_file_reader.GetValue()->MultiGetBlob(read_options, _blob_reqs,
|
|
&_bytes_read);
|
|
|
|
if (blob_cache_ && read_options.fill_cache) {
|
|
// If filling cache is allowed and a cache is configured, try to put
|
|
// the blob(s) to the cache.
|
|
for (size_t i = 0; i < _blob_reqs.size(); ++i) {
|
|
if (_blob_reqs[i]->status->ok()) {
|
|
CacheHandleGuard<std::string> blob_handle;
|
|
const CacheKey cache_key =
|
|
base_cache_key.WithOffset(_blob_reqs[i]->offset);
|
|
const Slice key = cache_key.AsSlice();
|
|
s = PutBlobIntoCache(key, &blob_handle, _blob_reqs[i]->result);
|
|
if (!s.ok()) {
|
|
*_blob_reqs[i]->status = s;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
total_bytes += _bytes_read;
|
|
if (bytes_read) {
|
|
*bytes_read = total_bytes;
|
|
}
|
|
}
|
|
}
|
|
|
|
bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
|
|
uint64_t offset) const {
|
|
const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
|
|
const Slice key = cache_key.AsSlice();
|
|
|
|
CacheHandleGuard<std::string> blob_handle;
|
|
const Status s = GetBlobFromCache(key, &blob_handle);
|
|
|
|
if (s.ok() && blob_handle.GetValue() != nullptr) {
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
// Callbacks for secondary blob cache
|
|
// Returns the size in bytes of the blob behind `obj`, which is interpreted as
// a pointer to a std::string and must not be null.
size_t BlobSource::SizeCallback(void* obj) {
  assert(obj != nullptr);

  const auto* const blob = static_cast<const std::string*>(obj);
  return blob->size();
}
|
|
|
|
// Copies `length` bytes, starting at `from_offset`, out of the blob behind
// `from_obj` (interpreted as a pointer to a std::string) into `out`. The
// requested range must lie within the blob (asserted). Always returns OK.
Status BlobSource::SaveToCallback(void* from_obj, size_t from_offset,
                                  size_t length, void* out) {
  assert(from_obj != nullptr);

  const auto& blob = *static_cast<const std::string*>(from_obj);
  assert(blob.size() >= from_offset + length);

  const char* const src = blob.data() + from_offset;
  memcpy(out, src, length);

  return Status::OK();
}
|
|
|
|
// Returns the address of a function-local static helper that bundles the
// size, save-to, and delete callbacks for std::string cache entries. The same
// object is returned on every call.
Cache::CacheItemHelper* BlobSource::GetCacheItemHelper() {
  static Cache::CacheItemHelper helper(SizeCallback, SaveToCallback,
                                       &DeleteCacheEntry<std::string>);

  return &helper;
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|