diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 10e3c7113..675e88d63 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -258,6 +258,7 @@ void CompactionIterator::NextFromInput() { valid_ = true; break; } + TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); // Update input statistics if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) { @@ -473,17 +474,30 @@ void CompactionIterator::NextFromInput() { if (valid_) { at_next_ = true; } - } else if (last_snapshot == current_user_key_snapshot_) { + } else if (last_snapshot == current_user_key_snapshot_ || + (last_snapshot > 0 && + last_snapshot < current_user_key_snapshot_)) { // If the earliest snapshot is which this key is visible in // is the same as the visibility of a previous instance of the // same key, then this kv is not visible in any snapshot. // Hidden by an newer entry for same user key - // TODO(noetzli): why not > ? // // Note: Dropping this key will not affect TransactionDB write-conflict // checking since there has already been a record returned for this key // in this snapshot. assert(last_sequence >= current_user_key_sequence_); + + // Note2: if last_snapshot < current_user_key_snapshot, it can only + // mean last_snapshot is released between we process last value and + // this value, and findEarliestVisibleSnapshot returns the next snapshot + // as current_user_key_snapshot. In this case last value and current + // value are both in current_user_key_snapshot currently. + assert(last_snapshot == current_user_key_snapshot_ || + (snapshot_checker_ != nullptr && + snapshot_checker_->CheckInSnapshot(current_user_key_sequence_, + last_snapshot) == + SnapshotCheckerResult::kSnapshotReleased)); + ++iter_stats_.num_record_drop_hidden; // (A) input_->Next(); } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && @@ -639,13 +653,23 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot( *prev_snapshot = *std::prev(snapshots_iter); assert(*prev_snapshot < in); } + if (snapshot_checker_ == nullptr) { + return snapshots_iter != snapshots_->end() + ? *snapshots_iter : kMaxSequenceNumber; + } + bool has_released_snapshot = !released_snapshots_.empty(); for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) { auto cur = *snapshots_iter; assert(in <= cur); - if (snapshot_checker_ == nullptr || - snapshot_checker_->CheckInSnapshot(in, cur) == - SnapshotCheckerResult::kInSnapshot) { + // Skip if cur is in released_snapshots. + if (has_released_snapshot && released_snapshots_.count(cur) > 0) { + continue; + } + auto res = snapshot_checker_->CheckInSnapshot(in, cur); + if (res == SnapshotCheckerResult::kInSnapshot) { return cur; + } else if (res == SnapshotCheckerResult::kSnapshotReleased) { + released_snapshots_.insert(cur); } *prev_snapshot = cur; } @@ -667,7 +691,12 @@ bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) { auto in_snapshot = snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_); while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) { + // Avoid the the current earliest_snapshot_ being return as + // earliest visible snapshot for the next value. So if a value's sequence + // is zero-ed out by PrepareOutput(), the next value will be compact out. + released_snapshots_.insert(earliest_snapshot_); earliest_snapshot_iter_++; + if (earliest_snapshot_iter_ == snapshots_->end()) { earliest_snapshot_ = kMaxSequenceNumber; } else { diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 1e39564df..516ffc8db 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include "db/compaction.h" @@ -144,6 +145,12 @@ class CompactionIterator { const Comparator* cmp_; MergeHelper* merge_helper_; const std::vector* snapshots_; + // List of snapshots released during compaction. + // findEarliestVisibleSnapshot() find them out from return of + // snapshot_checker, and make sure they will not be returned as + // earliest visible snapshot of an older value. + // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3. + std::unordered_set released_snapshots_; std::vector::const_iterator earliest_snapshot_iter_; const SequenceNumber earliest_write_conflict_snapshot_; const SnapshotChecker* const snapshot_checker_; diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index f73333c61..e4badf793 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -2242,6 +2242,10 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) { } } +// Insert two values, v1 and v2, for a key. Between prepare and commit of v2 +// take two snapshots, s1 and s2. Release s1 during compaction. +// Test to make sure compaction doesn't get confused and think s1 can see both +// values, and thus compact out the older value by mistake. TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) { const size_t snapshot_cache_bits = 7; // same as default const size_t commit_cache_bits = 0; // minimum commit cache @@ -2285,6 +2289,110 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) { SyncPoint::GetInstance()->ClearAllCallBacks(); } +// Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2, +// after committing v2. Release s1 during compaction, right after compaction +// processes v2 and before processes v1. Test to make sure compaction doesn't +// get confused and believe v1 and v2 are visible to different snapshot +// (v1 by s2, v2 by s1) and refuse to compact out v1. +TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) { + const size_t snapshot_cache_bits = 7; // same as default + const size_t commit_cache_bits = 0; // minimum commit cache + DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + + ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); + ASSERT_OK(db->Put(WriteOptions(), "key1", "value2")); + SequenceNumber v2_seq = db->GetLatestSequenceNumber(); + auto* s1 = db->GetSnapshot(); + // Advance sequence number. + ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy")); + auto* s2 = db->GetSnapshot(); + + int count_value = 0; + auto callback = [&](void* arg) { + auto* ikey = reinterpret_cast(arg); + if (ikey->user_key == "key1") { + count_value++; + if (count_value == 2) { + // Processing v1. + db->ReleaseSnapshot(s1); + // Add some keys to advance max_evicted_seq and update + // old_commit_map. + ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy")); + ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy")); + } + } + }; + SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV", + callback); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db->Flush(FlushOptions())); + // value1 should be compact out. + VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}}); + + // cleanup + db->ReleaseSnapshot(s2); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +// Insert two values, v1 and v2, for a key. Insert another dummy key +// so to evict the commit cache for v2, while v1 is still in commit cache. +// Take two snapshots, s1 and s2. Release s1 during compaction. +// Since commit cache for v2 is evicted, and old_commit_map don't have +// s1 (it is released), +// TODO(myabandeh): how can we be sure that the v2's commit info is evicted +// (and not v1's)? Instead of putting a dummy, we can directly call +// AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache. +TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) { + const size_t snapshot_cache_bits = 7; // same as default + const size_t commit_cache_bits = 1; // commit cache size = 2 + DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + + // Add a dummy key to evict v2 commit cache, but keep v1 commit cache. + // It also advance max_evicted_seq and can trigger old_commit_map cleanup. + auto add_dummy = [&]() { + auto* txn_dummy = + db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); + ASSERT_OK(txn_dummy->SetName("txn_dummy")); + ASSERT_OK(txn_dummy->Put("dummy", "dummy")); + ASSERT_OK(txn_dummy->Prepare()); + ASSERT_OK(txn_dummy->Commit()); + delete txn_dummy; + }; + + ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); + auto* txn = + db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); + ASSERT_OK(txn->SetName("txn")); + ASSERT_OK(txn->Put("key1", "value2")); + ASSERT_OK(txn->Prepare()); + // TODO(myabandeh): replace it with GetId()? + auto v2_seq = db->GetLatestSequenceNumber(); + ASSERT_OK(txn->Commit()); + delete txn; + auto* s1 = db->GetSnapshot(); + // Dummy key to advance sequence number. + add_dummy(); + auto* s2 = db->GetSnapshot(); + + auto callback = [&](void*) { + db->ReleaseSnapshot(s1); + // Add some dummy entries to trigger s1 being cleanup from old_commit_map. + add_dummy(); + add_dummy(); + }; + SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit", + callback); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db->Flush(FlushOptions())); + // value1 should be compact out. + VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}}); + + db->ReleaseSnapshot(s2); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) { const size_t snapshot_cache_bits = 7; // same as default const size_t commit_cache_bits = 0; // minimum commit cache diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index c34b55760..a212d1370 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -357,9 +357,8 @@ Status WritePreparedTxn::RollbackInternal() { prepare_seq); // Commit the batch by writing an empty batch to the queue that will release // the commit sequence number to readers. - const size_t ZERO_COMMITS = 0; - WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( - wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS); + WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare( + wpt_db_, db_impl_, GetId(), prepare_seq, prepare_batch_cnt_); WriteBatch empty_batch; empty_batch.PutLogData(Slice()); // In the absence of Prepare markers, use Noop as a batch separator @@ -368,18 +367,10 @@ Status WritePreparedTxn::RollbackInternal() { NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, &update_commit_map_with_prepare); assert(!s.ok() || seq_used != kMaxSequenceNumber); - // Mark the txn as rolled back - uint64_t& rollback_seq = seq_used; + ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, + "RollbackInternal (status=%s) commit: %" PRIu64, + s.ToString().c_str(), GetId()); if (s.ok()) { - // Note: it is safe to do it after PreReleaseCallback via WriteImpl since - // all the writes by the prpared batch are already blinded by the rollback - // batch. The only reason we commit the prepared batch here is to benefit - // from the existing mechanism in CommitCache that takes care of the rare - // cases that the prepare seq is visible to a snsapshot but max evicted seq - // advances that prepare seq. - for (size_t i = 0; i < prepare_batch_cnt_; i++) { - wpt_db_->AddCommitted(GetId() + i, rollback_seq); - } wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_); } diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 4c0fd8ccd..c16bbea38 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -791,6 +791,47 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { bool publish_seq_; }; +// For two_write_queues commit both the aborted batch and the cleanup batch and then published the seq +class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback { + public: + WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db, + DBImpl* db_impl, + SequenceNumber prep_seq, + SequenceNumber rollback_seq, + size_t prep_batch_cnt) + : db_(db), + db_impl_(db_impl), + prep_seq_(prep_seq), + rollback_seq_(rollback_seq), + prep_batch_cnt_(prep_batch_cnt) { + assert(prep_seq != kMaxSequenceNumber); + assert(rollback_seq != kMaxSequenceNumber); + assert(prep_batch_cnt_ > 0); + } + + virtual Status Callback(SequenceNumber commit_seq, + bool is_mem_disabled) override { + assert(is_mem_disabled); // implies the 2nd queue +#ifdef NDEBUG + (void)is_mem_disabled; +#endif + const uint64_t last_commit_seq = commit_seq; + db_->AddCommitted(rollback_seq_, last_commit_seq); + for (size_t i = 0; i < prep_batch_cnt_; i++) { + db_->AddCommitted(prep_seq_ + i, last_commit_seq); + } + db_impl_->SetLastPublishedSequence(last_commit_seq); + return Status::OK(); + } + + private: + WritePreparedTxnDB* db_; + DBImpl* db_impl_; + SequenceNumber prep_seq_; + SequenceNumber rollback_seq_; + size_t prep_batch_cnt_; +}; + // Count the number of sub-batches inside a batch. A sub-batch does not have // duplicate keys. struct SubBatchCounter : public WriteBatch::Handler { diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 40d8fe091..dc63c424b 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -447,9 +447,8 @@ Status WriteUnpreparedTxn::RollbackInternal() { prepare_seq); // Commit the batch by writing an empty batch to the queue that will release // the commit sequence number to readers. - const size_t ZERO_COMMITS = 0; - WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( - wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS); + WriteUnpreparedRollbackPreReleaseCallback update_commit_map_with_prepare( + wpt_db_, db_impl_, unprep_seqs_, prepare_seq); WriteBatch empty_batch; empty_batch.PutLogData(Slice()); // In the absence of Prepare markers, use Noop as a batch separator @@ -459,19 +458,7 @@ Status WriteUnpreparedTxn::RollbackInternal() { &update_commit_map_with_prepare); assert(!s.ok() || seq_used != kMaxSequenceNumber); // Mark the txn as rolled back - uint64_t& rollback_seq = seq_used; if (s.ok()) { - // Note: it is safe to do it after PreReleaseCallback via WriteImpl since - // all the writes by the prpared batch are already blinded by the rollback - // batch. The only reason we commit the prepared batch here is to benefit - // from the existing mechanism in CommitCache that takes care of the rare - // cases that the prepare seq is visible to a snsapshot but max evicted seq - // advances that prepare seq. - for (const auto& seq : unprep_seqs_) { - for (size_t i = 0; i < seq.second; i++) { - wpt_db_->AddCommitted(seq.first + i, rollback_seq); - } - } for (const auto& seq : unprep_seqs_) { wpt_db_->RemovePrepared(seq.first, seq.second); } diff --git a/utilities/transactions/write_unprepared_txn_db.h b/utilities/transactions/write_unprepared_txn_db.h index 6763aa99f..dfba4c136 100644 --- a/utilities/transactions/write_unprepared_txn_db.h +++ b/utilities/transactions/write_unprepared_txn_db.h @@ -106,6 +106,44 @@ class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { bool publish_seq_; }; +class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback { + // TODO(lth): Reduce code duplication with + // WritePreparedCommitEntryPreReleaseCallback + public: + WriteUnpreparedRollbackPreReleaseCallback( + WritePreparedTxnDB* db, DBImpl* db_impl, + const std::map& unprep_seqs, + SequenceNumber rollback_seq) + : db_(db), + db_impl_(db_impl), + unprep_seqs_(unprep_seqs), + rollback_seq_(rollback_seq) { + assert(unprep_seqs.size() > 0); + assert(db_impl_->immutable_db_options().two_write_queues); + } + + virtual Status Callback(SequenceNumber commit_seq, bool is_mem_disabled + __attribute__((__unused__))) override { + assert(is_mem_disabled); // implies the 2nd queue + const uint64_t last_commit_seq = commit_seq; + db_->AddCommitted(rollback_seq_, last_commit_seq); + // Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt. + for (const auto& s : unprep_seqs_) { + for (size_t i = 0; i < s.second; i++) { + db_->AddCommitted(s.first + i, last_commit_seq); + } + } + db_impl_->SetLastPublishedSequence(last_commit_seq); + return Status::OK(); + } + + private: + WritePreparedTxnDB* db_; + DBImpl* db_impl_; + const std::map& unprep_seqs_; + SequenceNumber rollback_seq_; +}; + struct KeySetBuilder : public WriteBatch::Handler { WriteUnpreparedTxn* txn_; bool rollback_merge_operands_;