Count number of corrupt keys during compaction

Summary:
For task #7771355, we would like to log the number of corrupt keys
during a compaction. This patch implements and tests the count
as part of CompactionJobStats.

Test Plan: make && make check

Reviewers: rven, igor, yhchiang, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D42921
main
Andres Notzli 9 years ago
parent 1bdfcef7bf
commit e95c59cd2f
  1. 4
      db/compaction_job.cc
  2. 6
      db/compaction_job_stats_test.cc
  3. 23
      db/compaction_job_test.cc
  4. 5
      include/rocksdb/compaction_job_stats.h
  5. 17
      util/compaction_job_stats_impl.cc

@ -385,6 +385,10 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
if (compaction_job_stats_ != nullptr) {
compaction_job_stats_->num_corrupt_keys++;
}
} else {
if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) {
compaction_job_stats_->num_input_deletion_records++;

@ -467,6 +467,9 @@ class CompactionJobStatsChecker : public EventListener {
ASSERT_EQ(current_stats.num_records_replaced,
stats.num_records_replaced);
ASSERT_EQ(current_stats.num_corrupt_keys,
stats.num_corrupt_keys);
ASSERT_EQ(
std::string(current_stats.smallest_output_key_prefix),
std::string(stats.smallest_output_key_prefix));
@ -509,6 +512,9 @@ class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker {
ASSERT_EQ(
current_stats.num_records_replaced,
stats.num_records_replaced);
ASSERT_EQ(current_stats.num_corrupt_keys,
stats.num_corrupt_keys);
}
};

@ -46,16 +46,14 @@ void VerifyInitializationOfCompactionJobStats(
ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0);
ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U);
ASSERT_EQ(compaction_job_stats.num_input_deletion_records, 0U);
ASSERT_EQ(compaction_job_stats.num_expired_deletion_records, 0U);
ASSERT_EQ(compaction_job_stats.num_corrupt_keys, 0U);
#endif // !defined(IOS_CROSS_COMPILE)
}
void VerifyCompactionJobStats(const CompactionJobStats& compaction_job_stats,
const std::vector<FileMetaData*>& files,
size_t num_output_files) {
ASSERT_GE(compaction_job_stats.elapsed_micros, 0U);
ASSERT_EQ(compaction_job_stats.num_input_files, files.size());
ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files);
}
} // namespace
// TODO(icanadi) Make it simpler once we mock out VersionSet
@ -197,13 +195,12 @@ class CompactionJobTest : public testing::Test {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
mutex_.Lock();
EventLogger event_logger(db_options_.info_log.get());
CompactionJobStats compaction_job_stats;
CompactionJob compaction_job(
0, &compaction, db_options_, env_options_, versions_.get(),
&shutting_down_, &log_buffer, nullptr, nullptr, nullptr, {},
table_cache_, &event_logger, false, dbname_, &compaction_job_stats);
table_cache_, &event_logger, false, dbname_, &compaction_job_stats_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
compaction_job.Prepare();
mutex_.Unlock();
@ -214,7 +211,9 @@ class CompactionJobTest : public testing::Test {
ASSERT_OK(s);
mutex_.Unlock();
VerifyCompactionJobStats(compaction_job_stats, files, 1);
ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
ASSERT_EQ(compaction_job_stats_.num_input_files, files.size());
ASSERT_EQ(compaction_job_stats_.num_output_files, 1U);
}
Env* env_;
@ -230,6 +229,7 @@ class CompactionJobTest : public testing::Test {
InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_;
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
CompactionJobStats compaction_job_stats_;
};
TEST_F(CompactionJobTest, Simple) {
@ -248,6 +248,7 @@ TEST_F(CompactionJobTest, SimpleCorrupted) {
auto files = cfd->current()->storage_info()->LevelFiles(0);
RunCompaction(files);
ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U);
mock_table_factory_->AssertLatestFile(expected_results);
}

@ -49,12 +49,15 @@ struct CompactionJobStats {
// the number of deletion entries before compaction. Deletion entries
// can disappear after compaction because they expired
uint64_t num_input_deletion_records;
// number of deletion records that were found obsolete and discarded
// because it is not possible to delete any more keys with this entry
// (i.e. all possible deletions resulting from it have been completed)
uint64_t num_expired_deletion_records;
// number of corrupt keys (ParseInternalKey returned false when applied to
// the key) encountered and written out.
uint64_t num_corrupt_keys;
// 0-terminated strings storing the first 8 bytes of the smallest and
// largest key in the output.
static const size_t kMaxPrefixLength = 8;

@ -3,8 +3,7 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <cstring>
#include "include/rocksdb/compaction_job_stats.h"
#include "rocksdb/compaction_job_stats.h"
namespace rocksdb {
@ -13,25 +12,27 @@ namespace rocksdb {
void CompactionJobStats::Reset() {
elapsed_micros = 0;
num_input_records = 0;
num_input_files = 0;
num_input_files_at_output_level = 0;
num_output_files = 0;
num_input_records = 0;
num_output_records = 0;
num_output_files = 0;
is_manual_compaction = 0;
total_input_bytes = 0;
total_output_bytes = 0;
total_input_raw_key_bytes = 0;
total_input_raw_value_bytes = 0;
num_records_replaced = 0;
is_manual_compaction = 0;
total_input_raw_key_bytes = 0;
total_input_raw_value_bytes = 0;
num_input_deletion_records = 0;
num_expired_deletion_records = 0;
num_corrupt_keys = 0;
}
#else

Loading…
Cancel
Save