@ -155,7 +155,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
Status DBImpl : : FlushMemTableToOutputFile (
ColumnFamilyData * cfd , const MutableCFOptions & mutable_cf_options ,
bool * made_progress , JobContext * job_context ,
bool * made_progress , JobContext * job_context , FlushReason flush_reason ,
SuperVersionContext * superversion_context ,
std : : vector < SequenceNumber > & snapshot_seqs ,
SequenceNumber earliest_write_conflict_snapshot ,
@ -215,7 +215,8 @@ Status DBImpl::FlushMemTableToOutputFile(
dbname_ , cfd , immutable_db_options_ , mutable_cf_options , max_memtable_id ,
file_options_for_compaction_ , versions_ . get ( ) , & mutex_ , & shutting_down_ ,
snapshot_seqs , earliest_write_conflict_snapshot , snapshot_checker ,
job_context , log_buffer , directories_ . GetDbDir ( ) , GetDataDir ( cfd , 0U ) ,
job_context , flush_reason , log_buffer , directories_ . GetDbDir ( ) ,
GetDataDir ( cfd , 0U ) ,
GetCompressionFlush ( * cfd - > ioptions ( ) , mutable_cf_options ) , stats_ ,
& event_logger_ , mutable_cf_options . report_bg_io_stats ,
true /* sync_output_directory */ , true /* write_manifest */ , thread_pri ,
@ -260,7 +261,8 @@ Status DBImpl::FlushMemTableToOutputFile(
# ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin ( cfd , & file_meta , mutable_cf_options , job_context - > job_id ) ;
NotifyOnFlushBegin ( cfd , & file_meta , mutable_cf_options , job_context - > job_id ,
flush_reason ) ;
# endif // ROCKSDB_LITE
bool switched_to_mempurge = false ;
@ -390,8 +392,9 @@ Status DBImpl::FlushMemTablesToOutputFiles(
MutableCFOptions mutable_cf_options_copy = * cfd - > GetLatestMutableCFOptions ( ) ;
SuperVersionContext * superversion_context =
bg_flush_arg . superversion_context_ ;
FlushReason flush_reason = bg_flush_arg . flush_reason_ ;
Status s = FlushMemTableToOutputFile (
cfd , mutable_cf_options_copy , made_progress , job_context ,
cfd , mutable_cf_options_copy , made_progress , job_context , flush_reason ,
superversion_context , snapshot_seqs , earliest_write_conflict_snapshot ,
snapshot_checker , log_buffer , thread_pri ) ;
return s ;
@ -420,7 +423,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
for ( const auto cfd : cfds ) {
assert ( cfd - > imm ( ) - > NumNotFlushed ( ) ! = 0 ) ;
assert ( cfd - > imm ( ) - > IsFlushPending ( ) ) ;
assert ( cfd - > GetFlushReason ( ) = = cfds [ 0 ] - > GetFlushReason ( ) ) ;
}
for ( const auto bg_flush_arg : bg_flush_args ) {
assert ( bg_flush_arg . flush_reason_ = = bg_flush_args [ 0 ] . flush_reason_ ) ;
}
# endif /* !NDEBUG */
@ -459,13 +464,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
all_mutable_cf_options . emplace_back ( * cfd - > GetLatestMutableCFOptions ( ) ) ;
const MutableCFOptions & mutable_cf_options = all_mutable_cf_options . back ( ) ;
uint64_t max_memtable_id = bg_flush_args [ i ] . max_memtable_id_ ;
FlushReason flush_reason = bg_flush_args [ i ] . flush_reason_ ;
jobs . emplace_back ( new FlushJob (
dbname_ , cfd , immutable_db_options_ , mutable_cf_options ,
max_memtable_id , file_options_for_compaction_ , versions_ . get ( ) , & mutex_ ,
& shutting_down_ , snapshot_seqs , earliest_write_conflict_snapshot ,
snapshot_checker , job_context , log_buffer , directories_ . GetDbDir ( ) ,
data_dir , GetCompressionFlush ( * cfd - > ioptions ( ) , mutable_cf_options ) ,
stats_ , & event_logger_ , mutable_cf_options . report_bg_io_stats ,
snapshot_checker , job_context , flush_reason , log_buffer ,
directories_ . GetDbDir ( ) , data_dir ,
GetCompressionFlush ( * cfd - > ioptions ( ) , mutable_cf_options ) , stats_ ,
& event_logger_ , mutable_cf_options . report_bg_io_stats ,
false /* sync_output_directory */ , false /* write_manifest */ ,
thread_pri , io_tracer_ , seqno_time_mapping_ , db_id_ , db_session_id_ ,
cfd - > GetFullHistoryTsLow ( ) , & blob_callback_ ) ) ;
@ -483,8 +490,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
for ( int i = 0 ; i ! = num_cfs ; + + i ) {
const MutableCFOptions & mutable_cf_options = all_mutable_cf_options . at ( i ) ;
// may temporarily unlock and lock the mutex.
FlushReason flush_reason = bg_flush_args [ i ] . flush_reason_ ;
NotifyOnFlushBegin ( cfds [ i ] , & file_meta [ i ] , mutable_cf_options ,
job_context - > job_id ) ;
job_context - > job_id , flush_reason ) ;
}
# endif /* !ROCKSDB_LITE */
@ -642,8 +650,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
bool resuming_from_bg_err =
error_handler_ . IsDBStopped ( ) | |
( cfds [ 0 ] - > GetFlushReason ( ) = = FlushReason : : kErrorRecovery | |
cfds [ 0 ] - > GetFlushReason ( ) = = FlushReason : : kErrorRecoveryRetryFlush ) ;
( bg_flush_args [ 0 ] . flush_reason_ = = FlushReason : : kErrorRecovery | |
bg_flush_args [ 0 ] . flush_reason_ = =
FlushReason : : kErrorRecoveryRetryFlush ) ;
while ( ( ! resuming_from_bg_err | | error_handler_ . GetRecoveryError ( ) . ok ( ) ) ) {
std : : pair < Status , bool > res = wait_to_install_func ( ) ;
@ -660,8 +669,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
resuming_from_bg_err =
error_handler_ . IsDBStopped ( ) | |
( cfds [ 0 ] - > GetFlushReason ( ) = = FlushReason : : kErrorRecovery | |
cfds [ 0 ] - > GetFlushReason ( ) = = FlushReason : : kErrorRecoveryRetryFlush ) ;
( bg_flush_args [ 0 ] . flush_reason_ = = FlushReason : : kErrorRecovery | |
bg_flush_args [ 0 ] . flush_reason_ = =
FlushReason : : kErrorRecoveryRetryFlush ) ;
}
if ( ! resuming_from_bg_err ) {
@ -816,7 +826,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
void DBImpl : : NotifyOnFlushBegin ( ColumnFamilyData * cfd , FileMetaData * file_meta ,
const MutableCFOptions & mutable_cf_options ,
int job_id ) {
int job_id , FlushReason flush_reason ) {
# ifndef ROCKSDB_LITE
if ( immutable_db_options_ . listeners . size ( ) = = 0U ) {
return ;
@ -849,7 +859,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info . triggered_writes_stop = triggered_writes_stop ;
info . smallest_seqno = file_meta - > fd . smallest_seqno ;
info . largest_seqno = file_meta - > fd . largest_seqno ;
info . flush_reason = cfd - > GetFlushReason ( ) ;
info . flush_reason = flush_reason ;
for ( auto listener : immutable_db_options_ . listeners ) {
listener - > OnFlushBegin ( this , info ) ;
}
@ -862,6 +872,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
( void ) file_meta ;
( void ) mutable_cf_options ;
( void ) job_id ;
( void ) flush_reason ;
# endif // ROCKSDB_LITE
}
@ -2102,16 +2113,17 @@ Status DBImpl::RunManualCompaction(
}
// Populates *req for the column families in cfds. For every non-null entry the
// request records the column family together with the value returned by
// cfd->imm()->GetLatestMemTableID(); null entries are skipped.
//
// NOTE(review): this hunk shows the pre-change and post-change lines side by
// side. Before the change `req` is used as a container (reserve/emplace_back);
// after it, `req` carries `flush_reason` plus the
// `cfd_to_max_mem_id_to_persist` map, and the flush reason is passed in by the
// caller as a new parameter.
void DBImpl : : GenerateFlushRequest ( const autovector < ColumnFamilyData * > & cfds ,
FlushRequest * req ) {
FlushReason flush_reason , FlushRequest * req ) {
// req is an out-parameter and must be supplied by the caller.
assert ( req ! = nullptr ) ;
req - > reserve ( cfds . size ( ) ) ;
// The reason is stored once on the request rather than per column family.
req - > flush_reason = flush_reason ;
// Sized for cfds.size() even though null entries below add nothing.
req - > cfd_to_max_mem_id_to_persist . reserve ( cfds . size ( ) ) ;
for ( const auto cfd : cfds ) {
if ( nullptr = = cfd ) {
// cfd may be null, see DBImpl::ScheduleFlushes
continue ;
}
uint64_t max_memtable_id = cfd - > imm ( ) - > GetLatestMemTableID ( ) ;
req - > emplace_back ( cfd , max_memtable_id ) ;
req - > cfd_to_max_mem_id_to_persist . emplace ( cfd , max_memtable_id ) ;
}
}
@ -2169,7 +2181,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
if ( s . ok ( ) ) {
if ( cfd - > imm ( ) - > NumNotFlushed ( ) ! = 0 | | ! cfd - > mem ( ) - > IsEmpty ( ) | |
! cached_recoverable_state_empty_ . load ( ) ) {
FlushRequest req { { cfd , flush_memtable_id } } ;
FlushRequest req { flush_reason , { { cfd , flush_memtable_id } } } ;
flush_reqs . emplace_back ( std : : move ( req ) ) ;
memtable_ids_to_wait . emplace_back ( cfd - > imm ( ) - > GetLatestMemTableID ( ) ) ;
}
@ -2197,7 +2209,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
" to avoid holding old logs " ,
cfd - > GetName ( ) . c_str ( ) ) ;
s = SwitchMemtable ( cfd_stats , & context ) ;
FlushRequest req { { cfd_stats , flush_memtable_id } } ;
FlushRequest req { flush_reason , { { cfd_stats , flush_memtable_id } } } ;
flush_reqs . emplace_back ( std : : move ( req ) ) ;
memtable_ids_to_wait . emplace_back (
cfd_stats - > imm ( ) - > GetLatestMemTableID ( ) ) ;
@ -2208,8 +2220,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
if ( s . ok ( ) & & ! flush_reqs . empty ( ) ) {
for ( const auto & req : flush_reqs ) {
assert ( req . size ( ) = = 1 ) ;
ColumnFamilyData * loop_cfd = req [ 0 ] . first ;
assert ( req . cfd_to_max_mem_id_to_persist . size ( ) = = 1 ) ;
ColumnFamilyData * loop_cfd =
req . cfd_to_max_mem_id_to_persist . begin ( ) - > first ;
loop_cfd - > imm ( ) - > FlushRequested ( ) ;
}
// If the caller wants to wait for this flush to complete, it indicates
@ -2218,13 +2231,14 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
// Therefore, we increase the cfd's ref count.
if ( flush_options . wait ) {
for ( const auto & req : flush_reqs ) {
assert ( req . size ( ) = = 1 ) ;
ColumnFamilyData * loop_cfd = req [ 0 ] . first ;
assert ( req . cfd_to_max_mem_id_to_persist . size ( ) = = 1 ) ;
ColumnFamilyData * loop_cfd =
req . cfd_to_max_mem_id_to_persist . begin ( ) - > first ;
loop_cfd - > Ref ( ) ;
}
}
for ( const auto & req : flush_reqs ) {
SchedulePendingFlush ( req , flush_reason ) ;
SchedulePendingFlush ( req ) ;
}
MaybeScheduleFlushOrCompaction ( ) ;
}
@ -2243,8 +2257,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
autovector < const uint64_t * > flush_memtable_ids ;
assert ( flush_reqs . size ( ) = = memtable_ids_to_wait . size ( ) ) ;
for ( size_t i = 0 ; i < flush_reqs . size ( ) ; + + i ) {
assert ( flush_reqs [ i ] . size ( ) = = 1 ) ;
cfds . push_back ( flush_reqs [ i ] [ 0 ] . first ) ;
assert ( flush_reqs [ i ] . cfd_to_max_mem_id_to_persist . size ( ) = = 1 ) ;
cfds . push_back ( flush_reqs [ i ] . cfd_to_max_mem_id_to_persist . begin ( ) - > first ) ;
flush_memtable_ids . push_back ( & ( memtable_ids_to_wait [ i ] ) ) ;
}
s = WaitForFlushMemTables (
@ -2341,8 +2355,8 @@ Status DBImpl::AtomicFlushMemTables(
cfd - > Ref ( ) ;
}
}
GenerateFlushRequest ( cfds , & flush_req ) ;
SchedulePendingFlush ( flush_req , flush_reason ) ;
GenerateFlushRequest ( cfds , flush_reason , & flush_req ) ;
SchedulePendingFlush ( flush_req ) ;
MaybeScheduleFlushOrCompaction ( ) ;
}
@ -2357,7 +2371,7 @@ Status DBImpl::AtomicFlushMemTables(
TEST_SYNC_POINT ( " DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush " ) ;
if ( s . ok ( ) & & flush_options . wait ) {
autovector < const uint64_t * > flush_memtable_ids ;
for ( auto & iter : flush_req ) {
for ( auto & iter : flush_req . cfd_to_max_mem_id_to_persist ) {
flush_memtable_ids . push_back ( & ( iter . second ) ) ;
}
s = WaitForFlushMemTables (
@ -2704,9 +2718,9 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
FlushRequest flush_req = flush_queue_ . front ( ) ;
flush_queue_ . pop_front ( ) ;
if ( ! immutable_db_options_ . atomic_flush ) {
assert ( flush_req . size ( ) = = 1 ) ;
assert ( flush_req . cfd_to_max_mem_id_to_persist . size ( ) = = 1 ) ;
}
for ( const auto & elem : flush_req ) {
for ( const auto & elem : flush_req . cfd_to_max_mem_id_to_persist ) {
if ( ! immutable_db_options_ . atomic_flush ) {
ColumnFamilyData * cfd = elem . first ;
assert ( cfd ) ;
@ -2714,7 +2728,6 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
cfd - > set_queued_for_flush ( false ) ;
}
}
// TODO: need to unset flush reason?
return flush_req ;
}
@ -2744,31 +2757,29 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(
return cfd ;
}
void DBImpl : : SchedulePendingFlush ( const FlushRequest & flush_req ,
FlushReason flush_reason ) {
void DBImpl : : SchedulePendingFlush ( const FlushRequest & flush_req ) {
mutex_ . AssertHeld ( ) ;
if ( flush_req . empty ( ) ) {
if ( flush_req . cfd_to_max_mem_id_to_persist . empty ( ) ) {
return ;
}
if ( ! immutable_db_options_ . atomic_flush ) {
// For the non-atomic flush case, we never schedule multiple column
// families in the same flush request.
assert ( flush_req . size ( ) = = 1 ) ;
ColumnFamilyData * cfd = flush_req [ 0 ] . first ;
assert ( flush_req . cfd_to_max_mem_id_to_persist . size ( ) = = 1 ) ;
ColumnFamilyData * cfd =
flush_req . cfd_to_max_mem_id_to_persist . begin ( ) - > first ;
assert ( cfd ) ;
if ( ! cfd - > queued_for_flush ( ) & & cfd - > imm ( ) - > IsFlushPending ( ) ) {
cfd - > Ref ( ) ;
cfd - > set_queued_for_flush ( true ) ;
cfd - > SetFlushReason ( flush_reason ) ;
+ + unscheduled_flushes_ ;
flush_queue_ . push_back ( flush_req ) ;
}
} else {
for ( auto & iter : flush_req ) {
for ( auto & iter : flush_req . cfd_to_max_mem_id_to_persist ) {
ColumnFamilyData * cfd = iter . first ;
cfd - > Ref ( ) ;
cfd - > SetFlushReason ( flush_reason ) ;
}
+ + unscheduled_flushes_ ;
flush_queue_ . push_back ( flush_req ) ;
@ -2900,10 +2911,12 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
while ( ! flush_queue_ . empty ( ) ) {
// This cfd is already referenced
const FlushRequest & flush_req = PopFirstFromFlushQueue ( ) ;
FlushReason flush_reason = flush_req . flush_reason ;
superversion_contexts . clear ( ) ;
superversion_contexts . reserve ( flush_req . size ( ) ) ;
superversion_contexts . reserve (
flush_req . cfd_to_max_mem_id_to_persist . size ( ) ) ;
for ( const auto & iter : flush_req ) {
for ( const auto & iter : flush_req . cfd_to_max_mem_id_to_persist ) {
ColumnFamilyData * cfd = iter . first ;
if ( cfd - > GetMempurgeUsed ( ) ) {
// If imm() contains silent memtables (e.g.: because
@ -2919,7 +2932,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
}
superversion_contexts . emplace_back ( SuperVersionContext ( true ) ) ;
bg_flush_args . emplace_back ( cfd , iter . second ,
& ( superversion_contexts . back ( ) ) ) ;
& ( superversion_contexts . back ( ) ) , flush_reason ) ;
}
if ( ! bg_flush_args . empty ( ) ) {
break ;
@ -2943,9 +2956,14 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
status = FlushMemTablesToOutputFiles ( bg_flush_args , made_progress ,
job_context , log_buffer , thread_pri ) ;
TEST_SYNC_POINT ( " DBImpl::BackgroundFlush:BeforeFlush " ) ;
// All the CFDs in the FlushReq must have the same flush reason, so just
// grab the first one
* reason = bg_flush_args [ 0 ] . cfd_ - > GetFlushReason ( ) ;
// All the CFD/bg_flush_arg in the FlushReq must have the same flush reason, so
// just grab the first one
# ifndef NDEBUG
for ( const auto bg_flush_arg : bg_flush_args ) {
assert ( bg_flush_arg . flush_reason_ = = bg_flush_args [ 0 ] . flush_reason_ ) ;
}
# endif /* !NDEBUG */
* reason = bg_flush_args [ 0 ] . flush_reason_ ;
for ( auto & arg : bg_flush_args ) {
ColumnFamilyData * cfd = arg . cfd_ ;
if ( cfd - > UnrefAndTryDelete ( ) ) {