diff --git a/HISTORY.md b/HISTORY.md index b10c72958..0513248d0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,7 @@ ### Bug Fixes * Fixed a data race on `ColumnFamilyData::flush_reason` caused by concurrent flushes. * Fixed an issue in `Get` and `MultiGet` when user-defined timestamps is enabled in combination with BlobDB. +* Fixed some atypical behaviors for `LockWAL()` such as allowing concurrent/recursive use and not expecting `UnlockWAL()` after non-OK result. See API comments. ### Feature Removal * Remove RocksDB Lite. diff --git a/buckifier/buckify_rocksdb.py b/buckifier/buckify_rocksdb.py index ac09c0519..d7e9b645b 100755 --- a/buckifier/buckify_rocksdb.py +++ b/buckifier/buckify_rocksdb.py @@ -26,7 +26,7 @@ from util import ColorString # $python3 buckifier/buckify_rocksdb.py \ # '{"fake": { # "extra_deps": [":test_dep", "//fakes/module:mock1"], -# "extra_compiler_flags": ["-DROCKSDB_LITE", "-Os"] +# "extra_compiler_flags": ["-DFOO_BAR", "-Os"] # } # }' # (Generated TARGETS file has test_dep and mock1 as dependencies for RocksDB diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 2acf9b2fb..0b47acd2a 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -243,7 +243,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, atomic_flush_install_cv_(&mutex_), blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_, &error_handler_, &event_logger_, - immutable_db_options_.listeners, dbname_) { + immutable_db_options_.listeners, dbname_), + lock_wal_count_(0) { // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. assert(batch_per_txn_ || seq_per_batch_); @@ -1429,15 +1430,10 @@ Status DBImpl::FlushWAL(bool sync) { return SyncWAL(); } -bool DBImpl::WALBufferIsEmpty(bool lock) { - if (lock) { - log_write_mutex_.Lock(); - } +bool DBImpl::WALBufferIsEmpty() { + InstrumentedMutexLock l(&log_write_mutex_); log::Writer* cur_log_writer = logs_.back().writer; auto res = cur_log_writer->BufferIsEmpty(); - if (lock) { - log_write_mutex_.Unlock(); - } return res; } @@ -1539,29 +1535,57 @@ Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) { Status DBImpl::LockWAL() { { InstrumentedMutexLock lock(&mutex_); - WriteThread::Writer w; - write_thread_.EnterUnbatched(&w, &mutex_); - WriteThread::Writer nonmem_w; - if (two_write_queues_) { - nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); - } + if (lock_wal_count_ > 0) { + assert(lock_wal_write_token_); + ++lock_wal_count_; + } else { + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } - lock_wal_write_token_ = write_controller_.GetStopToken(); + // NOTE: releasing mutex in EnterUnbatched might mean we are actually + // now lock_wal_count > 0 + if (lock_wal_count_ == 0) { + assert(!lock_wal_write_token_); + lock_wal_write_token_ = write_controller_.GetStopToken(); + } + ++lock_wal_count_; - if (two_write_queues_) { - nonmem_write_thread_.ExitUnbatched(&nonmem_w); + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + write_thread_.ExitUnbatched(&w); } - write_thread_.ExitUnbatched(&w); } - return FlushWAL(/*sync=*/false); + // NOTE: avoid I/O holding DB mutex + Status s = FlushWAL(/*sync=*/false); + if (!s.ok()) { + // Non-OK return should not be in locked state + UnlockWAL().PermitUncheckedError(); + } + return s; } Status DBImpl::UnlockWAL() { + bool signal = false; { InstrumentedMutexLock lock(&mutex_); - lock_wal_write_token_.reset(); + if (lock_wal_count_ == 0) { + return Status::Aborted("No LockWAL() in effect"); + } + --lock_wal_count_; + if (lock_wal_count_ == 0) { + lock_wal_write_token_.reset(); + signal = true; + } + } + if (signal) { + // SignalAll outside of mutex for efficiency + bg_cv_.SignalAll(); } - bg_cv_.SignalAll(); return Status::OK(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 72d7e08f7..6a930b2d8 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -426,7 +426,7 @@ class DBImpl : public DB { const FlushOptions& options, const std::vector& column_families) override; virtual Status FlushWAL(bool sync) override; - bool WALBufferIsEmpty(bool lock = true); + bool WALBufferIsEmpty(); virtual Status SyncWAL() override; virtual Status LockWAL() override; virtual Status UnlockWAL() override; @@ -2663,9 +2663,14 @@ class DBImpl : public DB { // thread safe, both read and write need db mutex hold. SeqnoToTimeMapping seqno_time_mapping_; - // stop write token that is acquired when LockWal() is called. Destructed - // when UnlockWal() is called. + // Stop write token that is acquired when first LockWAL() is called. + // Destroyed when last UnlockWAL() is called. Controlled by DB mutex. + // See lock_wal_count_ std::unique_ptr lock_wal_write_token_; + + // The number of LockWAL called without matching UnlockWAL call. + // See also lock_wal_write_token_ + uint32_t lock_wal_count_; }; class GetWithTimestampReadCallback : public ReadCallback { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 9cb9364d3..e733cf106 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -613,19 +613,9 @@ TEST_F(DBWALTest, LockWal) { Options options = CurrentOptions(); options.create_if_missing = true; DestroyAndReopen(options); - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->LoadDependency( - {{"DBWALTest::LockWal:AfterGetSortedWal", - "DBWALTest::LockWal:BeforeFlush:1"}}); - SyncPoint::GetInstance()->EnableProcessing(); ASSERT_OK(Put("foo", "v")); ASSERT_OK(Put("bar", "v")); - port::Thread worker([&]() { - TEST_SYNC_POINT("DBWALTest::LockWal:BeforeFlush:1"); - Status tmp_s = db_->Flush(FlushOptions()); - ASSERT_OK(tmp_s); - }); ASSERT_OK(db_->LockWAL()); // Verify writes are stopped @@ -638,7 +628,10 @@ TEST_F(DBWALTest, LockWal) { ASSERT_OK(db_->GetSortedWalFiles(wals)); ASSERT_FALSE(wals.empty()); } - TEST_SYNC_POINT("DBWALTest::LockWal:AfterGetSortedWal"); + port::Thread worker([&]() { + Status tmp_s = db_->Flush(FlushOptions()); + ASSERT_OK(tmp_s); + }); FlushOptions flush_opts; flush_opts.wait = false; s = db_->Flush(flush_opts); @@ -647,8 +640,6 @@ TEST_F(DBWALTest, LockWal) { ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare")); worker.join(); - - SyncPoint::GetInstance()->DisableProcessing(); } while (ChangeWalOptions()); } diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 1011d5c9e..3c7271a9b 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -4,6 +4,7 @@ // (found in the LICENSE.Apache file in the root directory). #include +#include #include #include #include @@ -605,23 +606,124 @@ TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) { Close(); } -// Test that db->LockWAL() flushes the WAL after locking. -TEST_P(DBWriteTest, LockWalInEffect) { +// Test that db->LockWAL() flushes the WAL after locking, which can fail +TEST_P(DBWriteTest, LockWALInEffect) { Options options = GetOptions(); + std::unique_ptr mock_env( + new FaultInjectionTestEnv(env_)); + options.env = mock_env.get(); + options.paranoid_checks = false; Reopen(options); // try the 1st WAL created during open - ASSERT_OK(Put("key" + std::to_string(0), "value")); - ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty()); - ASSERT_OK(dbfull()->LockWAL()); - ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false)); - ASSERT_OK(dbfull()->UnlockWAL()); + ASSERT_OK(Put("key0", "value")); + ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty()); + ASSERT_OK(db_->LockWAL()); + ASSERT_TRUE(dbfull()->WALBufferIsEmpty()); + ASSERT_OK(db_->UnlockWAL()); // try the 2nd wal created during SwitchWAL ASSERT_OK(dbfull()->TEST_SwitchWAL()); - ASSERT_OK(Put("key" + std::to_string(0), "value")); - ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty()); - ASSERT_OK(dbfull()->LockWAL()); - ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false)); - ASSERT_OK(dbfull()->UnlockWAL()); + ASSERT_OK(Put("key1", "value")); + ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty()); + ASSERT_OK(db_->LockWAL()); + ASSERT_TRUE(dbfull()->WALBufferIsEmpty()); + ASSERT_OK(db_->UnlockWAL()); + + // Fail the WAL flush if applicable + mock_env->SetFilesystemActive(false); + Status s = Put("key2", "value"); + if (options.manual_wal_flush) { + ASSERT_OK(s); + // I/O failure + ASSERT_NOK(db_->LockWAL()); + // Should not need UnlockWAL after LockWAL fails + } else { + ASSERT_NOK(s); + ASSERT_OK(db_->LockWAL()); + ASSERT_OK(db_->UnlockWAL()); + } + mock_env->SetFilesystemActive(true); + // Writes should work again + ASSERT_OK(Put("key3", "value")); + ASSERT_EQ(Get("key3"), "value"); + + // Should be extraneous, but allowed + ASSERT_NOK(db_->UnlockWAL()); + + // Close before mock_env destruct. + Close(); +} + +TEST_P(DBWriteTest, LockWALConcurrentRecursive) { + Options options = GetOptions(); + Reopen(options); + ASSERT_OK(Put("k1", "val")); + ASSERT_OK(db_->LockWAL()); // 0 -> 1 + auto frozen_seqno = db_->GetLatestSequenceNumber(); + std::atomic t1_completed{false}; + port::Thread t1{[&]() { + // Won't finish until WAL unlocked + ASSERT_OK(Put("k1", "val2")); + t1_completed = true; + }}; + + ASSERT_OK(db_->LockWAL()); // 1 -> 2 + // Read-only ops are OK + ASSERT_EQ(Get("k1"), "val"); + { + std::vector files; + LiveFilesStorageInfoOptions lf_opts; + // A DB flush could deadlock + lf_opts.wal_size_for_flush = UINT64_MAX; + ASSERT_OK(db_->GetLiveFilesStorageInfo({lf_opts}, &files)); + } + + port::Thread t2{[&]() { + ASSERT_OK(db_->LockWAL()); // 2 -> 3 or 1 -> 2 + }}; + + ASSERT_OK(db_->UnlockWAL()); // 2 -> 1 or 3 -> 2 + // Give t1 an extra chance to jump in case of bug + std::this_thread::yield(); + t2.join(); + ASSERT_FALSE(t1_completed.load()); + + // Should now have 2 outstanding LockWAL + ASSERT_EQ(Get("k1"), "val"); + + ASSERT_OK(db_->UnlockWAL()); // 2 -> 1 + + ASSERT_FALSE(t1_completed.load()); + ASSERT_EQ(Get("k1"), "val"); + ASSERT_EQ(frozen_seqno, db_->GetLatestSequenceNumber()); + + // Ensure final Unlock is concurrency safe and extra Unlock is safe but + // non-OK + std::atomic unlock_ok{0}; + port::Thread t3{[&]() { + if (db_->UnlockWAL().ok()) { + unlock_ok++; + } + ASSERT_OK(db_->LockWAL()); + if (db_->UnlockWAL().ok()) { + unlock_ok++; + } + }}; + + if (db_->UnlockWAL().ok()) { + unlock_ok++; + } + t3.join(); + + // There was one extra unlock, so just one non-ok + ASSERT_EQ(unlock_ok.load(), 2); + + // Write can proceed + t1.join(); + ASSERT_TRUE(t1_completed.load()); + ASSERT_EQ(Get("k1"), "val2"); + // And new writes + ASSERT_OK(Put("k2", "val")); + ASSERT_EQ(Get("k2"), "val"); } TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) { diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index c1f9e7baf..1256cc146 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -87,6 +87,7 @@ DECLARE_int64(active_width); DECLARE_bool(test_batches_snapshots); DECLARE_bool(atomic_flush); DECLARE_int32(manual_wal_flush_one_in); +DECLARE_int32(lock_wal_one_in); DECLARE_bool(test_cf_consistency); DECLARE_bool(test_multi_ops_txns); DECLARE_int32(threads); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index ddedbdde9..eda58c0b3 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -92,6 +92,10 @@ DEFINE_int32( "on average. Setting `manual_wal_flush_one_in` to be greater than 0 " "implies `Options::manual_wal_flush = true` is set."); +DEFINE_int32(lock_wal_one_in, 1000000, + "If non-zero, then `LockWAL()` + `UnlockWAL()` will be called in " + "db_stress once for every N ops on average."); + DEFINE_bool(test_cf_consistency, false, "If set, runs the stress test dedicated to verifying writes to " "multiple column families are consistent. Setting this implies " diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index a1d9b08b0..cb3fec327 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -9,6 +9,7 @@ // #include +#include #include "util/compression.h" #ifdef GFLAGS @@ -828,6 +829,31 @@ void StressTest::OperateDb(ThreadState* thread) { } } + if (thread->rand.OneInOpt(FLAGS_lock_wal_one_in)) { + Status s = db_->LockWAL(); + if (!s.ok()) { + fprintf(stderr, "LockWAL() failed: %s\n", s.ToString().c_str()); + } else { + auto old_seqno = db_->GetLatestSequenceNumber(); + // Yield for a while + do { + std::this_thread::yield(); + } while (thread->rand.OneIn(2)); + // Latest seqno should not have changed + auto new_seqno = db_->GetLatestSequenceNumber(); + if (old_seqno != new_seqno) { + fprintf( + stderr, + "Failure: latest seqno changed from %u to %u with WAL locked\n", + (unsigned)old_seqno, (unsigned)new_seqno); + } + s = db_->UnlockWAL(); + if (!s.ok()) { + fprintf(stderr, "UnlockWAL() failed: %s\n", s.ToString().c_str()); + } + } + } + if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) { Status s = db_->SyncWAL(); if (!s.ok() && !s.IsNotSupported()) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index a5973b479..fc027a9d3 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1400,7 +1400,7 @@ class DB { virtual DBOptions GetDBOptions() const = 0; - // Flush all mem-table data. + // Flush all memtable data. // Flush a single column family, even when atomic flush is enabled. To flush // multiple column families, use Flush(options, column_families). virtual Status Flush(const FlushOptions& options, @@ -1408,7 +1408,7 @@ class DB { virtual Status Flush(const FlushOptions& options) { return Flush(options, DefaultColumnFamily()); } - // Flushes multiple column families. + // Flushes memtables of multiple column families. // If atomic flush is not enabled, Flush(options, column_families) is // equivalent to calling Flush(options, column_family) multiple times. // If atomic flush is enabled, Flush(options, column_families) will flush all @@ -1420,29 +1420,41 @@ class DB { const FlushOptions& options, const std::vector& column_families) = 0; - // Flush the WAL memory buffer to the file. If sync is true, it calls SyncWAL - // afterwards. + // When using the manual_wal_flush option, flushes RocksDB internal buffers + // of WAL data to the file, so that the data can survive process crash or be + // included in a Checkpoint or Backup. Without manual_wal_flush, there is no + // such internal buffer. If sync is true, it calls SyncWAL() afterwards. virtual Status FlushWAL(bool /*sync*/) { return Status::NotSupported("FlushWAL not implemented"); } - // Sync the wal. Note that Write() followed by SyncWAL() is not exactly the - // same as Write() with sync=true: in the latter case the changes won't be - // visible until the sync is done. - // Currently only works if allow_mmap_writes = false in Options. + + // Ensure all WAL writes have been synced to storage, so that (assuming OS + // and hardware support) data will survive power loss. This function does + // not imply FlushWAL, so `FlushWAL(true)` is recommended if using + // manual_wal_flush=true. Currently only works if allow_mmap_writes = false + // in Options. + // + // Note that Write() followed by SyncWAL() is not exactly the same as Write() + // with sync=true: in the latter case the changes won't be visible until the + // sync is done. virtual Status SyncWAL() = 0; - // Lock the WAL. Also flushes the WAL after locking. - // After this method returns ok, writes to the database will be stopped until - // UnlockWAL() is called. - // This method may internally acquire and release DB mutex and the WAL write - // mutex, but after it returns, neither mutex is held by caller. + // Freezes the logical state of the DB (by stopping writes), and if WAL is + // enabled, ensures that state has been flushed to DB files (as in + // FlushWAL()). This can be used for taking a Checkpoint at a known DB + // state, though the user must use options to insure no DB flush is invoked + // in this frozen state. Other operations allowed on a "read only" DB should + // work while frozen. Each LockWAL() call that returns OK must eventually be + // followed by a corresponding call to UnlockWAL(). Where supported, non-OK + // status is generally only possible with some kind of corruption or I/O + // error. virtual Status LockWAL() { return Status::NotSupported("LockWAL not implemented"); } - // Unlock the WAL. - // The write stop on the database will be cleared. - // This method may internally acquire and release DB mutex. + // Unfreeze the DB state from a successful LockWAL(). + // The write stop on the database will be cleared when UnlockWAL() have been + // called for each successful LockWAL(). virtual Status UnlockWAL() { return Status::NotSupported("UnlockWAL not implemented"); } diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 836782894..0aea6ffd4 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -88,6 +88,7 @@ default_params = { "index_type": lambda: random.choice([0, 0, 0, 2, 2, 3]), "ingest_external_file_one_in": 1000000, "iterpercent": 10, + "lock_wal_one_in": 1000000, "mark_for_compaction_one_file_in": lambda: 10 * random.randint(0, 1), "max_background_compactions": 20, "max_bytes_for_level_base": 10485760, diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc index 5020da452..2bdab44fd 100644 --- a/utilities/checkpoint/checkpoint_test.cc +++ b/utilities/checkpoint/checkpoint_test.cc @@ -854,6 +854,30 @@ TEST_F(CheckpointTest, CheckpointReadOnlyDB) { delete snapshot_db; } +TEST_F(CheckpointTest, CheckpointWithLockWAL) { + Options options = CurrentOptions(); + ASSERT_OK(Put("foo", "foo_value")); + + ASSERT_OK(db_->LockWAL()); + + Checkpoint* checkpoint = nullptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + delete checkpoint; + checkpoint = nullptr; + + ASSERT_OK(db_->UnlockWAL()); + Close(); + + DB* snapshot_db = nullptr; + ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db)); + ReadOptions read_opts; + std::string get_result; + ASSERT_OK(snapshot_db->Get(read_opts, "foo", &get_result)); + ASSERT_EQ("foo_value", get_result); + delete snapshot_db; +} + TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "eevee"}, options);