@ -153,6 +153,13 @@ DEFINE_int64(merge_keys, -1,
" If negative, there will be FLAGS_num keys. " ) ;
// Total number of column families the benchmark works with. Values above 1
// switch the DB-opening code onto its column-family-aware path.
DEFINE_int32 ( num_column_families , 1 , " Number of Column Families to use. " ) ;
// Size of the "hot" window: how many column families are in use at any one
// moment. The DB-opening code only honours this value when it is > 0 and
// smaller than num_column_families; otherwise it overwrites the flag with
// num_column_families, so that every column family is hot from the start.
DEFINE_int32 (
num_hot_column_families , 8 ,
" Number of Hot Column Families. If more than 0, only write to this "
" number of column families. After finishing all the writes to them, "
" create new set of column families and insert to them. Only used "
" when num_column_families > 1. " ) ;
// Read operation count. Per the help text, a negative value means "use
// FLAGS_num instead".
DEFINE_int64 ( reads , - 1 , " Number of read operations to do. "
" If negative, do FLAGS_num reads. " ) ;
@ -390,6 +397,16 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
fprintf ( stdout , " Cannot parse compression type '%s' \n " , ctype ) ;
return rocksdb : : kSnappyCompression ; //default value
}
std : : string ColumnFamilyName ( int i ) {
if ( i = = 0 ) {
return rocksdb : : kDefaultColumnFamilyName ;
} else {
char name [ 100 ] ;
snprintf ( name , sizeof ( name ) , " column_family_name_%06d " , i ) ;
return std : : string ( name ) ;
}
}
} // namespace
DEFINE_string ( compression_type , " snappy " ,
@ -475,6 +492,7 @@ DEFINE_int32(source_compaction_factor, 1, "Cap the size of data in level-K for"
// Write-ahead-log knobs. When the benchmark builds its Options, each of the
// three values below is copied unchanged into the field of the same meaning
// (WAL_ttl_seconds, WAL_size_limit_MB and max_total_wal_size respectively).
DEFINE_uint64 ( wal_ttl_seconds , 0 , " Set the TTL for the WAL Files in seconds. " ) ;
DEFINE_uint64 ( wal_size_limit_MB , 0 , " Set the size limit for the WAL Files "
" in MB. " ) ;
DEFINE_uint64 ( max_total_wal_size , 0 , " Set total max WAL size " ) ;
// Defaults to whatever a default-constructed rocksdb::EnvOptions reports for
// use_os_buffer, so the benchmark follows the library default unless told
// otherwise.
DEFINE_bool ( bufferedio , rocksdb : : EnvOptions ( ) . use_os_buffer ,
" Allow buffered io using OS buffers " ) ;
@ -779,9 +797,50 @@ static void AppendWithSpace(std::string* str, Slice msg) {
struct DBWithColumnFamilies {
std : : vector < ColumnFamilyHandle * > cfh ;
DB * db ;
std : : atomic < size_t > num_created ; // Need to be updated after all the
// new entries in cfh are set.
size_t num_hot ; // Number of column families to be queried at each moment.
// After each CreateNewCf(), another num_hot number of new
// Column families will be created and used to be queried.
port : : Mutex create_cf_mutex ; // Only one thread can execute CreateNewCf()
DBWithColumnFamilies ( ) : db ( nullptr ) {
cfh . clear ( ) ;
}
DBWithColumnFamilies ( const DBWithColumnFamilies & other )
: cfh ( other . cfh ) ,
db ( other . db ) ,
num_created ( other . num_created . load ( ) ) ,
num_hot ( other . num_hot ) { }
ColumnFamilyHandle * GetCfh ( int64_t rand_num ) {
assert ( num_hot > 0 ) ;
return cfh [ num_created . load ( std : : memory_order_acquire ) - num_hot +
rand_num % num_hot ] ;
}
// stage: assume CF from 0 to stage * num_hot has be created. Need to create
// stage * num_hot + 1 to stage * (num_hot + 1).
void CreateNewCf ( ColumnFamilyOptions options , int64_t stage ) {
MutexLock l ( & create_cf_mutex ) ;
if ( ( stage + 1 ) * num_hot < = num_created ) {
// Already created.
return ;
}
auto new_num_created = num_created + num_hot ;
assert ( new_num_created < = cfh . size ( ) ) ;
for ( size_t i = num_created ; i < new_num_created ; i + + ) {
Status s =
db - > CreateColumnFamily ( options , ColumnFamilyName ( i ) , & ( cfh [ i ] ) ) ;
if ( ! s . ok ( ) ) {
fprintf ( stderr , " create column family error: %s \n " ,
s . ToString ( ) . c_str ( ) ) ;
abort ( ) ;
}
}
num_created . store ( new_num_created , std : : memory_order_release ) ;
}
} ;
class Stats {
@ -888,8 +947,8 @@ class Stats {
if ( FLAGS_stats_per_interval ) {
std : : string stats ;
if ( db_with_cfh & & db_with_cfh - > cfh . size ( ) ) {
for ( size_t i = 0 ; i < db_with_cfh - > cfh . size ( ) ; + + i ) {
if ( db_with_cfh & & db_with_cfh - > num_created . load ( ) ) {
for ( size_t i = 0 ; i < db_with_cfh - > num_created . load ( ) ; + + i ) {
if ( db - > GetProperty ( db_with_cfh - > cfh [ i ] , " rocksdb.cfstats " ,
& stats ) )
fprintf ( stderr , " %s \n " , stats . c_str ( ) ) ;
@ -994,13 +1053,16 @@ struct ThreadState {
class Duration {
public :
// Records the limits of a benchmark run and starts its clock.
//   max_seconds   - stored in max_seconds_ as the time limit.
//   max_ops       - stored in max_ops_ as the operation limit.
//   ops_per_stage - number of operations that make up one stage, as used by
//                   GetStage(). A value <= 0 falls back to max_ops, which
//                   makes the whole run a single stage.
// The operation counter starts at zero and the start time is taken from
// FLAGS_env->NowMicros().
Duration ( int max_seconds , int64_t max_ops ) {
Duration ( int max_seconds , int64_t max_ops , int64_t ops_per_stage = 0 ) {
max_seconds_ = max_seconds ;
max_ops_ = max_ops ;
// Non-positive ops_per_stage means "no staging": one stage spans all max_ops.
ops_per_stage_ = ( ops_per_stage > 0 ) ? ops_per_stage : max_ops ;
ops_ = 0 ;
start_at_ = FLAGS_env - > NowMicros ( ) ;
}
int64_t GetStage ( ) { return std : : min ( ops_ , max_ops_ - 1 ) / ops_per_stage_ ; }
bool Done ( int64_t increment ) {
if ( increment < = 0 ) increment = 1 ; // avoid Done(0) and infinite loops
ops_ + = increment ;
@ -1021,6 +1083,7 @@ class Duration {
private :
int max_seconds_ ;
int64_t max_ops_ ;
int64_t ops_per_stage_ ;
int64_t ops_ ;
double start_at_ ;
} ;
@ -1040,6 +1103,7 @@ class Benchmark {
int64_t keys_per_prefix_ ;
int64_t entries_per_batch_ ;
WriteOptions write_options_ ;
Options open_options_ ; // keep options around to properly destroy db later
int64_t reads_ ;
int64_t writes_ ;
int64_t readwrites_ ;
@ -1355,24 +1419,12 @@ class Benchmark {
return base_name + ToString ( id ) ;
}
std : : string ColumnFamilyName ( int i ) {
if ( i = = 0 ) {
return kDefaultColumnFamilyName ;
} else {
char name [ 100 ] ;
snprintf ( name , sizeof ( name ) , " column_family_name_%06d " , i ) ;
return std : : string ( name ) ;
}
}
void Run ( ) {
Options open_options ; // keep options around to properly destroy db later
if ( ! SanityCheck ( ) ) {
exit ( 1 ) ;
}
PrintHeader ( ) ;
Open ( & open_options ) ;
Open ( & open_options_ ) ;
const char * benchmarks = FLAGS_benchmarks . c_str ( ) ;
while ( benchmarks ! = nullptr ) {
const char * sep = strchr ( benchmarks , ' , ' ) ;
@ -1533,15 +1585,15 @@ class Benchmark {
delete db_ . db ;
db_ . db = nullptr ;
db_ . cfh . clear ( ) ;
DestroyDB ( FLAGS_db , open_options ) ;
DestroyDB ( FLAGS_db , open_options_ ) ;
}
for ( size_t i = 0 ; i < multi_dbs_ . size ( ) ; i + + ) {
delete multi_dbs_ [ i ] . db ;
DestroyDB ( GetDbNameForMultiple ( FLAGS_db , i ) , open_options ) ;
DestroyDB ( GetDbNameForMultiple ( FLAGS_db , i ) , open_options_ ) ;
}
multi_dbs_ . clear ( ) ;
}
Open ( & open_options ) ; // use open_options for the last accessed
Open ( & open_options_ ) ; // use open_options for the last accessed
}
if ( method ! = nullptr ) {
@ -1996,6 +2048,8 @@ class Benchmark {
options . compression_opts . level = FLAGS_compression_level ;
options . WAL_ttl_seconds = FLAGS_wal_ttl_seconds ;
options . WAL_size_limit_MB = FLAGS_wal_size_limit_MB ;
options . max_total_wal_size = FLAGS_max_total_wal_size ;
if ( FLAGS_min_level_to_compress > = 0 ) {
assert ( FLAGS_min_level_to_compress < = FLAGS_num_levels ) ;
options . compression_per_level . resize ( FLAGS_num_levels ) ;
@ -2077,9 +2131,15 @@ class Benchmark {
Status s ;
// Open with column families if necessary.
if ( FLAGS_num_column_families > 1 ) {
db - > cfh . resize ( FLAGS_num_column_families ) ;
size_t num_hot = FLAGS_num_column_families ;
if ( FLAGS_num_hot_column_families > 0 & &
FLAGS_num_hot_column_families < FLAGS_num_column_families ) {
num_hot = FLAGS_num_hot_column_families ;
} else {
FLAGS_num_hot_column_families = FLAGS_num_column_families ;
}
std : : vector < ColumnFamilyDescriptor > column_families ;
for ( int i = 0 ; i < FLAGS_num_column_families ; i + + ) {
for ( size_ t i = 0 ; i < num_hot ; i + + ) {
column_families . push_back ( ColumnFamilyDescriptor (
ColumnFamilyName ( i ) , ColumnFamilyOptions ( options ) ) ) ;
}
@ -2089,6 +2149,10 @@ class Benchmark {
} else {
s = DB : : Open ( options , db_name , column_families , & db - > cfh , & db - > db ) ;
}
db - > cfh . resize ( FLAGS_num_column_families ) ;
db - > num_created = num_hot ;
db - > num_hot = num_hot ;
} else if ( FLAGS_readonly ) {
s = DB : : OpenForReadOnly ( options , db_name , & db - > db ) ;
} else {
@ -2185,9 +2249,18 @@ class Benchmark {
num_key_gens = multi_dbs_ . size ( ) ;
}
std : : vector < std : : unique_ptr < KeyGenerator > > key_gens ( num_key_gens ) ;
Duration duration ( test_duration , num_ops * num_key_gens ) ;
int64_t max_ops = num_ops * num_key_gens ;
int64_t ops_per_stage = max_ops ;
if ( FLAGS_num_column_families > 1 & & FLAGS_num_hot_column_families > 0 ) {
ops_per_stage = ( max_ops - 1 ) / ( FLAGS_num_column_families /
FLAGS_num_hot_column_families ) +
1 ;
}
Duration duration ( test_duration , max_ops , ops_per_stage ) ;
for ( size_t i = 0 ; i < num_key_gens ; i + + ) {
key_gens [ i ] . reset ( new KeyGenerator ( & ( thread - > rand ) , write_mode , num_ops ) ) ;
key_gens [ i ] . reset ( new KeyGenerator ( & ( thread - > rand ) , write_mode , num_ops ,
ops_per_stage ) ) ;
}
if ( num_ ! = FLAGS_num ) {
@ -2203,7 +2276,18 @@ class Benchmark {
Slice key = AllocateKey ( ) ;
std : : unique_ptr < const char [ ] > key_guard ( key . data ( ) ) ;
int64_t stage = 0 ;
while ( ! duration . Done ( entries_per_batch_ ) ) {
if ( duration . GetStage ( ) ! = stage ) {
stage = duration . GetStage ( ) ;
if ( db_ . db ! = nullptr ) {
db_ . CreateNewCf ( open_options_ , stage ) ;
} else {
for ( auto & db : multi_dbs_ ) {
db . CreateNewCf ( open_options_ , stage ) ;
}
}
}
size_t id = thread - > rand . Next ( ) % num_key_gens ;
DBWithColumnFamilies * db_with_cfh = SelectDBWithCfh ( id ) ;
batch . Clear ( ) ;
@ -2216,8 +2300,8 @@ class Benchmark {
// We use the same rand_num as the seed for both the key and the column
// family, so that a reader can deterministically find the cfh that
// corresponds to a particular key.
batch . Put ( db_with_cfh - > cfh [ rand_num % db_with_cfh - > cfh . size ( ) ] ,
key , gen . Generate ( value_size_ ) ) ;
batch . Put ( db_with_cfh - > GetCfh ( rand_num ) , key ,
gen . Generate ( value_size_ ) ) ;
}
bytes + = value_size_ + key_size_ ;
}
@ -2343,8 +2427,8 @@ class Benchmark {
read + + ;
Status s ;
if ( FLAGS_num_column_families > 1 ) {
s = db_with_cfh - > db - > Get ( options ,
db_with_cfh - > cfh [ key_rand % db_with_cfh - > cfh . size ( ) ] , key , & value ) ;
s = db_with_cfh - > db - > Get ( options , db_with_cfh - > GetCfh ( key_rand ) , key ,
& value ) ;
} else {
s = db_with_cfh - > db - > Get ( options , key , & value ) ;
}