s/CacheAllocator/MemoryAllocator/g (#4590)

Summary:
Rename the interface, as it is mean to be a generic interface for memory allocation.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4590

Differential Revision: D10866340

Pulled By: yiwu-arbug

fbshipit-source-id: 85cb753351a40cb856c046aeaa3f3b369eef3d16
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent 7528130e38
commit f560c8f5c8
  1. 2
      HISTORY.md
  2. 8
      cache/lru_cache.cc
  3. 2
      cache/lru_cache.h
  4. 6
      cache/sharded_cache.cc
  5. 2
      cache/sharded_cache.h
  6. 1
      db/dbformat.h
  7. 18
      include/rocksdb/cache.h
  8. 12
      include/rocksdb/memory_allocator.h
  9. 4
      table/block_based_table_builder.cc
  10. 46
      table/block_based_table_reader.cc
  11. 4
      table/block_based_table_reader.h
  12. 2
      table/block_fetcher.cc
  13. 6
      table/block_fetcher.h
  14. 6
      table/format.cc
  15. 6
      table/format.h
  16. 18
      table/table_test.cc
  17. 17
      util/compression.h
  18. 8
      util/memory_allocator.h

@ -1,7 +1,7 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### New Features ### New Features
* Introduced `CacheAllocator`, which lets the user specify custom allocator for memory in block cache. * Introduced `Memoryllocator`, which lets the user specify custom allocator for memory in block cache.
* Introduced `PerfContextByLevel` as part of `PerfContext` which allows storing perf context at each level. Also replaced `__thread` with `thread_local` keyword for perf_context. * Introduced `PerfContextByLevel` as part of `PerfContext` which allows storing perf context at each level. Also replaced `__thread` with `thread_local` keyword for perf_context.
* With level_compaction_dynamic_level_bytes = true, level multiplier may be adjusted automatically when Level 0 to 1 compaction is lagged behind. * With level_compaction_dynamic_level_bytes = true, level multiplier may be adjusted automatically when Level 0 to 1 compaction is lagged behind.

@ -462,7 +462,7 @@ std::string LRUCacheShard::GetPrintableOptions() const {
LRUCache::LRUCache(size_t capacity, int num_shard_bits, LRUCache::LRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit, double high_pri_pool_ratio, bool strict_capacity_limit, double high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> allocator) std::shared_ptr<MemoryAllocator> allocator)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit, : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
std::move(allocator)) { std::move(allocator)) {
num_shards_ = 1 << num_shard_bits; num_shards_ = 1 << num_shard_bits;
@ -540,13 +540,13 @@ std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits, return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit, cache_opts.strict_capacity_limit,
cache_opts.high_pri_pool_ratio, cache_opts.high_pri_pool_ratio,
cache_opts.cache_allocator); cache_opts.memory_allocator);
} }
std::shared_ptr<Cache> NewLRUCache( std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit, size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio, double high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> cache_allocator) { std::shared_ptr<MemoryAllocator> memory_allocator) {
if (num_shard_bits >= 20) { if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces return nullptr; // the cache cannot be sharded into too many fine pieces
} }
@ -559,7 +559,7 @@ std::shared_ptr<Cache> NewLRUCache(
} }
return std::make_shared<LRUCache>(capacity, num_shard_bits, return std::make_shared<LRUCache>(capacity, num_shard_bits,
strict_capacity_limit, high_pri_pool_ratio, strict_capacity_limit, high_pri_pool_ratio,
std::move(cache_allocator)); std::move(memory_allocator));
} }
} // namespace rocksdb } // namespace rocksdb

2
cache/lru_cache.h vendored

@ -280,7 +280,7 @@ class LRUCache : public ShardedCache {
public: public:
LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio, double high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> cache_allocator = nullptr); std::shared_ptr<MemoryAllocator> memory_allocator = nullptr);
virtual ~LRUCache(); virtual ~LRUCache();
virtual const char* Name() const override { return "LRUCache"; } virtual const char* Name() const override { return "LRUCache"; }
virtual CacheShard* GetShard(int shard) override; virtual CacheShard* GetShard(int shard) override;

@ -21,7 +21,7 @@ namespace rocksdb {
ShardedCache::ShardedCache(size_t capacity, int num_shard_bits, ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit, bool strict_capacity_limit,
std::shared_ptr<CacheAllocator> allocator) std::shared_ptr<MemoryAllocator> allocator)
: Cache(std::move(allocator)), : Cache(std::move(allocator)),
num_shard_bits_(num_shard_bits), num_shard_bits_(num_shard_bits),
capacity_(capacity), capacity_(capacity),
@ -144,8 +144,8 @@ std::string ShardedCache::GetPrintableOptions() const {
strict_capacity_limit_); strict_capacity_limit_);
ret.append(buffer); ret.append(buffer);
} }
snprintf(buffer, kBufferSize, " cache_allocator : %s\n", snprintf(buffer, kBufferSize, " memory_allocator : %s\n",
cache_allocator() ? cache_allocator()->Name() : "None"); memory_allocator() ? memory_allocator()->Name() : "None");
ret.append(buffer); ret.append(buffer);
ret.append(GetShard(0)->GetPrintableOptions()); ret.append(GetShard(0)->GetPrintableOptions());
return ret; return ret;

@ -48,7 +48,7 @@ class CacheShard {
class ShardedCache : public Cache { class ShardedCache : public Cache {
public: public:
ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
std::shared_ptr<CacheAllocator> cache_allocator = nullptr); std::shared_ptr<MemoryAllocator> memory_allocator = nullptr);
virtual ~ShardedCache() = default; virtual ~ShardedCache() = default;
virtual const char* Name() const override = 0; virtual const char* Name() const override = 0;
virtual CacheShard* GetShard(int shard) = 0; virtual CacheShard* GetShard(int shard) = 0;

@ -686,5 +686,4 @@ struct ParsedInternalKeyComparator {
const InternalKeyComparator* cmp; const InternalKeyComparator* cmp;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -25,7 +25,7 @@
#include <stdint.h> #include <stdint.h>
#include <memory> #include <memory>
#include <string> #include <string>
#include "rocksdb/cache_allocator.h" #include "rocksdb/memory_allocator.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
@ -62,17 +62,17 @@ struct LRUCacheOptions {
// If non-nullptr will use this allocator instead of system allocator when // If non-nullptr will use this allocator instead of system allocator when
// allocating memory for cache blocks. Call this method before you start using // allocating memory for cache blocks. Call this method before you start using
// the cache! // the cache!
std::shared_ptr<CacheAllocator> cache_allocator; std::shared_ptr<MemoryAllocator> memory_allocator;
LRUCacheOptions() {} LRUCacheOptions() {}
LRUCacheOptions(size_t _capacity, int _num_shard_bits, LRUCacheOptions(size_t _capacity, int _num_shard_bits,
bool _strict_capacity_limit, double _high_pri_pool_ratio, bool _strict_capacity_limit, double _high_pri_pool_ratio,
std::shared_ptr<CacheAllocator> _cache_allocator = nullptr) std::shared_ptr<MemoryAllocator> _memory_allocator = nullptr)
: capacity(_capacity), : capacity(_capacity),
num_shard_bits(_num_shard_bits), num_shard_bits(_num_shard_bits),
strict_capacity_limit(_strict_capacity_limit), strict_capacity_limit(_strict_capacity_limit),
high_pri_pool_ratio(_high_pri_pool_ratio), high_pri_pool_ratio(_high_pri_pool_ratio),
cache_allocator(std::move(_cache_allocator)) {} memory_allocator(std::move(_memory_allocator)) {}
}; };
// Create a new cache with a fixed size capacity. The cache is sharded // Create a new cache with a fixed size capacity. The cache is sharded
@ -86,7 +86,7 @@ struct LRUCacheOptions {
extern std::shared_ptr<Cache> NewLRUCache( extern std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits = -1, size_t capacity, int num_shard_bits = -1,
bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0, bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0,
std::shared_ptr<CacheAllocator> cache_allocator = nullptr); std::shared_ptr<MemoryAllocator> memory_allocator = nullptr);
extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts); extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
@ -105,8 +105,8 @@ class Cache {
// likely to get evicted than low priority entries. // likely to get evicted than low priority entries.
enum class Priority { HIGH, LOW }; enum class Priority { HIGH, LOW };
Cache(std::shared_ptr<CacheAllocator> allocator = nullptr) Cache(std::shared_ptr<MemoryAllocator> allocator = nullptr)
: cache_allocator_(std::move(allocator)) {} : memory_allocator_(std::move(allocator)) {}
// Destroys all existing entries by calling the "deleter" // Destroys all existing entries by calling the "deleter"
// function that was passed via the Insert() function. // function that was passed via the Insert() function.
@ -237,14 +237,14 @@ class Cache {
virtual void TEST_mark_as_data_block(const Slice& /*key*/, virtual void TEST_mark_as_data_block(const Slice& /*key*/,
size_t /*charge*/) {} size_t /*charge*/) {}
CacheAllocator* cache_allocator() const { return cache_allocator_.get(); } MemoryAllocator* memory_allocator() const { return memory_allocator_.get(); }
private: private:
// No copying allowed // No copying allowed
Cache(const Cache&); Cache(const Cache&);
Cache& operator=(const Cache&); Cache& operator=(const Cache&);
std::shared_ptr<CacheAllocator> cache_allocator_; std::shared_ptr<MemoryAllocator> memory_allocator_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -5,13 +5,15 @@
#pragma once #pragma once
// CacheAllocator is an interface that a client can implement to supply custom namespace rocksdb {
// cache allocation and deallocation methods. See rocksdb/cache.h for more
// MemoryAllocator is an interface that a client can implement to supply custom
// memory allocation and deallocation methods. See rocksdb/cache.h for more
// information. // information.
// All methods should be thread-safe. // All methods should be thread-safe.
class CacheAllocator { class MemoryAllocator {
public: public:
virtual ~CacheAllocator() = default; virtual ~MemoryAllocator() = default;
// Name of the cache allocator, printed in the log // Name of the cache allocator, printed in the log
virtual const char* Name() const = 0; virtual const char* Name() const = 0;
@ -27,3 +29,5 @@ class CacheAllocator {
return allocation_size; return allocation_size;
} }
}; };
} // namespace rocksdb

@ -39,10 +39,10 @@
#include "table/full_filter_block.h" #include "table/full_filter_block.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "util/cache_allocator.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/memory_allocator.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/xxhash.h" #include "util/xxhash.h"
@ -656,7 +656,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
size_t size = block_contents.size(); size_t size = block_contents.size();
auto ubuf = auto ubuf =
AllocateBlock(size + 1, block_cache_compressed->cache_allocator()); AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
memcpy(ubuf.get(), block_contents.data(), size); memcpy(ubuf.get(), block_contents.data(), size);
ubuf[size] = type; ubuf[size] = type;

@ -80,7 +80,7 @@ Status ReadBlockFromFile(
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions, std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict, bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno, const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, CacheAllocator* allocator = nullptr, size_t read_amp_bytes_per_bit, MemoryAllocator* allocator = nullptr,
const bool immortal_file = false) { const bool immortal_file = false) {
BlockContents contents; BlockContents contents;
BlockFetcher block_fetcher( BlockFetcher block_fetcher(
@ -95,10 +95,10 @@ Status ReadBlockFromFile(
return s; return s;
} }
inline CacheAllocator* GetCacheAllocator( inline MemoryAllocator* GetMemoryAllocator(
const BlockBasedTableOptions& table_options) { const BlockBasedTableOptions& table_options) {
return table_options.block_cache.get() return table_options.block_cache.get()
? table_options.block_cache->cache_allocator() ? table_options.block_cache->memory_allocator()
: nullptr; : nullptr;
} }
@ -1160,7 +1160,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
rep->footer.metaindex_handle(), &meta, rep->ioptions, rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, Slice() /*compression dict*/, true /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options, kDisableGlobalSequenceNumber, rep->persistent_cache_options, kDisableGlobalSequenceNumber,
0 /* read_amp_bytes_per_bit */, GetCacheAllocator(rep->table_options)); 0 /* read_amp_bytes_per_bit */, GetMemoryAllocator(rep->table_options));
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_ERROR(rep->ioptions.info_log, ROCKS_LOG_ERROR(rep->ioptions.info_log,
@ -1183,7 +1183,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
const ImmutableCFOptions& ioptions, const ReadOptions& read_options, const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version, BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index, const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
GetContext* get_context, CacheAllocator* allocator) { GetContext* get_context, MemoryAllocator* allocator) {
Status s; Status s;
Block* compressed_block = nullptr; Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr; Cache::Handle* block_cache_compressed_handle = nullptr;
@ -1303,7 +1303,7 @@ Status BlockBasedTable::PutDataBlockToCache(
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version, CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index, const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
Cache::Priority priority, GetContext* get_context, Cache::Priority priority, GetContext* get_context,
CacheAllocator* allocator) { MemoryAllocator* allocator) {
assert(raw_block->compression_type() == kNoCompression || assert(raw_block->compression_type() == kNoCompression ||
block_cache_compressed != nullptr); block_cache_compressed != nullptr);
@ -1414,7 +1414,7 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
ReadOptions(), filter_handle, &block, ReadOptions(), filter_handle, &block,
rep->ioptions, false /* decompress */, rep->ioptions, false /* decompress */,
dummy_comp_dict, rep->persistent_cache_options, dummy_comp_dict, rep->persistent_cache_options,
GetCacheAllocator(rep->table_options)); GetMemoryAllocator(rep->table_options));
Status s = block_fetcher.ReadBlockContents(); Status s = block_fetcher.ReadBlockContents();
if (!s.ok()) { if (!s.ok()) {
@ -1713,7 +1713,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
compression_dict, rep->persistent_cache_options, compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno, is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit, rep->table_options.read_amp_bytes_per_bit,
GetCacheAllocator(rep->table_options), rep->immortal_table); GetMemoryAllocator(rep->table_options), rep->immortal_table);
} }
if (s.ok()) { if (s.ok()) {
block.value = block_value.release(); block.value = block_value.release();
@ -1806,7 +1806,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro, key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
block_entry, rep->table_options.format_version, compression_dict, block_entry, rep->table_options.format_version, compression_dict,
rep->table_options.read_amp_bytes_per_bit, is_index, get_context, rep->table_options.read_amp_bytes_per_bit, is_index, get_context,
GetCacheAllocator(rep->table_options)); GetMemoryAllocator(rep->table_options));
if (block_entry->value == nullptr && !no_io && ro.fill_cache) { if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
std::unique_ptr<Block> raw_block; std::unique_ptr<Block> raw_block;
@ -1819,7 +1819,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
compression_dict, rep->persistent_cache_options, compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno, is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit, rep->table_options.read_amp_bytes_per_bit,
GetCacheAllocator(rep->table_options), rep->immortal_table); GetMemoryAllocator(rep->table_options), rep->immortal_table);
} }
if (s.ok()) { if (s.ok()) {
@ -1832,7 +1832,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
.cache_index_and_filter_blocks_with_high_priority .cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH ? Cache::Priority::HIGH
: Cache::Priority::LOW, : Cache::Priority::LOW,
get_context, GetCacheAllocator(rep->table_options)); get_context, GetMemoryAllocator(rep->table_options));
} }
} }
} }
@ -2553,11 +2553,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
BlockHandle handle = index_iter->value(); BlockHandle handle = index_iter->value();
BlockContents contents; BlockContents contents;
Slice dummy_comp_dict; Slice dummy_comp_dict;
BlockFetcher block_fetcher( BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer, rep_->footer, ReadOptions(), handle, &contents,
ReadOptions(), handle, &contents, rep_->ioptions, rep_->ioptions, false /* decompress */,
false /* decompress */, dummy_comp_dict /*compression dict*/, dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options, GetCacheAllocator(rep_->table_options)); rep_->persistent_cache_options,
GetMemoryAllocator(rep_->table_options));
s = block_fetcher.ReadBlockContents(); s = block_fetcher.ReadBlockContents();
if (!s.ok()) { if (!s.ok()) {
break; break;
@ -2579,11 +2580,12 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
s = handle.DecodeFrom(&input); s = handle.DecodeFrom(&input);
BlockContents contents; BlockContents contents;
Slice dummy_comp_dict; Slice dummy_comp_dict;
BlockFetcher block_fetcher( BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer, rep_->footer, ReadOptions(), handle, &contents,
ReadOptions(), handle, &contents, rep_->ioptions, rep_->ioptions, false /* decompress */,
false /* decompress */, dummy_comp_dict /*compression dict*/, dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options, GetCacheAllocator(rep_->table_options)); rep_->persistent_cache_options,
GetMemoryAllocator(rep_->table_options));
s = block_fetcher.ReadBlockContents(); s = block_fetcher.ReadBlockContents();
if (!s.ok()) { if (!s.ok()) {
break; break;
@ -2888,7 +2890,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
ReadOptions(), handle, &block, rep_->ioptions, ReadOptions(), handle, &block, rep_->ioptions,
false /*decompress*/, dummy_comp_dict /*compression dict*/, false /*decompress*/, dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options, rep_->persistent_cache_options,
GetCacheAllocator(rep_->table_options)); GetMemoryAllocator(rep_->table_options));
s = block_fetcher.ReadBlockContents(); s = block_fetcher.ReadBlockContents();
if (!s.ok()) { if (!s.ok()) {
rep_->filter.reset(new BlockBasedFilterBlockReader( rep_->filter.reset(new BlockBasedFilterBlockReader(

@ -305,7 +305,7 @@ class BlockBasedTable : public TableReader {
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version, BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, GetContext* get_context = nullptr, bool is_index = false, GetContext* get_context = nullptr,
CacheAllocator* allocator = nullptr); MemoryAllocator* allocator = nullptr);
// Put a raw block (maybe compressed) to the corresponding block caches. // Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then // This method will perform decompression against raw_block if needed and then
@ -324,7 +324,7 @@ class BlockBasedTable : public TableReader {
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version, CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW, bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
GetContext* get_context = nullptr, CacheAllocator* allocator = nullptr); GetContext* get_context = nullptr, MemoryAllocator* allocator = nullptr);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false. // after a call to Seek(key), until handle_result returns false.

@ -19,12 +19,12 @@
#include "table/block_based_table_reader.h" #include "table/block_based_table_reader.h"
#include "table/format.h" #include "table/format.h"
#include "table/persistent_cache_helper.h" #include "table/persistent_cache_helper.h"
#include "util/cache_allocator.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/memory_allocator.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/xxhash.h" #include "util/xxhash.h"

@ -11,7 +11,7 @@
#include "table/block.h" #include "table/block.h"
#include "table/format.h" #include "table/format.h"
#include "util/cache_allocator.h" #include "util/memory_allocator.h"
namespace rocksdb { namespace rocksdb {
class BlockFetcher { class BlockFetcher {
@ -28,7 +28,7 @@ class BlockFetcher {
BlockContents* contents, const ImmutableCFOptions& ioptions, BlockContents* contents, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict, bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options, const PersistentCacheOptions& cache_options,
CacheAllocator* allocator = nullptr, MemoryAllocator* allocator = nullptr,
const bool immortal_source = false) const bool immortal_source = false)
: file_(file), : file_(file),
prefetch_buffer_(prefetch_buffer), prefetch_buffer_(prefetch_buffer),
@ -58,7 +58,7 @@ class BlockFetcher {
const bool immortal_source_; const bool immortal_source_;
const Slice& compression_dict_; const Slice& compression_dict_;
const PersistentCacheOptions& cache_options_; const PersistentCacheOptions& cache_options_;
CacheAllocator* allocator_; MemoryAllocator* allocator_;
Status status_; Status status_;
Slice slice_; Slice slice_;
char* used_buf_ = nullptr; char* used_buf_ = nullptr;

@ -19,12 +19,12 @@
#include "table/block_based_table_reader.h" #include "table/block_based_table_reader.h"
#include "table/block_fetcher.h" #include "table/block_fetcher.h"
#include "table/persistent_cache_helper.h" #include "table/persistent_cache_helper.h"
#include "util/cache_allocator.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/memory_allocator.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/xxhash.h" #include "util/xxhash.h"
@ -280,7 +280,7 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
Status UncompressBlockContentsForCompressionType( Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n, const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t format_version, BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions, CacheAllocator* allocator) { const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) {
CacheAllocationPtr ubuf; CacheAllocationPtr ubuf;
assert(uncompression_ctx.type() != kNoCompression && assert(uncompression_ctx.type() != kNoCompression &&
@ -405,7 +405,7 @@ Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
const char* data, size_t n, const char* data, size_t n,
BlockContents* contents, uint32_t format_version, BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions, const ImmutableCFOptions& ioptions,
CacheAllocator* allocator) { MemoryAllocator* allocator) {
assert(data[n] != kNoCompression); assert(data[n] != kNoCompression);
assert(data[n] == uncompression_ctx.type()); assert(data[n] == uncompression_ctx.type());
return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n, return UncompressBlockContentsForCompressionType(uncompression_ctx, data, n,

@ -25,8 +25,8 @@
#include "options/cf_options.h" #include "options/cf_options.h"
#include "port/port.h" // noexcept #include "port/port.h" // noexcept
#include "table/persistent_cache_options.h" #include "table/persistent_cache_options.h"
#include "util/cache_allocator.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "util/memory_allocator.h"
namespace rocksdb { namespace rocksdb {
@ -265,7 +265,7 @@ extern Status ReadBlockContents(
extern Status UncompressBlockContents( extern Status UncompressBlockContents(
const UncompressionContext& uncompression_ctx, const char* data, size_t n, const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version, BlockContents* contents, uint32_t compress_format_version,
const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr); const ImmutableCFOptions& ioptions, MemoryAllocator* allocator = nullptr);
// This is an extension to UncompressBlockContents that accepts // This is an extension to UncompressBlockContents that accepts
// a specific compression type. This is used by un-wrapped blocks // a specific compression type. This is used by un-wrapped blocks
@ -273,7 +273,7 @@ extern Status UncompressBlockContents(
extern Status UncompressBlockContentsForCompressionType( extern Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const char* data, size_t n, const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version, BlockContents* contents, uint32_t compress_format_version,
const ImmutableCFOptions& ioptions, CacheAllocator* allocator = nullptr); const ImmutableCFOptions& ioptions, MemoryAllocator* allocator = nullptr);
// Implementation details follow. Clients should ignore, // Implementation details follow. Clients should ignore,

@ -2486,14 +2486,14 @@ TEST_P(BlockBasedTableTest, BlockCacheLeak) {
} }
namespace { namespace {
class CustomCacheAllocator : public CacheAllocator { class CustomMemoryAllocator : public MemoryAllocator {
public: public:
virtual const char* Name() const override { return "CustomCacheAllocator"; } virtual const char* Name() const override { return "CustomMemoryAllocator"; }
void* Allocate(size_t size) override { void* Allocate(size_t size) override {
++numAllocations; ++numAllocations;
auto ptr = new char[size + 16]; auto ptr = new char[size + 16];
memcpy(ptr, "cache_allocator_", 16); // mangle first 16 bytes memcpy(ptr, "memory_allocator_", 16); // mangle first 16 bytes
return reinterpret_cast<void*>(ptr + 16); return reinterpret_cast<void*>(ptr + 16);
} }
void Deallocate(void* p) override { void Deallocate(void* p) override {
@ -2507,8 +2507,8 @@ class CustomCacheAllocator : public CacheAllocator {
}; };
} // namespace } // namespace
TEST_P(BlockBasedTableTest, CacheAllocator) { TEST_P(BlockBasedTableTest, MemoryAllocator) {
auto custom_cache_allocator = std::make_shared<CustomCacheAllocator>(); auto custom_memory_allocator = std::make_shared<CustomMemoryAllocator>();
{ {
Options opt; Options opt;
unique_ptr<InternalKeyComparator> ikc; unique_ptr<InternalKeyComparator> ikc;
@ -2517,7 +2517,7 @@ TEST_P(BlockBasedTableTest, CacheAllocator) {
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
table_options.block_size = 1024; table_options.block_size = 1024;
LRUCacheOptions lruOptions; LRUCacheOptions lruOptions;
lruOptions.cache_allocator = custom_cache_allocator; lruOptions.memory_allocator = custom_memory_allocator;
lruOptions.capacity = 16 * 1024 * 1024; lruOptions.capacity = 16 * 1024 * 1024;
lruOptions.num_shard_bits = 4; lruOptions.num_shard_bits = 4;
table_options.block_cache = NewLRUCache(std::move(lruOptions)); table_options.block_cache = NewLRUCache(std::move(lruOptions));
@ -2551,10 +2551,10 @@ TEST_P(BlockBasedTableTest, CacheAllocator) {
// out of scope, block cache should have been deleted, all allocations // out of scope, block cache should have been deleted, all allocations
// deallocated // deallocated
EXPECT_EQ(custom_cache_allocator->numAllocations.load(), EXPECT_EQ(custom_memory_allocator->numAllocations.load(),
custom_cache_allocator->numDeallocations.load()); custom_memory_allocator->numDeallocations.load());
// make sure that allocations actually happened through the cache allocator // make sure that allocations actually happened through the cache allocator
EXPECT_GT(custom_cache_allocator->numAllocations.load(), 0); EXPECT_GT(custom_memory_allocator->numAllocations.load(), 0);
} }
TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) { TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {

@ -15,9 +15,9 @@
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "util/cache_allocator.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/compression_context_cache.h" #include "util/compression_context_cache.h"
#include "util/memory_allocator.h"
#ifdef SNAPPY #ifdef SNAPPY
#include <snappy.h> #include <snappy.h>
@ -500,7 +500,7 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
inline CacheAllocationPtr Zlib_Uncompress( inline CacheAllocationPtr Zlib_Uncompress(
const UncompressionContext& ctx, const char* input_data, const UncompressionContext& ctx, const char* input_data,
size_t input_length, int* decompress_size, uint32_t compress_format_version, size_t input_length, int* decompress_size, uint32_t compress_format_version,
CacheAllocator* allocator = nullptr, int windowBits = -14) { MemoryAllocator* allocator = nullptr, int windowBits = -14) {
#ifdef ZLIB #ifdef ZLIB
uint32_t output_len = 0; uint32_t output_len = 0;
if (compress_format_version == 2) { if (compress_format_version == 2) {
@ -662,7 +662,7 @@ inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
// header in varint32 format // header in varint32 format
inline CacheAllocationPtr BZip2_Uncompress( inline CacheAllocationPtr BZip2_Uncompress(
const char* input_data, size_t input_length, int* decompress_size, const char* input_data, size_t input_length, int* decompress_size,
uint32_t compress_format_version, CacheAllocator* allocator = nullptr) { uint32_t compress_format_version, MemoryAllocator* allocator = nullptr) {
#ifdef BZIP2 #ifdef BZIP2
uint32_t output_len = 0; uint32_t output_len = 0;
if (compress_format_version == 2) { if (compress_format_version == 2) {
@ -819,7 +819,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionContext& ctx,
size_t input_length, size_t input_length,
int* decompress_size, int* decompress_size,
uint32_t compress_format_version, uint32_t compress_format_version,
CacheAllocator* allocator = nullptr) { MemoryAllocator* allocator = nullptr) {
#ifdef LZ4 #ifdef LZ4
uint32_t output_len = 0; uint32_t output_len = 0;
if (compress_format_version == 2) { if (compress_format_version == 2) {
@ -1031,11 +1031,10 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline CacheAllocationPtr ZSTD_Uncompress(const UncompressionContext& ctx, inline CacheAllocationPtr ZSTD_Uncompress(
const char* input_data, const UncompressionContext& ctx, const char* input_data,
size_t input_length, size_t input_length, int* decompress_size,
int* decompress_size, MemoryAllocator* allocator = nullptr) {
CacheAllocator* allocator = nullptr) {
#ifdef ZSTD #ifdef ZSTD
uint32_t output_len = 0; uint32_t output_len = 0;
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length, if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,

@ -6,12 +6,12 @@
#pragma once #pragma once
#include "rocksdb/cache_allocator.h" #include "rocksdb/memory_allocator.h"
namespace rocksdb { namespace rocksdb {
struct CustomDeleter { struct CustomDeleter {
CustomDeleter(CacheAllocator* a = nullptr) : allocator(a) {} CustomDeleter(MemoryAllocator* a = nullptr) : allocator(a) {}
void operator()(char* ptr) const { void operator()(char* ptr) const {
if (allocator) { if (allocator) {
@ -21,13 +21,13 @@ struct CustomDeleter {
} }
} }
CacheAllocator* allocator; MemoryAllocator* allocator;
}; };
using CacheAllocationPtr = std::unique_ptr<char[], CustomDeleter>; using CacheAllocationPtr = std::unique_ptr<char[], CustomDeleter>;
inline CacheAllocationPtr AllocateBlock(size_t size, inline CacheAllocationPtr AllocateBlock(size_t size,
CacheAllocator* allocator) { MemoryAllocator* allocator) {
if (allocator) { if (allocator) {
auto block = reinterpret_cast<char*>(allocator->Allocate(size)); auto block = reinterpret_cast<char*>(allocator->Allocate(size));
return CacheAllocationPtr(block, allocator); return CacheAllocationPtr(block, allocator);
Loading…
Cancel
Save