Add the max trace file size limitation option to Tracing (#4610)

Summary:
If user do not end the trace manually, the tracing will continue which can potential use up all the storage space and cause problem. In this PR, the max trace file size is added to the TraceOptions and user can set the value if they need or the default is 64GB.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4610

Differential Revision: D12893400

Pulled By: zhichao-cao

fbshipit-source-id: acf4b5a6076bb691778bdfbac4864e1006758953
main
Zhichao Cao 6 years ago committed by Facebook Github Bot
parent c94f073e5e
commit 7125e24619
  1. 4
      db/db_impl.cc
  2. 68
      db/db_test2.cc
  3. 7
      include/rocksdb/options.h
  4. 1
      include/rocksdb/trace_reader_writer.h
  5. 24
      util/trace_replay.cc
  6. 6
      util/trace_replay.h
  7. 2
      utilities/trace/file_trace_reader_writer.cc
  8. 1
      utilities/trace/file_trace_reader_writer.h

@ -3276,10 +3276,10 @@ void DBImpl::WaitForIngestFile() {
} }
} }
Status DBImpl::StartTrace(const TraceOptions& /* options */, Status DBImpl::StartTrace(const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) { std::unique_ptr<TraceWriter>&& trace_writer) {
InstrumentedMutexLock lock(&trace_mutex_); InstrumentedMutexLock lock(&trace_mutex_);
tracer_.reset(new Tracer(env_, std::move(trace_writer))); tracer_.reset(new Tracer(env_, trace_options, std::move(trace_writer)));
return Status::OK(); return Status::OK();
} }

@ -2688,6 +2688,74 @@ TEST_F(DBTest2, TraceAndReplay) {
ASSERT_OK(DestroyDB(dbname2, options)); ASSERT_OK(DestroyDB(dbname2, options));
} }
TEST_F(DBTest2, TraceWithLimit) {
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreatePutOperator();
ReadOptions ro;
WriteOptions wo;
TraceOptions trace_opts;
EnvOptions env_opts;
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
// test the max trace file size options
trace_opts.max_trace_file_size = 5;
std::string trace_filename = dbname_ + "/rocksdb.trace1";
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
ASSERT_OK(Put(0, "a", "1"));
ASSERT_OK(Put(0, "b", "1"));
ASSERT_OK(Put(0, "c", "1"));
ASSERT_OK(db_->EndTrace());
std::string dbname2 = test::TmpDir(env_) + "/db_replay2";
std::string value;
ASSERT_OK(DestroyDB(dbname2, options));
// Using a different name than db2, to pacify infer's use-after-lifetime
// warnings (http://fbinfer.com).
DB* db2_init = nullptr;
options.create_if_missing = true;
ASSERT_OK(DB::Open(options, dbname2, &db2_init));
ColumnFamilyHandle* cf;
ASSERT_OK(
db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
delete cf;
delete db2_init;
DB* db2 = nullptr;
std::vector<ColumnFamilyDescriptor> column_families;
ColumnFamilyOptions cf_options;
cf_options.merge_operator = MergeOperators::CreatePutOperator();
column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
column_families.push_back(
ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
env_->SleepForMicroseconds(100);
// Verify that the keys don't already exist
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
Replayer replayer(db2, handles_, std::move(trace_reader));
ASSERT_OK(replayer.Replay());
ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
for (auto handle : handles) {
delete handle;
}
delete db2;
ASSERT_OK(DestroyDB(dbname2, options));
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
TEST_F(DBTest2, PinnableSliceAndMmapReads) { TEST_F(DBTest2, PinnableSliceAndMmapReads) {

@ -1337,6 +1337,11 @@ struct IngestExternalFileOptions {
bool write_global_seqno = true; bool write_global_seqno = true;
}; };
struct TraceOptions {}; // TraceOptions is used for StartTrace
struct TraceOptions {
// To avoid the trace file size grows large than the storage space,
// user can set the max trace file size in Bytes. Default is 64GB
uint64_t max_trace_file_size = uint64_t{64} * 1024 * 1024 * 1024;
};
} // namespace rocksdb } // namespace rocksdb

@ -24,6 +24,7 @@ class TraceWriter {
virtual Status Write(const Slice& data) = 0; virtual Status Write(const Slice& data) = 0;
virtual Status Close() = 0; virtual Status Close() = 0;
virtual uint64_t GetFileSize() = 0;
}; };
// TraceReader allows reading RocksDB traces from any system, one operation at // TraceReader allows reading RocksDB traces from any system, one operation at

@ -31,14 +31,20 @@ void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) {
} }
} // namespace } // namespace
Tracer::Tracer(Env* env, std::unique_ptr<TraceWriter>&& trace_writer) Tracer::Tracer(Env* env, const TraceOptions& trace_options,
: env_(env), trace_writer_(std::move(trace_writer)) { std::unique_ptr<TraceWriter>&& trace_writer)
: env_(env),
trace_options_(trace_options),
trace_writer_(std::move(trace_writer)) {
WriteHeader(); WriteHeader();
} }
Tracer::~Tracer() { trace_writer_.reset(); } Tracer::~Tracer() { trace_writer_.reset(); }
Status Tracer::Write(WriteBatch* write_batch) { Status Tracer::Write(WriteBatch* write_batch) {
if (IsTraceFileOverMax()) {
return Status::OK();
}
Trace trace; Trace trace;
trace.ts = env_->NowMicros(); trace.ts = env_->NowMicros();
trace.type = kTraceWrite; trace.type = kTraceWrite;
@ -47,6 +53,9 @@ Status Tracer::Write(WriteBatch* write_batch) {
} }
Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
if (IsTraceFileOverMax()) {
return Status::OK();
}
Trace trace; Trace trace;
trace.ts = env_->NowMicros(); trace.ts = env_->NowMicros();
trace.type = kTraceGet; trace.type = kTraceGet;
@ -55,6 +64,9 @@ Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
} }
Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
if (IsTraceFileOverMax()) {
return Status::OK();
}
Trace trace; Trace trace;
trace.ts = env_->NowMicros(); trace.ts = env_->NowMicros();
trace.type = kTraceIteratorSeek; trace.type = kTraceIteratorSeek;
@ -63,6 +75,9 @@ Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
} }
Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
if (IsTraceFileOverMax()) {
return Status::OK();
}
Trace trace; Trace trace;
trace.ts = env_->NowMicros(); trace.ts = env_->NowMicros();
trace.type = kTraceIteratorSeekForPrev; trace.type = kTraceIteratorSeekForPrev;
@ -70,6 +85,11 @@ Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
return WriteTrace(trace); return WriteTrace(trace);
} }
bool Tracer::IsTraceFileOverMax() {
uint64_t trace_file_size = trace_writer_->GetFileSize();
return (trace_file_size > trace_options_.max_trace_file_size);
}
Status Tracer::WriteHeader() { Status Tracer::WriteHeader() {
std::ostringstream s; std::ostringstream s;
s << kTraceMagic << "\t" s << kTraceMagic << "\t"

@ -10,6 +10,7 @@
#include <utility> #include <utility>
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/trace_reader_writer.h" #include "rocksdb/trace_reader_writer.h"
namespace rocksdb { namespace rocksdb {
@ -55,13 +56,15 @@ struct Trace {
// Trace RocksDB operations using a TraceWriter. // Trace RocksDB operations using a TraceWriter.
class Tracer { class Tracer {
public: public:
Tracer(Env* env, std::unique_ptr<TraceWriter>&& trace_writer); Tracer(Env* env, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer);
~Tracer(); ~Tracer();
Status Write(WriteBatch* write_batch); Status Write(WriteBatch* write_batch);
Status Get(ColumnFamilyHandle* cfname, const Slice& key); Status Get(ColumnFamilyHandle* cfname, const Slice& key);
Status IteratorSeek(const uint32_t& cf_id, const Slice& key); Status IteratorSeek(const uint32_t& cf_id, const Slice& key);
Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key); Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key);
bool IsTraceFileOverMax();
Status Close(); Status Close();
@ -71,6 +74,7 @@ class Tracer {
Status WriteTrace(const Trace& trace); Status WriteTrace(const Trace& trace);
Env* env_; Env* env_;
TraceOptions trace_options_;
std::unique_ptr<TraceWriter> trace_writer_; std::unique_ptr<TraceWriter> trace_writer_;
}; };

@ -83,6 +83,8 @@ Status FileTraceWriter::Write(const Slice& data) {
return file_writer_->Append(data); return file_writer_->Append(data);
} }
uint64_t FileTraceWriter::GetFileSize() { return file_writer_->GetFileSize(); }
Status NewFileTraceReader(Env* env, const EnvOptions& env_options, Status NewFileTraceReader(Env* env, const EnvOptions& env_options,
const std::string& trace_filename, const std::string& trace_filename,
std::unique_ptr<TraceReader>* trace_reader) { std::unique_ptr<TraceReader>* trace_reader) {

@ -39,6 +39,7 @@ class FileTraceWriter : public TraceWriter {
virtual Status Write(const Slice& data) override; virtual Status Write(const Slice& data) override;
virtual Status Close() override; virtual Status Close() override;
virtual uint64_t GetFileSize() override;
private: private:
std::unique_ptr<WritableFileWriter> file_writer_; std::unique_ptr<WritableFileWriter> file_writer_;

Loading…
Cancel
Save