Add an option to filter traces (#5082)

Summary:
Add an option to filter out READ or WRITE operations while tracing.
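Usage, roughly (an illustrative sketch, not code from this change; the helper name, the db/env handles, and the header paths are assumptions):

#include <memory>
#include <string>
#include <utility>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/trace_reader_writer.h"

// Illustrative helper: trace only reads by filtering out WRITE operations.
// db is an already-open rocksdb::DB*; the trace path is a placeholder.
rocksdb::Status TraceReadsOnly(rocksdb::DB* db, rocksdb::Env* env,
                               const std::string& trace_path) {
  rocksdb::TraceOptions trace_opts;
  trace_opts.filter = rocksdb::kTraceFilterWrite;  // drop WRITE ops

  std::unique_ptr<rocksdb::TraceWriter> trace_writer;
  rocksdb::Status s = rocksdb::NewFileTraceWriter(
      env, rocksdb::EnvOptions(), trace_path, &trace_writer);
  if (!s.ok()) {
    return s;
  }
  s = db->StartTrace(trace_opts, std::move(trace_writer));
  if (!s.ok()) {
    return s;
  }
  // ... run the workload to be traced; Get/iterator ops are recorded ...
  return db->EndTrace();
}

Filtered operations never reach the trace file, so a later Replayer run simply has nothing to re-issue for them.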
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5082

Differential Revision: D14515083

Pulled By: mrmiywj

fbshipit-source-id: 2504c89a9abf1dd629cad44b4104092702d77610
Branch: main
Author: Wenjie Yang (committed by Facebook Github Bot)
Parent: f2f6acbef3
Commit: 36c2a7cfb1
Changed files:
1. HISTORY.md (1 line changed)
2. db/db_test2.cc (171 lines changed)
3. include/rocksdb/options.h (11 lines changed)
4. util/trace_replay.cc (28 lines changed)
5. util/trace_replay.h (2 lines changed)

@@ -2,6 +2,7 @@
## Unreleased
### New Features
* Introduce two more stats levels, kExceptHistogramOrTimers and kExceptTimers.
* Add support for trace filtering.
### Public API Change
* statistics.stats_level_ becomes atomic. It is preferred to use statistics.set_stats_level() and statistics.get_stats_level() to access it.

@@ -3019,6 +3019,177 @@ TEST_F(DBTest2, TraceWithSampling) {
  ASSERT_OK(DestroyDB(dbname2, options));
}

TEST_F(DBTest2, TraceWithFilter) {
  Options options = CurrentOptions();
  options.merge_operator = MergeOperators::CreatePutOperator();
  ReadOptions ro;
  WriteOptions wo;
  TraceOptions trace_opts;
  EnvOptions env_opts;
  CreateAndReopenWithCF({"pikachu"}, options);
  Random rnd(301);
  Iterator* single_iter = nullptr;

  trace_opts.filter = TraceFilterType::kTraceFilterWrite;

  std::string trace_filename = dbname_ + "/rocksdb.trace";
  std::unique_ptr<TraceWriter> trace_writer;
  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));

  ASSERT_OK(Put(0, "a", "1"));
  ASSERT_OK(Merge(0, "b", "2"));
  ASSERT_OK(Delete(0, "c"));
  ASSERT_OK(SingleDelete(0, "d"));
  ASSERT_OK(db_->DeleteRange(wo, dbfull()->DefaultColumnFamily(), "e", "f"));

  WriteBatch batch;
  ASSERT_OK(batch.Put("f", "11"));
  ASSERT_OK(batch.Merge("g", "12"));
  ASSERT_OK(batch.Delete("h"));
  ASSERT_OK(batch.SingleDelete("i"));
  ASSERT_OK(batch.DeleteRange("j", "k"));
  ASSERT_OK(db_->Write(wo, &batch));

  single_iter = db_->NewIterator(ro);
  single_iter->Seek("f");
  single_iter->SeekForPrev("g");
  delete single_iter;

  ASSERT_EQ("1", Get(0, "a"));
  ASSERT_EQ("12", Get(0, "g"));

  ASSERT_OK(Put(1, "foo", "bar"));
  ASSERT_OK(Put(1, "rocksdb", "rocks"));
  ASSERT_EQ("NOT_FOUND", Get(1, "leveldb"));

  ASSERT_OK(db_->EndTrace());
  // These should not get into the trace file as they are issued after
  // EndTrace.
  Put("hello", "world");
  Merge("foo", "bar");

  // Open another db, replay, and verify the data
  std::string value;
  std::string dbname2 = test::TmpDir(env_) + "/db_replay";
  ASSERT_OK(DestroyDB(dbname2, options));

  // Using a different name than db2, to pacify infer's use-after-lifetime
  // warnings (http://fbinfer.com).
  DB* db2_init = nullptr;
  options.create_if_missing = true;
  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
  ColumnFamilyHandle* cf;
  ASSERT_OK(
      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
  delete cf;
  delete db2_init;

  DB* db2 = nullptr;
  std::vector<ColumnFamilyDescriptor> column_families;
  ColumnFamilyOptions cf_options;
  cf_options.merge_operator = MergeOperators::CreatePutOperator();
  column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
  column_families.push_back(
      ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
  std::vector<ColumnFamilyHandle*> handles;
  ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));

  env_->SleepForMicroseconds(100);
  // Verify that the keys don't already exist
  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());

  std::unique_ptr<TraceReader> trace_reader;
  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
  Replayer replayer(db2, handles_, std::move(trace_reader));
  ASSERT_OK(replayer.Replay());

  // None of the key-values should be present since the WRITE ops were
  // filtered out.
  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "g", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "hello", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "world", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "foo", &value).IsNotFound());
  ASSERT_TRUE(db2->Get(ro, handles[0], "rocksdb", &value).IsNotFound());

  for (auto handle : handles) {
    delete handle;
  }
  delete db2;
  ASSERT_OK(DestroyDB(dbname2, options));

  // Set up a new db.
  std::string dbname3 = test::TmpDir(env_) + "/db_not_trace_read";
  ASSERT_OK(DestroyDB(dbname3, options));

  DB* db3_init = nullptr;
  options.create_if_missing = true;
  ColumnFamilyHandle* cf3;
  ASSERT_OK(DB::Open(options, dbname3, &db3_init));
  ASSERT_OK(
      db3_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf3));
  delete cf3;
  delete db3_init;

  column_families.clear();
  column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
  column_families.push_back(
      ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
  handles.clear();

  DB* db3 = nullptr;
  ASSERT_OK(DB::Open(DBOptions(), dbname3, column_families, &handles, &db3));

  env_->SleepForMicroseconds(100);
  // Verify that the keys don't already exist
  ASSERT_TRUE(db3->Get(ro, handles[0], "a", &value).IsNotFound());
  ASSERT_TRUE(db3->Get(ro, handles[0], "g", &value).IsNotFound());

  // The tracer will not record the READ ops.
  trace_opts.filter = TraceFilterType::kTraceFilterGet;
  std::string trace_filename3 = dbname_ + "/rocksdb.trace_3";
  std::unique_ptr<TraceWriter> trace_writer3;
  ASSERT_OK(
      NewFileTraceWriter(env_, env_opts, trace_filename3, &trace_writer3));
  ASSERT_OK(db3->StartTrace(trace_opts, std::move(trace_writer3)));
  ASSERT_OK(db3->Put(wo, handles[0], "a", "1"));
  ASSERT_OK(db3->Merge(wo, handles[0], "b", "2"));
  ASSERT_OK(db3->Delete(wo, handles[0], "c"));
  ASSERT_OK(db3->SingleDelete(wo, handles[0], "d"));
  ASSERT_OK(db3->Get(ro, handles[0], "a", &value));
  ASSERT_EQ(value, "1");
  ASSERT_TRUE(db3->Get(ro, handles[0], "c", &value).IsNotFound());
  ASSERT_OK(db3->EndTrace());

  for (auto handle : handles) {
    delete handle;
  }
  delete db3;
  ASSERT_OK(DestroyDB(dbname3, options));

  std::unique_ptr<TraceReader> trace_reader3;
  ASSERT_OK(
      NewFileTraceReader(env_, env_opts, trace_filename3, &trace_reader3));
  // Count the number of records in the trace file.
  int count = 0;
  std::string data;
  Status s;
  while (true) {
    s = trace_reader3->Read(&data);
    if (!s.ok()) {
      break;
    }
    count += 1;
  }
  // We also need to count the header and footer:
  // 4 WRITE + HEADER + FOOTER = 6
  ASSERT_EQ(count, 6);
}
#endif // ROCKSDB_LITE

TEST_F(DBTest2, PinnableSliceAndMmapReads) {

@@ -1361,6 +1361,15 @@ struct IngestExternalFileOptions {
  bool verify_checksums_before_ingest = false;
};

enum TraceFilterType : uint64_t {
  // Trace all the operations
  kTraceFilterNone = 0x0,
  // Do not trace the get operations
  kTraceFilterGet = 0x1 << 0,
  // Do not trace the write operations
  kTraceFilterWrite = 0x1 << 1
};

// TraceOptions is used for StartTrace
struct TraceOptions {
  // To avoid the trace file size grows large than the storage space,
@@ -1369,6 +1378,8 @@ struct TraceOptions {
  // Specify trace sampling option, i.e. capture one per how many requests.
  // Default to 1 (capture every request).
  uint64_t sampling_frequency = 1;
  // Note: The filtering happens before sampling.
  uint64_t filter = kTraceFilterNone;
};

}  // namespace rocksdb
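Since filter is a plain bitmask, the two flags can also be combined; an illustrative snippet (not from this change):

// Illustrative only: suppress both Get and Write records in one trace.
// Iterator Seek/SeekForPrev operations would still be recorded.
rocksdb::TraceOptions trace_opts;
trace_opts.filter = rocksdb::kTraceFilterGet | rocksdb::kTraceFilterWrite;

With this change only Get and Write records are filterable; iterator Seek and SeekForPrev records are still traced, subject to sampling and the trace file size cap.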

@@ -43,53 +43,63 @@ Tracer::Tracer(Env* env, const TraceOptions& trace_options,
Tracer::~Tracer() { trace_writer_.reset(); }

Status Tracer::Write(WriteBatch* write_batch) {
  TraceType trace_type = kTraceWrite;
  if (ShouldSkipTrace(trace_type)) {
    return Status::OK();
  }
  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = trace_type;
  trace.payload = write_batch->Data();
  return WriteTrace(trace);
}

Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
  TraceType trace_type = kTraceGet;
  if (ShouldSkipTrace(trace_type)) {
    return Status::OK();
  }
  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = trace_type;
  EncodeCFAndKey(&trace.payload, column_family->GetID(), key);
  return WriteTrace(trace);
}

Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
  TraceType trace_type = kTraceIteratorSeek;
  if (ShouldSkipTrace(trace_type)) {
    return Status::OK();
  }
  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = trace_type;
  EncodeCFAndKey(&trace.payload, cf_id, key);
  return WriteTrace(trace);
}

Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
  TraceType trace_type = kTraceIteratorSeekForPrev;
  if (ShouldSkipTrace(trace_type)) {
    return Status::OK();
  }
  Trace trace;
  trace.ts = env_->NowMicros();
  trace.type = trace_type;
  EncodeCFAndKey(&trace.payload, cf_id, key);
  return WriteTrace(trace);
}

bool Tracer::ShouldSkipTrace(const TraceType& trace_type) {
  if (IsTraceFileOverMax()) {
    return true;
  }
  if ((trace_options_.filter & kTraceFilterGet && trace_type == kTraceGet) ||
      (trace_options_.filter & kTraceFilterWrite &&
       trace_type == kTraceWrite)) {
    return true;
  }
  ++trace_request_count_;
  if (trace_request_count_ < trace_options_.sampling_frequency) {
    return true;
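Because the filter check comes before the sampling counter (matching the "filtering happens before sampling" note in TraceOptions), filtered-out requests never consume a sampling slot, so sampling_frequency only applies to the operation types that survive the filter. A simplified standalone mirror of that decision order (illustrative only; the counter reset at the end is assumed, since the hunk above is truncated):

// Illustrative sketch, not RocksDB code. Assumes the TraceType and
// TraceFilterType enums shown above are in scope.
bool ShouldSkipTraceSimplified(uint64_t filter, TraceType type,
                               uint64_t sampling_frequency,
                               uint64_t* request_count) {
  const bool filtered =
      ((filter & kTraceFilterGet) && type == kTraceGet) ||
      ((filter & kTraceFilterWrite) && type == kTraceWrite);
  if (filtered) {
    return true;  // dropped by the filter; the sampling counter is untouched
  }
  ++*request_count;
  if (*request_count < sampling_frequency) {
    return true;  // dropped by sampling
  }
  *request_count = 0;  // assumed reset, as in the full implementation
  return false;        // this request gets traced
}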

@@ -72,7 +72,7 @@ class Tracer {
  Status WriteHeader();
  Status WriteFooter();
  Status WriteTrace(const Trace& trace);
  bool ShouldSkipTrace(const TraceType& type);
  Env* env_;
  TraceOptions trace_options_;
