diff --git a/db/builder.cc b/db/builder.cc index 84d2396c3..a3a6bc47e 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -203,6 +203,7 @@ Status BuildTable( blob_file_builder.get(), ioptions.allow_data_in_errors, ioptions.enforce_single_del_contracts, /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, + true /* must_count_input_entries */, /*compaction=*/nullptr, compaction_filter.get(), /*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low); @@ -286,8 +287,9 @@ Status BuildTable( TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable"); const bool empty = builder->IsEmpty(); if (num_input_entries != nullptr) { + assert(c_iter.HasNumInputEntryScanned()); *num_input_entries = - c_iter.num_input_entry_scanned() + num_unfragmented_tombstones; + c_iter.NumInputEntryScanned() + num_unfragmented_tombstones; } if (!s.ok() || empty) { builder->Abandon(); diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index ceed9d104..e28257d65 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -13,6 +13,7 @@ #include #include "db/column_family.h" +#include "logging/logging.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/sst_partitioner.h" #include "test_util/sync_point.h" @@ -203,6 +204,34 @@ bool Compaction::IsFullCompaction( return num_files_in_compaction == total_num_files; } +const TablePropertiesCollection& Compaction::GetTableProperties() { + if (!input_table_properties_initialized_) { + const ReadOptions read_options(Env::IOActivity::kCompaction); + for (size_t i = 0; i < num_input_levels(); ++i) { + for (const FileMetaData* fmd : *(this->inputs(i))) { + std::shared_ptr tp; + std::string file_name = + TableFileName(immutable_options_.cf_paths, fmd->fd.GetNumber(), + fmd->fd.GetPathId()); + Status s = input_version_->GetTableProperties(read_options, &tp, fmd, + &file_name); + if (s.ok()) { + table_properties_[file_name] = tp; + } else { + ROCKS_LOG_ERROR(immutable_options_.info_log, + "Unable to load 
table properties for file %" PRIu64 + " --- %s\n", + fmd->fd.GetNumber(), s.ToString().c_str()); + } + } + } + + input_table_properties_initialized_ = true; + }; + + return table_properties_; +} + Compaction::Compaction( VersionStorageInfo* vstorage, const ImmutableOptions& _immutable_options, const MutableCFOptions& _mutable_cf_options, diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 1bd406bc9..fcb0f3003 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -326,12 +326,16 @@ class Compaction { int output_level, VersionStorageInfo* vstorage, const std::vector& inputs); - TablePropertiesCollection GetOutputTableProperties() const { - return output_table_properties_; - } - - void SetOutputTableProperties(TablePropertiesCollection tp) { - output_table_properties_ = std::move(tp); + // If called before a compaction finishes, will return + // table properties of all compaction input files. + // If called after a compaction finished, will return + // table properties of all compaction input and output files. + const TablePropertiesCollection& GetTableProperties(); + + void SetOutputTableProperties( + const std::string& file_name, + const std::shared_ptr& tp) { + table_properties_[file_name] = tp; } Slice GetSmallestUserKey() const { return smallest_user_key_; } @@ -518,8 +522,9 @@ class Compaction { // Does input compression match the output compression? bool InputCompressionMatchesOutput() const; + bool input_table_properties_initialized_ = false; // table properties of output files - TablePropertiesCollection output_table_properties_; + TablePropertiesCollection table_properties_; // smallest user keys in compaction // includes timestamp if user-defined timestamp is enabled. 
diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 5be7b565a..1c3ca5e1e 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -31,7 +31,8 @@ CompactionIterator::CompactionIterator( BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, bool enforce_single_del_contracts, const std::atomic& manual_compaction_canceled, - const Compaction* compaction, const CompactionFilter* compaction_filter, + bool must_count_input_entries, const Compaction* compaction, + const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const std::shared_ptr info_log, const std::string* full_history_ts_low, @@ -45,8 +46,9 @@ CompactionIterator::CompactionIterator( manual_compaction_canceled, std::unique_ptr( compaction ? new RealCompaction(compaction) : nullptr), - compaction_filter, shutting_down, info_log, full_history_ts_low, - preserve_time_min_seqno, preclude_last_level_min_seqno) {} + must_count_input_entries, compaction_filter, shutting_down, info_log, + full_history_ts_low, preserve_time_min_seqno, + preclude_last_level_min_seqno) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -58,15 +60,14 @@ CompactionIterator::CompactionIterator( BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, bool enforce_single_del_contracts, const std::atomic& manual_compaction_canceled, - std::unique_ptr compaction, + std::unique_ptr compaction, bool must_count_input_entries, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const std::shared_ptr info_log, const std::string* full_history_ts_low, const SequenceNumber preserve_time_min_seqno, const SequenceNumber preclude_last_level_min_seqno) - : input_(input, cmp, - !compaction || compaction->DoesInputReferenceBlobFiles()), + : input_(input, cmp, must_count_input_entries), cmp_(cmp), merge_helper_(merge_helper), snapshots_(snapshots), 
diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index ea2dc062e..15193b587 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -38,15 +38,18 @@ class SequenceIterWrapper : public InternalIterator { bool Valid() const override { return inner_iter_->Valid(); } Status status() const override { return inner_iter_->status(); } void Next() override { - num_itered_++; + if (!inner_iter_->IsDeleteRangeSentinelKey()) { + num_itered_++; + } inner_iter_->Next(); } void Seek(const Slice& target) override { if (!need_count_entries_) { + has_num_itered_ = false; inner_iter_->Seek(target); } else { - // For flush cases, we need to count total number of entries, so we - // do Next() rather than Seek(). + // Need to count total number of entries, + // so we do Next() rather than Seek(). while (inner_iter_->Valid() && icmp_.Compare(inner_iter_->key(), target) < 0) { Next(); @@ -62,7 +65,8 @@ class SequenceIterWrapper : public InternalIterator { void SeekForPrev(const Slice& /* target */) override { assert(false); } void SeekToLast() override { assert(false); } - uint64_t num_itered() const { return num_itered_; } + uint64_t NumItered() const { return num_itered_; } + bool HasNumItered() const { return has_num_itered_; } bool IsDeleteRangeSentinelKey() const override { assert(Valid()); return inner_iter_->IsDeleteRangeSentinelKey(); @@ -73,6 +77,7 @@ class SequenceIterWrapper : public InternalIterator { InternalIterator* inner_iter_; // not owned uint64_t num_itered_ = 0; bool need_count_entries_; + bool has_num_itered_ = true; }; class CompactionIterator { @@ -189,6 +194,10 @@ class CompactionIterator { const Compaction* compaction_; }; + // @param must_count_input_entries if true, `NumInputEntryScanned()` will + // return the number of input keys scanned. If false, `NumInputEntryScanned()` + // will return this number if no Seek was called on `input`. 
User should call + // `HasNumInputEntryScanned()` first in this case. CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, @@ -199,7 +208,7 @@ class CompactionIterator { BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, bool enforce_single_del_contracts, const std::atomic& manual_compaction_canceled, - const Compaction* compaction = nullptr, + bool must_count_input_entries, const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const std::shared_ptr info_log = nullptr, @@ -219,6 +228,7 @@ class CompactionIterator { bool enforce_single_del_contracts, const std::atomic& manual_compaction_canceled, std::unique_ptr compaction, + bool must_count_input_entries, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const std::shared_ptr info_log = nullptr, @@ -253,7 +263,8 @@ class CompactionIterator { return current_user_key_; } const CompactionIterationStats& iter_stats() const { return iter_stats_; } - uint64_t num_input_entry_scanned() const { return input_.num_itered(); } + bool HasNumInputEntryScanned() const { return input_.HasNumItered(); } + uint64_t NumInputEntryScanned() const { return input_.NumItered(); } // If the current key should be placed on penultimate level, only valid if // per_key_placement is supported bool output_to_penultimate_level() const { diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 81362d792..20428a586 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -293,8 +293,8 @@ class CompactionIteratorTest : public testing::TestWithParam { nullptr /* blob_file_builder */, true /*allow_data_in_errors*/, true /*enforce_single_del_contracts*/, /*manual_compaction_canceled=*/kManualCompactionCanceledFalse_, - 
std::move(compaction), filter, &shutting_down_, /*info_log=*/nullptr, - full_history_ts_low)); + std::move(compaction), /*must_count_input_entries=*/false, filter, + &shutting_down_, /*info_log=*/nullptr, full_history_ts_low)); } void AddSnapshot(SequenceNumber snapshot, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index d609e0154..8ea806816 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -796,15 +796,46 @@ Status CompactionJob::Run() { auto fn = TableFileName(state.compaction->immutable_options()->cf_paths, output.meta.fd.GetNumber(), output.meta.fd.GetPathId()); - tp[fn] = output.table_properties; + compact_->compaction->SetOutputTableProperties(fn, + output.table_properties); } } - compact_->compaction->SetOutputTableProperties(std::move(tp)); - // Finish up all book-keeping to unify the subcompaction results + // Finish up all bookkeeping to unify the subcompaction results. compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_); - UpdateCompactionStats(); - + uint64_t num_input_range_del = 0; + bool ok = UpdateCompactionStats(&num_input_range_del); + // (Sub)compactions returned ok, do sanity check on the number of input keys. + if (status.ok() && ok && compaction_job_stats_->has_num_input_records) { + size_t ts_sz = compact_->compaction->column_family_data() + ->user_comparator() + ->timestamp_size(); + // When trim_ts_ is non-empty, CompactionIterator takes + // HistoryTrimmingIterator as input iterator and sees a trimmed view of + // input keys. So the number of keys it processed is not suitable for + // verification here. + // TODO: support verification when trim_ts_ is non-empty. + if (!(ts_sz > 0 && !trim_ts_.empty()) && + db_options_.compaction_verify_record_count) { + assert(compaction_stats_.stats.num_input_records > 0); + // TODO: verify the number of range deletion entries. 
+ uint64_t expected = + compaction_stats_.stats.num_input_records - num_input_range_del; + uint64_t actual = compaction_job_stats_->num_input_records; + if (expected != actual) { + std::string msg = + "Total number of input records: " + std::to_string(expected) + + ", but processed " + std::to_string(actual) + " records."; + ROCKS_LOG_WARN( + db_options_.info_log, "[%s] [JOB %d] Compaction %s", + compact_->compaction->column_family_data()->GetName().c_str(), + job_context_->job_id, msg.c_str()); + status = Status::Corruption( + "Compaction number of input keys does not match number of keys " + "processed."); + } + } + } RecordCompactionIOStats(); LogFlush(db_options_.info_log); TEST_SYNC_POINT("CompactionJob::Run():End"); @@ -1252,6 +1283,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { /*expect_valid_internal_key=*/true, range_del_agg.get(), blob_file_builder.get(), db_options_.allow_data_in_errors, db_options_.enforce_single_del_contracts, manual_compaction_canceled_, + sub_compact->compaction + ->DoesInputReferenceBlobFiles() /* must_count_input_entries */, sub_compact->compaction, compaction_filter, shutting_down_, db_options_.info_log, full_history_ts_low, preserve_time_min_seqno_, preclude_last_level_min_seqno_); @@ -1316,8 +1349,25 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (c_iter->status().IsManualCompactionPaused()) { break; } + +#ifndef NDEBUG + bool stop = false; + TEST_SYNC_POINT_CALLBACK("CompactionJob::ProcessKeyValueCompaction()::stop", + static_cast(&stop)); + if (stop) { + break; + } +#endif // NDEBUG } + // This number may not be accurate when CompactionIterator was created + // with `must_count_input_entries=false`. 
+ assert(!sub_compact->compaction->DoesInputReferenceBlobFiles() || + c_iter->HasNumInputEntryScanned()); + sub_compact->compaction_job_stats.has_num_input_records = + c_iter->HasNumInputEntryScanned(); + sub_compact->compaction_job_stats.num_input_records = + c_iter->NumInputEntryScanned(); sub_compact->compaction_job_stats.num_blobs_read = c_iter_stats.num_blobs_read; sub_compact->compaction_job_stats.total_blob_bytes_read = @@ -1903,24 +1953,53 @@ void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) { } } // namespace - -void CompactionJob::UpdateCompactionStats() { +bool CompactionJob::UpdateCompactionStats(uint64_t* num_input_range_del) { assert(compact_); Compaction* compaction = compact_->compaction; compaction_stats_.stats.num_input_files_in_non_output_levels = 0; compaction_stats_.stats.num_input_files_in_output_level = 0; + + bool has_error = false; + const ReadOptions read_options(Env::IOActivity::kCompaction); + const auto& input_table_properties = compaction->GetTableProperties(); for (int input_level = 0; input_level < static_cast(compaction->num_input_levels()); ++input_level) { + size_t num_input_files = compaction->num_input_files(input_level); + uint64_t* bytes_read; if (compaction->level(input_level) != compaction->output_level()) { - UpdateCompactionInputStatsHelper( - &compaction_stats_.stats.num_input_files_in_non_output_levels, - &compaction_stats_.stats.bytes_read_non_output_levels, input_level); + compaction_stats_.stats.num_input_files_in_non_output_levels += + static_cast(num_input_files); + bytes_read = &compaction_stats_.stats.bytes_read_non_output_levels; } else { - UpdateCompactionInputStatsHelper( - &compaction_stats_.stats.num_input_files_in_output_level, - &compaction_stats_.stats.bytes_read_output_level, input_level); + compaction_stats_.stats.num_input_files_in_output_level += + static_cast(num_input_files); + bytes_read = &compaction_stats_.stats.bytes_read_output_level; + } + for (size_t i = 0; i < 
num_input_files; ++i) { + const FileMetaData* file_meta = compaction->input(input_level, i); + *bytes_read += file_meta->fd.GetFileSize(); + uint64_t file_input_entries = file_meta->num_entries; + uint64_t file_num_range_del = file_meta->num_range_deletions; + if (file_input_entries == 0) { + uint64_t file_number = file_meta->fd.GetNumber(); + // Try getting info from table property + std::string fn = + TableFileName(compaction->immutable_options()->cf_paths, + file_number, file_meta->fd.GetPathId()); + const auto& tp = input_table_properties.find(fn); + if (tp != input_table_properties.end()) { + file_input_entries = tp->second->num_entries; + file_num_range_del = tp->second->num_range_deletions; + } else { + has_error = true; + } + } + compaction_stats_.stats.num_input_records += file_input_entries; + if (num_input_range_del) { + *num_input_range_del += file_num_range_del; + } } } @@ -1930,21 +2009,7 @@ void CompactionJob::UpdateCompactionStats() { compaction_stats_.stats.num_dropped_records = compaction_stats_.DroppedRecords(); -} - -void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files, - uint64_t* bytes_read, - int input_level) { - const Compaction* compaction = compact_->compaction; - auto num_input_files = compaction->num_input_files(input_level); - *num_files += static_cast(num_input_files); - - for (size_t i = 0; i < num_input_files; ++i) { - const auto* file_meta = compaction->input(input_level, i); - *bytes_read += file_meta->fd.GetFileSize(); - compaction_stats_.stats.num_input_records += - static_cast(file_meta->num_entries); - } + return !has_error; } void CompactionJob::UpdateCompactionJobStats( diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index a930c15f1..926f4a8f9 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -192,7 +192,21 @@ class CompactionJob { IOStatus io_status() const { return io_status_; } protected: - void UpdateCompactionStats(); + // Update the following 
stats in compaction_stats_.stats + // - num_input_files_in_non_output_levels + // - num_input_files_in_output_level + // - bytes_read_non_output_levels + // - bytes_read_output_level + // - num_input_records + // - bytes_read_blob + // - num_dropped_records + // + // @param num_input_range_del if non-null, will be set to the number of range + // deletion entries in this compaction input. + // + // Returns true iff compaction_stats_.stats.num_input_records and + // num_input_range_del are calculated successfully. + bool UpdateCompactionStats(uint64_t* num_input_range_del = nullptr); void LogCompaction(); virtual void RecordCompactionIOStats(); void CleanupCompaction(); @@ -267,9 +281,6 @@ class CompactionJob { void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats, CompactionJobStats* compaction_job_stats = nullptr); - void UpdateCompactionInputStatsHelper(int* num_files, uint64_t* bytes_read, - int input_level); - void NotifyOnSubcompactionBegin(SubcompactionState* sub_compact); void NotifyOnSubcompactionCompleted(SubcompactionState* sub_compact); diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index f7fc28c15..8f91cc04c 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -655,11 +655,12 @@ class CompactionJobTestBase : public testing::Test { ASSERT_TRUE(full_history_ts_low_.empty() || ucmp_->timestamp_size() == full_history_ts_low_.size()); const std::atomic kManualCompactionCanceledFalse{false}; + JobContext job_context(1, false /* create_superversion */); CompactionJob compaction_job( 0, &compaction, db_options_, mutable_db_options_, env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots, - earliest_write_conflict_snapshot, snapshot_checker, nullptr, + earliest_write_conflict_snapshot, snapshot_checker, &job_context, table_cache_, &event_logger, false, false, dbname_, 
&compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */, /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 00a33669b..3e565108a 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -9807,6 +9807,43 @@ TEST_F(DBCompactionTest, NumberOfSubcompactions) { } } +TEST_F(DBCompactionTest, VerifyRecordCount) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleLevel; + options.level0_file_num_compaction_trigger = 3; + options.compaction_verify_record_count = true; + DestroyAndReopen(options); + Random rnd(301); + + // Create 2 overlapping L0 files + for (int i = 1; i < 20; i += 2) { + ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + } + ASSERT_OK(Flush()); + + for (int i = 0; i < 20; i += 2) { + ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + } + ASSERT_OK(Flush()); + + // Only iterate through 10 keys and force compaction to finish. + int num_iter = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::ProcessKeyValueCompaction()::stop", [&](void* stop_ptr) { + num_iter++; + if (num_iter == 10) { + *(bool*)stop_ptr = true; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + ASSERT_TRUE(s.IsCorruption()); + const char* expect = + "Compaction number of input keys does not match number of keys " + "processed."; + ASSERT_TRUE(std::strstr(s.getState(), expect)); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 27f539182..0c654035b 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2200,7 +2200,7 @@ class DBImpl : public DB { void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c, const Status& st, const CompactionJobStats& compaction_job_stats, - const int job_id, const Version* current, + const int job_id,
CompactionJobInfo* compaction_job_info) const; // Reserve the next 'num' file numbers for to-be-ingested external SST files, // and return the current file_number in 'next_file_number'. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 4e0372e69..9cde56061 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1546,7 +1546,7 @@ Status DBImpl::CompactFilesImpl( if (compaction_job_info != nullptr) { BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats, - job_context->job_id, version, compaction_job_info); + job_context->job_id, compaction_job_info); } if (status.ok()) { @@ -1643,21 +1643,18 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, } c->SetNotifyOnCompactionCompleted(); - Version* current = cfd->current(); - current->Ref(); // release lock while notifying events mutex_.Unlock(); TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex"); { CompactionJobInfo info{}; - BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, current, &info); + BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, &info); for (auto listener : immutable_db_options_.listeners) { listener->OnCompactionBegin(this, info); } info.status.PermitUncheckedError(); } mutex_.Lock(); - current->Unref(); } void DBImpl::NotifyOnCompactionCompleted( @@ -1675,21 +1672,17 @@ void DBImpl::NotifyOnCompactionCompleted( return; } - Version* current = cfd->current(); - current->Ref(); // release lock while notifying events mutex_.Unlock(); TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex"); { CompactionJobInfo info{}; - BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current, - &info); + BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, &info); for (auto listener : immutable_db_options_.listeners) { listener->OnCompactionCompleted(this, info); } } mutex_.Lock(); - current->Unref(); // no need to signal bg_cv_ as it will be 
signaled at the end of the // flush process. } @@ -3923,7 +3916,7 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) { void DBImpl::BuildCompactionJobInfo( const ColumnFamilyData* cfd, Compaction* c, const Status& st, const CompactionJobStats& compaction_job_stats, const int job_id, - const Version* current, CompactionJobInfo* compaction_job_info) const { + CompactionJobInfo* compaction_job_info) const { assert(compaction_job_info != nullptr); compaction_job_info->cf_id = cfd->GetID(); compaction_job_info->cf_name = cfd->GetName(); @@ -3933,7 +3926,7 @@ void DBImpl::BuildCompactionJobInfo( compaction_job_info->base_input_level = c->start_level(); compaction_job_info->output_level = c->output_level(); compaction_job_info->stats = compaction_job_stats; - compaction_job_info->table_properties = c->GetOutputTableProperties(); + compaction_job_info->table_properties = c->GetTableProperties(); compaction_job_info->compaction_reason = c->compaction_reason(); compaction_job_info->compression = c->output_compression(); @@ -3947,15 +3940,9 @@ void DBImpl::BuildCompactionJobInfo( compaction_job_info->input_files.push_back(fn); compaction_job_info->input_file_infos.push_back(CompactionFileInfo{ static_cast(i), file_number, fmd->oldest_blob_file_number}); - if (compaction_job_info->table_properties.count(fn) == 0) { - std::shared_ptr tp; - auto s = current->GetTableProperties(read_options, &tp, fmd, &fn); - if (s.ok()) { - compaction_job_info->table_properties[fn] = tp; - } - } } } + for (const auto& newf : c->edit()->GetNewFiles()) { const FileMetaData& meta = newf.second; const FileDescriptor& desc = meta.fd; diff --git a/db/flush_job.cc b/db/flush_job.cc index bfdd9a059..b989cc8e3 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -490,6 +490,7 @@ Status FlushJob::MemPurge() { nullptr, ioptions->allow_data_in_errors, ioptions->enforce_single_del_contracts, /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, + false /* 
must_count_input_entries */, /*compaction=*/nullptr, compaction_filter.get(), /*shutting_down=*/nullptr, ioptions->info_log, full_history_ts_low); diff --git a/db/version_edit.h b/db/version_edit.h index a13d8e65f..e6d54d31d 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -193,7 +193,8 @@ struct FileMetaData { uint64_t compensated_file_size = 0; // These values can mutate, but they can only be read or written from // single-threaded LogAndApply thread - uint64_t num_entries = 0; // the number of entries. + uint64_t num_entries = + 0; // The number of entries, including deletions and range deletions. // The number of deletion entries, including range deletions. uint64_t num_deletions = 0; uint64_t raw_key_size = 0; // total uncompressed key size. diff --git a/include/rocksdb/compaction_job_stats.h b/include/rocksdb/compaction_job_stats.h index 5ff8eccc8..7e8153044 100644 --- a/include/rocksdb/compaction_job_stats.h +++ b/include/rocksdb/compaction_job_stats.h @@ -24,6 +24,9 @@ struct CompactionJobStats { // the elapsed CPU time of this compaction in microseconds. uint64_t cpu_micros; + // Used internally indicating whether a subcompaction's + // `num_input_records` is accurate. + bool has_num_input_records; // the number of compaction input records. uint64_t num_input_records; // the number of blobs read from blob files diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 6cf91a491..53e534164 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -490,11 +490,29 @@ struct DBOptions { // If true, during memtable flush, RocksDB will validate total entries // read in flush, and compare with counter inserted into it. + // // The option is here to turn the feature off in case this new validation - // feature has a bug. + // feature has a bug. The option may be removed in the future once the + // feature is stable. 
+ // // Default: true bool flush_verify_memtable_count = true; + // If true, during compaction, RocksDB will count the number of entries + // read and compare it against the number of entries in the compaction + // input files. This is intended to add protection against corruption + // during compaction. Note that + // - this verification is not done for compactions during which a compaction + // filter returns kRemoveAndSkipUntil, and + // - the number of range deletions is not verified. + // + // The option is here to turn the feature off in case this new validation + // feature has a bug. The option may be removed in the future once the + // feature is stable. + // + // Default: true + bool compaction_verify_record_count = true; + // If true, the log numbers and sizes of the synced WALs are tracked // in MANIFEST. During DB recovery, if a synced WAL is missing // from disk, or the WAL's size does not match the recorded size in diff --git a/options/db_options.cc b/options/db_options.cc index d81e72833..f009c1a59 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -222,6 +222,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, flush_verify_memtable_count), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"compaction_verify_record_count", + {offsetof(struct ImmutableDBOptions, compaction_verify_record_count), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"track_and_verify_wals_in_manifest", {offsetof(struct ImmutableDBOptions, track_and_verify_wals_in_manifest), @@ -679,6 +683,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) error_if_exists(options.error_if_exists), paranoid_checks(options.paranoid_checks), flush_verify_memtable_count(options.flush_verify_memtable_count), + compaction_verify_record_count(options.compaction_verify_record_count), track_and_verify_wals_in_manifest( options.track_and_verify_wals_in_manifest), 
verify_sst_unique_id_in_manifest( @@ -771,6 +776,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { paranoid_checks); ROCKS_LOG_HEADER(log, " Options.flush_verify_memtable_count: %d", flush_verify_memtable_count); + ROCKS_LOG_HEADER(log, " Options.compaction_verify_record_count: %d", + compaction_verify_record_count); ROCKS_LOG_HEADER(log, " " "Options.track_and_verify_wals_in_manifest: %d", diff --git a/options/db_options.h b/options/db_options.h index 2a9d98b25..d00a06718 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -25,6 +25,7 @@ struct ImmutableDBOptions { bool error_if_exists; bool paranoid_checks; bool flush_verify_memtable_count; + bool compaction_verify_record_count; bool track_and_verify_wals_in_manifest; bool verify_sst_unique_id_in_manifest; Env* env; diff --git a/options/options_helper.cc b/options/options_helper.cc index abe5053d2..d221f9705 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -60,6 +60,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.paranoid_checks = immutable_db_options.paranoid_checks; options.flush_verify_memtable_count = immutable_db_options.flush_verify_memtable_count; + options.compaction_verify_record_count = + immutable_db_options.compaction_verify_record_count; options.track_and_verify_wals_in_manifest = immutable_db_options.track_and_verify_wals_in_manifest; options.verify_sst_unique_id_in_manifest = diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 19cb6310f..8b69e6079 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -308,6 +308,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "writable_file_max_buffer_size=1048576;" "paranoid_checks=true;" "flush_verify_memtable_count=true;" + "compaction_verify_record_count=true;" "track_and_verify_wals_in_manifest=true;" "verify_sst_unique_id_in_manifest=true;" "is_fd_close_on_exec=false;" diff --git 
a/table/mock_table.cc b/table/mock_table.cc index c251ea108..d6229ef60 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -230,7 +230,13 @@ Status MockTableReader::Get(const ReadOptions&, const Slice& key, std::shared_ptr MockTableReader::GetTableProperties() const { - return std::shared_ptr(new TableProperties()); + TableProperties* tp = new TableProperties(); + tp->num_entries = table_.size(); + tp->num_range_deletions = 0; + tp->raw_key_size = 1; + tp->raw_value_size = 1; + + return std::shared_ptr(tp); } MockTableFactory::MockTableFactory() diff --git a/unreleased_history/new_features/compaction_verify_input_count.md b/unreleased_history/new_features/compaction_verify_input_count.md new file mode 100644 index 000000000..32cfe0910 --- /dev/null +++ b/unreleased_history/new_features/compaction_verify_input_count.md @@ -0,0 +1 @@ +* RocksDB will compare the number of input keys to the number of keys processed after each compaction. Compaction will fail and report Corruption status if the verification fails. Option `compaction_verify_record_count` is introduced for this purpose and is enabled by default. diff --git a/util/compaction_job_stats_impl.cc b/util/compaction_job_stats_impl.cc index 587a26f24..cdb591f23 100644 --- a/util/compaction_job_stats_impl.cc +++ b/util/compaction_job_stats_impl.cc @@ -12,6 +12,7 @@ void CompactionJobStats::Reset() { elapsed_micros = 0; cpu_micros = 0; + has_num_input_records = true; num_input_records = 0; num_blobs_read = 0; num_input_files = 0; @@ -55,6 +56,7 @@ void CompactionJobStats::Add(const CompactionJobStats& stats) { elapsed_micros += stats.elapsed_micros; cpu_micros += stats.cpu_micros; + has_num_input_records &= stats.has_num_input_records; num_input_records += stats.num_input_records; num_blobs_read += stats.num_blobs_read; num_input_files += stats.num_input_files;