From ae82d91492a900d0b6ee5bbf06fad5cfcc8c7316 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Mon, 11 Apr 2022 15:39:31 -0700 Subject: [PATCH] Remove corrupted WAL files in kPointInTimeRecovery mode with avoid_flush_during_recovery set true (#9634) Summary: 1) In case of non-TransactionDB and avoid_flush_during_recovery = true, RocksDB won't flush the data from WAL to L0 for all column families if possible. As a result, not all column families can increase their log_numbers, and min_log_number_to_keep won't change. 2) For transaction DB (.allow_2pc), even with the flush, there may be old WAL files that it must not delete because they can contain data of uncommitted transactions and min_log_number_to_keep won't change. If we persist a new MANIFEST with advanced log_numbers for some column families, then during a second crash after persisting the MANIFEST, RocksDB will see some column families' log_numbers larger than the corrupted wal, and the "column family inconsistency" error will be hit, causing recovery to fail. As a solution, 1. the corrupted WALs whose numbers are larger than the corrupted wal and smaller than the new WAL will be moved to archive folder. 2. Currently, RocksDB DB::Open() may create and write to two new MANIFEST files even before recovery succeeds. This PR buffers the edits in a structure and writes to a new MANIFEST after recovery is successful. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9634 Test Plan: 1. Added new unit tests 2.
make crash_test -j Reviewed By: riversand963 Differential Revision: D34463666 Pulled By: akankshamahajan15 fbshipit-source-id: e233d3af0ed4e2028ca0cf051e5a334a0fdc9d19 --- HISTORY.md | 2 + db/corruption_test.cc | 344 ++++++++++++++++++ db/db_basic_test.cc | 5 +- db/db_impl/db_impl.h | 69 +++- db/db_impl/db_impl_files.cc | 36 +- db/db_impl/db_impl_open.cc | 229 ++++++++---- db/db_impl/db_impl_secondary.cc | 3 +- db/db_impl/db_impl_secondary.h | 4 +- db/db_wal_test.cc | 1 - .../test/java/org/rocksdb/RocksDBTest.java | 2 +- monitoring/stats_history_test.cc | 10 +- 11 files changed, 590 insertions(+), 115 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 1521ee101..052bd4835 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,8 @@ * Fixed a bug affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#9766). * Fix segfault in FilePrefetchBuffer with async_io as it doesn't wait for pending jobs to complete on destruction. * Fix ERROR_HANDLER_AUTORESUME_RETRY_COUNT stat whose value was set wrong in portal.h +* Fixed a bug for non-TransactionDB with avoid_flush_during_recovery = true and TransactionDB where in case of crash, min_log_number_to_keep may not change on recovery and persisting a new MANIFEST with advanced log_numbers for some column families, results in "column family inconsistency" error on second recovery. As a solution the corrupted WALs whose numbers are larger than the corrupted wal and smaller than the new WAL will be moved to archive folder. +* Fixed a bug in RocksDB DB::Open() which may create and write to two new MANIFEST files even before recovery succeeds. Now writes to MANIFEST are persisted only after recovery is successful. ### New Features * For db_bench when --seed=0 or --seed is not set then it uses the current time as the seed value. Previously it used the value 1000.
diff --git a/db/corruption_test.cc b/db/corruption_test.cc index dbd9ca919..7cd5ad7e2 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -26,6 +26,7 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/table.h" +#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/write_batch.h" #include "table/block_based/block_based_table_builder.h" #include "table/meta_blocks.h" @@ -275,6 +276,42 @@ class CorruptionTest : public testing::Test { } return Slice(*storage); } + + void GetSortedWalFiles(std::vector<uint64_t>& file_nums) { + std::vector<std::string> tmp_files; + ASSERT_OK(env_->GetChildren(dbname_, &tmp_files)); + FileType type = kWalFile; + for (const auto& file : tmp_files) { + uint64_t number = 0; + if (ParseFileName(file, &number, &type) && type == kWalFile) { + file_nums.push_back(number); + } + } + std::sort(file_nums.begin(), file_nums.end()); + } + + void CorruptFileWithTruncation(FileType file, uint64_t number, + uint64_t bytes_to_truncate = 0) { + std::string path; + switch (file) { + case FileType::kWalFile: + path = LogFileName(dbname_, number); + break; + // TODO: Add other file types as this method is being used for those file + // types. + default: + return; + } + uint64_t old_size = 0; + ASSERT_OK(env_->GetFileSize(path, &old_size)); + assert(old_size > bytes_to_truncate); + uint64_t new_size = old_size - bytes_to_truncate; + // If bytes_to_truncate == 0, truncate the whole file (new size is 0).
+ if (bytes_to_truncate == 0) { + new_size = 0; + } + ASSERT_OK(test::TruncateFile(env_, path, new_size)); + } }; TEST_F(CorruptionTest, Recovery) { @@ -912,6 +949,313 @@ TEST_F(CorruptionTest, VerifyWholeTableChecksum) { ASSERT_EQ(1, count); } +class CrashDuringRecoveryWithCorruptionTest + : public CorruptionTest, + public testing::WithParamInterface<std::tuple<bool, bool>> { + public: + explicit CrashDuringRecoveryWithCorruptionTest() + : CorruptionTest(), + avoid_flush_during_recovery_(std::get<0>(GetParam())), + track_and_verify_wals_in_manifest_(std::get<1>(GetParam())) {} + + protected: + const bool avoid_flush_during_recovery_; + const bool track_and_verify_wals_in_manifest_; +}; + +INSTANTIATE_TEST_CASE_P(CorruptionTest, CrashDuringRecoveryWithCorruptionTest, + ::testing::Values(std::make_tuple(true, false), + std::make_tuple(false, false), + std::make_tuple(true, true), + std::make_tuple(false, true))); + +// In case of non-TransactionDB with avoid_flush_during_recovery = true, RocksDB +// won't flush the data from WAL to L0 for all column families if possible. As a +// result, not all column families can increase their log_numbers, and +// min_log_number_to_keep won't change. +// It may prematurely persist a new MANIFEST even before we can declare the DB +// is in consistent state after recovery (this is when the new WAL is synced) +// and advances log_numbers for some column families. +// +// If there is power failure before we sync the new WAL, we will end up in +// a situation in which after persisting the MANIFEST, RocksDB will see some +// column families' log_numbers larger than the corrupted wal, and +// "Column family inconsistency: SST file contains data beyond the point of +// corruption" error will be hit, causing recovery to fail. +// +// After adding the fix, corrupted WALs whose numbers are larger than the +// corrupted wal and smaller than the new WAL are moved to a separate folder.
+// Only after new WAL is synced, RocksDB persist a new MANIFEST with column +// families to ensure RocksDB is in consistent state. +// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is +// synced immediately afterwards. The sequence number of the sentinel +// WriteBatch will be the next sequence number immediately after the largest +// sequence number recovered from previous WALs and MANIFEST because of which DB +// will be in consistent state. +TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecovery) { + CloseDb(); + Options options; + options.track_and_verify_wals_in_manifest = + track_and_verify_wals_in_manifest_; + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + options.avoid_flush_during_recovery = false; + options.env = env_; + ASSERT_OK(DestroyDB(dbname_, options)); + options.create_if_missing = true; + options.max_write_buffer_number = 3; + + Reopen(&options); + Status s; + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + s = db_->CreateColumnFamily(options, test_cf_name, &cfh); + ASSERT_OK(s); + delete cfh; + CloseDb(); + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, options); + std::vector handles; + + // 1. Open and populate the DB. Write and flush default_cf several times to + // advance wal number so that some column families have advanced log_number + // while other don't. + { + ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_)); + auto* dbimpl = static_cast_with_check(db_); + assert(dbimpl); + + // Write one key to test_cf. + ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare")); + // Write to default_cf and flush this cf several times to advance wal + // number. 
+ for (int i = 0; i < 2; ++i) { + ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i), "value")); + ASSERT_OK(dbimpl->TEST_SwitchMemtable()); + } + ASSERT_OK(db_->Put(WriteOptions(), handles[1], "dontcare", "dontcare")); + + for (auto* h : handles) { + delete h; + } + handles.clear(); + CloseDb(); + } + + // 2. Corrupt second last wal file to emulate power reset which caused the DB + // to lose the un-synced WAL. + { + std::vector file_nums; + GetSortedWalFiles(file_nums); + size_t size = file_nums.size(); + uint64_t log_num = file_nums[size - 2]; + CorruptFileWithTruncation(FileType::kWalFile, log_num, + /*bytes_to_truncate=*/8); + } + + // 3. After first crash reopen the DB which contains corrupted WAL. Default + // family has higher log number than corrupted wal number. + // + // Case1: If avoid_flush_during_recovery = true, RocksDB won't flush the data + // from WAL to L0 for all column families (test_cf_name in this case). As a + // result, not all column families can increase their log_numbers, and + // min_log_number_to_keep won't change. + // + // Case2: If avoid_flush_during_recovery = false, all column families have + // flushed their data from WAL to L0 during recovery, and none of them will + // ever need to read the WALs again. + { + options.avoid_flush_during_recovery = avoid_flush_during_recovery_; + s = DB::Open(options, dbname_, cf_descs, &handles, &db_); + ASSERT_OK(s); + for (auto* h : handles) { + delete h; + } + handles.clear(); + CloseDb(); + } + + // 4. Corrupt max_wal_num to emulate second power reset which caused the + // DB to again lose the un-synced WAL. + { + std::vector file_nums; + GetSortedWalFiles(file_nums); + size_t size = file_nums.size(); + uint64_t log_num = file_nums[size - 1]; + CorruptFileWithTruncation(FileType::kWalFile, log_num); + } + + // 5. After second crash reopen the db with second corruption. Default family + // has higher log number than corrupted wal number. 
+ // + // Case1: If avoid_flush_during_recovery = true, we persist a new + // MANIFEST with advanced log_numbers for some column families only after + // syncing the WAL. So during second crash, RocksDB will skip the corrupted + // WAL files as they have been moved to different folder. Since newly synced + // WAL file's sequence number (sentinel WriteBatch) will be the next + // sequence number immediately after the largest sequence number recovered + // from previous WALs and MANIFEST, db will be in consistent state and opens + // successfully. + // + // Case2: If avoid_flush_during_recovery = false, the corrupted WAL is below + // this number. So during a second crash after persisting the new MANIFEST, + // RocksDB will skip the corrupted WAL(s) because they are all below this + // bound. Therefore, we won't hit the "column family inconsistency" error + // message. + { + options.avoid_flush_during_recovery = avoid_flush_during_recovery_; + ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_)); + for (auto* h : handles) { + delete h; + } + handles.clear(); + CloseDb(); + } +} + +// In case of TransactionDB, it enables two-phase-commit. The prepare section of +// an uncommitted transaction always need to be kept. Even if we perform flush +// during recovery, we may still need to hold an old WAL. The +// min_log_number_to_keep won't change, and "Column family inconsistency: SST +// file contains data beyond the point of corruption" error will be hit, causing +// recovery to fail. +// +// After adding the fix, corrupted WALs whose numbers are larger than the +// corrupted wal and smaller than the new WAL are moved to a separate folder. +// Only after new WAL is synced, RocksDB persist a new MANIFEST with column +// families to ensure RocksDB is in consistent state. +// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is +// synced immediately afterwards. 
The sequence number of the sentinel +// WriteBatch will be the next sequence number immediately after the largest +// sequence number recovered from previous WALs and MANIFEST because of which DB +// will be in consistent state. +TEST_P(CrashDuringRecoveryWithCorruptionTest, TxnDbCrashDuringRecovery) { + CloseDb(); + Options options; + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + options.track_and_verify_wals_in_manifest = + track_and_verify_wals_in_manifest_; + options.avoid_flush_during_recovery = false; + options.env = env_; + ASSERT_OK(DestroyDB(dbname_, options)); + options.create_if_missing = true; + options.max_write_buffer_number = 3; + Reopen(&options); + + // Create cf test_cf_name. + ColumnFamilyHandle* cfh = nullptr; + const std::string test_cf_name = "test_cf"; + Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh); + ASSERT_OK(s); + delete cfh; + CloseDb(); + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, options); + std::vector handles; + + TransactionDB* txn_db = nullptr; + TransactionDBOptions txn_db_opts; + + // 1. Open and populate the DB. Write and flush default_cf several times to + // advance wal number so that some column families have advanced log_number + // while other don't. 
+ { + options.avoid_flush_during_recovery = avoid_flush_during_recovery_; + ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs, + &handles, &txn_db)); + + auto* txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions()); + // Put cf1 + ASSERT_OK(txn->Put(handles[1], "foo", "value")); + ASSERT_OK(txn->SetName("txn0")); + ASSERT_OK(txn->Prepare()); + delete txn; + txn = nullptr; + + auto* dbimpl = static_cast_with_check(txn_db->GetRootDB()); + assert(dbimpl); + + // Put and flush cf0 + for (int i = 0; i < 2; ++i) { + ASSERT_OK(txn_db->Put(WriteOptions(), "dontcare", "value")); + ASSERT_OK(dbimpl->TEST_SwitchMemtable()); + } + + // Put cf1 + txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions()); + ASSERT_OK(txn->Put(handles[1], "foo1", "value")); + ASSERT_OK(txn->Commit()); + + delete txn; + txn = nullptr; + for (auto* h : handles) { + delete h; + } + handles.clear(); + delete txn_db; + } + + // 2. Corrupt second last wal to emulate power reset which caused the DB to + // lose the un-synced WAL. + { + std::vector file_nums; + GetSortedWalFiles(file_nums); + size_t size = file_nums.size(); + uint64_t log_num = file_nums[size - 2]; + CorruptFileWithTruncation(FileType::kWalFile, log_num, + /*bytes_to_truncate=*/8); + } + + // 3. After first crash reopen the DB which contains corrupted WAL. Default + // family has higher log number than corrupted wal number. There may be old + // WAL files that it must not delete because they can contain data of + // uncommitted transactions. As a result, min_log_number_to_keep won't change. + { + options.avoid_flush_during_recovery = avoid_flush_during_recovery_; + ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs, + &handles, &txn_db)); + + for (auto* h : handles) { + delete h; + } + handles.clear(); + delete txn_db; + } + + // 4. Corrupt max_wal_num to emulate second power reset which caused the + // DB to again lose the un-synced WAL. 
+ { + std::vector file_nums; + GetSortedWalFiles(file_nums); + size_t size = file_nums.size(); + uint64_t log_num = file_nums[size - 1]; + CorruptFileWithTruncation(FileType::kWalFile, log_num); + } + + // 5. After second crash reopen the db with second corruption. Default family + // has higher log number than corrupted wal number. + // We persist a new MANIFEST with advanced log_numbers for some column + // families only after syncing the WAL. So during second crash, RocksDB will + // skip the corrupted WAL files as they have been moved to different folder. + // Since newly synced WAL file's sequence number (sentinel WriteBatch) will be + // the next sequence number immediately after the largest sequence number + // recovered from previous WALs and MANIFEST, db will be in consistent state + // and opens successfully. + { + options.avoid_flush_during_recovery = false; + + ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs, + &handles, &txn_db)); + for (auto* h : handles) { + delete h; + } + delete txn_db; + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index e54e485dd..3833066dd 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -175,7 +175,10 @@ TEST_F(DBBasicTest, ReadOnlyDB) { ASSERT_TRUE(db_->SyncWAL().IsNotSupported()); } -TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) { +// TODO akanksha: Update the test to check that combination +// does not actually write to FS (use open read-only with +// CompositeEnvWrapper+ReadOnlyFileSystem). 
+TEST_F(DBBasicTest, DISABLED_ReadOnlyDBWithWriteDBIdToManifestSet) { ASSERT_OK(Put("foo", "v1")); ASSERT_OK(Put("bar", "v2")); ASSERT_OK(Put("foo", "v3")); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 97e3d1b8a..77987ab41 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1240,6 +1240,43 @@ class DBImpl : public DB { std::atomic shutting_down_; + // RecoveryContext struct stores the context about version edits along + // with corresponding column_family_data and column_family_options. + class RecoveryContext { + public: + ~RecoveryContext() { + for (auto& edit_list : edit_lists_) { + for (auto* edit : edit_list) { + delete edit; + } + edit_list.clear(); + } + cfds_.clear(); + mutable_cf_opts_.clear(); + edit_lists_.clear(); + files_to_delete_.clear(); + } + + void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) { + if (map_.find(cfd->GetID()) == map_.end()) { + uint32_t size = static_cast(map_.size()); + map_.emplace(cfd->GetID(), size); + cfds_.emplace_back(cfd); + mutable_cf_opts_.emplace_back(cfd->GetLatestMutableCFOptions()); + edit_lists_.emplace_back(autovector()); + } + uint32_t i = map_[cfd->GetID()]; + edit_lists_[i].emplace_back(new VersionEdit(edit)); + } + + std::unordered_map map_; // cf_id to index; + autovector cfds_; + autovector mutable_cf_opts_; + autovector> edit_lists_; + // files_to_delete_ contains sst files + std::set files_to_delete_; + }; + // Except in DB::Open(), WriteOptionsFile can only be called when: // Persist options to options file. // If need_mutex_lock = false, the method will lock DB mutex. @@ -1356,16 +1393,19 @@ class DBImpl : public DB { // be made to the descriptor are added to *edit. // recovered_seq is set to less than kMaxSequenceNumber if the log's tail is // skipped. + // recovery_ctx stores the context about version edits and all those + // edits are persisted to new Manifest after successfully syncing the new WAL. 
virtual Status Recover( const std::vector& column_families, bool read_only = false, bool error_if_wal_file_exists = false, bool error_if_data_exists_in_wals = false, - uint64_t* recovered_seq = nullptr); + uint64_t* recovered_seq = nullptr, + RecoveryContext* recovery_ctx = nullptr); virtual bool OwnTablesAndLogs() const { return true; } // Set DB identity file, and write DB ID to manifest if necessary. - Status SetDBId(bool read_only); + Status SetDBId(bool read_only, RecoveryContext* recovery_ctx); // REQUIRES: db mutex held when calling this function, but the db mutex can // be released and re-acquired. Db mutex will be held when the function @@ -1374,12 +1414,15 @@ class DBImpl : public DB { // not referenced in the MANIFEST (e.g. // 1. It's best effort recovery; // 2. The VersionEdits referencing the SST files are appended to - // MANIFEST, DB crashes when syncing the MANIFEST, the VersionEdits are + // RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are // still not synced to MANIFEST during recovery.) - // We delete these SST files. In the + // It stores the SST files to be deleted in RecoveryContext. In the // meantime, we find out the largest file number present in the paths, and // bump up the version set's next_file_number_ to be 1 + largest_file_number. - Status DeleteUnreferencedSstFiles(); + // recovery_ctx stores the context about version edits and files to be + // deleted. All those edits are persisted to new Manifest after successfully + // syncing the new WAL. 
+ Status DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx); // SetDbSessionId() should be called in the constuctor DBImpl() // to ensure that db_session_id_ gets updated every time the DB is opened @@ -1389,6 +1432,11 @@ class DBImpl : public DB { Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family, const Slice& ts) const; + // recovery_ctx stores the context about version edits and + // LogAndApplyForRecovery persist all those edits to new Manifest after + // successfully syncing new WAL. + Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx); + private: friend class DB; friend class ErrorHandler; @@ -1643,9 +1691,10 @@ class DBImpl : public DB { // REQUIRES: log_numbers are sorted in ascending order // corrupted_log_found is set to true if we recover from a corrupted log file. - Status RecoverLogFiles(const std::vector& log_numbers, + Status RecoverLogFiles(std::vector& log_numbers, SequenceNumber* next_sequence, bool read_only, - bool* corrupted_log_found); + bool* corrupted_log_found, + RecoveryContext* recovery_ctx); // The following two methods are used to flush a memtable to // storage. The first one is used at database RecoveryTime (when the @@ -1655,6 +1704,12 @@ class DBImpl : public DB { Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, MemTable* mem, VersionEdit* edit); + // Move all the WAL files starting from corrupted WAL found to + // max_wal_number to avoid column family inconsistency error on recovery. It + // also removes the deleted file from the vector wal_numbers. + void MoveCorruptedWalFiles(std::vector& wal_numbers, + uint64_t corrupted_wal_number); + // Get the size of a log file and, if truncate is true, truncate the // log file to its actual size, thereby freeing preallocated space. 
// Return success even if truncate fails diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 1790ed836..0d3a3bea7 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -863,7 +863,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC( return min_log_number_to_keep; } -Status DBImpl::SetDBId(bool read_only) { +Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) { Status s; // Happens when immutable_db_options_.write_dbid_to_manifest is set to true // the very first time. @@ -890,14 +890,14 @@ Status DBImpl::SetDBId(bool read_only) { } s = GetDbIdentityFromIdentityFile(&db_id_); if (immutable_db_options_.write_dbid_to_manifest && s.ok()) { + assert(!read_only); + assert(recovery_ctx != nullptr); + assert(versions_->GetColumnFamilySet() != nullptr); VersionEdit edit; edit.SetDBId(db_id_); - Options options; - MutableCFOptions mutable_cf_options(options); versions_->db_id_ = db_id_; - s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), - mutable_cf_options, &edit, &mutex_, nullptr, - /* new_descriptor_log */ false); + recovery_ctx->UpdateVersionEdits( + versions_->GetColumnFamilySet()->GetDefault(), edit); } } else if (!read_only) { s = SetIdentityFile(env_, dbname_, db_id_); @@ -905,7 +905,7 @@ Status DBImpl::SetDBId(bool read_only) { return s; } -Status DBImpl::DeleteUnreferencedSstFiles() { +Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) { mutex_.AssertHeld(); std::vector paths; paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator))); @@ -925,7 +925,6 @@ Status DBImpl::DeleteUnreferencedSstFiles() { uint64_t next_file_number = versions_->current_next_file_number(); uint64_t largest_file_number = next_file_number; - std::set files_to_delete; Status s; for (const auto& path : paths) { std::vector files; @@ -943,8 +942,9 @@ Status DBImpl::DeleteUnreferencedSstFiles() { const std::string normalized_fpath = path + fname; largest_file_number = 
std::max(largest_file_number, number); if (type == kTableFile && number >= next_file_number && - files_to_delete.find(normalized_fpath) == files_to_delete.end()) { - files_to_delete.insert(normalized_fpath); + recovery_ctx->files_to_delete_.find(normalized_fpath) == + recovery_ctx->files_to_delete_.end()) { + recovery_ctx->files_to_delete_.insert(normalized_fpath); } } } @@ -961,21 +961,7 @@ Status DBImpl::DeleteUnreferencedSstFiles() { assert(versions_->GetColumnFamilySet()); ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault(); assert(default_cfd); - s = versions_->LogAndApply( - default_cfd, *default_cfd->GetLatestMutableCFOptions(), &edit, &mutex_, - directories_.GetDbDir(), /*new_descriptor_log*/ false); - if (!s.ok()) { - return s; - } - - mutex_.Unlock(); - for (const auto& fname : files_to_delete) { - s = env_->DeleteFile(fname); - if (!s.ok()) { - break; - } - } - mutex_.Lock(); + recovery_ctx->UpdateVersionEdits(default_cfd, edit); return s; } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 9a402b6dc..7bad3cad7 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -399,7 +399,7 @@ IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname, Status DBImpl::Recover( const std::vector& column_families, bool read_only, bool error_if_wal_file_exists, bool error_if_data_exists_in_wals, - uint64_t* recovered_seq) { + uint64_t* recovered_seq, RecoveryContext* recovery_ctx) { mutex_.AssertHeld(); bool is_new_db = false; @@ -518,9 +518,10 @@ Status DBImpl::Recover( if (!s.ok()) { return s; } - s = SetDBId(read_only); + + s = SetDBId(read_only, recovery_ctx); if (s.ok() && !read_only) { - s = DeleteUnreferencedSstFiles(); + s = DeleteUnreferencedSstFiles(recovery_ctx); } if (immutable_db_options_.paranoid_checks && s.ok()) { @@ -535,10 +536,6 @@ Status DBImpl::Recover( } } } - // DB mutex is already held - if (s.ok() && immutable_db_options_.persist_stats_to_disk) { - s = 
InitPersistStatsColumnFamily(); - } std::vector files_in_wal_dir; if (s.ok()) { @@ -608,7 +605,10 @@ Status DBImpl::Recover( WalNumber max_wal_number = versions_->GetWalSet().GetWals().rbegin()->first; edit.DeleteWalsBefore(max_wal_number + 1); - s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_); + assert(recovery_ctx != nullptr); + assert(versions_->GetColumnFamilySet() != nullptr); + recovery_ctx->UpdateVersionEdits( + versions_->GetColumnFamilySet()->GetDefault(), edit); } if (!s.ok()) { return s; @@ -644,8 +644,8 @@ Status DBImpl::Recover( std::sort(wals.begin(), wals.end()); bool corrupted_wal_found = false; - s = RecoverLogFiles(wals, &next_sequence, read_only, - &corrupted_wal_found); + s = RecoverLogFiles(wals, &next_sequence, read_only, &corrupted_wal_found, + recovery_ctx); if (corrupted_wal_found && recovered_seq != nullptr) { *recovered_seq = next_sequence; } @@ -805,10 +805,30 @@ Status DBImpl::InitPersistStatsColumnFamily() { return s; } +Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) { + mutex_.AssertHeld(); + assert(versions_->descriptor_log_ == nullptr); + Status s = versions_->LogAndApply( + recovery_ctx.cfds_, recovery_ctx.mutable_cf_opts_, + recovery_ctx.edit_lists_, &mutex_, directories_.GetDbDir()); + if (s.ok() && !(recovery_ctx.files_to_delete_.empty())) { + mutex_.Unlock(); + for (const auto& fname : recovery_ctx.files_to_delete_) { + s = env_->DeleteFile(fname); + if (!s.ok()) { + break; + } + } + mutex_.Lock(); + } + return s; +} + // REQUIRES: wal_numbers are sorted in ascending order -Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, +Status DBImpl::RecoverLogFiles(std::vector& wal_numbers, SequenceNumber* next_sequence, bool read_only, - bool* corrupted_wal_found) { + bool* corrupted_wal_found, + RecoveryContext* recovery_ctx) { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; @@ -833,6 +853,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& 
wal_numbers, edit.SetColumnFamily(cfd->GetID()); version_edits.insert({cfd->GetID(), edit}); } + int job_id = next_job_id_.fetch_add(1); { auto stream = event_logger_.Log(); @@ -1256,6 +1277,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, edit->SetLogNumber(max_wal_number + 1); } } + if (status.ok()) { // we must mark the next log number as used, even though it's // not actually used. that is because VersionSet assumes @@ -1263,42 +1285,40 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, // log number versions_->MarkFileNumberUsed(max_wal_number + 1); - autovector cfds; - autovector cf_opts; - autovector> edit_lists; + if (corrupted_wal_found != nullptr && *corrupted_wal_found == true && + immutable_db_options_.wal_recovery_mode == + WALRecoveryMode::kPointInTimeRecovery) { + MoveCorruptedWalFiles(wal_numbers, corrupted_wal_number); + } + + assert(recovery_ctx != nullptr); for (auto* cfd : *versions_->GetColumnFamilySet()) { - cfds.push_back(cfd); - cf_opts.push_back(cfd->GetLatestMutableCFOptions()); auto iter = version_edits.find(cfd->GetID()); assert(iter != version_edits.end()); - edit_lists.push_back({&iter->second}); + recovery_ctx->UpdateVersionEdits(cfd, iter->second); } - std::unique_ptr wal_deletion; if (flushed) { - wal_deletion = std::make_unique(); + VersionEdit wal_deletion; if (immutable_db_options_.track_and_verify_wals_in_manifest) { - wal_deletion->DeleteWalsBefore(max_wal_number + 1); + wal_deletion.DeleteWalsBefore(max_wal_number + 1); } if (!allow_2pc()) { // In non-2pc mode, flushing the memtables of the column families // means we can advance min_log_number_to_keep. 
- wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1); + wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1); } - edit_lists.back().push_back(wal_deletion.get()); + assert(versions_->GetColumnFamilySet() != nullptr); + recovery_ctx->UpdateVersionEdits( + versions_->GetColumnFamilySet()->GetDefault(), wal_deletion); } - - // write MANIFEST with update - status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_, - directories_.GetDbDir(), - /*new_descriptor_log=*/true); } } if (status.ok()) { if (data_seen && !flushed) { status = RestoreAliveLogFiles(wal_numbers); - } else { + } else if (!wal_numbers.empty()) { // If there's no data in the WAL, or we flushed all the data, still // truncate the log file. If the process goes into a crash loop before // the file is deleted, the preallocated space will never get freed. @@ -1314,6 +1334,48 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, return status; } +void DBImpl::MoveCorruptedWalFiles(std::vector& wal_numbers, + uint64_t corrupted_wal_number) { + size_t num_wals = wal_numbers.size(); + // Find the first corrupted wal. + auto iter = std::lower_bound(wal_numbers.begin(), wal_numbers.end(), + corrupted_wal_number); + auto corrupt_start_iter = iter; + + // Increment iter to move WAL files from first corrupted_wal_number + 1. + iter++; + + std::string archival_path = + ArchivalDirectory(immutable_db_options_.GetWalDir()); + Status create_status = env_->CreateDirIfMissing(archival_path); + + // create_status is only checked when it needs to move the corrupted WAL files + // to archive folder. + create_status.PermitUncheckedError(); + + // Truncate the last WAL to reclaim the pre allocated space before + // moving it. + GetLogSizeAndMaybeTruncate(wal_numbers.back(), /*truncate=*/true, nullptr) + .PermitUncheckedError(); + + // Move all the WAL files from corrupted_wal_number + 1 to last WAL + // (max_wal_number) to avoid column family inconsistency error to archival + // directory. 
If it is unable to create the archive dir, the + // corrupted WAL files are not moved. + // We are moving all but the first corrupted WAL file to a different folder. + while (iter != wal_numbers.end()) { + LogFileNumberSize log(*iter); + std::string fname = LogFileName(immutable_db_options_.GetWalDir(), *iter); +#ifndef ROCKSDB_LITE + if (create_status.ok()) { + wal_manager_.ArchiveWALFile(fname, *iter); + } +#endif + iter++; + } + wal_numbers.erase(corrupt_start_iter + 1, wal_numbers.begin() + num_wals); +} + Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate, LogFileNumberSize* log_ptr) { LogFileNumberSize log(wal_number); @@ -1376,7 +1438,8 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { // log has such preallocated space, so we only truncate for the last log. LogFileNumberSize log; s = GetLogSizeAndMaybeTruncate( - wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log); + wal_number, + /*truncate=*/(wal_number == wal_numbers.back()), &log); if (!s.ok()) { break; } @@ -1737,9 +1800,13 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath(); impl->mutex_.Lock(); + + RecoveryContext recovery_ctx; + // Handles create_if_missing, error_if_exists uint64_t recovered_seq(kMaxSequenceNumber); - s = impl->Recover(column_families, false, false, false, &recovered_seq); + s = impl->Recover(column_families, false, false, false, &recovered_seq, + &recovery_ctx); if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); log::Writer* new_log = nullptr; @@ -1756,40 +1823,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { - // set column family handles - for (auto cf : column_families) { - auto cfd = - impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); - if (cfd != nullptr) { - handles->push_back( - new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); - 
impl->NewThreadStatusCfInfo(cfd); - } else { - if (db_options.create_missing_column_families) { - // missing column family, create it - ColumnFamilyHandle* handle; - impl->mutex_.Unlock(); - s = impl->CreateColumnFamily(cf.options, cf.name, &handle); - impl->mutex_.Lock(); - if (s.ok()) { - handles->push_back(handle); - } else { - break; - } - } else { - s = Status::InvalidArgument("Column family not found", cf.name); - break; - } - } - } - } - if (s.ok()) { - SuperVersionContext sv_context(/* create_superversion */ true); - for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - impl->InstallSuperVersionAndScheduleWork( - cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); - } - sv_context.Clean(); if (impl->two_write_queues_) { impl->log_write_mutex_.Lock(); } @@ -1802,14 +1835,15 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { // In WritePrepared there could be gap in sequence numbers. This breaks - // the trick we use in kPointInTimeRecovery which assumes the first seq in - // the log right after the corrupted log is one larger than the last seq - // we read from the wals. To let this trick keep working, we add a dummy - // entry with the expected sequence to the first log right after recovery. - // In non-WritePrepared case also the new log after recovery could be - // empty, and thus missing the consecutive seq hint to distinguish - // middle-log corruption to corrupted-log-remained-after-recovery. This - // case also will be addressed by a dummy write. + // the trick we use in kPointInTimeRecovery which assumes the first seq + // in the log right after the corrupted log is one larger than the last + // seq we read from the wals. To let this trick keep working, we add a + // dummy entry with the expected sequence to the first log right after + // recovery. 
In non-WritePrepared case also the new log after recovery + // could be empty, and thus missing the consecutive seq hint to + // distinguish middle-log corruption to + // corrupted-log-remained-after-recovery. This case also will be + // addressed by a dummy write. if (recovered_seq != kMaxSequenceNumber) { WriteBatch empty_batch; WriteBatchInternal::SetSequence(&empty_batch, recovered_seq); @@ -1828,6 +1862,52 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } } } + if (s.ok()) { + s = impl->LogAndApplyForRecovery(recovery_ctx); + } + + if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) { + impl->mutex_.AssertHeld(); + s = impl->InitPersistStatsColumnFamily(); + } + + if (s.ok()) { + // set column family handles + for (auto cf : column_families) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (cfd != nullptr) { + handles->push_back( + new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); + impl->NewThreadStatusCfInfo(cfd); + } else { + if (db_options.create_missing_column_families) { + // missing column family, create it + ColumnFamilyHandle* handle; + impl->mutex_.Unlock(); + s = impl->CreateColumnFamily(cf.options, cf.name, &handle); + impl->mutex_.Lock(); + if (s.ok()) { + handles->push_back(handle); + } else { + break; + } + } else { + s = Status::InvalidArgument("Column family not found", cf.name); + break; + } + } + } + } + + if (s.ok()) { + SuperVersionContext sv_context(/* create_superversion */ true); + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + impl->InstallSuperVersionAndScheduleWork( + cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); + } + sv_context.Clean(); + } if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) { // try to read format version s = impl->PersistentStatsProcessFormatVersion(); @@ -1853,7 +1933,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, if (cfd->ioptions()->merge_operator != nullptr 
&& !cfd->mem()->IsMergeOperatorSupported()) { s = Status::InvalidArgument( - "The memtable of column family %s does not support merge operator " + "The memtable of column family %s does not support merge " + "operator " "its options.merge_operator is non-null", cfd->GetName().c_str()); } diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 35d51721f..37b2955b3 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -33,7 +33,8 @@ DBImplSecondary::~DBImplSecondary() {} Status DBImplSecondary::Recover( const std::vector& column_families, bool /*readonly*/, bool /*error_if_wal_file_exists*/, - bool /*error_if_data_exists_in_wals*/, uint64_t*) { + bool /*error_if_data_exists_in_wals*/, uint64_t*, + RecoveryContext* /*recovery_ctx*/) { mutex_.AssertHeld(); JobContext job_context(0); diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 1e5af2e13..5e5d03b67 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -81,8 +81,8 @@ class DBImplSecondary : public DBImpl { // and log_readers_ to facilitate future operations. 
Status Recover(const std::vector& column_families, bool read_only, bool error_if_wal_file_exists, - bool error_if_data_exists_in_wals, - uint64_t* = nullptr) override; + bool error_if_data_exists_in_wals, uint64_t* = nullptr, + RecoveryContext* recovery_ctx = nullptr) override; // Implementations of the DB interface using DB::Get; diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index f2b94ed07..9a953a178 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -287,7 +287,6 @@ TEST_F(DBWALTest, Recover) { ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); ASSERT_EQ("v1", Get(1, "foo")); - ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v5", Get(1, "baz")); ASSERT_OK(Put(1, "bar", "v2")); diff --git a/java/src/test/java/org/rocksdb/RocksDBTest.java b/java/src/test/java/org/rocksdb/RocksDBTest.java index 4193bcb44..422bed40c 100644 --- a/java/src/test/java/org/rocksdb/RocksDBTest.java +++ b/java/src/test/java/org/rocksdb/RocksDBTest.java @@ -1428,7 +1428,7 @@ public class RocksDBTest { assertThat(livefiles.manifestFileSize).isEqualTo(59); assertThat(livefiles.files.size()).isEqualTo(3); assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT"); - assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004"); + assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000005"); assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000007"); } } diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc index 59e7be3d9..1fe5503cb 100644 --- a/monitoring/stats_history_test.cc +++ b/monitoring/stats_history_test.cc @@ -604,10 +604,14 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { dbfull()->TEST_WaitForStatsDumpRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to all three cf, flush default cf - // LogNumbers: default: 14, stats: 4, pikachu: 4 + // LogNumbers: default: 16, stats: 10, pikachu: 5 + // Since in recovery process, cfd_stats column is created after WAL is + // created, synced and MANIFEST is 
persisted, its log number, which depends on + // logfile_number_, will be different. Since "pikachu" is never flushed, + // its log_number should be the smallest of the three. ASSERT_OK(Flush()); - ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); - ASSERT_LT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); + ASSERT_LT(cfd_test->GetLogNumber(), cfd_stats->GetLogNumber()); + ASSERT_LT(cfd_test->GetLogNumber(), cfd_default->GetLogNumber()); ASSERT_OK(Put("foo1", "v1")); ASSERT_OK(Put("bar1", "v1"));