diff --git a/CMakeLists.txt b/CMakeLists.txt index 43b3f6275..a14d2dde7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -816,9 +816,11 @@ set(SOURCES tools/ldb_tool.cc tools/sst_dump_tool.cc tools/trace_analyzer_tool.cc - trace_replay/trace_replay.cc trace_replay/block_cache_tracer.cc trace_replay/io_tracer.cc + trace_replay/trace_record_handler.cc + trace_replay/trace_record.cc + trace_replay/trace_replay.cc util/coding.cc util/compaction_job_stats_impl.cc util/comparator.cc @@ -878,6 +880,7 @@ set(SOURCES utilities/simulator_cache/sim_cache.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/trace/file_trace_reader_writer.cc + utilities/trace/replayer_impl.cc utilities/transactions/lock/lock_manager.cc utilities/transactions/lock/point/point_lock_tracker.cc utilities/transactions/lock/point/point_lock_manager.cc diff --git a/HISTORY.md b/HISTORY.md index d7de634eb..ea98e1a94 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -15,6 +15,10 @@ * BlockBasedTableOptions.prepopulate_block_cache can be dynamically configured using DB::SetOptions. * Add CompactionOptionsFIFO.age_for_warm, which allows RocksDB to move old files to warm tier in FIFO compactions. Note that file temperature is still an experimental feature. * Add a comment to suggest btrfs user to disable file preallocation by setting `options.allow_fallocate=false`. +* Fast forward option in Trace replay changed to double type to allow replaying at a lower speed, by setting the value between 0 and 1. This option can be set via `ReplayOptions` in `Replayer::Replay()`, or via `--trace_replay_fast_forward` in db_bench. + +## Public API change +* Added APIs to decode and replay trace file via Replayer class. Added `DB::NewDefaultReplayer()` to create a default Replayer instance. Created trace_record.h and utilities/replayer.h files to access decoded Trace records and replay them.
### Performance Improvements * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value. diff --git a/TARGETS b/TARGETS index d96845938..0f50b4fa8 100644 --- a/TARGETS +++ b/TARGETS @@ -335,6 +335,8 @@ cpp_library( "tools/sst_dump_tool.cc", "trace_replay/block_cache_tracer.cc", "trace_replay/io_tracer.cc", + "trace_replay/trace_record.cc", + "trace_replay/trace_record_handler.cc", "trace_replay/trace_replay.cc", "util/build_version.cc", "util/coding.cc", @@ -398,6 +400,7 @@ cpp_library( "utilities/simulator_cache/sim_cache.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/trace/file_trace_reader_writer.cc", + "utilities/trace/replayer_impl.cc", "utilities/transactions/lock/lock_manager.cc", "utilities/transactions/lock/point/point_lock_manager.cc", "utilities/transactions/lock/point/point_lock_tracker.cc", @@ -650,6 +653,8 @@ cpp_library( "tools/sst_dump_tool.cc", "trace_replay/block_cache_tracer.cc", "trace_replay/io_tracer.cc", + "trace_replay/trace_record.cc", + "trace_replay/trace_record_handler.cc", "trace_replay/trace_replay.cc", "util/build_version.cc", "util/coding.cc", @@ -713,6 +718,7 @@ cpp_library( "utilities/simulator_cache/sim_cache.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/trace/file_trace_reader_writer.cc", + "utilities/trace/replayer_impl.cc", "utilities/transactions/lock/lock_manager.cc", "utilities/transactions/lock/point/point_lock_manager.cc", "utilities/transactions/lock/point/point_lock_tracker.cc", diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index e46092ba4..ef6a9cbda 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -94,6 +94,7 @@ #include "table/table_builder.h" #include "table/two_level_iterator.h" #include "test_util/sync_point.h" +#include "trace_replay/trace_replay.h" #include "util/autovector.h" #include "util/cast_util.h" #include "util/coding.h" @@ -103,6 +104,7 @@ #include 
"util/mutexlock.h" #include "util/stop_watch.h" #include "util/string_util.h" +#include "utilities/trace/replayer_impl.h" namespace ROCKSDB_NAMESPACE { @@ -4359,9 +4361,7 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv, return earliest_seq; } -#endif // ROCKSDB_LITE -#ifndef ROCKSDB_LITE Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, bool cache_only, SequenceNumber lower_bound_seq, @@ -5108,6 +5108,14 @@ Status DBImpl::EndTrace() { return s; } +Status DBImpl::NewDefaultReplayer( + const std::vector& handles, + std::unique_ptr&& reader, + std::unique_ptr* replayer) { + replayer->reset(new ReplayerImpl(this, handles, std::move(reader))); + return Status::OK(); +} + Status DBImpl::StartBlockCacheTrace( const TraceOptions& trace_options, std::unique_ptr&& trace_writer) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 0b65bd41d..c4bb5b68b 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -51,8 +51,13 @@ #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" #include "rocksdb/status.h" +#ifndef ROCKSDB_LITE #include "rocksdb/trace_reader_writer.h" +#endif // ROCKSDB_LITE #include "rocksdb/transaction_log.h" +#ifndef ROCKSDB_LITE +#include "rocksdb/utilities/replayer.h" +#endif // ROCKSDB_LITE #include "rocksdb/write_buffer_manager.h" #include "table/merging_iterator.h" #include "table/scoped_arena_iterator.h" @@ -464,6 +469,12 @@ class DBImpl : public DB { using DB::EndTrace; virtual Status EndTrace() override; + using DB::NewDefaultReplayer; + virtual Status NewDefaultReplayer( + const std::vector& handles, + std::unique_ptr&& reader, + std::unique_ptr* replayer) override; + using DB::StartBlockCacheTrace; Status StartBlockCacheTrace( const TraceOptions& options, diff --git a/db/db_test2.cc b/db/db_test2.cc index 56682dea5..d55f4a449 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -17,6 +17,7 @@ #include "port/port.h" #include "port/stack_trace.h" #include 
"rocksdb/persistent_cache.h" +#include "rocksdb/utilities/replayer.h" #include "rocksdb/wal_filter.h" #include "util/random.h" #include "utilities/fault_injection_env.h" @@ -4256,8 +4257,160 @@ TEST_F(DBTest2, TraceAndReplay) { std::unique_ptr trace_reader; ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); - Replayer replayer(db2, handles_, std::move(trace_reader)); - ASSERT_OK(replayer.Replay()); + std::unique_ptr replayer; + ASSERT_OK( + db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); + // Unprepared replay should fail with Status::Incomplete() + ASSERT_TRUE(replayer->Replay().IsIncomplete()); + ASSERT_OK(replayer->Prepare()); + // Ok to repeatedly Prepare(). + ASSERT_OK(replayer->Prepare()); + // Replay using 1 thread, 1x speed. + ASSERT_OK(replayer->Replay()); + + ASSERT_OK(db2->Get(ro, handles[0], "a", &value)); + ASSERT_EQ("1", value); + ASSERT_OK(db2->Get(ro, handles[0], "g", &value)); + ASSERT_EQ("12", value); + ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound()); + + ASSERT_OK(db2->Get(ro, handles[1], "foo", &value)); + ASSERT_EQ("bar", value); + ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value)); + ASSERT_EQ("rocks", value); + + // Re-replay should fail with Status::Incomplete() if Prepare() was not + // called. Currently we don't distinguish between unprepared and trace end. + ASSERT_TRUE(replayer->Replay().IsIncomplete()); + + // Re-replay using 2 threads, 2x speed. + ASSERT_OK(replayer->Prepare()); + ASSERT_OK(replayer->Replay(ReplayOptions(2, 2.0))); + + // Re-replay using 2 threads, 1/2 speed. 
+ ASSERT_OK(replayer->Prepare()); + ASSERT_OK(replayer->Replay(ReplayOptions(2, 0.5))); + replayer.reset(); + + for (auto handle : handles) { + delete handle; + } + delete db2; + ASSERT_OK(DestroyDB(dbname2, options)); +} + +TEST_F(DBTest2, TraceAndManualReplay) { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreatePutOperator(); + ReadOptions ro; + WriteOptions wo; + TraceOptions trace_opts; + EnvOptions env_opts; + CreateAndReopenWithCF({"pikachu"}, options); + Random rnd(301); + Iterator* single_iter = nullptr; + + ASSERT_TRUE(db_->EndTrace().IsIOError()); + + std::string trace_filename = dbname_ + "/rocksdb.trace"; + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer)); + ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer))); + + ASSERT_OK(Put(0, "a", "1")); + ASSERT_OK(Merge(0, "b", "2")); + ASSERT_OK(Delete(0, "c")); + ASSERT_OK(SingleDelete(0, "d")); + ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f")); + + WriteBatch batch; + ASSERT_OK(batch.Put("f", "11")); + ASSERT_OK(batch.Merge("g", "12")); + ASSERT_OK(batch.Delete("h")); + ASSERT_OK(batch.SingleDelete("i")); + ASSERT_OK(batch.DeleteRange("j", "k")); + ASSERT_OK(db_->Write(wo, &batch)); + + single_iter = db_->NewIterator(ro); + single_iter->Seek("f"); + single_iter->SeekForPrev("g"); + delete single_iter; + + ASSERT_EQ("1", Get(0, "a")); + ASSERT_EQ("12", Get(0, "g")); + + ASSERT_OK(Put(1, "foo", "bar")); + ASSERT_OK(Put(1, "rocksdb", "rocks")); + ASSERT_EQ("NOT_FOUND", Get(1, "leveldb")); + + ASSERT_OK(db_->EndTrace()); + // These should not get into the trace file as it is after EndTrace. 
+ Put("hello", "world"); + Merge("foo", "bar"); + + // Open another db, replay, and verify the data + std::string value; + std::string dbname2 = test::PerThreadDBPath(env_, "/db_replay"); + ASSERT_OK(DestroyDB(dbname2, options)); + + // Using a different name than db2, to pacify infer's use-after-lifetime + // warnings (http://fbinfer.com). + DB* db2_init = nullptr; + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname2, &db2_init)); + ColumnFamilyHandle* cf; + ASSERT_OK( + db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf)); + delete cf; + delete db2_init; + + DB* db2 = nullptr; + std::vector column_families; + ColumnFamilyOptions cf_options; + cf_options.merge_operator = MergeOperators::CreatePutOperator(); + column_families.push_back(ColumnFamilyDescriptor("default", cf_options)); + column_families.push_back( + ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions())); + std::vector handles; + DBOptions db_opts; + db_opts.env = env_; + ASSERT_OK(DB::Open(db_opts, dbname2, column_families, &handles, &db2)); + + env_->SleepForMicroseconds(100); + // Verify that the keys don't already exist + ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound()); + + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); + std::unique_ptr replayer; + ASSERT_OK( + db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); + + // Manual replay for 2 times. The 2nd checks if the replay can restart. + std::unique_ptr record; + for (int i = 0; i < 2; i++) { + // Next should fail if unprepared. + ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete()); + ASSERT_OK(replayer->Prepare()); + Status s = Status::OK(); + // Looping until trace end. + while (s.ok()) { + s = replayer->Next(&record); + // Skip unsupported operations. 
+ if (s.IsNotSupported()) { + continue; + } + if (s.ok()) { + ASSERT_OK(replayer->Execute(std::move(record))); + } + } + // Status::Incomplete() will be returned when manually reading the trace + // end, or Prepare() was not called. + ASSERT_TRUE(s.IsIncomplete()); + ASSERT_TRUE(replayer->Next(nullptr).IsIncomplete()); + } ASSERT_OK(db2->Get(ro, handles[0], "a", &value)); ASSERT_EQ("1", value); @@ -4271,6 +4424,85 @@ TEST_F(DBTest2, TraceAndReplay) { ASSERT_OK(db2->Get(ro, handles[1], "rocksdb", &value)); ASSERT_EQ("rocks", value); + // Test execution of artificially created TraceRecords. + uint64_t fake_ts = 1U; + // Write + batch.Clear(); + batch.Put("trace-record-write1", "write1"); + batch.Put("trace-record-write2", "write2"); + record.reset(new WriteQueryTraceRecord(batch.Data(), fake_ts++)); + ASSERT_OK(replayer->Execute(std::move(record))); + ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write1", &value)); + ASSERT_EQ("write1", value); + ASSERT_OK(db2->Get(ro, handles[0], "trace-record-write2", &value)); + ASSERT_EQ("write2", value); + + // Get related + // Get an existing key. + record.reset(new GetQueryTraceRecord(handles[0]->GetID(), + "trace-record-write1", fake_ts++)); + ASSERT_OK(replayer->Execute(std::move(record))); + // Get a non-existing key, should still return Status::OK(). + record.reset(new GetQueryTraceRecord(handles[0]->GetID(), "trace-record-get", + fake_ts++)); + ASSERT_OK(replayer->Execute(std::move(record))); + // Get from an invalid (non-existing) cf_id. + uint32_t invalid_cf_id = handles[1]->GetID() + 1; + record.reset(new GetQueryTraceRecord(invalid_cf_id, "whatever", fake_ts++)); + ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption()); + + // Iteration related + for (IteratorSeekQueryTraceRecord::SeekType seekType : + {IteratorSeekQueryTraceRecord::kSeek, + IteratorSeekQueryTraceRecord::kSeekForPrev}) { + // Seek to an existing key.
+ record.reset(new IteratorSeekQueryTraceRecord( + seekType, handles[0]->GetID(), "trace-record-write1", fake_ts++)); + ASSERT_OK(replayer->Execute(std::move(record))); + // Seek to a non-existing key, should still return Status::OK(). + record.reset(new IteratorSeekQueryTraceRecord( + seekType, handles[0]->GetID(), "trace-record-get", fake_ts++)); + ASSERT_OK(replayer->Execute(std::move(record))); + // Seek from an invalid cf_id. + record.reset(new IteratorSeekQueryTraceRecord(seekType, invalid_cf_id, + "whatever", fake_ts++)); + ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption()); + } + + // MultiGet related + // Get existing keys. + record.reset(new MultiGetQueryTraceRecord( + std::vector({handles[0]->GetID(), handles[1]->GetID()}), + std::vector({"a", "foo"}), fake_ts++)); + ASSERT_OK(replayer->Execute(std::move(record))); + // Get all non-existing keys, should still return Status::OK(). + record.reset(new MultiGetQueryTraceRecord( + std::vector({handles[0]->GetID(), handles[1]->GetID()}), + std::vector({"no1", "no2"}), fake_ts++)); + // Get mixed of existing and non-existing keys, should still return + // Status::OK(). + record.reset(new MultiGetQueryTraceRecord( + std::vector({handles[0]->GetID(), handles[1]->GetID()}), + std::vector({"a", "no2"}), fake_ts++)); + ASSERT_OK(replayer->Execute(std::move(record))); + // Get from an invalid (non-existing) cf_id.
+ record.reset(new MultiGetQueryTraceRecord( + std::vector( + {handles[0]->GetID(), handles[1]->GetID(), invalid_cf_id}), + std::vector({"a", "foo", "whatever"}), fake_ts++)); + ASSERT_TRUE(replayer->Execute(std::move(record)).IsCorruption()); + // Empty MultiGet + record.reset(new MultiGetQueryTraceRecord( + std::vector(), std::vector(), fake_ts++)); + ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument()); + // MultiGet size mismatch + record.reset(new MultiGetQueryTraceRecord( + std::vector({handles[0]->GetID(), handles[1]->GetID()}), + std::vector({"a"}), fake_ts++)); + ASSERT_TRUE(replayer->Execute(std::move(record)).IsInvalidArgument()); + + replayer.reset(); + for (auto handle : handles) { delete handle; } @@ -4334,8 +4566,12 @@ TEST_F(DBTest2, TraceWithLimit) { std::unique_ptr trace_reader; ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); - Replayer replayer(db2, handles_, std::move(trace_reader)); - ASSERT_OK(replayer.Replay()); + std::unique_ptr replayer; + ASSERT_OK( + db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); + ASSERT_OK(replayer->Prepare()); + ASSERT_OK(replayer->Replay()); + replayer.reset(); ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound()); @@ -4405,8 +4641,12 @@ TEST_F(DBTest2, TraceWithSampling) { std::unique_ptr trace_reader; ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); - Replayer replayer(db2, handles_, std::move(trace_reader)); - ASSERT_OK(replayer.Replay()); + std::unique_ptr replayer; + ASSERT_OK( + db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); + ASSERT_OK(replayer->Prepare()); + ASSERT_OK(replayer->Replay()); + replayer.reset(); ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound()); @@ -4505,8 +4745,12 @@ TEST_F(DBTest2, TraceWithFilter) { std::unique_ptr 
trace_reader; ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); - Replayer replayer(db2, handles_, std::move(trace_reader)); - ASSERT_OK(replayer.Replay()); + std::unique_ptr replayer; + ASSERT_OK( + db2->NewDefaultReplayer(handles, std::move(trace_reader), &replayer)); + ASSERT_OK(replayer->Prepare()); + ASSERT_OK(replayer->Replay()); + replayer.reset(); // All the key-values should not present since we filter out the WRITE ops. ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); diff --git a/env/file_system_tracer.cc b/env/file_system_tracer.cc index 9a85dd5e0..733f45571 100644 --- a/env/file_system_tracer.cc +++ b/env/file_system_tracer.cc @@ -7,6 +7,7 @@ #include "rocksdb/file_system.h" #include "rocksdb/system_clock.h" +#include "rocksdb/trace_record.h" namespace ROCKSDB_NAMESPACE { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 645ecfb45..1f715b3cf 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -39,25 +39,31 @@ namespace ROCKSDB_NAMESPACE { -struct Options; -struct DBOptions; struct ColumnFamilyOptions; -struct ReadOptions; -struct WriteOptions; -struct FlushOptions; struct CompactionOptions; struct CompactRangeOptions; -struct TableProperties; +struct DBOptions; struct ExternalSstFileInfo; -class WriteBatch; +struct FlushOptions; +struct Options; +struct ReadOptions; +struct TableProperties; +struct WriteOptions; +#ifdef ROCKSDB_LITE +class CompactionJobInfo; +#endif class Env; class EventListener; +class FileSystem; +#ifndef ROCKSDB_LITE +class Replayer; +#endif class StatsHistoryIterator; +#ifndef ROCKSDB_LITE +class TraceReader; class TraceWriter; -#ifdef ROCKSDB_LITE -class CompactionJobInfo; #endif -class FileSystem; +class WriteBatch; extern const std::string kDefaultColumnFamilyName; extern const std::string kPersistentStatsColumnFamilyName; @@ -1628,6 +1634,7 @@ class DB { virtual ColumnFamilyHandle* DefaultColumnFamily() const = 0; #ifndef ROCKSDB_LITE + virtual Status 
GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, TablePropertiesCollection* props) = 0; virtual Status GetPropertiesOfAllTables(TablePropertiesCollection* props) { @@ -1678,6 +1685,15 @@ class DB { virtual Status EndBlockCacheTrace() { return Status::NotSupported("EndBlockCacheTrace() is not implemented."); } + + // Create a default trace replayer. + virtual Status NewDefaultReplayer( + const std::vector& /*handles*/, + std::unique_ptr&& /*reader*/, + std::unique_ptr* /*replayer*/) { + return Status::NotSupported("NewDefaultReplayer() is not implemented."); + } + #endif // ROCKSDB_LITE // Needed for StackableDB diff --git a/include/rocksdb/trace_reader_writer.h b/include/rocksdb/trace_reader_writer.h index 26ceab2c8..50c72f7bb 100644 --- a/include/rocksdb/trace_reader_writer.h +++ b/include/rocksdb/trace_reader_writer.h @@ -36,6 +36,11 @@ class TraceReader { virtual Status Read(std::string* data) = 0; virtual Status Close() = 0; + + // Seek back to the trace header. Replayer can call this method for + // repeatedly replaying. Note this method may fail if the reader is already + // closed. + virtual Status Reset() = 0; }; // Factory methods to read/write traces from/to a file. @@ -45,4 +50,5 @@ Status NewFileTraceWriter(Env* env, const EnvOptions& env_options, Status NewFileTraceReader(Env* env, const EnvOptions& env_options, const std::string& trace_filename, std::unique_ptr* trace_reader); + } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/trace_record.h b/include/rocksdb/trace_record.h new file mode 100644 index 000000000..add235eaa --- /dev/null +++ b/include/rocksdb/trace_record.h @@ -0,0 +1,205 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#include +#include + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/slice.h" + +namespace ROCKSDB_NAMESPACE { + +class ColumnFamilyHandle; +class DB; +class Status; + +// Supported trace record types. +enum TraceType : char { + kTraceNone = 0, + kTraceBegin = 1, + kTraceEnd = 2, + // Query level tracing related trace types. + kTraceWrite = 3, + kTraceGet = 4, + kTraceIteratorSeek = 5, + kTraceIteratorSeekForPrev = 6, + // Block cache tracing related trace types. + kBlockTraceIndexBlock = 7, + kBlockTraceFilterBlock = 8, + kBlockTraceDataBlock = 9, + kBlockTraceUncompressionDictBlock = 10, + kBlockTraceRangeDeletionBlock = 11, + // IO tracing related trace type. + kIOTracer = 12, + // Query level tracing related trace type. + kTraceMultiGet = 13, + // All trace types should be added before kTraceMax + kTraceMax, +}; + +class WriteQueryTraceRecord; +class GetQueryTraceRecord; +class IteratorSeekQueryTraceRecord; +class MultiGetQueryTraceRecord; + +// Base class for all types of trace records. +class TraceRecord { + public: + TraceRecord(); + explicit TraceRecord(uint64_t timestamp); + virtual ~TraceRecord(); + + virtual TraceType GetTraceType() const = 0; + + virtual uint64_t GetTimestamp() const; + + class Handler { + public: + virtual ~Handler() {} + + virtual Status Handle(const WriteQueryTraceRecord& record) = 0; + virtual Status Handle(const GetQueryTraceRecord& record) = 0; + virtual Status Handle(const IteratorSeekQueryTraceRecord& record) = 0; + virtual Status Handle(const MultiGetQueryTraceRecord& record) = 0; + }; + + virtual Status Accept(Handler* handler) = 0; + + // Create a handler for the execution of TraceRecord. + static Handler* NewExecutionHandler( + DB* db, const std::vector& handles); + + private: + // Timestamp (in microseconds) of this trace. + uint64_t timestamp_; +}; + +// Base class for all query types of trace records.
+class QueryTraceRecord : public TraceRecord { + public: + explicit QueryTraceRecord(uint64_t timestamp); + + virtual ~QueryTraceRecord() override; +}; + +// Trace record for DB::Write() operation. +class WriteQueryTraceRecord : public QueryTraceRecord { + public: + WriteQueryTraceRecord(PinnableSlice&& write_batch_rep, uint64_t timestamp); + + WriteQueryTraceRecord(const std::string& write_batch_rep, uint64_t timestamp); + + virtual ~WriteQueryTraceRecord() override; + + TraceType GetTraceType() const override { return kTraceWrite; }; + + virtual Slice GetWriteBatchRep() const; + + virtual Status Accept(Handler* handler) override; + + private: + PinnableSlice rep_; +}; + +// Trace record for DB::Get() operation +class GetQueryTraceRecord : public QueryTraceRecord { + public: + GetQueryTraceRecord(uint32_t column_family_id, PinnableSlice&& key, + uint64_t timestamp); + + GetQueryTraceRecord(uint32_t column_family_id, const std::string& key, + uint64_t timestamp); + + virtual ~GetQueryTraceRecord() override; + + TraceType GetTraceType() const override { return kTraceGet; }; + + virtual uint32_t GetColumnFamilyID() const; + + virtual Slice GetKey() const; + + virtual Status Accept(Handler* handler) override; + + private: + // Column family ID. + uint32_t cf_id_; + // Key to get. + PinnableSlice key_; +}; + +// Base class for all Iterator related operations. +class IteratorQueryTraceRecord : public QueryTraceRecord { + public: + explicit IteratorQueryTraceRecord(uint64_t timestamp); + + virtual ~IteratorQueryTraceRecord() override; +}; + +// Trace record for Iterator::Seek() and Iterator::SeekForPrev() operation. +class IteratorSeekQueryTraceRecord : public IteratorQueryTraceRecord { + public: + // Currently we only support Seek() and SeekForPrev(). 
+ enum SeekType { + kSeek = kTraceIteratorSeek, + kSeekForPrev = kTraceIteratorSeekForPrev + }; + + IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id, + PinnableSlice&& key, uint64_t timestamp); + + IteratorSeekQueryTraceRecord(SeekType seekType, uint32_t column_family_id, + const std::string& key, uint64_t timestamp); + + virtual ~IteratorSeekQueryTraceRecord() override; + + TraceType GetTraceType() const override; + + virtual SeekType GetSeekType() const; + + virtual uint32_t GetColumnFamilyID() const; + + virtual Slice GetKey() const; + + virtual Status Accept(Handler* handler) override; + + private: + SeekType type_; + // Column family ID. + uint32_t cf_id_; + // Key to seek to. + PinnableSlice key_; +}; + +// Trace record for DB::MultiGet() operation. +class MultiGetQueryTraceRecord : public QueryTraceRecord { + public: + MultiGetQueryTraceRecord(std::vector column_family_ids, + std::vector&& keys, + uint64_t timestamp); + + MultiGetQueryTraceRecord(std::vector column_family_ids, + const std::vector& keys, + uint64_t timestamp); + + virtual ~MultiGetQueryTraceRecord() override; + + TraceType GetTraceType() const override { return kTraceMultiGet; }; + + virtual std::vector GetColumnFamilyIDs() const; + + virtual std::vector GetKeys() const; + + virtual Status Accept(Handler* handler) override; + + private: + // Column family IDs. + std::vector cf_ids_; + // Keys to get. + std::vector keys_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/utilities/replayer.h b/include/rocksdb/utilities/replayer.h new file mode 100644 index 000000000..976fadb68 --- /dev/null +++ b/include/rocksdb/utilities/replayer.h @@ -0,0 +1,74 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory).
+ +#pragma once +#ifndef ROCKSDB_LITE + +#include + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_record.h" + +namespace ROCKSDB_NAMESPACE { + +struct ReplayOptions { + // Number of threads used for replaying. If 0 or 1, replay using + // single thread. + uint32_t num_threads; + + // Enables fast forwarding a replay by increasing/reducing the delay between + // the ingested traces. + // If > 0.0 and < 1.0, slow down the replay by this amount. + // If 1.0, replay the operations at the same rate as in the trace stream. + // If > 1, speed up the replay by this amount. + double fast_forward; + + ReplayOptions() : num_threads(1), fast_forward(1.0) {} + ReplayOptions(uint32_t num_of_threads, double fast_forward_ratio) + : num_threads(num_of_threads), fast_forward(fast_forward_ratio) {} +}; + +// Replayer helps to replay the captured RocksDB query level operations. +// The Replayer can either be created from DB::NewDefaultReplayer method, or be +// instantiated via db_bench today, on using "replay" benchmark. +class Replayer { + public: + virtual ~Replayer() {} + + // Make some preparation before replaying the trace. This will also reset the + // replayer in order to restart replaying. + virtual Status Prepare() = 0; + + // Return the timestamp when the trace recording was started. + virtual uint64_t GetHeaderTimestamp() const = 0; + + // Atomically read one trace into a TraceRecord (excluding the header and + // footer traces). + // Return Status::OK() on success; + // Status::Incomplete() if Prepare() was not called or no more available + // trace; + // Status::NotSupported() if the read trace type is not supported. + virtual Status Next(std::unique_ptr* record) = 0; + + // Execute one TraceRecord. + // Return Status::OK() if the execution was successful.
Get/MultiGet traces + // will still return Status::OK() even if they got Status::NotFound() + // from DB::Get() or DB::MultiGet(); + // Status::Incomplete() if Prepare() was not called or no more available + // trace; + // Status::NotSupported() if the operation is not supported; + // Otherwise, return the corresponding error status. + virtual Status Execute(const std::unique_ptr& record) = 0; + virtual Status Execute(std::unique_ptr&& record) = 0; + + // Replay all the traces from the provided trace stream, taking the delay + // between the traces into consideration. + virtual Status Replay(const ReplayOptions& options) = 0; + virtual Status Replay() { return Replay(ReplayOptions()); } +}; + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index f798652b4..5d3eea6a7 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -390,6 +390,13 @@ class StackableDB : public DB { using DB::EndTrace; Status EndTrace() override { return db_->EndTrace(); } + using DB::NewDefaultReplayer; + Status NewDefaultReplayer(const std::vector& handles, + std::unique_ptr&& reader, + std::unique_ptr* replayer) override { + return db_->NewDefaultReplayer(handles, std::move(reader), replayer); + } + #endif // ROCKSDB_LITE virtual Status GetLiveFiles(std::vector& vec, uint64_t* mfs, diff --git a/src.mk b/src.mk index 053181715..b2c83f048 100644 --- a/src.mk +++ b/src.mk @@ -198,6 +198,8 @@ LIB_SOURCES = \ test_util/sync_point_impl.cc \ test_util/transaction_test_util.cc \ tools/dump/db_dump_tool.cc \ + trace_replay/trace_record_handler.cc \ + trace_replay/trace_record.cc \ trace_replay/trace_replay.cc \ trace_replay/block_cache_tracer.cc \ trace_replay/io_tracer.cc \ @@ -262,6 +264,7 @@ LIB_SOURCES = \ utilities/simulator_cache/sim_cache.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \ 
utilities/trace/file_trace_reader_writer.cc \ + utilities/trace/replayer_impl.cc \ utilities/transactions/lock/lock_manager.cc \ utilities/transactions/lock/point/point_lock_tracker.cc \ utilities/transactions/lock/point/point_lock_manager.cc \ diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index d5b1d5e83..f6eca75e3 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -36,6 +36,7 @@ #include "rocksdb/system_clock.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" +#include "rocksdb/trace_record.h" #include "table/block_based/binary_search_index_reader.h" #include "table/block_based/block.h" #include "table/block_based/block_based_filter_block.h" diff --git a/table/table_test.cc b/table/table_test.cc index ac0d6b930..f805418af 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#include + #include #include #include @@ -34,6 +35,7 @@ #include "rocksdb/perf_context.h" #include "rocksdb/slice_transform.h" #include "rocksdb/statistics.h" +#include "rocksdb/trace_record.h" #include "rocksdb/write_buffer_manager.h" #include "table/block_based/block.h" #include "table/block_based/block_based_table_builder.h" diff --git a/tools/block_cache_analyzer/block_cache_trace_analyzer.cc b/tools/block_cache_analyzer/block_cache_trace_analyzer.cc index 29ec8cb91..ff618f4b5 100644 --- a/tools/block_cache_analyzer/block_cache_trace_analyzer.cc +++ b/tools/block_cache_analyzer/block_cache_trace_analyzer.cc @@ -20,6 +20,7 @@ #include "monitoring/histogram.h" #include "rocksdb/system_clock.h" +#include "rocksdb/trace_record.h" #include "util/gflags_compat.h" #include "util/string_util.h" diff --git a/tools/block_cache_analyzer/block_cache_trace_analyzer.h b/tools/block_cache_analyzer/block_cache_trace_analyzer.h index 4436e0b77..8afdf5449 100644 --- a/tools/block_cache_analyzer/block_cache_trace_analyzer.h +++ b/tools/block_cache_analyzer/block_cache_trace_analyzer.h @@ -11,6 +11,7 @@ #include "db/dbformat.h" #include "rocksdb/env.h" +#include "rocksdb/trace_record.h" #include "rocksdb/utilities/sim_cache.h" #include "trace_replay/block_cache_tracer.h" #include "utilities/simulator_cache/cache_simulator.h" diff --git a/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc b/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc index 91bd30652..ee65c6076 100644 --- a/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc +++ b/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc @@ -21,6 +21,7 @@ int main() { #include "rocksdb/env.h" #include "rocksdb/status.h" #include "rocksdb/trace_reader_writer.h" +#include "rocksdb/trace_record.h" #include "test_util/testharness.h" #include "test_util/testutil.h" #include "tools/block_cache_analyzer/block_cache_trace_analyzer.h" diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc 
index c7c254735..89d750536 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -63,6 +63,9 @@ #include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/options_type.h" #include "rocksdb/utilities/options_util.h" +#ifndef ROCKSDB_LITE +#include "rocksdb/utilities/replayer.h" +#endif // ROCKSDB_LITE #include "rocksdb/utilities/sim_cache.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" @@ -228,7 +231,9 @@ IF_ROCKSDB_LITE("", "\tmemstats -- Print memtable stats\n" "\tsstables -- Print sstable info\n" "\theapprofile -- Dump a heap profile (if supported by this port)\n" +#ifndef ROCKSDB_LITE "\treplay -- replay the trace file specified with trace_file\n" +#endif // ROCKSDB_LITE "\tgetmergeoperands -- Insert lots of merge records which are a list of " "sorted ints for a key and then compare performance of lookup for another " "key " @@ -997,10 +1002,12 @@ DEFINE_bool(report_bg_io_stats, false, DEFINE_bool(use_stderr_info_logger, false, "Write info logs to stderr instead of to LOG file. "); +#ifndef ROCKSDB_LITE + DEFINE_string(trace_file, "", "Trace workload to a file. "); -DEFINE_int32(trace_replay_fast_forward, 1, - "Fast forward trace replay, must >= 1. "); +DEFINE_double(trace_replay_fast_forward, 1.0, + "Fast forward trace replay, must > 0.0."); DEFINE_int32(block_cache_trace_sampling_frequency, 1, "Block cache trace sampling frequency, termed s. 
It uses spatial " "downsampling and samples accesses to one out of s blocks."); @@ -1014,6 +1021,8 @@ DEFINE_string(block_cache_trace_file, "", "Block cache trace file path."); DEFINE_int32(trace_replay_threads, 1, "The number of threads to replay, must >=1."); +#endif // ROCKSDB_LITE + static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( const char* ctype) { assert(ctype); @@ -3468,6 +3477,7 @@ class Benchmark { PrintStats("rocksdb.sstables"); } else if (name == "stats_history") { PrintStatsHistory(); +#ifndef ROCKSDB_LITE } else if (name == "replay") { if (num_threads > 1) { fprintf(stderr, "Multi-threaded replay is not yet supported\n"); @@ -3478,6 +3488,7 @@ class Benchmark { ErrorExit(); } method = &Benchmark::Replay; +#endif // ROCKSDB_LITE } else if (name == "getmergeoperands") { method = &Benchmark::GetMergeOperands; } else if (!name.empty()) { // No error message for empty name @@ -7978,6 +7989,8 @@ class Benchmark { } } +#ifndef ROCKSDB_LITE + void Replay(ThreadState* thread) { if (db_.db != nullptr) { Replay(thread, &db_); @@ -7997,20 +8010,34 @@ class Benchmark { s.ToString().c_str()); exit(1); } - Replayer replayer(db_with_cfh->db, db_with_cfh->cfh, - std::move(trace_reader)); - replayer.SetFastForward( - static_cast(FLAGS_trace_replay_fast_forward)); - s = replayer.MultiThreadReplay( - static_cast(FLAGS_trace_replay_threads)); + std::unique_ptr replayer; + s = db_with_cfh->db->NewDefaultReplayer(db_with_cfh->cfh, + std::move(trace_reader), &replayer); + if (!s.ok()) { + fprintf(stderr, + "Encountered an error creating a default Replayer. " + "Error: %s\n", + s.ToString().c_str()); + exit(1); + } + s = replayer->Prepare(); + if (!s.ok()) { + fprintf(stderr, "Prepare for replay failed. 
Error: %s\n", + s.ToString().c_str()); + } + s = replayer->Replay( + ReplayOptions(static_cast(FLAGS_trace_replay_threads), + FLAGS_trace_replay_fast_forward)); + replayer.reset(); if (s.ok()) { - fprintf(stdout, "Replay started from trace_file: %s\n", + fprintf(stdout, "Replay completed from trace_file: %s\n", FLAGS_trace_file.c_str()); } else { - fprintf(stderr, "Starting replay failed. Error: %s\n", - s.ToString().c_str()); + fprintf(stderr, "Replay failed. Error: %s\n", s.ToString().c_str()); } } + +#endif // ROCKSDB_LITE }; int db_bench_tool(int argc, char** argv) { diff --git a/tools/trace_analyzer_tool.cc b/tools/trace_analyzer_tool.cc index 6dd0a423a..973b3d6bd 100644 --- a/tools/trace_analyzer_tool.cc +++ b/tools/trace_analyzer_tool.cc @@ -195,12 +195,6 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, uint64_t op2) { return (op1 * op2); } -void DecodeCFAndKeyFromString(std::string& buffer, uint32_t* cf_id, Slice* key) { - Slice buf(buffer); - GetFixed32(&buf, cf_id); - GetLengthPrefixedSlice(&buf, key); -} - } // namespace // The default constructor of AnalyzerOptions @@ -477,74 +471,89 @@ Status TraceAnalyzer::StartProcessing() { total_requests_++; end_time_ = trace.ts; - if (trace.type == kTraceWrite) { - total_writes_++; - c_time_ = trace.ts; - Slice batch_data; - if (trace_file_version_ < 2) { - Slice tmp_data(trace.payload); - batch_data = tmp_data; - } else { - WritePayload w_payload; - TracerHelper::DecodeWritePayload(&trace, &w_payload); - batch_data = w_payload.write_batch_data; - } - // Note that, if the write happens in a transaction, - // 'Write' will be called twice, one for Prepare, one for - // Commit. Thus, in the trace, for the same WriteBatch, there - // will be two reords if it is in a transaction. Here, we only - // process the reord that is committed. If write is non-transaction, - // HasBeginPrepare()==false, so we process it normally. 
- WriteBatch batch(batch_data.ToString()); - if (batch.HasBeginPrepare() && !batch.HasCommit()) { - continue; - } - TraceWriteHandler write_handler(this); - s = batch.Iterate(&write_handler); - if (!s.ok()) { - fprintf(stderr, "Cannot process the write batch in the trace\n"); - return s; + if (trace.type == kTraceEnd) { + break; + } + + std::unique_ptr record; + switch (trace.type) { + case kTraceWrite: { + s = TracerHelper::DecodeWriteRecord(&trace, trace_file_version_, + &record); + if (!s.ok()) { + return s; + } + total_writes_++; + c_time_ = trace.ts; + std::unique_ptr r( + reinterpret_cast(record.release())); + // Note that, if the write happens in a transaction, + // 'Write' will be called twice, one for Prepare, one for + // Commit. Thus, in the trace, for the same WriteBatch, there + // will be two reords if it is in a transaction. Here, we only + // process the reord that is committed. If write is non-transaction, + // HasBeginPrepare()==false, so we process it normally. + WriteBatch batch(r->GetWriteBatchRep().ToString()); + if (batch.HasBeginPrepare() && !batch.HasCommit()) { + continue; + } + TraceWriteHandler write_handler(this); + s = batch.Iterate(&write_handler); + if (!s.ok()) { + fprintf(stderr, "Cannot process the write batch in the trace\n"); + return s; + } + break; } - } else if (trace.type == kTraceGet) { - GetPayload get_payload; - get_payload.get_key = 0; - if (trace_file_version_ < 2) { - DecodeCFAndKeyFromString(trace.payload, &get_payload.cf_id, - &get_payload.get_key); - } else { - TracerHelper::DecodeGetPayload(&trace, &get_payload); + case kTraceGet: { + s = TracerHelper::DecodeGetRecord(&trace, trace_file_version_, &record); + if (!s.ok()) { + return s; + } + total_gets_++; + std::unique_ptr r( + reinterpret_cast(record.release())); + s = HandleGet(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(), + 1); + if (!s.ok()) { + fprintf(stderr, "Cannot process the get in the trace\n"); + return s; + } + break; } - total_gets_++; - - s 
= HandleGet(get_payload.cf_id, get_payload.get_key.ToString(), trace.ts, - 1); - if (!s.ok()) { - fprintf(stderr, "Cannot process the get in the trace\n"); - return s; + case kTraceIteratorSeek: + case kTraceIteratorSeekForPrev: { + s = TracerHelper::DecodeIterRecord(&trace, trace_file_version_, + &record); + if (!s.ok()) { + return s; + } + std::unique_ptr r( + reinterpret_cast(record.release())); + s = HandleIter(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(), + r->GetTraceType()); + if (!s.ok()) { + fprintf(stderr, "Cannot process the iterator in the trace\n"); + return s; + } + break; } - } else if (trace.type == kTraceIteratorSeek || - trace.type == kTraceIteratorSeekForPrev) { - IterPayload iter_payload; - iter_payload.cf_id = 0; - if (trace_file_version_ < 2) { - DecodeCFAndKeyFromString(trace.payload, &iter_payload.cf_id, - &iter_payload.iter_key); - } else { - TracerHelper::DecodeIterPayload(&trace, &iter_payload); + case kTraceMultiGet: { + s = TracerHelper::DecodeMultiGetRecord(&trace, trace_file_version_, + &record); + if (!s.ok()) { + return s; + } + std::unique_ptr r( + reinterpret_cast(record.release())); + s = HandleMultiGet(r->GetColumnFamilyIDs(), r->GetKeys(), + r->GetTimestamp()); + break; } - s = HandleIter(iter_payload.cf_id, iter_payload.iter_key.ToString(), - trace.ts, trace.type); - if (!s.ok()) { - fprintf(stderr, "Cannot process the iterator in the trace\n"); - return s; + default: { + // Skip unsupported types + break; } - } else if (trace.type == kTraceMultiGet) { - MultiGetPayload multiget_payload; - assert(trace_file_version_ >= 2); - TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload); - s = HandleMultiGet(multiget_payload, trace.ts); - } else if (trace.type == kTraceEnd) { - break; } } if (s.IsIncomplete()) { @@ -825,7 +834,7 @@ Status TraceAnalyzer::MakeStatisticCorrelation(TraceStats& stats, // Process the statistics of QPS Status TraceAnalyzer::MakeStatisticQPS() { - if(begin_time_ == 0) { + if (begin_time_ == 
0) { begin_time_ = trace_create_time_; } uint32_t duration = @@ -1547,9 +1556,8 @@ Status TraceAnalyzer::CloseOutputFiles() { } // Handle the Get request in the trace -Status TraceAnalyzer::HandleGet(uint32_t column_family_id, - const std::string& key, const uint64_t& ts, - const uint32_t& get_ret) { +Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key, + const uint64_t& ts, const uint32_t& get_ret) { Status s; size_t value_size = 0; if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { @@ -1575,8 +1583,8 @@ Status TraceAnalyzer::HandleGet(uint32_t column_family_id, if (get_ret == 1) { value_size = 10; } - s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, key, - value_size, ts); + s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, + key.ToString(), value_size, ts); if (!s.ok()) { return Status::Corruption("Failed to insert key statistics"); } @@ -1752,9 +1760,8 @@ Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key, } // Handle the Iterator request in the trace -Status TraceAnalyzer::HandleIter(uint32_t column_family_id, - const std::string& key, const uint64_t& ts, - TraceType& trace_type) { +Status TraceAnalyzer::HandleIter(uint32_t column_family_id, const Slice& key, + const uint64_t& ts, TraceType trace_type) { Status s; size_t value_size = 0; int type = -1; @@ -1788,7 +1795,7 @@ Status TraceAnalyzer::HandleIter(uint32_t column_family_id, if (!ta_[type].enabled) { return Status::OK(); } - s = KeyStatsInsertion(type, column_family_id, key, value_size, ts); + s = KeyStatsInsertion(type, column_family_id, key.ToString(), value_size, ts); if (!s.ok()) { return Status::Corruption("Failed to insert key statistics"); } @@ -1796,24 +1803,22 @@ Status TraceAnalyzer::HandleIter(uint32_t column_family_id, } // Handle MultiGet queries in the trace -Status TraceAnalyzer::HandleMultiGet(MultiGetPayload& multiget_payload, - const uint64_t& ts) { +Status TraceAnalyzer::HandleMultiGet( 
+ const std::vector& column_family_ids, + const std::vector& keys, const uint64_t& ts) { Status s; size_t value_size = 0; - if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) { + if (column_family_ids.size() != keys.size()) { // The size does not match is not the error of tracing and anayzing, we just // report it to the user. The analyzing continues. printf("The CF ID vector size does not match the keys vector size!\n"); } - size_t vector_size = std::min(multiget_payload.cf_ids.size(), - multiget_payload.multiget_keys.size()); + size_t vector_size = std::min(column_family_ids.size(), keys.size()); if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { for (size_t i = 0; i < vector_size; i++) { - assert(i < multiget_payload.cf_ids.size() && - i < multiget_payload.multiget_keys.size()); + assert(i < column_family_ids.size() && i < keys.size()); s = WriteTraceSequence(TraceOperationType::kMultiGet, - multiget_payload.cf_ids[i], - multiget_payload.multiget_keys[i], value_size, ts); + column_family_ids[i], keys[i], value_size, ts); } if (!s.ok()) { return Status::Corruption("Failed to write the trace sequence to file"); @@ -1833,11 +1838,9 @@ Status TraceAnalyzer::HandleMultiGet(MultiGetPayload& multiget_payload, return Status::OK(); } for (size_t i = 0; i < vector_size; i++) { - assert(i < multiget_payload.cf_ids.size() && - i < multiget_payload.multiget_keys.size()); - s = KeyStatsInsertion(TraceOperationType::kMultiGet, - multiget_payload.cf_ids[i], - multiget_payload.multiget_keys[i], value_size, ts); + assert(i < column_family_ids.size() && i < keys.size()); + s = KeyStatsInsertion(TraceOperationType::kMultiGet, column_family_ids[i], + keys[i].ToString(), value_size, ts); } if (!s.ok()) { return Status::Corruption("Failed to insert key statistics"); @@ -2011,10 +2014,11 @@ void TraceAnalyzer::PrintStatistics() { // Write the trace sequence to file Status TraceAnalyzer::WriteTraceSequence(const uint32_t& type, const uint32_t& 
cf_id, - const std::string& key, + const Slice& key, const size_t value_size, const uint64_t ts) { - std::string hex_key = ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key); + std::string hex_key = + ROCKSDB_NAMESPACE::LDBCommand::StringToHex(key.ToString()); int ret; ret = snprintf(buffer_, sizeof(buffer_), "%u %u %zu %" PRIu64 "\n", type, cf_id, value_size, ts); diff --git a/tools/trace_analyzer_tool.h b/tools/trace_analyzer_tool.h index bc1b89482..7eafd2a3c 100644 --- a/tools/trace_analyzer_tool.h +++ b/tools/trace_analyzer_tool.h @@ -15,6 +15,7 @@ #include "rocksdb/env.h" #include "rocksdb/trace_reader_writer.h" +#include "rocksdb/trace_record.h" #include "rocksdb/write_batch.h" #include "trace_replay/trace_replay.h" @@ -182,7 +183,7 @@ class TraceAnalyzer { Status WriteTraceUnit(TraceUnit& unit); // The trace processing functions for different type - Status HandleGet(uint32_t column_family_id, const std::string& key, + Status HandleGet(uint32_t column_family_id, const Slice& key, const uint64_t& ts, const uint32_t& get_ret); Status HandlePut(uint32_t column_family_id, const Slice& key, const Slice& value); @@ -192,9 +193,10 @@ class TraceAnalyzer { const Slice& end_key); Status HandleMerge(uint32_t column_family_id, const Slice& key, const Slice& value); - Status HandleIter(uint32_t column_family_id, const std::string& key, - const uint64_t& ts, TraceType& trace_type); - Status HandleMultiGet(MultiGetPayload& multiget_payload, const uint64_t& ts); + Status HandleIter(uint32_t column_family_id, const Slice& key, + const uint64_t& ts, TraceType trace_type); + Status HandleMultiGet(const std::vector& column_family_ids, + const std::vector& keys, const uint64_t& ts); std::vector& GetTaVector() { return ta_; } private: @@ -246,7 +248,7 @@ class TraceAnalyzer { Status TraceUnitWriter( std::unique_ptr& f_ptr, TraceUnit& unit); Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id, - const std::string& key, const size_t value_size, + const Slice& key, 
const size_t value_size, const uint64_t ts); Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats); Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit); diff --git a/trace_replay/block_cache_tracer.cc b/trace_replay/block_cache_tracer.cc index b9c7477fd..4bd18cda7 100644 --- a/trace_replay/block_cache_tracer.cc +++ b/trace_replay/block_cache_tracer.cc @@ -12,6 +12,7 @@ #include "db/db_impl/db_impl.h" #include "db/dbformat.h" #include "rocksdb/slice.h" +#include "rocksdb/trace_record.h" #include "util/coding.h" #include "util/hash.h" #include "util/string_util.h" diff --git a/trace_replay/block_cache_tracer_test.cc b/trace_replay/block_cache_tracer_test.cc index 01b834ed0..308de8e36 100644 --- a/trace_replay/block_cache_tracer_test.cc +++ b/trace_replay/block_cache_tracer_test.cc @@ -4,9 +4,11 @@ // (found in the LICENSE.Apache file in the root directory). #include "trace_replay/block_cache_tracer.h" + #include "rocksdb/env.h" #include "rocksdb/status.h" #include "rocksdb/trace_reader_writer.h" +#include "rocksdb/trace_record.h" #include "test_util/testharness.h" #include "test_util/testutil.h" diff --git a/trace_replay/io_tracer.h b/trace_replay/io_tracer.h index 8db6e2a27..3fc7cdba0 100644 --- a/trace_replay/io_tracer.h +++ b/trace_replay/io_tracer.h @@ -12,6 +12,7 @@ #include "port/lang.h" #include "rocksdb/file_system.h" #include "rocksdb/options.h" +#include "rocksdb/trace_record.h" #include "trace_replay/trace_replay.h" namespace ROCKSDB_NAMESPACE { diff --git a/trace_replay/io_tracer_test.cc b/trace_replay/io_tracer_test.cc index cc27bb970..3471c9c43 100644 --- a/trace_replay/io_tracer_test.cc +++ b/trace_replay/io_tracer_test.cc @@ -8,6 +8,7 @@ #include "rocksdb/env.h" #include "rocksdb/status.h" #include "rocksdb/trace_reader_writer.h" +#include "rocksdb/trace_record.h" #include "test_util/testharness.h" #include "test_util/testutil.h" diff --git a/trace_replay/trace_record.cc b/trace_replay/trace_record.cc new file mode 100644 index 
000000000..75afcf37e --- /dev/null +++ b/trace_replay/trace_record.cc @@ -0,0 +1,163 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/trace_record.h" + +#include + +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "rocksdb/status.h" +#include "trace_replay/trace_record_handler.h" + +namespace ROCKSDB_NAMESPACE { + +// TraceRecord +TraceRecord::TraceRecord(uint64_t timestamp) : timestamp_(timestamp) {} + +TraceRecord::~TraceRecord() {} + +uint64_t TraceRecord::GetTimestamp() const { return timestamp_; } + +TraceRecord::Handler* TraceRecord::NewExecutionHandler( + DB* db, const std::vector& handles) { + return new TraceExecutionHandler(db, handles); +} + +// QueryTraceRecord +QueryTraceRecord::QueryTraceRecord(uint64_t timestamp) + : TraceRecord(timestamp) {} + +QueryTraceRecord::~QueryTraceRecord() {} + +// WriteQueryTraceRecord +WriteQueryTraceRecord::WriteQueryTraceRecord(PinnableSlice&& write_batch_rep, + uint64_t timestamp) + : QueryTraceRecord(timestamp), rep_(std::move(write_batch_rep)) {} + +WriteQueryTraceRecord::WriteQueryTraceRecord(const std::string& write_batch_rep, + uint64_t timestamp) + : QueryTraceRecord(timestamp) { + rep_.PinSelf(write_batch_rep); +} + +WriteQueryTraceRecord::~WriteQueryTraceRecord() {} + +Slice WriteQueryTraceRecord::GetWriteBatchRep() const { return Slice(rep_); } + +Status WriteQueryTraceRecord::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +// GetQueryTraceRecord +GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id, + PinnableSlice&& key, + uint64_t timestamp) + : QueryTraceRecord(timestamp), + cf_id_(column_family_id), + key_(std::move(key)) {} + 
+GetQueryTraceRecord::GetQueryTraceRecord(uint32_t column_family_id, + const std::string& key, + uint64_t timestamp) + : QueryTraceRecord(timestamp), cf_id_(column_family_id) { + key_.PinSelf(key); +} + +GetQueryTraceRecord::~GetQueryTraceRecord() {} + +uint32_t GetQueryTraceRecord::GetColumnFamilyID() const { return cf_id_; } + +Slice GetQueryTraceRecord::GetKey() const { return Slice(key_); } + +Status GetQueryTraceRecord::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +// IteratorQueryTraceRecord +IteratorQueryTraceRecord::IteratorQueryTraceRecord(uint64_t timestamp) + : QueryTraceRecord(timestamp) {} + +IteratorQueryTraceRecord::~IteratorQueryTraceRecord() {} + +// IteratorSeekQueryTraceRecord +IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( + SeekType seek_type, uint32_t column_family_id, PinnableSlice&& key, + uint64_t timestamp) + : IteratorQueryTraceRecord(timestamp), + type_(seek_type), + cf_id_(column_family_id), + key_(std::move(key)) {} + +IteratorSeekQueryTraceRecord::IteratorSeekQueryTraceRecord( + SeekType seek_type, uint32_t column_family_id, const std::string& key, + uint64_t timestamp) + : IteratorQueryTraceRecord(timestamp), + type_(seek_type), + cf_id_(column_family_id) { + key_.PinSelf(key); +} + +IteratorSeekQueryTraceRecord::~IteratorSeekQueryTraceRecord() {} + +TraceType IteratorSeekQueryTraceRecord::GetTraceType() const { + return static_cast(type_); +} + +IteratorSeekQueryTraceRecord::SeekType +IteratorSeekQueryTraceRecord::GetSeekType() const { + return type_; +} + +uint32_t IteratorSeekQueryTraceRecord::GetColumnFamilyID() const { + return cf_id_; +} + +Slice IteratorSeekQueryTraceRecord::GetKey() const { return Slice(key_); } + +Status IteratorSeekQueryTraceRecord::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +// MultiGetQueryTraceRecord +MultiGetQueryTraceRecord::MultiGetQueryTraceRecord( + std::vector column_family_ids, 
std::vector&& keys, + uint64_t timestamp) + : QueryTraceRecord(timestamp), + cf_ids_(column_family_ids), + keys_(std::move(keys)) {} + +MultiGetQueryTraceRecord::MultiGetQueryTraceRecord( + std::vector column_family_ids, + const std::vector& keys, uint64_t timestamp) + : QueryTraceRecord(timestamp), cf_ids_(column_family_ids) { + keys_.reserve(keys.size()); + for (const std::string& key : keys) { + PinnableSlice ps; + ps.PinSelf(key); + keys_.push_back(std::move(ps)); + } +} + +MultiGetQueryTraceRecord::~MultiGetQueryTraceRecord() {} + +std::vector MultiGetQueryTraceRecord::GetColumnFamilyIDs() const { + return cf_ids_; +} + +std::vector MultiGetQueryTraceRecord::GetKeys() const { + return std::vector(keys_.begin(), keys_.end()); +} + +Status MultiGetQueryTraceRecord::Accept(Handler* handler) { + assert(handler != nullptr); + return handler->Handle(*this); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/trace_replay/trace_record_handler.cc b/trace_replay/trace_record_handler.cc new file mode 100644 index 000000000..4e8a40b94 --- /dev/null +++ b/trace_replay/trace_record_handler.cc @@ -0,0 +1,108 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "trace_replay/trace_record_handler.h" + +#include "rocksdb/iterator.h" +#include "rocksdb/write_batch.h" + +namespace ROCKSDB_NAMESPACE { + +// TraceExecutionHandler +TraceExecutionHandler::TraceExecutionHandler( + DB* db, const std::vector& handles) + : TraceRecord::Handler(), + db_(db), + write_opts_(WriteOptions()), + read_opts_(ReadOptions()) { + assert(db != nullptr); + assert(!handles.empty()); + cf_map_.reserve(handles.size()); + for (ColumnFamilyHandle* handle : handles) { + assert(handle != nullptr); + cf_map_.insert({handle->GetID(), handle}); + } +} + +TraceExecutionHandler::~TraceExecutionHandler() { cf_map_.clear(); } + +Status TraceExecutionHandler::Handle(const WriteQueryTraceRecord& record) { + WriteBatch batch(record.GetWriteBatchRep().ToString()); + return db_->Write(write_opts_, &batch); +} + +Status TraceExecutionHandler::Handle(const GetQueryTraceRecord& record) { + auto it = cf_map_.find(record.GetColumnFamilyID()); + if (it == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + assert(it->second != nullptr); + + std::string value; + Status s = db_->Get(read_opts_, it->second, record.GetKey(), &value); + + // Treat not found as ok and return other errors. + return s.IsNotFound() ? 
Status::OK() : s; +} + +Status TraceExecutionHandler::Handle( + const IteratorSeekQueryTraceRecord& record) { + auto it = cf_map_.find(record.GetColumnFamilyID()); + if (it == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + assert(it->second != nullptr); + + Iterator* single_iter = db_->NewIterator(read_opts_, it->second); + + switch (record.GetSeekType()) { + case IteratorSeekQueryTraceRecord::kSeekForPrev: { + single_iter->SeekForPrev(record.GetKey()); + break; + } + default: { + single_iter->Seek(record.GetKey()); + break; + } + } + Status s = single_iter->status(); + delete single_iter; + return s; +} + +Status TraceExecutionHandler::Handle(const MultiGetQueryTraceRecord& record) { + std::vector handles; + handles.reserve(record.GetColumnFamilyIDs().size()); + for (uint32_t cf_id : record.GetColumnFamilyIDs()) { + auto it = cf_map_.find(cf_id); + if (it == cf_map_.end()) { + return Status::Corruption("Invalid Column Family ID."); + } + assert(it->second != nullptr); + handles.push_back(it->second); + } + + std::vector keys = record.GetKeys(); + + if (handles.empty() || keys.empty()) { + return Status::InvalidArgument("Empty MultiGet cf_ids or keys."); + } + if (handles.size() != keys.size()) { + return Status::InvalidArgument("MultiGet cf_ids and keys size mismatch."); + } + + std::vector values; + std::vector ss = db_->MultiGet(read_opts_, handles, keys, &values); + + // Treat not found as ok, return other errors. + for (Status s : ss) { + if (!s.ok() && !s.IsNotFound()) { + return s; + } + } + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/trace_replay/trace_record_handler.h b/trace_replay/trace_record_handler.h new file mode 100644 index 000000000..fbc5a839f --- /dev/null +++ b/trace_replay/trace_record_handler.h @@ -0,0 +1,39 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_record.h" + +namespace ROCKSDB_NAMESPACE { + +// Handler to execute TraceRecord. +class TraceExecutionHandler : public TraceRecord::Handler { + public: + TraceExecutionHandler(DB* db, + const std::vector& handles); + virtual ~TraceExecutionHandler() override; + + virtual Status Handle(const WriteQueryTraceRecord& record) override; + virtual Status Handle(const GetQueryTraceRecord& record) override; + virtual Status Handle(const IteratorSeekQueryTraceRecord& record) override; + virtual Status Handle(const MultiGetQueryTraceRecord& record) override; + + private: + DB* db_; + std::unordered_map cf_map_; + WriteOptions write_opts_; + ReadOptions read_opts_; +}; + +// To do: Handler for trace_analyzer. + +} // namespace ROCKSDB_NAMESPACE diff --git a/trace_replay/trace_replay.cc b/trace_replay/trace_replay.cc index 6171d91ec..b3063c143 100644 --- a/trace_replay/trace_replay.cc +++ b/trace_replay/trace_replay.cc @@ -11,6 +11,7 @@ #include "db/db_impl/db_impl.h" #include "rocksdb/env.h" +#include "rocksdb/iterator.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" #include "rocksdb/system_clock.h" @@ -18,7 +19,6 @@ #include "rocksdb/write_batch.h" #include "util/coding.h" #include "util/string_util.h" -#include "util/threadpool_imp.h" namespace ROCKSDB_NAMESPACE { @@ -104,6 +104,20 @@ Status TracerHelper::DecodeTrace(const std::string& encoded_trace, return Status::OK(); } +Status TracerHelper::DecodeHeader(const std::string& encoded_trace, + Trace* header) { + Status s = TracerHelper::DecodeTrace(encoded_trace, header); + + if (header->type != kTraceBegin) { + return Status::Corruption("Corrupted trace file. 
Incorrect header."); + } + if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) { + return Status::Corruption("Corrupted trace file. Incorrect magic."); + } + + return s; +} + bool TracerHelper::SetPayloadMap(uint64_t& payload_map, const TracePayloadType payload_type) { uint64_t old_state = payload_map; @@ -112,82 +126,153 @@ bool TracerHelper::SetPayloadMap(uint64_t& payload_map, return old_state != payload_map; } -void TracerHelper::DecodeWritePayload(Trace* trace, - WritePayload* write_payload) { - assert(write_payload != nullptr); - Slice buf(trace->payload); - GetFixed64(&buf, &trace->payload_map); - int64_t payload_map = static_cast(trace->payload_map); - while (payload_map) { - // Find the rightmost set bit. - uint32_t set_pos = static_cast(log2(payload_map & -payload_map)); - switch (set_pos) { - case TracePayloadType::kWriteBatchData: - GetLengthPrefixedSlice(&buf, &(write_payload->write_batch_data)); - break; - default: - assert(false); +Status TracerHelper::DecodeWriteRecord(Trace* trace, int trace_file_version, + std::unique_ptr* record) { + assert(trace != nullptr); + assert(trace->type == kTraceWrite); + + PinnableSlice rep; + if (trace_file_version < 2) { + rep.PinSelf(trace->payload); + } else { + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + Slice write_batch_data; + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = + static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kWriteBatchData: + GetLengthPrefixedSlice(&buf, &write_batch_data); + break; + default: + assert(false); + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); } - // unset the rightmost bit. 
- payload_map &= (payload_map - 1); + rep.PinSelf(write_batch_data); + } + + if (record != nullptr) { + record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts)); } + + return Status::OK(); } -void TracerHelper::DecodeGetPayload(Trace* trace, GetPayload* get_payload) { - assert(get_payload != nullptr); - Slice buf(trace->payload); - GetFixed64(&buf, &trace->payload_map); - int64_t payload_map = static_cast(trace->payload_map); - while (payload_map) { - // Find the rightmost set bit. - uint32_t set_pos = static_cast(log2(payload_map & -payload_map)); - switch (set_pos) { - case TracePayloadType::kGetCFID: - GetFixed32(&buf, &(get_payload->cf_id)); - break; - case TracePayloadType::kGetKey: - GetLengthPrefixedSlice(&buf, &(get_payload->get_key)); - break; - default: - assert(false); +Status TracerHelper::DecodeGetRecord(Trace* trace, int trace_file_version, + std::unique_ptr* record) { + assert(trace != nullptr); + assert(trace->type == kTraceGet); + + uint32_t cf_id = 0; + Slice get_key; + + if (trace_file_version < 2) { + DecodeCFAndKey(trace->payload, &cf_id, &get_key); + } else { + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = + static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kGetCFID: + GetFixed32(&buf, &cf_id); + break; + case TracePayloadType::kGetKey: + GetLengthPrefixedSlice(&buf, &get_key); + break; + default: + assert(false); + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); } - // unset the rightmost bit. 
- payload_map &= (payload_map - 1); } + + if (record != nullptr) { + PinnableSlice ps; + ps.PinSelf(get_key); + record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts)); + } + + return Status::OK(); } -void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) { - assert(iter_payload != nullptr); - Slice buf(trace->payload); - GetFixed64(&buf, &trace->payload_map); - int64_t payload_map = static_cast(trace->payload_map); - while (payload_map) { - // Find the rightmost set bit. - uint32_t set_pos = static_cast(log2(payload_map & -payload_map)); - switch (set_pos) { - case TracePayloadType::kIterCFID: - GetFixed32(&buf, &(iter_payload->cf_id)); - break; - case TracePayloadType::kIterKey: - GetLengthPrefixedSlice(&buf, &(iter_payload->iter_key)); - break; - case TracePayloadType::kIterLowerBound: - GetLengthPrefixedSlice(&buf, &(iter_payload->lower_bound)); - break; - case TracePayloadType::kIterUpperBound: - GetLengthPrefixedSlice(&buf, &(iter_payload->upper_bound)); - break; - default: - assert(false); +Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version, + std::unique_ptr* record) { + assert(trace != nullptr); + assert(trace->type == kTraceIteratorSeek || + trace->type == kTraceIteratorSeekForPrev); + + uint32_t cf_id = 0; + Slice iter_key; + + if (trace_file_version < 2) { + DecodeCFAndKey(trace->payload, &cf_id, &iter_key); + } else { + // Are these two used anywhere? + Slice lower_bound; + Slice upper_bound; + + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. 
+ uint32_t set_pos = + static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kIterCFID: + GetFixed32(&buf, &cf_id); + break; + case TracePayloadType::kIterKey: + GetLengthPrefixedSlice(&buf, &iter_key); + break; + case TracePayloadType::kIterLowerBound: + GetLengthPrefixedSlice(&buf, &lower_bound); + break; + case TracePayloadType::kIterUpperBound: + GetLengthPrefixedSlice(&buf, &upper_bound); + break; + default: + assert(false); + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); } - // unset the rightmost bit. - payload_map &= (payload_map - 1); } + + if (record != nullptr) { + PinnableSlice ps_key; + ps_key.PinSelf(iter_key); + record->reset(new IteratorSeekQueryTraceRecord( + static_cast(trace->type), cf_id, + std::move(ps_key), trace->ts)); + } + + return Status::OK(); } -void TracerHelper::DecodeMultiGetPayload(Trace* trace, - MultiGetPayload* multiget_payload) { - assert(multiget_payload != nullptr); +Status TracerHelper::DecodeMultiGetRecord( + Trace* trace, int trace_file_version, + std::unique_ptr* record) { + assert(trace != nullptr); + assert(trace->type == kTraceMultiGet); + if (trace_file_version < 2) { + return Status::Corruption("MultiGet is not supported."); + } + + uint32_t multiget_size = 0; + std::vector cf_ids; + std::vector multiget_keys; + Slice cfids_payload; Slice keys_payload; Slice buf(trace->payload); @@ -198,7 +283,7 @@ void TracerHelper::DecodeMultiGetPayload(Trace* trace, uint32_t set_pos = static_cast(log2(payload_map & -payload_map)); switch (set_pos) { case TracePayloadType::kMultiGetSize: - GetFixed32(&buf, &(multiget_payload->multiget_size)); + GetFixed32(&buf, &multiget_size); break; case TracePayloadType::kMultiGetCFIDs: GetLengthPrefixedSlice(&buf, &cfids_payload); @@ -212,18 +297,31 @@ void TracerHelper::DecodeMultiGetPayload(Trace* trace, // unset the rightmost bit. 
payload_map &= (payload_map - 1); } + if (multiget_size == 0) { + return Status::InvalidArgument("Empty MultiGet cf_ids or keys."); + } // Decode the cfids_payload and keys_payload - multiget_payload->cf_ids.reserve(multiget_payload->multiget_size); - multiget_payload->multiget_keys.reserve(multiget_payload->multiget_size); - for (uint32_t i = 0; i < multiget_payload->multiget_size; i++) { + cf_ids.reserve(multiget_size); + multiget_keys.reserve(multiget_size); + for (uint32_t i = 0; i < multiget_size; i++) { uint32_t tmp_cfid; Slice tmp_key; GetFixed32(&cfids_payload, &tmp_cfid); GetLengthPrefixedSlice(&keys_payload, &tmp_key); - multiget_payload->cf_ids.push_back(tmp_cfid); - multiget_payload->multiget_keys.push_back(tmp_key.ToString()); + cf_ids.push_back(tmp_cfid); + Slice s(tmp_key); + PinnableSlice ps; + ps.PinSelf(s); + multiget_keys.push_back(std::move(ps)); + } + + if (record != nullptr) { + record->reset(new MultiGetQueryTraceRecord( + std::move(cf_ids), std::move(multiget_keys), trace->ts)); } + + return Status::OK(); } Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options, @@ -418,10 +516,9 @@ bool Tracer::ShouldSkipTrace(const TraceType& trace_type) { if (IsTraceFileOverMax()) { return true; } - if ((trace_options_.filter & kTraceFilterGet - && trace_type == kTraceGet) - || (trace_options_.filter & kTraceFilterWrite - && trace_type == kTraceWrite)) { + if ((trace_options_.filter & kTraceFilterGet && trace_type == kTraceGet) || + (trace_options_.filter & kTraceFilterWrite && + trace_type == kTraceWrite)) { return true; } ++trace_request_count_; @@ -471,445 +568,4 @@ Status Tracer::WriteTrace(const Trace& trace) { Status Tracer::Close() { return WriteFooter(); } -Replayer::Replayer(DB* db, const std::vector& handles, - std::unique_ptr&& reader) - : trace_reader_(std::move(reader)) { - assert(db != nullptr); - db_ = static_cast(db->GetRootDB()); - env_ = Env::Default(); - for (ColumnFamilyHandle* cfh : handles) { - cf_map_[cfh->GetID()] = 
cfh; - } - fast_forward_ = 1; -} - -Replayer::~Replayer() { trace_reader_.reset(); } - -Status Replayer::SetFastForward(uint32_t fast_forward) { - Status s; - if (fast_forward < 1) { - s = Status::InvalidArgument("Wrong fast forward speed!"); - } else { - fast_forward_ = fast_forward; - s = Status::OK(); - } - return s; -} - -Status Replayer::Replay() { - Status s; - Trace header; - int db_version; - s = ReadHeader(&header); - if (!s.ok()) { - return s; - } - s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version); - if (!s.ok()) { - return s; - } - - std::chrono::system_clock::time_point replay_epoch = - std::chrono::system_clock::now(); - WriteOptions woptions; - ReadOptions roptions; - Trace trace; - uint64_t ops = 0; - Iterator* single_iter = nullptr; - while (s.ok()) { - trace.reset(); - s = ReadTrace(&trace); - if (!s.ok()) { - break; - } - - std::this_thread::sleep_until( - replay_epoch + - std::chrono::microseconds((trace.ts - header.ts) / fast_forward_)); - if (trace.type == kTraceWrite) { - if (trace_file_version_ < 2) { - WriteBatch batch(trace.payload); - db_->Write(woptions, &batch); - } else { - WritePayload w_payload; - TracerHelper::DecodeWritePayload(&trace, &w_payload); - WriteBatch batch(w_payload.write_batch_data.ToString()); - db_->Write(woptions, &batch); - } - ops++; - } else if (trace.type == kTraceGet) { - GetPayload get_payload; - get_payload.cf_id = 0; - get_payload.get_key = 0; - if (trace_file_version_ < 2) { - DecodeCFAndKey(trace.payload, &get_payload.cf_id, &get_payload.get_key); - } else { - TracerHelper::DecodeGetPayload(&trace, &get_payload); - } - if (get_payload.cf_id > 0 && - cf_map_.find(get_payload.cf_id) == cf_map_.end()) { - return Status::Corruption("Invalid Column Family ID."); - } - - std::string value; - if (get_payload.cf_id == 0) { - db_->Get(roptions, get_payload.get_key, &value); - } else { - db_->Get(roptions, cf_map_[get_payload.cf_id], get_payload.get_key, - &value); - } - ops++; - } else if 
(trace.type == kTraceIteratorSeek) { - // Currently, we only support to call Seek. The Next() and Prev() is not - // supported. - IterPayload iter_payload; - iter_payload.cf_id = 0; - if (trace_file_version_ < 2) { - DecodeCFAndKey(trace.payload, &iter_payload.cf_id, - &iter_payload.iter_key); - } else { - TracerHelper::DecodeIterPayload(&trace, &iter_payload); - } - if (iter_payload.cf_id > 0 && - cf_map_.find(iter_payload.cf_id) == cf_map_.end()) { - return Status::Corruption("Invalid Column Family ID."); - } - - if (iter_payload.cf_id == 0) { - single_iter = db_->NewIterator(roptions); - } else { - single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]); - } - single_iter->Seek(iter_payload.iter_key); - ops++; - delete single_iter; - } else if (trace.type == kTraceIteratorSeekForPrev) { - // Currently, we only support to call SeekForPrev. The Next() and Prev() - // is not supported. - IterPayload iter_payload; - iter_payload.cf_id = 0; - if (trace_file_version_ < 2) { - DecodeCFAndKey(trace.payload, &iter_payload.cf_id, - &iter_payload.iter_key); - } else { - TracerHelper::DecodeIterPayload(&trace, &iter_payload); - } - if (iter_payload.cf_id > 0 && - cf_map_.find(iter_payload.cf_id) == cf_map_.end()) { - return Status::Corruption("Invalid Column Family ID."); - } - - if (iter_payload.cf_id == 0) { - single_iter = db_->NewIterator(roptions); - } else { - single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]); - } - single_iter->SeekForPrev(iter_payload.iter_key); - ops++; - delete single_iter; - } else if (trace.type == kTraceMultiGet) { - MultiGetPayload multiget_payload; - assert(trace_file_version_ >= 2); - TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload); - std::vector v_cfd; - std::vector keys; - assert(multiget_payload.cf_ids.size() == - multiget_payload.multiget_keys.size()); - for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) { - assert(i < multiget_payload.cf_ids.size() && - i < 
multiget_payload.multiget_keys.size()); - if (cf_map_.find(multiget_payload.cf_ids[i]) == cf_map_.end()) { - return Status::Corruption("Invalid Column Family ID."); - } - v_cfd.push_back(cf_map_[multiget_payload.cf_ids[i]]); - keys.push_back(Slice(multiget_payload.multiget_keys[i])); - } - std::vector values; - std::vector ss = db_->MultiGet(roptions, v_cfd, keys, &values); - } else if (trace.type == kTraceEnd) { - // Do nothing for now. - // TODO: Add some validations later. - break; - } - } - - if (s.IsIncomplete()) { - // Reaching eof returns Incomplete status at the moment. - // Could happen when killing a process without calling EndTrace() API. - // TODO: Add better error handling. - return Status::OK(); - } - return s; -} - -// The trace can be replayed with multithread by configurnge the number of -// threads in the thread pool. Trace records are read from the trace file -// sequentially and the corresponding queries are scheduled in the task -// queue based on the timestamp. Currently, we support Write_batch (Put, -// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev). 
-Status Replayer::MultiThreadReplay(uint32_t threads_num) { - Status s; - Trace header; - int db_version; - s = ReadHeader(&header); - if (!s.ok()) { - return s; - } - s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version); - if (!s.ok()) { - return s; - } - ThreadPoolImpl thread_pool; - thread_pool.SetHostEnv(env_); - - if (threads_num > 1) { - thread_pool.SetBackgroundThreads(static_cast(threads_num)); - } else { - thread_pool.SetBackgroundThreads(1); - } - - std::chrono::system_clock::time_point replay_epoch = - std::chrono::system_clock::now(); - WriteOptions woptions; - ReadOptions roptions; - uint64_t ops = 0; - while (s.ok()) { - std::unique_ptr ra(new ReplayerWorkerArg); - ra->db = db_; - s = ReadTrace(&(ra->trace_entry)); - if (!s.ok()) { - break; - } - ra->cf_map = &cf_map_; - ra->woptions = woptions; - ra->roptions = roptions; - ra->trace_file_version = trace_file_version_; - - std::this_thread::sleep_until( - replay_epoch + std::chrono::microseconds( - (ra->trace_entry.ts - header.ts) / fast_forward_)); - if (ra->trace_entry.type == kTraceWrite) { - thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr, - nullptr); - ops++; - } else if (ra->trace_entry.type == kTraceGet) { - thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr, - nullptr); - ops++; - } else if (ra->trace_entry.type == kTraceIteratorSeek) { - thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr, - nullptr); - ops++; - } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) { - thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(), - nullptr, nullptr); - ops++; - } else if (ra->trace_entry.type == kTraceMultiGet) { - thread_pool.Schedule(&Replayer::BGWorkMultiGet, ra.release(), nullptr, - nullptr); - ops++; - } else if (ra->trace_entry.type == kTraceEnd) { - // Do nothing for now. - // TODO: Add some validations later. 
- break; - } else { - // Other trace entry types that are not implemented for replay. - // To finish the replay, we continue the process. - continue; - } - } - - if (s.IsIncomplete()) { - // Reaching eof returns Incomplete status at the moment. - // Could happen when killing a process without calling EndTrace() API. - // TODO: Add better error handling. - s = Status::OK(); - } - thread_pool.JoinAllThreads(); - return s; -} - -Status Replayer::ReadHeader(Trace* header) { - assert(header != nullptr); - std::string encoded_trace; - // Read the trace head - Status s = trace_reader_->Read(&encoded_trace); - if (!s.ok()) { - return s; - } - - s = TracerHelper::DecodeTrace(encoded_trace, header); - - if (header->type != kTraceBegin) { - return Status::Corruption("Corrupted trace file. Incorrect header."); - } - if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) { - return Status::Corruption("Corrupted trace file. Incorrect magic."); - } - - return s; -} - -Status Replayer::ReadFooter(Trace* footer) { - assert(footer != nullptr); - Status s = ReadTrace(footer); - if (!s.ok()) { - return s; - } - if (footer->type != kTraceEnd) { - return Status::Corruption("Corrupted trace file. 
Incorrect footer."); - } - - // TODO: Add more validations later - return s; -} - -Status Replayer::ReadTrace(Trace* trace) { - assert(trace != nullptr); - std::string encoded_trace; - Status s = trace_reader_->Read(&encoded_trace); - if (!s.ok()) { - return s; - } - return TracerHelper::DecodeTrace(encoded_trace, trace); -} - -void Replayer::BGWorkGet(void* arg) { - std::unique_ptr ra( - reinterpret_cast(arg)); - assert(ra != nullptr); - auto cf_map = static_cast*>( - ra->cf_map); - GetPayload get_payload; - get_payload.cf_id = 0; - if (ra->trace_file_version < 2) { - DecodeCFAndKey(ra->trace_entry.payload, &get_payload.cf_id, - &get_payload.get_key); - } else { - TracerHelper::DecodeGetPayload(&(ra->trace_entry), &get_payload); - } - if (get_payload.cf_id > 0 && - cf_map->find(get_payload.cf_id) == cf_map->end()) { - return; - } - - std::string value; - if (get_payload.cf_id == 0) { - ra->db->Get(ra->roptions, get_payload.get_key, &value); - } else { - ra->db->Get(ra->roptions, (*cf_map)[get_payload.cf_id], get_payload.get_key, - &value); - } - return; -} - -void Replayer::BGWorkWriteBatch(void* arg) { - std::unique_ptr ra( - reinterpret_cast(arg)); - assert(ra != nullptr); - - if (ra->trace_file_version < 2) { - WriteBatch batch(ra->trace_entry.payload); - ra->db->Write(ra->woptions, &batch); - } else { - WritePayload w_payload; - TracerHelper::DecodeWritePayload(&(ra->trace_entry), &w_payload); - WriteBatch batch(w_payload.write_batch_data.ToString()); - ra->db->Write(ra->woptions, &batch); - } - return; -} - -void Replayer::BGWorkIterSeek(void* arg) { - std::unique_ptr ra( - reinterpret_cast(arg)); - assert(ra != nullptr); - auto cf_map = static_cast*>( - ra->cf_map); - IterPayload iter_payload; - iter_payload.cf_id = 0; - - if (ra->trace_file_version < 2) { - DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id, - &iter_payload.iter_key); - } else { - TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload); - } - if (iter_payload.cf_id > 0 
&& - cf_map->find(iter_payload.cf_id) == cf_map->end()) { - return; - } - - Iterator* single_iter = nullptr; - if (iter_payload.cf_id == 0) { - single_iter = ra->db->NewIterator(ra->roptions); - } else { - single_iter = - ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]); - } - single_iter->Seek(iter_payload.iter_key); - delete single_iter; - return; -} - -void Replayer::BGWorkIterSeekForPrev(void* arg) { - std::unique_ptr ra( - reinterpret_cast(arg)); - assert(ra != nullptr); - auto cf_map = static_cast*>( - ra->cf_map); - IterPayload iter_payload; - iter_payload.cf_id = 0; - - if (ra->trace_file_version < 2) { - DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id, - &iter_payload.iter_key); - } else { - TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload); - } - if (iter_payload.cf_id > 0 && - cf_map->find(iter_payload.cf_id) == cf_map->end()) { - return; - } - - Iterator* single_iter = nullptr; - if (iter_payload.cf_id == 0) { - single_iter = ra->db->NewIterator(ra->roptions); - } else { - single_iter = - ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]); - } - single_iter->SeekForPrev(iter_payload.iter_key); - delete single_iter; - return; -} - -void Replayer::BGWorkMultiGet(void* arg) { - std::unique_ptr ra( - reinterpret_cast(arg)); - assert(ra != nullptr); - auto cf_map = static_cast*>( - ra->cf_map); - MultiGetPayload multiget_payload; - if (ra->trace_file_version < 2) { - return; - } - TracerHelper::DecodeMultiGetPayload(&(ra->trace_entry), &multiget_payload); - std::vector v_cfd; - std::vector keys; - if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) { - return; - } - for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) { - if (cf_map->find(multiget_payload.cf_ids[i]) == cf_map->end()) { - return; - } - v_cfd.push_back((*cf_map)[multiget_payload.cf_ids[i]]); - keys.push_back(Slice(multiget_payload.multiget_keys[i])); - } - std::vector values; - std::vector ss = 
ra->db->MultiGet(ra->roptions, v_cfd, keys, &values); - return; -} - } // namespace ROCKSDB_NAMESPACE diff --git a/trace_replay/trace_replay.h b/trace_replay/trace_replay.h index d10bc1b46..979eb3492 100644 --- a/trace_replay/trace_replay.h +++ b/trace_replay/trace_replay.h @@ -5,13 +5,17 @@ #pragma once +#include #include +#include #include #include #include "rocksdb/options.h" #include "rocksdb/rocksdb_namespace.h" #include "rocksdb/status.h" +#include "rocksdb/trace_record.h" +#include "rocksdb/utilities/replayer.h" namespace ROCKSDB_NAMESPACE { @@ -43,31 +47,6 @@ const unsigned int kTraceMetadataSize = static const int kTraceFileMajorVersion = 0; static const int kTraceFileMinorVersion = 2; -// Supported Trace types. -enum TraceType : char { - kTraceBegin = 1, - kTraceEnd = 2, - kTraceWrite = 3, - kTraceGet = 4, - kTraceIteratorSeek = 5, - kTraceIteratorSeekForPrev = 6, - // Block cache related types. - kBlockTraceIndexBlock = 7, - kBlockTraceFilterBlock = 8, - kBlockTraceDataBlock = 9, - kBlockTraceUncompressionDictBlock = 10, - kBlockTraceRangeDeletionBlock = 11, - // For IOTracing. - kIOTracer = 12, - // For query tracing - kTraceMultiGet = 13, - // All trace types should be added before kTraceMax - kTraceMax, -}; - -// TODO: This should also be made part of public interface to help users build -// custom TracerReaders and TraceWriters. -// // The data structure that defines a single trace. 
struct Trace { uint64_t ts; // timestamp @@ -105,28 +84,6 @@ enum TracePayloadType : char { kMultiGetKeys = 10, }; -struct WritePayload { - Slice write_batch_data; -}; - -struct GetPayload { - uint32_t cf_id = 0; - Slice get_key; -}; - -struct IterPayload { - uint32_t cf_id = 0; - Slice iter_key; - Slice lower_bound; - Slice upper_bound; -}; - -struct MultiGetPayload { - uint32_t multiget_size; - std::vector cf_ids; - std::vector multiget_keys; -}; - class TracerHelper { public: // Parse the string with major and minor version only @@ -142,22 +99,28 @@ class TracerHelper { // Decode a string into the given trace object. static Status DecodeTrace(const std::string& encoded_trace, Trace* trace); + // Decode a string into the given trace header. + static Status DecodeHeader(const std::string& encoded_trace, Trace* header); + // Set the payload map based on the payload type static bool SetPayloadMap(uint64_t& payload_map, const TracePayloadType payload_type); // Decode the write payload and store in WrteiPayload - static void DecodeWritePayload(Trace* trace, WritePayload* write_payload); + static Status DecodeWriteRecord(Trace* trace, int trace_file_version, + std::unique_ptr* record); // Decode the get payload and store in WrteiPayload - static void DecodeGetPayload(Trace* trace, GetPayload* get_payload); + static Status DecodeGetRecord(Trace* trace, int trace_file_version, + std::unique_ptr* record); // Decode the iter payload and store in WrteiPayload - static void DecodeIterPayload(Trace* trace, IterPayload* iter_payload); + static Status DecodeIterRecord(Trace* trace, int trace_file_version, + std::unique_ptr* record); // Decode the multiget payload and store in MultiGetPayload - static void DecodeMultiGetPayload(Trace* trace, - MultiGetPayload* multiget_payload); + static Status DecodeMultiGetRecord(Trace* trace, int trace_file_version, + std::unique_ptr* record); }; // Tracer captures all RocksDB operations using a user-provided TraceWriter. 
@@ -222,75 +185,4 @@ class Tracer { uint64_t trace_request_count_; }; -// Replayer helps to replay the captured RocksDB operations, using a user -// provided TraceReader. -// The Replayer is instantiated via db_bench today, on using "replay" benchmark. -class Replayer { - public: - Replayer(DB* db, const std::vector& handles, - std::unique_ptr&& reader); - ~Replayer(); - - // Replay all the traces from the provided trace stream, taking the delay - // between the traces into consideration. - Status Replay(); - - // Replay the provide trace stream, which is the same as Replay(), with - // multi-threads. Queries are scheduled in the thread pool job queue. - // User can set the number of threads in the thread pool. - Status MultiThreadReplay(uint32_t threads_num); - - // Enables fast forwarding a replay by reducing the delay between the ingested - // traces. - // fast_forward : Rate of replay speedup. - // If 1, replay the operations at the same rate as in the trace stream. - // If > 1, speed up the replay by this amount. - Status SetFastForward(uint32_t fast_forward); - - private: - Status ReadHeader(Trace* header); - Status ReadFooter(Trace* footer); - Status ReadTrace(Trace* trace); - - // The background function for MultiThreadReplay to execute Get query - // based on the trace records. - static void BGWorkGet(void* arg); - - // The background function for MultiThreadReplay to execute WriteBatch - // (Put, Delete, SingleDelete, DeleteRange) based on the trace records. - static void BGWorkWriteBatch(void* arg); - - // The background function for MultiThreadReplay to execute Iterator (Seek) - // based on the trace records. - static void BGWorkIterSeek(void* arg); - - // The background function for MultiThreadReplay to execute Iterator - // (SeekForPrev) based on the trace records. 
- static void BGWorkIterSeekForPrev(void* arg); - - // The background function for MultiThreadReplay to execute MultiGet based on - // the trace records - static void BGWorkMultiGet(void* arg); - - DBImpl* db_; - Env* env_; - std::unique_ptr trace_reader_; - std::unordered_map cf_map_; - uint32_t fast_forward_; - // When reading the trace header, the trace file version can be parsed. - // Replayer will use different decode method to get the trace content based - // on different trace file version. - int trace_file_version_; -}; - -// The passin arg of MultiThreadRepkay for each trace record. -struct ReplayerWorkerArg { - DB* db; - Trace trace_entry; - std::unordered_map* cf_map; - WriteOptions woptions; - ReadOptions roptions; - int trace_file_version; -}; - } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/simulator_cache/cache_simulator.cc b/utilities/simulator_cache/cache_simulator.cc index 31eaf8554..b214d4ac0 100644 --- a/utilities/simulator_cache/cache_simulator.cc +++ b/utilities/simulator_cache/cache_simulator.cc @@ -4,8 +4,11 @@ // (found in the LICENSE.Apache file in the root directory). 
#include "utilities/simulator_cache/cache_simulator.h" + #include + #include "db/dbformat.h" +#include "rocksdb/trace_record.h" namespace ROCKSDB_NAMESPACE { diff --git a/utilities/simulator_cache/cache_simulator_test.cc b/utilities/simulator_cache/cache_simulator_test.cc index a205315cc..beacdfa1e 100644 --- a/utilities/simulator_cache/cache_simulator_test.cc +++ b/utilities/simulator_cache/cache_simulator_test.cc @@ -6,7 +6,9 @@ #include "utilities/simulator_cache/cache_simulator.h" #include + #include "rocksdb/env.h" +#include "rocksdb/trace_record.h" #include "test_util/testharness.h" #include "test_util/testutil.h" diff --git a/utilities/trace/file_trace_reader_writer.cc b/utilities/trace/file_trace_reader_writer.cc index d553e2434..dc58ded21 100644 --- a/utilities/trace/file_trace_reader_writer.cc +++ b/utilities/trace/file_trace_reader_writer.cc @@ -31,6 +31,14 @@ Status FileTraceReader::Close() { return Status::OK(); } +Status FileTraceReader::Reset() { + if (file_reader_ == nullptr) { + return Status::IOError("TraceReader is closed."); + } + offset_ = 0; + return Status::OK(); +} + Status FileTraceReader::Read(std::string* data) { assert(file_reader_ != nullptr); Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize, diff --git a/utilities/trace/file_trace_reader_writer.h b/utilities/trace/file_trace_reader_writer.h index a9eafa5af..909317fe4 100644 --- a/utilities/trace/file_trace_reader_writer.h +++ b/utilities/trace/file_trace_reader_writer.h @@ -20,6 +20,7 @@ class FileTraceReader : public TraceReader { virtual Status Read(std::string* data) override; virtual Status Close() override; + virtual Status Reset() override; private: std::unique_ptr file_reader_; diff --git a/utilities/trace/replayer_impl.cc b/utilities/trace/replayer_impl.cc new file mode 100644 index 000000000..2789de5cd --- /dev/null +++ b/utilities/trace/replayer_impl.cc @@ -0,0 +1,305 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/trace/replayer_impl.h" + +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "rocksdb/system_clock.h" +#include "rocksdb/trace_reader_writer.h" +#include "util/threadpool_imp.h" + +namespace ROCKSDB_NAMESPACE { + +ReplayerImpl::ReplayerImpl(DB* db, + const std::vector& handles, + std::unique_ptr&& reader) + : Replayer(), + env_(db->GetEnv()), + trace_reader_(std::move(reader)), + prepared_(false), + trace_end_(false), + header_ts_(0), + exec_handler_(TraceRecord::NewExecutionHandler(db, handles)) {} + +ReplayerImpl::~ReplayerImpl() { + exec_handler_.reset(); + trace_reader_.reset(); +} + +Status ReplayerImpl::Prepare() { + Trace header; + int db_version; + Status s = ReadHeader(&header); + if (!s.ok()) { + return s; + } + s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version); + if (!s.ok()) { + return s; + } + header_ts_ = header.ts; + prepared_ = true; + trace_end_ = false; + return Status::OK(); +} + +Status ReplayerImpl::Next(std::unique_ptr* record) { + if (!prepared_) { + return Status::Incomplete("Not prepared!"); + } + if (trace_end_) { + return Status::Incomplete("Trace end."); + } + + Trace trace; + Status s = ReadTrace(&trace); // ReadTrace is atomic + // Reached the trace end. 
+ if (s.ok() && trace.type == kTraceEnd) { + trace_end_ = true; + return Status::Incomplete("Trace end."); + } + if (!s.ok() || record == nullptr) { + return s; + } + + return DecodeTraceRecord(&trace, trace_file_version_, record); +} + +Status ReplayerImpl::Execute(const std::unique_ptr& record) { + return record->Accept(exec_handler_.get()); +} + +Status ReplayerImpl::Execute(std::unique_ptr&& record) { + Status s = record->Accept(exec_handler_.get()); + record.reset(); + return s; +} + +Status ReplayerImpl::Replay(const ReplayOptions& options) { + if (options.fast_forward <= 0.0) { + return Status::InvalidArgument("Wrong fast forward speed!"); + } + + if (!prepared_) { + return Status::Incomplete("Not prepared!"); + } + if (trace_end_) { + return Status::Incomplete("Trace end."); + } + + Status s = Status::OK(); + + if (options.num_threads <= 1) { + // num_threads == 0 or num_threads == 1 uses single thread. + std::chrono::system_clock::time_point replay_epoch = + std::chrono::system_clock::now(); + + while (s.ok()) { + Trace trace; + s = ReadTrace(&trace); + // If already at trace end, ReadTrace should return Status::Incomplete(). + if (!s.ok()) { + break; + } + + // No need to sleep before breaking the loop if at the trace end. + if (trace.type == kTraceEnd) { + trace_end_ = true; + s = Status::Incomplete("Trace end."); + break; + } + + // In single-threaded replay, decode first then sleep. + std::unique_ptr record; + s = DecodeTraceRecord(&trace, trace_file_version_, &record); + // Skip unsupported traces, stop for other errors. + if (s.IsNotSupported()) { + continue; + } else if (!s.ok()) { + break; + } + + std::this_thread::sleep_until( + replay_epoch + + std::chrono::microseconds(static_cast(std::llround( + 1.0 * (trace.ts - header_ts_) / options.fast_forward)))); + + s = Execute(std::move(record)); + } + } else { + // Multi-threaded replay. 
+ ThreadPoolImpl thread_pool; + thread_pool.SetHostEnv(env_); + thread_pool.SetBackgroundThreads(static_cast(options.num_threads)); + + std::mutex mtx; + // Background decoding and execution status. + Status bg_s = Status::OK(); + uint64_t last_err_ts = static_cast(-1); + // Callback function used in background work to update bg_s at the first + // execution error (with the smallest Trace timestamp). + auto error_cb = [&mtx, &bg_s, &last_err_ts](Status err, uint64_t err_ts) { + std::lock_guard gd(mtx); + // Only record the first error. + if (!err.ok() && !err.IsNotSupported() && err_ts < last_err_ts) { + bg_s = err; + last_err_ts = err_ts; + } + }; + + std::chrono::system_clock::time_point replay_epoch = + std::chrono::system_clock::now(); + + while (bg_s.ok() && s.ok()) { + Trace trace; + s = ReadTrace(&trace); + // If already at trace end, ReadTrace should return Status::Incomplete(). + if (!s.ok()) { + break; + } + + TraceType trace_type = trace.type; + + // No need to sleep before breaking the loop if at the trace end. + if (trace_type == kTraceEnd) { + trace_end_ = true; + s = Status::Incomplete("Trace end."); + break; + } + + // In multi-threaded replay, sleep first thatn start decoding and + // execution in a thread. + std::this_thread::sleep_until( + replay_epoch + + std::chrono::microseconds(static_cast(std::llround( + 1.0 * (trace.ts - header_ts_) / options.fast_forward)))); + + if (trace_type == kTraceWrite || trace_type == kTraceGet || + trace_type == kTraceIteratorSeek || + trace_type == kTraceIteratorSeekForPrev || + trace_type == kTraceMultiGet) { + std::unique_ptr ra(new ReplayerWorkerArg); + ra->trace_entry = std::move(trace); + ra->handler = exec_handler_.get(); + ra->trace_file_version = trace_file_version_; + ra->error_cb = error_cb; + thread_pool.Schedule(&ReplayerImpl::BackgroundWork, ra.release(), + nullptr, nullptr); + } + // Skip unsupported traces. 
+ } + + thread_pool.WaitForJobsAndJoinAllThreads(); + if (!bg_s.ok()) { + s = bg_s; + } + } + + if (s.IsIncomplete()) { + // Reaching eof returns Incomplete status at the moment. + // Could happen when killing a process without calling EndTrace() API. + // TODO: Add better error handling. + trace_end_ = true; + return Status::OK(); + } + return s; +} + +uint64_t ReplayerImpl::GetHeaderTimestamp() const { return header_ts_; } + +Status ReplayerImpl::ReadHeader(Trace* header) { + assert(header != nullptr); + Status s = trace_reader_->Reset(); + if (!s.ok()) { + return s; + } + std::string encoded_trace; + // Read the trace head + s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + + return TracerHelper::DecodeHeader(encoded_trace, header); +} + +Status ReplayerImpl::ReadFooter(Trace* footer) { + assert(footer != nullptr); + Status s = ReadTrace(footer); + if (!s.ok()) { + return s; + } + if (footer->type != kTraceEnd) { + return Status::Corruption("Corrupted trace file. Incorrect footer."); + } + + // TODO: Add more validations later + return s; +} + +Status ReplayerImpl::ReadTrace(Trace* trace) { + assert(trace != nullptr); + std::string encoded_trace; + // We don't know if TraceReader is implemented thread-safe, so we protect the + // reading trace part with a mutex. The decoding part does not need to be + // protected since it's local. 
+ { + std::lock_guard guard(mutex_); + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + } + return TracerHelper::DecodeTrace(encoded_trace, trace); +} + +Status ReplayerImpl::DecodeTraceRecord(Trace* trace, int trace_file_version, + std::unique_ptr* record) { + switch (trace->type) { + case kTraceWrite: + return TracerHelper::DecodeWriteRecord(trace, trace_file_version, record); + case kTraceGet: + return TracerHelper::DecodeGetRecord(trace, trace_file_version, record); + case kTraceIteratorSeek: + case kTraceIteratorSeekForPrev: + return TracerHelper::DecodeIterRecord(trace, trace_file_version, record); + case kTraceMultiGet: + return TracerHelper::DecodeMultiGetRecord(trace, trace_file_version, + record); + case kTraceEnd: + return Status::Incomplete("Trace end."); + default: + return Status::NotSupported("Unsupported trace type."); + } +} + +void ReplayerImpl::BackgroundWork(void* arg) { + std::unique_ptr ra( + reinterpret_cast(arg)); + assert(ra != nullptr); + + std::unique_ptr record; + Status s = + DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record); + if (s.ok()) { + s = record->Accept(ra->handler); + record.reset(); + } + if (!s.ok() && ra->error_cb) { + ra->error_cb(s, ra->trace_entry.ts); + } +} + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/utilities/trace/replayer_impl.h b/utilities/trace/replayer_impl.h new file mode 100644 index 000000000..b796d2226 --- /dev/null +++ b/utilities/trace/replayer_impl.h @@ -0,0 +1,90 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include +#include + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/trace_record.h" +#include "rocksdb/utilities/replayer.h" +#include "trace_replay/trace_replay.h" + +namespace ROCKSDB_NAMESPACE { + +class ColumnFamilyHandle; +class DB; +class Env; +class TraceReader; +class TraceRecord; +class Status; + +struct ReplayOptions; + +class ReplayerImpl : public Replayer { + public: + ReplayerImpl(DB* db, const std::vector& handles, + std::unique_ptr&& reader); + ~ReplayerImpl() override; + + using Replayer::Prepare; + Status Prepare() override; + + using Replayer::Next; + Status Next(std::unique_ptr* record) override; + + using Replayer::Execute; + Status Execute(const std::unique_ptr& record) override; + Status Execute(std::unique_ptr&& record) override; + + using Replayer::Replay; + Status Replay(const ReplayOptions& options) override; + + using Replayer::GetHeaderTimestamp; + uint64_t GetHeaderTimestamp() const override; + + private: + Status ReadHeader(Trace* header); + Status ReadFooter(Trace* footer); + Status ReadTrace(Trace* trace); + + // Generic function to convert a Trace to TraceRecord. + static Status DecodeTraceRecord(Trace* trace, int trace_file_version, + std::unique_ptr* record); + + // Generic function to execute a Trace in a thread pool. + static void BackgroundWork(void* arg); + + Env* env_; + std::unique_ptr trace_reader_; + // When reading the trace header, the trace file version can be parsed. + // Replayer will use a different decode method to get the trace content based + // on the trace file version. + int trace_file_version_; + std::mutex mutex_; + std::atomic prepared_; + std::atomic trace_end_; + uint64_t header_ts_; + std::unique_ptr exec_handler_; +}; + +// The argument passed to ReplayerImpl::BackgroundWork() for each trace record. +struct ReplayerWorkerArg { + Trace trace_entry; + int trace_file_version; + // Handler to execute TraceRecord.
+ TraceRecord::Handler* handler; + // Callback function to report the error status and the timestamp of the + // TraceRecord. + std::function error_cb; +}; + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE