From b860a4215893c5862c7df1bbe14c30fc50af9b28 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Wed, 5 Jan 2022 16:00:41 -0800 Subject: [PATCH] Recover to exact latest seqno of data committed to MANIFEST (#9305) Summary: The LastSequence field in the MANIFEST file is the baseline seqno for a recovered DB. Recovering WAL entries might cause the recovered DB's seqno to advance above this baseline, but the recovered DB will never use a smaller seqno. Before this PR, we were writing the DB's seqno at the time of LogAndApply() as the LastSequence value. This works in the sense that it is a large enough baseline for the recovered DB that it'll never overwrite any records in existing SST files. At the same time, it's arbitrarily larger than what's needed. This behavior comes from LevelDB, where there was no tracking of largest seqno in an SST file. Now we know the largest seqno of newly written SST files, so we can write an exact value in LastSequence that actually reflects the largest seqno in any file referred to by the MANIFEST. This is primarily useful for correctness testing with unsynced data loss, where the recovered DB's seqno needs to indicate what records were recovered. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9305 Test Plan: - https://github.com/facebook/rocksdb/issues/9338 adds crash-recovery correctness testing coverage for WAL disabled use cases - https://github.com/facebook/rocksdb/issues/9357 will extend that testing to cover file ingestion - Added assertion at end of LogAndApply() for `VersionSet::descriptor_last_sequence_` consistency with files - Manually tested upgrade/downgrade compatibility with a custom crash test that randomly picks between a `db_stress` built with and without this PR (for old code it must run with `-disable_wal=0`) Reviewed By: riversand963 Differential Revision: D33182770 Pulled By: ajkr fbshipit-source-id: 0bfafaf685f347cc8cb0e1d62e0186340a738f7d --- db/version_edit.h | 6 ++ db/version_edit_handler.cc | 13 ++++ db/version_set.cc | 63 ++++++++++++++----- db/version_set.h | 9 ++- .../test/java/org/rocksdb/RocksDBTest.java | 2 +- 5 files changed, 73 insertions(+), 20 deletions(-) diff --git a/db/version_edit.h b/db/version_edit.h index 8dacf939c..4ba1639ac 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -418,11 +418,17 @@ class VersionEdit { temperature, oldest_blob_file_number, oldest_ancester_time, file_creation_time, file_checksum, file_checksum_func_name, min_timestamp, max_timestamp)); + if (!HasLastSequence() || largest_seqno > GetLastSequence()) { + SetLastSequence(largest_seqno); + } } void AddFile(int level, const FileMetaData& f) { assert(f.fd.smallest_seqno <= f.fd.largest_seqno); new_files_.emplace_back(level, f); + if (!HasLastSequence() || f.fd.largest_seqno > GetLastSequence()) { + SetLastSequence(f.fd.largest_seqno); + } } // Retrieve the table files added as well as their associated levels. 
diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 027813a18..c944a46ca 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -442,6 +442,14 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader, last_seq > version_set_->last_sequence_.load()) { version_set_->last_sequence_.store(last_seq); } + if (last_seq != kMaxSequenceNumber && + last_seq > version_set_->descriptor_last_sequence_) { + // This is the maximum last sequence of all `VersionEdit`s iterated. It + // may be greater than the maximum `largest_seqno` of all files in case + // the newest data referred to by the MANIFEST has been dropped or had its + // sequence number zeroed through compaction. + version_set_->descriptor_last_sequence_ = last_seq; + } version_set_->prev_log_number_ = version_edit_params_.prev_log_number_; } } @@ -597,6 +605,11 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, edit.min_log_number_to_keep_); } if (edit.has_last_sequence_) { + // `VersionEdit::last_sequence_`s are assumed to be non-decreasing. This + // is legacy behavior that cannot change without breaking downgrade + // compatibility. + assert(!version_edit_params_.has_last_sequence_ || + version_edit_params_.last_sequence_ <= edit.last_sequence_); version_edit_params_.SetLastSequence(edit.last_sequence_); } if (!version_edit_params_.has_prev_log_number_) { diff --git a/db/version_set.cc b/db/version_set.cc index b240711e6..0f96848b1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4158,9 +4158,16 @@ Status VersionSet::ProcessManifestWrites( autovector mutable_cf_options_ptrs; std::vector> builder_guards; + // Tracking `max_last_sequence` is needed to ensure we write + // `VersionEdit::last_sequence_`s in non-decreasing order according to the + // recovery code's requirement. It also allows us to defer updating + // `descriptor_last_sequence_` until the apply phase, after the log phase + // succeeds. 
+ SequenceNumber max_last_sequence = descriptor_last_sequence_; + + if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) { // No group commits for column family add or drop - LogAndApplyCFHelper(first_writer.edit_list.front()); + LogAndApplyCFHelper(first_writer.edit_list.front(), &max_last_sequence); batch_edits.push_back(first_writer.edit_list.front()); } else { auto it = manifest_writers_.cbegin(); @@ -4248,7 +4255,8 @@ Status VersionSet::ProcessManifestWrites( } else if (group_start != std::numeric_limits::max()) { group_start = std::numeric_limits::max(); } - Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu); + Status s = LogAndApplyHelper(last_writer->cfd, builder, e, + &max_last_sequence, mu); if (!s.ok()) { // free up the allocated memory for (auto v : versions) { @@ -4520,9 +4528,11 @@ Status VersionSet::ProcessManifestWrites( if (first_writer.edit_list.front()->is_column_family_add_) { assert(batch_edits.size() == 1); assert(new_cf_options != nullptr); + assert(max_last_sequence == descriptor_last_sequence_); CreateColumnFamily(*new_cf_options, first_writer.edit_list.front()); } else if (first_writer.edit_list.front()->is_column_family_drop_) { assert(batch_edits.size() == 1); + assert(max_last_sequence == descriptor_last_sequence_); first_writer.cfd->SetDropped(); first_writer.cfd->UnrefAndTryDelete(); } else { @@ -4562,6 +4572,8 @@ Status VersionSet::ProcessManifestWrites( AppendVersion(cfd, versions[i]); } } + assert(max_last_sequence >= descriptor_last_sequence_); + descriptor_last_sequence_ = max_last_sequence; manifest_file_number_ = pending_manifest_file_number_; manifest_file_size_ = new_manifest_file_size; prev_log_number_ = first_writer.edit_list.front()->prev_log_number_; @@ -4628,6 +4640,23 @@ Status VersionSet::ProcessManifestWrites( pending_manifest_file_number_ = 0; +#ifndef NDEBUG + // This is here kind of awkwardly because there are no other consistency + // checks on `VersionSet`'s updates for the new `Version`s. 
We might want + // to move it to a dedicated function, or remove it if we gain enough + // confidence in `descriptor_last_sequence_`. + if (s.ok()) { + for (const auto* v : versions) { + const auto* vstorage = v->storage_info(); + for (int level = 0; level < vstorage->num_levels(); ++level) { + for (const auto& file : vstorage->LevelFiles(level)) { + assert(file->fd.largest_seqno <= descriptor_last_sequence_); + } + } + } + } +#endif // NDEBUG + // wake up all the waiting writers while (true) { ManifestWriter* ready = manifest_writers_.front(); @@ -4751,16 +4780,13 @@ Status VersionSet::LogAndApply( new_cf_options); } -void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { +void VersionSet::LogAndApplyCFHelper(VersionEdit* edit, + SequenceNumber* max_last_sequence) { + assert(max_last_sequence != nullptr); assert(edit->IsColumnFamilyManipulation()); edit->SetNextFile(next_file_number_.load()); - // The log might have data that is not visible to memtbale and hence have not - // updated the last_sequence_ yet. It is also possible that the log has is - // expecting some new data that is not written yet. Since LastSequence is an - // upper bound on the sequence, it is ok to record - // last_allocated_sequence_ as the last sequence. - edit->SetLastSequence(db_options_->two_write_queues ? 
last_allocated_sequence_ - : last_sequence_); + assert(!edit->HasLastSequence()); + edit->SetLastSequence(*max_last_sequence); if (edit->is_column_family_drop_) { // if we drop column family, we have to make sure to save max column family, // so that we don't reuse existing ID @@ -4770,12 +4796,14 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* builder, VersionEdit* edit, + SequenceNumber* max_last_sequence, InstrumentedMutex* mu) { #ifdef NDEBUG (void)cfd; #endif mu->AssertHeld(); assert(!edit->IsColumnFamilyManipulation()); + assert(max_last_sequence != nullptr); if (edit->has_log_number_) { assert(edit->log_number_ >= cfd->GetLogNumber()); @@ -4786,13 +4814,11 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, edit->SetPrevLogNumber(prev_log_number_); } edit->SetNextFile(next_file_number_.load()); - // The log might have data that is not visible to memtbale and hence have not - // updated the last_sequence_ yet. It is also possible that the log has is - // expecting some new data that is not written yet. Since LastSequence is an - // upper bound on the sequence, it is ok to record - // last_allocated_sequence_ as the last sequence. - edit->SetLastSequence(db_options_->two_write_queues ? 
last_allocated_sequence_ - : last_sequence_); + if (edit->HasLastSequence() && edit->GetLastSequence() > *max_last_sequence) { + *max_last_sequence = edit->GetLastSequence(); + } else { + edit->SetLastSequence(*max_last_sequence); + } // The builder can be nullptr only if edit is WAL manipulation, // because WAL edits do not need to be applied to versions, @@ -5431,6 +5457,9 @@ Status VersionSet::WriteCurrentStateToManifest( if (!full_history_ts_low.empty()) { edit.SetFullHistoryTsLow(full_history_ts_low); } + + edit.SetLastSequence(descriptor_last_sequence_); + std::string record; if (!edit.EncodeTo(&record)) { return Status::Corruption( diff --git a/db/version_set.h b/db/version_set.h index f6072f659..e9c393413 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1386,6 +1386,9 @@ class VersionSet { // the memtable but when using two write queues it could also indicate the // last sequence in the WAL visible to reads. std::atomic last_sequence_; + // The last sequence number of data committed to the descriptor (manifest + // file). + SequenceNumber descriptor_last_sequence_ = 0; // The last seq that is already allocated. It is applicable only when we have // two write queues. In that case seq might or might not have appreated in // memtable but it is expected to appear in the WAL. 
@@ -1434,9 +1437,11 @@ class VersionSet { bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options); - void LogAndApplyCFHelper(VersionEdit* edit); + void LogAndApplyCFHelper(VersionEdit* edit, + SequenceNumber* max_last_sequence); Status LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, - VersionEdit* edit, InstrumentedMutex* mu); + VersionEdit* edit, SequenceNumber* max_last_sequence, + InstrumentedMutex* mu); }; // ReactiveVersionSet represents a collection of versions of the column diff --git a/java/src/test/java/org/rocksdb/RocksDBTest.java b/java/src/test/java/org/rocksdb/RocksDBTest.java index 20588084c..f7c1290cd 100644 --- a/java/src/test/java/org/rocksdb/RocksDBTest.java +++ b/java/src/test/java/org/rocksdb/RocksDBTest.java @@ -1476,7 +1476,7 @@ public class RocksDBTest { try (final RocksDB db = RocksDB.open(options, dbPath)) { final RocksDB.LiveFiles livefiles = db.getLiveFiles(true); assertThat(livefiles).isNotNull(); - assertThat(livefiles.manifestFileSize).isEqualTo(57); + assertThat(livefiles.manifestFileSize).isEqualTo(59); assertThat(livefiles.files.size()).isEqualTo(3); assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT"); assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004");