@ -60,12 +60,14 @@ static bool ValidateUint32Range(const char* flagname, uint64_t value) {
return true ;
}
DEFINE_uint64 ( seed , 2341234 , " Seed for PRNG " ) ;
static const bool FLAGS_seed_dummy =
google : : RegisterFlagValidator ( & FLAGS_seed , & ValidateUint32Range ) ;
static const bool FLAGS_seed_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_seed , & ValidateUint32Range ) ;
DEFINE_int64 ( max_key , 1 * KB * KB * KB ,
DEFINE_int64 ( max_key , 1 * KB * KB ,
" Max number of key/values to place in database " ) ;
DEFINE_int32 ( column_families , 10 , " Number of column families " ) ;
DEFINE_bool ( test_batches_snapshots , false ,
" If set, the test uses MultiGet(), MultiPut() and MultiDelete() "
" which read/write/delete multiple keys in a batch. In this mode, "
@ -146,6 +148,10 @@ DEFINE_int32(max_background_compactions,
" The maximum number of concurrent background compactions "
" that can occur in parallel. " ) ;
DEFINE_int32 ( max_background_flushes , rocksdb : : Options ( ) . max_background_flushes ,
" The maximum number of concurrent background flushes "
" that can occur in parallel. " ) ;
DEFINE_int32 ( universal_size_ratio , 0 , " The ratio of file sizes that trigger "
" compaction in universal style " ) ;
@ -158,6 +164,11 @@ DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
DEFINE_int32 ( universal_max_size_amplification_percent , 0 ,
" The max size amplification for universal style compaction " ) ;
DEFINE_int32 ( clear_column_family_one_in , 1000000 ,
" With a chance of 1/N, delete a column family and then recreate "
" it again. If N == 0, never drop/create column families. "
" When test_batches_snapshots is true, this flag has no effect " ) ;
DEFINE_int64 ( cache_size , 2 * KB * KB * KB ,
" Number of bytes to use as a cache of uncompressed data. " ) ;
@ -170,8 +181,8 @@ static bool ValidateInt32Positive(const char* flagname, int32_t value) {
return true ;
}
DEFINE_int32 ( reopen , 10 , " Number of times database reopens " ) ;
static const bool FLAGS_reopen_dummy =
google : : RegisterFlagValidator ( & FLAGS_reopen , & ValidateInt32Positive ) ;
static const bool FLAGS_reopen_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_reopen , & ValidateInt32Positive ) ;
DEFINE_int32 ( bloom_bits , 10 , " Bloom filter bits per key. "
" Negative means use default settings. " ) ;
@ -198,9 +209,9 @@ DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
DEFINE_int32 ( kill_random_test , 0 ,
" If non-zero, kill at various points in source code with "
" probability 1/this " ) ;
static const bool FLAGS_kill_random_test_dummy =
google : : RegisterFlagValidator ( & FLAGS_kill_random_test ,
& ValidateInt32Positive ) ;
static const bool FLAGS_kill_random_test_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_kill_random_test ,
& ValidateInt32Positive ) ;
extern int rocksdb_kill_odds ;
DEFINE_bool ( disable_wal , false , " If true, do not write WAL for write. " ) ;
@ -226,42 +237,37 @@ static bool ValidateInt32Percent(const char* flagname, int32_t value) {
}
DEFINE_int32 ( readpercent , 10 ,
" Ratio of reads to total workload (expressed as a percentage) " ) ;
static const bool FLAGS_readpercent_dummy =
google : : RegisterFlagValidator ( & FLAGS_readpercent , & ValidateInt32Percent ) ;
static const bool FLAGS_readpercent_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_readpercent , & ValidateInt32Percent ) ;
DEFINE_int32 ( prefixpercent , 20 ,
" Ratio of prefix iterators to total workload (expressed as a "
" percentage) " ) ;
static const bool FLAGS_prefixpercent_dummy =
google : : RegisterFlagValidator ( & FLAGS_prefixpercent , & ValidateInt32Percent ) ;
static const bool FLAGS_prefixpercent_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_prefixpercent , & ValidateInt32Percent ) ;
DEFINE_int32 ( writepercent , 45 ,
" Ratio of deletes to total workload (expressed as a percentage) " ) ;
static const bool FLAGS_writepercent_dummy =
google : : RegisterFlagValidator ( & FLAGS_writepercent , & ValidateInt32Percent ) ;
static const bool FLAGS_writepercent_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_writepercent , & ValidateInt32Percent ) ;
DEFINE_int32 ( delpercent , 15 ,
" Ratio of deletes to total workload (expressed as a percentage) " ) ;
static const bool FLAGS_delpercent_dummy =
google : : RegisterFlagValidator ( & FLAGS_delpercent , & ValidateInt32Percent ) ;
static const bool FLAGS_delpercent_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_delpercent , & ValidateInt32Percent ) ;
DEFINE_int32 ( iterpercent , 10 , " Ratio of iterations to total workload "
" (expressed as a percentage) " ) ;
static const bool FLAGS_iterpercent_dummy =
google : : RegisterFlagValidator ( & FLAGS_iterpercent , & ValidateInt32Percent ) ;
static const bool FLAGS_iterpercent_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_iterpercent , & ValidateInt32Percent ) ;
DEFINE_uint64 ( num_iterations , 10 , " Number of iterations per MultiIterate run " ) ;
static const bool FLAGS_num_iterations_dummy =
google : : RegisterFlagValidator ( & FLAGS_num_iterations , & ValidateUint32Range ) ;
static const bool FLAGS_num_iterations_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_num_iterations , & ValidateUint32Range ) ;
DEFINE_bool ( disable_seek_compaction , false ,
" Option to disable compation triggered by read. " ) ;
DEFINE_uint64 ( delete_obsolete_files_period_micros , 0 ,
" Option to delete obsolete files periodically "
" 0 means that obsolete files are "
" deleted after every compaction run. " ) ;
enum rocksdb : : CompressionType StringToCompressionType ( const char * ctype ) {
assert ( ctype ) ;
@ -290,21 +296,21 @@ DEFINE_string(hdfs, "", "Name of hdfs environment");
// posix or hdfs environment
static rocksdb : : Env * FLAGS_env = rocksdb : : Env : : Default ( ) ;
DEFINE_uint64 ( ops_per_thread , 6 00000, " Number of operations per thread. " ) ;
static const bool FLAGS_ops_per_thread_dummy =
google : : RegisterFlagValidator ( & FLAGS_ops_per_thread , & ValidateUint32Range ) ;
DEFINE_uint64 ( ops_per_thread , 12 00000, " Number of operations per thread. " ) ;
static const bool FLAGS_ops_per_thread_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_ops_per_thread , & ValidateUint32Range ) ;
DEFINE_uint64 ( log2_keys_per_lock , 2 , " Log2 of number of keys per lock " ) ;
static const bool FLAGS_log2_keys_per_lock_dummy =
google : : RegisterFlagValidator ( & FLAGS_log2_keys_per_lock ,
& ValidateUint32Range ) ;
static const bool FLAGS_log2_keys_per_lock_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_log2_keys_per_lock ,
& ValidateUint32Range ) ;
DEFINE_int32 ( purge_redundant_percent , 50 ,
" Percentage of times we want to purge redundant keys in memory "
" before flushing " ) ;
static const bool FLAGS_purge_redundant_percent_dummy =
google : : RegisterFlagValidator ( & FLAGS_purge_redundant_percent ,
& ValidateInt32Percent ) ;
static const bool FLAGS_purge_redundant_percent_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_purge_redundant_percent ,
& ValidateInt32Percent ) ;
DEFINE_bool ( filter_deletes , false , " On true, deletes use KeyMayExist to drop "
" the delete if key not present " ) ;
@ -339,8 +345,8 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) {
return true ;
}
DEFINE_int32 ( prefix_size , 0 , " Control the prefix size for HashSkipListRep " ) ;
static const bool FLAGS_prefix_size_dummy =
google : : RegisterFlagValidator ( & FLAGS_prefix_size , & ValidatePrefixSize ) ;
static const bool FLAGS_prefix_size_dummy __attribute__ ( ( unused ) ) =
google : : RegisterFlagValidator ( & FLAGS_prefix_size , & ValidatePrefixSize ) ;
DEFINE_bool ( use_merge , false , " On true, replaces all writes with a Merge "
" that behaves like a Put " ) ;
@ -531,28 +537,27 @@ class SharedState {
start_verify_ ( false ) ,
stress_test_ ( stress_test ) {
if ( FLAGS_test_batches_snapshots ) {
key_locks_ = nullptr ;
values_ = nullptr ;
fprintf ( stdout , " No lock creation because test_batches_snapshots set \n " ) ;
return ;
}
values_ = new uint32_t [ max_key_ ] ;
for ( long i = 0 ; i < max_key_ ; i + + ) {
values_ [ i ] = SENTINEL ;
values_ . resize ( FLAGS_column_families ) ;
for ( int i = 0 ; i < FLAGS_column_families ; + + i ) {
values_ [ i ] = std : : vector < uint32_t > ( max_key_ , SENTINEL ) ;
}
long num_locks = ( max_key_ > > log2_keys_per_lock_ ) ;
if ( max_key_ & ( ( 1 < < log2_keys_per_lock_ ) - 1 ) ) {
num_locks + + ;
num_locks + + ;
}
fprintf ( stdout , " Creating %ld locks \n " , num_locks * FLAGS_column_families ) ;
key_locks_ . resize ( FLAGS_column_families ) ;
for ( int i = 0 ; i < FLAGS_column_families ; + + i ) {
key_locks_ [ i ] = std : : vector < port : : Mutex > ( num_locks ) ;
}
fprintf ( stdout , " Creating %ld locks \n " , num_locks ) ;
key_locks_ = new port : : Mutex [ num_locks ] ;
}
~ SharedState ( ) {
delete [ ] values_ ;
delete [ ] key_locks_ ;
}
~ SharedState ( ) { }
port : : Mutex * GetMutex ( ) {
return & mu_ ;
@ -622,26 +627,36 @@ class SharedState {
return start_verify_ ;
}
port : : Mutex * GetMutexForKey ( long key ) {
return & key_locks_ [ key > > log2_keys_per_lock_ ] ;
port : : Mutex * GetMutexForKey ( int cf , long key ) {
return & key_locks_ [ cf ] [ key > > log2_keys_per_lock_ ] ;
}
void Put ( long key , uint32_t value_base ) {
values_ [ key ] = value_base ;
void LockColumnFamily ( int cf ) {
for ( auto & mutex : key_locks_ [ cf ] ) {
mutex . Lock ( ) ;
}
}
uint32_t Get ( long key ) const {
return values_ [ key ] ;
void UnlockColumnFamily ( int cf ) {
for ( auto & mutex : key_locks_ [ cf ] ) {
mutex . Unlock ( ) ;
}
}
void Delete ( long key ) const {
values_ [ key ] = SENTINEL ;
void ClearColumnFamily ( int cf ) {
std : : fill ( values_ [ cf ] . begin ( ) , values_ [ cf ] . end ( ) , SENTINEL ) ;
}
uint32_t GetSeed ( ) const {
return seed_ ;
void Put ( int cf , long key , uint32_t value_base ) {
values_ [ cf ] [ key ] = value_base ;
}
uint32_t Get ( int cf , long key ) const { return values_ [ cf ] [ key ] ; }
void Delete ( int cf , long key ) { values_ [ cf ] [ key ] = SENTINEL ; }
uint32_t GetSeed ( ) const { return seed_ ; }
private :
port : : Mutex mu_ ;
port : : CondVar cv_ ;
@ -657,9 +672,8 @@ class SharedState {
bool start_verify_ ;
StressTest * stress_test_ ;
uint32_t * values_ ;
port : : Mutex * key_locks_ ;
std : : vector < std : : vector < uint32_t > > values_ ;
std : : vector < std : : vector < port : : Mutex > > key_locks_ ;
} ;
// Per-thread state for concurrent executions of the same benchmark.
@ -682,13 +696,14 @@ class StressTest {
public :
StressTest ( )
: cache_ ( NewLRUCache ( FLAGS_cache_size ) ) ,
compressed_cache_ ( FLAGS_compressed_cache_size > = 0 ?
NewLRUCache ( FLAGS_compressed_cache_size ) :
nullptr ) ,
compressed_cache_ ( FLAGS_compressed_cache_size > = 0
? NewLRUCache ( FLAGS_compressed_cache_size )
: nullptr ) ,
filter_policy_ ( FLAGS_bloom_bits > = 0
? NewBloomFilterPolicy ( FLAGS_bloom_bits )
: nullptr ) ,
? NewBloomFilterPolicy ( FLAGS_bloom_bits )
: nullptr ) ,
db_ ( nullptr ) ,
new_column_family_name_ ( 0 ) ,
num_times_reopened_ ( 0 ) {
if ( FLAGS_destroy_db_initially ) {
std : : vector < std : : string > files ;
@ -703,6 +718,10 @@ class StressTest {
}
~ StressTest ( ) {
for ( auto cf : column_families_ ) {
delete cf ;
}
column_families_ . clear ( ) ;
delete db_ ;
delete filter_policy_ ;
}
@ -817,9 +836,9 @@ class StressTest {
// Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
// ("9"+K, "9"+V) in DB atomically i.e in a single batch.
// Also refer MultiGet.
Status MultiPut ( ThreadState * thread ,
const WriteOptions & writeoptions ,
const Slice & key , const Slice & value , size_t sz ) {
Status MultiPut ( ThreadState * thread , const WriteOptions & writeoptions ,
ColumnFamilyHandle * column_family , const Slice & key ,
const Slice & value , size_t sz ) {
std : : string keys [ 10 ] = { " 9 " , " 8 " , " 7 " , " 6 " , " 5 " ,
" 4 " , " 3 " , " 2 " , " 1 " , " 0 " } ;
std : : string values [ 10 ] = { " 9 " , " 8 " , " 7 " , " 6 " , " 5 " ,
@ -832,9 +851,9 @@ class StressTest {
values [ i ] + = value . ToString ( ) ;
value_slices [ i ] = values [ i ] ;
if ( FLAGS_use_merge ) {
batch . Merge ( keys [ i ] , value_slices [ i ] ) ;
batch . Merge ( column_family - > GetID ( ) , keys [ i ] , value_slices [ i ] ) ;
} else {
batch . Put ( keys [ i ] , value_slices [ i ] ) ;
batch . Put ( column_family - > GetID ( ) , keys [ i ] , value_slices [ i ] ) ;
}
}
@ -852,9 +871,8 @@ class StressTest {
// Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K)
// in DB atomically i.e in a single batch. Also refer MultiGet.
Status MultiDelete ( ThreadState * thread ,
const WriteOptions & writeoptions ,
const Slice & key ) {
Status MultiDelete ( ThreadState * thread , const WriteOptions & writeoptions ,
ColumnFamilyHandle * column_family , const Slice & key ) {
std : : string keys [ 10 ] = { " 9 " , " 7 " , " 5 " , " 3 " , " 1 " ,
" 8 " , " 6 " , " 4 " , " 2 " , " 0 " } ;
@ -862,7 +880,7 @@ class StressTest {
Status s ;
for ( int i = 0 ; i < 10 ; i + + ) {
keys [ i ] + = key . ToString ( ) ;
batch . Delete ( keys [ i ] ) ;
batch . Delete ( column_family - > GetID ( ) , keys [ i ] ) ;
}
s = db_ - > Write ( writeoptions , & batch ) ;
@ -880,9 +898,9 @@ class StressTest {
// in the same snapshot, and verifies that all the values are of the form
// "0"+V, "1"+V,..."9"+V.
// ASSUMES that MultiPut was used to put (K, V) into the DB.
Status MultiGet ( ThreadState * thread ,
const ReadOptions & readoptions ,
const Slice & key , std : : string * value ) {
Status MultiGet ( ThreadState * thread , const ReadOptions & readoptions ,
ColumnFamilyHandle * column_family , const Slice & key ,
std : : string * value ) {
std : : string keys [ 10 ] = { " 0 " , " 1 " , " 2 " , " 3 " , " 4 " , " 5 " , " 6 " , " 7 " , " 8 " , " 9 " } ;
Slice key_slices [ 10 ] ;
std : : string values [ 10 ] ;
@ -892,7 +910,7 @@ class StressTest {
for ( int i = 0 ; i < 10 ; i + + ) {
keys [ i ] + = key . ToString ( ) ;
key_slices [ i ] = keys [ i ] ;
s = db_ - > Get ( readoptionscopy , key_slices [ i ] , value ) ;
s = db_ - > Get ( readoptionscopy , column_family , key_slices [ i ] , value ) ;
if ( ! s . ok ( ) & & ! s . IsNotFound ( ) ) {
fprintf ( stderr , " get error: %s \n " , s . ToString ( ) . c_str ( ) ) ;
values [ i ] = " " ;
@ -937,8 +955,8 @@ class StressTest {
// for each index i that all the i'th values are of the form "0"+V,
// "1"+V,..."9"+V.
// ASSUMES that MultiPut was used to put (K, V)
Status MultiPrefixScan ( ThreadState * thread ,
const ReadOptions & readoptions ,
Status MultiPrefixScan ( ThreadState * thread , const ReadOptions & readoptions ,
ColumnFamilyHandle * column_family ,
const Slice & prefix ) {
std : : string prefixes [ 10 ] = { " 0 " , " 1 " , " 2 " , " 3 " , " 4 " ,
" 5 " , " 6 " , " 7 " , " 8 " , " 9 " } ;
@ -953,7 +971,7 @@ class StressTest {
readoptionscopy [ i ] = readoptions ;
readoptionscopy [ i ] . prefix = & prefix_slices [ i ] ;
readoptionscopy [ i ] . snapshot = snapshot ;
iters [ i ] = db_ - > NewIterator ( readoptionscopy [ i ] ) ;
iters [ i ] = db_ - > NewIterator ( readoptionscopy [ i ] , column_family ) ;
iters [ i ] - > SeekToFirst ( ) ;
}
@ -1009,14 +1027,13 @@ class StressTest {
// Given a key K, this creates an iterator which scans to K and then
// does a random sequence of Next/Prev operations.
Status MultiIterate ( ThreadState * thread ,
const ReadOptions & readoptions ,
const Slice & key ) {
Status MultiIterate ( ThreadState * thread , const ReadOptions & readoptions ,
ColumnFamilyHandle * column_family , const Slice & key ) {
Status s ;
const Snapshot * snapshot = db_ - > GetSnapshot ( ) ;
ReadOptions readoptionscopy = readoptions ;
readoptionscopy . snapshot = snapshot ;
unique_ptr < Iterator > iter ( db_ - > NewIterator ( readoptionscopy ) ) ;
unique_ptr < Iterator > iter ( db_ - > NewIterator ( readoptionscopy , column_family ) ) ;
iter - > Seek ( key ) ;
for ( uint64_t i = 0 ; i < FLAGS_num_iterations & & iter - > Valid ( ) ; i + + ) {
@ -1071,15 +1088,50 @@ class StressTest {
}
}
if ( ! FLAGS_test_batches_snapshots & &
FLAGS_clear_column_family_one_in ! = 0 ) {
if ( thread - > rand . OneIn ( FLAGS_clear_column_family_one_in ) ) {
// drop column family and then create it again (can't drop default)
int cf = thread - > rand . Next ( ) % ( FLAGS_column_families - 1 ) + 1 ;
std : : string new_name =
std : : to_string ( new_column_family_name_ . fetch_add ( 1 ) ) ;
{
MutexLock l ( thread - > shared - > GetMutex ( ) ) ;
fprintf (
stdout ,
" [CF %d] Dropping and recreating column family. new name: %s \n " ,
cf , new_name . c_str ( ) ) ;
}
thread - > shared - > LockColumnFamily ( cf ) ;
Status s __attribute__ ( ( unused ) ) ;
s = db_ - > DropColumnFamily ( column_families_ [ cf ] ) ;
delete column_families_ [ cf ] ;
assert ( s . ok ( ) ) ;
s = db_ - > CreateColumnFamily ( ColumnFamilyOptions ( options_ ) , new_name ,
& column_families_ [ cf ] ) ;
column_family_names_ [ cf ] = new_name ;
thread - > shared - > ClearColumnFamily ( cf ) ;
assert ( s . ok ( ) ) ;
thread - > shared - > UnlockColumnFamily ( cf ) ;
}
}
long rand_key = thread - > rand . Next ( ) % max_key ;
int rand_column_family = thread - > rand . Next ( ) % FLAGS_column_families ;
std : : string keystr = Key ( rand_key ) ;
Slice key = keystr ;
int prob_op = thread - > rand . Uniform ( 100 ) ;
std : : unique_ptr < MutexLock > l ;
if ( ! FLAGS_test_batches_snapshots ) {
l . reset ( new MutexLock (
thread - > shared - > GetMutexForKey ( rand_column_family , rand_key ) ) ) ;
}
auto column_family = column_families_ [ rand_column_family ] ;
if ( prob_op > = 0 & & prob_op < ( int ) FLAGS_readpercent ) {
// OPERATION read
if ( ! FLAGS_test_batches_snapshots ) {
Status s = db_ - > Get ( read_opts , key , & from_db ) ;
Status s = db_ - > Get ( read_opts , column_family , key , & from_db ) ;
if ( s . ok ( ) ) {
// found case
thread - > stats . AddGets ( 1 , 1 ) ;
@ -1091,7 +1143,7 @@ class StressTest {
thread - > stats . AddErrors ( 1 ) ;
}
} else {
MultiGet ( thread , read_opts , key , & from_db ) ;
MultiGet ( thread , read_opts , column_family , key , & from_db ) ;
}
} else if ( ( int ) FLAGS_readpercent < = prob_op & & prob_op < prefixBound ) {
// OPERATION prefix scan
@ -1101,7 +1153,7 @@ class StressTest {
Slice prefix = Slice ( key . data ( ) , key . size ( ) - 1 ) ;
if ( ! FLAGS_test_batches_snapshots ) {
read_opts . prefix = & prefix ;
Iterator * iter = db_ - > NewIterator ( read_opts ) ;
Iterator * iter = db_ - > NewIterator ( read_opts , column_family ) ;
int count = 0 ;
for ( iter - > SeekToFirst ( ) ; iter - > Valid ( ) ; iter - > Next ( ) ) {
assert ( iter - > key ( ) . starts_with ( prefix ) ) ;
@ -1115,7 +1167,7 @@ class StressTest {
}
delete iter ;
} else {
MultiPrefixScan ( thread , read_opts , prefix ) ;
MultiPrefixScan ( thread , read_opts , column_family , prefix ) ;
}
read_opts . prefix = nullptr ;
} else if ( prefixBound < = prob_op & & prob_op < writeBound ) {
@ -1124,42 +1176,36 @@ class StressTest {
size_t sz = GenerateValue ( value_base , value , sizeof ( value ) ) ;
Slice v ( value , sz ) ;
if ( ! FLAGS_test_batches_snapshots ) {
MutexLock l ( thread - > shared - > GetMutexForKey ( rand_key ) ) ;
if ( FLAGS_verify_before_write ) {
std : : string keystr2 = Key ( rand_key ) ;
Slice k = keystr2 ;
Status s = db_ - > Get ( read_opts , k , & from_db ) ;
VerifyValue ( rand_key ,
read_opts ,
* ( thread - > shared ) ,
from_db ,
s ,
true ) ;
Status s = db_ - > Get ( read_opts , column_family , k , & from_db ) ;
VerifyValue ( rand_column_family , rand_key , read_opts ,
* ( thread - > shared ) , from_db , s , true ) ;
}
thread - > shared - > Put ( rand_key , value_base ) ;
thread - > shared - > Put ( rand_column_family , rand_key , value_base ) ;
if ( FLAGS_use_merge ) {
db_ - > Merge ( write_opts , key , v ) ;
db_ - > Merge ( write_opts , column_family , key , v ) ;
} else {
db_ - > Put ( write_opts , key , v ) ;
db_ - > Put ( write_opts , column_family , key , v ) ;
}
thread - > stats . AddBytesForWrites ( 1 , sz ) ;
} else {
MultiPut ( thread , write_opts , key , v , sz ) ;
MultiPut ( thread , write_opts , column_family , key , v , sz ) ;
}
PrintKeyValue ( rand_key , value , sz ) ;
PrintKeyValue ( rand_column_family , rand_ key , value , sz ) ;
} else if ( writeBound < = prob_op & & prob_op < delBound ) {
// OPERATION delete
if ( ! FLAGS_test_batches_snapshots ) {
MutexLock l ( thread - > shared - > GetMutexForKey ( rand_key ) ) ;
thread - > shared - > Delete ( rand_key ) ;
db_ - > Delete ( write_opts , key ) ;
thread - > shared - > Delete ( rand_column_family , rand_key ) ;
db_ - > Delete ( write_opts , column_family , key ) ;
thread - > stats . AddDeletes ( 1 ) ;
} else {
MultiDelete ( thread , write_opts , key ) ;
MultiDelete ( thread , write_opts , column_family , key ) ;
}
} else {
// OPERATION iterate
MultiIterate ( thread , read_opts , key ) ;
MultiIterate ( thread , read_opts , column_family , key ) ;
}
thread - > stats . FinishedSingleOp ( ) ;
}
@ -1177,91 +1223,93 @@ class StressTest {
if ( thread - > tid = = shared . GetNumThreads ( ) - 1 ) {
end = max_key ;
}
if ( ! thread - > rand . OneIn ( 2 ) ) {
// Use iterator to verify this range
unique_ptr < Iterator > iter ( db_ - > NewIterator ( options ) ) ;
iter - > Seek ( Key ( start ) ) ;
for ( long i = start ; i < end ; i + + ) {
std : : string from_db ;
std : : string keystr = Key ( i ) ;
Slice k = keystr ;
Status s = iter - > status ( ) ;
if ( iter - > Valid ( ) ) {
if ( iter - > key ( ) . compare ( k ) > 0 ) {
for ( size_t cf = 0 ; cf < column_families_ . size ( ) ; + + cf ) {
if ( ! thread - > rand . OneIn ( 2 ) ) {
// Use iterator to verify this range
unique_ptr < Iterator > iter (
db_ - > NewIterator ( options , column_families_ [ cf ] ) ) ;
iter - > Seek ( Key ( start ) ) ;
for ( long i = start ; i < end ; i + + ) {
std : : string from_db ;
std : : string keystr = Key ( i ) ;
Slice k = keystr ;
Status s = iter - > status ( ) ;
if ( iter - > Valid ( ) ) {
if ( iter - > key ( ) . compare ( k ) > 0 ) {
s = Status : : NotFound ( Slice ( ) ) ;
} else if ( iter - > key ( ) . compare ( k ) = = 0 ) {
from_db = iter - > value ( ) . ToString ( ) ;
iter - > Next ( ) ;
} else if ( iter - > key ( ) . compare ( k ) < 0 ) {
VerificationAbort ( " An out of range key was found " , cf , i ) ;
}
} else {
// The iterator found no value for the key in question, so do not
// move to the next item in the iterator
s = Status : : NotFound ( Slice ( ) ) ;
} else if ( iter - > key ( ) . compare ( k ) = = 0 ) {
from_db = iter - > value ( ) . ToString ( ) ;
iter - > Next ( ) ;
} else if ( iter - > key ( ) . compare ( k ) < 0 ) {
VerificationAbort ( " An out of range key was found " , i ) ;
}
} else {
// The iterator found no value for the key in question, so do not
// move to the next item in the iterator
s = Status : : NotFound ( Slice ( ) ) ;
}
VerifyValue ( i , options , shared , from_db , s , true ) ;
if ( from_db . length ( ) ) {
PrintKeyValue ( i , from_db . data ( ) , from_db . length ( ) ) ;
VerifyValue ( cf , i , options , shared , from_db , s , true ) ;
if ( from_db . length ( ) ) {
PrintKeyValue ( cf , i , from_db . data ( ) , from_db . length ( ) ) ;
}
}
}
}
else {
// Use Get to verify this range
for ( long i = start ; i < end ; i + + ) {
std : : string from_db ;
std : : string keystr = Key ( i ) ;
Slice k = keystr ;
Status s = db_ - > Get ( options , k , & from_db ) ;
VerifyValue ( i , options , shared , from_db , s , true ) ;
if ( from_db . length ( ) ) {
PrintKeyValue ( i , from_db . data ( ) , from_db . length ( ) ) ;
} else {
// Use Get to verify this range
for ( long i = start ; i < end ; i + + ) {
std : : string from_db ;
std : : string keystr = Key ( i ) ;
Slice k = keystr ;
Status s = db_ - > Get ( options , column_families_ [ cf ] , k , & from_db ) ;
if ( from_db . length ( ) ) {
PrintKeyValue ( cf , i , from_db . data ( ) , from_db . length ( ) ) ;
}
VerifyValue ( cf , i , options , shared , from_db , s , true ) ;
}
}
}
}
void VerificationAbort ( std : : string msg , long key ) const {
fprintf ( stderr , " Verification failed for key %ld: %s \n " ,
key , msg . c_str ( ) ) ;
void VerificationAbort ( std : : string msg , int cf , long key ) const {
fprintf ( stderr , " Verification failed for column family %d key %ld: %s \n " ,
cf , key , msg . c_str ( ) ) ;
exit ( 1 ) ;
}
void VerifyValue ( long key ,
const ReadOptions & opts ,
const SharedState & shared ,
const std : : string & value_from_db ,
Status s ,
bool strict = false ) const {
void VerifyValue ( int cf , long key , const ReadOptions & opts ,
const SharedState & shared , const std : : string & value_from_db ,
Status s , bool strict = false ) const {
// compare value_from_db with the value in the shared state
char value [ 100 ] ;
uint32_t value_base = shared . Get ( key ) ;
uint32_t value_base = shared . Get ( cf , key ) ;
if ( value_base = = SharedState : : SENTINEL & & ! strict ) {
return ;
}
if ( s . ok ( ) ) {
if ( value_base = = SharedState : : SENTINEL ) {
VerificationAbort ( " Unexpected value found " , key ) ;
VerificationAbort ( " Unexpected value found " , cf , key ) ;
}
size_t sz = GenerateValue ( value_base , value , sizeof ( value ) ) ;
if ( value_from_db . length ( ) ! = sz ) {
VerificationAbort ( " Length of value read is not equal " , key ) ;
VerificationAbort ( " Length of value read is not equal " , cf , key ) ;
}
if ( memcmp ( value_from_db . data ( ) , value , sz ) ! = 0 ) {
VerificationAbort ( " Contents of value read don't match " , key ) ;
VerificationAbort ( " Contents of value read don't match " , cf , key ) ;
}
} else {
if ( value_base ! = SharedState : : SENTINEL ) {
VerificationAbort ( " Value not found " , key ) ;
VerificationAbort ( " Value not found " , cf , key ) ;
}
}
}
static void PrintKeyValue ( uint32_t key , const char * value , size_t sz ) {
if ( ! FLAGS_verbose ) return ;
fprintf ( stdout , " %u ==> (%u) " , key , ( unsigned int ) sz ) ;
for ( size_t i = 0 ; i < sz ; i + + ) {
static void PrintKeyValue ( int cf , uint32_t key , const char * value ,
size_t sz ) {
if ( ! FLAGS_verbose ) {
return ;
}
fprintf ( stdout , " [CF %d] %u ==> (%u) " , cf , key , ( unsigned int ) sz ) ;
for ( size_t i = 0 ; i < sz ; i + + ) {
fprintf ( stdout , " %X " , value [ i ] ) ;
}
fprintf ( stdout , " \n " ) ;
@ -1279,8 +1327,13 @@ class StressTest {
}
void PrintEnv ( ) const {
fprintf ( stdout , " LevelDB version : %d.%d \n " ,
kMajorVersion , kMinorVersion ) ;
fprintf ( stdout , " RocksDB version : %d.%d \n " , kMajorVersion ,
kMinorVersion ) ;
fprintf ( stdout , " Column families : %d \n " , FLAGS_column_families ) ;
if ( ! FLAGS_test_batches_snapshots ) {
fprintf ( stdout , " Clear CFs one in : %d \n " ,
FLAGS_clear_column_family_one_in ) ;
}
fprintf ( stdout , " Number of threads : %d \n " , FLAGS_threads ) ;
fprintf ( stdout ,
" Ops per thread : %lu \n " ,
@ -1357,43 +1410,41 @@ class StressTest {
void Open ( ) {
assert ( db_ = = nullptr ) ;
Options options ;
options . block_cache = cache_ ;
options . block_cache_compressed = compressed_cache_ ;
options . write_buffer_size = FLAGS_write_buffer_size ;
options . max_write_buffer_number = FLAGS_max_write_buffer_number ;
options . min_write_buffer_number_to_merge =
FLAGS_min_write_buffer_number_to_merge ;
options . max_background_compactions = FLAGS_max_background_compaction s ;
options . compaction_style =
static_cast < rocksdb : : CompactionStyle > ( FLAGS_compaction_style ) ;
options . block_size = FLAGS_block_size ;
options . filter_policy = filter_policy_ ;
options . prefix_extractor . reset ( NewFixedPrefixTransform ( FLAGS_prefix_size ) ) ;
options . max_open_files = FLAGS_open_files ;
options . statistics = dbstats ;
options . env = FLAGS_env ;
options . disableDataSync = FLAGS_disable_data_sync ;
options . use_fsync = FLAGS_use_fsync ;
options . allow_mmap_reads = FLAGS_mmap_read ;
options_ . block_cache = cache_ ;
options_ . block_cache_compressed = compressed_ cache_;
options_ . write_buffer_size = FLAGS_write_buffer_size ;
options_ . max_write_buffer_number = FLAGS_max_write_buffer_number ;
options_ . min_write_buffer_number_to_merge =
FLAGS_min_write_buffer_number_to_merge ;
options_ . max_background_compactions = FLAGS_max_background_compactions ;
options_ . max_background_flushes = FLAGS_max_background_flushe s ;
options_ . compaction_style =
static_cast < rocksdb : : CompactionStyle > ( FLAGS_compaction_style ) ;
options_ . block_size = FLAGS_block_size ;
options_ . filter_policy = filter_policy_ ;
options_ . prefix_extractor . reset ( NewFixedPrefixTransform ( FLAGS_prefix_size ) ) ;
options_ . max_open_files = FLAGS_open_files ;
options_ . statistics = dbstats ;
options_ . env = FLAGS_env ;
options_ . disableDataSync = FLAGS_disable_data_sync ;
options_ . use_fsync = FLAGS_use_fsync ;
options_ . allow_mmap_reads = FLAGS_mmap_read ;
rocksdb_kill_odds = FLAGS_kill_random_test ;
options . target_file_size_base = FLAGS_target_file_size_base ;
options . target_file_size_multiplier = FLAGS_target_file_size_multiplier ;
options . max_bytes_for_level_base = FLAGS_max_bytes_for_level_base ;
options . max_bytes_for_level_multiplier =
options_ . target_file_size_base = FLAGS_target_file_size_base ;
options_ . target_file_size_multiplier = FLAGS_target_file_size_multiplier ;
options_ . max_bytes_for_level_base = FLAGS_max_bytes_for_level_base ;
options_ . max_bytes_for_level_multiplier =
FLAGS_max_bytes_for_level_multiplier ;
options . level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger ;
options . level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger ;
options . level0_file_num_compaction_trigger =
FLAGS_level0_file_num_compaction_trigger ;
options . compression = FLAGS_compression_type_e ;
options . create_if_missing = true ;
options . disable_seek_compaction = FLAGS_disable_seek_compaction ;
options . delete_obsolete_files_period_micros =
FLAGS_delete_obsolete_files_period_micros ;
options . max_manifest_file_size = 1024 ;
options . filter_deletes = FLAGS_filter_deletes ;
options_ . level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger ;
options_ . level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger ;
options_ . level0_file_num_compaction_trigger =
FLAGS_level0_file_num_compaction_trigger ;
options_ . compression = FLAGS_compression_type_e ;
options_ . create_if_missing = true ;
options_ . disable_seek_compaction = FLAGS_disable_seek_compaction ;
options_ . max_manifest_file_size = 10 * 1024 ;
options_ . filter_deletes = FLAGS_filter_deletes ;
if ( ( FLAGS_prefix_size = = 0 ) = = ( FLAGS_rep_factory = = kHashSkipList ) ) {
fprintf ( stderr ,
" prefix_size should be non-zero iff memtablerep == prefix_hash \n " ) ;
@ -1401,51 +1452,107 @@ class StressTest {
}
switch ( FLAGS_rep_factory ) {
case kHashSkipList :
options . memtable_factory . reset ( NewHashSkipListRepFactory ( ) ) ;
options_ . memtable_factory . reset ( NewHashSkipListRepFactory ( ) ) ;
break ;
case kSkipList :
// no need to do anything
break ;
case kVectorRep :
options . memtable_factory . reset ( new VectorRepFactory ( ) ) ;
options_ . memtable_factory . reset ( new VectorRepFactory ( ) ) ;
break ;
}
static Random purge_percent ( 1000 ) ; // no benefit from non-determinism here
if ( static_cast < int32_t > ( purge_percent . Uniform ( 100 ) ) <
FLAGS_purge_redundant_percent - 1 ) {
options . purge_redundant_kvs_while_flush = false ;
options_ . purge_redundant_kvs_while_flush = false ;
}
if ( FLAGS_use_merge ) {
options . merge_operator = MergeOperators : : CreatePutOperator ( ) ;
options_ . merge_operator = MergeOperators : : CreatePutOperator ( ) ;
}
// set universal style compaction configurations, if applicable
if ( FLAGS_universal_size_ratio ! = 0 ) {
options . compaction_options_universal . size_ratio =
FLAGS_universal_size_ratio ;
options_ . compaction_options_universal . size_ratio =
FLAGS_universal_size_ratio ;
}
if ( FLAGS_universal_min_merge_width ! = 0 ) {
options . compaction_options_universal . min_merge_width =
FLAGS_universal_min_merge_width ;
options_ . compaction_options_universal . min_merge_width =
FLAGS_universal_min_merge_width ;
}
if ( FLAGS_universal_max_merge_width ! = 0 ) {
options . compaction_options_universal . max_merge_width =
FLAGS_universal_max_merge_width ;
options_ . compaction_options_universal . max_merge_width =
FLAGS_universal_max_merge_width ;
}
if ( FLAGS_universal_max_size_amplification_percent ! = 0 ) {
options . compaction_options_universal . max_size_amplification_percent =
FLAGS_universal_max_size_amplification_percent ;
options_ . compaction_options_universal . max_size_amplification_percent =
FLAGS_universal_max_size_amplification_percent ;
}
fprintf ( stdout , " DB path: [%s] \n " , FLAGS_db . c_str ( ) ) ;
Status s ;
if ( FLAGS_ttl = = - 1 ) {
s = DB : : Open ( options , FLAGS_db , & db_ ) ;
std : : vector < std : : string > existing_column_families ;
s = DB : : ListColumnFamilies ( DBOptions ( options_ ) , FLAGS_db ,
& existing_column_families ) ; // ignore errors
if ( ! s . ok ( ) ) {
// DB doesn't exist
assert ( existing_column_families . empty ( ) ) ;
assert ( column_family_names_ . empty ( ) ) ;
column_family_names_ . push_back ( default_column_family_name ) ;
} else if ( column_family_names_ . empty ( ) ) {
// this is the first call to the function Open()
column_family_names_ = existing_column_families ;
} else {
// this is a reopen. just assert that existing column_family_names are
// equivalent to what we remember
auto sorted_cfn = column_family_names_ ;
sort ( sorted_cfn . begin ( ) , sorted_cfn . end ( ) ) ;
sort ( existing_column_families . begin ( ) , existing_column_families . end ( ) ) ;
if ( sorted_cfn ! = existing_column_families ) {
fprintf ( stderr ,
" Expected column families differ from the existing: \n " ) ;
printf ( " Expected: { " ) ;
for ( auto cf : sorted_cfn ) {
printf ( " %s " , cf . c_str ( ) ) ;
}
printf ( " } \n " ) ;
printf ( " Existing: { " ) ;
for ( auto cf : existing_column_families ) {
printf ( " %s " , cf . c_str ( ) ) ;
}
printf ( " } \n " ) ;
}
assert ( sorted_cfn = = existing_column_families ) ;
}
std : : vector < ColumnFamilyDescriptor > cf_descriptors ;
for ( auto name : column_family_names_ ) {
if ( name ! = default_column_family_name ) {
new_column_family_name_ =
std : : max ( new_column_family_name_ . load ( ) , std : : stoi ( name ) + 1 ) ;
}
cf_descriptors . emplace_back ( name , ColumnFamilyOptions ( options_ ) ) ;
}
s = DB : : Open ( DBOptions ( options_ ) , FLAGS_db , cf_descriptors ,
& column_families_ , & db_ ) ;
if ( s . ok ( ) ) {
while ( s . ok ( ) & &
column_families_ . size ( ) < ( size_t ) FLAGS_column_families ) {
ColumnFamilyHandle * cf = nullptr ;
std : : string name = std : : to_string ( new_column_family_name_ . load ( ) ) ;
new_column_family_name_ + + ;
s = db_ - > CreateColumnFamily ( ColumnFamilyOptions ( options_ ) , name , & cf ) ;
column_families_ . push_back ( cf ) ;
column_family_names_ . push_back ( name ) ;
}
}
assert ( ! s . ok ( ) | | column_families_ . size ( ) = =
static_cast < size_t > ( FLAGS_column_families ) ) ;
} else {
s = UtilityDB : : OpenTtlDB ( options , FLAGS_db , & sdb_ , FLAGS_ttl ) ;
db_ = sdb_ ;
StackableDB * sdb ;
s = UtilityDB : : OpenTtlDB ( options_ , FLAGS_db , & sdb , FLAGS_ttl ) ;
db_ = sdb ;
}
if ( ! s . ok ( ) ) {
fprintf ( stderr , " open error: %s \n " , s . ToString ( ) . c_str ( ) ) ;
@ -1454,13 +1561,11 @@ class StressTest {
}
void Reopen ( ) {
// do not close the db. Just delete the lock file. This
// simulates a crash-recovery kind of situation.
if ( FLAGS_ttl ! = - 1 ) {
( ( DBWithTTL * ) db_ ) - > TEST_Destroy_DBWithTtl ( ) ;
} else {
( ( DBImpl * ) db_ ) - > TEST_Destroy_DBImpl ( ) ;
for ( auto cf : column_families_ ) {
delete cf ;
}
column_families_ . clear ( ) ;
delete db_ ;
db_ = nullptr ;
num_times_reopened_ + + ;
@ -1482,14 +1587,15 @@ class StressTest {
// Block cache for compressed blocks; not referenced in this chunk —
// presumably wired into Options elsewhere in the class. TODO confirm.
shared_ptr < Cache > compressed_cache_ ;
// Bloom filter policy pointer; ownership/teardown not visible here —
// NOTE(review): confirm it is freed in the destructor.
const FilterPolicy * filter_policy_ ;
// Handle to the database under stress. Set in Open() via DB::Open() or
// UtilityDB::OpenTtlDB(); deleted and nulled in Reopen().
DB * db_ ;
// NOTE(review): the TTL open path now uses a local StackableDB and assigns
// straight to db_, so this member looks like a dead leftover — confirm.
StackableDB * sdb_ ;
// Options applied when (re)opening the DB; Open() copies flag values
// (compression, level0 triggers, memtable factory, universal options, ...)
// into it and derives DBOptions/ColumnFamilyOptions from it.
Options options_ ;
// Handles for all open column families. Open() fills this from DB::Open()
// and CreateColumnFamily(); Reopen() deletes each handle and clears it.
std : : vector < ColumnFamilyHandle * > column_families_ ;
// Names matching column_families_; kept in sync when CFs are created and
// cross-checked against DB::ListColumnFamilies() on every reopen.
std : : vector < std : : string > column_family_names_ ;
// Monotonic counter used to mint numeric names for newly created column
// families (atomic — presumably bumped from concurrent test threads too).
std : : atomic < int > new_column_family_name_ ;
// Count of completed Reopen() calls; incremented at the end of Reopen().
int num_times_reopened_ ;
} ;
} // namespace rocksdb
int main ( int argc , char * * argv ) {
google : : SetUsageMessage ( std : : string ( " \n USAGE: \n " ) + std : : string ( argv [ 0 ] ) +
" [OPTIONS]... " ) ;