// rocksdb/utilities/transactions/write_prepared_txn.cc

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/transactions/write_prepared_txn.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <map>
#include <set>
#include "db/column_family.h"
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn_db.h"
namespace rocksdb {
struct WriteOptions;
// Builds a write-prepared transaction: the PessimisticTransaction base is
// initialized with the given DB and options, and the WritePreparedTxnDB
// pointer is additionally kept in wpt_db_ for later use by this class.
WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
                                   const WriteOptions& write_options,
                                   const TransactionOptions& txn_options)
    : PessimisticTransaction(txn_db, write_options, txn_options),
      wpt_db_(txn_db) {
}
// Looks up `key` through write_batch_.GetFromBatchAndDB, handing it a
// WritePreparedTxnReadCallback built for the read snapshot.
//
// The callback's sequence number is the one of read_options.snapshot when a
// snapshot is set, and kMaxSequenceNumber otherwise.
Status WritePreparedTxn::Get(const ReadOptions& read_options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, PinnableSlice* pinnable_val) {
  SequenceNumber snap_seq = kMaxSequenceNumber;
  if (read_options.snapshot != nullptr) {
    snap_seq = read_options.snapshot->GetSequenceNumber();
  }

  WritePreparedTxnReadCallback read_callback(wpt_db_, snap_seq);
  return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
                                        pinnable_val, &read_callback);
}
// Returns an iterator that layers this transaction's write batch on top of an
// iterator obtained from the WritePreparedTxnDB.
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
  // The base iterator must come from wpt_db_ rather than from the root db.
  Iterator* const base_iter = wpt_db_->NewIterator(options);
  assert(base_iter);

  return write_batch_.NewIteratorWithBase(base_iter);
}
// Column-family variant: returns an iterator that layers this transaction's
// write batch on top of an iterator obtained from the WritePreparedTxnDB for
// `column_family`.
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
                                        ColumnFamilyHandle* column_family) {
  // The base iterator must come from wpt_db_ rather than from the root db.
  Iterator* const base_iter = wpt_db_->NewIterator(options, column_family);
  assert(base_iter);

  return write_batch_.NewIteratorWithBase(column_family, base_iter);
}
namespace {
// Adapts a Comparator to the "less than" functor interface required by
// std::set. When no comparator (or a null one) is supplied, keys are ordered
// with BytewiseComparator().
struct SetComparator {
  explicit SetComparator() : SetComparator(nullptr) {}

  explicit SetComparator(const Comparator* user_comparator)
      : user_comparator_(user_comparator != nullptr ? user_comparator
                                                    : BytewiseComparator()) {}

  // True iff lhs orders strictly before rhs under the wrapped comparator.
  bool operator()(const Slice& lhs, const Slice& rhs) const {
    const int order = user_comparator_->Compare(lhs, rhs);
    return order < 0;
  }

 private:
  const Comparator* user_comparator_;
};
// Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler {
explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
: comparators_(comparators), batches_(1) {}
std::map<uint32_t, const Comparator*>& comparators_;
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
size_t batches_;
size_t BatchCount() { return batches_; }
void AddKey(uint32_t cf, const Slice& key) {
CFKeys& cf_keys = keys_[cf];
if (cf_keys.size() == 0) { // just inserted
auto cmp = comparators_[cf];
keys_[cf] = CFKeys(SetComparator(cmp));
}
auto it = cf_keys.insert(key);
if (it.second == false) { // second is false if a element already existed.
batches_++;
keys_.clear();
keys_[cf].insert(key);
}
}
Status MarkNoop(bool) override { return Status::OK(); }
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
Status MarkCommit(const Slice&) override { return Status::OK(); }
Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
AddKey(cf, key);
return Status::OK();
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
AddKey(cf, key);
return Status::OK();
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
AddKey(cf, key);
return Status::OK();
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
AddKey(cf, key);
return Status::OK();
}
Status MarkBeginPrepare() override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
bool WriteAfterCommit() const override { return false; }
};
} // namespace
// Writes the transaction's batch as the "prepare" step.
//
// The batch is terminated with an EndPrepare marker and written through
// WriteImpl with the WAL forced on and the memtable enabled. The sequence
// number reported by the write is stored as the txn id, and on success it is
// registered with the WritePreparedTxnDB via AddPrepared.
Status WritePreparedTxn::PrepareInternal() {
  // Work on a copy so the transaction's own options are left untouched.
  WriteOptions prepare_write_options = write_options_;
  prepare_write_options.disableWAL = false;

  auto* const prepare_batch = GetWriteBatch()->GetWriteBatch();

  const bool kWriteAfterCommit = true;
  WriteBatchInternal::MarkEndPrepare(prepare_batch, name_, !kWriteAfterCommit);

  // Each duplicate key opens a new sub-batch; without duplicates there is
  // exactly one.
  if (!GetWriteBatch()->HasDuplicateKeys()) {
    prepare_batch_cnt_ = 1;
  } else {
    prepare_batch_cnt_ = 1;
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto count_status = prepare_batch->Iterate(&counter);
    assert(count_status.ok());
    prepare_batch_cnt_ = counter.BatchCount();
  }

  const bool kDisableMemtable = true;
  uint64_t seq_used = kMaxSequenceNumber;
  Status s = db_impl_->WriteImpl(prepare_write_options, prepare_batch,
                                 /*callback*/ nullptr, &log_number_,
                                 /*log ref*/ 0, !kDisableMemtable, &seq_used,
                                 prepare_batch_cnt_);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);

  const auto prepare_seq = seq_used;
  SetId(prepare_seq);
  // TODO(myabandeh): AddPrepared better to be called in the pre-release
  // callback otherwise there is a non-zero chance of max advancing prepare_seq
  // and readers assume the data as committed.
  if (s.ok()) {
    wpt_db_->AddPrepared(prepare_seq);
  }
  return s;
}
// Commits the transaction's batch directly, without a preceding prepare.
//
// When the batch has no duplicate keys it is a single sub-batch. Otherwise 0
// is passed as the count, which asks CommitBatchInternal to compute it.
Status WritePreparedTxn::CommitWithoutPrepareInternal() {
  const bool has_duplicates = GetWriteBatch()->HasDuplicateKeys();
  const size_t batch_cnt = has_duplicates ? 0 : 1;
  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
}
// Writes `batch` and commits it, without a separate prepare phase.
//
// batch_cnt is the number of sub-batches in `batch`; pass 0 to have it
// computed here. An empty batch is a no-op that returns OK.
//
// When two_write_queues is off, a single write is issued that both inserts
// the data and updates the commit map through a pre-release callback. When it
// is on, the data is written first (without sync) and then an empty batch is
// written with the memtable disabled, whose pre-release callback publishes
// the commit.
//
// Returns the status of the first failing write, or of the last write issued.
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
                                             size_t batch_cnt) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break since there is no seq
    // increased for this batch.
    return Status::OK();
  }
  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    assert(s.ok());
    batch_cnt = counter.BatchCount();
  }
  assert(batch_cnt);

  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  // The first of two writes does not need to sync. Use a local copy of the
  // options for it rather than temporarily flipping write_options_.sync, so
  // that a failed first write cannot leave the transaction's options with
  // sync disabled.
  WriteOptions first_write_options = write_options_;
  if (!do_one_write) {
    first_write_options.sync = false;
  }
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(batch);
  const bool DISABLE_MEMTABLE = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  const size_t ZERO_PREPARES = 0;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
  auto s = db_impl_->WriteImpl(
      first_write_options, batch, nullptr, nullptr, no_log_ref,
      !DISABLE_MEMTABLE, &seq_used, batch_cnt,
      do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t& prepare_seq = seq_used;
  SetId(prepare_seq);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    return s;
  }  // else do the 2nd write for commit

  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Note: we skip AddPrepared here. This could be further optimized by skip
  // erasing prepare_seq from prepared_txn_ in the following callback.
  // TODO(myabandeh): What if max advances the prepare_seq_ in the meanwhile and
  // readers assume the prepared data as committed? Almost zero probability.

  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, prepare_seq, batch_cnt);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  const size_t ONE_BATCH = 1;
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  // The second write uses the transaction's original options, including the
  // caller's sync setting.
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  return s;
}
// Commits a previously prepared transaction.
//
// The commit-time write batch gets a Commit marker appended and is written
// through WriteImpl with a pre-release callback that is given the prepare
// sequence number and the prepare/commit sub-batch counts. The memtable is
// only written when the commit-time batch carries data and is not flagged as
// "only for recovery".
Status WritePreparedTxn::CommitInternal() {
  // In WritePrepared the prepare sequence number is what was stored with
  // SetId() at prepare time, so log that value (GetId()) under the
  // "prepare_seq" label, the same one handed to the commit callback below.
  auto prepare_seq = GetId();
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitInternal prepare_seq: %" PRIu64, prepare_seq);
  // We take the commit-time batch and append the Commit marker.
  // The Memtable will ignore the Commit marker in non-recovery mode
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  WriteBatchInternal::MarkCommit(working_batch, name_);

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty && for_recovery) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable
    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
  }

  const bool includes_data = !empty && !for_recovery;
  assert(prepare_batch_cnt_);
  // Stays 0 when the commit-time batch contributes no data to the memtable.
  size_t commit_batch_cnt = 0;
  if (includes_data) {
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    auto s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
  const bool disable_memtable = !includes_data;
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is already
  // a connection between the memtable and its WAL, so there is no need to
  // redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  // The write itself always counts as at least one batch.
  size_t batch_cnt = commit_batch_cnt ? commit_batch_cnt : 1;
  auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                               zero_log_number, disable_memtable, &seq_used,
                               batch_cnt, &update_commit_map);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  return s;
}
// Rolls back a prepared transaction.
//
// For every distinct key touched by the transaction's write batch, the value
// readable just before the transaction (sequence GetId() - 1) is looked up and
// written back; keys with no such value get a Delete instead. The resulting
// rollback batch is terminated with a Rollback marker and written to the DB.
// With two write queues, a second, empty write with the memtable disabled
// follows, carrying the pre-release callback. After the final successful
// write, the txn is reported to the WritePreparedTxnDB via RollbackPrepared.
Status WritePreparedTxn::RollbackInternal() {
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());
  WriteBatch rollback_batch;
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  // In WritePrepared, the txn id is the same as the prepare seq
  auto last_visible_txn = GetId() - 1;

  // Walks the original batch and, per distinct key, appends the entry that
  // restores the pre-transaction state to the destination batch.
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
    DBImpl* db_;
    ReadOptions roptions;
    WritePreparedTxnReadCallback callback;
    WriteBatch* rollback_batch_;
    std::map<uint32_t, const Comparator*>& comparators_;
    using CFKeys = std::set<Slice, SetComparator>;
    // Keys already handled, per column family.
    std::map<uint32_t, CFKeys> keys_;

    RollbackWriteBatchBuilder(
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
        WriteBatch* dst_batch,
        std::map<uint32_t, const Comparator*>& comparators)
        : db_(db),
          callback(wpt_db, snap_seq),
          rollback_batch_(dst_batch),
          comparators_(comparators) {}

    // Adds the undo entry for `key` of column family `cf`, unless that key
    // has been processed before.
    Status Rollback(uint32_t cf, const Slice& key) {
      CFKeys& seen_keys = keys_[cf];
      if (seen_keys.empty()) {  // just inserted
        auto cmp = comparators_[cf];
        seen_keys = CFKeys(SetComparator(cmp));
      }
      const bool first_occurrence = seen_keys.insert(key).second;
      if (!first_occurrence) {
        // The key was already rolled back; nothing more to do.
        return Status();
      }

      PinnableSlice prior_value;
      bool not_used;
      auto cf_handle = db_->GetColumnFamilyHandle(cf);
      Status s = db_->GetImpl(roptions, cf_handle, key, &prior_value,
                              &not_used, &callback);
      assert(s.ok() || s.IsNotFound());
      if (s.ok()) {
        s = rollback_batch_->Put(cf_handle, key, prior_value);
        assert(s.ok());
        return s;
      }
      if (s.IsNotFound()) {
        // There has been no readable value before txn. By adding a delete we
        // make sure that there will be none afterwards either.
        s = rollback_batch_->Delete(cf_handle, key);
        assert(s.ok());
        return s;
      }
      // Unexpected status. Return it to the user.
      return s;
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
      return Rollback(cf, key);
    }
    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }
    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return Rollback(cf, key);
    }
    Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
      return Rollback(cf, key);
    }

    Status MarkNoop(bool) override { return Status::OK(); }
    Status MarkBeginPrepare() override { return Status::OK(); }
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
    Status MarkCommit(const Slice&) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }

   protected:
    virtual bool WriteAfterCommit() const override { return false; }
  };

  RollbackWriteBatchBuilder rollback_handler(db_impl_, wpt_db_,
                                             last_visible_txn, &rollback_batch,
                                             *wpt_db_->GetCFComparatorMap());
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }

  // The Rollback marker will be used as a batch separator
  WriteBatchInternal::MarkRollback(&rollback_batch, name_);

  const bool single_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool kDisableMemtable = true;
  const uint64_t kNoLogRef = 0;
  const size_t kZeroPrepares = 0;
  const size_t kOneBatch = 1;
  uint64_t seq_used = kMaxSequenceNumber;

  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, kMaxSequenceNumber, kZeroPrepares, kOneBatch);
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
                          kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch,
                          single_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (single_write) {
    // Mark the txn as rolled back; seq_used is the rollback sequence here.
    wpt_db_->RollbackPrepared(GetId(), seq_used);
    return s;
  }

  // Otherwise do the 2nd write for commit. At this point seq_used holds the
  // sequence of the rollback batch just written, which plays the role of the
  // prepare seq for the callback below.
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    seq_used);
  // Commit the batch by writing an empty batch to the queue that will release
  // the commit sequence number to readers.
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      wpt_db_, db_impl_, seq_used, kOneBatch);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          kNoLogRef, kDisableMemtable, &seq_used, kOneBatch,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (s.ok()) {
    // Mark the txn as rolled back; seq_used is now the rollback sequence.
    wpt_db_->RollbackPrepared(GetId(), seq_used);
  }
  return s;
}
// Checks `key` for conflicts against the transaction's snapshot.
//
// *tracked_at_seq is read as the sequence at which the key was last tracked.
// If it is not newer than the snapshot, OK is returned immediately. Otherwise
// it is updated to the snapshot's sequence number and the result of
// TransactionUtil::CheckKeyForConflicts is returned. A null column_family
// selects the DB's default column family.
Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                          const Slice& key,
                                          SequenceNumber* tracked_at_seq) {
  assert(snapshot_);

  const SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked so there is no need to apply the IsInSnapshot to this comparison
  // here as tracked_at_seq is not a prepare seq.
  const bool already_validated = *tracked_at_seq <= snap_seq;
  if (already_validated) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }
  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh = column_family;
  if (!cfh) {
    cfh = db_impl_->DefaultColumnFamily();
  }

  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq);
  return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
                                               snap_seq, false /* cache_only */,
                                               &snap_checker);
}
// Rebuilds the transaction from `src_batch` via the PessimisticTransaction
// implementation, then recomputes prepare_batch_cnt_ for the rebuilt batch:
// 1 when it has no duplicate keys, otherwise the number of sub-batches
// reported by SubBatchCounter.
//
// Returns the status of the base-class rebuild.
Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
  auto rebuild_status = PessimisticTransaction::RebuildFromWriteBatch(src_batch);

  prepare_batch_cnt_ = 1;
  if (!GetWriteBatch()->HasDuplicateKeys()) {
    return rebuild_status;
  }

  SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
  auto count_status = GetWriteBatch()->GetWriteBatch()->Iterate(&counter);
  assert(count_status.ok());
  prepare_batch_cnt_ = counter.BatchCount();
  return rebuild_status;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE