From 6b34eb0ebc04403365a2a061c61c00d3429489f1 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Tue, 28 Sep 2021 13:59:15 -0700 Subject: [PATCH] Add remote compaction read/write bytes statistics (#8939) Summary: Add basic read/write bytes statistics on the primary side: `REMOTE_COMPACT_READ_BYTES` `REMOTE_COMPACT_WRITE_BYTES` Fixed existing statistics missing some IO for remote compaction. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8939 Test Plan: CI Reviewed By: ajkr Differential Revision: D31074672 Pulled By: jay-zhuang fbshipit-source-id: c57afdba369990185008ffaec7e3fe7c62e8902f --- HISTORY.md | 1 + db/compaction/compaction_job.cc | 14 ++-- db/compaction/compaction_job.h | 13 +-- db/compaction/compaction_service_test.cc | 79 +++++++++++-------- include/rocksdb/statistics.h | 4 + java/rocksjni/portal.h | 8 ++ .../src/main/java/org/rocksdb/TickerType.java | 17 ++++ monitoring/statistics.cc | 2 + 8 files changed, 96 insertions(+), 42 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index bb01abd92..7ddb19c94 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ ### New Features * Provided support for SingleDelete with user defined timestamp. +* Add remote compaction read/write bytes statistics: `REMOTE_COMPACT_READ_BYTES`, `REMOTE_COMPACT_WRITE_BYTES`. ### Public API change * Made SystemClock extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method. diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 1bdc4125f..8b67d3323 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1113,8 +1113,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( sub_compact->num_output_records = compaction_result.num_output_records; sub_compact->approx_size = compaction_input.approx_size; // is this used? sub_compact->total_bytes = compaction_result.total_bytes; - IOSTATS_ADD(bytes_written, compaction_result.bytes_written); - IOSTATS_ADD(bytes_read, compaction_result.bytes_read); + RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read); + RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES, + compaction_result.bytes_written); return CompactionServiceJobStatus::kSuccess; } #endif // !ROCKSDB_LITE @@ -2248,6 +2249,12 @@ std::string CompactionServiceCompactionJob::GetTableFileName( return MakeTableFileName(output_path_, file_number); } +void CompactionServiceCompactionJob::RecordCompactionIOStats() { + compaction_result_->bytes_read += IOSTATS(bytes_read); + compaction_result_->bytes_written += IOSTATS(bytes_written); + CompactionJob::RecordCompactionIOStats(); +} + CompactionServiceCompactionJob::CompactionServiceCompactionJob( int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, const MutableDBOptions& mutable_db_options, const FileOptions& file_options, @@ -2338,9 +2345,6 @@ Status CompactionServiceCompactionJob::Run() { // Finish up all book-keeping to unify the subcompaction results AggregateStatistics(); UpdateCompactionStats(); - - compaction_result_->bytes_written = IOSTATS(bytes_written); - compaction_result_->bytes_read = IOSTATS(bytes_read); RecordCompactionIOStats(); LogFlush(db_options_.info_log); diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 4f4563991..67ec2d7c7 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -117,7 +117,7 @@ class CompactionJob { void AggregateStatistics(); void UpdateCompactionStats(); void LogCompaction(); - void RecordCompactionIOStats(); + virtual void RecordCompactionIOStats(); void CleanupCompaction(); // Call compaction filter. Then iterate through input and compact the @@ -304,10 +304,10 @@ struct CompactionServiceResult { std::string output_path; // some statistics about the compaction - uint64_t num_output_records; - uint64_t total_bytes; - uint64_t bytes_read; - uint64_t bytes_written; + uint64_t num_output_records = 0; + uint64_t total_bytes = 0; + uint64_t bytes_read = 0; + uint64_t bytes_written = 0; CompactionJobStats stats; // serialization interface to read and write the object @@ -347,6 +347,9 @@ class CompactionServiceCompactionJob : private CompactionJob { IOStatus io_status() const { return CompactionJob::io_status(); } + protected: + void RecordCompactionIOStats() override; + private: // Get table file name in output_path std::string GetTableFileName(uint64_t file_number) override; diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index d68838182..4b70c7822 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -310,6 +310,9 @@ TEST_P(CompactionServiceTest, BasicCompactions) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); + Statistics* primary_statistics = GetPrimaryStatistics(); + Statistics* compactor_statistics = GetCompactorStatistics(); + for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; @@ -337,14 +340,24 @@ TEST_P(CompactionServiceTest, BasicCompactions) { } } auto my_cs = GetCompactionService(); - Statistics* compactor_statistics = GetCompactorStatistics(); ASSERT_GE(my_cs->GetCompactionNum(), 1); // make sure the compaction statistics is only recorded on the remote side - ASSERT_GE( - compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1); - Statistics* primary_statistics = GetPrimaryStatistics(); - ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), + ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1); + ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1); + ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0); + // even with remote compaction, primary host still needs to read SST files to + // `verify_table()`. + ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1); + // all the compaction write happens on the remote side + ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), + compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES)); + ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1); + ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES), + primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES)); + // compactor is already the remote side, which doesn't have remote + ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0); + ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0); // Test failed compaction @@ -682,10 +695,10 @@ TEST_P(CompactionServiceTest, FallbackLocalAuto) { auto my_cs = GetCompactionService(); Statistics* compactor_statistics = GetCompactorStatistics(); Statistics* primary_statistics = GetPrimaryStatistics(); - uint64_t compactor_new_key = - compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); - uint64_t primary_new_key = - primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); + uint64_t compactor_write_bytes = + compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES); + uint64_t primary_write_bytes = + primary_statistics->getTickerCount(COMPACT_WRITE_BYTES); my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal); @@ -719,11 +732,12 @@ TEST_P(CompactionServiceTest, FallbackLocalAuto) { ASSERT_EQ(my_cs->GetCompactionNum(), 0); // make sure the compaction statistics is only recorded on the local side - ASSERT_EQ( - compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), - compactor_new_key); - ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), - primary_new_key); + ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), + compactor_write_bytes); + ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), + primary_write_bytes); + ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0); + ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0); } TEST_P(CompactionServiceTest, FallbackLocalManual) { @@ -737,10 +751,10 @@ TEST_P(CompactionServiceTest, FallbackLocalManual) { auto my_cs = GetCompactionService(); Statistics* compactor_statistics = GetCompactorStatistics(); Statistics* primary_statistics = GetPrimaryStatistics(); - uint64_t compactor_new_key = - compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); - uint64_t primary_new_key = - primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); + uint64_t compactor_write_bytes = + compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES); + uint64_t primary_write_bytes = + primary_statistics->getTickerCount(COMPACT_WRITE_BYTES); // re-enable remote compaction my_cs->ResetOverride(); @@ -753,31 +767,32 @@ TEST_P(CompactionServiceTest, FallbackLocalManual) { ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); // make sure the compaction statistics is only recorded on the remote side - ASSERT_GT( - compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), - compactor_new_key); - ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), - primary_new_key); + ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), + compactor_write_bytes); + ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), + compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES)); + ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), + primary_write_bytes); // return run local again with API WaitForComplete my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal); start_str = Key(120); start = start_str; comp_num = my_cs->GetCompactionNum(); - compactor_new_key = - compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); - primary_new_key = - primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); + compactor_write_bytes = + compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES); + primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr)); ASSERT_EQ(my_cs->GetCompactionNum(), comp_num); // no remote compaction is run // make sure the compaction statistics is only recorded on the local side - ASSERT_EQ( - compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), - compactor_new_key); - ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), - primary_new_key); + ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), + compactor_write_bytes); + ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), + primary_write_bytes); + ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), + compactor_write_bytes); // verify result after 2 manual compactions VerifyTestData(); diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index c4f2d5154..9dc2bcce9 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -413,6 +413,10 @@ enum Tickers : uint32_t { BACKUP_READ_BYTES, BACKUP_WRITE_BYTES, + // Remote compaction read/write statistics + REMOTE_COMPACT_READ_BYTES, + REMOTE_COMPACT_WRITE_BYTES, + TICKER_ENUM_MAX }; diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 88bb509a3..261769764 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -5008,6 +5008,10 @@ class TickerTypeJni { return -0x20; case ROCKSDB_NAMESPACE::Tickers::BACKUP_WRITE_BYTES: return -0x21; + case ROCKSDB_NAMESPACE::Tickers::REMOTE_COMPACT_READ_BYTES: + return -0x22; + case ROCKSDB_NAMESPACE::Tickers::REMOTE_COMPACT_WRITE_BYTES: + return -0x23; case ROCKSDB_NAMESPACE::Tickers::TICKER_ENUM_MAX: // 0x5F was the max value in the initial copy of tickers to Java. // Since these values are exposed directly to Java clients, we keep @@ -5353,6 +5357,10 @@ class TickerTypeJni { return ROCKSDB_NAMESPACE::Tickers::BACKUP_READ_BYTES; case -0x21: return ROCKSDB_NAMESPACE::Tickers::BACKUP_WRITE_BYTES; + case -0x22: + return ROCKSDB_NAMESPACE::Tickers::REMOTE_COMPACT_READ_BYTES; + case -0x23: + return ROCKSDB_NAMESPACE::Tickers::REMOTE_COMPACT_WRITE_BYTES; case 0x5F: // 0x5F was the max value in the initial copy of tickers to Java. // Since these values are exposed directly to Java clients, we keep diff --git a/java/src/main/java/org/rocksdb/TickerType.java b/java/src/main/java/org/rocksdb/TickerType.java index 1381a0a6d..9fa1942bf 100644 --- a/java/src/main/java/org/rocksdb/TickerType.java +++ b/java/src/main/java/org/rocksdb/TickerType.java @@ -769,6 +769,23 @@ public enum TickerType { */ SECONDARY_CACHE_HITS((byte) -0x1E), + /** + * Bytes read by `VerifyChecksum()` and `VerifyFileChecksums()` APIs. + */ + VERIFY_CHECKSUM_READ_BYTES((byte) -0x1F), + + /** + * Bytes read/written while creating backups + */ + BACKUP_READ_BYTES((byte) -0x20), + BACKUP_WRITE_BYTES((byte) -0x21), + + /** + * Remote compaction read/write statistics + */ + REMOTE_COMPACT_READ_BYTES((byte) -0x22), + REMOTE_COMPACT_WRITE_BYTES((byte) -0x23), + TICKER_ENUM_MAX((byte) 0x5F); private final byte value; diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 00533006e..3bb4b5964 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -214,6 +214,8 @@ const std::vector> TickersNameMap = { {VERIFY_CHECKSUM_READ_BYTES, "rocksdb.verify_checksum.read.bytes"}, {BACKUP_READ_BYTES, "rocksdb.backup.read.bytes"}, {BACKUP_WRITE_BYTES, "rocksdb.backup.write.bytes"}, + {REMOTE_COMPACT_READ_BYTES, "rocksdb.remote.compact.read.bytes"}, + {REMOTE_COMPACT_WRITE_BYTES, "rocksdb.remote.compact.write.bytes"}, }; const std::vector> HistogramsNameMap = {