WriteAtPrepare: Efficient read from snapshot list

Summary:
Divide the old snapshots to two lists: a few that fit into a cached array and the rest in a vector, which is expected to be empty in normal cases. The former is to optimize concurrent reads from snapshots without requiring locks. It is done by an array of std::atomic, from which std::memory_order_acquire reads are compiled to simple read instructions in most of the x86_64 architectures.
Closes https://github.com/facebook/rocksdb/pull/2758

Differential Revision: D5660504

Pulled By: maysamyabandeh

fbshipit-source-id: 524fcf9a8e7f90a92324536456912a99aaa6740c
main
Maysam Yabandeh 8 years ago committed by Facebook Github Bot
parent b01f426f56
commit fbfa3e7a43
  1. 9
      db/snapshot_impl.h
  2. 6
      include/rocksdb/utilities/transaction_db.h
  3. 135
      utilities/transactions/pessimistic_transaction_db.cc
  4. 51
      utilities/transactions/pessimistic_transaction_db.h
  5. 33
      utilities/transactions/transaction_test.cc

@ -74,9 +74,11 @@ class SnapshotList {
count_--;
}
// retrieve all snapshot numbers. They are sorted in ascending order.
// retrieve all snapshot numbers up until max_seq. They are sorted in
// ascending order.
std::vector<SequenceNumber> GetAll(
SequenceNumber* oldest_write_conflict_snapshot = nullptr) const {
SequenceNumber* oldest_write_conflict_snapshot = nullptr,
const SequenceNumber& max_seq = kMaxSequenceNumber) const {
std::vector<SequenceNumber> ret;
if (oldest_write_conflict_snapshot != nullptr) {
@ -88,6 +90,9 @@ class SnapshotList {
}
const SnapshotImpl* s = &list_;
while (s->next_ != &list_) {
if (s->next_->number_ > max_seq) {
break;
}
ret.push_back(s->next_->number_);
if (oldest_write_conflict_snapshot != nullptr &&

@ -25,8 +25,10 @@ class TransactionDBMutexFactory;
enum TxnDBWritePolicy {
WRITE_COMMITTED = 0, // write only the committed data
WRITE_PREPARED, // write data after the prepare phase of 2pc
WRITE_UNPREPARED // write data before the prepare phase of 2pc
// TODO(myabandeh): Not implemented yet
WRITE_PREPARED, // write data after the prepare phase of 2pc
// TODO(myabandeh): Not implemented yet
WRITE_UNPREPARED // write data before the prepare phase of 2pc
};
const uint32_t kInitialMaxDeadlocks = 5;

@ -5,8 +5,13 @@
#ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include "utilities/transactions/pessimistic_transaction_db.h"
#include <inttypes.h>
#include <string>
#include <unordered_set>
#include <vector>
@ -34,6 +39,7 @@ PessimisticTransactionDB::PessimisticTransactionDB(
: std::shared_ptr<TransactionDBMutexFactory>(
new TransactionDBMutexFactoryImpl())) {
assert(db_impl_ != nullptr);
info_log_ = db_impl_->GetDBOptions().info_log;
}
// Support initiliazing PessimisticTransactionDB from a stackable db
@ -581,16 +587,23 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
return false;
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) { prepared_txns_.push(seq); }
void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq);
WriteLock wl(&prepared_mutex_);
prepared_txns_.push(seq);
}
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
uint64_t commit_seq) {
ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
prepare_seq, commit_seq);
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
CommitEntry evicted;
bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted);
if (to_be_evicted) {
auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
if (prev_max < evicted.commit_seq) {
// TODO(myabandeh) inc max in larger steps to avoid frequent updates
auto max_evicted_seq = evicted.commit_seq;
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
@ -607,11 +620,59 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
delayed_prepared_empty_.store(false, std::memory_order_release);
}
}
// With each change to max_evicted_seq_ fetch the live snapshots behind it
SequenceNumber curr_seq;
std::vector<SequenceNumber> all_snapshots;
bool update_snapshots = false;
{
WriteLock wl(&snapshots_mutex_);
InstrumentedMutex(db_impl_->mutex());
snapshots_ = db_impl_->snapshots().GetAll();
// We use this to identify how fresh are the snapshot list. Since this
// is done atomically with obtaining the snapshot list, the one with
// the larger seq is more fresh. If the seq is equal the full snapshot
// list could be different since taking snapshots does not increase
// the db seq. However since we only care about snapshots before the
// new max, such recent snapshots would not be included the in the
// list anyway.
curr_seq = db_impl_->GetLatestSequenceNumber();
if (curr_seq > snapshots_version_) {
// This is to avoid updating the snapshots_ if it already updated
// with a more recent vesion by a concrrent thread
update_snapshots = true;
// We only care about snapshots lower then max
all_snapshots =
db_impl_->snapshots().GetAll(nullptr, max_evicted_seq);
}
}
if (update_snapshots) {
WriteLock wl(&snapshots_mutex_);
snapshots_version_ = curr_seq;
// We update the list concurrently with the readers.
// Both new and old lists are sorted and the new list is subset of the
// previous list plus some new items. Thus if a snapshot repeats in
// both new and old lists, it will appear upper in the new list. So if
// we simply insert the new snapshots in order, if an overwritten item
// is still valid in the new list is either written to the same place in
// the array or it is written in a higher palce before it gets
// overwritten by another item. This guarantess a reader that reads the
// list bottom-up will eventaully see a snapshot that repeats in the
// update, either before it gets overwritten by the writer or
// afterwards.
size_t i = 0;
auto it = all_snapshots.begin();
for (; it != all_snapshots.end() && i < SNAPSHOT_CACHE_SIZE;
it++, i++) {
snapshot_cache_[i].store(*it, std::memory_order_release);
}
snapshots_.clear();
for (; it != all_snapshots.end(); it++) {
// Insert them to a vector that is less efficient to access
// concurrently
snapshots_.push_back(*it);
}
// Update the size at the end. Otherwise a parallel reader might read
// items that are not set yet.
snapshots_total_.store(all_snapshots.size(), std::memory_order_release);
}
while (prev_max < max_evicted_seq &&
!max_evicted_seq_.compare_exchange_weak(
@ -621,17 +682,41 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
}
// After each eviction from commit cache, check if the commit entry should
// be kept around because it overlaps with a live snapshot.
{
// First check the snapshot cache that is efficient for concurrent access
auto cnt = snapshots_total_.load(std::memory_order_acquire);
// The list might get updated concurrently as we are reading from it. The
// reader should be able to read all the snapshots that are still valid
// after the update. Since the survived snapshots are written in a higher
// place before gets overwritten the reader that reads bottom-up will
// eventully see it.
const bool next_is_larger = true;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
for (; 0 < ip1; ip1--) {
snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq, !next_is_larger)) {
break;
}
}
if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
snapshot_seq < evicted.prep_seq)) {
// Then access the less efficient list of snapshots_
ReadLock rl(&snapshots_mutex_);
for (auto snapshot_seq : snapshots_) {
if (evicted.commit_seq <= snapshot_seq) {
// Items could have moved from the snapshots_ to snapshot_cache_ before
// accquiring the lock. To make sure that we do not miss a valid snapshot,
// read snapshot_cache_ again while holding the lock.
for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq, next_is_larger)) {
break;
}
// then snapshot_seq < evicted.commit_seq
if (evicted.prep_seq <= snapshot_seq) { // overlapping range
WriteLock wl(&old_commit_map_mutex_);
old_commit_map_empty_.store(false, std::memory_order_release);
old_commit_map_[evicted.prep_seq] = evicted.commit_seq;
}
for (auto snapshot_seq_2 : snapshots_) {
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq_2, next_is_larger)) {
break;
}
}
}
@ -691,7 +776,31 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(uint64_t indexed_seq,
}
// 10m entry, 80MB size
uint64_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE =
static_cast<uint64_t>(1 << 21);
size_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = static_cast<size_t>(1 << 21);
size_t WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE =
static_cast<size_t>(1 << 7);
bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
const uint64_t& prep_seq, const uint64_t& commit_seq,
const uint64_t& snapshot_seq, const bool next_is_larger = true) {
// If we do not store an entry in old_commit_map we assume it is committed in
// all snapshots. if commit_seq <= snapshot_seq, it is considered already in
// the snapshot so we need not to keep the entry around for this snapshot.
if (commit_seq <= snapshot_seq) {
// continue the search if the next snapshot could be smaller than commit_seq
return !next_is_larger;
}
// then snapshot_seq < commit_seq
if (prep_seq <= snapshot_seq) { // overlapping range
WriteLock wl(&old_commit_map_mutex_);
old_commit_map_empty_.store(false, std::memory_order_release);
old_commit_map_[prep_seq] = commit_seq;
// Storing once is enough. No need to check it for other snapshots.
return false;
}
// continue the search if the next snapshot could be larger than prep_seq
return next_is_larger;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -107,6 +107,8 @@ class PessimisticTransactionDB : public TransactionDB {
struct CommitEntry {
uint64_t prep_seq;
uint64_t commit_seq;
CommitEntry() : prep_seq(0), commit_seq(0) {}
CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {}
};
protected:
@ -114,8 +116,10 @@ class PessimisticTransactionDB : public TransactionDB {
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions());
DBImpl* db_impl_;
std::shared_ptr<Logger> info_log_;
private:
friend class WritePreparedTxnDB;
const TransactionDBOptions txn_db_options_;
TransactionLockMgr lock_mgr_;
@ -162,6 +166,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
explicit WritePreparedTxnDB(DB* db,
const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE),
COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) {
init(txn_db_options);
}
@ -169,6 +174,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
explicit WritePreparedTxnDB(StackableDB* db,
const TransactionDBOptions& txn_db_options)
: PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE),
COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) {
init(txn_db_options);
}
@ -192,6 +198,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
void init(const TransactionDBOptions& /* unused */) {
snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
commit_cache_ =
unique_ptr<CommitEntry[]>(new CommitEntry[COMMIT_CACHE_SIZE]{});
}
@ -199,8 +207,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// A heap with the amortized O(1) complexity for erase. It uses one extra heap
// to keep track of erased entries that are not yet on top of the main heap.
class PreparedHeap {
std::priority_queue<uint64_t> heap_;
std::priority_queue<uint64_t> erased_heap_;
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
heap_;
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
erased_heap_;
public:
bool empty() { return heap_.empty(); }
@ -216,7 +226,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
}
void erase(uint64_t seq) {
if (!heap_.empty()) {
if (heap_.top() < seq) {
if (seq < heap_.top()) {
// Already popped, ignore it.
} else if (heap_.top() == seq) {
heap_.pop();
@ -242,15 +252,42 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
bool ExchangeCommitEntry(uint64_t indexed_seq, CommitEntry& expected_entry,
CommitEntry new_entry);
// Add a new entry to old_commit_map_ if prep_seq <= snapshot_seq <
// commit_seq. Return false if checking the next snapshot(s) is not needed.
// This is the case if the entry already added to old_commit_map_ or none of
// the next snapshots could satisfy the condition. next_is_larger: the next
// snapshot will be a larger value
bool MaybeUpdateOldCommitMap(const uint64_t& prep_seq,
const uint64_t& commit_seq,
const uint64_t& snapshot_seq,
const bool next_is_larger);
// The list of live snapshots at the last time that max_evicted_seq_ advanced.
// The list sorted in ascending order. Thread-safety is provided with
// snapshots_mutex_.
// The list stored into two data structures: in snapshot_cache_ that is
// efficient for concurrent reads, and in snapshots_ if the data does not fit
// into snapshot_cache_. The total number of snapshots in the two lists
std::atomic<size_t> snapshots_total_ = {};
// The list sorted in ascending order. Thread-safety for writes is provided
// with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
// each entry. In x86_64 architecture such reads are compiled to simple read
// instructions. 128 entries
// TODO(myabandeh): avoid non-const static variables
static size_t DEF_SNAPSHOT_CACHE_SIZE;
const size_t SNAPSHOT_CACHE_SIZE;
unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
// 2nd list for storing snapshots. The list sorted in ascending order.
// Thread-safety is provided with snapshots_mutex_.
std::vector<SequenceNumber> snapshots_;
// The version of the latest list of snapshots. This can be used to avoid
// rewrittiing a list that is concurrently updated with a more recent version.
SequenceNumber snapshots_version_ = 0;
// A heap of prepared transactions. Thread-safety is provided with
// prepared_mutex_.
PreparedHeap prepared_txns_;
static uint64_t DEF_COMMIT_CACHE_SIZE;
const uint64_t COMMIT_CACHE_SIZE;
// TODO(myabandeh): avoid non-const static variables
static size_t DEF_COMMIT_CACHE_SIZE;
const size_t COMMIT_CACHE_SIZE;
// commit_cache_ must be initialized to zero to tell apart an empty index from
// a filled one. Thread-safety is provided with commit_cache_mutex_.
unique_ptr<CommitEntry[]> commit_cache_;

@ -5,6 +5,11 @@
#ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <algorithm>
#include <functional>
#include <string>
@ -4734,8 +4739,10 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
WriteOptions wo;
// Use small commit cache to trigger lots of eviction and fast advance of
// max_evicted_seq_
WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE =
8; // will take effect after ReOpen
// will take effect after ReOpen
WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8;
// Same for snapshot cache size
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5;
// Take some preliminary snapshots first. This is to stress the data structure
// that holds the old snapshots as it will be designed to be efficient when
@ -4755,6 +4762,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
uint64_t cur_txn = 0;
// Number of snapshots taken so far
int num_snapshots = 0;
std::vector<const Snapshot*> to_be_released;
// Number of gaps applied so far
int gap_cnt = 0;
// The final snapshot that we will inspect
@ -4800,14 +4808,17 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
if (num_snapshots < max_snapshots - 1) {
// Take preliminary snapshots
db->GetSnapshot();
auto tmp_snapshot = db->GetSnapshot();
to_be_released.push_back(tmp_snapshot);
num_snapshots++;
} else if (gap_cnt < max_gap) {
// Wait for some gap before taking the final snapshot
gap_cnt++;
} else if (!snapshot) {
// Take the final snapshot if it is not already taken
snapshot = db->GetSnapshot()->GetSequenceNumber();
auto tmp_snapshot = db->GetSnapshot();
to_be_released.push_back(tmp_snapshot);
snapshot = tmp_snapshot->GetSequenceNumber();
// We increase the db seq artificailly by a dummy Put. Check that this
// technique is effective and db seq is that same as ours.
ASSERT_EQ(snapshot, seq);
@ -4823,11 +4834,12 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
(committed_before.find(s) != committed_before.end());
bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
if (was_committed != is_in_snapshot) {
printf(
"max_snapshots %d max_gap %d seq %lu max %lu snapshot %lu "
"gap_cnt %d num_snapshots %d\n",
max_snapshots, max_gap, seq, wp_db->max_evicted_seq_.load(),
snapshot, gap_cnt, num_snapshots);
printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
" snapshot %" PRIu64
" gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
max_snapshots, max_gap, seq,
wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
num_snapshots, s);
}
ASSERT_EQ(was_committed, is_in_snapshot);
found_committed = found_committed || is_in_snapshot;
@ -4846,6 +4858,9 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
for (auto s : to_be_released) {
db->ReleaseSnapshot(s);
}
}
}
}

Loading…
Cancel
Save