WritePrepared Txn: smallest_prepare optimization

Summary:
This is an optimization to reduce lookups in the CommitCache when querying IsInSnapshot. The optimization records the smallest uncommitted sequence number at the time the snapshot is taken; if the sequence number of the read data is lower than that bound, the data is assumed to be committed.
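As a rough illustration, the read path can short-circuit the visibility check whenever the read sequence number falls below the snapshot's recorded bound. The following standalone sketch uses simplified, illustrative names (not the actual RocksDB API):

#include <cstdint>
#include <functional>

// Sketch only: SnapshotSketch stands in for rocksdb::SnapshotImpl, which in
// this patch gains a min_uncommitted_ field.
struct SnapshotSketch {
  uint64_t number_;           // snapshot sequence number
  uint64_t min_uncommitted_;  // smallest uncommitted seq when snapshot was taken
};

// full_check stands in for the full IsInSnapshot logic (CommitCache lookup,
// delayed prepared set, old_commit_map_, ...).
bool IsVisibleSketch(uint64_t read_seq, const SnapshotSketch& snap,
                     const std::function<bool(uint64_t, uint64_t)>& full_check) {
  if (read_seq < snap.min_uncommitted_) {
    // Everything below min_uncommitted_ was already committed when the
    // snapshot was taken, so the expensive lookup can be skipped.
    return true;
  }
  return full_check(read_seq, snap.number_);
}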
To implement this optimization two changes are required: i) AddPrepared must be called in order, to avoid out-of-order insertion into the PrepareHeap (otherwise the top of the heap would not reliably indicate the smallest outstanding prepare), and ii) non-2PC transactions must also call AddPrepared if they do not commit in one step.
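The reason for requirement (i) is that the bound is derived from the top of a min-heap of outstanding prepare sequence numbers. A minimal sketch of that bookkeeping, under the assumption of in-order insertion from the write queue, might look like this (illustrative names, not the RocksDB classes):

#include <algorithm>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <vector>

// Sketch only: PreparedTracker stands in for the PrepareHeap bookkeeping in
// WritePreparedTxnDB. Because AddPrepared is invoked from the single, ordered
// write queue (via a pre-release callback), entries arrive in increasing
// order, so the heap top is always the smallest outstanding prepare.
class PreparedTracker {
 public:
  void AddPrepared(uint64_t seq) {
    std::lock_guard<std::mutex> lock(mutex_);
    heap_.push(seq);
  }

  // Smallest uncommitted sequence at snapshot time: the smaller of the oldest
  // outstanding prepare and the next sequence number to be published.
  uint64_t SmallestUnCommittedSeq(uint64_t latest_published_seq) {
    std::lock_guard<std::mutex> lock(mutex_);
    const uint64_t next = latest_published_seq + 1;
    return heap_.empty() ? next : std::min(heap_.top(), next);
  }

 private:
  std::mutex mutex_;
  std::priority_queue<uint64_t, std::vector<uint64_t>,
                      std::greater<uint64_t>>
      heap_;  // min-heap of prepare sequence numbers
};

In the actual patch this bound is captured into the snapshot at creation time (EnhanceSnapshot) and consulted by WritePreparedTxnReadCallback, as shown in the diff below.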
Closes https://github.com/facebook/rocksdb/pull/3649

Differential Revision: D7388630

Pulled By: maysamyabandeh

fbshipit-source-id: b79506238c17467d590763582960d4d90181c600
Branch: main
Author: Maysam Yabandeh (committed by Facebook Github Bot)
Parent: 1579626d0d
Commit: b225de7e10
Changed files:
  1. db/db_impl.cc (2 changed lines)
  2. db/db_impl.h (3 changed lines)
  3. db/snapshot_impl.h (8 changed lines)
  4. utilities/transactions/pessimistic_transaction_db.h (2 changed lines)
  5. utilities/transactions/transaction_base.h (4 changed lines)
  6. utilities/transactions/write_prepared_txn.cc (63 changed lines)
  7. utilities/transactions/write_prepared_txn.h (2 changed lines)
  8. utilities/transactions/write_prepared_txn_db.cc (180 changed lines)
  9. utilities/transactions/write_prepared_txn_db.h (195 changed lines)

@ -1681,7 +1681,7 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) {
int64_t unix_time = 0; int64_t unix_time = 0;
env_->GetCurrentTime(&unix_time); // Ignore error env_->GetCurrentTime(&unix_time); // Ignore error
SnapshotImpl* s = new SnapshotImpl; SnapshotImpl* s = new SnapshotImpl;

@ -731,6 +731,7 @@ class DBImpl : public DB {
friend class DB; friend class DB;
friend class InternalStats; friend class InternalStats;
friend class PessimisticTransaction; friend class PessimisticTransaction;
friend class TransactionBaseImpl;
friend class WriteCommittedTxn; friend class WriteCommittedTxn;
friend class WritePreparedTxn; friend class WritePreparedTxn;
friend class WritePreparedTxnDB; friend class WritePreparedTxnDB;
@ -955,7 +956,7 @@ class DBImpl : public DB {
// helper function to call after some of the logs_ were synced // helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status); void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary); SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary);
uint64_t GetMaxTotalWalSize() const; uint64_t GetMaxTotalWalSize() const;

@ -21,6 +21,10 @@ class SnapshotList;
class SnapshotImpl : public Snapshot { class SnapshotImpl : public Snapshot {
public: public:
SequenceNumber number_; // const after creation SequenceNumber number_; // const after creation
// It indicates the smallest uncommitted data at the time the snapshot was
// taken. This is currently used by WritePrepared transactions to limit the
// scope of queries to IsInSnapshot.
SequenceNumber min_uncommitted_ = 0;
virtual SequenceNumber GetSequenceNumber() const override { return number_; } virtual SequenceNumber GetSequenceNumber() const override { return number_; }
@ -56,8 +60,8 @@ class SnapshotList {
SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; } SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; }
SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; } SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; }
const SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time,
uint64_t unix_time, bool is_write_conflict_boundary) { bool is_write_conflict_boundary) {
s->number_ = seq; s->number_ = seq;
s->unix_time_ = unix_time; s->unix_time_ = unix_time;
s->is_write_conflict_boundary_ = is_write_conflict_boundary; s->is_write_conflict_boundary_ = is_write_conflict_boundary;

@ -35,6 +35,8 @@ class PessimisticTransactionDB : public TransactionDB {
virtual ~PessimisticTransactionDB(); virtual ~PessimisticTransactionDB();
virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }
virtual Status Initialize( virtual Status Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices, const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles); const std::vector<ColumnFamilyHandle*>& handles);

@ -187,7 +187,7 @@ class TransactionBaseImpl : public Transaction {
return snapshot_ ? snapshot_.get() : nullptr; return snapshot_ ? snapshot_.get() : nullptr;
} }
void SetSnapshot() override; virtual void SetSnapshot() override;
void SetSnapshotOnNextOperation( void SetSnapshotOnNextOperation(
std::shared_ptr<TransactionNotifier> notifier = nullptr) override; std::shared_ptr<TransactionNotifier> notifier = nullptr) override;
@ -303,6 +303,7 @@ class TransactionBaseImpl : public Transaction {
WriteBatchWithIndex write_batch_; WriteBatchWithIndex write_batch_;
private: private:
friend class WritePreparedTxn;
// Extra data to be persisted with the commit. Note this is only used when // Extra data to be persisted with the commit. Note this is only used when
// prepare phase is not skipped. // prepare phase is not skipped.
WriteBatch commit_time_batch_; WriteBatch commit_time_batch_;
@ -335,7 +336,6 @@ class TransactionBaseImpl : public Transaction {
bool read_only, bool exclusive, bool skip_validate = false); bool read_only, bool exclusive, bool skip_validate = false);
WriteBatchBase* GetBatchForWrite(); WriteBatchBase* GetBatchForWrite();
void SetSnapshotInternal(const Snapshot* snapshot); void SetSnapshotInternal(const Snapshot* snapshot);
}; };

@ -20,6 +20,7 @@
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/pessimistic_transaction.h" #include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h" #include "utilities/transactions/write_prepared_txn_db.h"
@ -39,8 +40,14 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options,
auto snapshot = read_options.snapshot; auto snapshot = read_options.snapshot;
auto snap_seq = auto snap_seq =
snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber; snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0; // by default disable the optimization
if (snapshot != nullptr) {
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
->min_uncommitted_;
}
WritePreparedTxnReadCallback callback(wpt_db_, snap_seq); WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted);
return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key, return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
pinnable_val, &callback); pinnable_val, &callback);
} }
@ -68,25 +75,26 @@ Status WritePreparedTxn::PrepareInternal() {
const bool WRITE_AFTER_COMMIT = true; const bool WRITE_AFTER_COMMIT = true;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_, WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
!WRITE_AFTER_COMMIT); !WRITE_AFTER_COMMIT);
const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber;
// For each duplicate key we account for a new sub-batch // For each duplicate key we account for a new sub-batch
prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
Status s = // AddPrepared better to be called in the pre-release callback otherwise there
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), // is a non-zero chance of max advancing prepare_seq and readers assume the
/*callback*/ nullptr, &log_number_, /*log ref*/ 0, // data as committed.
!DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_); // Also having it in the PreReleaseCallback allows in-order addition of
// prepared entries to PrepareHeap and hence enables an optimization. Refer to
// SmallestUnCommittedSeq for more details.
AddPreparedCallback add_prepared_callback(
wpt_db_, prepare_batch_cnt_,
db_impl_->immutable_db_options().two_write_queues);
const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber;
Status s = db_impl_->WriteImpl(
write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
&seq_used, prepare_batch_cnt_, &add_prepared_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber); assert(!s.ok() || seq_used != kMaxSequenceNumber);
auto prepare_seq = seq_used; auto prepare_seq = seq_used;
SetId(prepare_seq); SetId(prepare_seq);
// TODO(myabandeh): AddPrepared better to be called in the pre-release
// callback otherwise there is a non-zero chance of max dvancing prepare_seq
// and readers assume the data as committed.
if (s.ok()) {
for (size_t i = 0; i < prepare_batch_cnt_; i++) {
wpt_db_->AddPrepared(prepare_seq + i);
}
}
return s; return s;
} }
@ -135,6 +143,10 @@ Status WritePreparedTxn::CommitInternal() {
const bool do_one_write = const bool do_one_write =
!db_impl_->immutable_db_options().two_write_queues || disable_memtable; !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
const bool publish_seq = do_one_write; const bool publish_seq = do_one_write;
// Note: CommitTimeWriteBatch does not need AddPrepared since it is written to
// DB in one shot. min_uncommitted still works since it requires capturing
// data that is written to DB but not yet committed, while
// CommitTimeWriteBatch commits with PreReleaseCallback.
WritePreparedCommitEntryPreReleaseCallback update_commit_map( WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt, wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt,
!PREP_HEAP_SKIPPED, publish_seq); !PREP_HEAP_SKIPPED, publish_seq);
@ -204,7 +216,8 @@ Status WritePreparedTxn::RollbackInternal() {
WriteBatch* dst_batch, WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators) std::map<uint32_t, const Comparator*>& comparators)
: db_(db), : db_(db),
callback(wpt_db, snap_seq), callback(wpt_db, snap_seq,
0), // 0 disables min_uncommitted optimization
rollback_batch_(dst_batch), rollback_batch_(dst_batch),
comparators_(comparators) {} comparators_(comparators) {}
@ -285,6 +298,10 @@ Status WritePreparedTxn::RollbackInternal() {
const size_t ONE_BATCH = 1; const size_t ONE_BATCH = 1;
WritePreparedCommitEntryPreReleaseCallback update_commit_map( WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, ONE_BATCH); wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, ONE_BATCH);
// Note: the rollback batch does not need AddPrepared since it is written to
// DB in one shot. min_uncommitted still works since it requires capturing
// data that is written to DB but not yet committed, while
// the rollback batch commits with PreReleaseCallback.
s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH, NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
do_one_write ? &update_commit_map : nullptr); do_one_write ? &update_commit_map : nullptr);
@ -335,6 +352,10 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
SequenceNumber* tracked_at_seq) { SequenceNumber* tracked_at_seq) {
assert(snapshot_); assert(snapshot_);
SequenceNumber min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(
snapshot_.get())
->min_uncommitted_;
SequenceNumber snap_seq = snapshot_->GetSequenceNumber(); SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
// tracked_at_seq is either max or the last snapshot with which this key was // tracked_at_seq is either max or the last snapshot with which this key was
// trackeed so there is no need to apply the IsInSnapshot to this comparison // trackeed so there is no need to apply the IsInSnapshot to this comparison
@ -351,12 +372,20 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
ColumnFamilyHandle* cfh = ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily(); column_family ? column_family : db_impl_->DefaultColumnFamily();
WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq); WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snap_seq, false /* cache_only */, snap_seq, false /* cache_only */,
&snap_checker); &snap_checker);
} }
void WritePreparedTxn::SetSnapshot() {
const bool FOR_WW_CONFLICT_CHECK = true;
SnapshotImpl* snapshot = dbimpl_->GetSnapshotImpl(FOR_WW_CONFLICT_CHECK);
assert(snapshot);
wpt_db_->EnhanceSnapshot(snapshot);
SetSnapshotInternal(snapshot);
}
Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) { Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch); auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();

@ -61,6 +61,8 @@ class WritePreparedTxn : public PessimisticTransaction {
virtual Iterator* GetIterator(const ReadOptions& options, virtual Iterator* GetIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override; ColumnFamilyHandle* column_family) override;
virtual void SetSnapshot() override;
protected: protected:
// Override the protected SetId to make it visible to the friend class // Override the protected SetId to make it visible to the friend class
// WritePreparedTxnDB // WritePreparedTxnDB

@ -21,6 +21,7 @@
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/sync_point.h" #include "util/sync_point.h"
@ -149,11 +150,22 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
const uint64_t no_log_ref = 0; const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
const size_t ZERO_PREPARES = 0; const size_t ZERO_PREPARES = 0;
// Since this is not 2pc, there is no need for AddPrepared but having it in
// the PreReleaseCallback enables an optimization. Refer to
// SmallestUnCommittedSeq for more details.
AddPreparedCallback add_prepared_callback(
this, batch_cnt, db_impl_->immutable_db_options().two_write_queues);
WritePreparedCommitEntryPreReleaseCallback update_commit_map( WritePreparedCommitEntryPreReleaseCallback update_commit_map(
this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt); this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
auto s = db_impl_->WriteImpl( PreReleaseCallback* pre_release_callback;
write_options, batch, nullptr, nullptr, no_log_ref, !DISABLE_MEMTABLE, if (do_one_write) {
&seq_used, batch_cnt, do_one_write ? &update_commit_map : nullptr); pre_release_callback = &update_commit_map;
} else {
pre_release_callback = &add_prepared_callback;
}
auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr,
no_log_ref, !DISABLE_MEMTABLE, &seq_used,
batch_cnt, pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber); assert(!s.ok() || seq_used != kMaxSequenceNumber);
uint64_t& prepare_seq = seq_used; uint64_t& prepare_seq = seq_used;
if (txn != nullptr) { if (txn != nullptr) {
@ -170,15 +182,12 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"CommitBatchInternal 2nd write prepare_seq: %" PRIu64, "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
prepare_seq); prepare_seq);
// TODO(myabandeh): What if max advances the prepare_seq_ in the meanwhile and
// readers assume the prepared data as committed? Almost zero probability.
// Commit the batch by writing an empty batch to the 2nd queue that will // Commit the batch by writing an empty batch to the 2nd queue that will
// release the commit sequence number to readers. // release the commit sequence number to readers.
const size_t ZERO_COMMITS = 0; const size_t ZERO_COMMITS = 0;
const bool PREP_HEAP_SKIPPED = true; const bool PREP_HEAP_SKIPPED = true;
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS, PREP_HEAP_SKIPPED); this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS, !PREP_HEAP_SKIPPED);
WriteBatch empty_batch; WriteBatch empty_batch;
empty_batch.PutLogData(Slice()); empty_batch.PutLogData(Slice());
const size_t ONE_BATCH = 1; const size_t ONE_BATCH = 1;
@ -197,10 +206,16 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options,
// We are fine with the latest committed value. This could be done by // We are fine with the latest committed value. This could be done by
// specifying the snapshot as kMaxSequenceNumber. // specifying the snapshot as kMaxSequenceNumber.
SequenceNumber seq = kMaxSequenceNumber; SequenceNumber seq = kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0;
if (options.snapshot != nullptr) { if (options.snapshot != nullptr) {
seq = options.snapshot->GetSequenceNumber(); seq = options.snapshot->GetSequenceNumber();
min_uncommitted = static_cast_with_check<const SnapshotImpl, const Snapshot>(
options.snapshot)
->min_uncommitted_;
} else {
min_uncommitted = SmallestUnCommittedSeq();
} }
WritePreparedTxnReadCallback callback(this, seq); WritePreparedTxnReadCallback callback(this, seq, min_uncommitted);
bool* dont_care = nullptr; bool* dont_care = nullptr;
// Note: no need to specify a snapshot for read options as no specific // Note: no need to specify a snapshot for read options as no specific
// snapshot is requested by the user. // snapshot is requested by the user.
@ -252,8 +267,9 @@ std::vector<Status> WritePreparedTxnDB::MultiGet(
// Struct to hold ownership of snapshot and read callback for iterator cleanup. // Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WritePreparedTxnDB::IteratorState { struct WritePreparedTxnDB::IteratorState {
IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
std::shared_ptr<ManagedSnapshot> s) std::shared_ptr<ManagedSnapshot> s,
: callback(txn_db, sequence), snapshot(s) {} SequenceNumber min_uncommitted)
: callback(txn_db, sequence, min_uncommitted), snapshot(s) {}
WritePreparedTxnReadCallback callback; WritePreparedTxnReadCallback callback;
std::shared_ptr<ManagedSnapshot> snapshot; std::shared_ptr<ManagedSnapshot> snapshot;
@ -271,18 +287,26 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
constexpr bool ALLOW_REFRESH = true; constexpr bool ALLOW_REFRESH = true;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr; std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber; SequenceNumber snapshot_seq = kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0;
if (options.snapshot != nullptr) { if (options.snapshot != nullptr) {
snapshot_seq = options.snapshot->GetSequenceNumber(); snapshot_seq = options.snapshot->GetSequenceNumber();
min_uncommitted = static_cast_with_check<const SnapshotImpl, const Snapshot>(
options.snapshot)
->min_uncommitted_;
} else { } else {
auto* snapshot = db_impl_->GetSnapshot(); auto* snapshot = GetSnapshot();
// We take a snapshot to make sure that the related data in the commit map // We take a snapshot to make sure that the related data in the commit map
// are not deleted. // are not deleted.
snapshot_seq = snapshot->GetSequenceNumber(); snapshot_seq = snapshot->GetSequenceNumber();
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
->min_uncommitted_;
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot); own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
} }
assert(snapshot_seq != kMaxSequenceNumber); assert(snapshot_seq != kMaxSequenceNumber);
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
auto* state = new IteratorState(this, snapshot_seq, own_snapshot); auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
auto* db_iter = auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback, db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
!ALLOW_BLOB, !ALLOW_REFRESH); !ALLOW_BLOB, !ALLOW_REFRESH);
@ -298,20 +322,28 @@ Status WritePreparedTxnDB::NewIterators(
constexpr bool ALLOW_REFRESH = true; constexpr bool ALLOW_REFRESH = true;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr; std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber; SequenceNumber snapshot_seq = kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0;
if (options.snapshot != nullptr) { if (options.snapshot != nullptr) {
snapshot_seq = options.snapshot->GetSequenceNumber(); snapshot_seq = options.snapshot->GetSequenceNumber();
min_uncommitted = static_cast_with_check<const SnapshotImpl, const Snapshot>(
options.snapshot)
->min_uncommitted_;
} else { } else {
auto* snapshot = db_impl_->GetSnapshot(); auto* snapshot = GetSnapshot();
// We take a snapshot to make sure that the related data in the commit map // We take a snapshot to make sure that the related data in the commit map
// are not deleted. // are not deleted.
snapshot_seq = snapshot->GetSequenceNumber(); snapshot_seq = snapshot->GetSequenceNumber();
own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot); own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
min_uncommitted =
static_cast_with_check<const SnapshotImpl, const Snapshot>(snapshot)
->min_uncommitted_;
} }
iterators->clear(); iterators->clear();
iterators->reserve(column_families.size()); iterators->reserve(column_families.size());
for (auto* column_family : column_families) { for (auto* column_family : column_families) {
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
auto* state = new IteratorState(this, snapshot_seq, own_snapshot); auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
auto* db_iter = auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback, db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
!ALLOW_BLOB, !ALLOW_REFRESH); !ALLOW_BLOB, !ALLOW_REFRESH);
@ -332,118 +364,6 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {}); new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
} }
// Returns true if commit_seq <= snapshot_seq
bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
uint64_t snapshot_seq) const {
// Here we try to infer the return value without looking into prepare list.
// This would help avoiding synchronization over a shared map.
// TODO(myabandeh): optimize this. This sequence of checks must be correct but
// not necessary efficient
if (prep_seq == 0) {
// Compaction will output keys to bottom-level with sequence number 0 if
// it is visible to the earliest snapshot.
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
}
if (snapshot_seq < prep_seq) {
// snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
}
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
// We should not normally reach here
ReadLock rl(&prepared_mutex_);
// TODO(myabandeh): also add a stat
ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()));
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// Then it is not committed yet
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
}
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
CommitEntry64b dont_care;
CommitEntry cached;
bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
if (exist && prep_seq == cached.prep_seq) {
// It is committed and also not evicted from commit cache
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
return cached.commit_seq <= snapshot_seq;
}
// else it could be committed but not inserted in the map which could happen
// after recovery, or it could be committed and evicted by another commit, or
// never committed.
// At this point we dont know if it was committed or it is still prepared
auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
// max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq now
if (max_evicted_seq < prep_seq) {
// Not evicted from cache and also not present, so must be still prepared
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
}
// When advancing max_evicted_seq_, we move older entires from prepared to
// delayed_prepared_. Also we move evicted entries from commit cache to
// old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
// max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
// old_commit_map_, iii) committed with no conflict with any snapshot. Case
// (i) delayed_prepared_ is checked above
if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case
// only (iii) is the case: committed
// commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
// snapshot_seq
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
}
// else (ii) might be the case: check the commit data saved for this snapshot.
// If there was no overlapping commit entry, then it is committed with a
// commit_seq lower than any live snapshot, including snapshot_seq.
if (old_commit_map_empty_.load(std::memory_order_acquire)) {
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
}
{
// We should not normally reach here unless sapshot_seq is old. This is a
// rare case and it is ok to pay the cost of mutex ReadLock for such old,
// reading transactions.
// TODO(myabandeh): also add a stat
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
ReadLock rl(&old_commit_map_mutex_);
auto prep_set_entry = old_commit_map_.find(snapshot_seq);
bool found = prep_set_entry != old_commit_map_.end();
if (found) {
auto& vec = prep_set_entry->second;
found = std::binary_search(vec.begin(), vec.end(), prep_seq);
}
if (!found) {
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
}
}
// (ii) it the case: it is committed but after the snapshot_seq
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) { void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq); ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq);
assert(seq > max_evicted_seq_); assert(seq > max_evicted_seq_);
@ -622,6 +542,14 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
}; };
} }
const Snapshot* WritePreparedTxnDB::GetSnapshot() {
const bool FOR_WW_CONFLICT_CHECK = true;
SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(!FOR_WW_CONFLICT_CHECK);
assert(snap_impl);
EnhanceSnapshot(snap_impl);
return snap_impl;
}
const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB( const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
SequenceNumber max) { SequenceNumber max) {
ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max); ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);

@ -6,6 +6,11 @@
#pragma once #pragma once
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <mutex> #include <mutex>
#include <queue> #include <queue>
#include <set> #include <set>
@ -110,8 +115,131 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
virtual void ReleaseSnapshot(const Snapshot* snapshot) override; virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
// Check whether the transaction that wrote the value with sequence number seq // Check whether the transaction that wrote the value with sequence number seq
// is visible to the snapshot with sequence number snapshot_seq // is visible to the snapshot with sequence number snapshot_seq.
bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const; // Returns true if commit_seq <= snapshot_seq
inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
uint64_t min_uncommitted = 0) const {
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" min_uncommitted %" PRIu64,
prep_seq, snapshot_seq, min_uncommitted);
// Here we try to infer the return value without looking into prepare list.
// This would help avoiding synchronization over a shared map.
// TODO(myabandeh): optimize this. This sequence of checks must be correct
// but not necessarily efficient
if (prep_seq == 0) {
// Compaction will output keys to bottom-level with sequence number 0 if
// it is visible to the earliest snapshot.
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
}
if (snapshot_seq < prep_seq) {
// snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
}
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
// We should not normally reach here
ReadLock rl(&prepared_mutex_);
// TODO(myabandeh): also add a stat
ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()));
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// Then it is not committed yet
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
}
}
// Note: since min_uncommitted does not include the delayed_prepared_ we
// should check delayed_prepared_ first before applying this optimization.
// TODO(myabandeh): include delayed_prepared_ in min_uncommitted
if (prep_seq < min_uncommitted) {
return true;
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
CommitEntry64b dont_care;
CommitEntry cached;
bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
if (exist && prep_seq == cached.prep_seq) {
// It is committed and also not evicted from commit cache
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq);
return cached.commit_seq <= snapshot_seq;
}
// else it could be committed but not inserted in the map which could happen
// after recovery, or it could be committed and evicted by another commit,
// or never committed.
// At this point we dont know if it was committed or it is still prepared
auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
// max_evicted_seq_ when we did GetCommitEntry <= max_evicted_seq now
if (max_evicted_seq < prep_seq) {
// Not evicted from cache and also not present, so must be still prepared
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
}
// When advancing max_evicted_seq_, we move older entries from prepared to
// delayed_prepared_. Also we move evicted entries from commit cache to
// old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
// max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
// old_commit_map_, iii) committed with no conflict with any snapshot. Case
// (i) delayed_prepared_ is checked above
if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case
// only (iii) is the case: committed
// commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
// snapshot_seq
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
}
// else (ii) might be the case: check the commit data saved for this
// snapshot. If there was no overlapping commit entry, then it is committed
// with a commit_seq lower than any live snapshot, including snapshot_seq.
if (old_commit_map_empty_.load(std::memory_order_acquire)) {
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
}
{
// We should not normally reach here unless snapshot_seq is old. This is a
// rare case and it is ok to pay the cost of mutex ReadLock for such old,
// reading transactions.
// TODO(myabandeh): also add a stat
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
ReadLock rl(&old_commit_map_mutex_);
auto prep_set_entry = old_commit_map_.find(snapshot_seq);
bool found = prep_set_entry != old_commit_map_.end();
if (found) {
auto& vec = prep_set_entry->second;
found = std::binary_search(vec.begin(), vec.end(), prep_seq);
}
if (!found) {
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" returns %" PRId32,
prep_seq, snapshot_seq, 1);
return true;
}
}
// (ii) is the case: it is committed but after the snapshot_seq
ROCKS_LOG_DETAILS(
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
prep_seq, snapshot_seq, 0);
return false;
}
// Add the transaction with prepare sequence seq to the prepared list // Add the transaction with prepare sequence seq to the prepared list
void AddPrepared(uint64_t seq); void AddPrepared(uint64_t seq);
// Rollback a prepared txn identified with prep_seq. rollback_seq is the seq // Rollback a prepared txn identified with prep_seq. rollback_seq is the seq
@ -224,6 +352,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
const std::vector<ColumnFamilyHandle*>& handles) override; const std::vector<ColumnFamilyHandle*>& handles) override;
void UpdateCFComparatorMap(const ColumnFamilyHandle* handle) override; void UpdateCFComparatorMap(const ColumnFamilyHandle* handle) override;
virtual const Snapshot* GetSnapshot() override;
protected: protected:
virtual Status VerifyCFOptions( virtual Status VerifyCFOptions(
const ColumnFamilyOptions& cf_options) override; const ColumnFamilyOptions& cf_options) override;
@ -239,6 +369,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class PreparedHeap_BasicsTest_Test; friend class PreparedHeap_BasicsTest_Test;
friend class PreparedHeap_EmptyAtTheEnd_Test; friend class PreparedHeap_EmptyAtTheEnd_Test;
friend class PreparedHeap_Concurrent_Test; friend class PreparedHeap_Concurrent_Test;
friend class WritePreparedTxn;
friend class WritePreparedTxnDBMock; friend class WritePreparedTxnDBMock;
friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
friend class friend class
@ -336,6 +467,32 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, void AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
const SequenceNumber& new_max); const SequenceNumber& new_max);
inline SequenceNumber SmallestUnCommittedSeq() {
// Since we update the prepare_heap always from the main write queue via
// PreReleaseCallback, the prepared_txns_.top() indicates the smallest
// prepared data in 2pc transactions. For non-2pc transactions that are
// written in two steps, we also update prepared_txns_ at the first step
// (via the same mechanism) so that their uncommitted data is reflected in
// SmallestUnCommittedSeq.
ReadLock rl(&prepared_mutex_);
// Since we are holding the mutex, and GetLatestSequenceNumber is updated
// after prepared_txns_ are, the value of GetLatestSequenceNumber would
// reflect any uncommitted data that is not added to prepared_txns_ yet.
// Otherwise, if there is no concurrent txn, this value simply reflects the
// latest value in the memtable.
if (prepared_txns_.empty()) {
return db_impl_->GetLatestSequenceNumber() + 1;
} else {
return std::min(prepared_txns_.top(),
db_impl_->GetLatestSequenceNumber() + 1);
}
}
// Enhance the snapshot object by recording in it the smallest uncommitted seq
inline void EnhanceSnapshot(SnapshotImpl* snapshot) {
assert(snapshot);
snapshot->min_uncommitted_ = WritePreparedTxnDB::SmallestUnCommittedSeq();
}
virtual const std::vector<SequenceNumber> GetSnapshotListFromDB( virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
SequenceNumber max); SequenceNumber max);
@ -438,18 +595,44 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
class WritePreparedTxnReadCallback : public ReadCallback { class WritePreparedTxnReadCallback : public ReadCallback {
public: public:
WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot) WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
: db_(db), snapshot_(snapshot) {} SequenceNumber min_uncommitted)
: db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {}
// Will be called to see if the seq number accepted; if not it moves on to the // Will be called to see if the seq number accepted; if not it moves on to the
// next seq number. // next seq number.
virtual bool IsCommitted(SequenceNumber seq) override { inline virtual bool IsCommitted(SequenceNumber seq) override {
return db_->IsInSnapshot(seq, snapshot_); return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_);
} }
private: private:
WritePreparedTxnDB* db_; WritePreparedTxnDB* db_;
SequenceNumber snapshot_; SequenceNumber snapshot_;
SequenceNumber min_uncommitted_;
};
class AddPreparedCallback : public PreReleaseCallback {
public:
AddPreparedCallback(WritePreparedTxnDB* db, size_t sub_batch_cnt,
bool two_write_queues)
: db_(db),
sub_batch_cnt_(sub_batch_cnt),
two_write_queues_(two_write_queues) {
(void)two_write_queues_; // to silence unused private field warning
}
virtual Status Callback(SequenceNumber prepare_seq,
bool is_mem_disabled) override {
assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue
for (size_t i = 0; i < sub_batch_cnt_; i++) {
db_->AddPrepared(prepare_seq + i);
}
return Status::OK();
}
private:
WritePreparedTxnDB* db_;
size_t sub_batch_cnt_;
bool two_write_queues_;
}; };
class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
