diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc
index 3651bb4a7..751a677d9 100644
--- a/db/blob/blob_file_reader.cc
+++ b/db/blob/blob_file_reader.cc
@@ -443,50 +443,40 @@ void BlobFileReader::MultiGetBlob(
   }
 
   assert(s.ok());
+
+  uint64_t total_bytes = 0;
   for (size_t i = 0; i < num_blobs; ++i) {
     auto& req = read_reqs[i];
+    const auto& record_slice = req.result;
+
     assert(statuses[i]);
-    if (req.status.ok() && req.result.size() != req.len) {
+    if (req.status.ok() && record_slice.size() != req.len) {
       req.status = IOStatus::Corruption("Failed to read data from blob file");
     }
+
     *statuses[i] = req.status;
-  }
+    if (!statuses[i]->ok()) {
+      continue;
+    }
 
-  if (read_options.verify_checksums) {
-    for (size_t i = 0; i < num_blobs; ++i) {
-      assert(statuses[i]);
+    // Verify checksums if enabled
+    if (read_options.verify_checksums) {
+      *statuses[i] = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
       if (!statuses[i]->ok()) {
         continue;
       }
-      const Slice& record_slice = read_reqs[i].result;
-      s = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
-      if (!s.ok()) {
-        assert(statuses[i]);
-        *statuses[i] = s;
-      }
     }
-  }
 
-  for (size_t i = 0; i < num_blobs; ++i) {
-    assert(statuses[i]);
-    if (!statuses[i]->ok()) {
-      continue;
-    }
-    const Slice& record_slice = read_reqs[i].result;
-    const Slice value_slice(record_slice.data() + adjustments[i],
-                            value_sizes[i]);
-    s = UncompressBlobIfNeeded(value_slice, compression_type_, clock_,
-                               statistics_, values[i]);
-    if (!s.ok()) {
-      *statuses[i] = s;
+    // Uncompress blob if needed
+    Slice value_slice(record_slice.data() + adjustments[i], value_sizes[i]);
+    *statuses[i] = UncompressBlobIfNeeded(value_slice, compression_type_,
+                                          clock_, statistics_, values[i]);
+    if (statuses[i]->ok()) {
+      total_bytes += record_slice.size();
     }
   }
 
   if (bytes_read) {
-    uint64_t total_bytes = 0;
-    for (const auto& req : read_reqs) {
-      total_bytes += req.result.size();
-    }
     *bytes_read = total_bytes;
   }
 }
diff --git a/db/blob/blob_source.cc b/db/blob/blob_source.cc
index cde76c305..c066e8d85 100644
--- a/db/blob/blob_source.cc
+++ b/db/blob/blob_source.cc
@@ -9,6 +9,7 @@
 #include <string>
 
 #include "db/blob/blob_file_reader.h"
+#include "db/blob/blob_log_format.h"
 #include "options/cf_options.h"
 #include "table/multiget_context.h"
 
@@ -99,8 +100,16 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
     Slice key = cache_key.AsSlice();
     s = GetBlobFromCache(key, &blob_entry);
     if (s.ok() && blob_entry.GetValue()) {
+      // For consistency, the size of the on-disk (possibly compressed) blob
+      // record is assigned to bytes_read.
      if (bytes_read) {
-        *bytes_read = value_size;
+        uint64_t adjustment =
+            read_options.verify_checksums
+                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
+                      user_key.size())
+                : 0;
+        assert(offset >= adjustment);
+        *bytes_read = value_size + adjustment;
       }
       value->PinSelf(*blob_entry.GetValue());
       return s;
@@ -191,13 +200,20 @@ void BlobSource::MultiGetBlob(
       s = GetBlobFromCache(key, &blob_entry);
       if (s.ok() && blob_entry.GetValue()) {
         assert(statuses[i]);
-        *statuses[i] = s;
         blobs[i]->PinSelf(*blob_entry.GetValue());
 
         // Update the counter for the number of valid blobs read from the
         // cache.
         ++cached_blob_count;
-        total_bytes += value_sizes[i];
+
+        // For consistency, the size of each on-disk (possibly compressed)
+        // blob record is accumulated into total_bytes.
+        uint64_t adjustment =
+            read_options.verify_checksums
+                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
+                      user_keys[i].get().size())
+                : 0;
+        assert(offsets[i] >= adjustment);
+        total_bytes += value_sizes[i] + adjustment;
         cache_hit_mask |= (Mask{1} << i);  // cache hit
       }
     }
@@ -266,7 +282,6 @@ void BlobSource::MultiGetBlob(
       CachableEntry<std::string> blob_entry;
       const CacheKey cache_key = base_cache_key.WithOffset(_offsets[i]);
       const Slice key = cache_key.AsSlice();
-
       s = PutBlobIntoCache(key, &blob_entry, _blobs[i]);
       if (!s.ok()) {
         *_statuses[i] = s;
@@ -279,8 +294,6 @@ void BlobSource::MultiGetBlob(
 
   if (bytes_read) {
     *bytes_read = total_bytes;
   }
-
-  RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, _bytes_read);
 }
 }
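Aside (not part of the patch): with this change, a blob served from the cache reports the same `bytes_read` that the equivalent disk read would report. A minimal sketch of the reporting rule, assuming, as the updated tests below suggest, that `BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)` evaluates to `BlobLogRecord::kHeaderSize + key_size`; the header size of 32 used here is an assumption for illustration:

```cpp
#include <cstdint>

// Assumed stand-ins for BlobLogRecord in db/blob/blob_log_format.h.
constexpr uint64_t kHeaderSize = 32;  // assumed BlobLogRecord::kHeaderSize

uint64_t CalculateAdjustmentForRecordHeader(uint64_t key_size) {
  return kHeaderSize + key_size;
}

// bytes_read reported for one blob. With verify_checksums, a disk read
// covers the whole on-disk record (header + user key + possibly compressed
// value), so a cache hit reports the same total; without it, only the value
// portion is read and no adjustment is applied.
uint64_t ReportedBytesRead(bool verify_checksums, uint64_t key_size,
                           uint64_t value_size) {
  const uint64_t adjustment =
      verify_checksums ? CalculateAdjustmentForRecordHeader(key_size) : 0;
  return value_size + adjustment;
}
```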
diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h
index 0d32e8060..ffce13b10 100644
--- a/db/blob/blob_source.h
+++ b/db/blob/blob_source.h
@@ -37,13 +37,33 @@ class BlobSource {
 
   ~BlobSource();
 
+  // Read a blob from the underlying cache or storage.
+  //
+  // If successful, returns ok and sets "*value" to the newly retrieved
+  // uncompressed blob. If there was an error while fetching the blob, sets
+  // "*value" to empty and returns a non-ok status.
+  //
+  // Note: For consistency, whether the blob is found in the cache or on disk,
+  // sets "*bytes_read" to the size of the on-disk (possibly compressed) blob
+  // record.
   Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
                  uint64_t file_number, uint64_t offset, uint64_t file_size,
                  uint64_t value_size, CompressionType compression_type,
                  FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
                  uint64_t* bytes_read);
 
-  // Offsets must be sorted in ascending order by caller.
+  // Read multiple blobs from the underlying cache or storage.
+  //
+  // If successful, returns ok and sets the elements of "blobs" to the newly
+  // retrieved uncompressed blobs. If there was an error while fetching one of
+  // the blobs, sets its corresponding "blobs[i]" to empty and sets
+  // "statuses[i]" to a non-ok status.
+  //
+  // Note:
+  // - Offsets must be sorted in ascending order by caller.
+  // - For consistency, whether the blob is found in the cache or on disk, sets
+  //   "*bytes_read" to the total size of on-disk (possibly compressed) blob
+  //   records.
   void MultiGetBlob(
       const ReadOptions& read_options,
       const autovector<std::reference_wrapper<const Slice>>& user_keys,
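A short usage sketch of the `GetBlob` contract documented above. The call shape mirrors the tests in blob_source_test.cc below; `blob_source`, `read_options`, and the file coordinates are assumed to be set up as in those tests, with `read_options.verify_checksums` enabled:

```cpp
// Assumes: BlobSource blob_source(...); ReadOptions read_options; and
// user_key, file_number, offset, file_size, value_size, compression_type
// describing one previously written blob record.
PinnableSlice value;
uint64_t bytes_read = 0;

const Status s = blob_source.GetBlob(
    read_options, user_key, file_number, offset, file_size, value_size,
    compression_type, nullptr /*prefetch_buffer*/, &value, &bytes_read);

if (s.ok()) {
  // Cache hit or disk read, bytes_read covers the full on-disk record:
  // BlobLogRecord::kHeaderSize + user_key.size() + value_size.
}
```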
diff --git a/db/blob/blob_source_test.cc b/db/blob/blob_source_test.cc
index 7bd650b14..491077c8b 100644
--- a/db/blob/blob_source_test.cc
+++ b/db/blob/blob_source_test.cc
@@ -12,6 +12,7 @@
 #include <string>
 
 #include "db/blob/blob_file_cache.h"
+#include "db/blob/blob_file_reader.h"
 #include "db/blob/blob_log_format.h"
 #include "db/blob/blob_log_writer.h"
 #include "db/db_test_util.h"
@@ -108,33 +109,34 @@ class BlobSourceTest : public DBTestBase {
  protected:
  public:
   explicit BlobSourceTest()
-      : DBTestBase("blob_source_test", /*env_do_fsync=*/true) {}
+      : DBTestBase("blob_source_test", /*env_do_fsync=*/true) {
+    options_.env = env_;
+    options_.enable_blob_files = true;
+    options_.create_if_missing = true;
+
+    LRUCacheOptions co;
+    co.capacity = 2048;
+    co.num_shard_bits = 2;
+    co.metadata_charge_policy = kDontChargeCacheMetadata;
+    options_.blob_cache = NewLRUCache(co);
+    options_.lowest_used_cache_tier = CacheTier::kVolatileTier;
+
+    assert(db_->GetDbIdentity(db_id_).ok());
+    assert(db_->GetDbSessionId(db_session_id_).ok());
+  }
+
+  Options options_;
+  std::string db_id_;
+  std::string db_session_id_;
 };
 
 TEST_F(BlobSourceTest, GetBlobsFromCache) {
-  Options options;
-  options.env = env_;
-  options.cf_paths.emplace_back(
+  options_.cf_paths.emplace_back(
       test::PerThreadDBPath(env_, "BlobSourceTest_GetBlobsFromCache"), 0);
-  options.enable_blob_files = true;
-  options.create_if_missing = true;
-
-  LRUCacheOptions co;
-  co.capacity = 2048;
-  co.num_shard_bits = 2;
-  co.metadata_charge_policy = kDontChargeCacheMetadata;
-  options.blob_cache = NewLRUCache(co);
-  options.lowest_used_cache_tier = CacheTier::kVolatileTier;
 
-  DestroyAndReopen(options);
+  DestroyAndReopen(options_);
 
-  std::string db_id;
-  ASSERT_OK(db_->GetDbIdentity(db_id));
-
-  std::string db_session_id;
-  ASSERT_OK(db_->GetDbSessionId(db_session_id));
-
-  ImmutableOptions immutable_options(options);
+  ImmutableOptions immutable_options(options_);
 
   constexpr uint32_t column_family_id = 1;
   constexpr bool has_ttl = false;
@@ -179,7 +181,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
       backing_cache.get(), &immutable_options, &file_options,
       column_family_id, blob_file_read_hist, nullptr /*IOTracer*/));
 
-  BlobSource blob_source(&immutable_options, db_id, db_session_id,
+  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                          blob_file_cache.get());
 
   ReadOptions read_options;
@@ -204,7 +206,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                     &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
       ASSERT_EQ(bytes_read,
-                blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
       ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                 blob_offsets[i]));
@@ -222,7 +224,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                     &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
       ASSERT_EQ(bytes_read,
-                blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
       ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
@@ -239,7 +241,8 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                     kNoCompression, prefetch_buffer,
                                     &values[i], &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
-      ASSERT_EQ(bytes_read, blob_sizes[i]);
+      ASSERT_EQ(bytes_read,
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
       ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
@@ -257,14 +260,15 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                     kNoCompression, prefetch_buffer,
                                     &values[i], &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
-      ASSERT_EQ(bytes_read, blob_sizes[i]);
+      ASSERT_EQ(bytes_read,
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
       ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
     }
   }
 
-  options.blob_cache->EraseUnRefEntries();
+  options_.blob_cache->EraseUnRefEntries();
 
   {
     // Cache-only GetBlob
@@ -320,30 +324,131 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
   }
 }
 
-TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
-  Options options;
-  options.env = env_;
-  options.cf_paths.emplace_back(
-      test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);
-  options.enable_blob_files = true;
-  options.create_if_missing = true;
+TEST_F(BlobSourceTest, GetCompressedBlobs) {
+  if (!Snappy_Supported()) {
+    return;
+  }
+
+  const CompressionType compression = kSnappyCompression;
+
+  options_.cf_paths.emplace_back(
+      test::PerThreadDBPath(env_, "BlobSourceTest_GetCompressedBlobs"), 0);
+
+  DestroyAndReopen(options_);
+
+  ImmutableOptions immutable_options(options_);
+
+  constexpr uint32_t column_family_id = 1;
+  constexpr bool has_ttl = false;
+  constexpr ExpirationRange expiration_range;
+  constexpr size_t num_blobs = 256;
+
+  std::vector<std::string> key_strs;
+  std::vector<std::string> blob_strs;
+
+  for (size_t i = 0; i < num_blobs; ++i) {
+    key_strs.push_back("key" + std::to_string(i));
+    blob_strs.push_back("blob" + std::to_string(i));
+  }
+
+  std::vector<Slice> keys;
+  std::vector<Slice> blobs;
+
+  for (size_t i = 0; i < num_blobs; ++i) {
+    keys.push_back({key_strs[i]});
+    blobs.push_back({blob_strs[i]});
+  }
+
+  std::vector<uint64_t> blob_offsets(keys.size());
+  std::vector<uint64_t> blob_sizes(keys.size());
+
+  constexpr size_t capacity = 1024;
+  auto backing_cache = NewLRUCache(capacity);  // Blob file cache
+
+  FileOptions file_options;
+  std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
+      backing_cache.get(), &immutable_options, &file_options,
+      column_family_id, nullptr /*HistogramImpl*/, nullptr /*IOTracer*/));
+
+  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
+                         blob_file_cache.get());
+
+  ReadOptions read_options;
+  read_options.verify_checksums = true;
+
+  uint64_t bytes_read = 0;
+  std::vector<PinnableSlice> values(keys.size());
+
+  {
+    // Snappy Compression
+    const uint64_t file_number = 1;
+
+    read_options.read_tier = ReadTier::kReadAllTier;
+
+    WriteBlobFile(immutable_options, column_family_id, has_ttl,
+                  expiration_range, expiration_range, file_number, keys,
+                  blobs, compression, blob_offsets, blob_sizes);
+
+    CacheHandleGuard<BlobFileReader> blob_file_reader;
+    ASSERT_OK(blob_source.GetBlobFileReader(file_number, &blob_file_reader));
+    ASSERT_NE(blob_file_reader.GetValue(), nullptr);
+
+    const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
+    ASSERT_EQ(blob_file_reader.GetValue()->GetCompressionType(), compression);
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_NE(blobs[i].size() /*uncompressed size*/,
+                blob_sizes[i] /*compressed size*/);
+    }
+
+    read_options.fill_cache = true;
+    read_options.read_tier = ReadTier::kReadAllTier;
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                                blob_offsets[i]));
+      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], file_number,
+                                    blob_offsets[i], file_size, blob_sizes[i],
+                                    compression, nullptr /*prefetch_buffer*/,
+                                    &values[i], &bytes_read));
+      ASSERT_EQ(values[i], blobs[i] /*uncompressed blob*/);
+      ASSERT_NE(values[i].size(), blob_sizes[i] /*compressed size*/);
+      ASSERT_EQ(bytes_read,
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
-  LRUCacheOptions co;
-  co.capacity = 2048;
-  co.num_shard_bits = 2;
-  co.metadata_charge_policy = kDontChargeCacheMetadata;
-  options.blob_cache = NewLRUCache(co);
-  options.lowest_used_cache_tier = CacheTier::kVolatileTier;
+      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                               blob_offsets[i]));
+    }
+
+    read_options.read_tier = ReadTier::kBlockCacheTier;
 
-  DestroyAndReopen(options);
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                               blob_offsets[i]));
 
-  std::string db_id;
-  ASSERT_OK(db_->GetDbIdentity(db_id));
+      // Compressed blob size is passed in GetBlob
+      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], file_number,
+                                    blob_offsets[i], file_size, blob_sizes[i],
+                                    compression, nullptr /*prefetch_buffer*/,
+                                    &values[i], &bytes_read));
+      ASSERT_EQ(values[i], blobs[i] /*uncompressed blob*/);
+      ASSERT_NE(values[i].size(), blob_sizes[i] /*compressed size*/);
+      ASSERT_EQ(bytes_read,
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
+
+      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                               blob_offsets[i]));
+    }
+  }
+}
+
+TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
+  options_.cf_paths.emplace_back(
+      test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);
 
-  std::string db_session_id;
-  ASSERT_OK(db_->GetDbSessionId(db_session_id));
+  DestroyAndReopen(options_);
 
-  ImmutableOptions immutable_options(options);
+  ImmutableOptions immutable_options(options_);
 
   constexpr uint32_t column_family_id = 1;
   constexpr bool has_ttl = false;
@@ -388,7 +493,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
       backing_cache.get(), &immutable_options, &file_options,
       column_family_id, blob_file_read_hist, nullptr /*IOTracer*/));
 
-  BlobSource blob_source(&immutable_options, db_id, db_session_id,
+  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                          blob_file_cache.get());
 
   ReadOptions read_options;
@@ -451,7 +556,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
                                       &value_buf[i], &bytes_read));
         ASSERT_EQ(value_buf[i], blobs[i]);
         ASSERT_EQ(bytes_read,
-                  blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
+                  BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
         ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                  blob_offsets[i]));
@@ -485,7 +590,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
     }
   }
 
-  options.blob_cache->EraseUnRefEntries();
+  options_.blob_cache->EraseUnRefEntries();
 
   {
     // Cache-only MultiGetBlob
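With the fixture refactor above, the shared blob-cache options live in the `BlobSourceTest` constructor, so a new test only needs to register its own `cf_paths` entry and reopen the DB. A hypothetical example of the pattern (test name and body are illustrative only):

```cpp
TEST_F(BlobSourceTest, MyNewBlobTest) {  // hypothetical test name
  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(env_, "BlobSourceTest_MyNewBlobTest"), 0);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);
  // ... build a BlobSource from immutable_options, db_id_, db_session_id_
  // and exercise GetBlob / MultiGetBlob as in the tests above ...
}
```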