@ -23,6 +23,7 @@
# include "util/sync_point.h"
# include "util/sync_point.h"
namespace rocksdb {
namespace rocksdb {
Status DBImpl : : SyncClosedLogs ( JobContext * job_context ) {
Status DBImpl : : SyncClosedLogs ( JobContext * job_context ) {
TEST_SYNC_POINT ( " DBImpl::SyncClosedLogs:Start " ) ;
TEST_SYNC_POINT ( " DBImpl::SyncClosedLogs:Start " ) ;
mutex_ . AssertHeld ( ) ;
mutex_ . AssertHeld ( ) ;
@ -222,6 +223,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info . smallest_seqno = file_meta - > smallest_seqno ;
info . smallest_seqno = file_meta - > smallest_seqno ;
info . largest_seqno = file_meta - > largest_seqno ;
info . largest_seqno = file_meta - > largest_seqno ;
info . table_properties = prop ;
info . table_properties = prop ;
info . flush_reason = cfd - > GetFlushReason ( ) ;
for ( auto listener : immutable_db_options_ . listeners ) {
for ( auto listener : immutable_db_options_ . listeners ) {
listener - > OnFlushBegin ( this , info ) ;
listener - > OnFlushBegin ( this , info ) ;
}
}
@ -266,6 +268,7 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
info . smallest_seqno = file_meta - > smallest_seqno ;
info . smallest_seqno = file_meta - > smallest_seqno ;
info . largest_seqno = file_meta - > largest_seqno ;
info . largest_seqno = file_meta - > largest_seqno ;
info . table_properties = prop ;
info . table_properties = prop ;
info . flush_reason = cfd - > GetFlushReason ( ) ;
for ( auto listener : immutable_db_options_ . listeners ) {
for ( auto listener : immutable_db_options_ . listeners ) {
listener - > OnFlushCompleted ( this , info ) ;
listener - > OnFlushCompleted ( this , info ) ;
}
}
@ -287,7 +290,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
auto cfd = cfh - > cfd ( ) ;
auto cfd = cfh - > cfd ( ) ;
bool exclusive = options . exclusive_manual_compaction ;
bool exclusive = options . exclusive_manual_compaction ;
Status s = FlushMemTable ( cfd , FlushOptions ( ) ) ;
Status s = FlushMemTable ( cfd , FlushOptions ( ) , FlushReason : : kManualCompaction ) ;
if ( ! s . ok ( ) ) {
if ( ! s . ok ( ) ) {
LogFlush ( immutable_db_options_ . info_log ) ;
LogFlush ( immutable_db_options_ . info_log ) ;
return s ;
return s ;
@ -812,7 +815,8 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
auto cfh = reinterpret_cast < ColumnFamilyHandleImpl * > ( column_family ) ;
auto cfh = reinterpret_cast < ColumnFamilyHandleImpl * > ( column_family ) ;
ROCKS_LOG_INFO ( immutable_db_options_ . info_log , " [%s] Manual flush start. " ,
ROCKS_LOG_INFO ( immutable_db_options_ . info_log , " [%s] Manual flush start. " ,
cfh - > GetName ( ) . c_str ( ) ) ;
cfh - > GetName ( ) . c_str ( ) ) ;
Status s = FlushMemTable ( cfh - > cfd ( ) , flush_options ) ;
Status s =
FlushMemTable ( cfh - > cfd ( ) , flush_options , FlushReason : : kManualCompaction ) ;
ROCKS_LOG_INFO ( immutable_db_options_ . info_log ,
ROCKS_LOG_INFO ( immutable_db_options_ . info_log ,
" [%s] Manual flush finished, status: %s \n " ,
" [%s] Manual flush finished, status: %s \n " ,
cfh - > GetName ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
cfh - > GetName ( ) . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
@ -949,7 +953,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
Status DBImpl : : FlushMemTable ( ColumnFamilyData * cfd ,
Status DBImpl : : FlushMemTable ( ColumnFamilyData * cfd ,
const FlushOptions & flush_options ,
const FlushOptions & flush_options ,
bool writes_stopped ) {
FlushReason flush_reason , bool writes_stopped ) {
Status s ;
Status s ;
uint64_t flush_memtable_id = 0 ;
uint64_t flush_memtable_id = 0 ;
{
{
@ -978,7 +982,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
cfd - > imm ( ) - > FlushRequested ( ) ;
cfd - > imm ( ) - > FlushRequested ( ) ;
// schedule flush
// schedule flush
SchedulePendingFlush ( cfd ) ;
SchedulePendingFlush ( cfd , flush_reason ) ;
MaybeScheduleFlushOrCompaction ( ) ;
MaybeScheduleFlushOrCompaction ( ) ;
}
}
@ -1134,11 +1138,12 @@ ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
return cfd ;
return cfd ;
}
}
void DBImpl : : AddToFlushQueue ( ColumnFamilyData * cfd ) {
void DBImpl : : AddToFlushQueue ( ColumnFamilyData * cfd , FlushReason flush_reason ) {
assert ( ! cfd - > pending_flush ( ) ) ;
assert ( ! cfd - > pending_flush ( ) ) ;
cfd - > Ref ( ) ;
cfd - > Ref ( ) ;
flush_queue_ . push_back ( cfd ) ;
flush_queue_ . push_back ( cfd ) ;
cfd - > set_pending_flush ( true ) ;
cfd - > set_pending_flush ( true ) ;
cfd - > SetFlushReason ( flush_reason ) ;
}
}
ColumnFamilyData * DBImpl : : PopFirstFromFlushQueue ( ) {
ColumnFamilyData * DBImpl : : PopFirstFromFlushQueue ( ) {
@ -1147,12 +1152,14 @@ ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() {
flush_queue_ . pop_front ( ) ;
flush_queue_ . pop_front ( ) ;
assert ( cfd - > pending_flush ( ) ) ;
assert ( cfd - > pending_flush ( ) ) ;
cfd - > set_pending_flush ( false ) ;
cfd - > set_pending_flush ( false ) ;
// TODO: need to unset flush reason?
return cfd ;
return cfd ;
}
}
void DBImpl : : SchedulePendingFlush ( ColumnFamilyData * cfd ) {
void DBImpl : : SchedulePendingFlush ( ColumnFamilyData * cfd ,
FlushReason flush_reason ) {
if ( ! cfd - > pending_flush ( ) & & cfd - > imm ( ) - > IsFlushPending ( ) ) {
if ( ! cfd - > pending_flush ( ) & & cfd - > imm ( ) - > IsFlushPending ( ) ) {
AddToFlushQueue ( cfd ) ;
AddToFlushQueue ( cfd , flush_reason ) ;
+ + unscheduled_flushes_ ;
+ + unscheduled_flushes_ ;
}
}
}
}
@ -1929,7 +1936,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
// Whenever we install new SuperVersion, we might need to issue new flushes or
// Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions.
// compactions.
SchedulePendingFlush ( cfd ) ;
SchedulePendingFlush ( cfd , FlushReason : : kSuperVersionChange ) ;
SchedulePendingCompaction ( cfd ) ;
SchedulePendingCompaction ( cfd ) ;
MaybeScheduleFlushOrCompaction ( ) ;
MaybeScheduleFlushOrCompaction ( ) ;