|
|
|
@ -267,8 +267,8 @@ void WriteThread::CompleteFollower(Writer* w, WriteGroup& write_group) { |
|
|
|
|
SetState(w, STATE_COMPLETED); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup"); |
|
|
|
|
void WriteThread::JoinBatchGroup(Writer* w) { |
|
|
|
|
static AdaptationContext ctx("JoinBatchGroup"); |
|
|
|
|
|
|
|
|
|
assert(w->batch != nullptr); |
|
|
|
|
bool linked_as_leader = LinkOne(w, &newest_writer_); |
|
|
|
@ -294,7 +294,7 @@ void WriteThread::JoinBatchGroup(Writer* w) { |
|
|
|
|
*/ |
|
|
|
|
AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | |
|
|
|
|
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, |
|
|
|
|
&ctx); |
|
|
|
|
&jbg_ctx); |
|
|
|
|
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -473,9 +473,9 @@ void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static WriteThread::AdaptationContext cpmtw_ctx("CompleteParallelMemTableWriter"); |
|
|
|
|
// This method is called by both the leader and parallel followers
|
|
|
|
|
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) { |
|
|
|
|
static AdaptationContext ctx("CompleteParallelMemTableWriter"); |
|
|
|
|
|
|
|
|
|
auto* write_group = w->write_group; |
|
|
|
|
if (!w->status.ok()) { |
|
|
|
@ -485,7 +485,7 @@ bool WriteThread::CompleteParallelMemTableWriter(Writer* w) { |
|
|
|
|
|
|
|
|
|
if (write_group->running-- > 1) { |
|
|
|
|
// we're not the last one
|
|
|
|
|
AwaitState(w, STATE_COMPLETED, &ctx); |
|
|
|
|
AwaitState(w, STATE_COMPLETED, &cpmtw_ctx); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
// else we're the last parallel worker and should perform exit duties.
|
|
|
|
@ -504,9 +504,9 @@ void WriteThread::ExitAsBatchGroupFollower(Writer* w) { |
|
|
|
|
SetState(write_group->leader, STATE_COMPLETED); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader"); |
|
|
|
|
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, |
|
|
|
|
Status status) { |
|
|
|
|
static AdaptationContext ctx("ExitAsBatchGroupLeader"); |
|
|
|
|
Writer* leader = write_group.leader; |
|
|
|
|
Writer* last_writer = write_group.last_writer; |
|
|
|
|
assert(leader->link_older == nullptr); |
|
|
|
@ -544,7 +544,7 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, |
|
|
|
|
} |
|
|
|
|
AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER | |
|
|
|
|
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, |
|
|
|
|
&ctx); |
|
|
|
|
&eabgl_ctx); |
|
|
|
|
} else { |
|
|
|
|
Writer* head = newest_writer_.load(std::memory_order_acquire); |
|
|
|
|
if (head != last_writer || |
|
|
|
@ -591,15 +591,15 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static WriteThread::AdaptationContext eu_ctx("EnterUnbatched"); |
|
|
|
|
void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { |
|
|
|
|
static AdaptationContext ctx("EnterUnbatched"); |
|
|
|
|
assert(w != nullptr && w->batch == nullptr); |
|
|
|
|
mu->Unlock(); |
|
|
|
|
bool linked_as_leader = LinkOne(w, &newest_writer_); |
|
|
|
|
if (!linked_as_leader) { |
|
|
|
|
TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait"); |
|
|
|
|
// Last leader will not pick us as a follower since our batch is nullptr
|
|
|
|
|
AwaitState(w, STATE_GROUP_LEADER, &ctx); |
|
|
|
|
AwaitState(w, STATE_GROUP_LEADER, &eu_ctx); |
|
|
|
|
} |
|
|
|
|
if (enable_pipelined_write_) { |
|
|
|
|
WaitForMemTableWriters(); |
|
|
|
@ -619,15 +619,15 @@ void WriteThread::ExitUnbatched(Writer* w) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static WriteThread::AdaptationContext wfmw_ctx("WaitForMemTableWriters"); |
|
|
|
|
void WriteThread::WaitForMemTableWriters() { |
|
|
|
|
static AdaptationContext ctx("WaitForMemTableWriters"); |
|
|
|
|
assert(enable_pipelined_write_); |
|
|
|
|
if (newest_memtable_writer_.load() == nullptr) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
Writer w; |
|
|
|
|
if (!LinkOne(&w, &newest_memtable_writer_)) { |
|
|
|
|
AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &ctx); |
|
|
|
|
AwaitState(&w, STATE_MEMTABLE_WRITER_LEADER, &wfmw_ctx); |
|
|
|
|
} |
|
|
|
|
newest_memtable_writer_.store(nullptr); |
|
|
|
|
} |
|
|
|
|