@ -5649,41 +5649,18 @@ TEST_F(DBTest, HardLimit) {
# if !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
# if !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
class WriteStallListener : public EventListener {
class WriteStallListener : public EventListener {
public :
public :
WriteStallListener ( )
WriteStallListener ( ) : condition_ ( WriteStallCondition : : kNormal ) { }
: cond_ ( & mutex_ ) ,
condition_ ( WriteStallCondition : : kNormal ) ,
expected_ ( WriteStallCondition : : kNormal ) ,
expected_set_ ( false ) { }
void OnStallConditionsChanged ( const WriteStallInfo & info ) override {
void OnStallConditionsChanged ( const WriteStallInfo & info ) override {
MutexLock l ( & mutex_ ) ;
MutexLock l ( & mutex_ ) ;
condition_ = info . condition . cur ;
condition_ = info . condition . cur ;
if ( expected_set_ & & condition_ = = expected_ ) {
cond_ . Signal ( ) ;
expected_set_ = false ;
}
}
}
bool CheckCondition ( WriteStallCondition expected ) {
bool CheckCondition ( WriteStallCondition expected ) {
MutexLock l ( & mutex_ ) ;
MutexLock l ( & mutex_ ) ;
if ( expected ! = condition_ ) {
return expected = = condition_ ;
expected_ = expected ;
expected_set_ = true ;
while ( expected ! = condition_ ) {
// We bail out on timeout 500 milliseconds
const uint64_t timeout_us = 500000 ;
if ( cond_ . TimedWait ( timeout_us ) ) {
expected_set_ = false ;
return false ;
}
}
}
return true ;
}
}
private :
private :
port : : Mutex mutex_ ;
port : : Mutex mutex_ ;
port : : CondVar cond_ ;
WriteStallCondition condition_ ;
WriteStallCondition condition_ ;
WriteStallCondition expected_ ;
bool expected_set_ ;
} ;
} ;
TEST_F ( DBTest , SoftLimit ) {
TEST_F ( DBTest , SoftLimit ) {
@ -5704,6 +5681,41 @@ TEST_F(DBTest, SoftLimit) {
WriteStallListener * listener = new WriteStallListener ( ) ;
WriteStallListener * listener = new WriteStallListener ( ) ;
options . listeners . emplace_back ( listener ) ;
options . listeners . emplace_back ( listener ) ;
// FlushMemtable with opt.wait=true does not wait for
// `OnStallConditionsChanged` being called. The event listener is triggered
// on `JobContext::Clean`, which happens after flush result is installed.
// We use sync point to create a custom WaitForFlush that waits for
// context cleanup.
port : : Mutex flush_mutex ;
port : : CondVar flush_cv ( & flush_mutex ) ;
bool flush_finished = false ;
auto InstallFlushCallback = [ & ] ( ) {
{
MutexLock l ( & flush_mutex ) ;
flush_finished = false ;
}
SyncPoint : : GetInstance ( ) - > SetCallBack (
" DBImpl::BackgroundCallFlush:ContextCleanedUp " , [ & ] ( void * ) {
{
MutexLock l ( & flush_mutex ) ;
flush_finished = true ;
}
flush_cv . SignalAll ( ) ;
} ) ;
} ;
auto WaitForFlush = [ & ] ( ) {
{
MutexLock l ( & flush_mutex ) ;
while ( ! flush_finished ) {
flush_cv . Wait ( ) ;
}
}
SyncPoint : : GetInstance ( ) - > ClearCallBack (
" DBImpl::BackgroundCallFlush:ContextCleanedUp " ) ;
} ;
rocksdb : : SyncPoint : : GetInstance ( ) - > EnableProcessing ( ) ;
Reopen ( options ) ;
Reopen ( options ) ;
// Generating 360KB in Level 3
// Generating 360KB in Level 3
@ -5739,7 +5751,9 @@ TEST_F(DBTest, SoftLimit) {
Put ( Key ( i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 100 - i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 100 - i ) , std : : string ( 5000 , ' x ' ) ) ;
// Flush the file. File size is around 30KB.
// Flush the file. File size is around 30KB.
InstallFlushCallback ( ) ;
dbfull ( ) - > TEST_FlushMemTable ( true , true ) ;
dbfull ( ) - > TEST_FlushMemTable ( true , true ) ;
WaitForFlush ( ) ;
}
}
ASSERT_TRUE ( dbfull ( ) - > TEST_write_controler ( ) . NeedsDelay ( ) ) ;
ASSERT_TRUE ( dbfull ( ) - > TEST_write_controler ( ) . NeedsDelay ( ) ) ;
ASSERT_TRUE ( listener - > CheckCondition ( WriteStallCondition : : kDelayed ) ) ;
ASSERT_TRUE ( listener - > CheckCondition ( WriteStallCondition : : kDelayed ) ) ;
@ -5764,8 +5778,6 @@ TEST_F(DBTest, SoftLimit) {
& sleeping_task_low , Env : : Priority : : LOW ) ;
& sleeping_task_low , Env : : Priority : : LOW ) ;
} ) ;
} ) ;
rocksdb : : SyncPoint : : GetInstance ( ) - > EnableProcessing ( ) ;
env_ - > Schedule ( & test : : SleepingBackgroundTask : : DoSleepTask , & sleeping_task_low ,
env_ - > Schedule ( & test : : SleepingBackgroundTask : : DoSleepTask , & sleeping_task_low ,
Env : : Priority : : LOW ) ;
Env : : Priority : : LOW ) ;
sleeping_task_low . WaitUntilSleeping ( ) ;
sleeping_task_low . WaitUntilSleeping ( ) ;
@ -5774,7 +5786,9 @@ TEST_F(DBTest, SoftLimit) {
Put ( Key ( 10 + i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 10 + i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 90 - i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 90 - i ) , std : : string ( 5000 , ' x ' ) ) ;
// Flush the file. File size is around 30KB.
// Flush the file. File size is around 30KB.
InstallFlushCallback ( ) ;
dbfull ( ) - > TEST_FlushMemTable ( true , true ) ;
dbfull ( ) - > TEST_FlushMemTable ( true , true ) ;
WaitForFlush ( ) ;
}
}
// Wake up sleep task to enable compaction to run and waits
// Wake up sleep task to enable compaction to run and waits
@ -5795,7 +5809,9 @@ TEST_F(DBTest, SoftLimit) {
Put ( Key ( 20 + i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 20 + i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 80 - i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 80 - i ) , std : : string ( 5000 , ' x ' ) ) ;
// Flush the file. File size is around 30KB.
// Flush the file. File size is around 30KB.
InstallFlushCallback ( ) ;
dbfull ( ) - > TEST_FlushMemTable ( true , true ) ;
dbfull ( ) - > TEST_FlushMemTable ( true , true ) ;
WaitForFlush ( ) ;
}
}
// Wake up sleep task to enable compaction to run and waits
// Wake up sleep task to enable compaction to run and waits
// for it to go to sleep state again to make sure one compaction
// for it to go to sleep state again to make sure one compaction