From e4af3bfb27679abdb22e5c6e33ecf23abb9f8f54 Mon Sep 17 00:00:00 2001 From: Andres Notzli Date: Thu, 16 Jul 2015 09:18:35 -0700 Subject: [PATCH] Test for compaction of corrupted keys Summary: Fixes T7697334. Adds a simple test to check whether CompactionJob deals with corrupted keys correctly. Right now, we preserve corrupted keys. Note: depending on the type of corruption and options like comparators, CompactionJob fails. This test just checks whether corrupted keys that do not fail CompactionJob are preserved. Test Plan: `make compaction_job_test && ./compaction_job_test` -> Tests pass. Add `input->Next(); continue;` in CompactionJob::ProcessKeyValueCompaction() inside then-branch of `!ParseInternalKey(key, &ikey)` -> Tests fail. Reviewers: sdong, rven, yhchiang, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D42237 --- db/compaction_job_test.cc | 187 +++++++++++++++++++++++--------------- 1 file changed, 112 insertions(+), 75 deletions(-) diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 1cabee24b..e485de9d4 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -20,6 +20,43 @@ namespace rocksdb { +namespace { +void VerifyInitializationOfCompactionJobStats( + const CompactionJobStats& compaction_job_stats) { +#if !defined(IOS_CROSS_COMPILE) + ASSERT_EQ(compaction_job_stats.elapsed_micros, 0U); + + ASSERT_EQ(compaction_job_stats.num_input_records, 0U); + ASSERT_EQ(compaction_job_stats.num_input_files, 0U); + ASSERT_EQ(compaction_job_stats.num_input_files_at_output_level, 0U); + + ASSERT_EQ(compaction_job_stats.num_output_records, 0U); + ASSERT_EQ(compaction_job_stats.num_output_files, 0U); + + ASSERT_EQ(compaction_job_stats.is_manual_compaction, false); + + ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U); + ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U); + + ASSERT_EQ(compaction_job_stats.total_input_raw_key_bytes, 0U); + ASSERT_EQ(compaction_job_stats.total_input_raw_value_bytes, 0U); + + ASSERT_EQ(compaction_job_stats.smallest_output_key_prefix[0], 0); + ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0); + + ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U); +#endif // !defined(IOS_CROSS_COMPILE) +} + +void VerifyCompactionJobStats(const CompactionJobStats& compaction_job_stats, + const std::vector& files, + size_t num_output_files) { + ASSERT_GE(compaction_job_stats.elapsed_micros, 0U); + ASSERT_EQ(compaction_job_stats.num_input_files, files.size()); + ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files); +} +} // namespace + // TODO(icanadi) Make it simpler once we mock out VersionSet class CompactionJobTest : public testing::Test { public: @@ -53,22 +90,40 @@ class CompactionJobTest : public testing::Test { return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); } + // Corrupts key by changing the type + void CorruptKey(InternalKey* ikey) { + std::string keystr = ikey->Encode().ToString(); + keystr[keystr.size() - 8] = kTypeLogData; + ikey->DecodeFrom(Slice(keystr.data(), keystr.size())); + } + // returns expected result after compaction - mock::MockFileContents CreateTwoFiles() { + mock::MockFileContents CreateTwoFiles(bool gen_corrupted_keys) { mock::MockFileContents expected_results; const int kKeysPerFile = 10000; + const int kCorruptKeysPerFile = 200; + const int kMatchingKeys = kKeysPerFile / 2; SequenceNumber sequence_number = 0; + + auto corrupt_id = [&](int id) { + return gen_corrupted_keys && id > 0 && id <= kCorruptKeysPerFile; + }; + for (int i = 0; i < 2; ++i) { mock::MockFileContents contents; SequenceNumber smallest_seqno = 0, largest_seqno = 0; InternalKey smallest, largest; for (int k = 0; k < kKeysPerFile; ++k) { - auto key = ToString(i * (kKeysPerFile / 2) + k); + auto key = ToString(i * kMatchingKeys + k); auto value = ToString(i * kKeysPerFile + k); InternalKey internal_key(key, ++sequence_number, kTypeValue); // This is how the key will look like once it's written in bottommost // file InternalKey bottommost_internal_key(key, 0, kTypeValue); + if (corrupt_id(k)) { + CorruptKey(&internal_key); + CorruptKey(&bottommost_internal_key); + } if (k == 0) { smallest = internal_key; smallest_seqno = sequence_number; @@ -77,7 +132,7 @@ class CompactionJobTest : public testing::Test { largest_seqno = sequence_number; } contents.insert({internal_key.Encode().ToString(), value}); - if (i == 1 || k < kKeysPerFile / 2) { + if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) { expected_results.insert( {bottommost_internal_key.Encode().ToString(), value}); } @@ -122,6 +177,45 @@ class CompactionJobTest : public testing::Test { s = SetCurrentFile(env_, dbname_, 1, nullptr); } + void RunCompaction(std::function yield_callback, + const std::vector& files) { + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + + CompactionInputFiles compaction_input_files; + compaction_input_files.level = 0; + for (auto file : files) { + compaction_input_files.files.push_back(file); + } + Compaction compaction(cfd->current()->storage_info(), + *cfd->GetLatestMutableCFOptions(), + {compaction_input_files}, 1, 1024 * 1024, 10, 0, + kNoCompression, {}); + compaction.SetInputVersion(cfd->current()); + + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); + mutex_.Lock(); + EventLogger event_logger(db_options_.info_log.get()); + CompactionJobStats compaction_job_stats; + CompactionJob compaction_job(0, &compaction, db_options_, env_options_, + versions_.get(), &shutting_down_, &log_buffer, + nullptr, nullptr, nullptr, {}, table_cache_, + yield_callback, &event_logger, false, + dbname_, &compaction_job_stats); + + VerifyInitializationOfCompactionJobStats(compaction_job_stats); + + compaction_job.Prepare(); + mutex_.Unlock(); + ASSERT_OK(compaction_job.Run()); + mutex_.Lock(); + Status s; + compaction_job.Install(&s, *cfd->GetLatestMutableCFOptions(), &mutex_); + ASSERT_OK(s); + mutex_.Unlock(); + + VerifyCompactionJobStats(compaction_job_stats, files, 1); + } + Env* env_; std::string dbname_; EnvOptions env_options_; @@ -137,93 +231,36 @@ class CompactionJobTest : public testing::Test { std::shared_ptr mock_table_factory_; }; -namespace { -void VerifyInitializationOfCompactionJobStats( - const CompactionJobStats& compaction_job_stats) { -#if !defined(IOS_CROSS_COMPILE) - ASSERT_EQ(compaction_job_stats.elapsed_micros, 0U); - - ASSERT_EQ(compaction_job_stats.num_input_records, 0U); - ASSERT_EQ(compaction_job_stats.num_input_files, 0U); - ASSERT_EQ(compaction_job_stats.num_input_files_at_output_level, 0U); - - ASSERT_EQ(compaction_job_stats.num_output_records, 0U); - ASSERT_EQ(compaction_job_stats.num_output_files, 0U); - - ASSERT_EQ(compaction_job_stats.is_manual_compaction, false); - - ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U); - ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U); - - ASSERT_EQ(compaction_job_stats.total_input_raw_key_bytes, 0U); - ASSERT_EQ(compaction_job_stats.total_input_raw_value_bytes, 0U); - - ASSERT_EQ(compaction_job_stats.smallest_output_key_prefix[0], 0); - ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0); - - ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U); -#endif // !defined(IOS_CROSS_COMPILE) -} - -void VerifyCompactionJobStats(const CompactionJobStats& compaction_job_stats, - const std::vector& files, - size_t num_output_files) { - ASSERT_GE(compaction_job_stats.elapsed_micros, 0U); - ASSERT_EQ(compaction_job_stats.num_input_files, files.size()); - ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files); -} -} // namespace - TEST_F(CompactionJobTest, Simple) { + auto expected_results = CreateTwoFiles(false); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - - auto expected_results = CreateTwoFiles(); - auto files = cfd->current()->storage_info()->LevelFiles(0); ASSERT_EQ(2U, files.size()); - CompactionInputFiles compaction_input_files; - compaction_input_files.level = 0; - compaction_input_files.files.push_back(files[0]); - compaction_input_files.files.push_back(files[1]); - std::unique_ptr compaction(new Compaction( - cfd->current()->storage_info(), *cfd->GetLatestMutableCFOptions(), - {compaction_input_files}, 1, 1024 * 1024, 10, 0, kNoCompression, {})); - compaction->SetInputVersion(cfd->current()); - int yield_callback_called = 0; std::function yield_callback = [&]() { yield_callback_called++; return 0; }; - LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); - mutex_.Lock(); - EventLogger event_logger(db_options_.info_log.get()); - std::string db_name = "dbname"; - CompactionJobStats compaction_job_stats; - CompactionJob compaction_job(0, compaction.get(), db_options_, env_options_, - versions_.get(), &shutting_down_, &log_buffer, - nullptr, nullptr, nullptr, {}, table_cache_, - std::move(yield_callback), &event_logger, false, - db_name, &compaction_job_stats); - - VerifyInitializationOfCompactionJobStats(compaction_job_stats); - - compaction_job.Prepare(); - mutex_.Unlock(); - ASSERT_OK(compaction_job.Run()); - mutex_.Lock(); - Status s; - compaction_job.Install(&s, *cfd->GetLatestMutableCFOptions(), &mutex_); - ASSERT_OK(s); - mutex_.Unlock(); - - VerifyCompactionJobStats(compaction_job_stats, files, 1); + RunCompaction(std::move(yield_callback), files); mock_table_factory_->AssertLatestFile(expected_results); ASSERT_EQ(yield_callback_called, 20000); } +TEST_F(CompactionJobTest, SimpleCorrupted) { + auto expected_results = CreateTwoFiles(true); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto files = cfd->current()->storage_info()->LevelFiles(0); + + std::function yield_callback = [&]() { + return 0; + }; + + RunCompaction(std::move(yield_callback), files); + mock_table_factory_->AssertLatestFile(expected_results); +} + } // namespace rocksdb int main(int argc, char** argv) {