WritePrepared: fix issue with snapshot released during compaction (#4858)

Summary:
The compaction iterator keeps a copy of the list of live snapshots taken at the beginning of compaction, and then queries the snapshot checker to verify whether a value with a given sequence number is visible to these snapshots. However, when a snapshot is released in the middle of compaction, the snapshot checker implementation (i.e. WritePreparedSnapshotChecker) may remove the info associated with that snapshot and may report an incorrect result, which leads to values being compacted out when they shouldn't be. This patch conservatively keeps the values if the snapshot checker determines that the snapshot has been released.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4858

Differential Revision: D13617146

Pulled By: maysamyabandeh

fbshipit-source-id: cf18a94f6f61a94bcff73c280f117b224af5fbc3
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent e79df377c5
commit 128f532858
  1. 99
      db/compaction_iterator.cc
  2. 10
      db/compaction_iterator.h
  3. 12
      db/compaction_iterator_test.cc
  4. 14
      db/db_merge_operator_test.cc
  5. 8
      db/merge_helper.cc
  6. 24
      db/snapshot_checker.h
  7. 16
      utilities/transactions/snapshot_checker.cc
  8. 95
      utilities/transactions/write_prepared_transaction_test.cc
  9. 2
      utilities/transactions/write_prepared_txn_db.h

@ -9,6 +9,23 @@
#include "port/likely.h"
#include "rocksdb/listener.h"
#include "table/internal_iterator.h"
#include "util/sync_point.h"
// Snapshot-visibility helpers. Each macro expands in place and reads the
// snapshot_checker_ member of the enclosing object, so it can only be used
// where that member is in scope. The `seq` and `snapshot` arguments appear
// more than once in the expansions, so pass side-effect-free expressions.

// True only when `seq` <= `snapshot` and, if a snapshot checker is set, the
// checker answers kInSnapshot. Any other answer from the checker
// (kNotInSnapshot or kSnapshotReleased) yields false.
#define DEFINITELY_IN_SNAPSHOT(seq, snapshot) \
((seq) <= (snapshot) && \
(snapshot_checker_ == nullptr || \
LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
SnapshotCheckerResult::kInSnapshot)))
// True when `seq` > `snapshot`, or when a snapshot checker is set and it
// answers kNotInSnapshot. A kSnapshotReleased answer yields false here as
// well, so for a released snapshot neither this macro nor
// DEFINITELY_IN_SNAPSHOT holds.
#define DEFINITELY_NOT_IN_SNAPSHOT(seq, snapshot) \
((seq) > (snapshot) || \
(snapshot_checker_ != nullptr && \
UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
SnapshotCheckerResult::kNotInSnapshot)))
// True when `seq` <= earliest_snapshot_ and, if a snapshot checker is set,
// IsInEarliestSnapshot(seq) returns true. The sequence comparison is
// evaluated first; IsInEarliestSnapshot() is only reached when it holds and
// a checker is present.
#define IN_EARLIEST_SNAPSHOT(seq) \
((seq) <= earliest_snapshot_ && \
(snapshot_checker_ == nullptr || LIKELY(IsInEarliestSnapshot(seq))))
namespace rocksdb {
@ -61,19 +78,21 @@ CompactionIterator::CompactionIterator(
merge_out_iter_(merge_helper_),
current_key_committed_(false) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
assert(snapshots_ != nullptr);
bottommost_level_ =
compaction_ == nullptr ? false : compaction_->bottommost_level();
if (compaction_ != nullptr) {
level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
}
if (snapshots_->size() == 0) {
// optimize for fast path if there are no snapshots
visible_at_tip_ = true;
earliest_snapshot_iter_ = snapshots_->end();
earliest_snapshot_ = kMaxSequenceNumber;
latest_snapshot_ = 0;
} else {
visible_at_tip_ = false;
earliest_snapshot_iter_ = snapshots_->begin();
earliest_snapshot_ = snapshots_->at(0);
latest_snapshot_ = snapshots_->back();
}
@ -163,10 +182,7 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
if (compaction_filter_ != nullptr &&
(ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex) &&
(visible_at_tip_ || ignore_snapshots_ ||
ikey_.sequence > latest_snapshot_ ||
(snapshot_checker_ != nullptr &&
UNLIKELY(!snapshot_checker_->IsInSnapshot(ikey_.sequence,
latest_snapshot_))))) {
DEFINITELY_NOT_IN_SNAPSHOT(ikey_.sequence, latest_snapshot_))) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
@ -270,9 +286,7 @@ void CompactionIterator::NextFromInput() {
has_outputted_key_ = false;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
current_key_committed_ =
(snapshot_checker_ == nullptr ||
snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber));
current_key_committed_ = KeyCommitted(ikey_.sequence);
// Apply the compaction filter to the first committed version of the user
// key.
@ -294,8 +308,7 @@ void CompactionIterator::NextFromInput() {
// to query snapshot_checker_ in that case.
if (UNLIKELY(!current_key_committed_)) {
assert(snapshot_checker_ != nullptr);
current_key_committed_ =
snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber);
current_key_committed_ = KeyCommitted(ikey_.sequence);
// Apply the compaction filter to the first committed version of the
// user key.
if (current_key_committed_) {
@ -379,10 +392,8 @@ void CompactionIterator::NextFromInput() {
cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
// Check whether the next key belongs to the same snapshot as the
// SingleDelete.
if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot ||
(snapshot_checker_ != nullptr &&
UNLIKELY(!snapshot_checker_->IsInSnapshot(next_ikey.sequence,
prev_snapshot)))) {
if (prev_snapshot == 0 ||
DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot)) {
if (next_ikey.type == kTypeSingleDeletion) {
// We encountered two SingleDeletes in a row. This could be due to
// unexpected user input.
@ -394,11 +405,8 @@ void CompactionIterator::NextFromInput() {
++iter_stats_.num_record_drop_obsolete;
++iter_stats_.num_single_del_mismatch;
} else if (has_outputted_key_ ||
(ikey_.sequence <= earliest_write_conflict_snapshot_ &&
(snapshot_checker_ == nullptr ||
LIKELY(snapshot_checker_->IsInSnapshot(
ikey_.sequence,
earliest_write_conflict_snapshot_))))) {
DEFINITELY_IN_SNAPSHOT(
ikey_.sequence, earliest_write_conflict_snapshot_)) {
// Found a matching value, we can drop the single delete and the
// value. It is safe to drop both records since we've already
// outputted a key in this snapshot, or there is no earlier
@ -446,10 +454,7 @@ void CompactionIterator::NextFromInput() {
// iteration. If the next key is corrupt, we return before the
// comparison, so the value of has_current_user_key does not matter.
has_current_user_key_ = false;
if (compaction_ != nullptr && ikey_.sequence <= earliest_snapshot_ &&
(snapshot_checker_ == nullptr ||
LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
earliest_snapshot_))) &&
if (compaction_ != nullptr && IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) {
// Key doesn't exist outside of this range.
@ -482,10 +487,7 @@ void CompactionIterator::NextFromInput() {
++iter_stats_.num_record_drop_hidden; // (A)
input_->Next();
} else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
ikey_.sequence <= earliest_snapshot_ &&
(snapshot_checker_ == nullptr ||
LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
earliest_snapshot_))) &&
IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
ikeyNotNeededForIncrementalSnapshot() &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) {
@ -522,13 +524,10 @@ void CompactionIterator::NextFromInput() {
input_->Next();
// Skip over all versions of this key that happen to occur in the same snapshot
// range as the delete
while (input_->Valid() &&
ParseInternalKey(input_->key(), &next_ikey) &&
while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
cmp_->Equal(ikey_.user_key, next_ikey.user_key) &&
(prev_snapshot == 0 || next_ikey.sequence > prev_snapshot ||
(snapshot_checker_ != nullptr &&
UNLIKELY(!snapshot_checker_->IsInSnapshot(next_ikey.sequence,
prev_snapshot))))) {
(prev_snapshot == 0 ||
DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) {
input_->Next();
}
// If you find you still need to output a row with this key, we need to output the
@ -619,13 +618,9 @@ void CompactionIterator::PrepareOutput() {
//
// Can we do the same for levels above bottom level as long as
// KeyNotExistsBeyondOutputLevel() return true?
if ((compaction_ != nullptr &&
!compaction_->allow_ingest_behind()) &&
ikeyNotNeededForIncrementalSnapshot() &&
bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ &&
(snapshot_checker_ == nullptr || LIKELY(snapshot_checker_->IsInSnapshot(
ikey_.sequence, earliest_snapshot_))) &&
ikey_.type != kTypeMerge &&
if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && valid_ &&
IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge &&
!cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
ikey_.sequence = 0;
@ -648,7 +643,8 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
auto cur = *snapshots_iter;
assert(in <= cur);
if (snapshot_checker_ == nullptr ||
snapshot_checker_->IsInSnapshot(in, cur)) {
snapshot_checker_->CheckInSnapshot(in, cur) ==
SnapshotCheckerResult::kInSnapshot) {
return cur;
}
*prev_snapshot = cur;
@ -663,4 +659,25 @@ inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
(ikey_.sequence < preserve_deletes_seqnum_);
}
// Reports whether `sequence` is in the earliest snapshot, according to
// snapshot_checker_ (which must be non-null).
//
// While the checker answers kSnapshotReleased for the current
// earliest_snapshot_, the iterator earliest_snapshot_iter_ is advanced to the
// next entry of snapshots_ and earliest_snapshot_ is updated to that entry
// (or to kMaxSequenceNumber once the list is exhausted), and the check is
// repeated. Both members are therefore mutated as a side effect.
//
// Returns true only when the final answer is kInSnapshot.
bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
  assert(snapshot_checker_ != nullptr);
  // Entry invariant: either there is no earliest snapshot left, or the
  // iterator points at the entry mirrored in earliest_snapshot_.
  assert(earliest_snapshot_ == kMaxSequenceNumber ||
         (earliest_snapshot_iter_ != snapshots_->end() &&
          *earliest_snapshot_iter_ == earliest_snapshot_));
  for (;;) {
    const SnapshotCheckerResult verdict =
        snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
    if (LIKELY(verdict != SnapshotCheckerResult::kSnapshotReleased)) {
      return verdict == SnapshotCheckerResult::kInSnapshot;
    }
    // The current earliest snapshot was released; move on to the next one.
    ++earliest_snapshot_iter_;
    earliest_snapshot_ = (earliest_snapshot_iter_ == snapshots_->end())
                             ? kMaxSequenceNumber
                             : *earliest_snapshot_iter_;
  }
}
} // namespace rocksdb

@ -132,10 +132,19 @@ class CompactionIterator {
// or seqnum be zero-ed out even if all other conditions for it are met.
inline bool ikeyNotNeededForIncrementalSnapshot();
// Returns true when no snapshot checker is configured, or when the checker
// answers kInSnapshot for `sequence` against kMaxSequenceNumber.
inline bool KeyCommitted(SequenceNumber sequence) {
  if (snapshot_checker_ == nullptr) {
    return true;
  }
  const SnapshotCheckerResult verdict =
      snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber);
  return verdict == SnapshotCheckerResult::kInSnapshot;
}
bool IsInEarliestSnapshot(SequenceNumber sequence);
InternalIterator* input_;
const Comparator* cmp_;
MergeHelper* merge_helper_;
const std::vector<SequenceNumber>* snapshots_;
std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
const SequenceNumber earliest_write_conflict_snapshot_;
const SnapshotChecker* const snapshot_checker_;
Env* env_;
@ -151,6 +160,7 @@ class CompactionIterator {
bool visible_at_tip_;
SequenceNumber earliest_snapshot_;
SequenceNumber latest_snapshot_;
bool ignore_snapshots_;
// State

@ -190,13 +190,17 @@ class TestSnapshotChecker : public SnapshotChecker {
: last_committed_sequence_(last_committed_sequence),
snapshots_(snapshots) {}
bool IsInSnapshot(SequenceNumber seq,
SequenceNumber snapshot_seq) const override {
SnapshotCheckerResult CheckInSnapshot(
SequenceNumber seq, SequenceNumber snapshot_seq) const override {
if (snapshot_seq == kMaxSequenceNumber) {
return seq <= last_committed_sequence_;
return seq <= last_committed_sequence_
? SnapshotCheckerResult::kInSnapshot
: SnapshotCheckerResult::kNotInSnapshot;
}
assert(snapshots_.count(snapshot_seq) > 0);
return seq <= snapshots_.at(snapshot_seq);
return seq <= snapshots_.at(snapshot_seq)
? SnapshotCheckerResult::kInSnapshot
: SnapshotCheckerResult::kNotInSnapshot;
}
private:

@ -21,7 +21,8 @@ class TestReadCallback : public ReadCallback {
: snapshot_checker_(snapshot_checker), snapshot_seq_(snapshot_seq) {}
bool IsVisible(SequenceNumber seq) override {
return snapshot_checker_->IsInSnapshot(seq, snapshot_seq_);
return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==
SnapshotCheckerResult::kInSnapshot;
}
private:
@ -547,8 +548,15 @@ TEST_F(DBMergeOperatorTest, SnapshotCheckerAndReadCallback) {
DestroyAndReopen(options);
class TestSnapshotChecker : public SnapshotChecker {
bool IsInSnapshot(SequenceNumber seq,
SequenceNumber snapshot_seq) const override {
public:
SnapshotCheckerResult CheckInSnapshot(
SequenceNumber seq, SequenceNumber snapshot_seq) const override {
return IsInSnapshot(seq, snapshot_seq)
? SnapshotCheckerResult::kInSnapshot
: SnapshotCheckerResult::kNotInSnapshot;
}
bool IsInSnapshot(SequenceNumber seq, SequenceNumber snapshot_seq) const {
switch (snapshot_seq) {
case 0:
return seq == 0;

@ -166,9 +166,11 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
break;
} else if (stop_before > 0 && ikey.sequence <= stop_before &&
LIKELY(snapshot_checker_ == nullptr ||
snapshot_checker_->IsInSnapshot(ikey.sequence,
stop_before))) {
// hit an entry that's visible by the previous snapshot, can't touch that
snapshot_checker_->CheckInSnapshot(ikey.sequence,
stop_before) !=
SnapshotCheckerResult::kNotInSnapshot)) {
// hit an entry that's possibly visible by the previous snapshot, can't
// touch that
break;
}

@ -8,22 +8,30 @@
namespace rocksdb {
// Callback class that control GC of duplicate keys in flush/compaction
// Answer returned by SnapshotChecker::CheckInSnapshot() for a
// (sequence, snapshot_sequence) pair.
enum class SnapshotCheckerResult : int {
// The checker reports the given sequence as visible to the snapshot.
kInSnapshot = 0,
// The checker reports the given sequence as not visible to the snapshot.
kNotInSnapshot = 1,
// In case snapshot is released and the checker has no clue whether
// the given sequence is visible to the snapshot.
kSnapshotReleased = 2,
};
// Callback class that controls GC of duplicate keys in flush/compaction.
class SnapshotChecker {
public:
virtual ~SnapshotChecker() {}
virtual bool IsInSnapshot(SequenceNumber sequence,
SequenceNumber snapshot_sequence) const = 0;
virtual SnapshotCheckerResult CheckInSnapshot(
SequenceNumber sequence, SequenceNumber snapshot_sequence) const = 0;
};
class DisableGCSnapshotChecker : public SnapshotChecker {
public:
virtual ~DisableGCSnapshotChecker() {}
virtual bool IsInSnapshot(
virtual SnapshotCheckerResult CheckInSnapshot(
SequenceNumber /*sequence*/,
SequenceNumber /*snapshot_sequence*/) const override {
// By returning false, we prevent all the values from being GCed
return false;
// By returning kNotInSnapshot, we prevent all the values from being GCed
return SnapshotCheckerResult::kNotInSnapshot;
}
static DisableGCSnapshotChecker* Instance() { return &instance_; }
@ -41,8 +49,8 @@ class WritePreparedSnapshotChecker : public SnapshotChecker {
explicit WritePreparedSnapshotChecker(WritePreparedTxnDB* txn_db);
virtual ~WritePreparedSnapshotChecker() {}
virtual bool IsInSnapshot(SequenceNumber sequence,
SequenceNumber snapshot_sequence) const override;
virtual SnapshotCheckerResult CheckInSnapshot(
SequenceNumber sequence, SequenceNumber snapshot_sequence) const override;
private:
#ifndef ROCKSDB_LITE

@ -17,11 +17,11 @@ namespace rocksdb {
WritePreparedSnapshotChecker::WritePreparedSnapshotChecker(
WritePreparedTxnDB* /*txn_db*/) {}
bool WritePreparedSnapshotChecker::IsInSnapshot(
SnapshotCheckerResult WritePreparedSnapshotChecker::CheckInSnapshot(
SequenceNumber /*sequence*/, SequenceNumber /*snapshot_sequence*/) const {
// Should never be called in LITE mode.
assert(false);
return true;
return SnapshotCheckerResult::kInSnapshot;
}
#else
@ -30,9 +30,17 @@ WritePreparedSnapshotChecker::WritePreparedSnapshotChecker(
WritePreparedTxnDB* txn_db)
: txn_db_(txn_db){};
bool WritePreparedSnapshotChecker::IsInSnapshot(
SnapshotCheckerResult WritePreparedSnapshotChecker::CheckInSnapshot(
SequenceNumber sequence, SequenceNumber snapshot_sequence) const {
return txn_db_->IsInSnapshot(sequence, snapshot_sequence);
bool snapshot_released = false;
// TODO(myabandeh): set min_uncommitted
bool in_snapshot = txn_db_->IsInSnapshot(
sequence, snapshot_sequence, 0 /*min_uncommitted*/, &snapshot_released);
if (snapshot_released) {
return SnapshotCheckerResult::kSnapshotReleased;
}
return in_snapshot ? SnapshotCheckerResult::kInSnapshot
: SnapshotCheckerResult::kNotInSnapshot;
}
#endif // ROCKSDB_LITE

@ -2243,6 +2243,101 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
}
}
// Releases snapshot1 from inside the "CompactionIterator:AfterInit" sync
// point while Flush() is running, then checks that key1 still reads as
// value1_2 at the tip and as value1_1 through snapshot2.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
// key1 gets a committed value first, then a second value that is only
// prepared (not yet committed) when the two snapshots are taken.
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
auto* transaction =
db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
ASSERT_OK(transaction->SetName("txn"));
ASSERT_OK(transaction->Put("key1", "value1_2"));
ASSERT_OK(transaction->Prepare());
auto snapshot1 = db->GetSnapshot();
// Increment sequence number.
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
auto snapshot2 = db->GetSnapshot();
// The commit happens after both snapshots, so both must keep seeing
// value1_1 (verified right below).
ASSERT_OK(transaction->Commit());
delete transaction;
VerifyKeys({{"key1", "value1_2"}});
VerifyKeys({{"key1", "value1_1"}}, snapshot1);
VerifyKeys({{"key1", "value1_1"}}, snapshot2);
// Add a flush to avoid compaction to fallback to trivial move.
// NOTE(review): no flush is issued at this point; the only Flush() in this
// test comes after the callback is installed, so the comment above may be
// stale.
auto callback = [&](void*) {
// Release snapshot1 after CompactionIterator init.
// CompactionIterator need to figure out the earliest snapshot
// that can see key1:value1_2 is kMaxSequenceNumber, not
// snapshot1 or snapshot2.
db->ReleaseSnapshot(snapshot1);
// Add some keys to advance max_evicted_seq.
ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
};
SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
callback);
SyncPoint::GetInstance()->EnableProcessing();
// The flush is what drives the sync point (and thus the callback) here.
ASSERT_OK(db->Flush(FlushOptions()));
VerifyKeys({{"key1", "value1_2"}});
VerifyKeys({{"key1", "value1_1"}}, snapshot2);
db->ReleaseSnapshot(snapshot2);
// NOTE(review): processing is enabled above but never disabled in this
// test; confirm whether a DisableProcessing() call is expected here.
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Releases snapshot1 (the earlier of two snapshots) from inside the
// "CompactionIterator:AfterInit" sync point during CompactRange(), then
// checks that both the delete tombstone and the original put for key1 are
// still present after the compaction.
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
// key1 gets a committed value, then a delete that is only prepared (not yet
// committed) when the two snapshots are taken.
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
auto* transaction =
db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
ASSERT_OK(transaction->SetName("txn"));
ASSERT_OK(transaction->Delete("key1"));
ASSERT_OK(transaction->Prepare());
// Latest sequence number right after Prepare(); used below as the expected
// sequence number of the delete tombstone.
SequenceNumber del_seq = db->GetLatestSequenceNumber();
auto snapshot1 = db->GetSnapshot();
// Increment sequence number.
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
auto snapshot2 = db->GetSnapshot();
// The commit happens after both snapshots, so both must keep seeing value1
// (verified right below).
ASSERT_OK(transaction->Commit());
delete transaction;
VerifyKeys({{"key1", "NOT_FOUND"}});
VerifyKeys({{"key1", "value1"}}, snapshot1);
VerifyKeys({{"key1", "value1"}}, snapshot2);
// This flush runs before the callback is installed, so it is not affected
// by the snapshot release below.
ASSERT_OK(db->Flush(FlushOptions()));
auto callback = [&](void* compaction) {
// Release snapshot1 after CompactionIterator init.
// CompactionIterator need to double check and find out snapshot2 is now
// the earliest existing snapshot.
// The callback does nothing when the sync point argument is null
// (presumably the flush path passes null and compaction passes its
// compaction object; verify against the sync point's call site).
if (compaction != nullptr) {
db->ReleaseSnapshot(snapshot1);
// Add some keys to advance max_evicted_seq.
ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
}
};
SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
callback);
SyncPoint::GetInstance()->EnableProcessing();
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Only verify for key1. Both the put and delete for the key should be kept.
// Since the delete tombstone is not visible to snapshot2, we need to keep
// at least one version of the key, for write-conflict check.
VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
{"key1", "value1", 0, kTypeValue}});
db->ReleaseSnapshot(snapshot2);
// NOTE(review): processing is enabled above but never disabled in this
// test; confirm whether a DisableProcessing() call is expected here.
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// A more complex test to verify compaction/flush should keep keys visible
// to snapshots.
TEST_P(WritePreparedTransactionTest,

@ -124,6 +124,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
"IsInSnapshot %" PRIu64 " in %" PRIu64
" min_uncommitted %" PRIu64,
prep_seq, snapshot_seq, min_uncommitted);
// Caller is responsible to initialize snap_released.
assert(snap_released == nullptr || *snap_released == false);
// Here we try to infer the return value without looking into prepare list.
// This would help avoiding synchronization over a shared map.
// TODO(myabandeh): optimize this. This sequence of checks must be correct

Loading…
Cancel
Save