WritePrepared Txn: Recovery

Summary:
Recover txns from the WAL. Also added some unit tests.
Closes https://github.com/facebook/rocksdb/pull/2901

Differential Revision: D5859596

Pulled By: maysamyabandeh

fbshipit-source-id: 6424967b231388093b4effffe0a3b1b7ec8caeb0
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 8c724f5c7f
commit 385049baf2
  1. 4
      db/db_impl.cc
  2. 12
      db/db_impl.h
  3. 2
      db/db_impl_files.cc
  4. 4
      db/db_impl_write.cc
  5. 2
      db/memtable.cc
  6. 18
      db/version_set.cc
  7. 86
      db/write_batch.cc
  8. 5
      db/write_batch_test.cc
  9. 1
      db/write_thread.cc
  10. 3
      db/write_thread.h
  11. 15
      include/rocksdb/utilities/transaction.h
  12. 2
      include/rocksdb/write_batch.h
  13. 43
      utilities/transactions/pessimistic_transaction_db.cc
  14. 16
      utilities/transactions/pessimistic_transaction_db.h
  15. 170
      utilities/transactions/transaction_test.cc
  16. 99
      utilities/transactions/transaction_test.h
  17. 505
      utilities/transactions/write_prepared_transaction_test.cc
  18. 20
      utilities/transactions/write_prepared_txn.cc
  19. 3
      utilities/transactions/write_prepared_txn.h

@ -940,6 +940,10 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
SequenceNumber snapshot; SequenceNumber snapshot;
if (read_options.snapshot != nullptr) { if (read_options.snapshot != nullptr) {
// Note: In WritePrepared txns this is not necessary but not harmful either.
// Because prep_seq > snapshot => commit_seq > snapshot so if a snapshot is
// specified we should be fine with skipping seq numbers that are greater
// than that.
snapshot = reinterpret_cast<const SnapshotImpl*>( snapshot = reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_; read_options.snapshot)->number_;
} else { } else {

@ -510,9 +510,11 @@ class DBImpl : public DB {
uint64_t log_number_; uint64_t log_number_;
std::string name_; std::string name_;
WriteBatch* batch_; WriteBatch* batch_;
// The seq number of the first key in the batch
SequenceNumber seq_;
explicit RecoveredTransaction(const uint64_t log, const std::string& name, explicit RecoveredTransaction(const uint64_t log, const std::string& name,
WriteBatch* batch) WriteBatch* batch, SequenceNumber seq)
: log_number_(log), name_(name), batch_(batch) {} : log_number_(log), name_(name), batch_(batch), seq_(seq) {}
~RecoveredTransaction() { delete batch_; } ~RecoveredTransaction() { delete batch_; }
}; };
@ -534,8 +536,9 @@ class DBImpl : public DB {
} }
void InsertRecoveredTransaction(const uint64_t log, const std::string& name, void InsertRecoveredTransaction(const uint64_t log, const std::string& name,
WriteBatch* batch) { WriteBatch* batch, SequenceNumber seq) {
recovered_transactions_[name] = new RecoveredTransaction(log, name, batch); recovered_transactions_[name] =
new RecoveredTransaction(log, name, batch, seq);
MarkLogAsContainingPrepSection(log); MarkLogAsContainingPrepSection(log);
} }
@ -640,6 +643,7 @@ class DBImpl : public DB {
friend class PessimisticTransaction; friend class PessimisticTransaction;
friend class WriteCommittedTxn; friend class WriteCommittedTxn;
friend class WritePreparedTxn; friend class WritePreparedTxn;
friend class WritePreparedTxnDB;
friend class WriteBatchWithIndex; friend class WriteBatchWithIndex;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
friend class ForwardIterator; friend class ForwardIterator;

@ -48,6 +48,7 @@ uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
return min_log; return min_log;
} }
// TODO(myabandeh): Avoid using locks
void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) { void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
assert(log != 0); assert(log != 0);
std::lock_guard<std::mutex> lock(prep_heap_mutex_); std::lock_guard<std::mutex> lock(prep_heap_mutex_);
@ -56,6 +57,7 @@ void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
it->second += 1; it->second += 1;
} }
// TODO(myabandeh): Avoid using locks
void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) { void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
assert(log != 0); assert(log != 0);
std::lock_guard<std::mutex> lock(prep_heap_mutex_); std::lock_guard<std::mutex> lock(prep_heap_mutex_);

@ -264,9 +264,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} else { } else {
SequenceNumber next_sequence = current_sequence; SequenceNumber next_sequence = current_sequence;
for (auto* writer : write_group) { for (auto* writer : write_group) {
if (writer->ShouldWriteToMemtable()) { writer->sequence = next_sequence;
writer->sequence = next_sequence;
}
if (seq_per_batch_) { if (seq_per_batch_) {
next_sequence++; next_sequence++;
} else if (writer->ShouldWriteToMemtable()) { } else if (writer->ShouldWriteToMemtable()) {

@ -475,7 +475,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
} }
// The first sequence number inserted into the memtable // The first sequence number inserted into the memtable
assert(first_seqno_ == 0 || s > first_seqno_); assert(first_seqno_ == 0 || s >= first_seqno_);
if (first_seqno_ == 0) { if (first_seqno_ == 0) {
first_seqno_.store(s, std::memory_order_relaxed); first_seqno_.store(s, std::memory_order_relaxed);

@ -2655,7 +2655,14 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
assert(edit->IsColumnFamilyManipulation()); assert(edit->IsColumnFamilyManipulation());
edit->SetNextFile(next_file_number_.load()); edit->SetNextFile(next_file_number_.load());
edit->SetLastSequence(last_sequence_); // The log might have data that is not visible to memtbale and hence have not
// updated the last_sequence_ yet. It is also possible that the log has is
// expecting some new data that is not written yet. Since LastSequence is an
// upper bound on the sequence, it is ok to record
// last_to_be_written_sequence_ as the last sequence.
edit->SetLastSequence(db_options_->concurrent_prepare
? last_to_be_written_sequence_
: last_sequence_);
if (edit->is_column_family_drop_) { if (edit->is_column_family_drop_) {
// if we drop column family, we have to make sure to save max column family, // if we drop column family, we have to make sure to save max column family,
// so that we don't reuse existing ID // so that we don't reuse existing ID
@ -2678,7 +2685,14 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
edit->SetPrevLogNumber(prev_log_number_); edit->SetPrevLogNumber(prev_log_number_);
} }
edit->SetNextFile(next_file_number_.load()); edit->SetNextFile(next_file_number_.load());
edit->SetLastSequence(last_sequence_); // The log might have data that is not visible to memtbale and hence have not
// updated the last_sequence_ yet. It is also possible that the log has is
// expecting some new data that is not written yet. Since LastSequence is an
// upper bound on the sequence, it is ok to record
// last_to_be_written_sequence_ as the last sequence.
edit->SetLastSequence(db_options_->concurrent_prepare
? last_to_be_written_sequence_
: last_sequence_);
builder->Apply(edit); builder->Apply(edit);
} }

@ -366,7 +366,11 @@ Status WriteBatch::Iterate(Handler* handler) const {
input.remove_prefix(WriteBatchInternal::kHeader); input.remove_prefix(WriteBatchInternal::kHeader);
Slice key, value, blob, xid; Slice key, value, blob, xid;
bool first_tag = true; // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
// the batch boundry sybmols otherwise we would mis-count the number of
// batches. We do that by checking whether the accumulated batch is empty
// before seeing the next Noop.
bool empty_batch = true;
int found = 0; int found = 0;
Status s; Status s;
while (s.ok() && !input.empty() && handler->Continue()) { while (s.ok() && !input.empty() && handler->Continue()) {
@ -385,6 +389,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
assert(content_flags_.load(std::memory_order_relaxed) & assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_PUT)); (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
s = handler->PutCF(column_family, key, value); s = handler->PutCF(column_family, key, value);
empty_batch = false;
found++; found++;
break; break;
case kTypeColumnFamilyDeletion: case kTypeColumnFamilyDeletion:
@ -392,6 +397,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
assert(content_flags_.load(std::memory_order_relaxed) & assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE)); (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
s = handler->DeleteCF(column_family, key); s = handler->DeleteCF(column_family, key);
empty_batch = false;
found++; found++;
break; break;
case kTypeColumnFamilySingleDeletion: case kTypeColumnFamilySingleDeletion:
@ -399,6 +405,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
assert(content_flags_.load(std::memory_order_relaxed) & assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE)); (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
s = handler->SingleDeleteCF(column_family, key); s = handler->SingleDeleteCF(column_family, key);
empty_batch = false;
found++; found++;
break; break;
case kTypeColumnFamilyRangeDeletion: case kTypeColumnFamilyRangeDeletion:
@ -406,6 +413,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
assert(content_flags_.load(std::memory_order_relaxed) & assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE)); (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
s = handler->DeleteRangeCF(column_family, key, value); s = handler->DeleteRangeCF(column_family, key, value);
empty_batch = false;
found++; found++;
break; break;
case kTypeColumnFamilyMerge: case kTypeColumnFamilyMerge:
@ -413,38 +421,44 @@ Status WriteBatch::Iterate(Handler* handler) const {
assert(content_flags_.load(std::memory_order_relaxed) & assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_MERGE)); (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
s = handler->MergeCF(column_family, key, value); s = handler->MergeCF(column_family, key, value);
empty_batch = false;
found++; found++;
break; break;
case kTypeLogData: case kTypeLogData:
handler->LogData(blob); handler->LogData(blob);
empty_batch = true;
break; break;
case kTypeBeginPrepareXID: case kTypeBeginPrepareXID:
assert(content_flags_.load(std::memory_order_relaxed) & assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE)); (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
handler->MarkBeginPrepare(); handler->MarkBeginPrepare();
empty_batch = false;
break; break;
case kTypeEndPrepareXID: case kTypeEndPrepareXID:
assert(content_flags_.load(std::memory_order_relaxed) & assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE)); (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
handler->MarkEndPrepare(xid); handler->MarkEndPrepare(xid);
empty_batch = true;
break; break;
case kTypeCommitXID: case kTypeCommitXID:
assert(content_flags_.load(std::memory_order_relaxed) & assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT)); (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
handler->MarkCommit(xid); handler->MarkCommit(xid);
empty_batch = true;
break; break;
case kTypeRollbackXID: case kTypeRollbackXID:
assert(content_flags_.load(std::memory_order_relaxed) & assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK)); (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
handler->MarkRollback(xid); handler->MarkRollback(xid);
empty_batch = true;
break; break;
case kTypeNoop: case kTypeNoop:
handler->MarkNoop(first_tag); handler->MarkNoop(empty_batch);
empty_batch = true;
break; break;
default: default:
return Status::Corruption("unknown WriteBatch tag"); return Status::Corruption("unknown WriteBatch tag");
} }
first_tag = false;
} }
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -841,9 +855,12 @@ class MemTableInserter : public WriteBatch::Handler {
PostMapType mem_post_info_map_; PostMapType mem_post_info_map_;
// current recovered transaction we are rebuilding (recovery) // current recovered transaction we are rebuilding (recovery)
WriteBatch* rebuilding_trx_; WriteBatch* rebuilding_trx_;
SequenceNumber rebuilding_trx_seq_;
// Increase seq number once per each write batch. Otherwise increase it once // Increase seq number once per each write batch. Otherwise increase it once
// per key. // per key.
bool seq_per_batch_; bool seq_per_batch_;
// Whether the memtable write will be done only after the commit
bool write_after_commit_;
MemPostInfoMap& GetPostMap() { MemPostInfoMap& GetPostMap() {
assert(concurrent_memtable_writes_); assert(concurrent_memtable_writes_);
@ -873,7 +890,11 @@ class MemTableInserter : public WriteBatch::Handler {
post_info_created_(false), post_info_created_(false),
has_valid_writes_(has_valid_writes), has_valid_writes_(has_valid_writes),
rebuilding_trx_(nullptr), rebuilding_trx_(nullptr),
seq_per_batch_(seq_per_batch) { seq_per_batch_(seq_per_batch),
// Write after commit currently uses one seq per key (instead of per
// batch). So seq_per_batch being false indicates write_after_commit
// approach.
write_after_commit_(!seq_per_batch) {
assert(cf_mems_); assert(cf_mems_);
} }
@ -952,7 +973,10 @@ class MemTableInserter : public WriteBatch::Handler {
const Slice& value) override { const Slice& value) override {
if (rebuilding_trx_ != nullptr) { if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
return Status::OK(); if (write_after_commit_) {
return Status::OK();
}
// else insert the values to the memtable right away
} }
Status seek_status; Status seek_status;
@ -1030,7 +1054,10 @@ class MemTableInserter : public WriteBatch::Handler {
const Slice& key) override { const Slice& key) override {
if (rebuilding_trx_ != nullptr) { if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
return Status::OK(); if (write_after_commit_) {
return Status::OK();
}
// else insert the values to the memtable right away
} }
Status seek_status; Status seek_status;
@ -1046,7 +1073,10 @@ class MemTableInserter : public WriteBatch::Handler {
const Slice& key) override { const Slice& key) override {
if (rebuilding_trx_ != nullptr) { if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
return Status::OK(); if (write_after_commit_) {
return Status::OK();
}
// else insert the values to the memtable right away
} }
Status seek_status; Status seek_status;
@ -1064,7 +1094,10 @@ class MemTableInserter : public WriteBatch::Handler {
if (rebuilding_trx_ != nullptr) { if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
begin_key, end_key); begin_key, end_key);
return Status::OK(); if (write_after_commit_) {
return Status::OK();
}
// else insert the values to the memtable right away
} }
Status seek_status; Status seek_status;
@ -1094,7 +1127,10 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!concurrent_memtable_writes_); assert(!concurrent_memtable_writes_);
if (rebuilding_trx_ != nullptr) { if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
return Status::OK(); if (write_after_commit_) {
return Status::OK();
}
// else insert the values to the memtable right away
} }
Status seek_status; Status seek_status;
@ -1200,6 +1236,7 @@ class MemTableInserter : public WriteBatch::Handler {
// we are now iterating through a prepared section // we are now iterating through a prepared section
rebuilding_trx_ = new WriteBatch(); rebuilding_trx_ = new WriteBatch();
rebuilding_trx_seq_ = sequence_;
if (has_valid_writes_ != nullptr) { if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true; *has_valid_writes_ = true;
} }
@ -1215,7 +1252,7 @@ class MemTableInserter : public WriteBatch::Handler {
if (recovering_log_number_ != 0) { if (recovering_log_number_ != 0) {
assert(db_->allow_2pc()); assert(db_->allow_2pc());
db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(), db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
rebuilding_trx_); rebuilding_trx_, rebuilding_trx_seq_);
rebuilding_trx_ = nullptr; rebuilding_trx_ = nullptr;
} else { } else {
assert(rebuilding_trx_ == nullptr); assert(rebuilding_trx_ == nullptr);
@ -1226,10 +1263,10 @@ class MemTableInserter : public WriteBatch::Handler {
return Status::OK(); return Status::OK();
} }
Status MarkNoop(bool first_tag) override { Status MarkNoop(bool empty_batch) override {
// A hack in pessimistic transaction could result into a noop at the start // A hack in pessimistic transaction could result into a noop at the start
// of the write batch, that should be ignored. // of the write batch, that should be ignored.
if (!first_tag) { if (!empty_batch) {
// In the absence of Prepare markers, a kTypeNoop tag indicates the end of // In the absence of Prepare markers, a kTypeNoop tag indicates the end of
// a batch. This happens when write batch commits skipping the prepare // a batch. This happens when write batch commits skipping the prepare
// phase. // phase.
@ -1257,12 +1294,13 @@ class MemTableInserter : public WriteBatch::Handler {
// at this point individual CF lognumbers will prevent // at this point individual CF lognumbers will prevent
// duplicate re-insertion of values. // duplicate re-insertion of values.
assert(log_number_ref_ == 0); assert(log_number_ref_ == 0);
// all insertes must reference this trx log number if (write_after_commit_) {
log_number_ref_ = trx->log_number_; // all insertes must reference this trx log number
s = trx->batch_->Iterate(this); log_number_ref_ = trx->log_number_;
// TODO(myabandeh): In WritePrepared txn, a commit marker should s = trx->batch_->Iterate(this);
// reference the log that contains the prepare marker. log_number_ref_ = 0;
log_number_ref_ = 0; }
// else the values are already inserted before the commit
if (s.ok()) { if (s.ok()) {
db_->DeleteRecoveredTransaction(name.ToString()); db_->DeleteRecoveredTransaction(name.ToString());
@ -1272,12 +1310,10 @@ class MemTableInserter : public WriteBatch::Handler {
} }
} }
} else { } else {
// TODO(myabandeh): In WritePrepared txn, a commit marker should // When writes are not delayed until commit, there is no disconnect
// reference the log that contains the prepare marker. This is to be able // between a memtable write and the WAL that supports it. So the commit
// to reconsutrct the prepared list after recovery. // need not reference any log as the only log to which it depends.
// TODO(myabandeh): In WritePrepared txn, we do not reach here since assert(!write_after_commit_ || log_number_ref_ > 0);
// disable_memtable is set for commit.
assert(log_number_ref_ > 0);
} }
const bool batch_boundry = true; const bool batch_boundry = true;
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
@ -1330,6 +1366,8 @@ Status WriteBatchInternal::InsertInto(
nullptr /*has_valid_writes*/, seq_per_batch); nullptr /*has_valid_writes*/, seq_per_batch);
for (auto w : write_group) { for (auto w : write_group) {
if (!w->ShouldWriteToMemtable()) { if (!w->ShouldWriteToMemtable()) {
inserter.MaybeAdvanceSeq(true);
w->sequence = inserter.sequence();
continue; continue;
} }
SetSequence(w->batch, inserter.sequence()); SetSequence(w->batch, inserter.sequence());

@ -18,7 +18,6 @@
#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/write_buffer_manager.h" #include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h" #include "table/scoped_arena_iterator.h"
#include "util/logging.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/testharness.h" #include "util/testharness.h"
@ -299,8 +298,8 @@ namespace {
seen += "MarkEndPrepare(" + xid.ToString() + ")"; seen += "MarkEndPrepare(" + xid.ToString() + ")";
return Status::OK(); return Status::OK();
} }
virtual Status MarkNoop(bool first_tag) override { virtual Status MarkNoop(bool empty_batch) override {
seen += "MarkNoop(" + std::string(first_tag ? "true" : "false") + ")"; seen += "MarkNoop(" + std::string(empty_batch ? "true" : "false") + ")";
return Status::OK(); return Status::OK();
} }
virtual Status MarkCommit(const Slice& xid) override { virtual Status MarkCommit(const Slice& xid) override {

@ -394,6 +394,7 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
write_group->last_writer = w; write_group->last_writer = w;
write_group->size++; write_group->size++;
} }
TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
return size; return size;
} }

@ -14,6 +14,7 @@
#include <type_traits> #include <type_traits>
#include <vector> #include <vector>
#include "db/dbformat.h"
#include "db/write_callback.h" #include "db/write_callback.h"
#include "monitoring/instrumented_mutex.h" #include "monitoring/instrumented_mutex.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
@ -142,6 +143,7 @@ class WriteThread {
made_waitable(false), made_waitable(false),
state(STATE_INIT), state(STATE_INIT),
write_group(nullptr), write_group(nullptr),
sequence(kMaxSequenceNumber),
link_older(nullptr), link_older(nullptr),
link_newer(nullptr) {} link_newer(nullptr) {}
@ -158,6 +160,7 @@ class WriteThread {
made_waitable(false), made_waitable(false),
state(STATE_INIT), state(STATE_INIT),
write_group(nullptr), write_group(nullptr),
sequence(kMaxSequenceNumber),
link_older(nullptr), link_older(nullptr),
link_newer(nullptr) {} link_newer(nullptr) {}

@ -460,6 +460,14 @@ class Transaction {
TransactionState GetState() const { return txn_state_; } TransactionState GetState() const { return txn_state_; }
void SetState(TransactionState state) { txn_state_ = state; } void SetState(TransactionState state) { txn_state_ = state; }
// NOTE: Experimental feature
// The globally unique id with which the transaction is identified. This id
// might or might not be set depending on the implementation. Similarly the
// implementation decides the point in lifetime of a transaction at which it
// assigns the id. Although currently it is the case, the id is not guaranteed
// to remain the same across restarts.
uint64_t GetId() { return id_; }
protected: protected:
explicit Transaction(const TransactionDB* db) {} explicit Transaction(const TransactionDB* db) {}
Transaction() {} Transaction() {}
@ -472,7 +480,14 @@ class Transaction {
// Execution status of the transaction. // Execution status of the transaction.
std::atomic<TransactionState> txn_state_; std::atomic<TransactionState> txn_state_;
uint64_t id_ = 0;
virtual void SetId(uint64_t id) {
assert(id_ == 0);
id_ = id;
}
private: private:
friend class PessimisticTransactionDB;
// No copying allowed // No copying allowed
Transaction(const Transaction&); Transaction(const Transaction&);
void operator=(const Transaction&); void operator=(const Transaction&);

@ -244,7 +244,7 @@ class WriteBatch : public WriteBatchBase {
return Status::InvalidArgument("MarkEndPrepare() handler not defined."); return Status::InvalidArgument("MarkEndPrepare() handler not defined.");
} }
virtual Status MarkNoop(bool first_tag) { virtual Status MarkNoop(bool empty_batch) {
return Status::InvalidArgument("MarkNoop() handler not defined."); return Status::InvalidArgument("MarkNoop() handler not defined.");
} }

@ -115,6 +115,8 @@ Status PessimisticTransactionDB::Initialize(
Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr); Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
assert(real_trx); assert(real_trx);
real_trx->SetLogNumber(recovered_trx->log_number_); real_trx->SetLogNumber(recovered_trx->log_number_);
assert(recovered_trx->seq_ != kMaxSequenceNumber);
real_trx->SetId(recovered_trx->seq_);
s = real_trx->SetName(recovered_trx->name_); s = real_trx->SetName(recovered_trx->name_);
if (!s.ok()) { if (!s.ok()) {
@ -133,6 +135,23 @@ Status PessimisticTransactionDB::Initialize(
return s; return s;
} }
Status WritePreparedTxnDB::Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) {
auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
assert(dbimpl != nullptr);
auto rtxns = dbimpl->recovered_transactions();
for (auto rtxn : rtxns) {
AddPrepared(rtxn.second->seq_);
}
SequenceNumber prev_max = max_evicted_seq_;
SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
AdvanceMaxEvictedSeq(prev_max, last_seq);
auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
handles);
return s;
}
Transaction* WriteCommittedTxnDB::BeginTransaction( Transaction* WriteCommittedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options, const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) { Transaction* old_txn) {
@ -547,6 +566,19 @@ void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
transactions_.erase(it); transactions_.erase(it);
} }
Status WritePreparedTxnDB::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
// We are fine with the latest committed value. This could be done by
// specifying the snapshot as kMaxSequenceNumber.
WritePreparedTxnReadCallback callback(this, kMaxSequenceNumber);
bool* dont_care = nullptr;
// Note: no need to specify a snapshot for read options as no specific
// snapshot is requested by the user.
return db_impl_->GetImpl(options, column_family, key, value, dont_care,
&callback);
}
// Returns true if commit_seq <= snapshot_seq // Returns true if commit_seq <= snapshot_seq
bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
uint64_t snapshot_seq) { uint64_t snapshot_seq) {
@ -571,14 +603,14 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
CommitEntry64b dont_care; CommitEntry64b dont_care;
CommitEntry cached; CommitEntry cached;
bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached);
if (!exist) { if (exist && prep_seq == cached.prep_seq) {
// It is not committed, so it must be still prepared
return false;
}
if (prep_seq == cached.prep_seq) {
// It is committed and also not evicted from commit cache // It is committed and also not evicted from commit cache
return cached.commit_seq <= snapshot_seq; return cached.commit_seq <= snapshot_seq;
} }
// else it could be committed but not inserted in the map which could happen
// after recovery, or it could be committed and evicted by another commit, or
// never committed.
// At this point we dont know if it was committed or it is still prepared // At this point we dont know if it was committed or it is still prepared
auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire); auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
if (max_evicted_seq < prep_seq) { if (max_evicted_seq < prep_seq) {
@ -618,6 +650,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
void WritePreparedTxnDB::AddPrepared(uint64_t seq) { void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq); ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq);
assert(seq > max_evicted_seq_);
WriteLock wl(&prepared_mutex_); WriteLock wl(&prepared_mutex_);
prepared_txns_.push(seq); prepared_txns_.push(seq);
} }

@ -33,8 +33,9 @@ class PessimisticTransactionDB : public TransactionDB {
virtual ~PessimisticTransactionDB(); virtual ~PessimisticTransactionDB();
Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices, virtual Status Initialize(
const std::vector<ColumnFamilyHandle*>& handles); const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles);
Transaction* BeginTransaction(const WriteOptions& write_options, Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options, const TransactionOptions& txn_options,
@ -191,10 +192,19 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
virtual ~WritePreparedTxnDB() {} virtual ~WritePreparedTxnDB() {}
virtual Status Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) override;
Transaction* BeginTransaction(const WriteOptions& write_options, Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options, const TransactionOptions& txn_options,
Transaction* old_txn) override; Transaction* old_txn) override;
using DB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
// Check whether the transaction that wrote the value with seqeunce number seq // Check whether the transaction that wrote the value with seqeunce number seq
// is visible to the snapshot with sequence number snapshot_seq // is visible to the snapshot with sequence number snapshot_seq
bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq); bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq);
@ -294,6 +304,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class PreparedHeap_BasicsTest_Test; friend class PreparedHeap_BasicsTest_Test;
friend class WritePreparedTxnDBMock; friend class WritePreparedTxnDBMock;
friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
void init(const TransactionDBOptions& /* unused */) { void init(const TransactionDBOptions& /* unused */) {
// Adcance max_evicted_seq_ no more than 100 times before the cache wraps // Adcance max_evicted_seq_ no more than 100 times before the cache wraps

@ -4640,89 +4640,107 @@ TEST_P(TransactionTest, MemoryLimitTest) {
delete txn; delete txn;
} }
// This test clarfies the existing expectation from the sequence number // This test clarifies the existing expectation from the sequence number
// algorithm. It could detect mistakes in updating the code but it is not // algorithm. It could detect mistakes in updating the code but it is not
// necessarily the one acceptable way. If the algorithm is legitimately changed, // necessarily the one acceptable way. If the algorithm is legitimately changed,
// this unit test should be updated as well. // this unit test should be updated as well.
TEST_P(TransactionTest, SeqAdvanceTest) { TEST_P(TransactionTest, SeqAdvanceTest) {
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
auto seq = db_impl->GetLatestSequenceNumber();
auto exp_seq = seq;
// Test DB's internal txn. It involves no prepare phase nor a commit marker.
WriteOptions wopts; WriteOptions wopts;
auto s = db->Put(wopts, "key", "value"); FlushOptions fopt;
// Consume one seq per key
exp_seq++; // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
ASSERT_OK(s); // of the branches. This is the same as counting a binary number where i-th
seq = db_impl->GetLatestSequenceNumber(); // bit represents whether we take branch i in the represented by the number.
ASSERT_EQ(exp_seq, seq); const size_t NUM_BRANCHES = 8;
// Helper function that shows if the branch is to be taken in the run
// Doing it twice might detect some bugs // represented by the number n.
s = db->Put(wopts, "key", "value"); auto branch_do = [&](size_t n, size_t* branch) {
exp_seq++; assert(*branch < NUM_BRANCHES);
ASSERT_OK(s); const size_t filter = static_cast<size_t>(1) << *branch;
seq = db_impl->GetLatestSequenceNumber(); return n & filter;
ASSERT_EQ(exp_seq, seq); };
const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES;
// Testing directly writing a write batch. Functionality-wise it is equivalent for (size_t n = 0; n < max_n; n++, ReOpen()) {
// to commit without prepare. DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
WriteBatch wb; size_t branch = 0;
wb.Put("k1", "v1"); auto seq = db_impl->GetLatestSequenceNumber();
wb.Put("k2", "v2"); exp_seq = seq;
wb.Put("k3", "v3"); txn_t0(0);
s = db->Write(wopts, &wb); seq = db_impl->GetLatestSequenceNumber();
// One seq per key. ASSERT_EQ(exp_seq, seq);
exp_seq += 3;
ASSERT_OK(s); if (branch_do(n, &branch)) {
seq = db_impl->GetLatestSequenceNumber(); db_impl->Flush(fopt);
ASSERT_EQ(exp_seq, seq); seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// A full 2pc txn that also involves a commit marker. }
TransactionOptions txn_options; if (branch_do(n, &branch)) {
WriteOptions write_options; db_impl->FlushWAL(true);
Transaction* txn = db->BeginTransaction(write_options, txn_options); ReOpenNoDelete();
s = txn->SetName("xid"); db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
ASSERT_OK(s); seq = db_impl->GetLatestSequenceNumber();
s = txn->Put(Slice("foo"), Slice("bar")); ASSERT_EQ(exp_seq, seq);
s = txn->Put(Slice("foo2"), Slice("bar2")); }
s = txn->Put(Slice("foo3"), Slice("bar3"));
s = txn->Put(Slice("foo4"), Slice("bar4"));
s = txn->Put(Slice("foo5"), Slice("bar5"));
ASSERT_OK(s);
s = txn->Prepare();
ASSERT_OK(s);
// Consume one seq per key
exp_seq += 5;
s = txn->Commit();
ASSERT_OK(s);
s = db->Put(wopts, "key", "value"); // Doing it twice might detect some bugs
exp_seq++; txn_t0(1);
ASSERT_OK(s); seq = db_impl->GetLatestSequenceNumber();
seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq);
ASSERT_EQ(exp_seq, seq);
delete txn;
// Commit without prepare. It shoudl write to DB without a commit marker. txn_t1(0);
txn = db->BeginTransaction(write_options, txn_options); seq = db_impl->GetLatestSequenceNumber();
s = txn->SetName("xid2"); ASSERT_EQ(exp_seq, seq);
ASSERT_OK(s);
s = txn->Put(Slice("foo"), Slice("bar")); if (branch_do(n, &branch)) {
s = txn->Put(Slice("foo2"), Slice("bar2")); db_impl->Flush(fopt);
s = txn->Put(Slice("foo3"), Slice("bar3")); seq = db_impl->GetLatestSequenceNumber();
s = txn->Put(Slice("foo4"), Slice("bar4")); ASSERT_EQ(exp_seq, seq);
s = txn->Put(Slice("foo5"), Slice("bar5")); }
ASSERT_OK(s); if (branch_do(n, &branch)) {
s = txn->Commit(); db_impl->FlushWAL(true);
ASSERT_OK(s); ReOpenNoDelete();
// One seq per key db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
exp_seq += 5; seq = db_impl->GetLatestSequenceNumber();
seq = db_impl->GetLatestSequenceNumber(); ASSERT_EQ(exp_seq, seq);
ASSERT_EQ(exp_seq, seq); }
pdb->UnregisterTransaction(txn);
delete txn; txn_t3(0);
// Since commit marker does not write to memtable, the last seq number is
// not updated immediately. But the advance should be visible after the next
// write.
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
}
if (branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
txn_t0(0);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
txn_t2(0);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
if (branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
}
} }
} // namespace rocksdb } // namespace rocksdb

@ -124,6 +124,105 @@ class TransactionTest : public ::testing::TestWithParam<
} }
return s; return s;
} }
std::atomic<size_t> linked = {0};
std::atomic<size_t> exp_seq = {0};
std::atomic<size_t> commit_writes = {0};
std::atomic<size_t> expected_commits = {0};
std::function<void(size_t, Status)> txn_t0_with_status = [&](size_t index,
Status exp_s) {
// Test DB's internal txn. It involves no prepare phase nor a commit marker.
WriteOptions wopts;
auto s = db->Put(wopts, "key" + std::to_string(index), "value");
ASSERT_EQ(exp_s, s);
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// Consume one seq per key
exp_seq++;
} else {
// Consume one seq per batch
exp_seq++;
}
};
std::function<void(size_t)> txn_t0 = [&](size_t index) {
return txn_t0_with_status(index, Status::OK());
};
std::function<void(size_t)> txn_t1 = [&](size_t index) {
// Testing directly writing a write batch. Functionality-wise it is
// equivalent to commit without prepare.
WriteBatch wb;
auto istr = std::to_string(index);
wb.Put("k1" + istr, "v1");
wb.Put("k2" + istr, "v2");
wb.Put("k3" + istr, "v3");
WriteOptions wopts;
auto s = db->Write(wopts, &wb);
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// Consume one seq per key
exp_seq += 3;
;
} else {
// Consume one seq per batch
exp_seq++;
}
ASSERT_OK(s);
};
std::function<void(size_t)> txn_t2 = [&](size_t index) {
// Commit without prepare. It should write to DB without a commit marker.
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
auto istr = std::to_string(index);
auto s = txn->SetName("xid" + istr);
ASSERT_OK(s);
s = txn->Put(Slice("foo" + istr), Slice("bar"));
s = txn->Put(Slice("foo2" + istr), Slice("bar2"));
s = txn->Put(Slice("foo3" + istr), Slice("bar3"));
s = txn->Put(Slice("foo4" + istr), Slice("bar4"));
ASSERT_OK(s);
s = txn->Commit();
ASSERT_OK(s);
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// Consume one seq per key
exp_seq += 4;
} else {
// Consume one seq per batch
exp_seq++;
}
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
pdb->UnregisterTransaction(txn);
delete txn;
};
std::function<void(size_t)> txn_t3 = [&](size_t index) {
// A full 2pc txn that also involves a commit marker.
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
auto istr = std::to_string(index);
auto s = txn->SetName("xid" + istr);
ASSERT_OK(s);
s = txn->Put(Slice("foo" + istr), Slice("bar"));
s = txn->Put(Slice("foo2" + istr), Slice("bar2"));
s = txn->Put(Slice("foo3" + istr), Slice("bar3"));
s = txn->Put(Slice("foo4" + istr), Slice("bar4"));
s = txn->Put(Slice("foo5" + istr), Slice("bar5"));
ASSERT_OK(s);
expected_commits++;
s = txn->Prepare();
ASSERT_OK(s);
commit_writes++;
s = txn->Commit();
ASSERT_OK(s);
if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
// Consume one seq per key
exp_seq += 5;
} else {
// Consume one seq per batch
exp_seq++;
// Consume one seq per commit marker
exp_seq++;
}
delete txn;
};
}; };
class MySQLStyleTransactionTest : public TransactionTest {}; class MySQLStyleTransactionTest : public TransactionTest {};

@ -285,9 +285,10 @@ class WritePreparedTransactionTest : public TransactionTest {
} }
}; };
// TODO(myabandeh): enable it for concurrent_prepare
INSTANTIATE_TEST_CASE_P(WritePreparedTransactionTest, INSTANTIATE_TEST_CASE_P(WritePreparedTransactionTest,
WritePreparedTransactionTest, WritePreparedTransactionTest,
::testing::Values(std::make_tuple(false, true, ::testing::Values(std::make_tuple(false, false,
WRITE_PREPARED))); WRITE_PREPARED)));
TEST_P(WritePreparedTransactionTest, CommitMapTest) { TEST_P(WritePreparedTransactionTest, CommitMapTest) {
@ -552,95 +553,441 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) {
} }
} }
// This test clarfies the existing expectation from the sequence number // TODO(myabandeh): remove this redundant test after transaction_test is enabled
// algorithm. It could detect mistakes in updating the code but it is not // with WRITE_PREPARED too This test clarifies the existing expectation from the
// necessarily the one acceptable way. If the algorithm is legitimately changed, // sequence number algorithm. It could detect mistakes in updating the code but
// this unit test should be updated as well. // it is not necessarily the one acceptable way. If the algorithm is
// legitimately changed, this unit test should be updated as well.
TEST_P(WritePreparedTransactionTest, SeqAdvanceTest) { TEST_P(WritePreparedTransactionTest, SeqAdvanceTest) {
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
auto seq = db_impl->GetLatestSequenceNumber();
auto exp_seq = seq;
// Test DB's internal txn. It involves no prepare phase nor a commit marker.
WriteOptions wopts; WriteOptions wopts;
auto s = db->Put(wopts, "key", "value"); FlushOptions fopt;
// Consume one seq per batch
exp_seq++;
ASSERT_OK(s);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// Doing it twice might detect some bugs // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
s = db->Put(wopts, "key", "value"); // of the branches. This is the same as counting a binary number where i-th
exp_seq++; // bit represents whether we take branch i in the represented by the number.
ASSERT_OK(s); const size_t NUM_BRANCHES = 8;
seq = db_impl->GetLatestSequenceNumber(); // Helper function that shows if the branch is to be taken in the run
ASSERT_EQ(exp_seq, seq); // represented by the number n.
auto branch_do = [&](size_t n, size_t* branch) {
// Testing directly writing a write batch. Functionality-wise it is equivalent assert(*branch < NUM_BRANCHES);
// to commit without prepare. const size_t filter = static_cast<size_t>(1) << *branch;
WriteBatch wb; return n & filter;
wb.Put("k1", "v1"); };
wb.Put("k2", "v2"); const size_t max_n = static_cast<size_t>(1) << NUM_BRANCHES;
wb.Put("k3", "v3"); for (size_t n = 0; n < max_n; n++, ReOpen()) {
s = pdb->Write(wopts, &wb); DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
// Consume one seq per batch size_t branch = 0;
exp_seq++; auto seq = db_impl->GetLatestSequenceNumber();
ASSERT_OK(s); exp_seq = seq;
seq = db_impl->GetLatestSequenceNumber(); txn_t0(0);
ASSERT_EQ(exp_seq, seq); seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
if (branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
// Doing it twice might detect some bugs
txn_t0(1);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
txn_t1(0);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
if (branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
txn_t3(0);
// Since commit marker does not write to memtable, the last seq number is
// not updated immediately. But the advance should be visible after the next
// write.
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
}
if (branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
txn_t0(0);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
txn_t2(0);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
if (branch_do(n, &branch)) {
db_impl->Flush(fopt);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
if (branch_do(n, &branch)) {
db_impl->FlushWAL(true);
ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
}
}
TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
// Given the sequential run of txns, with this timeout we should never see a
// deadlock nor a timeout unless we have a key conflict, which should be
// almost infeasible.
txn_db_options.transaction_lock_timeout = 1000;
txn_db_options.default_lock_timeout = 1000;
ReOpen();
FlushOptions fopt;
// Number of different txn types we use in this test
const size_t type_cnt = 4;
// The size of the first write group
// TODO(myabandeh): This should be increase for pre-release tests
const size_t first_group_size = 2;
// Total number of txns we run in each test
const size_t txn_cnt = first_group_size * 2;
size_t base[txn_cnt + 1] = {
1,
};
for (size_t bi = 1; bi <= txn_cnt; bi++) {
base[bi] = base[bi - 1] * type_cnt;
}
const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt));
printf("Number of cases being tested is %" PRIu64 "\n", max_n);
for (size_t n = 0; n < max_n; n++, ReOpen()) {
if (n % 1000 == 0) {
printf("Tested %" PRIu64 " cases so far\n", n);
}
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
auto seq = db_impl->GetLatestSequenceNumber();
exp_seq = seq;
// This is increased before writing the batch for commit
commit_writes = 0;
// This is increased before txn starts linking if it expects to do a commit
// eventually
expected_commits = 0;
std::vector<port::Thread> threads;
linked = 0;
std::atomic<bool> batch_formed(false);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::EnterAsBatchGroupLeader:End",
[&](void* arg) { batch_formed = true; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
linked++;
if (linked == 1) {
// Wait until the others are linked too.
while (linked < first_group_size) {
}
} else if (linked == 1 + first_group_size) {
// Make the 2nd batch of the rest of writes plus any followup
// commits from the first batch
while (linked < txn_cnt + commit_writes) {
}
}
// Then we will have one or more batches consisting of follow-up
// commits from the 2nd batch. There is a bit of non-determinism here
// but it should be tolerable.
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (size_t bi = 0; bi < txn_cnt; bi++) {
size_t d =
(n % base[bi + 1]) /
base[bi]; // get the bi-th digit in number system based on type_cnt
switch (d) {
case 0:
threads.emplace_back(txn_t0, bi);
break;
case 1:
threads.emplace_back(txn_t1, bi);
break;
case 2:
threads.emplace_back(txn_t2, bi);
break;
case 3:
threads.emplace_back(txn_t3, bi);
break;
default:
assert(false);
}
// wait to be linked
while (linked.load() <= bi) {
}
if (bi + 1 ==
first_group_size) { // after a queue of size first_group_size
while (!batch_formed) {
}
// to make it more deterministic, wait until the commits are linked
while (linked.load() <= bi + expected_commits) {
}
}
}
for (auto& t : threads) {
t.join();
}
if (txn_db_options.write_policy == WRITE_PREPARED) {
// In this case none of the above scheduling tricks to deterministically
// form merged bactches works because the writes go to saparte queues.
// This would result in different write groups in each run of the test. We
// still keep the test since althgouh non-deterministic and hard to debug,
// it is still useful to have. Since in this case we could finish with
// commit writes that dont write to memtable, the seq is not advanced in
// this code path. It will be after the next write. So we do one more
// write to make the impact of last seq visible.
txn_t0(0);
}
// Check if memtable inserts advanced seq number as expected
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
// Check if recovery preserves the last sequence number
db_impl->FlushWAL(true);
ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// Check if flush preserves the last sequence number
db_impl->Flush(fopt);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// Check if recovery after flush preserves the last sequence number
db_impl->FlushWAL(true);
ReOpenNoDelete();
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
}
}
// Run a couple of differnet txns among them some uncommitted. Restart the db at
// a couple points to check whether the list of uncommitted txns are recovered
// properly.
TEST_P(WritePreparedTransactionTest, BasicRecoveryTest) {
options.disable_auto_compactions = true;
ReOpen();
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
txn_t0(0);
// A full 2pc txn that also involves a commit marker.
TransactionOptions txn_options; TransactionOptions txn_options;
WriteOptions write_options; WriteOptions write_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options); size_t index = 1000;
s = txn->SetName("xid"); Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(s); auto istr0 = std::to_string(index);
s = txn->Put(Slice("foo"), Slice("bar")); auto s = txn0->SetName("xid" + istr0);
s = txn->Put(Slice("foo2"), Slice("bar2"));
s = txn->Put(Slice("foo3"), Slice("bar3"));
s = txn->Put(Slice("foo4"), Slice("bar4"));
s = txn->Put(Slice("foo5"), Slice("bar5"));
ASSERT_OK(s); ASSERT_OK(s);
s = txn->Prepare(); s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
ASSERT_OK(s); ASSERT_OK(s);
// Consume one seq per batch s = txn0->Prepare();
exp_seq++; auto prep_seq_0 = txn0->GetId();
s = txn->Commit();
txn_t1(0);
index++;
Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
auto istr1 = std::to_string(index);
s = txn1->SetName("xid" + istr1);
ASSERT_OK(s); ASSERT_OK(s);
// Consume one seq per commit marker s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
exp_seq++;
// Since commit marker does not write to memtable, the last seq number is not
// updated immedaitely. But the advance should be visible after the next
// write.
s = db->Put(wopts, "key", "value");
// Consume one seq per batch
exp_seq++;
ASSERT_OK(s); ASSERT_OK(s);
seq = db_impl->GetLatestSequenceNumber(); s = txn1->Prepare();
ASSERT_EQ(exp_seq, seq); auto prep_seq_1 = txn1->GetId();
delete txn;
txn_t2(0);
// Commit without prepare. It shoudl write to DB without a commit marker. ReadOptions ropt;
txn = db->BeginTransaction(write_options, txn_options); PinnableSlice pinnable_val;
s = txn->SetName("xid2"); // Check the value is not committed before restart
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
pinnable_val.Reset();
delete txn0;
delete txn1;
wp_db->db_impl_->FlushWAL(true);
ReOpenNoDelete();
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// After recovery, all the uncommitted txns (0 and 1) should be inserted into
// delayed_prepared_
ASSERT_TRUE(wp_db->prepared_txns_.empty());
ASSERT_FALSE(wp_db->delayed_prepared_empty_);
ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_);
{
ReadLock rl(&wp_db->prepared_mutex_);
ASSERT_EQ(2, wp_db->delayed_prepared_.size());
ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) !=
wp_db->delayed_prepared_.end());
ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) !=
wp_db->delayed_prepared_.end());
}
// Check the value is still not committed after restart
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
pinnable_val.Reset();
txn_t3(0);
// Test that a recovered txns will be properly marked committed for the next
// recovery
txn1 = db->GetTransactionByName("xid" + istr1);
ASSERT_NE(txn1, nullptr);
txn1->Commit();
index++;
Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
auto istr2 = std::to_string(index);
s = txn2->SetName("xid" + istr2);
ASSERT_OK(s);
s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
ASSERT_OK(s); ASSERT_OK(s);
s = txn->Put(Slice("foo"), Slice("bar")); s = txn2->Prepare();
s = txn->Put(Slice("foo2"), Slice("bar2")); auto prep_seq_2 = txn2->GetId();
s = txn->Put(Slice("foo3"), Slice("bar3"));
s = txn->Put(Slice("foo4"), Slice("bar4")); delete txn2;
s = txn->Put(Slice("foo5"), Slice("bar5")); wp_db->db_impl_->FlushWAL(true);
ReOpenNoDelete();
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
ASSERT_TRUE(wp_db->prepared_txns_.empty());
ASSERT_FALSE(wp_db->delayed_prepared_empty_);
// 0 and 2 are prepared and 1 is committed
{
ReadLock rl(&wp_db->prepared_mutex_);
ASSERT_EQ(2, wp_db->delayed_prepared_.size());
const auto& end = wp_db->delayed_prepared_.end();
ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end);
ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end);
ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end);
}
ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_);
// Commit all the remaining txns
txn0 = db->GetTransactionByName("xid" + istr0);
ASSERT_NE(txn0, nullptr);
txn0->Commit();
txn2 = db->GetTransactionByName("xid" + istr2);
ASSERT_NE(txn2, nullptr);
txn2->Commit();
// Check the value is committed after commit
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
ASSERT_TRUE(s.ok());
ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
pinnable_val.Reset();
delete txn0;
delete txn2;
wp_db->db_impl_->FlushWAL(true);
ReOpenNoDelete();
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
ASSERT_TRUE(wp_db->prepared_txns_.empty());
ASSERT_TRUE(wp_db->delayed_prepared_empty_);
// Check the value is still committed after recovery
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
ASSERT_TRUE(s.ok());
ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
pinnable_val.Reset();
}
// After recovery the new transactions should still conflict with recovered
// transactions.
TEST_P(WritePreparedTransactionTest, ConflictDetectionAfterRecoveryTest) {
options.disable_auto_compactions = true;
ReOpen();
TransactionOptions txn_options;
WriteOptions write_options;
size_t index = 0;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
auto istr0 = std::to_string(index);
auto s = txn0->SetName("xid" + istr0);
ASSERT_OK(s); ASSERT_OK(s);
s = txn->Commit(); s = txn0->Put(Slice("key" + istr0), Slice("bar0" + istr0));
ASSERT_OK(s); ASSERT_OK(s);
// Consume one seq per batch s = txn0->Prepare();
exp_seq++;
seq = db_impl->GetLatestSequenceNumber(); // With the same index 0 and key prefix, txn_t0 should conflict with txn0
ASSERT_EQ(exp_seq, seq); txn_t0_with_status(0, Status::TimedOut());
pdb->UnregisterTransaction(txn); delete txn0;
delete txn;
auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
ReOpenNoDelete();
// It should still conflict after the recovery
txn_t0_with_status(0, Status::TimedOut());
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
ReOpenNoDelete();
// Check that a recovered txn will still cause conflicts after 2nd recovery
txn_t0_with_status(0, Status::TimedOut());
txn0 = db->GetTransactionByName("xid" + istr0);
ASSERT_NE(txn0, nullptr);
txn0->Commit();
delete txn0;
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->FlushWAL(true);
ReOpenNoDelete();
// tnx0 is now committed and should no longer cause a conflict
txn_t0_with_status(0, Status::OK());
}
// After recovery the commit map is empty while the max is set. The code would
// go through a different path which requires a separate test.
TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
wp_db->max_evicted_seq_ = 100;
ASSERT_FALSE(wp_db->IsInSnapshot(50, 40));
ASSERT_TRUE(wp_db->IsInSnapshot(50, 50));
ASSERT_TRUE(wp_db->IsInSnapshot(50, 100));
ASSERT_TRUE(wp_db->IsInSnapshot(50, 150));
ASSERT_FALSE(wp_db->IsInSnapshot(100, 80));
ASSERT_TRUE(wp_db->IsInSnapshot(100, 100));
ASSERT_TRUE(wp_db->IsInSnapshot(100, 150));
} }
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
@ -683,6 +1030,8 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
// We keep the list of txns comitted before we take the last snaphot. // We keep the list of txns comitted before we take the last snaphot.
// These should be the only seq numbers that will be found in the snapshot // These should be the only seq numbers that will be found in the snapshot
std::set<uint64_t> committed_before; std::set<uint64_t> committed_before;
// The set of commit seq numbers to be excluded from IsInSnapshot queries
std::set<uint64_t> commit_seqs;
DBImpl* mock_db = new DBImpl(options, dbname); DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock( std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits)); mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits));
@ -701,6 +1050,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
} else { // else commit it } else { // else commit it
seq++; seq++;
wp_db->AddCommitted(cur_txn, seq); wp_db->AddCommitted(cur_txn, seq);
commit_seqs.insert(seq);
if (!snapshot) { if (!snapshot) {
committed_before.insert(cur_txn); committed_before.insert(cur_txn);
} }
@ -725,7 +1075,8 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
// it at each cycle to test that the system is still sound when // it at each cycle to test that the system is still sound when
// max_evicted_seq_ advances. // max_evicted_seq_ advances.
if (snapshot) { if (snapshot) {
for (uint64_t s = 0; s <= seq; s++) { for (uint64_t s = 1;
s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
bool was_committed = bool was_committed =
(committed_before.find(s) != committed_before.end()); (committed_before.find(s) != committed_before.end());
bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot); bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);

@ -52,8 +52,9 @@ Status WritePreparedTxn::PrepareInternal() {
/*callback*/ nullptr, &log_number_, /*log ref*/ 0, /*callback*/ nullptr, &log_number_, /*log ref*/ 0,
!disable_memtable, &seq_used); !disable_memtable, &seq_used);
assert(seq_used != kMaxSequenceNumber); assert(seq_used != kMaxSequenceNumber);
prepare_seq_ = seq_used; auto prepare_seq = seq_used;
wpt_db_->AddPrepared(prepare_seq_); SetId(prepare_seq);
wpt_db_->AddPrepared(prepare_seq);
return s; return s;
} }
@ -66,9 +67,10 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
WriteBatchInternal::InsertNoop(batch); WriteBatchInternal::InsertNoop(batch);
const bool disable_memtable = true; const bool disable_memtable = true;
const uint64_t no_log_ref = 0; const uint64_t no_log_ref = 0;
uint64_t seq_used; uint64_t seq_used = kMaxSequenceNumber;
auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr, auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr,
no_log_ref, !disable_memtable, &seq_used); no_log_ref, !disable_memtable, &seq_used);
assert(seq_used != kMaxSequenceNumber);
uint64_t& prepare_seq = seq_used; uint64_t& prepare_seq = seq_used;
uint64_t& commit_seq = seq_used; uint64_t& commit_seq = seq_used;
// TODO(myabandeh): skip AddPrepared // TODO(myabandeh): skip AddPrepared
@ -90,13 +92,19 @@ Status WritePreparedTxn::CommitInternal() {
working_batch->MarkWalTerminationPoint(); working_batch->MarkWalTerminationPoint();
const bool disable_memtable = true; const bool disable_memtable = true;
uint64_t seq_used; uint64_t seq_used = kMaxSequenceNumber;
// Since the prepared batch is directly written to memtable, there is already
// a connection between the memtable and its WAL, so there is no need to
// redundantly reference the log that contains the prepared data.
const uint64_t zero_log_number = 0ull;
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
log_number_, disable_memtable, &seq_used); zero_log_number, disable_memtable, &seq_used);
assert(seq_used != kMaxSequenceNumber);
uint64_t& commit_seq = seq_used; uint64_t& commit_seq = seq_used;
// TODO(myabandeh): Reject a commit request if AddCommitted cannot encode // TODO(myabandeh): Reject a commit request if AddCommitted cannot encode
// commit_seq. This happens if prep_seq <<< commit_seq. // commit_seq. This happens if prep_seq <<< commit_seq.
wpt_db_->AddCommitted(prepare_seq_, commit_seq); auto prepare_seq = GetId();
wpt_db_->AddCommitted(prepare_seq, commit_seq);
return s; return s;
} }

@ -53,6 +53,8 @@ class WritePreparedTxn : public PessimisticTransaction {
Status Rollback() override; Status Rollback() override;
private: private:
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
Status PrepareInternal() override; Status PrepareInternal() override;
Status CommitWithoutPrepareInternal() override; Status CommitWithoutPrepareInternal() override;
@ -73,7 +75,6 @@ class WritePreparedTxn : public PessimisticTransaction {
void operator=(const WritePreparedTxn&); void operator=(const WritePreparedTxn&);
WritePreparedTxnDB* wpt_db_; WritePreparedTxnDB* wpt_db_;
uint64_t prepare_seq_;
}; };
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save