// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "cache/lru_secondary_cache.h" #include #include "memory/memory_allocator.h" #include "util/compression.h" #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { namespace { void DeletionCallback(const Slice& /*key*/, void* obj) { delete reinterpret_cast(obj); obj = nullptr; } } // namespace LRUSecondaryCache::LRUSecondaryCache( size_t capacity, int num_shard_bits, bool strict_capacity_limit, double high_pri_pool_ratio, std::shared_ptr memory_allocator, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, CompressionType compression_type, uint32_t compress_format_version) : cache_options_(capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, metadata_charge_policy, compression_type, compress_format_version) { cache_ = NewLRUCache(capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, metadata_charge_policy); } LRUSecondaryCache::~LRUSecondaryCache() { cache_.reset(); } std::unique_ptr LRUSecondaryCache::Lookup( const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/) { std::unique_ptr handle; Cache::Handle* lru_handle = cache_->Lookup(key); if (lru_handle == nullptr) { return handle; } CacheAllocationPtr* ptr = reinterpret_cast(cache_->Value(lru_handle)); void* value = nullptr; size_t charge = 0; Status s; if (cache_options_.compression_type == kNoCompression) { s = create_cb(ptr->get(), cache_->GetCharge(lru_handle), &value, &charge); } else { UncompressionContext uncompression_context(cache_options_.compression_type); UncompressionInfo uncompression_info(uncompression_context, UncompressionDict::GetEmptyDict(), cache_options_.compression_type); size_t uncompressed_size = 0; CacheAllocationPtr uncompressed; uncompressed = UncompressData( uncompression_info, (char*)ptr->get(), cache_->GetCharge(lru_handle), &uncompressed_size, cache_options_.compress_format_version, cache_options_.memory_allocator.get()); if (!uncompressed) { cache_->Release(lru_handle, true); return handle; } s = create_cb(uncompressed.get(), uncompressed_size, &value, &charge); } if (!s.ok()) { cache_->Release(lru_handle, true); return handle; } handle.reset(new LRUSecondaryCacheResultHandle(value, charge)); cache_->Release(lru_handle); return handle; } Status LRUSecondaryCache::Insert(const Slice& key, void* value, const Cache::CacheItemHelper* helper) { size_t size = (*helper->size_cb)(value); CacheAllocationPtr ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); Status s = (*helper->saveto_cb)(value, 0, size, ptr.get()); if (!s.ok()) { return s; } Slice val(ptr.get(), size); std::string compressed_val; if (cache_options_.compression_type != kNoCompression) { CompressionOptions compression_opts; CompressionContext compression_context(cache_options_.compression_type); uint64_t sample_for_compression = 0; CompressionInfo compression_info( compression_opts, compression_context, CompressionDict::GetEmptyDict(), cache_options_.compression_type, sample_for_compression); bool success = CompressData(val, compression_info, cache_options_.compress_format_version, &compressed_val); if (!success) { return Status::Corruption("Error compressing value."); } val = Slice(compressed_val); size = compressed_val.size(); ptr = AllocateBlock(size, cache_options_.memory_allocator.get()); memcpy(ptr.get(), compressed_val.data(), size); } CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); return cache_->Insert(key, buf, size, DeletionCallback); } void LRUSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } std::string LRUSecondaryCache::GetPrintableOptions() const { std::string ret; ret.reserve(20000); const int kBufferSize = 200; char buffer[kBufferSize]; ret.append(cache_->GetPrintableOptions()); snprintf(buffer, kBufferSize, " compression_type : %s\n", CompressionTypeToString(cache_options_.compression_type).c_str()); ret.append(buffer); snprintf(buffer, kBufferSize, " compression_type : %d\n", cache_options_.compress_format_version); ret.append(buffer); return ret; } std::shared_ptr NewLRUSecondaryCache( size_t capacity, int num_shard_bits, bool strict_capacity_limit, double high_pri_pool_ratio, std::shared_ptr memory_allocator, bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy, CompressionType compression_type, uint32_t compress_format_version) { return std::make_shared( capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio, memory_allocator, use_adaptive_mutex, metadata_charge_policy, compression_type, compress_format_version); } std::shared_ptr NewLRUSecondaryCache( const LRUSecondaryCacheOptions& opts) { // The secondary_cache is disabled for this LRUCache instance. assert(opts.secondary_cache == nullptr); return NewLRUSecondaryCache( opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit, opts.high_pri_pool_ratio, opts.memory_allocator, opts.use_adaptive_mutex, opts.metadata_charge_policy, opts.compression_type, opts.compress_format_version); } } // namespace ROCKSDB_NAMESPACE