diff --git a/HISTORY.md b/HISTORY.md
index b4baa7162..d0503aced 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -7,6 +7,7 @@
 ### New Features
 * TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. Could be used for optimizing certain transactions or during recovery.
+* Introduced CacheAllocator, which lets the user specify a custom allocator for memory in the block cache.
 
 ### Bug Fixes
 * Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.
diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc
index d4cbb9a45..9f3acd16a 100644
--- a/cache/lru_cache.cc
+++ b/cache/lru_cache.cc
@@ -461,8 +461,10 @@ std::string LRUCacheShard::GetPrintableOptions() const {
 }
 
 LRUCache::LRUCache(size_t capacity, int num_shard_bits,
-                   bool strict_capacity_limit, double high_pri_pool_ratio)
-    : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
+                   bool strict_capacity_limit, double high_pri_pool_ratio,
+                   std::shared_ptr<CacheAllocator> allocator)
+    : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
+                   std::move(allocator)) {
   num_shards_ = 1 << num_shard_bits;
   shards_ = reinterpret_cast<LRUCacheShard*>(
       port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
@@ -537,12 +539,14 @@ double LRUCache::GetHighPriPoolRatio() {
 std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
   return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
                      cache_opts.strict_capacity_limit,
-                     cache_opts.high_pri_pool_ratio);
+                     cache_opts.high_pri_pool_ratio,
+                     cache_opts.cache_allocator);
 }
 
-std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
-                                   bool strict_capacity_limit,
-                                   double high_pri_pool_ratio) {
+std::shared_ptr<Cache> NewLRUCache(
+    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+    double high_pri_pool_ratio,
+    std::shared_ptr<CacheAllocator> cache_allocator) {
   if (num_shard_bits >= 20) {
     return nullptr;  // the cache cannot be sharded into too many fine pieces
   }
@@ -554,7 +558,8 @@ std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
     num_shard_bits = GetDefaultCacheShardBits(capacity);
   }
   return std::make_shared<LRUCache>(capacity, num_shard_bits,
-                                    strict_capacity_limit, high_pri_pool_ratio);
+                                    strict_capacity_limit, high_pri_pool_ratio,
+                                    std::move(cache_allocator));
 }
 
 }  // namespace rocksdb
diff --git a/cache/lru_cache.h b/cache/lru_cache.h
index 3c067f0c1..0b925a166 100644
--- a/cache/lru_cache.h
+++ b/cache/lru_cache.h
@@ -279,7 +279,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard {
 class LRUCache : public ShardedCache {
  public:
   LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
-           double high_pri_pool_ratio);
+           double high_pri_pool_ratio,
+           std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
   virtual ~LRUCache();
   virtual const char* Name() const override { return "LRUCache"; }
   virtual CacheShard* GetShard(int shard) override;
diff --git a/cache/sharded_cache.cc b/cache/sharded_cache.cc
index 6a0a22282..3fef82a79 100644
--- a/cache/sharded_cache.cc
+++ b/cache/sharded_cache.cc
@@ -20,8 +20,10 @@
 namespace rocksdb {
 
 ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
-                           bool strict_capacity_limit)
-    : num_shard_bits_(num_shard_bits),
+                           bool strict_capacity_limit,
+                           std::shared_ptr<CacheAllocator> allocator)
+    : Cache(std::move(allocator)),
+      num_shard_bits_(num_shard_bits),
       capacity_(capacity),
       strict_capacity_limit_(strict_capacity_limit),
       last_id_(1) {}
@@ -142,6 +144,9 @@ std::string ShardedCache::GetPrintableOptions() const {
              strict_capacity_limit_);
     ret.append(buffer);
   }
+  snprintf(buffer, kBufferSize, "    cache_allocator : %s\n",
+           cache_allocator() ? cache_allocator()->Name() : "None");
+  ret.append(buffer);
   ret.append(GetShard(0)->GetPrintableOptions());
   return ret;
 }
diff --git a/cache/sharded_cache.h b/cache/sharded_cache.h
index 4f9dea2ad..9876a882b 100644
--- a/cache/sharded_cache.h
+++ b/cache/sharded_cache.h
@@ -47,7 +47,8 @@ class CacheShard {
 // Keys are sharded by the highest num_shard_bits bits of hash value.
 class ShardedCache : public Cache {
  public:
-  ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit);
+  ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+               std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
   virtual ~ShardedCache() = default;
   virtual const char* Name() const override = 0;
   virtual CacheShard* GetShard(int shard) = 0;
diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h
index da3b934d8..d4d2b6510 100644
--- a/include/rocksdb/cache.h
+++ b/include/rocksdb/cache.h
@@ -25,6 +25,7 @@
 #include <stdint.h>
 #include <memory>
 #include <string>
+#include "rocksdb/cache_allocator.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/status.h"
@@ -58,13 +59,20 @@ struct LRUCacheOptions {
   // BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority.
   double high_pri_pool_ratio = 0.0;
 
+  // If non-nullptr, use this allocator instead of the system allocator when
+  // allocating memory for cache blocks. Set this option before you start
+  // using the cache!
+  std::shared_ptr<CacheAllocator> cache_allocator;
+
   LRUCacheOptions() {}
   LRUCacheOptions(size_t _capacity, int _num_shard_bits,
-                  bool _strict_capacity_limit, double _high_pri_pool_ratio)
+                  bool _strict_capacity_limit, double _high_pri_pool_ratio,
+                  std::shared_ptr<CacheAllocator> _cache_allocator = nullptr)
       : capacity(_capacity),
         num_shard_bits(_num_shard_bits),
         strict_capacity_limit(_strict_capacity_limit),
-        high_pri_pool_ratio(_high_pri_pool_ratio) {}
+        high_pri_pool_ratio(_high_pri_pool_ratio),
+        cache_allocator(std::move(_cache_allocator)) {}
 };
 
 // Create a new cache with a fixed size capacity. The cache is sharded
@@ -75,10 +83,10 @@ struct LRUCacheOptions {
 // high_pri_pool_pct.
 // num_shard_bits = -1 means it is automatically determined: every shard
 // will be at least 512KB and number of shard bits will not exceed 6.
-extern std::shared_ptr<Cache> NewLRUCache(size_t capacity,
-                                          int num_shard_bits = -1,
-                                          bool strict_capacity_limit = false,
-                                          double high_pri_pool_ratio = 0.0);
+extern std::shared_ptr<Cache> NewLRUCache(
+    size_t capacity, int num_shard_bits = -1,
+    bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0,
+    std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
 
 extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
 
@@ -91,13 +99,15 @@ extern std::shared_ptr<Cache> NewClockCache(size_t capacity,
                                             int num_shard_bits = -1,
                                             bool strict_capacity_limit = false);
 
+
 class Cache {
  public:
   // Depending on implementation, cache entries with high priority could be less
   // likely to get evicted than low priority entries.
   enum class Priority { HIGH, LOW };
 
-  Cache() {}
+  Cache(std::shared_ptr<CacheAllocator> allocator = nullptr)
+      : cache_allocator_(std::move(allocator)) {}
 
   // Destroys all existing entries by calling the "deleter"
   // function that was passed via the Insert() function.
@@ -228,10 +238,14 @@ class Cache {
   virtual void TEST_mark_as_data_block(const Slice& /*key*/,
                                        size_t /*charge*/) {}
 
+  CacheAllocator* cache_allocator() const { return cache_allocator_.get(); }
+
  private:
   // No copying allowed
   Cache(const Cache&);
   Cache& operator=(const Cache&);
+
+  std::shared_ptr<CacheAllocator> cache_allocator_;
 };
 
 }  // namespace rocksdb
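
With the option in place, attaching a custom allocator to the block cache is a one-line change at construction time. A minimal usage sketch (MyCacheAllocator is a hypothetical implementation of the interface introduced below):

    #include "rocksdb/cache.h"

    // MyCacheAllocator is hypothetical; any CacheAllocator subclass works.
    rocksdb::LRUCacheOptions cache_opts;
    cache_opts.capacity = 16 * 1024 * 1024;
    cache_opts.cache_allocator = std::make_shared<MyCacheAllocator>();
    std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(cache_opts);

    // Equivalent long-form overload:
    // auto cache = rocksdb::NewLRUCache(16 * 1024 * 1024, -1, false, 0.0,
    //                                   std::make_shared<MyCacheAllocator>());
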
diff --git a/include/rocksdb/cache_allocator.h b/include/rocksdb/cache_allocator.h
new file mode 100644
index 000000000..5bbec0e8a
--- /dev/null
+++ b/include/rocksdb/cache_allocator.h
@@ -0,0 +1,29 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+// CacheAllocator is an interface that a client can implement to supply custom
+// cache allocation and deallocation methods. See rocksdb/cache.h for more
+// information.
+// All methods should be thread-safe.
+class CacheAllocator {
+ public:
+  virtual ~CacheAllocator() = default;
+
+  // Name of the cache allocator, printed in the log
+  virtual const char* Name() const = 0;
+
+  // Allocate a block of at least `size` bytes
+  virtual void* Allocate(size_t size) = 0;
+  // Deallocate a previously allocated block
+  virtual void Deallocate(void* p) = 0;
+  // Returns the memory size of the block allocated at p. The default
+  // implementation, which just returns the original allocation_size, is fine.
+  virtual size_t UsableSize(void* /*p*/, size_t allocation_size) const {
+    // default implementation just returns the allocation size
+    return allocation_size;
+  }
+};
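
Implementing the interface only requires Allocate, Deallocate, and a Name for logging; UsableSize can usually be left alone. A sketch of a trivial implementation (TrackingCacheAllocator is illustrative only, not part of this change; the PR's own test uses a similar CustomCacheAllocator in table_test.cc below):

    #include <atomic>
    #include <cstddef>

    #include "rocksdb/cache_allocator.h"

    // Hypothetical implementation: plain new[]/delete[] plus a usage counter.
    class TrackingCacheAllocator : public CacheAllocator {
     public:
      const char* Name() const override { return "TrackingCacheAllocator"; }

      void* Allocate(size_t size) override {
        bytes_allocated_ += size;
        return new char[size];
      }
      void Deallocate(void* p) override { delete[] reinterpret_cast<char*>(p); }
      // UsableSize() is inherited; the default of reporting the requested size
      // is accurate here because new[] is handed exactly `size` bytes.

      std::atomic<size_t> bytes_allocated_{0};
    };
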
diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc
index 59c385d65..26c14d21d 100644
--- a/table/block_based_table_builder.cc
+++ b/table/block_based_table_builder.cc
@@ -39,6 +39,7 @@
 #include "table/full_filter_block.h"
 #include "table/table_builder.h"
 
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -654,7 +655,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
     size_t size = block_contents.size();
 
-    std::unique_ptr<char[]> ubuf(new char[size + 1]);
+    auto ubuf =
+        AllocateBlock(size + 1, block_cache_compressed->cache_allocator());
     memcpy(ubuf.get(), block_contents.data(), size);
     ubuf[size] = type;
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index 9f2e02d68..7e7cec1b1 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -80,11 +80,12 @@ Status ReadBlockFromFile(
     std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
     bool do_uncompress, const Slice& compression_dict,
     const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
-    size_t read_amp_bytes_per_bit, const bool immortal_file = false) {
+    size_t read_amp_bytes_per_bit, CacheAllocator* allocator = nullptr,
+    const bool immortal_file = false) {
   BlockContents contents;
-  BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
-                             &contents, ioptions, do_uncompress,
-                             compression_dict, cache_options, immortal_file);
+  BlockFetcher block_fetcher(
+      file, prefetch_buffer, footer, options, handle, &contents, ioptions,
+      do_uncompress, compression_dict, cache_options, allocator, immortal_file);
   Status s = block_fetcher.ReadBlockContents();
   if (s.ok()) {
     result->reset(new Block(std::move(contents), global_seqno,
@@ -94,6 +95,13 @@ Status ReadBlockFromFile(
   return s;
 }
 
+inline CacheAllocator* GetCacheAllocator(
+    const BlockBasedTableOptions& table_options) {
+  return table_options.block_cache.get()
+             ? table_options.block_cache->cache_allocator()
+             : nullptr;
+}
+
 // Delete the resource that is held by the iterator.
 template <class ResourceType>
 void DeleteHeldResource(void* arg, void* /*ignored*/) {
@@ -1150,7 +1158,8 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
       rep->footer.metaindex_handle(), &meta, rep->ioptions,
       true /* decompress */, Slice() /*compression dict*/,
       rep->persistent_cache_options, kDisableGlobalSequenceNumber,
-      0 /* read_amp_bytes_per_bit */);
+      0 /* read_amp_bytes_per_bit */,
+      GetCacheAllocator(rep->table_options));
 
   if (!s.ok()) {
     ROCKS_LOG_ERROR(rep->ioptions.info_log,
@@ -1173,7 +1182,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
     const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
     BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
     const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
-    GetContext* get_context) {
+    GetContext* get_context, CacheAllocator* allocator) {
   Status s;
   Block* compressed_block = nullptr;
   Cache::Handle* block_cache_compressed_handle = nullptr;
@@ -1230,7 +1239,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
                                        compression_dict);
     s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
                                 compressed_block->size(), &contents,
-                                format_version, ioptions);
+                                format_version, ioptions, allocator);
 
   // Insert uncompressed block into block cache
   if (s.ok()) {
@@ -1292,7 +1301,8 @@ Status BlockBasedTable::PutDataBlockToCache(
     const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
     CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
     const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
-    Cache::Priority priority, GetContext* get_context) {
+    Cache::Priority priority, GetContext* get_context,
+    CacheAllocator* allocator) {
   assert(raw_block->compression_type() == kNoCompression ||
          block_cache_compressed != nullptr);
 
@@ -1305,7 +1315,7 @@ Status BlockBasedTable::PutDataBlockToCache(
                                        compression_dict);
     s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
                                 raw_block->size(), &contents, format_version,
-                                ioptions);
+                                ioptions, allocator);
   }
   if (!s.ok()) {
     delete raw_block;
@@ -1402,7 +1412,8 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
   BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer,
                              ReadOptions(), filter_handle, &block,
                              rep->ioptions, false /* decompress */,
-                             dummy_comp_dict, rep->persistent_cache_options);
+                             dummy_comp_dict, rep->persistent_cache_options,
+                             GetCacheAllocator(rep->table_options));
   Status s = block_fetcher.ReadBlockContents();
 
   if (!s.ok()) {
@@ -1700,7 +1711,9 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
           &block_value, rep->ioptions, rep->blocks_maybe_compressed,
           compression_dict, rep->persistent_cache_options,
           is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
-          rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
+          rep->table_options.read_amp_bytes_per_bit,
+          GetCacheAllocator(rep->table_options),
+          rep->immortal_table);
     }
     if (s.ok()) {
       block.value = block_value.release();
@@ -1792,7 +1805,8 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
     s = GetDataBlockFromCache(
         key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
         block_entry, rep->table_options.format_version, compression_dict,
-        rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
+        rep->table_options.read_amp_bytes_per_bit, is_index, get_context,
+        GetCacheAllocator(rep->table_options));
 
     if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
       std::unique_ptr<Block> raw_block;
@@ -1804,7 +1818,9 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
             block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
             compression_dict, rep->persistent_cache_options,
             is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
-            rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
+            rep->table_options.read_amp_bytes_per_bit,
+            GetCacheAllocator(rep->table_options),
+            rep->immortal_table);
       }
 
       if (s.ok()) {
@@ -1817,7 +1833,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
                     .cache_index_and_filter_blocks_with_high_priority
                 ? Cache::Priority::HIGH
                 : Cache::Priority::LOW,
-            get_context);
+            get_context, GetCacheAllocator(rep->table_options));
       }
     }
   }
@@ -2524,11 +2540,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
     BlockHandle handle = index_iter->value();
     BlockContents contents;
     Slice dummy_comp_dict;
-    BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
-                               rep_->footer, ReadOptions(), handle, &contents,
-                               rep_->ioptions, false /* decompress */,
-                               dummy_comp_dict /*compression dict*/,
-                               rep_->persistent_cache_options);
+    BlockFetcher block_fetcher(
+        rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
+        ReadOptions(), handle, &contents, rep_->ioptions,
+        false /* decompress */, dummy_comp_dict /*compression dict*/,
+        rep_->persistent_cache_options,
+        GetCacheAllocator(rep_->table_options));
     s = block_fetcher.ReadBlockContents();
     if (!s.ok()) {
       break;
@@ -2550,11 +2567,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
     s = handle.DecodeFrom(&input);
     BlockContents contents;
     Slice dummy_comp_dict;
-    BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
-                               rep_->footer, ReadOptions(), handle, &contents,
-                               rep_->ioptions, false /* decompress */,
-                               dummy_comp_dict /*compression dict*/,
-                               rep_->persistent_cache_options);
+    BlockFetcher block_fetcher(
+        rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
+        ReadOptions(), handle, &contents, rep_->ioptions,
+        false /* decompress */, dummy_comp_dict /*compression dict*/,
+        rep_->persistent_cache_options,
+        GetCacheAllocator(rep_->table_options));
     s = block_fetcher.ReadBlockContents();
     if (!s.ok()) {
       break;
@@ -2858,7 +2876,8 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
             rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
             ReadOptions(), handle, &block, rep_->ioptions, false /*decompress*/,
             dummy_comp_dict /*compression dict*/,
-            rep_->persistent_cache_options);
+            rep_->persistent_cache_options,
+            GetCacheAllocator(rep_->table_options));
         s = block_fetcher.ReadBlockContents();
         if (!s.ok()) {
           rep_->filter.reset(new BlockBasedFilterBlockReader(
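
Note the null-safe GetCacheAllocator() helper above: every reader-side call site goes through it because table_options.block_cache may be unset. The builder's InsertBlockInCache dereferences block_cache_compressed directly, since that path only runs when the compressed cache exists; if more compressed-cache call sites appear, an analogous helper would be the natural extension (hypothetical sketch, not part of this diff):

    // Hypothetical compressed-cache analogue of GetCacheAllocator().
    inline CacheAllocator* GetCompressedCacheAllocator(
        const BlockBasedTableOptions& table_options) {
      return table_options.block_cache_compressed.get()
                 ? table_options.block_cache_compressed->cache_allocator()
                 : nullptr;
    }
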
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index 3cada0c2c..59a0f36b5 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -303,7 +303,8 @@ class BlockBasedTable : public TableReader {
       const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
       BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
       const Slice& compression_dict, size_t read_amp_bytes_per_bit,
-      bool is_index = false, GetContext* get_context = nullptr);
+      bool is_index = false, GetContext* get_context = nullptr,
+      CacheAllocator* allocator = nullptr);
 
   // Put a raw block (maybe compressed) to the corresponding block caches.
   // This method will perform decompression against raw_block if needed and then
@@ -322,7 +323,7 @@ class BlockBasedTable : public TableReader {
       CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
       const Slice& compression_dict, size_t read_amp_bytes_per_bit,
       bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
-      GetContext* get_context = nullptr);
+      GetContext* get_context = nullptr, CacheAllocator* allocator = nullptr);
 
   // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
   // after a call to Seek(key), until handle_result returns false.
diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc
index ea97066ec..489705758 100644
--- a/table/block_fetcher.cc
+++ b/table/block_fetcher.cc
@@ -17,8 +17,9 @@
 #include "rocksdb/env.h"
 #include "table/block.h"
 #include "table/block_based_table_reader.h"
-#include "table/persistent_cache_helper.h"
 #include "table/format.h"
+#include "table/persistent_cache_helper.h"
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -107,9 +108,11 @@ bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
   if (cache_options_.persistent_cache &&
       cache_options_.persistent_cache->IsCompressed()) {
     // lookup uncompressed cache mode p-cache
+    std::unique_ptr<char[]> raw_data;
     status_ = PersistentCacheHelper::LookupRawPage(
-        cache_options_, handle_, &heap_buf_, block_size_ + kBlockTrailerSize);
+        cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
     if (status_.ok()) {
+      heap_buf_ = CacheAllocationPtr(raw_data.release());
       used_buf_ = heap_buf_.get();
       slice_ = Slice(heap_buf_.get(), block_size_);
       return true;
@@ -132,7 +135,7 @@ void BlockFetcher::PrepareBufferForBlockFromFile() {
     // trivially allocated stack buffer instead of needing a full malloc()
     used_buf_ = &stack_buf_[0];
   } else {
-    heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
+    heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
     used_buf_ = heap_buf_.get();
   }
 }
@@ -170,7 +173,7 @@ void BlockFetcher::GetBlockContents() {
     // page can be either uncompressed or compressed, the buffer either stack
     // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
     if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
       assert(used_buf_ != heap_buf_.get());
-      heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
+      heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
       memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
     }
     *contents_ = BlockContents(std::move(heap_buf_), block_size_, true,
@@ -228,9 +231,9 @@ Status BlockFetcher::ReadBlockContents() {
   if (do_uncompress_ && compression_type != kNoCompression) {
     // compressed page, uncompress, update cache
     UncompressionContext uncompression_ctx(compression_type, compression_dict_);
-    status_ =
-        UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
-                                contents_, footer_.version(), ioptions_);
+    status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
+                                      block_size_, contents_, footer_.version(),
+                                      ioptions_, allocator_);
   } else {
     GetBlockContents();
   }
diff --git a/table/block_fetcher.h b/table/block_fetcher.h
index 9e0d2448d..a8d9d6572 100644
--- a/table/block_fetcher.h
+++ b/table/block_fetcher.h
@@ -11,6 +11,8 @@
 #include "table/block.h"
 #include "table/format.h"
 
+#include "util/cache_allocator.h"
+
 namespace rocksdb {
 class BlockFetcher {
  public:
@@ -26,6 +28,7 @@ class BlockFetcher {
               BlockContents* contents, const ImmutableCFOptions& ioptions,
               bool do_uncompress, const Slice& compression_dict,
               const PersistentCacheOptions& cache_options,
+              CacheAllocator* allocator = nullptr,
               const bool immortal_source = false)
       : file_(file),
         prefetch_buffer_(prefetch_buffer),
@@ -37,7 +40,8 @@ class BlockFetcher {
         do_uncompress_(do_uncompress),
         immortal_source_(immortal_source),
         compression_dict_(compression_dict),
-        cache_options_(cache_options) {}
+        cache_options_(cache_options),
+        allocator_(allocator) {}
   Status ReadBlockContents();
 
  private:
@@ -54,11 +58,12 @@ class BlockFetcher {
   const bool immortal_source_;
   const Slice& compression_dict_;
   const PersistentCacheOptions& cache_options_;
+  CacheAllocator* allocator_;
   Status status_;
   Slice slice_;
   char* used_buf_ = nullptr;
   size_t block_size_;
-  std::unique_ptr<char[]> heap_buf_;
+  CacheAllocationPtr heap_buf_;
   char stack_buf_[kDefaultStackBufferSize];
   bool got_from_prefetch_buffer_ = false;
   rocksdb::CompressionType compression_type;
diff --git a/table/format.cc b/table/format.cc
index 16d959c3d..f565fb14a 100644
--- a/table/format.cc
+++ b/table/format.cc
@@ -19,6 +19,7 @@
 #include "table/block_based_table_reader.h"
 #include "table/block_fetcher.h"
 #include "table/persistent_cache_helper.h"
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression.h"
 #include "util/crc32c.h"
@@ -279,8 +280,9 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
 Status UncompressBlockContentsForCompressionType(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t format_version,
-    const ImmutableCFOptions& ioptions) {
-  std::unique_ptr<char[]> ubuf;
+    const ImmutableCFOptions& ioptions,
+    CacheAllocator* allocator) {
+  CacheAllocationPtr ubuf;
 
   assert(uncompression_ctx.type() != kNoCompression &&
          "Invalid compression type");
@@ -296,7 +298,7 @@ Status UncompressBlockContentsForCompressionType(
       if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
         return Status::Corruption(snappy_corrupt_msg);
       }
-      ubuf.reset(new char[ulength]);
+      ubuf = AllocateBlock(ulength, allocator);
       if (!Snappy_Uncompress(data, n, ubuf.get())) {
         return Status::Corruption(snappy_corrupt_msg);
       }
@@ -304,9 +306,10 @@ Status UncompressBlockContentsForCompressionType(
       break;
     }
     case kZlibCompression:
-      ubuf.reset(Zlib_Uncompress(
+      ubuf = Zlib_Uncompress(
           uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kZlibCompression, format_version)));
+          GetCompressFormatForVersion(kZlibCompression, format_version),
+          allocator);
       if (!ubuf) {
         static char zlib_corrupt_msg[] =
             "Zlib not supported or corrupted Zlib compressed block contents";
@@ -316,9 +319,10 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kBZip2Compression:
-      ubuf.reset(BZip2_Uncompress(
+      ubuf = BZip2_Uncompress(
           data, n, &decompress_size,
-          GetCompressFormatForVersion(kBZip2Compression, format_version)));
+          GetCompressFormatForVersion(kBZip2Compression, format_version),
+          allocator);
       if (!ubuf) {
         static char bzip2_corrupt_msg[] =
             "Bzip2 not supported or corrupted Bzip2 compressed block contents";
@@ -328,9 +332,10 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kLZ4Compression:
-      ubuf.reset(LZ4_Uncompress(
+      ubuf = LZ4_Uncompress(
          uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kLZ4Compression, format_version)));
+          GetCompressFormatForVersion(kLZ4Compression, format_version),
+          allocator);
       if (!ubuf) {
         static char lz4_corrupt_msg[] =
             "LZ4 not supported or corrupted LZ4 compressed block contents";
@@ -340,9 +345,10 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
      break;
     case kLZ4HCCompression:
-      ubuf.reset(LZ4_Uncompress(
+      ubuf = LZ4_Uncompress(
          uncompression_ctx, data, n, &decompress_size,
-          GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
+          GetCompressFormatForVersion(kLZ4HCCompression, format_version),
+          allocator);
       if (!ubuf) {
         static char lz4hc_corrupt_msg[] =
             "LZ4HC not supported or corrupted LZ4HC compressed block contents";
@@ -352,6 +358,8 @@ Status UncompressBlockContentsForCompressionType(
           BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
       break;
     case kXpressCompression:
+      // XPRESS allocates memory internally, thus no support for custom
+      // allocator.
       ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
       if (!ubuf) {
         static char xpress_corrupt_msg[] =
@@ -363,7 +371,8 @@
       break;
     case kZSTD:
     case kZSTDNotFinalCompression:
-      ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
+      ubuf = ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size,
+                             allocator);
       if (!ubuf) {
         static char zstd_corrupt_msg[] =
             "ZSTD not supported or corrupted ZSTD compressed block contents";
@@ -396,11 +405,13 @@
 Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
                                const char* data, size_t n,
                                BlockContents* contents, uint32_t format_version,
-                               const ImmutableCFOptions& ioptions) {
+                               const ImmutableCFOptions& ioptions,
+                               CacheAllocator* allocator) {
   assert(data[n] != kNoCompression);
   assert(data[n] == uncompression_ctx.type());
-  return UncompressBlockContentsForCompressionType(
-      uncompression_ctx, data, n, contents, format_version, ioptions);
+  return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n,
+                                                   contents, format_version,
+                                                   ioptions, allocator);
 }
 
 }  // namespace rocksdb
diff --git a/table/format.h b/table/format.h
index 6e0e99c1c..441d7107e 100644
--- a/table/format.h
+++ b/table/format.h
@@ -26,6 +26,7 @@
 #include "port/port.h"  // noexcept
 #include "table/persistent_cache_options.h"
 #include "util/file_reader_writer.h"
+#include "util/cache_allocator.h"
 
 namespace rocksdb {
@@ -192,7 +193,7 @@ struct BlockContents {
   Slice data;            // Actual contents of data
   bool cachable;         // True iff data can be cached
   CompressionType compression_type;
-  std::unique_ptr<char[]> allocation;
+  CacheAllocationPtr allocation;
 
   BlockContents() : cachable(false), compression_type(kNoCompression) {}
 
@@ -200,16 +201,28 @@ struct BlockContents {
                 CompressionType _compression_type)
       : data(_data), cachable(_cachable), compression_type(_compression_type) {}
 
-  BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
+  BlockContents(CacheAllocationPtr&& _data, size_t _size, bool _cachable,
                 CompressionType _compression_type)
       : data(_data.get(), _size),
         cachable(_cachable),
         compression_type(_compression_type),
         allocation(std::move(_data)) {}
 
+  BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
+                CompressionType _compression_type)
+      : data(_data.get(), _size),
+        cachable(_cachable),
+        compression_type(_compression_type) {
+    allocation.reset(_data.release());
+  }
+
   // The additional memory space taken by the block data.
   size_t usable_size() const {
     if (allocation.get() != nullptr) {
+      auto allocator = allocation.get_deleter().allocator;
+      if (allocator) {
+        return allocator->UsableSize(allocation.get(), data.size());
+      }
 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
       return malloc_usable_size(allocation.get());
 #else
@@ -252,7 +265,7 @@ extern Status ReadBlockContents(
 extern Status UncompressBlockContents(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t compress_format_version,
-    const ImmutableCFOptions& ioptions);
+    const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
 
 // This is an extension to UncompressBlockContents that accepts
 // a specific compression type. This is used by un-wrapped blocks
@@ -260,7 +273,7 @@ extern Status UncompressBlockContents(
 extern Status UncompressBlockContentsForCompressionType(
     const UncompressionContext& uncompression_ctx, const char* data, size_t n,
     BlockContents* contents, uint32_t compress_format_version,
-    const ImmutableCFOptions& ioptions);
+    const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
 
 // Implementation details follow.  Clients should ignore,
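
usable_size() above is what the cache uses to charge an entry against its capacity, so an allocator that pads, aligns, or bins allocations should report its real footprint through UsableSize. A hedged illustration of the interplay (PageCacheAllocator is hypothetical):

    // Hypothetical allocator that rounds every request up to 4 KB pages.
    class PageCacheAllocator : public CacheAllocator {
     public:
      const char* Name() const override { return "PageCacheAllocator"; }
      void* Allocate(size_t size) override { return new char[RoundUp(size)]; }
      void Deallocate(void* p) override { delete[] reinterpret_cast<char*>(p); }
      // Report the padded footprint so that BlockContents::usable_size(), and
      // therefore the cache charge, reflects actual memory use.
      size_t UsableSize(void* /*p*/, size_t allocation_size) const override {
        return RoundUp(allocation_size);
      }

     private:
      static size_t RoundUp(size_t n) { return (n + 4095) & ~size_t{4095}; }
    };
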
diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h
index df08a98fa..d7a8ed4aa 100644
--- a/table/plain_table_reader.h
+++ b/table/plain_table_reader.h
@@ -153,8 +153,8 @@ class PlainTableReader: public TableReader {
   DynamicBloom bloom_;
   PlainTableReaderFileInfo file_info_;
   Arena arena_;
-  std::unique_ptr<char[]> index_block_alloc_;
-  std::unique_ptr<char[]> bloom_block_alloc_;
+  CacheAllocationPtr index_block_alloc_;
+  CacheAllocationPtr bloom_block_alloc_;
 
   const ImmutableCFOptions& ioptions_;
   uint64_t file_size_;
diff --git a/table/table_test.cc b/table/table_test.cc
index 26383fa81..a525c5866 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -2477,6 +2477,78 @@ TEST_P(BlockBasedTableTest, BlockCacheLeak) {
   c.ResetTableReader();
 }
 
+namespace {
+class CustomCacheAllocator : public CacheAllocator {
+ public:
+  virtual const char* Name() const override { return "CustomCacheAllocator"; }
+
+  void* Allocate(size_t size) override {
+    ++numAllocations;
+    auto ptr = new char[size + 16];
+    memcpy(ptr, "cache_allocator_", 16);  // mangle first 16 bytes
+    return reinterpret_cast<void*>(ptr + 16);
+  }
+  void Deallocate(void* p) override {
+    ++numDeallocations;
+    char* ptr = reinterpret_cast<char*>(p) - 16;
+    delete[] ptr;
+  }
+
+  std::atomic<int> numAllocations{0};
+  std::atomic<int> numDeallocations{0};
+};
+}  // namespace
+
+TEST_P(BlockBasedTableTest, CacheAllocator) {
+  auto custom_cache_allocator = std::make_shared<CustomCacheAllocator>();
+  {
+    Options opt;
+    unique_ptr<InternalKeyComparator> ikc;
+    ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
+    opt.compression = kNoCompression;
+    BlockBasedTableOptions table_options;
+    table_options.block_size = 1024;
+    LRUCacheOptions lruOptions;
+    lruOptions.cache_allocator = custom_cache_allocator;
+    lruOptions.capacity = 16 * 1024 * 1024;
+    lruOptions.num_shard_bits = 4;
+    table_options.block_cache = NewLRUCache(std::move(lruOptions));
+    opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+    TableConstructor c(BytewiseComparator(),
+                       true /* convert_to_internal_key_ */);
+    c.Add("k01", "hello");
+    c.Add("k02", "hello2");
+    c.Add("k03", std::string(10000, 'x'));
+    c.Add("k04", std::string(200000, 'x'));
+    c.Add("k05", std::string(300000, 'x'));
+    c.Add("k06", "hello3");
+    c.Add("k07", std::string(100000, 'x'));
+    std::vector<std::string> keys;
+    stl_wrappers::KVMap kvmap;
+    const ImmutableCFOptions ioptions(opt);
+    const MutableCFOptions moptions(opt);
+    c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys, &kvmap);
+
+    unique_ptr<InternalIterator> iter(
+        c.NewIterator(moptions.prefix_extractor.get()));
+    iter->SeekToFirst();
+    while (iter->Valid()) {
+      iter->key();
+      iter->value();
+      iter->Next();
+    }
+    ASSERT_OK(iter->status());
+  }
+
+  // out of scope, block cache should have been deleted, all allocations
+  // deallocated
+  EXPECT_EQ(custom_cache_allocator->numAllocations.load(),
+            custom_cache_allocator->numDeallocations.load());
+  // make sure that allocations actually happened through the cache allocator
+  EXPECT_GT(custom_cache_allocator->numAllocations.load(), 0);
+}
+
 TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
   // A regression test to avoid data race described in
   // https://github.com/facebook/rocksdb/issues/1267
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index e3560d6fa..9b72f3a91 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -3040,7 +3040,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     int64_t bytes = 0;
     int decompress_size;
     while (ok && bytes < 1024 * 1048576) {
-      char *uncompressed = nullptr;
+      CacheAllocationPtr uncompressed;
       switch (FLAGS_compression_type_e) {
         case rocksdb::kSnappyCompression: {
           // get size and allocate here to make comparison fair
@@ -3050,45 +3050,44 @@ void VerifyDBFromDB(std::string& truth_db_name) {
             ok = false;
            break;
          }
-          uncompressed = new char[ulength];
+          uncompressed = AllocateBlock(ulength, nullptr);
           ok = Snappy_Uncompress(compressed.data(), compressed.size(),
-                                 uncompressed);
+                                 uncompressed.get());
           break;
         }
         case rocksdb::kZlibCompression:
           uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
                                          compressed.size(), &decompress_size, 2);
-          ok = uncompressed != nullptr;
+          ok = uncompressed.get() != nullptr;
           break;
         case rocksdb::kBZip2Compression:
           uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
                                           &decompress_size, 2);
-          ok = uncompressed != nullptr;
+          ok = uncompressed.get() != nullptr;
           break;
         case rocksdb::kLZ4Compression:
           uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
                                         compressed.size(), &decompress_size, 2);
-          ok = uncompressed != nullptr;
+          ok = uncompressed.get() != nullptr;
           break;
         case rocksdb::kLZ4HCCompression:
          uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
                                         compressed.size(), &decompress_size, 2);
-          ok = uncompressed != nullptr;
+          ok = uncompressed.get() != nullptr;
           break;
         case rocksdb::kXpressCompression:
-          uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
-                                           &decompress_size);
-          ok = uncompressed != nullptr;
+          uncompressed.reset(XPRESS_Uncompress(
+              compressed.data(), compressed.size(), &decompress_size));
+          ok = uncompressed.get() != nullptr;
           break;
         case rocksdb::kZSTD:
           uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
                                          compressed.size(), &decompress_size);
-          ok = uncompressed != nullptr;
+          ok = uncompressed.get() != nullptr;
           break;
         default:
           ok = false;
       }
-      delete[] uncompressed;
       bytes += input.size();
       thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
     }
diff --git a/util/cache_allocator.h b/util/cache_allocator.h
new file mode 100644
index 000000000..68dd3dcea
--- /dev/null
+++ b/util/cache_allocator.h
@@ -0,0 +1,38 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+
+#pragma once
+
+#include "rocksdb/cache_allocator.h"
+
+namespace rocksdb {
+
+struct CustomDeleter {
+  CustomDeleter(CacheAllocator* a = nullptr) : allocator(a) {}
+
+  void operator()(char* ptr) const {
+    if (allocator) {
+      allocator->Deallocate(reinterpret_cast<void*>(ptr));
+    } else {
+      delete[] ptr;
+    }
+  }
+
+  CacheAllocator* allocator;
+};
+
+using CacheAllocationPtr = std::unique_ptr<char[], CustomDeleter>;
+
+inline CacheAllocationPtr AllocateBlock(size_t size,
+                                        CacheAllocator* allocator) {
+  if (allocator) {
+    auto block = reinterpret_cast<char*>(allocator->Allocate(size));
+    return CacheAllocationPtr(block, allocator);
+  }
+  return CacheAllocationPtr(new char[size]);
+}
+
+}  // namespace rocksdb
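
CacheAllocationPtr is the linchpin of the change: the deleter carries the allocator that produced the buffer, so ownership can flow through BlockFetcher, BlockContents, and the cache while still being freed by the matching allocator, and a null allocator degrades to plain new[]/delete[]. A small sketch of the semantics (my_allocator stands for any CacheAllocator instance):

    // Allocator-backed: freed via allocator->Deallocate() when the ptr dies.
    CacheAllocationPtr a = AllocateBlock(4096, my_allocator.get());
    // Fallback: allocated with new char[], freed with delete[].
    CacheAllocationPtr b = AllocateBlock(4096, nullptr);
    // Ownership transfers like any unique_ptr; the deleter travels with it.
    CacheAllocationPtr c = std::move(a);
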
diff --git a/util/compression.h b/util/compression.h
index e918e14fb..d7bab05ed 100644
--- a/util/compression.h
+++ b/util/compression.h
@@ -14,6 +14,8 @@
 #include <string>
 
 #include "rocksdb/options.h"
+#include "rocksdb/table.h"
+#include "util/cache_allocator.h"
 #include "util/coding.h"
 #include "util/compression_context_cache.h"
@@ -495,11 +497,10 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
 // header in varint32 format
 // @param compression_dict Data for presetting the compression library's
 // dictionary.
-inline char* Zlib_Uncompress(const UncompressionContext& ctx,
-                             const char* input_data, size_t input_length,
-                             int* decompress_size,
-                             uint32_t compress_format_version,
-                             int windowBits = -14) {
+inline CacheAllocationPtr Zlib_Uncompress(
+    const UncompressionContext& ctx, const char* input_data,
+    size_t input_length, int* decompress_size, uint32_t compress_format_version,
+    CacheAllocator* allocator = nullptr, int windowBits = -14) {
 #ifdef ZLIB
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -541,9 +542,9 @@ inline CacheAllocationPtr Zlib_Uncompress(
   _stream.next_in = (Bytef*)input_data;
   _stream.avail_in = static_cast<unsigned int>(input_length);
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
 
-  _stream.next_out = (Bytef*)output;
+  _stream.next_out = (Bytef*)output.get();
   _stream.avail_out = static_cast<unsigned int>(output_len);
 
   bool done = false;
@@ -561,19 +562,17 @@ inline CacheAllocationPtr Zlib_Uncompress(
         size_t old_sz = output_len;
         uint32_t output_len_delta = output_len / 5;
         output_len += output_len_delta < 10 ? 10 : output_len_delta;
-        char* tmp = new char[output_len];
-        memcpy(tmp, output, old_sz);
-        delete[] output;
-        output = tmp;
+        auto tmp = AllocateBlock(output_len, allocator);
+        memcpy(tmp.get(), output.get(), old_sz);
+        output = std::move(tmp);
 
         // Set more output.
-        _stream.next_out = (Bytef*)(output + old_sz);
+        _stream.next_out = (Bytef*)(output.get() + old_sz);
         _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
         break;
       }
       case Z_BUF_ERROR:
       default:
-        delete[] output;
         inflateEnd(&_stream);
         return nullptr;
     }
@@ -660,9 +659,9 @@ inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
 // block header
 // compress_format_version == 2 -- decompressed size is included in the block
 // header in varint32 format
-inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
-                              int* decompress_size,
-                              uint32_t compress_format_version) {
+inline CacheAllocationPtr BZip2_Uncompress(
+    const char* input_data, size_t input_length, int* decompress_size,
+    uint32_t compress_format_version, CacheAllocator* allocator = nullptr) {
 #ifdef BZIP2
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -690,9 +689,9 @@ inline CacheAllocationPtr BZip2_Uncompress(
   _stream.next_in = (char*)input_data;
   _stream.avail_in = static_cast<unsigned int>(input_length);
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
 
-  _stream.next_out = (char*)output;
+  _stream.next_out = (char*)output.get();
   _stream.avail_out = static_cast<unsigned int>(output_len);
 
   bool done = false;
@@ -709,18 +708,16 @@ inline CacheAllocationPtr BZip2_Uncompress(
         assert(compress_format_version != 2);
         uint32_t old_sz = output_len;
         output_len = output_len * 1.2;
-        char* tmp = new char[output_len];
-        memcpy(tmp, output, old_sz);
-        delete[] output;
-        output = tmp;
+        auto tmp = AllocateBlock(output_len, allocator);
+        memcpy(tmp.get(), output.get(), old_sz);
+        output = std::move(tmp);
 
         // Set more output.
-        _stream.next_out = (char*)(output + old_sz);
+        _stream.next_out = (char*)(output.get() + old_sz);
         _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
         break;
       }
       default:
-        delete[] output;
         BZ2_bzDecompressEnd(&_stream);
         return nullptr;
     }
@@ -814,10 +811,12 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
 // header in varint32 format
 // @param compression_dict Data for presetting the compression library's
 // dictionary.
-inline char* LZ4_Uncompress(const UncompressionContext& ctx,
-                            const char* input_data, size_t input_length,
-                            int* decompress_size,
-                            uint32_t compress_format_version) {
+inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
+                                         const char* input_data,
+                                         size_t input_length,
+                                         int* decompress_size,
+                                         uint32_t compress_format_version,
+                                         CacheAllocator* allocator = nullptr) {
 #ifdef LZ4
   uint32_t output_len = 0;
   if (compress_format_version == 2) {
@@ -837,7 +836,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
     input_data += 8;
   }
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
 #if LZ4_VERSION_NUMBER >= 10400  // r124+
   LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
   if (ctx.dict().size()) {
@@ -845,17 +844,16 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
                        static_cast<int>(ctx.dict().size()));
   }
   *decompress_size = LZ4_decompress_safe_continue(
-      stream, input_data, output, static_cast<int>(input_length),
+      stream, input_data, output.get(), static_cast<int>(input_length),
       static_cast<int>(output_len));
   LZ4_freeStreamDecode(stream);
 #else  // up to r123
-  *decompress_size =
-      LZ4_decompress_safe(input_data, output, static_cast<int>(input_length),
-                          static_cast<int>(output_len));
+  *decompress_size = LZ4_decompress_safe(input_data, output.get(),
+                                         static_cast<int>(input_length),
+                                         static_cast<int>(output_len));
 #endif  // LZ4_VERSION_NUMBER >= 10400
 
   if (*decompress_size < 0) {
-    delete[] output;
     return nullptr;
   }
   assert(*decompress_size == static_cast<int>(output_len));
@@ -866,6 +864,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
   (void)input_length;
   (void)decompress_size;
   (void)compress_format_version;
+  (void)allocator;
   return nullptr;
 #endif
 }
@@ -1028,9 +1027,11 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
 // @param compression_dict Data for presetting the compression library's
 // dictionary.
-inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
-                             const char* input_data, size_t input_length,
-                             int* decompress_size) {
+inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
+                                          const char* input_data,
+                                          size_t input_length,
+                                          int* decompress_size,
+                                          CacheAllocator* allocator = nullptr) {
 #ifdef ZSTD
   uint32_t output_len = 0;
   if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
@@ -1038,17 +1039,17 @@ inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
     return nullptr;
   }
 
-  char* output = new char[output_len];
+  auto output = AllocateBlock(output_len, allocator);
   size_t actual_output_length;
 #if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
   ZSTD_DCtx* context = ctx.GetZSTDContext();
   assert(context != nullptr);
   actual_output_length = ZSTD_decompress_usingDict(
-      context, output, output_len, input_data, input_length, ctx.dict().data(),
-      ctx.dict().size());
+      context, output.get(), output_len, input_data, input_length,
+      ctx.dict().data(), ctx.dict().size());
 #else  // up to v0.4.x
   actual_output_length =
-      ZSTD_decompress(output, output_len, input_data, input_length);
+      ZSTD_decompress(output.get(), output_len, input_data, input_length);
 #endif  // ZSTD_VERSION_NUMBER >= 500
   assert(actual_output_length == output_len);
   *decompress_size = static_cast<int>(actual_output_length);
@@ -1058,6 +1059,7 @@ inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
   (void)input_data;
   (void)input_length;
   (void)decompress_size;
+  (void)allocator;
   return nullptr;
 #endif
 }
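
Putting the pieces together, end-to-end wiring from a custom allocator to a table factory could look like the following sketch (MyCacheAllocator is again hypothetical; note from format.cc above that XPRESS-compressed blocks bypass the custom allocator because XPRESS allocates memory internally):

    #include "rocksdb/cache.h"
    #include "rocksdb/options.h"
    #include "rocksdb/table.h"

    rocksdb::Options options;
    rocksdb::BlockBasedTableOptions table_options;

    rocksdb::LRUCacheOptions cache_opts;
    cache_opts.capacity = 1 << 30;  // 1 GB block cache
    cache_opts.cache_allocator = std::make_shared<MyCacheAllocator>();
    table_options.block_cache = rocksdb::NewLRUCache(cache_opts);

    options.table_factory.reset(
        rocksdb::NewBlockBasedTableFactory(table_options));
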