Fix race in ExitAsBatchGroupLeader with pipelined writes (#9944)

Summary:
Resolves https://github.com/facebook/rocksdb/issues/9692

This PR adds a unit test that reproduces the race described in https://github.com/facebook/rocksdb/issues/9692, together with a corresponding fix.

The unit test does not have any assertions, because I could not find a reliable and safe way to assert that the writers list does not form a cycle. With the old (buggy) code the test simply hangs, while with the fix it passes successfully.
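
As background: newest_writer_ is a lock-free LIFO list of Writer objects that live on the writer threads' stacks, linked newest-to-oldest via link_older. The following self-contained sketch is a simplified model of the race, not RocksDB code (only the linking scheme mirrors the real list; push() stands in for WriteThread::LinkOne). It shows how waking a follower before unlinking its writer lets a reused stack address create a cycle:

#include <atomic>
#include <cstdio>
#include <new>

// Simplified model of the newest_writer_ list: a lock-free LIFO whose nodes
// live on the writer threads' stacks, linked newest-to-oldest via link_older.
struct Node {
  Node* link_older = nullptr;
};

std::atomic<Node*> newest{nullptr};

// Stands in for WriteThread::LinkOne.
void push(Node* n) {
  Node* head = newest.load(std::memory_order_relaxed);
  do {
    n->link_older = head;
  } while (!newest.compare_exchange_weak(head, n));
}

int main() {
  Node leader;
  push(&leader);  // list: leader

  // A follower's Writer lives in a stack slot; model that slot explicitly.
  alignas(Node) unsigned char slot[sizeof(Node)];
  Node* w1 = new (slot) Node;
  push(w1);  // list: w1 -> leader; the leader records w1 as last_writer

  // The bug: the leader wakes w1's owner BEFORE unlinking w1. That thread
  // finishes its write, starts a second one, and the new Writer happens to
  // occupy the same stack slot:
  w1->~Node();
  Node* w2 = new (slot) Node;  // same address as w1
  push(w2);  // head is still the stale w1 pointer == w2, so
             // w2->link_older == w2: the list now contains a cycle

  std::printf("cycle? %s\n", w2->link_older == w2 ? "yes" : "no");
  // Any walk of link_older from newest (e.g. the removed FindNextLeader)
  // now loops forever -- which is why the repro test hangs without the fix.
  return 0;
}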

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9944

Reviewed By: pdillinger

Differential Revision: D36134604

Pulled By: riversand963

fbshipit-source-id: ef636c5a79ddbef18658ab2f19ca9210a427324a

Author: mpoeter (committed by Facebook GitHub Bot)
Commit: bef3127b00 (parent: 27f3af5966)

3 changed files:
  db/db_write_test.cc  (130)
  db/write_thread.cc   (97)
  db/write_thread.h    (4)

db/db_write_test.cc
@@ -31,6 +31,12 @@ class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
   void Open() { DBTestBase::Reopen(GetOptions()); }
 };
 
+class DBWriteTestUnparameterized : public DBTestBase {
+ public:
+  explicit DBWriteTestUnparameterized()
+      : DBTestBase("pipelined_write_test", /*env_do_fsync=*/false) {}
+};
+
 // It is invalid to do sync write while disabling WAL.
 TEST_P(DBWriteTest, SyncAndDisableWAL) {
   WriteOptions write_options;
@@ -318,6 +324,130 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
   Close();
 }
 
+TEST_F(DBWriteTestUnparameterized, PipelinedWriteRace) {
+  // This test was written to trigger a race in ExitAsBatchGroupLeader in case
+  // enable_pipelined_write_ was true.
+  // Writers for which ShouldWriteToMemtable() evaluates to false are removed
+  // from the write_group via CompleteFollower/ CompleteLeader. Writers in the
+  // middle of the group are fully unlinked, but if that writer is the
+  // last_writer, then we did not update the predecessor's link_older, i.e.,
+  // this writer was still reachable via newest_writer_.
+  //
+  // The problem was that CompleteFollower already wakes up the thread
+  // owning that writer before the writer has been removed. This resulted in a
+  // race - if the leader thread was fast enough, then everything was fine.
+  // However, if the woken-up thread finished the current write operation and
+  // then performed yet another write, then a new writer instance was added
+  // to newest_writer_. It is possible that the new writer is located at the
+  // same address on the stack, and if this happened, then we had a problem,
+  // because the old code tried to find the last_writer in the list to unlink
+  // it, which in this case produced a cycle in the list.
+  // Whether two invocations of PipelinedWriteImpl() by the same thread
+  // actually allocate the writer at the same address depends on the OS and/or
+  // compiler, so it is rather hard to create a deterministic test for this.
+
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  options.enable_pipelined_write = true;
+
+  std::vector<port::Thread> threads;
+
+  std::atomic<int> write_counter{0};
+  std::atomic<int> active_writers{0};
+  std::atomic<bool> second_write_starting{false};
+  std::atomic<bool> second_write_in_progress{false};
+  std::atomic<WriteThread::Writer*> leader{nullptr};
+  std::atomic<bool> finished_WAL_write{false};
+
+  DestroyAndReopen(options);
+
+  auto write_one_doc = [&]() {
+    int a = write_counter.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+    --active_writers;
+  };
+
+  auto write_two_docs = [&]() {
+    write_one_doc();
+    second_write_starting = true;
+    write_one_doc();
+  };
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
+        if (second_write_starting.load()) {
+          second_write_in_progress = true;
+          return;
+        }
+        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
+        if (w->state == WriteThread::STATE_GROUP_LEADER) {
+          active_writers++;
+          if (leader.load() == nullptr) {
+            leader.store(w);
+            while (active_writers.load() < 2) {
+              // wait for another thread to join the write_group
+            }
+          }
+        } else {
+          // we disable the memtable for all followers so that they are
+          // removed from the write_group before enqueuing it for the memtable
+          // write
+          w->disable_memtable = true;
+          active_writers++;
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::ExitAsBatchGroupLeader:Start", [&](void* arg) {
+        auto* wg = reinterpret_cast<WriteThread::WriteGroup*>(arg);
+        if (wg->leader == leader && !finished_WAL_write) {
+          finished_WAL_write = true;
+          while (active_writers.load() < 3) {
+            // wait for the new writer to be enqueued
+          }
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
+      [&](void* arg) {
+        auto* wg = reinterpret_cast<WriteThread::WriteGroup*>(arg);
+        if (wg->leader == leader) {
+          while (!second_write_in_progress.load()) {
+            // wait for the old follower thread to start the next write
+          }
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // start leader + one follower
+  threads.emplace_back(write_one_doc);
+  while (leader.load() == nullptr) {
+    // wait for leader
+  }
+  // we perform two writes in the follower, so that for the second write
+  // the thread reinserts a Writer with the same address
+  threads.emplace_back(write_two_docs);
+
+  // wait for the leader to enter ExitAsBatchGroupLeader
+  while (!finished_WAL_write.load()) {
+    // wait for write_group to have finished the WAL writes
+  }
+
+  // start another writer thread to be enqueued before the leader can
+  // complete the writers from its write_group
+  threads.emplace_back(write_one_doc);
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 TEST_P(DBWriteTest, ManualWalFlushInEffect) {
   Options options = GetOptions();
   Reopen(options);

db/write_thread.cc
@@ -4,8 +4,10 @@
 // (found in the LICENSE.Apache file in the root directory).
 
 #include "db/write_thread.h"
+
 #include <chrono>
 #include <thread>
+
 #include "db/column_family.h"
 #include "monitoring/perf_context_imp.h"
 #include "port/port.h"
@@ -293,17 +295,6 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) {
   }
 }
 
-WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
-                                                 Writer* boundary) {
-  assert(from != nullptr && from != boundary);
-  Writer* current = from;
-  while (current->link_older != boundary) {
-    current = current->link_older;
-    assert(current != nullptr);
-  }
-  return current;
-}
-
 void WriteThread::CompleteLeader(WriteGroup& write_group) {
   assert(write_group.size > 0);
   Writer* leader = write_group.leader;
@ -640,6 +631,9 @@ void WriteThread::ExitAsBatchGroupFollower(Writer* w) {
static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader"); static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
Status& status) { Status& status) {
TEST_SYNC_POINT_CALLBACK("WriteThread::ExitAsBatchGroupLeader:Start",
&write_group);
Writer* leader = write_group.leader; Writer* leader = write_group.leader;
Writer* last_writer = write_group.last_writer; Writer* last_writer = write_group.last_writer;
assert(leader->link_older == nullptr); assert(leader->link_older == nullptr);
@@ -656,7 +650,36 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
   }
 
   if (enable_pipelined_write_) {
-    // Notify writers don't write to memtable to exit.
+    // We insert a dummy Writer right before our current write_group. This
+    // allows us to unlink our write_group without the risk that a subsequent
+    // writer becomes a new leader and might overtake us and add itself to the
+    // memtable-writer-list before we can do so. This ensures that writers are
+    // added to the memtable-writer-list in the exact same order in which they
+    // were in the newest_writer list.
+    // This must happen before completing the writers from our group to prevent
+    // a race where the owning thread of one of these writers can start a new
+    // write operation.
+    Writer dummy;
+    Writer* head = newest_writer_.load(std::memory_order_acquire);
+    if (head != last_writer ||
+        !newest_writer_.compare_exchange_strong(head, &dummy)) {
+      // Either last_writer wasn't the head during the load(), or it was the
+      // head during the load() but somebody else pushed onto the list before
+      // we did the compare_exchange_strong (causing it to fail). In the latter
+      // case compare_exchange_strong has the effect of re-reading its first
+      // param (head). No need to retry a failing CAS, because only a departing
+      // leader (which we are at the moment) can remove nodes from the list.
+      assert(head != last_writer);
+      // After walking link_older starting from head (if not already done) we
+      // will be able to traverse w->link_newer below.
+      CreateMissingNewerLinks(head);
+      assert(last_writer->link_newer != nullptr);
+      last_writer->link_newer->link_older = &dummy;
+      dummy.link_newer = last_writer->link_newer;
+    }
+
+    // Complete writers that don't write to memtable
     for (Writer* w = last_writer; w != leader;) {
       Writer* next = w->link_older;
       w->status = status;
@@ -669,23 +692,11 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
       CompleteLeader(write_group);
     }
 
-    Writer* next_leader = nullptr;
-
-    // Look for next leader before we call LinkGroup. If there isn't
-    // pending writers, place a dummy writer at the tail of the queue
-    // so we know the boundary of the current write group.
-    Writer dummy;
-    Writer* expected = last_writer;
-    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
-    if (!has_dummy) {
-      // We find at least one pending writer when we insert dummy. We search
-      // for next leader from there.
-      next_leader = FindNextLeader(expected, last_writer);
-      assert(next_leader != nullptr && next_leader != last_writer);
-    }
+    TEST_SYNC_POINT_CALLBACK(
+        "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
+        &write_group);
 
-    // Link the ramaining of the group to memtable writer list.
+    // Link the remaining of the group to memtable writer list.
+    //
     // We have to link our group to memtable writer queue before wake up the
     // next leader or set newest_writer_ to null, otherwise the next leader
     // can run ahead of us and link to memtable writer queue before we do.
@@ -696,24 +707,17 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
       }
     }
 
-    // If we have inserted dummy in the queue, remove it now and check if there
-    // are pending writer join the queue since we insert the dummy. If so,
-    // look for next leader again.
-    if (has_dummy) {
-      assert(next_leader == nullptr);
-      expected = &dummy;
-      bool has_pending_writer =
-          !newest_writer_.compare_exchange_strong(expected, nullptr);
-      if (has_pending_writer) {
-        next_leader = FindNextLeader(expected, &dummy);
-        assert(next_leader != nullptr && next_leader != &dummy);
-      }
-    }
-
-    if (next_leader != nullptr) {
-      next_leader->link_older = nullptr;
-      SetState(next_leader, STATE_GROUP_LEADER);
-    }
+    // Unlink the dummy writer from the list and identify the new leader
+    head = newest_writer_.load(std::memory_order_acquire);
+    if (head != &dummy ||
+        !newest_writer_.compare_exchange_strong(head, nullptr)) {
+      CreateMissingNewerLinks(head);
+      Writer* new_leader = dummy.link_newer;
+      assert(new_leader != nullptr);
+      new_leader->link_older = nullptr;
+      SetState(new_leader, STATE_GROUP_LEADER);
+    }
 
     AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
                            STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
                &eabgl_ctx);
@@ -721,8 +725,8 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
     Writer* head = newest_writer_.load(std::memory_order_acquire);
     if (head != last_writer ||
         !newest_writer_.compare_exchange_strong(head, nullptr)) {
-      // Either w wasn't the head during the load(), or it was the head
-      // during the load() but somebody else pushed onto the list before
+      // Either last_writer wasn't the head during the load(), or it was the
+      // head during the load() but somebody else pushed onto the list before
       // we did the compare_exchange_strong (causing it to fail). In the
       // latter case compare_exchange_strong has the effect of re-reading
       // its first param (head). No need to retry a failing CAS, because
@@ -738,6 +742,7 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
       // to MarkJoined, so we can definitely conclude that no other leader
      // work is going on here (with or without db mutex).
       CreateMissingNewerLinks(head);
+      assert(last_writer->link_newer != nullptr);
       assert(last_writer->link_newer->link_older == last_writer);
       last_writer->link_newer->link_older = nullptr;

db/write_thread.h
@@ -428,10 +428,6 @@ class WriteThread {
   // concurrently with itself.
   void CreateMissingNewerLinks(Writer* head);
 
-  // Starting from a pending writer, follow link_older to search for next
-  // leader, until we hit boundary.
-  Writer* FindNextLeader(Writer* pending_writer, Writer* boundary);
-
   // Set the leader in write_group to completed state and remove it from the
   // write group.
   void CompleteLeader(WriteGroup& write_group);
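
For reference, here is the dummy-writer pattern at the heart of the fix, reduced to a self-contained sketch. This is a simplified model under assumed names (Node, push-style list, create_missing_newer_links approximating CreateMissingNewerLinks), not the authoritative implementation; the real code is the ExitAsBatchGroupLeader diff above:

#include <atomic>
#include <cassert>

struct Node {
  Node* link_older = nullptr;
  Node* link_newer = nullptr;
};

std::atomic<Node*> newest{nullptr};

// Approximation of CreateMissingNewerLinks: walk link_older from the head
// and materialize the corresponding link_newer back-pointers.
void create_missing_newer_links(Node* head) {
  while (head->link_older != nullptr &&
         head->link_older->link_newer == nullptr) {
    head->link_older->link_newer = head;
    head = head->link_older;
  }
}

// Called by the departing group leader; last_writer is the tail of its group.
void leader_exit(Node* last_writer) {
  Node dummy;  // leader-owned boundary node

  // Step 1: pin the boundary BEFORE waking any follower. If nobody pushed
  // after last_writer, a single CAS swaps it for the dummy; otherwise splice
  // the dummy in behind the newer writers. A failed CAS needs no retry,
  // because only a departing leader (us) removes nodes from the list.
  Node* head = newest.load(std::memory_order_acquire);
  if (head != last_writer ||
      !newest.compare_exchange_strong(head, &dummy)) {
    create_missing_newer_links(head);
    assert(last_writer->link_newer != nullptr);
    last_writer->link_newer->link_older = &dummy;
    dummy.link_newer = last_writer->link_newer;
  }

  // ... safe point: wake/complete the group's writers here; even if one of
  // them immediately starts a new write whose Writer reuses the same stack
  // address, the list no longer references its old node ...

  // Step 2: unpin. If the dummy is still the head, pop it with a CAS;
  // otherwise the writer right behind the dummy becomes the next leader.
  head = newest.load(std::memory_order_acquire);
  if (head != &dummy || !newest.compare_exchange_strong(head, nullptr)) {
    create_missing_newer_links(head);
    Node* new_leader = dummy.link_newer;
    assert(new_leader != nullptr);
    new_leader->link_older = nullptr;
    // real code: SetState(new_leader, STATE_GROUP_LEADER);
  }
}

The key property is that the dummy is owned by the departing leader: between pinning and unpinning, no traversal dereferences follower-owned stack memory, so a follower that immediately starts a new write can no longer corrupt the list.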
