diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index bce0b82db..ca55eef71 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -493,20 +493,6 @@ void CompactionIterator::NextFromInput() { // in this snapshot. assert(last_sequence >= current_user_key_sequence_); - // Note2: if last_snapshot < current_user_key_snapshot, it can only - // mean last_snapshot is released between we process last value and - // this value, and findEarliestVisibleSnapshot returns the next snapshot - // as current_user_key_snapshot. In this case last value and current - // value are both in current_user_key_snapshot currently. - // Although last_snapshot is released we might still get a definitive - // response when key sequence number changes, e.g., when seq is determined - // too old and visible in all snapshots. - assert(last_snapshot == current_user_key_snapshot_ || - (snapshot_checker_ != nullptr && - snapshot_checker_->CheckInSnapshot(current_user_key_sequence_, - last_snapshot) != - SnapshotCheckerResult::kNotInSnapshot)); - ++iter_stats_.num_record_drop_hidden; // (A) input_->Next(); } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 733eb408a..f4c72e298 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -214,9 +214,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteThread::WriteGroup write_group; bool in_parallel_group = false; uint64_t last_sequence = kMaxSequenceNumber; - if (!two_write_queues_) { - last_sequence = versions_->LastSequence(); - } mutex_.Lock(); @@ -231,6 +228,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_STOP(write_pre_and_post_process_time); status = PreprocessWrite(write_options, &need_log_sync, &write_context); + if (!two_write_queues_) { + // Assign it after ::PreprocessWrite since the sequence might advance + // inside it by WriteRecoverableState + last_sequence = versions_->LastSequence(); + } PERF_TIMER_START(write_pre_and_post_process_time); } @@ -1113,8 +1115,12 @@ Status DBImpl::WriteRecoverableState() { for (uint64_t sub_batch_seq = seq + 1; sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) { uint64_t const no_log_num = 0; + // Unlock it since the callback might end up locking mutex. e.g., + // AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB + mutex_.Unlock(); status = recoverable_state_pre_release_callback_->Callback( sub_batch_seq, !DISABLE_MEMTABLE, no_log_num); + mutex_.Lock(); } } if (status.ok()) { diff --git a/util/transaction_test_util.cc b/util/transaction_test_util.cc index 30cff11e1..bd2d6afdc 100644 --- a/util/transaction_test_util.cc +++ b/util/transaction_test_util.cc @@ -205,6 +205,12 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, "Prepare of %" PRIu64 " %s (%s)", txn->GetId(), s.ToString().c_str(), txn->GetName().c_str()); + if (rand_->OneIn(20)) { + // This currently only tests the mechanics of writing commit time + // write batch so the exact values would not matter. + s = txn_->GetCommitTimeWriteBatch()->Put("cat", "dog"); + assert(s.ok()); + } db->GetDBOptions().env->SleepForMicroseconds( static_cast(cmt_delay_ms_ * 1000)); } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 2433af826..6c71b679d 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -5096,6 +5096,9 @@ Status TransactionStressTestInserter( WriteOptions write_options; ReadOptions read_options; TransactionOptions txn_options; + if (rand->OneIn(2)) { + txn_options.use_only_the_last_commit_time_batch_for_recovery = true; + } // Inside the inserter we might also retake the snapshot. We do both since two // separte functions are engaged for each. txn_options.set_snapshot = rand->OneIn(2); diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index b42548709..00fa6cf03 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -477,6 +477,7 @@ class MySQLStyleTransactionTest // structures. txn_db_options.wp_snapshot_cache_bits = 1; txn_db_options.wp_commit_cache_bits = 10; + options.write_buffer_size = 1024; EXPECT_OK(ReOpen()); } };