diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index a7c6ff316..7cd8c30d6 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1453,6 +1453,18 @@ Status DBImpl::FlushWAL(bool sync) { return SyncWAL(); } +bool DBImpl::WALBufferIsEmpty(bool lock) { + if (lock) { + log_write_mutex_.Lock(); + } + log::Writer* cur_log_writer = logs_.back().writer; + auto res = cur_log_writer->BufferIsEmpty(); + if (lock) { + log_write_mutex_.Unlock(); + } + return res; +} + Status DBImpl::SyncWAL() { TEST_SYNC_POINT("DBImpl::SyncWAL:Begin"); autovector logs_to_sync; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 03418c1d5..0a2cd12e9 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -423,7 +423,7 @@ class DBImpl : public DB { const FlushOptions& options, const std::vector& column_families) override; virtual Status FlushWAL(bool sync) override; - bool TEST_WALBufferIsEmpty(bool lock = true); + bool WALBufferIsEmpty(bool lock = true); virtual Status SyncWAL() override; virtual Status LockWAL() override; virtual Status UnlockWAL() override; diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 52b0b67a2..7054b0669 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -31,18 +31,6 @@ Status DBImpl::TEST_SwitchWAL() { return s; } -bool DBImpl::TEST_WALBufferIsEmpty(bool lock) { - if (lock) { - log_write_mutex_.Lock(); - } - log::Writer* cur_log_writer = logs_.back().writer; - auto res = cur_log_writer->TEST_BufferIsEmpty(); - if (lock) { - log_write_mutex_.Unlock(); - } - return res; -} - uint64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( ColumnFamilyHandle* column_family) { ColumnFamilyData* cfd; diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 1f1dcb6a1..498c92b39 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -2060,9 +2060,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p", impl); LogFlush(impl->immutable_db_options_.info_log); - assert(impl->TEST_WALBufferIsEmpty()); - // If the assert above fails then we need to FlushWAL before returning - // control back to the user. + if (!impl->WALBufferIsEmpty()) { + impl->FlushWAL(true /* sync */); + } if (!persist_options_status.ok()) { s = Status::IOError( "DB::Open() failed --- Unable to persist Options file", diff --git a/db/db_write_test.cc b/db/db_write_test.cc index a068041c2..2c3781ae1 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -453,15 +453,15 @@ TEST_P(DBWriteTest, ManualWalFlushInEffect) { Reopen(options); // try the 1st WAL created during open ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok()); - ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty()); ASSERT_TRUE(dbfull()->FlushWAL(false).ok()); - ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(dbfull()->WALBufferIsEmpty()); // try the 2nd wal created during SwitchWAL ASSERT_OK(dbfull()->TEST_SwitchWAL()); ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok()); - ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty()); ASSERT_TRUE(dbfull()->FlushWAL(false).ok()); - ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(dbfull()->WALBufferIsEmpty()); } TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) { @@ -609,16 +609,16 @@ TEST_P(DBWriteTest, LockWalInEffect) { Reopen(options); // try the 1st WAL created during open ASSERT_OK(Put("key" + std::to_string(0), "value")); - ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty()); ASSERT_OK(dbfull()->LockWAL()); - ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false)); + ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false)); ASSERT_OK(dbfull()->UnlockWAL()); // try the 2nd wal created during SwitchWAL ASSERT_OK(dbfull()->TEST_SwitchWAL()); ASSERT_OK(Put("key" + std::to_string(0), "value")); - ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty()); ASSERT_OK(dbfull()->LockWAL()); - ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false)); + ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false)); ASSERT_OK(dbfull()->UnlockWAL()); } diff --git a/db/log_writer.cc b/db/log_writer.cc index faa9ad089..56f58543e 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -194,7 +194,7 @@ IOStatus Writer::AddCompressionTypeRecord() { return s; } -bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } +bool Writer::BufferIsEmpty() { return dest_->BufferIsEmpty(); } IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, Env::IOPriority rate_limiter_priority) { diff --git a/db/log_writer.h b/db/log_writer.h index 877be1755..4d0d49a86 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -96,7 +96,7 @@ class Writer { IOStatus Close(); - bool TEST_BufferIsEmpty(); + bool BufferIsEmpty(); private: std::unique_ptr dest_; diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 676d5f758..53e1db42d 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -86,6 +86,7 @@ DECLARE_string(options_file); DECLARE_int64(active_width); DECLARE_bool(test_batches_snapshots); DECLARE_bool(atomic_flush); +DECLARE_int32(manual_wal_flush_one_in); DECLARE_bool(test_cf_consistency); DECLARE_bool(test_multi_ops_txns); DECLARE_int32(threads); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index e0a273320..067600ee9 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -85,6 +85,13 @@ DEFINE_bool(test_batches_snapshots, false, DEFINE_bool(atomic_flush, false, "If set, enables atomic flush in the options.\n"); +DEFINE_int32( + manual_wal_flush_one_in, 0, + "If non-zero, then `FlushWAL(bool sync)`, where `bool sync` is randomly " + "decided, will be explictly called in db stress once for every N ops " + "on average. Setting `manual_wal_flush_one_in` to be greater than 0 " + "implies `Options::manual_wal_flush = true` is set."); + DEFINE_bool(test_cf_consistency, false, "If set, runs the stress test dedicated to verifying writes to " "multiple column families are consistent. Setting this implies " diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 55e61a6a7..896653b70 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -334,7 +334,14 @@ void StressTest::FinishInitDb(SharedState* shared) { } void StressTest::TrackExpectedState(SharedState* shared) { - if ((FLAGS_sync_fault_injection || FLAGS_disable_wal) && IsStateTracked()) { + // For `FLAGS_manual_wal_flush_one_inWAL` + // data can be lost when `manual_wal_flush_one_in > 0` and `FlushWAL()` is not + // explictly called by users of RocksDB (in our case, db stress). + // Therefore recovery from such potential WAL data loss is a prefix recovery + // that requires tracing + if ((FLAGS_sync_fault_injection || FLAGS_disable_wal || + FLAGS_manual_wal_flush_one_in > 0) && + IsStateTracked()) { Status s = shared->SaveAtAndAfter(db_); if (!s.ok()) { fprintf(stderr, "Error enabling history tracing: %s\n", @@ -777,6 +784,15 @@ void StressTest::OperateDb(ThreadState* thread) { MaybeClearOneColumnFamily(thread); + if (thread->rand.OneInOpt(FLAGS_manual_wal_flush_one_in)) { + bool sync = thread->rand.OneIn(2) ? true : false; + Status s = db_->FlushWAL(sync); + if (!s.ok()) { + fprintf(stderr, "FlushWAL(sync=%s) failed: %s\n", + (sync ? "true" : "false"), s.ToString().c_str()); + } + } + if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) { Status s = db_->SyncWAL(); if (!s.ok() && !s.IsNotSupported()) { @@ -2293,6 +2309,8 @@ void StressTest::PrintEnv() const { FLAGS_read_only ? "true" : "false"); fprintf(stdout, "Atomic flush : %s\n", FLAGS_atomic_flush ? "true" : "false"); + fprintf(stdout, "Manual WAL flush : %s\n", + FLAGS_manual_wal_flush_one_in > 0 ? "true" : "false"); fprintf(stdout, "Column families : %d\n", FLAGS_column_families); if (!FLAGS_test_batches_snapshots) { fprintf(stdout, "Clear CFs one in : %d\n", @@ -2801,7 +2819,9 @@ void StressTest::Reopen(ThreadState* thread) { clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_); Open(thread->shared); - if ((FLAGS_sync_fault_injection || FLAGS_disable_wal) && IsStateTracked()) { + if ((FLAGS_sync_fault_injection || FLAGS_disable_wal || + FLAGS_manual_wal_flush_one_in > 0) && + IsStateTracked()) { Status s = thread->shared->SaveAtAndAfter(db_); if (!s.ok()) { fprintf(stderr, "Error enabling history tracing: %s\n", @@ -3094,6 +3114,7 @@ void InitializeOptionsFromFlags( options.compaction_options_universal.max_size_amplification_percent = FLAGS_universal_max_size_amplification_percent; options.atomic_flush = FLAGS_atomic_flush; + options.manual_wal_flush = FLAGS_manual_wal_flush_one_in > 0 ? true : false; options.avoid_unnecessary_blocking_io = FLAGS_avoid_unnecessary_blocking_io; options.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest; options.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery; diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index d513e2bc1..b3985eb20 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -286,7 +286,7 @@ class WritableFileWriter { bool use_direct_io() { return writable_file_->use_direct_io(); } - bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; } + bool BufferIsEmpty() { return buf_.CurrentSize() == 0; } void TEST_SetFileChecksumGenerator( FileChecksumGenerator* checksum_generator) { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 5216c845a..07f754f3c 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -77,6 +77,7 @@ default_params = { "expected_values_dir": lambda: setup_expected_values_dir(), "fail_if_options_file_error": lambda: random.randint(0, 1), "flush_one_in": 1000000, + "manual_wal_flush_one_in": lambda: random.choice([0, 0, 1000, 1000000]), "file_checksum_impl": lambda: random.choice(["none", "crc32c", "xxh64", "big"]), "get_live_files_one_in": 1000000, # Note: the following two are intentionally disabled as the corresponding @@ -525,6 +526,7 @@ def finalize_and_sanitize(src_params): if ( dest_params.get("disable_wal") == 1 or dest_params.get("sync_fault_injection") == 1 + or dest_params.get("manual_wal_flush_one_in") > 0 ): # File ingestion does not guarantee prefix-recoverability when unsynced # data can be lost. Ingesting a file syncs data immediately that is @@ -603,7 +605,7 @@ def finalize_and_sanitize(src_params): # compatible with only write committed policy if (dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0): dest_params["sync_fault_injection"] = 0 - + dest_params["manual_wal_flush_one_in"] = 0 # PutEntity is currently not supported with Merge if dest_params["use_put_entity_one_in"] != 0: dest_params["use_merge"] = 0