@ -10,7 +10,6 @@
# include "file/file_prefetch_buffer.h"
# include "file/file_prefetch_buffer.h"
# include <algorithm>
# include <algorithm>
# include <mutex>
# include "file/random_access_file_reader.h"
# include "file/random_access_file_reader.h"
# include "monitoring/histogram.h"
# include "monitoring/histogram.h"
@ -21,49 +20,32 @@
# include "util/rate_limiter.h"
# include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
namespace ROCKSDB_NAMESPACE {
Status FilePrefetchBuffer : : Prefetch ( const IOOptions & opts ,
RandomAccessFileReader * reader ,
uint64_t offset , size_t n ,
Env : : IOPriority rate_limiter_priority ) {
if ( ! enable_ | | reader = = nullptr ) {
return Status : : OK ( ) ;
}
TEST_SYNC_POINT ( " FilePrefetchBuffer::Prefetch:Start " ) ;
size_t alignment = reader - > file ( ) - > GetRequiredBufferAlignment ( ) ;
size_t offset_ = static_cast < size_t > ( offset ) ;
uint64_t rounddown_offset = Rounddown ( offset_ , alignment ) ;
uint64_t roundup_end = Roundup ( offset_ + n , alignment ) ;
uint64_t roundup_len = roundup_end - rounddown_offset ;
assert ( roundup_len > = alignment ) ;
assert ( roundup_len % alignment = = 0 ) ;
void FilePrefetchBuffer : : CalculateOffsetAndLen ( size_t alignment ,
uint64_t offset ,
size_t roundup_len , size_t index ,
bool refit_tail ,
uint64_t & chunk_len ) {
uint64_t chunk_offset_in_buffer = 0 ;
bool copy_data_to_new_buffer = false ;
// Check if requested bytes are in the existing buffer_.
// Check if requested bytes are in the existing buffer_.
// If all bytes exist -- return.
// If only a few bytes exist -- reuse them & read only what is really needed.
// If only a few bytes exist -- reuse them & read only what is really needed.
// This is typically the case of incremental reading of data.
// This is typically the case of incremental reading of data.
// If no bytes exist in buffer -- full pread.
// If no bytes exist in buffer -- full pread.
if ( bufs_ [ index ] . buffer_ . CurrentSize ( ) > 0 & &
Status s ;
offset > = bufs_ [ index ] . offset_ & &
uint64_t chunk_offset_in_buffer = 0 ;
offset < = bufs_ [ index ] . offset_ + bufs_ [ index ] . buffer_ . CurrentSize ( ) ) {
uint64_t chunk_len = 0 ;
bool copy_data_to_new_buffer = false ;
if ( buffer_ . CurrentSize ( ) > 0 & & offset > = buffer_offset_ & &
offset < = buffer_offset_ + buffer_ . CurrentSize ( ) ) {
if ( offset + n < = buffer_offset_ + buffer_ . CurrentSize ( ) ) {
// All requested bytes are already in the buffer. So no need to Read
// again.
return s ;
} else {
// Only a few requested bytes are in the buffer. memmove those chunk of
// Only a few requested bytes are in the buffer. memmove those chunk of
// bytes to the beginning, and memcpy them back into the new buffer if a
// bytes to the beginning, and memcpy them back into the new buffer if a
// new buffer is created.
// new buffer is created.
chunk_offset_in_buffer =
chunk_offset_in_buffer = Rounddown (
Rounddown ( static_cast < size_t > ( offset - buffer_offset_ ) , alignment ) ;
static_cast < size_t > ( offset - bufs_ [ index ] . offset_ ) , alignment ) ;
chunk_len = buffer_ . CurrentSize ( ) - chunk_offset_in_buffer ;
chunk_len = static_cast < uint64_t > ( bufs_ [ index ] . buffer_ . CurrentSize ( ) ) -
chunk_offset_in_buffer ;
assert ( chunk_offset_in_buffer % alignment = = 0 ) ;
assert ( chunk_offset_in_buffer % alignment = = 0 ) ;
assert ( chunk_len % alignment = = 0 ) ;
// assert(chunk_len % alignment == 0);
assert ( chunk_offset_in_buffer + chunk_len < =
assert ( chunk_offset_in_buffer + chunk_len < =
buffer_offset_ + buffer_ . CurrentSize ( ) ) ;
bufs_ [ index ] . offset_ + bufs_ [ index ] . buffer_ . CurrentSize ( ) ) ;
if ( chunk_len > 0 ) {
if ( chunk_len > 0 ) {
copy_data_to_new_buffer = true ;
copy_data_to_new_buffer = true ;
} else {
} else {
@ -71,31 +53,31 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
chunk_offset_in_buffer = 0 ;
chunk_offset_in_buffer = 0 ;
}
}
}
}
}
// Create a new buffer only if current capacity is not sufficient, and memcopy
// Create a new buffer only if current capacity is not sufficient, and memcopy
// bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
// bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
if ( buffer_ . Capacity ( ) < roundup_len ) {
if ( bufs_ [ index ] . buf fer_ . Capacity ( ) < roundup_len ) {
buffer_ . Alignment ( alignment ) ;
bufs_ [ index ] . buf fer_ . Alignment ( alignment ) ;
buffer_ . AllocateNewBuffer ( static_cast < size_t > ( roundup_len ) ,
bufs_ [ index ] . buffer_ . AllocateNewBuffer (
copy_data_to_new_buffer , chunk_offset_in _buffer ,
static_cast < size_t > ( roundup_len ) , copy_data_to_new _buffer,
static_cast < size_t > ( chunk_len ) ) ;
chunk_offset_in_buffer , static_cast < size_t > ( chunk_len ) ) ;
} else if ( chunk_len > 0 ) {
} else if ( chunk_len > 0 & & refit_tail ) {
// New buffer not needed. But memmove bytes from tail to the beginning since
// New buffer not needed. But memmove bytes from tail to the beginning since
// chunk_len is greater than 0.
// chunk_len is greater than 0.
buffer_ . RefitTail ( static_cast < size_t > ( chunk_offset_in_buffer ) ,
bufs_ [ index ] . buf fer_ . RefitTail ( static_cast < size_t > ( chunk_offset_in_buffer ) ,
static_cast < size_t > ( chunk_len ) ) ;
static_cast < size_t > ( chunk_len ) ) ;
}
}
Slice result ;
size_t read_len = static_cast < size_t > ( roundup_len - chunk_len ) ;
s = reader - > Read ( opts , rounddown_offset + chunk_len , read_len , & result ,
buffer_ . BufferStart ( ) + chunk_len , nullptr ,
rate_limiter_priority ) ;
if ( ! s . ok ( ) ) {
return s ;
}
}
Status FilePrefetchBuffer : : Read ( const IOOptions & opts ,
RandomAccessFileReader * reader ,
Env : : IOPriority rate_limiter_priority ,
uint64_t read_len , uint64_t chunk_len ,
uint64_t rounddown_start , uint32_t index ) {
Slice result ;
Status s = reader - > Read ( opts , rounddown_start + chunk_len , read_len , & result ,
bufs_ [ index ] . buffer_ . BufferStart ( ) + chunk_len ,
nullptr , rate_limiter_priority ) ;
# ifndef NDEBUG
# ifndef NDEBUG
if ( result . size ( ) < read_len ) {
if ( result . size ( ) < read_len ) {
// Fake an IO error to force db_stress fault injection to ignore
// Fake an IO error to force db_stress fault injection to ignore
@ -103,8 +85,270 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
IGNORE_STATUS_IF_ERROR ( Status : : IOError ( ) ) ;
IGNORE_STATUS_IF_ERROR ( Status : : IOError ( ) ) ;
}
}
# endif
# endif
buffer_offset_ = rounddown_offset ;
if ( ! s . ok ( ) ) {
buffer_ . Size ( static_cast < size_t > ( chunk_len ) + result . size ( ) ) ;
return s ;
}
// Update the buffer offset and size.
bufs_ [ index ] . offset_ = rounddown_start ;
bufs_ [ index ] . buffer_ . Size ( static_cast < size_t > ( chunk_len ) + result . size ( ) ) ;
return s ;
}
Status FilePrefetchBuffer : : ReadAsync ( const IOOptions & opts ,
RandomAccessFileReader * reader ,
Env : : IOPriority rate_limiter_priority ,
uint64_t read_len , uint64_t chunk_len ,
uint64_t rounddown_start , uint32_t index ) {
// Reset io_handle.
if ( io_handle_ ! = nullptr & & del_fn_ ! = nullptr ) {
del_fn_ ( io_handle_ ) ;
io_handle_ = nullptr ;
del_fn_ = nullptr ;
}
// callback for async read request.
auto fp = std : : bind ( & FilePrefetchBuffer : : PrefetchAsyncCallback , this ,
std : : placeholders : : _1 , std : : placeholders : : _2 ) ;
FSReadRequest req ;
Slice result ;
req . len = read_len ;
req . offset = rounddown_start + chunk_len ;
req . result = result ;
req . scratch = bufs_ [ index ] . buffer_ . BufferStart ( ) + chunk_len ;
Status s = reader - > ReadAsync ( req , opts , fp , nullptr /*cb_arg*/ , & io_handle_ ,
& del_fn_ , rate_limiter_priority ) ;
if ( s . ok ( ) ) {
async_read_in_progress_ = true ;
}
return s ;
}
Status FilePrefetchBuffer : : Prefetch ( const IOOptions & opts ,
RandomAccessFileReader * reader ,
uint64_t offset , size_t n ,
Env : : IOPriority rate_limiter_priority ) {
if ( ! enable_ | | reader = = nullptr ) {
return Status : : OK ( ) ;
}
TEST_SYNC_POINT ( " FilePrefetchBuffer::Prefetch:Start " ) ;
if ( offset + n < = bufs_ [ curr_ ] . offset_ + bufs_ [ curr_ ] . buffer_ . CurrentSize ( ) ) {
// All requested bytes are already in the curr_ buffer. So no need to Read
// again.
return Status : : OK ( ) ;
}
size_t alignment = reader - > file ( ) - > GetRequiredBufferAlignment ( ) ;
size_t offset_ = static_cast < size_t > ( offset ) ;
uint64_t rounddown_offset = Rounddown ( offset_ , alignment ) ;
uint64_t roundup_end = Roundup ( offset_ + n , alignment ) ;
uint64_t roundup_len = roundup_end - rounddown_offset ;
assert ( roundup_len > = alignment ) ;
assert ( roundup_len % alignment = = 0 ) ;
uint64_t chunk_len = 0 ;
CalculateOffsetAndLen ( alignment , offset , roundup_len , curr_ ,
true /*refit_tail*/ , chunk_len ) ;
size_t read_len = static_cast < size_t > ( roundup_len - chunk_len ) ;
Status s = Read ( opts , reader , rate_limiter_priority , read_len , chunk_len ,
rounddown_offset , curr_ ) ;
return s ;
}
// Copy data from src to third buffer.
void FilePrefetchBuffer : : CopyDataToBuffer ( uint32_t src , uint64_t & offset ,
size_t & length ) {
if ( length = = 0 ) {
return ;
}
uint64_t copy_offset = ( offset - bufs_ [ src ] . offset_ ) ;
size_t copy_len = 0 ;
if ( offset + length < =
bufs_ [ src ] . offset_ + bufs_ [ src ] . buffer_ . CurrentSize ( ) ) {
// All the bytes are in src.
copy_len = length ;
} else {
copy_len = bufs_ [ src ] . buffer_ . CurrentSize ( ) - copy_offset ;
}
memcpy ( bufs_ [ 2 ] . buffer_ . BufferStart ( ) + bufs_ [ 2 ] . buffer_ . CurrentSize ( ) ,
bufs_ [ src ] . buffer_ . BufferStart ( ) + copy_offset , copy_len ) ;
bufs_ [ 2 ] . buffer_ . Size ( bufs_ [ 2 ] . buffer_ . CurrentSize ( ) + copy_len ) ;
// Update offset and length.
offset + = copy_len ;
length - = copy_len ;
// length > 0 indicates it has consumed all data from the src buffer and it
// still needs to read more other buffer.
if ( length > 0 ) {
bufs_ [ src ] . buffer_ . Clear ( ) ;
}
}
// If async_read = true:
// async_read is enabled in case of sequential reads. So when
// buffers are switched, we clear the curr_ buffer as we assume the data has
// been consumed because of sequential reads.
//
// Scenarios for prefetching asynchronously:
// Case1: If both buffers are empty, prefetch n bytes
// synchronously in curr_
// and prefetch readahead_size_/2 async in second buffer.
// Case2: If second buffer has partial or full data, make it current and
// prefetch readahead_size_/2 async in second buffer. In case of
// partial data, prefetch remaining bytes from size n synchronously to
// fulfill the requested bytes request.
// Case3: If curr_ has partial data, prefetch remaining bytes from size n
// synchronously in curr_ to fulfill the requested bytes request and
// prefetch readahead_size_/2 bytes async in second buffer.
// Case4: If data is in both buffers, copy requested data from curr_ and second
// buffer to third buffer. If all requested bytes have been copied, do
// the asynchronous prefetching in second buffer.
Status FilePrefetchBuffer : : PrefetchAsync ( const IOOptions & opts ,
RandomAccessFileReader * reader ,
FileSystem * fs , uint64_t offset ,
size_t length , size_t readahead_size ,
Env : : IOPriority rate_limiter_priority ,
bool & copy_to_third_buffer ) {
if ( ! enable_ ) {
return Status : : OK ( ) ;
}
if ( async_read_in_progress_ & & fs ! = nullptr ) {
// Wait for prefetch data to complete.
// No mutex is needed as PrefetchAsyncCallback updates the result in second
// buffer and FilePrefetchBuffer should wait for Poll before accessing the
// second buffer.
std : : vector < void * > handles ;
handles . emplace_back ( io_handle_ ) ;
fs - > Poll ( handles , 1 ) . PermitUncheckedError ( ) ;
}
// TODO akanksha: Update TEST_SYNC_POINT after new tests are added.
TEST_SYNC_POINT ( " FilePrefetchBuffer::Prefetch:Start " ) ;
Status s ;
size_t prefetch_size = length + readahead_size ;
size_t alignment = reader - > file ( ) - > GetRequiredBufferAlignment ( ) ;
// Index of second buffer.
uint32_t second = curr_ ^ 1 ;
// If data is in second buffer, make it curr_. Second buffer can be either
// partial filled or full.
if ( bufs_ [ second ] . buffer_ . CurrentSize ( ) > 0 & &
offset > = bufs_ [ second ] . offset_ & &
offset < = bufs_ [ second ] . offset_ + bufs_ [ second ] . buffer_ . CurrentSize ( ) ) {
// Clear the curr_ as buffers have been swapped and curr_ contains the
// outdated data.
bufs_ [ curr_ ] . buffer_ . Clear ( ) ;
// Switch the buffers.
curr_ = curr_ ^ 1 ;
second = curr_ ^ 1 ;
}
// If second buffer contains outdated data, clear it for async prefetching.
// Outdated can be because previous sequential reads were read from the cache
// instead of this buffer.
if ( bufs_ [ second ] . buffer_ . CurrentSize ( ) > 0 & &
offset > = bufs_ [ second ] . offset_ + bufs_ [ second ] . buffer_ . CurrentSize ( ) ) {
bufs_ [ second ] . buffer_ . Clear ( ) ;
}
// Data is overlapping i.e. some of the data is in curr_ buffer and remaining
// in second buffer.
if ( bufs_ [ curr_ ] . buffer_ . CurrentSize ( ) > 0 & &
bufs_ [ second ] . buffer_ . CurrentSize ( ) > 0 & &
offset > = bufs_ [ curr_ ] . offset_ & &
offset < bufs_ [ curr_ ] . offset_ + bufs_ [ curr_ ] . buffer_ . CurrentSize ( ) & &
offset + prefetch_size > bufs_ [ second ] . offset_ ) {
// Allocate new buffer to third buffer;
bufs_ [ 2 ] . buffer_ . Clear ( ) ;
bufs_ [ 2 ] . buffer_ . Alignment ( alignment ) ;
bufs_ [ 2 ] . buffer_ . AllocateNewBuffer ( length ) ;
bufs_ [ 2 ] . offset_ = offset ;
copy_to_third_buffer = true ;
// Move data from curr_ buffer to third.
CopyDataToBuffer ( curr_ , offset , length ) ;
if ( length = = 0 ) {
// Requested data has been copied and curr_ still has unconsumed data.
return s ;
}
CopyDataToBuffer ( second , offset , length ) ;
// Length == 0: All the requested data has been copied to third buffer. It
// should go for only async prefetching.
// Length > 0: More data needs to be consumed so it will continue async and
// sync prefetching and copy the remaining data to third buffer in the end.
// swap the buffers.
curr_ = curr_ ^ 1 ;
prefetch_size - = length ;
}
// Update second again if swap happened.
second = curr_ ^ 1 ;
size_t _offset = static_cast < size_t > ( offset ) ;
// offset and size alignment for curr_ buffer with synchronous prefetching
uint64_t rounddown_start1 = Rounddown ( _offset , alignment ) ;
uint64_t roundup_end1 = Roundup ( _offset + prefetch_size , alignment ) ;
uint64_t roundup_len1 = roundup_end1 - rounddown_start1 ;
assert ( roundup_len1 > = alignment ) ;
assert ( roundup_len1 % alignment = = 0 ) ;
uint64_t chunk_len1 = 0 ;
uint64_t read_len1 = 0 ;
// For length == 0, skip the synchronous prefetching. read_len1 will be 0.
if ( length > 0 ) {
CalculateOffsetAndLen ( alignment , offset , roundup_len1 , curr_ ,
false /*refit_tail*/ , chunk_len1 ) ;
read_len1 = static_cast < size_t > ( roundup_len1 - chunk_len1 ) ;
}
{
// offset and size alignment for second buffer for asynchronous
// prefetching
uint64_t rounddown_start2 = roundup_end1 ;
uint64_t roundup_end2 =
Roundup ( rounddown_start2 + readahead_size , alignment ) ;
// For length == 0, do the asynchronous prefetching in second instead of
// synchronous prefetching of remaining prefetch_size.
if ( length = = 0 ) {
rounddown_start2 =
bufs_ [ curr_ ] . offset_ + bufs_ [ curr_ ] . buffer_ . CurrentSize ( ) ;
roundup_end2 = Roundup ( rounddown_start2 + prefetch_size , alignment ) ;
}
uint64_t roundup_len2 = roundup_end2 - rounddown_start2 ;
uint64_t chunk_len2 = 0 ;
CalculateOffsetAndLen ( alignment , rounddown_start2 , roundup_len2 , second ,
false /*refit_tail*/ , chunk_len2 ) ;
// Update the buffer offset.
bufs_ [ second ] . offset_ = rounddown_start2 ;
uint64_t read_len2 = static_cast < size_t > ( roundup_len2 - chunk_len2 ) ;
ReadAsync ( opts , reader , rate_limiter_priority , read_len2 , chunk_len2 ,
rounddown_start2 , second )
. PermitUncheckedError ( ) ;
}
if ( read_len1 > 0 ) {
s = Read ( opts , reader , rate_limiter_priority , read_len1 , chunk_len1 ,
rounddown_start1 , curr_ ) ;
if ( ! s . ok ( ) ) {
return s ;
}
}
// Copy remaining requested bytes to third_buffer.
if ( copy_to_third_buffer & & length > 0 ) {
CopyDataToBuffer ( curr_ , offset , length ) ;
}
return s ;
return s ;
}
}
@ -117,7 +361,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
if ( track_min_offset_ & & offset < min_offset_read_ ) {
if ( track_min_offset_ & & offset < min_offset_read_ ) {
min_offset_read_ = static_cast < size_t > ( offset ) ;
min_offset_read_ = static_cast < size_t > ( offset ) ;
}
}
if ( ! enable_ | | offset < buffer_offset_ ) {
if ( ! enable_ | | ( offset < bufs_ [ curr_ ] . offset_ ) ) {
return false ;
return false ;
}
}
@ -127,36 +371,94 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
// If readahead is not enabled: return false.
// If readahead is not enabled: return false.
TEST_SYNC_POINT_CALLBACK ( " FilePrefetchBuffer::TryReadFromCache " ,
TEST_SYNC_POINT_CALLBACK ( " FilePrefetchBuffer::TryReadFromCache " ,
& readahead_size_ ) ;
& readahead_size_ ) ;
if ( offset + n > buffer_offset_ + buffer_ . CurrentSize ( ) ) {
if ( offset + n > bufs_ [ curr_ ] . offset_ + bufs_ [ curr_ ] . buffer_ . CurrentSize ( ) ) {
if ( readahead_size_ > 0 ) {
if ( readahead_size_ > 0 ) {
Status s ;
assert ( reader ! = nullptr ) ;
assert ( reader ! = nullptr ) ;
assert ( max_readahead_size_ > = readahead_size_ ) ;
assert ( max_readahead_size_ > = readahead_size_ ) ;
Status s ;
if ( for_compaction ) {
if ( for_compaction ) {
s = Prefetch ( opts , reader , offset , std : : max ( n , readahead_size_ ) ,
s = Prefetch ( opts , reader , offset , std : : max ( n , readahead_size_ ) ,
rate_limiter_priority ) ;
rate_limiter_priority ) ;
} else {
} else {
if ( implicit_auto_readahead_ ) {
if ( implicit_auto_readahead_ ) {
// Prefetch only if this read is sequential otherwise reset
if ( ! IsEligibleForPrefetch ( offset , n ) ) {
// readahead_size_ to initial value.
if ( ! IsBlockSequential ( offset ) ) {
UpdateReadPattern ( offset , n ) ;
ResetValues ( ) ;
// Ignore status as Prefetch is not called.
// Ignore status as Prefetch is not called.
s . PermitUncheckedError ( ) ;
s . PermitUncheckedError ( ) ;
return false ;
return false ;
}
}
num_file_reads_ + + ;
}
if ( num_file_reads_ < = kMinNumFileReadsToStartAutoReadahead ) {
s = Prefetch ( opts , reader , offset , n + readahead_size_ ,
UpdateReadPattern ( offset , n ) ;
rate_limiter_priority ) ;
}
if ( ! s . ok ( ) ) {
if ( status ) {
* status = s ;
}
# ifndef NDEBUG
IGNORE_STATUS_IF_ERROR ( s ) ;
# endif
return false ;
}
readahead_size_ = std : : min ( max_readahead_size_ , readahead_size_ * 2 ) ;
} else {
return false ;
}
}
UpdateReadPattern ( offset , n , false /*decrease_readaheadsize*/ ) ;
uint64_t offset_in_buffer = offset - bufs_ [ curr_ ] . offset_ ;
* result = Slice ( bufs_ [ curr_ ] . buffer_ . BufferStart ( ) + offset_in_buffer , n ) ;
return true ;
}
// TODO akanksha: Merge this function with TryReadFromCache once async
// functionality is stable.
bool FilePrefetchBuffer : : TryReadFromCacheAsync (
const IOOptions & opts , RandomAccessFileReader * reader , uint64_t offset ,
size_t n , Slice * result , Status * status ,
Env : : IOPriority rate_limiter_priority , bool for_compaction /* = false */ ,
FileSystem * fs ) {
if ( track_min_offset_ & & offset < min_offset_read_ ) {
min_offset_read_ = static_cast < size_t > ( offset ) ;
}
if ( ! enable_ | | ( offset < bufs_ [ curr_ ] . offset_ ) ) {
return false ;
}
bool prefetched = false ;
bool copy_to_third_buffer = false ;
// If the buffer contains only a few of the requested bytes:
// If readahead is enabled: prefetch the remaining bytes + readahead bytes
// and satisfy the request.
// If readahead is not enabled: return false.
TEST_SYNC_POINT_CALLBACK ( " FilePrefetchBuffer::TryReadFromCache " ,
& readahead_size_ ) ;
if ( offset + n > bufs_ [ curr_ ] . offset_ + bufs_ [ curr_ ] . buffer_ . CurrentSize ( ) ) {
if ( readahead_size_ > 0 ) {
Status s ;
assert ( reader ! = nullptr ) ;
assert ( max_readahead_size_ > = readahead_size_ ) ;
if ( for_compaction ) {
s = Prefetch ( opts , reader , offset , std : : max ( n , readahead_size_ ) ,
rate_limiter_priority ) ;
} else {
if ( implicit_auto_readahead_ ) {
if ( ! IsEligibleForPrefetch ( offset , n ) ) {
// Ignore status as Prefetch is not called.
// Ignore status as Prefetch is not called.
s . PermitUncheckedError ( ) ;
s . PermitUncheckedError ( ) ;
return false ;
return false ;
}
}
}
}
if ( implicit_auto_readahead_ & & async_io_ ) {
// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously.
s = PrefetchAsync ( opts , reader , fs , offset , n , readahead_size_ / 2 ,
rate_limiter_priority , copy_to_third_buffer ) ;
} else {
s = Prefetch ( opts , reader , offset , n + readahead_size_ ,
s = Prefetch ( opts , reader , offset , n + readahead_size_ ,
rate_limiter_priority ) ;
rate_limiter_priority ) ;
}
}
}
if ( ! s . ok ( ) ) {
if ( ! s . ok ( ) ) {
if ( status ) {
if ( status ) {
* status = s ;
* status = s ;
@ -166,14 +468,49 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
# endif
# endif
return false ;
return false ;
}
}
readahead_size_ = std : : min ( max_readahead_size_ , readahead_size_ * 2 ) ;
prefetched = true ;
} else {
} else {
return false ;
return false ;
}
}
}
}
UpdateReadPattern ( offset , n ) ;
UpdateReadPattern ( offset , n , false /*decrease_readaheadsize*/ ) ;
uint64_t offset_in_buffer = offset - buffer_offset_ ;
* result = Slice ( buffer_ . BufferStart ( ) + offset_in_buffer , n ) ;
uint32_t index = curr_ ;
if ( copy_to_third_buffer ) {
index = 2 ;
}
uint64_t offset_in_buffer = offset - bufs_ [ index ] . offset_ ;
* result = Slice ( bufs_ [ index ] . buffer_ . BufferStart ( ) + offset_in_buffer , n ) ;
if ( prefetched ) {
readahead_size_ = std : : min ( max_readahead_size_ , readahead_size_ * 2 ) ;
}
return true ;
return true ;
}
}
void FilePrefetchBuffer : : PrefetchAsyncCallback ( const FSReadRequest & req ,
void * /*cb_arg*/ ) {
async_read_in_progress_ = false ;
uint32_t index = curr_ ^ 1 ;
if ( req . status . ok ( ) ) {
if ( req . offset + req . result . size ( ) < =
bufs_ [ index ] . offset_ + bufs_ [ index ] . buffer_ . CurrentSize ( ) ) {
// All requested bytes are already in the buffer. So no need to update.
return ;
}
if ( req . offset < bufs_ [ index ] . offset_ ) {
// Next block to be read has changed (Recent read was not a sequential
// read). So ignore this read.
return ;
}
size_t current_size = bufs_ [ index ] . buffer_ . CurrentSize ( ) ;
bufs_ [ index ] . buffer_ . Size ( current_size + req . result . size ( ) ) ;
}
// Release io_handle_.
if ( io_handle_ ! = nullptr & & del_fn_ ! = nullptr ) {
del_fn_ ( io_handle_ ) ;
io_handle_ = nullptr ;
del_fn_ = nullptr ;
}
}
} // namespace ROCKSDB_NAMESPACE
} // namespace ROCKSDB_NAMESPACE