diff --git a/HISTORY.md b/HISTORY.md index da06ca228..9bb717423 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * Fix bloom filter lookups by the MultiGet batching API when BlockBasedTableOptions::whole_key_filtering is false, by checking that a key is in the perfix_extractor domain and extracting the prefix before looking up. ### New Features * VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting. +* When user uses options.force_consistency_check in RocksDb, instead of crashing the process, we now pass the error back to the users without killing the process. ### Public API Change * Added max_write_buffer_size_to_maintain option to better control memory usage of immutable memtables. diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 45ba17011..49307bc78 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -4658,7 +4658,31 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) { ASSERT_OK(dbfull()->TEST_WaitForCompact()); Close(); } +TEST_F(DBCompactionTest, ConsistencyFailTest) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "VersionBuilder::CheckConsistency", [&](void* arg) { + auto p = + reinterpret_cast*>(arg); + // just swap the two FileMetaData so that we hit error + // in CheckConsistency funcion + FileMetaData* temp = *(p->first); + *(p->first) = *(p->second); + *(p->second) = temp; + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + for (int k = 0; k < 2; ++k) { + ASSERT_OK(Put("foo", "bar")); + Flush(); + } + ASSERT_NOK(Put("foo", "bar")); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} #endif // !defined(ROCKSDB_LITE) } // namespace rocksdb diff --git a/db/version_builder.cc b/db/version_builder.cc index 9d2ba9ab4..b97853f2d 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -27,6 +27,7 @@ #include "db/version_set.h" #include "port/port.h" #include "table/table_reader.h" +#include "util/string_util.h" namespace rocksdb { @@ -138,12 +139,12 @@ class VersionBuilder::Rep { } } - void CheckConsistency(VersionStorageInfo* vstorage) { + Status CheckConsistency(VersionStorageInfo* vstorage) { #ifdef NDEBUG if (!vstorage->force_consistency_checks()) { // Dont run consistency checks in release mode except if // explicitly asked to - return; + return Status::OK(); } #endif // make sure the files are sorted correctly @@ -152,10 +153,14 @@ class VersionBuilder::Rep { for (size_t i = 1; i < level_files.size(); i++) { auto f1 = level_files[i - 1]; auto f2 = level_files[i]; +#ifndef NDEBUG + auto pair = std::make_pair(&f1, &f2); + TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency", &pair); +#endif if (level == 0) { if (!level_zero_cmp_(f1, f2)) { fprintf(stderr, "L0 files are not sorted properly"); - abort(); + return Status::Corruption("L0 files are not sorted properly"); } if (f2->fd.smallest_seqno == f2->fd.largest_seqno) { @@ -168,7 +173,14 @@ class VersionBuilder::Rep { " vs. file with global_seqno %" PRIu64 "\n", f1->fd.smallest_seqno, f1->fd.largest_seqno, external_file_seqno); - abort(); + return Status::Corruption("L0 file with seqno " + + NumberToString(f1->fd.smallest_seqno) + + " " + + NumberToString(f1->fd.largest_seqno) + + " vs. file with global_seqno" + + NumberToString(external_file_seqno) + + " with fileNumber " + + NumberToString(f1->fd.GetNumber())); } } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) { fprintf(stderr, @@ -176,12 +188,19 @@ class VersionBuilder::Rep { " %" PRIu64 "\n", f1->fd.smallest_seqno, f1->fd.largest_seqno, f2->fd.smallest_seqno, f2->fd.largest_seqno); - abort(); + return Status::Corruption( + "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) + + " " + NumberToString(f1->fd.largest_seqno) + " " + + NumberToString(f1->fd.GetNumber()) + " vs. " + + NumberToString(f2->fd.smallest_seqno) + " " + + NumberToString(f2->fd.largest_seqno) + " " + + NumberToString(f2->fd.GetNumber())); } } else { if (!level_nonzero_cmp_(f1, f2)) { fprintf(stderr, "L%d files are not sorted properly", level); - abort(); + return Status::Corruption("L" + NumberToString(level) + + " files are not sorted properly"); } // Make sure there is no overlap in levels > 0 @@ -190,20 +209,24 @@ class VersionBuilder::Rep { fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level, (f1->largest).DebugString(true).c_str(), (f2->smallest).DebugString(true).c_str()); - abort(); + return Status::Corruption( + "L" + NumberToString(level) + " have overlapping ranges " + + (f1->largest).DebugString(true) + " vs. " + + (f2->smallest).DebugString(true)); } } } } + return Status::OK(); } - void CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number, - int level) { + Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number, + int level) { #ifdef NDEBUG if (!base_vstorage_->force_consistency_checks()) { // Dont run consistency checks in release mode except if // explicitly asked to - return; + return Status::OK(); } #endif // a file to be deleted better exist in the previous version @@ -241,8 +264,9 @@ class VersionBuilder::Rep { } if (!found) { fprintf(stderr, "not found %" PRIu64 "\n", number); - abort(); + return Status::Corruption("not found " + NumberToString(number)); } + return Status::OK(); } bool CheckConsistencyForNumLevels() { @@ -259,8 +283,11 @@ class VersionBuilder::Rep { } // Apply all of the edits in *edit to the current state. - void Apply(VersionEdit* edit) { - CheckConsistency(base_vstorage_); + Status Apply(VersionEdit* edit) { + Status s = CheckConsistency(base_vstorage_); + if (!s.ok()) { + return s; + } // Delete files const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles(); @@ -308,12 +335,20 @@ class VersionBuilder::Rep { } } } + return s; } // Save the current state in *v. - void SaveTo(VersionStorageInfo* vstorage) { - CheckConsistency(base_vstorage_); - CheckConsistency(vstorage); + Status SaveTo(VersionStorageInfo* vstorage) { + Status s = CheckConsistency(base_vstorage_); + if (!s.ok()) { + return s; + } + + s = CheckConsistency(vstorage); + if (!s.ok()) { + return s; + } for (int level = 0; level < num_levels_; level++) { const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_; @@ -357,7 +392,8 @@ class VersionBuilder::Rep { } } - CheckConsistency(vstorage); + s = CheckConsistency(vstorage); + return s; } Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, @@ -475,23 +511,23 @@ VersionBuilder::VersionBuilder(const EnvOptions& env_options, VersionBuilder::~VersionBuilder() { delete rep_; } -void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) { - rep_->CheckConsistency(vstorage); +Status VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) { + return rep_->CheckConsistency(vstorage); } -void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit, - uint64_t number, int level) { - rep_->CheckConsistencyForDeletes(edit, number, level); +Status VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit, + uint64_t number, int level) { + return rep_->CheckConsistencyForDeletes(edit, number, level); } bool VersionBuilder::CheckConsistencyForNumLevels() { return rep_->CheckConsistencyForNumLevels(); } -void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); } +Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); } -void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { - rep_->SaveTo(vstorage); +Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { + return rep_->SaveTo(vstorage); } Status VersionBuilder::LoadTableHandlers( diff --git a/db/version_builder.h b/db/version_builder.h index 168301fdd..f5fd12189 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -27,12 +27,12 @@ class VersionBuilder { VersionBuilder(const EnvOptions& env_options, TableCache* table_cache, VersionStorageInfo* base_vstorage, Logger* info_log = nullptr); ~VersionBuilder(); - void CheckConsistency(VersionStorageInfo* vstorage); - void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number, - int level); + Status CheckConsistency(VersionStorageInfo* vstorage); + Status CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number, + int level); bool CheckConsistencyForNumLevels(); - void Apply(VersionEdit* edit); - void SaveTo(VersionStorageInfo* vstorage); + Status Apply(VersionEdit* edit); + Status SaveTo(VersionStorageInfo* vstorage); Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, bool prefetch_index_and_filter_in_cache, bool is_initial_load, diff --git a/db/version_set.cc b/db/version_set.cc index ea6b0ab6a..e7d334825 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3622,7 +3622,14 @@ Status VersionSet::ProcessManifestWrites( } else if (group_start != std::numeric_limits::max()) { group_start = std::numeric_limits::max(); } - LogAndApplyHelper(last_writer->cfd, builder, e, mu); + Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu); + if (!s.ok()) { + // free up the allocated memory + for (auto v : versions) { + delete v; + } + return s; + } batch_edits.push_back(e); } } @@ -3630,7 +3637,14 @@ Status VersionSet::ProcessManifestWrites( assert(!builder_guards.empty() && builder_guards.size() == versions.size()); auto* builder = builder_guards[i]->version_builder(); - builder->SaveTo(versions[i]->storage_info()); + Status s = builder->SaveTo(versions[i]->storage_info()); + if (!s.ok()) { + // free up the allocated memory + for (auto v : versions) { + delete v; + } + return s; + } } } @@ -4010,9 +4024,9 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { } } -void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, - VersionBuilder* builder, VersionEdit* edit, - InstrumentedMutex* mu) { +Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, + VersionBuilder* builder, VersionEdit* edit, + InstrumentedMutex* mu) { #ifdef NDEBUG (void)cfd; #endif @@ -4036,7 +4050,9 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_ : last_sequence_); - builder->Apply(edit); + Status s = builder->Apply(edit); + + return s; } Status VersionSet::ApplyOneVersionEditToBuilder( @@ -4129,7 +4145,10 @@ Status VersionSet::ApplyOneVersionEditToBuilder( // to builder auto builder = builders.find(edit.column_family_); assert(builder != builders.end()); - builder->second->version_builder()->Apply(&edit); + Status s = builder->second->version_builder()->Apply(&edit); + if (!s.ok()) { + return s; + } } return ExtractInfoFromVersionEdit( cfd, edit, have_log_number, log_number, have_prev_log_number, @@ -4748,7 +4767,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, // to builder auto builder = builders.find(edit.column_family_); assert(builder != builders.end()); - builder->second->version_builder()->Apply(&edit); + s = builder->second->version_builder()->Apply(&edit); + if (!s.ok()) { + break; + } } if (cfd != nullptr && edit.has_log_number_) { @@ -5767,7 +5789,10 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( } active_version_builders_.erase(builder_iter); } else { - builder->Apply(&edit); + Status s = builder->Apply(&edit); + if (!s.ok()) { + return s; + } } Status s = ExtractInfoFromVersionEdit( cfd, edit, have_log_number, log_number, have_prev_log_number, diff --git a/db/version_set.h b/db/version_set.h index 028c6ad1b..766e071f4 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1154,8 +1154,8 @@ class VersionSet { const ColumnFamilyOptions* new_cf_options); void LogAndApplyCFHelper(VersionEdit* edit); - void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, - VersionEdit* edit, InstrumentedMutex* mu); + Status LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, + VersionEdit* edit, InstrumentedMutex* mu); }; // ReactiveVersionSet represents a collection of versions of the column