From 5584595f80208f13b8e8938fd8d1b70d1c0600ac Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 5 May 2020 10:44:12 -0700 Subject: [PATCH] Do not swallow error returned from SaveTo() (#6801) Summary: With consistency check enabled, VersionBuilder::SaveTo() may return error once corruption is detected while building versions. We should handle these errors. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6801 Test Plan: make check Reviewed By: siying Differential Revision: D21385045 Pulled By: riversand963 fbshipit-source-id: 98f6424e2a4699b62befa21e9fe00e70a771118e --- db/db_basic_test.cc | 22 ++++++++++ db/db_impl/db_secondary_test.cc | 58 +++++++++++++++++++++++- db/version_edit_handler.cc | 40 ++++++++++------- db/version_set.cc | 78 +++++++++++++++++++++------------ db/version_set_test.cc | 4 +- 5 files changed, 155 insertions(+), 47 deletions(-) diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 1fa9a1a9f..979c2309d 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1774,6 +1774,28 @@ TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) { } } +TEST_F(DBBasicTest, BestEffortsRecoveryWithVersionBuildingFailure) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "value")); + ASSERT_OK(Flush()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) { + ASSERT_NE(nullptr, arg); + *(reinterpret_cast(arg)) = + Status::Corruption("Inject corruption"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + options.best_efforts_recovery = true; + Status s = TryReopen(options); + ASSERT_TRUE(s.IsCorruption()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + #ifndef ROCKSDB_LITE namespace { class TableFileListener : public EventListener { diff --git a/db/db_impl/db_secondary_test.cc b/db/db_impl/db_secondary_test.cc index 0b34181de..d54a52493 100644 --- a/db/db_impl/db_secondary_test.cc +++ b/db/db_impl/db_secondary_test.cc @@ -45,6 +45,8 @@ class DBSecondaryTest : public DBTestBase { void OpenSecondary(const Options& options); + Status TryOpenSecondary(const Options& options); + void OpenSecondaryWithColumnFamilies( const std::vector& column_families, const Options& options); @@ -70,9 +72,13 @@ class DBSecondaryTest : public DBTestBase { }; void DBSecondaryTest::OpenSecondary(const Options& options) { + ASSERT_OK(TryOpenSecondary(options)); +} + +Status DBSecondaryTest::TryOpenSecondary(const Options& options) { Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_); - ASSERT_OK(s); + return s; } void DBSecondaryTest::OpenSecondaryWithColumnFamilies( @@ -858,6 +864,56 @@ TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) { thread.join(); ASSERT_TRUE(called); } + +TEST_F(DBSecondaryTest, StartFromInconsistent) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "value")); + ASSERT_OK(Flush()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) { + ASSERT_NE(nullptr, arg); + *(reinterpret_cast(arg)) = + Status::Corruption("Inject corruption"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + Options options1; + Status s = TryOpenSecondary(options1); + ASSERT_TRUE(s.IsCorruption()); +} + +TEST_F(DBSecondaryTest, InconsistencyDuringCatchUp) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "value")); + ASSERT_OK(Flush()); + + Options options1; + OpenSecondary(options1); + + { + std::string value; + ASSERT_OK(db_secondary_->Get(ReadOptions(), "foo", &value)); + ASSERT_EQ("value", value); + } + + ASSERT_OK(Put("bar", "value1")); + ASSERT_OK(Flush()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) { + ASSERT_NE(nullptr, arg); + *(reinterpret_cast(arg)) = + Status::Corruption("Inject corruption"); + }); + SyncPoint::GetInstance()->EnableProcessing(); + Status s = db_secondary_->TryCatchUpWithPrimary(); + ASSERT_TRUE(s.IsCorruption()); +} #endif //! ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 44e21efa6..2304705ce 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -377,6 +377,7 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/, ColumnFamilyData* cfd, bool force_create_version) { assert(cfd->initialized()); + Status s; if (force_create_version) { auto builder_iter = builders_.find(cfd->GetID()); assert(builder_iter != builders_.end()); @@ -384,13 +385,18 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/, auto* v = new Version(cfd, version_set_, version_set_->file_options_, *cfd->GetLatestMutableCFOptions(), version_set_->current_version_number_++); - builder->SaveTo(v->storage_info()); - // Install new version - v->PrepareApply(*cfd->GetLatestMutableCFOptions(), - !(version_set_->db_options_->skip_stats_update_on_db_open)); - version_set_->AppendVersion(cfd, v); + s = builder->SaveTo(v->storage_info()); + if (s.ok()) { + // Install new version + v->PrepareApply( + *cfd->GetLatestMutableCFOptions(), + !(version_set_->db_options_->skip_stats_update_on_db_open)); + version_set_->AppendVersion(cfd, v); + } else { + delete v; + } } - return Status::OK(); + return s; } Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd, @@ -558,16 +564,20 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( auto* version = new Version(cfd, version_set_, version_set_->file_options_, *cfd->GetLatestMutableCFOptions(), version_set_->current_version_number_++); - builder->SaveTo(version->storage_info()); - version->PrepareApply( - *cfd->GetLatestMutableCFOptions(), - !version_set_->db_options_->skip_stats_update_on_db_open); - auto v_iter = versions_.find(cfd->GetID()); - if (v_iter != versions_.end()) { - delete v_iter->second; - v_iter->second = version; + s = builder->SaveTo(version->storage_info()); + if (s.ok()) { + version->PrepareApply( + *cfd->GetLatestMutableCFOptions(), + !version_set_->db_options_->skip_stats_update_on_db_open); + auto v_iter = versions_.find(cfd->GetID()); + if (v_iter != versions_.end()) { + delete v_iter->second; + v_iter->second = version; + } else { + versions_.emplace(cfd->GetID(), version); + } } else { - versions_.emplace(cfd->GetID(), version); + delete version; } } return s; diff --git a/db/version_set.cc b/db/version_set.cc index 65f88cdc8..bf23d1e0e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -6010,13 +6010,23 @@ Status ReactiveVersionSet::Recover( Version* v = new Version(cfd, this, file_options_, *cfd->GetLatestMutableCFOptions(), current_version_number_++); - builder->SaveTo(v->storage_info()); + s = builder->SaveTo(v->storage_info()); - // Install recovered version - v->PrepareApply(*cfd->GetLatestMutableCFOptions(), - !(db_options_->skip_stats_update_on_db_open)); - AppendVersion(cfd, v); + if (s.ok()) { + // Install recovered version + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), + !(db_options_->skip_stats_update_on_db_open)); + AppendVersion(cfd, v); + } else { + ROCKS_LOG_ERROR(db_options_->info_log, + "[%s]: inconsistent version: %s\n", + cfd->GetName().c_str(), s.ToString().c_str()); + delete v; + break; + } } + } + if (s.ok()) { next_file_number_.store(version_edit.next_file_number_ + 1); last_allocated_sequence_ = version_edit.last_sequence_; last_published_sequence_ = version_edit.last_sequence_; @@ -6093,6 +6103,8 @@ Status ReactiveVersionSet::ReadAndApply( s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit); if (s.ok()) { applied_edits++; + } else { + break; } } } @@ -6102,13 +6114,14 @@ Status ReactiveVersionSet::ReadAndApply( } // It's possible that: // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted. + // Or the version(s) rebuilt from tailing the MANIFEST is inconsistent. // 2) we have finished reading the current MANIFEST. // 3) we have encountered an IOError reading the current MANIFEST. // We need to look for the next MANIFEST and start from there. If we cannot // find the next MANIFEST, we should exit the loop. - s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader); + Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader); reader = manifest_reader->get(); - if (s.ok()) { + if (tmp_s.ok()) { if (reader->file()->file_name() == old_manifest_path) { // Still processing the same MANIFEST, thus no need to continue this // loop since no record is available if we have reached here. @@ -6138,6 +6151,7 @@ Status ReactiveVersionSet::ReadAndApply( number_of_edits_to_skip_ += 2; } } + s = tmp_s; } } } @@ -6230,12 +6244,16 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( auto version = new Version(cfd, this, file_options_, *cfd->GetLatestMutableCFOptions(), current_version_number_++); - builder->SaveTo(version->storage_info()); - version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); - AppendVersion(cfd, version); - active_version_builders_.erase(builder_iter); - if (cfds_changed->count(cfd) == 0) { - cfds_changed->insert(cfd); + s = builder->SaveTo(version->storage_info()); + if (s.ok()) { + version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); + AppendVersion(cfd, version); + active_version_builders_.erase(builder_iter); + if (cfds_changed->count(cfd) == 0) { + cfds_changed->insert(cfd); + } + } else { + delete version; } } else if (s.IsPathNotFound()) { s = Status::OK(); @@ -6243,23 +6261,25 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( // Some other error has occurred during LoadTableHandlers. } - if (version_edit->HasNextFile()) { - next_file_number_.store(version_edit->next_file_number_ + 1); - } - if (version_edit->has_last_sequence_) { - last_allocated_sequence_ = version_edit->last_sequence_; - last_published_sequence_ = version_edit->last_sequence_; - last_sequence_ = version_edit->last_sequence_; - } - if (version_edit->has_prev_log_number_) { - prev_log_number_ = version_edit->prev_log_number_; - MarkFileNumberUsed(version_edit->prev_log_number_); - } - if (version_edit->has_log_number_) { - MarkFileNumberUsed(version_edit->log_number_); + if (s.ok()) { + if (version_edit->HasNextFile()) { + next_file_number_.store(version_edit->next_file_number_ + 1); + } + if (version_edit->has_last_sequence_) { + last_allocated_sequence_ = version_edit->last_sequence_; + last_published_sequence_ = version_edit->last_sequence_; + last_sequence_ = version_edit->last_sequence_; + } + if (version_edit->has_prev_log_number_) { + prev_log_number_ = version_edit->prev_log_number_; + MarkFileNumberUsed(version_edit->prev_log_number_); + } + if (version_edit->has_log_number_) { + MarkFileNumberUsed(version_edit->log_number_); + } + column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_); + MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_); } - column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_); - MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_); return s; } diff --git a/db/version_set_test.cc b/db/version_set_test.cc index c335589e3..1607ddae1 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1448,7 +1448,7 @@ TEST_F(VersionSetAtomicGroupTest, // Write the corrupted edits. AddNewEditsToLog(kAtomicGroupSize); mu.Lock(); - EXPECT_OK( + EXPECT_NOK( reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); mu.Unlock(); EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), @@ -1498,7 +1498,7 @@ TEST_F(VersionSetAtomicGroupTest, &manifest_reader_status)); AddNewEditsToLog(kAtomicGroupSize); mu.Lock(); - EXPECT_OK( + EXPECT_NOK( reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); mu.Unlock(); EXPECT_EQ(edits_[1].DebugString(),