diff --git a/HISTORY.md b/HISTORY.md index 314666501..f695158d7 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,6 +11,7 @@ * Fixed a bug caused by `DB::SyncWAL()` affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#10892). * Fixed a BackupEngine bug in which RestoreDBFromLatestBackup would fail if the latest backup was deleted and there is another valid backup available. * Fix L0 file misorder corruption caused by ingesting files of overlapping seqnos with memtable entries' through introducing `epoch_number`. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected. Also replace the previous incomplete fix (#5958) to the same corruption with this new and more complete fix. +* Fixed a bug in LockWAL() leading to re-locking mutex (#11020). ## 7.9.0 (11/21/2022) ### Performance Improvements diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 657d2870f..411503a6f 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1570,21 +1570,31 @@ Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) { } Status DBImpl::LockWAL() { - log_write_mutex_.Lock(); - auto cur_log_writer = logs_.back().writer; - IOStatus status = cur_log_writer->WriteBuffer(); - if (!status.ok()) { - ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", - status.ToString().c_str()); - // In case there is a fs error we should set it globally to prevent the - // future writes - WriteStatusCheck(status); + { + InstrumentedMutexLock lock(&mutex_); + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + + lock_wal_write_token_ = write_controller_.GetStopToken(); + + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + write_thread_.ExitUnbatched(&w); } - return static_cast(status); + return FlushWAL(/*sync=*/false); } Status DBImpl::UnlockWAL() { - log_write_mutex_.Unlock(); + { + InstrumentedMutexLock lock(&mutex_); + lock_wal_write_token_.reset(); + } + bg_cv_.SignalAll(); return Status::OK(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 920c3d3f8..0eebef774 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2680,6 +2680,10 @@ class DBImpl : public DB { // seqno_time_mapping_ stores the sequence number to time mapping, it's not // thread safe, both read and write need db mutex hold. SeqnoToTimeMapping seqno_time_mapping_; + + // stop write token that is acquired when LockWal() is called. Destructed + // when UnlockWal() is called. + std::unique_ptr lock_wal_write_token_; }; class GetWithTimestampReadCallback : public ReadCallback { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index a597c168d..cbeab046f 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -924,6 +924,15 @@ Status DBImpl::WriteImplWALOnly( write_thread->ExitAsBatchGroupLeader(write_group, status); return status; } + } else { + InstrumentedMutexLock lock(&mutex_); + Status status = DelayWrite(/*num_bytes=*/0ull, write_options); + if (!status.ok()) { + WriteThread::WriteGroup write_group; + write_thread->EnterAsBatchGroupLeader(&w, &write_group); + write_thread->ExitAsBatchGroupLeader(write_group, status); + return status; + } } WriteThread::WriteGroup write_group; @@ -1762,6 +1771,7 @@ uint64_t DBImpl::GetMaxTotalWalSize() const { // REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::DelayWrite(uint64_t num_bytes, const WriteOptions& write_options) { + mutex_.AssertHeld(); uint64_t time_delayed = 0; bool delayed = false; { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 2bdfaada5..99d0b3c4c 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -610,6 +610,52 @@ TEST_F(DBWALTest, WALWithChecksumHandoff) { #endif // ROCKSDB_ASSERT_STATUS_CHECKED } +#ifndef ROCKSDB_LITE +TEST_F(DBWALTest, LockWal) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"DBWALTest::LockWal:AfterGetSortedWal", + "DBWALTest::LockWal:BeforeFlush:1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("foo", "v")); + ASSERT_OK(Put("bar", "v")); + port::Thread worker([&]() { + TEST_SYNC_POINT("DBWALTest::LockWal:BeforeFlush:1"); + Status tmp_s = db_->Flush(FlushOptions()); + ASSERT_OK(tmp_s); + }); + + ASSERT_OK(db_->LockWAL()); + // Verify writes are stopped + WriteOptions wopts; + wopts.no_slowdown = true; + Status s = db_->Put(wopts, "foo", "dontcare"); + ASSERT_TRUE(s.IsIncomplete()); + { + VectorLogPtr wals; + ASSERT_OK(db_->GetSortedWalFiles(wals)); + ASSERT_FALSE(wals.empty()); + } + TEST_SYNC_POINT("DBWALTest::LockWal:AfterGetSortedWal"); + FlushOptions flush_opts; + flush_opts.wait = false; + s = db_->Flush(flush_opts); + ASSERT_TRUE(s.IsTryAgain()); + ASSERT_OK(db_->UnlockWAL()); + ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare")); + + worker.join(); + + SyncPoint::GetInstance()->DisableProcessing(); + } while (ChangeWalOptions()); +} +#endif //! ROCKSDB_LITE + class DBRecoveryTestBlobError : public DBWALTest, public testing::WithParamInterface { diff --git a/db/write_thread.cc b/db/write_thread.cc index cc8645f37..de1744cf0 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -360,8 +360,11 @@ void WriteThread::EndWriteStall() { // Unlink write_stall_dummy_ from the write queue. This will unblock // pending write threads to enqueue themselves assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_); - assert(write_stall_dummy_.link_older != nullptr); - write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer; + // write_stall_dummy_.link_older can be nullptr only if LockWAL() has been + // called. + if (write_stall_dummy_.link_older) { + write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer; + } newest_writer_.exchange(write_stall_dummy_.link_older); // Wake up writers diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 26c07c19f..cad3d1c3c 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1445,11 +1445,17 @@ class DB { virtual Status SyncWAL() = 0; // Lock the WAL. Also flushes the WAL after locking. + // After this method returns ok, writes to the database will be stopped until + // UnlockWAL() is called. + // This method may internally acquire and release DB mutex and the WAL write + // mutex, but after it returns, neither mutex is held by caller. virtual Status LockWAL() { return Status::NotSupported("LockWAL not implemented"); } // Unlock the WAL. + // The write stop on the database will be cleared. + // This method may internally acquire and release DB mutex. virtual Status UnlockWAL() { return Status::NotSupported("UnlockWAL not implemented"); } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index caf1566b9..e035e1f88 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -6530,6 +6530,65 @@ TEST_P(TransactionTest, WriteWithBulkCreatedColumnFamilies) { cf_handles.clear(); } +TEST_P(TransactionTest, LockWal) { + const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); + if (TxnDBWritePolicy::WRITE_COMMITTED != write_policy) { + ROCKSDB_GTEST_BYPASS("Test only write-committed for now"); + return; + } + ASSERT_OK(ReOpen()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"TransactionTest::LockWal:AfterLockWal", + "TransactionTest::LockWal:BeforePrepareTxn2"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + std::unique_ptr txn0; + WriteOptions wopts; + wopts.no_slowdown = true; + txn0.reset(db->BeginTransaction(wopts, TransactionOptions())); + ASSERT_OK(txn0->SetName("txn0")); + ASSERT_OK(txn0->Put("foo", "v0")); + + std::unique_ptr txn1; + txn1.reset(db->BeginTransaction(wopts, TransactionOptions())); + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Put("dummy", "v0")); + ASSERT_OK(txn1->Prepare()); + + std::unique_ptr txn2; + port::Thread worker([&]() { + txn2.reset(db->BeginTransaction(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn2->SetName("txn2")); + ASSERT_OK(txn2->Put("bar", "v0")); + TEST_SYNC_POINT("TransactionTest::LockWal:BeforePrepareTxn2"); + ASSERT_OK(txn2->Prepare()); + ASSERT_OK(txn2->Commit()); + }); + ASSERT_OK(db->LockWAL()); + // txn0 cannot prepare + Status s = txn0->Prepare(); + ASSERT_TRUE(s.IsIncomplete()); + // txn1 cannot commit + s = txn1->Commit(); + ASSERT_TRUE(s.IsIncomplete()); + + TEST_SYNC_POINT("TransactionTest::LockWal:AfterLockWal"); + + ASSERT_OK(db->UnlockWAL()); + txn0.reset(); + + txn0.reset(db->BeginTransaction(wopts, TransactionOptions())); + ASSERT_OK(txn0->SetName("txn0_1")); + ASSERT_OK(txn0->Put("foo", "v1")); + ASSERT_OK(txn0->Prepare()); + ASSERT_OK(txn0->Commit()); + worker.join(); + + SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {