WriteUnPrepared: Use WriteUnpreparedTxnReadCallback for MultiGet (#5634)

Summary:
The `TransactionTest.MultiGetBatchedTest` were failing with unprepared batches because we were not using the correct callbacks. Override MultiGet to pass down the correct ReadCallback. A similar problem is also fixed in WritePrepared.

This PR also fixes an issue similar to (https://github.com/facebook/rocksdb/pull/5147), but for MultiGet instead of Get.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5634

Differential Revision: D16552674

Pulled By: lth

fbshipit-source-id: 736eaf8e919c6b13d5f5655b1c0d36b57ad04804
main
Manuel Ung 5 years ago committed by Facebook Github Bot
parent e648c1d9eb
commit 399f477818
  1. 18
      db/db_impl/db_impl.cc
  2. 19
      utilities/transactions/write_prepared_txn.cc
  3. 7
      utilities/transactions/write_prepared_txn.h
  4. 20
      utilities/transactions/write_unprepared_txn.cc
  5. 7
      utilities/transactions/write_unprepared_txn.h

@ -1857,6 +1857,24 @@ void DBImpl::MultiGetImpl(
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
if (callback) {
// The unprep_seqs are not published for write unprepared, so it could be
// that max_visible_seq is larger. Seek to the std::max of the two.
// However, we still want our callback to contain the actual snapshot so
// that it can do the correct visibility filtering.
callback->Refresh(snapshot);
// Internally, WriteUnpreparedTxnReadCallback::Refresh would set
// max_visible_seq = max(max_visible_seq, snapshot)
//
// Currently, the commented out assert is broken by
// InvalidSnapshotReadCallback, but if write unprepared recovery followed
// the regular transaction flow, then this special read callback would not
// be needed.
//
// assert(callback->max_visible_seq() >= snapshot);
snapshot = callback->max_visible_seq();
}
}
// For each of the given keys, apply the entire "get" process as follows:

@ -40,6 +40,25 @@ void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
prepare_batch_cnt_ = 0;
}
void WritePreparedTxn::MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
bool sorted_input) {
SequenceNumber min_uncommitted, snap_seq;
const bool backed_by_snapshot =
wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
keys, values, statuses, sorted_input,
&callback);
if (UNLIKELY(!wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
for (size_t i = 0; i < num_keys; i++) {
statuses[i] = Status::TryAgain();
}
}
}
Status WritePreparedTxn::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {

@ -53,6 +53,13 @@ class WritePreparedTxn : public PessimisticTransaction {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using Transaction::MultiGet;
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
bool sorted_input = false) override;
// Note: The behavior is undefined in presence of interleaved writes to the
// same transaction.
// To make WAL commit markers visible, the snapshot will be

@ -524,6 +524,26 @@ void WriteUnpreparedTxn::Clear() {
TransactionBaseImpl::Clear();
}
void WriteUnpreparedTxn::MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
bool sorted_input) {
SequenceNumber min_uncommitted, snap_seq;
const bool backed_by_snapshot =
wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
unprep_seqs_);
write_batch_.MultiGetFromBatchAndDB(db_, options, column_family, num_keys,
keys, values, statuses, sorted_input,
&callback);
if (UNLIKELY(!wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
for (size_t i = 0; i < num_keys; i++) {
statuses[i] = Status::TryAgain();
}
}
}
Status WriteUnpreparedTxn::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {

@ -146,6 +146,13 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using Transaction::MultiGet;
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
bool sorted_input = false) override;
using Transaction::GetIterator;
virtual Iterator* GetIterator(const ReadOptions& options) override;
virtual Iterator* GetIterator(const ReadOptions& options,

Loading…
Cancel
Save