@ -92,10 +92,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
}
if ( write_thread_ . CompleteParallelWorker ( & w ) ) {
if ( write_thread_ . CompleteParallelWorker ( & w ) ) {
// we're responsible for early e xit
// we're responsible for exit batch group
auto last_sequence = w . parallel_group - > last_sequence ;
auto last_sequence = w . parallel_group - > last_sequence ;
versions_ - > SetLastSequence ( last_sequence ) ;
versions_ - > SetLastSequence ( last_sequence ) ;
write_thread_ . EarlyExitParallelGroup ( & w ) ;
UpdateBackgroundError ( w . status ) ;
write_thread_ . ExitAsBatchGroupFollower ( & w ) ;
}
}
assert ( w . state = = WriteThread : : STATE_COMPLETED ) ;
assert ( w . state = = WriteThread : : STATE_COMPLETED ) ;
// STATE_COMPLETED conditional below handles exit
// STATE_COMPLETED conditional below handles exit
@ -119,7 +120,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteContext write_context ;
WriteContext write_context ;
WriteThread : : Writer * last_writer = & w ; // Dummy intial value
WriteThread : : Writer * last_writer = & w ; // Dummy intial value
autovector < WriteThread : : Writer * > write_group ;
autovector < WriteThread : : Writer * > write_group ;
WriteThread : : ParallelGroup pg ;
bool logs_getting_synced = false ;
bool logs_getting_synced = false ;
bool in_parallel_group = false ;
uint64_t last_sequence = versions_ - > LastSequence ( ) ;
mutex_ . Lock ( ) ;
mutex_ . Lock ( ) ;
@ -136,7 +140,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// and protects against concurrent loggers and concurrent writes
// and protects against concurrent loggers and concurrent writes
// into memtables
// into memtables
bool exit_completed_early = false ;
last_batch_group_size_ =
last_batch_group_size_ =
write_thread_ . EnterAsBatchGroupLeader ( & w , & last_writer , & write_group ) ;
write_thread_ . EnterAsBatchGroupLeader ( & w , & last_writer , & write_group ) ;
@ -168,15 +171,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
}
}
}
uint64_t last_sequence = versions_ - > LastSequence ( ) ;
const SequenceNumber current_sequence = last_sequence + 1 ;
const SequenceNumber current_sequence = last_sequence + 1 ;
last_sequence + = total_count ;
last_sequence + = total_count ;
// Update stats while we are an exclusive group leader, so we know
// Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats.
// that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully
// We're optimistic, updating the stats before we successfully
// commit. That lets us release our leader status early in
// commit. That lets us release our leader status early.
// some cases.
auto stats = default_cf_internal_stats_ ;
auto stats = default_cf_internal_stats_ ;
stats - > AddDBStats ( InternalStats : : NUMBER_KEYS_WRITTEN , total_count ) ;
stats - > AddDBStats ( InternalStats : : NUMBER_KEYS_WRITTEN , total_count ) ;
RecordTick ( stats_ , NUMBER_KEYS_WRITTEN , total_count ) ;
RecordTick ( stats_ , NUMBER_KEYS_WRITTEN , total_count ) ;
@ -211,26 +212,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_GUARD ( write_memtable_time ) ;
PERF_TIMER_GUARD ( write_memtable_time ) ;
if ( ! parallel ) {
if ( ! parallel ) {
status = WriteBatchInternal : : InsertInto (
w . status = WriteBatchInternal : : InsertInto (
write_group , current_sequence , column_family_memtables_ . get ( ) ,
write_group , current_sequence , column_family_memtables_ . get ( ) ,
& flush_scheduler_ , write_options . ignore_missing_column_families ,
& flush_scheduler_ , write_options . ignore_missing_column_families ,
0 /*recovery_log_number*/ , this ) ;
0 /*recovery_log_number*/ , this ) ;
if ( status . ok ( ) ) {
// There were no write failures. Set leader's status
// in case the write callback returned a non-ok status.
status = w . FinalStatus ( ) ;
}
} else {
} else {
WriteThread : : ParallelGroup pg ;
pg . leader = & w ;
pg . leader = & w ;
pg . last_writer = last_writer ;
pg . last_writer = last_writer ;
pg . last_sequence = last_sequence ;
pg . last_sequence = last_sequence ;
pg . early_exit_allowed = ! need_log_sync ;
pg . running . store ( static_cast < uint32_t > ( write_group . size ( ) ) ,
pg . running . store ( static_cast < uint32_t > ( write_group . size ( ) ) ,
std : : memory_order_relaxed ) ;
std : : memory_order_relaxed ) ;
write_thread_ . LaunchParallelFollowers ( & pg , current_sequence ) ;
write_thread_ . LaunchParallelFollowers ( & pg , current_sequence ) ;
in_parallel_group = true ;
// Each parallel follower is doing each own writes. The leader should
// Each parallel follower is doing each own writes. The leader should
// also do its own.
// also do its own.
@ -244,40 +237,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
write_options . ignore_missing_column_families , 0 /*log_number*/ ,
write_options . ignore_missing_column_families , 0 /*log_number*/ ,
this , true /*concurrent_memtable_writes*/ ) ;
this , true /*concurrent_memtable_writes*/ ) ;
}
}
// CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did
exit_completed_early = ! write_thread_ . CompleteParallelWorker ( & w ) ;
status = w . FinalStatus ( ) ;
}
if ( ! exit_completed_early & & w . status . ok ( ) ) {
versions_ - > SetLastSequence ( last_sequence ) ;
if ( ! need_log_sync ) {
write_thread_ . ExitAsBatchGroupLeader ( & w , last_writer , w . status ) ;
exit_completed_early = true ;
}
}
// A non-OK status here indicates that the state implied by the
// WAL has diverged from the in-memory state. This could be
// because of a corrupt write_batch (very bad), or because the
// client specified an invalid column family and didn't specify
// ignore_missing_column_families.
//
// Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes.
if ( ! status . ok ( ) & & ! w . CallbackFailed ( ) ) {
mutex_ . Lock ( ) ;
if ( bg_error_ . ok ( ) ) {
bg_error_ = status ; // stop compaction & fail any further writes
}
mutex_ . Unlock ( ) ;
}
}
}
}
}
}
PERF_TIMER_START ( write_pre_and_post_process_time ) ;
PERF_TIMER_START ( write_pre_and_post_process_time ) ;
//
// Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes.
if ( immutable_db_options_ . paranoid_checks & & ! status . ok ( ) & &
if ( immutable_db_options_ . paranoid_checks & & ! status . ok ( ) & &
! w . CallbackFailed ( ) & & ! status . IsBusy ( ) & & ! status . IsIncomplete ( ) ) {
! w . CallbackFailed ( ) & & ! status . IsBusy ( ) & & ! status . IsIncomplete ( ) ) {
mutex_ . Lock ( ) ;
mutex_ . Lock ( ) ;
@ -293,13 +260,39 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
mutex_ . Unlock ( ) ;
mutex_ . Unlock ( ) ;
}
}
if ( ! exit_completed_early ) {
bool should_exit_batch_group = true ;
if ( in_parallel_group ) {
// CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did
should_exit_batch_group = write_thread_ . CompleteParallelWorker ( & w ) ;
}
if ( should_exit_batch_group ) {
versions_ - > SetLastSequence ( last_sequence ) ;
UpdateBackgroundError ( w . status ) ;
write_thread_ . ExitAsBatchGroupLeader ( & w , last_writer , w . status ) ;
write_thread_ . ExitAsBatchGroupLeader ( & w , last_writer , w . status ) ;
}
}
if ( status . ok ( ) ) {
status = w . FinalStatus ( ) ;
}
return status ;
return status ;
}
}
void DBImpl : : UpdateBackgroundError ( const Status & memtable_insert_status ) {
// A non-OK status here indicates that the state implied by the
// WAL has diverged from the in-memory state. This could be
// because of a corrupt write_batch (very bad), or because the
// client specified an invalid column family and didn't specify
// ignore_missing_column_families.
if ( ! memtable_insert_status . ok ( ) ) {
mutex_ . Lock ( ) ;
assert ( bg_error_ . ok ( ) ) ;
bg_error_ = memtable_insert_status ;
mutex_ . Unlock ( ) ;
}
}
Status DBImpl : : PreprocessWrite ( const WriteOptions & write_options ,
Status DBImpl : : PreprocessWrite ( const WriteOptions & write_options ,
bool need_log_sync , bool * logs_getting_synced ,
bool need_log_sync , bool * logs_getting_synced ,
WriteContext * write_context ) {
WriteContext * write_context ) {
@ -324,7 +317,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
}
}
if ( UNLIKELY ( status . ok ( ) & & ! bg_error_ . ok ( ) ) ) {
if ( UNLIKELY ( status . ok ( ) & & ! bg_error_ . ok ( ) ) ) {
status = bg_error_ ;
return bg_error_ ;
}
}
if ( UNLIKELY ( status . ok ( ) & & ! flush_scheduler_ . Empty ( ) ) ) {
if ( UNLIKELY ( status . ok ( ) & & ! flush_scheduler_ . Empty ( ) ) ) {