Fix a race condition in WAL tracking causing DB open failure (#9715)

Summary:
There is a race condition if WAL tracking in the MANIFEST is enabled in a database that disables 2PC.

The race condition is between two background flush threads trying to install flush results to the MANIFEST.

Consider an example database with two column families: "default" (cfd0) and "cf1" (cfd1). Initially,
both column families have one mutable (active) memtable whose data backed by 6.log.

1. Trigger a manual flush for "cf1", creating a 7.log
2. Insert another key to "default", and trigger flush for "default", creating 8.log
3. BgFlushThread1 finishes writing 9.sst
4. BgFlushThread2 finishes writing 10.sst

```
Time  BgFlushThread1                                    BgFlushThread2
 |    mutex_.Lock()
 |    precompute min_wal_to_keep as 6
 |    mutex_.Unlock()
 |                                                     mutex_.Lock()
 |                                                     precompute min_wal_to_keep as 6
 |                                                     join MANIFEST write queue and mutex_.Unlock()
 |    write to MANIFEST
 |    mutex_.Lock()
 |    cfd1->log_number = 7
 |    Signal bg_flush_2 and mutex_.Unlock()
 |                                                     wake up and mutex_.Lock()
 |                                                     cfd0->log_number = 8
 |                                                     FindObsoleteFiles() with job_context->log_number == 7
 |                                                     mutex_.Unlock()
 |                                                     PurgeObsoleteFiles() deletes 6.log
 V
```

As shown in the above, BgFlushThread2 thinks that the min wal to keep is 6.log because "cf1" has unflushed data in 6.log (cf1.log_number=6).
Similarly, BgThread1 thinks that min wal to keep is also 6.log because "default" has unflushed data (default.log_number=6).
No WAL deletion will be written to MANIFEST because 6 is equal to `versions_->wals_.min_wal_number_to_keep`,
due to https://github.com/facebook/rocksdb/blob/7.1.fb/db/memtable_list.cc#L513:L514.
The bg flush thread that finishes last will perform file purging. `job_context.log_number` will be evaluated as 7, i.e.
the min wal that contains unflushed data, causing 6.log to be deleted. However, MANIFEST thinks 6.log should still exist.
If you close the db at this point, you won't be able to re-open it if `track_and_verify_wal_in_manifest` is true.

We must handle the case of multiple bg flush threads, and it is difficult for one bg flush thread to know
the correct min wal number until the other bg flush threads have finished committing to the manifest and updated
the `cfd::log_number`.
To fix this issue, we rename an existing variable `min_log_number_to_keep_2pc` to `min_log_number_to_keep`,
and use it to track WAL file deletion in non-2pc mode as well.
This variable is updated only 1) during recovery with mutex held, or 2) in the MANIFEST write thread.
`min_log_number_to_keep` means RocksDB will delete WALs below it, although there may be WALs
above it which are also obsolete. Formally, we will have [min_wal_to_keep, max_obsolete_wal]. During recovery, we
make sure that only WALs above max_obsolete_wal are checked and added back to `alive_log_files_`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9715

Test Plan:
```
make check
```
Also ran stress test below (with asan) to make sure it completes successfully.
```
TEST_TMPDIR=/dev/shm/rocksdb OPT=-g ASAN_OPTIONS=disable_coredump=0 \
CRASH_TEST_EXT_ARGS=--compression_type=zstd SKIP_FORMAT_BUCK_CHECKS=1 \
make J=52 -j52 blackbox_asan_crash_test
```

Reviewed By: ltamasi

Differential Revision: D34984412

Pulled By: riversand963

fbshipit-source-id: c7b21a8d84751bb55ea79c9f387103d21b231005
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent 29bec740f5
commit e0c84aa0dc
  1. 1
      HISTORY.md
  2. 8
      db/db_impl/db_impl_files.cc
  3. 21
      db/db_impl/db_impl_open.cc
  4. 87
      db/db_wal_test.cc
  5. 5
      db/event_helpers.cc
  6. 24
      db/memtable_list.cc
  7. 7
      db/version_edit_handler.cc
  8. 15
      db/version_set.cc
  9. 15
      db/version_set.h
  10. 5
      db/version_set_test.cc

@ -2,6 +2,7 @@
## Unreleased ## Unreleased
### Bug Fixes ### Bug Fixes
* Fixed a major performance bug in which Bloom filters generated by pre-7.0 releases are not read by early 7.0.x releases (and vice-versa) due to changes to FilterPolicy::Name() in #9590. This can severely impact read performance and read I/O on upgrade or downgrade with existing DB, but not data correctness. * Fixed a major performance bug in which Bloom filters generated by pre-7.0 releases are not read by early 7.0.x releases (and vice-versa) due to changes to FilterPolicy::Name() in #9590. This can severely impact read performance and read I/O on upgrade or downgrade with existing DB, but not data correctness.
* Fixed a race condition when 2PC is disabled and WAL tracking in the MANIFEST is enabled. The race condition is between two background flush threads trying to install flush results, causing a WAL deletion not tracked in the MANIFEST. A future DB open may fail.
### Public API changes ### Public API changes
* Added pure virtual FilterPolicy::CompatibilityName(), which is needed for fixing major performance bug involving FilterPolicy naming in SST metadata without affecting Customizable aspect of FilterPolicy. This change only affects those with their own custom or wrapper FilterPolicy classes. * Added pure virtual FilterPolicy::CompatibilityName(), which is needed for fixing major performance bug involving FilterPolicy naming in SST metadata without affecting Customizable aspect of FilterPolicy. This change only affects those with their own custom or wrapper FilterPolicy classes.

@ -23,11 +23,7 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
uint64_t DBImpl::MinLogNumberToKeep() { uint64_t DBImpl::MinLogNumberToKeep() {
if (allow_2pc()) { return versions_->min_log_number_to_keep();
return versions_->min_log_number_to_keep_2pc();
} else {
return versions_->MinLogNumberWithUnflushedData();
}
} }
uint64_t DBImpl::MinObsoleteSstNumberToKeep() { uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
@ -224,7 +220,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
} }
// Add log files in wal_dir // Add log files in wal_dir
if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) { if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) {
std::vector<std::string> log_files; std::vector<std::string> log_files;
Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files); Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files);
@ -234,6 +229,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
log_file, immutable_db_options_.wal_dir); log_file, immutable_db_options_.wal_dir);
} }
} }
// Add info log files in db_log_dir // Add info log files in db_log_dir
if (!immutable_db_options_.db_log_dir.empty() && if (!immutable_db_options_.db_log_dir.empty() &&
immutable_db_options_.db_log_dir != dbname_) { immutable_db_options_.db_log_dir != dbname_) {

@ -866,6 +866,11 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
bool flushed = false; bool flushed = false;
uint64_t corrupted_wal_number = kMaxSequenceNumber; uint64_t corrupted_wal_number = kMaxSequenceNumber;
uint64_t min_wal_number = MinLogNumberToKeep(); uint64_t min_wal_number = MinLogNumberToKeep();
if (!allow_2pc()) {
// In non-2pc mode, we skip WALs that do not back unflushed data.
min_wal_number =
std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
}
for (auto wal_number : wal_numbers) { for (auto wal_number : wal_numbers) {
if (wal_number < min_wal_number) { if (wal_number < min_wal_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
@ -1270,9 +1275,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
} }
std::unique_ptr<VersionEdit> wal_deletion; std::unique_ptr<VersionEdit> wal_deletion;
if (flushed) {
wal_deletion = std::make_unique<VersionEdit>();
if (immutable_db_options_.track_and_verify_wals_in_manifest) { if (immutable_db_options_.track_and_verify_wals_in_manifest) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(max_wal_number + 1); wal_deletion->DeleteWalsBefore(max_wal_number + 1);
}
if (!allow_2pc()) {
// In non-2pc mode, flushing the memtables of the column families
// means we can advance min_log_number_to_keep.
wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1);
}
edit_lists.back().push_back(wal_deletion.get()); edit_lists.back().push_back(wal_deletion.get());
} }
@ -1351,7 +1363,14 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
// FindObsoleteFiles() // FindObsoleteFiles()
total_log_size_ = 0; total_log_size_ = 0;
log_empty_ = false; log_empty_ = false;
uint64_t min_wal_with_unflushed_data =
versions_->MinLogNumberWithUnflushedData();
for (auto wal_number : wal_numbers) { for (auto wal_number : wal_numbers) {
if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) {
// In non-2pc mode, the WAL files not backing unflushed data are not
// alive, thus should not be added to the alive_log_files_.
continue;
}
// We preallocate space for wals, but then after a crash and restart, those // We preallocate space for wals, but then after a crash and restart, those
// preallocated space are not needed anymore. It is likely only the last // preallocated space are not needed anymore. It is likely only the last
// log has such preallocated space, so we only truncate for the last log. // log has such preallocated space, so we only truncate for the last log.

@ -1491,6 +1491,93 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options)); ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
} }
TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) {
Options options = CurrentOptions();
options.env = env_;
options.track_and_verify_wals_in_manifest = true;
// The following make sure there are two bg flush threads.
options.max_background_jobs = 8;
const std::string cf1_name("cf1");
CreateAndReopenWithCF({cf1_name}, options);
assert(handles_.size() == 2);
{
dbfull()->TEST_LockMutex();
ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes);
dbfull()->TEST_UnlockMutex();
}
ASSERT_OK(dbfull()->PauseBackgroundWork());
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value"));
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[1]));
ASSERT_OK(db_->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(dbfull()->TEST_FlushMemTable(false, true, handles_[0]));
bool called = false;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
// This callback will be called when the first bg flush thread reaches the
// point before entering the MANIFEST write queue after flushing the SST
// file.
// The purpose of the sync points here is to ensure both bg flush threads
// finish computing `min_wal_number_to_keep` before any of them updates the
// `log_number` for the column family that's being flushed.
SyncPoint::GetInstance()->SetCallBack(
"MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep",
[&](void* /*arg*/) {
dbfull()->mutex()->AssertHeld();
if (!called) {
// We are the first bg flush thread in the MANIFEST write queue.
// We set up the dependency between sync points for two threads that
// will be executing the same code.
// For the interleaving of events, see
// https://github.com/facebook/rocksdb/pull/9715.
// bg flush thread1 will release the db mutex while in the MANIFEST
// write queue. In the meantime, bg flush thread2 locks db mutex and
// computes the min_wal_number_to_keep (before thread1 writes to
// MANIFEST thus before cf1->log_number is updated). Bg thread2 joins
// the MANIFEST write queue afterwards and bg flush thread1 proceeds
// with writing to MANIFEST.
called = true;
SyncPoint::GetInstance()->LoadDependency({
{"VersionSet::LogAndApply:WriteManifestStart",
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"},
{"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2",
"VersionSet::LogAndApply:WriteManifest"},
});
} else {
// The other bg flush thread has already been in the MANIFEST write
// queue, and we are after.
TEST_SYNC_POINT(
"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2");
}
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->ContinueBackgroundWork());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_TRUE(called);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
DB* db1 = nullptr;
Status s = DB::OpenForReadOnly(options, dbname_, &db1);
ASSERT_OK(s);
assert(db1);
delete db1;
}
// Test scope: // Test scope:
// - We expect to open data store under all circumstances // - We expect to open data store under all circumstances
// - We expect only data upto the point where the first error was encountered // - We expect only data upto the point where the first error was encountered

@ -95,8 +95,9 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
jwriter << "cf_name" << cf_name << "job" << job_id << "event" jwriter << "cf_name" << cf_name << "job" << job_id << "event"
<< "table_file_creation" << "table_file_creation"
<< "file_number" << fd.GetNumber() << "file_size" << "file_number" << fd.GetNumber() << "file_size"
<< fd.GetFileSize() << "file_checksum" << file_checksum << fd.GetFileSize() << "file_checksum"
<< "file_checksum_func_name" << file_checksum_func_name; << Slice(file_checksum).ToString(true) << "file_checksum_func_name"
<< file_checksum_func_name;
// table_properties // table_properties
{ {

@ -489,8 +489,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
// TODO(myabandeh): Not sure how batch_count could be 0 here. // TODO(myabandeh): Not sure how batch_count could be 0 here.
if (batch_count > 0) { if (batch_count > 0) {
uint64_t min_wal_number_to_keep = 0; uint64_t min_wal_number_to_keep = 0;
if (vset->db_options()->allow_2pc) {
assert(edit_list.size() > 0); assert(edit_list.size() > 0);
if (vset->db_options()->allow_2pc) {
// Note that if mempurge is successful, the edit_list will // Note that if mempurge is successful, the edit_list will
// not be applicable (contains info of new min_log number to keep, // not be applicable (contains info of new min_log number to keep,
// and level 0 file path of SST file created during normal flush, // and level 0 file path of SST file created during normal flush,
@ -501,21 +501,24 @@ Status MemTableList::TryInstallMemtableFlushResults(
// We piggyback the information of earliest log file to keep in the // We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed. // manifest entry for the last file flushed.
edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep); } else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
} }
edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
std::unique_ptr<VersionEdit> wal_deletion; std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) { if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (!vset->db_options()->allow_2pc) {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
}
if (min_wal_number_to_keep > if (min_wal_number_to_keep >
vset->GetWalSet().GetMinWalNumberToKeep()) { vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.reset(new VersionEdit); wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
edit_list.push_back(wal_deletion.get()); edit_list.push_back(wal_deletion.get());
} }
TEST_SYNC_POINT_CALLBACK(
"MemTableList::TryInstallMemtableFlushResults:"
"AfterComputeMinWalToKeep",
nullptr);
} }
const auto manifest_write_cb = [this, cfd, batch_count, log_buffer, const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
@ -798,15 +801,14 @@ Status InstallMemtableAtomicFlushResults(
if (vset->db_options()->allow_2pc) { if (vset->db_options()->allow_2pc) {
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, cfds, edit_lists, mems_list, prep_tracker); vset, cfds, edit_lists, mems_list, prep_tracker);
edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep); } else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
} }
edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
std::unique_ptr<VersionEdit> wal_deletion; std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) { if (vset->db_options()->track_and_verify_wals_in_manifest) {
if (!vset->db_options()->allow_2pc) {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
}
if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
wal_deletion.reset(new VersionEdit); wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);

@ -394,7 +394,7 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
if (s->ok()) { if (s->ok()) {
version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily( version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily(
version_edit_params_.max_column_family_); version_edit_params_.max_column_family_);
version_set_->MarkMinLogNumberToKeep2PC( version_set_->MarkMinLogNumberToKeep(
version_edit_params_.min_log_number_to_keep_); version_edit_params_.min_log_number_to_keep_);
version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_); version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_);
version_set_->MarkFileNumberUsed(version_edit_params_.log_number_); version_set_->MarkFileNumberUsed(version_edit_params_.log_number_);
@ -970,12 +970,11 @@ void DumpManifestHandler::CheckIterationResult(const log::Reader& reader,
fprintf(stdout, fprintf(stdout,
"next_file_number %" PRIu64 " last_sequence %" PRIu64 "next_file_number %" PRIu64 " last_sequence %" PRIu64
" prev_log_number %" PRIu64 " max_column_family %" PRIu32 " prev_log_number %" PRIu64 " max_column_family %" PRIu32
" min_log_number_to_keep " " min_log_number_to_keep %" PRIu64 "\n",
"%" PRIu64 "\n",
version_set_->current_next_file_number(), version_set_->current_next_file_number(),
version_set_->LastSequence(), version_set_->prev_log_number(), version_set_->LastSequence(), version_set_->prev_log_number(),
version_set_->column_family_set_->GetMaxColumnFamily(), version_set_->column_family_set_->GetMaxColumnFamily(),
version_set_->min_log_number_to_keep_2pc()); version_set_->min_log_number_to_keep());
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -4141,7 +4141,7 @@ void VersionSet::Reset() {
} }
db_id_.clear(); db_id_.clear();
next_file_number_.store(2); next_file_number_.store(2);
min_log_number_to_keep_2pc_.store(0); min_log_number_to_keep_.store(0);
manifest_file_number_ = 0; manifest_file_number_ = 0;
options_file_number_ = 0; options_file_number_ = 0;
pending_manifest_file_number_ = 0; pending_manifest_file_number_ = 0;
@ -4610,8 +4610,7 @@ Status VersionSet::ProcessManifestWrites(
} }
if (last_min_log_number_to_keep != 0) { if (last_min_log_number_to_keep != 0) {
// Should only be set in 2PC mode. MarkMinLogNumberToKeep(last_min_log_number_to_keep);
MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
} }
for (int i = 0; i < static_cast<int>(versions.size()); ++i) { for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
@ -4965,7 +4964,7 @@ Status VersionSet::Recover(
",min_log_number_to_keep is %" PRIu64 "\n", ",min_log_number_to_keep is %" PRIu64 "\n",
manifest_path.c_str(), manifest_file_number_, next_file_number_.load(), manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
last_sequence_.load(), log_number, prev_log_number_, last_sequence_.load(), log_number, prev_log_number_,
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep());
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) { if (cfd->IsDropped()) {
@ -5392,9 +5391,9 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
} }
// Called only either from ::LogAndApply which is protected by mutex or during // Called only either from ::LogAndApply which is protected by mutex or during
// recovery which is single-threaded. // recovery which is single-threaded.
void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { void VersionSet::MarkMinLogNumberToKeep(uint64_t number) {
if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) { if (min_log_number_to_keep_.load(std::memory_order_relaxed) < number) {
min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed); min_log_number_to_keep_.store(number, std::memory_order_relaxed);
} }
} }
@ -5520,7 +5519,7 @@ Status VersionSet::WriteCurrentStateToManifest(
// min_log_number_to_keep is for the whole db, not for specific column family. // min_log_number_to_keep is for the whole db, not for specific column family.
// So it does not need to be set for every column family, just need to be set once. // So it does not need to be set for every column family, just need to be set once.
// Since default CF can never be dropped, we set the min_log to the default CF here. // Since default CF can never be dropped, we set the min_log to the default CF here.
uint64_t min_log = min_log_number_to_keep_2pc(); uint64_t min_log = min_log_number_to_keep();
if (min_log != 0) { if (min_log != 0) {
edit.SetMinLogNumberToKeep(min_log); edit.SetMinLogNumberToKeep(min_log);
} }

@ -1131,8 +1131,8 @@ class VersionSet {
uint64_t current_next_file_number() const { return next_file_number_.load(); } uint64_t current_next_file_number() const { return next_file_number_.load(); }
uint64_t min_log_number_to_keep_2pc() const { uint64_t min_log_number_to_keep() const {
return min_log_number_to_keep_2pc_.load(); return min_log_number_to_keep_.load();
} }
// Allocate and return a new file number // Allocate and return a new file number
@ -1190,7 +1190,7 @@ class VersionSet {
// Mark the specified log number as deleted // Mark the specified log number as deleted
// REQUIRED: this is only called during single-threaded recovery or repair, or // REQUIRED: this is only called during single-threaded recovery or repair, or
// from ::LogAndApply where the global mutex is held. // from ::LogAndApply where the global mutex is held.
void MarkMinLogNumberToKeep2PC(uint64_t number); void MarkMinLogNumberToKeep(uint64_t number);
// Return the log file number for the log file that is currently // Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file. // being compacted, or zero if there is no such log file.
@ -1199,10 +1199,12 @@ class VersionSet {
// Returns the minimum log number which still has data not flushed to any SST // Returns the minimum log number which still has data not flushed to any SST
// file. // file.
// In non-2PC mode, all the log numbers smaller than this number can be safely // In non-2PC mode, all the log numbers smaller than this number can be safely
// deleted. // deleted, although we still use `min_log_number_to_keep_` to determine when
// to delete a WAL file.
uint64_t MinLogNumberWithUnflushedData() const { uint64_t MinLogNumberWithUnflushedData() const {
return PreComputeMinLogNumberWithUnflushedData(nullptr); return PreComputeMinLogNumberWithUnflushedData(nullptr);
} }
// Returns the minimum log number which still has data not flushed to any SST // Returns the minimum log number which still has data not flushed to any SST
// file. // file.
// Empty column families' log number is considered to be // Empty column families' log number is considered to be
@ -1402,9 +1404,8 @@ class VersionSet {
const ImmutableDBOptions* const db_options_; const ImmutableDBOptions* const db_options_;
std::atomic<uint64_t> next_file_number_; std::atomic<uint64_t> next_file_number_;
// Any WAL number smaller than this should be ignored during recovery, // Any WAL number smaller than this should be ignored during recovery,
// and is qualified for being deleted in 2PC mode. In non-2PC mode, this // and is qualified for being deleted.
// number is ignored. std::atomic<uint64_t> min_log_number_to_keep_ = {0};
std::atomic<uint64_t> min_log_number_to_keep_2pc_ = {0};
uint64_t manifest_file_number_; uint64_t manifest_file_number_;
uint64_t options_file_number_; uint64_t options_file_number_;
uint64_t options_file_size_; uint64_t options_file_size_;

@ -3424,6 +3424,7 @@ TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
} }
TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) { TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
db_options_.allow_2pc = true;
NewDB(); NewDB();
SstInfo sst(100, kDefaultColumnFamilyName, "a"); SstInfo sst(100, kDefaultColumnFamilyName, "a");
@ -3435,12 +3436,12 @@ TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
edit.AddFile(0, file_metas[0]); edit.AddFile(0, file_metas[0]);
edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC); edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC);
ASSERT_OK(LogAndApplyToDefaultCF(edit)); ASSERT_OK(LogAndApplyToDefaultCF(edit));
ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC); ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
CreateNewManifest(); CreateNewManifest();
ReopenDB(); ReopenDB();
ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC); ASSERT_EQ(versions_->min_log_number_to_keep(), kMinWalNumberToKeep2PC);
} }
} }

Loading…
Cancel
Save