Trace and Replay for RocksDB (#3837)

Summary:
A framework for tracing and replaying RocksDB operations.

A binary trace file is created by capturing the DB operations, and it can be replayed back at the same rate using db_bench.

- Column-families are supported
- Multi-threaded tracing is supported.
- TraceReader and TraceWriter are exposed to the user, so that tracing to various destinations can be enabled (say, to other messaging/logging services). By default, a FileTraceReader and FileTraceWriter are implemented to capture to a file and replay from it.
- This is not yet ideal to be enabled in production due to large performance overhead, but it can be safely tried out in a shadow setup, say, for analyzing RocksDB operations.

Currently supported DB operations:
- Writes:
-- Put
-- Merge
-- Delete
-- SingleDelete
-- DeleteRange
-- Write
- Reads:
-- Get (point lookups)
Pull Request resolved: https://github.com/facebook/rocksdb/pull/3837

Differential Revision: D7974837

Pulled By: sagar0

fbshipit-source-id: 8ec65aaf336504bc1f6ed0feae67f6ed5ef97a72
main
Sagar Vemuri 6 years ago committed by Facebook Github Bot
parent ee7617167f
commit 12b6cdeed3
  1. 2
      CMakeLists.txt
  2. 2
      TARGETS
  3. 23
      db/db_impl.cc
  4. 12
      db/db_impl.h
  5. 6
      db/db_impl_readonly.cc
  6. 6
      db/db_impl_write.cc
  7. 96
      db/db_test2.cc
  8. 10
      include/rocksdb/db.h
  9. 2
      include/rocksdb/options.h
  10. 47
      include/rocksdb/trace_reader_writer.h
  11. 2
      src.mk
  12. 81
      tools/db_bench_tool.cc
  13. 205
      util/trace_replay.cc
  14. 91
      util/trace_replay.h
  15. 117
      utilities/trace/file_trace_reader_writer.cc
  16. 47
      utilities/trace/file_trace_reader_writer.h

@ -603,6 +603,7 @@ set(SOURCES
util/testutil.cc util/testutil.cc
util/thread_local.cc util/thread_local.cc
util/threadpool_imp.cc util/threadpool_imp.cc
util/trace_replay.cc
util/transaction_test_util.cc util/transaction_test_util.cc
util/xxhash.cc util/xxhash.cc
utilities/backupable/backupable_db.cc utilities/backupable/backupable_db.cc
@ -651,6 +652,7 @@ set(SOURCES
utilities/simulator_cache/sim_cache.cc utilities/simulator_cache/sim_cache.cc
utilities/spatialdb/spatial_db.cc utilities/spatialdb/spatial_db.cc
utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc
utilities/trace/file_trace_reader_writer.cc
utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/optimistic_transaction_db_impl.cc
utilities/transactions/optimistic_transaction.cc utilities/transactions/optimistic_transaction.cc
utilities/transactions/pessimistic_transaction.cc utilities/transactions/pessimistic_transaction.cc

@ -224,6 +224,7 @@ cpp_library(
"util/sync_point_impl.cc", "util/sync_point_impl.cc",
"util/thread_local.cc", "util/thread_local.cc",
"util/threadpool_imp.cc", "util/threadpool_imp.cc",
"util/trace_replay.cc",
"util/transaction_test_util.cc", "util/transaction_test_util.cc",
"util/xxhash.cc", "util/xxhash.cc",
"utilities/backupable/backupable_db.cc", "utilities/backupable/backupable_db.cc",
@ -269,6 +270,7 @@ cpp_library(
"utilities/redis/redis_lists.cc", "utilities/redis/redis_lists.cc",
"utilities/simulator_cache/sim_cache.cc", "utilities/simulator_cache/sim_cache.cc",
"utilities/spatialdb/spatial_db.cc", "utilities/spatialdb/spatial_db.cc",
"utilities/trace/file_trace_reader_writer.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/transactions/optimistic_transaction.cc", "utilities/transactions/optimistic_transaction.cc",
"utilities/transactions/optimistic_transaction_db_impl.cc", "utilities/transactions/optimistic_transaction_db_impl.cc",

@ -1071,6 +1071,15 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd(); auto cfd = cfh->cfd();
if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
tracer_->Get(column_family, key);
}
}
// Acquire SuperVersion // Acquire SuperVersion
SuperVersion* sv = GetAndRefSuperVersion(cfd); SuperVersion* sv = GetAndRefSuperVersion(cfd);
@ -3107,5 +3116,19 @@ void DBImpl::WaitForIngestFile() {
} }
} }
Status DBImpl::StartTrace(const TraceOptions& /* options */,
std::unique_ptr<TraceWriter>&& trace_writer) {
InstrumentedMutexLock lock(&trace_mutex_);
tracer_.reset(new Tracer(env_, std::move(trace_writer)));
return Status::OK();
}
Status DBImpl::EndTrace() {
InstrumentedMutexLock lock(&trace_mutex_);
Status s = tracer_->Close();
tracer_.reset();
return s;
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb

@ -46,6 +46,7 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "rocksdb/write_buffer_manager.h" #include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h" #include "table/scoped_arena_iterator.h"
@ -54,6 +55,7 @@
#include "util/hash.h" #include "util/hash.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/thread_local.h" #include "util/thread_local.h"
#include "util/trace_replay.h"
namespace rocksdb { namespace rocksdb {
@ -333,6 +335,14 @@ class DBImpl : public DB {
virtual Status VerifyChecksum() override; virtual Status VerifyChecksum() override;
using DB::StartTrace;
virtual Status StartTrace(
const TraceOptions& options,
std::unique_ptr<TraceWriter>&& trace_writer) override;
using DB::EndTrace;
virtual Status EndTrace() override;
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Similar to GetSnapshot(), but also lets the db know that this snapshot // Similar to GetSnapshot(), but also lets the db know that this snapshot
@ -697,6 +707,8 @@ class DBImpl : public DB {
Statistics* stats_; Statistics* stats_;
std::unordered_map<std::string, RecoveredTransaction*> std::unordered_map<std::string, RecoveredTransaction*>
recovered_transactions_; recovered_transactions_;
std::unique_ptr<Tracer> tracer_;
InstrumentedMutex trace_mutex_;
// Except in DB::Open(), WriteOptionsFile can only be called when: // Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file. // Persist options to options file.

@ -35,6 +35,12 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
SequenceNumber snapshot = versions_->LastSequence(); SequenceNumber snapshot = versions_->LastSequence();
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd(); auto cfd = cfh->cfd();
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
tracer_->Get(column_family, key);
}
}
SuperVersion* super_version = cfd->GetSuperVersion(); SuperVersion* super_version = cfd->GetSuperVersion();
MergeContext merge_context; MergeContext merge_context;
RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);

@ -76,6 +76,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (my_batch == nullptr) { if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!"); return Status::Corruption("Batch is nullptr!");
} }
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
tracer_->Write(my_batch);
}
}
if (write_options.sync && write_options.disableWAL) { if (write_options.sync && write_options.disableWAL) {
return Status::InvalidArgument("Sync writes has to enable WAL."); return Status::InvalidArgument("Sync writes has to enable WAL.");
} }

@ -2500,6 +2500,102 @@ TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_F(DBTest2, TraceAndReplay) {
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreatePutOperator();
ReadOptions ro;
WriteOptions wo;
TraceOptions trace_opts;
EnvOptions env_opts;
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
std::string trace_filename = dbname_ + "/rocksdb.trace";
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
ASSERT_OK(Put(0, "a", "1"));
ASSERT_OK(Merge(0, "b", "2"));
ASSERT_OK(Delete(0, "c"));
ASSERT_OK(SingleDelete(0, "d"));
ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));
WriteBatch batch;
ASSERT_OK(batch.Put("f", "11"));
ASSERT_OK(batch.Merge("g", "12"));
ASSERT_OK(batch.Delete("h"));
ASSERT_OK(batch.SingleDelete("i"));
ASSERT_OK(batch.DeleteRange("j", "k"));
ASSERT_OK(db_->Write(wo, &batch));
ASSERT_EQ("1", Get(0, "a"));
ASSERT_EQ("12", Get(0, "g"));
ASSERT_OK(Put(1, "foo", "bar"));
ASSERT_OK(Put(1, "rocksdb", "rocks"));
ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));
ASSERT_OK(db_->EndTrace());
// These should not get into the trace file as it is after EndTrace.
Put("hello", "world");
Merge("foo", "bar");
// Open another db, replay, and verify the data
std::string value;
std::string dbname2 = test::TmpDir(env_) + "/db_replay";
ASSERT_OK(DestroyDB(dbname2, options));
// Using a different name than db2, to pacify infer's use-after-lifetime
// warnings (http://fbinfer.com).
DB* db2_init = nullptr;
options.create_if_missing = true;
ASSERT_OK(DB::Open(options, dbname2, &db2_init));
ColumnFamilyHandle* cf;
ASSERT_OK(
db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
delete cf;
delete db2_init;
DB* db2 = nullptr;
std::vector<ColumnFamilyDescriptor> column_families;
ColumnFamilyOptions cf_options;
cf_options.merge_operator = MergeOperators::CreatePutOperator();
column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
column_families.push_back(
ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
env_->SleepForMicroseconds(100);
// Verify that the keys don't already exist
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
Replayer replayer(db2, handles_, std::move(trace_reader));
ASSERT_OK(replayer.Replay());
ASSERT_OK(db2->Get(ro, handles[0], "a", &value));
ASSERT_EQ("1", value);
ASSERT_OK(db2->Get(ro, handles[0], "g", &value));
ASSERT_EQ("12", value);
ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
ASSERT_OK(db2->Get(ro, handles[1], "foo", &value));
ASSERT_EQ("bar", value);
ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value));
ASSERT_EQ("rocks", value);
for (auto handle : handles) {
delete handle;
}
delete db2;
ASSERT_OK(DestroyDB(dbname2, options));
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
TEST_F(DBTest2, PinnableSliceAndMmapReads) { TEST_F(DBTest2, PinnableSliceAndMmapReads) {

@ -53,6 +53,7 @@ struct ExternalSstFileInfo;
class WriteBatch; class WriteBatch;
class Env; class Env;
class EventListener; class EventListener;
class TraceWriter;
using std::unique_ptr; using std::unique_ptr;
@ -1168,6 +1169,15 @@ class DB {
return Status::NotSupported("PromoteL0() is not implemented."); return Status::NotSupported("PromoteL0() is not implemented.");
} }
// Trace DB operations. Use EndTrace() to stop tracing.
virtual Status StartTrace(const TraceOptions& /*options*/,
std::unique_ptr<TraceWriter>&& /*trace_writer*/) {
return Status::NotSupported("StartTrace() is not implemented.");
}
virtual Status EndTrace() {
return Status::NotSupported("EndTrace() is not implemented.");
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Needed for StackableDB // Needed for StackableDB

@ -1281,6 +1281,8 @@ struct IngestExternalFileOptions {
bool write_global_seqno = true; bool write_global_seqno = true;
}; };
struct TraceOptions {};
} // namespace rocksdb } // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_ #endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_

@ -0,0 +1,47 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/env.h"
namespace rocksdb {
// Allow custom implementations of TraceWriter and TraceReader.
// By default, RocksDB provides a way to capture the traces to a file using the
// factory NewFileTraceWriter(). But users could also choose to export traces to
// any other system by providing custom implementations of TraceWriter and
// TraceReader.
// TraceWriter allows exporting RocksDB traces to any system, one operation at
// a time.
class TraceWriter {
public:
TraceWriter() {}
virtual ~TraceWriter() {}
virtual Status Write(const Slice& data) = 0;
virtual Status Close() = 0;
};
// TraceReader allows reading RocksDB traces from any system, one operation at
// a time. A RocksDB Replayer could depend on this to replay opertions.
class TraceReader {
public:
TraceReader() {}
virtual ~TraceReader() {}
virtual Status Read(std::string* data) = 0;
virtual Status Close() = 0;
};
// Factory methods to read/write traces from/to a file.
Status NewFileTraceWriter(Env* env, const EnvOptions& env_options,
const std::string& trace_filename,
std::unique_ptr<TraceWriter>* trace_writer);
Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
const std::string& trace_filename,
std::unique_ptr<TraceReader>* trace_reader);
} // namespace rocksdb

@ -153,6 +153,7 @@ LIB_SOURCES = \
util/sync_point_impl.cc \ util/sync_point_impl.cc \
util/thread_local.cc \ util/thread_local.cc \
util/threadpool_imp.cc \ util/threadpool_imp.cc \
util/trace_replay.cc \
util/transaction_test_util.cc \ util/transaction_test_util.cc \
util/xxhash.cc \ util/xxhash.cc \
utilities/backupable/backupable_db.cc \ utilities/backupable/backupable_db.cc \
@ -198,6 +199,7 @@ LIB_SOURCES = \
utilities/simulator_cache/sim_cache.cc \ utilities/simulator_cache/sim_cache.cc \
utilities/spatialdb/spatial_db.cc \ utilities/spatialdb/spatial_db.cc \
utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \
utilities/trace/file_trace_reader_writer.cc \
utilities/transactions/optimistic_transaction.cc \ utilities/transactions/optimistic_transaction.cc \
utilities/transactions/optimistic_transaction_db_impl.cc \ utilities/transactions/optimistic_transaction_db_impl.cc \
utilities/transactions/pessimistic_transaction.cc \ utilities/transactions/pessimistic_transaction.cc \

@ -189,8 +189,8 @@ DEFINE_string(
"\tresetstats -- Reset DB stats\n" "\tresetstats -- Reset DB stats\n"
"\tlevelstats -- Print the number of files and bytes per level\n" "\tlevelstats -- Print the number of files and bytes per level\n"
"\tsstables -- Print sstable info\n" "\tsstables -- Print sstable info\n"
"\theapprofile -- Dump a heap profile (if supported by this" "\theapprofile -- Dump a heap profile (if supported by this port)\n"
" port)\n"); "\treplay -- replay the trace file specified with trace_file\n");
DEFINE_int64(num, 1000000, "Number of key/values to place in database"); DEFINE_int64(num, 1000000, "Number of key/values to place in database");
@ -728,6 +728,8 @@ DEFINE_bool(report_bg_io_stats, false,
DEFINE_bool(use_stderr_info_logger, false, DEFINE_bool(use_stderr_info_logger, false,
"Write info logs to stderr instead of to LOG file. "); "Write info logs to stderr instead of to LOG file. ");
DEFINE_string(trace_file, "", "Trace workload to a file. ");
static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
assert(ctype); assert(ctype);
@ -1970,6 +1972,7 @@ class Benchmark {
int64_t max_num_range_tombstones_; int64_t max_num_range_tombstones_;
WriteOptions write_options_; WriteOptions write_options_;
Options open_options_; // keep options around to properly destroy db later Options open_options_; // keep options around to properly destroy db later
TraceOptions trace_options_;
int64_t reads_; int64_t reads_;
int64_t deletes_; int64_t deletes_;
double read_random_exp_range_; double read_random_exp_range_;
@ -2658,6 +2661,16 @@ void VerifyDBFromDB(std::string& truth_db_name) {
PrintStats("rocksdb.levelstats"); PrintStats("rocksdb.levelstats");
} else if (name == "sstables") { } else if (name == "sstables") {
PrintStats("rocksdb.sstables"); PrintStats("rocksdb.sstables");
} else if (name == "replay") {
if (num_threads > 1) {
fprintf(stderr, "Multi-threaded replay is not yet supported\n");
exit(1);
}
if (FLAGS_trace_file == "") {
fprintf(stderr, "Please set --trace_file to be replayed from\n");
exit(1);
}
method = &Benchmark::Replay;
} else if (!name.empty()) { // No error message for empty name } else if (!name.empty()) { // No error message for empty name
fprintf(stderr, "unknown benchmark '%s'\n", name.c_str()); fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
exit(1); exit(1);
@ -2688,6 +2701,30 @@ void VerifyDBFromDB(std::string& truth_db_name) {
if (method != nullptr) { if (method != nullptr) {
fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
// A trace_file option can be provided both for trace and replay
// operations. But db_bench does not support tracing and replaying at
// the same time, for now. So, start tracing only when it is not a
// replay.
if (FLAGS_trace_file != "" && name != "replay") {
std::unique_ptr<TraceWriter> trace_writer;
Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(),
FLAGS_trace_file, &trace_writer);
if (!s.ok()) {
fprintf(stderr, "Encountered an error starting a trace, %s\n",
s.ToString().c_str());
exit(1);
}
s = db_.db->StartTrace(trace_options_, std::move(trace_writer));
if (!s.ok()) {
fprintf(stderr, "Encountered an error starting a trace, %s\n",
s.ToString().c_str());
exit(1);
}
fprintf(stdout, "Tracing the workload to: [%s]\n",
FLAGS_trace_file.c_str());
}
if (num_warmup > 0) { if (num_warmup > 0) {
printf("Warming up benchmark by running %d times\n", num_warmup); printf("Warming up benchmark by running %d times\n", num_warmup);
} }
@ -2713,6 +2750,15 @@ void VerifyDBFromDB(std::string& truth_db_name) {
(this->*post_process_method)(); (this->*post_process_method)();
} }
} }
if (name != "replay" && FLAGS_trace_file != "") {
Status s = db_.db->EndTrace();
if (!s.ok()) {
fprintf(stderr, "Encountered an error ending the trace, %s\n",
s.ToString().c_str());
}
}
if (FLAGS_statistics) { if (FLAGS_statistics) {
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
} }
@ -5545,6 +5591,37 @@ void VerifyDBFromDB(std::string& truth_db_name) {
} }
fprintf(stdout, "\n%s\n", stats.c_str()); fprintf(stdout, "\n%s\n", stats.c_str());
} }
void Replay(ThreadState* thread) {
if (db_.db != nullptr) {
Replay(thread, &db_);
}
}
void Replay(ThreadState* /*thread*/, DBWithColumnFamilies* db_with_cfh) {
Status s;
unique_ptr<TraceReader> trace_reader;
s = NewFileTraceReader(FLAGS_env, EnvOptions(), FLAGS_trace_file,
&trace_reader);
if (!s.ok()) {
fprintf(
stderr,
"Encountered an error creating a TraceReader from the trace file. "
"Error: %s\n",
s.ToString().c_str());
exit(1);
}
Replayer replayer(db_with_cfh->db, db_with_cfh->cfh,
std::move(trace_reader));
s = replayer.Replay();
if (s.ok()) {
fprintf(stdout, "Replay started from trace_file: %s\n",
FLAGS_trace_file.c_str());
} else {
fprintf(stderr, "Starting replay failed. Error: %s\n",
s.ToString().c_str());
}
}
}; };
int db_bench_tool(int argc, char** argv) { int db_bench_tool(int argc, char** argv) {

@ -0,0 +1,205 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "util/trace_replay.h"
#include <chrono>
#include <sstream>
#include <thread>
#include "db/db_impl.h"
#include "rocksdb/slice.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/string_util.h"
namespace rocksdb {
namespace {
void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
PutFixed32(dst, cf_id);
PutLengthPrefixedSlice(dst, key);
}
void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
Slice buf(buffer);
GetFixed32(&buf, cf_id);
GetLengthPrefixedSlice(&buf, key);
}
} // namespace
Tracer::Tracer(Env* env, std::unique_ptr<TraceWriter>&& trace_writer)
: env_(env), trace_writer_(std::move(trace_writer)) {
WriteHeader();
}
Tracer::~Tracer() { trace_writer_.reset(); }
Status Tracer::Write(WriteBatch* write_batch) {
Trace trace;
trace.ts = env_->NowMicros();
trace.type = kTraceWrite;
trace.payload = write_batch->Data();
return WriteTrace(trace);
}
Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
Trace trace;
trace.ts = env_->NowMicros();
trace.type = kTraceGet;
EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
return WriteTrace(trace);
}
Status Tracer::WriteHeader() {
std::ostringstream s;
s << kTraceMagic << "\t"
<< "Trace Version: 0.1\t"
<< "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
<< "Format: Timestamp OpType Payload\n";
std::string header(s.str());
Trace trace;
trace.ts = env_->NowMicros();
trace.type = kTraceBegin;
trace.payload = header;
return WriteTrace(trace);
}
Status Tracer::WriteFooter() {
Trace trace;
trace.ts = env_->NowMicros();
trace.type = kTraceEnd;
trace.payload = "";
return WriteTrace(trace);
}
Status Tracer::WriteTrace(const Trace& trace) {
std::string encoded_trace;
PutFixed64(&encoded_trace, trace.ts);
encoded_trace.push_back(trace.type);
PutFixed32(&encoded_trace, static_cast<uint32_t>(trace.payload.size()));
encoded_trace.append(trace.payload);
return trace_writer_->Write(Slice(encoded_trace));
}
Status Tracer::Close() { return WriteFooter(); }
Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
unique_ptr<TraceReader>&& reader)
: trace_reader_(std::move(reader)) {
assert(db != nullptr);
db_ = static_cast<DBImpl*>(db->GetRootDB());
for (ColumnFamilyHandle* cfh : handles) {
cf_map_[cfh->GetID()] = cfh;
}
}
Replayer::~Replayer() { trace_reader_.reset(); }
Status Replayer::Replay() {
Status s;
Trace header;
s = ReadHeader(&header);
if (!s.ok()) {
return s;
}
std::chrono::system_clock::time_point replay_epoch =
std::chrono::system_clock::now();
WriteOptions woptions;
ReadOptions roptions;
Trace trace;
uint64_t ops = 0;
while (s.ok()) {
trace.reset();
s = ReadTrace(&trace);
if (!s.ok()) {
break;
}
std::this_thread::sleep_until(
replay_epoch + std::chrono::microseconds(trace.ts - header.ts));
if (trace.type == kTraceWrite) {
WriteBatch batch(trace.payload);
db_->Write(woptions, &batch);
ops++;
} else if (trace.type == kTraceGet) {
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(trace.payload, &cf_id, &key);
if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
std::string value;
if (cf_id == 0) {
db_->Get(roptions, key, &value);
} else {
db_->Get(roptions, cf_map_[cf_id], key, &value);
}
ops++;
} else if (trace.type == kTraceEnd) {
// Do nothing for now.
// TODO: Add some validations later.
break;
}
}
if (s.IsIncomplete()) {
// Reaching eof returns Incomplete status at the moment.
// Could happen when killing a process without calling EndTrace() API.
// TODO: Add better error handling.
return Status::OK();
}
return s;
}
Status Replayer::ReadHeader(Trace* header) {
assert(header != nullptr);
Status s = ReadTrace(header);
if (!s.ok()) {
return s;
}
if (header->type != kTraceBegin) {
return Status::Corruption("Corrupted trace file. Incorrect header.");
}
if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
return Status::Corruption("Corrupted trace file. Incorrect magic.");
}
return s;
}
Status Replayer::ReadFooter(Trace* footer) {
assert(footer != nullptr);
Status s = ReadTrace(footer);
if (!s.ok()) {
return s;
}
if (footer->type != kTraceEnd) {
return Status::Corruption("Corrupted trace file. Incorrect footer.");
}
// TODO: Add more validations later
return s;
}
Status Replayer::ReadTrace(Trace* trace) {
assert(trace != nullptr);
std::string encoded_trace;
Status s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
Slice enc_slice = Slice(encoded_trace);
GetFixed64(&enc_slice, &trace->ts);
trace->type = static_cast<TraceType>(enc_slice[0]);
enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
trace->payload = enc_slice.ToString();
return s;
}
} // namespace rocksdb

@ -0,0 +1,91 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <memory>
#include <unordered_map>
#include <utility>
#include "rocksdb/env.h"
#include "rocksdb/trace_reader_writer.h"
namespace rocksdb {
class ColumnFamilyHandle;
class DB;
class DBImpl;
class Slice;
class WriteBatch;
const std::string kTraceMagic = "feedcafedeadbeef";
const unsigned int kTraceTimestampSize = 8;
const unsigned int kTraceTypeSize = 1;
const unsigned int kTracePayloadLengthSize = 4;
const unsigned int kTraceMetadataSize =
kTraceTimestampSize + kTraceTypeSize + kTracePayloadLengthSize;
enum TraceType : char {
kTraceBegin = 1,
kTraceEnd = 2,
kTraceWrite = 3,
kTraceGet = 4,
kTraceMax,
};
// TODO: This should also be made part of public interface to help users build
// custom TracerReaders and TraceWriters.
struct Trace {
uint64_t ts;
TraceType type;
std::string payload;
void reset() {
ts = 0;
type = kTraceMax;
payload.clear();
}
};
// Trace RocksDB operations using a TraceWriter.
class Tracer {
public:
Tracer(Env* env, std::unique_ptr<TraceWriter>&& trace_writer);
~Tracer();
Status Write(WriteBatch* write_batch);
Status Get(ColumnFamilyHandle* cfname, const Slice& key);
Status Close();
private:
Status WriteHeader();
Status WriteFooter();
Status WriteTrace(const Trace& trace);
Env* env_;
unique_ptr<TraceWriter> trace_writer_;
};
// Replay RocksDB operations from a trace.
class Replayer {
public:
Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
std::unique_ptr<TraceReader>&& reader);
~Replayer();
Status Replay();
private:
Status ReadHeader(Trace* header);
Status ReadFooter(Trace* footer);
Status ReadTrace(Trace* trace);
DBImpl* db_;
std::unique_ptr<TraceReader> trace_reader_;
std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
};
} // namespace rocksdb

@ -0,0 +1,117 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/trace/file_trace_reader_writer.h"
#include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/trace_replay.h"
namespace rocksdb {
const unsigned int FileTraceReader::kBufferSize = 1024; // 1KB
FileTraceReader::FileTraceReader(
std::unique_ptr<RandomAccessFileReader>&& reader)
: file_reader_(std::move(reader)),
offset_(0),
buffer_(new char[kBufferSize]) {}
FileTraceReader::~FileTraceReader() {
Close();
delete[] buffer_;
}
Status FileTraceReader::Close() {
file_reader_.reset();
return Status::OK();
}
Status FileTraceReader::Read(std::string* data) {
assert(file_reader_ != nullptr);
Status s = file_reader_->Read(offset_, kTraceMetadataSize, &result_, buffer_);
if (!s.ok()) {
return s;
}
if (result_.size() == 0) {
// No more data to read
// Todo: Come up with a better way to indicate end of data. May be this
// could be avoided once footer is introduced.
return Status::Incomplete();
}
if (result_.size() < kTraceMetadataSize) {
return Status::Corruption("Corrupted trace file.");
}
*data = result_.ToString();
offset_ += kTraceMetadataSize;
uint32_t payload_len =
DecodeFixed32(&buffer_[kTraceTimestampSize + kTraceTypeSize]);
// Read Payload
unsigned int bytes_to_read = payload_len;
unsigned int to_read =
bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
while (to_read > 0) {
s = file_reader_->Read(offset_, to_read, &result_, buffer_);
if (!s.ok()) {
return s;
}
if (result_.size() < to_read) {
return Status::Corruption("Corrupted trace file.");
}
data->append(result_.data(), result_.size());
offset_ += to_read;
bytes_to_read -= to_read;
to_read = bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
}
return s;
}
FileTraceWriter::~FileTraceWriter() { Close(); }
Status FileTraceWriter::Close() {
file_writer_.reset();
return Status::OK();
}
Status FileTraceWriter::Write(const Slice& data) {
return file_writer_->Append(data);
}
Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
const std::string& trace_filename,
std::unique_ptr<TraceReader>* trace_reader) {
unique_ptr<RandomAccessFile> trace_file;
Status s = env->NewRandomAccessFile(trace_filename, &trace_file, env_options);
if (!s.ok()) {
return s;
}
unique_ptr<RandomAccessFileReader> file_reader;
file_reader.reset(
new RandomAccessFileReader(std::move(trace_file), trace_filename));
trace_reader->reset(new FileTraceReader(std::move(file_reader)));
return s;
}
Status NewFileTraceWriter(Env* env, const EnvOptions& env_options,
const std::string& trace_filename,
std::unique_ptr<TraceWriter>* trace_writer) {
unique_ptr<WritableFile> trace_file;
Status s = env->NewWritableFile(trace_filename, &trace_file, env_options);
if (!s.ok()) {
return s;
}
unique_ptr<WritableFileWriter> file_writer;
file_writer.reset(new WritableFileWriter(std::move(trace_file), env_options));
trace_writer->reset(new FileTraceWriter(std::move(file_writer)));
return s;
}
} // namespace rocksdb

@ -0,0 +1,47 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/trace_reader_writer.h"
namespace rocksdb {
class RandomAccessFileReader;
class WritableFileWriter;
// FileTraceReader allows reading RocksDB traces from a file.
class FileTraceReader : public TraceReader {
public:
explicit FileTraceReader(std::unique_ptr<RandomAccessFileReader>&& reader);
~FileTraceReader();
virtual Status Read(std::string* data) override;
virtual Status Close() override;
private:
unique_ptr<RandomAccessFileReader> file_reader_;
Slice result_;
size_t offset_;
char* const buffer_;
static const unsigned int kBufferSize;
};
// FileTraceWriter allows writing RocksDB traces to a file.
class FileTraceWriter : public TraceWriter {
public:
explicit FileTraceWriter(std::unique_ptr<WritableFileWriter>&& file_writer)
: file_writer_(std::move(file_writer)) {}
~FileTraceWriter();
virtual Status Write(const Slice& data) override;
virtual Status Close() override;
private:
unique_ptr<WritableFileWriter> file_writer_;
};
} // namespace rocksdb
Loading…
Cancel
Save