Fix writes getting stuck when pipelined write is enabled (#4143)

Summary:
Fix an issue where, when pipelined write is enabled, writers can get stuck indefinitely and never finish their writes. The issue can be shown with the following example. Assume there are 4 writers W1, W2, W3, W4 (W1 is the first, W4 is the last).

T1. All writers are pending in the WAL writer queue:
WAL writer queue: W1, W2, W3, W4
memtable writer queue: empty

T2. W1 finishes its WAL write and moves to the memtable writer queue:
WAL writer queue: W2, W3, W4,
memtable writer queue: W1

T3. W2 and W3 finish their WAL write as a batch group. W2 enters ExitAsBatchGroupLeader and moves the group to the memtable writer queue, but has not yet woken up the next leader.
WAL writer queue: W4
memtable writer queue: W1, W2, W3

T4. W1, W2, W3 finish the memtable write as a batch group. Note that W2 is still inside the previous ExitAsBatchGroupLeader call, even though W1 has done the memtable write on W2's behalf.
WAL writer queue: W4
memtable writer queue: empty

T5. The thread corresponding to W3 creates another writer W3' at the same address as W3.
WAL writer queue: W4, W3'
memtable writer queue: empty

T6. W2 continues in ExitAsBatchGroupLeader. Because the address of W3' is the same as that of W3, the last writer in its group, W2 thinks there are no pending writers, so it resets newest_writer_ to null, emptying the queue. W4 and W3' are dropped from the queue and will never be woken up.

The issue exists since pipelined write was introduced in 5.5.0.
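
For illustration only, the following is a minimal standalone sketch of the underlying hazard (FakeWriter, newest_writer and ExitGroup are hypothetical names, not RocksDB code): a compare-and-swap keyed on a pointer value cannot distinguish a recycled address (W3') from the original writer (W3), so it can wrongly conclude that no other writer is pending.

// Standalone sketch of the stale-address (ABA) hazard described above.
// Hypothetical names; not RocksDB code. Compiles as C++11.
#include <atomic>

struct FakeWriter {
  FakeWriter* link_older = nullptr;  // next-older writer in the queue
};

std::atomic<FakeWriter*> newest_writer{nullptr};  // tail of the writer queue

// Simplified leader exit: if the tail still equals the last writer of our
// group, assume no new writer has joined and empty the queue.
void ExitGroup(FakeWriter* last_writer_of_group) {
  FakeWriter* expected = last_writer_of_group;
  if (newest_writer.compare_exchange_strong(expected, nullptr)) {
    // This CAS also succeeds if a *different* writer (W3' in the timeline
    // above) happens to reuse the address of last_writer_of_group. Any
    // writers linked behind it (W4) are dropped and never woken up.
  }
}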

Closes #3704
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4143

Differential Revision: D8871599

Pulled By: yiwu-arbug

fbshipit-source-id: 3502674e51066a954a0660257e24ac588f815e2a
Branch: main
Author: Yi Wu (committed by Facebook Github Bot)
Parent: ddc07b40fc
Commit: d538ebdff0

Changed files (lines changed):
  HISTORY.md          1
  db/write_thread.cc  53
  db/write_thread.h   4
HISTORY.md
@@ -20,6 +20,7 @@
 * Fix corruption in non-iterator reads when mmap is used for file reads
 * Fix bug with prefix search in partition filters where a shared prefix would be ignored from the later partitions. The bug could report an eixstent key as missing. The bug could be triggered if prefix_extractor is set and partition filters is enabled.
 * Change default value of `bytes_max_delete_chunk` to 0 in NewSstFileManager() as it doesn't work well with checkpoints.
+* Fix write can stuck indefinitely if enable_pipelined_write=true. The issue exists since pipelined write was introduced in 5.5.0.

 ## 5.14.0 (5/16/2018)
 ### Public API Change

db/write_thread.cc
@@ -263,6 +263,17 @@ void WriteThread::CreateMissingNewerLinks(Writer* head) {
   }
 }
 
+WriteThread::Writer* WriteThread::FindNextLeader(Writer* from,
+                                                 Writer* boundary) {
+  assert(from != nullptr && from != boundary);
+  Writer* current = from;
+  while (current->link_older != boundary) {
+    current = current->link_older;
+    assert(current != nullptr);
+  }
+  return current;
+}
+
 void WriteThread::CompleteLeader(WriteGroup& write_group) {
   assert(write_group.size > 0);
   Writer* leader = write_group.leader;
@@ -558,21 +569,49 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
     if (!leader->ShouldWriteToMemtable()) {
       CompleteLeader(write_group);
     }
+
+    Writer* next_leader = nullptr;
+
+    // Look for next leader before we call LinkGroup. If there isn't
+    // pending writers, place a dummy writer at the tail of the queue
+    // so we know the boundary of the current write group.
+    Writer dummy;
+    Writer* expected = last_writer;
+    bool has_dummy = newest_writer_.compare_exchange_strong(expected, &dummy);
+    if (!has_dummy) {
+      // We find at least one pending writer when we insert dummy. We search
+      // for next leader from there.
+      next_leader = FindNextLeader(expected, last_writer);
+      assert(next_leader != nullptr && next_leader != last_writer);
+    }
+
     // Link the ramaining of the group to memtable writer list.
+    //
+    // We have to link our group to memtable writer queue before wake up the
+    // next leader or set newest_writer_ to null, otherwise the next leader
+    // can run ahead of us and link to memtable writer queue before we do.
     if (write_group.size > 0) {
       if (LinkGroup(write_group, &newest_memtable_writer_)) {
         // The leader can now be different from current writer.
         SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
       }
     }
-    // Reset newest_writer_ and wake up the next leader.
-    Writer* newest_writer = last_writer;
-    if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
-      Writer* next_leader = newest_writer;
-      while (next_leader->link_older != last_writer) {
-        next_leader = next_leader->link_older;
-        assert(next_leader != nullptr);
-      }
-      next_leader->link_older = nullptr;
-      SetState(next_leader, STATE_GROUP_LEADER);
-    }
+
+    // If we have inserted dummy in the queue, remove it now and check if there
+    // are pending writer join the queue since we insert the dummy. If so,
+    // look for next leader again.
+    if (has_dummy) {
+      assert(next_leader == nullptr);
+      expected = &dummy;
+      bool has_pending_writer =
+          !newest_writer_.compare_exchange_strong(expected, nullptr);
+      if (has_pending_writer) {
+        next_leader = FindNextLeader(expected, &dummy);
+        assert(next_leader != nullptr && next_leader != &dummy);
+      }
+    }
+
+    if (next_leader != nullptr) {
+      next_leader->link_older = nullptr;
+      SetState(next_leader, STATE_GROUP_LEADER);
+    }
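
As a rough sketch of the pattern behind the fix (again hypothetical names, not the actual RocksDB types): the leader publishes a stack-allocated dummy node as the queue tail before handing its group over, so the later compare-and-swap races against the dummy's address, which no newly constructed writer can reuse while the dummy is alive; a successful CAS therefore really does mean that no writer joined in the meantime.

// Sketch of the dummy-boundary technique; hypothetical names, not the
// actual RocksDB types. Compiles as C++11.
#include <atomic>

struct FakeWriter {
  FakeWriter* link_older = nullptr;
};

std::atomic<FakeWriter*> newest_writer{nullptr};  // tail of the writer queue

void ExitGroup(FakeWriter* last_writer_of_group) {
  // Step 1: try to install a stack-allocated dummy as the new tail. While
  // `dummy` lives on this frame, no freshly constructed writer can reuse its
  // address, so a later CAS against &dummy cannot be fooled the way a CAS
  // against a recycled writer address can.
  FakeWriter dummy;
  FakeWriter* expected = last_writer_of_group;
  bool has_dummy = newest_writer.compare_exchange_strong(expected, &dummy);
  if (!has_dummy) {
    // Writers already joined behind our group; `expected` is the real tail,
    // and the next leader is found by walking link_older back towards
    // last_writer_of_group (FindNextLeader in the real patch).
  }

  // Step 2: hand the current group over here (LinkGroup to the memtable
  // writer queue in the real patch), before waking the next leader or
  // clearing newest_writer.

  // Step 3: if we installed the dummy, remove it. A failed CAS means writers
  // joined after the dummy; the next leader is then found by walking
  // link_older back towards &dummy.
  if (has_dummy) {
    expected = &dummy;
    if (!newest_writer.compare_exchange_strong(expected, nullptr)) {
      // `expected` now points at the newest pending writer behind the dummy.
    }
  }
}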

db/write_thread.h
@@ -392,6 +392,10 @@ class WriteThread {
   // concurrently with itself.
   void CreateMissingNewerLinks(Writer* head);
 
+  // Starting from a pending writer, follow link_older to search for next
+  // leader, until we hit boundary.
+  Writer* FindNextLeader(Writer* pending_writer, Writer* boundary);
+
   // Set the leader in write_group to completed state and remove it from the
   // write group.
   void CompleteLeader(WriteGroup& write_group);
