From e7ea51a8e7d5674203a7e1c96a14604454dd14fa Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 12 Feb 2015 09:54:48 -0800 Subject: [PATCH] Introduce job_id for flush and compaction Summary: It would be good to assign background jobs their IDs. Two benefits: 1) makes LOGs more readable 2) I might use it in my EventLogger, which will try to make our LOG easier to read/query/visualize Test Plan: ran rocksdb, read the LOG Reviewers: sdong, rven, yhchiang Reviewed By: yhchiang Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D31617 --- db/column_family.cc | 4 +++- db/compaction_job.cc | 40 ++++++++++++++++---------------- db/compaction_job.h | 4 +++- db/compaction_job_test.cc | 2 +- db/db_filesnapshot.cc | 4 +++- db/db_impl.cc | 49 ++++++++++++++++++++++----------------- db/db_impl.h | 4 ++++ db/db_test.cc | 2 +- db/flush_job.cc | 14 +++++------ db/flush_job_test.cc | 4 ++-- db/forward_iterator.cc | 4 +++- db/job_context.h | 6 ++++- 12 files changed, 80 insertions(+), 57 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index ea3e617e2..9dcc64321 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -75,7 +75,9 @@ ColumnFamilyHandleImpl::ColumnFamilyHandleImpl( ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { if (cfd_ != nullptr) { - JobContext job_context; + // Job id == 0 means that this is not our background process, but rather + // user thread + JobContext job_context(0); mutex_->Lock(); if (cfd_->Unref()) { delete cfd_; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 775dcebec..f3b4834be 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -202,14 +202,15 @@ struct CompactionJob::CompactionState { }; CompactionJob::CompactionJob( - Compaction* compaction, const DBOptions& db_options, + int job_id, Compaction* compaction, const DBOptions& db_options, const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, VersionSet* versions, std::atomic* shutting_down, 
LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, SnapshotList* snapshots, bool is_snapshot_supported, std::shared_ptr table_cache, std::function yield_callback) - : compact_(new CompactionState(compaction)), + : job_id_(job_id), + compact_(new CompactionState(compaction)), compaction_stats_(1), db_options_(db_options), mutable_cf_options_(mutable_cf_options), @@ -235,8 +236,8 @@ void CompactionJob::Prepare() { ColumnFamilyData* cfd = compact_->compaction->column_family_data(); assert(cfd != nullptr); LogToBuffer( - log_buffer_, "[%s] Compacting %d@%d + %d@%d files, score %.2f", - cfd->GetName().c_str(), compact_->compaction->num_input_files(0), + log_buffer_, "[%s] [JOB %d] Compacting %d@%d + %d@%d files, score %.2f", + cfd->GetName().c_str(), job_id_, compact_->compaction->num_input_files(0), compact_->compaction->level(), compact_->compaction->num_input_files(1), compact_->compaction->output_level(), compact_->compaction->score()); char scratch[2345]; @@ -324,8 +325,8 @@ Status CompactionJob::Run() { if (!ParseInternalKey(key, &ikey)) { // log error Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, - "[%s] Failed to parse key: %s", - cfd->GetName().c_str(), key.ToString().c_str()); + "[%s] [JOB %d] Failed to parse key: %s", cfd->GetName().c_str(), + job_id_, key.ToString().c_str()); continue; } else { const SliceTransform* transformer = @@ -946,10 +947,11 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { s = iter->status(); delete iter; if (s.ok()) { - Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, - "[%s] Generated table #%" PRIu64 ": %" PRIu64 - " keys, %" PRIu64 " bytes", cfd->GetName().c_str(), - output_number, current_entries, current_bytes); + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 + " keys, %" PRIu64 " bytes", + cfd->GetName().c_str(), job_id_, output_number, current_entries, + current_bytes); } } return s; @@ 
-964,8 +966,8 @@ Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) { // pick the same files to compact_. if (!versions_->VerifyCompactionFileConsistency(compact_->compaction)) { Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, - "[%s] Compaction %d@%d + %d@%d files aborted", - compact_->compaction->column_family_data()->GetName().c_str(), + "[%s] [JOB %d] Compaction %d@%d + %d@%d files aborted", + compact_->compaction->column_family_data()->GetName().c_str(), job_id_, compact_->compaction->num_input_files(0), compact_->compaction->level(), compact_->compaction->num_input_files(1), compact_->compaction->output_level()); @@ -973,13 +975,11 @@ Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) { } Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "[%s] Compacted %d@%d + %d@%d files => %" PRIu64 " bytes", - compact_->compaction->column_family_data()->GetName().c_str(), - compact_->compaction->num_input_files(0), - compact_->compaction->level(), + "[%s] [JOB %d] Compacted %d@%d + %d@%d files => %" PRIu64 " bytes", + compact_->compaction->column_family_data()->GetName().c_str(), job_id_, + compact_->compaction->num_input_files(0), compact_->compaction->level(), compact_->compaction->num_input_files(1), - compact_->compaction->output_level(), - compact_->total_bytes); + compact_->compaction->output_level(), compact_->total_bytes); // Add compaction outputs compact_->compaction->AddInputDeletions(compact_->compaction->edit()); @@ -1043,9 +1043,9 @@ Status CompactionJob::OpenCompactionOutputFile() { if (!s.ok()) { Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, - "[%s] OpenCompactionOutputFiles for table #%" PRIu64 + "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64 " fails at NewWritableFile with status %s", - compact_->compaction->column_family_data()->GetName().c_str(), + compact_->compaction->column_family_data()->GetName().c_str(), job_id_, file_number, s.ToString().c_str()); 
LogFlush(db_options_.info_log); return s; diff --git a/db/compaction_job.h b/db/compaction_job.h index cc31ece87..2efcf86fc 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -53,7 +53,7 @@ class CompactionJob { // TODO(icanadi) make effort to reduce number of parameters here // IMPORTANT: mutable_cf_options needs to be alive while CompactionJob is // alive - CompactionJob(Compaction* compaction, const DBOptions& db_options, + CompactionJob(int job_id, Compaction* compaction, const DBOptions& db_options, const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, VersionSet* versions, std::atomic* shutting_down, LogBuffer* log_buffer, @@ -94,6 +94,8 @@ class CompactionJob { Status OpenCompactionOutputFile(); void CleanupCompaction(const Status& status); + int job_id_; + // CompactionJob state struct CompactionState; CompactionState* compact_; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 2a089dc57..50852fd7d 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -160,7 +160,7 @@ TEST(CompactionJobTest, Simple) { }; LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); mutex_.Lock(); - CompactionJob compaction_job(compaction.get(), db_options_, + CompactionJob compaction_job(0, compaction.get(), db_options_, *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, &snapshots, true, diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 4011b4652..b5d4866ae 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -45,7 +45,9 @@ Status DBImpl::DisableFileDeletions() { } Status DBImpl::EnableFileDeletions(bool force) { - JobContext job_context; + // Job id == 0 means that this is not our background process, but rather + // user thread + JobContext job_context(0); bool should_purge_files = false; { InstrumentedMutexLock l(&mutex_); diff --git a/db/db_impl.cc b/db/db_impl.cc index 
570928b1e..f51d53403 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -221,6 +221,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) options.env->NowMicros() + db_options_.delete_obsolete_files_period_micros), last_stats_dump_time_microsec_(0), + next_job_id_(1), flush_on_destroy_(false), env_options_(options), #ifndef ROCKSDB_LITE @@ -309,7 +310,7 @@ DBImpl::~DBImpl() { // result, all "live" files can get deleted by accident. However, corrupted // manifest is recoverable by RepairDB(). if (opened_successfully_) { - JobContext job_context; + JobContext job_context(next_job_id_.fetch_add(1)); FindObsoleteFiles(&job_context, true); // manifest number starting from 2 job_context.manifest_file_number = 1; @@ -650,7 +651,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { #ifdef ROCKSDB_LITE Status s = env_->DeleteFile(fname); Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, - "Delete %s type=%d #%" PRIu64 " -- %s\n", + "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", state.job_id, fname.c_str(), type, number, s.ToString().c_str()); #else // not ROCKSDB_LITE if (type == kLogFile && (db_options_.WAL_ttl_seconds > 0 || @@ -659,7 +660,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { } else { Status s = env_->DeleteFile(fname); Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, - "Delete %s type=%d #%" PRIu64 " -- %s\n", + "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", state.job_id, fname.c_str(), type, number, s.ToString().c_str()); } #endif // ROCKSDB_LITE @@ -675,12 +676,12 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { std::string full_path_to_delete = (db_options_.db_log_dir.empty() ? 
dbname_ : db_options_.db_log_dir) + "/" + to_delete; Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "Delete info log file %s\n", + "[JOB %d] Delete info log file %s\n", state.job_id, full_path_to_delete.c_str()); Status s = env_->DeleteFile(full_path_to_delete); if (!s.ok()) { - Log(InfoLogLevel::ERROR_LEVEL, - db_options_.info_log, "Delete info log file %s FAILED -- %s\n", + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, + "[JOB %d] Delete info log file %s FAILED -- %s\n", state.job_id, to_delete.c_str(), s.ToString().c_str()); } } @@ -693,7 +694,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { void DBImpl::DeleteObsoleteFiles() { mutex_.AssertHeld(); - JobContext job_context; + JobContext job_context(next_job_id_.fetch_add(1)); FindObsoleteFiles(&job_context, true); if (job_context.HaveSomethingToDelete()) { PurgeObsoleteFiles(job_context); @@ -1366,17 +1367,18 @@ Status DBImpl::CompactFilesImpl( // deletion compaction currently not allowed in CompactFiles. 
assert(!c->IsDeletionCompaction()); - JobContext job_context(true); + JobContext job_context(0, true); auto yield_callback = [&]() { return CallFlushDuringCompaction(c->column_family_data(), *c->mutable_cf_options(), &job_context, &log_buffer); }; CompactionJob compaction_job( - c.get(), db_options_, *c->mutable_cf_options(), env_options_, - versions_.get(), &shutting_down_, &log_buffer, directories_.GetDbDir(), - directories_.GetDataDir(c->GetOutputPathId()), stats_, &snapshots_, - is_snapshot_supported_, table_cache_, std::move(yield_callback)); + job_context.job_id, c.get(), db_options_, *c->mutable_cf_options(), + env_options_, versions_.get(), &shutting_down_, &log_buffer, + directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()), + stats_, &snapshots_, is_snapshot_supported_, table_cache_, + std::move(yield_callback)); compaction_job.Prepare(); mutex_.Unlock(); @@ -1395,7 +1397,9 @@ Status DBImpl::CompactFilesImpl( } else if (status.IsShutdownInProgress()) { // Ignore compaction errors found during shutting down } else { - Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Compaction error: %s", + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "[%s] [JOB %d] Compaction error: %s", + c->column_family_data()->GetName().c_str(), job_context.job_id, status.ToString().c_str()); if (db_options_.paranoid_checks && bg_error_.ok()) { bg_error_ = status; @@ -1910,7 +1914,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context, void DBImpl::BackgroundCallFlush() { bool madeProgress = false; - JobContext job_context(true); + JobContext job_context(next_job_id_.fetch_add(1), true); assert(bg_flush_scheduled_); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); @@ -1978,7 +1982,7 @@ void DBImpl::BackgroundCallFlush() { void DBImpl::BackgroundCallCompaction() { bool madeProgress = false; - JobContext job_context(true); + JobContext job_context(next_job_id_.fetch_add(1), true); MaybeDumpStats(); 
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); @@ -2250,10 +2254,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, log_buffer); }; CompactionJob compaction_job( - c.get(), db_options_, *c->mutable_cf_options(), env_options_, - versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), - directories_.GetDataDir(c->GetOutputPathId()), stats_, &snapshots_, - is_snapshot_supported_, table_cache_, std::move(yield_callback)); + job_context->job_id, c.get(), db_options_, *c->mutable_cf_options(), + env_options_, versions_.get(), &shutting_down_, log_buffer, + directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()), + stats_, &snapshots_, is_snapshot_supported_, table_cache_, + std::move(yield_callback)); compaction_job.Prepare(); mutex_.Unlock(); status = compaction_job.Run(); @@ -2361,7 +2366,9 @@ static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); if (state->super_version->Unref()) { - JobContext job_context; + // Job id == 0 means that this is not our background process, but rather + // user thread + JobContext job_context(0); state->mu->Lock(); state->super_version->Cleanup(); @@ -3525,7 +3532,7 @@ Status DBImpl::DeleteFile(std::string name) { FileMetaData* metadata; ColumnFamilyData* cfd; VersionEdit edit; - JobContext job_context(true); + JobContext job_context(next_job_id_.fetch_add(1), true); { InstrumentedMutexLock l(&mutex_); status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd); diff --git a/db/db_impl.h b/db/db_impl.h index 86402e817..8cf1a8b37 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -571,6 +571,10 @@ class DBImpl : public DB { // last time stats were dumped to LOG std::atomic last_stats_dump_time_microsec_; + // Each flush or compaction gets its own job id. this counter makes sure + // they're unique + std::atomic next_job_id_; + bool flush_on_destroy_; // Used when disableWAL is true. 
static const int KEEP_LOG_FILE_NUM = 1000; diff --git a/db/db_test.cc b/db/db_test.cc index fda54707b..04d81d564 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -10357,7 +10357,7 @@ TEST(DBTest, DontDeletePendingOutputs) { // Every time we write to a table file, call FOF/POF with full DB scan. This // will make sure our pending_outputs_ protection work correctly std::function purge_obsolete_files_function = [&]() { - JobContext job_context; + JobContext job_context(0); dbfull()->TEST_LockMutex(); dbfull()->FindObsoleteFiles(&job_context, true /*force*/); dbfull()->TEST_UnlockMutex(); diff --git a/db/flush_job.cc b/db/flush_job.cc index ca1d113db..fcacbf3bd 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -148,8 +148,8 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, Arena arena; for (MemTable* m : mems) { Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "[%s] Flushing memtable with next log file: %" PRIu64 "\n", - cfd_->GetName().c_str(), m->GetNextLogNumber()); + "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n", + cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber()); memtables.push_back(m->NewIterator(ro, &arena)); } { @@ -157,8 +157,8 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, NewMergingIterator(&cfd_->internal_comparator(), &memtables[0], static_cast(memtables.size()), &arena)); Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "[%s] Level-0 flush table #%" PRIu64 ": started", - cfd_->GetName().c_str(), meta.fd.GetNumber()); + "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", + cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber()); s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_, cfd_->table_cache(), iter.get(), &meta, @@ -168,9 +168,9 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, LogFlush(db_options_.info_log); } Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " 
bytes %s", - cfd_->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), - s.ToString().c_str()); + "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s", + cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(), + meta.fd.GetFileSize(), s.ToString().c_str()); if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { output_file_directory_->Fsync(); } diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index d3e824087..15dd91675 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -81,7 +81,7 @@ class FlushJobTest { }; TEST(FlushJobTest, Empty) { - JobContext job_context; + JobContext job_context(0); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), @@ -93,7 +93,7 @@ TEST(FlushJobTest, Empty) { } TEST(FlushJobTest, NonEmpty) { - JobContext job_context; + JobContext job_context(0); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions()); new_mem->Ref(); diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 93af3c2d4..b90823360 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -158,7 +158,9 @@ void ForwardIterator::Cleanup(bool release_sv) { if (release_sv) { if (sv_ != nullptr && sv_->Unref()) { - JobContext job_context; + // Job id == 0 means that this is not our background process, but rather + // user thread + JobContext job_context(0); db_->mutex_.Lock(); sv_->Cleanup(); db_->FindObsoleteFiles(&job_context, false, true); diff --git a/db/job_context.h b/db/job_context.h index d3aa9b215..befa974eb 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -36,6 +36,9 @@ struct JobContext { } }; + // Unique job id + int job_id; + // a list of all files that we'll consider deleting // (every once in a while this is filled up with all files // in the DB directory) @@ 
-67,7 +70,8 @@ struct JobContext { uint64_t min_pending_output = 0; - explicit JobContext(bool create_superversion = false) { + explicit JobContext(int _job_id, bool create_superversion = false) { + job_id = _job_id; manifest_file_number = 0; pending_manifest_file_number = 0; log_number = 0;