Enforce status check for corruption_test (#7453)

Summary:
Enforce status check for corruption_test.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7453

Test Plan:
```
ASSERT_STATUS_CHECKED=1 make corruption_test
./corruption_test
```

Reviewed By: jay-zhuang

Differential Revision: D24006862

Pulled By: riversand963

fbshipit-source-id: 664677caf4c3007a25cf565cec3d677f2dcea130
main
Yanqin Jin 4 years ago committed by Facebook GitHub Bot
parent 7d503e66a9
commit 758ead5df7
  1. 2
      Makefile
  2. 12
      db/builder.cc
  3. 58
      db/corruption_test.cc
  4. 2
      db/db_impl/db_impl_open.cc
  5. 6
      db/db_impl/db_impl_write.cc
  6. 12
      db/version_builder.cc
  7. 13
      db/version_set.cc
  8. 10
      db/write_thread.cc
  9. 2
      db/write_thread.h
  10. 3
      table/table_factory.cc

@ -659,6 +659,8 @@ ifdef ASSERT_STATUS_CHECKED
full_filter_block_test \
partitioned_filter_block_test \
column_family_test \
file_reader_writer_test \
corruption_test \
ifeq ($(USE_FOLLY_DISTRIBUTED_MUTEX),1)
TESTS_PASSING_ASC += folly_synchronization_distributed_mutex_test

@ -207,6 +207,11 @@ Status BuildTable(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
}
}
if (!s.ok()) {
c_iter.status().PermitUncheckedError();
} else if (!c_iter.status().ok()) {
s = c_iter.status();
}
if (s.ok()) {
auto range_del_it = range_del_agg->NewIterator();
for (range_del_it->SeekToFirst(); range_del_it->Valid();
@ -218,13 +223,8 @@ Status BuildTable(
tombstone.seq_, internal_comparator);
}
// Finish and check for builder errors
s = c_iter.status();
if (blob_file_builder) {
if (s.ok()) {
s = blob_file_builder->Finish();
}
s = blob_file_builder->Finish();
}
}

@ -56,7 +56,8 @@ class CorruptionTest : public testing::Test {
options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
options_.env = &env_;
dbname_ = test::PerThreadDBPath("corruption_test");
DestroyDB(dbname_, options_);
Status s = DestroyDB(dbname_, options_);
EXPECT_OK(s);
db_ = nullptr;
options_.create_if_missing = true;
@ -111,12 +112,12 @@ class CorruptionTest : public testing::Test {
for (int i = 0; i < n; i++) {
if (flush_every != 0 && i != 0 && i % flush_every == 0) {
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
dbi->TEST_FlushMemTable();
ASSERT_OK(dbi->TEST_FlushMemTable());
}
//if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
Slice key = Key(i, &key_space);
batch.Clear();
batch.Put(key, Value(i, &value_space));
ASSERT_OK(batch.Put(key, Value(i, &value_space)));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
}
}
@ -135,6 +136,7 @@ class CorruptionTest : public testing::Test {
// occurred.
Iterator* iter = db_->NewIterator(ReadOptions(false, true));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
uint64_t key;
Slice in(iter->key());
if (!ConsumeDecimalNumber(&in, &key) ||
@ -151,6 +153,7 @@ class CorruptionTest : public testing::Test {
correct++;
}
}
iter->status().PermitUncheckedError();
delete iter;
fprintf(stderr,
@ -270,7 +273,7 @@ TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
bool failed = false;
for (int i = 0; i < num; i++) {
WriteBatch batch;
batch.Put("a", Value(100, &value_storage));
ASSERT_OK(batch.Put("a", Value(100, &value_storage)));
s = db_->Write(WriteOptions(), &batch);
if (!s.ok()) {
failed = true;
@ -286,9 +289,9 @@ TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
TEST_F(CorruptionTest, TableFile) {
Build(100);
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
dbi->TEST_FlushMemTable();
dbi->TEST_CompactRange(0, nullptr, nullptr);
dbi->TEST_CompactRange(1, nullptr, nullptr);
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
Corrupt(kTableFile, 100, 1);
Check(99, 99);
@ -309,9 +312,9 @@ TEST_F(CorruptionTest, VerifyChecksumReadahead) {
Build(10000);
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
dbi->TEST_FlushMemTable();
dbi->TEST_CompactRange(0, nullptr, nullptr);
dbi->TEST_CompactRange(1, nullptr, nullptr);
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
senv.count_random_reads_ = true;
senv.random_read_counter_.Reset();
@ -356,7 +359,7 @@ TEST_F(CorruptionTest, TableFileIndexData) {
// build 2 tables, flush at 5000
Build(10000, 5000);
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
dbi->TEST_FlushMemTable();
ASSERT_OK(dbi->TEST_FlushMemTable());
// corrupt an index block of an entire file
Corrupt(kTableFile, -2000, 500);
@ -403,8 +406,8 @@ TEST_F(CorruptionTest, SequenceNumberRecovery) {
TEST_F(CorruptionTest, CorruptedDescriptor) {
ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
dbi->TEST_FlushMemTable();
dbi->TEST_CompactRange(0, nullptr, nullptr);
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
Corrupt(kDescriptorFile, 0, 1000);
Status s = TryReopen();
@ -422,9 +425,9 @@ TEST_F(CorruptionTest, CompactionInputError) {
Reopen(&options);
Build(10);
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
dbi->TEST_FlushMemTable();
dbi->TEST_CompactRange(0, nullptr, nullptr);
dbi->TEST_CompactRange(1, nullptr, nullptr);
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
ASSERT_EQ(1, Property("rocksdb.num-files-at-level2"));
Corrupt(kTableFile, 100, 1);
@ -447,12 +450,12 @@ TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
// Fill levels >= 1
for (int level = 1; level < dbi->NumberLevels(); level++) {
dbi->Put(WriteOptions(), "", "begin");
dbi->Put(WriteOptions(), "~", "end");
dbi->TEST_FlushMemTable();
ASSERT_OK(dbi->Put(WriteOptions(), "", "begin"));
ASSERT_OK(dbi->Put(WriteOptions(), "~", "end"));
ASSERT_OK(dbi->TEST_FlushMemTable());
for (int comp_level = 0; comp_level < dbi->NumberLevels() - level;
++comp_level) {
dbi->TEST_CompactRange(comp_level, nullptr, nullptr);
ASSERT_OK(dbi->TEST_CompactRange(comp_level, nullptr, nullptr));
}
}
@ -460,8 +463,8 @@ TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
dbi = static_cast_with_check<DBImpl>(db_);
Build(10);
dbi->TEST_FlushMemTable();
dbi->TEST_WaitForCompact();
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_WaitForCompact());
ASSERT_EQ(1, Property("rocksdb.num-files-at-level0"));
CorruptTableFileAtLevel(0, 100, 1);
@ -486,7 +489,7 @@ TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
TEST_F(CorruptionTest, UnrelatedKeys) {
Build(10);
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
dbi->TEST_FlushMemTable();
ASSERT_OK(dbi->TEST_FlushMemTable());
Corrupt(kTableFile, 100, 1);
ASSERT_NOK(dbi->VerifyChecksum());
@ -495,7 +498,7 @@ TEST_F(CorruptionTest, UnrelatedKeys) {
std::string v;
ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
dbi->TEST_FlushMemTable();
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
}
@ -548,17 +551,17 @@ TEST_F(CorruptionTest, FileSystemStateCorrupted) {
if (iter == 0) { // corrupt file size
std::unique_ptr<WritableFile> file;
env_.NewWritableFile(filename, &file, EnvOptions());
file->Append(Slice("corrupted sst"));
ASSERT_OK(file->Append(Slice("corrupted sst")));
file.reset();
Status x = TryReopen(&options);
ASSERT_TRUE(x.IsCorruption());
} else { // delete the file
env_.DeleteFile(filename);
ASSERT_OK(env_.DeleteFile(filename));
Status x = TryReopen(&options);
ASSERT_TRUE(x.IsPathNotFound());
}
DestroyDB(dbname_, options_);
ASSERT_OK(DestroyDB(dbname_, options_));
}
}
@ -577,6 +580,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) {
delete db_;
db_ = nullptr;
s = DestroyDB(dbname_, options);
ASSERT_OK(s);
std::shared_ptr<mock::MockTableFactory> mock =
std::make_shared<mock::MockTableFactory>();
options.table_factory = mock;

@ -1678,6 +1678,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
*dbptr = impl;
impl->opened_successfully_ = true;
impl->MaybeScheduleFlushOrCompaction();
} else {
persist_options_status.PermitUncheckedError();
}
impl->mutex_.Unlock();

@ -999,8 +999,10 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
merged_batch = tmp_batch;
for (auto writer : write_group) {
if (!writer->CallbackFailed()) {
WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true);
Status s = WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true);
// Always returns Status::OK.
assert(s.ok());
if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
// We only need to cache the last of such write batch
*to_be_cached_state = writer->batch;

@ -979,17 +979,15 @@ class VersionBuilder::Rep {
for (auto& t : threads) {
t.join();
}
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
for (const auto& s : statuses) {
s.PermitUncheckedError();
}
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
Status ret;
for (const auto& s : statuses) {
if (!s.ok()) {
return s;
if (ret.ok()) {
ret = s;
}
}
}
return Status::OK();
return ret;
}
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {

@ -4132,10 +4132,15 @@ Status VersionSet::ProcessManifestWrites(
ROCKS_LOG_INFO(db_options_->info_log,
"Deleting manifest %" PRIu64 " current manifest %" PRIu64
"\n",
manifest_file_number_, pending_manifest_file_number_);
env_->DeleteFile(
DescriptorFileName(dbname_, pending_manifest_file_number_))
.PermitUncheckedError();
pending_manifest_file_number_, manifest_file_number_);
Status manifest_del_status = env_->DeleteFile(
DescriptorFileName(dbname_, pending_manifest_file_number_));
if (!manifest_del_status.ok()) {
ROCKS_LOG_WARN(db_options_->info_log,
"Failed to delete manifest %" PRIu64 ": %s",
pending_manifest_file_number_,
manifest_del_status.ToString().c_str());
}
}
}

@ -602,6 +602,8 @@ bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
}
// else we're the last parallel worker and should perform exit duties.
w->status = write_group->status;
// Callers of this function must ensure w->status is checked.
write_group->status.PermitUncheckedError();
return true;
}
@ -618,11 +620,17 @@ void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
Status status) {
Status& status) {
Writer* leader = write_group.leader;
Writer* last_writer = write_group.last_writer;
assert(leader->link_older == nullptr);
// If status is non-ok already, then write_group.status won't have the chance
// of being propagated to caller.
if (!status.ok()) {
write_group.status.PermitUncheckedError();
}
// Propagate memtable write error to the whole group.
if (status.ok() && !write_group.status.ok()) {
status = write_group.status;

@ -292,7 +292,7 @@ class WriteThread {
//
// WriteGroup* write_group: the write group
// Status status: Status of write operation
void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status);
void ExitAsBatchGroupLeader(WriteGroup& write_group, Status& status);
// Exit batch group on behalf of batch group leader.
void ExitAsBatchGroupFollower(Writer* w);

@ -37,7 +37,8 @@ Status TableFactory::CreateFromString(const ConfigOptions& config_options_in,
factory->reset(new CuckooTableFactory());
#endif // ROCKSDB_LITE
} else {
return Status::NotSupported("Could not load table factory: ", name);
status = Status::NotSupported("Could not load table factory: ", name);
return status;
}
if (!existing_opts.empty()) {
config_options.invoke_prepare_options = false;

Loading…
Cancel
Save