From 0b6be7eb686293a9f7114d6116f72ccd2ec66f19 Mon Sep 17 00:00:00 2001 From: eharry Date: Mon, 13 Sep 2021 20:15:00 -0700 Subject: [PATCH] Fix WAL log data corruption #8723 (#8746) Summary: Fix WAL log data corruption when using DBOptions.manual_wal_flush(true) and WriteOptions.sync(true) together (https://github.com/facebook/rocksdb/issues/8723) Pull Request resolved: https://github.com/facebook/rocksdb/pull/8746 Reviewed By: ajkr Differential Revision: D30758468 Pulled By: riversand963 fbshipit-source-id: 07c20899d5f2447dc77861b4845efc68a59aa4e8 --- HISTORY.md | 1 + db/db_impl/db_impl_write.cc | 16 ++++++++++++++++ db/db_test.cc | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+) diff --git a/HISTORY.md b/HISTORY.md index 8a44d216a..680c05088 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ * Fix a race in BackupEngine if RateLimiter is reconfigured during concurrent Restore operations. * Fix a bug on POSIX in which failure to create a lock file (e.g. out of space) can prevent future LockFile attempts in the same process on the same file from succeeding. * Fix a bug that backup_rate_limiter and restore_rate_limiter in BackupEngine could not limit read rates. +* Fix WAL log data corruption when using DBOptions.manual_wal_flush(true) and WriteOptions.sync(true) together. The sync WAL should work with locked log_write_mutex_. ### New Features * RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions. 
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 6fcb56b79..b7d856ca3 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1124,6 +1124,18 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, // writer thread, so no one will push to logs_, // - as long as other threads don't modify it, it's safe to read // from std::deque from multiple threads concurrently. + // + // Sync operation should be performed while holding log_write_mutex_, because: + // when DBOptions.manual_wal_flush_ is set, + // the FlushWAL function may be invoked by another thread. + // if log_write_mutex_ is not held, the log file may get data + // corruption + + const bool needs_locking = manual_wal_flush_ && !two_write_queues_; + if (UNLIKELY(needs_locking)) { + log_write_mutex_.Lock(); + } + for (auto& log : logs_) { io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync); if (!io_s.ok()) { @@ -1131,6 +1143,10 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, } } + if (UNLIKELY(needs_locking)) { + log_write_mutex_.Unlock(); + } + if (io_s.ok() && need_log_dir_sync) { // We only sync WAL directory the first time WAL syncing is // requested, so that in case users never turn on WAL sync, diff --git a/db/db_test.cc b/db/db_test.cc index fca8b2880..b99615caa 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4098,6 +4098,39 @@ TEST_F(DBTest, ConcurrentFlushWAL) { } } +// This test failure will only be caught with some probability +TEST_F(DBTest, ManualFlushWalAndWriteRace) { + Options options; + options.env = env_; + options.manual_wal_flush = true; + options.create_if_missing = true; + + DestroyAndReopen(options); + + WriteOptions wopts; + wopts.sync = true; + + port::Thread writeThread([&]() { + for (int i = 0; i < 100; i++) { + auto istr = ToString(i); + dbfull()->Put(wopts, "key_" + istr, "value_" + istr); + } + }); + port::Thread flushThread([&]() { + for (int i = 0; i < 100; i++) { 
ASSERT_OK(dbfull()->FlushWAL(false)); + } + }); + + writeThread.join(); + flushThread.join(); + ASSERT_OK(dbfull()->Put(wopts, "foo1", "value1")); + ASSERT_OK(dbfull()->Put(wopts, "foo2", "value2")); + Reopen(options); + ASSERT_EQ("value1", Get("foo1")); + ASSERT_EQ("value2", Get("foo2")); +} + #ifndef ROCKSDB_LITE TEST_F(DBTest, DynamicMemtableOptions) { const uint64_t k64KB = 1 << 16;