diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 89ceb06ea..59097eec0 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -3,6 +3,8 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include + #include "db/compaction/compaction_iterator.h" #include "db/snapshot_checker.h" #include "port/likely.h" @@ -38,7 +40,8 @@ CompactionIterator::CompactionIterator( const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, - const std::atomic* manual_compaction_paused) + const std::atomic* manual_compaction_paused, + const std::shared_ptr info_log) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, @@ -46,7 +49,7 @@ CompactionIterator::CompactionIterator( std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), compaction_filter, shutting_down, preserve_deletes_seqnum, - manual_compaction_paused) {} + manual_compaction_paused, info_log) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -59,7 +62,8 @@ CompactionIterator::CompactionIterator( const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, - const std::atomic* manual_compaction_paused) + const std::atomic* manual_compaction_paused, + const std::shared_ptr info_log) : input_(input), cmp_(cmp), merge_helper_(merge_helper), @@ -78,7 +82,8 @@ CompactionIterator::CompactionIterator( current_user_key_sequence_(0), current_user_key_snapshot_(0), merge_out_iter_(merge_helper_), - current_key_committed_(false) { + current_key_committed_(false), + info_log_(info_log) { assert(compaction_filter_ == nullptr || compaction_ != nullptr); assert(snapshots_ != nullptr); bottommost_level_ = @@ -142,6 +147,11 @@ void CompactionIterator::Next() { // MergeUntil stops when it encounters a corrupt key and does not // include them in the result, so we expect the keys here to be valid. assert(valid_key); + if (!valid_key) { + ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction", + key_.ToString(true).c_str()); + } + // Keep current_key_ in sync. current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); key_ = current_key_.GetInternalKey(); @@ -338,7 +348,18 @@ void CompactionIterator::NextFromInput() { // not compact out. We will keep this Put, but can drop it's data. // (See Optimization 3, below.) assert(ikey_.type == kTypeValue); + if (ikey_.type != kTypeValue) { + ROCKS_LOG_FATAL(info_log_, + "Unexpected key type %d for compaction output", + ikey_.type); + } assert(current_user_key_snapshot_ == last_snapshot); + if (current_user_key_snapshot_ != last_snapshot) { + ROCKS_LOG_FATAL(info_log_, + "current_user_key_snapshot_ (%" PRIu64 + ") != last_snapshot (%" PRIu64 ")", + current_user_key_snapshot_, last_snapshot); + } value_.clear(); valid_ = true; @@ -480,6 +501,12 @@ void CompactionIterator::NextFromInput() { // checking since there has already been a record returned for this key // in this snapshot. assert(last_sequence >= current_user_key_sequence_); + if (last_sequence < current_user_key_sequence_) { + ROCKS_LOG_FATAL(info_log_, + "last_sequence (%" PRIu64 + ") < current_user_key_sequence_ (%" PRIu64 ")", + last_sequence, current_user_key_sequence_); + } ++iter_stats_.num_record_drop_hidden; // (A) input_->Next(); @@ -563,6 +590,10 @@ void CompactionIterator::NextFromInput() { // MergeUntil stops when it encounters a corrupt key and does not // include them in the result, so we expect the keys here to valid. assert(valid_key); + if (!valid_key) { + ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction", + key_.ToString(true).c_str()); + } // Keep current_key_ in sync. current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); key_ = current_key_.GetInternalKey(); @@ -623,6 +654,11 @@ void CompactionIterator::PrepareOutput() { ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && valid_ && IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) { assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion); + if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) { + ROCKS_LOG_FATAL(info_log_, + "Unexpected key type %d for seq-zero optimization", + ikey_.type); + } ikey_.sequence = 0; current_key_.UpdateInternalKey(0, ikey_.type); } @@ -631,6 +667,10 @@ void CompactionIterator::PrepareOutput() { inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot( SequenceNumber in, SequenceNumber* prev_snapshot) { assert(snapshots_->size()); + if (snapshots_->size() == 0) { + ROCKS_LOG_FATAL(info_log_, + "No snapshot left in findEarliestVisibleSnapshot"); + } auto snapshots_iter = std::lower_bound( snapshots_->begin(), snapshots_->end(), in); if (snapshots_iter == snapshots_->begin()) { @@ -638,6 +678,10 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot( } else { *prev_snapshot = *std::prev(snapshots_iter); assert(*prev_snapshot < in); + if (*prev_snapshot >= in) { + ROCKS_LOG_FATAL(info_log_, + "*prev_snapshot >= in in findEarliestVisibleSnapshot"); + } } if (snapshot_checker_ == nullptr) { return snapshots_iter != snapshots_->end() @@ -647,6 +691,9 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot( for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) { auto cur = *snapshots_iter; assert(in <= cur); + if (in > cur) { + ROCKS_LOG_FATAL(info_log_, "in > cur in findEarliestVisibleSnapshot"); + } // Skip if cur is in released_snapshots. if (has_released_snapshot && released_snapshots_.count(cur) > 0) { continue; @@ -671,9 +718,14 @@ inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() { bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) { assert(snapshot_checker_ != nullptr); - assert(earliest_snapshot_ == kMaxSequenceNumber || - (earliest_snapshot_iter_ != snapshots_->end() && - *earliest_snapshot_iter_ == earliest_snapshot_)); + bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber || + (earliest_snapshot_iter_ != snapshots_->end() && + *earliest_snapshot_iter_ == earliest_snapshot_)); + assert(pre_condition); + if (!pre_condition) { + ROCKS_LOG_FATAL(info_log_, + "Pre-Condition is not hold in IsInEarliestSnapshot"); + } auto in_snapshot = snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_); while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) { @@ -692,6 +744,10 @@ bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) { snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_); } assert(in_snapshot != SnapshotCheckerResult::kSnapshotReleased); + if (in_snapshot == SnapshotCheckerResult::kSnapshotReleased) { + ROCKS_LOG_FATAL(info_log_, + "Unexpected released snapshot in IsInEarliestSnapshot"); + } return in_snapshot == SnapshotCheckerResult::kInSnapshot; } diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index a06a867dd..1e08b407d 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -70,7 +70,8 @@ class CompactionIterator { const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0, - const std::atomic* manual_compaction_paused = nullptr); + const std::atomic* manual_compaction_paused = nullptr, + const std::shared_ptr info_log = nullptr); // Constructor with custom CompactionProxy, used for tests. CompactionIterator( @@ -84,7 +85,8 @@ class CompactionIterator { const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0, - const std::atomic* manual_compaction_paused = nullptr); + const std::atomic* manual_compaction_paused = nullptr, + const std::shared_ptr info_log = nullptr); ~CompactionIterator(); @@ -222,6 +224,7 @@ class CompactionIterator { // Used to avoid purging uncommitted values. The application can specify // uncommitted values by providing a SnapshotChecker object. bool current_key_committed_; + std::shared_ptr info_log_; bool IsShuttingDown() { // This is a best-effort facility, so memory_order_relaxed is sufficient. diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 2c625997e..4204c402c 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -891,7 +891,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false, &range_del_agg, sub_compact->compaction, compaction_filter, - shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_)); + shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_, + db_options_.info_log)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {