diff --git a/HISTORY.md b/HISTORY.md
index 2a236ce06..d2f4762f8 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -7,6 +7,7 @@
 * Fixed an issue for backward iteration when user defined timestamp is enabled in combination with BlobDB.
 * Fixed a couple of cases where a Merge operand encountered during iteration wasn't reflected in the `internal_merge_count` PerfContext counter.
 * Fixed a bug in CreateColumnFamilyWithImport()/ExportColumnFamily() which did not support range tombstones (#11252).
+* Fixed a bug where a column family excluded from an atomic flush could contain unflushed data that should have been included in that atomic flush (i.e., data with a seqno less than the max seqno of the atomic flush), leading to potential data loss in the excluded column family when `WriteOptions::disableWAL == true` (#11148).
 
 ### New Features
 * Add statistics rocksdb.secondary.cache.filter.hits, rocksdb.secondary.cache.index.hits, and rocksdb.secondary.cache.filter.hits
diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc
index 6ca7685ae..f035041c7 100644
--- a/db/db_filesnapshot.cc
+++ b/db/db_filesnapshot.cc
@@ -34,11 +34,8 @@ Status DBImpl::FlushForGetLiveFiles() {
   // flush all dirty data to disk.
   Status status;
   if (immutable_db_options_.atomic_flush) {
-    autovector<ColumnFamilyData*> cfds;
-    SelectColumnFamiliesForAtomicFlush(&cfds);
     mutex_.Unlock();
-    status =
-        AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kGetLiveFiles);
+    status = AtomicFlushMemTables(FlushOptions(), FlushReason::kGetLiveFiles);
     if (status.IsColumnFamilyDropped()) {
       status = Status::OK();
     }
@@ -437,4 +434,3 @@ Status DBImpl::GetLiveFilesStorageInfo(
   }
 
 }  // namespace ROCKSDB_NAMESPACE
-
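For orientation (not part of the patch): the failure mode fixed here can be pictured with the public API. The sketch below is illustrative only; the DB path, CF names, and keys are invented. With `atomic_flush == true` and `disableWAL == true`, a flush that persists a later seqno must also persist every earlier unflushed seqno across all CFs, or a crash leaves a recovery hole.

#include <cassert>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.atomic_flush = true;  // all CFs must be flushed to a consistent seqno

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/atomic_flush_demo", &db);
  assert(s.ok());

  rocksdb::ColumnFamilyHandle* cf1 = nullptr;
  s = db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "cf1", &cf1);
  assert(s.ok());

  rocksdb::WriteOptions write_opts;
  write_opts.disableWAL = true;  // recovery depends entirely on flushed SSTs

  // Writes land in two different CFs with increasing seqnos. An atomic flush
  // that persists the later write ("k3") must also persist the earlier one
  // ("k2"); before this fix, a CF that looked empty at selection time could
  // be excluded even though it picked up "k2" before the flush finished.
  s = db->Put(write_opts, db->DefaultColumnFamily(), "k2", "v2");
  assert(s.ok());
  s = db->Put(write_opts, cf1, "k3", "v3");
  assert(s.ok());

  // A manual atomic flush over both CFs; internally triggered flushes (e.g.,
  // from GetLiveFiles()) must obey the same invariant.
  s = db->Flush(rocksdb::FlushOptions(), {db->DefaultColumnFamily(), cf1});
  assert(s.ok());

  s = db->DestroyColumnFamilyHandle(cf1);
  assert(s.ok());
  delete db;
  return 0;
}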
ASSERT_OK(Put(1, "k1", "v1", write_opts)); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({{ + "DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait", + "DBFlushTest::" + "UnrecoverableWriteInAtomicFlushWaitUntilFlushWouldNotStallWrites::Write", + }}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Write to db when atomic flush releases the lock to wait on write stall + // condition to be gone in `WaitUntilFlushWouldNotStallWrites()` + port::Thread write_thread([&] { + TEST_SYNC_POINT( + "DBFlushTest::" + "UnrecoverableWriteInAtomicFlushWaitUntilFlushWouldNotStallWrites::" + "Write"); + // Before the fix, the empty default CF would've been prematurely excluded + // from this atomic flush. The following two writes together make default CF + // later contain data that should've been included in the atomic flush. + ASSERT_OK(Put(0, "k2", "v2", write_opts)); + // The following write increases the max seqno of this atomic flush to be 3, + // which is greater than the seqno of default CF's data. This then violates + // the invariant that all entries of seqno less than the max seqno + // of this atomic flush should've been flushed by the time of this atomic + // flush finishes. + ASSERT_OK(Put(1, "k3", "v3", write_opts)); + + // Resume compaction threads and reduce L0 files so `GetLiveFiles()` can + // resume from the wait + sleeping_task_->WakeUp(); + sleeping_task_->WaitUntilDone(); + MoveFilesToLevel(1, 1); + }); + + // Trigger an atomic flush by `GetLiveFiles()` + std::vector files; + uint64_t manifest_file_size; + ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, /*flush*/ true)); + + write_thread.join(); + + ReopenWithColumnFamilies({"default", "cf1"}, options); + + ASSERT_EQ(Get(1, "k3"), "v3"); + // Prior to the fix, `Get()` will return `NotFound as "k2" entry in default CF + // can't be recovered from a crash right after the atomic flush finishes, + // resulting in a "recovery hole" as "k3" can be recovered. It's due to the + // invariant violation described above. + ASSERT_EQ(Get(0, "k2"), "v2"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBFlushTest, FixFlushReasonRaceFromConcurrentFlushes) { Options options = CurrentOptions(); options.atomic_flush = true; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 36bf1f9cd..69350af34 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -387,10 +387,8 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) { // We allow flush to stall write since we are trying to resume from error. flush_opts.allow_write_stall = true; if (immutable_db_options_.atomic_flush) { - autovector cfds; - SelectColumnFamiliesForAtomicFlush(&cfds); mutex_.Unlock(); - s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason); + s = AtomicFlushMemTables(flush_opts, context.flush_reason); mutex_.Lock(); } else { for (auto cfd : versions_->GetRefedColumnFamilySet()) { @@ -507,11 +505,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { has_unpersisted_data_.load(std::memory_order_relaxed) && !mutable_db_options_.avoid_flush_during_shutdown) { if (immutable_db_options_.atomic_flush) { - autovector cfds; - SelectColumnFamiliesForAtomicFlush(&cfds); mutex_.Unlock(); - Status s = - AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown); + Status s = AtomicFlushMemTables(FlushOptions(), FlushReason::kShutDown); s.PermitUncheckedError(); //**TODO: What to do on error? 
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 36bf1f9cd..69350af34 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -387,10 +387,8 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {
     // We allow flush to stall write since we are trying to resume from error.
     flush_opts.allow_write_stall = true;
     if (immutable_db_options_.atomic_flush) {
-      autovector<ColumnFamilyData*> cfds;
-      SelectColumnFamiliesForAtomicFlush(&cfds);
       mutex_.Unlock();
-      s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason);
+      s = AtomicFlushMemTables(flush_opts, context.flush_reason);
       mutex_.Lock();
     } else {
       for (auto cfd : versions_->GetRefedColumnFamilySet()) {
@@ -507,11 +505,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
       has_unpersisted_data_.load(std::memory_order_relaxed) &&
       !mutable_db_options_.avoid_flush_during_shutdown) {
     if (immutable_db_options_.atomic_flush) {
-      autovector<ColumnFamilyData*> cfds;
-      SelectColumnFamiliesForAtomicFlush(&cfds);
       mutex_.Unlock();
-      Status s =
-          AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
+      Status s = AtomicFlushMemTables(FlushOptions(), FlushReason::kShutDown);
       s.PermitUncheckedError();  //**TODO: What to do on error?
       mutex_.Lock();
     } else {
@@ -5350,12 +5345,10 @@ Status DBImpl::IngestExternalFiles(
       FlushOptions flush_opts;
       flush_opts.allow_write_stall = true;
       if (immutable_db_options_.atomic_flush) {
-        autovector<ColumnFamilyData*> cfds_to_flush;
-        SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
         mutex_.Unlock();
-        status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
-                                      FlushReason::kExternalFileIngestion,
-                                      true /* entered_write_thread */);
+        status = AtomicFlushMemTables(
+            flush_opts, FlushReason::kExternalFileIngestion,
+            {} /* provided_candidate_cfds */, true /* entered_write_thread */);
         mutex_.Lock();
       } else {
         for (size_t i = 0; i != num_cfs; ++i) {
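One consequence of moving `provided_candidate_cfds` behind `flush_reason` with a `{}` default (visible in the `IngestExternalFiles()` call site above): C++ default arguments can only be omitted from the right, so a caller that needs the trailing `entered_write_thread` must spell out the empty candidate list. A toy stand-in illustrating just that rule (`FlushAll` and its parameters are invented):

#include <iostream>
#include <vector>

// Hypothetical signature mirroring the reordered AtomicFlushMemTables():
// defaulted parameters can be skipped only from the right, so setting the
// last one forces the caller to pass `{}` for the candidates.
void FlushAll(int flush_reason, const std::vector<int>& candidates = {},
              bool entered_write_thread = false) {
  std::cout << "reason=" << flush_reason
            << " candidates=" << candidates.size()
            << " entered_write_thread=" << entered_write_thread << "\n";
}

int main() {
  FlushAll(1);            // like AtomicFlushMemTables(opts, reason)
  FlushAll(2, {10, 20});  // like passing explicit candidate CFs
  FlushAll(3, {}, true);  // like the IngestExternalFiles() call site
  return 0;
}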
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 309021d74..226772bdc 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1081,8 +1081,9 @@ class DBImpl : public DB {
   // is because in certain cases, we can flush column families, wait for the
   // flush to complete, but delete the column family handle before the wait
   // finishes. For example in CompactRange.
-  Status TEST_AtomicFlushMemTables(const autovector<ColumnFamilyData*>& cfds,
-                                   const FlushOptions& flush_opts);
+  Status TEST_AtomicFlushMemTables(
+      const autovector<ColumnFamilyData*>& provided_candidate_cfds,
+      const FlushOptions& flush_opts);
 
   // Wait for background threads to complete scheduled work.
   Status TEST_WaitForBackgroundWork();
@@ -1886,16 +1887,27 @@ class DBImpl : public DB {
 
   Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
 
-  void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds);
+  // Select and output column families qualified for atomic flush in
+  // `selected_cfds`. If `provided_candidate_cfds` is non-empty, it will be
+  // used as candidate CFs to select qualified ones from. Otherwise, all column
+  // families are used as candidates to select from.
+  //
+  // REQUIRES: mutex held
+  void SelectColumnFamiliesForAtomicFlush(
+      autovector<ColumnFamilyData*>* selected_cfds,
+      const autovector<ColumnFamilyData*>& provided_candidate_cfds = {});
 
   // Force current memtable contents to be flushed.
   Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
                        FlushReason flush_reason,
                        bool entered_write_thread = false);
 
+  // Atomic-flush memtables from qualified CFs among `provided_candidate_cfds`
+  // (if non-empty) or among all column families and atomically record the
+  // result to the MANIFEST.
   Status AtomicFlushMemTables(
-      const autovector<ColumnFamilyData*>& column_family_datas,
       const FlushOptions& options, FlushReason flush_reason,
+      const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},
       bool entered_write_thread = false);
 
   // Wait until flushing this column family won't stall writes
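The declarations above establish the candidate-generation contract; the implementations that follow ref each auto-generated candidate under the DB mutex and unref it on every exit path. A self-contained sketch of that manual ref-count discipline, with a toy `RefCounted` type standing in for `ColumnFamilyData` (all names here are invented):

#include <cassert>
#include <mutex>
#include <vector>

// Toy stand-in for ColumnFamilyData's manual ref-counting; the real class and
// locking are more involved. Shows the discipline the patch follows: Ref()
// candidates under the mutex, use them without the mutex, then
// UnrefAndTryDelete() exactly once on every path (early returns included).
struct RefCounted {
  int refs = 1;
  void Ref() { ++refs; }
  bool UnrefAndTryDelete() {
    if (--refs == 0) {
      delete this;
      return true;  // object was destroyed
    }
    return false;
  }
};

int main() {
  std::mutex mu;
  std::vector<RefCounted*> all = {new RefCounted, new RefCounted};
  std::vector<RefCounted*> candidates;
  {
    std::lock_guard<std::mutex> lock(mu);  // snapshot candidates under lock
    for (RefCounted* rc : all) {
      rc->Ref();
      candidates.push_back(rc);
    }
  }
  // ... use candidates without holding the mutex ...
  for (RefCounted* rc : candidates) {
    bool deleted = rc->UnrefAndTryDelete();
    assert(!deleted);  // `all` still holds a reference
  }
  for (RefCounted* rc : all) {
    rc->UnrefAndTryDelete();  // drop the last reference
  }
  return 0;
}

The extra reference is what keeps a concurrently dropped CF alive between the unlocked candidate generation and the selection step; forgetting the unref on any early-return path would leak the CF, which is why the patch repeats the cleanup block before each `return s;`.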
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index b6a619037..da43d609d 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -1031,15 +1031,9 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
     FlushOptions fo;
     fo.allow_write_stall = options.allow_write_stall;
     if (immutable_db_options_.atomic_flush) {
-      autovector<ColumnFamilyData*> cfds;
-      mutex_.Lock();
-      SelectColumnFamiliesForAtomicFlush(&cfds);
-      mutex_.Unlock();
-      s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
-                               false /* entered_write_thread */);
+      s = AtomicFlushMemTables(fo, FlushReason::kManualCompaction);
     } else {
-      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
-                        false /* entered_write_thread */);
+      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction);
     }
     if (!s.ok()) {
       LogFlush(immutable_db_options_.info_log);
@@ -1800,8 +1794,8 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
                  cfh->GetName().c_str());
   Status s;
   if (immutable_db_options_.atomic_flush) {
-    s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
-                             FlushReason::kManualFlush);
+    s = AtomicFlushMemTables(flush_options, FlushReason::kManualFlush,
+                             {cfh->cfd()});
   } else {
     s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
   }
@@ -1839,7 +1833,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
     auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
     cfds.emplace_back(cfh->cfd());
   });
-  s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
+  s = AtomicFlushMemTables(flush_options, FlushReason::kManualFlush, cfds);
   ROCKS_LOG_INFO(immutable_db_options_.info_log,
                  "Manual atomic flush finished, status: %s\n"
                  "=====Column families:=====",
@@ -2223,11 +2217,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
   return s;
 }
 
-// Flush all elements in 'column_family_datas'
-// and atomically record the result to the MANIFEST.
 Status DBImpl::AtomicFlushMemTables(
-    const autovector<ColumnFamilyData*>& column_family_datas,
     const FlushOptions& flush_options, FlushReason flush_reason,
+    const autovector<ColumnFamilyData*>& provided_candidate_cfds,
     bool entered_write_thread) {
   assert(immutable_db_options_.atomic_flush);
   if (!flush_options.wait && write_controller_.IsStopped()) {
@@ -2237,18 +2229,48 @@ Status DBImpl::AtomicFlushMemTables(
     return Status::TryAgain(oss.str());
   }
   Status s;
+  autovector<ColumnFamilyData*> candidate_cfds;
+  if (provided_candidate_cfds.empty()) {
+    // Generate candidate cfds if not provided
+    {
+      InstrumentedMutexLock l(&mutex_);
+      for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
+        if (!cfd->IsDropped() && cfd->initialized()) {
+          cfd->Ref();
+          candidate_cfds.push_back(cfd);
+        }
+      }
+    }
+  } else {
+    candidate_cfds = provided_candidate_cfds;
+  }
+
   if (!flush_options.allow_write_stall) {
     int num_cfs_to_flush = 0;
-    for (auto cfd : column_family_datas) {
+    for (auto cfd : candidate_cfds) {
       bool flush_needed = true;
       s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
       if (!s.ok()) {
+        // Unref the newly generated candidate cfds (when not provided) in
+        // `candidate_cfds`
+        if (provided_candidate_cfds.empty()) {
+          for (auto candidate_cfd : candidate_cfds) {
+            candidate_cfd->UnrefAndTryDelete();
+          }
+        }
         return s;
       } else if (flush_needed) {
         ++num_cfs_to_flush;
       }
     }
     if (0 == num_cfs_to_flush) {
+      // Unref the newly generated candidate cfds (when not provided) in
+      // `candidate_cfds`
+      if (provided_candidate_cfds.empty()) {
+        for (auto candidate_cfd : candidate_cfds) {
+          candidate_cfd->UnrefAndTryDelete();
+        }
+      }
       return s;
     }
   }
@@ -2269,15 +2291,16 @@ Status DBImpl::AtomicFlushMemTables(
     }
     WaitForPendingWrites();
 
-    for (auto cfd : column_family_datas) {
-      if (cfd->IsDropped()) {
-        continue;
-      }
-      if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
-          !cached_recoverable_state_empty_.load()) {
-        cfds.emplace_back(cfd);
+    SelectColumnFamiliesForAtomicFlush(&cfds, candidate_cfds);
+
+    // Unref the newly generated candidate cfds (when not provided) in
+    // `candidate_cfds`
+    if (provided_candidate_cfds.empty()) {
+      for (auto candidate_cfd : candidate_cfds) {
+        candidate_cfd->UnrefAndTryDelete();
       }
     }
+
     for (auto cfd : cfds) {
       if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) ||
           flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc
index e59a4b981..90d37473f 100644
--- a/db/db_impl/db_impl_debug.cc
+++ b/db/db_impl/db_impl_debug.cc
@@ -155,8 +155,10 @@ Status DBImpl::TEST_FlushMemTable(ColumnFamilyData* cfd,
 }
 
 Status DBImpl::TEST_AtomicFlushMemTables(
-    const autovector<ColumnFamilyData*>& cfds, const FlushOptions& flush_opts) {
-  return AtomicFlushMemTables(cfds, flush_opts, FlushReason::kTest);
+    const autovector<ColumnFamilyData*>& provided_candidate_cfds,
+    const FlushOptions& flush_opts) {
+  return AtomicFlushMemTables(flush_opts, FlushReason::kTest,
+                              provided_candidate_cfds);
 }
 
 Status DBImpl::TEST_WaitForBackgroundWork() {
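On the `TryAgain` early-out preserved at the top of `AtomicFlushMemTables()`: a non-waiting manual flush fails fast instead of blocking when writes are stopped. A minimal user-side sketch of handling that status through the public API (illustrative only; the DB path is made up):

#include <iostream>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Sketch of a caller coping with the TryAgain path: a non-waiting manual
// flush issued while writes are stopped fails rather than blocking.
int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.atomic_flush = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/tryagain_demo", &db);
  if (!s.ok()) {
    std::cerr << s.ToString() << "\n";
    return 1;
  }

  rocksdb::FlushOptions flush_opts;
  flush_opts.wait = false;  // do not block if a write stall is in effect
  s = db->Flush(flush_opts);
  if (s.IsTryAgain()) {
    // Writes were stopped; retry later once the stall clears.
    std::cerr << "flush deferred: " << s.ToString() << "\n";
  }

  delete db;
  return 0;
}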
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 0891e5012..25dcec727 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -1543,14 +1543,40 @@ Status DBImpl::WriteRecoverableState() {
 }
 
 void DBImpl::SelectColumnFamiliesForAtomicFlush(
-    autovector<ColumnFamilyData*>* cfds) {
-  for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
+    autovector<ColumnFamilyData*>* selected_cfds,
+    const autovector<ColumnFamilyData*>& provided_candidate_cfds) {
+  mutex_.AssertHeld();
+  assert(selected_cfds);
+
+  autovector<ColumnFamilyData*> candidate_cfds;
+
+  // Generate candidate cfds if not provided
+  if (provided_candidate_cfds.empty()) {
+    for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
+      if (!cfd->IsDropped() && cfd->initialized()) {
+        cfd->Ref();
+        candidate_cfds.push_back(cfd);
+      }
+    }
+  } else {
+    candidate_cfds = provided_candidate_cfds;
+  }
+
+  for (ColumnFamilyData* cfd : candidate_cfds) {
     if (cfd->IsDropped()) {
       continue;
     }
     if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
         !cached_recoverable_state_empty_.load()) {
-      cfds->push_back(cfd);
+      selected_cfds->push_back(cfd);
+    }
+  }
+
+  // Unref the newly generated candidate cfds (when not provided) in
+  // `candidate_cfds`
+  if (provided_candidate_cfds.empty()) {
+    for (auto candidate_cfd : candidate_cfds) {
+      candidate_cfd->UnrefAndTryDelete();
    }
   }
 }
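To summarize the selection rule that `SelectColumnFamiliesForAtomicFlush()` now applies to either the provided or the auto-generated candidates: a live CF qualifies if it has unflushed immutable memtables, a non-empty mutable memtable, or pending recoverable state. A toy model of that predicate (all types and names invented; the real checks live on `ColumnFamilyData`):

#include <iostream>
#include <vector>

// Toy model of the selection predicate: a non-dropped candidate is selected
// iff it has unflushed immutable memtables, a dirty mutable memtable, or
// recoverable state is pending DB-wide.
struct FakeCf {
  const char* name;
  bool dropped = false;
  int num_immutable_not_flushed = 0;
  bool mem_empty = true;
};

std::vector<const FakeCf*> SelectForAtomicFlush(
    const std::vector<FakeCf>& candidates, bool recoverable_state_pending) {
  std::vector<const FakeCf*> selected;
  for (const FakeCf& cf : candidates) {
    if (cf.dropped) {
      continue;
    }
    if (cf.num_immutable_not_flushed != 0 || !cf.mem_empty ||
        recoverable_state_pending) {
      selected.push_back(&cf);
    }
  }
  return selected;
}

int main() {
  std::vector<FakeCf> cfs = {{"default", false, 0, false},  // dirty memtable
                             {"cf1", false, 1, true},       // unflushed imm
                             {"cf2", false, 0, true}};      // clean: skipped
  for (const FakeCf* cf : SelectForAtomicFlush(cfs, /*pending=*/false)) {
    std::cout << cf->name << " selected\n";
  }
  return 0;
}

The fix's key property is that this predicate is now evaluated under the mutex at flush time (inside `AtomicFlushMemTables()`), rather than at an earlier call site, so a CF that picks up writes between selection and flush can no longer be wrongly excluded.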