Update compaction statistics to include the amount of data read from blob files (#8022)

Summary:
The patch does the following:
1) Exposes the amount of data (number of bytes) read from blob files from
`BlobFileReader::GetBlob` / `Version::GetBlob`.
2) Tracks the total number and size of blobs read from blob files during a
compaction (due to garbage collection or compaction filter usage) in
`CompactionIterationStats` and propagates this data to
`InternalStats::CompactionStats` / `CompactionJobStats`.
3) Updates the formulae for write amplification calculations to include the
amount of data read from blob files.
4) Extends the compaction stats dump with a new column `Rblob(GB)` and
a new line containing the total number and size of blob files in the current
`Version` to complement the information about the shape and size of the LSM tree
that's already there.
5) Updates `CompactionJobStats` so that the number of files and amount of data
written by a compaction are broken down per file type (i.e. table/blob file).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8022

Test Plan: Ran `make check` and `db_bench`.

Reviewed By: riversand963

Differential Revision: D26801199

Pulled By: ltamasi

fbshipit-source-id: 28a5f072048a702643b28cb5971b4099acabbfb2
main
Levi Tamasi 3 years ago committed by Facebook GitHub Bot
parent 4126bdc0e1
commit cb25bc1128
  1. 1
      HISTORY.md
  2. 13
      db/blob/blob_file_reader.cc
  3. 3
      db/blob/blob_file_reader.h
  4. 70
      db/blob/blob_file_reader_test.cc
  5. 61
      db/blob/db_blob_compaction_test.cc
  6. 4
      db/compaction/compaction_iteration_stats.h
  7. 17
      db/compaction/compaction_iterator.cc
  8. 53
      db/compaction/compaction_job.cc
  9. 33
      db/db_compaction_test.cc
  10. 6
      db/db_iter.cc
  11. 36
      db/internal_stats.cc
  12. 13
      db/internal_stats.h
  13. 21
      db/version_set.cc
  14. 19
      db/version_set.h
  15. 18
      include/rocksdb/compaction_job_stats.h
  16. 8
      util/compaction_job_stats_impl.cc

@ -8,6 +8,7 @@
* Add support to extend the DB::VerifyFileChecksums API to also verify blob file checksums.
* When using the new BlobDB, the amount of data written by flushes/compactions is now broken down into table files and blob files in the compaction statistics; namely, Write(GB) denotes the amount of data written to table files, while Wblob(GB) means the amount of data written to blob files.
* Add new SetBufferSize API to WriteBufferManager to allow dynamic management of memory allotted to all write buffers. This allows user code to adjust memory monitoring provided by WriteBufferManager as process memory needs change, e.g. as datasets grow and shrink.
* For the new integrated BlobDB implementation, compaction statistics now include the amount of data read from blob files during compaction (due to garbage collection or compaction filters). Write amplification metrics have also been extended to account for data read from blob files.
### New Features
* Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicate that further action is necessary for the compaction filter to make a decision.

@ -271,7 +271,8 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
const Slice& user_key, uint64_t offset,
uint64_t value_size,
CompressionType compression_type,
PinnableSlice* value) const {
PinnableSlice* value,
uint64_t* bytes_read) const {
assert(value);
const uint64_t key_size = user_key.size();
@ -294,6 +295,9 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
: 0;
assert(offset >= adjustment);
const uint64_t record_offset = offset - adjustment;
const uint64_t record_size = value_size + adjustment;
Slice record_slice;
Buffer buf;
AlignedBuf aligned_buf;
@ -301,9 +305,6 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
{
TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile");
const uint64_t record_offset = offset - adjustment;
const uint64_t record_size = value_size + adjustment;
const Status s = ReadFromFile(file_reader_.get(), record_offset,
static_cast<size_t>(record_size),
&record_slice, &buf, &aligned_buf);
@ -332,6 +333,10 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
}
}
if (bytes_read) {
*bytes_read = record_size;
}
return Status::OK();
}

@ -39,7 +39,8 @@ class BlobFileReader {
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
uint64_t offset, uint64_t value_size,
CompressionType compression_type, PinnableSlice* value) const;
CompressionType compression_type, PinnableSlice* value,
uint64_t* bytes_read) const;
private:
BlobFileReader(std::unique_ptr<RandomAccessFileReader>&& file_reader,

@ -152,83 +152,103 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
{
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kNoCompression, &value));
kNoCompression, &value, &bytes_read));
ASSERT_EQ(value, blob);
ASSERT_EQ(bytes_read, blob_size);
}
read_options.verify_checksums = true;
{
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kNoCompression, &value));
kNoCompression, &value, &bytes_read));
ASSERT_EQ(value, blob);
constexpr uint64_t key_size = sizeof(key) - 1;
ASSERT_EQ(bytes_read,
BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
blob_size);
}
// Invalid offset (too close to start of file)
{
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(read_options, key, blob_offset - 1, blob_size,
kNoCompression, &value)
kNoCompression, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}
// Invalid offset (too close to end of file)
{
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(read_options, key, blob_offset + 1, blob_size,
kNoCompression, &value)
kNoCompression, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}
// Incorrect compression type
{
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(
reader
->GetBlob(read_options, key, blob_offset, blob_size, kZSTD, &value)
.IsCorruption());
ASSERT_TRUE(reader
->GetBlob(read_options, key, blob_offset, blob_size, kZSTD,
&value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}
// Incorrect key size
{
constexpr char shorter_key[] = "k";
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(read_options, shorter_key,
blob_offset - (sizeof(key) - sizeof(shorter_key)),
blob_size, kNoCompression, &value)
blob_size, kNoCompression, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}
// Incorrect key
{
constexpr char incorrect_key[] = "foo";
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(read_options, incorrect_key, blob_offset,
blob_size, kNoCompression, &value)
blob_size, kNoCompression, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}
// Incorrect value size
{
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(read_options, key, blob_offset, blob_size + 1,
kNoCompression, &value)
kNoCompression, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}
}
@ -479,11 +499,13 @@ TEST_F(BlobFileReaderTest, BlobCRCError) {
SyncPoint::GetInstance()->EnableProcessing();
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, &value)
kNoCompression, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
@ -530,20 +552,28 @@ TEST_F(BlobFileReaderTest, Compression) {
{
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kSnappyCompression, &value));
kSnappyCompression, &value, &bytes_read));
ASSERT_EQ(value, blob);
ASSERT_EQ(bytes_read, blob_size);
}
read_options.verify_checksums = true;
{
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kSnappyCompression, &value));
kSnappyCompression, &value, &bytes_read));
ASSERT_EQ(value, blob);
constexpr uint64_t key_size = sizeof(key) - 1;
ASSERT_EQ(bytes_read,
BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
blob_size);
}
}
@ -596,11 +626,13 @@ TEST_F(BlobFileReaderTest, UncompressionError) {
SyncPoint::GetInstance()->EnableProcessing();
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kSnappyCompression, &value)
kSnappyCompression, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
@ -678,11 +710,13 @@ TEST_P(BlobFileReaderIOErrorTest, IOError) {
ASSERT_OK(s);
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, &value)
kNoCompression, &value, &bytes_read)
.IsIOError());
ASSERT_EQ(bytes_read, 0);
}
SyncPoint::GetInstance()->DisableProcessing();
@ -758,11 +792,13 @@ TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) {
ASSERT_OK(s);
PinnableSlice value;
uint64_t bytes_read = 0;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, &value)
kNoCompression, &value, &bytes_read)
.IsCorruption());
ASSERT_EQ(bytes_read, 0);
}
SyncPoint::GetInstance()->DisableProcessing();

@ -41,6 +41,22 @@ class DBBlobCompactionTest : public DBTestBase {
return result;
}
#ifndef ROCKSDB_LITE
const std::vector<InternalStats::CompactionStats>& GetCompactionStats() {
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
assert(versions->GetColumnFamilySet());
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats);
return internal_stats->TEST_GetCompactionStats();
}
#endif // ROCKSDB_LITE
};
namespace {
@ -214,6 +230,17 @@ TEST_F(DBBlobCompactionTest, FilterByKeyLength) {
value.clear();
ASSERT_OK(db_->Get(ReadOptions(), long_key, &value));
ASSERT_EQ("value", value);
#ifndef ROCKSDB_LITE
const auto& compaction_stats = GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
// Filter decides between kKeep and kRemove solely based on key;
// this involves neither reading nor writing blobs
ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
#endif // ROCKSDB_LITE
Close();
}
@ -239,6 +266,17 @@ TEST_F(DBBlobCompactionTest, BlindWriteFilter) {
for (const auto& key : keys) {
ASSERT_EQ(new_blob_value, Get(key));
}
#ifndef ROCKSDB_LITE
const auto& compaction_stats = GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
// Filter unconditionally changes value in FilterBlobByKey;
// this involves writing but not reading blobs
ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
#endif // ROCKSDB_LITE
Close();
}
@ -312,6 +350,17 @@ TEST_F(DBBlobCompactionTest, CompactionFilter) {
for (const auto& kv : kvs) {
ASSERT_EQ(kv.second + std::string(padding), Get(kv.first));
}
#ifndef ROCKSDB_LITE
const auto& compaction_stats = GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
// Filter changes the value using the previous value in FilterV2;
// this involves reading and writing blobs
ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
#endif // ROCKSDB_LITE
Close();
}
@ -354,6 +403,16 @@ TEST_F(DBBlobCompactionTest, CompactionFilterReadBlobAndKeep) {
/*end=*/nullptr));
ASSERT_EQ(blob_files, GetBlobFileNumbers());
#ifndef ROCKSDB_LITE
const auto& compaction_stats = GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
// Filter decides to keep the existing value in FilterV2;
// this involves reading but not writing blobs
ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
#endif // ROCKSDB_LITE
Close();
}
@ -363,4 +422,4 @@ int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}

@ -34,4 +34,8 @@ struct CompactionIterationStats {
// Single-Delete diagnostics for exceptional situations
uint64_t num_single_del_fallthru = 0;
uint64_t num_single_del_mismatch = 0;
// Blob related statistics
uint64_t num_blobs_read = 0;
uint64_t total_blob_bytes_read = 0;
};

@ -253,13 +253,19 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
}
const Version* const version = compaction_->input_version();
assert(version);
uint64_t bytes_read = 0;
s = version->GetBlob(ReadOptions(), ikey_.user_key, blob_index,
&blob_value_);
&blob_value_, &bytes_read);
if (!s.ok()) {
status_ = s;
valid_ = false;
return false;
}
++iter_stats_.num_blobs_read;
iter_stats_.total_blob_bytes_read += bytes_read;
value_type = CompactionFilter::ValueType::kValue;
}
}
@ -883,9 +889,11 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {
const Version* const version = compaction_->input_version();
assert(version);
uint64_t bytes_read = 0;
{
const Status s =
version->GetBlob(ReadOptions(), user_key(), blob_index, &blob_value_);
const Status s = version->GetBlob(ReadOptions(), user_key(), blob_index,
&blob_value_, &bytes_read);
if (!s.ok()) {
status_ = s;
@ -895,6 +903,9 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {
}
}
++iter_stats_.num_blobs_read;
iter_stats_.total_blob_bytes_read += bytes_read;
value_ = blob_value_;
if (ExtractLargeValueIfNeededImpl()) {

@ -794,20 +794,23 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
double bytes_read_per_sec = 0;
double bytes_written_per_sec = 0;
if (stats.bytes_read_non_output_levels > 0) {
read_write_amp =
(stats.bytes_written + stats.bytes_written_blob +
stats.bytes_read_output_level + stats.bytes_read_non_output_levels) /
static_cast<double>(stats.bytes_read_non_output_levels);
write_amp = (stats.bytes_written + stats.bytes_written_blob) /
static_cast<double>(stats.bytes_read_non_output_levels);
const uint64_t bytes_read_non_output_and_blob =
stats.bytes_read_non_output_levels + stats.bytes_read_blob;
const uint64_t bytes_read_all =
stats.bytes_read_output_level + bytes_read_non_output_and_blob;
const uint64_t bytes_written_all =
stats.bytes_written + stats.bytes_written_blob;
if (bytes_read_non_output_and_blob > 0) {
read_write_amp = (bytes_written_all + bytes_read_all) /
static_cast<double>(bytes_read_non_output_and_blob);
write_amp =
bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob);
}
if (stats.micros > 0) {
bytes_read_per_sec =
(stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
static_cast<double>(stats.micros);
bytes_written_per_sec = (stats.bytes_written + stats.bytes_written_blob) /
static_cast<double>(stats.micros);
bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros);
bytes_written_per_sec =
bytes_written_all / static_cast<double>(stats.micros);
}
const std::string& column_family_name = cfd->GetName();
@ -818,8 +821,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d +%d blob) "
"MB in(%.1f, %.1f) out(%.1f +%.1f blob), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %" PRIu64
"MB in(%.1f, %.1f +%.1f blob) out(%.1f +%.1f blob), "
"read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
", records dropped: %" PRIu64 " output_compression: %s\n",
column_family_name.c_str(), vstorage->LevelSummary(&tmp),
bytes_read_per_sec, bytes_written_per_sec,
@ -827,9 +830,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
stats.num_input_files_in_non_output_levels,
stats.num_input_files_in_output_level, stats.num_output_files,
stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
stats.bytes_read_output_level / kMB, stats.bytes_written / kMB,
stats.bytes_written_blob / kMB, read_write_amp, write_amp,
status.ToString().c_str(), stats.num_input_records,
stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
stats.bytes_written / kMB, stats.bytes_written_blob / kMB, read_write_amp,
write_amp, status.ToString().c_str(), stats.num_input_records,
stats.num_dropped_records,
CompressionTypeToString(compact_->compaction->output_compression())
.c_str());
@ -1124,6 +1127,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
}
sub_compact->compaction_job_stats.num_blobs_read =
c_iter_stats.num_blobs_read;
sub_compact->compaction_job_stats.total_blob_bytes_read =
c_iter_stats.total_blob_bytes_read;
sub_compact->compaction_job_stats.num_input_deletion_records =
c_iter_stats.num_input_deletion_records;
sub_compact->compaction_job_stats.num_corrupt_keys =
@ -1827,6 +1834,10 @@ void CompactionJob::UpdateCompactionStats() {
}
}
assert(compaction_job_stats_);
compaction_stats_.bytes_read_blob =
compaction_job_stats_->total_blob_bytes_read;
compaction_stats_.num_output_files =
static_cast<int>(compact_->num_output_files);
compaction_stats_.num_output_files_blob =
@ -1871,11 +1882,11 @@ void CompactionJob::UpdateCompactionJobStats(
stats.num_input_files_in_output_level;
// output information
compaction_job_stats_->total_output_bytes =
stats.bytes_written + stats.bytes_written_blob;
compaction_job_stats_->total_output_bytes = stats.bytes_written;
compaction_job_stats_->total_output_bytes_blob = stats.bytes_written_blob;
compaction_job_stats_->num_output_records = compact_->num_output_records;
compaction_job_stats_->num_output_files =
stats.num_output_files + stats.num_output_files_blob;
compaction_job_stats_->num_output_files = stats.num_output_files;
compaction_job_stats_->num_output_files_blob = stats.num_output_files_blob;
if (stats.num_output_files > 0) {
CopyPrefix(compact_->SmallestUserKey(),

@ -5948,6 +5948,7 @@ TEST_F(DBCompactionTest, CompactionWithBlob) {
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
ASSERT_EQ(compaction_stats[1].bytes_written, table_file->fd.GetFileSize());
ASSERT_EQ(compaction_stats[1].bytes_written_blob,
blob_file->GetTotalBlobBytes());
@ -6039,12 +6040,14 @@ TEST_P(DBCompactionTestBlobError, CompactionError) {
ASSERT_GE(compaction_stats.size(), 2);
if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
ASSERT_EQ(compaction_stats[1].bytes_written, 0);
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
ASSERT_EQ(compaction_stats[1].num_output_files, 0);
ASSERT_EQ(compaction_stats[1].num_output_files_blob, 0);
} else {
// SST file writing succeeded; blob file writing failed (during Finish)
ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
ASSERT_GT(compaction_stats[1].bytes_written, 0);
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
ASSERT_EQ(compaction_stats[1].num_output_files, 1);
@ -6133,6 +6136,36 @@ TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) {
for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
}
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
assert(versions->GetColumnFamilySet());
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats);
const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
ASSERT_GE(compaction_stats.size(), 2);
if (blob_gc_age_cutoff_ > 0.0) {
ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
if (updated_enable_blob_files_) {
// GC relocated some blobs to new blob files
ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
ASSERT_EQ(compaction_stats[1].bytes_read_blob,
compaction_stats[1].bytes_written_blob);
} else {
// GC moved some blobs back to the LSM, no new blob files
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
}
} else {
ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
}
}
TEST_F(DBCompactionTest, CompactionWithBlobGCError_CorruptIndex) {

@ -193,8 +193,10 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
read_options.read_tier = read_tier_;
read_options.verify_checksums = verify_checksums_;
const Status s =
version_->GetBlob(read_options, user_key, blob_index, &blob_value_);
constexpr uint64_t* bytes_read = nullptr;
const Status s = version_->GetBlob(read_options, user_key, blob_index,
&blob_value_, bytes_read);
if (!s.ok()) {
status_ = s;

@ -50,6 +50,7 @@ const std::map<LevelStatType, LevelStat> InternalStats::compaction_level_stats =
{LevelStatType::AVG_SEC, LevelStat{"AvgSec", "Avg(sec)"}},
{LevelStatType::KEY_IN, LevelStat{"KeyIn", "KeyIn"}},
{LevelStatType::KEY_DROP, LevelStat{"KeyDrop", "KeyDrop"}},
{LevelStatType::R_BLOB_GB, LevelStat{"RblobGB", "Rblob(GB)"}},
{LevelStatType::W_BLOB_GB, LevelStat{"WblobGB", "Wblob(GB)"}},
};
@ -68,7 +69,8 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name,
};
int line_size = snprintf(
buf + written_size, len - written_size,
"%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n",
"%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s "
"%s\n",
// Note that we skip COMPACTED_FILES and merge it with Files column
group_by.c_str(), hdr(LevelStatType::NUM_FILES),
hdr(LevelStatType::SIZE_BYTES), hdr(LevelStatType::SCORE),
@ -79,7 +81,8 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name,
hdr(LevelStatType::WRITE_MBPS), hdr(LevelStatType::COMP_SEC),
hdr(LevelStatType::COMP_CPU_SEC), hdr(LevelStatType::COMP_COUNT),
hdr(LevelStatType::AVG_SEC), hdr(LevelStatType::KEY_IN),
hdr(LevelStatType::KEY_DROP), hdr(LevelStatType::W_BLOB_GB));
hdr(LevelStatType::KEY_DROP), hdr(LevelStatType::R_BLOB_GB),
hdr(LevelStatType::W_BLOB_GB));
written_size += line_size;
written_size = std::min(written_size, static_cast<int>(len));
@ -91,8 +94,9 @@ void PrepareLevelStats(std::map<LevelStatType, double>* level_stats,
int num_files, int being_compacted,
double total_file_size, double score, double w_amp,
const InternalStats::CompactionStats& stats) {
const uint64_t bytes_read =
stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
const uint64_t bytes_read = stats.bytes_read_non_output_levels +
stats.bytes_read_output_level +
stats.bytes_read_blob;
const uint64_t bytes_written = stats.bytes_written + stats.bytes_written_blob;
const int64_t bytes_new = stats.bytes_written - stats.bytes_read_output_level;
const double elapsed = (stats.micros + 1) / kMicrosInSec;
@ -120,6 +124,7 @@ void PrepareLevelStats(std::map<LevelStatType, double>* level_stats,
static_cast<double>(stats.num_input_records);
(*level_stats)[LevelStatType::KEY_DROP] =
static_cast<double>(stats.num_dropped_records);
(*level_stats)[LevelStatType::R_BLOB_GB] = stats.bytes_read_blob / kGB;
(*level_stats)[LevelStatType::W_BLOB_GB] = stats.bytes_written_blob / kGB;
}
@ -146,6 +151,7 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
"%8.3f " /* Avg(sec) */
"%7s " /* KeyIn */
"%6s " /* KeyDrop */
"%9.1f " /* Rblob(GB) */
"%9.1f\n", /* Wblob(GB) */
name.c_str(), static_cast<int>(stat_value.at(LevelStatType::NUM_FILES)),
static_cast<int>(stat_value.at(LevelStatType::COMPACTED_FILES)),
@ -172,6 +178,7 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
NumberToHumanString(
static_cast<std::int64_t>(stat_value.at(LevelStatType::KEY_DROP)))
.c_str(),
stat_value.at(LevelStatType::R_BLOB_GB),
stat_value.at(LevelStatType::W_BLOB_GB));
}
@ -1116,9 +1123,10 @@ void InternalStats::DumpDBStats(std::string* value) {
*/
void InternalStats::DumpCFMapStats(
std::map<std::string, std::string>* cf_stats) {
const VersionStorageInfo* vstorage = cfd_->current()->storage_info();
CompactionStats compaction_stats_sum;
std::map<int, std::map<LevelStatType, double>> levels_stats;
DumpCFMapStats(&levels_stats, &compaction_stats_sum);
DumpCFMapStats(vstorage, &levels_stats, &compaction_stats_sum);
for (auto const& level_ent : levels_stats) {
auto level_str =
level_ent.first == -1 ? "Sum" : "L" + ToString(level_ent.first);
@ -1135,9 +1143,10 @@ void InternalStats::DumpCFMapStats(
}
void InternalStats::DumpCFMapStats(
const VersionStorageInfo* vstorage,
std::map<int, std::map<LevelStatType, double>>* levels_stats,
CompactionStats* compaction_stats_sum) {
const VersionStorageInfo* vstorage = cfd_->current()->storage_info();
assert(vstorage);
int num_levels_to_check =
(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO)
@ -1178,7 +1187,8 @@ void InternalStats::DumpCFMapStats(
if (level == 0) {
input_bytes = curr_ingest;
} else {
input_bytes = comp_stats_[level].bytes_read_non_output_levels;
input_bytes = comp_stats_[level].bytes_read_non_output_levels +
comp_stats_[level].bytes_read_blob;
}
double w_amp =
(input_bytes == 0)
@ -1262,9 +1272,10 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
value->append(buf);
// Print stats for each level
const VersionStorageInfo* vstorage = cfd_->current()->storage_info();
std::map<int, std::map<LevelStatType, double>> levels_stats;
CompactionStats compaction_stats_sum;
DumpCFMapStats(&levels_stats, &compaction_stats_sum);
DumpCFMapStats(vstorage, &levels_stats, &compaction_stats_sum);
for (int l = 0; l < number_levels_; ++l) {
if (levels_stats.find(l) != levels_stats.end()) {
PrintLevelStats(buf, sizeof(buf), "L" + ToString(l), levels_stats[l]);
@ -1320,6 +1331,12 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
}
}
snprintf(buf, sizeof(buf),
"\nBlob file count: %" ROCKSDB_PRIszt ", total size: %.1f GB\n\n",
vstorage->GetBlobFiles().size(),
vstorage->GetTotalBlobFileSize() / kGB);
value->append(buf);
double seconds_up = (clock_->NowMicros() - started_at_ + 1) / kMicrosInSec;
double interval_seconds_up = seconds_up - cf_stats_snapshot_.seconds_up;
snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n",
@ -1360,7 +1377,8 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
uint64_t compact_micros = 0;
for (int level = 0; level < number_levels_; level++) {
compact_bytes_read += comp_stats_[level].bytes_read_output_level +
comp_stats_[level].bytes_read_non_output_levels;
comp_stats_[level].bytes_read_non_output_levels +
comp_stats_[level].bytes_read_blob;
compact_bytes_write += comp_stats_[level].bytes_written +
comp_stats_[level].bytes_written_blob;
compact_micros += comp_stats_[level].micros;

@ -79,6 +79,7 @@ enum class LevelStatType {
AVG_SEC,
KEY_IN,
KEY_DROP,
R_BLOB_GB,
W_BLOB_GB,
TOTAL // total number of types
};
@ -150,6 +151,9 @@ class InternalStats {
// The number of bytes read from the compaction output level (table files)
uint64_t bytes_read_output_level;
// The number of bytes read from blob files
uint64_t bytes_read_blob;
// Total number of bytes written to table files during compaction
uint64_t bytes_written;
@ -190,6 +194,7 @@ class InternalStats {
cpu_micros(0),
bytes_read_non_output_levels(0),
bytes_read_output_level(0),
bytes_read_blob(0),
bytes_written(0),
bytes_written_blob(0),
bytes_moved(0),
@ -211,6 +216,7 @@ class InternalStats {
cpu_micros(0),
bytes_read_non_output_levels(0),
bytes_read_output_level(0),
bytes_read_blob(0),
bytes_written(0),
bytes_written_blob(0),
bytes_moved(0),
@ -238,6 +244,7 @@ class InternalStats {
cpu_micros(c.cpu_micros),
bytes_read_non_output_levels(c.bytes_read_non_output_levels),
bytes_read_output_level(c.bytes_read_output_level),
bytes_read_blob(c.bytes_read_blob),
bytes_written(c.bytes_written),
bytes_written_blob(c.bytes_written_blob),
bytes_moved(c.bytes_moved),
@ -260,6 +267,7 @@ class InternalStats {
cpu_micros = c.cpu_micros;
bytes_read_non_output_levels = c.bytes_read_non_output_levels;
bytes_read_output_level = c.bytes_read_output_level;
bytes_read_blob = c.bytes_read_blob;
bytes_written = c.bytes_written;
bytes_written_blob = c.bytes_written_blob;
bytes_moved = c.bytes_moved;
@ -284,6 +292,7 @@ class InternalStats {
this->cpu_micros = 0;
this->bytes_read_non_output_levels = 0;
this->bytes_read_output_level = 0;
this->bytes_read_blob = 0;
this->bytes_written = 0;
this->bytes_written_blob = 0;
this->bytes_moved = 0;
@ -305,6 +314,7 @@ class InternalStats {
this->cpu_micros += c.cpu_micros;
this->bytes_read_non_output_levels += c.bytes_read_non_output_levels;
this->bytes_read_output_level += c.bytes_read_output_level;
this->bytes_read_blob += c.bytes_read_blob;
this->bytes_written += c.bytes_written;
this->bytes_written_blob += c.bytes_written_blob;
this->bytes_moved += c.bytes_moved;
@ -328,6 +338,7 @@ class InternalStats {
this->cpu_micros -= c.cpu_micros;
this->bytes_read_non_output_levels -= c.bytes_read_non_output_levels;
this->bytes_read_output_level -= c.bytes_read_output_level;
this->bytes_read_blob -= c.bytes_read_blob;
this->bytes_written -= c.bytes_written;
this->bytes_written_blob -= c.bytes_written_blob;
this->bytes_moved -= c.bytes_moved;
@ -435,6 +446,7 @@ class InternalStats {
void DumpDBStats(std::string* value);
void DumpCFMapStats(std::map<std::string, std::string>* cf_stats);
void DumpCFMapStats(
const VersionStorageInfo* vstorage,
std::map<int, std::map<LevelStatType, double>>* level_stats,
CompactionStats* compaction_stats_sum);
void DumpCFMapStatsByPriority(
@ -674,6 +686,7 @@ class InternalStats {
uint64_t cpu_micros;
uint64_t bytes_read_non_output_levels;
uint64_t bytes_read_output_level;
uint64_t bytes_read_blob;
uint64_t bytes_written;
uint64_t bytes_written_blob;
uint64_t bytes_moved;

@ -1793,8 +1793,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
io_tracer_(io_tracer) {}
Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
const Slice& blob_index_slice,
PinnableSlice* value) const {
const Slice& blob_index_slice, PinnableSlice* value,
uint64_t* bytes_read) const {
if (read_options.read_tier == kBlockCacheTier) {
return Status::Incomplete("Cannot read blob: no disk I/O allowed");
}
@ -1808,12 +1808,12 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
}
}
return GetBlob(read_options, user_key, blob_index, value);
return GetBlob(read_options, user_key, blob_index, value, bytes_read);
}
Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
const BlobIndex& blob_index,
PinnableSlice* value) const {
const BlobIndex& blob_index, PinnableSlice* value,
uint64_t* bytes_read) const {
assert(value);
if (blob_index.HasTTL() || blob_index.IsInlined()) {
@ -1843,7 +1843,7 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
assert(blob_file_reader.GetValue());
const Status s = blob_file_reader.GetValue()->GetBlob(
read_options, user_key, blob_index.offset(), blob_index.size(),
blob_index.compression(), value);
blob_index.compression(), value, bytes_read);
return s;
}
@ -1953,7 +1953,10 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
if (is_blob_index) {
if (do_merge && value) {
*status = GetBlob(read_options, user_key, *value, value);
constexpr uint64_t* bytes_read = nullptr;
*status =
GetBlob(read_options, user_key, *value, value, bytes_read);
if (!status->ok()) {
if (status->IsIncomplete()) {
get_context.MarkKeyMayExist();
@ -2147,8 +2150,10 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
if (iter->is_blob_index) {
if (iter->value) {
constexpr uint64_t* bytes_read = nullptr;
*status = GetBlob(read_options, iter->ukey_with_ts, *iter->value,
iter->value);
iter->value, bytes_read);
if (!status->ok()) {
if (status->IsIncomplete()) {
get_context.MarkKeyMayExist();

@ -344,6 +344,19 @@ class VersionStorageInfo {
// Maps a uint64_t id (presumably the blob file number — confirm against
// VersionEdit/BlobFileMetaData) to the shared metadata for that blob file.
using BlobFiles = std::map<uint64_t, std::shared_ptr<BlobFileMetaData>>;
// Read-only accessor for the full set of blob file metadata tracked by
// this version's storage info.
const BlobFiles& GetBlobFiles() const { return blob_files_; }
// Sums GetTotalBlobBytes() over every blob file tracked in blob_files_,
// yielding the combined size (in bytes) of all blob files in this version.
uint64_t GetTotalBlobFileSize() const {
  uint64_t result = 0;
  for (const auto& file_number_and_meta : blob_files_) {
    const auto& blob_file_meta = file_number_and_meta.second;
    assert(blob_file_meta);
    result += blob_file_meta->GetTotalBlobBytes();
  }
  return result;
}
const ROCKSDB_NAMESPACE::LevelFilesBrief& LevelFilesBrief(int level) const {
assert(level < static_cast<int>(level_files_brief_.size()));
return level_files_brief_[level];
@ -690,12 +703,14 @@ class Version {
// saves it in *value.
// REQUIRES: blob_index_slice stores an encoded blob reference
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
const Slice& blob_index_slice, PinnableSlice* value) const;
const Slice& blob_index_slice, PinnableSlice* value,
uint64_t* bytes_read) const;
// Retrieves a blob using a blob reference and saves it in *value,
// assuming the corresponding blob file is part of this Version.
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
const BlobIndex& blob_index, PinnableSlice* value) const;
const BlobIndex& blob_index, PinnableSlice* value,
uint64_t* bytes_read) const;
// Loads some stats information from files. Call without mutex held. It needs
// to be called before applying the version to the version set.

@ -25,25 +25,33 @@ struct CompactionJobStats {
// the number of compaction input records.
uint64_t num_input_records;
// the number of compaction input files.
// the number of blobs read from blob files
uint64_t num_blobs_read;
// the number of compaction input files (table files)
size_t num_input_files;
// the number of compaction input files at the output level.
// the number of compaction input files at the output level (table files)
size_t num_input_files_at_output_level;
// the number of compaction output records.
uint64_t num_output_records;
// the number of compaction output files.
// the number of compaction output files (table files)
size_t num_output_files;
// the number of compaction output files (blob files)
size_t num_output_files_blob;
// true if the compaction is a full compaction (all live SST files input)
bool is_full_compaction;
// true if the compaction is a manual compaction
bool is_manual_compaction;
// the size of the compaction input in bytes.
// the total size of table files in the compaction input
uint64_t total_input_bytes;
// the size of the compaction output in bytes.
// the total size of blobs read from blob files
uint64_t total_blob_bytes_read;
// the total size of table files in the compaction output
uint64_t total_output_bytes;
// the total size of blob files in the compaction output
uint64_t total_output_bytes_blob;
// number of records being replaced by newer record associated with same key.
// this could be a new value or a deletion entry for that key so this field

@ -14,17 +14,21 @@ void CompactionJobStats::Reset() {
cpu_micros = 0;
num_input_records = 0;
num_blobs_read = 0;
num_input_files = 0;
num_input_files_at_output_level = 0;
num_output_records = 0;
num_output_files = 0;
num_output_files_blob = 0;
is_full_compaction = false;
is_manual_compaction = false;
total_input_bytes = 0;
total_blob_bytes_read = 0;
total_output_bytes = 0;
total_output_bytes_blob = 0;
num_records_replaced = 0;
@ -53,14 +57,18 @@ void CompactionJobStats::Add(const CompactionJobStats& stats) {
cpu_micros += stats.cpu_micros;
num_input_records += stats.num_input_records;
num_blobs_read += stats.num_blobs_read;
num_input_files += stats.num_input_files;
num_input_files_at_output_level += stats.num_input_files_at_output_level;
num_output_records += stats.num_output_records;
num_output_files += stats.num_output_files;
num_output_files_blob += stats.num_output_files_blob;
total_input_bytes += stats.total_input_bytes;
total_blob_bytes_read += stats.total_blob_bytes_read;
total_output_bytes += stats.total_output_bytes;
total_output_bytes_blob += stats.total_output_bytes_blob;
num_records_replaced += stats.num_records_replaced;

Loading…
Cancel
Save