@ -7470,7 +7470,7 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
options . level0_file_num_compaction_trigger = 2 ;
options . level0_file_num_compaction_trigger = 2 ;
options . level0_slowdown_writes_trigger = 2 ;
options . level0_slowdown_writes_trigger = 2 ;
options . level0_stop_writes_trigger = 2 ;
options . level0_stop_writes_trigger = 2 ;
options . soft_rate_limit = 1.1 ;
options . soft_pending_compaction_bytes_limit = 1024 * 1024 ;
// Use file size to distinguish levels
// Use file size to distinguish levels
// L1: 10, L2: 20, L3 40, L4 80
// L1: 10, L2: 20, L3 40, L4 80
@ -7586,7 +7586,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
options . env = env_ ;
options . env = env_ ;
options . create_if_missing = true ;
options . create_if_missing = true ;
options . compression = kNoCompression ;
options . compression = kNoCompression ;
options . soft_rate_limit = 1.1 ;
options . soft_pending_compaction_bytes_limit = 1024 * 1024 ;
options . write_buffer_size = k64KB ;
options . write_buffer_size = k64KB ;
options . arena_block_size = 4 * k4KB ;
options . arena_block_size = 4 * k4KB ;
options . max_write_buffer_number = 2 ;
options . max_write_buffer_number = 2 ;
@ -8491,7 +8491,7 @@ TEST_F(DBTest, TablePropertiesNeedCompactTest) {
options . target_file_size_base = 2048 ;
options . target_file_size_base = 2048 ;
options . max_bytes_for_level_base = 10240 ;
options . max_bytes_for_level_base = 10240 ;
options . max_bytes_for_level_multiplier = 4 ;
options . max_bytes_for_level_multiplier = 4 ;
options . soft_rate_limit = 1.1 ;
options . soft_pending_compaction_bytes_limit = 1024 * 1024 ;
options . num_levels = 8 ;
options . num_levels = 8 ;
std : : shared_ptr < TablePropertiesCollectorFactory > collector_factory =
std : : shared_ptr < TablePropertiesCollectorFactory > collector_factory =
@ -8995,118 +8995,116 @@ TEST_F(DBTest, SoftLimit) {
options = CurrentOptions ( options ) ;
options = CurrentOptions ( options ) ;
options . write_buffer_size = 100000 ; // Small write buffer
options . write_buffer_size = 100000 ; // Small write buffer
options . max_write_buffer_number = 256 ;
options . max_write_buffer_number = 256 ;
options . level0_file_num_compaction_trigger = 3 ;
options . level0_file_num_compaction_trigger = 1 ;
options . level0_slowdown_writes_trigger = 3 ;
options . level0_slowdown_writes_trigger = 3 ;
options . level0_stop_writes_trigger = 999999 ;
options . level0_stop_writes_trigger = 999999 ;
options . delayed_write_rate = 200000 ; // About 200KB/s limited rate
options . delayed_write_rate = 20000 ; // About 200KB/s limited rate
options . soft_rate_limit = 1.1 ;
options . soft_pending_compaction_bytes_limit = 200000 ;
options . target_file_size_base = 99999999 ; // All into one file
options . target_file_size_base = 99999999 ; // All into one file
options . max_bytes_for_level_base = 50000 ;
options . max_bytes_for_level_base = 50000 ;
options . max_bytes_for_level_multiplier = 10 ;
options . max_background_compactions = 1 ;
options . compression = kNoCompression ;
options . compression = kNoCompression ;
Reopen ( options ) ;
Reopen ( options ) ;
Put ( Key ( 0 ) , " " ) ;
Put ( Key ( 0 ) , " " ) ;
// Only allow two compactions
test : : SleepingBackgroundTask sleeping_task_low ;
port : : Mutex mut ;
// Block compactions
port : : CondVar cv ( & mut ) ;
env_ - > Schedule ( & test : : SleepingBackgroundTask : : DoSleepTask , & sleeping_task_low ,
std : : atomic < int > compaction_cnt ( 0 ) ;
Env : : Priority : : LOW ) ;
rocksdb : : SyncPoint : : GetInstance ( ) - > SetCallBack (
sleeping_task_low . WaitUntilSleeping ( ) ;
" VersionSet::LogAndApply:WriteManifest " , [ & ] ( void * arg ) {
// Three flushes and the first compaction,
// three flushes and the second compaction go through.
MutexLock l ( & mut ) ;
while ( compaction_cnt . load ( ) > = 8 ) {
cv . Wait ( ) ;
}
compaction_cnt . fetch_add ( 1 ) ;
} ) ;
std : : atomic < int > sleep_count ( 0 ) ;
rocksdb : : SyncPoint : : GetInstance ( ) - > SetCallBack (
" DBImpl::DelayWrite:Sleep " , [ & ] ( void * arg ) { sleep_count . fetch_add ( 1 ) ; } ) ;
rocksdb : : SyncPoint : : GetInstance ( ) - > EnableProcessing ( ) ;
// Create 3 L0 files, making score of L0 to be 3.
for ( int i = 0 ; i < 3 ; i + + ) {
for ( int i = 0 ; i < 3 ; i + + ) {
Put ( Key ( i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 100 - i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 100 - i ) , std : : string ( 5000 , ' x ' ) ) ;
// Flush the file. File size is around 30KB.
Flush ( ) ;
Flush ( ) ;
}
}
while ( compaction_cnt . load ( ) < 4 | | NumTableFilesAtLevel ( 0 ) > 0 ) {
ASSERT_TRUE ( dbfull ( ) - > TEST_write_controler ( ) . NeedsDelay ( ) ) ;
env_ - > SleepForMicroseconds ( 1000 ) ;
}
sleeping_task_low . WakeUp ( ) ;
sleeping_task_low . WaitUntilDone ( ) ;
sleeping_task_low . Reset ( ) ;
dbfull ( ) - > TEST_WaitForCompact ( ) ;
// Now there is one L1 file but doesn't trigger soft_rate_limit
// Now there is one L1 file but doesn't trigger soft_rate_limit
// The L1 file size is around 30KB.
ASSERT_EQ ( NumTableFilesAtLevel ( 1 ) , 1 ) ;
ASSERT_EQ ( NumTableFilesAtLevel ( 1 ) , 1 ) ;
ASSERT_EQ ( sleep_count . load ( ) , 0 ) ;
ASSERT_TRUE ( ! dbfull ( ) - > TEST_write_controler ( ) . NeedsDelay ( ) ) ;
// Only allow one compactin going through.
rocksdb : : SyncPoint : : GetInstance ( ) - > SetCallBack (
" BackgroundCallCompaction:0 " , [ & ] ( void * arg ) {
// Schedule a sleeping task.
sleeping_task_low . Reset ( ) ;
env_ - > Schedule ( & test : : SleepingBackgroundTask : : DoSleepTask ,
& sleeping_task_low , Env : : Priority : : LOW ) ;
} ) ;
rocksdb : : SyncPoint : : GetInstance ( ) - > EnableProcessing ( ) ;
env_ - > Schedule ( & test : : SleepingBackgroundTask : : DoSleepTask , & sleeping_task_low ,
Env : : Priority : : LOW ) ;
sleeping_task_low . WaitUntilSleeping ( ) ;
// Create 3 L0 files, making score of L0 to be 3
for ( int i = 0 ; i < 3 ; i + + ) {
for ( int i = 0 ; i < 3 ; i + + ) {
Put ( Key ( 10 + i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 10 + i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 90 - i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 90 - i ) , std : : string ( 5000 , ' x ' ) ) ;
// Flush the file. File size is around 30KB.
Flush ( ) ;
Flush ( ) ;
}
}
while ( compaction_cnt . load ( ) < 8 | | NumTableFilesAtLevel ( 0 ) > 0 ) {
env_ - > SleepForMicroseconds ( 1000 ) ;
// Wake up sleep task to enable compaction to run and waits
}
// for it to go to sleep state again to make sure one compaction
// goes through.
sleeping_task_low . WakeUp ( ) ;
sleeping_task_low . WaitUntilSleeping ( ) ;
// Now there is one L1 file (around 60KB) which exceeds 50KB base by 10KB
// Given level multiplier 10, estimated pending compaction is around 100KB
// doesn't trigger soft_pending_compaction_bytes_limit
ASSERT_EQ ( NumTableFilesAtLevel ( 1 ) , 1 ) ;
ASSERT_EQ ( NumTableFilesAtLevel ( 1 ) , 1 ) ;
ASSERT_EQ ( sleep_count . load ( ) , 0 ) ;
ASSERT_TRUE ( ! dbfull ( ) - > TEST_write_controler ( ) . NeedsDelay ( ) ) ;
// Slowdown is triggered now
// Create 3 L0 files, making score of L0 to be 3, higher than L0.
for ( int i = 0 ; i < 10 ; i + + ) {
for ( int i = 0 ; i < 3 ; i + + ) {
Put ( Key ( i ) , std : : string ( 100 , ' x ' ) ) ;
Put ( Key ( 20 + i ) , std : : string ( 5000 , ' x ' ) ) ;
Put ( Key ( 80 - i ) , std : : string ( 5000 , ' x ' ) ) ;
// Flush the file. File size is around 30KB.
Flush ( ) ;
}
}
ASSERT_GT ( sleep_count . load ( ) , 0 ) ;
// Wake up sleep task to enable compaction to run and waits
// for it to go to sleep state again to make sure one compaction
// goes through.
sleeping_task_low . WakeUp ( ) ;
sleeping_task_low . WaitUntilSleeping ( ) ;
{
// Now there is one L1 file (around 90KB) which exceeds 50KB base by 40KB
MutexLock l ( & mut ) ;
// Given level multiplier 10, estimated pending compaction is around 400KB
compaction_cnt . store ( 7 ) ;
// triggerring soft_pending_compaction_bytes_limit
cv . SignalAll ( ) ;
ASSERT_EQ ( NumTableFilesAtLevel ( 1 ) , 1 ) ;
}
ASSERT_TRUE ( dbfull ( ) - > TEST_write_controler ( ) . NeedsDelay ( ) ) ;
while ( NumTableFilesAtLevel ( 1 ) > 0 ) {
env_ - > SleepForMicroseconds ( 1000 ) ;
}
// Slowdown is not triggered any more.
sleeping_task_low . WakeUp ( ) ;
sleep_count . store ( 0 ) ;
sleeping_task_low . WaitUntilSleeping ( ) ;
// Slowdown is not triggered now
for ( int i = 0 ; i < 10 ; i + + ) {
ASSERT_TRUE ( ! dbfull ( ) - > TEST_write_controler ( ) . NeedsDelay ( ) ) ;
Put ( Key ( i ) , std : : string ( 100 , ' x ' ) ) ;
}
ASSERT_EQ ( sleep_count . load ( ) , 0 ) ;
// shrink level base so L2 will hit soft limit easier.
// shrink level base so L2 will hit soft limit easier.
ASSERT_OK ( dbfull ( ) - > SetOptions ( {
ASSERT_OK ( dbfull ( ) - > SetOptions ( {
{ " max_bytes_for_level_base " , " 5000 " } ,
{ " max_bytes_for_level_base " , " 5000 " } ,
} ) ) ;
} ) ) ;
compaction_cnt . store ( 7 ) ;
Flush ( ) ;
while ( NumTableFilesAtLevel ( 0 ) = = 0 ) {
env_ - > SleepForMicroseconds ( 1000 ) ;
}
// Slowdown is triggered now
for ( int i = 0 ; i < 10 ; i + + ) {
Put ( Key ( i ) , std : : string ( 100 , ' x ' ) ) ;
}
ASSERT_GT ( sleep_count . load ( ) , 0 ) ;
{
MutexLock l ( & mut ) ;
compaction_cnt . store ( 7 ) ;
cv . SignalAll ( ) ;
}
while ( NumTableFilesAtLevel ( 2 ) ! = 0 ) {
Put ( " " , " " ) ;
env_ - > SleepForMicroseconds ( 1000 ) ;
Flush ( ) ;
}
ASSERT_TRUE ( dbfull ( ) - > TEST_write_controler ( ) . NeedsDelay ( ) ) ;
// Slowdown is not triggered anymore
sleeping_task_low . WaitUntilSleeping ( ) ;
sleep_count . store ( 0 ) ;
for ( int i = 0 ; i < 10 ; i + + ) {
Put ( Key ( i ) , std : : string ( 100 , ' x ' ) ) ;
}
ASSERT_EQ ( sleep_count . load ( ) , 0 ) ;
rocksdb : : SyncPoint : : GetInstance ( ) - > DisableProcessing ( ) ;
rocksdb : : SyncPoint : : GetInstance ( ) - > DisableProcessing ( ) ;
sleeping_task_low . WakeUp ( ) ;
sleeping_task_low . WaitUntilDone ( ) ;
}
}
# endif // ROCKSDB_LITE
# endif // ROCKSDB_LITE