@ -7,55 +7,64 @@
namespace rocksdb {
namespace rocksdb {
void WriteThread : : EnterWriteThread ( WriteThread : : Writer * w ) {
void WriteThread : : Await ( Writer * w ) {
// the following code block pushes the current writer "w" into the writer
std : : unique_lock < std : : mutex > guard ( w - > JoinMutex ( ) ) ;
// queue "writers_" and wait until one of the following conditions met:
w - > JoinCV ( ) . wait ( guard , [ w ] { return w - > joined ; } ) ;
// 1. the job of "w" has been done by some other writers.
}
// 2. "w" becomes the first writer in "writers_"
// 3. "w" timed-out.
void WriteThread : : MarkJoined ( Writer * w ) {
writers_ . push_back ( w ) ;
std : : lock_guard < std : : mutex > guard ( w - > JoinMutex ( ) ) ;
assert ( ! w - > joined ) ;
w - > joined = true ;
w - > JoinCV ( ) . notify_one ( ) ;
}
while ( ! w - > done & & w ! = writers_ . front ( ) ) {
void WriteThread : : LinkOne ( Writer * w , bool * wait_needed ) {
w - > cv . Wait ( ) ;
assert ( ! w - > joined & & ! w - > done ) ;
Writer * writers = newest_writer_ . load ( std : : memory_order_relaxed ) ;
while ( true ) {
w - > link_older = writers ;
if ( writers ! = nullptr ) {
w - > CreateMutex ( ) ;
}
if ( newest_writer_ . compare_exchange_strong ( writers , w ) ) {
// Success.
* wait_needed = ( writers ! = nullptr ) ;
return ;
}
}
}
}
}
void WriteThread : : ExitWriteThread ( WriteThread : : Writer * w ,
void WriteThread : : CreateMissingNewerLinks ( Writer * head ) {
WriteThread : : Writer * last_writer ,
while ( true ) {
Status status ) {
Writer * next = head - > link_older ;
// Pop out the current writer and all writers being pushed before the
if ( next = = nullptr | | next - > link_newer ! = nullptr ) {
// current writer from the writer queue.
assert ( next = = nullptr | | next - > link_newer = = head ) ;
while ( ! writers_ . empty ( ) ) {
break ;
Writer * ready = writers_ . front ( ) ;
}
writers_ . pop_front ( ) ;
next - > link_newer = head ;
if ( ready ! = w ) {
head = next ;
ready - > status = status ;
ready - > done = true ;
ready - > cv . Signal ( ) ;
}
}
if ( ready = = last_writer ) break ;
}
}
// Notify new head of write queue
void WriteThread : : JoinBatchGroup ( Writer * w ) {
if ( ! writers_ . empty ( ) ) {
assert ( w - > batch ! = nullptr ) ;
writers_ . front ( ) - > cv . Signal ( ) ;
bool wait_needed ;
LinkOne ( w , & wait_needed ) ;
if ( wait_needed ) {
Await ( w ) ;
}
}
}
}
// This function will be called only when the first writer succeeds.
size_t WriteThread : : EnterAsBatchGroupLeader (
// All writers in the to-be-built batch group will be processed.
Writer * leader , WriteThread : : Writer * * last_writer ,
//
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-nullptr batch
size_t WriteThread : : BuildBatchGroup (
WriteThread : : Writer * * last_writer ,
autovector < WriteBatch * > * write_batch_group ) {
autovector < WriteBatch * > * write_batch_group ) {
assert ( ! writers_ . empty ( ) ) ;
assert ( leader - > link_older = = nullptr ) ;
Writer * first = writers_ . front ( ) ;
assert ( leader - > batch ! = nullptr ) ;
assert ( first - > batch ! = nullptr ) ;
size_t size = WriteBatchInternal : : ByteSize ( first - > batch ) ;
size_t size = WriteBatchInternal : : ByteSize ( leader - > batch ) ;
write_batch_group - > push_back ( first - > batch ) ;
write_batch_group - > push_back ( leader - > batch ) ;
// Allow the group to grow up to a maximum size, but if the
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// original write is small, limit the growth so we do not slow
@ -65,24 +74,35 @@ size_t WriteThread::BuildBatchGroup(
max_size = size + ( 128 < < 10 ) ;
max_size = size + ( 128 < < 10 ) ;
}
}
* last_writer = first ;
* last_writer = leader ;
if ( first - > has_callback ) {
if ( leader - > has_callback ) {
// TODO(agiardullo:) Batching not currently supported as this write may
// TODO(agiardullo:) Batching not currently supported as this write may
// fail if the callback function decides to abort this write.
// fail if the callback function decides to abort this write.
return size ;
return size ;
}
}
std : : deque < Writer * > : : iterator iter = writers_ . begin ( ) ;
Writer * newest_writer = newest_writer_ . load ( std : : memory_order_acquire ) ;
+ + iter ; // Advance past "first"
for ( ; iter ! = writers_ . end ( ) ; + + iter ) {
// This is safe regardless of any db mutex status of the caller. Previous
Writer * w = * iter ;
// calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
if ( w - > sync & & ! first - > sync ) {
// (they emptied the list and then we added ourself as leader) or had to
// explicitly wake up us (the list was non-empty when we added ourself,
// so we have already received our MarkJoined).
CreateMissingNewerLinks ( newest_writer ) ;
// Tricky. Iteration start (leader) is exclusive and finish
// (newest_writer) is inclusive. Iteration goes from old to new.
Writer * w = leader ;
while ( w ! = newest_writer ) {
w = w - > link_newer ;
if ( w - > sync & & ! leader - > sync ) {
// Do not include a sync write into a batch handled by a non-sync write.
// Do not include a sync write into a batch handled by a non-sync write.
break ;
break ;
}
}
if ( ! w - > disableWAL & & first - > disableWAL ) {
if ( ! w - > disableWAL & & leader - > disableWAL ) {
// Do not include a write that needs WAL into a batch that has
// Do not include a write that needs WAL into a batch that has
// WAL disabled.
// WAL disabled.
break ;
break ;
@ -113,4 +133,68 @@ size_t WriteThread::BuildBatchGroup(
return size ;
return size ;
}
}
void WriteThread : : ExitAsBatchGroupLeader ( Writer * leader , Writer * last_writer ,
Status status ) {
assert ( leader - > link_older = = nullptr ) ;
Writer * head = newest_writer_ . load ( std : : memory_order_acquire ) ;
if ( head ! = last_writer | |
! newest_writer_ . compare_exchange_strong ( head , nullptr ) ) {
// Either w wasn't the head during the load(), or it was the head
// during the load() but somebody else pushed onto the list before
// we did the compare_exchange_strong (causing it to fail). In the
// latter case compare_exchange_strong has the effect of re-reading
// its first param (head). No need to retry a failing CAS, because
// only a departing leader (which we are at the moment) can remove
// nodes from the list.
assert ( head ! = last_writer ) ;
// After walking link_older starting from head (if not already done)
// we will be able to traverse w->link_newer below. This function
// can only be called from an active leader, only a leader can
// clear newest_writer_, we didn't, and only a clear newest_writer_
// could cause the next leader to start their work without a call
// to MarkJoined, so we can definitely conclude that no other leader
// work is going on here (with or without db mutex).
CreateMissingNewerLinks ( head ) ;
assert ( last_writer - > link_newer - > link_older = = last_writer ) ;
last_writer - > link_newer - > link_older = nullptr ;
// Next leader didn't self-identify, because newest_writer_ wasn't
// nullptr when they enqueued (we were definitely enqueued before them
// and are still in the list). That means leader handoff occurs when
// we call MarkJoined
MarkJoined ( last_writer - > link_newer ) ;
}
// else nobody else was waiting, although there might already be a new
// leader now
while ( last_writer ! = leader ) {
last_writer - > status = status ;
last_writer - > done = true ;
// We must read link_older before calling MarkJoined, because as
// soon as it is marked the other thread's AwaitJoined may return
// and deallocate the Writer.
auto next = last_writer - > link_older ;
MarkJoined ( last_writer ) ;
last_writer = next ;
}
}
void WriteThread : : EnterUnbatched ( Writer * w , InstrumentedMutex * mu ) {
assert ( w - > batch = = nullptr ) ;
bool wait_needed ;
LinkOne ( w , & wait_needed ) ;
if ( wait_needed ) {
mu - > Unlock ( ) ;
Await ( w ) ;
mu - > Lock ( ) ;
}
}
void WriteThread : : ExitUnbatched ( Writer * w ) {
Status dummy_status ;
ExitAsBatchGroupLeader ( w , w , dummy_status ) ;
}
} // namespace rocksdb
} // namespace rocksdb