@ -40,6 +40,97 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
ASSERT_TRUE ( dbfull ( ) - > Write ( write_options , & batch ) . IsInvalidArgument ( ) ) ;
}
TEST_P ( DBWriteTest , WriteThreadHangOnWriteStall ) {
Options options = GetOptions ( ) ;
options . level0_stop_writes_trigger = options . level0_slowdown_writes_trigger = 4 ;
std : : vector < port : : Thread > threads ;
std : : atomic < int > thread_num ( 0 ) ;
port : : Mutex mutex ;
port : : CondVar cv ( & mutex ) ;
Reopen ( options ) ;
std : : function < void ( ) > write_slowdown_func = [ & ] ( ) {
int a = thread_num . fetch_add ( 1 ) ;
std : : string key = " foo " + std : : to_string ( a ) ;
WriteOptions wo ;
wo . no_slowdown = false ;
dbfull ( ) - > Put ( wo , key , " bar " ) ;
} ;
std : : function < void ( ) > write_no_slowdown_func = [ & ] ( ) {
int a = thread_num . fetch_add ( 1 ) ;
std : : string key = " foo " + std : : to_string ( a ) ;
WriteOptions wo ;
wo . no_slowdown = true ;
dbfull ( ) - > Put ( wo , key , " bar " ) ;
} ;
std : : function < void ( void * ) > unblock_main_thread_func = [ & ] ( void * ) {
mutex . Lock ( ) ;
cv . SignalAll ( ) ;
mutex . Unlock ( ) ;
} ;
// Create 3 L0 files and schedule 4th without waiting
Put ( " foo " + std : : to_string ( thread_num . fetch_add ( 1 ) ) , " bar " ) ;
Flush ( ) ;
Put ( " foo " + std : : to_string ( thread_num . fetch_add ( 1 ) ) , " bar " ) ;
Flush ( ) ;
Put ( " foo " + std : : to_string ( thread_num . fetch_add ( 1 ) ) , " bar " ) ;
Flush ( ) ;
Put ( " foo " + std : : to_string ( thread_num . fetch_add ( 1 ) ) , " bar " ) ;
rocksdb : : SyncPoint : : GetInstance ( ) - > SetCallBack (
" WriteThread::JoinBatchGroup:Start " , unblock_main_thread_func ) ;
rocksdb : : SyncPoint : : GetInstance ( ) - > LoadDependency (
{ { " DBWriteTest::WriteThreadHangOnWriteStall:1 " ,
" DBImpl::BackgroundCallFlush:start " } ,
{ " DBWriteTest::WriteThreadHangOnWriteStall:2 " ,
" DBImpl::WriteImpl:BeforeLeaderEnters " } ,
// Make compaction start wait for the write stall to be detected and
// implemented by a write group leader
{ " DBWriteTest::WriteThreadHangOnWriteStall:3 " ,
" BackgroundCallCompaction:0 " } } ) ;
rocksdb : : SyncPoint : : GetInstance ( ) - > EnableProcessing ( ) ;
// Schedule creation of 4th L0 file without waiting. This will seal the
// memtable and then wait for a sync point before writing the file. We need
// to do it this way because SwitchMemtable() needs to enter the
// write_thread
FlushOptions fopt ;
fopt . wait = false ;
dbfull ( ) - > Flush ( fopt ) ;
// Create a mix of slowdown/no_slowdown write threads
mutex . Lock ( ) ;
// First leader
threads . emplace_back ( write_slowdown_func ) ;
cv . Wait ( ) ;
// Second leader. Will stall writes
threads . emplace_back ( write_slowdown_func ) ;
cv . Wait ( ) ;
threads . emplace_back ( write_no_slowdown_func ) ;
cv . Wait ( ) ;
threads . emplace_back ( write_slowdown_func ) ;
cv . Wait ( ) ;
threads . emplace_back ( write_no_slowdown_func ) ;
cv . Wait ( ) ;
threads . emplace_back ( write_slowdown_func ) ;
cv . Wait ( ) ;
mutex . Unlock ( ) ;
TEST_SYNC_POINT ( " DBWriteTest::WriteThreadHangOnWriteStall:1 " ) ;
dbfull ( ) - > TEST_WaitForFlushMemTable ( nullptr ) ;
// This would have triggered a write stall. Unblock the write group leader
TEST_SYNC_POINT ( " DBWriteTest::WriteThreadHangOnWriteStall:2 " ) ;
// The leader is going to create missing newer links. When the leader finishes,
// the next leader is going to delay writes and fail writers with no_slowdown
TEST_SYNC_POINT ( " DBWriteTest::WriteThreadHangOnWriteStall:3 " ) ;
for ( auto & t : threads ) {
t . join ( ) ;
}
}
TEST_P ( DBWriteTest , IOErrorOnWALWritePropagateToWriteThreadFollower ) {
constexpr int kNumThreads = 5 ;
std : : unique_ptr < FaultInjectionTestEnv > mock_env (