diff --git a/db/db_impl.h b/db/db_impl.h index 31d69a970..39c1d6103 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -616,16 +616,18 @@ class DBImpl : public DB { Status WriteImpl(const WriteOptions& options, WriteBatch* updates, WriteCallback* callback = nullptr, uint64_t* log_used = nullptr, uint64_t log_ref = 0, - bool disable_memtable = false); + bool disable_memtable = false, uint64_t* seq_used = nullptr); Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates, WriteCallback* callback = nullptr, uint64_t* log_used = nullptr, uint64_t log_ref = 0, - bool disable_memtable = false); + bool disable_memtable = false, + uint64_t* seq_used = nullptr); Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates, WriteCallback* callback = nullptr, - uint64_t* log_used = nullptr, uint64_t log_ref = 0); + uint64_t* log_used = nullptr, uint64_t log_ref = 0, + uint64_t* seq_used = nullptr); uint64_t FindMinLogContainingOutstandingPrep(); uint64_t FindMinPrepLogReferencedByMemTable(); diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index b93dd6f8f..512819772 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -60,7 +60,7 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options, Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used, uint64_t log_ref, - bool disable_memtable) { + bool disable_memtable, uint64_t* seq_used) { if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } @@ -79,12 +79,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (concurrent_prepare_ && disable_memtable) { return WriteImplWALOnly(write_options, my_batch, callback, log_used, - log_ref); + log_ref, seq_used); } if (immutable_db_options_.enable_pipelined_write) { return PipelinedWriteImpl(write_options, my_batch, callback, log_used, - log_ref, disable_memtable); + log_ref, disable_memtable, seq_used); } PERF_TIMER_GUARD(write_pre_and_post_process_time); @@ -127,6 +127,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (log_used != nullptr) { *log_used = w.log_used; } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } // write is complete and leader has updated sequence return w.FinalStatus(); } @@ -278,6 +281,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } } } } @@ -325,7 +331,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used, uint64_t log_ref, - bool disable_memtable) { + bool disable_memtable, uint64_t* seq_used) { PERF_TIMER_GUARD(write_pre_and_post_process_time); StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); @@ -440,6 +446,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, write_thread_.ExitAsMemTableWriter(&w, *w.write_group); } } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } assert(w.state == WriteThread::STATE_COMPLETED); return w.FinalStatus(); @@ -447,7 +456,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, - uint64_t* log_used, uint64_t log_ref) { + uint64_t* log_used, uint64_t log_ref, + uint64_t* seq_used) { Status status; PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(write_options, my_batch, callback, log_ref, @@ -464,6 +474,9 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, if (log_used != nullptr) { *log_used = w.log_used; } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } return w.FinalStatus(); } // else we are the leader of the write batch group @@ -509,6 +522,13 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, // wal_write_mutex_ to ensure ordered events in WAL status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, 0 /*total_count*/); + auto curr_seq = last_sequence + 1; + for (auto* writer : write_group) { + if (writer->CheckCallback(this)) { + writer->sequence = curr_seq; + curr_seq += WriteBatchInternal::Count(writer->batch); + } + } if (status.ok() && write_options.sync) { // Requesting sync with concurrent_prepare_ is expected to be very rare. We // hance provide a simple implementation that is not necessarily efficient. @@ -527,6 +547,9 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, if (status.ok()) { status = w.FinalStatus(); } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } return status; } diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index b94602f2a..8441050fd 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -76,7 +76,7 @@ class SnapshotList { // retrieve all snapshot numbers. They are sorted in ascending order. std::vector GetAll( - SequenceNumber* oldest_write_conflict_snapshot = nullptr) { + SequenceNumber* oldest_write_conflict_snapshot = nullptr) const { std::vector ret; if (oldest_write_conflict_snapshot != nullptr) { @@ -86,7 +86,7 @@ class SnapshotList { if (empty()) { return ret; } - SnapshotImpl* s = &list_; + const SnapshotImpl* s = &list_; while (s->next_ != &list_) { ret.push_back(s->next_->number_); diff --git a/db/write_batch.cc b/db/write_batch.cc index 91be9a0df..43639ac23 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1303,6 +1303,7 @@ Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group, continue; } SetSequence(w->batch, inserter.sequence()); + w->sequence = inserter.sequence(); inserter.set_log_number_ref(w->log_ref); w->status = w->batch->Iterate(&inserter); if (!w->status.ok()) { diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index a15fe4a18..06350b896 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -32,8 +32,8 @@ #include "util/random.h" #include "util/sync_point.h" #include "util/timer_queue.h" -#include "utilities/transactions/optimistic_transaction_db_impl.h" #include "utilities/transactions/optimistic_transaction.h" +#include "utilities/transactions/optimistic_transaction_db_impl.h" namespace { int kBlockBasedTableVersionFormat = 2; diff --git a/utilities/transactions/optimistic_transaction.cc b/utilities/transactions/optimistic_transaction.cc index 882fbec4a..89d3226d5 100644 --- a/utilities/transactions/optimistic_transaction.cc +++ b/utilities/transactions/optimistic_transaction.cc @@ -44,12 +44,9 @@ void OptimisticTransaction::Reinitialize( Initialize(txn_options); } -OptimisticTransaction::~OptimisticTransaction() { -} +OptimisticTransaction::~OptimisticTransaction() {} -void OptimisticTransaction::Clear() { - TransactionBaseImpl::Clear(); -} +void OptimisticTransaction::Clear() { TransactionBaseImpl::Clear(); } Status OptimisticTransaction::Prepare() { return Status::InvalidArgument( @@ -82,8 +79,8 @@ Status OptimisticTransaction::Rollback() { // // 'exclusive' is unused for OptimisticTransaction. Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family, - const Slice& key, bool read_only, - bool exclusive, bool untracked) { + const Slice& key, bool read_only, + bool exclusive, bool untracked) { if (untracked) { return Status::OK(); } diff --git a/utilities/transactions/optimistic_transaction.h b/utilities/transactions/optimistic_transaction.h index b49bd6ab9..5a19489f2 100644 --- a/utilities/transactions/optimistic_transaction.h +++ b/utilities/transactions/optimistic_transaction.h @@ -29,8 +29,8 @@ namespace rocksdb { class OptimisticTransaction : public TransactionBaseImpl { public: OptimisticTransaction(OptimisticTransactionDB* db, - const WriteOptions& write_options, - const OptimisticTransactionOptions& txn_options); + const WriteOptions& write_options, + const OptimisticTransactionOptions& txn_options); virtual ~OptimisticTransaction(); diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index 092b7132c..68b8b4f1a 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -35,9 +35,9 @@ TransactionID PessimisticTransaction::GenTxnID() { return txn_id_counter_.fetch_add(1); } -PessimisticTransaction::PessimisticTransaction(TransactionDB* txn_db, - const WriteOptions& write_options, - const TransactionOptions& txn_options) +PessimisticTransaction::PessimisticTransaction( + TransactionDB* txn_db, const WriteOptions& write_options, + const TransactionOptions& txn_options) : TransactionBaseImpl(txn_db->GetRootDB(), write_options), txn_db_impl_(nullptr), expiration_time_(0), @@ -99,9 +99,9 @@ void PessimisticTransaction::Clear() { TransactionBaseImpl::Clear(); } -void PessimisticTransaction::Reinitialize(TransactionDB* txn_db, - const WriteOptions& write_options, - const TransactionOptions& txn_options) { +void PessimisticTransaction::Reinitialize( + TransactionDB* txn_db, const WriteOptions& write_options, + const TransactionOptions& txn_options) { if (!name_.empty() && txn_state_ != COMMITED) { txn_db_impl_->UnregisterTransaction(this); } @@ -120,9 +120,9 @@ bool PessimisticTransaction::IsExpired() const { return false; } -WriteCommittedTxn::WriteCommittedTxn( - TransactionDB* txn_db, const WriteOptions& write_options, - const TransactionOptions& txn_options) +WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db, + const WriteOptions& write_options, + const TransactionOptions& txn_options) : PessimisticTransaction(txn_db, write_options, txn_options){}; Status WriteCommittedTxn::CommitBatch(WriteBatch* batch) { @@ -370,7 +370,7 @@ Status PessimisticTransaction::RollbackToSavePoint() { // Lock all keys in this batch. // On success, caller should unlock keys_to_unlock Status PessimisticTransaction::LockBatch(WriteBatch* batch, - TransactionKeyMap* keys_to_unlock) { + TransactionKeyMap* keys_to_unlock) { class Handler : public WriteBatch::Handler { public: // Sorted map of column_family_id to sorted set of keys. @@ -448,8 +448,8 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch, // this key will only be locked if there have been no writes to this key since // the snapshot time. Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, - const Slice& key, bool read_only, bool exclusive, - bool untracked) { + const Slice& key, bool read_only, + bool exclusive, bool untracked) { uint32_t cfh_id = GetColumnFamilyID(column_family); std::string key_str = key.ToString(); bool previously_locked; @@ -535,10 +535,9 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, // Return OK() if this key has not been modified more recently than the // transaction snapshot_. -Status PessimisticTransaction::ValidateSnapshot(ColumnFamilyHandle* column_family, - const Slice& key, - SequenceNumber prev_seqno, - SequenceNumber* new_seqno) { +Status PessimisticTransaction::ValidateSnapshot( + ColumnFamilyHandle* column_family, const Slice& key, + SequenceNumber prev_seqno, SequenceNumber* new_seqno) { assert(snapshot_); SequenceNumber seq = snapshot_->GetSequenceNumber(); @@ -566,8 +565,8 @@ bool PessimisticTransaction::TryStealingLocks() { LOCKS_STOLEN); } -void PessimisticTransaction::UnlockGetForUpdate(ColumnFamilyHandle* column_family, - const Slice& key) { +void PessimisticTransaction::UnlockGetForUpdate( + ColumnFamilyHandle* column_family, const Slice& key) { txn_db_impl_->UnLock(this, GetColumnFamilyID(column_family), key.ToString()); } diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index a0162fa27..5c6d4d261 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -38,7 +38,7 @@ class PessimisticTransactionDB; class PessimisticTransaction : public TransactionBaseImpl { public: PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options, - const TransactionOptions& txn_options); + const TransactionOptions& txn_options); virtual ~PessimisticTransaction(); @@ -182,7 +182,7 @@ class PessimisticTransaction : public TransactionBaseImpl { class WriteCommittedTxn : public PessimisticTransaction { public: WriteCommittedTxn(TransactionDB* db, const WriteOptions& write_options, - const TransactionOptions& txn_options); + const TransactionOptions& txn_options); virtual ~WriteCommittedTxn() {} diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 9787d76df..156e7a12b 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -16,8 +16,9 @@ #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" #include "util/cast_util.h" -#include "utilities/transactions/transaction_db_mutex_impl.h" +#include "util/mutexlock.h" #include "utilities/transactions/pessimistic_transaction.h" +#include "utilities/transactions/transaction_db_mutex_impl.h" namespace rocksdb { @@ -301,7 +302,8 @@ Status PessimisticTransactionDB::DropColumnFamily( return s; } -Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, uint32_t cfh_id, +Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn, + uint32_t cfh_id, const std::string& key, bool exclusive) { return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive); @@ -312,8 +314,8 @@ void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, lock_mgr_.UnLock(txn, keys, GetEnv()); } -void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, uint32_t cfh_id, - const std::string& key) { +void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn, + uint32_t cfh_id, const std::string& key) { lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); } @@ -409,7 +411,8 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts, Transaction* txn = BeginInternalTransaction(opts); txn->DisableIndexing(); - auto txn_impl = static_cast_with_check(txn); + auto txn_impl = + static_cast_with_check(txn); // Since commitBatch sorts the keys before locking, concurrent Write() // operations will not cause a deadlock. @@ -422,8 +425,8 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts, return s; } -void PessimisticTransactionDB::InsertExpirableTransaction(TransactionID tx_id, - PessimisticTransaction* tx) { +void PessimisticTransactionDB::InsertExpirableTransaction( + TransactionID tx_id, PessimisticTransaction* tx) { assert(tx->GetExpirationTime() > 0); std::lock_guard lock(map_mutex_); expirable_transactions_map_.insert({tx_id, tx}); @@ -449,7 +452,8 @@ bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks( void PessimisticTransactionDB::ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options) { - auto txn_impl = static_cast_with_check(txn); + auto txn_impl = + static_cast_with_check(txn); txn_impl->Reinitialize(this, write_options, txn_options); } @@ -499,5 +503,183 @@ void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) { transactions_.erase(it); } +// Returns true if commit_seq <= snapshot_seq +bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, + uint64_t snapshot_seq) { + // Here we try to infer the return value without looking into prepare list. + // This would help avoiding synchronization over a shared map. + // TODO(myabandeh): read your own writes + // TODO(myabandeh): optimize this. This sequence of checks must be correct but + // not necessary efficient + if (snapshot_seq < prep_seq) { + // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq + return false; + } + if (!delayed_prepared_empty_.load(std::memory_order_acquire)) { + // We should not normally reach here + ReadLock rl(&prepared_mutex_); + if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { + // Then it is not committed yet + return false; + } + } + auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE; + CommitEntry cached; + bool exist = GetCommitEntry(indexed_seq, &cached); + if (!exist) { + // It is not committed, so it must be still prepared + return false; + } + if (prep_seq == cached.prep_seq) { + // It is committed and also not evicted from commit cache + return cached.commit_seq <= snapshot_seq; + } + // At this point we dont know if it was committed or it is still prepared + auto max_evicted_seq = max_evicted_seq_.load(std::memory_order_acquire); + if (max_evicted_seq < prep_seq) { + // Not evicted from cache and also not present, so must be still prepared + return false; + } + // When advancing max_evicted_seq_, we move older entires from prepared to + // delayed_prepared_. Also we move evicted entries from commit cache to + // old_commit_map_ if it overlaps with any snapshot. Since prep_seq <= + // max_evicted_seq_, we have three cases: i) in delayed_prepared_, ii) in + // old_commit_map_, iii) committed with no conflict with any snapshot (i) + // delayed_prepared_ is checked above + if (max_evicted_seq < snapshot_seq) { // then (ii) cannot be the case + // only (iii) is the case: committed + // commit_seq <= max_evicted_seq_ < snapshot_seq => commit_seq < + // snapshot_seq + return true; + } + // else (ii) might be the case: check the commit data saved for this snapshot. + // If there was no overlapping commit entry, then it is committed with a + // commit_seq lower than any live snapshot, including snapshot_seq. + if (old_commit_map_empty_.load(std::memory_order_acquire)) { + return true; + } + { + // We should not normally reach here + ReadLock rl(&old_commit_map_mutex_); + auto old_commit_entry = old_commit_map_.find(prep_seq); + if (old_commit_entry == old_commit_map_.end() || + old_commit_entry->second <= snapshot_seq) { + return true; + } + } + // (ii) it the case: it is committed but after the snapshot_seq + return false; +} + +void WritePreparedTxnDB::AddPrepared(uint64_t seq) { prepared_txns_.push(seq); } + +void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, + uint64_t commit_seq) { + auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE; + CommitEntry evicted; + bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted); + if (to_be_evicted) { + auto prev_max = max_evicted_seq_.load(std::memory_order_acquire); + if (prev_max < evicted.commit_seq) { + auto max_evicted_seq = evicted.commit_seq; + // When max_evicted_seq_ advances, move older entries from prepared_txns_ + // to delayed_prepared_. This guarantees that if a seq is lower than max, + // then it is not in prepared_txns_ ans save an expensive, synchronized + // lookup from a shared set. delayed_prepared_ is expected to be empty in + // normal cases. + { + WriteLock wl(&prepared_mutex_); + while (!prepared_txns_.empty() && + prepared_txns_.top() <= max_evicted_seq) { + auto to_be_popped = prepared_txns_.top(); + delayed_prepared_.insert(to_be_popped); + prepared_txns_.pop(); + delayed_prepared_empty_.store(false, std::memory_order_release); + } + } + { + WriteLock wl(&snapshots_mutex_); + InstrumentedMutex(db_impl_->mutex()); + snapshots_ = db_impl_->snapshots().GetAll(); + } + while (prev_max < max_evicted_seq && + !max_evicted_seq_.compare_exchange_weak( + prev_max, max_evicted_seq, std::memory_order_release, + std::memory_order_acquire)) { + }; + } + // After each eviction from commit cache, check if the commit entry should + // be kept around because it overlaps with a live snapshot. + { + ReadLock rl(&snapshots_mutex_); + for (auto snapshot : snapshots_) { + auto snapshot_seq = + reinterpret_cast(snapshot)->number_; + if (evicted.commit_seq <= snapshot_seq) { + break; + } + // then snapshot_seq < evicted.commit_seq + if (evicted.prep_seq <= snapshot_seq) { // overlapping range + WriteLock wl(&old_commit_map_mutex_); + old_commit_map_empty_.store(false, std::memory_order_release); + old_commit_map_[evicted.prep_seq] = evicted.commit_seq; + } + } + } + } + bool succ = + ExchangeCommitEntry(indexed_seq, evicted, {prepare_seq, commit_seq}); + if (!succ) { + // A very rare event, in which the commit entry is updated before we do. + // Here we apply a very simple solution of retrying. + // TODO(myabandeh): do precautions to detect bugs that cause infinite loops + AddCommitted(prepare_seq, commit_seq); + return; + } + { + WriteLock wl(&prepared_mutex_); + prepared_txns_.erase(prepare_seq); + bool was_empty = delayed_prepared_.empty(); + if (!was_empty) { + delayed_prepared_.erase(prepare_seq); + bool is_empty = delayed_prepared_.empty(); + if (was_empty != is_empty) { + delayed_prepared_empty_.store(is_empty, std::memory_order_release); + } + } + } +} + +bool WritePreparedTxnDB::GetCommitEntry(uint64_t indexed_seq, + CommitEntry* entry) { + // TODO(myabandeh): implement lock-free commit_cache_ + ReadLock rl(&commit_cache_mutex_); + *entry = commit_cache_[indexed_seq]; + return (entry->commit_seq != 0); // initialized +} + +bool WritePreparedTxnDB::AddCommitEntry(uint64_t indexed_seq, + CommitEntry& new_entry, + CommitEntry* evicted_entry) { + // TODO(myabandeh): implement lock-free commit_cache_ + WriteLock wl(&commit_cache_mutex_); + *evicted_entry = commit_cache_[indexed_seq]; + commit_cache_[indexed_seq] = new_entry; + return (evicted_entry->commit_seq != 0); // initialized +} + +bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq, + CommitEntry& expected_entry, + CommitEntry new_entry) { + // TODO(myabandeh): implement lock-free commit_cache_ + WriteLock wl(&commit_cache_mutex_); + auto& evicted_entry = commit_cache_[indexed_seq]; + if (evicted_entry.prep_seq != expected_entry.prep_seq) { + return false; + } + commit_cache_[indexed_seq] = new_entry; + return true; +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 6ff1d015a..35c2a0143 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -64,11 +65,12 @@ class PessimisticTransactionDB : public TransactionDB { using StackableDB::DropColumnFamily; virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; - Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key, - bool exclusive); + Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, + const std::string& key, bool exclusive); void UnLock(PessimisticTransaction* txn, const TransactionKeyMap* keys); - void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, const std::string& key); + void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, + const std::string& key); void AddColumnFamily(const ColumnFamilyHandle* handle); @@ -79,7 +81,8 @@ class PessimisticTransactionDB : public TransactionDB { return txn_db_options_; } - void InsertExpirableTransaction(TransactionID tx_id, PessimisticTransaction* tx); + void InsertExpirableTransaction(TransactionID tx_id, + PessimisticTransaction* tx); void RemoveExpirableTransaction(TransactionID tx_id); // If transaction is no longer available, locks can be stolen @@ -97,14 +100,18 @@ class PessimisticTransactionDB : public TransactionDB { void GetAllPreparedTransactions(std::vector* trans) override; TransactionLockMgr::LockStatusData GetLockStatusData() override; + struct CommitEntry { + uint64_t prep_seq; + uint64_t commit_seq; + }; protected: void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options = TransactionOptions()); + DBImpl* db_impl_; private: - DBImpl* db_impl_; const TransactionDBOptions txn_db_options_; TransactionLockMgr lock_mgr_; @@ -161,6 +168,94 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) override; + + // Check whether the transaction that wrote the value with seqeunce number seq + // is visible to the snapshot with sequence number snapshot_seq + bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq); + // Add the trasnaction with prepare sequence seq to the prepared list + void AddPrepared(uint64_t seq); + // Add the transaction with prepare sequence prepare_seq and commit sequence + // commit_seq to the commit map + void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq); + + private: + // A heap with the amortized O(1) complexity for erase. It uses one extra heap + // to keep track of erased entries that are not yet on top of the main heap. + class PreparedHeap { + std::priority_queue heap_; + std::priority_queue erased_heap_; + + public: + bool empty() { return heap_.empty(); } + uint64_t top() { return heap_.top(); } + void push(uint64_t v) { heap_.push(v); } + void pop() { + heap_.pop(); + while (!heap_.empty() && !erased_heap_.empty() && + heap_.top() == erased_heap_.top()) { + heap_.pop(); + erased_heap_.pop(); + } + } + void erase(uint64_t seq) { + if (!heap_.empty()) { + if (heap_.top() < seq) { + // Already popped, ignore it. + } else if (heap_.top() == seq) { + heap_.pop(); + } else { // (heap_.top() > seq) + // Down the heap, remember to pop it later + erased_heap_.push(seq); + } + } + } + }; + + // Get the commit entry with index indexed_seq from the commit table. It + // returns true if such entry exists. + bool GetCommitEntry(uint64_t indexed_seq, CommitEntry* entry); + // Rewrite the entry with the index indexed_seq in the commit table with the + // commit entry . If the rewrite results into eviction, + // sets the evicted_entry and returns true. + bool AddCommitEntry(uint64_t indexed_seq, CommitEntry& new_entry, + CommitEntry* evicted_entry); + // Rewrite the entry with the index indexed_seq in the commit table with the + // commit entry new_entry only if the existing entry matches the + // expected_entry. Returns false otherwise. + bool ExchangeCommitEntry(uint64_t indexed_seq, CommitEntry& expected_entry, + CommitEntry new_entry); + + // The list of live snapshots at the last time that max_evicted_seq_ advanced. + // The list sorted in ascending order. Thread-safety is provided with + // snapshots_mutex_. + std::vector snapshots_; + // A heap of prepared transactions. Thread-safety is provided with + // prepared_mutex_. + PreparedHeap prepared_txns_; + // 10m entry, 80MB size + static const uint64_t COMMIT_CACHE_SIZE = static_cast(1 << 21); + // commit_cache_ is initialized to zero to tell apart an empty index from a + // filled one. Thread-safety is provided with commit_cache_mutex_. + CommitEntry commit_cache_[COMMIT_CACHE_SIZE] = {}; + // The largest evicted *commit* sequence number from the commit_cache_ + std::atomic max_evicted_seq_ = {}; + // A map of the evicted entries from commit_cache_ that has to be kept around + // to service the old snapshots. This is expected to be empty normally. + // Thread-safety is provided with old_commit_map_mutex_. + std::map old_commit_map_; + // A set of long-running prepared transactions that are not finished by the + // time max_evicted_seq_ advances their sequence number. This is expected to + // be empty normally. Thread-safety is provided with prepared_mutex_. + std::set delayed_prepared_; + // Update when delayed_prepared_.empty() changes. Expected to be true + // normally. + std::atomic delayed_prepared_empty_ = {true}; + // Update when old_commit_map_.empty() changes. Expected to be true normally. + std::atomic old_commit_map_empty_ = {true}; + port::RWMutex prepared_mutex_; + port::RWMutex old_commit_map_mutex_; + port::RWMutex commit_cache_mutex_; + port::RWMutex snapshots_mutex_; }; } // namespace rocksdb diff --git a/utilities/transactions/transaction_lock_mgr.cc b/utilities/transactions/transaction_lock_mgr.cc index d93d5bcde..9b7a4e640 100644 --- a/utilities/transactions/transaction_lock_mgr.cc +++ b/utilities/transactions/transaction_lock_mgr.cc @@ -357,13 +357,15 @@ Status TransactionLockMgr::AcquireWithTimeout( } void TransactionLockMgr::DecrementWaiters( - const PessimisticTransaction* txn, const autovector& wait_ids) { + const PessimisticTransaction* txn, + const autovector& wait_ids) { std::lock_guard lock(wait_txn_map_mutex_); DecrementWaitersImpl(txn, wait_ids); } void TransactionLockMgr::DecrementWaitersImpl( - const PessimisticTransaction* txn, const autovector& wait_ids) { + const PessimisticTransaction* txn, + const autovector& wait_ids) { auto id = txn->GetID(); assert(wait_txn_map_.Contains(id)); wait_txn_map_.Delete(id); @@ -377,7 +379,8 @@ void TransactionLockMgr::DecrementWaitersImpl( } bool TransactionLockMgr::IncrementWaiters( - const PessimisticTransaction* txn, const autovector& wait_ids) { + const PessimisticTransaction* txn, + const autovector& wait_ids) { auto id = txn->GetID(); std::vector queue(txn->GetDeadlockDetectDepth()); std::lock_guard lock(wait_txn_map_mutex_); @@ -537,7 +540,8 @@ void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn, } } -void TransactionLockMgr::UnLock(PessimisticTransaction* txn, uint32_t column_family_id, +void TransactionLockMgr::UnLock(PessimisticTransaction* txn, + uint32_t column_family_id, const std::string& key, Env* env) { std::shared_ptr lock_map_ptr = GetLockMap(column_family_id); LockMap* lock_map = lock_map_ptr.get(); diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index f3942855b..211e21724 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -14,17 +14,18 @@ #include "rocksdb/db.h" #include "rocksdb/status.h" #include "rocksdb/utilities/transaction_db.h" -#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/pessimistic_transaction.h" +#include "utilities/transactions/pessimistic_transaction_db.h" namespace rocksdb { struct WriteOptions; -WritePreparedTxn::WritePreparedTxn( - TransactionDB* txn_db, const WriteOptions& write_options, - const TransactionOptions& txn_options) - : PessimisticTransaction(txn_db, write_options, txn_options) { +WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db, + const WriteOptions& write_options, + const TransactionOptions& txn_options) + : PessimisticTransaction(txn_db, write_options, txn_options), + wpt_db_(txn_db) { PessimisticTransaction::Initialize(txn_options); } @@ -35,9 +36,18 @@ Status WritePreparedTxn::CommitBatch(WriteBatch* /* unused */) { } Status WritePreparedTxn::PrepareInternal() { - // TODO(myabandeh) Implement this - throw std::runtime_error("Prepare not Implemented"); - return Status::OK(); + WriteOptions write_options = write_options_; + write_options.disableWAL = false; + WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_); + const bool disable_memtable = true; + uint64_t seq_used; + Status s = + db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), + /*callback*/ nullptr, &log_number_, /*log ref*/ 0, + !disable_memtable, &seq_used); + prepare_seq_ = seq_used; + wpt_db_->AddPrepared(prepare_seq_); + return s; } Status WritePreparedTxn::CommitWithoutPrepareInternal() { @@ -47,9 +57,24 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { } Status WritePreparedTxn::CommitInternal() { - // TODO(myabandeh) Implement this - throw std::runtime_error("Commit not Implemented"); - return Status::OK(); + // We take the commit-time batch and append the Commit marker. + // The Memtable will ignore the Commit marker in non-recovery mode + WriteBatch* working_batch = GetCommitTimeWriteBatch(); + // TODO(myabandeh): prevent the users from writing to txn after the prepare + // phase + assert(working_batch->Count() == 0); + WriteBatchInternal::MarkCommit(working_batch, name_); + + // any operations appended to this working_batch will be ignored from WAL + working_batch->MarkWalTerminationPoint(); + + const bool disable_memtable = true; + uint64_t seq_used; + auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, + log_number_, disable_memtable, &seq_used); + uint64_t& commit_seq = seq_used; + wpt_db_->AddCommitted(prepare_seq_, commit_seq); + return s; } Status WritePreparedTxn::Rollback() { diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index c0feb2207..b7cc6ba1b 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -25,13 +25,14 @@ #include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/write_batch_with_index.h" #include "util/autovector.h" -#include "utilities/transactions/transaction_base.h" #include "utilities/transactions/pessimistic_transaction.h" +#include "utilities/transactions/pessimistic_transaction_db.h" +#include "utilities/transactions/transaction_base.h" #include "utilities/transactions/transaction_util.h" namespace rocksdb { -class TransactionDBImpl; +class WritePreparedTxnDB; // This impl could write to DB also uncomitted data and then later tell apart // committed data from uncomitted data. Uncommitted data could be after the @@ -39,8 +40,8 @@ class TransactionDBImpl; // (WriteUnpreparedTxnImpl). class WritePreparedTxn : public PessimisticTransaction { public: - WritePreparedTxn(TransactionDB* db, const WriteOptions& write_options, - const TransactionOptions& txn_options); + WritePreparedTxn(WritePreparedTxnDB* db, const WriteOptions& write_options, + const TransactionOptions& txn_options); virtual ~WritePreparedTxn() {} @@ -65,6 +66,9 @@ class WritePreparedTxn : public PessimisticTransaction { // No copying allowed WritePreparedTxn(const WritePreparedTxn&); void operator=(const WritePreparedTxn&); + + WritePreparedTxnDB* wpt_db_; + uint64_t prepare_seq_; }; } // namespace rocksdb