From 12b6cdeed32536c935a43f86f7950674619bf49f Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Wed, 1 Aug 2018 00:14:43 -0700 Subject: [PATCH] Trace and Replay for RocksDB (#3837) Summary: A framework for tracing and replaying RocksDB operations. A binary trace file is created by capturing the DB operations, and it can be replayed back at the same rate using db_bench. - Column-families are supported - Multi-threaded tracing is supported. - TraceReader and TraceWriter are exposed to the user, so that tracing to various destinations can be enabled (say, to other messaging/logging services). By default, a FileTraceReader and FileTraceWriter are implemented to capture to a file and replay from it. - This is not yet ideal to be enabled in production due to large performance overhead, but it can be safely tried out in a shadow setup, say, for analyzing RocksDB operations. Currently supported DB operations: - Writes: -- Put -- Merge -- Delete -- SingleDelete -- DeleteRange -- Write - Reads: -- Get (point lookups) Pull Request resolved: https://github.com/facebook/rocksdb/pull/3837 Differential Revision: D7974837 Pulled By: sagar0 fbshipit-source-id: 8ec65aaf336504bc1f6ed0feae67f6ed5ef97a72 --- CMakeLists.txt | 2 + TARGETS | 2 + db/db_impl.cc | 23 +++ db/db_impl.h | 12 ++ db/db_impl_readonly.cc | 6 + db/db_impl_write.cc | 6 + db/db_test2.cc | 96 +++++++++ include/rocksdb/db.h | 10 + include/rocksdb/options.h | 2 + include/rocksdb/trace_reader_writer.h | 47 +++++ src.mk | 2 + tools/db_bench_tool.cc | 81 +++++++- util/trace_replay.cc | 205 ++++++++++++++++++++ util/trace_replay.h | 91 +++++++++ utilities/trace/file_trace_reader_writer.cc | 117 +++++++++++ utilities/trace/file_trace_reader_writer.h | 47 +++++ 16 files changed, 747 insertions(+), 2 deletions(-) create mode 100644 include/rocksdb/trace_reader_writer.h create mode 100644 util/trace_replay.cc create mode 100644 util/trace_replay.h create mode 100644 utilities/trace/file_trace_reader_writer.cc create mode 100644 utilities/trace/file_trace_reader_writer.h diff --git a/CMakeLists.txt b/CMakeLists.txt index e165d35eb..501ec1fca 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -603,6 +603,7 @@ set(SOURCES util/testutil.cc util/thread_local.cc util/threadpool_imp.cc + util/trace_replay.cc util/transaction_test_util.cc util/xxhash.cc utilities/backupable/backupable_db.cc @@ -651,6 +652,7 @@ set(SOURCES utilities/simulator_cache/sim_cache.cc utilities/spatialdb/spatial_db.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc + utilities/trace/file_trace_reader_writer.cc utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/optimistic_transaction.cc utilities/transactions/pessimistic_transaction.cc diff --git a/TARGETS b/TARGETS index e63f07c93..e0b9d91b1 100644 --- a/TARGETS +++ b/TARGETS @@ -224,6 +224,7 @@ cpp_library( "util/sync_point_impl.cc", "util/thread_local.cc", "util/threadpool_imp.cc", + "util/trace_replay.cc", "util/transaction_test_util.cc", "util/xxhash.cc", "utilities/backupable/backupable_db.cc", @@ -269,6 +270,7 @@ cpp_library( "utilities/redis/redis_lists.cc", "utilities/simulator_cache/sim_cache.cc", "utilities/spatialdb/spatial_db.cc", + "utilities/trace/file_trace_reader_writer.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/transactions/optimistic_transaction.cc", "utilities/transactions/optimistic_transaction_db_impl.cc", diff --git a/db/db_impl.cc b/db/db_impl.cc index ba9fd46fe..970672b12 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1071,6 +1071,15 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); + if (tracer_) { + // TODO: This mutex should be removed later, to improve performance when + // tracing is enabled. + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + tracer_->Get(column_family, key); + } + } + // Acquire SuperVersion SuperVersion* sv = GetAndRefSuperVersion(cfd); @@ -3107,5 +3116,19 @@ void DBImpl::WaitForIngestFile() { } } +Status DBImpl::StartTrace(const TraceOptions& /* options */, + std::unique_ptr&& trace_writer) { + InstrumentedMutexLock lock(&trace_mutex_); + tracer_.reset(new Tracer(env_, std::move(trace_writer))); + return Status::OK(); +} + +Status DBImpl::EndTrace() { + InstrumentedMutexLock lock(&trace_mutex_); + Status s = tracer_->Close(); + tracer_.reset(); + return s; +} + #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/db/db_impl.h b/db/db_impl.h index d0123e9b7..a5b33f119 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -46,6 +46,7 @@ #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" #include "rocksdb/status.h" +#include "rocksdb/trace_reader_writer.h" #include "rocksdb/transaction_log.h" #include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" @@ -54,6 +55,7 @@ #include "util/hash.h" #include "util/stop_watch.h" #include "util/thread_local.h" +#include "util/trace_replay.h" namespace rocksdb { @@ -333,6 +335,14 @@ class DBImpl : public DB { virtual Status VerifyChecksum() override; + using DB::StartTrace; + virtual Status StartTrace( + const TraceOptions& options, + std::unique_ptr&& trace_writer) override; + + using DB::EndTrace; + virtual Status EndTrace() override; + #endif // ROCKSDB_LITE // Similar to GetSnapshot(), but also lets the db know that this snapshot @@ -697,6 +707,8 @@ class DBImpl : public DB { Statistics* stats_; std::unordered_map recovered_transactions_; + std::unique_ptr tracer_; + InstrumentedMutex trace_mutex_; // Except in DB::Open(), WriteOptionsFile can only be called when: // Persist options to options file. diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index e302344d9..c1c1943b2 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -35,6 +35,12 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options, SequenceNumber snapshot = versions_->LastSequence(); auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + tracer_->Get(column_family, key); + } + } SuperVersion* super_version = cfd->GetSuperVersion(); MergeContext merge_context; RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index f171d90d4..aa5ac27fa 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -76,6 +76,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + tracer_->Write(my_batch); + } + } if (write_options.sync && write_options.disableWAL) { return Status::InvalidArgument("Sync writes has to enable WAL."); } diff --git a/db/db_test2.cc b/db/db_test2.cc index 79cb605eb..784386ac8 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2500,6 +2500,102 @@ TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(DBTest2, TraceAndReplay) { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreatePutOperator(); + ReadOptions ro; + WriteOptions wo; + TraceOptions trace_opts; + EnvOptions env_opts; + CreateAndReopenWithCF({"pikachu"}, options); + Random rnd(301); + + std::string trace_filename = dbname_ + "/rocksdb.trace"; + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer)); + ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer))); + + ASSERT_OK(Put(0, "a", "1")); + ASSERT_OK(Merge(0, "b", "2")); + ASSERT_OK(Delete(0, "c")); + ASSERT_OK(SingleDelete(0, "d")); + ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f")); + + WriteBatch batch; + ASSERT_OK(batch.Put("f", "11")); + ASSERT_OK(batch.Merge("g", "12")); + ASSERT_OK(batch.Delete("h")); + ASSERT_OK(batch.SingleDelete("i")); + ASSERT_OK(batch.DeleteRange("j", "k")); + ASSERT_OK(db_->Write(wo, &batch)); + + ASSERT_EQ("1", Get(0, "a")); + ASSERT_EQ("12", Get(0, "g")); + + ASSERT_OK(Put(1, "foo", "bar")); + ASSERT_OK(Put(1, "rocksdb", "rocks")); + ASSERT_EQ("NOT_FOUND", Get(1, "leveldb")); + + ASSERT_OK(db_->EndTrace()); + // These should not get into the trace file as it is after EndTrace. + Put("hello", "world"); + Merge("foo", "bar"); + + // Open another db, replay, and verify the data + std::string value; + std::string dbname2 = test::TmpDir(env_) + "/db_replay"; + ASSERT_OK(DestroyDB(dbname2, options)); + + // Using a different name than db2, to pacify infer's use-after-lifetime + // warnings (http://fbinfer.com). + DB* db2_init = nullptr; + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname2, &db2_init)); + ColumnFamilyHandle* cf; + ASSERT_OK( + db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf)); + delete cf; + delete db2_init; + + DB* db2 = nullptr; + std::vector column_families; + ColumnFamilyOptions cf_options; + cf_options.merge_operator = MergeOperators::CreatePutOperator(); + column_families.push_back(ColumnFamilyDescriptor("default", cf_options)); + column_families.push_back( + ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions())); + std::vector handles; + ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2)); + + env_->SleepForMicroseconds(100); + // Verify that the keys don't already exist + ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound()); + + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); + Replayer replayer(db2, handles_, std::move(trace_reader)); + ASSERT_OK(replayer.Replay()); + + ASSERT_OK(db2->Get(ro, handles[0], "a", &value)); + ASSERT_EQ("1", value); + ASSERT_OK(db2->Get(ro, handles[0], "g", &value)); + ASSERT_EQ("12", value); + ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound()); + + ASSERT_OK(db2->Get(ro, handles[1], "foo", &value)); + ASSERT_EQ("bar", value); + ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value)); + ASSERT_EQ("rocks", value); + + for (auto handle : handles) { + delete handle; + } + delete db2; + ASSERT_OK(DestroyDB(dbname2, options)); +} + #endif // ROCKSDB_LITE TEST_F(DBTest2, PinnableSliceAndMmapReads) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index a44fea421..746770836 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -53,6 +53,7 @@ struct ExternalSstFileInfo; class WriteBatch; class Env; class EventListener; +class TraceWriter; using std::unique_ptr; @@ -1168,6 +1169,15 @@ class DB { return Status::NotSupported("PromoteL0() is not implemented."); } + // Trace DB operations. Use EndTrace() to stop tracing. + virtual Status StartTrace(const TraceOptions& /*options*/, + std::unique_ptr&& /*trace_writer*/) { + return Status::NotSupported("StartTrace() is not implemented."); + } + + virtual Status EndTrace() { + return Status::NotSupported("EndTrace() is not implemented."); + } #endif // ROCKSDB_LITE // Needed for StackableDB diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 0a07c3ced..5499595d5 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1281,6 +1281,8 @@ struct IngestExternalFileOptions { bool write_global_seqno = true; }; +struct TraceOptions {}; + } // namespace rocksdb #endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_ diff --git a/include/rocksdb/trace_reader_writer.h b/include/rocksdb/trace_reader_writer.h new file mode 100644 index 000000000..31226487b --- /dev/null +++ b/include/rocksdb/trace_reader_writer.h @@ -0,0 +1,47 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/env.h" + +namespace rocksdb { + +// Allow custom implementations of TraceWriter and TraceReader. +// By default, RocksDB provides a way to capture the traces to a file using the +// factory NewFileTraceWriter(). But users could also choose to export traces to +// any other system by providing custom implementations of TraceWriter and +// TraceReader. + +// TraceWriter allows exporting RocksDB traces to any system, one operation at +// a time. +class TraceWriter { + public: + TraceWriter() {} + virtual ~TraceWriter() {} + + virtual Status Write(const Slice& data) = 0; + virtual Status Close() = 0; +}; + +// TraceReader allows reading RocksDB traces from any system, one operation at +// a time. A RocksDB Replayer could depend on this to replay opertions. +class TraceReader { + public: + TraceReader() {} + virtual ~TraceReader() {} + + virtual Status Read(std::string* data) = 0; + virtual Status Close() = 0; +}; + +// Factory methods to read/write traces from/to a file. +Status NewFileTraceWriter(Env* env, const EnvOptions& env_options, + const std::string& trace_filename, + std::unique_ptr* trace_writer); +Status NewFileTraceReader(Env* env, const EnvOptions& env_options, + const std::string& trace_filename, + std::unique_ptr* trace_reader); +} // namespace rocksdb diff --git a/src.mk b/src.mk index eeda95929..b213db86d 100644 --- a/src.mk +++ b/src.mk @@ -153,6 +153,7 @@ LIB_SOURCES = \ util/sync_point_impl.cc \ util/thread_local.cc \ util/threadpool_imp.cc \ + util/trace_replay.cc \ util/transaction_test_util.cc \ util/xxhash.cc \ utilities/backupable/backupable_db.cc \ @@ -198,6 +199,7 @@ LIB_SOURCES = \ utilities/simulator_cache/sim_cache.cc \ utilities/spatialdb/spatial_db.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \ + utilities/trace/file_trace_reader_writer.cc \ utilities/transactions/optimistic_transaction.cc \ utilities/transactions/optimistic_transaction_db_impl.cc \ utilities/transactions/pessimistic_transaction.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 9dd946c29..2eb609538 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -189,8 +189,8 @@ DEFINE_string( "\tresetstats -- Reset DB stats\n" "\tlevelstats -- Print the number of files and bytes per level\n" "\tsstables -- Print sstable info\n" - "\theapprofile -- Dump a heap profile (if supported by this" - " port)\n"); + "\theapprofile -- Dump a heap profile (if supported by this port)\n" + "\treplay -- replay the trace file specified with trace_file\n"); DEFINE_int64(num, 1000000, "Number of key/values to place in database"); @@ -728,6 +728,8 @@ DEFINE_bool(report_bg_io_stats, false, DEFINE_bool(use_stderr_info_logger, false, "Write info logs to stderr instead of to LOG file. "); +DEFINE_string(trace_file, "", "Trace workload to a file. "); + static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { assert(ctype); @@ -1970,6 +1972,7 @@ class Benchmark { int64_t max_num_range_tombstones_; WriteOptions write_options_; Options open_options_; // keep options around to properly destroy db later + TraceOptions trace_options_; int64_t reads_; int64_t deletes_; double read_random_exp_range_; @@ -2658,6 +2661,16 @@ void VerifyDBFromDB(std::string& truth_db_name) { PrintStats("rocksdb.levelstats"); } else if (name == "sstables") { PrintStats("rocksdb.sstables"); + } else if (name == "replay") { + if (num_threads > 1) { + fprintf(stderr, "Multi-threaded replay is not yet supported\n"); + exit(1); + } + if (FLAGS_trace_file == "") { + fprintf(stderr, "Please set --trace_file to be replayed from\n"); + exit(1); + } + method = &Benchmark::Replay; } else if (!name.empty()) { // No error message for empty name fprintf(stderr, "unknown benchmark '%s'\n", name.c_str()); exit(1); @@ -2688,6 +2701,30 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (method != nullptr) { fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); + + // A trace_file option can be provided both for trace and replay + // operations. But db_bench does not support tracing and replaying at + // the same time, for now. So, start tracing only when it is not a + // replay. + if (FLAGS_trace_file != "" && name != "replay") { + std::unique_ptr trace_writer; + Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(), + FLAGS_trace_file, &trace_writer); + if (!s.ok()) { + fprintf(stderr, "Encountered an error starting a trace, %s\n", + s.ToString().c_str()); + exit(1); + } + s = db_.db->StartTrace(trace_options_, std::move(trace_writer)); + if (!s.ok()) { + fprintf(stderr, "Encountered an error starting a trace, %s\n", + s.ToString().c_str()); + exit(1); + } + fprintf(stdout, "Tracing the workload to: [%s]\n", + FLAGS_trace_file.c_str()); + } + if (num_warmup > 0) { printf("Warming up benchmark by running %d times\n", num_warmup); } @@ -2713,6 +2750,15 @@ void VerifyDBFromDB(std::string& truth_db_name) { (this->*post_process_method)(); } } + + if (name != "replay" && FLAGS_trace_file != "") { + Status s = db_.db->EndTrace(); + if (!s.ok()) { + fprintf(stderr, "Encountered an error ending the trace, %s\n", + s.ToString().c_str()); + } + } + if (FLAGS_statistics) { fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); } @@ -5545,6 +5591,37 @@ void VerifyDBFromDB(std::string& truth_db_name) { } fprintf(stdout, "\n%s\n", stats.c_str()); } + + void Replay(ThreadState* thread) { + if (db_.db != nullptr) { + Replay(thread, &db_); + } + } + + void Replay(ThreadState* /*thread*/, DBWithColumnFamilies* db_with_cfh) { + Status s; + unique_ptr trace_reader; + s = NewFileTraceReader(FLAGS_env, EnvOptions(), FLAGS_trace_file, + &trace_reader); + if (!s.ok()) { + fprintf( + stderr, + "Encountered an error creating a TraceReader from the trace file. " + "Error: %s\n", + s.ToString().c_str()); + exit(1); + } + Replayer replayer(db_with_cfh->db, db_with_cfh->cfh, + std::move(trace_reader)); + s = replayer.Replay(); + if (s.ok()) { + fprintf(stdout, "Replay started from trace_file: %s\n", + FLAGS_trace_file.c_str()); + } else { + fprintf(stderr, "Starting replay failed. Error: %s\n", + s.ToString().c_str()); + } + } }; int db_bench_tool(int argc, char** argv) { diff --git a/util/trace_replay.cc b/util/trace_replay.cc new file mode 100644 index 000000000..ca02dccbc --- /dev/null +++ b/util/trace_replay.cc @@ -0,0 +1,205 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/trace_replay.h" + +#include +#include +#include +#include "db/db_impl.h" +#include "rocksdb/slice.h" +#include "rocksdb/write_batch.h" +#include "util/coding.h" +#include "util/string_util.h" + +namespace rocksdb { + +namespace { +void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) { + PutFixed32(dst, cf_id); + PutLengthPrefixedSlice(dst, key); +} + +void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) { + Slice buf(buffer); + GetFixed32(&buf, cf_id); + GetLengthPrefixedSlice(&buf, key); +} +} // namespace + +Tracer::Tracer(Env* env, std::unique_ptr&& trace_writer) + : env_(env), trace_writer_(std::move(trace_writer)) { + WriteHeader(); +} + +Tracer::~Tracer() { trace_writer_.reset(); } + +Status Tracer::Write(WriteBatch* write_batch) { + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = kTraceWrite; + trace.payload = write_batch->Data(); + return WriteTrace(trace); +} + +Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = kTraceGet; + EncodeCFAndKey(&trace.payload, column_family->GetID(), key); + return WriteTrace(trace); +} + +Status Tracer::WriteHeader() { + std::ostringstream s; + s << kTraceMagic << "\t" + << "Trace Version: 0.1\t" + << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t" + << "Format: Timestamp OpType Payload\n"; + std::string header(s.str()); + + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = kTraceBegin; + trace.payload = header; + return WriteTrace(trace); +} + +Status Tracer::WriteFooter() { + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = kTraceEnd; + trace.payload = ""; + return WriteTrace(trace); +} + +Status Tracer::WriteTrace(const Trace& trace) { + std::string encoded_trace; + PutFixed64(&encoded_trace, trace.ts); + encoded_trace.push_back(trace.type); + PutFixed32(&encoded_trace, static_cast(trace.payload.size())); + encoded_trace.append(trace.payload); + return trace_writer_->Write(Slice(encoded_trace)); +} + +Status Tracer::Close() { return WriteFooter(); } + +Replayer::Replayer(DB* db, const std::vector& handles, + unique_ptr&& reader) + : trace_reader_(std::move(reader)) { + assert(db != nullptr); + db_ = static_cast(db->GetRootDB()); + for (ColumnFamilyHandle* cfh : handles) { + cf_map_[cfh->GetID()] = cfh; + } +} + +Replayer::~Replayer() { trace_reader_.reset(); } + +Status Replayer::Replay() { + Status s; + Trace header; + s = ReadHeader(&header); + if (!s.ok()) { + return s; + } + + std::chrono::system_clock::time_point replay_epoch = + std::chrono::system_clock::now(); + WriteOptions woptions; + ReadOptions roptions; + Trace trace; + uint64_t ops = 0; + while (s.ok()) { + trace.reset(); + s = ReadTrace(&trace); + if (!s.ok()) { + break; + } + + std::this_thread::sleep_until( + replay_epoch + std::chrono::microseconds(trace.ts - header.ts)); + if (trace.type == kTraceWrite) { + WriteBatch batch(trace.payload); + db_->Write(woptions, &batch); + ops++; + } else if (trace.type == kTraceGet) { + uint32_t cf_id = 0; + Slice key; + DecodeCFAndKey(trace.payload, &cf_id, &key); + if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + + std::string value; + if (cf_id == 0) { + db_->Get(roptions, key, &value); + } else { + db_->Get(roptions, cf_map_[cf_id], key, &value); + } + ops++; + } else if (trace.type == kTraceEnd) { + // Do nothing for now. + // TODO: Add some validations later. + break; + } + } + + if (s.IsIncomplete()) { + // Reaching eof returns Incomplete status at the moment. + // Could happen when killing a process without calling EndTrace() API. + // TODO: Add better error handling. + return Status::OK(); + } + return s; +} + +Status Replayer::ReadHeader(Trace* header) { + assert(header != nullptr); + Status s = ReadTrace(header); + if (!s.ok()) { + return s; + } + if (header->type != kTraceBegin) { + return Status::Corruption("Corrupted trace file. Incorrect header."); + } + if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) { + return Status::Corruption("Corrupted trace file. Incorrect magic."); + } + + return s; +} + +Status Replayer::ReadFooter(Trace* footer) { + assert(footer != nullptr); + Status s = ReadTrace(footer); + if (!s.ok()) { + return s; + } + if (footer->type != kTraceEnd) { + return Status::Corruption("Corrupted trace file. Incorrect footer."); + } + + // TODO: Add more validations later + return s; +} + +Status Replayer::ReadTrace(Trace* trace) { + assert(trace != nullptr); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + + Slice enc_slice = Slice(encoded_trace); + GetFixed64(&enc_slice, &trace->ts); + trace->type = static_cast(enc_slice[0]); + enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize); + trace->payload = enc_slice.ToString(); + return s; +} + +} // namespace rocksdb diff --git a/util/trace_replay.h b/util/trace_replay.h new file mode 100644 index 000000000..84a164014 --- /dev/null +++ b/util/trace_replay.h @@ -0,0 +1,91 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include +#include + +#include "rocksdb/env.h" +#include "rocksdb/trace_reader_writer.h" + +namespace rocksdb { + +class ColumnFamilyHandle; +class DB; +class DBImpl; +class Slice; +class WriteBatch; + +const std::string kTraceMagic = "feedcafedeadbeef"; +const unsigned int kTraceTimestampSize = 8; +const unsigned int kTraceTypeSize = 1; +const unsigned int kTracePayloadLengthSize = 4; +const unsigned int kTraceMetadataSize = + kTraceTimestampSize + kTraceTypeSize + kTracePayloadLengthSize; + +enum TraceType : char { + kTraceBegin = 1, + kTraceEnd = 2, + kTraceWrite = 3, + kTraceGet = 4, + kTraceMax, +}; + +// TODO: This should also be made part of public interface to help users build +// custom TracerReaders and TraceWriters. +struct Trace { + uint64_t ts; + TraceType type; + std::string payload; + + void reset() { + ts = 0; + type = kTraceMax; + payload.clear(); + } +}; + +// Trace RocksDB operations using a TraceWriter. +class Tracer { + public: + Tracer(Env* env, std::unique_ptr&& trace_writer); + ~Tracer(); + + Status Write(WriteBatch* write_batch); + Status Get(ColumnFamilyHandle* cfname, const Slice& key); + + Status Close(); + + private: + Status WriteHeader(); + Status WriteFooter(); + Status WriteTrace(const Trace& trace); + + Env* env_; + unique_ptr trace_writer_; +}; + +// Replay RocksDB operations from a trace. +class Replayer { + public: + Replayer(DB* db, const std::vector& handles, + std::unique_ptr&& reader); + ~Replayer(); + + Status Replay(); + + private: + Status ReadHeader(Trace* header); + Status ReadFooter(Trace* footer); + Status ReadTrace(Trace* trace); + + DBImpl* db_; + std::unique_ptr trace_reader_; + std::unordered_map cf_map_; +}; + +} // namespace rocksdb diff --git a/utilities/trace/file_trace_reader_writer.cc b/utilities/trace/file_trace_reader_writer.cc new file mode 100644 index 000000000..dde36aa93 --- /dev/null +++ b/utilities/trace/file_trace_reader_writer.cc @@ -0,0 +1,117 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/trace/file_trace_reader_writer.h" + +#include "util/coding.h" +#include "util/file_reader_writer.h" +#include "util/trace_replay.h" + +namespace rocksdb { + +const unsigned int FileTraceReader::kBufferSize = 1024; // 1KB + +FileTraceReader::FileTraceReader( + std::unique_ptr&& reader) + : file_reader_(std::move(reader)), + offset_(0), + buffer_(new char[kBufferSize]) {} + +FileTraceReader::~FileTraceReader() { + Close(); + delete[] buffer_; +} + +Status FileTraceReader::Close() { + file_reader_.reset(); + return Status::OK(); +} + +Status FileTraceReader::Read(std::string* data) { + assert(file_reader_ != nullptr); + Status s = file_reader_->Read(offset_, kTraceMetadataSize, &result_, buffer_); + if (!s.ok()) { + return s; + } + if (result_.size() == 0) { + // No more data to read + // Todo: Come up with a better way to indicate end of data. May be this + // could be avoided once footer is introduced. + return Status::Incomplete(); + } + if (result_.size() < kTraceMetadataSize) { + return Status::Corruption("Corrupted trace file."); + } + *data = result_.ToString(); + offset_ += kTraceMetadataSize; + + uint32_t payload_len = + DecodeFixed32(&buffer_[kTraceTimestampSize + kTraceTypeSize]); + + // Read Payload + unsigned int bytes_to_read = payload_len; + unsigned int to_read = + bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read; + while (to_read > 0) { + s = file_reader_->Read(offset_, to_read, &result_, buffer_); + if (!s.ok()) { + return s; + } + if (result_.size() < to_read) { + return Status::Corruption("Corrupted trace file."); + } + data->append(result_.data(), result_.size()); + + offset_ += to_read; + bytes_to_read -= to_read; + to_read = bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read; + } + + return s; +} + +FileTraceWriter::~FileTraceWriter() { Close(); } + +Status FileTraceWriter::Close() { + file_writer_.reset(); + return Status::OK(); +} + +Status FileTraceWriter::Write(const Slice& data) { + return file_writer_->Append(data); +} + +Status NewFileTraceReader(Env* env, const EnvOptions& env_options, + const std::string& trace_filename, + std::unique_ptr* trace_reader) { + unique_ptr trace_file; + Status s = env->NewRandomAccessFile(trace_filename, &trace_file, env_options); + if (!s.ok()) { + return s; + } + + unique_ptr file_reader; + file_reader.reset( + new RandomAccessFileReader(std::move(trace_file), trace_filename)); + trace_reader->reset(new FileTraceReader(std::move(file_reader))); + return s; +} + +Status NewFileTraceWriter(Env* env, const EnvOptions& env_options, + const std::string& trace_filename, + std::unique_ptr* trace_writer) { + unique_ptr trace_file; + Status s = env->NewWritableFile(trace_filename, &trace_file, env_options); + if (!s.ok()) { + return s; + } + + unique_ptr file_writer; + file_writer.reset(new WritableFileWriter(std::move(trace_file), env_options)); + trace_writer->reset(new FileTraceWriter(std::move(file_writer))); + return s; +} + +} // namespace rocksdb diff --git a/utilities/trace/file_trace_reader_writer.h b/utilities/trace/file_trace_reader_writer.h new file mode 100644 index 000000000..b363a3f09 --- /dev/null +++ b/utilities/trace/file_trace_reader_writer.h @@ -0,0 +1,47 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/trace_reader_writer.h" + +namespace rocksdb { + +class RandomAccessFileReader; +class WritableFileWriter; + +// FileTraceReader allows reading RocksDB traces from a file. +class FileTraceReader : public TraceReader { + public: + explicit FileTraceReader(std::unique_ptr&& reader); + ~FileTraceReader(); + + virtual Status Read(std::string* data) override; + virtual Status Close() override; + + private: + unique_ptr file_reader_; + Slice result_; + size_t offset_; + char* const buffer_; + + static const unsigned int kBufferSize; +}; + +// FileTraceWriter allows writing RocksDB traces to a file. +class FileTraceWriter : public TraceWriter { + public: + explicit FileTraceWriter(std::unique_ptr&& file_writer) + : file_writer_(std::move(file_writer)) {} + ~FileTraceWriter(); + + virtual Status Write(const Slice& data) override; + virtual Status Close() override; + + private: + unique_ptr file_writer_; +}; + +} // namespace rocksdb