Fix 2PC Recovery SeqId Miscount

Summary:
Originally sequence ids were calculated, in recovery, based off of the first seqid found if the first log recovered. The working seqid was then incremented from that value based on every insertion that took place. This was faulty because of the potential for missing log files or inserts that skipped the WAL. The current recovery scheme grabs sequence from current recovering batch and increments using memtableinserter to track how many actual inserts take place. This works for 2PC batches as well scenarios where some logs are missing or inserts that skip the WAL.
Closes https://github.com/facebook/rocksdb/pull/1486

Differential Revision: D4156064

Pulled By: reidHoruff

fbshipit-source-id: a6da8d9
main
Reid Horuff 8 years ago committed by Facebook Github Bot
parent e095d0cbc7
commit 1ca5f6d132
  1. 21
      db/db_impl.cc
  2. 2
      db/db_wal_test.cc
  3. 59
      utilities/transactions/transaction_test.cc

@ -1516,18 +1516,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
} }
} }
bool no_prev_seq = true;
if (!immutable_db_options_.allow_2pc) {
*next_sequence = sequence;
} else {
if (*next_sequence == kMaxSequenceNumber) {
*next_sequence = sequence;
} else {
no_prev_seq = false;
WriteBatchInternal::SetSequence(&batch, *next_sequence);
}
}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (immutable_db_options_.wal_filter != nullptr) { if (immutable_db_options_.wal_filter != nullptr) {
WriteBatch new_batch; WriteBatch new_batch;
@ -1614,15 +1602,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
&batch, column_family_memtables_.get(), &flush_scheduler_, true, &batch, column_family_memtables_.get(), &flush_scheduler_, true,
log_number, this, false /* concurrent_memtable_writes */, log_number, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes); next_sequence, &has_valid_writes);
// If it is the first log file and there is no column family updated
// after replaying the file, this file may be a stale file. We ignore
// sequence IDs from the file. Otherwise, if a newer stale log file that
// has been deleted, the sequenceID may be wrong.
if (immutable_db_options_.allow_2pc) {
if (no_prev_seq && !has_valid_writes) {
*next_sequence = kMaxSequenceNumber;
}
}
MaybeIgnoreError(&status); MaybeIgnoreError(&status);
if (!status.ok()) { if (!status.ok()) {
// We are treating this as a failure while reading since we read valid // We are treating this as a failure while reading since we read valid

@ -645,8 +645,6 @@ TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = fault_env.get(); options.env = fault_env.get();
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
// TODO(yiwu): fix for 2PC.
options.allow_2pc = false;
WriteOptions wal_on, wal_off; WriteOptions wal_on, wal_off;
wal_on.sync = true; wal_on.sync = true;
wal_on.disableWAL = false; wal_on.disableWAL = false;

@ -1178,6 +1178,65 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
delete cfb; delete cfb;
} }
/*
* 1) use prepare to keep first log around to determine starting sequence
* during recovery.
* 2) insert many values, skipping wal, to increase seqid.
* 3) insert final value into wal
* 4) recover and see that final value was properly recovered - not
* hidden behind improperly summed sequence ids
*/
TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
WriteOptions wal_on, wal_off;
wal_on.sync = true;
wal_on.disableWAL = false;
wal_off.disableWAL = true;
ReadOptions read_options;
TransactionOptions txn_options;
std::string value;
Status s;
Transaction* txn1 = db->BeginTransaction(wal_on, txn_options);
s = txn1->SetName("1");
ASSERT_OK(s);
s = db->Put(wal_on, "first", "first");
ASSERT_OK(s);
s = txn1->Put(Slice("dummy"), Slice("dummy"));
ASSERT_OK(s);
s = txn1->Prepare();
ASSERT_OK(s);
s = db->Put(wal_off, "cats", "dogs1");
ASSERT_OK(s);
s = db->Put(wal_off, "cats", "dogs2");
ASSERT_OK(s);
s = db->Put(wal_off, "cats", "dogs3");
ASSERT_OK(s);
s = db_impl->TEST_FlushMemTable(true);
ASSERT_OK(s);
s = db->Put(wal_on, "cats", "dogs4");
ASSERT_OK(s);
// kill and reopen
env->SetFilesystemActive(false);
ReOpenNoDelete();
s = db->Get(read_options, "first", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "first");
s = db->Get(read_options, "cats", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "dogs4");
}
TEST_P(TransactionTest, FirstWriteTest) { TEST_P(TransactionTest, FirstWriteTest) {
WriteOptions write_options; WriteOptions write_options;

Loading…
Cancel
Save