Not insert into block cache if cache is full and not holding handle

Summary:
We used to allow insert into full block cache as long as `strict_capacity_limit=false`. This diff further restrict insert to full cache if caller don't intent to hold handle to the cache entry after insert.

Hope this diff fix the assertion failure with db_stress: https://our.intern.facebook.com/intern/sandcastle/log/?instance_id=211853102&step_id=2475070014

  db_stress: util/lru_cache.cc:278: virtual void rocksdb::LRUCacheShard::Release(rocksdb::Cache::Handle*): Assertion `lru_.next == &lru_' failed.

The assertion at lru_cache.cc:278 can fail when an entry is inserted into full cache and stay in LRU list.

Test Plan:
  make all check

Reviewers: IslamAbdelRahman, lightmark, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D62325
main
Yi Wu 9 years ago
parent 4a16c32ece
commit badbff65b7
  1. 5
      db/db_block_cache_test.cc
  2. 8
      util/cache_test.cc
  3. 28
      util/clock_cache.cc
  4. 7
      util/lru_cache.cc

@ -172,7 +172,7 @@ TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
InitTable(options);
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
std::shared_ptr<Cache> compressed_cache = NewLRUCache(0, 0, false);
std::shared_ptr<Cache> compressed_cache = NewLRUCache(1 << 25, 0, false);
table_options.block_cache = cache;
table_options.block_cache_compressed = compressed_cache;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
@ -204,9 +204,6 @@ TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
cache->SetCapacity(usage);
cache->SetStrictCapacityLimit(true);
ASSERT_EQ(usage, cache->GetPinnedUsage());
// compressed_cache->SetCapacity(compressed_usage);
compressed_cache->SetCapacity(0);
// compressed_cache->SetStrictCapacityLimit(true);
iter = db_->NewIterator(read_options);
iter->Seek(ToString(kNumBlocks - 1));
ASSERT_TRUE(iter->status().IsIncomplete());

@ -480,7 +480,7 @@ TEST_P(CacheTest, SetStrictCapacityLimit) {
for (size_t i = 0; i < 10; i++) {
std::string key = ToString(i + 1);
s = cache->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
ASSERT_OK(s);
ASSERT_NE(nullptr, handles[i]);
}
@ -502,7 +502,7 @@ TEST_P(CacheTest, SetStrictCapacityLimit) {
for (size_t i = 0; i < 5; i++) {
std::string key = ToString(i + 1);
s = cache2->Insert(key, new Value(i + 1), 1, &deleter, &handles[i]);
ASSERT_TRUE(s.ok());
ASSERT_OK(s);
ASSERT_NE(nullptr, handles[i]);
}
s = cache2->Insert(extra_key, extra_value, 1, &deleter, &handle);
@ -510,8 +510,10 @@ TEST_P(CacheTest, SetStrictCapacityLimit) {
ASSERT_EQ(nullptr, handle);
// test insert without handle
s = cache2->Insert(extra_key, extra_value, 1, &deleter);
ASSERT_TRUE(s.IsIncomplete());
// AS if the key have been inserted into cache but get evicted immediately.
ASSERT_OK(s);
ASSERT_EQ(5, cache->GetUsage());
ASSERT_EQ(nullptr, cache2->Lookup(extra_key));
for (size_t i = 0; i < 5; i++) {
cache2->Release(handles[i]);

@ -187,6 +187,10 @@ struct CacheHandle {
CacheHandle(const CacheHandle& a) { *this = a; }
CacheHandle(const Slice& k, void* v,
void (*del)(const Slice& key, void* value))
: key(k), value(v), deleter(del) {}
CacheHandle& operator=(const CacheHandle& a) {
// Only copy members needed for deletion.
key = a.key;
@ -526,7 +530,11 @@ CacheHandle* ClockCacheShard::Insert(
MutexLock l(&mutex_);
bool success = EvictFromCache(charge, context);
bool strict = strict_capacity_limit_.load(std::memory_order_relaxed);
if (!success && strict) {
if (!success && (strict || !hold_reference)) {
context->to_delete_key.push_back(key.data());
if (!hold_reference) {
context->to_delete_value.emplace_back(key, value, deleter);
}
return nullptr;
}
// Grab available handle from recycle bin. If recycle bin is empty, create
@ -571,20 +579,22 @@ CacheHandle* ClockCacheShard::Insert(
Status ClockCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** h, Cache::Priority priority) {
Cache::Handle** out_handle,
Cache::Priority priority) {
CleanupContext context;
HashTable::accessor accessor;
char* key_data = new char[key.size()];
memcpy(key_data, key.data(), key.size());
Slice key_copy(key_data, key.size());
CacheHandle* handle =
Insert(key_copy, hash, value, charge, deleter, h != nullptr, &context);
CacheHandle* handle = Insert(key_copy, hash, value, charge, deleter,
out_handle != nullptr, &context);
Status s;
if (h != nullptr) {
*h = reinterpret_cast<Cache::Handle*>(handle);
}
if (handle == nullptr) {
s = Status::Incomplete("Insert failed due to LRU cache being full.");
if (out_handle != nullptr) {
if (handle == nullptr) {
s = Status::Incomplete("Insert failed due to LRU cache being full.");
} else {
*out_handle = reinterpret_cast<Cache::Handle*>(handle);
}
}
Cleanup(context);
return s;

@ -327,14 +327,17 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
// is freed or the lru list is empty
EvictFromLRU(charge, &last_reference_list);
if (strict_capacity_limit_ && usage_ - lru_usage_ + charge > capacity_) {
if (usage_ - lru_usage_ + charge > capacity_ &&
(strict_capacity_limit_ || handle == nullptr)) {
if (handle == nullptr) {
// Don't insert the entry but still return ok, as if the entry inserted
// into cache and get evicted immediately.
last_reference_list.push_back(e);
} else {
delete[] reinterpret_cast<char*>(e);
*handle = nullptr;
s = Status::Incomplete("Insert failed due to LRU cache being full.");
}
s = Status::Incomplete("Insert failed due to LRU cache being full.");
} else {
// insert into the cache
// note that the cache might get larger than its capacity if not enough

Loading…
Cancel
Save