Add MultiGet to replay (#8577)

Summary:
When the trace contains the MultiGet record, with this PR, it can replay the MultiGet.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8577

Test Plan: make check and replay the real trace.

Reviewed By: anand1976

Differential Revision: D29864060

Pulled By: zhichao-cao

fbshipit-source-id: 5288d4fc9b6a3cb331de1e0c635d4e044dcb534a
main
Zhichao Cao 3 years ago committed by Facebook GitHub Bot
parent ddf439c561
commit eec79b39a6
  1. 51
      trace_replay/trace_replay.cc
  2. 4
      trace_replay/trace_replay.h

@ -607,6 +607,25 @@ Status Replayer::Replay() {
single_iter->SeekForPrev(iter_payload.iter_key); single_iter->SeekForPrev(iter_payload.iter_key);
ops++; ops++;
delete single_iter; delete single_iter;
} else if (trace.type == kTraceMultiGet) {
MultiGetPayload multiget_payload;
assert(trace_file_version_ >= 2);
TracerHelper::DecodeMultiGetPayload(&trace, &multiget_payload);
std::vector<ColumnFamilyHandle*> v_cfd;
std::vector<Slice> keys;
assert(multiget_payload.cf_ids.size() ==
multiget_payload.multiget_keys.size());
for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
assert(i < multiget_payload.cf_ids.size() &&
i < multiget_payload.multiget_keys.size());
if (cf_map_.find(multiget_payload.cf_ids[i]) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
v_cfd.push_back(cf_map_[multiget_payload.cf_ids[i]]);
keys.push_back(Slice(multiget_payload.multiget_keys[i]));
}
std::vector<std::string> values;
std::vector<Status> ss = db_->MultiGet(roptions, v_cfd, keys, &values);
} else if (trace.type == kTraceEnd) { } else if (trace.type == kTraceEnd) {
// Do nothing for now. // Do nothing for now.
// TODO: Add some validations later. // TODO: Add some validations later.
@ -685,6 +704,10 @@ Status Replayer::MultiThreadReplay(uint32_t threads_num) {
thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(), thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
nullptr, nullptr); nullptr, nullptr);
ops++; ops++;
} else if (ra->trace_entry.type == kTraceMultiGet) {
thread_pool.Schedule(&Replayer::BGWorkMultiGet, ra.release(), nullptr,
nullptr);
ops++;
} else if (ra->trace_entry.type == kTraceEnd) { } else if (ra->trace_entry.type == kTraceEnd) {
// Do nothing for now. // Do nothing for now.
// TODO: Add some validations later. // TODO: Add some validations later.
@ -861,4 +884,32 @@ void Replayer::BGWorkIterSeekForPrev(void* arg) {
return; return;
} }
void Replayer::BGWorkMultiGet(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
MultiGetPayload multiget_payload;
if (ra->trace_file_version < 2) {
return;
}
TracerHelper::DecodeMultiGetPayload(&(ra->trace_entry), &multiget_payload);
std::vector<ColumnFamilyHandle*> v_cfd;
std::vector<Slice> keys;
if (multiget_payload.cf_ids.size() != multiget_payload.multiget_keys.size()) {
return;
}
for (size_t i = 0; i < multiget_payload.cf_ids.size(); i++) {
if (cf_map->find(multiget_payload.cf_ids[i]) == cf_map->end()) {
return;
}
v_cfd.push_back((*cf_map)[multiget_payload.cf_ids[i]]);
keys.push_back(Slice(multiget_payload.multiget_keys[i]));
}
std::vector<std::string> values;
std::vector<Status> ss = ra->db->MultiGet(ra->roptions, v_cfd, keys, &values);
return;
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -268,6 +268,10 @@ class Replayer {
// (SeekForPrev) based on the trace records. // (SeekForPrev) based on the trace records.
static void BGWorkIterSeekForPrev(void* arg); static void BGWorkIterSeekForPrev(void* arg);
// The background function for MultiThreadReplay to execute MultiGet based on
// the trace records
static void BGWorkMultiGet(void* arg);
DBImpl* db_; DBImpl* db_;
Env* env_; Env* env_;
std::unique_ptr<TraceReader> trace_reader_; std::unique_ptr<TraceReader> trace_reader_;

Loading…
Cancel
Save