diff --git a/HISTORY.md b/HISTORY.md index 0c2fd8aca..a7afd99db 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Behavior changes * Compaction output file cutting logic now considers range tombstone start keys. For example, SST partitioner now may receive ParitionRequest for range tombstone start keys. +* If the async_io ReadOption is specified for MultiGet or NewIterator on a platform that doesn't support IO uring, the option is ignored and synchronous IO is used. ### Bug Fixes * Fixed an issue for backward iteration when user defined timestamp is enabled in combination with BlobDB. diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc index 607403ccc..2dbfff79a 100644 --- a/db/arena_wrapped_db_iter.cc +++ b/db/arena_wrapped_db_iter.cc @@ -47,6 +47,9 @@ void ArenaWrappedDBIter::Init( read_options_ = read_options; allow_refresh_ = allow_refresh; memtable_range_tombstone_iter_ = nullptr; + if (!env->GetFileSystem()->use_async_io()) { + read_options_.async_io = false; + } } Status ArenaWrappedDBIter::Refresh() { diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 6bd80bc6c..ad73d292e 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -2302,9 +2302,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) { ASSERT_EQ(multiget_io_batch_size.count, 3); } #else // ROCKSDB_IOURING_PRESENT - if (GetParam()) { - ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); - } + ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0); #endif // ROCKSDB_IOURING_PRESENT } @@ -2338,16 +2336,18 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) { ASSERT_EQ(values[1], "val_l1_" + std::to_string(54)); ASSERT_EQ(values[2], "val_l1_" + std::to_string(102)); -#ifdef ROCKSDB_IOURING_PRESENT HistogramData multiget_io_batch_size; statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); +#ifdef ROCKSDB_IOURING_PRESENT // A batch of 3 async IOs is expected, one for each overlapping file in L1 
ASSERT_EQ(multiget_io_batch_size.count, 1); ASSERT_EQ(multiget_io_batch_size.max, 3); -#endif // ROCKSDB_IOURING_PRESENT ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); +#else // ROCKSDB_IOURING_PRESENT + ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0); +#endif // ROCKSDB_IOURING_PRESENT } #ifdef ROCKSDB_IOURING_PRESENT @@ -2531,8 +2531,12 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) { ASSERT_EQ(values[0], "val_l2_" + std::to_string(19)); ASSERT_EQ(values[1], "val_l2_" + std::to_string(26)); +#ifdef ROCKSDB_IOURING_PRESENT // Bloom filters in L0/L1 will avoid the coroutine calls in those levels ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2); +#else // ROCKSDB_IOURING_PRESENT + ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0); +#endif // ROCKSDB_IOURING_PRESENT } #ifdef ROCKSDB_IOURING_PRESENT @@ -2623,18 +2627,17 @@ TEST_P(DBMultiGetAsyncIOTest, GetNoIOUring) { dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data()); ASSERT_EQ(values.size(), 3); - ASSERT_EQ(statuses[0], Status::NotSupported()); - ASSERT_EQ(statuses[1], Status::NotSupported()); - ASSERT_EQ(statuses[2], Status::NotSupported()); + ASSERT_EQ(statuses[0], Status::OK()); + ASSERT_EQ(statuses[1], Status::OK()); + ASSERT_EQ(statuses[2], Status::OK()); - HistogramData multiget_io_batch_size; + HistogramData async_read_bytes; - statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); + statistics()->histogramData(ASYNC_READ_BYTES, &async_read_bytes); // A batch of 3 async IOs is expected, one for each overlapping file in L1 - ASSERT_EQ(multiget_io_batch_size.count, 1); - ASSERT_EQ(multiget_io_batch_size.max, 3); - ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 0); } 
INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest, diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 7c9afea44..eddade837 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -238,6 +238,9 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, if (sv_) { RebuildIterators(false); } + if (!cfd_->ioptions()->env->GetFileSystem()->use_async_io()) { + read_options_.async_io = false; + } // immutable_status_ is a local aggregation of the // status of the immutable Iterators. diff --git a/db/forward_iterator.h b/db/forward_iterator.h index 748f7a1f7..cb418aeeb 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -122,7 +122,7 @@ class ForwardIterator : public InternalIterator { void DeleteIterator(InternalIterator* iter, bool is_arena = false); DBImpl* const db_; - const ReadOptions read_options_; + ReadOptions read_options_; ColumnFamilyData* const cfd_; const SliceTransform* const prefix_extractor_; const Comparator* user_comparator_; diff --git a/db/version_set.cc b/db/version_set.cc index acccf54d5..9075a58ac 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2121,7 +2121,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, max_file_size_for_l0_meta_pin_( MaxFileSizeForL0MetaPin(mutable_cf_options_)), version_number_(version_number), - io_tracer_(io_tracer) {} + io_tracer_(io_tracer), + use_async_io_(env_->GetFileSystem()->use_async_io()) {} Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, const Slice& blob_index_slice, @@ -2505,7 +2506,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end()); #if USE_COROUTINES if (read_options.async_io && read_options.optimize_multiget_for_io && - using_coroutines()) { + using_coroutines() && use_async_io_) { s = MultiGetAsync(read_options, range, &blob_ctxs); } else #endif // 
USE_COROUTINES @@ -2531,7 +2532,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, // Avoid using the coroutine version if we're looking in a L0 file, since // L0 files won't be parallelized anyway. The regular synchronous version // is faster. - if (!read_options.async_io || !using_coroutines() || + if (!read_options.async_io || !using_coroutines() || !use_async_io_ || fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) { if (f) { bool skip_filters = diff --git a/db/version_set.h b/db/version_set.h index 9c428526c..ef7e69fc7 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1075,6 +1075,7 @@ class Version { // used for debugging and logging purposes only. uint64_t version_number_; std::shared_ptr io_tracer_; + bool use_async_io_; Version(ColumnFamilyData* cfd, VersionSet* vset, const FileOptions& file_opt, MutableCFOptions mutable_cf_options, diff --git a/env/fs_posix.cc b/env/fs_posix.cc index 18f774f16..c2e8cfaf0 100644 --- a/env/fs_posix.cc +++ b/env/fs_posix.cc @@ -1183,6 +1183,14 @@ class PosixFileSystem : public FileSystem { #endif } + bool use_async_io() override { +#if defined(ROCKSDB_IOURING_PRESENT) + return IsIOUringEnabled(); +#else + return false; +#endif + } + #if defined(ROCKSDB_IOURING_PRESENT) // io_uring instance std::unique_ptr thread_local_io_urings_; diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 96abe598b..488e037ff 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -13,6 +13,11 @@ #endif #include "util/random.h" +namespace { +static bool enable_io_uring = true; +extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; } +} // namespace + namespace ROCKSDB_NAMESPACE { class MockFS; @@ -1179,6 +1184,104 @@ TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) { Close(); } +TEST_P(PrefetchTest, DBIterAsyncIONoIOUring) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + + const int 
kNumKeys = 1000; + // Set options + bool use_direct_io = std::get<0>(GetParam()); + bool is_adaptive_readahead = std::get<1>(GetParam()); + + Options options; + SetGenericOptions(Env::Default(), use_direct_io, options); + options.statistics = CreateDBStatistics(); + BlockBasedTableOptions table_options; + SetBlockBasedTableOptions(table_options); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + enable_io_uring = false; + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + int total_keys = 0; + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + + // Test - Iterate over the keys sequentially. + { + ReadOptions ro; + if (is_adaptive_readahead) { + ro.adaptive_readahead = true; + } + ro.async_io = true; + + ASSERT_OK(options.statistics->Reset()); + + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } + ASSERT_EQ(num_keys, total_keys); + + // Check stats to make sure async prefetch is done. 
+ { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(options.statistics->getTickerCount(READ_ASYNC_MICROS), 0); + } + } + + { + ReadOptions ro; + if (is_adaptive_readahead) { + ro.adaptive_readahead = true; + } + ro.async_io = true; + ro.tailing = true; + + ASSERT_OK(options.statistics->Reset()); + + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } + ASSERT_EQ(num_keys, total_keys); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(options.statistics->getTickerCount(READ_ASYNC_MICROS), 0); + } + } + Close(); + + enable_io_uring = true; +} + class PrefetchTest1 : public DBTestBase, public ::testing::WithParamInterface { public: @@ -1527,8 +1630,6 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) { Close(); } -extern "C" bool RocksDbIOUringEnable() { return true; } - namespace { #ifdef GFLAGS const int kMaxArgCount = 100; @@ -1647,7 +1748,8 @@ TEST_P(PrefetchTest, ReadAsyncWithPosixFS) { } else { // Not all platforms support iouring. In that case, ReadAsync in posix // won't submit async requests. - ASSERT_EQ(iter->status(), Status::NotSupported()); + ASSERT_EQ(num_keys, total_keys); + ASSERT_EQ(buff_prefetch_count, 0); } } @@ -1760,18 +1862,19 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) { iter->Next(); } + ASSERT_OK(iter->status()); + ASSERT_EQ(num_keys, num_keys_first_batch); + // Check stats to make sure async prefetch is done. 
+ HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); if (read_async_called) { - ASSERT_OK(iter->status()); - ASSERT_EQ(num_keys, num_keys_first_batch); - // Check stats to make sure async prefetch is done. - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); ASSERT_GT(async_read_bytes.count, 0); ASSERT_GT(get_perf_context()->number_async_seek, 0); } else { // Not all platforms support iouring. In that case, ReadAsync in posix // won't submit async requests. - ASSERT_EQ(iter->status(), Status::NotSupported()); + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); } } @@ -1788,25 +1891,26 @@ TEST_P(PrefetchTest, MultipleSeekWithPosixFS) { iter->Next(); } - if (read_async_called) { - ASSERT_OK(iter->status()); - ASSERT_EQ(num_keys, num_keys_second_batch); + ASSERT_OK(iter->status()); + ASSERT_EQ(num_keys, num_keys_second_batch); + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + HistogramData prefetched_bytes_discarded; + options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, + &prefetched_bytes_discarded); + ASSERT_GT(prefetched_bytes_discarded.count, 0); + if (read_async_called) { ASSERT_GT(buff_prefetch_count, 0); // Check stats to make sure async prefetch is done. - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); - HistogramData prefetched_bytes_discarded; - options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, - &prefetched_bytes_discarded); ASSERT_GT(async_read_bytes.count, 0); ASSERT_GT(get_perf_context()->number_async_seek, 0); - ASSERT_GT(prefetched_bytes_discarded.count, 0); } else { // Not all platforms support iouring. In that case, ReadAsync in posix // won't submit async requests. 
- ASSERT_EQ(iter->status(), Status::NotSupported()); + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); } } } @@ -1885,51 +1989,44 @@ TEST_P(PrefetchTest, SeekParallelizationTestWithPosix) { // Each block contains around 4 keys. auto iter = std::unique_ptr(db_->NewIterator(ro)); iter->Seek(BuildKey(0)); // Prefetch data because of seek parallelization. - if (std::get<1>(GetParam()) && !read_async_called) { - ASSERT_EQ(iter->status(), Status::NotSupported()); - } else { - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); - // New data block. Since num_file_reads in FilePrefetch after this read is - // 2, it won't go for prefetching. - iter->Next(); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); - iter->Next(); - ASSERT_TRUE(iter->Valid()); + // New data block. Since num_file_reads in FilePrefetch after this read is + // 2, it won't go for prefetching. + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); - // Prefetch data. - iter->Next(); + // Prefetch data. + iter->Next(); - if (read_async_called) { - ASSERT_TRUE(iter->Valid()); - // Check stats to make sure async prefetch is done. 
- { - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, - &async_read_bytes); - ASSERT_GT(async_read_bytes.count, 0); - ASSERT_GT(get_perf_context()->number_async_seek, 0); - if (std::get<1>(GetParam())) { - ASSERT_EQ(buff_prefetch_count, 1); - } else { - ASSERT_EQ(buff_prefetch_count, 2); - } - } + ASSERT_TRUE(iter->Valid()); + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + if (std::get<1>(GetParam())) { + ASSERT_EQ(buff_prefetch_count, 1); } else { - // Not all platforms support iouring. In that case, ReadAsync in posix - // won't submit async requests. - ASSERT_EQ(iter->status(), Status::NotSupported()); + ASSERT_EQ(buff_prefetch_count, 2); } + } else { + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); } } Close(); @@ -2023,17 +2120,17 @@ TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) { ASSERT_OK(db_->EndIOTrace()); ASSERT_OK(env_->FileExists(trace_file_path)); + ASSERT_EQ(num_keys, total_keys); + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); if (read_async_called) { - ASSERT_EQ(num_keys, total_keys); ASSERT_GT(buff_prefetch_count, 0); // Check stats to make sure async prefetch is done. - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); ASSERT_GT(async_read_bytes.count, 0); } else { // Not all platforms support iouring. In that case, ReadAsync in posix // won't submit async requests. - ASSERT_EQ(iter->status(), Status::NotSupported()); + ASSERT_EQ(async_read_bytes.count, 0); } // Check the file to see if ReadAsync is logged. 
diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 77f9e2dfa..97b21e286 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -682,6 +682,10 @@ class FileSystem : public Customizable { return IOStatus::OK(); } + // Indicates to upper layers whether the FileSystem supports/uses async IO + // or not + virtual bool use_async_io() { return true; } + // If you're adding methods here, remember to add them to EnvWrapper too. private: @@ -1522,6 +1526,8 @@ class FileSystemWrapper : public FileSystem { return target_->AbortIO(io_handles); } + virtual bool use_async_io() override { return target_->use_async_io(); } + protected: std::shared_ptr<FileSystem> target_; }; diff --git a/port/win/env_win.h b/port/win/env_win.h index 8cd8699b5..cf04ec2fe 100644 --- a/port/win/env_win.h +++ b/port/win/env_win.h @@ -227,6 +227,7 @@ class WinFileSystem : public FileSystem { const FileOptions& file_options) const override; FileOptions OptimizeForManifestWrite( const FileOptions& file_options) const override; + bool use_async_io() override { return false; } protected: static uint64_t FileTimeToUnixTime(const FILETIME& ftTime);