Fix bug caused by releasing snapshot(s) during compaction (#8608)

Summary:
In debug mode, we are seeing assertion failure as follows

```
db/compaction/compaction_iterator.cc:980: void rocksdb::CompactionIterator::PrepareOutput(): \
Assertion `ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion' failed.
```

It is caused by releasing earliest snapshot during compaction between the execution of
`NextFromInput()` and `PrepareOutput()`.

In one case, as demonstrated in unit test `WritePreparedTransaction.ReleaseEarliestSnapshotDuringCompaction_WithSD2`,
incorrect result may be returned by a following range scan if we disable assertion, as in opt compilation
level: the SingleDelete marker's sequence number is zeroed out, but the preceding PUT is also
outputted to the SST file after compaction. Due to the logic of DBIter, the PUT will not be
skipped and will be returned by iterator in range scan. https://github.com/facebook/rocksdb/issues/8661 illustrates what happened.

Fix by taking a more conservative approach: make compaction zero out sequence number only
if key is in the earliest snapshot when the compaction starts.

Another assertion failure is
```
Assertion `current_user_key_snapshot_ == last_snapshot' failed.
```

It's caused by releasing the snapshot between the PUT and SingleDelete during compaction.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8608

Test Plan: make check

Reviewed By: jay-zhuang

Differential Revision: D30145645

Pulled By: riversand963

fbshipit-source-id: 699f58e66faf70732ad53810ccef43935d3bbe81
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent 6878cedcc3
commit 2b367fa8cc
  1. 1
      HISTORY.md
  2. 36
      db/compaction/compaction_iterator.cc
  3. 13
      db/compaction/compaction_iterator.h
  4. 2
      db/compaction/compaction_iterator_test.cc
  5. 8
      utilities/transactions/transaction_test.h
  6. 194
      utilities/transactions/write_prepared_transaction_test.cc

@ -7,6 +7,7 @@
* Removed a call to `RenameFile()` on a non-existent info log file ("LOG") when opening a new DB. Such a call was guaranteed to fail though did not impact applications since we swallowed the error. Now we also stopped swallowing errors in renaming "LOG" file. * Removed a call to `RenameFile()` on a non-existent info log file ("LOG") when opening a new DB. Such a call was guaranteed to fail though did not impact applications since we swallowed the error. Now we also stopped swallowing errors in renaming "LOG" file.
* Fixed an issue where `OnFlushCompleted` was not called for atomic flush. * Fixed an issue where `OnFlushCompleted` was not called for atomic flush.
* Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`. * Fixed a bug affecting the batched `MultiGet` API when used with keys spanning multiple column families and `sorted_input == false`.
* Fixed a potential incorrect result in opt mode and assertion failures caused by releasing snapshot(s) during compaction.
### New Features ### New Features
* Made the EventListener extend the Customizable class. * Made the EventListener extend the Customizable class.

@ -495,11 +495,11 @@ void CompactionIterator::NextFromInput() {
"Unexpected key type %d for compaction output", "Unexpected key type %d for compaction output",
ikey_.type); ikey_.type);
} }
assert(current_user_key_snapshot_ == last_snapshot); assert(current_user_key_snapshot_ >= last_snapshot);
if (current_user_key_snapshot_ != last_snapshot) { if (current_user_key_snapshot_ < last_snapshot) {
ROCKS_LOG_FATAL(info_log_, ROCKS_LOG_FATAL(info_log_,
"current_user_key_snapshot_ (%" PRIu64 "current_user_key_snapshot_ (%" PRIu64
") != last_snapshot (%" PRIu64 ")", ") < last_snapshot (%" PRIu64 ")",
current_user_key_snapshot_, last_snapshot); current_user_key_snapshot_, last_snapshot);
} }
@ -555,10 +555,20 @@ void CompactionIterator::NextFromInput() {
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok() && .ok() &&
cmp_->Equal(ikey_.user_key, next_ikey.user_key)) { cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
#ifndef NDEBUG
const Compaction* c =
compaction_ ? compaction_->real_compaction() : nullptr;
#endif
TEST_SYNC_POINT_CALLBACK(
"CompactionIterator::NextFromInput:SingleDelete:1",
const_cast<Compaction*>(c));
// Check whether the next key belongs to the same snapshot as the // Check whether the next key belongs to the same snapshot as the
// SingleDelete. // SingleDelete.
if (prev_snapshot == 0 || if (prev_snapshot == 0 ||
DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) { DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
TEST_SYNC_POINT_CALLBACK(
"CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
if (next_ikey.type == kTypeSingleDeletion) { if (next_ikey.type == kTypeSingleDeletion) {
// We encountered two SingleDeletes in a row. This could be due to // We encountered two SingleDeletes in a row. This could be due to
// unexpected user input. // unexpected user input.
@ -604,6 +614,9 @@ void CompactionIterator::NextFromInput() {
// Set up the Put to be outputted in the next iteration. // Set up the Put to be outputted in the next iteration.
// (Optimization 3). // (Optimization 3).
clear_and_output_next_key_ = true; clear_and_output_next_key_ = true;
TEST_SYNC_POINT_CALLBACK(
"CompactionIterator::NextFromInput:KeepSDForWW",
/*arg=*/nullptr);
} }
} else { } else {
// We hit the next snapshot without hitting a put, so the iterator // We hit the next snapshot without hitting a put, so the iterator
@ -619,7 +632,8 @@ void CompactionIterator::NextFromInput() {
// iteration. If the next key is corrupt, we return before the // iteration. If the next key is corrupt, we return before the
// comparison, so the value of has_current_user_key does not matter. // comparison, so the value of has_current_user_key does not matter.
has_current_user_key_ = false; has_current_user_key_ = false;
if (compaction_ != nullptr && InEarliestSnapshot(ikey_.sequence) && if (compaction_ != nullptr &&
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) { &level_ptrs_)) {
// Key doesn't exist outside of this range. // Key doesn't exist outside of this range.
@ -663,7 +677,7 @@ void CompactionIterator::NextFromInput() {
(ikey_.type == kTypeDeletion || (ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeDeletionWithTimestamp && (ikey_.type == kTypeDeletionWithTimestamp &&
cmp_with_history_ts_low_ < 0)) && cmp_with_history_ts_low_ < 0)) &&
InEarliestSnapshot(ikey_.sequence) && DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikeyNotNeededForIncrementalSnapshot() && ikeyNotNeededForIncrementalSnapshot() &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) { &level_ptrs_)) {
@ -706,6 +720,13 @@ void CompactionIterator::NextFromInput() {
ikey_.user_key, &level_ptrs_)); ikey_.user_key, &level_ptrs_));
ParsedInternalKey next_ikey; ParsedInternalKey next_ikey;
AdvanceInputIter(); AdvanceInputIter();
#ifndef NDEBUG
const Compaction* c =
compaction_ ? compaction_->real_compaction() : nullptr;
#endif
TEST_SYNC_POINT_CALLBACK(
"CompactionIterator::NextFromInput:BottommostDelete:1",
const_cast<Compaction*>(c));
// Skip over all versions of this key that happen to occur in the same // Skip over all versions of this key that happen to occur in the same
// snapshot range as the delete. // snapshot range as the delete.
// //
@ -964,7 +985,8 @@ void CompactionIterator::PrepareOutput() {
if (valid_ && compaction_ != nullptr && if (valid_ && compaction_ != nullptr &&
!compaction_->allow_ingest_behind() && !compaction_->allow_ingest_behind() &&
ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
InEarliestSnapshot(ikey_.sequence) && ikey_.type != kTypeMerge) { DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion); assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) { if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
ROCKS_LOG_FATAL(info_log_, ROCKS_LOG_FATAL(info_log_,
@ -1040,7 +1062,7 @@ inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
(ikey_.sequence < preserve_deletes_seqnum_); (ikey_.sequence < preserve_deletes_seqnum_);
} }
bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) { bool CompactionIterator::IsInCurrentEarliestSnapshot(SequenceNumber sequence) {
assert(snapshot_checker_ != nullptr); assert(snapshot_checker_ != nullptr);
bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber || bool pre_condition = (earliest_snapshot_ == kMaxSequenceNumber ||
(earliest_snapshot_iter_ != snapshots_->end() && (earliest_snapshot_iter_ != snapshots_->end() &&

@ -99,6 +99,8 @@ class CompactionIterator {
virtual Version* input_version() const = 0; virtual Version* input_version() const = 0;
virtual bool DoesInputReferenceBlobFiles() const = 0; virtual bool DoesInputReferenceBlobFiles() const = 0;
virtual const Compaction* real_compaction() const = 0;
}; };
class RealCompaction : public CompactionProxy { class RealCompaction : public CompactionProxy {
@ -152,6 +154,8 @@ class CompactionIterator {
return compaction_->DoesInputReferenceBlobFiles(); return compaction_->DoesInputReferenceBlobFiles();
} }
const Compaction* real_compaction() const override { return compaction_; }
private: private:
const Compaction* compaction_; const Compaction* compaction_;
}; };
@ -267,13 +271,13 @@ class CompactionIterator {
SnapshotCheckerResult::kInSnapshot; SnapshotCheckerResult::kInSnapshot;
} }
bool IsInEarliestSnapshot(SequenceNumber sequence); bool IsInCurrentEarliestSnapshot(SequenceNumber sequence);
bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot); bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot); bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
bool InEarliestSnapshot(SequenceNumber seq); bool InCurrentEarliestSnapshot(SequenceNumber seq);
// Extract user-defined timestamp from user key if possible and compare it // Extract user-defined timestamp from user key if possible and compare it
// with *full_history_ts_low_ if applicable. // with *full_history_ts_low_ if applicable.
@ -435,9 +439,10 @@ inline bool CompactionIterator::DefinitelyNotInSnapshot(
SnapshotCheckerResult::kNotInSnapshot))); SnapshotCheckerResult::kNotInSnapshot)));
} }
inline bool CompactionIterator::InEarliestSnapshot(SequenceNumber seq) { inline bool CompactionIterator::InCurrentEarliestSnapshot(SequenceNumber seq) {
return ((seq) <= earliest_snapshot_ && return ((seq) <= earliest_snapshot_ &&
(snapshot_checker_ == nullptr || LIKELY(IsInEarliestSnapshot(seq)))); (snapshot_checker_ == nullptr ||
LIKELY(IsInCurrentEarliestSnapshot(seq))));
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -184,6 +184,8 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
bool DoesInputReferenceBlobFiles() const override { return false; } bool DoesInputReferenceBlobFiles() const override { return false; }
const Compaction* real_compaction() const override { return nullptr; }
bool key_not_exists_beyond_output_level = false; bool key_not_exists_beyond_output_level = false;
bool is_bottommost_level = false; bool is_bottommost_level = false;

@ -95,8 +95,12 @@ class TransactionTestBase : public ::testing::Test {
// seems to be a bug in btrfs that the makes readdir return recently // seems to be a bug in btrfs that the makes readdir return recently
// unlink-ed files. By using the default fs we simply ignore errors resulted // unlink-ed files. By using the default fs we simply ignore errors resulted
// from attempting to delete such files in DestroyDB. // from attempting to delete such files in DestroyDB.
options.env = Env::Default(); if (getenv("KEEP_DB") == nullptr) {
EXPECT_OK(DestroyDB(dbname, options)); options.env = Env::Default();
EXPECT_OK(DestroyDB(dbname, options));
} else {
fprintf(stdout, "db is still in %s\n", dbname.c_str());
}
delete env; delete env;
} }

@ -2758,6 +2758,7 @@ TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
ASSERT_OK(ReOpen()); ASSERT_OK(ReOpen());
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1")); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
SequenceNumber put_seq = db->GetLatestSequenceNumber();
auto* transaction = auto* transaction =
db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr); db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
ASSERT_OK(transaction->SetName("txn")); ASSERT_OK(transaction->SetName("txn"));
@ -2799,11 +2800,202 @@ TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
// Since the delete tombstone is not visible to snapshot2, we need to keep // Since the delete tombstone is not visible to snapshot2, we need to keep
// at least one version of the key, for write-conflict check. // at least one version of the key, for write-conflict check.
VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion}, VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
{"key1", "value1", 0, kTypeValue}}); {"key1", "value1", put_seq, kTypeValue}});
db->ReleaseSnapshot(snapshot2); db->ReleaseSnapshot(snapshot2);
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
} }
TEST_P(WritePreparedTransactionTest,
ReleaseEarliestSnapshotDuringCompaction_WithSD) {
constexpr size_t kSnapshotCacheBits = 7; // same as default
constexpr size_t kCommitCacheBits = 0; // minimum commit cache
UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
options.disable_auto_compactions = true;
ASSERT_OK(ReOpen());
ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(db->Flush(FlushOptions()));
auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
/*old_txn=*/nullptr);
ASSERT_OK(txn->SingleDelete("key"));
ASSERT_OK(txn->Put("wow", "value"));
ASSERT_OK(txn->SetName("txn"));
ASSERT_OK(txn->Prepare());
ASSERT_OK(db->Flush(FlushOptions()));
const bool two_write_queues = std::get<1>(GetParam());
if (two_write_queues) {
// In the case of two queues, commit another txn just to bump
// last_published_seq so that a subsequent GetSnapshot() call can return
// a snapshot with higher sequence.
auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
/*old_txn=*/nullptr);
ASSERT_OK(dummy_txn->Put("haha", "value"));
ASSERT_OK(dummy_txn->Commit());
delete dummy_txn;
}
auto* snapshot = db->GetSnapshot();
ASSERT_OK(txn->Commit());
delete txn;
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::NextFromInput:SingleDelete:1", [&](void* arg) {
if (!arg) {
return;
}
db->ReleaseSnapshot(snapshot);
// Advance max_evicted_seq
ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(WritePreparedTransactionTest,
ReleaseEarliestSnapshotDuringCompaction_WithSD2) {
constexpr size_t kSnapshotCacheBits = 7; // same as default
constexpr size_t kCommitCacheBits = 0; // minimum commit cache
UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
options.disable_auto_compactions = true;
ASSERT_OK(ReOpen());
ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
ASSERT_OK(db->Flush(FlushOptions()));
auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
/*old_txn=*/nullptr);
ASSERT_OK(txn->Put("bar", "value"));
ASSERT_OK(txn->SingleDelete("key"));
ASSERT_OK(txn->SetName("txn"));
ASSERT_OK(txn->Prepare());
ASSERT_OK(db->Flush(FlushOptions()));
ASSERT_OK(txn->Commit());
delete txn;
ASSERT_OK(db->Put(WriteOptions(), "haha", "value"));
// Create a dummy transaction to take a snapshot for ww-conflict detection.
TransactionOptions txn_opts;
txn_opts.set_snapshot = true;
auto* dummy_txn =
db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::NextFromInput:SingleDelete:2", [&](void* /*arg*/) {
ASSERT_OK(dummy_txn->Rollback());
delete dummy_txn;
ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db->Put(WriteOptions(), "haha2", "value"));
auto* snapshot = db->GetSnapshot();
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
db->ReleaseSnapshot(snapshot);
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(WritePreparedTransactionTest,
ReleaseEarliestSnapshotDuringCompaction_WithDelete) {
constexpr size_t kSnapshotCacheBits = 7; // same as default
constexpr size_t kCommitCacheBits = 0; // minimum commit cache
UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
options.disable_auto_compactions = true;
ASSERT_OK(ReOpen());
ASSERT_OK(db->Put(WriteOptions(), "a", "value"));
ASSERT_OK(db->Put(WriteOptions(), "b", "value"));
ASSERT_OK(db->Put(WriteOptions(), "c", "value"));
ASSERT_OK(db->Flush(FlushOptions()));
auto* txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
/*old_txn=*/nullptr);
ASSERT_OK(txn->Delete("b"));
ASSERT_OK(txn->SetName("txn"));
ASSERT_OK(txn->Prepare());
const bool two_write_queues = std::get<1>(GetParam());
if (two_write_queues) {
// In the case of two queues, commit another txn just to bump
// last_published_seq so that a subsequent GetSnapshot() call can return
// a snapshot with higher sequence.
auto* dummy_txn = db->BeginTransaction(WriteOptions(), TransactionOptions(),
/*old_txn=*/nullptr);
ASSERT_OK(dummy_txn->Put("haha", "value"));
ASSERT_OK(dummy_txn->Commit());
delete dummy_txn;
}
auto* snapshot1 = db->GetSnapshot();
ASSERT_OK(txn->Commit());
delete txn;
auto* snapshot2 = db->GetSnapshot();
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::NextFromInput:BottommostDelete:1", [&](void* arg) {
if (!arg) {
return;
}
db->ReleaseSnapshot(snapshot1);
// Advance max_evicted_seq
ASSERT_OK(db->Put(WriteOptions(), "dummy1", "value"));
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
db->ReleaseSnapshot(snapshot2);
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(WritePreparedTransactionTest,
ReleaseSnapshotBetweenSDAndPutDuringCompaction) {
constexpr size_t kSnapshotCacheBits = 7; // same as default
constexpr size_t kCommitCacheBits = 0; // minimum commit cache
UpdateTransactionDBOptions(kSnapshotCacheBits, kCommitCacheBits);
options.disable_auto_compactions = true;
ASSERT_OK(ReOpen());
// Create a dummy transaction to take a snapshot for ww-conflict detection.
TransactionOptions txn_opts;
txn_opts.set_snapshot = true;
auto* dummy_txn =
db->BeginTransaction(WriteOptions(), txn_opts, /*old_txn=*/nullptr);
// Increment seq
ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
auto* snapshot1 = db->GetSnapshot();
// Increment seq
ASSERT_OK(db->Put(WriteOptions(), "dontcare", "value"));
auto* snapshot2 = db->GetSnapshot();
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::NextFromInput:KeepSDForWW", [&](void* /*arg*/) {
db->ReleaseSnapshot(snapshot1);
ASSERT_OK(db->Put(WriteOptions(), "dontcare2", "value2"));
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db->Flush(FlushOptions()));
db->ReleaseSnapshot(snapshot2);
ASSERT_OK(dummy_txn->Commit());
delete dummy_txn;
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// A more complex test to verify compaction/flush should keep keys visible // A more complex test to verify compaction/flush should keep keys visible
// to snapshots. // to snapshots.
TEST_P(WritePreparedTransactionTest, TEST_P(WritePreparedTransactionTest,

Loading…
Cancel
Save