Add a unit test to verify iterators release data blocks after using them (#4170)

Summary:
Add a unit test to check that iterators release data blocks after it has moved away from it. Verify the same for compaction input iterators.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4170

Differential Revision: D8962513

Pulled By: siying

fbshipit-source-id: 05a5b604d7d29887fb488f2cda7286f554a14407
main
Siying Dong 6 years ago committed by Facebook Github Bot
parent 999d955e4f
commit f3d91a0b57
  1. 2
      db/compaction_job.cc
  2. 104
      db/db_test2.cc
  3. 3
      table/block.cc
  4. 2
      table/block.h

@ -600,6 +600,8 @@ Status CompactionJob::Run() {
compaction_stats_.micros = env_->NowMicros() - start_micros;
MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
// Check if any thread encountered an error during execution
Status status;
for (const auto& state : compact_->sub_compact_states) {

@ -2649,6 +2649,110 @@ TEST_F(DBTest2, PinnableSliceAndMmapReads) {
#endif
}
TEST_F(DBTest2, IteratorPinnedMemory) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.no_block_cache = false;
bbto.cache_index_and_filter_blocks = false;
bbto.block_cache = NewLRUCache(100000);
bbto.block_size = 400; // small block size
options.table_factory.reset(new BlockBasedTableFactory(bbto));
Reopen(options);
Random rnd(301);
std::string v = RandomString(&rnd, 400);
// Since v is the size of a block, each key should take a block
// of 400+ bytes.
Put("1", v);
Put("3", v);
Put("5", v);
Put("7", v);
ASSERT_OK(Flush());
ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
// Verify that iterators don't pin more than one data block in block cache
// at each time.
{
unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
iter->SeekToFirst();
for (int i = 0; i < 4; i++) {
ASSERT_TRUE(iter->Valid());
// Block cache should contain exactly one block.
ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
iter->Next();
}
ASSERT_FALSE(iter->Valid());
iter->Seek("4");
ASSERT_TRUE(iter->Valid());
ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
iter->Seek("3");
ASSERT_TRUE(iter->Valid());
ASSERT_GT(bbto.block_cache->GetPinnedUsage(), 0);
ASSERT_LT(bbto.block_cache->GetPinnedUsage(), 800);
}
ASSERT_EQ(0, bbto.block_cache->GetPinnedUsage());
// Test compaction case
Put("2", v);
Put("5", v);
Put("6", v);
Put("8", v);
ASSERT_OK(Flush());
// Clear existing data in block cache
bbto.block_cache->SetCapacity(0);
bbto.block_cache->SetCapacity(100000);
// Verify compaction input iterators don't hold more than one data blocks at
// one time.
std::atomic<bool> finished(false);
std::atomic<int> block_newed(0);
std::atomic<int> block_destroyed(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"Block::Block:0", [&](void* /*arg*/) {
if (finished) {
return;
}
// Two iterators. At most 2 outstanding blocks.
EXPECT_GE(block_newed.load(), block_destroyed.load());
EXPECT_LE(block_newed.load(), block_destroyed.load() + 1);
block_newed.fetch_add(1);
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"Block::~Block", [&](void* /*arg*/) {
if (finished) {
return;
}
// Two iterators. At most 2 outstanding blocks.
EXPECT_GE(block_newed.load(), block_destroyed.load() + 1);
EXPECT_LE(block_newed.load(), block_destroyed.load() + 2);
block_destroyed.fetch_add(1);
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run:BeforeVerify",
[&](void* /*arg*/) { finished = true; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Two input files. Each of them has 4 data blocks.
ASSERT_EQ(8, block_newed.load());
ASSERT_EQ(8, block_destroyed.load());
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest2, TestBBTTailPrefetch) {
std::atomic<bool> called(false);
size_t expected_lower_bound = 512 * 1024;

@ -611,6 +611,8 @@ uint32_t Block::NumRestarts() const {
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}
Block::~Block() { TEST_SYNC_POINT("Block::~Block"); }
Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
size_t read_amp_bytes_per_bit, Statistics* statistics)
: contents_(std::move(contents)),
@ -619,6 +621,7 @@ Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
restart_offset_(0),
num_restarts_(0),
global_seqno_(_global_seqno) {
TEST_SYNC_POINT("Block::Block:0");
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
} else {

@ -147,7 +147,7 @@ class Block {
size_t read_amp_bytes_per_bit = 0,
Statistics* statistics = nullptr);
~Block() = default;
~Block();
size_t size() const { return size_; }
const char* data() const { return data_; }

Loading…
Cancel
Save