Move prefetching responsibility to page cache for compaction read under non-directIO use case (#11631)

Summary:
**Context/Summary**
As titled. The benefit of doing so is to explicitly call readahead() instead of relying on page cache behavior for compaction reads, since we know we will most likely need readahead as compaction reads are sequential.

**Test**
Extended the existing UT to cover compaction read case

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11631

Reviewed By: ajkr

Differential Revision: D47681437

Pulled By: hx235

fbshipit-source-id: 78792f64985c4dc44aa8f2a9c41ab3e8bbc0bc90
oxigraph-main
Hui Xiao 1 year ago committed by Facebook GitHub Bot
parent df543460d5
commit 629605d645
  1. 52
      file/prefetch_test.cc
  2. 25
      table/block_based/block_prefetcher.cc
  3. 7
      table/block_based/block_prefetcher.h
  4. 1
      unreleased_history/behavior_changes/fs_prefetch_compaction_read.md

@ -128,7 +128,14 @@ std::string BuildKey(int num, std::string postfix = "") {
return "my_key_" + std::to_string(num) + postfix; return "my_key_" + std::to_string(num) + postfix;
} }
// This test verifies the basic functionality of prefetching. // This test verifies the following basic functionalities of prefetching:
// (1) If the underlying file system supports prefetch, and directIO is not
// enabled, make sure prefetch() is called and FilePrefetchBuffer is not used.
// (2) If the underlying file system doesn't support prefetch, or directIO is
// enabled, make sure prefetch() is not called and FilePrefetchBuffer is
// used.
// (3) Measure read bytes, hit and miss of SST's tail prefetching during table
// open.
TEST_P(PrefetchTest, Basic) { TEST_P(PrefetchTest, Basic) {
// First param is if the mockFS support_prefetch or not // First param is if the mockFS support_prefetch or not
bool support_prefetch = bool support_prefetch =
@ -165,6 +172,7 @@ TEST_P(PrefetchTest, Basic) {
ASSERT_OK(batch.Put(BuildKey(i), "value for range 1 key")); ASSERT_OK(batch.Put(BuildKey(i), "value for range 1 key"));
} }
ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// create second key range // create second key range
batch.Clear(); batch.Clear();
@ -172,6 +180,7 @@ TEST_P(PrefetchTest, Basic) {
ASSERT_OK(batch.Put(BuildKey(i, "key2"), "value for range 2 key")); ASSERT_OK(batch.Put(BuildKey(i, "key2"), "value for range 2 key"));
} }
ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// delete second key range // delete second key range
batch.Clear(); batch.Clear();
@ -179,6 +188,20 @@ TEST_P(PrefetchTest, Basic) {
ASSERT_OK(batch.Delete(BuildKey(i, "key2"))); ASSERT_OK(batch.Delete(BuildKey(i, "key2")));
} }
ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(db_->Flush(FlushOptions()));
// To verify SST file tail prefetch (once per file) during flush output
// verification
if (support_prefetch && !use_direct_io) {
ASSERT_TRUE(fs->IsPrefetchCalled());
ASSERT_EQ(3, fs->GetPrefetchCount());
ASSERT_EQ(0, buff_prefetch_count);
fs->ClearPrefetchCount();
} else {
ASSERT_FALSE(fs->IsPrefetchCalled());
ASSERT_EQ(buff_prefetch_count, 3);
buff_prefetch_count = 0;
}
// compact database // compact database
std::string start_key = BuildKey(0); std::string start_key = BuildKey(0);
@ -205,25 +228,34 @@ TEST_P(PrefetchTest, Basic) {
const uint64_t cur_table_open_prefetch_tail_hit = const uint64_t cur_table_open_prefetch_tail_hit =
options.statistics->getTickerCount(TABLE_OPEN_PREFETCH_TAIL_HIT); options.statistics->getTickerCount(TABLE_OPEN_PREFETCH_TAIL_HIT);
// To verify prefetch during compaction input read
if (support_prefetch && !use_direct_io) { if (support_prefetch && !use_direct_io) {
// If the underlying file system supports prefetch, and directIO is not
// enabled, make sure prefetch() is called and FilePrefetchBuffer is not used.
ASSERT_TRUE(fs->IsPrefetchCalled()); ASSERT_TRUE(fs->IsPrefetchCalled());
fs->ClearPrefetchCount(); // To rule out false positive by the SST file tail prefetch during
// compaction output verification
ASSERT_GT(fs->GetPrefetchCount(), 1);
ASSERT_EQ(0, buff_prefetch_count); ASSERT_EQ(0, buff_prefetch_count);
fs->ClearPrefetchCount();
} else { } else {
// If the underlying file system doesn't support prefetch, or directIO is
// enabled, make sure prefetch() is not called and FilePrefetchBuffer is
// used.
ASSERT_FALSE(fs->IsPrefetchCalled()); ASSERT_FALSE(fs->IsPrefetchCalled());
ASSERT_GT(buff_prefetch_count, 0); if (use_direct_io) {
// To rule out false positive by the SST file tail prefetch during
// compaction output verification
ASSERT_GT(buff_prefetch_count, 1);
} else {
// In buffered IO, compaction readahead size is 0, leading to no prefetch
// during compaction input read
ASSERT_EQ(buff_prefetch_count, 1);
}
buff_prefetch_count = 0;
ASSERT_GT(cur_table_open_prefetch_tail_read.count, ASSERT_GT(cur_table_open_prefetch_tail_read.count,
prev_table_open_prefetch_tail_read.count); prev_table_open_prefetch_tail_read.count);
ASSERT_GT(cur_table_open_prefetch_tail_hit, ASSERT_GT(cur_table_open_prefetch_tail_hit,
prev_table_open_prefetch_tail_hit); prev_table_open_prefetch_tail_hit);
ASSERT_GE(cur_table_open_prefetch_tail_miss, ASSERT_GE(cur_table_open_prefetch_tail_miss,
prev_table_open_prefetch_tail_miss); prev_table_open_prefetch_tail_miss);
buff_prefetch_count = 0;
} }
// count the keys // count the keys
@ -236,7 +268,7 @@ TEST_P(PrefetchTest, Basic) {
(void)num_keys; (void)num_keys;
} }
// Make sure prefetch is called only if file system support prefetch. // To verify prefetch during user scan
if (support_prefetch && !use_direct_io) { if (support_prefetch && !use_direct_io) {
ASSERT_TRUE(fs->IsPrefetchCalled()); ASSERT_TRUE(fs->IsPrefetchCalled());
fs->ClearPrefetchCount(); fs->ClearPrefetchCount();

@ -17,9 +17,29 @@ void BlockPrefetcher::PrefetchIfNeeded(
const size_t readahead_size, bool is_for_compaction, const size_t readahead_size, bool is_for_compaction,
const bool no_sequential_checking, const bool no_sequential_checking,
const Env::IOPriority rate_limiter_priority) { const Env::IOPriority rate_limiter_priority) {
const size_t len = BlockBasedTable::BlockSizeWithTrailer(handle);
const size_t offset = handle.offset();
if (is_for_compaction) {
if (!rep->file->use_direct_io()) {
// If FS supports prefetching (readahead_limit_ will be non zero in that
// case) and current block exists in prefetch buffer then return.
if (offset + len <= readahead_limit_) {
return;
}
Status s = rep->file->Prefetch(offset, len + compaction_readahead_size_,
rate_limiter_priority);
if (s.ok()) {
readahead_limit_ = offset + len + compaction_readahead_size_;
return;
}
}
// If FS prefetch is not supported, fall back to use internal prefetch
// buffer. Discarding other return status of Prefetch calls intentionally,
// as we can fallback to reading from disk if Prefetch fails.
//
// num_file_reads is used by FilePrefetchBuffer only when // num_file_reads is used by FilePrefetchBuffer only when
// implicit_auto_readahead is set. // implicit_auto_readahead is set.
if (is_for_compaction) {
rep->CreateFilePrefetchBufferIfNotExists( rep->CreateFilePrefetchBufferIfNotExists(
compaction_readahead_size_, compaction_readahead_size_, compaction_readahead_size_, compaction_readahead_size_,
&prefetch_buffer_, /*implicit_auto_readahead=*/false, &prefetch_buffer_, /*implicit_auto_readahead=*/false,
@ -60,9 +80,6 @@ void BlockPrefetcher::PrefetchIfNeeded(
return; return;
} }
size_t len = BlockBasedTable::BlockSizeWithTrailer(handle);
size_t offset = handle.offset();
// If FS supports prefetching (readahead_limit_ will be non zero in that case) // If FS supports prefetching (readahead_limit_ will be non zero in that case)
// and current block exists in prefetch buffer then return. // and current block exists in prefetch buffer then return.
if (offset + len <= readahead_limit_) { if (offset + len <= readahead_limit_) {

@ -58,11 +58,12 @@ class BlockPrefetcher {
// lookup_context_.caller = kCompaction. // lookup_context_.caller = kCompaction.
size_t compaction_readahead_size_; size_t compaction_readahead_size_;
// readahead_size_ is used if underlying FS supports prefetching. // readahead_size_ is used in non-compaction read if underlying FS supports
// prefetching.
size_t readahead_size_; size_t readahead_size_;
size_t readahead_limit_ = 0; size_t readahead_limit_ = 0;
// initial_auto_readahead_size_ is used if RocksDB uses internal prefetch // initial_auto_readahead_size_ is used in non-compaction read if RocksDB uses
// buffer. // internal prefetch buffer.
uint64_t initial_auto_readahead_size_; uint64_t initial_auto_readahead_size_;
uint64_t num_file_reads_ = 0; uint64_t num_file_reads_ = 0;
uint64_t prev_offset_ = 0; uint64_t prev_offset_ = 0;

@ -0,0 +1 @@
Move prefetching responsibility to page cache for compaction read in the non-directIO use case
Loading…
Cancel
Save