From c93ba7db5ddc33f69f8f049cc59454985b17dc46 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 13 Dec 2022 21:45:00 -0800 Subject: [PATCH] Revise LockWAL/UnlockWAL implementation (#11020) Summary: RocksDB has two public APIs: `DB::LockWAL()`/`DB::UnlockWAL()`. The current implementation acquires and releases the internal `DBImpl::log_write_mutex_`. According to the comment on `DBImpl::log_write_mutex_`: https://github.com/facebook/rocksdb/blob/7.8.fb/db/db_impl/db_impl.h#L2287:L2288 > Note: to avoid dealock, if needed to acquire both log_write_mutex_ and mutex_, the order should be first mutex_ and then log_write_mutex_. This puts limitations on how applications can use the `LockWAL()` API. After `LockWAL()` returns ok, the application should not perform any operation that acquires `mutex_`. Currently, the use case of `LockWAL()` is MyRocks implementing the MySQL storage engine handlerton `lock_hton_log` interface. The operation that MyRocks performs after `LockWAL()` is `GetSortedWalFiles()` which not only acquires mutex_, but also `log_write_mutex_`. There are two issues: 1. Applications using these two APIs may hang if one thread calls `GetSortedWalFiles()` after calling `LockWAL()` because `log_write_mutex_` is not recursive. 2. Two threads may deadlock due to lock order inversion. To fix these issues, we can modify the implementation of LockWAL so that it does not keep `log_write_mutex_` held until UnlockWAL. To achieve the goal of locking the WAL, we can instead manually inject a write stall so that all future writes will be stopped. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/11020 Test Plan: make check Reviewed By: ajkr Differential Revision: D41785203 Pulled By: riversand963 fbshipit-source-id: 5ccb7a9c6eb9a2c3fa80fd2c399cc2568b8f89ce --- HISTORY.md | 1 + db/db_impl/db_impl.cc | 32 ++++++++---- db/db_impl/db_impl.h | 4 ++ db/db_impl/db_impl_write.cc | 10 ++++ db/db_wal_test.cc | 46 +++++++++++++++++ db/write_thread.cc | 7 ++- include/rocksdb/db.h | 6 +++ utilities/transactions/transaction_test.cc | 59 ++++++++++++++++++++++ 8 files changed, 152 insertions(+), 13 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 314666501..f695158d7 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,6 +11,7 @@ * Fixed a bug caused by `DB::SyncWAL()` affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#10892). * Fixed a BackupEngine bug in which RestoreDBFromLatestBackup would fail if the latest backup was deleted and there is another valid backup available. * Fix L0 file misorder corruption caused by ingesting files of overlapping seqnos with memtable entries' through introducing `epoch_number`. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected. Also replace the previous incomplete fix (#5958) to the same corruption with this new and more complete fix. +* Fixed a bug in LockWAL() leading to re-locking mutex (#11020). 
## 7.9.0 (11/21/2022) ### Performance Improvements diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 657d2870f..411503a6f 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1570,21 +1570,31 @@ Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) { } Status DBImpl::LockWAL() { - log_write_mutex_.Lock(); - auto cur_log_writer = logs_.back().writer; - IOStatus status = cur_log_writer->WriteBuffer(); - if (!status.ok()) { - ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", - status.ToString().c_str()); - // In case there is a fs error we should set it globally to prevent the - // future writes - WriteStatusCheck(status); + { + InstrumentedMutexLock lock(&mutex_); + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + + lock_wal_write_token_ = write_controller_.GetStopToken(); + + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + write_thread_.ExitUnbatched(&w); } - return static_cast(status); + return FlushWAL(/*sync=*/false); } Status DBImpl::UnlockWAL() { - log_write_mutex_.Unlock(); + { + InstrumentedMutexLock lock(&mutex_); + lock_wal_write_token_.reset(); + } + bg_cv_.SignalAll(); return Status::OK(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 920c3d3f8..0eebef774 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2680,6 +2680,10 @@ class DBImpl : public DB { // seqno_time_mapping_ stores the sequence number to time mapping, it's not // thread safe, both read and write need db mutex hold. SeqnoToTimeMapping seqno_time_mapping_; + + // stop write token that is acquired when LockWal() is called. Destructed + // when UnlockWal() is called. 
+ std::unique_ptr lock_wal_write_token_; }; class GetWithTimestampReadCallback : public ReadCallback { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index a597c168d..cbeab046f 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -924,6 +924,15 @@ Status DBImpl::WriteImplWALOnly( write_thread->ExitAsBatchGroupLeader(write_group, status); return status; } + } else { + InstrumentedMutexLock lock(&mutex_); + Status status = DelayWrite(/*num_bytes=*/0ull, write_options); + if (!status.ok()) { + WriteThread::WriteGroup write_group; + write_thread->EnterAsBatchGroupLeader(&w, &write_group); + write_thread->ExitAsBatchGroupLeader(write_group, status); + return status; + } } WriteThread::WriteGroup write_group; @@ -1762,6 +1771,7 @@ uint64_t DBImpl::GetMaxTotalWalSize() const { // REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::DelayWrite(uint64_t num_bytes, const WriteOptions& write_options) { + mutex_.AssertHeld(); uint64_t time_delayed = 0; bool delayed = false; { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 2bdfaada5..99d0b3c4c 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -610,6 +610,52 @@ TEST_F(DBWALTest, WALWithChecksumHandoff) { #endif // ROCKSDB_ASSERT_STATUS_CHECKED } +#ifndef ROCKSDB_LITE +TEST_F(DBWALTest, LockWal) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"DBWALTest::LockWal:AfterGetSortedWal", + "DBWALTest::LockWal:BeforeFlush:1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("foo", "v")); + ASSERT_OK(Put("bar", "v")); + port::Thread worker([&]() { + TEST_SYNC_POINT("DBWALTest::LockWal:BeforeFlush:1"); + Status tmp_s = db_->Flush(FlushOptions()); + ASSERT_OK(tmp_s); + }); + + ASSERT_OK(db_->LockWAL()); + // Verify writes are stopped + WriteOptions wopts; + 
wopts.no_slowdown = true; + Status s = db_->Put(wopts, "foo", "dontcare"); + ASSERT_TRUE(s.IsIncomplete()); + { + VectorLogPtr wals; + ASSERT_OK(db_->GetSortedWalFiles(wals)); + ASSERT_FALSE(wals.empty()); + } + TEST_SYNC_POINT("DBWALTest::LockWal:AfterGetSortedWal"); + FlushOptions flush_opts; + flush_opts.wait = false; + s = db_->Flush(flush_opts); + ASSERT_TRUE(s.IsTryAgain()); + ASSERT_OK(db_->UnlockWAL()); + ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare")); + + worker.join(); + + SyncPoint::GetInstance()->DisableProcessing(); + } while (ChangeWalOptions()); +} +#endif //! ROCKSDB_LITE + class DBRecoveryTestBlobError : public DBWALTest, public testing::WithParamInterface { diff --git a/db/write_thread.cc b/db/write_thread.cc index cc8645f37..de1744cf0 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -360,8 +360,11 @@ void WriteThread::EndWriteStall() { // Unlink write_stall_dummy_ from the write queue. This will unblock // pending write threads to enqueue themselves assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_); - assert(write_stall_dummy_.link_older != nullptr); - write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer; + // write_stall_dummy_.link_older can be nullptr only if LockWAL() has been + // called. + if (write_stall_dummy_.link_older) { + write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer; + } newest_writer_.exchange(write_stall_dummy_.link_older); // Wake up writers diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 26c07c19f..cad3d1c3c 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1445,11 +1445,17 @@ class DB { virtual Status SyncWAL() = 0; // Lock the WAL. Also flushes the WAL after locking. + // After this method returns ok, writes to the database will be stopped until + // UnlockWAL() is called. 
+ // This method may internally acquire and release DB mutex and the WAL write + // mutex, but after it returns, neither mutex is held by caller. virtual Status LockWAL() { return Status::NotSupported("LockWAL not implemented"); } // Unlock the WAL. + // The write stop on the database will be cleared. + // This method may internally acquire and release DB mutex. virtual Status UnlockWAL() { return Status::NotSupported("UnlockWAL not implemented"); } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index caf1566b9..e035e1f88 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -6530,6 +6530,65 @@ TEST_P(TransactionTest, WriteWithBulkCreatedColumnFamilies) { cf_handles.clear(); } +TEST_P(TransactionTest, LockWal) { + const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); + if (TxnDBWritePolicy::WRITE_COMMITTED != write_policy) { + ROCKSDB_GTEST_BYPASS("Test only write-committed for now"); + return; + } + ASSERT_OK(ReOpen()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"TransactionTest::LockWal:AfterLockWal", + "TransactionTest::LockWal:BeforePrepareTxn2"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + std::unique_ptr txn0; + WriteOptions wopts; + wopts.no_slowdown = true; + txn0.reset(db->BeginTransaction(wopts, TransactionOptions())); + ASSERT_OK(txn0->SetName("txn0")); + ASSERT_OK(txn0->Put("foo", "v0")); + + std::unique_ptr txn1; + txn1.reset(db->BeginTransaction(wopts, TransactionOptions())); + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Put("dummy", "v0")); + ASSERT_OK(txn1->Prepare()); + + std::unique_ptr txn2; + port::Thread worker([&]() { + txn2.reset(db->BeginTransaction(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn2->SetName("txn2")); + ASSERT_OK(txn2->Put("bar", "v0")); + TEST_SYNC_POINT("TransactionTest::LockWal:BeforePrepareTxn2"); + ASSERT_OK(txn2->Prepare()); 
+ ASSERT_OK(txn2->Commit()); + }); + ASSERT_OK(db->LockWAL()); + // txn0 cannot prepare + Status s = txn0->Prepare(); + ASSERT_TRUE(s.IsIncomplete()); + // txn1 cannot commit + s = txn1->Commit(); + ASSERT_TRUE(s.IsIncomplete()); + + TEST_SYNC_POINT("TransactionTest::LockWal:AfterLockWal"); + + ASSERT_OK(db->UnlockWAL()); + txn0.reset(); + + txn0.reset(db->BeginTransaction(wopts, TransactionOptions())); + ASSERT_OK(txn0->SetName("txn0_1")); + ASSERT_OK(txn0->Put("foo", "v1")); + ASSERT_OK(txn0->Prepare()); + ASSERT_OK(txn0->Commit()); + worker.join(); + + SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {