diff --git a/HISTORY.md b/HISTORY.md index 1ee8d9baf..8be7cbf97 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -23,6 +23,7 @@ ### Behavior Change * Added checksum handshake during the copying of decompressed WAL fragment. This together with #9875, #10037, #10212, #10114 and #10319 provides end-to-end integrity protection for write batch during recovery. +* To minimize the internal fragmentation caused by the variable size of the compressed blocks in `CompressedSecondaryCache`, the original block is split according to the jemalloc bin size in `Insert()` and then merged back in `Lookup()`. * PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env. * Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly. diff --git a/cache/compressed_secondary_cache.cc b/cache/compressed_secondary_cache.cc index d52d94c94..919f651a4 100644 --- a/cache/compressed_secondary_cache.cc +++ b/cache/compressed_secondary_cache.cc @@ -5,6 +5,8 @@ #include "cache/compressed_secondary_cache.h" +#include +#include #include #include "memory/memory_allocator.h" @@ -13,15 +15,6 @@ namespace ROCKSDB_NAMESPACE { -namespace { - -void DeletionCallback(const Slice& /*key*/, void* obj) { - delete reinterpret_cast(obj); - obj = nullptr; -} - -} // namespace - CompressedSecondaryCache::CompressedSecondaryCache( size_t capacity, int num_shard_bits, bool strict_capacity_limit, double high_pri_pool_ratio, @@ -49,26 +42,29 @@ std::unique_ptr CompressedSecondaryCache::Lookup( return handle; } - CacheAllocationPtr* ptr = - reinterpret_cast(cache_->Value(lru_handle)); - void* value = nullptr; - size_t charge = 0; - Status s; + CacheValueChunk* handle_value = + reinterpret_cast(cache_->Value(lru_handle)); + size_t handle_value_charge{0}; + CacheAllocationPtr merged_value = + MergeChunksIntoValue(handle_value, handle_value_charge); + Status s; + void* value{nullptr}; + size_t charge{0}; if (cache_options_.compression_type == kNoCompression) { - s = create_cb(ptr->get(), cache_->GetCharge(lru_handle), &value, &charge); + s = create_cb(merged_value.get(), handle_value_charge, &value, &charge); } else { UncompressionContext uncompression_context(cache_options_.compression_type); UncompressionInfo uncompression_info(uncompression_context, UncompressionDict::GetEmptyDict(), cache_options_.compression_type); - size_t uncompressed_size = 0; + size_t uncompressed_size{0}; CacheAllocationPtr uncompressed; - uncompressed = UncompressData( - uncompression_info, (char*)ptr->get(), cache_->GetCharge(lru_handle), - &uncompressed_size, cache_options_.compress_format_version, - cache_options_.memory_allocator.get()); + uncompressed = UncompressData(uncompression_info, (char*)merged_value.get(), + handle_value_charge, &uncompressed_size, + cache_options_.compress_format_version, + cache_options_.memory_allocator.get()); if (!uncompressed) { cache_->Release(lru_handle, /* erase_if_last_ref */ true); @@ -104,7 +100,7 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value, if (cache_options_.compression_type != kNoCompression) { CompressionOptions compression_opts; CompressionContext compression_context(cache_options_.compression_type); - uint64_t sample_for_compression = 0; + uint64_t sample_for_compression{0}; CompressionInfo compression_info( compression_opts, compression_context, CompressionDict::GetEmptyDict(), cache_options_.compression_type, sample_for_compression); @@ -118,14 +114,12 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value, } val = Slice(compressed_val); - size = compressed_val.size(); - ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); - memcpy(ptr.get(), compressed_val.data(), size); } - CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); - - return cache_->Insert(key, buf, size, DeletionCallback); + size_t charge{0}; + CacheValueChunk* value_chunks_head = + SplitValueIntoChunks(val, cache_options_.compression_type, charge); + return cache_->Insert(key, value_chunks_head, charge, DeletionCallback); } void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } @@ -133,7 +127,7 @@ void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } std::string CompressedSecondaryCache::GetPrintableOptions() const { std::string ret; ret.reserve(20000); - const int kBufferSize = 200; + const int kBufferSize{200}; char buffer[kBufferSize]; ret.append(cache_->GetPrintableOptions()); snprintf(buffer, kBufferSize, " compression_type : %s\n", @@ -145,6 +139,87 @@ std::string CompressedSecondaryCache::GetPrintableOptions() const { return ret; } +CompressedSecondaryCache::CacheValueChunk* +CompressedSecondaryCache::SplitValueIntoChunks( + const Slice& value, const CompressionType compression_type, + size_t& charge) { + assert(!value.empty()); + const char* src_ptr = value.data(); + size_t src_size{value.size()}; + + CacheValueChunk dummy_head = CacheValueChunk(); + CacheValueChunk* current_chunk = &dummy_head; + CacheAllocationPtr ptr; + // Do not split when value size is large or there is no compression. + size_t predicted_chunk_size{0}; + size_t actual_chunk_size{0}; + size_t tmp_size{0}; + while (src_size > 0) { + predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size; + auto upper = + std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(), + predicted_chunk_size); + // Do not split when value size is too small, too large, close to a bin + // size, or there is no compression. + if (upper == malloc_bin_sizes_.begin() || + upper == malloc_bin_sizes_.end() || + *upper - predicted_chunk_size < malloc_bin_sizes_.front() || + compression_type == kNoCompression) { + tmp_size = predicted_chunk_size; + } else { + tmp_size = *(--upper); + } + + ptr = AllocateBlock(tmp_size, cache_options_.memory_allocator.get()); + current_chunk->next = reinterpret_cast(ptr.release()); + current_chunk = current_chunk->next; + actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1; + memcpy(current_chunk->data, src_ptr, actual_chunk_size); + current_chunk->size = actual_chunk_size; + src_ptr += actual_chunk_size; + src_size -= actual_chunk_size; + charge += tmp_size; + } + current_chunk->next = nullptr; + + return dummy_head.next; +} + +CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue( + const void* chunks_head, size_t& charge) { + const CacheValueChunk* head = + reinterpret_cast(chunks_head); + const CacheValueChunk* current_chunk = head; + charge = 0; + while (current_chunk != nullptr) { + charge += current_chunk->size; + current_chunk = current_chunk->next; + } + + CacheAllocationPtr ptr = + AllocateBlock(charge, cache_options_.memory_allocator.get()); + current_chunk = head; + size_t pos{0}; + while (current_chunk != nullptr) { + memcpy(ptr.get() + pos, current_chunk->data, current_chunk->size); + pos += current_chunk->size; + current_chunk = current_chunk->next; + } + + return ptr; +} + +void CompressedSecondaryCache::DeletionCallback(const Slice& /*key*/, + void* obj) { + CacheValueChunk* chunks_head = reinterpret_cast(obj); + while (chunks_head != nullptr) { + CacheValueChunk* tmp_chunk = chunks_head; + chunks_head = chunks_head->next; + tmp_chunk->Free(); + } + obj = nullptr; +} + std::shared_ptr NewCompressedSecondaryCache( size_t capacity, int num_shard_bits, bool strict_capacity_limit, double high_pri_pool_ratio, diff --git a/cache/compressed_secondary_cache.h b/cache/compressed_secondary_cache.h index 2ed6784ce..e5ca55a33 100644 --- a/cache/compressed_secondary_cache.h +++ b/cache/compressed_secondary_cache.h @@ -5,6 +5,8 @@ #pragma once +#include +#include #include #include "cache/lru_cache.h" @@ -58,7 +60,7 @@ class CompressedSecondaryCache : public SecondaryCache { std::shared_ptr memory_allocator = nullptr, bool use_adaptive_mutex = kDefaultToAdaptiveMutex, CacheMetadataChargePolicy metadata_charge_policy = - kDontChargeCacheMetadata, + kDefaultCacheMetadataChargePolicy, CompressionType compression_type = CompressionType::kLZ4Compression, uint32_t compress_format_version = 2); virtual ~CompressedSecondaryCache() override; @@ -79,6 +81,36 @@ class CompressedSecondaryCache : public SecondaryCache { std::string GetPrintableOptions() const override; private: + friend class CompressedSecondaryCacheTest; + static constexpr std::array malloc_bin_sizes_{ + 32, 64, 96, 128, 160, 192, 224, 256, 320, 384, 448, + 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2048, 2560, 3072, + 3584, 4096, 5120, 6144, 7168, 8192, 10240, 12288, 14336, 16384, 32768}; + + struct CacheValueChunk { + // TODO try "CacheAllocationPtr next;". + CacheValueChunk* next; + size_t size; + // Beginning of the chunk data (MUST BE THE LAST FIELD IN THIS STRUCT!) + char data[1]; + + void Free() { delete[] reinterpret_cast(this); } + }; + + // Split value into chunks to better fit into jemalloc bins. The chunks + // are stored in CacheValueChunk and extra charge is needed for each chunk, + // so the cache charge is recalculated here. + CacheValueChunk* SplitValueIntoChunks(const Slice& value, + const CompressionType compression_type, + size_t& charge); + + // After merging chunks, the extra charge for each chunk is removed, so + // the charge is recalculated. + CacheAllocationPtr MergeChunksIntoValue(const void* chunks_head, + size_t& charge); + + // An implementation of Cache::DeleterFn. + static void DeletionCallback(const Slice& /*key*/, void* obj); std::shared_ptr cache_; CompressedSecondaryCacheOptions cache_options_; }; diff --git a/cache/compressed_secondary_cache_test.cc b/cache/compressed_secondary_cache_test.cc index 757625ba9..76f047b57 100644 --- a/cache/compressed_secondary_cache_test.cc +++ b/cache/compressed_secondary_cache_test.cc @@ -7,9 +7,12 @@ #include #include +#include +#include "cache/lru_cache.h" #include "memory/jemalloc_nodump_allocator.h" #include "memory/memory_allocator.h" +#include "rocksdb/compression_type.h" #include "rocksdb/convenience.h" #include "rocksdb/secondary_cache.h" #include "test_util/testharness.h" @@ -136,7 +139,6 @@ class CompressedSecondaryCacheTest : public testing::Test { CompressedSecondaryCacheOptions opts; opts.capacity = 2048; opts.num_shard_bits = 0; - opts.metadata_charge_policy = kDontChargeCacheMetadata; if (sec_cache_is_compressed) { if (!LZ4_Supported()) { @@ -162,6 +164,8 @@ class CompressedSecondaryCacheTest : public testing::Test { } std::shared_ptr sec_cache = NewCompressedSecondaryCache(opts); + + BasicTestHelper(sec_cache); } void FailsTest(bool sec_cache_is_compressed) { @@ -177,7 +181,6 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.capacity = 1100; secondary_cache_opts.num_shard_bits = 0; - secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; std::shared_ptr sec_cache = NewCompressedSecondaryCache(secondary_cache_opts); @@ -235,34 +238,35 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.capacity = 2300; secondary_cache_opts.num_shard_bits = 0; - secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); - LRUCacheOptions lru_cache_opts(1024, 0, false, 0.5, nullptr, - kDefaultToAdaptiveMutex, - kDontChargeCacheMetadata); + LRUCacheOptions lru_cache_opts(1300, 0, /*_strict_capacity_limit=*/false, + 0.5, nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy); lru_cache_opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(lru_cache_opts); std::shared_ptr stats = CreateDBStatistics(); Random rnd(301); - std::string str1 = rnd.RandomString(1010); + std::string str1; + test::CompressibleString(&rnd, 0.5, 1001, &str1); std::string str1_clone{str1}; TestItem* item1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, str1.length())); - std::string str2 = rnd.RandomString(1020); + std::string str2; + test::CompressibleString(&rnd, 0.5, 1012, &str2); TestItem* item2 = new TestItem(str2.data(), str2.length()); - // After Insert, lru cache contains k2 and secondary cache contains k1. + // After Insert, cache contains k2 and secondary cache contains k1. ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, str2.length())); - std::string str3 = rnd.RandomString(1020); + std::string str3; + test::CompressibleString(&rnd, 0.5, 1024, &str3); TestItem* item3 = new TestItem(str3.data(), str3.length()); - // After Insert, lru cache contains k3 and secondary cache contains k1 and - // k2 + // After Insert, cache contains k3 and secondary cache contains k1 and k2. ASSERT_OK(cache->Insert("k3", item3, &CompressedSecondaryCacheTest::helper_, str3.length())); @@ -287,7 +291,6 @@ class CompressedSecondaryCacheTest : public testing::Test { handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, test_item_creator, Cache::Priority::LOW, true, stats.get()); - ASSERT_NE(handle, nullptr); TestItem* val1_1 = static_cast(cache->Value(handle)); ASSERT_NE(val1_1, nullptr); @@ -316,19 +319,20 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.compression_type = CompressionType::kNoCompression; } - secondary_cache_opts.capacity = 2048; + secondary_cache_opts.capacity = 2300; secondary_cache_opts.num_shard_bits = 0; - secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); - LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, - kDontChargeCacheMetadata); + LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/false, 0.5, + nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy); opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(opts); Random rnd(301); - std::string str1 = rnd.RandomString(1020); + std::string str1; + test::CompressibleString(&rnd, 0.5, 1001, &str1); auto item1 = std::unique_ptr(new TestItem(str1.data(), str1.length())); ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length())); @@ -361,25 +365,28 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.compression_type = CompressionType::kNoCompression; } - secondary_cache_opts.capacity = 2048; + secondary_cache_opts.capacity = 2300; secondary_cache_opts.num_shard_bits = 0; - secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); - LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, - kDontChargeCacheMetadata); + LRUCacheOptions opts(1200, 0, /*_strict_capacity_limit=*/false, 0.5, + nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy); opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(opts); Random rnd(301); - std::string str1 = rnd.RandomString(1020); + std::string str1; + test::CompressibleString(&rnd, 0.5, 1001, &str1); TestItem* item1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_fail_, str1.length())); - std::string str2 = rnd.RandomString(1020); + + std::string str2; + test::CompressibleString(&rnd, 0.5, 1002, &str2); TestItem* item2 = new TestItem(str2.data(), str2.length()); // k1 should be demoted to the secondary cache. ASSERT_OK(cache->Insert("k2", item2, @@ -417,25 +424,27 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.compression_type = CompressionType::kNoCompression; } - secondary_cache_opts.capacity = 2048; + secondary_cache_opts.capacity = 2300; secondary_cache_opts.num_shard_bits = 0; - secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); - LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex, - kDontChargeCacheMetadata); + LRUCacheOptions opts(1200, 0, /*_strict_capacity_limit=*/false, 0.5, + nullptr, kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy); opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(opts); Random rnd(301); - std::string str1 = rnd.RandomString(1020); + std::string str1; + test::CompressibleString(&rnd, 0.5, 1001, &str1); TestItem* item1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, str1.length())); - std::string str2 = rnd.RandomString(1020); + std::string str2; + test::CompressibleString(&rnd, 0.5, 1002, &str2); TestItem* item2 = new TestItem(str2.data(), str2.length()); // k1 should be demoted to the secondary cache. ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, @@ -473,24 +482,28 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache_opts.compression_type = CompressionType::kNoCompression; } - secondary_cache_opts.capacity = 2048; + secondary_cache_opts.capacity = 2300; secondary_cache_opts.num_shard_bits = 0; - secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata; std::shared_ptr secondary_cache = NewCompressedSecondaryCache(secondary_cache_opts); - LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr, - kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); + LRUCacheOptions opts(1200, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr, + kDefaultToAdaptiveMutex, + kDefaultCacheMetadataChargePolicy); opts.secondary_cache = secondary_cache; std::shared_ptr cache = NewLRUCache(opts); Random rnd(301); - std::string str1 = rnd.RandomString(1020); + std::string str1; + test::CompressibleString(&rnd, 0.5, 1001, &str1); TestItem* item1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, str1.length())); - std::string str2 = rnd.RandomString(1020); + + std::string str2; + test::CompressibleString(&rnd, 0.5, 1002, &str2); + std::string str2_clone{str2}; TestItem* item2 = new TestItem(str2.data(), str2.length()); // k1 should be demoted to the secondary cache. ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, @@ -501,8 +514,9 @@ class CompressedSecondaryCacheTest : public testing::Test { test_item_creator, Cache::Priority::LOW, true); ASSERT_NE(handle2, nullptr); cache->Release(handle2); - // k1 promotion should fail due to the block cache being at capacity, - // but the lookup should still succeed + + // k1 promotion should fail because cache is at capacity and + // strict_capacity_limit is true, but the lookup should still succeed. Cache::Handle* handle1; handle1 = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, test_item_creator, Cache::Priority::LOW, true); @@ -519,6 +533,134 @@ class CompressedSecondaryCacheTest : public testing::Test { secondary_cache.reset(); } + void SplitValueIntoChunksTest() { + JemallocAllocatorOptions jopts; + std::shared_ptr allocator; + std::string msg; + if (JemallocNodumpAllocator::IsSupported(&msg)) { + Status s = NewJemallocNodumpAllocator(jopts, &allocator); + if (!s.ok()) { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + } else { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + + using CacheValueChunk = CompressedSecondaryCache::CacheValueChunk; + std::unique_ptr sec_cache = + std::make_unique(1000, 0, true, 0.5, + allocator); + Random rnd(301); + // 10000 = 8169 + 1769 + 62 , so there should be 3 chunks after split. + size_t str_size{10000}; + std::string str = rnd.RandomString(static_cast(str_size)); + size_t charge{0}; + CacheValueChunk* chunks_head = + sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge); + ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1)); + + CacheValueChunk* current_chunk = chunks_head; + ASSERT_EQ(current_chunk->size, 8192 - sizeof(CacheValueChunk) + 1); + current_chunk = current_chunk->next; + ASSERT_EQ(current_chunk->size, 1792 - sizeof(CacheValueChunk) + 1); + current_chunk = current_chunk->next; + ASSERT_EQ(current_chunk->size, 62); + + sec_cache->DeletionCallback("dummy", chunks_head); + } + + void MergeChunksIntoValueTest() { + JemallocAllocatorOptions jopts; + std::shared_ptr allocator; + std::string msg; + if (JemallocNodumpAllocator::IsSupported(&msg)) { + Status s = NewJemallocNodumpAllocator(jopts, &allocator); + if (!s.ok()) { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + } else { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + + using CacheValueChunk = CompressedSecondaryCache::CacheValueChunk; + Random rnd(301); + size_t size1{2048}; + std::string str1 = rnd.RandomString(static_cast(size1)); + CacheAllocationPtr ptr = + AllocateBlock(sizeof(CacheValueChunk) - 1 + size1, allocator.get()); + CacheValueChunk* current_chunk = + reinterpret_cast(ptr.release()); + CacheValueChunk* chunks_head = current_chunk; + memcpy(current_chunk->data, str1.data(), size1); + current_chunk->size = size1; + + size_t size2{256}; + std::string str2 = rnd.RandomString(static_cast(size2)); + ptr = AllocateBlock(sizeof(CacheValueChunk) - 1 + size2, allocator.get()); + current_chunk->next = reinterpret_cast(ptr.release()); + current_chunk = current_chunk->next; + memcpy(current_chunk->data, str2.data(), size2); + current_chunk->size = size2; + + size_t size3{31}; + std::string str3 = rnd.RandomString(static_cast(size3)); + ptr = AllocateBlock(sizeof(CacheValueChunk) - 1 + size3, allocator.get()); + current_chunk->next = reinterpret_cast(ptr.release()); + current_chunk = current_chunk->next; + memcpy(current_chunk->data, str3.data(), size3); + current_chunk->size = size3; + current_chunk->next = nullptr; + + std::string str = str1 + str2 + str3; + + std::unique_ptr sec_cache = + std::make_unique(1000, 0, true, 0.5, + allocator); + size_t charge{0}; + CacheAllocationPtr value = + sec_cache->MergeChunksIntoValue(chunks_head, charge); + ASSERT_EQ(charge, size1 + size2 + size3); + std::string value_str{value.get(), charge}; + ASSERT_EQ(strcmp(value_str.data(), str.data()), 0); + + sec_cache->DeletionCallback("dummy", chunks_head); + } + + void SplictValueAndMergeChunksTest() { + JemallocAllocatorOptions jopts; + std::shared_ptr allocator; + std::string msg; + if (JemallocNodumpAllocator::IsSupported(&msg)) { + Status s = NewJemallocNodumpAllocator(jopts, &allocator); + if (!s.ok()) { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + } else { + ROCKSDB_GTEST_BYPASS("JEMALLOC not supported"); + } + + using CacheValueChunk = CompressedSecondaryCache::CacheValueChunk; + std::unique_ptr sec_cache = + std::make_unique(1000, 0, true, 0.5, + allocator); + Random rnd(301); + // 10000 = 8169 + 1769 + 62 , so there should be 3 chunks after split. + size_t str_size{10000}; + std::string str = rnd.RandomString(static_cast(str_size)); + size_t charge{0}; + CacheValueChunk* chunks_head = + sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge); + ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1)); + + CacheAllocationPtr value = + sec_cache->MergeChunksIntoValue(chunks_head, charge); + ASSERT_EQ(charge, str_size); + std::string value_str{value.get(), charge}; + ASSERT_EQ(strcmp(value_str.data(), str.data()), 0); + + sec_cache->DeletionCallback("dummy", chunks_head); + } + private: bool fail_create_; }; @@ -639,6 +781,18 @@ TEST_F(CompressedSecondaryCacheTest, IntegrationFullCapacityTest(true); } +TEST_F(CompressedSecondaryCacheTest, SplitValueIntoChunksTest) { + SplitValueIntoChunksTest(); +} + +TEST_F(CompressedSecondaryCacheTest, MergeChunksIntoValueTest) { + MergeChunksIntoValueTest(); +} + +TEST_F(CompressedSecondaryCacheTest, SplictValueAndMergeChunksTest) { + SplictValueAndMergeChunksTest(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {