diff --git a/HISTORY.md b/HISTORY.md index 79eb4ca52..09cae7926 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Bug Fixes * Fixed a major performance bug in which Bloom filters generated by pre-7.0 releases are not read by early 7.0.x releases (and vice-versa) due to changes to FilterPolicy::Name() in #9590. This can severely impact read performance and read I/O on upgrade or downgrade with existing DB, but not data correctness. +* Fixed a race condition when 2PC is disabled and WAL tracking in the MANIFEST is enabled. The race condition is between two background flush threads trying to install flush results, causing a WAL deletion not tracked in the MANIFEST. A future DB open may fail. ### Public API changes * Added pure virtual FilterPolicy::CompatibilityName(), which is needed for fixing major performance bug involving FilterPolicy naming in SST metadata without affecting Customizable aspect of FilterPolicy. This change only affects those with their own custom or wrapper FilterPolicy classes. diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index b50380d92..1790ed836 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -23,11 +23,7 @@ namespace ROCKSDB_NAMESPACE { uint64_t DBImpl::MinLogNumberToKeep() { - if (allow_2pc()) { - return versions_->min_log_number_to_keep_2pc(); - } else { - return versions_->MinLogNumberWithUnflushedData(); - } + return versions_->min_log_number_to_keep(); } uint64_t DBImpl::MinObsoleteSstNumberToKeep() { @@ -224,7 +220,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } // Add log files in wal_dir - if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) { std::vector log_files; Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files); @@ -234,6 +229,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, log_file, immutable_db_options_.wal_dir); } } + // Add info log files in db_log_dir if (!immutable_db_options_.db_log_dir.empty() && immutable_db_options_.db_log_dir != dbname_) { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 94ad8bf85..419c2e9af 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -866,6 +866,11 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, bool flushed = false; uint64_t corrupted_wal_number = kMaxSequenceNumber; uint64_t min_wal_number = MinLogNumberToKeep(); + if (!allow_2pc()) { + // In non-2pc mode, we skip WALs that do not back unflushed data. + min_wal_number = + std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData()); + } for (auto wal_number : wal_numbers) { if (wal_number < min_wal_number) { ROCKS_LOG_INFO(immutable_db_options_.info_log, @@ -1270,9 +1275,16 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, } std::unique_ptr wal_deletion; - if (immutable_db_options_.track_and_verify_wals_in_manifest) { - wal_deletion.reset(new VersionEdit); - wal_deletion->DeleteWalsBefore(max_wal_number + 1); + if (flushed) { + wal_deletion = std::make_unique(); + if (immutable_db_options_.track_and_verify_wals_in_manifest) { + wal_deletion->DeleteWalsBefore(max_wal_number + 1); + } + if (!allow_2pc()) { + // In non-2pc mode, flushing the memtables of the column families + // means we can advance min_log_number_to_keep. + wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1); + } edit_lists.back().push_back(wal_deletion.get()); } @@ -1351,7 +1363,14 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { // FindObsoleteFiles() total_log_size_ = 0; log_empty_ = false; + uint64_t min_wal_with_unflushed_data = + versions_->MinLogNumberWithUnflushedData(); for (auto wal_number : wal_numbers) { + if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) { + // In non-2pc mode, the WAL files not backing unflushed data are not + // alive, thus should not be added to the alive_log_files_. + continue; + } // We preallocate space for wals, but then after a crash and restart, those // preallocated space are not needed anymore. It is likely only the last // log has such preallocated space, so we only truncate for the last log. diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 8a61dadaf..f2b94ed07 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1491,6 +1491,93 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) { ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options)); } +TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) { + Options options = CurrentOptions(); + options.env = env_; + options.track_and_verify_wals_in_manifest = true; + // The following make sure there are two bg flush threads. + options.max_background_jobs = 8; + + const std::string cf1_name("cf1"); + CreateAndReopenWithCF({cf1_name}, options); + assert(handles_.size() == 2); + + { + dbfull()->TEST_LockMutex(); + ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes); + dbfull()->TEST_UnlockMutex(); + } + + ASSERT_OK(dbfull()->PauseBackgroundWork()); + + ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value")); + ASSERT_OK(db_->Put(WriteOptions(), "foo", "value")); + + ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[1])); + + ASSERT_OK(db_->Put(WriteOptions(), "foo", "value")); + ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[0])); + + bool called = false; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + // This callback will be called when the first bg flush thread reaches the + // point before entering the MANIFEST write queue after flushing the SST + // file. + // The purpose of the sync points here is to ensure both bg flush threads + // finish computing `min_wal_number_to_keep` before any of them updates the + // `log_number` for the column family that's being flushed. + SyncPoint::GetInstance()->SetCallBack( + "MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep", + [&](void* /*arg*/) { + dbfull()->mutex()->AssertHeld(); + if (!called) { + // We are the first bg flush thread in the MANIFEST write queue. + // We set up the dependency between sync points for two threads that + // will be executing the same code. + // For the interleaving of events, see + // https://github.com/facebook/rocksdb/pull/9715. + // bg flush thread1 will release the db mutex while in the MANIFEST + // write queue. In the meantime, bg flush thread2 locks db mutex and + // computes the min_wal_number_to_keep (before thread1 writes to + // MANIFEST thus before cf1->log_number is updated). Bg thread2 joins + // the MANIFEST write queue afterwards and bg flush thread1 proceeds + // with writing to MANIFEST. + called = true; + SyncPoint::GetInstance()->LoadDependency({ + {"VersionSet::LogAndApply:WriteManifestStart", + "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"}, + {"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2", + "VersionSet::LogAndApply:WriteManifest"}, + }); + } else { + // The other bg flush thread has already been in the MANIFEST write + // queue, and we are after. + TEST_SYNC_POINT( + "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(dbfull()->ContinueBackgroundWork()); + + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0])); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); + + ASSERT_TRUE(called); + + Close(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + DB* db1 = nullptr; + Status s = DB::OpenForReadOnly(options, dbname_, &db1); + ASSERT_OK(s); + assert(db1); + delete db1; +} + // Test scope: // - We expect to open data store under all circumstances // - We expect only data upto the point where the first error was encountered diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 88bf8cc69..3ec0e8da1 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -95,8 +95,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( jwriter << "cf_name" << cf_name << "job" << job_id << "event" << "table_file_creation" << "file_number" << fd.GetNumber() << "file_size" - << fd.GetFileSize() << "file_checksum" << file_checksum - << "file_checksum_func_name" << file_checksum_func_name; + << fd.GetFileSize() << "file_checksum" + << Slice(file_checksum).ToString(true) << "file_checksum_func_name" + << file_checksum_func_name; // table_properties { diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 133562b7b..b0d29bcd2 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -489,8 +489,8 @@ Status MemTableList::TryInstallMemtableFlushResults( // TODO(myabandeh): Not sure how batch_count could be 0 here. if (batch_count > 0) { uint64_t min_wal_number_to_keep = 0; + assert(edit_list.size() > 0); if (vset->db_options()->allow_2pc) { - assert(edit_list.size() > 0); // Note that if mempurge is successful, the edit_list will // not be applicable (contains info of new min_log number to keep, // and level 0 file path of SST file created during normal flush, @@ -499,23 +499,26 @@ Status MemTableList::TryInstallMemtableFlushResults( min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( vset, *cfd, edit_list, memtables_to_flush, prep_tracker); - // We piggyback the information of earliest log file to keep in the + // We piggyback the information of earliest log file to keep in the // manifest entry for the last file flushed. - edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep); + } else { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list); } + edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep); std::unique_ptr wal_deletion; if (vset->db_options()->track_and_verify_wals_in_manifest) { - if (!vset->db_options()->allow_2pc) { - min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list); - } if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { wal_deletion.reset(new VersionEdit); wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); edit_list.push_back(wal_deletion.get()); } + TEST_SYNC_POINT_CALLBACK( + "MemTableList::TryInstallMemtableFlushResults:" + "AfterComputeMinWalToKeep", + nullptr); } const auto manifest_write_cb = [this, cfd, batch_count, log_buffer, @@ -798,15 +801,14 @@ Status InstallMemtableAtomicFlushResults( if (vset->db_options()->allow_2pc) { min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( vset, cfds, edit_lists, mems_list, prep_tracker); - edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep); + } else { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists); } + edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep); std::unique_ptr wal_deletion; if (vset->db_options()->track_and_verify_wals_in_manifest) { - if (!vset->db_options()->allow_2pc) { - min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists); - } if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { wal_deletion.reset(new VersionEdit); wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 208527fb8..f3e24016b 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -394,7 +394,7 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader, if (s->ok()) { version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily( version_edit_params_.max_column_family_); - version_set_->MarkMinLogNumberToKeep2PC( + version_set_->MarkMinLogNumberToKeep( version_edit_params_.min_log_number_to_keep_); version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_); version_set_->MarkFileNumberUsed(version_edit_params_.log_number_); @@ -970,12 +970,11 @@ void DumpManifestHandler::CheckIterationResult(const log::Reader& reader, fprintf(stdout, "next_file_number %" PRIu64 " last_sequence %" PRIu64 " prev_log_number %" PRIu64 " max_column_family %" PRIu32 - " min_log_number_to_keep " - "%" PRIu64 "\n", + " min_log_number_to_keep %" PRIu64 "\n", version_set_->current_next_file_number(), version_set_->LastSequence(), version_set_->prev_log_number(), version_set_->column_family_set_->GetMaxColumnFamily(), - version_set_->min_log_number_to_keep_2pc()); + version_set_->min_log_number_to_keep()); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index 134c1f88a..a5d43d2d4 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4141,7 +4141,7 @@ void VersionSet::Reset() { } db_id_.clear(); next_file_number_.store(2); - min_log_number_to_keep_2pc_.store(0); + min_log_number_to_keep_.store(0); manifest_file_number_ = 0; options_file_number_ = 0; pending_manifest_file_number_ = 0; @@ -4610,8 +4610,7 @@ Status VersionSet::ProcessManifestWrites( } if (last_min_log_number_to_keep != 0) { - // Should only be set in 2PC mode. - MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep); + MarkMinLogNumberToKeep(last_min_log_number_to_keep); } for (int i = 0; i < static_cast(versions.size()); ++i) { @@ -4965,7 +4964,7 @@ Status VersionSet::Recover( ",min_log_number_to_keep is %" PRIu64 "\n", manifest_path.c_str(), manifest_file_number_, next_file_number_.load(), last_sequence_.load(), log_number, prev_log_number_, - column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); + column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep()); for (auto cfd : *column_family_set_) { if (cfd->IsDropped()) { @@ -5392,9 +5391,9 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { } // Called only either from ::LogAndApply which is protected by mutex or during // recovery which is single-threaded. -void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { - if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) { - min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed); +void VersionSet::MarkMinLogNumberToKeep(uint64_t number) { + if (min_log_number_to_keep_.load(std::memory_order_relaxed) < number) { + min_log_number_to_keep_.store(number, std::memory_order_relaxed); } } @@ -5520,7 +5519,7 @@ Status VersionSet::WriteCurrentStateToManifest( // min_log_number_to_keep is for the whole db, not for specific column family. // So it does not need to be set for every column family, just need to be set once. // Since default CF can never be dropped, we set the min_log to the default CF here. - uint64_t min_log = min_log_number_to_keep_2pc(); + uint64_t min_log = min_log_number_to_keep(); if (min_log != 0) { edit.SetMinLogNumberToKeep(min_log); } diff --git a/db/version_set.h b/db/version_set.h index 0c5e97334..2db6a26fb 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1131,8 +1131,8 @@ class VersionSet { uint64_t current_next_file_number() const { return next_file_number_.load(); } - uint64_t min_log_number_to_keep_2pc() const { - return min_log_number_to_keep_2pc_.load(); + uint64_t min_log_number_to_keep() const { + return min_log_number_to_keep_.load(); } // Allocate and return a new file number @@ -1190,7 +1190,7 @@ class VersionSet { // Mark the specified log number as deleted // REQUIRED: this is only called during single-threaded recovery or repair, or // from ::LogAndApply where the global mutex is held. - void MarkMinLogNumberToKeep2PC(uint64_t number); + void MarkMinLogNumberToKeep(uint64_t number); // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. @@ -1199,10 +1199,12 @@ class VersionSet { // Returns the minimum log number which still has data not flushed to any SST // file. // In non-2PC mode, all the log numbers smaller than this number can be safely - // deleted. + // deleted, although we still use `min_log_number_to_keep_` to determine when + // to delete a WAL file. uint64_t MinLogNumberWithUnflushedData() const { return PreComputeMinLogNumberWithUnflushedData(nullptr); } + // Returns the minimum log number which still has data not flushed to any SST // file. // Empty column families' log number is considered to be @@ -1402,9 +1404,8 @@ class VersionSet { const ImmutableDBOptions* const db_options_; std::atomic next_file_number_; // Any WAL number smaller than this should be ignored during recovery, - // and is qualified for being deleted in 2PC mode. In non-2PC mode, this - // number is ignored. - std::atomic min_log_number_to_keep_2pc_ = {0}; + // and is qualified for being deleted. + std::atomic min_log_number_to_keep_ = {0}; uint64_t manifest_file_number_; uint64_t options_file_number_; uint64_t options_file_size_; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 422afaf49..c48547508 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -3424,6 +3424,7 @@ TEST_F(VersionSetTestMissingFiles, NoFileMissing) { } TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) { + db_options_.allow_2pc = true; NewDB(); SstInfo sst(100, kDefaultColumnFamilyName, "a"); @@ -3435,12 +3436,12 @@ TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) { edit.AddFile(0, file_metas[0]); edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC); ASSERT_OK(LogAndApplyToDefaultCF(edit)); - ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC); + ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC); for (int i = 0; i < 3; i++) { CreateNewManifest(); ReopenDB(); - ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC); + ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC); } }