From 3cf562be31a5876a00caff6eb638049bcad5ee9b Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 28 Nov 2017 11:40:40 -0800 Subject: [PATCH] Fix IOError on WAL write doesn't propagate to write group follower Summary: This is a simpler version of #3097, with all unrelated changes removed. It fixes the bug where concurrent writes may return Status::OK() even though the WAL write actually failed with an IOError. This happens when multiple writes form a write batch group, and the leader gets an IOError while writing to the WAL. The leader fails to pass the error to the followers in the group, and the followers end up returning Status::OK() while actually writing nothing. The bug only affects writes in a batch group. Future writes after the batch group will correctly return immediately with the IOError. Closes https://github.com/facebook/rocksdb/pull/3201 Differential Revision: D6421644 Pulled By: yiwu-arbug fbshipit-source-id: 1c2a455c5b73f6842423785eb8a9dbfbb191dc0e --- HISTORY.md | 1 + db/db_impl_write.cc | 4 ++-- db/db_write_test.cc | 51 ++++++++++++++++++++++++++++++++++++++++++++- db/write_thread.cc | 5 +++++ 4 files changed, 58 insertions(+), 3 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index a38325923..568acce3a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### Public API Change ### New Features ### Bug Fixes +* Fix IOError on WAL write doesn't propagate to write group follower ## 5.9.0 (11/1/2017) ### Public API Change diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index a569ddbae..2560c38bd 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -348,7 +348,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, versions_->SetLastSequence(last_sequence); } MemTableInsertStatusCheck(w.status); - write_thread_.ExitAsBatchGroupLeader(write_group, w.status); + write_thread_.ExitAsBatchGroupLeader(write_group, status); } if (status.ok()) { @@ -577,7 +577,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, if (!w.CallbackFailed()) { 
WriteCallbackStatusCheck(status); } - nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status); + nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status); if (status.ok()) { status = w.FinalStatus(); } diff --git a/db/db_write_test.cc b/db/db_write_test.cc index b5ea930a0..d21bfe473 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -3,12 +3,18 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include <atomic> #include <memory> #include <thread> #include <vector> #include "db/db_test_util.h" #include "db/write_batch_internal.h" +#include "db/write_thread.h" +#include "port/port.h" #include "port/stack_trace.h" +#include "util/fault_injection_test_env.h" +#include "util/string_util.h" +#include "util/sync_point.h" namespace rocksdb { @@ -17,7 +23,9 @@ class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> { public: DBWriteTest() : DBTestBase("/db_write_test") {} - void Open() { DBTestBase::Reopen(GetOptions(GetParam())); } + Options GetOptions() { return DBTestBase::GetOptions(GetParam()); } + + void Open() { DBTestBase::Reopen(GetOptions()); } }; // It is invalid to do sync write while disabling WAL. @@ -77,6 +85,47 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) { } } +TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { + constexpr int kNumThreads = 5; + std::unique_ptr<FaultInjectionTestEnv> mock_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetOptions(); + options.env = mock_env.get(); + Reopen(options); + std::atomic<int> ready_count{0}; + std::atomic<int> leader_count{0}; + std::vector<port::Thread> threads; + mock_env->SetFilesystemActive(false); + // Wait until all threads linked to write threads, to make sure + // all threads join the same batch group. 
+ SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + ready_count++; + auto* w = reinterpret_cast<WriteThread::Writer*>(arg); + if (w->state == WriteThread::STATE_GROUP_LEADER) { + leader_count++; + while (ready_count < kNumThreads) { + // busy waiting + } + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + for (int i = 0; i < kNumThreads; i++) { + threads.push_back(port::Thread( + [&](int index) { + // All threads should fail. + ASSERT_FALSE(Put("key" + ToString(index), "value").ok()); + }, + i)); + } + for (int i = 0; i < kNumThreads; i++) { + threads[i].join(); + } + ASSERT_EQ(1, leader_count); + // Close before mock_env destruct. + Close(); +} + INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, DBTestBase::kConcurrentWALWrites, diff --git a/db/write_thread.cc b/db/write_thread.cc index 27c20e40b..e115ba539 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -533,6 +533,11 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, Writer* last_writer = write_group.last_writer; assert(leader->link_older == nullptr); + // Propagate memtable write error to the whole group. + if (status.ok() && !write_group.status.ok()) { + status = write_group.status; + } + if (enable_pipelined_write_) { // Notify writers don't write to memtable to exit. for (Writer* w = last_writer; w != leader;) {