@ -420,10 +420,6 @@ static class std::shared_ptr<rocksdb::Statistics> dbstats;
DEFINE_int64 ( writes , - 1 , " Number of write operations to do. If negative, do "
DEFINE_int64 ( writes , - 1 , " Number of write operations to do. If negative, do "
" --num reads. " ) ;
" --num reads. " ) ;
DEFINE_int32 ( writes_per_second , 0 , " Per-thread rate limit on writes and merges "
" per second. No limit when <= 0. Only for the readwhilewriting "
" and readwhilemerging tests. " ) ;
DEFINE_bool ( sync , false , " Sync all writes to disk " ) ;
DEFINE_bool ( sync , false , " Sync all writes to disk " ) ;
DEFINE_bool ( disable_data_sync , false , " If true, do not wait until data is "
DEFINE_bool ( disable_data_sync , false , " If true, do not wait until data is "
@ -668,7 +664,8 @@ DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
DEFINE_uint64 (
DEFINE_uint64 (
benchmark_write_rate_limit , 0 ,
benchmark_write_rate_limit , 0 ,
" If non-zero, db_bench will rate-limit the writes going into RocksDB " ) ;
" If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
" is the global rate in bytes/second. " ) ;
DEFINE_int32 ( max_grandparent_overlap_factor , 10 , " Control maximum bytes of "
DEFINE_int32 ( max_grandparent_overlap_factor , 10 , " Control maximum bytes of "
" overlaps in grandparent (i.e., level+2) before we stop building a "
" overlaps in grandparent (i.e., level+2) before we stop building a "
@ -1312,6 +1309,11 @@ class Stats {
}
}
}
}
void ResetLastOpTime ( ) {
// Set to now to avoid latency from calls to SleepForMicroseconds
last_op_finish_ = FLAGS_env - > NowMicros ( ) ;
}
void FinishedOps ( DBWithColumnFamilies * db_with_cfh , DB * db , int64_t num_ops ,
void FinishedOps ( DBWithColumnFamilies * db_with_cfh , DB * db , int64_t num_ops ,
enum OperationType op_type = kOthers ) {
enum OperationType op_type = kOthers ) {
if ( reporter_agent_ ) {
if ( reporter_agent_ ) {
@ -1633,7 +1635,8 @@ class Benchmark {
( ( ( FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio )
( ( ( FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio )
* num_ )
* num_ )
/ 1048576.0 ) ) ;
/ 1048576.0 ) ) ;
fprintf ( stdout , " Writes per second: %d \n " , FLAGS_writes_per_second ) ;
fprintf ( stdout , " Write rate: % " PRIu64 " bytes/second \n " ,
FLAGS_benchmark_write_rate_limit ) ;
if ( FLAGS_enable_numa ) {
if ( FLAGS_enable_numa ) {
fprintf ( stderr , " Running in NUMA enabled mode. \n " ) ;
fprintf ( stderr , " Running in NUMA enabled mode. \n " ) ;
# ifndef NUMA
# ifndef NUMA
@ -2845,14 +2848,22 @@ class Benchmark {
}
}
}
}
}
}
size_t id = thread - > rand . Next ( ) % num_key_gens ;
size_t id = thread - > rand . Next ( ) % num_key_gens ;
DBWithColumnFamilies * db_with_cfh = SelectDBWithCfh ( id ) ;
DBWithColumnFamilies * db_with_cfh = SelectDBWithCfh ( id ) ;
batch . Clear ( ) ;
batch . Clear ( ) ;
if ( thread - > shared - > write_rate_limiter . get ( ) ! = nullptr ) {
thread - > shared - > write_rate_limiter - > Request (
entries_per_batch_ * ( value_size_ + key_size_ ) ,
Env : : IO_HIGH ) ;
// Set time at which last op finished to Now() to hide latency and
// sleep from rate limiter. Also, do the check once per batch, not
// once per write.
thread - > stats . ResetLastOpTime ( ) ;
}
for ( int64_t j = 0 ; j < entries_per_batch_ ; j + + ) {
for ( int64_t j = 0 ; j < entries_per_batch_ ; j + + ) {
if ( thread - > shared - > write_rate_limiter . get ( ) ! = nullptr ) {
thread - > shared - > write_rate_limiter - > Request ( value_size_ + key_size_ ,
Env : : IO_HIGH ) ;
}
int64_t rand_num = key_gens [ id ] - > Next ( ) ;
int64_t rand_num = key_gens [ id ] - > Next ( ) ;
GenerateKeyFromInt ( rand_num , FLAGS_num , & key ) ;
GenerateKeyFromInt ( rand_num , FLAGS_num , & key ) ;
if ( FLAGS_num_column_families < = 1 ) {
if ( FLAGS_num_column_families < = 1 ) {
@ -3253,16 +3264,13 @@ class Benchmark {
void BGWriter ( ThreadState * thread , enum OperationType write_merge ) {
void BGWriter ( ThreadState * thread , enum OperationType write_merge ) {
// Special thread that keeps writing until other threads are done.
// Special thread that keeps writing until other threads are done.
RandomGenerator gen ;
RandomGenerator gen ;
uint64_t last = FLAGS_env - > NowMicros ( ) ;
int writes_per_second_by_10 = 0 ;
int num_writes = 0 ;
int64_t bytes = 0 ;
int64_t bytes = 0 ;
// --writes_per_second rate limit is enforced per 100 milliseconds
std : : unique_ptr < RateLimiter > write_rate_limiter ;
// intervals to avoid a burst of writes at the start of each second.
if ( FLAGS_benchmark_write_rate_limit > 0 ) {
write_rate_limiter . reset (
if ( FLAGS_writes_per_second > 0 )
NewGenericRateLimiter ( FLAGS_benchmark_write_rate_limit ) ) ;
writes_per_second_by_10 = FLAGS_writes_per_second / 10 ;
}
// Don't merge stats from this thread with the readers.
// Don't merge stats from this thread with the readers.
thread - > stats . SetExcludeFromMerge ( ) ;
thread - > stats . SetExcludeFromMerge ( ) ;
@ -3296,18 +3304,10 @@ class Benchmark {
bytes + = key . size ( ) + value_size_ ;
bytes + = key . size ( ) + value_size_ ;
thread - > stats . FinishedOps ( & db_ , db_ . db , 1 , kWrite ) ;
thread - > stats . FinishedOps ( & db_ , db_ . db , 1 , kWrite ) ;
+ + num_writes ;
if ( FLAGS_benchmark_write_rate_limit > 0 ) {
if ( writes_per_second_by_10 & & num_writes > = writes_per_second_by_10 ) {
write_rate_limiter - > Request (
uint64_t now = FLAGS_env - > NowMicros ( ) ;
entries_per_batch_ * ( value_size_ + key_size_ ) ,
uint64_t usecs_since_last = now - last ;
Env : : IO_HIGH ) ;
num_writes = 0 ;
last = now ;
if ( usecs_since_last < 100000 ) {
FLAGS_env - > SleepForMicroseconds ( static_cast < int > ( 100000 - usecs_since_last ) ) ;
last = FLAGS_env - > NowMicros ( ) ;
}
}
}
}
}
thread - > stats . AddBytes ( bytes ) ;
thread - > stats . AddBytes ( bytes ) ;