From a1eb02f0896ac62a7c7a576bc8d59200cc32bb07 Mon Sep 17 00:00:00 2001
From: Gang Liao
Date: Mon, 27 Jun 2022 17:15:21 -0700
Subject: [PATCH] Change the semantics of bytes_read in GetBlob/MultiGetBlob
 for consistency (#10248)

Summary:
The `bytes_read` returned by the current BlobSource interface is ambiguous:
the uncompressed blob size is returned if the cache hits, while the size of
the blob read from disk, presumably the compressed version, is returned if
the cache misses. These two differing semantics can cause consistency issues.
For example, this inconsistency caused the assertion failure tracked in
T124246362 (hot-fixed in https://github.com/facebook/rocksdb/issues/10249).
The goal of this change is to require that the value of `bytes_read` always
be the on-disk blob record size.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10248

Reviewed By: ltamasi

Differential Revision: D37470292

Pulled By: gangliao

fbshipit-source-id: fbca521b2791d3674dbf2484cea5fcae2fdd94d2
---
 db/blob/blob_file_reader.cc |  46 ++++----
 db/blob/blob_source.cc      |  25 +++--
 db/blob/blob_source.h       |  22 +++-
 db/blob/blob_source_test.cc | 203 +++++++++++++++++++++++++++---------
 4 files changed, 212 insertions(+), 84 deletions(-)

diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc
index 3651bb4a7..751a677d9 100644
--- a/db/blob/blob_file_reader.cc
+++ b/db/blob/blob_file_reader.cc
@@ -443,50 +443,40 @@ void BlobFileReader::MultiGetBlob(
   }
 
   assert(s.ok());
+
+  uint64_t total_bytes = 0;
   for (size_t i = 0; i < num_blobs; ++i) {
     auto& req = read_reqs[i];
+    const auto& record_slice = req.result;
+
     assert(statuses[i]);
-    if (req.status.ok() && req.result.size() != req.len) {
+    if (req.status.ok() && record_slice.size() != req.len) {
       req.status = IOStatus::Corruption("Failed to read data from blob file");
     }
+
     *statuses[i] = req.status;
-  }
+    if (!statuses[i]->ok()) {
+      continue;
+    }
 
-  if (read_options.verify_checksums) {
-    for (size_t i = 0; i < num_blobs; ++i) {
-      assert(statuses[i]);
+    // Verify checksums if enabled
+    if (read_options.verify_checksums) {
+      *statuses[i] = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
       if (!statuses[i]->ok()) {
        continue;
       }
-      const Slice& record_slice = read_reqs[i].result;
-      s = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
-      if (!s.ok()) {
-        assert(statuses[i]);
-        *statuses[i] = s;
-      }
     }
-  }
 
-  for (size_t i = 0; i < num_blobs; ++i) {
-    assert(statuses[i]);
-    if (!statuses[i]->ok()) {
-      continue;
-    }
-    const Slice& record_slice = read_reqs[i].result;
-    const Slice value_slice(record_slice.data() + adjustments[i],
-                            value_sizes[i]);
-    s = UncompressBlobIfNeeded(value_slice, compression_type_, clock_,
-                               statistics_, values[i]);
-    if (!s.ok()) {
-      *statuses[i] = s;
+    // Uncompress blob if needed
+    Slice value_slice(record_slice.data() + adjustments[i], value_sizes[i]);
+    *statuses[i] = UncompressBlobIfNeeded(value_slice, compression_type_,
+                                          clock_, statistics_, values[i]);
+    if (statuses[i]->ok()) {
+      total_bytes += record_slice.size();
     }
   }
 
   if (bytes_read) {
-    uint64_t total_bytes = 0;
-    for (const auto& req : read_reqs) {
-      total_bytes += req.result.size();
-    }
     *bytes_read = total_bytes;
   }
 }
diff --git a/db/blob/blob_source.cc b/db/blob/blob_source.cc
index cde76c305..c066e8d85 100644
--- a/db/blob/blob_source.cc
+++ b/db/blob/blob_source.cc
@@ -9,6 +9,7 @@
 #include <string>
 
 #include "db/blob/blob_file_reader.h"
+#include "db/blob/blob_log_format.h"
 #include "options/cf_options.h"
 #include "table/multiget_context.h"
 
@@ -99,8 +100,16 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
     Slice key = cache_key.AsSlice();
     s = GetBlobFromCache(key, &blob_entry);
     if (s.ok() && blob_entry.GetValue()) {
+      // For consistency, the size of the on-disk (possibly compressed) blob
+      // record is assigned to bytes_read.
       if (bytes_read) {
-        *bytes_read = value_size;
+        uint64_t adjustment =
+            read_options.verify_checksums
+                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
+                      user_key.size())
+                : 0;
+        assert(offset >= adjustment);
+        *bytes_read = value_size + adjustment;
       }
       value->PinSelf(*blob_entry.GetValue());
       return s;
@@ -191,13 +200,20 @@ void BlobSource::MultiGetBlob(
       s = GetBlobFromCache(key, &blob_entry);
       if (s.ok() && blob_entry.GetValue()) {
         assert(statuses[i]);
-        *statuses[i] = s;
         blobs[i]->PinSelf(*blob_entry.GetValue());
 
         // Update the counter for the number of valid blobs read from the
         // cache.
         ++cached_blob_count;
-        total_bytes += value_sizes[i];
+        // For consistency, the size of each on-disk (possibly compressed) blob
+        // record is accumulated into total_bytes.
+        uint64_t adjustment =
+            read_options.verify_checksums
+                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
+                      user_keys[i].get().size())
+                : 0;
+        assert(offsets[i] >= adjustment);
+        total_bytes += value_sizes[i] + adjustment;
         cache_hit_mask |= (Mask{1} << i);  // cache hit
       }
     }
@@ -266,7 +282,6 @@
       CachableEntry<std::string> blob_entry;
       const CacheKey cache_key = base_cache_key.WithOffset(_offsets[i]);
       const Slice key = cache_key.AsSlice();
-
      s = PutBlobIntoCache(key, &blob_entry, _blobs[i]);
       if (!s.ok()) {
         *_statuses[i] = s;
@@ -279,8 +294,6 @@
 
     if (bytes_read) {
       *bytes_read = total_bytes;
     }
-
-    RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, _bytes_read);
   }
 }
diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h
index 0d32e8060..ffce13b10 100644
--- a/db/blob/blob_source.h
+++ b/db/blob/blob_source.h
@@ -37,13 +37,33 @@ class BlobSource {
 
   ~BlobSource();
 
+  // Read a blob from the underlying cache or storage.
+  //
+  // If successful, returns ok and sets "*value" to the newly retrieved
+  // uncompressed blob. If there was an error while fetching the blob, sets
+  // "*value" to empty and returns a non-ok status.
+  //
+  // Note: For consistency, whether the blob is found in the cache or on disk,
+  // this sets "*bytes_read" to the size of the on-disk (possibly compressed)
+  // blob record.
   Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
                  uint64_t file_number, uint64_t offset, uint64_t file_size,
                  uint64_t value_size, CompressionType compression_type,
                  FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
                  uint64_t* bytes_read);
 
-  // Offsets must be sorted in ascending order by caller.
+  // Read multiple blobs from the underlying cache or storage.
+  //
+  // If successful, returns ok and sets the elements of "blobs" to the newly
+  // retrieved uncompressed blobs. If there was an error while fetching one of
+  // the blobs, sets the corresponding "blobs[i]" to empty and sets
+  // "statuses[i]" to a non-ok status.
+  //
+  // Note:
+  // - Offsets must be sorted in ascending order by the caller.
+  // - For consistency, whether the blobs are found in the cache or on disk,
+  //   this sets "*bytes_read" to the total size of the on-disk (possibly
+  //   compressed) blob records.
   void MultiGetBlob(
       const ReadOptions& read_options,
       const autovector<std::reference_wrapper<const Slice>>& user_keys,
diff --git a/db/blob/blob_source_test.cc b/db/blob/blob_source_test.cc
index 7bd650b14..491077c8b 100644
--- a/db/blob/blob_source_test.cc
+++ b/db/blob/blob_source_test.cc
@@ -12,6 +12,7 @@
 #include <string>
 
 #include "db/blob/blob_file_cache.h"
+#include "db/blob/blob_file_reader.h"
 #include "db/blob/blob_log_format.h"
 #include "db/blob/blob_log_writer.h"
 #include "db/db_test_util.h"
@@ -108,33 +109,34 @@ class BlobSourceTest : public DBTestBase {
  protected:
  public:
   explicit BlobSourceTest()
-      : DBTestBase("blob_source_test", /*env_do_fsync=*/true) {}
+      : DBTestBase("blob_source_test", /*env_do_fsync=*/true) {
+    options_.env = env_;
+    options_.enable_blob_files = true;
+    options_.create_if_missing = true;
+
+    LRUCacheOptions co;
+    co.capacity = 2048;
+    co.num_shard_bits = 2;
+    co.metadata_charge_policy = kDontChargeCacheMetadata;
+    options_.blob_cache = NewLRUCache(co);
+    options_.lowest_used_cache_tier = CacheTier::kVolatileTier;
+
+    assert(db_->GetDbIdentity(db_id_).ok());
+    assert(db_->GetDbSessionId(db_session_id_).ok());
+  }
+
+  Options options_;
+  std::string db_id_;
+  std::string db_session_id_;
 };
 
 TEST_F(BlobSourceTest, GetBlobsFromCache) {
-  Options options;
-  options.env = env_;
-  options.cf_paths.emplace_back(
+  options_.cf_paths.emplace_back(
       test::PerThreadDBPath(env_, "BlobSourceTest_GetBlobsFromCache"), 0);
-  options.enable_blob_files = true;
-  options.create_if_missing = true;
-
-  LRUCacheOptions co;
-  co.capacity = 2048;
-  co.num_shard_bits = 2;
-  co.metadata_charge_policy = kDontChargeCacheMetadata;
-  options.blob_cache = NewLRUCache(co);
-  options.lowest_used_cache_tier = CacheTier::kVolatileTier;
 
-  DestroyAndReopen(options);
+  DestroyAndReopen(options_);
 
-  std::string db_id;
-  ASSERT_OK(db_->GetDbIdentity(db_id));
-
-  std::string db_session_id;
-  ASSERT_OK(db_->GetDbSessionId(db_session_id));
-
-  ImmutableOptions immutable_options(options);
+  ImmutableOptions immutable_options(options_);
 
   constexpr uint32_t column_family_id = 1;
   constexpr bool has_ttl = false;
@@ -179,7 +181,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
       backing_cache.get(), &immutable_options, &file_options, column_family_id,
       blob_file_read_hist, nullptr /*IOTracer*/));
 
-  BlobSource blob_source(&immutable_options, db_id, db_session_id,
+  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                          blob_file_cache.get());
 
   ReadOptions read_options;
@@ -204,7 +206,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                     &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
       ASSERT_EQ(bytes_read,
-                blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
       ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                 blob_offsets[i]));
@@ -222,7 +224,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                     &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
       ASSERT_EQ(bytes_read,
-                blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
       ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
@@ -239,7 +241,8 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                     kNoCompression, prefetch_buffer,
                                     &values[i], &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
-      ASSERT_EQ(bytes_read, blob_sizes[i]);
+      ASSERT_EQ(bytes_read,
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
       ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
@@ -257,14 +260,15 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                     kNoCompression, prefetch_buffer,
                                     &values[i], &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
-      ASSERT_EQ(bytes_read, blob_sizes[i]);
+      ASSERT_EQ(bytes_read,
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
       ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
     }
   }
 
-  options.blob_cache->EraseUnRefEntries();
+  options_.blob_cache->EraseUnRefEntries();
 
   {
     // Cache-only GetBlob
@@ -320,30 +324,131 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
   }
 }
 
-TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
-  Options options;
-  options.env = env_;
-  options.cf_paths.emplace_back(
-      test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);
-  options.enable_blob_files = true;
-  options.create_if_missing = true;
+TEST_F(BlobSourceTest, GetCompressedBlobs) {
+  if (!Snappy_Supported()) {
+    return;
+  }
+
+  const CompressionType compression = kSnappyCompression;
+
+  options_.cf_paths.emplace_back(
+      test::PerThreadDBPath(env_, "BlobSourceTest_GetCompressedBlobs"), 0);
+
+  DestroyAndReopen(options_);
+
+  ImmutableOptions immutable_options(options_);
+
+  constexpr uint32_t column_family_id = 1;
+  constexpr bool has_ttl = false;
+  constexpr ExpirationRange expiration_range;
+  constexpr size_t num_blobs = 256;
+
+  std::vector<std::string> key_strs;
+  std::vector<std::string> blob_strs;
+
+  for (size_t i = 0; i < num_blobs; ++i) {
+    key_strs.push_back("key" + std::to_string(i));
+    blob_strs.push_back("blob" + std::to_string(i));
+  }
+
+  std::vector<Slice> keys;
+  std::vector<Slice> blobs;
+
+  for (size_t i = 0; i < num_blobs; ++i) {
+    keys.push_back({key_strs[i]});
+    blobs.push_back({blob_strs[i]});
+  }
+
+  std::vector<uint64_t> blob_offsets(keys.size());
+  std::vector<uint64_t> blob_sizes(keys.size());
+
+  constexpr size_t capacity = 1024;
+  auto backing_cache = NewLRUCache(capacity);  // Blob file cache
+
+  FileOptions file_options;
+  std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
+      backing_cache.get(), &immutable_options, &file_options, column_family_id,
+      nullptr /*HistogramImpl*/, nullptr /*IOTracer*/));
+
+  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
+                         blob_file_cache.get());
+
+  ReadOptions read_options;
+  read_options.verify_checksums = true;
+
+  uint64_t bytes_read = 0;
+  std::vector<PinnableSlice> values(keys.size());
+
+  {
+    // Snappy Compression
+    const uint64_t file_number = 1;
+
+    read_options.read_tier = ReadTier::kReadAllTier;
+
+    WriteBlobFile(immutable_options, column_family_id, has_ttl,
+                  expiration_range, expiration_range, file_number, keys, blobs,
+                  compression, blob_offsets, blob_sizes);
+
+    CacheHandleGuard<BlobFileReader> blob_file_reader;
+    ASSERT_OK(blob_source.GetBlobFileReader(file_number, &blob_file_reader));
+    ASSERT_NE(blob_file_reader.GetValue(), nullptr);
+
+    const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
+    ASSERT_EQ(blob_file_reader.GetValue()->GetCompressionType(), compression);
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_NE(blobs[i].size() /*uncompressed size*/,
+                blob_sizes[i] /*compressed size*/);
+    }
+
+    read_options.fill_cache = true;
+    read_options.read_tier = ReadTier::kReadAllTier;
+
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                                blob_offsets[i]));
+      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], file_number,
+                                    blob_offsets[i], file_size, blob_sizes[i],
+                                    compression, nullptr /*prefetch_buffer*/,
+                                    &values[i], &bytes_read));
+      ASSERT_EQ(values[i], blobs[i] /*uncompressed blob*/);
+      ASSERT_NE(values[i].size(), blob_sizes[i] /*compressed size*/);
+      ASSERT_EQ(bytes_read,
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
-  LRUCacheOptions co;
-  co.capacity = 2048;
-  co.num_shard_bits = 2;
-  co.metadata_charge_policy = kDontChargeCacheMetadata;
-  options.blob_cache = NewLRUCache(co);
-  options.lowest_used_cache_tier = CacheTier::kVolatileTier;
+      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                               blob_offsets[i]));
+    }
+
+    read_options.read_tier = ReadTier::kBlockCacheTier;
 
-  DestroyAndReopen(options);
+    for (size_t i = 0; i < num_blobs; ++i) {
+      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                               blob_offsets[i]));
 
-  std::string db_id;
-  ASSERT_OK(db_->GetDbIdentity(db_id));
+      // Compressed blob size is passed in GetBlob
+      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], file_number,
+                                    blob_offsets[i], file_size, blob_sizes[i],
+                                    compression, nullptr /*prefetch_buffer*/,
+                                    &values[i], &bytes_read));
+      ASSERT_EQ(values[i], blobs[i] /*uncompressed blob*/);
+      ASSERT_NE(values[i].size(), blob_sizes[i] /*compressed size*/);
+      ASSERT_EQ(bytes_read,
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
+
+      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
+                                               blob_offsets[i]));
+    }
+  }
+}
+
+TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
+  options_.cf_paths.emplace_back(
+      test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);
 
-  std::string db_session_id;
-  ASSERT_OK(db_->GetDbSessionId(db_session_id));
+  DestroyAndReopen(options_);
 
-  ImmutableOptions immutable_options(options);
+  ImmutableOptions immutable_options(options_);
 
   constexpr uint32_t column_family_id = 1;
   constexpr bool has_ttl = false;
@@ -388,7 +493,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
       backing_cache.get(), &immutable_options, &file_options, column_family_id,
       blob_file_read_hist, nullptr /*IOTracer*/));
 
-  BlobSource blob_source(&immutable_options, db_id, db_session_id,
+  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                          blob_file_cache.get());
 
   ReadOptions read_options;
@@ -451,7 +556,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
                                     &value_buf[i], &bytes_read));
       ASSERT_EQ(value_buf[i], blobs[i]);
       ASSERT_EQ(bytes_read,
-                blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
+                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
 
       ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
@@ -485,7 +590,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
     }
   }
 
-  options.blob_cache->EraseUnRefEntries();
+  options_.blob_cache->EraseUnRefEntries();
 
   {
     // Cache-only MultiGetBlob