diff --git a/HISTORY.md b/HISTORY.md index b65acfa87..c3cb17349 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,6 +10,7 @@ * `BlockBasedTableOptions::detect_filter_construct_corruption` can now be dynamically configured using `DB::SetOptions`. * Automatically recover from retryable read IO errors during backgorund flush/compaction. * Experimental support for preserving file Temperatures through backup and restore, and for updating DB metadata for outside changes to file Temperature (`UpdateManifestForFilesState` or `ldb update_manifest --update_temperatures`). +* Experimental support for async_io in ReadOptions which is used by FilePrefetchBuffer to prefetch some of the data asynchronously, if reads are sequential and auto readahead is enabled by rocksdb internally. ### Bug Fixes * Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496) diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 488844c4f..77539abd2 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -10,7 +10,6 @@ #include "file/file_prefetch_buffer.h" #include -#include #include "file/random_access_file_reader.h" #include "monitoring/histogram.h" @@ -21,6 +20,110 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { + +void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment, + uint64_t offset, + size_t roundup_len, size_t index, + bool refit_tail, + uint64_t& chunk_len) { + uint64_t chunk_offset_in_buffer = 0; + bool copy_data_to_new_buffer = false; + // Check if requested bytes are in the existing buffer_. + // If only a few bytes exist -- reuse them & read only what is really needed. + // This is typically the case of incremental reading of data. + // If no bytes exist in buffer -- full pread. + if (bufs_[index].buffer_.CurrentSize() > 0 && + offset >= bufs_[index].offset_ && + offset <= bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) { + // Only a few requested bytes are in the buffer. memmove those chunk of + // bytes to the beginning, and memcpy them back into the new buffer if a + // new buffer is created. + chunk_offset_in_buffer = Rounddown( + static_cast(offset - bufs_[index].offset_), alignment); + chunk_len = static_cast(bufs_[index].buffer_.CurrentSize()) - + chunk_offset_in_buffer; + assert(chunk_offset_in_buffer % alignment == 0); + // assert(chunk_len % alignment == 0); + assert(chunk_offset_in_buffer + chunk_len <= + bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()); + if (chunk_len > 0) { + copy_data_to_new_buffer = true; + } else { + // this reset is not necessary, but just to be safe. + chunk_offset_in_buffer = 0; + } + } + + // Create a new buffer only if current capacity is not sufficient, and memcopy + // bytes from old buffer if needed (i.e., if chunk_len is greater than 0). + if (bufs_[index].buffer_.Capacity() < roundup_len) { + bufs_[index].buffer_.Alignment(alignment); + bufs_[index].buffer_.AllocateNewBuffer( + static_cast(roundup_len), copy_data_to_new_buffer, + chunk_offset_in_buffer, static_cast(chunk_len)); + } else if (chunk_len > 0 && refit_tail) { + // New buffer not needed. But memmove bytes from tail to the beginning since + // chunk_len is greater than 0. + bufs_[index].buffer_.RefitTail(static_cast(chunk_offset_in_buffer), + static_cast(chunk_len)); + } +} + +Status FilePrefetchBuffer::Read(const IOOptions& opts, + RandomAccessFileReader* reader, + Env::IOPriority rate_limiter_priority, + uint64_t read_len, uint64_t chunk_len, + uint64_t rounddown_start, uint32_t index) { + Slice result; + Status s = reader->Read(opts, rounddown_start + chunk_len, read_len, &result, + bufs_[index].buffer_.BufferStart() + chunk_len, + nullptr, rate_limiter_priority); +#ifndef NDEBUG + if (result.size() < read_len) { + // Fake an IO error to force db_stress fault injection to ignore + // truncated read errors + IGNORE_STATUS_IF_ERROR(Status::IOError()); + } +#endif + if (!s.ok()) { + return s; + } + + // Update the buffer offset and size. + bufs_[index].offset_ = rounddown_start; + bufs_[index].buffer_.Size(static_cast(chunk_len) + result.size()); + return s; +} + +Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts, + RandomAccessFileReader* reader, + Env::IOPriority rate_limiter_priority, + uint64_t read_len, uint64_t chunk_len, + uint64_t rounddown_start, uint32_t index) { + // Reset io_handle. + if (io_handle_ != nullptr && del_fn_ != nullptr) { + del_fn_(io_handle_); + io_handle_ = nullptr; + del_fn_ = nullptr; + } + + // callback for async read request. + auto fp = std::bind(&FilePrefetchBuffer::PrefetchAsyncCallback, this, + std::placeholders::_1, std::placeholders::_2); + FSReadRequest req; + Slice result; + req.len = read_len; + req.offset = rounddown_start + chunk_len; + req.result = result; + req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len; + Status s = reader->ReadAsync(req, opts, fp, nullptr /*cb_arg*/, &io_handle_, + &del_fn_, rate_limiter_priority); + if (s.ok()) { + async_read_in_progress_ = true; + } + return s; +} + Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, @@ -29,6 +132,13 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, return Status::OK(); } TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start"); + + if (offset + n <= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { + // All requested bytes are already in the curr_ buffer. So no need to Read + // again. + return Status::OK(); + } + size_t alignment = reader->file()->GetRequiredBufferAlignment(); size_t offset_ = static_cast(offset); uint64_t rounddown_offset = Rounddown(offset_, alignment); @@ -37,74 +147,208 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, assert(roundup_len >= alignment); assert(roundup_len % alignment == 0); - // Check if requested bytes are in the existing buffer_. - // If all bytes exist -- return. - // If only a few bytes exist -- reuse them & read only what is really needed. - // This is typically the case of incremental reading of data. - // If no bytes exist in buffer -- full pread. + uint64_t chunk_len = 0; + CalculateOffsetAndLen(alignment, offset, roundup_len, curr_, + true /*refit_tail*/, chunk_len); + size_t read_len = static_cast(roundup_len - chunk_len); + + Status s = Read(opts, reader, rate_limiter_priority, read_len, chunk_len, + rounddown_offset, curr_); + return s; +} + +// Copy data from src to third buffer. +void FilePrefetchBuffer::CopyDataToBuffer(uint32_t src, uint64_t& offset, + size_t& length) { + if (length == 0) { + return; + } + uint64_t copy_offset = (offset - bufs_[src].offset_); + size_t copy_len = 0; + if (offset + length <= + bufs_[src].offset_ + bufs_[src].buffer_.CurrentSize()) { + // All the bytes are in src. + copy_len = length; + } else { + copy_len = bufs_[src].buffer_.CurrentSize() - copy_offset; + } + + memcpy(bufs_[2].buffer_.BufferStart() + bufs_[2].buffer_.CurrentSize(), + bufs_[src].buffer_.BufferStart() + copy_offset, copy_len); + + bufs_[2].buffer_.Size(bufs_[2].buffer_.CurrentSize() + copy_len); + + // Update offset and length. + offset += copy_len; + length -= copy_len; + + // length > 0 indicates it has consumed all data from the src buffer and it + // still needs to read more other buffer. + if (length > 0) { + bufs_[src].buffer_.Clear(); + } +} + +// If async_read = true: +// async_read is enabled in case of sequential reads. So when +// buffers are switched, we clear the curr_ buffer as we assume the data has +// been consumed because of sequential reads. +// +// Scenarios for prefetching asynchronously: +// Case1: If both buffers are empty, prefetch n bytes +// synchronously in curr_ +// and prefetch readahead_size_/2 async in second buffer. +// Case2: If second buffer has partial or full data, make it current and +// prefetch readahead_size_/2 async in second buffer. In case of +// partial data, prefetch remaining bytes from size n synchronously to +// fulfill the requested bytes request. +// Case3: If curr_ has partial data, prefetch remaining bytes from size n +// synchronously in curr_ to fulfill the requested bytes request and +// prefetch readahead_size_/2 bytes async in second buffer. +// Case4: If data is in both buffers, copy requested data from curr_ and second +// buffer to third buffer. If all requested bytes have been copied, do +// the asynchronous prefetching in second buffer. +Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, + RandomAccessFileReader* reader, + FileSystem* fs, uint64_t offset, + size_t length, size_t readahead_size, + Env::IOPriority rate_limiter_priority, + bool& copy_to_third_buffer) { + if (!enable_) { + return Status::OK(); + } + if (async_read_in_progress_ && fs != nullptr) { + // Wait for prefetch data to complete. + // No mutex is needed as PrefetchAsyncCallback updates the result in second + // buffer and FilePrefetchBuffer should wait for Poll before accessing the + // second buffer. + std::vector handles; + handles.emplace_back(io_handle_); + fs->Poll(handles, 1).PermitUncheckedError(); + } + // TODO akanksha: Update TEST_SYNC_POINT after new tests are added. + TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start"); Status s; - uint64_t chunk_offset_in_buffer = 0; - uint64_t chunk_len = 0; - bool copy_data_to_new_buffer = false; - if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ && - offset <= buffer_offset_ + buffer_.CurrentSize()) { - if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) { - // All requested bytes are already in the buffer. So no need to Read - // again. + size_t prefetch_size = length + readahead_size; + + size_t alignment = reader->file()->GetRequiredBufferAlignment(); + // Index of second buffer. + uint32_t second = curr_ ^ 1; + + // If data is in second buffer, make it curr_. Second buffer can be either + // partial filled or full. + if (bufs_[second].buffer_.CurrentSize() > 0 && + offset >= bufs_[second].offset_ && + offset <= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) { + // Clear the curr_ as buffers have been swapped and curr_ contains the + // outdated data. + bufs_[curr_].buffer_.Clear(); + // Switch the buffers. + curr_ = curr_ ^ 1; + second = curr_ ^ 1; + } + + // If second buffer contains outdated data, clear it for async prefetching. + // Outdated can be because previous sequential reads were read from the cache + // instead of this buffer. + if (bufs_[second].buffer_.CurrentSize() > 0 && + offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) { + bufs_[second].buffer_.Clear(); + } + + // Data is overlapping i.e. some of the data is in curr_ buffer and remaining + // in second buffer. + if (bufs_[curr_].buffer_.CurrentSize() > 0 && + bufs_[second].buffer_.CurrentSize() > 0 && + offset >= bufs_[curr_].offset_ && + offset < bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() && + offset + prefetch_size > bufs_[second].offset_) { + // Allocate new buffer to third buffer; + bufs_[2].buffer_.Clear(); + bufs_[2].buffer_.Alignment(alignment); + bufs_[2].buffer_.AllocateNewBuffer(length); + bufs_[2].offset_ = offset; + copy_to_third_buffer = true; + + // Move data from curr_ buffer to third. + CopyDataToBuffer(curr_, offset, length); + + if (length == 0) { + // Requested data has been copied and curr_ still has unconsumed data. return s; - } else { - // Only a few requested bytes are in the buffer. memmove those chunk of - // bytes to the beginning, and memcpy them back into the new buffer if a - // new buffer is created. - chunk_offset_in_buffer = - Rounddown(static_cast(offset - buffer_offset_), alignment); - chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer; - assert(chunk_offset_in_buffer % alignment == 0); - assert(chunk_len % alignment == 0); - assert(chunk_offset_in_buffer + chunk_len <= - buffer_offset_ + buffer_.CurrentSize()); - if (chunk_len > 0) { - copy_data_to_new_buffer = true; - } else { - // this reset is not necessary, but just to be safe. - chunk_offset_in_buffer = 0; - } } + + CopyDataToBuffer(second, offset, length); + // Length == 0: All the requested data has been copied to third buffer. It + // should go for only async prefetching. + // Length > 0: More data needs to be consumed so it will continue async and + // sync prefetching and copy the remaining data to third buffer in the end. + // swap the buffers. + curr_ = curr_ ^ 1; + prefetch_size -= length; } - // Create a new buffer only if current capacity is not sufficient, and memcopy - // bytes from old buffer if needed (i.e., if chunk_len is greater than 0). - if (buffer_.Capacity() < roundup_len) { - buffer_.Alignment(alignment); - buffer_.AllocateNewBuffer(static_cast(roundup_len), - copy_data_to_new_buffer, chunk_offset_in_buffer, - static_cast(chunk_len)); - } else if (chunk_len > 0) { - // New buffer not needed. But memmove bytes from tail to the beginning since - // chunk_len is greater than 0. - buffer_.RefitTail(static_cast(chunk_offset_in_buffer), - static_cast(chunk_len)); + // Update second again if swap happened. + second = curr_ ^ 1; + size_t _offset = static_cast(offset); + + // offset and size alignment for curr_ buffer with synchronous prefetching + uint64_t rounddown_start1 = Rounddown(_offset, alignment); + uint64_t roundup_end1 = Roundup(_offset + prefetch_size, alignment); + uint64_t roundup_len1 = roundup_end1 - rounddown_start1; + assert(roundup_len1 >= alignment); + assert(roundup_len1 % alignment == 0); + uint64_t chunk_len1 = 0; + uint64_t read_len1 = 0; + + // For length == 0, skip the synchronous prefetching. read_len1 will be 0. + if (length > 0) { + CalculateOffsetAndLen(alignment, offset, roundup_len1, curr_, + false /*refit_tail*/, chunk_len1); + read_len1 = static_cast(roundup_len1 - chunk_len1); } + { + // offset and size alignment for second buffer for asynchronous + // prefetching + uint64_t rounddown_start2 = roundup_end1; + uint64_t roundup_end2 = + Roundup(rounddown_start2 + readahead_size, alignment); - Slice result; - size_t read_len = static_cast(roundup_len - chunk_len); - s = reader->Read(opts, rounddown_offset + chunk_len, read_len, &result, - buffer_.BufferStart() + chunk_len, nullptr, - rate_limiter_priority); - if (!s.ok()) { - return s; + // For length == 0, do the asynchronous prefetching in second instead of + // synchronous prefetching of remaining prefetch_size. + if (length == 0) { + rounddown_start2 = + bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize(); + roundup_end2 = Roundup(rounddown_start2 + prefetch_size, alignment); + } + + uint64_t roundup_len2 = roundup_end2 - rounddown_start2; + uint64_t chunk_len2 = 0; + CalculateOffsetAndLen(alignment, rounddown_start2, roundup_len2, second, + false /*refit_tail*/, chunk_len2); + + // Update the buffer offset. + bufs_[second].offset_ = rounddown_start2; + uint64_t read_len2 = static_cast(roundup_len2 - chunk_len2); + + ReadAsync(opts, reader, rate_limiter_priority, read_len2, chunk_len2, + rounddown_start2, second) + .PermitUncheckedError(); } -#ifndef NDEBUG - if (result.size() < read_len) { - // Fake an IO error to force db_stress fault injection to ignore - // truncated read errors - IGNORE_STATUS_IF_ERROR(Status::IOError()); + if (read_len1 > 0) { + s = Read(opts, reader, rate_limiter_priority, read_len1, chunk_len1, + rounddown_start1, curr_); + if (!s.ok()) { + return s; + } + } + + // Copy remaining requested bytes to third_buffer. + if (copy_to_third_buffer && length > 0) { + CopyDataToBuffer(curr_, offset, length); } -#endif - buffer_offset_ = rounddown_offset; - buffer_.Size(static_cast(chunk_len) + result.size()); return s; } @@ -117,7 +361,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, if (track_min_offset_ && offset < min_offset_read_) { min_offset_read_ = static_cast(offset); } - if (!enable_ || offset < buffer_offset_) { + if (!enable_ || (offset < bufs_[curr_].offset_)) { return false; } @@ -127,35 +371,93 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, // If readahead is not enabled: return false. TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache", &readahead_size_); - if (offset + n > buffer_offset_ + buffer_.CurrentSize()) { + if (offset + n > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { if (readahead_size_ > 0) { + Status s; assert(reader != nullptr); assert(max_readahead_size_ >= readahead_size_); - Status s; if (for_compaction) { s = Prefetch(opts, reader, offset, std::max(n, readahead_size_), rate_limiter_priority); } else { if (implicit_auto_readahead_) { - // Prefetch only if this read is sequential otherwise reset - // readahead_size_ to initial value. - if (!IsBlockSequential(offset)) { - UpdateReadPattern(offset, n); - ResetValues(); + if (!IsEligibleForPrefetch(offset, n)) { // Ignore status as Prefetch is not called. s.PermitUncheckedError(); return false; } - num_file_reads_++; - if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) { - UpdateReadPattern(offset, n); + } + s = Prefetch(opts, reader, offset, n + readahead_size_, + rate_limiter_priority); + } + if (!s.ok()) { + if (status) { + *status = s; + } +#ifndef NDEBUG + IGNORE_STATUS_IF_ERROR(s); +#endif + return false; + } + readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); + } else { + return false; + } + } + UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); + + uint64_t offset_in_buffer = offset - bufs_[curr_].offset_; + *result = Slice(bufs_[curr_].buffer_.BufferStart() + offset_in_buffer, n); + return true; +} + +// TODO akanksha: Merge this function with TryReadFromCache once async +// functionality is stable. +bool FilePrefetchBuffer::TryReadFromCacheAsync( + const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, + size_t n, Slice* result, Status* status, + Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */, + FileSystem* fs) { + if (track_min_offset_ && offset < min_offset_read_) { + min_offset_read_ = static_cast(offset); + } + if (!enable_ || (offset < bufs_[curr_].offset_)) { + return false; + } + + bool prefetched = false; + bool copy_to_third_buffer = false; + // If the buffer contains only a few of the requested bytes: + // If readahead is enabled: prefetch the remaining bytes + readahead bytes + // and satisfy the request. + // If readahead is not enabled: return false. + TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache", + &readahead_size_); + if (offset + n > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { + if (readahead_size_ > 0) { + Status s; + assert(reader != nullptr); + assert(max_readahead_size_ >= readahead_size_); + if (for_compaction) { + s = Prefetch(opts, reader, offset, std::max(n, readahead_size_), + rate_limiter_priority); + } else { + if (implicit_auto_readahead_) { + if (!IsEligibleForPrefetch(offset, n)) { // Ignore status as Prefetch is not called. s.PermitUncheckedError(); return false; } } - s = Prefetch(opts, reader, offset, n + readahead_size_, - rate_limiter_priority); + if (implicit_auto_readahead_ && async_io_) { + // Prefetch n + readahead_size_/2 synchronously as remaining + // readahead_size_/2 will be prefetched asynchronously. + s = PrefetchAsync(opts, reader, fs, offset, n, readahead_size_ / 2, + rate_limiter_priority, copy_to_third_buffer); + } else { + s = Prefetch(opts, reader, offset, n + readahead_size_, + rate_limiter_priority); + } } if (!s.ok()) { if (status) { @@ -166,14 +468,49 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, #endif return false; } - readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); + prefetched = true; } else { return false; } } - UpdateReadPattern(offset, n); - uint64_t offset_in_buffer = offset - buffer_offset_; - *result = Slice(buffer_.BufferStart() + offset_in_buffer, n); + UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); + + uint32_t index = curr_; + if (copy_to_third_buffer) { + index = 2; + } + uint64_t offset_in_buffer = offset - bufs_[index].offset_; + *result = Slice(bufs_[index].buffer_.BufferStart() + offset_in_buffer, n); + if (prefetched) { + readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); + } return true; } + +void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, + void* /*cb_arg*/) { + async_read_in_progress_ = false; + uint32_t index = curr_ ^ 1; + if (req.status.ok()) { + if (req.offset + req.result.size() <= + bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) { + // All requested bytes are already in the buffer. So no need to update. + return; + } + if (req.offset < bufs_[index].offset_) { + // Next block to be read has changed (Recent read was not a sequential + // read). So ignore this read. + return; + } + size_t current_size = bufs_[index].buffer_.CurrentSize(); + bufs_[index].buffer_.Size(current_size + req.result.size()); + } + + // Release io_handle_. + if (io_handle_ != nullptr && del_fn_ != nullptr) { + del_fn_(io_handle_); + io_handle_ = nullptr; + del_fn_ = nullptr; + } +} } // namespace ROCKSDB_NAMESPACE diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index a6e135e4e..c4713782c 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -16,6 +16,7 @@ #include "file/readahead_file_info.h" #include "port/port.h" #include "rocksdb/env.h" +#include "rocksdb/file_system.h" #include "rocksdb/options.h" #include "util/aligned_buffer.h" @@ -26,6 +27,11 @@ namespace ROCKSDB_NAMESPACE { struct IOOptions; class RandomAccessFileReader; +struct BufferInfo { + AlignedBuffer buffer_; + uint64_t offset_ = 0; +}; + // FilePrefetchBuffer is a smart buffer to store and read data from a file. class FilePrefetchBuffer { public: @@ -48,6 +54,9 @@ class FilePrefetchBuffer { // it. Used for adaptable readahead of the file footer/metadata. // implicit_auto_readahead : Readahead is enabled implicitly by rocksdb after // doing sequential scans for two times. + // async_io : When async_io is enabled, if it's implicit_auto_readahead, it + // prefetches data asynchronously in second buffer while curr_ is being + // consumed. // // Automatic readhead is enabled for a file if readahead_size // and max_readahead_size are passed in. @@ -55,8 +64,9 @@ class FilePrefetchBuffer { // `Prefetch` to load data into the buffer. FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0, bool enable = true, bool track_min_offset = false, - bool implicit_auto_readahead = false) - : buffer_offset_(0), + bool implicit_auto_readahead = false, + bool async_io = false) + : curr_(0), readahead_size_(readahead_size), max_readahead_size_(max_readahead_size), min_offset_read_(port::kMaxSizet), @@ -65,7 +75,16 @@ class FilePrefetchBuffer { implicit_auto_readahead_(implicit_auto_readahead), prev_offset_(0), prev_len_(0), - num_file_reads_(kMinNumFileReadsToStartAutoReadahead + 1) {} + num_file_reads_(kMinNumFileReadsToStartAutoReadahead + 1), + io_handle_(nullptr), + del_fn_(nullptr), + async_read_in_progress_(false), + async_io_(async_io) { + // If async_io_ is enabled, data is asynchronously filled in second buffer + // while curr_ is being consumed. If data is overlapping in two buffers, + // data is copied to third buffer to return continuous buffer. + bufs_.resize(3); + } // Load data into the buffer from a file. // reader : the file reader. @@ -73,10 +92,19 @@ class FilePrefetchBuffer { // n : the number of bytes to read. // rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to // bypass. + // is_async_read : if the data should be prefetched by calling read + // asynchronously. It should be set true when called + // from TryReadFromCache. Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, Env::IOPriority rate_limiter_priority); + Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader, + FileSystem* fs, uint64_t offset, size_t length, + size_t readahead_size, + Env::IOPriority rate_limiter_priority, + bool& copy_to_third_buffer); + // Tries returning the data for a file read from this buffer if that data is // in the buffer. // It handles tracking the minimum read offset if track_min_offset = true. @@ -97,14 +125,20 @@ class FilePrefetchBuffer { Env::IOPriority rate_limiter_priority, bool for_compaction = false); + bool TryReadFromCacheAsync(const IOOptions& opts, + RandomAccessFileReader* reader, uint64_t offset, + size_t n, Slice* result, Status* status, + Env::IOPriority rate_limiter_priority, + bool for_compaction /* = false */, FileSystem* fs); + // The minimum `offset` ever passed to TryReadFromCache(). This will nly be // tracked if track_min_offset = true. size_t min_offset_read() const { return min_offset_read_; } // Called in case of implicit auto prefetching. void UpdateReadPattern(const uint64_t& offset, const size_t& len, - bool is_adaptive_readahead = false) { - if (is_adaptive_readahead) { + bool decrease_readaheadsize) { + if (decrease_readaheadsize) { // Since this block was eligible for prefetch but it was found in // cache, so check and decrease the readahead_size by 8KB (default) // if eligible. @@ -114,16 +148,6 @@ class FilePrefetchBuffer { prev_len_ = len; } - bool IsBlockSequential(const size_t& offset) { - return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset)); - } - - // Called in case of implicit auto prefetching. - void ResetValues() { - num_file_reads_ = 1; - readahead_size_ = kInitAutoReadaheadSize; - } - void GetReadaheadState(ReadaheadFileInfo::ReadaheadInfo* readahead_info) { readahead_info->readahead_size = readahead_size_; readahead_info->num_file_reads = num_file_reads_; @@ -141,7 +165,8 @@ class FilePrefetchBuffer { // - num_file_reads_ + 1 (including this read) > // kMinNumFileReadsToStartAutoReadahead if (implicit_auto_readahead_ && readahead_size_ > 0) { - if ((offset + size > buffer_offset_ + buffer_.CurrentSize()) && + if ((offset + size > + bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) && IsBlockSequential(offset) && (num_file_reads_ + 1 > kMinNumFileReadsToStartAutoReadahead)) { size_t initial_auto_readahead_size = kInitAutoReadaheadSize; @@ -152,9 +177,59 @@ class FilePrefetchBuffer { } } + bool IsEligibleForPrefetch(uint64_t offset, size_t n) { + // Prefetch only if this read is sequential otherwise reset readahead_size_ + // to initial value. + if (!IsBlockSequential(offset)) { + UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); + ResetValues(); + return false; + } + num_file_reads_++; + if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) { + UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); + return false; + } + return true; + } + + // Callback function passed to underlying FS in case of asynchronous reads. + void PrefetchAsyncCallback(const FSReadRequest& req, void* cb_arg); + private: - AlignedBuffer buffer_; - uint64_t buffer_offset_; + // Calculates roundoff offset and length to be prefetched based on alignment + // and data present in buffer_. It also allocates new buffer or refit tail if + // required. + void CalculateOffsetAndLen(size_t alignment, uint64_t offset, + size_t roundup_len, size_t index, bool refit_tail, + uint64_t& chunk_len); + + Status Read(const IOOptions& opts, RandomAccessFileReader* reader, + Env::IOPriority rate_limiter_priority, uint64_t read_len, + uint64_t chunk_len, uint64_t rounddown_start, uint32_t index); + + Status ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader, + Env::IOPriority rate_limiter_priority, uint64_t read_len, + uint64_t chunk_len, uint64_t rounddown_start, + uint32_t index); + + // Copy the data from src to third buffer. + void CopyDataToBuffer(uint32_t src, uint64_t& offset, size_t& length); + + bool IsBlockSequential(const size_t& offset) { + return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset)); + } + + // Called in case of implicit auto prefetching. + void ResetValues() { + num_file_reads_ = 1; + readahead_size_ = kInitAutoReadaheadSize; + } + + std::vector bufs_; + // curr_ represents the index for bufs_ indicating which buffer is being + // consumed currently. + uint32_t curr_; size_t readahead_size_; // FilePrefetchBuffer object won't be created from Iterator flow if // max_readahead_size_ = 0. @@ -174,5 +249,12 @@ class FilePrefetchBuffer { uint64_t prev_offset_; size_t prev_len_; int64_t num_file_reads_; + + // io_handle_ is allocated and used by underlying file system in case of + // asynchronous reads. + void* io_handle_; + IOHandleDeleter del_fn_; + bool async_read_in_progress_; + bool async_io_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 28f1a1b94..e13fc5d8d 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -718,9 +718,11 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { WriteBatch batch; Random rnd(309); + int total_keys = 0; for (int j = 0; j < 5; j++) { for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + total_keys++; } ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(Flush()); @@ -761,12 +763,16 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { ReadOptions ro; if (is_adaptive_readahead) { ro.adaptive_readahead = true; + // TODO akanksha: Remove after adding new units. + ro.async_io = true; } auto iter = std::unique_ptr(db_->NewIterator(ro)); int num_keys = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); num_keys++; } + ASSERT_EQ(num_keys, total_keys); ASSERT_GT(buff_prefetch_count, 0); buff_prefetch_count = 0; @@ -854,6 +860,8 @@ TEST_P(PrefetchTest2, NonSequentialReads) { // Iterate until prefetch is done. ReadOptions ro; ro.adaptive_readahead = true; + // TODO akanksha: Remove after adding new units. + ro.async_io = true; auto iter = std::unique_ptr(db_->NewIterator(ro)); iter->SeekToFirst(); while (iter->Valid() && buff_prefetch_count == 0) { @@ -940,6 +948,8 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { SyncPoint::GetInstance()->EnableProcessing(); ReadOptions ro; ro.adaptive_readahead = true; + // TODO akanksha: Remove after adding new units. + ro.async_io = true; { /* * Reseek keys from sequential Data Blocks within same partitioned @@ -958,28 +968,35 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { // After caching, blocks will be read from cache (Sequential blocks) auto iter = std::unique_ptr(db_->NewIterator(ro)); iter->Seek(BuildKey(0)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1000)); + ASSERT_TRUE(iter->Valid()); iter->Seek(BuildKey(1004)); // Prefetch data (not in cache). + ASSERT_TRUE(iter->Valid()); ASSERT_EQ(current_readahead_size, expected_current_readahead_size); // Missed one sequential block but 1011 is already in buffer so // readahead will not be reset. iter->Seek(BuildKey(1011)); + ASSERT_TRUE(iter->Valid()); ASSERT_EQ(current_readahead_size, expected_current_readahead_size); // Eligible to Prefetch data (not in buffer) but block is in cache so no // prefetch will happen and will result in decrease in readahead_size. // readahead_size will be 8 * 1024 iter->Seek(BuildKey(1015)); + ASSERT_TRUE(iter->Valid()); expected_current_readahead_size -= decrease_readahead_size; // 1016 is the same block as 1015. So no change in readahead_size. iter->Seek(BuildKey(1016)); + ASSERT_TRUE(iter->Valid()); // Prefetch data (not in buffer) but found in cache. So decrease // readahead_size. Since it will 0 after decrementing so readahead_size will // be set to initial value. iter->Seek(BuildKey(1019)); + ASSERT_TRUE(iter->Valid()); expected_current_readahead_size = std::max( decrease_readahead_size, (expected_current_readahead_size >= decrease_readahead_size @@ -988,6 +1005,7 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { // Prefetch next sequential data. iter->Seek(BuildKey(1022)); + ASSERT_TRUE(iter->Valid()); ASSERT_EQ(current_readahead_size, expected_current_readahead_size); ASSERT_EQ(buff_prefetch_count, 2); buff_prefetch_count = 0; diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 976f64ec6..0d354cfa4 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -424,4 +424,20 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro, return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts); } } + +// TODO akanksha: Add perf_times etc. +IOStatus RandomAccessFileReader::ReadAsync( + FSReadRequest& req, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handle, IOHandleDeleter* del_fn, + Env::IOPriority rate_limiter_priority) { + if (use_direct_io()) { + req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch, + nullptr /*dbg*/, rate_limiter_priority); + cb(req, cb_arg); + return IOStatus::OK(); + } + return file_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, + nullptr /*dbg*/); +} } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 8f1e179f4..26e48478d 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -174,5 +174,10 @@ class RandomAccessFileReader { bool use_direct_io() const { return file_->use_direct_io(); } IOStatus PrepareIOOptions(const ReadOptions& ro, IOOptions& opts); + + IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, + std::function cb, + void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, + Env::IOPriority rate_limiter_priority); }; } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 44c30447f..ea5351bf5 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1598,6 +1598,15 @@ struct ReadOptions { // Default: `Env::IO_TOTAL`. Env::IOPriority rate_limiter_priority = Env::IO_TOTAL; + // Experimental + // + // If async_io is enabled, RocksDB will prefetch some of data asynchronously. + // RocksDB apply it if reads are sequential and its internal automatic + // prefetching. + // + // Default: false + bool async_io; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/options/options.cc b/options/options.cc index 3ed6f196f..26b5a4d33 100644 --- a/options/options.cc +++ b/options/options.cc @@ -665,7 +665,8 @@ ReadOptions::ReadOptions() deadline(std::chrono::microseconds::zero()), io_timeout(std::chrono::microseconds::zero()), value_size_soft_limit(std::numeric_limits::max()), - adaptive_readahead(false) {} + adaptive_readahead(false), + async_io(false) {} ReadOptions::ReadOptions(bool cksum, bool cache) : snapshot(nullptr), @@ -689,6 +690,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache) deadline(std::chrono::microseconds::zero()), io_timeout(std::chrono::microseconds::zero()), value_size_soft_limit(std::numeric_limits::max()), - adaptive_readahead(false) {} + adaptive_readahead(false), + async_io(false) {} } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_iterator.cc b/table/block_based/block_based_table_iterator.cc index fee006d15..75b595cc1 100644 --- a/table/block_based/block_based_table_iterator.cc +++ b/table/block_based/block_based_table_iterator.cc @@ -232,9 +232,9 @@ void BlockBasedTableIterator::InitDataBlock() { // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0. // Explicit user requested readahead: // Enabled from the very first IO when ReadOptions.readahead_size is set. - block_prefetcher_.PrefetchIfNeeded(rep, data_block_handle, - read_options_.readahead_size, - is_for_compaction); + block_prefetcher_.PrefetchIfNeeded( + rep, data_block_handle, read_options_.readahead_size, is_for_compaction, + read_options_.async_io); Status s; table_->NewDataBlockIterator( read_options_, data_block_handle, &block_iter_, BlockType::kData, diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 3a1643846..b6e512c24 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -1472,9 +1472,9 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( // Update the block details so that PrefetchBuffer can use the read // pattern to determine if reads are sequential or not for // prefetching. It should also take in account blocks read from cache. - prefetch_buffer->UpdateReadPattern(handle.offset(), - BlockSizeWithTrailer(handle), - ro.adaptive_readahead); + prefetch_buffer->UpdateReadPattern( + handle.offset(), BlockSizeWithTrailer(handle), + ro.adaptive_readahead /*decrease_readahead_size*/); } } } diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 29736d326..febfc8b81 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -652,20 +652,21 @@ struct BlockBasedTable::Rep { void CreateFilePrefetchBuffer(size_t readahead_size, size_t max_readahead_size, std::unique_ptr* fpb, - bool implicit_auto_readahead) const { + bool implicit_auto_readahead, + bool async_io) const { fpb->reset(new FilePrefetchBuffer(readahead_size, max_readahead_size, !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */, - implicit_auto_readahead)); + implicit_auto_readahead, async_io)); } void CreateFilePrefetchBufferIfNotExists( size_t readahead_size, size_t max_readahead_size, - std::unique_ptr* fpb, - bool implicit_auto_readahead) const { + std::unique_ptr* fpb, bool implicit_auto_readahead, + bool async_io) const { if (!(*fpb)) { CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb, - implicit_auto_readahead); + implicit_auto_readahead, async_io); } } }; diff --git a/table/block_based/block_prefetcher.cc b/table/block_based/block_prefetcher.cc index e488e4f5c..924a35194 100644 --- a/table/block_based/block_prefetcher.cc +++ b/table/block_based/block_prefetcher.cc @@ -14,18 +14,18 @@ namespace ROCKSDB_NAMESPACE { void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep, const BlockHandle& handle, size_t readahead_size, - bool is_for_compaction) { + bool is_for_compaction, bool async_io) { if (is_for_compaction) { - rep->CreateFilePrefetchBufferIfNotExists(compaction_readahead_size_, - compaction_readahead_size_, - &prefetch_buffer_, false); + rep->CreateFilePrefetchBufferIfNotExists( + compaction_readahead_size_, compaction_readahead_size_, + &prefetch_buffer_, false, async_io); return; } // Explicit user requested readahead. if (readahead_size > 0) { - rep->CreateFilePrefetchBufferIfNotExists(readahead_size, readahead_size, - &prefetch_buffer_, false); + rep->CreateFilePrefetchBufferIfNotExists( + readahead_size, readahead_size, &prefetch_buffer_, false, async_io); return; } @@ -71,7 +71,7 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep, if (rep->file->use_direct_io()) { rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_, max_auto_readahead_size, - &prefetch_buffer_, true); + &prefetch_buffer_, true, async_io); return; } @@ -88,7 +88,7 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep, if (s.IsNotSupported()) { rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_, max_auto_readahead_size, - &prefetch_buffer_, true); + &prefetch_buffer_, true, async_io); return; } diff --git a/table/block_based/block_prefetcher.h b/table/block_based/block_prefetcher.h index 74100c380..85c52be21 100644 --- a/table/block_based/block_prefetcher.h +++ b/table/block_based/block_prefetcher.h @@ -16,7 +16,7 @@ class BlockPrefetcher { : compaction_readahead_size_(compaction_readahead_size) {} void PrefetchIfNeeded(const BlockBasedTable::Rep* rep, const BlockHandle& handle, size_t readahead_size, - bool is_for_compaction); + bool is_for_compaction, bool async_io); FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); } void UpdateReadPattern(const uint64_t& offset, const size_t& len) { diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc index f9d53aba7..540c95d4d 100644 --- a/table/block_based/partitioned_filter_block.cc +++ b/table/block_based/partitioned_filter_block.cc @@ -497,7 +497,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro, uint64_t prefetch_len = last_off - prefetch_off; std::unique_ptr prefetch_buffer; rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer, - false /* Implicit autoreadahead */); + false /* Implicit autoreadahead */, + false /*async_io*/); IOOptions opts; s = rep->file->PrepareIOOptions(ro, opts); diff --git a/table/block_based/partitioned_index_iterator.cc b/table/block_based/partitioned_index_iterator.cc index 12ac6f96c..15a3aac87 100644 --- a/table/block_based/partitioned_index_iterator.cc +++ b/table/block_based/partitioned_index_iterator.cc @@ -89,9 +89,9 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() { // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0. // Explicit user requested readahead: // Enabled from the very first IO when ReadOptions.readahead_size is set. - block_prefetcher_.PrefetchIfNeeded(rep, partitioned_index_handle, - read_options_.readahead_size, - is_for_compaction); + block_prefetcher_.PrefetchIfNeeded( + rep, partitioned_index_handle, read_options_.readahead_size, + is_for_compaction, read_options_.async_io); Status s; table_->NewDataBlockIterator( read_options_, partitioned_index_handle, &block_iter_, diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc index e295d41a4..e3b433b9a 100644 --- a/table/block_based/partitioned_index_reader.cc +++ b/table/block_based/partitioned_index_reader.cc @@ -81,6 +81,7 @@ InternalIteratorBase* PartitionIndexReader::NewIterator( ro.deadline = read_options.deadline; ro.io_timeout = read_options.io_timeout; ro.adaptive_readahead = read_options.adaptive_readahead; + ro.async_io = read_options.async_io; // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. std::unique_ptr> index_iter( @@ -152,7 +153,8 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro, uint64_t prefetch_len = last_off - prefetch_off; std::unique_ptr prefetch_buffer; rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer, - false /*Implicit auto readahead*/); + false /*Implicit auto readahead*/, + false /*async_io*/); IOOptions opts; { Status s = rep->file->PrepareIOOptions(ro, opts); diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index cca6d4911..dbcdef74c 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -69,17 +69,28 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() { if (prefetch_buffer_ != nullptr) { IOOptions opts; IOStatus io_s = file_->PrepareIOOptions(read_options_, opts); - if (io_s.ok() && - prefetch_buffer_->TryReadFromCache( + if (io_s.ok()) { + bool read_from_prefetch_buffer = false; + if (read_options_.async_io) { + read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync( opts, file_, handle_.offset(), block_size_with_trailer_, &slice_, - &io_s, read_options_.rate_limiter_priority, for_compaction_)) { - ProcessTrailerIfPresent(); - if (!io_status_.ok()) { - return true; + &io_s, read_options_.rate_limiter_priority, for_compaction_, + ioptions_.fs.get()); + } else { + read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache( + opts, file_, handle_.offset(), block_size_with_trailer_, &slice_, + &io_s, read_options_.rate_limiter_priority, for_compaction_); } - got_from_prefetch_buffer_ = true; - used_buf_ = const_cast(slice_.data()); - } else if (!io_s.ok()) { + if (read_from_prefetch_buffer) { + ProcessTrailerIfPresent(); + if (!io_status_.ok()) { + return true; + } + got_from_prefetch_buffer_ = true; + used_buf_ = const_cast(slice_.data()); + } + } + if (!io_s.ok()) { io_status_ = io_s; return true; }