From f71fc77b7c58e8863900d7fa469106470bbcaef9 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Thu, 10 Mar 2016 17:35:19 -0800 Subject: [PATCH] Cache to have an option to fail Cache::Insert() when full Summary: Cache to have an option to fail Cache::Insert() when full. Update call sites to check status and handle error. I totally have no idea what's correct behavior of all the call sites when they encounter error. Please let me know if you see something wrong or more unit test is needed. Test Plan: make check -j32, see tests pass. Reviewers: anthony, yhchiang, andrewkr, IslamAbdelRahman, kradhakrishnan, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D54705 --- CMakeLists.txt | 1 + HISTORY.md | 1 + Makefile | 4 + db/db_block_cache_test.cc | 237 +++++++++++++++++++++++++++++ db/table_cache.cc | 13 +- include/rocksdb/cache.h | 35 ++++- include/rocksdb/statistics.h | 14 +- table/block_based_table_builder.cc | 6 +- table/block_based_table_reader.cc | 92 ++++++----- util/cache.cc | 117 ++++++++++---- util/cache_bench.cc | 6 +- util/cache_test.cc | 81 ++++++++-- 12 files changed, 504 insertions(+), 103 deletions(-) create mode 100644 db/db_block_cache_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index eda0d703b..16219b96f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -338,6 +338,7 @@ set(TESTS db/db_tailing_iter_test.cc db/db_test.cc db/db_test2.cc + db/db_block_cache_test.cc db/db_universal_compaction_test.cc db/db_wal_test.cc db/dbformat_test.cc diff --git a/HISTORY.md b/HISTORY.md index 567d40d08..64c89e8ee 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Public API Changes * Change default of BlockBasedTableOptions.format_version to 2. It means default DB created by 4.6 or up cannot be opened by RocksDB version 3.9 or earlier. +* Added strict_capacity_limit option to NewLRUCache. If the flag is set to true, insert to cache will fail if no enough capacity can be free. Signiture of Cache::Insert() is updated accordingly. ### New Features * Add CompactionPri::kMinOverlappingRatio, a compaction picking mode friendly to write amplification. * Deprecate Iterator::IsKeyPinned() and replace it with Iterator::GetProperty() with prop_name="rocksdb.iterator.is.key.pinned" diff --git a/Makefile b/Makefile index cfe70bea9..a1e321f83 100644 --- a/Makefile +++ b/Makefile @@ -246,6 +246,7 @@ BENCHTOOLOBJECTS = $(BENCH_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL) TESTS = \ db_test \ db_test2 \ + db_block_cache_test \ db_iter_test \ db_log_iter_test \ db_compaction_filter_test \ @@ -794,6 +795,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_block_cache_test: db/db_block_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_log_iter_test: db/db_log_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc new file mode 100644 index 000000000..18fb5b2ee --- /dev/null +++ b/db/db_block_cache_test.cc @@ -0,0 +1,237 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include +#include "db/db_test_util.h" +#include "port/stack_trace.h" + +namespace rocksdb { + +static uint64_t TestGetTickerCount(const Options& options, + Tickers ticker_type) { + return options.statistics->getTickerCount(ticker_type); +} + +class DBBlockCacheTest : public DBTestBase { + private: + size_t miss_count_ = 0; + size_t hit_count_ = 0; + size_t insert_count_ = 0; + size_t failure_count_ = 0; + size_t compressed_miss_count_ = 0; + size_t compressed_hit_count_ = 0; + size_t compressed_insert_count_ = 0; + size_t compressed_failure_count_ = 0; + + public: + const size_t kNumBlocks = 10; + const size_t kValueSize = 100; + + DBBlockCacheTest() : DBTestBase("/db_block_cache_test") {} + + BlockBasedTableOptions GetTableOptions() { + BlockBasedTableOptions table_options; + // Set a small enough block size so that each key-value get its own block. + table_options.block_size = 1; + return table_options; + } + + Options GetOptions(const BlockBasedTableOptions& table_options) { + Options options = CurrentOptions(); + options.create_if_missing = true; + // options.compression = kNoCompression; + options.statistics = rocksdb::CreateDBStatistics(); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + return options; + } + + void InitTable(const Options& options) { + std::string value(kValueSize, 'a'); + for (size_t i = 0; i < kNumBlocks; i++) { + ASSERT_OK(Put(ToString(i), value.c_str())); + } + } + + void RecordCacheCounters(const Options& options) { + miss_count_ = TestGetTickerCount(options, BLOCK_CACHE_MISS); + hit_count_ = TestGetTickerCount(options, BLOCK_CACHE_HIT); + insert_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD); + failure_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES); + compressed_miss_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS); + compressed_hit_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT); + compressed_insert_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD); + compressed_failure_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES); + } + + void CheckCacheCounters(const Options& options, size_t expected_misses, + size_t expected_hits, size_t expected_inserts, + size_t expected_failures) { + size_t new_miss_count = TestGetTickerCount(options, BLOCK_CACHE_MISS); + size_t new_hit_count = TestGetTickerCount(options, BLOCK_CACHE_HIT); + size_t new_insert_count = TestGetTickerCount(options, BLOCK_CACHE_ADD); + size_t new_failure_count = + TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES); + ASSERT_EQ(miss_count_ + expected_misses, new_miss_count); + ASSERT_EQ(hit_count_ + expected_hits, new_hit_count); + ASSERT_EQ(insert_count_ + expected_inserts, new_insert_count); + ASSERT_EQ(failure_count_ + expected_failures, new_failure_count); + miss_count_ = new_miss_count; + hit_count_ = new_hit_count; + insert_count_ = new_insert_count; + failure_count_ = new_failure_count; + } + + void CheckCompressedCacheCounters(const Options& options, + size_t expected_misses, + size_t expected_hits, + size_t expected_inserts, + size_t expected_failures) { + size_t new_miss_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS); + size_t new_hit_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT); + size_t new_insert_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD); + size_t new_failure_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES); + ASSERT_EQ(compressed_miss_count_ + expected_misses, new_miss_count); + ASSERT_EQ(compressed_hit_count_ + expected_hits, new_hit_count); + ASSERT_EQ(compressed_insert_count_ + expected_inserts, new_insert_count); + ASSERT_EQ(compressed_failure_count_ + expected_failures, new_failure_count); + compressed_miss_count_ = new_miss_count; + compressed_hit_count_ = new_hit_count; + compressed_insert_count_ = new_insert_count; + compressed_failure_count_ = new_failure_count; + } +}; + +TEST_F(DBBlockCacheTest, TestWithoutCompressedBlockCache) { + ReadOptions read_options; + auto table_options = GetTableOptions(); + auto options = GetOptions(table_options); + InitTable(options); + + std::shared_ptr cache = NewLRUCache(0, 0, false); + table_options.block_cache = cache; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + Reopen(options); + RecordCacheCounters(options); + + std::vector> iterators(kNumBlocks - 1); + Iterator* iter = nullptr; + + // Load blocks into cache. + for (size_t i = 0; i < kNumBlocks - 1; i++) { + iter = db_->NewIterator(read_options); + iter->Seek(ToString(i)); + ASSERT_OK(iter->status()); + CheckCacheCounters(options, 1, 0, 1, 0); + iterators[i].reset(iter); + } + size_t usage = cache->GetUsage(); + ASSERT_LT(0, usage); + cache->SetCapacity(usage); + ASSERT_EQ(usage, cache->GetPinnedUsage()); + + // Test with strict capacity limit. + cache->SetStrictCapacityLimit(true); + iter = db_->NewIterator(read_options); + iter->Seek(ToString(kNumBlocks - 1)); + ASSERT_TRUE(iter->status().IsIncomplete()); + CheckCacheCounters(options, 1, 0, 0, 1); + delete iter; + iter = nullptr; + + // Release interators and access cache again. + for (size_t i = 0; i < kNumBlocks - 1; i++) { + iterators[i].reset(); + CheckCacheCounters(options, 0, 0, 0, 0); + } + ASSERT_EQ(0, cache->GetPinnedUsage()); + for (size_t i = 0; i < kNumBlocks - 1; i++) { + iter = db_->NewIterator(read_options); + iter->Seek(ToString(i)); + ASSERT_OK(iter->status()); + CheckCacheCounters(options, 0, 1, 0, 0); + iterators[i].reset(iter); + } +} + +TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) { + ReadOptions read_options; + auto table_options = GetTableOptions(); + auto options = GetOptions(table_options); + InitTable(options); + + std::shared_ptr cache = NewLRUCache(0, 0, false); + std::shared_ptr compressed_cache = NewLRUCache(0, 0, false); + table_options.block_cache = cache; + table_options.block_cache_compressed = compressed_cache; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + Reopen(options); + RecordCacheCounters(options); + + std::vector> iterators(kNumBlocks - 1); + Iterator* iter = nullptr; + + // Load blocks into cache. + for (size_t i = 0; i < kNumBlocks - 1; i++) { + iter = db_->NewIterator(read_options); + iter->Seek(ToString(i)); + ASSERT_OK(iter->status()); + CheckCacheCounters(options, 1, 0, 1, 0); + CheckCompressedCacheCounters(options, 1, 0, 1, 0); + iterators[i].reset(iter); + } + size_t usage = cache->GetUsage(); + ASSERT_LT(0, usage); + ASSERT_EQ(usage, cache->GetPinnedUsage()); + size_t compressed_usage = compressed_cache->GetUsage(); + ASSERT_LT(0, compressed_usage); + // Compressed block cache cannot be pinned. + ASSERT_EQ(0, compressed_cache->GetPinnedUsage()); + + // Set strict capacity limit flag. Now block will only load into compressed + // block cache. + cache->SetCapacity(usage); + cache->SetStrictCapacityLimit(true); + ASSERT_EQ(usage, cache->GetPinnedUsage()); + // compressed_cache->SetCapacity(compressed_usage); + compressed_cache->SetCapacity(0); + // compressed_cache->SetStrictCapacityLimit(true); + iter = db_->NewIterator(read_options); + iter->Seek(ToString(kNumBlocks - 1)); + ASSERT_TRUE(iter->status().IsIncomplete()); + CheckCacheCounters(options, 1, 0, 0, 1); + CheckCompressedCacheCounters(options, 1, 0, 1, 0); + delete iter; + iter = nullptr; + + // Clear strict capacity limit flag. This time we shall hit compressed block + // cache. + cache->SetStrictCapacityLimit(false); + iter = db_->NewIterator(read_options); + iter->Seek(ToString(kNumBlocks - 1)); + ASSERT_OK(iter->status()); + CheckCacheCounters(options, 1, 0, 1, 0); + CheckCompressedCacheCounters(options, 0, 1, 0, 0); + delete iter; + iter = nullptr; +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/table_cache.cc b/db/table_cache.cc index 53e35d3a0..2a4621b7e 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -143,8 +143,12 @@ Status TableCache::FindTable(const EnvOptions& env_options, // We do not cache error results so that if the error is transient, // or somebody repairs the file, we recover automatically. } else { - *handle = cache_->Insert(key, table_reader.release(), 1, - &DeleteEntry); + s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry, + handle); + if (s.ok()) { + // Release ownership of table reader. + table_reader.release(); + } } } return s; @@ -285,9 +289,8 @@ Status TableCache::Get(const ReadOptions& options, size_t charge = row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string); void* row_ptr = new std::string(std::move(*row_cache_entry)); - auto row_handle = ioptions_.row_cache->Insert( - row_cache_key.GetKey(), row_ptr, charge, &DeleteEntry); - ioptions_.row_cache->Release(row_handle); + ioptions_.row_cache->Insert(row_cache_key.GetKey(), row_ptr, charge, + &DeleteEntry); } #endif // ROCKSDB_LITE diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 30d9c67d3..327270e34 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -25,6 +25,7 @@ #include #include #include "rocksdb/slice.h" +#include "rocksdb/status.h" namespace rocksdb { @@ -33,12 +34,15 @@ using std::shared_ptr; class Cache; // Create a new cache with a fixed size capacity. The cache is sharded -// to 2^numShardBits shards, by hash of the key. The total capacity +// to 2^num_shard_bits shards, by hash of the key. The total capacity // is divided and evenly assigned to each shard. // -// The functions without parameter numShardBits uses default value, which is 4 +// The parameter num_shard_bits defaults to 4, and strict_capacity_limit +// defaults to false. extern shared_ptr NewLRUCache(size_t capacity); -extern shared_ptr NewLRUCache(size_t capacity, int numShardBits); +extern shared_ptr NewLRUCache(size_t capacity, int num_shard_bits); +extern shared_ptr NewLRUCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit); class Cache { public: @@ -55,15 +59,22 @@ class Cache { // Insert a mapping from key->value into the cache and assign it // the specified charge against the total cache capacity. + // If strict_capacity_limit is true and cache reaches its full capacity, + // return Status::Incomplete. // - // Returns a handle that corresponds to the mapping. The caller - // must call this->Release(handle) when the returned mapping is no - // longer needed. + // If handle is not nullptr, returns a handle that corresponds to the + // mapping. The caller must call this->Release(handle) when the returned + // mapping is no longer needed. In case of error caller is responsible to + // cleanup the value (i.e. calling "deleter"). + // + // If handle is nullptr, it is as if Release is called immediately after + // insert. In case of error value will be cleanup. // // When the inserted entry is no longer needed, the key and // value will be passed to "deleter". - virtual Handle* Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, void* value)) = 0; + virtual Status Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Handle** handle = nullptr) = 0; // If the cache has no mapping for "key", returns nullptr. // @@ -100,6 +111,14 @@ class Cache { // purge the released entries from the cache in order to lower the usage virtual void SetCapacity(size_t capacity) = 0; + // Set whether to return error on insertion when cache reaches its full + // capacity. + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0; + + // Set whether to return error on insertion when cache reaches its full + // capacity. + virtual bool HasStrictCapacityLimit() const = 0; + // returns the maximum configured capacity of the cache virtual size_t GetCapacity() const = 0; diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index c16c3a7ca..c832516da 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -33,6 +33,8 @@ enum Tickers : uint32_t { BLOCK_CACHE_HIT, // # of blocks added to block cache. BLOCK_CACHE_ADD, + // # of failures when adding blocks to block cache. + BLOCK_CACHE_ADD_FAILURES, // # of times cache miss when accessing index block from block cache. BLOCK_CACHE_INDEX_MISS, // # of times cache hit when accessing index block from block cache. @@ -140,8 +142,12 @@ enum Tickers : uint32_t { GET_UPDATES_SINCE_CALLS, BLOCK_CACHE_COMPRESSED_MISS, // miss in the compressed block cache BLOCK_CACHE_COMPRESSED_HIT, // hit in the compressed block cache - WAL_FILE_SYNCED, // Number of times WAL sync is done - WAL_FILE_BYTES, // Number of bytes written to WAL + // Number of blocks added to comopressed block cache + BLOCK_CACHE_COMPRESSED_ADD, + // Number of failures when adding blocks to compressed block cache + BLOCK_CACHE_COMPRESSED_ADD_FAILURES, + WAL_FILE_SYNCED, // Number of times WAL sync is done + WAL_FILE_BYTES, // Number of bytes written to WAL // Writes can be processed by requesting thread or by the thread at the // head of the writers queue. @@ -176,6 +182,7 @@ const std::vector> TickersNameMap = { {BLOCK_CACHE_MISS, "rocksdb.block.cache.miss"}, {BLOCK_CACHE_HIT, "rocksdb.block.cache.hit"}, {BLOCK_CACHE_ADD, "rocksdb.block.cache.add"}, + {BLOCK_CACHE_ADD_FAILURES, "rocksdb.block.cache.add.failures"}, {BLOCK_CACHE_INDEX_MISS, "rocksdb.block.cache.index.miss"}, {BLOCK_CACHE_INDEX_HIT, "rocksdb.block.cache.index.hit"}, {BLOCK_CACHE_FILTER_MISS, "rocksdb.block.cache.filter.miss"}, @@ -227,6 +234,9 @@ const std::vector> TickersNameMap = { {GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls"}, {BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss"}, {BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit"}, + {BLOCK_CACHE_COMPRESSED_ADD, "rocksdb.block.cachecompressed.add"}, + {BLOCK_CACHE_COMPRESSED_ADD_FAILURES, + "rocksdb.block.cachecompressed.add.failures"}, {WAL_FILE_SYNCED, "rocksdb.wal.synced"}, {WAL_FILE_BYTES, "rocksdb.wal.bytes"}, {WRITE_DONE_BY_SELF, "rocksdb.write.self"}, diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 1484acb51..47d74bc5f 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -703,7 +703,6 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, if (type != kNoCompression && block_cache_compressed != nullptr) { - Cache::Handle* cache_handle = nullptr; size_t size = block_contents.size(); std::unique_ptr ubuf(new char[size + 1]); @@ -723,9 +722,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, (end - r->compressed_cache_key_prefix)); // Insert into compressed block cache. - cache_handle = block_cache_compressed->Insert( - key, block, block->usable_size(), &DeleteCachedBlock); - block_cache_compressed->Release(cache_handle); + block_cache_compressed->Insert(key, block, block->usable_size(), + &DeleteCachedBlock); // Invalidate OS cache. r->file->InvalidateCache(static_cast(r->offset), size); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 42c5aa494..cbaf90a90 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -740,11 +740,16 @@ Status BlockBasedTable::GetDataBlockFromCache( assert(block->value->compression_type() == kNoCompression); if (block_cache != nullptr && block->value->cachable() && read_options.fill_cache) { - block->cache_handle = block_cache->Insert(block_cache_key, block->value, - block->value->usable_size(), - &DeleteCachedEntry); - assert(reinterpret_cast( - block_cache->Value(block->cache_handle)) == block->value); + s = block_cache->Insert( + block_cache_key, block->value, block->value->usable_size(), + &DeleteCachedEntry, &(block->cache_handle)); + if (s.ok()) { + RecordTick(statistics, BLOCK_CACHE_ADD); + } else { + RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); + delete block->value; + block->value = nullptr; + } } } @@ -784,27 +789,37 @@ Status BlockBasedTable::PutDataBlockToCache( // Release the hold on the compressed cache entry immediately. if (block_cache_compressed != nullptr && raw_block != nullptr && raw_block->cachable()) { - auto cache_handle = block_cache_compressed->Insert( - compressed_block_cache_key, raw_block, raw_block->usable_size(), - &DeleteCachedEntry); - block_cache_compressed->Release(cache_handle); - RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); - // Avoid the following code to delete this cached block. - raw_block = nullptr; + s = block_cache_compressed->Insert(compressed_block_cache_key, raw_block, + raw_block->usable_size(), + &DeleteCachedEntry); + if (s.ok()) { + // Avoid the following code to delete this cached block. + raw_block = nullptr; + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD); + } else { + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES); + } } delete raw_block; // insert into uncompressed block cache assert((block->value->compression_type() == kNoCompression)); if (block_cache != nullptr && block->value->cachable()) { - block->cache_handle = block_cache->Insert(block_cache_key, block->value, - block->value->usable_size(), - &DeleteCachedEntry); - RecordTick(statistics, BLOCK_CACHE_ADD); - RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, - block->value->usable_size()); - assert(reinterpret_cast(block_cache->Value(block->cache_handle)) == - block->value); + s = block_cache->Insert(block_cache_key, block->value, + block->value->usable_size(), + &DeleteCachedEntry, &(block->cache_handle)); + if (s.ok()) { + assert(block->cache_handle != nullptr); + RecordTick(statistics, BLOCK_CACHE_ADD); + RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, + block->value->usable_size()); + assert(reinterpret_cast( + block_cache->Value(block->cache_handle)) == block->value); + } else { + RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); + delete block->value; + block->value = nullptr; + } } return s; @@ -891,10 +906,17 @@ BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( filter = ReadFilter(rep_, &filter_size); if (filter != nullptr) { assert(filter_size > 0); - cache_handle = block_cache->Insert(key, filter, filter_size, - &DeleteCachedEntry); - RecordTick(statistics, BLOCK_CACHE_ADD); - RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter_size); + Status s = block_cache->Insert(key, filter, filter_size, + &DeleteCachedEntry, + &cache_handle); + if (s.ok()) { + RecordTick(statistics, BLOCK_CACHE_ADD); + RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter_size); + } else { + RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); + delete filter; + return CachableEntry(); + } } } @@ -937,10 +959,18 @@ InternalIterator* BlockBasedTable::NewIndexIterator( // Create index reader and put it in the cache. Status s; s = CreateIndexReader(&index_reader); + if (s.ok()) { + s = block_cache->Insert(key, index_reader, index_reader->usable_size(), + &DeleteCachedEntry, &cache_handle); + } - if (!s.ok()) { + if (s.ok()) { + RecordTick(statistics, BLOCK_CACHE_ADD); + RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, + index_reader->usable_size()); + } else { + RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); // make sure if something goes wrong, index_reader shall remain intact. - assert(index_reader == nullptr); if (input_iter != nullptr) { input_iter->SetStatus(s); return input_iter; @@ -949,12 +979,6 @@ InternalIterator* BlockBasedTable::NewIndexIterator( } } - cache_handle = - block_cache->Insert(key, index_reader, index_reader->usable_size(), - &DeleteCachedEntry); - RecordTick(statistics, BLOCK_CACHE_ADD); - RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, - index_reader->usable_size()); } assert(cache_handle); @@ -1036,7 +1060,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( } // Didn't get any data from block caches. - if (block.value == nullptr) { + if (s.ok() && block.value == nullptr) { if (no_io) { // Could not read from block_cache and can't do IO if (input_iter != nullptr) { @@ -1055,7 +1079,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( } InternalIterator* iter; - if (block.value != nullptr) { + if (s.ok() && block.value != nullptr) { iter = block.value->NewIterator(&rep->internal_comparator, input_iter); if (block.cache_handle != nullptr) { iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, diff --git a/util/cache.cc b/util/cache.cc index 078b10e1a..6015644f6 100644 --- a/util/cache.cc +++ b/util/cache.cc @@ -196,10 +196,13 @@ class LRUCache { // free the needed space void SetCapacity(size_t capacity); + // Set the flag to reject insertion if cache if full. + void SetStrictCapacityLimit(bool strict_capacity_limit); + // Like Cache methods, but with an extra "hash" parameter. - Cache::Handle* Insert(const Slice& key, uint32_t hash, - void* value, size_t charge, - void (*deleter)(const Slice& key, void* value)); + Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** handle); Cache::Handle* Lookup(const Slice& key, uint32_t hash); void Release(Cache::Handle* handle); void Erase(const Slice& key, uint32_t hash); @@ -245,6 +248,9 @@ class LRUCache { // Memory size for entries residing only in the LRU list size_t lru_usage_; + // Whether to reject insertion if cache reaches its full capacity. + bool strict_capacity_limit_; + // mutex_ protects the following state. // We don't count mutex_ as the cache's internal state so semantically we // don't mind mutex_ invoking the non-const actions. @@ -336,6 +342,11 @@ void LRUCache::SetCapacity(size_t capacity) { } } +void LRUCache::SetStrictCapacityLimit(bool strict_capacity_limit) { + MutexLock l(&mutex_); + strict_capacity_limit_ = strict_capacity_limit; +} + Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) { MutexLock l(&mutex_); LRUHandle* e = table_.Lookup(key, hash); @@ -350,6 +361,9 @@ Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) { } void LRUCache::Release(Cache::Handle* handle) { + if (handle == nullptr) { + return; + } LRUHandle* e = reinterpret_cast(handle); bool last_reference = false; { @@ -383,15 +397,16 @@ void LRUCache::Release(Cache::Handle* handle) { } } -Cache::Handle* LRUCache::Insert( - const Slice& key, uint32_t hash, void* value, size_t charge, - void (*deleter)(const Slice& key, void* value)) { - +Status LRUCache::Insert(const Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** handle) { // Allocate the memory here outside of the mutex // If the cache is full, we'll have to release it // It shouldn't happen very often though. LRUHandle* e = reinterpret_cast( new char[sizeof(LRUHandle) - 1 + key.size()]); + Status s; autovector last_reference_list; e->value = value; @@ -399,7 +414,9 @@ Cache::Handle* LRUCache::Insert( e->charge = charge; e->key_length = key.size(); e->hash = hash; - e->refs = 2; // One from LRUCache, one for the returned handle + e->refs = (handle == nullptr + ? 1 + : 2); // One from LRUCache, one for the returned handle e->next = e->prev = nullptr; e->in_cache = true; memcpy(e->key_data, key.data(), key.size()); @@ -411,20 +428,36 @@ Cache::Handle* LRUCache::Insert( // is freed or the lru list is empty EvictFromLRU(charge, &last_reference_list); - // insert into the cache - // note that the cache might get larger than its capacity if not enough - // space was freed - LRUHandle* old = table_.Insert(e); - usage_ += e->charge; - if (old != nullptr) { - old->in_cache = false; - if (Unref(old)) { - usage_ -= old->charge; - // old is on LRU because it's in cache and its reference count - // was just 1 (Unref returned 0) - LRU_Remove(old); - last_reference_list.push_back(old); + if (strict_capacity_limit_ && usage_ - lru_usage_ + charge > capacity_) { + if (handle == nullptr) { + last_reference_list.push_back(e); + } else { + delete[] reinterpret_cast(e); + *handle = nullptr; + } + s = Status::Incomplete("Insert failed due to LRU cache being full."); + } else { + // insert into the cache + // note that the cache might get larger than its capacity if not enough + // space was freed + LRUHandle* old = table_.Insert(e); + usage_ += e->charge; + if (old != nullptr) { + old->in_cache = false; + if (Unref(old)) { + usage_ -= old->charge; + // old is on LRU because it's in cache and its reference count + // was just 1 (Unref returned 0) + LRU_Remove(old); + last_reference_list.push_back(old); + } + } + if (handle == nullptr) { + LRU_Append(e); + } else { + *handle = reinterpret_cast(e); } + s = Status::OK(); } } @@ -434,7 +467,7 @@ Cache::Handle* LRUCache::Insert( entry->Free(); } - return reinterpret_cast(e); + return s; } void LRUCache::Erase(const Slice& key, uint32_t hash) { @@ -472,6 +505,7 @@ class ShardedLRUCache : public Cache { uint64_t last_id_; int num_shard_bits_; size_t capacity_; + bool strict_capacity_limit_; static inline uint32_t HashSlice(const Slice& s) { return Hash(s.data(), s.size(), 0); @@ -483,13 +517,18 @@ class ShardedLRUCache : public Cache { } public: - ShardedLRUCache(size_t capacity, int num_shard_bits) - : last_id_(0), num_shard_bits_(num_shard_bits), capacity_(capacity) { + ShardedLRUCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) + : last_id_(0), + num_shard_bits_(num_shard_bits), + capacity_(capacity), + strict_capacity_limit_(strict_capacity_limit) { int num_shards = 1 << num_shard_bits_; shards_ = new LRUCache[num_shards]; const size_t per_shard = (capacity + (num_shards - 1)) / num_shards; for (int s = 0; s < num_shards; s++) { shards_[s].SetCapacity(per_shard); + shards_[s].SetStrictCapacityLimit(strict_capacity_limit); } } virtual ~ShardedLRUCache() { @@ -504,11 +543,19 @@ class ShardedLRUCache : public Cache { } capacity_ = capacity; } - virtual Handle* Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, - void* value)) override { + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + shards_[s].SetStrictCapacityLimit(strict_capacity_limit); + } + strict_capacity_limit_ = strict_capacity_limit; + } + virtual Status Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Handle** handle) override { const uint32_t hash = HashSlice(key); - return shards_[Shard(hash)].Insert(key, hash, value, charge, deleter); + return shards_[Shard(hash)].Insert(key, hash, value, charge, deleter, + handle); } virtual Handle* Lookup(const Slice& key) override { const uint32_t hash = HashSlice(key); @@ -531,6 +578,10 @@ class ShardedLRUCache : public Cache { } virtual size_t GetCapacity() const override { return capacity_; } + virtual bool HasStrictCapacityLimit() const override { + return strict_capacity_limit_; + } + virtual size_t GetUsage() const override { // We will not lock the cache when getting the usage from shards. int num_shards = 1 << num_shard_bits_; @@ -569,14 +620,20 @@ class ShardedLRUCache : public Cache { } // end anonymous namespace shared_ptr NewLRUCache(size_t capacity) { - return NewLRUCache(capacity, kNumShardBits); + return NewLRUCache(capacity, kNumShardBits, false); } shared_ptr NewLRUCache(size_t capacity, int num_shard_bits) { + return NewLRUCache(capacity, num_shard_bits, false); +} + +shared_ptr NewLRUCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) { if (num_shard_bits >= 20) { return nullptr; // the cache cannot be sharded into too many fine pieces } - return std::make_shared(capacity, num_shard_bits); + return std::make_shared(capacity, num_shard_bits, + strict_capacity_limit); } } // namespace rocksdb diff --git a/util/cache_bench.cc b/util/cache_bench.cc index 0e0d70d62..266c9e1c5 100644 --- a/util/cache_bench.cc +++ b/util/cache_bench.cc @@ -142,8 +142,7 @@ class CacheBench { // Cast uint64* to be char*, data would be copied to cache Slice key(reinterpret_cast(&rand_key), 8); // do insert - auto handle = cache_->Insert(key, new char[10], 1, &deleter); - cache_->Release(handle); + cache_->Insert(key, new char[10], 1, &deleter); } } @@ -221,8 +220,7 @@ class CacheBench { int32_t prob_op = thread->rnd.Uniform(100); if (prob_op >= 0 && prob_op < FLAGS_insert_percent) { // do insert - auto handle = cache_->Insert(key, new char[10], 1, &deleter); - cache_->Release(handle); + cache_->Insert(key, new char[10], 1, &deleter); } else if (prob_op -= FLAGS_insert_percent && prob_op < FLAGS_lookup_percent) { // do lookup diff --git a/util/cache_test.cc b/util/cache_test.cc index d49cd4fdf..3df71c098 100644 --- a/util/cache_test.cc +++ b/util/cache_test.cc @@ -73,8 +73,8 @@ class CacheTest : public testing::Test { } void Insert(shared_ptr cache, int key, int value, int charge = 1) { - cache->Release(cache->Insert(EncodeKey(key), EncodeValue(value), charge, - &CacheTest::Deleter)); + cache->Insert(EncodeKey(key), EncodeValue(value), charge, + &CacheTest::Deleter); } void Erase(shared_ptr cache, int key) { @@ -118,14 +118,12 @@ TEST_F(CacheTest, UsageTest) { auto cache = NewLRUCache(kCapacity, 8); size_t usage = 0; - const char* value = "abcdef"; + char value[10] = "abcdef"; // make sure everything will be cached for (int i = 1; i < 100; ++i) { std::string key(i, 'a'); auto kv_size = key.size() + 5; - cache->Release( - cache->Insert(key, (void*)value, kv_size, dumbDeleter) - ); + cache->Insert(key, reinterpret_cast(value), kv_size, dumbDeleter); usage += kv_size; ASSERT_EQ(usage, cache->GetUsage()); } @@ -133,9 +131,8 @@ TEST_F(CacheTest, UsageTest) { // make sure the cache will be overloaded for (uint64_t i = 1; i < kCapacity; ++i) { auto key = ToString(i); - cache->Release( - cache->Insert(key, (void*)value, key.size() + 5, dumbDeleter) - ); + cache->Insert(key, reinterpret_cast(value), key.size() + 5, + dumbDeleter); } // the usage should be close to the capacity @@ -149,7 +146,7 @@ TEST_F(CacheTest, PinnedUsageTest) { auto cache = NewLRUCache(kCapacity, 8); size_t pinned_usage = 0; - const char* value = "abcdef"; + char value[10] = "abcdef"; std::forward_list unreleased_handles; @@ -158,7 +155,9 @@ TEST_F(CacheTest, PinnedUsageTest) { for (int i = 1; i < 100; ++i) { std::string key(i, 'a'); auto kv_size = key.size() + 5; - auto handle = cache->Insert(key, (void*)value, kv_size, dumbDeleter); + Cache::Handle* handle; + cache->Insert(key, reinterpret_cast(value), kv_size, dumbDeleter, + &handle); pinned_usage += kv_size; ASSERT_EQ(pinned_usage, cache->GetPinnedUsage()); if (i % 2 == 0) { @@ -182,8 +181,8 @@ TEST_F(CacheTest, PinnedUsageTest) { // check that overloading the cache does not change the pinned usage for (uint64_t i = 1; i < 2 * kCapacity; ++i) { auto key = ToString(i); - cache->Release( - cache->Insert(key, (void*)value, key.size() + 5, dumbDeleter)); + cache->Insert(key, reinterpret_cast(value), key.size() + 5, + dumbDeleter); } ASSERT_EQ(pinned_usage, cache->GetPinnedUsage()); @@ -408,7 +407,8 @@ TEST_F(CacheTest, SetCapacity) { // Insert 5 entries, but not releasing. for (size_t i = 0; i < 5; i++) { std::string key = ToString(i+1); - handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter); + Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_TRUE(s.ok()); } ASSERT_EQ(5U, cache->GetCapacity()); ASSERT_EQ(5U, cache->GetUsage()); @@ -422,7 +422,8 @@ TEST_F(CacheTest, SetCapacity) { // and usage should be 7 for (size_t i = 5; i < 10; i++) { std::string key = ToString(i+1); - handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter); + Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_TRUE(s.ok()); } ASSERT_EQ(10U, cache->GetCapacity()); ASSERT_EQ(10U, cache->GetUsage()); @@ -441,6 +442,53 @@ TEST_F(CacheTest, SetCapacity) { } } +TEST_F(CacheTest, SetStrictCapacityLimit) { + // test1: set the flag to false. Insert more keys than capacity. See if they + // all go through. + std::shared_ptr cache = NewLRUCache(5, 0, false); + std::vector handles(10); + Status s; + for (size_t i = 0; i < 10; i++) { + std::string key = ToString(i + 1); + s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_TRUE(s.ok()); + ASSERT_NE(nullptr, handles[i]); + } + + // test2: set the flag to true. Insert and check if it fails. + std::string extra_key = "extra"; + Value* extra_value = new Value(0); + cache->SetStrictCapacityLimit(true); + Cache::Handle* handle; + s = cache->Insert(extra_key, extra_value, 1, &deleter, &handle); + ASSERT_TRUE(s.IsIncomplete()); + ASSERT_EQ(nullptr, handle); + + for (size_t i = 0; i < 10; i++) { + cache->Release(handles[i]); + } + + // test3: init with flag being true. + std::shared_ptr cache2 = NewLRUCache(5, 0, true); + for (size_t i = 0; i < 5; i++) { + std::string key = ToString(i + 1); + s = cache2->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_TRUE(s.ok()); + ASSERT_NE(nullptr, handles[i]); + } + s = cache2->Insert(extra_key, extra_value, 1, &deleter, &handle); + ASSERT_TRUE(s.IsIncomplete()); + ASSERT_EQ(nullptr, handle); + // test insert without handle + s = cache2->Insert(extra_key, extra_value, 1, &deleter); + ASSERT_TRUE(s.IsIncomplete()); + ASSERT_EQ(5, cache->GetUsage()); + + for (size_t i = 0; i < 5; i++) { + cache2->Release(handles[i]); + } +} + TEST_F(CacheTest, OverCapacity) { size_t n = 10; @@ -452,7 +500,8 @@ TEST_F(CacheTest, OverCapacity) { // Insert n+1 entries, but not releasing. for (size_t i = 0; i < n + 1; i++) { std::string key = ToString(i+1); - handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter); + Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_TRUE(s.ok()); } // Guess what's in the cache now?