Integrate block cache tracer into db_impl (#5433)

Summary:
This PR integrates the block cache tracer class into db_impl.cc.
DBImpl (db_impl.h) holds a member variable of the BlockCacheTracer class and passes a pointer to it down to the block_based_table_reader.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5433

Differential Revision: D15728016

Pulled By: HaoyuHuang

fbshipit-source-id: 23d5659e8c82d556833dcc1a5558aac8c1f7db71
main
haoyuhuang 6 years ago committed by Facebook Github Bot
parent a3b8c76d8e
commit bb4178066d
  1. 13
      TARGETS
  2. 20
      db/column_family.cc
  3. 8
      db/column_family.h
  4. 6
      db/compaction/compaction_job_test.cc
  5. 14
      db/db_impl/db_impl.cc
  6. 11
      db/db_impl/db_impl.h
  7. 3
      db/db_wal_test.cc
  8. 3
      db/flush_job_test.cc
  9. 4
      db/memtable_list_test.cc
  10. 6
      db/repair.cc
  11. 8
      db/table_cache.cc
  12. 5
      db/table_cache.h
  13. 18
      db/version_set.cc
  14. 6
      db/version_set.h
  15. 3
      db/version_set_test.cc
  16. 3
      db/wal_manager_test.cc
  17. 11
      include/rocksdb/db.h
  18. 10
      include/rocksdb/utilities/stackable_db.h
  19. 3
      table/block_based/block_based_table_factory.cc
  20. 27
      table/block_based/block_based_table_reader.cc
  21. 8
      table/block_based/block_based_table_reader.h
  22. 3
      table/block_based/partitioned_filter_block_test.cc
  23. 14
      table/table_builder.h
  24. 6
      tools/ldb_cmd.cc
  25. 66
      trace_replay/block_cache_tracer.cc
  26. 35
      trace_replay/block_cache_tracer.h
  27. 102
      trace_replay/block_cache_tracer_test.cc

@ -222,6 +222,7 @@ cpp_library(
"tools/ldb_cmd.cc",
"tools/ldb_tool.cc",
"tools/sst_dump_tool.cc",
"trace_replay/block_cache_tracer.cc",
"trace_replay/trace_replay.cc",
"util/bloom.cc",
"util/build_version.cc",
@ -314,6 +315,7 @@ cpp_library(
"test_util/fault_injection_test_env.cc",
"test_util/testharness.cc",
"test_util/testutil.cc",
"tools/block_cache_trace_analyzer.cc",
"tools/trace_analyzer_tool.cc",
"utilities/cassandra/test_utils.cc",
],
@ -329,6 +331,7 @@ cpp_library(
name = "rocksdb_tools_lib",
srcs = [
"test_util/testutil.cc",
"tools/block_cache_trace_analyzer.cc",
"tools/db_bench_tool.cc",
"tools/trace_analyzer_tool.cc",
],
@ -383,6 +386,16 @@ ROCKS_TESTS = [
"table/block_based/block_based_filter_block_test.cc",
"serial",
],
[
"block_cache_trace_analyzer_test",
"tools/block_cache_trace_analyzer_test.cc",
"serial",
],
[
"block_cache_tracer_test",
"trace_replay/block_cache_tracer_test.cc",
"serial",
],
[
"block_test",
"table/block_based/block_test.cc",

@ -405,7 +405,8 @@ ColumnFamilyData::ColumnFamilyData(
uint32_t id, const std::string& name, Version* _dummy_versions,
Cache* _table_cache, WriteBufferManager* write_buffer_manager,
const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
const EnvOptions& env_options, ColumnFamilySet* column_family_set)
const EnvOptions& env_options, ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer)
: id_(id),
name_(name),
dummy_versions_(_dummy_versions),
@ -445,7 +446,8 @@ ColumnFamilyData::ColumnFamilyData(
if (_dummy_versions != nullptr) {
internal_stats_.reset(
new InternalStats(ioptions_.num_levels, db_options.env, this));
table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache,
block_cache_tracer));
if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
new LevelCompactionPicker(ioptions_, &internal_comparator_));
@ -1254,18 +1256,20 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const EnvOptions& env_options,
Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller)
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
ColumnFamilyOptions(), *db_options,
env_options, nullptr)),
dummy_cfd_(new ColumnFamilyData(
0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
env_options, nullptr, block_cache_tracer)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
env_options_(env_options),
table_cache_(table_cache),
write_buffer_manager_(write_buffer_manager),
write_controller_(write_controller) {
write_controller_(write_controller),
block_cache_tracer_(block_cache_tracer) {
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;
dummy_cfd_->next_ = dummy_cfd_;
@ -1333,7 +1337,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
*db_options_, env_options_, this);
*db_options_, env_options_, this, block_cache_tracer_);
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);

@ -24,6 +24,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "trace_replay/block_cache_tracer.h"
#include "util/thread_local.h"
namespace rocksdb {
@ -504,7 +505,8 @@ class ColumnFamilyData {
const ColumnFamilyOptions& options,
const ImmutableDBOptions& db_options,
const EnvOptions& env_options,
ColumnFamilySet* column_family_set);
ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer);
uint32_t id_;
const std::string name_;
@ -632,7 +634,8 @@ class ColumnFamilySet {
const ImmutableDBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller);
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer);
~ColumnFamilySet();
ColumnFamilyData* GetDefault() const;
@ -691,6 +694,7 @@ class ColumnFamilySet {
Cache* table_cache_;
WriteBufferManager* write_buffer_manager_;
WriteController* write_controller_;
BlockCacheTracer* const block_cache_tracer_;
};
// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access

@ -77,7 +77,8 @@ class CompactionJobTest : public testing::Test {
write_buffer_manager_(db_options_.db_write_buffer_size),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_)),
&write_controller_,
/*block_cache_tracer=*/nullptr)),
shutting_down_(false),
preserve_deletes_seqnum_(0),
mock_table_factory_(new mock::MockTableFactory()),
@ -200,7 +201,8 @@ class CompactionJobTest : public testing::Test {
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_));
&write_controller_,
/*block_cache_tracer=*/nullptr));
compaction_job_stats_.Reset();
VersionEdit new_db;

@ -237,7 +237,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
versions_.reset(new VersionSet(dbname_, &immutable_db_options_, env_options_,
table_cache_.get(), write_buffer_manager_,
&write_controller_));
&write_controller_, &block_cache_tracer_));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
@ -3924,6 +3924,18 @@ Status DBImpl::EndTrace() {
return s;
}
// Starts block cache tracing for this DB by forwarding to
// block_cache_tracer_.StartTrace() with this DB's env_, the caller's
// trace_options, and trace_writer (passed on as an rvalue).
// Returns the status reported by the tracer.
Status DBImpl::StartBlockCacheTrace(
const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
return block_cache_tracer_.StartTrace(env_, trace_options,
std::move(trace_writer));
}
// Stops block cache tracing by calling block_cache_tracer_.EndTrace().
// EndTrace() returns void, so this always reports Status::OK().
Status DBImpl::EndBlockCacheTrace() {
block_cache_tracer_.EndTrace();
return Status::OK();
}
Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
Status s;
if (tracer_) {

@ -40,7 +40,6 @@
#include "db/wal_manager.h"
#include "db/write_controller.h"
#include "db/write_thread.h"
#include "db/memtable_list.h"
#include "logging/event_logger.h"
#include "monitoring/instrumented_mutex.h"
#include "options/db_options.h"
@ -53,6 +52,7 @@
#include "rocksdb/transaction_log.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"
#include "trace_replay/block_cache_tracer.h"
#include "trace_replay/trace_replay.h"
#include "util/autovector.h"
#include "util/hash.h"
@ -331,6 +331,14 @@ class DBImpl : public DB {
using DB::EndTrace;
virtual Status EndTrace() override;
using DB::StartBlockCacheTrace;
Status StartBlockCacheTrace(
const TraceOptions& options,
std::unique_ptr<TraceWriter>&& trace_writer) override;
using DB::EndBlockCacheTrace;
Status EndBlockCacheTrace() override;
using DB::GetPropertiesOfAllTables;
virtual Status GetPropertiesOfAllTables(
ColumnFamilyHandle* column_family,
@ -832,6 +840,7 @@ class DBImpl : public DB {
recovered_transactions_;
std::unique_ptr<Tracer> tracer_;
InstrumentedMutex trace_mutex_;
BlockCacheTracer block_cache_tracer_;
// State below is protected by mutex_
// With two_write_queues enabled, some of the variables that accessed during

@ -838,7 +838,8 @@ class RecoveryTestHelper {
versions.reset(new VersionSet(test->dbname_, &db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller));
&write_controller,
/*block_cache_tracer=*/nullptr));
wal_manager.reset(new WalManager(db_options, env_options));

@ -35,7 +35,8 @@ class FlushJobTest : public testing::Test {
write_buffer_manager_(db_options_.db_write_buffer_size),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_)),
&write_controller_,
/*block_cache_tracer=*/nullptr)),
shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()) {
EXPECT_OK(env_->CreateDirIfMissing(dbname_));

@ -100,7 +100,7 @@ class MemTableListTest : public testing::Test {
VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller);
&write_controller, /*block_cache_tracer=*/nullptr);
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());
@ -144,7 +144,7 @@ class MemTableListTest : public testing::Test {
VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller);
&write_controller, /*block_cache_tracer=*/nullptr);
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());

@ -109,11 +109,13 @@ class Repairer {
// once.
NewLRUCache(10, db_options_.table_cache_numshardbits)),
table_cache_(new TableCache(default_cf_iopts_, env_options_,
raw_table_cache_.get())),
raw_table_cache_.get(),
/*block_cache_tracer=*/nullptr)),
wb_(db_options_.db_write_buffer_size),
wc_(db_options_.delayed_write_rate),
vset_(dbname_, &immutable_db_options_, env_options_,
raw_table_cache_.get(), &wb_, &wc_),
raw_table_cache_.get(), &wb_, &wc_,
/*block_cache_tracer=*/nullptr),
next_file_number_(1),
db_lock_(nullptr) {
for (const auto& cfd : column_families) {

@ -68,11 +68,13 @@ void AppendVarint64(IterKey* key, uint64_t v) {
} // namespace
TableCache::TableCache(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options, Cache* const cache)
const EnvOptions& env_options, Cache* const cache,
BlockCacheTracer* const block_cache_tracer)
: ioptions_(ioptions),
env_options_(env_options),
cache_(cache),
immortal_tables_(false) {
immortal_tables_(false),
block_cache_tracer_(block_cache_tracer) {
if (ioptions_.row_cache) {
// If the same cache is shared by multiple instances, we need to
// disambiguate its entries.
@ -125,7 +127,7 @@ Status TableCache::GetTableReader(
s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, prefix_extractor, env_options,
internal_comparator, skip_filters, immortal_tables_,
level, fd.largest_seqno),
level, fd.largest_seqno, block_cache_tracer_),
std::move(file_reader), fd.GetFileSize(), table_reader,
prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0");

@ -23,6 +23,7 @@
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "table/table_reader.h"
#include "trace_replay/block_cache_tracer.h"
namespace rocksdb {
@ -48,7 +49,8 @@ class HistogramImpl;
class TableCache {
public:
TableCache(const ImmutableCFOptions& ioptions,
const EnvOptions& storage_options, Cache* cache);
const EnvOptions& storage_options, Cache* cache,
BlockCacheTracer* const block_cache_tracer);
~TableCache();
// Return an iterator for the specified file number (the corresponding
@ -188,6 +190,7 @@ class TableCache {
Cache* const cache_;
std::string row_cache_id_;
bool immortal_tables_;
BlockCacheTracer* const block_cache_tracer_;
};
} // namespace rocksdb

@ -3342,10 +3342,11 @@ VersionSet::VersionSet(const std::string& dbname,
const ImmutableDBOptions* _db_options,
const EnvOptions& storage_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller)
: column_family_set_(
new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller)),
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer)
: column_family_set_(new ColumnFamilySet(
dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller, block_cache_tracer)),
env_(_db_options->env),
dbname_(dbname),
db_options_(_db_options),
@ -3359,7 +3360,8 @@ VersionSet::VersionSet(const std::string& dbname,
prev_log_number_(0),
current_version_number_(0),
manifest_file_size_(0),
env_options_(storage_options) {}
env_options_(storage_options),
block_cache_tracer_(block_cache_tracer) {}
void CloseTables(void* ptr, size_t) {
TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
@ -4445,7 +4447,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
options->table_cache_numshardbits));
WriteController wc(options->delayed_write_rate);
WriteBufferManager wb(options->db_write_buffer_size);
VersionSet versions(dbname, &db_options, env_options, tc.get(), &wb, &wc);
VersionSet versions(dbname, &db_options, env_options, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr);
Status status;
std::vector<ColumnFamilyDescriptor> dummy;
@ -5200,7 +5203,8 @@ ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller)
: VersionSet(dbname, _db_options, _env_options, table_cache,
write_buffer_manager, write_controller) {}
write_buffer_manager, write_controller,
/*block_cache_tracer=*/nullptr) {}
ReactiveVersionSet::~ReactiveVersionSet() {}

@ -46,6 +46,7 @@
#include "rocksdb/env.h"
#include "table/get_context.h"
#include "table/multiget_context.h"
#include "trace_replay/block_cache_tracer.h"
namespace rocksdb {
@ -777,7 +778,8 @@ class VersionSet {
VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller);
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer);
virtual ~VersionSet();
// Apply *edit to the current version to form a new descriptor that
@ -1125,6 +1127,8 @@ class VersionSet {
// env options for all reads and writes except compactions
EnvOptions env_options_;
BlockCacheTracer* const block_cache_tracer_;
private:
// No copying allowed
VersionSet(const VersionSet&);

@ -618,7 +618,8 @@ class VersionSetTestBase {
write_buffer_manager_(db_options_.db_write_buffer_size),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_)),
&write_controller_,
/*block_cache_tracer=*/nullptr)),
reactive_versions_(std::make_shared<ReactiveVersionSet>(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_)),

@ -50,7 +50,8 @@ class WalManagerTest : public testing::Test {
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_));
&write_controller_,
/*block_cache_tracer=*/nullptr));
wal_manager_.reset(new WalManager(db_options_, env_options_));
}

@ -1317,6 +1317,17 @@ class DB {
virtual Status EndTrace() {
return Status::NotSupported("EndTrace() is not implemented.");
}
// Trace block cache accesses. Use EndBlockCacheTrace() to stop tracing.
// This base-class default ignores both arguments and returns
// Status::NotSupported; subclasses such as DBImpl and StackableDB override it.
virtual Status StartBlockCacheTrace(
const TraceOptions& /*options*/,
std::unique_ptr<TraceWriter>&& /*trace_writer*/) {
return Status::NotSupported("StartBlockCacheTrace() is not implemented.");
}
// Counterpart of StartBlockCacheTrace(). This base-class default returns
// Status::NotSupported; subclasses such as DBImpl and StackableDB override it.
virtual Status EndBlockCacheTrace() {
return Status::NotSupported("EndBlockCacheTrace() is not implemented.");
}
#endif // ROCKSDB_LITE
// Needed for StackableDB

@ -315,6 +315,16 @@ class StackableDB : public DB {
db_->GetColumnFamilyMetaData(column_family, cf_meta);
}
using DB::StartBlockCacheTrace;
// Forwards the request to the wrapped db_, passing trace_writer on as an
// rvalue, and returns the wrapped DB's status unchanged.
Status StartBlockCacheTrace(
const TraceOptions& options,
std::unique_ptr<TraceWriter>&& trace_writer) override {
return db_->StartBlockCacheTrace(options, std::move(trace_writer));
}
using DB::EndBlockCacheTrace;
// Forwards to the wrapped db_ and returns its status unchanged.
Status EndBlockCacheTrace() override { return db_->EndBlockCacheTrace(); }
#endif // ROCKSDB_LITE
virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,

@ -198,7 +198,8 @@ Status BlockBasedTableFactory::NewTableReader(
file_size, table_reader, table_reader_options.prefix_extractor,
prefetch_index_and_filter_in_cache, table_reader_options.skip_filters,
table_reader_options.level, table_reader_options.immortal,
table_reader_options.largest_seqno, &tail_prefetch_stats_);
table_reader_options.largest_seqno, &tail_prefetch_stats_,
table_reader_options.block_cache_tracer);
}
TableBuilder* BlockBasedTableFactory::NewTableBuilder(

@ -1020,19 +1020,17 @@ Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix,
return Slice(cache_key, static_cast<size_t>(end - cache_key));
}
Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options,
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_comparator,
std::unique_ptr<RandomAccessFileReader>&& file,
uint64_t file_size,
std::unique_ptr<TableReader>* table_reader,
const SliceTransform* prefix_extractor,
const bool prefetch_index_and_filter_in_cache,
const bool skip_filters, const int level,
const bool immortal_table,
const SequenceNumber largest_seqno,
TailPrefetchStats* tail_prefetch_stats) {
Status BlockBasedTable::Open(
const ImmutableCFOptions& ioptions, const EnvOptions& env_options,
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_comparator,
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
std::unique_ptr<TableReader>* table_reader,
const SliceTransform* prefix_extractor,
const bool prefetch_index_and_filter_in_cache, const bool skip_filters,
const int level, const bool immortal_table,
const SequenceNumber largest_seqno, TailPrefetchStats* tail_prefetch_stats,
BlockCacheTracer* const block_cache_tracer) {
table_reader->reset();
Status s;
@ -1082,7 +1080,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
rep->internal_prefix_transform.reset(
new InternalKeySliceTransform(prefix_extractor));
SetupCacheKeyPrefix(rep);
std::unique_ptr<BlockBasedTable> new_table(new BlockBasedTable(rep));
std::unique_ptr<BlockBasedTable> new_table(
new BlockBasedTable(rep, block_cache_tracer));
// page cache options
rep->persistent_cache_options =

@ -35,6 +35,7 @@
#include "table/table_properties_internal.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "trace_replay/block_cache_tracer.h"
#include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/user_comparator_wrapper.h"
@ -108,7 +109,8 @@ class BlockBasedTable : public TableReader {
bool skip_filters = false, int level = -1,
const bool immortal_table = false,
const SequenceNumber largest_seqno = 0,
TailPrefetchStats* tail_prefetch_stats = nullptr);
TailPrefetchStats* tail_prefetch_stats = nullptr,
BlockCacheTracer* const block_cache_tracer = nullptr);
bool PrefixMayMatch(const Slice& internal_key,
const ReadOptions& read_options,
@ -239,11 +241,13 @@ class BlockBasedTable : public TableReader {
protected:
Rep* rep_;
explicit BlockBasedTable(Rep* rep) : rep_(rep) {}
explicit BlockBasedTable(Rep* rep, BlockCacheTracer* const block_cache_tracer)
: rep_(rep), block_cache_tracer_(block_cache_tracer) {}
private:
friend class MockedBlockBasedTable;
static std::atomic<uint64_t> next_cache_key_id_;
BlockCacheTracer* const block_cache_tracer_;
void UpdateCacheHitMetrics(BlockType block_type, GetContext* get_context,
size_t usage) const;

@ -23,7 +23,8 @@ std::map<uint64_t, Slice> slices;
class MockedBlockBasedTable : public BlockBasedTable {
public:
explicit MockedBlockBasedTable(Rep* rep) : BlockBasedTable(rep) {
explicit MockedBlockBasedTable(Rep* rep)
: BlockBasedTable(rep, /*block_cache_tracer=*/nullptr) {
// Initialize what Open normally does as much as necessary for the test
rep->cache_key_prefix_size = 10;
}

@ -18,6 +18,7 @@
#include "options/cf_options.h"
#include "rocksdb/options.h"
#include "rocksdb/table_properties.h"
#include "trace_replay/block_cache_tracer.h"
#include "util/file_reader_writer.h"
namespace rocksdb {
@ -32,10 +33,12 @@ struct TableReaderOptions {
const EnvOptions& _env_options,
const InternalKeyComparator& _internal_comparator,
bool _skip_filters = false, bool _immortal = false,
int _level = -1)
int _level = -1,
BlockCacheTracer* const _block_cache_tracer = nullptr)
: TableReaderOptions(_ioptions, _prefix_extractor, _env_options,
_internal_comparator, _skip_filters, _immortal,
_level, 0 /* _largest_seqno */) {}
_level, 0 /* _largest_seqno */,
_block_cache_tracer) {}
// @param skip_filters Disables loading/accessing the filter block
TableReaderOptions(const ImmutableCFOptions& _ioptions,
@ -43,7 +46,8 @@ struct TableReaderOptions {
const EnvOptions& _env_options,
const InternalKeyComparator& _internal_comparator,
bool _skip_filters, bool _immortal, int _level,
SequenceNumber _largest_seqno)
SequenceNumber _largest_seqno,
BlockCacheTracer* const _block_cache_tracer)
: ioptions(_ioptions),
prefix_extractor(_prefix_extractor),
env_options(_env_options),
@ -51,7 +55,8 @@ struct TableReaderOptions {
skip_filters(_skip_filters),
immortal(_immortal),
level(_level),
largest_seqno(_largest_seqno) {}
largest_seqno(_largest_seqno),
block_cache_tracer(_block_cache_tracer) {}
const ImmutableCFOptions& ioptions;
const SliceTransform* prefix_extractor;
@ -65,6 +70,7 @@ struct TableReaderOptions {
int level;
// largest seqno in the table
SequenceNumber largest_seqno;
BlockCacheTracer* const block_cache_tracer;
};
struct TableBuilderOptions {

@ -954,7 +954,8 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex,
WriteController wc(options.delayed_write_rate);
WriteBufferManager wb(options.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr);
Status s = versions.DumpManifest(options, file, verbose, hex, json);
if (!s.ok()) {
printf("Error in processing file %s %s\n", file.c_str(),
@ -1664,7 +1665,8 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
const InternalKeyComparator cmp(opt.comparator);
WriteController wc(opt.delayed_write_rate);
WriteBufferManager wb(opt.db_write_buffer_size);
VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc);
VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr);
std::vector<ColumnFamilyDescriptor> dummy;
ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
ColumnFamilyOptions(opt));

@ -23,30 +23,29 @@ bool ShouldTraceReferencedKey(const BlockCacheTraceRecord& record) {
record.caller == BlockCacheLookupCaller::kUserMGet);
}
BlockCacheTraceWriter::BlockCacheTraceWriter(
Env* env, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
: env_(env),
trace_options_(trace_options),
trace_writer_(std::move(trace_writer)) {}
bool BlockCacheTraceWriter::ShouldTrace(
const BlockCacheTraceRecord& record) const {
if (trace_options_.sampling_frequency == 0 ||
trace_options_.sampling_frequency == 1) {
bool ShouldTrace(const BlockCacheTraceRecord& record,
const TraceOptions& trace_options) {
if (trace_options.sampling_frequency == 0 ||
trace_options.sampling_frequency == 1) {
return true;
}
// We use spatial downsampling so that we have a complete access history for a
// block.
const uint64_t hash = GetSliceNPHash64(Slice(record.block_key));
return hash % trace_options_.sampling_frequency == 0;
return hash % trace_options.sampling_frequency == 0;
}
BlockCacheTraceWriter::BlockCacheTraceWriter(
Env* env, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
: env_(env),
trace_options_(trace_options),
trace_writer_(std::move(trace_writer)) {}
Status BlockCacheTraceWriter::WriteBlockAccess(
const BlockCacheTraceRecord& record) {
uint64_t trace_file_size = trace_writer_->GetFileSize();
if (trace_file_size > trace_options_.max_trace_file_size ||
!ShouldTrace(record)) {
if (trace_file_size > trace_options_.max_trace_file_size) {
return Status::OK();
}
Trace trace;
@ -68,7 +67,6 @@ Status BlockCacheTraceWriter::WriteBlockAccess(
}
std::string encoded_trace;
TracerHelper::EncodeTrace(trace, &encoded_trace);
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
return trace_writer_->Write(encoded_trace);
}
@ -81,7 +79,6 @@ Status BlockCacheTraceWriter::WriteHeader() {
PutFixed32(&trace.payload, kMinorVersion);
std::string encoded_trace;
TracerHelper::EncodeTrace(trace, &encoded_trace);
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
return trace_writer_->Write(encoded_trace);
}
@ -216,4 +213,41 @@ Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
return Status::OK();
}
// Constructs a tracer with no writer installed. A null writer_ is what
// WriteBlockAccess() and EndTrace() treat as "no trace in progress".
BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
// Ends any trace still in progress so that the writer allocated by
// StartTrace() is deleted together with the tracer.
BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
// Begins a trace: records trace_options, allocates a BlockCacheTraceWriter
// around trace_writer, publishes it through writer_, and writes the trace
// header. The whole operation runs under trace_writer_mutex_.
//
// Returns the status of WriteHeader(), or OK if a trace is already active.
//
// NOTE(review): when a trace is already active this returns OK without
// installing (or consuming) the passed trace_writer, so the caller cannot tell
// from the status that the new writer was ignored -- confirm this is intended.
// NOTE(review): if WriteHeader() fails, the error is returned but writer_
// stays installed until EndTrace() is called.
Status BlockCacheTracer::StartTrace(
Env* env, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
if (writer_.load()) {
return Status::OK();
}
// Store the options before publishing the writer: WriteBlockAccess() reads
// trace_options_ for sampling once it observes a non-null writer_.
trace_options_ = trace_options;
writer_.store(
new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer)));
return writer_.load()->WriteHeader();
}
// Stops the current trace, if any: deletes the writer created by StartTrace()
// and resets writer_ to null. Does nothing when no trace is active. Runs under
// trace_writer_mutex_, the same mutex WriteBlockAccess() holds while it uses
// the writer.
void BlockCacheTracer::EndTrace() {
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
if (!writer_.load()) {
return;
}
delete writer_.load();
writer_.store(nullptr);
}
// Writes one block access record to the active trace.
// Returns OK without writing when no trace is active or when ShouldTrace()
// rejects the record under trace_options_; otherwise returns the status of
// BlockCacheTraceWriter::WriteBlockAccess().
Status BlockCacheTracer::WriteBlockAccess(const BlockCacheTraceRecord& record) {
// Checked before taking the mutex, so calls made while tracing is off or for
// records that are sampled out never acquire trace_writer_mutex_.
if (!writer_.load() || !ShouldTrace(record, trace_options_)) {
return Status::OK();
}
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
// Re-check under the mutex: EndTrace() may have deleted the writer between
// the first check and acquiring the lock.
if (!writer_.load()) {
return Status::OK();
}
return writer_.load()->WriteBlockAccess(record);
}
} // namespace rocksdb

@ -5,6 +5,8 @@
#pragma once
#include <atomic>
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
@ -101,13 +103,9 @@ class BlockCacheTraceWriter {
Status WriteHeader();
private:
bool ShouldTrace(const BlockCacheTraceRecord& record) const;
Env* env_;
TraceOptions trace_options_;
std::unique_ptr<TraceWriter> trace_writer_;
/*Mutex to protect trace_writer_ */
InstrumentedMutex trace_writer_mutex_;
};
// BlockCacheTraceReader helps read the trace file generated by
@ -130,4 +128,33 @@ class BlockCacheTraceReader {
std::unique_ptr<TraceReader> trace_reader_;
};
// A block cache tracer. It downsamples the accesses according to
// trace_options and uses BlockCacheTraceWriter to write the access record to
// the trace file.
class BlockCacheTracer {
public:
// Creates a tracer with no trace in progress.
BlockCacheTracer();
// Ends any trace still in progress.
~BlockCacheTracer();
// No copy and move.
BlockCacheTracer(const BlockCacheTracer&) = delete;
BlockCacheTracer& operator=(const BlockCacheTracer&) = delete;
BlockCacheTracer(BlockCacheTracer&&) = delete;
BlockCacheTracer& operator=(BlockCacheTracer&&) = delete;
// Start writing block cache accesses to the trace_writer.
// Returns OK without installing trace_writer if a trace is already active.
Status StartTrace(Env* env, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer);
// Stop writing block cache accesses to the trace_writer.
void EndTrace();
// Write the record to the active trace. Returns OK without writing when no
// trace is active or when the record is sampled out by trace_options.
Status WriteBlockAccess(const BlockCacheTraceRecord& record);
private:
// Options passed to the most recent effective StartTrace(); used by
// WriteBlockAccess() to decide whether a record is sampled.
TraceOptions trace_options_;
// A mutex protects the writer_.
InstrumentedMutex trace_writer_mutex_;
// Null when no trace is active. Allocated in StartTrace() and deleted in
// EndTrace().
std::atomic<BlockCacheTraceWriter*> writer_;
};
} // namespace rocksdb

@ -80,6 +80,26 @@ class BlockCacheTracerTest : public testing::Test {
}
}
// Builds a single data-block access record for key id 0, filled in from the
// fixture's k* constants and GetCaller(0), and timestamped with
// env_->NowMicros(). Used by the BlockCacheTracer tests below, which verify
// it with VerifyAccess(reader, 0, kBlockTraceDataBlock, 1).
BlockCacheTraceRecord GenerateAccessRecord() {
uint32_t key_id = 0;
BlockCacheTraceRecord record;
record.block_type = TraceType::kBlockTraceDataBlock;
record.block_size = kBlockSize;
record.block_key = kBlockKeyPrefix + std::to_string(key_id);
record.access_timestamp = env_->NowMicros();
record.cf_id = kCFId;
record.cf_name = kDefaultColumnFamilyName;
record.caller = GetCaller(key_id);
record.level = kLevel;
record.sst_fd_number = kSSTFDNumber + key_id;
record.is_cache_hit = Boolean::kFalse;
record.no_insert = Boolean::kFalse;
record.referenced_key = kRefKeyPrefix + std::to_string(key_id);
record.is_referenced_key_exist_in_block = Boolean::kTrue;
record.num_keys_in_block = kNumKeysInBlock;
return record;
}
void VerifyAccess(BlockCacheTraceReader* reader, uint32_t from_key_id,
TraceType block_type, uint32_t nblocks) {
assert(reader);
@ -118,6 +138,88 @@ class BlockCacheTracerTest : public testing::Test {
std::string test_path_;
};
// WriteBlockAccess() on a tracer that was never started must succeed and
// leave the trace file without even a header.
TEST_F(BlockCacheTracerTest, AtomicWriteBeforeStartTrace) {
BlockCacheTraceRecord record = GenerateAccessRecord();
{
TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer;
// The trace writer is created but never handed to the tracer.
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
BlockCacheTracer writer;
// The record should not be written to the trace_file since StartTrace is
// not called.
ASSERT_OK(writer.WriteBlockAccess(record));
ASSERT_OK(env_->FileExists(trace_file_path_));
}
{
// Verify trace file contains nothing.
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
&trace_reader));
BlockCacheTraceReader reader(std::move(trace_reader));
BlockCacheTraceHeader header;
ASSERT_NOK(reader.ReadHeader(&header));
}
}
// After StartTrace(), one WriteBlockAccess() must produce a trace file that
// holds the header followed by exactly that one record.
TEST_F(BlockCacheTracerTest, AtomicWrite) {
BlockCacheTraceRecord record = GenerateAccessRecord();
{
TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
BlockCacheTracer writer;
ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
ASSERT_OK(writer.WriteBlockAccess(record));
ASSERT_OK(env_->FileExists(trace_file_path_));
// The tracer goes out of scope here; its destructor ends the trace.
}
{
// Verify trace file contains one record.
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
&trace_reader));
BlockCacheTraceReader reader(std::move(trace_reader));
BlockCacheTraceHeader header;
ASSERT_OK(reader.ReadHeader(&header));
ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1);
// There must be no second record.
ASSERT_NOK(reader.ReadAccess(&record));
}
}
// A WriteBlockAccess() issued after EndTrace() must succeed but add nothing:
// the trace file holds only the record written while the trace was active.
TEST_F(BlockCacheTracerTest, AtomicNoWriteAfterEndTrace) {
BlockCacheTraceRecord record = GenerateAccessRecord();
{
TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
BlockCacheTracer writer;
ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
ASSERT_OK(writer.WriteBlockAccess(record));
writer.EndTrace();
// Write the record again. This time the record should not be written since
// EndTrace is called.
ASSERT_OK(writer.WriteBlockAccess(record));
ASSERT_OK(env_->FileExists(trace_file_path_));
}
{
// Verify trace file contains one record.
std::unique_ptr<TraceReader> trace_reader;
ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
&trace_reader));
BlockCacheTraceReader reader(std::move(trace_reader));
BlockCacheTraceHeader header;
ASSERT_OK(reader.ReadHeader(&header));
ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
VerifyAccess(&reader, 0, TraceType::kBlockTraceDataBlock, 1);
// There must be no second record.
ASSERT_NOK(reader.ReadAccess(&record));
}
}
TEST_F(BlockCacheTracerTest, MixedBlocks) {
{
// Generate a trace file containing a mix of blocks.

Loading…
Cancel
Save