diff --git a/HISTORY.md b/HISTORY.md
index 15b66aeb2..1521ee101 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -15,6 +15,7 @@
 * For db_bench when --benchmark lists multiple tests and each test uses a seed for a RNG then the seeds across tests will no longer be repeated.
 * Added an option to dynamically charge an updating estimated memory usage of block-based table reader to block cache if block cache available. To enable this feature, set `BlockBasedTableOptions::reserve_table_reader_memory = true`.
 * Add new stat ASYNC_READ_BYTES that calculates number of bytes read during async read call and users can check if async code path is being called by RocksDB internal automatic prefetching for sequential reads.
+* Enable async prefetching if ReadOptions.readahead_size is set along with ReadOptions.async_io in FilePrefetchBuffer.
 
 ### Behavior changes
 * Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794).
diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index a357221a7..6d98f3b53 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -241,9 +241,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
     del_fn_ = nullptr;
   }
 
-  // TODO akanksha: Update TEST_SYNC_POINT after Async APIs are merged with
-  // normal prefetching.
-  TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
+  TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:Start");
 
   Status s;
   size_t prefetch_size = length + readahead_size;
@@ -475,7 +473,10 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
         return false;
       }
     }
-    if (implicit_auto_readahead_ && async_io_) {
+    // Async prefetching is enabled either when implicit_auto_readahead_ is
+    // set or when an explicit readahead_size_ is passed along with
+    // ReadOptions.async_io = true.
+    if (async_io_) {
       // Prefetch n + readahead_size_/2 synchronously as remaining
       // readahead_size_/2 will be prefetched asynchronously.
       s = PrefetchAsync(opts, reader, offset, n, readahead_size_ / 2,
diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc
index b9cfa285a..579d15d6b 100644
--- a/file/prefetch_test.cc
+++ b/file/prefetch_test.cc
@@ -730,6 +730,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
   }
   MoveFilesToLevel(2);
   int buff_prefetch_count = 0;
+  int buff_async_prefetch_count = 0;
   int readahead_carry_over_count = 0;
   int num_sst_files = NumTableFilesAtLevel(2);
   size_t current_readahead_size = 0;
@@ -740,6 +741,10 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
       "FilePrefetchBuffer::Prefetch:Start",
       [&](void*) { buff_prefetch_count++; });
 
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::PrefetchAsync:Start",
+      [&](void*) { buff_async_prefetch_count++; });
+
   // The callback checks, since reads are sequential, readahead_size doesn't
   // start from 8KB when iterator moves to next file and its called
   // num_sst_files-1 times (excluding for first file).
@@ -749,7 +754,6 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
         size_t readahead_size = *reinterpret_cast<size_t*>(arg);
         if (readahead_carry_over_count) {
           ASSERT_GT(readahead_size, 8 * 1024);
-          // ASSERT_GE(readahead_size, current_readahead_size);
         }
       });
 
@@ -764,7 +768,6 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
   ReadOptions ro;
   if (is_adaptive_readahead) {
     ro.adaptive_readahead = true;
-    // TODO akanksha: Remove after adding new units.
     ro.async_io = true;
   }
 
@@ -776,11 +779,13 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
      num_keys++;
    }
    ASSERT_EQ(num_keys, total_keys);
-   ASSERT_GT(buff_prefetch_count, 0);
+
    // For index and data blocks.
    if (is_adaptive_readahead) {
      ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
+     ASSERT_GT(buff_async_prefetch_count, 0);
    } else {
+     ASSERT_GT(buff_prefetch_count, 0);
      ASSERT_EQ(readahead_carry_over_count, 0);
    }
 
@@ -858,8 +863,9 @@ TEST_P(PrefetchTest2, NonSequentialReads) {
   int set_readahead = 0;
   size_t readahead_size = 0;
 
-  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
-                                        [&](void*) { buff_prefetch_count++; });
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::PrefetchAsync:Start",
+      [&](void*) { buff_prefetch_count++; });
   SyncPoint::GetInstance()->SetCallBack(
       "BlockPrefetcher::SetReadaheadState",
       [&](void* /*arg*/) { set_readahead++; });
@@ -953,8 +959,9 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
   size_t expected_current_readahead_size = 8 * 1024;
   size_t decrease_readahead_size = 8 * 1024;
 
-  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
-                                        [&](void*) { buff_prefetch_count++; });
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::PrefetchAsync:Start",
+      [&](void*) { buff_prefetch_count++; });
   SyncPoint::GetInstance()->SetCallBack(
       "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
         current_readahead_size = *reinterpret_cast<size_t*>(arg);
@@ -1043,8 +1050,17 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
 extern "C" bool RocksDbIOUringEnable() { return true; }
 
+class PrefetchTestWithPosix : public DBTestBase,
+                              public ::testing::WithParamInterface<bool> {
+ public:
+  PrefetchTestWithPosix() : DBTestBase("prefetch_test_with_posix", true) {}
+};
+
+INSTANTIATE_TEST_CASE_P(PrefetchTestWithPosix, PrefetchTestWithPosix,
+                        ::testing::Bool());
+
 // Tests the default implementation of ReadAsync API with PosixFileSystem.
-TEST_F(PrefetchTest2, ReadAsyncWithPosixFS) {
+TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
   if (mem_env_ || encrypted_env_) {
     ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
     return;
   }
@@ -1100,19 +1116,25 @@
   int buff_prefetch_count = 0;
   bool read_async_called = false;
 
-  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
-                                        [&](void*) { buff_prefetch_count++; });
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+  ReadOptions ro;
+  ro.adaptive_readahead = true;
+  ro.async_io = true;
+
+  if (GetParam()) {
+    ro.readahead_size = 16 * 1024;
+  }
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "FilePrefetchBuffer::PrefetchAsync:Start",
+      [&](void*) { buff_prefetch_count++; });
+
+  SyncPoint::GetInstance()->SetCallBack(
       "UpdateResults::io_uring_result",
       [&](void* /*arg*/) { read_async_called = true; });
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  SyncPoint::GetInstance()->EnableProcessing();
 
   // Read the keys.
   {
-    ReadOptions ro;
-    ro.adaptive_readahead = true;
-    ro.async_io = true;
-
     ASSERT_OK(options.statistics->Reset());
     auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
     int num_keys = 0;
diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc
index 4502d0122..8e690a61c 100644
--- a/table/block_based/partitioned_index_reader.cc
+++ b/table/block_based/partitioned_index_reader.cc
@@ -82,6 +82,7 @@ InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
     ro.io_timeout = read_options.io_timeout;
     ro.adaptive_readahead = read_options.adaptive_readahead;
     ro.async_io = read_options.async_io;
+
     // We don't return pinned data from index blocks, so no need
     // to set `block_contents_pinned`.
     std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
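
The following usage sketch is not part of the patch; it illustrates the behavior described in the HISTORY.md entry above: with this change, setting an explicit ReadOptions::readahead_size together with ReadOptions::async_io is enough to route reads through FilePrefetchBuffer::PrefetchAsync, without requiring adaptive_readahead. The database path, key layout, and sizes below are illustrative assumptions, and whether any bytes are actually read asynchronously depends on the FileSystem in use (for example, io_uring support in PosixFileSystem), which can be checked via the ASYNC_READ_BYTES statistic.

// Standalone sketch (assumed setup, not taken from the patch).
#include <cassert>
#include <iostream>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Statistics let us confirm afterwards whether the async read path ran.
  options.statistics = rocksdb::CreateDBStatistics();

  rocksdb::DB* db = nullptr;
  // Hypothetical path used only for this demo.
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/async_prefetch_demo", &db);
  assert(s.ok());

  // Write and flush some sequential data so the scan below reads SST files.
  for (int i = 0; i < 10000; i++) {
    s = db->Put(rocksdb::WriteOptions(), "key" + std::to_string(i),
                std::string(1024, 'x'));
    assert(s.ok());
  }
  s = db->Flush(rocksdb::FlushOptions());
  assert(s.ok());

  rocksdb::ReadOptions ro;
  // With this change, an explicit readahead_size combined with async_io is
  // enough to trigger async prefetching; adaptive_readahead (implicit auto
  // readahead) is no longer required.
  ro.readahead_size = 16 * 1024;
  ro.async_io = true;

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  int num_keys = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    num_keys++;
  }
  assert(it->status().ok());

  // ASYNC_READ_BYTES is non-zero only when the underlying FileSystem actually
  // served reads asynchronously (e.g. io_uring on Linux).
  std::cout << "keys read: " << num_keys << ", async bytes: "
            << options.statistics->getTickerCount(rocksdb::ASYNC_READ_BYTES)
            << std::endl;

  delete db;
  return 0;
}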