diff --git a/db/version_edit.h b/db/version_edit.h index 8dacf939c..4ba1639ac 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -418,11 +418,17 @@ class VersionEdit { temperature, oldest_blob_file_number, oldest_ancester_time, file_creation_time, file_checksum, file_checksum_func_name, min_timestamp, max_timestamp)); + if (!HasLastSequence() || largest_seqno > GetLastSequence()) { + SetLastSequence(largest_seqno); + } } void AddFile(int level, const FileMetaData& f) { assert(f.fd.smallest_seqno <= f.fd.largest_seqno); new_files_.emplace_back(level, f); + if (!HasLastSequence() || f.fd.largest_seqno > GetLastSequence()) { + SetLastSequence(f.fd.largest_seqno); + } } // Retrieve the table files added as well as their associated levels. diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 027813a18..c944a46ca 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -442,6 +442,14 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader, last_seq > version_set_->last_sequence_.load()) { version_set_->last_sequence_.store(last_seq); } + if (last_seq != kMaxSequenceNumber && + last_seq > version_set_->descriptor_last_sequence_) { + // This is the maximum last sequence of all `VersionEdit`s iterated. It + // may be greater than the maximum `largest_seqno` of all files in case + // the newest data referred to by the MANIFEST has been dropped or had its + // sequence number zeroed through compaction. + version_set_->descriptor_last_sequence_ = last_seq; + } version_set_->prev_log_number_ = version_edit_params_.prev_log_number_; } } @@ -597,6 +605,11 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, edit.min_log_number_to_keep_); } if (edit.has_last_sequence_) { + // `VersionEdit::last_sequence_`s are assumed to be non-decreasing. This + // is legacy behavior that cannot change without breaking downgrade + // compatibility. + assert(!version_edit_params_.has_last_sequence_ || + version_edit_params_.last_sequence_ <= edit.last_sequence_); version_edit_params_.SetLastSequence(edit.last_sequence_); } if (!version_edit_params_.has_prev_log_number_) { diff --git a/db/version_set.cc b/db/version_set.cc index b240711e6..0f96848b1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4158,9 +4158,16 @@ Status VersionSet::ProcessManifestWrites( autovector mutable_cf_options_ptrs; std::vector> builder_guards; + // Tracking `max_last_sequence` is needed to ensure we write + // `VersionEdit::last_sequence_`s in non-decreasing order according to the + // recovery code's requirement. It also allows us to defer updating + // `descriptor_last_sequence_` until the apply phase, after the log phase + // succeeds. + SequenceNumber max_last_sequence = descriptor_last_sequence_; + if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) { // No group commits for column family add or drop - LogAndApplyCFHelper(first_writer.edit_list.front()); + LogAndApplyCFHelper(first_writer.edit_list.front(), &max_last_sequence); batch_edits.push_back(first_writer.edit_list.front()); } else { auto it = manifest_writers_.cbegin(); @@ -4248,7 +4255,8 @@ Status VersionSet::ProcessManifestWrites( } else if (group_start != std::numeric_limits::max()) { group_start = std::numeric_limits::max(); } - Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu); + Status s = LogAndApplyHelper(last_writer->cfd, builder, e, + &max_last_sequence, mu); if (!s.ok()) { // free up the allocated memory for (auto v : versions) { @@ -4520,9 +4528,11 @@ Status VersionSet::ProcessManifestWrites( if (first_writer.edit_list.front()->is_column_family_add_) { assert(batch_edits.size() == 1); assert(new_cf_options != nullptr); + assert(max_last_sequence == descriptor_last_sequence_); CreateColumnFamily(*new_cf_options, first_writer.edit_list.front()); } else if (first_writer.edit_list.front()->is_column_family_drop_) { assert(batch_edits.size() == 1); + assert(max_last_sequence == descriptor_last_sequence_); first_writer.cfd->SetDropped(); first_writer.cfd->UnrefAndTryDelete(); } else { @@ -4562,6 +4572,8 @@ Status VersionSet::ProcessManifestWrites( AppendVersion(cfd, versions[i]); } } + assert(max_last_sequence >= descriptor_last_sequence_); + descriptor_last_sequence_ = max_last_sequence; manifest_file_number_ = pending_manifest_file_number_; manifest_file_size_ = new_manifest_file_size; prev_log_number_ = first_writer.edit_list.front()->prev_log_number_; @@ -4628,6 +4640,23 @@ Status VersionSet::ProcessManifestWrites( pending_manifest_file_number_ = 0; +#ifndef NDEBUG + // This is here kind of awkwardly because there's no other consistency + // checks on `VersionSet`'s updates for the new `Version`s. We might want + // to move it to a dedicated function, or remove it if we gain enough + // confidence in `descriptor_last_sequence_`. + if (s.ok()) { + for (const auto* v : versions) { + const auto* vstorage = v->storage_info(); + for (int level = 0; level < vstorage->num_levels(); ++level) { + for (const auto& file : vstorage->LevelFiles(level)) { + assert(file->fd.largest_seqno <= descriptor_last_sequence_); + } + } + } + } +#endif // NDEBUG + // wake up all the waiting writers while (true) { ManifestWriter* ready = manifest_writers_.front(); @@ -4751,16 +4780,13 @@ Status VersionSet::LogAndApply( new_cf_options); } -void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { +void VersionSet::LogAndApplyCFHelper(VersionEdit* edit, + SequenceNumber* max_last_sequence) { + assert(max_last_sequence != nullptr); assert(edit->IsColumnFamilyManipulation()); edit->SetNextFile(next_file_number_.load()); - // The log might have data that is not visible to memtbale and hence have not - // updated the last_sequence_ yet. It is also possible that the log has is - // expecting some new data that is not written yet. Since LastSequence is an - // upper bound on the sequence, it is ok to record - // last_allocated_sequence_ as the last sequence. - edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_ - : last_sequence_); + assert(!edit->HasLastSequence()); + edit->SetLastSequence(*max_last_sequence); if (edit->is_column_family_drop_) { // if we drop column family, we have to make sure to save max column family, // so that we don't reuse existing ID @@ -4770,12 +4796,14 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* builder, VersionEdit* edit, + SequenceNumber* max_last_sequence, InstrumentedMutex* mu) { #ifdef NDEBUG (void)cfd; #endif mu->AssertHeld(); assert(!edit->IsColumnFamilyManipulation()); + assert(max_last_sequence != nullptr); if (edit->has_log_number_) { assert(edit->log_number_ >= cfd->GetLogNumber()); @@ -4786,13 +4814,11 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, edit->SetPrevLogNumber(prev_log_number_); } edit->SetNextFile(next_file_number_.load()); - // The log might have data that is not visible to memtbale and hence have not - // updated the last_sequence_ yet. It is also possible that the log has is - // expecting some new data that is not written yet. Since LastSequence is an - // upper bound on the sequence, it is ok to record - // last_allocated_sequence_ as the last sequence. - edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_ - : last_sequence_); + if (edit->HasLastSequence() && edit->GetLastSequence() > *max_last_sequence) { + *max_last_sequence = edit->GetLastSequence(); + } else { + edit->SetLastSequence(*max_last_sequence); + } // The builder can be nullptr only if edit is WAL manipulation, // because WAL edits do not need to be applied to versions, @@ -5431,6 +5457,9 @@ Status VersionSet::WriteCurrentStateToManifest( if (!full_history_ts_low.empty()) { edit.SetFullHistoryTsLow(full_history_ts_low); } + + edit.SetLastSequence(descriptor_last_sequence_); + std::string record; if (!edit.EncodeTo(&record)) { return Status::Corruption( diff --git a/db/version_set.h b/db/version_set.h index f6072f659..e9c393413 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1386,6 +1386,9 @@ class VersionSet { // the memtable but when using two write queues it could also indicate the // last sequence in the WAL visible to reads. std::atomic last_sequence_; + // The last sequence number of data committed to the descriptor (manifest + // file). + SequenceNumber descriptor_last_sequence_ = 0; // The last seq that is already allocated. It is applicable only when we have // two write queues. In that case seq might or might not have appreated in // memtable but it is expected to appear in the WAL. @@ -1434,9 +1437,11 @@ class VersionSet { bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options); - void LogAndApplyCFHelper(VersionEdit* edit); + void LogAndApplyCFHelper(VersionEdit* edit, + SequenceNumber* max_last_sequence); Status LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, - VersionEdit* edit, InstrumentedMutex* mu); + VersionEdit* edit, SequenceNumber* max_last_sequence, + InstrumentedMutex* mu); }; // ReactiveVersionSet represents a collection of versions of the column diff --git a/java/src/test/java/org/rocksdb/RocksDBTest.java b/java/src/test/java/org/rocksdb/RocksDBTest.java index 20588084c..f7c1290cd 100644 --- a/java/src/test/java/org/rocksdb/RocksDBTest.java +++ b/java/src/test/java/org/rocksdb/RocksDBTest.java @@ -1476,7 +1476,7 @@ public class RocksDBTest { try (final RocksDB db = RocksDB.open(options, dbPath)) { final RocksDB.LiveFiles livefiles = db.getLiveFiles(true); assertThat(livefiles).isNotNull(); - assertThat(livefiles.manifestFileSize).isEqualTo(57); + assertThat(livefiles.manifestFileSize).isEqualTo(59); assertThat(livefiles.files.size()).isEqualTo(3); assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT"); assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004");