Do not swallow error returned from SaveTo() (#6801)

Summary:
With consistency check enabled, VersionBuilder::SaveTo() may return error once
corruption is detected while building versions. We should handle these errors.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6801

Test Plan: make check

Reviewed By: siying

Differential Revision: D21385045

Pulled By: riversand963

fbshipit-source-id: 98f6424e2a4699b62befa21e9fe00e70a771118e
main
Yanqin Jin 4 years ago committed by Facebook GitHub Bot
parent 5a61e7864d
commit 5584595f80
  1. 22
      db/db_basic_test.cc
  2. 58
      db/db_impl/db_secondary_test.cc
  3. 40
      db/version_edit_handler.cc
  4. 78
      db/version_set.cc
  5. 4
      db/version_set_test.cc

@ -1774,6 +1774,28 @@ TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
}
}
TEST_F(DBBasicTest, BestEffortsRecoveryWithVersionBuildingFailure) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
ASSERT_NE(nullptr, arg);
*(reinterpret_cast<Status*>(arg)) =
Status::Corruption("Inject corruption");
});
SyncPoint::GetInstance()->EnableProcessing();
options.best_efforts_recovery = true;
Status s = TryReopen(options);
ASSERT_TRUE(s.IsCorruption());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
#ifndef ROCKSDB_LITE
namespace {
class TableFileListener : public EventListener {

@ -45,6 +45,8 @@ class DBSecondaryTest : public DBTestBase {
void OpenSecondary(const Options& options);
Status TryOpenSecondary(const Options& options);
void OpenSecondaryWithColumnFamilies(
const std::vector<std::string>& column_families, const Options& options);
@ -70,9 +72,13 @@ class DBSecondaryTest : public DBTestBase {
};
void DBSecondaryTest::OpenSecondary(const Options& options) {
ASSERT_OK(TryOpenSecondary(options));
}
Status DBSecondaryTest::TryOpenSecondary(const Options& options) {
Status s =
DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
ASSERT_OK(s);
return s;
}
void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
@ -858,6 +864,56 @@ TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) {
thread.join();
ASSERT_TRUE(called);
}
TEST_F(DBSecondaryTest, StartFromInconsistent) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
ASSERT_NE(nullptr, arg);
*(reinterpret_cast<Status*>(arg)) =
Status::Corruption("Inject corruption");
});
SyncPoint::GetInstance()->EnableProcessing();
Options options1;
Status s = TryOpenSecondary(options1);
ASSERT_TRUE(s.IsCorruption());
}
TEST_F(DBSecondaryTest, InconsistencyDuringCatchUp) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
Options options1;
OpenSecondary(options1);
{
std::string value;
ASSERT_OK(db_secondary_->Get(ReadOptions(), "foo", &value));
ASSERT_EQ("value", value);
}
ASSERT_OK(Put("bar", "value1"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
ASSERT_NE(nullptr, arg);
*(reinterpret_cast<Status*>(arg)) =
Status::Corruption("Inject corruption");
});
SyncPoint::GetInstance()->EnableProcessing();
Status s = db_secondary_->TryCatchUpWithPrimary();
ASSERT_TRUE(s.IsCorruption());
}
#endif //! ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

@ -377,6 +377,7 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
ColumnFamilyData* cfd,
bool force_create_version) {
assert(cfd->initialized());
Status s;
if (force_create_version) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
@ -384,13 +385,18 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
auto* v = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
version_set_->current_version_number_++);
builder->SaveTo(v->storage_info());
// Install new version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(version_set_->db_options_->skip_stats_update_on_db_open));
version_set_->AppendVersion(cfd, v);
s = builder->SaveTo(v->storage_info());
if (s.ok()) {
// Install new version
v->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!(version_set_->db_options_->skip_stats_update_on_db_open));
version_set_->AppendVersion(cfd, v);
} else {
delete v;
}
}
return Status::OK();
return s;
}
Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
@ -558,16 +564,20 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
auto* version = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
version_set_->current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
s = builder->SaveTo(version->storage_info());
if (s.ok()) {
version->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
} else {
versions_.emplace(cfd->GetID(), version);
}
} else {
versions_.emplace(cfd->GetID(), version);
delete version;
}
}
return s;

@ -6010,13 +6010,23 @@ Status ReactiveVersionSet::Recover(
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
s = builder->SaveTo(v->storage_info());
// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
if (s.ok()) {
// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
} else {
ROCKS_LOG_ERROR(db_options_->info_log,
"[%s]: inconsistent version: %s\n",
cfd->GetName().c_str(), s.ToString().c_str());
delete v;
break;
}
}
}
if (s.ok()) {
next_file_number_.store(version_edit.next_file_number_ + 1);
last_allocated_sequence_ = version_edit.last_sequence_;
last_published_sequence_ = version_edit.last_sequence_;
@ -6093,6 +6103,8 @@ Status ReactiveVersionSet::ReadAndApply(
s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
if (s.ok()) {
applied_edits++;
} else {
break;
}
}
}
@ -6102,13 +6114,14 @@ Status ReactiveVersionSet::ReadAndApply(
}
// It's possible that:
// 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
// Or the version(s) rebuilt from tailing the MANIFEST is inconsistent.
// 2) we have finished reading the current MANIFEST.
// 3) we have encountered an IOError reading the current MANIFEST.
// We need to look for the next MANIFEST and start from there. If we cannot
// find the next MANIFEST, we should exit the loop.
s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
reader = manifest_reader->get();
if (s.ok()) {
if (tmp_s.ok()) {
if (reader->file()->file_name() == old_manifest_path) {
// Still processing the same MANIFEST, thus no need to continue this
// loop since no record is available if we have reached here.
@ -6138,6 +6151,7 @@ Status ReactiveVersionSet::ReadAndApply(
number_of_edits_to_skip_ += 2;
}
}
s = tmp_s;
}
}
}
@ -6230,12 +6244,16 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
auto version = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
AppendVersion(cfd, version);
active_version_builders_.erase(builder_iter);
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
s = builder->SaveTo(version->storage_info());
if (s.ok()) {
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
AppendVersion(cfd, version);
active_version_builders_.erase(builder_iter);
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
}
} else {
delete version;
}
} else if (s.IsPathNotFound()) {
s = Status::OK();
@ -6243,23 +6261,25 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
// Some other error has occurred during LoadTableHandlers.
}
if (version_edit->HasNextFile()) {
next_file_number_.store(version_edit->next_file_number_ + 1);
}
if (version_edit->has_last_sequence_) {
last_allocated_sequence_ = version_edit->last_sequence_;
last_published_sequence_ = version_edit->last_sequence_;
last_sequence_ = version_edit->last_sequence_;
}
if (version_edit->has_prev_log_number_) {
prev_log_number_ = version_edit->prev_log_number_;
MarkFileNumberUsed(version_edit->prev_log_number_);
}
if (version_edit->has_log_number_) {
MarkFileNumberUsed(version_edit->log_number_);
if (s.ok()) {
if (version_edit->HasNextFile()) {
next_file_number_.store(version_edit->next_file_number_ + 1);
}
if (version_edit->has_last_sequence_) {
last_allocated_sequence_ = version_edit->last_sequence_;
last_published_sequence_ = version_edit->last_sequence_;
last_sequence_ = version_edit->last_sequence_;
}
if (version_edit->has_prev_log_number_) {
prev_log_number_ = version_edit->prev_log_number_;
MarkFileNumberUsed(version_edit->prev_log_number_);
}
if (version_edit->has_log_number_) {
MarkFileNumberUsed(version_edit->log_number_);
}
column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
}
column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
return s;
}

@ -1448,7 +1448,7 @@ TEST_F(VersionSetAtomicGroupTest,
// Write the corrupted edits.
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_OK(
EXPECT_NOK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
@ -1498,7 +1498,7 @@ TEST_F(VersionSetAtomicGroupTest,
&manifest_reader_status));
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_OK(
EXPECT_NOK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_EQ(edits_[1].DebugString(),

Loading…
Cancel
Save