Update WritePrepared with the pseudo code

Summary:
Implement the main body of WritePrepared pseudo code. This includes PrepareInternal and CommitInternal, as well as AddCommitted which updates the commit map. It also provides a IsInSnapshot method that could be later called form the read path to decide if a version is in the read snapshot or it should other be skipped.

This patch lacks unit tests and does not attempt to offer an efficient implementation. The idea is that to have the API specified so that we can work on related tasks in parallel.
Closes https://github.com/facebook/rocksdb/pull/2713

Differential Revision: D5640021

Pulled By: maysamyabandeh

fbshipit-source-id: bfa7a05e8d8498811fab714ce4b9c21530514e1c
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 132306fbf0
commit eb6425303e
  1. 8
      db/db_impl.h
  2. 33
      db/db_impl_write.cc
  3. 4
      db/snapshot_impl.h
  4. 1
      db/write_batch.cc
  5. 2
      utilities/blob_db/blob_db_impl.cc
  6. 11
      utilities/transactions/optimistic_transaction.cc
  7. 4
      utilities/transactions/optimistic_transaction.h
  8. 35
      utilities/transactions/pessimistic_transaction.cc
  9. 4
      utilities/transactions/pessimistic_transaction.h
  10. 198
      utilities/transactions/pessimistic_transaction_db.cc
  11. 105
      utilities/transactions/pessimistic_transaction_db.h
  12. 12
      utilities/transactions/transaction_lock_mgr.cc
  13. 47
      utilities/transactions/write_prepared_txn.cc
  14. 12
      utilities/transactions/write_prepared_txn.h

@ -616,16 +616,18 @@ class DBImpl : public DB {
Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false);
bool disable_memtable = false, uint64_t* seq_used = nullptr);
Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false);
bool disable_memtable = false,
uint64_t* seq_used = nullptr);
Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0);
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
uint64_t FindMinLogContainingOutstandingPrep();
uint64_t FindMinPrepLogReferencedByMemTable();

@ -60,7 +60,7 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable) {
bool disable_memtable, uint64_t* seq_used) {
if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!");
}
@ -79,12 +79,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (concurrent_prepare_ && disable_memtable) {
return WriteImplWALOnly(write_options, my_batch, callback, log_used,
log_ref);
log_ref, seq_used);
}
if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable);
log_ref, disable_memtable, seq_used);
}
PERF_TIMER_GUARD(write_pre_and_post_process_time);
@ -127,6 +127,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (log_used != nullptr) {
*log_used = w.log_used;
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
// write is complete and leader has updated sequence
return w.FinalStatus();
}
@ -278,6 +281,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/);
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
}
}
}
@ -325,7 +331,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable) {
bool disable_memtable, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
@ -440,6 +446,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
}
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
assert(w.state == WriteThread::STATE_COMPLETED);
return w.FinalStatus();
@ -447,7 +456,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref) {
uint64_t* log_used, uint64_t log_ref,
uint64_t* seq_used) {
Status status;
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
@ -464,6 +474,9 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
if (log_used != nullptr) {
*log_used = w.log_used;
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
return w.FinalStatus();
}
// else we are the leader of the write batch group
@ -509,6 +522,13 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
// wal_write_mutex_ to ensure ordered events in WAL
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
0 /*total_count*/);
auto curr_seq = last_sequence + 1;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
writer->sequence = curr_seq;
curr_seq += WriteBatchInternal::Count(writer->batch);
}
}
if (status.ok() && write_options.sync) {
// Requesting sync with concurrent_prepare_ is expected to be very rare. We
// hance provide a simple implementation that is not necessarily efficient.
@ -527,6 +547,9 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
if (status.ok()) {
status = w.FinalStatus();
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
return status;
}

@ -76,7 +76,7 @@ class SnapshotList {
// retrieve all snapshot numbers. They are sorted in ascending order.
std::vector<SequenceNumber> GetAll(
SequenceNumber* oldest_write_conflict_snapshot = nullptr) {
SequenceNumber* oldest_write_conflict_snapshot = nullptr) const {
std::vector<SequenceNumber> ret;
if (oldest_write_conflict_snapshot != nullptr) {
@ -86,7 +86,7 @@ class SnapshotList {
if (empty()) {
return ret;
}
SnapshotImpl* s = &list_;
const SnapshotImpl* s = &list_;
while (s->next_ != &list_) {
ret.push_back(s->next_->number_);

@ -1303,6 +1303,7 @@ Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group,
continue;
}
SetSequence(w->batch, inserter.sequence());
w->sequence = inserter.sequence();
inserter.set_log_number_ref(w->log_ref);
w->status = w->batch->Iterate(&inserter);
if (!w->status.ok()) {

@ -32,8 +32,8 @@
#include "util/random.h"
#include "util/sync_point.h"
#include "util/timer_queue.h"
#include "utilities/transactions/optimistic_transaction_db_impl.h"
#include "utilities/transactions/optimistic_transaction.h"
#include "utilities/transactions/optimistic_transaction_db_impl.h"
namespace {
int kBlockBasedTableVersionFormat = 2;

@ -44,12 +44,9 @@ void OptimisticTransaction::Reinitialize(
Initialize(txn_options);
}
OptimisticTransaction::~OptimisticTransaction() {
}
OptimisticTransaction::~OptimisticTransaction() {}
void OptimisticTransaction::Clear() {
TransactionBaseImpl::Clear();
}
void OptimisticTransaction::Clear() { TransactionBaseImpl::Clear(); }
Status OptimisticTransaction::Prepare() {
return Status::InvalidArgument(
@ -82,8 +79,8 @@ Status OptimisticTransaction::Rollback() {
//
// 'exclusive' is unused for OptimisticTransaction.
Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only,
bool exclusive, bool untracked) {
const Slice& key, bool read_only,
bool exclusive, bool untracked) {
if (untracked) {
return Status::OK();
}

@ -29,8 +29,8 @@ namespace rocksdb {
class OptimisticTransaction : public TransactionBaseImpl {
public:
OptimisticTransaction(OptimisticTransactionDB* db,
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options);
const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options);
virtual ~OptimisticTransaction();

@ -35,9 +35,9 @@ TransactionID PessimisticTransaction::GenTxnID() {
return txn_id_counter_.fetch_add(1);
}
PessimisticTransaction::PessimisticTransaction(TransactionDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options)
PessimisticTransaction::PessimisticTransaction(
TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options)
: TransactionBaseImpl(txn_db->GetRootDB(), write_options),
txn_db_impl_(nullptr),
expiration_time_(0),
@ -99,9 +99,9 @@ void PessimisticTransaction::Clear() {
TransactionBaseImpl::Clear();
}
void PessimisticTransaction::Reinitialize(TransactionDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options) {
void PessimisticTransaction::Reinitialize(
TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options) {
if (!name_.empty() && txn_state_ != COMMITED) {
txn_db_impl_->UnregisterTransaction(this);
}
@ -120,9 +120,9 @@ bool PessimisticTransaction::IsExpired() const {
return false;
}
WriteCommittedTxn::WriteCommittedTxn(
TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options)
WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options)
: PessimisticTransaction(txn_db, write_options, txn_options){};
Status WriteCommittedTxn::CommitBatch(WriteBatch* batch) {
@ -370,7 +370,7 @@ Status PessimisticTransaction::RollbackToSavePoint() {
// Lock all keys in this batch.
// On success, caller should unlock keys_to_unlock
Status PessimisticTransaction::LockBatch(WriteBatch* batch,
TransactionKeyMap* keys_to_unlock) {
TransactionKeyMap* keys_to_unlock) {
class Handler : public WriteBatch::Handler {
public:
// Sorted map of column_family_id to sorted set of keys.
@ -448,8 +448,8 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch,
// this key will only be locked if there have been no writes to this key since
// the snapshot time.
Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only, bool exclusive,
bool untracked) {
const Slice& key, bool read_only,
bool exclusive, bool untracked) {
uint32_t cfh_id = GetColumnFamilyID(column_family);
std::string key_str = key.ToString();
bool previously_locked;
@ -535,10 +535,9 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
// Return OK() if this key has not been modified more recently than the
// transaction snapshot_.
Status PessimisticTransaction::ValidateSnapshot(ColumnFamilyHandle* column_family,
const Slice& key,
SequenceNumber prev_seqno,
SequenceNumber* new_seqno) {
Status PessimisticTransaction::ValidateSnapshot(
ColumnFamilyHandle* column_family, const Slice& key,
SequenceNumber prev_seqno, SequenceNumber* new_seqno) {
assert(snapshot_);
SequenceNumber seq = snapshot_->GetSequenceNumber();
@ -566,8 +565,8 @@ bool PessimisticTransaction::TryStealingLocks() {
LOCKS_STOLEN);
}
void PessimisticTransaction::UnlockGetForUpdate(ColumnFamilyHandle* column_family,
const Slice& key) {
void PessimisticTransaction::UnlockGetForUpdate(
ColumnFamilyHandle* column_family, const Slice& key) {
txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString());
}

@ -38,7 +38,7 @@ class PessimisticTransactionDB;
class PessimisticTransaction : public TransactionBaseImpl {
public:
PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options);
const TransactionOptions& txn_options);
virtual ~PessimisticTransaction();
@ -182,7 +182,7 @@ class PessimisticTransaction : public TransactionBaseImpl {
class WriteCommittedTxn : public PessimisticTransaction {
public:
WriteCommittedTxn(TransactionDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options);
const TransactionOptions& txn_options);
virtual ~WriteCommittedTxn() {}

@ -16,8 +16,9 @@
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
#include "util/mutexlock.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/transaction_db_mutex_impl.h"
namespace rocksdb {
@ -301,7 +302,8 @@ Status PessimisticTransactionDB::DropColumnFamily(
return s;
}
Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
uint32_t cfh_id,
const std::string& key,
bool exclusive) {
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
@ -312,8 +314,8 @@ void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
lock_mgr_.UnLock(txn, keys, GetEnv());
}
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
const std::string& key) {
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
uint32_t cfh_id, const std::string& key) {
lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
}
@ -409,7 +411,8 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts,
Transaction* txn = BeginInternalTransaction(opts);
txn->DisableIndexing();
auto txn_impl = static_cast_with_check<PessimisticTransaction, Transaction>(txn);
auto txn_impl =
static_cast_with_check<PessimisticTransaction, Transaction>(txn);
// Since commitBatch sorts the keys before locking, concurrent Write()
// operations will not cause a deadlock.
@ -422,8 +425,8 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts,
return s;
}
void PessimisticTransactionDB::InsertExpirableTransaction(TransactionID tx_id,
PessimisticTransaction* tx) {
void PessimisticTransactionDB::InsertExpirableTransaction(
TransactionID tx_id, PessimisticTransaction* tx) {
assert(tx->GetExpirationTime() > 0);
std::lock_guard<std::mutex> lock(map_mutex_);
expirable_transactions_map_.insert({tx_id, tx});
@ -449,7 +452,8 @@ bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
void PessimisticTransactionDB::ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options) {
auto txn_impl = static_cast_with_check<PessimisticTransaction, Transaction>(txn);
auto txn_impl =
static_cast_with_check<PessimisticTransaction, Transaction>(txn);
txn_impl->Reinitialize(this, write_options, txn_options);
}
@ -499,5 +503,183 @@ void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
transactions_.erase(it);
}
// Returns true if commit_seq <= snapshot_seq
bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
uint64_t snapshot_seq) {
// Here we try to infer the return value without looking into prepare list.
// This would help avoiding synchronization over a shared map.
// TODO(myabandeh): read your own writes
// TODO(myabandeh): optimize this. This sequence of checks must be correct but
// not necessary efficient
if (snapshot_seq < prep_seq) {
// snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq
return false;
}
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
// We should not normally reach here
ReadLock rl(&prepared_mutex_);
if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
// Then it is not committed yet
return false;
}
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
CommitEntry cached;
bool exist = GetCommitEntry(indexed_seq, &cached);
if (!exist) {
// It is not committed, so it must be still prepared
return false;
}
if (prep_seq == cached.prep_seq) {
// It is committed and also not evicted from commit cache
return cached.commit_seq <= snapshot_seq;
}
// At this point we dont know if it was committed or it is still prepared
auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire);
if (max_evicted_seq < prep_seq) {
// Not evicted from cache and also not present, so must be still prepared
return false;
}
// When advancing max_evicted_seq_, we move older entires from prepared to
// delayed_prepared_. Also we move evicted entries from commit cache to
// old_commit_map_ if it overlaps with any snapshot. Since prep_seq <=
// max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in
// old_commit_map_, iii) committed with no conflict with any snapshot (i)
// delayed_prepared_ is checked above
if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case
// only (iii) is the case: committed
// commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq <
// snapshot_seq
return true;
}
// else (ii) might be the case: check the commit data saved for this snapshot.
// If there was no overlapping commit entry, then it is committed with a
// commit_seq lower than any live snapshot, including snapshot_seq.
if (old_commit_map_empty_.load(std::memory_order_acquire)) {
return true;
}
{
// We should not normally reach here
ReadLock rl(&old_commit_map_mutex_);
auto old_commit_entry = old_commit_map_.find(prep_seq);
if (old_commit_entry == old_commit_map_.end() ||
old_commit_entry->second <= snapshot_seq) {
return true;
}
}
// (ii) it the case: it is committed but after the snapshot_seq
return false;
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) { prepared_txns_.push(seq); }
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
uint64_t commit_seq) {
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
CommitEntry evicted;
bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted);
if (to_be_evicted) {
auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
if (prev_max < evicted.commit_seq) {
auto max_evicted_seq = evicted.commit_seq;
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
// then it is not in prepared_txns_ ans save an expensive, synchronized
// lookup from a shared set. delayed_prepared_ is expected to be empty in
// normal cases.
{
WriteLock wl(&prepared_mutex_);
while (!prepared_txns_.empty() &&
prepared_txns_.top() <= max_evicted_seq) {
auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped);
prepared_txns_.pop();
delayed_prepared_empty_.store(false, std::memory_order_release);
}
}
{
WriteLock wl(&snapshots_mutex_);
InstrumentedMutex(db_impl_->mutex());
snapshots_ = db_impl_->snapshots().GetAll();
}
while (prev_max < max_evicted_seq &&
!max_evicted_seq_.compare_exchange_weak(
prev_max, max_evicted_seq, std::memory_order_release,
std::memory_order_acquire)) {
};
}
// After each eviction from commit cache, check if the commit entry should
// be kept around because it overlaps with a live snapshot.
{
ReadLock rl(&snapshots_mutex_);
for (auto snapshot : snapshots_) {
auto snapshot_seq =
reinterpret_cast<const SnapshotImpl*>(snapshot)->number_;
if (evicted.commit_seq <= snapshot_seq) {
break;
}
// then snapshot_seq < evicted.commit_seq
if (evicted.prep_seq <= snapshot_seq) { // overlapping range
WriteLock wl(&old_commit_map_mutex_);
old_commit_map_empty_.store(false, std::memory_order_release);
old_commit_map_[evicted.prep_seq] = evicted.commit_seq;
}
}
}
}
bool succ =
ExchangeCommitEntry(indexed_seq, evicted, {prepare_seq, commit_seq});
if (!succ) {
// A very rare event, in which the commit entry is updated before we do.
// Here we apply a very simple solution of retrying.
// TODO(myabandeh): do precautions to detect bugs that cause infinite loops
AddCommitted(prepare_seq, commit_seq);
return;
}
{
WriteLock wl(&prepared_mutex_);
prepared_txns_.erase(prepare_seq);
bool was_empty = delayed_prepared_.empty();
if (!was_empty) {
delayed_prepared_.erase(prepare_seq);
bool is_empty = delayed_prepared_.empty();
if (was_empty != is_empty) {
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
}
}
}
}
bool WritePreparedTxnDB::GetCommitEntry(uint64_t indexed_seq,
CommitEntry* entry) {
// TODO(myabandeh): implement lock-free commit_cache_
ReadLock rl(&commit_cache_mutex_);
*entry = commit_cache_[indexed_seq];
return (entry->commit_seq != 0); // initialized
}
bool WritePreparedTxnDB::AddCommitEntry(uint64_t indexed_seq,
CommitEntry& new_entry,
CommitEntry* evicted_entry) {
// TODO(myabandeh): implement lock-free commit_cache_
WriteLock wl(&commit_cache_mutex_);
*evicted_entry = commit_cache_[indexed_seq];
commit_cache_[indexed_seq] = new_entry;
return (evicted_entry->commit_seq != 0); // initialized
}
bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq,
CommitEntry& expected_entry,
CommitEntry new_entry) {
// TODO(myabandeh): implement lock-free commit_cache_
WriteLock wl(&commit_cache_mutex_);
auto& evicted_entry = commit_cache_[indexed_seq];
if (evicted_entry.prep_seq != expected_entry.prep_seq) {
return false;
}
commit_cache_[indexed_seq] = new_entry;
return true;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -8,6 +8,7 @@
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
@ -64,11 +65,12 @@ class PessimisticTransactionDB : public TransactionDB {
using StackableDB::DropColumnFamily;
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key,
bool exclusive);
Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
const std::string& key, bool exclusive);
void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys);
void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key);
void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
const std::string& key);
void AddColumnFamily(const ColumnFamilyHandle* handle);
@ -79,7 +81,8 @@ class PessimisticTransactionDB : public TransactionDB {
return txn_db_options_;
}
void InsertExpirableTransaction(TransactionID tx_id, PessimisticTransaction* tx);
void InsertExpirableTransaction(TransactionID tx_id,
PessimisticTransaction* tx);
void RemoveExpirableTransaction(TransactionID tx_id);
// If transaction is no longer available, locks can be stolen
@ -97,14 +100,18 @@ class PessimisticTransactionDB : public TransactionDB {
void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
TransactionLockMgr::LockStatusData GetLockStatusData() override;
struct CommitEntry {
uint64_t prep_seq;
uint64_t commit_seq;
};
protected:
void ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions());
DBImpl* db_impl_;
private:
DBImpl* db_impl_;
const TransactionDBOptions txn_db_options_;
TransactionLockMgr lock_mgr_;
@ -161,6 +168,94 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options,
Transaction* old_txn) override;
// Check whether the transaction that wrote the value with seqeunce number seq
// is visible to the snapshot with sequence number snapshot_seq
bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq);
// Add the trasnaction with prepare sequence seq to the prepared list
void AddPrepared(uint64_t seq);
// Add the transaction with prepare sequence prepare_seq and commit sequence
// commit_seq to the commit map
void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq);
private:
// A heap with the amortized O(1) complexity for erase. It uses one extra heap
// to keep track of erased entries that are not yet on top of the main heap.
class PreparedHeap {
std::priority_queue<uint64_t> heap_;
std::priority_queue<uint64_t> erased_heap_;
public:
bool empty() { return heap_.empty(); }
uint64_t top() { return heap_.top(); }
void push(uint64_t v) { heap_.push(v); }
void pop() {
heap_.pop();
while (!heap_.empty() && !erased_heap_.empty() &&
heap_.top() == erased_heap_.top()) {
heap_.pop();
erased_heap_.pop();
}
}
void erase(uint64_t seq) {
if (!heap_.empty()) {
if (heap_.top() < seq) {
// Already popped, ignore it.
} else if (heap_.top() == seq) {
heap_.pop();
} else { // (heap_.top() > seq)
// Down the heap, remember to pop it later
erased_heap_.push(seq);
}
}
}
};
// Get the commit entry with index indexed_seq from the commit table. It
// returns true if such entry exists.
bool GetCommitEntry(uint64_t indexed_seq, CommitEntry* entry);
// Rewrite the entry with the index indexed_seq in the commit table with the
// commit entry <prep_seq, commit_seq>. If the rewrite results into eviction,
// sets the evicted_entry and returns true.
bool AddCommitEntry(uint64_t indexed_seq, CommitEntry& new_entry,
CommitEntry* evicted_entry);
// Rewrite the entry with the index indexed_seq in the commit table with the
// commit entry new_entry only if the existing entry matches the
// expected_entry. Returns false otherwise.
bool ExchangeCommitEntry(uint64_t indexed_seq, CommitEntry& expected_entry,
CommitEntry new_entry);
// The list of live snapshots at the last time that max_evicted_seq_ advanced.
// The list sorted in ascending order. Thread-safety is provided with
// snapshots_mutex_.
std::vector<SequenceNumber> snapshots_;
// A heap of prepared transactions. Thread-safety is provided with
// prepared_mutex_.
PreparedHeap prepared_txns_;
// 10m entry, 80MB size
static const uint64_t COMMIT_CACHE_SIZE = static_cast<uint64_t>(1 << 21);
// commit_cache_ is initialized to zero to tell apart an empty index from a
// filled one. Thread-safety is provided with commit_cache_mutex_.
CommitEntry commit_cache_[COMMIT_CACHE_SIZE] = {};
// The largest evicted *commit* sequence number from the commit_cache_
std::atomic<uint64_t> max_evicted_seq_ = {};
// A map of the evicted entries from commit_cache_ that has to be kept around
// to service the old snapshots. This is expected to be empty normally.
// Thread-safety is provided with old_commit_map_mutex_.
std::map<uint64_t, uint64_t> old_commit_map_;
// A set of long-running prepared transactions that are not finished by the
// time max_evicted_seq_ advances their sequence number. This is expected to
// be empty normally. Thread-safety is provided with prepared_mutex_.
std::set<uint64_t> delayed_prepared_;
// Update when delayed_prepared_.empty() changes. Expected to be true
// normally.
std::atomic<bool> delayed_prepared_empty_ = {true};
// Update when old_commit_map_.empty() changes. Expected to be true normally.
std::atomic<bool> old_commit_map_empty_ = {true};
port::RWMutex prepared_mutex_;
port::RWMutex old_commit_map_mutex_;
port::RWMutex commit_cache_mutex_;
port::RWMutex snapshots_mutex_;
};
} // namespace rocksdb

@ -357,13 +357,15 @@ Status TransactionLockMgr::AcquireWithTimeout(
}
void TransactionLockMgr::DecrementWaiters(
const PessimisticTransaction* txn, const autovector<TransactionID>& wait_ids) {
const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids) {
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
DecrementWaitersImpl(txn, wait_ids);
}
void TransactionLockMgr::DecrementWaitersImpl(
const PessimisticTransaction* txn, const autovector<TransactionID>& wait_ids) {
const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids) {
auto id = txn->GetID();
assert(wait_txn_map_.Contains(id));
wait_txn_map_.Delete(id);
@ -377,7 +379,8 @@ void TransactionLockMgr::DecrementWaitersImpl(
}
bool TransactionLockMgr::IncrementWaiters(
const PessimisticTransaction* txn, const autovector<TransactionID>& wait_ids) {
const PessimisticTransaction* txn,
const autovector<TransactionID>& wait_ids) {
auto id = txn->GetID();
std::vector<TransactionID> queue(txn->GetDeadlockDetectDepth());
std::lock_guard<std::mutex> lock(wait_txn_map_mutex_);
@ -537,7 +540,8 @@ void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,
}
}
void TransactionLockMgr::UnLock(PessimisticTransaction* txn, uint32_t column_family_id,
void TransactionLockMgr::UnLock(PessimisticTransaction* txn,
uint32_t column_family_id,
const std::string& key, Env* env) {
std::shared_ptr<LockMap> lock_map_ptr = GetLockMap(column_family_id);
LockMap* lock_map = lock_map_ptr.get();

@ -14,17 +14,18 @@
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
namespace rocksdb {
struct WriteOptions;
WritePreparedTxn::WritePreparedTxn(
TransactionDB* txn_db, const WriteOptions& write_options,
const TransactionOptions& txn_options)
: PessimisticTransaction(txn_db, write_options, txn_options) {
WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options)
: PessimisticTransaction(txn_db, write_options, txn_options),
wpt_db_(txn_db) {
PessimisticTransaction::Initialize(txn_options);
}
@ -35,9 +36,18 @@ Status WritePreparedTxn::CommitBatch(WriteBatch* /* unused */) {
}
Status WritePreparedTxn::PrepareInternal() {
// TODO(myabandeh) Implement this
throw std::runtime_error("Prepare not Implemented");
return Status::OK();
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
const bool disable_memtable = true;
uint64_t seq_used;
Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
!disable_memtable, &seq_used);
prepare_seq_ = seq_used;
wpt_db_->AddPrepared(prepare_seq_);
return s;
}
Status WritePreparedTxn::CommitWithoutPrepareInternal() {
@ -47,9 +57,24 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() {
}
Status WritePreparedTxn::CommitInternal() {
// TODO(myabandeh) Implement this
throw std::runtime_error("Commit not Implemented");
return Status::OK();
// We take the commit-time batch and append the Commit marker.
// The Memtable will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
// TODO(myabandeh): prevent the users from writing to txn after the prepare
// phase
assert(working_batch->Count() == 0);
WriteBatchInternal::MarkCommit(working_batch, name_);
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();
const bool disable_memtable = true;
uint64_t seq_used;
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
log_number_, disable_memtable, &seq_used);
uint64_t& commit_seq = seq_used;
wpt_db_->AddCommitted(prepare_seq_, commit_seq);
return s;
}
Status WritePreparedTxn::Rollback() {

@ -25,13 +25,14 @@
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/autovector.h"
#include "utilities/transactions/transaction_base.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_base.h"
#include "utilities/transactions/transaction_util.h"
namespace rocksdb {
class TransactionDBImpl;
class WritePreparedTxnDB;
// This impl could write to DB also uncomitted data and then later tell apart
// committed data from uncomitted data. Uncommitted data could be after the
@ -39,8 +40,8 @@ class TransactionDBImpl;
// (WriteUnpreparedTxnImpl).
class WritePreparedTxn : public PessimisticTransaction {
public:
WritePreparedTxn(TransactionDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options);
WritePreparedTxn(WritePreparedTxnDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options);
virtual ~WritePreparedTxn() {}
@ -65,6 +66,9 @@ class WritePreparedTxn : public PessimisticTransaction {
// No copying allowed
WritePreparedTxn(const WritePreparedTxn&);
void operator=(const WritePreparedTxn&);
WritePreparedTxnDB* wpt_db_;
uint64_t prepare_seq_;
};
} // namespace rocksdb

Loading…
Cancel
Save