diff --git a/HISTORY.md b/HISTORY.md index e915ec8e9..c76046349 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ * Fixed a bug in CompactionIterator when write-prepared transaction is used. A released earliest write conflict snapshot may cause assertion failure in dbg mode and unexpected key in opt mode. * Fix ticker WRITE_WITH_WAL("rocksdb.write.wal"), this bug is caused by a bad extra `RecordTick(stats_, WRITE_WITH_WAL)` (at 2 place), this fix remove the extra `RecordTick`s and fix the corresponding test case. * EventListener::OnTableFileCreated was previously called with OK status and file_size==0 in cases of no SST file contents written (because there was no content to add) and the empty file deleted before calling the listener. Now the status is Aborted. +* Fixed a bug in CompactionIterator when write-prepared transaction is used. Releasing earliest_snapshot during compaction may cause a SingleDelete to be output after a PUT of the same user key whose seq has been zeroed. ### Behavior Changes * `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files. diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 70df36561..c4434bca2 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -433,6 +433,8 @@ void CompactionIterator::NextFromInput() { has_outputted_key_ = false; + last_key_seq_zeroed_ = false; + current_key_committed_ = KeyCommitted(ikey_.sequence); // Apply the compaction filter to the first committed version of the user @@ -594,10 +596,16 @@ void CompactionIterator::NextFromInput() { TEST_SYNC_POINT_CALLBACK( "CompactionIterator::NextFromInput:SingleDelete:1", const_cast<Compaction*>(c)); - // Check whether the next key belongs to the same snapshot as the - // SingleDelete. 
- if (prev_snapshot == 0 || - DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) { + if (last_key_seq_zeroed_) { + ++iter_stats_.num_record_drop_hidden; + ++iter_stats_.num_record_drop_obsolete; + assert(bottommost_level_); + AdvanceInputIter(); + } else if (prev_snapshot == 0 || + DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) { + // Check whether the next key belongs to the same snapshot as the + // SingleDelete. + TEST_SYNC_POINT_CALLBACK( "CompactionIterator::NextFromInput:SingleDelete:2", nullptr); if (next_ikey.type == kTypeSingleDeletion) { @@ -661,6 +669,9 @@ void CompactionIterator::NextFromInput() { // We hit the next snapshot without hitting a put, so the iterator // returns the single delete. valid_ = true; + TEST_SYNC_POINT_CALLBACK( + "CompactionIterator::NextFromInput:SingleDelete:3", + const_cast<Compaction*>(c)); } } else { // We are at the end of the input, could not parse the next key, or hit @@ -683,6 +694,11 @@ void CompactionIterator::NextFromInput() { if (!bottommost_level_) { ++iter_stats_.num_optimized_del_drop_obsolete; } + } else if (last_key_seq_zeroed_) { + // Skip. 
+ ++iter_stats_.num_record_drop_hidden; + ++iter_stats_.num_record_drop_obsolete; + assert(bottommost_level_); } else { // Output SingleDelete valid_ = true; @@ -1038,6 +1054,9 @@ void CompactionIterator::PrepareOutput() { ikey_.type); } ikey_.sequence = 0; + last_key_seq_zeroed_ = true; + TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput:ZeroingSeq", + &ikey_); if (!timestamp_size_) { current_key_.UpdateInternalKey(0, ikey_.type); } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) { diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 7431e6688..c3785a893 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -401,6 +401,10 @@ class CompactionIterator { const int level_; + // True if the previous internal key (same user key)'s sequence number has + // just been zeroed out during bottommost compaction. + bool last_key_seq_zeroed_{false}; + void AdvanceInputIter() { input_.Next(); } void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); } diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index db0a2a8e7..c90d34b1a 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -3067,6 +3067,138 @@ TEST_P(WritePreparedTransactionTest, db->ReleaseSnapshot(snapshot1); } +TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing) { + constexpr size_t kSnapshotCacheBits = 7; // same as default + constexpr size_t kCommitCacheBits = 0; // minimum commit cache + UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits); + options.disable_auto_compactions = true; + ASSERT_OK(ReOpen()); + + ASSERT_OK(db->Put(WriteOptions(), "a", "value")); + ASSERT_OK(db->Put(WriteOptions(), "b", "value")); + ASSERT_OK(db->Put(WriteOptions(), "c", "value")); + ASSERT_OK(db->Flush(FlushOptions())); + + { 
+ CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = 2; + ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); + } + + ASSERT_OK(db->SingleDelete(WriteOptions(), "b")); + + // Take a snapshot so that the SD won't be dropped during flush. + auto* tmp_snapshot = db->GetSnapshot(); + + ASSERT_OK(db->Put(WriteOptions(), "b", "value2")); + auto* snapshot = db->GetSnapshot(); + ASSERT_OK(db->Flush(FlushOptions())); + + db->ReleaseSnapshot(tmp_snapshot); + + // Bump the sequence so that the below bg compaction job's snapshot will be + // different from snapshot's sequence. + ASSERT_OK(db->Put(WriteOptions(), "z", "foo")); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) { + const auto* const ikey = + reinterpret_cast<const ParsedInternalKey*>(arg); + assert(ikey); + if (ikey->user_key == "b") { + assert(ikey->type == kTypeValue); + db->ReleaseSnapshot(snapshot); + + // Bump max_evicted_seq. + ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing2) { + constexpr size_t kSnapshotCacheBits = 7; // same as default + constexpr size_t kCommitCacheBits = 0; // minimum commit cache + UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits); + options.disable_auto_compactions = true; + ASSERT_OK(ReOpen()); + + // Generate an L0 with only SD for one key "b". + ASSERT_OK(db->Put(WriteOptions(), "a", "value")); + ASSERT_OK(db->Put(WriteOptions(), "b", "value")); + // Take a snapshot so that subsequent flush outputs the SD for "b". 
+ auto* tmp_snapshot = db->GetSnapshot(); + ASSERT_OK(db->SingleDelete(WriteOptions(), "b")); + ASSERT_OK(db->Put(WriteOptions(), "c", "value")); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::NextFromInput:SingleDelete:3", [&](void* arg) { + if (!arg) { + db->ReleaseSnapshot(tmp_snapshot); + // Bump max_evicted_seq + ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db->Flush(FlushOptions())); + // Finish generating L0 with only SD for "b". + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Move the L0 to L2. + { + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = 2; + ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); + } + + ASSERT_OK(db->Put(WriteOptions(), "b", "value1")); + + auto* snapshot = db->GetSnapshot(); + + // Bump seq so that a subsequent flush/compaction job's snapshot is larger + // than the above snapshot's seq. + ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare")); + + // Generate a second L0. + ASSERT_OK(db->Flush(FlushOptions())); + + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) { + const auto* const ikey = + reinterpret_cast<const ParsedInternalKey*>(arg); + assert(ikey); + if (ikey->user_key == "b") { + assert(ikey->type == kTypeValue); + db->ReleaseSnapshot(snapshot); + + // Bump max_evicted_seq. + ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + + // A more complex test to verify compaction/flush should keep keys visible // to snapshots. 
TEST_P(WritePreparedTransactionTest,