diff --git a/HISTORY.md b/HISTORY.md index 8b2aaa130..2d053289e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### New Features * Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`. * Support backward iteration when `ReadOptions::iter_start_ts` is set. +* Provide support for ReadOptions.async_io with direct_io to improve Seek latency, by using async IO to parallelize child iterator seeks and to prefetch asynchronously on sequential scans. ### Public API changes * Add metadata related structs and functions in C API, including diff --git a/env/fs_posix.cc b/env/fs_posix.cc index bf204ac96..7dba5b81c 100644 --- a/env/fs_posix.cc +++ b/env/fs_posix.cc @@ -1102,15 +1102,21 @@ class PosixFileSystem : public FileSystem { req.scratch = posix_handle->scratch; req.offset = posix_handle->offset; req.len = posix_handle->len; + size_t finished_len = 0; size_t bytes_read = 0; + bool read_again = false; UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len, - true /*async_read*/, finished_len, &req, bytes_read); + true /*async_read*/, posix_handle->use_direct_io, + posix_handle->alignment, finished_len, &req, bytes_read, + read_again); posix_handle->is_finished = true; io_uring_cqe_seen(iu, cqe); posix_handle->cb(req, posix_handle->cb_arg); + (void)finished_len; (void)bytes_read; + (void)read_again; if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) { break; diff --git a/env/io_posix.cc b/env/io_posix.cc index 2e865bb75..e7dfe8fa8 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -199,23 +199,6 @@ bool IsSyncFileRangeSupported(int fd) { } // anonymous namespace -/* - * DirectIOHelper - */ -namespace { - -bool IsSectorAligned(const size_t off, size_t sector_size) { - assert((sector_size & (sector_size - 1)) == 0); - return (off & (sector_size - 1)) == 0; -} - -#ifndef NDEBUG -bool IsSectorAligned(const void* ptr, size_t sector_size) { - return uintptr_t(ptr) % sector_size == 0; -} -#endif -} // namespace - /* * PosixSequentialFile */ @@ -760,32 +743,21 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, FSReadRequest* req = req_wrap->req; size_t bytes_read = 0; + bool read_again = false; UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len, - false /*async_read*/, req_wrap->finished_len, req, - bytes_read); + false /*async_read*/, use_direct_io(), + GetRequiredBufferAlignment(), req_wrap->finished_len, req, + bytes_read, read_again); int32_t res = cqe->res; if (res >= 0) { - if (bytes_read == 0) { - /// cqe->res == 0 can means EOF, or can mean partial results. See - // comment - // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435 - // Fall back to pread in this case. - if (use_direct_io() && - !IsSectorAligned(req_wrap->finished_len, - GetRequiredBufferAlignment())) { - // Bytes reads don't fill sectors. Should only happen at the end - // of the file.
- req->result = Slice(req->scratch, req_wrap->finished_len); - req->status = IOStatus::OK(); - } else { - Slice tmp_slice; - req->status = - Read(req->offset + req_wrap->finished_len, - req->len - req_wrap->finished_len, options, &tmp_slice, - req->scratch + req_wrap->finished_len, dbg); - req->result = - Slice(req->scratch, req_wrap->finished_len + tmp_slice.size()); - } + if (bytes_read == 0 && read_again) { + Slice tmp_slice; + req->status = + Read(req->offset + req_wrap->finished_len, + req->len - req_wrap->finished_len, options, &tmp_slice, + req->scratch + req_wrap->finished_len, dbg); + req->result = + Slice(req->scratch, req_wrap->finished_len + tmp_slice.size()); } else if (bytes_read < req_wrap->iov.iov_len) { incomplete_rq_list.push_back(req_wrap); } @@ -910,19 +882,15 @@ IOStatus PosixRandomAccessFile::ReadAsync( args = nullptr; }; - Posix_IOHandle* posix_handle = new Posix_IOHandle(); - *io_handle = static_cast(posix_handle); - *del_fn = deletefn; - // Initialize Posix_IOHandle. - posix_handle->iu = iu; + Posix_IOHandle* posix_handle = + new Posix_IOHandle(iu, cb, cb_arg, req.offset, req.len, req.scratch, + use_direct_io(), GetRequiredBufferAlignment()); posix_handle->iov.iov_base = req.scratch; posix_handle->iov.iov_len = req.len; - posix_handle->cb = cb; - posix_handle->cb_arg = cb_arg; - posix_handle->offset = req.offset; - posix_handle->len = req.len; - posix_handle->scratch = req.scratch; + + *io_handle = static_cast(posix_handle); + *del_fn = deletefn; // Step 3: io_uring_sqe_set_data struct io_uring_sqe* sqe; diff --git a/env/io_posix.h b/env/io_posix.h index 9659b430c..1cad4f9bd 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -52,8 +52,37 @@ class PosixHelper { size_t* size); }; +/* + * DirectIOHelper + */ +inline bool IsSectorAligned(const size_t off, size_t sector_size) { + assert((sector_size & (sector_size - 1)) == 0); + return (off & (sector_size - 1)) == 0; +} + +#ifndef NDEBUG +inline bool IsSectorAligned(const void* ptr, size_t sector_size) { + return uintptr_t(ptr) % sector_size == 0; +} +#endif + #if defined(ROCKSDB_IOURING_PRESENT) struct Posix_IOHandle { + Posix_IOHandle(struct io_uring* _iu, + std::function _cb, + void* _cb_arg, uint64_t _offset, size_t _len, char* _scratch, + bool _use_direct_io, size_t _alignment) + : iu(_iu), + cb(_cb), + cb_arg(_cb_arg), + offset(_offset), + len(_len), + scratch(_scratch), + use_direct_io(_use_direct_io), + alignment(_alignment), + is_finished(false), + req_count(0) {} + struct iovec iov; struct io_uring* iu; std::function cb; @@ -61,15 +90,19 @@ struct Posix_IOHandle { uint64_t offset; size_t len; char* scratch; - bool is_finished = false; + bool use_direct_io; + size_t alignment; + bool is_finished; // req_count is used by AbortIO API to keep track of number of requests. - uint32_t req_count = 0; + uint32_t req_count; }; inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name, size_t len, size_t iov_len, bool async_read, + bool use_direct_io, uint32_t alignment, size_t& finished_len, FSReadRequest* req, - size_t& bytes_read) { + size_t& bytes_read, bool& read_again) { + read_again = false; if (cqe->res < 0) { req->result = Slice(req->scratch, 0); req->status = IOError("Req failed", file_name, cqe->res); @@ -80,10 +113,24 @@ inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name, req->result = Slice(req->scratch, req->len); req->status = IOStatus::OK(); } else if (bytes_read == 0) { - if (async_read) { - // No bytes read. It can means EOF. 
- req->result = Slice(req->scratch, 0); + // cqe->res == 0 can mean EOF, or can mean partial results. See + // comment + // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435 + // Fall back to pread in this case. + if (use_direct_io && !IsSectorAligned(finished_len, alignment)) { + // Bytes read don't fill sectors. Should only happen at the end + // of the file. + req->result = Slice(req->scratch, finished_len); req->status = IOStatus::OK(); + } else { + if (async_read) { + // No bytes read. It can mean EOF. In case of partial results, it is the + // caller's responsibility to call read/readasync again. + req->result = Slice(req->scratch, 0); + req->status = IOStatus::OK(); + } else { + read_again = true; + } } } else if (bytes_read < iov_len) { assert(bytes_read > 0); diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 9e97845c3..5a2607c81 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -88,7 +88,7 @@ Status FilePrefetchBuffer::Read(const IOOptions& opts, Slice result; Status s = reader->Read(opts, rounddown_start + chunk_len, read_len, &result, bufs_[index].buffer_.BufferStart() + chunk_len, - nullptr, rate_limiter_priority); + /*aligned_buf=*/nullptr, rate_limiter_priority); #ifndef NDEBUG if (result.size() < read_len) { // Fake an IO error to force db_stress fault injection to ignore @@ -108,7 +108,6 @@ Status FilePrefetchBuffer::Read(const IOOptions& opts, Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader, - Env::IOPriority rate_limiter_priority, uint64_t read_len, uint64_t chunk_len, uint64_t rounddown_start, uint32_t index) { // callback for async read request. req.len = read_len; req.offset = rounddown_start + chunk_len; req.result = result; req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len; - Status s = reader->ReadAsync(req, opts, fp, nullptr /*cb_arg*/, &io_handle_, - &del_fn_, rate_limiter_priority); + Status s = reader->ReadAsync(req, opts, fp, + /*cb_arg=*/nullptr, &io_handle_, &del_fn_, + /*aligned_buf=*/nullptr); req.status.PermitUncheckedError(); if (s.ok()) { async_read_in_progress_ = true; @@ -373,8 +373,7 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal( bufs_[second].offset_ = rounddown_start2; assert(roundup_len2 >= chunk_len2); uint64_t read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2); - ReadAsync(opts, reader, rate_limiter_priority, read_len2, chunk_len2, - rounddown_start2, second) + ReadAsync(opts, reader, read_len2, chunk_len2, rounddown_start2, second) .PermitUncheckedError(); } @@ -544,7 +543,8 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, if (req.status.ok()) { if (req.offset + req.result.size() <= bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) { - // All requested bytes are already in the buffer. So no need to update. + // All requested bytes are already in the buffer, or no data was read + // because of EOF. So no need to update.
return; } if (req.offset < bufs_[index].offset_) { @@ -560,7 +560,6 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, - Env::IOPriority rate_limiter_priority, Slice* result) { assert(reader != nullptr); if (!enable_) { @@ -630,8 +629,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, size_t read_len = static_cast(roundup_len - chunk_len); - s = ReadAsync(opts, reader, rate_limiter_priority, read_len, chunk_len, - rounddown_start, second); + s = ReadAsync(opts, reader, read_len, chunk_len, rounddown_start, second); if (!s.ok()) { return s; diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index 04303aa30..9a19b5249 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -138,16 +138,13 @@ class FilePrefetchBuffer { // reader : the file reader. // offset : the file offset to start reading from. // n : the number of bytes to read. - // rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to - // bypass. // result : if data already exists in the buffer, result will // be updated with the data. // // If data already exist in the buffer, it will return Status::OK, otherwise // it will send asynchronous request and return Status::TryAgain. Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader, - uint64_t offset, size_t n, - Env::IOPriority rate_limiter_priority, Slice* result); + uint64_t offset, size_t n, Slice* result); // Tries returning the data for a file read from this buffer if that data is // in the buffer. @@ -246,9 +243,8 @@ class FilePrefetchBuffer { uint64_t chunk_len, uint64_t rounddown_start, uint32_t index); Status ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader, - Env::IOPriority rate_limiter_priority, uint64_t read_len, - uint64_t chunk_len, uint64_t rounddown_start, - uint32_t index); + uint64_t read_len, uint64_t chunk_len, + uint64_t rounddown_start, uint32_t index); // Copy the data from src to third buffer. 
void CopyDataToBuffer(uint32_t src, uint64_t& offset, size_t& length); diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 88e74cc35..d4d996fa7 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -864,33 +864,24 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) { Close(); } -class PrefetchTest1 - : public DBTestBase, - public ::testing::WithParamInterface> { - public: - PrefetchTest1() : DBTestBase("prefetch_test1", true) {} -}; - -INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, - ::testing::Combine(::testing::Bool(), - ::testing::Bool())); - #ifndef ROCKSDB_LITE -TEST_P(PrefetchTest1, DBIterLevelReadAhead) { +TEST_P(PrefetchTest, DBIterLevelReadAhead) { const int kNumKeys = 1000; // Set options std::shared_ptr fs = std::make_shared(env_->GetFileSystem(), false); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + bool use_direct_io = std::get<0>(GetParam()); bool is_adaptive_readahead = std::get<1>(GetParam()); + Options options = CurrentOptions(); options.write_buffer_size = 1024; options.create_if_missing = true; options.compression = kNoCompression; options.statistics = CreateDBStatistics(); options.env = env.get(); - bool use_direct_io = std::get<0>(GetParam()); + if (use_direct_io) { options.use_direct_reads = true; options.use_direct_io_for_flush_and_compaction = true; @@ -987,7 +978,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { { HistogramData async_read_bytes; options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); - if (ro.async_io && !use_direct_io) { + if (ro.async_io) { ASSERT_GT(async_read_bytes.count, 0); } else { ASSERT_EQ(async_read_bytes.count, 0); @@ -1001,16 +992,16 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { } #endif //! ROCKSDB_LITE -class PrefetchTest2 : public DBTestBase, +class PrefetchTest1 : public DBTestBase, public ::testing::WithParamInterface { public: - PrefetchTest2() : DBTestBase("prefetch_test2", true) {} + PrefetchTest1() : DBTestBase("prefetch_test1", true) {} }; -INSTANTIATE_TEST_CASE_P(PrefetchTest2, PrefetchTest2, ::testing::Bool()); +INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool()); #ifndef ROCKSDB_LITE -TEST_P(PrefetchTest2, NonSequentialReadsWithAdaptiveReadahead) { +TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) { const int kNumKeys = 1000; // Set options std::shared_ptr fs = @@ -1103,7 +1094,7 @@ TEST_P(PrefetchTest2, NonSequentialReadsWithAdaptiveReadahead) { } #endif //! 
ROCKSDB_LITE -TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { +TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) { const int kNumKeys = 2000; // Set options std::shared_ptr fs = @@ -1167,7 +1158,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { SyncPoint::GetInstance()->EnableProcessing(); ReadOptions ro; ro.adaptive_readahead = true; - // ro.async_io = true; { /* * Reseek keys from sequential Data Blocks within same partitioned @@ -1248,7 +1238,7 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { Close(); } -TEST_P(PrefetchTest2, SeekParallelizationTest) { +TEST_P(PrefetchTest1, SeekParallelizationTest) { const int kNumKeys = 2000; // Set options std::shared_ptr fs = @@ -1341,12 +1331,8 @@ TEST_P(PrefetchTest2, SeekParallelizationTest) { { HistogramData async_read_bytes; options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); - if (GetParam()) { - ASSERT_EQ(async_read_bytes.count, 0); - } else { - ASSERT_GT(async_read_bytes.count, 0); - ASSERT_GT(get_perf_context()->number_async_seek, 0); - } + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); } buff_prefetch_count = 0; @@ -1356,11 +1342,7 @@ TEST_P(PrefetchTest2, SeekParallelizationTest) { extern "C" bool RocksDbIOUringEnable() { return true; } -class PrefetchTestWithPosix : public DBTestBase, - public ::testing::WithParamInterface { - public: - PrefetchTestWithPosix() : DBTestBase("prefetch_test_with_posix", true) {} - +namespace { #ifndef ROCKSDB_LITE #ifdef GFLAGS const int kMaxArgCount = 100; @@ -1387,144 +1369,95 @@ class PrefetchTestWithPosix : public DBTestBase, } #endif // GFLAGS #endif // ROCKSDB_LITE -}; - -INSTANTIATE_TEST_CASE_P(PrefetchTestWithPosix, PrefetchTestWithPosix, - ::testing::Bool()); + } // namespace // Tests the default implementation of ReadAsync API with PosixFileSystem. 
-TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) { - if (mem_env_ || encrypted_env_) { - ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); - return; - } - - const int kNumKeys = 1000; - std::shared_ptr fs = std::make_shared( - FileSystem::Default(), /*support_prefetch=*/false); - std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); - - bool use_direct_io = false; - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - options.statistics = CreateDBStatistics(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; - } - BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; - options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + TEST_P(PrefetchTest, ReadAsyncWithPosixFS) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } - Status s = TryReopen(options); - if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { - // If direct IO is not supported, skip the test - return; - } else { - ASSERT_OK(s); - } + const int kNumKeys = 1000; + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.statistics = CreateDBStatistics(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } - int total_keys = 0; - // Write the keys. - { - WriteBatch batch; - Random rnd(309); - for (int j = 0; j < 5; j++) { - for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { - ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); - total_keys++; + int total_keys = 0; + // Write the keys. 
+ { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); } - ASSERT_OK(db_->Write(WriteOptions(), &batch)); - ASSERT_OK(Flush()); + MoveFilesToLevel(2); } - MoveFilesToLevel(2); - } - - int buff_prefetch_count = 0; - bool read_async_called = false; - ReadOptions ro; - ro.adaptive_readahead = true; - ro.async_io = true; - - if (GetParam()) { - ro.readahead_size = 16 * 1024; - } - - SyncPoint::GetInstance()->SetCallBack( - "FilePrefetchBuffer::PrefetchAsyncInternal:Start", - [&](void*) { buff_prefetch_count++; }); - SyncPoint::GetInstance()->SetCallBack( - "UpdateResults::io_uring_result", - [&](void* /*arg*/) { read_async_called = true; }); - SyncPoint::GetInstance()->EnableProcessing(); - - // Read the keys. - { - ASSERT_OK(options.statistics->Reset()); - get_perf_context()->Reset(); + int buff_prefetch_count = 0; + bool read_async_called = false; + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; - auto iter = std::unique_ptr(db_->NewIterator(ro)); - int num_keys = 0; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - ASSERT_OK(iter->status()); - num_keys++; + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; } - ASSERT_EQ(num_keys, total_keys); - ASSERT_GT(buff_prefetch_count, 0); - - // Check stats to make sure async prefetch is done. - { - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); - HistogramData prefetched_bytes_discarded; - options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, - &prefetched_bytes_discarded); + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); - // Not all platforms support iouring. In that case, ReadAsync in posix - // won't submit async requests. - if (read_async_called) { - ASSERT_GT(async_read_bytes.count, 0); - } else { - ASSERT_EQ(async_read_bytes.count, 0); - } - ASSERT_GT(prefetched_bytes_discarded.count, 0); - } - ASSERT_EQ(get_perf_context()->number_async_seek, 0); - } + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); - { - // Read the keys using seek. + // Read the keys. { ASSERT_OK(options.statistics->Reset()); get_perf_context()->Reset(); auto iter = std::unique_ptr(db_->NewIterator(ro)); int num_keys = 0; - iter->Seek(BuildKey(450)); - while (iter->Valid()) { - ASSERT_OK(iter->status()); - num_keys++; - iter->Next(); - } - ASSERT_OK(iter->status()); - - iter->Seek(BuildKey(450)); - while (iter->Valid()) { + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_OK(iter->status()); num_keys++; - iter->Prev(); } - ASSERT_EQ(num_keys, total_keys + 1); + ASSERT_EQ(num_keys, total_keys); ASSERT_GT(buff_prefetch_count, 0); // Check stats to make sure async prefetch is done. @@ -1539,146 +1472,193 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) { // won't submit async requests. 
if (read_async_called) { ASSERT_GT(async_read_bytes.count, 0); - ASSERT_GT(get_perf_context()->number_async_seek, 0); } else { ASSERT_EQ(async_read_bytes.count, 0); - ASSERT_EQ(get_perf_context()->number_async_seek, 0); } ASSERT_GT(prefetched_bytes_discarded.count, 0); } + ASSERT_EQ(get_perf_context()->number_async_seek, 0); } - } - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); + { + // Read the keys using seek. + { + ASSERT_OK(options.statistics->Reset()); + get_perf_context()->Reset(); + + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + iter->Seek(BuildKey(450)); + while (iter->Valid()) { + ASSERT_OK(iter->status()); + num_keys++; + iter->Next(); + } + ASSERT_OK(iter->status()); - Close(); -} + iter->Seek(BuildKey(450)); + while (iter->Valid()) { + ASSERT_OK(iter->status()); + num_keys++; + iter->Prev(); + } -#ifndef ROCKSDB_LITE -#ifdef GFLAGS -TEST_P(PrefetchTestWithPosix, TraceReadAsyncWithCallbackWrapper) { - if (mem_env_ || encrypted_env_) { - ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); - return; - } + ASSERT_EQ(num_keys, total_keys + 1); + ASSERT_GT(buff_prefetch_count, 0); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, + &async_read_bytes); + HistogramData prefetched_bytes_discarded; + options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, + &prefetched_bytes_discarded); + + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + } + ASSERT_GT(prefetched_bytes_discarded.count, 0); + } + } + } - const int kNumKeys = 1000; - std::shared_ptr fs = std::make_shared( - FileSystem::Default(), /*support_prefetch=*/false); - std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); - bool use_direct_io = false; - Options options = CurrentOptions(); - options.write_buffer_size = 1024; - options.create_if_missing = true; - options.compression = kNoCompression; - options.env = env.get(); - options.statistics = CreateDBStatistics(); - if (use_direct_io) { - options.use_direct_reads = true; - options.use_direct_io_for_flush_and_compaction = true; + Close(); } - BlockBasedTableOptions table_options; - table_options.no_block_cache = true; - table_options.cache_index_and_filter_blocks = false; - table_options.metadata_block_size = 1024; - table_options.index_type = - BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; - options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - Status s = TryReopen(options); - if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { - // If direct IO is not supported, skip the test - return; - } else { - ASSERT_OK(s); - } +#ifndef ROCKSDB_LITE +#ifdef GFLAGS + TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } - int total_keys = 0; - // Write the keys. 
- { - WriteBatch batch; - Random rnd(309); - for (int j = 0; j < 5; j++) { - for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { - ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); - total_keys++; + const int kNumKeys = 1000; + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.statistics = CreateDBStatistics(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); } - ASSERT_OK(db_->Write(WriteOptions(), &batch)); - ASSERT_OK(Flush()); + MoveFilesToLevel(2); } - MoveFilesToLevel(2); - } - int buff_prefetch_count = 0; - bool read_async_called = false; - ReadOptions ro; - ro.adaptive_readahead = true; - ro.async_io = true; + int buff_prefetch_count = 0; + bool read_async_called = false; + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; - if (GetParam()) { - ro.readahead_size = 16 * 1024; - } + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } - SyncPoint::GetInstance()->SetCallBack( - "FilePrefetchBuffer::PrefetchAsyncInternal:Start", - [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); - SyncPoint::GetInstance()->SetCallBack( - "UpdateResults::io_uring_result", - [&](void* /*arg*/) { read_async_called = true; }); - SyncPoint::GetInstance()->EnableProcessing(); + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); - // Read the keys. - { - // Start io_tracing. - WriteOptions write_opt; - TraceOptions trace_opt; - std::unique_ptr trace_writer; - std::string trace_file_path = dbname_ + "/io_trace_file"; - - ASSERT_OK( - NewFileTraceWriter(env_, EnvOptions(), trace_file_path, &trace_writer)); - ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer))); - ASSERT_OK(options.statistics->Reset()); + // Read the keys. + { + // Start io_tracing. 
+ WriteOptions write_opt; + TraceOptions trace_opt; + std::unique_ptr trace_writer; + std::string trace_file_path = dbname_ + "/io_trace_file"; + + ASSERT_OK(NewFileTraceWriter(env_, EnvOptions(), trace_file_path, + &trace_writer)); + ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer))); + ASSERT_OK(options.statistics->Reset()); - auto iter = std::unique_ptr(db_->NewIterator(ro)); - int num_keys = 0; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - ASSERT_OK(iter->status()); - num_keys++; - } + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + num_keys++; + } - // End the tracing. - ASSERT_OK(db_->EndIOTrace()); - ASSERT_OK(env_->FileExists(trace_file_path)); + // End the tracing. + ASSERT_OK(db_->EndIOTrace()); + ASSERT_OK(env_->FileExists(trace_file_path)); - ASSERT_EQ(num_keys, total_keys); - ASSERT_GT(buff_prefetch_count, 0); + ASSERT_EQ(num_keys, total_keys); + ASSERT_GT(buff_prefetch_count, 0); - // Check stats to make sure async prefetch is done. - { - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); - // Not all platforms support iouring. In that case, ReadAsync in posix - // won't submit async requests. - if (read_async_called) { - ASSERT_GT(async_read_bytes.count, 0); - } else { - ASSERT_EQ(async_read_bytes.count, 0); + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + } } - } - // Check the file to see if ReadAsync is logged. - RunIOTracerParserTool(trace_file_path); - } + // Check the file to see if ReadAsync is logged. + RunIOTracerParserTool(trace_file_path); + } - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); - Close(); -} + Close(); + } #endif // GFLAGS #endif // ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index d02b7b5f6..dc93c1a34 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -446,29 +446,17 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro, } } -// TODO akanksha: -// 1. Handle use_direct_io case which currently calls Read API. IOStatus RandomAccessFileReader::ReadAsync( FSReadRequest& req, const IOOptions& opts, std::function cb, void* cb_arg, - void** io_handle, IOHandleDeleter* del_fn, - Env::IOPriority rate_limiter_priority) { - if (use_direct_io()) { - // For direct_io, it calls Read API. - req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch, - nullptr /*dbg*/, rate_limiter_priority); - cb(req, cb_arg); - return IOStatus::OK(); - } - + void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf) { + IOStatus s; // Create a callback and populate info. 
auto read_async_callback = std::bind(&RandomAccessFileReader::ReadAsyncCallback, this, std::placeholders::_1, std::placeholders::_2); - ReadAsyncInfo* read_async_info = new ReadAsyncInfo; - read_async_info->cb_ = cb; - read_async_info->cb_arg_ = cb_arg; - read_async_info->start_time_ = clock_->NowMicros(); + ReadAsyncInfo* read_async_info = + new ReadAsyncInfo(cb, cb_arg, clock_->NowMicros()); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { @@ -476,8 +464,38 @@ IOStatus RandomAccessFileReader::ReadAsync( } #endif - IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info, - io_handle, del_fn, nullptr /*dbg*/); + size_t alignment = file_->GetRequiredBufferAlignment(); + bool is_aligned = (req.offset & (alignment - 1)) == 0 && + (req.len & (alignment - 1)) == 0 && + (uintptr_t(req.scratch) & (alignment - 1)) == 0; + read_async_info->is_aligned_ = is_aligned; + + if (use_direct_io() && is_aligned == false) { + FSReadRequest aligned_req = Align(req, alignment); + + // Allocate aligned buffer. + read_async_info->buf_.Alignment(alignment); + read_async_info->buf_.AllocateNewBuffer(aligned_req.len); + + // Set remaining fields in the aligned FSReadRequest. + aligned_req.scratch = read_async_info->buf_.BufferStart(); + + // Save the user-provided fields to populate back in the callback. + read_async_info->user_scratch_ = req.scratch; + read_async_info->user_aligned_buf_ = aligned_buf; + read_async_info->user_len_ = req.len; + read_async_info->user_offset_ = req.offset; + read_async_info->user_result_ = req.result; + + assert(read_async_info->buf_.CurrentSize() == 0); + + s = file_->ReadAsync(aligned_req, opts, read_async_callback, + read_async_info, io_handle, del_fn, nullptr /*dbg*/); + } else { + s = file_->ReadAsync(req, opts, read_async_callback, read_async_info, + io_handle, del_fn, nullptr /*dbg*/); + } + // Suppress false positive clang analyzer warnings. // Memory is not released if file_->ReadAsync returns !s.ok(), because // ReadAsyncCallback is never called in that case. If ReadAsyncCallback is @@ -497,7 +515,54 @@ void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req, assert(read_async_info); assert(read_async_info->cb_); - read_async_info->cb_(req, read_async_info->cb_arg_); + if (use_direct_io() && read_async_info->is_aligned_ == false) { + // Create an FSReadRequest with the user-provided fields. + FSReadRequest user_req; + user_req.scratch = read_async_info->user_scratch_; + user_req.offset = read_async_info->user_offset_; + user_req.len = read_async_info->user_len_; + + // Update results in user_req. + user_req.result = req.result; + user_req.status = req.status; + + read_async_info->buf_.Size(read_async_info->buf_.CurrentSize() + + req.result.size()); + + size_t offset_advance_len = static_cast<size_t>( + /*offset_passed_by_user=*/read_async_info->user_offset_ - + /*aligned_offset=*/req.offset); + + size_t res_len = 0; + if (req.status.ok() && + offset_advance_len < read_async_info->buf_.CurrentSize()) { + res_len = + std::min(read_async_info->buf_.CurrentSize() - offset_advance_len, + read_async_info->user_len_); + if (read_async_info->user_aligned_buf_ == nullptr) { + // Copy the data into the user's scratch. +// Clang analyzer assumes that it will take use_direct_io() == false in +// ReadAsync and use_direct_io() == true in the callback, which cannot both +// be true. +#ifndef __clang_analyzer__ + read_async_info->buf_.Read(user_req.scratch, offset_advance_len, + res_len); +#endif // __clang_analyzer__ + } else { + // Set the aligned_buf provided by the user without an additional copy.
user_req.scratch = + read_async_info->buf_.BufferStart() + offset_advance_len; + read_async_info->user_aligned_buf_->reset( + read_async_info->buf_.Release()); + } + user_req.result = Slice(user_req.scratch, res_len); + } else { + // Either req.status is not ok or data was not read. + user_req.result = Slice(); + } + read_async_info->cb_(user_req, read_async_info->cb_arg_); + } else { + read_async_info->cb_(req, read_async_info->cb_arg_); + } // Update stats and notify listeners. if (stats_ != nullptr && file_read_hist_ != nullptr) { diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 0d85600fb..ea7cfd234 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -93,12 +93,32 @@ class RandomAccessFileReader { const bool is_last_level_; struct ReadAsyncInfo { + ReadAsyncInfo(std::function<void(const FSReadRequest&, void*)> cb, + void* cb_arg, uint64_t start_time) + : cb_(cb), + cb_arg_(cb_arg), + start_time_(start_time), + user_scratch_(nullptr), + user_aligned_buf_(nullptr), + user_offset_(0), + user_len_(0), + is_aligned_(false) {} + + std::function<void(const FSReadRequest&, void*)> cb_; + void* cb_arg_; + uint64_t start_time_; #ifndef ROCKSDB_LITE FileOperationInfo::StartTimePoint fs_start_ts_; #endif - uint64_t start_time_; - std::function<void(const FSReadRequest&, void*)> cb_; - void* cb_arg_; + // The fields below store the parameters passed by the caller in case of direct_io. + char* user_scratch_; + AlignedBuf* user_aligned_buf_; + uint64_t user_offset_; + size_t user_len_; + Slice user_result_; + // Used in case of direct_io + AlignedBuffer buf_; + bool is_aligned_; }; public: @@ -190,7 +210,7 @@ class RandomAccessFileReader { IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, - Env::IOPriority rate_limiter_priority); + AlignedBuf* aligned_buf); void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg); }; diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index 0e7e378b7..d2605670f 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -257,7 +257,7 @@ void BlockBasedTableIterator::InitDataBlock() { // Enabled from the very first IO when ReadOptions.readahead_size is set. block_prefetcher_.PrefetchIfNeeded( rep, data_block_handle, read_options_.readahead_size, is_for_compaction, - /*async_io=*/false, read_options_.rate_limiter_priority); + /*no_sequential_checking=*/false, read_options_.rate_limiter_priority); Status s; table_->NewDataBlockIterator( read_options_, data_block_handle, &block_iter_, BlockType::kData, @@ -288,9 +288,12 @@ void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) { // Explicit user requested readahead: // Enabled from the very first IO when ReadOptions.readahead_size is // set. + // In case of async_io with implicit readahead, block_prefetcher_ will + // always create the prefetch buffer by setting no_sequential_checking + // = true.
block_prefetcher_.PrefetchIfNeeded( rep, data_block_handle, read_options_.readahead_size, - is_for_compaction, read_options_.async_io, + is_for_compaction, /*no_sequential_checking=*/read_options_.async_io, read_options_.rate_limiter_priority); Status s; diff --git a/table/block_based/block_prefetcher.cc b/table/block_based/block_prefetcher.cc index 981e2043c..e84872754 100644 --- a/table/block_based/block_prefetcher.cc +++ b/table/block_based/block_prefetcher.cc @@ -14,7 +14,8 @@ namespace ROCKSDB_NAMESPACE { void BlockPrefetcher::PrefetchIfNeeded( const BlockBasedTable::Rep* rep, const BlockHandle& handle, - const size_t readahead_size, bool is_for_compaction, const bool async_io, + const size_t readahead_size, bool is_for_compaction, + const bool no_sequential_checking, const Env::IOPriority rate_limiter_priority) { // num_file_reads is used by FilePrefetchBuffer only when // implicit_auto_readahead is set. @@ -43,9 +44,9 @@ void BlockPrefetcher::PrefetchIfNeeded( return; } - // In case of async_io, always creates the PrefetchBuffer irrespective of - // num_file_reads_. - if (async_io) { + // In case of no_sequential_checking, skip the num_file_reads_ check and + // always create the FilePrefetchBuffer. + if (no_sequential_checking) { rep->CreateFilePrefetchBufferIfNotExists( initial_auto_readahead_size_, max_auto_readahead_size, &prefetch_buffer_, /*implicit_auto_readahead=*/true, diff --git a/table/block_based/block_prefetcher.h b/table/block_based/block_prefetcher.h index 4fae7f0bb..518868a30 100644 --- a/table/block_based/block_prefetcher.h +++ b/table/block_based/block_prefetcher.h @@ -20,7 +20,8 @@ class BlockPrefetcher { void PrefetchIfNeeded(const BlockBasedTable::Rep* rep, const BlockHandle& handle, size_t readahead_size, - bool is_for_compaction, bool async_io, + bool is_for_compaction, + const bool no_sequential_checking, Env::IOPriority rate_limiter_priority); FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); } diff --git a/table/block_based/partitioned_index_iterator.cc b/table/block_based/partitioned_index_iterator.cc index 94a023133..b9bc2155a 100644 --- a/table/block_based/partitioned_index_iterator.cc +++ b/table/block_based/partitioned_index_iterator.cc @@ -89,10 +89,10 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() { // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0. // Explicit user requested readahead: // Enabled from the very first IO when ReadOptions.readahead_size is set.
- block_prefetcher_.PrefetchIfNeeded(rep, partitioned_index_handle, - read_options_.readahead_size, - is_for_compaction, /*async_io=*/false, - read_options_.rate_limiter_priority); + block_prefetcher_.PrefetchIfNeeded( + rep, partitioned_index_handle, read_options_.readahead_size, + is_for_compaction, /*no_sequential_checking=*/false, + read_options_.rate_limiter_priority); Status s; table_->NewDataBlockIterator( read_options_, partitioned_index_handle, &block_iter_, diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index e303b8363..9f12a4c45 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -359,8 +359,7 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() { return io_s; } io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync( - opts, file_, handle_.offset(), block_size_with_trailer_, - read_options_.rate_limiter_priority, &slice_)); + opts, file_, handle_.offset(), block_size_with_trailer_, &slice_)); if (io_s.IsTryAgain()) { return io_s; } diff --git a/util/async_file_reader.cc b/util/async_file_reader.cc index f8fad5d4d..83f6afc6e 100644 --- a/util/async_file_reader.cc +++ b/util/async_file_reader.cc @@ -28,7 +28,7 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) { read_req->result = req.result; }, &awaiter->read_reqs_[i], &awaiter->io_handle_[i], - &awaiter->del_fn_[i], Env::IOPriority::IO_TOTAL) + &awaiter->del_fn_[i], /*aligned_buf=*/nullptr) .PermitUncheckedError(); } return true;
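Taken together, the user-visible surface of this change is small: the HISTORY.md entry above boils down to enabling direct reads on the DB plus async_io on the read path, with the rest of the patch being plumbing. A minimal usage sketch against the public RocksDB API (the DB path is illustrative):

```cpp
#include <cstdio>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.use_direct_reads = true;  // direct IO for user reads

  rocksdb::DB* raw = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/async_io_direct_io_demo", &raw);
  if (!s.ok()) {
    // Like the tests above, bail out if direct IO is unsupported here.
    std::fprintf(stderr, "Open failed: %s\n", s.ToString().c_str());
    return 1;
  }
  std::unique_ptr<rocksdb::DB> db(raw);

  rocksdb::ReadOptions ro;
  ro.async_io = true;            // parallel child-iterator seeks, async prefetch
  ro.adaptive_readahead = true;  // as exercised by the tests in this patch

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // Sequential scans benefit from asynchronous prefetching.
  }
  return it->status().ok() ? 0 : 1;
}
```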
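On the mechanics: the IsSectorAligned helpers that move from an anonymous namespace in io_posix.cc into io_posix.h assume sector_size is a power of two, which lets the modulo test reduce to a single mask operation. A self-contained sketch of the same check (names here are stand-ins, not the patch's):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>

// Same idea as IsSectorAligned(off, sector_size): for a power-of-two
// sector size, "off % sector_size == 0" reduces to a single mask test.
inline bool IsAlignedSketch(size_t off, size_t sector_size) {
  assert((sector_size & (sector_size - 1)) == 0);  // power of two
  return (off & (sector_size - 1)) == 0;
}

int main() {
  std::printf("%d\n", IsAlignedSketch(8192, 4096));  // 1: sector boundary
  std::printf("%d\n", IsAlignedSketch(5000, 4096));  // 0: mid-sector offset
  return 0;
}
```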
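The new read_again out-parameter lets UpdateResult defer the EOF-versus-partial-result decision on a zero-byte completion: an unaligned tail under direct IO is treated as EOF mid-sector, an async completion reports EOF to its caller, and only the synchronous MultiRead path is asked to retry with pread. A rough, self-contained model of that branch (stand-in names, not the real signature):

```cpp
#include <cstddef>
#include <cstdio>

// Stand-in for the zero-byte branch of UpdateResult: decide between
// "partial sector at EOF", "EOF reported to the async caller", and
// "ask the caller to fall back to a synchronous pread".
bool ShouldReadAgain(bool async_read, bool use_direct_io,
                     size_t finished_len, size_t alignment) {
  if (use_direct_io && (finished_len & (alignment - 1)) != 0) {
    return false;  // unaligned tail: the finished prefix is the final result
  }
  if (async_read) {
    return false;  // async path: report EOF; caller may issue another read
  }
  return true;  // sync path (MultiRead): retry the remainder with pread
}

int main() {
  // Aligned prefix on the synchronous path: fall back to pread.
  std::printf("%d\n", ShouldReadAgain(false, true, 8192, 4096));  // 1
  // Unaligned tail under direct IO: done, we hit EOF mid-sector.
  std::printf("%d\n", ShouldReadAgain(false, true, 5000, 4096));  // 0
  return 0;
}
```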
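For the unaligned direct-IO path in RandomAccessFileReader::ReadAsync, Align (already used by MultiRead, not shown in this patch) is assumed to widen the user's range to sector boundaries: round the offset down and the end of the range up. The callback then skips offset_advance_len bytes of the aligned buffer before copying or handing the buffer back. A small illustration of that arithmetic:

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdio>

struct RangeSketch {
  uint64_t offset;
  size_t len;
};

// Assumed behavior of Align(req, alignment): round the start down and the
// end up so the kernel sees a fully sector-aligned direct-IO request.
RangeSketch AlignSketch(uint64_t offset, size_t len, size_t alignment) {
  uint64_t mask = ~(uint64_t{alignment} - 1);
  uint64_t start = offset & mask;                        // round down
  uint64_t end = (offset + len + alignment - 1) & mask;  // round up
  return {start, static_cast<size_t>(end - start)};
}

int main() {
  // User asks for 3000 bytes at offset 5000; sectors are 4096 bytes.
  RangeSketch r = AlignSketch(5000, 3000, 4096);  // -> offset 4096, len 4096
  uint64_t offset_advance_len = 5000 - r.offset;  // 904: skipped in callback
  std::printf("aligned offset=%llu len=%zu advance=%llu\n",
              static_cast<unsigned long long>(r.offset), r.len,
              static_cast<unsigned long long>(offset_advance_len));
  return 0;
}
```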
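FilePrefetchBuffer::PrefetchAsync's contract, which BlockFetcher::ReadAsyncBlockContents above relies on, is: Status::OK means the result slice already points at buffered data, while Status::TryAgain means an asynchronous request was submitted and the caller should retry later. Schematically, assuming the RocksDB-internal members involved (a hedged sketch, not a drop-in unit):

```cpp
// Sketch of a caller of the new PrefetchAsync signature; mirrors the call
// site in BlockFetcher::ReadAsyncBlockContents. Assumes RocksDB-internal
// headers and members (prefetch_buffer_, file_, handle_, slice_, etc.).
IOStatus io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync(
    opts, file_, handle_.offset(), block_size_with_trailer_, &slice_));
if (io_s.IsTryAgain()) {
  // Async request submitted; come back once the read completes.
  return io_s;
}
if (io_s.ok()) {
  // slice_ already points at data resident in the prefetch buffer.
}
```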
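Finally, the async_io-to-no_sequential_checking rename in BlockPrefetcher makes the intent explicit: on the async_io path the prefetch buffer is created from the very first read, while implicit readahead otherwise waits for a sequential access pattern. A compressed, self-contained model of that gating (constants and names are illustrative):

```cpp
#include <cstdint>
#include <cstdio>

// Illustrative model of BlockPrefetcher::PrefetchIfNeeded's gating: with
// no_sequential_checking the prefetch buffer is created unconditionally;
// otherwise implicit readahead waits for enough sequential reads.
bool ShouldCreatePrefetchBuffer(bool no_sequential_checking,
                                uint64_t num_file_reads,
                                uint64_t min_reads_for_auto_readahead) {
  if (no_sequential_checking) {
    return true;  // async_io path: prefetch from the very first read
  }
  return num_file_reads > min_reads_for_auto_readahead;
}

int main() {
  std::printf("%d\n", ShouldCreatePrefetchBuffer(true, 0, 2));   // 1
  std::printf("%d\n", ShouldCreatePrefetchBuffer(false, 1, 2));  // 0
  std::printf("%d\n", ShouldCreatePrefetchBuffer(false, 3, 2));  // 1
  return 0;
}
```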