diff --git a/Makefile b/Makefile index d434041da..d96dee283 100644 --- a/Makefile +++ b/Makefile @@ -393,9 +393,10 @@ PARALLEL_TEST = \ db_compaction_filter_test \ db_compaction_test \ db_sst_test \ - external_sst_file_test \ db_test \ db_universal_compaction_test \ + db_wal_test \ + external_sst_file_test \ fault_injection_test \ inlineskiplist_test \ manual_compaction_test \ @@ -599,7 +600,7 @@ gen_parallel_tests: # 107.816 PASS t/DBTest.EncodeDecompressedBlockSizeTest # slow_test_regexp = \ - ^t/run-table_test-HarnessTest.Randomized$$|^t/run-db_test-.*(?:FileCreationRandomFailure|EncodeDecompressedBlockSizeTest)$$ + ^t/run-table_test-HarnessTest.Randomized$$|^t/run-db_test-.*(?:FileCreationRandomFailure|EncodeDecompressedBlockSizeTest)$$|^.*RecoverFromCorruptedWALWithoutFlush$$ prioritize_long_running_tests = \ perl -pe 's,($(slow_test_regexp)),100 $$1,' \ | sort -k1,1gr \ diff --git a/db/db_impl.cc b/db/db_impl.cc index e3d1a4491..e26a33d45 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1414,7 +1414,6 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, bool stop_replay_by_wal_filter = false; bool stop_replay_for_corruption = false; bool flushed = false; - SequenceNumber recovered_sequence = 0; for (auto log_number : log_numbers) { // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually @@ -1493,13 +1492,13 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, WriteBatchInternal::SetContents(&batch, record); SequenceNumber sequence = WriteBatchInternal::Sequence(&batch); - // In point-in-time recovery mode, if sequence id of log files are - // consecutive, we continue recovery despite corruption. This could happen - // when we open and write to a corrupted DB, where sequence id will start - // from the last sequence id we recovered. 
if (db_options_.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { - if (sequence == recovered_sequence + 1) { + // In point-in-time recovery mode, if sequence id of log files are + // consecutive, we continue recovery despite corruption. This could + // happen when we open and write to a corrupted DB, where sequence id + // will start from the last sequence id we recovered. + if (sequence == *next_sequence) { stop_replay_for_corruption = false; } if (stop_replay_for_corruption) { @@ -1508,13 +1507,16 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, } } - recovered_sequence = sequence; bool no_prev_seq = true; - if (*next_sequence == kMaxSequenceNumber) { + if (!db_options_.allow_2pc) { *next_sequence = sequence; } else { - no_prev_seq = false; - WriteBatchInternal::SetSequence(&batch, *next_sequence); + if (*next_sequence == kMaxSequenceNumber) { + *next_sequence = sequence; + } else { + no_prev_seq = false; + WriteBatchInternal::SetSequence(&batch, *next_sequence); + } } #ifndef ROCKSDB_LITE @@ -1605,8 +1607,10 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // after replaying the file, this file may be a stale file. We ignore // sequence IDs from the file. Otherwise, if a newer stale log file that // has been deleted, the sequenceID may be wrong. 
- if (no_prev_seq && !has_valid_writes) { - *next_sequence = kMaxSequenceNumber; + if (db_options_.allow_2pc) { + if (no_prev_seq && !has_valid_writes) { + *next_sequence = kMaxSequenceNumber; + } } MaybeIgnoreError(&status); if (!status.ok()) { diff --git a/db/db_impl.h b/db/db_impl.h index 49c3a1584..b7dd36b95 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -619,7 +619,7 @@ class DBImpl : public DB { // REQUIRES: log_numbers are sorted in ascending order Status RecoverLogFiles(const std::vector& log_numbers, - SequenceNumber* max_sequence, bool read_only); + SequenceNumber* next_sequence, bool read_only); // The following two methods are used to flush a memtable to // storage. The first one is used at database RecoveryTime (when the diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 0d2a74e0a..1d5c046b8 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -9,6 +9,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "util/fault_injection_test_env.h" #include "util/options_helper.h" #include "util/sync_point.h" @@ -598,6 +599,41 @@ TEST_F(DBWALTest, SyncMultipleLogs) { ASSERT_OK(dbfull()->SyncWAL()); } +// Github issue 1339. Prior to the fix we read sequence id from the first log to +// a local variable, then keep increasing the variable as we replay logs, +// ignoring actual sequence id of the records. This is incorrect if some writes +// come with WAL disabled. +TEST_F(DBWALTest, PartOfWritesWithWALDisabled) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(env_)); + Options options = CurrentOptions(); + options.env = fault_env.get(); + options.disable_auto_compactions = true; + // TODO(yiwu): fix for 2PC. 
+ options.allow_2pc = false; + WriteOptions wal_on, wal_off; + wal_on.sync = true; + wal_on.disableWAL = false; + wal_off.disableWAL = true; + CreateAndReopenWithCF({"dummy"}, options); + ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1 + ASSERT_OK(Put(1, "dummy", "d2", wal_off)); + ASSERT_OK(Put(1, "dummy", "d3", wal_off)); + ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4 + ASSERT_OK(Flush(0)); + ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5 + ASSERT_EQ("v5", Get(0, "key")); + // Simulate a crash. + fault_env->SetFilesystemActive(false); + Close(); + fault_env->ResetState(); + ReopenWithColumnFamilies({"default", "dummy"}, options); + // Prior to the fix, we may incorrectly recover "v5" with sequence id = 3. + ASSERT_EQ("v5", Get(0, "key")); + // Destroy DB before destructing fault_env. + Destroy(options); +} + // // Test WAL recovery for the various modes available //