Add remote compaction read/write bytes statistics (#8939)

Summary:
Add basic read/write bytes statistics on the primary side:
`REMOTE_COMPACT_READ_BYTES`
`REMOTE_COMPACT_WRITE_BYTES`

Fixed existing statistics missing some IO for remote compaction.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8939

Test Plan: CI

Reviewed By: ajkr

Differential Revision: D31074672

Pulled By: jay-zhuang

fbshipit-source-id: c57afdba369990185008ffaec7e3fe7c62e8902f
main
Jay Zhuang 3 years ago committed by Facebook GitHub Bot
parent d6bd1a0291
commit 6b34eb0ebc
  1. 1
      HISTORY.md
  2. 14
      db/compaction/compaction_job.cc
  3. 13
      db/compaction/compaction_job.h
  4. 79
      db/compaction/compaction_service_test.cc
  5. 4
      include/rocksdb/statistics.h
  6. 8
      java/rocksjni/portal.h
  7. 17
      java/src/main/java/org/rocksdb/TickerType.java
  8. 2
      monitoring/statistics.cc

@ -5,6 +5,7 @@
### New Features ### New Features
* Provided support for SingleDelete with user defined timestamp. * Provided support for SingleDelete with user defined timestamp.
* Add remote compaction read/write bytes statistics: `REMOTE_COMPACT_READ_BYTES`, `REMOTE_COMPACT_WRITE_BYTES`.
### Public API change ### Public API change
* Made SystemClock extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method. * Made SystemClock extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method.

@ -1113,8 +1113,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
sub_compact->num_output_records = compaction_result.num_output_records; sub_compact->num_output_records = compaction_result.num_output_records;
sub_compact->approx_size = compaction_input.approx_size; // is this used? sub_compact->approx_size = compaction_input.approx_size; // is this used?
sub_compact->total_bytes = compaction_result.total_bytes; sub_compact->total_bytes = compaction_result.total_bytes;
IOSTATS_ADD(bytes_written, compaction_result.bytes_written); RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);
IOSTATS_ADD(bytes_read, compaction_result.bytes_read); RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,
compaction_result.bytes_written);
return CompactionServiceJobStatus::kSuccess; return CompactionServiceJobStatus::kSuccess;
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
@ -2248,6 +2249,12 @@ std::string CompactionServiceCompactionJob::GetTableFileName(
return MakeTableFileName(output_path_, file_number); return MakeTableFileName(output_path_, file_number);
} }
void CompactionServiceCompactionJob::RecordCompactionIOStats() {
compaction_result_->bytes_read += IOSTATS(bytes_read);
compaction_result_->bytes_written += IOSTATS(bytes_written);
CompactionJob::RecordCompactionIOStats();
}
CompactionServiceCompactionJob::CompactionServiceCompactionJob( CompactionServiceCompactionJob::CompactionServiceCompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const MutableDBOptions& mutable_db_options, const FileOptions& file_options, const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
@ -2338,9 +2345,6 @@ Status CompactionServiceCompactionJob::Run() {
// Finish up all book-keeping to unify the subcompaction results // Finish up all book-keeping to unify the subcompaction results
AggregateStatistics(); AggregateStatistics();
UpdateCompactionStats(); UpdateCompactionStats();
compaction_result_->bytes_written = IOSTATS(bytes_written);
compaction_result_->bytes_read = IOSTATS(bytes_read);
RecordCompactionIOStats(); RecordCompactionIOStats();
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);

@ -117,7 +117,7 @@ class CompactionJob {
void AggregateStatistics(); void AggregateStatistics();
void UpdateCompactionStats(); void UpdateCompactionStats();
void LogCompaction(); void LogCompaction();
void RecordCompactionIOStats(); virtual void RecordCompactionIOStats();
void CleanupCompaction(); void CleanupCompaction();
// Call compaction filter. Then iterate through input and compact the // Call compaction filter. Then iterate through input and compact the
@ -304,10 +304,10 @@ struct CompactionServiceResult {
std::string output_path; std::string output_path;
// some statistics about the compaction // some statistics about the compaction
uint64_t num_output_records; uint64_t num_output_records = 0;
uint64_t total_bytes; uint64_t total_bytes = 0;
uint64_t bytes_read; uint64_t bytes_read = 0;
uint64_t bytes_written; uint64_t bytes_written = 0;
CompactionJobStats stats; CompactionJobStats stats;
// serialization interface to read and write the object // serialization interface to read and write the object
@ -347,6 +347,9 @@ class CompactionServiceCompactionJob : private CompactionJob {
IOStatus io_status() const { return CompactionJob::io_status(); } IOStatus io_status() const { return CompactionJob::io_status(); }
protected:
void RecordCompactionIOStats() override;
private: private:
// Get table file name in output_path // Get table file name in output_path
std::string GetTableFileName(uint64_t file_number) override; std::string GetTableFileName(uint64_t file_number) override;

@ -310,6 +310,9 @@ TEST_P(CompactionServiceTest, BasicCompactions) {
Options options = CurrentOptions(); Options options = CurrentOptions();
ReopenWithCompactionService(&options); ReopenWithCompactionService(&options);
Statistics* primary_statistics = GetPrimaryStatistics();
Statistics* compactor_statistics = GetCompactorStatistics();
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
for (int j = 0; j < 10; j++) { for (int j = 0; j < 10; j++) {
int key_id = i * 10 + j; int key_id = i * 10 + j;
@ -337,14 +340,24 @@ TEST_P(CompactionServiceTest, BasicCompactions) {
} }
} }
auto my_cs = GetCompactionService(); auto my_cs = GetCompactionService();
Statistics* compactor_statistics = GetCompactorStatistics();
ASSERT_GE(my_cs->GetCompactionNum(), 1); ASSERT_GE(my_cs->GetCompactionNum(), 1);
// make sure the compaction statistics is only recorded on the remote side // make sure the compaction statistics is only recorded on the remote side
ASSERT_GE( ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1);
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1); ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
Statistics* primary_statistics = GetPrimaryStatistics(); ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), // even with remote compaction, primary host still needs to read SST files to
// `verify_table()`.
ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
// all the compaction write happens on the remote side
ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1);
ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES),
primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES));
// compactor is already the remote side, which doesn't have remote
ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
0); 0);
// Test failed compaction // Test failed compaction
@ -682,10 +695,10 @@ TEST_P(CompactionServiceTest, FallbackLocalAuto) {
auto my_cs = GetCompactionService(); auto my_cs = GetCompactionService();
Statistics* compactor_statistics = GetCompactorStatistics(); Statistics* compactor_statistics = GetCompactorStatistics();
Statistics* primary_statistics = GetPrimaryStatistics(); Statistics* primary_statistics = GetPrimaryStatistics();
uint64_t compactor_new_key = uint64_t compactor_write_bytes =
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
uint64_t primary_new_key = uint64_t primary_write_bytes =
primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal); my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);
@ -719,11 +732,12 @@ TEST_P(CompactionServiceTest, FallbackLocalAuto) {
ASSERT_EQ(my_cs->GetCompactionNum(), 0); ASSERT_EQ(my_cs->GetCompactionNum(), 0);
// make sure the compaction statistics is only recorded on the local side // make sure the compaction statistics is only recorded on the local side
ASSERT_EQ( ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), compactor_write_bytes);
compactor_new_key); ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), primary_write_bytes);
primary_new_key); ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
} }
TEST_P(CompactionServiceTest, FallbackLocalManual) { TEST_P(CompactionServiceTest, FallbackLocalManual) {
@ -737,10 +751,10 @@ TEST_P(CompactionServiceTest, FallbackLocalManual) {
auto my_cs = GetCompactionService(); auto my_cs = GetCompactionService();
Statistics* compactor_statistics = GetCompactorStatistics(); Statistics* compactor_statistics = GetCompactorStatistics();
Statistics* primary_statistics = GetPrimaryStatistics(); Statistics* primary_statistics = GetPrimaryStatistics();
uint64_t compactor_new_key = uint64_t compactor_write_bytes =
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
uint64_t primary_new_key = uint64_t primary_write_bytes =
primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
// re-enable remote compaction // re-enable remote compaction
my_cs->ResetOverride(); my_cs->ResetOverride();
@ -753,31 +767,32 @@ TEST_P(CompactionServiceTest, FallbackLocalManual) {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
// make sure the compaction statistics is only recorded on the remote side // make sure the compaction statistics is only recorded on the remote side
ASSERT_GT( ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), compactor_write_bytes);
compactor_new_key); ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
primary_new_key); ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
primary_write_bytes);
// return run local again with API WaitForComplete // return run local again with API WaitForComplete
my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal); my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
start_str = Key(120); start_str = Key(120);
start = start_str; start = start_str;
comp_num = my_cs->GetCompactionNum(); comp_num = my_cs->GetCompactionNum();
compactor_new_key = compactor_write_bytes =
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
primary_new_key = primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);
primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
ASSERT_EQ(my_cs->GetCompactionNum(), ASSERT_EQ(my_cs->GetCompactionNum(),
comp_num); // no remote compaction is run comp_num); // no remote compaction is run
// make sure the compaction statistics is only recorded on the local side // make sure the compaction statistics is only recorded on the local side
ASSERT_EQ( ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), compactor_write_bytes);
compactor_new_key); ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), primary_write_bytes);
primary_new_key); ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
compactor_write_bytes);
// verify result after 2 manual compactions // verify result after 2 manual compactions
VerifyTestData(); VerifyTestData();

@ -413,6 +413,10 @@ enum Tickers : uint32_t {
BACKUP_READ_BYTES, BACKUP_READ_BYTES,
BACKUP_WRITE_BYTES, BACKUP_WRITE_BYTES,
// Remote compaction read/write statistics
REMOTE_COMPACT_READ_BYTES,
REMOTE_COMPACT_WRITE_BYTES,
TICKER_ENUM_MAX TICKER_ENUM_MAX
}; };

@ -5008,6 +5008,10 @@ class TickerTypeJni {
return -0x20; return -0x20;
case ROCKSDB_NAMESPACE::Tickers::BACKUP_WRITE_BYTES: case ROCKSDB_NAMESPACE::Tickers::BACKUP_WRITE_BYTES:
return -0x21; return -0x21;
case ROCKSDB_NAMESPACE::Tickers::REMOTE_COMPACT_READ_BYTES:
return -0x22;
case ROCKSDB_NAMESPACE::Tickers::REMOTE_COMPACT_WRITE_BYTES:
return -0x23;
case ROCKSDB_NAMESPACE::Tickers::TICKER_ENUM_MAX: case ROCKSDB_NAMESPACE::Tickers::TICKER_ENUM_MAX:
// 0x5F was the max value in the initial copy of tickers to Java. // 0x5F was the max value in the initial copy of tickers to Java.
// Since these values are exposed directly to Java clients, we keep // Since these values are exposed directly to Java clients, we keep
@ -5353,6 +5357,10 @@ class TickerTypeJni {
return ROCKSDB_NAMESPACE::Tickers::BACKUP_READ_BYTES; return ROCKSDB_NAMESPACE::Tickers::BACKUP_READ_BYTES;
case -0x21: case -0x21:
return ROCKSDB_NAMESPACE::Tickers::BACKUP_WRITE_BYTES; return ROCKSDB_NAMESPACE::Tickers::BACKUP_WRITE_BYTES;
case -0x22:
return ROCKSDB_NAMESPACE::Tickers::REMOTE_COMPACT_READ_BYTES;
case -0x23:
return ROCKSDB_NAMESPACE::Tickers::REMOTE_COMPACT_WRITE_BYTES;
case 0x5F: case 0x5F:
// 0x5F was the max value in the initial copy of tickers to Java. // 0x5F was the max value in the initial copy of tickers to Java.
// Since these values are exposed directly to Java clients, we keep // Since these values are exposed directly to Java clients, we keep

@ -769,6 +769,23 @@ public enum TickerType {
*/ */
SECONDARY_CACHE_HITS((byte) -0x1E), SECONDARY_CACHE_HITS((byte) -0x1E),
/**
* Bytes read by `VerifyChecksum()` and `VerifyFileChecksums()` APIs.
*/
VERIFY_CHECKSUM_READ_BYTES((byte) -0x1F),
/**
* Bytes read/written while creating backups
*/
BACKUP_READ_BYTES((byte) -0x20),
BACKUP_WRITE_BYTES((byte) -0x21),
/**
* Remote compaction read/write statistics
*/
REMOTE_COMPACT_READ_BYTES((byte) -0x22),
REMOTE_COMPACT_WRITE_BYTES((byte) -0x23),
TICKER_ENUM_MAX((byte) 0x5F); TICKER_ENUM_MAX((byte) 0x5F);
private final byte value; private final byte value;

@ -214,6 +214,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{VERIFY_CHECKSUM_READ_BYTES, "rocksdb.verify_checksum.read.bytes"}, {VERIFY_CHECKSUM_READ_BYTES, "rocksdb.verify_checksum.read.bytes"},
{BACKUP_READ_BYTES, "rocksdb.backup.read.bytes"}, {BACKUP_READ_BYTES, "rocksdb.backup.read.bytes"},
{BACKUP_WRITE_BYTES, "rocksdb.backup.write.bytes"}, {BACKUP_WRITE_BYTES, "rocksdb.backup.write.bytes"},
{REMOTE_COMPACT_READ_BYTES, "rocksdb.remote.compact.read.bytes"},
{REMOTE_COMPACT_WRITE_BYTES, "rocksdb.remote.compact.write.bytes"},
}; };
const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = { const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {

Loading…
Cancel
Save