diff --git a/HISTORY.md b/HISTORY.md index e5a2979d6..bae246654 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ ### Public API change * Add a new option BlockBasedTableOptions::max_auto_readahead_size. RocksDB does auto-readahead for iterators on noticing more than two reads for a table file if user doesn't provide readahead_size. The readahead starts at 8KB and doubles on every additional read upto max_auto_readahead_size and now max_auto_readahead_size can be configured dynamically as well. Found that 256 KB readahead size provides the best performance, based on experiments, for auto readahead. Experiment data is in PR #3282. If value is set 0 then no automatic prefetching will be done by rocksdb. Also changing the value will only affect files opened after the change. * Add suppport to extend DB::VerifyFileChecksums API to also verify blob files checksum. +* When using the new BlobDB, the amount of data written by flushes/compactions is now broken down into table files and blob files in the compaction statistics; namely, Write(GB) denotes the amount of data written to table files, while Wblob(GB) means the amount of data written to blob files. ### New Features * Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicated that further action is necessary for compaction filter to make a decision. diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 31e6faf55..38387f046 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -795,27 +795,30 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { double bytes_written_per_sec = 0; if (stats.bytes_read_non_output_levels > 0) { - read_write_amp = (stats.bytes_written + stats.bytes_read_output_level + - stats.bytes_read_non_output_levels) / - static_cast(stats.bytes_read_non_output_levels); - write_amp = stats.bytes_written / + read_write_amp = + (stats.bytes_written + stats.bytes_written_blob + + stats.bytes_read_output_level + stats.bytes_read_non_output_levels) / + static_cast(stats.bytes_read_non_output_levels); + write_amp = (stats.bytes_written + stats.bytes_written_blob) / static_cast(stats.bytes_read_non_output_levels); } if (stats.micros > 0) { bytes_read_per_sec = (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) / static_cast(stats.micros); - bytes_written_per_sec = - stats.bytes_written / static_cast(stats.micros); + bytes_written_per_sec = (stats.bytes_written + stats.bytes_written_blob) / + static_cast(stats.micros); } const std::string& column_family_name = cfd->GetName(); + constexpr double kMB = 1048576.0; + ROCKS_LOG_BUFFER( log_buffer_, "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " - "files in(%d, %d) out(%d) " - "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " + "files in(%d, %d) out(%d +%d blob) " + "MB in(%.1f, %.1f) out(%.1f +%.1f blob), read-write-amplify(%.1f) " "write-amplify(%.1f) %s, records in: %" PRIu64 ", records dropped: %" PRIu64 " output_compression: %s\n", column_family_name.c_str(), vstorage->LevelSummary(&tmp), @@ -823,9 +826,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { compact_->compaction->output_level(), stats.num_input_files_in_non_output_levels, stats.num_input_files_in_output_level, stats.num_output_files, - stats.bytes_read_non_output_levels / 1048576.0, - stats.bytes_read_output_level / 1048576.0, - stats.bytes_written / 1048576.0, read_write_amp, write_amp, + stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB, + stats.bytes_read_output_level / kMB, stats.bytes_written / kMB, + stats.bytes_written_blob / kMB, read_write_amp, write_amp, status.ToString().c_str(), stats.num_input_records, stats.num_dropped_records, CompressionTypeToString(compact_->compaction->output_compression()) @@ -1825,10 +1828,11 @@ void CompactionJob::UpdateCompactionStats() { } compaction_stats_.num_output_files = - static_cast(compact_->num_output_files) + + static_cast(compact_->num_output_files); + compaction_stats_.num_output_files_blob = static_cast(compact_->num_blob_output_files); - compaction_stats_.bytes_written = - compact_->total_bytes + compact_->total_blob_bytes; + compaction_stats_.bytes_written = compact_->total_bytes; + compaction_stats_.bytes_written_blob = compact_->total_blob_bytes; if (compaction_stats_.num_input_records > compact_->num_output_records) { compaction_stats_.num_dropped_records = @@ -1867,9 +1871,11 @@ void CompactionJob::UpdateCompactionJobStats( stats.num_input_files_in_output_level; // output information - compaction_job_stats_->total_output_bytes = stats.bytes_written; + compaction_job_stats_->total_output_bytes = + stats.bytes_written + stats.bytes_written_blob; compaction_job_stats_->num_output_records = compact_->num_output_records; - compaction_job_stats_->num_output_files = stats.num_output_files; + compaction_job_stats_->num_output_files = + stats.num_output_files + stats.num_output_files_blob; if (stats.num_output_files > 0) { CopyPrefix(compact_->SmallestUserKey(), diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 4091efb5f..8c8113410 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5946,13 +5946,13 @@ TEST_F(DBCompactionTest, CompactionWithBlob) { const InternalStats* const internal_stats = cfd->internal_stats(); ASSERT_NE(internal_stats, nullptr); - const uint64_t expected_bytes = - table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes(); - const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); ASSERT_GE(compaction_stats.size(), 2); - ASSERT_EQ(compaction_stats[1].bytes_written, expected_bytes); - ASSERT_EQ(compaction_stats[1].num_output_files, 2); + ASSERT_EQ(compaction_stats[1].bytes_written, table_file->fd.GetFileSize()); + ASSERT_EQ(compaction_stats[1].bytes_written_blob, + blob_file->GetTotalBlobBytes()); + ASSERT_EQ(compaction_stats[1].num_output_files, 1); + ASSERT_EQ(compaction_stats[1].num_output_files_blob, 1); } class DBCompactionTestBlobError @@ -6040,11 +6040,15 @@ TEST_P(DBCompactionTestBlobError, CompactionError) { if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") { ASSERT_EQ(compaction_stats[1].bytes_written, 0); + ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); ASSERT_EQ(compaction_stats[1].num_output_files, 0); + ASSERT_EQ(compaction_stats[1].num_output_files_blob, 0); } else { // SST file writing succeeded; blob file writing failed (during Finish) ASSERT_GT(compaction_stats[1].bytes_written, 0); + ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); ASSERT_EQ(compaction_stats[1].num_output_files, 1); + ASSERT_EQ(compaction_stats[1].num_output_files_blob, 0); } } diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index a03ec81e3..8eeafc56b 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -513,16 +513,18 @@ TEST_F(DBFlushTest, FlushWithBlob) { const InternalStats* const internal_stats = cfd->internal_stats(); assert(internal_stats); - const uint64_t expected_bytes = - table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes(); - const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); ASSERT_FALSE(compaction_stats.empty()); - ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes); - ASSERT_EQ(compaction_stats[0].num_output_files, 2); + ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize()); + ASSERT_EQ(compaction_stats[0].bytes_written_blob, + blob_file->GetTotalBlobBytes()); + ASSERT_EQ(compaction_stats[0].num_output_files, 1); + ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1); const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); - ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], + compaction_stats[0].bytes_written + + compaction_stats[0].bytes_written_blob); #endif // ROCKSDB_LITE } @@ -802,16 +804,21 @@ TEST_P(DBFlushTestBlobError, FlushError) { if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") { ASSERT_EQ(compaction_stats[0].bytes_written, 0); + ASSERT_EQ(compaction_stats[0].bytes_written_blob, 0); ASSERT_EQ(compaction_stats[0].num_output_files, 0); + ASSERT_EQ(compaction_stats[0].num_output_files_blob, 0); } else { // SST file writing succeeded; blob file writing failed (during Finish) ASSERT_GT(compaction_stats[0].bytes_written, 0); + ASSERT_EQ(compaction_stats[0].bytes_written_blob, 0); ASSERT_EQ(compaction_stats[0].num_output_files, 1); + ASSERT_EQ(compaction_stats[0].num_output_files_blob, 0); } const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], - compaction_stats[0].bytes_written); + compaction_stats[0].bytes_written + + compaction_stats[0].bytes_written_blob); #endif // ROCKSDB_LITE } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 98757297d..01b65a4b1 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1408,14 +1408,15 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, const auto& blobs = edit->GetBlobFileAdditions(); for (const auto& blob : blobs) { - stats.bytes_written += blob.GetTotalBlobBytes(); + stats.bytes_written_blob += blob.GetTotalBlobBytes(); } - stats.num_output_files += static_cast(blobs.size()); + stats.num_output_files_blob = static_cast(blobs.size()); cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats); - cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, - stats.bytes_written); + cfd->internal_stats()->AddCFStats( + InternalStats::BYTES_FLUSHED, + stats.bytes_written + stats.bytes_written_blob); RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); return s; } diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 25803b332..a729800ea 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -425,16 +425,18 @@ TEST_F(DBWALTest, RecoverWithBlob) { const InternalStats* const internal_stats = cfd->internal_stats(); ASSERT_NE(internal_stats, nullptr); - const uint64_t expected_bytes = - table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes(); - const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); ASSERT_FALSE(compaction_stats.empty()); - ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes); - ASSERT_EQ(compaction_stats[0].num_output_files, 2); + ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize()); + ASSERT_EQ(compaction_stats[0].bytes_written_blob, + blob_file->GetTotalBlobBytes()); + ASSERT_EQ(compaction_stats[0].num_output_files, 1); + ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1); const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); - ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], + compaction_stats[0].bytes_written + + compaction_stats[0].bytes_written_blob); #endif // ROCKSDB_LITE } diff --git a/db/flush_job.cc b/db/flush_job.cc index d356b2bb6..ececba81b 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -477,15 +477,16 @@ Status FlushJob::WriteLevel0Table() { const auto& blobs = edit_->GetBlobFileAdditions(); for (const auto& blob : blobs) { - stats.bytes_written += blob.GetTotalBlobBytes(); + stats.bytes_written_blob += blob.GetTotalBlobBytes(); } - stats.num_output_files += static_cast(blobs.size()); + stats.num_output_files_blob = static_cast(blobs.size()); RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros); cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats); - cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, - stats.bytes_written); + cfd_->internal_stats()->AddCFStats( + InternalStats::BYTES_FLUSHED, + stats.bytes_written + stats.bytes_written_blob); RecordFlushIOStats(); return s; } diff --git a/db/internal_stats.cc b/db/internal_stats.cc index fc5981d91..8ef719682 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -50,6 +50,7 @@ const std::map InternalStats::compaction_level_stats = {LevelStatType::AVG_SEC, LevelStat{"AvgSec", "Avg(sec)"}}, {LevelStatType::KEY_IN, LevelStat{"KeyIn", "KeyIn"}}, {LevelStatType::KEY_DROP, LevelStat{"KeyDrop", "KeyDrop"}}, + {LevelStatType::W_BLOB_GB, LevelStat{"WblobGB", "Wblob(GB)"}}, }; namespace { @@ -67,7 +68,7 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name, }; int line_size = snprintf( buf + written_size, len - written_size, - "%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n", + "%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n", // Note that we skip COMPACTED_FILES and merge it with Files column group_by.c_str(), hdr(LevelStatType::NUM_FILES), hdr(LevelStatType::SIZE_BYTES), hdr(LevelStatType::SCORE), @@ -78,7 +79,7 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name, hdr(LevelStatType::WRITE_MBPS), hdr(LevelStatType::COMP_SEC), hdr(LevelStatType::COMP_CPU_SEC), hdr(LevelStatType::COMP_COUNT), hdr(LevelStatType::AVG_SEC), hdr(LevelStatType::KEY_IN), - hdr(LevelStatType::KEY_DROP)); + hdr(LevelStatType::KEY_DROP), hdr(LevelStatType::W_BLOB_GB)); written_size += line_size; written_size = std::min(written_size, static_cast(len)); @@ -90,10 +91,11 @@ void PrepareLevelStats(std::map* level_stats, int num_files, int being_compacted, double total_file_size, double score, double w_amp, const InternalStats::CompactionStats& stats) { - uint64_t bytes_read = + const uint64_t bytes_read = stats.bytes_read_non_output_levels + stats.bytes_read_output_level; - int64_t bytes_new = stats.bytes_written - stats.bytes_read_output_level; - double elapsed = (stats.micros + 1) / kMicrosInSec; + const uint64_t bytes_written = stats.bytes_written + stats.bytes_written_blob; + const int64_t bytes_new = stats.bytes_written - stats.bytes_read_output_level; + const double elapsed = (stats.micros + 1) / kMicrosInSec; (*level_stats)[LevelStatType::NUM_FILES] = num_files; (*level_stats)[LevelStatType::COMPACTED_FILES] = being_compacted; @@ -108,8 +110,7 @@ void PrepareLevelStats(std::map* level_stats, (*level_stats)[LevelStatType::MOVED_GB] = stats.bytes_moved / kGB; (*level_stats)[LevelStatType::WRITE_AMP] = w_amp; (*level_stats)[LevelStatType::READ_MBPS] = bytes_read / kMB / elapsed; - (*level_stats)[LevelStatType::WRITE_MBPS] = - stats.bytes_written / kMB / elapsed; + (*level_stats)[LevelStatType::WRITE_MBPS] = bytes_written / kMB / elapsed; (*level_stats)[LevelStatType::COMP_SEC] = stats.micros / kMicrosInSec; (*level_stats)[LevelStatType::COMP_CPU_SEC] = stats.cpu_micros / kMicrosInSec; (*level_stats)[LevelStatType::COMP_COUNT] = stats.count; @@ -119,6 +120,7 @@ void PrepareLevelStats(std::map* level_stats, static_cast(stats.num_input_records); (*level_stats)[LevelStatType::KEY_DROP] = static_cast(stats.num_dropped_records); + (*level_stats)[LevelStatType::W_BLOB_GB] = stats.bytes_written_blob / kGB; } void PrintLevelStats(char* buf, size_t len, const std::string& name, @@ -143,7 +145,8 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, "%9d " /* Comp(cnt) */ "%8.3f " /* Avg(sec) */ "%7s " /* KeyIn */ - "%6s\n", /* KeyDrop */ + "%6s " /* KeyDrop */ + "%9.1f\n", /* Wblob(GB) */ name.c_str(), static_cast(stat_value.at(LevelStatType::NUM_FILES)), static_cast(stat_value.at(LevelStatType::COMPACTED_FILES)), BytesToHumanString( @@ -168,7 +171,8 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, .c_str(), NumberToHumanString( static_cast(stat_value.at(LevelStatType::KEY_DROP))) - .c_str()); + .c_str(), + stat_value.at(LevelStatType::W_BLOB_GB)); } void PrintLevelStats(char* buf, size_t len, const std::string& name, @@ -1179,7 +1183,8 @@ void InternalStats::DumpCFMapStats( double w_amp = (input_bytes == 0) ? 0.0 - : static_cast(comp_stats_[level].bytes_written) / + : static_cast(comp_stats_[level].bytes_written + + comp_stats_[level].bytes_written_blob) / input_bytes; std::map level_stats; PrepareLevelStats(&level_stats, files, files_being_compacted[level], @@ -1189,7 +1194,8 @@ void InternalStats::DumpCFMapStats( } } // Cumulative summary - double w_amp = compaction_stats_sum->bytes_written / + double w_amp = (compaction_stats_sum->bytes_written + + compaction_stats_sum->bytes_written_blob) / static_cast(curr_ingest + 1); // Stats summary across levels std::map sum_stats; @@ -1294,7 +1300,8 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) { CompactionStats interval_stats(compaction_stats_sum); interval_stats.Subtract(cf_stats_snapshot_.comp_stats); double w_amp = - interval_stats.bytes_written / static_cast(interval_ingest); + (interval_stats.bytes_written + interval_stats.bytes_written_blob) / + static_cast(interval_ingest); PrintLevelStats(buf, sizeof(buf), "Int", 0, 0, 0, 0, w_amp, interval_stats); value->append(buf); @@ -1354,7 +1361,8 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) { for (int level = 0; level < number_levels_; level++) { compact_bytes_read += comp_stats_[level].bytes_read_output_level + comp_stats_[level].bytes_read_non_output_levels; - compact_bytes_write += comp_stats_[level].bytes_written; + compact_bytes_write += comp_stats_[level].bytes_written + + comp_stats_[level].bytes_written_blob; compact_micros += comp_stats_[level].micros; } diff --git a/db/internal_stats.h b/db/internal_stats.h index 28ed50087..1337779d3 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -79,6 +79,7 @@ enum class LevelStatType { AVG_SEC, KEY_IN, KEY_DROP, + W_BLOB_GB, TOTAL // total number of types }; @@ -143,32 +144,39 @@ class InternalStats { uint64_t micros; uint64_t cpu_micros; - // The number of bytes read from all non-output levels + // The number of bytes read from all non-output levels (table files) uint64_t bytes_read_non_output_levels; - // The number of bytes read from the compaction output level. + // The number of bytes read from the compaction output level (table files) uint64_t bytes_read_output_level; - // Total number of bytes written during compaction + // Total number of bytes written to table files during compaction uint64_t bytes_written; - // Total number of bytes moved to the output level + // Total number of bytes written to blob files during compaction + uint64_t bytes_written_blob; + + // Total number of bytes moved to the output level (table files) uint64_t bytes_moved; - // The number of compaction input files in all non-output levels. + // The number of compaction input files in all non-output levels (table + // files) int num_input_files_in_non_output_levels; - // The number of compaction input files in the output level. + // The number of compaction input files in the output level (table files) int num_input_files_in_output_level; - // The number of compaction output files. + // The number of compaction output files (table files) int num_output_files; + // The number of compaction output files (blob files) + int num_output_files_blob; + // Total incoming entries during compaction between levels N and N+1 uint64_t num_input_records; // Accumulated diff number of entries - // (num input entries - num output entires) for compaction levels N and N+1 + // (num input entries - num output entries) for compaction levels N and N+1 uint64_t num_dropped_records; // Number of compactions done @@ -183,10 +191,12 @@ class InternalStats { bytes_read_non_output_levels(0), bytes_read_output_level(0), bytes_written(0), + bytes_written_blob(0), bytes_moved(0), num_input_files_in_non_output_levels(0), num_input_files_in_output_level(0), num_output_files(0), + num_output_files_blob(0), num_input_records(0), num_dropped_records(0), count(0) { @@ -202,10 +212,12 @@ class InternalStats { bytes_read_non_output_levels(0), bytes_read_output_level(0), bytes_written(0), + bytes_written_blob(0), bytes_moved(0), num_input_files_in_non_output_levels(0), num_input_files_in_output_level(0), num_output_files(0), + num_output_files_blob(0), num_input_records(0), num_dropped_records(0), count(c) { @@ -227,11 +239,13 @@ class InternalStats { bytes_read_non_output_levels(c.bytes_read_non_output_levels), bytes_read_output_level(c.bytes_read_output_level), bytes_written(c.bytes_written), + bytes_written_blob(c.bytes_written_blob), bytes_moved(c.bytes_moved), num_input_files_in_non_output_levels( c.num_input_files_in_non_output_levels), num_input_files_in_output_level(c.num_input_files_in_output_level), num_output_files(c.num_output_files), + num_output_files_blob(c.num_output_files_blob), num_input_records(c.num_input_records), num_dropped_records(c.num_dropped_records), count(c.count) { @@ -247,11 +261,13 @@ class InternalStats { bytes_read_non_output_levels = c.bytes_read_non_output_levels; bytes_read_output_level = c.bytes_read_output_level; bytes_written = c.bytes_written; + bytes_written_blob = c.bytes_written_blob; bytes_moved = c.bytes_moved; num_input_files_in_non_output_levels = c.num_input_files_in_non_output_levels; num_input_files_in_output_level = c.num_input_files_in_output_level; num_output_files = c.num_output_files; + num_output_files_blob = c.num_output_files_blob; num_input_records = c.num_input_records; num_dropped_records = c.num_dropped_records; count = c.count; @@ -269,10 +285,12 @@ class InternalStats { this->bytes_read_non_output_levels = 0; this->bytes_read_output_level = 0; this->bytes_written = 0; + this->bytes_written_blob = 0; this->bytes_moved = 0; this->num_input_files_in_non_output_levels = 0; this->num_input_files_in_output_level = 0; this->num_output_files = 0; + this->num_output_files_blob = 0; this->num_input_records = 0; this->num_dropped_records = 0; this->count = 0; @@ -288,12 +306,14 @@ class InternalStats { this->bytes_read_non_output_levels += c.bytes_read_non_output_levels; this->bytes_read_output_level += c.bytes_read_output_level; this->bytes_written += c.bytes_written; + this->bytes_written_blob += c.bytes_written_blob; this->bytes_moved += c.bytes_moved; this->num_input_files_in_non_output_levels += c.num_input_files_in_non_output_levels; this->num_input_files_in_output_level += c.num_input_files_in_output_level; this->num_output_files += c.num_output_files; + this->num_output_files_blob += c.num_output_files_blob; this->num_input_records += c.num_input_records; this->num_dropped_records += c.num_dropped_records; this->count += c.count; @@ -309,12 +329,14 @@ class InternalStats { this->bytes_read_non_output_levels -= c.bytes_read_non_output_levels; this->bytes_read_output_level -= c.bytes_read_output_level; this->bytes_written -= c.bytes_written; + this->bytes_written_blob -= c.bytes_written_blob; this->bytes_moved -= c.bytes_moved; this->num_input_files_in_non_output_levels -= c.num_input_files_in_non_output_levels; this->num_input_files_in_output_level -= c.num_input_files_in_output_level; this->num_output_files -= c.num_output_files; + this->num_output_files_blob -= c.num_output_files_blob; this->num_input_records -= c.num_input_records; this->num_dropped_records -= c.num_dropped_records; this->count -= c.count; @@ -653,10 +675,12 @@ class InternalStats { uint64_t bytes_read_non_output_levels; uint64_t bytes_read_output_level; uint64_t bytes_written; + uint64_t bytes_written_blob; uint64_t bytes_moved; int num_input_files_in_non_output_levels; int num_input_files_in_output_level; int num_output_files; + int num_output_files_blob; uint64_t num_input_records; uint64_t num_dropped_records; int count;