// Copyright (c) Meta Platforms, Inc. and affiliates. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "db/blob/blob_source.h" #include #include #include #include #include #include "db/blob/blob_file_cache.h" #include "db/blob/blob_log_format.h" #include "db/blob/blob_log_writer.h" #include "db/db_test_util.h" #include "file/filename.h" #include "file/read_write_util.h" #include "options/cf_options.h" #include "rocksdb/options.h" #include "util/compression.h" namespace ROCKSDB_NAMESPACE { namespace { // Creates a test blob file with `num` blobs in it. void WriteBlobFile(const ImmutableOptions& immutable_options, uint32_t column_family_id, bool has_ttl, const ExpirationRange& expiration_range_header, const ExpirationRange& expiration_range_footer, uint64_t blob_file_number, const std::vector& keys, const std::vector& blobs, CompressionType compression, std::vector& blob_offsets, std::vector& blob_sizes) { assert(!immutable_options.cf_paths.empty()); size_t num = keys.size(); assert(num == blobs.size()); assert(num == blob_offsets.size()); assert(num == blob_sizes.size()); const std::string blob_file_path = BlobFileName(immutable_options.cf_paths.front().path, blob_file_number); std::unique_ptr file; ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file, FileOptions())); std::unique_ptr file_writer(new WritableFileWriter( std::move(file), blob_file_path, FileOptions(), immutable_options.clock)); constexpr Statistics* statistics = nullptr; constexpr bool use_fsync = false; constexpr bool do_flush = false; BlobLogWriter blob_log_writer(std::move(file_writer), immutable_options.clock, statistics, blob_file_number, use_fsync, do_flush); BlobLogHeader header(column_family_id, compression, has_ttl, expiration_range_header); ASSERT_OK(blob_log_writer.WriteHeader(header)); std::vector compressed_blobs(num); std::vector blobs_to_write(num); if (kNoCompression == compression) { for (size_t i = 0; i < num; ++i) { blobs_to_write[i] = blobs[i]; blob_sizes[i] = blobs[i].size(); } } else { CompressionOptions opts; CompressionContext context(compression); constexpr uint64_t sample_for_compression = 0; CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), compression, sample_for_compression); constexpr uint32_t compression_format_version = 2; for (size_t i = 0; i < num; ++i) { ASSERT_TRUE(CompressData(blobs[i], info, compression_format_version, &compressed_blobs[i])); blobs_to_write[i] = compressed_blobs[i]; blob_sizes[i] = compressed_blobs[i].size(); } } for (size_t i = 0; i < num; ++i) { uint64_t key_offset = 0; ASSERT_OK(blob_log_writer.AddRecord(keys[i], blobs_to_write[i], &key_offset, &blob_offsets[i])); } BlobLogFooter footer; footer.blob_count = num; footer.expiration_range = expiration_range_footer; std::string checksum_method; std::string checksum_value; ASSERT_OK( blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value)); } } // anonymous namespace class BlobSourceTest : public DBTestBase { protected: public: explicit BlobSourceTest() : DBTestBase("blob_source_test", /*env_do_fsync=*/true) {} }; TEST_F(BlobSourceTest, GetBlobsFromCache) { Options options; options.env = env_; options.cf_paths.emplace_back( test::PerThreadDBPath(env_, "BlobSourceTest_GetBlobsFromCache"), 0); options.enable_blob_files = true; options.create_if_missing = true; LRUCacheOptions co; co.capacity = 2048; co.num_shard_bits = 2; co.metadata_charge_policy = kDontChargeCacheMetadata; options.blob_cache = NewLRUCache(co); options.lowest_used_cache_tier = CacheTier::kVolatileTier; DestroyAndReopen(options); std::string db_id; ASSERT_OK(db_->GetDbIdentity(db_id)); std::string db_session_id; ASSERT_OK(db_->GetDbSessionId(db_session_id)); ImmutableOptions immutable_options(options); constexpr uint32_t column_family_id = 1; constexpr bool has_ttl = false; constexpr ExpirationRange expiration_range; constexpr uint64_t blob_file_number = 1; constexpr size_t num_blobs = 16; std::vector key_strs; std::vector blob_strs; for (size_t i = 0; i < num_blobs; ++i) { key_strs.push_back("key" + std::to_string(i)); blob_strs.push_back("blob" + std::to_string(i)); } std::vector keys; std::vector blobs; uint64_t file_size = BlobLogHeader::kSize; for (size_t i = 0; i < num_blobs; ++i) { keys.push_back({key_strs[i]}); blobs.push_back({blob_strs[i]}); file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size(); } file_size += BlobLogFooter::kSize; std::vector blob_offsets(keys.size()); std::vector blob_sizes(keys.size()); WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range, expiration_range, blob_file_number, keys, blobs, kNoCompression, blob_offsets, blob_sizes); constexpr size_t capacity = 1024; std::shared_ptr backing_cache = NewLRUCache(capacity); // Blob file cache FileOptions file_options; constexpr HistogramImpl* blob_file_read_hist = nullptr; std::unique_ptr blob_file_cache(new BlobFileCache( backing_cache.get(), &immutable_options, &file_options, column_family_id, blob_file_read_hist, nullptr /*IOTracer*/)); BlobSource blob_source(&immutable_options, db_id, db_session_id, blob_file_cache.get()); ReadOptions read_options; read_options.verify_checksums = true; constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; { // GetBlob std::vector values(keys.size()); uint64_t bytes_read = 0; read_options.fill_cache = false; for (size_t i = 0; i < num_blobs; ++i) { ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number, blob_offsets[i], file_size, blob_sizes[i], kNoCompression, prefetch_buffer, &values[i], &bytes_read)); ASSERT_EQ(values[i], blobs[i]); ASSERT_EQ(bytes_read, blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize); ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } read_options.fill_cache = true; for (size_t i = 0; i < num_blobs; ++i) { ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number, blob_offsets[i], file_size, blob_sizes[i], kNoCompression, prefetch_buffer, &values[i], &bytes_read)); ASSERT_EQ(values[i], blobs[i]); ASSERT_EQ(bytes_read, blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize); ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } read_options.fill_cache = true; for (size_t i = 0; i < num_blobs; ++i) { ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number, blob_offsets[i], file_size, blob_sizes[i], kNoCompression, prefetch_buffer, &values[i], &bytes_read)); ASSERT_EQ(values[i], blobs[i]); ASSERT_EQ(bytes_read, blob_sizes[i]); ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } // Cache-only GetBlob read_options.read_tier = ReadTier::kBlockCacheTier; for (size_t i = 0; i < num_blobs; ++i) { ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number, blob_offsets[i], file_size, blob_sizes[i], kNoCompression, prefetch_buffer, &values[i], &bytes_read)); ASSERT_EQ(values[i], blobs[i]); ASSERT_EQ(bytes_read, blob_sizes[i]); ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } } options.blob_cache->EraseUnRefEntries(); { // Cache-only GetBlob std::vector values(keys.size()); uint64_t bytes_read = 0; read_options.read_tier = ReadTier::kBlockCacheTier; read_options.fill_cache = true; for (size_t i = 0; i < num_blobs; ++i) { ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); ASSERT_TRUE(blob_source .GetBlob(read_options, keys[i], blob_file_number, blob_offsets[i], file_size, blob_sizes[i], kNoCompression, prefetch_buffer, &values[i], &bytes_read) .IsIncomplete()); ASSERT_TRUE(values[i].empty()); ASSERT_EQ(bytes_read, 0); ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } } { // GetBlob from non-existing file std::vector values(keys.size()); uint64_t bytes_read = 0; uint64_t file_number = 100; // non-existing file read_options.read_tier = ReadTier::kReadAllTier; read_options.fill_cache = true; for (size_t i = 0; i < num_blobs; ++i) { ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size, blob_offsets[i])); ASSERT_TRUE(blob_source .GetBlob(read_options, keys[i], file_number, blob_offsets[i], file_size, blob_sizes[i], kNoCompression, prefetch_buffer, &values[i], &bytes_read) .IsIOError()); ASSERT_TRUE(values[i].empty()); ASSERT_EQ(bytes_read, 0); ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size, blob_offsets[i])); } } } TEST_F(BlobSourceTest, MultiGetBlobsFromCache) { Options options; options.env = env_; options.cf_paths.emplace_back( test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0); options.enable_blob_files = true; options.create_if_missing = true; LRUCacheOptions co; co.capacity = 2048; co.num_shard_bits = 2; co.metadata_charge_policy = kDontChargeCacheMetadata; options.blob_cache = NewLRUCache(co); options.lowest_used_cache_tier = CacheTier::kVolatileTier; DestroyAndReopen(options); std::string db_id; ASSERT_OK(db_->GetDbIdentity(db_id)); std::string db_session_id; ASSERT_OK(db_->GetDbSessionId(db_session_id)); ImmutableOptions immutable_options(options); constexpr uint32_t column_family_id = 1; constexpr bool has_ttl = false; constexpr ExpirationRange expiration_range; constexpr uint64_t blob_file_number = 1; constexpr size_t num_blobs = 16; std::vector key_strs; std::vector blob_strs; for (size_t i = 0; i < num_blobs; ++i) { key_strs.push_back("key" + std::to_string(i)); blob_strs.push_back("blob" + std::to_string(i)); } std::vector keys; std::vector blobs; uint64_t file_size = BlobLogHeader::kSize; for (size_t i = 0; i < num_blobs; ++i) { keys.push_back({key_strs[i]}); blobs.push_back({blob_strs[i]}); file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size(); } file_size += BlobLogFooter::kSize; std::vector blob_offsets(keys.size()); std::vector blob_sizes(keys.size()); WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range, expiration_range, blob_file_number, keys, blobs, kNoCompression, blob_offsets, blob_sizes); constexpr size_t capacity = 10; std::shared_ptr backing_cache = NewLRUCache(capacity); // Blob file cache FileOptions file_options; constexpr HistogramImpl* blob_file_read_hist = nullptr; std::unique_ptr blob_file_cache(new BlobFileCache( backing_cache.get(), &immutable_options, &file_options, column_family_id, blob_file_read_hist, nullptr /*IOTracer*/)); BlobSource blob_source(&immutable_options, db_id, db_session_id, blob_file_cache.get()); ReadOptions read_options; read_options.verify_checksums = true; constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; { // MultiGetBlob uint64_t bytes_read = 0; autovector> key_refs; autovector offsets; autovector sizes; std::array statuses_buf; autovector statuses; std::array value_buf; autovector values; for (size_t i = 0; i < num_blobs; i += 2) { // even index key_refs.emplace_back(std::cref(keys[i])); offsets.push_back(blob_offsets[i]); sizes.push_back(blob_sizes[i]); statuses.push_back(&statuses_buf[i]); values.push_back(&value_buf[i]); ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } read_options.fill_cache = true; read_options.read_tier = ReadTier::kReadAllTier; // Get half of blobs blob_source.MultiGetBlob(read_options, key_refs, blob_file_number, file_size, offsets, sizes, statuses, values, &bytes_read); for (size_t i = 0; i < num_blobs; ++i) { if (i % 2 == 0) { ASSERT_OK(statuses_buf[i]); ASSERT_EQ(value_buf[i], blobs[i]); ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } else { statuses_buf[i].PermitUncheckedError(); ASSERT_TRUE(value_buf[i].empty()); ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } } // Get the rest of blobs for (size_t i = 1; i < num_blobs; i += 2) { // odd index ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number, blob_offsets[i], file_size, blob_sizes[i], kNoCompression, prefetch_buffer, &value_buf[i], &bytes_read)); ASSERT_EQ(value_buf[i], blobs[i]); ASSERT_EQ(bytes_read, blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize); ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } // Cache-only MultiGetBlob read_options.read_tier = ReadTier::kBlockCacheTier; key_refs.clear(); offsets.clear(); sizes.clear(); statuses.clear(); values.clear(); for (size_t i = 0; i < num_blobs; ++i) { key_refs.emplace_back(std::cref(keys[i])); offsets.push_back(blob_offsets[i]); sizes.push_back(blob_sizes[i]); statuses.push_back(&statuses_buf[i]); values.push_back(&value_buf[i]); } blob_source.MultiGetBlob(read_options, key_refs, blob_file_number, file_size, offsets, sizes, statuses, values, &bytes_read); for (size_t i = 0; i < num_blobs; ++i) { ASSERT_OK(statuses_buf[i]); ASSERT_EQ(value_buf[i], blobs[i]); ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } } options.blob_cache->EraseUnRefEntries(); { // Cache-only MultiGetBlob uint64_t bytes_read = 0; read_options.read_tier = ReadTier::kBlockCacheTier; autovector> key_refs; autovector offsets; autovector sizes; std::array statuses_buf; autovector statuses; std::array value_buf; autovector values; for (size_t i = 0; i < num_blobs; i++) { key_refs.emplace_back(std::cref(keys[i])); offsets.push_back(blob_offsets[i]); sizes.push_back(blob_sizes[i]); statuses.push_back(&statuses_buf[i]); values.push_back(&value_buf[i]); ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } blob_source.MultiGetBlob(read_options, key_refs, blob_file_number, file_size, offsets, sizes, statuses, values, &bytes_read); for (size_t i = 0; i < num_blobs; ++i) { ASSERT_TRUE(statuses_buf[i].IsIncomplete()); ASSERT_TRUE(value_buf[i].empty()); ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size, blob_offsets[i])); } } { // MultiGetBlob from non-existing file uint64_t bytes_read = 0; uint64_t file_number = 100; // non-existing file read_options.read_tier = ReadTier::kReadAllTier; autovector> key_refs; autovector offsets; autovector sizes; std::array statuses_buf; autovector statuses; std::array value_buf; autovector values; for (size_t i = 0; i < num_blobs; i++) { key_refs.emplace_back(std::cref(keys[i])); offsets.push_back(blob_offsets[i]); sizes.push_back(blob_sizes[i]); statuses.push_back(&statuses_buf[i]); values.push_back(&value_buf[i]); ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size, blob_offsets[i])); } blob_source.MultiGetBlob(read_options, key_refs, file_number, file_size, offsets, sizes, statuses, values, &bytes_read); for (size_t i = 0; i < num_blobs; ++i) { ASSERT_TRUE(statuses_buf[i].IsIOError()); ASSERT_TRUE(value_buf[i].empty()); ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size, blob_offsets[i])); } } } } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }