Introduce CacheAllocator, a custom allocator for cache blocks (#4437)

Summary:
This is a conceptually simple change, but it touches many files to
pass the allocator through function calls.

We introduce CacheAllocator, which can be used by clients to configure
custom allocator for cache blocks. Our motivation is to hook this up
with folly's `JemallocNodumpAllocator`
(f43ce6d686/folly/experimental/JemallocNodumpAllocator.h),
but there are many other possible use cases.

Additionally, this commit cleans up memory allocation in
`util/compression.h`, making sure that all allocations are wrapped in a
unique_ptr as soon as possible.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4437

Differential Revision: D10132814

Pulled By: yiwu-arbug

fbshipit-source-id: be1343a4b69f6048df127939fea9bbc96969f564
main
Igor Canadi 6 years ago committed by Facebook Github Bot
parent 4e58b2ea3d
commit 1cf5deb8fd
  1. 1
      HISTORY.md
  2. 19
      cache/lru_cache.cc
  3. 3
      cache/lru_cache.h
  4. 9
      cache/sharded_cache.cc
  5. 3
      cache/sharded_cache.h
  6. 28
      include/rocksdb/cache.h
  7. 29
      include/rocksdb/cache_allocator.h
  8. 4
      table/block_based_table_builder.cc
  9. 69
      table/block_based_table_reader.cc
  10. 5
      table/block_based_table_reader.h
  11. 17
      table/block_fetcher.cc
  12. 9
      table/block_fetcher.h
  13. 41
      table/format.cc
  14. 21
      table/format.h
  15. 4
      table/plain_table_reader.h
  16. 72
      table/table_test.cc
  17. 23
      tools/db_bench_tool.cc
  18. 38
      util/cache_allocator.h
  19. 80
      util/compression.h

@ -7,6 +7,7 @@
### New Features
* TransactionOptions::skip_concurrency_control allows pessimistic transactions to skip the overhead of concurrency control. Could be used for optimizing certain transactions or during recovery.
* Introduced CacheAllocator, which lets the user specify custom allocator for memory in block cache.
### Bug Fixes
* Avoid creating empty SSTs and subsequently deleting them in certain cases during compaction.

19
cache/lru_cache.cc vendored

@ -461,8 +461,10 @@ std::string LRUCacheShard::GetPrintableOptions() const {
}
LRUCache::LRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit, double high_pri_pool_ratio)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
bool strict_capacity_limit, double high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> allocator)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
std::move(allocator)) {
num_shards_ = 1 << num_shard_bits;
shards_ = reinterpret_cast<LRUCacheShard*>(
port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
@ -537,12 +539,14 @@ double LRUCache::GetHighPriPoolRatio() {
std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit,
cache_opts.high_pri_pool_ratio);
cache_opts.high_pri_pool_ratio,
cache_opts.cache_allocator);
}
std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit,
double high_pri_pool_ratio) {
std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> cache_allocator) {
if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces
}
@ -554,7 +558,8 @@ std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
num_shard_bits = GetDefaultCacheShardBits(capacity);
}
return std::make_shared<LRUCache>(capacity, num_shard_bits,
strict_capacity_limit, high_pri_pool_ratio);
strict_capacity_limit, high_pri_pool_ratio,
std::move(cache_allocator));
}
} // namespace rocksdb

3
cache/lru_cache.h vendored

@ -279,7 +279,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard {
class LRUCache : public ShardedCache {
public:
LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio);
double high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
virtual ~LRUCache();
virtual const char* Name() const override { return "LRUCache"; }
virtual CacheShard* GetShard(int shard) override;

@ -20,8 +20,10 @@
namespace rocksdb {
ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit)
: num_shard_bits_(num_shard_bits),
bool strict_capacity_limit,
std::shared_ptr<CacheAllocator> allocator)
: Cache(std::move(allocator)),
num_shard_bits_(num_shard_bits),
capacity_(capacity),
strict_capacity_limit_(strict_capacity_limit),
last_id_(1) {}
@ -142,6 +144,9 @@ std::string ShardedCache::GetPrintableOptions() const {
strict_capacity_limit_);
ret.append(buffer);
}
snprintf(buffer, kBufferSize, " cache_allocator : %s\n",
cache_allocator() ? cache_allocator()->Name() : "None");
ret.append(buffer);
ret.append(GetShard(0)->GetPrintableOptions());
return ret;
}

@ -47,7 +47,8 @@ class CacheShard {
// Keys are sharded by the highest num_shard_bits bits of hash value.
class ShardedCache : public Cache {
public:
ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit);
ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
virtual ~ShardedCache() = default;
virtual const char* Name() const override = 0;
virtual CacheShard* GetShard(int shard) = 0;

@ -25,6 +25,7 @@
#include <stdint.h>
#include <memory>
#include <string>
#include "rocksdb/cache_allocator.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
@ -58,13 +59,20 @@ struct LRUCacheOptions {
// BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority.
double high_pri_pool_ratio = 0.0;
// If non-nullptr will use this allocator instead of system allocator when
// allocating memory for cache blocks. Call this method before you start using
// the cache!
std::shared_ptr<CacheAllocator> cache_allocator;
LRUCacheOptions() {}
LRUCacheOptions(size_t _capacity, int _num_shard_bits,
bool _strict_capacity_limit, double _high_pri_pool_ratio)
bool _strict_capacity_limit, double _high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> _cache_allocator = nullptr)
: capacity(_capacity),
num_shard_bits(_num_shard_bits),
strict_capacity_limit(_strict_capacity_limit),
high_pri_pool_ratio(_high_pri_pool_ratio) {}
high_pri_pool_ratio(_high_pri_pool_ratio),
cache_allocator(std::move(_cache_allocator)) {}
};
// Create a new cache with a fixed size capacity. The cache is sharded
@ -75,10 +83,10 @@ struct LRUCacheOptions {
// high_pri_pool_pct.
// num_shard_bits = -1 means it is automatically determined: every shard
// will be at least 512KB and number of shard bits will not exceed 6.
extern std::shared_ptr<Cache> NewLRUCache(size_t capacity,
int num_shard_bits = -1,
bool strict_capacity_limit = false,
double high_pri_pool_ratio = 0.0);
extern std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits = -1,
bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0,
std::shared_ptr<CacheAllocator> cache_allocator = nullptr);
extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
@ -91,13 +99,15 @@ extern std::shared_ptr<Cache> NewClockCache(size_t capacity,
int num_shard_bits = -1,
bool strict_capacity_limit = false);
class Cache {
public:
// Depending on implementation, cache entries with high priority could be less
// likely to get evicted than low priority entries.
enum class Priority { HIGH, LOW };
Cache() {}
Cache(std::shared_ptr<CacheAllocator> allocator = nullptr)
: cache_allocator_(std::move(allocator)) {}
// Destroys all existing entries by calling the "deleter"
// function that was passed via the Insert() function.
@ -228,10 +238,14 @@ class Cache {
virtual void TEST_mark_as_data_block(const Slice& /*key*/,
size_t /*charge*/) {}
CacheAllocator* cache_allocator() const { return cache_allocator_.get(); }
private:
// No copying allowed
Cache(const Cache&);
Cache& operator=(const Cache&);
std::shared_ptr<CacheAllocator> cache_allocator_;
};
} // namespace rocksdb

@ -0,0 +1,29 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
// CacheAllocator is an interface that a client can implement to supply custom
// cache allocation and deallocation methods. See rocksdb/cache.h for more
// information.
// All methods should be thread-safe.
class CacheAllocator {
public:
virtual ~CacheAllocator() = default;
// Name of the cache allocator, printed in the log
virtual const char* Name() const = 0;
// Allocate a block of at least size size
virtual void* Allocate(size_t size) = 0;
// Deallocate previously allocated block
virtual void Deallocate(void* p) = 0;
// Returns the memory size of the block allocated at p. The default
// implementation that just returns the original allocation_size is fine.
virtual size_t UsableSize(void* /*p*/, size_t allocation_size) const {
// default implementation just returns the allocation size
return allocation_size;
}
};

@ -39,6 +39,7 @@
#include "table/full_filter_block.h"
#include "table/table_builder.h"
#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -654,7 +655,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
size_t size = block_contents.size();
std::unique_ptr<char[]> ubuf(new char[size + 1]);
auto ubuf =
AllocateBlock(size + 1, block_cache_compressed->cache_allocator());
memcpy(ubuf.get(), block_contents.data(), size);
ubuf[size] = type;

@ -80,11 +80,12 @@ Status ReadBlockFromFile(
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, const bool immortal_file = false) {
size_t read_amp_bytes_per_bit, CacheAllocator* allocator = nullptr,
const bool immortal_file = false) {
BlockContents contents;
BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
&contents, ioptions, do_uncompress,
compression_dict, cache_options, immortal_file);
BlockFetcher block_fetcher(
file, prefetch_buffer, footer, options, handle, &contents, ioptions,
do_uncompress, compression_dict, cache_options, allocator, immortal_file);
Status s = block_fetcher.ReadBlockContents();
if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno,
@ -94,6 +95,13 @@ Status ReadBlockFromFile(
return s;
}
inline CacheAllocator* GetCacheAllocator(
const BlockBasedTableOptions& table_options) {
return table_options.block_cache.get()
? table_options.block_cache->cache_allocator()
: nullptr;
}
// Delete the resource that is held by the iterator.
template <class ResourceType>
void DeleteHeldResource(void* arg, void* /*ignored*/) {
@ -1150,7 +1158,8 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options, kDisableGlobalSequenceNumber,
0 /* read_amp_bytes_per_bit */);
0 /* read_amp_bytes_per_bit */,
GetCacheAllocator(rep->table_options));
if (!s.ok()) {
ROCKS_LOG_ERROR(rep->ioptions.info_log,
@ -1173,7 +1182,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
GetContext* get_context) {
GetContext* get_context, CacheAllocator* allocator) {
Status s;
Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
@ -1230,7 +1239,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
compression_dict);
s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
compressed_block->size(), &contents,
format_version, ioptions);
format_version, ioptions, allocator);
// Insert uncompressed block into block cache
if (s.ok()) {
@ -1292,7 +1301,8 @@ Status BlockBasedTable::PutDataBlockToCache(
const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
Cache::Priority priority, GetContext* get_context) {
Cache::Priority priority, GetContext* get_context,
CacheAllocator* allocator) {
assert(raw_block->compression_type() == kNoCompression ||
block_cache_compressed != nullptr);
@ -1305,7 +1315,7 @@ Status BlockBasedTable::PutDataBlockToCache(
compression_dict);
s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
raw_block->size(), &contents, format_version,
ioptions);
ioptions, allocator);
}
if (!s.ok()) {
delete raw_block;
@ -1402,7 +1412,8 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer,
ReadOptions(), filter_handle, &block,
rep->ioptions, false /* decompress */,
dummy_comp_dict, rep->persistent_cache_options);
dummy_comp_dict, rep->persistent_cache_options,
GetCacheAllocator(rep->table_options));
Status s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
@ -1700,7 +1711,9 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
&block_value, rep->ioptions, rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
rep->table_options.read_amp_bytes_per_bit,
GetCacheAllocator(rep->table_options),
rep->immortal_table);
}
if (s.ok()) {
block.value = block_value.release();
@ -1792,7 +1805,8 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
s = GetDataBlockFromCache(
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
block_entry, rep->table_options.format_version, compression_dict,
rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
rep->table_options.read_amp_bytes_per_bit, is_index, get_context,
GetCacheAllocator(rep->table_options));
if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
std::unique_ptr<Block> raw_block;
@ -1804,7 +1818,9 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit, rep->immortal_table);
rep->table_options.read_amp_bytes_per_bit,
GetCacheAllocator(rep->table_options),
rep->immortal_table);
}
if (s.ok()) {
@ -1817,7 +1833,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH
: Cache::Priority::LOW,
get_context);
get_context, GetCacheAllocator(rep->table_options));
}
}
}
@ -2524,11 +2540,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
BlockHandle handle = index_iter->value();
BlockContents contents;
Slice dummy_comp_dict;
BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
rep_->footer, ReadOptions(), handle, &contents,
rep_->ioptions, false /* decompress */,
dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options);
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
ReadOptions(), handle, &contents, rep_->ioptions,
false /* decompress */, dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options,
GetCacheAllocator(rep_->table_options));
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
@ -2550,11 +2567,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
s = handle.DecodeFrom(&input);
BlockContents contents;
Slice dummy_comp_dict;
BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
rep_->footer, ReadOptions(), handle, &contents,
rep_->ioptions, false /* decompress */,
dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options);
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
ReadOptions(), handle, &contents, rep_->ioptions,
false /* decompress */, dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options,
GetCacheAllocator(rep_->table_options));
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
@ -2858,7 +2876,8 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
ReadOptions(), handle, &block, rep_->ioptions,
false /*decompress*/, dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options);
rep_->persistent_cache_options,
GetCacheAllocator(rep_->table_options));
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
rep_->filter.reset(new BlockBasedFilterBlockReader(

@ -303,7 +303,8 @@ class BlockBasedTable : public TableReader {
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, GetContext* get_context = nullptr);
bool is_index = false, GetContext* get_context = nullptr,
CacheAllocator* allocator = nullptr);
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then
@ -322,7 +323,7 @@ class BlockBasedTable : public TableReader {
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
GetContext* get_context = nullptr);
GetContext* get_context = nullptr, CacheAllocator* allocator = nullptr);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.

@ -17,8 +17,9 @@
#include "rocksdb/env.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
#include "table/persistent_cache_helper.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -107,9 +108,11 @@ bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
if (cache_options_.persistent_cache &&
cache_options_.persistent_cache->IsCompressed()) {
// lookup uncompressed cache mode p-cache
std::unique_ptr<char[]> raw_data;
status_ = PersistentCacheHelper::LookupRawPage(
cache_options_, handle_, &heap_buf_, block_size_ + kBlockTrailerSize);
cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
if (status_.ok()) {
heap_buf_ = CacheAllocationPtr(raw_data.release());
used_buf_ = heap_buf_.get();
slice_ = Slice(heap_buf_.get(), block_size_);
return true;
@ -132,7 +135,7 @@ void BlockFetcher::PrepareBufferForBlockFromFile() {
// trivially allocated stack buffer instead of needing a full malloc()
used_buf_ = &stack_buf_[0];
} else {
heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
used_buf_ = heap_buf_.get();
}
}
@ -170,7 +173,7 @@ void BlockFetcher::GetBlockContents() {
// or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
assert(used_buf_ != heap_buf_.get());
heap_buf_.reset(new char[block_size_ + kBlockTrailerSize]);
heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
}
*contents_ = BlockContents(std::move(heap_buf_), block_size_, true,
@ -228,9 +231,9 @@ Status BlockFetcher::ReadBlockContents() {
if (do_uncompress_ && compression_type != kNoCompression) {
// compressed page, uncompress, update cache
UncompressionContext uncompression_ctx(compression_type, compression_dict_);
status_ =
UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
contents_, footer_.version(), ioptions_);
status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
block_size_, contents_, footer_.version(),
ioptions_, allocator_);
} else {
GetBlockContents();
}

@ -11,6 +11,8 @@
#include "table/block.h"
#include "table/format.h"
#include "util/cache_allocator.h"
namespace rocksdb {
class BlockFetcher {
public:
@ -26,6 +28,7 @@ class BlockFetcher {
BlockContents* contents, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
CacheAllocator* allocator = nullptr,
const bool immortal_source = false)
: file_(file),
prefetch_buffer_(prefetch_buffer),
@ -37,7 +40,8 @@ class BlockFetcher {
do_uncompress_(do_uncompress),
immortal_source_(immortal_source),
compression_dict_(compression_dict),
cache_options_(cache_options) {}
cache_options_(cache_options),
allocator_(allocator) {}
Status ReadBlockContents();
private:
@ -54,11 +58,12 @@ class BlockFetcher {
const bool immortal_source_;
const Slice& compression_dict_;
const PersistentCacheOptions& cache_options_;
CacheAllocator* allocator_;
Status status_;
Slice slice_;
char* used_buf_ = nullptr;
size_t block_size_;
std::unique_ptr<char[]> heap_buf_;
CacheAllocationPtr heap_buf_;
char stack_buf_[kDefaultStackBufferSize];
bool got_from_prefetch_buffer_ = false;
rocksdb::CompressionType compression_type;

@ -19,6 +19,7 @@
#include "table/block_based_table_reader.h"
#include "table/block_fetcher.h"
#include "table/persistent_cache_helper.h"
#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -279,8 +280,9 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions) {
std::unique_ptr<char[]> ubuf;
const ImmutableCFOptions& ioptions,
CacheAllocator* allocator) {
CacheAllocationPtr ubuf;
assert(uncompression_ctx.type() != kNoCompression &&
"Invalid compression type");
@ -296,7 +298,7 @@ Status UncompressBlockContentsForCompressionType(
if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
return Status::Corruption(snappy_corrupt_msg);
}
ubuf.reset(new char[ulength]);
ubuf = AllocateBlock(ulength, allocator);
if (!Snappy_Uncompress(data, n, ubuf.get())) {
return Status::Corruption(snappy_corrupt_msg);
}
@ -304,9 +306,10 @@ Status UncompressBlockContentsForCompressionType(
break;
}
case kZlibCompression:
ubuf.reset(Zlib_Uncompress(
ubuf = Zlib_Uncompress(
uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kZlibCompression, format_version)));
GetCompressFormatForVersion(kZlibCompression, format_version),
allocator);
if (!ubuf) {
static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents";
@ -316,9 +319,10 @@ Status UncompressBlockContentsForCompressionType(
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kBZip2Compression:
ubuf.reset(BZip2_Uncompress(
ubuf = BZip2_Uncompress(
data, n, &decompress_size,
GetCompressFormatForVersion(kBZip2Compression, format_version)));
GetCompressFormatForVersion(kBZip2Compression, format_version),
allocator);
if (!ubuf) {
static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents";
@ -328,9 +332,10 @@ Status UncompressBlockContentsForCompressionType(
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kLZ4Compression:
ubuf.reset(LZ4_Uncompress(
ubuf = LZ4_Uncompress(
uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4Compression, format_version)));
GetCompressFormatForVersion(kLZ4Compression, format_version),
allocator);
if (!ubuf) {
static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents";
@ -340,9 +345,10 @@ Status UncompressBlockContentsForCompressionType(
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kLZ4HCCompression:
ubuf.reset(LZ4_Uncompress(
ubuf = LZ4_Uncompress(
uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
allocator);
if (!ubuf) {
static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents";
@ -352,6 +358,8 @@ Status UncompressBlockContentsForCompressionType(
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kXpressCompression:
// XPRESS allocates memory internally, thus no support for custom
// allocator.
ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
if (!ubuf) {
static char xpress_corrupt_msg[] =
@ -363,7 +371,8 @@ Status UncompressBlockContentsForCompressionType(
break;
case kZSTD:
case kZSTDNotFinalCompression:
ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
ubuf = ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size,
allocator);
if (!ubuf) {
static char zstd_corrupt_msg[] =
"ZSTD not supported or corrupted ZSTD compressed block contents";
@ -396,11 +405,13 @@ Status UncompressBlockContentsForCompressionType(
Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions) {
const ImmutableCFOptions& ioptions,
CacheAllocator* allocator) {
assert(data[n] != kNoCompression);
assert(data[n] == uncompression_ctx.type());
return UncompressBlockContentsForCompressionType(
uncompression_ctx, data, n, contents, format_version, ioptions);
return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n,
contents, format_version,
ioptions, allocator);
}
} // namespace rocksdb

@ -26,6 +26,7 @@
#include "port/port.h" // noexcept
#include "table/persistent_cache_options.h"
#include "util/file_reader_writer.h"
#include "util/cache_allocator.h"
namespace rocksdb {
@ -192,7 +193,7 @@ struct BlockContents {
Slice data; // Actual contents of data
bool cachable; // True iff data can be cached
CompressionType compression_type;
std::unique_ptr<char[]> allocation;
CacheAllocationPtr allocation;
BlockContents() : cachable(false), compression_type(kNoCompression) {}
@ -200,16 +201,28 @@ struct BlockContents {
CompressionType _compression_type)
: data(_data), cachable(_cachable), compression_type(_compression_type) {}
BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
BlockContents(CacheAllocationPtr&& _data, size_t _size, bool _cachable,
CompressionType _compression_type)
: data(_data.get(), _size),
cachable(_cachable),
compression_type(_compression_type),
allocation(std::move(_data)) {}
BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
CompressionType _compression_type)
: data(_data.get(), _size),
cachable(_cachable),
compression_type(_compression_type) {
allocation.reset(_data.release());
}
// The additional memory space taken by the block data.
size_t usable_size() const {
if (allocation.get() != nullptr) {
auto allocator = allocation.get_deleter().allocator;
if (allocator) {
return allocator->UsableSize(allocation.get(), data.size());
}
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
return malloc_usable_size(allocation.get());
#else
@ -252,7 +265,7 @@ extern Status ReadBlockContents(
extern Status UncompressBlockContents(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version,
const ImmutableCFOptions& ioptions);
const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
// This is an extension to UncompressBlockContents that accepts
// a specific compression type. This is used by un-wrapped blocks
@ -260,7 +273,7 @@ extern Status UncompressBlockContents(
extern Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version,
const ImmutableCFOptions& ioptions);
const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr);
// Implementation details follow. Clients should ignore,

@ -153,8 +153,8 @@ class PlainTableReader: public TableReader {
DynamicBloom bloom_;
PlainTableReaderFileInfo file_info_;
Arena arena_;
std::unique_ptr<char[]> index_block_alloc_;
std::unique_ptr<char[]> bloom_block_alloc_;
CacheAllocationPtr index_block_alloc_;
CacheAllocationPtr bloom_block_alloc_;
const ImmutableCFOptions& ioptions_;
uint64_t file_size_;

@ -2477,6 +2477,78 @@ TEST_P(BlockBasedTableTest, BlockCacheLeak) {
c.ResetTableReader();
}
namespace {
class CustomCacheAllocator : public CacheAllocator {
public:
virtual const char* Name() const override { return "CustomCacheAllocator"; }
void* Allocate(size_t size) override {
++numAllocations;
auto ptr = new char[size + 16];
memcpy(ptr, "cache_allocator_", 16); // mangle first 16 bytes
return reinterpret_cast<void*>(ptr + 16);
}
void Deallocate(void* p) override {
++numDeallocations;
char* ptr = reinterpret_cast<char*>(p) - 16;
delete[] ptr;
}
std::atomic<int> numAllocations;
std::atomic<int> numDeallocations;
};
} // namespace
TEST_P(BlockBasedTableTest, CacheAllocator) {
auto custom_cache_allocator = std::make_shared<CustomCacheAllocator>();
{
Options opt;
unique_ptr<InternalKeyComparator> ikc;
ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
opt.compression = kNoCompression;
BlockBasedTableOptions table_options;
table_options.block_size = 1024;
LRUCacheOptions lruOptions;
lruOptions.cache_allocator = custom_cache_allocator;
lruOptions.capacity = 16 * 1024 * 1024;
lruOptions.num_shard_bits = 4;
table_options.block_cache = NewLRUCache(std::move(lruOptions));
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
TableConstructor c(BytewiseComparator(),
true /* convert_to_internal_key_ */);
c.Add("k01", "hello");
c.Add("k02", "hello2");
c.Add("k03", std::string(10000, 'x'));
c.Add("k04", std::string(200000, 'x'));
c.Add("k05", std::string(300000, 'x'));
c.Add("k06", "hello3");
c.Add("k07", std::string(100000, 'x'));
std::vector<std::string> keys;
stl_wrappers::KVMap kvmap;
const ImmutableCFOptions ioptions(opt);
const MutableCFOptions moptions(opt);
c.Finish(opt, ioptions, moptions, table_options, *ikc, &keys, &kvmap);
unique_ptr<InternalIterator> iter(
c.NewIterator(moptions.prefix_extractor.get()));
iter->SeekToFirst();
while (iter->Valid()) {
iter->key();
iter->value();
iter->Next();
}
ASSERT_OK(iter->status());
}
// out of scope, block cache should have been deleted, all allocations
// deallocated
EXPECT_EQ(custom_cache_allocator->numAllocations.load(),
custom_cache_allocator->numDeallocations.load());
// make sure that allocations actually happened through the cache allocator
EXPECT_GT(custom_cache_allocator->numAllocations.load(), 0);
}
TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
// A regression test to avoid data race described in
// https://github.com/facebook/rocksdb/issues/1267

@ -3040,7 +3040,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
int64_t bytes = 0;
int decompress_size;
while (ok && bytes < 1024 * 1048576) {
char *uncompressed = nullptr;
CacheAllocationPtr uncompressed;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression: {
// get size and allocate here to make comparison fair
@ -3050,45 +3050,44 @@ void VerifyDBFromDB(std::string& truth_db_name) {
ok = false;
break;
}
uncompressed = new char[ulength];
uncompressed = AllocateBlock(ulength, nullptr);
ok = Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
uncompressed.get());
break;
}
case rocksdb::kZlibCompression:
uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed != nullptr;
ok = uncompressed.get() != nullptr;
break;
case rocksdb::kBZip2Compression:
uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
&decompress_size, 2);
ok = uncompressed != nullptr;
ok = uncompressed.get() != nullptr;
break;
case rocksdb::kLZ4Compression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed != nullptr;
ok = uncompressed.get() != nullptr;
break;
case rocksdb::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed != nullptr;
ok = uncompressed.get() != nullptr;
break;
case rocksdb::kXpressCompression:
uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
&decompress_size);
ok = uncompressed != nullptr;
uncompressed.reset(XPRESS_Uncompress(
compressed.data(), compressed.size(), &decompress_size));
ok = uncompressed.get() != nullptr;
break;
case rocksdb::kZSTD:
uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
ok = uncompressed.get() != nullptr;
break;
default:
ok = false;
}
delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
}

@ -0,0 +1,38 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include "rocksdb/cache_allocator.h"
namespace rocksdb {
struct CustomDeleter {
CustomDeleter(CacheAllocator* a = nullptr) : allocator(a) {}
void operator()(char* ptr) const {
if (allocator) {
allocator->Deallocate(reinterpret_cast<void*>(ptr));
} else {
delete[] ptr;
}
}
CacheAllocator* allocator;
};
using CacheAllocationPtr = std::unique_ptr<char[], CustomDeleter>;
inline CacheAllocationPtr AllocateBlock(size_t size,
CacheAllocator* allocator) {
if (allocator) {
auto block = reinterpret_cast<char*>(allocator->Allocate(size));
return CacheAllocationPtr(block, allocator);
}
return CacheAllocationPtr(new char[size]);
}
} // namespace rocksdb

@ -14,6 +14,8 @@
#include <string>
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "util/cache_allocator.h"
#include "util/coding.h"
#include "util/compression_context_cache.h"
@ -495,11 +497,10 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
inline char* Zlib_Uncompress(const UncompressionContext& ctx,
const char* input_data, size_t input_length,
int* decompress_size,
uint32_t compress_format_version,
int windowBits = -14) {
inline CacheAllocationPtr Zlib_Uncompress(
const UncompressionContext& ctx, const char* input_data,
size_t input_length, int* decompress_size, uint32_t compress_format_version,
CacheAllocator* allocator = nullptr, int windowBits = -14) {
#ifdef ZLIB
uint32_t output_len = 0;
if (compress_format_version == 2) {
@ -541,9 +542,9 @@ inline char* Zlib_Uncompress(const UncompressionContext& ctx,
_stream.next_in = (Bytef*)input_data;
_stream.avail_in = static_cast<unsigned int>(input_length);
char* output = new char[output_len];
auto output = AllocateBlock(output_len, allocator);
_stream.next_out = (Bytef*)output;
_stream.next_out = (Bytef*)output.get();
_stream.avail_out = static_cast<unsigned int>(output_len);
bool done = false;
@ -561,19 +562,17 @@ inline char* Zlib_Uncompress(const UncompressionContext& ctx,
size_t old_sz = output_len;
uint32_t output_len_delta = output_len / 5;
output_len += output_len_delta < 10 ? 10 : output_len_delta;
char* tmp = new char[output_len];
memcpy(tmp, output, old_sz);
delete[] output;
output = tmp;
auto tmp = AllocateBlock(output_len, allocator);
memcpy(tmp.get(), output.get(), old_sz);
output = std::move(tmp);
// Set more output.
_stream.next_out = (Bytef*)(output + old_sz);
_stream.next_out = (Bytef*)(output.get() + old_sz);
_stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
break;
}
case Z_BUF_ERROR:
default:
delete[] output;
inflateEnd(&_stream);
return nullptr;
}
@ -660,9 +659,9 @@ inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
// block header
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
int* decompress_size,
uint32_t compress_format_version) {
inline CacheAllocationPtr BZip2_Uncompress(
const char* input_data, size_t input_length, int* decompress_size,
uint32_t compress_format_version, CacheAllocator* allocator = nullptr) {
#ifdef BZIP2
uint32_t output_len = 0;
if (compress_format_version == 2) {
@ -690,9 +689,9 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
_stream.next_in = (char*)input_data;
_stream.avail_in = static_cast<unsigned int>(input_length);
char* output = new char[output_len];
auto output = AllocateBlock(output_len, allocator);
_stream.next_out = (char*)output;
_stream.next_out = (char*)output.get();
_stream.avail_out = static_cast<unsigned int>(output_len);
bool done = false;
@ -709,18 +708,16 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
assert(compress_format_version != 2);
uint32_t old_sz = output_len;
output_len = output_len * 1.2;
char* tmp = new char[output_len];
memcpy(tmp, output, old_sz);
delete[] output;
output = tmp;
auto tmp = AllocateBlock(output_len, allocator);
memcpy(tmp.get(), output.get(), old_sz);
output = std::move(tmp);
// Set more output.
_stream.next_out = (char*)(output + old_sz);
_stream.next_out = (char*)(output.get() + old_sz);
_stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
break;
}
default:
delete[] output;
BZ2_bzDecompressEnd(&_stream);
return nullptr;
}
@ -814,10 +811,12 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
// header in varint32 format
// @param compression_dict Data for presetting the compression library's
// dictionary.
inline char* LZ4_Uncompress(const UncompressionContext& ctx,
const char* input_data, size_t input_length,
inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
const char* input_data,
size_t input_length,
int* decompress_size,
uint32_t compress_format_version) {
uint32_t compress_format_version,
CacheAllocator* allocator = nullptr) {
#ifdef LZ4
uint32_t output_len = 0;
if (compress_format_version == 2) {
@ -837,7 +836,7 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
input_data += 8;
}
char* output = new char[output_len];
auto output = AllocateBlock(output_len, allocator);
#if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
if (ctx.dict().size()) {
@ -845,17 +844,16 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
static_cast<int>(ctx.dict().size()));
}
*decompress_size = LZ4_decompress_safe_continue(
stream, input_data, output, static_cast<int>(input_length),
stream, input_data, output.get(), static_cast<int>(input_length),
static_cast<int>(output_len));
LZ4_freeStreamDecode(stream);
#else // up to r123
*decompress_size =
LZ4_decompress_safe(input_data, output, static_cast<int>(input_length),
*decompress_size = LZ4_decompress_safe(input_data, output.get(),
static_cast<int>(input_length),
static_cast<int>(output_len));
#endif // LZ4_VERSION_NUMBER >= 10400
if (*decompress_size < 0) {
delete[] output;
return nullptr;
}
assert(*decompress_size == static_cast<int>(output_len));
@ -866,6 +864,7 @@ inline char* LZ4_Uncompress(const UncompressionContext& ctx,
(void)input_length;
(void)decompress_size;
(void)compress_format_version;
(void)allocator;
return nullptr;
#endif
}
@ -1028,9 +1027,11 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
// @param compression_dict Data for presetting the compression library's
// dictionary.
inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
const char* input_data, size_t input_length,
int* decompress_size) {
inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx,
const char* input_data,
size_t input_length,
int* decompress_size,
CacheAllocator* allocator = nullptr) {
#ifdef ZSTD
uint32_t output_len = 0;
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
@ -1038,17 +1039,17 @@ inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
return nullptr;
}
char* output = new char[output_len];
auto output = AllocateBlock(output_len, allocator);
size_t actual_output_length;
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
ZSTD_DCtx* context = ctx.GetZSTDContext();
assert(context != nullptr);
actual_output_length = ZSTD_decompress_usingDict(
context, output, output_len, input_data, input_length, ctx.dict().data(),
ctx.dict().size());
context, output.get(), output_len, input_data, input_length,
ctx.dict().data(), ctx.dict().size());
#else // up to v0.4.x
actual_output_length =
ZSTD_decompress(output, output_len, input_data, input_length);
ZSTD_decompress(output.get(), output_len, input_data, input_length);
#endif // ZSTD_VERSION_NUMBER >= 500
assert(actual_output_length == output_len);
*decompress_size = static_cast<int>(actual_output_length);
@ -1058,6 +1059,7 @@ inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
(void)input_data;
(void)input_length;
(void)decompress_size;
(void)allocator;
return nullptr;
#endif
}

Loading…
Cancel
Save