From 6a0f63763383e969d7bde6fd2b47c0d0084bcb4a Mon Sep 17 00:00:00 2001 From: Changyu Bi Date: Fri, 28 Jul 2023 09:47:31 -0700 Subject: [PATCH] Compare the number of input keys and processed keys for compactions (#11571) Summary: ... to improve data integrity validation during compaction. A new option `compaction_verify_record_count` is introduced for this verification and is enabled by default. One exception when the verification is not done is when a compaction filter returns kRemoveAndSkipUntil which can cause CompactionIterator to seek until some key and hence not able to keep track of the number of keys processed. For expected number of input keys, we sum over the number of total keys - number of range tombstones across compaction input files (`CompactionJob::UpdateCompactionStats()`). Table properties are consulted if `FileMetaData` is not initialized for some input file. Since table properties for all input files were also constructed during `DBImpl::NotifyOnCompactionBegin()`, `Compaction::GetTableProperties()` is introduced to reduce duplicated code. For actual number of keys processed, each subcompaction will record its number of keys processed to `sub_compact->compaction_job_stats.num_input_records` and aggregated when all subcompactions finish (`CompactionJob::AggregateCompactionStats()`). In the case when some subcompaction encountered kRemoveAndSkipUntil from compaction filter and does not have accurate count, it propagates this information through `sub_compact->compaction_job_stats.has_num_input_records`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11571 Test Plan: * Add a new unit test `DBCompactionTest.VerifyRecordCount` for the corruption case. * All other unit tests for non-corrupted case. * Ran crash test for a few hours: `python3 ./tools/db_crashtest.py whitebox --simple` Reviewed By: ajkr Differential Revision: D47131965 Pulled By: cbi42 fbshipit-source-id: cc8e94565dd526c4347e9d3843ecf32f6727af92 --- db/builder.cc | 4 +- db/compaction/compaction.cc | 29 +++++ db/compaction/compaction.h | 19 ++- db/compaction/compaction_iterator.cc | 13 +- db/compaction/compaction_iterator.h | 23 +++- db/compaction/compaction_iterator_test.cc | 4 +- db/compaction/compaction_job.cc | 121 ++++++++++++++---- db/compaction/compaction_job.h | 19 ++- db/compaction/compaction_job_test.cc | 3 +- db/db_compaction_test.cc | 37 ++++++ db/db_impl/db_impl.h | 2 +- db/db_impl/db_impl_compaction_flush.cc | 25 +--- db/flush_job.cc | 1 + db/version_edit.h | 3 +- include/rocksdb/compaction_job_stats.h | 3 + include/rocksdb/options.h | 20 ++- options/db_options.cc | 7 + options/db_options.h | 1 + options/options_helper.cc | 2 + options/options_settable_test.cc | 1 + table/mock_table.cc | 8 +- .../compaction_verify_input_count.md | 1 + util/compaction_job_stats_impl.cc | 2 + 23 files changed, 270 insertions(+), 78 deletions(-) create mode 100644 unreleased_history/new_features/compaction_verify_input_count.md diff --git a/db/builder.cc b/db/builder.cc index 84d2396c3..a3a6bc47e 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -203,6 +203,7 @@ Status BuildTable( blob_file_builder.get(), ioptions.allow_data_in_errors, ioptions.enforce_single_del_contracts, /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, + true /* must_count_input_entries */, /*compaction=*/nullptr, compaction_filter.get(), /*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low); @@ -286,8 +287,9 @@ Status BuildTable( TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable"); const bool empty = builder->IsEmpty(); if (num_input_entries != nullptr) { + assert(c_iter.HasNumInputEntryScanned()); *num_input_entries = - c_iter.num_input_entry_scanned() + num_unfragmented_tombstones; + c_iter.NumInputEntryScanned() + num_unfragmented_tombstones; } if (!s.ok() || empty) { builder->Abandon(); diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index ceed9d104..e28257d65 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -13,6 +13,7 @@ #include #include "db/column_family.h" +#include "logging/logging.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/sst_partitioner.h" #include "test_util/sync_point.h" @@ -203,6 +204,34 @@ bool Compaction::IsFullCompaction( return num_files_in_compaction == total_num_files; } +const TablePropertiesCollection& Compaction::GetTableProperties() { + if (!input_table_properties_initialized_) { + const ReadOptions read_options(Env::IOActivity::kCompaction); + for (size_t i = 0; i < num_input_levels(); ++i) { + for (const FileMetaData* fmd : *(this->inputs(i))) { + std::shared_ptr tp; + std::string file_name = + TableFileName(immutable_options_.cf_paths, fmd->fd.GetNumber(), + fmd->fd.GetPathId()); + Status s = input_version_->GetTableProperties(read_options, &tp, fmd, + &file_name); + if (s.ok()) { + table_properties_[file_name] = tp; + } else { + ROCKS_LOG_ERROR(immutable_options_.info_log, + "Unable to load table properties for file %" PRIu64 + " --- %s\n", + fmd->fd.GetNumber(), s.ToString().c_str()); + } + } + } + + input_table_properties_initialized_ = true; + }; + + return table_properties_; +} + Compaction::Compaction( VersionStorageInfo* vstorage, const ImmutableOptions& _immutable_options, const MutableCFOptions& _mutable_cf_options, diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 1bd406bc9..fcb0f3003 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -326,12 +326,16 @@ class Compaction { int output_level, VersionStorageInfo* vstorage, const std::vector& inputs); - TablePropertiesCollection GetOutputTableProperties() const { - return output_table_properties_; - } - - void SetOutputTableProperties(TablePropertiesCollection tp) { - output_table_properties_ = std::move(tp); + // If called before a compaction finishes, will return + // table properties of all compaction input files. + // If called after a compaction finished, will return + // table properties of all compaction input and output files. + const TablePropertiesCollection& GetTableProperties(); + + void SetOutputTableProperties( + const std::string& file_name, + const std::shared_ptr& tp) { + table_properties_[file_name] = tp; } Slice GetSmallestUserKey() const { return smallest_user_key_; } @@ -518,8 +522,9 @@ class Compaction { // Does input compression match the output compression? bool InputCompressionMatchesOutput() const; + bool input_table_properties_initialized_ = false; // table properties of output files - TablePropertiesCollection output_table_properties_; + TablePropertiesCollection table_properties_; // smallest user keys in compaction // includes timestamp if user-defined timestamp is enabled. diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 5be7b565a..1c3ca5e1e 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -31,7 +31,8 @@ CompactionIterator::CompactionIterator( BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, bool enforce_single_del_contracts, const std::atomic& manual_compaction_canceled, - const Compaction* compaction, const CompactionFilter* compaction_filter, + bool must_count_input_entries, const Compaction* compaction, + const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const std::shared_ptr info_log, const std::string* full_history_ts_low, @@ -45,8 +46,9 @@ CompactionIterator::CompactionIterator( manual_compaction_canceled, std::unique_ptr( compaction ? new RealCompaction(compaction) : nullptr), - compaction_filter, shutting_down, info_log, full_history_ts_low, - preserve_time_min_seqno, preclude_last_level_min_seqno) {} + must_count_input_entries, compaction_filter, shutting_down, info_log, + full_history_ts_low, preserve_time_min_seqno, + preclude_last_level_min_seqno) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -58,15 +60,14 @@ CompactionIterator::CompactionIterator( BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, bool enforce_single_del_contracts, const std::atomic& manual_compaction_canceled, - std::unique_ptr compaction, + std::unique_ptr compaction, bool must_count_input_entries, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const std::shared_ptr info_log, const std::string* full_history_ts_low, const SequenceNumber preserve_time_min_seqno, const SequenceNumber preclude_last_level_min_seqno) - : input_(input, cmp, - !compaction || compaction->DoesInputReferenceBlobFiles()), + : input_(input, cmp, must_count_input_entries), cmp_(cmp), merge_helper_(merge_helper), snapshots_(snapshots), diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index ea2dc062e..15193b587 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -38,15 +38,18 @@ class SequenceIterWrapper : public InternalIterator { bool Valid() const override { return inner_iter_->Valid(); } Status status() const override { return inner_iter_->status(); } void Next() override { - num_itered_++; + if (!inner_iter_->IsDeleteRangeSentinelKey()) { + num_itered_++; + } inner_iter_->Next(); } void Seek(const Slice& target) override { if (!need_count_entries_) { + has_num_itered_ = false; inner_iter_->Seek(target); } else { - // For flush cases, we need to count total number of entries, so we - // do Next() rather than Seek(). + // Need to count total number of entries, + // so we do Next() rather than Seek(). while (inner_iter_->Valid() && icmp_.Compare(inner_iter_->key(), target) < 0) { Next(); @@ -62,7 +65,8 @@ class SequenceIterWrapper : public InternalIterator { void SeekForPrev(const Slice& /* target */) override { assert(false); } void SeekToLast() override { assert(false); } - uint64_t num_itered() const { return num_itered_; } + uint64_t NumItered() const { return num_itered_; } + bool HasNumItered() const { return has_num_itered_; } bool IsDeleteRangeSentinelKey() const override { assert(Valid()); return inner_iter_->IsDeleteRangeSentinelKey(); @@ -73,6 +77,7 @@ class SequenceIterWrapper : public InternalIterator { InternalIterator* inner_iter_; // not owned uint64_t num_itered_ = 0; bool need_count_entries_; + bool has_num_itered_ = true; }; class CompactionIterator { @@ -189,6 +194,10 @@ class CompactionIterator { const Compaction* compaction_; }; + // @param must_count_input_entries if true, `NumInputEntryScanned()` will + // return the number of input keys scanned. If false, `NumInputEntryScanned()` + // will return this number if no Seek was called on `input`. User should call + // `HasNumInputEntryScanned()` first in this case. CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, @@ -199,7 +208,7 @@ class CompactionIterator { BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, bool enforce_single_del_contracts, const std::atomic& manual_compaction_canceled, - const Compaction* compaction = nullptr, + bool must_count_input_entries, const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const std::shared_ptr info_log = nullptr, @@ -219,6 +228,7 @@ class CompactionIterator { bool enforce_single_del_contracts, const std::atomic& manual_compaction_canceled, std::unique_ptr compaction, + bool must_count_input_entries, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const std::shared_ptr info_log = nullptr, @@ -253,7 +263,8 @@ class CompactionIterator { return current_user_key_; } const CompactionIterationStats& iter_stats() const { return iter_stats_; } - uint64_t num_input_entry_scanned() const { return input_.num_itered(); } + bool HasNumInputEntryScanned() const { return input_.HasNumItered(); } + uint64_t NumInputEntryScanned() const { return input_.NumItered(); } // If the current key should be placed on penultimate level, only valid if // per_key_placement is supported bool output_to_penultimate_level() const { diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 81362d792..20428a586 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -293,8 +293,8 @@ class CompactionIteratorTest : public testing::TestWithParam { nullptr /* blob_file_builder */, true /*allow_data_in_errors*/, true /*enforce_single_del_contracts*/, /*manual_compaction_canceled=*/kManualCompactionCanceledFalse_, - std::move(compaction), filter, &shutting_down_, /*info_log=*/nullptr, - full_history_ts_low)); + std::move(compaction), /*must_count_input_entries=*/false, filter, + &shutting_down_, /*info_log=*/nullptr, full_history_ts_low)); } void AddSnapshot(SequenceNumber snapshot, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index d609e0154..8ea806816 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -796,15 +796,46 @@ Status CompactionJob::Run() { auto fn = TableFileName(state.compaction->immutable_options()->cf_paths, output.meta.fd.GetNumber(), output.meta.fd.GetPathId()); - tp[fn] = output.table_properties; + compact_->compaction->SetOutputTableProperties(fn, + output.table_properties); } } - compact_->compaction->SetOutputTableProperties(std::move(tp)); - // Finish up all book-keeping to unify the subcompaction results + // Finish up all bookkeeping to unify the subcompaction results. compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_); - UpdateCompactionStats(); - + uint64_t num_input_range_del = 0; + bool ok = UpdateCompactionStats(&num_input_range_del); + // (Sub)compactions returned ok, do sanity check on the number of input keys. + if (status.ok() && ok && compaction_job_stats_->has_num_input_records) { + size_t ts_sz = compact_->compaction->column_family_data() + ->user_comparator() + ->timestamp_size(); + // When trim_ts_ is non-empty, CompactionIterator takes + // HistoryTrimmingIterator as input iterator and sees a trimmed view of + // input keys. So the number of keys it processed is not suitable for + // verification here. + // TODO: support verification when trim_ts_ is non-empty. + if (!(ts_sz > 0 && !trim_ts_.empty()) && + db_options_.compaction_verify_record_count) { + assert(compaction_stats_.stats.num_input_records > 0); + // TODO: verify the number of range deletion entries. + uint64_t expected = + compaction_stats_.stats.num_input_records - num_input_range_del; + uint64_t actual = compaction_job_stats_->num_input_records; + if (expected != actual) { + std::string msg = + "Total number of input records: " + std::to_string(expected) + + ", but processed " + std::to_string(actual) + " records."; + ROCKS_LOG_WARN( + db_options_.info_log, "[%s] [JOB %d] Compaction %s", + compact_->compaction->column_family_data()->GetName().c_str(), + job_context_->job_id, msg.c_str()); + status = Status::Corruption( + "Compaction number of input keys does not match number of keys " + "processed."); + } + } + } RecordCompactionIOStats(); LogFlush(db_options_.info_log); TEST_SYNC_POINT("CompactionJob::Run():End"); @@ -1252,6 +1283,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { /*expect_valid_internal_key=*/true, range_del_agg.get(), blob_file_builder.get(), db_options_.allow_data_in_errors, db_options_.enforce_single_del_contracts, manual_compaction_canceled_, + sub_compact->compaction + ->DoesInputReferenceBlobFiles() /* must_count_input_entries */, sub_compact->compaction, compaction_filter, shutting_down_, db_options_.info_log, full_history_ts_low, preserve_time_min_seqno_, preclude_last_level_min_seqno_); @@ -1316,8 +1349,25 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (c_iter->status().IsManualCompactionPaused()) { break; } + +#ifndef NDEBUG + bool stop = false; + TEST_SYNC_POINT_CALLBACK("CompactionJob::ProcessKeyValueCompaction()::stop", + static_cast(&stop)); + if (stop) { + break; + } +#endif // NDEBUG } + // This number may not be accurate when CompactionIterator was created + // with `must_count_input_entries=false`. + assert(!sub_compact->compaction->DoesInputReferenceBlobFiles() || + c_iter->HasNumInputEntryScanned()); + sub_compact->compaction_job_stats.has_num_input_records = + c_iter->HasNumInputEntryScanned(); + sub_compact->compaction_job_stats.num_input_records = + c_iter->NumInputEntryScanned(); sub_compact->compaction_job_stats.num_blobs_read = c_iter_stats.num_blobs_read; sub_compact->compaction_job_stats.total_blob_bytes_read = @@ -1903,24 +1953,53 @@ void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) { } } // namespace - -void CompactionJob::UpdateCompactionStats() { +bool CompactionJob::UpdateCompactionStats(uint64_t* num_input_range_del) { assert(compact_); Compaction* compaction = compact_->compaction; compaction_stats_.stats.num_input_files_in_non_output_levels = 0; compaction_stats_.stats.num_input_files_in_output_level = 0; + + bool has_error = false; + const ReadOptions read_options(Env::IOActivity::kCompaction); + const auto& input_table_properties = compaction->GetTableProperties(); for (int input_level = 0; input_level < static_cast(compaction->num_input_levels()); ++input_level) { + size_t num_input_files = compaction->num_input_files(input_level); + uint64_t* bytes_read; if (compaction->level(input_level) != compaction->output_level()) { - UpdateCompactionInputStatsHelper( - &compaction_stats_.stats.num_input_files_in_non_output_levels, - &compaction_stats_.stats.bytes_read_non_output_levels, input_level); + compaction_stats_.stats.num_input_files_in_non_output_levels += + static_cast(num_input_files); + bytes_read = &compaction_stats_.stats.bytes_read_non_output_levels; } else { - UpdateCompactionInputStatsHelper( - &compaction_stats_.stats.num_input_files_in_output_level, - &compaction_stats_.stats.bytes_read_output_level, input_level); + compaction_stats_.stats.num_input_files_in_output_level += + static_cast(num_input_files); + bytes_read = &compaction_stats_.stats.bytes_read_output_level; + } + for (size_t i = 0; i < num_input_files; ++i) { + const FileMetaData* file_meta = compaction->input(input_level, i); + *bytes_read += file_meta->fd.GetFileSize(); + uint64_t file_input_entries = file_meta->num_entries; + uint64_t file_num_range_del = file_meta->num_range_deletions; + if (file_input_entries == 0) { + uint64_t file_number = file_meta->fd.GetNumber(); + // Try getting info from table property + std::string fn = + TableFileName(compaction->immutable_options()->cf_paths, + file_number, file_meta->fd.GetPathId()); + const auto& tp = input_table_properties.find(fn); + if (tp != input_table_properties.end()) { + file_input_entries = tp->second->num_entries; + file_num_range_del = tp->second->num_range_deletions; + } else { + has_error = true; + } + } + compaction_stats_.stats.num_input_records += file_input_entries; + if (num_input_range_del) { + *num_input_range_del += file_num_range_del; + } } } @@ -1930,21 +2009,7 @@ void CompactionJob::UpdateCompactionStats() { compaction_stats_.stats.num_dropped_records = compaction_stats_.DroppedRecords(); -} - -void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files, - uint64_t* bytes_read, - int input_level) { - const Compaction* compaction = compact_->compaction; - auto num_input_files = compaction->num_input_files(input_level); - *num_files += static_cast(num_input_files); - - for (size_t i = 0; i < num_input_files; ++i) { - const auto* file_meta = compaction->input(input_level, i); - *bytes_read += file_meta->fd.GetFileSize(); - compaction_stats_.stats.num_input_records += - static_cast(file_meta->num_entries); - } + return !has_error; } void CompactionJob::UpdateCompactionJobStats( diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index a930c15f1..926f4a8f9 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -192,7 +192,21 @@ class CompactionJob { IOStatus io_status() const { return io_status_; } protected: - void UpdateCompactionStats(); + // Update the following stats in compaction_stats_.stats + // - num_input_files_in_non_output_levels + // - num_input_files_in_output_level + // - bytes_read_non_output_levels + // - bytes_read_output_level + // - num_input_records + // - bytes_read_blob + // - num_dropped_records + // + // @param num_input_range_del if non-null, will be set to the number of range + // deletion entries in this compaction input. + // + // Returns true iff compaction_stats_.stats.num_input_records and + // num_input_range_del are calculated successfully. + bool UpdateCompactionStats(uint64_t* num_input_range_del = nullptr); void LogCompaction(); virtual void RecordCompactionIOStats(); void CleanupCompaction(); @@ -267,9 +281,6 @@ class CompactionJob { void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats, CompactionJobStats* compaction_job_stats = nullptr); - void UpdateCompactionInputStatsHelper(int* num_files, uint64_t* bytes_read, - int input_level); - void NotifyOnSubcompactionBegin(SubcompactionState* sub_compact); void NotifyOnSubcompactionCompleted(SubcompactionState* sub_compact); diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index f7fc28c15..8f91cc04c 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -655,11 +655,12 @@ class CompactionJobTestBase : public testing::Test { ASSERT_TRUE(full_history_ts_low_.empty() || ucmp_->timestamp_size() == full_history_ts_low_.size()); const std::atomic kManualCompactionCanceledFalse{false}; + JobContext job_context(1, false /* create_superversion */); CompactionJob compaction_job( 0, &compaction, db_options_, mutable_db_options_, env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots, - earliest_write_conflict_snapshot, snapshot_checker, nullptr, + earliest_write_conflict_snapshot, snapshot_checker, &job_context, table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */, /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 00a33669b..3e565108a 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -9807,6 +9807,43 @@ TEST_F(DBCompactionTest, NumberOfSubcompactions) { } } +TEST_F(DBCompactionTest, VerifyRecordCount) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleLevel; + options.level0_file_num_compaction_trigger = 3; + options.compaction_verify_record_count = true; + DestroyAndReopen(options); + Random rnd(301); + + // Create 2 overlapping L0 files + for (int i = 1; i < 20; i += 2) { + ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + } + ASSERT_OK(Flush()); + + for (int i = 0; i < 20; i += 2) { + ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + } + ASSERT_OK(Flush()); + + // Only iterator through 10 keys and force compaction to finish. + int num_iter = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::ProcessKeyValueCompaction()::stop", [&](void* stop_ptr) { + num_iter++; + if (num_iter == 10) { + *(bool*)stop_ptr = true; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + ASSERT_TRUE(s.IsCorruption()); + const char* expect = + "Compaction number of input keys does not match number of keys " + "processed."; + ASSERT_TRUE(std::strstr(s.getState(), expect)); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 27f539182..0c654035b 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2200,7 +2200,7 @@ class DBImpl : public DB { void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c, const Status& st, const CompactionJobStats& compaction_job_stats, - const int job_id, const Version* current, + const int job_id, CompactionJobInfo* compaction_job_info) const; // Reserve the next 'num' file numbers for to-be-ingested external SST files, // and return the current file_number in 'next_file_number'. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 4e0372e69..9cde56061 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1546,7 +1546,7 @@ Status DBImpl::CompactFilesImpl( if (compaction_job_info != nullptr) { BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats, - job_context->job_id, version, compaction_job_info); + job_context->job_id, compaction_job_info); } if (status.ok()) { @@ -1643,21 +1643,18 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, } c->SetNotifyOnCompactionCompleted(); - Version* current = cfd->current(); - current->Ref(); // release lock while notifying events mutex_.Unlock(); TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex"); { CompactionJobInfo info{}; - BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, current, &info); + BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, &info); for (auto listener : immutable_db_options_.listeners) { listener->OnCompactionBegin(this, info); } info.status.PermitUncheckedError(); } mutex_.Lock(); - current->Unref(); } void DBImpl::NotifyOnCompactionCompleted( @@ -1675,21 +1672,17 @@ void DBImpl::NotifyOnCompactionCompleted( return; } - Version* current = cfd->current(); - current->Ref(); // release lock while notifying events mutex_.Unlock(); TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex"); { CompactionJobInfo info{}; - BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current, - &info); + BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, &info); for (auto listener : immutable_db_options_.listeners) { listener->OnCompactionCompleted(this, info); } } mutex_.Lock(); - current->Unref(); // no need to signal bg_cv_ as it will be signaled at the end of the // flush process. } @@ -3923,7 +3916,7 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) { void DBImpl::BuildCompactionJobInfo( const ColumnFamilyData* cfd, Compaction* c, const Status& st, const CompactionJobStats& compaction_job_stats, const int job_id, - const Version* current, CompactionJobInfo* compaction_job_info) const { + CompactionJobInfo* compaction_job_info) const { assert(compaction_job_info != nullptr); compaction_job_info->cf_id = cfd->GetID(); compaction_job_info->cf_name = cfd->GetName(); @@ -3933,7 +3926,7 @@ void DBImpl::BuildCompactionJobInfo( compaction_job_info->base_input_level = c->start_level(); compaction_job_info->output_level = c->output_level(); compaction_job_info->stats = compaction_job_stats; - compaction_job_info->table_properties = c->GetOutputTableProperties(); + compaction_job_info->table_properties = c->GetTableProperties(); compaction_job_info->compaction_reason = c->compaction_reason(); compaction_job_info->compression = c->output_compression(); @@ -3947,15 +3940,9 @@ void DBImpl::BuildCompactionJobInfo( compaction_job_info->input_files.push_back(fn); compaction_job_info->input_file_infos.push_back(CompactionFileInfo{ static_cast(i), file_number, fmd->oldest_blob_file_number}); - if (compaction_job_info->table_properties.count(fn) == 0) { - std::shared_ptr tp; - auto s = current->GetTableProperties(read_options, &tp, fmd, &fn); - if (s.ok()) { - compaction_job_info->table_properties[fn] = tp; - } - } } } + for (const auto& newf : c->edit()->GetNewFiles()) { const FileMetaData& meta = newf.second; const FileDescriptor& desc = meta.fd; diff --git a/db/flush_job.cc b/db/flush_job.cc index bfdd9a059..b989cc8e3 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -490,6 +490,7 @@ Status FlushJob::MemPurge() { nullptr, ioptions->allow_data_in_errors, ioptions->enforce_single_del_contracts, /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, + false /* must_count_input_entries */, /*compaction=*/nullptr, compaction_filter.get(), /*shutting_down=*/nullptr, ioptions->info_log, full_history_ts_low); diff --git a/db/version_edit.h b/db/version_edit.h index a13d8e65f..e6d54d31d 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -193,7 +193,8 @@ struct FileMetaData { uint64_t compensated_file_size = 0; // These values can mutate, but they can only be read or written from // single-threaded LogAndApply thread - uint64_t num_entries = 0; // the number of entries. + uint64_t num_entries = + 0; // The number of entries, including deletions and range deletions. // The number of deletion entries, including range deletions. uint64_t num_deletions = 0; uint64_t raw_key_size = 0; // total uncompressed key size. diff --git a/include/rocksdb/compaction_job_stats.h b/include/rocksdb/compaction_job_stats.h index 5ff8eccc8..7e8153044 100644 --- a/include/rocksdb/compaction_job_stats.h +++ b/include/rocksdb/compaction_job_stats.h @@ -24,6 +24,9 @@ struct CompactionJobStats { // the elapsed CPU time of this compaction in microseconds. uint64_t cpu_micros; + // Used internally indicating whether a subcompaction's + // `num_input_records` is accurate. + bool has_num_input_records; // the number of compaction input records. uint64_t num_input_records; // the number of blobs read from blob files diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 6cf91a491..53e534164 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -490,11 +490,29 @@ struct DBOptions { // If true, during memtable flush, RocksDB will validate total entries // read in flush, and compare with counter inserted into it. + // // The option is here to turn the feature off in case this new validation - // feature has a bug. + // feature has a bug. The option may be removed in the future once the + // feature is stable. + // // Default: true bool flush_verify_memtable_count = true; + // If true, during compaction, RocksDB will count the number of entries + // read and compare it against the number of entries in the compaction + // input files. This is intended to add protection against corruption + // during compaction. Note that + // - this verification is not done for compactions during which a compaction + // filter returns kRemoveAndSkipUntil, and + // - the number of range deletions is not verified. + // + // The option is here to turn the feature off in case this new validation + // feature has a bug. The option may be removed in the future once the + // feature is stable. + // + // Default: true + bool compaction_verify_record_count = true; + // If true, the log numbers and sizes of the synced WALs are tracked // in MANIFEST. During DB recovery, if a synced WAL is missing // from disk, or the WAL's size does not match the recorded size in diff --git a/options/db_options.cc b/options/db_options.cc index d81e72833..f009c1a59 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -222,6 +222,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, flush_verify_memtable_count), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"compaction_verify_record_count", + {offsetof(struct ImmutableDBOptions, compaction_verify_record_count), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"track_and_verify_wals_in_manifest", {offsetof(struct ImmutableDBOptions, track_and_verify_wals_in_manifest), @@ -679,6 +683,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) error_if_exists(options.error_if_exists), paranoid_checks(options.paranoid_checks), flush_verify_memtable_count(options.flush_verify_memtable_count), + compaction_verify_record_count(options.compaction_verify_record_count), track_and_verify_wals_in_manifest( options.track_and_verify_wals_in_manifest), verify_sst_unique_id_in_manifest( @@ -771,6 +776,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { paranoid_checks); ROCKS_LOG_HEADER(log, " Options.flush_verify_memtable_count: %d", flush_verify_memtable_count); + ROCKS_LOG_HEADER(log, " Options.compaction_verify_record_count: %d", + compaction_verify_record_count); ROCKS_LOG_HEADER(log, " " "Options.track_and_verify_wals_in_manifest: %d", diff --git a/options/db_options.h b/options/db_options.h index 2a9d98b25..d00a06718 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -25,6 +25,7 @@ struct ImmutableDBOptions { bool error_if_exists; bool paranoid_checks; bool flush_verify_memtable_count; + bool compaction_verify_record_count; bool track_and_verify_wals_in_manifest; bool verify_sst_unique_id_in_manifest; Env* env; diff --git a/options/options_helper.cc b/options/options_helper.cc index abe5053d2..d221f9705 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -60,6 +60,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.paranoid_checks = immutable_db_options.paranoid_checks; options.flush_verify_memtable_count = immutable_db_options.flush_verify_memtable_count; + options.compaction_verify_record_count = + immutable_db_options.compaction_verify_record_count; options.track_and_verify_wals_in_manifest = immutable_db_options.track_and_verify_wals_in_manifest; options.verify_sst_unique_id_in_manifest = diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 19cb6310f..8b69e6079 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -308,6 +308,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "writable_file_max_buffer_size=1048576;" "paranoid_checks=true;" "flush_verify_memtable_count=true;" + "compaction_verify_record_count=true;" "track_and_verify_wals_in_manifest=true;" "verify_sst_unique_id_in_manifest=true;" "is_fd_close_on_exec=false;" diff --git a/table/mock_table.cc b/table/mock_table.cc index c251ea108..d6229ef60 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -230,7 +230,13 @@ Status MockTableReader::Get(const ReadOptions&, const Slice& key, std::shared_ptr MockTableReader::GetTableProperties() const { - return std::shared_ptr(new TableProperties()); + TableProperties* tp = new TableProperties(); + tp->num_entries = table_.size(); + tp->num_range_deletions = 0; + tp->raw_key_size = 1; + tp->raw_value_size = 1; + + return std::shared_ptr(tp); } MockTableFactory::MockTableFactory() diff --git a/unreleased_history/new_features/compaction_verify_input_count.md b/unreleased_history/new_features/compaction_verify_input_count.md new file mode 100644 index 000000000..32cfe0910 --- /dev/null +++ b/unreleased_history/new_features/compaction_verify_input_count.md @@ -0,0 +1 @@ +* RocksDB will compare the number of input keys to the number of keys processed after each compaction. Compaction will fail and report Corruption status if the verification fails. Option `compaction_verify_record_count` is introduced for this purpose and is enabled by default. diff --git a/util/compaction_job_stats_impl.cc b/util/compaction_job_stats_impl.cc index 587a26f24..cdb591f23 100644 --- a/util/compaction_job_stats_impl.cc +++ b/util/compaction_job_stats_impl.cc @@ -12,6 +12,7 @@ void CompactionJobStats::Reset() { elapsed_micros = 0; cpu_micros = 0; + has_num_input_records = true; num_input_records = 0; num_blobs_read = 0; num_input_files = 0; @@ -55,6 +56,7 @@ void CompactionJobStats::Add(const CompactionJobStats& stats) { elapsed_micros += stats.elapsed_micros; cpu_micros += stats.cpu_micros; + has_num_input_records &= stats.has_num_input_records; num_input_records += stats.num_input_records; num_blobs_read += stats.num_blobs_read; num_input_files += stats.num_input_files;