FlushReason improvement

Summary:
Right now flush reason "SuperVersion Change" covers a few different scenarios which is a bit vague. For example, the following db_bench job should trigger "Write Buffer Full"

> $ TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=fillrandom -write_buffer_size=1048576 -target_file_size_base=1048576 -max_bytes_for_level_base=4194304
$ grep 'flush_reason' /dev/shm/dbbench/LOG
...
2018/03/06-17:30:42.543638 7f2773b99700 EVENT_LOG_v1 {"time_micros": 1520386242543634, "job": 192, "event": "flush_started", "num_memtables": 1, "num_entries": 7006, "num_deletes": 0, "memory_usage": 1018024, "flush_reason": "SuperVersion Change"}
2018/03/06-17:30:42.569541 7f2773b99700 EVENT_LOG_v1 {"time_micros": 1520386242569536, "job": 193, "event": "flush_started", "num_memtables": 1, "num_entries": 7006, "num_deletes": 0, "memory_usage": 1018104, "flush_reason": "SuperVersion Change"}
2018/03/06-17:30:42.596396 7f2773b99700 EVENT_LOG_v1 {"time_micros": 1520386242596392, "job": 194, "event": "flush_started", "num_memtables": 1, "num_entries": 7008, "num_deletes": 0, "memory_usage": 1018048, "flush_reason": "SuperVersion Change"}
2018/03/06-17:30:42.622444 7f2773b99700 EVENT_LOG_v1 {"time_micros": 1520386242622440, "job": 195, "event": "flush_started", "num_memtables": 1, "num_entries": 7006, "num_deletes": 0, "memory_usage": 1018104, "flush_reason": "SuperVersion Change"}

With the fix:
> 2018/03/19-14:40:02.341451 7f11dc257700 EVENT_LOG_v1 {"time_micros": 1521495602341444, "job": 98, "event": "flush_started", "num_memtables": 1, "num_entries": 7009, "num_deletes": 0, "memory_usage": 1018008, "flush_reason": "Write Buffer Full"}
2018/03/19-14:40:02.379655 7f11dc257700 EVENT_LOG_v1 {"time_micros": 1521495602379642, "job": 100, "event": "flush_started", "num_memtables": 1, "num_entries": 7006, "num_deletes": 0, "memory_usage": 1018016, "flush_reason": "Write Buffer Full"}
2018/03/19-14:40:02.418479 7f11dc257700 EVENT_LOG_v1 {"time_micros": 1521495602418474, "job": 101, "event": "flush_started", "num_memtables": 1, "num_entries": 7009, "num_deletes": 0, "memory_usage": 1018104, "flush_reason": "Write Buffer Full"}
2018/03/19-14:40:02.455084 7f11dc257700 EVENT_LOG_v1 {"time_micros": 1521495602455079, "job": 102, "event": "flush_started", "num_memtables": 1, "num_entries": 7009, "num_deletes": 0, "memory_usage": 1018048, "flush_reason": "Write Buffer Full"}
2018/03/19-14:40:02.492293 7f11dc257700 EVENT_LOG_v1 {"time_micros": 1521495602492288, "job": 104, "event": "flush_started", "num_memtables": 1, "num_entries": 7007, "num_deletes": 0, "memory_usage": 1018056, "flush_reason": "Write Buffer Full"}
2018/03/19-14:40:02.528720 7f11dc257700 EVENT_LOG_v1 {"time_micros": 1521495602528715, "job": 105, "event": "flush_started", "num_memtables": 1, "num_entries": 7006, "num_deletes": 0, "memory_usage": 1018104, "flush_reason": "Write Buffer Full"}
2018/03/19-14:40:02.566255 7f11dc257700 EVENT_LOG_v1 {"time_micros": 1521495602566238, "job": 107, "event": "flush_started", "num_memtables": 1, "num_entries": 7009, "num_deletes": 0, "memory_usage": 1018112, "flush_reason": "Write Buffer Full"}
Closes https://github.com/facebook/rocksdb/pull/3627

Differential Revision: D7328772

Pulled By: miasantreble

fbshipit-source-id: 67c94065fbdd36930f09930aad0aaa6d2c152bb8
main
Zhongyi Xie 7 years ago committed by Facebook Github Bot
parent 82137f0ce8
commit 1cbc96d236
  1. 2
      db/column_family.cc
  2. 9
      db/db_impl.cc
  3. 6
      db/db_impl.h
  4. 15
      db/db_impl_compaction_flush.cc
  5. 11
      db/db_impl_write.cc
  6. 12
      db/flush_job.cc
  7. 6
      include/rocksdb/listener.h

@ -386,7 +386,7 @@ ColumnFamilyData::ColumnFamilyData(
next_(nullptr), next_(nullptr),
prev_(nullptr), prev_(nullptr),
log_number_(0), log_number_(0),
flush_reason_(FlushReason::kUnknown), flush_reason_(FlushReason::kOthers),
column_family_set_(column_family_set), column_family_set_(column_family_set),
pending_flush_(false), pending_flush_(false),
pending_compaction_(false), pending_compaction_(false),

@ -2147,7 +2147,8 @@ Status DBImpl::DeleteFile(std::string name) {
if (status.ok()) { if (status.ok()) {
InstallSuperVersionAndScheduleWork( InstallSuperVersionAndScheduleWork(
cfd, &job_context.superversion_context, cfd, &job_context.superversion_context,
*cfd->GetLatestMutableCFOptions()); *cfd->GetLatestMutableCFOptions(),
FlushReason::kDeleteFiles);
} }
FindObsoleteFiles(&job_context, false); FindObsoleteFiles(&job_context, false);
} // lock released here } // lock released here
@ -2231,7 +2232,8 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
if (status.ok()) { if (status.ok()) {
InstallSuperVersionAndScheduleWork( InstallSuperVersionAndScheduleWork(
cfd, &job_context.superversion_context, cfd, &job_context.superversion_context,
*cfd->GetLatestMutableCFOptions()); *cfd->GetLatestMutableCFOptions(),
FlushReason::kDeleteFiles);
} }
for (auto* deleted_file : deleted_files) { for (auto* deleted_file : deleted_files) {
deleted_file->being_compacted = false; deleted_file->being_compacted = false;
@ -2860,7 +2862,8 @@ Status DBImpl::IngestExternalFile(
} }
if (status.ok()) { if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_context, InstallSuperVersionAndScheduleWork(cfd, &sv_context,
*mutable_cf_options); *mutable_cf_options,
FlushReason::kExternalFileIngestion);
} }
// Resume writes to the DB // Resume writes to the DB

@ -835,7 +835,8 @@ class DBImpl : public DB {
Status ScheduleFlushes(WriteContext* context); Status ScheduleFlushes(WriteContext* context);
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
FlushReason flush_reason = FlushReason::kOthers);
// Force current memtable contents to be flushed. // Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
@ -1361,7 +1362,8 @@ class DBImpl : public DB {
// state needs flush or compaction. // state needs flush or compaction.
void InstallSuperVersionAndScheduleWork( void InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context, ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options); const MutableCFOptions& mutable_cf_options,
FlushReason flush_reason = FlushReason::kOthers);
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
using DB::GetPropertiesOfAllTables; using DB::GetPropertiesOfAllTables;

@ -655,7 +655,7 @@ Status DBImpl::CompactFilesImpl(
if (status.ok()) { if (status.ok()) {
InstallSuperVersionAndScheduleWork( InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context, c->column_family_data(), &job_context->superversion_context,
*c->mutable_cf_options()); *c->mutable_cf_options(), FlushReason::kManualCompaction);
} }
c->ReleaseCompactionFiles(s); c->ReleaseCompactionFiles(s);
@ -891,7 +891,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.", ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
cfh->GetName().c_str()); cfh->GetName().c_str());
Status s = Status s =
FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualCompaction); FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] Manual flush finished, status: %s\n", "[%s] Manual flush finished, status: %s\n",
cfh->GetName().c_str(), s.ToString().c_str()); cfh->GetName().c_str(), s.ToString().c_str());
@ -1727,7 +1727,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&mutex_, directories_.GetDbDir()); &mutex_, directories_.GetDbDir());
InstallSuperVersionAndScheduleWork( InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context, c->column_family_data(), &job_context->superversion_context,
*c->mutable_cf_options()); *c->mutable_cf_options(), FlushReason::kAutoCompaction);
ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n", ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(), c->column_family_data()->GetName().c_str(),
c->num_input_files(0)); c->num_input_files(0));
@ -1774,7 +1774,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// Use latest MutableCFOptions // Use latest MutableCFOptions
InstallSuperVersionAndScheduleWork( InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context, c->column_family_data(), &job_context->superversion_context,
*c->mutable_cf_options()); *c->mutable_cf_options(), FlushReason::kAutoCompaction);
VersionStorageInfo::LevelSummaryStorage tmp; VersionStorageInfo::LevelSummaryStorage tmp;
c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(), c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
@ -1853,7 +1853,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (status.ok()) { if (status.ok()) {
InstallSuperVersionAndScheduleWork( InstallSuperVersionAndScheduleWork(
c->column_family_data(), &job_context->superversion_context, c->column_family_data(), &job_context->superversion_context,
*c->mutable_cf_options()); *c->mutable_cf_options(), FlushReason::kAutoCompaction);
} }
*made_progress = true; *made_progress = true;
} }
@ -2044,7 +2044,8 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
void DBImpl::InstallSuperVersionAndScheduleWork( void DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context, ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options) { const MutableCFOptions& mutable_cf_options,
FlushReason flush_reason) {
mutex_.AssertHeld(); mutex_.AssertHeld();
// Update max_total_in_memory_state_ // Update max_total_in_memory_state_
@ -2063,7 +2064,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
// Whenever we install new SuperVersion, we might need to issue new flushes or // Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions. // compactions.
SchedulePendingFlush(cfd, FlushReason::kSuperVersionChange); SchedulePendingFlush(cfd, flush_reason);
SchedulePendingCompaction(cfd); SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();

@ -1066,7 +1066,8 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
} }
} }
if (cfd_picked != nullptr) { if (cfd_picked != nullptr) {
status = SwitchMemtable(cfd_picked, write_context); status = SwitchMemtable(cfd_picked, write_context,
FlushReason::kWriteBufferFull);
if (status.ok()) { if (status.ok()) {
cfd_picked->imm()->FlushRequested(); cfd_picked->imm()->FlushRequested();
SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull); SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull);
@ -1167,7 +1168,7 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
Status DBImpl::ScheduleFlushes(WriteContext* context) { Status DBImpl::ScheduleFlushes(WriteContext* context) {
ColumnFamilyData* cfd; ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
auto status = SwitchMemtable(cfd, context); auto status = SwitchMemtable(cfd, context, FlushReason::kWriteBufferFull);
if (cfd->Unref()) { if (cfd->Unref()) {
delete cfd; delete cfd;
} }
@ -1196,7 +1197,8 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
// REQUIRES: mutex_ is held // REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue // REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
FlushReason flush_reason) {
mutex_.AssertHeld(); mutex_.AssertHeld();
WriteThread::Writer nonmem_w; WriteThread::Writer nonmem_w;
if (two_write_queues_) { if (two_write_queues_) {
@ -1364,7 +1366,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
new_mem->Ref(); new_mem->Ref();
cfd->SetMemtable(new_mem); cfd->SetMemtable(new_mem);
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
mutable_cf_options); mutable_cf_options,
flush_reason);
if (two_write_queues_) { if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w); nonmem_write_thread_.ExitUnbatched(&nonmem_w);
} }

@ -56,8 +56,8 @@ namespace rocksdb {
const char* GetFlushReasonString (FlushReason flush_reason) { const char* GetFlushReasonString (FlushReason flush_reason) {
switch (flush_reason) { switch (flush_reason) {
case FlushReason::kUnknown: case FlushReason::kOthers:
return "Unknown"; return "Other Reasons";
case FlushReason::kGetLiveFiles: case FlushReason::kGetLiveFiles:
return "Get Live Files"; return "Get Live Files";
case FlushReason::kShutDown: case FlushReason::kShutDown:
@ -72,8 +72,12 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
return "Write Buffer Full"; return "Write Buffer Full";
case FlushReason::kTest: case FlushReason::kTest:
return "Test"; return "Test";
case FlushReason::kSuperVersionChange: case FlushReason::kDeleteFiles:
return "SuperVersion Change"; return "Delete Files";
case FlushReason::kAutoCompaction:
return "Auto Compaction";
case FlushReason::kManualFlush:
return "Manual Flush";
default: default:
return "Invalid"; return "Invalid";
} }

@ -83,7 +83,7 @@ enum class CompactionReason {
}; };
enum class FlushReason : int { enum class FlushReason : int {
kUnknown = 0x00, kOthers = 0x00,
kGetLiveFiles = 0x01, kGetLiveFiles = 0x01,
kShutDown = 0x02, kShutDown = 0x02,
kExternalFileIngestion = 0x03, kExternalFileIngestion = 0x03,
@ -91,7 +91,9 @@ enum class FlushReason : int {
kWriteBufferManager = 0x05, kWriteBufferManager = 0x05,
kWriteBufferFull = 0x06, kWriteBufferFull = 0x06,
kTest = 0x07, kTest = 0x07,
kSuperVersionChange = 0x08, kDeleteFiles = 0x08,
kAutoCompaction = 0x09,
kManualFlush = 0x0a,
}; };
enum class BackgroundErrorReason { enum class BackgroundErrorReason {

Loading…
Cancel
Save