WriteUnPrepared: less virtual in iterator callback (#5049)

Summary:
WriteUnPrepared adds a virtual function, MaxUnpreparedSequenceNumber, to ReadCallback. It returns 0 unless WriteUnPrepared is enabled and the transaction has uncommitted data written to the DB. Together with the snapshot sequence number, this determines the last sequence number that is visible to reads.
The patch clarifies the guarantees of the GetIterator API in WriteUnPrepared transactions and makes use of them to statically initialize the read callback and thus avoids the virtual call.
Furthermore, it increases the minimum value for min_uncommitted from 0 to 1, since seq 0 is used only for last-level keys, which are committed in all snapshots.

The following benchmark shows 0.26% higher throughput in the seekrandom benchmark (baseline db_bench first, patched db_bench_lessvirtual3 second).

Benchmark:
./db_bench --benchmarks=fillrandom --use_existing_db=0 --num=1000000 --db=/dev/shm/dbbench

./db_bench --benchmarks=seekrandom[X10] --use_existing_db=1 --db=/dev/shm/dbbench --num=1000000 --duration=60 --seek_nexts=100
seekrandom [AVG    10 runs] : 20355 ops/sec;  225.2 MB/sec
seekrandom [MEDIAN 10 runs] : 20425 ops/sec;  225.9 MB/sec

./db_bench_lessvirtual3 --benchmarks=seekrandom[X10] --use_existing_db=1 --db=/dev/shm/dbbench --num=1000000 --duration=60 --seek_nexts=100
seekrandom [AVG    10 runs] : 20409 ops/sec;  225.8 MB/sec
seekrandom [MEDIAN 10 runs] : 20487 ops/sec;  226.6 MB/sec
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5049

Differential Revision: D14366459

Pulled By: maysamyabandeh

fbshipit-source-id: ebaff8908332a5ae9af7defeadabcb624be660ef
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent d9d3cacaf5
commit 14b3f683a1
  1. 2
      db/db_impl.cc
  2. 40
      db/db_iter.cc
  3. 13
      db/db_iterator_test.cc
  4. 4
      db/db_merge_operator_test.cc
  5. 3
      db/db_test2.cc
  6. 39
      db/read_callback.h
  7. 2
      db/snapshot_impl.h
  8. 2
      include/rocksdb/types.h
  9. 2
      utilities/transactions/snapshot_checker.cc
  10. 5
      utilities/transactions/write_prepared_transaction_test.cc
  11. 6
      utilities/transactions/write_prepared_txn.cc
  12. 8
      utilities/transactions/write_prepared_txn.h
  13. 9
      utilities/transactions/write_prepared_txn_db.h
  14. 18
      utilities/transactions/write_unprepared_transaction_test.cc
  15. 11
      utilities/transactions/write_unprepared_txn.cc
  16. 24
      utilities/transactions/write_unprepared_txn.h
  17. 19
      utilities/transactions/write_unprepared_txn_db.cc

@ -1374,7 +1374,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
snapshot = snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_; reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
if (callback) { if (callback) {
snapshot = std::max(snapshot, callback->MaxUnpreparedSequenceNumber()); snapshot = std::max(snapshot, callback->max_visible_seq());
} }
} else { } else {
// Since we get and reference the super version before getting // Since we get and reference the super version before getting

@ -238,7 +238,12 @@ class DBIter final: public Iterator {
void SeekToFirst() override; void SeekToFirst() override;
void SeekToLast() override; void SeekToLast() override;
Env* env() { return env_; } Env* env() { return env_; }
void set_sequence(uint64_t s) { sequence_ = s; } void set_sequence(uint64_t s) {
sequence_ = s;
if (read_callback_) {
read_callback_->Refresh(s);
}
}
void set_valid(bool v) { valid_ = v; } void set_valid(bool v) { valid_ = v; }
private: private:
@ -258,7 +263,7 @@ class DBIter final: public Iterator {
void PrevInternal(); void PrevInternal();
bool TooManyInternalKeysSkipped(bool increment = true); bool TooManyInternalKeysSkipped(bool increment = true);
bool IsVisible(SequenceNumber sequence); inline bool IsVisible(SequenceNumber sequence);
// CanReseekToSkip() returns whether the iterator can use the optimization // CanReseekToSkip() returns whether the iterator can use the optimization
// where it reseek by sequence number to get the next key when there are too // where it reseek by sequence number to get the next key when there are too
@ -266,12 +271,6 @@ class DBIter final: public Iterator {
// sequence number does not guarantee that it is visible. // sequence number does not guarantee that it is visible.
inline bool CanReseekToSkip(); inline bool CanReseekToSkip();
// MaxVisibleSequenceNumber() returns the maximum visible sequence number
// for this snapshot. This sequence number may be greater than snapshot
// seqno because uncommitted data written to DB for write unprepared will
// have a higher sequence number.
inline SequenceNumber MaxVisibleSequenceNumber();
// Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
// is called // is called
void TempPinData() { void TempPinData() {
@ -311,6 +310,8 @@ class DBIter final: public Iterator {
const MergeOperator* const merge_operator_; const MergeOperator* const merge_operator_;
InternalIterator* iter_; InternalIterator* iter_;
ReadCallback* read_callback_; ReadCallback* read_callback_;
// Max visible sequence number. It is normally the snapshot seq unless we have
// uncommitted data in db as in WriteUnPrepared.
SequenceNumber sequence_; SequenceNumber sequence_;
IterKey saved_key_; IterKey saved_key_;
@ -1246,21 +1247,15 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) {
} }
bool DBIter::IsVisible(SequenceNumber sequence) { bool DBIter::IsVisible(SequenceNumber sequence) {
return sequence <= MaxVisibleSequenceNumber() &&
(read_callback_ == nullptr || read_callback_->IsVisible(sequence));
}
bool DBIter::CanReseekToSkip() {
return read_callback_ == nullptr ||
read_callback_->MaxUnpreparedSequenceNumber() == 0;
}
SequenceNumber DBIter::MaxVisibleSequenceNumber() {
if (read_callback_ == nullptr) { if (read_callback_ == nullptr) {
return sequence_; return sequence <= sequence_;
} else {
return read_callback_->IsVisible(sequence);
} }
}
return std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber()); bool DBIter::CanReseekToSkip() {
return read_callback_ == nullptr || read_callback_->CanReseekToSkip();
} }
void DBIter::Seek(const Slice& target) { void DBIter::Seek(const Slice& target) {
@ -1270,7 +1265,7 @@ void DBIter::Seek(const Slice& target) {
ReleaseTempPinnedData(); ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter(); ResetInternalKeysSkippedCounter();
SequenceNumber seq = MaxVisibleSequenceNumber(); SequenceNumber seq = sequence_;
saved_key_.Clear(); saved_key_.Clear();
saved_key_.SetInternalKey(target, seq); saved_key_.SetInternalKey(target, seq);
@ -1556,6 +1551,9 @@ Status ArenaWrappedDBIter::Refresh() {
new (&arena_) Arena(); new (&arena_) Arena();
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
if (read_callback_) {
read_callback_->Refresh(latest_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options, Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_, cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,

@ -20,7 +20,10 @@ namespace rocksdb {
// A dumb ReadCallback which saying every key is committed. // A dumb ReadCallback which saying every key is committed.
class DummyReadCallback : public ReadCallback { class DummyReadCallback : public ReadCallback {
public:
DummyReadCallback() : ReadCallback(kMaxSequenceNumber) {}
bool IsVisibleFullCheck(SequenceNumber /*seq*/) override { return true; } bool IsVisibleFullCheck(SequenceNumber /*seq*/) override { return true; }
void SetSnapshot(SequenceNumber seq) { max_visible_seq_ = seq; }
}; };
// Test param: // Test param:
@ -39,6 +42,7 @@ class DBIteratorTest : public DBTestBase,
SequenceNumber seq = read_options.snapshot != nullptr SequenceNumber seq = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber() ? read_options.snapshot->GetSequenceNumber()
: db_->GetLatestSequenceNumber(); : db_->GetLatestSequenceNumber();
read_callback_.SetSnapshot(seq);
bool use_read_callback = GetParam(); bool use_read_callback = GetParam();
ReadCallback* read_callback = use_read_callback ? &read_callback_ : nullptr; ReadCallback* read_callback = use_read_callback ? &read_callback_ : nullptr;
return dbfull()->NewIteratorImpl(read_options, cfd, seq, read_callback); return dbfull()->NewIteratorImpl(read_options, cfd, seq, read_callback);
@ -2476,15 +2480,12 @@ class DBIteratorWithReadCallbackTest : public DBIteratorTest {};
TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) { TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) {
class TestReadCallback : public ReadCallback { class TestReadCallback : public ReadCallback {
public: public:
explicit TestReadCallback(SequenceNumber last_visible_seq) explicit TestReadCallback(SequenceNumber max_visible_seq)
: last_visible_seq_(last_visible_seq) {} : ReadCallback(max_visible_seq) {}
bool IsVisibleFullCheck(SequenceNumber seq) override { bool IsVisibleFullCheck(SequenceNumber seq) override {
return seq <= last_visible_seq_; return seq <= max_visible_seq_;
} }
private:
SequenceNumber last_visible_seq_;
}; };
ASSERT_OK(Put("foo", "v1")); ASSERT_OK(Put("foo", "v1"));

@ -18,7 +18,9 @@ class TestReadCallback : public ReadCallback {
public: public:
TestReadCallback(SnapshotChecker* snapshot_checker, TestReadCallback(SnapshotChecker* snapshot_checker,
SequenceNumber snapshot_seq) SequenceNumber snapshot_seq)
: snapshot_checker_(snapshot_checker), snapshot_seq_(snapshot_seq) {} : ReadCallback(snapshot_seq),
snapshot_checker_(snapshot_checker),
snapshot_seq_(snapshot_seq) {}
bool IsVisibleFullCheck(SequenceNumber seq) override { bool IsVisibleFullCheck(SequenceNumber seq) override {
return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) == return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) ==

@ -2729,7 +2729,8 @@ TEST_F(DBTest2, ReadCallbackTest) {
class TestReadCallback : public ReadCallback { class TestReadCallback : public ReadCallback {
public: public:
explicit TestReadCallback(SequenceNumber snapshot) : snapshot_(snapshot) {} explicit TestReadCallback(SequenceNumber snapshot)
: ReadCallback(snapshot), snapshot_(snapshot) {}
bool IsVisibleFullCheck(SequenceNumber seq) override { bool IsVisibleFullCheck(SequenceNumber seq) override {
return seq <= snapshot_; return seq <= snapshot_;
} }

@ -11,10 +11,10 @@ namespace rocksdb {
class ReadCallback { class ReadCallback {
public: public:
ReadCallback() {} ReadCallback(SequenceNumber last_visible_seq)
ReadCallback(SequenceNumber snapshot) : snapshot_(snapshot) {} : max_visible_seq_(last_visible_seq) {}
ReadCallback(SequenceNumber snapshot, SequenceNumber min_uncommitted) ReadCallback(SequenceNumber last_visible_seq, SequenceNumber min_uncommitted)
: snapshot_(snapshot), min_uncommitted_(min_uncommitted) {} : max_visible_seq_(last_visible_seq), min_uncommitted_(min_uncommitted) {}
virtual ~ReadCallback() {} virtual ~ReadCallback() {}
@ -23,30 +23,33 @@ class ReadCallback {
virtual bool IsVisibleFullCheck(SequenceNumber seq) = 0; virtual bool IsVisibleFullCheck(SequenceNumber seq) = 0;
inline bool IsVisible(SequenceNumber seq) { inline bool IsVisible(SequenceNumber seq) {
if (seq == 0 || seq < min_uncommitted_) { assert(min_uncommitted_ > 0);
assert(seq <= snapshot_); assert(min_uncommitted_ >= kMinUnCommittedSeq);
if (seq < min_uncommitted_) { // handles seq == 0 as well
assert(seq <= max_visible_seq_);
return true; return true;
} else if (snapshot_ < seq) { } else if (max_visible_seq_ < seq) {
assert(seq != 0);
return false; return false;
} else { } else {
assert(seq != 0); // already handled in the first if-then clause
return IsVisibleFullCheck(seq); return IsVisibleFullCheck(seq);
} }
} }
// This is called to determine the maximum visible sequence number for the inline SequenceNumber max_visible_seq() { return max_visible_seq_; }
// current transaction for read-your-own-write semantics. This is so that
// for write unprepared, we will not skip keys that are written by the virtual void Refresh(SequenceNumber seq) { max_visible_seq_ = seq; }
// current transaction with the seek to snapshot optimization.
// // Refer to DBIter::CanReseekToSkip
// For other uses, this returns zero, meaning that the current snapshot virtual bool CanReseekToSkip() { return true; }
// sequence number is the maximum visible sequence number.
inline virtual SequenceNumber MaxUnpreparedSequenceNumber() { return 0; };
protected: protected:
// The snapshot at which the read is performed. // The max visible seq, it is usually the snapshot but could be larger if
const SequenceNumber snapshot_ = kMaxSequenceNumber; // transaction has its own writes written to db.
SequenceNumber max_visible_seq_ = kMaxSequenceNumber;
// Any seq less than min_uncommitted_ is committed. // Any seq less than min_uncommitted_ is committed.
const SequenceNumber min_uncommitted_ = 0; const SequenceNumber min_uncommitted_ = kMinUnCommittedSeq;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -24,7 +24,7 @@ class SnapshotImpl : public Snapshot {
// It indicates the smallest uncommitted data at the time the snapshot was // It indicates the smallest uncommitted data at the time the snapshot was
// taken. This is currently used by WritePrepared transactions to limit the // taken. This is currently used by WritePrepared transactions to limit the
// scope of queries to IsInSnpashot. // scope of queries to IsInSnpashot.
SequenceNumber min_uncommitted_ = 0; SequenceNumber min_uncommitted_ = kMinUnCommittedSeq;
virtual SequenceNumber GetSequenceNumber() const override { return number_; } virtual SequenceNumber GetSequenceNumber() const override { return number_; }

@ -15,6 +15,8 @@ namespace rocksdb {
// Represents a sequence number in a WAL file. // Represents a sequence number in a WAL file.
typedef uint64_t SequenceNumber; typedef uint64_t SequenceNumber;
const SequenceNumber kMinUnCommittedSeq = 1; // 0 is always committed
// User-oriented representation of internal key types. // User-oriented representation of internal key types.
enum EntryType { enum EntryType {
kEntryPut, kEntryPut,

@ -35,7 +35,7 @@ SnapshotCheckerResult WritePreparedSnapshotChecker::CheckInSnapshot(
bool snapshot_released = false; bool snapshot_released = false;
// TODO(myabandeh): set min_uncommitted // TODO(myabandeh): set min_uncommitted
bool in_snapshot = txn_db_->IsInSnapshot( bool in_snapshot = txn_db_->IsInSnapshot(
sequence, snapshot_sequence, 0 /*min_uncommitted*/, &snapshot_released); sequence, snapshot_sequence, kMinUnCommittedSeq, &snapshot_released);
if (snapshot_released) { if (snapshot_released) {
return SnapshotCheckerResult::kSnapshotReleased; return SnapshotCheckerResult::kSnapshotReleased;
} }

@ -1262,6 +1262,7 @@ TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) {
TEST_P(WritePreparedTransactionTest, TxnInitialize) { TEST_P(WritePreparedTransactionTest, TxnInitialize) {
TransactionOptions txn_options; TransactionOptions txn_options;
WriteOptions write_options; WriteOptions write_options;
ASSERT_OK(db->Put(write_options, "key", "value"));
Transaction* txn0 = db->BeginTransaction(write_options, txn_options); Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn0->SetName("xid")); ASSERT_OK(txn0->SetName("xid"));
ASSERT_OK(txn0->Put(Slice("key"), Slice("value1"))); ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
@ -1274,7 +1275,7 @@ TEST_P(WritePreparedTransactionTest, TxnInitialize) {
auto snap_impl = reinterpret_cast<const SnapshotImpl*>(snap); auto snap_impl = reinterpret_cast<const SnapshotImpl*>(snap);
// If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be // If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be
// udpated // udpated
ASSERT_GT(snap_impl->min_uncommitted_, 0); ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq);
txn0->Rollback(); txn0->Rollback();
txn1->Rollback(); txn1->Rollback();
@ -1679,7 +1680,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) {
size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq; size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq;
wp_db->AddCommitted(overwrite_seq, overwrite_seq); wp_db->AddCommitted(overwrite_seq, overwrite_seq);
SequenceNumber snap_seq; SequenceNumber snap_seq;
uint64_t min_uncommitted = 0; uint64_t min_uncommitted = kMinUnCommittedSeq;
bool released; bool released;
released = false; released = false;

@ -50,7 +50,8 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options,
auto snapshot = read_options.snapshot; auto snapshot = read_options.snapshot;
auto snap_seq = auto snap_seq =
snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber; snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0; // by default disable the optimization SequenceNumber min_uncommitted =
kMinUnCommittedSeq; // by default disable the optimization
if (snapshot != nullptr) { if (snapshot != nullptr) {
min_uncommitted = min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot) static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
@ -235,8 +236,7 @@ Status WritePreparedTxn::RollbackInternal() {
std::map<uint32_t, ColumnFamilyHandle*>& handles, std::map<uint32_t, ColumnFamilyHandle*>& handles,
bool rollback_merge_operands) bool rollback_merge_operands)
: db_(db), : db_(db),
callback(wpt_db, snap_seq, callback(wpt_db, snap_seq), // disable min_uncommitted optimization
0), // 0 disables min_uncommitted optimization
rollback_batch_(dst_batch), rollback_batch_(dst_batch),
comparators_(comparators), comparators_(comparators),
handles_(handles), handles_(handles),

@ -53,9 +53,11 @@ class WritePreparedTxn : public PessimisticTransaction {
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override; PinnableSlice* value) override;
// To make WAL commit markers visible, the snapshot will be based on the last // Note: The behavior is undefined in presence of interleaved writes to the
// seq in the WAL that is also published, LastPublishedSequence, as opposed to // same transaction.
// the last seq in the memtable. // To make WAL commit markers visible, the snapshot will be
// based on the last seq in the WAL that is also published,
// LastPublishedSequence, as opposed to the last seq in the memtable.
using Transaction::GetIterator; using Transaction::GetIterator;
virtual Iterator* GetIterator(const ReadOptions& options) override; virtual Iterator* GetIterator(const ReadOptions& options) override;
virtual Iterator* GetIterator(const ReadOptions& options, virtual Iterator* GetIterator(const ReadOptions& options,

@ -110,12 +110,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// If the snapshot_seq is already released and snapshot_seq <= max, sets // If the snapshot_seq is already released and snapshot_seq <= max, sets
// *snap_released to true and returns true as well. // *snap_released to true and returns true as well.
inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq, inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
uint64_t min_uncommitted = 0, uint64_t min_uncommitted = kMinUnCommittedSeq,
bool* snap_released = nullptr) const { bool* snap_released = nullptr) const {
ROCKS_LOG_DETAILS(info_log_, ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64 "IsInSnapshot %" PRIu64 " in %" PRIu64
" min_uncommitted %" PRIu64, " min_uncommitted %" PRIu64,
prep_seq, snapshot_seq, min_uncommitted); prep_seq, snapshot_seq, min_uncommitted);
assert(min_uncommitted >= kMinUnCommittedSeq);
// Caller is responsible to initialize snap_released. // Caller is responsible to initialize snap_released.
assert(snap_released == nullptr || *snap_released == false); assert(snap_released == nullptr || *snap_released == false);
// Here we try to infer the return value without looking into prepare list. // Here we try to infer the return value without looking into prepare list.
@ -730,6 +731,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
class WritePreparedTxnReadCallback : public ReadCallback { class WritePreparedTxnReadCallback : public ReadCallback {
public: public:
WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
: ReadCallback(snapshot), db_(db) {}
WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot, WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
SequenceNumber min_uncommitted) SequenceNumber min_uncommitted)
: ReadCallback(snapshot, min_uncommitted), db_(db) {} : ReadCallback(snapshot, min_uncommitted), db_(db) {}
@ -737,9 +740,11 @@ class WritePreparedTxnReadCallback : public ReadCallback {
// Will be called to see if the seq number visible; if not it moves on to // Will be called to see if the seq number visible; if not it moves on to
// the next seq number. // the next seq number.
inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override { inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override {
return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_); auto snapshot = max_visible_seq_;
return db_->IsInSnapshot(seq, snapshot, min_uncommitted_);
} }
// TODO(myabandeh): override Refresh when Iterator::Refresh is supported
private: private:
WritePreparedTxnDB* db_; WritePreparedTxnDB* db_;
}; };

@ -81,12 +81,12 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
ReadOptions roptions; ReadOptions roptions;
roptions.snapshot = snapshot0; roptions.snapshot = snapshot0;
wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
auto iter = txn->GetIterator(roptions); auto iter = txn->GetIterator(roptions);
// Test Get(). // Test Get().
std::string value; std::string value;
wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
ASSERT_EQ(value, "v3"); ASSERT_EQ(value, "v3");
@ -96,6 +96,8 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
delete iter;
iter = txn->GetIterator(roptions);
ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
ASSERT_EQ(value, "v7"); ASSERT_EQ(value, "v7");
@ -108,6 +110,8 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
// Test Next(). // Test Next().
wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
delete iter;
iter = txn->GetIterator(roptions);
iter->Seek("a"); iter->Seek("a");
verify_state(iter, "a", "v3"); verify_state(iter, "a", "v3");
@ -123,6 +127,8 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
delete iter;
iter = txn->GetIterator(roptions);
iter->Seek("a"); iter->Seek("a");
verify_state(iter, "a", "v7"); verify_state(iter, "a", "v7");
@ -143,11 +149,11 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
// //
// Because of row locks and ValidateSnapshot, there cannot be any committed // Because of row locks and ValidateSnapshot, there cannot be any committed
// entries after snapshot, but before the first prepared key. // entries after snapshot, but before the first prepared key.
delete iter;
roptions.snapshot = snapshot2; roptions.snapshot = snapshot2;
iter = txn->GetIterator(roptions);
wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
delete iter;
iter = txn->GetIterator(roptions);
iter->SeekForPrev("b"); iter->SeekForPrev("b");
verify_state(iter, "b", "v4"); verify_state(iter, "b", "v4");
@ -161,11 +167,11 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
iter->Prev(); iter->Prev();
verify_state(iter, "a", "v3"); verify_state(iter, "a", "v3");
delete iter;
roptions.snapshot = snapshot6; roptions.snapshot = snapshot6;
iter = txn->GetIterator(roptions);
wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
delete iter;
iter = txn->GetIterator(roptions);
iter->SeekForPrev("b"); iter->SeekForPrev("b");
verify_state(iter, "b", "v8"); verify_state(iter, "b", "v8");

@ -34,12 +34,12 @@ bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
return db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_); return db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_);
} }
SequenceNumber WriteUnpreparedTxnReadCallback::MaxUnpreparedSequenceNumber() { SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber(
auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers(); WriteUnpreparedTxn* txn) {
auto unprep_seqs = txn->GetUnpreparedSequenceNumbers();
if (unprep_seqs.size()) { if (unprep_seqs.size()) {
return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1; return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
} }
return 0; return 0;
} }
@ -379,7 +379,7 @@ Status WriteUnpreparedTxn::RollbackInternal() {
// Note that we do not use WriteUnpreparedTxnReadCallback because we do not // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
// need to read our own writes when reading prior versions of the key for // need to read our own writes when reading prior versions of the key for
// rollback. // rollback.
WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq, 0); WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
for (const auto& cfkey : write_set_keys_) { for (const auto& cfkey : write_set_keys_) {
const auto cfid = cfkey.first; const auto cfid = cfkey.first;
const auto& keys = cfkey.second; const auto& keys = cfkey.second;
@ -475,7 +475,8 @@ Status WriteUnpreparedTxn::Get(const ReadOptions& options,
auto snapshot = options.snapshot; auto snapshot = options.snapshot;
auto snap_seq = auto snap_seq =
snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber; snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0; // by default disable the optimization SequenceNumber min_uncommitted =
kMinUnCommittedSeq; // by default disable the optimization
if (snapshot != nullptr) { if (snapshot != nullptr) {
min_uncommitted = min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot) static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)

@ -23,17 +23,33 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback {
SequenceNumber snapshot, SequenceNumber snapshot,
SequenceNumber min_uncommitted, SequenceNumber min_uncommitted,
WriteUnpreparedTxn* txn) WriteUnpreparedTxn* txn)
// Disable snapshot check on parent class since it would violate // Pass our last uncommitted seq as the snapshot to the parent class to
// read-your-own-write guarantee. // ensure that the parent will not prematurely filter out own writes. We
: ReadCallback(kMaxSequenceNumber, min_uncommitted), // will do the exact comparison agaisnt snapshots in IsVisibleFullCheck
// override.
: ReadCallback(CalcMaxVisibleSeq(txn, snapshot), min_uncommitted),
db_(db), db_(db),
txn_(txn), txn_(txn),
wup_snapshot_(snapshot) {} wup_snapshot_(snapshot) {}
virtual bool IsVisibleFullCheck(SequenceNumber seq) override; virtual bool IsVisibleFullCheck(SequenceNumber seq) override;
virtual SequenceNumber MaxUnpreparedSequenceNumber() override;
bool CanReseekToSkip() override {
return wup_snapshot_ == max_visible_seq_;
// Otherwise our own writes uncommitted are in db, and the assumptions
// behind reseek optimizations are no longer valid.
}
// TODO(myabandeh): override Refresh when Iterator::Refresh is supported
private: private:
SequenceNumber CalcMaxVisibleSeq(WriteUnpreparedTxn* txn,
SequenceNumber snapshot_seq) {
SequenceNumber max_unprepared = CalcMaxUnpreparedSequenceNumber(txn);
assert(snapshot_seq < max_unprepared || max_unprepared == 0 ||
snapshot_seq == kMaxSequenceNumber);
return std::max(max_unprepared, snapshot_seq);
}
SequenceNumber CalcMaxUnpreparedSequenceNumber(WriteUnpreparedTxn* txn);
WritePreparedTxnDB* db_; WritePreparedTxnDB* db_;
WriteUnpreparedTxn* txn_; WriteUnpreparedTxn* txn_;
SequenceNumber wup_snapshot_; SequenceNumber wup_snapshot_;

@ -31,17 +31,17 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
class InvalidSnapshotReadCallback : public ReadCallback { class InvalidSnapshotReadCallback : public ReadCallback {
public: public:
InvalidSnapshotReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot, InvalidSnapshotReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot)
SequenceNumber min_uncommitted) : ReadCallback(snapshot), db_(db) {}
: ReadCallback(snapshot, min_uncommitted), db_(db) {}
// Will be called to see if the seq number visible; if not it moves on to // Will be called to see if the seq number visible; if not it moves on to
// the next seq number. // the next seq number.
inline bool IsVisibleFullCheck(SequenceNumber seq) override { inline bool IsVisibleFullCheck(SequenceNumber seq) override {
// Becomes true if it cannot tell by comparing seq with snapshot seq since // Becomes true if it cannot tell by comparing seq with snapshot seq since
// the snapshot_ is not a real snapshot. // the snapshot is not a real snapshot.
auto snapshot = max_visible_seq_;
bool released = false; bool released = false;
auto ret = db_->IsInSnapshot(seq, snapshot_, min_uncommitted_, &released); auto ret = db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &released);
assert(!released || ret); assert(!released || ret);
return ret; return ret;
} }
@ -73,8 +73,8 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
std::map<uint32_t, ColumnFamilyHandle*>& handles, std::map<uint32_t, ColumnFamilyHandle*>& handles,
bool rollback_merge_operands) bool rollback_merge_operands)
: db_(db), : db_(db),
callback(wpt_db, snap_seq, callback(wpt_db, snap_seq),
0), // 0 disables min_uncommitted optimization // disable min_uncommitted optimization
rollback_batch_(dst_batch), rollback_batch_(dst_batch),
comparators_(comparators), comparators_(comparators),
handles_(handles), handles_(handles),
@ -354,6 +354,7 @@ struct WriteUnpreparedTxnDB::IteratorState {
std::shared_ptr<ManagedSnapshot> s, std::shared_ptr<ManagedSnapshot> s,
SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn) SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
: callback(txn_db, sequence, min_uncommitted, txn), snapshot(s) {} : callback(txn_db, sequence, min_uncommitted, txn), snapshot(s) {}
SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }
WriteUnpreparedTxnReadCallback callback; WriteUnpreparedTxnReadCallback callback;
std::shared_ptr<ManagedSnapshot> snapshot; std::shared_ptr<ManagedSnapshot> snapshot;
@ -395,8 +396,8 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
auto* state = auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn); new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
auto* db_iter = auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback, db_impl_->NewIteratorImpl(options, cfd, state->MaxVisibleSeq(),
!ALLOW_BLOB, !ALLOW_REFRESH); &state->callback, !ALLOW_BLOB, !ALLOW_REFRESH);
db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr); db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
return db_iter; return db_iter;
} }

Loading…
Cancel
Save