Enable async prefetching for ReadOptions.readahead_size (#9827)

Summary:
Currently, async prefetching is enabled for implicit internal auto-readahead in FilePrefetchBuffer if `ReadOptions.async_io` is set. This PR also enables async prefetching for an explicitly set `ReadOptions.readahead_size` when `ReadOptions.async_io` is set to true.
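
For reference, here is a minimal sketch of how a caller opts into this behavior from the public API. The DB handle and helper name are illustrative only; the 16KB value matches the `readahead_size` used in the updated ReadAsyncWithPosixFS test below.

```cpp
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Sketch only: scan with an explicit readahead_size and async_io set, which
// (with this PR) routes the explicit readahead through async prefetching.
void ScanWithAsyncReadahead(rocksdb::DB* db) {
  rocksdb::ReadOptions ro;
  ro.readahead_size = 16 * 1024;  // explicit readahead
  ro.async_io = true;             // enables async prefetching for it
  std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(ro));
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    // consume iter->key() / iter->value()
  }
}
```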

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9827

Test Plan: Update unit test

Reviewed By: anand1976

Differential Revision: D35552129

Pulled By: akankshamahajan15

fbshipit-source-id: d9f9a96672852a591375a21eef15355cf3289f5c
Author: Akanksha Mahajan (committed by Facebook GitHub Bot)
Parent: b7db7eae26
Commit: 63e68a4e77
Changed files:
  1. HISTORY.md (1 changed line)
  2. file/file_prefetch_buffer.cc (9 changed lines)
  3. file/prefetch_test.cc (54 changed lines)
  4. table/block_based/partitioned_index_reader.cc (1 changed line)

@@ -15,6 +15,7 @@
* For db_bench when --benchmark lists multiple tests and each test uses a seed for a RNG then the seeds across tests will no longer be repeated.
* Added an option to dynamically charge an updating estimated memory usage of block-based table reader to block cache if block cache available. To enable this feature, set `BlockBasedTableOptions::reserve_table_reader_memory = true`.
* Add new stat ASYNC_READ_BYTES that tracks the number of bytes read during async read calls; users can check it to see whether the async code path is being used by RocksDB's internal automatic prefetching for sequential reads.
* Enable async prefetching if ReadOptions.readahead_size is set along with ReadOptions.async_io in FilePrefetchBuffer.
### Behavior changes
* Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794).
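
Relatedly, the ASYNC_READ_BYTES entry above notes that users can check whether the async code path was taken. A hedged sketch of that check is below; attaching a Statistics object to the DB options is an assumption, and only the ticker name comes from the entry above.

```cpp
#include <memory>

#include "rocksdb/statistics.h"

// Sketch only: returns true if any bytes were read through the async path,
// per the ASYNC_READ_BYTES ticker.
bool UsedAsyncReads(const std::shared_ptr<rocksdb::Statistics>& stats) {
  return stats != nullptr &&
         stats->getTickerCount(rocksdb::ASYNC_READ_BYTES) > 0;
}
```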

@@ -241,9 +241,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
del_fn_ = nullptr;
}
// TODO akanksha: Update TEST_SYNC_POINT after Async APIs are merged with
// normal prefetching.
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:Start");
Status s;
size_t prefetch_size = length + readahead_size;
@@ -475,7 +473,10 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
return false;
}
}
if (implicit_auto_readahead_ && async_io_) {
// Async prefetching is enabled if implicit_auto_readahead_ is set or an
// explicit readahead_size_ is passed along with ReadOptions.async_io =
// true.
if (async_io_) {
// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously.
s = PrefetchAsync(opts, reader, offset, n, readahead_size_ / 2,
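
To make the split described in the comment above concrete: for a read of n bytes, n + readahead_size_/2 is fetched synchronously and the remaining readahead_size_/2 is prefetched asynchronously. A rough sketch of that arithmetic with hypothetical names (not the actual FilePrefetchBuffer code):

```cpp
#include <cstddef>

// Sketch only: how the readahead budget is split between the synchronous
// read and the asynchronous prefetch, per the comment above.
struct PrefetchSplit {
  size_t sync_bytes;   // read now: the requested n plus half the readahead
  size_t async_bytes;  // submitted asynchronously for upcoming reads
};

inline PrefetchSplit SplitPrefetch(size_t n, size_t readahead_size) {
  return {n + readahead_size / 2, readahead_size / 2};
}
```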

@@ -730,6 +730,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
}
MoveFilesToLevel(2);
int buff_prefetch_count = 0;
int buff_async_prefetch_count = 0;
int readahead_carry_over_count = 0;
int num_sst_files = NumTableFilesAtLevel(2);
size_t current_readahead_size = 0;
@@ -740,6 +741,10 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
"FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsync:Start",
[&](void*) { buff_async_prefetch_count++; });
// The callback checks that, since reads are sequential, readahead_size
// doesn't reset to 8KB when the iterator moves to the next file, and that
// it is called num_sst_files-1 times (i.e. excluding the first file).
@@ -749,7 +754,6 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
size_t readahead_size = *reinterpret_cast<size_t*>(arg);
if (readahead_carry_over_count) {
ASSERT_GT(readahead_size, 8 * 1024);
// ASSERT_GE(readahead_size, current_readahead_size);
}
});
@@ -764,7 +768,6 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
ReadOptions ro;
if (is_adaptive_readahead) {
ro.adaptive_readahead = true;
// TODO akanksha: Remove after adding new units.
ro.async_io = true;
}
@@ -776,11 +779,13 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
// For index and data blocks.
if (is_adaptive_readahead) {
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
ASSERT_GT(buff_async_prefetch_count, 0);
} else {
ASSERT_GT(buff_prefetch_count, 0);
ASSERT_EQ(readahead_carry_over_count, 0);
}
@@ -858,8 +863,9 @@ TEST_P(PrefetchTest2, NonSequentialReads) {
int set_readahead = 0;
size_t readahead_size = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsync:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"BlockPrefetcher::SetReadaheadState",
[&](void* /*arg*/) { set_readahead++; });
@@ -953,8 +959,9 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
size_t expected_current_readahead_size = 8 * 1024;
size_t decrease_readahead_size = 8 * 1024;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsync:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
current_readahead_size = *reinterpret_cast<size_t*>(arg);
@@ -1043,8 +1050,17 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
extern "C" bool RocksDbIOUringEnable() { return true; }
class PrefetchTestWithPosix : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
PrefetchTestWithPosix() : DBTestBase("prefetch_test_with_posix", true) {}
};
INSTANTIATE_TEST_CASE_P(PrefetchTestWithPosix, PrefetchTestWithPosix,
::testing::Bool());
// Tests the default implementation of ReadAsync API with PosixFileSystem.
TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
@@ -1100,19 +1116,25 @@ TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
int buff_prefetch_count = 0;
bool read_async_called = false;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
ReadOptions ro;
ro.adaptive_readahead = true;
ro.async_io = true;
if (GetParam()) {
ro.readahead_size = 16 * 1024;
}
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsync:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* /*arg*/) { read_async_called = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
SyncPoint::GetInstance()->EnableProcessing();
// Read the keys.
{
ReadOptions ro;
ro.adaptive_readahead = true;
ro.async_io = true;
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;

@@ -82,6 +82,7 @@ InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
ro.io_timeout = read_options.io_timeout;
ro.adaptive_readahead = read_options.adaptive_readahead;
ro.async_io = read_options.async_io;
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
