From ba1f62ddfbffed7f7bc6dd4344f3639ae0df75aa Mon Sep 17 00:00:00 2001 From: Gang Liao Date: Thu, 23 Jun 2022 13:52:00 -0700 Subject: [PATCH] Read from blob cache first when MultiGetBlob() (#10225) Summary: There is currently no caching mechanism for blobs, which is not ideal especially when the database resides on remote storage (where we cannot rely on the OS page cache). As part of this task, we would like to make it possible for the application to configure a blob cache. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10225 Test Plan: Add test cases for MultiGetBlob In this task, we added the new API MultiGetBlob() for BlobSource. This PR is a part of https://github.com/facebook/rocksdb/issues/10156 Reviewed By: ltamasi Differential Revision: D37358364 Pulled By: gangliao fbshipit-source-id: aff053a37615d96d768fb9aedde17da5618c7ae6 --- db/blob/blob_source.cc | 134 +++++++++++++++++ db/blob/blob_source.h | 10 ++ db/blob/blob_source_test.cc | 278 +++++++++++++++++++++++++++++++++++- 3 files changed, 417 insertions(+), 5 deletions(-) diff --git a/db/blob/blob_source.cc b/db/blob/blob_source.cc index 83d00db10..9bba5d8cc 100644 --- a/db/blob/blob_source.cc +++ b/db/blob/blob_source.cc @@ -10,6 +10,7 @@ #include "db/blob/blob_file_reader.h" #include "options/cf_options.h" +#include "table/multiget_context.h" namespace ROCKSDB_NAMESPACE { @@ -152,6 +153,139 @@ Status BlobSource::GetBlob(const ReadOptions& read_options, return s; } +void BlobSource::MultiGetBlob( + const ReadOptions& read_options, + const autovector>& user_keys, + uint64_t file_number, uint64_t file_size, + const autovector& offsets, + const autovector& value_sizes, autovector& statuses, + autovector& blobs, uint64_t* bytes_read) { + size_t num_blobs = user_keys.size(); + assert(num_blobs > 0); + assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE); + assert(num_blobs == offsets.size()); + assert(num_blobs == value_sizes.size()); + assert(num_blobs == statuses.size()); + assert(num_blobs == blobs.size()); + +#ifndef NDEBUG + for (size_t i = 0; i < offsets.size() - 1; ++i) { + assert(offsets[i] <= offsets[i + 1]); + } +#endif // !NDEBUG + + using Mask = uint64_t; + Mask cache_hit_mask = 0; + + Status s; + uint64_t total_bytes = 0; + const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number, + file_size); + + if (blob_cache_) { + size_t cached_blob_count = 0; + for (size_t i = 0; i < num_blobs; ++i) { + CachableEntry blob_entry; + const CacheKey cache_key = base_cache_key.WithOffset(offsets[i]); + const Slice key = cache_key.AsSlice(); + + s = GetBlobFromCache(key, &blob_entry); + if (s.ok() && blob_entry.GetValue()) { + assert(statuses[i]); + assert(blob_entry.GetValue()->size() == value_sizes[i]); + + *statuses[i] = s; + blobs[i]->PinSelf(*blob_entry.GetValue()); + + // Update the counter for the number of valid blobs read from the cache. + ++cached_blob_count; + total_bytes += value_sizes[i]; + cache_hit_mask |= (Mask{1} << i); // cache hit + } + } + + // All blobs were read from the cache. + if (cached_blob_count == num_blobs) { + if (bytes_read) { + *bytes_read = total_bytes; + } + return; + } + } + + const bool no_io = read_options.read_tier == kBlockCacheTier; + if (no_io) { + for (size_t i = 0; i < num_blobs; ++i) { + if (!(cache_hit_mask & (Mask{1} << i))) { + assert(statuses[i]); + *statuses[i] = + Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); + } + } + return; + } + + { + // Find the rest of blobs from the file since I/O is allowed. + autovector> _user_keys; + autovector _offsets; + autovector _value_sizes; + autovector _statuses; + autovector _blobs; + uint64_t _bytes_read = 0; + + for (size_t i = 0; i < num_blobs; ++i) { + if (!(cache_hit_mask & (Mask{1} << i))) { + _user_keys.emplace_back(user_keys[i]); + _offsets.push_back(offsets[i]); + _value_sizes.push_back(value_sizes[i]); + _statuses.push_back(statuses[i]); + _blobs.push_back(blobs[i]); + } + } + + CacheHandleGuard blob_file_reader; + s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader); + if (!s.ok()) { + for (size_t i = 0; i < _blobs.size(); ++i) { + assert(_statuses[i]); + *_statuses[i] = s; + } + return; + } + + assert(blob_file_reader.GetValue()); + + blob_file_reader.GetValue()->MultiGetBlob(read_options, _user_keys, + _offsets, _value_sizes, _statuses, + _blobs, &_bytes_read); + + if (read_options.fill_cache) { + // If filling cache is allowed and a cache is configured, try to put + // the blob(s) to the cache. + for (size_t i = 0; i < _blobs.size(); ++i) { + if (_statuses[i]->ok()) { + CachableEntry blob_entry; + const CacheKey cache_key = base_cache_key.WithOffset(_offsets[i]); + const Slice key = cache_key.AsSlice(); + + s = PutBlobIntoCache(key, &blob_entry, _blobs[i]); + if (!s.ok()) { + *_statuses[i] = s; + } + } + } + } + + total_bytes += _bytes_read; + if (bytes_read) { + *bytes_read = total_bytes; + } + + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, _bytes_read); + } +} + bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size, uint64_t offset) const { const CacheKey cache_key = GetCacheKey(file_number, file_size, offset); diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h index 2a1a13c80..0d32e8060 100644 --- a/db/blob/blob_source.h +++ b/db/blob/blob_source.h @@ -13,6 +13,7 @@ #include "rocksdb/cache.h" #include "rocksdb/rocksdb_namespace.h" #include "table/block_based/cachable_entry.h" +#include "util/autovector.h" namespace ROCKSDB_NAMESPACE { @@ -42,6 +43,15 @@ class BlobSource { FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read); + // Offsets must be sorted in ascending order by caller. + void MultiGetBlob( + const ReadOptions& read_options, + const autovector>& user_keys, + uint64_t file_number, uint64_t file_size, + const autovector& offsets, + const autovector& value_sizes, autovector& statuses, + autovector& blobs, uint64_t* bytes_read); + inline Status GetBlobFileReader( uint64_t blob_file_number, CacheHandleGuard* blob_file_reader) { diff --git a/db/blob/blob_source_test.cc b/db/blob/blob_source_test.cc index 55f451d77..7bd650b14 100644 --- a/db/blob/blob_source_test.cc +++ b/db/blob/blob_source_test.cc @@ -117,6 +117,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) { options.cf_paths.emplace_back( test::PerThreadDBPath(env_, "BlobSourceTest_GetBlobsFromCache"), 0); options.enable_blob_files = true; + options.create_if_missing = true; LRUCacheOptions co; co.capacity = 2048; @@ -125,7 +126,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) { options.blob_cache = NewLRUCache(co); options.lowest_used_cache_tier = CacheTier::kVolatileTier; - Reopen(options); + DestroyAndReopen(options); std::string db_id; ASSERT_OK(db_->GetDbIdentity(db_id)); @@ -277,10 +278,12 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) { ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); - ASSERT_NOK(blob_source.GetBlob(read_options, keys[i], blob_file_number, - blob_offsets[i], file_size, blob_sizes[i], - kNoCompression, prefetch_buffer, - &values[i], &bytes_read)); + ASSERT_TRUE(blob_source + .GetBlob(read_options, keys[i], blob_file_number, + blob_offsets[i], file_size, blob_sizes[i], + kNoCompression, prefetch_buffer, &values[i], + &bytes_read) + .IsIncomplete()); ASSERT_TRUE(values[i].empty()); ASSERT_EQ(bytes_read, 0); @@ -288,6 +291,271 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) { blob_offsets[i])); } } + + { + // GetBlob from non-existing file + std::vector values(keys.size()); + uint64_t bytes_read = 0; + uint64_t file_number = 100; // non-existing file + + read_options.read_tier = ReadTier::kReadAllTier; + read_options.fill_cache = true; + + for (size_t i = 0; i < num_blobs; ++i) { + ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size, + blob_offsets[i])); + + ASSERT_TRUE(blob_source + .GetBlob(read_options, keys[i], file_number, + blob_offsets[i], file_size, blob_sizes[i], + kNoCompression, prefetch_buffer, &values[i], + &bytes_read) + .IsIOError()); + ASSERT_TRUE(values[i].empty()); + ASSERT_EQ(bytes_read, 0); + + ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size, + blob_offsets[i])); + } + } +} + +TEST_F(BlobSourceTest, MultiGetBlobsFromCache) { + Options options; + options.env = env_; + options.cf_paths.emplace_back( + test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0); + options.enable_blob_files = true; + options.create_if_missing = true; + + LRUCacheOptions co; + co.capacity = 2048; + co.num_shard_bits = 2; + co.metadata_charge_policy = kDontChargeCacheMetadata; + options.blob_cache = NewLRUCache(co); + options.lowest_used_cache_tier = CacheTier::kVolatileTier; + + DestroyAndReopen(options); + + std::string db_id; + ASSERT_OK(db_->GetDbIdentity(db_id)); + + std::string db_session_id; + ASSERT_OK(db_->GetDbSessionId(db_session_id)); + + ImmutableOptions immutable_options(options); + + constexpr uint32_t column_family_id = 1; + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + constexpr uint64_t blob_file_number = 1; + constexpr size_t num_blobs = 16; + + std::vector key_strs; + std::vector blob_strs; + + for (size_t i = 0; i < num_blobs; ++i) { + key_strs.push_back("key" + std::to_string(i)); + blob_strs.push_back("blob" + std::to_string(i)); + } + + std::vector keys; + std::vector blobs; + + uint64_t file_size = BlobLogHeader::kSize; + for (size_t i = 0; i < num_blobs; ++i) { + keys.push_back({key_strs[i]}); + blobs.push_back({blob_strs[i]}); + file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size(); + } + file_size += BlobLogFooter::kSize; + + std::vector blob_offsets(keys.size()); + std::vector blob_sizes(keys.size()); + + WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range, + expiration_range, blob_file_number, keys, blobs, kNoCompression, + blob_offsets, blob_sizes); + + constexpr size_t capacity = 10; + std::shared_ptr backing_cache = + NewLRUCache(capacity); // Blob file cache + + FileOptions file_options; + constexpr HistogramImpl* blob_file_read_hist = nullptr; + + std::unique_ptr blob_file_cache(new BlobFileCache( + backing_cache.get(), &immutable_options, &file_options, column_family_id, + blob_file_read_hist, nullptr /*IOTracer*/)); + + BlobSource blob_source(&immutable_options, db_id, db_session_id, + blob_file_cache.get()); + + ReadOptions read_options; + read_options.verify_checksums = true; + + constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; + + { + // MultiGetBlob + uint64_t bytes_read = 0; + + autovector> key_refs; + autovector offsets; + autovector sizes; + std::array statuses_buf; + autovector statuses; + std::array value_buf; + autovector values; + + for (size_t i = 0; i < num_blobs; i += 2) { // even index + key_refs.emplace_back(std::cref(keys[i])); + offsets.push_back(blob_offsets[i]); + sizes.push_back(blob_sizes[i]); + statuses.push_back(&statuses_buf[i]); + values.push_back(&value_buf[i]); + ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + + read_options.fill_cache = true; + read_options.read_tier = ReadTier::kReadAllTier; + + // Get half of blobs + blob_source.MultiGetBlob(read_options, key_refs, blob_file_number, + file_size, offsets, sizes, statuses, values, + &bytes_read); + + for (size_t i = 0; i < num_blobs; ++i) { + if (i % 2 == 0) { + ASSERT_OK(statuses_buf[i]); + ASSERT_EQ(value_buf[i], blobs[i]); + ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } else { + statuses_buf[i].PermitUncheckedError(); + ASSERT_TRUE(value_buf[i].empty()); + ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + } + + // Get the rest of blobs + for (size_t i = 1; i < num_blobs; i += 2) { // odd index + ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + + ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number, + blob_offsets[i], file_size, blob_sizes[i], + kNoCompression, prefetch_buffer, + &value_buf[i], &bytes_read)); + ASSERT_EQ(value_buf[i], blobs[i]); + ASSERT_EQ(bytes_read, + blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize); + + ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + + // Cache-only MultiGetBlob + read_options.read_tier = ReadTier::kBlockCacheTier; + + key_refs.clear(); + offsets.clear(); + sizes.clear(); + statuses.clear(); + values.clear(); + for (size_t i = 0; i < num_blobs; ++i) { + key_refs.emplace_back(std::cref(keys[i])); + offsets.push_back(blob_offsets[i]); + sizes.push_back(blob_sizes[i]); + statuses.push_back(&statuses_buf[i]); + values.push_back(&value_buf[i]); + } + + blob_source.MultiGetBlob(read_options, key_refs, blob_file_number, + file_size, offsets, sizes, statuses, values, + &bytes_read); + + for (size_t i = 0; i < num_blobs; ++i) { + ASSERT_OK(statuses_buf[i]); + ASSERT_EQ(value_buf[i], blobs[i]); + ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + } + + options.blob_cache->EraseUnRefEntries(); + + { + // Cache-only MultiGetBlob + uint64_t bytes_read = 0; + read_options.read_tier = ReadTier::kBlockCacheTier; + + autovector> key_refs; + autovector offsets; + autovector sizes; + std::array statuses_buf; + autovector statuses; + std::array value_buf; + autovector values; + + for (size_t i = 0; i < num_blobs; i++) { + key_refs.emplace_back(std::cref(keys[i])); + offsets.push_back(blob_offsets[i]); + sizes.push_back(blob_sizes[i]); + statuses.push_back(&statuses_buf[i]); + values.push_back(&value_buf[i]); + ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + + blob_source.MultiGetBlob(read_options, key_refs, blob_file_number, + file_size, offsets, sizes, statuses, values, + &bytes_read); + + for (size_t i = 0; i < num_blobs; ++i) { + ASSERT_TRUE(statuses_buf[i].IsIncomplete()); + ASSERT_TRUE(value_buf[i].empty()); + ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, + blob_offsets[i])); + } + } + + { + // MultiGetBlob from non-existing file + uint64_t bytes_read = 0; + uint64_t file_number = 100; // non-existing file + read_options.read_tier = ReadTier::kReadAllTier; + + autovector> key_refs; + autovector offsets; + autovector sizes; + std::array statuses_buf; + autovector statuses; + std::array value_buf; + autovector values; + + for (size_t i = 0; i < num_blobs; i++) { + key_refs.emplace_back(std::cref(keys[i])); + offsets.push_back(blob_offsets[i]); + sizes.push_back(blob_sizes[i]); + statuses.push_back(&statuses_buf[i]); + values.push_back(&value_buf[i]); + ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size, + blob_offsets[i])); + } + + blob_source.MultiGetBlob(read_options, key_refs, file_number, file_size, + offsets, sizes, statuses, values, &bytes_read); + + for (size_t i = 0; i < num_blobs; ++i) { + ASSERT_TRUE(statuses_buf[i].IsIOError()); + ASSERT_TRUE(value_buf[i].empty()); + ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size, + blob_offsets[i])); + } + } } } // namespace ROCKSDB_NAMESPACE