// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/blob/blob_source.h"

#include <algorithm>  // std::sort, used by MultiGetBlob
#include <cassert>
#include <cstring>  // memcpy, used by SaveToCallback
#include <string>

#include "cache/cache_reservation_manager.h"
#include "cache/charged_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "monitoring/statistics.h"
#include "options/cf_options.h"
#include "table/get_context.h"
#include "table/multiget_context.h"

namespace ROCKSDB_NAMESPACE {
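
// Wires up the blob cache from the immutable options. If the options charge
// blob cache usage against the block cache (CacheEntryRole::kBlobCache), the
// configured blob cache is wrapped in a ChargedCache so that its memory usage
// is reserved in the block cache.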
BlobSource::BlobSource(const ImmutableOptions* immutable_options,
                       const std::string& db_id,
                       const std::string& db_session_id,
                       BlobFileCache* blob_file_cache)
    : db_id_(db_id),
      db_session_id_(db_session_id),
      statistics_(immutable_options->statistics.get()),
      blob_file_cache_(blob_file_cache),
      blob_cache_(immutable_options->blob_cache),
      lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {
#ifndef ROCKSDB_LITE
  auto bbto =
      immutable_options->table_factory->GetOptions<BlockBasedTableOptions>();
  if (bbto &&
      bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache)
              .charged == CacheEntryRoleOptions::Decision::kEnabled) {
    blob_cache_ = std::make_shared<ChargedCache>(immutable_options->blob_cache,
                                                 bbto->block_cache);
  }
#endif  // ROCKSDB_LITE
}

BlobSource::~BlobSource() = default;
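
// Looks up a previously cached blob. On a hit, ownership of the cache handle
// is transferred to *blob; on a miss, Status::NotFound is returned and *blob
// is left empty.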
Status BlobSource::GetBlobFromCache(const Slice& cache_key,
                                    CacheHandleGuard<std::string>* blob) const {
  assert(blob);
  assert(blob->IsEmpty());
  assert(blob_cache_);
  assert(!cache_key.empty());

  Cache::Handle* cache_handle = nullptr;
  cache_handle = GetEntryFromCache(cache_key);
  if (cache_handle != nullptr) {
    *blob = CacheHandleGuard<std::string>(blob_cache_.get(), cache_handle);
    return Status::OK();
  }

  assert(blob->IsEmpty());

  return Status::NotFound("Blob not found in cache");
}
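
// Copies the blob into a self-contained, heap-allocated string and inserts
// it into the blob cache. On success, *cached_blob references the cached
// copy.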
Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
                                    CacheHandleGuard<std::string>* cached_blob,
                                    PinnableSlice* blob) const {
  assert(blob);
  assert(!cache_key.empty());
  assert(blob_cache_);

  Status s;
  const Cache::Priority priority = Cache::Priority::LOW;

  // Objects to be put into the cache have to be heap-allocated and
  // self-contained, i.e. own their contents. The Cache has to be able to take
  // unique ownership of them. Therefore, we copy the blob into a string
  // directly, and insert that into the cache.
  std::unique_ptr<std::string> buf = std::make_unique<std::string>();
  buf->assign(blob->data(), blob->size());

  // TODO: support custom allocators and provide a better estimated memory
  // usage using malloc_usable_size.
  Cache::Handle* cache_handle = nullptr;
  s = InsertEntryIntoCache(cache_key, buf.get(), buf->size(), &cache_handle,
                           priority);
  if (s.ok()) {
    buf.release();
    assert(cache_handle != nullptr);
    *cached_blob =
        CacheHandleGuard<std::string>(blob_cache_.get(), cache_handle);
  }

  return s;
}
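
// Low-level cache lookup. When the non-volatile cache tier is in use, the
// lookup goes through the cache item helper with a creation callback that
// materializes the blob from a raw buffer; otherwise, a plain lookup is
// performed. Blob cache hit/miss statistics are updated in both cases.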
Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const {
  Cache::Handle* cache_handle = nullptr;

  if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) {
    Cache::CreateCallback create_cb = [&](const void* buf, size_t size,
                                          void** out_obj,
                                          size_t* charge) -> Status {
      std::string* blob = new std::string();
      blob->assign(static_cast<const char*>(buf), size);
      *out_obj = blob;
      *charge = size;
      return Status::OK();
    };
    cache_handle = blob_cache_->Lookup(key, GetCacheItemHelper(), create_cb,
                                       Cache::Priority::LOW,
                                       true /* wait_for_cache */, statistics_);
  } else {
    cache_handle = blob_cache_->Lookup(key, statistics_);
  }

  if (cache_handle != nullptr) {
    PERF_COUNTER_ADD(blob_cache_hit_count, 1);
    RecordTick(statistics_, BLOB_DB_CACHE_HIT);
    RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ,
               blob_cache_->GetUsage(cache_handle));
  } else {
    RecordTick(statistics_, BLOB_DB_CACHE_MISS);
  }

  return cache_handle;
}
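
// Low-level cache insertion, mirroring GetEntryFromCache: the entry is
// inserted via the cache item helper when the non-volatile cache tier is in
// use, and with a plain deleter otherwise. Blob cache add statistics are
// updated accordingly.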
Status BlobSource::InsertEntryIntoCache(const Slice& key, std::string* value,
                                        size_t charge,
                                        Cache::Handle** cache_handle,
                                        Cache::Priority priority) const {
  Status s;

  if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) {
    s = blob_cache_->Insert(key, value, GetCacheItemHelper(), charge,
                            cache_handle, priority);
  } else {
    s = blob_cache_->Insert(key, value, charge, &DeleteCacheEntry<std::string>,
                            cache_handle, priority);
  }

  if (s.ok()) {
    assert(*cache_handle != nullptr);
    RecordTick(statistics_, BLOB_DB_CACHE_ADD);
    RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE,
               blob_cache_->GetUsage(*cache_handle));
  } else {
    RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES);
  }

  return s;
}
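
// Reads a single blob: first from the blob cache (if configured), then, if
// I/O is permitted by read_options, from the blob file itself. When
// ReadOptions::fill_cache is set, a blob read from the file is also inserted
// into the cache.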
Status BlobSource::GetBlob(const ReadOptions& read_options,
                           const Slice& user_key, uint64_t file_number,
                           uint64_t offset, uint64_t file_size,
                           uint64_t value_size,
                           CompressionType compression_type,
                           FilePrefetchBuffer* prefetch_buffer,
                           PinnableSlice* value, uint64_t* bytes_read) {
  assert(value);

  Status s;

  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);

  CacheHandleGuard<std::string> blob_handle;

  // First, try to get the blob from the cache.
  //
  // If the blob cache is enabled, we'll try to read from it.
  if (blob_cache_) {
    Slice key = cache_key.AsSlice();
    s = GetBlobFromCache(key, &blob_handle);
    if (s.ok() && blob_handle.GetValue()) {
      {
        value->Reset();
        // To avoid copying the cached blob into the buffer provided by the
        // application, we can simply transfer ownership of the cache handle to
        // the target PinnableSlice. This has the potential to save a lot of
        // CPU, especially with large blob values.
        value->PinSlice(
            *blob_handle.GetValue(),
            [](void* arg1, void* arg2) {
              Cache* const cache = static_cast<Cache*>(arg1);
              Cache::Handle* const handle = static_cast<Cache::Handle*>(arg2);
              cache->Release(handle);
            },
            blob_handle.GetCache(), blob_handle.GetCacheHandle());
        // Make the CacheHandleGuard relinquish ownership of the handle.
        blob_handle.TransferTo(nullptr);
      }

      // For consistency, the size of the on-disk (possibly compressed) blob
      // record is assigned to bytes_read.
      uint64_t adjustment =
          read_options.verify_checksums
              ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                    user_key.size())
              : 0;
      assert(offset >= adjustment);

      uint64_t record_size = value_size + adjustment;
      if (bytes_read) {
        *bytes_read = record_size;
      }
      return s;
    }
  }

  assert(blob_handle.IsEmpty());

  const bool no_io = read_options.read_tier == kBlockCacheTier;
  if (no_io) {
    s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
    return s;
  }

  // Can't find the blob from the cache. Since I/O is allowed, read from the
  // file.
  {
    CacheHandleGuard<BlobFileReader> blob_file_reader;
    s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
    if (!s.ok()) {
      return s;
    }

    assert(blob_file_reader.GetValue());

    if (compression_type != blob_file_reader.GetValue()->GetCompressionType()) {
      return Status::Corruption("Compression type mismatch when reading blob");
    }

    uint64_t read_size = 0;
    s = blob_file_reader.GetValue()->GetBlob(
        read_options, user_key, offset, value_size, compression_type,
        prefetch_buffer, value, &read_size);
    if (!s.ok()) {
      return s;
    }
    if (bytes_read) {
      *bytes_read = read_size;
    }
  }

  if (blob_cache_ && read_options.fill_cache) {
    // If filling the cache is allowed and a cache is configured, try to put
    // the blob into the cache.
    Slice key = cache_key.AsSlice();
    s = PutBlobIntoCache(key, &blob_handle, value);
    if (!s.ok()) {
      return s;
    }
  }

  assert(s.ok());
  return s;
}
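
// Batched blob read spanning multiple blob files. The requests for each file
// are sorted by offset and then delegated to MultiGetBlobFromOneFile.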
void BlobSource::MultiGetBlob(const ReadOptions& read_options,
                              autovector<BlobFileReadRequests>& blob_reqs,
                              uint64_t* bytes_read) {
  assert(blob_reqs.size() > 0);

  uint64_t total_bytes_read = 0;
  uint64_t bytes_read_in_file = 0;

  for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) {
    // Sort blob_reqs_in_file by file offset.
    std::sort(
        blob_reqs_in_file.begin(), blob_reqs_in_file.end(),
        [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
          return lhs.offset < rhs.offset;
        });

    MultiGetBlobFromOneFile(read_options, file_number, file_size,
                            blob_reqs_in_file, &bytes_read_in_file);

    total_bytes_read += bytes_read_in_file;
  }

  if (bytes_read) {
    *bytes_read = total_bytes_read;
  }
}
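
// Batched blob read from a single blob file. Cache hits are recorded in a
// bitmask so that only the missing blobs are read from the file; each
// request's status is set individually.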
void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
                                         uint64_t file_number,
                                         uint64_t /*file_size*/,
                                         autovector<BlobReadRequest>& blob_reqs,
                                         uint64_t* bytes_read) {
  const size_t num_blobs = blob_reqs.size();
  assert(num_blobs > 0);
  assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);

#ifndef NDEBUG
  for (size_t i = 0; i < num_blobs - 1; ++i) {
    assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset);
  }
#endif  // !NDEBUG

  using Mask = uint64_t;
  Mask cache_hit_mask = 0;

  uint64_t total_bytes = 0;
  const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number);

  if (blob_cache_) {
    size_t cached_blob_count = 0;
    for (size_t i = 0; i < num_blobs; ++i) {
      auto& req = blob_reqs[i];

      CacheHandleGuard<std::string> blob_handle;
      const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
      const Slice key = cache_key.AsSlice();

      const Status s = GetBlobFromCache(key, &blob_handle);

      if (s.ok() && blob_handle.GetValue()) {
        assert(req.status);
        *req.status = s;

        {
          req.result->Reset();
          // To avoid copying the cached blob into the buffer provided by the
          // application, we can simply transfer ownership of the cache handle
          // to the target PinnableSlice. This has the potential to save a lot
          // of CPU, especially with large blob values.
          req.result->PinSlice(
              *blob_handle.GetValue(),
              [](void* arg1, void* arg2) {
                Cache* const cache = static_cast<Cache*>(arg1);
                Cache::Handle* const handle = static_cast<Cache::Handle*>(arg2);
                cache->Release(handle);
              },
              blob_handle.GetCache(), blob_handle.GetCacheHandle());
          // Make the CacheHandleGuard relinquish ownership of the handle.
          blob_handle.TransferTo(nullptr);
        }

        // Update the counter for the number of valid blobs read from the
        // cache.
        ++cached_blob_count;

        // For consistency, the size of each on-disk (possibly compressed)
        // blob record is added to total_bytes.
        uint64_t adjustment =
            read_options.verify_checksums
                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                      req.user_key->size())
                : 0;
        assert(req.offset >= adjustment);
        total_bytes += req.len + adjustment;
        cache_hit_mask |= (Mask{1} << i);  // cache hit
      }
    }

    // All blobs were read from the cache.
    if (cached_blob_count == num_blobs) {
      if (bytes_read) {
        *bytes_read = total_bytes;
      }
      return;
    }
  }

  const bool no_io = read_options.read_tier == kBlockCacheTier;
  if (no_io) {
    for (size_t i = 0; i < num_blobs; ++i) {
      if (!(cache_hit_mask & (Mask{1} << i))) {
        assert(blob_reqs[i].status);
        *blob_reqs[i].status =
            Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
      }
    }
    return;
  }

  {
    // Find the rest of the blobs from the file since I/O is allowed.
    autovector<BlobReadRequest*> _blob_reqs;
    uint64_t _bytes_read = 0;

    for (size_t i = 0; i < num_blobs; ++i) {
      if (!(cache_hit_mask & (Mask{1} << i))) {
        _blob_reqs.push_back(&blob_reqs[i]);
      }
    }

    CacheHandleGuard<BlobFileReader> blob_file_reader;
    Status s =
        blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
    if (!s.ok()) {
      for (size_t i = 0; i < _blob_reqs.size(); ++i) {
        assert(_blob_reqs[i]->status);
        *_blob_reqs[i]->status = s;
      }
      return;
    }

    assert(blob_file_reader.GetValue());

    blob_file_reader.GetValue()->MultiGetBlob(read_options, _blob_reqs,
                                              &_bytes_read);

    if (blob_cache_ && read_options.fill_cache) {
      // If filling the cache is allowed and a cache is configured, try to put
      // the blob(s) into the cache.
      for (size_t i = 0; i < _blob_reqs.size(); ++i) {
        if (_blob_reqs[i]->status->ok()) {
          CacheHandleGuard<std::string> blob_handle;
          const CacheKey cache_key =
              base_cache_key.WithOffset(_blob_reqs[i]->offset);
          const Slice key = cache_key.AsSlice();
          s = PutBlobIntoCache(key, &blob_handle, _blob_reqs[i]->result);
          if (!s.ok()) {
            *_blob_reqs[i]->status = s;
          }
        }
      }
    }

    total_bytes += _bytes_read;
    if (bytes_read) {
      *bytes_read = total_bytes;
    }
  }
}
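
// Test-only helper that reports whether the blob at the given file/offset is
// currently resident in the blob cache.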
bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
                                  uint64_t offset) const {
  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
  const Slice key = cache_key.AsSlice();

  CacheHandleGuard<std::string> blob_handle;
  const Status s = GetBlobFromCache(key, &blob_handle);

  if (s.ok() && blob_handle.GetValue() != nullptr) {
    return true;
  }

  return false;
}

// Callbacks for secondary blob cache
size_t BlobSource::SizeCallback(void* obj) {
  assert(obj != nullptr);
  return static_cast<const std::string*>(obj)->size();
}

Status BlobSource::SaveToCallback(void* from_obj, size_t from_offset,
                                  size_t length, void* out) {
  assert(from_obj != nullptr);
  const std::string* buf = static_cast<const std::string*>(from_obj);
  assert(buf->size() >= from_offset + length);
  memcpy(out, buf->data() + from_offset, length);
  return Status::OK();
}
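
// Returns the static CacheItemHelper used to insert blobs into a cache with
// a non-volatile tier.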
Cache::CacheItemHelper* BlobSource::GetCacheItemHelper() {
  static Cache::CacheItemHelper cache_helper(SizeCallback, SaveToCallback,
                                             &DeleteCacheEntry<std::string>);
  return &cache_helper;
}

}  // namespace ROCKSDB_NAMESPACE