diff --git a/tools/trace_analyzer_tool.cc b/tools/trace_analyzer_tool.cc index 973b3d6bd..732c8889e 100644 --- a/tools/trace_analyzer_tool.cc +++ b/tools/trace_analyzer_tool.cc @@ -132,7 +132,7 @@ DEFINE_bool(analyze_single_delete, false, "Analyze the SingleDelete query."); DEFINE_bool(analyze_range_delete, false, "Analyze the DeleteRange query."); DEFINE_bool(analyze_merge, false, "Analyze the Merge query."); DEFINE_bool(analyze_iterator, false, - " Analyze the iterate query like seek() and seekForPrev()."); + " Analyze the iterate query like Seek() and SeekForPrev()."); DEFINE_bool(analyze_multiget, false, " Analyze the MultiGet query. NOTE: for" " MultiGet, we analyze each KV-pair read in one MultiGet query. " @@ -280,13 +280,14 @@ TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path, total_access_keys_ = 0; total_gets_ = 0; total_writes_ = 0; + total_seeks_ = 0; + total_seek_prevs_ = 0; + total_multigets_ = 0; trace_create_time_ = 0; begin_time_ = 0; end_time_ = 0; time_series_start_ = 0; cur_time_sec_ = 0; - // Set the default trace file version as version 0.2 - trace_file_version_ = 2; if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) { sample_max_ = 1; } else { @@ -360,7 +361,11 @@ TraceAnalyzer::~TraceAnalyzer() {} Status TraceAnalyzer::PrepareProcessing() { Status s; // Prepare the trace reader - s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_); + if (trace_reader_ == nullptr) { + s = NewFileTraceReader(env_, env_options_, trace_name_, &trace_reader_); + } else { + s = trace_reader_->Reset(); + } if (!s.ok()) { return s; } @@ -451,8 +456,9 @@ Status TraceAnalyzer::StartProcessing() { fprintf(stderr, "Cannot read the header\n"); return s; } - s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, - &db_version_); + // Set the default trace file version as version 0.2 + int trace_file_version = 2; + s = TracerHelper::ParseTraceHeader(header, &trace_file_version, &db_version_); if (!s.ok()) { return s; } @@ -469,96 +475,29 @@ Status TraceAnalyzer::StartProcessing() { break; } - total_requests_++; end_time_ = trace.ts; if (trace.type == kTraceEnd) { break; } + // Do not count TraceEnd (if there is one) + total_requests_++; std::unique_ptr record; - switch (trace.type) { - case kTraceWrite: { - s = TracerHelper::DecodeWriteRecord(&trace, trace_file_version_, - &record); - if (!s.ok()) { - return s; - } - total_writes_++; - c_time_ = trace.ts; - std::unique_ptr r( - reinterpret_cast(record.release())); - // Note that, if the write happens in a transaction, - // 'Write' will be called twice, one for Prepare, one for - // Commit. Thus, in the trace, for the same WriteBatch, there - // will be two reords if it is in a transaction. Here, we only - // process the reord that is committed. If write is non-transaction, - // HasBeginPrepare()==false, so we process it normally. - WriteBatch batch(r->GetWriteBatchRep().ToString()); - if (batch.HasBeginPrepare() && !batch.HasCommit()) { - continue; - } - TraceWriteHandler write_handler(this); - s = batch.Iterate(&write_handler); - if (!s.ok()) { - fprintf(stderr, "Cannot process the write batch in the trace\n"); - return s; - } - break; - } - case kTraceGet: { - s = TracerHelper::DecodeGetRecord(&trace, trace_file_version_, &record); - if (!s.ok()) { - return s; - } - total_gets_++; - std::unique_ptr r( - reinterpret_cast(record.release())); - s = HandleGet(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(), - 1); - if (!s.ok()) { - fprintf(stderr, "Cannot process the get in the trace\n"); - return s; - } - break; - } - case kTraceIteratorSeek: - case kTraceIteratorSeekForPrev: { - s = TracerHelper::DecodeIterRecord(&trace, trace_file_version_, - &record); - if (!s.ok()) { - return s; - } - std::unique_ptr r( - reinterpret_cast(record.release())); - s = HandleIter(r->GetColumnFamilyID(), r->GetKey(), r->GetTimestamp(), - r->GetTraceType()); - if (!s.ok()) { - fprintf(stderr, "Cannot process the iterator in the trace\n"); - return s; - } - break; - } - case kTraceMultiGet: { - s = TracerHelper::DecodeMultiGetRecord(&trace, trace_file_version_, - &record); - if (!s.ok()) { - return s; - } - std::unique_ptr r( - reinterpret_cast(record.release())); - s = HandleMultiGet(r->GetColumnFamilyIDs(), r->GetKeys(), - r->GetTimestamp()); - break; - } - default: { - // Skip unsupported types - break; - } + s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version, &record); + if (s.IsNotSupported()) { + continue; + } + if (!s.ok()) { + return s; + } + s = record->Accept(this, nullptr); + if (!s.ok()) { + fprintf(stderr, "Cannot process the TraceRecord\n"); + return s; } } if (s.IsIncomplete()) { // Fix it: Reaching eof returns Incomplete status at the moment. - // return Status::OK(); } return s; @@ -1555,14 +1494,41 @@ Status TraceAnalyzer::CloseOutputFiles() { return s; } -// Handle the Get request in the trace -Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key, - const uint64_t& ts, const uint32_t& get_ret) { +Status TraceAnalyzer::Handle(const WriteQueryTraceRecord& record, + std::unique_ptr* /*result*/) { + total_writes_++; + // Note that, if the write happens in a transaction, + // 'Write' will be called twice, one for Prepare, one for + // Commit. Thus, in the trace, for the same WriteBatch, there + // will be two records if it is in a transaction. Here, we only + // process the reord that is committed. If write is non-transaction, + // HasBeginPrepare()==false, so we process it normally. + WriteBatch batch(record.GetWriteBatchRep().ToString()); + if (batch.HasBeginPrepare() && !batch.HasCommit()) { + return Status::OK(); + } + c_time_ = record.GetTimestamp(); + Status s = batch.Iterate(this); + if (!s.ok()) { + fprintf(stderr, "Cannot process the write batch in the trace\n"); + return s; + } + return Status::OK(); +} + +Status TraceAnalyzer::Handle(const GetQueryTraceRecord& record, + std::unique_ptr* /*result*/) { + total_gets_++; + + uint32_t cf_id = record.GetColumnFamilyID(); + Slice key = record.GetKey(); + uint64_t ts = record.GetTimestamp(); + Status s; size_t value_size = 0; if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { - s = WriteTraceSequence(TraceOperationType::kGet, column_family_id, key, - value_size, ts); + s = WriteTraceSequence(TraceOperationType::kGet, cf_id, key, value_size, + ts); if (!s.ok()) { return Status::Corruption("Failed to write the trace sequence to file"); } @@ -1580,11 +1546,109 @@ Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key, if (!ta_[TraceOperationType::kGet].enabled) { return Status::OK(); } - if (get_ret == 1) { - value_size = 10; + value_size = 10; + s = KeyStatsInsertion(TraceOperationType::kGet, cf_id, key.ToString(), + value_size, ts); + if (!s.ok()) { + return Status::Corruption("Failed to insert key statistics"); + } + return s; +} + +Status TraceAnalyzer::Handle(const IteratorSeekQueryTraceRecord& record, + std::unique_ptr* /*result*/) { + uint32_t cf_id = record.GetColumnFamilyID(); + Slice key = record.GetKey(); + uint64_t ts = record.GetTimestamp(); + + // To do: add lower/upper bounds + + Status s; + size_t value_size = 0; + int type = -1; + if (record.GetTraceType() == kTraceIteratorSeek) { + type = TraceOperationType::kIteratorSeek; + total_seeks_++; + } else if (record.GetTraceType() == kTraceIteratorSeekForPrev) { + type = TraceOperationType::kIteratorSeekForPrev; + total_seek_prevs_++; + } else { + return s; + } + if (type == -1) { + return s; + } + + if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { + s = WriteTraceSequence(type, cf_id, key, value_size, ts); + if (!s.ok()) { + return Status::Corruption("Failed to write the trace sequence to file"); + } + } + + if (ta_[type].sample_count >= sample_max_) { + ta_[type].sample_count = 0; + } + if (ta_[type].sample_count > 0) { + ta_[type].sample_count++; + return Status::OK(); + } + ta_[type].sample_count++; + + if (!ta_[type].enabled) { + return Status::OK(); + } + s = KeyStatsInsertion(type, cf_id, key.ToString(), value_size, ts); + if (!s.ok()) { + return Status::Corruption("Failed to insert key statistics"); + } + return s; +} + +Status TraceAnalyzer::Handle(const MultiGetQueryTraceRecord& record, + std::unique_ptr* /*result*/) { + total_multigets_++; + + std::vector cf_ids = record.GetColumnFamilyIDs(); + std::vector keys = record.GetKeys(); + uint64_t ts = record.GetTimestamp(); + + Status s; + size_t value_size = 0; + if (cf_ids.size() != keys.size()) { + // The size does not match is not the error of tracing and anayzing, we just + // report it to the user. The analyzing continues. + printf("The CF ID vector size does not match the keys vector size!\n"); + } + size_t vector_size = std::min(cf_ids.size(), keys.size()); + if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { + for (size_t i = 0; i < vector_size; i++) { + assert(i < cf_ids.size() && i < keys.size()); + s = WriteTraceSequence(TraceOperationType::kMultiGet, cf_ids[i], keys[i], + value_size, ts); + } + if (!s.ok()) { + return Status::Corruption("Failed to write the trace sequence to file"); + } + } + + if (ta_[TraceOperationType::kMultiGet].sample_count >= sample_max_) { + ta_[TraceOperationType::kMultiGet].sample_count = 0; + } + if (ta_[TraceOperationType::kMultiGet].sample_count > 0) { + ta_[TraceOperationType::kMultiGet].sample_count++; + return Status::OK(); + } + ta_[TraceOperationType::kMultiGet].sample_count++; + + if (!ta_[TraceOperationType::kMultiGet].enabled) { + return Status::OK(); + } + for (size_t i = 0; i < vector_size; i++) { + assert(i < cf_ids.size() && i < keys.size()); + s = KeyStatsInsertion(TraceOperationType::kMultiGet, cf_ids[i], + keys[i].ToString(), value_size, ts); } - s = KeyStatsInsertion(TraceOperationType::kGet, column_family_id, - key.ToString(), value_size, ts); if (!s.ok()) { return Status::Corruption("Failed to insert key statistics"); } @@ -1592,8 +1656,8 @@ Status TraceAnalyzer::HandleGet(uint32_t column_family_id, const Slice& key, } // Handle the Put request in the write batch of the trace -Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key, - const Slice& value) { +Status TraceAnalyzer::PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { Status s; size_t value_size = value.ToString().size(); if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { @@ -1625,8 +1689,7 @@ Status TraceAnalyzer::HandlePut(uint32_t column_family_id, const Slice& key, } // Handle the Delete request in the write batch of the trace -Status TraceAnalyzer::HandleDelete(uint32_t column_family_id, - const Slice& key) { +Status TraceAnalyzer::DeleteCF(uint32_t column_family_id, const Slice& key) { Status s; size_t value_size = 0; if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { @@ -1658,8 +1721,8 @@ Status TraceAnalyzer::HandleDelete(uint32_t column_family_id, } // Handle the SingleDelete request in the write batch of the trace -Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id, - const Slice& key) { +Status TraceAnalyzer::SingleDeleteCF(uint32_t column_family_id, + const Slice& key) { Status s; size_t value_size = 0; if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { @@ -1691,9 +1754,9 @@ Status TraceAnalyzer::HandleSingleDelete(uint32_t column_family_id, } // Handle the DeleteRange request in the write batch of the trace -Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id, - const Slice& begin_key, - const Slice& end_key) { +Status TraceAnalyzer::DeleteRangeCF(uint32_t column_family_id, + const Slice& begin_key, + const Slice& end_key) { Status s; size_t value_size = 0; if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { @@ -1727,8 +1790,8 @@ Status TraceAnalyzer::HandleDeleteRange(uint32_t column_family_id, } // Handle the Merge request in the write batch of the trace -Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key, - const Slice& value) { +Status TraceAnalyzer::MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { Status s; size_t value_size = value.ToString().size(); if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { @@ -1759,95 +1822,6 @@ Status TraceAnalyzer::HandleMerge(uint32_t column_family_id, const Slice& key, return s; } -// Handle the Iterator request in the trace -Status TraceAnalyzer::HandleIter(uint32_t column_family_id, const Slice& key, - const uint64_t& ts, TraceType trace_type) { - Status s; - size_t value_size = 0; - int type = -1; - if (trace_type == kTraceIteratorSeek) { - type = TraceOperationType::kIteratorSeek; - } else if (trace_type == kTraceIteratorSeekForPrev) { - type = TraceOperationType::kIteratorSeekForPrev; - } else { - return s; - } - if (type == -1) { - return s; - } - - if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { - s = WriteTraceSequence(type, column_family_id, key, value_size, ts); - if (!s.ok()) { - return Status::Corruption("Failed to write the trace sequence to file"); - } - } - - if (ta_[type].sample_count >= sample_max_) { - ta_[type].sample_count = 0; - } - if (ta_[type].sample_count > 0) { - ta_[type].sample_count++; - return Status::OK(); - } - ta_[type].sample_count++; - - if (!ta_[type].enabled) { - return Status::OK(); - } - s = KeyStatsInsertion(type, column_family_id, key.ToString(), value_size, ts); - if (!s.ok()) { - return Status::Corruption("Failed to insert key statistics"); - } - return s; -} - -// Handle MultiGet queries in the trace -Status TraceAnalyzer::HandleMultiGet( - const std::vector& column_family_ids, - const std::vector& keys, const uint64_t& ts) { - Status s; - size_t value_size = 0; - if (column_family_ids.size() != keys.size()) { - // The size does not match is not the error of tracing and anayzing, we just - // report it to the user. The analyzing continues. - printf("The CF ID vector size does not match the keys vector size!\n"); - } - size_t vector_size = std::min(column_family_ids.size(), keys.size()); - if (FLAGS_convert_to_human_readable_trace && trace_sequence_f_) { - for (size_t i = 0; i < vector_size; i++) { - assert(i < column_family_ids.size() && i < keys.size()); - s = WriteTraceSequence(TraceOperationType::kMultiGet, - column_family_ids[i], keys[i], value_size, ts); - } - if (!s.ok()) { - return Status::Corruption("Failed to write the trace sequence to file"); - } - } - - if (ta_[TraceOperationType::kMultiGet].sample_count >= sample_max_) { - ta_[TraceOperationType::kMultiGet].sample_count = 0; - } - if (ta_[TraceOperationType::kMultiGet].sample_count > 0) { - ta_[TraceOperationType::kMultiGet].sample_count++; - return Status::OK(); - } - ta_[TraceOperationType::kMultiGet].sample_count++; - - if (!ta_[TraceOperationType::kMultiGet].enabled) { - return Status::OK(); - } - for (size_t i = 0; i < vector_size; i++) { - assert(i < column_family_ids.size() && i < keys.size()); - s = KeyStatsInsertion(TraceOperationType::kMultiGet, column_family_ids[i], - keys[i].ToString(), value_size, ts); - } - if (!s.ok()) { - return Status::Corruption("Failed to insert key statistics"); - } - return s; -} - // Before the analyzer is closed, the requested general statistic results are // printed out here. In current stage, these information are not output to // the files. @@ -1999,8 +1973,11 @@ void TraceAnalyzer::PrintStatistics() { printf("The statistics related to query number need to times: %u\n", sample_max_); printf("Total_requests: %" PRIu64 " Total_accessed_keys: %" PRIu64 - " Total_gets: %" PRIu64 " Total_write_batch: %" PRIu64 "\n", - total_requests_, total_access_keys_, total_gets_, total_writes_); + " Total_gets: %" PRIu64 " Total_write_batches: %" PRIu64 + " Total_seeks: %" PRIu64 " Total_seek_for_prevs: %" PRIu64 + " Total_multigets: %" PRIu64 "\n", + total_requests_, total_access_keys_, total_gets_, total_writes_, + total_seeks_, total_seek_prevs_, total_multigets_); for (int type = 0; type < kTaTypeNum; type++) { if (!ta_[type].enabled) { continue; diff --git a/tools/trace_analyzer_tool.h b/tools/trace_analyzer_tool.h index 7eafd2a3c..14f44ff9c 100644 --- a/tools/trace_analyzer_tool.h +++ b/tools/trace_analyzer_tool.h @@ -164,7 +164,8 @@ struct CfUnit { std::map cf_qps; }; -class TraceAnalyzer { +class TraceAnalyzer : private TraceRecord::Handler, + private WriteBatch::Handler { public: TraceAnalyzer(std::string& trace_path, std::string& output_path, AnalyzerOptions _analyzer_opts); @@ -182,24 +183,64 @@ class TraceAnalyzer { Status WriteTraceUnit(TraceUnit& unit); - // The trace processing functions for different type - Status HandleGet(uint32_t column_family_id, const Slice& key, - const uint64_t& ts, const uint32_t& get_ret); - Status HandlePut(uint32_t column_family_id, const Slice& key, - const Slice& value); - Status HandleDelete(uint32_t column_family_id, const Slice& key); - Status HandleSingleDelete(uint32_t column_family_id, const Slice& key); - Status HandleDeleteRange(uint32_t column_family_id, const Slice& begin_key, - const Slice& end_key); - Status HandleMerge(uint32_t column_family_id, const Slice& key, - const Slice& value); - Status HandleIter(uint32_t column_family_id, const Slice& key, - const uint64_t& ts, TraceType trace_type); - Status HandleMultiGet(const std::vector& column_family_ids, - const std::vector& keys, const uint64_t& ts); std::vector& GetTaVector() { return ta_; } private: + using TraceRecord::Handler::Handle; + Status Handle(const WriteQueryTraceRecord& record, + std::unique_ptr* result) override; + Status Handle(const GetQueryTraceRecord& record, + std::unique_ptr* result) override; + Status Handle(const IteratorSeekQueryTraceRecord& record, + std::unique_ptr* result) override; + Status Handle(const MultiGetQueryTraceRecord& record, + std::unique_ptr* result) override; + + using WriteBatch::Handler::PutCF; + Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override; + + using WriteBatch::Handler::DeleteCF; + Status DeleteCF(uint32_t column_family_id, const Slice& key) override; + + using WriteBatch::Handler::SingleDeleteCF; + Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override; + + using WriteBatch::Handler::DeleteRangeCF; + Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key, + const Slice& end_key) override; + + using WriteBatch::Handler::MergeCF; + Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override; + + // The following hanlders are not implemented, return Status::OK() to avoid + // the running time assertion and other irrelevant falures. + using WriteBatch::Handler::PutBlobIndexCF; + Status PutBlobIndexCF(uint32_t /*column_family_id*/, const Slice& /*key*/, + const Slice& /*value*/) override { + return Status::OK(); + } + + // The default implementation of LogData does nothing. + using WriteBatch::Handler::LogData; + void LogData(const Slice& /*blob*/) override {} + + using WriteBatch::Handler::MarkBeginPrepare; + Status MarkBeginPrepare(bool = false) override { return Status::OK(); } + + using WriteBatch::Handler::MarkEndPrepare; + Status MarkEndPrepare(const Slice& /*xid*/) override { return Status::OK(); } + + using WriteBatch::Handler::MarkNoop; + Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } + + using WriteBatch::Handler::MarkRollback; + Status MarkRollback(const Slice& /*xid*/) override { return Status::OK(); } + + using WriteBatch::Handler::MarkCommit; + Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); } + ROCKSDB_NAMESPACE::Env* env_; EnvOptions env_options_; std::unique_ptr trace_reader_; @@ -213,6 +254,9 @@ class TraceAnalyzer { uint64_t total_access_keys_; uint64_t total_gets_; uint64_t total_writes_; + uint64_t total_seeks_; + uint64_t total_seek_prevs_; + uint64_t total_multigets_; uint64_t trace_create_time_; uint64_t begin_time_; uint64_t end_time_; @@ -253,76 +297,9 @@ class TraceAnalyzer { Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats); Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit); Status MakeStatisticQPS(); - // Set the default trace file version as version 0.2 - int trace_file_version_; int db_version_; }; -// write bach handler to be used for WriteBache iterator -// when processing the write trace -class TraceWriteHandler : public WriteBatch::Handler { - public: - TraceWriteHandler() { ta_ptr = nullptr; } - explicit TraceWriteHandler(TraceAnalyzer* _ta_ptr) { ta_ptr = _ta_ptr; } - ~TraceWriteHandler() {} - - virtual Status PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { - return ta_ptr->HandlePut(column_family_id, key, value); - } - virtual Status DeleteCF(uint32_t column_family_id, - const Slice& key) override { - return ta_ptr->HandleDelete(column_family_id, key); - } - virtual Status SingleDeleteCF(uint32_t column_family_id, - const Slice& key) override { - return ta_ptr->HandleSingleDelete(column_family_id, key); - } - virtual Status DeleteRangeCF(uint32_t column_family_id, - const Slice& begin_key, - const Slice& end_key) override { - return ta_ptr->HandleDeleteRange(column_family_id, begin_key, end_key); - } - virtual Status MergeCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { - return ta_ptr->HandleMerge(column_family_id, key, value); - } - - // The following hanlders are not implemented, return Status::OK() to avoid - // the running time assertion and other irrelevant falures. - virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/, - const Slice& /*key*/, - const Slice& /*value*/) override { - return Status::OK(); - } - - // The default implementation of LogData does nothing. - virtual void LogData(const Slice& /*blob*/) override {} - - virtual Status MarkBeginPrepare(bool = false) override { - return Status::OK(); - } - - virtual Status MarkEndPrepare(const Slice& /*xid*/) override { - return Status::OK(); - } - - virtual Status MarkNoop(bool /*empty_batch*/) override { - return Status::OK(); - } - - virtual Status MarkRollback(const Slice& /*xid*/) override { - return Status::OK(); - } - - virtual Status MarkCommit(const Slice& /*xid*/) override { - return Status::OK(); - } - - private: - TraceAnalyzer* ta_ptr; -}; - int trace_analyzer_tool(int argc, char** argv); } // namespace ROCKSDB_NAMESPACE diff --git a/trace_replay/trace_replay.cc b/trace_replay/trace_replay.cc index 01867b9f4..89bea7870 100644 --- a/trace_replay/trace_replay.cc +++ b/trace_replay/trace_replay.cc @@ -126,222 +126,218 @@ bool TracerHelper::SetPayloadMap(uint64_t& payload_map, return old_state != payload_map; } -Status TracerHelper::DecodeWriteRecord(Trace* trace, int trace_file_version, +Status TracerHelper::DecodeTraceRecord(Trace* trace, int trace_file_version, std::unique_ptr* record) { assert(trace != nullptr); - assert(trace->type == kTraceWrite); if (record != nullptr) { record->reset(nullptr); } - PinnableSlice rep; - if (trace_file_version < 2) { - rep.PinSelf(trace->payload); - } else { - Slice buf(trace->payload); - GetFixed64(&buf, &trace->payload_map); - int64_t payload_map = static_cast(trace->payload_map); - Slice write_batch_data; - while (payload_map) { - // Find the rightmost set bit. - uint32_t set_pos = - static_cast(log2(payload_map & -payload_map)); - switch (set_pos) { - case TracePayloadType::kWriteBatchData: - GetLengthPrefixedSlice(&buf, &write_batch_data); - break; - default: - assert(false); + switch (trace->type) { + // Write + case kTraceWrite: { + PinnableSlice rep; + if (trace_file_version < 2) { + rep.PinSelf(trace->payload); + } else { + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + Slice write_batch_data; + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = + static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kWriteBatchData: { + GetLengthPrefixedSlice(&buf, &write_batch_data); + break; + } + default: { + assert(false); + } + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); + } + rep.PinSelf(write_batch_data); } - // unset the rightmost bit. - payload_map &= (payload_map - 1); - } - rep.PinSelf(write_batch_data); - } - - if (record != nullptr) { - record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts)); - } - - return Status::OK(); -} - -Status TracerHelper::DecodeGetRecord(Trace* trace, int trace_file_version, - std::unique_ptr* record) { - assert(trace != nullptr); - assert(trace->type == kTraceGet); - - if (record != nullptr) { - record->reset(nullptr); - } - uint32_t cf_id = 0; - Slice get_key; - - if (trace_file_version < 2) { - DecodeCFAndKey(trace->payload, &cf_id, &get_key); - } else { - Slice buf(trace->payload); - GetFixed64(&buf, &trace->payload_map); - int64_t payload_map = static_cast(trace->payload_map); - while (payload_map) { - // Find the rightmost set bit. - uint32_t set_pos = - static_cast(log2(payload_map & -payload_map)); - switch (set_pos) { - case TracePayloadType::kGetCFID: - GetFixed32(&buf, &cf_id); - break; - case TracePayloadType::kGetKey: - GetLengthPrefixedSlice(&buf, &get_key); - break; - default: - assert(false); + if (record != nullptr) { + record->reset(new WriteQueryTraceRecord(std::move(rep), trace->ts)); } - // unset the rightmost bit. - payload_map &= (payload_map - 1); - } - } - if (record != nullptr) { - PinnableSlice ps; - ps.PinSelf(get_key); - record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts)); - } - - return Status::OK(); -} + return Status::OK(); + } + // Get + case kTraceGet: { + uint32_t cf_id = 0; + Slice get_key; + + if (trace_file_version < 2) { + DecodeCFAndKey(trace->payload, &cf_id, &get_key); + } else { + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = + static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kGetCFID: { + GetFixed32(&buf, &cf_id); + break; + } + case TracePayloadType::kGetKey: { + GetLengthPrefixedSlice(&buf, &get_key); + break; + } + default: { + assert(false); + } + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); + } + } -Status TracerHelper::DecodeIterRecord(Trace* trace, int trace_file_version, - std::unique_ptr* record) { - assert(trace != nullptr); - assert(trace->type == kTraceIteratorSeek || - trace->type == kTraceIteratorSeekForPrev); + if (record != nullptr) { + PinnableSlice ps; + ps.PinSelf(get_key); + record->reset(new GetQueryTraceRecord(cf_id, std::move(ps), trace->ts)); + } - if (record != nullptr) { - record->reset(nullptr); - } + return Status::OK(); + } + // Iterator Seek and SeekForPrev + case kTraceIteratorSeek: + case kTraceIteratorSeekForPrev: { + uint32_t cf_id = 0; + Slice iter_key; + Slice lower_bound; + Slice upper_bound; + + if (trace_file_version < 2) { + DecodeCFAndKey(trace->payload, &cf_id, &iter_key); + } else { + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = + static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kIterCFID: { + GetFixed32(&buf, &cf_id); + break; + } + case TracePayloadType::kIterKey: { + GetLengthPrefixedSlice(&buf, &iter_key); + break; + } + case TracePayloadType::kIterLowerBound: { + GetLengthPrefixedSlice(&buf, &lower_bound); + break; + } + case TracePayloadType::kIterUpperBound: { + GetLengthPrefixedSlice(&buf, &upper_bound); + break; + } + default: { + assert(false); + } + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); + } + } - uint32_t cf_id = 0; - Slice iter_key; - Slice lower_bound; - Slice upper_bound; - - if (trace_file_version < 2) { - DecodeCFAndKey(trace->payload, &cf_id, &iter_key); - } else { - Slice buf(trace->payload); - GetFixed64(&buf, &trace->payload_map); - int64_t payload_map = static_cast(trace->payload_map); - while (payload_map) { - // Find the rightmost set bit. - uint32_t set_pos = - static_cast(log2(payload_map & -payload_map)); - switch (set_pos) { - case TracePayloadType::kIterCFID: - GetFixed32(&buf, &cf_id); - break; - case TracePayloadType::kIterKey: - GetLengthPrefixedSlice(&buf, &iter_key); - break; - case TracePayloadType::kIterLowerBound: - GetLengthPrefixedSlice(&buf, &lower_bound); - break; - case TracePayloadType::kIterUpperBound: - GetLengthPrefixedSlice(&buf, &upper_bound); - break; - default: - assert(false); + if (record != nullptr) { + PinnableSlice ps_key; + ps_key.PinSelf(iter_key); + PinnableSlice ps_lower; + ps_lower.PinSelf(lower_bound); + PinnableSlice ps_upper; + ps_upper.PinSelf(upper_bound); + record->reset(new IteratorSeekQueryTraceRecord( + static_cast(trace->type), + cf_id, std::move(ps_key), std::move(ps_lower), std::move(ps_upper), + trace->ts)); } - // unset the rightmost bit. - payload_map &= (payload_map - 1); - } - } - if (record != nullptr) { - PinnableSlice ps_key; - ps_key.PinSelf(iter_key); - PinnableSlice ps_lower; - ps_lower.PinSelf(lower_bound); - PinnableSlice ps_upper; - ps_upper.PinSelf(upper_bound); - record->reset(new IteratorSeekQueryTraceRecord( - static_cast(trace->type), cf_id, - std::move(ps_key), std::move(ps_lower), std::move(ps_upper), - trace->ts)); - } + return Status::OK(); + } + // MultiGet + case kTraceMultiGet: { + if (trace_file_version < 2) { + return Status::Corruption("MultiGet is not supported."); + } - return Status::OK(); -} + uint32_t multiget_size = 0; + std::vector cf_ids; + std::vector multiget_keys; + + Slice cfids_payload; + Slice keys_payload; + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = + static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kMultiGetSize: { + GetFixed32(&buf, &multiget_size); + break; + } + case TracePayloadType::kMultiGetCFIDs: { + GetLengthPrefixedSlice(&buf, &cfids_payload); + break; + } + case TracePayloadType::kMultiGetKeys: { + GetLengthPrefixedSlice(&buf, &keys_payload); + break; + } + default: { + assert(false); + } + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); + } + if (multiget_size == 0) { + return Status::InvalidArgument("Empty MultiGet cf_ids or keys."); + } -Status TracerHelper::DecodeMultiGetRecord( - Trace* trace, int trace_file_version, - std::unique_ptr* record) { - assert(trace != nullptr); - assert(trace->type == kTraceMultiGet); + // Decode the cfids_payload and keys_payload + cf_ids.reserve(multiget_size); + multiget_keys.reserve(multiget_size); + for (uint32_t i = 0; i < multiget_size; i++) { + uint32_t tmp_cfid; + Slice tmp_key; + GetFixed32(&cfids_payload, &tmp_cfid); + GetLengthPrefixedSlice(&keys_payload, &tmp_key); + cf_ids.push_back(tmp_cfid); + Slice s(tmp_key); + PinnableSlice ps; + ps.PinSelf(s); + multiget_keys.push_back(std::move(ps)); + } - if (record != nullptr) { - record->reset(nullptr); - } + if (record != nullptr) { + record->reset(new MultiGetQueryTraceRecord( + std::move(cf_ids), std::move(multiget_keys), trace->ts)); + } - if (trace_file_version < 2) { - return Status::Corruption("MultiGet is not supported."); - } - - uint32_t multiget_size = 0; - std::vector cf_ids; - std::vector multiget_keys; - - Slice cfids_payload; - Slice keys_payload; - Slice buf(trace->payload); - GetFixed64(&buf, &trace->payload_map); - int64_t payload_map = static_cast(trace->payload_map); - while (payload_map) { - // Find the rightmost set bit. - uint32_t set_pos = static_cast(log2(payload_map & -payload_map)); - switch (set_pos) { - case TracePayloadType::kMultiGetSize: - GetFixed32(&buf, &multiget_size); - break; - case TracePayloadType::kMultiGetCFIDs: - GetLengthPrefixedSlice(&buf, &cfids_payload); - break; - case TracePayloadType::kMultiGetKeys: - GetLengthPrefixedSlice(&buf, &keys_payload); - break; - default: - assert(false); + return Status::OK(); } - // unset the rightmost bit. - payload_map &= (payload_map - 1); + default: + return Status::NotSupported("Unsupported trace type."); } - if (multiget_size == 0) { - return Status::InvalidArgument("Empty MultiGet cf_ids or keys."); - } - - // Decode the cfids_payload and keys_payload - cf_ids.reserve(multiget_size); - multiget_keys.reserve(multiget_size); - for (uint32_t i = 0; i < multiget_size; i++) { - uint32_t tmp_cfid; - Slice tmp_key; - GetFixed32(&cfids_payload, &tmp_cfid); - GetLengthPrefixedSlice(&keys_payload, &tmp_key); - cf_ids.push_back(tmp_cfid); - Slice s(tmp_key); - PinnableSlice ps; - ps.PinSelf(s); - multiget_keys.push_back(std::move(ps)); - } - - if (record != nullptr) { - record->reset(new MultiGetQueryTraceRecord( - std::move(cf_ids), std::move(multiget_keys), trace->ts)); - } - - return Status::OK(); } Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options, diff --git a/trace_replay/trace_replay.h b/trace_replay/trace_replay.h index 979eb3492..4990accef 100644 --- a/trace_replay/trace_replay.h +++ b/trace_replay/trace_replay.h @@ -106,21 +106,12 @@ class TracerHelper { static bool SetPayloadMap(uint64_t& payload_map, const TracePayloadType payload_type); - // Decode the write payload and store in WrteiPayload - static Status DecodeWriteRecord(Trace* trace, int trace_file_version, + // Decode a Trace object into the corresponding TraceRecord. + // Return Status::OK() if nothing is wrong, record will be set accordingly. + // Return Status::NotSupported() if the trace type is not support, or the + // corresponding error status, record will be set to nullptr. + static Status DecodeTraceRecord(Trace* trace, int trace_file_version, std::unique_ptr* record); - - // Decode the get payload and store in WrteiPayload - static Status DecodeGetRecord(Trace* trace, int trace_file_version, - std::unique_ptr* record); - - // Decode the iter payload and store in WrteiPayload - static Status DecodeIterRecord(Trace* trace, int trace_file_version, - std::unique_ptr* record); - - // Decode the multiget payload and store in MultiGetPayload - static Status DecodeMultiGetRecord(Trace* trace, int trace_file_version, - std::unique_ptr* record); }; // Tracer captures all RocksDB operations using a user-provided TraceWriter. diff --git a/utilities/trace/replayer_impl.cc b/utilities/trace/replayer_impl.cc index c98155d53..09d94441e 100644 --- a/utilities/trace/replayer_impl.cc +++ b/utilities/trace/replayer_impl.cc @@ -70,7 +70,7 @@ Status ReplayerImpl::Next(std::unique_ptr* record) { return s; } - return DecodeTraceRecord(&trace, trace_file_version_, record); + return TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, record); } Status ReplayerImpl::Execute(const std::unique_ptr& record, @@ -117,7 +117,7 @@ Status ReplayerImpl::Replay( // In single-threaded replay, decode first then sleep. std::unique_ptr record; - s = DecodeTraceRecord(&trace, trace_file_version_, &record); + s = TracerHelper::DecodeTraceRecord(&trace, trace_file_version_, &record); if (!s.ok() && !s.IsNotSupported()) { break; } @@ -283,34 +283,14 @@ Status ReplayerImpl::ReadTrace(Trace* trace) { return TracerHelper::DecodeTrace(encoded_trace, trace); } -Status ReplayerImpl::DecodeTraceRecord(Trace* trace, int trace_file_version, - std::unique_ptr* record) { - switch (trace->type) { - case kTraceWrite: - return TracerHelper::DecodeWriteRecord(trace, trace_file_version, record); - case kTraceGet: - return TracerHelper::DecodeGetRecord(trace, trace_file_version, record); - case kTraceIteratorSeek: - case kTraceIteratorSeekForPrev: - return TracerHelper::DecodeIterRecord(trace, trace_file_version, record); - case kTraceMultiGet: - return TracerHelper::DecodeMultiGetRecord(trace, trace_file_version, - record); - case kTraceEnd: - return Status::Incomplete("Trace end."); - default: - return Status::NotSupported("Unsupported trace type."); - } -} - void ReplayerImpl::BackgroundWork(void* arg) { std::unique_ptr ra( reinterpret_cast(arg)); assert(ra != nullptr); std::unique_ptr record; - Status s = - DecodeTraceRecord(&(ra->trace_entry), ra->trace_file_version, &record); + Status s = TracerHelper::DecodeTraceRecord(&(ra->trace_entry), + ra->trace_file_version, &record); if (!s.ok()) { // Stop the replay if (ra->error_cb != nullptr) { diff --git a/utilities/trace/replayer_impl.h b/utilities/trace/replayer_impl.h index 9cf182960..3a61b2ecb 100644 --- a/utilities/trace/replayer_impl.h +++ b/utilities/trace/replayer_impl.h @@ -53,10 +53,6 @@ class ReplayerImpl : public Replayer { Status ReadFooter(Trace* footer); Status ReadTrace(Trace* trace); - // Generic function to convert a Trace to TraceRecord. - static Status DecodeTraceRecord(Trace* trace, int trace_file_version, - std::unique_ptr* record); - // Generic function to execute a Trace in a thread pool. static void BackgroundWork(void* arg);