Move MemoryAllocator option from Cache to BlockBasedTableOptions (#4676)

Summary:
Per offline discussion with siying, `MemoryAllocator` and `Cache` should be decoupled. The idea is that the memory allocator handles memory allocation, while the cache handles cache policy.

It is normal for external cache libraries to couple the two components for better optimization. If we want to integrate with such a library in the future, we can write a wrapper around the library that implements both the `Cache` and `MemoryAllocator` interfaces.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4676

Differential Revision: D13047662

Pulled By: yiwu-arbug

fbshipit-source-id: cd42e246d80ab600b4de47d073f7d2db308ce6dd
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent abb1a8fc23
commit b32d087dbb
  1. 4
      HISTORY.md
  2. 19
      cache/lru_cache.cc
  3. 3
      cache/lru_cache.h
  4. 7
      cache/sharded_cache.cc
  5. 3
      cache/sharded_cache.h
  6. 3
      db/version_set.cc
  7. 27
      include/rocksdb/cache.h
  8. 5
      include/rocksdb/table.h
  9. 2
      options/options_settable_test.cc
  10. 2
      table/block_based_table_builder.cc
  11. 8
      table/block_based_table_factory.cc
  12. 1
      table/block_based_table_factory.h
  13. 4
      table/block_based_table_reader.cc
  14. 2
      table/table_test.cc

@ -3,9 +3,11 @@
### Public API Change ### Public API Change
* `NO_ITERATORS` is divided into two counters `NO_ITERATOR_CREATED` and `NO_ITERATOR_DELETE`. Both of them are only increasing now, just as other counters. * `NO_ITERATORS` is divided into two counters `NO_ITERATOR_CREATED` and `NO_ITERATOR_DELETE`. Both of them are only increasing now, just as other counters.
### New Features
* Introduced `MemoryAllocator`, which lets the user specify a custom memory allocator for block based table.
## 5.18.0 (11/12/2018) ## 5.18.0 (11/12/2018)
### New Features ### New Features
* Introduced `MemoryAllocator`, which lets the user specify custom allocator for memory in block cache.
* Introduced `PerfContextByLevel` as part of `PerfContext` which allows storing perf context at each level. Also replaced `__thread` with `thread_local` keyword for perf_context. Added per-level perf context for bloom filter and `Get` query. * Introduced `PerfContextByLevel` as part of `PerfContext` which allows storing perf context at each level. Also replaced `__thread` with `thread_local` keyword for perf_context. Added per-level perf context for bloom filter and `Get` query.
* With level_compaction_dynamic_level_bytes = true, level multiplier may be adjusted automatically when Level 0 to 1 compaction is lagged behind. * With level_compaction_dynamic_level_bytes = true, level multiplier may be adjusted automatically when Level 0 to 1 compaction is lagged behind.
* Introduced DB option `atomic_flush`. If true, RocksDB supports flushing multiple column families and atomically committing the result to MANIFEST. Useful when WAL is disabled. * Introduced DB option `atomic_flush`. If true, RocksDB supports flushing multiple column families and atomically committing the result to MANIFEST. Useful when WAL is disabled.

19
cache/lru_cache.cc vendored

@ -461,10 +461,8 @@ std::string LRUCacheShard::GetPrintableOptions() const {
} }
LRUCache::LRUCache(size_t capacity, int num_shard_bits, LRUCache::LRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit, double high_pri_pool_ratio, bool strict_capacity_limit, double high_pri_pool_ratio)
std::shared_ptr<MemoryAllocator> allocator) : ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
std::move(allocator)) {
num_shards_ = 1 << num_shard_bits; num_shards_ = 1 << num_shard_bits;
shards_ = reinterpret_cast<LRUCacheShard*>( shards_ = reinterpret_cast<LRUCacheShard*>(
port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_)); port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
@ -539,14 +537,12 @@ double LRUCache::GetHighPriPoolRatio() {
std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) { std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits, return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit, cache_opts.strict_capacity_limit,
cache_opts.high_pri_pool_ratio, cache_opts.high_pri_pool_ratio);
cache_opts.memory_allocator);
} }
std::shared_ptr<Cache> NewLRUCache( std::shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
size_t capacity, int num_shard_bits, bool strict_capacity_limit, bool strict_capacity_limit,
double high_pri_pool_ratio, double high_pri_pool_ratio) {
std::shared_ptr<MemoryAllocator> memory_allocator) {
if (num_shard_bits >= 20) { if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces return nullptr; // the cache cannot be sharded into too many fine pieces
} }
@ -558,8 +554,7 @@ std::shared_ptr<Cache> NewLRUCache(
num_shard_bits = GetDefaultCacheShardBits(capacity); num_shard_bits = GetDefaultCacheShardBits(capacity);
} }
return std::make_shared<LRUCache>(capacity, num_shard_bits, return std::make_shared<LRUCache>(capacity, num_shard_bits,
strict_capacity_limit, high_pri_pool_ratio, strict_capacity_limit, high_pri_pool_ratio);
std::move(memory_allocator));
} }
} // namespace rocksdb } // namespace rocksdb

3
cache/lru_cache.h vendored

@ -279,8 +279,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard {
class LRUCache : public ShardedCache { class LRUCache : public ShardedCache {
public: public:
LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio, double high_pri_pool_ratio);
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr);
virtual ~LRUCache(); virtual ~LRUCache();
virtual const char* Name() const override { return "LRUCache"; } virtual const char* Name() const override { return "LRUCache"; }
virtual CacheShard* GetShard(int shard) override; virtual CacheShard* GetShard(int shard) override;

@ -20,9 +20,8 @@
namespace rocksdb { namespace rocksdb {
ShardedCache::ShardedCache(size_t capacity, int num_shard_bits, ShardedCache::ShardedCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit, bool strict_capacity_limit)
std::shared_ptr<MemoryAllocator> allocator) : Cache(),
: Cache(std::move(allocator)),
num_shard_bits_(num_shard_bits), num_shard_bits_(num_shard_bits),
capacity_(capacity), capacity_(capacity),
strict_capacity_limit_(strict_capacity_limit), strict_capacity_limit_(strict_capacity_limit),
@ -144,8 +143,6 @@ std::string ShardedCache::GetPrintableOptions() const {
strict_capacity_limit_); strict_capacity_limit_);
ret.append(buffer); ret.append(buffer);
} }
snprintf(buffer, kBufferSize, " memory_allocator : %s\n",
memory_allocator() ? memory_allocator()->Name() : "None");
ret.append(buffer); ret.append(buffer);
ret.append(GetShard(0)->GetPrintableOptions()); ret.append(GetShard(0)->GetPrintableOptions());
return ret; return ret;

@ -47,8 +47,7 @@ class CacheShard {
// Keys are sharded by the highest num_shard_bits bits of hash value. // Keys are sharded by the highest num_shard_bits bits of hash value.
class ShardedCache : public Cache { class ShardedCache : public Cache {
public: public:
ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit);
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr);
virtual ~ShardedCache() = default; virtual ~ShardedCache() = default;
virtual const char* Name() const override = 0; virtual const char* Name() const override = 0;
virtual CacheShard* GetShard(int shard) = 0; virtual CacheShard* GetShard(int shard) = 0;

@ -1259,7 +1259,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
} else if (fp.GetHitFileLevel() >= 2) { } else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP); RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
} }
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, fp.GetHitFileLevel()); PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
fp.GetHitFileLevel());
return; return;
case GetContext::kDeleted: case GetContext::kDeleted:
// Use empty error message for speed // Use empty error message for speed

@ -25,7 +25,6 @@
#include <stdint.h> #include <stdint.h>
#include <memory> #include <memory>
#include <string> #include <string>
#include "rocksdb/memory_allocator.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
@ -59,20 +58,13 @@ struct LRUCacheOptions {
// BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority. // BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority.
double high_pri_pool_ratio = 0.0; double high_pri_pool_ratio = 0.0;
// If non-nullptr will use this allocator instead of system allocator when
// allocating memory for cache blocks. Call this method before you start using
// the cache!
std::shared_ptr<MemoryAllocator> memory_allocator;
LRUCacheOptions() {} LRUCacheOptions() {}
LRUCacheOptions(size_t _capacity, int _num_shard_bits, LRUCacheOptions(size_t _capacity, int _num_shard_bits,
bool _strict_capacity_limit, double _high_pri_pool_ratio, bool _strict_capacity_limit, double _high_pri_pool_ratio)
std::shared_ptr<MemoryAllocator> _memory_allocator = nullptr)
: capacity(_capacity), : capacity(_capacity),
num_shard_bits(_num_shard_bits), num_shard_bits(_num_shard_bits),
strict_capacity_limit(_strict_capacity_limit), strict_capacity_limit(_strict_capacity_limit),
high_pri_pool_ratio(_high_pri_pool_ratio), high_pri_pool_ratio(_high_pri_pool_ratio) {}
memory_allocator(std::move(_memory_allocator)) {}
}; };
// Create a new cache with a fixed size capacity. The cache is sharded // Create a new cache with a fixed size capacity. The cache is sharded
@ -83,10 +75,10 @@ struct LRUCacheOptions {
// high_pri_pool_pct. // high_pri_pool_pct.
// num_shard_bits = -1 means it is automatically determined: every shard // num_shard_bits = -1 means it is automatically determined: every shard
// will be at least 512KB and number of shard bits will not exceed 6. // will be at least 512KB and number of shard bits will not exceed 6.
extern std::shared_ptr<Cache> NewLRUCache( extern std::shared_ptr<Cache> NewLRUCache(size_t capacity,
size_t capacity, int num_shard_bits = -1, int num_shard_bits = -1,
bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0, bool strict_capacity_limit = false,
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr); double high_pri_pool_ratio = 0.0);
extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts); extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
@ -105,8 +97,7 @@ class Cache {
// likely to get evicted than low priority entries. // likely to get evicted than low priority entries.
enum class Priority { HIGH, LOW }; enum class Priority { HIGH, LOW };
Cache(std::shared_ptr<MemoryAllocator> allocator = nullptr) Cache() {}
: memory_allocator_(std::move(allocator)) {}
// Destroys all existing entries by calling the "deleter" // Destroys all existing entries by calling the "deleter"
// function that was passed via the Insert() function. // function that was passed via the Insert() function.
@ -237,14 +228,10 @@ class Cache {
virtual void TEST_mark_as_data_block(const Slice& /*key*/, virtual void TEST_mark_as_data_block(const Slice& /*key*/,
size_t /*charge*/) {} size_t /*charge*/) {}
MemoryAllocator* memory_allocator() const { return memory_allocator_.get(); }
private: private:
// No copying allowed // No copying allowed
Cache(const Cache&); Cache(const Cache&);
Cache& operator=(const Cache&); Cache& operator=(const Cache&);
std::shared_ptr<MemoryAllocator> memory_allocator_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -24,6 +24,7 @@
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/memory_allocator.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
@ -255,6 +256,10 @@ struct BlockBasedTableOptions {
// Align data blocks on lesser of page size and block size // Align data blocks on lesser of page size and block size
bool block_align = false; bool block_align = false;
// If non-nullptr, this allocator is used instead of malloc/free to
// allocate memory for blocks.
std::shared_ptr<MemoryAllocator> memory_allocator;
}; };
// Table Properties that are specific to block-based table properties. // Table Properties that are specific to block-based table properties.

@ -97,6 +97,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
sizeof(std::shared_ptr<Cache>)}, sizeof(std::shared_ptr<Cache>)},
{offsetof(struct BlockBasedTableOptions, filter_policy), {offsetof(struct BlockBasedTableOptions, filter_policy),
sizeof(std::shared_ptr<const FilterPolicy>)}, sizeof(std::shared_ptr<const FilterPolicy>)},
{offsetof(struct BlockBasedTableOptions, memory_allocator),
sizeof(std::shared_ptr<MemoryAllocator>)},
}; };
// In this test, we catch a new option of BlockBasedTableOptions that is not // In this test, we catch a new option of BlockBasedTableOptions that is not

@ -673,7 +673,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
size_t size = block_contents.size(); size_t size = block_contents.size();
auto ubuf = auto ubuf =
AllocateBlock(size + 1, block_cache_compressed->memory_allocator()); AllocateBlock(size + 1, r->table_options.memory_allocator.get());
memcpy(ubuf.get(), block_contents.data(), size); memcpy(ubuf.get(), block_contents.data(), size);
ubuf[size] = type; ubuf[size] = type;

@ -383,6 +383,14 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const {
snprintf(buffer, kBufferSize, " block_align: %d\n", snprintf(buffer, kBufferSize, " block_align: %d\n",
table_options_.block_align); table_options_.block_align);
ret.append(buffer); ret.append(buffer);
snprintf(buffer, kBufferSize, " memory_allocator: %p\n",
table_options_.memory_allocator.get());
ret.append(buffer);
if (table_options_.memory_allocator) {
snprintf(buffer, kBufferSize, " memory_allocator_name: %s\n",
table_options_.memory_allocator->Name());
ret.append(buffer);
}
return ret; return ret;
} }

@ -99,6 +99,7 @@ static std::unordered_map<std::string, OptionTypeInfo>
/* currently not supported /* currently not supported
std::shared_ptr<Cache> block_cache = nullptr; std::shared_ptr<Cache> block_cache = nullptr;
std::shared_ptr<Cache> block_cache_compressed = nullptr; std::shared_ptr<Cache> block_cache_compressed = nullptr;
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr;
*/ */
{"flush_block_policy_factory", {"flush_block_policy_factory",
{offsetof(struct BlockBasedTableOptions, flush_block_policy_factory), {offsetof(struct BlockBasedTableOptions, flush_block_policy_factory),

@ -97,9 +97,7 @@ Status ReadBlockFromFile(
inline MemoryAllocator* GetMemoryAllocator( inline MemoryAllocator* GetMemoryAllocator(
const BlockBasedTableOptions& table_options) { const BlockBasedTableOptions& table_options) {
return table_options.block_cache.get() return table_options.memory_allocator.get();
? table_options.block_cache->memory_allocator()
: nullptr;
} }
// Delete the resource that is held by the iterator. // Delete the resource that is held by the iterator.

@ -2516,8 +2516,8 @@ TEST_P(BlockBasedTableTest, MemoryAllocator) {
opt.compression = kNoCompression; opt.compression = kNoCompression;
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
table_options.block_size = 1024; table_options.block_size = 1024;
table_options.memory_allocator = custom_memory_allocator;
LRUCacheOptions lruOptions; LRUCacheOptions lruOptions;
lruOptions.memory_allocator = custom_memory_allocator;
lruOptions.capacity = 16 * 1024 * 1024; lruOptions.capacity = 16 * 1024 * 1024;
lruOptions.num_shard_bits = 4; lruOptions.num_shard_bits = 4;
table_options.block_cache = NewLRUCache(std::move(lruOptions)); table_options.block_cache = NewLRUCache(std::move(lruOptions));

Loading…
Cancel
Save