WritePrepared: switch PreparedHeap from priority_queue to deque (#5436)

Summary:
Internally, PreparedHeap currently uses a priority_queue. The rationale was that in the initial design PreparedHeap::AddPrepared could be called in arbitrary order. With the recent optimizations, ::AddPrepared is called only from the main write queue, which results in in-order insertion into PreparedHeap. The patch therefore replaces the underlying priority_queue with a more efficient deque implementation.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5436

Differential Revision: D15752147

Pulled By: maysamyabandeh

fbshipit-source-id: e6960f2b2097e13137dded1ceeff3b10b03b0aeb
Branch: main
Authored by Maysam Yabandeh, committed by Facebook Github Bot (6 years ago)
Parent: ca1aee2a19
Commit: 773f914a40

Changed files:
  1. utilities/transactions/write_prepared_transaction_test.cc (58 changed lines)
  2. utilities/transactions/write_prepared_txn_db.cc (7 changed lines)
  3. utilities/transactions/write_prepared_txn_db.h (31 changed lines)
  4. utilities/transactions/write_unprepared_txn_db.cc (13 changed lines)
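
As a reading aid before the diff, here is a minimal standalone sketch of the idea described in the summary, with an invented class name (OrderedMinQueue) rather than the real PreparedHeap: once sequence numbers are guaranteed to arrive in increasing order, a plain std::deque kept in insertion order is already sorted, so the minimum is always the front element and both push and pop are O(1) instead of the O(log n) of a binary heap.

#include <cassert>
#include <cstdint>
#include <deque>

// Illustration only: a min-"queue" that relies on in-order insertion.
class OrderedMinQueue {
 public:
  void push(uint64_t seq) {
    // Caller guarantees strictly increasing pushes, as the main write queue does.
    assert(q_.empty() || q_.back() < seq);
    q_.push_back(seq);  // O(1); a priority_queue would pay O(log n) here
  }
  uint64_t top() const { return q_.front(); }  // smallest element
  void pop() { q_.pop_front(); }               // O(1)
  bool empty() const { return q_.empty(); }

 private:
  std::deque<uint64_t> q_;  // always sorted because pushes arrive in order
};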

utilities/transactions/write_prepared_transaction_test.cc:

@@ -48,6 +48,8 @@ using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
 TEST(PreparedHeap, BasicsTest) {
   WritePreparedTxnDB::PreparedHeap heap;
+  {
+    MutexLock ml(heap.push_pop_mutex());
   heap.push(14l);
   // Test with one element
   ASSERT_EQ(14l, heap.top());
@@ -60,6 +62,7 @@ TEST(PreparedHeap, BasicsTest) {
   heap.push(64l);
   heap.push(74l);
   heap.push(84l);
+  }
   // Test that old min is still on top
   ASSERT_EQ(14l, heap.top());
   heap.erase(24l);
@@ -81,11 +84,14 @@ TEST(PreparedHeap, BasicsTest) {
   ASSERT_EQ(64l, heap.top());
   heap.erase(84l);
   ASSERT_EQ(64l, heap.top());
+  {
+    MutexLock ml(heap.push_pop_mutex());
   heap.push(85l);
   heap.push(86l);
   heap.push(87l);
   heap.push(88l);
   heap.push(89l);
+  }
   heap.erase(87l);
   heap.erase(85l);
   heap.erase(89l);
@@ -106,13 +112,19 @@ TEST(PreparedHeap, BasicsTest) {
 // not resurface again.
 TEST(PreparedHeap, EmptyAtTheEnd) {
   WritePreparedTxnDB::PreparedHeap heap;
+  {
+    MutexLock ml(heap.push_pop_mutex());
   heap.push(40l);
+  }
   ASSERT_EQ(40l, heap.top());
   // Although not a recommended scenario, we must be resilient against erase
   // without a prior push.
   heap.erase(50l);
   ASSERT_EQ(40l, heap.top());
+  {
+    MutexLock ml(heap.push_pop_mutex());
   heap.push(60l);
+  }
   ASSERT_EQ(40l, heap.top());
   heap.erase(60l);
@@ -120,11 +132,17 @@ TEST(PreparedHeap, EmptyAtTheEnd) {
   heap.erase(40l);
   ASSERT_TRUE(heap.empty());
+  {
+    MutexLock ml(heap.push_pop_mutex());
   heap.push(40l);
+  }
   ASSERT_EQ(40l, heap.top());
   heap.erase(50l);
   ASSERT_EQ(40l, heap.top());
+  {
+    MutexLock ml(heap.push_pop_mutex());
   heap.push(60l);
+  }
   ASSERT_EQ(40l, heap.top());
   heap.erase(40l);
@@ -139,30 +157,37 @@ TEST(PreparedHeap, EmptyAtTheEnd) {
 // successfully emptied at the end.
 TEST(PreparedHeap, Concurrent) {
   const size_t t_cnt = 10;
-  rocksdb::port::Thread t[t_cnt];
-  Random rnd(1103);
+  rocksdb::port::Thread t[t_cnt + 1];
   WritePreparedTxnDB::PreparedHeap heap;
   port::RWMutex prepared_mutex;
+  std::atomic<size_t> last;
   for (size_t n = 0; n < 100; n++) {
-    for (size_t i = 0; i < t_cnt; i++) {
+    last = 0;
+    t[0] = rocksdb::port::Thread([&heap, t_cnt, &last]() {
+      Random rnd(1103);
+      for (size_t seq = 1; seq <= t_cnt; seq++) {
         // This is not recommended usage but we should be resilient against it.
         bool skip_push = rnd.OneIn(5);
-      t[i] = rocksdb::port::Thread([&heap, &prepared_mutex, skip_push, i]() {
-        auto seq = i;
-        std::this_thread::yield();
         if (!skip_push) {
-          WriteLock wl(&prepared_mutex);
+          MutexLock ml(heap.push_pop_mutex());
+          std::this_thread::yield();
           heap.push(seq);
+          last.store(seq);
         }
+      }
+    });
+    for (size_t i = 1; i <= t_cnt; i++) {
+      t[i] = rocksdb::port::Thread([&heap, &prepared_mutex, &last, i]() {
+        auto seq = i;
+        do {
           std::this_thread::yield();
-        {
+        } while (last.load() < seq);
         WriteLock wl(&prepared_mutex);
         heap.erase(seq);
-        }
       });
     }
-    for (size_t i = 0; i < t_cnt; i++) {
+    for (size_t i = 0; i <= t_cnt; i++) {
       t[i].join();
     }
     ASSERT_TRUE(heap.empty());
@@ -3197,7 +3222,7 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
   ReOpen();
   std::atomic<const Snapshot*> snap = {nullptr};
   std::atomic<SequenceNumber> exp_prepare = {0};
-  std::atomic<bool> snapshot_taken = {false};
+  rocksdb::port::Thread callback_thread;
   // Value is synchronized via snap
   PinnableSlice value;
   // Take a snapshot after publish and before RemovePrepared:Start
@@ -3208,7 +3233,6 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
     roptions.snapshot = snap.load();
     auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &value);
     ASSERT_OK(s);
-    snapshot_taken.store(true);
   };
   auto callback = [&](void* param) {
     SequenceNumber prep_seq = *((SequenceNumber*)param);
@@ -3216,8 +3240,7 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
       // We need to spawn a thread to avoid deadlock since getting a
      // snpashot might end up calling AdvanceSeqByOne which needs joining
      // the write queue.
-      auto t = rocksdb::port::Thread(snap_callback);
-      t.detach();
+      callback_thread = rocksdb::port::Thread(snap_callback);
       TEST_SYNC_POINT("callback:end");
     }
   };
@@ -3250,15 +3273,12 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
       // Let an eviction to kick in
       std::this_thread::yield();
-      snapshot_taken.store(false);
       exp_prepare.store(txn->GetId());
       ASSERT_OK(txn->Commit());
       delete txn;
       // Wait for the snapshot taking that is triggered by
       // RemovePrepared:Start callback
-      while (!snapshot_taken) {
-        std::this_thread::yield();
-      }
+      callback_thread.join();
       // Read with the snapshot taken before delayed_prepared_ cleanup
       ReadOptions roptions;
@@ -3278,11 +3298,11 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
       });
       write_thread.join();
       eviction_thread.join();
+    }
     rocksdb::SyncPoint::GetInstance()->DisableProcessing();
     rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
   }
 }
-}
 // Test that updating the commit map will not affect the existing snapshots
 TEST_P(WritePreparedTransactionTest, AtomicCommit) {
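
The rewritten Concurrent test above relies on a specific synchronization pattern. Here is a simplified, self-contained sketch of that pattern (using std::thread and a plain vector instead of rocksdb::port::Thread and the real heap, and omitting the skip_push randomization): a single producer pushes sequence numbers in increasing order and publishes its progress through an atomic watermark, while each eraser thread waits until the watermark has reached its sequence number before erasing it.

#include <atomic>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  const uint64_t t_cnt = 10;
  std::atomic<uint64_t> last{0};  // highest sequence pushed so far
  std::vector<uint64_t> pushed;   // stand-in for the heap
  std::mutex mu;                  // stand-in for push_pop_mutex/prepared_mutex

  // Single producer: pushes 1..t_cnt in order, publishing progress via `last`.
  std::thread producer([&]() {
    for (uint64_t seq = 1; seq <= t_cnt; seq++) {
      {
        std::lock_guard<std::mutex> lg(mu);
        pushed.push_back(seq);
      }
      last.store(seq);
    }
  });

  // One eraser per sequence number: wait until that number has been pushed.
  std::vector<std::thread> erasers;
  for (uint64_t seq = 1; seq <= t_cnt; seq++) {
    erasers.emplace_back([&, seq]() {
      while (last.load() < seq) {
        std::this_thread::yield();
      }
      std::lock_guard<std::mutex> lg(mu);
      // The real test calls heap.erase(seq) here.
    });
  }

  producer.join();
  for (auto& t : erasers) {
    t.join();
  }
  return 0;
}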

utilities/transactions/write_prepared_txn_db.cc:

@@ -32,12 +32,19 @@ Status WritePreparedTxnDB::Initialize(
   auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
   assert(dbimpl != nullptr);
   auto rtxns = dbimpl->recovered_transactions();
+  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
   for (auto rtxn : rtxns) {
     // There should only one batch for WritePrepared policy.
     assert(rtxn.second->batches_.size() == 1);
     const auto& seq = rtxn.second->batches_.begin()->first;
     const auto& batch_info = rtxn.second->batches_.begin()->second;
     auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
+    ordered_seq_cnt[seq] = cnt;
+  }
+  // AddPrepared must be called in order
+  for (auto seq_cnt: ordered_seq_cnt) {
+    auto seq = seq_cnt.first;
+    auto cnt = seq_cnt.second;
     for (size_t i = 0; i < cnt; i++) {
       AddPrepared(seq + i);
     }
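
The hunk above defers AddPrepared until the recovered transactions have been sorted by sequence number. A small self-contained sketch of that pattern follows (illustrative only; the real code calls WritePreparedTxnDB::AddPrepared and uses RocksDB's SequenceNumber type): the (seq, batch_cnt) pairs are collected into a std::map, whose iteration order is ascending by key, and only then is each prepared sequence registered, which satisfies the in-order requirement of the new deque-based heap.

#include <cstdint>
#include <cstdio>
#include <map>

int main() {
  // Recovered in arbitrary order; the map keeps keys sorted ascending.
  std::map<uint64_t, uint64_t> ordered_seq_cnt;
  ordered_seq_cnt[30] = 2;  // prepare seq 30 with batch_cnt 2
  ordered_seq_cnt[10] = 1;
  ordered_seq_cnt[20] = 1;
  for (const auto& seq_cnt : ordered_seq_cnt) {
    for (uint64_t i = 0; i < seq_cnt.second; i++) {
      // Stand-in for AddPrepared(seq + i); prints 10, 20, 30, 31 in order.
      std::printf("AddPrepared(%llu)\n",
                  static_cast<unsigned long long>(seq_cnt.first + i));
    }
  }
  return 0;
}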

utilities/transactions/write_prepared_txn_db.h:

@@ -511,9 +511,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     // The mutex is required for push and pop from PreparedHeap. ::erase will
     // use external synchronization via prepared_mutex_.
     port::Mutex push_pop_mutex_;
-    // TODO(myabandeh): replace it with deque
-    std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
-        heap_;
+    std::deque<uint64_t> heap_;
     std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
         erased_heap_;
     std::atomic<uint64_t> heap_top_ = {kMaxSequenceNumber};
@@ -534,21 +532,27 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     // Returns kMaxSequenceNumber if empty() and the smallest otherwise.
     inline uint64_t top() { return heap_top_.load(std::memory_order_acquire); }
     inline void push(uint64_t v) {
-      heap_.push(v);
-      heap_top_.store(heap_.top(), std::memory_order_release);
+      push_pop_mutex_.AssertHeld();
+      if (heap_.empty()) {
+        heap_top_.store(v, std::memory_order_release);
+      } else {
+        assert(heap_top_.load() < v);
+      }
+      heap_.push_back(v);
     }
     void pop(bool locked = false) {
       if (!locked) {
         push_pop_mutex()->Lock();
       }
-      heap_.pop();
+      push_pop_mutex_.AssertHeld();
+      heap_.pop_front();
       while (!heap_.empty() && !erased_heap_.empty() &&
              // heap_.top() > erased_heap_.top() could happen if we have erased
              // a non-existent entry. Ideally the user should not do that but we
              // should be resilient against it.
-             heap_.top() >= erased_heap_.top()) {
-        if (heap_.top() == erased_heap_.top()) {
-          heap_.pop();
+             heap_.front() >= erased_heap_.top()) {
+        if (heap_.front() == erased_heap_.top()) {
+          heap_.pop_front();
         }
         uint64_t erased __attribute__((__unused__));
         erased = erased_heap_.top();
@@ -559,7 +563,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
       while (heap_.empty() && !erased_heap_.empty()) {
         erased_heap_.pop();
       }
-      heap_top_.store(!heap_.empty() ? heap_.top() : kMaxSequenceNumber,
+      heap_top_.store(!heap_.empty() ? heap_.front() : kMaxSequenceNumber,
                       std::memory_order_release);
       if (!locked) {
         push_pop_mutex()->Unlock();
@@ -568,13 +572,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     // Concurrrent calls needs external synchronization. It is safe to be called
     // concurrent to push and pop though.
     void erase(uint64_t seq) {
-      if (!heap_.empty()) {
+      if (!empty()) {
         auto top_seq = top();
         if (seq < top_seq) {
           // Already popped, ignore it.
         } else if (top_seq == seq) {
           pop();
-          assert(heap_.empty() || heap_.top() != seq);
+#ifndef NDEBUG
+          MutexLock ml(push_pop_mutex());
+          assert(heap_.empty() || heap_.front() != seq);
+#endif
         } else {  // top() > seq
           // Down the heap, remember to pop it later
           erased_heap_.push(seq);
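
To make the new push/pop/erase logic above easier to follow, here is a single-threaded sketch of the same lazy-erase technique, with the mutex, the atomic heap_top_, and the rest of the real PreparedHeap scaffolding omitted and an invented class name: pushes must arrive in increasing order, the minimum is the deque front, and erasing an element that is not currently at the front is deferred by recording it in a small min-heap of erased entries that is reconciled during later pops.

#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <queue>
#include <vector>

class LazyEraseDeque {
 public:
  static constexpr uint64_t kMax = UINT64_MAX;
  bool empty() const { return heap_.empty(); }
  uint64_t top() const { return heap_.empty() ? kMax : heap_.front(); }
  void push(uint64_t v) {
    assert(heap_.empty() || heap_.back() < v);  // in-order insertion only
    heap_.push_back(v);
  }
  void pop() {
    heap_.pop_front();
    // Drop front entries that were erased earlier while deeper in the deque.
    while (!heap_.empty() && !erased_.empty() &&
           heap_.front() >= erased_.top()) {
      if (heap_.front() == erased_.top()) {
        heap_.pop_front();
      }
      erased_.pop();
    }
    while (heap_.empty() && !erased_.empty()) {
      erased_.pop();
    }
  }
  void erase(uint64_t seq) {
    if (heap_.empty() || seq < heap_.front()) {
      return;  // already popped (or never pushed); ignore
    }
    if (seq == heap_.front()) {
      pop();
    } else {
      erased_.push(seq);  // not at the front yet: remember to drop it later
    }
  }

 private:
  std::deque<uint64_t> heap_;  // sorted because pushes arrive in order
  std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
      erased_;  // pending erases for elements not yet at the front
};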

utilities/transactions/write_unprepared_txn_db.cc:

@@ -225,6 +225,7 @@ Status WriteUnpreparedTxnDB::Initialize(
   // create 'real' transactions from recovered shell transactions
   auto rtxns = dbimpl->recovered_transactions();
+  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
   for (auto rtxn : rtxns) {
     auto recovered_trx = rtxn.second;
     assert(recovered_trx);
@@ -266,9 +267,7 @@ Status WriteUnpreparedTxnDB::Initialize(
     auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
     assert(batch_info.log_number_);
-    for (size_t i = 0; i < cnt; i++) {
-      AddPrepared(seq + i);
-    }
+    ordered_seq_cnt[seq] = cnt;
     assert(wupt->unprep_seqs_.count(seq) == 0);
     wupt->unprep_seqs_[seq] = cnt;
     KeySetBuilder keyset_handler(wupt,
@@ -288,6 +287,14 @@ Status WriteUnpreparedTxnDB::Initialize(
       break;
     }
   }
+  // AddPrepared must be called in order
+  for (auto seq_cnt: ordered_seq_cnt) {
+    auto seq = seq_cnt.first;
+    auto cnt = seq_cnt.second;
+    for (size_t i = 0; i < cnt; i++) {
+      AddPrepared(seq + i);
+    }
+  }
   SequenceNumber prev_max = max_evicted_seq_;
   SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
