Cache to have an option to fail Cache::Insert() when full

Summary:
Cache to have an option to fail Cache::Insert() when full. Update call sites to check status and handle error.

I totally have no idea what's correct behavior of all the call sites when they encounter error. Please let me know if you see something wrong or more unit test is needed.

Test Plan: make check -j32, see tests pass.

Reviewers: anthony, yhchiang, andrewkr, IslamAbdelRahman, kradhakrishnan, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D54705
main
Yi Wu 9 years ago
parent ee8cc35201
commit f71fc77b7c
  1. 1
      CMakeLists.txt
  2. 1
      HISTORY.md
  3. 4
      Makefile
  4. 237
      db/db_block_cache_test.cc
  5. 13
      db/table_cache.cc
  6. 35
      include/rocksdb/cache.h
  7. 10
      include/rocksdb/statistics.h
  8. 6
      table/block_based_table_builder.cc
  9. 74
      table/block_based_table_reader.cc
  10. 91
      util/cache.cc
  11. 6
      util/cache_bench.cc
  12. 81
      util/cache_test.cc

@ -338,6 +338,7 @@ set(TESTS
db/db_tailing_iter_test.cc
db/db_test.cc
db/db_test2.cc
db/db_block_cache_test.cc
db/db_universal_compaction_test.cc
db/db_wal_test.cc
db/dbformat_test.cc

@ -2,6 +2,7 @@
## Unreleased
### Public API Changes
* Change default of BlockBasedTableOptions.format_version to 2. It means default DB created by 4.6 or up cannot be opened by RocksDB version 3.9 or earlier.
* Added strict_capacity_limit option to NewLRUCache. If the flag is set to true, insert to cache will fail if no enough capacity can be free. Signiture of Cache::Insert() is updated accordingly.
### New Features
* Add CompactionPri::kMinOverlappingRatio, a compaction picking mode friendly to write amplification.
* Deprecate Iterator::IsKeyPinned() and replace it with Iterator::GetProperty() with prop_name="rocksdb.iterator.is.key.pinned"

@ -246,6 +246,7 @@ BENCHTOOLOBJECTS = $(BENCH_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL)
TESTS = \
db_test \
db_test2 \
db_block_cache_test \
db_iter_test \
db_log_iter_test \
db_compaction_filter_test \
@ -794,6 +795,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_block_cache_test: db/db_block_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_log_iter_test: db/db_log_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@ -0,0 +1,237 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cstdlib>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
namespace rocksdb {
static uint64_t TestGetTickerCount(const Options& options,
Tickers ticker_type) {
return options.statistics->getTickerCount(ticker_type);
}
class DBBlockCacheTest : public DBTestBase {
private:
size_t miss_count_ = 0;
size_t hit_count_ = 0;
size_t insert_count_ = 0;
size_t failure_count_ = 0;
size_t compressed_miss_count_ = 0;
size_t compressed_hit_count_ = 0;
size_t compressed_insert_count_ = 0;
size_t compressed_failure_count_ = 0;
public:
const size_t kNumBlocks = 10;
const size_t kValueSize = 100;
DBBlockCacheTest() : DBTestBase("/db_block_cache_test") {}
BlockBasedTableOptions GetTableOptions() {
BlockBasedTableOptions table_options;
// Set a small enough block size so that each key-value get its own block.
table_options.block_size = 1;
return table_options;
}
Options GetOptions(const BlockBasedTableOptions& table_options) {
Options options = CurrentOptions();
options.create_if_missing = true;
// options.compression = kNoCompression;
options.statistics = rocksdb::CreateDBStatistics();
options.table_factory.reset(new BlockBasedTableFactory(table_options));
return options;
}
void InitTable(const Options& options) {
std::string value(kValueSize, 'a');
for (size_t i = 0; i < kNumBlocks; i++) {
ASSERT_OK(Put(ToString(i), value.c_str()));
}
}
void RecordCacheCounters(const Options& options) {
miss_count_ = TestGetTickerCount(options, BLOCK_CACHE_MISS);
hit_count_ = TestGetTickerCount(options, BLOCK_CACHE_HIT);
insert_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD);
failure_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES);
compressed_miss_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
compressed_hit_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
compressed_insert_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD);
compressed_failure_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
}
void CheckCacheCounters(const Options& options, size_t expected_misses,
size_t expected_hits, size_t expected_inserts,
size_t expected_failures) {
size_t new_miss_count = TestGetTickerCount(options, BLOCK_CACHE_MISS);
size_t new_hit_count = TestGetTickerCount(options, BLOCK_CACHE_HIT);
size_t new_insert_count = TestGetTickerCount(options, BLOCK_CACHE_ADD);
size_t new_failure_count =
TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES);
ASSERT_EQ(miss_count_ + expected_misses, new_miss_count);
ASSERT_EQ(hit_count_ + expected_hits, new_hit_count);
ASSERT_EQ(insert_count_ + expected_inserts, new_insert_count);
ASSERT_EQ(failure_count_ + expected_failures, new_failure_count);
miss_count_ = new_miss_count;
hit_count_ = new_hit_count;
insert_count_ = new_insert_count;
failure_count_ = new_failure_count;
}
void CheckCompressedCacheCounters(const Options& options,
size_t expected_misses,
size_t expected_hits,
size_t expected_inserts,
size_t expected_failures) {
size_t new_miss_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
size_t new_hit_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
size_t new_insert_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD);
size_t new_failure_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
ASSERT_EQ(compressed_miss_count_ + expected_misses, new_miss_count);
ASSERT_EQ(compressed_hit_count_ + expected_hits, new_hit_count);
ASSERT_EQ(compressed_insert_count_ + expected_inserts, new_insert_count);
ASSERT_EQ(compressed_failure_count_ + expected_failures, new_failure_count);
compressed_miss_count_ = new_miss_count;
compressed_hit_count_ = new_hit_count;
compressed_insert_count_ = new_insert_count;
compressed_failure_count_ = new_failure_count;
}
};
TEST_F(DBBlockCacheTest, TestWithoutCompressedBlockCache) {
ReadOptions read_options;
auto table_options = GetTableOptions();
auto options = GetOptions(table_options);
InitTable(options);
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
table_options.block_cache = cache;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
Reopen(options);
RecordCacheCounters(options);
std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks - 1);
Iterator* iter = nullptr;
// Load blocks into cache.
for (size_t i = 0; i < kNumBlocks - 1; i++) {
iter = db_->NewIterator(read_options);
iter->Seek(ToString(i));
ASSERT_OK(iter->status());
CheckCacheCounters(options, 1, 0, 1, 0);
iterators[i].reset(iter);
}
size_t usage = cache->GetUsage();
ASSERT_LT(0, usage);
cache->SetCapacity(usage);
ASSERT_EQ(usage, cache->GetPinnedUsage());
// Test with strict capacity limit.
cache->SetStrictCapacityLimit(true);
iter = db_->NewIterator(read_options);
iter->Seek(ToString(kNumBlocks - 1));
ASSERT_TRUE(iter->status().IsIncomplete());
CheckCacheCounters(options, 1, 0, 0, 1);
delete iter;
iter = nullptr;
// Release interators and access cache again.
for (size_t i = 0; i < kNumBlocks - 1; i++) {
iterators[i].reset();
CheckCacheCounters(options, 0, 0, 0, 0);
}
ASSERT_EQ(0, cache->GetPinnedUsage());
for (size_t i = 0; i < kNumBlocks - 1; i++) {
iter = db_->NewIterator(read_options);
iter->Seek(ToString(i));
ASSERT_OK(iter->status());
CheckCacheCounters(options, 0, 1, 0, 0);
iterators[i].reset(iter);
}
}
TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
ReadOptions read_options;
auto table_options = GetTableOptions();
auto options = GetOptions(table_options);
InitTable(options);
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
std::shared_ptr<Cache> compressed_cache = NewLRUCache(0, 0, false);
table_options.block_cache = cache;
table_options.block_cache_compressed = compressed_cache;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
Reopen(options);
RecordCacheCounters(options);
std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks - 1);
Iterator* iter = nullptr;
// Load blocks into cache.
for (size_t i = 0; i < kNumBlocks - 1; i++) {
iter = db_->NewIterator(read_options);
iter->Seek(ToString(i));
ASSERT_OK(iter->status());
CheckCacheCounters(options, 1, 0, 1, 0);
CheckCompressedCacheCounters(options, 1, 0, 1, 0);
iterators[i].reset(iter);
}
size_t usage = cache->GetUsage();
ASSERT_LT(0, usage);
ASSERT_EQ(usage, cache->GetPinnedUsage());
size_t compressed_usage = compressed_cache->GetUsage();
ASSERT_LT(0, compressed_usage);
// Compressed block cache cannot be pinned.
ASSERT_EQ(0, compressed_cache->GetPinnedUsage());
// Set strict capacity limit flag. Now block will only load into compressed
// block cache.
cache->SetCapacity(usage);
cache->SetStrictCapacityLimit(true);
ASSERT_EQ(usage, cache->GetPinnedUsage());
// compressed_cache->SetCapacity(compressed_usage);
compressed_cache->SetCapacity(0);
// compressed_cache->SetStrictCapacityLimit(true);
iter = db_->NewIterator(read_options);
iter->Seek(ToString(kNumBlocks - 1));
ASSERT_TRUE(iter->status().IsIncomplete());
CheckCacheCounters(options, 1, 0, 0, 1);
CheckCompressedCacheCounters(options, 1, 0, 1, 0);
delete iter;
iter = nullptr;
// Clear strict capacity limit flag. This time we shall hit compressed block
// cache.
cache->SetStrictCapacityLimit(false);
iter = db_->NewIterator(read_options);
iter->Seek(ToString(kNumBlocks - 1));
ASSERT_OK(iter->status());
CheckCacheCounters(options, 1, 0, 1, 0);
CheckCompressedCacheCounters(options, 0, 1, 0, 0);
delete iter;
iter = nullptr;
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -143,8 +143,12 @@ Status TableCache::FindTable(const EnvOptions& env_options,
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
*handle = cache_->Insert(key, table_reader.release(), 1,
&DeleteEntry<TableReader>);
s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
handle);
if (s.ok()) {
// Release ownership of table reader.
table_reader.release();
}
}
}
return s;
@ -285,9 +289,8 @@ Status TableCache::Get(const ReadOptions& options,
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
auto row_handle = ioptions_.row_cache->Insert(
row_cache_key.GetKey(), row_ptr, charge, &DeleteEntry<std::string>);
ioptions_.row_cache->Release(row_handle);
ioptions_.row_cache->Insert(row_cache_key.GetKey(), row_ptr, charge,
&DeleteEntry<std::string>);
}
#endif // ROCKSDB_LITE

@ -25,6 +25,7 @@
#include <memory>
#include <stdint.h>
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
namespace rocksdb {
@ -33,12 +34,15 @@ using std::shared_ptr;
class Cache;
// Create a new cache with a fixed size capacity. The cache is sharded
// to 2^numShardBits shards, by hash of the key. The total capacity
// to 2^num_shard_bits shards, by hash of the key. The total capacity
// is divided and evenly assigned to each shard.
//
// The functions without parameter numShardBits uses default value, which is 4
// The parameter num_shard_bits defaults to 4, and strict_capacity_limit
// defaults to false.
extern shared_ptr<Cache> NewLRUCache(size_t capacity);
extern shared_ptr<Cache> NewLRUCache(size_t capacity, int numShardBits);
extern shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits);
extern shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit);
class Cache {
public:
@ -55,15 +59,22 @@ class Cache {
// Insert a mapping from key->value into the cache and assign it
// the specified charge against the total cache capacity.
// If strict_capacity_limit is true and cache reaches its full capacity,
// return Status::Incomplete.
//
// Returns a handle that corresponds to the mapping. The caller
// must call this->Release(handle) when the returned mapping is no
// longer needed.
// If handle is not nullptr, returns a handle that corresponds to the
// mapping. The caller must call this->Release(handle) when the returned
// mapping is no longer needed. In case of error caller is responsible to
// cleanup the value (i.e. calling "deleter").
//
// If handle is nullptr, it is as if Release is called immediately after
// insert. In case of error value will be cleanup.
//
// When the inserted entry is no longer needed, the key and
// value will be passed to "deleter".
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) = 0;
virtual Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr) = 0;
// If the cache has no mapping for "key", returns nullptr.
//
@ -100,6 +111,14 @@ class Cache {
// purge the released entries from the cache in order to lower the usage
virtual void SetCapacity(size_t capacity) = 0;
// Set whether to return error on insertion when cache reaches its full
// capacity.
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
// Set whether to return error on insertion when cache reaches its full
// capacity.
virtual bool HasStrictCapacityLimit() const = 0;
// returns the maximum configured capacity of the cache
virtual size_t GetCapacity() const = 0;

@ -33,6 +33,8 @@ enum Tickers : uint32_t {
BLOCK_CACHE_HIT,
// # of blocks added to block cache.
BLOCK_CACHE_ADD,
// # of failures when adding blocks to block cache.
BLOCK_CACHE_ADD_FAILURES,
// # of times cache miss when accessing index block from block cache.
BLOCK_CACHE_INDEX_MISS,
// # of times cache hit when accessing index block from block cache.
@ -140,6 +142,10 @@ enum Tickers : uint32_t {
GET_UPDATES_SINCE_CALLS,
BLOCK_CACHE_COMPRESSED_MISS, // miss in the compressed block cache
BLOCK_CACHE_COMPRESSED_HIT, // hit in the compressed block cache
// Number of blocks added to comopressed block cache
BLOCK_CACHE_COMPRESSED_ADD,
// Number of failures when adding blocks to compressed block cache
BLOCK_CACHE_COMPRESSED_ADD_FAILURES,
WAL_FILE_SYNCED, // Number of times WAL sync is done
WAL_FILE_BYTES, // Number of bytes written to WAL
@ -176,6 +182,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{BLOCK_CACHE_MISS, "rocksdb.block.cache.miss"},
{BLOCK_CACHE_HIT, "rocksdb.block.cache.hit"},
{BLOCK_CACHE_ADD, "rocksdb.block.cache.add"},
{BLOCK_CACHE_ADD_FAILURES, "rocksdb.block.cache.add.failures"},
{BLOCK_CACHE_INDEX_MISS, "rocksdb.block.cache.index.miss"},
{BLOCK_CACHE_INDEX_HIT, "rocksdb.block.cache.index.hit"},
{BLOCK_CACHE_FILTER_MISS, "rocksdb.block.cache.filter.miss"},
@ -227,6 +234,9 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls"},
{BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss"},
{BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit"},
{BLOCK_CACHE_COMPRESSED_ADD, "rocksdb.block.cachecompressed.add"},
{BLOCK_CACHE_COMPRESSED_ADD_FAILURES,
"rocksdb.block.cachecompressed.add.failures"},
{WAL_FILE_SYNCED, "rocksdb.wal.synced"},
{WAL_FILE_BYTES, "rocksdb.wal.bytes"},
{WRITE_DONE_BY_SELF, "rocksdb.write.self"},

@ -703,7 +703,6 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
if (type != kNoCompression && block_cache_compressed != nullptr) {
Cache::Handle* cache_handle = nullptr;
size_t size = block_contents.size();
std::unique_ptr<char[]> ubuf(new char[size + 1]);
@ -723,9 +722,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
(end - r->compressed_cache_key_prefix));
// Insert into compressed block cache.
cache_handle = block_cache_compressed->Insert(
key, block, block->usable_size(), &DeleteCachedBlock);
block_cache_compressed->Release(cache_handle);
block_cache_compressed->Insert(key, block, block->usable_size(),
&DeleteCachedBlock);
// Invalidate OS cache.
r->file->InvalidateCache(static_cast<size_t>(r->offset), size);

@ -740,11 +740,16 @@ Status BlockBasedTable::GetDataBlockFromCache(
assert(block->value->compression_type() == kNoCompression);
if (block_cache != nullptr && block->value->cachable() &&
read_options.fill_cache) {
block->cache_handle = block_cache->Insert(block_cache_key, block->value,
block->value->usable_size(),
&DeleteCachedEntry<Block>);
assert(reinterpret_cast<Block*>(
block_cache->Value(block->cache_handle)) == block->value);
s = block_cache->Insert(
block_cache_key, block->value, block->value->usable_size(),
&DeleteCachedEntry<Block>, &(block->cache_handle));
if (s.ok()) {
RecordTick(statistics, BLOCK_CACHE_ADD);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete block->value;
block->value = nullptr;
}
}
}
@ -784,27 +789,37 @@ Status BlockBasedTable::PutDataBlockToCache(
// Release the hold on the compressed cache entry immediately.
if (block_cache_compressed != nullptr && raw_block != nullptr &&
raw_block->cachable()) {
auto cache_handle = block_cache_compressed->Insert(
compressed_block_cache_key, raw_block, raw_block->usable_size(),
s = block_cache_compressed->Insert(compressed_block_cache_key, raw_block,
raw_block->usable_size(),
&DeleteCachedEntry<Block>);
block_cache_compressed->Release(cache_handle);
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS);
if (s.ok()) {
// Avoid the following code to delete this cached block.
raw_block = nullptr;
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD);
} else {
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
}
}
delete raw_block;
// insert into uncompressed block cache
assert((block->value->compression_type() == kNoCompression));
if (block_cache != nullptr && block->value->cachable()) {
block->cache_handle = block_cache->Insert(block_cache_key, block->value,
s = block_cache->Insert(block_cache_key, block->value,
block->value->usable_size(),
&DeleteCachedEntry<Block>);
&DeleteCachedEntry<Block>, &(block->cache_handle));
if (s.ok()) {
assert(block->cache_handle != nullptr);
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
block->value->usable_size());
assert(reinterpret_cast<Block*>(block_cache->Value(block->cache_handle)) ==
block->value);
assert(reinterpret_cast<Block*>(
block_cache->Value(block->cache_handle)) == block->value);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete block->value;
block->value = nullptr;
}
}
return s;
@ -891,10 +906,17 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
filter = ReadFilter(rep_, &filter_size);
if (filter != nullptr) {
assert(filter_size > 0);
cache_handle = block_cache->Insert(key, filter, filter_size,
&DeleteCachedEntry<FilterBlockReader>);
Status s = block_cache->Insert(key, filter, filter_size,
&DeleteCachedEntry<FilterBlockReader>,
&cache_handle);
if (s.ok()) {
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter_size);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete filter;
return CachableEntry<FilterBlockReader>();
}
}
}
@ -937,10 +959,18 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
// Create index reader and put it in the cache.
Status s;
s = CreateIndexReader(&index_reader);
if (s.ok()) {
s = block_cache->Insert(key, index_reader, index_reader->usable_size(),
&DeleteCachedEntry<IndexReader>, &cache_handle);
}
if (!s.ok()) {
if (s.ok()) {
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
index_reader->usable_size());
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
// make sure if something goes wrong, index_reader shall remain intact.
assert(index_reader == nullptr);
if (input_iter != nullptr) {
input_iter->SetStatus(s);
return input_iter;
@ -949,12 +979,6 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
}
}
cache_handle =
block_cache->Insert(key, index_reader, index_reader->usable_size(),
&DeleteCachedEntry<IndexReader>);
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
index_reader->usable_size());
}
assert(cache_handle);
@ -1036,7 +1060,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
}
// Didn't get any data from block caches.
if (block.value == nullptr) {
if (s.ok() && block.value == nullptr) {
if (no_io) {
// Could not read from block_cache and can't do IO
if (input_iter != nullptr) {
@ -1055,7 +1079,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
}
InternalIterator* iter;
if (block.value != nullptr) {
if (s.ok() && block.value != nullptr) {
iter = block.value->NewIterator(&rep->internal_comparator, input_iter);
if (block.cache_handle != nullptr) {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,

@ -196,10 +196,13 @@ class LRUCache {
// free the needed space
void SetCapacity(size_t capacity);
// Set the flag to reject insertion if cache if full.
void SetStrictCapacityLimit(bool strict_capacity_limit);
// Like Cache methods, but with an extra "hash" parameter.
Cache::Handle* Insert(const Slice& key, uint32_t hash,
void* value, size_t charge,
void (*deleter)(const Slice& key, void* value));
Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle);
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
@ -245,6 +248,9 @@ class LRUCache {
// Memory size for entries residing only in the LRU list
size_t lru_usage_;
// Whether to reject insertion if cache reaches its full capacity.
bool strict_capacity_limit_;
// mutex_ protects the following state.
// We don't count mutex_ as the cache's internal state so semantically we
// don't mind mutex_ invoking the non-const actions.
@ -336,6 +342,11 @@ void LRUCache::SetCapacity(size_t capacity) {
}
}
void LRUCache::SetStrictCapacityLimit(bool strict_capacity_limit) {
MutexLock l(&mutex_);
strict_capacity_limit_ = strict_capacity_limit;
}
Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
LRUHandle* e = table_.Lookup(key, hash);
@ -350,6 +361,9 @@ Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
}
void LRUCache::Release(Cache::Handle* handle) {
if (handle == nullptr) {
return;
}
LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
bool last_reference = false;
{
@ -383,15 +397,16 @@ void LRUCache::Release(Cache::Handle* handle) {
}
}
Cache::Handle* LRUCache::Insert(
const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
Status LRUCache::Insert(const Slice& key, uint32_t hash, void* value,
size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle) {
// Allocate the memory here outside of the mutex
// If the cache is full, we'll have to release it
// It shouldn't happen very often though.
LRUHandle* e = reinterpret_cast<LRUHandle*>(
new char[sizeof(LRUHandle) - 1 + key.size()]);
Status s;
autovector<LRUHandle*> last_reference_list;
e->value = value;
@ -399,7 +414,9 @@ Cache::Handle* LRUCache::Insert(
e->charge = charge;
e->key_length = key.size();
e->hash = hash;
e->refs = 2; // One from LRUCache, one for the returned handle
e->refs = (handle == nullptr
? 1
: 2); // One from LRUCache, one for the returned handle
e->next = e->prev = nullptr;
e->in_cache = true;
memcpy(e->key_data, key.data(), key.size());
@ -411,6 +428,15 @@ Cache::Handle* LRUCache::Insert(
// is freed or the lru list is empty
EvictFromLRU(charge, &last_reference_list);
if (strict_capacity_limit_ && usage_ - lru_usage_ + charge > capacity_) {
if (handle == nullptr) {
last_reference_list.push_back(e);
} else {
delete[] reinterpret_cast<char*>(e);
*handle = nullptr;
}
s = Status::Incomplete("Insert failed due to LRU cache being full.");
} else {
// insert into the cache
// note that the cache might get larger than its capacity if not enough
// space was freed
@ -426,6 +452,13 @@ Cache::Handle* LRUCache::Insert(
last_reference_list.push_back(old);
}
}
if (handle == nullptr) {
LRU_Append(e);
} else {
*handle = reinterpret_cast<Cache::Handle*>(e);
}
s = Status::OK();
}
}
// we free the entries here outside of mutex for
@ -434,7 +467,7 @@ Cache::Handle* LRUCache::Insert(
entry->Free();
}
return reinterpret_cast<Cache::Handle*>(e);
return s;
}
void LRUCache::Erase(const Slice& key, uint32_t hash) {
@ -472,6 +505,7 @@ class ShardedLRUCache : public Cache {
uint64_t last_id_;
int num_shard_bits_;
size_t capacity_;
bool strict_capacity_limit_;
static inline uint32_t HashSlice(const Slice& s) {
return Hash(s.data(), s.size(), 0);
@ -483,13 +517,18 @@ class ShardedLRUCache : public Cache {
}
public:
ShardedLRUCache(size_t capacity, int num_shard_bits)
: last_id_(0), num_shard_bits_(num_shard_bits), capacity_(capacity) {
ShardedLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit)
: last_id_(0),
num_shard_bits_(num_shard_bits),
capacity_(capacity),
strict_capacity_limit_(strict_capacity_limit) {
int num_shards = 1 << num_shard_bits_;
shards_ = new LRUCache[num_shards];
const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
for (int s = 0; s < num_shards; s++) {
shards_[s].SetCapacity(per_shard);
shards_[s].SetStrictCapacityLimit(strict_capacity_limit);
}
}
virtual ~ShardedLRUCache() {
@ -504,11 +543,19 @@ class ShardedLRUCache : public Cache {
}
capacity_ = capacity;
}
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key,
void* value)) override {
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
int num_shards = 1 << num_shard_bits_;
for (int s = 0; s < num_shards; s++) {
shards_[s].SetStrictCapacityLimit(strict_capacity_limit);
}
strict_capacity_limit_ = strict_capacity_limit;
}
virtual Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle) override {
const uint32_t hash = HashSlice(key);
return shards_[Shard(hash)].Insert(key, hash, value, charge, deleter);
return shards_[Shard(hash)].Insert(key, hash, value, charge, deleter,
handle);
}
virtual Handle* Lookup(const Slice& key) override {
const uint32_t hash = HashSlice(key);
@ -531,6 +578,10 @@ class ShardedLRUCache : public Cache {
}
virtual size_t GetCapacity() const override { return capacity_; }
virtual bool HasStrictCapacityLimit() const override {
return strict_capacity_limit_;
}
virtual size_t GetUsage() const override {
// We will not lock the cache when getting the usage from shards.
int num_shards = 1 << num_shard_bits_;
@ -569,14 +620,20 @@ class ShardedLRUCache : public Cache {
} // end anonymous namespace
shared_ptr<Cache> NewLRUCache(size_t capacity) {
return NewLRUCache(capacity, kNumShardBits);
return NewLRUCache(capacity, kNumShardBits, false);
}
shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits) {
return NewLRUCache(capacity, num_shard_bits, false);
}
shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
bool strict_capacity_limit) {
if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces
}
return std::make_shared<ShardedLRUCache>(capacity, num_shard_bits);
return std::make_shared<ShardedLRUCache>(capacity, num_shard_bits,
strict_capacity_limit);
}
} // namespace rocksdb

@ -142,8 +142,7 @@ class CacheBench {
// Cast uint64* to be char*, data would be copied to cache
Slice key(reinterpret_cast<char*>(&rand_key), 8);
// do insert
auto handle = cache_->Insert(key, new char[10], 1, &deleter);
cache_->Release(handle);
cache_->Insert(key, new char[10], 1, &deleter);
}
}
@ -221,8 +220,7 @@ class CacheBench {
int32_t prob_op = thread->rnd.Uniform(100);
if (prob_op >= 0 && prob_op < FLAGS_insert_percent) {
// do insert
auto handle = cache_->Insert(key, new char[10], 1, &deleter);
cache_->Release(handle);
cache_->Insert(key, new char[10], 1, &deleter);
} else if (prob_op -= FLAGS_insert_percent &&
prob_op < FLAGS_lookup_percent) {
// do lookup

@ -73,8 +73,8 @@ class CacheTest : public testing::Test {
}
void Insert(shared_ptr<Cache> cache, int key, int value, int charge = 1) {
cache->Release(cache->Insert(EncodeKey(key), EncodeValue(value), charge,
&CacheTest::Deleter));
cache->Insert(EncodeKey(key), EncodeValue(value), charge,
&CacheTest::Deleter);
}
void Erase(shared_ptr<Cache> cache, int key) {
@ -118,14 +118,12 @@ TEST_F(CacheTest, UsageTest) {
auto cache = NewLRUCache(kCapacity, 8);
size_t usage = 0;
const char* value = "abcdef";
char value[10] = "abcdef";
// make sure everything will be cached
for (int i = 1; i < 100; ++i) {
std::string key(i, 'a');
auto kv_size = key.size() + 5;
cache->Release(
cache->Insert(key, (void*)value, kv_size, dumbDeleter)
);
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter);
usage += kv_size;
ASSERT_EQ(usage, cache->GetUsage());
}
@ -133,9 +131,8 @@ TEST_F(CacheTest, UsageTest) {
// make sure the cache will be overloaded
for (uint64_t i = 1; i < kCapacity; ++i) {
auto key = ToString(i);
cache->Release(
cache->Insert(key, (void*)value, key.size() + 5, dumbDeleter)
);
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
dumbDeleter);
}
// the usage should be close to the capacity
@ -149,7 +146,7 @@ TEST_F(CacheTest, PinnedUsageTest) {
auto cache = NewLRUCache(kCapacity, 8);
size_t pinned_usage = 0;
const char* value = "abcdef";
char value[10] = "abcdef";
std::forward_list<Cache::Handle*> unreleased_handles;
@ -158,7 +155,9 @@ TEST_F(CacheTest, PinnedUsageTest) {
for (int i = 1; i < 100; ++i) {
std::string key(i, 'a');
auto kv_size = key.size() + 5;
auto handle = cache->Insert(key, (void*)value, kv_size, dumbDeleter);
Cache::Handle* handle;
cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter,
&handle);
pinned_usage += kv_size;
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
if (i % 2 == 0) {
@ -182,8 +181,8 @@ TEST_F(CacheTest, PinnedUsageTest) {
// check that overloading the cache does not change the pinned usage
for (uint64_t i = 1; i < 2 * kCapacity; ++i) {
auto key = ToString(i);
cache->Release(
cache->Insert(key, (void*)value, key.size() + 5, dumbDeleter));
cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
dumbDeleter);
}
ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
@ -408,7 +407,8 @@ TEST_F(CacheTest, SetCapacity) {
// Insert 5 entries, but not releasing.
for (size_t i = 0; i < 5; i++) {
std::string key = ToString(i+1);
handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter);
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
}
ASSERT_EQ(5U, cache->GetCapacity());
ASSERT_EQ(5U, cache->GetUsage());
@ -422,7 +422,8 @@ TEST_F(CacheTest, SetCapacity) {
// and usage should be 7
for (size_t i = 5; i < 10; i++) {
std::string key = ToString(i+1);
handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter);
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
}
ASSERT_EQ(10U, cache->GetCapacity());
ASSERT_EQ(10U, cache->GetUsage());
@ -441,6 +442,53 @@ TEST_F(CacheTest, SetCapacity) {
}
}
TEST_F(CacheTest, SetStrictCapacityLimit) {
// test1: set the flag to false. Insert more keys than capacity. See if they
// all go through.
std::shared_ptr<Cache> cache = NewLRUCache(5, 0, false);
std::vector<Cache::Handle*> handles(10);
Status s;
for (size_t i = 0; i < 10; i++) {
std::string key = ToString(i + 1);
s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
ASSERT_NE(nullptr, handles[i]);
}
// test2: set the flag to true. Insert and check if it fails.
std::string extra_key = "extra";
Value* extra_value = new Value(0);
cache->SetStrictCapacityLimit(true);
Cache::Handle* handle;
s = cache->Insert(extra_key, extra_value, 1, &deleter, &handle);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_EQ(nullptr, handle);
for (size_t i = 0; i < 10; i++) {
cache->Release(handles[i]);
}
// test3: init with flag being true.
std::shared_ptr<Cache> cache2 = NewLRUCache(5, 0, true);
for (size_t i = 0; i < 5; i++) {
std::string key = ToString(i + 1);
s = cache2->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
ASSERT_NE(nullptr, handles[i]);
}
s = cache2->Insert(extra_key, extra_value, 1, &deleter, &handle);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_EQ(nullptr, handle);
// test insert without handle
s = cache2->Insert(extra_key, extra_value, 1, &deleter);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_EQ(5, cache->GetUsage());
for (size_t i = 0; i < 5; i++) {
cache2->Release(handles[i]);
}
}
TEST_F(CacheTest, OverCapacity) {
size_t n = 10;
@ -452,7 +500,8 @@ TEST_F(CacheTest, OverCapacity) {
// Insert n+1 entries, but not releasing.
for (size_t i = 0; i < n + 1; i++) {
std::string key = ToString(i+1);
handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter);
Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
}
// Guess what's in the cache now?

Loading…
Cancel
Save