diff --git a/Makefile b/Makefile index 4e555abf8..fb3c6e669 100644 --- a/Makefile +++ b/Makefile @@ -640,6 +640,7 @@ TESTS = \ timer_test \ db_with_timestamp_compaction_test \ testutil_test \ + io_tracer_test \ PARALLEL_TEST = \ backupable_db_test \ @@ -1896,6 +1897,8 @@ timer_test: util/timer_test.o $(LIBOBJECTS) $(TESTHARNESS) testutil_test: test_util/testutil_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +io_tracer_test: trace_replay/io_tracer_test.o trace_replay/io_tracer.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) #------------------------------------------------- # make install related stuff diff --git a/TARGETS b/TARGETS index b20fa774b..132e5ff53 100644 --- a/TARGETS +++ b/TARGETS @@ -294,6 +294,7 @@ cpp_library( "tools/ldb_tool.cc", "tools/sst_dump_tool.cc", "trace_replay/block_cache_tracer.cc", + "trace_replay/io_tracer.cc", "trace_replay/trace_replay.cc", "util/build_version.cc", "util/coding.cc", @@ -1155,6 +1156,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "io_tracer_test", + "trace_replay/io_tracer_test.cc", + "serial", + [], + [], + ], [ "iostats_context_test", "monitoring/iostats_context_test.cc", diff --git a/src.mk b/src.mk index 81c1039e4..218ef0ab6 100644 --- a/src.mk +++ b/src.mk @@ -176,6 +176,7 @@ LIB_SOURCES = \ tools/dump/db_dump_tool.cc \ trace_replay/trace_replay.cc \ trace_replay/block_cache_tracer.cc \ + trace_replay/io_tracer.cc \ util/build_version.cc \ util/coding.cc \ util/compaction_job_stats_impl.cc \ @@ -437,8 +438,9 @@ MAIN_SOURCES = \ tools/ldb_cmd_test.cc \ tools/reduce_levels_test.cc \ tools/sst_dump_test.cc \ - tools/trace_analyzer_test.cc \ + tools/trace_analyzer_test.cc \ trace_replay/block_cache_tracer_test.cc \ + trace_replay/io_tracer_test.cc \ util/autovector_test.cc \ util/bloom_test.cc \ util/coding_test.cc \ diff --git a/trace_replay/io_tracer.cc b/trace_replay/io_tracer.cc new file mode 100644 index 000000000..0a4218f85 --- /dev/null +++ b/trace_replay/io_tracer.cc @@ -0,0 +1,190 @@ +// Copyright (c) 2011-present, 
Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/io_tracer.h"
+
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "db/db_impl/db_impl.h"
+#include "db/dbformat.h"
+#include "rocksdb/slice.h"
+#include "util/coding.h"
+#include "util/hash.h"
+#include "util/string_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Stores `env` and a copy of `trace_options`, and takes ownership of
+// `trace_writer`, the sink every encoded trace is written to.
+IOTraceWriter::IOTraceWriter(Env* env, const TraceOptions& trace_options,
+                             std::unique_ptr<TraceWriter>&& trace_writer)
+    : env_(env),
+      trace_options_(trace_options),
+      trace_writer_(std::move(trace_writer)) {}
+
+// Encodes `record` as one trace and hands it to the trace writer.
+//
+// Payload layout, in order: length-prefixed file_operation, io_status and
+// file_name, followed by len and offset as fixed 64-bit integers.
+// IOTraceReader::ReadIOOp() decodes exactly this layout. record.file_size is
+// not serialized.
+//
+// Returns OK without writing anything once the trace file is larger than
+// trace_options_.max_trace_file_size; otherwise returns the status of the
+// underlying Write().
+Status IOTraceWriter::WriteIOOp(const IOTraceRecord& record) {
+  uint64_t trace_file_size = trace_writer_->GetFileSize();
+  if (trace_file_size > trace_options_.max_trace_file_size) {
+    return Status::OK();
+  }
+  Trace trace;
+  trace.ts = record.access_timestamp;
+  trace.type = record.trace_type;
+  Slice file_operation(record.file_operation);
+  PutLengthPrefixedSlice(&trace.payload, file_operation);
+  Slice io_status(record.io_status);
+  PutLengthPrefixedSlice(&trace.payload, io_status);
+  Slice file_name(record.file_name);
+  PutLengthPrefixedSlice(&trace.payload, file_name);
+  // TODO: add below options based on file_operation
+  // len is a size_t; store all of it. Appending it with push_back() kept only
+  // the lowest byte, so any length above 255 was recorded incorrectly.
+  PutFixed64(&trace.payload, static_cast<uint64_t>(record.len));
+  PutFixed64(&trace.payload, record.offset);
+  std::string encoded_trace;
+  TracerHelper::EncodeTrace(trace, &encoded_trace);
+  return trace_writer_->Write(encoded_trace);
+}
+
+// Writes the trace-file header: a kTraceBegin trace stamped with
+// env_->NowMicros() whose payload is the length-prefixed kTraceMagic followed
+// by kMajorVersion and kMinorVersion as fixed 32-bit integers.
+// Returns the status of the underlying Write().
+Status IOTraceWriter::WriteHeader() {
+  Trace trace;
+  trace.ts = env_->NowMicros();
+  trace.type = TraceType::kTraceBegin;
+  PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
+  PutFixed32(&trace.payload, kMajorVersion);
+  PutFixed32(&trace.payload, kMinorVersion);
+  std::string encoded_trace;
+  TracerHelper::EncodeTrace(trace, &encoded_trace);
+  return trace_writer_->Write(encoded_trace);
+}
+
+// Takes ownership of `reader`, the source encoded traces are read from.
+IOTraceReader::IOTraceReader(std::unique_ptr<TraceReader>&& reader)
+    : trace_reader_(std::move(reader)) {}
+
+// Reads the next trace and decodes it as the header written by
+// IOTraceWriter::WriteHeader().
+//
+// `header` must not be null. Its start_time is set as soon as the trace is
+// decoded; the version fields are set as they are parsed.
+// Returns the read/decode error if either step fails, and Corruption if the
+// magic number is missing or different from kTraceMagic, a version field
+// cannot be read, or bytes are left over after the minor version.
+Status IOTraceReader::ReadHeader(IOTraceHeader* header) {
+  assert(header != nullptr);
+  std::string encoded_trace;
+  Status s = trace_reader_->Read(&encoded_trace);
+  if (!s.ok()) {
+    return s;
+  }
+  Trace trace;
+  s = TracerHelper::DecodeTrace(encoded_trace, &trace);
+  if (!s.ok()) {
+    return s;
+  }
+  header->start_time = trace.ts;
+  Slice enc_slice = Slice(trace.payload);
+  Slice magic_number;
+  if (!GetLengthPrefixedSlice(&enc_slice, &magic_number)) {
+    return Status::Corruption(
+        "Corrupted header in the trace file: Failed to read the magic number.");
+  }
+  if (magic_number.ToString() != kTraceMagic) {
+    return Status::Corruption(
+        "Corrupted header in the trace file: Magic number does not match.");
+  }
+  if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) {
+    return Status::Corruption(
+        "Corrupted header in the trace file: Failed to read rocksdb major "
+        "version number.");
+  }
+  if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) {
+    return Status::Corruption(
+        "Corrupted header in the trace file: Failed to read rocksdb minor "
+        "version number.");
+  }
+  // We should have retrieved all information in the header.
+  if (!enc_slice.empty()) {
+    return Status::Corruption(
+        "Corrupted header in the trace file: The length of header is too "
+        "long.");
+  }
+  return Status::OK();
+}
+
+// Reads the next trace and decodes it into `record` using the layout written
+// by IOTraceWriter::WriteIOOp().
+//
+// `record` must not be null. Fields are filled in the order they are decoded,
+// so on failure the fields parsed before the error have already been updated.
+// Returns the read/decode error if either step fails, and Incomplete if the
+// payload ends before every field has been read.
+Status IOTraceReader::ReadIOOp(IOTraceRecord* record) {
+  assert(record);
+  std::string encoded_trace;
+  Status s = trace_reader_->Read(&encoded_trace);
+  if (!s.ok()) {
+    return s;
+  }
+  Trace trace;
+  s = TracerHelper::DecodeTrace(encoded_trace, &trace);
+  if (!s.ok()) {
+    return s;
+  }
+  record->access_timestamp = trace.ts;
+  record->trace_type = trace.type;
+  Slice enc_slice = Slice(trace.payload);
+
+  Slice file_operation;
+  if (!GetLengthPrefixedSlice(&enc_slice, &file_operation)) {
+    return Status::Incomplete(
+        "Incomplete access record: Failed to read file operation.");
+  }
+  record->file_operation = file_operation.ToString();
+  Slice io_status;
+  if (!GetLengthPrefixedSlice(&enc_slice, &io_status)) {
+    return Status::Incomplete(
+        "Incomplete access record: Failed to read IO status.");
+  }
+  record->io_status = io_status.ToString();
+  Slice file_name;
+  if (!GetLengthPrefixedSlice(&enc_slice, &file_name)) {
+    return Status::Incomplete(
+        "Incomplete access record: Failed to read file name.");
+  }
+  record->file_name = file_name.ToString();
+  // TODO: Read below options based on file_operation.
+  // Decode through a uint64_t temporary: size_t need not be the same type as
+  // uint64_t. GetFixed64() also fails cleanly on a truncated payload instead
+  // of indexing into an empty slice.
+  uint64_t len = 0;
+  if (!GetFixed64(&enc_slice, &len)) {
+    return Status::Incomplete(
+        "Incomplete access record: Failed to read length.");
+  }
+  record->len = static_cast<size_t>(len);
+  if (!GetFixed64(&enc_slice, &record->offset)) {
+    return Status::Incomplete(
+        "Incomplete access record: Failed to read offset.");
+  }
+  return Status::OK();
+}
+
+// Starts with tracing disabled and no writer installed.
+IOTracer::IOTracer() : tracing_enabled(false) { writer_.store(nullptr); }
+
+// Stops tracing, destroying the writer if one is still installed.
+IOTracer::~IOTracer() { EndIOTrace(); }
+
+// Installs a new IOTraceWriter built from the arguments, enables tracing and
+// writes the trace header, all while holding trace_writer_mutex_.
+//
+// Returns Busy if a writer is already installed; otherwise returns the status
+// of WriteHeader(). Note that the tracer stays enabled even when WriteHeader()
+// fails; the failure is only reported through the returned status.
+Status IOTracer::StartIOTrace(Env* env, const TraceOptions& trace_options,
+                              std::unique_ptr<TraceWriter>&& trace_writer) {
+  InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
+  if (writer_.load()) {
+    return Status::Busy();
+  }
+  trace_options_ = trace_options;
+  IOTraceWriter* new_writer =
+      new IOTraceWriter(env, trace_options, std::move(trace_writer));
+  writer_.store(new_writer);
+  tracing_enabled = true;
+  return new_writer->WriteHeader();
+}
+
+// Disables tracing and destroys the installed writer. Does nothing when no
+// writer is installed.
+void IOTracer::EndIOTrace() {
+  InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
+  IOTraceWriter* old_writer = writer_.load();
+  if (old_writer == nullptr) {
+    return;
+  }
+  // Unpublish before destroying so that writer_ never holds a dangling
+  // pointer, not even momentarily.
+  tracing_enabled = false;
+  writer_.store(nullptr);
+  delete old_writer;
+}
+
+// Forwards `record` to the installed writer. Returns OK without doing
+// anything when no writer is installed.
+Status IOTracer::WriteIOOp(const IOTraceRecord& record) {
+  // Unlocked check first so that the mutex is not taken while tracing is off.
+  if (!writer_.load()) {
+    return Status::OK();
+  }
+  InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
+  // Check again under the mutex: EndIOTrace() may have removed the writer
+  // between the check above and acquiring the lock.
+  IOTraceWriter* writer = writer_.load();
+  if (writer == nullptr) {
+    return Status::OK();
+  }
+  return writer->WriteIOOp(record);
+}
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/trace_replay/io_tracer.h b/trace_replay/io_tracer.h
new file mode 100644
index 000000000..56f2f526e
--- /dev/null
+++ b/trace_replay/io_tracer.h
@@ -0,0 +1,153 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "monitoring/instrumented_mutex.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "trace_replay/trace_replay.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Number of bytes occupied by one `char` in an encoded trace payload.
+// NOTE(review): an unnamed namespace in a header gives every translation unit
+// that includes it its own copy of this constant.
+namespace {
+const unsigned int kCharSize = 1;
+}  // namespace
+
+// One traced IO operation, as written by IOTraceWriter and read back by
+// IOTraceReader. Every field has a default, so a default-constructed record
+// can be filled in field by field.
+struct IOTraceRecord {
+  // Required fields for all accesses.
+  uint64_t access_timestamp = 0;
+  TraceType trace_type = TraceType::kTraceMax;
+  std::string file_operation;
+  std::string io_status;
+  // Required fields for read.
+  std::string file_name;
+  uint64_t offset = 0;
+  size_t len = 0;
+  uint64_t file_size = 0;
+
+  IOTraceRecord() {}
+
+  // Record for an operation identified by a file name only.
+  IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type,
+                const std::string& _file_operation,
+                const std::string& _io_status, const std::string& _file_name)
+      : access_timestamp(_access_timestamp),
+        trace_type(_trace_type),
+        file_operation(_file_operation),
+        io_status(_io_status),
+        file_name(_file_name) {}
+
+  // Record for an operation that also carries a file size.
+  IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type,
+                const std::string& _file_operation,
+                const std::string& _io_status, const std::string& _file_name,
+                const uint64_t& _file_size)
+      : access_timestamp(_access_timestamp),
+        trace_type(_trace_type),
+        file_operation(_file_operation),
+        io_status(_io_status),
+        file_name(_file_name),
+        file_size(_file_size) {}
+
+  // Record for an operation described by an offset and a length; file_name is
+  // left empty.
+  IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type,
+                const std::string& _file_operation,
+                const std::string& _io_status, const uint64_t& _offset = 0,
+                const uint64_t& _len = 0)
+      : access_timestamp(_access_timestamp),
+        trace_type(_trace_type),
+        file_operation(_file_operation),
+        io_status(_io_status),
+        offset(_offset),
+        // _len is 64-bit while len is a size_t; make the conversion explicit.
+        len(static_cast<size_t>(_len)) {}
+};
+
+// Metadata stored in the first trace of a trace file. The fields default to 0
+// so that a header whose read failed part-way never exposes indeterminate
+// values.
+struct IOTraceHeader {
+  // Timestamp of the header trace.
+  uint64_t start_time = 0;
+  uint32_t rocksdb_major_version = 0;
+  uint32_t rocksdb_minor_version = 0;
+};
+
+// IOTraceWriter writes IO operation as a single trace. 
Each trace will have a
+// timestamp and type, followed by the trace payload.
+// The writer owns the TraceWriter it is constructed with.
+class IOTraceWriter {
+ public:
+  IOTraceWriter(Env* env, const TraceOptions& trace_options,
+                std::unique_ptr<TraceWriter>&& trace_writer);
+  ~IOTraceWriter() = default;
+  // No copy and move.
+  IOTraceWriter(const IOTraceWriter&) = delete;
+  IOTraceWriter& operator=(const IOTraceWriter&) = delete;
+  IOTraceWriter(IOTraceWriter&&) = delete;
+  IOTraceWriter& operator=(IOTraceWriter&&) = delete;
+
+  // Write one IO operation record as a single trace.
+  Status WriteIOOp(const IOTraceRecord& record);
+
+  // Write a trace header at the beginning, typically on initiating a trace,
+  // with some metadata like a magic number and RocksDB version.
+  Status WriteHeader();
+
+ private:
+  Env* env_;
+  TraceOptions trace_options_;
+  std::unique_ptr<TraceWriter> trace_writer_;
+};
+
+// IOTraceReader helps read the trace file generated by IOTraceWriter.
+// The reader owns the TraceReader it is constructed with.
+class IOTraceReader {
+ public:
+  explicit IOTraceReader(std::unique_ptr<TraceReader>&& reader);
+  ~IOTraceReader() = default;
+  // No copy and move.
+  IOTraceReader(const IOTraceReader&) = delete;
+  IOTraceReader& operator=(const IOTraceReader&) = delete;
+  IOTraceReader(IOTraceReader&&) = delete;
+  IOTraceReader& operator=(IOTraceReader&&) = delete;
+
+  // Read the next trace as the trace-file header into `header`.
+  Status ReadHeader(IOTraceHeader* header);
+
+  // Read the next trace as an IO operation record into `record`.
+  Status ReadIOOp(IOTraceRecord* record);
+
+ private:
+  std::unique_ptr<TraceReader> trace_reader_;
+};
+
+// An IO tracer. It uses IOTraceWriter to write the access record to the
+// trace file.
+class IOTracer {
+ public:
+  IOTracer();
+  ~IOTracer();
+  // No copy and move.
+  IOTracer(const IOTracer&) = delete;
+  IOTracer& operator=(const IOTracer&) = delete;
+  IOTracer(IOTracer&&) = delete;
+  IOTracer& operator=(IOTracer&&) = delete;
+
+  // Start writing IO operations to the trace_writer.
+  Status StartIOTrace(Env* env, const TraceOptions& trace_options,
+                      std::unique_ptr<TraceWriter>&& trace_writer);
+
+  // Stop writing IO operations to the trace_writer.
+  void EndIOTrace();
+
+  // Cheap check of whether a trace is currently in progress. The flag is read
+  // without taking trace_writer_mutex_, so the answer may already be stale by
+  // the time the caller acts on it; WriteIOOp() re-checks under the mutex.
+  bool is_tracing_enabled() const {
+    return tracing_enabled.load(std::memory_order_relaxed);
+  }
+
+  // Write `record` to the trace file if a trace is in progress.
+  Status WriteIOOp(const IOTraceRecord& record);
+
+ private:
+  TraceOptions trace_options_;
+  // A mutex protects the writer_.
+  InstrumentedMutex trace_writer_mutex_;
+  std::atomic<IOTraceWriter*> writer_;
+  // tracing_enabled is added to avoid costly operation of checking atomic
+  // variable 'writer_' is nullptr or not in is_tracing_enabled().
+  // is_tracing_enabled() is invoked multiple times by FileSystem classes.
+  // It is written while holding trace_writer_mutex_ but read without it, so
+  // it must be atomic to avoid a data race; the read uses relaxed ordering
+  // to stay as cheap as a plain load.
+  std::atomic<bool> tracing_enabled;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/trace_replay/io_tracer_test.cc b/trace_replay/io_tracer_test.cc
new file mode 100644
index 000000000..3ec312eda
--- /dev/null
+++ b/trace_replay/io_tracer_test.cc
@@ -0,0 +1,207 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "trace_replay/io_tracer.h"
+
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+const std::string kDummyFile = "/dummy/file";
+
+}  // namespace
+
+// Fixture that creates a per-thread test directory and removes the trace file
+// and the directory again when the test finishes.
+class IOTracerTest : public testing::Test {
+ public:
+  IOTracerTest() {
+    test_path_ = test::PerThreadDBPath("io_tracer_test");
+    env_ = ROCKSDB_NAMESPACE::Env::Default();
+    EXPECT_OK(env_->CreateDir(test_path_));
+    trace_file_path_ = test_path_ + "/io_trace";
+  }
+
+  ~IOTracerTest() override {
+    EXPECT_OK(env_->DeleteFile(trace_file_path_));
+    EXPECT_OK(env_->DeleteDir(test_path_));
+  }
+
+  // Maps `id` onto one of four operation names, cycling with period 4.
+  std::string GetFileOperation(uint32_t id) {
+    // id % 4 is always in [0, 3], so `default` is exactly the case 3. Using
+    // it guarantees that every path returns a value.
+    switch (id % 4) {
+      case 0:
+        return "CreateDir";
+      case 1:
+        return "GetChildren";
+      case 2:
+        return "FileSize";
+      default:
+        return "DeleteDir";
+    }
+  }
+
+  // Writes records 0 .. nrecords-1, as produced by GenerateAccessRecord(), to
+  // `writer`.
+  void WriteIOOp(IOTraceWriter* writer, uint32_t nrecords) {
+    assert(writer);
+    for (uint32_t i = 0; i < nrecords; i++) {
+      ASSERT_OK(writer->WriteIOOp(GenerateAccessRecord(static_cast<int>(i))));
+    }
+  }
+
+  // Builds the i-th test record: operation name chosen by GetFileOperation(),
+  // OK status, file name kDummyFile + i, len i and offset i + 20.
+  IOTraceRecord GenerateAccessRecord(int i) {
+    IOTraceRecord record;
+    record.file_operation = GetFileOperation(static_cast<uint32_t>(i));
+    record.io_status = IOStatus::OK().ToString();
+    record.file_name = kDummyFile + std::to_string(i);
+    record.len = i;
+    record.offset = i + 20;
+    return record;
+  }
+
+  // Reads `nrecords` records from `reader` and checks that the i-th one
+  // matches what GenerateAccessRecord(i) produces.
+  void VerifyIOOp(IOTraceReader* reader, uint32_t nrecords) {
+    assert(reader);
+    for (uint32_t i = 0; i < nrecords; i++) {
+      IOTraceRecord record;
+      ASSERT_OK(reader->ReadIOOp(&record));
+      ASSERT_EQ(record.file_operation, GetFileOperation(i));
+      ASSERT_EQ(record.io_status, IOStatus::OK().ToString());
+      ASSERT_EQ(record.len, i);
+      ASSERT_EQ(record.offset, i + 20);
+      ASSERT_EQ(record.file_name, kDummyFile + std::to_string(i));
+    }
+  }
+
+  Env* env_;
+  EnvOptions env_options_;
+  std::string trace_file_path_;
+  std::string test_path_;
+};
+
+// A record written through IOTracer after StartIOTrace() can be read back,
+// preceded by a valid header, and nothing follows it.
+TEST_F(IOTracerTest, AtomicWrite) {
+  IOTraceRecord record = GenerateAccessRecord(0);
+  {
+    TraceOptions trace_opt;
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+                                 &trace_writer));
+    IOTracer writer;
+    ASSERT_OK(writer.StartIOTrace(env_, trace_opt, std::move(trace_writer)));
+    ASSERT_OK(writer.WriteIOOp(record));
+    ASSERT_OK(env_->FileExists(trace_file_path_));
+  }
+  {
+    // Verify trace file contains one record.
+    std::unique_ptr<TraceReader> trace_reader;
+    ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+                                 &trace_reader));
+    IOTraceReader reader(std::move(trace_reader));
+    IOTraceHeader header;
+    ASSERT_OK(reader.ReadHeader(&header));
+    ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
+    ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
+    VerifyIOOp(&reader, 1);
+    ASSERT_NOK(reader.ReadIOOp(&record));
+  }
+}
+
+// A record written before StartIOTrace() is dropped: the trace file exists
+// but not even a header can be read from it.
+TEST_F(IOTracerTest, AtomicWriteBeforeStartTrace) {
+  IOTraceRecord record = GenerateAccessRecord(0);
+  {
+    // The trace writer is created but never handed to the tracer.
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+                                 &trace_writer));
+    IOTracer writer;
+    // The record should not be written to the trace_file since StartIOTrace is
+    // not called.
+    ASSERT_OK(writer.WriteIOOp(record));
+    ASSERT_OK(env_->FileExists(trace_file_path_));
+  }
+  {
+    // Verify trace file contains nothing. 
+    std::unique_ptr<TraceReader> trace_reader;
+    ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+                                 &trace_reader));
+    IOTraceReader reader(std::move(trace_reader));
+    IOTraceHeader header;
+    ASSERT_NOK(reader.ReadHeader(&header));
+  }
+}
+
+// A record written after EndIOTrace() is dropped: only the record written
+// while tracing was active ends up in the trace file.
+TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) {
+  IOTraceRecord record = GenerateAccessRecord(0);
+  {
+    TraceOptions trace_opt;
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+                                 &trace_writer));
+    IOTracer writer;
+    ASSERT_OK(writer.StartIOTrace(env_, trace_opt, std::move(trace_writer)));
+    ASSERT_OK(writer.WriteIOOp(record));
+    writer.EndIOTrace();
+    // Write the record again. This time the record should not be written since
+    // EndIOTrace is called.
+    ASSERT_OK(writer.WriteIOOp(record));
+    ASSERT_OK(env_->FileExists(trace_file_path_));
+  }
+  {
+    // Verify trace file contains one record.
+    std::unique_ptr<TraceReader> trace_reader;
+    ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+                                 &trace_reader));
+    IOTraceReader reader(std::move(trace_reader));
+    IOTraceHeader header;
+    ASSERT_OK(reader.ReadHeader(&header));
+    ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
+    ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
+    VerifyIOOp(&reader, 1);
+    ASSERT_NOK(reader.ReadIOOp(&record));
+  }
+}
+
+// Ten records written directly through IOTraceWriter are read back in order,
+// and reading an eleventh fails.
+TEST_F(IOTracerTest, AtomicMultipleWrites) {
+  {
+    TraceOptions trace_opt;
+    std::unique_ptr<TraceWriter> trace_writer;
+    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+                                 &trace_writer));
+    IOTraceWriter writer(env_, trace_opt, std::move(trace_writer));
+    ASSERT_OK(writer.WriteHeader());
+    // Write 10 records
+    WriteIOOp(&writer, 10);
+    ASSERT_OK(env_->FileExists(trace_file_path_));
+  }
+
+  {
+    // Verify trace file is generated correctly.
+    std::unique_ptr<TraceReader> trace_reader;
+    ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+                                 &trace_reader));
+    IOTraceReader reader(std::move(trace_reader));
+    IOTraceHeader header;
+    ASSERT_OK(reader.ReadHeader(&header));
+    ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
+    ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
+    // Read 10 records.
+    VerifyIOOp(&reader, 10);
+    // Read one more record; it should report an error.
+    IOTraceRecord record;
+    ASSERT_NOK(reader.ReadIOOp(&record));
+  }
+}
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}