Make TraceRecord and Replayer public (#8611)

Summary:
New public interfaces:
`TraceRecord` and `TraceRecord::Handler`, available in "rocksdb/trace_record.h".
`Replayer`, available in `rocksdb/utilities/replayer.h`.

User can use `DB::NewDefaultReplayer()` to create a Replayer to auto/manual replay a trace file.

Unit tests:
- `./db_test2 --gtest_filter="DBTest2.TraceAndReplay"`: Updated with the internal API changes.
- `./db_test2 --gtest_filter="DBTest2.TraceAndManualReplay"`: New for manual replay.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8611

Reviewed By: ajkr

Differential Revision: D30266329

Pulled By: autopear

fbshipit-source-id: 1ecb3cbbedae0f6a67c18f0cc82e002b4d81b6f8
main
Merlin Mao 3 years ago committed by Facebook GitHub Bot
parent a53563d86e
commit f58d276764
  1. 5
      CMakeLists.txt
  2. 4
      HISTORY.md
  3. 6
      TARGETS
  4. 12
      db/db_impl/db_impl.cc
  5. 11
      db/db_impl/db_impl.h
  6. 260
      db/db_test2.cc
  7. 1
      env/file_system_tracer.cc
  8. 36
      include/rocksdb/db.h
  9. 6
      include/rocksdb/trace_reader_writer.h
  10. 205
      include/rocksdb/trace_record.h
  11. 74
      include/rocksdb/utilities/replayer.h
  12. 7
      include/rocksdb/utilities/stackable_db.h
  13. 3
      src.mk
  14. 1
      table/block_based/block_based_table_reader.cc
  15. 2
      table/table_test.cc
  16. 1
      tools/block_cache_analyzer/block_cache_trace_analyzer.cc
  17. 1
      tools/block_cache_analyzer/block_cache_trace_analyzer.h
  18. 1
      tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc
  19. 49
      tools/db_bench_tool.cc
  20. 142
      tools/trace_analyzer_tool.cc
  21. 12
      tools/trace_analyzer_tool.h
  22. 1
      trace_replay/block_cache_tracer.cc
  23. 2
      trace_replay/block_cache_tracer_test.cc
  24. 1
      trace_replay/io_tracer.h
  25. 1
      trace_replay/io_tracer_test.cc
  26. 163
      trace_replay/trace_record.cc
  27. 108
      trace_replay/trace_record_handler.cc
  28. 39
      trace_replay/trace_record_handler.h
  29. 600
      trace_replay/trace_replay.cc
  30. 138
      trace_replay/trace_replay.h
  31. 3
      utilities/simulator_cache/cache_simulator.cc
  32. 2
      utilities/simulator_cache/cache_simulator_test.cc
  33. 8
      utilities/trace/file_trace_reader_writer.cc
  34. 1
      utilities/trace/file_trace_reader_writer.h
  35. 305
      utilities/trace/replayer_impl.cc
  36. 90
      utilities/trace/replayer_impl.h

@ -816,9 +816,11 @@ set(SOURCES
tools/ldb_tool.cc tools/ldb_tool.cc
tools/sst_dump_tool.cc tools/sst_dump_tool.cc
tools/trace_analyzer_tool.cc tools/trace_analyzer_tool.cc
trace_replay/trace_replay.cc
trace_replay/block_cache_tracer.cc trace_replay/block_cache_tracer.cc
trace_replay/io_tracer.cc trace_replay/io_tracer.cc
trace_replay/trace_record_handler.cc
trace_replay/trace_record.cc
trace_replay/trace_replay.cc
util/coding.cc util/coding.cc
util/compaction_job_stats_impl.cc util/compaction_job_stats_impl.cc
util/comparator.cc util/comparator.cc
@ -878,6 +880,7 @@ set(SOURCES
utilities/simulator_cache/sim_cache.cc utilities/simulator_cache/sim_cache.cc
utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc
utilities/trace/file_trace_reader_writer.cc utilities/trace/file_trace_reader_writer.cc
utilities/trace/replayer_impl.cc
utilities/transactions/lock/lock_manager.cc utilities/transactions/lock/lock_manager.cc
utilities/transactions/lock/point/point_lock_tracker.cc utilities/transactions/lock/point/point_lock_tracker.cc
utilities/transactions/lock/point/point_lock_manager.cc utilities/transactions/lock/point/point_lock_manager.cc

@ -15,6 +15,10 @@
* BlockBasedTableOptions.prepopulate_block_cache can be dynamically configured using DB::SetOptions. * BlockBasedTableOptions.prepopulate_block_cache can be dynamically configured using DB::SetOptions.
* Add CompactionOptionsFIFO.age_for_warm, which allows RocksDB to move old files to warm tier in FIFO compactions. Note that file temperature is still an experimental feature. * Add CompactionOptionsFIFO.age_for_warm, which allows RocksDB to move old files to warm tier in FIFO compactions. Note that file temperature is still an experimental feature.
* Add a comment to suggest btrfs user to disable file preallocation by setting `options.allow_fallocate=false`. * Add a comment to suggest btrfs user to disable file preallocation by setting `options.allow_fallocate=false`.
* Fast forward option in Trace replay changed to double type to allow replaying at a lower speed, by settings the value between 0 and 1. This option can be set via `ReplayOptions` in `Replayer::Replay()`, or via `--trace_replay_fast_forward` in db_bench.
## Public API change
* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Created trace_record.h and utilities/replayer.h files to access decoded Trace records and replay them.
### Performance Improvements ### Performance Improvements
* Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value. * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.

@ -335,6 +335,8 @@ cpp_library(
"tools/sst_dump_tool.cc", "tools/sst_dump_tool.cc",
"trace_replay/block_cache_tracer.cc", "trace_replay/block_cache_tracer.cc",
"trace_replay/io_tracer.cc", "trace_replay/io_tracer.cc",
"trace_replay/trace_record.cc",
"trace_replay/trace_record_handler.cc",
"trace_replay/trace_replay.cc", "trace_replay/trace_replay.cc",
"util/build_version.cc", "util/build_version.cc",
"util/coding.cc", "util/coding.cc",
@ -398,6 +400,7 @@ cpp_library(
"utilities/simulator_cache/sim_cache.cc", "utilities/simulator_cache/sim_cache.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/trace/file_trace_reader_writer.cc", "utilities/trace/file_trace_reader_writer.cc",
"utilities/trace/replayer_impl.cc",
"utilities/transactions/lock/lock_manager.cc", "utilities/transactions/lock/lock_manager.cc",
"utilities/transactions/lock/point/point_lock_manager.cc", "utilities/transactions/lock/point/point_lock_manager.cc",
"utilities/transactions/lock/point/point_lock_tracker.cc", "utilities/transactions/lock/point/point_lock_tracker.cc",
@ -650,6 +653,8 @@ cpp_library(
"tools/sst_dump_tool.cc", "tools/sst_dump_tool.cc",
"trace_replay/block_cache_tracer.cc", "trace_replay/block_cache_tracer.cc",
"trace_replay/io_tracer.cc", "trace_replay/io_tracer.cc",
"trace_replay/trace_record.cc",
"trace_replay/trace_record_handler.cc",
"trace_replay/trace_replay.cc", "trace_replay/trace_replay.cc",
"util/build_version.cc", "util/build_version.cc",
"util/coding.cc", "util/coding.cc",
@ -713,6 +718,7 @@ cpp_library(
"utilities/simulator_cache/sim_cache.cc", "utilities/simulator_cache/sim_cache.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/trace/file_trace_reader_writer.cc", "utilities/trace/file_trace_reader_writer.cc",
"utilities/trace/replayer_impl.cc",
"utilities/transactions/lock/lock_manager.cc", "utilities/transactions/lock/lock_manager.cc",
"utilities/transactions/lock/point/point_lock_manager.cc", "utilities/transactions/lock/point/point_lock_manager.cc",
"utilities/transactions/lock/point/point_lock_tracker.cc", "utilities/transactions/lock/point/point_lock_tracker.cc",

@ -94,6 +94,7 @@
#include "table/table_builder.h" #include "table/table_builder.h"
#include "table/two_level_iterator.h" #include "table/two_level_iterator.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "trace_replay/trace_replay.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/cast_util.h" #include "util/cast_util.h"
#include "util/coding.h" #include "util/coding.h"
@ -103,6 +104,7 @@
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "utilities/trace/replayer_impl.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -4359,9 +4361,7 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
return earliest_seq; return earliest_seq;
} }
#endif // ROCKSDB_LITE
#ifndef ROCKSDB_LITE
Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
bool cache_only, bool cache_only,
SequenceNumber lower_bound_seq, SequenceNumber lower_bound_seq,
@ -5108,6 +5108,14 @@ Status DBImpl::EndTrace() {
return s; return s;
} }
Status DBImpl::NewDefaultReplayer(
const std::vector<ColumnFamilyHandle*>& handles,
std::unique_ptr<TraceReader>&& reader,
std::unique_ptr<Replayer>* replayer) {
replayer->reset(new ReplayerImpl(this, handles, std::move(reader)));
return Status::OK();
}
Status DBImpl::StartBlockCacheTrace( Status DBImpl::StartBlockCacheTrace(
const TraceOptions& trace_options, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) { std::unique_ptr<TraceWriter>&& trace_writer) {

@ -51,8 +51,13 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#ifndef ROCKSDB_LITE
#include "rocksdb/trace_reader_writer.h" #include "rocksdb/trace_reader_writer.h"
#endif // ROCKSDB_LITE
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/replayer.h"
#endif // ROCKSDB_LITE
#include "rocksdb/write_buffer_manager.h" #include "rocksdb/write_buffer_manager.h"
#include "table/merging_iterator.h" #include "table/merging_iterator.h"
#include "table/scoped_arena_iterator.h" #include "table/scoped_arena_iterator.h"
@ -464,6 +469,12 @@ class DBImpl : public DB {
using DB::EndTrace; using DB::EndTrace;
virtual Status EndTrace() override; virtual Status EndTrace() override;
using DB::NewDefaultReplayer;
virtual Status NewDefaultReplayer(
const std::vector<ColumnFamilyHandle*>& handles,
std::unique_ptr<TraceReader>&& reader,
std::unique_ptr<Replayer>* replayer) override;
using DB::StartBlockCacheTrace; using DB::StartBlockCacheTrace;
Status StartBlockCacheTrace( Status StartBlockCacheTrace(
const TraceOptions& options, const TraceOptions& options,

@ -17,6 +17,7 @@
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h" #include "rocksdb/persistent_cache.h"
#include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_filter.h" #include "rocksdb/wal_filter.h"
#include "util/random.h" #include "util/random.h"
#include "utilities/fault_injection_env.h" #include "utilities/fault_injection_env.h"
@ -4256,8 +4257,16 @@ TEST_F(DBTest2, TraceAndReplay) {
std::unique_ptr<TraceReader> trace_reader; std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
Replayer replayer(db2, handles_, std::move(trace_reader)); std::unique_ptr<Replayer> replayer;
ASSERT_OK(replayer.Replay()); ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
// Unprepared replay should fail with Status::Incomplete()
ASSERT_TRUE(replayer->Replay().IsIncomplete());
ASSERT_OK(replayer->Prepare());
// Ok to repeatedly Prepare().
ASSERT_OK(replayer->Prepare());
// Replay using 1 thread, 1x speed.
ASSERT_OK(replayer->Replay());
ASSERT_OK(db2->Get(ro, handles[0], "a", &value)); ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
ASSERT_EQ("1", value); ASSERT_EQ("1", value);
@ -4271,6 +4280,229 @@ TEST_F(DBTest2, TraceAndReplay) {
ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value)); ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
ASSERT_EQ("rocks", value); ASSERT_EQ("rocks", value);
// Re-replay should fail with Status::Incomplete() if Prepare() was not
// called. Currently we don't distinguish between unprepared and trace end.
ASSERT_TRUE(replayer->Replay().IsIncomplete());
// Re-replay using 2 threads, 2x speed.
ASSERT_OK(replayer->Prepare());
ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0)));
// Re-replay using 2 threads, 1/2 speed.
ASSERT_OK(replayer->Prepare());
ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5)));
replayer.reset();
for (auto handle : handles) {
delete handle;
}
delete db2;
ASSERT_OK(DestroyDB(dbname2, options));
}
TEST_F(DBTest2, TraceAndManualReplay) {
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreatePutOperator();
ReadOptions ro;
WriteOptions wo;
TraceOptions trace_opts;
EnvOptions env_opts;
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
Iterator* single_iter = nullptr;
ASSERT_TRUE(db_->EndTrace().IsIOError());
std::string trace_filename = dbname_ + "/rocksdb.trace";
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
ASSERT_OK(Put(0, "a", "1"));
ASSERT_OK(Merge(0, "b", "2"));
ASSERT_OK(Delete(0, "c"));
ASSERT_OK(SingleDelete(0, "d"));
ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
WriteBatch batch;
ASSERT_OK(batch.Put("f", "11"));
ASSERT_OK(batch.Merge("g", "12"));
ASSERT_OK(batch.Delete("h"));
ASSERT_OK(batch.SingleDelete("i"));
ASSERT_OK(batch.DeleteRange("j", "k"));
ASSERT_OK(db_->Write(wo, &batch));
single_iter = db_->NewIterator(ro);
single_iter->Seek("f");
single_iter->SeekForPrev("g");
delete single_iter;
ASSERT_EQ("1", Get(0, "a"));
ASSERT_EQ("12", Get(0, "g"));
ASSERT_OK(Put(1, "foo", "bar"));
ASSERT_OK(Put(1, "rocksdb", "rocks"));
ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
ASSERT_OK(db_->EndTrace());
// These should not get into the trace file as it is after EndTrace.
Put("hello", "world");
Merge("foo", "bar");
// Open another db, replay, and verify the data
std::string value;
std::string dbname2 = test::PerThreadDBPath(env_, "/db_replay");
ASSERT_OK(DestroyDB(dbname2, options));
// Using a different name than db2, to pacify infer's use-after-lifetime
// warnings (http://fbinfer.com).
DB* db2_init = nullptr;
options.create_if_missing = true;
ASSERT_OK(DB::Open(options, dbname2, &db2_init));
ColumnFamilyHandle* cf;
ASSERT_OK(
db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
delete cf;
delete db2_init;
DB* db2 = nullptr;
std::vector<ColumnFamilyDescriptor> column_families;
ColumnFamilyOptions cf_options;
cf_options.merge_operator = MergeOperators::CreatePutOperator();
column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
column_families.push_back(
ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
std::vector<ColumnFamilyHandle*> handles;
DBOptions db_opts;
db_opts.env = env_;
ASSERT_OK(DB::Open(db_opts, dbname2, column_families, &handles, &db2));
env_->SleepForMicroseconds(100);
// Verify that the keys don't already exist
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
std::unique_ptr<Replayer> replayer;
ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
// Manual replay for 2 times. The 2nd checks if the replay can restart.
std::unique_ptr<TraceRecord> record;
for (int i = 0; i < 2; i++) {
// Next should fail if unprepared.
ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
ASSERT_OK(replayer->Prepare());
Status s = Status::OK();
// Looping until trace end.
while (s.ok()) {
s = replayer->Next(&record);
// Skip unsupported operations.
if (s.IsNotSupported()) {
continue;
}
if (s.ok()) {
ASSERT_OK(replayer->Execute(std::move(record)));
}
}
// Status::Incomplete() will be returned when manually reading the trace
// end, or Prepare() was not called.
ASSERT_TRUE(s.IsIncomplete());
ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete());
}
ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
ASSERT_EQ("1", value);
ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
ASSERT_EQ("12", value);
ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
ASSERT_EQ("bar", value);
ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
ASSERT_EQ("rocks", value);
// Test execution of artificially created TraceRecords.
uint64_t fake_ts = 1U;
// Write
batch.Clear();
batch.Put("trace-record-write1", "write1");
batch.Put("trace-record-write2", "write2");
record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record)));
ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value));
ASSERT_EQ("write1", value);
ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value));
ASSERT_EQ("write2", value);
// Get related
// Get an existing key.
record.reset(new GetQueryTraceRecord(handles[0]->GetID(),
"trace-record-write1", fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record)));
// Get an non-existing key, should still return Status::OK().
record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get",
fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record)));
// Get from an invalid (non-existing) cf_id.
uint32_t invalid_cf_id = handles[1]->GetID() + 1;
record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++));
ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
// Iteration related
for (IteratorSeekQueryTraceRecord::SeekType seekType :
{IteratorSeekQueryTraceRecord::kSeek,
IteratorSeekQueryTraceRecord::kSeekForPrev}) {
// Seek to an existing key.
record.reset(new IteratorSeekQueryTraceRecord(
seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record)));
// Seek to an non-existing key, should still return Status::OK().
record.reset(new IteratorSeekQueryTraceRecord(
seekType, handles[0]->GetID(), "trace-record-get", fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record)));
// Seek from an invalid cf_id.
record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id,
"whatever", fake_ts++));
ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
}
// MultiGet related
// Get existing keys.
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"a", "foo"}), fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record)));
// Get all non-existing keys, should still return Status::OK().
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"no1", "no2"}), fake_ts++));
// Get mixed of existing and non-existing keys, should still return
// Status::OK().
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"a", "no2"}), fake_ts++));
ASSERT_OK(replayer->Execute(std::move(record)));
// Get from an invalid (non-existing) cf_id.
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>(
{handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}),
std::vector<std::string>({"a", "foo", "whatever"}), fake_ts++));
ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption());
// Empty MultiGet
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>(), std::vector<std::string>(), fake_ts++));
ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument());
// MultiGet size mismatch
record.reset(new MultiGetQueryTraceRecord(
std::vector<uint32_t>({handles[0]->GetID(), handles[1]->GetID()}),
std::vector<std::string>({"a"}), fake_ts++));
ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument());
replayer.reset();
for (auto handle : handles) { for (auto handle : handles) {
delete handle; delete handle;
} }
@ -4334,8 +4566,12 @@ TEST_F(DBTest2, TraceWithLimit) {
std::unique_ptr<TraceReader> trace_reader; std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
Replayer replayer(db2, handles_, std::move(trace_reader)); std::unique_ptr<Replayer> replayer;
ASSERT_OK(replayer.Replay()); ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
ASSERT_OK(replayer->Prepare());
ASSERT_OK(replayer->Replay());
replayer.reset();
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound()); ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
@ -4405,8 +4641,12 @@ TEST_F(DBTest2, TraceWithSampling) {
std::unique_ptr<TraceReader> trace_reader; std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
Replayer replayer(db2, handles_, std::move(trace_reader)); std::unique_ptr<Replayer> replayer;
ASSERT_OK(replayer.Replay()); ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
ASSERT_OK(replayer->Prepare());
ASSERT_OK(replayer->Replay());
replayer.reset();
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound()); ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
@ -4505,8 +4745,12 @@ TEST_F(DBTest2, TraceWithFilter) {
std::unique_ptr<TraceReader> trace_reader; std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
Replayer replayer(db2, handles_, std::move(trace_reader)); std::unique_ptr<Replayer> replayer;
ASSERT_OK(replayer.Replay()); ASSERT_OK(
db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer));
ASSERT_OK(replayer->Prepare());
ASSERT_OK(replayer->Replay());
replayer.reset();
// All the key-values should not present since we filter out the WRITE ops. // All the key-values should not present since we filter out the WRITE ops.
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());

@ -7,6 +7,7 @@
#include "rocksdb/file_system.h" #include "rocksdb/file_system.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {

@ -39,25 +39,31 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
struct Options;
struct DBOptions;
struct ColumnFamilyOptions; struct ColumnFamilyOptions;
struct ReadOptions;
struct WriteOptions;
struct FlushOptions;
struct CompactionOptions; struct CompactionOptions;
struct CompactRangeOptions; struct CompactRangeOptions;
struct TableProperties; struct DBOptions;
struct ExternalSstFileInfo; struct ExternalSstFileInfo;
class WriteBatch; struct FlushOptions;
struct Options;
struct ReadOptions;
struct TableProperties;
struct WriteOptions;
#ifdef ROCKSDB_LITE
class CompactionJobInfo;
#endif
class Env; class Env;
class EventListener; class EventListener;
class FileSystem;
#ifndef ROCKSDB_LITE
class Replayer;
#endif
class StatsHistoryIterator; class StatsHistoryIterator;
#ifndef ROCKSDB_LITE
class TraceReader;
class TraceWriter; class TraceWriter;
#ifdef ROCKSDB_LITE
class CompactionJobInfo;
#endif #endif
class FileSystem; class WriteBatch;
extern const std::string kDefaultColumnFamilyName; extern const std::string kDefaultColumnFamilyName;
extern const std::string kPersistentStatsColumnFamilyName; extern const std::string kPersistentStatsColumnFamilyName;
@ -1628,6 +1634,7 @@ class DB {
virtual ColumnFamilyHandle* DefaultColumnFamily() const = 0; virtual ColumnFamilyHandle* DefaultColumnFamily() const = 0;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
virtual Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, virtual Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
TablePropertiesCollection* props) = 0; TablePropertiesCollection* props) = 0;
virtual Status GetPropertiesOfAllTables(TablePropertiesCollection* props) { virtual Status GetPropertiesOfAllTables(TablePropertiesCollection* props) {
@ -1678,6 +1685,15 @@ class DB {
virtual Status EndBlockCacheTrace() { virtual Status EndBlockCacheTrace() {
return Status::NotSupported("EndBlockCacheTrace() is not implemented."); return Status::NotSupported("EndBlockCacheTrace() is not implemented.");
} }
// Create a default trace replayer.
virtual Status NewDefaultReplayer(
const std::vector<ColumnFamilyHandle*>& /*handles*/,
std::unique_ptr<TraceReader>&& /*reader*/,
std::unique_ptr<Replayer>* /*replayer*/) {
return Status::NotSupported("NewDefaultReplayer() is not implemented.");
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Needed for StackableDB // Needed for StackableDB

@ -36,6 +36,11 @@ class TraceReader {
virtual Status Read(std::string* data) = 0; virtual Status Read(std::string* data) = 0;
virtual Status Close() = 0; virtual Status Close() = 0;
// Seek back to the trace header. Replayer can call this method for
// repeatedly replaying. Note this method may fail if the reader is already
// closed.
virtual Status Reset() = 0;
}; };
// Factory methods to read/write traces from/to a file. // Factory methods to read/write traces from/to a file.
@ -45,4 +50,5 @@ Status NewFileTraceWriter(Env* env, const EnvOptions& env_options,
Status NewFileTraceReader(Env* env, const EnvOptions& env_options, Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
const std::string& trace_filename, const std::string& trace_filename,
std::unique_ptr<TraceReader>* trace_reader); std::unique_ptr<TraceReader>* trace_reader);
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,205 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <string>
#include <vector>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h"
namespace ROCKSDB_NAMESPACE {
class ColumnFamilyHandle;
class DB;
class Status;
// Supported trace record types.
enum TraceType : char {
kTraceNone = 0,
kTraceBegin = 1,
kTraceEnd = 2,
// Query level tracing related trace types.
kTraceWrite = 3,
kTraceGet = 4,
kTraceIteratorSeek = 5,
kTraceIteratorSeekForPrev = 6,
// Block cache tracing related trace types.
kBlockTraceIndexBlock = 7,
kBlockTraceFilterBlock = 8,
kBlockTraceDataBlock = 9,
kBlockTraceUncompressionDictBlock = 10,
kBlockTraceRangeDeletionBlock = 11,
// IO tracing related trace type.
kIOTracer = 12,
// Query level tracing related trace type.
kTraceMultiGet = 13,
// All trace types should be added before kTraceMax
kTraceMax,
};
class WriteQueryTraceRecord;
class GetQueryTraceRecord;
class IteratorSeekQueryTraceRecord;
class MultiGetQueryTraceRecord;
// Base class for all types of trace records.
class TraceRecord {
public:
TraceRecord();
explicit TraceRecord(uint64_t timestamp);
virtual ~TraceRecord();
virtual TraceType GetTraceType() const = 0;
virtual uint64_t GetTimestamp() const;
class Handler {
public:
virtual ~Handler() {}
virtual Status Handle(const WriteQueryTraceRecord& record) = 0;
virtual Status Handle(const GetQueryTraceRecord& record) = 0;
virtual Status Handle(const IteratorSeekQueryTraceRecord& record) = 0;
virtual Status Handle(const MultiGetQueryTraceRecord& record) = 0;
};
virtual Status Accept(Handler* handler) = 0;
// Create a handler for the exeution of TraceRecord.
static Handler* NewExecutionHandler(
DB* db, const std::vector<ColumnFamilyHandle*>& handles);
private:
// Timestamp (in microseconds) of this trace.
uint64_t timestamp_;
};
// Base class for all query types of trace records.
class QueryTraceRecord : public TraceRecord {
public:
explicit QueryTraceRecord(uint64_t timestamp);
virtual ~QueryTraceRecord() override;
};
// Trace record for DB::Write() operation.
class WriteQueryTraceRecord : public QueryTraceRecord {
public:
WriteQueryTraceRecord(PinnableSlice&& write_batch_rep, uint64_t timestamp);
WriteQueryTraceRecord(const std::string& write_batch_rep, uint64_t timestamp);
virtual ~WriteQueryTraceRecord() override;
TraceType GetTraceType() const override { return kTraceWrite; };
virtual Slice GetWriteBatchRep() const;
virtual Status Accept(Handler* handler) override;
private:
PinnableSlice rep_;
};
// Trace record for DB::Get() operation
class GetQueryTraceRecord : public QueryTraceRecord {
public:
GetQueryTraceRecord(uint32_t column_family_id, PinnableSlice&& key,
uint64_t timestamp);
GetQueryTraceRecord(uint32_t column_family_id, const std::string& key,
uint64_t timestamp);
virtual ~GetQueryTraceRecord() override;
TraceType GetTraceType() const override { return kTraceGet; };
virtual uint32_t GetColumnFamilyID() const;
virtual Slice GetKey() const;
virtual Status Accept(Handler* handler) override;
private:
// Column family ID.
uint32_t cf_id_;
// Key to get.
PinnableSlice key_;
};
// Base class for all Iterator related operations.
class IteratorQueryTraceRecord : public QueryTraceRecord {
public:
explicit IteratorQueryTraceRecord(uint64_t timestamp);
virtual ~IteratorQueryTraceRecord() override;
};
// Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation.
class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord {
public:
// Currently we only support Seek() and SeekForPrev().
enum SeekType {
kSeek = kTraceIteratorSeek,
kSeekForPrev = kTraceIteratorSeekForPrev
};
IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id,
PinnableSlice&& key, uint64_t timestamp);
IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id,
const std::string& key, uint64_t timestamp);
virtual ~IteratorSeekQueryTraceRecord() override;
TraceType GetTraceType() const override;
virtual SeekType GetSeekType() const;
virtual uint32_t GetColumnFamilyID() const;
virtual Slice GetKey() const;
virtual Status Accept(Handler* handler) override;
private:
SeekType type_;
// Column family ID.
uint32_t cf_id_;
// Key to seek to.
PinnableSlice key_;
};
// Trace record for DB::MultiGet() operation.
class MultiGetQueryTraceRecord : public QueryTraceRecord {
public:
MultiGetQueryTraceRecord(std::vector<uint32_t> column_family_ids,
std::vector<PinnableSlice>&& keys,
uint64_t timestamp);
MultiGetQueryTraceRecord(std::vector<uint32_t> column_family_ids,
const std::vector<std::string>& keys,
uint64_t timestamp);
virtual ~MultiGetQueryTraceRecord() override;
TraceType GetTraceType() const override { return kTraceMultiGet; };
virtual std::vector<uint32_t> GetColumnFamilyIDs() const;
virtual std::vector<Slice> GetKeys() const;
virtual Status Accept(Handler* handler) override;
private:
// Column familiy IDs.
std::vector<uint32_t> cf_ids_;
// Keys to get.
std::vector<PinnableSlice> keys_;
};
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,74 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <memory>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h"
#include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE {
struct ReplayOptions {
// Number of threads used for replaying. If 0 or 1, replay using
// single thread.
uint32_t num_threads;
// Enables fast forwarding a replay by increasing/reducing the delay between
// the ingested traces.
// If > 0.0 and < 1.0, slow down the replay by this amount.
// If 1.0, replay the operations at the same rate as in the trace stream.
// If > 1, speed up the replay by this amount.
double fast_forward;
ReplayOptions() : num_threads(1), fast_forward(1.0) {}
ReplayOptions(uint32_t num_of_threads, double fast_forward_ratio)
: num_threads(num_of_threads), fast_forward(fast_forward_ratio) {}
};
// Replayer helps to replay the captured RocksDB query level operations.
// The Replayer can either be created from DB::NewReplayer method, or be
// instantiated via db_bench today, on using "replay" benchmark.
class Replayer {
public:
virtual ~Replayer() {}
// Make some preparation before replaying the trace. This will also reset the
// replayer in order to restart replaying.
virtual Status Prepare() = 0;
// Return the timestamp when the trace recording was started.
virtual uint64_t GetHeaderTimestamp() const = 0;
// Atomically read one trace into a TraceRecord (excluding the header and
// footer traces).
// Return Status::OK() on success;
// Status::Incomplete() if Prepare() was not called or no more available
// trace;
// Status::NotSupported() if the read trace type is not supported.
virtual Status Next(std::unique_ptr<TraceRecord>* record) = 0;
// Execute one TraceRecord.
// Return Status::OK() if the execution was successful. Get/MultiGet traces
// will still return Status::OK() even if they got Status::NotFound()
// from DB::Get() or DB::MultiGet();
// Status::Incomplete() if Prepare() was not called or no more available
// trace;
// Status::NotSupported() if the operation is not supported;
// Otherwise, return the corresponding error status.
virtual Status Execute(const std::unique_ptr<TraceRecord>& record) = 0;
virtual Status Execute(std::unique_ptr<TraceRecord>&& record) = 0;
// Replay all the traces from the provided trace stream, taking the delay
// between the traces into consideration.
virtual Status Replay(const ReplayOptions& options) = 0;
virtual Status Replay() { return Replay(ReplayOptions()); }
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -390,6 +390,13 @@ class StackableDB : public DB {
using DB::EndTrace; using DB::EndTrace;
Status EndTrace() override { return db_->EndTrace(); } Status EndTrace() override { return db_->EndTrace(); }
using DB::NewDefaultReplayer;
Status NewDefaultReplayer(const std::vector<ColumnFamilyHandle*>& handles,
std::unique_ptr<TraceReader>&& reader,
std::unique_ptr<Replayer>* replayer) override {
return db_->NewDefaultReplayer(handles, std::move(reader), replayer);
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs, virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,

@ -198,6 +198,8 @@ LIB_SOURCES = \
test_util/sync_point_impl.cc \ test_util/sync_point_impl.cc \
test_util/transaction_test_util.cc \ test_util/transaction_test_util.cc \
tools/dump/db_dump_tool.cc \ tools/dump/db_dump_tool.cc \
trace_replay/trace_record_handler.cc \
trace_replay/trace_record.cc \
trace_replay/trace_replay.cc \ trace_replay/trace_replay.cc \
trace_replay/block_cache_tracer.cc \ trace_replay/block_cache_tracer.cc \
trace_replay/io_tracer.cc \ trace_replay/io_tracer.cc \
@ -262,6 +264,7 @@ LIB_SOURCES = \
utilities/simulator_cache/sim_cache.cc \ utilities/simulator_cache/sim_cache.cc \
utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \
utilities/trace/file_trace_reader_writer.cc \ utilities/trace/file_trace_reader_writer.cc \
utilities/trace/replayer_impl.cc \
utilities/transactions/lock/lock_manager.cc \ utilities/transactions/lock/lock_manager.cc \
utilities/transactions/lock/point/point_lock_tracker.cc \ utilities/transactions/lock/point/point_lock_tracker.cc \
utilities/transactions/lock/point/point_lock_manager.cc \ utilities/transactions/lock/point/point_lock_manager.cc \

@ -36,6 +36,7 @@
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "rocksdb/trace_record.h"
#include "table/block_based/binary_search_index_reader.h" #include "table/block_based/binary_search_index_reader.h"
#include "table/block_based/block.h" #include "table/block_based/block.h"
#include "table/block_based/block_based_filter_block.h" #include "table/block_based/block_based_filter_block.h"

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <stdio.h> #include <stdio.h>
#include <algorithm> #include <algorithm>
#include <iostream> #include <iostream>
#include <map> #include <map>
@ -34,6 +35,7 @@
#include "rocksdb/perf_context.h" #include "rocksdb/perf_context.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/trace_record.h"
#include "rocksdb/write_buffer_manager.h" #include "rocksdb/write_buffer_manager.h"
#include "table/block_based/block.h" #include "table/block_based/block.h"
#include "table/block_based/block_based_table_builder.h" #include "table/block_based/block_based_table_builder.h"

@ -20,6 +20,7 @@
#include "monitoring/histogram.h" #include "monitoring/histogram.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "rocksdb/trace_record.h"
#include "util/gflags_compat.h" #include "util/gflags_compat.h"
#include "util/string_util.h" #include "util/string_util.h"

@ -11,6 +11,7 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/trace_record.h"
#include "rocksdb/utilities/sim_cache.h" #include "rocksdb/utilities/sim_cache.h"
#include "trace_replay/block_cache_tracer.h" #include "trace_replay/block_cache_tracer.h"
#include "utilities/simulator_cache/cache_simulator.h" #include "utilities/simulator_cache/cache_simulator.h"

@ -21,6 +21,7 @@ int main() {
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h" #include "rocksdb/trace_reader_writer.h"
#include "rocksdb/trace_record.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h" #include "tools/block_cache_analyzer/block_cache_trace_analyzer.h"

@ -63,6 +63,9 @@
#include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/options_type.h" #include "rocksdb/utilities/options_type.h"
#include "rocksdb/utilities/options_util.h" #include "rocksdb/utilities/options_util.h"
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/replayer.h"
#endif // ROCKSDB_LITE
#include "rocksdb/utilities/sim_cache.h" #include "rocksdb/utilities/sim_cache.h"
#include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
@ -228,7 +231,9 @@ IF_ROCKSDB_LITE("",
"\tmemstats -- Print memtable stats\n" "\tmemstats -- Print memtable stats\n"
"\tsstables -- Print sstable info\n" "\tsstables -- Print sstable info\n"
"\theapprofile -- Dump a heap profile (if supported by this port)\n" "\theapprofile -- Dump a heap profile (if supported by this port)\n"
#ifndef ROCKSDB_LITE
"\treplay -- replay the trace file specified with trace_file\n" "\treplay -- replay the trace file specified with trace_file\n"
#endif // ROCKSDB_LITE
"\tgetmergeoperands -- Insert lots of merge records which are a list of " "\tgetmergeoperands -- Insert lots of merge records which are a list of "
"sorted ints for a key and then compare performance of lookup for another " "sorted ints for a key and then compare performance of lookup for another "
"key " "key "
@ -997,10 +1002,12 @@ DEFINE_bool(report_bg_io_stats, false,
DEFINE_bool(use_stderr_info_logger, false, DEFINE_bool(use_stderr_info_logger, false,
"Write info logs to stderr instead of to LOG file. "); "Write info logs to stderr instead of to LOG file. ");
#ifndef ROCKSDB_LITE
DEFINE_string(trace_file, "", "Trace workload to a file. "); DEFINE_string(trace_file, "", "Trace workload to a file. ");
DEFINE_int32(trace_replay_fast_forward, 1, DEFINE_double(trace_replay_fast_forward, 1.0,
"Fast forward trace replay, must >= 1. "); "Fast forward trace replay, must > 0.0.");
DEFINE_int32(block_cache_trace_sampling_frequency, 1, DEFINE_int32(block_cache_trace_sampling_frequency, 1,
"Block cache trace sampling frequency, termed s. It uses spatial " "Block cache trace sampling frequency, termed s. It uses spatial "
"downsampling and samples accesses to one out of s blocks."); "downsampling and samples accesses to one out of s blocks.");
@ -1014,6 +1021,8 @@ DEFINE_string(block_cache_trace_file, "", "Block cache trace file path.");
DEFINE_int32(trace_replay_threads, 1, DEFINE_int32(trace_replay_threads, 1,
"The number of threads to replay, must >=1."); "The number of threads to replay, must >=1.");
#endif // ROCKSDB_LITE
static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
const char* ctype) { const char* ctype) {
assert(ctype); assert(ctype);
@ -3468,6 +3477,7 @@ class Benchmark {
PrintStats("rocksdb.sstables"); PrintStats("rocksdb.sstables");
} else if (name == "stats_history") { } else if (name == "stats_history") {
PrintStatsHistory(); PrintStatsHistory();
#ifndef ROCKSDB_LITE
} else if (name == "replay") { } else if (name == "replay") {
if (num_threads > 1) { if (num_threads > 1) {
fprintf(stderr, "Multi-threaded replay is not yet supported\n"); fprintf(stderr, "Multi-threaded replay is not yet supported\n");
@ -3478,6 +3488,7 @@ class Benchmark {
ErrorExit(); ErrorExit();
} }
method = &Benchmark::Replay; method = &Benchmark::Replay;
#endif // ROCKSDB_LITE
} else if (name == "getmergeoperands") { } else if (name == "getmergeoperands") {
method = &Benchmark::GetMergeOperands; method = &Benchmark::GetMergeOperands;
} else if (!name.empty()) { // No error message for empty name } else if (!name.empty()) { // No error message for empty name
@ -7978,6 +7989,8 @@ class Benchmark {
} }
} }
#ifndef ROCKSDB_LITE
void Replay(ThreadState* thread) { void Replay(ThreadState* thread) {
if (db_.db != nullptr) { if (db_.db != nullptr) {
Replay(thread, &db_); Replay(thread, &db_);
@ -7997,20 +8010,34 @@ class Benchmark {
s.ToString().c_str()); s.ToString().c_str());
exit(1); exit(1);
} }
Replayer replayer(db_with_cfh->db, db_with_cfh->cfh, std::unique_ptr<Replayer> replayer;
std::move(trace_reader)); s = db_with_cfh->db->NewDefaultReplayer(db_with_cfh->cfh,
replayer.SetFastForward( std::move(trace_reader), &replayer);
static_cast<uint32_t>(FLAGS_trace_replay_fast_forward)); if (!s.ok()) {
s = replayer.MultiThreadReplay( fprintf(stderr,
static_cast<uint32_t>(FLAGS_trace_replay_threads)); "Encountered an error creating a default Replayer. "
"Error: %s\n",
s.ToString().c_str());
exit(1);
}
s = replayer->Prepare();
if (!s.ok()) {
fprintf(stderr, "Prepare for replay failed. Error: %s\n",
s.ToString().c_str());
}
s = replayer->Replay(
ReplayOptions(static_cast<uint32_t>(FLAGS_trace_replay_threads),
FLAGS_trace_replay_fast_forward));
replayer.reset();
if (s.ok()) { if (s.ok()) {
fprintf(stdout, "Replay started from trace_file: %s\n", fprintf(stdout, "Replay completed from trace_file: %s\n",
FLAGS_trace_file.c_str()); FLAGS_trace_file.c_str());
} else { } else {
fprintf(stderr, "Starting replay failed. Error: %s\n", fprintf(stderr, "Replay failed. Error: %s\n", s.ToString().c_str());
s.ToString().c_str());
} }
} }
#endif // ROCKSDB_LITE
}; };
int db_bench_tool(int argc, char** argv) { int db_bench_tool(int argc, char** argv) {

@ -195,12 +195,6 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) {
return (op1 * op2); return (op1 * op2);
} }
void DecodeCFAndKeyFromString(std::string& buffer, uint32_t* cf_id, Slice* key) {
Slice buf(buffer);
GetFixed32(&buf, cf_id);
GetLengthPrefixedSlice(&buf, key);
}
} // namespace } // namespace
// The default constructor of AnalyzerOptions // The default constructor of AnalyzerOptions
@ -477,25 +471,29 @@ Status TraceAnalyzer::StartProcessing() {
total_requests_++; total_requests_++;
end_time_ = trace.ts; end_time_ = trace.ts;
if (trace.type == kTraceWrite) { if (trace.type == kTraceEnd) {
break;
}
std::unique_ptr<TraceRecord> record;
switch (trace.type) {
case kTraceWrite: {
s = TracerHelper::DecodeWriteRecord(&trace, trace_file_version_,
&record);
if (!s.ok()) {
return s;
}
total_writes_++; total_writes_++;
c_time_ = trace.ts; c_time_ = trace.ts;
Slice batch_data; std::unique_ptr<WriteQueryTraceRecord> r(
if (trace_file_version_ < 2) { reinterpret_cast<WriteQueryTraceRecord*>(record.release()));
Slice tmp_data(trace.payload);
batch_data = tmp_data;
} else {
WritePayload w_payload;
TracerHelper::DecodeWritePayload(&trace, &w_payload);
batch_data = w_payload.write_batch_data;
}
// Note that, if the write happens in a transaction, // Note that, if the write happens in a transaction,
// 'Write' will be called twice, one for Prepare, one for // 'Write' will be called twice, one for Prepare, one for
// Commit. Thus, in the trace, for the same WriteBatch, there // Commit. Thus, in the trace, for the same WriteBatch, there
// will be two reords if it is in a transaction. Here, we only // will be two reords if it is in a transaction. Here, we only
// process the reord that is committed. If write is non-transaction, // process the reord that is committed. If write is non-transaction,
// HasBeginPrepare()==false, so we process it normally. // HasBeginPrepare()==false, so we process it normally.
WriteBatch batch(batch_data.ToString()); WriteBatch batch(r->GetWriteBatchRep().ToString());
if (batch.HasBeginPrepare() && !batch.HasCommit()) { if (batch.HasBeginPrepare() && !batch.HasCommit()) {
continue; continue;
} }
@ -505,47 +503,58 @@ Status TraceAnalyzer::StartProcessing() {
fprintf(stderr, "Cannot process the write batch in the trace\n"); fprintf(stderr, "Cannot process the write batch in the trace\n");
return s; return s;
} }
} else if (trace.type == kTraceGet) { break;
GetPayload get_payload; }
get_payload.get_key = 0; case kTraceGet: {
if (trace_file_version_ < 2) { s = TracerHelper::DecodeGetRecord(&trace, trace_file_version_, &record);
DecodeCFAndKeyFromString(trace.payload, &get_payload.cf_id, if (!s.ok()) {
&get_payload.get_key); return s;
} else {
TracerHelper::DecodeGetPayload(&trace, &get_payload);
} }
total_gets_++; total_gets_++;
std::unique_ptr<GetQueryTraceRecord> r(
s = HandleGet(get_payload.cf_id, get_payload.get_key.ToString(), trace.ts, reinterpret_cast<GetQueryTraceRecord*>(record.release()));
s = HandleGet(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(),
1); 1);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Cannot process the get in the trace\n"); fprintf(stderr, "Cannot process the get in the trace\n");
return s; return s;
} }
} else if (trace.type == kTraceIteratorSeek || break;
trace.type == kTraceIteratorSeekForPrev) { }
IterPayload iter_payload; case kTraceIteratorSeek:
iter_payload.cf_id = 0; case kTraceIteratorSeekForPrev: {
if (trace_file_version_ < 2) { s = TracerHelper::DecodeIterRecord(&trace, trace_file_version_,
DecodeCFAndKeyFromString(trace.payload, &iter_payload.cf_id, &record);
&iter_payload.iter_key); if (!s.ok()) {
} else { return s;
TracerHelper::DecodeIterPayload(&trace, &iter_payload);
} }
s = HandleIter(iter_payload.cf_id, iter_payload.iter_key.ToString(), std::unique_ptr<IteratorSeekQueryTraceRecord> r(
trace.ts, trace.type); reinterpret_cast<IteratorSeekQueryTraceRecord*>(record.release()));
s = HandleIter(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(),
r->GetTraceType());
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Cannot process the iterator in the trace\n"); fprintf(stderr, "Cannot process the iterator in the trace\n");
return s; return s;
} }
} else if (trace.type == kTraceMultiGet) {
MultiGetPayload multiget_payload;
assert(trace_file_version_ >= 2);
TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
s = HandleMultiGet(multiget_payload, trace.ts);
} else if (trace.type == kTraceEnd) {
break; break;
} }
case kTraceMultiGet: {
s = TracerHelper::DecodeMultiGetRecord(&trace, trace_file_version_,
&record);
if (!s.ok()) {
return s;
}
std::unique_ptr<MultiGetQueryTraceRecord> r(
reinterpret_cast<MultiGetQueryTraceRecord*>(record.release()));
s = HandleMultiGet(r->GetColumnFamilyIDs(), r->GetKeys(),
r->GetTimestamp());
break;
}
default: {
// Skip unsupported types
break;
}
}
} }
if (s.IsIncomplete()) { if (s.IsIncomplete()) {
// Fix it: Reaching eof returns Incomplete status at the moment. // Fix it: Reaching eof returns Incomplete status at the moment.
@ -1547,9 +1556,8 @@ Status TraceAnalyzer::CloseOutputFiles() {
} }
// Handle the Get request in the trace // Handle the Get request in the trace
Status TraceAnalyzer::HandleGet(uint32_t column_family_id, Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key,
const std::string& key, const uint64_t& ts, const uint64_t& ts, const uint32_t& get_ret) {
const uint32_t& get_ret) {
Status s; Status s;
size_t value_size = 0; size_t value_size = 0;
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
@ -1575,8 +1583,8 @@ Status TraceAnalyzer::HandleGet(uint32_t column_family_id,
if (get_ret == 1) { if (get_ret == 1) {
value_size = 10; value_size = 10;
} }
s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, key, s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id,
value_size, ts); key.ToString(), value_size, ts);
if (!s.ok()) { if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics"); return Status::Corruption("Failed to insert key statistics");
} }
@ -1752,9 +1760,8 @@ Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key,
} }
// Handle the Iterator request in the trace // Handle the Iterator request in the trace
Status TraceAnalyzer::HandleIter(uint32_t column_family_id, Status TraceAnalyzer::HandleIter(uint32_t column_family_id, const Slice& key,
const std::string& key, const uint64_t& ts, const uint64_t& ts, TraceType trace_type) {
TraceType& trace_type) {
Status s; Status s;
size_t value_size = 0; size_t value_size = 0;
int type = -1; int type = -1;
@ -1788,7 +1795,7 @@ Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
if (!ta_[type].enabled) { if (!ta_[type].enabled) {
return Status::OK(); return Status::OK();
} }
s = KeyStatsInsertion(type, column_family_id, key, value_size, ts); s = KeyStatsInsertion(type, column_family_id, key.ToString(), value_size, ts);
if (!s.ok()) { if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics"); return Status::Corruption("Failed to insert key statistics");
} }
@ -1796,24 +1803,22 @@ Status TraceAnalyzer::HandleIter(uint32_t column_family_id,
} }
// Handle MultiGet queries in the trace // Handle MultiGet queries in the trace
Status TraceAnalyzer::HandleMultiGet(MultiGetPayload& multiget_payload, Status TraceAnalyzer::HandleMultiGet(
const uint64_t& ts) { const std::vector<uint32_t>& column_family_ids,
const std::vector<Slice>& keys, const uint64_t& ts) {
Status s; Status s;
size_t value_size = 0; size_t value_size = 0;
if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) { if (column_family_ids.size() != keys.size()) {
// The size does not match is not the error of tracing and anayzing, we just // The size does not match is not the error of tracing and anayzing, we just
// report it to the user. The analyzing continues. // report it to the user. The analyzing continues.
printf("The CF ID vector size does not match the keys vector size!\n"); printf("The CF ID vector size does not match the keys vector size!\n");
} }
size_t vector_size = std::min(multiget_payload.cf_ids.size(), size_t vector_size = std::min(column_family_ids.size(), keys.size());
multiget_payload.multiget_keys.size());
if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) {
for (size_t i = 0; i < vector_size; i++) { for (size_t i = 0; i < vector_size; i++) {
assert(i < multiget_payload.cf_ids.size() && assert(i < column_family_ids.size() && i < keys.size());
i < multiget_payload.multiget_keys.size());
s = WriteTraceSequence(TraceOperationType::kMultiGet, s = WriteTraceSequence(TraceOperationType::kMultiGet,
multiget_payload.cf_ids[i], column_family_ids[i], keys[i], value_size, ts);
multiget_payload.multiget_keys[i], value_size, ts);
} }
if (!s.ok()) { if (!s.ok()) {
return Status::Corruption("Failed to write the trace sequence to file"); return Status::Corruption("Failed to write the trace sequence to file");
@ -1833,11 +1838,9 @@ Status TraceAnalyzer::HandleMultiGet(MultiGetPayload& multiget_payload,
return Status::OK(); return Status::OK();
} }
for (size_t i = 0; i < vector_size; i++) { for (size_t i = 0; i < vector_size; i++) {
assert(i < multiget_payload.cf_ids.size() && assert(i < column_family_ids.size() && i < keys.size());
i < multiget_payload.multiget_keys.size()); s = KeyStatsInsertion(TraceOperationType::kMultiGet, column_family_ids[i],
s = KeyStatsInsertion(TraceOperationType::kMultiGet, keys[i].ToString(), value_size, ts);
multiget_payload.cf_ids[i],
multiget_payload.multiget_keys[i], value_size, ts);
} }
if (!s.ok()) { if (!s.ok()) {
return Status::Corruption("Failed to insert key statistics"); return Status::Corruption("Failed to insert key statistics");
@ -2011,10 +2014,11 @@ void TraceAnalyzer::PrintStatistics() {
// Write the trace sequence to file // Write the trace sequence to file
Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type, Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type,
const uint32_t& cf_id, const uint32_t& cf_id,
const std::string& key, const Slice& key,
const size_t value_size, const size_t value_size,
const uint64_t ts) { const uint64_t ts) {
std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key); std::string hex_key =
ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key.ToString());
int ret; int ret;
ret = snprintf(buffer_, sizeof(buffer_), "%u %u %zu %" PRIu64 "\n", type, ret = snprintf(buffer_, sizeof(buffer_), "%u %u %zu %" PRIu64 "\n", type,
cf_id, value_size, ts); cf_id, value_size, ts);

@ -15,6 +15,7 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/trace_reader_writer.h" #include "rocksdb/trace_reader_writer.h"
#include "rocksdb/trace_record.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "trace_replay/trace_replay.h" #include "trace_replay/trace_replay.h"
@ -182,7 +183,7 @@ class TraceAnalyzer {
Status WriteTraceUnit(TraceUnit& unit); Status WriteTraceUnit(TraceUnit& unit);
// The trace processing functions for different type // The trace processing functions for different type
Status HandleGet(uint32_t column_family_id, const std::string& key, Status HandleGet(uint32_t column_family_id, const Slice& key,
const uint64_t& ts, const uint32_t& get_ret); const uint64_t& ts, const uint32_t& get_ret);
Status HandlePut(uint32_t column_family_id, const Slice& key, Status HandlePut(uint32_t column_family_id, const Slice& key,
const Slice& value); const Slice& value);
@ -192,9 +193,10 @@ class TraceAnalyzer {
const Slice& end_key); const Slice& end_key);
Status HandleMerge(uint32_t column_family_id, const Slice& key, Status HandleMerge(uint32_t column_family_id, const Slice& key,
const Slice& value); const Slice& value);
Status HandleIter(uint32_t column_family_id, const std::string& key, Status HandleIter(uint32_t column_family_id, const Slice& key,
const uint64_t& ts, TraceType& trace_type); const uint64_t& ts, TraceType trace_type);
Status HandleMultiGet(MultiGetPayload& multiget_payload, const uint64_t& ts); Status HandleMultiGet(const std::vector<uint32_t>& column_family_ids,
const std::vector<Slice>& keys, const uint64_t& ts);
std::vector<TypeUnit>& GetTaVector() { return ta_; } std::vector<TypeUnit>& GetTaVector() { return ta_; }
private: private:
@ -246,7 +248,7 @@ class TraceAnalyzer {
Status TraceUnitWriter( Status TraceUnitWriter(
std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>& f_ptr, TraceUnit& unit); std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>& f_ptr, TraceUnit& unit);
Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id, Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id,
const std::string& key, const size_t value_size, const Slice& key, const size_t value_size,
const uint64_t ts); const uint64_t ts);
Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats); Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit); Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);

@ -12,6 +12,7 @@
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/trace_record.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/hash.h" #include "util/hash.h"
#include "util/string_util.h" #include "util/string_util.h"

@ -4,9 +4,11 @@
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include "trace_replay/block_cache_tracer.h" #include "trace_replay/block_cache_tracer.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h" #include "rocksdb/trace_reader_writer.h"
#include "rocksdb/trace_record.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"

@ -12,6 +12,7 @@
#include "port/lang.h" #include "port/lang.h"
#include "rocksdb/file_system.h" #include "rocksdb/file_system.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/trace_record.h"
#include "trace_replay/trace_replay.h" #include "trace_replay/trace_replay.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {

@ -8,6 +8,7 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h" #include "rocksdb/trace_reader_writer.h"
#include "rocksdb/trace_record.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"

@ -0,0 +1,163 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/trace_record.h"
#include <utility>
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "trace_replay/trace_record_handler.h"
namespace ROCKSDB_NAMESPACE {
// TraceRecord
TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {}
TraceRecord::~TraceRecord() {}
uint64_t TraceRecord::GetTimestamp() const { return timestamp_; }
TraceRecord::Handler* TraceRecord::NewExecutionHandler(
DB* db, const std::vector<ColumnFamilyHandle*>& handles) {
return new TraceExecutionHandler(db, handles);
}
// QueryTraceRecord
QueryTraceRecord::QueryTraceRecord(uint64_t timestamp)
: TraceRecord(timestamp) {}
QueryTraceRecord::~QueryTraceRecord() {}
// WriteQueryTraceRecord
WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep,
uint64_t timestamp)
: QueryTraceRecord(timestamp), rep_(std::move(write_batch_rep)) {}
WriteQueryTraceRecord::WriteQueryTraceRecord(const std::string& write_batch_rep,
uint64_t timestamp)
: QueryTraceRecord(timestamp) {
rep_.PinSelf(write_batch_rep);
}
WriteQueryTraceRecord::~WriteQueryTraceRecord() {}
Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); }
Status WriteQueryTraceRecord::Accept(Handler* handler) {
assert(handler != nullptr);
return handler->Handle(*this);
}
// GetQueryTraceRecord
GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id,
PinnableSlice&& key,
uint64_t timestamp)
: QueryTraceRecord(timestamp),
cf_id_(column_family_id),
key_(std::move(key)) {}
GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id,
const std::string& key,
uint64_t timestamp)
: QueryTraceRecord(timestamp), cf_id_(column_family_id) {
key_.PinSelf(key);
}
GetQueryTraceRecord::~GetQueryTraceRecord() {}
uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; }
Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); }
Status GetQueryTraceRecord::Accept(Handler* handler) {
assert(handler != nullptr);
return handler->Handle(*this);
}
// IteratorQueryTraceRecord
IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp)
: QueryTraceRecord(timestamp) {}
IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {}
// IteratorSeekQueryTraceRecord
IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key,
uint64_t timestamp)
: IteratorQueryTraceRecord(timestamp),
type_(seek_type),
cf_id_(column_family_id),
key_(std::move(key)) {}
IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord(
SeekType seek_type, uint32_t column_family_id, const std::string& key,
uint64_t timestamp)
: IteratorQueryTraceRecord(timestamp),
type_(seek_type),
cf_id_(column_family_id) {
key_.PinSelf(key);
}
IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() {}
TraceType IteratorSeekQueryTraceRecord::GetTraceType() const {
return static_cast<TraceType>(type_);
}
IteratorSeekQueryTraceRecord::SeekType
IteratorSeekQueryTraceRecord::GetSeekType() const {
return type_;
}
uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const {
return cf_id_;
}
Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); }
Status IteratorSeekQueryTraceRecord::Accept(Handler* handler) {
assert(handler != nullptr);
return handler->Handle(*this);
}
// MultiGetQueryTraceRecord
MultiGetQueryTraceRecord::MultiGetQueryTraceRecord(
std::vector<uint32_t> column_family_ids, std::vector<PinnableSlice>&& keys,
uint64_t timestamp)
: QueryTraceRecord(timestamp),
cf_ids_(column_family_ids),
keys_(std::move(keys)) {}
MultiGetQueryTraceRecord::MultiGetQueryTraceRecord(
std::vector<uint32_t> column_family_ids,
const std::vector<std::string>& keys, uint64_t timestamp)
: QueryTraceRecord(timestamp), cf_ids_(column_family_ids) {
keys_.reserve(keys.size());
for (const std::string& key : keys) {
PinnableSlice ps;
ps.PinSelf(key);
keys_.push_back(std::move(ps));
}
}
MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {}
std::vector<uint32_t> MultiGetQueryTraceRecord::GetColumnFamilyIDs() const {
return cf_ids_;
}
std::vector<Slice> MultiGetQueryTraceRecord::GetKeys() const {
return std::vector<Slice>(keys_.begin(), keys_.end());
}
Status MultiGetQueryTraceRecord::Accept(Handler* handler) {
assert(handler != nullptr);
return handler->Handle(*this);
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,108 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "trace_replay/trace_record_handler.h"
#include "rocksdb/iterator.h"
#include "rocksdb/write_batch.h"
namespace ROCKSDB_NAMESPACE {
// TraceExecutionHandler
TraceExecutionHandler::TraceExecutionHandler(
DB* db, const std::vector<ColumnFamilyHandle*>& handles)
: TraceRecord::Handler(),
db_(db),
write_opts_(WriteOptions()),
read_opts_(ReadOptions()) {
assert(db != nullptr);
assert(!handles.empty());
cf_map_.reserve(handles.size());
for (ColumnFamilyHandle* handle : handles) {
assert(handle != nullptr);
cf_map_.insert({handle->GetID(), handle});
}
}
TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); }
Status TraceExecutionHandler::Handle(const WriteQueryTraceRecord& record) {
WriteBatch batch(record.GetWriteBatchRep().ToString());
return db_->Write(write_opts_, &batch);
}
Status TraceExecutionHandler::Handle(const GetQueryTraceRecord& record) {
auto it = cf_map_.find(record.GetColumnFamilyID());
if (it == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
assert(it->second != nullptr);
std::string value;
Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value);
// Treat not found as ok and return other errors.
return s.IsNotFound() ? Status::OK() : s;
}
Status TraceExecutionHandler::Handle(
const IteratorSeekQueryTraceRecord& record) {
auto it = cf_map_.find(record.GetColumnFamilyID());
if (it == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
assert(it->second != nullptr);
Iterator* single_iter = db_->NewIterator(read_opts_, it->second);
switch (record.GetSeekType()) {
case IteratorSeekQueryTraceRecord::kSeekForPrev: {
single_iter->SeekForPrev(record.GetKey());
break;
}
default: {
single_iter->Seek(record.GetKey());
break;
}
}
Status s = single_iter->status();
delete single_iter;
return s;
}
Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) {
std::vector<ColumnFamilyHandle*> handles;
handles.reserve(record.GetColumnFamilyIDs().size());
for (uint32_t cf_id : record.GetColumnFamilyIDs()) {
auto it = cf_map_.find(cf_id);
if (it == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
assert(it->second != nullptr);
handles.push_back(it->second);
}
std::vector<Slice> keys = record.GetKeys();
if (handles.empty() || keys.empty()) {
return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
}
if (handles.size() != keys.size()) {
return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch.");
}
std::vector<std::string> values;
std::vector<Status> ss = db_->MultiGet(read_opts_, handles, keys, &values);
// Treat not found as ok, return other errors.
for (Status s : ss) {
if (!s.ok() && !s.IsNotFound()) {
return s;
}
}
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,39 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <unordered_map>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE {
// Handler to execute TraceRecord.
class TraceExecutionHandler : public TraceRecord::Handler {
public:
TraceExecutionHandler(DB* db,
const std::vector<ColumnFamilyHandle*>& handles);
virtual ~TraceExecutionHandler() override;
virtual Status Handle(const WriteQueryTraceRecord& record) override;
virtual Status Handle(const GetQueryTraceRecord& record) override;
virtual Status Handle(const IteratorSeekQueryTraceRecord& record) override;
virtual Status Handle(const MultiGetQueryTraceRecord& record) override;
private:
DB* db_;
std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
WriteOptions write_opts_;
ReadOptions read_opts_;
};
// To do: Handler for trace_analyzer.
} // namespace ROCKSDB_NAMESPACE

@ -11,6 +11,7 @@
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
@ -18,7 +19,6 @@
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/threadpool_imp.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -104,6 +104,20 @@ Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
return Status::OK(); return Status::OK();
} }
Status TracerHelper::DecodeHeader(const std::string& encoded_trace,
Trace* header) {
Status s = TracerHelper::DecodeTrace(encoded_trace, header);
if (header->type != kTraceBegin) {
return Status::Corruption("Corrupted trace file. Incorrect header.");
}
if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
return Status::Corruption("Corrupted trace file. Incorrect magic.");
}
return s;
}
bool TracerHelper::SetPayloadMap(uint64_t& payload_map, bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
const TracePayloadType payload_type) { const TracePayloadType payload_type) {
uint64_t old_state = payload_map; uint64_t old_state = payload_map;
@ -112,18 +126,26 @@ bool TracerHelper::SetPayloadMap(uint64_t& payload_map,
return old_state != payload_map; return old_state != payload_map;
} }
void TracerHelper::DecodeWritePayload(Trace* trace, Status TracerHelper::DecodeWriteRecord(Trace* trace, int trace_file_version,
WritePayload* write_payload) { std::unique_ptr<TraceRecord>* record) {
assert(write_payload != nullptr); assert(trace != nullptr);
assert(trace->type == kTraceWrite);
PinnableSlice rep;
if (trace_file_version < 2) {
rep.PinSelf(trace->payload);
} else {
Slice buf(trace->payload); Slice buf(trace->payload);
GetFixed64(&buf, &trace->payload_map); GetFixed64(&buf, &trace->payload_map);
int64_t payload_map = static_cast<int64_t>(trace->payload_map); int64_t payload_map = static_cast<int64_t>(trace->payload_map);
Slice write_batch_data;
while (payload_map) { while (payload_map) {
// Find the rightmost set bit. // Find the rightmost set bit.
uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map)); uint32_t set_pos =
static_cast<uint32_t>(log2(payload_map & -payload_map));
switch (set_pos) { switch (set_pos) {
case TracePayloadType::kWriteBatchData: case TracePayloadType::kWriteBatchData:
GetLengthPrefixedSlice(&buf, &(write_payload->write_batch_data)); GetLengthPrefixedSlice(&buf, &write_batch_data);
break; break;
default: default:
assert(false); assert(false);
@ -131,22 +153,40 @@ void TracerHelper::DecodeWritePayload(Trace* trace,
// unset the rightmost bit. // unset the rightmost bit.
payload_map &= (payload_map - 1); payload_map &= (payload_map - 1);
} }
rep.PinSelf(write_batch_data);
}
if (record != nullptr) {
record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts));
} }
void TracerHelper::DecodeGetPayload(Trace* trace, GetPayload* get_payload) { return Status::OK();
assert(get_payload != nullptr); }
Status TracerHelper::DecodeGetRecord(Trace* trace, int trace_file_version,
std::unique_ptr<TraceRecord>* record) {
assert(trace != nullptr);
assert(trace->type == kTraceGet);
uint32_t cf_id = 0;
Slice get_key;
if (trace_file_version < 2) {
DecodeCFAndKey(trace->payload, &cf_id, &get_key);
} else {
Slice buf(trace->payload); Slice buf(trace->payload);
GetFixed64(&buf, &trace->payload_map); GetFixed64(&buf, &trace->payload_map);
int64_t payload_map = static_cast<int64_t>(trace->payload_map); int64_t payload_map = static_cast<int64_t>(trace->payload_map);
while (payload_map) { while (payload_map) {
// Find the rightmost set bit. // Find the rightmost set bit.
uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map)); uint32_t set_pos =
static_cast<uint32_t>(log2(payload_map & -payload_map));
switch (set_pos) { switch (set_pos) {
case TracePayloadType::kGetCFID: case TracePayloadType::kGetCFID:
GetFixed32(&buf, &(get_payload->cf_id)); GetFixed32(&buf, &cf_id);
break; break;
case TracePayloadType::kGetKey: case TracePayloadType::kGetKey:
GetLengthPrefixedSlice(&buf, &(get_payload->get_key)); GetLengthPrefixedSlice(&buf, &get_key);
break; break;
default: default:
assert(false); assert(false);
@ -156,26 +196,50 @@ void TracerHelper::DecodeGetPayload(Trace* trace, GetPayload* get_payload) {
} }
} }
void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) { if (record != nullptr) {
assert(iter_payload != nullptr); PinnableSlice ps;
ps.PinSelf(get_key);
record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts));
}
return Status::OK();
}
Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version,
std::unique_ptr<TraceRecord>* record) {
assert(trace != nullptr);
assert(trace->type == kTraceIteratorSeek ||
trace->type == kTraceIteratorSeekForPrev);
uint32_t cf_id = 0;
Slice iter_key;
if (trace_file_version < 2) {
DecodeCFAndKey(trace->payload, &cf_id, &iter_key);
} else {
// Are these two used anywhere?
Slice lower_bound;
Slice upper_bound;
Slice buf(trace->payload); Slice buf(trace->payload);
GetFixed64(&buf, &trace->payload_map); GetFixed64(&buf, &trace->payload_map);
int64_t payload_map = static_cast<int64_t>(trace->payload_map); int64_t payload_map = static_cast<int64_t>(trace->payload_map);
while (payload_map) { while (payload_map) {
// Find the rightmost set bit. // Find the rightmost set bit.
uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map)); uint32_t set_pos =
static_cast<uint32_t>(log2(payload_map & -payload_map));
switch (set_pos) { switch (set_pos) {
case TracePayloadType::kIterCFID: case TracePayloadType::kIterCFID:
GetFixed32(&buf, &(iter_payload->cf_id)); GetFixed32(&buf, &cf_id);
break; break;
case TracePayloadType::kIterKey: case TracePayloadType::kIterKey:
GetLengthPrefixedSlice(&buf, &(iter_payload->iter_key)); GetLengthPrefixedSlice(&buf, &iter_key);
break; break;
case TracePayloadType::kIterLowerBound: case TracePayloadType::kIterLowerBound:
GetLengthPrefixedSlice(&buf, &(iter_payload->lower_bound)); GetLengthPrefixedSlice(&buf, &lower_bound);
break; break;
case TracePayloadType::kIterUpperBound: case TracePayloadType::kIterUpperBound:
GetLengthPrefixedSlice(&buf, &(iter_payload->upper_bound)); GetLengthPrefixedSlice(&buf, &upper_bound);
break; break;
default: default:
assert(false); assert(false);
@ -185,9 +249,30 @@ void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) {
} }
} }
void TracerHelper::DecodeMultiGetPayload(Trace* trace, if (record != nullptr) {
MultiGetPayload* multiget_payload) { PinnableSlice ps_key;
assert(multiget_payload != nullptr); ps_key.PinSelf(iter_key);
record->reset(new IteratorSeekQueryTraceRecord(
static_cast<IteratorSeekQueryTraceRecord::SeekType>(trace->type), cf_id,
std::move(ps_key), trace->ts));
}
return Status::OK();
}
Status TracerHelper::DecodeMultiGetRecord(
Trace* trace, int trace_file_version,
std::unique_ptr<TraceRecord>* record) {
assert(trace != nullptr);
assert(trace->type == kTraceMultiGet);
if (trace_file_version < 2) {
return Status::Corruption("MultiGet is not supported.");
}
uint32_t multiget_size = 0;
std::vector<uint32_t> cf_ids;
std::vector<PinnableSlice> multiget_keys;
Slice cfids_payload; Slice cfids_payload;
Slice keys_payload; Slice keys_payload;
Slice buf(trace->payload); Slice buf(trace->payload);
@ -198,7 +283,7 @@ void TracerHelper::DecodeMultiGetPayload(Trace* trace,
uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map)); uint32_t set_pos = static_cast<uint32_t>(log2(payload_map & -payload_map));
switch (set_pos) { switch (set_pos) {
case TracePayloadType::kMultiGetSize: case TracePayloadType::kMultiGetSize:
GetFixed32(&buf, &(multiget_payload->multiget_size)); GetFixed32(&buf, &multiget_size);
break; break;
case TracePayloadType::kMultiGetCFIDs: case TracePayloadType::kMultiGetCFIDs:
GetLengthPrefixedSlice(&buf, &cfids_payload); GetLengthPrefixedSlice(&buf, &cfids_payload);
@ -212,18 +297,31 @@ void TracerHelper::DecodeMultiGetPayload(Trace* trace,
// unset the rightmost bit. // unset the rightmost bit.
payload_map &= (payload_map - 1); payload_map &= (payload_map - 1);
} }
if (multiget_size == 0) {
return Status::InvalidArgument("Empty MultiGet cf_ids or keys.");
}
// Decode the cfids_payload and keys_payload // Decode the cfids_payload and keys_payload
multiget_payload->cf_ids.reserve(multiget_payload->multiget_size); cf_ids.reserve(multiget_size);
multiget_payload->multiget_keys.reserve(multiget_payload->multiget_size); multiget_keys.reserve(multiget_size);
for (uint32_t i = 0; i < multiget_payload->multiget_size; i++) { for (uint32_t i = 0; i < multiget_size; i++) {
uint32_t tmp_cfid; uint32_t tmp_cfid;
Slice tmp_key; Slice tmp_key;
GetFixed32(&cfids_payload, &tmp_cfid); GetFixed32(&cfids_payload, &tmp_cfid);
GetLengthPrefixedSlice(&keys_payload, &tmp_key); GetLengthPrefixedSlice(&keys_payload, &tmp_key);
multiget_payload->cf_ids.push_back(tmp_cfid); cf_ids.push_back(tmp_cfid);
multiget_payload->multiget_keys.push_back(tmp_key.ToString()); Slice s(tmp_key);
PinnableSlice ps;
ps.PinSelf(s);
multiget_keys.push_back(std::move(ps));
} }
if (record != nullptr) {
record->reset(new MultiGetQueryTraceRecord(
std::move(cf_ids), std::move(multiget_keys), trace->ts));
}
return Status::OK();
} }
Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options, Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
@ -418,10 +516,9 @@ bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
if (IsTraceFileOverMax()) { if (IsTraceFileOverMax()) {
return true; return true;
} }
if ((trace_options_.filter & kTraceFilterGet if ((trace_options_.filter & kTraceFilterGet && trace_type == kTraceGet) ||
&& trace_type == kTraceGet) (trace_options_.filter & kTraceFilterWrite &&
|| (trace_options_.filter & kTraceFilterWrite trace_type == kTraceWrite)) {
&& trace_type == kTraceWrite)) {
return true; return true;
} }
++trace_request_count_; ++trace_request_count_;
@ -471,445 +568,4 @@ Status Tracer::WriteTrace(const Trace& trace) {
Status Tracer::Close() { return WriteFooter(); } Status Tracer::Close() { return WriteFooter(); }
Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
std::unique_ptr<TraceReader>&& reader)
: trace_reader_(std::move(reader)) {
assert(db != nullptr);
db_ = static_cast<DBImpl*>(db->GetRootDB());
env_ = Env::Default();
for (ColumnFamilyHandle* cfh : handles) {
cf_map_[cfh->GetID()] = cfh;
}
fast_forward_ = 1;
}
Replayer::~Replayer() { trace_reader_.reset(); }
Status Replayer::SetFastForward(uint32_t fast_forward) {
Status s;
if (fast_forward < 1) {
s = Status::InvalidArgument("Wrong fast forward speed!");
} else {
fast_forward_ = fast_forward;
s = Status::OK();
}
return s;
}
Status Replayer::Replay() {
Status s;
Trace header;
int db_version;
s = ReadHeader(&header);
if (!s.ok()) {
return s;
}
s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
if (!s.ok()) {
return s;
}
std::chrono::system_clock::time_point replay_epoch =
std::chrono::system_clock::now();
WriteOptions woptions;
ReadOptions roptions;
Trace trace;
uint64_t ops = 0;
Iterator* single_iter = nullptr;
while (s.ok()) {
trace.reset();
s = ReadTrace(&trace);
if (!s.ok()) {
break;
}
std::this_thread::sleep_until(
replay_epoch +
std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
if (trace.type == kTraceWrite) {
if (trace_file_version_ < 2) {
WriteBatch batch(trace.payload);
db_->Write(woptions, &batch);
} else {
WritePayload w_payload;
TracerHelper::DecodeWritePayload(&trace, &w_payload);
WriteBatch batch(w_payload.write_batch_data.ToString());
db_->Write(woptions, &batch);
}
ops++;
} else if (trace.type == kTraceGet) {
GetPayload get_payload;
get_payload.cf_id = 0;
get_payload.get_key = 0;
if (trace_file_version_ < 2) {
DecodeCFAndKey(trace.payload, &get_payload.cf_id, &get_payload.get_key);
} else {
TracerHelper::DecodeGetPayload(&trace, &get_payload);
}
if (get_payload.cf_id > 0 &&
cf_map_.find(get_payload.cf_id) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
std::string value;
if (get_payload.cf_id == 0) {
db_->Get(roptions, get_payload.get_key, &value);
} else {
db_->Get(roptions, cf_map_[get_payload.cf_id], get_payload.get_key,
&value);
}
ops++;
} else if (trace.type == kTraceIteratorSeek) {
// Currently, we only support to call Seek. The Next() and Prev() is not
// supported.
IterPayload iter_payload;
iter_payload.cf_id = 0;
if (trace_file_version_ < 2) {
DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
&iter_payload.iter_key);
} else {
TracerHelper::DecodeIterPayload(&trace, &iter_payload);
}
if (iter_payload.cf_id > 0 &&
cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
if (iter_payload.cf_id == 0) {
single_iter = db_->NewIterator(roptions);
} else {
single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
}
single_iter->Seek(iter_payload.iter_key);
ops++;
delete single_iter;
} else if (trace.type == kTraceIteratorSeekForPrev) {
// Currently, we only support to call SeekForPrev. The Next() and Prev()
// is not supported.
IterPayload iter_payload;
iter_payload.cf_id = 0;
if (trace_file_version_ < 2) {
DecodeCFAndKey(trace.payload, &iter_payload.cf_id,
&iter_payload.iter_key);
} else {
TracerHelper::DecodeIterPayload(&trace, &iter_payload);
}
if (iter_payload.cf_id > 0 &&
cf_map_.find(iter_payload.cf_id) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
if (iter_payload.cf_id == 0) {
single_iter = db_->NewIterator(roptions);
} else {
single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]);
}
single_iter->SeekForPrev(iter_payload.iter_key);
ops++;
delete single_iter;
} else if (trace.type == kTraceMultiGet) {
MultiGetPayload multiget_payload;
assert(trace_file_version_ >= 2);
TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
std::vector<ColumnFamilyHandle*> v_cfd;
std::vector<Slice> keys;
assert(multiget_payload.cf_ids.size() ==
multiget_payload.multiget_keys.size());
for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
assert(i < multiget_payload.cf_ids.size() &&
i < multiget_payload.multiget_keys.size());
if (cf_map_.find(multiget_payload.cf_ids[i]) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
v_cfd.push_back(cf_map_[multiget_payload.cf_ids[i]]);
keys.push_back(Slice(multiget_payload.multiget_keys[i]));
}
std::vector<std::string> values;
std::vector<Status> ss = db_->MultiGet(roptions, v_cfd, keys, &values);
} else if (trace.type == kTraceEnd) {
// Do nothing for now.
// TODO: Add some validations later.
break;
}
}
if (s.IsIncomplete()) {
// Reaching eof returns Incomplete status at the moment.
// Could happen when killing a process without calling EndTrace() API.
// TODO: Add better error handling.
return Status::OK();
}
return s;
}
// The trace can be replayed with multithread by configurnge the number of
// threads in the thread pool. Trace records are read from the trace file
// sequentially and the corresponding queries are scheduled in the task
// queue based on the timestamp. Currently, we support Write_batch (Put,
// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
Status Replayer::MultiThreadReplay(uint32_t threads_num) {
Status s;
Trace header;
int db_version;
s = ReadHeader(&header);
if (!s.ok()) {
return s;
}
s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
if (!s.ok()) {
return s;
}
ThreadPoolImpl thread_pool;
thread_pool.SetHostEnv(env_);
if (threads_num > 1) {
thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
} else {
thread_pool.SetBackgroundThreads(1);
}
std::chrono::system_clock::time_point replay_epoch =
std::chrono::system_clock::now();
WriteOptions woptions;
ReadOptions roptions;
uint64_t ops = 0;
while (s.ok()) {
std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
ra->db = db_;
s = ReadTrace(&(ra->trace_entry));
if (!s.ok()) {
break;
}
ra->cf_map = &cf_map_;
ra->woptions = woptions;
ra->roptions = roptions;
ra->trace_file_version = trace_file_version_;
std::this_thread::sleep_until(
replay_epoch + std::chrono::microseconds(
(ra->trace_entry.ts - header.ts) / fast_forward_));
if (ra->trace_entry.type == kTraceWrite) {
thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
nullptr);
ops++;
} else if (ra->trace_entry.type == kTraceGet) {
thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
nullptr);
ops++;
} else if (ra->trace_entry.type == kTraceIteratorSeek) {
thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
nullptr);
ops++;
} else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
nullptr, nullptr);
ops++;
} else if (ra->trace_entry.type == kTraceMultiGet) {
thread_pool.Schedule(&Replayer::BGWorkMultiGet, ra.release(), nullptr,
nullptr);
ops++;
} else if (ra->trace_entry.type == kTraceEnd) {
// Do nothing for now.
// TODO: Add some validations later.
break;
} else {
// Other trace entry types that are not implemented for replay.
// To finish the replay, we continue the process.
continue;
}
}
if (s.IsIncomplete()) {
// Reaching eof returns Incomplete status at the moment.
// Could happen when killing a process without calling EndTrace() API.
// TODO: Add better error handling.
s = Status::OK();
}
thread_pool.JoinAllThreads();
return s;
}
Status Replayer::ReadHeader(Trace* header) {
assert(header != nullptr);
std::string encoded_trace;
// Read the trace head
Status s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
s = TracerHelper::DecodeTrace(encoded_trace, header);
if (header->type != kTraceBegin) {
return Status::Corruption("Corrupted trace file. Incorrect header.");
}
if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
return Status::Corruption("Corrupted trace file. Incorrect magic.");
}
return s;
}
Status Replayer::ReadFooter(Trace* footer) {
assert(footer != nullptr);
Status s = ReadTrace(footer);
if (!s.ok()) {
return s;
}
if (footer->type != kTraceEnd) {
return Status::Corruption("Corrupted trace file. Incorrect footer.");
}
// TODO: Add more validations later
return s;
}
Status Replayer::ReadTrace(Trace* trace) {
assert(trace != nullptr);
std::string encoded_trace;
Status s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
return TracerHelper::DecodeTrace(encoded_trace, trace);
}
void Replayer::BGWorkGet(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
GetPayload get_payload;
get_payload.cf_id = 0;
if (ra->trace_file_version < 2) {
DecodeCFAndKey(ra->trace_entry.payload, &get_payload.cf_id,
&get_payload.get_key);
} else {
TracerHelper::DecodeGetPayload(&(ra->trace_entry), &get_payload);
}
if (get_payload.cf_id > 0 &&
cf_map->find(get_payload.cf_id) == cf_map->end()) {
return;
}
std::string value;
if (get_payload.cf_id == 0) {
ra->db->Get(ra->roptions, get_payload.get_key, &value);
} else {
ra->db->Get(ra->roptions, (*cf_map)[get_payload.cf_id], get_payload.get_key,
&value);
}
return;
}
void Replayer::BGWorkWriteBatch(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
if (ra->trace_file_version < 2) {
WriteBatch batch(ra->trace_entry.payload);
ra->db->Write(ra->woptions, &batch);
} else {
WritePayload w_payload;
TracerHelper::DecodeWritePayload(&(ra->trace_entry), &w_payload);
WriteBatch batch(w_payload.write_batch_data.ToString());
ra->db->Write(ra->woptions, &batch);
}
return;
}
void Replayer::BGWorkIterSeek(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
IterPayload iter_payload;
iter_payload.cf_id = 0;
if (ra->trace_file_version < 2) {
DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
&iter_payload.iter_key);
} else {
TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
}
if (iter_payload.cf_id > 0 &&
cf_map->find(iter_payload.cf_id) == cf_map->end()) {
return;
}
Iterator* single_iter = nullptr;
if (iter_payload.cf_id == 0) {
single_iter = ra->db->NewIterator(ra->roptions);
} else {
single_iter =
ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
}
single_iter->Seek(iter_payload.iter_key);
delete single_iter;
return;
}
void Replayer::BGWorkIterSeekForPrev(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
IterPayload iter_payload;
iter_payload.cf_id = 0;
if (ra->trace_file_version < 2) {
DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id,
&iter_payload.iter_key);
} else {
TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload);
}
if (iter_payload.cf_id > 0 &&
cf_map->find(iter_payload.cf_id) == cf_map->end()) {
return;
}
Iterator* single_iter = nullptr;
if (iter_payload.cf_id == 0) {
single_iter = ra->db->NewIterator(ra->roptions);
} else {
single_iter =
ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]);
}
single_iter->SeekForPrev(iter_payload.iter_key);
delete single_iter;
return;
}
void Replayer::BGWorkMultiGet(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
MultiGetPayload multiget_payload;
if (ra->trace_file_version < 2) {
return;
}
TracerHelper::DecodeMultiGetPayload(&(ra->trace_entry), &multiget_payload);
std::vector<ColumnFamilyHandle*> v_cfd;
std::vector<Slice> keys;
if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) {
return;
}
for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
if (cf_map->find(multiget_payload.cf_ids[i]) == cf_map->end()) {
return;
}
v_cfd.push_back((*cf_map)[multiget_payload.cf_ids[i]]);
keys.push_back(Slice(multiget_payload.multiget_keys[i]));
}
std::vector<std::string> values;
std::vector<Status> ss = ra->db->MultiGet(ra->roptions, v_cfd, keys, &values);
return;
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -5,13 +5,17 @@
#pragma once #pragma once
#include <atomic>
#include <memory> #include <memory>
#include <mutex>
#include <unordered_map> #include <unordered_map>
#include <utility> #include <utility>
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/rocksdb_namespace.h" #include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/trace_record.h"
#include "rocksdb/utilities/replayer.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -43,31 +47,6 @@ const unsigned int kTraceMetadataSize =
static const int kTraceFileMajorVersion = 0; static const int kTraceFileMajorVersion = 0;
static const int kTraceFileMinorVersion = 2; static const int kTraceFileMinorVersion = 2;
// Supported Trace types.
enum TraceType : char {
kTraceBegin = 1,
kTraceEnd = 2,
kTraceWrite = 3,
kTraceGet = 4,
kTraceIteratorSeek = 5,
kTraceIteratorSeekForPrev = 6,
// Block cache related types.
kBlockTraceIndexBlock = 7,
kBlockTraceFilterBlock = 8,
kBlockTraceDataBlock = 9,
kBlockTraceUncompressionDictBlock = 10,
kBlockTraceRangeDeletionBlock = 11,
// For IOTracing.
kIOTracer = 12,
// For query tracing
kTraceMultiGet = 13,
// All trace types should be added before kTraceMax
kTraceMax,
};
// TODO: This should also be made part of public interface to help users build
// custom TracerReaders and TraceWriters.
//
// The data structure that defines a single trace. // The data structure that defines a single trace.
struct Trace { struct Trace {
uint64_t ts; // timestamp uint64_t ts; // timestamp
@ -105,28 +84,6 @@ enum TracePayloadType : char {
kMultiGetKeys = 10, kMultiGetKeys = 10,
}; };
struct WritePayload {
Slice write_batch_data;
};
struct GetPayload {
uint32_t cf_id = 0;
Slice get_key;
};
struct IterPayload {
uint32_t cf_id = 0;
Slice iter_key;
Slice lower_bound;
Slice upper_bound;
};
struct MultiGetPayload {
uint32_t multiget_size;
std::vector<uint32_t> cf_ids;
std::vector<std::string> multiget_keys;
};
class TracerHelper { class TracerHelper {
public: public:
// Parse the string with major and minor version only // Parse the string with major and minor version only
@ -142,22 +99,28 @@ class TracerHelper {
// Decode a string into the given trace object. // Decode a string into the given trace object.
static Status DecodeTrace(const std::string& encoded_trace, Trace* trace); static Status DecodeTrace(const std::string& encoded_trace, Trace* trace);
// Decode a string into the given trace header.
static Status DecodeHeader(const std::string& encoded_trace, Trace* header);
// Set the payload map based on the payload type // Set the payload map based on the payload type
static bool SetPayloadMap(uint64_t& payload_map, static bool SetPayloadMap(uint64_t& payload_map,
const TracePayloadType payload_type); const TracePayloadType payload_type);
// Decode the write payload and store in WrteiPayload // Decode the write payload and store in WrteiPayload
static void DecodeWritePayload(Trace* trace, WritePayload* write_payload); static Status DecodeWriteRecord(Trace* trace, int trace_file_version,
std::unique_ptr<TraceRecord>* record);
// Decode the get payload and store in WrteiPayload // Decode the get payload and store in WrteiPayload
static void DecodeGetPayload(Trace* trace, GetPayload* get_payload); static Status DecodeGetRecord(Trace* trace, int trace_file_version,
std::unique_ptr<TraceRecord>* record);
// Decode the iter payload and store in WrteiPayload // Decode the iter payload and store in WrteiPayload
static void DecodeIterPayload(Trace* trace, IterPayload* iter_payload); static Status DecodeIterRecord(Trace* trace, int trace_file_version,
std::unique_ptr<TraceRecord>* record);
// Decode the multiget payload and store in MultiGetPayload // Decode the multiget payload and store in MultiGetPayload
static void DecodeMultiGetPayload(Trace* trace, static Status DecodeMultiGetRecord(Trace* trace, int trace_file_version,
MultiGetPayload* multiget_payload); std::unique_ptr<TraceRecord>* record);
}; };
// Tracer captures all RocksDB operations using a user-provided TraceWriter. // Tracer captures all RocksDB operations using a user-provided TraceWriter.
@ -222,75 +185,4 @@ class Tracer {
uint64_t trace_request_count_; uint64_t trace_request_count_;
}; };
// Replayer helps to replay the captured RocksDB operations, using a user
// provided TraceReader.
// The Replayer is instantiated via db_bench today, on using "replay" benchmark.
class Replayer {
public:
Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
std::unique_ptr<TraceReader>&& reader);
~Replayer();
// Replay all the traces from the provided trace stream, taking the delay
// between the traces into consideration.
Status Replay();
// Replay the provide trace stream, which is the same as Replay(), with
// multi-threads. Queries are scheduled in the thread pool job queue.
// User can set the number of threads in the thread pool.
Status MultiThreadReplay(uint32_t threads_num);
// Enables fast forwarding a replay by reducing the delay between the ingested
// traces.
// fast_forward : Rate of replay speedup.
// If 1, replay the operations at the same rate as in the trace stream.
// If > 1, speed up the replay by this amount.
Status SetFastForward(uint32_t fast_forward);
private:
Status ReadHeader(Trace* header);
Status ReadFooter(Trace* footer);
Status ReadTrace(Trace* trace);
// The background function for MultiThreadReplay to execute Get query
// based on the trace records.
static void BGWorkGet(void* arg);
// The background function for MultiThreadReplay to execute WriteBatch
// (Put, Delete, SingleDelete, DeleteRange) based on the trace records.
static void BGWorkWriteBatch(void* arg);
// The background function for MultiThreadReplay to execute Iterator (Seek)
// based on the trace records.
static void BGWorkIterSeek(void* arg);
// The background function for MultiThreadReplay to execute Iterator
// (SeekForPrev) based on the trace records.
static void BGWorkIterSeekForPrev(void* arg);
// The background function for MultiThreadReplay to execute MultiGet based on
// the trace records
static void BGWorkMultiGet(void* arg);
DBImpl* db_;
Env* env_;
std::unique_ptr<TraceReader> trace_reader_;
std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
uint32_t fast_forward_;
// When reading the trace header, the trace file version can be parsed.
// Replayer will use different decode method to get the trace content based
// on different trace file version.
int trace_file_version_;
};
// The passin arg of MultiThreadRepkay for each trace record.
struct ReplayerWorkerArg {
DB* db;
Trace trace_entry;
std::unordered_map<uint32_t, ColumnFamilyHandle*>* cf_map;
WriteOptions woptions;
ReadOptions roptions;
int trace_file_version;
};
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -4,8 +4,11 @@
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include "utilities/simulator_cache/cache_simulator.h" #include "utilities/simulator_cache/cache_simulator.h"
#include <algorithm> #include <algorithm>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "rocksdb/trace_record.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {

@ -6,7 +6,9 @@
#include "utilities/simulator_cache/cache_simulator.h" #include "utilities/simulator_cache/cache_simulator.h"
#include <cstdlib> #include <cstdlib>
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/trace_record.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"

@ -31,6 +31,14 @@ Status FileTraceReader::Close() {
return Status::OK(); return Status::OK();
} }
Status FileTraceReader::Reset() {
if (file_reader_ == nullptr) {
return Status::IOError("TraceReader is closed.");
}
offset_ = 0;
return Status::OK();
}
Status FileTraceReader::Read(std::string* data) { Status FileTraceReader::Read(std::string* data) {
assert(file_reader_ != nullptr); assert(file_reader_ != nullptr);
Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize, Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize,

@ -20,6 +20,7 @@ class FileTraceReader : public TraceReader {
virtual Status Read(std::string* data) override; virtual Status Read(std::string* data) override;
virtual Status Close() override; virtual Status Close() override;
virtual Status Reset() override;
private: private:
std::unique_ptr<RandomAccessFileReader> file_reader_; std::unique_ptr<RandomAccessFileReader> file_reader_;

@ -0,0 +1,305 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/trace/replayer_impl.h"
#include <cmath>
#include <thread>
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/trace_reader_writer.h"
#include "util/threadpool_imp.h"
namespace ROCKSDB_NAMESPACE {
ReplayerImpl::ReplayerImpl(DB* db,
const std::vector<ColumnFamilyHandle*>& handles,
std::unique_ptr<TraceReader>&& reader)
: Replayer(),
env_(db->GetEnv()),
trace_reader_(std::move(reader)),
prepared_(false),
trace_end_(false),
header_ts_(0),
exec_handler_(TraceRecord::NewExecutionHandler(db, handles)) {}
ReplayerImpl::~ReplayerImpl() {
exec_handler_.reset();
trace_reader_.reset();
}
Status ReplayerImpl::Prepare() {
Trace header;
int db_version;
Status s = ReadHeader(&header);
if (!s.ok()) {
return s;
}
s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version);
if (!s.ok()) {
return s;
}
header_ts_ = header.ts;
prepared_ = true;
trace_end_ = false;
return Status::OK();
}
Status ReplayerImpl::Next(std::unique_ptr<TraceRecord>* record) {
if (!prepared_) {
return Status::Incomplete("Not prepared!");
}
if (trace_end_) {
return Status::Incomplete("Trace end.");
}
Trace trace;
Status s = ReadTrace(&trace); // ReadTrace is atomic
// Reached the trace end.
if (s.ok() && trace.type == kTraceEnd) {
trace_end_ = true;
return Status::Incomplete("Trace end.");
}
if (!s.ok() || record == nullptr) {
return s;
}
return DecodeTraceRecord(&trace, trace_file_version_, record);
}
Status ReplayerImpl::Execute(const std::unique_ptr<TraceRecord>& record) {
return record->Accept(exec_handler_.get());
}
Status ReplayerImpl::Execute(std::unique_ptr<TraceRecord>&& record) {
Status s = record->Accept(exec_handler_.get());
record.reset();
return s;
}
Status ReplayerImpl::Replay(const ReplayOptions& options) {
if (options.fast_forward <= 0.0) {
return Status::InvalidArgument("Wrong fast forward speed!");
}
if (!prepared_) {
return Status::Incomplete("Not prepared!");
}
if (trace_end_) {
return Status::Incomplete("Trace end.");
}
Status s = Status::OK();
if (options.num_threads <= 1) {
// num_threads == 0 or num_threads == 1 uses single thread.
std::chrono::system_clock::time_point replay_epoch =
std::chrono::system_clock::now();
while (s.ok()) {
Trace trace;
s = ReadTrace(&trace);
// If already at trace end, ReadTrace should return Status::Incomplete().
if (!s.ok()) {
break;
}
// No need to sleep before breaking the loop if at the trace end.
if (trace.type == kTraceEnd) {
trace_end_ = true;
s = Status::Incomplete("Trace end.");
break;
}
// In single-threaded replay, decode first then sleep.
std::unique_ptr<TraceRecord> record;
s = DecodeTraceRecord(&trace, trace_file_version_, &record);
// Skip unsupported traces, stop for other errors.
if (s.IsNotSupported()) {
continue;
} else if (!s.ok()) {
break;
}
std::this_thread::sleep_until(
replay_epoch +
std::chrono::microseconds(static_cast<uint64_t>(std::llround(
1.0 * (trace.ts - header_ts_) / options.fast_forward))));
s = Execute(std::move(record));
}
} else {
// Multi-threaded replay.
ThreadPoolImpl thread_pool;
thread_pool.SetHostEnv(env_);
thread_pool.SetBackgroundThreads(static_cast<int>(options.num_threads));
std::mutex mtx;
// Background decoding and execution status.
Status bg_s = Status::OK();
uint64_t last_err_ts = static_cast<uint64_t>(-1);
// Callback function used in background work to update bg_s at the first
// execution error (with the smallest Trace timestamp).
auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) {
std::lock_guard<std::mutex> gd(mtx);
// Only record the first error.
if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) {
bg_s = err;
last_err_ts = err_ts;
}
};
std::chrono::system_clock::time_point replay_epoch =
std::chrono::system_clock::now();
while (bg_s.ok() && s.ok()) {
Trace trace;
s = ReadTrace(&trace);
// If already at trace end, ReadTrace should return Status::Incomplete().
if (!s.ok()) {
break;
}
TraceType trace_type = trace.type;
// No need to sleep before breaking the loop if at the trace end.
if (trace_type == kTraceEnd) {
trace_end_ = true;
s = Status::Incomplete("Trace end.");
break;
}
// In multi-threaded replay, sleep first thatn start decoding and
// execution in a thread.
std::this_thread::sleep_until(
replay_epoch +
std::chrono::microseconds(static_cast<uint64_t>(std::llround(
1.0 * (trace.ts - header_ts_) / options.fast_forward))));
if (trace_type == kTraceWrite || trace_type == kTraceGet ||
trace_type == kTraceIteratorSeek ||
trace_type == kTraceIteratorSeekForPrev ||
trace_type == kTraceMultiGet) {
std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
ra->trace_entry = std::move(trace);
ra->handler = exec_handler_.get();
ra->trace_file_version = trace_file_version_;
ra->error_cb = error_cb;
thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(),
nullptr, nullptr);
}
// Skip unsupported traces.
}
thread_pool.WaitForJobsAndJoinAllThreads();
if (!bg_s.ok()) {
s = bg_s;
}
}
if (s.IsIncomplete()) {
// Reaching eof returns Incomplete status at the moment.
// Could happen when killing a process without calling EndTrace() API.
// TODO: Add better error handling.
trace_end_ = true;
return Status::OK();
}
return s;
}
uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; }
Status ReplayerImpl::ReadHeader(Trace* header) {
assert(header != nullptr);
Status s = trace_reader_->Reset();
if (!s.ok()) {
return s;
}
std::string encoded_trace;
// Read the trace head
s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
return TracerHelper::DecodeHeader(encoded_trace, header);
}
Status ReplayerImpl::ReadFooter(Trace* footer) {
assert(footer != nullptr);
Status s = ReadTrace(footer);
if (!s.ok()) {
return s;
}
if (footer->type != kTraceEnd) {
return Status::Corruption("Corrupted trace file. Incorrect footer.");
}
// TODO: Add more validations later
return s;
}
Status ReplayerImpl::ReadTrace(Trace* trace) {
assert(trace != nullptr);
std::string encoded_trace;
// We don't know if TraceReader is implemented thread-safe, so we protect the
// reading trace part with a mutex. The decoding part does not need to be
// protected since it's local.
{
std::lock_guard<std::mutex> guard(mutex_);
Status s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
}
return TracerHelper::DecodeTrace(encoded_trace, trace);
}
Status ReplayerImpl::DecodeTraceRecord(Trace* trace, int trace_file_version,
std::unique_ptr<TraceRecord>* record) {
switch (trace->type) {
case kTraceWrite:
return TracerHelper::DecodeWriteRecord(trace, trace_file_version, record);
case kTraceGet:
return TracerHelper::DecodeGetRecord(trace, trace_file_version, record);
case kTraceIteratorSeek:
case kTraceIteratorSeekForPrev:
return TracerHelper::DecodeIterRecord(trace, trace_file_version, record);
case kTraceMultiGet:
return TracerHelper::DecodeMultiGetRecord(trace, trace_file_version,
record);
case kTraceEnd:
return Status::Incomplete("Trace end.");
default:
return Status::NotSupported("Unsupported trace type.");
}
}
void ReplayerImpl::BackgroundWork(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
std::unique_ptr<TraceRecord> record;
Status s =
DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record);
if (s.ok()) {
s = record->Accept(ra->handler);
record.reset();
}
if (!s.ok() && ra->error_cb) {
ra->error_cb(s, ra->trace_entry.ts);
}
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -0,0 +1,90 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/trace_record.h"
#include "rocksdb/utilities/replayer.h"
#include "trace_replay/trace_replay.h"
namespace ROCKSDB_NAMESPACE {
class ColumnFamilyHandle;
class DB;
class Env;
class TraceReader;
class TraceRecord;
class Status;
struct ReplayOptions;
class ReplayerImpl : public Replayer {
public:
ReplayerImpl(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
std::unique_ptr<TraceReader>&& reader);
~ReplayerImpl() override;
using Replayer::Prepare;
Status Prepare() override;
using Replayer::Next;
Status Next(std::unique_ptr<TraceRecord>* record) override;
using Replayer::Execute;
Status Execute(const std::unique_ptr<TraceRecord>& record) override;
Status Execute(std::unique_ptr<TraceRecord>&& record) override;
using Replayer::Replay;
Status Replay(const ReplayOptions& options) override;
using Replayer::GetHeaderTimestamp;
uint64_t GetHeaderTimestamp() const override;
private:
Status ReadHeader(Trace* header);
Status ReadFooter(Trace* footer);
Status ReadTrace(Trace* trace);
// Generic function to convert a Trace to TraceRecord.
static Status DecodeTraceRecord(Trace* trace, int trace_file_version,
std::unique_ptr<TraceRecord>* record);
// Generic function to execute a Trace in a thread pool.
static void BackgroundWork(void* arg);
Env* env_;
std::unique_ptr<TraceReader> trace_reader_;
// When reading the trace header, the trace file version can be parsed.
// Replayer will use different decode method to get the trace content based
// on different trace file version.
int trace_file_version_;
std::mutex mutex_;
std::atomic<bool> prepared_;
std::atomic<bool> trace_end_;
uint64_t header_ts_;
std::unique_ptr<TraceRecord::Handler> exec_handler_;
};
// The passin arg of MultiThreadRepkay for each trace record.
struct ReplayerWorkerArg {
Trace trace_entry;
int trace_file_version;
// Handler to execute TraceRecord.
TraceRecord::Handler* handler;
// Callback function to report the error status and the timestamp of the
// TraceRecord.
std::function<void(Status, uint64_t)> error_cb;
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
Loading…
Cancel
Save