WritePrepared: disableWAL in commit without prepare (#5327)

Summary:
When committing a transaction without prepare, WritePrepared simply writes the batch to db and add the commit entry to CommitCache. When two_write_queues=true, following the rule of committing only from 2nd write queue, the first write, writes the batch and the only thing the 2nd write does is to write the commit entry to CommitCache. Currently the write batch in 2nd write is set to an empty LogData entry, while the write to the WAL could simply be entirely disabled.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5327

Differential Revision: D15424546

Pulled By: maysamyabandeh

fbshipit-source-id: 3d9ea3922d5196984c584d62a3ed57e1f7ca7b9f
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent 4d0c3b1f96
commit f5576c3317
  1. 2
      utilities/transactions/pessimistic_transaction_db.cc
  2. 5
      utilities/transactions/transaction_test.h
  3. 7
      utilities/transactions/write_prepared_transaction_test.cc
  4. 14
      utilities/transactions/write_prepared_txn_db.cc

@ -235,7 +235,7 @@ Status TransactionDB::Open(
if (txn_db_options.write_policy == WRITE_PREPARED && if (txn_db_options.write_policy == WRITE_PREPARED &&
db_options.unordered_write && !db_options.two_write_queues) { db_options.unordered_write && !db_options.two_write_queues) {
return Status::NotSupported( return Status::NotSupported(
"WRITE_UNPREPARED is incompatible with unordered_writes if " "WRITE_PREPARED is incompatible with unordered_writes if "
"two_write_queues is not enabled."); "two_write_queues is not enabled.");
} }

@ -214,6 +214,8 @@ class TransactionTestBase : public ::testing::Test {
std::atomic<size_t> exp_seq = {0}; std::atomic<size_t> exp_seq = {0};
std::atomic<size_t> commit_writes = {0}; std::atomic<size_t> commit_writes = {0};
std::atomic<size_t> expected_commits = {0}; std::atomic<size_t> expected_commits = {0};
// Without Prepare, the commit does not write to WAL
std::atomic<size_t> with_empty_commits = {0};
std::function<void(size_t, Status)> txn_t0_with_status = [&](size_t index, std::function<void(size_t, Status)> txn_t0_with_status = [&](size_t index,
Status exp_s) { Status exp_s) {
// Test DB's internal txn. It involves no prepare phase nor a commit marker. // Test DB's internal txn. It involves no prepare phase nor a commit marker.
@ -231,6 +233,7 @@ class TransactionTestBase : public ::testing::Test {
exp_seq++; exp_seq++;
} }
} }
with_empty_commits++;
}; };
std::function<void(size_t)> txn_t0 = [&](size_t index) { std::function<void(size_t)> txn_t0 = [&](size_t index) {
return txn_t0_with_status(index, Status::OK()); return txn_t0_with_status(index, Status::OK());
@ -257,6 +260,7 @@ class TransactionTestBase : public ::testing::Test {
} }
} }
ASSERT_OK(s); ASSERT_OK(s);
with_empty_commits++;
}; };
std::function<void(size_t)> txn_t2 = [&](size_t index) { std::function<void(size_t)> txn_t2 = [&](size_t index) {
// Commit without prepare. It should write to DB without a commit marker. // Commit without prepare. It should write to DB without a commit marker.
@ -282,6 +286,7 @@ class TransactionTestBase : public ::testing::Test {
} }
} }
delete txn; delete txn;
with_empty_commits++;
}; };
std::function<void(size_t)> txn_t3 = [&](size_t index) { std::function<void(size_t)> txn_t3 = [&](size_t index) {
// A full 2pc txn that also involves a commit marker. // A full 2pc txn that also involves a commit marker.

@ -1396,6 +1396,7 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) {
} }
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
auto seq = db_impl->TEST_GetLastVisibleSequence(); auto seq = db_impl->TEST_GetLastVisibleSequence();
with_empty_commits = 0;
exp_seq = seq; exp_seq = seq;
// This is increased before writing the batch for commit // This is increased before writing the batch for commit
commit_writes = 0; commit_writes = 0;
@ -1487,12 +1488,12 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) {
assert(db != nullptr); assert(db != nullptr);
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->TEST_GetLastVisibleSequence(); seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_LE(exp_seq, seq); ASSERT_LE(exp_seq, seq + with_empty_commits);
// Check if flush preserves the last sequence number // Check if flush preserves the last sequence number
db_impl->Flush(fopt); db_impl->Flush(fopt);
seq = db_impl->GetLatestSequenceNumber(); seq = db_impl->GetLatestSequenceNumber();
ASSERT_LE(exp_seq, seq); ASSERT_LE(exp_seq, seq + with_empty_commits);
// Check if recovery after flush preserves the last sequence number // Check if recovery after flush preserves the last sequence number
db_impl->FlushWAL(true); db_impl->FlushWAL(true);
@ -1500,7 +1501,7 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) {
assert(db != nullptr); assert(db != nullptr);
db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber(); seq = db_impl->GetLatestSequenceNumber();
ASSERT_LE(exp_seq, seq); ASSERT_LE(exp_seq, seq + with_empty_commits);
} }
} }

@ -151,11 +151,6 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
WriteOptions write_options(write_options_orig); WriteOptions write_options(write_options_orig);
bool sync = write_options.sync;
if (!do_one_write) {
// No need to sync on the first write
write_options.sync = false;
}
// In the absence of Prepare markers, use Noop as a batch separator // In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(batch); WriteBatchInternal::InsertNoop(batch);
const bool DISABLE_MEMTABLE = true; const bool DISABLE_MEMTABLE = true;
@ -192,8 +187,6 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
if (do_one_write) { if (do_one_write) {
return s; return s;
} // else do the 2nd write for commit } // else do the 2nd write for commit
// Set the original value of sync
write_options.sync = sync;
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"CommitBatchInternal 2nd write prepare_seq: %" PRIu64, "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
prepare_seq); prepare_seq);
@ -203,10 +196,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS); this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS);
WriteBatch empty_batch; WriteBatch empty_batch;
empty_batch.PutLogData(Slice()); write_options.disableWAL = true;
const size_t ONE_BATCH = 1; write_options.sync = false;
// In the absence of Prepare markers, use Noop as a batch separator const size_t ONE_BATCH = 1; // Just to inc the seq
WriteBatchInternal::InsertNoop(&empty_batch);
s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr, s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare); &update_commit_map_with_prepare);

Loading…
Cancel
Save