Add support to start and end IOTracing through DB APIs (#7203)

Summary:
1. Add support to start io tracing through DB::StartIOTrace(Env*, const TraceOptions&, std::unique_ptr<TraceWriter>&&) and end tracing through DB::EndIOTrace(). This doesn't trace DB::Open.

User side code:

//Open DB
DB::Open(options, dbname, &db);

/* Start tracing */
db->StartIOTrace(env, trace_opt, std::move(trace_writer));

/* Perform Operations */

/*End tracing*/
db->EndIOTrace();

2. Fix the build errors for Windows.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7203

Test Plan: make check -j64

Reviewed By: anand1976

Differential Revision: D22901947

Pulled By: akankshamahajan15

fbshipit-source-id: e59c0b785a802168e6f1aa028d99c224a35cb30c
main
Akanksha Mahajan 4 years ago committed by Facebook GitHub Bot
parent 41c328fe57
commit 493f425e77
  1. 2
      CMakeLists.txt
  2. 15
      db/db_impl/db_impl.cc
  3. 9
      db/db_impl/db_impl.h
  4. 10
      env/composite_env_wrapper.h
  5. 3
      env/file_system_tracer.h
  6. 10
      include/rocksdb/db.h
  7. 7
      trace_replay/io_tracer.cc
  8. 2
      trace_replay/io_tracer.h
  9. 22
      trace_replay/io_tracer_test.cc

@ -617,6 +617,7 @@ set(SOURCES
env/env_encryption.cc
env/env_hdfs.cc
env/file_system.cc
env/file_system_tracer.cc
env/mock_env.cc
file/delete_scheduler.cc
file/file_prefetch_buffer.cc
@ -719,6 +720,7 @@ set(SOURCES
tools/trace_analyzer_tool.cc
trace_replay/trace_replay.cc
trace_replay/block_cache_tracer.cc
trace_replay/io_tracer.cc
util/coding.cc
util/compaction_job_stats_impl.cc
util/comparator.cc

@ -150,6 +150,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
own_info_log_(options.info_log == nullptr),
initial_db_options_(SanitizeOptions(dbname, options)),
env_(initial_db_options_.env),
io_tracer_(std::make_shared<IOTracer>()),
fs_(initial_db_options_.env->GetFileSystem()),
immutable_db_options_(initial_db_options_),
mutable_db_options_(initial_db_options_),
@ -3044,6 +3045,20 @@ FileSystem* DBImpl::GetFileSystem() const {
return immutable_db_options_.fs.get();
}
#ifndef ROCKSDB_LITE
Status DBImpl::StartIOTrace(Env* env, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
return io_tracer_->StartIOTrace(env, trace_options, std::move(trace_writer));
}
Status DBImpl::EndIOTrace() {
io_tracer_->EndIOTrace();
return Status::OK();
}
#endif // ROCKSDB_LITE
Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
InstrumentedMutexLock l(&mutex_);
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);

@ -55,6 +55,7 @@
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"
#include "trace_replay/block_cache_tracer.h"
#include "trace_replay/io_tracer.h"
#include "trace_replay/trace_replay.h"
#include "util/autovector.h"
#include "util/hash.h"
@ -445,6 +446,13 @@ class DBImpl : public DB {
using DB::EndBlockCacheTrace;
Status EndBlockCacheTrace() override;
using DB::StartIOTrace;
Status StartIOTrace(Env* env, const TraceOptions& options,
std::unique_ptr<TraceWriter>&& trace_writer) override;
using DB::EndIOTrace;
Status EndIOTrace() override;
using DB::GetPropertiesOfAllTables;
virtual Status GetPropertiesOfAllTables(
ColumnFamilyHandle* column_family,
@ -1003,6 +1011,7 @@ class DBImpl : public DB {
bool own_info_log_;
const DBOptions initial_db_options_;
Env* const env_;
std::shared_ptr<IOTracer> io_tracer_;
std::shared_ptr<FileSystem> fs_;
const ImmutableDBOptions immutable_db_options_;
MutableDBOptions mutable_db_options_;

@ -613,6 +613,11 @@ class CompositeEnvWrapper : public Env {
return file_system_->OptimizeForCompactionTableRead(
FileOptions(env_options), db_options);
}
// This seems to clash with a macro on Windows, so #undef it here
#ifdef GetFreeSpace
#undef GetFreeSpace
#endif
Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override {
IOOptions io_opts;
IODebugContext dbg;
@ -1089,6 +1094,11 @@ class LegacyFileSystemWrapper : public FileSystem {
const ImmutableDBOptions& db_options) const override {
return target_->OptimizeForCompactionTableRead(file_options, db_options);
}
// This seems to clash with a macro on Windows, so #undef it here
#ifdef GetFreeSpace
#undef GetFreeSpace
#endif
IOStatus GetFreeSpace(const std::string& path, const IOOptions& /*options*/,
uint64_t* diskfree, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetFreeSpace(path, diskfree));

@ -72,9 +72,6 @@ class FileSystemPtr {
fs_tracer_(
std::make_shared<FileSystemTracingWrapper>(fs_, io_tracer_)) {}
explicit FileSystemPtr(std::shared_ptr<FileSystem> fs)
: fs_(fs), io_tracer_(nullptr), fs_tracer_(nullptr) {}
std::shared_ptr<FileSystem> operator->() const {
if (io_tracer_ && io_tracer_->is_tracing_enabled()) {
return fs_tracer_;

@ -1587,6 +1587,16 @@ class DB {
return Status::NotSupported("EndTrace() is not implemented.");
}
// StartIOTrace and EndIOTrace are experimental. They are not enabled yet.
virtual Status StartIOTrace(Env* /*env*/, const TraceOptions& /*options*/,
std::unique_ptr<TraceWriter>&& /*trace_writer*/) {
return Status::NotSupported("StartTrace() is not implemented.");
}
virtual Status EndIOTrace() {
return Status::NotSupported("StartTrace() is not implemented.");
}
// Trace block cache accesses. Use EndBlockCacheTrace() to stop tracing.
virtual Status StartBlockCacheTrace(
const TraceOptions& /*options*/,

@ -52,7 +52,7 @@ Status IOTraceWriter::WriteIOOp(const IOTraceRecord& record) {
PutFixed64(&trace.payload, record.offset);
FALLTHROUGH_INTENDED;
case TraceType::kIOLen:
trace.payload.push_back(record.len);
PutFixed64(&trace.payload, record.len);
break;
default:
assert(false);
@ -177,13 +177,10 @@ Status IOTraceReader::ReadIOOp(IOTraceRecord* record) {
}
FALLTHROUGH_INTENDED;
case TraceType::kIOLen: {
if (enc_slice.empty()) {
if (!GetFixed64(&enc_slice, &record->len)) {
return Status::Incomplete(
"Incomplete access record: Failed to read length.");
}
record->len = static_cast<size_t>(enc_slice[0]);
const unsigned int kCharSize = 1;
enc_slice.remove_prefix(kCharSize);
break;
}
default:

@ -25,7 +25,7 @@ struct IOTraceRecord {
std::string io_status;
// Required fields for read.
std::string file_name;
size_t len = 0;
uint64_t len = 0;
uint64_t offset = 0;
uint64_t file_size = 0;

@ -31,7 +31,7 @@ class IOTracerTest : public testing::Test {
EXPECT_OK(env_->DeleteDir(test_path_));
}
std::string GetFileOperation(uint32_t id) {
std::string GetFileOperation(uint64_t id) {
id = id % 4;
switch (id) {
case 0:
@ -42,13 +42,15 @@ class IOTracerTest : public testing::Test {
return "FileSize";
case 3:
return "DeleteDir";
default:
assert(false);
}
assert(false);
return "";
}
void WriteIOOp(IOTraceWriter* writer, uint32_t nrecords) {
void WriteIOOp(IOTraceWriter* writer, uint64_t nrecords) {
assert(writer);
for (uint32_t i = 0; i < nrecords; i++) {
for (uint64_t i = 0; i < nrecords; i++) {
IOTraceRecord record;
record.trace_type = TraceType::kIOLenAndOffset;
record.file_operation = GetFileOperation(i);
@ -100,8 +102,8 @@ TEST_F(IOTracerTest, AtomicWrite) {
IOTraceReader reader(std::move(trace_reader));
IOTraceHeader header;
ASSERT_OK(reader.ReadHeader(&header));
ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
// Read record and verify data.
IOTraceRecord access_record;
ASSERT_OK(reader.ReadIOOp(&access_record));
@ -162,8 +164,8 @@ TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) {
IOTraceReader reader(std::move(trace_reader));
IOTraceHeader header;
ASSERT_OK(reader.ReadHeader(&header));
ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
IOTraceRecord access_record;
ASSERT_OK(reader.ReadIOOp(&access_record));
@ -196,8 +198,8 @@ TEST_F(IOTracerTest, AtomicMultipleWrites) {
IOTraceReader reader(std::move(trace_reader));
IOTraceHeader header;
ASSERT_OK(reader.ReadHeader(&header));
ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
// Read 10 records.
VerifyIOOp(&reader, 10);
// Read one more and record and it should report error.

Loading…
Cancel
Save