From 8bca83e5dde25ebf402429d5c3f9569d2533e799 Mon Sep 17 00:00:00 2001 From: Ari Ekmekji Date: Mon, 13 Jul 2015 15:51:38 -0700 Subject: [PATCH] Add tombstone information in CompactionJobStats Summary: Added new statistics in CompactionJobStats to keep track of deletion entries and the expiration of those entries. Updated these fields in compaction_job.cc as compaction took place and wrote a new test in compaction_job_stats_test.cc to verify accuracy. Test Plan: Wrote new test DeletionStatsTest in compaction_job_stats_test.cc to verify Reviewers: sdong, igor, yhchiang Reviewed By: yhchiang Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D41355 --- db/compaction_job.cc | 13 +- db/compaction_job_stats_test.cc | 161 ++++++++++++++++++++++++- include/rocksdb/compaction_job_stats.h | 13 +- util/compaction_job_stats_impl.cc | 6 +- 4 files changed, 184 insertions(+), 9 deletions(-) diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 944b38592..bf8a77f21 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -369,8 +369,8 @@ Status CompactionJob::Run() { !cfd->IsDropped()) { // FLUSH preempts compaction // TODO(icanadi) this currently only checks if flush is necessary on - // compacting column family. we should also check if flush is necessary on - // other column families, too + // compacting column family. we should also check if flush is necessary + // on other column families, too imm_micros += yield_callback_(); @@ -646,7 +646,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, if (combined_idx >= compact_->combined_key_buf_.size()) { break; } - assert(combined_idx < compact_->combined_key_buf_.size()); key = compact_->combined_key_buf_[combined_idx]; value = compact_->combined_value_buf_[combined_idx]; @@ -680,6 +679,10 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, last_sequence_for_key = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber; } else { + if (ikey.type == kTypeDeletion) { + compaction_job_stats_->num_input_deletion_records++; + } + if (!has_current_user_key || cfd->user_comparator()->Compare(ikey.user_key, current_user_key.GetKey()) != 0) { @@ -925,6 +928,10 @@ void CompactionJob::RecordDroppedKeys( } if (*key_drop_obsolete > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, *key_drop_obsolete); + if (compaction_job_stats_) { + compaction_job_stats_->num_expired_deletion_records + += *key_drop_obsolete; + } *key_drop_obsolete = 0; } } diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc index 29e415787..a138c8d81 100644 --- a/db/compaction_job_stats_test.cc +++ b/db/compaction_job_stats_test.cc @@ -339,6 +339,14 @@ class CompactionJobStatsTest : public testing::Test { } } + static void SetDeletionCompactionStats( + CompactionJobStats *stats, uint64_t input_deletions, + uint64_t expired_deletions, uint64_t records_replaced) { + stats->num_input_deletion_records = input_deletions; + stats->num_expired_deletion_records = expired_deletions; + stats->num_records_replaced = records_replaced; + } + void MakeTableWithKeyValues( Random* rnd, uint64_t smallest, uint64_t largest, int key_size, int value_size, uint64_t interval, @@ -349,6 +357,52 @@ class CompactionJobStatsTest : public testing::Test { } ASSERT_OK(Flush(cf)); } + + // This function behaves with the implicit understanding that two + // rounds of keys are inserted into the database, as per the behavior + // of the DeletionStatsTest. + void SelectivelyDeleteKeys(uint64_t smallest, uint64_t largest, + uint64_t interval, int deletion_interval, int key_size, + uint64_t cutoff_key_num, CompactionJobStats* stats, int cf = 0) { + + // interval needs to be >= 2 so that deletion entries can be inserted + // that are intended to not result in an actual key deletion by using + // an offset of 1 from another existing key + ASSERT_GE(interval, 2); + + uint64_t ctr = 1; + uint32_t deletions_made = 0; + uint32_t num_deleted = 0; + uint32_t num_expired = 0; + for (auto key = smallest; key <= largest; key += interval, ctr++) { + if (ctr % deletion_interval == 0) { + ASSERT_OK(Delete(cf, Key(key, key_size))); + deletions_made++; + num_deleted++; + + if (key > cutoff_key_num) { + num_expired++; + } + } + } + + // Insert some deletions for keys that don't exist that + // are both in and out of the key range + ASSERT_OK(Delete(cf, Key(smallest+1, key_size))); + deletions_made++; + + ASSERT_OK(Delete(cf, Key(smallest-1, key_size))); + deletions_made++; + num_expired++; + + ASSERT_OK(Delete(cf, Key(smallest-9, key_size))); + deletions_made++; + num_expired++; + + ASSERT_OK(Flush(cf)); + SetDeletionCompactionStats(stats, deletions_made, num_expired, + num_deleted); + } }; // An EventListener which helps verify the compaction results in @@ -359,12 +413,11 @@ class CompactionJobStatsChecker : public EventListener { size_t NumberOfUnverifiedStats() { return expected_stats_.size(); } - // Once a compaction completed, this functionw will verify the returned + // Once a compaction completed, this function will verify the returned // CompactionJobInfo with the oldest CompactionJobInfo added earlier // in "expected_stats_" which has not yet being used for verification. virtual void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) { std::lock_guard lock(mutex_); - if (expected_stats_.size()) { Verify(ci.stats, expected_stats_.front()); expected_stats_.pop(); @@ -376,7 +429,7 @@ class CompactionJobStatsChecker : public EventListener { // ASSERT_EQ except for the total input / output bytes, which we // use ASSERT_GE and ASSERT_LE with a reasonable bias --- // 10% in uncompressed case and 20% when compression is used. - void Verify(const CompactionJobStats& current_stats, + virtual void Verify(const CompactionJobStats& current_stats, const CompactionJobStats& stats) { // time ASSERT_GT(current_stats.elapsed_micros, 0U); @@ -440,6 +493,25 @@ class CompactionJobStatsChecker : public EventListener { bool compression_enabled_; }; +// An EventListener which helps verify the compaction statistics in +// the test DeletionStatsTest. +class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker { + public: + // Verifies whether two CompactionJobStats match. + void Verify(const CompactionJobStats& current_stats, + const CompactionJobStats& stats) { + ASSERT_EQ( + current_stats.num_input_deletion_records, + stats.num_input_deletion_records); + ASSERT_EQ( + current_stats.num_expired_deletion_records, + stats.num_expired_deletion_records); + ASSERT_EQ( + current_stats.num_records_replaced, + stats.num_records_replaced); + } +}; + namespace { uint64_t EstimatedFileSize( @@ -681,6 +753,89 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) { ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U); } +TEST_F(CompactionJobStatsTest, DeletionStatsTest) { + Random rnd(301); + uint64_t key_base = 100000l; + // Note: key_base must be multiple of num_keys_per_L0_file + int num_keys_per_L0_file = 20; + const int kTestScale = 8; // make sure this is even + const int kKeySize = 10; + const int kValueSize = 100; + double compression_ratio = 1.0; + uint64_t key_interval = key_base / num_keys_per_L0_file; + uint64_t largest_key_num = key_base * (kTestScale + 1) - key_interval; + uint64_t cutoff_key_num = key_base * (kTestScale / 2 + 1) - key_interval; + const std::string smallest_key = Key(key_base - 10, kKeySize); + const std::string largest_key = Key(largest_key_num + 10, kKeySize); + + // Whenever a compaction completes, this listener will try to + // verify whether the returned CompactionJobStats matches + // what we expect. + auto* stats_checker = new CompactionJobDeletionStatsChecker(); + Options options; + options.listeners.emplace_back(stats_checker); + options.create_if_missing = true; + options.max_background_flushes = 0; + options.max_mem_compaction_level = 0; + options.level0_file_num_compaction_trigger = kTestScale+1; + options.num_levels = 3; + options.compression = kNoCompression; + options.max_bytes_for_level_multiplier = 2; + + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + // Stage 1: Generate several L0 files and then send them to L2 by + // using CompactRangeOptions and CompactRange(). These files will + // have a strict subset of the keys from the full key-range + for (uint64_t start_key = key_base; + start_key <= key_base * kTestScale / 2; + start_key += key_base) { + MakeTableWithKeyValues( + &rnd, start_key, start_key + key_base - 1, + kKeySize, kValueSize, key_interval, + compression_ratio, 1); + } + + CompactRangeOptions cr_options; + cr_options.change_level = true; + cr_options.target_level = 2; + db_->CompactRange(cr_options, handles_[1], nullptr, nullptr); + ASSERT_GT(NumTableFilesAtLevel(2, 1), 0); + + // Stage 2: Generate files including keys from the entire key range + for (uint64_t start_key = key_base; + start_key <= key_base * kTestScale; + start_key += key_base) { + MakeTableWithKeyValues( + &rnd, start_key, start_key + key_base - 1, + kKeySize, kValueSize, key_interval, + compression_ratio, 1); + } + + // Send these L0 files to L1 + TEST_Compact(0, 1, smallest_key, largest_key); + ASSERT_GT(NumTableFilesAtLevel(1, 1), 0); + + // Add a new record and flush so now there is a L0 file + // with a value too (not just deletions from the next step) + ASSERT_OK(Put(1, Key(key_base-6, kKeySize), "test")); + ASSERT_OK(Flush(1)); + + // Stage 3: Generate L0 files with some deletions so now + // there are files with the same key range in L0, L1, and L2 + int deletion_interval = 3; + CompactionJobStats first_compaction_stats; + SelectivelyDeleteKeys(key_base, largest_key_num, + key_interval, deletion_interval, kKeySize, cutoff_key_num, + &first_compaction_stats, 1); + + stats_checker->AddExpectedStats(first_compaction_stats); + + // Stage 4: Trigger compaction and verify the stats + TEST_Compact(0, 1, smallest_key, largest_key); +} + namespace { int GetUniversalCompactionInputUnits(uint32_t num_flushes) { uint32_t compaction_input_units; diff --git a/include/rocksdb/compaction_job_stats.h b/include/rocksdb/compaction_job_stats.h index 7c05c827e..50bbdab33 100644 --- a/include/rocksdb/compaction_job_stats.h +++ b/include/rocksdb/compaction_job_stats.h @@ -36,7 +36,9 @@ struct CompactionJobStats { // the size of the compaction output in bytes. uint64_t total_output_bytes; - // number of records being replaced by newer record associated with same key + // number of records being replaced by newer record associated with same key. + // this could be a new value or a deletion entry for that key so this field + // sums up all updated and deleted keys uint64_t num_records_replaced; // the sum of the uncompressed input keys in bytes. @@ -44,6 +46,15 @@ struct CompactionJobStats { // the sum of the uncompressed input values in bytes. uint64_t total_input_raw_value_bytes; + // the number of deletion entries before compaction. Deletion entries + // can disappear after compaction because they expired + uint64_t num_input_deletion_records; + + // number of deletion records that were found obsolete and discarded + // because it is not possible to delete any more keys with this entry + // (i.e. all possible deletions resulting from it have been completed) + uint64_t num_expired_deletion_records; + // 0-terminated strings storing the first 8 bytes of the smallest and // largest key in the output. static const size_t kMaxPrefixLength = 8; diff --git a/util/compaction_job_stats_impl.cc b/util/compaction_job_stats_impl.cc index b55b112a0..2496b1097 100644 --- a/util/compaction_job_stats_impl.cc +++ b/util/compaction_job_stats_impl.cc @@ -29,12 +29,14 @@ void CompactionJobStats::Reset() { num_records_replaced = 0; is_manual_compaction = 0; + + num_input_deletion_records = 0; + num_expired_deletion_records = 0; } #else -void CompactionJobStats::Reset() { -} +void CompactionJobStats::Reset() {} #endif // !ROCKSDB_LITE