Added trace replay fast forward function (#5273)

Summary:
In the current db_bench trace replay, the replay process strictly follows the timestamp to issue the queries. In some cases, user does not care about the time. Therefore, fast forward is needed for users to speed up the replay process.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5273

Differential Revision: D15389232

Pulled By: zhichao-cao

fbshipit-source-id: 735d629b9d2a167b05af3e4fa0ddf9d5d0be1806
main
Zhichao Cao 7 years ago committed by Facebook Github Bot
parent c71f5bb9aa
commit a13026fb2f
  1. 5
      tools/db_bench_tool.cc
  2. 15
      util/trace_replay.cc
  3. 2
      util/trace_replay.h

@ -762,6 +762,9 @@ DEFINE_bool(use_stderr_info_logger, false,
DEFINE_string(trace_file, "", "Trace workload to a file. "); DEFINE_string(trace_file, "", "Trace workload to a file. ");
DEFINE_int32(trace_replay_fast_forward, 1,
"Fast forward trace replay, must >= 1. ");
static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
assert(ctype); assert(ctype);
@ -6163,6 +6166,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
} }
Replayer replayer(db_with_cfh->db, db_with_cfh->cfh, Replayer replayer(db_with_cfh->db, db_with_cfh->cfh,
std::move(trace_reader)); std::move(trace_reader));
replayer.SetFastForward(
static_cast<uint32_t>(FLAGS_trace_replay_fast_forward));
s = replayer.Replay(); s = replayer.Replay();
if (s.ok()) { if (s.ok()) {
fprintf(stdout, "Replay started from trace_file: %s\n", fprintf(stdout, "Replay started from trace_file: %s\n",

@ -155,10 +155,22 @@ Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
for (ColumnFamilyHandle* cfh : handles) { for (ColumnFamilyHandle* cfh : handles) {
cf_map_[cfh->GetID()] = cfh; cf_map_[cfh->GetID()] = cfh;
} }
fast_forward_ = 1;
} }
Replayer::~Replayer() { trace_reader_.reset(); } Replayer::~Replayer() { trace_reader_.reset(); }
Status Replayer::SetFastForward(uint32_t fast_forward) {
Status s;
if (fast_forward < 1) {
s = Status::InvalidArgument("Wrong fast forward speed!");
} else {
fast_forward_ = fast_forward;
s = Status::OK();
}
return s;
}
Status Replayer::Replay() { Status Replayer::Replay() {
Status s; Status s;
Trace header; Trace header;
@ -182,7 +194,8 @@ Status Replayer::Replay() {
} }
std::this_thread::sleep_until( std::this_thread::sleep_until(
replay_epoch + std::chrono::microseconds(trace.ts - header.ts)); replay_epoch +
std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
if (trace.type == kTraceWrite) { if (trace.type == kTraceWrite) {
WriteBatch batch(trace.payload); WriteBatch batch(trace.payload);
db_->Write(woptions, &batch); db_->Write(woptions, &batch);

@ -88,6 +88,7 @@ class Replayer {
~Replayer(); ~Replayer();
Status Replay(); Status Replay();
Status SetFastForward(uint32_t fast_forward);
private: private:
Status ReadHeader(Trace* header); Status ReadHeader(Trace* header);
@ -97,6 +98,7 @@ class Replayer {
DBImpl* db_; DBImpl* db_;
std::unique_ptr<TraceReader> trace_reader_; std::unique_ptr<TraceReader> trace_reader_;
std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_; std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
uint32_t fast_forward_;
}; };
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save