From b0fd1cc45a99384b92f781511075333656592844 Mon Sep 17 00:00:00 2001 From: Zhichao Cao Date: Thu, 18 Feb 2021 23:03:49 -0800 Subject: [PATCH] Introduce a new trace file format (v 0.2) for better extension (#7977) Summary: The encoding of the trace file record and payload is fixed, so any change requires complex backward compatibility handling. This PR introduces a new trace file format, which makes it easier to add new entries to the payload and does not have backward compatibility issues. V 0.1 is still supported in this PR. Added the tracing of lower_bound and upper_bound for iterators. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7977 Test Plan: make check. Tested with an old trace file in replay and analysis. Reviewed By: anand1976 Differential Revision: D26529948 Pulled By: zhichao-cao fbshipit-source-id: ebb75a127ce3c07c25a1ccc194c551f917896a76 --- HISTORY.md | 1 + db/db_impl/db_impl.cc | 13 +- db/db_impl/db_impl.h | 7 +- db/db_iter.cc | 29 ++- tools/trace_analyzer_test.cc | 4 + tools/trace_analyzer_tool.cc | 62 ++++-- tools/trace_analyzer_tool.h | 3 + trace_replay/trace_replay.cc | 383 +++++++++++++++++++++++++++++------ trace_replay/trace_replay.h | 73 ++++++- 9 files changed, 481 insertions(+), 94 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 871ae2ccc..1730585d8 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Behavior Changes * When retryable IO error occurs during compaction, it is mapped to soft error and set the BG error. However, auto resume is not called to clean the soft error since compaction will reschedule by itself. In this change, When retryable IO error occurs during compaction, BG error is not set. User will be informed the error via EventHelper. +* Introduce a new trace file format for query tracing and replay, and bump the trace file version up to 0.2. A payload map is added as the first portion of the payload. We will not have backward compatibility issues when adding new entries to trace records. 
Added the iterator_upper_bound and iterator_lower_bound in Seek and SeekForPrev tracing function. Added them as the new payload member for iterator tracing. ### New Features * Add support for key-value integrity protection in live updates from the user buffers provided to `WriteBatch` through the write to RocksDB's in-memory update buffer (memtable). This is intended to detect some cases of in-memory data corruption, due to either software or hardware errors. Users can enable protection by constructing their `WriteBatch` with `protection_bytes_per_key == 8`. diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 5a0792ee5..19b682b2c 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -4975,24 +4975,27 @@ Status DBImpl::EndBlockCacheTrace() { return Status::OK(); } -Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) { +Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, + const Slice upper_bound) { Status s; if (tracer_) { InstrumentedMutexLock lock(&trace_mutex_); if (tracer_) { - s = tracer_->IteratorSeek(cf_id, key); + s = tracer_->IteratorSeek(cf_id, key, lower_bound, upper_bound); } } return s; } -Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, - const Slice& key) { +Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, + const Slice upper_bound) { Status s; if (tracer_) { InstrumentedMutexLock lock(&trace_mutex_); if (tracer_) { - s = tracer_->IteratorSeekForPrev(cf_id, key); + s = tracer_->IteratorSeekForPrev(cf_id, key, lower_bound, upper_bound); } } return s; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 470135612..1973eceb2 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -592,8 +592,11 @@ class DBImpl : public DB { bool* found_record_for_key, bool* is_blob_index = nullptr); - Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key); - Status 
TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key); + Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, const Slice upper_bound); + Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, + const Slice upper_bound); #endif // ROCKSDB_LITE // Similar to GetSnapshot(), but also lets the db know that this snapshot diff --git a/db/db_iter.cc b/db/db_iter.cc index dc67aa309..f25fd8b2c 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -1257,7 +1257,19 @@ void DBIter::Seek(const Slice& target) { #ifndef ROCKSDB_LITE if (db_impl_ != nullptr && cfd_ != nullptr) { // TODO: What do we do if this returns an error? - db_impl_->TraceIteratorSeek(cfd_->GetID(), target).PermitUncheckedError(); + Slice lower_bound, upper_bound; + if (iterate_lower_bound_ != nullptr) { + lower_bound = *iterate_lower_bound_; + } else { + lower_bound = Slice(""); + } + if (iterate_upper_bound_ != nullptr) { + upper_bound = *iterate_upper_bound_; + } else { + upper_bound = Slice(""); + } + db_impl_->TraceIteratorSeek(cfd_->GetID(), target, lower_bound, upper_bound) + .PermitUncheckedError(); } #endif // ROCKSDB_LITE @@ -1319,7 +1331,20 @@ void DBIter::SeekForPrev(const Slice& target) { #ifndef ROCKSDB_LITE if (db_impl_ != nullptr && cfd_ != nullptr) { // TODO: What do we do if this returns an error? 
- db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target) + Slice lower_bound, upper_bound; + if (iterate_lower_bound_ != nullptr) { + lower_bound = *iterate_lower_bound_; + } else { + lower_bound = Slice(""); + } + if (iterate_upper_bound_ != nullptr) { + upper_bound = *iterate_upper_bound_; + } else { + upper_bound = Slice(""); + } + db_impl_ + ->TraceIteratorSeekForPrev(cfd_->GetID(), target, lower_bound, + upper_bound) .PermitUncheckedError(); } #endif // ROCKSDB_LITE diff --git a/tools/trace_analyzer_test.cc b/tools/trace_analyzer_test.cc index dc41e9069..d82d438c6 100644 --- a/tools/trace_analyzer_test.cc +++ b/tools/trace_analyzer_test.cc @@ -57,7 +57,11 @@ class TraceAnalyzerTest : public testing::Test { Options options; options.create_if_missing = true; options.merge_operator = MergeOperators::CreatePutOperator(); + Slice upper_bound("a"); + Slice lower_bound("abce"); ReadOptions ro; + ro.iterate_upper_bound = &upper_bound; + ro.iterate_lower_bound = &lower_bound; WriteOptions wo; TraceOptions trace_opt; DB* db_ = nullptr; diff --git a/tools/trace_analyzer_tool.cc b/tools/trace_analyzer_tool.cc index 9f9a36636..a19947af2 100644 --- a/tools/trace_analyzer_tool.cc +++ b/tools/trace_analyzer_tool.cc @@ -283,6 +283,8 @@ TraceAnalyzer::TraceAnalyzer(std::string& trace_path, std::string& output_path, end_time_ = 0; time_series_start_ = 0; cur_time_sec_ = 0; + // Set the default trace file version as version 0.2 + trace_file_version_ = 2; if (FLAGS_sample_ratio > 1.0 || FLAGS_sample_ratio <= 0) { sample_max_ = 1; } else { @@ -389,10 +391,15 @@ Status TraceAnalyzer::PrepareProcessing() { Status TraceAnalyzer::ReadTraceHeader(Trace* header) { assert(header != nullptr); - Status s = ReadTraceRecord(header); + std::string encoded_trace; + // Read the trace head + Status s = trace_reader_->Read(&encoded_trace); if (!s.ok()) { return s; } + + s = TracerHelper::DecodeTrace(encoded_trace, header); + if (header->type != kTraceBegin) { return 
Status::Corruption("Corrupted trace file. Incorrect header."); } @@ -422,13 +429,7 @@ Status TraceAnalyzer::ReadTraceRecord(Trace* trace) { if (!s.ok()) { return s; } - - Slice enc_slice = Slice(encoded_trace); - GetFixed64(&enc_slice, &trace->ts); - trace->type = static_cast(enc_slice[0]); - enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize); - trace->payload = enc_slice.ToString(); - return s; + return TracerHelper::DecodeTrace(encoded_trace, trace); } // process the trace itself and redirect the trace content @@ -442,6 +443,11 @@ Status TraceAnalyzer::StartProcessing() { fprintf(stderr, "Cannot read the header\n"); return s; } + s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, + &db_version_); + if (!s.ok()) { + return s; + } trace_create_time_ = header.ts; if (FLAGS_output_time_series) { time_series_start_ = header.ts; @@ -460,14 +466,22 @@ Status TraceAnalyzer::StartProcessing() { if (trace.type == kTraceWrite) { total_writes_++; c_time_ = trace.ts; - WriteBatch batch(trace.payload); - + Slice batch_data; + if (trace_file_version_ < 2) { + Slice tmp_data(trace.payload); + batch_data = tmp_data; + } else { + WritePayload w_payload; + TracerHelper::DecodeWritePayload(&trace, &w_payload); + batch_data = w_payload.write_batch_data; + } // Note that, if the write happens in a transaction, // 'Write' will be called twice, one for Prepare, one for // Commit. Thus, in the trace, for the same WriteBatch, there // will be two reords if it is in a transaction. Here, we only // process the reord that is committed. If write is non-transaction, // HasBeginPrepare()==false, so we process it normally. 
+ WriteBatch batch(batch_data.ToString()); if (batch.HasBeginPrepare() && !batch.HasCommit()) { continue; } @@ -478,22 +492,34 @@ Status TraceAnalyzer::StartProcessing() { return s; } } else if (trace.type == kTraceGet) { - uint32_t cf_id = 0; - Slice key; - DecodeCFAndKeyFromString(trace.payload, &cf_id, &key); + GetPayload get_payload; + get_payload.get_key = 0; + if (trace_file_version_ < 2) { + DecodeCFAndKeyFromString(trace.payload, &get_payload.cf_id, + &get_payload.get_key); + } else { + TracerHelper::DecodeGetPayload(&trace, &get_payload); + } total_gets_++; - s = HandleGet(cf_id, key.ToString(), trace.ts, 1); + s = HandleGet(get_payload.cf_id, get_payload.get_key.ToString(), trace.ts, + 1); if (!s.ok()) { fprintf(stderr, "Cannot process the get in the trace\n"); return s; } } else if (trace.type == kTraceIteratorSeek || trace.type == kTraceIteratorSeekForPrev) { - uint32_t cf_id = 0; - Slice key; - DecodeCFAndKeyFromString(trace.payload, &cf_id, &key); - s = HandleIter(cf_id, key.ToString(), trace.ts, trace.type); + IterPayload iter_payload; + iter_payload.cf_id = 0; + if (trace_file_version_ < 2) { + DecodeCFAndKeyFromString(trace.payload, &iter_payload.cf_id, + &iter_payload.iter_key); + } else { + TracerHelper::DecodeIterPayload(&trace, &iter_payload); + } + s = HandleIter(iter_payload.cf_id, iter_payload.iter_key.ToString(), + trace.ts, trace.type); if (!s.ok()) { fprintf(stderr, "Cannot process the iterator in the trace\n"); return s; diff --git a/tools/trace_analyzer_tool.h b/tools/trace_analyzer_tool.h index 82210c1c7..865296d79 100644 --- a/tools/trace_analyzer_tool.h +++ b/tools/trace_analyzer_tool.h @@ -249,6 +249,9 @@ class TraceAnalyzer { Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats); Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit); Status MakeStatisticQPS(); + // Set the default trace file version as version 0.2 + int trace_file_version_; + int db_version_; }; // write bach handler to be used for WriteBache 
iterator diff --git a/trace_replay/trace_replay.cc b/trace_replay/trace_replay.cc index 24344c75f..37b9a9416 100644 --- a/trace_replay/trace_replay.cc +++ b/trace_replay/trace_replay.cc @@ -25,11 +25,6 @@ namespace ROCKSDB_NAMESPACE { const std::string kTraceMagic = "feedcafedeadbeef"; namespace { -void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) { - PutFixed32(dst, cf_id); - PutLengthPrefixedSlice(dst, key); -} - void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) { Slice buf(buffer); GetFixed32(&buf, cf_id); @@ -37,6 +32,54 @@ void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) { } } // namespace +Status TracerHelper::ParseVersionStr(std::string& v_string, int* v_num) { + if (v_string.find_first_of('.') == std::string::npos || + v_string.find_first_of('.') != v_string.find_last_of('.')) { + return Status::Corruption( + "Corrupted trace file. Incorrect version format."); + } + int tmp_num = 0; + for (int i = 0; i < static_cast(v_string.size()); i++) { + if (v_string[i] == '.') { + continue; + } else if (isdigit(v_string[i])) { + tmp_num = tmp_num * 10 + (v_string[i] - '0'); + } else { + return Status::Corruption( + "Corrupted trace file. 
Incorrect version format"); + } + } + *v_num = tmp_num; + return Status::OK(); +} + +Status TracerHelper::ParseTraceHeader(const Trace& header, int* trace_version, + int* db_version) { + std::vector s_vec; + int begin = 0, end; + for (int i = 0; i < 3; i++) { + assert(header.payload.find("\t", begin) != std::string::npos); + end = static_cast(header.payload.find("\t", begin)); + s_vec.push_back(header.payload.substr(begin, end - begin)); + begin = end + 1; + } + + std::string t_v_str, db_v_str; + assert(s_vec.size() == 3); + assert(s_vec[1].find("Trace Version: ") != std::string::npos); + t_v_str = s_vec[1].substr(15); + assert(s_vec[2].find("RocksDB Version: ") != std::string::npos); + db_v_str = s_vec[2].substr(17); + + Status s; + s = ParseVersionStr(t_v_str, trace_version); + if (s != Status::OK()) { + return s; + } + s = ParseVersionStr(db_v_str, db_version); + return s; +} + void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) { assert(encoded_trace); PutFixed64(encoded_trace, trace.ts); @@ -61,6 +104,87 @@ Status TracerHelper::DecodeTrace(const std::string& encoded_trace, return Status::OK(); } +bool TracerHelper::SetPayloadMap(uint64_t& payload_map, + const TracePayloadType payload_type) { + uint64_t old_state = payload_map; + uint64_t tmp = 1; + payload_map |= (tmp << payload_type); + return old_state != payload_map; +} + +void TracerHelper::DecodeWritePayload(Trace* trace, + WritePayload* write_payload) { + assert(write_payload != nullptr); + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kWriteBatchData: + GetLengthPrefixedSlice(&buf, &(write_payload->write_batch_data)); + break; + default: + assert(false); + } + // unset the rightmost bit. 
+ payload_map &= (payload_map - 1); + } +} + +void TracerHelper::DecodeGetPayload(Trace* trace, GetPayload* get_payload) { + assert(get_payload != nullptr); + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kGetCFID: + GetFixed32(&buf, &(get_payload->cf_id)); + break; + case TracePayloadType::kGetKey: + GetLengthPrefixedSlice(&buf, &(get_payload->get_key)); + break; + default: + assert(false); + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); + } +} + +void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) { + assert(iter_payload != nullptr); + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kIterCFID: + GetFixed32(&buf, &(iter_payload->cf_id)); + break; + case TracePayloadType::kIterKey: + GetLengthPrefixedSlice(&buf, &(iter_payload->iter_key)); + break; + case TracePayloadType::kIterLowerBound: + GetLengthPrefixedSlice(&buf, &(iter_payload->lower_bound)); + break; + case TracePayloadType::kIterUpperBound: + GetLengthPrefixedSlice(&buf, &(iter_payload->upper_bound)); + break; + default: + assert(false); + } + // unset the rightmost bit. 
+ payload_map &= (payload_map - 1); + } +} + Tracer::Tracer(const std::shared_ptr& clock, const TraceOptions& trace_options, std::unique_ptr&& trace_writer) @@ -82,7 +206,10 @@ Status Tracer::Write(WriteBatch* write_batch) { Trace trace; trace.ts = clock_->NowMicros(); trace.type = trace_type; - trace.payload = write_batch->Data(); + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kWriteBatchData); + PutFixed64(&trace.payload, trace.payload_map); + PutLengthPrefixedSlice(&trace.payload, Slice(write_batch->Data())); return WriteTrace(trace); } @@ -94,11 +221,19 @@ Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { Trace trace; trace.ts = clock_->NowMicros(); trace.type = trace_type; - EncodeCFAndKey(&trace.payload, column_family->GetID(), key); + // Set the payloadmap of the struct member that will be encoded in the + // payload. + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetCFID); + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kGetKey); + // Encode the Get struct members into payload. Make sure add them in order. + PutFixed64(&trace.payload, trace.payload_map); + PutFixed32(&trace.payload, column_family->GetID()); + PutLengthPrefixedSlice(&trace.payload, key); return WriteTrace(trace); } -Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { +Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, const Slice upper_bound) { TraceType trace_type = kTraceIteratorSeek; if (ShouldSkipTrace(trace_type)) { return Status::OK(); @@ -106,11 +241,35 @@ Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { Trace trace; trace.ts = clock_->NowMicros(); trace.type = trace_type; - EncodeCFAndKey(&trace.payload, cf_id, key); + // Set the payloadmap of the struct member that will be encoded in the + // payload. 
+ TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID); + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey); + if (lower_bound.size() > 0) { + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kIterLowerBound); + } + if (upper_bound.size() > 0) { + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kIterUpperBound); + } + // Encode the Iterator struct members into payload. Make sure add them in + // order. + PutFixed64(&trace.payload, trace.payload_map); + PutFixed32(&trace.payload, cf_id); + PutLengthPrefixedSlice(&trace.payload, key); + if (lower_bound.size() > 0) { + PutLengthPrefixedSlice(&trace.payload, lower_bound); + } + if (upper_bound.size() > 0) { + PutLengthPrefixedSlice(&trace.payload, upper_bound); + } return WriteTrace(trace); } -Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { +Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, + const Slice upper_bound) { TraceType trace_type = kTraceIteratorSeekForPrev; if (ShouldSkipTrace(trace_type)) { return Status::OK(); @@ -118,7 +277,29 @@ Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { Trace trace; trace.ts = clock_->NowMicros(); trace.type = trace_type; - EncodeCFAndKey(&trace.payload, cf_id, key); + // Set the payloadmap of the struct member that will be encoded in the + // payload. + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterCFID); + TracerHelper::SetPayloadMap(trace.payload_map, TracePayloadType::kIterKey); + if (lower_bound.size() > 0) { + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kIterLowerBound); + } + if (upper_bound.size() > 0) { + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kIterUpperBound); + } + // Encode the Iterator struct members into payload. Make sure add them in + // order. 
+ PutFixed64(&trace.payload, trace.payload_map); + PutFixed32(&trace.payload, cf_id); + PutLengthPrefixedSlice(&trace.payload, key); + if (lower_bound.size() > 0) { + PutLengthPrefixedSlice(&trace.payload, lower_bound); + } + if (upper_bound.size() > 0) { + PutLengthPrefixedSlice(&trace.payload, upper_bound); + } return WriteTrace(trace); } @@ -148,7 +329,8 @@ bool Tracer::IsTraceFileOverMax() { Status Tracer::WriteHeader() { std::ostringstream s; s << kTraceMagic << "\t" - << "Trace Version: 0.1\t" + << "Trace Version: " << kTraceFileMajorVersion << "." + << kTraceFileMinorVersion << "\t" << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t" << "Format: Timestamp OpType Payload\n"; std::string header(s.str()); @@ -164,6 +346,8 @@ Status Tracer::WriteFooter() { Trace trace; trace.ts = clock_->NowMicros(); trace.type = kTraceEnd; + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kEmptyPayload); trace.payload = ""; return WriteTrace(trace); } @@ -204,10 +388,15 @@ Status Replayer::SetFastForward(uint32_t fast_forward) { Status Replayer::Replay() { Status s; Trace header; + int db_version; s = ReadHeader(&header); if (!s.ok()) { return s; } + s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version); + if (!s.ok()) { + return s; + } std::chrono::system_clock::time_point replay_epoch = std::chrono::system_clock::now(); @@ -227,55 +416,83 @@ Status Replayer::Replay() { replay_epoch + std::chrono::microseconds((trace.ts - header.ts) / fast_forward_)); if (trace.type == kTraceWrite) { - WriteBatch batch(trace.payload); - db_->Write(woptions, &batch); + if (trace_file_version_ < 2) { + WriteBatch batch(trace.payload); + db_->Write(woptions, &batch); + } else { + WritePayload w_payload; + TracerHelper::DecodeWritePayload(&trace, &w_payload); + WriteBatch batch(w_payload.write_batch_data.ToString()); + db_->Write(woptions, &batch); + } ops++; } else if (trace.type == kTraceGet) { - uint32_t cf_id = 0; - Slice key; - 
DecodeCFAndKey(trace.payload, &cf_id, &key); - if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + GetPayload get_payload; + get_payload.get_key = 0; + if (trace_file_version_ < 2) { + DecodeCFAndKey(trace.payload, &get_payload.cf_id, &get_payload.get_key); + } else { + TracerHelper::DecodeGetPayload(&trace, &get_payload); + } + if (get_payload.cf_id > 0 && + cf_map_.find(get_payload.cf_id) == cf_map_.end()) { return Status::Corruption("Invalid Column Family ID."); } std::string value; - if (cf_id == 0) { - db_->Get(roptions, key, &value); + if (get_payload.cf_id == 0) { + db_->Get(roptions, get_payload.get_key, &value); } else { - db_->Get(roptions, cf_map_[cf_id], key, &value); + db_->Get(roptions, cf_map_[get_payload.cf_id], get_payload.get_key, + &value); } ops++; } else if (trace.type == kTraceIteratorSeek) { - uint32_t cf_id = 0; - Slice key; - DecodeCFAndKey(trace.payload, &cf_id, &key); - if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + // Currently, we only support to call Seek. The Next() and Prev() is not + // supported. 
+ IterPayload iter_payload; + iter_payload.cf_id = 0; + if (trace_file_version_ < 2) { + DecodeCFAndKey(trace.payload, &iter_payload.cf_id, + &iter_payload.iter_key); + } else { + TracerHelper::DecodeIterPayload(&trace, &iter_payload); + } + if (iter_payload.cf_id > 0 && + cf_map_.find(iter_payload.cf_id) == cf_map_.end()) { return Status::Corruption("Invalid Column Family ID."); } - if (cf_id == 0) { + if (iter_payload.cf_id == 0) { single_iter = db_->NewIterator(roptions); } else { - single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); + single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]); } - single_iter->Seek(key); + single_iter->Seek(iter_payload.iter_key); ops++; delete single_iter; } else if (trace.type == kTraceIteratorSeekForPrev) { - // Currently, only support to call the Seek() - uint32_t cf_id = 0; - Slice key; - DecodeCFAndKey(trace.payload, &cf_id, &key); - if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) { + // Currently, we only support to call SeekForPrev. The Next() and Prev() + // is not supported. 
+ IterPayload iter_payload; + iter_payload.cf_id = 0; + if (trace_file_version_ < 2) { + DecodeCFAndKey(trace.payload, &iter_payload.cf_id, + &iter_payload.iter_key); + } else { + TracerHelper::DecodeIterPayload(&trace, &iter_payload); + } + if (iter_payload.cf_id > 0 && + cf_map_.find(iter_payload.cf_id) == cf_map_.end()) { return Status::Corruption("Invalid Column Family ID."); } - if (cf_id == 0) { + if (iter_payload.cf_id == 0) { single_iter = db_->NewIterator(roptions); } else { - single_iter = db_->NewIterator(roptions, cf_map_[cf_id]); + single_iter = db_->NewIterator(roptions, cf_map_[iter_payload.cf_id]); } - single_iter->SeekForPrev(key); + single_iter->SeekForPrev(iter_payload.iter_key); ops++; delete single_iter; } else if (trace.type == kTraceEnd) { @@ -302,11 +519,15 @@ Status Replayer::Replay() { Status Replayer::MultiThreadReplay(uint32_t threads_num) { Status s; Trace header; + int db_version; s = ReadHeader(&header); if (!s.ok()) { return s; } - + s = TracerHelper::ParseTraceHeader(header, &trace_file_version_, &db_version); + if (!s.ok()) { + return s; + } ThreadPoolImpl thread_pool; thread_pool.SetHostEnv(env_); @@ -331,6 +552,7 @@ Status Replayer::MultiThreadReplay(uint32_t threads_num) { ra->cf_map = &cf_map_; ra->woptions = woptions; ra->roptions = roptions; + ra->trace_file_version = trace_file_version_; std::this_thread::sleep_until( replay_epoch + std::chrono::microseconds( @@ -374,10 +596,15 @@ Status Replayer::MultiThreadReplay(uint32_t threads_num) { Status Replayer::ReadHeader(Trace* header) { assert(header != nullptr); - Status s = ReadTrace(header); + std::string encoded_trace; + // Read the trace head + Status s = trace_reader_->Read(&encoded_trace); if (!s.ok()) { return s; } + + s = TracerHelper::DecodeTrace(encoded_trace, header); + if (header->type != kTraceBegin) { return Status::Corruption("Corrupted trace file. 
Incorrect header."); } @@ -418,20 +645,26 @@ void Replayer::BGWorkGet(void* arg) { assert(ra != nullptr); auto cf_map = static_cast*>( ra->cf_map); - uint32_t cf_id = 0; - Slice key; - DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key); - if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) { + GetPayload get_payload; + get_payload.cf_id = 0; + if (ra->trace_file_version < 2) { + DecodeCFAndKey(ra->trace_entry.payload, &get_payload.cf_id, + &get_payload.get_key); + } else { + TracerHelper::DecodeGetPayload(&(ra->trace_entry), &get_payload); + } + if (get_payload.cf_id > 0 && + cf_map->find(get_payload.cf_id) == cf_map->end()) { return; } std::string value; - if (cf_id == 0) { - ra->db->Get(ra->roptions, key, &value); + if (get_payload.cf_id == 0) { + ra->db->Get(ra->roptions, get_payload.get_key, &value); } else { - ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value); + ra->db->Get(ra->roptions, (*cf_map)[get_payload.cf_id], get_payload.get_key, + &value); } - return; } @@ -439,8 +672,16 @@ void Replayer::BGWorkWriteBatch(void* arg) { std::unique_ptr ra( reinterpret_cast(arg)); assert(ra != nullptr); - WriteBatch batch(ra->trace_entry.payload); - ra->db->Write(ra->woptions, &batch); + + if (ra->trace_file_version < 2) { + WriteBatch batch(ra->trace_entry.payload); + ra->db->Write(ra->woptions, &batch); + } else { + WritePayload w_payload; + TracerHelper::DecodeWritePayload(&(ra->trace_entry), &w_payload); + WriteBatch batch(w_payload.write_batch_data.ToString()); + ra->db->Write(ra->woptions, &batch); + } return; } @@ -450,21 +691,28 @@ void Replayer::BGWorkIterSeek(void* arg) { assert(ra != nullptr); auto cf_map = static_cast*>( ra->cf_map); - uint32_t cf_id = 0; - Slice key; - DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key); - if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) { + IterPayload iter_payload; + iter_payload.cf_id = 0; + + if (ra->trace_file_version < 2) { + DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id, + 
&iter_payload.iter_key); + } else { + TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload); + } + if (iter_payload.cf_id > 0 && + cf_map->find(iter_payload.cf_id) == cf_map->end()) { return; } - std::string value; Iterator* single_iter = nullptr; - if (cf_id == 0) { + if (iter_payload.cf_id == 0) { single_iter = ra->db->NewIterator(ra->roptions); } else { - single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]); + single_iter = + ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]); } - single_iter->Seek(key); + single_iter->Seek(iter_payload.iter_key); delete single_iter; return; } @@ -475,21 +723,28 @@ void Replayer::BGWorkIterSeekForPrev(void* arg) { assert(ra != nullptr); auto cf_map = static_cast*>( ra->cf_map); - uint32_t cf_id = 0; - Slice key; - DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key); - if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) { + IterPayload iter_payload; + iter_payload.cf_id = 0; + + if (ra->trace_file_version < 2) { + DecodeCFAndKey(ra->trace_entry.payload, &iter_payload.cf_id, + &iter_payload.iter_key); + } else { + TracerHelper::DecodeIterPayload(&(ra->trace_entry), &iter_payload); + } + if (iter_payload.cf_id > 0 && + cf_map->find(iter_payload.cf_id) == cf_map->end()) { return; } - std::string value; Iterator* single_iter = nullptr; - if (cf_id == 0) { + if (iter_payload.cf_id == 0) { single_iter = ra->db->NewIterator(ra->roptions); } else { - single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]); + single_iter = + ra->db->NewIterator(ra->roptions, (*cf_map)[iter_payload.cf_id]); } - single_iter->SeekForPrev(key); + single_iter->SeekForPrev(iter_payload.iter_key); delete single_iter; return; } diff --git a/trace_replay/trace_replay.h b/trace_replay/trace_replay.h index a7f773a91..f0afcc45a 100644 --- a/trace_replay/trace_replay.h +++ b/trace_replay/trace_replay.h @@ -40,6 +40,9 @@ const unsigned int kTracePayloadLengthSize = 4; const unsigned int kTraceMetadataSize = 
kTraceTimestampSize + kTraceTypeSize + kTracePayloadLengthSize; +static const int kTraceFileMajorVersion = 0; +static const int kTraceFileMinorVersion = 2; + // Supported Trace types. enum TraceType : char { kTraceBegin = 1, @@ -67,22 +70,79 @@ enum TraceType : char { struct Trace { uint64_t ts; // timestamp TraceType type; + // Each bit in payload_map indicates whether the corresponding struct member + // is added in the payload. Each TraceType has its own payload struct. For + // example, if the kWriteBatchData bit is set in a write payload, then the + // write batch data is added. + uint64_t payload_map = 0; + // Each trace type has its own payload struct, which will be serialized in the + // payload. std::string payload; void reset() { ts = 0; type = kTraceMax; + payload_map = 0; payload.clear(); } }; +enum TracePayloadType : char { + // Each member of all query payload structs should have a corresponding flag + // here. Make sure to add them sequentially, in the order they are added. + kEmptyPayload = 0, + kWriteBatchData = 1, + kGetCFID = 2, + kGetKey = 3, + kIterCFID = 4, + kIterKey = 5, + kIterLowerBound = 6, + kIterUpperBound = 7, +}; + +struct WritePayload { + Slice write_batch_data; +}; + +struct GetPayload { + uint32_t cf_id; + Slice get_key; +}; + +struct IterPayload { + uint32_t cf_id; + Slice iter_key; + Slice lower_bound; + Slice upper_bound; +}; + class TracerHelper { public: - // Encode a trace object into the given string. + // Parse the string with major and minor version only + static Status ParseVersionStr(std::string& v_string, int* v_num); + + // Parse the trace file version and db version in trace header + static Status ParseTraceHeader(const Trace& header, int* trace_version, + int* db_version); + + // Encode a version 0.1 trace object into the given string. static void EncodeTrace(const Trace& trace, std::string* encoded_trace); // Decode a string into the given trace object. 
static Status DecodeTrace(const std::string& encoded_trace, Trace* trace); + + // Set the payload map based on the payload type + static bool SetPayloadMap(uint64_t& payload_map, + const TracePayloadType payload_type); + + // Decode the write payload and store in WritePayload + static void DecodeWritePayload(Trace* trace, WritePayload* write_payload); + + // Decode the get payload and store in GetPayload + static void DecodeGetPayload(Trace* trace, GetPayload* get_payload); + + // Decode the iter payload and store in IterPayload + static void DecodeIterPayload(Trace* trace, IterPayload* iter_payload); }; // Tracer captures all RocksDB operations using a user-provided TraceWriter. @@ -102,8 +162,10 @@ class Tracer { Status Get(ColumnFamilyHandle* cfname, const Slice& key); // Trace Iterators. - Status IteratorSeek(const uint32_t& cf_id, const Slice& key); - Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key); + Status IteratorSeek(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, const Slice upper_bound); + Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key, + const Slice& lower_bound, const Slice upper_bound); // Returns true if the trace is over the configured max trace file limit. // False otherwise. @@ -186,6 +248,10 @@ class Replayer { std::unique_ptr trace_reader_; std::unordered_map cf_map_; uint32_t fast_forward_; + // When reading the trace header, the trace file version can be parsed. + // Replayer will use a different decode method to get the trace content based + // on the trace file version. + int trace_file_version_; }; // The passin arg of MultiThreadRepkay for each trace record. @@ -195,6 +261,7 @@ struct ReplayerWorkerArg { std::unordered_map* cf_map; WriteOptions woptions; ReadOptions roptions; + int trace_file_version; }; } // namespace ROCKSDB_NAMESPACE