Introduce job_id for flush and compaction

Summary:
It would be good to assing background job their IDs. Two benefits:
1) makes LOGs more readable
2) I might use it in my EventLogger, which will try to make our LOG easier to read/query/visualize

Test Plan: ran rocksdb, read the LOG

Reviewers: sdong, rven, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D31617
main
Igor Canadi 10 years ago
parent 2d62e80511
commit e7ea51a8e7
  1. 4
      db/column_family.cc
  2. 40
      db/compaction_job.cc
  3. 4
      db/compaction_job.h
  4. 2
      db/compaction_job_test.cc
  5. 4
      db/db_filesnapshot.cc
  6. 49
      db/db_impl.cc
  7. 4
      db/db_impl.h
  8. 2
      db/db_test.cc
  9. 14
      db/flush_job.cc
  10. 4
      db/flush_job_test.cc
  11. 4
      db/forward_iterator.cc
  12. 6
      db/job_context.h

@ -75,7 +75,9 @@ ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
if (cfd_ != nullptr) {
JobContext job_context;
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
mutex_->Lock();
if (cfd_->Unref()) {
delete cfd_;

@ -202,14 +202,15 @@ struct CompactionJob::CompactionState {
};
CompactionJob::CompactionJob(
Compaction* compaction, const DBOptions& db_options,
int job_id, Compaction* compaction, const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
VersionSet* versions, std::atomic<bool>* shutting_down,
LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory,
Statistics* stats, SnapshotList* snapshots, bool is_snapshot_supported,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback)
: compact_(new CompactionState(compaction)),
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_stats_(1),
db_options_(db_options),
mutable_cf_options_(mutable_cf_options),
@ -235,8 +236,8 @@ void CompactionJob::Prepare() {
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
assert(cfd != nullptr);
LogToBuffer(
log_buffer_, "[%s] Compacting %d@%d + %d@%d files, score %.2f",
cfd->GetName().c_str(), compact_->compaction->num_input_files(0),
log_buffer_, "[%s] [JOB %d] Compacting %d@%d + %d@%d files, score %.2f",
cfd->GetName().c_str(), job_id_, compact_->compaction->num_input_files(0),
compact_->compaction->level(), compact_->compaction->num_input_files(1),
compact_->compaction->output_level(), compact_->compaction->score());
char scratch[2345];
@ -324,8 +325,8 @@ Status CompactionJob::Run() {
if (!ParseInternalKey(key, &ikey)) {
// log error
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"[%s] Failed to parse key: %s",
cfd->GetName().c_str(), key.ToString().c_str());
"[%s] [JOB %d] Failed to parse key: %s", cfd->GetName().c_str(),
job_id_, key.ToString().c_str());
continue;
} else {
const SliceTransform* transformer =
@ -946,10 +947,11 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
s = iter->status();
delete iter;
if (s.ok()) {
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[%s] Generated table #%" PRIu64 ": %" PRIu64
" keys, %" PRIu64 " bytes", cfd->GetName().c_str(),
output_number, current_entries, current_bytes);
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
" keys, %" PRIu64 " bytes",
cfd->GetName().c_str(), job_id_, output_number, current_entries,
current_bytes);
}
}
return s;
@ -964,8 +966,8 @@ Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) {
// pick the same files to compact_.
if (!versions_->VerifyCompactionFileConsistency(compact_->compaction)) {
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"[%s] Compaction %d@%d + %d@%d files aborted",
compact_->compaction->column_family_data()->GetName().c_str(),
"[%s] [JOB %d] Compaction %d@%d + %d@%d files aborted",
compact_->compaction->column_family_data()->GetName().c_str(), job_id_,
compact_->compaction->num_input_files(0), compact_->compaction->level(),
compact_->compaction->num_input_files(1),
compact_->compaction->output_level());
@ -973,13 +975,11 @@ Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) {
}
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] Compacted %d@%d + %d@%d files => %" PRIu64 " bytes",
compact_->compaction->column_family_data()->GetName().c_str(),
compact_->compaction->num_input_files(0),
compact_->compaction->level(),
"[%s] [JOB %d] Compacted %d@%d + %d@%d files => %" PRIu64 " bytes",
compact_->compaction->column_family_data()->GetName().c_str(), job_id_,
compact_->compaction->num_input_files(0), compact_->compaction->level(),
compact_->compaction->num_input_files(1),
compact_->compaction->output_level(),
compact_->total_bytes);
compact_->compaction->output_level(), compact_->total_bytes);
// Add compaction outputs
compact_->compaction->AddInputDeletions(compact_->compaction->edit());
@ -1043,9 +1043,9 @@ Status CompactionJob::OpenCompactionOutputFile() {
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"[%s] OpenCompactionOutputFiles for table #%" PRIu64
"[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
" fails at NewWritableFile with status %s",
compact_->compaction->column_family_data()->GetName().c_str(),
compact_->compaction->column_family_data()->GetName().c_str(), job_id_,
file_number, s.ToString().c_str());
LogFlush(db_options_.info_log);
return s;

@ -53,7 +53,7 @@ class CompactionJob {
// TODO(icanadi) make effort to reduce number of parameters here
// IMPORTANT: mutable_cf_options needs to be alive while CompactionJob is
// alive
CompactionJob(Compaction* compaction, const DBOptions& db_options,
CompactionJob(int job_id, Compaction* compaction, const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
@ -94,6 +94,8 @@ class CompactionJob {
Status OpenCompactionOutputFile();
void CleanupCompaction(const Status& status);
int job_id_;
// CompactionJob state
struct CompactionState;
CompactionState* compact_;

@ -160,7 +160,7 @@ TEST(CompactionJobTest, Simple) {
};
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
mutex_.Lock();
CompactionJob compaction_job(compaction.get(), db_options_,
CompactionJob compaction_job(0, compaction.get(), db_options_,
*cfd->GetLatestMutableCFOptions(), env_options_,
versions_.get(), &shutting_down_, &log_buffer,
nullptr, nullptr, nullptr, &snapshots, true,

@ -45,7 +45,9 @@ Status DBImpl::DisableFileDeletions() {
}
Status DBImpl::EnableFileDeletions(bool force) {
JobContext job_context;
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
bool should_purge_files = false;
{
InstrumentedMutexLock l(&mutex_);

@ -221,6 +221,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
options.env->NowMicros() +
db_options_.delete_obsolete_files_period_micros),
last_stats_dump_time_microsec_(0),
next_job_id_(1),
flush_on_destroy_(false),
env_options_(options),
#ifndef ROCKSDB_LITE
@ -309,7 +310,7 @@ DBImpl::~DBImpl() {
// result, all "live" files can get deleted by accident. However, corrupted
// manifest is recoverable by RepairDB().
if (opened_successfully_) {
JobContext job_context;
JobContext job_context(next_job_id_.fetch_add(1));
FindObsoleteFiles(&job_context, true);
// manifest number starting from 2
job_context.manifest_file_number = 1;
@ -650,7 +651,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
#ifdef ROCKSDB_LITE
Status s = env_->DeleteFile(fname);
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"Delete %s type=%d #%" PRIu64 " -- %s\n",
"[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", state.job_id,
fname.c_str(), type, number, s.ToString().c_str());
#else // not ROCKSDB_LITE
if (type == kLogFile && (db_options_.WAL_ttl_seconds > 0 ||
@ -659,7 +660,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
} else {
Status s = env_->DeleteFile(fname);
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"Delete %s type=%d #%" PRIu64 " -- %s\n",
"[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", state.job_id,
fname.c_str(), type, number, s.ToString().c_str());
}
#endif // ROCKSDB_LITE
@ -675,12 +676,12 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
std::string full_path_to_delete = (db_options_.db_log_dir.empty() ?
dbname_ : db_options_.db_log_dir) + "/" + to_delete;
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Delete info log file %s\n",
"[JOB %d] Delete info log file %s\n", state.job_id,
full_path_to_delete.c_str());
Status s = env_->DeleteFile(full_path_to_delete);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL,
db_options_.info_log, "Delete info log file %s FAILED -- %s\n",
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"[JOB %d] Delete info log file %s FAILED -- %s\n", state.job_id,
to_delete.c_str(), s.ToString().c_str());
}
}
@ -693,7 +694,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
void DBImpl::DeleteObsoleteFiles() {
mutex_.AssertHeld();
JobContext job_context;
JobContext job_context(next_job_id_.fetch_add(1));
FindObsoleteFiles(&job_context, true);
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
@ -1366,17 +1367,18 @@ Status DBImpl::CompactFilesImpl(
// deletion compaction currently not allowed in CompactFiles.
assert(!c->IsDeletionCompaction());
JobContext job_context(true);
JobContext job_context(0, true);
auto yield_callback = [&]() {
return CallFlushDuringCompaction(c->column_family_data(),
*c->mutable_cf_options(), &job_context,
&log_buffer);
};
CompactionJob compaction_job(
c.get(), db_options_, *c->mutable_cf_options(), env_options_,
versions_.get(), &shutting_down_, &log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->GetOutputPathId()), stats_, &snapshots_,
is_snapshot_supported_, table_cache_, std::move(yield_callback));
job_context.job_id, c.get(), db_options_, *c->mutable_cf_options(),
env_options_, versions_.get(), &shutting_down_, &log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
stats_, &snapshots_, is_snapshot_supported_, table_cache_,
std::move(yield_callback));
compaction_job.Prepare();
mutex_.Unlock();
@ -1395,7 +1397,9 @@ Status DBImpl::CompactFilesImpl(
} else if (status.IsShutdownInProgress()) {
// Ignore compaction errors found during shutting down
} else {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Compaction error: %s",
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Compaction error: %s",
c->column_family_data()->GetName().c_str(), job_context.job_id,
status.ToString().c_str());
if (db_options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
@ -1910,7 +1914,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context,
void DBImpl::BackgroundCallFlush() {
bool madeProgress = false;
JobContext job_context(true);
JobContext job_context(next_job_id_.fetch_add(1), true);
assert(bg_flush_scheduled_);
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
@ -1978,7 +1982,7 @@ void DBImpl::BackgroundCallFlush() {
void DBImpl::BackgroundCallCompaction() {
bool madeProgress = false;
JobContext job_context(true);
JobContext job_context(next_job_id_.fetch_add(1), true);
MaybeDumpStats();
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
@ -2250,10 +2254,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
log_buffer);
};
CompactionJob compaction_job(
c.get(), db_options_, *c->mutable_cf_options(), env_options_,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->GetOutputPathId()), stats_, &snapshots_,
is_snapshot_supported_, table_cache_, std::move(yield_callback));
job_context->job_id, c.get(), db_options_, *c->mutable_cf_options(),
env_options_, versions_.get(), &shutting_down_, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
stats_, &snapshots_, is_snapshot_supported_, table_cache_,
std::move(yield_callback));
compaction_job.Prepare();
mutex_.Unlock();
status = compaction_job.Run();
@ -2361,7 +2366,9 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
if (state->super_version->Unref()) {
JobContext job_context;
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
state->mu->Lock();
state->super_version->Cleanup();
@ -3525,7 +3532,7 @@ Status DBImpl::DeleteFile(std::string name) {
FileMetaData* metadata;
ColumnFamilyData* cfd;
VersionEdit edit;
JobContext job_context(true);
JobContext job_context(next_job_id_.fetch_add(1), true);
{
InstrumentedMutexLock l(&mutex_);
status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);

@ -571,6 +571,10 @@ class DBImpl : public DB {
// last time stats were dumped to LOG
std::atomic<uint64_t> last_stats_dump_time_microsec_;
// Each flush or compaction gets its own job id. this counter makes sure
// they're unique
std::atomic<int> next_job_id_;
bool flush_on_destroy_; // Used when disableWAL is true.
static const int KEEP_LOG_FILE_NUM = 1000;

@ -10357,7 +10357,7 @@ TEST(DBTest, DontDeletePendingOutputs) {
// Every time we write to a table file, call FOF/POF with full DB scan. This
// will make sure our pending_outputs_ protection work correctly
std::function<void()> purge_obsolete_files_function = [&]() {
JobContext job_context;
JobContext job_context(0);
dbfull()->TEST_LockMutex();
dbfull()->FindObsoleteFiles(&job_context, true /*force*/);
dbfull()->TEST_UnlockMutex();

@ -148,8 +148,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
Arena arena;
for (MemTable* m : mems) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), m->GetNextLogNumber());
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ro, &arena));
}
{
@ -157,8 +157,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
static_cast<int>(memtables.size()), &arena));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] Level-0 flush table #%" PRIu64 ": started",
cfd_->GetName().c_str(), meta.fd.GetNumber());
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber());
s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), &meta,
@ -168,9 +168,9 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
LogFlush(db_options_.info_log);
}
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd_->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
s.ToString().c_str());
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(),
meta.fd.GetFileSize(), s.ToString().c_str());
if (!db_options_.disableDataSync && output_file_directory_ != nullptr) {
output_file_directory_->Fsync();
}

@ -81,7 +81,7 @@ class FlushJobTest {
};
TEST(FlushJobTest, Empty) {
JobContext job_context;
JobContext job_context(0);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
@ -93,7 +93,7 @@ TEST(FlushJobTest, Empty) {
}
TEST(FlushJobTest, NonEmpty) {
JobContext job_context;
JobContext job_context(0);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions());
new_mem->Ref();

@ -158,7 +158,9 @@ void ForwardIterator::Cleanup(bool release_sv) {
if (release_sv) {
if (sv_ != nullptr && sv_->Unref()) {
JobContext job_context;
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(&job_context, false, true);

@ -36,6 +36,9 @@ struct JobContext {
}
};
// Unique job id
int job_id;
// a list of all files that we'll consider deleting
// (every once in a while this is filled up with all files
// in the DB directory)
@ -67,7 +70,8 @@ struct JobContext {
uint64_t min_pending_output = 0;
explicit JobContext(bool create_superversion = false) {
explicit JobContext(int _job_id, bool create_superversion = false) {
job_id = _job_id;
manifest_file_number = 0;
pending_manifest_file_number = 0;
log_number = 0;

Loading…
Cancel
Save