unfriend UniversalCompactionPicker, LevelCompactionPicker, and FIFOCompactionPicker from VersionSet

Summary: Remove the compaction pickers' reliance on friend access to Version. Direct reads of Version's private members (files_, compaction_score_, compaction_level_, files_by_size_, cfd_) in LevelCompactionPicker, UniversalCompactionPicker, and FIFOCompactionPicker are replaced with public accessors (LevelFiles(), CompactionScore(), CompactionScoreLevel(), FilesBySize(), cfd()), and the corresponding friend declarations are dropped from version_set.h. The private constant number_of_files_to_sort_ is promoted to the public constant kNumberFilesToSort.

Test Plan:
make release
I will run make all check for all stacked diffs before commit

Reviewers: sdong, yhchiang, rven, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D27573
main
Lei Jin 10 years ago
parent 5187d896b9
commit a28b3c4388
  1. 121
      db/compaction_picker.cc
  2. 4
      db/version_set.cc
  3. 29
      db/version_set.h

@ -372,12 +372,11 @@ Compaction* LevelCompactionPicker::PickCompaction(
//
// Find the compactions by size on all levels.
for (int i = 0; i < NumberLevels() - 1; i++) {
assert(i == 0 ||
version->compaction_score_[i] <= version->compaction_score_[i - 1]);
level = version->compaction_level_[i];
if ((version->compaction_score_[i] >= 1)) {
c = PickCompactionBySize(mutable_cf_options, version, level,
version->compaction_score_[i]);
double score = version->CompactionScore(i);
assert(i == 0 || score <= version->CompactionScore(i - 1));
level = version->CompactionScoreLevel(i);
if (score >= 1) {
c = PickCompactionBySize(mutable_cf_options, version, level, score);
if (c == nullptr || ExpandWhileOverlapping(c) == false) {
delete c;
c = nullptr;
@ -455,7 +454,8 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
// Pick the largest file in this level that is not already
// being compacted
std::vector<int>& file_size = version->files_by_size_[level];
const std::vector<int>& file_size = version->FilesBySize(level);
const std::vector<FileMetaData*>& level_files = version->LevelFiles(level);
// record the first file that is not yet compacted
int nextIndex = -1;
@ -463,13 +463,13 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
for (unsigned int i = version->NextCompactionIndex(level);
i < file_size.size(); i++) {
int index = file_size[i];
FileMetaData* f = version->files_[level][index];
FileMetaData* f = level_files[index];
// Check to verify files are arranged in descending compensated size.
assert((i == file_size.size() - 1) ||
(i >= Version::number_of_files_to_sort_ - 1) ||
(i >= Version::kNumberFilesToSort - 1) ||
(f->compensated_file_size >=
version->files_[level][file_size[i + 1]]->compensated_file_size));
level_files[file_size[i + 1]]->compensated_file_size));
// do not pick a file to compact if it is being compacted
// from n-1 level.
@ -512,26 +512,27 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
Compaction* UniversalCompactionPicker::PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) {
int level = 0;
double score = version->compaction_score_[0];
const int kLevel0 = 0;
double score = version->CompactionScore(kLevel0);
const std::vector<FileMetaData*>& level_files = version->LevelFiles(kLevel0);
if ((version->files_[level].size() <
if ((level_files.size() <
(unsigned int)mutable_cf_options.level0_file_num_compaction_trigger)) {
LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n",
version->cfd_->GetName().c_str());
version->cfd()->GetName().c_str());
return nullptr;
}
Version::FileSummaryStorage tmp;
LogToBuffer(log_buffer, 3072, "[%s] Universal: candidate files(%zu): %s\n",
version->cfd_->GetName().c_str(), version->files_[level].size(),
version->LevelFileSummary(&tmp, 0));
version->cfd()->GetName().c_str(), level_files.size(),
version->LevelFileSummary(&tmp, kLevel0));
// Check for size amplification first.
Compaction* c;
if ((c = PickCompactionUniversalSizeAmp(
mutable_cf_options, version, score, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for size amp\n",
version->cfd_->GetName().c_str());
version->cfd()->GetName().c_str());
} else {
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
@ -541,31 +542,31 @@ Compaction* UniversalCompactionPicker::PickCompaction(
mutable_cf_options, version, score, ratio,
UINT_MAX, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for size ratio\n",
version->cfd_->GetName().c_str());
version->cfd()->GetName().c_str());
} else {
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
unsigned int num_files = version->files_[level].size() -
unsigned int num_files = level_files.size() -
mutable_cf_options.level0_file_num_compaction_trigger;
if ((c = PickCompactionUniversalReadAmp(
mutable_cf_options, version, score, UINT_MAX,
num_files, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for file num -- %u\n",
version->cfd_->GetName().c_str(), num_files);
version->cfd()->GetName().c_str(), num_files);
}
}
}
if (c == nullptr) {
return nullptr;
}
assert(c->inputs_[0].size() > 1);
assert(c->inputs_[kLevel0].size() > 1);
// validate that all the chosen files are non overlapping in time
FileMetaData* newerfile __attribute__((unused)) = nullptr;
for (unsigned int i = 0; i < c->inputs_[0].size(); i++) {
FileMetaData* f = c->inputs_[0][i];
for (unsigned int i = 0; i < c->inputs_[kLevel0].size(); i++) {
FileMetaData* f = c->inputs_[kLevel0][i];
assert (f->smallest_seqno <= f->largest_seqno);
assert(newerfile == nullptr ||
newerfile->smallest_seqno > f->largest_seqno);
@ -573,23 +574,22 @@ Compaction* UniversalCompactionPicker::PickCompaction(
}
// Is the earliest file part of this compaction?
FileMetaData* last_file = c->input_version_->files_[level].back();
c->bottommost_level_ = c->inputs_[0].files.back() == last_file;
FileMetaData* last_file = level_files.back();
c->bottommost_level_ = c->inputs_[kLevel0].files.back() == last_file;
// update statistics
MeasureTime(ioptions_.statistics,
NUM_FILES_IN_SINGLE_COMPACTION, c->inputs_[0].size());
NUM_FILES_IN_SINGLE_COMPACTION, c->inputs_[kLevel0].size());
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
// remember this currently undergoing compaction
compactions_in_progress_[level].insert(c);
compactions_in_progress_[kLevel0].insert(c);
// Record whether this compaction includes all sst files.
// For now, it is only relevant in universal compaction mode.
c->is_full_compaction_ =
(c->inputs_[0].size() == c->input_version_->files_[0].size());
c->is_full_compaction_ = (c->inputs_[kLevel0].size() == level_files.size());
c->mutable_cf_options_ = mutable_cf_options;
return c;
@ -634,7 +634,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
const MutableCFOptions& mutable_cf_options, Version* version,
double score, unsigned int ratio,
unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) {
int level = 0;
const int kLevel0 = 0;
unsigned int min_merge_width =
ioptions_.compaction_options_universal.min_merge_width;
@ -642,7 +642,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
ioptions_.compaction_options_universal.max_merge_width;
// The files are sorted from newest first to oldest last.
const auto& files = version->files_[level];
const auto& files = version->LevelFiles(kLevel0);
FileMetaData* f = nullptr;
bool done = false;
@ -669,7 +669,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
}
LogToBuffer(log_buffer, "[%s] Universal: file %" PRIu64
"[%d] being compacted, skipping",
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop);
version->cfd()->GetName().c_str(), f->fd.GetNumber(), loop);
f = nullptr;
}
@ -681,7 +681,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf,
sizeof(file_num_buf));
LogToBuffer(log_buffer, "[%s] Universal: Possible candidate file %s[%d].",
version->cfd_->GetName().c_str(), file_num_buf, loop);
version->cfd()->GetName().c_str(), file_num_buf, loop);
}
// Check if the suceeding files need compaction.
@ -732,7 +732,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
LogToBuffer(log_buffer, "[%s] Universal: Skipping file %" PRIu64
"[%d] with size %" PRIu64
" (compensated size %" PRIu64 ") %d\n",
version->cfd_->GetName().c_str(), f->fd.GetNumber(), i,
version->cfd()->GetName().c_str(), f->fd.GetNumber(), i,
f->fd.GetFileSize(), f->compensated_file_size,
f->being_compacted);
}
@ -748,7 +748,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int ratio_to_compress =
ioptions_.compaction_options_universal.compression_size_percent;
if (ratio_to_compress >= 0) {
uint64_t total_size = version->NumLevelBytes(level);
uint64_t total_size = version->NumLevelBytes(kLevel0);
uint64_t older_file_size = 0;
for (unsigned int i = files.size() - 1;
i >= first_index_after; i--) {
@ -766,14 +766,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
}
uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
Compaction* c = new Compaction(
version, level, level, mutable_cf_options.MaxFileSizeForLevel(level),
LLONG_MAX, path_id, GetCompressionType(ioptions_, level,
Compaction* c = new Compaction(version, kLevel0, kLevel0,
mutable_cf_options.MaxFileSizeForLevel(kLevel0),
LLONG_MAX, path_id, GetCompressionType(ioptions_, kLevel0,
enable_compression));
c->score_ = score;
for (unsigned int i = start_index; i < first_index_after; i++) {
FileMetaData* f = c->input_version_->files_[level][i];
FileMetaData* f = files[i];
c->inputs_[0].files.push_back(f);
char file_num_buf[kFormatFileNumberBufSize];
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf,
@ -781,7 +781,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
LogToBuffer(log_buffer,
"[%s] Universal: Picking file %s[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")\n",
version->cfd_->GetName().c_str(), file_num_buf, i,
version->cfd()->GetName().c_str(), file_num_buf, i,
f->fd.GetFileSize(), f->compensated_file_size);
}
return c;
@ -796,14 +796,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
const MutableCFOptions& mutable_cf_options, Version* version,
double score, LogBuffer* log_buffer) {
int level = 0;
const int kLevel = 0;
// percentage flexibilty while reducing size amplification
uint64_t ratio = ioptions_.compaction_options_universal.
max_size_amplification_percent;
// The files are sorted from newest first to oldest last.
const auto& files = version->files_[level];
const auto& files = version->LevelFiles(kLevel);
unsigned int candidate_count = 0;
uint64_t candidate_size = 0;
@ -821,10 +821,11 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf,
sizeof(file_num_buf));
LogToBuffer(log_buffer, "[%s] Universal: skipping file %s[%d] compacted %s",
version->cfd_->GetName().c_str(), file_num_buf, loop,
version->cfd()->GetName().c_str(), file_num_buf, loop,
" cannot be a candidate to reduce size amp.\n");
f = nullptr;
}
if (f == nullptr) {
return nullptr; // no candidate files
}
@ -833,7 +834,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf,
sizeof(file_num_buf));
LogToBuffer(log_buffer, "[%s] Universal: First candidate file %s[%d] %s",
version->cfd_->GetName().c_str(), file_num_buf, start_index,
version->cfd()->GetName().c_str(), file_num_buf, start_index,
" to reduce size amp.\n");
// keep adding up all the remaining files
@ -845,7 +846,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
sizeof(file_num_buf));
LogToBuffer(
log_buffer, "[%s] Universal: Possible candidate file %s[%d] %s.",
version->cfd_->GetName().c_str(), file_num_buf, loop,
version->cfd()->GetName().c_str(), file_num_buf, loop,
" is already being compacted. No size amp reduction possible.\n");
return nullptr;
}
@ -865,14 +866,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
log_buffer,
"[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
"earliest-file-size %" PRIu64,
version->cfd_->GetName().c_str(), candidate_size, earliest_file_size);
version->cfd()->GetName().c_str(), candidate_size, earliest_file_size);
return nullptr;
} else {
LogToBuffer(
log_buffer,
"[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
"earliest-file-size %" PRIu64,
version->cfd_->GetName().c_str(), candidate_size, earliest_file_size);
version->cfd()->GetName().c_str(), candidate_size, earliest_file_size);
}
assert(start_index < files.size() - 1);
@ -886,17 +887,17 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
// create a compaction request
// We always compact all the files, so always compress.
Compaction* c =
new Compaction(version, level, level,
mutable_cf_options.MaxFileSizeForLevel(level),
LLONG_MAX, path_id, GetCompressionType(ioptions_, level));
new Compaction(version, kLevel, kLevel,
mutable_cf_options.MaxFileSizeForLevel(kLevel),
LLONG_MAX, path_id, GetCompressionType(ioptions_, kLevel));
c->score_ = score;
for (unsigned int loop = start_index; loop < files.size(); loop++) {
f = c->input_version_->files_[level][loop];
f = files[loop];
c->inputs_[0].files.push_back(f);
LogToBuffer(log_buffer,
"[%s] Universal: size amp picking file %" PRIu64 "[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
version->cfd_->GetName().c_str(),
version->cfd()->GetName().c_str(),
f->fd.GetNumber(), loop,
f->fd.GetFileSize(), f->compensated_file_size);
}
@ -907,18 +908,20 @@ Compaction* FIFOCompactionPicker::PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) {
assert(version->NumberLevels() == 1);
const int kLevel0 = 0;
const std::vector<FileMetaData*>& level_files = version->LevelFiles(kLevel0);
uint64_t total_size = 0;
for (const auto& file : version->files_[0]) {
for (const auto& file : level_files) {
total_size += file->compensated_file_size;
}
if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size ||
version->files_[0].size() == 0) {
level_files.size() == 0) {
// total size not exceeded
LogToBuffer(log_buffer,
"[%s] FIFO compaction: nothing to do. Total size %" PRIu64
", max size %" PRIu64 "\n",
version->cfd_->GetName().c_str(), total_size,
version->cfd()->GetName().c_str(), total_size,
ioptions_.compaction_options_fifo.max_table_files_size);
return nullptr;
}
@ -927,15 +930,14 @@ Compaction* FIFOCompactionPicker::PickCompaction(
LogToBuffer(log_buffer,
"[%s] FIFO compaction: Already executing compaction. No need "
"to run parallel compactions since compactions are very fast",
version->cfd_->GetName().c_str());
version->cfd()->GetName().c_str());
return nullptr;
}
Compaction* c = new Compaction(version, 0, 0, 0, 0, 0, kNoCompression, false,
true /* is deletion compaction */);
// delete old files (FIFO)
for (auto ritr = version->files_[0].rbegin();
ritr != version->files_[0].rend(); ++ritr) {
for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
auto f = *ritr;
total_size -= f->compensated_file_size;
c->inputs_[0].files.push_back(f);
@ -943,7 +945,8 @@ Compaction* FIFOCompactionPicker::PickCompaction(
AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
" with size %s for deletion",
version->cfd_->GetName().c_str(), f->fd.GetNumber(), tmp_fsize);
version->cfd()->GetName().c_str(), f->fd.GetNumber(),
tmp_fsize);
if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) {
break;
}

@ -991,8 +991,8 @@ void Version::UpdateFilesBySize() {
temp[i].file = files[i];
}
// sort the top number_of_files_to_sort_ based on file size
size_t num = Version::number_of_files_to_sort_;
// sort the top kNumberFilesToSort based on file size
size_t num = Version::kNumberFilesToSort;
if (num > temp.size()) {
num = temp.size();
}

@ -130,6 +130,12 @@ class Version {
// See field declaration
int MaxCompactionScoreLevel() const { return max_compaction_score_level_; }
// Return level number that has idx'th highest score
int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; }
// Return idx'th highest score
double CompactionScore(int idx) const { return compaction_score_[idx]; }
void GetOverlappingInputs(
int level,
const InternalKey* begin, // nullptr means before all keys
@ -251,6 +257,12 @@ class Version {
return files_[level];
}
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
const std::vector<int>& FilesBySize(int level) const {
assert(finalized_);
return files_by_size_[level];
}
// REQUIRES: lock is held
// Set the index that is used to offset into files_by_size_ to find
// the next compaction candidate file.
@ -263,14 +275,18 @@ class Version {
return next_file_to_compact_by_size_[level];
}
// Only the first few entries of files_by_size_ are sorted.
// There is no need to sort all the files because it is likely
// that on a running system, we need to look at only the first
// few largest files because a new version is created every few
// seconds/minutes (because of concurrent compactions).
static const size_t kNumberFilesToSort = 50;
private:
friend class VersionSet;
friend class DBImpl;
friend class CompactedDBImpl;
friend class ColumnFamilyData;
friend class LevelCompactionPicker;
friend class UniversalCompactionPicker;
friend class FIFOCompactionPicker;
friend class ForwardIterator;
friend class InternalStats;
@ -332,13 +348,6 @@ class Version {
// file that is not yet compacted
std::vector<int> next_file_to_compact_by_size_;
// Only the first few entries of files_by_size_ are sorted.
// There is no need to sort all the files because it is likely
// that on a running system, we need to look at only the first
// few largest files because a new version is created every few
// seconds/minutes (because of concurrent compactions).
static const size_t number_of_files_to_sort_ = 50;
// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().

Loading…
Cancel
Save