Recover same sequence id from WAL (#1350)

Summary:
Revert the behavior where we don't read sequence id from WAL, but increase it as we replay the log. We still keep the behave for 2PC for now but will fix later.

This change fixes github issue 1339, where some writes come with WAL disabled and we may recover records with wrong sequence id.

Test Plan: Added unit test.

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D64275
main
yiwu-arbug 8 years ago committed by Siying Dong
parent 0a1bd9c509
commit 4bc8c88e6b
  1. 5
      Makefile
  2. 18
      db/db_impl.cc
  3. 2
      db/db_impl.h
  4. 36
      db/db_wal_test.cc

@ -393,9 +393,10 @@ PARALLEL_TEST = \
db_compaction_filter_test \ db_compaction_filter_test \
db_compaction_test \ db_compaction_test \
db_sst_test \ db_sst_test \
external_sst_file_test \
db_test \ db_test \
db_universal_compaction_test \ db_universal_compaction_test \
db_wal_test \
external_sst_file_test \
fault_injection_test \ fault_injection_test \
inlineskiplist_test \ inlineskiplist_test \
manual_compaction_test \ manual_compaction_test \
@ -599,7 +600,7 @@ gen_parallel_tests:
# 107.816 PASS t/DBTest.EncodeDecompressedBlockSizeTest # 107.816 PASS t/DBTest.EncodeDecompressedBlockSizeTest
# #
slow_test_regexp = \ slow_test_regexp = \
^t/run-table_test-HarnessTest.Randomized$$|^t/run-db_test-.*(?:FileCreationRandomFailure|EncodeDecompressedBlockSizeTest)$$ ^t/run-table_test-HarnessTest.Randomized$$|^t/run-db_test-.*(?:FileCreationRandomFailure|EncodeDecompressedBlockSizeTest)$$|^.*RecoverFromCorruptedWALWithoutFlush$$
prioritize_long_running_tests = \ prioritize_long_running_tests = \
perl -pe 's,($(slow_test_regexp)),100 $$1,' \ perl -pe 's,($(slow_test_regexp)),100 $$1,' \
| sort -k1,1gr \ | sort -k1,1gr \

@ -1414,7 +1414,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
bool stop_replay_by_wal_filter = false; bool stop_replay_by_wal_filter = false;
bool stop_replay_for_corruption = false; bool stop_replay_for_corruption = false;
bool flushed = false; bool flushed = false;
SequenceNumber recovered_sequence = 0;
for (auto log_number : log_numbers) { for (auto log_number : log_numbers) {
// The previous incarnation may not have written any MANIFEST // The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually // records after allocating this log number. So we manually
@ -1493,13 +1492,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
WriteBatchInternal::SetContents(&batch, record); WriteBatchInternal::SetContents(&batch, record);
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch); SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
// In point-in-time recovery mode, if sequence id of log files are
// consecutive, we continue recovery despite corruption. This could happen
// when we open and write to a corrupted DB, where sequence id will start
// from the last sequence id we recovered.
if (db_options_.wal_recovery_mode == if (db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) { WALRecoveryMode::kPointInTimeRecovery) {
if (sequence == recovered_sequence + 1) { // In point-in-time recovery mode, if sequence id of log files are
// consecutive, we continue recovery despite corruption. This could
// happen when we open and write to a corrupted DB, where sequence id
// will start from the last sequence id we recovered.
if (sequence == *next_sequence) {
stop_replay_for_corruption = false; stop_replay_for_corruption = false;
} }
if (stop_replay_for_corruption) { if (stop_replay_for_corruption) {
@ -1508,14 +1507,17 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
} }
} }
recovered_sequence = sequence;
bool no_prev_seq = true; bool no_prev_seq = true;
if (!db_options_.allow_2pc) {
*next_sequence = sequence;
} else {
if (*next_sequence == kMaxSequenceNumber) { if (*next_sequence == kMaxSequenceNumber) {
*next_sequence = sequence; *next_sequence = sequence;
} else { } else {
no_prev_seq = false; no_prev_seq = false;
WriteBatchInternal::SetSequence(&batch, *next_sequence); WriteBatchInternal::SetSequence(&batch, *next_sequence);
} }
}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (db_options_.wal_filter != nullptr) { if (db_options_.wal_filter != nullptr) {
@ -1605,9 +1607,11 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// after replaying the file, this file may be a stale file. We ignore // after replaying the file, this file may be a stale file. We ignore
// sequence IDs from the file. Otherwise, if a newer stale log file that // sequence IDs from the file. Otherwise, if a newer stale log file that
// has been deleted, the sequenceID may be wrong. // has been deleted, the sequenceID may be wrong.
if (db_options_.allow_2pc) {
if (no_prev_seq && !has_valid_writes) { if (no_prev_seq && !has_valid_writes) {
*next_sequence = kMaxSequenceNumber; *next_sequence = kMaxSequenceNumber;
} }
}
MaybeIgnoreError(&status); MaybeIgnoreError(&status);
if (!status.ok()) { if (!status.ok()) {
// We are treating this as a failure while reading since we read valid // We are treating this as a failure while reading since we read valid

@ -619,7 +619,7 @@ class DBImpl : public DB {
// REQUIRES: log_numbers are sorted in ascending order // REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers, Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* max_sequence, bool read_only); SequenceNumber* next_sequence, bool read_only);
// The following two methods are used to flush a memtable to // The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the // storage. The first one is used at database RecoveryTime (when the

@ -9,6 +9,7 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "util/fault_injection_test_env.h"
#include "util/options_helper.h" #include "util/options_helper.h"
#include "util/sync_point.h" #include "util/sync_point.h"
@ -598,6 +599,41 @@ TEST_F(DBWALTest, SyncMultipleLogs) {
ASSERT_OK(dbfull()->SyncWAL()); ASSERT_OK(dbfull()->SyncWAL());
} }
// Github issue 1339. Prior the fix we read sequence id from the first log to
// a local variable, then keep increase the variable as we replay logs,
// ignoring actual sequence id of the records. This is incorrect if some writes
// come with WAL disabled.
TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
Options options = CurrentOptions();
options.env = fault_env.get();
options.disable_auto_compactions = true;
// TODO(yiwu): fix for 2PC.
options.allow_2pc = false;
WriteOptions wal_on, wal_off;
wal_on.sync = true;
wal_on.disableWAL = false;
wal_off.disableWAL = true;
CreateAndReopenWithCF({"dummy"}, options);
ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1
ASSERT_OK(Put(1, "dummy", "d2", wal_off));
ASSERT_OK(Put(1, "dummy", "d3", wal_off));
ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4
ASSERT_OK(Flush(0));
ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
ASSERT_EQ("v5", Get(0, "key"));
// Simulate a crash.
fault_env->SetFilesystemActive(false);
Close();
fault_env->ResetState();
ReopenWithColumnFamilies({"default", "dummy"}, options);
// Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
ASSERT_EQ("v5", Get(0, "key"));
// Destroy DB before destruct fault_env.
Destroy(options);
}
// //
// Test WAL recovery for the various modes available // Test WAL recovery for the various modes available
// //

Loading…
Cancel
Save