diff --git a/db/builder.cc b/db/builder.cc index 9f127e895..c3f0f4cf2 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -62,9 +62,9 @@ Status BuildTable( FileMetaData* meta, std::vector* blob_file_additions, std::vector snapshots, SequenceNumber earliest_write_conflict_snapshot, - SnapshotChecker* snapshot_checker, bool paranoid_file_checks, - InternalStats* internal_stats, IOStatus* io_status, - const std::shared_ptr& io_tracer, + SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker, + bool paranoid_file_checks, InternalStats* internal_stats, + IOStatus* io_status, const std::shared_ptr& io_tracer, BlobFileCreationReason blob_creation_reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, TableProperties* table_properties, Env::WriteLifeTimeHint write_hint, @@ -189,7 +189,8 @@ Status BuildTable( CompactionIterator c_iter( iter, tboptions.internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, &snapshots, earliest_write_conflict_snapshot, - snapshot_checker, env, ShouldReportDetailedTime(env, ioptions.stats), + job_snapshot, snapshot_checker, env, + ShouldReportDetailedTime(env, ioptions.stats), true /* internal key corruption is not ok */, range_del_agg.get(), blob_file_builder.get(), ioptions.allow_data_in_errors, /*compaction=*/nullptr, compaction_filter.get(), diff --git a/db/builder.h b/db/builder.h index c8f39b237..73acf5924 100644 --- a/db/builder.h +++ b/db/builder.h @@ -57,9 +57,9 @@ extern Status BuildTable( FileMetaData* meta, std::vector* blob_file_additions, std::vector snapshots, SequenceNumber earliest_write_conflict_snapshot, - SnapshotChecker* snapshot_checker, bool paranoid_file_checks, - InternalStats* internal_stats, IOStatus* io_status, - const std::shared_ptr& io_tracer, + SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker, + bool paranoid_file_checks, InternalStats* internal_stats, + IOStatus* io_status, const std::shared_ptr& io_tracer, BlobFileCreationReason blob_creation_reason, EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 7ee1dc15c..36616791e 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -24,8 +24,8 @@ CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, Env* env, - bool report_detailed_time, bool expect_valid_internal_key, + SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, + Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, const Compaction* compaction, const CompactionFilter* compaction_filter, @@ -36,7 +36,7 @@ CompactionIterator::CompactionIterator( const std::string* full_history_ts_low) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, - earliest_write_conflict_snapshot, snapshot_checker, env, + earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, report_detailed_time, expect_valid_internal_key, range_del_agg, blob_file_builder, allow_data_in_errors, std::unique_ptr( @@ -48,8 +48,8 @@ CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber /*last_sequence*/, std::vector* snapshots, SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, Env* env, - bool report_detailed_time, bool expect_valid_internal_key, + SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, + Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, std::unique_ptr compaction, @@ -65,6 +65,7 @@ CompactionIterator::CompactionIterator( merge_helper_(merge_helper), snapshots_(snapshots), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), + job_snapshot_(job_snapshot), snapshot_checker_(snapshot_checker), env_(env), clock_(env_->GetSystemClock().get()), @@ -1057,7 +1058,7 @@ void CompactionIterator::PrepareOutput() { if (valid_ && compaction_ != nullptr && !compaction_->allow_ingest_behind() && bottommost_level_ && DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && - ikey_.type != kTypeMerge) { + ikey_.type != kTypeMerge && current_key_committed_) { assert(ikey_.type != kTypeDeletion); assert(ikey_.type != kTypeSingleDeletion || (timestamp_size_ || full_history_ts_low_)); diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index c02a8d7ec..d93e43642 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -172,8 +172,8 @@ class CompactionIterator { InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, Env* env, - bool report_detailed_time, bool expect_valid_internal_key, + SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, + Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, const Compaction* compaction = nullptr, @@ -189,8 +189,8 @@ class CompactionIterator { InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, Env* env, - bool report_detailed_time, bool expect_valid_internal_key, + SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, + Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, std::unique_ptr compaction, @@ -268,7 +268,7 @@ class CompactionIterator { inline bool KeyCommitted(SequenceNumber sequence) { return snapshot_checker_ == nullptr || - snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) == + snapshot_checker_->CheckInSnapshot(sequence, job_snapshot_) == SnapshotCheckerResult::kInSnapshot; } @@ -309,6 +309,7 @@ class CompactionIterator { std::unordered_set released_snapshots_; std::vector::const_iterator earliest_snapshot_iter_; const SequenceNumber earliest_write_conflict_snapshot_; + const SequenceNumber job_snapshot_; const SnapshotChecker* const snapshot_checker_; Env* env_; SystemClock* clock_; diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 285cad057..47289db65 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -275,11 +275,11 @@ class CompactionIteratorTest : public testing::TestWithParam { iter_->SeekToFirst(); c_iter_.reset(new CompactionIterator( iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, - earliest_write_conflict_snapshot, snapshot_checker_.get(), - Env::Default(), false /* report_detailed_time */, false, - range_del_agg_.get(), nullptr /* blob_file_builder */, - true /*allow_data_in_errors*/, std::move(compaction), filter, - &shutting_down_, + earliest_write_conflict_snapshot, kMaxSequenceNumber, + snapshot_checker_.get(), Env::Default(), + false /* report_detailed_time */, false, range_del_agg_.get(), + nullptr /* blob_file_builder */, true /*allow_data_in_errors*/, + std::move(compaction), filter, &shutting_down_, /*manual_compaction_paused=*/nullptr, /*manual_compaction_canceled=*/nullptr, /*info_log=*/nullptr, full_history_ts_low)); diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index e4ec263b5..1780011ee 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -423,10 +423,11 @@ CompactionJob::CompactionJob( ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, - EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, - const std::string& dbname, CompactionJobStats* compaction_job_stats, - Env::Priority thread_pri, const std::shared_ptr& io_tracer, + const SnapshotChecker* snapshot_checker, JobContext* job_context, + std::shared_ptr table_cache, EventLogger* event_logger, + bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, + CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, + const std::shared_ptr& io_tracer, const std::atomic* manual_compaction_paused, const std::atomic* manual_compaction_canceled, const std::string& db_id, const std::string& db_session_id, @@ -463,6 +464,7 @@ CompactionJob::CompactionJob( existing_snapshots_(std::move(existing_snapshots)), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), snapshot_checker_(snapshot_checker), + job_context_(job_context), table_cache_(std::move(table_cache)), event_logger_(event_logger), paranoid_file_checks_(paranoid_file_checks), @@ -1468,9 +1470,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { Status status; const std::string* const full_history_ts_low = full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_; + const SequenceNumber job_snapshot_seq = + job_context_ ? job_context_->GetJobSnapshotSequence() + : kMaxSequenceNumber; sub_compact->c_iter.reset(new CompactionIterator( input, cfd->user_comparator(), &merge, versions_->LastSequence(), - &existing_snapshots_, earliest_write_conflict_snapshot_, + &existing_snapshots_, earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), /*expect_valid_internal_key=*/true, &range_del_agg, blob_file_builder.get(), db_options_.allow_data_in_errors, @@ -2496,7 +2501,7 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob( job_id, compaction, db_options, mutable_db_options, file_options, versions, shutting_down, log_buffer, nullptr, output_directory, nullptr, stats, db_mutex, db_error_handler, existing_snapshots, - kMaxSequenceNumber, nullptr, table_cache, event_logger, + kMaxSequenceNumber, nullptr, nullptr, table_cache, event_logger, compaction->mutable_cf_options()->paranoid_file_checks, compaction->mutable_cf_options()->report_bg_io_stats, dbname, &(compaction_service_result->stats), Env::Priority::USER, io_tracer, diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 8bec6fe85..24a77c679 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -73,7 +73,7 @@ class CompactionJob { InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, - const SnapshotChecker* snapshot_checker, + const SnapshotChecker* snapshot_checker, JobContext* job_context, std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats, @@ -212,6 +212,8 @@ class CompactionJob { const SnapshotChecker* const snapshot_checker_; + JobContext* job_context_; + std::shared_ptr table_cache_; EventLogger* event_logger_; diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index f87b8197d..7d2564695 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -355,9 +355,9 @@ class CompactionJobTestBase : public testing::Test { 0, &compaction, db_options_, mutable_db_options_, env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots, - earliest_write_conflict_snapshot, snapshot_checker, table_cache_, - &event_logger, false, false, dbname_, &compaction_job_stats_, - Env::Priority::USER, nullptr /* IOTracer */, + earliest_write_conflict_snapshot, snapshot_checker, nullptr, + table_cache_, &event_logger, false, false, dbname_, + &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */, /*manual_compaction_paused=*/nullptr, /*manual_compaction_canceled=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", full_history_ts_low_); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 6d893a8b7..7808de8de 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1367,7 +1367,7 @@ Status DBImpl::CompactFilesImpl( GetDataDir(c->column_family_data(), c->output_path_id()), GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, - table_cache_, &event_logger_, + job_context, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, @@ -3361,7 +3361,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, GetDataDir(c->column_family_data(), c->output_path_id()), GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, - snapshot_checker, table_cache_, &event_logger_, + snapshot_checker, job_context, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, thread_pri, io_tracer_, diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 7bad3cad7..37b567baf 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1528,9 +1528,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, dbname_, versions_.get(), immutable_db_options_, tboptions, file_options_for_compaction_, cfd->table_cache(), iter.get(), std::move(range_del_iters), &meta, &blob_file_additions, - snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, - paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_, - BlobFileCreationReason::kRecovery, &event_logger_, job_id, + snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber, + snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s, + io_tracer_, BlobFileCreationReason::kRecovery, &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */, write_hint, nullptr /*full_history_ts_low*/, &blob_callback_); LogFlush(immutable_db_options_.info_log); diff --git a/db/flush_job.cc b/db/flush_job.cc index 55aeed19c..3ccce2af1 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -455,11 +455,13 @@ Status FlushJob::MemPurge() { ioptions->logger, true /* internal key corruption is not ok */, existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), snapshot_checker_); + assert(job_context_); + SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence(); CompactionIterator c_iter( iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge, kMaxSequenceNumber, &existing_snapshots_, - earliest_write_conflict_snapshot_, snapshot_checker_, env, - ShouldReportDetailedTime(env, ioptions->stats), + earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_, + env, ShouldReportDetailedTime(env, ioptions->stats), true /* internal key corruption is not ok */, range_del_agg.get(), nullptr, ioptions->allow_data_in_errors, /*compaction=*/nullptr, compaction_filter.get(), @@ -829,6 +831,7 @@ Status FlushJob::WriteLevel0Table() { // TEST_SYNC_POINT_CALLBACK not used. TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables", &mems_size); + assert(job_context_); for (MemTable* m : mems_) { ROCKS_LOG_INFO( db_options_.info_log, @@ -911,16 +914,19 @@ Status FlushJob::WriteLevel0Table() { TableFileCreationReason::kFlush, creation_time, oldest_key_time, current_time, db_id_, db_session_id_, 0 /* target_file_size */, meta_.fd.GetNumber()); + const SequenceNumber job_snapshot_seq = + job_context_->GetJobSnapshotSequence(); s = BuildTable( dbname_, versions_, db_options_, tboptions, file_options_, cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_, &blob_file_additions, existing_snapshots_, - earliest_write_conflict_snapshot_, snapshot_checker_, - mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), - &io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_, - job_context_->job_id, Env::IO_HIGH, &table_properties_, write_hint, - full_history_ts_low, blob_callback_, &num_input_entries, - &memtable_payload_bytes, &memtable_garbage_bytes); + earliest_write_conflict_snapshot_, job_snapshot_seq, + snapshot_checker_, mutable_cf_options_.paranoid_file_checks, + cfd_->internal_stats(), &io_s, io_tracer_, + BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id, + Env::IO_HIGH, &table_properties_, write_hint, full_history_ts_low, + blob_callback_, &num_input_entries, &memtable_payload_bytes, + &memtable_garbage_bytes); // TODO: Cleanup io_status in BuildTable and table builders assert(!s.ok() || io_s.ok()); io_s.PermitUncheckedError(); diff --git a/db/job_context.h b/db/job_context.h index 5fff57888..d7d05b11a 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -124,6 +124,14 @@ struct JobContext { job_snapshot != nullptr || sv_have_sth; } + SequenceNumber GetJobSnapshotSequence() const { + if (job_snapshot) { + assert(job_snapshot->snapshot()); + return job_snapshot->snapshot()->GetSequenceNumber(); + } + return kMaxSequenceNumber; + } + // Structure to store information for candidate files to delete. struct CandidateFileInfo { std::string file_name; diff --git a/db/repair.cc b/db/repair.cc index dbf4614d1..4d29329c9 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -450,7 +450,7 @@ class Repairer { dbname_, /* versions */ nullptr, immutable_db_options_, tboptions, file_options_, table_cache_.get(), iter.get(), std::move(range_del_iters), &meta, nullptr /* blob_file_additions */, - {}, kMaxSequenceNumber, snapshot_checker, + {}, kMaxSequenceNumber, kMaxSequenceNumber, snapshot_checker, false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s, nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery, nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,