Add enable_split_merge option for CompressedSecondaryCache (#10690)

Summary:
`enable_custom_split_merge` is added for enabling the custom split and merge feature, which split the compressed value into chunks so that they may better fit jemalloc bins.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10690

Test Plan:
Unit Tests
Stress Tests

Reviewed By: anand1976

Differential Revision: D39567604

Pulled By: gitbw95

fbshipit-source-id: f6d1d46200f365220055f793514601dcb0edc4b7
main
gitbw95 2 years ago committed by Facebook GitHub Bot
parent e053ccde99
commit 2cc5b39560
  1. 1
      HISTORY.md
  2. 5
      cache/cache.cc
  3. 74
      cache/compressed_secondary_cache.cc
  4. 11
      cache/compressed_secondary_cache.h
  5. 187
      cache/compressed_secondary_cache_test.cc
  6. 13
      include/rocksdb/cache.h
  7. 3
      tools/db_crashtest.py

@ -33,6 +33,7 @@
* Added new perf context counters `block_cache_standalone_handle_count`, `block_cache_real_handle_count`,`compressed_sec_cache_insert_real_count`, `compressed_sec_cache_insert_dummy_count`, `compressed_sec_cache_uncompressed_bytes`, and `compressed_sec_cache_compressed_bytes`. * Added new perf context counters `block_cache_standalone_handle_count`, `block_cache_real_handle_count`,`compressed_sec_cache_insert_real_count`, `compressed_sec_cache_insert_dummy_count`, `compressed_sec_cache_uncompressed_bytes`, and `compressed_sec_cache_compressed_bytes`.
* Memory for blobs which are to be inserted into the blob cache is now allocated using the cache's allocator (see #10628 and #10647). * Memory for blobs which are to be inserted into the blob cache is now allocated using the cache's allocator (see #10628 and #10647).
* HyperClockCache is an experimental, lock-free Cache alternative for block cache that offers much improved CPU efficiency under high parallel load or high contention, with some caveats. As much as 4.5x higher ops/sec vs. LRUCache has been seen in db_bench under high parallel load. * HyperClockCache is an experimental, lock-free Cache alternative for block cache that offers much improved CPU efficiency under high parallel load or high contention, with some caveats. As much as 4.5x higher ops/sec vs. LRUCache has been seen in db_bench under high parallel load.
* `CompressedSecondaryCacheOptions::enable_custom_split_merge` is added for enabling the custom split and merge feature, which split the compressed value into chunks so that they may better fit jemalloc bins.
### Performance Improvements ### Performance Improvements
* Iterator performance is improved for `DeleteRange()` users. Internally, iterator will skip to the end of a range tombstone when possible, instead of looping through each key and check individually if a key is range deleted. * Iterator performance is improved for `DeleteRange()` users. Internally, iterator will skip to the end of a range tombstone when possible, instead of looping through each key and check individually if a key is range deleted.

5
cache/cache.cc vendored

@ -58,6 +58,11 @@ static std::unordered_map<std::string, OptionTypeInfo>
compress_format_version), compress_format_version),
OptionType::kUInt32T, OptionVerificationType::kNormal, OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}}, OptionTypeFlags::kMutable}},
{"enable_custom_split_merge",
{offsetof(struct CompressedSecondaryCacheOptions,
enable_custom_split_merge),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
}; };
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -21,11 +21,13 @@ CompressedSecondaryCache::CompressedSecondaryCache(
double high_pri_pool_ratio, double low_pri_pool_ratio, double high_pri_pool_ratio, double low_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex, std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
CacheMetadataChargePolicy metadata_charge_policy, CacheMetadataChargePolicy metadata_charge_policy,
CompressionType compression_type, uint32_t compress_format_version) CompressionType compression_type, uint32_t compress_format_version,
bool enable_custom_split_merge)
: cache_options_(capacity, num_shard_bits, strict_capacity_limit, : cache_options_(capacity, num_shard_bits, strict_capacity_limit,
high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator, high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator,
use_adaptive_mutex, metadata_charge_policy, use_adaptive_mutex, metadata_charge_policy,
compression_type, compress_format_version) { compression_type, compress_format_version,
enable_custom_split_merge) {
cache_ = cache_ =
NewLRUCache(capacity, num_shard_bits, strict_capacity_limit, NewLRUCache(capacity, num_shard_bits, strict_capacity_limit,
high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, high_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
@ -50,13 +52,24 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
return nullptr; return nullptr;
} }
CacheAllocationPtr* ptr = reinterpret_cast<CacheAllocationPtr*>(handle_value); CacheAllocationPtr* ptr{nullptr};
CacheAllocationPtr merged_value;
size_t handle_value_charge{0};
if (cache_options_.enable_custom_split_merge) {
CacheValueChunk* value_chunk_ptr =
reinterpret_cast<CacheValueChunk*>(handle_value);
merged_value = MergeChunksIntoValue(value_chunk_ptr, handle_value_charge);
ptr = &merged_value;
} else {
ptr = reinterpret_cast<CacheAllocationPtr*>(handle_value);
handle_value_charge = cache_->GetCharge(lru_handle);
}
Status s; Status s;
void* value{nullptr}; void* value{nullptr};
size_t charge{0}; size_t charge{0};
if (cache_options_.compression_type == kNoCompression) { if (cache_options_.compression_type == kNoCompression) {
s = create_cb(ptr->get(), cache_->GetCharge(lru_handle), &value, &charge); s = create_cb(ptr->get(), handle_value_charge, &value, &charge);
} else { } else {
UncompressionContext uncompression_context(cache_options_.compression_type); UncompressionContext uncompression_context(cache_options_.compression_type);
UncompressionInfo uncompression_info(uncompression_context, UncompressionInfo uncompression_info(uncompression_context,
@ -65,7 +78,7 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
size_t uncompressed_size{0}; size_t uncompressed_size{0};
CacheAllocationPtr uncompressed = UncompressData( CacheAllocationPtr uncompressed = UncompressData(
uncompression_info, (char*)ptr->get(), cache_->GetCharge(lru_handle), uncompression_info, (char*)ptr->get(), handle_value_charge,
&uncompressed_size, cache_options_.compress_format_version, &uncompressed_size, cache_options_.compress_format_version,
cache_options_.memory_allocator.get()); cache_options_.memory_allocator.get());
@ -84,7 +97,9 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
if (advise_erase) { if (advise_erase) {
cache_->Release(lru_handle, /*erase_if_last_ref=*/true); cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
// Insert a dummy handle. // Insert a dummy handle.
cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, DeletionCallback) cache_
->Insert(key, /*value=*/nullptr, /*charge=*/0,
GetDeletionCallback(cache_options_.enable_custom_split_merge))
.PermitUncheckedError(); .PermitUncheckedError();
} else { } else {
is_in_sec_cache = true; is_in_sec_cache = true;
@ -101,12 +116,12 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value,
} }
Cache::Handle* lru_handle = cache_->Lookup(key); Cache::Handle* lru_handle = cache_->Lookup(key);
Cache::DeleterFn del_cb =
GetDeletionCallback(cache_options_.enable_custom_split_merge);
if (lru_handle == nullptr) { if (lru_handle == nullptr) {
PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1); PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1);
// Insert a dummy handle if the handle is evicted for the first time. // Insert a dummy handle if the handle is evicted for the first time.
return cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, return cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, del_cb);
DeletionCallback);
} else { } else {
cache_->Release(lru_handle, /*erase_if_last_ref=*/false); cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
} }
@ -142,13 +157,23 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value,
val = Slice(compressed_val); val = Slice(compressed_val);
size = compressed_val.size(); size = compressed_val.size();
PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, size); PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, size);
if (!cache_options_.enable_custom_split_merge) {
ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); ptr = AllocateBlock(size, cache_options_.memory_allocator.get());
memcpy(ptr.get(), compressed_val.data(), size); memcpy(ptr.get(), compressed_val.data(), size);
} }
}
CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1); PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1);
return cache_->Insert(key, buf, size, DeletionCallback); if (cache_options_.enable_custom_split_merge) {
size_t charge{0};
CacheValueChunk* value_chunks_head =
SplitValueIntoChunks(val, cache_options_.compression_type, charge);
return cache_->Insert(key, value_chunks_head, charge, del_cb);
} else {
CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
return cache_->Insert(key, buf, size, del_cb);
}
} }
void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
@ -238,10 +263,24 @@ CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue(
return ptr; return ptr;
} }
void CompressedSecondaryCache::DeletionCallback(const Slice& /*key*/, Cache::DeleterFn CompressedSecondaryCache::GetDeletionCallback(
void* obj) { bool enable_custom_split_merge) {
if (enable_custom_split_merge) {
return [](const Slice& /*key*/, void* obj) {
CacheValueChunk* chunks_head = reinterpret_cast<CacheValueChunk*>(obj);
while (chunks_head != nullptr) {
CacheValueChunk* tmp_chunk = chunks_head;
chunks_head = chunks_head->next;
tmp_chunk->Free();
obj = nullptr;
};
};
} else {
return [](const Slice& /*key*/, void* obj) {
delete reinterpret_cast<CacheAllocationPtr*>(obj); delete reinterpret_cast<CacheAllocationPtr*>(obj);
obj = nullptr; obj = nullptr;
};
}
} }
std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache( std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
@ -249,11 +288,13 @@ std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
double high_pri_pool_ratio, double low_pri_pool_ratio, double high_pri_pool_ratio, double low_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex, std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
CacheMetadataChargePolicy metadata_charge_policy, CacheMetadataChargePolicy metadata_charge_policy,
CompressionType compression_type, uint32_t compress_format_version) { CompressionType compression_type, uint32_t compress_format_version,
bool enable_custom_split_merge) {
return std::make_shared<CompressedSecondaryCache>( return std::make_shared<CompressedSecondaryCache>(
capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
low_pri_pool_ratio, memory_allocator, use_adaptive_mutex, low_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
metadata_charge_policy, compression_type, compress_format_version); metadata_charge_policy, compression_type, compress_format_version,
enable_custom_split_merge);
} }
std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache( std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
@ -264,7 +305,8 @@ std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit, opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit,
opts.high_pri_pool_ratio, opts.low_pri_pool_ratio, opts.memory_allocator, opts.high_pri_pool_ratio, opts.low_pri_pool_ratio, opts.memory_allocator,
opts.use_adaptive_mutex, opts.metadata_charge_policy, opts.use_adaptive_mutex, opts.metadata_charge_policy,
opts.compression_type, opts.compress_format_version); opts.compression_type, opts.compress_format_version,
opts.enable_custom_split_merge);
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -76,7 +76,8 @@ class CompressedSecondaryCache : public SecondaryCache {
CacheMetadataChargePolicy metadata_charge_policy = CacheMetadataChargePolicy metadata_charge_policy =
kDefaultCacheMetadataChargePolicy, kDefaultCacheMetadataChargePolicy,
CompressionType compression_type = CompressionType::kLZ4Compression, CompressionType compression_type = CompressionType::kLZ4Compression,
uint32_t compress_format_version = 2); uint32_t compress_format_version = 2,
bool enable_custom_split_merge = false);
virtual ~CompressedSecondaryCache() override; virtual ~CompressedSecondaryCache() override;
const char* Name() const override { return "CompressedSecondaryCache"; } const char* Name() const override { return "CompressedSecondaryCache"; }
@ -98,10 +99,8 @@ class CompressedSecondaryCache : public SecondaryCache {
private: private:
friend class CompressedSecondaryCacheTest; friend class CompressedSecondaryCacheTest;
static constexpr std::array<uint16_t, 33> malloc_bin_sizes_{ static constexpr std::array<uint16_t, 8> malloc_bin_sizes_{
32, 64, 96, 128, 160, 192, 224, 256, 320, 384, 448, 128, 256, 512, 1024, 2048, 4096, 8192, 16384};
512, 640, 768, 896, 1024, 1280, 1536, 1792, 2048, 2560, 3072,
3584, 4096, 5120, 6144, 7168, 8192, 10240, 12288, 14336, 16384, 32768};
struct CacheValueChunk { struct CacheValueChunk {
// TODO try "CacheAllocationPtr next;". // TODO try "CacheAllocationPtr next;".
@ -126,7 +125,7 @@ class CompressedSecondaryCache : public SecondaryCache {
size_t& charge); size_t& charge);
// An implementation of Cache::DeleterFn. // An implementation of Cache::DeleterFn.
static void DeletionCallback(const Slice& /*key*/, void* obj); static Cache::DeleterFn GetDeletionCallback(bool enable_custom_split_merge);
std::shared_ptr<Cache> cache_; std::shared_ptr<Cache> cache_;
CompressedSecondaryCacheOptions cache_options_; CompressedSecondaryCacheOptions cache_options_;
}; };

@ -8,6 +8,7 @@
#include <algorithm> #include <algorithm>
#include <cstdint> #include <cstdint>
#include <iterator> #include <iterator>
#include <tuple>
#include "cache/lru_cache.h" #include "cache/lru_cache.h"
#include "memory/jemalloc_nodump_allocator.h" #include "memory/jemalloc_nodump_allocator.h"
@ -111,11 +112,12 @@ class CompressedSecondaryCacheTest : public testing::Test {
// Insert and Lookup the item k1 for the second time and advise erasing it. // Insert and Lookup the item k1 for the second time and advise erasing it.
ASSERT_OK(sec_cache->Insert("k1", &item1, ASSERT_OK(sec_cache->Insert("k1", &item1,
&CompressedSecondaryCacheTest::helper_)); &CompressedSecondaryCacheTest::helper_));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1);
std::unique_ptr<SecondaryCacheResultHandle> handle1_2 = sec_cache->Lookup( std::unique_ptr<SecondaryCacheResultHandle> handle1_2 = sec_cache->Lookup(
"k1", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache); "k1", test_item_creator, true, /*advise_erase=*/true, is_in_sec_cache);
ASSERT_NE(handle1_2, nullptr); ASSERT_NE(handle1_2, nullptr);
ASSERT_FALSE(is_in_sec_cache); ASSERT_FALSE(is_in_sec_cache);
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1);
if (sec_cache_is_compressed) { if (sec_cache_is_compressed) {
ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes, ASSERT_EQ(get_perf_context()->compressed_sec_cache_uncompressed_bytes,
1000); 1000);
@ -282,7 +284,8 @@ class CompressedSecondaryCacheTest : public testing::Test {
sec_cache.reset(); sec_cache.reset();
} }
void BasicIntegrationTest(bool sec_cache_is_compressed) { void BasicIntegrationTest(bool sec_cache_is_compressed,
bool enable_custom_split_merge) {
CompressedSecondaryCacheOptions secondary_cache_opts; CompressedSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) { if (sec_cache_is_compressed) {
@ -297,6 +300,7 @@ class CompressedSecondaryCacheTest : public testing::Test {
secondary_cache_opts.capacity = 6000; secondary_cache_opts.capacity = 6000;
secondary_cache_opts.num_shard_bits = 0; secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.enable_custom_split_merge = enable_custom_split_merge;
std::shared_ptr<SecondaryCache> secondary_cache = std::shared_ptr<SecondaryCache> secondary_cache =
NewCompressedSecondaryCache(secondary_cache_opts); NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions lru_cache_opts( LRUCacheOptions lru_cache_opts(
@ -680,24 +684,22 @@ class CompressedSecondaryCacheTest : public testing::Test {
std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5, 0.0, std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5, 0.0,
allocator); allocator);
Random rnd(301); Random rnd(301);
// 8500 = 8169 + 354, so there should be 2 chunks after split. // 8500 = 8169 + 233 + 98, so there should be 3 chunks after split.
size_t str_size{8500}; size_t str_size{8500};
std::string str = rnd.RandomString(static_cast<int>(str_size)); std::string str = rnd.RandomString(static_cast<int>(str_size));
size_t charge{0}; size_t charge{0};
CacheValueChunk* chunks_head = CacheValueChunk* chunks_head =
sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge); sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge);
ASSERT_EQ(charge, str_size + 2 * (sizeof(CacheValueChunk) - 1)); ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1));
CacheValueChunk* current_chunk = chunks_head; CacheValueChunk* current_chunk = chunks_head;
ASSERT_EQ(current_chunk->size, 8192 - sizeof(CacheValueChunk) + 1); ASSERT_EQ(current_chunk->size, 8192 - sizeof(CacheValueChunk) + 1);
current_chunk = current_chunk->next; current_chunk = current_chunk->next;
ASSERT_EQ(current_chunk->size, 354 - sizeof(CacheValueChunk) + 1); ASSERT_EQ(current_chunk->size, 256 - sizeof(CacheValueChunk) + 1);
current_chunk = current_chunk->next;
ASSERT_EQ(current_chunk->size, 98);
while (chunks_head != nullptr) { sec_cache->GetDeletionCallback(true)("dummy", chunks_head);
CacheValueChunk* tmp_chunk = chunks_head;
chunks_head = chunks_head->next;
tmp_chunk->Free();
}
} }
void MergeChunksIntoValueTest() { void MergeChunksIntoValueTest() {
@ -764,13 +766,13 @@ class CompressedSecondaryCacheTest : public testing::Test {
std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5, 0.0, std::make_unique<CompressedSecondaryCache>(1000, 0, true, 0.5, 0.0,
allocator); allocator);
Random rnd(301); Random rnd(301);
// 8500 = 8169 + 354, so there should be 2 chunks after split. // 8500 = 8169 + 233 + 98, so there should be 3 chunks after split.
size_t str_size{8500}; size_t str_size{8500};
std::string str = rnd.RandomString(static_cast<int>(str_size)); std::string str = rnd.RandomString(static_cast<int>(str_size));
size_t charge{0}; size_t charge{0};
CacheValueChunk* chunks_head = CacheValueChunk* chunks_head =
sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge); sec_cache->SplitValueIntoChunks(str, kLZ4Compression, charge);
ASSERT_EQ(charge, str_size + 2 * (sizeof(CacheValueChunk) - 1)); ASSERT_EQ(charge, str_size + 3 * (sizeof(CacheValueChunk) - 1));
CacheAllocationPtr value = CacheAllocationPtr value =
sec_cache->MergeChunksIntoValue(chunks_head, charge); sec_cache->MergeChunksIntoValue(chunks_head, charge);
@ -778,11 +780,7 @@ class CompressedSecondaryCacheTest : public testing::Test {
std::string value_str{value.get(), charge}; std::string value_str{value.get(), charge};
ASSERT_EQ(strcmp(value_str.data(), str.data()), 0); ASSERT_EQ(strcmp(value_str.data(), str.data()), 0);
while (chunks_head != nullptr) { sec_cache->GetDeletionCallback(true)("dummy", chunks_head);
CacheValueChunk* tmp_chunk = chunks_head;
chunks_head = chunks_head->next;
tmp_chunk->Free();
}
} }
private: private:
@ -799,113 +797,150 @@ Cache::CacheItemHelper CompressedSecondaryCacheTest::helper_fail_(
CompressedSecondaryCacheTest::SaveToCallbackFail, CompressedSecondaryCacheTest::SaveToCallbackFail,
CompressedSecondaryCacheTest::DeletionCallback); CompressedSecondaryCacheTest::DeletionCallback);
TEST_F(CompressedSecondaryCacheTest, BasicTestWithNoCompression) { class CompressedSecCacheTestWithCompressAndAllocatorParam
BasicTest(false, false); : public CompressedSecondaryCacheTest,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public:
CompressedSecCacheTestWithCompressAndAllocatorParam() {
sec_cache_is_compressed_ = std::get<0>(GetParam());
use_jemalloc_ = std::get<1>(GetParam());
} }
bool sec_cache_is_compressed_;
bool use_jemalloc_;
};
TEST_F(CompressedSecondaryCacheTest, TEST_P(CompressedSecCacheTestWithCompressAndAllocatorParam, BasicTes) {
BasicTestWithMemoryAllocatorAndNoCompression) { BasicTest(sec_cache_is_compressed_, use_jemalloc_);
BasicTest(false, true);
} }
TEST_F(CompressedSecondaryCacheTest, BasicTestWithCompression) { INSTANTIATE_TEST_CASE_P(CompressedSecCacheTests,
BasicTest(true, false); CompressedSecCacheTestWithCompressAndAllocatorParam,
} ::testing::Combine(testing::Bool(), testing::Bool()));
TEST_F(CompressedSecondaryCacheTest, class CompressedSecondaryCacheTestWithCompressionParam
BasicTestWithMemoryAllocatorAndCompression) { : public CompressedSecondaryCacheTest,
BasicTest(true, true); public ::testing::WithParamInterface<bool> {
public:
CompressedSecondaryCacheTestWithCompressionParam() {
sec_cache_is_compressed_ = GetParam();
} }
bool sec_cache_is_compressed_;
};
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TEST_F(CompressedSecondaryCacheTest, BasicTestFromStringWithNoCompression) { TEST_P(CompressedSecondaryCacheTestWithCompressionParam, BasicTestFromString) {
std::string sec_cache_uri = std::shared_ptr<SecondaryCache> sec_cache{nullptr};
std::string sec_cache_uri;
if (sec_cache_is_compressed_) {
if (LZ4_Supported()) {
sec_cache_uri =
"compressed_secondary_cache://"
"capacity=2048;num_shard_bits=0;compression_type=kLZ4Compression;"
"compress_format_version=2";
} else {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
sec_cache_uri =
"compressed_secondary_cache://"
"capacity=2048;num_shard_bits=0;compression_type=kNoCompression";
sec_cache_is_compressed_ = false;
}
Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri,
&sec_cache);
EXPECT_OK(s);
} else {
sec_cache_uri =
"compressed_secondary_cache://" "compressed_secondary_cache://"
"capacity=2048;num_shard_bits=0;compression_type=kNoCompression"; "capacity=2048;num_shard_bits=0;compression_type=kNoCompression";
std::shared_ptr<SecondaryCache> sec_cache;
Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri,
&sec_cache); &sec_cache);
EXPECT_OK(s); EXPECT_OK(s);
BasicTestHelper(sec_cache, /*sec_cache_is_compressed=*/false); }
BasicTestHelper(sec_cache, sec_cache_is_compressed_);
} }
TEST_F(CompressedSecondaryCacheTest, BasicTestFromStringWithCompression) { TEST_P(CompressedSecondaryCacheTestWithCompressionParam,
BasicTestFromStringWithSplit) {
std::shared_ptr<SecondaryCache> sec_cache{nullptr};
std::string sec_cache_uri; std::string sec_cache_uri;
bool sec_cache_is_compressed{true}; if (sec_cache_is_compressed_) {
if (LZ4_Supported()) { if (LZ4_Supported()) {
sec_cache_uri = sec_cache_uri =
"compressed_secondary_cache://" "compressed_secondary_cache://"
"capacity=2048;num_shard_bits=0;compression_type=kLZ4Compression;" "capacity=2048;num_shard_bits=0;compression_type=kLZ4Compression;"
"compress_format_version=2"; "compress_format_version=2;enable_custom_split_merge=true";
} else { } else {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support."); ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
sec_cache_uri = sec_cache_uri =
"compressed_secondary_cache://" "compressed_secondary_cache://"
"capacity=2048;num_shard_bits=0;compression_type=kNoCompression"; "capacity=2048;num_shard_bits=0;compression_type=kNoCompression;"
sec_cache_is_compressed = false; "enable_custom_split_merge=true";
sec_cache_is_compressed_ = false;
} }
std::shared_ptr<SecondaryCache> sec_cache;
Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri, Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri,
&sec_cache); &sec_cache);
EXPECT_OK(s); EXPECT_OK(s);
BasicTestHelper(sec_cache, sec_cache_is_compressed); } else {
} sec_cache_uri =
"compressed_secondary_cache://"
#endif // ROCKSDB_LITE "capacity=2048;num_shard_bits=0;compression_type=kNoCompression;"
"enable_custom_split_merge=true";
TEST_F(CompressedSecondaryCacheTest, FailsTestWithNoCompression) { Status s = SecondaryCache::CreateFromString(ConfigOptions(), sec_cache_uri,
FailsTest(false); &sec_cache);
EXPECT_OK(s);
} }
BasicTestHelper(sec_cache, sec_cache_is_compressed_);
TEST_F(CompressedSecondaryCacheTest, FailsTestWithCompression) {
FailsTest(true);
} }
TEST_F(CompressedSecondaryCacheTest, BasicIntegrationTestWithNoCompression) { #endif // ROCKSDB_LITE
BasicIntegrationTest(false);
}
TEST_F(CompressedSecondaryCacheTest, BasicIntegrationTestWithCompression) { TEST_P(CompressedSecondaryCacheTestWithCompressionParam, FailsTest) {
BasicIntegrationTest(true); FailsTest(sec_cache_is_compressed_);
} }
TEST_F(CompressedSecondaryCacheTest, TEST_P(CompressedSecondaryCacheTestWithCompressionParam,
BasicIntegrationFailTestWithNoCompression) { BasicIntegrationFailTest) {
BasicIntegrationFailTest(false); BasicIntegrationFailTest(sec_cache_is_compressed_);
} }
TEST_F(CompressedSecondaryCacheTest, BasicIntegrationFailTestWithCompression) { TEST_P(CompressedSecondaryCacheTestWithCompressionParam,
BasicIntegrationFailTest(true); IntegrationSaveFailTest) {
IntegrationSaveFailTest(sec_cache_is_compressed_);
} }
TEST_F(CompressedSecondaryCacheTest, IntegrationSaveFailTestWithNoCompression) { TEST_P(CompressedSecondaryCacheTestWithCompressionParam,
IntegrationSaveFailTest(false); IntegrationCreateFailTest) {
IntegrationCreateFailTest(sec_cache_is_compressed_);
} }
TEST_F(CompressedSecondaryCacheTest, IntegrationSaveFailTestWithCompression) { TEST_P(CompressedSecondaryCacheTestWithCompressionParam,
IntegrationSaveFailTest(true); IntegrationFullCapacityTest) {
IntegrationFullCapacityTest(sec_cache_is_compressed_);
} }
TEST_F(CompressedSecondaryCacheTest, INSTANTIATE_TEST_CASE_P(CompressedSecCacheTests,
IntegrationCreateFailTestWithNoCompression) { CompressedSecondaryCacheTestWithCompressionParam,
IntegrationCreateFailTest(false); testing::Bool());
}
TEST_F(CompressedSecondaryCacheTest, IntegrationCreateFailTestWithCompression) { class CompressedSecCacheTestWithCompressAndSplitParam
IntegrationCreateFailTest(true); : public CompressedSecondaryCacheTest,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public:
CompressedSecCacheTestWithCompressAndSplitParam() {
sec_cache_is_compressed_ = std::get<0>(GetParam());
enable_custom_split_merge_ = std::get<1>(GetParam());
} }
bool sec_cache_is_compressed_;
bool enable_custom_split_merge_;
};
TEST_F(CompressedSecondaryCacheTest, TEST_P(CompressedSecCacheTestWithCompressAndSplitParam, BasicIntegrationTest) {
IntegrationFullCapacityTestWithNoCompression) { BasicIntegrationTest(sec_cache_is_compressed_, enable_custom_split_merge_);
IntegrationFullCapacityTest(false);
} }
TEST_F(CompressedSecondaryCacheTest, INSTANTIATE_TEST_CASE_P(CompressedSecCacheTests,
IntegrationFullCapacityTestWithCompression) { CompressedSecCacheTestWithCompressAndSplitParam,
IntegrationFullCapacityTest(true); ::testing::Combine(testing::Bool(), testing::Bool()));
}
TEST_F(CompressedSecondaryCacheTest, SplitValueIntoChunksTest) { TEST_F(CompressedSecondaryCacheTest, SplitValueIntoChunksTest) {
SplitValueIntoChunksTest(); SplitValueIntoChunksTest();

@ -182,6 +182,10 @@ struct CompressedSecondaryCacheOptions : LRUCacheOptions {
// header in varint32 format. // header in varint32 format.
uint32_t compress_format_version = 2; uint32_t compress_format_version = 2;
// Enable the custom split and merge feature, which split the compressed value
// into chunks so that they may better fit jemalloc bins.
bool enable_custom_split_merge = false;
CompressedSecondaryCacheOptions() {} CompressedSecondaryCacheOptions() {}
CompressedSecondaryCacheOptions( CompressedSecondaryCacheOptions(
size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit, size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit,
@ -191,13 +195,15 @@ struct CompressedSecondaryCacheOptions : LRUCacheOptions {
CacheMetadataChargePolicy _metadata_charge_policy = CacheMetadataChargePolicy _metadata_charge_policy =
kDefaultCacheMetadataChargePolicy, kDefaultCacheMetadataChargePolicy,
CompressionType _compression_type = CompressionType::kLZ4Compression, CompressionType _compression_type = CompressionType::kLZ4Compression,
uint32_t _compress_format_version = 2) uint32_t _compress_format_version = 2,
bool _enable_custom_split_merge = false)
: LRUCacheOptions(_capacity, _num_shard_bits, _strict_capacity_limit, : LRUCacheOptions(_capacity, _num_shard_bits, _strict_capacity_limit,
_high_pri_pool_ratio, std::move(_memory_allocator), _high_pri_pool_ratio, std::move(_memory_allocator),
_use_adaptive_mutex, _metadata_charge_policy, _use_adaptive_mutex, _metadata_charge_policy,
_low_pri_pool_ratio), _low_pri_pool_ratio),
compression_type(_compression_type), compression_type(_compression_type),
compress_format_version(_compress_format_version) {} compress_format_version(_compress_format_version),
enable_custom_split_merge(_enable_custom_split_merge) {}
}; };
// EXPERIMENTAL // EXPERIMENTAL
@ -211,7 +217,8 @@ extern std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
CacheMetadataChargePolicy metadata_charge_policy = CacheMetadataChargePolicy metadata_charge_policy =
kDefaultCacheMetadataChargePolicy, kDefaultCacheMetadataChargePolicy,
CompressionType compression_type = CompressionType::kLZ4Compression, CompressionType compression_type = CompressionType::kLZ4Compression,
uint32_t compress_format_version = 2); uint32_t compress_format_version = 2,
bool enable_custom_split_merge = false);
extern std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache( extern std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
const CompressedSecondaryCacheOptions& opts); const CompressedSecondaryCacheOptions& opts);

@ -178,7 +178,8 @@ default_params = {
"wal_compression": lambda: random.choice(["none", "zstd"]), "wal_compression": lambda: random.choice(["none", "zstd"]),
"verify_sst_unique_id_in_manifest": 1, # always do unique_id verification "verify_sst_unique_id_in_manifest": 1, # always do unique_id verification
"secondary_cache_uri": lambda: random.choice( "secondary_cache_uri": lambda: random.choice(
["", "compressed_secondary_cache://capacity=8388608"]), ["", "compressed_secondary_cache://capacity=8388608",
"compressed_secondary_cache://capacity=8388608;enable_custom_split_merge=true"]),
"allow_data_in_errors": True, "allow_data_in_errors": True,
"readahead_size": lambda: random.choice([0, 16384, 524288]), "readahead_size": lambda: random.choice([0, 16384, 524288]),
"initial_auto_readahead_size": lambda: random.choice([0, 16384, 524288]), "initial_auto_readahead_size": lambda: random.choice([0, 16384, 524288]),

Loading…
Cancel
Save