Introduce a dedicated class to represent blob values (#10571)

Summary:
The patch introduces a new class called `BlobContents`, which represents
a single uncompressed blob value. We currently use `std::string` for this
purpose; `BlobContents` is somewhat smaller, but the primary reason for a
dedicated class is that it enables certain improvements and optimizations
like eliding a copy when inserting a blob into the cache, using custom
allocators, or more control over and better accounting of the memory usage
of cached blobs (see https://github.com/facebook/rocksdb/issues/10484).
(We plan to implement these in subsequent PRs.)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10571

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D39000965

Pulled By: ltamasi

fbshipit-source-id: f296eddf9dec4fc3e11cad525b462bdf63c78f96
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent 418b36a9bc
commit 3f57d84af4
  1. 1
      CMakeLists.txt
  2. 2
      TARGETS
  3. 63
      db/blob/blob_contents.cc
  4. 56
      db/blob/blob_contents.h
  5. 12
      db/blob/blob_file_builder.cc
  6. 76
      db/blob/blob_source.cc
  7. 15
      db/blob/blob_source.h
  8. 34
      db/blob/blob_source_test.cc
  9. 1
      src.mk

@ -605,6 +605,7 @@ set(SOURCES
cache/lru_cache.cc
cache/sharded_cache.cc
db/arena_wrapped_db_iter.cc
db/blob/blob_contents.cc
db/blob/blob_fetcher.cc
db/blob/blob_file_addition.cc
db/blob/blob_file_builder.cc

@ -20,6 +20,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_contents.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
@ -357,6 +358,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_contents.cc",
"db/blob/blob_fetcher.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",

@ -0,0 +1,63 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_contents.h"
#include <cassert>
#include "cache/cache_helpers.h"
namespace ROCKSDB_NAMESPACE {
// Builds a heap-allocated BlobContents that takes over `allocation`; `size`
// is the number of bytes of that buffer exposed through data().
std::unique_ptr<BlobContents> BlobContents::Create(
    CacheAllocationPtr&& allocation, size_t size) {
  // The constructor is private, so the object is created with a plain `new`
  // and wrapped immediately rather than going through std::make_unique.
  std::unique_ptr<BlobContents> contents(
      new BlobContents(std::move(allocation), size));

  return contents;
}
size_t BlobContents::SizeCallback(void* obj) {
assert(obj);
return static_cast<const BlobContents*>(obj)->size();
}
// Save callback for the cache item helper: copies `length` bytes of the blob
// held by `from_obj`, starting at byte `from_offset`, into the buffer `out`.
//
// `from_obj` must point to a BlobContents. The requested range is validated
// by assertions only; the function always returns Status::OK().
Status BlobContents::SaveToCallback(void* from_obj, size_t from_offset,
                                    size_t length, void* out) {
  assert(from_obj);

  const BlobContents* buf = static_cast<const BlobContents*>(from_obj);

  // Validate the range without forming `from_offset + length`: that sum can
  // wrap around for large values, which would make an out-of-bounds request
  // look valid.
  assert(from_offset <= buf->size());
  assert(length <= buf->size() - from_offset);

  memcpy(out, buf->data().data() + from_offset, length);

  return Status::OK();
}
// Deleter callback for cached BlobContents objects. It is registered with the
// cache both directly (as the deleter argument of Cache::Insert) and through
// the CacheItemHelper returned by GetCacheItemHelper().
void BlobContents::DeleteCallback(const Slice& key, void* value) {
// Forwards to the generic helper from cache/cache_helpers.h, instantiated for
// BlobContents (presumably it destroys the object `value` points to; confirm
// against that header).
DeleteCacheEntry<BlobContents>(key, value);
}
// Returns the CacheItemHelper that bundles the size, save and delete
// callbacks of BlobContents. The helper is a function-local static, so every
// call yields a pointer to the same object.
Cache::CacheItemHelper* BlobContents::GetCacheItemHelper() {
  static Cache::CacheItemHelper helper(&SizeCallback, &SaveToCallback,
                                       &DeleteCallback);

  return &helper;
}
// Create callback used when looking up a blob in the cache: rebuilds a
// BlobContents from the `size` bytes at `buf`.
//
// On return, `*out_obj` points to the newly created object (the caller takes
// ownership of it) and `*charge` holds its size in bytes. Always returns
// Status::OK().
Status BlobContents::CreateCallback(const void* buf, size_t size,
                                    void** out_obj, size_t* charge) {
  // Copy the bytes into a buffer of our own so the new object does not
  // reference `buf` afterwards.
  CacheAllocationPtr buffer(new char[size]);
  memcpy(buffer.get(), buf, size);

  // Ownership leaves the smart pointer here and travels through the raw
  // pointer stored in *out_obj.
  BlobContents* const blob = Create(std::move(buffer), size).release();

  *out_obj = blob;
  *charge = blob->size();

  return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,56 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <memory>
#include "memory/memory_allocator.h"
#include "rocksdb/cache.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
// A class representing a single uncompressed value read from a blob file.
class BlobContents {
public:
static std::unique_ptr<BlobContents> Create(CacheAllocationPtr&& allocation,
size_t size);
BlobContents(const BlobContents&) = delete;
BlobContents& operator=(const BlobContents&) = delete;
BlobContents(BlobContents&&) = default;
BlobContents& operator=(BlobContents&&) = default;
~BlobContents() = default;
const Slice& data() const { return data_; }
size_t size() const { return data_.size(); }
// Callbacks for secondary cache
static size_t SizeCallback(void* obj);
static Status SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out);
static void DeleteCallback(const Slice& key, void* value);
static Cache::CacheItemHelper* GetCacheItemHelper();
static Status CreateCallback(const void* buf, size_t size, void** out_obj,
size_t* charge);
private:
BlobContents(CacheAllocationPtr&& allocation, size_t size)
: allocation_(std::move(allocation)), data_(allocation_.get(), size) {}
CacheAllocationPtr allocation_;
Slice data_;
};
} // namespace ROCKSDB_NAMESPACE

@ -7,6 +7,7 @@
#include <cassert>
#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_completion_callback.h"
#include "db/blob/blob_index.h"
@ -408,15 +409,16 @@ Status BlobFileBuilder::PutBlobIntoCacheIfNeeded(const Slice& blob,
// Objects to be put into the cache have to be heap-allocated and
// self-contained, i.e. own their contents. The Cache has to be able to
// take unique ownership of them. Therefore, we copy the blob into a
// string directly, and insert that into the cache.
std::unique_ptr<std::string> buf = std::make_unique<std::string>();
buf->assign(blob.data(), blob.size());
// take unique ownership of them.
CacheAllocationPtr allocation(new char[blob.size()]);
memcpy(allocation.get(), blob.data(), blob.size());
std::unique_ptr<BlobContents> buf =
BlobContents::Create(std::move(allocation), blob.size());
// TODO: support custom allocators and provide a better estimated memory
// usage using malloc_usable_size.
s = blob_cache->Insert(key, buf.get(), buf->size(),
&DeleteCacheEntry<std::string>,
&BlobContents::DeleteCallback,
nullptr /* cache_handle */, priority);
if (s.ok()) {
RecordTick(statistics, BLOB_DB_CACHE_ADD);

@ -10,6 +10,7 @@
#include "cache/cache_reservation_manager.h"
#include "cache/charged_cache.h"
#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "monitoring/statistics.h"
@ -43,8 +44,8 @@ BlobSource::BlobSource(const ImmutableOptions* immutable_options,
BlobSource::~BlobSource() = default;
Status BlobSource::GetBlobFromCache(const Slice& cache_key,
CacheHandleGuard<std::string>* blob) const {
Status BlobSource::GetBlobFromCache(
const Slice& cache_key, CacheHandleGuard<BlobContents>* blob) const {
assert(blob);
assert(blob->IsEmpty());
assert(blob_cache_);
@ -53,7 +54,7 @@ Status BlobSource::GetBlobFromCache(const Slice& cache_key,
Cache::Handle* cache_handle = nullptr;
cache_handle = GetEntryFromCache(cache_key);
if (cache_handle != nullptr) {
*blob = CacheHandleGuard<std::string>(blob_cache_.get(), cache_handle);
*blob = CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);
return Status::OK();
}
@ -63,7 +64,7 @@ Status BlobSource::GetBlobFromCache(const Slice& cache_key,
}
Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
CacheHandleGuard<std::string>* cached_blob,
CacheHandleGuard<BlobContents>* cached_blob,
PinnableSlice* blob) const {
assert(blob);
assert(!cache_key.empty());
@ -74,10 +75,11 @@ Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
// Objects to be put into the cache have to be heap-allocated and
// self-contained, i.e. own their contents. The Cache has to be able to take
// unique ownership of them. Therefore, we copy the blob into a string
// directly, and insert that into the cache.
std::unique_ptr<std::string> buf = std::make_unique<std::string>();
buf->assign(blob->data(), blob->size());
// unique ownership of them.
CacheAllocationPtr allocation(new char[blob->size()]);
memcpy(allocation.get(), blob->data(), blob->size());
std::unique_ptr<BlobContents> buf =
BlobContents::Create(std::move(allocation), blob->size());
// TODO: support custom allocators and provide a better estimated memory
// usage using malloc_usable_size.
@ -88,7 +90,7 @@ Status BlobSource::PutBlobIntoCache(const Slice& cache_key,
buf.release();
assert(cache_handle != nullptr);
*cached_blob =
CacheHandleGuard<std::string>(blob_cache_.get(), cache_handle);
CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle);
}
return s;
@ -98,18 +100,9 @@ Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const {
Cache::Handle* cache_handle = nullptr;
if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) {
Cache::CreateCallback create_cb = [&](const void* buf, size_t size,
void** out_obj,
size_t* charge) -> Status {
std::string* blob = new std::string();
blob->assign(static_cast<const char*>(buf), size);
*out_obj = blob;
*charge = size;
return Status::OK();
};
cache_handle = blob_cache_->Lookup(key, GetCacheItemHelper(), create_cb,
Cache::Priority::BOTTOM,
true /* wait_for_cache */, statistics_);
cache_handle = blob_cache_->Lookup(
key, BlobContents::GetCacheItemHelper(), &BlobContents::CreateCallback,
Cache::Priority::BOTTOM, true /* wait_for_cache */, statistics_);
} else {
cache_handle = blob_cache_->Lookup(key, statistics_);
}
@ -125,17 +118,17 @@ Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const {
return cache_handle;
}
Status BlobSource::InsertEntryIntoCache(const Slice& key, std::string* value,
Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value,
size_t charge,
Cache::Handle** cache_handle,
Cache::Priority priority) const {
Status s;
if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) {
s = blob_cache_->Insert(key, value, GetCacheItemHelper(), charge,
cache_handle, priority);
s = blob_cache_->Insert(key, value, BlobContents::GetCacheItemHelper(),
charge, cache_handle, priority);
} else {
s = blob_cache_->Insert(key, value, charge, &DeleteCacheEntry<std::string>,
s = blob_cache_->Insert(key, value, charge, &BlobContents::DeleteCallback,
cache_handle, priority);
}
@ -164,7 +157,7 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
CacheHandleGuard<std::string> blob_handle;
CacheHandleGuard<BlobContents> blob_handle;
// First, try to get the blob from the cache
//
@ -180,7 +173,7 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
// the target PinnableSlice. This has the potential to save a lot of
// CPU, especially with large blob values.
value->PinSlice(
*blob_handle.GetValue(),
blob_handle.GetValue()->data(),
[](void* arg1, void* arg2) {
Cache* const cache = static_cast<Cache*>(arg1);
Cache::Handle* const handle = static_cast<Cache::Handle*>(arg2);
@ -310,7 +303,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
for (size_t i = 0; i < num_blobs; ++i) {
auto& req = blob_reqs[i];
CacheHandleGuard<std::string> blob_handle;
CacheHandleGuard<BlobContents> blob_handle;
const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
const Slice key = cache_key.AsSlice();
@ -327,7 +320,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
// to the target PinnableSlice. This has the potential to save a lot
// of CPU, especially with large blob values.
req.result->PinSlice(
*blob_handle.GetValue(),
blob_handle.GetValue()->data(),
[](void* arg1, void* arg2) {
Cache* const cache = static_cast<Cache*>(arg1);
Cache::Handle* const handle = static_cast<Cache::Handle*>(arg2);
@ -407,7 +400,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
// the blob(s) to the cache.
for (size_t i = 0; i < _blob_reqs.size(); ++i) {
if (_blob_reqs[i]->status->ok()) {
CacheHandleGuard<std::string> blob_handle;
CacheHandleGuard<BlobContents> blob_handle;
const CacheKey cache_key =
base_cache_key.WithOffset(_blob_reqs[i]->offset);
const Slice key = cache_key.AsSlice();
@ -431,7 +424,7 @@ bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
const Slice key = cache_key.AsSlice();
CacheHandleGuard<std::string> blob_handle;
CacheHandleGuard<BlobContents> blob_handle;
const Status s = GetBlobFromCache(key, &blob_handle);
if (s.ok() && blob_handle.GetValue() != nullptr) {
@ -441,25 +434,4 @@ bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
return false;
}
// Callbacks for secondary blob cache
size_t BlobSource::SizeCallback(void* obj) {
assert(obj != nullptr);
return static_cast<const std::string*>(obj)->size();
}
Status BlobSource::SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out) {
assert(from_obj != nullptr);
const std::string* buf = static_cast<const std::string*>(from_obj);
assert(buf->size() >= from_offset + length);
memcpy(out, buf->data() + from_offset, length);
return Status::OK();
}
Cache::CacheItemHelper* BlobSource::GetCacheItemHelper() {
static Cache::CacheItemHelper cache_helper(SizeCallback, SaveToCallback,
&DeleteCacheEntry<std::string>);
return &cache_helper;
}
} // namespace ROCKSDB_NAMESPACE

@ -22,6 +22,7 @@ struct ImmutableOptions;
class Status;
class FilePrefetchBuffer;
class Slice;
class BlobContents;
// BlobSource is a class that provides universal access to blobs, regardless of
// whether they are in the blob cache, secondary cache, or (remote) storage.
@ -106,15 +107,15 @@ class BlobSource {
private:
Status GetBlobFromCache(const Slice& cache_key,
CacheHandleGuard<std::string>* blob) const;
CacheHandleGuard<BlobContents>* blob) const;
Status PutBlobIntoCache(const Slice& cache_key,
CacheHandleGuard<std::string>* cached_blob,
CacheHandleGuard<BlobContents>* cached_blob,
PinnableSlice* blob) const;
Cache::Handle* GetEntryFromCache(const Slice& key) const;
Status InsertEntryIntoCache(const Slice& key, std::string* value,
Status InsertEntryIntoCache(const Slice& key, BlobContents* value,
size_t charge, Cache::Handle** cache_handle,
Cache::Priority priority) const;
@ -124,14 +125,6 @@ class BlobSource {
return base_cache_key.WithOffset(offset);
}
// Callbacks for secondary blob cache
static size_t SizeCallback(void* obj);
static Status SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out);
static Cache::CacheItemHelper* GetCacheItemHelper();
const std::string& db_id_;
const std::string& db_session_id_;

@ -13,6 +13,7 @@
#include "cache/charged_cache.h"
#include "cache/compressed_secondary_cache.h"
#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
@ -1136,16 +1137,6 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
auto blob_cache = options_.blob_cache;
auto secondary_cache = lru_cache_opts_.secondary_cache;
Cache::CreateCallback create_cb = [&](const void* buf, size_t size,
void** out_obj,
size_t* charge) -> Status {
std::string* blob = new std::string();
blob->assign(static_cast<const char*>(buf), size);
*out_obj = blob;
*charge = size;
return Status::OK();
};
{
// GetBlob
std::vector<PinnableSlice> values(keys.size());
@ -1187,13 +1178,14 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
// key0 should be in the secondary cache. After looking up key0 in the
// secondary cache, it will be erased from the secondary cache.
bool is_in_sec_cache = false;
auto sec_handle0 =
secondary_cache->Lookup(key0, create_cb, true, is_in_sec_cache);
auto sec_handle0 = secondary_cache->Lookup(
key0, &BlobContents::CreateCallback, true, is_in_sec_cache);
ASSERT_FALSE(is_in_sec_cache);
ASSERT_NE(sec_handle0, nullptr);
ASSERT_TRUE(sec_handle0->IsReady());
auto value = static_cast<std::string*>(sec_handle0->Value());
ASSERT_EQ(*value, blobs[0]);
auto value = static_cast<BlobContents*>(sec_handle0->Value());
ASSERT_NE(value, nullptr);
ASSERT_EQ(value->data(), blobs[0]);
delete value;
// key0 doesn't exist in the blob cache
@ -1210,8 +1202,8 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
blob_cache->Release(handle1);
bool is_in_sec_cache = false;
auto sec_handle1 =
secondary_cache->Lookup(key1, create_cb, true, is_in_sec_cache);
auto sec_handle1 = secondary_cache->Lookup(
key1, &BlobContents::CreateCallback, true, is_in_sec_cache);
ASSERT_FALSE(is_in_sec_cache);
ASSERT_EQ(sec_handle1, nullptr);
@ -1232,8 +1224,9 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
const Slice key0 = cache_key0.AsSlice();
auto handle0 = blob_cache->Lookup(key0, statistics);
ASSERT_NE(handle0, nullptr);
auto value = static_cast<std::string*>(blob_cache->Value(handle0));
ASSERT_EQ(*value, blobs[0]);
auto value = static_cast<BlobContents*>(blob_cache->Value(handle0));
ASSERT_NE(value, nullptr);
ASSERT_EQ(value->data(), blobs[0]);
blob_cache->Release(handle0);
// key1 is not in the primary cache, and it should be demoted to the
@ -1259,8 +1252,9 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
// key1 should be in the primary cache.
handle1 = blob_cache->Lookup(key1, statistics);
ASSERT_NE(handle1, nullptr);
value = static_cast<std::string*>(blob_cache->Value(handle1));
ASSERT_EQ(*value, blobs[1]);
value = static_cast<BlobContents*>(blob_cache->Value(handle1));
ASSERT_NE(value, nullptr);
ASSERT_EQ(value->data(), blobs[1]);
blob_cache->Release(handle1);
}
}

@ -11,6 +11,7 @@ LIB_SOURCES = \
cache/compressed_secondary_cache.cc \
cache/sharded_cache.cc \
db/arena_wrapped_db_iter.cc \
db/blob/blob_contents.cc \
db/blob/blob_fetcher.cc \
db/blob/blob_file_addition.cc \
db/blob/blob_file_builder.cc \

Loading…
Cancel
Save