From 82a70e14703288c2c5799eedeca946d23fc918b6 Mon Sep 17 00:00:00 2001 From: Zhichao Cao Date: Fri, 18 Jun 2021 15:02:59 -0700 Subject: [PATCH] Trace MultiGet Keys and CF_IDs to the trace file (#8421) Summary: Tracing the MultiGet information including timestamp, keys, and CF_IDs to the trace file for analyzing and replay. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8421 Test Plan: make check, add test to trace_analyzer_test Reviewed By: anand1976 Differential Revision: D29221195 Pulled By: zhichao-cao fbshipit-source-id: 30c677d6c39ab31ef4bbdf7e0d1fa1fd79f295ff --- db/db_impl/db_impl.cc | 29 +++++++++ tools/trace_analyzer_test.cc | 20 ++++++- tools/trace_analyzer_tool.cc | 4 ++ trace_replay/trace_replay.cc | 112 +++++++++++++++++++++++++++++++++++ trace_replay/trace_replay.h | 26 ++++++++ 5 files changed, 190 insertions(+), 1 deletion(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 87d919ac2..a4af7daad 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1880,6 +1880,16 @@ std::vector DBImpl::MultiGet( } #endif // NDEBUG + if (tracer_) { + // TODO: This mutex should be removed later, to improve performance when + // tracing is enabled. + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + // TODO: maybe handle the tracing status? + tracer_->MultiGet(column_family, keys).PermitUncheckedError(); + } + } + SequenceNumber consistent_seqnum; std::unordered_map multiget_cf_data( @@ -2191,6 +2201,16 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, } #endif // NDEBUG + if (tracer_) { + // TODO: This mutex should be removed later, to improve performance when + // tracing is enabled. + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + // TODO: maybe handle the tracing status? + tracer_->MultiGet(num_keys, column_families, keys).PermitUncheckedError(); + } + } + autovector key_context; autovector sorted_keys; sorted_keys.resize(num_keys); @@ -2353,6 +2373,15 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const Slice* keys, PinnableSlice* values, std::string* timestamps, Status* statuses, const bool sorted_input) { + if (tracer_) { + // TODO: This mutex should be removed later, to improve performance when + // tracing is enabled. + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + // TODO: maybe handle the tracing status? + tracer_->MultiGet(num_keys, column_family, keys).PermitUncheckedError(); + } + } autovector key_context; autovector sorted_keys; sorted_keys.resize(num_keys); diff --git a/tools/trace_analyzer_test.cc b/tools/trace_analyzer_test.cc index e016ea591..3c89a9883 100644 --- a/tools/trace_analyzer_test.cc +++ b/tools/trace_analyzer_test.cc @@ -81,8 +81,26 @@ class TraceAnalyzerTest : public testing::Test { ASSERT_OK(batch.SingleDelete("d")); ASSERT_OK(batch.DeleteRange("e", "f")); ASSERT_OK(db_->Write(wo, &batch)); - + std::vector keys; + keys.push_back("a"); + keys.push_back("b"); + keys.push_back("df"); + keys.push_back("gege"); + keys.push_back("hjhjhj"); + std::vector values; + std::vector ss = db_->MultiGet(ro, keys, &values); + ASSERT_GE(ss.size(), 0); + ASSERT_OK(ss[0]); + ASSERT_NOK(ss[2]); + std::vector cfs(2, db_->DefaultColumnFamily()); + std::vector values2(keys.size()); + db_->MultiGet(ro, 2, cfs.data(), keys.data(), values2.data(), ss.data(), + false); + ASSERT_OK(ss[0]); + db_->MultiGet(ro, db_->DefaultColumnFamily(), 2, keys.data() + 3, + values2.data(), ss.data(), false); ASSERT_OK(db_->Get(ro, "a", &value)); + single_iter = db_->NewIterator(ro); single_iter->Seek("a"); ASSERT_OK(single_iter->status()); diff --git a/tools/trace_analyzer_tool.cc b/tools/trace_analyzer_tool.cc index 4d8b3b4a6..c80abdc02 100644 --- a/tools/trace_analyzer_tool.cc +++ b/tools/trace_analyzer_tool.cc @@ -524,6 +524,10 @@ Status TraceAnalyzer::StartProcessing() { fprintf(stderr, "Cannot process the iterator in the trace\n"); return s; } + } else if (trace.type == kTraceMultiGet) { + MultiGetPayload multiget_payload; + assert(trace_file_version_ >= 2); + TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload); } else if (trace.type == kTraceEnd) { break; } diff --git a/trace_replay/trace_replay.cc b/trace_replay/trace_replay.cc index 785e1fa56..5fd529568 100644 --- a/trace_replay/trace_replay.cc +++ b/trace_replay/trace_replay.cc @@ -185,6 +185,47 @@ void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) { } } +void TracerHelper::DecodeMultiGetPayload(Trace* trace, + MultiGetPayload* multiget_payload) { + assert(multiget_payload != nullptr); + Slice cfids_payload; + Slice keys_payload; + Slice buf(trace->payload); + GetFixed64(&buf, &trace->payload_map); + int64_t payload_map = static_cast(trace->payload_map); + while (payload_map) { + // Find the rightmost set bit. + uint32_t set_pos = static_cast(log2(payload_map & -payload_map)); + switch (set_pos) { + case TracePayloadType::kMultiGetSize: + GetFixed32(&buf, &(multiget_payload->multiget_size)); + break; + case TracePayloadType::kMultiGetCFIDs: + GetLengthPrefixedSlice(&buf, &cfids_payload); + break; + case TracePayloadType::kMultiGetKeys: + GetLengthPrefixedSlice(&buf, &keys_payload); + break; + default: + assert(false); + } + // unset the rightmost bit. + payload_map &= (payload_map - 1); + } + + // Decode the cfids_payload and keys_payload + multiget_payload->cf_ids.reserve(multiget_payload->multiget_size); + multiget_payload->multiget_keys.reserve(multiget_payload->multiget_size); + for (uint32_t i = 0; i < multiget_payload->multiget_size; i++) { + uint32_t tmp_cfid; + Slice tmp_key; + GetFixed32(&cfids_payload, &tmp_cfid); + GetLengthPrefixedSlice(&keys_payload, &tmp_key); + multiget_payload->cf_ids.push_back(tmp_cfid); + multiget_payload->multiget_keys.push_back(tmp_key.ToString()); + } +} + Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options, std::unique_ptr&& trace_writer) : clock_(clock), @@ -302,6 +343,77 @@ Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key, return WriteTrace(trace); } +Status Tracer::MultiGet(const size_t num_keys, + ColumnFamilyHandle** column_families, + const Slice* keys) { + if (num_keys == 0) { + return Status::OK(); + } + std::vector v_column_families; + std::vector v_keys; + v_column_families.resize(num_keys); + v_keys.resize(num_keys); + for (size_t i = 0; i < num_keys; i++) { + v_column_families[i] = column_families[i]; + v_keys[i] = keys[i]; + } + return MultiGet(v_column_families, v_keys); +} + +Status Tracer::MultiGet(const size_t num_keys, + ColumnFamilyHandle* column_family, const Slice* keys) { + if (num_keys == 0) { + return Status::OK(); + } + std::vector column_families; + std::vector v_keys; + column_families.resize(num_keys); + v_keys.resize(num_keys); + for (size_t i = 0; i < num_keys; i++) { + column_families[i] = column_family; + v_keys[i] = keys[i]; + } + return MultiGet(column_families, v_keys); +} + +Status Tracer::MultiGet(const std::vector& column_families, + const std::vector& keys) { + if (column_families.size() != keys.size()) { + return Status::Corruption("the CFs size and keys size does not match!"); + } + TraceType trace_type = kTraceMultiGet; + if (ShouldSkipTrace(trace_type)) { + return Status::OK(); + } + uint32_t multiget_size = static_cast(keys.size()); + Trace trace; + trace.ts = clock_->NowMicros(); + trace.type = trace_type; + // Set the payloadmap of the struct member that will be encoded in the + // payload. + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kMultiGetSize); + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kMultiGetCFIDs); + TracerHelper::SetPayloadMap(trace.payload_map, + TracePayloadType::kMultiGetKeys); + // Encode the CFIDs inorder + std::string cfids_payload; + std::string keys_payload; + for (uint32_t i = 0; i < multiget_size; i++) { + assert(i < column_families.size()); + assert(i < keys.size()); + PutFixed32(&cfids_payload, column_families[i]->GetID()); + PutLengthPrefixedSlice(&keys_payload, keys[i]); + } + // Encode the Get struct members into payload. Make sure add them in order. + PutFixed64(&trace.payload, trace.payload_map); + PutFixed32(&trace.payload, multiget_size); + PutLengthPrefixedSlice(&trace.payload, cfids_payload); + PutLengthPrefixedSlice(&trace.payload, keys_payload); + return WriteTrace(trace); +} + bool Tracer::ShouldSkipTrace(const TraceType& trace_type) { if (IsTraceFileOverMax()) { return true; diff --git a/trace_replay/trace_replay.h b/trace_replay/trace_replay.h index 3ae3e8e5b..d3ad2d799 100644 --- a/trace_replay/trace_replay.h +++ b/trace_replay/trace_replay.h @@ -59,6 +59,8 @@ enum TraceType : char { kBlockTraceRangeDeletionBlock = 11, // For IOTracing. kIOTracer = 12, + // For query tracing + kTraceMultiGet = 13, // All trace types should be added before kTraceMax kTraceMax, }; @@ -98,6 +100,9 @@ enum TracePayloadType : char { kIterKey = 5, kIterLowerBound = 6, kIterUpperBound = 7, + kMultiGetSize = 8, + kMultiGetCFIDs = 9, + kMultiGetKeys = 10, }; struct WritePayload { @@ -116,6 +121,12 @@ struct IterPayload { Slice upper_bound; }; +struct MultiGetPayload { + uint32_t multiget_size; + std::vector cf_ids; + std::vector multiget_keys; +}; + class TracerHelper { public: // Parse the string with major and minor version only @@ -143,6 +154,10 @@ class TracerHelper { // Decode the iter payload and store in WrteiPayload static void DecodeIterPayload(Trace* trace, IterPayload* iter_payload); + + // Decode the multiget payload and store in MultiGetPayload + static void DecodeMultiGetPayload(Trace* trace, + MultiGetPayload* multiget_payload); }; // Tracer captures all RocksDB operations using a user-provided TraceWriter. @@ -166,6 +181,17 @@ class Tracer { Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key, const Slice& lower_bound, const Slice upper_bound); + // Trace MultiGet + + Status MultiGet(const size_t num_keys, ColumnFamilyHandle** column_families, + const Slice* keys); + + Status MultiGet(const size_t num_keys, ColumnFamilyHandle* column_family, + const Slice* keys); + + Status MultiGet(const std::vector& column_family, + const std::vector& keys); + // Returns true if the trace is over the configured max trace file limit. // False otherwise. bool IsTraceFileOverMax();