Double Crash in kPointInTimeRecovery with TransactionDB (#6313)

Summary:
In WritePrepared there could be gap in sequence numbers. This breaks the trick we use in kPointInTimeRecovery which assume the first seq in the log right after the corrupted log is one larger than the last seq we read from the logs. To let this trick keep working, we add a dummy entry with the expected sequence to the first log right after recovery.
Also in WriteCommitted, if the log right after the corrupted log is empty, since it has no sequence number to let the sequential trick work, it is assumed as unexpected behavior. This is however expected to happen if we close the db after recovering from a corruption and before writing anything new to it. To remedy that, we apply the same technique by writing a dummy entry to the log that is created after the corrupted log.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6313

Differential Revision: D19458291

Pulled By: maysamyabandeh

fbshipit-source-id: 09bc49e574690085df45b034ca863ff315937e2d
main
Maysam Yabandeh 5 years ago committed by Facebook Github Bot
parent a07a9dc904
commit 2f973ca96e
  1. 1
      HISTORY.md
  2. 9
      db/db_impl/db_impl.h
  3. 42
      db/db_impl/db_impl_open.cc
  4. 2
      db/db_impl/db_impl_secondary.cc
  5. 3
      db/db_impl/db_impl_secondary.h
  6. 3
      utilities/transactions/pessimistic_transaction_db.h
  7. 86
      utilities/transactions/transaction_test.cc
  8. 2
      utilities/transactions/transaction_test.h

@ -2,6 +2,7 @@
## Unreleased
### Bug Fixes
* Fix incorrect results while block-based table uses kHashSearch, together with Prev()/SeekForPrev().
* Fix a bug that prevents opening a DB after two consecutive crash with TransactionDB, where the first crash recovers from a corrupted WAL with kPointInTimeRecovery but the second cannot.
## 6.7.0 (01/21/2020)
### Public API Change

@ -1110,10 +1110,13 @@ class DBImpl : public DB {
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
// recovered_seq is set to less than kMaxSequenceNumber if the log's tail is
// skipped.
virtual Status Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);
bool error_if_data_exists_in_logs = false,
uint64_t* recovered_seq = nullptr);
virtual bool OwnTablesAndLogs() const { return true; }
@ -1355,8 +1358,10 @@ class DBImpl : public DB {
JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
// REQUIRES: log_numbers are sorted in ascending order
// corrupted_log_found is set to true if we recover from a corrupted log file.
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_log_found);
// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the

@ -345,7 +345,8 @@ Status Directories::SetDirectories(Env* env, const std::string& dbname,
Status DBImpl::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
bool error_if_log_file_exist, bool error_if_data_exists_in_logs) {
bool error_if_log_file_exist, bool error_if_data_exists_in_logs,
uint64_t* recovered_seq) {
mutex_.AssertHeld();
bool is_new_db = false;
@ -541,7 +542,12 @@ Status DBImpl::Recover(
if (!logs.empty()) {
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
s = RecoverLogFiles(logs, &next_sequence, read_only);
bool corrupted_log_found = false;
s = RecoverLogFiles(logs, &next_sequence, read_only,
&corrupted_log_found);
if (corrupted_log_found && recovered_seq != nullptr) {
*recovered_seq = next_sequence;
}
if (!s.ok()) {
// Clear memtables if recovery failed
for (auto cfd : *versions_->GetColumnFamilySet()) {
@ -671,7 +677,8 @@ Status DBImpl::InitPersistStatsColumnFamily() {
// REQUIRES: log_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only) {
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_log_found) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
@ -973,6 +980,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
status = Status::OK();
stop_replay_for_corruption = true;
corrupted_log_number = log_number;
if (corrupted_log_found != nullptr) {
*corrupted_log_found = true;
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Point in time recovered to log #%" PRIu64
" seq #%" PRIu64,
@ -1001,6 +1011,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// the corrupted log number, which means CF contains data beyond the point of
// corruption. This could during PIT recovery when the WAL is corrupted and
// some (but not all) CFs are flushed
// Exclude the PIT case where no log is dropped after the corruption point.
// This is to cover the case for empty logs after corrupted log, in which we
// don't reset stop_replay_for_corruption.
if (stop_replay_for_corruption == true &&
(immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery ||
@ -1396,7 +1409,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
impl->mutex_.Lock();
// Handles create_if_missing, error_if_exists
s = impl->Recover(column_families);
uint64_t recovered_seq(kMaxSequenceNumber);
s = impl->Recover(column_families, false, false, false, &recovered_seq);
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
log::Writer* new_log = nullptr;
@ -1454,9 +1468,29 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
if (impl->two_write_queues_) {
impl->log_write_mutex_.Unlock();
}
impl->DeleteObsoleteFiles();
s = impl->directories_.GetDbDir()->Fsync();
}
if (s.ok()) {
// In WritePrepared there could be gap in sequence numbers. This breaks
// the trick we use in kPointInTimeRecovery which assumes the first seq in
// the log right after the corrupted log is one larger than the last seq
// we read from the logs. To let this trick keep working, we add a dummy
// entry with the expected sequence to the first log right after recovery.
// In non-WritePrepared case also the new log after recovery could be
// empty, and thus missing the consecutive seq hint to distinguish
// middle-log corruption to corrupted-log-remained-after-recovery. This
// case also will be addressed by a dummy write.
if (recovered_seq != kMaxSequenceNumber) {
WriteBatch empty_batch;
WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
WriteOptions write_options;
uint64_t log_used, log_size;
log::Writer* log_writer = impl->logs_.back().writer;
s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
}
}
}
if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
// try to read format version but no need to fail Open() even if it fails

@ -29,7 +29,7 @@ DBImplSecondary::~DBImplSecondary() {}
Status DBImplSecondary::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_log_file_exist*/,
bool /*error_if_data_exists_in_logs*/) {
bool /*error_if_data_exists_in_logs*/, uint64_t*) {
mutex_.AssertHeld();
JobContext job_context(0);

@ -78,7 +78,8 @@ class DBImplSecondary : public DBImpl {
// and log_readers_ to facilitate future operations.
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only, bool error_if_log_file_exist,
bool error_if_data_exists_in_logs) override;
bool error_if_data_exists_in_logs,
uint64_t* = nullptr) override;
// Implementations of the DB interface
using DB::Get;

@ -158,12 +158,13 @@ class PessimisticTransactionDB : public TransactionDB {
friend class WritePreparedTxnDB;
friend class WritePreparedTxnDBMock;
friend class WriteUnpreparedTxn;
friend class TransactionTest_DoubleCrashInRecovery_Test;
friend class TransactionTest_DoubleEmptyWrite_Test;
friend class TransactionTest_DuplicateKeys_Test;
friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
TransactionLockMgr lock_mgr_;

@ -6116,6 +6116,92 @@ TEST_P(TransactionTest, ReseekOptimization) {
delete txn0;
}
// After recovery in kPointInTimeRecovery mode, the corrupted log file remains
// there. The new log files should be still read succesfully during recovery of
// the 2nd crash.
TEST_P(TransactionTest, DoubleCrashInRecovery) {
for (const bool write_after_recovery : {false, true}) {
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
ReOpen();
std::string cf_name = "two";
ColumnFamilyOptions cf_options;
ColumnFamilyHandle* cf_handle = nullptr;
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
// Add a prepare entry to prevent the older logs from being deleted.
WriteOptions write_options;
TransactionOptions txn_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn->SetName("xid"));
ASSERT_OK(txn->Put(Slice("foo-prepare"), Slice("bar-prepare")));
ASSERT_OK(txn->Prepare());
FlushOptions flush_ops;
db->Flush(flush_ops);
// Now we have a log that cannot be deleted
ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1"));
// Flush only the 2nd cf
db->Flush(flush_ops, cf_handle);
// The value is large enough to be touched by the corruption we ingest
// below.
std::string large_value(400, ' ');
// key/value not touched by corruption
ASSERT_OK(db->Put(write_options, "foo2", "bar2"));
// key/value touched by corruption
ASSERT_OK(db->Put(write_options, "foo3", large_value));
// key/value not touched by corruption
ASSERT_OK(db->Put(write_options, "foo4", "bar4"));
db->FlushWAL(true);
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
uint64_t wal_file_id = db_impl->TEST_LogfileNumber();
std::string fname = LogFileName(dbname, wal_file_id);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
delete txn;
delete cf_handle;
delete db;
db = nullptr;
// Corrupt the last log file in the middle, so that it is not corrupted
// in the tail.
std::string file_content;
ASSERT_OK(ReadFileToString(env, fname, &file_content));
file_content[400] = 'h';
file_content[401] = 'a';
ASSERT_OK(env->DeleteFile(fname));
ASSERT_OK(WriteStringToFile(env, file_content, fname));
// Recover from corruption
std::vector<ColumnFamilyHandle*> handles;
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName,
ColumnFamilyOptions()));
column_families.push_back(
ColumnFamilyDescriptor("two", ColumnFamilyOptions()));
ASSERT_OK(ReOpenNoDelete(column_families, &handles));
if (write_after_recovery) {
// Write data to the log right after the corrupted log
ASSERT_OK(db->Put(write_options, "foo5", large_value));
}
// Persist data written to WAL during recovery or by the last Put
db->FlushWAL(true);
// 2nd crash to recover while having a valid log after the corrupted one.
ASSERT_OK(ReOpenNoDelete(column_families, &handles));
assert(db != nullptr);
txn = db->GetTransactionByName("xid");
ASSERT_TRUE(txn != nullptr);
ASSERT_OK(txn->Commit());
delete txn;
for (auto handle : handles) {
delete handle;
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -129,7 +129,7 @@ class TransactionTestBase : public ::testing::Test {
} else {
s = OpenWithStackableDB(cfs, handles);
}
assert(db != nullptr);
assert(!s.ok() || db != nullptr);
return s;
}

Loading…
Cancel
Save