From 36c2a7cfb15933a51de27dbe531a019afbd6a5d3 Mon Sep 17 00:00:00 2001 From: Wenjie Yang Date: Tue, 19 Mar 2019 14:19:01 -0700 Subject: [PATCH] Add an option to filter traces (#5082) Summary: Add an option to filter out READ or WRITE operations while tracing. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5082 Differential Revision: D14515083 Pulled By: mrmiywj fbshipit-source-id: 2504c89a9abf1dd629cad44b4104092702d77610 --- HISTORY.md | 1 + db/db_test2.cc | 171 ++++++++++++++++++++++++++++++++++++++ include/rocksdb/options.h | 11 +++ util/trace_replay.cc | 28 +++++-- util/trace_replay.h | 2 +- 5 files changed, 203 insertions(+), 10 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index e7729005f..ab576e507 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### New Features * Introduce two more stats levels, kExceptHistogramOrTimers and kExceptTimers. +* Add support for trace filtering. ### Public API Change * statistics.stats_level_ becomes atomic. It is preferred to use statistics.set_stats_level() and statistics.get_stats_level() to access it. 
diff --git a/db/db_test2.cc b/db/db_test2.cc index 2bb80548e..7cec8e66b 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -3019,6 +3019,177 @@ TEST_F(DBTest2, TraceWithSampling) { ASSERT_OK(DestroyDB(dbname2, options)); } +TEST_F(DBTest2, TraceWithFilter) { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreatePutOperator(); + ReadOptions ro; + WriteOptions wo; + TraceOptions trace_opts; + EnvOptions env_opts; + CreateAndReopenWithCF({"pikachu"}, options); + Random rnd(301); + Iterator* single_iter = nullptr; + + trace_opts.filter = TraceFilterType::kTraceFilterWrite; + + std::string trace_filename = dbname_ + "/rocksdb.trace"; + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer)); + ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer))); + + ASSERT_OK(Put(0, "a", "1")); + ASSERT_OK(Merge(0, "b", "2")); + ASSERT_OK(Delete(0, "c")); + ASSERT_OK(SingleDelete(0, "d")); + ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f")); + + WriteBatch batch; + ASSERT_OK(batch.Put("f", "11")); + ASSERT_OK(batch.Merge("g", "12")); + ASSERT_OK(batch.Delete("h")); + ASSERT_OK(batch.SingleDelete("i")); + ASSERT_OK(batch.DeleteRange("j", "k")); + ASSERT_OK(db_->Write(wo, &batch)); + + single_iter = db_->NewIterator(ro); + single_iter->Seek("f"); + single_iter->SeekForPrev("g"); + delete single_iter; + + ASSERT_EQ("1", Get(0, "a")); + ASSERT_EQ("12", Get(0, "g")); + + ASSERT_OK(Put(1, "foo", "bar")); + ASSERT_OK(Put(1, "rocksdb", "rocks")); + ASSERT_EQ("NOT_FOUND", Get(1, "leveldb")); + + ASSERT_OK(db_->EndTrace()); + // These should not get into the trace file as they are issued after EndTrace. 
+ Put("hello", "world"); + Merge("foo", "bar"); + + // Open another db, replay, and verify the data + std::string value; + std::string dbname2 = test::TmpDir(env_) + "/db_replay"; + ASSERT_OK(DestroyDB(dbname2, options)); + + // Using a different name than db2, to pacify infer's use-after-lifetime + // warnings (http://fbinfer.com). + DB* db2_init = nullptr; + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname2, &db2_init)); + ColumnFamilyHandle* cf; + ASSERT_OK( + db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf)); + delete cf; + delete db2_init; + + DB* db2 = nullptr; + std::vector column_families; + ColumnFamilyOptions cf_options; + cf_options.merge_operator = MergeOperators::CreatePutOperator(); + column_families.push_back(ColumnFamilyDescriptor("default", cf_options)); + column_families.push_back( + ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions())); + std::vector handles; + ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2)); + + env_->SleepForMicroseconds(100); + // Verify that the keys don't already exist + ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound()); + + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); + Replayer replayer(db2, handles_, std::move(trace_reader)); + ASSERT_OK(replayer.Replay()); + + // None of the key-values should be present since we filter out the WRITE ops. 
+ ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "foo", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "rocksdb", &value).IsNotFound()); + + for (auto handle : handles) { + delete handle; + } + delete db2; + ASSERT_OK(DestroyDB(dbname2, options)); + + // Set up a new db. + std::string dbname3 = test::TmpDir(env_) + "/db_not_trace_read"; + ASSERT_OK(DestroyDB(dbname3, options)); + + DB* db3_init = nullptr; + options.create_if_missing = true; + ColumnFamilyHandle* cf3; + ASSERT_OK(DB::Open(options, dbname3, &db3_init)); + ASSERT_OK( + db3_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf3)); + delete cf3; + delete db3_init; + + column_families.clear(); + column_families.push_back(ColumnFamilyDescriptor("default", cf_options)); + column_families.push_back( + ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions())); + handles.clear(); + + DB* db3 = nullptr; + ASSERT_OK(DB::Open(DBOptions(), dbname3, column_families, &handles, &db3)); + + env_->SleepForMicroseconds(100); + // Verify that the keys don't already exist + ASSERT_TRUE(db3->Get(ro, handles[0], "a", &value).IsNotFound()); + ASSERT_TRUE(db3->Get(ro, handles[0], "g", &value).IsNotFound()); + + // The tracer will not record the Get ops. 
+ trace_opts.filter = TraceFilterType::kTraceFilterGet; + std::string trace_filename3 = dbname_ + "/rocksdb.trace_3"; + std::unique_ptr trace_writer3; + ASSERT_OK( + NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3)); + ASSERT_OK(db3->StartTrace(trace_opts, std::move(trace_writer3))); + + ASSERT_OK(db3->Put(wo, handles[0], "a", "1")); + ASSERT_OK(db3->Merge(wo, handles[0], "b", "2")); + ASSERT_OK(db3->Delete(wo, handles[0], "c")); + ASSERT_OK(db3->SingleDelete(wo, handles[0], "d")); + + ASSERT_OK(db3->Get(ro, handles[0], "a", &value)); + ASSERT_EQ(value, "1"); + ASSERT_TRUE(db3->Get(ro, handles[0], "c", &value).IsNotFound()); + + ASSERT_OK(db3->EndTrace()); + + for (auto handle : handles) { + delete handle; + } + delete db3; + ASSERT_OK(DestroyDB(dbname3, options)); + + std::unique_ptr trace_reader3; + ASSERT_OK( + NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3)); + + // Count the number of records in the trace file; + int count = 0; + std::string data; + Status s; + while (true) { + s = trace_reader3->Read(&data); + if (!s.ok()) { + break; + } + count += 1; + } + // We also need to count the header and footer + // 4 WRITE + HEADER + FOOTER = 6 + ASSERT_EQ(count, 6); +} + #endif // ROCKSDB_LITE TEST_F(DBTest2, PinnableSliceAndMmapReads) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 2d9da1878..c9c2d0ba8 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1361,6 +1361,15 @@ struct IngestExternalFileOptions { bool verify_checksums_before_ingest = false; }; +enum TraceFilterType: uint64_t { + // Trace all the operations + kTraceFilterNone = 0x0, + // Do not trace the get operations + kTraceFilterGet = 0x1 << 0, + // Do not trace the write operations + kTraceFilterWrite = 0x1 << 1 +}; + // TraceOptions is used for StartTrace struct TraceOptions { // To avoid the trace file size grows large than the storage space, @@ -1369,6 +1378,8 @@ struct TraceOptions { // Specify trace sampling 
option, i.e. capture one per how many requests. // Default to 1 (capture every request). uint64_t sampling_frequency = 1; + // Note: The filtering happens before sampling. + uint64_t filter = kTraceFilterNone; }; } // namespace rocksdb diff --git a/util/trace_replay.cc b/util/trace_replay.cc index 60ebdd91c..28160b292 100644 --- a/util/trace_replay.cc +++ b/util/trace_replay.cc @@ -43,53 +43,63 @@ Tracer::Tracer(Env* env, const TraceOptions& trace_options, Tracer::~Tracer() { trace_writer_.reset(); } Status Tracer::Write(WriteBatch* write_batch) { - if (ShouldSkipTrace()) { + TraceType trace_type = kTraceWrite; + if (ShouldSkipTrace(trace_type)) { return Status::OK(); } Trace trace; trace.ts = env_->NowMicros(); - trace.type = kTraceWrite; + trace.type = trace_type; trace.payload = write_batch->Data(); return WriteTrace(trace); } Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { - if (ShouldSkipTrace()) { + TraceType trace_type = kTraceGet; + if (ShouldSkipTrace(trace_type)) { return Status::OK(); } Trace trace; trace.ts = env_->NowMicros(); - trace.type = kTraceGet; + trace.type = trace_type; EncodeCFAndKey(&trace.payload, column_family->GetID(), key); return WriteTrace(trace); } Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { - if (ShouldSkipTrace()) { + TraceType trace_type = kTraceIteratorSeek; + if (ShouldSkipTrace(trace_type)) { return Status::OK(); } Trace trace; trace.ts = env_->NowMicros(); - trace.type = kTraceIteratorSeek; + trace.type = trace_type; EncodeCFAndKey(&trace.payload, cf_id, key); return WriteTrace(trace); } Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { - if (ShouldSkipTrace()) { + TraceType trace_type = kTraceIteratorSeekForPrev; + if (ShouldSkipTrace(trace_type)) { return Status::OK(); } Trace trace; trace.ts = env_->NowMicros(); - trace.type = kTraceIteratorSeekForPrev; + trace.type = trace_type; EncodeCFAndKey(&trace.payload, cf_id, key); return 
WriteTrace(trace); } -bool Tracer::ShouldSkipTrace() { +bool Tracer::ShouldSkipTrace(const TraceType& trace_type) { if (IsTraceFileOverMax()) { return true; } + if ((trace_options_.filter & kTraceFilterGet + && trace_type == kTraceGet) + || (trace_options_.filter & kTraceFilterWrite + && trace_type == kTraceWrite)) { + return true; + } ++trace_request_count_; if (trace_request_count_ < trace_options_.sampling_frequency) { return true; diff --git a/util/trace_replay.h b/util/trace_replay.h index 92da3fc64..749ea2f64 100644 --- a/util/trace_replay.h +++ b/util/trace_replay.h @@ -72,7 +72,7 @@ class Tracer { Status WriteHeader(); Status WriteFooter(); Status WriteTrace(const Trace& trace); - bool ShouldSkipTrace(); + bool ShouldSkipTrace(const TraceType& type); Env* env_; TraceOptions trace_options_;