WritePrepared Txn: enable rollback in stress test

Summary:
Rollback was disabled in stress test since there was a concurrency issue in WritePrepared rollback algorithm. The issue is fixed by caching the column family handles in WritePrepared to skip getting them from the db when needed for rollback.

Tested by running transaction stress test under tsan.
Closes https://github.com/facebook/rocksdb/pull/3785

Differential Revision: D7793727

Pulled By: maysamyabandeh

fbshipit-source-id: d81ab6fda0e53186ca69944cfe0712ce4869451e
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 5bed8a0065
commit cfb86659bf
  1. 4
      util/transaction_test_util.cc
  2. 2
      utilities/transactions/pessimistic_transaction_db.h
  3. 9
      utilities/transactions/write_prepared_txn.cc
  4. 26
      utilities/transactions/write_prepared_txn_db.cc
  5. 18
      utilities/transactions/write_prepared_txn_db.h

@ -184,9 +184,7 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
s = txn->Prepare(); s = txn->Prepare();
assert(s.ok()); assert(s.ok());
} }
// TODO(myabandeh): enable this when WritePreparedTxnDB::RollbackPrepared if (!rand_->OneIn(20)) {
// is updated to handle in-the-middle rollbacks.
if (!rand_->OneIn(0)) {
s = txn->Commit(); s = txn->Commit();
} else { } else {
// Also try 5% rollback // Also try 5% rollback

@ -120,7 +120,7 @@ class PessimisticTransactionDB : public TransactionDB {
// an odd performance drop we observed when the added std::atomic member to // an odd performance drop we observed when the added std::atomic member to
// the base class even when the subclass do not read it in the fast path. // the base class even when the subclass do not read it in the fast path.
virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {} virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
virtual void UpdateCFComparatorMap(const ColumnFamilyHandle*) {} virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
protected: protected:
DBImpl* db_impl_; DBImpl* db_impl_;

@ -211,6 +211,8 @@ Status WritePreparedTxn::RollbackInternal() {
WriteBatch rollback_batch; WriteBatch rollback_batch;
assert(GetId() != kMaxSequenceNumber); assert(GetId() != kMaxSequenceNumber);
assert(GetId() > 0); assert(GetId() > 0);
auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
// In WritePrepared, the txn is is the same as prepare seq // In WritePrepared, the txn is is the same as prepare seq
auto last_visible_txn = GetId() - 1; auto last_visible_txn = GetId() - 1;
struct RollbackWriteBatchBuilder : public WriteBatch::Handler { struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
@ -219,6 +221,7 @@ Status WritePreparedTxn::RollbackInternal() {
WritePreparedTxnReadCallback callback; WritePreparedTxnReadCallback callback;
WriteBatch* rollback_batch_; WriteBatch* rollback_batch_;
std::map<uint32_t, const Comparator*>& comparators_; std::map<uint32_t, const Comparator*>& comparators_;
std::map<uint32_t, ColumnFamilyHandle*>& handles_;
using CFKeys = std::set<Slice, SetComparator>; using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_; std::map<uint32_t, CFKeys> keys_;
bool rollback_merge_operands_; bool rollback_merge_operands_;
@ -226,12 +229,14 @@ Status WritePreparedTxn::RollbackInternal() {
DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq, DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
WriteBatch* dst_batch, WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators, std::map<uint32_t, const Comparator*>& comparators,
std::map<uint32_t, ColumnFamilyHandle*>& handles,
bool rollback_merge_operands) bool rollback_merge_operands)
: db_(db), : db_(db),
callback(wpt_db, snap_seq, callback(wpt_db, snap_seq,
0), // 0 disables min_uncommitted optimization 0), // 0 disables min_uncommitted optimization
rollback_batch_(dst_batch), rollback_batch_(dst_batch),
comparators_(comparators), comparators_(comparators),
handles_(handles),
rollback_merge_operands_(rollback_merge_operands) {} rollback_merge_operands_(rollback_merge_operands) {}
Status Rollback(uint32_t cf, const Slice& key) { Status Rollback(uint32_t cf, const Slice& key) {
@ -249,7 +254,7 @@ Status WritePreparedTxn::RollbackInternal() {
PinnableSlice pinnable_val; PinnableSlice pinnable_val;
bool not_used; bool not_used;
auto cf_handle = db_->GetColumnFamilyHandle(cf); auto cf_handle = handles_[cf];
s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used, s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
&callback); &callback);
assert(s.ok() || s.IsNotFound()); assert(s.ok() || s.IsNotFound());
@ -299,7 +304,7 @@ Status WritePreparedTxn::RollbackInternal() {
protected: protected:
virtual bool WriteAfterCommit() const override { return false; } virtual bool WriteAfterCommit() const override { return false; }
} rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch, } rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch,
*wpt_db_->GetCFComparatorMap(), *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
wpt_db_->txn_db_options_.rollback_merge_operands); wpt_db_->txn_db_options_.rollback_merge_operands);
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler); auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
assert(s.ok()); assert(s.ok());

@ -230,25 +230,37 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options,
void WritePreparedTxnDB::UpdateCFComparatorMap( void WritePreparedTxnDB::UpdateCFComparatorMap(
const std::vector<ColumnFamilyHandle*>& handles) { const std::vector<ColumnFamilyHandle*>& handles) {
auto cf_map = new std::map<uint32_t, const Comparator*>(); auto cf_map = new std::map<uint32_t, const Comparator*>();
auto handle_map = new std::map<uint32_t, ColumnFamilyHandle*>();
for (auto h : handles) { for (auto h : handles) {
auto id = h->GetID(); auto id = h->GetID();
const Comparator* comparator = h->GetComparator(); const Comparator* comparator = h->GetComparator();
(*cf_map)[id] = comparator; (*cf_map)[id] = comparator;
if (id != 0) {
(*handle_map)[id] = h;
} else {
// The pointer to the default cf handle in the handles will be deleted.
// Use the pointer maintained by the db instead.
(*handle_map)[id] = DefaultColumnFamily();
}
} }
cf_map_.store(cf_map); cf_map_.reset(cf_map);
cf_map_gc_.reset(cf_map); handle_map_.reset(handle_map);
} }
void WritePreparedTxnDB::UpdateCFComparatorMap( void WritePreparedTxnDB::UpdateCFComparatorMap(ColumnFamilyHandle* h) {
const ColumnFamilyHandle* h) { auto old_cf_map_ptr = cf_map_.get();
auto old_cf_map_ptr = cf_map_.load();
assert(old_cf_map_ptr); assert(old_cf_map_ptr);
auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr); auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
auto old_handle_map_ptr = handle_map_.get();
assert(old_handle_map_ptr);
auto handle_map =
new std::map<uint32_t, ColumnFamilyHandle*>(*old_handle_map_ptr);
auto id = h->GetID(); auto id = h->GetID();
const Comparator* comparator = h->GetComparator(); const Comparator* comparator = h->GetComparator();
(*cf_map)[id] = comparator; (*cf_map)[id] = comparator;
cf_map_.store(cf_map); (*handle_map)[id] = h;
cf_map_gc_.reset(cf_map); cf_map_.reset(cf_map);
handle_map_.reset(handle_map);
} }

@ -347,12 +347,15 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Struct to hold ownership of snapshot and read callback for cleanup. // Struct to hold ownership of snapshot and read callback for cleanup.
struct IteratorState; struct IteratorState;
std::map<uint32_t, const Comparator*>* GetCFComparatorMap() { std::shared_ptr<std::map<uint32_t, const Comparator*>> GetCFComparatorMap() {
return cf_map_.load(); return cf_map_;
}
std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> GetCFHandleMap() {
return handle_map_;
} }
void UpdateCFComparatorMap( void UpdateCFComparatorMap(
const std::vector<ColumnFamilyHandle*>& handles) override; const std::vector<ColumnFamilyHandle*>& handles) override;
void UpdateCFComparatorMap(const ColumnFamilyHandle* handle) override; void UpdateCFComparatorMap(ColumnFamilyHandle* handle) override;
virtual const Snapshot* GetSnapshot() override; virtual const Snapshot* GetSnapshot() override;
@ -595,9 +598,12 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
mutable port::RWMutex commit_cache_mutex_; mutable port::RWMutex commit_cache_mutex_;
mutable port::RWMutex snapshots_mutex_; mutable port::RWMutex snapshots_mutex_;
// A cache of the cf comparators // A cache of the cf comparators
std::atomic<std::map<uint32_t, const Comparator*>*> cf_map_; // Thread safety: since it is a const it is safe to read it concurrently
// GC of the object above std::shared_ptr<std::map<uint32_t, const Comparator*>> cf_map_;
std::unique_ptr<std::map<uint32_t, const Comparator*>> cf_map_gc_; // A cache of the cf handles
// Thread safety: since the handle is read-only object it is a const it is
// safe to read it concurrently
std::shared_ptr<std::map<uint32_t, ColumnFamilyHandle*>> handle_map_;
}; };
class WritePreparedTxnReadCallback : public ReadCallback { class WritePreparedTxnReadCallback : public ReadCallback {

Loading…
Cancel
Save