// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "cache/compressed_secondary_cache.h" #include #include #include #include "memory/memory_allocator.h" #include "monitoring/perf_context_imp.h" #include "util/compression.h" #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { CompressedSecondaryCache::CompressedSecondaryCache( size_t capacity, int num_shard_bits, bool strict_capacity_limit, double high_pri_pool_ratio, double low_pri_pool_ratio, std::shared_ptr memory_allocator, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, CompressionType compression_type, uint32_t compress_format_version, bool enable_custom_split_merge) : cache_options_(capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator, use_adaptive_mutex, metadata_charge_policy, compression_type, compress_format_version, enable_custom_split_merge) { cache_ = NewLRUCache(capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, metadata_charge_policy, low_pri_pool_ratio); } CompressedSecondaryCache::~CompressedSecondaryCache() { cache_.reset(); } std::unique_ptr CompressedSecondaryCache::Lookup( const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/, bool advise_erase, bool& is_in_sec_cache) { std::unique_ptr handle; is_in_sec_cache = false; Cache::Handle* lru_handle = cache_->Lookup(key); if (lru_handle == nullptr) { return nullptr; } void* handle_value = cache_->Value(lru_handle); if (handle_value == nullptr) { cache_->Release(lru_handle, /*erase_if_last_ref=*/false); return nullptr; } CacheAllocationPtr* ptr{nullptr}; CacheAllocationPtr merged_value; size_t handle_value_charge{0}; if (cache_options_.enable_custom_split_merge) { CacheValueChunk* value_chunk_ptr = reinterpret_cast(handle_value); merged_value = MergeChunksIntoValue(value_chunk_ptr, handle_value_charge); ptr = &merged_value; } else { ptr = reinterpret_cast(handle_value); handle_value_charge = cache_->GetCharge(lru_handle); } Status s; void* value{nullptr}; size_t charge{0}; if (cache_options_.compression_type == kNoCompression) { s = create_cb(ptr->get(), handle_value_charge, &value, &charge); } else { UncompressionContext uncompression_context(cache_options_.compression_type); UncompressionInfo uncompression_info(uncompression_context, UncompressionDict::GetEmptyDict(), cache_options_.compression_type); size_t uncompressed_size{0}; CacheAllocationPtr uncompressed = UncompressData( uncompression_info, (char*)ptr->get(), handle_value_charge, &uncompressed_size, cache_options_.compress_format_version, cache_options_.memory_allocator.get()); if (!uncompressed) { cache_->Release(lru_handle, /*erase_if_last_ref=*/true); return nullptr; } s = create_cb(uncompressed.get(), uncompressed_size, &value, &charge); } if (!s.ok()) { cache_->Release(lru_handle, /*erase_if_last_ref=*/true); return nullptr; } if (advise_erase) { cache_->Release(lru_handle, /*erase_if_last_ref=*/true); // Insert a dummy handle. cache_ ->Insert(key, /*value=*/nullptr, /*charge=*/0, GetDeletionCallback(cache_options_.enable_custom_split_merge)) .PermitUncheckedError(); } else { is_in_sec_cache = true; cache_->Release(lru_handle, /*erase_if_last_ref=*/false); } handle.reset(new CompressedSecondaryCacheResultHandle(value, charge)); return handle; } Status CompressedSecondaryCache::Insert(const Slice& key, void* value, const Cache::CacheItemHelper* helper) { if (value == nullptr) { return Status::InvalidArgument(); } Cache::Handle* lru_handle = cache_->Lookup(key); Cache::DeleterFn del_cb = GetDeletionCallback(cache_options_.enable_custom_split_merge); if (lru_handle == nullptr) { PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1); // Insert a dummy handle if the handle is evicted for the first time. return cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, del_cb); } else { cache_->Release(lru_handle, /*erase_if_last_ref=*/false); } size_t size = (*helper->size_cb)(value); CacheAllocationPtr ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); Status s = (*helper->saveto_cb)(value, 0, size, ptr.get()); if (!s.ok()) { return s; } Slice val(ptr.get(), size); std::string compressed_val; if (cache_options_.compression_type != kNoCompression) { PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size); CompressionOptions compression_opts; CompressionContext compression_context(cache_options_.compression_type); uint64_t sample_for_compression{0}; CompressionInfo compression_info( compression_opts, compression_context, CompressionDict::GetEmptyDict(), cache_options_.compression_type, sample_for_compression); bool success = CompressData(val, compression_info, cache_options_.compress_format_version, &compressed_val); if (!success) { return Status::Corruption("Error compressing value."); } val = Slice(compressed_val); size = compressed_val.size(); PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, size); if (!cache_options_.enable_custom_split_merge) { ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); memcpy(ptr.get(), compressed_val.data(), size); } } PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1); if (cache_options_.enable_custom_split_merge) { size_t charge{0}; CacheValueChunk* value_chunks_head = SplitValueIntoChunks(val, cache_options_.compression_type, charge); return cache_->Insert(key, value_chunks_head, charge, del_cb); } else { CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); return cache_->Insert(key, buf, size, del_cb); } } void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } Status CompressedSecondaryCache::SetCapacity(size_t capacity) { MutexLock l(&capacity_mutex_); cache_options_.capacity = capacity; cache_->SetCapacity(capacity); return Status::OK(); } Status CompressedSecondaryCache::GetCapacity(size_t& capacity) { MutexLock l(&capacity_mutex_); capacity = cache_options_.capacity; return Status::OK(); } std::string CompressedSecondaryCache::GetPrintableOptions() const { std::string ret; ret.reserve(20000); const int kBufferSize{200}; char buffer[kBufferSize]; ret.append(cache_->GetPrintableOptions()); snprintf(buffer, kBufferSize, " compression_type : %s\n", CompressionTypeToString(cache_options_.compression_type).c_str()); ret.append(buffer); snprintf(buffer, kBufferSize, " compress_format_version : %d\n", cache_options_.compress_format_version); ret.append(buffer); return ret; } CompressedSecondaryCache::CacheValueChunk* CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value, CompressionType compression_type, size_t& charge) { assert(!value.empty()); const char* src_ptr = value.data(); size_t src_size{value.size()}; CacheValueChunk dummy_head = CacheValueChunk(); CacheValueChunk* current_chunk = &dummy_head; // Do not split when value size is large or there is no compression. size_t predicted_chunk_size{0}; size_t actual_chunk_size{0}; size_t tmp_size{0}; while (src_size > 0) { predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size; auto upper = std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(), predicted_chunk_size); // Do not split when value size is too small, too large, close to a bin // size, or there is no compression. if (upper == malloc_bin_sizes_.begin() || upper == malloc_bin_sizes_.end() || *upper - predicted_chunk_size < malloc_bin_sizes_.front() || compression_type == kNoCompression) { tmp_size = predicted_chunk_size; } else { tmp_size = *(--upper); } CacheValueChunk* new_chunk = reinterpret_cast(new char[tmp_size]); current_chunk->next = new_chunk; current_chunk = current_chunk->next; actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1; memcpy(current_chunk->data, src_ptr, actual_chunk_size); current_chunk->size = actual_chunk_size; src_ptr += actual_chunk_size; src_size -= actual_chunk_size; charge += tmp_size; } current_chunk->next = nullptr; return dummy_head.next; } CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue( const void* chunks_head, size_t& charge) { const CacheValueChunk* head = reinterpret_cast(chunks_head); const CacheValueChunk* current_chunk = head; charge = 0; while (current_chunk != nullptr) { charge += current_chunk->size; current_chunk = current_chunk->next; } CacheAllocationPtr ptr = AllocateBlock(charge, cache_options_.memory_allocator.get()); current_chunk = head; size_t pos{0}; while (current_chunk != nullptr) { memcpy(ptr.get() + pos, current_chunk->data, current_chunk->size); pos += current_chunk->size; current_chunk = current_chunk->next; } return ptr; } Cache::DeleterFn CompressedSecondaryCache::GetDeletionCallback( bool enable_custom_split_merge) { if (enable_custom_split_merge) { return [](const Slice& /*key*/, void* obj) { CacheValueChunk* chunks_head = reinterpret_cast(obj); while (chunks_head != nullptr) { CacheValueChunk* tmp_chunk = chunks_head; chunks_head = chunks_head->next; tmp_chunk->Free(); obj = nullptr; }; }; } else { return [](const Slice& /*key*/, void* obj) { delete reinterpret_cast(obj); obj = nullptr; }; } } std::shared_ptr NewCompressedSecondaryCache( size_t capacity, int num_shard_bits, bool strict_capacity_limit, double high_pri_pool_ratio, double low_pri_pool_ratio, std::shared_ptr memory_allocator, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, CompressionType compression_type, uint32_t compress_format_version, bool enable_custom_split_merge) { return std::make_shared( capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator, use_adaptive_mutex, metadata_charge_policy, compression_type, compress_format_version, enable_custom_split_merge); } std::shared_ptr NewCompressedSecondaryCache( const CompressedSecondaryCacheOptions& opts) { // The secondary_cache is disabled for this LRUCache instance. assert(opts.secondary_cache == nullptr); return NewCompressedSecondaryCache( opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit, opts.high_pri_pool_ratio, opts.low_pri_pool_ratio, opts.memory_allocator, opts.use_adaptive_mutex, opts.metadata_charge_policy, opts.compression_type, opts.compress_format_version, opts.enable_custom_split_merge); } } // namespace ROCKSDB_NAMESPACE