diff --git a/CMakeLists.txt b/CMakeLists.txt
index eda0d703b..16219b96f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -338,6 +338,7 @@ set(TESTS
        db/db_tailing_iter_test.cc
        db/db_test.cc
        db/db_test2.cc
+       db/db_block_cache_test.cc
        db/db_universal_compaction_test.cc
        db/db_wal_test.cc
        db/dbformat_test.cc
diff --git a/HISTORY.md b/HISTORY.md
index 567d40d08..64c89e8ee 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -2,6 +2,7 @@
 ## Unreleased
 ### Public API Changes
 * Change default of BlockBasedTableOptions.format_version to 2. It means default DB created by 4.6 or up cannot be opened by RocksDB version 3.9 or earlier.
+* Added strict_capacity_limit option to NewLRUCache. If the flag is set to true, insert to the cache will fail if not enough capacity can be freed. The signature of Cache::Insert() is updated accordingly.
 ### New Features
 * Add CompactionPri::kMinOverlappingRatio, a compaction picking mode friendly to write amplification.
 * Deprecate Iterator::IsKeyPinned() and replace it with Iterator::GetProperty() with prop_name="rocksdb.iterator.is.key.pinned"
diff --git a/Makefile b/Makefile
index cfe70bea9..a1e321f83 100644
--- a/Makefile
+++ b/Makefile
@@ -246,6 +246,7 @@ BENCHTOOLOBJECTS = $(BENCH_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL)
 TESTS = \
 	db_test \
 	db_test2 \
+	db_block_cache_test \
 	db_iter_test \
 	db_log_iter_test \
 	db_compaction_filter_test \
@@ -794,6 +795,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
+db_block_cache_test: db/db_block_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 db_log_iter_test: db/db_log_iter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc
new file mode 100644
index 000000000..18fb5b2ee
--- /dev/null
+++ b/db/db_block_cache_test.cc
@@ -0,0 +1,237 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+
+namespace rocksdb {
+
+static uint64_t TestGetTickerCount(const Options& options,
+                                   Tickers ticker_type) {
+  return options.statistics->getTickerCount(ticker_type);
+}
+
+class DBBlockCacheTest : public DBTestBase {
+ private:
+  size_t miss_count_ = 0;
+  size_t hit_count_ = 0;
+  size_t insert_count_ = 0;
+  size_t failure_count_ = 0;
+  size_t compressed_miss_count_ = 0;
+  size_t compressed_hit_count_ = 0;
+  size_t compressed_insert_count_ = 0;
+  size_t compressed_failure_count_ = 0;
+
+ public:
+  const size_t kNumBlocks = 10;
+  const size_t kValueSize = 100;
+
+  DBBlockCacheTest() : DBTestBase("/db_block_cache_test") {}
+
+  BlockBasedTableOptions GetTableOptions() {
+    BlockBasedTableOptions table_options;
+    // Set a small enough block size so that each key-value gets its own block.
+ table_options.block_size = 1; + return table_options; + } + + Options GetOptions(const BlockBasedTableOptions& table_options) { + Options options = CurrentOptions(); + options.create_if_missing = true; + // options.compression = kNoCompression; + options.statistics = rocksdb::CreateDBStatistics(); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + return options; + } + + void InitTable(const Options& options) { + std::string value(kValueSize, 'a'); + for (size_t i = 0; i < kNumBlocks; i++) { + ASSERT_OK(Put(ToString(i), value.c_str())); + } + } + + void RecordCacheCounters(const Options& options) { + miss_count_ = TestGetTickerCount(options, BLOCK_CACHE_MISS); + hit_count_ = TestGetTickerCount(options, BLOCK_CACHE_HIT); + insert_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD); + failure_count_ = TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES); + compressed_miss_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS); + compressed_hit_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT); + compressed_insert_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD); + compressed_failure_count_ = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES); + } + + void CheckCacheCounters(const Options& options, size_t expected_misses, + size_t expected_hits, size_t expected_inserts, + size_t expected_failures) { + size_t new_miss_count = TestGetTickerCount(options, BLOCK_CACHE_MISS); + size_t new_hit_count = TestGetTickerCount(options, BLOCK_CACHE_HIT); + size_t new_insert_count = TestGetTickerCount(options, BLOCK_CACHE_ADD); + size_t new_failure_count = + TestGetTickerCount(options, BLOCK_CACHE_ADD_FAILURES); + ASSERT_EQ(miss_count_ + expected_misses, new_miss_count); + ASSERT_EQ(hit_count_ + expected_hits, new_hit_count); + ASSERT_EQ(insert_count_ + expected_inserts, new_insert_count); + ASSERT_EQ(failure_count_ + expected_failures, new_failure_count); + miss_count_ = new_miss_count; + hit_count_ = new_hit_count; + insert_count_ = new_insert_count; + failure_count_ = new_failure_count; + } + + void CheckCompressedCacheCounters(const Options& options, + size_t expected_misses, + size_t expected_hits, + size_t expected_inserts, + size_t expected_failures) { + size_t new_miss_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS); + size_t new_hit_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT); + size_t new_insert_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD); + size_t new_failure_count = + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES); + ASSERT_EQ(compressed_miss_count_ + expected_misses, new_miss_count); + ASSERT_EQ(compressed_hit_count_ + expected_hits, new_hit_count); + ASSERT_EQ(compressed_insert_count_ + expected_inserts, new_insert_count); + ASSERT_EQ(compressed_failure_count_ + expected_failures, new_failure_count); + compressed_miss_count_ = new_miss_count; + compressed_hit_count_ = new_hit_count; + compressed_insert_count_ = new_insert_count; + compressed_failure_count_ = new_failure_count; + } +}; + +TEST_F(DBBlockCacheTest, TestWithoutCompressedBlockCache) { + ReadOptions read_options; + auto table_options = GetTableOptions(); + auto options = GetOptions(table_options); + InitTable(options); + + std::shared_ptr cache = NewLRUCache(0, 0, false); + table_options.block_cache = cache; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + Reopen(options); + RecordCacheCounters(options); + + std::vector> 
      iterators(kNumBlocks - 1);
+  Iterator* iter = nullptr;
+
+  // Load blocks into cache.
+  for (size_t i = 0; i < kNumBlocks - 1; i++) {
+    iter = db_->NewIterator(read_options);
+    iter->Seek(ToString(i));
+    ASSERT_OK(iter->status());
+    CheckCacheCounters(options, 1, 0, 1, 0);
+    iterators[i].reset(iter);
+  }
+  size_t usage = cache->GetUsage();
+  ASSERT_LT(0, usage);
+  cache->SetCapacity(usage);
+  ASSERT_EQ(usage, cache->GetPinnedUsage());
+
+  // Test with strict capacity limit.
+  cache->SetStrictCapacityLimit(true);
+  iter = db_->NewIterator(read_options);
+  iter->Seek(ToString(kNumBlocks - 1));
+  ASSERT_TRUE(iter->status().IsIncomplete());
+  CheckCacheCounters(options, 1, 0, 0, 1);
+  delete iter;
+  iter = nullptr;
+
+  // Release iterators and access the cache again.
+  for (size_t i = 0; i < kNumBlocks - 1; i++) {
+    iterators[i].reset();
+    CheckCacheCounters(options, 0, 0, 0, 0);
+  }
+  ASSERT_EQ(0, cache->GetPinnedUsage());
+  for (size_t i = 0; i < kNumBlocks - 1; i++) {
+    iter = db_->NewIterator(read_options);
+    iter->Seek(ToString(i));
+    ASSERT_OK(iter->status());
+    CheckCacheCounters(options, 0, 1, 0, 0);
+    iterators[i].reset(iter);
+  }
+}
+
+TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
+  ReadOptions read_options;
+  auto table_options = GetTableOptions();
+  auto options = GetOptions(table_options);
+  InitTable(options);
+
+  std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
+  std::shared_ptr<Cache> compressed_cache = NewLRUCache(0, 0, false);
+  table_options.block_cache = cache;
+  table_options.block_cache_compressed = compressed_cache;
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
+  Reopen(options);
+  RecordCacheCounters(options);
+
+  std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks - 1);
+  Iterator* iter = nullptr;
+
+  // Load blocks into cache.
+  for (size_t i = 0; i < kNumBlocks - 1; i++) {
+    iter = db_->NewIterator(read_options);
+    iter->Seek(ToString(i));
+    ASSERT_OK(iter->status());
+    CheckCacheCounters(options, 1, 0, 1, 0);
+    CheckCompressedCacheCounters(options, 1, 0, 1, 0);
+    iterators[i].reset(iter);
+  }
+  size_t usage = cache->GetUsage();
+  ASSERT_LT(0, usage);
+  ASSERT_EQ(usage, cache->GetPinnedUsage());
+  size_t compressed_usage = compressed_cache->GetUsage();
+  ASSERT_LT(0, compressed_usage);
+  // Compressed block cache cannot be pinned.
+  ASSERT_EQ(0, compressed_cache->GetPinnedUsage());
+
+  // Set the strict capacity limit flag. Now blocks will only be loaded into
+  // the compressed block cache.
+  cache->SetCapacity(usage);
+  cache->SetStrictCapacityLimit(true);
+  ASSERT_EQ(usage, cache->GetPinnedUsage());
+  // compressed_cache->SetCapacity(compressed_usage);
+  compressed_cache->SetCapacity(0);
+  // compressed_cache->SetStrictCapacityLimit(true);
+  iter = db_->NewIterator(read_options);
+  iter->Seek(ToString(kNumBlocks - 1));
+  ASSERT_TRUE(iter->status().IsIncomplete());
+  CheckCacheCounters(options, 1, 0, 0, 1);
+  CheckCompressedCacheCounters(options, 1, 0, 1, 0);
+  delete iter;
+  iter = nullptr;
+
+  // Clear the strict capacity limit flag. This time we should hit the
+  // compressed block cache.
+  cache->SetStrictCapacityLimit(false);
+  iter = db_->NewIterator(read_options);
+  iter->Seek(ToString(kNumBlocks - 1));
+  ASSERT_OK(iter->status());
+  CheckCacheCounters(options, 1, 0, 1, 0);
+  CheckCompressedCacheCounters(options, 0, 1, 0, 0);
+  delete iter;
+  iter = nullptr;
+}
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  rocksdb::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 53e35d3a0..2a4621b7e 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -143,8 +143,12 @@ Status TableCache::FindTable(const EnvOptions& env_options,
       // We do not cache error results so that if the error is transient,
       // or somebody repairs the file, we recover automatically.
     } else {
-      *handle = cache_->Insert(key, table_reader.release(), 1,
-                               &DeleteEntry);
+      s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry,
+                         handle);
+      if (s.ok()) {
+        // Release ownership of table reader.
+        table_reader.release();
+      }
     }
   }
   return s;
@@ -285,9 +289,8 @@ Status TableCache::Get(const ReadOptions& options,
       size_t charge =
           row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
       void* row_ptr = new std::string(std::move(*row_cache_entry));
-      auto row_handle = ioptions_.row_cache->Insert(
-          row_cache_key.GetKey(), row_ptr, charge, &DeleteEntry);
-      ioptions_.row_cache->Release(row_handle);
+      ioptions_.row_cache->Insert(row_cache_key.GetKey(), row_ptr, charge,
+                                  &DeleteEntry);
     }
 #endif  // ROCKSDB_LITE
diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h
index 30d9c67d3..327270e34 100644
--- a/include/rocksdb/cache.h
+++ b/include/rocksdb/cache.h
@@ -25,6 +25,7 @@
 #include
 #include
 #include "rocksdb/slice.h"
+#include "rocksdb/status.h"
 
 namespace rocksdb {
 
@@ -33,12 +34,15 @@ using std::shared_ptr;
 class Cache;
 
 // Create a new cache with a fixed size capacity. The cache is sharded
-// to 2^numShardBits shards, by hash of the key. The total capacity
+// to 2^num_shard_bits shards, by hash of the key. The total capacity
 // is divided and evenly assigned to each shard.
 //
-// The functions without parameter numShardBits uses default value, which is 4
+// The parameter num_shard_bits defaults to 4, and strict_capacity_limit
+// defaults to false.
 extern shared_ptr<Cache> NewLRUCache(size_t capacity);
-extern shared_ptr<Cache> NewLRUCache(size_t capacity, int numShardBits);
+extern shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits);
+extern shared_ptr<Cache> NewLRUCache(size_t capacity, int num_shard_bits,
+                                     bool strict_capacity_limit);
 
 class Cache {
  public:
@@ -55,15 +59,22 @@ class Cache {
 
   // Insert a mapping from key->value into the cache and assign it
   // the specified charge against the total cache capacity.
+  // If strict_capacity_limit is true and the cache reaches its full capacity,
+  // returns Status::Incomplete.
   //
-  // Returns a handle that corresponds to the mapping. The caller
-  // must call this->Release(handle) when the returned mapping is no
-  // longer needed.
+  // If handle is not nullptr, returns a handle that corresponds to the
+  // mapping. The caller must call this->Release(handle) when the returned
+  // mapping is no longer needed. In case of error, the caller is responsible
+  // for cleaning up the value (i.e. calling "deleter").
+  //
+  // If handle is nullptr, it is as if Release is called immediately after
+  // insert. In case of error, the value will be cleaned up.
   //
   // When the inserted entry is no longer needed, the key and
   // value will be passed to "deleter".
-  virtual Handle* Insert(const Slice& key, void* value, size_t charge,
-                         void (*deleter)(const Slice& key, void* value)) = 0;
+  virtual Status Insert(const Slice& key, void* value, size_t charge,
+                        void (*deleter)(const Slice& key, void* value),
+                        Handle** handle = nullptr) = 0;
 
   // If the cache has no mapping for "key", returns nullptr.
   //
@@ -100,6 +111,14 @@ class Cache {
   // purge the released entries from the cache in order to lower the usage
   virtual void SetCapacity(size_t capacity) = 0;
 
+  // Set whether to return error on insertion when cache reaches its full
+  // capacity.
+  virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
+
+  // Returns true if the cache is set to return an error on insertion when it
+  // reaches its full capacity.
+  virtual bool HasStrictCapacityLimit() const = 0;
+
   // returns the maximum configured capacity of the cache
   virtual size_t GetCapacity() const = 0;
 
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index c16c3a7ca..c832516da 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -33,6 +33,8 @@ enum Tickers : uint32_t {
   BLOCK_CACHE_HIT,
   // # of blocks added to block cache.
   BLOCK_CACHE_ADD,
+  // # of failures when adding blocks to block cache.
+  BLOCK_CACHE_ADD_FAILURES,
   // # of times cache miss when accessing index block from block cache.
   BLOCK_CACHE_INDEX_MISS,
   // # of times cache hit when accessing index block from block cache.
@@ -140,8 +142,12 @@ enum Tickers : uint32_t {
   GET_UPDATES_SINCE_CALLS,
   BLOCK_CACHE_COMPRESSED_MISS,  // miss in the compressed block cache
   BLOCK_CACHE_COMPRESSED_HIT,   // hit in the compressed block cache
-  WAL_FILE_SYNCED,  // Number of times WAL sync is done
-  WAL_FILE_BYTES,   // Number of bytes written to WAL
+  // Number of blocks added to compressed block cache
+  BLOCK_CACHE_COMPRESSED_ADD,
+  // Number of failures when adding blocks to compressed block cache
+  BLOCK_CACHE_COMPRESSED_ADD_FAILURES,
+  WAL_FILE_SYNCED,  // Number of times WAL sync is done
+  WAL_FILE_BYTES,   // Number of bytes written to WAL
 
   // Writes can be processed by requesting thread or by the thread at the
   // head of the writers queue.
@@ -176,6 +182,7 @@ const std::vector> TickersNameMap = { {BLOCK_CACHE_MISS, "rocksdb.block.cache.miss"}, {BLOCK_CACHE_HIT, "rocksdb.block.cache.hit"}, {BLOCK_CACHE_ADD, "rocksdb.block.cache.add"}, + {BLOCK_CACHE_ADD_FAILURES, "rocksdb.block.cache.add.failures"}, {BLOCK_CACHE_INDEX_MISS, "rocksdb.block.cache.index.miss"}, {BLOCK_CACHE_INDEX_HIT, "rocksdb.block.cache.index.hit"}, {BLOCK_CACHE_FILTER_MISS, "rocksdb.block.cache.filter.miss"}, @@ -227,6 +234,9 @@ const std::vector> TickersNameMap = { {GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls"}, {BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss"}, {BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit"}, + {BLOCK_CACHE_COMPRESSED_ADD, "rocksdb.block.cachecompressed.add"}, + {BLOCK_CACHE_COMPRESSED_ADD_FAILURES, + "rocksdb.block.cachecompressed.add.failures"}, {WAL_FILE_SYNCED, "rocksdb.wal.synced"}, {WAL_FILE_BYTES, "rocksdb.wal.bytes"}, {WRITE_DONE_BY_SELF, "rocksdb.write.self"}, diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 1484acb51..47d74bc5f 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -703,7 +703,6 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, if (type != kNoCompression && block_cache_compressed != nullptr) { - Cache::Handle* cache_handle = nullptr; size_t size = block_contents.size(); std::unique_ptr ubuf(new char[size + 1]); @@ -723,9 +722,8 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, (end - r->compressed_cache_key_prefix)); // Insert into compressed block cache. - cache_handle = block_cache_compressed->Insert( - key, block, block->usable_size(), &DeleteCachedBlock); - block_cache_compressed->Release(cache_handle); + block_cache_compressed->Insert(key, block, block->usable_size(), + &DeleteCachedBlock); // Invalidate OS cache. r->file->InvalidateCache(static_cast(r->offset), size); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 42c5aa494..cbaf90a90 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -740,11 +740,16 @@ Status BlockBasedTable::GetDataBlockFromCache( assert(block->value->compression_type() == kNoCompression); if (block_cache != nullptr && block->value->cachable() && read_options.fill_cache) { - block->cache_handle = block_cache->Insert(block_cache_key, block->value, - block->value->usable_size(), - &DeleteCachedEntry); - assert(reinterpret_cast( - block_cache->Value(block->cache_handle)) == block->value); + s = block_cache->Insert( + block_cache_key, block->value, block->value->usable_size(), + &DeleteCachedEntry, &(block->cache_handle)); + if (s.ok()) { + RecordTick(statistics, BLOCK_CACHE_ADD); + } else { + RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); + delete block->value; + block->value = nullptr; + } } } @@ -784,27 +789,37 @@ Status BlockBasedTable::PutDataBlockToCache( // Release the hold on the compressed cache entry immediately. if (block_cache_compressed != nullptr && raw_block != nullptr && raw_block->cachable()) { - auto cache_handle = block_cache_compressed->Insert( - compressed_block_cache_key, raw_block, raw_block->usable_size(), - &DeleteCachedEntry); - block_cache_compressed->Release(cache_handle); - RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); - // Avoid the following code to delete this cached block. 
- raw_block = nullptr; + s = block_cache_compressed->Insert(compressed_block_cache_key, raw_block, + raw_block->usable_size(), + &DeleteCachedEntry); + if (s.ok()) { + // Avoid the following code to delete this cached block. + raw_block = nullptr; + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD); + } else { + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES); + } } delete raw_block; // insert into uncompressed block cache assert((block->value->compression_type() == kNoCompression)); if (block_cache != nullptr && block->value->cachable()) { - block->cache_handle = block_cache->Insert(block_cache_key, block->value, - block->value->usable_size(), - &DeleteCachedEntry); - RecordTick(statistics, BLOCK_CACHE_ADD); - RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, - block->value->usable_size()); - assert(reinterpret_cast(block_cache->Value(block->cache_handle)) == - block->value); + s = block_cache->Insert(block_cache_key, block->value, + block->value->usable_size(), + &DeleteCachedEntry, &(block->cache_handle)); + if (s.ok()) { + assert(block->cache_handle != nullptr); + RecordTick(statistics, BLOCK_CACHE_ADD); + RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, + block->value->usable_size()); + assert(reinterpret_cast( + block_cache->Value(block->cache_handle)) == block->value); + } else { + RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); + delete block->value; + block->value = nullptr; + } } return s; @@ -891,10 +906,17 @@ BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( filter = ReadFilter(rep_, &filter_size); if (filter != nullptr) { assert(filter_size > 0); - cache_handle = block_cache->Insert(key, filter, filter_size, - &DeleteCachedEntry); - RecordTick(statistics, BLOCK_CACHE_ADD); - RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter_size); + Status s = block_cache->Insert(key, filter, filter_size, + &DeleteCachedEntry, + &cache_handle); + if (s.ok()) { + RecordTick(statistics, BLOCK_CACHE_ADD); + RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter_size); + } else { + RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); + delete filter; + return CachableEntry(); + } } } @@ -937,10 +959,18 @@ InternalIterator* BlockBasedTable::NewIndexIterator( // Create index reader and put it in the cache. Status s; s = CreateIndexReader(&index_reader); + if (s.ok()) { + s = block_cache->Insert(key, index_reader, index_reader->usable_size(), + &DeleteCachedEntry, &cache_handle); + } - if (!s.ok()) { + if (s.ok()) { + RecordTick(statistics, BLOCK_CACHE_ADD); + RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, + index_reader->usable_size()); + } else { + RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); // make sure if something goes wrong, index_reader shall remain intact. - assert(index_reader == nullptr); if (input_iter != nullptr) { input_iter->SetStatus(s); return input_iter; @@ -949,12 +979,6 @@ InternalIterator* BlockBasedTable::NewIndexIterator( } } - cache_handle = - block_cache->Insert(key, index_reader, index_reader->usable_size(), - &DeleteCachedEntry); - RecordTick(statistics, BLOCK_CACHE_ADD); - RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, - index_reader->usable_size()); } assert(cache_handle); @@ -1036,7 +1060,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( } // Didn't get any data from block caches. 
-  if (block.value == nullptr) {
+  if (s.ok() && block.value == nullptr) {
     if (no_io) {
       // Could not read from block_cache and can't do IO
       if (input_iter != nullptr) {
@@ -1055,7 +1079,7 @@
   }
 
   InternalIterator* iter;
-  if (block.value != nullptr) {
+  if (s.ok() && block.value != nullptr) {
     iter = block.value->NewIterator(&rep->internal_comparator, input_iter);
     if (block.cache_handle != nullptr) {
       iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
diff --git a/util/cache.cc b/util/cache.cc
index 078b10e1a..6015644f6 100644
--- a/util/cache.cc
+++ b/util/cache.cc
@@ -196,10 +196,13 @@ class LRUCache {
   // free the needed space
   void SetCapacity(size_t capacity);
 
+  // Set the flag to reject insertion if the cache is full.
+  void SetStrictCapacityLimit(bool strict_capacity_limit);
+
   // Like Cache methods, but with an extra "hash" parameter.
-  Cache::Handle* Insert(const Slice& key, uint32_t hash,
-                        void* value, size_t charge,
-                        void (*deleter)(const Slice& key, void* value));
+  Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
+                void (*deleter)(const Slice& key, void* value),
+                Cache::Handle** handle);
   Cache::Handle* Lookup(const Slice& key, uint32_t hash);
   void Release(Cache::Handle* handle);
   void Erase(const Slice& key, uint32_t hash);
@@ -245,6 +248,9 @@ class LRUCache {
   // Memory size for entries residing only in the LRU list
   size_t lru_usage_;
 
+  // Whether to reject insertion if cache reaches its full capacity.
+  bool strict_capacity_limit_;
+
   // mutex_ protects the following state.
   // We don't count mutex_ as the cache's internal state so semantically we
   // don't mind mutex_ invoking the non-const actions.
@@ -336,6 +342,11 @@ void LRUCache::SetCapacity(size_t capacity) {
   }
 }
 
+void LRUCache::SetStrictCapacityLimit(bool strict_capacity_limit) {
+  MutexLock l(&mutex_);
+  strict_capacity_limit_ = strict_capacity_limit;
+}
+
 Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
   MutexLock l(&mutex_);
   LRUHandle* e = table_.Lookup(key, hash);
@@ -350,6 +361,9 @@ Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
 }
 
 void LRUCache::Release(Cache::Handle* handle) {
+  if (handle == nullptr) {
+    return;
+  }
   LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
   bool last_reference = false;
   {
@@ -383,15 +397,16 @@ void LRUCache::Release(Cache::Handle* handle) {
   }
 }
 
-Cache::Handle* LRUCache::Insert(
-    const Slice& key, uint32_t hash, void* value, size_t charge,
-    void (*deleter)(const Slice& key, void* value)) {
-
+Status LRUCache::Insert(const Slice& key, uint32_t hash, void* value,
+                        size_t charge,
+                        void (*deleter)(const Slice& key, void* value),
+                        Cache::Handle** handle) {
   // Allocate the memory here outside of the mutex
   // If the cache is full, we'll have to release it
   // It shouldn't happen very often though.
   LRUHandle* e = reinterpret_cast<LRUHandle*>(
       new char[sizeof(LRUHandle) - 1 + key.size()]);
+  Status s;
   autovector<LRUHandle*> last_reference_list;
 
   e->value = value;
@@ -399,7 +414,9 @@
   e->charge = charge;
   e->key_length = key.size();
   e->hash = hash;
-  e->refs = 2;  // One from LRUCache, one for the returned handle
+  e->refs = (handle == nullptr
+                 ?
1 + : 2); // One from LRUCache, one for the returned handle e->next = e->prev = nullptr; e->in_cache = true; memcpy(e->key_data, key.data(), key.size()); @@ -411,20 +428,36 @@ Cache::Handle* LRUCache::Insert( // is freed or the lru list is empty EvictFromLRU(charge, &last_reference_list); - // insert into the cache - // note that the cache might get larger than its capacity if not enough - // space was freed - LRUHandle* old = table_.Insert(e); - usage_ += e->charge; - if (old != nullptr) { - old->in_cache = false; - if (Unref(old)) { - usage_ -= old->charge; - // old is on LRU because it's in cache and its reference count - // was just 1 (Unref returned 0) - LRU_Remove(old); - last_reference_list.push_back(old); + if (strict_capacity_limit_ && usage_ - lru_usage_ + charge > capacity_) { + if (handle == nullptr) { + last_reference_list.push_back(e); + } else { + delete[] reinterpret_cast(e); + *handle = nullptr; + } + s = Status::Incomplete("Insert failed due to LRU cache being full."); + } else { + // insert into the cache + // note that the cache might get larger than its capacity if not enough + // space was freed + LRUHandle* old = table_.Insert(e); + usage_ += e->charge; + if (old != nullptr) { + old->in_cache = false; + if (Unref(old)) { + usage_ -= old->charge; + // old is on LRU because it's in cache and its reference count + // was just 1 (Unref returned 0) + LRU_Remove(old); + last_reference_list.push_back(old); + } + } + if (handle == nullptr) { + LRU_Append(e); + } else { + *handle = reinterpret_cast(e); } + s = Status::OK(); } } @@ -434,7 +467,7 @@ Cache::Handle* LRUCache::Insert( entry->Free(); } - return reinterpret_cast(e); + return s; } void LRUCache::Erase(const Slice& key, uint32_t hash) { @@ -472,6 +505,7 @@ class ShardedLRUCache : public Cache { uint64_t last_id_; int num_shard_bits_; size_t capacity_; + bool strict_capacity_limit_; static inline uint32_t HashSlice(const Slice& s) { return Hash(s.data(), s.size(), 0); @@ -483,13 +517,18 @@ class ShardedLRUCache : public Cache { } public: - ShardedLRUCache(size_t capacity, int num_shard_bits) - : last_id_(0), num_shard_bits_(num_shard_bits), capacity_(capacity) { + ShardedLRUCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) + : last_id_(0), + num_shard_bits_(num_shard_bits), + capacity_(capacity), + strict_capacity_limit_(strict_capacity_limit) { int num_shards = 1 << num_shard_bits_; shards_ = new LRUCache[num_shards]; const size_t per_shard = (capacity + (num_shards - 1)) / num_shards; for (int s = 0; s < num_shards; s++) { shards_[s].SetCapacity(per_shard); + shards_[s].SetStrictCapacityLimit(strict_capacity_limit); } } virtual ~ShardedLRUCache() { @@ -504,11 +543,19 @@ class ShardedLRUCache : public Cache { } capacity_ = capacity; } - virtual Handle* Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, - void* value)) override { + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + shards_[s].SetStrictCapacityLimit(strict_capacity_limit); + } + strict_capacity_limit_ = strict_capacity_limit; + } + virtual Status Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Handle** handle) override { const uint32_t hash = HashSlice(key); - return shards_[Shard(hash)].Insert(key, hash, value, charge, deleter); + return shards_[Shard(hash)].Insert(key, hash, value, charge, deleter, + handle); } virtual Handle* 
Lookup(const Slice& key) override { const uint32_t hash = HashSlice(key); @@ -531,6 +578,10 @@ class ShardedLRUCache : public Cache { } virtual size_t GetCapacity() const override { return capacity_; } + virtual bool HasStrictCapacityLimit() const override { + return strict_capacity_limit_; + } + virtual size_t GetUsage() const override { // We will not lock the cache when getting the usage from shards. int num_shards = 1 << num_shard_bits_; @@ -569,14 +620,20 @@ class ShardedLRUCache : public Cache { } // end anonymous namespace shared_ptr NewLRUCache(size_t capacity) { - return NewLRUCache(capacity, kNumShardBits); + return NewLRUCache(capacity, kNumShardBits, false); } shared_ptr NewLRUCache(size_t capacity, int num_shard_bits) { + return NewLRUCache(capacity, num_shard_bits, false); +} + +shared_ptr NewLRUCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) { if (num_shard_bits >= 20) { return nullptr; // the cache cannot be sharded into too many fine pieces } - return std::make_shared(capacity, num_shard_bits); + return std::make_shared(capacity, num_shard_bits, + strict_capacity_limit); } } // namespace rocksdb diff --git a/util/cache_bench.cc b/util/cache_bench.cc index 0e0d70d62..266c9e1c5 100644 --- a/util/cache_bench.cc +++ b/util/cache_bench.cc @@ -142,8 +142,7 @@ class CacheBench { // Cast uint64* to be char*, data would be copied to cache Slice key(reinterpret_cast(&rand_key), 8); // do insert - auto handle = cache_->Insert(key, new char[10], 1, &deleter); - cache_->Release(handle); + cache_->Insert(key, new char[10], 1, &deleter); } } @@ -221,8 +220,7 @@ class CacheBench { int32_t prob_op = thread->rnd.Uniform(100); if (prob_op >= 0 && prob_op < FLAGS_insert_percent) { // do insert - auto handle = cache_->Insert(key, new char[10], 1, &deleter); - cache_->Release(handle); + cache_->Insert(key, new char[10], 1, &deleter); } else if (prob_op -= FLAGS_insert_percent && prob_op < FLAGS_lookup_percent) { // do lookup diff --git a/util/cache_test.cc b/util/cache_test.cc index d49cd4fdf..3df71c098 100644 --- a/util/cache_test.cc +++ b/util/cache_test.cc @@ -73,8 +73,8 @@ class CacheTest : public testing::Test { } void Insert(shared_ptr cache, int key, int value, int charge = 1) { - cache->Release(cache->Insert(EncodeKey(key), EncodeValue(value), charge, - &CacheTest::Deleter)); + cache->Insert(EncodeKey(key), EncodeValue(value), charge, + &CacheTest::Deleter); } void Erase(shared_ptr cache, int key) { @@ -118,14 +118,12 @@ TEST_F(CacheTest, UsageTest) { auto cache = NewLRUCache(kCapacity, 8); size_t usage = 0; - const char* value = "abcdef"; + char value[10] = "abcdef"; // make sure everything will be cached for (int i = 1; i < 100; ++i) { std::string key(i, 'a'); auto kv_size = key.size() + 5; - cache->Release( - cache->Insert(key, (void*)value, kv_size, dumbDeleter) - ); + cache->Insert(key, reinterpret_cast(value), kv_size, dumbDeleter); usage += kv_size; ASSERT_EQ(usage, cache->GetUsage()); } @@ -133,9 +131,8 @@ TEST_F(CacheTest, UsageTest) { // make sure the cache will be overloaded for (uint64_t i = 1; i < kCapacity; ++i) { auto key = ToString(i); - cache->Release( - cache->Insert(key, (void*)value, key.size() + 5, dumbDeleter) - ); + cache->Insert(key, reinterpret_cast(value), key.size() + 5, + dumbDeleter); } // the usage should be close to the capacity @@ -149,7 +146,7 @@ TEST_F(CacheTest, PinnedUsageTest) { auto cache = NewLRUCache(kCapacity, 8); size_t pinned_usage = 0; - const char* value = "abcdef"; + char value[10] = "abcdef"; std::forward_list 
      unreleased_handles;
 
@@ -158,7 +155,9 @@
   for (int i = 1; i < 100; ++i) {
     std::string key(i, 'a');
     auto kv_size = key.size() + 5;
-    auto handle = cache->Insert(key, (void*)value, kv_size, dumbDeleter);
+    Cache::Handle* handle;
+    cache->Insert(key, reinterpret_cast<void*>(value), kv_size, dumbDeleter,
+                  &handle);
     pinned_usage += kv_size;
     ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
     if (i % 2 == 0) {
@@ -182,8 +181,8 @@
   // check that overloading the cache does not change the pinned usage
   for (uint64_t i = 1; i < 2 * kCapacity; ++i) {
     auto key = ToString(i);
-    cache->Release(
-        cache->Insert(key, (void*)value, key.size() + 5, dumbDeleter));
+    cache->Insert(key, reinterpret_cast<void*>(value), key.size() + 5,
+                  dumbDeleter);
   }
   ASSERT_EQ(pinned_usage, cache->GetPinnedUsage());
 
@@ -408,7 +407,8 @@ TEST_F(CacheTest, SetCapacity) {
   // Insert 5 entries, but not releasing.
   for (size_t i = 0; i < 5; i++) {
     std::string key = ToString(i+1);
-    handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter);
+    Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
+    ASSERT_TRUE(s.ok());
   }
   ASSERT_EQ(5U, cache->GetCapacity());
   ASSERT_EQ(5U, cache->GetUsage());
@@ -422,7 +422,8 @@
   // and usage should be 7
   for (size_t i = 5; i < 10; i++) {
     std::string key = ToString(i+1);
-    handles[i] = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
+    Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
+    ASSERT_TRUE(s.ok());
   }
   ASSERT_EQ(10U, cache->GetCapacity());
   ASSERT_EQ(10U, cache->GetUsage());
@@ -441,6 +442,53 @@
   }
 }
 
+TEST_F(CacheTest, SetStrictCapacityLimit) {
+  // test1: set the flag to false. Insert more keys than capacity. See if they
+  // all go through.
+  std::shared_ptr<Cache> cache = NewLRUCache(5, 0, false);
+  std::vector<Cache::Handle*> handles(10);
+  Status s;
+  for (size_t i = 0; i < 10; i++) {
+    std::string key = ToString(i + 1);
+    s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
+    ASSERT_TRUE(s.ok());
+    ASSERT_NE(nullptr, handles[i]);
+  }
+
+  // test2: set the flag to true. Insert and check if it fails.
+  std::string extra_key = "extra";
+  Value* extra_value = new Value(0);
+  cache->SetStrictCapacityLimit(true);
+  Cache::Handle* handle;
+  s = cache->Insert(extra_key, extra_value, 1, &deleter, &handle);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_EQ(nullptr, handle);
+
+  for (size_t i = 0; i < 10; i++) {
+    cache->Release(handles[i]);
+  }
+
+  // test3: init with flag being true.
+  std::shared_ptr<Cache> cache2 = NewLRUCache(5, 0, true);
+  for (size_t i = 0; i < 5; i++) {
+    std::string key = ToString(i + 1);
+    s = cache2->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
+    ASSERT_TRUE(s.ok());
+    ASSERT_NE(nullptr, handles[i]);
+  }
+  s = cache2->Insert(extra_key, extra_value, 1, &deleter, &handle);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_EQ(nullptr, handle);
+  // test insert without handle
+  s = cache2->Insert(extra_key, extra_value, 1, &deleter);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_EQ(5, cache2->GetUsage());
+
+  for (size_t i = 0; i < 5; i++) {
+    cache2->Release(handles[i]);
+  }
+}
+
 TEST_F(CacheTest, OverCapacity) {
   size_t n = 10;
 
   // a LRUCache with n entries and one shard only
   std::shared_ptr<Cache> cache = NewLRUCache(n, 0);
 
   std::vector<Cache::Handle*> handles(n + 1);
 
   // Insert n+1 entries, but not releasing.
for (size_t i = 0; i < n + 1; i++) { std::string key = ToString(i+1); - handles[i] = cache->Insert(key, new Value(i+1), 1, &deleter); + Status s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]); + ASSERT_TRUE(s.ok()); } // Guess what's in the cache now?
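
Not part of the patch: below is a minimal usage sketch of the API introduced above, namely the three-argument NewLRUCache() overload and the Status-returning Cache::Insert(). The capacity, shard count, key, value, and deleter are illustrative assumptions, not values taken from the diff.

// Illustrative sketch only; exercises the strict_capacity_limit behavior.
#include <memory>
#include <string>

#include "rocksdb/cache.h"

namespace {

// Deleter with the signature expected by Cache::Insert().
void DeleteStringValue(const rocksdb::Slice& /*key*/, void* value) {
  delete reinterpret_cast<std::string*>(value);
}

}  // namespace

int main() {
  // The third argument enables strict_capacity_limit, so Insert() returns
  // Status::Incomplete() instead of letting usage grow past the capacity.
  std::shared_ptr<rocksdb::Cache> cache =
      rocksdb::NewLRUCache(1024 /* capacity */, 4 /* num_shard_bits */,
                           true /* strict_capacity_limit */);

  std::string* value = new std::string("v");
  rocksdb::Cache::Handle* handle = nullptr;
  rocksdb::Status s =
      cache->Insert("key", value, 1 /* charge */, &DeleteStringValue, &handle);
  if (s.ok()) {
    // A non-null handle pins the entry; the caller must release it.
    cache->Release(handle);
  } else {
    // Insert was rejected (IsIncomplete() when the cache is full of pinned
    // entries); with a non-null handle argument the caller still owns the
    // value and must clean it up, per the cache.h comment above.
    delete value;
  }

  // The flag can also be toggled after construction.
  cache->SetStrictCapacityLimit(false);
  return 0;
}

With handle == nullptr the call behaves like an insert followed by an immediate release, and a failed insert cleans up the value itself; the table-cache and block-cache call sites in this patch rely on exactly that distinction.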