@@ -656,7 +656,6 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
    // contains one batch, that batch should be written to the WAL,
    // and the batch is not wanting to be truncated
    merged_batch = leader->batch;
    leader->log_used = logfile_number_;
    *write_with_wal = 1;
  } else {
    // WAL needs all of the batches flattened into a single batch.
@@ -669,7 +668,6 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
                                /*WAL_only*/ true);
        (*write_with_wal)++;
      }
      writer->log_used = logfile_number_;
    }
  }
  return merged_batch;
@@ -700,6 +698,13 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
  size_t write_with_wal = 0;
  WriteBatch* merged_batch =
      MergeBatch(write_group, &tmp_batch_, &write_with_wal);
  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }
  WriteBatchInternal::SetSequence(merged_batch, sequence);
@@ -760,6 +765,13 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
  // We need to lock log_write_mutex_ since logs_ and alive_log_files might be
  // pushed back concurrently
  log_write_mutex_.Lock();
  if (merged_batch == write_group.leader->batch) {
    write_group.leader->log_used = logfile_number_;
  } else if (write_with_wal > 1) {
    for (auto writer : write_group) {
      writer->log_used = logfile_number_;
    }
  }
  *last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count);
  auto sequence = *last_sequence + 1;
  WriteBatchInternal::SetSequence(merged_batch, sequence);
@@ -1008,6 +1020,11 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* cfd,
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  mutex_.AssertHeld();
  WriteThread::Writer nonmem_w;
  if (concurrent_prepare_) {
    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
  }
  unique_ptr<WritableFile> lfile;
  log::Writer* new_log = nullptr;
  MemTable* new_mem = nullptr;
@@ -1021,7 +1038,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  // Attempt to switch to a new memtable and trigger flush of old.
  // Do this without holding the dbmutex lock.
  assert(versions_->prev_log_number() == 0);
  //log_write_mutex_.Lock();
  bool creating_new_log = !log_empty_;
  //log_write_mutex_.Unlock();
  uint64_t recycle_log_number = 0;
  if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
      !log_recycle_files.empty()) {
@@ -1106,14 +1125,17 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
    assert(creating_new_log);
    assert(!new_mem);
    assert(!new_log);
    if (concurrent_prepare_) {
      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
    }
    return s;
  }
  if (creating_new_log) {
    log_write_mutex_.Lock();
    logfile_number_ = new_log_number;
    assert(new_log != nullptr);
    log_empty_ = true;
    log_dir_synced_ = false;
    log_write_mutex_.Lock();
    if (!logs_.empty()) {
      // Alway flush the buffer of the last log before switching to a new one
      log::Writer* cur_log_writer = logs_.back().writer;
@@ -1143,6 +1165,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
  cfd->SetMemtable(new_mem);
  context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork(
      cfd, new_superversion, mutable_cf_options));
  if (concurrent_prepare_) {
    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
  }
  return s;
}