WritePrepared Txn: enable TryAgain for duplicates at the end of the batch

Summary:
The WriteBatch::Iterate will try with a larger sequence number if the memtable reports a duplicate. This status is specified with TryAgain status. So far the assumption was that the last entry in the batch will never return TryAgain, which is correct when WAL is created via WritePrepared since it always appends a batch separator if a natural one does not exist. However when reading a WAL generated by WriteCommitted this batch separator might  not exist. Although WritePrepared is not supposed to be able to read the WAL generated by WriteCommitted we should avoid confusing scenarios in which the behavior becomes unpredictable. The path fixes that by allowing TryAgain even for the last entry of the write batch.
Closes https://github.com/facebook/rocksdb/pull/3747

Differential Revision: D7708391

Pulled By: maysamyabandeh

fbshipit-source-id: bfaddaa9b14a4cdaff6977f6f63c789a6ab1ee0d
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 17e04039dd
commit c3d1e36cce
  1. 11
      db/write_batch.cc
  2. 16
      utilities/transactions/transaction_test.h

@ -405,9 +405,11 @@ Status WriteBatch::Iterate(Handler* handler) const {
Status s;
char tag = 0;
uint32_t column_family = 0; // default
while ((s.ok() || UNLIKELY(s.IsTryAgain())) && !input.empty() &&
bool last_was_try_again = false;
while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain())) &&
handler->Continue()) {
if (LIKELY(!s.IsTryAgain())) {
last_was_try_again = false;
tag = 0;
column_family = 0; // default
@ -418,6 +420,13 @@ Status WriteBatch::Iterate(Handler* handler) const {
}
} else {
assert(s.IsTryAgain());
assert(!last_was_try_again); // to detect infinite loop bugs
if (UNLIKELY(last_was_try_again)) {
return Status::Corruption(
"two consecutive TryAgain in WriteBatch handler; this is either a "
"software bug or data corruption.");
}
last_was_try_again = true;
s = Status::OK();
}

@ -359,12 +359,8 @@ class TransactionTestBase : public ::testing::Test {
WriteBatch wb;
committed_kvs[k] = v;
wb.Put(k, v);
// TODO(myabandeh): remove this when we supprot duplicate keys in
// db->Write method
if (false) {
committed_kvs[k] = v2;
wb.Put(k, v2);
}
committed_kvs[k] = v2;
wb.Put(k, v2);
s = db->Write(write_options, &wb);
ASSERT_OK(s);
} break;
@ -376,12 +372,8 @@ class TransactionTestBase : public ::testing::Test {
committed_kvs[k] = v;
s = txn->Put(k, v);
ASSERT_OK(s);
// TODO(myabandeh): remove this when we supprot duplicate keys in
// db->Write method
if (false) {
committed_kvs[k] = v2;
s = txn->Put(k, v2);
}
committed_kvs[k] = v2;
s = txn->Put(k, v2);
ASSERT_OK(s);
if (type == 3) {
s = txn->Prepare();

Loading…
Cancel
Save