Add tombstone information in CompactionJobStats

Summary:
Added new statistics in CompactionJobStats to keep track of
deletion entries and the expiration of those entries. Updated these
fields in compaction_job.cc as compaction took place and wrote a new
test in compaction_job_stats_test.cc to verify accuracy.

Test Plan:
Wrote new test DeletionStatsTest in
compaction_job_stats_test.cc to verify

Reviewers: sdong, igor, yhchiang

Reviewed By: yhchiang

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D41355
main
Ari Ekmekji 9 years ago
parent f9728640f3
commit 8bca83e5dd
  1. 13
      db/compaction_job.cc
  2. 161
      db/compaction_job_stats_test.cc
  3. 13
      include/rocksdb/compaction_job_stats.h
  4. 6
      util/compaction_job_stats_impl.cc

@ -369,8 +369,8 @@ Status CompactionJob::Run() {
!cfd->IsDropped()) {
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
// compacting column family. we should also check if flush is necessary
// on other column families, too
imm_micros += yield_callback_();
@ -646,7 +646,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
if (combined_idx >= compact_->combined_key_buf_.size()) {
break;
}
assert(combined_idx < compact_->combined_key_buf_.size());
key = compact_->combined_key_buf_[combined_idx];
value = compact_->combined_value_buf_[combined_idx];
@ -680,6 +679,10 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
} else {
if (ikey.type == kTypeDeletion) {
compaction_job_stats_->num_input_deletion_records++;
}
if (!has_current_user_key ||
cfd->user_comparator()->Compare(ikey.user_key,
current_user_key.GetKey()) != 0) {
@ -925,6 +928,10 @@ void CompactionJob::RecordDroppedKeys(
}
if (*key_drop_obsolete > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, *key_drop_obsolete);
if (compaction_job_stats_) {
compaction_job_stats_->num_expired_deletion_records
+= *key_drop_obsolete;
}
*key_drop_obsolete = 0;
}
}

@ -339,6 +339,14 @@ class CompactionJobStatsTest : public testing::Test {
}
}
static void SetDeletionCompactionStats(
CompactionJobStats *stats, uint64_t input_deletions,
uint64_t expired_deletions, uint64_t records_replaced) {
stats->num_input_deletion_records = input_deletions;
stats->num_expired_deletion_records = expired_deletions;
stats->num_records_replaced = records_replaced;
}
void MakeTableWithKeyValues(
Random* rnd, uint64_t smallest, uint64_t largest,
int key_size, int value_size, uint64_t interval,
@ -349,6 +357,52 @@ class CompactionJobStatsTest : public testing::Test {
}
ASSERT_OK(Flush(cf));
}
// This function behaves with the implicit understanding that two
// rounds of keys are inserted into the database, as per the behavior
// of the DeletionStatsTest.
void SelectivelyDeleteKeys(uint64_t smallest, uint64_t largest,
uint64_t interval, int deletion_interval, int key_size,
uint64_t cutoff_key_num, CompactionJobStats* stats, int cf = 0) {
// interval needs to be >= 2 so that deletion entries can be inserted
// that are intended to not result in an actual key deletion by using
// an offset of 1 from another existing key
ASSERT_GE(interval, 2);
uint64_t ctr = 1;
uint32_t deletions_made = 0;
uint32_t num_deleted = 0;
uint32_t num_expired = 0;
for (auto key = smallest; key <= largest; key += interval, ctr++) {
if (ctr % deletion_interval == 0) {
ASSERT_OK(Delete(cf, Key(key, key_size)));
deletions_made++;
num_deleted++;
if (key > cutoff_key_num) {
num_expired++;
}
}
}
// Insert some deletions for keys that don't exist that
// are both in and out of the key range
ASSERT_OK(Delete(cf, Key(smallest+1, key_size)));
deletions_made++;
ASSERT_OK(Delete(cf, Key(smallest-1, key_size)));
deletions_made++;
num_expired++;
ASSERT_OK(Delete(cf, Key(smallest-9, key_size)));
deletions_made++;
num_expired++;
ASSERT_OK(Flush(cf));
SetDeletionCompactionStats(stats, deletions_made, num_expired,
num_deleted);
}
};
// An EventListener which helps verify the compaction results in
@ -359,12 +413,11 @@ class CompactionJobStatsChecker : public EventListener {
size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }
// Once a compaction completed, this functionw will verify the returned
// Once a compaction completed, this function will verify the returned
// CompactionJobInfo with the oldest CompactionJobInfo added earlier
// in "expected_stats_" which has not yet being used for verification.
virtual void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) {
std::lock_guard<std::mutex> lock(mutex_);
if (expected_stats_.size()) {
Verify(ci.stats, expected_stats_.front());
expected_stats_.pop();
@ -376,7 +429,7 @@ class CompactionJobStatsChecker : public EventListener {
// ASSERT_EQ except for the total input / output bytes, which we
// use ASSERT_GE and ASSERT_LE with a reasonable bias ---
// 10% in uncompressed case and 20% when compression is used.
void Verify(const CompactionJobStats& current_stats,
virtual void Verify(const CompactionJobStats& current_stats,
const CompactionJobStats& stats) {
// time
ASSERT_GT(current_stats.elapsed_micros, 0U);
@ -440,6 +493,25 @@ class CompactionJobStatsChecker : public EventListener {
bool compression_enabled_;
};
// An EventListener which helps verify the compaction statistics in
// the test DeletionStatsTest.
class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker {
public:
// Verifies whether two CompactionJobStats match.
void Verify(const CompactionJobStats& current_stats,
const CompactionJobStats& stats) {
ASSERT_EQ(
current_stats.num_input_deletion_records,
stats.num_input_deletion_records);
ASSERT_EQ(
current_stats.num_expired_deletion_records,
stats.num_expired_deletion_records);
ASSERT_EQ(
current_stats.num_records_replaced,
stats.num_records_replaced);
}
};
namespace {
uint64_t EstimatedFileSize(
@ -681,6 +753,89 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}
TEST_F(CompactionJobStatsTest, DeletionStatsTest) {
Random rnd(301);
uint64_t key_base = 100000l;
// Note: key_base must be multiple of num_keys_per_L0_file
int num_keys_per_L0_file = 20;
const int kTestScale = 8; // make sure this is even
const int kKeySize = 10;
const int kValueSize = 100;
double compression_ratio = 1.0;
uint64_t key_interval = key_base / num_keys_per_L0_file;
uint64_t largest_key_num = key_base * (kTestScale + 1) - key_interval;
uint64_t cutoff_key_num = key_base * (kTestScale / 2 + 1) - key_interval;
const std::string smallest_key = Key(key_base - 10, kKeySize);
const std::string largest_key = Key(largest_key_num + 10, kKeySize);
// Whenever a compaction completes, this listener will try to
// verify whether the returned CompactionJobStats matches
// what we expect.
auto* stats_checker = new CompactionJobDeletionStatsChecker();
Options options;
options.listeners.emplace_back(stats_checker);
options.create_if_missing = true;
options.max_background_flushes = 0;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = kTestScale+1;
options.num_levels = 3;
options.compression = kNoCompression;
options.max_bytes_for_level_multiplier = 2;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Stage 1: Generate several L0 files and then send them to L2 by
// using CompactRangeOptions and CompactRange(). These files will
// have a strict subset of the keys from the full key-range
for (uint64_t start_key = key_base;
start_key <= key_base * kTestScale / 2;
start_key += key_base) {
MakeTableWithKeyValues(
&rnd, start_key, start_key + key_base - 1,
kKeySize, kValueSize, key_interval,
compression_ratio, 1);
}
CompactRangeOptions cr_options;
cr_options.change_level = true;
cr_options.target_level = 2;
db_->CompactRange(cr_options, handles_[1], nullptr, nullptr);
ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);
// Stage 2: Generate files including keys from the entire key range
for (uint64_t start_key = key_base;
start_key <= key_base * kTestScale;
start_key += key_base) {
MakeTableWithKeyValues(
&rnd, start_key, start_key + key_base - 1,
kKeySize, kValueSize, key_interval,
compression_ratio, 1);
}
// Send these L0 files to L1
TEST_Compact(0, 1, smallest_key, largest_key);
ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
// Add a new record and flush so now there is a L0 file
// with a value too (not just deletions from the next step)
ASSERT_OK(Put(1, Key(key_base-6, kKeySize), "test"));
ASSERT_OK(Flush(1));
// Stage 3: Generate L0 files with some deletions so now
// there are files with the same key range in L0, L1, and L2
int deletion_interval = 3;
CompactionJobStats first_compaction_stats;
SelectivelyDeleteKeys(key_base, largest_key_num,
key_interval, deletion_interval, kKeySize, cutoff_key_num,
&first_compaction_stats, 1);
stats_checker->AddExpectedStats(first_compaction_stats);
// Stage 4: Trigger compaction and verify the stats
TEST_Compact(0, 1, smallest_key, largest_key);
}
namespace {
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
uint32_t compaction_input_units;

@ -36,7 +36,9 @@ struct CompactionJobStats {
// the size of the compaction output in bytes.
uint64_t total_output_bytes;
// number of records being replaced by newer record associated with same key
// number of records being replaced by newer record associated with same key.
// this could be a new value or a deletion entry for that key so this field
// sums up all updated and deleted keys
uint64_t num_records_replaced;
// the sum of the uncompressed input keys in bytes.
@ -44,6 +46,15 @@ struct CompactionJobStats {
// the sum of the uncompressed input values in bytes.
uint64_t total_input_raw_value_bytes;
// the number of deletion entries before compaction. Deletion entries
// can disappear after compaction because they expired
uint64_t num_input_deletion_records;
// number of deletion records that were found obsolete and discarded
// because it is not possible to delete any more keys with this entry
// (i.e. all possible deletions resulting from it have been completed)
uint64_t num_expired_deletion_records;
// 0-terminated strings storing the first 8 bytes of the smallest and
// largest key in the output.
static const size_t kMaxPrefixLength = 8;

@ -29,12 +29,14 @@ void CompactionJobStats::Reset() {
num_records_replaced = 0;
is_manual_compaction = 0;
num_input_deletion_records = 0;
num_expired_deletion_records = 0;
}
#else
void CompactionJobStats::Reset() {
}
void CompactionJobStats::Reset() {}
#endif // !ROCKSDB_LITE

Loading…
Cancel
Save