diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc index 5e96e7ae9..a4eabb605 100644 --- a/db/blob/blob_file_reader.cc +++ b/db/blob/blob_file_reader.cc @@ -8,6 +8,7 @@ #include #include +#include "db/blob/blob_contents.h" #include "db/blob/blob_log_format.h" #include "file/file_prefetch_buffer.h" #include "file/filename.h" @@ -283,14 +284,12 @@ BlobFileReader::BlobFileReader( BlobFileReader::~BlobFileReader() = default; -Status BlobFileReader::GetBlob(const ReadOptions& read_options, - const Slice& user_key, uint64_t offset, - uint64_t value_size, - CompressionType compression_type, - FilePrefetchBuffer* prefetch_buffer, - PinnableSlice* value, - uint64_t* bytes_read) const { - assert(value); +Status BlobFileReader::GetBlob( + const ReadOptions& read_options, const Slice& user_key, uint64_t offset, + uint64_t value_size, CompressionType compression_type, + FilePrefetchBuffer* prefetch_buffer, MemoryAllocator* allocator, + std::unique_ptr* result, uint64_t* bytes_read) const { + assert(result); const uint64_t key_size = user_key.size(); @@ -361,8 +360,8 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, const Slice value_slice(record_slice.data() + adjustment, value_size); { - const Status s = UncompressBlobIfNeeded(value_slice, compression_type, - clock_, statistics_, value); + const Status s = UncompressBlobIfNeeded( + value_slice, compression_type, allocator, clock_, statistics_, result); if (!s.ok()) { return s; } @@ -375,16 +374,18 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, return Status::OK(); } -void BlobFileReader::MultiGetBlob(const ReadOptions& read_options, - autovector& blob_reqs, - uint64_t* bytes_read) const { +void BlobFileReader::MultiGetBlob( + const ReadOptions& read_options, MemoryAllocator* allocator, + autovector>>& + blob_reqs, + uint64_t* bytes_read) const { const size_t num_blobs = blob_reqs.size(); assert(num_blobs > 0); assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE); #ifndef NDEBUG for (size_t i = 0; i < num_blobs - 1; ++i) { - assert(blob_reqs[i]->offset <= blob_reqs[i + 1]->offset); + assert(blob_reqs[i].first->offset <= blob_reqs[i + 1].first->offset); } #endif // !NDEBUG @@ -393,16 +394,21 @@ void BlobFileReader::MultiGetBlob(const ReadOptions& read_options, uint64_t total_len = 0; read_reqs.reserve(num_blobs); for (size_t i = 0; i < num_blobs; ++i) { - const size_t key_size = blob_reqs[i]->user_key->size(); - const uint64_t offset = blob_reqs[i]->offset; - const uint64_t value_size = blob_reqs[i]->len; + BlobReadRequest* const req = blob_reqs[i].first; + assert(req); + assert(req->user_key); + assert(req->status); + + const size_t key_size = req->user_key->size(); + const uint64_t offset = req->offset; + const uint64_t value_size = req->len; if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) { - *blob_reqs[i]->status = Status::Corruption("Invalid blob offset"); + *req->status = Status::Corruption("Invalid blob offset"); continue; } - if (blob_reqs[i]->compression != compression_type_) { - *blob_reqs[i]->status = + if (req->compression != compression_type_) { + *req->status = Status::Corruption("Compression type mismatch when reading a blob"); continue; } @@ -411,12 +417,12 @@ void BlobFileReader::MultiGetBlob(const ReadOptions& read_options, read_options.verify_checksums ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) : 0; - assert(blob_reqs[i]->offset >= adjustment); + assert(req->offset >= adjustment); adjustments.push_back(adjustment); FSReadRequest read_req = {}; - read_req.offset = blob_reqs[i]->offset - adjustment; - read_req.len = blob_reqs[i]->len + adjustment; + read_req.offset = req->offset - adjustment; + read_req.len = req->len + adjustment; read_reqs.emplace_back(read_req); total_len += read_req.len; } @@ -450,8 +456,11 @@ void BlobFileReader::MultiGetBlob(const ReadOptions& read_options, for (auto& req : read_reqs) { req.status.PermitUncheckedError(); } - for (auto& req : blob_reqs) { + for (auto& blob_req : blob_reqs) { + BlobReadRequest* const req = blob_req.first; + assert(req); assert(req->status); + if (!req->status->IsCorruption()) { // Avoid overwriting corruption status. *req->status = s; @@ -464,38 +473,42 @@ void BlobFileReader::MultiGetBlob(const ReadOptions& read_options, uint64_t total_bytes = 0; for (size_t i = 0, j = 0; i < num_blobs; ++i) { - assert(blob_reqs[i]->status); - if (!blob_reqs[i]->status->ok()) { + BlobReadRequest* const req = blob_reqs[i].first; + assert(req); + assert(req->user_key); + assert(req->status); + + if (!req->status->ok()) { continue; } assert(j < read_reqs.size()); - auto& req = read_reqs[j++]; - const auto& record_slice = req.result; - if (req.status.ok() && record_slice.size() != req.len) { - req.status = IOStatus::Corruption("Failed to read data from blob file"); + auto& read_req = read_reqs[j++]; + const auto& record_slice = read_req.result; + if (read_req.status.ok() && record_slice.size() != read_req.len) { + read_req.status = + IOStatus::Corruption("Failed to read data from blob file"); } - *blob_reqs[i]->status = req.status; - if (!blob_reqs[i]->status->ok()) { + *req->status = read_req.status; + if (!req->status->ok()) { continue; } // Verify checksums if enabled if (read_options.verify_checksums) { - *blob_reqs[i]->status = - VerifyBlob(record_slice, *blob_reqs[i]->user_key, blob_reqs[i]->len); - if (!blob_reqs[i]->status->ok()) { + *req->status = VerifyBlob(record_slice, *req->user_key, req->len); + if (!req->status->ok()) { continue; } } // Uncompress blob if needed - Slice value_slice(record_slice.data() + adjustments[i], blob_reqs[i]->len); - *blob_reqs[i]->status = - UncompressBlobIfNeeded(value_slice, compression_type_, clock_, - statistics_, blob_reqs[i]->result); - if (blob_reqs[i]->status->ok()) { + Slice value_slice(record_slice.data() + adjustments[i], req->len); + *req->status = + UncompressBlobIfNeeded(value_slice, compression_type_, allocator, + clock_, statistics_, &blob_reqs[i].second); + if (req->status->ok()) { total_bytes += record_slice.size(); } } @@ -549,15 +562,18 @@ Status BlobFileReader::VerifyBlob(const Slice& record_slice, return Status::OK(); } -Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice, - CompressionType compression_type, - SystemClock* clock, - Statistics* statistics, - PinnableSlice* value) { - assert(value); +Status BlobFileReader::UncompressBlobIfNeeded( + const Slice& value_slice, CompressionType compression_type, + MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics, + std::unique_ptr* result) { + assert(result); if (compression_type == kNoCompression) { - SaveValue(value_slice, value); + CacheAllocationPtr allocation = + AllocateBlock(value_slice.size(), allocator); + memcpy(allocation.get(), value_slice.data(), value_slice.size()); + + *result = BlobContents::Create(std::move(allocation), value_slice.size()); return Status::OK(); } @@ -568,7 +584,6 @@ Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice, size_t uncompressed_size = 0; constexpr uint32_t compression_format_version = 2; - constexpr MemoryAllocator* allocator = nullptr; CacheAllocationPtr output; @@ -587,19 +602,9 @@ Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice, return Status::Corruption("Unable to uncompress blob"); } - SaveValue(Slice(output.get(), uncompressed_size), value); + *result = BlobContents::Create(std::move(output), uncompressed_size); return Status::OK(); } -void BlobFileReader::SaveValue(const Slice& src, PinnableSlice* dst) { - assert(dst); - - if (dst->IsPinned()) { - dst->Reset(); - } - - dst->PinSelf(src); -} - } // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_file_reader.h b/db/blob/blob_file_reader.h index a30b9234c..75b756da1 100644 --- a/db/blob/blob_file_reader.h +++ b/db/blob/blob_file_reader.h @@ -23,7 +23,7 @@ class HistogramImpl; struct ReadOptions; class Slice; class FilePrefetchBuffer; -class PinnableSlice; +class BlobContents; class Statistics; class BlobFileReader { @@ -44,13 +44,17 @@ class BlobFileReader { Status GetBlob(const ReadOptions& read_options, const Slice& user_key, uint64_t offset, uint64_t value_size, CompressionType compression_type, - FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, + FilePrefetchBuffer* prefetch_buffer, + MemoryAllocator* allocator, + std::unique_ptr* result, uint64_t* bytes_read) const; // offsets must be sorted in ascending order by caller. - void MultiGetBlob(const ReadOptions& read_options, - autovector& blob_reqs, - uint64_t* bytes_read) const; + void MultiGetBlob( + const ReadOptions& read_options, MemoryAllocator* allocator, + autovector>>& + blob_reqs, + uint64_t* bytes_read) const; CompressionType GetCompressionType() const { return compression_type_; } @@ -89,11 +93,10 @@ class BlobFileReader { static Status UncompressBlobIfNeeded(const Slice& value_slice, CompressionType compression_type, + MemoryAllocator* allocator, SystemClock* clock, Statistics* statistics, - PinnableSlice* value); - - static void SaveValue(const Slice& src, PinnableSlice* dst); + std::unique_ptr* result); std::unique_ptr file_reader_; uint64_t file_size_; diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc index 15ea6c953..413040fcb 100644 --- a/db/blob/blob_file_reader_test.cc +++ b/db/blob/blob_file_reader_test.cc @@ -8,6 +8,7 @@ #include #include +#include "db/blob/blob_contents.h" #include "db/blob/blob_log_format.h" #include "db/blob/blob_log_writer.h" #include "env/mock_env.h" @@ -180,15 +181,17 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { read_options.verify_checksums = false; constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; + constexpr MemoryAllocator* allocator = nullptr; { - PinnableSlice value; + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0], blob_sizes[0], kNoCompression, prefetch_buffer, - &value, &bytes_read)); - ASSERT_EQ(value, blobs[0]); + allocator, &value, &bytes_read)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(value->data(), blobs[0]); ASSERT_EQ(bytes_read, blob_sizes[0]); // MultiGetBlob @@ -196,22 +199,25 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { size_t total_size = 0; std::array statuses_buf; - std::array value_buf; std::array requests_buf; - autovector blob_reqs; + autovector>> + blob_reqs; for (size_t i = 0; i < num_blobs; ++i) { requests_buf[i] = BlobReadRequest(keys[i], blob_offsets[i], blob_sizes[i], - kNoCompression, &value_buf[i], &statuses_buf[i]); - blob_reqs.push_back(&requests_buf[i]); + kNoCompression, nullptr, &statuses_buf[i]); + blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr()); } - reader->MultiGetBlob(read_options, blob_reqs, &bytes_read); + reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read); for (size_t i = 0; i < num_blobs; ++i) { + const auto& result = blob_reqs[i].second; + ASSERT_OK(statuses_buf[i]); - ASSERT_EQ(value_buf[i], blobs[i]); + ASSERT_NE(result, nullptr); + ASSERT_EQ(result->data(), blobs[i]); total_size += blob_sizes[i]; } ASSERT_EQ(bytes_read, total_size); @@ -220,13 +226,14 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { read_options.verify_checksums = true; { - PinnableSlice value; + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1], blob_sizes[1], kNoCompression, prefetch_buffer, - &value, &bytes_read)); - ASSERT_EQ(value, blobs[1]); + allocator, &value, &bytes_read)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(value->data(), blobs[1]); const uint64_t key_size = keys[1].size(); ASSERT_EQ(bytes_read, @@ -236,47 +243,50 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { // Invalid offset (too close to start of file) { - PinnableSlice value; + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(read_options, keys[0], blob_offsets[0] - 1, blob_sizes[0], kNoCompression, prefetch_buffer, - &value, &bytes_read) + allocator, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(value, nullptr); ASSERT_EQ(bytes_read, 0); } // Invalid offset (too close to end of file) { - PinnableSlice value; + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(read_options, keys[2], blob_offsets[2] + 1, blob_sizes[2], kNoCompression, prefetch_buffer, - &value, &bytes_read) + allocator, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(value, nullptr); ASSERT_EQ(bytes_read, 0); } // Incorrect compression type { - PinnableSlice value; + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(read_options, keys[0], blob_offsets[0], - blob_sizes[0], kZSTD, prefetch_buffer, &value, - &bytes_read) + blob_sizes[0], kZSTD, prefetch_buffer, allocator, + &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(value, nullptr); ASSERT_EQ(bytes_read, 0); } // Incorrect key size { constexpr char shorter_key[] = "k"; - PinnableSlice value; + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_TRUE(reader @@ -284,8 +294,9 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { blob_offsets[0] - (keys[0].size() - sizeof(shorter_key) + 1), blob_sizes[0], kNoCompression, prefetch_buffer, - &value, &bytes_read) + allocator, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(value, nullptr); ASSERT_EQ(bytes_read, 0); // MultiGetBlob @@ -302,18 +313,18 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { blob_offsets[2]}; std::array statuses_buf; - std::array value_buf; std::array requests_buf; - autovector blob_reqs; + autovector>> + blob_reqs; for (size_t i = 0; i < num_blobs; ++i) { requests_buf[i] = BlobReadRequest(key_refs[i], offsets[i], blob_sizes[i], - kNoCompression, &value_buf[i], &statuses_buf[i]); - blob_reqs.push_back(&requests_buf[i]); + kNoCompression, nullptr, &statuses_buf[i]); + blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr()); } - reader->MultiGetBlob(read_options, blob_reqs, &bytes_read); + reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read); for (size_t i = 0; i < num_blobs; ++i) { if (i == 1) { @@ -327,14 +338,15 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { // Incorrect key { constexpr char incorrect_key[] = "foo1"; - PinnableSlice value; + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(read_options, incorrect_key, blob_offsets[0], blob_sizes[0], kNoCompression, prefetch_buffer, - &value, &bytes_read) + allocator, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(value, nullptr); ASSERT_EQ(bytes_read, 0); // MultiGetBlob @@ -346,19 +358,18 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { key_refs[2] = std::cref(wrong_key_slice); std::array statuses_buf; - std::array value_buf; std::array requests_buf; + autovector>> + blob_reqs; for (size_t i = 0; i < num_blobs; ++i) { requests_buf[i] = BlobReadRequest(key_refs[i], blob_offsets[i], blob_sizes[i], - kNoCompression, &value_buf[i], &statuses_buf[i]); + kNoCompression, nullptr, &statuses_buf[i]); + blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr()); } - autovector blob_reqs = { - &requests_buf[0], &requests_buf[1], &requests_buf[2]}; - - reader->MultiGetBlob(read_options, blob_reqs, &bytes_read); + reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read); for (size_t i = 0; i < num_blobs; ++i) { if (i == num_blobs - 1) { @@ -371,14 +382,15 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { // Incorrect value size { - PinnableSlice value; + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(read_options, keys[1], blob_offsets[1], blob_sizes[1] + 1, kNoCompression, - prefetch_buffer, &value, &bytes_read) + prefetch_buffer, allocator, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(value, nullptr); ASSERT_EQ(bytes_read, 0); // MultiGetBlob @@ -388,23 +400,26 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { } std::array statuses_buf; - std::array value_buf; std::array requests_buf; requests_buf[0] = BlobReadRequest(key_refs[0], blob_offsets[0], blob_sizes[0], - kNoCompression, &value_buf[0], &statuses_buf[0]); + kNoCompression, nullptr, &statuses_buf[0]); requests_buf[1] = BlobReadRequest(key_refs[1], blob_offsets[1], blob_sizes[1] + 1, - kNoCompression, &value_buf[1], &statuses_buf[1]); + kNoCompression, nullptr, &statuses_buf[1]); requests_buf[2] = BlobReadRequest(key_refs[2], blob_offsets[2], blob_sizes[2], - kNoCompression, &value_buf[2], &statuses_buf[2]); + kNoCompression, nullptr, &statuses_buf[2]); - autovector blob_reqs = { - &requests_buf[0], &requests_buf[1], &requests_buf[2]}; + autovector>> + blob_reqs; - reader->MultiGetBlob(read_options, blob_reqs, &bytes_read); + for (size_t i = 0; i < num_blobs; ++i) { + blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr()); + } + + reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read); for (size_t i = 0; i < num_blobs; ++i) { if (i != 1) { @@ -665,14 +680,17 @@ TEST_F(BlobFileReaderTest, BlobCRCError) { SyncPoint::GetInstance()->EnableProcessing(); constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; - PinnableSlice value; + constexpr MemoryAllocator* allocator = nullptr; + + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, prefetch_buffer, &value, + kNoCompression, prefetch_buffer, allocator, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(value, nullptr); ASSERT_EQ(bytes_read, 0); SyncPoint::GetInstance()->DisableProcessing(); @@ -720,28 +738,31 @@ TEST_F(BlobFileReaderTest, Compression) { read_options.verify_checksums = false; constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; + constexpr MemoryAllocator* allocator = nullptr; { - PinnableSlice value; + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kSnappyCompression, prefetch_buffer, &value, - &bytes_read)); - ASSERT_EQ(value, blob); + kSnappyCompression, prefetch_buffer, allocator, + &value, &bytes_read)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(value->data(), blob); ASSERT_EQ(bytes_read, blob_size); } read_options.verify_checksums = true; { - PinnableSlice value; + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kSnappyCompression, prefetch_buffer, &value, - &bytes_read)); - ASSERT_EQ(value, blob); + kSnappyCompression, prefetch_buffer, allocator, + &value, &bytes_read)); + ASSERT_NE(value, nullptr); + ASSERT_EQ(value->data(), blob); constexpr uint64_t key_size = sizeof(key) - 1; ASSERT_EQ(bytes_read, @@ -799,14 +820,17 @@ TEST_F(BlobFileReaderTest, UncompressionError) { SyncPoint::GetInstance()->EnableProcessing(); constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; - PinnableSlice value; + constexpr MemoryAllocator* allocator = nullptr; + + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kSnappyCompression, prefetch_buffer, &value, - &bytes_read) + kSnappyCompression, prefetch_buffer, allocator, + &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(value, nullptr); ASSERT_EQ(bytes_read, 0); SyncPoint::GetInstance()->DisableProcessing(); @@ -885,14 +909,17 @@ TEST_P(BlobFileReaderIOErrorTest, IOError) { ASSERT_OK(s); constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; - PinnableSlice value; + constexpr MemoryAllocator* allocator = nullptr; + + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, prefetch_buffer, &value, - &bytes_read) + kNoCompression, prefetch_buffer, allocator, + &value, &bytes_read) .IsIOError()); + ASSERT_EQ(value, nullptr); ASSERT_EQ(bytes_read, 0); } @@ -970,14 +997,17 @@ TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) { ASSERT_OK(s); constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; - PinnableSlice value; + constexpr MemoryAllocator* allocator = nullptr; + + std::unique_ptr value; uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, prefetch_buffer, &value, - &bytes_read) + kNoCompression, prefetch_buffer, allocator, + &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(value, nullptr); ASSERT_EQ(bytes_read, 0); } diff --git a/db/blob/blob_source.cc b/db/blob/blob_source.cc index 12bf2b01d..bfade2507 100644 --- a/db/blob/blob_source.cc +++ b/db/blob/blob_source.cc @@ -45,61 +45,59 @@ BlobSource::BlobSource(const ImmutableOptions* immutable_options, BlobSource::~BlobSource() = default; Status BlobSource::GetBlobFromCache( - const Slice& cache_key, CacheHandleGuard* blob) const { - assert(blob); - assert(blob->IsEmpty()); + const Slice& cache_key, CacheHandleGuard* cached_blob) const { assert(blob_cache_); assert(!cache_key.empty()); + assert(cached_blob); + assert(cached_blob->IsEmpty()); Cache::Handle* cache_handle = nullptr; cache_handle = GetEntryFromCache(cache_key); if (cache_handle != nullptr) { - *blob = CacheHandleGuard(blob_cache_.get(), cache_handle); + *cached_blob = + CacheHandleGuard(blob_cache_.get(), cache_handle); + + assert(cached_blob->GetValue()); PERF_COUNTER_ADD(blob_cache_hit_count, 1); RecordTick(statistics_, BLOB_DB_CACHE_HIT); - RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ, blob->GetValue()->size()); + RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ, + cached_blob->GetValue()->size()); return Status::OK(); } - assert(blob->IsEmpty()); - RecordTick(statistics_, BLOB_DB_CACHE_MISS); return Status::NotFound("Blob not found in cache"); } -Status BlobSource::PutBlobIntoCache(const Slice& cache_key, - CacheHandleGuard* cached_blob, - PinnableSlice* blob) const { - assert(blob); - assert(!cache_key.empty()); +Status BlobSource::PutBlobIntoCache( + const Slice& cache_key, std::unique_ptr* blob, + CacheHandleGuard* cached_blob) const { assert(blob_cache_); - - Status s; - const Cache::Priority priority = Cache::Priority::BOTTOM; - - // Objects to be put into the cache have to be heap-allocated and - // self-contained, i.e. own their contents. The Cache has to be able to take - // unique ownership of them. - CacheAllocationPtr allocation = - AllocateBlock(blob->size(), blob_cache_->memory_allocator()); - memcpy(allocation.get(), blob->data(), blob->size()); - std::unique_ptr buf = - BlobContents::Create(std::move(allocation), blob->size()); + assert(!cache_key.empty()); + assert(blob); + assert(*blob); + assert(cached_blob); + assert(cached_blob->IsEmpty()); Cache::Handle* cache_handle = nullptr; - s = InsertEntryIntoCache(cache_key, buf.get(), buf->ApproximateMemoryUsage(), - &cache_handle, priority); + const Status s = InsertEntryIntoCache(cache_key, blob->get(), + (*blob)->ApproximateMemoryUsage(), + &cache_handle, Cache::Priority::BOTTOM); if (s.ok()) { - buf.release(); + blob->release(); + assert(cache_handle != nullptr); *cached_blob = CacheHandleGuard(blob_cache_.get(), cache_handle); + assert(cached_blob->GetValue()); + RecordTick(statistics_, BLOB_DB_CACHE_ADD); - RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE, blob->size()); + RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE, + cached_blob->GetValue()->size()); } else { RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES); @@ -149,6 +147,24 @@ void BlobSource::PinCachedBlob(CacheHandleGuard* cached_blob, cached_blob->TransferTo(value); } +void BlobSource::PinOwnedBlob(std::unique_ptr* owned_blob, + PinnableSlice* value) { + assert(owned_blob); + assert(*owned_blob); + assert(value); + + BlobContents* const blob = owned_blob->release(); + assert(blob); + + value->Reset(); + value->PinSlice( + blob->data(), + [](void* arg1, void* /* arg2 */) { + delete static_cast(arg1); + }, + blob, nullptr); +} + Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value, size_t charge, Cache::Handle** cache_handle, @@ -191,7 +207,7 @@ Status BlobSource::GetBlob(const ReadOptions& read_options, if (blob_cache_) { Slice key = cache_key.AsSlice(); s = GetBlobFromCache(key, &blob_handle); - if (s.ok() && blob_handle.GetValue()) { + if (s.ok()) { PinCachedBlob(&blob_handle, value); // For consistency, the size of on-disk (possibly compressed) blob record @@ -221,6 +237,8 @@ Status BlobSource::GetBlob(const ReadOptions& read_options, // Can't find the blob from the cache. Since I/O is allowed, read from the // file. + std::unique_ptr blob_contents; + { CacheHandleGuard blob_file_reader; s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader); @@ -234,10 +252,14 @@ Status BlobSource::GetBlob(const ReadOptions& read_options, return Status::Corruption("Compression type mismatch when reading blob"); } + MemoryAllocator* const allocator = (blob_cache_ && read_options.fill_cache) + ? blob_cache_->memory_allocator() + : nullptr; + uint64_t read_size = 0; s = blob_file_reader.GetValue()->GetBlob( read_options, user_key, offset, value_size, compression_type, - prefetch_buffer, value, &read_size); + prefetch_buffer, allocator, &blob_contents, &read_size); if (!s.ok()) { return s; } @@ -250,12 +272,14 @@ Status BlobSource::GetBlob(const ReadOptions& read_options, // If filling cache is allowed and a cache is configured, try to put the // blob to the cache. Slice key = cache_key.AsSlice(); - s = PutBlobIntoCache(key, &blob_handle, value); + s = PutBlobIntoCache(key, &blob_contents, &blob_handle); if (!s.ok()) { return s; } PinCachedBlob(&blob_handle, value); + } else { + PinOwnedBlob(&blob_contents, value); } assert(s.ok()); @@ -321,7 +345,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options, const Status s = GetBlobFromCache(key, &blob_handle); - if (s.ok() && blob_handle.GetValue()) { + if (s.ok()) { assert(req.status); *req.status = s; @@ -356,8 +380,10 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options, if (no_io) { for (size_t i = 0; i < num_blobs; ++i) { if (!(cache_hit_mask & (Mask{1} << i))) { - assert(blob_reqs[i].status); - *blob_reqs[i].status = + BlobReadRequest& req = blob_reqs[i]; + assert(req.status); + + *req.status = Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); } } @@ -366,12 +392,13 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options, { // Find the rest of blobs from the file since I/O is allowed. - autovector _blob_reqs; + autovector>> + _blob_reqs; uint64_t _bytes_read = 0; for (size_t i = 0; i < num_blobs; ++i) { if (!(cache_hit_mask & (Mask{1} << i))) { - _blob_reqs.push_back(&blob_reqs[i]); + _blob_reqs.emplace_back(&blob_reqs[i], std::unique_ptr()); } } @@ -380,28 +407,35 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options, blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader); if (!s.ok()) { for (size_t i = 0; i < _blob_reqs.size(); ++i) { - assert(_blob_reqs[i]->status); - *_blob_reqs[i]->status = s; + BlobReadRequest* const req = _blob_reqs[i].first; + assert(req); + assert(req->status); + + *req->status = s; } return; } assert(blob_file_reader.GetValue()); - blob_file_reader.GetValue()->MultiGetBlob(read_options, _blob_reqs, - &_bytes_read); + MemoryAllocator* const allocator = (blob_cache_ && read_options.fill_cache) + ? blob_cache_->memory_allocator() + : nullptr; + + blob_file_reader.GetValue()->MultiGetBlob(read_options, allocator, + _blob_reqs, &_bytes_read); if (blob_cache_ && read_options.fill_cache) { // If filling cache is allowed and a cache is configured, try to put // the blob(s) to the cache. - for (BlobReadRequest* req : _blob_reqs) { + for (auto& [req, blob_contents] : _blob_reqs) { assert(req); if (req->status->ok()) { CacheHandleGuard blob_handle; const CacheKey cache_key = base_cache_key.WithOffset(req->offset); const Slice key = cache_key.AsSlice(); - s = PutBlobIntoCache(key, &blob_handle, req->result); + s = PutBlobIntoCache(key, &blob_contents, &blob_handle); if (!s.ok()) { *req->status = s; } else { @@ -409,6 +443,14 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options, } } } + } else { + for (auto& [req, blob_contents] : _blob_reqs) { + assert(req); + + if (req->status->ok()) { + PinOwnedBlob(&blob_contents, req->result); + } + } } total_bytes += _bytes_read; diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h index 2027e16fa..2ed296eeb 100644 --- a/db/blob/blob_source.h +++ b/db/blob/blob_source.h @@ -6,6 +6,7 @@ #pragma once #include +#include #include "cache/cache_helpers.h" #include "cache/cache_key.h" @@ -107,15 +108,18 @@ class BlobSource { private: Status GetBlobFromCache(const Slice& cache_key, - CacheHandleGuard* blob) const; + CacheHandleGuard* cached_blob) const; Status PutBlobIntoCache(const Slice& cache_key, - CacheHandleGuard* cached_blob, - PinnableSlice* blob) const; + std::unique_ptr* blob, + CacheHandleGuard* cached_blob) const; static void PinCachedBlob(CacheHandleGuard* cached_blob, PinnableSlice* value); + static void PinOwnedBlob(std::unique_ptr* owned_blob, + PinnableSlice* value); + Cache::Handle* GetEntryFromCache(const Slice& key) const; Status InsertEntryIntoCache(const Slice& key, BlobContents* value, diff --git a/db/blob/blob_source_test.cc b/db/blob/blob_source_test.cc index 8a4964ddf..0f2d19260 100644 --- a/db/blob/blob_source_test.cc +++ b/db/blob/blob_source_test.cc @@ -219,7 +219,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) { kNoCompression, prefetch_buffer, &values[i], &bytes_read)); ASSERT_EQ(values[i], blobs[i]); - ASSERT_FALSE(values[i].IsPinned()); + ASSERT_TRUE(values[i].IsPinned()); ASSERT_EQ(bytes_read, BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);