From 9b53f14a35b42b7686cf731d61f41788bb70489d Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 3 Nov 2021 15:53:40 -0700 Subject: [PATCH] Fixed a bug in CompactionIterator when write-prepared transaction is used (#9060) Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/9060 RocksDB bottommost level compaction may zero out an internal key's sequence if the key's sequence is in the earliest_snapshot. In write-prepared transaction, checking the visibility of a certain sequence in a specific released snapshot may return a "snapshot released" result. Therefore, it is possible, after a certain sequence of events, a PUT has its sequence zeroed out, but a subsequent SingleDelete of the same key will still be output with its original sequence. This violates the ascending order of keys and leads to incorrect result. The solution is to use an extra variable `last_key_seq_zeroed_` to track the information about visibility in earliest snapshot. With this variable, we can know for sure that a SingleDelete is in the earliest snapshot even if the said snapshot is released during compaction before processing the SD. Reviewed By: ltamasi Differential Revision: D31813016 fbshipit-source-id: d8cff59d6f34e0bdf282614034aaea99be9174e1 --- HISTORY.md | 1 + db/compaction/compaction_iterator.cc | 27 +++- db/compaction/compaction_iterator.h | 4 + .../write_prepared_transaction_test.cc | 132 ++++++++++++++++++ 4 files changed, 160 insertions(+), 4 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index e915ec8e9..c76046349 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ * Fixed a bug in CompactionIterator when write-prepared transaction is used. A released earliest write conflict snapshot may cause assertion failure in dbg mode and unexpected key in opt mode. 
* Fix ticker WRITE_WITH_WAL("rocksdb.write.wal"), this bug is caused by a bad extra `RecordTick(stats_, WRITE_WITH_WAL)` (at 2 place), this fix remove the extra `RecordTick`s and fix the corresponding test case. * EventListener::OnTableFileCreated was previously called with OK status and file_size==0 in cases of no SST file contents written (because there was no content to add) and the empty file deleted before calling the listener. Now the status is Aborted. +* Fixed a bug in CompactionIterator when write-prepared transaction is used. Releasing earliest_snapshot during compaction may cause a SingleDelete to be output after a PUT of the same user key whose seq has been zeroed. ### Behavior Changes * `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files. diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 70df36561..c4434bca2 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -433,6 +433,8 @@ void CompactionIterator::NextFromInput() { has_outputted_key_ = false; + last_key_seq_zeroed_ = false; + current_key_committed_ = KeyCommitted(ikey_.sequence); // Apply the compaction filter to the first committed version of the user @@ -594,10 +596,16 @@ void CompactionIterator::NextFromInput() { TEST_SYNC_POINT_CALLBACK( "CompactionIterator::NextFromInput:SingleDelete:1", const_cast<Compaction*>(c)); - // Check whether the next key belongs to the same snapshot as the - // SingleDelete. - if (prev_snapshot == 0 || - DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) { + if (last_key_seq_zeroed_) { + ++iter_stats_.num_record_drop_hidden; + ++iter_stats_.num_record_drop_obsolete; + assert(bottommost_level_); + AdvanceInputIter(); + } else if (prev_snapshot == 0 || + DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) { + // Check whether the next key belongs to the same snapshot as the + // SingleDelete. 
+ TEST_SYNC_POINT_CALLBACK( "CompactionIterator::NextFromInput:SingleDelete:2", nullptr); if (next_ikey.type == kTypeSingleDeletion) { @@ -661,6 +669,9 @@ void CompactionIterator::NextFromInput() { // We hit the next snapshot without hitting a put, so the iterator // returns the single delete. valid_ = true; + TEST_SYNC_POINT_CALLBACK( + "CompactionIterator::NextFromInput:SingleDelete:3", + const_cast<Compaction*>(c)); } } else { // We are at the end of the input, could not parse the next key, or hit @@ -683,6 +694,11 @@ void CompactionIterator::NextFromInput() { if (!bottommost_level_) { ++iter_stats_.num_optimized_del_drop_obsolete; } + } else if (last_key_seq_zeroed_) { + // Skip. + ++iter_stats_.num_record_drop_hidden; + ++iter_stats_.num_record_drop_obsolete; + assert(bottommost_level_); } else { // Output SingleDelete valid_ = true; @@ -1038,6 +1054,9 @@ void CompactionIterator::PrepareOutput() { ikey_.type); } ikey_.sequence = 0; + last_key_seq_zeroed_ = true; + TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput:ZeroingSeq", + &ikey_); if (!timestamp_size_) { current_key_.UpdateInternalKey(0, ikey_.type); } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) { diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 7431e6688..c3785a893 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -401,6 +401,10 @@ class CompactionIterator { const int level_; + // True if the previous internal key (same user key)'s sequence number has + // just been zeroed out during bottommost compaction. 
+ bool last_key_seq_zeroed_{false}; + void AdvanceInputIter() { input_.Next(); } void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); } diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index db0a2a8e7..c90d34b1a 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -3067,6 +3067,138 @@ TEST_P(WritePreparedTransactionTest, db->ReleaseSnapshot(snapshot1); } +TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing) { + constexpr size_t kSnapshotCacheBits = 7; // same as default + constexpr size_t kCommitCacheBits = 0; // minimum commit cache + UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits); + options.disable_auto_compactions = true; + ASSERT_OK(ReOpen()); + + ASSERT_OK(db->Put(WriteOptions(), "a", "value")); + ASSERT_OK(db->Put(WriteOptions(), "b", "value")); + ASSERT_OK(db->Put(WriteOptions(), "c", "value")); + ASSERT_OK(db->Flush(FlushOptions())); + + { + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = 2; + ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); + } + + ASSERT_OK(db->SingleDelete(WriteOptions(), "b")); + + // Take a snapshot so that the SD won't be dropped during flush. + auto* tmp_snapshot = db->GetSnapshot(); + + ASSERT_OK(db->Put(WriteOptions(), "b", "value2")); + auto* snapshot = db->GetSnapshot(); + ASSERT_OK(db->Flush(FlushOptions())); + + db->ReleaseSnapshot(tmp_snapshot); + + // Bump the sequence so that the below bg compaction job's snapshot will be + // different from snapshot's sequence. 
+ ASSERT_OK(db->Put(WriteOptions(), "z", "foo")); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) { + const auto* const ikey = + reinterpret_cast<const ParsedInternalKey*>(arg); + assert(ikey); + if (ikey->user_key == "b") { + assert(ikey->type == kTypeValue); + db->ReleaseSnapshot(snapshot); + + // Bump max_evicted_seq. + ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotAfterSeqZeroing2) { + constexpr size_t kSnapshotCacheBits = 7; // same as default + constexpr size_t kCommitCacheBits = 0; // minimum commit cache + UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits); + options.disable_auto_compactions = true; + ASSERT_OK(ReOpen()); + + // Generate an L0 with only SD for one key "b". + ASSERT_OK(db->Put(WriteOptions(), "a", "value")); + ASSERT_OK(db->Put(WriteOptions(), "b", "value")); + // Take a snapshot so that subsequent flush outputs the SD for "b". + auto* tmp_snapshot = db->GetSnapshot(); + ASSERT_OK(db->SingleDelete(WriteOptions(), "b")); + ASSERT_OK(db->Put(WriteOptions(), "c", "value")); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::NextFromInput:SingleDelete:3", [&](void* arg) { + if (!arg) { + db->ReleaseSnapshot(tmp_snapshot); + // Bump max_evicted_seq + ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db->Flush(FlushOptions())); + // Finish generating L0 with only SD for "b". 
+ + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Move the L0 to L2. + { + CompactRangeOptions cro; + cro.change_level = true; + cro.target_level = 2; + ASSERT_OK(db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); + } + + ASSERT_OK(db->Put(WriteOptions(), "b", "value1")); + + auto* snapshot = db->GetSnapshot(); + + // Bump seq so that a subsequent flush/compaction job's snapshot is larger + // than the above snapshot's seq. + ASSERT_OK(db->Put(WriteOptions(), "x", "dontcare")); + + // Generate a second L0. + ASSERT_OK(db->Flush(FlushOptions())); + + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput:ZeroingSeq", [&](void* arg) { + const auto* const ikey = + reinterpret_cast<const ParsedInternalKey*>(arg); + assert(ikey); + if (ikey->user_key == "b") { + assert(ikey->type == kTypeValue); + db->ReleaseSnapshot(snapshot); + + // Bump max_evicted_seq. + ASSERT_OK(db->Put(WriteOptions(), "z", "dontcare")); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr)); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + // A more complex test to verify compaction/flush should keep keys visible // to snapshots. TEST_P(WritePreparedTransactionTest,