You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rocksdb/trace_replay/trace_replay.cc

498 lines
14 KiB

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "trace_replay/trace_replay.h"
#include <chrono>
#include <sstream>
#include <thread>
#include "db/db_impl/db_impl.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/string_util.h"
#include "util/threadpool_imp.h"
namespace ROCKSDB_NAMESPACE {
FIX #3278: Move global const object definitions from .h to .cc (#4691) Summary: Summary We should declare constants in headers and define them in source files. But this commit is only aimed at compound types. I don't know if it is necessary to do the same thing to fundamental types. I used this command to find all of the constant definitions in header files. `find . -name "*.h" | xargs grep -e "^const .*=.*"` And here is what I found: ``` ./db/version_edit.h:const uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF; ./include/rocksdb/env.h:const size_t kDefaultPageSize = 4 * 1024; ./include/rocksdb/statistics.h:const std::vector<std::pair<Tickers, std::string>> TickersNameMap = { ./include/rocksdb/statistics.h:const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = { ./include/rocksdb/table.h:const uint32_t kPlainTableVariableLength = 0; ./include/rocksdb/utilities/transaction_db.h:const uint32_t kInitialMaxDeadlocks = 5; ./port/port_posix.h:const uint32_t kMaxUint32 = std::numeric_limits<uint32_t>::max(); ./port/port_posix.h:const int kMaxInt32 = std::numeric_limits<int32_t>::max(); ./port/port_posix.h:const uint64_t kMaxUint64 = std::numeric_limits<uint64_t>::max(); ./port/port_posix.h:const int64_t kMaxInt64 = std::numeric_limits<int64_t>::max(); ./port/port_posix.h:const size_t kMaxSizet = std::numeric_limits<size_t>::max(); ./port/win/port_win.h:const uint32_t kMaxUint32 = UINT32_MAX; ./port/win/port_win.h:const int kMaxInt32 = INT32_MAX; ./port/win/port_win.h:const int64_t kMaxInt64 = INT64_MAX; ./port/win/port_win.h:const uint64_t kMaxUint64 = UINT64_MAX; ./port/win/port_win.h:const size_t kMaxSizet = UINT64_MAX; ./port/win/port_win.h:const size_t kMaxSizet = UINT_MAX; ./port/win/port_win.h:const uint32_t kMaxUint32 = std::numeric_limits<uint32_t>::max(); ./port/win/port_win.h:const int kMaxInt32 = std::numeric_limits<int>::max(); ./port/win/port_win.h:const uint64_t kMaxUint64 = std::numeric_limits<uint64_t>::max(); ./port/win/port_win.h:const int64_t kMaxInt64 = std::numeric_limits<int64_t>::max(); ./port/win/port_win.h:const size_t kMaxSizet = std::numeric_limits<size_t>::max(); ./port/win/port_win.h:const bool kLittleEndian = true; ./table/cuckoo_table_factory.h:const uint32_t kCuckooMurmurSeedMultiplier = 816922183; ./table/data_block_hash_index.h:const uint8_t kNoEntry = 255; ./table/data_block_hash_index.h:const uint8_t kCollision = 254; ./table/data_block_hash_index.h:const uint8_t kMaxRestartSupportedByHashIndex = 253; ./table/data_block_hash_index.h:const size_t kMaxBlockSizeSupportedByHashIndex = 1u << 16; ./table/data_block_hash_index.h:const double kDefaultUtilRatio = 0.75; ./table/filter_block.h:const uint64_t kNotValid = ULLONG_MAX; ./table/format.h:const int kMagicNumberLengthByte = 8; ./third-party/fbson/FbsonJsonParser.h:const char* const kJsonDelim = " ,]}\t\r\n"; ./third-party/fbson/FbsonJsonParser.h:const char* const kWhiteSpace = " \t\n\r"; ./third-party/gtest-1.7.0/fused-src/gtest/gtest.h:const BiggestInt kMaxBiggestInt = ./third-party/gtest-1.7.0/fused-src/gtest/gtest.h:const char kDeathTestStyleFlag[] = "death_test_style"; ./third-party/gtest-1.7.0/fused-src/gtest/gtest.h:const char kDeathTestUseFork[] = "death_test_use_fork"; ./third-party/gtest-1.7.0/fused-src/gtest/gtest.h:const char kInternalRunDeathTestFlag[] = "internal_run_death_test"; ./third-party/gtest-1.7.0/fused-src/gtest/gtest.h:const char* pets[] = {"cat", "dog"}; ./third-party/gtest-1.7.0/fused-src/gtest/gtest.h:const size_t kProtobufOneLinerMaxLength = 50; ./third-party/gtest-1.7.0/fused-src/gtest/gtest.h:const int kMaxStackTraceDepth = 100; ./third-party/gtest-1.7.0/fused-src/gtest/gtest.h:const T* WithParamInterface<T>::parameter_ = NULL; ./util/coding.h:const unsigned int kMaxVarint64Length = 10; ./util/filename.h:const size_t kFormatFileNumberBufSize = 38; ./util/testutil.h:const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined = -1); ./util/trace_replay.h:const std::string kTraceMagic = "feedcafedeadbeef"; ./util/trace_replay.h:const unsigned int kTraceTimestampSize = 8; ./util/trace_replay.h:const unsigned int kTraceTypeSize = 1; ./util/trace_replay.h:const unsigned int kTracePayloadLengthSize = 4; ./util/trace_replay.h:const unsigned int kTraceMetadataSize = ./utilities/cassandra/serialize.h:const int64_t kCharMask = 0xFFLL; ./utilities/cassandra/serialize.h:const int32_t kBitsPerByte = 8; ``` And these 3 lines are related to this commit: ``` ./include/rocksdb/statistics.h:const std::vector<std::pair<Tickers, std::string>> TickersNameMap = { ./include/rocksdb/statistics.h:const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = { ./util/trace_replay.h:const std::string kTraceMagic = "feedcafedeadbeef"; ``` Any comments would be appreciated. Thanks. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4691 Differential Revision: D13208049 Pulled By: ajkr fbshipit-source-id: e5ee55fdaec5447fc5798c6721e2821e7cdc0d5b
6 years ago
const std::string kTraceMagic = "feedcafedeadbeef";
namespace {
void EncodeCFAndKey(std::string* dst, uint32_t cf_id, const Slice& key) {
PutFixed32(dst, cf_id);
PutLengthPrefixedSlice(dst, key);
}
void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
Slice buf(buffer);
GetFixed32(&buf, cf_id);
GetLengthPrefixedSlice(&buf, key);
}
} // namespace
void TracerHelper::EncodeTrace(const Trace& trace, std::string* encoded_trace) {
assert(encoded_trace);
PutFixed64(encoded_trace, trace.ts);
encoded_trace->push_back(trace.type);
PutFixed32(encoded_trace, static_cast<uint32_t>(trace.payload.size()));
encoded_trace->append(trace.payload);
}
Status TracerHelper::DecodeTrace(const std::string& encoded_trace,
Trace* trace) {
assert(trace != nullptr);
Slice enc_slice = Slice(encoded_trace);
if (!GetFixed64(&enc_slice, &trace->ts)) {
return Status::Incomplete("Decode trace string failed");
}
if (enc_slice.size() < kTraceTypeSize + kTracePayloadLengthSize) {
return Status::Incomplete("Decode trace string failed");
}
trace->type = static_cast<TraceType>(enc_slice[0]);
enc_slice.remove_prefix(kTraceTypeSize + kTracePayloadLengthSize);
trace->payload = enc_slice.ToString();
return Status::OK();
}
Tracer::Tracer(const std::shared_ptr<SystemClock>& clock,
const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
: clock_(clock),
trace_options_(trace_options),
trace_writer_(std::move(trace_writer)),
trace_request_count_(0) {
// TODO: What if this fails?
WriteHeader().PermitUncheckedError();
}
Tracer::~Tracer() { trace_writer_.reset(); }
Status Tracer::Write(WriteBatch* write_batch) {
TraceType trace_type = kTraceWrite;
if (ShouldSkipTrace(trace_type)) {
return Status::OK();
}
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = trace_type;
trace.payload = write_batch->Data();
return WriteTrace(trace);
}
Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
TraceType trace_type = kTraceGet;
if (ShouldSkipTrace(trace_type)) {
return Status::OK();
}
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = trace_type;
EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
return WriteTrace(trace);
}
Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
TraceType trace_type = kTraceIteratorSeek;
if (ShouldSkipTrace(trace_type)) {
return Status::OK();
}
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = trace_type;
EncodeCFAndKey(&trace.payload, cf_id, key);
return WriteTrace(trace);
}
Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
TraceType trace_type = kTraceIteratorSeekForPrev;
if (ShouldSkipTrace(trace_type)) {
return Status::OK();
}
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = trace_type;
EncodeCFAndKey(&trace.payload, cf_id, key);
return WriteTrace(trace);
}
bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
if (IsTraceFileOverMax()) {
return true;
}
if ((trace_options_.filter & kTraceFilterGet
&& trace_type == kTraceGet)
|| (trace_options_.filter & kTraceFilterWrite
&& trace_type == kTraceWrite)) {
return true;
}
++trace_request_count_;
if (trace_request_count_ < trace_options_.sampling_frequency) {
return true;
}
trace_request_count_ = 0;
return false;
}
bool Tracer::IsTraceFileOverMax() {
uint64_t trace_file_size = trace_writer_->GetFileSize();
return (trace_file_size > trace_options_.max_trace_file_size);
}
Status Tracer::WriteHeader() {
std::ostringstream s;
s << kTraceMagic << "\t"
<< "Trace Version: 0.1\t"
<< "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
<< "Format: Timestamp OpType Payload\n";
std::string header(s.str());
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = kTraceBegin;
trace.payload = header;
return WriteTrace(trace);
}
Status Tracer::WriteFooter() {
Trace trace;
trace.ts = clock_->NowMicros();
trace.type = kTraceEnd;
trace.payload = "";
return WriteTrace(trace);
}
Status Tracer::WriteTrace(const Trace& trace) {
std::string encoded_trace;
TracerHelper::EncodeTrace(trace, &encoded_trace);
return trace_writer_->Write(Slice(encoded_trace));
}
Status Tracer::Close() { return WriteFooter(); }
Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
std::unique_ptr<TraceReader>&& reader)
: trace_reader_(std::move(reader)) {
assert(db != nullptr);
db_ = static_cast<DBImpl*>(db->GetRootDB());
env_ = Env::Default();
for (ColumnFamilyHandle* cfh : handles) {
cf_map_[cfh->GetID()] = cfh;
}
fast_forward_ = 1;
}
Replayer::~Replayer() { trace_reader_.reset(); }
Status Replayer::SetFastForward(uint32_t fast_forward) {
Status s;
if (fast_forward < 1) {
s = Status::InvalidArgument("Wrong fast forward speed!");
} else {
fast_forward_ = fast_forward;
s = Status::OK();
}
return s;
}
Status Replayer::Replay() {
Status s;
Trace header;
s = ReadHeader(&header);
if (!s.ok()) {
return s;
}
std::chrono::system_clock::time_point replay_epoch =
std::chrono::system_clock::now();
WriteOptions woptions;
ReadOptions roptions;
Trace trace;
uint64_t ops = 0;
Iterator* single_iter = nullptr;
while (s.ok()) {
trace.reset();
s = ReadTrace(&trace);
if (!s.ok()) {
break;
}
std::this_thread::sleep_until(
replay_epoch +
std::chrono::microseconds((trace.ts - header.ts) / fast_forward_));
if (trace.type == kTraceWrite) {
WriteBatch batch(trace.payload);
db_->Write(woptions, &batch);
ops++;
} else if (trace.type == kTraceGet) {
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(trace.payload, &cf_id, &key);
if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
std::string value;
if (cf_id == 0) {
db_->Get(roptions, key, &value);
} else {
db_->Get(roptions, cf_map_[cf_id], key, &value);
}
ops++;
} else if (trace.type == kTraceIteratorSeek) {
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(trace.payload, &cf_id, &key);
if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
if (cf_id == 0) {
single_iter = db_->NewIterator(roptions);
} else {
single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
}
single_iter->Seek(key);
ops++;
delete single_iter;
} else if (trace.type == kTraceIteratorSeekForPrev) {
// Currently, only support to call the Seek()
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(trace.payload, &cf_id, &key);
if (cf_id > 0 && cf_map_.find(cf_id) == cf_map_.end()) {
return Status::Corruption("Invalid Column Family ID.");
}
if (cf_id == 0) {
single_iter = db_->NewIterator(roptions);
} else {
single_iter = db_->NewIterator(roptions, cf_map_[cf_id]);
}
single_iter->SeekForPrev(key);
ops++;
delete single_iter;
} else if (trace.type == kTraceEnd) {
// Do nothing for now.
// TODO: Add some validations later.
break;
}
}
if (s.IsIncomplete()) {
// Reaching eof returns Incomplete status at the moment.
// Could happen when killing a process without calling EndTrace() API.
// TODO: Add better error handling.
return Status::OK();
}
return s;
}
// The trace can be replayed with multithread by configurnge the number of
// threads in the thread pool. Trace records are read from the trace file
// sequentially and the corresponding queries are scheduled in the task
// queue based on the timestamp. Currently, we support Write_batch (Put,
// Delete, SingleDelete, DeleteRange), Get, Iterator (Seek and SeekForPrev).
Status Replayer::MultiThreadReplay(uint32_t threads_num) {
Status s;
Trace header;
s = ReadHeader(&header);
if (!s.ok()) {
return s;
}
ThreadPoolImpl thread_pool;
thread_pool.SetHostEnv(env_);
if (threads_num > 1) {
thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
} else {
thread_pool.SetBackgroundThreads(1);
}
std::chrono::system_clock::time_point replay_epoch =
std::chrono::system_clock::now();
WriteOptions woptions;
ReadOptions roptions;
uint64_t ops = 0;
while (s.ok()) {
std::unique_ptr<ReplayerWorkerArg> ra(new ReplayerWorkerArg);
ra->db = db_;
s = ReadTrace(&(ra->trace_entry));
if (!s.ok()) {
break;
}
ra->cf_map = &cf_map_;
ra->woptions = woptions;
ra->roptions = roptions;
std::this_thread::sleep_until(
replay_epoch + std::chrono::microseconds(
(ra->trace_entry.ts - header.ts) / fast_forward_));
if (ra->trace_entry.type == kTraceWrite) {
thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra.release(), nullptr,
nullptr);
ops++;
} else if (ra->trace_entry.type == kTraceGet) {
thread_pool.Schedule(&Replayer::BGWorkGet, ra.release(), nullptr,
nullptr);
ops++;
} else if (ra->trace_entry.type == kTraceIteratorSeek) {
thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra.release(), nullptr,
nullptr);
ops++;
} else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra.release(),
nullptr, nullptr);
ops++;
} else if (ra->trace_entry.type == kTraceEnd) {
// Do nothing for now.
// TODO: Add some validations later.
break;
} else {
// Other trace entry types that are not implemented for replay.
// To finish the replay, we continue the process.
continue;
}
}
if (s.IsIncomplete()) {
// Reaching eof returns Incomplete status at the moment.
// Could happen when killing a process without calling EndTrace() API.
// TODO: Add better error handling.
s = Status::OK();
}
thread_pool.JoinAllThreads();
return s;
}
Status Replayer::ReadHeader(Trace* header) {
assert(header != nullptr);
Status s = ReadTrace(header);
if (!s.ok()) {
return s;
}
if (header->type != kTraceBegin) {
return Status::Corruption("Corrupted trace file. Incorrect header.");
}
if (header->payload.substr(0, kTraceMagic.length()) != kTraceMagic) {
return Status::Corruption("Corrupted trace file. Incorrect magic.");
}
return s;
}
Status Replayer::ReadFooter(Trace* footer) {
assert(footer != nullptr);
Status s = ReadTrace(footer);
if (!s.ok()) {
return s;
}
if (footer->type != kTraceEnd) {
return Status::Corruption("Corrupted trace file. Incorrect footer.");
}
// TODO: Add more validations later
return s;
}
Status Replayer::ReadTrace(Trace* trace) {
assert(trace != nullptr);
std::string encoded_trace;
Status s = trace_reader_->Read(&encoded_trace);
if (!s.ok()) {
return s;
}
return TracerHelper::DecodeTrace(encoded_trace, trace);
}
void Replayer::BGWorkGet(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
return;
}
std::string value;
if (cf_id == 0) {
ra->db->Get(ra->roptions, key, &value);
} else {
ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value);
}
return;
}
void Replayer::BGWorkWriteBatch(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
WriteBatch batch(ra->trace_entry.payload);
ra->db->Write(ra->woptions, &batch);
return;
}
void Replayer::BGWorkIterSeek(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
return;
}
std::string value;
Iterator* single_iter = nullptr;
if (cf_id == 0) {
single_iter = ra->db->NewIterator(ra->roptions);
} else {
single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
}
single_iter->Seek(key);
delete single_iter;
return;
}
void Replayer::BGWorkIterSeekForPrev(void* arg) {
std::unique_ptr<ReplayerWorkerArg> ra(
reinterpret_cast<ReplayerWorkerArg*>(arg));
assert(ra != nullptr);
auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
ra->cf_map);
uint32_t cf_id = 0;
Slice key;
DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
return;
}
std::string value;
Iterator* single_iter = nullptr;
if (cf_id == 0) {
single_iter = ra->db->NewIterator(ra->roptions);
} else {
single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
}
single_iter->SeekForPrev(key);
delete single_iter;
return;
}
} // namespace ROCKSDB_NAMESPACE