Read blob from blob cache if exists when GetBlob() (#10178)

Summary:
There is currently no caching mechanism for blobs, which is not ideal especially when the database resides on remote storage (where we cannot rely on the OS page cache). As part of this task, we would like to make it possible for the application to configure a blob cache.
In this task, we added a new abstraction layer `BlobSource` to retrieve blobs from either blob cache or raw blob file. Note: For simplicity, the current PR only includes `GetBlob()`.  `MultiGetBlob()` will be included in the next PR.

This PR is a part of https://github.com/facebook/rocksdb/issues/10156

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10178

Reviewed By: ltamasi

Differential Revision: D37250507

Pulled By: gangliao

fbshipit-source-id: 3fc4a55a0cea955a3147bdc7dba06430e377259b
main
Gang Liao 2 years ago committed by Facebook GitHub Bot
parent 1aac814578
commit c965c9ef65
  1. 2
      CMakeLists.txt
  2. 3
      Makefile
  3. 8
      TARGETS
  4. 169
      db/blob/blob_source.cc
  5. 88
      db/blob/blob_source.h
  6. 298
      db/blob/blob_source_test.cc
  7. 6
      include/rocksdb/cache.h
  8. 2
      src.mk

@ -615,6 +615,7 @@ set(SOURCES
db/blob/blob_log_format.cc
db/blob/blob_log_sequential_reader.cc
db/blob/blob_log_writer.cc
db/blob/blob_source.cc
db/blob/prefetch_buffer_collection.cc
db/builder.cc
db/c.cc
@ -1217,6 +1218,7 @@ if(WITH_TESTS)
db/blob/blob_file_garbage_test.cc
db/blob/blob_file_reader_test.cc
db/blob/blob_garbage_meter_test.cc
db/blob/blob_source_test.cc
db/blob/db_blob_basic_test.cc
db/blob/db_blob_compaction_test.cc
db/blob/db_blob_corruption_test.cc

@ -1863,6 +1863,9 @@ blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRA
blob_file_reader_test: $(OBJ_DIR)/db/blob/blob_file_reader_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
blob_source_test: $(OBJ_DIR)/db/blob/blob_source_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
blob_garbage_meter_test: $(OBJ_DIR)/db/blob/blob_garbage_meter_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

@ -30,6 +30,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/blob/blob_log_format.cc",
"db/blob/blob_log_sequential_reader.cc",
"db/blob/blob_log_writer.cc",
"db/blob/blob_source.cc",
"db/blob/prefetch_buffer_collection.cc",
"db/builder.cc",
"db/c.cc",
@ -359,6 +360,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"db/blob/blob_log_format.cc",
"db/blob/blob_log_sequential_reader.cc",
"db/blob/blob_log_writer.cc",
"db/blob/blob_source.cc",
"db/blob/prefetch_buffer_collection.cc",
"db/builder.cc",
"db/c.cc",
@ -4800,6 +4802,12 @@ cpp_unittest_wrapper(name="blob_garbage_meter_test",
extra_compiler_flags=[])
cpp_unittest_wrapper(name="blob_source_test",
srcs=["db/blob/blob_source_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="block_based_table_reader_test",
srcs=["table/block_based/block_based_table_reader_test.cc"],
deps=[":rocksdb_test_lib"],

@ -0,0 +1,169 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_source.h"
#include <cassert>
#include <string>
#include "db/blob/blob_file_reader.h"
#include "options/cf_options.h"
namespace ROCKSDB_NAMESPACE {
BlobSource::BlobSource(const ImmutableOptions* immutable_options,
const std::string& db_id,
const std::string& db_session_id,
BlobFileCache* blob_file_cache)
: db_id_(db_id),
db_session_id_(db_session_id),
statistics_(immutable_options->statistics.get()),
blob_file_cache_(blob_file_cache),
blob_cache_(immutable_options->blob_cache) {}
BlobSource::~BlobSource() = default;
Status BlobSource::GetBlobFromCache(const Slice& cache_key,
CachableEntry<std::string>* blob) const {
assert(blob);
assert(blob->IsEmpty());
assert(blob_cache_);
assert(!cache_key.empty());
Cache::Handle* cache_handle = nullptr;
cache_handle = GetEntryFromCache(cache_key);
if (cache_handle != nullptr) {
blob->SetCachedValue(
static_cast<std::string*>(blob_cache_->Value(cache_handle)),
blob_cache_.get(), cache_handle);
return Status::OK();
}
assert(blob->IsEmpty());
return Status::NotFound("Blob not found in cache");
}
Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
CachableEntry<std::string>* cached_blob,
PinnableSlice* blob) const {
assert(blob);
assert(!cache_key.empty());
assert(blob_cache_);
Status s;
const Cache::Priority priority = Cache::Priority::LOW;
// Objects to be put into the cache have to be heap-allocated and
// self-contained, i.e. own their contents. The Cache has to be able to take
// unique ownership of them. Therefore, we copy the blob into a string
// directly, and insert that into the cache.
std::string* buf = new std::string();
buf->assign(blob->data(), blob->size());
// TODO: support custom allocators and provide a better estimated memory
// usage using malloc_usable_size.
Cache::Handle* cache_handle = nullptr;
s = InsertEntryIntoCache(cache_key, buf, buf->size(), &cache_handle,
priority);
if (s.ok()) {
assert(cache_handle != nullptr);
cached_blob->SetCachedValue(buf, blob_cache_.get(), cache_handle);
}
return s;
}
Status BlobSource::GetBlob(const ReadOptions& read_options,
const Slice& user_key, uint64_t file_number,
uint64_t offset, uint64_t file_size,
uint64_t value_size,
CompressionType compression_type,
FilePrefetchBuffer* prefetch_buffer,
PinnableSlice* value, uint64_t* bytes_read) {
assert(value);
Status s;
const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
CachableEntry<std::string> blob_entry;
// First, try to get the blob from the cache
//
// If blob cache is enabled, we'll try to read from it.
if (blob_cache_) {
Slice key = cache_key.AsSlice();
s = GetBlobFromCache(key, &blob_entry);
if (s.ok() && blob_entry.GetValue()) {
assert(blob_entry.GetValue()->size() == value_size);
if (bytes_read) {
*bytes_read = value_size;
}
value->PinSelf(*blob_entry.GetValue());
return s;
}
}
assert(blob_entry.IsEmpty());
const bool no_io = read_options.read_tier == kBlockCacheTier;
if (no_io) {
return Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
}
// Can't find the blob from the cache. Since I/O is allowed, read from the
// file.
{
CacheHandleGuard<BlobFileReader> blob_file_reader;
s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
if (!s.ok()) {
return s;
}
assert(blob_file_reader.GetValue());
if (compression_type != blob_file_reader.GetValue()->GetCompressionType()) {
return Status::Corruption("Compression type mismatch when reading blob");
}
s = blob_file_reader.GetValue()->GetBlob(
read_options, user_key, offset, value_size, compression_type,
prefetch_buffer, value, bytes_read);
if (!s.ok()) {
return s;
}
}
if (blob_cache_ && read_options.fill_cache) {
// If filling cache is allowed and a cache is configured, try to put the
// blob to the cache.
Slice key = cache_key.AsSlice();
s = PutBlobIntoCache(key, &blob_entry, value);
if (!s.ok()) {
return s;
}
}
assert(s.ok());
return s;
}
bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
uint64_t offset) const {
const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
const Slice key = cache_key.AsSlice();
CachableEntry<std::string> blob_entry;
const Status s = GetBlobFromCache(key, &blob_entry);
if (s.ok() && blob_entry.GetValue() != nullptr) {
return true;
}
return false;
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,88 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cinttypes>
#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
#include "db/blob/blob_file_cache.h"
#include "include/rocksdb/cache.h"
#include "rocksdb/rocksdb_namespace.h"
#include "table/block_based/cachable_entry.h"
namespace ROCKSDB_NAMESPACE {
struct ImmutableOptions;
class Status;
class FilePrefetchBuffer;
class Slice;
// BlobSource is a class that provides universal access to blobs, regardless of
// whether they are in the blob cache, secondary cache, or (remote) storage.
// Depending on user settings, it always fetch blobs from multi-tier cache and
// storage with minimal cost.
class BlobSource {
public:
BlobSource(const ImmutableOptions* immutable_options,
const std::string& db_id, const std::string& db_session_id,
BlobFileCache* blob_file_cache);
BlobSource(const BlobSource&) = delete;
BlobSource& operator=(const BlobSource&) = delete;
~BlobSource();
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
uint64_t file_number, uint64_t offset, uint64_t file_size,
uint64_t value_size, CompressionType compression_type,
FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
uint64_t* bytes_read);
bool TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
uint64_t offset) const;
private:
Status GetBlobFromCache(const Slice& cache_key,
CachableEntry<std::string>* blob) const;
Status PutBlobIntoCache(const Slice& cache_key,
CachableEntry<std::string>* cached_blob,
PinnableSlice* blob) const;
inline CacheKey GetCacheKey(uint64_t file_number, uint64_t file_size,
uint64_t offset) const {
OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number,
file_size);
return base_cache_key.WithOffset(offset);
}
inline Cache::Handle* GetEntryFromCache(const Slice& key) const {
return blob_cache_->Lookup(key, statistics_);
}
inline Status InsertEntryIntoCache(const Slice& key, std::string* value,
size_t charge,
Cache::Handle** cache_handle,
Cache::Priority priority) const {
return blob_cache_->Insert(key, value, charge,
&DeleteCacheEntry<std::string>, cache_handle,
priority);
}
const std::string db_id_;
const std::string db_session_id_;
Statistics* statistics_;
// A cache to store blob file reader.
BlobFileCache* blob_file_cache_;
// A cache to store uncompressed blobs.
std::shared_ptr<Cache> blob_cache_;
};
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,298 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_source.h"
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
#include "db/db_test_util.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "options/cf_options.h"
#include "rocksdb/options.h"
#include "util/compression.h"
namespace ROCKSDB_NAMESPACE {
namespace {
// Creates a test blob file with `num` blobs in it.
void WriteBlobFile(const ImmutableOptions& immutable_options,
uint32_t column_family_id, bool has_ttl,
const ExpirationRange& expiration_range_header,
const ExpirationRange& expiration_range_footer,
uint64_t blob_file_number, const std::vector<Slice>& keys,
const std::vector<Slice>& blobs, CompressionType compression,
std::vector<uint64_t>& blob_offsets,
std::vector<uint64_t>& blob_sizes) {
assert(!immutable_options.cf_paths.empty());
size_t num = keys.size();
assert(num == blobs.size());
assert(num == blob_offsets.size());
assert(num == blob_sizes.size());
const std::string blob_file_path =
BlobFileName(immutable_options.cf_paths.front().path, blob_file_number);
std::unique_ptr<FSWritableFile> file;
ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file,
FileOptions()));
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), blob_file_path, FileOptions(), immutable_options.clock));
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
constexpr bool do_flush = false;
BlobLogWriter blob_log_writer(std::move(file_writer), immutable_options.clock,
statistics, blob_file_number, use_fsync,
do_flush);
BlobLogHeader header(column_family_id, compression, has_ttl,
expiration_range_header);
ASSERT_OK(blob_log_writer.WriteHeader(header));
std::vector<std::string> compressed_blobs(num);
std::vector<Slice> blobs_to_write(num);
if (kNoCompression == compression) {
for (size_t i = 0; i < num; ++i) {
blobs_to_write[i] = blobs[i];
blob_sizes[i] = blobs[i].size();
}
} else {
CompressionOptions opts;
CompressionContext context(compression);
constexpr uint64_t sample_for_compression = 0;
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
compression, sample_for_compression);
constexpr uint32_t compression_format_version = 2;
for (size_t i = 0; i < num; ++i) {
ASSERT_TRUE(CompressData(blobs[i], info, compression_format_version,
&compressed_blobs[i]));
blobs_to_write[i] = compressed_blobs[i];
blob_sizes[i] = compressed_blobs[i].size();
}
}
for (size_t i = 0; i < num; ++i) {
uint64_t key_offset = 0;
ASSERT_OK(blob_log_writer.AddRecord(keys[i], blobs_to_write[i], &key_offset,
&blob_offsets[i]));
}
BlobLogFooter footer;
footer.blob_count = num;
footer.expiration_range = expiration_range_footer;
std::string checksum_method;
std::string checksum_value;
ASSERT_OK(
blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value));
}
} // anonymous namespace
class BlobSourceTest : public DBTestBase {
protected:
public:
explicit BlobSourceTest()
: DBTestBase("blob_source_test", /*env_do_fsync=*/true) {}
};
TEST_F(BlobSourceTest, GetBlobsFromCache) {
Options options;
options.env = env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(env_, "BlobSourceTest_GetBlobsFromCache"), 0);
options.enable_blob_files = true;
LRUCacheOptions co;
co.capacity = 2048;
co.num_shard_bits = 2;
co.metadata_charge_policy = kDontChargeCacheMetadata;
options.blob_cache = NewLRUCache(co);
options.lowest_used_cache_tier = CacheTier::kVolatileTier;
Reopen(options);
std::string db_id;
ASSERT_OK(db_->GetDbIdentity(db_id));
std::string db_session_id;
ASSERT_OK(db_->GetDbSessionId(db_session_id));
ImmutableOptions immutable_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
constexpr uint64_t blob_file_number = 1;
constexpr size_t num_blobs = 16;
std::vector<std::string> key_strs;
std::vector<std::string> blob_strs;
for (size_t i = 0; i < num_blobs; ++i) {
key_strs.push_back("key" + std::to_string(i));
blob_strs.push_back("blob" + std::to_string(i));
}
std::vector<Slice> keys;
std::vector<Slice> blobs;
uint64_t file_size = BlobLogHeader::kSize;
for (size_t i = 0; i < num_blobs; ++i) {
keys.push_back({key_strs[i]});
blobs.push_back({blob_strs[i]});
file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size();
}
file_size += BlobLogFooter::kSize;
std::vector<uint64_t> blob_offsets(keys.size());
std::vector<uint64_t> blob_sizes(keys.size());
WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
expiration_range, blob_file_number, keys, blobs, kNoCompression,
blob_offsets, blob_sizes);
constexpr size_t capacity = 1024;
std::shared_ptr<Cache> backing_cache =
NewLRUCache(capacity); // Blob file cache
FileOptions file_options;
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
backing_cache.get(), &immutable_options, &file_options, column_family_id,
blob_file_read_hist, nullptr /*IOTracer*/));
BlobSource blob_source(&immutable_options, db_id, db_session_id,
blob_file_cache.get());
ReadOptions read_options;
read_options.verify_checksums = true;
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
{
// GetBlob
std::vector<PinnableSlice> values(keys.size());
uint64_t bytes_read = 0;
read_options.fill_cache = false;
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
blob_offsets[i], file_size, blob_sizes[i],
kNoCompression, prefetch_buffer, &values[i],
&bytes_read));
ASSERT_EQ(values[i], blobs[i]);
ASSERT_EQ(bytes_read,
blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
}
read_options.fill_cache = true;
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
blob_offsets[i], file_size, blob_sizes[i],
kNoCompression, prefetch_buffer, &values[i],
&bytes_read));
ASSERT_EQ(values[i], blobs[i]);
ASSERT_EQ(bytes_read,
blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
}
read_options.fill_cache = true;
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
blob_offsets[i], file_size, blob_sizes[i],
kNoCompression, prefetch_buffer, &values[i],
&bytes_read));
ASSERT_EQ(values[i], blobs[i]);
ASSERT_EQ(bytes_read, blob_sizes[i]);
ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
}
// Cache-only GetBlob
read_options.read_tier = ReadTier::kBlockCacheTier;
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
blob_offsets[i], file_size, blob_sizes[i],
kNoCompression, prefetch_buffer, &values[i],
&bytes_read));
ASSERT_EQ(values[i], blobs[i]);
ASSERT_EQ(bytes_read, blob_sizes[i]);
ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
}
}
options.blob_cache->EraseUnRefEntries();
{
// Cache-only GetBlob
std::vector<PinnableSlice> values(keys.size());
uint64_t bytes_read = 0;
read_options.read_tier = ReadTier::kBlockCacheTier;
read_options.fill_cache = true;
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
ASSERT_NOK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
blob_offsets[i], file_size, blob_sizes[i],
kNoCompression, prefetch_buffer,
&values[i], &bytes_read));
ASSERT_TRUE(values[i].empty());
ASSERT_EQ(bytes_read, 0);
ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
}
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -290,7 +290,7 @@ class Cache {
virtual const char* Name() const = 0;
// Insert a mapping from key->value into the volatile cache only
// and assign it // the specified charge against the total cache capacity.
// and assign it with the specified charge against the total cache capacity.
// If strict_capacity_limit is true and cache reaches its full capacity,
// return Status::Incomplete.
//
@ -394,8 +394,8 @@ class Cache {
// memory - call this only if you're shutting down the process.
// Any attempts of using cache after this call will fail terribly.
// Always delete the DB object before calling this method!
virtual void DisownData(){
// default implementation is noop
virtual void DisownData() {
// default implementation is noop
}
struct ApplyToAllEntriesOptions {

@ -21,6 +21,7 @@ LIB_SOURCES = \
db/blob/blob_log_format.cc \
db/blob/blob_log_sequential_reader.cc \
db/blob/blob_log_writer.cc \
db/blob/blob_source.cc \
db/blob/prefetch_buffer_collection.cc \
db/builder.cc \
db/c.cc \
@ -419,6 +420,7 @@ TEST_MAIN_SOURCES = \
db/blob/blob_file_garbage_test.cc \
db/blob/blob_file_reader_test.cc \
db/blob/blob_garbage_meter_test.cc \
db/blob/blob_source_test.cc \
db/blob/db_blob_basic_test.cc \
db/blob/db_blob_compaction_test.cc \
db/blob/db_blob_corruption_test.cc \

Loading…
Cancel
Save