From 758ead5df771d8ac2972c9f2c4d6074c97272151 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 2 Oct 2020 22:09:28 -0700 Subject: [PATCH] Enforce status check for corruption_test (#7453) Summary: Enforce status check for corruption_test. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7453 Test Plan: ``` ASSERT_STATUS_CHECKED=1 make corruption_test ./corruption_test ``` Reviewed By: jay-zhuang Differential Revision: D24006862 Pulled By: riversand963 fbshipit-source-id: 664677caf4c3007a25cf565cec3d677f2dcea130 --- Makefile | 2 ++ db/builder.cc | 12 ++++---- db/corruption_test.cc | 58 ++++++++++++++++++++----------------- db/db_impl/db_impl_open.cc | 2 ++ db/db_impl/db_impl_write.cc | 6 ++-- db/version_builder.cc | 12 ++++---- db/version_set.cc | 13 ++++++--- db/write_thread.cc | 10 ++++++- db/write_thread.h | 2 +- table/table_factory.cc | 3 +- 10 files changed, 71 insertions(+), 49 deletions(-) diff --git a/Makefile b/Makefile index aacdd4ca7..d4e624a69 100644 --- a/Makefile +++ b/Makefile @@ -659,6 +659,8 @@ ifdef ASSERT_STATUS_CHECKED full_filter_block_test \ partitioned_filter_block_test \ column_family_test \ + file_reader_writer_test \ + corruption_test \ ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1) TESTS_PASSING_ASC += folly_synchronization_distributed_mutex_test diff --git a/db/builder.cc b/db/builder.cc index fe73b5ef2..90cfbbffb 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -207,6 +207,11 @@ Status BuildTable( ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); } } + if (!s.ok()) { + c_iter.status().PermitUncheckedError(); + } else if (!c_iter.status().ok()) { + s = c_iter.status(); + } if (s.ok()) { auto range_del_it = range_del_agg->NewIterator(); for (range_del_it->SeekToFirst(); range_del_it->Valid(); @@ -218,13 +223,8 @@ Status BuildTable( tombstone.seq_, internal_comparator); } - // Finish and check for builder errors - s = c_iter.status(); - if (blob_file_builder) { - if (s.ok()) { - s = blob_file_builder->Finish(); - } + s = blob_file_builder->Finish(); } } diff --git a/db/corruption_test.cc b/db/corruption_test.cc index bcce7e0ee..3ba150de7 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -56,7 +56,8 @@ class CorruptionTest : public testing::Test { options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords; options_.env = &env_; dbname_ = test::PerThreadDBPath("corruption_test"); - DestroyDB(dbname_, options_); + Status s = DestroyDB(dbname_, options_); + EXPECT_OK(s); db_ = nullptr; options_.create_if_missing = true; @@ -111,12 +112,12 @@ class CorruptionTest : public testing::Test { for (int i = 0; i < n; i++) { if (flush_every != 0 && i != 0 && i % flush_every == 0) { DBImpl* dbi = static_cast_with_check(db_); - dbi->TEST_FlushMemTable(); + ASSERT_OK(dbi->TEST_FlushMemTable()); } //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n); Slice key = Key(i, &key_space); batch.Clear(); - batch.Put(key, Value(i, &value_space)); + ASSERT_OK(batch.Put(key, Value(i, &value_space))); ASSERT_OK(db_->Write(WriteOptions(), &batch)); } } @@ -135,6 +136,7 @@ class CorruptionTest : public testing::Test { // occurred. Iterator* iter = db_->NewIterator(ReadOptions(false, true)); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); uint64_t key; Slice in(iter->key()); if (!ConsumeDecimalNumber(&in, &key) || @@ -151,6 +153,7 @@ class CorruptionTest : public testing::Test { correct++; } } + iter->status().PermitUncheckedError(); delete iter; fprintf(stderr, @@ -270,7 +273,7 @@ TEST_F(CorruptionTest, NewFileErrorDuringWrite) { bool failed = false; for (int i = 0; i < num; i++) { WriteBatch batch; - batch.Put("a", Value(100, &value_storage)); + ASSERT_OK(batch.Put("a", Value(100, &value_storage))); s = db_->Write(WriteOptions(), &batch); if (!s.ok()) { failed = true; @@ -286,9 +289,9 @@ TEST_F(CorruptionTest, NewFileErrorDuringWrite) { TEST_F(CorruptionTest, TableFile) { Build(100); DBImpl* dbi = static_cast_with_check(db_); - dbi->TEST_FlushMemTable(); - dbi->TEST_CompactRange(0, nullptr, nullptr); - dbi->TEST_CompactRange(1, nullptr, nullptr); + ASSERT_OK(dbi->TEST_FlushMemTable()); + ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr)); + ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr)); Corrupt(kTableFile, 100, 1); Check(99, 99); @@ -309,9 +312,9 @@ TEST_F(CorruptionTest, VerifyChecksumReadahead) { Build(10000); DBImpl* dbi = static_cast_with_check(db_); - dbi->TEST_FlushMemTable(); - dbi->TEST_CompactRange(0, nullptr, nullptr); - dbi->TEST_CompactRange(1, nullptr, nullptr); + ASSERT_OK(dbi->TEST_FlushMemTable()); + ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr)); + ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr)); senv.count_random_reads_ = true; senv.random_read_counter_.Reset(); @@ -356,7 +359,7 @@ TEST_F(CorruptionTest, TableFileIndexData) { // build 2 tables, flush at 5000 Build(10000, 5000); DBImpl* dbi = static_cast_with_check(db_); - dbi->TEST_FlushMemTable(); + ASSERT_OK(dbi->TEST_FlushMemTable()); // corrupt an index block of an entire file Corrupt(kTableFile, -2000, 500); @@ -403,8 +406,8 @@ TEST_F(CorruptionTest, SequenceNumberRecovery) { TEST_F(CorruptionTest, CorruptedDescriptor) { ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello")); DBImpl* dbi = static_cast_with_check(db_); - dbi->TEST_FlushMemTable(); - dbi->TEST_CompactRange(0, nullptr, nullptr); + ASSERT_OK(dbi->TEST_FlushMemTable()); + ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr)); Corrupt(kDescriptorFile, 0, 1000); Status s = TryReopen(); @@ -422,9 +425,9 @@ TEST_F(CorruptionTest, CompactionInputError) { Reopen(&options); Build(10); DBImpl* dbi = static_cast_with_check(db_); - dbi->TEST_FlushMemTable(); - dbi->TEST_CompactRange(0, nullptr, nullptr); - dbi->TEST_CompactRange(1, nullptr, nullptr); + ASSERT_OK(dbi->TEST_FlushMemTable()); + ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr)); + ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr)); ASSERT_EQ(1, Property("rocksdb.num-files-at-level2")); Corrupt(kTableFile, 100, 1); @@ -447,12 +450,12 @@ TEST_F(CorruptionTest, CompactionInputErrorParanoid) { // Fill levels >= 1 for (int level = 1; level < dbi->NumberLevels(); level++) { - dbi->Put(WriteOptions(), "", "begin"); - dbi->Put(WriteOptions(), "~", "end"); - dbi->TEST_FlushMemTable(); + ASSERT_OK(dbi->Put(WriteOptions(), "", "begin")); + ASSERT_OK(dbi->Put(WriteOptions(), "~", "end")); + ASSERT_OK(dbi->TEST_FlushMemTable()); for (int comp_level = 0; comp_level < dbi->NumberLevels() - level; ++comp_level) { - dbi->TEST_CompactRange(comp_level, nullptr, nullptr); + ASSERT_OK(dbi->TEST_CompactRange(comp_level, nullptr, nullptr)); } } @@ -460,8 +463,8 @@ TEST_F(CorruptionTest, CompactionInputErrorParanoid) { dbi = static_cast_with_check(db_); Build(10); - dbi->TEST_FlushMemTable(); - dbi->TEST_WaitForCompact(); + ASSERT_OK(dbi->TEST_FlushMemTable()); + ASSERT_OK(dbi->TEST_WaitForCompact()); ASSERT_EQ(1, Property("rocksdb.num-files-at-level0")); CorruptTableFileAtLevel(0, 100, 1); @@ -486,7 +489,7 @@ TEST_F(CorruptionTest, CompactionInputErrorParanoid) { TEST_F(CorruptionTest, UnrelatedKeys) { Build(10); DBImpl* dbi = static_cast_with_check(db_); - dbi->TEST_FlushMemTable(); + ASSERT_OK(dbi->TEST_FlushMemTable()); Corrupt(kTableFile, 100, 1); ASSERT_NOK(dbi->VerifyChecksum()); @@ -495,7 +498,7 @@ TEST_F(CorruptionTest, UnrelatedKeys) { std::string v; ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v)); ASSERT_EQ(Value(1000, &tmp2).ToString(), v); - dbi->TEST_FlushMemTable(); + ASSERT_OK(dbi->TEST_FlushMemTable()); ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v)); ASSERT_EQ(Value(1000, &tmp2).ToString(), v); } @@ -548,17 +551,17 @@ TEST_F(CorruptionTest, FileSystemStateCorrupted) { if (iter == 0) { // corrupt file size std::unique_ptr file; env_.NewWritableFile(filename, &file, EnvOptions()); - file->Append(Slice("corrupted sst")); + ASSERT_OK(file->Append(Slice("corrupted sst"))); file.reset(); Status x = TryReopen(&options); ASSERT_TRUE(x.IsCorruption()); } else { // delete the file - env_.DeleteFile(filename); + ASSERT_OK(env_.DeleteFile(filename)); Status x = TryReopen(&options); ASSERT_TRUE(x.IsPathNotFound()); } - DestroyDB(dbname_, options_); + ASSERT_OK(DestroyDB(dbname_, options_)); } } @@ -577,6 +580,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) { delete db_; db_ = nullptr; s = DestroyDB(dbname_, options); + ASSERT_OK(s); std::shared_ptr mock = std::make_shared(); options.table_factory = mock; diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index de43a0a8d..ed37632d1 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1678,6 +1678,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, *dbptr = impl; impl->opened_successfully_ = true; impl->MaybeScheduleFlushOrCompaction(); + } else { + persist_options_status.PermitUncheckedError(); } impl->mutex_.Unlock(); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index f120a0d9f..79427f6b0 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -999,8 +999,10 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, merged_batch = tmp_batch; for (auto writer : write_group) { if (!writer->CallbackFailed()) { - WriteBatchInternal::Append(merged_batch, writer->batch, - /*WAL_only*/ true); + Status s = WriteBatchInternal::Append(merged_batch, writer->batch, + /*WAL_only*/ true); + // Always returns Status::OK. + assert(s.ok()); if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) { // We only need to cache the last of such write batch *to_be_cached_state = writer->batch; diff --git a/db/version_builder.cc b/db/version_builder.cc index 2ede35824..49c35cf9f 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -979,17 +979,15 @@ class VersionBuilder::Rep { for (auto& t : threads) { t.join(); } -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - for (const auto& s : statuses) { - s.PermitUncheckedError(); - } -#endif // ROCKSDB_ASSERT_STATUS_CHECKED + Status ret; for (const auto& s : statuses) { if (!s.ok()) { - return s; + if (ret.ok()) { + ret = s; + } } } - return Status::OK(); + return ret; } void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) { diff --git a/db/version_set.cc b/db/version_set.cc index 7b0413943..1fadd929e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4132,10 +4132,15 @@ Status VersionSet::ProcessManifestWrites( ROCKS_LOG_INFO(db_options_->info_log, "Deleting manifest %" PRIu64 " current manifest %" PRIu64 "\n", - manifest_file_number_, pending_manifest_file_number_); - env_->DeleteFile( - DescriptorFileName(dbname_, pending_manifest_file_number_)) - .PermitUncheckedError(); + pending_manifest_file_number_, manifest_file_number_); + Status manifest_del_status = env_->DeleteFile( + DescriptorFileName(dbname_, pending_manifest_file_number_)); + if (!manifest_del_status.ok()) { + ROCKS_LOG_WARN(db_options_->info_log, + "Failed to delete manifest %" PRIu64 ": %s", + pending_manifest_file_number_, + manifest_del_status.ToString().c_str()); + } } } diff --git a/db/write_thread.cc b/db/write_thread.cc index c74b7b0ed..343534705 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -602,6 +602,8 @@ bool WriteThread::CompleteParallelMemTableWriter(Writer* w) { } // else we're the last parallel worker and should perform exit duties. w->status = write_group->status; + // Callers of this function must ensure w->status is checked. + write_group->status.PermitUncheckedError(); return true; } @@ -618,11 +620,17 @@ void WriteThread::ExitAsBatchGroupFollower(Writer* w) { static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader"); void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, - Status status) { + Status& status) { Writer* leader = write_group.leader; Writer* last_writer = write_group.last_writer; assert(leader->link_older == nullptr); + // If status is non-ok already, then write_group.status won't have the chance + // of being propagated to caller. + if (!status.ok()) { + write_group.status.PermitUncheckedError(); + } + // Propagate memtable write error to the whole group. if (status.ok() && !write_group.status.ok()) { status = write_group.status; diff --git a/db/write_thread.h b/db/write_thread.h index 8c2d431dc..9b6abd483 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -292,7 +292,7 @@ class WriteThread { // // WriteGroup* write_group: the write group // Status status: Status of write operation - void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status); + void ExitAsBatchGroupLeader(WriteGroup& write_group, Status& status); // Exit batch group on behalf of batch group leader. void ExitAsBatchGroupFollower(Writer* w); diff --git a/table/table_factory.cc b/table/table_factory.cc index ebf5cfaf7..18935c859 100644 --- a/table/table_factory.cc +++ b/table/table_factory.cc @@ -37,7 +37,8 @@ Status TableFactory::CreateFromString(const ConfigOptions& config_options_in, factory->reset(new CuckooTableFactory()); #endif // ROCKSDB_LITE } else { - return Status::NotSupported("Could not load table factory: ", name); + status = Status::NotSupported("Could not load table factory: ", name); + return status; } if (!existing_opts.empty()) { config_options.invoke_prepare_options = false;