diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 4ac0d0504..f7d4c9591 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -109,6 +109,7 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t read_len, uint64_t rounddown_start, uint32_t index) { + TEST_SYNC_POINT("FilePrefetchBuffer::ReadAsync"); // callback for async read request. auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this, std::placeholders::_1, std::placeholders::_2); diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 11cb841b9..23e7454ed 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -79,6 +79,27 @@ class PrefetchTest public ::testing::WithParamInterface> { public: PrefetchTest() : DBTestBase("prefetch_test", true) {} + + void SetGenericOptions(Env* env, bool use_direct_io, Options& options) { + options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env; + options.disable_auto_compactions = true; + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + } + + void SetBlockBasedTableOptions(BlockBasedTableOptions& table_options) { + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + } }; INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest, @@ -89,28 +110,23 @@ std::string BuildKey(int num, std::string postfix = "") { return "my_key_" + std::to_string(num) + postfix; } +// This test verifies the basic functionality of prefetching. TEST_P(PrefetchTest, Basic) { // First param is if the mockFS support_prefetch or not bool support_prefetch = std::get<0>(GetParam()) && test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); + std::shared_ptr fs = + std::make_shared(env_->GetFileSystem(), support_prefetch); // Second param is if directIO is enabled or not bool use_direct_io = std::get<1>(GetParam()); - const int kNumKeys = 1100; - std::shared_ptr fs = - std::make_shared(env_->GetFileSystem(), support_prefetch); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } + Options options; + SetGenericOptions(env.get(), use_direct_io, options); + const int kNumKeys = 1100; int buff_prefetch_count = 0; SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", [&](void*) { buff_prefetch_count++; }); @@ -192,35 +208,24 @@ TEST_P(PrefetchTest, Basic) { } #ifndef ROCKSDB_LITE +// This test verifies BlockBasedTableOptions.max_auto_readahead_size is +// configured dynamically. TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) { // First param is if the mockFS support_prefetch or not bool support_prefetch = std::get<0>(GetParam()) && test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); + std::shared_ptr fs = + std::make_shared(env_->GetFileSystem(), support_prefetch); // Second param is if directIO is enabled or not bool use_direct_io = std::get<1>(GetParam()); - std::shared_ptr fs = - std::make_shared(env_->GetFileSystem(), support_prefetch); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - options.disable_auto_compactions = true; - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } + Options options; + SetGenericOptions(env.get(), use_direct_io, options); BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); table_options.max_auto_readahead_size = 0; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); @@ -329,6 +334,8 @@ TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) { Close(); } +// This test verifies BlockBasedTableOptions.initial_auto_readahead_size is +// configured dynamically. TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) { // First param is if the mockFS support_prefetch or not bool support_prefetch = @@ -341,23 +348,10 @@ TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) { std::shared_ptr fs = std::make_shared(env_->GetFileSystem(), support_prefetch); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - options.disable_auto_compactions = true; - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } + Options options; + SetGenericOptions(env.get(), use_direct_io, options); BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); table_options.initial_auto_readahead_size = 0; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); @@ -468,6 +462,8 @@ TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) { Close(); } +// This test verifies BlockBasedTableOptions.num_file_reads_for_auto_readahead +// is configured dynamically. TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) { // First param is if the mockFS support_prefetch or not bool support_prefetch = @@ -482,26 +478,13 @@ TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) { // Second param is if directIO is enabled or not bool use_direct_io = std::get<1>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - + Options options; + SetGenericOptions(env.get(), use_direct_io, options); BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); table_options.num_file_reads_for_auto_readahead = 0; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } - int buff_prefetch_count = 0; SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", [&](void*) { buff_prefetch_count++; }); @@ -578,6 +561,13 @@ TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) { } #endif // !ROCKSDB_LITE +// This test verifies the basic functionality of implicit autoreadahead: +// - Enable implicit autoreadahead and prefetch only if sequential blocks are +// read, +// - If data is already in buffer and few blocks are not requested to read, +// don't reset, +// - If data blocks are sequential during read after enabling implicit +// autoreadahead, reset readahead parameters. TEST_P(PrefetchTest, PrefetchWhenReseek) { // First param is if the mockFS support_prefetch or not bool support_prefetch = @@ -592,25 +582,12 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) { // Second param is if directIO is enabled or not bool use_direct_io = std::get<1>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - + Options options; + SetGenericOptions(env.get(), use_direct_io, options); BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } - int buff_prefetch_count = 0; SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", [&](void*) { buff_prefetch_count++; }); @@ -846,6 +823,12 @@ TEST_P(PrefetchTest, PrefetchWhenReseek) { Close(); } +// This test verifies the functionality of implicit autoreadahead when caching +// is enabled: +// - If data is already in buffer and few blocks are not requested to read, +// don't reset, +// - If block was eligible for prefetching/in buffer but found in cache, don't +// prefetch and reset. TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) { // First param is if the mockFS support_prefetch or not bool support_prefetch = @@ -860,26 +843,15 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) { // Second param is if directIO is enabled or not bool use_direct_io = std::get<1>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - + Options options; + SetGenericOptions(env.get(), use_direct_io, options); BlockBasedTableOptions table_options; + SetBlockBasedTableOptions(table_options); std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB table_options.block_cache = cache; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + table_options.no_block_cache = false; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } - int buff_prefetch_count = 0; SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", [&](void*) { buff_prefetch_count++; }); @@ -978,6 +950,7 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) { } #ifndef ROCKSDB_LITE +// This test verifies the functionality of ReadOptions.adaptive_readahead. TEST_P(PrefetchTest, DBIterLevelReadAhead) { const int kNumKeys = 1000; // Set options @@ -988,23 +961,11 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) { bool use_direct_io = std::get<0>(GetParam()); bool is_adaptive_readahead = std::get<1>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; + Options options; + SetGenericOptions(env.get(), use_direct_io, options); options.statistics = CreateDBStatistics(); - options.env = env.get(); - - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); @@ -1028,7 +989,6 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) { } MoveFilesToLevel(2); int buff_prefetch_count = 0; - int buff_async_prefetch_count = 0; int readahead_carry_over_count = 0; int num_sst_files = NumTableFilesAtLevel(2); size_t current_readahead_size = 0; @@ -1039,6 +999,101 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) { "FilePrefetchBuffer::Prefetch:Start", [&](void*) { buff_prefetch_count++; }); + // The callback checks, since reads are sequential, readahead_size doesn't + // start from 8KB when iterator moves to next file and its called + // num_sst_files-1 times (excluding for first file). + SyncPoint::GetInstance()->SetCallBack( + "BlockPrefetcher::SetReadaheadState", [&](void* arg) { + readahead_carry_over_count++; + size_t readahead_size = *reinterpret_cast(arg); + if (readahead_carry_over_count) { + ASSERT_GT(readahead_size, 8 * 1024); + } + }); + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) { + current_readahead_size = *reinterpret_cast(arg); + ASSERT_GT(current_readahead_size, 0); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + ReadOptions ro; + if (is_adaptive_readahead) { + ro.adaptive_readahead = true; + } + + ASSERT_OK(options.statistics->Reset()); + + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } + ASSERT_EQ(num_keys, total_keys); + + // For index and data blocks. + if (is_adaptive_readahead) { + ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1)); + } else { + ASSERT_GT(buff_prefetch_count, 0); + ASSERT_EQ(readahead_carry_over_count, 0); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + } + Close(); +} + +// This test verifies the functionality of ReadOptions.adaptive_readahead when +// async_io is enabled. +TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) { + const int kNumKeys = 1000; + // Set options + std::shared_ptr fs = + std::make_shared(env_->GetFileSystem(), false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = std::get<0>(GetParam()); + bool is_adaptive_readahead = std::get<1>(GetParam()); + + Options options; + SetGenericOptions(env.get(), use_direct_io, options); + options.statistics = CreateDBStatistics(); + BlockBasedTableOptions table_options; + SetBlockBasedTableOptions(table_options); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + int total_keys = 0; + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + int buff_async_prefetch_count = 0; + int readahead_carry_over_count = 0; + int num_sst_files = NumTableFilesAtLevel(2); + size_t current_readahead_size = 0; + + // Test - Iterate over the keys sequentially. + { SyncPoint::GetInstance()->SetCallBack( "FilePrefetchBuffer::PrefetchAsyncInternal:Start", [&](void*) { buff_async_prefetch_count++; }); @@ -1066,8 +1121,8 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) { ReadOptions ro; if (is_adaptive_readahead) { ro.adaptive_readahead = true; - ro.async_io = true; } + ro.async_io = true; ASSERT_OK(options.statistics->Reset()); @@ -1082,11 +1137,10 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) { // For index and data blocks. if (is_adaptive_readahead) { ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1)); - ASSERT_GT(buff_async_prefetch_count, 0); } else { - ASSERT_GT(buff_prefetch_count, 0); ASSERT_EQ(readahead_carry_over_count, 0); } + ASSERT_GT(buff_async_prefetch_count, 0); // Check stats to make sure async prefetch is done. { @@ -1110,11 +1164,34 @@ class PrefetchTest1 : public DBTestBase, public ::testing::WithParamInterface { public: PrefetchTest1() : DBTestBase("prefetch_test1", true) {} + + void SetGenericOptions(Env* env, bool use_direct_io, Options& options) { + options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env; + options.disable_auto_compactions = true; + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + } + + void SetBlockBasedTableOptions(BlockBasedTableOptions& table_options) { + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + } }; INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool()); #ifndef ROCKSDB_LITE +// This test verifies the functionality of ReadOptions.adaptive_readahead when +// reads are not sequential. TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) { const int kNumKeys = 1000; // Set options @@ -1122,21 +1199,10 @@ TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) { std::make_shared(env_->GetFileSystem(), false); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - if (GetParam()) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } + Options options; + SetGenericOptions(env.get(), GetParam(), options); BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); @@ -1208,6 +1274,16 @@ TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) { } #endif //! ROCKSDB_LITE +// This test verifies the functionality of adaptive_readaheadsize with cache and +// if block is found in cache, decrease the readahead_size if +// - its enabled internally by RocksDB (implicit_auto_readahead_) and, +// - readahead_size is greater than 0 and, +// - the block would have called prefetch API if not found in cache for +// which conditions are: +// - few/no bytes are in buffer and, +// - block is sequential with the previous read and, +// - num_file_reads_ + 1 (including this read) > +// num_file_reads_for_auto_readahead_ TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) { const int kNumKeys = 2000; // Set options @@ -1215,24 +1291,14 @@ TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) { std::make_shared(env_->GetFileSystem(), false); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - if (GetParam()) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } - + Options options; + SetGenericOptions(env.get(), GetParam(), options); options.statistics = CreateDBStatistics(); BlockBasedTableOptions table_options; + SetBlockBasedTableOptions(table_options); std::shared_ptr cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB table_options.block_cache = cache; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + table_options.no_block_cache = false; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); @@ -1352,6 +1418,8 @@ TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) { Close(); } +// This test verifies the basic functionality of seek parallelization for +// async_io. TEST_P(PrefetchTest1, SeekParallelizationTest) { const int kNumKeys = 2000; // Set options @@ -1359,23 +1427,11 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) { std::make_shared(env_->GetFileSystem(), false); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - if (GetParam()) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } - + Options options; + SetGenericOptions(env.get(), GetParam(), options); options.statistics = CreateDBStatistics(); BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); @@ -1485,7 +1541,8 @@ void RunIOTracerParserTool(std::string trace_file) { #endif // ROCKSDB_LITE } // namespace -// Tests the default implementation of ReadAsync API with PosixFileSystem. +// Tests the default implementation of ReadAsync API with PosixFileSystem during +// prefetching. TEST_P(PrefetchTest, ReadAsyncWithPosixFS) { if (mem_env_ || encrypted_env_) { ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); @@ -1498,22 +1555,11 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) { std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); bool use_direct_io = std::get<0>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); + Options options; + SetGenericOptions(env.get(), use_direct_io, options); options.statistics = CreateDBStatistics(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); @@ -1600,6 +1646,8 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) { Close(); } +// This test verifies implementation of seek parallelization with +// PosixFileSystem during prefetching. TEST_P(PrefetchTest, MultipleSeekWithPosixFS) { if (mem_env_ || encrypted_env_) { ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); @@ -1612,22 +1660,11 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) { std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); bool use_direct_io = std::get<0>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); + Options options; + SetGenericOptions(env.get(), use_direct_io, options); options.statistics = CreateDBStatistics(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); @@ -1774,7 +1811,9 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) { Close(); } -TEST_P(PrefetchTest, SeekParallelizationTest1) { +// This test verifies implementation of seek parallelization with +// PosixFileSystem during prefetching. +TEST_P(PrefetchTest, SeekParallelizationTestWithPosix) { if (mem_env_ || encrypted_env_) { ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); return; @@ -1786,23 +1825,11 @@ TEST_P(PrefetchTest, SeekParallelizationTest1) { std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); bool use_direct_io = std::get<0>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } - + Options options; + SetGenericOptions(env.get(), use_direct_io, options); options.statistics = CreateDBStatistics(); BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); @@ -1904,6 +1931,7 @@ TEST_P(PrefetchTest, SeekParallelizationTest1) { #ifndef ROCKSDB_LITE #ifdef GFLAGS +// This test verifies io_tracing with PosixFileSystem during prefetching. TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) { if (mem_env_ || encrypted_env_) { ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); @@ -1916,22 +1944,11 @@ TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) { std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); bool use_direct_io = std::get<0>(GetParam()); - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); + Options options; + SetGenericOptions(env.get(), use_direct_io, options); options.statistics = CreateDBStatistics(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + SetBlockBasedTableOptions(table_options); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); @@ -2036,6 +2053,7 @@ class FilePrefetchBufferTest : public testing::Test { fs_ = FileSystem::Default(); test_dir_ = test::PerThreadDBPath("file_prefetch_buffer_test"); ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr)); + stats_ = CreateDBStatistics(); } void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); } @@ -2052,8 +2070,9 @@ class FilePrefetchBufferTest : public testing::Test { std::string fpath = Path(fname); std::unique_ptr f; ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr)); - reader->reset(new RandomAccessFileReader(std::move(f), fpath, - env_->GetSystemClock().get())); + reader->reset(new RandomAccessFileReader( + std::move(f), fpath, env_->GetSystemClock().get(), + /*io_tracer=*/nullptr, stats_.get())); } void AssertResult(const std::string& content, @@ -2066,11 +2085,13 @@ class FilePrefetchBufferTest : public testing::Test { } FileSystem* fs() { return fs_.get(); } + Statistics* stats() { return stats_.get(); } private: Env* env_; std::shared_ptr fs_; std::string test_dir_; + std::shared_ptr stats_; std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; } }; @@ -2099,6 +2120,53 @@ TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) { ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), 8192, 8192, &result, &s, Env::IOPriority::IO_LOW)); } + +TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) { + std::string fname = "seek-with-block-cache-hit"; + Random rand(0); + std::string content = rand.RandomString(32768); + Write(fname, content); + + FileOptions opts; + std::unique_ptr r; + Read(fname, opts, &r); + + FilePrefetchBuffer fpb( + /*readahead_size=*/8192, /*max_readahead_size=*/16384, /*enable=*/true, + /*track_min_offset=*/false, /*implicit_auto_readahead=*/false, + /*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0, fs()); + + int read_async_called = 0; + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::ReadAsync", + [&](void* /*arg*/) { read_async_called++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Slice async_result; + // Simulate a seek of 4000 bytes at offset 3000. Due to the readahead + // settings, it will do two reads of 4000+4096 and 4096 + Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 3000, 4000, &async_result); + // Platforms that don't have IO uring may not support async IO + ASSERT_TRUE(s.IsTryAgain() || s.IsNotSupported()); + + ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), /*offset=*/3000, + /*length=*/4000, &async_result, &s, + Env::IOPriority::IO_LOW)); + // No sync call should be made. + HistogramData sst_read_micros; + stats()->histogramData(SST_READ_MICROS, &sst_read_micros); + ASSERT_EQ(sst_read_micros.count, 0); + + // Number of async calls should be. + ASSERT_EQ(read_async_called, 2); + // Length should be 4000. + ASSERT_EQ(async_result.size(), 4000); + // Data correctness. + Slice result(content.c_str() + 3000, 4000); + ASSERT_EQ(result.size(), 4000); + ASSERT_EQ(result, async_result); +} + #endif // ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE