WritePrepared: fix two versions in compaction see different status for released snapshots (#4890)

Summary:
Fix how CompactionIterator::findEarliestVisibleSnapshots handles released snapshot. It fixing the two scenarios:

Scenario 1:
key1 has two values v1 and v2. There're two snapshots s1 and s2 taken after v1 and v2 are committed. Right after compaction output v2, s1 is released. Now findEarliestVisibleSnapshot may see s1 being released, and return the next snapshot, which is s2. That's larger than v2's earliest visible snapshot, which was s1.
The fix: the only place we check against last snapshot and current key snapshot is when we decide whether to compact out a value if it is hidden by a later value. In the check if we see current snapshot is even larger than last snapshot, we know last snapshot is released, and we are safe to compact out current key.

Scenario 2:
key1 has two values v1 and v2. there are two snapshots s1 and s2 taken after v1 and v2 are committed. During compaction before we process the key, s1 is released. When compaction process v2, snapshot checker may return kSnapshotReleased, and the earliest visible snapshot for v2 become s2. When compaction process v1, snapshot checker may return kIsInSnapshot (for WritePrepared transaction, it could be because v1 is still in commit cache). The result will become inconsistent here.
The fix: remember the set of released snapshots ever reported by snapshot checker, and ignore them when finding result for findEarliestVisibleSnapshot.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4890

Differential Revision: D13705538

Pulled By: maysamyabandeh

fbshipit-source-id: e577f0d9ee1ff5a6035f26859e56902ecc85a5a4
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent 7fd9813b9f
commit b1ad6ebba8
  1. 39
      db/compaction_iterator.cc
  2. 7
      db/compaction_iterator.h
  3. 108
      utilities/transactions/write_prepared_transaction_test.cc
  4. 19
      utilities/transactions/write_prepared_txn.cc
  5. 41
      utilities/transactions/write_prepared_txn_db.h
  6. 17
      utilities/transactions/write_unprepared_txn.cc
  7. 38
      utilities/transactions/write_unprepared_txn_db.h

@ -258,6 +258,7 @@ void CompactionIterator::NextFromInput() {
valid_ = true; valid_ = true;
break; break;
} }
TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
// Update input statistics // Update input statistics
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) { if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
@ -473,17 +474,30 @@ void CompactionIterator::NextFromInput() {
if (valid_) { if (valid_) {
at_next_ = true; at_next_ = true;
} }
} else if (last_snapshot == current_user_key_snapshot_) { } else if (last_snapshot == current_user_key_snapshot_ ||
(last_snapshot > 0 &&
last_snapshot < current_user_key_snapshot_)) {
// If the earliest snapshot is which this key is visible in // If the earliest snapshot is which this key is visible in
// is the same as the visibility of a previous instance of the // is the same as the visibility of a previous instance of the
// same key, then this kv is not visible in any snapshot. // same key, then this kv is not visible in any snapshot.
// Hidden by an newer entry for same user key // Hidden by an newer entry for same user key
// TODO(noetzli): why not > ?
// //
// Note: Dropping this key will not affect TransactionDB write-conflict // Note: Dropping this key will not affect TransactionDB write-conflict
// checking since there has already been a record returned for this key // checking since there has already been a record returned for this key
// in this snapshot. // in this snapshot.
assert(last_sequence >= current_user_key_sequence_); assert(last_sequence >= current_user_key_sequence_);
// Note2: if last_snapshot < current_user_key_snapshot, it can only
// mean last_snapshot is released between we process last value and
// this value, and findEarliestVisibleSnapshot returns the next snapshot
// as current_user_key_snapshot. In this case last value and current
// value are both in current_user_key_snapshot currently.
assert(last_snapshot == current_user_key_snapshot_ ||
(snapshot_checker_ != nullptr &&
snapshot_checker_->CheckInSnapshot(current_user_key_sequence_,
last_snapshot) ==
SnapshotCheckerResult::kSnapshotReleased));
++iter_stats_.num_record_drop_hidden; // (A) ++iter_stats_.num_record_drop_hidden; // (A)
input_->Next(); input_->Next();
} else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
@ -639,13 +653,23 @@ inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
*prev_snapshot = *std::prev(snapshots_iter); *prev_snapshot = *std::prev(snapshots_iter);
assert(*prev_snapshot < in); assert(*prev_snapshot < in);
} }
if (snapshot_checker_ == nullptr) {
return snapshots_iter != snapshots_->end()
? *snapshots_iter : kMaxSequenceNumber;
}
bool has_released_snapshot = !released_snapshots_.empty();
for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) { for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
auto cur = *snapshots_iter; auto cur = *snapshots_iter;
assert(in <= cur); assert(in <= cur);
if (snapshot_checker_ == nullptr || // Skip if cur is in released_snapshots.
snapshot_checker_->CheckInSnapshot(in, cur) == if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
SnapshotCheckerResult::kInSnapshot) { continue;
}
auto res = snapshot_checker_->CheckInSnapshot(in, cur);
if (res == SnapshotCheckerResult::kInSnapshot) {
return cur; return cur;
} else if (res == SnapshotCheckerResult::kSnapshotReleased) {
released_snapshots_.insert(cur);
} }
*prev_snapshot = cur; *prev_snapshot = cur;
} }
@ -667,7 +691,12 @@ bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
auto in_snapshot = auto in_snapshot =
snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_); snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) { while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) {
// Avoid the the current earliest_snapshot_ being return as
// earliest visible snapshot for the next value. So if a value's sequence
// is zero-ed out by PrepareOutput(), the next value will be compact out.
released_snapshots_.insert(earliest_snapshot_);
earliest_snapshot_iter_++; earliest_snapshot_iter_++;
if (earliest_snapshot_iter_ == snapshots_->end()) { if (earliest_snapshot_iter_ == snapshots_->end()) {
earliest_snapshot_ = kMaxSequenceNumber; earliest_snapshot_ = kMaxSequenceNumber;
} else { } else {

@ -7,6 +7,7 @@
#include <algorithm> #include <algorithm>
#include <deque> #include <deque>
#include <string> #include <string>
#include <unordered_set>
#include <vector> #include <vector>
#include "db/compaction.h" #include "db/compaction.h"
@ -144,6 +145,12 @@ class CompactionIterator {
const Comparator* cmp_; const Comparator* cmp_;
MergeHelper* merge_helper_; MergeHelper* merge_helper_;
const std::vector<SequenceNumber>* snapshots_; const std::vector<SequenceNumber>* snapshots_;
// List of snapshots released during compaction.
// findEarliestVisibleSnapshot() find them out from return of
// snapshot_checker, and make sure they will not be returned as
// earliest visible snapshot of an older value.
// See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
std::unordered_set<SequenceNumber> released_snapshots_;
std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_; std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
const SequenceNumber earliest_write_conflict_snapshot_; const SequenceNumber earliest_write_conflict_snapshot_;
const SnapshotChecker* const snapshot_checker_; const SnapshotChecker* const snapshot_checker_;

@ -2242,6 +2242,10 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
} }
} }
// Insert two values, v1 and v2, for a key. Between prepare and commit of v2
// take two snapshots, s1 and s2. Release s1 during compaction.
// Test to make sure compaction doesn't get confused and think s1 can see both
// values, and thus compact out the older value by mistake.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) { TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache const size_t commit_cache_bits = 0; // minimum commit cache
@ -2285,6 +2289,110 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
} }
// Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
// after committing v2. Release s1 during compaction, right after compaction
// processes v2 and before processes v1. Test to make sure compaction doesn't
// get confused and believe v1 and v2 are visible to different snapshot
// (v1 by s2, v2 by s1) and refuse to compact out v1.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
SequenceNumber v2_seq = db->GetLatestSequenceNumber();
auto* s1 = db->GetSnapshot();
// Advance sequence number.
ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
auto* s2 = db->GetSnapshot();
int count_value = 0;
auto callback = [&](void* arg) {
auto* ikey = reinterpret_cast<ParsedInternalKey*>(arg);
if (ikey->user_key == "key1") {
count_value++;
if (count_value == 2) {
// Processing v1.
db->ReleaseSnapshot(s1);
// Add some keys to advance max_evicted_seq and update
// old_commit_map.
ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
}
}
};
SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
callback);
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db->Flush(FlushOptions()));
// value1 should be compact out.
VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
// cleanup
db->ReleaseSnapshot(s2);
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Insert two values, v1 and v2, for a key. Insert another dummy key
// so to evict the commit cache for v2, while v1 is still in commit cache.
// Take two snapshots, s1 and s2. Release s1 during compaction.
// Since commit cache for v2 is evicted, and old_commit_map don't have
// s1 (it is released),
// TODO(myabandeh): how can we be sure that the v2's commit info is evicted
// (and not v1's)? Instead of putting a dummy, we can directly call
// AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 1; // commit cache size = 2
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
// Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
// It also advance max_evicted_seq and can trigger old_commit_map cleanup.
auto add_dummy = [&]() {
auto* txn_dummy =
db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
ASSERT_OK(txn_dummy->SetName("txn_dummy"));
ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
ASSERT_OK(txn_dummy->Prepare());
ASSERT_OK(txn_dummy->Commit());
delete txn_dummy;
};
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
auto* txn =
db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
ASSERT_OK(txn->SetName("txn"));
ASSERT_OK(txn->Put("key1", "value2"));
ASSERT_OK(txn->Prepare());
// TODO(myabandeh): replace it with GetId()?
auto v2_seq = db->GetLatestSequenceNumber();
ASSERT_OK(txn->Commit());
delete txn;
auto* s1 = db->GetSnapshot();
// Dummy key to advance sequence number.
add_dummy();
auto* s2 = db->GetSnapshot();
auto callback = [&](void*) {
db->ReleaseSnapshot(s1);
// Add some dummy entries to trigger s1 being cleanup from old_commit_map.
add_dummy();
add_dummy();
};
SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
callback);
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db->Flush(FlushOptions()));
// value1 should be compact out.
VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});
db->ReleaseSnapshot(s2);
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) { TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
const size_t snapshot_cache_bits = 7; // same as default const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache const size_t commit_cache_bits = 0; // minimum commit cache

@ -357,9 +357,8 @@ Status WritePreparedTxn::RollbackInternal() {
prepare_seq); prepare_seq);
// Commit the batch by writing an empty batch to the queue that will release // Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers. // the commit sequence number to readers.
const size_t ZERO_COMMITS = 0; WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( wpt_db_, db_impl_, GetId(), prepare_seq, prepare_batch_cnt_);
wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
WriteBatch empty_batch; WriteBatch empty_batch;
empty_batch.PutLogData(Slice()); empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator // In the absence of Prepare markers, use Noop as a batch separator
@ -368,18 +367,10 @@ Status WritePreparedTxn::RollbackInternal() {
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare); &update_commit_map_with_prepare);
assert(!s.ok() || seq_used != kMaxSequenceNumber); assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Mark the txn as rolled back ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
uint64_t& rollback_seq = seq_used; "RollbackInternal (status=%s) commit: %" PRIu64,
s.ToString().c_str(), GetId());
if (s.ok()) { if (s.ok()) {
// Note: it is safe to do it after PreReleaseCallback via WriteImpl since
// all the writes by the prpared batch are already blinded by the rollback
// batch. The only reason we commit the prepared batch here is to benefit
// from the existing mechanism in CommitCache that takes care of the rare
// cases that the prepare seq is visible to a snsapshot but max evicted seq
// advances that prepare seq.
for (size_t i = 0; i < prepare_batch_cnt_; i++) {
wpt_db_->AddCommitted(GetId() + i, rollback_seq);
}
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_); wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
} }

@ -791,6 +791,47 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
bool publish_seq_; bool publish_seq_;
}; };
// For two_write_queues commit both the aborted batch and the cleanup batch and then published the seq
class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
public:
WritePreparedRollbackPreReleaseCallback(WritePreparedTxnDB* db,
DBImpl* db_impl,
SequenceNumber prep_seq,
SequenceNumber rollback_seq,
size_t prep_batch_cnt)
: db_(db),
db_impl_(db_impl),
prep_seq_(prep_seq),
rollback_seq_(rollback_seq),
prep_batch_cnt_(prep_batch_cnt) {
assert(prep_seq != kMaxSequenceNumber);
assert(rollback_seq != kMaxSequenceNumber);
assert(prep_batch_cnt_ > 0);
}
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled) override {
assert(is_mem_disabled); // implies the 2nd queue
#ifdef NDEBUG
(void)is_mem_disabled;
#endif
const uint64_t last_commit_seq = commit_seq;
db_->AddCommitted(rollback_seq_, last_commit_seq);
for (size_t i = 0; i < prep_batch_cnt_; i++) {
db_->AddCommitted(prep_seq_ + i, last_commit_seq);
}
db_impl_->SetLastPublishedSequence(last_commit_seq);
return Status::OK();
}
private:
WritePreparedTxnDB* db_;
DBImpl* db_impl_;
SequenceNumber prep_seq_;
SequenceNumber rollback_seq_;
size_t prep_batch_cnt_;
};
// Count the number of sub-batches inside a batch. A sub-batch does not have // Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys. // duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler { struct SubBatchCounter : public WriteBatch::Handler {

@ -447,9 +447,8 @@ Status WriteUnpreparedTxn::RollbackInternal() {
prepare_seq); prepare_seq);
// Commit the batch by writing an empty batch to the queue that will release // Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers. // the commit sequence number to readers.
const size_t ZERO_COMMITS = 0; WriteUnpreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( wpt_db_, db_impl_, unprep_seqs_, prepare_seq);
wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS);
WriteBatch empty_batch; WriteBatch empty_batch;
empty_batch.PutLogData(Slice()); empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator // In the absence of Prepare markers, use Noop as a batch separator
@ -459,19 +458,7 @@ Status WriteUnpreparedTxn::RollbackInternal() {
&update_commit_map_with_prepare); &update_commit_map_with_prepare);
assert(!s.ok() || seq_used != kMaxSequenceNumber); assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Mark the txn as rolled back // Mark the txn as rolled back
uint64_t& rollback_seq = seq_used;
if (s.ok()) { if (s.ok()) {
// Note: it is safe to do it after PreReleaseCallback via WriteImpl since
// all the writes by the prpared batch are already blinded by the rollback
// batch. The only reason we commit the prepared batch here is to benefit
// from the existing mechanism in CommitCache that takes care of the rare
// cases that the prepare seq is visible to a snsapshot but max evicted seq
// advances that prepare seq.
for (const auto& seq : unprep_seqs_) {
for (size_t i = 0; i < seq.second; i++) {
wpt_db_->AddCommitted(seq.first + i, rollback_seq);
}
}
for (const auto& seq : unprep_seqs_) { for (const auto& seq : unprep_seqs_) {
wpt_db_->RemovePrepared(seq.first, seq.second); wpt_db_->RemovePrepared(seq.first, seq.second);
} }

@ -106,6 +106,44 @@ class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
bool publish_seq_; bool publish_seq_;
}; };
class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback {
// TODO(lth): Reduce code duplication with
// WritePreparedCommitEntryPreReleaseCallback
public:
WriteUnpreparedRollbackPreReleaseCallback(
WritePreparedTxnDB* db, DBImpl* db_impl,
const std::map<SequenceNumber, size_t>& unprep_seqs,
SequenceNumber rollback_seq)
: db_(db),
db_impl_(db_impl),
unprep_seqs_(unprep_seqs),
rollback_seq_(rollback_seq) {
assert(unprep_seqs.size() > 0);
assert(db_impl_->immutable_db_options().two_write_queues);
}
virtual Status Callback(SequenceNumber commit_seq, bool is_mem_disabled
__attribute__((__unused__))) override {
assert(is_mem_disabled); // implies the 2nd queue
const uint64_t last_commit_seq = commit_seq;
db_->AddCommitted(rollback_seq_, last_commit_seq);
// Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt.
for (const auto& s : unprep_seqs_) {
for (size_t i = 0; i < s.second; i++) {
db_->AddCommitted(s.first + i, last_commit_seq);
}
}
db_impl_->SetLastPublishedSequence(last_commit_seq);
return Status::OK();
}
private:
WritePreparedTxnDB* db_;
DBImpl* db_impl_;
const std::map<SequenceNumber, size_t>& unprep_seqs_;
SequenceNumber rollback_seq_;
};
struct KeySetBuilder : public WriteBatch::Handler { struct KeySetBuilder : public WriteBatch::Handler {
WriteUnpreparedTxn* txn_; WriteUnpreparedTxn* txn_;
bool rollback_merge_operands_; bool rollback_merge_operands_;

Loading…
Cancel
Save