diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f0737cafb..5a1fde9aa 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1283,7 +1283,11 @@ Status DBImpl::SyncWAL() { TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); { InstrumentedMutexLock l(&mutex_); - MarkLogsSynced(current_log_number, need_log_dir_sync, status); + if (status.ok()) { + status = MarkLogsSynced(current_log_number, need_log_dir_sync); + } else { + MarkLogsNotSynced(current_log_number); + } } TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); @@ -1309,27 +1313,53 @@ Status DBImpl::UnlockWAL() { return Status::OK(); } -void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, - const Status& status) { +Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) { mutex_.AssertHeld(); - if (synced_dir && logfile_number_ == up_to && status.ok()) { + if (synced_dir && logfile_number_ == up_to) { log_dir_synced_ = true; } + VersionEdit synced_wals; for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { - auto& log = *it; - assert(log.getting_synced); - if (status.ok() && logs_.size() > 1) { - logs_to_free_.push_back(log.ReleaseWriter()); + auto& wal = *it; + assert(wal.getting_synced); + if (logs_.size() > 1) { + if (immutable_db_options_.track_and_verify_wals_in_manifest) { + synced_wals.AddWal(wal.number, + WalMetadata(wal.writer->file()->GetFileSize())); + } + logs_to_free_.push_back(wal.ReleaseWriter()); // To modify logs_ both mutex_ and log_write_mutex_ must be held InstrumentedMutexLock l(&log_write_mutex_); it = logs_.erase(it); } else { - log.getting_synced = false; + wal.getting_synced = false; ++it; } } - assert(!status.ok() || logs_.empty() || logs_[0].number > up_to || + assert(logs_.empty() || logs_[0].number > up_to || (logs_.size() == 1 && !logs_[0].getting_synced)); + + Status s; + if (synced_wals.IsWalAddition()) { + // not empty, write to MANIFEST. + s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_); + if (!s.ok() && versions_->io_status().IsIOError()) { + s = error_handler_.SetBGError(versions_->io_status(), + BackgroundErrorReason::kManifestWrite); + } + } + log_sync_cv_.SignalAll(); + return s; +} + +void DBImpl::MarkLogsNotSynced(uint64_t up_to) { + mutex_.AssertHeld(); + for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to; + ++it) { + auto& wal = *it; + assert(wal.getting_synced); + wal.getting_synced = false; + } log_sync_cv_.SignalAll(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 68b053f31..1ffff8fa6 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1702,7 +1702,9 @@ class DBImpl : public DB { std::unique_ptr* token, LogBuffer* log_buffer); // helper function to call after some of the logs_ were synced - void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status); + Status MarkLogsSynced(uint64_t up_to, bool synced_dir); + // WALs with log number up to up_to are not synced successfully. + void MarkLogsNotSynced(uint64_t up_to); SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary, bool lock = true); @@ -2204,12 +2206,18 @@ extern CompressionType GetCompressionFlush( // `memtables_to_flush`) will be flushed and thus will not depend on any WAL // file. // The function is only applicable to 2pc mode. -extern uint64_t PrecomputeMinLogNumberToKeep( +extern uint64_t PrecomputeMinLogNumberToKeep2PC( VersionSet* vset, const ColumnFamilyData& cfd_to_flush, - autovector edit_list, + const autovector& edit_list, const autovector& memtables_to_flush, LogsWithPrepTracker* prep_tracker); +// In non-2PC mode, WALs with log number < the returned number can be +// deleted after the cfd_to_flush column family is flushed successfully. +extern uint64_t PrecomputeMinLogNumberToKeepNon2PC( + VersionSet* vset, const ColumnFamilyData& cfd_to_flush, + const autovector& edit_list); + // `cfd_to_flush` is the column family whose memtable will be flushed and thus // will not depend on any WAL file. nullptr means no memtable is being flushed. // The function is only applicable to 2pc mode. diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 742f8882f..518cabf11 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -123,7 +123,11 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { // "number <= current_log_number - 1" is equivalent to // "number < current_log_number". - MarkLogsSynced(current_log_number - 1, true, io_s); + if (io_s.ok()) { + io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true)); + } else { + MarkLogsNotSynced(current_log_number - 1); + } if (!io_s.ok()) { if (total_log_size_ > 0) { error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush) diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 72faf8a03..d825c406c 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -680,16 +680,10 @@ uint64_t FindMinPrepLogReferencedByMemTable( return min_log; } -uint64_t PrecomputeMinLogNumberToKeep( +uint64_t PrecomputeMinLogNumberToKeepNon2PC( VersionSet* vset, const ColumnFamilyData& cfd_to_flush, - autovector edit_list, - const autovector& memtables_to_flush, - LogsWithPrepTracker* prep_tracker) { + const autovector& edit_list) { assert(vset != nullptr); - assert(prep_tracker != nullptr); - // Calculate updated min_log_number_to_keep - // Since the function should only be called in 2pc mode, log number in - // the version edit should be sufficient. // Precompute the min log number containing unflushed data for the column // family being flushed (`cfd_to_flush`). @@ -713,6 +707,22 @@ uint64_t PrecomputeMinLogNumberToKeep( min_log_number_to_keep = std::min(cf_min_log_number_to_keep, min_log_number_to_keep); } + return min_log_number_to_keep; +} + +uint64_t PrecomputeMinLogNumberToKeep2PC( + VersionSet* vset, const ColumnFamilyData& cfd_to_flush, + const autovector& edit_list, + const autovector& memtables_to_flush, + LogsWithPrepTracker* prep_tracker) { + assert(vset != nullptr); + assert(prep_tracker != nullptr); + // Calculate updated min_log_number_to_keep + // Since the function should only be called in 2pc mode, log number in + // the version edit should be sufficient. + + uint64_t min_log_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, cfd_to_flush, edit_list); // if are 2pc we must consider logs containing prepared // sections of outstanding transactions. diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 8f660924e..90759e9c5 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -589,18 +589,20 @@ Status DBImpl::Recover( } if (immutable_db_options_.track_and_verify_wals_in_manifest) { - // Verify WALs in MANIFEST. - s = versions_->GetWalSet().CheckWals(env_, wal_files); + if (!immutable_db_options_.best_efforts_recovery) { + // Verify WALs in MANIFEST. + s = versions_->GetWalSet().CheckWals(env_, wal_files); + } // else since best effort recovery does not recover from WALs, no need + // to check WALs. } else if (!versions_->GetWalSet().GetWals().empty()) { // Tracking is disabled, clear previously tracked WALs from MANIFEST, // otherwise, in the future, if WAL tracking is enabled again, // since the WALs deleted when WAL tracking is disabled are not persisted // into MANIFEST, WAL check may fail. VersionEdit edit; - for (const auto& wal : versions_->GetWalSet().GetWals()) { - WalNumber number = wal.first; - edit.DeleteWal(number); - } + WalNumber max_wal_number = + versions_->GetWalSet().GetWals().rbegin()->first; + edit.DeleteWalsBefore(max_wal_number + 1); s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_); } if (!s.ok()) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 5d4f88f0b..1cab2b6c0 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -426,7 +426,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (need_log_sync) { mutex_.Lock(); - MarkLogsSynced(logfile_number_, need_log_dir_sync, status); + if (status.ok()) { + status = MarkLogsSynced(logfile_number_, need_log_dir_sync); + } else { + MarkLogsNotSynced(logfile_number_); + } mutex_.Unlock(); // Requesting sync with two_write_queues_ is expected to be very rare. We // hence provide a simple implementation that is not necessarily efficient. @@ -551,7 +555,11 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, if (need_log_sync) { mutex_.Lock(); - MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status); + if (w.status.ok()) { + w.status = MarkLogsSynced(logfile_number_, need_log_dir_sync); + } else { + MarkLogsNotSynced(logfile_number_); + } mutex_.Unlock(); } diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index 9428ea6c6..403b221e6 100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -159,7 +159,7 @@ TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) { // Want max_compaction_bytes to trigger the end of compaction output file, not // target_file_size_base, so make the latter much bigger opts.target_file_size_base = 100 * opts.max_compaction_bytes; - Reopen(opts); + DestroyAndReopen(opts); // snapshot protects range tombstone from dropping due to becoming obsolete. const Snapshot* snapshot = db_->GetSnapshot(); diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 0d52eebd9..42c557643 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -340,6 +340,7 @@ Options DBTestBase::GetDefaultOptions() const { options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords; options.compaction_pri = CompactionPri::kByCompensatedSize; options.env = env_; + options.track_and_verify_wals_in_manifest = true; return options; } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 1e1758d59..77c2ba51e 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -473,12 +473,27 @@ Status MemTableList::TryInstallMemtableFlushResults( // TODO(myabandeh): Not sure how batch_count could be 0 here. if (batch_count > 0) { + uint64_t min_wal_number_to_keep = 0; if (vset->db_options()->allow_2pc) { assert(edit_list.size() > 0); + min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( + vset, *cfd, edit_list, memtables_to_flush, prep_tracker); // We piggyback the information of earliest log file to keep in the // manifest entry for the last file flushed. - edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep( - vset, *cfd, edit_list, memtables_to_flush, prep_tracker)); + edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep); + } else { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list); + } + + std::unique_ptr wal_deletion; + if (vset->db_options()->track_and_verify_wals_in_manifest) { + const auto& wals = vset->GetWalSet().GetWals(); + if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) { + wal_deletion.reset(new VersionEdit); + wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); + edit_list.push_back(wal_deletion.get()); + } } const auto manifest_write_cb = [this, cfd, batch_count, log_buffer, @@ -704,6 +719,10 @@ Status InstallMemtableAtomicFlushResults( if (imm_lists != nullptr) { assert(imm_lists->size() == num); } + if (num == 0) { + return Status::OK(); + } + for (size_t k = 0; k != num; ++k) { #ifndef NDEBUG const auto* imm = @@ -732,12 +751,36 @@ Status InstallMemtableAtomicFlushResults( ++num_entries; edit_lists.emplace_back(edits); } + + // TODO(cc): after https://github.com/facebook/rocksdb/pull/7570, handle 2pc + // here. + std::unique_ptr wal_deletion; + if (vset->db_options()->track_and_verify_wals_in_manifest) { + uint64_t min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[0], edit_lists[0]); + for (size_t i = 1; i < cfds.size(); i++) { + min_wal_number_to_keep = std::min( + min_wal_number_to_keep, + PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[i], edit_lists[i])); + } + const auto& wals = vset->GetWalSet().GetWals(); + if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) { + wal_deletion.reset(new VersionEdit); + wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); + edit_lists.back().push_back(wal_deletion.get()); + ++num_entries; + } + } + // Mark the version edits as an atomic group if the number of version edits // exceeds 1. if (cfds.size() > 1) { - for (auto& edits : edit_lists) { - assert(edits.size() == 1); - edits[0]->MarkAtomicGroup(--num_entries); + for (size_t i = 0; i < edit_lists.size(); i++) { + assert((edit_lists[i].size() == 1) || + ((edit_lists[i].size() == 2) && (i == edit_lists.size() - 1))); + for (auto& e : edit_lists[i]) { + e->MarkAtomicGroup(--num_entries); + } } assert(0 == num_entries); } diff --git a/db/version_edit.cc b/db/version_edit.cc index 8879f0e1b..8f4fb5766 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -89,7 +89,7 @@ void VersionEdit::Clear() { blob_file_additions_.clear(); blob_file_garbages_.clear(); wal_additions_.clear(); - wal_deletions_.clear(); + wal_deletion_.Reset(); column_family_ = 0; is_column_family_add_ = false; is_column_family_drop_ = false; @@ -229,9 +229,9 @@ bool VersionEdit::EncodeTo(std::string* dst) const { wal_addition.EncodeTo(dst); } - for (const auto& wal_deletion : wal_deletions_) { + if (!wal_deletion_.IsEmpty()) { PutVarint32(dst, kWalDeletion); - wal_deletion.EncodeTo(dst); + wal_deletion_.EncodeTo(dst); } // 0 is default and does not need to be explicitly written @@ -576,7 +576,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) { return s; } - wal_deletions_.emplace_back(std::move(wal_deletion)); + wal_deletion_ = std::move(wal_deletion); break; } @@ -725,9 +725,9 @@ std::string VersionEdit::DebugString(bool hex_key) const { r.append(wal_addition.DebugString()); } - for (const auto& wal_deletion : wal_deletions_) { + if (!wal_deletion_.IsEmpty()) { r.append("\n WalDeletion: "); - r.append(wal_deletion.DebugString()); + r.append(wal_deletion_.DebugString()); } r.append("\n ColumnFamily: "); @@ -854,18 +854,11 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const { jw.EndArray(); } - if (!wal_deletions_.empty()) { - jw << "WalDeletions"; - - jw.StartArray(); - - for (const auto& wal_deletion : wal_deletions_) { - jw.StartArrayedObject(); - jw << wal_deletion; - jw.EndArrayedObject(); - } - - jw.EndArray(); + if (!wal_deletion_.IsEmpty()) { + jw << "WalDeletion"; + jw.StartObject(); + jw << wal_deletion_; + jw.EndObject(); } jw << "ColumnFamily" << column_family_; diff --git a/db/version_edit.h b/db/version_edit.h index ff6a16c49..9193a8369 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -452,6 +452,7 @@ class VersionEdit { } // Add a WAL (either just created or closed). + // AddWal and DeleteWalsBefore cannot be called on the same VersionEdit. void AddWal(WalNumber number, WalMetadata metadata = WalMetadata()) { assert(NumEntries() == wal_additions_.size()); wal_additions_.emplace_back(number, std::move(metadata)); @@ -463,22 +464,27 @@ class VersionEdit { bool IsWalAddition() const { return !wal_additions_.empty(); } // Delete a WAL (either directly deleted or archived). - void DeleteWal(WalNumber number) { - assert(NumEntries() == wal_deletions_.size()); - wal_deletions_.emplace_back(number); + // AddWal and DeleteWalsBefore cannot be called on the same VersionEdit. + void DeleteWalsBefore(WalNumber number) { + assert((NumEntries() == 1) == !wal_deletion_.IsEmpty()); + wal_deletion_ = WalDeletion(number); } - const WalDeletions& GetWalDeletions() const { return wal_deletions_; } + const WalDeletion& GetWalDeletion() const { return wal_deletion_; } - bool IsWalDeletion() const { return !wal_deletions_.empty(); } + bool IsWalDeletion() const { return !wal_deletion_.IsEmpty(); } - bool IsWalManipulation() const { return IsWalAddition() || IsWalDeletion(); } + bool IsWalManipulation() const { + size_t entries = NumEntries(); + return (entries > 0) && ((entries == wal_additions_.size()) || + (entries == !wal_deletion_.IsEmpty())); + } // Number of edits size_t NumEntries() const { return new_files_.size() + deleted_files_.size() + blob_file_additions_.size() + blob_file_garbages_.size() + - wal_additions_.size() + wal_deletions_.size(); + wal_additions_.size() + !wal_deletion_.IsEmpty(); } void SetColumnFamily(uint32_t column_family_id) { @@ -563,7 +569,7 @@ class VersionEdit { BlobFileGarbages blob_file_garbages_; WalAdditions wal_additions_; - WalDeletions wal_deletions_; + WalDeletion wal_deletion_; // Each version edit record should have column_family_ set // If it's not set, it is default (0) diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index a1ce12d1b..c5924f065 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -201,7 +201,8 @@ Status VersionEditHandler::OnWalAddition(VersionEdit& edit) { Status VersionEditHandler::OnWalDeletion(VersionEdit& edit) { assert(edit.IsWalDeletion()); - return version_set_->wals_.DeleteWals(edit.GetWalDeletions()); + return version_set_->wals_.DeleteWalsBefore( + edit.GetWalDeletion().GetLogNumber()); } Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit, diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 7236e0a2d..88e98606a 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -470,9 +470,7 @@ TEST_F(VersionEditTest, AddWalDebug) { TEST_F(VersionEditTest, DeleteWalEncodeDecode) { VersionEdit edit; - for (uint64_t log_number = 1; log_number <= 20; log_number++) { - edit.DeleteWal(log_number); - } + edit.DeleteWalsBefore(rand() % 100); TestEncodeDecode(edit); } @@ -481,36 +479,29 @@ TEST_F(VersionEditTest, DeleteWalDebug) { constexpr std::array kLogNumbers{{10, 20}}; VersionEdit edit; - for (int i = 0; i < n; i++) { - edit.DeleteWal(kLogNumbers[i]); - } + edit.DeleteWalsBefore(kLogNumbers[n - 1]); - const WalDeletions& wals = edit.GetWalDeletions(); + const WalDeletion& wal = edit.GetWalDeletion(); ASSERT_TRUE(edit.IsWalDeletion()); - ASSERT_EQ(wals.size(), n); - for (int i = 0; i < n; i++) { - const WalDeletion& wal = wals[i]; - ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[i]); - } + ASSERT_EQ(wal.GetLogNumber(), kLogNumbers[n - 1]); std::string expected_str = "VersionEdit {\n"; - for (int i = 0; i < n; i++) { + { std::stringstream ss; - ss << " WalDeletion: log_number: " << kLogNumbers[i] << "\n"; + ss << " WalDeletion: log_number: " << kLogNumbers[n - 1] << "\n"; expected_str += ss.str(); } expected_str += " ColumnFamily: 0\n}\n"; ASSERT_EQ(edit.DebugString(true), expected_str); - std::string expected_json = "{\"EditNumber\": 4, \"WalDeletions\": ["; - for (int i = 0; i < n; i++) { + std::string expected_json = "{\"EditNumber\": 4, \"WalDeletion\": "; + { std::stringstream ss; - ss << "{\"LogNumber\": " << kLogNumbers[i] << "}"; - if (i < n - 1) ss << ", "; + ss << "{\"LogNumber\": " << kLogNumbers[n - 1] << "}"; expected_json += ss.str(); } - expected_json += "], \"ColumnFamily\": 0}"; + expected_json += ", \"ColumnFamily\": 0}"; ASSERT_EQ(edit.DebugJSON(4, true), expected_json); } diff --git a/db/version_set.cc b/db/version_set.cc index 0fe09ef34..f15b94e77 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4179,7 +4179,7 @@ Status VersionSet::ProcessManifestWrites( if (e->IsWalAddition()) { s = wals_.AddWals(e->GetWalAdditions()); } else if (e->IsWalDeletion()) { - s = wals_.DeleteWals(e->GetWalDeletions()); + s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber()); } if (!s.ok()) { break; @@ -4527,7 +4527,7 @@ Status VersionSet::ApplyOneVersionEditToBuilder( return s; } } else if (edit.IsWalDeletion()) { - Status s = wals_.DeleteWals(edit.GetWalDeletions()); + Status s = wals_.DeleteWalsBefore(edit.GetWalDeletion().GetLogNumber()); if (!s.ok()) { return s; } diff --git a/db/version_set_test.cc b/db/version_set_test.cc index fce8e025f..4d54cec92 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1192,10 +1192,8 @@ TEST_F(VersionSetTest, WalEditsNotAppliedToVersion) { edits.back()->AddWal(i, WalMetadata(i)); } // Delete the first half of the WALs. - for (uint64_t i = 1; i <= kNumWals; i++) { - edits.emplace_back(new VersionEdit); - edits.back()->DeleteWal(i); - } + edits.emplace_back(new VersionEdit); + edits.back()->DeleteWalsBefore(kNumWals / 2 + 1); autovector versions; SyncPoint::GetInstance()->SetCallBack( @@ -1228,10 +1226,8 @@ TEST_F(VersionSetTest, NonWalEditsAppliedToVersion) { edits.back()->AddWal(i, WalMetadata(i)); } // Delete the first half of the WALs. - for (uint64_t i = 1; i <= kNumWals; i++) { - edits.emplace_back(new VersionEdit); - edits.back()->DeleteWal(i); - } + edits.emplace_back(new VersionEdit); + edits.back()->DeleteWalsBefore(kNumWals / 2 + 1); edits.emplace_back(new VersionEdit); edits.back()->SetDBId(kDBId); @@ -1411,7 +1407,7 @@ TEST_F(VersionSetTest, WalDeletion) { // Delete the closed WAL. { VersionEdit edit; - edit.DeleteWal(kClosedLogNumber); + edit.DeleteWalsBefore(kNonClosedLogNumber); ASSERT_OK(LogAndApplyToDefaultCF(edit)); @@ -1549,39 +1545,83 @@ TEST_F(VersionSetTest, AddWalWithSmallerSize) { } } -TEST_F(VersionSetTest, DeleteNonExistingWal) { +TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) { NewDB(); - constexpr WalNumber kLogNumber = 10; - constexpr WalNumber kNonExistingNumber = 11; + constexpr WalNumber kLogNumber0 = 10; + constexpr WalNumber kLogNumber1 = 20; + constexpr WalNumber kNonExistingNumber = 15; + constexpr uint64_t kSizeInBytes = 111; + + { + // Add closed WALs. + VersionEdit edit; + WalMetadata wal(kSizeInBytes); + edit.AddWal(kLogNumber0, wal); + edit.AddWal(kLogNumber1, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + { + // Delete WALs before a non-existing WAL. + VersionEdit edit; + edit.DeleteWalsBefore(kNonExistingNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + // Recover a new VersionSet, WAL0 is deleted, WAL1 is not. + { + std::unique_ptr new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); + ASSERT_OK(new_versions->Recover(column_families_, false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber1) != wals.end()); + } +} + +TEST_F(VersionSetTest, DeleteAllWals) { + NewDB(); + + constexpr WalNumber kMaxLogNumber = 10; constexpr uint64_t kSizeInBytes = 111; { // Add a closed WAL. VersionEdit edit; WalMetadata wal(kSizeInBytes); - edit.AddWal(kLogNumber, wal); + edit.AddWal(kMaxLogNumber, wal); ASSERT_OK(LogAndApplyToDefaultCF(edit)); } { - // Delete a non-existing WAL. VersionEdit edit; - edit.DeleteWal(kNonExistingNumber); + edit.DeleteWalsBefore(kMaxLogNumber + 10); - Status s = LogAndApplyToDefaultCF(edit); - ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE(s.ToString().find("WAL 11 must exist before deletion") != - std::string::npos) - << s.ToString(); + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + // Recover a new VersionSet, all WALs are deleted. + { + std::unique_ptr new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); + ASSERT_OK(new_versions->Recover(column_families_, false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 0); } } TEST_F(VersionSetTest, AtomicGroupWithWalEdits) { NewDB(); - constexpr int kAtomicGroupSize = 10; + constexpr int kAtomicGroupSize = 7; constexpr uint64_t kNumWals = 5; const std::string kDBId = "db_db"; @@ -1599,11 +1639,9 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) { edits.back()->SetDBId(kDBId); edits.back()->MarkAtomicGroup(--remaining); // Delete the first added 4 WALs. - for (uint64_t i = 1; i < kNumWals; i++) { - edits.emplace_back(new VersionEdit); - edits.back()->DeleteWal(i); - edits.back()->MarkAtomicGroup(--remaining); - } + edits.emplace_back(new VersionEdit); + edits.back()->DeleteWalsBefore(kNumWals); + edits.back()->MarkAtomicGroup(--remaining); ASSERT_EQ(remaining, 0); Status s = LogAndApplyToDefaultCF(edits); diff --git a/db/wal_edit.cc b/db/wal_edit.cc index 70965a770..6d639823f 100644 --- a/db/wal_edit.cc +++ b/db/wal_edit.cc @@ -141,33 +141,11 @@ Status WalSet::AddWals(const WalAdditions& wals) { return s; } -Status WalSet::DeleteWal(const WalDeletion& wal) { - auto it = wals_.find(wal.GetLogNumber()); - // The WAL must exist. - if (it == wals_.end()) { - std::stringstream ss; - ss << "WAL " << wal.GetLogNumber() << " must exist before deletion"; - return Status::Corruption("WalSet", ss.str()); - } - wals_.erase(it); +Status WalSet::DeleteWalsBefore(WalNumber wal) { + wals_.erase(wals_.begin(), wals_.lower_bound(wal)); return Status::OK(); } -Status WalSet::DeleteWals(const WalDeletions& wals) { - Status s; - for (const WalDeletion& wal : wals) { - s = DeleteWal(wal); - if (!s.ok()) { - break; - } - } - return s; -} - -void WalSet::DeleteWalsBefore(WalNumber number) { - wals_.erase(wals_.begin(), wals_.lower_bound(number)); -} - void WalSet::Reset() { wals_.clear(); } Status WalSet::CheckWals( diff --git a/db/wal_edit.h b/db/wal_edit.h index 78510a2e0..1ec7c4bf9 100644 --- a/db/wal_edit.h +++ b/db/wal_edit.h @@ -89,7 +89,7 @@ JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal); using WalAdditions = std::vector; -// Records the event of deleting a WAL. +// Records the event of deleting WALs before the specified log number. class WalDeletion { public: WalDeletion() : number_(kEmpty) {} @@ -104,6 +104,10 @@ class WalDeletion { std::string DebugString() const; + bool IsEmpty() const { return number_ == kEmpty; } + + void Reset() { number_ = kEmpty; } + private: static constexpr WalNumber kEmpty = 0; @@ -113,11 +117,9 @@ class WalDeletion { std::ostream& operator<<(std::ostream& os, const WalDeletion& wal); JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal); -using WalDeletions = std::vector; - // Used in VersionSet to keep the current set of WALs. // -// When a WAL is created, closed, deleted, or archived, +// When a WAL is synced or becomes obsoleted, // a VersionEdit is logged to MANIFEST and // the WAL is added to or deleted from WalSet. // @@ -132,15 +134,9 @@ class WalSet { Status AddWal(const WalAddition& wal); Status AddWals(const WalAdditions& wals); - // Delete WAL(s). - // The WAL to be deleted must exist and be closed, otherwise, - // return Status::Corruption. + // Delete WALs with log number smaller than the specified wal number. // Can happen when applying a VersionEdit or recovering from MANIFEST. - Status DeleteWal(const WalDeletion& wal); - Status DeleteWals(const WalDeletions& wals); - - // Delete WALs with log number < wal_number. - void DeleteWalsBefore(WalNumber wal_number); + Status DeleteWalsBefore(WalNumber wal); // Resets the internal state. void Reset(); diff --git a/db/wal_edit_test.cc b/db/wal_edit_test.cc index cdfe1b9cf..bc863e89e 100644 --- a/db/wal_edit_test.cc +++ b/db/wal_edit_test.cc @@ -25,9 +25,7 @@ TEST(WalSet, AddDeleteReset) { ASSERT_EQ(wals.GetWals().size(), 10); // Delete WAL 1 - 5. - for (WalNumber log_number = 1; log_number <= 5; log_number++) { - wals.DeleteWal(WalDeletion(log_number)); - } + wals.DeleteWalsBefore(6); ASSERT_EQ(wals.GetWals().size(), 5); WalNumber expected_log_number = 6; @@ -74,13 +72,13 @@ TEST(WalSet, CreateTwice) { std::string::npos); } -TEST(WalSet, DeleteNonExistingWal) { - constexpr WalNumber kNonExistingNumber = 100; +TEST(WalSet, DeleteAllWals) { + constexpr WalNumber kMaxWalNumber = 10; WalSet wals; - Status s = wals.DeleteWal(WalDeletion(kNonExistingNumber)); - ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE(s.ToString().find("WAL 100 must exist before deletion") != - std::string::npos); + for (WalNumber i = 1; i <= kMaxWalNumber; i++) { + wals.AddWal(WalAddition(i)); + } + ASSERT_OK(wals.DeleteWalsBefore(kMaxWalNumber + 1)); } class WalSetTest : public DBTestBase { diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index fe90a0796..da1098e27 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -2066,6 +2066,7 @@ void StressTest::Open() { FLAGS_level_compaction_dynamic_level_bytes; options_.file_checksum_gen_factory = GetFileChecksumImpl(FLAGS_file_checksum_impl); + options_.track_and_verify_wals_in_manifest = true; } else { #ifdef ROCKSDB_LITE fprintf(stderr, "--options_file not supported in lite mode\n");