diff --git a/CMakeLists.txt b/CMakeLists.txt index 42ccd07f1..6bfe8605e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -605,6 +605,7 @@ set(SOURCES cache/lru_cache.cc cache/sharded_cache.cc db/arena_wrapped_db_iter.cc + db/blob/blob_contents.cc db/blob/blob_fetcher.cc db/blob/blob_file_addition.cc db/blob/blob_file_builder.cc diff --git a/TARGETS b/TARGETS index 9f51dfa92..3d0033d87 100644 --- a/TARGETS +++ b/TARGETS @@ -20,6 +20,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "cache/lru_cache.cc", "cache/sharded_cache.cc", "db/arena_wrapped_db_iter.cc", + "db/blob/blob_contents.cc", "db/blob/blob_fetcher.cc", "db/blob/blob_file_addition.cc", "db/blob/blob_file_builder.cc", @@ -357,6 +358,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "cache/lru_cache.cc", "cache/sharded_cache.cc", "db/arena_wrapped_db_iter.cc", + "db/blob/blob_contents.cc", "db/blob/blob_fetcher.cc", "db/blob/blob_file_addition.cc", "db/blob/blob_file_builder.cc", diff --git a/db/blob/blob_contents.cc b/db/blob/blob_contents.cc new file mode 100644 index 000000000..32c959b0b --- /dev/null +++ b/db/blob/blob_contents.cc @@ -0,0 +1,63 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_contents.h" + +#include + +#include "cache/cache_helpers.h" + +namespace ROCKSDB_NAMESPACE { + +std::unique_ptr BlobContents::Create( + CacheAllocationPtr&& allocation, size_t size) { + return std::unique_ptr( + new BlobContents(std::move(allocation), size)); +} + +size_t BlobContents::SizeCallback(void* obj) { + assert(obj); + + return static_cast(obj)->size(); +} + +Status BlobContents::SaveToCallback(void* from_obj, size_t from_offset, + size_t length, void* out) { + assert(from_obj); + + const BlobContents* buf = static_cast(from_obj); + assert(buf->size() >= from_offset + length); + + memcpy(out, buf->data().data() + from_offset, length); + + return Status::OK(); +} + +void BlobContents::DeleteCallback(const Slice& key, void* value) { + DeleteCacheEntry(key, value); +} + +Cache::CacheItemHelper* BlobContents::GetCacheItemHelper() { + static Cache::CacheItemHelper cache_helper(&SizeCallback, &SaveToCallback, + &DeleteCallback); + + return &cache_helper; +} + +Status BlobContents::CreateCallback(const void* buf, size_t size, + void** out_obj, size_t* charge) { + CacheAllocationPtr allocation(new char[size]); + memcpy(allocation.get(), buf, size); + + std::unique_ptr obj = Create(std::move(allocation), size); + BlobContents* const contents = obj.release(); + + *out_obj = contents; + *charge = contents->size(); + + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_contents.h b/db/blob/blob_contents.h new file mode 100644 index 000000000..9b5369721 --- /dev/null +++ b/db/blob/blob_contents.h @@ -0,0 +1,56 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include + +#include "memory/memory_allocator.h" +#include "rocksdb/cache.h" +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" + +namespace ROCKSDB_NAMESPACE { + +// A class representing a single uncompressed value read from a blob file. +class BlobContents { + public: + static std::unique_ptr Create(CacheAllocationPtr&& allocation, + size_t size); + + BlobContents(const BlobContents&) = delete; + BlobContents& operator=(const BlobContents&) = delete; + + BlobContents(BlobContents&&) = default; + BlobContents& operator=(BlobContents&&) = default; + + ~BlobContents() = default; + + const Slice& data() const { return data_; } + size_t size() const { return data_.size(); } + + // Callbacks for secondary cache + static size_t SizeCallback(void* obj); + + static Status SaveToCallback(void* from_obj, size_t from_offset, + size_t length, void* out); + + static void DeleteCallback(const Slice& key, void* value); + + static Cache::CacheItemHelper* GetCacheItemHelper(); + + static Status CreateCallback(const void* buf, size_t size, void** out_obj, + size_t* charge); + + private: + BlobContents(CacheAllocationPtr&& allocation, size_t size) + : allocation_(std::move(allocation)), data_(allocation_.get(), size) {} + + CacheAllocationPtr allocation_; + Slice data_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index c8e99a838..9f769af59 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -7,6 +7,7 @@ #include +#include "db/blob/blob_contents.h" #include "db/blob/blob_file_addition.h" #include "db/blob/blob_file_completion_callback.h" #include "db/blob/blob_index.h" @@ -408,15 +409,16 @@ Status BlobFileBuilder::PutBlobIntoCacheIfNeeded(const Slice& blob, // Objects to be put into the cache have to be heap-allocated and // self-contained, i.e. own their contents. The Cache has to be able to - // take unique ownership of them. Therefore, we copy the blob into a - // string directly, and insert that into the cache. - std::unique_ptr buf = std::make_unique(); - buf->assign(blob.data(), blob.size()); + // take unique ownership of them. + CacheAllocationPtr allocation(new char[blob.size()]); + memcpy(allocation.get(), blob.data(), blob.size()); + std::unique_ptr buf = + BlobContents::Create(std::move(allocation), blob.size()); // TODO: support custom allocators and provide a better estimated memory // usage using malloc_usable_size. s = blob_cache->Insert(key, buf.get(), buf->size(), - &DeleteCacheEntry, + &BlobContents::DeleteCallback, nullptr /* cache_handle */, priority); if (s.ok()) { RecordTick(statistics, BLOB_DB_CACHE_ADD); diff --git a/db/blob/blob_source.cc b/db/blob/blob_source.cc index 945f5481d..8e3ee7834 100644 --- a/db/blob/blob_source.cc +++ b/db/blob/blob_source.cc @@ -10,6 +10,7 @@ #include "cache/cache_reservation_manager.h" #include "cache/charged_cache.h" +#include "db/blob/blob_contents.h" #include "db/blob/blob_file_reader.h" #include "db/blob/blob_log_format.h" #include "monitoring/statistics.h" @@ -43,8 +44,8 @@ BlobSource::BlobSource(const ImmutableOptions* immutable_options, BlobSource::~BlobSource() = default; -Status BlobSource::GetBlobFromCache(const Slice& cache_key, - CacheHandleGuard* blob) const { +Status BlobSource::GetBlobFromCache( + const Slice& cache_key, CacheHandleGuard* blob) const { assert(blob); assert(blob->IsEmpty()); assert(blob_cache_); @@ -53,7 +54,7 @@ Status BlobSource::GetBlobFromCache(const Slice& cache_key, Cache::Handle* cache_handle = nullptr; cache_handle = GetEntryFromCache(cache_key); if (cache_handle != nullptr) { - *blob = CacheHandleGuard(blob_cache_.get(), cache_handle); + *blob = CacheHandleGuard(blob_cache_.get(), cache_handle); return Status::OK(); } @@ -63,7 +64,7 @@ Status BlobSource::GetBlobFromCache(const Slice& cache_key, } Status BlobSource::PutBlobIntoCache(const Slice& cache_key, - CacheHandleGuard* cached_blob, + CacheHandleGuard* cached_blob, PinnableSlice* blob) const { assert(blob); assert(!cache_key.empty()); @@ -74,10 +75,11 @@ Status BlobSource::PutBlobIntoCache(const Slice& cache_key, // Objects to be put into the cache have to be heap-allocated and // self-contained, i.e. own their contents. The Cache has to be able to take - // unique ownership of them. Therefore, we copy the blob into a string - // directly, and insert that into the cache. - std::unique_ptr buf = std::make_unique(); - buf->assign(blob->data(), blob->size()); + // unique ownership of them. + CacheAllocationPtr allocation(new char[blob->size()]); + memcpy(allocation.get(), blob->data(), blob->size()); + std::unique_ptr buf = + BlobContents::Create(std::move(allocation), blob->size()); // TODO: support custom allocators and provide a better estimated memory // usage using malloc_usable_size. @@ -88,7 +90,7 @@ Status BlobSource::PutBlobIntoCache(const Slice& cache_key, buf.release(); assert(cache_handle != nullptr); *cached_blob = - CacheHandleGuard(blob_cache_.get(), cache_handle); + CacheHandleGuard(blob_cache_.get(), cache_handle); } return s; @@ -98,18 +100,9 @@ Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const { Cache::Handle* cache_handle = nullptr; if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) { - Cache::CreateCallback create_cb = [&](const void* buf, size_t size, - void** out_obj, - size_t* charge) -> Status { - std::string* blob = new std::string(); - blob->assign(static_cast(buf), size); - *out_obj = blob; - *charge = size; - return Status::OK(); - }; - cache_handle = blob_cache_->Lookup(key, GetCacheItemHelper(), create_cb, - Cache::Priority::BOTTOM, - true /* wait_for_cache */, statistics_); + cache_handle = blob_cache_->Lookup( + key, BlobContents::GetCacheItemHelper(), &BlobContents::CreateCallback, + Cache::Priority::BOTTOM, true /* wait_for_cache */, statistics_); } else { cache_handle = blob_cache_->Lookup(key, statistics_); } @@ -125,17 +118,17 @@ Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const { return cache_handle; } -Status BlobSource::InsertEntryIntoCache(const Slice& key, std::string* value, +Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value, size_t charge, Cache::Handle** cache_handle, Cache::Priority priority) const { Status s; if (lowest_used_cache_tier_ == CacheTier::kNonVolatileBlockTier) { - s = blob_cache_->Insert(key, value, GetCacheItemHelper(), charge, - cache_handle, priority); + s = blob_cache_->Insert(key, value, BlobContents::GetCacheItemHelper(), + charge, cache_handle, priority); } else { - s = blob_cache_->Insert(key, value, charge, &DeleteCacheEntry, + s = blob_cache_->Insert(key, value, charge, &BlobContents::DeleteCallback, cache_handle, priority); } @@ -164,7 +157,7 @@ Status BlobSource::GetBlob(const ReadOptions& read_options, const CacheKey cache_key = GetCacheKey(file_number, file_size, offset); - CacheHandleGuard blob_handle; + CacheHandleGuard blob_handle; // First, try to get the blob from the cache // @@ -180,7 +173,7 @@ Status BlobSource::GetBlob(const ReadOptions& read_options, // the target PinnableSlice. This has the potential to save a lot of // CPU, especially with large blob values. value->PinSlice( - *blob_handle.GetValue(), + blob_handle.GetValue()->data(), [](void* arg1, void* arg2) { Cache* const cache = static_cast(arg1); Cache::Handle* const handle = static_cast(arg2); @@ -310,7 +303,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options, for (size_t i = 0; i < num_blobs; ++i) { auto& req = blob_reqs[i]; - CacheHandleGuard blob_handle; + CacheHandleGuard blob_handle; const CacheKey cache_key = base_cache_key.WithOffset(req.offset); const Slice key = cache_key.AsSlice(); @@ -327,7 +320,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options, // to the target PinnableSlice. This has the potential to save a lot // of CPU, especially with large blob values. req.result->PinSlice( - *blob_handle.GetValue(), + blob_handle.GetValue()->data(), [](void* arg1, void* arg2) { Cache* const cache = static_cast(arg1); Cache::Handle* const handle = static_cast(arg2); @@ -407,7 +400,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options, // the blob(s) to the cache. for (size_t i = 0; i < _blob_reqs.size(); ++i) { if (_blob_reqs[i]->status->ok()) { - CacheHandleGuard blob_handle; + CacheHandleGuard blob_handle; const CacheKey cache_key = base_cache_key.WithOffset(_blob_reqs[i]->offset); const Slice key = cache_key.AsSlice(); @@ -431,7 +424,7 @@ bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size, const CacheKey cache_key = GetCacheKey(file_number, file_size, offset); const Slice key = cache_key.AsSlice(); - CacheHandleGuard blob_handle; + CacheHandleGuard blob_handle; const Status s = GetBlobFromCache(key, &blob_handle); if (s.ok() && blob_handle.GetValue() != nullptr) { @@ -441,25 +434,4 @@ bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size, return false; } -// Callbacks for secondary blob cache -size_t BlobSource::SizeCallback(void* obj) { - assert(obj != nullptr); - return static_cast(obj)->size(); -} - -Status BlobSource::SaveToCallback(void* from_obj, size_t from_offset, - size_t length, void* out) { - assert(from_obj != nullptr); - const std::string* buf = static_cast(from_obj); - assert(buf->size() >= from_offset + length); - memcpy(out, buf->data() + from_offset, length); - return Status::OK(); -} - -Cache::CacheItemHelper* BlobSource::GetCacheItemHelper() { - static Cache::CacheItemHelper cache_helper(SizeCallback, SaveToCallback, - &DeleteCacheEntry); - return &cache_helper; -} - } // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h index ffc8ae45f..dd10e4d0e 100644 --- a/db/blob/blob_source.h +++ b/db/blob/blob_source.h @@ -22,6 +22,7 @@ struct ImmutableOptions; class Status; class FilePrefetchBuffer; class Slice; +class BlobContents; // BlobSource is a class that provides universal access to blobs, regardless of // whether they are in the blob cache, secondary cache, or (remote) storage. @@ -106,15 +107,15 @@ class BlobSource { private: Status GetBlobFromCache(const Slice& cache_key, - CacheHandleGuard* blob) const; + CacheHandleGuard* blob) const; Status PutBlobIntoCache(const Slice& cache_key, - CacheHandleGuard* cached_blob, + CacheHandleGuard* cached_blob, PinnableSlice* blob) const; Cache::Handle* GetEntryFromCache(const Slice& key) const; - Status InsertEntryIntoCache(const Slice& key, std::string* value, + Status InsertEntryIntoCache(const Slice& key, BlobContents* value, size_t charge, Cache::Handle** cache_handle, Cache::Priority priority) const; @@ -124,14 +125,6 @@ class BlobSource { return base_cache_key.WithOffset(offset); } - // Callbacks for secondary blob cache - static size_t SizeCallback(void* obj); - - static Status SaveToCallback(void* from_obj, size_t from_offset, - size_t length, void* out); - - static Cache::CacheItemHelper* GetCacheItemHelper(); - const std::string& db_id_; const std::string& db_session_id_; diff --git a/db/blob/blob_source_test.cc b/db/blob/blob_source_test.cc index b3e3c953f..eac2936cd 100644 --- a/db/blob/blob_source_test.cc +++ b/db/blob/blob_source_test.cc @@ -13,6 +13,7 @@ #include "cache/charged_cache.h" #include "cache/compressed_secondary_cache.h" +#include "db/blob/blob_contents.h" #include "db/blob/blob_file_cache.h" #include "db/blob/blob_file_reader.h" #include "db/blob/blob_log_format.h" @@ -1136,16 +1137,6 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) { auto blob_cache = options_.blob_cache; auto secondary_cache = lru_cache_opts_.secondary_cache; - Cache::CreateCallback create_cb = [&](const void* buf, size_t size, - void** out_obj, - size_t* charge) -> Status { - std::string* blob = new std::string(); - blob->assign(static_cast(buf), size); - *out_obj = blob; - *charge = size; - return Status::OK(); - }; - { // GetBlob std::vector values(keys.size()); @@ -1187,13 +1178,14 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) { // key0 should be in the secondary cache. After looking up key0 in the // secondary cache, it will be erased from the secondary cache. bool is_in_sec_cache = false; - auto sec_handle0 = - secondary_cache->Lookup(key0, create_cb, true, is_in_sec_cache); + auto sec_handle0 = secondary_cache->Lookup( + key0, &BlobContents::CreateCallback, true, is_in_sec_cache); ASSERT_FALSE(is_in_sec_cache); ASSERT_NE(sec_handle0, nullptr); ASSERT_TRUE(sec_handle0->IsReady()); - auto value = static_cast(sec_handle0->Value()); - ASSERT_EQ(*value, blobs[0]); + auto value = static_cast(sec_handle0->Value()); + ASSERT_NE(value, nullptr); + ASSERT_EQ(value->data(), blobs[0]); delete value; // key0 doesn't exist in the blob cache @@ -1210,8 +1202,8 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) { blob_cache->Release(handle1); bool is_in_sec_cache = false; - auto sec_handle1 = - secondary_cache->Lookup(key1, create_cb, true, is_in_sec_cache); + auto sec_handle1 = secondary_cache->Lookup( + key1, &BlobContents::CreateCallback, true, is_in_sec_cache); ASSERT_FALSE(is_in_sec_cache); ASSERT_EQ(sec_handle1, nullptr); @@ -1232,8 +1224,9 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) { const Slice key0 = cache_key0.AsSlice(); auto handle0 = blob_cache->Lookup(key0, statistics); ASSERT_NE(handle0, nullptr); - auto value = static_cast(blob_cache->Value(handle0)); - ASSERT_EQ(*value, blobs[0]); + auto value = static_cast(blob_cache->Value(handle0)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(value->data(), blobs[0]); blob_cache->Release(handle0); // key1 is not in the primary cache, and it should be demoted to the @@ -1259,8 +1252,9 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) { // key1 should be in the primary cache. handle1 = blob_cache->Lookup(key1, statistics); ASSERT_NE(handle1, nullptr); - value = static_cast(blob_cache->Value(handle1)); - ASSERT_EQ(*value, blobs[1]); + value = static_cast(blob_cache->Value(handle1)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(value->data(), blobs[1]); blob_cache->Release(handle1); } } diff --git a/src.mk b/src.mk index c6dad4f00..c8b470213 100644 --- a/src.mk +++ b/src.mk @@ -11,6 +11,7 @@ LIB_SOURCES = \ cache/compressed_secondary_cache.cc \ cache/sharded_cache.cc \ db/arena_wrapped_db_iter.cc \ + db/blob/blob_contents.cc \ db/blob/blob_fetcher.cc \ db/blob/blob_file_addition.cc \ db/blob/blob_file_builder.cc \