diff --git a/HISTORY.md b/HISTORY.md index eacde6fd8..82478a396 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -33,6 +33,7 @@ * Added new perf context counters `block_cache_standalone_handle_count`, `block_cache_real_handle_count`,`compressed_sec_cache_insert_real_count`, `compressed_sec_cache_insert_dummy_count`, `compressed_sec_cache_uncompressed_bytes`, and `compressed_sec_cache_compressed_bytes`. * Memory for blobs which are to be inserted into the blob cache is now allocated using the cache's allocator (see #10628 and #10647). * HyperClockCache is an experimental, lock-free Cache alternative for block cache that offers much improved CPU efficiency under high parallel load or high contention, with some caveats. As much as 4.5x higher ops/sec vs. LRUCache has been seen in db_bench under high parallel load. +* `CompressedSecondaryCacheOptions::enable_custom_split_merge` is added for enabling the custom split and merge feature, which split the compressed value into chunks so that they may better fit jemalloc bins. ### Performance Improvements * Iterator performance is improved for `DeleteRange()` users. Internally, iterator will skip to the end of a range tombstone when possible, instead of looping through each key and check individually if a key is range deleted. diff --git a/cache/cache.cc b/cache/cache.cc index 769cf8246..7d23fb757 100644 --- a/cache/cache.cc +++ b/cache/cache.cc @@ -58,6 +58,11 @@ static std::unordered_map compress_format_version), OptionType::kUInt32T, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"enable_custom_split_merge", + {offsetof(struct CompressedSecondaryCacheOptions, + enable_custom_split_merge), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, }; #endif // ROCKSDB_LITE diff --git a/cache/compressed_secondary_cache.cc b/cache/compressed_secondary_cache.cc index b255d4e50..fd33de4c6 100644 --- a/cache/compressed_secondary_cache.cc +++ b/cache/compressed_secondary_cache.cc @@ -21,11 +21,13 @@ CompressedSecondaryCache::CompressedSecondaryCache( double high_pri_pool_ratio, double low_pri_pool_ratio, std::shared_ptr memory_allocator, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, - CompressionType compression_type, uint32_t compress_format_version) + CompressionType compression_type, uint32_t compress_format_version, + bool enable_custom_split_merge) : cache_options_(capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator, use_adaptive_mutex, metadata_charge_policy, - compression_type, compress_format_version) { + compression_type, compress_format_version, + enable_custom_split_merge) { cache_ = NewLRUCache(capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, @@ -50,13 +52,24 @@ std::unique_ptr CompressedSecondaryCache::Lookup( return nullptr; } - CacheAllocationPtr* ptr = reinterpret_cast(handle_value); + CacheAllocationPtr* ptr{nullptr}; + CacheAllocationPtr merged_value; + size_t handle_value_charge{0}; + if (cache_options_.enable_custom_split_merge) { + CacheValueChunk* value_chunk_ptr = + reinterpret_cast(handle_value); + merged_value = MergeChunksIntoValue(value_chunk_ptr, handle_value_charge); + ptr = &merged_value; + } else { + ptr = reinterpret_cast(handle_value); + handle_value_charge = cache_->GetCharge(lru_handle); + } Status s; void* value{nullptr}; size_t charge{0}; if (cache_options_.compression_type == kNoCompression) { - s = create_cb(ptr->get(), cache_->GetCharge(lru_handle), &value, &charge); + s = create_cb(ptr->get(), handle_value_charge, &value, &charge); } else { UncompressionContext uncompression_context(cache_options_.compression_type); UncompressionInfo uncompression_info(uncompression_context, @@ -65,7 +78,7 @@ std::unique_ptr CompressedSecondaryCache::Lookup( size_t uncompressed_size{0}; CacheAllocationPtr uncompressed = UncompressData( - uncompression_info, (char*)ptr->get(), cache_->GetCharge(lru_handle), + uncompression_info, (char*)ptr->get(), handle_value_charge, &uncompressed_size, cache_options_.compress_format_version, cache_options_.memory_allocator.get()); @@ -84,7 +97,9 @@ std::unique_ptr CompressedSecondaryCache::Lookup( if (advise_erase) { cache_->Release(lru_handle, /*erase_if_last_ref=*/true); // Insert a dummy handle. - cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, DeletionCallback) + cache_ + ->Insert(key, /*value=*/nullptr, /*charge=*/0, + GetDeletionCallback(cache_options_.enable_custom_split_merge)) .PermitUncheckedError(); } else { is_in_sec_cache = true; @@ -101,12 +116,12 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value, } Cache::Handle* lru_handle = cache_->Lookup(key); + Cache::DeleterFn del_cb = + GetDeletionCallback(cache_options_.enable_custom_split_merge); if (lru_handle == nullptr) { PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1); // Insert a dummy handle if the handle is evicted for the first time. - return cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, - DeletionCallback); - + return cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, del_cb); } else { cache_->Release(lru_handle, /*erase_if_last_ref=*/false); } @@ -142,13 +157,23 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value, val = Slice(compressed_val); size = compressed_val.size(); PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, size); - ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); - memcpy(ptr.get(), compressed_val.data(), size); + + if (!cache_options_.enable_custom_split_merge) { + ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); + memcpy(ptr.get(), compressed_val.data(), size); + } } - CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1); - return cache_->Insert(key, buf, size, DeletionCallback); + if (cache_options_.enable_custom_split_merge) { + size_t charge{0}; + CacheValueChunk* value_chunks_head = + SplitValueIntoChunks(val, cache_options_.compression_type, charge); + return cache_->Insert(key, value_chunks_head, charge, del_cb); + } else { + CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); + return cache_->Insert(key, buf, size, del_cb); + } } void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } @@ -238,10 +263,24 @@ CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue( return ptr; } -void CompressedSecondaryCache::DeletionCallback(const Slice& /*key*/, - void* obj) { - delete reinterpret_cast(obj); - obj = nullptr; +Cache::DeleterFn CompressedSecondaryCache::GetDeletionCallback( + bool enable_custom_split_merge) { + if (enable_custom_split_merge) { + return [](const Slice& /*key*/, void* obj) { + CacheValueChunk* chunks_head = reinterpret_cast(obj); + while (chunks_head != nullptr) { + CacheValueChunk* tmp_chunk = chunks_head; + chunks_head = chunks_head->next; + tmp_chunk->Free(); + obj = nullptr; + }; + }; + } else { + return [](const Slice& /*key*/, void* obj) { + delete reinterpret_cast(obj); + obj = nullptr; + }; + } } std::shared_ptr NewCompressedSecondaryCache( @@ -249,11 +288,13 @@ std::shared_ptr NewCompressedSecondaryCache( double high_pri_pool_ratio, double low_pri_pool_ratio, std::shared_ptr memory_allocator, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, - CompressionType compression_type, uint32_t compress_format_version) { + CompressionType compression_type, uint32_t compress_format_version, + bool enable_custom_split_merge) { return std::make_shared( capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator, use_adaptive_mutex, - metadata_charge_policy, compression_type, compress_format_version); + metadata_charge_policy, compression_type, compress_format_version, + enable_custom_split_merge); } std::shared_ptr NewCompressedSecondaryCache( @@ -264,7 +305,8 @@ std::shared_ptr NewCompressedSecondaryCache( opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit, opts.high_pri_pool_ratio, opts.low_pri_pool_ratio, opts.memory_allocator, opts.use_adaptive_mutex, opts.metadata_charge_policy, - opts.compression_type, opts.compress_format_version); + opts.compression_type, opts.compress_format_version, + opts.enable_custom_split_merge); } } // namespace ROCKSDB_NAMESPACE diff --git a/cache/compressed_secondary_cache.h b/cache/compressed_secondary_cache.h index 996d831e1..041e96f91 100644 --- a/cache/compressed_secondary_cache.h +++ b/cache/compressed_secondary_cache.h @@ -76,7 +76,8 @@ class CompressedSecondaryCache : public SecondaryCache { CacheMetadataChargePolicy metadata_charge_policy = kDefaultCacheMetadataChargePolicy, CompressionType compression_type = CompressionType::kLZ4Compression, - uint32_t compress_format_version = 2); + uint32_t compress_format_version = 2, + bool enable_custom_split_merge = false); virtual ~CompressedSecondaryCache() override; const char* Name() const override { return "CompressedSecondaryCache"; } @@ -98,10 +99,8 @@ class CompressedSecondaryCache : public SecondaryCache { private: friend class CompressedSecondaryCacheTest; - static constexpr std::array malloc_bin_sizes_{ - 32, 64, 96, 128, 160, 192, 224, 256, 320, 384, 448, - 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2048, 2560, 3072, - 3584, 4096, 5120, 6144, 7168, 8192, 10240, 12288, 14336, 16384, 32768}; + static constexpr std::array malloc_bin_sizes_{ + 128, 256, 512, 1024, 2048, 4096, 8192, 16384}; struct CacheValueChunk { // TODO try "CacheAllocationPtr next;". @@ -126,7 +125,7 @@ class CompressedSecondaryCache : public SecondaryCache { size_t& charge); // An implementation of Cache::DeleterFn. - static void DeletionCallback(const Slice& /*key*/, void* obj); + static Cache::DeleterFn GetDeletionCallback(bool enable_custom_split_merge); std::shared_ptr cache_; CompressedSecondaryCacheOptions cache_options_; }; diff --git a/cache/compressed_secondary_cache_test.cc b/cache/compressed_secondary_cache_test.cc index 24b734a77..3057f84e5 100644 --- a/cache/compressed_secondary_cache_test.cc +++ b/cache/compressed_secondary_cache_test.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include "cache/lru_cache.h" #include "memory/jemalloc_nodump_allocator.h" @@ -111,11 +112,12 @@ class CompressedSecondaryCacheTest : public testing::Test { // Insert and Lookup the item k1 for the second time and advise erasing it. ASSERT_OK(sec_cache->Insert("k1", &item1, &CompressedSecondaryCacheTest::helper_)); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1); + std::unique_ptr handle1_2 = sec_cache->Lookup( "k1", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache); ASSERT_NE(handle1_2, nullptr); ASSERT_FALSE(is_in_sec_cache); - ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1); if (sec_cache_is_compressed) { ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, 1000); @@ -282,7 +284,8 @@ class CompressedSecondaryCacheTest : public testing::Test { sec_cache.reset(); } - void BasicIntegrationTest(bool sec_cache_is_compressed) { + void BasicIntegrationTest(bool sec_cache_is_compressed, + bool enable_custom_split_merge) { CompressedSecondaryCacheOptions secondary_cache_opts; if (sec_cache_is_compressed) { @@ -297,6 +300,7 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.capacity = 6000; secondary_cache_opts.num_shard_bits = 0; + secondary_cache_opts.enable_custom_split_merge = enable_custom_split_merge; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); LRUCacheOptions lru_cache_opts( @@ -680,24 +684,22 @@ class CompressedSecondaryCacheTest : public testing::Test { std::make_unique(1000, 0, true, 0.5, 0.0, allocator); Random rnd(301); - // 8500 = 8169 + 354, so there should be 2 chunks after split. + // 8500 = 8169 + 233 + 98, so there should be 3 chunks after split. size_t str_size{8500}; std::string str = rnd.RandomString(static_cast(str_size)); size_t charge{0}; CacheValueChunk* chunks_head = sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge); - ASSERT_EQ(charge, str_size + 2 * (sizeof(CacheValueChunk) - 1)); + ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1)); CacheValueChunk* current_chunk = chunks_head; ASSERT_EQ(current_chunk->size, 8192 - sizeof(CacheValueChunk) + 1); current_chunk = current_chunk->next; - ASSERT_EQ(current_chunk->size, 354 - sizeof(CacheValueChunk) + 1); + ASSERT_EQ(current_chunk->size, 256 - sizeof(CacheValueChunk) + 1); + current_chunk = current_chunk->next; + ASSERT_EQ(current_chunk->size, 98); - while (chunks_head != nullptr) { - CacheValueChunk* tmp_chunk = chunks_head; - chunks_head = chunks_head->next; - tmp_chunk->Free(); - } + sec_cache->GetDeletionCallback(true)("dummy", chunks_head); } void MergeChunksIntoValueTest() { @@ -764,13 +766,13 @@ class CompressedSecondaryCacheTest : public testing::Test { std::make_unique(1000, 0, true, 0.5, 0.0, allocator); Random rnd(301); - // 8500 = 8169 + 354, so there should be 2 chunks after split. + // 8500 = 8169 + 233 + 98, so there should be 3 chunks after split. size_t str_size{8500}; std::string str = rnd.RandomString(static_cast(str_size)); size_t charge{0}; CacheValueChunk* chunks_head = sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge); - ASSERT_EQ(charge, str_size + 2 * (sizeof(CacheValueChunk) - 1)); + ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1)); CacheAllocationPtr value = sec_cache->MergeChunksIntoValue(chunks_head, charge); @@ -778,11 +780,7 @@ class CompressedSecondaryCacheTest : public testing::Test { std::string value_str{value.get(), charge}; ASSERT_EQ(strcmp(value_str.data(), str.data()), 0); - while (chunks_head != nullptr) { - CacheValueChunk* tmp_chunk = chunks_head; - chunks_head = chunks_head->next; - tmp_chunk->Free(); - } + sec_cache->GetDeletionCallback(true)("dummy", chunks_head); } private: @@ -799,113 +797,150 @@ Cache::CacheItemHelper CompressedSecondaryCacheTest::helper_fail_( CompressedSecondaryCacheTest::SaveToCallbackFail, CompressedSecondaryCacheTest::DeletionCallback); -TEST_F(CompressedSecondaryCacheTest, BasicTestWithNoCompression) { - BasicTest(false, false); -} +class CompressedSecCacheTestWithCompressAndAllocatorParam + : public CompressedSecondaryCacheTest, + public ::testing::WithParamInterface> { + public: + CompressedSecCacheTestWithCompressAndAllocatorParam() { + sec_cache_is_compressed_ = std::get<0>(GetParam()); + use_jemalloc_ = std::get<1>(GetParam()); + } + bool sec_cache_is_compressed_; + bool use_jemalloc_; +}; -TEST_F(CompressedSecondaryCacheTest, - BasicTestWithMemoryAllocatorAndNoCompression) { - BasicTest(false, true); +TEST_P(CompressedSecCacheTestWithCompressAndAllocatorParam, BasicTes) { + BasicTest(sec_cache_is_compressed_, use_jemalloc_); } -TEST_F(CompressedSecondaryCacheTest, BasicTestWithCompression) { - BasicTest(true, false); -} +INSTANTIATE_TEST_CASE_P(CompressedSecCacheTests, + CompressedSecCacheTestWithCompressAndAllocatorParam, + ::testing::Combine(testing::Bool(), testing::Bool())); -TEST_F(CompressedSecondaryCacheTest, - BasicTestWithMemoryAllocatorAndCompression) { - BasicTest(true, true); -} +class CompressedSecondaryCacheTestWithCompressionParam + : public CompressedSecondaryCacheTest, + public ::testing::WithParamInterface { + public: + CompressedSecondaryCacheTestWithCompressionParam() { + sec_cache_is_compressed_ = GetParam(); + } + bool sec_cache_is_compressed_; +}; #ifndef ROCKSDB_LITE -TEST_F(CompressedSecondaryCacheTest, BasicTestFromStringWithNoCompression) { - std::string sec_cache_uri = - "compressed_secondary_cache://" - "capacity=2048;num_shard_bits=0;compression_type=kNoCompression"; - std::shared_ptr sec_cache; - Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, - &sec_cache); - EXPECT_OK(s); - BasicTestHelper(sec_cache, /*sec_cache_is_compressed=*/false); -} - -TEST_F(CompressedSecondaryCacheTest, BasicTestFromStringWithCompression) { +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, BasicTestFromString) { + std::shared_ptr sec_cache{nullptr}; std::string sec_cache_uri; - bool sec_cache_is_compressed{true}; - if (LZ4_Supported()) { - sec_cache_uri = - "compressed_secondary_cache://" - "capacity=2048;num_shard_bits=0;compression_type=kLZ4Compression;" - "compress_format_version=2"; + if (sec_cache_is_compressed_) { + if (LZ4_Supported()) { + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kLZ4Compression;" + "compress_format_version=2"; + } else { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kNoCompression"; + sec_cache_is_compressed_ = false; + } + Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, + &sec_cache); + EXPECT_OK(s); } else { - ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); sec_cache_uri = "compressed_secondary_cache://" "capacity=2048;num_shard_bits=0;compression_type=kNoCompression"; - sec_cache_is_compressed = false; + Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, + &sec_cache); + EXPECT_OK(s); } - - std::shared_ptr sec_cache; - Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, - &sec_cache); - EXPECT_OK(s); - BasicTestHelper(sec_cache, sec_cache_is_compressed); -} - -#endif // ROCKSDB_LITE - -TEST_F(CompressedSecondaryCacheTest, FailsTestWithNoCompression) { - FailsTest(false); + BasicTestHelper(sec_cache, sec_cache_is_compressed_); } -TEST_F(CompressedSecondaryCacheTest, FailsTestWithCompression) { - FailsTest(true); +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, + BasicTestFromStringWithSplit) { + std::shared_ptr sec_cache{nullptr}; + std::string sec_cache_uri; + if (sec_cache_is_compressed_) { + if (LZ4_Supported()) { + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kLZ4Compression;" + "compress_format_version=2;enable_custom_split_merge=true"; + } else { + ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kNoCompression;" + "enable_custom_split_merge=true"; + sec_cache_is_compressed_ = false; + } + Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, + &sec_cache); + EXPECT_OK(s); + } else { + sec_cache_uri = + "compressed_secondary_cache://" + "capacity=2048;num_shard_bits=0;compression_type=kNoCompression;" + "enable_custom_split_merge=true"; + Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, + &sec_cache); + EXPECT_OK(s); + } + BasicTestHelper(sec_cache, sec_cache_is_compressed_); } -TEST_F(CompressedSecondaryCacheTest, BasicIntegrationTestWithNoCompression) { - BasicIntegrationTest(false); -} +#endif // ROCKSDB_LITE -TEST_F(CompressedSecondaryCacheTest, BasicIntegrationTestWithCompression) { - BasicIntegrationTest(true); +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, FailsTest) { + FailsTest(sec_cache_is_compressed_); } -TEST_F(CompressedSecondaryCacheTest, - BasicIntegrationFailTestWithNoCompression) { - BasicIntegrationFailTest(false); +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, + BasicIntegrationFailTest) { + BasicIntegrationFailTest(sec_cache_is_compressed_); } -TEST_F(CompressedSecondaryCacheTest, BasicIntegrationFailTestWithCompression) { - BasicIntegrationFailTest(true); +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, + IntegrationSaveFailTest) { + IntegrationSaveFailTest(sec_cache_is_compressed_); } -TEST_F(CompressedSecondaryCacheTest, IntegrationSaveFailTestWithNoCompression) { - IntegrationSaveFailTest(false); +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, + IntegrationCreateFailTest) { + IntegrationCreateFailTest(sec_cache_is_compressed_); } -TEST_F(CompressedSecondaryCacheTest, IntegrationSaveFailTestWithCompression) { - IntegrationSaveFailTest(true); +TEST_P(CompressedSecondaryCacheTestWithCompressionParam, + IntegrationFullCapacityTest) { + IntegrationFullCapacityTest(sec_cache_is_compressed_); } -TEST_F(CompressedSecondaryCacheTest, - IntegrationCreateFailTestWithNoCompression) { - IntegrationCreateFailTest(false); -} +INSTANTIATE_TEST_CASE_P(CompressedSecCacheTests, + CompressedSecondaryCacheTestWithCompressionParam, + testing::Bool()); -TEST_F(CompressedSecondaryCacheTest, IntegrationCreateFailTestWithCompression) { - IntegrationCreateFailTest(true); -} +class CompressedSecCacheTestWithCompressAndSplitParam + : public CompressedSecondaryCacheTest, + public ::testing::WithParamInterface> { + public: + CompressedSecCacheTestWithCompressAndSplitParam() { + sec_cache_is_compressed_ = std::get<0>(GetParam()); + enable_custom_split_merge_ = std::get<1>(GetParam()); + } + bool sec_cache_is_compressed_; + bool enable_custom_split_merge_; +}; -TEST_F(CompressedSecondaryCacheTest, - IntegrationFullCapacityTestWithNoCompression) { - IntegrationFullCapacityTest(false); +TEST_P(CompressedSecCacheTestWithCompressAndSplitParam, BasicIntegrationTest) { + BasicIntegrationTest(sec_cache_is_compressed_, enable_custom_split_merge_); } -TEST_F(CompressedSecondaryCacheTest, - IntegrationFullCapacityTestWithCompression) { - IntegrationFullCapacityTest(true); -} +INSTANTIATE_TEST_CASE_P(CompressedSecCacheTests, + CompressedSecCacheTestWithCompressAndSplitParam, + ::testing::Combine(testing::Bool(), testing::Bool())); TEST_F(CompressedSecondaryCacheTest, SplitValueIntoChunksTest) { SplitValueIntoChunksTest(); diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 004c30376..658c5485f 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -182,6 +182,10 @@ struct CompressedSecondaryCacheOptions : LRUCacheOptions { // header in varint32 format. uint32_t compress_format_version = 2; + // Enable the custom split and merge feature, which split the compressed value + // into chunks so that they may better fit jemalloc bins. + bool enable_custom_split_merge = false; + CompressedSecondaryCacheOptions() {} CompressedSecondaryCacheOptions( size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit, @@ -191,13 +195,15 @@ struct CompressedSecondaryCacheOptions : LRUCacheOptions { CacheMetadataChargePolicy _metadata_charge_policy = kDefaultCacheMetadataChargePolicy, CompressionType _compression_type = CompressionType::kLZ4Compression, - uint32_t _compress_format_version = 2) + uint32_t _compress_format_version = 2, + bool _enable_custom_split_merge = false) : LRUCacheOptions(_capacity, _num_shard_bits, _strict_capacity_limit, _high_pri_pool_ratio, std::move(_memory_allocator), _use_adaptive_mutex, _metadata_charge_policy, _low_pri_pool_ratio), compression_type(_compression_type), - compress_format_version(_compress_format_version) {} + compress_format_version(_compress_format_version), + enable_custom_split_merge(_enable_custom_split_merge) {} }; // EXPERIMENTAL @@ -211,7 +217,8 @@ extern std::shared_ptr NewCompressedSecondaryCache( CacheMetadataChargePolicy metadata_charge_policy = kDefaultCacheMetadataChargePolicy, CompressionType compression_type = CompressionType::kLZ4Compression, - uint32_t compress_format_version = 2); + uint32_t compress_format_version = 2, + bool enable_custom_split_merge = false); extern std::shared_ptr NewCompressedSecondaryCache( const CompressedSecondaryCacheOptions& opts); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 2f7af92e3..1c38eb587 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -178,7 +178,8 @@ default_params = { "wal_compression": lambda: random.choice(["none", "zstd"]), "verify_sst_unique_id_in_manifest": 1, # always do unique_id verification "secondary_cache_uri": lambda: random.choice( - ["", "compressed_secondary_cache://capacity=8388608"]), + ["", "compressed_secondary_cache://capacity=8388608", + "compressed_secondary_cache://capacity=8388608;enable_custom_split_merge=true"]), "allow_data_in_errors": True, "readahead_size": lambda: random.choice([0, 16384, 524288]), "initial_auto_readahead_size": lambda: random.choice([0, 16384, 524288]),