@ -1591,6 +1591,16 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
ColumnFamilyData * loop_cfd = elem . first ;
ColumnFamilyData * loop_cfd = elem . first ;
loop_cfd - > imm ( ) - > FlushRequested ( ) ;
loop_cfd - > imm ( ) - > FlushRequested ( ) ;
}
}
// If the caller wants to wait for this flush to complete, it indicates
// that the caller expects the ColumnFamilyData not to be free'ed by
// other threads which may drop the column family concurrently.
// Therefore, we increase the cfd's ref count.
if ( flush_options . wait ) {
for ( auto & elem : flush_req ) {
ColumnFamilyData * loop_cfd = elem . first ;
loop_cfd - > Ref ( ) ;
}
}
SchedulePendingFlush ( flush_req , flush_reason ) ;
SchedulePendingFlush ( flush_req , flush_reason ) ;
MaybeScheduleFlushOrCompaction ( ) ;
MaybeScheduleFlushOrCompaction ( ) ;
}
}
@ -1599,7 +1609,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
write_thread_ . ExitUnbatched ( & w ) ;
write_thread_ . ExitUnbatched ( & w ) ;
}
}
}
}
TEST_SYNC_POINT ( " DBImpl::FlushMemTable:AfterScheduleFlush " ) ;
TEST_SYNC_POINT ( " DBImpl::FlushMemTable:BeforeWaitForBgFlush " ) ;
if ( s . ok ( ) & & flush_options . wait ) {
if ( s . ok ( ) & & flush_options . wait ) {
autovector < ColumnFamilyData * > cfds ;
autovector < ColumnFamilyData * > cfds ;
autovector < const uint64_t * > flush_memtable_ids ;
autovector < const uint64_t * > flush_memtable_ids ;
@ -1609,6 +1620,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}
s = WaitForFlushMemTables ( cfds , flush_memtable_ids ,
s = WaitForFlushMemTables ( cfds , flush_memtable_ids ,
( flush_reason = = FlushReason : : kErrorRecovery ) ) ;
( flush_reason = = FlushReason : : kErrorRecovery ) ) ;
for ( auto * tmp_cfd : cfds ) {
if ( tmp_cfd - > Unref ( ) ) {
// Only one thread can reach here.
InstrumentedMutexLock lock_guard ( & mutex_ ) ;
delete tmp_cfd ;
}
}
}
}
TEST_SYNC_POINT ( " FlushMemTableFinished " ) ;
TEST_SYNC_POINT ( " FlushMemTableFinished " ) ;
return s ;
return s ;
@ -1672,6 +1690,15 @@ Status DBImpl::AtomicFlushMemTables(
for ( auto cfd : cfds ) {
for ( auto cfd : cfds ) {
cfd - > imm ( ) - > FlushRequested ( ) ;
cfd - > imm ( ) - > FlushRequested ( ) ;
}
}
// If the caller wants to wait for this flush to complete, it indicates
// that the caller expects the ColumnFamilyData not to be free'ed by
// other threads which may drop the column family concurrently.
// Therefore, we increase the cfd's ref count.
if ( flush_options . wait ) {
for ( auto cfd : cfds ) {
cfd - > Ref ( ) ;
}
}
GenerateFlushRequest ( cfds , & flush_req ) ;
GenerateFlushRequest ( cfds , & flush_req ) ;
SchedulePendingFlush ( flush_req , flush_reason ) ;
SchedulePendingFlush ( flush_req , flush_reason ) ;
MaybeScheduleFlushOrCompaction ( ) ;
MaybeScheduleFlushOrCompaction ( ) ;
@ -1682,7 +1709,7 @@ Status DBImpl::AtomicFlushMemTables(
}
}
}
}
TEST_SYNC_POINT ( " DBImpl::AtomicFlushMemTables:AfterScheduleFlush " ) ;
TEST_SYNC_POINT ( " DBImpl::AtomicFlushMemTables:AfterScheduleFlush " ) ;
TEST_SYNC_POINT ( " DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush " ) ;
if ( s . ok ( ) & & flush_options . wait ) {
if ( s . ok ( ) & & flush_options . wait ) {
autovector < const uint64_t * > flush_memtable_ids ;
autovector < const uint64_t * > flush_memtable_ids ;
for ( auto & iter : flush_req ) {
for ( auto & iter : flush_req ) {
@ -1690,6 +1717,13 @@ Status DBImpl::AtomicFlushMemTables(
}
}
s = WaitForFlushMemTables ( cfds , flush_memtable_ids ,
s = WaitForFlushMemTables ( cfds , flush_memtable_ids ,
( flush_reason = = FlushReason : : kErrorRecovery ) ) ;
( flush_reason = = FlushReason : : kErrorRecovery ) ) ;
for ( auto * cfd : cfds ) {
if ( cfd - > Unref ( ) ) {
// Only one thread can reach here.
InstrumentedMutexLock lock_guard ( & mutex_ ) ;
delete cfd ;
}
}
}
}
return s ;
return s ;
}
}
@ -2151,6 +2185,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
}
}
status = FlushMemTablesToOutputFiles ( bg_flush_args , made_progress ,
status = FlushMemTablesToOutputFiles ( bg_flush_args , made_progress ,
job_context , log_buffer , thread_pri ) ;
job_context , log_buffer , thread_pri ) ;
TEST_SYNC_POINT ( " DBImpl::BackgroundFlush:BeforeFlush " ) ;
// All the CFDs in the FlushReq must have the same flush reason, so just
// All the CFDs in the FlushReq must have the same flush reason, so just
// grab the first one
// grab the first one
* reason = bg_flush_args [ 0 ] . cfd_ - > GetFlushReason ( ) ;
* reason = bg_flush_args [ 0 ] . cfd_ - > GetFlushReason ( ) ;