From 94e3beec7773561d5d9d074dc52989a9b2b5e6a4 Mon Sep 17 00:00:00 2001 From: Peter Dillinger Date: Mon, 30 Jan 2023 22:52:30 -0800 Subject: [PATCH] Cleanup, improve, stress test LockWAL() (#11143) Summary: The previous API comments for LockWAL didn't provide much about why you might want to use it, and didn't really meet what one would infer its contract was. Also, LockWAL was not in db_stress / crash test. In this change: * Implement a counting semantics for LockWAL()+UnlockWAL(), so that they can safely be used concurrently across threads or recursively within a thread. This should make the API much less bug-prone and easier to use. * Make sure no UnlockWAL() is needed after non-OK LockWAL() (to match RocksDB conventions) * Make UnlockWAL() reliably return non-OK when there's no matching LockWAL() (for debug-ability) * Clarify API comments on LockWAL(), UnlockWAL(), FlushWAL(), and SyncWAL(). Their exact meanings are not obvious, and I don't think it's appropriate to talk about implementation mutexes in the API comments, but about what operations might block each other. * Add LockWAL()/UnlockWAL() to db_stress and crash test, mostly to check for assertion failures, but also checks that latest seqno doesn't change while WAL is locked. This is simpler to add when LockWAL() is allowed in multiple threads. * Remove unnecessary use of sync points in test DBWALTest::LockWal. There was a bug during development of above changes that caused this test to fail sporadically, with and without this sync point change. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11143 Test Plan: unit tests added / updated, added to stress/crash test Reviewed By: ajkr Differential Revision: D42848627 Pulled By: pdillinger fbshipit-source-id: 6d976c51791941a31fd8fbf28b0f82e888d9f4b4 --- HISTORY.md | 1 + buckifier/buckify_rocksdb.py | 2 +- db/db_impl/db_impl.cc | 66 +++++++++---- db/db_impl/db_impl.h | 11 ++- db/db_wal_test.cc | 17 +--- db/db_write_test.cc | 126 +++++++++++++++++++++--- db_stress_tool/db_stress_common.h | 1 + db_stress_tool/db_stress_gflags.cc | 4 + db_stress_tool/db_stress_test_base.cc | 26 +++++ include/rocksdb/db.h | 44 ++++++--- tools/db_crashtest.py | 1 + utilities/checkpoint/checkpoint_test.cc | 24 +++++ 12 files changed, 257 insertions(+), 66 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index b10c72958..0513248d0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,7 @@ ### Bug Fixes * Fixed a data race on `ColumnFamilyData::flush_reason` caused by concurrent flushes. * Fixed an issue in `Get` and `MultiGet` when user-defined timestamps is enabled in combination with BlobDB. +* Fixed some atypical behaviors for `LockWAL()` such as allowing concurrent/recursive use and not expecting `UnlockWAL()` after non-OK result. See API comments. ### Feature Removal * Remove RocksDB Lite. diff --git a/buckifier/buckify_rocksdb.py b/buckifier/buckify_rocksdb.py index ac09c0519..d7e9b645b 100755 --- a/buckifier/buckify_rocksdb.py +++ b/buckifier/buckify_rocksdb.py @@ -26,7 +26,7 @@ from util import ColorString # $python3 buckifier/buckify_rocksdb.py \ # '{"fake": { # "extra_deps": [":test_dep", "//fakes/module:mock1"], -# "extra_compiler_flags": ["-DROCKSDB_LITE", "-Os"] +# "extra_compiler_flags": ["-DFOO_BAR", "-Os"] # } # }' # (Generated TARGETS file has test_dep and mock1 as dependencies for RocksDB diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 2acf9b2fb..0b47acd2a 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -243,7 +243,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, atomic_flush_install_cv_(&mutex_), blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_, &error_handler_, &event_logger_, - immutable_db_options_.listeners, dbname_) { + immutable_db_options_.listeners, dbname_), + lock_wal_count_(0) { // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. assert(batch_per_txn_ || seq_per_batch_); @@ -1429,15 +1430,10 @@ Status DBImpl::FlushWAL(bool sync) { return SyncWAL(); } -bool DBImpl::WALBufferIsEmpty(bool lock) { - if (lock) { - log_write_mutex_.Lock(); - } +bool DBImpl::WALBufferIsEmpty() { + InstrumentedMutexLock l(&log_write_mutex_); log::Writer* cur_log_writer = logs_.back().writer; auto res = cur_log_writer->BufferIsEmpty(); - if (lock) { - log_write_mutex_.Unlock(); - } return res; } @@ -1539,29 +1535,57 @@ Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) { Status DBImpl::LockWAL() { { InstrumentedMutexLock lock(&mutex_); - WriteThread::Writer w; - write_thread_.EnterUnbatched(&w, &mutex_); - WriteThread::Writer nonmem_w; - if (two_write_queues_) { - nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); - } + if (lock_wal_count_ > 0) { + assert(lock_wal_write_token_); + ++lock_wal_count_; + } else { + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } - lock_wal_write_token_ = write_controller_.GetStopToken(); + // NOTE: releasing mutex in EnterUnbatched might mean we are actually + // now lock_wal_count > 0 + if (lock_wal_count_ == 0) { + assert(!lock_wal_write_token_); + lock_wal_write_token_ = write_controller_.GetStopToken(); + } + ++lock_wal_count_; - if (two_write_queues_) { - nonmem_write_thread_.ExitUnbatched(&nonmem_w); + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + write_thread_.ExitUnbatched(&w); } - write_thread_.ExitUnbatched(&w); } - return FlushWAL(/*sync=*/false); + // NOTE: avoid I/O holding DB mutex + Status s = FlushWAL(/*sync=*/false); + if (!s.ok()) { + // Non-OK return should not be in locked state + UnlockWAL().PermitUncheckedError(); + } + return s; } Status DBImpl::UnlockWAL() { + bool signal = false; { InstrumentedMutexLock lock(&mutex_); - lock_wal_write_token_.reset(); + if (lock_wal_count_ == 0) { + return Status::Aborted("No LockWAL() in effect"); + } + --lock_wal_count_; + if (lock_wal_count_ == 0) { + lock_wal_write_token_.reset(); + signal = true; + } + } + if (signal) { + // SignalAll outside of mutex for efficiency + bg_cv_.SignalAll(); } - bg_cv_.SignalAll(); return Status::OK(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 72d7e08f7..6a930b2d8 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -426,7 +426,7 @@ class DBImpl : public DB { const FlushOptions& options, const std::vector& column_families) override; virtual Status FlushWAL(bool sync) override; - bool WALBufferIsEmpty(bool lock = true); + bool WALBufferIsEmpty(); virtual Status SyncWAL() override; virtual Status LockWAL() override; virtual Status UnlockWAL() override; @@ -2663,9 +2663,14 @@ class DBImpl : public DB { // thread safe, both read and write need db mutex hold. SeqnoToTimeMapping seqno_time_mapping_; - // stop write token that is acquired when LockWal() is called. Destructed - // when UnlockWal() is called. + // Stop write token that is acquired when first LockWAL() is called. + // Destroyed when last UnlockWAL() is called. Controlled by DB mutex. + // See lock_wal_count_ std::unique_ptr lock_wal_write_token_; + + // The number of LockWAL called without matching UnlockWAL call. + // See also lock_wal_write_token_ + uint32_t lock_wal_count_; }; class GetWithTimestampReadCallback : public ReadCallback { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 9cb9364d3..e733cf106 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -613,19 +613,9 @@ TEST_F(DBWALTest, LockWal) { Options options = CurrentOptions(); options.create_if_missing = true; DestroyAndReopen(options); - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->LoadDependency( - {{"DBWALTest::LockWal:AfterGetSortedWal", - "DBWALTest::LockWal:BeforeFlush:1"}}); - SyncPoint::GetInstance()->EnableProcessing(); ASSERT_OK(Put("foo", "v")); ASSERT_OK(Put("bar", "v")); - port::Thread worker([&]() { - TEST_SYNC_POINT("DBWALTest::LockWal:BeforeFlush:1"); - Status tmp_s = db_->Flush(FlushOptions()); - ASSERT_OK(tmp_s); - }); ASSERT_OK(db_->LockWAL()); // Verify writes are stopped @@ -638,7 +628,10 @@ TEST_F(DBWALTest, LockWal) { ASSERT_OK(db_->GetSortedWalFiles(wals)); ASSERT_FALSE(wals.empty()); } - TEST_SYNC_POINT("DBWALTest::LockWal:AfterGetSortedWal"); + port::Thread worker([&]() { + Status tmp_s = db_->Flush(FlushOptions()); + ASSERT_OK(tmp_s); + }); FlushOptions flush_opts; flush_opts.wait = false; s = db_->Flush(flush_opts); @@ -647,8 +640,6 @@ TEST_F(DBWALTest, LockWal) { ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare")); worker.join(); - - SyncPoint::GetInstance()->DisableProcessing(); } while (ChangeWalOptions()); } diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 1011d5c9e..3c7271a9b 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -4,6 +4,7 @@ // (found in the LICENSE.Apache file in the root directory). #include +#include #include #include #include @@ -605,23 +606,124 @@ TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) { Close(); } -// Test that db->LockWAL() flushes the WAL after locking. -TEST_P(DBWriteTest, LockWalInEffect) { +// Test that db->LockWAL() flushes the WAL after locking, which can fail +TEST_P(DBWriteTest, LockWALInEffect) { Options options = GetOptions(); + std::unique_ptr mock_env( + new FaultInjectionTestEnv(env_)); + options.env = mock_env.get(); + options.paranoid_checks = false; Reopen(options); // try the 1st WAL created during open - ASSERT_OK(Put("key" + std::to_string(0), "value")); - ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty()); - ASSERT_OK(dbfull()->LockWAL()); - ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false)); - ASSERT_OK(dbfull()->UnlockWAL()); + ASSERT_OK(Put("key0", "value")); + ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty()); + ASSERT_OK(db_->LockWAL()); + ASSERT_TRUE(dbfull()->WALBufferIsEmpty()); + ASSERT_OK(db_->UnlockWAL()); // try the 2nd wal created during SwitchWAL ASSERT_OK(dbfull()->TEST_SwitchWAL()); - ASSERT_OK(Put("key" + std::to_string(0), "value")); - ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty()); - ASSERT_OK(dbfull()->LockWAL()); - ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false)); - ASSERT_OK(dbfull()->UnlockWAL()); + ASSERT_OK(Put("key1", "value")); + ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty()); + ASSERT_OK(db_->LockWAL()); + ASSERT_TRUE(dbfull()->WALBufferIsEmpty()); + ASSERT_OK(db_->UnlockWAL()); + + // Fail the WAL flush if applicable + mock_env->SetFilesystemActive(false); + Status s = Put("key2", "value"); + if (options.manual_wal_flush) { + ASSERT_OK(s); + // I/O failure + ASSERT_NOK(db_->LockWAL()); + // Should not need UnlockWAL after LockWAL fails + } else { + ASSERT_NOK(s); + ASSERT_OK(db_->LockWAL()); + ASSERT_OK(db_->UnlockWAL()); + } + mock_env->SetFilesystemActive(true); + // Writes should work again + ASSERT_OK(Put("key3", "value")); + ASSERT_EQ(Get("key3"), "value"); + + // Should be extraneous, but allowed + ASSERT_NOK(db_->UnlockWAL()); + + // Close before mock_env destruct. + Close(); +} + +TEST_P(DBWriteTest, LockWALConcurrentRecursive) { + Options options = GetOptions(); + Reopen(options); + ASSERT_OK(Put("k1", "val")); + ASSERT_OK(db_->LockWAL()); // 0 -> 1 + auto frozen_seqno = db_->GetLatestSequenceNumber(); + std::atomic t1_completed{false}; + port::Thread t1{[&]() { + // Won't finish until WAL unlocked + ASSERT_OK(Put("k1", "val2")); + t1_completed = true; + }}; + + ASSERT_OK(db_->LockWAL()); // 1 -> 2 + // Read-only ops are OK + ASSERT_EQ(Get("k1"), "val"); + { + std::vector files; + LiveFilesStorageInfoOptions lf_opts; + // A DB flush could deadlock + lf_opts.wal_size_for_flush = UINT64_MAX; + ASSERT_OK(db_->GetLiveFilesStorageInfo({lf_opts}, &files)); + } + + port::Thread t2{[&]() { + ASSERT_OK(db_->LockWAL()); // 2 -> 3 or 1 -> 2 + }}; + + ASSERT_OK(db_->UnlockWAL()); // 2 -> 1 or 3 -> 2 + // Give t1 an extra chance to jump in case of bug + std::this_thread::yield(); + t2.join(); + ASSERT_FALSE(t1_completed.load()); + + // Should now have 2 outstanding LockWAL + ASSERT_EQ(Get("k1"), "val"); + + ASSERT_OK(db_->UnlockWAL()); // 2 -> 1 + + ASSERT_FALSE(t1_completed.load()); + ASSERT_EQ(Get("k1"), "val"); + ASSERT_EQ(frozen_seqno, db_->GetLatestSequenceNumber()); + + // Ensure final Unlock is concurrency safe and extra Unlock is safe but + // non-OK + std::atomic unlock_ok{0}; + port::Thread t3{[&]() { + if (db_->UnlockWAL().ok()) { + unlock_ok++; + } + ASSERT_OK(db_->LockWAL()); + if (db_->UnlockWAL().ok()) { + unlock_ok++; + } + }}; + + if (db_->UnlockWAL().ok()) { + unlock_ok++; + } + t3.join(); + + // There was one extra unlock, so just one non-ok + ASSERT_EQ(unlock_ok.load(), 2); + + // Write can proceed + t1.join(); + ASSERT_TRUE(t1_completed.load()); + ASSERT_EQ(Get("k1"), "val2"); + // And new writes + ASSERT_OK(Put("k2", "val")); + ASSERT_EQ(Get("k2"), "val"); } TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) { diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index c1f9e7baf..1256cc146 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -87,6 +87,7 @@ DECLARE_int64(active_width); DECLARE_bool(test_batches_snapshots); DECLARE_bool(atomic_flush); DECLARE_int32(manual_wal_flush_one_in); +DECLARE_int32(lock_wal_one_in); DECLARE_bool(test_cf_consistency); DECLARE_bool(test_multi_ops_txns); DECLARE_int32(threads); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index ddedbdde9..eda58c0b3 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -92,6 +92,10 @@ DEFINE_int32( "on average. Setting `manual_wal_flush_one_in` to be greater than 0 " "implies `Options::manual_wal_flush = true` is set."); +DEFINE_int32(lock_wal_one_in, 1000000, + "If non-zero, then `LockWAL()` + `UnlockWAL()` will be called in " + "db_stress once for every N ops on average."); + DEFINE_bool(test_cf_consistency, false, "If set, runs the stress test dedicated to verifying writes to " "multiple column families are consistent. Setting this implies " diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index a1d9b08b0..cb3fec327 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -9,6 +9,7 @@ // #include +#include #include "util/compression.h" #ifdef GFLAGS @@ -828,6 +829,31 @@ void StressTest::OperateDb(ThreadState* thread) { } } + if (thread->rand.OneInOpt(FLAGS_lock_wal_one_in)) { + Status s = db_->LockWAL(); + if (!s.ok()) { + fprintf(stderr, "LockWAL() failed: %s\n", s.ToString().c_str()); + } else { + auto old_seqno = db_->GetLatestSequenceNumber(); + // Yield for a while + do { + std::this_thread::yield(); + } while (thread->rand.OneIn(2)); + // Latest seqno should not have changed + auto new_seqno = db_->GetLatestSequenceNumber(); + if (old_seqno != new_seqno) { + fprintf( + stderr, + "Failure: latest seqno changed from %u to %u with WAL locked\n", + (unsigned)old_seqno, (unsigned)new_seqno); + } + s = db_->UnlockWAL(); + if (!s.ok()) { + fprintf(stderr, "UnlockWAL() failed: %s\n", s.ToString().c_str()); + } + } + } + if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) { Status s = db_->SyncWAL(); if (!s.ok() && !s.IsNotSupported()) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index a5973b479..fc027a9d3 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1400,7 +1400,7 @@ class DB { virtual DBOptions GetDBOptions() const = 0; - // Flush all mem-table data. + // Flush all memtable data. // Flush a single column family, even when atomic flush is enabled. To flush // multiple column families, use Flush(options, column_families). virtual Status Flush(const FlushOptions& options, @@ -1408,7 +1408,7 @@ class DB { virtual Status Flush(const FlushOptions& options) { return Flush(options, DefaultColumnFamily()); } - // Flushes multiple column families. + // Flushes memtables of multiple column families. // If atomic flush is not enabled, Flush(options, column_families) is // equivalent to calling Flush(options, column_family) multiple times. // If atomic flush is enabled, Flush(options, column_families) will flush all @@ -1420,29 +1420,41 @@ class DB { const FlushOptions& options, const std::vector& column_families) = 0; - // Flush the WAL memory buffer to the file. If sync is true, it calls SyncWAL - // afterwards. + // When using the manual_wal_flush option, flushes RocksDB internal buffers + // of WAL data to the file, so that the data can survive process crash or be + // included in a Checkpoint or Backup. Without manual_wal_flush, there is no + // such internal buffer. If sync is true, it calls SyncWAL() afterwards. virtual Status FlushWAL(bool /*sync*/) { return Status::NotSupported("FlushWAL not implemented"); } - // Sync the wal. Note that Write() followed by SyncWAL() is not exactly the - // same as Write() with sync=true: in the latter case the changes won't be - // visible until the sync is done. - // Currently only works if allow_mmap_writes = false in Options. + + // Ensure all WAL writes have been synced to storage, so that (assuming OS + // and hardware support) data will survive power loss. This function does + // not imply FlushWAL, so `FlushWAL(true)` is recommended if using + // manual_wal_flush=true. Currently only works if allow_mmap_writes = false + // in Options. + // + // Note that Write() followed by SyncWAL() is not exactly the same as Write() + // with sync=true: in the latter case the changes won't be visible until the + // sync is done. virtual Status SyncWAL() = 0; - // Lock the WAL. Also flushes the WAL after locking. - // After this method returns ok, writes to the database will be stopped until - // UnlockWAL() is called. - // This method may internally acquire and release DB mutex and the WAL write - // mutex, but after it returns, neither mutex is held by caller. + // Freezes the logical state of the DB (by stopping writes), and if WAL is + // enabled, ensures that state has been flushed to DB files (as in + // FlushWAL()). This can be used for taking a Checkpoint at a known DB + // state, though the user must use options to insure no DB flush is invoked + // in this frozen state. Other operations allowed on a "read only" DB should + // work while frozen. Each LockWAL() call that returns OK must eventually be + // followed by a corresponding call to UnlockWAL(). Where supported, non-OK + // status is generally only possible with some kind of corruption or I/O + // error. virtual Status LockWAL() { return Status::NotSupported("LockWAL not implemented"); } - // Unlock the WAL. - // The write stop on the database will be cleared. - // This method may internally acquire and release DB mutex. + // Unfreeze the DB state from a successful LockWAL(). + // The write stop on the database will be cleared when UnlockWAL() have been + // called for each successful LockWAL(). virtual Status UnlockWAL() { return Status::NotSupported("UnlockWAL not implemented"); } diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 836782894..0aea6ffd4 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -88,6 +88,7 @@ default_params = { "index_type": lambda: random.choice([0, 0, 0, 2, 2, 3]), "ingest_external_file_one_in": 1000000, "iterpercent": 10, + "lock_wal_one_in": 1000000, "mark_for_compaction_one_file_in": lambda: 10 * random.randint(0, 1), "max_background_compactions": 20, "max_bytes_for_level_base": 10485760, diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc index 5020da452..2bdab44fd 100644 --- a/utilities/checkpoint/checkpoint_test.cc +++ b/utilities/checkpoint/checkpoint_test.cc @@ -854,6 +854,30 @@ TEST_F(CheckpointTest, CheckpointReadOnlyDB) { delete snapshot_db; } +TEST_F(CheckpointTest, CheckpointWithLockWAL) { + Options options = CurrentOptions(); + ASSERT_OK(Put("foo", "foo_value")); + + ASSERT_OK(db_->LockWAL()); + + Checkpoint* checkpoint = nullptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + checkpoint = nullptr; + + ASSERT_OK(db_->UnlockWAL()); + Close(); + + DB* snapshot_db = nullptr; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db)); + ReadOptions read_opts; + std::string get_result; + ASSERT_OK(snapshot_db->Get(read_opts, "foo", &get_result)); + ASSERT_EQ("foo_value", get_result); + delete snapshot_db; +} + TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "eevee"}, options);