@ -225,10 +225,8 @@ void FilePrefetchBuffer::AbortIOIfNeeded(uint64_t offset) {
bufs_ [ second ] . async_read_in_progress_ = false ;
bufs_ [ second ] . async_read_in_progress_ = false ;
}
}
if ( bufs_ [ curr_ ] . io_handle_ = = nullptr & &
if ( bufs_ [ curr_ ] . io_handle_ = = nullptr ) {
bufs_ [ curr_ ] . async_read_in_progress_ ) {
bufs_ [ curr_ ] . async_read_in_progress_ = false ;
bufs_ [ curr_ ] . async_read_in_progress_ = false ;
curr_ = curr_ ^ 1 ;
}
}
}
}
@ -268,16 +266,36 @@ void FilePrefetchBuffer::UpdateBuffersIfNeeded(uint64_t offset) {
bufs_ [ second ] . buffer_ . Clear ( ) ;
bufs_ [ second ] . buffer_ . Clear ( ) ;
}
}
{
// In case buffers do not align, reset second buffer. This can happen in
// case readahead_size is set.
if ( ! bufs_ [ second ] . async_read_in_progress_ & &
! bufs_ [ curr_ ] . async_read_in_progress_ ) {
if ( DoesBufferContainData ( curr_ ) ) {
if ( bufs_ [ curr_ ] . offset_ + bufs_ [ curr_ ] . buffer_ . CurrentSize ( ) ! =
bufs_ [ second ] . offset_ ) {
bufs_ [ second ] . buffer_ . Clear ( ) ;
}
} else {
if ( ! IsOffsetInBuffer ( offset , second ) ) {
bufs_ [ second ] . buffer_ . Clear ( ) ;
}
}
}
}
// If data starts from second buffer, make it curr_. Second buffer can be
// If data starts from second buffer, make it curr_. Second buffer can be
// either partial filled or full.
// either partial filled, full or async read is in progress.
if ( ! bufs_ [ second ] . async_read_in_progress_ & & DoesBufferContainData ( second ) & &
if ( bufs_ [ second ] . async_read_in_progress_ ) {
IsOffsetInBuffer ( offset , second ) ) {
if ( IsOffsetInBufferWithAsyncProgress ( offset , second ) ) {
// Clear the curr_ as buffers have been swapped and curr_ contains the
curr_ = curr_ ^ 1 ;
// outdated data and switch the buffers.
}
if ( ! bufs_ [ curr_ ] . async_read_in_progress_ ) {
} else {
bufs_ [ curr_ ] . buffer_ . Clear ( ) ;
if ( DoesBufferContainData ( second ) & & IsOffsetInBuffer ( offset , second ) ) {
assert ( bufs_ [ curr_ ] . async_read_in_progress_ | |
bufs_ [ curr_ ] . buffer_ . CurrentSize ( ) = = 0 ) ;
curr_ = curr_ ^ 1 ;
}
}
curr_ = curr_ ^ 1 ;
}
}
}
}
@ -300,53 +318,16 @@ void FilePrefetchBuffer::PollAndUpdateBuffersIfNeeded(uint64_t offset) {
UpdateBuffersIfNeeded ( offset ) ;
UpdateBuffersIfNeeded ( offset ) ;
}
}
// If async_io is enabled in case of sequential reads, PrefetchAsyncInternal is
Status FilePrefetchBuffer : : HandleOverlappingData (
// called. When buffers are switched, we clear the curr_ buffer as we assume the
// data has been consumed because of sequential reads.
// Data in buffers will always be sequential with curr_ following second and
// not vice versa.
//
// Scenarios for prefetching asynchronously:
// Case1: If both buffers are empty, prefetch n + readahead_size_/2 bytes
// synchronously in curr_ and prefetch readahead_size_/2 async in second
// buffer.
// Case2: If second buffer has partial or full data, make it current and
// prefetch readahead_size_/2 async in second buffer. In case of
// partial data, prefetch remaining bytes from size n synchronously to
// fulfill the requested bytes request.
// Case3: If curr_ has partial data, prefetch remaining bytes from size n
// synchronously in curr_ to fulfill the requested bytes request and
// prefetch readahead_size_/2 bytes async in second buffer.
// Case4: (Special case) If data is in both buffers, copy requested data from
// curr_, send async request on curr_, wait for poll to fill second
// buffer (if any), and copy remaining data from second buffer to third
// buffer.
Status FilePrefetchBuffer : : PrefetchAsyncInternal (
const IOOptions & opts , RandomAccessFileReader * reader , uint64_t offset ,
const IOOptions & opts , RandomAccessFileReader * reader , uint64_t offset ,
size_t length , size_t readahead_size , Env : : IOPriority rate_limiter_priority ,
size_t length , size_t readahead_size ,
bool & copy_to_third_buffer ) {
Env : : IOPriority /*rate_limiter_priority*/ , bool & copy_to_third_buffer ,
if ( ! enable_ ) {
uint64_t & tmp_offset , size_t & tmp_length ) {
return Status : : OK ( ) ;
}
TEST_SYNC_POINT ( " FilePrefetchBuffer::PrefetchAsyncInternal:Start " ) ;
size_t alignment = reader - > file ( ) - > GetRequiredBufferAlignment ( ) ;
Status s ;
Status s ;
uint64_t tmp_offset = offset ;
size_t alignment = reader - > file ( ) - > GetRequiredBufferAlignment ( ) ;
size_t tmp_length = length ;
// 1. Abort IO and swap buffers if needed to point curr_ to first buffer with
// data.
{
if ( ! explicit_prefetch_submitted_ ) {
AbortIOIfNeeded ( offset ) ;
}
UpdateBuffersIfNeeded ( offset ) ;
}
uint32_t second = curr_ ^ 1 ;
uint32_t second = curr_ ^ 1 ;
// 2. If data is overlapping over two buffers, copy the data from curr_ and
// If data is overlapping over two buffers, copy the data from curr_ and
// call ReadAsync on curr_.
// call ReadAsync on curr_.
if ( ! bufs_ [ curr_ ] . async_read_in_progress_ & & DoesBufferContainData ( curr_ ) & &
if ( ! bufs_ [ curr_ ] . async_read_in_progress_ & & DoesBufferContainData ( curr_ ) & &
IsOffsetInBuffer ( offset , curr_ ) & &
IsOffsetInBuffer ( offset , curr_ ) & &
@ -391,21 +372,80 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(
}
}
curr_ = curr_ ^ 1 ;
curr_ = curr_ ^ 1 ;
}
}
return s ;
}
// If async_io is enabled in case of sequential reads, PrefetchAsyncInternal is
// called. When buffers are switched, we clear the curr_ buffer as we assume the
// data has been consumed because of sequential reads.
// Data in buffers will always be sequential with curr_ following second and
// not vice versa.
//
// Scenarios for prefetching asynchronously:
// Case1: If both buffers are empty, prefetch n + readahead_size_/2 bytes
// synchronously in curr_ and prefetch readahead_size_/2 async in second
// buffer.
// Case2: If second buffer has partial or full data, make it current and
// prefetch readahead_size_/2 async in second buffer. In case of
// partial data, prefetch remaining bytes from size n synchronously to
// fulfill the requested bytes request.
// Case3: If curr_ has partial data, prefetch remaining bytes from size n
// synchronously in curr_ to fulfill the requested bytes request and
// prefetch readahead_size_/2 bytes async in second buffer.
// Case4: (Special case) If data is in both buffers, copy requested data from
// curr_, send async request on curr_, wait for poll to fill second
// buffer (if any), and copy remaining data from second buffer to third
// buffer.
Status FilePrefetchBuffer : : PrefetchAsyncInternal (
const IOOptions & opts , RandomAccessFileReader * reader , uint64_t offset ,
size_t length , size_t readahead_size , Env : : IOPriority rate_limiter_priority ,
bool & copy_to_third_buffer ) {
if ( ! enable_ ) {
return Status : : OK ( ) ;
}
TEST_SYNC_POINT ( " FilePrefetchBuffer::PrefetchAsyncInternal:Start " ) ;
size_t alignment = reader - > file ( ) - > GetRequiredBufferAlignment ( ) ;
Status s ;
uint64_t tmp_offset = offset ;
size_t tmp_length = length ;
// 1. Abort IO and swap buffers if needed to point curr_ to first buffer with
// data.
if ( ! explicit_prefetch_submitted_ ) {
AbortIOIfNeeded ( offset ) ;
}
UpdateBuffersIfNeeded ( offset ) ;
// 2. Handle overlapping data over two buffers. If data is overlapping then
// during this call:
// - data from curr_ is copied into third buffer,
// - curr_ is send for async prefetching of further data if second buffer
// contains remaining requested data or in progress for async prefetch,
// - switch buffers and curr_ now points to second buffer to copy remaining
// data.
s = HandleOverlappingData ( opts , reader , offset , length , readahead_size ,
rate_limiter_priority , copy_to_third_buffer ,
tmp_offset , tmp_length ) ;
if ( ! s . ok ( ) ) {
return s ;
}
// 3. Call Poll only if data is needed for the second buffer.
// 3. Call Poll only if data is needed for the second buffer.
// - Return if whole data is in curr_ and second buffer in progress.
// - Return if whole data is in curr_ and second buffer is in progress or
// already full.
// - If second buffer is empty, it will go for ReadAsync for second buffer.
// - If second buffer is empty, it will go for ReadAsync for second buffer.
if ( ! bufs_ [ curr_ ] . async_read_in_progress_ & & DoesBufferContainData ( curr_ ) & &
if ( ! bufs_ [ curr_ ] . async_read_in_progress_ & & DoesBufferContainData ( curr_ ) & &
IsDataBlockInBuffer ( offset , length , curr_ ) ) {
IsDataBlockInBuffer ( offset , length , curr_ ) ) {
// Whole data is in curr_.
// Whole data is in curr_.
UpdateBuffersIfNeeded ( offset ) ;
UpdateBuffersIfNeeded ( offset ) ;
second = curr_ ^ 1 ;
if ( ! IsSecondBuffEligibleForPrefetching ( ) ) {
if ( bufs_ [ second ] . async_read_in_progress_ ) {
return s ;
return s ;
}
}
} else {
} else {
// After poll request, curr_ might be empty because of IOError in
// callback while reading or may contain required data.
PollAndUpdateBuffersIfNeeded ( offset ) ;
PollAndUpdateBuffersIfNeeded ( offset ) ;
second = curr_ ^ 1 ;
}
}
if ( copy_to_third_buffer ) {
if ( copy_to_third_buffer ) {
@ -427,19 +467,42 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(
if ( explicit_prefetch_submitted_ ) {
if ( explicit_prefetch_submitted_ ) {
return s ;
return s ;
}
}
if ( ! IsSecondBuffEligibleForPrefetching ( ) ) {
return s ;
}
}
uint32_t second = curr_ ^ 1 ;
assert ( ! bufs_ [ curr_ ] . async_read_in_progress_ ) ;
// In case because of some IOError curr_ got empty, abort IO for second as
// well. Otherwise data might not align if more data needs to be read in curr_
// which might overlap with second buffer.
if ( ! DoesBufferContainData ( curr_ ) & & bufs_ [ second ] . async_read_in_progress_ ) {
if ( bufs_ [ second ] . io_handle_ ! = nullptr ) {
std : : vector < void * > handles ;
handles . emplace_back ( bufs_ [ second ] . io_handle_ ) ;
{
StopWatch sw ( clock_ , stats_ , ASYNC_PREFETCH_ABORT_MICROS ) ;
Status status = fs_ - > AbortIO ( handles ) ;
assert ( status . ok ( ) ) ;
}
}
DestroyAndClearIOHandle ( second ) ;
bufs_ [ second ] . buffer_ . Clear ( ) ;
}
}
// 5. Data is overlapping i.e. some of the data has been copied to third
// 5. Data is overlapping i.e. some of the data has been copied to third
// buffer
// buffer and remaining will be updated below.
// and remaining will be updated below.
if ( copy_to_third_buffer & & DoesBufferContainData ( curr_ ) ) {
if ( copy_to_third_buffer ) {
CopyDataToBuffer ( curr_ , offset , length ) ;
CopyDataToBuffer ( curr_ , offset , length ) ;
// Length == 0: All the requested data has been copied to third buffer and
// Length == 0: All the requested data has been copied to third buffer and
// it has already gone for async prefetching. It can return without doing
// it has already gone for async prefetching. It can return without doing
// anything further.
// anything further.
// Length > 0: More data needs to be consumed so it will continue async and
// Length > 0: More data needs to be consumed so it will continue async
// sync prefetching and copy the remaining data to third buffer in the end.
// and sync prefetching and copy the remaining data to third buffer in the
// end.
if ( length = = 0 ) {
if ( length = = 0 ) {
return s ;
return s ;
}
}
@ -458,6 +521,9 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(
uint64_t chunk_len1 = 0 ;
uint64_t chunk_len1 = 0 ;
uint64_t read_len1 = 0 ;
uint64_t read_len1 = 0 ;
assert ( ! bufs_ [ second ] . async_read_in_progress_ & &
! DoesBufferContainData ( second ) ) ;
// For length == 0, skip the synchronous prefetching. read_len1 will be 0.
// For length == 0, skip the synchronous prefetching. read_len1 will be 0.
if ( length > 0 ) {
if ( length > 0 ) {
CalculateOffsetAndLen ( alignment , offset , roundup_len1 , curr_ ,
CalculateOffsetAndLen ( alignment , offset , roundup_len1 , curr_ ,
@ -594,8 +660,11 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
}
}
if ( explicit_prefetch_submitted_ ) {
if ( explicit_prefetch_submitted_ ) {
if ( prev_offset_ ! = offset ) {
// explicit_prefetch_submitted_ is special case where it expects request
// Random offset called. So abort the IOs.
// submitted in PrefetchAsync should match with this request. Otherwise
// buffers will be outdated.
// Random offset called. So abort the IOs.
if ( bufs_ [ curr_ ] . offset_ ! = offset ) {
AbortAllIOs ( ) ;
AbortAllIOs ( ) ;
bufs_ [ curr_ ] . buffer_ . Clear ( ) ;
bufs_ [ curr_ ] . buffer_ . Clear ( ) ;
bufs_ [ curr_ ^ 1 ] . buffer_ . Clear ( ) ;
bufs_ [ curr_ ^ 1 ] . buffer_ . Clear ( ) ;