diff --git a/db/blob/blob_source.cc b/db/blob/blob_source.cc
index 83d00db10..9bba5d8cc 100644
--- a/db/blob/blob_source.cc
+++ b/db/blob/blob_source.cc
@@ -10,6 +10,7 @@
 
 #include "db/blob/blob_file_reader.h"
 #include "options/cf_options.h"
+#include "table/multiget_context.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -152,6 +153,155 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
   return s;
 }
 
+// Reads a batch of blobs that all belong to blob file `file_number`.
+// Requests that hit the blob cache (when one is configured) are answered from
+// it. The remaining ones are marked Status::Incomplete if
+// `read_options.read_tier` is kBlockCacheTier; otherwise they are read from
+// the file with one MultiGetBlob call on the blob file reader.
+// The result of request i is stored in `*statuses[i]` and `*blobs[i]`.
+// `bytes_read`, if non-null, receives the value sizes of the cache hits plus
+// the byte count reported by the file read.
+void BlobSource::MultiGetBlob(
+    const ReadOptions& read_options,
+    const autovector<std::reference_wrapper<const Slice>>& user_keys,
+    uint64_t file_number, uint64_t file_size,
+    const autovector<uint64_t>& offsets,
+    const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
+    autovector<PinnableSlice*>& blobs, uint64_t* bytes_read) {
+  size_t num_blobs = user_keys.size();
+  assert(num_blobs > 0);
+  assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
+  assert(num_blobs == offsets.size());
+  assert(num_blobs == value_sizes.size());
+  assert(num_blobs == statuses.size());
+  assert(num_blobs == blobs.size());
+
+#ifndef NDEBUG
+  for (size_t i = 0; i < offsets.size() - 1; ++i) {
+    assert(offsets[i] <= offsets[i + 1]);
+  }
+#endif  // !NDEBUG
+
+  using Mask = uint64_t;
+  Mask cache_hit_mask = 0;
+
+  Status s;
+  uint64_t total_bytes = 0;
+  const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number,
+                                          file_size);
+
+  if (blob_cache_) {
+    size_t cached_blob_count = 0;
+    for (size_t i = 0; i < num_blobs; ++i) {
+      CachableEntry<std::string> blob_entry;
+      const CacheKey cache_key = base_cache_key.WithOffset(offsets[i]);
+      const Slice key = cache_key.AsSlice();
+
+      s = GetBlobFromCache(key, &blob_entry);
+      if (s.ok() && blob_entry.GetValue()) {
+        assert(statuses[i]);
+        assert(blob_entry.GetValue()->size() == value_sizes[i]);
+
+        *statuses[i] = s;
+        blobs[i]->PinSelf(*blob_entry.GetValue());
+
+        // Update the counter for the number of valid blobs read from the cache.
+        ++cached_blob_count;
+        total_bytes += value_sizes[i];
+        cache_hit_mask |= (Mask{1} << i);  // cache hit
+      }
+    }
+
+    // All blobs were read from the cache.
+    if (cached_blob_count == num_blobs) {
+      if (bytes_read) {
+        *bytes_read = total_bytes;
+      }
+      return;
+    }
+  }
+
+  const bool no_io = read_options.read_tier == kBlockCacheTier;
+  if (no_io) {
+    for (size_t i = 0; i < num_blobs; ++i) {
+      if (!(cache_hit_mask & (Mask{1} << i))) {
+        assert(statuses[i]);
+        *statuses[i] =
+            Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
+      }
+    }
+    // Still report the bytes that were served from the cache.
+    if (bytes_read) {
+      *bytes_read = total_bytes;
+    }
+    return;
+  }
+
+  {
+    // Find the rest of blobs from the file since I/O is allowed.
+    autovector<std::reference_wrapper<const Slice>> _user_keys;
+    autovector<uint64_t> _offsets;
+    autovector<uint64_t> _value_sizes;
+    autovector<Status*> _statuses;
+    autovector<PinnableSlice*> _blobs;
+    uint64_t _bytes_read = 0;
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      if (!(cache_hit_mask & (Mask{1} << i))) {
+        _user_keys.emplace_back(user_keys[i]);
+        _offsets.push_back(offsets[i]);
+        _value_sizes.push_back(value_sizes[i]);
+        _statuses.push_back(statuses[i]);
+        _blobs.push_back(blobs[i]);
+      }
+    }
+
+    CacheHandleGuard<BlobFileReader> blob_file_reader;
+    s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
+    if (!s.ok()) {
+      for (size_t i = 0; i < _blobs.size(); ++i) {
+        assert(_statuses[i]);
+        *_statuses[i] = s;
+      }
+      // Still report the bytes that were served from the cache.
+      if (bytes_read) {
+        *bytes_read = total_bytes;
+      }
+      return;
+    }
+
+    assert(blob_file_reader.GetValue());
+
+    blob_file_reader.GetValue()->MultiGetBlob(read_options, _user_keys,
+                                              _offsets, _value_sizes, _statuses,
+                                              _blobs, &_bytes_read);
+
+    if (blob_cache_ && read_options.fill_cache) {
+      // If filling cache is allowed and a cache is configured, try to put
+      // the blob(s) to the cache.
+      for (size_t i = 0; i < _blobs.size(); ++i) {
+        if (_statuses[i]->ok()) {
+          CachableEntry<std::string> blob_entry;
+          const CacheKey cache_key = base_cache_key.WithOffset(_offsets[i]);
+          const Slice key = cache_key.AsSlice();
+
+          s = PutBlobIntoCache(key, &blob_entry, _blobs[i]);
+          if (!s.ok()) {
+            *_statuses[i] = s;
+          }
+        }
+      }
+    }
+
+    total_bytes += _bytes_read;
+    if (bytes_read) {
+      *bytes_read = total_bytes;
+    }
+
+    RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, _bytes_read);
+  }
+}
+
 bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
                                   uint64_t offset) const {
   const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h
index 2a1a13c80..0d32e8060 100644
--- a/db/blob/blob_source.h
+++ b/db/blob/blob_source.h
@@ -13,6 +13,7 @@
 #include "rocksdb/cache.h"
 #include "rocksdb/rocksdb_namespace.h"
 #include "table/block_based/cachable_entry.h"
+#include "util/autovector.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -42,6 +43,15 @@ class BlobSource {
                  FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
                  uint64_t* bytes_read);
 
+  // Offsets must be sorted in ascending order by caller.
+  void MultiGetBlob(
+      const ReadOptions& read_options,
+      const autovector<std::reference_wrapper<const Slice>>& user_keys,
+      uint64_t file_number, uint64_t file_size,
+      const autovector<uint64_t>& offsets,
+      const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
+      autovector<PinnableSlice*>& blobs, uint64_t* bytes_read);
+
   inline Status GetBlobFileReader(
       uint64_t blob_file_number,
       CacheHandleGuard<BlobFileReader>* blob_file_reader) {
diff --git a/db/blob/blob_source_test.cc b/db/blob/blob_source_test.cc
index 55f451d77..7bd650b14 100644
--- a/db/blob/blob_source_test.cc
+++ b/db/blob/blob_source_test.cc
@@ -117,6 +117,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
   options.cf_paths.emplace_back(
       test::PerThreadDBPath(env_, "BlobSourceTest_GetBlobsFromCache"), 0);
   options.enable_blob_files = true;
+  options.create_if_missing = true;
 
   LRUCacheOptions co;
   co.capacity = 2048;
@@ -125,7 +126,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
   options.blob_cache = NewLRUCache(co);
   options.lowest_used_cache_tier = CacheTier::kVolatileTier;
 
-  Reopen(options);
+  DestroyAndReopen(options);
 
   std::string db_id;
   ASSERT_OK(db_->GetDbIdentity(db_id));
@@ -277,10 +278,12 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
       ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                 blob_offsets[i]));
 
-      ASSERT_NOK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
-                                     blob_offsets[i], file_size, blob_sizes[i],
-                                     kNoCompression, prefetch_buffer,
-                                     &values[i], &bytes_read));
+      ASSERT_TRUE(blob_source
+                      .GetBlob(read_options, keys[i], blob_file_number,
+                               blob_offsets[i], file_size, blob_sizes[i],
+                               kNoCompression, prefetch_buffer, &values[i],
+                               &bytes_read)
+                      .IsIncomplete());
       ASSERT_TRUE(values[i].empty());
       ASSERT_EQ(bytes_read, 0);
 
@@ -288,6 +291,274 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                                 blob_offsets[i]));
     }
   }
+
+  {
+    // GetBlob from non-existing file
+    std::vector<PinnableSlice> values(keys.size());
+    uint64_t bytes_read = 0;
+    uint64_t file_number = 100;  // non-existing file
+
+    read_options.read_tier = ReadTier::kReadAllTier;
+    read_options.fill_cache = true;
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                                blob_offsets[i]));
+
+      ASSERT_TRUE(blob_source
+                      .GetBlob(read_options, keys[i], file_number,
+                               blob_offsets[i], file_size, blob_sizes[i],
+                               kNoCompression, prefetch_buffer, &values[i],
+                               &bytes_read)
+                      .IsIOError());
+      ASSERT_TRUE(values[i].empty());
+      ASSERT_EQ(bytes_read, 0);
+
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                                blob_offsets[i]));
+    }
+  }
+}
+
+// Exercises BlobSource::MultiGetBlob against a single blob file: reading
+// through to the file with fill_cache on, cache-only reads once the cache is
+// populated and again after EraseUnRefEntries(), and a non-existing file.
+TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
+  Options options;
+  options.env = env_;
+  options.cf_paths.emplace_back(
+      test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);
+  options.enable_blob_files = true;
+  options.create_if_missing = true;
+
+  LRUCacheOptions co;
+  co.capacity = 2048;
+  co.num_shard_bits = 2;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  options.blob_cache = NewLRUCache(co);
+  options.lowest_used_cache_tier = CacheTier::kVolatileTier;
+
+  DestroyAndReopen(options);
+
+  std::string db_id;
+  ASSERT_OK(db_->GetDbIdentity(db_id));
+
+  std::string db_session_id;
+  ASSERT_OK(db_->GetDbSessionId(db_session_id));
+
+  ImmutableOptions immutable_options(options);
+
+  constexpr uint32_t column_family_id = 1;
+  constexpr bool has_ttl = false;
+  constexpr ExpirationRange expiration_range;
+  constexpr uint64_t blob_file_number = 1;
+  constexpr size_t num_blobs = 16;
+
+  std::vector<std::string> key_strs;
+  std::vector<std::string> blob_strs;
+
+  for (size_t i = 0; i < num_blobs; ++i) {
+    key_strs.push_back("key" + std::to_string(i));
+    blob_strs.push_back("blob" + std::to_string(i));
+  }
+
+  std::vector<Slice> keys;
+  std::vector<Slice> blobs;
+
+  uint64_t file_size = BlobLogHeader::kSize;
+  for (size_t i = 0; i < num_blobs; ++i) {
+    keys.push_back({key_strs[i]});
+    blobs.push_back({blob_strs[i]});
+    file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size();
+  }
+  file_size += BlobLogFooter::kSize;
+
+  std::vector<uint64_t> blob_offsets(keys.size());
+  std::vector<uint64_t> blob_sizes(keys.size());
+
+  WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
+                expiration_range, blob_file_number, keys, blobs, kNoCompression,
+                blob_offsets, blob_sizes);
+
+  constexpr size_t capacity = 10;
+  std::shared_ptr<Cache> backing_cache =
+      NewLRUCache(capacity);  // Blob file cache
+
+  FileOptions file_options;
+  constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+  std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
+      backing_cache.get(), &immutable_options, &file_options, column_family_id,
+      blob_file_read_hist, nullptr /*IOTracer*/));
+
+  BlobSource blob_source(&immutable_options, db_id, db_session_id,
+                         blob_file_cache.get());
+
+  ReadOptions read_options;
+  read_options.verify_checksums = true;
+
+  constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
+
+  {
+    // MultiGetBlob
+    uint64_t bytes_read = 0;
+
+    autovector<std::reference_wrapper<const Slice>> key_refs;
+    autovector<uint64_t> offsets;
+    autovector<uint64_t> sizes;
+    std::array<Status, num_blobs> statuses_buf;
+    autovector<Status*> statuses;
+    std::array<PinnableSlice, num_blobs> value_buf;
+    autovector<PinnableSlice*> values;
+
+    for (size_t i = 0; i < num_blobs; i += 2) {  // even index
+      key_refs.emplace_back(std::cref(keys[i]));
+      offsets.push_back(blob_offsets[i]);
+      sizes.push_back(blob_sizes[i]);
+      statuses.push_back(&statuses_buf[i]);
+      values.push_back(&value_buf[i]);
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
+                                                blob_offsets[i]));
+    }
+
+    read_options.fill_cache = true;
+    read_options.read_tier = ReadTier::kReadAllTier;
+
+    // Get half of blobs
+    blob_source.MultiGetBlob(read_options, key_refs, blob_file_number,
+                             file_size, offsets, sizes, statuses, values,
+                             &bytes_read);
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      if (i % 2 == 0) {
+        ASSERT_OK(statuses_buf[i]);
+        ASSERT_EQ(value_buf[i], blobs[i]);
+        ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
+                                                 blob_offsets[i]));
+      } else {
+        statuses_buf[i].PermitUncheckedError();
+        ASSERT_TRUE(value_buf[i].empty());
+        ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
+                                                  blob_offsets[i]));
+      }
+    }
+
+    // Get the rest of blobs
+    for (size_t i = 1; i < num_blobs; i += 2) {  // odd index
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
+                                                blob_offsets[i]));
+
+      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
+                                    blob_offsets[i], file_size, blob_sizes[i],
+                                    kNoCompression, prefetch_buffer,
+                                    &value_buf[i], &bytes_read));
+      ASSERT_EQ(value_buf[i], blobs[i]);
+      ASSERT_EQ(bytes_read,
+                blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
+
+      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
+                                               blob_offsets[i]));
+    }
+
+    // Cache-only MultiGetBlob
+    read_options.read_tier = ReadTier::kBlockCacheTier;
+
+    key_refs.clear();
+    offsets.clear();
+    sizes.clear();
+    statuses.clear();
+    values.clear();
+    for (size_t i = 0; i < num_blobs; ++i) {
+      key_refs.emplace_back(std::cref(keys[i]));
+      offsets.push_back(blob_offsets[i]);
+      sizes.push_back(blob_sizes[i]);
+      statuses.push_back(&statuses_buf[i]);
+      values.push_back(&value_buf[i]);
+    }
+
+    blob_source.MultiGetBlob(read_options, key_refs, blob_file_number,
+                             file_size, offsets, sizes, statuses, values,
+                             &bytes_read);
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_OK(statuses_buf[i]);
+      ASSERT_EQ(value_buf[i], blobs[i]);
+      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
+                                               blob_offsets[i]));
+    }
+  }
+
+  options.blob_cache->EraseUnRefEntries();
+
+  {
+    // Cache-only MultiGetBlob
+    uint64_t bytes_read = 0;
+    read_options.read_tier = ReadTier::kBlockCacheTier;
+
+    autovector<std::reference_wrapper<const Slice>> key_refs;
+    autovector<uint64_t> offsets;
+    autovector<uint64_t> sizes;
+    std::array<Status, num_blobs> statuses_buf;
+    autovector<Status*> statuses;
+    std::array<PinnableSlice, num_blobs> value_buf;
+    autovector<PinnableSlice*> values;
+
+    for (size_t i = 0; i < num_blobs; i++) {
+      key_refs.emplace_back(std::cref(keys[i]));
+      offsets.push_back(blob_offsets[i]);
+      sizes.push_back(blob_sizes[i]);
+      statuses.push_back(&statuses_buf[i]);
+      values.push_back(&value_buf[i]);
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
+                                                blob_offsets[i]));
+    }
+
+    blob_source.MultiGetBlob(read_options, key_refs, blob_file_number,
+                             file_size, offsets, sizes, statuses, values,
+                             &bytes_read);
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_TRUE(statuses_buf[i].IsIncomplete());
+      ASSERT_TRUE(value_buf[i].empty());
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
+                                                blob_offsets[i]));
+    }
+  }
+
+  {
+    // MultiGetBlob from non-existing file
+    uint64_t bytes_read = 0;
+    uint64_t file_number = 100;  // non-existing file
+    read_options.read_tier = ReadTier::kReadAllTier;
+
+    autovector<std::reference_wrapper<const Slice>> key_refs;
+    autovector<uint64_t> offsets;
+    autovector<uint64_t> sizes;
+    std::array<Status, num_blobs> statuses_buf;
+    autovector<Status*> statuses;
+    std::array<PinnableSlice, num_blobs> value_buf;
+    autovector<PinnableSlice*> values;
+
+    for (size_t i = 0; i < num_blobs; i++) {
+      key_refs.emplace_back(std::cref(keys[i]));
+      offsets.push_back(blob_offsets[i]);
+      sizes.push_back(blob_sizes[i]);
+      statuses.push_back(&statuses_buf[i]);
+      values.push_back(&value_buf[i]);
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                                blob_offsets[i]));
+    }
+
+    blob_source.MultiGetBlob(read_options, key_refs, file_number, file_size,
+                             offsets, sizes, statuses, values, &bytes_read);
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_TRUE(statuses_buf[i].IsIOError());
+      ASSERT_TRUE(value_buf[i].empty());
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                                blob_offsets[i]));
+    }
+  }
 }
 
 }  // namespace ROCKSDB_NAMESPACE