Column family logging

Summary:
Now that column families are involved, we need to add extra context to every log message. Log messages now start with "[column family name] log message".

Also added some logging that should be useful, such as a level summary after every flush (I often needed that when going through the logs).

Test Plan: make check + ran db_bench to confirm I'm happy with log output

Reviewers: dhruba, haobo, ljin, yhchiang, sdong

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D18303
main
Igor Canadi 11 years ago
parent 46b3076c91
commit ad3cd39ccd
  1. 89
      db/compaction_picker.cc
  2. 135
      db/db_impl.cc
  3. 10
      db/memtable_list.cc
  4. 21
      db/version_set.cc

@ -180,7 +180,8 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) {
int parent_index = -1; int parent_index = -1;
if (c->inputs_[0].empty()) { if (c->inputs_[0].empty()) {
Log(options_->info_log, Log(options_->info_log,
"ExpandWhileOverlapping() failure because zero input files"); "[%s] ExpandWhileOverlapping() failure because zero input files",
c->column_family_data()->GetName().c_str());
} }
if (c->inputs_[0].empty() || FilesInCompaction(c->inputs_[0]) || if (c->inputs_[0].empty() || FilesInCompaction(c->inputs_[0]) ||
(c->level() != c->output_level() && (c->level() != c->output_level() &&
@ -275,9 +276,10 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
if (expanded1.size() == c->inputs_[1].size() && if (expanded1.size() == c->inputs_[1].size() &&
!FilesInCompaction(expanded1)) { !FilesInCompaction(expanded1)) {
Log(options_->info_log, Log(options_->info_log,
"Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)" "[%s] Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu "
"\n", "bytes)\n",
(unsigned long)level, (unsigned long)(c->inputs_[0].size()), c->column_family_data()->GetName().c_str(), (unsigned long)level,
(unsigned long)(c->inputs_[0].size()),
(unsigned long)(c->inputs_[1].size()), (unsigned long)inputs0_size, (unsigned long)(c->inputs_[1].size()), (unsigned long)inputs0_size,
(unsigned long)inputs1_size, (unsigned long)(expanded0.size()), (unsigned long)inputs1_size, (unsigned long)(expanded0.size()),
(unsigned long)(expanded1.size()), (unsigned long)expanded0_size, (unsigned long)(expanded1.size()), (unsigned long)expanded0_size,
@ -345,7 +347,9 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
c->inputs_[0] = inputs; c->inputs_[0] = inputs;
if (ExpandWhileOverlapping(c) == false) { if (ExpandWhileOverlapping(c) == false) {
delete c; delete c;
Log(options_->info_log, "Could not compact due to expansion failure.\n"); Log(options_->info_log,
"[%s] Could not compact due to expansion failure.\n",
version->cfd_->GetName().c_str());
return nullptr; return nullptr;
} }
@ -515,10 +519,6 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
nextIndex = i; nextIndex = i;
} }
//if (i > Version::number_of_files_to_sort_) {
// Log(options_->info_log, "XXX Looking at index %d", i);
//}
// Do not pick this file if its parents at level+1 are being compacted. // Do not pick this file if its parents at level+1 are being compacted.
// Maybe we can avoid redoing this work in SetupOtherInputs // Maybe we can avoid redoing this work in SetupOtherInputs
int parent_index = -1; int parent_index = -1;
@ -553,19 +553,21 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
if ((version->files_[level].size() < if ((version->files_[level].size() <
(unsigned int)options_->level0_file_num_compaction_trigger)) { (unsigned int)options_->level0_file_num_compaction_trigger)) {
LogToBuffer(log_buffer, "Universal: nothing to do\n"); LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n",
version->cfd_->GetName().c_str());
return nullptr; return nullptr;
} }
Version::FileSummaryStorage tmp; Version::FileSummaryStorage tmp;
LogToBuffer(log_buffer, "Universal: candidate files(%zu): %s\n", LogToBuffer(log_buffer, "[%s] Universal: candidate files(%zu): %s\n",
version->files_[level].size(), version->cfd_->GetName().c_str(), version->files_[level].size(),
version->LevelFileSummary(&tmp, 0)); version->LevelFileSummary(&tmp, 0));
// Check for size amplification first. // Check for size amplification first.
Compaction* c; Compaction* c;
if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) != if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) !=
nullptr) { nullptr) {
LogToBuffer(log_buffer, "Universal: compacting for size amp\n"); LogToBuffer(log_buffer, "[%s] Universal: compacting for size amp\n",
version->cfd_->GetName().c_str());
} else { } else {
// Size amplification is within limits. Try reducing read // Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios. // amplification while maintaining file size ratios.
@ -573,7 +575,8 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX, if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX,
log_buffer)) != nullptr) { log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "Universal: compacting for size ratio\n"); LogToBuffer(log_buffer, "[%s] Universal: compacting for size ratio\n",
version->cfd_->GetName().c_str());
} else { } else {
// Size amplification and file size ratios are within configured limits. // Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force // If max read amplification is exceeding configured limits, then force
@ -583,7 +586,8 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
options_->level0_file_num_compaction_trigger; options_->level0_file_num_compaction_trigger;
if ((c = PickCompactionUniversalReadAmp( if ((c = PickCompactionUniversalReadAmp(
version, score, UINT_MAX, num_files, log_buffer)) != nullptr) { version, score, UINT_MAX, num_files, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "Universal: compacting for file num\n"); LogToBuffer(log_buffer, "[%s] Universal: compacting for file num\n",
version->cfd_->GetName().c_str());
} }
} }
} }
@ -671,9 +675,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
candidate_count = 1; candidate_count = 1;
break; break;
} }
LogToBuffer(log_buffer, LogToBuffer(
"Universal: file %lu[%d] being compacted, skipping", log_buffer, "[%s] Universal: file %lu[%d] being compacted, skipping",
(unsigned long)f->number, loop); version->cfd_->GetName().c_str(), (unsigned long)f->number, loop);
f = nullptr; f = nullptr;
} }
@ -681,8 +685,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// first candidate to be compacted. // first candidate to be compacted.
uint64_t candidate_size = f != nullptr? f->file_size : 0; uint64_t candidate_size = f != nullptr? f->file_size : 0;
if (f != nullptr) { if (f != nullptr) {
LogToBuffer(log_buffer, "Universal: Possible candidate file %lu[%d].", LogToBuffer(
(unsigned long)f->number, loop); log_buffer, "[%s] Universal: Possible candidate file %lu[%d].",
version->cfd_->GetName().c_str(), (unsigned long)f->number, loop);
} }
// Check if the suceeding files need compaction. // Check if the suceeding files need compaction.
@ -733,9 +738,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int index = file_by_time[i]; int index = file_by_time[i];
FileMetaData* f = version->files_[level][index]; FileMetaData* f = version->files_[level][index];
LogToBuffer(log_buffer, LogToBuffer(log_buffer,
"Universal: Skipping file %lu[%d] with size %lu %d\n", "[%s] Universal: Skipping file %lu[%d] with size %lu %d\n",
(unsigned long)f->number, i, (unsigned long)f->file_size, version->cfd_->GetName().c_str(), (unsigned long)f->number,
f->being_compacted); i, (unsigned long)f->file_size, f->being_compacted);
} }
} }
} }
@ -769,8 +774,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int index = file_by_time[i]; int index = file_by_time[i];
FileMetaData* f = c->input_version_->files_[level][index]; FileMetaData* f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f); c->inputs_[0].push_back(f);
LogToBuffer(log_buffer, "Universal: Picking file %lu[%d] with size %lu\n", LogToBuffer(log_buffer,
(unsigned long)f->number, i, (unsigned long)f->file_size); "[%s] Universal: Picking file %lu[%d] with size %lu\n",
version->cfd_->GetName().c_str(), (unsigned long)f->number, i,
(unsigned long)f->file_size);
} }
return c; return c;
} }
@ -806,17 +813,19 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
start_index = loop; // Consider this as the first candidate. start_index = loop; // Consider this as the first candidate.
break; break;
} }
LogToBuffer(log_buffer, "Universal: skipping file %lu[%d] compacted %s", LogToBuffer(log_buffer,
(unsigned long)f->number, loop, "[%s] Universal: skipping file %lu[%d] compacted %s",
" cannot be a candidate to reduce size amp.\n"); version->cfd_->GetName().c_str(), (unsigned long)f->number,
loop, " cannot be a candidate to reduce size amp.\n");
f = nullptr; f = nullptr;
} }
if (f == nullptr) { if (f == nullptr) {
return nullptr; // no candidate files return nullptr; // no candidate files
} }
LogToBuffer(log_buffer, "Universal: First candidate file %lu[%d] %s", LogToBuffer(log_buffer, "[%s] Universal: First candidate file %lu[%d] %s",
(unsigned long)f->number, start_index, " to reduce size amp.\n"); version->cfd_->GetName().c_str(), (unsigned long)f->number,
start_index, " to reduce size amp.\n");
// keep adding up all the remaining files // keep adding up all the remaining files
for (unsigned int loop = start_index; loop < file_by_time.size() - 1; for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
@ -825,8 +834,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
f = version->files_[level][index]; f = version->files_[level][index];
if (f->being_compacted) { if (f->being_compacted) {
LogToBuffer( LogToBuffer(
log_buffer, "Universal: Possible candidate file %lu[%d] %s.", log_buffer, "[%s] Universal: Possible candidate file %lu[%d] %s.",
(unsigned long)f->number, loop, version->cfd_->GetName().c_str(), (unsigned long)f->number, loop,
" is already being compacted. No size amp reduction possible.\n"); " is already being compacted. No size amp reduction possible.\n");
return nullptr; return nullptr;
} }
@ -843,17 +852,18 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
// size amplification = percentage of additional size // size amplification = percentage of additional size
if (candidate_size * 100 < ratio * earliest_file_size) { if (candidate_size * 100 < ratio * earliest_file_size) {
LogToBuffer(log_buffer, LogToBuffer(
"Universal: size amp not needed. newer-files-total-size %lu " log_buffer,
"[%s] Universal: size amp not needed. newer-files-total-size %lu "
"earliest-file-size %lu", "earliest-file-size %lu",
(unsigned long)candidate_size, version->cfd_->GetName().c_str(), (unsigned long)candidate_size,
(unsigned long)earliest_file_size); (unsigned long)earliest_file_size);
return nullptr; return nullptr;
} else { } else {
LogToBuffer(log_buffer, LogToBuffer(log_buffer,
"Universal: size amp needed. newer-files-total-size %lu " "[%s] Universal: size amp needed. newer-files-total-size %lu "
"earliest-file-size %lu", "earliest-file-size %lu",
(unsigned long)candidate_size, version->cfd_->GetName().c_str(), (unsigned long)candidate_size,
(unsigned long)earliest_file_size); (unsigned long)earliest_file_size);
} }
assert(start_index >= 0 && start_index < file_by_time.size() - 1); assert(start_index >= 0 && start_index < file_by_time.size() - 1);
@ -869,8 +879,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
f = c->input_version_->files_[level][index]; f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f); c->inputs_[0].push_back(f);
LogToBuffer(log_buffer, LogToBuffer(log_buffer,
"Universal: size amp picking file %lu[%d] with size %lu", "[%s] Universal: size amp picking file %lu[%d] with size %lu",
(unsigned long)f->number, index, (unsigned long)f->file_size); version->cfd_->GetName().c_str(), (unsigned long)f->number,
index, (unsigned long)f->file_size);
} }
return c; return c;
} }

@ -1343,8 +1343,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable = const SequenceNumber earliest_seqno_in_memtable =
mem->GetFirstSequenceNumber(); mem->GetFirstSequenceNumber();
Log(options_.info_log, "Level-0 table #%lu: started", Log(options_.info_log, "[%s] Level-0 table #%lu: started",
(unsigned long) meta.number); cfd->GetName().c_str(), (unsigned long)meta.number);
Status s; Status s;
{ {
@ -1357,10 +1357,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
mutex_.Lock(); mutex_.Lock();
} }
Log(options_.info_log, "Level-0 table #%lu: %lu bytes %s", Log(options_.info_log, "[%s] Level-0 table #%lu: %lu bytes %s",
(unsigned long) meta.number, cfd->GetName().c_str(), (unsigned long)meta.number,
(unsigned long) meta.file_size, (unsigned long)meta.file_size, s.ToString().c_str());
s.ToString().c_str());
delete iter; delete iter;
pending_outputs_.erase(meta.number); pending_outputs_.erase(meta.number);
@ -1404,15 +1403,14 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
log_buffer->FlushBufferToLog(); log_buffer->FlushBufferToLog();
std::vector<Iterator*> memtables; std::vector<Iterator*> memtables;
for (MemTable* m : mems) { for (MemTable* m : mems) {
Log(options_.info_log, Log(options_.info_log, "[%s] Flushing memtable with next log file: %lu\n",
"[CF %u] Flushing memtable with next log file: %lu\n", cfd->GetID(), cfd->GetName().c_str(), (unsigned long)m->GetNextLogNumber());
(unsigned long)m->GetNextLogNumber());
memtables.push_back(m->NewIterator()); memtables.push_back(m->NewIterator());
} }
Iterator* iter = NewMergingIterator(&cfd->internal_comparator(), Iterator* iter = NewMergingIterator(&cfd->internal_comparator(),
&memtables[0], memtables.size()); &memtables[0], memtables.size());
Log(options_.info_log, "Level-0 flush table #%lu: started", Log(options_.info_log, "[%s] Level-0 flush table #%lu: started",
(unsigned long)meta.number); cfd->GetName().c_str(), (unsigned long)meta.number);
s = BuildTable(dbname_, env_, *cfd->options(), storage_options_, s = BuildTable(dbname_, env_, *cfd->options(), storage_options_,
cfd->table_cache(), iter, &meta, cfd->internal_comparator(), cfd->table_cache(), iter, &meta, cfd->internal_comparator(),
@ -1420,10 +1418,13 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
GetCompressionFlush(*cfd->options())); GetCompressionFlush(*cfd->options()));
LogFlush(options_.info_log); LogFlush(options_.info_log);
delete iter; delete iter;
Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s", Log(options_.info_log, "[%s] Level-0 flush table #%lu: %lu bytes %s",
(unsigned long) meta.number, cfd->GetName().c_str(), (unsigned long)meta.number,
(unsigned long) meta.file_size, (unsigned long)meta.file_size, s.ToString().c_str());
s.ToString().c_str());
Version::LevelSummaryStorage tmp;
Log(options_.info_log, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
cfd->current()->LevelSummary(&tmp));
if (!options_.disableDataSync) { if (!options_.disableDataSync) {
db_directory_->Fsync(); db_directory_->Fsync();
} }
@ -1483,7 +1484,8 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
autovector<MemTable*> mems; autovector<MemTable*> mems;
cfd->imm()->PickMemtablesToFlush(&mems); cfd->imm()->PickMemtablesToFlush(&mems);
if (mems.empty()) { if (mems.empty()) {
LogToBuffer(log_buffer, "Nothing in memstore to flush"); LogToBuffer(log_buffer, "[%s] Nothing in memtable to flush",
cfd->GetName().c_str());
return Status::OK(); return Status::OK();
} }
@ -1644,7 +1646,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
Status status; Status status;
if (to_level < level) { if (to_level < level) {
Log(options_.info_log, "Before refitting:\n%s", Log(options_.info_log, "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
cfd->current()->DebugString().data()); cfd->current()->DebugString().data());
VersionEdit edit; VersionEdit edit;
@ -1654,18 +1656,19 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest, edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno); f->smallest_seqno, f->largest_seqno);
} }
Log(options_.info_log, "Apply version edit:\n%s", Log(options_.info_log, "[%s] Apply version edit:\n%s",
edit.DebugString().data()); cfd->GetName().c_str(), edit.DebugString().data());
status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get()); status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
superversion_to_free = cfd->InstallSuperVersion(new_superversion, &mutex_); superversion_to_free = cfd->InstallSuperVersion(new_superversion, &mutex_);
new_superversion = nullptr; new_superversion = nullptr;
Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data()); Log(options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(),
status.ToString().data());
if (status.ok()) { if (status.ok()) {
Log(options_.info_log, "After refitting:\n%s", Log(options_.info_log, "[%s] After refitting:\n%s",
cfd->current()->DebugString().data()); cfd->GetName().c_str(), cfd->current()->DebugString().data());
} }
} }
@ -1752,12 +1755,14 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
++bg_manual_only_; ++bg_manual_only_;
while (bg_compaction_scheduled_ > 0) { while (bg_compaction_scheduled_ > 0) {
Log(options_.info_log, Log(options_.info_log,
"Manual compaction waiting for all other scheduled background " "[%s] Manual compaction waiting for all other scheduled background "
"compactions to finish"); "compactions to finish",
cfd->GetName().c_str());
bg_cv_.Wait(); bg_cv_.Wait();
} }
Log(options_.info_log, "Manual compaction starting"); Log(options_.info_log, "[%s] Manual compaction starting",
cfd->GetName().c_str());
while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) { while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
assert(bg_manual_only_ > 0); assert(bg_manual_only_ > 0);
@ -1874,8 +1879,9 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
LogToBuffer( LogToBuffer(
log_buffer, log_buffer,
"BackgroundCallFlush doing FlushMemTableToOutputFile with column " "BackgroundCallFlush doing FlushMemTableToOutputFile with column "
"family %u, flush slots available %d", "family [%s], flush slots available %d",
cfd->GetID(), options_.max_background_flushes - bg_flush_scheduled_); cfd->GetName().c_str(),
options_.max_background_flushes - bg_flush_scheduled_);
flush_status = FlushMemTableToOutputFile(cfd, madeProgress, flush_status = FlushMemTableToOutputFile(cfd, madeProgress,
deletion_state, log_buffer); deletion_state, log_buffer);
} }
@ -1963,8 +1969,6 @@ void DBImpl::BackgroundCallCompaction() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_.info_log.get()); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_.info_log.get());
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
// Log(options_.info_log, "XXX BG Thread %llx process new work item",
// pthread_self());
assert(bg_compaction_scheduled_); assert(bg_compaction_scheduled_);
Status s; Status s;
if (!shutting_down_.Acquire_Load()) { if (!shutting_down_.Acquire_Load()) {
@ -2086,11 +2090,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (!c) { if (!c) {
m->done = true; m->done = true;
} }
LogToBuffer( LogToBuffer(log_buffer,
log_buffer, "[%s] Manual compaction from level-%d to level-%d from %s .. "
"Manual compaction from level-%d to level-%d from %s .. %s; will stop " "%s; will stop at %s\n",
"at %s\n", m->cfd->GetName().c_str(), m->input_level, m->output_level,
m->input_level, m->output_level,
(m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"), (m->end ? m->end->DebugString().c_str() : "(end)"),
((m->done || manual_end == nullptr) ((m->done || manual_end == nullptr)
@ -2128,10 +2131,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
InstallSuperVersion(c->column_family_data(), deletion_state); InstallSuperVersion(c->column_family_data(), deletion_state);
Version::LevelSummaryStorage tmp; Version::LevelSummaryStorage tmp;
LogToBuffer(log_buffer, "Moved #%lld to level-%d %lld bytes %s: %s\n", LogToBuffer(log_buffer, "[%s] Moved #%lld to level-%d %lld bytes %s: %s\n",
c->column_family_data()->GetName().c_str(),
static_cast<unsigned long long>(f->number), c->level() + 1, static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size), static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), c->input_version()->LevelSummary(&tmp)); status.ToString().c_str(),
c->input_version()->LevelSummary(&tmp));
c->ReleaseCompactionFiles(status); c->ReleaseCompactionFiles(status);
*madeProgress = true; *madeProgress = true;
} else { } else {
@ -2235,7 +2240,6 @@ void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) {
mutex_.AssertHeld(); mutex_.AssertHeld();
for (const auto file_number : compact->allocated_file_numbers) { for (const auto file_number : compact->allocated_file_numbers) {
pending_outputs_.erase(file_number); pending_outputs_.erase(file_number);
// Log(options_.info_log, "XXX releasing unused file num %d", file_number);
} }
} }
@ -2334,11 +2338,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
s = iter->status(); s = iter->status();
delete iter; delete iter;
if (s.ok()) { if (s.ok()) {
Log(options_.info_log, Log(options_.info_log, "[%s] Generated table #%lu: %lu keys, %lu bytes",
"Generated table #%lu: %lu keys, %lu bytes", cfd->GetName().c_str(), (unsigned long)output_number,
(unsigned long) output_number, (unsigned long)current_entries, (unsigned long)current_bytes);
(unsigned long) current_entries,
(unsigned long) current_bytes);
} }
} }
return s; return s;
@ -2354,15 +2356,16 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact,
// This ensures that a concurrent compaction did not erroneously // This ensures that a concurrent compaction did not erroneously
// pick the same files to compact. // pick the same files to compact.
if (!versions_->VerifyCompactionFileConsistency(compact->compaction)) { if (!versions_->VerifyCompactionFileConsistency(compact->compaction)) {
Log(options_.info_log, "Compaction %d@%d + %d@%d files aborted", Log(options_.info_log, "[%s] Compaction %d@%d + %d@%d files aborted",
compact->compaction->num_input_files(0), compact->compaction->column_family_data()->GetName().c_str(),
compact->compaction->level(), compact->compaction->num_input_files(0), compact->compaction->level(),
compact->compaction->num_input_files(1), compact->compaction->num_input_files(1),
compact->compaction->output_level()); compact->compaction->output_level());
return Status::Corruption("Compaction input files inconsistent"); return Status::Corruption("Compaction input files inconsistent");
} }
LogToBuffer(log_buffer, "Compacted %d@%d + %d@%d files => %lld bytes", LogToBuffer(log_buffer, "[%s] Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->column_family_data()->GetName().c_str(),
compact->compaction->num_input_files(0), compact->compaction->num_input_files(0),
compact->compaction->level(), compact->compaction->level(),
compact->compaction->num_input_files(1), compact->compaction->num_input_files(1),
@ -2620,16 +2623,6 @@ Status DBImpl::ProcessKeyValueCompaction(
last_sequence_for_key = ikey.sequence; last_sequence_for_key = ikey.sequence;
visible_in_snapshot = visible; visible_in_snapshot = visible;
} }
#if 0
Log(options_.info_log,
" Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
"%d smallest_snapshot: %d level: %d bottommost %d",
ikey.user_key.ToString().c_str(),
(int)ikey.sequence, ikey.type, kTypeValue, drop,
compact->compaction->IsBaseLevelForKey(ikey.user_key),
(int)last_sequence_for_key, (int)earliest_snapshot,
compact->compaction->level(), bottommost_level);
#endif
if (!drop) { if (!drop) {
// We may write a single key (e.g.: for Put/Delete or successful merge). // We may write a single key (e.g.: for Put/Delete or successful merge).
@ -2801,14 +2794,15 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
ColumnFamilyData* cfd = compact->compaction->column_family_data(); ColumnFamilyData* cfd = compact->compaction->column_family_data();
LogToBuffer( LogToBuffer(
log_buffer, log_buffer,
"[CF %u] Compacting %d@%d + %d@%d files, score %.2f slots available %d", "[%s] Compacting %d@%d + %d@%d files, score %.2f slots available %d",
cfd->GetID(), compact->compaction->num_input_files(0), cfd->GetName().c_str(), compact->compaction->num_input_files(0),
compact->compaction->level(), compact->compaction->num_input_files(1), compact->compaction->level(), compact->compaction->num_input_files(1),
compact->compaction->output_level(), compact->compaction->score(), compact->compaction->output_level(), compact->compaction->score(),
options_.max_background_compactions - bg_compaction_scheduled_); options_.max_background_compactions - bg_compaction_scheduled_);
char scratch[2345]; char scratch[2345];
compact->compaction->Summary(scratch, sizeof(scratch)); compact->compaction->Summary(scratch, sizeof(scratch));
LogToBuffer(log_buffer, "Compaction start summary: %s\n", scratch); LogToBuffer(log_buffer, "[%s] Compaction start summary: %s\n",
cfd->GetName().c_str(), scratch);
assert(cfd->current()->NumLevelFiles(compact->compaction->level()) > 0); assert(cfd->current()->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == nullptr); assert(compact->builder == nullptr);
@ -2886,8 +2880,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
} }
if (!ParseInternalKey(key, &ikey)) { if (!ParseInternalKey(key, &ikey)) {
// log error // log error
Log(options_.info_log, "Failed to parse key: %s", Log(options_.info_log, "[%s] Failed to parse key: %s",
key.ToString().c_str()); cfd->GetName().c_str(), key.ToString().c_str());
continue; continue;
} else { } else {
// If the prefix remains the same, keep buffering // If the prefix remains the same, keep buffering
@ -3068,10 +3062,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
Version::LevelSummaryStorage tmp; Version::LevelSummaryStorage tmp;
LogToBuffer( LogToBuffer(
log_buffer, log_buffer,
"compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) " "[%s] compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s\n", "write-amplify(%.1f) %s\n",
cfd->current()->LevelSummary(&tmp), cfd->GetName().c_str(), cfd->current()->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) / (stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) /
(double)stats.micros, (double)stats.micros,
compact->compaction->output_level(), stats.files_in_leveln, compact->compaction->output_level(), stats.files_in_leveln,
@ -3409,10 +3403,10 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
assert(cfd != nullptr); assert(cfd != nullptr);
delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_); delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_);
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
Log(options_.info_log, "Created column family \"%s\" (ID %u)", Log(options_.info_log, "Created column family [%s] (ID %u)",
column_family_name.c_str(), (unsigned)cfd->GetID()); column_family_name.c_str(), (unsigned)cfd->GetID());
} else { } else {
Log(options_.info_log, "Creating column family \"%s\" FAILED -- %s", Log(options_.info_log, "Creating column family [%s] FAILED -- %s",
column_family_name.c_str(), s.ToString().c_str()); column_family_name.c_str(), s.ToString().c_str());
} }
return s; return s;
@ -3878,7 +3872,8 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
// We have filled up the current memtable, but the previous // We have filled up the current memtable, but the previous
// ones are still being flushed, so we wait. // ones are still being flushed, so we wait.
DelayLoggingAndReset(); DelayLoggingAndReset();
Log(options_.info_log, "wait for memtable flush...\n"); Log(options_.info_log, "[%s] wait for memtable flush...\n",
cfd->GetName().c_str());
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
uint64_t stall; uint64_t stall;
{ {
@ -3895,7 +3890,8 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
cfd->options()->level0_stop_writes_trigger) { cfd->options()->level0_stop_writes_trigger) {
// There are too many level-0 files. // There are too many level-0 files.
DelayLoggingAndReset(); DelayLoggingAndReset();
Log(options_.info_log, "wait for fewer level0 files...\n"); Log(options_.info_log, "[%s] wait for fewer level0 files...\n",
cfd->GetName().c_str());
uint64_t stall; uint64_t stall;
{ {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, options_.statistics.get(),
@ -4019,9 +4015,8 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
} }
new_mem->Ref(); new_mem->Ref();
cfd->SetMemtable(new_mem); cfd->SetMemtable(new_mem);
Log(options_.info_log, Log(options_.info_log, "[%s] New memtable created with log file: #%lu\n",
"[CF %" PRIu32 "] New memtable created with log file: #%lu\n", cfd->GetName().c_str(), (unsigned long)logfile_number_);
cfd->GetID(), (unsigned long)logfile_number_);
force = false; // Do not force another compaction if have room force = false; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
// TODO(icanadi) delete outside of mutex // TODO(icanadi) delete outside of mutex

@ -184,8 +184,8 @@ Status MemTableList::InstallMemtableFlushResults(
break; break;
} }
LogToBuffer(log_buffer, "Level-0 commit table #%lu started", LogToBuffer(log_buffer, "[%s] Level-0 commit table #%lu started",
(unsigned long)m->file_number_); cfd->GetName().c_str(), (unsigned long)m->file_number_);
// this can release and reacquire the mutex. // this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory); s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory);
@ -199,8 +199,10 @@ Status MemTableList::InstallMemtableFlushResults(
uint64_t mem_id = 1; // how many memtables has been flushed. uint64_t mem_id = 1; // how many memtables has been flushed.
do { do {
if (s.ok()) { // commit new state if (s.ok()) { // commit new state
LogToBuffer(log_buffer, "Level-0 commit table #%lu: memtable #%lu done", LogToBuffer(log_buffer,
(unsigned long)m->file_number_, (unsigned long)mem_id); "[%s] Level-0 commit table #%lu: memtable #%lu done",
cfd->GetName().c_str(), (unsigned long)m->file_number_,
(unsigned long)mem_id);
current_->Remove(m); current_->Remove(m);
assert(m->file_number_ > 0); assert(m->file_number_ > 0);

@ -767,16 +767,11 @@ void Version::ComputeCompactionScore(
// If we are slowing down writes, then we better compact that first // If we are slowing down writes, then we better compact that first
if (numfiles >= cfd_->options()->level0_stop_writes_trigger) { if (numfiles >= cfd_->options()->level0_stop_writes_trigger) {
score = 1000000; score = 1000000;
// Log(options_->info_log, "XXX score l0 = 1000000000 max");
} else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) { } else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) {
score = 10000; score = 10000;
// Log(options_->info_log, "XXX score l0 = 1000000 medium");
} else { } else {
score = static_cast<double>(numfiles) / score = static_cast<double>(numfiles) /
cfd_->options()->level0_file_num_compaction_trigger; cfd_->options()->level0_file_num_compaction_trigger;
if (score >= 1) {
// Log(options_->info_log, "XXX score l0 = %d least", (int)score);
}
} }
} else { } else {
// Compute the ratio of current size to size limit. // Compute the ratio of current size to size limit.
@ -784,9 +779,6 @@ void Version::ComputeCompactionScore(
TotalFileSize(files_[level]) - size_being_compacted[level]; TotalFileSize(files_[level]) - size_being_compacted[level];
score = static_cast<double>(level_bytes) / score = static_cast<double>(level_bytes) /
cfd_->compaction_picker()->MaxBytesForLevel(level); cfd_->compaction_picker()->MaxBytesForLevel(level);
if (score > 1) {
// Log(options_->info_log, "XXX score l%d = %d ", level, (int)score);
}
if (max_score < score) { if (max_score < score) {
max_score = score; max_score = score;
max_score_level = level; max_score_level = level;
@ -1823,8 +1815,9 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
manifest_file_size_ = new_manifest_file_size; manifest_file_size_ = new_manifest_file_size;
prev_log_number_ = edit->prev_log_number_; prev_log_number_ = edit->prev_log_number_;
} else { } else {
Log(options_->info_log, "Error in committing version %lu", Log(options_->info_log, "Error in committing version %lu to [%s]",
(unsigned long)v->GetVersionNumber()); (unsigned long)v->GetVersionNumber(),
column_family_data->GetName().c_str());
delete v; delete v;
if (new_descriptor_log) { if (new_descriptor_log) {
descriptor_log_.reset(); descriptor_log_.reset();
@ -2162,8 +2155,8 @@ Status VersionSet::Recover(
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
Log(options_->info_log, Log(options_->info_log,
"Column family \"%s\", log number is %" PRIu64 "\n", "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
cfd->GetName().c_str(), cfd->GetLogNumber()); cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
} }
} }
@ -2708,7 +2701,9 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG #ifndef NDEBUG
Version* version = c->column_family_data()->current(); Version* version = c->column_family_data()->current();
if (c->input_version() != version) { if (c->input_version() != version) {
Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch"); Log(options_->info_log,
"[%s] VerifyCompactionFileConsistency version mismatch",
c->column_family_data()->GetName().c_str());
} }
// verify files in level // verify files in level

Loading…
Cancel
Save