diff --git a/HISTORY.md b/HISTORY.md
index 93561a73b..8bfd73876 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -34,6 +34,7 @@
 ### Performance Improvements
 * Iterator performance is improved for `DeleteRange()` users. Internally, iterator will skip to the end of a range tombstone when possible, instead of looping through each key and check individually if a key is range deleted.
 * Eliminated some allocations and copies in the blob read path. Also, `PinnableSlice` now only points to the blob value and pins the backing resource (cache entry or buffer) in all cases, instead of containing a copy of the blob value. See #10625 and #10647.
+* For scans with async_io enabled, a few optimizations have been added to issue more asynchronous requests in parallel and avoid synchronous prefetching.

 ## 7.6.0 (08/19/2022)
 ### New Features

diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc
index 5a2607c81..00a76aed5 100644
--- a/file/file_prefetch_buffer.cc
+++ b/file/file_prefetch_buffer.cc
@@ -10,6 +10,7 @@
 #include "file/file_prefetch_buffer.h"

 #include
+#include

 #include "file/random_access_file_reader.h"
 #include "monitoring/histogram.h"
@@ -23,8 +24,8 @@ namespace ROCKSDB_NAMESPACE {

 void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
                                                uint64_t offset,
-                                               size_t roundup_len, size_t index,
-                                               bool refit_tail,
+                                               size_t roundup_len,
+                                               uint32_t index, bool refit_tail,
                                                uint64_t& chunk_len) {
   uint64_t chunk_offset_in_buffer = 0;
   bool copy_data_to_new_buffer = false;
@@ -32,9 +33,7 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
   // If only a few bytes exist -- reuse them & read only what is really needed.
   // This is typically the case of incremental reading of data.
   // If no bytes exist in buffer -- full pread.
-  if (bufs_[index].buffer_.CurrentSize() > 0 &&
-      offset >= bufs_[index].offset_ &&
-      offset <= bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) {
+  if (DoesBufferContainData(index) && IsOffsetInBuffer(offset, index)) {
     // Only a few requested bytes are in the buffer. memmove those chunk of
     // bytes to the beginning, and memcpy them back into the new buffer if a
     // new buffer is created.
@@ -43,7 +42,7 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
     chunk_len = static_cast(bufs_[index].buffer_.CurrentSize()) -
                 chunk_offset_in_buffer;
     assert(chunk_offset_in_buffer % alignment == 0);
-    // assert(chunk_len % alignment == 0);
+    assert(chunk_len % alignment == 0);
     assert(chunk_offset_in_buffer + chunk_len <=
            bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize());
     if (chunk_len > 0) {
@@ -108,7 +107,7 @@ Status FilePrefetchBuffer::Read(const IOOptions& opts,

 Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
                                      RandomAccessFileReader* reader,
-                                     uint64_t read_len, uint64_t chunk_len,
+                                     uint64_t read_len,
                                      uint64_t rounddown_start, uint32_t index) {
   // callback for async read request.
auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this, @@ -116,15 +115,18 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, FSReadRequest req; Slice result; req.len = read_len; - req.offset = rounddown_start + chunk_len; + req.offset = rounddown_start; req.result = result; - req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len; - Status s = reader->ReadAsync(req, opts, fp, - /*cb_arg=*/nullptr, &io_handle_, &del_fn_, - /*aligned_buf=*/nullptr); + req.scratch = bufs_[index].buffer_.BufferStart(); + bufs_[index].async_req_len_ = req.len; + + Status s = + reader->ReadAsync(req, opts, fp, &(bufs_[index].pos_), + &(bufs_[index].io_handle_), &(bufs_[index].del_fn_), + /*aligned_buf=*/nullptr); req.status.PermitUncheckedError(); if (s.ok()) { - async_read_in_progress_ = true; + bufs_[index].async_read_in_progress_ = true; } return s; } @@ -170,8 +172,7 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, } uint64_t copy_offset = (offset - bufs_[src].offset_); size_t copy_len = 0; - if (offset + length <= - bufs_[src].offset_ + bufs_[src].buffer_.CurrentSize()) { + if (IsDataBlockInBuffer(offset, length, src)) { // All the bytes are in src. copy_len = length; } else { @@ -194,65 +195,121 @@ void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, } } -void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset) { - if (async_read_in_progress_ && fs_ != nullptr) { - // Wait for prefetch data to complete. - // No mutex is needed as PrefetchAsyncCallback updates the result in second - // buffer and FilePrefetchBuffer should wait for Poll before accessing the - // second buffer. - std::vector handles; - handles.emplace_back(io_handle_); - StopWatch sw(clock_, stats_, POLL_WAIT_MICROS); - fs_->Poll(handles, 1).PermitUncheckedError(); - } - - // Reset and Release io_handle_ after the Poll API as request has been - // completed. - async_read_in_progress_ = false; - if (io_handle_ != nullptr && del_fn_ != nullptr) { - del_fn_(io_handle_); - io_handle_ = nullptr; - del_fn_ = nullptr; - } - - // Index of second buffer. +// Clear the buffers if it contains outdated data. Outdated data can be +// because previous sequential reads were read from the cache instead of these +// buffer. In that case outdated IOs should be aborted. +void FilePrefetchBuffer::AbortIOIfNeeded(uint64_t offset) { uint32_t second = curr_ ^ 1; + std::vector handles; + autovector buf_pos; + if (IsBufferOutdatedWithAsyncProgress(offset, curr_)) { + handles.emplace_back(bufs_[curr_].io_handle_); + buf_pos.emplace_back(curr_); + } + if (IsBufferOutdatedWithAsyncProgress(offset, second)) { + handles.emplace_back(bufs_[second].io_handle_); + buf_pos.emplace_back(second); + } + if (!handles.empty()) { + StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); + Status s = fs_->AbortIO(handles); + assert(s.ok()); + } - // First clear the buffers if it contains outdated data. Outdated data can be - // because previous sequential reads were read from the cache instead of these - // buffer. - { - if (bufs_[curr_].buffer_.CurrentSize() > 0 && - offset >= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { - bufs_[curr_].buffer_.Clear(); - } - if (bufs_[second].buffer_.CurrentSize() > 0 && - offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) { - bufs_[second].buffer_.Clear(); + for (auto& pos : buf_pos) { + // Release io_handle. 
+ DestroyAndClearIOHandle(pos); + } + + if (bufs_[second].io_handle_ == nullptr) { + bufs_[second].async_read_in_progress_ = false; + } + + if (bufs_[curr_].io_handle_ == nullptr && + bufs_[curr_].async_read_in_progress_) { + bufs_[curr_].async_read_in_progress_ = false; + curr_ = curr_ ^ 1; + } +} + +void FilePrefetchBuffer::AbortAllIOs() { + uint32_t second = curr_ ^ 1; + std::vector handles; + for (uint32_t i = 0; i < 2; i++) { + if (bufs_[i].async_read_in_progress_ && bufs_[i].io_handle_ != nullptr) { + handles.emplace_back(bufs_[i].io_handle_); } } + if (!handles.empty()) { + StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); + Status s = fs_->AbortIO(handles); + assert(s.ok()); + } - // If data is in second buffer, make it curr_. Second buffer can be either - // partial filled or full. - if (bufs_[second].buffer_.CurrentSize() > 0 && - offset >= bufs_[second].offset_ && - offset < bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) { + // Release io_handles. + if (bufs_[curr_].io_handle_ != nullptr && bufs_[curr_].del_fn_ != nullptr) { + DestroyAndClearIOHandle(curr_); + } + + if (bufs_[second].io_handle_ != nullptr && bufs_[second].del_fn_ != nullptr) { + DestroyAndClearIOHandle(second); + } +} + +// Clear the buffers if it contains outdated data. Outdated data can be +// because previous sequential reads were read from the cache instead of these +// buffer. +void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset) { + uint32_t second = curr_ ^ 1; + if (IsBufferOutdated(offset, curr_)) { + bufs_[curr_].buffer_.Clear(); + } + if (IsBufferOutdated(offset, second)) { + bufs_[second].buffer_.Clear(); + } + + // If data starts from second buffer, make it curr_. Second buffer can be + // either partial filled or full. + if (!bufs_[second].async_read_in_progress_ && DoesBufferContainData(second) && + IsOffsetInBuffer(offset, second)) { // Clear the curr_ as buffers have been swapped and curr_ contains the // outdated data and switch the buffers. - bufs_[curr_].buffer_.Clear(); + if (!bufs_[curr_].async_read_in_progress_) { + bufs_[curr_].buffer_.Clear(); + } curr_ = curr_ ^ 1; } } -// If async_read = true: -// async_read is enabled in case of sequential reads. So when -// buffers are switched, we clear the curr_ buffer as we assume the data has -// been consumed because of sequential reads. +void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset) { + if (bufs_[curr_].async_read_in_progress_ && fs_ != nullptr) { + if (bufs_[curr_].io_handle_ != nullptr) { + // Wait for prefetch data to complete. + // No mutex is needed as async_read_in_progress behaves as mutex and is + // updated by main thread only. + std::vector handles; + handles.emplace_back(bufs_[curr_].io_handle_); + StopWatch sw(clock_, stats_, POLL_WAIT_MICROS); + fs_->Poll(handles, 1).PermitUncheckedError(); + } + + // Reset and Release io_handle after the Poll API as request has been + // completed. + DestroyAndClearIOHandle(curr_); + } + UpdateBuffersIfNeeded(offset); +} + +// If async_io is enabled in case of sequential reads, PrefetchAsyncInternal is +// called. When buffers are switched, we clear the curr_ buffer as we assume the +// data has been consumed because of sequential reads. +// Data in buffers will always be sequential with curr_ following second and +// not vice versa. // // Scenarios for prefetching asynchronously: -// Case1: If both buffers are empty, prefetch n bytes -// synchronously in curr_ -// and prefetch readahead_size_/2 async in second buffer. 
+// Case1: If both buffers are empty, prefetch n + readahead_size_/2 bytes +// synchronously in curr_ and prefetch readahead_size_/2 async in second +// buffer. // Case2: If second buffer has partial or full data, make it current and // prefetch readahead_size_/2 async in second buffer. In case of // partial data, prefetch remaining bytes from size n synchronously to @@ -260,9 +317,10 @@ void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset) { // Case3: If curr_ has partial data, prefetch remaining bytes from size n // synchronously in curr_ to fulfill the requested bytes request and // prefetch readahead_size_/2 bytes async in second buffer. -// Case4: If data is in both buffers, copy requested data from curr_ and second -// buffer to third buffer. If all requested bytes have been copied, do -// the asynchronous prefetching in second buffer. +// Case4: (Special case) If data is in both buffers, copy requested data from +// curr_, send async request on curr_, wait for poll to fill second +// buffer (if any), and copy remaining data from second buffer to third +// buffer. Status FilePrefetchBuffer::PrefetchAsyncInternal( const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t length, size_t readahead_size, Env::IOPriority rate_limiter_priority, @@ -273,39 +331,30 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal( TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsyncInternal:Start"); - PollAndUpdateBuffersIfNeeded(offset); - - // If all the requested bytes are in curr_, it will go for async prefetching - // only. - if (bufs_[curr_].buffer_.CurrentSize() > 0 && - offset + length <= - bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { - offset += length; - length = 0; + size_t alignment = reader->file()->GetRequiredBufferAlignment(); + Status s; + uint64_t tmp_offset = offset; + size_t tmp_length = length; - // Since async request was submitted directly by calling PrefetchAsync in - // last call, we don't need to prefetch further as this call is to poll the - // data submitted in previous call. - if (async_request_submitted_) { - return Status::OK(); + // 1. Abort IO and swap buffers if needed to point curr_ to first buffer with + // data. + { + if (!explicit_prefetch_submitted_) { + AbortIOIfNeeded(offset); } + UpdateBuffersIfNeeded(offset); } - - async_request_submitted_ = false; - - Status s; - size_t prefetch_size = length + readahead_size; - size_t alignment = reader->file()->GetRequiredBufferAlignment(); - // Index of second buffer. uint32_t second = curr_ ^ 1; - // Data is overlapping i.e. some of the data is in curr_ buffer and remaining - // in second buffer. - if (bufs_[curr_].buffer_.CurrentSize() > 0 && - bufs_[second].buffer_.CurrentSize() > 0 && - offset >= bufs_[curr_].offset_ && - offset < bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() && - offset + length > bufs_[second].offset_) { + // 2. If data is overlapping over two buffers, copy the data from curr_ and + // call ReadAsync on curr_. 
+ if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) && + IsOffsetInBuffer(offset, curr_) && + (/*Data extends over curr_ buffer and second buffer either has data or in + process of population=*/ + (offset + length > bufs_[second].offset_) && + (bufs_[second].async_read_in_progress_ || + DoesBufferContainData(second)))) { // Allocate new buffer to third buffer; bufs_[2].buffer_.Clear(); bufs_[2].buffer_.Alignment(alignment); @@ -313,25 +362,92 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal( bufs_[2].offset_ = offset; copy_to_third_buffer = true; - // Move data from curr_ buffer to third. - CopyDataToBuffer(curr_, offset, length); - if (length == 0) { - // Requested data has been copied and curr_ still has unconsumed data. + CopyDataToBuffer(curr_, tmp_offset, tmp_length); + + // Call async prefetching on curr_ since data has been consumed in curr_ + // only if data lies within second buffer. + size_t second_size = bufs_[second].async_read_in_progress_ + ? bufs_[second].async_req_len_ + : bufs_[second].buffer_.CurrentSize(); + if (tmp_offset + tmp_length <= bufs_[second].offset_ + second_size) { + uint64_t rounddown_start = bufs_[second].offset_ + second_size; + uint64_t roundup_end = + Roundup(rounddown_start + readahead_size, alignment); + uint64_t roundup_len = roundup_end - rounddown_start; + uint64_t chunk_len = 0; + CalculateOffsetAndLen(alignment, rounddown_start, roundup_len, curr_, + false, chunk_len); + assert(chunk_len == 0); + assert(roundup_len >= chunk_len); + + bufs_[curr_].offset_ = rounddown_start; + uint64_t read_len = static_cast(roundup_len - chunk_len); + s = ReadAsync(opts, reader, read_len, rounddown_start, curr_); + if (!s.ok()) { + DestroyAndClearIOHandle(curr_); + bufs_[curr_].buffer_.Clear(); + return s; + } + } + curr_ = curr_ ^ 1; + } + + // 3. Call Poll only if data is needed for the second buffer. + // - Return if whole data is in curr_ and second buffer in progress. + // - If second buffer is empty, it will go for ReadAsync for second buffer. + if (!bufs_[curr_].async_read_in_progress_ && DoesBufferContainData(curr_) && + IsDataBlockInBuffer(offset, length, curr_)) { + // Whole data is in curr_. + UpdateBuffersIfNeeded(offset); + second = curr_ ^ 1; + if (bufs_[second].async_read_in_progress_) { return s; } - CopyDataToBuffer(second, offset, length); - // Length == 0: All the requested data has been copied to third buffer. It - // should go for only async prefetching. + } else { + PollAndUpdateBuffersIfNeeded(offset); + second = curr_ ^ 1; + } + + if (copy_to_third_buffer) { + offset = tmp_offset; + length = tmp_length; + } + + // 4. After polling and swapping buffers, if all the requested bytes are in + // curr_, it will only go for async prefetching. + // copy_to_third_buffer is a special case so it will be handled separately. + if (!copy_to_third_buffer && DoesBufferContainData(curr_) && + IsDataBlockInBuffer(offset, length, curr_)) { + offset += length; + length = 0; + + // Since async request was submitted directly by calling PrefetchAsync in + // last call, we don't need to prefetch further as this call is to poll + // the data submitted in previous call. + if (explicit_prefetch_submitted_) { + return s; + } + } + + // 5. Data is overlapping i.e. some of the data has been copied to third + // buffer + // and remaining will be updated below. 
+ if (copy_to_third_buffer) { + CopyDataToBuffer(curr_, offset, length); + + // Length == 0: All the requested data has been copied to third buffer and + // it has already gone for async prefetching. It can return without doing + // anything further. // Length > 0: More data needs to be consumed so it will continue async and // sync prefetching and copy the remaining data to third buffer in the end. - // swap the buffers. - curr_ = curr_ ^ 1; - // Update prefetch_size as length has been updated in CopyDataToBuffer. - prefetch_size = length + readahead_size; + if (length == 0) { + return s; + } } + // 6. Go for ReadAsync and Read (if needed). + size_t prefetch_size = length + readahead_size; size_t _offset = static_cast(offset); - second = curr_ ^ 1; // offset and size alignment for curr_ buffer with synchronous prefetching uint64_t rounddown_start1 = Rounddown(_offset, alignment); @@ -368,19 +484,34 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal( uint64_t chunk_len2 = 0; CalculateOffsetAndLen(alignment, rounddown_start2, roundup_len2, second, false /*refit_tail*/, chunk_len2); - + assert(chunk_len2 == 0); // Update the buffer offset. bufs_[second].offset_ = rounddown_start2; assert(roundup_len2 >= chunk_len2); uint64_t read_len2 = static_cast(roundup_len2 - chunk_len2); - ReadAsync(opts, reader, read_len2, chunk_len2, rounddown_start2, second) - .PermitUncheckedError(); + Status tmp_s = ReadAsync(opts, reader, read_len2, rounddown_start2, second); + if (!tmp_s.ok()) { + DestroyAndClearIOHandle(second); + bufs_[second].buffer_.Clear(); + } } if (read_len1 > 0) { s = Read(opts, reader, rate_limiter_priority, read_len1, chunk_len1, rounddown_start1, curr_); if (!s.ok()) { + if (bufs_[second].io_handle_ != nullptr) { + std::vector handles; + handles.emplace_back(bufs_[second].io_handle_); + { + StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); + Status status = fs_->AbortIO(handles); + assert(status.ok()); + } + } + DestroyAndClearIOHandle(second); + bufs_[second].buffer_.Clear(); + bufs_[curr_].buffer_.Clear(); return s; } } @@ -462,12 +593,18 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( return false; } - // In case of async_io_, offset can be less than bufs_[curr_].offset_ because - // of reads not sequential and PrefetchAsync can be called for any block and - // RocksDB will call TryReadFromCacheAsync after PrefetchAsync to Poll for - // requested bytes. - if (bufs_[curr_].buffer_.CurrentSize() > 0 && offset < bufs_[curr_].offset_ && - prev_len_ != 0) { + if (explicit_prefetch_submitted_) { + if (prev_offset_ != offset) { + // Random offset called. So abort the IOs. + AbortAllIOs(); + bufs_[curr_].buffer_.Clear(); + bufs_[curr_ ^ 1].buffer_.Clear(); + explicit_prefetch_submitted_ = false; + return false; + } + } + + if (!explicit_prefetch_submitted_ && offset < bufs_[curr_].offset_) { return false; } @@ -479,8 +616,11 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( // If readahead is not enabled: return false. 
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache", &readahead_size_); - if (offset < bufs_[curr_].offset_ || - offset + n > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { + + if (explicit_prefetch_submitted_ || + (bufs_[curr_].async_read_in_progress_ || + offset + n > + bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize())) { if (readahead_size_ > 0) { Status s; assert(reader != nullptr); @@ -493,11 +633,11 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( return false; } } - // Prefetch n + readahead_size_/2 synchronously as remaining // readahead_size_/2 will be prefetched asynchronously. s = PrefetchAsyncInternal(opts, reader, offset, n, readahead_size_ / 2, rate_limiter_priority, copy_to_third_buffer); + explicit_prefetch_submitted_ = false; if (!s.ok()) { if (status) { *status = s; @@ -507,11 +647,12 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( #endif return false; } - prefetched = async_request_submitted_ ? false : true; + prefetched = explicit_prefetch_submitted_ ? false : true; } else { return false; } } + UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); uint32_t index = curr_; @@ -523,14 +664,12 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync( if (prefetched) { readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); } - async_request_submitted_ = false; return true; } void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, - void* /*cb_arg*/) { - uint32_t index = curr_ ^ 1; - + void* cb_arg) { + uint32_t index = *(static_cast(cb_arg)); #ifndef NDEBUG if (req.result.size() < req.len) { // Fake an IO error to force db_stress fault injection to ignore @@ -565,82 +704,133 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, if (!enable_) { return Status::NotSupported(); } + TEST_SYNC_POINT("FilePrefetchBuffer::PrefetchAsync:Start"); - PollAndUpdateBuffersIfNeeded(offset); + num_file_reads_ = 0; + explicit_prefetch_submitted_ = false; + bool is_eligible_for_prefetching = false; + if (readahead_size_ > 0 && + (!implicit_auto_readahead_ || + num_file_reads_ + 1 >= num_file_reads_for_auto_readahead_)) { + is_eligible_for_prefetching = true; + } - // Index of second buffer. - uint32_t second = curr_ ^ 1; + // 1. Cancel any pending async read to make code simpler as buffers can be out + // of sync. + AbortAllIOs(); + // 2. Clear outdated data. + UpdateBuffersIfNeeded(offset); + uint32_t second = curr_ ^ 1; // Since PrefetchAsync can be called on non sequential reads. So offset can - // be less than buffers' offset. In that case it clears the buffer and - // prefetch that block. - if (bufs_[curr_].buffer_.CurrentSize() > 0 && offset < bufs_[curr_].offset_) { + // be less than curr_ buffers' offset. In that case also it clears both + // buffers. + if (DoesBufferContainData(curr_) && !IsOffsetInBuffer(offset, curr_)) { bufs_[curr_].buffer_.Clear(); + bufs_[second].buffer_.Clear(); } - // All requested bytes are already in the curr_ buffer. So no need to Read - // again. - if (bufs_[curr_].buffer_.CurrentSize() > 0 && - offset + n <= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { + UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false); + + bool data_found = false; + + // 3. If curr_ has full data. 
+ if (DoesBufferContainData(curr_) && IsDataBlockInBuffer(offset, n, curr_)) { uint64_t offset_in_buffer = offset - bufs_[curr_].offset_; *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n); - return Status::OK(); + data_found = true; + // Update num_file_reads_ as TryReadFromCacheAsync won't be called for + // poll and update num_file_reads_ if data is found. + num_file_reads_++; + + // 3.1 If second also has some data or is not eligible for prefetching, + // return. + if (!is_eligible_for_prefetching || DoesBufferContainData(second)) { + return Status::OK(); + } + } else { + // Partial data in curr_. + bufs_[curr_].buffer_.Clear(); } + bufs_[second].buffer_.Clear(); Status s; size_t alignment = reader->file()->GetRequiredBufferAlignment(); - - // TODO akanksha: Handle the scenario if data is overlapping in 2 buffers. - // Currently, tt covers 2 scenarios. Either one buffer (curr_) has no data or - // it has partial data. It ignores the contents in second buffer (overlapping - // data in 2 buffers) and send the request to re-read that data again. - - // Clear the second buffer in order to do asynchronous prefetching. - bufs_[second].buffer_.Clear(); - + size_t prefetch_size = is_eligible_for_prefetching ? readahead_size_ / 2 : 0; size_t offset_to_read = static_cast(offset); - uint64_t rounddown_start = 0; - uint64_t roundup_end = 0; - - if (bufs_[curr_].buffer_.CurrentSize() == 0) { - // Prefetch full data. - rounddown_start = Rounddown(offset_to_read, alignment); - roundup_end = Roundup(offset_to_read + n, alignment); - } else { - // Prefetch remaining data. - size_t rem_length = n - (bufs_[curr_].buffer_.CurrentSize() - - (offset - bufs_[curr_].offset_)); - rounddown_start = bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize(); - roundup_end = Roundup(rounddown_start + rem_length, alignment); + uint64_t rounddown_start1 = 0; + uint64_t roundup_end1 = 0; + uint64_t rounddown_start2 = 0; + uint64_t roundup_end2 = 0; + uint64_t chunk_len1 = 0; + uint64_t chunk_len2 = 0; + size_t read_len1 = 0; + size_t read_len2 = 0; + + // - If curr_ is empty. + // - Call async read for full data + prefetch_size on curr_. + // - Call async read for prefetch_size on second if eligible. + // - If curr_ is filled. + // - prefetch_size on second. + // Calculate length and offsets for reading. + if (!DoesBufferContainData(curr_)) { + // Prefetch full data + prefetch_size in curr_. + rounddown_start1 = Rounddown(offset_to_read, alignment); + roundup_end1 = Roundup(offset_to_read + n + prefetch_size, alignment); + uint64_t roundup_len1 = roundup_end1 - rounddown_start1; + assert(roundup_len1 >= alignment); + assert(roundup_len1 % alignment == 0); + + CalculateOffsetAndLen(alignment, rounddown_start1, roundup_len1, curr_, + false, chunk_len1); + assert(chunk_len1 == 0); + assert(roundup_len1 >= chunk_len1); + read_len1 = static_cast(roundup_len1 - chunk_len1); + bufs_[curr_].offset_ = rounddown_start1; } - uint64_t roundup_len = roundup_end - rounddown_start; - assert(roundup_len >= alignment); - assert(roundup_len % alignment == 0); - - uint64_t chunk_len = 0; - CalculateOffsetAndLen(alignment, rounddown_start, roundup_len, second, false, - chunk_len); - - // Update the buffer offset. 
- bufs_[second].offset_ = rounddown_start; - assert(roundup_len >= chunk_len); - - size_t read_len = static_cast(roundup_len - chunk_len); + if (is_eligible_for_prefetching) { + if (DoesBufferContainData(curr_)) { + rounddown_start2 = + bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize(); + } else { + rounddown_start2 = roundup_end1; + } - s = ReadAsync(opts, reader, read_len, chunk_len, rounddown_start, second); + roundup_end2 = Roundup(rounddown_start2 + prefetch_size, alignment); + uint64_t roundup_len2 = roundup_end2 - rounddown_start2; - if (!s.ok()) { - return s; + assert(roundup_len2 >= alignment); + CalculateOffsetAndLen(alignment, rounddown_start2, roundup_len2, second, + false, chunk_len2); + assert(chunk_len2 == 0); + assert(roundup_len2 >= chunk_len2); + read_len2 = static_cast(roundup_len2 - chunk_len2); + // Update the buffer offset. + bufs_[second].offset_ = rounddown_start2; } - // Update read pattern so that TryReadFromCacheAsync call be called to Poll - // the data. It will return without polling if blocks are not sequential. - UpdateReadPattern(offset, n, /*decrease_readaheadsize=*/false); - prev_len_ = 0; - async_request_submitted_ = true; - - return Status::TryAgain(); + if (read_len1) { + s = ReadAsync(opts, reader, read_len1, rounddown_start1, curr_); + if (!s.ok()) { + DestroyAndClearIOHandle(curr_); + bufs_[curr_].buffer_.Clear(); + return s; + } + explicit_prefetch_submitted_ = true; + prev_len_ = 0; + } + if (read_len2) { + s = ReadAsync(opts, reader, read_len2, rounddown_start2, second); + if (!s.ok()) { + DestroyAndClearIOHandle(second); + bufs_[second].buffer_.Clear(); + return s; + } + readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); + } + return (data_found ? Status::OK() : Status::TryAgain()); } + } // namespace ROCKSDB_NAMESPACE diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index 9c70d4895..00ea8ae64 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -20,18 +20,37 @@ #include "rocksdb/file_system.h" #include "rocksdb/options.h" #include "util/aligned_buffer.h" +#include "util/autovector.h" #include "util/stop_watch.h" namespace ROCKSDB_NAMESPACE { -#define DEAFULT_DECREMENT 8 * 1024 +#define DEFAULT_DECREMENT 8 * 1024 struct IOOptions; class RandomAccessFileReader; struct BufferInfo { AlignedBuffer buffer_; + uint64_t offset_ = 0; + + // Below parameters are used in case of async read flow. + // Length requested for in ReadAsync. + size_t async_req_len_ = 0; + + // async_read_in_progress can be used as mutex. Callback can update the buffer + // and its size but async_read_in_progress is only set by main thread. + bool async_read_in_progress_ = false; + + // io_handle is allocated and used by underlying file system in case of + // asynchronous reads. + void* io_handle_ = nullptr; + + IOHandleDeleter del_fn_ = nullptr; + + // pos represents the index of this buffer in vector of BufferInfo. + uint32_t pos_ = 0; }; // FilePrefetchBuffer is a smart buffer to store and read data from a file. @@ -53,9 +72,6 @@ class FilePrefetchBuffer { // it. Used for adaptable readahead of the file footer/metadata. // implicit_auto_readahead : Readahead is enabled implicitly by rocksdb after // doing sequential scans for two times. - // async_io : When async_io is enabled, if it's implicit_auto_readahead, it - // prefetches data asynchronously in second buffer while curr_ is being - // consumed. // // Automatic readhead is enabled for a file if readahead_size // and max_readahead_size are passed in. 
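To make the two-buffer scheme in this header easier to follow, here is a minimal, self-contained sketch of the rotation that `curr_` and `curr_ ^ 1` implement. Everything in it (`ToyBuffer`, `ToyPrefetchBuffers`, `UpdateForOffset`) is invented for illustration and is not part of this patch; the real `BufferInfo` additionally tracks `async_req_len_`, `io_handle_`, `del_fn_`, `pos_`, and the async-in-progress flag, and a third buffer is used when requested data overlaps both buffers.

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for BufferInfo: only the fields needed to show the rotation.
struct ToyBuffer {
  std::string data;     // stands in for AlignedBuffer contents
  uint64_t offset = 0;  // file offset of data[0]
};

// Minimal model of the curr_ / (curr_ ^ 1) rotation: when the read offset has
// moved past the current buffer but falls inside the other one, the stale
// buffer is cleared and the roles are swapped.
class ToyPrefetchBuffers {
 public:
  ToyPrefetchBuffers() : bufs_(2), curr_(0) {}

  void Fill(uint32_t index, uint64_t offset, std::string data) {
    bufs_[index].offset = offset;
    bufs_[index].data = std::move(data);
  }

  // Mirrors the spirit of UpdateBuffersIfNeeded(): drop outdated data and
  // make the buffer that actually covers `offset` the current one.
  void UpdateForOffset(uint64_t offset) {
    uint32_t second = curr_ ^ 1;
    if (IsOutdated(curr_, offset)) bufs_[curr_].data.clear();
    if (IsOutdated(second, offset)) bufs_[second].data.clear();
    if (!bufs_[second].data.empty() && Covers(second, offset)) {
      bufs_[curr_].data.clear();  // assumed consumed by the sequential scan
      curr_ = second;             // swap roles: second becomes current
    }
  }

  uint32_t curr() const { return curr_; }

 private:
  bool Covers(uint32_t i, uint64_t offset) const {
    return offset >= bufs_[i].offset &&
           offset < bufs_[i].offset + bufs_[i].data.size();
  }
  bool IsOutdated(uint32_t i, uint64_t offset) const {
    return !bufs_[i].data.empty() &&
           offset >= bufs_[i].offset + bufs_[i].data.size();
  }

  std::vector<ToyBuffer> bufs_;
  uint32_t curr_;
};

int main() {
  ToyPrefetchBuffers bufs;
  bufs.Fill(0, /*offset=*/0, std::string(4096, 'a'));     // curr_
  bufs.Fill(1, /*offset=*/4096, std::string(4096, 'b'));  // second
  bufs.UpdateForOffset(5000);  // past curr_, inside second -> swap
  return bufs.curr() == 1 ? 0 : 1;
}
```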
@@ -80,29 +96,36 @@ class FilePrefetchBuffer { prev_len_(0), num_file_reads_for_auto_readahead_(num_file_reads_for_auto_readahead), num_file_reads_(num_file_reads), - io_handle_(nullptr), - del_fn_(nullptr), - async_read_in_progress_(false), - async_request_submitted_(false), + explicit_prefetch_submitted_(false), fs_(fs), clock_(clock), stats_(stats) { assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) || (num_file_reads_ == 0)); - // If async_io_ is enabled, data is asynchronously filled in second buffer - // while curr_ is being consumed. If data is overlapping in two buffers, - // data is copied to third buffer to return continuous buffer. + // If ReadOptions.async_io is enabled, data is asynchronously filled in + // second buffer while curr_ is being consumed. If data is overlapping in + // two buffers, data is copied to third buffer to return continuous buffer. bufs_.resize(3); + for (uint32_t i = 0; i < 2; i++) { + bufs_[i].pos_ = i; + } } ~FilePrefetchBuffer() { // Abort any pending async read request before destroying the class object. - if (async_read_in_progress_ && fs_ != nullptr) { + if (fs_ != nullptr) { std::vector handles; - handles.emplace_back(io_handle_); - StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); - Status s = fs_->AbortIO(handles); - assert(s.ok()); + for (uint32_t i = 0; i < 2; i++) { + if (bufs_[i].async_read_in_progress_ && + bufs_[i].io_handle_ != nullptr) { + handles.emplace_back(bufs_[i].io_handle_); + } + } + if (!handles.empty()) { + StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); + Status s = fs_->AbortIO(handles); + assert(s.ok()); + } } // Prefetch buffer bytes discarded. @@ -112,7 +135,7 @@ class FilePrefetchBuffer { int first = i; int second = i ^ 1; - if (bufs_[first].buffer_.CurrentSize() > 0) { + if (DoesBufferContainData(first)) { // If last block was read completely from first and some bytes in // first buffer are still unconsumed. if (prev_offset_ >= bufs_[first].offset_ && @@ -124,7 +147,7 @@ class FilePrefetchBuffer { // If data was in second buffer and some/whole block bytes were read // from second buffer. else if (prev_offset_ < bufs_[first].offset_ && - bufs_[second].buffer_.CurrentSize() > 0) { + !DoesBufferContainData(second)) { // If last block read was completely from different buffer, this // buffer is unconsumed. if (prev_offset_ + prev_len_ <= bufs_[first].offset_) { @@ -142,14 +165,12 @@ class FilePrefetchBuffer { } } } - RecordInHistogram(stats_, PREFETCHED_BYTES_DISCARDED, bytes_discarded); - // Release io_handle_. - if (io_handle_ != nullptr && del_fn_ != nullptr) { - del_fn_(io_handle_); - io_handle_ = nullptr; - del_fn_ = nullptr; + for (uint32_t i = 0; i < 2; i++) { + // Release io_handle. + DestroyAndClearIOHandle(i); } + RecordInHistogram(stats_, PREFETCHED_BYTES_DISCARDED, bytes_discarded); } // Load data into the buffer from a file. @@ -158,9 +179,6 @@ class FilePrefetchBuffer { // n : the number of bytes to read. // rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to // bypass. - // is_async_read : if the data should be prefetched by calling read - // asynchronously. It should be set true when called - // from TryReadFromCache. 
Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, Env::IOPriority rate_limiter_priority); @@ -226,7 +244,7 @@ class FilePrefetchBuffer { } void DecreaseReadAheadIfEligible(uint64_t offset, size_t size, - size_t value = DEAFULT_DECREMENT) { + size_t value = DEFAULT_DECREMENT) { // Decrease the readahead_size if // - its enabled internally by RocksDB (implicit_auto_readahead_) and, // - readahead_size is greater than 0 and, @@ -236,9 +254,11 @@ class FilePrefetchBuffer { // - block is sequential with the previous read and, // - num_file_reads_ + 1 (including this read) > // num_file_reads_for_auto_readahead_ + size_t curr_size = bufs_[curr_].async_read_in_progress_ + ? bufs_[curr_].async_req_len_ + : bufs_[curr_].buffer_.CurrentSize(); if (implicit_auto_readahead_ && readahead_size_ > 0) { - if ((offset + size > - bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) && + if ((offset + size > bufs_[curr_].offset_ + curr_size) && IsBlockSequential(offset) && (num_file_reads_ + 1 > num_file_reads_for_auto_readahead_)) { readahead_size_ = @@ -256,8 +276,14 @@ class FilePrefetchBuffer { // and data present in buffer_. It also allocates new buffer or refit tail if // required. void CalculateOffsetAndLen(size_t alignment, uint64_t offset, - size_t roundup_len, size_t index, bool refit_tail, - uint64_t& chunk_len); + size_t roundup_len, uint32_t index, + bool refit_tail, uint64_t& chunk_len); + + void AbortIOIfNeeded(uint64_t offset); + + void AbortAllIOs(); + + void UpdateBuffersIfNeeded(uint64_t offset); // It calls Poll API if any there is any pending asynchronous request. It then // checks if data is in any buffer. It clears the outdated data and swaps the @@ -275,8 +301,7 @@ class FilePrefetchBuffer { uint64_t chunk_len, uint64_t rounddown_start, uint32_t index); Status ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader, - uint64_t read_len, uint64_t chunk_len, - uint64_t rounddown_start, uint32_t index); + uint64_t read_len, uint64_t rounddown_start, uint32_t index); // Copy the data from src to third buffer. void CopyDataToBuffer(uint32_t src, uint64_t& offset, size_t& length); @@ -305,7 +330,7 @@ class FilePrefetchBuffer { // Since async request was submitted in last call directly by calling // PrefetchAsync, it skips num_file_reads_ check as this call is to poll the // data submitted in previous call. - if (async_request_submitted_) { + if (explicit_prefetch_submitted_) { return true; } if (num_file_reads_ <= num_file_reads_for_auto_readahead_) { @@ -315,15 +340,50 @@ class FilePrefetchBuffer { return true; } + // Helper functions. 
+ bool IsDataBlockInBuffer(uint64_t offset, size_t length, uint32_t index) { + return (offset >= bufs_[index].offset_ && + offset + length <= + bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()); + } + bool IsOffsetInBuffer(uint64_t offset, uint32_t index) { + return (offset >= bufs_[index].offset_ && + offset < bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()); + } + bool DoesBufferContainData(uint32_t index) { + return bufs_[index].buffer_.CurrentSize() > 0; + } + bool IsBufferOutdated(uint64_t offset, uint32_t index) { + return ( + !bufs_[index].async_read_in_progress_ && DoesBufferContainData(index) && + offset >= bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()); + } + bool IsBufferOutdatedWithAsyncProgress(uint64_t offset, uint32_t index) { + return (bufs_[index].async_read_in_progress_ && + bufs_[index].io_handle_ != nullptr && + offset >= bufs_[index].offset_ + bufs_[index].async_req_len_); + } + + void DestroyAndClearIOHandle(uint32_t index) { + if (bufs_[index].io_handle_ != nullptr && bufs_[index].del_fn_ != nullptr) { + bufs_[index].del_fn_(bufs_[index].io_handle_); + bufs_[index].io_handle_ = nullptr; + bufs_[index].del_fn_ = nullptr; + } + bufs_[index].async_read_in_progress_ = false; + } + std::vector bufs_; // curr_ represents the index for bufs_ indicating which buffer is being // consumed currently. uint32_t curr_; + size_t readahead_size_; size_t initial_auto_readahead_size_; // FilePrefetchBuffer object won't be created from Iterator flow if // max_readahead_size_ = 0. size_t max_readahead_size_; + // The minimum `offset` ever passed to TryReadFromCache(). size_t min_offset_read_; // if false, TryReadFromCache() always return false, and we only take stats @@ -343,17 +403,11 @@ class FilePrefetchBuffer { uint64_t num_file_reads_for_auto_readahead_; uint64_t num_file_reads_; - // io_handle_ is allocated and used by underlying file system in case of - // asynchronous reads. - void* io_handle_; - IOHandleDeleter del_fn_; - bool async_read_in_progress_; - - // If async_request_submitted_ is set then it indicates RocksDB called - // PrefetchAsync to submit request. It needs to TryReadFromCacheAsync to poll - // the submitted request without checking if data is sequential and + // If explicit_prefetch_submitted_ is set then it indicates RocksDB called + // PrefetchAsync to submit request. It needs to call TryReadFromCacheAsync to + // poll the submitted request without checking if data is sequential and // num_file_reads_. 
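The containment predicates above (`IsOffsetInBuffer`, `IsDataBlockInBuffer`, `IsBufferOutdated`) encode slightly different interval checks, which is easy to misread in review. The standalone snippet below re-implements them over a plain struct and walks through one concrete buffer so the boundary cases are explicit; `BufView` and the constants are invented for illustration, and the async-in-progress condition of the real predicates is deliberately left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Local stand-in for one BufferInfo entry; only offset and current size matter
// for the containment checks illustrated here.
struct BufView {
  uint64_t offset;
  size_t size;
};

// Local re-implementations of the containment predicates.
bool IsOffsetInBuffer(const BufView& b, uint64_t offset) {
  return offset >= b.offset && offset < b.offset + b.size;
}
bool IsDataBlockInBuffer(const BufView& b, uint64_t offset, size_t length) {
  return offset >= b.offset && offset + length <= b.offset + b.size;
}
bool IsBufferOutdated(const BufView& b, uint64_t offset) {
  return b.size > 0 && offset >= b.offset + b.size;
}

int main() {
  // A buffer holding bytes [4096, 12288) of the file.
  BufView b{/*offset=*/4096, /*size=*/8192};

  assert(IsOffsetInBuffer(b, 4096));            // first byte is covered
  assert(!IsOffsetInBuffer(b, 12288));          // one past the end is not
  assert(IsDataBlockInBuffer(b, 8192, 4096));   // [8192, 12288) fits entirely
  assert(!IsDataBlockInBuffer(b, 8192, 8192));  // [8192, 16384) spills over
  assert(IsBufferOutdated(b, 20000));           // read moved past the buffer
  return 0;
}
```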
- bool async_request_submitted_; + bool explicit_prefetch_submitted_; FileSystem* fs_; SystemClock* clock_; diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 24c439cb7..3e006246f 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -1066,6 +1066,7 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) { } ASSERT_OK(options.statistics->Reset()); + auto iter = std::unique_ptr(db_->NewIterator(ro)); int num_keys = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -1644,6 +1645,311 @@ namespace { Close(); } + TEST_P(PrefetchTest, MultipleSeekWithPosixFS) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + + const int kNumKeys = 1000; + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + options.statistics = CreateDBStatistics(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + int total_keys = 0; + // Write the keys. + { + WriteBatch batch; + Random rnd(309); + for (int j = 0; j < 5; j++) { + for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + } + + int num_keys_first_batch = 0; + int num_keys_second_batch = 0; + // Calculate number of keys without async_io for correctness validation. + { + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + // First Seek. + iter->Seek(BuildKey(450)); + while (iter->Valid() && num_keys_first_batch < 100) { + ASSERT_OK(iter->status()); + num_keys_first_batch++; + iter->Next(); + } + ASSERT_OK(iter->status()); + + iter->Seek(BuildKey(942)); + while (iter->Valid()) { + ASSERT_OK(iter->status()); + num_keys_second_batch++; + iter->Next(); + } + ASSERT_OK(iter->status()); + } + + int buff_prefetch_count = 0; + bool read_async_called = false; + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); + + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); + + // Read the keys using seek. + { + ASSERT_OK(options.statistics->Reset()); + get_perf_context()->Reset(); + + auto iter = std::unique_ptr(db_->NewIterator(ro)); + int num_keys = 0; + // First Seek. 
+ { + iter->Seek(BuildKey(450)); + while (iter->Valid() && num_keys < 100) { + ASSERT_OK(iter->status()); + num_keys++; + iter->Next(); + } + ASSERT_OK(iter->status()); + ASSERT_EQ(num_keys, num_keys_first_batch); + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, + &async_read_bytes); + + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + } + } + } + + // Second Seek. + { + num_keys = 0; + ASSERT_OK(options.statistics->Reset()); + get_perf_context()->Reset(); + + iter->Seek(BuildKey(942)); + while (iter->Valid()) { + ASSERT_OK(iter->status()); + num_keys++; + iter->Next(); + } + ASSERT_OK(iter->status()); + ASSERT_EQ(num_keys, num_keys_second_batch); + + ASSERT_GT(buff_prefetch_count, 0); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, + &async_read_bytes); + HistogramData prefetched_bytes_discarded; + options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, + &prefetched_bytes_discarded); + + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + } else { + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + } + ASSERT_GT(prefetched_bytes_discarded.count, 0); + } + } + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + Close(); + } + + TEST_P(PrefetchTest, SeekParallelizationTest1) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + const int kNumKeys = 2000; + // Set options + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + bool use_direct_io = std::get<0>(GetParam()); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + + options.statistics = CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + 
Slice greatest(end_key.data(), end_key.size()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + + int buff_prefetch_count = 0; + + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::PrefetchAsyncInternal:Start", + [&](void*) { buff_prefetch_count++; }); + + bool read_async_called = false; + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + SyncPoint::GetInstance()->EnableProcessing(); + + SyncPoint::GetInstance()->EnableProcessing(); + ReadOptions ro; + ro.adaptive_readahead = true; + ro.async_io = true; + + if (std::get<1>(GetParam())) { + ro.readahead_size = 16 * 1024; + } + + { + ASSERT_OK(options.statistics->Reset()); + // Each block contains around 4 keys. + auto iter = std::unique_ptr(db_->NewIterator(ro)); + iter->Seek( + BuildKey(0)); // Prefetch data because of seek parallelization. + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + + // New data block. Since num_file_reads in FilePrefetch after this read is + // 2, it won't go for prefetching. + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + + // Prefetch data. + iter->Next(); + ASSERT_TRUE(iter->Valid()); + + // Check stats to make sure async prefetch is done. + { + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. + if (read_async_called) { + ASSERT_GT(async_read_bytes.count, 0); + ASSERT_GT(get_perf_context()->number_async_seek, 0); + if (std::get<1>(GetParam())) { + ASSERT_EQ(buff_prefetch_count, 1); + } else { + ASSERT_EQ(buff_prefetch_count, 2); + } + } else { + ASSERT_EQ(async_read_bytes.count, 0); + ASSERT_EQ(get_perf_context()->number_async_seek, 0); + ASSERT_EQ(buff_prefetch_count, 1); + } + } + + buff_prefetch_count = 0; + } + Close(); + } + #ifndef ROCKSDB_LITE #ifdef GFLAGS TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) { diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 8725584d7..71ae4577e 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -473,6 +473,7 @@ IOStatus RandomAccessFileReader::ReadAsync( if (use_direct_io() && is_aligned == false) { FSReadRequest aligned_req = Align(req, alignment); + aligned_req.status.PermitUncheckedError(); // Allocate aligned buffer. read_async_info->buf_.Alignment(alignment);
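Taken together, the changes above introduce a small handshake: `PrefetchAsync` either serves data already sitting in `curr_` or only schedules reads and returns `Status::TryAgain()`, `explicit_prefetch_submitted_` remembers that a request is outstanding, and the next `TryReadFromCacheAsync` either polls it (same offset) or aborts the outstanding IO (random offset). The toy model below mirrors that control flow with standard C++ only; `ToyPrefetcher`, `Outcome`, and `TryRead` are invented names, and the sketch deliberately ignores alignment, buffers, and statistics.

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>

enum class Outcome { kOk, kTryAgain };

class ToyPrefetcher {
 public:
  Outcome PrefetchAsync(uint64_t offset) {
    // Schedule an asynchronous read; nothing is available yet.
    scheduled_offset_ = offset;
    return Outcome::kTryAgain;
  }

  bool TryRead(uint64_t offset, std::string* out) {
    if (scheduled_offset_.has_value()) {
      if (*scheduled_offset_ != offset) {
        // Random access: abort the outstanding request and start over.
        scheduled_offset_.reset();
        return false;
      }
      // Sequential access at the prefetched offset: "poll" and serve it.
      *out = "data@" + std::to_string(offset);
      scheduled_offset_.reset();
      return true;
    }
    return false;
  }

 private:
  std::optional<uint64_t> scheduled_offset_;
};

int main() {
  ToyPrefetcher p;
  std::string value;
  if (p.PrefetchAsync(4096) == Outcome::kTryAgain) {
    // Caller retries later; the same offset is served from the prefetch.
    std::cout << p.TryRead(4096, &value) << " " << value << "\n";  // 1 data@4096
  }
  p.PrefetchAsync(4096);
  std::cout << p.TryRead(8192, &value) << "\n";  // 0: offset mismatch aborts
  return 0;
}
```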