Add block cache tracer. (#5410)
Summary: This PR adds a helper class, the block cache tracer, for reading and writing block cache accesses. It uses the trace reader/writer to perform this task. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5410 Differential Revision: D15612843 Pulled By: HaoyuHuang fbshipit-source-id: f30fd1e1524355ca87db5d533a5c086728b141ea main
parent
340ed4fac7
commit
aa71718ac3
@ -0,0 +1,218 @@ |
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#include "trace_replay/block_cache_tracer.h" |
||||
|
||||
#include "db/db_impl/db_impl.h" |
||||
#include "rocksdb/slice.h" |
||||
#include "util/coding.h" |
||||
#include "util/hash.h" |
||||
#include "util/string_util.h" |
||||
|
||||
namespace rocksdb { |
||||
|
||||
namespace { |
||||
const unsigned int kCharSize = 1; |
||||
bool ShouldTraceReferencedKey(const BlockCacheTraceRecord& record) { |
||||
return (record.block_type == TraceType::kBlockTraceDataBlock) && |
||||
(record.caller == BlockCacheLookupCaller::kUserGet || |
||||
record.caller == BlockCacheLookupCaller::kUserMGet); |
||||
} |
||||
} // namespace
|
||||
|
||||
// Keeps the `env` pointer, copies `trace_options`, and takes ownership of
// `trace_writer`, which receives every encoded trace.
BlockCacheTraceWriter::BlockCacheTraceWriter(
    Env* env, const TraceOptions& trace_options,
    std::unique_ptr<TraceWriter>&& trace_writer)
    : env_(env),
      trace_options_(trace_options),
      trace_writer_(std::move(trace_writer)) {
}
||||
|
||||
bool BlockCacheTraceWriter::ShouldTrace( |
||||
const BlockCacheTraceRecord& record) const { |
||||
if (trace_options_.sampling_frequency == 0 || |
||||
trace_options_.sampling_frequency == 1) { |
||||
return true; |
||||
} |
||||
// We use spatial downsampling so that we have a complete access history for a
|
||||
// block.
|
||||
const uint64_t hash = GetSliceNPHash64(Slice(record.block_key)); |
||||
return hash % trace_options_.sampling_frequency == 0; |
||||
} |
||||
|
||||
// Encodes `record` and appends it to the trace through trace_writer_.
//
// Returns OK without writing anything when the record is filtered out by
// ShouldTrace() or when the trace file is already larger than
// trace_options_.max_trace_file_size. Otherwise returns the status of the
// underlying Write() call.
//
// Payload layout, in order (BlockCacheTraceReader::ReadAccess decodes the
// same sequence):
//   block_key (length prefixed), block_size (fixed64), cf_id (fixed32),
//   cf_name (length prefixed), level (fixed32), sst_fd_number (fixed32),
//   caller (1 byte), is_cache_hit (1 byte), no_insert (1 byte),
// followed, only when ShouldTraceReferencedKey() holds, by:
//   referenced_key (length prefixed), num_keys_in_block (fixed64),
//   is_referenced_key_exist_in_block (1 byte).
Status BlockCacheTraceWriter::WriteBlockAccess(
    const BlockCacheTraceRecord& record) {
  // Sampling depends on the record only, so decide before touching the
  // shared writer.
  if (!ShouldTrace(record)) {
    return Status::OK();
  }
  {
    // trace_writer_ is guarded by trace_writer_mutex_; read its size under
    // the same mutex that serializes Write() instead of racing with it.
    InstrumentedMutexLock size_guard(&trace_writer_mutex_);
    if (trace_writer_->GetFileSize() > trace_options_.max_trace_file_size) {
      return Status::OK();
    }
  }
  // Build and encode the trace outside the lock to keep the critical section
  // short.
  Trace trace;
  trace.ts = record.access_timestamp;
  trace.type = record.block_type;
  PutLengthPrefixedSlice(&trace.payload, record.block_key);
  PutFixed64(&trace.payload, record.block_size);
  PutFixed32(&trace.payload, record.cf_id);
  PutLengthPrefixedSlice(&trace.payload, record.cf_name);
  PutFixed32(&trace.payload, record.level);
  PutFixed32(&trace.payload, record.sst_fd_number);
  trace.payload.push_back(record.caller);
  trace.payload.push_back(record.is_cache_hit);
  trace.payload.push_back(record.no_insert);
  if (ShouldTraceReferencedKey(record)) {
    PutLengthPrefixedSlice(&trace.payload, record.referenced_key);
    PutFixed64(&trace.payload, record.num_keys_in_block);
    trace.payload.push_back(record.is_referenced_key_exist_in_block);
  }
  std::string encoded_trace;
  TracerHelper::EncodeTrace(trace, &encoded_trace);
  InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
  return trace_writer_->Write(encoded_trace);
}
||||
|
||||
// Writes the trace header: a kTraceBegin trace stamped with the current time
// whose payload holds the magic string followed by the major and minor
// version numbers. Returns the status of the underlying Write() call.
Status BlockCacheTraceWriter::WriteHeader() {
  Trace header_trace;
  header_trace.ts = env_->NowMicros();
  header_trace.type = TraceType::kTraceBegin;

  std::string* const payload = &header_trace.payload;
  PutLengthPrefixedSlice(payload, kTraceMagic);
  PutFixed32(payload, kMajorVersion);
  PutFixed32(payload, kMinorVersion);

  std::string serialized;
  TracerHelper::EncodeTrace(header_trace, &serialized);

  // trace_writer_ is shared; serialize the write.
  InstrumentedMutexLock guard(&trace_writer_mutex_);
  return trace_writer_->Write(serialized);
}
||||
|
||||
// Takes ownership of `reader`, the source of all encoded traces.
BlockCacheTraceReader::BlockCacheTraceReader(
    std::unique_ptr<TraceReader>&& reader)
    : trace_reader_(std::move(reader)) {
}
||||
|
||||
// Reads the next trace from trace_reader_ and decodes it as the trace header.
//
// On success fills `header` with the trace timestamp and the major/minor
// version numbers. Failures from reading or decoding the trace are returned
// unchanged; a payload with a missing or mismatching magic number, a missing
// version number, or trailing bytes yields Status::Corruption.
// `header` must not be null.
Status BlockCacheTraceReader::ReadHeader(BlockCacheTraceHeader* header) {
  assert(header != nullptr);

  std::string raw_trace;
  Status status = trace_reader_->Read(&raw_trace);
  if (!status.ok()) {
    return status;
  }

  Trace decoded;
  status = TracerHelper::DecodeTrace(raw_trace, &decoded);
  if (!status.ok()) {
    return status;
  }
  header->start_time = decoded.ts;

  // `remaining` is consumed front to back as each field is parsed.
  Slice remaining = Slice(decoded.payload);

  Slice magic_number;
  const bool has_magic = GetLengthPrefixedSlice(&remaining, &magic_number);
  if (!has_magic) {
    return Status::Corruption(
        "Corrupted header in the trace file: Failed to read the magic number.");
  }
  if (magic_number.ToString() != kTraceMagic) {
    return Status::Corruption(
        "Corrupted header in the trace file: Magic number does not match.");
  }

  const bool has_major =
      GetFixed32(&remaining, &header->rocksdb_major_version);
  if (!has_major) {
    return Status::Corruption(
        "Corrupted header in the trace file: Failed to read rocksdb major "
        "version number.");
  }

  const bool has_minor =
      GetFixed32(&remaining, &header->rocksdb_minor_version);
  if (!has_minor) {
    return Status::Corruption(
        "Corrupted header in the trace file: Failed to read rocksdb minor "
        "version number.");
  }

  // Nothing may follow the minor version in a well-formed header.
  if (remaining.empty()) {
    return Status::OK();
  }
  return Status::Corruption(
      "Corrupted header in the trace file: The length of header is too "
      "long.");
}
||||
|
||||
// Reads the next trace from trace_reader_ and decodes it into `record`.
//
// Failures from reading or decoding the trace are returned unchanged. If the
// payload ends before a required field, Status::Incomplete is returned and
// `record` may be partially filled.
//
// The referenced-key fields (referenced_key, num_keys_in_block,
// is_referenced_key_exist_in_block) are only present in the payload when
// ShouldTraceReferencedKey() holds for the decoded record; otherwise they are
// reset to their defaults so that a reused `record` never carries values from
// a previously read access.
// `record` must not be null.
Status BlockCacheTraceReader::ReadAccess(BlockCacheTraceRecord* record) {
  assert(record);
  std::string encoded_trace;
  Status s = trace_reader_->Read(&encoded_trace);
  if (!s.ok()) {
    return s;
  }
  Trace trace;
  s = TracerHelper::DecodeTrace(encoded_trace, &trace);
  if (!s.ok()) {
    return s;
  }
  record->access_timestamp = trace.ts;
  record->block_type = trace.type;

  // enc_slice is consumed front to back as each field is parsed.
  Slice enc_slice = Slice(trace.payload);
  Slice block_key;
  if (!GetLengthPrefixedSlice(&enc_slice, &block_key)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read block key.");
  }
  record->block_key = block_key.ToString();
  if (!GetFixed64(&enc_slice, &record->block_size)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read block size.");
  }
  if (!GetFixed32(&enc_slice, &record->cf_id)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read column family ID.");
  }
  Slice cf_name;
  if (!GetLengthPrefixedSlice(&enc_slice, &cf_name)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read column family name.");
  }
  record->cf_name = cf_name.ToString();
  if (!GetFixed32(&enc_slice, &record->level)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read level.");
  }
  if (!GetFixed32(&enc_slice, &record->sst_fd_number)) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read SST file number.");
  }

  // The next three fields are single bytes.
  if (enc_slice.empty()) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read caller.");
  }
  record->caller = static_cast<BlockCacheLookupCaller>(enc_slice[0]);
  enc_slice.remove_prefix(kCharSize);
  if (enc_slice.empty()) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read is_cache_hit.");
  }
  record->is_cache_hit = static_cast<Boolean>(enc_slice[0]);
  enc_slice.remove_prefix(kCharSize);
  if (enc_slice.empty()) {
    return Status::Incomplete(
        "Incomplete access record: Failed to read no_insert.");
  }
  record->no_insert = static_cast<Boolean>(enc_slice[0]);
  enc_slice.remove_prefix(kCharSize);

  // Start from the defaults: these fields are only encoded for some records,
  // and the caller may be reusing `record` across calls.
  record->referenced_key.clear();
  record->num_keys_in_block = 0;
  record->is_referenced_key_exist_in_block = Boolean::kFalse;

  if (ShouldTraceReferencedKey(*record)) {
    Slice referenced_key;
    if (!GetLengthPrefixedSlice(&enc_slice, &referenced_key)) {
      return Status::Incomplete(
          "Incomplete access record: Failed to read the referenced key.");
    }
    record->referenced_key = referenced_key.ToString();
    if (!GetFixed64(&enc_slice, &record->num_keys_in_block)) {
      return Status::Incomplete(
          "Incomplete access record: Failed to read the number of keys in the "
          "block.");
    }
    if (enc_slice.empty()) {
      return Status::Incomplete(
          "Incomplete access record: Failed to read "
          "is_referenced_key_exist_in_block.");
    }
    record->is_referenced_key_exist_in_block =
        static_cast<Boolean>(enc_slice[0]);
  }
  return Status::OK();
}
||||
|
||||
} // namespace rocksdb
|
@ -0,0 +1,105 @@ |
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#pragma once |
||||
|
||||
#include "monitoring/instrumented_mutex.h" |
||||
#include "rocksdb/env.h" |
||||
#include "rocksdb/options.h" |
||||
#include "rocksdb/trace_reader_writer.h" |
||||
#include "trace_replay/trace_replay.h" |
||||
|
||||
namespace rocksdb { |
||||
|
||||
// Identifies the caller of a block cache lookup. The underlying type is char,
// so a value occupies a single byte.
enum BlockCacheLookupCaller : char {
  kUserGet = 1,
  kUserMGet = 2,
  kUserIterator = 3,
  kPrefetch = 4,
  kCompaction = 5,
  // Sentinel: add every new caller above this line.
  kMaxBlockCacheLookupCaller
};
||||
|
||||
// Single-byte boolean (underlying type char).
enum Boolean : char {
  kFalse = 0,
  kTrue = 1
};
||||
|
||||
struct BlockCacheTraceRecord { |
||||
// Required fields for all accesses.
|
||||
uint64_t access_timestamp; |
||||
std::string block_key; |
||||
TraceType block_type; |
||||
uint64_t block_size; |
||||
uint32_t cf_id; |
||||
std::string cf_name; |
||||
uint32_t level; |
||||
uint32_t sst_fd_number; |
||||
BlockCacheLookupCaller caller; |
||||
Boolean is_cache_hit; |
||||
Boolean no_insert; |
||||
|
||||
// Required fields for data block and user Get/Multi-Get only.
|
||||
std::string referenced_key; |
||||
uint64_t num_keys_in_block = 0; |
||||
Boolean is_referenced_key_exist_in_block = Boolean::kFalse; |
||||
}; |
||||
|
||||
// Metadata stored in the first trace of a block cache trace file.
//
// Members default to zero so that a header that was never filled (for
// example because reading it failed) holds well-defined values.
// NOTE(review): default member initializers make this a non-aggregate under
// C++11; brace initialization with a value list needs C++14 or later.
struct BlockCacheTraceHeader {
  // Timestamp taken from the header trace.
  uint64_t start_time = 0;
  uint32_t rocksdb_major_version = 0;
  uint32_t rocksdb_minor_version = 0;
};
||||
|
||||
// BlockCacheTraceWriter captures all RocksDB block cache accesses using a
|
||||
// user-provided TraceWriter. Every RocksDB operation is written as a single
|
||||
// trace. Each trace will have a timestamp and type, followed by the trace
|
||||
// payload.
|
||||
class BlockCacheTraceWriter { |
||||
public: |
||||
BlockCacheTraceWriter(Env* env, const TraceOptions& trace_options, |
||||
std::unique_ptr<TraceWriter>&& trace_writer); |
||||
~BlockCacheTraceWriter() = default; |
||||
// No copy and move.
|
||||
BlockCacheTraceWriter(const BlockCacheTraceWriter&) = delete; |
||||
BlockCacheTraceWriter& operator=(const BlockCacheTraceWriter&) = delete; |
||||
BlockCacheTraceWriter(BlockCacheTraceWriter&&) = delete; |
||||
BlockCacheTraceWriter& operator=(BlockCacheTraceWriter&&) = delete; |
||||
|
||||
Status WriteBlockAccess(const BlockCacheTraceRecord& record); |
||||
|
||||
// Write a trace header at the beginning, typically on initiating a trace,
|
||||
// with some metadata like a magic number and RocksDB version.
|
||||
Status WriteHeader(); |
||||
|
||||
private: |
||||
bool ShouldTrace(const BlockCacheTraceRecord& record) const; |
||||
|
||||
Env* env_; |
||||
TraceOptions trace_options_; |
||||
std::unique_ptr<TraceWriter> trace_writer_; |
||||
/*Mutex to protect trace_writer_ */ |
||||
InstrumentedMutex trace_writer_mutex_; |
||||
}; |
||||
|
||||
// BlockCacheTraceReader helps read the trace file generated by
|
||||
// BlockCacheTraceWriter using a user provided TraceReader.
|
||||
class BlockCacheTraceReader { |
||||
public: |
||||
BlockCacheTraceReader(std::unique_ptr<TraceReader>&& reader); |
||||
~BlockCacheTraceReader() = default; |
||||
// No copy and move.
|
||||
BlockCacheTraceReader(const BlockCacheTraceReader&) = delete; |
||||
BlockCacheTraceReader& operator=(const BlockCacheTraceReader&) = delete; |
||||
BlockCacheTraceReader(BlockCacheTraceReader&&) = delete; |
||||
BlockCacheTraceReader& operator=(BlockCacheTraceReader&&) = delete; |
||||
|
||||
Status ReadHeader(BlockCacheTraceHeader* header); |
||||
|
||||
Status ReadAccess(BlockCacheTraceRecord* record); |
||||
|
||||
private: |
||||
std::unique_ptr<TraceReader> trace_reader_; |
||||
}; |
||||
|
||||
} // namespace rocksdb
|
@ -0,0 +1,167 @@ |
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#include "trace_replay/block_cache_tracer.h" |
||||
#include "rocksdb/env.h" |
||||
#include "rocksdb/status.h" |
||||
#include "rocksdb/trace_reader_writer.h" |
||||
#include "test_util/testharness.h" |
||||
#include "test_util/testutil.h" |
||||
|
||||
namespace rocksdb { |
||||
|
||||
namespace {
// Fixed values used to build and verify the access records in this test.
// Each constant has the same type as the BlockCacheTraceRecord field it
// feeds, so no implicit narrowing happens on assignment.
const uint64_t kBlockSize = 1024;
const std::string kBlockKeyPrefix = "test-block-";
const uint32_t kCFId = 0;
const uint32_t kLevel = 1;
// sst_fd_number is a 32-bit field.
const uint32_t kSSTFDNumber = 100;
const std::string kRefKeyPrefix = "test-get-";
const uint64_t kNumKeysInBlock = 1024;
}  // namespace
|
||||
|
||||
class BlockCacheTracerTest : public testing::Test { |
||||
public: |
||||
BlockCacheTracerTest() { |
||||
test_path_ = test::PerThreadDBPath("block_cache_tracer_test"); |
||||
env_ = rocksdb::Env::Default(); |
||||
EXPECT_OK(env_->CreateDir(test_path_)); |
||||
trace_file_path_ = test_path_ + "/block_cache_trace"; |
||||
} |
||||
|
||||
~BlockCacheTracerTest() override { |
||||
EXPECT_OK(env_->DeleteFile(trace_file_path_)); |
||||
EXPECT_OK(env_->DeleteDir(test_path_)); |
||||
} |
||||
|
||||
BlockCacheLookupCaller GetCaller(uint32_t key_id) { |
||||
uint32_t n = key_id % 5; |
||||
switch (n) { |
||||
case 0: |
||||
return BlockCacheLookupCaller::kPrefetch; |
||||
case 1: |
||||
return BlockCacheLookupCaller::kCompaction; |
||||
case 2: |
||||
return BlockCacheLookupCaller::kUserGet; |
||||
case 3: |
||||
return BlockCacheLookupCaller::kUserMGet; |
||||
case 4: |
||||
return BlockCacheLookupCaller::kUserIterator; |
||||
} |
||||
assert(false); |
||||
} |
||||
|
||||
void WriteBlockAccess(BlockCacheTraceWriter* writer, uint32_t from_key_id, |
||||
TraceType block_type, uint32_t nblocks) { |
||||
assert(writer); |
||||
for (uint32_t i = 0; i < nblocks; i++) { |
||||
uint32_t key_id = from_key_id + i; |
||||
BlockCacheTraceRecord record; |
||||
record.block_type = block_type; |
||||
record.block_size = kBlockSize + key_id; |
||||
record.block_key = kBlockKeyPrefix + std::to_string(key_id); |
||||
record.access_timestamp = env_->NowMicros(); |
||||
record.cf_id = kCFId; |
||||
record.cf_name = kDefaultColumnFamilyName; |
||||
record.caller = GetCaller(key_id); |
||||
record.level = kLevel; |
||||
record.sst_fd_number = kSSTFDNumber + key_id; |
||||
record.is_cache_hit = Boolean::kFalse; |
||||
record.no_insert = Boolean::kFalse; |
||||
// Provide these fields for all block types.
|
||||
// The writer should only write these fields for data blocks and the
|
||||
// caller is either GET or MGET.
|
||||
record.referenced_key = kRefKeyPrefix + std::to_string(key_id); |
||||
record.is_referenced_key_exist_in_block = Boolean::kTrue; |
||||
record.num_keys_in_block = kNumKeysInBlock; |
||||
ASSERT_OK(writer->WriteBlockAccess(record)); |
||||
} |
||||
} |
||||
|
||||
void VerifyAccess(BlockCacheTraceReader* reader, uint32_t from_key_id, |
||||
TraceType block_type, uint32_t nblocks) { |
||||
assert(reader); |
||||
for (uint32_t i = 0; i < nblocks; i++) { |
||||
uint32_t key_id = from_key_id + i; |
||||
BlockCacheTraceRecord record; |
||||
ASSERT_OK(reader->ReadAccess(&record)); |
||||
ASSERT_EQ(block_type, record.block_type); |
||||
ASSERT_EQ(kBlockSize + key_id, record.block_size); |
||||
ASSERT_EQ(kBlockKeyPrefix + std::to_string(key_id), record.block_key); |
||||
ASSERT_EQ(kCFId, record.cf_id); |
||||
ASSERT_EQ(kDefaultColumnFamilyName, record.cf_name); |
||||
ASSERT_EQ(GetCaller(key_id), record.caller); |
||||
ASSERT_EQ(kLevel, record.level); |
||||
ASSERT_EQ(kSSTFDNumber + key_id, record.sst_fd_number); |
||||
ASSERT_EQ(Boolean::kFalse, record.is_cache_hit); |
||||
ASSERT_EQ(Boolean::kFalse, record.no_insert); |
||||
if (block_type == TraceType::kBlockTraceDataBlock && |
||||
(record.caller == BlockCacheLookupCaller::kUserGet || |
||||
record.caller == BlockCacheLookupCaller::kUserMGet)) { |
||||
ASSERT_EQ(kRefKeyPrefix + std::to_string(key_id), |
||||
record.referenced_key); |
||||
ASSERT_EQ(Boolean::kTrue, record.is_referenced_key_exist_in_block); |
||||
ASSERT_EQ(kNumKeysInBlock, record.num_keys_in_block); |
||||
continue; |
||||
} |
||||
ASSERT_EQ("", record.referenced_key); |
||||
ASSERT_EQ(Boolean::kFalse, record.is_referenced_key_exist_in_block); |
||||
ASSERT_EQ(0, record.num_keys_in_block); |
||||
} |
||||
} |
||||
|
||||
Env* env_; |
||||
EnvOptions env_options_; |
||||
std::string trace_file_path_; |
||||
std::string test_path_; |
||||
}; |
||||
|
||||
// Writes a header plus ten accesses for each of five block types, then reads
// the file back and checks the header, every access, and that reading past
// the last access fails.
TEST_F(BlockCacheTracerTest, MixedBlocks) {
  // Block types in the order they are written; each type gets its own run of
  // consecutive key ids.
  const TraceType block_types[] = {
      TraceType::kBlockTraceUncompressionDictBlock,
      TraceType::kBlockTraceDataBlock,
      TraceType::kBlockTraceFilterBlock,
      TraceType::kBlockTraceIndexBlock,
      TraceType::kBlockTraceRangeDeletionBlock,
  };
  const uint32_t blocks_per_type = 10;

  {
    // Generate a trace file containing a mix of blocks.
    TraceOptions options;
    std::unique_ptr<TraceWriter> file_writer;
    ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
                                 &file_writer));
    BlockCacheTraceWriter writer(env_, options, std::move(file_writer));
    ASSERT_OK(writer.WriteHeader());
    // Write blocks of different types.
    uint32_t first_key_id = 0;
    for (const TraceType type : block_types) {
      WriteBlockAccess(&writer, first_key_id, type, blocks_per_type);
      first_key_id += blocks_per_type;
    }
    ASSERT_OK(env_->FileExists(trace_file_path_));
  }

  {
    // Verify trace file is generated correctly.
    std::unique_ptr<TraceReader> file_reader;
    ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
                                 &file_reader));
    BlockCacheTraceReader reader(std::move(file_reader));
    BlockCacheTraceHeader header;
    ASSERT_OK(reader.ReadHeader(&header));
    ASSERT_EQ(kMajorVersion, header.rocksdb_major_version);
    ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version);
    // Read blocks.
    uint32_t first_key_id = 0;
    for (const TraceType type : block_types) {
      VerifyAccess(&reader, first_key_id, type, blocks_per_type);
      first_key_id += blocks_per_type;
    }
    // Read one more record should report an error.
    BlockCacheTraceRecord extra_record;
    ASSERT_NOK(reader.ReadAccess(&extra_record));
  }
}
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue