@ -3110,6 +3110,204 @@ TEST_F(EnvTest, CreateCompositeEnv) {
}
# endif // ROCKSDB_LITE
// Forward declaration
class ReadAsyncFS ;
struct MockIOHandle {
std : : function < void ( const FSReadRequest & , void * ) > cb ;
void * cb_arg ;
bool create_io_error ;
} ;
// ReadAsyncFS and ReadAsyncRandomAccessFile mocks the FS doing asynchronous
// reads by creating threads that submit read requests and then calling Poll API
// to obtain those results.
class ReadAsyncRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
public :
ReadAsyncRandomAccessFile ( ReadAsyncFS & fs ,
std : : unique_ptr < FSRandomAccessFile > & file )
: FSRandomAccessFileOwnerWrapper ( std : : move ( file ) ) , fs_ ( fs ) { }
IOStatus ReadAsync ( FSReadRequest & req , const IOOptions & opts ,
std : : function < void ( const FSReadRequest & , void * ) > cb ,
void * cb_arg , void * * io_handle , IOHandleDeleter * del_fn ,
IODebugContext * dbg ) override ;
private :
ReadAsyncFS & fs_ ;
std : : unique_ptr < FSRandomAccessFile > file_ ;
int counter = 0 ;
} ;
class ReadAsyncFS : public FileSystemWrapper {
public :
explicit ReadAsyncFS ( const std : : shared_ptr < FileSystem > & wrapped )
: FileSystemWrapper ( wrapped ) { }
static const char * kClassName ( ) { return " ReadAsyncFS " ; }
const char * Name ( ) const override { return kClassName ( ) ; }
IOStatus NewRandomAccessFile ( const std : : string & fname ,
const FileOptions & opts ,
std : : unique_ptr < FSRandomAccessFile > * result ,
IODebugContext * dbg ) override {
std : : unique_ptr < FSRandomAccessFile > file ;
IOStatus s = target ( ) - > NewRandomAccessFile ( fname , opts , & file , dbg ) ;
EXPECT_OK ( s ) ;
result - > reset ( new ReadAsyncRandomAccessFile ( * this , file ) ) ;
return s ;
}
IOStatus Poll ( std : : vector < void * > & io_handles ,
size_t /*min_completions*/ ) override {
// Wait for the threads completion.
for ( auto & t : workers ) {
t . join ( ) ;
}
for ( size_t i = 0 ; i < io_handles . size ( ) ; i + + ) {
MockIOHandle * handle = static_cast < MockIOHandle * > ( io_handles [ i ] ) ;
if ( handle - > create_io_error ) {
FSReadRequest req ;
req . status = IOStatus : : IOError ( ) ;
handle - > cb ( req , handle - > cb_arg ) ;
}
}
return IOStatus : : OK ( ) ;
}
std : : vector < std : : thread > workers ;
} ;
IOStatus ReadAsyncRandomAccessFile : : ReadAsync (
FSReadRequest & req , const IOOptions & opts ,
std : : function < void ( const FSReadRequest & , void * ) > cb , void * cb_arg ,
void * * io_handle , IOHandleDeleter * del_fn , IODebugContext * dbg ) {
IOHandleDeleter deletefn = [ ] ( void * args ) - > void {
delete ( static_cast < MockIOHandle * > ( args ) ) ;
args = nullptr ;
} ;
* del_fn = deletefn ;
// Allocate and populate io_handle.
MockIOHandle * mock_handle = new MockIOHandle ( ) ;
bool create_io_error = false ;
if ( counter % 2 ) {
create_io_error = true ;
}
mock_handle - > create_io_error = create_io_error ;
mock_handle - > cb = cb ;
mock_handle - > cb_arg = cb_arg ;
* io_handle = static_cast < void * > ( mock_handle ) ;
counter + + ;
// Submit read request asynchronously.
std : : function < void ( FSReadRequest ) > submit_request =
[ & opts , cb , cb_arg , io_handle , del_fn , dbg , create_io_error ,
this ] ( FSReadRequest _req ) {
if ( ! create_io_error ) {
target ( ) - > ReadAsync ( _req , opts , cb , cb_arg , io_handle , del_fn , dbg ) ;
}
} ;
fs_ . workers . emplace_back ( submit_request , req ) ;
return IOStatus : : OK ( ) ;
}
class TestAsyncRead : public testing : : Test {
public :
TestAsyncRead ( ) { env_ = Env : : Default ( ) ; }
Env * env_ ;
} ;
// Tests the default implementation of ReadAsync API.
TEST_F ( TestAsyncRead , ReadAsync ) {
EnvOptions soptions ;
std : : shared_ptr < ReadAsyncFS > fs =
std : : make_shared < ReadAsyncFS > ( env_ - > GetFileSystem ( ) ) ;
std : : string fname = test : : PerThreadDBPath ( env_ , " testfile " ) ;
const size_t kSectorSize = 4096 ;
const size_t kNumSectors = 8 ;
// 1. create & write to a file.
{
std : : unique_ptr < FSWritableFile > wfile ;
ASSERT_OK (
fs - > NewWritableFile ( fname , FileOptions ( ) , & wfile , nullptr /*dbg*/ ) ) ;
for ( size_t i = 0 ; i < kNumSectors ; + + i ) {
auto data = NewAligned ( kSectorSize * 8 , static_cast < char > ( i + 1 ) ) ;
Slice slice ( data . get ( ) , kSectorSize ) ;
ASSERT_OK ( wfile - > Append ( slice , IOOptions ( ) , nullptr ) ) ;
}
ASSERT_OK ( wfile - > Close ( IOOptions ( ) , nullptr ) ) ;
}
// 2. Read file
{
std : : unique_ptr < FSRandomAccessFile > file ;
ASSERT_OK ( fs - > NewRandomAccessFile ( fname , FileOptions ( ) , & file , nullptr ) ) ;
IOOptions opts ;
std : : vector < void * > io_handles ( kNumSectors ) ;
std : : vector < FSReadRequest > reqs ( kNumSectors ) ;
std : : vector < std : : unique_ptr < char , Deleter > > data ;
std : : vector < size_t > vals ;
IOHandleDeleter del_fn ;
uint64_t offset = 0 ;
// Initialize read requests
for ( size_t i = 0 ; i < kNumSectors ; i + + ) {
reqs [ i ] . offset = offset ;
reqs [ i ] . len = kSectorSize ;
data . emplace_back ( NewAligned ( kSectorSize , 0 ) ) ;
reqs [ i ] . scratch = data . back ( ) . get ( ) ;
vals . push_back ( i ) ;
offset + = kSectorSize ;
}
// callback function passed to async read.
std : : function < void ( const FSReadRequest & , void * ) > callback =
[ & ] ( const FSReadRequest & req , void * cb_arg ) {
assert ( cb_arg ! = nullptr ) ;
size_t i = * ( reinterpret_cast < size_t * > ( cb_arg ) ) ;
reqs [ i ] . offset = req . offset ;
reqs [ i ] . result = req . result ;
reqs [ i ] . status = req . status ;
} ;
// Submit asynchronous read requests.
for ( size_t i = 0 ; i < kNumSectors ; i + + ) {
void * cb_arg = static_cast < void * > ( & ( vals [ i ] ) ) ;
ASSERT_OK ( file - > ReadAsync ( reqs [ i ] , opts , callback , cb_arg ,
& ( io_handles [ i ] ) , & del_fn , nullptr ) ) ;
}
// Poll for the submitted requests.
fs - > Poll ( io_handles , kNumSectors ) ;
// Check the status of read requests.
for ( size_t i = 0 ; i < kNumSectors ; i + + ) {
if ( i % 2 ) {
ASSERT_EQ ( reqs [ i ] . status , IOStatus : : IOError ( ) ) ;
} else {
auto buf = NewAligned ( kSectorSize * 8 , static_cast < char > ( i + 1 ) ) ;
Slice expected_data ( buf . get ( ) , kSectorSize ) ;
ASSERT_EQ ( reqs [ i ] . offset , i * kSectorSize ) ;
ASSERT_OK ( reqs [ i ] . status ) ;
ASSERT_EQ ( expected_data . ToString ( ) , reqs [ i ] . result . ToString ( ) ) ;
}
}
// Delete io_handles.
for ( size_t i = 0 ; i < io_handles . size ( ) ; i + + ) {
del_fn ( io_handles [ i ] ) ;
}
}
}
} // namespace ROCKSDB_NAMESPACE
int main ( int argc , char * * argv ) {