diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc
index a1af799bd..cc46f7a76 100644
--- a/db/blob/blob_file_reader.cc
+++ b/db/blob/blob_file_reader.cc
@@ -16,6 +16,7 @@
 #include "rocksdb/file_system.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"
+#include "table/multiget_context.h"
 #include "test_util/sync_point.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -374,40 +375,50 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
   return Status::OK();
 }
 
-void BlobFileReader::MultiGetBlob(
-    const ReadOptions& read_options,
-    const autovector<std::reference_wrapper<const Slice>>& user_keys,
-    const autovector<uint64_t>& offsets,
-    const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
-    autovector<PinnableSlice*>& values, uint64_t* bytes_read) const {
-  const size_t num_blobs = user_keys.size();
+void BlobFileReader::MultiGetBlob(const ReadOptions& read_options,
+                                  autovector<BlobReadRequest*>& blob_reqs,
+                                  uint64_t* bytes_read) const {
+  const size_t num_blobs = blob_reqs.size();
   assert(num_blobs > 0);
-  assert(num_blobs == offsets.size());
-  assert(num_blobs == value_sizes.size());
-  assert(num_blobs == statuses.size());
-  assert(num_blobs == values.size());
+  assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
 
 #ifndef NDEBUG
-  for (size_t i = 0; i < offsets.size() - 1; ++i) {
-    assert(offsets[i] <= offsets[i + 1]);
+  for (size_t i = 0; i < num_blobs - 1; ++i) {
+    assert(blob_reqs[i]->offset <= blob_reqs[i + 1]->offset);
   }
 #endif  // !NDEBUG
 
-  std::vector<FSReadRequest> read_reqs(num_blobs);
+  std::vector<FSReadRequest> read_reqs;
   autovector<uint64_t> adjustments;
   uint64_t total_len = 0;
+  read_reqs.reserve(num_blobs);
   for (size_t i = 0; i < num_blobs; ++i) {
-    const size_t key_size = user_keys[i].get().size();
-    assert(IsValidBlobOffset(offsets[i], key_size, value_sizes[i], file_size_));
+    const size_t key_size = blob_reqs[i]->user_key->size();
+    const uint64_t offset = blob_reqs[i]->offset;
+    const uint64_t value_size = blob_reqs[i]->len;
+
+    if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
+      *blob_reqs[i]->status = Status::Corruption("Invalid blob offset");
+      continue;
+    }
+
+    if (blob_reqs[i]->compression != compression_type_) {
+      *blob_reqs[i]->status =
+          Status::Corruption("Compression type mismatch when reading a blob");
+      continue;
+    }
+
     const uint64_t adjustment =
         read_options.verify_checksums
             ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
            : 0;
-    assert(offsets[i] >= adjustment);
+    assert(blob_reqs[i]->offset >= adjustment);
     adjustments.push_back(adjustment);
 
-    read_reqs[i].offset = offsets[i] - adjustment;
-    read_reqs[i].len = value_sizes[i] + adjustment;
-    total_len += read_reqs[i].len;
+    FSReadRequest read_req;
+    read_req.offset = blob_reqs[i]->offset - adjustment;
+    read_req.len = blob_reqs[i]->len + adjustment;
+    read_reqs.emplace_back(read_req);
+    total_len += read_req.len;
   }
 
   RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len);
@@ -439,9 +450,12 @@ void BlobFileReader::MultiGetBlob(
     for (auto& req : read_reqs) {
       req.status.PermitUncheckedError();
     }
-    for (size_t i = 0; i < num_blobs; ++i) {
-      assert(statuses[i]);
-      *statuses[i] = s;
+    for (auto& req : blob_reqs) {
+      assert(req->status);
+      if (!req->status->IsCorruption()) {
+        // Avoid overwriting a corruption status set during validation above.
+        *req->status = s;
+      }
     }
     return;
   }
 
   assert(s.ok());
 
   uint64_t total_bytes = 0;
-  for (size_t i = 0; i < num_blobs; ++i) {
-    auto& req = read_reqs[i];
-    const auto& record_slice = req.result;
+  for (size_t i = 0, j = 0; i < num_blobs; ++i) {
+    assert(blob_reqs[i]->status);
+    if (!blob_reqs[i]->status->ok()) {
+      continue;
+    }
 
-    assert(statuses[i]);
+    assert(j < read_reqs.size());
+    auto& req = read_reqs[j++];
+    const auto& record_slice = req.result;
     if (req.status.ok() && record_slice.size() != req.len) {
       req.status = IOStatus::Corruption("Failed to read data from blob file");
     }
 
-    *statuses[i] = req.status;
-    if (!statuses[i]->ok()) {
+    *blob_reqs[i]->status = req.status;
+    if (!blob_reqs[i]->status->ok()) {
       continue;
     }
 
     // Verify checksums if enabled
     if (read_options.verify_checksums) {
-      *statuses[i] = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
-      if (!statuses[i]->ok()) {
+      *blob_reqs[i]->status =
+          VerifyBlob(record_slice, *blob_reqs[i]->user_key, blob_reqs[i]->len);
+      if (!blob_reqs[i]->status->ok()) {
         continue;
       }
     }
 
     // Uncompress blob if needed
-    Slice value_slice(record_slice.data() + adjustments[i], value_sizes[i]);
-    *statuses[i] = UncompressBlobIfNeeded(value_slice, compression_type_,
-                                          clock_, statistics_, values[i]);
-    if (statuses[i]->ok()) {
+    Slice value_slice(record_slice.data() + adjustments[i], blob_reqs[i]->len);
+    *blob_reqs[i]->status =
+        UncompressBlobIfNeeded(value_slice, compression_type_, clock_,
+                               statistics_, blob_reqs[i]->result);
+    if (blob_reqs[i]->status->ok()) {
       total_bytes += record_slice.size();
     }
   }
diff --git a/db/blob/blob_file_reader.h b/db/blob/blob_file_reader.h
index 59bf13182..a30b9234c 100644
--- a/db/blob/blob_file_reader.h
+++ b/db/blob/blob_file_reader.h
@@ -8,6 +8,7 @@
 #include <cinttypes>
 #include <memory>
 
+#include "db/blob/blob_read_request.h"
 #include "file/random_access_file_reader.h"
 #include "rocksdb/compression_type.h"
 #include "rocksdb/rocksdb_namespace.h"
@@ -47,12 +48,9 @@ class BlobFileReader {
                  uint64_t* bytes_read) const;
 
   // offsets must be sorted in ascending order by caller.
-  void MultiGetBlob(
-      const ReadOptions& read_options,
-      const autovector<std::reference_wrapper<const Slice>>& user_keys,
-      const autovector<uint64_t>& offsets,
-      const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
-      autovector<PinnableSlice*>& values, uint64_t* bytes_read) const;
+  void MultiGetBlob(const ReadOptions& read_options,
+                    autovector<BlobReadRequest*>& blob_reqs,
+                    uint64_t* bytes_read) const;
 
   CompressionType GetCompressionType() const { return compression_type_; }
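To make the new contract concrete, here is a minimal caller-side sketch of the batched API declared above. It is not part of the patch: `reader` is assumed to be an open `BlobFileReader`, and `keys`, `offsets`, and `sizes` are assumed to come from previously decoded blob indexes, with offsets already sorted in ascending order as the comment requires.

```cpp
// Hypothetical caller; reader, read_options, keys, offsets, and sizes are
// assumed to exist. Offsets must be sorted in ascending order.
std::array<Status, 2> statuses;
std::array<PinnableSlice, 2> values;
std::array<BlobReadRequest, 2> requests = {
    BlobReadRequest(keys[0], offsets[0], sizes[0], kNoCompression, &values[0],
                    &statuses[0]),
    BlobReadRequest(keys[1], offsets[1], sizes[1], kNoCompression, &values[1],
                    &statuses[1])};

autovector<BlobReadRequest*> blob_reqs = {&requests[0], &requests[1]};

uint64_t bytes_read = 0;
reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);

// Failures are reported per request: an invalid offset or a compression
// mismatch fails only the affected entry, not the whole batch.
for (const Status& s : statuses) {
  if (!s.ok()) {
    // handle this key's failure individually
  }
}
```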
diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc
index 8cbb982a8..15ea6c953 100644
--- a/db/blob/blob_file_reader_test.cc
+++ b/db/blob/blob_file_reader_test.cc
@@ -194,21 +194,21 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
     // MultiGetBlob
     bytes_read = 0;
     size_t total_size = 0;
 
-    autovector<std::reference_wrapper<const Slice>> key_refs;
-    for (const auto& key_ref : keys) {
-      key_refs.emplace_back(std::cref(key_ref));
-    }
-    autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
-                                 blob_offsets[2]};
-    autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
     std::array<Status, num_blobs> statuses_buf;
-    autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
-                                 &statuses_buf[2]};
     std::array<PinnableSlice, num_blobs> value_buf;
-    autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
-                                      &value_buf[2]};
-    reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
-                         values, &bytes_read);
+    std::array<BlobReadRequest, num_blobs> requests_buf;
+    autovector<BlobReadRequest*> blob_reqs;
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      requests_buf[i] =
+          BlobReadRequest(keys[i], blob_offsets[i], blob_sizes[i],
+                          kNoCompression, &value_buf[i], &statuses_buf[i]);
+      blob_reqs.push_back(&requests_buf[i]);
+    }
+
+    reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
+
     for (size_t i = 0; i < num_blobs; ++i) {
       ASSERT_OK(statuses_buf[i]);
       ASSERT_EQ(value_buf[i], blobs[i]);
@@ -300,15 +300,21 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
         blob_offsets[0],
         blob_offsets[1] - (keys[1].size() - key_refs[1].get().size()),
         blob_offsets[2]};
-    autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
+
     std::array<Status, num_blobs> statuses_buf;
-    autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
-                                 &statuses_buf[2]};
     std::array<PinnableSlice, num_blobs> value_buf;
-    autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
-                                      &value_buf[2]};
-    reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
-                         values, &bytes_read);
+    std::array<BlobReadRequest, num_blobs> requests_buf;
+    autovector<BlobReadRequest*> blob_reqs;
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      requests_buf[i] =
+          BlobReadRequest(key_refs[i], offsets[i], blob_sizes[i],
+                          kNoCompression, &value_buf[i], &statuses_buf[i]);
+      blob_reqs.push_back(&requests_buf[i]);
+    }
+
+    reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
+
     for (size_t i = 0; i < num_blobs; ++i) {
       if (i == 1) {
         ASSERT_TRUE(statuses_buf[i].IsCorruption());
@@ -339,17 +345,21 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
     Slice wrong_key_slice(incorrect_key, sizeof(incorrect_key) - 1);
     key_refs[2] = std::cref(wrong_key_slice);
 
-    autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
-                                 blob_offsets[2]};
-    autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
     std::array<Status, num_blobs> statuses_buf;
-    autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
-                                 &statuses_buf[2]};
     std::array<PinnableSlice, num_blobs> value_buf;
-    autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
-                                      &value_buf[2]};
-    reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
-                         values, &bytes_read);
+    std::array<BlobReadRequest, num_blobs> requests_buf;
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      requests_buf[i] =
+          BlobReadRequest(key_refs[i], blob_offsets[i], blob_sizes[i],
+                          kNoCompression, &value_buf[i], &statuses_buf[i]);
+    }
+
+    autovector<BlobReadRequest*> blob_reqs = {
+        &requests_buf[0], &requests_buf[1], &requests_buf[2]};
+
+    reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
+
     for (size_t i = 0; i < num_blobs; ++i) {
       if (i == num_blobs - 1) {
         ASSERT_TRUE(statuses_buf[i].IsCorruption());
@@ -376,17 +386,26 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
     for (const auto& key_ref : keys) {
       key_refs.emplace_back(std::cref(key_ref));
     }
-    autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
-                                 blob_offsets[2]};
-    autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1] + 1, blob_sizes[2]};
+
     std::array<Status, num_blobs> statuses_buf;
-    autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
-                                 &statuses_buf[2]};
     std::array<PinnableSlice, num_blobs> value_buf;
-    autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
-                                      &value_buf[2]};
-    reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
-                         values, &bytes_read);
+    std::array<BlobReadRequest, num_blobs> requests_buf;
+
+    requests_buf[0] =
+        BlobReadRequest(key_refs[0], blob_offsets[0], blob_sizes[0],
+                        kNoCompression, &value_buf[0], &statuses_buf[0]);
+    requests_buf[1] =
+        BlobReadRequest(key_refs[1], blob_offsets[1], blob_sizes[1] + 1,
+                        kNoCompression, &value_buf[1], &statuses_buf[1]);
+    requests_buf[2] =
+        BlobReadRequest(key_refs[2], blob_offsets[2], blob_sizes[2],
+                        kNoCompression, &value_buf[2], &statuses_buf[2]);
+
+    autovector<BlobReadRequest*> blob_reqs = {
+        &requests_buf[0], &requests_buf[1], &requests_buf[2]};
+
+    reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);
+
     for (size_t i = 0; i < num_blobs; ++i) {
       if (i != 1) {
         ASSERT_OK(statuses_buf[i]);
diff --git a/db/blob/blob_read_request.h b/db/blob/blob_read_request.h
new file mode 100644
index 000000000..f9668ca2e
--- /dev/null
+++ b/db/blob/blob_read_request.h
@@ -0,0 +1,58 @@
+//  Copyright (c) Meta Platforms, Inc. and affiliates.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <cinttypes>
+
+#include "rocksdb/compression_type.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "util/autovector.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// A blob read request structure for use in BlobSource::MultiGetBlob and
+// BlobFileReader::MultiGetBlob.
+struct BlobReadRequest {
+  // User key to look up the paired blob
+  const Slice* user_key = nullptr;
+
+  // File offset in bytes
+  uint64_t offset = 0;
+
+  // Length to read in bytes
+  size_t len = 0;
+
+  // Blob compression type
+  CompressionType compression = kNoCompression;
+
+  // Output parameter set by MultiGetBlob() to point to the data buffer, and
+  // the number of valid bytes
+  PinnableSlice* result = nullptr;
+
+  // Status of read
+  Status* status = nullptr;
+
+  BlobReadRequest(const Slice& _user_key, uint64_t _offset, size_t _len,
+                  CompressionType _compression, PinnableSlice* _result,
+                  Status* _status)
+      : user_key(&_user_key),
+        offset(_offset),
+        len(_len),
+        compression(_compression),
+        result(_result),
+        status(_status) {}
+
+  BlobReadRequest() = default;
+  BlobReadRequest(const BlobReadRequest& other) = default;
+  BlobReadRequest& operator=(const BlobReadRequest& other) = default;
+};
+
+using BlobFileReadRequests =
+    std::tuple<uint64_t /* file_number */, uint64_t /* file_size */,
+               autovector<BlobReadRequest>>;
+
+}  // namespace ROCKSDB_NAMESPACE
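Two points about this new header are easy to miss. First, `BlobReadRequest` stores only a pointer to the user key (`user_key(&_user_key)`), so the caller must keep the `Slice` alive until the read completes. Second, `BlobFileReadRequests` is the per-file grouping consumed by `BlobSource::MultiGetBlob` below. A hedged sketch of how a group might be assembled (the file number, size, offset, and length here are made up for illustration):

```cpp
// Sketch only; user_key, value, and status are assumed to outlive the
// request, since BlobReadRequest keeps pointers rather than copies.
autovector<BlobReadRequest> reqs_in_file;
reqs_in_file.emplace_back(user_key, /*offset=*/4096, /*len=*/128,
                          kNoCompression, &value, &status);

autovector<BlobFileReadRequests> blob_reqs;
blob_reqs.emplace_back(/*file_number=*/7, /*file_size=*/uint64_t{1} << 20,
                       reqs_in_file);

// Consumers unpack the tuple with structured bindings, as
// BlobSource::MultiGetBlob does below.
for (auto& [file_number, file_size, reqs] : blob_reqs) {
  // sort reqs by offset, then read them all from one blob file
}
```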
diff --git a/db/blob/blob_source.cc b/db/blob/blob_source.cc
index 8220755f2..2b09930a0 100644
--- a/db/blob/blob_source.cc
+++ b/db/blob/blob_source.cc
@@ -202,31 +202,51 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
   return s;
 }
 
-void BlobSource::MultiGetBlob(
-    const ReadOptions& read_options,
-    const autovector<std::reference_wrapper<const Slice>>& user_keys,
-    uint64_t file_number, uint64_t file_size,
-    const autovector<uint64_t>& offsets,
-    const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
-    autovector<PinnableSlice*>& blobs, uint64_t* bytes_read) {
-  size_t num_blobs = user_keys.size();
+void BlobSource::MultiGetBlob(const ReadOptions& read_options,
+                              autovector<BlobFileReadRequests>& blob_reqs,
+                              uint64_t* bytes_read) {
+  assert(blob_reqs.size() > 0);
+
+  uint64_t total_bytes_read = 0;
+  uint64_t bytes_read_in_file = 0;
+
+  for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) {
+    // Sort blob_reqs_in_file by file offset.
+    std::sort(
+        blob_reqs_in_file.begin(), blob_reqs_in_file.end(),
+        [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
+          return lhs.offset < rhs.offset;
+        });
+
+    MultiGetBlobFromOneFile(read_options, file_number, file_size,
+                            blob_reqs_in_file, &bytes_read_in_file);
+
+    total_bytes_read += bytes_read_in_file;
+  }
+
+  if (bytes_read) {
+    *bytes_read = total_bytes_read;
+  }
+}
+
+void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
+                                         uint64_t file_number,
+                                         uint64_t file_size,
+                                         autovector<BlobReadRequest>& blob_reqs,
+                                         uint64_t* bytes_read) {
+  const size_t num_blobs = blob_reqs.size();
   assert(num_blobs > 0);
   assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
-  assert(num_blobs == offsets.size());
-  assert(num_blobs == value_sizes.size());
-  assert(num_blobs == statuses.size());
-  assert(num_blobs == blobs.size());
 
 #ifndef NDEBUG
-  for (size_t i = 0; i < offsets.size() - 1; ++i) {
-    assert(offsets[i] <= offsets[i + 1]);
+  for (size_t i = 0; i < num_blobs - 1; ++i) {
+    assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset);
   }
 #endif  // !NDEBUG
 
   using Mask = uint64_t;
   Mask cache_hit_mask = 0;
 
-  Status s;
   uint64_t total_bytes = 0;
   const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number,
                                           file_size);
@@ -234,15 +254,18 @@ void BlobSource::MultiGetBlob(
   if (blob_cache_) {
     size_t cached_blob_count = 0;
     for (size_t i = 0; i < num_blobs; ++i) {
+      auto& req = blob_reqs[i];
+
       CachableEntry<std::string> blob_entry;
-      const CacheKey cache_key = base_cache_key.WithOffset(offsets[i]);
+      const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
       const Slice key = cache_key.AsSlice();
-      s = GetBlobFromCache(key, &blob_entry);
+      const Status s = GetBlobFromCache(key, &blob_entry);
+
       if (s.ok() && blob_entry.GetValue()) {
-        assert(statuses[i]);
-        *statuses[i] = s;
-        blobs[i]->PinSelf(*blob_entry.GetValue());
+        assert(req.status);
+        *req.status = s;
+        req.result->PinSelf(*blob_entry.GetValue());
 
         // Update the counter for the number of valid blobs read from the
         // cache.
         ++cached_blob_count;
@@ -251,10 +274,10 @@ void BlobSource::MultiGetBlob(
         uint64_t adjustment =
             read_options.verify_checksums
                 ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
-                      user_keys[i].get().size())
+                      req.user_key->size())
                 : 0;
-        assert(offsets[i] >= adjustment);
-        total_bytes += value_sizes[i] + adjustment;
+        assert(req.offset >= adjustment);
+        total_bytes += req.len + adjustment;
         cache_hit_mask |= (Mask{1} << i);  // cache hit
       }
     }
@@ -272,8 +295,8 @@ void BlobSource::MultiGetBlob(
   if (no_io) {
     for (size_t i = 0; i < num_blobs; ++i) {
       if (!(cache_hit_mask & (Mask{1} << i))) {
-        assert(statuses[i]);
-        *statuses[i] =
+        assert(blob_reqs[i].status);
+        *blob_reqs[i].status =
             Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
       }
     }
@@ -282,50 +305,43 @@ void BlobSource::MultiGetBlob(
   {
     // Find the rest of blobs from the file since I/O is allowed.
-    autovector<std::reference_wrapper<const Slice>> _user_keys;
-    autovector<uint64_t> _offsets;
-    autovector<uint64_t> _value_sizes;
-    autovector<Status*> _statuses;
-    autovector<PinnableSlice*> _blobs;
+    autovector<BlobReadRequest*> _blob_reqs;
     uint64_t _bytes_read = 0;
 
     for (size_t i = 0; i < num_blobs; ++i) {
       if (!(cache_hit_mask & (Mask{1} << i))) {
-        _user_keys.emplace_back(user_keys[i]);
-        _offsets.push_back(offsets[i]);
-        _value_sizes.push_back(value_sizes[i]);
-        _statuses.push_back(statuses[i]);
-        _blobs.push_back(blobs[i]);
+        _blob_reqs.push_back(&blob_reqs[i]);
       }
     }
 
     CacheHandleGuard<BlobFileReader> blob_file_reader;
-    s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
+    Status s =
+        blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
     if (!s.ok()) {
-      for (size_t i = 0; i < _blobs.size(); ++i) {
-        assert(_statuses[i]);
-        *_statuses[i] = s;
+      for (size_t i = 0; i < _blob_reqs.size(); ++i) {
+        assert(_blob_reqs[i]->status);
+        *_blob_reqs[i]->status = s;
       }
       return;
     }
 
     assert(blob_file_reader.GetValue());
 
-    blob_file_reader.GetValue()->MultiGetBlob(read_options, _user_keys,
-                                              _offsets, _value_sizes, _statuses,
-                                              _blobs, &_bytes_read);
+    blob_file_reader.GetValue()->MultiGetBlob(read_options, _blob_reqs,
+                                              &_bytes_read);
 
-    if (read_options.fill_cache) {
+    if (blob_cache_ && read_options.fill_cache) {
       // If filling cache is allowed and a cache is configured, try to put
       // the blob(s) to the cache.
-      for (size_t i = 0; i < _blobs.size(); ++i) {
-        if (_statuses[i]->ok()) {
+      for (size_t i = 0; i < _blob_reqs.size(); ++i) {
+        if (_blob_reqs[i]->status->ok()) {
           CachableEntry<std::string> blob_entry;
-          const CacheKey cache_key = base_cache_key.WithOffset(_offsets[i]);
+          const CacheKey cache_key =
+              base_cache_key.WithOffset(_blob_reqs[i]->offset);
           const Slice key = cache_key.AsSlice();
-          s = PutBlobIntoCache(key, &blob_entry, _blobs[i]);
+          s = PutBlobIntoCache(key, &blob_entry, _blob_reqs[i]->result);
           if (!s.ok()) {
-            *_statuses[i] = s;
+            *_blob_reqs[i]->status = s;
           }
         }
       }
diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h
index 2f62d01c1..aa46311c1 100644
--- a/db/blob/blob_source.h
+++ b/db/blob/blob_source.h
@@ -10,6 +10,7 @@
 #include "cache/cache_helpers.h"
 #include "cache/cache_key.h"
 #include "db/blob/blob_file_cache.h"
+#include "db/blob/blob_read_request.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/rocksdb_namespace.h"
 #include "table/block_based/cachable_entry.h"
@@ -37,7 +38,7 @@ class BlobSource {
 
   ~BlobSource();
 
-  // Read a blob from the underlying cache or storage.
+  // Read a blob from the underlying cache or one blob file.
   //
   // If successful, returns ok and sets "*value" to the newly retrieved
   // uncompressed blob. If there was an error while fetching the blob, sets
@@ -52,25 +53,44 @@ class BlobSource {
                  FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
                  uint64_t* bytes_read);
 
-  // Read multiple blobs from the underlying cache or storage.
+  // Read multiple blobs from the underlying cache or blob file(s).
   //
-  // If successful, returns ok and sets the elements of blobs to the newly
-  // retrieved uncompressed blobs. If there was an error while fetching one of
-  // blobs, sets its corresponding "blobs[i]" to empty and sets "statuses[i]" to
-  // a non-ok status.
+  // If successful, returns ok and sets "result" in the elements of "blob_reqs"
+  // to the newly retrieved uncompressed blobs. If there was an error while
+  // fetching one of the blobs, sets its "result" to empty and sets its
+  // corresponding "status" to a non-ok status.
   //
   // Note:
-  //  - Offsets must be sorted in ascending order by caller.
+  //  - The main difference between this function and MultiGetBlobFromOneFile
+  //  is that this function can read multiple blobs from multiple blob files.
+  //
+  //  - For consistency, whether the blob is found in the cache or on disk,
+  //  sets "*bytes_read" to the total size of on-disk (possibly compressed)
+  //  blob records.
+  void MultiGetBlob(const ReadOptions& read_options,
+                    autovector<BlobFileReadRequests>& blob_reqs,
+                    uint64_t* bytes_read);
+
+  // Read multiple blobs from the underlying cache or one blob file.
+  //
+  // If successful, returns ok and sets "result" in the elements of "blob_reqs"
+  // to the newly retrieved uncompressed blobs. If there was an error while
+  // fetching one of the blobs, sets its "result" to empty and sets its
+  // corresponding "status" to a non-ok status.
+  //
+  // Note:
+  //  - The main difference between this function and MultiGetBlob is that this
+  //  function is only used for the case where the requested blobs are all
+  //  stored in one blob file. MultiGetBlob will call this function multiple
+  //  times if the requested blobs are stored in multiple blob files.
+  //
   //  - For consistency, whether the blob is found in the cache or on disk, sets
   //  "*bytes_read" to the total size of on-disk (possibly compressed) blob
   //  records.
-  void MultiGetBlob(
-      const ReadOptions& read_options,
-      const autovector<std::reference_wrapper<const Slice>>& user_keys,
-      uint64_t file_number, uint64_t file_size,
-      const autovector<uint64_t>& offsets,
-      const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
-      autovector<PinnableSlice*>& blobs, uint64_t* bytes_read);
+  void MultiGetBlobFromOneFile(const ReadOptions& read_options,
+                               uint64_t file_number, uint64_t file_size,
+                               autovector<BlobReadRequest>& blob_reqs,
+                               uint64_t* bytes_read);
 
   inline Status GetBlobFileReader(
       uint64_t blob_file_number,
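As a quick illustration of the division of labor documented above (a sketch, assuming a `BlobSource` instance named `source` and pre-built request containers):

```cpp
// Multi-file entry point: requests are grouped per file, and BlobSource
// sorts each group by offset before delegating.
autovector<BlobFileReadRequests> multi_file_reqs;  // assumed to be filled
uint64_t bytes_read = 0;
source.MultiGetBlob(read_options, multi_file_reqs, &bytes_read);

// Single-file entry point: every request targets (file_number, file_size),
// and the requests must already be sorted by offset.
autovector<BlobReadRequest> reqs_in_one_file;  // assumed to be filled
source.MultiGetBlobFromOneFile(read_options, file_number, file_size,
                               reqs_in_one_file, &bytes_read);
```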
diff --git a/db/blob/blob_source_test.cc b/db/blob/blob_source_test.cc
index 6a9f66c83..4d35af31a 100644
--- a/db/blob/blob_source_test.cc
+++ b/db/blob/blob_source_test.cc
@@ -561,6 +561,199 @@ TEST_F(BlobSourceTest, GetCompressedBlobs) {
   }
 }
 
+TEST_F(BlobSourceTest, MultiGetBlobsFromMultiFiles) {
+  options_.cf_paths.emplace_back(
+      test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromMultiFiles"),
+      0);
+
+  options_.statistics = CreateDBStatistics();
+  Statistics* statistics = options_.statistics.get();
+  assert(statistics);
+
+  DestroyAndReopen(options_);
+
+  ImmutableOptions immutable_options(options_);
+
+  constexpr uint32_t column_family_id = 1;
+  constexpr bool has_ttl = false;
+  constexpr ExpirationRange expiration_range;
+  constexpr uint64_t blob_files = 2;
+  constexpr size_t num_blobs = 32;
+
+  std::vector<std::string> key_strs;
+  std::vector<std::string> blob_strs;
+
+  for (size_t i = 0; i < num_blobs; ++i) {
+    key_strs.push_back("key" + std::to_string(i));
+    blob_strs.push_back("blob" + std::to_string(i));
+  }
+
+  std::vector<Slice> keys;
+  std::vector<Slice> blobs;
+
+  uint64_t file_size = BlobLogHeader::kSize;
+  uint64_t blob_value_bytes = 0;
+  for (size_t i = 0; i < num_blobs; ++i) {
+    keys.push_back({key_strs[i]});
+    blobs.push_back({blob_strs[i]});
+    blob_value_bytes += blobs[i].size();
+    file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size();
+  }
+  file_size += BlobLogFooter::kSize;
+  const uint64_t blob_records_bytes =
+      file_size - BlobLogHeader::kSize - BlobLogFooter::kSize;
+
+  std::vector<uint64_t> blob_offsets(keys.size());
+  std::vector<uint64_t> blob_sizes(keys.size());
+
+  {
+    // Write key/blob pairs to multiple blob files.
+    for (size_t i = 0; i < blob_files; ++i) {
+      const uint64_t file_number = i + 1;
+      WriteBlobFile(immutable_options, column_family_id, has_ttl,
+                    expiration_range, expiration_range, file_number, keys,
+                    blobs, kNoCompression, blob_offsets, blob_sizes);
+    }
+  }
+
+  constexpr size_t capacity = 10;
+  std::shared_ptr<Cache> backing_cache =
+      NewLRUCache(capacity);  // Blob file cache
+
+  FileOptions file_options;
+  constexpr HistogramImpl* blob_file_read_hist = nullptr;
+
+  std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
+      backing_cache.get(), &immutable_options, &file_options, column_family_id,
+      blob_file_read_hist, nullptr /*IOTracer*/));
+
+  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
+                         blob_file_cache.get());
+
+  ReadOptions read_options;
+  read_options.verify_checksums = true;
+
+  uint64_t bytes_read = 0;
+
+  {
+    // MultiGetBlob
+    read_options.fill_cache = true;
+    read_options.read_tier = ReadTier::kReadAllTier;
+
+    autovector<BlobFileReadRequests> blob_reqs;
+    std::array<autovector<BlobReadRequest>, blob_files> blob_reqs_in_file;
+    std::array<PinnableSlice, num_blobs * blob_files> value_buf;
+    std::array<Status, num_blobs * blob_files> statuses_buf;
+
+    for (size_t i = 0; i < blob_files; ++i) {
+      const uint64_t file_number = i + 1;
+      for (size_t j = 0; j < num_blobs; ++j) {
+        blob_reqs_in_file[i].emplace_back(
+            keys[j], blob_offsets[j], blob_sizes[j], kNoCompression,
+            &value_buf[i * num_blobs + j], &statuses_buf[i * num_blobs + j]);
+      }
+      blob_reqs.emplace_back(file_number, file_size, blob_reqs_in_file[i]);
+    }
+
+    get_perf_context()->Reset();
+    statistics->Reset().PermitUncheckedError();
+
+    blob_source.MultiGetBlob(read_options, blob_reqs, &bytes_read);
+
+    for (size_t i = 0; i < blob_files; ++i) {
+      const uint64_t file_number = i + 1;
+      for (size_t j = 0; j < num_blobs; ++j) {
+        ASSERT_OK(statuses_buf[i * num_blobs + j]);
+        ASSERT_EQ(value_buf[i * num_blobs + j], blobs[j]);
+        ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                                 blob_offsets[j]));
+      }
+    }
+
+    // Retrieved all blobs from 2 blob files twice via MultiGetBlob and
+    // TEST_BlobInCache.
+    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count,
+              num_blobs * blob_files);
+    ASSERT_EQ((int)get_perf_context()->blob_read_count,
+              num_blobs * blob_files);  // blocking i/o
+    ASSERT_EQ((int)get_perf_context()->blob_read_byte,
+              blob_records_bytes * blob_files);  // blocking i/o
+    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
+    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);
+
+    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS),
+              num_blobs * blob_files);  // MultiGetBlob
+    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT),
+              num_blobs * blob_files);  // TEST_BlobInCache
+    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD),
+              num_blobs * blob_files);  // MultiGetBlob
+    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
+              blob_value_bytes * blob_files);  // TEST_BlobInCache
+    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE),
+              blob_value_bytes * blob_files);  // MultiGetBlob
+
+    get_perf_context()->Reset();
+    statistics->Reset().PermitUncheckedError();
+
+    autovector<BlobReadRequest> fake_blob_reqs_in_file;
+    std::array<PinnableSlice, num_blobs> fake_value_buf;
+    std::array<Status, num_blobs> fake_statuses_buf;
+
+    const uint64_t fake_file_number = 100;
+    for (size_t i = 0; i < num_blobs; ++i) {
+      fake_blob_reqs_in_file.emplace_back(
+          keys[i], blob_offsets[i], blob_sizes[i], kNoCompression,
+          &fake_value_buf[i], &fake_statuses_buf[i]);
+    }
+
+    // Add a fake multi-get blob request.
+    blob_reqs.emplace_back(fake_file_number, file_size,
+                           fake_blob_reqs_in_file);
+
+    blob_source.MultiGetBlob(read_options, blob_reqs, &bytes_read);
+
+    // Check the real blob read requests.
+    for (size_t i = 0; i < blob_files; ++i) {
+      const uint64_t file_number = i + 1;
+      for (size_t j = 0; j < num_blobs; ++j) {
+        ASSERT_OK(statuses_buf[i * num_blobs + j]);
+        ASSERT_EQ(value_buf[i * num_blobs + j], blobs[j]);
+        ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                                 blob_offsets[j]));
+      }
+    }
+
+    // Check the fake blob request.
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_TRUE(fake_statuses_buf[i].IsIOError());
+      ASSERT_TRUE(fake_value_buf[i].empty());
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(fake_file_number, file_size,
+                                                blob_offsets[i]));
+    }
+
+    // Retrieved all blobs from 3 blob files (including the fake one) twice
+    // via MultiGetBlob and TEST_BlobInCache.
+    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count,
+              num_blobs * blob_files * 2);
+    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // blocking i/o
+    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // blocking i/o
+    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
+    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);
+
+    // Fake blob requests: MultiGetBlob and TEST_BlobInCache
+    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 2);
+    // Real blob requests: MultiGetBlob and TEST_BlobInCache
+    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT),
+              num_blobs * blob_files * 2);
+    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
+    // Real blob requests: MultiGetBlob and TEST_BlobInCache
+    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
+              blob_value_bytes * blob_files * 2);
+    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
+  }
+}
+
 TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
   options_.cf_paths.emplace_back(
       test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);
@@ -625,23 +818,15 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
   constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
 
   {
-    // MultiGetBlob
+    // MultiGetBlobFromOneFile
     uint64_t bytes_read = 0;
-
-    autovector<std::reference_wrapper<const Slice>> key_refs;
-    autovector<uint64_t> offsets;
-    autovector<uint64_t> sizes;
     std::array<Status, num_blobs> statuses_buf;
-    autovector<Status*> statuses;
     std::array<PinnableSlice, num_blobs> value_buf;
-    autovector<PinnableSlice*> values;
+    autovector<BlobReadRequest> blob_reqs;
 
     for (size_t i = 0; i < num_blobs; i += 2) {  // even index
-      key_refs.emplace_back(std::cref(keys[i]));
-      offsets.push_back(blob_offsets[i]);
-      sizes.push_back(blob_sizes[i]);
-      statuses.push_back(&statuses_buf[i]);
-      values.push_back(&value_buf[i]);
+      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
+                             kNoCompression, &value_buf[i], &statuses_buf[i]);
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
     }
@@ -652,9 +837,8 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
     statistics->Reset().PermitUncheckedError();
 
     // Get half of blobs
-    blob_source.MultiGetBlob(read_options, key_refs, blob_file_number,
-                             file_size, offsets, sizes, statuses, values,
-                             &bytes_read);
+    blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
+                                        file_size, blob_reqs, &bytes_read);
 
     uint64_t fs_read_bytes = 0;
     uint64_t ca_read_bytes = 0;
@@ -709,27 +893,19 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
                                                blob_offsets[i]));
     }
 
-    // Cache-only MultiGetBlob
+    // Cache-only MultiGetBlobFromOneFile
     read_options.read_tier = ReadTier::kBlockCacheTier;
 
     get_perf_context()->Reset();
     statistics->Reset().PermitUncheckedError();
 
-    key_refs.clear();
-    offsets.clear();
-    sizes.clear();
-    statuses.clear();
-    values.clear();
+    blob_reqs.clear();
     for (size_t i = 0; i < num_blobs; ++i) {
-      key_refs.emplace_back(std::cref(keys[i]));
-      offsets.push_back(blob_offsets[i]);
-      sizes.push_back(blob_sizes[i]);
-      statuses.push_back(&statuses_buf[i]);
-      values.push_back(&value_buf[i]);
+      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
+                             kNoCompression, &value_buf[i], &statuses_buf[i]);
     }
 
-    blob_source.MultiGetBlob(read_options, key_refs, blob_file_number,
-                             file_size, offsets, sizes, statuses, values,
-                             &bytes_read);
+    blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
+                                        file_size, blob_reqs, &bytes_read);
 
     uint64_t blob_bytes = 0;
     for (size_t i = 0; i < num_blobs; ++i) {
@@ -759,24 +935,17 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
   options_.blob_cache->EraseUnRefEntries();
 
   {
-    // Cache-only MultiGetBlob
+    // Cache-only MultiGetBlobFromOneFile
     uint64_t bytes_read = 0;
     read_options.read_tier = ReadTier::kBlockCacheTier;
 
-    autovector<std::reference_wrapper<const Slice>> key_refs;
-    autovector<uint64_t> offsets;
-    autovector<uint64_t> sizes;
     std::array<Status, num_blobs> statuses_buf;
-    autovector<Status*> statuses;
     std::array<PinnableSlice, num_blobs> value_buf;
-    autovector<PinnableSlice*> values;
+    autovector<BlobReadRequest> blob_reqs;
 
     for (size_t i = 0; i < num_blobs; i++) {
-      key_refs.emplace_back(std::cref(keys[i]));
-      offsets.push_back(blob_offsets[i]);
-      sizes.push_back(blob_sizes[i]);
-      statuses.push_back(&statuses_buf[i]);
-      values.push_back(&value_buf[i]);
+      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
+                             kNoCompression, &value_buf[i], &statuses_buf[i]);
       ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                 blob_offsets[i]));
     }
@@ -784,9 +953,8 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
     get_perf_context()->Reset();
     statistics->Reset().PermitUncheckedError();
 
-    blob_source.MultiGetBlob(read_options, key_refs, blob_file_number,
-                             file_size, offsets, sizes, statuses, values,
-                             &bytes_read);
+    blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
+                                        file_size, blob_reqs, &bytes_read);
 
     for (size_t i = 0; i < num_blobs; ++i) {
       ASSERT_TRUE(statuses_buf[i].IsIncomplete());
@@ -809,40 +977,33 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
   }
 
   {
-    // MultiGetBlob from non-existing file
+    // MultiGetBlobFromOneFile from a non-existing file
     uint64_t bytes_read = 0;
-    uint64_t file_number = 100;  // non-existing file
+    uint64_t non_existing_file_number = 100;
     read_options.read_tier = ReadTier::kReadAllTier;
 
-    autovector<std::reference_wrapper<const Slice>> key_refs;
-    autovector<uint64_t> offsets;
-    autovector<uint64_t> sizes;
     std::array<Status, num_blobs> statuses_buf;
-    autovector<Status*> statuses;
     std::array<PinnableSlice, num_blobs> value_buf;
-    autovector<PinnableSlice*> values;
+    autovector<BlobReadRequest> blob_reqs;
 
     for (size_t i = 0; i < num_blobs; i++) {
-      key_refs.emplace_back(std::cref(keys[i]));
-      offsets.push_back(blob_offsets[i]);
-      sizes.push_back(blob_sizes[i]);
-      statuses.push_back(&statuses_buf[i]);
-      values.push_back(&value_buf[i]);
-      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
-                                                blob_offsets[i]));
+      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
+                             kNoCompression, &value_buf[i], &statuses_buf[i]);
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(non_existing_file_number,
+                                                file_size, blob_offsets[i]));
     }
 
     get_perf_context()->Reset();
     statistics->Reset().PermitUncheckedError();
 
-    blob_source.MultiGetBlob(read_options, key_refs, file_number, file_size,
-                             offsets, sizes, statuses, values, &bytes_read);
+    blob_source.MultiGetBlobFromOneFile(read_options, non_existing_file_number,
+                                        file_size, blob_reqs, &bytes_read);
 
     for (size_t i = 0; i < num_blobs; ++i) {
       ASSERT_TRUE(statuses_buf[i].IsIOError());
       ASSERT_TRUE(value_buf[i].empty());
-      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
-                                                blob_offsets[i]));
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(non_existing_file_number,
+                                                file_size, blob_offsets[i]));
     }
 
     ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
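One implementation detail the tests above exercise indirectly is the cache-hit bookkeeping in `MultiGetBlobFromOneFile`: because a batch is capped at `MultiGetContext::MAX_BATCH_SIZE`, a single 64-bit mask suffices to record which requests were served from the blob cache. A simplified sketch (`found_in_cache` is a hypothetical helper standing in for the `GetBlobFromCache` probe):

```cpp
using Mask = uint64_t;
Mask cache_hit_mask = 0;

// First pass: probe the blob cache and mark hits, one bit per request.
for (size_t i = 0; i < num_blobs; ++i) {
  if (found_in_cache(i)) {  // hypothetical helper
    cache_hit_mask |= (Mask{1} << i);
  }
}

// Second pass: only the misses are forwarded to BlobFileReader::MultiGetBlob.
for (size_t i = 0; i < num_blobs; ++i) {
  if (!(cache_hit_mask & (Mask{1} << i))) {
    // append blob_reqs[i] to the file-read batch
  }
}
```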
diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc
index c63e1f654..417e5c739 100644
--- a/db/blob/db_blob_basic_test.cc
+++ b/db/blob/db_blob_basic_test.cc
@@ -299,6 +299,141 @@ TEST_F(DBBlobBasicTest, MultiGetBlobs) {
   }
 }
 
+TEST_F(DBBlobBasicTest, MultiGetBlobsFromCache) {
+  Options options = GetDefaultOptions();
+
+  LRUCacheOptions co;
+  co.capacity = 2048;
+  co.num_shard_bits = 2;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  auto backing_cache = NewLRUCache(co);
+
+  constexpr size_t min_blob_size = 6;
+  options.min_blob_size = min_blob_size;
+  options.create_if_missing = true;
+  options.enable_blob_files = true;
+  options.blob_cache = backing_cache;
+
+  BlockBasedTableOptions block_based_options;
+  block_based_options.no_block_cache = false;
+  block_based_options.block_cache = backing_cache;
+  block_based_options.cache_index_and_filter_blocks = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
+
+  DestroyAndReopen(options);
+
+  // Put then retrieve three key-values. The first value is below the size
+  // limit and is thus stored inline; the other two are stored separately as
+  // blobs.
+  constexpr size_t num_keys = 3;
+
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "short";
+  static_assert(sizeof(first_value) - 1 < min_blob_size,
+                "first_value too long to be inlined");
+
+  ASSERT_OK(Put(first_key, first_value));
+
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "long_value";
+  static_assert(sizeof(second_value) - 1 >= min_blob_size,
+                "second_value too short to be stored as blob");
+
+  ASSERT_OK(Put(second_key, second_value));
+
+  constexpr char third_key[] = "third_key";
+  constexpr char third_value[] = "other_long_value";
+  static_assert(sizeof(third_value) - 1 >= min_blob_size,
+                "third_value too short to be stored as blob");
+
+  ASSERT_OK(Put(third_key, third_value));
+
+  ASSERT_OK(Flush());
+
+  ReadOptions read_options;
+  read_options.fill_cache = false;
+
+  std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_OK(statuses[1]);
+    ASSERT_EQ(values[1], second_value);
+
+    ASSERT_OK(statuses[2]);
+    ASSERT_EQ(values[2], third_value);
+  }
+
+  // Try again with no I/O allowed. The first (inlined) value should be
+  // successfully read; however, the two blob values could only be read from
+  // the blob file, so for those the read should return Incomplete.
+  read_options.read_tier = kBlockCacheTier;
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_TRUE(statuses[1].IsIncomplete());
+
+    ASSERT_TRUE(statuses[2].IsIncomplete());
+  }
+
+  // Fill the cache when reading blobs from the blob file.
+  read_options.read_tier = kReadAllTier;
+  read_options.fill_cache = true;
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_OK(statuses[1]);
+    ASSERT_EQ(values[1], second_value);
+
+    ASSERT_OK(statuses[2]);
+    ASSERT_EQ(values[2], third_value);
+  }
+
+  // Try again with no I/O allowed. All blobs should be successfully read from
+  // the cache.
+  read_options.read_tier = kBlockCacheTier;
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_OK(statuses[1]);
+    ASSERT_EQ(values[1], second_value);
+
+    ASSERT_OK(statuses[2]);
+    ASSERT_EQ(values[2], third_value);
+  }
+}
+
 #ifndef ROCKSDB_LITE
 TEST_F(DBBlobBasicTest, MultiGetWithDirectIO) {
   Options options = GetDefaultOptions();
@@ -492,8 +627,23 @@ TEST_F(DBBlobBasicTest, MultiGetWithDirectIO) {
 TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
   Options options = GetDefaultOptions();
 
-  options.enable_blob_files = true;
+
+  LRUCacheOptions co;
+  co.capacity = 2 << 20;  // 2MB
+  co.num_shard_bits = 2;
+  co.metadata_charge_policy = kDontChargeCacheMetadata;
+  auto backing_cache = NewLRUCache(co);
+
   options.min_blob_size = 0;
+  options.create_if_missing = true;
+  options.enable_blob_files = true;
+  options.blob_cache = backing_cache;
+
+  BlockBasedTableOptions block_based_options;
+  block_based_options.no_block_cache = false;
+  block_based_options.block_cache = backing_cache;
+  block_based_options.cache_index_and_filter_blocks = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
 
   Reopen(options);
@@ -519,14 +669,64 @@ TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
   for (size_t i = 0; i < keys.size(); ++i) {
     keys[i] = key_strs[i];
   }
-  std::array<PinnableSlice, kNumKeys> values;
-  std::array<Status, kNumKeys> statuses;
-  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), kNumKeys, &keys[0],
-                &values[0], &statuses[0]);
 
-  for (size_t i = 0; i < kNumKeys; ++i) {
-    ASSERT_OK(statuses[i]);
-    ASSERT_EQ(value_strs[i], values[i]);
+  ReadOptions read_options;
+  read_options.read_tier = kReadAllTier;
+  read_options.fill_cache = false;
+
+  {
+    std::array<PinnableSlice, kNumKeys> values;
+    std::array<Status, kNumKeys> statuses;
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    for (size_t i = 0; i < kNumKeys; ++i) {
+      ASSERT_OK(statuses[i]);
+      ASSERT_EQ(value_strs[i], values[i]);
+    }
+  }
+
+  read_options.read_tier = kBlockCacheTier;
+
+  {
+    std::array<PinnableSlice, kNumKeys> values;
+    std::array<Status, kNumKeys> statuses;
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    for (size_t i = 0; i < kNumKeys; ++i) {
+      ASSERT_TRUE(statuses[i].IsIncomplete());
+      ASSERT_TRUE(values[i].empty());
+    }
+  }
+
+  read_options.read_tier = kReadAllTier;
+  read_options.fill_cache = true;
+
+  {
+    std::array<PinnableSlice, kNumKeys> values;
+    std::array<Status, kNumKeys> statuses;
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    for (size_t i = 0; i < kNumKeys; ++i) {
+      ASSERT_OK(statuses[i]);
+      ASSERT_EQ(value_strs[i], values[i]);
+    }
+  }
+
+  read_options.read_tier = kBlockCacheTier;
+
+  {
+    std::array<PinnableSlice, kNumKeys> values;
+    std::array<Status, kNumKeys> statuses;
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    for (size_t i = 0; i < kNumKeys; ++i) {
+      ASSERT_OK(statuses[i]);
+      ASSERT_EQ(value_strs[i], values[i]);
+    }
   }
 }
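The cache wiring these DB-level tests rely on boils down to a few lines: one LRU cache instance backs both the block cache and the new blob cache, so blob and block reads share a single capacity budget. A condensed sketch of that configuration (the capacity and shard values are illustrative):

```cpp
LRUCacheOptions co;
co.capacity = 2 << 20;  // 2 MB, shared by blocks and blobs
co.num_shard_bits = 2;
co.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<Cache> backing_cache = NewLRUCache(co);

Options options;
options.enable_blob_files = true;
options.min_blob_size = 0;  // store every value as a blob
options.blob_cache = backing_cache;

BlockBasedTableOptions table_options;
table_options.block_cache = backing_cache;
table_options.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
```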
diff --git a/db/version_set.cc b/db/version_set.cc
index ea91a9b1e..c6098723d 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1907,114 +1907,62 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
 
 void Version::MultiGetBlob(
     const ReadOptions& read_options, MultiGetRange& range,
-    std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs) {
-  if (read_options.read_tier == kBlockCacheTier) {
-    Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
-    for (const auto& elem : blob_rqs) {
-      for (const auto& blob_rq : elem.second) {
-        const KeyContext& key_context = blob_rq.second;
-        assert(key_context.s);
-        assert(key_context.s->ok());
-        *(key_context.s) = s;
-        assert(key_context.get_context);
-        auto& get_context = *(key_context.get_context);
-        get_context.MarkKeyMayExist();
-      }
-    }
-    return;
-  }
+    std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs) {
+  assert(!blob_ctxs.empty());
 
-  assert(!blob_rqs.empty());
-  Status status;
+  autovector<BlobFileReadRequests> blob_reqs;
 
-  for (auto& elem : blob_rqs) {
-    const uint64_t blob_file_number = elem.first;
+  for (auto& ctx : blob_ctxs) {
+    const auto file_number = ctx.first;
+    const auto blob_file_meta = storage_info_.GetBlobFileMetaData(file_number);
 
-    if (!storage_info_.GetBlobFileMetaData(blob_file_number)) {
-      auto& blobs_in_file = elem.second;
-      for (const auto& blob : blobs_in_file) {
-        const KeyContext& key_context = blob.second;
-        *(key_context.s) = Status::Corruption("Invalid blob file number");
-      }
-      continue;
-    }
+    autovector<BlobReadRequest> blob_reqs_in_file;
+    BlobReadContexts& blobs_in_file = ctx.second;
+    for (const auto& blob : blobs_in_file) {
+      const BlobIndex& blob_index = blob.first;
+      const KeyContext& key_context = blob.second;
 
-    CacheHandleGuard<BlobFileReader> blob_file_reader;
-    assert(blob_source_);
-    status =
-        blob_source_->GetBlobFileReader(blob_file_number, &blob_file_reader);
-    assert(!status.ok() || blob_file_reader.GetValue());
+      if (!blob_file_meta) {
+        *key_context.s = Status::Corruption("Invalid blob file number");
+        continue;
+      }
 
-    auto& blobs_in_file = elem.second;
-    if (!status.ok()) {
-      for (const auto& blob : blobs_in_file) {
-        const KeyContext& key_context = blob.second;
-        *(key_context.s) = status;
+      if (blob_index.HasTTL() || blob_index.IsInlined()) {
+        *key_context.s =
+            Status::Corruption("Unexpected TTL/inlined blob index");
+        continue;
       }
-      continue;
-    }
 
-    assert(blob_file_reader.GetValue());
-    const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
-    const CompressionType compression =
-        blob_file_reader.GetValue()->GetCompressionType();
+      key_context.value->Reset();
+      blob_reqs_in_file.emplace_back(
+          key_context.ukey_with_ts, blob_index.offset(), blob_index.size(),
+          blob_index.compression(), key_context.value, key_context.s);
+    }
+    if (blob_reqs_in_file.size() > 0) {
+      const auto file_size = blob_file_meta->GetBlobFileSize();
+      blob_reqs.emplace_back(file_number, file_size, blob_reqs_in_file);
+    }
+  }
 
-    // sort blobs_in_file by file offset.
-    std::sort(
-        blobs_in_file.begin(), blobs_in_file.end(),
-        [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
-          assert(lhs.first.file_number() == rhs.first.file_number());
-          return lhs.first.offset() < rhs.first.offset();
-        });
+  if (blob_reqs.size() > 0) {
+    blob_source_->MultiGetBlob(read_options, blob_reqs, /*bytes_read=*/nullptr);
+  }
 
-    autovector<std::reference_wrapper<const KeyContext>> blob_read_key_contexts;
-    autovector<std::reference_wrapper<const Slice>> user_keys;
-    autovector<uint64_t> offsets;
-    autovector<uint64_t> value_sizes;
-    autovector<Status*> statuses;
-    autovector<PinnableSlice*> values;
+  for (auto& ctx : blob_ctxs) {
+    BlobReadContexts& blobs_in_file = ctx.second;
     for (const auto& blob : blobs_in_file) {
-      const auto& blob_index = blob.first;
       const KeyContext& key_context = blob.second;
-      if (blob_index.HasTTL() || blob_index.IsInlined()) {
-        *(key_context.s) =
-            Status::Corruption("Unexpected TTL/inlined blob index");
-        continue;
-      }
-      const uint64_t key_size = key_context.ukey_with_ts.size();
-      const uint64_t offset = blob_index.offset();
-      const uint64_t value_size = blob_index.size();
-      if (!IsValidBlobOffset(offset, key_size, value_size, file_size)) {
-        *(key_context.s) = Status::Corruption("Invalid blob offset");
-        continue;
-      }
-      if (blob_index.compression() != compression) {
-        *(key_context.s) =
-            Status::Corruption("Compression type mismatch when reading a blob");
-        continue;
-      }
-      blob_read_key_contexts.emplace_back(std::cref(key_context));
-      user_keys.emplace_back(std::cref(key_context.ukey_with_ts));
-      offsets.push_back(blob_index.offset());
-      value_sizes.push_back(blob_index.size());
-      statuses.push_back(key_context.s);
-      values.push_back(key_context.value);
-    }
-    blob_file_reader.GetValue()->MultiGetBlob(read_options, user_keys, offsets,
-                                              value_sizes, statuses, values,
-                                              /*bytes_read=*/nullptr);
-    size_t num = blob_read_key_contexts.size();
-    assert(num == user_keys.size());
-    assert(num == offsets.size());
-    assert(num == value_sizes.size());
-    assert(num == statuses.size());
-    assert(num == values.size());
-    for (size_t i = 0; i < num; ++i) {
-      if (statuses[i]->ok()) {
-        range.AddValueSize(blob_read_key_contexts[i].get().value->size());
+      if (key_context.s->ok()) {
+        range.AddValueSize(key_context.value->size());
         if (range.GetValueSize() > read_options.value_size_soft_limit) {
-          *(blob_read_key_contexts[i].get().s) = Status::Aborted();
+          *key_context.s = Status::Aborted();
         }
+      } else if (key_context.s->IsIncomplete()) {
+        // read_options.read_tier == kBlockCacheTier
+        // Cannot read blob(s): no disk I/O allowed
+        assert(key_context.get_context);
+        auto& get_context = *(key_context.get_context);
+        get_context.MarkKeyMayExist();
       }
     }
   }
@@ -2253,7 +2201,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
   MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
   // blob_file => [[blob_idx, it], ...]
-  std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;
+  std::unordered_map<uint64_t, BlobReadContexts> blob_ctxs;
   int prev_level = -1;
 
   while (!fp.IsSearchEnded()) {
@@ -2270,7 +2218,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
       // Call MultiGetFromSST for looking up a single file
       s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
                           fp.GetHitFileLevel(), fp.IsHitFileLastInLevel(), f,
-                          blob_rqs, num_filter_read, num_index_read,
+                          blob_ctxs, num_filter_read, num_index_read,
                           num_sst_read);
       if (fp.GetHitFileLevel() == 0) {
         dump_stats_for_l0_file = true;
@@ -2285,7 +2233,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
       while (f != nullptr) {
         mget_tasks.emplace_back(MultiGetFromSSTCoroutine(
             read_options, fp.CurrentFileRange(), fp.GetHitFileLevel(),
-            fp.IsHitFileLastInLevel(), f, blob_rqs, num_filter_read,
+            fp.IsHitFileLastInLevel(), f, blob_ctxs, num_filter_read,
             num_index_read, num_sst_read));
         if (fp.KeyMaySpanNextFile()) {
           break;
@@ -2358,8 +2306,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                         num_level_read);
   }
 
-  if (s.ok() && !blob_rqs.empty()) {
-    MultiGetBlob(read_options, keys_with_blobs_range, blob_rqs);
+  if (s.ok() && !blob_ctxs.empty()) {
+    MultiGetBlob(read_options, keys_with_blobs_range, blob_ctxs);
   }
 
   // Process any left over keys
diff --git a/db/version_set.h b/db/version_set.h
index a1aec545e..93c670706 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -860,11 +860,11 @@ class Version {
                  FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
                  uint64_t* bytes_read) const;
 
-  using BlobReadRequest =
+  using BlobReadContext =
       std::pair<BlobIndex, std::reference_wrapper<const KeyContext>>;
-  using BlobReadRequests = std::vector<BlobReadRequest>;
+  using BlobReadContexts = std::vector<BlobReadContext>;
   void MultiGetBlob(const ReadOptions& read_options, MultiGetRange& range,
-                    std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs);
+                    std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs);
 
   // Loads some stats information from files (if update_stats is set) and
   // populates derived data structures. Call without mutex held. It needs to be
@@ -989,7 +989,7 @@ class Version {
       /* ret_type */ Status, /* func_name */ MultiGetFromSST,
       const ReadOptions& read_options, MultiGetRange file_range,
       int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
-      std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
+      std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
       uint64_t& num_filter_read, uint64_t& num_index_read,
      uint64_t& num_sst_read);
 
diff --git a/db/version_set_sync_and_async.h b/db/version_set_sync_and_async.h
index e06cc08da..af281d415 100644
--- a/db/version_set_sync_and_async.h
+++ b/db/version_set_sync_and_async.h
@@ -14,7 +14,7 @@ namespace ROCKSDB_NAMESPACE {
 DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
 (const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level,
  bool is_hit_file_last_in_level, FdWithKeyRange* f,
- std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs,
+ std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
  uint64_t& num_filter_read, uint64_t& num_index_read, uint64_t& num_sst_read) {
   bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
                        get_perf_context()->per_level_perf_context_enabled;
@@ -110,7 +110,7 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
           Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
           if (tmp_s.ok()) {
             const uint64_t blob_file_num = blob_index.file_number();
-            blob_rqs[blob_file_num].emplace_back(
+            blob_ctxs[blob_file_num].emplace_back(
                 std::make_pair(blob_index, std::cref(*iter)));
           } else {
             *(iter->s) = tmp_s;
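Taken together, the `version_set` changes reduce `Version::MultiGetBlob` to a gather-validate-dispatch shape. The sketch below condenses the new flow from the hunks above; it is not compilable on its own, and the validation steps are abbreviated to comments:

```cpp
// Condensed from Version::MultiGetBlob above.
autovector<BlobFileReadRequests> blob_reqs;

for (auto& ctx : blob_ctxs) {
  const uint64_t file_number = ctx.first;
  const auto blob_file_meta = storage_info_.GetBlobFileMetaData(file_number);

  autovector<BlobReadRequest> reqs_in_file;
  for (const auto& blob : ctx.second) {
    const BlobIndex& blob_index = blob.first;
    const KeyContext& key_context = blob.second;
    // per-key validation (missing file metadata, TTL/inlined indexes) sets a
    // Corruption status on key_context.s and skips the entry
    reqs_in_file.emplace_back(key_context.ukey_with_ts, blob_index.offset(),
                              blob_index.size(), blob_index.compression(),
                              key_context.value, key_context.s);
  }

  if (!reqs_in_file.empty()) {
    blob_reqs.emplace_back(file_number, blob_file_meta->GetBlobFileSize(),
                           reqs_in_file);
  }
}

// One call covers every file. Under kBlockCacheTier, misses come back as
// Incomplete and are later translated into GetContext::MarkKeyMayExist.
blob_source_->MultiGetBlob(read_options, blob_reqs, /*bytes_read=*/nullptr);
```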