diff --git a/db/db_write_test.cc b/db/db_write_test.cc
index 7107d2bb2..deee6efc3 100644
--- a/db/db_write_test.cc
+++ b/db/db_write_test.cc
@@ -31,6 +31,12 @@ class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
   void Open() { DBTestBase::Reopen(GetOptions()); }
 };
 
+class DBWriteTestUnparameterized : public DBTestBase {
+ public:
+  explicit DBWriteTestUnparameterized()
+      : DBTestBase("pipelined_write_test", /*env_do_fsync=*/false) {}
+};
+
 // It is invalid to do sync write while disabling WAL.
 TEST_P(DBWriteTest, SyncAndDisableWAL) {
   WriteOptions write_options;
@@ -318,6 +324,130 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
   Close();
 }
 
+TEST_F(DBWriteTestUnparameterized, PipelinedWriteRace) {
+  // This test was written to trigger a race in ExitAsBatchGroupLeader in case
+  // enable_pipelined_write_ was true.
+  // Writers for which ShouldWriteToMemtable() evaluates to false are removed
+  // from the write_group via CompleteFollower/ CompleteLeader. Writers in the
+  // middle of the group are fully unlinked, but if that writer is the
+  // last_writer, then we did not update the predecessor's link_older, i.e.,
+  // this writer was still reachable via newest_writer_.
+  //
+  // But the problem was that CompleteFollower already wakes up the thread
+  // owning that writer before the writer has been removed. This resulted in a
+  // race - if the leader thread was fast enough, then everything was fine.
+  // However, if the woken-up thread finished the current write operation and
+  // then performed yet another write, then a new writer instance was added
+  // to newest_writer_. It is possible that the new writer is located at the
+  // same address on the stack, and if this happened, then we had a problem,
+  // because the old code tried to find the last_writer in the list to unlink
+  // it, which in this case produced a cycle in the list.
+  // Whether two invocations of PipelinedWriteImpl() by the same thread actually
+  // allocate the writer at the same address depends on the OS and/or compiler,
+  // so it is rather hard to create a deterministic test for this.
+
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  options.enable_pipelined_write = true;
+  std::vector<std::thread> threads;
+
+  std::atomic<int> write_counter{0};
+  std::atomic<int> active_writers{0};
+  std::atomic<bool> second_write_starting{false};
+  std::atomic<bool> second_write_in_progress{false};
+  std::atomic<WriteThread::Writer*> leader{nullptr};
+  std::atomic<bool> finished_WAL_write{false};
+
+  DestroyAndReopen(options);
+
+  auto write_one_doc = [&]() {
+    int a = write_counter.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+    --active_writers;
+  };
+
+  auto write_two_docs = [&]() {
+    write_one_doc();
+    second_write_starting = true;
+    write_one_doc();
+  };
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
+        if (second_write_starting.load()) {
+          second_write_in_progress = true;
+          return;
+        }
+        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
+        if (w->state == WriteThread::STATE_GROUP_LEADER) {
+          active_writers++;
+          if (leader.load() == nullptr) {
+            leader.store(w);
+            while (active_writers.load() < 2) {
+              // wait for another thread to join the write_group
+            }
+          }
+        } else {
+          // we disable the memtable for all followers so that they are
+          // removed from the write_group before enqueuing it for the memtable
+          // write
+          w->disable_memtable = true;
+          active_writers++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::ExitAsBatchGroupLeader:Start", [&](void* arg) {
+        auto* wg = reinterpret_cast<WriteThread::WriteGroup*>(arg);
+        if (wg->leader == leader && !finished_WAL_write) {
+          finished_WAL_write = true;
+          while (active_writers.load() < 3) {
+            // wait for the new writer to be enqueued
+          }
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
+      [&](void* arg) {
+        auto* wg = reinterpret_cast<WriteThread::WriteGroup*>(arg);
+        if (wg->leader == leader) {
+          while (!second_write_in_progress.load()) {
+            // wait for the old follower thread to start the next write
+          }
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // start leader + one follower
+  threads.emplace_back(write_one_doc);
+  while (leader.load() == nullptr) {
+    // wait for leader
+  }
+
+  // we perform two writes in the follower, so that for the second write
+  // the thread reinserts a Writer with the same address
+  threads.emplace_back(write_two_docs);
+
+  // wait for the leader to enter ExitAsBatchGroupLeader
+  while (!finished_WAL_write.load()) {
+    // wait for write_group to have finished the WAL writes
+  }
+
+  // start another writer thread to be enqueued before the leader can
+  // complete the writers from its write_group
+  threads.emplace_back(write_one_doc);
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 TEST_P(DBWriteTest, ManualWalFlushInEffect) {
   Options options = GetOptions();
   Reopen(options);
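The test above hinges on stack-address reuse: two successive calls that each put a Writer on the stack may, or may not, place it at the same address. The snippet below is a minimal standalone sketch of that effect; it is not part of the patch, and the helper name StackSlotOfLocal and the noinline annotations are invented for illustration. As the test comment says, the outcome depends on the OS and/or compiler, so neither result is guaranteed.

#include <cstdint>
#include <cstdio>

// Keep each call in its own stack frame so consecutive frames can overlap.
#if defined(_MSC_VER)
__declspec(noinline)
#else
__attribute__((noinline))
#endif
static std::uintptr_t StackSlotOfLocal() {
  int local = 0;  // stands in for the Writer that lives on the caller's stack
  return reinterpret_cast<std::uintptr_t>(&local);
}

int main() {
  std::uintptr_t first = StackSlotOfLocal();
  std::uintptr_t second = StackSlotOfLocal();
  // Commonly prints "reused": the second call's frame lands where the first
  // one was, which is the aliasing PipelinedWriteRace tries to provoke.
  std::printf("stack slot %s\n", first == second ? "reused" : "not reused");
  return 0;
}

Because this reuse cannot be forced portably, the test orchestrates the threads with sync points instead of relying on it to happen deterministically.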
diff --git a/db/write_thread.cc b/db/write_thread.cc
index 06d7f4500..40580710a 100644
--- a/db/write_thread.cc
+++ b/db/write_thread.cc
@@ -4,8 +4,10 @@
 // (found in the LICENSE.Apache file in the root directory).
 
 #include "db/write_thread.h"
+
 #include <chrono>
 #include <thread>
+
 #include "db/column_family.h"
 #include "monitoring/perf_context_imp.h"
 #include "port/port.h"
@@ -293,17 +295,6 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) {
   }
 }
 
-WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
-                                                 Writer* boundary) {
-  assert(from != nullptr && from != boundary);
-  Writer* current = from;
-  while (current->link_older != boundary) {
-    current = current->link_older;
-    assert(current != nullptr);
-  }
-  return current;
-}
-
 void WriteThread::CompleteLeader(WriteGroup& write_group) {
   assert(write_group.size > 0);
   Writer* leader = write_group.leader;
@@ -640,6 +631,9 @@ void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
 static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
 void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                          Status& status) {
+  TEST_SYNC_POINT_CALLBACK("WriteThread::ExitAsBatchGroupLeader:Start",
+                           &write_group);
+
   Writer* leader = write_group.leader;
   Writer* last_writer = write_group.last_writer;
   assert(leader->link_older == nullptr);
@@ -656,7 +650,36 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
   }
 
   if (enable_pipelined_write_) {
-    // Notify writers don't write to memtable to exit.
+    // We insert a dummy Writer right before our current write_group. This
+    // allows us to unlink our write_group without the risk that a subsequent
+    // writer becomes a new leader and might overtake us and add itself to the
+    // memtable-writer-list before we can do so. This ensures that writers are
+    // added to the memtable-writer-list in the exact same order in which they
+    // were in the newest_writer list.
+    // This must happen before completing the writers from our group to prevent
+    // a race where the owning thread of one of these writers can start a new
+    // write operation.
+    Writer dummy;
+    Writer* head = newest_writer_.load(std::memory_order_acquire);
+    if (head != last_writer ||
+        !newest_writer_.compare_exchange_strong(head, &dummy)) {
+      // Either last_writer wasn't the head during the load(), or it was the
+      // head during the load() but somebody else pushed onto the list before
+      // we did the compare_exchange_strong (causing it to fail). In the latter
+      // case compare_exchange_strong has the effect of re-reading its first
+      // param (head). No need to retry a failing CAS, because only a departing
+      // leader (which we are at the moment) can remove nodes from the list.
+      assert(head != last_writer);
+
+      // After walking link_older starting from head (if not already done) we
+      // will be able to traverse w->link_newer below.
+      CreateMissingNewerLinks(head);
+      assert(last_writer->link_newer != nullptr);
+      last_writer->link_newer->link_older = &dummy;
+      dummy.link_newer = last_writer->link_newer;
+    }
+
+    // Complete writers that don't write to memtable.
     for (Writer* w = last_writer; w != leader;) {
       Writer* next = w->link_older;
       w->status = status;
@@ -669,23 +692,11 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
       CompleteLeader(write_group);
     }
 
-    Writer* next_leader = nullptr;
-
-    // Look for next leader before we call LinkGroup. If there isn't
-    // pending writers, place a dummy writer at the tail of the queue
-    // so we know the boundary of the current write group.
-    Writer dummy;
-    Writer* expected = last_writer;
-    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
-    if (!has_dummy) {
-      // We find at least one pending writer when we insert dummy. We search
-      // for next leader from there.
-      next_leader = FindNextLeader(expected, last_writer);
-      assert(next_leader != nullptr && next_leader != last_writer);
-    }
+    TEST_SYNC_POINT_CALLBACK(
+        "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
+        &write_group);
 
-    // Link the ramaining of the group to memtable writer list.
-    //
+    // Link the remaining writers of the group to the memtable writer list.
     // We have to link our group to memtable writer queue before wake up the
     // next leader or set newest_writer_ to null, otherwise the next leader
     // can run ahead of us and link to memtable writer queue before we do.
@@ -696,24 +707,17 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
       }
     }
 
-    // If we have inserted dummy in the queue, remove it now and check if there
-    // are pending writer join the queue since we insert the dummy. If so,
-    // look for next leader again.
-    if (has_dummy) {
-      assert(next_leader == nullptr);
-      expected = &dummy;
-      bool has_pending_writer =
-          !newest_writer_.compare_exchange_strong(expected, nullptr);
-      if (has_pending_writer) {
-        next_leader = FindNextLeader(expected, &dummy);
-        assert(next_leader != nullptr && next_leader != &dummy);
-      }
+    // Unlink the dummy writer from the list and identify the new leader.
+    head = newest_writer_.load(std::memory_order_acquire);
+    if (head != &dummy ||
+        !newest_writer_.compare_exchange_strong(head, nullptr)) {
+      CreateMissingNewerLinks(head);
+      Writer* new_leader = dummy.link_newer;
+      assert(new_leader != nullptr);
+      new_leader->link_older = nullptr;
+      SetState(new_leader, STATE_GROUP_LEADER);
     }
 
-    if (next_leader != nullptr) {
-      next_leader->link_older = nullptr;
-      SetState(next_leader, STATE_GROUP_LEADER);
-    }
     AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
                            STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
                &eabgl_ctx);
@@ -721,8 +725,8 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
     Writer* head = newest_writer_.load(std::memory_order_acquire);
     if (head != last_writer ||
         !newest_writer_.compare_exchange_strong(head, nullptr)) {
-      // Either w wasn't the head during the load(), or it was the head
-      // during the load() but somebody else pushed onto the list before
+      // Either last_writer wasn't the head during the load(), or it was the
+      // head during the load() but somebody else pushed onto the list before
       // we did the compare_exchange_strong (causing it to fail). In the
       // latter case compare_exchange_strong has the effect of re-reading
       // its first param (head). No need to retry a failing CAS, because
@@ -738,6 +742,7 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
       // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
       CreateMissingNewerLinks(head);
+      assert(last_writer->link_newer != nullptr);
       assert(last_writer->link_newer->link_older == last_writer);
       last_writer->link_newer->link_older = nullptr;
 
diff --git a/db/write_thread.h b/db/write_thread.h
index f78b01cd9..0ea51d922 100644
--- a/db/write_thread.h
+++ b/db/write_thread.h
@@ -428,10 +428,6 @@ class WriteThread {
   // concurrently with itself.
   void CreateMissingNewerLinks(Writer* head);
 
-  // Starting from a pending writer, follow link_older to search for next
-  // leader, until we hit boundary.
-  Writer* FindNextLeader(Writer* pending_writer, Writer* boundary);
-
   // Set the leader in write_group to completed state and remove it from the
   // write group.
   void CompleteLeader(WriteGroup& write_group);
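To make the new ExitAsBatchGroupLeader logic easier to follow, here is a minimal, self-contained sketch of the dummy-node splice it now performs. This is not RocksDB code: Node, Push, and SpliceOutGroup are invented stand-ins for Writer, the LinkOne-style push, and the unlink step in the patch. The departing leader CASes a stack-allocated dummy into its place, so a concurrently arriving writer links behind the dummy rather than behind a group member that is about to be completed (and whose stack slot may be reused). As in the patch, a failed CAS needs no retry, because only the departing leader ever removes nodes.

#include <atomic>
#include <cassert>

struct Node {
  Node* link_older = nullptr;  // toward earlier arrivals, as in WriteThread
};

std::atomic<Node*> newest{nullptr};  // mirrors newest_writer_

// Lock-free push onto the newest-first list.
void Push(Node* n) {
  Node* head = newest.load(std::memory_order_relaxed);
  do {
    n->link_older = head;
  } while (!newest.compare_exchange_weak(head, n, std::memory_order_release));
}

// The departing leader replaces its group (ending at last_writer) with a
// dummy boundary node. No retry loop: only the leader removes nodes, so a
// failed CAS just means someone pushed after last_writer.
void SpliceOutGroup(Node* last_writer, Node* dummy) {
  Node* head = newest.load(std::memory_order_acquire);
  if (head == last_writer && newest.compare_exchange_strong(head, dummy)) {
    return;  // no writer arrived after the group; dummy is the new head
  }
  assert(head != last_writer);
  // Walk down to the first node that still points into the group and
  // redirect it to the dummy instead.
  Node* n = head;
  while (n->link_older != last_writer) {
    n = n->link_older;
  }
  n->link_older = dummy;
}

int main() {
  Node leader, follower, dummy, late;
  Push(&leader);
  Push(&follower);  // group is [leader .. follower]
  Push(&late);      // a late writer arrives before the leader exits
  SpliceOutGroup(&follower, &dummy);
  assert(late.link_older == &dummy);  // the late writer now points at the
                                      // dummy, never at the completed follower
  return 0;
}

The patch performs the redirection via link_newer after CreateMissingNewerLinks; this sketch walks link_older directly, which is equivalent for illustrating why later writers can never observe a completed (and possibly reused) group member.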