From d490bfcdb6d1274e378661513ce44beb4a9e3d8e Mon Sep 17 00:00:00 2001 From: Bo Wang Date: Wed, 7 Sep 2022 19:00:27 -0700 Subject: [PATCH] Avoid recompressing cold block in CompressedSecondaryCache (#10527) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: **Summary:** When a block is found by `Lookup` in the secondary cache for the first time, we just insert a dummy block into the primary cache (charging the actual size of the block) and do not erase the block from the secondary cache. A standalone handle is returned from `Lookup`. Only when the block is hit again do we erase it from the secondary cache and add it to the primary cache. When a block is evicted from the primary cache to the secondary cache for the first time, we just insert a dummy block (size 0) into the secondary cache. Only when the block is evicted again is it treated as a hot block and inserted into the secondary cache. **Implementation Details** Add a new state of LRUHandle: the handle is never inserted into the LRUCache (neither the hash table nor the LRU list) and does not go through the three existing states. The entry can be freed when refs becomes 0. (refs >= 1 && in_cache == false && IS_STANDALONE == true) The behavior of `LRUCacheShard::Lookup()` is updated when the secondary_cache is CompressedSecondaryCache: 1. If a handle is found in the primary cache: 1.1. If the handle's value is not nullptr, it is returned immediately. 1.2. If the handle's value is nullptr, the handle is a dummy one. A dummy handle that was retrieved from the secondary cache may still exist in the secondary cache. - 1.2.1. If no valid handle can be `Lookup`ed from the secondary cache, return nullptr. - 1.2.2. If the handle from the secondary cache is valid, erase it from the secondary cache and add it to the primary cache. 2. If a handle is not found in the primary cache: 2.1. If no valid handle can be `Lookup`ed from the secondary cache, return nullptr. 2.2. 
If the handle from the secondary cache is valid, insert a dummy block into the primary cache (charging the actual size of the block) and return a standalone handle. The behavior of `LRUCacheShard::Promote()` is updated as follows: 1. If `e->sec_handle` has a value, one of the following steps happens: 1.1. If `secondary_cache_` is `CompressedSecondaryCache` and e is a standalone handle, insert a dummy handle and return a standalone handle to the caller. 1.2. Insert the item into the primary cache and return the handle to the caller. 1.3. Exception handling. 2. If `e->sec_handle` has no value, mark the item as not in cache and charge the cache only for its metadata, which will shortly be released. The behavior of `CompressedSecondaryCache::Insert()` is updated: 1. If a block is evicted from the primary cache for the first time, a dummy item is inserted. 2. If a dummy item is found for a block, the block is inserted into the secondary cache. The behavior of `CompressedSecondaryCache::Lookup()` is updated: 1. If a handle is not found or it is a dummy item, nullptr is returned. 2. If `erase_handle` is true, the handle is erased. The behavior of `LRUCacheShard::Release()` is adjusted for standalone handles. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10527 Test Plan: 1. Stress tests. 2. Unit tests. 3. CPU profiling for db_bench. 
Reviewed By: siying Differential Revision: D38747613 Pulled By: gitbw95 fbshipit-source-id: 74a1eba7e1957c9affb2bd2ae3e0194584fa6eca --- HISTORY.md | 2 + cache/compressed_secondary_cache.cc | 93 +++--- cache/compressed_secondary_cache.h | 20 +- cache/compressed_secondary_cache_test.cc | 296 ++++++++++++------- cache/lru_cache.cc | 152 +++++++--- cache/lru_cache.h | 22 +- cache/lru_cache_test.cc | 15 +- db/blob/blob_source_test.cc | 59 ++-- db/blob/db_blob_basic_test.cc | 33 ++- include/rocksdb/cache.h | 7 +- include/rocksdb/secondary_cache.h | 23 +- options/customizable_test.cc | 5 +- utilities/fault_injection_secondary_cache.cc | 7 +- utilities/fault_injection_secondary_cache.h | 4 +- 14 files changed, 499 insertions(+), 239 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 75a86e27c..5d9536bb7 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -18,6 +18,8 @@ ### Behavior Change * Right now, when the option migration tool (OptionChangeMigration()) migrates to FIFO compaction, it compacts all the data into one single SST file and moves it to L0. This might create a problem for some users: the giant file may be soon deleted to satisfy max_table_files_size, and might cause the DB to be almost empty. We change the behavior so that the files are cut to be smaller, but these files might not follow the data insertion order. With the change, after the migration, migrated data might not be dropped by insertion order by FIFO compaction. +* When a block is firstly found from `CompressedSecondaryCache`, we just insert a dummy block into the primary cache and don’t erase the block from `CompressedSecondaryCache`. A standalone handle is returned to the caller. Only if the block is found again from `CompressedSecondaryCache` before the dummy block is evicted, we erase the block from `CompressedSecondaryCache` and insert it into the primary cache. 
+* When a block is firstly evicted from the primary cache to `CompressedSecondaryCache`, we just insert a dummy block in `CompressedSecondaryCache`. Only if it is evicted again before the dummy block is evicted from the cache, it is treated as a hot block and is inserted into `CompressedSecondaryCache`. ### New Features * RocksDB does internal auto prefetching if it notices 2 sequential reads if readahead_size is not specified. New option `num_file_reads_for_auto_readahead` is added in BlockBasedTableOptions which indicates after how many sequential reads internal auto prefetching should be start (default is 2). diff --git a/cache/compressed_secondary_cache.cc b/cache/compressed_secondary_cache.cc index a77ceff19..da38db711 100644 --- a/cache/compressed_secondary_cache.cc +++ b/cache/compressed_secondary_cache.cc @@ -22,9 +22,9 @@ CompressedSecondaryCache::CompressedSecondaryCache( CacheMetadataChargePolicy metadata_charge_policy, CompressionType compression_type, uint32_t compress_format_version) : cache_options_(capacity, num_shard_bits, strict_capacity_limit, - high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, - metadata_charge_policy, compression_type, - compress_format_version, low_pri_pool_ratio) { + high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator, + use_adaptive_mutex, metadata_charge_policy, + compression_type, compress_format_version) { cache_ = NewLRUCache(capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, @@ -35,25 +35,27 @@ CompressedSecondaryCache::~CompressedSecondaryCache() { cache_.reset(); } std::unique_ptr CompressedSecondaryCache::Lookup( const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/, - bool& is_in_sec_cache) { + bool advise_erase, bool& is_in_sec_cache) { std::unique_ptr handle; is_in_sec_cache = false; Cache::Handle* lru_handle = cache_->Lookup(key); if (lru_handle == nullptr) { - return handle; + return nullptr; } - CacheValueChunk* 
handle_value = - reinterpret_cast(cache_->Value(lru_handle)); - size_t handle_value_charge{0}; - CacheAllocationPtr merged_value = - MergeChunksIntoValue(handle_value, handle_value_charge); + void* handle_value = cache_->Value(lru_handle); + if (handle_value == nullptr) { + cache_->Release(lru_handle, /*erase_if_last_ref=*/false); + return nullptr; + } + + CacheAllocationPtr* ptr = reinterpret_cast(handle_value); Status s; void* value{nullptr}; size_t charge{0}; if (cache_options_.compression_type == kNoCompression) { - s = create_cb(merged_value.get(), handle_value_charge, &value, &charge); + s = create_cb(ptr->get(), cache_->GetCharge(lru_handle), &value, &charge); } else { UncompressionContext uncompression_context(cache_options_.compression_type); UncompressionInfo uncompression_info(uncompression_context, @@ -61,32 +63,51 @@ std::unique_ptr CompressedSecondaryCache::Lookup( cache_options_.compression_type); size_t uncompressed_size{0}; - CacheAllocationPtr uncompressed; - uncompressed = UncompressData(uncompression_info, (char*)merged_value.get(), - handle_value_charge, &uncompressed_size, - cache_options_.compress_format_version, - cache_options_.memory_allocator.get()); + CacheAllocationPtr uncompressed = UncompressData( + uncompression_info, (char*)ptr->get(), cache_->GetCharge(lru_handle), + &uncompressed_size, cache_options_.compress_format_version, + cache_options_.memory_allocator.get()); if (!uncompressed) { - cache_->Release(lru_handle, /* erase_if_last_ref */ true); - return handle; + cache_->Release(lru_handle, /*erase_if_last_ref=*/true); + return nullptr; } s = create_cb(uncompressed.get(), uncompressed_size, &value, &charge); } if (!s.ok()) { - cache_->Release(lru_handle, /* erase_if_last_ref */ true); - return handle; + cache_->Release(lru_handle, /*erase_if_last_ref=*/true); + return nullptr; } - cache_->Release(lru_handle, /* erase_if_last_ref */ true); + if (advise_erase) { + cache_->Release(lru_handle, /*erase_if_last_ref=*/true); + // 
Insert a dummy handle. + cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, DeletionCallback) + .PermitUncheckedError(); + } else { + is_in_sec_cache = true; + cache_->Release(lru_handle, /*erase_if_last_ref=*/false); + } handle.reset(new CompressedSecondaryCacheResultHandle(value, charge)); - return handle; } Status CompressedSecondaryCache::Insert(const Slice& key, void* value, const Cache::CacheItemHelper* helper) { + if (value == nullptr) { + return Status::InvalidArgument(); + } + + Cache::Handle* lru_handle = cache_->Lookup(key); + if (lru_handle == nullptr) { + // Insert a dummy handle if the handle is evicted for the first time. + return cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, + DeletionCallback); + } else { + cache_->Release(lru_handle, /*erase_if_last_ref=*/false); + } + size_t size = (*helper->size_cb)(value); CacheAllocationPtr ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); @@ -115,12 +136,14 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value, } val = Slice(compressed_val); + size = compressed_val.size(); + ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); + memcpy(ptr.get(), compressed_val.data(), size); } - size_t charge{0}; - CacheValueChunk* value_chunks_head = - SplitValueIntoChunks(val, cache_options_.compression_type, charge); - return cache_->Insert(key, value_chunks_head, charge, DeletionCallback); + CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); + + return cache_->Insert(key, buf, size, DeletionCallback); } void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } @@ -212,22 +235,16 @@ CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue( void CompressedSecondaryCache::DeletionCallback(const Slice& /*key*/, void* obj) { - CacheValueChunk* chunks_head = reinterpret_cast(obj); - while (chunks_head != nullptr) { - CacheValueChunk* tmp_chunk = chunks_head; - chunks_head = chunks_head->next; - tmp_chunk->Free(); - } + delete 
reinterpret_cast(obj); obj = nullptr; } std::shared_ptr NewCompressedSecondaryCache( size_t capacity, int num_shard_bits, bool strict_capacity_limit, - double high_pri_pool_ratio, + double high_pri_pool_ratio, double low_pri_pool_ratio, std::shared_ptr memory_allocator, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, - CompressionType compression_type, uint32_t compress_format_version, - double low_pri_pool_ratio) { + CompressionType compression_type, uint32_t compress_format_version) { return std::make_shared( capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator, use_adaptive_mutex, @@ -240,9 +257,9 @@ std::shared_ptr NewCompressedSecondaryCache( assert(opts.secondary_cache == nullptr); return NewCompressedSecondaryCache( opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit, - opts.high_pri_pool_ratio, opts.memory_allocator, opts.use_adaptive_mutex, - opts.metadata_charge_policy, opts.compression_type, - opts.compress_format_version, opts.low_pri_pool_ratio); + opts.high_pri_pool_ratio, opts.low_pri_pool_ratio, opts.memory_allocator, + opts.use_adaptive_mutex, opts.metadata_charge_policy, + opts.compression_type, opts.compress_format_version); } } // namespace ROCKSDB_NAMESPACE diff --git a/cache/compressed_secondary_cache.h b/cache/compressed_secondary_cache.h index bc194ee24..996d831e1 100644 --- a/cache/compressed_secondary_cache.h +++ b/cache/compressed_secondary_cache.h @@ -45,7 +45,21 @@ class CompressedSecondaryCacheResultHandle : public SecondaryCacheResultHandle { // The CompressedSecondaryCache is a concrete implementation of // rocksdb::SecondaryCache. // -// Users can also cast a pointer to it and call methods on +// When a block is found from CompressedSecondaryCache::Lookup, we check whether +// there is a dummy block with the same key in the primary cache. +// 1. 
If the dummy block exists, we erase the block from +// CompressedSecondaryCache and insert it into the primary cache. +// 2. If not, we just insert a dummy block into the primary cache +// (charging the actual size of the block) and do not erase the block from +// CompressedSecondaryCache. A standalone handle is returned to the caller. +// +// When a block is evicted from the primary cache, we check whether +// there is a dummy block with the same key in CompressedSecondaryCache. +// 1. If the dummy block exists, the block is inserted into +// CompressedSecondaryCache. +// 2. If not, we just insert a dummy block (size 0) in CompressedSecondaryCache. +// +// Users can also cast a pointer to CompressedSecondaryCache and call methods on // it directly, especially custom methods that may be added // in the future. For example - // std::unique_ptr cache = @@ -72,7 +86,9 @@ class CompressedSecondaryCache : public SecondaryCache { std::unique_ptr Lookup( const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/, - bool& is_in_sec_cache) override; + bool advise_erase, bool& is_in_sec_cache) override; + + bool SupportForceErase() const override { return true; } void Erase(const Slice& key) override; diff --git a/cache/compressed_secondary_cache_test.cc b/cache/compressed_secondary_cache_test.cc index 4f1d02afa..92df1904e 100644 --- a/cache/compressed_secondary_cache_test.cc +++ b/cache/compressed_secondary_cache_test.cc @@ -87,49 +87,61 @@ class CompressedSecondaryCacheTest : public testing::Test { void BasicTestHelper(std::shared_ptr sec_cache) { bool is_in_sec_cache{true}; // Lookup a non-existent key. - std::unique_ptr handle0 = - sec_cache->Lookup("k0", test_item_creator, true, is_in_sec_cache); + std::unique_ptr handle0 = sec_cache->Lookup( + "k0", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache); ASSERT_EQ(handle0, nullptr); Random rnd(301); - // Insert and Lookup the first item. 
- std::string str1; - test::CompressibleString(&rnd, 0.25, 1000, &str1); + // Insert and Lookup the item k1 for the first time. + std::string str1(rnd.RandomString(1000)); TestItem item1(str1.data(), str1.length()); + // A dummy handle is inserted if the item is inserted for the first time. ASSERT_OK(sec_cache->Insert("k1", &item1, &CompressedSecondaryCacheTest::helper_)); - std::unique_ptr handle1 = - sec_cache->Lookup("k1", test_item_creator, true, is_in_sec_cache); - ASSERT_NE(handle1, nullptr); + std::unique_ptr handle1_1 = sec_cache->Lookup( + "k1", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_EQ(handle1_1, nullptr); + + // Insert and Lookup the item k1 for the second time. + ASSERT_OK(sec_cache->Insert("k1", &item1, + &CompressedSecondaryCacheTest::helper_)); + std::unique_ptr handle1_2 = sec_cache->Lookup( + "k1", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache); + ASSERT_NE(handle1_2, nullptr); ASSERT_FALSE(is_in_sec_cache); std::unique_ptr val1 = - std::unique_ptr(static_cast(handle1->Value())); + std::unique_ptr(static_cast(handle1_2->Value())); ASSERT_NE(val1, nullptr); ASSERT_EQ(memcmp(val1->Buf(), item1.Buf(), item1.Size()), 0); - // Lookup the first item again. - std::unique_ptr handle1_1 = - sec_cache->Lookup("k1", test_item_creator, true, is_in_sec_cache); - ASSERT_EQ(handle1_1, nullptr); + // Lookup the item k1 again. + std::unique_ptr handle1_3 = sec_cache->Lookup( + "k1", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache); + ASSERT_EQ(handle1_3, nullptr); - // Insert and Lookup the second item. - std::string str2; - test::CompressibleString(&rnd, 0.5, 1000, &str2); + // Insert and Lookup the item k2. 
+ std::string str2(rnd.RandomString(1000)); TestItem item2(str2.data(), str2.length()); ASSERT_OK(sec_cache->Insert("k2", &item2, &CompressedSecondaryCacheTest::helper_)); - std::unique_ptr handle2 = - sec_cache->Lookup("k2", test_item_creator, true, is_in_sec_cache); - ASSERT_NE(handle2, nullptr); + std::unique_ptr handle2_1 = sec_cache->Lookup( + "k2", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_EQ(handle2_1, nullptr); + + ASSERT_OK(sec_cache->Insert("k2", &item2, + &CompressedSecondaryCacheTest::helper_)); + std::unique_ptr handle2_2 = sec_cache->Lookup( + "k2", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_NE(handle2_2, nullptr); std::unique_ptr val2 = - std::unique_ptr(static_cast(handle2->Value())); + std::unique_ptr(static_cast(handle2_2->Value())); ASSERT_NE(val2, nullptr); ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0); - std::vector handles = {handle1.get(), - handle2.get()}; + std::vector handles = {handle1_2.get(), + handle2_2.get()}; sec_cache->WaitAll(handles); sec_cache.reset(); @@ -188,36 +200,55 @@ class CompressedSecondaryCacheTest : public testing::Test { Random rnd(301); std::string str1(rnd.RandomString(1000)); TestItem item1(str1.data(), str1.length()); + // Insert a dummy handle. + ASSERT_OK(sec_cache->Insert("k1", &item1, + &CompressedSecondaryCacheTest::helper_)); + // Insert k1. ASSERT_OK(sec_cache->Insert("k1", &item1, &CompressedSecondaryCacheTest::helper_)); // Insert and Lookup the second item. std::string str2(rnd.RandomString(200)); TestItem item2(str2.data(), str2.length()); - // k1 is evicted. + // Insert a dummy handle, k1 is not evicted. 
ASSERT_OK(sec_cache->Insert("k2", &item2, &CompressedSecondaryCacheTest::helper_)); bool is_in_sec_cache{false}; - std::unique_ptr handle1_1 = - sec_cache->Lookup("k1", test_item_creator, true, is_in_sec_cache); - ASSERT_EQ(handle1_1, nullptr); - std::unique_ptr handle2 = - sec_cache->Lookup("k2", test_item_creator, true, is_in_sec_cache); + std::unique_ptr handle1 = sec_cache->Lookup( + "k1", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_EQ(handle1, nullptr); + + // Insert k2 and k1 is evicted. + ASSERT_OK(sec_cache->Insert("k2", &item2, + &CompressedSecondaryCacheTest::helper_)); + std::unique_ptr handle2 = sec_cache->Lookup( + "k2", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); ASSERT_NE(handle2, nullptr); std::unique_ptr val2 = std::unique_ptr(static_cast(handle2->Value())); ASSERT_NE(val2, nullptr); ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0); + // Insert k1 again and a dummy handle is inserted. + ASSERT_OK(sec_cache->Insert("k1", &item1, + &CompressedSecondaryCacheTest::helper_)); + + std::unique_ptr handle1_1 = sec_cache->Lookup( + "k1", test_item_creator, true, /*advise_erase=*/false, is_in_sec_cache); + ASSERT_EQ(handle1_1, nullptr); + // Create Fails. SetFailCreate(true); - std::unique_ptr handle2_1 = - sec_cache->Lookup("k2", test_item_creator, true, is_in_sec_cache); + std::unique_ptr handle2_1 = sec_cache->Lookup( + "k2", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache); ASSERT_EQ(handle2_1, nullptr); // Save Fails. std::string str3 = rnd.RandomString(10); TestItem item3(str3.data(), str3.length()); + // The Status is OK because a dummy handle is inserted. 
+ ASSERT_OK(sec_cache->Insert("k3", &item3, + &CompressedSecondaryCacheTest::helper_fail_)); ASSERT_NOK(sec_cache->Insert("k3", &item3, &CompressedSecondaryCacheTest::helper_fail_)); @@ -236,41 +267,56 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.compression_type = CompressionType::kNoCompression; } - secondary_cache_opts.capacity = 2300; + secondary_cache_opts.capacity = 6000; secondary_cache_opts.num_shard_bits = 0; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); LRUCacheOptions lru_cache_opts( - 1300 /* capacity */, 0 /* num_shard_bits */, - false /* strict_capacity_limit */, 0.5 /* high_pri_pool_ratio */, - nullptr /* memory_allocator */, kDefaultToAdaptiveMutex, - kDefaultCacheMetadataChargePolicy); + /*_capacity =*/1300, /*_num_shard_bits =*/0, + /*_strict_capacity_limit =*/false, /*_high_pri_pool_ratio =*/0.5, + /*_memory_allocator =*/nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy, /*_low_pri_pool_ratio =*/0.0); lru_cache_opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(lru_cache_opts); std::shared_ptr stats = CreateDBStatistics(); Random rnd(301); - - std::string str1; - test::CompressibleString(&rnd, 0.5, 1001, &str1); - std::string str1_clone{str1}; - TestItem* item1 = new TestItem(str1.data(), str1.length()); - ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, - str1.length())); - - std::string str2; - test::CompressibleString(&rnd, 0.5, 1012, &str2); - TestItem* item2 = new TestItem(str2.data(), str2.length()); - // After Insert, cache contains k2 and secondary cache contains k1. - ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, - str2.length())); - - std::string str3; - test::CompressibleString(&rnd, 0.5, 1024, &str3); - TestItem* item3 = new TestItem(str3.data(), str3.length()); - // After Insert, cache contains k3 and secondary cache contains k1 and k2. 
- ASSERT_OK(cache->Insert("k3", item3, &CompressedSecondaryCacheTest::helper_, - str3.length())); + std::string str1 = rnd.RandomString(1001); + TestItem* item1_1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert( + "k1", item1_1, &CompressedSecondaryCacheTest::helper_, str1.length())); + + std::string str2 = rnd.RandomString(1012); + TestItem* item2_1 = new TestItem(str2.data(), str2.length()); + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's dummy item. + ASSERT_OK(cache->Insert( + "k2", item2_1, &CompressedSecondaryCacheTest::helper_, str2.length())); + + std::string str3 = rnd.RandomString(1024); + TestItem* item3_1 = new TestItem(str3.data(), str3.length()); + // After this Insert, primary cache contains k3 and secondary cache contains + // k1's dummy item and k2's dummy item. + ASSERT_OK(cache->Insert( + "k3", item3_1, &CompressedSecondaryCacheTest::helper_, str3.length())); + + // After this Insert, primary cache contains k1 and secondary cache contains + // k1's dummy item, k2's dummy item, and k3's dummy item. + TestItem* item1_2 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert( + "k1", item1_2, &CompressedSecondaryCacheTest::helper_, str1.length())); + + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's item, k2's dummy item, and k3's dummy item. + TestItem* item2_2 = new TestItem(str2.data(), str2.length()); + ASSERT_OK(cache->Insert( + "k2", item2_2, &CompressedSecondaryCacheTest::helper_, str2.length())); + + // After this Insert, primary cache contains k3 and secondary cache contains + // k1's item and k2's item. 
+ TestItem* item3_2 = new TestItem(str3.data(), str3.length()); + ASSERT_OK(cache->Insert( + "k3", item3_2, &CompressedSecondaryCacheTest::helper_, str3.length())); Cache::Handle* handle; handle = cache->Lookup("k3", &CompressedSecondaryCacheTest::helper_, @@ -279,7 +325,7 @@ class CompressedSecondaryCacheTest : public testing::Test { ASSERT_NE(handle, nullptr); TestItem* val3 = static_cast(cache->Value(handle)); ASSERT_NE(val3, nullptr); - ASSERT_EQ(memcmp(val3->Buf(), item3->Buf(), item3->Size()), 0); + ASSERT_EQ(memcmp(val3->Buf(), item3_2->Buf(), item3_2->Size()), 0); cache->Release(handle); // Lookup an non-existent key. @@ -288,17 +334,26 @@ class CompressedSecondaryCacheTest : public testing::Test { stats.get()); ASSERT_EQ(handle, nullptr); - // This Lookup should promote k1 and erase k1 from the secondary cache, - // then k3 is demoted. So k2 and k3 are in the secondary cache. + // This Lookup should just insert a dummy handle in the primary cache + // and the k1 is still in the secondary cache. handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, test_item_creator, Cache::Priority::LOW, true, stats.get()); ASSERT_NE(handle, nullptr); TestItem* val1_1 = static_cast(cache->Value(handle)); ASSERT_NE(val1_1, nullptr); - ASSERT_EQ(memcmp(val1_1->Buf(), str1_clone.data(), str1_clone.size()), 0); + ASSERT_EQ(memcmp(val1_1->Buf(), str1.data(), str1.size()), 0); + cache->Release(handle); + + // This Lookup should erase k1 from the secondary cache and insert + // it into primary cache; then k3 is demoted. + handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, + stats.get()); + ASSERT_NE(handle, nullptr); cache->Release(handle); + // k2 is still in secondary cache. 
handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_, test_item_creator, Cache::Priority::LOW, true, stats.get()); @@ -321,22 +376,21 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.compression_type = CompressionType::kNoCompression; } - secondary_cache_opts.capacity = 2300; + secondary_cache_opts.capacity = 6000; secondary_cache_opts.num_shard_bits = 0; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); LRUCacheOptions opts( - 1024 /* capacity */, 0 /* num_shard_bits */, - false /* strict_capacity_limit */, 0.5 /* high_pri_pool_ratio */, - nullptr /* memory_allocator */, kDefaultToAdaptiveMutex, - kDefaultCacheMetadataChargePolicy); + /*_capacity=*/1300, /*_num_shard_bits=*/0, + /*_strict_capacity_limit=*/false, /*_high_pri_pool_ratio=*/0.5, + /*_memory_allocator=*/nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy, /*_low_pri_pool_ratio=*/0.0); opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(opts); Random rnd(301); - std::string str1; - test::CompressibleString(&rnd, 0.5, 1001, &str1); + std::string str1 = rnd.RandomString(1001); auto item1 = std::unique_ptr(new TestItem(str1.data(), str1.length())); ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length())); @@ -369,30 +423,28 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.compression_type = CompressionType::kNoCompression; } - secondary_cache_opts.capacity = 2300; + secondary_cache_opts.capacity = 6000; secondary_cache_opts.num_shard_bits = 0; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); LRUCacheOptions opts( - 1200 /* capacity */, 0 /* num_shard_bits */, - false /* strict_capacity_limit */, 0.5 /* high_pri_pool_ratio */, - nullptr /* memory_allocator */, kDefaultToAdaptiveMutex, - kDefaultCacheMetadataChargePolicy); + /*_capacity=*/1300, /*_num_shard_bits=*/0, + /*_strict_capacity_limit=*/false, 
/*_high_pri_pool_ratio=*/0.5, + /*_memory_allocator=*/nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy, /*_low_pri_pool_ratio=*/0.0); opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(opts); Random rnd(301); - std::string str1; - test::CompressibleString(&rnd, 0.5, 1001, &str1); + std::string str1 = rnd.RandomString(1001); TestItem* item1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_fail_, str1.length())); - std::string str2; - test::CompressibleString(&rnd, 0.5, 1002, &str2); + std::string str2 = rnd.RandomString(1002); TestItem* item2 = new TestItem(str2.data(), str2.length()); // k1 should be demoted to the secondary cache. ASSERT_OK(cache->Insert("k2", item2, @@ -404,11 +456,11 @@ class CompressedSecondaryCacheTest : public testing::Test { test_item_creator, Cache::Priority::LOW, true); ASSERT_NE(handle, nullptr); cache->Release(handle); - // This lookup should fail, since k1 demotion would have failed + // This lookup should fail, since k1 demotion would have failed. handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_fail_, test_item_creator, Cache::Priority::LOW, true); ASSERT_EQ(handle, nullptr); - // Since k1 didn't get promoted, k2 should still be in cache + // Since k1 was not promoted, k2 should still be in cache. 
handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_fail_, test_item_creator, Cache::Priority::LOW, true); ASSERT_NE(handle, nullptr); @@ -430,29 +482,27 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.compression_type = CompressionType::kNoCompression; } - secondary_cache_opts.capacity = 2300; + secondary_cache_opts.capacity = 6000; secondary_cache_opts.num_shard_bits = 0; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); LRUCacheOptions opts( - 1200 /* capacity */, 0 /* num_shard_bits */, - false /* strict_capacity_limit */, 0.5 /* high_pri_pool_ratio */, - nullptr /* memory_allocator */, kDefaultToAdaptiveMutex, - kDefaultCacheMetadataChargePolicy); + /*_capacity=*/1300, /*_num_shard_bits=*/0, + /*_strict_capacity_limit=*/false, /*_high_pri_pool_ratio=*/0.5, + /*_memory_allocator=*/nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy, /*_low_pri_pool_ratio=*/0.0); opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(opts); Random rnd(301); - std::string str1; - test::CompressibleString(&rnd, 0.5, 1001, &str1); + std::string str1 = rnd.RandomString(1001); TestItem* item1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, str1.length())); - std::string str2; - test::CompressibleString(&rnd, 0.5, 1002, &str2); + std::string str2 = rnd.RandomString(1002); TestItem* item2 = new TestItem(str2.data(), str2.length()); // k1 should be demoted to the secondary cache. 
ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, @@ -490,35 +540,46 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.compression_type = CompressionType::kNoCompression; } - secondary_cache_opts.capacity = 2300; + secondary_cache_opts.capacity = 6000; secondary_cache_opts.num_shard_bits = 0; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); LRUCacheOptions opts( - 1200 /* capacity */, 0 /* num_shard_bits */, - true /* strict_capacity_limit */, 0.5 /* high_pri_pool_ratio */, - nullptr /* memory_allocator */, kDefaultToAdaptiveMutex, - kDefaultCacheMetadataChargePolicy); + /*_capacity=*/1300, /*_num_shard_bits=*/0, + /*_strict_capacity_limit=*/false, /*_high_pri_pool_ratio=*/0.5, + /*_memory_allocator=*/nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy, /*_low_pri_pool_ratio=*/0.0); opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(opts); Random rnd(301); - std::string str1; - test::CompressibleString(&rnd, 0.5, 1001, &str1); - TestItem* item1 = new TestItem(str1.data(), str1.length()); - ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, - str1.length())); + std::string str1 = rnd.RandomString(1001); + TestItem* item1_1 = new TestItem(str1.data(), str1.length()); + ASSERT_OK(cache->Insert( + "k1", item1_1, &CompressedSecondaryCacheTest::helper_, str1.length())); - std::string str2; - test::CompressibleString(&rnd, 0.5, 1002, &str2); + std::string str2 = rnd.RandomString(1002); std::string str2_clone{str2}; TestItem* item2 = new TestItem(str2.data(), str2.length()); - // k1 should be demoted to the secondary cache. + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's dummy item. 
ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_,
                            str2.length()));
+    // After this Insert, primary cache contains k1 and secondary cache contains
+    // k1's dummy item and k2's dummy item.
+    TestItem* item1_2 = new TestItem(str1.data(), str1.length());
+    ASSERT_OK(cache->Insert(
+        "k1", item1_2, &CompressedSecondaryCacheTest::helper_, str1.length()));
+
+    TestItem* item2_2 = new TestItem(str2.data(), str2.length());
+    // After this Insert, primary cache contains k2 and secondary cache contains
+    // k1's item and k2's dummy item.
+    ASSERT_OK(cache->Insert(
+        "k2", item2_2, &CompressedSecondaryCacheTest::helper_, str2.length()));
+
     Cache::Handle* handle2;
     handle2 = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_,
                             test_item_creator, Cache::Priority::LOW, true);
@@ -527,6 +588,7 @@ class CompressedSecondaryCacheTest : public testing::Test {
     // k1 promotion should fail because cache is at capacity and
     // strict_capacity_limit is true, but the lookup should still succeed.
+    // k1's dummy item is inserted into the primary cache.
     Cache::Handle* handle1;
     handle1 = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_,
                             test_item_creator, Cache::Priority::LOW, true);
@@ -561,22 +623,24 @@ class CompressedSecondaryCacheTest : public testing::Test {
         std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5, 0.0, allocator);
     Random rnd(301);
-    // 10000 = 8169 + 1769 + 62, so there should be 3 chunks after split.
+    // 8500 = 8169 + 331, so there should be 2 chunks after split.
+    size_t str_size{8500};
     std::string str = rnd.RandomString(static_cast<int>(str_size));
     size_t charge{0};
     CacheValueChunk* chunks_head =
         sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge);
-    ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1));
+    ASSERT_EQ(charge, str_size + 2 * (sizeof(CacheValueChunk) - 1));
     CacheValueChunk* current_chunk = chunks_head;
     ASSERT_EQ(current_chunk->size, 8192 - sizeof(CacheValueChunk) + 1);
     current_chunk = current_chunk->next;
-    ASSERT_EQ(current_chunk->size, 1792 - sizeof(CacheValueChunk) + 1);
-    current_chunk = current_chunk->next;
-    ASSERT_EQ(current_chunk->size, 62);
+    ASSERT_EQ(current_chunk->size, 354 - sizeof(CacheValueChunk) + 1);
-    sec_cache->DeletionCallback("dummy", chunks_head);
+    while (chunks_head != nullptr) {
+      CacheValueChunk* tmp_chunk = chunks_head;
+      chunks_head = chunks_head->next;
+      tmp_chunk->Free();
+    }
   }
   void MergeChunksIntoValueTest() {
@@ -618,7 +682,11 @@ class CompressedSecondaryCacheTest : public testing::Test {
     std::string value_str{value.get(), charge};
     ASSERT_EQ(strcmp(value_str.data(), str.data()), 0);
-    sec_cache->DeletionCallback("dummy", chunks_head);
+    while (chunks_head != nullptr) {
+      CacheValueChunk* tmp_chunk = chunks_head;
+      chunks_head = chunks_head->next;
+      tmp_chunk->Free();
+    }
   }
   void SplictValueAndMergeChunksTest() {
@@ -639,13 +707,13 @@ class CompressedSecondaryCacheTest : public testing::Test {
         std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5, 0.0, allocator);
     Random rnd(301);
-    // 10000 = 8169 + 1769 + 62, so there should be 3 chunks after split.
+    // 8500 = 8169 + 331, so there should be 2 chunks after split.
+ size_t str_size{8500}; std::string str = rnd.RandomString(static_cast(str_size)); size_t charge{0}; CacheValueChunk* chunks_head = sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge); - ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1)); + ASSERT_EQ(charge, str_size + 2 * (sizeof(CacheValueChunk) - 1)); CacheAllocationPtr value = sec_cache->MergeChunksIntoValue(chunks_head, charge); @@ -653,7 +721,11 @@ class CompressedSecondaryCacheTest : public testing::Test { std::string value_str{value.get(), charge}; ASSERT_EQ(strcmp(value_str.data(), str.data()), 0); - sec_cache->DeletionCallback("dummy", chunks_head); + while (chunks_head != nullptr) { + CacheValueChunk* tmp_chunk = chunks_head; + chunks_head = chunks_head->next; + tmp_chunk->Free(); + } } private: diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index 1434e18ba..2457758b8 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -12,6 +12,7 @@ #include #include #include +#include #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" @@ -329,6 +330,19 @@ void LRUCacheShard::EvictFromLRU(size_t charge, } } +void LRUCacheShard::TryInsertIntoSecondaryCache( + autovector evicted_handles) { + for (auto entry : evicted_handles) { + if (secondary_cache_ && entry->IsSecondaryCacheCompatible() && + !entry->IsInSecondaryCache()) { + secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper) + .PermitUncheckedError(); + } + // Free the entries here outside of mutex for performance reasons. + entry->Free(); + } +} + void LRUCacheShard::SetCapacity(size_t capacity) { autovector last_reference_list; { @@ -339,16 +353,7 @@ void LRUCacheShard::SetCapacity(size_t capacity) { EvictFromLRU(0, &last_reference_list); } - // Try to insert the evicted entries into tiered cache. - // Free the entries outside of mutex for performance reasons. 
- for (auto entry : last_reference_list) { - if (secondary_cache_ && entry->IsSecondaryCacheCompatible() && - !entry->IsInSecondaryCache()) { - secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper) - .PermitUncheckedError(); - } - entry->Free(); - } + TryInsertIntoSecondaryCache(last_reference_list); } void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { @@ -411,16 +416,7 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle, } } - // Try to insert the evicted entries into the secondary cache. - // Free the entries here outside of mutex for performance reasons. - for (auto entry : last_reference_list) { - if (secondary_cache_ && entry->IsSecondaryCacheCompatible() && - !entry->IsInSecondaryCache()) { - secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper) - .PermitUncheckedError(); - } - entry->Free(); - } + TryInsertIntoSecondaryCache(last_reference_list); return s; } @@ -430,23 +426,65 @@ void LRUCacheShard::Promote(LRUHandle* e) { assert(secondary_handle->IsReady()); e->SetIncomplete(false); - e->SetInCache(true); + e->SetInCache(false); e->value = secondary_handle->Value(); e->CalcTotalCharge(secondary_handle->Size(), metadata_charge_policy_); delete secondary_handle; - // This call could fail if the cache is over capacity and - // strict_capacity_limit_ is true. In such a case, we don't want - // InsertItem() to free the handle, since the item is already in memory - // and the caller will most likely just read from disk if we erase it here. if (e->value) { - Cache::Handle* handle = reinterpret_cast(e); - Status s = InsertItem(e, &handle, /*free_handle_on_fail=*/false); + Status s; + if (secondary_cache_ && secondary_cache_->SupportForceErase() && + e->IsStandalone()) { + // Insert a dummy handle and return a standalone handle to caller. + // Charge the standalone handle. 
+ autovector last_reference_list; + bool free_standalone_handle{false}; + { + DMutexLock l(mutex_); + + // Free the space following strict LRU policy until enough space + // is freed or the lru list is empty. + EvictFromLRU(e->total_charge, &last_reference_list); + + if ((usage_ + e->total_charge) > capacity_ && strict_capacity_limit_) { + free_standalone_handle = true; + } else { + usage_ += e->total_charge; + } + } + + TryInsertIntoSecondaryCache(last_reference_list); + if (free_standalone_handle) { + e->Unref(); + e->Free(); + e = nullptr; + } + + // Insert a dummy handle into the primary cache. This dummy handle is + // not IsSecondaryCacheCompatible(). + Cache::Priority priority = + e->IsHighPri() ? Cache::Priority::HIGH : Cache::Priority::LOW; + s = Insert(e->key(), e->hash, /*value=*/nullptr, 0, + /*deleter=*/nullptr, /*helper=*/nullptr, /*handle=*/nullptr, + priority); + } else { + e->SetInCache(true); + e->SetIsStandalone(false); + Cache::Handle* handle = reinterpret_cast(e); + // This InsertItem() could fail if the cache is over capacity and + // strict_capacity_limit_ is true. In such a case, we don't want + // InsertItem() to free the handle, since the item is already in memory + // and the caller will most likely just read it from disk if we erase it + // here. + s = InsertItem(e, &handle, /*free_handle_on_fail=*/false); + } + if (!s.ok()) { // Item is in memory, but not accounted against the cache capacity. // When the handle is released, the item should get deleted. 
      assert(!e->InCache());
    }
+  } else {
    // Since the secondary cache lookup failed, mark the item as not in cache
    // Don't charge the cache as it's only metadata that'll shortly be released
@@ -454,6 +492,7 @@ void LRUCacheShard::Promote(LRUHandle* e) {
    // TODO
    e->CalcTotalCharge(0, metadata_charge_policy_);
    e->SetInCache(false);
+    e->SetIsStandalone(false);
  }
}
@@ -463,34 +502,59 @@ Cache::Handle* LRUCacheShard::Lookup(
    const ShardedCache::CreateCallback& create_cb, Cache::Priority priority,
    bool wait, Statistics* stats) {
  LRUHandle* e = nullptr;
+  bool found_dummy_entry{false};
  {
    DMutexLock l(mutex_);
    e = table_.Lookup(key, hash);
    if (e != nullptr) {
      assert(e->InCache());
      if (!e->HasRefs()) {
-        // The entry is in LRU since it's in hash and has no external references
+        // The entry is in LRU since it's in hash and has no external
+        // references.
        LRU_Remove(e);
      }
      e->Ref();
      e->SetHit();
+
+      // For a dummy handle, if it was retrieved from the secondary cache,
+      // it may still exist in the secondary cache.
+      // If the handle exists in the secondary cache, the value should be
+      // erased from the secondary cache and inserted into the primary cache.
+      if (!e->value && secondary_cache_ &&
+          secondary_cache_->SupportForceErase()) {
+        found_dummy_entry = true;
+      }
    }
  }
-  // If handle table lookup failed, then allocate a handle outside the
-  // mutex if we're going to lookup in the secondary cache.
+  // If handle table lookup failed or the handle is a dummy one, allocate
+  // a handle outside the mutex if we're going to look up in the secondary
+  // cache.
+  //
+  // When a block is first looked up in CompressedSecondaryCache, we just
+  // insert a dummy block into the primary cache (charging the actual size of
+  // the block) and don't erase the block from CompressedSecondaryCache. A
+  // standalone handle is returned to the caller. Only if the block is hit
+  // again, we erase it from CompressedSecondaryCache and add it into the
+  // primary cache.
+  //
  // Only support synchronous for now.
// TODO: Support asynchronous lookup in secondary cache - if (!e && secondary_cache_ && helper && helper->saveto_cb) { + if ((!e || found_dummy_entry) && secondary_cache_ && helper && + helper->saveto_cb) { // For objects from the secondary cache, we expect the caller to provide // a way to create/delete the primary cache object. The only case where // a deleter would not be required is for dummy entries inserted for // accounting purposes, which we won't demote to the secondary cache // anyway. assert(create_cb && helper->del_cb); + // Release the dummy handle. + if (e) { + Release(reinterpret_cast(e), true /*erase_if_last_ref*/); + } bool is_in_sec_cache{false}; std::unique_ptr secondary_handle = - secondary_cache_->Lookup(key, create_cb, wait, is_in_sec_cache); + secondary_cache_->Lookup(key, create_cb, wait, found_dummy_entry, + is_in_sec_cache); if (secondary_handle != nullptr) { e = reinterpret_cast( new char[sizeof(LRUHandle) - 1 + key.size()]); @@ -510,16 +574,22 @@ Cache::Handle* LRUCacheShard::Lookup( e->Ref(); e->SetIsInSecondaryCache(is_in_sec_cache); + if (secondary_cache_->SupportForceErase() && !found_dummy_entry) { + e->SetIsStandalone(true); + } + if (wait) { Promote(e); - if (!e->value) { - // The secondary cache returned a handle, but the lookup failed. - e->Unref(); - e->Free(); - e = nullptr; - } else { - PERF_COUNTER_ADD(secondary_cache_hit_count, 1); - RecordTick(stats, SECONDARY_CACHE_HITS); + if (e) { + if (!e->value) { + // The secondary cache returned a handle, but the lookup failed. 
+ e->Unref(); + e->Free(); + e = nullptr; + } else { + PERF_COUNTER_ADD(secondary_cache_hit_count, 1); + RecordTick(stats, SECONDARY_CACHE_HITS); + } } } else { // If wait is false, we always return a handle and let the caller @@ -530,6 +600,8 @@ Cache::Handle* LRUCacheShard::Lookup( PERF_COUNTER_ADD(secondary_cache_hit_count, 1); RecordTick(stats, SECONDARY_CACHE_HITS); } + } else { + e = nullptr; } } return reinterpret_cast(e); diff --git a/cache/lru_cache.h b/cache/lru_cache.h index bdb6c44ab..b60d5ac7b 100644 --- a/cache/lru_cache.h +++ b/cache/lru_cache.h @@ -40,8 +40,11 @@ namespace lru_cache { // In that case the entry is not in the LRU list and not in hash table. // The entry can be freed when refs becomes 0. // (refs >= 1 && in_cache == false) -// -// All newly created LRUHandles are in state 1. If you call +// 4. The handle is never inserted into the LRUCache (both hash table and LRU +// list) and it doesn't experience the above three states. +// The entry can be freed when refs becomes 0. +// (refs >= 1 && in_cache == false && IS_STANDALONE == true) +// All newly created LRUHandles are in state 1 or 4. If you call // LRUCacheShard::Release on entry in state 1, it will go into state 2. // To move from state 1 to state 3, either call LRUCacheShard::Erase or // LRUCacheShard::Insert with the same key (but possibly different value). @@ -93,6 +96,9 @@ struct LRUHandle { IS_LOW_PRI = (1 << 7), // Whether this entry is in low-pri pool. IN_LOW_PRI_POOL = (1 << 8), + // Whether this entry is not inserted into the cache (both hash table and + // LRU list). 
+ IS_STANDALONE = (1 << 9), }; uint16_t flags; @@ -138,6 +144,7 @@ struct LRUHandle { } bool IsPending() const { return flags & IS_PENDING; } bool IsInSecondaryCache() const { return flags & IS_IN_SECONDARY_CACHE; } + bool IsStandalone() const { return flags & IS_STANDALONE; } void SetInCache(bool in_cache) { if (in_cache) { @@ -205,6 +212,14 @@ struct LRUHandle { } } + void SetIsStandalone(bool is_standalone) { + if (is_standalone) { + flags |= IS_STANDALONE; + } else { + flags &= ~IS_STANDALONE; + } + } + void Free() { assert(refs == 0); #ifdef __SANITIZE_THREAD__ @@ -435,6 +450,9 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { // holding the mutex_. void EvictFromLRU(size_t charge, autovector* deleted); + // Try to insert the evicted handles into the secondary cache. + void TryInsertIntoSecondaryCache(autovector evicted_handles); + // Initialized before use. size_t capacity_; diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index cb472538d..a5021dd75 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -46,10 +46,11 @@ class LRUCacheTest : public testing::Test { DeleteCache(); cache_ = reinterpret_cast( port::cacheline_aligned_alloc(sizeof(LRUCacheShard))); - new (cache_) LRUCacheShard( - capacity, false /*strict_capcity_limit*/, high_pri_pool_ratio, - low_pri_pool_ratio, use_adaptive_mutex, kDontChargeCacheMetadata, - 24 /*max_upper_hash_bits*/, nullptr /*secondary_cache*/); + new (cache_) LRUCacheShard(capacity, /*strict_capacity_limit=*/false, + high_pri_pool_ratio, low_pri_pool_ratio, + use_adaptive_mutex, kDontChargeCacheMetadata, + /*max_upper_hash_bits=*/24, + /*secondary_cache=*/nullptr); } void Insert(const std::string& key, @@ -742,7 +743,7 @@ class TestSecondaryCache : public SecondaryCache { std::unique_ptr Lookup( const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/, - bool& is_in_sec_cache) override { + bool /*advise_erase*/, bool& is_in_sec_cache) override { std::string 
key_str = key.ToString(); TEST_SYNC_POINT_CALLBACK("TestSecondaryCache::Lookup", &key_str); @@ -780,6 +781,8 @@ class TestSecondaryCache : public SecondaryCache { return secondary_handle; } + bool SupportForceErase() const override { return false; } + void Erase(const Slice& /*key*/) override {} void WaitAll(std::vector handles) override { @@ -960,7 +963,7 @@ TEST_F(LRUCacheSecondaryCacheTest, BasicTest) { TestItem* item1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert(k1.AsSlice(), item1, &LRUCacheSecondaryCacheTest::helper_, str1.length())); - std::string str2 = rnd.RandomString(1020); + std::string str2 = rnd.RandomString(1021); TestItem* item2 = new TestItem(str2.data(), str2.length()); // k1 should be demoted to NVM ASSERT_OK(cache->Insert(k2.AsSlice(), item2, diff --git a/db/blob/blob_source_test.cc b/db/blob/blob_source_test.cc index a8323bc28..8a4964ddf 100644 --- a/db/blob/blob_source_test.cc +++ b/db/blob/blob_source_test.cc @@ -1062,7 +1062,8 @@ class BlobSecondaryCacheTest : public DBTestBase { secondary_cache_opts_.capacity = 8 << 20; // 8 MB secondary_cache_opts_.num_shard_bits = 0; - secondary_cache_opts_.metadata_charge_policy = kDontChargeCacheMetadata; + secondary_cache_opts_.metadata_charge_policy = + kDefaultCacheMetadataChargePolicy; // Read blobs from the secondary cache if they are not in the primary cache options_.lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier; @@ -1166,6 +1167,25 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) { get_perf_context()->Reset(); // key0 should be filled to the primary cache from the blob file. + ASSERT_OK(blob_source.GetBlob(read_options, keys[0], file_number, + blob_offsets[0], file_size, blob_sizes[0], + kNoCompression, nullptr /* prefetch_buffer */, + &values[0], nullptr /* bytes_read */)); + // Release cache handle + values[0].Reset(); + + // key0 should be evicted and key0's dummy item is inserted into secondary + // cache. 
key1 should be filled to the primary cache from the blob file. + ASSERT_OK(blob_source.GetBlob(read_options, keys[1], file_number, + blob_offsets[1], file_size, blob_sizes[1], + kNoCompression, nullptr /* prefetch_buffer */, + &values[1], nullptr /* bytes_read */)); + + // Release cache handle + values[1].Reset(); + + // key0 should be filled to the primary cache from the blob file. key1 + // should be evicted and key1's dummy item is inserted into secondary cache. ASSERT_OK(blob_source.GetBlob(read_options, keys[0], file_number, blob_offsets[0], file_size, blob_sizes[0], kNoCompression, nullptr /* prefetch_buffer */, @@ -1177,8 +1197,8 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) { // Release cache handle values[0].Reset(); - // key0 should be demoted to the secondary cache, and key1 should be filled - // to the primary cache from the blob file. + // key0 should be evicted and is inserted into secondary cache. + // key1 should be filled to the primary cache from the blob file. ASSERT_OK(blob_source.GetBlob(read_options, keys[1], file_number, blob_offsets[1], file_size, blob_sizes[1], kNoCompression, nullptr /* prefetch_buffer */, @@ -1202,11 +1222,11 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) { auto handle0 = blob_cache->Lookup(key0, statistics); ASSERT_EQ(handle0, nullptr); - // key0 should be in the secondary cache. After looking up key0 in the - // secondary cache, it will be erased from the secondary cache. + // key0's item should be in the secondary cache. 
  bool is_in_sec_cache = false;
  auto sec_handle0 =
-      secondary_cache->Lookup(key0, create_cb, true, is_in_sec_cache);
+      secondary_cache->Lookup(key0, create_cb, true,
+                              /*advise_erase=*/true, is_in_sec_cache);
  ASSERT_FALSE(is_in_sec_cache);
  ASSERT_NE(sec_handle0, nullptr);
  ASSERT_TRUE(sec_handle0->IsReady());
@@ -1215,12 +1235,14 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
  ASSERT_EQ(value->data(), blobs[0]);
  delete value;
-  // key0 doesn't exist in the blob cache
+  // key0 doesn't exist in the blob cache although key0's dummy
+  // item exists in the secondary cache.
  ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
                                            blob_offsets[0]));
}
-  // key1 should exist in the primary cache.
+  // key1 should exist in the primary cache. key1's dummy item exists
+  // in the secondary cache.
{
  CacheKey cache_key = base_cache_key.WithOffset(blob_offsets[1]);
  const Slice key1 = cache_key.AsSlice();
@@ -1230,7 +1252,8 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
  bool is_in_sec_cache = false;
  auto sec_handle1 =
-      secondary_cache->Lookup(key1, create_cb, true, is_in_sec_cache);
+      secondary_cache->Lookup(key1, create_cb, true,
+                              /*advise_erase=*/true, is_in_sec_cache);
  ASSERT_FALSE(is_in_sec_cache);
  ASSERT_EQ(sec_handle1, nullptr);
@@ -1240,6 +1263,7 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
{
  // fetch key0 from the blob file to the primary cache.
+  // key1 is evicted and inserted into the secondary cache.
  ASSERT_OK(blob_source.GetBlob(
      read_options, keys[0], file_number, blob_offsets[0], file_size,
      blob_sizes[0], kNoCompression, nullptr /* prefetch_buffer */,
@@ -1259,8 +1283,7 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
  ASSERT_EQ(value->data(), blobs[0]);
  blob_cache->Release(handle0);
-  // key1 is not in the primary cache, and it should be demoted to the
-  // secondary cache.
+  // key1 is not in the primary cache and is in the secondary cache.
  CacheKey cache_key1 = base_cache_key.WithOffset(blob_offsets[1]);
  const Slice key1 = cache_key1.AsSlice();
  auto handle1 = blob_cache->Lookup(key1, statistics);
@@ -1274,17 +1297,19 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
  // key1 promotion should succeed due to the primary cache being empty. We
  // didn't call the secondary cache's Lookup() here, because it will remove
  // the key but it won't be able to promote the key to the primary cache.
-  // Instead we use the end-to-end blob source API to promote the key to
-  // the primary cache.
+  // Instead we use the end-to-end blob source API to read key1.
+  // In function TEST_BlobInCache, key1's dummy item is inserted into the
+  // primary cache and a standalone handle is checked by GetValue().
  ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                           blob_offsets[1]));
-  // key1 should be in the primary cache.
+  // key1's dummy handle is in the primary cache and key1's item is still
+  // in the secondary cache. So, the primary cache's Lookup() can only
+  // get a dummy handle.
  handle1 = blob_cache->Lookup(key1, statistics);
  ASSERT_NE(handle1, nullptr);
-  value = static_cast(blob_cache->Value(handle1));
-  ASSERT_NE(value, nullptr);
-  ASSERT_EQ(value->data(), blobs[1]);
+  // handle1 is a dummy handle.
+  ASSERT_EQ(blob_cache->Value(handle1), nullptr);
  blob_cache->Release(handle1);
  }
}
diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc
index bf5f646bb..c7fbc332a 100644
--- a/db/blob/db_blob_basic_test.cc
+++ b/db/blob/db_blob_basic_test.cc
@@ -1734,28 +1734,45 @@ TEST_F(DBBlobBasicTest, WarmCacheWithBlobsSecondary) {
  constexpr size_t second_blob_size = 768;
  const std::string second_blob(second_blob_size, 'b');
-  // First blob gets inserted into primary cache during flush
+  // First blob is inserted into primary cache during flush.
ASSERT_OK(Put(first_key, first_blob)); ASSERT_OK(Flush()); ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 1); - // Second blob gets inserted into primary cache during flush, first blob gets - // evicted to secondary cache + // Second blob is inserted into primary cache during flush, + // First blob is evicted but only a dummy handle is inserted into secondary + // cache. ASSERT_OK(Put(second_key, second_blob)); ASSERT_OK(Flush()); ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 1); - // First blob gets promoted back to primary cache b/c of lookup, second blob - // gets evicted to secondary cache + // First blob is inserted into primary cache. + // Second blob is evicted but only a dummy handle is inserted into secondary + // cache. + ASSERT_EQ(Get(first_key), first_blob); + ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 1); + ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 0); + ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS), + 0); + // Second blob is inserted into primary cache, + // First blob is evicted and is inserted into secondary cache. + ASSERT_EQ(Get(second_key), second_blob); + ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 1); + ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 0); + ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS), + 0); + + // First blob's dummy item is inserted into primary cache b/c of lookup. + // Second blob is still in primary cache. 
ASSERT_EQ(Get(first_key), first_blob); ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 0); ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 1); ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS), 1); - // Second blob gets promoted back to primary cache b/c of lookup, first blob - // gets evicted to secondary cache - ASSERT_EQ(Get(second_key), second_blob); + // First blob's item is inserted into primary cache b/c of lookup. + // Second blob is evicted and inserted into secondary cache. + ASSERT_EQ(Get(first_key), first_blob); ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 0); ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 1); ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS), diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 8bad61b52..1e87f5f72 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -159,13 +159,13 @@ struct CompressedSecondaryCacheOptions : LRUCacheOptions { CompressedSecondaryCacheOptions() {} CompressedSecondaryCacheOptions( size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit, - double _high_pri_pool_ratio, + double _high_pri_pool_ratio, double _low_pri_pool_ratio = 0.0, std::shared_ptr _memory_allocator = nullptr, bool _use_adaptive_mutex = kDefaultToAdaptiveMutex, CacheMetadataChargePolicy _metadata_charge_policy = kDefaultCacheMetadataChargePolicy, CompressionType _compression_type = CompressionType::kLZ4Compression, - uint32_t _compress_format_version = 2, double _low_pri_pool_ratio = 0.0) + uint32_t _compress_format_version = 2) : LRUCacheOptions(_capacity, _num_shard_bits, _strict_capacity_limit, _high_pri_pool_ratio, std::move(_memory_allocator), _use_adaptive_mutex, _metadata_charge_policy, @@ -179,12 +179,13 @@ struct CompressedSecondaryCacheOptions : LRUCacheOptions { extern std::shared_ptr NewCompressedSecondaryCache( size_t capacity, 
int num_shard_bits = -1, bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.5, + double low_pri_pool_ratio = 0.0, std::shared_ptr memory_allocator = nullptr, bool use_adaptive_mutex = kDefaultToAdaptiveMutex, CacheMetadataChargePolicy metadata_charge_policy = kDefaultCacheMetadataChargePolicy, CompressionType compression_type = CompressionType::kLZ4Compression, - uint32_t compress_format_version = 2, double low_pri_pool_ratio = 0.0); + uint32_t compress_format_version = 2); extern std::shared_ptr NewCompressedSecondaryCache( const CompressedSecondaryCacheOptions& opts); diff --git a/include/rocksdb/secondary_cache.h b/include/rocksdb/secondary_cache.h index 7f3f4f430..cb22c2f51 100644 --- a/include/rocksdb/secondary_cache.h +++ b/include/rocksdb/secondary_cache.h @@ -68,20 +68,31 @@ class SecondaryCache : public Customizable { // Lookup the data for the given key in this cache. The create_cb // will be used to create the object. The handle returned may not be // ready yet, unless wait=true, in which case Lookup() will block until - // the handle is ready. is_in_sec_cache is to indicate whether the - // handle is possibly erased from the secondary cache after the Lookup. + // the handle is ready. + // + // advise_erase is a hint from the primary cache indicating that the handle + // will be cached there, so the secondary cache is advised to drop it from + // the cache as an optimization. To use this feature, SupportForceErase() + // needs to return true. + // This hint can also be safely ignored. + // + // is_in_sec_cache is to indicate whether the handle is possibly erased + // from the secondary cache after the Lookup. virtual std::unique_ptr Lookup( const Slice& key, const Cache::CreateCallback& create_cb, bool wait, - bool& is_in_sec_cache) = 0; + bool advise_erase, bool& is_in_sec_cache) = 0; + + // Indicate whether a handle can be erased in this secondary cache. 
+ virtual bool SupportForceErase() const = 0; // At the discretion of the implementation, erase the data associated - // with key + // with key. virtual void Erase(const Slice& key) = 0; - // Wait for a collection of handles to become ready + // Wait for a collection of handles to become ready. virtual void WaitAll(std::vector handles) = 0; - virtual std::string GetPrintableOptions() const override = 0; + virtual std::string GetPrintableOptions() const = 0; }; } // namespace ROCKSDB_NAMESPACE diff --git a/options/customizable_test.cc b/options/customizable_test.cc index e7ab2cd08..9d3c86c62 100644 --- a/options/customizable_test.cc +++ b/options/customizable_test.cc @@ -1330,10 +1330,13 @@ class TestSecondaryCache : public SecondaryCache { } std::unique_ptr Lookup( const Slice& /*key*/, const Cache::CreateCallback& /*create_cb*/, - bool /*wait*/, bool& is_in_sec_cache) override { + bool /*wait*/, bool /*advise_erase*/, bool& is_in_sec_cache) override { is_in_sec_cache = true; return nullptr; } + + bool SupportForceErase() const override { return false; } + void Erase(const Slice& /*key*/) override {} // Wait for a collection of handles to become ready diff --git a/utilities/fault_injection_secondary_cache.cc b/utilities/fault_injection_secondary_cache.cc index 502fd773b..2758c2a19 100644 --- a/utilities/fault_injection_secondary_cache.cc +++ b/utilities/fault_injection_secondary_cache.cc @@ -87,17 +87,18 @@ Status FaultInjectionSecondaryCache::Insert( std::unique_ptr FaultInjectionSecondaryCache::Lookup(const Slice& key, const Cache::CreateCallback& create_cb, - bool wait, bool& is_in_sec_cache) { + bool wait, bool advise_erase, + bool& is_in_sec_cache) { ErrorContext* ctx = GetErrorContext(); if (base_is_compressed_sec_cache_) { if (ctx->rand.OneIn(prob_)) { return nullptr; } else { - return base_->Lookup(key, create_cb, wait, is_in_sec_cache); + return base_->Lookup(key, create_cb, wait, advise_erase, is_in_sec_cache); } } else { std::unique_ptr hdl = - 
base_->Lookup(key, create_cb, wait, is_in_sec_cache); + base_->Lookup(key, create_cb, wait, advise_erase, is_in_sec_cache); if (wait && ctx->rand.OneIn(prob_)) { hdl.reset(); } diff --git a/utilities/fault_injection_secondary_cache.h b/utilities/fault_injection_secondary_cache.h index acd960747..582ea14ed 100644 --- a/utilities/fault_injection_secondary_cache.h +++ b/utilities/fault_injection_secondary_cache.h @@ -36,7 +36,9 @@ class FaultInjectionSecondaryCache : public SecondaryCache { std::unique_ptr Lookup( const Slice& key, const Cache::CreateCallback& create_cb, bool wait, - bool& is_in_sec_cache) override; + bool advise_erase, bool& is_in_sec_cache) override; + + bool SupportForceErase() const override { return base_->SupportForceErase(); } void Erase(const Slice& key) override;