From 1cf5deb8fdecb7f63ce5ce1a0e942222a95f881e Mon Sep 17 00:00:00 2001
From: Igor Canadi
Date: Tue, 2 Oct 2018 17:21:54 -0700
Subject: [PATCH] Introduce CacheAllocator, a custom allocator for cache
 blocks (#4437)

Summary:
This is a conceptually simple change, but it touches many files to pass the
allocator through function calls.

We introduce CacheAllocator, which can be used by clients to configure a
custom allocator for cache blocks. Our motivation is to hook this up with
folly's `JemallocNodumpAllocator`
(https://github.com/facebook/folly/blob/f43ce6d6866b7b994b3019df561109afae050ebc/folly/experimental/JemallocNodumpAllocator.h),
but there are many other possible use cases.

Additionally, this commit cleans up memory allocation in
`util/compression.h`, making sure that all allocations are wrapped in a
unique_ptr as soon as possible.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/4437

Differential Revision: D10132814

Pulled By: yiwu-arbug

fbshipit-source-id: be1343a4b69f6048df127939fea9bbc96969f564
---
 HISTORY.md                         |  1 +
 cache/lru_cache.cc                 | 19 ++++---
 cache/lru_cache.h                  |  3 +-
 cache/sharded_cache.cc             |  9 +++-
 cache/sharded_cache.h              |  3 +-
 include/rocksdb/cache.h            | 28 +++++++---
 include/rocksdb/cache_allocator.h  | 29 +++++++++++
 table/block_based_table_builder.cc |  4 +-
 table/block_based_table_reader.cc  | 69 +++++++++++++++---------
 table/block_based_table_reader.h   |  5 +-
 table/block_fetcher.cc             | 17 +++---
 table/block_fetcher.h              |  9 +++-
 table/format.cc                    | 41 +++++++++------
 table/format.h                     | 21 ++++++--
 table/plain_table_reader.h         |  4 +-
 table/table_test.cc                | 72 +++++++++++++++++++++++++
 tools/db_bench_tool.cc             | 23 ++++----
 util/cache_allocator.h             | 38 ++++++++++++++
 util/compression.h                 | 84 +++++++++++++++---------------
 19 files changed, 350 insertions(+), 129 deletions(-)
 create mode 100644 include/rocksdb/cache_allocator.h
 create mode 100644 util/cache_allocator.h

diff --git a/HISTORY.md b/HISTORY.md
index b4baa7162..d0503aced 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -7,6 +7,7 @@
 
 ### New Features
 * TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. Could be used for optimizing certain transactions or during recovery.
+* Introduced CacheAllocator, which lets the user specify a custom allocator for memory in block cache.
 
 ### Bug Fixes
 * Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.
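[Editor's note] Before the diff itself, here is a minimal sketch of how a
client might implement the new interface and wire it into the block cache.
It is not part of the patch; it is modeled on the CustomCacheAllocator test
added to table/table_test.cc below, and the class name
CountingCacheAllocator is illustrative only.

#include <atomic>
#include <cstddef>
#include <memory>

#include "rocksdb/cache.h"

// Illustrative allocator that routes cache blocks through operator new[]
// while counting allocations; a jemalloc-backed allocator such as folly's
// JemallocNodumpAllocator could be wrapped the same way.
class CountingCacheAllocator : public CacheAllocator {
 public:
  const char* Name() const override { return "CountingCacheAllocator"; }

  void* Allocate(size_t size) override {
    ++num_allocations_;
    return new char[size];
  }

  void Deallocate(void* p) override {
    ++num_deallocations_;
    delete[] static_cast<char*>(p);
  }

  std::atomic<size_t> num_allocations_{0};
  std::atomic<size_t> num_deallocations_{0};
};

// Wiring: every block inserted into this LRU cache is now allocated and
// freed through the custom allocator.
std::shared_ptr<rocksdb::Cache> MakeCacheWithCustomAllocator() {
  rocksdb::LRUCacheOptions cache_opts;
  cache_opts.capacity = 16 * 1024 * 1024;  // 16 MB
  cache_opts.num_shard_bits = 4;
  cache_opts.cache_allocator = std::make_shared<CountingCacheAllocator>();
  return rocksdb::NewLRUCache(cache_opts);
}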
diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc
index d4cbb9a45..9f3acd16a 100644
--- a/cache/lru_cache.cc
+++ b/cache/lru_cache.cc
@@ -461,8 +461,10 @@ std::string LRUCacheShard::GetPrintableOptions() const {
 }
 
 LRUCache::LRUCache(size_t capacity, int num_shard_bits,
-                   bool strict_capacity_limit, double high_pri_pool_ratio)
-    : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
+                   bool strict_capacity_limit, double high_pri_pool_ratio,
+                   std::shared_ptr<CacheAllocator> allocator)
+    : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
+                   std::move(allocator)) {
   num_shards_ = 1 << num_shard_bits;
   shards_ = reinterpret_cast<LRUCacheShard*>(
       port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
@@ -537,12 +539,14 @@ double LRUCache::GetHighPriPoolRatio() {
 std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
   return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
                      cache_opts.strict_capacity_limit,
-                     cache_opts.high_pri_pool_ratio);
+                     cache_opts.high_pri_pool_ratio,
+                     cache_opts.cache_allocator);
 }
 
-std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
-                                   bool strict_capacity_limit,
-                                   double high_pri_pool_ratio) {
+std::shared_ptr<Cache> NewLRUCache(
+    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+    double high_pri_pool_ratio,
+    std::shared_ptr<CacheAllocator> cache_allocator) {
   if (num_shard_bits >= 20) {
     return nullptr;  // the cache cannot be sharded into too many fine pieces
   }
@@ -554,7 +558,8 @@ std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
     num_shard_bits = GetDefaultCacheShardBits(capacity);
   }
   return std::make_shared<LRUCache>(capacity, num_shard_bits,
-                                    strict_capacity_limit, high_pri_pool_ratio);
+                                    strict_capacity_limit, high_pri_pool_ratio,
+                                    std::move(cache_allocator));
 }
 
 }  // namespace rocksdb
diff --git a/cache/lru_cache.h b/cache/lru_cache.h
index 3c067f0c1..0b925a166 100644
--- a/cache/lru_cache.h
+++ b/cache/lru_cache.h
@@ -279,7 +279,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard {
 class LRUCache : public ShardedCache {
  public:
   LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
-           double high_pri_pool_ratio);
+           double high_pri_pool_ratio,
+           std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
   virtual ~LRUCache();
   virtual const char* Name() const override { return "LRUCache"; }
   virtual CacheShard* GetShard(int shard) override;
diff --git a/cache/sharded_cache.cc b/cache/sharded_cache.cc
index 6a0a22282..3fef82a79 100644
--- a/cache/sharded_cache.cc
+++ b/cache/sharded_cache.cc
@@ -20,8 +20,10 @@
 namespace rocksdb {
 
 ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
-                           bool strict_capacity_limit)
-    : num_shard_bits_(num_shard_bits),
+                           bool strict_capacity_limit,
+                           std::shared_ptr<CacheAllocator> allocator)
+    : Cache(std::move(allocator)),
+      num_shard_bits_(num_shard_bits),
       capacity_(capacity),
       strict_capacity_limit_(strict_capacity_limit),
       last_id_(1) {}
@@ -142,6 +144,9 @@ std::string ShardedCache::GetPrintableOptions() const {
              strict_capacity_limit_);
     ret.append(buffer);
   }
+  snprintf(buffer, kBufferSize, "    cache_allocator : %s\n",
+           cache_allocator() ? cache_allocator()->Name() : "None");
+  ret.append(buffer);
   ret.append(GetShard(0)->GetPrintableOptions());
   return ret;
 }
diff --git a/cache/sharded_cache.h b/cache/sharded_cache.h
index 4f9dea2ad..9876a882b 100644
--- a/cache/sharded_cache.h
+++ b/cache/sharded_cache.h
@@ -47,7 +47,8 @@ class CacheShard {
 // Keys are sharded by the highest num_shard_bits bits of hash value.
 class ShardedCache : public Cache {
  public:
-  ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit);
+  ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+               std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
   virtual ~ShardedCache() = default;
   virtual const char* Name() const override = 0;
   virtual CacheShard* GetShard(int shard) = 0;
diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h
index da3b934d8..d4d2b6510 100644
--- a/include/rocksdb/cache.h
+++ b/include/rocksdb/cache.h
@@ -25,6 +25,7 @@
 #include <stdint.h>
 #include <memory>
 #include <string>
+#include "rocksdb/cache_allocator.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/status.h"
@@ -58,13 +59,20 @@ struct LRUCacheOptions {
   // BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority.
   double high_pri_pool_ratio = 0.0;
 
+  // If non-nullptr, this allocator will be used instead of the system
+  // allocator when allocating memory for cache blocks. Set this option
+  // before you start using the cache!
+  std::shared_ptr<CacheAllocator> cache_allocator;
+
   LRUCacheOptions() {}
   LRUCacheOptions(size_t _capacity, int _num_shard_bits,
-                  bool _strict_capacity_limit, double _high_pri_pool_ratio)
+                  bool _strict_capacity_limit, double _high_pri_pool_ratio,
+                  std::shared_ptr<CacheAllocator> _cache_allocator = nullptr)
       : capacity(_capacity),
         num_shard_bits(_num_shard_bits),
         strict_capacity_limit(_strict_capacity_limit),
-        high_pri_pool_ratio(_high_pri_pool_ratio) {}
+        high_pri_pool_ratio(_high_pri_pool_ratio),
+        cache_allocator(std::move(_cache_allocator)) {}
 };
 
 // Create a new cache with a fixed size capacity. The cache is sharded
@@ -75,10 +83,10 @@ struct LRUCacheOptions {
 // high_pri_pool_pct.
 // num_shard_bits = -1 means it is automatically determined: every shard
 // will be at least 512KB and number of shard bits will not exceed 6.
-extern std::shared_ptr<Cache> NewLRUCache(size_t capacity,
-                                          int num_shard_bits = -1,
-                                          bool strict_capacity_limit = false,
-                                          double high_pri_pool_ratio = 0.0);
+extern std::shared_ptr<Cache> NewLRUCache(
+    size_t capacity, int num_shard_bits = -1,
+    bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0,
+    std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
 
 extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
 
@@ -91,13 +99,15 @@ extern std::shared_ptr<Cache> NewClockCache(size_t capacity,
                                             int num_shard_bits = -1,
                                             bool strict_capacity_limit = false);
 
+
 class Cache {
  public:
   // Depending on implementation, cache entries with high priority could be
   // less likely to get evicted than low priority entries.
   enum class Priority { HIGH, LOW };
 
-  Cache() {}
+  Cache(std::shared_ptr<CacheAllocator> allocator = nullptr)
+      : cache_allocator_(std::move(allocator)) {}
 
   // Destroys all existing entries by calling the "deleter"
   // function that was passed via the Insert() function.
@@ -228,10 +238,14 @@ class Cache {
   virtual void TEST_mark_as_data_block(const Slice& /*key*/,
                                        size_t /*charge*/) {}
 
+  CacheAllocator* cache_allocator() const { return cache_allocator_.get(); }
+
  private:
   // No copying allowed
   Cache(const Cache&);
   Cache& operator=(const Cache&);
+
+  std::shared_ptr<CacheAllocator> cache_allocator_;
 };
 
 }  // namespace rocksdb
diff --git a/include/rocksdb/cache_allocator.h b/include/rocksdb/cache_allocator.h
new file mode 100644
index 000000000..5bbec0e8a
--- /dev/null
+++ b/include/rocksdb/cache_allocator.h
@@ -0,0 +1,29 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+// CacheAllocator is an interface that a client can implement to supply custom
+// cache allocation and deallocation methods. See rocksdb/cache.h for more
+// information.
+// All methods should be thread-safe.
+class CacheAllocator {
+ public:
+  virtual ~CacheAllocator() = default;
+
+  // Name of the cache allocator, printed in the log
+  virtual const char* Name() const = 0;
+
+  // Allocate a block of at least `size` bytes
+  virtual void* Allocate(size_t size) = 0;
+  // Deallocate a previously allocated block
+  virtual void Deallocate(void* p) = 0;
+  // Returns the memory size of the block allocated at p. The default
+  // implementation simply returns the original allocation_size.
+  virtual size_t UsableSize(void* /*p*/, size_t allocation_size) const {
+    // default implementation just returns the allocation size
+    return allocation_size;
+  }
+};
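[Editor's note] The UsableSize() hook above exists so that
BlockContents::usable_size() (see the table/format.h hunk later in this
patch) can charge the cache for the memory the allocator actually reserved,
not just the requested size. A hedged sketch, assuming a glibc-style
malloc_usable_size() is available; MallocCacheAllocator is an illustrative
name, not part of the patch:

#include <cstdlib>
#include <malloc.h>  // malloc_usable_size(); glibc-specific assumption

#include "rocksdb/cache_allocator.h"

// Sketch: report the true usable size of each block so cache accounting
// reflects allocator padding and size-class rounding.
class MallocCacheAllocator : public CacheAllocator {
 public:
  const char* Name() const override { return "MallocCacheAllocator"; }

  void* Allocate(size_t size) override { return std::malloc(size); }

  void Deallocate(void* p) override { std::free(p); }

  size_t UsableSize(void* p, size_t allocation_size) const override {
    size_t usable = malloc_usable_size(p);
    // Fall back to the requested size if the backend reports nothing.
    return usable != 0 ? usable : allocation_size;
  }
};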
diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc
index 59c385d65..26c14d21d 100644
--- a/table/block_based_table_builder.cc
+++ b/table/block_based_table_builder.cc
@@ -39,6 +39,7 @@
 #include "table/full_filter_block.h"
 #include "table/table_builder.h"
 
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -654,7 +655,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
 
     size_t size = block_contents.size();
 
-    std::unique_ptr<char[]> ubuf(new char[size + 1]);
+    auto ubuf =
+        AllocateBlock(size + 1, block_cache_compressed->cache_allocator());
     memcpy(ubuf.get(), block_contents.data(), size);
     ubuf[size] = type;
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index 9f2e02d68..7e7cec1b1 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -80,11 +80,12 @@ Status ReadBlockFromFile(
     std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
     bool do_uncompress, const Slice& compression_dict,
     const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
-    size_t read_amp_bytes_per_bit, const bool immortal_file = false) {
+    size_t read_amp_bytes_per_bit, CacheAllocator* allocator = nullptr,
+    const bool immortal_file = false) {
   BlockContents contents;
-  BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
-                             &contents, ioptions, do_uncompress,
-                             compression_dict, cache_options, immortal_file);
+  BlockFetcher block_fetcher(
+      file, prefetch_buffer, footer, options, handle, &contents, ioptions,
+      do_uncompress, compression_dict, cache_options, allocator, immortal_file);
   Status s = block_fetcher.ReadBlockContents();
   if (s.ok()) {
     result->reset(new Block(std::move(contents), global_seqno,
@@ -94,6 +95,13 @@ Status ReadBlockFromFile(
   return s;
 }
 
+inline CacheAllocator* GetCacheAllocator(
+    const BlockBasedTableOptions& table_options) {
+  return table_options.block_cache.get()
+             ? table_options.block_cache->cache_allocator()
+             : nullptr;
+}
+
 // Delete the resource that is held by the iterator.
 template <class ResourceType>
 void DeleteHeldResource(void* arg, void* /*ignored*/) {
@@ -1150,7 +1158,8 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
       rep->footer.metaindex_handle(), &meta, rep->ioptions,
       true /* decompress */, Slice() /*compression dict*/,
       rep->persistent_cache_options, kDisableGlobalSequenceNumber,
-      0 /* read_amp_bytes_per_bit */);
+      0 /* read_amp_bytes_per_bit */,
+      GetCacheAllocator(rep->table_options));
 
   if (!s.ok()) {
     ROCKS_LOG_ERROR(rep->ioptions.info_log,
@@ -1173,7 +1182,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
     const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
     BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
     const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
-    GetContext* get_context) {
+    GetContext* get_context, CacheAllocator* allocator) {
   Status s;
   Block* compressed_block = nullptr;
   Cache::Handle* block_cache_compressed_handle = nullptr;
@@ -1230,7 +1239,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
                                          compression_dict);
     s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
                                 compressed_block->size(), &contents,
-                                format_version, ioptions);
+                                format_version, ioptions, allocator);
 
     // Insert uncompressed block into block cache
     if (s.ok()) {
@@ -1292,7 +1301,8 @@ Status BlockBasedTable::PutDataBlockToCache(
     const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
     CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
     const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
-    Cache::Priority priority, GetContext* get_context) {
+    Cache::Priority priority, GetContext* get_context,
+    CacheAllocator* allocator) {
   assert(raw_block->compression_type() == kNoCompression ||
          block_cache_compressed != nullptr);
 
@@ -1305,7 +1315,7 @@ Status BlockBasedTable::PutDataBlockToCache(
                                        compression_dict);
     s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
                                 raw_block->size(), &contents, format_version,
-                                ioptions);
+                                ioptions, allocator);
   }
   if (!s.ok()) {
     delete raw_block;
@@ -1402,7 +1412,8 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
   BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer,
                              ReadOptions(), filter_handle, &block,
                              rep->ioptions, false /* decompress */,
-                             dummy_comp_dict, rep->persistent_cache_options);
+                             dummy_comp_dict, rep->persistent_cache_options,
+                             GetCacheAllocator(rep->table_options));
   Status s = block_fetcher.ReadBlockContents();
 
   if (!s.ok()) {
@@ -1700,7 +1711,9 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
         &block_value, rep->ioptions, rep->blocks_maybe_compressed,
         compression_dict, rep->persistent_cache_options,
         is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
-        rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
+        rep->table_options.read_amp_bytes_per_bit,
+        GetCacheAllocator(rep->table_options),
+        rep->immortal_table);
   }
   if (s.ok()) {
     block.value = block_value.release();
@@ -1792,7 +1805,8 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
     s = GetDataBlockFromCache(
         key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
         block_entry, rep->table_options.format_version, compression_dict,
-        rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
+        rep->table_options.read_amp_bytes_per_bit, is_index, get_context,
+        GetCacheAllocator(rep->table_options));
 
     if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
       std::unique_ptr<Block> raw_block;
@@ -1804,7 +1818,9 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
             block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
             compression_dict, rep->persistent_cache_options,
             is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
-            rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
+            rep->table_options.read_amp_bytes_per_bit,
+            GetCacheAllocator(rep->table_options),
+            rep->immortal_table);
       }
 
       if (s.ok()) {
@@ -1817,7 +1833,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
                     .cache_index_and_filter_blocks_with_high_priority
                 ? Cache::Priority::HIGH
                 : Cache::Priority::LOW,
-            get_context);
+            get_context, GetCacheAllocator(rep->table_options));
       }
     }
   }
@@ -2524,11 +2540,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
     BlockHandle handle = index_iter->value();
     BlockContents contents;
     Slice dummy_comp_dict;
-    BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
-                               rep_->footer, ReadOptions(), handle, &contents,
-                               rep_->ioptions, false /* decompress */,
-                               dummy_comp_dict /*compression dict*/,
-                               rep_->persistent_cache_options);
+    BlockFetcher block_fetcher(
+        rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
+        ReadOptions(), handle, &contents, rep_->ioptions,
+        false /* decompress */, dummy_comp_dict /*compression dict*/,
+        rep_->persistent_cache_options,
+        GetCacheAllocator(rep_->table_options));
     s = block_fetcher.ReadBlockContents();
     if (!s.ok()) {
       break;
@@ -2550,11 +2567,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
     s = handle.DecodeFrom(&input);
     BlockContents contents;
     Slice dummy_comp_dict;
-    BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
-                               rep_->footer, ReadOptions(), handle, &contents,
-                               rep_->ioptions, false /* decompress */,
-                               dummy_comp_dict /*compression dict*/,
-                               rep_->persistent_cache_options);
+    BlockFetcher block_fetcher(
+        rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
+        ReadOptions(), handle, &contents, rep_->ioptions,
+        false /* decompress */, dummy_comp_dict /*compression dict*/,
+        rep_->persistent_cache_options,
+        GetCacheAllocator(rep_->table_options));
     s = block_fetcher.ReadBlockContents();
     if (!s.ok()) {
       break;
@@ -2858,7 +2876,8 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
             rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
             ReadOptions(), handle, &block, rep_->ioptions,
             false /*decompress*/, dummy_comp_dict /*compression dict*/,
-            rep_->persistent_cache_options);
+            rep_->persistent_cache_options,
+            GetCacheAllocator(rep_->table_options));
         s = block_fetcher.ReadBlockContents();
         if (!s.ok()) {
           rep_->filter.reset(new BlockBasedFilterBlockReader(
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index 3cada0c2c..59a0f36b5 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -303,7 +303,8 @@ class BlockBasedTable : public TableReader {
       const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
       BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
       const Slice& compression_dict, size_t read_amp_bytes_per_bit,
-      bool is_index = false, GetContext* get_context = nullptr);
+      bool is_index = false, GetContext* get_context = nullptr,
+      CacheAllocator* allocator = nullptr);
 
   // Put a raw block (maybe compressed) to the corresponding block caches.
   // This method will perform decompression against raw_block if needed and then
@@ -322,7 +323,7 @@ class BlockBasedTable : public TableReader {
       CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
       const Slice& compression_dict, size_t read_amp_bytes_per_bit,
       bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
-      GetContext* get_context = nullptr);
+      GetContext* get_context = nullptr, CacheAllocator* allocator = nullptr);
 
   // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
   // after a call to Seek(key), until handle_result returns false.
diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc
index ea97066ec..489705758 100644
--- a/table/block_fetcher.cc
+++ b/table/block_fetcher.cc
@@ -17,8 +17,9 @@
 #include "rocksdb/env.h"
 #include "table/block.h"
 #include "table/block_based_table_reader.h"
-#include "table/persistent_cache_helper.h"
 #include "table/format.h"
+#include "table/persistent_cache_helper.h"
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -107,9 +108,11 @@ bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
   if (cache_options_.persistent_cache &&
      cache_options_.persistent_cache->IsCompressed()) {
     // lookup uncompressed cache mode p-cache
+    std::unique_ptr<char[]> raw_data;
     status_ = PersistentCacheHelper::LookupRawPage(
-        cache_options_, handle_, &heap_buf_, block_size_ + kBlockTrailerSize);
+        cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
     if (status_.ok()) {
+      heap_buf_ = CacheAllocationPtr(raw_data.release());
       used_buf_ = heap_buf_.get();
       slice_ = Slice(heap_buf_.get(), block_size_);
       return true;
@@ -132,7 +135,7 @@ void BlockFetcher::PrepareBufferForBlockFromFile() {
     // trivially allocated stack buffer instead of needing a full malloc()
     used_buf_ = &stack_buf_[0];
   } else {
-    heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
+    heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
     used_buf_ = heap_buf_.get();
   }
 }
@@ -170,7 +173,7 @@ void BlockFetcher::GetBlockContents() {
      // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
      if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
        assert(used_buf_ != heap_buf_.get());
-        heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
+        heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
        memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
      }
      *contents_ = BlockContents(std::move(heap_buf_), block_size_, true,
@@ -228,9 +231,9 @@ Status BlockFetcher::ReadBlockContents() {
   if (do_uncompress_ && compression_type != kNoCompression) {
     // compressed page, uncompress, update cache
     UncompressionContext uncompression_ctx(compression_type, compression_dict_);
-    status_ =
-        UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
-                                contents_, footer_.version(), ioptions_);
+    status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
+                                      block_size_, contents_, footer_.version(),
+                                      ioptions_, allocator_);
   } else {
     GetBlockContents();
   }
diff --git a/table/block_fetcher.h b/table/block_fetcher.h
index 9e0d2448d..a8d9d6572 100644
--- a/table/block_fetcher.h
+++ b/table/block_fetcher.h
@@ -11,6 +11,8 @@
 #include "table/block.h"
 #include "table/format.h"
 
+#include "util/cache_allocator.h"
+
 namespace rocksdb {
 class BlockFetcher {
  public:
@@ -26,6 +28,7 @@ class BlockFetcher {
                BlockContents* contents, const ImmutableCFOptions& ioptions,
                bool do_uncompress, const Slice& compression_dict,
                const PersistentCacheOptions& cache_options,
+               CacheAllocator* allocator = nullptr,
                const bool immortal_source = false)
       : file_(file),
         prefetch_buffer_(prefetch_buffer),
@@ -37,7 +40,8 @@ class BlockFetcher {
         do_uncompress_(do_uncompress),
         immortal_source_(immortal_source),
         compression_dict_(compression_dict),
-        cache_options_(cache_options) {}
+        cache_options_(cache_options),
+        allocator_(allocator) {}
 
   Status ReadBlockContents();
 
  private:
@@ -54,11 +58,12 @@ class BlockFetcher {
   const bool immortal_source_;
   const Slice& compression_dict_;
   const PersistentCacheOptions& cache_options_;
+  CacheAllocator* allocator_;
   Status status_;
   Slice slice_;
   char* used_buf_ = nullptr;
   size_t block_size_;
-  std::unique_ptr<char[]> heap_buf_;
+  CacheAllocationPtr heap_buf_;
   char stack_buf_[kDefaultStackBufferSize];
   bool got_from_prefetch_buffer_ = false;
   rocksdb::CompressionType compression_type;
diff --git a/table/format.cc b/table/format.cc
index 16d959c3d..f565fb14a 100644
--- a/table/format.cc
+++ b/table/format.cc
@@ -19,6 +19,7 @@
 #include "table/block_based_table_reader.h"
 #include "table/block_fetcher.h"
 #include "table/persistent_cache_helper.h"
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -279,8 +280,9 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
 Status UncompressBlockContentsForCompressionType(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t format_version,
-    const ImmutableCFOptions& ioptions) {
-  std::unique_ptr<char[]> ubuf;
+    const ImmutableCFOptions& ioptions,
+    CacheAllocator* allocator) {
+  CacheAllocationPtr ubuf;
 
   assert(uncompression_ctx.type() != kNoCompression &&
          "Invalid compression type");
@@ -296,7 +298,7 @@ Status UncompressBlockContentsForCompressionType(
       if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
         return Status::Corruption(snappy_corrupt_msg);
       }
-      ubuf.reset(new char[ulength]);
+      ubuf = AllocateBlock(ulength, allocator);
       if (!Snappy_Uncompress(data, n, ubuf.get())) {
         return Status::Corruption(snappy_corrupt_msg);
       }
@@ -304,9 +306,10 @@ Status UncompressBlockContentsForCompressionType(
       break;
     }
     case kZlibCompression:
-      ubuf.reset(Zlib_Uncompress(
+      ubuf = Zlib_Uncompress(
           uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kZlibCompression, format_version)));
+          GetCompressFormatForVersion(kZlibCompression, format_version),
+          allocator);
       if (!ubuf) {
         static char zlib_corrupt_msg[] =
             "Zlib not supported or corrupted Zlib compressed block contents";
@@ -316,9 +319,10 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kBZip2Compression:
-      ubuf.reset(BZip2_Uncompress(
+      ubuf = BZip2_Uncompress(
           data, n, &decompress_size,
-          GetCompressFormatForVersion(kBZip2Compression, format_version)));
+          GetCompressFormatForVersion(kBZip2Compression, format_version),
+          allocator);
       if (!ubuf) {
         static char bzip2_corrupt_msg[] =
             "Bzip2 not supported or corrupted Bzip2 compressed block contents";
@@ -328,9 +332,10 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kLZ4Compression:
-      ubuf.reset(LZ4_Uncompress(
+      ubuf = LZ4_Uncompress(
           uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kLZ4Compression, format_version)));
+          GetCompressFormatForVersion(kLZ4Compression, format_version),
+          allocator);
       if (!ubuf) {
         static char lz4_corrupt_msg[] =
             "LZ4 not supported or corrupted LZ4 compressed block contents";
@@ -340,9 +345,10 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kLZ4HCCompression:
-      ubuf.reset(LZ4_Uncompress(
+      ubuf = LZ4_Uncompress(
           uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
+          GetCompressFormatForVersion(kLZ4HCCompression, format_version),
+          allocator);
       if (!ubuf) {
         static char lz4hc_corrupt_msg[] =
             "LZ4HC not supported or corrupted LZ4HC compressed block contents";
@@ -352,6 +358,8 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kXpressCompression:
+      // XPRESS allocates memory internally, thus no support for custom
+      // allocator.
       ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
       if (!ubuf) {
         static char xpress_corrupt_msg[] =
@@ -363,7 +371,8 @@ Status UncompressBlockContentsForCompressionType(
       break;
     case kZSTD:
     case kZSTDNotFinalCompression:
-      ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
+      ubuf = ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size,
+                             allocator);
       if (!ubuf) {
         static char zstd_corrupt_msg[] =
             "ZSTD not supported or corrupted ZSTD compressed block contents";
@@ -396,11 +405,13 @@ Status UncompressBlockContentsForCompressionType(
 Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
                                const char* data, size_t n,
                                BlockContents* contents, uint32_t format_version,
-                               const ImmutableCFOptions& ioptions) {
+                               const ImmutableCFOptions& ioptions,
+                               CacheAllocator* allocator) {
   assert(data[n] != kNoCompression);
   assert(data[n] == uncompression_ctx.type());
-  return UncompressBlockContentsForCompressionType(
-      uncompression_ctx, data, n, contents, format_version, ioptions);
+  return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n,
+                                                   contents, format_version,
+                                                   ioptions, allocator);
 }
 
 }  // namespace rocksdb
diff --git a/table/format.h b/table/format.h
index 6e0e99c1c..441d7107e 100644
--- a/table/format.h
+++ b/table/format.h
@@ -26,6 +26,7 @@
 #include "port/port.h"  // noexcept
 #include "table/persistent_cache_options.h"
 #include "util/file_reader_writer.h"
+#include "util/cache_allocator.h"
 
 namespace rocksdb {
 
@@ -192,7 +193,7 @@ struct BlockContents {
   Slice data;            // Actual contents of data
   bool cachable;         // True iff data can be cached
   CompressionType compression_type;
-  std::unique_ptr<char[]> allocation;
+  CacheAllocationPtr allocation;
 
   BlockContents() : cachable(false), compression_type(kNoCompression) {}
 
@@ -200,16 +201,28 @@ struct BlockContents {
                 CompressionType _compression_type)
       : data(_data), cachable(_cachable), compression_type(_compression_type) {}
 
-  BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
+  BlockContents(CacheAllocationPtr&& _data, size_t _size, bool _cachable,
                 CompressionType _compression_type)
       : data(_data.get(), _size),
         cachable(_cachable),
         compression_type(_compression_type),
         allocation(std::move(_data)) {}
 
+  BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
+                CompressionType _compression_type)
+      : data(_data.get(), _size),
+        cachable(_cachable),
+        compression_type(_compression_type) {
+    allocation.reset(_data.release());
+  }
+
   // The additional memory space taken by the block data.
   size_t usable_size() const {
     if (allocation.get() != nullptr) {
+      auto allocator = allocation.get_deleter().allocator;
+      if (allocator) {
+        return allocator->UsableSize(allocation.get(), data.size());
+      }
 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
       return malloc_usable_size(allocation.get());
 #else
@@ -252,7 +265,7 @@ extern Status ReadBlockContents(
 extern Status UncompressBlockContents(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t compress_format_version,
-    const ImmutableCFOptions& ioptions);
+    const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
 
 // This is an extension to UncompressBlockContents that accepts
 // a specific compression type. This is used by un-wrapped blocks
@@ -260,7 +273,7 @@ extern Status UncompressBlockContents(
 extern Status UncompressBlockContentsForCompressionType(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t compress_format_version,
-    const ImmutableCFOptions& ioptions);
+    const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
 
 // Implementation details follow.  Clients should ignore,
diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h
index df08a98fa..d7a8ed4aa 100644
--- a/table/plain_table_reader.h
+++ b/table/plain_table_reader.h
@@ -153,8 +153,8 @@ class PlainTableReader: public TableReader {
   DynamicBloom bloom_;
   PlainTableReaderFileInfo file_info_;
   Arena arena_;
-  std::unique_ptr<char[]> index_block_alloc_;
-  std::unique_ptr<char[]> bloom_block_alloc_;
+  CacheAllocationPtr index_block_alloc_;
+  CacheAllocationPtr bloom_block_alloc_;
 
   const ImmutableCFOptions& ioptions_;
   uint64_t file_size_;
diff --git a/table/table_test.cc b/table/table_test.cc
index 26383fa81..a525c5866 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -2477,6 +2477,78 @@ TEST_P(BlockBasedTableTest, BlockCacheLeak) {
   c.ResetTableReader();
 }
 
+namespace {
+class CustomCacheAllocator : public CacheAllocator {
+ public:
+  virtual const char* Name() const override { return "CustomCacheAllocator"; }
+
+  void* Allocate(size_t size) override {
+    ++numAllocations;
+    auto ptr = new char[size + 16];
+    memcpy(ptr, "cache_allocator_", 16);  // mangle first 16 bytes
+    return reinterpret_cast<void*>(ptr + 16);
+  }
+  void Deallocate(void* p) override {
+    ++numDeallocations;
+    char* ptr = reinterpret_cast<char*>(p) - 16;
+    delete[] ptr;
+  }
+
+  std::atomic<int> numAllocations{0};
+  std::atomic<int> numDeallocations{0};
+};
+}  // namespace
+
+TEST_P(BlockBasedTableTest, CacheAllocator) {
+  auto custom_cache_allocator = std::make_shared<CustomCacheAllocator>();
+  {
+    Options opt;
+    unique_ptr<InternalKeyComparator> ikc;
+    ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
+    opt.compression = kNoCompression;
+    BlockBasedTableOptions table_options;
+    table_options.block_size = 1024;
+    LRUCacheOptions lruOptions;
+    lruOptions.cache_allocator = custom_cache_allocator;
+    lruOptions.capacity = 16 * 1024 * 1024;
+    lruOptions.num_shard_bits = 4;
+    table_options.block_cache = NewLRUCache(std::move(lruOptions));
+    opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+    TableConstructor c(BytewiseComparator(),
+                       true /* convert_to_internal_key_ */);
+    c.Add("k01", "hello");
+    c.Add("k02", "hello2");
+    c.Add("k03", std::string(10000, 'x'));
+    c.Add("k04", std::string(200000, 'x'));
+    c.Add("k05", std::string(300000, 'x'));
+    c.Add("k06", "hello3");
+    c.Add("k07", std::string(100000, 'x'));
+    std::vector<std::string> keys;
+    stl_wrappers::KVMap kvmap;
+    const ImmutableCFOptions ioptions(opt);
+    const MutableCFOptions moptions(opt);
+    c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys, &kvmap);
+
+    unique_ptr<InternalIterator> iter(
+        c.NewIterator(moptions.prefix_extractor.get()));
+    iter->SeekToFirst();
+    while (iter->Valid()) {
+      iter->key();
+      iter->value();
+      iter->Next();
+    }
+    ASSERT_OK(iter->status());
+  }
+
+  // out of scope, block cache should have been deleted, all allocations
+  // deallocated
+  EXPECT_EQ(custom_cache_allocator->numAllocations.load(),
+            custom_cache_allocator->numDeallocations.load());
+  // make sure that allocations actually happened through the cache allocator
+  EXPECT_GT(custom_cache_allocator->numAllocations.load(), 0);
+}
+
 TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
   // A regression test to avoid
   // data race described in
   // https://github.com/facebook/rocksdb/issues/1267
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index e3560d6fa..9b72f3a91 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -3040,7 +3040,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     int64_t bytes = 0;
     int decompress_size;
     while (ok && bytes < 1024 * 1048576) {
-      char *uncompressed = nullptr;
+      CacheAllocationPtr uncompressed;
       switch (FLAGS_compression_type_e) {
         case rocksdb::kSnappyCompression: {
           // get size and allocate here to make comparison fair
@@ -3050,45 +3050,44 @@ void VerifyDBFromDB(std::string& truth_db_name) {
             ok = false;
             break;
           }
-          uncompressed = new char[ulength];
+          uncompressed = AllocateBlock(ulength, nullptr);
           ok = Snappy_Uncompress(compressed.data(), compressed.size(),
-                                 uncompressed);
+                                 uncompressed.get());
           break;
         }
         case rocksdb::kZlibCompression:
           uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
                                          compressed.size(), &decompress_size, 2);
-          ok = uncompressed != nullptr;
+          ok = uncompressed.get() != nullptr;
           break;
         case rocksdb::kBZip2Compression:
           uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
                                           &decompress_size, 2);
-          ok = uncompressed != nullptr;
+          ok = uncompressed.get() != nullptr;
           break;
         case rocksdb::kLZ4Compression:
          uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
                                        compressed.size(), &decompress_size, 2);
-          ok = uncompressed != nullptr;
+          ok = uncompressed.get() != nullptr;
           break;
         case rocksdb::kLZ4HCCompression:
          uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
                                        compressed.size(), &decompress_size, 2);
-          ok = uncompressed != nullptr;
+          ok = uncompressed.get() != nullptr;
           break;
         case rocksdb::kXpressCompression:
-          uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
-                                           &decompress_size);
-          ok = uncompressed != nullptr;
+          uncompressed.reset(XPRESS_Uncompress(
+              compressed.data(), compressed.size(), &decompress_size));
+          ok = uncompressed.get() != nullptr;
           break;
         case rocksdb::kZSTD:
           uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
                                          compressed.size(), &decompress_size);
-          ok = uncompressed != nullptr;
+          ok = uncompressed.get() != nullptr;
           break;
         default:
          ok = false;
       }
-      delete[] uncompressed;
       bytes += input.size();
       thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
     }
diff --git a/util/cache_allocator.h b/util/cache_allocator.h
new file mode 100644
index 000000000..68dd3dcea
--- /dev/null
+++ b/util/cache_allocator.h
@@ -0,0 +1,38 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+
+#pragma once
+
+#include "rocksdb/cache_allocator.h"
+
+namespace rocksdb {
+
+struct CustomDeleter {
+  CustomDeleter(CacheAllocator* a = nullptr) : allocator(a) {}
+
+  void operator()(char* ptr) const {
+    if (allocator) {
+      allocator->Deallocate(reinterpret_cast<void*>(ptr));
+    } else {
+      delete[] ptr;
+    }
+  }
+
+  CacheAllocator* allocator;
+};
+
+using CacheAllocationPtr = std::unique_ptr<char[], CustomDeleter>;
+
+inline CacheAllocationPtr AllocateBlock(size_t size,
+                                        CacheAllocator* allocator) {
+  if (allocator) {
+    auto block = reinterpret_cast<char*>(allocator->Allocate(size));
+    return CacheAllocationPtr(block, allocator);
+  }
+  return CacheAllocationPtr(new char[size]);
+}
+
+}  // namespace rocksdb
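[Editor's note] To illustrate the helper above: AllocateBlock() returns the
same CacheAllocationPtr type whether or not an allocator is supplied, and
CustomDeleter later routes the free to the matching path. A small usage
sketch; the function Demo is illustrative, not part of the patch:

#include "util/cache_allocator.h"

// `allocator` may be nullptr, in which case plain new[]/delete[] is used.
void Demo(CacheAllocator* allocator) {
  rocksdb::CacheAllocationPtr buf = rocksdb::AllocateBlock(4096, allocator);
  buf[0] = 'x';  // behaves like a std::unique_ptr<char[]>
  // On scope exit, CustomDeleter calls allocator->Deallocate(buf.get())
  // if an allocator was supplied, and delete[] otherwise.
}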
diff --git a/util/compression.h b/util/compression.h
index e918e14fb..d7bab05ed 100644
--- a/util/compression.h
+++ b/util/compression.h
@@ -14,6 +14,8 @@
 #include <string>
 
 #include "rocksdb/options.h"
+#include "rocksdb/table.h"
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression_context_cache.h"
 
@@ -495,11 +497,10 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
 // header in varint32 format
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
-inline char* Zlib_Uncompress(const UncompressionContext& ctx,
-                             const char* input_data, size_t input_length,
-                             int* decompress_size,
-                             uint32_t compress_format_version,
-                             int windowBits = -14) {
+inline CacheAllocationPtr Zlib_Uncompress(
+    const UncompressionContext& ctx, const char* input_data,
+    size_t input_length, int* decompress_size, uint32_t compress_format_version,
+    CacheAllocator* allocator = nullptr, int windowBits = -14) {
 #ifdef ZLIB
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -541,9 +542,9 @@ inline char* Zlib_Uncompress(const UncompressionContext& ctx,
   _stream.next_in = (Bytef*)input_data;
   _stream.avail_in = static_cast<unsigned int>(input_length);
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
 
-  _stream.next_out = (Bytef*)output;
+  _stream.next_out = (Bytef*)output.get();
   _stream.avail_out = static_cast<unsigned int>(output_len);
 
   bool done = false;
@@ -561,19 +562,17 @@ inline char* Zlib_Uncompress(const UncompressionContext& ctx,
         size_t old_sz = output_len;
         uint32_t output_len_delta = output_len / 5;
         output_len += output_len_delta < 10 ? 10 : output_len_delta;
-        char* tmp = new char[output_len];
-        memcpy(tmp, output, old_sz);
-        delete[] output;
-        output = tmp;
+        auto tmp = AllocateBlock(output_len, allocator);
+        memcpy(tmp.get(), output.get(), old_sz);
+        output = std::move(tmp);
 
         // Set more output.
-        _stream.next_out = (Bytef*)(output + old_sz);
+        _stream.next_out = (Bytef*)(output.get() + old_sz);
         _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
         break;
       }
       case Z_BUF_ERROR:
       default:
-        delete[] output;
         inflateEnd(&_stream);
         return nullptr;
     }
@@ -660,9 +659,9 @@ inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
 // block header
 // compress_format_version == 2 -- decompressed size is included in the block
 // header in varint32 format
-inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
-                              int* decompress_size,
-                              uint32_t compress_format_version) {
+inline CacheAllocationPtr BZip2_Uncompress(
+    const char* input_data, size_t input_length, int* decompress_size,
+    uint32_t compress_format_version, CacheAllocator* allocator = nullptr) {
 #ifdef BZIP2
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -690,9 +689,9 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
   _stream.next_in = (char*)input_data;
   _stream.avail_in = static_cast<unsigned int>(input_length);
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
 
-  _stream.next_out = (char*)output;
+  _stream.next_out = (char*)output.get();
   _stream.avail_out = static_cast<unsigned int>(output_len);
 
   bool done = false;
@@ -709,18 +708,16 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
         assert(compress_format_version != 2);
         uint32_t old_sz = output_len;
         output_len = output_len * 1.2;
-        char* tmp = new char[output_len];
-        memcpy(tmp, output, old_sz);
-        delete[] output;
-        output = tmp;
+        auto tmp = AllocateBlock(output_len, allocator);
+        memcpy(tmp.get(), output.get(), old_sz);
+        output = std::move(tmp);
 
         // Set more output.
-        _stream.next_out = (char*)(output + old_sz);
+        _stream.next_out = (char*)(output.get() + old_sz);
         _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
         break;
       }
       default:
-        delete[] output;
         BZ2_bzDecompressEnd(&_stream);
         return nullptr;
     }
@@ -814,10 +811,12 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
 // header in varint32 format
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
-inline char* LZ4_Uncompress(const UncompressionContext& ctx,
-                            const char* input_data, size_t input_length,
-                            int* decompress_size,
-                            uint32_t compress_format_version) {
+inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
+                                         const char* input_data,
+                                         size_t input_length,
+                                         int* decompress_size,
+                                         uint32_t compress_format_version,
+                                         CacheAllocator* allocator = nullptr) {
 #ifdef LZ4
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -837,7 +836,7 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
     input_data += 8;
   }
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
 #if LZ4_VERSION_NUMBER >= 10400  // r124+
   LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
   if (ctx.dict().size()) {
@@ -845,17 +844,16 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
                         static_cast<int>(ctx.dict().size()));
   }
   *decompress_size = LZ4_decompress_safe_continue(
-      stream, input_data, output, static_cast<int>(input_length),
+      stream, input_data, output.get(), static_cast<int>(input_length),
       static_cast<int>(output_len));
   LZ4_freeStreamDecode(stream);
 #else  // up to r123
-  *decompress_size =
-      LZ4_decompress_safe(input_data, output, static_cast<int>(input_length),
-                          static_cast<int>(output_len));
+  *decompress_size = LZ4_decompress_safe(input_data, output.get(),
+                                         static_cast<int>(input_length),
+                                         static_cast<int>(output_len));
 #endif  // LZ4_VERSION_NUMBER >= 10400
 
   if (*decompress_size < 0) {
-    delete[] output;
     return nullptr;
   }
   assert(*decompress_size == static_cast<int>(output_len));
@@ -866,6 +864,7 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
   (void)input_length;
   (void)decompress_size;
   (void)compress_format_version;
+  (void)allocator;
   return nullptr;
 #endif
 }
@@ -1028,9 +1027,11 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
 // @param compression_dict Data for presetting the compression library's
 //    dictionary.
-inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
-                             const char* input_data, size_t input_length,
-                             int* decompress_size) {
+inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
+                                          const char* input_data,
+                                          size_t input_length,
+                                          int* decompress_size,
+                                          CacheAllocator* allocator = nullptr) {
 #ifdef ZSTD
   uint32_t output_len = 0;
   if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
@@ -1038,17 +1039,17 @@ inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
     return nullptr;
   }
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
   size_t actual_output_length;
 #if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
   ZSTD_DCtx* context = ctx.GetZSTDContext();
   assert(context != nullptr);
   actual_output_length = ZSTD_decompress_usingDict(
-      context, output, output_len, input_data, input_length, ctx.dict().data(),
-      ctx.dict().size());
+      context, output.get(), output_len, input_data, input_length,
+      ctx.dict().data(), ctx.dict().size());
 #else  // up to v0.4.x
   actual_output_length =
-      ZSTD_decompress(output, output_len, input_data, input_length);
+      ZSTD_decompress(output.get(), output_len, input_data, input_length);
 #endif  // ZSTD_VERSION_NUMBER >= 500
   assert(actual_output_length == output_len);
   *decompress_size = static_cast<int>(actual_output_length);
@@ -1058,6 +1059,7 @@ inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
   (void)input_data;
   (void)input_length;
   (void)decompress_size;
+  (void)allocator;
   return nullptr;
 #endif
 }