diff --git a/HISTORY.md b/HISTORY.md index a38325923..568acce3a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### Public API Change ### New Features ### Bug Fixes +* Fix IOError on WAL write not being propagated to write group followers ## 5.9.0 (11/1/2017) ### Public API Change diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index a569ddbae..2560c38bd 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -348,7 +348,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, versions_->SetLastSequence(last_sequence); } MemTableInsertStatusCheck(w.status); - write_thread_.ExitAsBatchGroupLeader(write_group, w.status); + write_thread_.ExitAsBatchGroupLeader(write_group, status); } if (status.ok()) { @@ -577,7 +577,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, if (!w.CallbackFailed()) { WriteCallbackStatusCheck(status); } - nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status); + nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status); if (status.ok()) { status = w.FinalStatus(); } diff --git a/db/db_write_test.cc b/db/db_write_test.cc index b5ea930a0..d21bfe473 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -3,12 +3,18 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory).
+#include <atomic> #include <memory> #include <thread> #include <vector> #include "db/db_test_util.h" #include "db/write_batch_internal.h" +#include "db/write_thread.h" +#include "port/port.h" #include "port/stack_trace.h" +#include "util/fault_injection_test_env.h" +#include "util/string_util.h" +#include "util/sync_point.h" namespace rocksdb { @@ -17,7 +23,9 @@ class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> { public: DBWriteTest() : DBTestBase("/db_write_test") {} - void Open() { DBTestBase::Reopen(GetOptions(GetParam())); } + Options GetOptions() { return DBTestBase::GetOptions(GetParam()); } + + void Open() { DBTestBase::Reopen(GetOptions()); } }; // It is invalid to do sync write while disabling WAL. @@ -77,6 +85,47 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) { } } +TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { + constexpr int kNumThreads = 5; + std::unique_ptr<FaultInjectionTestEnv> mock_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetOptions(); + options.env = mock_env.get(); + Reopen(options); + std::atomic<int> ready_count{0}; + std::atomic<int> leader_count{0}; + std::vector<port::Thread> threads; + mock_env->SetFilesystemActive(false); + // Wait until all threads are linked to the write thread, to make sure + // all threads join the same batch group. + SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + ready_count++; + auto* w = reinterpret_cast<WriteThread::Writer*>(arg); + if (w->state == WriteThread::STATE_GROUP_LEADER) { + leader_count++; + while (ready_count < kNumThreads) { + // busy waiting + } + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + for (int i = 0; i < kNumThreads; i++) { + threads.push_back(port::Thread( + [&](int index) { + // All threads should fail. + ASSERT_FALSE(Put("key" + ToString(index), "value").ok()); + }, + i)); + } + for (int i = 0; i < kNumThreads; i++) { + threads[i].join(); + } + ASSERT_EQ(1, leader_count); + // Close the DB before mock_env is destroyed.
+ Close(); +} + INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, DBTestBase::kConcurrentWALWrites, diff --git a/db/write_thread.cc b/db/write_thread.cc index 27c20e40b..e115ba539 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -533,6 +533,11 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, Writer* last_writer = write_group.last_writer; assert(leader->link_older == nullptr); + // Propagate memtable write error to the whole group. + if (status.ok() && !write_group.status.ok()) { + status = write_group.status; + } + if (enable_pipelined_write_) { // Notify writers don't write to memtable to exit. for (Writer* w = last_writer; w != leader;) {