@@ -13,6 +13,7 @@
# include "rocksdb/write_batch.h"
# include "util/coding.h"
# include "util/string_util.h"
# include "util/threadpool_imp.h"
namespace rocksdb {
@@ -173,6 +174,7 @@ Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
    : trace_reader_(std::move(reader)) {
  assert(db != nullptr);
  db_ = static_cast<DBImpl*>(db->GetRootDB());
  env_ = Env::Default();
  for (ColumnFamilyHandle* cfh : handles) {
    cf_map_[cfh->GetID()] = cfh;
  }
@@ -285,6 +287,78 @@ Status Replayer::Replay() {
  return s;
}
// The trace can be replayed with multiple threads by configuring the number
// of threads in the thread pool. Trace records are read from the trace file
// sequentially and the corresponding queries are scheduled in the task
// queue based on the timestamp. Currently, we support WriteBatch (Put,
// Delete, SingleDelete, DeleteRange), Get, and Iterator (Seek and SeekForPrev).
Status Replayer::MultiThreadReplay(uint32_t threads_num) {
  Status s;
  Trace header;
  s = ReadHeader(&header);
  if (!s.ok()) {
    return s;
  }
  ThreadPoolImpl thread_pool;
  thread_pool.SetHostEnv(env_);
  if (threads_num > 1) {
    thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
  } else {
    thread_pool.SetBackgroundThreads(1);
  }
  std::chrono::system_clock::time_point replay_epoch =
      std::chrono::system_clock::now();
  WriteOptions woptions;
  ReadOptions roptions;
  ReplayerWorkerArg* ra;
  uint64_t ops = 0;
  while (s.ok()) {
    ra = new ReplayerWorkerArg;
    ra->db = db_;
    // Hand the column family map to the background workers, which dereference
    // ra->cf_map to resolve the column family IDs decoded from each payload.
    ra->cf_map = &cf_map_;
    s = ReadTrace(&(ra->trace_entry));
    if (!s.ok()) {
      break;
    }
    ra->woptions = woptions;
    ra->roptions = roptions;
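    // Sleep until the wall-clock offset from the replay epoch matches this
    // record's offset from the trace start (scaled down by fast_forward_), so
    // queries are issued with roughly the original inter-arrival timing.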
    std::this_thread::sleep_until(
        replay_epoch + std::chrono::microseconds(
                           (ra->trace_entry.ts - header.ts) / fast_forward_));
    if (ra->trace_entry.type == kTraceWrite) {
      thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra, nullptr, nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceGet) {
      thread_pool.Schedule(&Replayer::BGWorkGet, ra, nullptr, nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceIteratorSeek) {
      thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra, nullptr, nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
      thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra, nullptr,
                           nullptr);
      ops++;
    } else if (ra->trace_entry.type == kTraceEnd) {
      // Do nothing for now.
      // TODO: Add some validations later.
      delete ra;
      break;
    }
  }
  if (s.IsIncomplete()) {
    // Reaching eof returns Incomplete status at the moment.
    // Could happen when killing a process without calling EndTrace() API.
    // TODO: Add better error handling.
    s = Status::OK();
  }
  thread_pool.JoinAllThreads();
  return s;
}
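// A minimal usage sketch (illustrative only, not part of this change):
// assuming the trace was captured with DB::StartTrace()/EndTrace(), a
// TraceReader is opened with NewFileTraceReader(), and a SetFastForward()
// setter exists for fast_forward_, replay could look roughly like:
//
//   std::unique_ptr<TraceReader> trace_reader;
//   NewFileTraceReader(Env::Default(), EnvOptions(), trace_path, &trace_reader);
//   Replayer replayer(db, column_family_handles, std::move(trace_reader));
//   replayer.SetFastForward(1);  // 1 = replay at the recorded speed
//   Status s = replayer.MultiThreadReplay(4 /* threads_num */);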
Status Replayer::ReadHeader(Trace* header) {
  assert(header != nullptr);
  Status s = ReadTrace(header);
@@ -325,4 +399,82 @@ Status Replayer::ReadTrace(Trace* trace) {
  return TracerHelper::DecodeTrace(encoded_trace, trace);
}
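// Background worker that replays a single traced Get. It takes ownership of
// the ReplayerWorkerArg and frees it when the query finishes.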
void Replayer::BGWorkGet(void* arg) {
  std::unique_ptr<ReplayerWorkerArg> ra(
      reinterpret_cast<ReplayerWorkerArg*>(arg));
  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
      ra->cf_map);
  uint32_t cf_id = 0;
  Slice key;
  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
    return;
  }
  std::string value;
  if (cf_id == 0) {
    ra->db->Get(ra->roptions, key, &value);
  } else {
    ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value);
  }
  return;
}
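// Background worker that re-applies a traced write batch. The payload holds
// the serialized WriteBatch representation, so it can be written back as-is.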
void Replayer::BGWorkWriteBatch(void* arg) {
  std::unique_ptr<ReplayerWorkerArg> ra(
      reinterpret_cast<ReplayerWorkerArg*>(arg));
  WriteBatch batch(ra->trace_entry.payload);
  ra->db->Write(ra->woptions, &batch);
  return;
}
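// Background worker that replays a traced iterator Seek: it creates a fresh
// iterator on the traced column family, seeks to the traced key, and then
// discards the iterator.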
void Replayer::BGWorkIterSeek(void* arg) {
  std::unique_ptr<ReplayerWorkerArg> ra(
      reinterpret_cast<ReplayerWorkerArg*>(arg));
  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
      ra->cf_map);
  uint32_t cf_id = 0;
  Slice key;
  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
    return;
  }
  std::string value;
  Iterator* single_iter = nullptr;
  if (cf_id == 0) {
    single_iter = ra->db->NewIterator(ra->roptions);
  } else {
    single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
  }
  single_iter->Seek(key);
  delete single_iter;
  return;
}
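// Same as BGWorkIterSeek, except the replayed operation is SeekForPrev.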
void Replayer::BGWorkIterSeekForPrev(void* arg) {
  std::unique_ptr<ReplayerWorkerArg> ra(
      reinterpret_cast<ReplayerWorkerArg*>(arg));
  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
      ra->cf_map);
  uint32_t cf_id = 0;
  Slice key;
  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
    return;
  }
  std::string value;
  Iterator* single_iter = nullptr;
  if (cf_id == 0) {
    single_iter = ra->db->NewIterator(ra->roptions);
  } else {
    single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
  }
  single_iter->SeekForPrev(key);
  delete single_iter;
  return;
}
} // namespace rocksdb