@ -224,6 +224,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
log_dir_synced_ ( false ) ,
log_empty_ ( true ) ,
default_cf_handle_ ( nullptr ) ,
log_sync_cv_ ( & mutex_ ) ,
total_log_size_ ( 0 ) ,
max_total_in_memory_state_ ( 0 ) ,
is_snapshot_supported_ ( true ) ,
@ -367,6 +368,7 @@ DBImpl::~DBImpl() {
for ( auto l : logs_to_free_ ) {
delete l ;
}
logs_ . clear ( ) ;
// versions need to be destroyed before table_cache since it can hold
// references to table_cache.
@ -535,8 +537,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
versions_ - > GetObsoleteFiles ( & job_context - > sst_delete_files ,
job_context - > min_pending_output ) ;
uint64_t min_log_number = versions_ - > MinLogNumber ( ) ;
if ( ! alive_log_files_ . empty ( ) ) {
uint64_t min_log_number = versions_ - > MinLogNumber ( ) ;
// find newly obsoleted log files
while ( alive_log_files_ . begin ( ) - > number < min_log_number ) {
auto & earliest = * alive_log_files_ . begin ( ) ;
@ -547,6 +549,18 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// number < MinLogNumber().
assert ( alive_log_files_ . size ( ) ) ;
}
while ( ! logs_ . empty ( ) & & logs_ . front ( ) . number < min_log_number ) {
auto & log = logs_ . front ( ) ;
if ( log . getting_synced ) {
log_sync_cv_ . Wait ( ) ;
// logs_ could have changed while we were waiting.
continue ;
}
logs_to_free_ . push_back ( log . writer . release ( ) ) ;
logs_ . pop_front ( ) ;
}
// Current log cannot be obsolete.
assert ( ! logs_ . empty ( ) ) ;
}
// We're just cleaning up for DB::Write().
@ -597,6 +611,11 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}
}
Status DBImpl : : SyncLog ( log : : Writer * log ) {
assert ( log ) ;
return log - > file ( ) - > Sync ( db_options_ . use_fsync ) ;
}
namespace {
bool CompareCandidateFile ( const JobContext : : CandidateFileInfo & first ,
const JobContext : : CandidateFileInfo & second ) {
@ -3448,11 +3467,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
uint64_t last_sequence = versions_ - > LastSequence ( ) ;
WriteThread : : Writer * last_writer = & w ;
autovector < WriteBatch * > write_batch_group ;
bool need_wal_sync = ! write_options . disableWAL & & write_options . sync ;
if ( status . ok ( ) ) {
last_batch_group_size_ =
write_thread_ . BuildBatchGroup ( & last_writer , & write_batch_group ) ;
if ( need_wal_sync ) {
while ( logs_ . front ( ) . getting_synced ) {
log_sync_cv_ . Wait ( ) ;
}
for ( auto & log : logs_ ) {
assert ( ! log . getting_synced ) ;
log . getting_synced = true ;
}
}
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
@ -3499,16 +3529,28 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if ( ! write_options . disableWAL ) {
PERF_TIMER_GUARD ( write_wal_time ) ;
Slice log_entry = WriteBatchInternal : : Contents ( updates ) ;
status = log_ - > AddRecord ( log_entry ) ;
status = logs _ . back ( ) . writer - > AddRecord ( log_entry ) ;
total_log_size_ + = log_entry . size ( ) ;
alive_log_files_ . back ( ) . AddSize ( log_entry . size ( ) ) ;
log_empty_ = false ;
log_size = log_entry . size ( ) ;
RecordTick ( stats_ , WAL_FILE_BYTES , log_size ) ;
if ( status . ok ( ) & & write_options . sync ) {
if ( status . ok ( ) & & need_wal_ sync) {
RecordTick ( stats_ , WAL_FILE_SYNCED ) ;
StopWatch sw ( env_ , stats_ , WAL_FILE_SYNC_MICROS ) ;
status = log_ - > file ( ) - > Sync ( db_options_ . use_fsync ) ;
// It's safe to access logs_ with unlocked mutex_ here because:
// - we've set getting_synced=true for all logs,
// so other threads won't pop from logs_ while we're here,
// - only writer thread can push to logs_, and we're in
// writer thread, so no one will push to logs_,
// - as long as other threads don't modify it, it's safe to read
// from std::deque from multiple threads concurrently.
for ( auto & log : logs_ ) {
status = SyncLog ( log . writer . get ( ) ) ;
if ( ! status . ok ( ) ) {
break ;
}
}
if ( status . ok ( ) & & ! log_dir_synced_ ) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
@ -3557,14 +3599,27 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} else {
// Operation failed. Make sure sure mutex is held for cleanup code below.
mutex_ . Lock ( ) ;
}
}
if ( db_options_ . paranoid_checks & & ! status . ok ( ) & &
! status . IsBusy ( ) & & bg_error_ . ok ( ) ) {
if ( db_options_ . paranoid_checks & & ! status . ok ( ) & &
! status . IsBusy ( ) & & bg_error_ . ok ( ) ) {
bg_error_ = status ; // stop compaction & fail any further writes
}
mutex_ . AssertHeld ( ) ;
if ( need_wal_sync ) {
while ( logs_ . size ( ) > 1 ) {
auto & log = logs_ . front ( ) ;
assert ( log . getting_synced ) ;
logs_to_free_ . push_back ( log . writer . release ( ) ) ;
logs_ . pop_front ( ) ;
}
assert ( logs_ . back ( ) . getting_synced ) ;
logs_ . back ( ) . getting_synced = false ;
log_sync_cv_ . SignalAll ( ) ;
}
write_thread_ . ExitWriteThread ( & w , last_writer , status ) ;
mutex_ . Unlock ( ) ;
@ -3675,9 +3730,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
if ( creating_new_log ) {
logfile_number_ = new_log_number ;
assert ( new_log ! = nullptr ) ;
logs_to_free_ . push_back ( log_ . release ( ) ) ;
log_ . reset ( new_log ) ;
log_empty_ = true ;
logs_ . emplace_back ( logfile_number_ , std : : unique_ptr < log : : Writer > ( new_log ) ) ;
alive_log_files_ . push_back ( LogFileNumberSize ( logfile_number_ ) ) ;
for ( auto loop_cfd : * versions_ - > GetColumnFamilySet ( ) ) {
// all this is just optimization to delete logs that
@ -4210,7 +4264,10 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
impl - > logfile_number_ = new_log_number ;
unique_ptr < WritableFileWriter > file_writer (
new WritableFileWriter ( std : : move ( lfile ) , opt_env_options ) ) ;
impl - > log_ . reset ( new log : : Writer ( std : : move ( file_writer ) ) ) ;
impl - > logs_ . emplace_back (
new_log_number ,
std : : unique_ptr < log : : Writer > (
new log : : Writer ( std : : move ( file_writer ) ) ) ) ;
// set column family handles
for ( auto cf : column_families ) {