@ -342,6 +342,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
last_stats_dump_time_microsec_ ( 0 ) ,
next_job_id_ ( 1 ) ,
has_unpersisted_data_ ( false ) ,
unable_to_flush_oldest_log_ ( false ) ,
env_options_ ( BuildDBOptions ( immutable_db_options_ , mutable_db_options_ ) ) ,
num_running_ingest_file_ ( 0 ) ,
# ifndef ROCKSDB_LITE
@ -663,6 +664,10 @@ void DBImpl::MaybeDumpStats() {
}
uint64_t DBImpl : : FindMinPrepLogReferencedByMemTable ( ) {
if ( ! allow_2pc ( ) ) {
return 0 ;
}
uint64_t min_log = 0 ;
// we must look through the memtables for two phase transactions
@ -707,6 +712,11 @@ void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
}
uint64_t DBImpl : : FindMinLogContainingOutstandingPrep ( ) {
if ( ! allow_2pc ( ) ) {
return 0 ;
}
std : : lock_guard < std : : mutex > lock ( prep_heap_mutex_ ) ;
uint64_t min_log = 0 ;
@ -2505,7 +2515,7 @@ Status DBImpl::SetDBOptions(
mutable_db_options_ = new_options ;
if ( total_log_size_ > GetMaxTotalWalSize ( ) ) {
FlushColumnFamilies ( ) ;
Maybe FlushColumnFamilies( ) ;
}
persist_options_status = PersistOptions ( ) ;
@ -4698,9 +4708,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
versions_ - > GetColumnFamilySet ( ) - > NumberOfColumnFamilies ( ) = = 1 ) ;
if ( UNLIKELY ( ! single_column_family_mode_ & &
! alive_log_files_ . begin ( ) - > getting_flushed & &
total_log_size_ > GetMaxTotalWalSize ( ) ) ) {
FlushColumnFamilies ( ) ;
Maybe FlushColumnFamilies( ) ;
} else if ( UNLIKELY ( write_buffer_manager_ - > ShouldFlush ( ) ) ) {
// Before a new memtable is added in SwitchMemtable(),
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
@ -5018,28 +5027,40 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return status ;
}
void DBImpl : : FlushColumnFamilies ( ) {
void DBImpl : : Maybe FlushColumnFamilies( ) {
mutex_ . AssertHeld ( ) ;
WriteContext context ;
if ( alive_log_files_ . begin ( ) - > getting_flushed ) {
return ;
}
uint64_t flush_column_family_if_log_file = alive_log_files_ . begin ( ) - > number ;
alive_log_files_ . begin ( ) - > getting_flushed = true ;
auto oldest_alive_log = alive_log_files_ . begin ( ) - > number ;
auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep ( ) ;
if ( allow_2pc ( ) & &
unable_to_flush_oldest_log_ & &
oldest_log_with_uncommited_prep > 0 & &
oldest_log_with_uncommited_prep < = oldest_alive_log ) {
// we already attempted to flush all column families dependent on
// the oldest alive log but the log still contained uncommited transactions.
// the oldest alive log STILL contains uncommited transaction so there
// is still nothing that we can do.
return ;
}
WriteContext context ;
Log ( InfoLogLevel : : INFO_LEVEL , immutable_db_options_ . info_log ,
" Flushing all column families with data in WAL number % " PRIu64
" . Total log size is % " PRIu64 " while max_total_wal_size is % " PRIu64 ,
flush_column_family_if_log_file , total_log_size_ , GetMaxTotalWalSize ( ) ) ;
oldest_alive_log , total_log_size_ , GetMaxTotalWalSize ( ) ) ;
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
for ( auto cfd : * versions_ - > GetColumnFamilySet ( ) ) {
if ( cfd - > IsDropped ( ) ) {
continue ;
}
if ( cfd - > GetLogNumber ( ) < = flush_column_family_if_log_file ) {
if ( cfd - > OldestLogToKeep ( ) < = oldest_alive_log ) {
auto status = SwitchMemtable ( cfd , & context ) ;
if ( ! status . ok ( ) ) {
break ;
@ -5049,6 +5070,26 @@ void DBImpl::FlushColumnFamilies() {
}
}
MaybeScheduleFlushOrCompaction ( ) ;
// we only mark this log as getting flushed if we have successfully
// flushed all data in this log. If this log contains outstanding prepred
// transactions then we cannot flush this log until those transactions are commited.
unable_to_flush_oldest_log_ = false ;
if ( allow_2pc ( ) ) {
if ( oldest_log_with_uncommited_prep = = 0 | |
oldest_log_with_uncommited_prep > oldest_alive_log ) {
// this log contains no outstanding prepared transactions
alive_log_files_ . begin ( ) - > getting_flushed = true ;
} else {
Log ( InfoLogLevel : : WARN_LEVEL , immutable_db_options_ . info_log ,
" Unable to release oldest log due to uncommited transaction " ) ;
unable_to_flush_oldest_log_ = true ;
}
} else {
alive_log_files_ . begin ( ) - > getting_flushed = true ;
}
}
uint64_t DBImpl : : GetMaxTotalWalSize ( ) const {