WritePrepared Txn: Duplicate Keys, Txn Part

Summary:
This patch takes advantage of memtable being able to detect duplicate <key,seq> and returning TryAgain to handle duplicate keys in WritePrepared Txns. Through WriteBatchWithIndex's index it detects existence of at least a duplicate key in the write batch. If duplicate key was reported, it then pays the cost of counting the number of sub-patches by iterating over the write batch and pass it to DBImpl::Write. DB will make use of the provided batch_count to assign proper sequence numbers before sending them to the WAL. When later inserting the batch to the memtable, it increases the seq each time memtbale reports a duplicate (a sub-patch in our counting) and tries again.
Closes https://github.com/facebook/rocksdb/pull/3455

Differential Revision: D6873699

Pulled By: maysamyabandeh

fbshipit-source-id: db8487526c3a5dc1ddda0ea49f0f979b26ae648d
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 4b124fb9d3
commit 88d8b2a2f5
  1. 9
      db/db_impl.h
  2. 35
      db/db_impl_write.cc
  3. 83
      db/write_batch.cc
  4. 2
      db/write_batch_internal.h
  5. 12
      db/write_callback_test.cc
  6. 5
      db/write_thread.h
  7. 9
      include/rocksdb/utilities/write_batch_with_index.h
  8. 2
      utilities/transactions/pessimistic_transaction.cc
  9. 7
      utilities/transactions/pessimistic_transaction.h
  10. 52
      utilities/transactions/pessimistic_transaction_db.cc
  11. 12
      utilities/transactions/pessimistic_transaction_db.h
  12. 2
      utilities/transactions/transaction_base.h
  13. 260
      utilities/transactions/transaction_test.cc
  14. 191
      utilities/transactions/write_prepared_txn.cc
  15. 6
      utilities/transactions/write_prepared_txn.h
  16. 15
      utilities/transactions/write_prepared_txn_db.cc
  17. 40
      utilities/transactions/write_prepared_txn_db.h
  18. 82
      utilities/write_batch_with_index/write_batch_with_index.cc

@ -673,10 +673,14 @@ class DBImpl : public DB {
// batch which will be written to memtable later during the commit, and in
// WritePrepared it is guaranteed since it will be used only for WAL markers
// which will never be written to memtable.
// batch_cnt is expected to be non-zero in seq_per_batch mode and indicates
// the number of sub-patches. A sub-patch is a subset of the write batch that
// does not have duplicate keys.
Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false, uint64_t* seq_used = nullptr,
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);
Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
@ -685,10 +689,13 @@ class DBImpl : public DB {
bool disable_memtable = false,
uint64_t* seq_used = nullptr);
// batch_cnt is expected to be non-zero in seq_per_batch mode and indicates
// the number of sub-patches. A sub-patch is a subset of the write batch that
// does not have duplicate keys.
Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr,
uint64_t* seq_used = nullptr, size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);
uint64_t FindMinLogContainingOutstandingPrep();

@ -64,7 +64,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used,
size_t batch_cnt,
PreReleaseCallback* pre_release_callback) {
assert(!seq_per_batch_ || batch_cnt != 0);
if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!");
}
@ -76,6 +78,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
"pipelined_writes is not compatible with concurrent prepares");
}
if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
// TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
return Status::NotSupported(
"pipelined_writes is not compatible with seq_per_batch");
}
@ -93,7 +96,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (two_write_queues_ && disable_memtable) {
return WriteImplWALOnly(write_options, my_batch, callback, log_used,
log_ref, seq_used, pre_release_callback);
log_ref, seq_used, batch_cnt, pre_release_callback);
}
if (immutable_db_options_.enable_pipelined_write) {
@ -103,7 +106,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, pre_release_callback);
disable_memtable, batch_cnt, pre_release_callback);
if (!write_options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL);
@ -122,7 +125,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_);
true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt);
}
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
@ -214,7 +217,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
uint64_t total_byte_size = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
valid_batches++;
valid_batches += writer->batch_cnt;
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch);
parallel = parallel && !writer->batch->HasMerge();
@ -303,7 +306,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
writer->sequence = next_sequence;
if (seq_per_batch_) {
next_sequence++;
assert(writer->batch_cnt);
next_sequence += writer->batch_cnt;
} else if (writer->ShouldWriteToMemtable()) {
next_sequence += WriteBatchInternal::Count(writer->batch);
}
@ -323,7 +327,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/, seq_per_batch_);
this, true /*concurrent_memtable_writes*/, seq_per_batch_,
w.batch_cnt);
}
}
if (seq_used != nullptr) {
@ -515,12 +520,13 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
uint64_t* seq_used,
uint64_t* seq_used, size_t batch_cnt,
PreReleaseCallback* pre_release_callback) {
Status status;
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
true /* disable_memtable */, pre_release_callback);
true /* disable_memtable */, batch_cnt,
pre_release_callback);
RecordTick(stats_, WRITE_WITH_WAL);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
@ -576,7 +582,15 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
PERF_TIMER_GUARD(write_wal_time);
// LastAllocatedSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
size_t seq_inc = seq_per_batch_ ? write_group.size : 0 /*total_count*/;
size_t seq_inc = 0 /* total_count */;
if (seq_per_batch_) {
size_t total_batch_cnt = 0;
for (auto* writer : write_group) {
assert(writer->batch_cnt);
total_batch_cnt += writer->batch_cnt;
}
seq_inc = total_batch_cnt;
}
if (!write_options.disableWAL) {
status =
ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
@ -591,7 +605,8 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
}
writer->sequence = curr_seq;
if (seq_per_batch_) {
curr_seq++;
assert(writer->batch_cnt);
curr_seq += writer->batch_cnt;
}
// else seq advances only by memtable writes
}

@ -402,15 +402,22 @@ Status WriteBatch::Iterate(Handler* handler) const {
bool empty_batch = true;
int found = 0;
Status s;
while (s.ok() && !input.empty() && handler->Continue()) {
char tag = 0;
uint32_t column_family = 0; // default
while ((s.ok() || s.IsTryAgain()) && !input.empty() && handler->Continue()) {
if (!s.IsTryAgain()) {
tag = 0;
column_family = 0; // default
s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
&blob, &xid);
if (!s.ok()) {
return s;
}
} else {
assert(s.IsTryAgain());
s = Status::OK();
}
switch (tag) {
case kTypeColumnFamilyValue:
@ -418,47 +425,59 @@ Status WriteBatch::Iterate(Handler* handler) const {
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
s = handler->PutCF(column_family, key, value);
if (s.ok()) {
empty_batch = false;
found++;
}
break;
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
s = handler->DeleteCF(column_family, key);
if (s.ok()) {
empty_batch = false;
found++;
}
break;
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
s = handler->SingleDeleteCF(column_family, key);
if (s.ok()) {
empty_batch = false;
found++;
}
break;
case kTypeColumnFamilyRangeDeletion:
case kTypeRangeDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
s = handler->DeleteRangeCF(column_family, key, value);
if (s.ok()) {
empty_batch = false;
found++;
}
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
s = handler->MergeCF(column_family, key, value);
if (s.ok()) {
empty_batch = false;
found++;
}
break;
case kTypeColumnFamilyBlobIndex:
case kTypeBlobIndex:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
s = handler->PutBlobIndexCF(column_family, key, value);
if (s.ok()) {
found++;
}
break;
case kTypeLogData:
handler->LogData(blob);
@ -1084,12 +1103,23 @@ class MemTableInserter : public WriteBatch::Handler {
MaybeAdvanceSeq();
return seek_status;
}
Status ret_status;
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetImmutableMemTableOptions();
// inplace_update_support is inconsistent with snapshots, and therefore with
// any kind of transactions including the ones that use seq_per_batch
assert(!seq_per_batch_ || !moptions->inplace_update_support);
if (!moptions->inplace_update_support) {
mem->Add(sequence_, value_type, key, value, concurrent_memtable_writes_,
get_post_process_info(mem));
bool mem_res =
mem->Add(sequence_, value_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem));
if (!mem_res) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
const bool BATCH_BOUNDRY = true;
MaybeAdvanceSeq(BATCH_BOUNDRY);
}
} else if (moptions->inplace_callback == nullptr) {
assert(!concurrent_memtable_writes_);
mem->Update(sequence_, key, value);
@ -1125,11 +1155,15 @@ class MemTableInserter : public WriteBatch::Handler {
value, &merged_value);
if (status == UpdateStatus::UPDATED_INPLACE) {
// prev_value is updated in-place with final value.
mem->Add(sequence_, value_type, key, Slice(prev_buffer, prev_size));
bool mem_res __attribute__((__unused__)) = mem->Add(
sequence_, value_type, key, Slice(prev_buffer, prev_size));
assert(mem_res);
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
} else if (status == UpdateStatus::UPDATED) {
// merged_value contains the final value.
bool mem_res __attribute__((__unused__)) =
mem->Add(sequence_, value_type, key, Slice(merged_value));
assert(mem_res);
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
}
}
@ -1139,7 +1173,7 @@ class MemTableInserter : public WriteBatch::Handler {
// in memtable add/update.
MaybeAdvanceSeq();
CheckMemtableFull();
return Status::OK();
return ret_status;
}
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
@ -1149,12 +1183,20 @@ class MemTableInserter : public WriteBatch::Handler {
Status DeleteImpl(uint32_t column_family_id, const Slice& key,
const Slice& value, ValueType delete_type) {
Status ret_status;
MemTable* mem = cf_mems_->GetMemTable();
mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_,
get_post_process_info(mem));
bool mem_res =
mem->Add(sequence_, delete_type, key, value,
concurrent_memtable_writes_, get_post_process_info(mem));
if (!mem_res) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
const bool BATCH_BOUNDRY = true;
MaybeAdvanceSeq(BATCH_BOUNDRY);
}
MaybeAdvanceSeq();
CheckMemtableFull();
return Status::OK();
return ret_status;
}
virtual Status DeleteCF(uint32_t column_family_id,
@ -1246,6 +1288,7 @@ class MemTableInserter : public WriteBatch::Handler {
return seek_status;
}
Status ret_status;
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetImmutableMemTableOptions();
bool perform_merge = false;
@ -1301,18 +1344,30 @@ class MemTableInserter : public WriteBatch::Handler {
perform_merge = false;
} else {
// 3) Add value to memtable
mem->Add(sequence_, kTypeValue, key, new_value);
bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
if (!mem_res) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
const bool BATCH_BOUNDRY = true;
MaybeAdvanceSeq(BATCH_BOUNDRY);
}
}
}
if (!perform_merge) {
// Add merge operator to memtable
mem->Add(sequence_, kTypeMerge, key, value);
bool mem_res = mem->Add(sequence_, kTypeMerge, key, value);
if (!mem_res) {
assert(seq_per_batch_);
ret_status = Status::TryAgain("key+seq exists");
const bool BATCH_BOUNDRY = true;
MaybeAdvanceSeq(BATCH_BOUNDRY);
}
}
MaybeAdvanceSeq();
CheckMemtableFull();
return Status::OK();
return ret_status;
}
virtual Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
@ -1496,6 +1551,8 @@ Status WriteBatchInternal::InsertInto(
if (!w->status.ok()) {
return w->status;
}
assert(!seq_per_batch || w->batch_cnt != 0);
assert(!seq_per_batch || inserter.sequence() - w->sequence == w->batch_cnt);
}
return Status::OK();
}
@ -1504,7 +1561,7 @@ Status WriteBatchInternal::InsertInto(
WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch) {
bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt) {
assert(writer->ShouldWriteToMemtable());
MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, log_number, db,
@ -1513,6 +1570,8 @@ Status WriteBatchInternal::InsertInto(
SetSequence(writer->batch, sequence);
inserter.set_log_number_ref(writer->log_ref);
Status s = writer->batch->Iterate(&inserter);
assert(!seq_per_batch || batch_cnt != 0);
assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt);
if (concurrent_memtable_writes) {
inserter.PostProcess();
}

@ -182,7 +182,7 @@ class WriteBatchInternal {
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false,
bool seq_per_batch = false);
bool seq_per_batch = false, size_t batch_cnt = 0);
static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);

@ -291,7 +291,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
woptions.disableWAL = !enable_WAL;
woptions.sync = enable_WAL;
Status s;
if (seq_per_batch && two_queues) {
if (seq_per_batch) {
class PublishSeqCallback : public PreReleaseCallback {
public:
PublishSeqCallback(DBImpl* db_impl_in)
@ -302,9 +302,13 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
}
DBImpl* db_impl_;
} publish_seq_callback(db_impl);
s = db_impl->WriteImpl(woptions, &write_op.write_batch_,
&write_op.callback_, nullptr, 0, false,
nullptr, &publish_seq_callback);
// seq_per_batch requires a natural batch separator or Noop
WriteBatchInternal::InsertNoop(&write_op.write_batch_);
const size_t ONE_BATCH = 1;
s = db_impl->WriteImpl(
woptions, &write_op.write_batch_, &write_op.callback_,
nullptr, 0, false, nullptr, ONE_BATCH,
two_queues ? &publish_seq_callback : nullptr);
} else {
s = db_impl->WriteWithCallback(
woptions, &write_op.write_batch_, &write_op.callback_);

@ -118,6 +118,7 @@ class WriteThread {
bool no_slowdown;
bool disable_wal;
bool disable_memtable;
size_t batch_cnt; // if non-zero, number of sub-batches in the write batch
PreReleaseCallback* pre_release_callback;
uint64_t log_used; // log number that this batch was inserted into
uint64_t log_ref; // log number that memtable insert should reference
@ -128,6 +129,7 @@ class WriteThread {
SequenceNumber sequence; // the sequence number to use for the first key
Status status; // status of memtable inserter
Status callback_status; // status returned by callback->Callback()
std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
Writer* link_older; // read/write only before linking, or as leader
@ -139,6 +141,7 @@ class WriteThread {
no_slowdown(false),
disable_wal(false),
disable_memtable(false),
batch_cnt(0),
pre_release_callback(nullptr),
log_used(0),
log_ref(0),
@ -152,12 +155,14 @@ class WriteThread {
Writer(const WriteOptions& write_options, WriteBatch* _batch,
WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
size_t _batch_cnt = 0,
PreReleaseCallback* _pre_release_callback = nullptr)
: batch(_batch),
sync(write_options.sync),
no_slowdown(write_options.no_slowdown),
disable_wal(write_options.disableWAL),
disable_memtable(_disable_memtable),
batch_cnt(_batch_cnt),
pre_release_callback(_pre_release_callback),
log_used(0),
log_ref(_log_ref),

@ -228,13 +228,8 @@ class WriteBatchWithIndex : public WriteBatchBase {
private:
friend class WritePreparedTxn;
// TODO(myabandeh): this is hackish, non-efficient solution to enable the e2e
// unit tests. Replace it with a proper solution. Collapse the WriteBatch to
// remove the duplicate keys. The index will not be updated after this.
// Returns false if collapse was not necessary
bool Collapse();
void DisableDuplicateMergeKeys() { allow_dup_merge_ = false; }
bool allow_dup_merge_ = true;
// Returns true if there has been duplicate keys in the batch.
bool HasDuplicateKeys();
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,

@ -307,7 +307,7 @@ Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
return s;
}
Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch) {
Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
Status s = db_->Write(write_options_, batch);
return s;
}

@ -121,7 +121,10 @@ class PessimisticTransaction : public TransactionBaseImpl {
virtual Status CommitWithoutPrepareInternal() = 0;
virtual Status CommitBatchInternal(WriteBatch* batch) = 0;
// batch_cnt if non-zero is the number of sub-batches. A sub-batch is a batch
// with no duplicate keys. If zero, then the number of sub-batches is unknown.
virtual Status CommitBatchInternal(WriteBatch* batch,
size_t batch_cnt = 0) = 0;
virtual Status CommitInternal() = 0;
@ -204,7 +207,7 @@ class WriteCommittedTxn : public PessimisticTransaction {
Status CommitWithoutPrepareInternal() override;
Status CommitBatchInternal(WriteBatch* batch) override;
Status CommitBatchInternal(WriteBatch* batch, size_t batch_cnt) override;
Status CommitInternal() override;

@ -82,12 +82,29 @@ PessimisticTransactionDB::~PessimisticTransactionDB() {
}
}
Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) {
return Status::OK();
}
Status PessimisticTransactionDB::Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) {
for (auto cf_ptr : handles) {
AddColumnFamily(cf_ptr);
}
// Verify cf options
for (auto handle : handles) {
ColumnFamilyDescriptor cfd;
Status s = handle->GetDescriptor(&cfd);
if (!s.ok()) {
return s;
}
s = VerifyCFOptions(cfd.options);
if (!s.ok()) {
return s;
}
}
// Re-enable compaction for the column families that initially had
// compaction enabled.
std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
@ -158,6 +175,30 @@ TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
return validated;
}
void PessimisticTransactionDB::UpdateCFComparatorMap(
const std::vector<ColumnFamilyHandle*>& handles) {
auto cf_map = new std::map<uint32_t, const Comparator*>();
for (auto h : handles) {
auto id = h->GetID();
const Comparator* comparator = h->GetComparator();
(*cf_map)[id] = comparator;
}
cf_map_.store(cf_map);
cf_map_gc_.reset(cf_map);
}
void PessimisticTransactionDB::UpdateCFComparatorMap(
const ColumnFamilyHandle* h) {
auto old_cf_map_ptr = cf_map_.load();
assert(old_cf_map_ptr);
auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
auto id = h->GetID();
const Comparator* comparator = h->GetComparator();
(*cf_map)[id] = comparator;
cf_map_.store(cf_map);
cf_map_gc_.reset(cf_map);
}
Status TransactionDB::Open(const Options& options,
const TransactionDBOptions& txn_db_options,
const std::string& dbname, TransactionDB** dbptr) {
@ -245,6 +286,7 @@ Status TransactionDB::WrapDB(
txn_db = new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
}
txn_db->UpdateCFComparatorMap(handles);
*dbptr = txn_db;
Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
return s;
@ -270,6 +312,7 @@ Status TransactionDB::WrapStackableDB(
txn_db = new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
}
txn_db->UpdateCFComparatorMap(handles);
*dbptr = txn_db;
Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
return s;
@ -286,10 +329,15 @@ Status PessimisticTransactionDB::CreateColumnFamily(
const ColumnFamilyOptions& options, const std::string& column_family_name,
ColumnFamilyHandle** handle) {
InstrumentedMutexLock l(&column_family_mutex_);
Status s = VerifyCFOptions(options);
if (!s.ok()) {
return s;
}
Status s = db_->CreateColumnFamily(options, column_family_name, handle);
s = db_->CreateColumnFamily(options, column_family_name, handle);
if (s.ok()) {
lock_mgr_.AddColumnFamily((*handle)->GetID());
UpdateCFComparatorMap(*handle);
}
return s;
@ -439,8 +487,6 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts,
// concurrent transactions.
Transaction* txn = BeginInternalTransaction(opts);
txn->DisableIndexing();
// TODO(myabandeh): indexing being disabled we need another machanism to
// detect duplicattes in the input patch
auto txn_impl =
static_cast_with_check<PessimisticTransaction, Transaction>(txn);

@ -113,15 +113,27 @@ class PessimisticTransactionDB : public TransactionDB {
std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
void SetDeadlockInfoBufferSize(uint32_t target_size) override;
void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>& handles);
void UpdateCFComparatorMap(const ColumnFamilyHandle* handle);
std::map<uint32_t, const Comparator*>* GetCFComparatorMap() {
return cf_map_.load();
}
protected:
DBImpl* db_impl_;
std::shared_ptr<Logger> info_log_;
const TransactionDBOptions txn_db_options_;
// A cache of the cf comparators
std::atomic<std::map<uint32_t, const Comparator*>*> cf_map_;
// GC of the object above
std::unique_ptr<std::map<uint32_t, const Comparator*>> cf_map_gc_;
void ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions());
virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options);
private:
friend class WritePreparedTxnDB;
friend class WritePreparedTxnDBMock;

@ -232,7 +232,7 @@ class TransactionBaseImpl : public Transaction {
// iterates over the given batch and makes the appropriate inserts.
// used for rebuilding prepared transactions after recovery.
Status RebuildFromWriteBatch(WriteBatch* src_batch) override;
virtual Status RebuildFromWriteBatch(WriteBatch* src_batch) override;
WriteBatch* GetCommitTimeWriteBatch() override;

@ -70,6 +70,25 @@ TEST_P(TransactionTest, DoubleEmptyWrite) {
ASSERT_OK(db->Write(write_options, &batch));
ASSERT_OK(db->Write(write_options, &batch));
// Also test committing empty transactions in 2PC
TransactionOptions txn_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn0->SetName("xid"));
ASSERT_OK(txn0->Prepare());
ASSERT_OK(txn0->Commit());
delete txn0;
// Also test that it works during recovery
txn0 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn0->SetName("xid2"));
txn0->Put(Slice("foo0"), Slice("bar0a"));
ASSERT_OK(txn0->Prepare());
delete txn0;
ASSERT_OK(ReOpenNoDelete());
txn0 = db->GetTransactionByName("xid2");
ASSERT_OK(txn0->Commit());
delete txn0;
}
TEST_P(TransactionTest, SuccessTest) {
@ -2874,12 +2893,8 @@ TEST_P(TransactionTest, UntrackedWrites) {
// Untracked writes should succeed even though key was written after snapshot
s = txn->PutUntracked("untracked", "1");
ASSERT_OK(s);
if (txn_db_options.write_policy != WRITE_PREPARED) {
// WRITE_PREPARED does not currently support dup merge keys.
// TODO(myabandeh): remove this if-then when the support is added
s = txn->MergeUntracked("untracked", "2");
ASSERT_OK(s);
}
s = txn->DeleteUntracked("untracked");
ASSERT_OK(s);
@ -4250,11 +4265,6 @@ TEST_P(TransactionTest, SingleDeleteTest) {
}
TEST_P(TransactionTest, MergeTest) {
if (txn_db_options.write_policy == WRITE_PREPARED) {
// WRITE_PREPARED does not currently support dup merge keys.
// TODO(myabandeh): remove this if-then when the support is added
return;
}
WriteOptions write_options;
ReadOptions read_options;
string value;
@ -4978,9 +4988,119 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
}
// Test that the transactional db can handle duplicate keys in the write batch
TEST_P(TransactionTest, DuplicateKeyTest) {
TEST_P(TransactionTest, DuplicateKeys) {
ColumnFamilyOptions cf_options;
std::string cf_name = "two";
ColumnFamilyHandle* cf_handle = nullptr;
{
db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
WriteOptions write_options;
WriteBatch batch;
batch.Put(Slice("key"), Slice("value"));
batch.Put(Slice("key2"), Slice("value2"));
// duplicate the keys
batch.Put(Slice("key"), Slice("value3"));
// duplicate the 2nd key. It should not be counted duplicate since a
// sub-patch is cut after the last duplicate.
batch.Put(Slice("key2"), Slice("value4"));
// duplicate the keys but in a different cf. It should not be counted as
// duplicate keys
batch.Put(cf_handle, Slice("key"), Slice("value5"));
ASSERT_OK(db->Write(write_options, &batch));
ReadOptions ropt;
PinnableSlice pinnable_val;
auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("value3"));
s = db->Get(ropt, db->DefaultColumnFamily(), "key2", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("value4"));
s = db->Get(ropt, cf_handle, "key", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("value5"));
delete cf_handle;
}
// Test with non-bytewise comparator
{
// A comparator that uses only the first three bytes
class ThreeBytewiseComparator : public Comparator {
public:
ThreeBytewiseComparator() {}
virtual const char* Name() const override {
return "test.ThreeBytewiseComparator";
}
virtual int Compare(const Slice& a, const Slice& b) const override {
Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
return na.compare(nb);
}
virtual bool Equal(const Slice& a, const Slice& b) const override {
Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3);
Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3);
return na == nb;
}
// This methods below dont seem relevant to this test. Implement them if
// proven othersize.
void FindShortestSeparator(std::string* start,
const Slice& limit) const override {
const Comparator* bytewise_comp = BytewiseComparator();
bytewise_comp->FindShortestSeparator(start, limit);
}
void FindShortSuccessor(std::string* key) const override {
const Comparator* bytewise_comp = BytewiseComparator();
bytewise_comp->FindShortSuccessor(key);
}
};
ReOpen();
std::unique_ptr<const Comparator> comp_gc(new ThreeBytewiseComparator());
cf_options.comparator = comp_gc.get();
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
WriteOptions write_options;
WriteBatch batch;
batch.Put(cf_handle, Slice("key"), Slice("value"));
// The first three bytes are the same, do it must be counted as duplicate
batch.Put(cf_handle, Slice("key2"), Slice("value2"));
ASSERT_OK(db->Write(write_options, &batch));
// The value must be the most recent value for all the keys equal to "key",
// including "key2"
ReadOptions ropt;
PinnableSlice pinnable_val;
ASSERT_OK(db->Get(ropt, cf_handle, "key", &pinnable_val));
ASSERT_TRUE(pinnable_val == ("value2"));
// Test duplicate keys with rollback
TransactionOptions txn_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn0->SetName("xid"));
ASSERT_OK(txn0->Put(cf_handle, Slice("key3"), Slice("value3")));
ASSERT_OK(txn0->Merge(cf_handle, Slice("key4"), Slice("value4")));
ASSERT_OK(txn0->Rollback());
ASSERT_OK(db->Get(ropt, cf_handle, "key5", &pinnable_val));
ASSERT_TRUE(pinnable_val == ("value2"));
delete txn0;
delete cf_handle;
cf_options.comparator = BytewiseComparator();
}
for (bool do_prepare : {true, false}) {
for (bool do_rollback : {true, false}) {
for (bool with_commit_batch : {true, false}) {
if (with_commit_batch && !do_prepare) {
continue;
}
if (with_commit_batch && do_rollback) {
continue;
}
ReOpen();
db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
TransactionOptions txn_options;
txn_options.use_only_the_last_commit_time_batch_for_recovery = false;
WriteOptions write_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
auto s = txn0->SetName("xid");
@ -4993,16 +5113,21 @@ TEST_P(TransactionTest, DuplicateKeyTest) {
ASSERT_OK(s);
s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
ASSERT_OK(s);
// TODO(myabandeh): enable this after duplicatae merge keys are supported
// s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
// ASSERT_OK(s);
s = txn0->Put(Slice("foo2"), Slice("bar2b"));
// Repeat a key after the start of a sub-patch. This should not cause a
// duplicate in the most recent sub-patch and hence not creating a new
// sub-patch.
s = txn0->Put(Slice("foo0"), Slice("bar0c"));
ASSERT_OK(s);
s = txn0->Merge(Slice("foo2"), Slice("bar2b"));
ASSERT_OK(s);
// duplicate the keys but in a different cf. It should not be counted as
// duplicate.
s = txn0->Put(cf_handle, Slice("foo0"), Slice("bar0-cf1"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo3"), Slice("bar3"));
ASSERT_OK(s);
// TODO(myabandeh): enable this after duplicatae merge keys are supported
// s = txn0->Merge(Slice("foo3"), Slice("bar3"));
// ASSERT_OK(s);
s = txn0->Merge(Slice("foo3"), Slice("bar3"));
ASSERT_OK(s);
s = txn0->Put(Slice("foo4"), Slice("bar4"));
ASSERT_OK(s);
s = txn0->Delete(Slice("foo4"));
@ -5013,9 +5138,36 @@ TEST_P(TransactionTest, DuplicateKeyTest) {
s = txn0->Prepare();
ASSERT_OK(s);
}
if (do_rollback) {
// Test rolling back the batch with duplicates
s = txn0->Rollback();
ASSERT_OK(s);
} else {
if (with_commit_batch) {
assert(do_prepare);
auto cb = txn0->GetCommitTimeWriteBatch();
// duplicate a key in the original batch
// TODO(myabandeh): the behavior of GetCommitTimeWriteBatch
// conflicting with the prepared batch is currently undefined and
// gives different results in different implementations.
// s = cb->Put(Slice("foo0"), Slice("bar0d"));
// ASSERT_OK(s);
// add a new duplicate key
s = cb->Put(Slice("foo6"), Slice("bar6a"));
ASSERT_OK(s);
s = cb->Put(Slice("foo6"), Slice("bar6b"));
ASSERT_OK(s);
// add a duplicate key that is removed in the same batch
s = cb->Put(Slice("foo7"), Slice("bar7a"));
ASSERT_OK(s);
s = cb->Delete(Slice("foo7"));
ASSERT_OK(s);
}
s = txn0->Commit();
ASSERT_OK(s);
if (!do_prepare) {
}
if (!do_prepare && !do_rollback) {
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
pdb->UnregisterTransaction(txn0);
}
@ -5023,20 +5175,86 @@ TEST_P(TransactionTest, DuplicateKeyTest) {
ReadOptions ropt;
PinnableSlice pinnable_val;
if (do_rollback) {
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
} else {
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar0b"));
ASSERT_TRUE(pinnable_val == ("bar0c"));
s = db->Get(ropt, cf_handle, "foo0", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar0-cf1"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar1"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar2b"));
ASSERT_TRUE(pinnable_val == ("bar2a,bar2b"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar3"));
ASSERT_TRUE(pinnable_val == ("bar3,bar3"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
if (with_commit_batch) {
s = db->Get(ropt, db->DefaultColumnFamily(), "foo6", &pinnable_val);
ASSERT_OK(s);
ASSERT_TRUE(pinnable_val == ("bar6b"));
s = db->Get(ropt, db->DefaultColumnFamily(), "foo7", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
}
}
delete cf_handle;
} // with_commit_batch
} // do_rollback
} // do_prepare
{
// Also test with max_successive_merges > 0. max_successive_merges will not
// affect our algorithm for duplicate key insertion but we add the test to
// verify that.
cf_options.max_successive_merges = 2;
cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
ReOpen();
db->CreateColumnFamily(cf_options, cf_name, &cf_handle);
WriteOptions write_options;
// Ensure one value for the key
db->Put(write_options, cf_handle, Slice("key"), Slice("value"));
WriteBatch batch;
// Merge more than max_successive_merges times
batch.Merge(cf_handle, Slice("key"), Slice("1"));
batch.Merge(cf_handle, Slice("key"), Slice("2"));
batch.Merge(cf_handle, Slice("key"), Slice("3"));
batch.Merge(cf_handle, Slice("key"), Slice("4"));
ASSERT_OK(db->Write(write_options, &batch));
ReadOptions read_options;
string value;
ASSERT_OK(db->Get(read_options, cf_handle, "key", &value));
ASSERT_EQ(value, "value,1,2,3,4");
delete cf_handle;
}
{
// Test that the duplicate detection is not compromised after rolling back
// to a save point
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b")));
txn0->SetSavePoint();
txn0->RollbackToSavePoint();
ASSERT_OK(txn0->Commit());
}
}

@ -13,6 +13,7 @@
#include <inttypes.h>
#include <map>
#include <set>
#include "db/column_family.h"
#include "db/db_impl.h"
@ -30,9 +31,7 @@ WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options)
: PessimisticTransaction(txn_db, write_options, txn_options),
wpt_db_(txn_db) {
GetWriteBatch()->DisableDuplicateMergeKeys();
}
wpt_db_(txn_db) {}
Status WritePreparedTxn::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
@ -63,6 +62,69 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
return write_batch_.NewIteratorWithBase(column_family, db_iter);
}
namespace {
// A wrapper around Comparator to make it usable in std::set
struct SetComparator {
explicit SetComparator() : user_comparator_(BytewiseComparator()) {}
explicit SetComparator(const Comparator* user_comparator)
: user_comparator_(user_comparator ? user_comparator
: BytewiseComparator()) {}
bool operator()(const Slice& lhs, const Slice& rhs) const {
return user_comparator_->Compare(lhs, rhs) < 0;
}
private:
const Comparator* user_comparator_;
};
// Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler {
explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
: comparators_(comparators), batches_(1) {}
std::map<uint32_t, const Comparator*>& comparators_;
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
size_t batches_;
size_t BatchCount() { return batches_; }
void AddKey(uint32_t cf, const Slice& key) {
CFKeys& cf_keys = keys_[cf];
if (cf_keys.size() == 0) { // just inserted
auto cmp = comparators_[cf];
keys_[cf] = CFKeys(SetComparator(cmp));
}
auto it = cf_keys.insert(key);
if (it.second == false) { // second is false if a element already existed.
batches_++;
keys_.clear();
keys_[cf].insert(key);
}
}
Status MarkNoop(bool) override { return Status::OK(); }
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
Status MarkCommit(const Slice&) override { return Status::OK(); }
Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
AddKey(cf, key);
return Status::OK();
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
AddKey(cf, key);
return Status::OK();
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
AddKey(cf, key);
return Status::OK();
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
AddKey(cf, key);
return Status::OK();
}
Status MarkBeginPrepare() override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
bool WriteAfterCommit() const override { return false; }
};
} // namespace
Status WritePreparedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
@ -71,15 +133,18 @@ Status WritePreparedTxn::PrepareInternal() {
!WRITE_AFTER_COMMIT);
const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber;
bool collapsed = GetWriteBatch()->Collapse();
if (collapsed) {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Collapse overhead due to duplicate keys");
// For each duplicate key we account for a new sub-batch
prepare_batch_cnt_ = 1;
if (GetWriteBatch()->HasDuplicateKeys()) {
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter);
assert(s.ok());
prepare_batch_cnt_ = counter.BatchCount();
}
Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
!DISABLE_MEMTABLE, &seq_used);
!DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
auto prepare_seq = seq_used;
SetId(prepare_seq);
@ -93,18 +158,32 @@ Status WritePreparedTxn::PrepareInternal() {
}
Status WritePreparedTxn::CommitWithoutPrepareInternal() {
bool collapsed = GetWriteBatch()->Collapse();
if (collapsed) {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Collapse overhead due to duplicate keys");
// For each duplicate key we account for a new sub-batch
size_t batch_cnt = 1;
if (GetWriteBatch()->HasDuplicateKeys()) {
batch_cnt = 0; // this will trigger a batch cnt compute
}
return CommitBatchInternal(GetWriteBatch()->GetWriteBatch());
return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
}
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
size_t batch_cnt) {
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"CommitBatchInternal");
// TODO(myabandeh): handle the duplicate keys in the batch
if (batch->Count() == 0) {
// Otherwise our 1 seq per batch logic will break since there is no seq
// increased for this batch.
return Status::OK();
}
if (batch_cnt == 0) { // not provided, then compute it
// TODO(myabandeh): add an option to allow user skipping this cost
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
auto s = batch->Iterate(&counter);
assert(s.ok());
batch_cnt = counter.BatchCount();
}
assert(batch_cnt);
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
bool sync = write_options_.sync;
if (!do_one_write) {
@ -116,12 +195,12 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
const bool DISABLE_MEMTABLE = true;
const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber;
const bool INCLUDES_DATA = true;
const size_t ZERO_PREPARES = 0;
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, kMaxSequenceNumber, INCLUDES_DATA);
auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr,
no_log_ref, !DISABLE_MEMTABLE, &seq_used,
do_one_write ? &update_commit_map : nullptr);
wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
auto s = db_impl_->WriteImpl(
write_options_, batch, nullptr, nullptr, no_log_ref, !DISABLE_MEMTABLE,
&seq_used, batch_cnt, do_one_write ? &update_commit_map : nullptr);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
uint64_t& prepare_seq = seq_used;
SetId(prepare_seq);
@ -144,13 +223,14 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
// Commit the batch by writing an empty batch to the 2nd queue that will
// release the commit sequence number to readers.
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, prepare_seq);
wpt_db_, db_impl_, prepare_seq, batch_cnt);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
const size_t ONE_BATCH = 1;
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used,
no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
return s;
@ -175,17 +255,26 @@ Status WritePreparedTxn::CommitInternal() {
auto prepare_seq = GetId();
const bool includes_data = !empty && !for_recovery;
assert(prepare_batch_cnt_);
size_t commit_batch_cnt = 0;
if (includes_data) {
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
auto s = working_batch->Iterate(&counter);
assert(s.ok());
commit_batch_cnt = counter.BatchCount();
}
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, prepare_seq, includes_data);
wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
const bool disable_memtable = !includes_data;
uint64_t seq_used = kMaxSequenceNumber;
// Since the prepared batch is directly written to memtable, there is already
// a connection between the memtable and its WAL, so there is no need to
// redundantly reference the log that contains the prepared data.
const uint64_t zero_log_number = 0ull;
size_t batch_cnt = commit_batch_cnt ? commit_batch_cnt : 1;
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
zero_log_number, disable_memtable, &seq_used,
&update_commit_map);
batch_cnt, &update_commit_map);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
return s;
}
@ -203,15 +292,35 @@ Status WritePreparedTxn::RollbackInternal() {
ReadOptions roptions;
WritePreparedTxnReadCallback callback;
WriteBatch* rollback_batch_;
RollbackWriteBatchBuilder(DBImpl* db, WritePreparedTxnDB* wpt_db,
SequenceNumber snap_seq, WriteBatch* dst_batch)
: db_(db), callback(wpt_db, snap_seq), rollback_batch_(dst_batch) {}
std::map<uint32_t, const Comparator*>& comparators_;
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
RollbackWriteBatchBuilder(
DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators)
: db_(db),
callback(wpt_db, snap_seq),
rollback_batch_(dst_batch),
comparators_(comparators) {}
Status Rollback(uint32_t cf, const Slice& key) {
Status s;
CFKeys& cf_keys = keys_[cf];
if (cf_keys.size() == 0) { // just inserted
auto cmp = comparators_[cf];
keys_[cf] = CFKeys(SetComparator(cmp));
}
auto it = cf_keys.insert(key);
if (it.second ==
false) { // second is false if a element already existed.
return s;
}
PinnableSlice pinnable_val;
bool not_used;
auto cf_handle = db_->GetColumnFamilyHandle(cf);
auto s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
&callback);
assert(s.ok() || s.IsNotFound());
if (s.ok()) {
@ -254,7 +363,8 @@ Status WritePreparedTxn::RollbackInternal() {
protected:
virtual bool WriteAfterCommit() const override { return false; }
} rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch);
} rollback_handler(db_impl_, wpt_db_, last_visible_txn, &rollback_batch,
*wpt_db_->GetCFComparatorMap());
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
assert(s.ok());
if (!s.ok()) {
@ -266,11 +376,12 @@ Status WritePreparedTxn::RollbackInternal() {
const bool DISABLE_MEMTABLE = true;
const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber;
const bool INCLUDES_DATA = true;
const size_t ZERO_PREPARES = 0;
const size_t ONE_BATCH = 1;
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, kMaxSequenceNumber, INCLUDES_DATA);
wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, ONE_BATCH);
s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
no_log_ref, !DISABLE_MEMTABLE, &seq_used,
no_log_ref, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
do_one_write ? &update_commit_map : nullptr);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (!s.ok()) {
@ -289,13 +400,13 @@ Status WritePreparedTxn::RollbackInternal() {
// Commit the batch by writing an empty batch to the queue that will release
// the commit sequence number to readers.
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, prepare_seq);
wpt_db_, db_impl_, prepare_seq, ONE_BATCH);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used,
no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Mark the txn as rolled back
@ -334,6 +445,18 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
&snap_checker);
}
Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
prepare_batch_cnt_ = 1;
if (GetWriteBatch()->HasDuplicateKeys()) {
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter);
assert(s.ok());
prepare_batch_cnt_ = counter.BatchCount();
}
return ret;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -68,7 +68,7 @@ class WritePreparedTxn : public PessimisticTransaction {
Status CommitWithoutPrepareInternal() override;
Status CommitBatchInternal(WriteBatch* batch) override;
Status CommitBatchInternal(WriteBatch* batch, size_t batch_cnt) override;
// Since the data is already written to memtables at the Prepare phase, the
// commit entails writing only a commit marker in the WAL. The sequence number
@ -84,11 +84,15 @@ class WritePreparedTxn : public PessimisticTransaction {
const Slice& key,
SequenceNumber* tracked_at_seq) override;
virtual Status RebuildFromWriteBatch(WriteBatch* src_batch) override;
// No copying allowed
WritePreparedTxn(const WritePreparedTxn&);
void operator=(const WritePreparedTxn&);
WritePreparedTxnDB* wpt_db_;
// Number of sub-batches in prepare
size_t prepare_batch_cnt_ = 0;
};
} // namespace rocksdb

@ -11,7 +11,6 @@
#include "utilities/transactions/write_prepared_txn_db.h"
#include <inttypes.h>
#include <algorithm>
#include <string>
#include <unordered_set>
@ -49,6 +48,20 @@ Status WritePreparedTxnDB::Initialize(
return s;
}
Status WritePreparedTxnDB::VerifyCFOptions(
const ColumnFamilyOptions& cf_options) {
Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
if (!s.ok()) {
return s;
}
if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
return Status::InvalidArgument(
"memtable_factory->CanHandleDuplicatedKey() cannot be false with "
"WritePrpeared transactions");
}
return Status::OK();
}
Transaction* WritePreparedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {

@ -205,6 +205,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Struct to hold ownership of snapshot and read callback for cleanup.
struct IteratorState;
protected:
virtual Status VerifyCFOptions(
const ColumnFamilyOptions& cf_options) override;
private:
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
@ -401,30 +405,48 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
public:
// includes_data indicates that the commit also writes non-empty
// CommitTimeWriteBatch to memtable, which needs to be committed separately.
WritePreparedCommitEntryPreReleaseCallback(
WritePreparedTxnDB* db, DBImpl* db_impl,
SequenceNumber prep_seq = kMaxSequenceNumber, bool includes_data = false)
WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db,
DBImpl* db_impl,
SequenceNumber prep_seq,
size_t prep_batch_cnt,
size_t data_batch_cnt = 0)
: db_(db),
db_impl_(db_impl),
prep_seq_(prep_seq),
includes_data_(includes_data) {}
prep_batch_cnt_(prep_batch_cnt),
data_batch_cnt_(data_batch_cnt),
includes_data_(data_batch_cnt_ > 0) {
assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber)); // xor
assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
}
virtual Status Callback(SequenceNumber commit_seq) {
virtual Status Callback(SequenceNumber commit_seq) override {
assert(includes_data_ || prep_seq_ != kMaxSequenceNumber);
const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
? commit_seq
: commit_seq + data_batch_cnt_ - 1;
if (prep_seq_ != kMaxSequenceNumber) {
db_->AddCommitted(prep_seq_, commit_seq);
for (size_t i = 0; i < prep_batch_cnt_; i++) {
db_->AddCommitted(prep_seq_ + i, last_commit_seq);
}
} // else there was no prepare phase
if (includes_data_) {
assert(data_batch_cnt_);
// Commit the data that is accompnaied with the commit request
const bool PREPARE_SKIPPED = true;
db_->AddCommitted(commit_seq, commit_seq, PREPARE_SKIPPED);
for (size_t i = 0; i < data_batch_cnt_; i++) {
// For commit seq of each batch use the commit seq of the last batch.
// This would make debugging easier by having all the batches having
// the same sequence number.
db_->AddCommitted(commit_seq + i, last_commit_seq, PREPARE_SKIPPED);
}
}
if (db_impl_->immutable_db_options().two_write_queues) {
// Publish the sequence number. We can do that here assuming the callback
// is invoked only from one write queue, which would guarantee that the
// publish sequence numbers will be in order, i.e., once a seq is
// published all the seq prior to that are also publishable.
db_impl_->SetLastPublishedSequence(commit_seq);
db_impl_->SetLastPublishedSequence(last_commit_seq);
}
// else SequenceNumber that is updated as part of the write already does the
// publishing
@ -436,6 +458,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
DBImpl* db_impl_;
// kMaxSequenceNumber if there was no prepare phase
SequenceNumber prep_seq_;
size_t prep_batch_cnt_;
size_t data_batch_cnt_;
// Either because it is commit without prepare or it has a
// CommitTimeWriteBatch
bool includes_data_;

@ -582,67 +582,8 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
bool WriteBatchWithIndex::Collapse() {
if (rep->obsolete_offsets.size() == 0) {
return false;
}
std::sort(rep->obsolete_offsets.begin(), rep->obsolete_offsets.end());
WriteBatch& write_batch = rep->write_batch;
assert(write_batch.Count() != 0);
size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
Slice input(write_batch.Data());
input.remove_prefix(offset);
std::string collapsed_buf;
collapsed_buf.resize(WriteBatchInternal::kHeader);
size_t count = 0;
Status s;
// Loop through all entries in the write batch and add keep them if they are
// not obsolete by a newere entry.
while (s.ok() && !input.empty()) {
Slice key, value, blob, xid;
uint32_t column_family_id = 0; // default
char tag = 0;
// set offset of current entry for call to AddNewEntry()
size_t last_entry_offset = input.data() - write_batch.Data().data();
s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
&value, &blob, &xid);
if (!rep->obsolete_offsets.empty() &&
rep->obsolete_offsets.front() == last_entry_offset) {
rep->obsolete_offsets.erase(rep->obsolete_offsets.begin());
continue;
}
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
case kTypeColumnFamilyMerge:
case kTypeMerge:
count++;
break;
case kTypeLogData:
case kTypeBeginPrepareXID:
case kTypeBeginPersistedPrepareXID:
case kTypeEndPrepareXID:
case kTypeCommitXID:
case kTypeRollbackXID:
case kTypeNoop:
break;
default:
assert(0);
}
size_t entry_offset = input.data() - write_batch.Data().data();
const std::string& wb_data = write_batch.Data();
Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
entry_offset - last_entry_offset);
collapsed_buf.append(entry_ptr.data(), entry_ptr.size());
}
write_batch.rep_ = std::move(collapsed_buf);
WriteBatchInternal::SetCount(&write_batch, static_cast<int>(count));
return true;
bool WriteBatchWithIndex::HasDuplicateKeys() {
return rep->obsolete_offsets.size() > 0;
}
WBWIIterator* WriteBatchWithIndex::NewIterator() {
@ -758,15 +699,7 @@ Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
rep->SetLastEntryOffset();
auto s = rep->write_batch.Merge(column_family, key, value);
if (s.ok()) {
auto size_before = rep->obsolete_offsets.size();
rep->AddOrUpdateIndex(column_family, key);
auto size_after = rep->obsolete_offsets.size();
bool duplicate_key = size_before != size_after;
if (!allow_dup_merge_ && duplicate_key) {
assert(0);
return Status::NotSupported(
"Duplicate key with merge value is not supported yet");
}
}
return s;
}
@ -775,15 +708,7 @@ Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
rep->SetLastEntryOffset();
auto s = rep->write_batch.Merge(key, value);
if (s.ok()) {
auto size_before = rep->obsolete_offsets.size();
rep->AddOrUpdateIndex(key);
auto size_after = rep->obsolete_offsets.size();
bool duplicate_key = size_before != size_after;
if (!allow_dup_merge_ && duplicate_key) {
assert(0);
return Status::NotSupported(
"Duplicate key with merge value is not supported yet");
}
}
return s;
}
@ -958,8 +883,9 @@ Status WriteBatchWithIndex::RollbackToSavePoint() {
Status s = rep->write_batch.RollbackToSavePoint();
if (s.ok()) {
s = rep->ReBuildIndex();
// obsolete_offsets will be rebuilt by ReBuildIndex
rep->obsolete_offsets.clear();
s = rep->ReBuildIndex();
}
return s;

Loading…
Cancel
Save