WritePrepared: reduce prepared_mutex_ overhead (#5420)

Summary:
The patch reduces the contention over prepared_mutex_ using these techniques:
1) Move ::RemovePrepared() to be called from the commit callback when we have two write queues.
2) Use two separate mutex for PreparedHeap, one prepared_mutex_ needed for ::RemovePrepared, and one ::push_pop_mutex() needed for ::AddPrepared(). Given that we call ::AddPrepared only from the first write queue and ::RemovePrepared mostly from the 2nd, this will result into each the two write queues not competing with each other over a single mutex. ::RemovePrepared might occasionally need to acquire ::push_pop_mutex() if ::erase() ends up with calling ::pop()
3) Acquire ::push_pop_mutex() on the first callback of the write queue and release it on the last.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5420

Differential Revision: D15741985

Pulled By: maysamyabandeh

fbshipit-source-id: 84ce8016007e88bb6e10da5760ba1f0d26347735
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent a16d0cc494
commit c292dc8540
  1. 19
      db/db_impl/db_impl_write.cc
  2. 6
      db/pre_release_callback.h
  3. 3
      db/write_callback_test.cc
  4. 3
      utilities/transactions/pessimistic_transaction.cc
  5. 61
      utilities/transactions/write_prepared_transaction_test.cc
  6. 26
      utilities/transactions/write_prepared_txn.cc
  7. 67
      utilities/transactions/write_prepared_txn_db.cc
  8. 113
      utilities/transactions/write_prepared_txn_db.h
  9. 4
      utilities/transactions/write_unprepared_txn.cc
  10. 4
      utilities/transactions/write_unprepared_txn_db.cc
  11. 6
      utilities/transactions/write_unprepared_txn_db.h

@ -263,6 +263,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
size_t total_count = 0;
size_t valid_batches = 0;
size_t total_byte_size = 0;
size_t pre_release_callback_cnt = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
valid_batches += writer->batch_cnt;
@ -270,9 +271,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
total_count += WriteBatchInternal::Count(writer->batch);
parallel = parallel && !writer->batch->HasMerge();
}
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
}
}
// Note about seq_per_batch_: either disableWAL is set for the entire write
@ -336,6 +339,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// PreReleaseCallback is called after WAL write and before memtable write
if (status.ok()) {
SequenceNumber next_sequence = current_sequence;
size_t index = 0;
// Note: the logic for advancing seq here must be consistent with the
// logic in WriteBatchInternal::InsertInto(write_group...) as well as
// with WriteBatchInternal::InsertInto(write_batch...) that is called on
@ -347,7 +351,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
writer->sequence = next_sequence;
if (writer->pre_release_callback) {
Status ws = writer->pre_release_callback->Callback(
writer->sequence, disable_memtable, writer->log_used);
writer->sequence, disable_memtable, writer->log_used, index++,
pre_release_callback_cnt);
if (!ws.ok()) {
status = ws;
break;
@ -675,11 +680,15 @@ Status DBImpl::WriteImplWALOnly(
// Note: no need to update last_batch_group_size_ here since the batch writes
// to WAL only
size_t pre_release_callback_cnt = 0;
size_t total_byte_size = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
}
}
@ -758,11 +767,13 @@ Status DBImpl::WriteImplWALOnly(
WriteStatusCheck(status);
}
if (status.ok()) {
size_t index = 0;
for (auto* writer : write_group) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(
writer->sequence, disable_memtable, writer->log_used);
writer->sequence, disable_memtable, writer->log_used, index++,
pre_release_callback_cnt);
if (!ws.ok()) {
status = ws;
break;
@ -1121,7 +1132,7 @@ Status DBImpl::WriteRecoverableState() {
// AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB
mutex_.Unlock();
status = recoverable_state_pre_release_callback_->Callback(
sub_batch_seq, !DISABLE_MEMTABLE, no_log_num);
sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1);
mutex_.Lock();
}
}

@ -27,8 +27,12 @@ class PreReleaseCallback {
// is_mem_disabled is currently used for debugging purposes to assert that
// the callback is done from the right write queue.
// If non-zero, log_number indicates the WAL log to which we wrote.
// index >= 0 specifies the order of callback in the same write thread.
// total > index specifies the total number of callbacks in the same write
// thread. Together with index, could be used to reduce the redundant
// operations among the callbacks.
virtual Status Callback(SequenceNumber seq, bool is_mem_disabled,
uint64_t log_number) = 0;
uint64_t log_number, size_t index, size_t total) = 0;
};
} // namespace rocksdb

@ -304,7 +304,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
PublishSeqCallback(DBImpl* db_impl_in)
: db_impl_(db_impl_in) {}
Status Callback(SequenceNumber last_seq, bool /*not used*/,
uint64_t) override {
uint64_t, size_t /*index*/,
size_t /*total*/) override {
db_impl_->SetLastPublishedSequence(last_seq);
return Status::OK();
}

@ -231,7 +231,8 @@ Status WriteCommittedTxn::PrepareInternal() {
(void)two_write_queues_; // to silence unused private field warning
}
virtual Status Callback(SequenceNumber, bool is_mem_disabled,
uint64_t log_number) override {
uint64_t log_number, size_t /*index*/,
size_t /*total*/) override {
#ifdef NDEBUG
(void)is_mem_disabled;
#endif

@ -7,9 +7,9 @@
#include "utilities/transactions/transaction_test.h"
#include <cinttypes>
#include <algorithm>
#include <atomic>
#include <cinttypes>
#include <functional>
#include <string>
#include <thread>
@ -55,25 +55,17 @@ TEST(PreparedHeap, BasicsTest) {
heap.push(34l);
// Test that old min is still on top
ASSERT_EQ(14l, heap.top());
heap.push(13l);
// Test that the new min will be on top
ASSERT_EQ(13l, heap.top());
// Test that it is persistent
ASSERT_EQ(13l, heap.top());
heap.push(44l);
heap.push(54l);
heap.push(64l);
heap.push(74l);
heap.push(84l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
ASSERT_EQ(14l, heap.top());
heap.erase(24l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
ASSERT_EQ(14l, heap.top());
heap.erase(14l);
// Test that old min is still on top
ASSERT_EQ(13l, heap.top());
heap.erase(13l);
// Test that the new comes to the top after multiple erase
ASSERT_EQ(34l, heap.top());
heap.erase(34l);
@ -3001,13 +2993,16 @@ TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
port::Mutex txn_mutex_;
// t1) Insert prepared entry, t2) commit other entires to advance max
// evicted sec and finish checking the existing prepared entires, t1)
// t1) Insert prepared entry, t2) commit other entries to advance max
// evicted sec and finish checking the existing prepared entries, t1)
// AddPrepared, t2) update max_evicted_seq_
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"AddPrepared::begin:pause", "AddPreparedBeforeMax::read_thread:start"},
{"AdvanceMaxEvictedSeq::update_max:pause", "AddPrepared::begin:resume"},
{"AddPrepared::end", "AdvanceMaxEvictedSeq::update_max:resume"},
{"AddPreparedCallback::AddPrepared::begin:pause",
"AddPreparedBeforeMax::read_thread:start"},
{"AdvanceMaxEvictedSeq::update_max:pause",
"AddPreparedCallback::AddPrepared::begin:resume"},
{"AddPreparedCallback::AddPrepared::end",
"AdvanceMaxEvictedSeq::update_max:resume"},
});
SyncPoint::GetInstance()->EnableProcessing();
@ -3061,20 +3056,36 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
ReOpen();
std::atomic<const Snapshot*> snap = {nullptr};
std::atomic<SequenceNumber> exp_prepare = {0};
std::atomic<bool> snapshot_taken = {false};
// Value is synchronized via snap
PinnableSlice value;
// Take a snapshot after publish and before RemovePrepared:Start
auto snap_callback = [&]() {
ASSERT_EQ(nullptr, snap.load());
snap.store(db->GetSnapshot());
ReadOptions roptions;
roptions.snapshot = snap.load();
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
ASSERT_OK(s);
snapshot_taken.store(true);
};
auto callback = [&](void* param) {
SequenceNumber prep_seq = *((SequenceNumber*)param);
if (prep_seq == exp_prepare.load()) { // only for write_thread
ASSERT_EQ(nullptr, snap.load());
snap.store(db->GetSnapshot());
ReadOptions roptions;
roptions.snapshot = snap.load();
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
ASSERT_OK(s);
// We need to spawn a thread to avoid deadlock since getting a
// snpashot might end up calling AdvanceSeqByOne which needs joining
// the write queue.
auto t = rocksdb::port::Thread(snap_callback);
t.detach();
TEST_SYNC_POINT("callback:end");
}
};
// Wait for the first snapshot be taken in GetSnapshotInternal. Although
// it might be updated before GetSnapshotInternal finishes but this should
// cover most of the cases.
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
});
SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
SyncPoint::GetInstance()->EnableProcessing();
// Thread to cause frequent evictions
@ -3098,9 +3109,15 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
// Let an eviction to kick in
std::this_thread::yield();
snapshot_taken.store(false);
exp_prepare.store(txn->GetId());
ASSERT_OK(txn->Commit());
delete txn;
// Wait for the snapshot taking that is triggered by
// RemovePrepared:Start callback
while (!snapshot_taken) {
std::this_thread::yield();
}
// Read with the snapshot taken before delayed_prepared_ cleanup
ReadOptions roptions;

@ -169,12 +169,15 @@ Status WritePreparedTxn::CommitInternal() {
assert(!s.ok() || seq_used != kMaxSequenceNumber);
const SequenceNumber commit_batch_seq = seq_used;
if (LIKELY(do_one_write || !s.ok())) {
if (LIKELY(s.ok())) {
// Note RemovePrepared should be called after WriteImpl that publishsed
if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
s.ok())) {
// Note: RemovePrepared should be called after WriteImpl that publishsed
// the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
}
} // else RemovePrepared is called from within PreReleaseCallback
if (UNLIKELY(!do_one_write)) {
assert(!s.ok());
// Cleanup the prepared entry we added with add_prepared_callback
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
}
return s;
@ -199,10 +202,14 @@ Status WritePreparedTxn::CommitInternal() {
NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_aux_batch);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note RemovePrepared should be called after WriteImpl that publishsed the
// seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
if (s.ok()) {
// Note: RemovePrepared should be called after WriteImpl that publishsed
// the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
}
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
} // else RemovePrepared is called from within PreReleaseCallback
return s;
}
@ -348,6 +355,7 @@ Status WritePreparedTxn::RollbackInternal() {
return s;
}
if (do_one_write) {
assert(!db_impl_->immutable_db_options().two_write_queues);
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
return s;
} // else do the 2nd write for commit
@ -370,9 +378,13 @@ Status WritePreparedTxn::RollbackInternal() {
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"RollbackInternal (status=%s) commit: %" PRIu64,
s.ToString().c_str(), GetId());
// TODO(lth): For WriteUnPrepared that rollback is called frequently,
// RemovePrepared could be moved to the callback to reduce lock contention.
if (s.ok()) {
wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
}
// Note: RemovePrepared for prepared batch is called from within
// PreReleaseCallback
wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);
return s;

@ -7,8 +7,8 @@
#include "utilities/transactions/write_prepared_txn_db.h"
#include <cinttypes>
#include <algorithm>
#include <cinttypes>
#include <string>
#include <unordered_set>
#include <vector>
@ -61,8 +61,8 @@ Status WritePreparedTxnDB::Initialize(
explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
: db_(db) {}
Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
bool is_mem_disabled __attribute__((__unused__)), uint64_t,
size_t /*index*/, size_t /*total*/) override {
assert(!is_mem_disabled);
db_->AddCommitted(commit_seq, commit_seq);
return Status::OK();
@ -211,9 +211,7 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note RemovePrepared should be called after WriteImpl that publishsed the
// seq. Otherwise SmallestUnCommittedSeq optimization breaks.
RemovePrepared(prepare_seq, batch_cnt);
// Note: RemovePrepared is called from within PreReleaseCallback
return s;
}
@ -389,8 +387,8 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}
void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max) {
prepared_mutex_.AssertHeld();
void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max,
bool locked) {
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
// then it is not in prepared_txns_ and save an expensive, synchronized
@ -401,25 +399,42 @@ void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max) {
"CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
prepared_txns_.empty(),
prepared_txns_.empty() ? 0 : prepared_txns_.top());
while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped);
ROCKS_LOG_WARN(info_log_,
"prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
" new_max=%" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()),
to_be_popped, new_max);
prepared_txns_.pop();
delayed_prepared_empty_.store(false, std::memory_order_release);
const SequenceNumber prepared_top = prepared_txns_.top();
const bool empty = prepared_top == kMaxSequenceNumber;
// Preliminary check to avoid the synchronization cost
if (!empty && prepared_top <= new_max) {
if (locked) {
// Needed to avoid double locking in pop().
prepared_txns_.push_pop_mutex()->Unlock();
}
WriteLock wl(&prepared_mutex_);
// Need to fetch fresh values of ::top after mutex is acquired
while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped);
ROCKS_LOG_WARN(info_log_,
"prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
" new_max=%" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()),
to_be_popped, new_max);
prepared_txns_.pop();
delayed_prepared_empty_.store(false, std::memory_order_release);
}
if (locked) {
prepared_txns_.push_pop_mutex()->Lock();
}
}
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
void WritePreparedTxnDB::AddPrepared(uint64_t seq, bool locked) {
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
seq, max_evicted_seq_.load());
TEST_SYNC_POINT("AddPrepared::begin:pause");
TEST_SYNC_POINT("AddPrepared::begin:resume");
WriteLock wl(&prepared_mutex_);
if (!locked) {
prepared_txns_.push_pop_mutex()->Lock();
}
prepared_txns_.push_pop_mutex()->AssertHeld();
prepared_txns_.push(seq);
auto new_max = future_max_evicted_seq_.load();
if (UNLIKELY(seq <= new_max)) {
@ -429,7 +444,10 @@ void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
"Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
" <= %" PRIu64,
seq, new_max);
CheckPreparedAgainstMax(new_max);
CheckPreparedAgainstMax(new_max, true /*locked*/);
}
if (!locked) {
prepared_txns_.push_pop_mutex()->Unlock();
}
TEST_SYNC_POINT("AddPrepared::end");
}
@ -582,10 +600,7 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
std::memory_order_relaxed)) {
};
{
WriteLock wl(&prepared_mutex_);
CheckPreparedAgainstMax(new_max);
}
CheckPreparedAgainstMax(new_max, false /*locked*/);
// With each change to max_evicted_seq_ fetch the live snapshots behind it.
// We use max as the version of snapshots to identify how fresh are the
@ -641,6 +656,7 @@ SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
// than the smallest uncommitted seq when the snapshot was taken.
auto min_uncommitted = WritePreparedTxnDB::SmallestUnCommittedSeq();
SnapshotImpl* snap_impl = db_impl_->GetSnapshotImpl(for_ww_conflict_check);
TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:first");
assert(snap_impl);
SequenceNumber snap_seq = snap_impl->GetSequenceNumber();
// Note: Check against future_max_evicted_seq_ (in contrast with
@ -679,6 +695,7 @@ SnapshotImpl* WritePreparedTxnDB::GetSnapshotInternal(
db_impl_->immutable_db_options().info_log,
"GetSnapshot %" PRIu64 " ww:%" PRIi32 " min_uncommitted: %" PRIu64,
snap_impl->GetSequenceNumber(), for_ww_conflict_check, min_uncommitted);
TEST_SYNC_POINT("WritePreparedTxnDB::GetSnapshotInternal:end");
return snap_impl;
}

@ -324,10 +324,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Add the transaction with prepare sequence seq to the prepared list.
// Note: must be called serially with increasing seq on each call.
void AddPrepared(uint64_t seq);
// locked is true if prepared_mutex_ is already locked.
void AddPrepared(uint64_t seq, bool locked = false);
// Check if any of the prepared txns are less than new max_evicted_seq_. Must
// be called with prepared_mutex_ write locked.
void CheckPreparedAgainstMax(SequenceNumber new_max);
void CheckPreparedAgainstMax(SequenceNumber new_max, bool locked);
// Remove the transaction with prepare sequence seq from the prepared list
void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
// Add the transaction with prepare sequence prepare_seq and commit sequence
@ -461,6 +462,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
std::memory_order order = std::memory_order_relaxed);
private:
friend class AddPreparedCallback;
friend class PreparedHeap_BasicsTest_Test;
friend class PreparedHeap_Concurrent_Test;
friend class PreparedHeap_EmptyAtTheEnd_Test;
@ -506,10 +508,15 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// A heap with the amortized O(1) complexity for erase. It uses one extra heap
// to keep track of erased entries that are not yet on top of the main heap.
class PreparedHeap {
// The mutex is required for push and pop from PreparedHeap. ::erase will
// use external synchronization via prepared_mutex_.
port::Mutex push_pop_mutex_;
// TODO(myabandeh): replace it with deque
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
heap_;
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
erased_heap_;
std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
// True when testing crash recovery
bool TEST_CRASH_ = false;
friend class WritePreparedTxnDB;
@ -521,10 +528,19 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
assert(erased_heap_.empty());
}
}
bool empty() { return heap_.empty(); }
uint64_t top() { return heap_.top(); }
void push(uint64_t v) { heap_.push(v); }
void pop() {
port::Mutex* push_pop_mutex() { return &push_pop_mutex_; }
inline bool empty() { return top() == kMaxSequenceNumber; }
// Returns kMaxSequenceNumber if empty() and the smallest otherwise.
inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
inline void push(uint64_t v) {
heap_.push(v);
heap_top_.store(heap_.top(), std::memory_order_release);
}
void pop(bool locked = false) {
if (!locked) {
push_pop_mutex()->Lock();
}
heap_.pop();
while (!heap_.empty() && !erased_heap_.empty() &&
// heap_.top() > erased_heap_.top() could happen if we have erased
@ -543,15 +559,23 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
while (heap_.empty() && !erased_heap_.empty()) {
erased_heap_.pop();
}
heap_top_.store(!heap_.empty() ? heap_.top() : kMaxSequenceNumber,
std::memory_order_release);
if (!locked) {
push_pop_mutex()->Unlock();
}
}
// Concurrrent calls needs external synchronization. It is safe to be called
// concurrent to push and pop though.
void erase(uint64_t seq) {
if (!heap_.empty()) {
if (seq < heap_.top()) {
auto top_seq = top();
if (seq < top_seq) {
// Already popped, ignore it.
} else if (heap_.top() == seq) {
} else if (top_seq == seq) {
pop();
assert(heap_.empty() || heap_.top() != seq);
} else { // (heap_.top() > seq)
} else { // top() > seq
// Down the heap, remember to pop it later
erased_heap_.push(seq);
}
@ -596,27 +620,37 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// written in two steps, we also update prepared_txns_ at the first step
// (via the same mechanism) so that their uncommitted data is reflected in
// SmallestUnCommittedSeq.
ReadLock rl(&prepared_mutex_);
// Since we are holding the mutex, and GetLatestSequenceNumber is updated
// after prepared_txns_ are, the value of GetLatestSequenceNumber would
// reflect any uncommitted data that is not added to prepared_txns_ yet.
// Otherwise, if there is no concurrent txn, this value simply reflects that
// latest value in the memtable.
if (!delayed_prepared_.empty()) {
assert(!delayed_prepared_empty_.load());
return *delayed_prepared_.begin();
if (!delayed_prepared_empty_.load()) {
ReadLock rl(&prepared_mutex_);
if (!delayed_prepared_.empty()) {
return *delayed_prepared_.begin();
}
}
if (prepared_txns_.empty()) {
return db_impl_->GetLatestSequenceNumber() + 1;
// This must be called before calling ::top. This is because the concurrent
// thread would call ::RemovePrepared before updating
// GetLatestSequenceNumber(). Reading then in opposite order here guarantees
// that the ::top that we read would be lower the ::top if we had otherwise
// update/read them atomically.
auto next_prepare = db_impl_->GetLatestSequenceNumber() + 1;
auto min_prepare = prepared_txns_.top();
bool empty = min_prepare == kMaxSequenceNumber;
if (empty) {
// Since GetLatestSequenceNumber is updated
// after prepared_txns_ are, the value of GetLatestSequenceNumber would
// reflect any uncommitted data that is not added to prepared_txns_ yet.
// Otherwise, if there is no concurrent txn, this value simply reflects
// that latest value in the memtable.
return next_prepare;
} else {
return std::min(prepared_txns_.top(),
db_impl_->GetLatestSequenceNumber() + 1);
return std::min(min_prepare, next_prepare);
}
}
// Enhance the snapshot object by recording in it the smallest uncommitted seq
inline void EnhanceSnapshot(SnapshotImpl* snapshot,
SequenceNumber min_uncommitted) {
assert(snapshot);
assert(min_uncommitted <= snapshot->number_ + 1);
snapshot->min_uncommitted_ = min_uncommitted;
}
@ -778,12 +812,28 @@ class AddPreparedCallback : public PreReleaseCallback {
}
virtual Status Callback(SequenceNumber prepare_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t log_number) override {
uint64_t log_number, size_t index,
size_t total) override {
assert(index < total);
// To reduce the cost of lock acquisition competing with the concurrent
// prepare requests, lock on the first callback and unlock on the last.
const bool do_lock = !two_write_queues_ || index == 0;
const bool do_unlock = !two_write_queues_ || index + 1 == total;
// Always Prepare from the main queue
assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue
TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:pause");
TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::begin:resume");
if (do_lock) {
db_->prepared_txns_.push_pop_mutex()->Lock();
}
const bool kLocked = true;
for (size_t i = 0; i < sub_batch_cnt_; i++) {
db_->AddPrepared(prepare_seq + i);
db_->AddPrepared(prepare_seq + i, kLocked);
}
if (do_unlock) {
db_->prepared_txns_.push_pop_mutex()->Unlock();
}
TEST_SYNC_POINT("AddPreparedCallback::AddPrepared::end");
if (first_prepare_batch_) {
assert(log_number != 0);
db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
@ -826,7 +876,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
uint64_t, size_t /*index*/,
size_t /*total*/) override {
// Always commit from the 2nd queue
assert(!db_impl_->immutable_db_options().two_write_queues ||
is_mem_disabled);
@ -863,6 +914,14 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
// publish sequence numbers will be in order, i.e., once a seq is
// published all the seq prior to that are also publishable.
db_impl_->SetLastPublishedSequence(last_commit_seq);
// Note RemovePrepared should be called after publishing the seq.
// Otherwise SmallestUnCommittedSeq optimization breaks.
if (prep_seq_ != kMaxSequenceNumber) {
db_->RemovePrepared(prep_seq_, prep_batch_cnt_);
} // else there was no prepare phase
if (includes_aux_batch_) {
db_->RemovePrepared(aux_seq_, aux_batch_cnt_);
}
}
// else SequenceNumber that is updated as part of the write already does the
// publishing
@ -907,8 +966,8 @@ class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
assert(prep_batch_cnt_ > 0);
}
Status Callback(SequenceNumber commit_seq, bool is_mem_disabled,
uint64_t) override {
Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, uint64_t,
size_t /*index*/, size_t /*total*/) override {
// Always commit from the 2nd queue
assert(is_mem_disabled); // implies the 2nd queue
assert(db_impl_->immutable_db_options().two_write_queues);

@ -319,8 +319,8 @@ Status WriteUnpreparedTxn::CommitInternal() {
explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
: db_impl_(db_impl) {}
Status Callback(SequenceNumber seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
bool is_mem_disabled __attribute__((__unused__)), uint64_t,
size_t /*index*/, size_t /*total*/) override {
assert(is_mem_disabled);
assert(db_impl_->immutable_db_options().two_write_queues);
db_impl_->SetLastPublishedSequence(seq);

@ -185,8 +185,8 @@ Status WriteUnpreparedTxnDB::Initialize(
explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
: db_(db) {}
Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
bool is_mem_disabled __attribute__((__unused__)), uint64_t,
size_t /*index*/, size_t /*total*/) override {
assert(!is_mem_disabled);
db_->AddCommitted(commit_seq, commit_seq);
return Status::OK();

@ -57,7 +57,8 @@ class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
uint64_t, size_t /*index*/,
size_t /*total*/) override {
const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
? commit_seq
: commit_seq + data_batch_cnt_ - 1;
@ -121,7 +122,8 @@ class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback {
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
uint64_t, size_t /*index*/,
size_t /*total*/) override {
assert(is_mem_disabled); // implies the 2nd queue
const uint64_t last_commit_seq = commit_seq;
db_->AddCommitted(rollback_seq_, last_commit_seq);

Loading…
Cancel
Save