diff --git a/db/column_family.cc b/db/column_family.cc index 8857f5c09..c05373293 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -381,6 +381,7 @@ ColumnFamilyData::ColumnFamilyData( next_(nullptr), prev_(nullptr), log_number_(0), + flush_reason_(FlushReason::kUnknown), column_family_set_(column_family_set), pending_flush_(false), pending_compaction_(false), diff --git a/db/column_family.h b/db/column_family.h index ce1fd4738..f89fc10d1 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -202,6 +202,10 @@ class ColumnFamilyData { void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } uint64_t GetLogNumber() const { return log_number_; } + void SetFlushReason(FlushReason flush_reason) { + flush_reason_ = flush_reason; + } + FlushReason GetFlushReason() const { return flush_reason_; } // thread-safe const EnvOptions* soptions() const; const ImmutableCFOptions* ioptions() const { return &ioptions_; } @@ -404,6 +408,8 @@ class ColumnFamilyData { // recovered from uint64_t log_number_; + FlushReason flush_reason_; + // An object that keeps all the compaction stats // and picks the next compaction std::unique_ptr compaction_picker_; diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index dfefd89d8..1f1a0f449 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -93,7 +93,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, } cfd->Ref(); mutex_.Unlock(); - status = FlushMemTable(cfd, FlushOptions()); + status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles); TEST_SYNC_POINT("DBImpl::GetLiveFiles:1"); TEST_SYNC_POINT("DBImpl::GetLiveFiles:2"); mutex_.Lock(); diff --git a/db/db_impl.cc b/db/db_impl.cc index 837eee616..7136a813e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -258,7 +258,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { cfd->Ref(); mutex_.Unlock(); - FlushMemTable(cfd, FlushOptions()); + FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); mutex_.Lock(); cfd->Unref(); } @@ -2826,7 +2826,9 @@ Status DBImpl::IngestExternalFile( &need_flush); if (status.ok() && need_flush) { mutex_.Unlock(); - status = FlushMemTable(cfd, FlushOptions(), true /* writes_stopped */); + status = FlushMemTable(cfd, FlushOptions(), + FlushReason::kExternalFileIngestion, + true /* writes_stopped */); mutex_.Lock(); } } diff --git a/db/db_impl.h b/db/db_impl.h index 011066bf4..364c22f9c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -819,7 +819,7 @@ class DBImpl : public DB { // Force current memtable contents to be flushed. Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, - bool writes_stopped = false); + FlushReason flush_reason, bool writes_stopped = false); // Wait for memtable flushed. // If flush_memtable_id is non-null, wait until the memtable with the ID @@ -881,7 +881,7 @@ class DBImpl : public DB { ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name); void MaybeScheduleFlushOrCompaction(); - void SchedulePendingFlush(ColumnFamilyData* cfd); + void SchedulePendingFlush(ColumnFamilyData* cfd, FlushReason flush_reason); void SchedulePendingCompaction(ColumnFamilyData* cfd); void SchedulePendingPurge(std::string fname, FileType type, uint64_t number, uint32_t path_id, int job_id); @@ -920,7 +920,7 @@ class DBImpl : public DB { // helper functions for adding and removing from flush & compaction queues void AddToCompactionQueue(ColumnFamilyData* cfd); ColumnFamilyData* PopFirstFromCompactionQueue(); - void AddToFlushQueue(ColumnFamilyData* cfd); + void AddToFlushQueue(ColumnFamilyData* cfd, FlushReason flush_reason); ColumnFamilyData* PopFirstFromFlushQueue(); // helper function to call after some of the logs_ were synced diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index d834dddc7..b9e1b9b56 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -23,6 +23,7 @@ #include "util/sync_point.h" namespace rocksdb { + Status DBImpl::SyncClosedLogs(JobContext* job_context) { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); mutex_.AssertHeld(); @@ -222,6 +223,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, info.smallest_seqno = file_meta->smallest_seqno; info.largest_seqno = file_meta->largest_seqno; info.table_properties = prop; + info.flush_reason = cfd->GetFlushReason(); for (auto listener : immutable_db_options_.listeners) { listener->OnFlushBegin(this, info); } @@ -266,6 +268,7 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, info.smallest_seqno = file_meta->smallest_seqno; info.largest_seqno = file_meta->largest_seqno; info.table_properties = prop; + info.flush_reason = cfd->GetFlushReason(); for (auto listener : immutable_db_options_.listeners) { listener->OnFlushCompleted(this, info); } @@ -287,7 +290,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, auto cfd = cfh->cfd(); bool exclusive = options.exclusive_manual_compaction; - Status s = FlushMemTable(cfd, FlushOptions()); + Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kManualCompaction); if (!s.ok()) { LogFlush(immutable_db_options_.info_log); return s; @@ -812,7 +815,8 @@ Status DBImpl::Flush(const FlushOptions& flush_options, auto cfh = reinterpret_cast(column_family); ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.", cfh->GetName().c_str()); - Status s = FlushMemTable(cfh->cfd(), flush_options); + Status s = + FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualCompaction); ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush finished, status: %s\n", cfh->GetName().c_str(), s.ToString().c_str()); @@ -949,7 +953,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& flush_options, - bool writes_stopped) { + FlushReason flush_reason, bool writes_stopped) { Status s; uint64_t flush_memtable_id = 0; { @@ -978,7 +982,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, cfd->imm()->FlushRequested(); // schedule flush - SchedulePendingFlush(cfd); + SchedulePendingFlush(cfd, flush_reason); MaybeScheduleFlushOrCompaction(); } @@ -1134,11 +1138,12 @@ ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() { return cfd; } -void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd) { +void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd, FlushReason flush_reason) { assert(!cfd->pending_flush()); cfd->Ref(); flush_queue_.push_back(cfd); cfd->set_pending_flush(true); + cfd->SetFlushReason(flush_reason); } ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() { @@ -1147,12 +1152,14 @@ ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() { flush_queue_.pop_front(); assert(cfd->pending_flush()); cfd->set_pending_flush(false); + // TODO: need to unset flush reason? return cfd; } -void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd) { +void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd, + FlushReason flush_reason) { if (!cfd->pending_flush() && cfd->imm()->IsFlushPending()) { - AddToFlushQueue(cfd); + AddToFlushQueue(cfd, flush_reason); ++unscheduled_flushes_; } } @@ -1929,7 +1936,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork( // Whenever we install new SuperVersion, we might need to issue new flushes or // compactions. - SchedulePendingFlush(cfd); + SchedulePendingFlush(cfd, FlushReason::kSuperVersionChange); SchedulePendingCompaction(cfd); MaybeScheduleFlushOrCompaction(); diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index ca6772074..32c072b8f 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -103,7 +103,7 @@ Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) { auto cfhi = reinterpret_cast(cfh); cfd = cfhi->cfd(); } - return FlushMemTable(cfd, fo); + return FlushMemTable(cfd, fo, FlushReason::kTest); } Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) { diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index d6a062702..8f625f839 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1019,7 +1019,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { break; } cfd->imm()->FlushRequested(); - SchedulePendingFlush(cfd); + SchedulePendingFlush(cfd, FlushReason::kWriteBufferManager); } } MaybeScheduleFlushOrCompaction(); @@ -1065,7 +1065,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { status = SwitchMemtable(cfd_picked, write_context); if (status.ok()) { cfd_picked->imm()->FlushRequested(); - SchedulePendingFlush(cfd_picked); + SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull); MaybeScheduleFlushOrCompaction(); } } diff --git a/db/flush_job.cc b/db/flush_job.cc index 2c4fb91fe..2181bebb4 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -54,6 +54,32 @@ namespace rocksdb { +const char* GetFlushReasonString (FlushReason flush_reason) { + switch (flush_reason) { + case FlushReason::kUnknown: + return "Unknown"; + case FlushReason::kGetLiveFiles: + return "Get Live Files"; + case FlushReason::kShutDown: + return "Shut down"; + case FlushReason::kExternalFileIngestion: + return "External File Ingestion"; + case FlushReason::kManualCompaction: + return "Manual Compaction"; + case FlushReason::kWriteBufferManager: + return "Write Buffer Manager"; + case FlushReason::kWriteBufferFull: + return "Write Buffer Full"; + case FlushReason::kTest: + return "Test"; + case FlushReason::kSuperVersionChange: + return "SuperVersion Change"; + default: + return "Invalid"; + } +} + + FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, const ImmutableDBOptions& db_options, const MutableCFOptions& mutable_cf_options, @@ -278,12 +304,13 @@ Status FlushJob::WriteLevel0Table() { total_memory_usage += m->ApproximateMemoryUsage(); } - event_logger_->Log() << "job" << job_context_->job_id << "event" - << "flush_started" - << "num_memtables" << mems_.size() << "num_entries" - << total_num_entries << "num_deletes" - << total_num_deletes << "memory_usage" - << total_memory_usage; + event_logger_->Log() + << "job" << job_context_->job_id << "event" + << "flush_started" + << "num_memtables" << mems_.size() << "num_entries" << total_num_entries + << "num_deletes" << total_num_deletes << "memory_usage" + << total_memory_usage << "flush_reason" + << GetFlushReasonString(cfd_->GetFlushReason()); { ScopedArenaIterator iter( diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index feb08e6d8..c32bd1cb6 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -82,6 +82,18 @@ enum class CompactionReason { kBottommostFiles, }; +enum class FlushReason : int { + kUnknown = 0x00, + kGetLiveFiles = 0x01, + kShutDown = 0x02, + kExternalFileIngestion = 0x03, + kManualCompaction = 0x04, + kWriteBufferManager = 0x05, + kWriteBufferFull = 0x06, + kTest = 0x07, + kSuperVersionChange = 0x08, +}; + enum class BackgroundErrorReason { kFlush, kCompaction, @@ -143,6 +155,8 @@ struct FlushJobInfo { SequenceNumber largest_seqno; // Table properties of the table being flushed TableProperties table_properties; + + FlushReason flush_reason; }; struct CompactionJobInfo {