@@ -146,17 +146,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     if (write_thread_.CompleteParallelMemTableWriter(&w)) {
       // we're responsible for exit batch group
-      for (auto* writer : *(w.write_group)) {
-        if (!writer->CallbackFailed() && writer->pre_release_callback) {
-          assert(writer->sequence != kMaxSequenceNumber);
-          Status ws = writer->pre_release_callback->Callback(writer->sequence,
-                                                             disable_memtable);
-          if (!ws.ok()) {
-            status = ws;
-            break;
-          }
-        }
-      }
       // TODO(myabandeh): propagate status to write_group
       auto last_sequence = w.write_group->last_sequence;
       versions_->SetLastSequence(last_sequence);
@@ -309,6 +298,35 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   const SequenceNumber current_sequence = last_sequence + 1;
   last_sequence += seq_inc;
+  // PreReleaseCallback is called after WAL write and before memtable write
+  if (status.ok()) {
+    SequenceNumber next_sequence = current_sequence;
+    // Note: the logic for advancing seq here must be consistent with the
+    // logic in WriteBatchInternal::InsertInto(write_group...) as well as
+    // with WriteBatchInternal::InsertInto(write_batch...) that is called on
+    // the merged batch during recovery from the WAL.
+    for (auto* writer : write_group) {
+      if (writer->CallbackFailed()) {
+        continue;
+      }
+      writer->sequence = next_sequence;
+      if (writer->pre_release_callback) {
+        Status ws = writer->pre_release_callback->Callback(writer->sequence,
+                                                           disable_memtable);
+        if (!ws.ok()) {
+          status = ws;
+          break;
+        }
+      }
+      if (seq_per_batch_) {
+        assert(writer->batch_cnt);
+        next_sequence += writer->batch_cnt;
+      } else if (writer->ShouldWriteToMemtable()) {
+        next_sequence += WriteBatchInternal::Count(writer->batch);
+      }
+    }
+  }
   if (status.ok()) {
     PERF_TIMER_GUARD(write_memtable_time);
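The block added above is where each writer now gets its sequence assigned and its PreReleaseCallback fired: after the WAL write and before any memtable insert. As a rough illustration of what can sit behind that call, here is a minimal sketch of a PreReleaseCallback implementation; the class and member names are hypothetical, and it assumes the two-argument Callback(SequenceNumber, bool) signature that the calls in this diff use (declared in db/pre_release_callback.h):

```cpp
// Hypothetical example only -- not part of this change. A callback like this
// would be handed to DBImpl::WriteImpl through its pre_release_callback
// argument, which is how the transaction layers use the hook.
#include <atomic>

#include "db/pre_release_callback.h"  // internal RocksDB header

namespace rocksdb {

// Publishes the sequence number assigned to a writer so that other threads
// (e.g. a transaction layer) can observe it before the memtable insert runs.
class PublishSeqCallback : public PreReleaseCallback {
 public:
  explicit PublishSeqCallback(std::atomic<SequenceNumber>* last_published)
      : last_published_(last_published) {}

  Status Callback(SequenceNumber seq, bool is_mem_disabled) override {
    // seq is writer->sequence from the loop above; is_mem_disabled mirrors
    // the disable_memtable flag (true for WAL-only writes).
    (void)is_mem_disabled;
    last_published_->store(seq, std::memory_order_release);
    return Status::OK();
  }

 private:
  std::atomic<SequenceNumber>* last_published_;
};

}  // namespace rocksdb
```

Because the callback now runs before the memtable write, a non-OK status returned here is captured into `status` by the `break` in the loop above, and the subsequent `if (status.ok())` block skips the memtable insert.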
@@ -320,23 +338,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
           0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
           batch_per_txn_);
     } else {
-      SequenceNumber next_sequence = current_sequence;
-      // Note: the logic for advancing seq here must be consistent with the
-      // logic in WriteBatchInternal::InsertInto(write_group...) as well as
-      // with WriteBatchInternal::InsertInto(write_batch...) that is called on
-      // the merged batch during recovery from the WAL.
-      for (auto* writer : write_group) {
-        if (writer->CallbackFailed()) {
-          continue;
-        }
-        writer->sequence = next_sequence;
-        if (seq_per_batch_) {
-          assert(writer->batch_cnt);
-          next_sequence += writer->batch_cnt;
-        } else if (writer->ShouldWriteToMemtable()) {
-          next_sequence += WriteBatchInternal::Count(writer->batch);
-        }
-      }
       write_group.last_sequence = last_sequence;
       write_thread_.LaunchParallelMemTableWriters(&write_group);
       in_parallel_group = true;
@@ -388,17 +389,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   }
   if (should_exit_batch_group) {
     if (status.ok()) {
-      for (auto* writer : write_group) {
-        if (!writer->CallbackFailed() && writer->pre_release_callback) {
-          assert(writer->sequence != kMaxSequenceNumber);
-          Status ws = writer->pre_release_callback->Callback(writer->sequence,
-                                                             disable_memtable);
-          if (!ws.ok()) {
-            status = ws;
-            break;
-          }
-        }
-      }
+      // Note: if we are to resume after non-OK statuses we need to revisit
+      // how we react to non-OK statuses here.
       versions_->SetLastSequence(last_sequence);
     }
     MemTableInsertStatusCheck(w.status);
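Finally, the sequence-advance rule in the relocated loop is easy to misread, so here is a small self-contained sketch (toy types and values, not RocksDB code) of how next_sequence moves across a write group: a writer whose callback failed consumes no sequence numbers, and each remaining writer consumes either its sub-batch count (when seq_per_batch_ is set, e.g. in the write-prepared transaction paths) or its key count from WriteBatchInternal::Count:

```cpp
// Hypothetical stand-in for the seq-advance bookkeeping in the loop above.
#include <cstdint>
#include <iostream>
#include <vector>

using SequenceNumber = uint64_t;  // stand-in for rocksdb::SequenceNumber

struct FakeWriter {
  bool callback_failed;      // skipped entirely, consumes no sequence numbers
  uint64_t batch_cnt;        // sub-batch count, used when seq_per_batch is set
  uint64_t key_count;        // stand-in for WriteBatchInternal::Count(batch)
  SequenceNumber sequence;   // filled in by the loop below
};

int main() {
  const bool seq_per_batch = true;
  std::vector<FakeWriter> group = {
      {false, 2, 5, 0}, {true, 1, 3, 0}, {false, 1, 4, 0}};

  SequenceNumber next_sequence = 101;  // current_sequence for this group
  for (auto& writer : group) {
    if (writer.callback_failed) continue;
    writer.sequence = next_sequence;  // first seq this writer will consume
    next_sequence += seq_per_batch ? writer.batch_cnt : writer.key_count;
  }
  for (const auto& writer : group) {
    // Prints 101, then 0 (the skipped writer keeps its default), then 103.
    std::cout << "writer seq = " << writer.sequence << "\n";
  }
  return 0;
}
```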