WriteUnPrepared Txn: Disable seek to snapshot optimization (#3955)

Summary:
This is implemented by extending ReadCallback with another function `MaxUnpreparedSequenceNumber` which returns the largest visible sequence number for the current transaction, if there is uncommitted data written to DB. Otherwise, it returns zero, indicating no uncommitted data.

There are the places where reads had to be modified.
- Get and Seek/Next was just updated to seek to max(snapshot_seq, MaxUnpreparedSequenceNumber()) instead, and iterate until a key was visible.
- Prev did not need need updates since it did not use the Seek to sequence number optimization. Assuming that locks were held when writing unprepared keys, and ValidateSnapshot runs, there should only be committed keys and unprepared keys of the current transaction, all of which are visible. Prev will simply iterate to get the last visible key.
- Reseeking to skip keys optimization was also disabled for write unprepared, since it's possible to hit the max_skip condition even while reseeking. There needs to be some way to resolve infinite looping in this case.
Closes https://github.com/facebook/rocksdb/pull/3955

Differential Revision: D8286688

Pulled By: lth

fbshipit-source-id: 25e42f47fdeb5f7accea0f4fd350ef35198caafe
main
Manuel Ung 6 years ago committed by Facebook Github Bot
parent 17339dc2f3
commit a16e00b7b9
  1. 23
      db/db_impl.cc
  2. 41
      db/db_iter.cc
  3. 4
      db/db_iterator_test.cc
  4. 2
      db/db_merge_operator_test.cc
  5. 2
      db/db_test2.cc
  6. 2
      db/memtable.cc
  7. 15
      db/read_callback.h
  8. 1
      include/rocksdb/utilities/write_batch_with_index.h
  9. 2
      table/get_context.h
  10. 2
      utilities/transactions/transaction_util.cc
  11. 7
      utilities/transactions/write_prepared_txn_db.cc
  12. 6
      utilities/transactions/write_prepared_txn_db.h
  13. 179
      utilities/transactions/write_unprepared_transaction_test.cc
  14. 71
      utilities/transactions/write_unprepared_txn.cc
  15. 57
      utilities/transactions/write_unprepared_txn.h
  16. 58
      utilities/transactions/write_unprepared_txn_db.cc
  17. 12
      utilities/transactions/write_unprepared_txn_db.h

@ -1037,12 +1037,19 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
// Note: In WritePrepared txns this is not necessary but not harmful either.
// Because prep_seq > snapshot => commit_seq > snapshot so if a snapshot is
// specified we should be fine with skipping seq numbers that are greater
// than that.
// Note: In WritePrepared txns this is not necessary but not harmful
// either. Because prep_seq > snapshot => commit_seq > snapshot so if
// a snapshot is specified we should be fine with skipping seq numbers
// that are greater than that.
//
// In WriteUnprepared, we cannot set snapshot in the lookup key because we
// may skip uncommitted data that should be visible to the transaction for
// reading own writes.
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
if (callback) {
snapshot = std::max(snapshot, callback->MaxUnpreparedSequenceNumber());
}
} else {
// Since we get and reference the super version before getting
// the snapshot number, without a mutex protection, it is possible
@ -1050,10 +1057,10 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
// data for this snapshot is available. But it will contain all
// the data available in the super version we have, which is also
// a valid snapshot to read from.
// We shouldn't get snapshot before finding and referencing the
// super versipon because a flush happening in between may compact
// away data for the snapshot, but the snapshot is earlier than the
// data overwriting it, so users may see wrong results.
// We shouldn't get snapshot before finding and referencing the super
// version because a flush happening in between may compact away data for
// the snapshot, but the snapshot is earlier than the data overwriting it,
// so users may see wrong results.
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();

@ -260,6 +260,18 @@ class DBIter final: public Iterator {
bool TooManyInternalKeysSkipped(bool increment = true);
bool IsVisible(SequenceNumber sequence);
// CanReseekToSkip() returns whether the iterator can use the optimization
// where it reseek by sequence number to get the next key when there are too
// many versions. This is disabled for write unprepared because seeking to
// sequence number does not guarantee that it is visible.
inline bool CanReseekToSkip();
// MaxVisibleSequenceNumber() returns the maximum visible sequence number
// for this snapshot. This sequence number may be greater than snapshot
// seqno because uncommitted data written to DB for write unprepared will
// have a higher sequence number.
inline SequenceNumber MaxVisibleSequenceNumber();
// Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
// is called
void TempPinData() {
@ -578,7 +590,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
// If we have sequentially iterated via numerous equal keys, then it's
// better to seek so that we can avoid too many key comparisons.
if (num_skipped > max_skip_) {
if (num_skipped > max_skip_ && CanReseekToSkip()) {
num_skipped = 0;
std::string last_key;
if (skipping) {
@ -895,7 +907,7 @@ bool DBIter::FindValueForCurrentKey() {
// This user key has lots of entries.
// We're going from old to new, and it's taking too long. Let's do a Seek()
// and go from new to old. This helps when a key was overwritten many times.
if (num_skipped >= max_skip_) {
if (num_skipped >= max_skip_ && CanReseekToSkip()) {
return FindValueForCurrentKeyUsingSeek();
}
@ -1194,7 +1206,7 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
}
if (num_skipped >= max_skip_) {
if (num_skipped >= max_skip_ && CanReseekToSkip()) {
num_skipped = 0;
IterKey last_key;
last_key.SetInternalKey(ParsedInternalKey(
@ -1234,8 +1246,21 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) {
}
bool DBIter::IsVisible(SequenceNumber sequence) {
return sequence <= sequence_ &&
(read_callback_ == nullptr || read_callback_->IsCommitted(sequence));
return sequence <= MaxVisibleSequenceNumber() &&
(read_callback_ == nullptr || read_callback_->IsVisible(sequence));
}
bool DBIter::CanReseekToSkip() {
return read_callback_ == nullptr ||
read_callback_->MaxUnpreparedSequenceNumber() == 0;
}
SequenceNumber DBIter::MaxVisibleSequenceNumber() {
if (read_callback_ == nullptr) {
return sequence_;
}
return std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber());
}
void DBIter::Seek(const Slice& target) {
@ -1243,14 +1268,16 @@ void DBIter::Seek(const Slice& target) {
status_ = Status::OK();
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
SequenceNumber seq = MaxVisibleSequenceNumber();
saved_key_.Clear();
saved_key_.SetInternalKey(target, sequence_);
saved_key_.SetInternalKey(target, seq);
if (iterate_lower_bound_ != nullptr &&
user_comparator_->Compare(saved_key_.GetUserKey(),
*iterate_lower_bound_) < 0) {
saved_key_.Clear();
saved_key_.SetInternalKey(*iterate_lower_bound_, sequence_);
saved_key_.SetInternalKey(*iterate_lower_bound_, seq);
}
{

@ -20,7 +20,7 @@ namespace rocksdb {
// A dumb ReadCallback which saying every key is committed.
class DummyReadCallback : public ReadCallback {
bool IsCommitted(SequenceNumber /*seq*/) { return true; }
bool IsVisible(SequenceNumber /*seq*/) override { return true; }
};
// Test param:
@ -2417,7 +2417,7 @@ TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) {
explicit TestReadCallback(SequenceNumber last_visible_seq)
: last_visible_seq_(last_visible_seq) {}
bool IsCommitted(SequenceNumber seq) override {
bool IsVisible(SequenceNumber seq) override {
return seq <= last_visible_seq_;
}

@ -20,7 +20,7 @@ class TestReadCallback : public ReadCallback {
SequenceNumber snapshot_seq)
: snapshot_checker_(snapshot_checker), snapshot_seq_(snapshot_seq) {}
bool IsCommitted(SequenceNumber seq) override {
bool IsVisible(SequenceNumber seq) override {
return snapshot_checker_->IsInSnapshot(seq, snapshot_seq_);
}

@ -2422,7 +2422,7 @@ TEST_F(DBTest2, ReadCallbackTest) {
class TestReadCallback : public ReadCallback {
public:
explicit TestReadCallback(SequenceNumber snapshot) : snapshot_(snapshot) {}
virtual bool IsCommitted(SequenceNumber seq) override {
virtual bool IsVisible(SequenceNumber seq) override {
return seq <= snapshot_;
}

@ -579,7 +579,7 @@ struct Saver {
bool CheckCallback(SequenceNumber _seq) {
if (callback_) {
return callback_->IsCommitted(_seq);
return callback_->IsVisible(_seq);
}
return true;
}

@ -13,9 +13,18 @@ class ReadCallback {
public:
virtual ~ReadCallback() {}
// Will be called to see if the seq number accepted; if not it moves on to the
// next seq number.
virtual bool IsCommitted(SequenceNumber seq) = 0;
// Will be called to see if the seq number visible; if not it moves on to
// the next seq number.
virtual bool IsVisible(SequenceNumber seq) = 0;
// This is called to determine the maximum visible sequence number for the
// current transaction for read-your-own-write semantics. This is so that
// for write unprepared, we will not skip keys that are written by the
// current transaction with the seek to snapshot optimization.
//
// For other uses, this returns zero, meaning that the current snapshot
// sequence number is the maximum visible sequence number.
inline virtual SequenceNumber MaxUnpreparedSequenceNumber() { return 0; };
};
} // namespace rocksdb

@ -229,6 +229,7 @@ class WriteBatchWithIndex : public WriteBatchBase {
private:
friend class PessimisticTransactionDB;
friend class WritePreparedTxn;
friend class WriteUnpreparedTxn;
friend class WriteBatchWithIndex_SubBatchCnt_Test;
// Returns the number of sub-batches inside the write batch. A sub-batch
// starts right before inserting a key that is a duplicate of a key in the

@ -72,7 +72,7 @@ class GetContext {
bool CheckCallback(SequenceNumber seq) {
if (callback_) {
return callback_->IsCommitted(seq);
return callback_->IsVisible(seq);
}
return true;
}

@ -108,7 +108,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
} else if (found_record_for_key) {
bool write_conflict = snap_checker == nullptr
? snap_seq < seq
: !snap_checker->IsCommitted(seq);
: !snap_checker->IsVisible(seq);
if (write_conflict) {
result = Status::Busy();
}

@ -306,9 +306,10 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
SequenceNumber min_uncommitted = 0;
if (options.snapshot != nullptr) {
snapshot_seq = options.snapshot->GetSequenceNumber();
min_uncommitted = static_cast_with_check<const SnapshotImpl, const Snapshot>(
options.snapshot)
->min_uncommitted_;
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(
options.snapshot)
->min_uncommitted_;
} else {
auto* snapshot = GetSnapshot();
// We take a snapshot to make sure that the related data in the commit map

@ -612,9 +612,9 @@ class WritePreparedTxnReadCallback : public ReadCallback {
SequenceNumber min_uncommitted)
: db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {}
// Will be called to see if the seq number accepted; if not it moves on to the
// next seq number.
inline virtual bool IsCommitted(SequenceNumber seq) override {
// Will be called to see if the seq number visible; if not it moves on to
// the next seq number.
inline virtual bool IsVisible(SequenceNumber seq) override {
return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_);
}

@ -10,8 +10,185 @@
#endif
#include "utilities/transactions/transaction_test.h"
#include "utilities/transactions/write_unprepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
namespace rocksdb {} // namespace rocksdb
namespace rocksdb {
class WriteUnpreparedTransactionTestBase : public TransactionTestBase {
public:
WriteUnpreparedTransactionTestBase(bool use_stackable_db,
bool two_write_queue,
TxnDBWritePolicy write_policy)
: TransactionTestBase(use_stackable_db, two_write_queue, write_policy){}
};
class WriteUnpreparedTransactionTest
: public WriteUnpreparedTransactionTestBase,
virtual public ::testing::WithParamInterface<
std::tuple<bool, bool, TxnDBWritePolicy>> {
public:
WriteUnpreparedTransactionTest()
: WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()),
std::get<1>(GetParam()),
std::get<2>(GetParam())){}
};
INSTANTIATE_TEST_CASE_P(
WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest,
::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED),
std::make_tuple(false, true, WRITE_UNPREPARED)));
TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
auto verify_state = [](Iterator* iter, const std::string& key,
const std::string& value) {
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(key, iter->key().ToString());
ASSERT_EQ(value, iter->value().ToString());
};
options.disable_auto_compactions = true;
ReOpen();
// The following tests checks whether reading your own write for
// a transaction works for write unprepared, when there are uncommitted
// values written into DB.
//
// Although the values written by DB::Put are technically committed, we add
// their seq num to unprep_seqs_ to pretend that they were written into DB
// as part of an unprepared batch, and then check if they are visible to the
// transaction.
auto snapshot0 = db->GetSnapshot();
ASSERT_OK(db->Put(WriteOptions(), "a", "v1"));
ASSERT_OK(db->Put(WriteOptions(), "b", "v2"));
auto snapshot2 = db->GetSnapshot();
ASSERT_OK(db->Put(WriteOptions(), "a", "v3"));
ASSERT_OK(db->Put(WriteOptions(), "b", "v4"));
auto snapshot4 = db->GetSnapshot();
ASSERT_OK(db->Put(WriteOptions(), "a", "v5"));
ASSERT_OK(db->Put(WriteOptions(), "b", "v6"));
auto snapshot6 = db->GetSnapshot();
ASSERT_OK(db->Put(WriteOptions(), "a", "v7"));
ASSERT_OK(db->Put(WriteOptions(), "b", "v8"));
auto snapshot8 = db->GetSnapshot();
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
ReadOptions roptions;
roptions.snapshot = snapshot0;
auto iter = txn->GetIterator(roptions);
// Test Get().
std::string value;
wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
ASSERT_EQ(value, "v3");
ASSERT_OK(txn->Get(roptions, Slice("b"), &value));
ASSERT_EQ(value, "v4");
wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
ASSERT_OK(txn->Get(roptions, Slice("a"), &value));
ASSERT_EQ(value, "v7");
ASSERT_OK(txn->Get(roptions, Slice("b"), &value));
ASSERT_EQ(value, "v8");
wup_txn->unprep_seqs_.clear();
// Test Next().
wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
iter->Seek("a");
verify_state(iter, "a", "v3");
iter->Next();
verify_state(iter, "b", "v4");
iter->SeekToFirst();
verify_state(iter, "a", "v3");
iter->Next();
verify_state(iter, "b", "v4");
wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
iter->Seek("a");
verify_state(iter, "a", "v7");
iter->Next();
verify_state(iter, "b", "v8");
iter->SeekToFirst();
verify_state(iter, "a", "v7");
iter->Next();
verify_state(iter, "b", "v8");
wup_txn->unprep_seqs_.clear();
// Test Prev(). For Prev(), we need to adjust the snapshot to match what is
// possible in WriteUnpreparedTxn.
//
// Because of row locks and ValidateSnapshot, there cannot be any committed
// entries after snapshot, but before the first prepared key.
delete iter;
roptions.snapshot = snapshot2;
iter = txn->GetIterator(roptions);
wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] =
snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber();
iter->SeekForPrev("b");
verify_state(iter, "b", "v4");
iter->Prev();
verify_state(iter, "a", "v3");
iter->SeekToLast();
verify_state(iter, "b", "v4");
iter->Prev();
verify_state(iter, "a", "v3");
delete iter;
roptions.snapshot = snapshot6;
iter = txn->GetIterator(roptions);
wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] =
snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber();
iter->SeekForPrev("b");
verify_state(iter, "b", "v8");
iter->Prev();
verify_state(iter, "a", "v7");
iter->SeekToLast();
verify_state(iter, "b", "v8");
iter->Prev();
verify_state(iter, "a", "v7");
db->ReleaseSnapshot(snapshot0);
db->ReleaseSnapshot(snapshot2);
db->ReleaseSnapshot(snapshot4);
db->ReleaseSnapshot(snapshot6);
db->ReleaseSnapshot(snapshot8);
delete iter;
delete txn;
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);

@ -6,6 +6,9 @@
#ifndef ROCKSDB_LITE
#include "utilities/transactions/write_unprepared_txn.h"
#include "db/db_impl.h"
#include "util/cast_util.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
@ -13,6 +16,74 @@
namespace rocksdb {
bool WriteUnpreparedTxnReadCallback::IsVisible(SequenceNumber seq) {
auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers();
// Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
// in unprep_seqs, we have to check if seq is equal to prep_seq or any of
// the prepare_batch_cnt seq nums after it.
//
// TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
// large.
for (const auto& it : unprep_seqs) {
if (it.first <= seq && seq < it.first + it.second) {
return true;
}
}
return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_);
}
SequenceNumber WriteUnpreparedTxnReadCallback::MaxUnpreparedSequenceNumber() {
auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers();
if (unprep_seqs.size()) {
return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
}
return 0;
}
WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options)
: WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) {}
Status WriteUnpreparedTxn::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
auto snapshot = options.snapshot;
auto snap_seq =
snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0; // by default disable the optimization
if (snapshot != nullptr) {
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
->min_uncommitted_;
}
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
this);
return write_batch_.GetFromBatchAndDB(db_, options, column_family, key, value,
&callback);
}
Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
return GetIterator(options, wupt_db_->DefaultColumnFamily());
}
Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) {
// Make sure to get iterator from WriteUnprepareTxnDB, not the root db.
Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
assert(db_iter);
return write_batch_.NewIteratorWithBase(column_family, db_iter);
}
const std::map<SequenceNumber, size_t>&
WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
return unprep_seqs_;
}
} // namespace rocksdb

@ -8,12 +8,67 @@
#ifndef ROCKSDB_LITE
#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
namespace rocksdb {
class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;
class WriteUnpreparedTxnReadCallback : public ReadCallback {
public:
WriteUnpreparedTxnReadCallback(WritePreparedTxnDB* db,
SequenceNumber snapshot,
SequenceNumber min_uncommitted,
WriteUnpreparedTxn* txn)
: db_(db),
snapshot_(snapshot),
min_uncommitted_(min_uncommitted),
txn_(txn) {}
virtual bool IsVisible(SequenceNumber seq) override;
virtual SequenceNumber MaxUnpreparedSequenceNumber() override;
private:
WritePreparedTxnDB* db_;
SequenceNumber snapshot_;
SequenceNumber min_uncommitted_;
WriteUnpreparedTxn* txn_;
};
class WriteUnpreparedTxn : public WritePreparedTxn {
using WritePreparedTxn::WritePreparedTxn;
public:
WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
const WriteOptions& write_options,
const TransactionOptions& txn_options);
virtual ~WriteUnpreparedTxn() {}
// Get and GetIterator needs to be overridden so that a ReadCallback to
// handle read-your-own-write is used.
using Transaction::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using Transaction::GetIterator;
virtual Iterator* GetIterator(const ReadOptions& options) override;
virtual Iterator* GetIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override;
const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
private:
friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
WriteUnpreparedTxnDB* wupt_db_;
// Ordered list of unprep_seq sequence numbers that we have already written
// to DB.
//
// This maps unprep_seq => prepare_batch_cnt for each prepared batch written
// by this transactioin.
std::map<SequenceNumber, size_t> unprep_seqs_;
};
} // namespace rocksdb

@ -11,12 +11,13 @@
#include "utilities/transactions/write_unprepared_txn_db.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
namespace rocksdb {
Transaction* WriteUnpreparedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
if (old_txn != nullptr) {
ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn;
@ -25,5 +26,58 @@ Transaction* old_txn) {
}
}
// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WriteUnpreparedTxnDB::IteratorState {
IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
std::shared_ptr<ManagedSnapshot> s,
SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
: callback(txn_db, sequence, min_uncommitted, txn), snapshot(s) {}
WriteUnpreparedTxnReadCallback callback;
std::shared_ptr<ManagedSnapshot> snapshot;
};
namespace {
static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
delete reinterpret_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
}
} // anonymous namespace
Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family,
WriteUnpreparedTxn* txn) {
// TODO(lth): Refactor so that this logic is shared with WritePrepared.
constexpr bool ALLOW_BLOB = true;
constexpr bool ALLOW_REFRESH = true;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq;
SequenceNumber min_uncommitted = 0;
if (options.snapshot != nullptr) {
snapshot_seq = options.snapshot->GetSequenceNumber();
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(
options.snapshot)
->min_uncommitted_;
} else {
auto* snapshot = GetSnapshot();
// We take a snapshot to make sure that the related data in the commit map
// are not deleted.
snapshot_seq = snapshot->GetSequenceNumber();
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
->min_uncommitted_;
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
}
assert(snapshot_seq != kMaxSequenceNumber);
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
!ALLOW_BLOB, !ALLOW_REFRESH);
db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
return db_iter;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -11,16 +11,26 @@
#endif
#include "utilities/transactions/write_prepared_txn_db.h"
#include "utilities/transactions/write_unprepared_txn.h"
namespace rocksdb {
class WriteUnpreparedTxn;
class WriteUnpreparedTxnDB : public WritePreparedTxnDB {
public:
using WritePreparedTxnDB::WritePreparedTxnDB;
Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) override;
// Struct to hold ownership of snapshot and read callback for cleanup.
struct IteratorState;
using WritePreparedTxnDB::NewIterator;
Iterator* NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family,
WriteUnpreparedTxn* txn);
};
} // namespace rocksdb

Loading…
Cancel
Save