diff --git a/HISTORY.md b/HISTORY.md index d9fbc10cf..dc8aa91e9 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,6 +12,9 @@ ### New Features * DB identity (`db_id`) and DB session identity (`db_session_id`) are added to table properties and stored in SST files. SST files generated from SstFileWriter and Repairer have DB identity “SST Writer” and “DB Repairer”, respectively. Their DB session IDs are generated in the same way as `DB::GetDbSessionId`. The session ID for SstFileWriter (resp., Repairer) resets every time `SstFileWriter::Open` (resp., `Repairer::Run`) is called. +### Bug Fixes +* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further. + ## 6.11 (6/12/2020) ### Bug Fixes * Fix consistency checking error swallowing in some cases when options.force_consistency_checks = true. diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 5d304fabf..cdef126aa 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -2318,6 +2318,33 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) { } #endif // !ROCKSDB_LITE +TEST_F(DBBasicTest, ManifestChecksumMismatch) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + ASSERT_OK(Put("bar", "value")); + ASSERT_OK(Flush()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) { + auto* crc = reinterpret_cast(arg); + *crc = *crc + 1; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + WriteOptions write_opts; + write_opts.disableWAL = true; + Status s = db_->Put(write_opts, "foo", "value"); + ASSERT_OK(s); + ASSERT_OK(Flush()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + ASSERT_OK(Put("foo", "value1")); + ASSERT_OK(Flush()); + s = TryReopen(options); + ASSERT_TRUE(s.IsCorruption()); +} + class DBBasicTestMultiGet : public DBTestBase { public: DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache, diff --git a/db/log_writer.cc b/db/log_writer.cc index 04d3f64cc..e290eae62 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -147,6 +147,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { // Compute the crc of the record type and the payload. crc = crc32c::Extend(crc, ptr, n); crc = crc32c::Mask(crc); // Adjust for storage + TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", + &crc); EncodeFixed32(buf, crc); // Write the header and the payload diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 0d7ec8bc5..888e94101 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -27,12 +27,17 @@ VersionEditHandler::VersionEditHandler( assert(version_set_ != nullptr); } -Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) { +void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status, + std::string* db_id) { Slice record; std::string scratch; + assert(log_read_status); + assert(log_read_status->ok()); + size_t recovered_edits = 0; Status s = Initialize(); - while (reader.ReadRecord(&record, &scratch) && s.ok()) { + while (s.ok() && reader.ReadRecord(&record, &scratch) && + log_read_status->ok()) { VersionEdit edit; s = edit.DecodeFrom(record); if (!s.ok()) { @@ -70,13 +75,15 @@ Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) { } } } + if (!log_read_status->ok()) { + s = *log_read_status; + } CheckIterationResult(reader, &s); if (!s.ok()) { status_ = s; } - return s; } Status VersionEditHandler::Initialize() { diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 7df940d6f..3c239bdf7 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -40,7 +40,8 @@ class VersionEditHandler { virtual ~VersionEditHandler() {} - Status Iterate(log::Reader& reader, std::string* db_id); + void Iterate(log::Reader& reader, Status* log_read_status, + std::string* db_id); const Status& status() const { return status_; } diff --git a/db/version_set.cc b/db/version_set.cc index 732fb175c..899e7125b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -10,6 +10,7 @@ #include "db/version_set.h" #include + #include #include #include @@ -19,6 +20,7 @@ #include #include #include + #include "compaction/compaction.h" #include "db/internal_stats.h" #include "db/log_reader.h" @@ -50,6 +52,7 @@ #include "table/table_reader.h" #include "table/two_level_iterator.h" #include "test_util/sync_point.h" +#include "util/cast_util.h" #include "util/coding.h" #include "util/stop_watch.h" #include "util/string_util.h" @@ -4444,24 +4447,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname, if (dbname.back() != '/') { manifest_path->push_back('/'); } - *manifest_path += fname; + manifest_path->append(fname); return Status::OK(); } Status VersionSet::ReadAndRecover( - log::Reader* reader, AtomicGroupReadBuffer* read_buffer, + log::Reader& reader, AtomicGroupReadBuffer* read_buffer, const std::unordered_map& name_to_options, std::unordered_map& column_families_not_found, std::unordered_map>& builders, - VersionEditParams* version_edit_params, std::string* db_id) { - assert(reader != nullptr); + Status* log_read_status, VersionEditParams* version_edit_params, + std::string* db_id) { assert(read_buffer != nullptr); + assert(log_read_status != nullptr); Status s; Slice record; std::string scratch; size_t recovered_edits = 0; - while (reader->ReadRecord(&record, &scratch) && s.ok()) { + while (s.ok() && reader.ReadRecord(&record, &scratch) && + log_read_status->ok()) { VersionEdit edit; s = edit.DecodeFrom(record); if (!s.ok()) { @@ -4505,6 +4510,9 @@ Status VersionSet::ReadAndRecover( } } } + if (!log_read_status->ok()) { + s = *log_read_status; + } if (!s.ok()) { // Clear the buffer if we fail to decode/apply an edit. read_buffer->Clear(); @@ -4551,8 +4559,7 @@ Status VersionSet::Recover( db_options_->log_readahead_size)); } - std::unordered_map> - builders; + VersionBuilderMap builders; // add default column family auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); @@ -4574,12 +4581,13 @@ Status VersionSet::Recover( VersionEditParams version_edit_params; { VersionSet::LogReporter reporter; - reporter.status = &s; + Status log_read_status; + reporter.status = &log_read_status; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, true /* checksum */, 0 /* log_number */); AtomicGroupReadBuffer read_buffer; - s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options, - column_families_not_found, builders, + s = ReadAndRecover(reader, &read_buffer, cf_name_to_options, + column_families_not_found, builders, &log_read_status, &version_edit_params, db_id); current_manifest_file_size = reader.GetReadOffset(); assert(current_manifest_file_size != 0); @@ -4845,21 +4853,20 @@ Status VersionSet::TryRecoverFromOneManifest( db_options_->log_readahead_size)); } + assert(s.ok()); VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, /*checksum=*/true, /*log_num=*/0); - { - VersionEditHandlerPointInTime handler_pit(read_only, column_families, - const_cast(this)); + VersionEditHandlerPointInTime handler_pit(read_only, column_families, + const_cast(this)); - s = handler_pit.Iterate(reader, db_id); + handler_pit.Iterate(reader, &s, db_id); - assert(nullptr != has_missing_table_file); - *has_missing_table_file = handler_pit.HasMissingFiles(); - } + assert(nullptr != has_missing_table_file); + *has_missing_table_file = handler_pit.HasMissingFiles(); - return s; + return handler_pit.status(); } Status VersionSet::ListColumnFamilies(std::vector* column_families, @@ -5980,8 +5987,7 @@ Status ReactiveVersionSet::Recover( // In recovery, nobody else can access it, so it's fine to set it to be // initialized earlier. default_cfd->set_initialized(); - std::unordered_map> - builders; + VersionBuilderMap builders; std::unordered_map column_families_not_found; builders.insert( std::make_pair(0, std::unique_ptr( @@ -5989,7 +5995,7 @@ Status ReactiveVersionSet::Recover( manifest_reader_status->reset(new Status()); manifest_reporter->reset(new LogReporter()); - static_cast(manifest_reporter->get())->status = + static_cast_with_check(manifest_reporter->get())->status = manifest_reader_status->get(); Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader); log::Reader* reader = manifest_reader->get(); @@ -5998,10 +6004,9 @@ Status ReactiveVersionSet::Recover( VersionEdit version_edit; while (s.ok() && retry < 1) { assert(reader != nullptr); - Slice record; - std::string scratch; - s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options, - column_families_not_found, builders, &version_edit); + s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options, + column_families_not_found, builders, + manifest_reader_status->get(), &version_edit); if (s.ok()) { bool enough = version_edit.has_next_file_number_ && version_edit.has_log_number_ && diff --git a/db/version_set.h b/db/version_set.h index bac1c50f8..d1766d0bf 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1165,6 +1165,10 @@ class VersionSet { void SetIOStatusOK() { io_status_ = IOStatus::OK(); } protected: + using VersionBuilderMap = + std::unordered_map>; + struct ManifestWriter; friend class Version; @@ -1176,7 +1180,9 @@ class VersionSet { struct LogReporter : public log::Reader::Reporter { Status* status; virtual void Corruption(size_t /*bytes*/, const Status& s) override { - if (this->status->ok()) *this->status = s; + if (status->ok()) { + *status = s; + } } }; @@ -1207,13 +1213,14 @@ class VersionSet { const VersionEdit* edit); Status ReadAndRecover( - log::Reader* reader, AtomicGroupReadBuffer* read_buffer, + log::Reader& reader, AtomicGroupReadBuffer* read_buffer, const std::unordered_map& name_to_options, std::unordered_map& column_families_not_found, std::unordered_map< uint32_t, std::unique_ptr>& builders, - VersionEditParams* version_edit, std::string* db_id = nullptr); + Status* log_read_status, VersionEditParams* version_edit, + std::string* db_id = nullptr); // REQUIRES db mutex Status ApplyOneVersionEditToBuilder( @@ -1342,8 +1349,7 @@ class ReactiveVersionSet : public VersionSet { std::unique_ptr* manifest_reader); private: - std::unordered_map> - active_version_builders_; + VersionBuilderMap active_version_builders_; AtomicGroupReadBuffer read_buffer_; // Number of version edits to skip by ReadAndApply at the beginning of a new // MANIFEST created by primary.