@@ -32,6 +32,7 @@
 #include <cstddef>
 #include <memory>
 #include <mutex>
+#include <queue>
 #include <thread>
 #include <unordered_map>
@@ -351,6 +352,42 @@ DEFINE_uint32(overwrite_window_size, 1,
               "Warning: large values can affect throughput. "
               "Valid overwrite_window_size values: [1, kMaxUint32].");
+DEFINE_uint64(
+    disposable_entries_delete_delay, 0,
+    "Minimum delay in microseconds for the series of Deletes "
+    "to be issued. When 0 the insertion of the last disposable entry is "
+    "immediately followed by the issuance of the Deletes. "
+    "(only compatible with fillanddeleteuniquerandom benchmark).");
+DEFINE_uint64(disposable_entries_batch_size, 0,
+              "Number of consecutively inserted disposable KV entries "
+              "that will be deleted after 'delete_delay' microseconds. "
+              "A series of Deletes is always issued once all the "
+              "disposable KV entries it targets have been inserted "
+              "into the DB. When 0 no deletes are issued and a "
+              "regular 'filluniquerandom' benchmark occurs. "
+              "(only compatible with fillanddeleteuniquerandom benchmark)");
+DEFINE_int32(disposable_entries_value_size, 64,
+             "Size of the values (in bytes) of the entries targeted by "
+             "selective deletes. "
+             "(only compatible with fillanddeleteuniquerandom benchmark)");
+DEFINE_uint64(
+    persistent_entries_batch_size, 0,
+    "Number of KV entries being inserted right before the deletes "
+    "targeting the disposable KV entries are issued. These "
+    "persistent keys are not targeted by the deletes, and will always "
+    "remain valid in the DB. (only compatible with "
+    "--benchmarks='fillanddeleteuniquerandom' "
+    "and used when --disposable_entries_batch_size is > 0).");
+DEFINE_int32(persistent_entries_value_size, 64,
+             "Size of the values (in bytes) of the entries not targeted by "
+             "deletes. (only compatible with "
+             "--benchmarks='fillanddeleteuniquerandom' "
+             "and used when --disposable_entries_batch_size is > 0).");
 DEFINE_double(read_random_exp_range, 0.0,
               "Read random's key will be generated using distribution of "
               "num * exp(-r) where r is uniform number from 0 to this value. "
@@ -3265,12 +3302,13 @@ class Benchmark {
     } else if (name == "fillrandom") {
       fresh_db = true;
       method = &Benchmark::WriteRandom;
-    } else if (name == "filluniquerandom") {
+    } else if (name == "filluniquerandom" ||
+               name == "fillanddeleteuniquerandom") {
       fresh_db = true;
       if (num_threads > 1) {
         fprintf(stderr,
-                "filluniquerandom multithreaded not supported"
-                ", use 1 thread");
+                "filluniquerandom and fillanddeleteuniquerandom "
+                "multithreaded not supported, use 1 thread");
         num_threads = 1;
       }
       method = &Benchmark::WriteUniqueRandom;
@@ -4663,6 +4701,13 @@ class Benchmark {
       return std::numeric_limits<uint64_t>::max();
     }
+
+    // Only available for UNIQUE_RANDOM mode.
+    uint64_t Fetch(uint64_t index) {
+      assert(mode_ == UNIQUE_RANDOM);
+      assert(index < values_.size());
+      return values_[index];
+    }
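+    // Fetch() is used by the disposable/persistent-entries workload below to
+    // re-derive the key generated at a given position of the pre-shuffled
+    // UNIQUE_RANDOM sequence, so that a later Delete (or a persistent-entry
+    // insert) can deterministically target the keys of an earlier batch.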
   private:
    Random64* rand_;
    WriteMode mode_;
@@ -4734,7 +4779,7 @@ class Benchmark {
     std::unique_ptr<const char[]> end_key_guard;
     Slice end_key = AllocateKey(&end_key_guard);
     double p = 0.0;
-    uint64_t num_overwrites = 0, num_unique_keys = 0;
+    uint64_t num_overwrites = 0, num_unique_keys = 0, num_selective_deletes = 0;
     // If user set overwrite_probability flag,
     // check if value is in [0.0,1.0].
     if (FLAGS_overwrite_probability > 0.0) {
@@ -4763,6 +4808,49 @@
     std::deque<int64_t> inserted_key_window;
     Random64 reservoir_id_gen(FLAGS_seed);
+    // --- Variables used in disposable/persistent keys simulation:
+    // The following variables are used when
+    // disposable_entries_batch_size is >0. We simulate a workload
+    // where the following sequence is repeated multiple times:
+    // "A set of keys S1 is inserted ('disposable entries'), then after
+    // some delay another set of keys S2 is inserted ('persistent entries')
+    // and the first set of keys S1 is deleted. S2 artificially represents
+    // the insertion of hypothetical results from some undefined computation
+    // done on the first set of keys S1. The next sequence can start as soon
+    // as the last disposable entry in the set S1 of this sequence is
+    // inserted, if the delay is non-negligible."
+    bool skip_for_loop = false, is_disposable_entry = true;
+    std::vector<uint64_t> disposable_entries_index(num_key_gens, 0);
+    std::vector<uint64_t> persistent_ent_and_del_index(num_key_gens, 0);
+    const uint64_t kNumDispAndPersEntries =
+        FLAGS_disposable_entries_batch_size +
+        FLAGS_persistent_entries_batch_size;
+    if (kNumDispAndPersEntries > 0) {
+      if ((write_mode != UNIQUE_RANDOM) || (writes_per_range_tombstone_ > 0) ||
+          (p > 0.0)) {
+        fprintf(
+            stderr,
+            "Disposable/persistent deletes are not compatible with overwrites "
+            "and DeleteRanges; and are only supported in filluniquerandom.\n");
+        ErrorExit();
+      }
+      if (FLAGS_disposable_entries_value_size < 0 ||
+          FLAGS_persistent_entries_value_size < 0) {
+        fprintf(
+            stderr,
+            "disposable_entries_value_size and persistent_entries_value_size "
+            "cannot be negative.\n");
+        ErrorExit();
+      }
+    }
+    Random rnd_disposable_entry(static_cast<uint32_t>(FLAGS_seed));
+    std::string random_value;
+    // Queue that stores the scheduled timestamp of each batch of disposable
+    // entry deletes, along with the starting index of the disposable entry
+    // keys to delete.
+    std::vector<std::queue<std::pair<uint64_t, uint64_t>>>
+        disposable_entries_q(num_key_gens);
+    // --- End of variables used in disposable/persistent keys simulation.
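+    // Illustrative walk-through (example values, not part of the original
+    // change): with disposable_entries_batch_size=100 and
+    // persistent_entries_batch_size=10, the UNIQUE_RANDOM key-generator
+    // indices are consumed as:
+    //   [0, 100)   disposable batch #1 (inserted immediately),
+    //   [100, 110) persistent batch #1 (inserted later, together with the
+    //              deletes of disposable batch #1),
+    //   [110, 210) disposable batch #2, and so on.
+    // Completing a disposable batch pushes a (NowMicros() + delay,
+    // batch_start_index) job onto disposable_entries_q; the job is consumed
+    // once its timestamp has passed.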
     std::vector<std::unique_ptr<const char[]>> expanded_key_guards;
     std::vector<Slice> expanded_keys;
     if (FLAGS_expand_range_tombstones) {
@@ -4814,11 +4902,101 @@
              inserted_key_window.push_back(rand_num);
            }
          }
+        } else if (kNumDispAndPersEntries > 0) {
+          // Check if queue is non-empty and if we need to insert
+          // 'persistent' KV entries (KV entries that are never deleted)
+          // and delete disposable entries previously inserted.
+          if (!disposable_entries_q[id].empty() &&
+              (disposable_entries_q[id].front().first <
+               FLAGS_env->NowMicros())) {
+            // If we need to perform a "merge op" pattern,
+            // we first write all the persistent KV entries not targeted
+            // by deletes, and then we write the disposable entries deletes.
+            if (persistent_ent_and_del_index[id] <
+                FLAGS_persistent_entries_batch_size) {
+              // Generate key to insert.
+              rand_num =
+                  key_gens[id]->Fetch(disposable_entries_q[id].front().second +
+                                      FLAGS_disposable_entries_batch_size +
+                                      persistent_ent_and_del_index[id]);
+              persistent_ent_and_del_index[id]++;
+              is_disposable_entry = false;
+              skip_for_loop = false;
+            } else if (persistent_ent_and_del_index[id] <
+                       kNumDispAndPersEntries) {
+              // Find key of the entry to delete.
+              rand_num =
+                  key_gens[id]->Fetch(disposable_entries_q[id].front().second +
+                                      (persistent_ent_and_del_index[id] -
+                                       FLAGS_persistent_entries_batch_size));
+              persistent_ent_and_del_index[id]++;
+              GenerateKeyFromInt(rand_num, FLAGS_num, &key);
+              // For the delete operation, everything happens here and we
+              // skip the rest of the for-loop, which is designed for
+              // inserts.
+              if (FLAGS_num_column_families <= 1) {
+                batch.Delete(key);
+              } else {
+                // We use the same rand_num as seed for key and column family
+                // so that we can deterministically find the cfh corresponding
+                // to a particular key while reading the key.
+                batch.Delete(db_with_cfh->GetCfh(rand_num), key);
+              }
+              // A delete only includes Key+Timestamp (no value).
+              batch_bytes += key_size_ + user_timestamp_size_;
+              bytes += key_size_ + user_timestamp_size_;
+              num_selective_deletes++;
+              // Skip rest of the for-loop
+              // (j = 0; j < entries_per_batch_; j++).
+              skip_for_loop = true;
+            } else {
+              assert(false);  // Should never reach this point.
+            }
+            // If disposable_entries_q needs to be updated (i.e., when a
+            // selective insert+delete was successfully completed), pop the
+            // job out of the queue.
+            if (!disposable_entries_q[id].empty() &&
+                (disposable_entries_q[id].front().first <
+                 FLAGS_env->NowMicros()) &&
+                persistent_ent_and_del_index[id] == kNumDispAndPersEntries) {
+              disposable_entries_q[id].pop();
+              persistent_ent_and_del_index[id] = 0;
+            }
+            // If we are deleting disposable entries, skip the rest of the
+            // for-loop since there are no key-value inserts at this moment
+            // in time.
+            if (skip_for_loop) {
+              continue;
+            }
+          }
+          // If no job is in the queue, then we keep inserting disposable KV
+          // entries that will be deleted later by a series of deletes.
+          else {
+            rand_num = key_gens[id]->Fetch(disposable_entries_index[id]);
+            disposable_entries_index[id]++;
+            is_disposable_entry = true;
+            if ((disposable_entries_index[id] %
+                 FLAGS_disposable_entries_batch_size) == 0) {
+              // Skip the persistent KV entries inserts for now.
+              disposable_entries_index[id] +=
+                  FLAGS_persistent_entries_batch_size;
+            }
+          }
         } else {
           rand_num = key_gens[id]->Next();
         }
         GenerateKeyFromInt(rand_num, FLAGS_num, &key);
-        Slice val = gen.Generate();
+        Slice val;
+        if (kNumDispAndPersEntries > 0) {
+          random_value = rnd_disposable_entry.RandomString(
+              is_disposable_entry ? FLAGS_disposable_entries_value_size
+                                  : FLAGS_persistent_entries_value_size);
+          val = Slice(random_value);
+          num_unique_keys++;
+        } else {
+          val = gen.Generate();
+        }
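+        // Note (descriptive, added for clarity): in the disposable/persistent
+        // path the value is drawn fresh from rnd_disposable_entry for every
+        // entry, so its size follows --disposable_entries_value_size or
+        // --persistent_entries_value_size rather than coming from the
+        // benchmark's shared value generator 'gen'.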
         if (use_blob_db_) {
 #ifndef ROCKSDB_LITE
           // Stacked BlobDB
@@ -4843,6 +5021,23 @@
         batch_bytes += val.size() + key_size_ + user_timestamp_size_;
         bytes += val.size() + key_size_ + user_timestamp_size_;
         ++num_written;
+        // If all disposable entries have been inserted, then we need to
+        // add to the job queue a call for 'persistent entry insertions +
+        // disposable entry deletions'.
+        if (kNumDispAndPersEntries > 0 && is_disposable_entry &&
+            ((disposable_entries_index[id] % kNumDispAndPersEntries) == 0)) {
+          // The queue stores [timestamp, starting_idx] pairs:
+          // timestamp = current_time + delay, the minimum absolute time at
+          // which the selective deletes may start being issued;
+          // starting_idx = the key-generator index of the rand_num used to
+          // generate the key of the first KV entry to delete (i.e., the key
+          // of the first selective delete).
+          disposable_entries_q[id].push(std::make_pair(
+              FLAGS_env->NowMicros() +
+                  FLAGS_disposable_entries_delete_delay /*timestamp*/,
+              disposable_entries_index[id] - kNumDispAndPersEntries
+              /*starting idx*/));
+        }
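+        // Illustrative check of the arithmetic above (example values): with
+        // batch sizes 100 (disposable) and 10 (persistent), the first push
+        // happens when disposable_entries_index[id] == 110, so
+        // starting_idx = 110 - 110 = 0, i.e. the key-generator index of the
+        // first key of the disposable batch just completed.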
         if (writes_per_range_tombstone_ > 0 &&
             num_written > writes_before_delete_range_ &&
             (num_written - writes_before_delete_range_) /
@@ -4946,9 +5141,14 @@
     }
     if ((write_mode == UNIQUE_RANDOM) && (p > 0.0)) {
       fprintf(stdout,
-              "Number of unique keys inerted: %" PRIu64
+              "Number of unique keys inserted: %" PRIu64
               ".\nNumber of overwrites: %" PRIu64 "\n",
               num_unique_keys, num_overwrites);
+    } else if (kNumDispAndPersEntries > 0) {
+      fprintf(stdout,
+              "Number of unique keys inserted (disposable+persistent): "
+              "%" PRIu64 ".\nNumber of 'disposable entry' deletes: %" PRIu64
+              "\n",
+              num_written, num_selective_deletes);
     }
     thread->stats.AddBytes(bytes);
   }