Allow Replayer to report the results of TraceRecords. (#8657)

Summary:
`Replayer::Execute()` can directly returns the result (e.g, request latency, DB::Get() return code, returned value, etc.)
`Replayer::Replay()` reports the results via a callback function.

New interface:
`TraceRecordResult` in "rocksdb/trace_record_result.h".

`DBTest2.TraceAndReplay` and `DBTest2.TraceAndManualReplay` are updated accordingly.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8657

Reviewed By: ajkr

Differential Revision: D30290216

Pulled By: autopear

fbshipit-source-id: 3c8d4e6b180ec743de1a9d9dcaee86064c74f0d6
main
Merlin Mao 3 years ago committed by Facebook GitHub Bot
parent b6269b078a
commit d10801e983
  1. 1
      CMakeLists.txt
  2. 2
      HISTORY.md
  3. 2
      TARGETS
  4. 250
      db/db_test2.cc
  5. 69
      include/rocksdb/trace_record.h
  6. 178
      include/rocksdb/trace_record_result.h
  7. 25
      include/rocksdb/utilities/replayer.h
  8. 1
      src.mk
  9. 3
      tools/db_bench_tool.cc
  10. 38
      trace_replay/trace_record.cc
  11. 85
      trace_replay/trace_record_handler.cc
  12. 15
      trace_replay/trace_record_handler.h
  13. 106
      trace_replay/trace_record_result.cc
  14. 17
      trace_replay/trace_replay.cc
  15. 87
      utilities/trace/replayer_impl.cc
  16. 27
      utilities/trace/replayer_impl.h

@ -819,6 +819,7 @@ set(SOURCES
trace_replay/block_cache_tracer.cc trace_replay/block_cache_tracer.cc
trace_replay/io_tracer.cc trace_replay/io_tracer.cc
trace_replay/trace_record_handler.cc trace_replay/trace_record_handler.cc
trace_replay/trace_record_result.cc
trace_replay/trace_record.cc trace_replay/trace_record.cc
trace_replay/trace_replay.cc trace_replay/trace_replay.cc
util/coding.cc util/coding.cc

@ -22,7 +22,7 @@
* The integrated BlobDB implementation now supports the tickers `BLOB_DB_BLOB_FILE_BYTES_READ`, `BLOB_DB_GC_NUM_KEYS_RELOCATED`, and `BLOB_DB_GC_BYTES_RELOCATED`, as well as the histograms `BLOB_DB_COMPRESSION_MICROS` and `BLOB_DB_DECOMPRESSION_MICROS`. * The integrated BlobDB implementation now supports the tickers `BLOB_DB_BLOB_FILE_BYTES_READ`, `BLOB_DB_GC_NUM_KEYS_RELOCATED`, and `BLOB_DB_GC_BYTES_RELOCATED`, as well as the histograms `BLOB_DB_COMPRESSION_MICROS` and `BLOB_DB_DECOMPRESSION_MICROS`.
## Public API change ## Public API change
* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Added `TraceReader::Reset()` to restart reading a trace file. Created trace_record.h and utilities/replayer.h files to access decoded Trace records and replay them. * Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Added `TraceReader::Reset()` to restart reading a trace file. Created trace_record.h, trace_record_result.h and utilities/replayer.h files to access the decoded Trace records, replay them, and query the actual operation results.
### Performance Improvements ### Performance Improvements
* Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value. * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.

@ -337,6 +337,7 @@ cpp_library(
"trace_replay/io_tracer.cc", "trace_replay/io_tracer.cc",
"trace_replay/trace_record.cc", "trace_replay/trace_record.cc",
"trace_replay/trace_record_handler.cc", "trace_replay/trace_record_handler.cc",
"trace_replay/trace_record_result.cc",
"trace_replay/trace_replay.cc", "trace_replay/trace_replay.cc",
"util/build_version.cc", "util/build_version.cc",
"util/coding.cc", "util/coding.cc",
@ -655,6 +656,7 @@ cpp_library(
"trace_replay/io_tracer.cc", "trace_replay/io_tracer.cc",
"trace_replay/trace_record.cc", "trace_replay/trace_record.cc",
"trace_replay/trace_record_handler.cc", "trace_replay/trace_record_handler.cc",
"trace_replay/trace_record_result.cc",
"trace_replay/trace_replay.cc", "trace_replay/trace_replay.cc",
"util/build_version.cc", "util/build_version.cc",
"util/coding.cc", "util/coding.cc",

@ -6,6 +6,7 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <atomic> #include <atomic>
#include <cstdlib> #include <cstdlib>
#include <functional> #include <functional>
@ -17,6 +18,8 @@
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h" #include "rocksdb/persistent_cache.h"
#include "rocksdb/trace_record.h"
#include "rocksdb/trace_record_result.h"
#include "rocksdb/utilities/replayer.h" #include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_filter.h" #include "rocksdb/wal_filter.h"
#include "util/random.h" #include "util/random.h"
@ -4236,6 +4239,106 @@ TEST_F(DBTest2, TestNumPread) {
ASSERT_EQ(0, env_->random_file_open_counter_.load()); ASSERT_EQ(0, env_->random_file_open_counter_.load());
} }
class TraceExecutionResultHandler : public TraceRecordResult::Handler {
public:
TraceExecutionResultHandler() {}
~TraceExecutionResultHandler() override {}
virtual Status Handle(const StatusOnlyTraceExecutionResult& result) override {
if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
return Status::InvalidArgument("Invalid timestamps.");
}
result.GetStatus().PermitUncheckedError();
switch (result.GetTraceType()) {
case kTraceWrite: {
total_latency_ += result.GetLatency();
cnt_++;
writes_++;
break;
}
case kTraceIteratorSeek:
case kTraceIteratorSeekForPrev: {
total_latency_ += result.GetLatency();
cnt_++;
seeks_++;
break;
}
default:
return Status::Corruption("Type mismatch.");
}
return Status::OK();
}
virtual Status Handle(
const SingleValueTraceExecutionResult& result) override {
if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
return Status::InvalidArgument("Invalid timestamps.");
}
result.GetStatus().PermitUncheckedError();
switch (result.GetTraceType()) {
case kTraceGet: {
total_latency_ += result.GetLatency();
cnt_++;
gets_++;
break;
}
default:
return Status::Corruption("Type mismatch.");
}
return Status::OK();
}
virtual Status Handle(
const MultiValuesTraceExecutionResult& result) override {
if (result.GetStartTimestamp() > result.GetEndTimestamp()) {
return Status::InvalidArgument("Invalid timestamps.");
}
for (const Status& s : result.GetMultiStatus()) {
s.PermitUncheckedError();
}
switch (result.GetTraceType()) {
case kTraceMultiGet: {
total_latency_ += result.GetLatency();
cnt_++;
multigets_++;
break;
}
default:
return Status::Corruption("Type mismatch.");
}
return Status::OK();
}
void Reset() {
total_latency_ = 0;
cnt_ = 0;
writes_ = 0;
gets_ = 0;
seeks_ = 0;
multigets_ = 0;
}
double GetAvgLatency() const {
return cnt_ == 0 ? 0.0 : 1.0 * total_latency_ / cnt_;
}
int GetNumWrites() const { return writes_; }
int GetNumGets() const { return gets_; }
int GetNumIterSeeks() const { return seeks_; }
int GetNumMultiGets() const { return multigets_; }
private:
std::atomic<uint64_t> total_latency_{0};
std::atomic<uint32_t> cnt_{0};
std::atomic<int> writes_{0};
std::atomic<int> gets_{0};
std::atomic<int> seeks_{0};
std::atomic<int> multigets_{0};
};
TEST_F(DBTest2, TraceAndReplay) { TEST_F(DBTest2, TraceAndReplay) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreatePutOperator(); options.merge_operator = MergeOperators::CreatePutOperator();
@ -4254,12 +4357,14 @@ TEST_F(DBTest2, TraceAndReplay) {
ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer)); ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer))); ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
// 5 Writes
ASSERT_OK(Put(0, "a", "1")); ASSERT_OK(Put(0, "a", "1"));
ASSERT_OK(Merge(0, "b", "2")); ASSERT_OK(Merge(0, "b", "2"));
ASSERT_OK(Delete(0, "c")); ASSERT_OK(Delete(0, "c"));
ASSERT_OK(SingleDelete(0, "d")); ASSERT_OK(SingleDelete(0, "d"));
ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f")); ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
// 6th Write
WriteBatch batch; WriteBatch batch;
ASSERT_OK(batch.Put("f", "11")); ASSERT_OK(batch.Put("f", "11"));
ASSERT_OK(batch.Merge("g", "12")); ASSERT_OK(batch.Merge("g", "12"));
@ -4268,19 +4373,23 @@ TEST_F(DBTest2, TraceAndReplay) {
ASSERT_OK(batch.DeleteRange("j", "k")); ASSERT_OK(batch.DeleteRange("j", "k"));
ASSERT_OK(db_->Write(wo, &batch)); ASSERT_OK(db_->Write(wo, &batch));
// 2 Seek(ForPrev)s
single_iter = db_->NewIterator(ro); single_iter = db_->NewIterator(ro);
single_iter->Seek("f"); single_iter->Seek("f"); // Seek 1
single_iter->SeekForPrev("g"); single_iter->SeekForPrev("g");
ASSERT_OK(single_iter->status()); ASSERT_OK(single_iter->status());
delete single_iter; delete single_iter;
// 2 Gets
ASSERT_EQ("1", Get(0, "a")); ASSERT_EQ("1", Get(0, "a"));
ASSERT_EQ("12", Get(0, "g")); ASSERT_EQ("12", Get(0, "g"));
// 7th and 8th Write, 3rd Get
ASSERT_OK(Put(1, "foo", "bar")); ASSERT_OK(Put(1, "foo", "bar"));
ASSERT_OK(Put(1, "rocksdb", "rocks")); ASSERT_OK(Put(1, "rocksdb", "rocks"));
ASSERT_EQ("NOT_FOUND", Get(1, "leveldb")); ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
// Total Write x 8, Get x 3, Seek x 2.
ASSERT_OK(db_->EndTrace()); ASSERT_OK(db_->EndTrace());
// These should not get into the trace file as it is after EndTrace. // These should not get into the trace file as it is after EndTrace.
ASSERT_OK(Put("hello", "world")); ASSERT_OK(Put("hello", "world"));
@ -4324,13 +4433,30 @@ TEST_F(DBTest2, TraceAndReplay) {
std::unique_ptr<Replayer> replayer; std::unique_ptr<Replayer> replayer;
ASSERT_OK( ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
TraceExecutionResultHandler res_handler;
std::function<void(Status, std::unique_ptr<TraceRecordResult> &&)> res_cb =
[&res_handler](Status exec_s, std::unique_ptr<TraceRecordResult>&& res) {
ASSERT_TRUE(exec_s.ok() || exec_s.IsNotSupported());
if (res != nullptr) {
ASSERT_OK(res->Accept(&res_handler));
res.reset();
}
};
// Unprepared replay should fail with Status::Incomplete() // Unprepared replay should fail with Status::Incomplete()
ASSERT_TRUE(replayer->Replay().IsIncomplete()); ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
ASSERT_OK(replayer->Prepare()); ASSERT_OK(replayer->Prepare());
// Ok to repeatedly Prepare(). // Ok to repeatedly Prepare().
ASSERT_OK(replayer->Prepare()); ASSERT_OK(replayer->Prepare());
// Replay using 1 thread, 1x speed. // Replay using 1 thread, 1x speed.
ASSERT_OK(replayer->Replay()); ASSERT_OK(replayer->Replay(ReplayOptions(1, 1.0), res_cb));
ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
ASSERT_EQ(res_handler.GetNumWrites(), 8);
ASSERT_EQ(res_handler.GetNumGets(), 3);
ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
res_handler.Reset();
ASSERT_OK(db2->Get(ro, handles[0], "a", &value)); ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
ASSERT_EQ("1", value); ASSERT_EQ("1", value);
@ -4346,15 +4472,28 @@ TEST_F(DBTest2, TraceAndReplay) {
// Re-replay should fail with Status::Incomplete() if Prepare() was not // Re-replay should fail with Status::Incomplete() if Prepare() was not
// called. Currently we don't distinguish between unprepared and trace end. // called. Currently we don't distinguish between unprepared and trace end.
ASSERT_TRUE(replayer->Replay().IsIncomplete()); ASSERT_TRUE(replayer->Replay(ReplayOptions(), nullptr).IsIncomplete());
// Re-replay using 2 threads, 2x speed. // Re-replay using 2 threads, 2x speed.
ASSERT_OK(replayer->Prepare()); ASSERT_OK(replayer->Prepare());
ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0))); ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0), res_cb));
ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
ASSERT_EQ(res_handler.GetNumWrites(), 8);
ASSERT_EQ(res_handler.GetNumGets(), 3);
ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
res_handler.Reset();
// Re-replay using 2 threads, 1/2 speed. // Re-replay using 2 threads, 1/2 speed.
ASSERT_OK(replayer->Prepare()); ASSERT_OK(replayer->Prepare());
ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5))); ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5), res_cb));
ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
ASSERT_EQ(res_handler.GetNumWrites(), 8);
ASSERT_EQ(res_handler.GetNumGets(), 3);
ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
res_handler.Reset();
replayer.reset(); replayer.reset();
for (auto handle : handles) { for (auto handle : handles) {
@ -4408,6 +4547,7 @@ TEST_F(DBTest2, TraceAndManualReplay) {
ASSERT_OK(Put(1, "rocksdb", "rocks")); ASSERT_OK(Put(1, "rocksdb", "rocks"));
ASSERT_EQ("NOT_FOUND", Get(1, "leveldb")); ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
// Same as TraceAndReplay, Write x 8, Get x 3, Seek x 2.
ASSERT_OK(db_->EndTrace()); ASSERT_OK(db_->EndTrace());
// These should not get into the trace file as it is after EndTrace. // These should not get into the trace file as it is after EndTrace.
ASSERT_OK(Put("hello", "world")); ASSERT_OK(Put("hello", "world"));
@ -4452,8 +4592,11 @@ TEST_F(DBTest2, TraceAndManualReplay) {
ASSERT_OK( ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
TraceExecutionResultHandler res_handler;
// Manual replay for 2 times. The 2nd checks if the replay can restart. // Manual replay for 2 times. The 2nd checks if the replay can restart.
std::unique_ptr<TraceRecord> record; std::unique_ptr<TraceRecord> record;
std::unique_ptr<TraceRecordResult> result;
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
// Next should fail if unprepared. // Next should fail if unprepared.
ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete()); ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
@ -4467,13 +4610,23 @@ TEST_F(DBTest2, TraceAndManualReplay) {
continue; continue;
} }
if (s.ok()) { if (s.ok()) {
ASSERT_OK(replayer->Execute(std::move(record))); ASSERT_OK(replayer->Execute(record, &result));
if (result != nullptr) {
ASSERT_OK(result->Accept(&res_handler));
result.reset();
}
} }
} }
// Status::Incomplete() will be returned when manually reading the trace // Status::Incomplete() will be returned when manually reading the trace
// end, or Prepare() was not called. // end, or Prepare() was not called.
ASSERT_TRUE(s.IsIncomplete()); ASSERT_TRUE(s.IsIncomplete());
ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete()); ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
ASSERT_EQ(res_handler.GetNumWrites(), 8);
ASSERT_EQ(res_handler.GetNumGets(), 3);
ASSERT_EQ(res_handler.GetNumIterSeeks(), 2);
ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
res_handler.Reset();
} }
ASSERT_OK(db2->Get(ro, handles[0], "a", &value)); ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
@ -4495,25 +4648,44 @@ TEST_F(DBTest2, TraceAndManualReplay) {
ASSERT_OK(batch.Put("trace-record-write1", "write1")); ASSERT_OK(batch.Put("trace-record-write1", "write1"));
ASSERT_OK(batch.Put("trace-record-write2", "write2")); ASSERT_OK(batch.Put("trace-record-write2", "write2"));
record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++)); record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record))); ASSERT_OK(replayer->Execute(record, &result));
ASSERT_TRUE(result != nullptr);
ASSERT_OK(result->Accept(&res_handler)); // Write x 1
ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value)); ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value));
ASSERT_EQ("write1", value); ASSERT_EQ("write1", value);
ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value)); ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value));
ASSERT_EQ("write2", value); ASSERT_EQ("write2", value);
ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
ASSERT_EQ(res_handler.GetNumWrites(), 1);
ASSERT_EQ(res_handler.GetNumGets(), 0);
ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
res_handler.Reset();
// Get related // Get related
// Get an existing key. // Get an existing key.
record.reset(new GetQueryTraceRecord(handles[0]->GetID(), record.reset(new GetQueryTraceRecord(handles[0]->GetID(),
"trace-record-write1", fake_ts++)); "trace-record-write1", fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record))); ASSERT_OK(replayer->Execute(record, &result));
ASSERT_TRUE(result != nullptr);
ASSERT_OK(result->Accept(&res_handler)); // Get x 1
// Get an non-existing key, should still return Status::OK(). // Get an non-existing key, should still return Status::OK().
record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get", record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get",
fake_ts++)); fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record))); ASSERT_OK(replayer->Execute(record, &result));
ASSERT_TRUE(result != nullptr);
ASSERT_OK(result->Accept(&res_handler)); // Get x 2
// Get from an invalid (non-existing) cf_id. // Get from an invalid (non-existing) cf_id.
uint32_t invalid_cf_id = handles[1]->GetID() + 1; uint32_t invalid_cf_id = handles[1]->GetID() + 1;
record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++)); record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++));
ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption()); ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
ASSERT_TRUE(result == nullptr);
ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
ASSERT_EQ(res_handler.GetNumWrites(), 0);
ASSERT_EQ(res_handler.GetNumGets(), 2);
ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
res_handler.Reset();
// Iteration related // Iteration related
for (IteratorSeekQueryTraceRecord::SeekType seekType : for (IteratorSeekQueryTraceRecord::SeekType seekType :
@ -4522,48 +4694,82 @@ TEST_F(DBTest2, TraceAndManualReplay) {
// Seek to an existing key. // Seek to an existing key.
record.reset(new IteratorSeekQueryTraceRecord( record.reset(new IteratorSeekQueryTraceRecord(
seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++)); seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record))); ASSERT_OK(replayer->Execute(record, &result));
ASSERT_TRUE(result != nullptr);
ASSERT_OK(result->Accept(&res_handler)); // Seek x 1 in one iteration
// Seek to an non-existing key, should still return Status::OK(). // Seek to an non-existing key, should still return Status::OK().
record.reset(new IteratorSeekQueryTraceRecord( record.reset(new IteratorSeekQueryTraceRecord(
seekType, handles[0]->GetID(), "trace-record-get", fake_ts++)); seekType, handles[0]->GetID(), "trace-record-get", fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record))); ASSERT_OK(replayer->Execute(record, &result));
ASSERT_TRUE(result != nullptr);
ASSERT_OK(result->Accept(&res_handler)); // Seek x 2 in one iteration
// Seek from an invalid cf_id. // Seek from an invalid cf_id.
record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id, record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id,
"whatever", fake_ts++)); "whatever", fake_ts++));
ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption()); ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
ASSERT_TRUE(result == nullptr);
} }
ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
ASSERT_EQ(res_handler.GetNumWrites(), 0);
ASSERT_EQ(res_handler.GetNumGets(), 0);
ASSERT_EQ(res_handler.GetNumIterSeeks(), 4); // Seek x 2 in two iterations
ASSERT_EQ(res_handler.GetNumMultiGets(), 0);
res_handler.Reset();
// MultiGet related // MultiGet related
// Get existing keys. // Get existing keys.
record.reset(new MultiGetQueryTraceRecord( record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}), std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"a", "foo"}), fake_ts++)); std::vector<std::string>({"a", "foo"}), fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record))); ASSERT_OK(replayer->Execute(record, &result));
ASSERT_TRUE(result != nullptr);
ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 1
// Get all non-existing keys, should still return Status::OK(). // Get all non-existing keys, should still return Status::OK().
record.reset(new MultiGetQueryTraceRecord( record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}), std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"no1", "no2"}), fake_ts++)); std::vector<std::string>({"no1", "no2"}), fake_ts++));
ASSERT_OK(replayer->Execute(record, &result));
ASSERT_TRUE(result != nullptr);
ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 2
// Get mixed of existing and non-existing keys, should still return // Get mixed of existing and non-existing keys, should still return
// Status::OK(). // Status::OK().
record.reset(new MultiGetQueryTraceRecord( record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}), std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"a", "no2"}), fake_ts++)); std::vector<std::string>({"a", "no2"}), fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record))); ASSERT_OK(replayer->Execute(record, &result));
ASSERT_TRUE(result != nullptr);
MultiValuesTraceExecutionResult* mvr =
dynamic_cast<MultiValuesTraceExecutionResult*>(result.get());
ASSERT_TRUE(mvr != nullptr);
ASSERT_OK(mvr->GetMultiStatus()[0]);
ASSERT_TRUE(mvr->GetMultiStatus()[1].IsNotFound());
ASSERT_EQ(mvr->GetValues()[0], "1");
ASSERT_EQ(mvr->GetValues()[1], "");
ASSERT_OK(result->Accept(&res_handler)); // MultiGet x 3
// Get from an invalid (non-existing) cf_id. // Get from an invalid (non-existing) cf_id.
record.reset(new MultiGetQueryTraceRecord( record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>( std::vector<uint32_t>(
{handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}), {handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}),
std::vector<std::string>({"a", "foo", "whatever"}), fake_ts++)); std::vector<std::string>({"a", "foo", "whatever"}), fake_ts++));
ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption()); ASSERT_TRUE(replayer->Execute(record, &result).IsCorruption());
ASSERT_TRUE(result == nullptr);
// Empty MultiGet // Empty MultiGet
record.reset(new MultiGetQueryTraceRecord( record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>(), std::vector<std::string>(), fake_ts++)); std::vector<uint32_t>(), std::vector<std::string>(), fake_ts++));
ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument()); ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
ASSERT_TRUE(result == nullptr);
// MultiGet size mismatch // MultiGet size mismatch
record.reset(new MultiGetQueryTraceRecord( record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}), std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"a"}), fake_ts++)); std::vector<std::string>({"a"}), fake_ts++));
ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument()); ASSERT_TRUE(replayer->Execute(record, &result).IsInvalidArgument());
ASSERT_TRUE(result == nullptr);
ASSERT_GT(res_handler.GetAvgLatency(), 0.0);
ASSERT_EQ(res_handler.GetNumWrites(), 0);
ASSERT_EQ(res_handler.GetNumGets(), 0);
ASSERT_EQ(res_handler.GetNumIterSeeks(), 0);
ASSERT_EQ(res_handler.GetNumMultiGets(), 3);
res_handler.Reset();
replayer.reset(); replayer.reset();
@ -4634,7 +4840,7 @@ TEST_F(DBTest2, TraceWithLimit) {
ASSERT_OK( ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
ASSERT_OK(replayer->Prepare()); ASSERT_OK(replayer->Prepare());
ASSERT_OK(replayer->Replay()); ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
replayer.reset(); replayer.reset();
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
@ -4709,7 +4915,7 @@ TEST_F(DBTest2, TraceWithSampling) {
ASSERT_OK( ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
ASSERT_OK(replayer->Prepare()); ASSERT_OK(replayer->Prepare());
ASSERT_OK(replayer->Replay()); ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
replayer.reset(); replayer.reset();
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
@ -4813,7 +5019,7 @@ TEST_F(DBTest2, TraceWithFilter) {
ASSERT_OK( ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
ASSERT_OK(replayer->Prepare()); ASSERT_OK(replayer->Prepare());
ASSERT_OK(replayer->Replay()); ASSERT_OK(replayer->Replay(ReplayOptions(), nullptr));
replayer.reset(); replayer.reset();
// All the key-values should not present since we filter out the WRITE ops. // All the key-values should not present since we filter out the WRITE ops.

@ -5,17 +5,18 @@
#pragma once #pragma once
#include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
#include "rocksdb/rocksdb_namespace.h" #include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class ColumnFamilyHandle; class ColumnFamilyHandle;
class DB; class DB;
class Status;
// Supported trace record types. // Supported trace record types.
enum TraceType : char { enum TraceType : char {
@ -41,40 +42,55 @@ enum TraceType : char {
kTraceMax, kTraceMax,
}; };
class WriteQueryTraceRecord;
class GetQueryTraceRecord; class GetQueryTraceRecord;
class IteratorSeekQueryTraceRecord; class IteratorSeekQueryTraceRecord;
class MultiGetQueryTraceRecord; class MultiGetQueryTraceRecord;
class TraceRecordResult;
class WriteQueryTraceRecord;
// Base class for all types of trace records. // Base class for all types of trace records.
class TraceRecord { class TraceRecord {
public: public:
TraceRecord();
explicit TraceRecord(uint64_t timestamp); explicit TraceRecord(uint64_t timestamp);
virtual ~TraceRecord();
virtual ~TraceRecord() = default;
// Type of the trace record.
virtual TraceType GetTraceType() const = 0; virtual TraceType GetTraceType() const = 0;
// Timestamp (in microseconds) of this trace.
virtual uint64_t GetTimestamp() const; virtual uint64_t GetTimestamp() const;
class Handler { class Handler {
public: public:
virtual ~Handler() {} virtual ~Handler() = default;
// Handle WriteQueryTraceRecord
virtual Status Handle(const WriteQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) = 0;
// Handle GetQueryTraceRecord
virtual Status Handle(const GetQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) = 0;
virtual Status Handle(const WriteQueryTraceRecord& record) = 0; // Handle IteratorSeekQueryTraceRecord
virtual Status Handle(const GetQueryTraceRecord& record) = 0; virtual Status Handle(const IteratorSeekQueryTraceRecord& record,
virtual Status Handle(const IteratorSeekQueryTraceRecord& record) = 0; std::unique_ptr<TraceRecordResult>* result) = 0;
virtual Status Handle(const MultiGetQueryTraceRecord& record) = 0;
// Handle MultiGetQueryTraceRecord
virtual Status Handle(const MultiGetQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) = 0;
}; };
virtual Status Accept(Handler* handler) = 0; // Accept the handler and report the corresponding result in `result`.
virtual Status Accept(Handler* handler,
std::unique_ptr<TraceRecordResult>* result) = 0;
// Create a handler for the exeution of TraceRecord. // Create a handler for the exeution of TraceRecord.
static Handler* NewExecutionHandler( static Handler* NewExecutionHandler(
DB* db, const std::vector<ColumnFamilyHandle*>& handles); DB* db, const std::vector<ColumnFamilyHandle*>& handles);
private: private:
// Timestamp (in microseconds) of this trace.
uint64_t timestamp_; uint64_t timestamp_;
}; };
@ -82,8 +98,6 @@ class TraceRecord {
class QueryTraceRecord : public TraceRecord { class QueryTraceRecord : public TraceRecord {
public: public:
explicit QueryTraceRecord(uint64_t timestamp); explicit QueryTraceRecord(uint64_t timestamp);
virtual ~QueryTraceRecord() override;
}; };
// Trace record for DB::Write() operation. // Trace record for DB::Write() operation.
@ -97,9 +111,11 @@ class WriteQueryTraceRecord : public QueryTraceRecord {
TraceType GetTraceType() const override { return kTraceWrite; } TraceType GetTraceType() const override { return kTraceWrite; }
// rep string for the WriteBatch.
virtual Slice GetWriteBatchRep() const; virtual Slice GetWriteBatchRep() const;
virtual Status Accept(Handler* handler) override; Status Accept(Handler* handler,
std::unique_ptr<TraceRecordResult>* result) override;
private: private:
PinnableSlice rep_; PinnableSlice rep_;
@ -118,16 +134,17 @@ class GetQueryTraceRecord : public QueryTraceRecord {
TraceType GetTraceType() const override { return kTraceGet; } TraceType GetTraceType() const override { return kTraceGet; }
// Column family ID.
virtual uint32_t GetColumnFamilyID() const; virtual uint32_t GetColumnFamilyID() const;
// Key to get.
virtual Slice GetKey() const; virtual Slice GetKey() const;
virtual Status Accept(Handler* handler) override; Status Accept(Handler* handler,
std::unique_ptr<TraceRecordResult>* result) override;
private: private:
// Column family ID.
uint32_t cf_id_; uint32_t cf_id_;
// Key to get.
PinnableSlice key_; PinnableSlice key_;
}; };
@ -135,8 +152,6 @@ class GetQueryTraceRecord : public QueryTraceRecord {
class IteratorQueryTraceRecord : public QueryTraceRecord { class IteratorQueryTraceRecord : public QueryTraceRecord {
public: public:
explicit IteratorQueryTraceRecord(uint64_t timestamp); explicit IteratorQueryTraceRecord(uint64_t timestamp);
virtual ~IteratorQueryTraceRecord() override;
}; };
// Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation. // Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation.
@ -156,21 +171,24 @@ class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord {
virtual ~IteratorSeekQueryTraceRecord() override; virtual ~IteratorSeekQueryTraceRecord() override;
// Trace type matches the seek type.
TraceType GetTraceType() const override; TraceType GetTraceType() const override;
// Type of seek, Seek or SeekForPrev.
virtual SeekType GetSeekType() const; virtual SeekType GetSeekType() const;
// Column family ID.
virtual uint32_t GetColumnFamilyID() const; virtual uint32_t GetColumnFamilyID() const;
// Key to seek to.
virtual Slice GetKey() const; virtual Slice GetKey() const;
virtual Status Accept(Handler* handler) override; Status Accept(Handler* handler,
std::unique_ptr<TraceRecordResult>* result) override;
private: private:
SeekType type_; SeekType type_;
// Column family ID.
uint32_t cf_id_; uint32_t cf_id_;
// Key to seek to.
PinnableSlice key_; PinnableSlice key_;
}; };
@ -189,16 +207,17 @@ class MultiGetQueryTraceRecord : public QueryTraceRecord {
TraceType GetTraceType() const override { return kTraceMultiGet; } TraceType GetTraceType() const override { return kTraceMultiGet; }
// Column familiy IDs.
virtual std::vector<uint32_t> GetColumnFamilyIDs() const; virtual std::vector<uint32_t> GetColumnFamilyIDs() const;
// Keys to get.
virtual std::vector<Slice> GetKeys() const; virtual std::vector<Slice> GetKeys() const;
virtual Status Accept(Handler* handler) override; Status Accept(Handler* handler,
std::unique_ptr<TraceRecordResult>* result) override;
private: private:
// Column familiy IDs.
std::vector<uint32_t> cf_ids_; std::vector<uint32_t> cf_ids_;
// Keys to get.
std::vector<PinnableSlice> keys_; std::vector<PinnableSlice> keys_;
}; };

@ -0,0 +1,178 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <string>
#include <vector>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h"
#include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE {
class MultiValuesTraceExecutionResult;
class SingleValueTraceExecutionResult;
class StatusOnlyTraceExecutionResult;
// Base class for the results of all types of trace records.
// Theses classes can be used to report the execution result of
// TraceRecord::Handler::Handle() or TraceRecord::Accept().
class TraceRecordResult {
public:
explicit TraceRecordResult(TraceType trace_type);
virtual ~TraceRecordResult() = default;
// Trace type of the corresponding TraceRecord.
virtual TraceType GetTraceType() const;
class Handler {
public:
virtual ~Handler() = default;
// Handle StatusOnlyTraceExecutionResult
virtual Status Handle(const StatusOnlyTraceExecutionResult& result) = 0;
// Handle SingleValueTraceExecutionResult
virtual Status Handle(const SingleValueTraceExecutionResult& result) = 0;
// Handle MultiValuesTraceExecutionResult
virtual Status Handle(const MultiValuesTraceExecutionResult& result) = 0;
};
/*
* Example handler to just print the trace record execution results.
*
* class ResultPrintHandler : public TraceRecordResult::Handler {
* public:
* ResultPrintHandler();
* ~ResultPrintHandler() override {}
*
* Status Handle(const StatusOnlyTraceExecutionResult& result) override {
* std::cout << "Status: " << result.GetStatus().ToString() << std::endl;
* }
*
* Status Handle(const SingleValueTraceExecutionResult& result) override {
* std::cout << "Status: " << result.GetStatus().ToString()
* << ", value: " << result.GetValue() << std::endl;
* }
*
* Status Handle(const MultiValuesTraceExecutionResult& result) override {
* size_t size = result.GetMultiStatus().size();
* for (size_t i = 0; i < size; i++) {
* std::cout << "Status: " << result.GetMultiStatus()[i].ToString()
* << ", value: " << result.GetValues()[i] << std::endl;
* }
* }
* };
* */
// Accept the handler.
virtual Status Accept(Handler* handler) = 0;
private:
TraceType trace_type_;
};
// Base class for the results from the trace record execution handler (created
// by TraceRecord::NewExecutionHandler()).
//
// The actual execution status or returned values may be hidden from
// TraceRecord::Handler::Handle and TraceRecord::Accept. For example, a
// GetQueryTraceRecord's execution calls DB::Get() internally. DB::Get() may
// return Status::NotFound() but TraceRecord::Handler::Handle() or
// TraceRecord::Accept() will still return Status::OK(). The actual status from
// DB::Get() and the returned value string may be saved in a
// SingleValueTraceExecutionResult.
class TraceExecutionResult : public TraceRecordResult {
public:
TraceExecutionResult(uint64_t start_timestamp, uint64_t end_timestamp,
TraceType trace_type);
// Execution start/end timestamps and request latency in microseconds.
virtual uint64_t GetStartTimestamp() const;
virtual uint64_t GetEndTimestamp() const;
inline uint64_t GetLatency() const {
return GetEndTimestamp() - GetStartTimestamp();
}
private:
uint64_t ts_start_;
uint64_t ts_end_;
};
// Result for operations that only return a single Status.
// Example operations: DB::Write(), Iterator::Seek() and
// Iterator::SeekForPrev().
class StatusOnlyTraceExecutionResult : public TraceExecutionResult {
public:
StatusOnlyTraceExecutionResult(Status status, uint64_t start_timestamp,
uint64_t end_timestamp, TraceType trace_type);
virtual ~StatusOnlyTraceExecutionResult() override = default;
// Return value of DB::Write(), etc.
virtual const Status& GetStatus() const;
virtual Status Accept(Handler* handler) override;
private:
Status status_;
};
// Result for operations that return a Status and a value.
// Example operation: DB::Get()
class SingleValueTraceExecutionResult : public TraceExecutionResult {
public:
SingleValueTraceExecutionResult(Status status, const std::string& value,
uint64_t start_timestamp,
uint64_t end_timestamp, TraceType trace_type);
SingleValueTraceExecutionResult(Status status, std::string&& value,
uint64_t start_timestamp,
uint64_t end_timestamp, TraceType trace_type);
virtual ~SingleValueTraceExecutionResult() override;
// Return status of DB::Get(), etc.
virtual const Status& GetStatus() const;
// Value for the searched key.
virtual const std::string& GetValue() const;
virtual Status Accept(Handler* handler) override;
private:
Status status_;
std::string value_;
};
// Result for operations that return multiple Status(es) and values.
// Example operation: DB::MultiGet()
class MultiValuesTraceExecutionResult : public TraceExecutionResult {
public:
MultiValuesTraceExecutionResult(std::vector<Status> multi_status,
std::vector<std::string> values,
uint64_t start_timestamp,
uint64_t end_timestamp, TraceType trace_type);
virtual ~MultiValuesTraceExecutionResult() override;
// Returned Status(es) of DB::MultiGet(), etc.
virtual const std::vector<Status>& GetMultiStatus() const;
// Returned values for the searched keys.
virtual const std::vector<std::string>& GetValues() const;
virtual Status Accept(Handler* handler) override;
private:
std::vector<Status> multi_status_;
std::vector<std::string> values_;
};
} // namespace ROCKSDB_NAMESPACE

@ -6,14 +6,17 @@
#pragma once #pragma once
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include <functional>
#include <memory> #include <memory>
#include "rocksdb/rocksdb_namespace.h" #include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class TraceRecord;
class TraceRecordResult;
struct ReplayOptions { struct ReplayOptions {
// Number of threads used for replaying. If 0 or 1, replay using // Number of threads used for replaying. If 0 or 1, replay using
// single thread. // single thread.
@ -27,6 +30,7 @@ struct ReplayOptions {
double fast_forward; double fast_forward;
ReplayOptions() : num_threads(1), fast_forward(1.0) {} ReplayOptions() : num_threads(1), fast_forward(1.0) {}
ReplayOptions(uint32_t num_of_threads, double fast_forward_ratio) ReplayOptions(uint32_t num_of_threads, double fast_forward_ratio)
: num_threads(num_of_threads), fast_forward(fast_forward_ratio) {} : num_threads(num_of_threads), fast_forward(fast_forward_ratio) {}
}; };
@ -36,7 +40,7 @@ struct ReplayOptions {
// instantiated via db_bench today, on using "replay" benchmark. // instantiated via db_bench today, on using "replay" benchmark.
class Replayer { class Replayer {
public: public:
virtual ~Replayer() {} virtual ~Replayer() = default;
// Make some preparation before replaying the trace. This will also reset the // Make some preparation before replaying the trace. This will also reset the
// replayer in order to restart replaying. // replayer in order to restart replaying.
@ -61,13 +65,22 @@ class Replayer {
// trace; // trace;
// Status::NotSupported() if the operation is not supported; // Status::NotSupported() if the operation is not supported;
// Otherwise, return the corresponding error status. // Otherwise, return the corresponding error status.
virtual Status Execute(const std::unique_ptr<TraceRecord>& record) = 0; //
virtual Status Execute(std::unique_ptr<TraceRecord>&& record) = 0; // The actual operation execution status and result(s) will be saved in
// result. For example, a GetQueryTraceRecord will have its DB::Get() status
// and the returned value saved in a SingleValueTraceExecutionResult.
virtual Status Execute(const std::unique_ptr<TraceRecord>& record,
std::unique_ptr<TraceRecordResult>* result) = 0;
// Replay all the traces from the provided trace stream, taking the delay // Replay all the traces from the provided trace stream, taking the delay
// between the traces into consideration. // between the traces into consideration.
virtual Status Replay(const ReplayOptions& options) = 0; //
virtual Status Replay() { return Replay(ReplayOptions()); } // result_callback reports the status of executing a trace record, and the
// actual operation execution result (See the description for Execute()).
virtual Status Replay(
const ReplayOptions& options,
const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
result_callback) = 0;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -199,6 +199,7 @@ LIB_SOURCES = \
test_util/transaction_test_util.cc \ test_util/transaction_test_util.cc \
tools/dump/db_dump_tool.cc \ tools/dump/db_dump_tool.cc \
trace_replay/trace_record_handler.cc \ trace_replay/trace_record_handler.cc \
trace_replay/trace_record_result.cc \
trace_replay/trace_record.cc \ trace_replay/trace_record.cc \
trace_replay/trace_replay.cc \ trace_replay/trace_replay.cc \
trace_replay/block_cache_tracer.cc \ trace_replay/block_cache_tracer.cc \

@ -8027,7 +8027,8 @@ class Benchmark {
} }
s = replayer->Replay( s = replayer->Replay(
ReplayOptions(static_cast<uint32_t>(FLAGS_trace_replay_threads), ReplayOptions(static_cast<uint32_t>(FLAGS_trace_replay_threads),
FLAGS_trace_replay_fast_forward)); FLAGS_trace_replay_fast_forward),
nullptr);
replayer.reset(); replayer.reset();
if (s.ok()) { if (s.ok()) {
fprintf(stdout, "Replay completed from trace_file: %s\n", fprintf(stdout, "Replay completed from trace_file: %s\n",

@ -11,6 +11,7 @@
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/trace_record_result.h"
#include "trace_replay/trace_record_handler.h" #include "trace_replay/trace_record_handler.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -18,8 +19,6 @@ namespace ROCKSDB_NAMESPACE {
// TraceRecord // TraceRecord
TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {} TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {}
TraceRecord::~TraceRecord() {}
uint64_t TraceRecord::GetTimestamp() const { return timestamp_; } uint64_t TraceRecord::GetTimestamp() const { return timestamp_; }
TraceRecord::Handler* TraceRecord::NewExecutionHandler( TraceRecord::Handler* TraceRecord::NewExecutionHandler(
@ -31,8 +30,6 @@ TraceRecord::Handler* TraceRecord::NewExecutionHandler(
QueryTraceRecord::QueryTraceRecord(uint64_t timestamp) QueryTraceRecord::QueryTraceRecord(uint64_t timestamp)
: TraceRecord(timestamp) {} : TraceRecord(timestamp) {}
QueryTraceRecord::~QueryTraceRecord() {}
// WriteQueryTraceRecord // WriteQueryTraceRecord
WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep, WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep,
uint64_t timestamp) uint64_t timestamp)
@ -44,13 +41,14 @@ WriteQueryTraceRecord::WriteQueryTraceRecord(const std::string& write_batch_rep,
rep_.PinSelf(write_batch_rep); rep_.PinSelf(write_batch_rep);
} }
WriteQueryTraceRecord::~WriteQueryTraceRecord() {} WriteQueryTraceRecord::~WriteQueryTraceRecord() { rep_.clear(); }
Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); } Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); }
Status WriteQueryTraceRecord::Accept(Handler* handler) { Status WriteQueryTraceRecord::Accept(
Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
assert(handler != nullptr); assert(handler != nullptr);
return handler->Handle(*this); return handler->Handle(*this, result);
} }
// GetQueryTraceRecord // GetQueryTraceRecord
@ -68,23 +66,22 @@ GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id,
key_.PinSelf(key); key_.PinSelf(key);
} }
GetQueryTraceRecord::~GetQueryTraceRecord() {} GetQueryTraceRecord::~GetQueryTraceRecord() { key_.clear(); }
uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; } uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; }
Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); } Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); }
Status GetQueryTraceRecord::Accept(Handler* handler) { Status GetQueryTraceRecord::Accept(Handler* handler,
std::unique_ptr<TraceRecordResult>* result) {
assert(handler != nullptr); assert(handler != nullptr);
return handler->Handle(*this); return handler->Handle(*this, result);
} }
// IteratorQueryTraceRecord // IteratorQueryTraceRecord
IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp) IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp)
: QueryTraceRecord(timestamp) {} : QueryTraceRecord(timestamp) {}
IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {}
// IteratorSeekQueryTraceRecord // IteratorSeekQueryTraceRecord
IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key,
@ -103,7 +100,7 @@ IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
key_.PinSelf(key); key_.PinSelf(key);
} }
IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() {} IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() { key_.clear(); }
TraceType IteratorSeekQueryTraceRecord::GetTraceType() const { TraceType IteratorSeekQueryTraceRecord::GetTraceType() const {
return static_cast<TraceType>(type_); return static_cast<TraceType>(type_);
@ -120,9 +117,10 @@ uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const {
Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); } Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); }
Status IteratorSeekQueryTraceRecord::Accept(Handler* handler) { Status IteratorSeekQueryTraceRecord::Accept(
Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
assert(handler != nullptr); assert(handler != nullptr);
return handler->Handle(*this); return handler->Handle(*this, result);
} }
// MultiGetQueryTraceRecord // MultiGetQueryTraceRecord
@ -145,7 +143,10 @@ MultiGetQueryTraceRecord::MultiGetQueryTraceRecord(
} }
} }
MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {} MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {
cf_ids_.clear();
keys_.clear();
}
std::vector<uint32_t> MultiGetQueryTraceRecord::GetColumnFamilyIDs() const { std::vector<uint32_t> MultiGetQueryTraceRecord::GetColumnFamilyIDs() const {
return cf_ids_; return cf_ids_;
@ -155,9 +156,10 @@ std::vector<Slice> MultiGetQueryTraceRecord::GetKeys() const {
return std::vector<Slice>(keys_.begin(), keys_.end()); return std::vector<Slice>(keys_.begin(), keys_.end());
} }
Status MultiGetQueryTraceRecord::Accept(Handler* handler) { Status MultiGetQueryTraceRecord::Accept(
Handler* handler, std::unique_ptr<TraceRecordResult>* result) {
assert(handler != nullptr); assert(handler != nullptr);
return handler->Handle(*this); return handler->Handle(*this, result);
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -6,6 +6,7 @@
#include "trace_replay/trace_record_handler.h" #include "trace_replay/trace_record_handler.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/trace_record_result.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -16,7 +17,8 @@ TraceExecutionHandler::TraceExecutionHandler(
: TraceRecord::Handler(), : TraceRecord::Handler(),
db_(db), db_(db),
write_opts_(WriteOptions()), write_opts_(WriteOptions()),
read_opts_(ReadOptions()) { read_opts_(ReadOptions()),
clock_(SystemClock::Default()) {
assert(db != nullptr); assert(db != nullptr);
assert(!handles.empty()); assert(!handles.empty());
cf_map_.reserve(handles.size()); cf_map_.reserve(handles.size());
@ -28,26 +30,64 @@ TraceExecutionHandler::TraceExecutionHandler(
TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); } TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); }
Status TraceExecutionHandler::Handle(const WriteQueryTraceRecord& record) { Status TraceExecutionHandler::Handle(
const WriteQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) {
if (result != nullptr) {
result->reset(nullptr);
}
uint64_t start = clock_->NowMicros();
WriteBatch batch(record.GetWriteBatchRep().ToString()); WriteBatch batch(record.GetWriteBatchRep().ToString());
return db_->Write(write_opts_, &batch); Status s = db_->Write(write_opts_, &batch);
uint64_t end = clock_->NowMicros();
if (s.ok() && result != nullptr) {
result->reset(new StatusOnlyTraceExecutionResult(s, start, end,
record.GetTraceType()));
}
return s;
} }
Status TraceExecutionHandler::Handle(const GetQueryTraceRecord& record) { Status TraceExecutionHandler::Handle(
const GetQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) {
if (result != nullptr) {
result->reset(nullptr);
}
auto it = cf_map_.find(record.GetColumnFamilyID()); auto it = cf_map_.find(record.GetColumnFamilyID());
if (it == cf_map_.end()) { if (it == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID."); return Status::Corruption("Invalid Column Family ID.");
} }
uint64_t start = clock_->NowMicros();
std::string value; std::string value;
Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value); Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value);
// Treat not found as ok and return other errors. uint64_t end = clock_->NowMicros();
return s.IsNotFound() ? Status::OK() : s;
// Treat not found as ok, return other errors.
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (result != nullptr) {
// Report the actual opetation status in TraceExecutionResult
result->reset(new SingleValueTraceExecutionResult(
std::move(s), std::move(value), start, end, record.GetTraceType()));
}
return Status::OK();
} }
Status TraceExecutionHandler::Handle( Status TraceExecutionHandler::Handle(
const IteratorSeekQueryTraceRecord& record) { const IteratorSeekQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) {
if (result != nullptr) {
result->reset(nullptr);
}
auto it = cf_map_.find(record.GetColumnFamilyID()); auto it = cf_map_.find(record.GetColumnFamilyID());
if (it == cf_map_.end()) { if (it == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID."); return Status::Corruption("Invalid Column Family ID.");
@ -55,6 +95,8 @@ Status TraceExecutionHandler::Handle(
Iterator* single_iter = db_->NewIterator(read_opts_, it->second); Iterator* single_iter = db_->NewIterator(read_opts_, it->second);
uint64_t start = clock_->NowMicros();
switch (record.GetSeekType()) { switch (record.GetSeekType()) {
case IteratorSeekQueryTraceRecord::kSeekForPrev: { case IteratorSeekQueryTraceRecord::kSeekForPrev: {
single_iter->SeekForPrev(record.GetKey()); single_iter->SeekForPrev(record.GetKey());
@ -65,12 +107,26 @@ Status TraceExecutionHandler::Handle(
break; break;
} }
} }
uint64_t end = clock_->NowMicros();
Status s = single_iter->status(); Status s = single_iter->status();
delete single_iter; delete single_iter;
if (s.ok() && result != nullptr) {
result->reset(new StatusOnlyTraceExecutionResult(s, start, end,
record.GetTraceType()));
}
return s; return s;
} }
Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) { Status TraceExecutionHandler::Handle(
const MultiGetQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) {
if (result != nullptr) {
result->reset(nullptr);
}
std::vector<ColumnFamilyHandle*> handles; std::vector<ColumnFamilyHandle*> handles;
handles.reserve(record.GetColumnFamilyIDs().size()); handles.reserve(record.GetColumnFamilyIDs().size());
for (uint32_t cf_id : record.GetColumnFamilyIDs()) { for (uint32_t cf_id : record.GetColumnFamilyIDs()) {
@ -90,15 +146,26 @@ Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) {
return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch."); return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
} }
uint64_t start = clock_->NowMicros();
std::vector<std::string> values; std::vector<std::string> values;
std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values); std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values);
uint64_t end = clock_->NowMicros();
// Treat not found as ok, return other errors. // Treat not found as ok, return other errors.
for (Status s : ss) { for (const Status& s : ss) {
if (!s.ok() && !s.IsNotFound()) { if (!s.ok() && !s.IsNotFound()) {
return s; return s;
} }
} }
if (result != nullptr) {
// Report the actual opetation status in TraceExecutionResult
result->reset(new MultiValuesTraceExecutionResult(
std::move(ss), std::move(values), start, end, record.GetTraceType()));
}
return Status::OK(); return Status::OK();
} }

@ -5,12 +5,14 @@
#pragma once #pragma once
#include <memory>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/trace_record.h" #include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -22,16 +24,21 @@ class TraceExecutionHandler : public TraceRecord::Handler {
const std::vector<ColumnFamilyHandle*>& handles); const std::vector<ColumnFamilyHandle*>& handles);
virtual ~TraceExecutionHandler() override; virtual ~TraceExecutionHandler() override;
virtual Status Handle(const WriteQueryTraceRecord& record) override; virtual Status Handle(const WriteQueryTraceRecord& record,
virtual Status Handle(const GetQueryTraceRecord& record) override; std::unique_ptr<TraceRecordResult>* result) override;
virtual Status Handle(const IteratorSeekQueryTraceRecord& record) override; virtual Status Handle(const GetQueryTraceRecord& record,
virtual Status Handle(const MultiGetQueryTraceRecord& record) override; std::unique_ptr<TraceRecordResult>* result) override;
virtual Status Handle(const IteratorSeekQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) override;
virtual Status Handle(const MultiGetQueryTraceRecord& record,
std::unique_ptr<TraceRecordResult>* result) override;
private: private:
DB* db_; DB* db_;
std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_; std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
WriteOptions write_opts_; WriteOptions write_opts_;
ReadOptions read_opts_; ReadOptions read_opts_;
std::shared_ptr<SystemClock> clock_;
}; };
// To do: Handler for trace_analyzer. // To do: Handler for trace_analyzer.

@ -0,0 +1,106 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/trace_record_result.h"
namespace ROCKSDB_NAMESPACE {
// TraceRecordResult
TraceRecordResult::TraceRecordResult(TraceType trace_type)
: trace_type_(trace_type) {}
TraceType TraceRecordResult::GetTraceType() const { return trace_type_; }
// TraceExecutionResult
TraceExecutionResult::TraceExecutionResult(uint64_t start_timestamp,
uint64_t end_timestamp,
TraceType trace_type)
: TraceRecordResult(trace_type),
ts_start_(start_timestamp),
ts_end_(end_timestamp) {
assert(ts_start_ <= ts_end_);
}
uint64_t TraceExecutionResult::GetStartTimestamp() const { return ts_start_; }
uint64_t TraceExecutionResult::GetEndTimestamp() const { return ts_end_; }
// StatusOnlyTraceExecutionResult
StatusOnlyTraceExecutionResult::StatusOnlyTraceExecutionResult(
Status status, uint64_t start_timestamp, uint64_t end_timestamp,
TraceType trace_type)
: TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
status_(std::move(status)) {}
const Status& StatusOnlyTraceExecutionResult::GetStatus() const {
return status_;
}
Status StatusOnlyTraceExecutionResult::Accept(Handler* handler) {
assert(handler != nullptr);
return handler->Handle(*this);
}
// SingleValueTraceExecutionResult
SingleValueTraceExecutionResult::SingleValueTraceExecutionResult(
Status status, const std::string& value, uint64_t start_timestamp,
uint64_t end_timestamp, TraceType trace_type)
: TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
status_(std::move(status)),
value_(value) {}
SingleValueTraceExecutionResult::SingleValueTraceExecutionResult(
Status status, std::string&& value, uint64_t start_timestamp,
uint64_t end_timestamp, TraceType trace_type)
: TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
status_(std::move(status)),
value_(std::move(value)) {}
SingleValueTraceExecutionResult::~SingleValueTraceExecutionResult() {
value_.clear();
}
const Status& SingleValueTraceExecutionResult::GetStatus() const {
return status_;
}
const std::string& SingleValueTraceExecutionResult::GetValue() const {
return value_;
}
Status SingleValueTraceExecutionResult::Accept(Handler* handler) {
assert(handler != nullptr);
return handler->Handle(*this);
}
// MultiValuesTraceExecutionResult
MultiValuesTraceExecutionResult::MultiValuesTraceExecutionResult(
std::vector<Status> multi_status, std::vector<std::string> values,
uint64_t start_timestamp, uint64_t end_timestamp, TraceType trace_type)
: TraceExecutionResult(start_timestamp, end_timestamp, trace_type),
multi_status_(std::move(multi_status)),
values_(std::move(values)) {}
MultiValuesTraceExecutionResult::~MultiValuesTraceExecutionResult() {
multi_status_.clear();
values_.clear();
}
const std::vector<Status>& MultiValuesTraceExecutionResult::GetMultiStatus()
const {
return multi_status_;
}
const std::vector<std::string>& MultiValuesTraceExecutionResult::GetValues()
const {
return values_;
}
Status MultiValuesTraceExecutionResult::Accept(Handler* handler) {
assert(handler != nullptr);
return handler->Handle(*this);
}
} // namespace ROCKSDB_NAMESPACE

@ -131,6 +131,10 @@ Status TracerHelper::DecodeWriteRecord(Trace* trace, int trace_file_version,
assert(trace != nullptr); assert(trace != nullptr);
assert(trace->type == kTraceWrite); assert(trace->type == kTraceWrite);
if (record != nullptr) {
record->reset(nullptr);
}
PinnableSlice rep; PinnableSlice rep;
if (trace_file_version < 2) { if (trace_file_version < 2) {
rep.PinSelf(trace->payload); rep.PinSelf(trace->payload);
@ -168,6 +172,10 @@ Status TracerHelper::DecodeGetRecord(Trace* trace, int trace_file_version,
assert(trace != nullptr); assert(trace != nullptr);
assert(trace->type == kTraceGet); assert(trace->type == kTraceGet);
if (record != nullptr) {
record->reset(nullptr);
}
uint32_t cf_id = 0; uint32_t cf_id = 0;
Slice get_key; Slice get_key;
@ -211,6 +219,10 @@ Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version,
assert(trace->type == kTraceIteratorSeek || assert(trace->type == kTraceIteratorSeek ||
trace->type == kTraceIteratorSeekForPrev); trace->type == kTraceIteratorSeekForPrev);
if (record != nullptr) {
record->reset(nullptr);
}
uint32_t cf_id = 0; uint32_t cf_id = 0;
Slice iter_key; Slice iter_key;
@ -265,6 +277,11 @@ Status TracerHelper::DecodeMultiGetRecord(
std::unique_ptr<TraceRecord>* record) { std::unique_ptr<TraceRecord>* record) {
assert(trace != nullptr); assert(trace != nullptr);
assert(trace->type == kTraceMultiGet); assert(trace->type == kTraceMultiGet);
if (record != nullptr) {
record->reset(nullptr);
}
if (trace_file_version < 2) { if (trace_file_version < 2) {
return Status::Corruption("MultiGet is not supported."); return Status::Corruption("MultiGet is not supported.");
} }

@ -10,13 +10,9 @@
#include <cmath> #include <cmath>
#include <thread> #include <thread>
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "rocksdb/trace_reader_writer.h"
#include "util/threadpool_imp.h" #include "util/threadpool_imp.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -77,17 +73,15 @@ Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
return DecodeTraceRecord(&trace, trace_file_version_, record); return DecodeTraceRecord(&trace, trace_file_version_, record);
} }
Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record) { Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record,
return record->Accept(exec_handler_.get()); std::unique_ptr<TraceRecordResult>* result) {
return record->Accept(exec_handler_.get(), result);
} }
Status ReplayerImpl::Execute(std::unique_ptr<TraceRecord>&& record) { Status ReplayerImpl::Replay(
Status s = record->Accept(exec_handler_.get()); const ReplayOptions& options,
record.reset(); const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
return s; result_callback) {
}
Status ReplayerImpl::Replay(const ReplayOptions& options) {
if (options.fast_forward <= 0.0) { if (options.fast_forward <= 0.0) {
return Status::InvalidArgument("Wrong fast forward speed!"); return Status::InvalidArgument("Wrong fast forward speed!");
} }
@ -124,19 +118,34 @@ Status ReplayerImpl::Replay(const ReplayOptions& options) {
// In single-threaded replay, decode first then sleep. // In single-threaded replay, decode first then sleep.
std::unique_ptr<TraceRecord> record; std::unique_ptr<TraceRecord> record;
s = DecodeTraceRecord(&trace, trace_file_version_, &record); s = DecodeTraceRecord(&trace, trace_file_version_, &record);
// Skip unsupported traces, stop for other errors. if (!s.ok() && !s.IsNotSupported()) {
if (s.IsNotSupported()) {
continue;
} else if (!s.ok()) {
break; break;
} }
std::this_thread::sleep_until( std::chrono::system_clock::time_point sleep_to =
replay_epoch + replay_epoch +
std::chrono::microseconds(static_cast<uint64_t>(std::llround( std::chrono::microseconds(static_cast<uint64_t>(std::llround(
1.0 * (trace.ts - header_ts_) / options.fast_forward)))); 1.0 * (trace.ts - header_ts_) / options.fast_forward)));
if (sleep_to > std::chrono::system_clock::now()) {
std::this_thread::sleep_until(sleep_to);
}
s = Execute(std::move(record)); // Skip unsupported traces, stop for other errors.
if (s.IsNotSupported()) {
if (result_callback != nullptr) {
result_callback(s, nullptr);
}
s = Status::OK();
continue;
}
if (result_callback == nullptr) {
s = Execute(record, nullptr);
} else {
std::unique_ptr<TraceRecordResult> res;
s = Execute(record, &res);
result_callback(s, std::move(res));
}
} }
} else { } else {
// Multi-threaded replay. // Multi-threaded replay.
@ -181,10 +190,13 @@ Status ReplayerImpl::Replay(const ReplayOptions& options) {
// In multi-threaded replay, sleep first thatn start decoding and // In multi-threaded replay, sleep first thatn start decoding and
// execution in a thread. // execution in a thread.
std::this_thread::sleep_until( std::chrono::system_clock::time_point sleep_to =
replay_epoch + replay_epoch +
std::chrono::microseconds(static_cast<uint64_t>(std::llround( std::chrono::microseconds(static_cast<uint64_t>(std::llround(
1.0 * (trace.ts - header_ts_) / options.fast_forward)))); 1.0 * (trace.ts - header_ts_) / options.fast_forward)));
if (sleep_to > std::chrono::system_clock::now()) {
std::this_thread::sleep_until(sleep_to);
}
if (trace_type == kTraceWrite || trace_type == kTraceGet || if (trace_type == kTraceWrite || trace_type == kTraceGet ||
trace_type == kTraceIteratorSeek || trace_type == kTraceIteratorSeek ||
@ -195,10 +207,16 @@ Status ReplayerImpl::Replay(const ReplayOptions& options) {
ra->handler = exec_handler_.get(); ra->handler = exec_handler_.get();
ra->trace_file_version = trace_file_version_; ra->trace_file_version = trace_file_version_;
ra->error_cb = error_cb; ra->error_cb = error_cb;
ra->result_cb = result_callback;
thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(), thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
nullptr, nullptr); nullptr, nullptr);
} else {
// Skip unsupported traces.
if (result_callback != nullptr) {
result_callback(Status::NotSupported("Unsupported trace type."),
nullptr);
}
} }
// Skip unsupported traces.
} }
thread_pool.WaitForJobsAndJoinAllThreads(); thread_pool.WaitForJobsAndJoinAllThreads();
@ -293,13 +311,26 @@ void ReplayerImpl::BackgroundWork(void* arg) {
std::unique_ptr<TraceRecord> record; std::unique_ptr<TraceRecord> record;
Status s = Status s =
DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record); DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record);
if (s.ok()) { if (!s.ok()) {
s = record->Accept(ra->handler); // Stop the replay
record.reset(); if (ra->error_cb != nullptr) {
ra->error_cb(s, ra->trace_entry.ts);
}
// Report the result
if (ra->result_cb != nullptr) {
ra->result_cb(s, nullptr);
}
return;
} }
if (!s.ok() && ra->error_cb) {
ra->error_cb(s, ra->trace_entry.ts); if (ra->result_cb == nullptr) {
s = record->Accept(ra->handler, nullptr);
} else {
std::unique_ptr<TraceRecordResult> res;
s = record->Accept(ra->handler, &res);
ra->result_cb(s, std::move(res));
} }
record.reset();
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -12,22 +12,17 @@
#include <mutex> #include <mutex>
#include <unordered_map> #include <unordered_map>
#include "rocksdb/rocksdb_namespace.h" #include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/trace_record.h" #include "rocksdb/trace_record.h"
#include "rocksdb/trace_record_result.h"
#include "rocksdb/utilities/replayer.h" #include "rocksdb/utilities/replayer.h"
#include "trace_replay/trace_replay.h" #include "trace_replay/trace_replay.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
class ColumnFamilyHandle;
class DB;
class Env;
class TraceReader;
class TraceRecord;
class Status;
struct ReplayOptions;
class ReplayerImpl : public Replayer { class ReplayerImpl : public Replayer {
public: public:
ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles, ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
@ -41,11 +36,14 @@ class ReplayerImpl : public Replayer {
Status Next(std::unique_ptr<TraceRecord>* record) override; Status Next(std::unique_ptr<TraceRecord>* record) override;
using Replayer::Execute; using Replayer::Execute;
Status Execute(const std::unique_ptr<TraceRecord>& record) override; Status Execute(const std::unique_ptr<TraceRecord>& record,
Status Execute(std::unique_ptr<TraceRecord>&& record) override; std::unique_ptr<TraceRecordResult>* result) override;
using Replayer::Replay; using Replayer::Replay;
Status Replay(const ReplayOptions& options) override; Status Replay(
const ReplayOptions& options,
const std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)>&
result_callback) override;
using Replayer::GetHeaderTimestamp; using Replayer::GetHeaderTimestamp;
uint64_t GetHeaderTimestamp() const override; uint64_t GetHeaderTimestamp() const override;
@ -84,6 +82,9 @@ struct ReplayerWorkerArg {
// Callback function to report the error status and the timestamp of the // Callback function to report the error status and the timestamp of the
// TraceRecord. // TraceRecord.
std::function<void(Status, uint64_t)> error_cb; std::function<void(Status, uint64_t)> error_cb;
// Callback function to report the trace execution status and operation
// execution status/result(s).
std::function<void(Status, std::unique_ptr<TraceRecordResult>&&)> result_cb;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save