Fix extra prefetching when num_file_reads_for_auto_readahead is 1 in async_io (#11560)

Summary:
When num_file_reads_for_auto_readahead = 1, during seek, it would go for prefetchingextra data in second buffer along with seek data, that would lead to increase in read data and
discarded bytes.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11560

Test Plan: Added unit test

Reviewed By: anand1976

Differential Revision: D47008102

Pulled By: akankshamahajan15

fbshipit-source-id: 566c6131cb5f968d5efb81fd0ab233ff7e534ab0
oxigraph-main
akankshamahajan 2 years ago committed by Facebook GitHub Bot
parent ca50ccc71a
commit ff1cc8a63e
  1. 3
      file/file_prefetch_buffer.cc
  2. 99
      file/prefetch_test.cc
  3. 1
      unreleased_history/bug_fixes/seek_extraprefetch.md

@ -831,7 +831,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
bool is_eligible_for_prefetching = false;
if (readahead_size_ > 0 &&
(!implicit_auto_readahead_ ||
num_file_reads_ + 1 >= num_file_reads_for_auto_readahead_)) {
num_file_reads_ >= num_file_reads_for_auto_readahead_)) {
is_eligible_for_prefetching = true;
}
@ -941,6 +941,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
prev_len_ = 0;
}
if (read_len2) {
TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching");
s = ReadAsync(opts, reader, read_len2, rounddown_start2, second);
if (!s.ok()) {
DestroyAndClearIOHandle(second);

@ -1558,6 +1558,105 @@ class PrefetchTest1 : public DBTestBase,
INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool());
TEST_P(PrefetchTest1, SeekWithExtraPrefetchAsyncIO) {
const int kNumKeys = 2000;
// Set options
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), GetParam(), options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
Close();
for (size_t i = 0; i < 3; i++) {
table_options.num_file_reads_for_auto_readahead = i;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
s = TryReopen(options);
ASSERT_OK(s);
int buff_prefetch_count = 0;
int extra_prefetch_buff_cnt = 0;
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching",
[&](void*) { extra_prefetch_buff_cnt++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsyncInternal:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.async_io = true;
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
// First Seek
iter->Seek(BuildKey(
0)); // Prefetch data on seek because of seek parallelization.
ASSERT_TRUE(iter->Valid());
// Do extra prefetching in Seek only if num_file_reads_for_auto_readahead
// = 0.
ASSERT_EQ(extra_prefetch_buff_cnt, (i == 0 ? 1 : 0));
// buff_prefetch_count is 2 because of index block when
// num_file_reads_for_auto_readahead = 0.
// If num_file_reads_for_auto_readahead > 0, index block isn't prefetched.
ASSERT_EQ(buff_prefetch_count, i == 0 ? 2 : 1);
extra_prefetch_buff_cnt = 0;
buff_prefetch_count = 0;
// Reset all values of FilePrefetchBuffer on new seek.
iter->Seek(
BuildKey(22)); // Prefetch data because of seek parallelization.
ASSERT_TRUE(iter->Valid());
// Do extra prefetching in Seek only if num_file_reads_for_auto_readahead
// = 0.
ASSERT_EQ(extra_prefetch_buff_cnt, (i == 0 ? 1 : 0));
ASSERT_EQ(buff_prefetch_count, 1);
extra_prefetch_buff_cnt = 0;
buff_prefetch_count = 0;
// Reset all values of FilePrefetchBuffer on new seek.
iter->Seek(
BuildKey(33)); // Prefetch data because of seek parallelization.
ASSERT_TRUE(iter->Valid());
// Do extra prefetching in Seek only if num_file_reads_for_auto_readahead
// = 0.
ASSERT_EQ(extra_prefetch_buff_cnt, (i == 0 ? 1 : 0));
ASSERT_EQ(buff_prefetch_count, 1);
}
Close();
}
}
// This test verifies the functionality of ReadOptions.adaptive_readahead when
// reads are not sequential.
TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {

@ -0,0 +1 @@
Fix extra prefetching during seek in async_io when BlockBasedTableOptions.num_file_reads_for_auto_readahead_ = 1 leading to extra reads than required.
Loading…
Cancel
Save