CompactionPicker

Summary:
This is a big one. This diff moves all the code related to picking compactions from VersionSet to new class CompactionPicker. Column families' compactions will be completely separate processes, so we need to have multiple CompactionPickers.

To make this easier to review, most of the code change is just copy/paste. There is also a small change not to use VersionSet::current_, but rather to take `Version* version` as a parameter. Most of the other code is exactly the same.

In future diffs, I will also make some improvements to CompactionPickers. I think the most important part will be encapsulating it better. Currently Version, VersionSet, Compaction and CompactionPicker are all friend classes, which makes it harder to change the implementation.

This diff depends on D15171, D15183, D15189 and D15201

Test Plan: `make check`

Reviewers: kailiu, sdong, dhruba, haobo

Reviewed By: kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15207
main
Igor Canadi 11 years ago
parent eae1804f29
commit c699c84af4
  1. 3
      db/compaction.h
  2. 854
      db/compaction_picker.cc
  3. 152
      db/compaction_picker.h
  4. 856
      db/version_set.cc
  5. 69
      db/version_set.h
  6. 4
      db/version_set_reduce_num_levels.cc

@ -76,6 +76,9 @@ class Compaction {
private:
friend class Version;
friend class VersionSet;
friend class CompactionPicker;
friend class UniversalCompactionPicker;
friend class LevelCompactionPicker;
Compaction(Version* input_version, int level, int out_level,
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,

@ -0,0 +1,854 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction_picker.h"
namespace rocksdb {
namespace {
uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
uint64_t sum = 0;
for (size_t i = 0; i < files.size() && files[i]; i++) {
sum += files[i]->file_size;
}
return sum;
}
} // anonymous namespace
CompactionPicker::CompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: compactions_in_progress_(options->num_levels),
options_(options),
num_levels_(options->num_levels),
icmp_(icmp) {
Init();
}
void CompactionPicker::ReduceNumberOfLevels(int new_levels) {
num_levels_ = new_levels;
Init();
}
void CompactionPicker::Init() {
max_file_size_.reset(new uint64_t[NumberLevels()]);
level_max_bytes_.reset(new uint64_t[NumberLevels()]);
int target_file_size_multiplier = options_->target_file_size_multiplier;
int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
for (int i = 0; i < NumberLevels(); i++) {
if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) {
max_file_size_[i] = ULLONG_MAX;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
} else if (i > 1) {
max_file_size_[i] = max_file_size_[i - 1] * target_file_size_multiplier;
level_max_bytes_[i] =
level_max_bytes_[i - 1] * max_bytes_multiplier *
options_->max_bytes_for_level_multiplier_additional[i - 1];
} else {
max_file_size_[i] = options_->target_file_size_base;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
}
}
}
CompactionPicker::~CompactionPicker() {}
void CompactionPicker::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
for (int level = 0; level < NumberLevels() - 1; level++) {
uint64_t total = 0;
for (auto c : compactions_in_progress_[level]) {
assert(c->level() == level);
for (int i = 0; i < c->num_input_files(0); i++) {
total += c->input(0,i)->file_size;
}
}
sizes[level] = total;
}
}
// Clear all files to indicate that they are not being compacted
// Delete this compaction from the list of running compactions.
void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
c->MarkFilesBeingCompacted(false);
compactions_in_progress_[c->level()].erase(c);
if (!status.ok()) {
c->ResetNextCompactionIndex();
}
}
uint64_t CompactionPicker::MaxFileSizeForLevel(int level) const {
assert(level >= 0);
assert(level < NumberLevels());
return max_file_size_[level];
}
uint64_t CompactionPicker::MaxGrandParentOverlapBytes(int level) {
uint64_t result = MaxFileSizeForLevel(level);
result *= options_->max_grandparent_overlap_factor;
return result;
}
double CompactionPicker::MaxBytesForLevel(int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
assert(level >= 0);
assert(level < NumberLevels());
return level_max_bytes_[level];
}
void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs,
InternalKey* smallest, InternalKey* largest) {
assert(!inputs.empty());
smallest->Clear();
largest->Clear();
for (size_t i = 0; i < inputs.size(); i++) {
FileMetaData* f = inputs[i];
if (i == 0) {
*smallest = f->smallest;
*largest = f->largest;
} else {
if (icmp_->Compare(f->smallest, *smallest) < 0) {
*smallest = f->smallest;
}
if (icmp_->Compare(f->largest, *largest) > 0) {
*largest = f->largest;
}
}
}
}
void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
InternalKey* smallest, InternalKey* largest) {
std::vector<FileMetaData*> all = inputs1;
all.insert(all.end(), inputs2.begin(), inputs2.end());
GetRange(all, smallest, largest);
}
// Add more files to the inputs on "level" to make sure that
// no newer version of a key is compacted to "level+1" while leaving an older
// version in a "level". Otherwise, any Get() will search "level" first,
// and will likely return an old/stale value for the key, since it always
// searches in increasing order of level to find the value. This could
// also scramble the order of merge operands. This function should be
// called any time a new Compaction is created, and its inputs_[0] are
// populated.
//
// Will set c to nullptr if it is impossible to apply this compaction.
void CompactionPicker::ExpandWhileOverlapping(Compaction* c) {
// If inputs are empty then there is nothing to expand.
if (!c || c->inputs_[0].empty()) {
return;
}
// GetOverlappingInputs will always do the right thing for level-0.
// So we don't need to do any expansion if level == 0.
if (c->level() == 0) {
return;
}
const int level = c->level();
InternalKey smallest, largest;
// Keep expanding c->inputs_[0] until we are sure that there is a
// "clean cut" boundary between the files in input and the surrounding files.
// This will ensure that no parts of a key are lost during compaction.
int hint_index = -1;
size_t old_size;
do {
old_size = c->inputs_[0].size();
GetRange(c->inputs_[0], &smallest, &largest);
c->inputs_[0].clear();
c->input_version_->GetOverlappingInputs(
level, &smallest, &largest, &c->inputs_[0], hint_index, &hint_index);
} while(c->inputs_[0].size() > old_size);
// Get the new range
GetRange(c->inputs_[0], &smallest, &largest);
// If, after the expansion, there are files that are already under
// compaction, then we must drop/cancel this compaction.
int parent_index = -1;
if (FilesInCompaction(c->inputs_[0]) ||
(c->level() != c->output_level() &&
ParentRangeInCompaction(c->input_version_, &smallest, &largest, level,
&parent_index))) {
c->inputs_[0].clear();
c->inputs_[1].clear();
delete c;
c = nullptr;
}
}
uint64_t CompactionPicker::ExpandedCompactionByteSizeLimit(int level) {
uint64_t result = MaxFileSizeForLevel(level);
result *= options_->expanded_compaction_factor;
return result;
}
// Returns true if any one of specified files are being compacted
bool CompactionPicker::FilesInCompaction(std::vector<FileMetaData*>& files) {
for (unsigned int i = 0; i < files.size(); i++) {
if (files[i]->being_compacted) {
return true;
}
}
return false;
}
// Returns true if any one of the parent files are being compacted
bool CompactionPicker::ParentRangeInCompaction(Version* version,
const InternalKey* smallest,
const InternalKey* largest,
int level, int* parent_index) {
std::vector<FileMetaData*> inputs;
assert(level + 1 < NumberLevels());
version->GetOverlappingInputs(level + 1, smallest, largest, &inputs,
*parent_index, parent_index);
return FilesInCompaction(inputs);
}
// Populates the set of inputs from "level+1" that overlap with "level".
// Will also attempt to expand "level" if that doesn't expand "level+1"
// or cause "level" to include a file for compaction that has an overlapping
// user-key with another file.
void CompactionPicker::SetupOtherInputs(Compaction* c) {
// If inputs are empty, then there is nothing to expand.
// If both input and output levels are the same, no need to consider
// files at level "level+1"
if (c->inputs_[0].empty() || c->level() == c->output_level()) {
return;
}
const int level = c->level();
InternalKey smallest, largest;
// Get the range one last time.
GetRange(c->inputs_[0], &smallest, &largest);
// Populate the set of next-level files (inputs_[1]) to include in compaction
c->input_version_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1], c->parent_index_,
&c->parent_index_);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
GetRange(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
// See if we can further grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up. We also choose NOT
// to expand if this would cause "level" to include some entries for some
// user key, while excluding other entries for the same user key. This
// can happen when one user key spans multiple files.
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
c->input_version_->GetOverlappingInputs(
level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr);
const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const uint64_t expanded0_size = TotalFileSize(expanded0);
uint64_t limit = ExpandedCompactionByteSizeLimit(level);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < limit &&
!FilesInCompaction(expanded0) &&
!c->input_version_->HasOverlappingUserKey(&expanded0, level)) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
c->input_version_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
&expanded1, c->parent_index_,
&c->parent_index_);
if (expanded1.size() == c->inputs_[1].size() &&
!FilesInCompaction(expanded1)) {
Log(options_->info_log,
"Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)"
"\n",
(unsigned long)level,
(unsigned long)(c->inputs_[0].size()),
(unsigned long)(c->inputs_[1].size()),
(unsigned long)inputs0_size,
(unsigned long)inputs1_size,
(unsigned long)(expanded0.size()),
(unsigned long)(expanded1.size()),
(unsigned long)expanded0_size,
(unsigned long)inputs1_size);
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
}
}
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if (level + 2 < NumberLevels()) {
c->input_version_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}
}
Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
int output_level,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
std::vector<FileMetaData*> inputs;
bool covering_the_whole_range = true;
// All files are 'overlapping' in universal style compaction.
// We have to compact the entire range in one shot.
if (options_->compaction_style == kCompactionStyleUniversal) {
begin = nullptr;
end = nullptr;
}
version->GetOverlappingInputs(input_level, begin, end, &inputs);
if (inputs.empty()) {
return nullptr;
}
// Avoid compacting too much in one shot in case the range is large.
// But we cannot do this for level-0 since level-0 files can overlap
// and we must not pick one file and drop another older file if the
// two files overlap.
if (input_level > 0) {
const uint64_t limit =
MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
uint64_t total = 0;
for (size_t i = 0; i + 1 < inputs.size(); ++i) {
uint64_t s = inputs[i]->file_size;
total += s;
if (total >= limit) {
**compaction_end = inputs[i + 1]->smallest;
covering_the_whole_range = false;
inputs.resize(i + 1);
break;
}
}
}
Compaction* c = new Compaction(version, input_level, output_level,
MaxFileSizeForLevel(output_level),
MaxGrandParentOverlapBytes(input_level));
c->inputs_[0] = inputs;
ExpandWhileOverlapping(c);
if (c == nullptr) {
Log(options_->info_log, "Could not compact due to expansion failure.\n");
return nullptr;
}
SetupOtherInputs(c);
if (covering_the_whole_range) {
*compaction_end = nullptr;
}
// These files that are to be manaully compacted do not trample
// upon other files because manual compactions are processed when
// the system has a max of 1 background compaction thread.
c->MarkFilesBeingCompacted(true);
// Is this compaction creating a file at the bottommost level
c->SetupBottomMostLevel(true);
return c;
}
Compaction* LevelCompactionPicker::PickCompaction(Version* version) {
Compaction* c = nullptr;
int level = -1;
// Compute the compactions needed. It is better to do it here
// and also in LogAndApply(), otherwise the values could be stale.
std::vector<uint64_t> size_being_compacted(NumberLevels() - 1);
SizeBeingCompacted(size_being_compacted);
version->Finalize(size_being_compacted);
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
//
// Find the compactions by size on all levels.
for (int i = 0; i < NumberLevels() - 1; i++) {
assert(i == 0 ||
version->compaction_score_[i] <= version->compaction_score_[i - 1]);
level = version->compaction_level_[i];
if ((version->compaction_score_[i] >= 1)) {
c = PickCompactionBySize(version, level, version->compaction_score_[i]);
ExpandWhileOverlapping(c);
if (c != nullptr) {
break;
}
}
}
// Find compactions needed by seeks
FileMetaData* f = version->file_to_compact_;
if (c == nullptr && f != nullptr && !f->being_compacted) {
level = version->file_to_compact_level_;
int parent_index = -1;
// Only allow one level 0 compaction at a time.
// Do not pick this file if its parents at level+1 are being compacted.
if (level != 0 || compactions_in_progress_[0].empty()) {
if (!ParentRangeInCompaction(version, &f->smallest, &f->largest, level,
&parent_index)) {
c = new Compaction(version, level, level + 1,
MaxFileSizeForLevel(level + 1),
MaxGrandParentOverlapBytes(level), true);
c->inputs_[0].push_back(f);
c->parent_index_ = parent_index;
c->input_version_->file_to_compact_ = nullptr;
ExpandWhileOverlapping(c);
}
}
}
if (c == nullptr) {
return nullptr;
}
// Two level 0 compaction won't run at the same time, so don't need to worry
// about files on level 0 being compacted.
if (level == 0) {
assert(compactions_in_progress_[0].empty());
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
c->inputs_[0].clear();
c->input_version_->GetOverlappingInputs(0, &smallest, &largest,
&c->inputs_[0]);
// If we include more L0 files in the same compaction run it can
// cause the 'smallest' and 'largest' key to get extended to a
// larger range. So, re-invoke GetRange to get the new key range
GetRange(c->inputs_[0], &smallest, &largest);
if (ParentRangeInCompaction(c->input_version_, &smallest, &largest, level,
&c->parent_index_)) {
delete c;
return nullptr;
}
assert(!c->inputs_[0].empty());
}
// Setup "level+1" files (inputs_[1])
SetupOtherInputs(c);
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
// Is this compaction creating a file at the bottommost level
c->SetupBottomMostLevel(false);
// remember this currently undergoing compaction
compactions_in_progress_[level].insert(c);
return c;
}
Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
int level,
double score) {
Compaction* c = nullptr;
// level 0 files are overlapping. So we cannot pick more
// than one concurrent compactions at this level. This
// could be made better by looking at key-ranges that are
// being compacted at level 0.
if (level == 0 && compactions_in_progress_[level].size() == 1) {
return nullptr;
}
assert(level >= 0);
assert(level + 1 < NumberLevels());
c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1),
MaxGrandParentOverlapBytes(level));
c->score_ = score;
// Pick the largest file in this level that is not already
// being compacted
std::vector<int>& file_size = c->input_version_->files_by_size_[level];
// record the first file that is not yet compacted
int nextIndex = -1;
for (unsigned int i = c->input_version_->next_file_to_compact_by_size_[level];
i < file_size.size(); i++) {
int index = file_size[i];
FileMetaData* f = c->input_version_->files_[level][index];
// check to verify files are arranged in descending size
assert((i == file_size.size() - 1) ||
(i >= Version::number_of_files_to_sort_ - 1) ||
(f->file_size >=
c->input_version_->files_[level][file_size[i + 1]]->file_size));
// do not pick a file to compact if it is being compacted
// from n-1 level.
if (f->being_compacted) {
continue;
}
// remember the startIndex for the next call to PickCompaction
if (nextIndex == -1) {
nextIndex = i;
}
//if (i > Version::number_of_files_to_sort_) {
// Log(options_->info_log, "XXX Looking at index %d", i);
//}
// Do not pick this file if its parents at level+1 are being compacted.
// Maybe we can avoid redoing this work in SetupOtherInputs
int parent_index = -1;
if (ParentRangeInCompaction(c->input_version_, &f->smallest, &f->largest,
level, &parent_index)) {
continue;
}
c->inputs_[0].push_back(f);
c->base_index_ = index;
c->parent_index_ = parent_index;
break;
}
if (c->inputs_[0].empty()) {
delete c;
c = nullptr;
}
// store where to start the iteration in the next call to PickCompaction
c->input_version_->next_file_to_compact_by_size_[level] = nextIndex;
return c;
}
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
int level = 0;
double score = version->compaction_score_[0];
if ((version->files_[level].size() <
(unsigned int)options_->level0_file_num_compaction_trigger)) {
Log(options_->info_log, "Universal: nothing to do\n");
return nullptr;
}
Version::FileSummaryStorage tmp;
Log(options_->info_log, "Universal: candidate files(%lu): %s\n",
version->files_[level].size(),
version->LevelFileSummary(&tmp, 0));
// Check for size amplification first.
Compaction* c = PickCompactionUniversalSizeAmp(version, score);
if (c == nullptr) {
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio = options_->compaction_options_universal.size_ratio;
c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX);
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
if (c == nullptr) {
unsigned int num_files = version->files_[level].size() -
options_->level0_file_num_compaction_trigger;
c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files);
}
}
if (c == nullptr) {
return nullptr;
}
assert(c->inputs_[0].size() > 1);
// validate that all the chosen files are non overlapping in time
FileMetaData* newerfile __attribute__((unused)) = nullptr;
for (unsigned int i = 0; i < c->inputs_[0].size(); i++) {
FileMetaData* f = c->inputs_[0][i];
assert (f->smallest_seqno <= f->largest_seqno);
assert(newerfile == nullptr ||
newerfile->smallest_seqno > f->largest_seqno);
newerfile = f;
}
// The files are sorted from newest first to oldest last.
std::vector<int>& file_by_time = c->input_version_->files_by_size_[level];
// Is the earliest file part of this compaction?
int last_index = file_by_time[file_by_time.size()-1];
FileMetaData* last_file = c->input_version_->files_[level][last_index];
if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) {
c->bottommost_level_ = true;
}
// update statistics
if (options_->statistics != nullptr) {
options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION,
c->inputs_[0].size());
}
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
// remember this currently undergoing compaction
compactions_in_progress_[level].insert(c);
// Record whether this compaction includes all sst files.
// For now, it is only relevant in universal compaction mode.
c->is_full_compaction_ =
(c->inputs_[0].size() == c->input_version_->files_[0].size());
return c;
}
//
// Consider compaction files based on their size differences with
// the next file in time order.
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
Version* version, double score, unsigned int ratio,
unsigned int max_number_of_files_to_compact) {
int level = 0;
unsigned int min_merge_width =
options_->compaction_options_universal.min_merge_width;
unsigned int max_merge_width =
options_->compaction_options_universal.max_merge_width;
// The files are sorted from newest first to oldest last.
std::vector<int>& file_by_time = version->files_by_size_[level];
FileMetaData* f = nullptr;
bool done = false;
int start_index = 0;
unsigned int candidate_count;
assert(file_by_time.size() == version->files_[level].size());
unsigned int max_files_to_compact = std::min(max_merge_width,
max_number_of_files_to_compact);
min_merge_width = std::max(min_merge_width, 2U);
// Considers a candidate file only if it is smaller than the
// total size accumulated so far.
for (unsigned int loop = 0; loop < file_by_time.size(); loop++) {
candidate_count = 0;
// Skip files that are already being compacted
for (f = nullptr; loop < file_by_time.size(); loop++) {
int index = file_by_time[loop];
f = version->files_[level][index];
if (!f->being_compacted) {
candidate_count = 1;
break;
}
Log(options_->info_log,
"Universal: file %lu[%d] being compacted, skipping",
(unsigned long)f->number, loop);
f = nullptr;
}
// This file is not being compacted. Consider it as the
// first candidate to be compacted.
uint64_t candidate_size = f != nullptr? f->file_size : 0;
if (f != nullptr) {
Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
(unsigned long)f->number, loop);
}
// Check if the suceeding files need compaction.
for (unsigned int i = loop+1;
candidate_count < max_files_to_compact && i < file_by_time.size();
i++) {
int index = file_by_time[i];
FileMetaData* f = version->files_[level][index];
if (f->being_compacted) {
break;
}
// pick files if the total candidate file size (increased by the
// specified ratio) is still larger than the next candidate file.
uint64_t sz = (candidate_size * (100L + ratio)) /100;
if (sz < f->file_size) {
break;
}
candidate_count++;
candidate_size += f->file_size;
}
// Found a series of consecutive files that need compaction.
if (candidate_count >= (unsigned int)min_merge_width) {
start_index = loop;
done = true;
break;
} else {
for (unsigned int i = loop;
i < loop + candidate_count && i < file_by_time.size(); i++) {
int index = file_by_time[i];
FileMetaData* f = version->files_[level][index];
Log(options_->info_log,
"Universal: Skipping file %lu[%d] with size %lu %d\n",
(unsigned long)f->number,
i,
(unsigned long)f->file_size,
f->being_compacted);
}
}
}
if (!done || candidate_count <= 1) {
return nullptr;
}
unsigned int first_index_after = start_index + candidate_count;
// Compression is enabled if files compacted earlier already reached
// size ratio of compression.
bool enable_compression = true;
int ratio_to_compress =
options_->compaction_options_universal.compression_size_percent;
if (ratio_to_compress >= 0) {
uint64_t total_size = version->NumLevelBytes(level);
uint64_t older_file_size = 0;
for (unsigned int i = file_by_time.size() - 1; i >= first_index_after;
i--) {
older_file_size += version->files_[level][file_by_time[i]]->file_size;
if (older_file_size * 100L >= total_size * (long) ratio_to_compress) {
enable_compression = false;
break;
}
}
}
Compaction* c =
new Compaction(version, level, level, MaxFileSizeForLevel(level),
LLONG_MAX, false, enable_compression);
c->score_ = score;
for (unsigned int i = start_index; i < first_index_after; i++) {
int index = file_by_time[i];
FileMetaData* f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
(unsigned long)f->number,
i,
(unsigned long)f->file_size);
}
return c;
}
// Look at overall size amplification. If size amplification
// exceeeds the configured value, then do a compaction
// of the candidate files all the way upto the earliest
// base file (overrides configured values of file-size ratios,
// min_merge_width and max_merge_width).
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
Version* version, double score) {
int level = 0;
// percentage flexibilty while reducing size amplification
uint64_t ratio = options_->compaction_options_universal.
max_size_amplification_percent;
// The files are sorted from newest first to oldest last.
std::vector<int>& file_by_time = version->files_by_size_[level];
assert(file_by_time.size() == version->files_[level].size());
unsigned int candidate_count = 0;
uint64_t candidate_size = 0;
unsigned int start_index = 0;
FileMetaData* f = nullptr;
// Skip files that are already being compacted
for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) {
int index = file_by_time[loop];
f = version->files_[level][index];
if (!f->being_compacted) {
start_index = loop; // Consider this as the first candidate.
break;
}
Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
(unsigned long)f->number,
loop,
" cannot be a candidate to reduce size amp.\n");
f = nullptr;
}
if (f == nullptr) {
return nullptr; // no candidate files
}
Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
(unsigned long)f->number,
start_index,
" to reduce size amp.\n");
// keep adding up all the remaining files
for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
loop++) {
int index = file_by_time[loop];
f = version->files_[level][index];
if (f->being_compacted) {
Log(options_->info_log,
"Universal: Possible candidate file %lu[%d] %s.",
(unsigned long)f->number,
loop,
" is already being compacted. No size amp reduction possible.\n");
return nullptr;
}
candidate_size += f->file_size;
candidate_count++;
}
if (candidate_count == 0) {
return nullptr;
}
// size of earliest file
int index = file_by_time[file_by_time.size() - 1];
uint64_t earliest_file_size = version->files_[level][index]->file_size;
// size amplification = percentage of additional size
if (candidate_size * 100 < ratio * earliest_file_size) {
Log(options_->info_log,
"Universal: size amp not needed. newer-files-total-size %lu "
"earliest-file-size %lu",
(unsigned long)candidate_size,
(unsigned long)earliest_file_size);
return nullptr;
} else {
Log(options_->info_log,
"Universal: size amp needed. newer-files-total-size %lu "
"earliest-file-size %lu",
(unsigned long)candidate_size,
(unsigned long)earliest_file_size);
}
assert(start_index >= 0 && start_index < file_by_time.size() - 1);
// create a compaction request
// We always compact all the files, so always compress.
Compaction* c =
new Compaction(version, level, level, MaxFileSizeForLevel(level),
LLONG_MAX, false, true);
c->score_ = score;
for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) {
int index = file_by_time[loop];
f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
Log(options_->info_log,
"Universal: size amp picking file %lu[%d] with size %lu",
(unsigned long)f->number,
index,
(unsigned long)f->file_size);
}
return c;
}
} // namespace rocksdb

@ -0,0 +1,152 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "db/version_set.h"
#include "db/compaction.h"
#include "rocksdb/status.h"
#include "rocksdb/options.h"
#include <vector>
#include <memory>
#include <set>
namespace rocksdb {
class Compaction;
class Version;
class CompactionPicker {
public:
CompactionPicker(const Options* options, const InternalKeyComparator* icmp);
virtual ~CompactionPicker();
// See VersionSet::ReduceNumberOfLevels()
void ReduceNumberOfLevels(int new_levels);
// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(Version* version) = 0;
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
// level that overlaps the specified range. Caller should delete
// the result.
//
// The returned Compaction might not include the whole requested range.
// In that case, compaction_end will be set to the next key that needs
// compacting. In case the compaction will compact the whole range,
// compaction_end will be set to nullptr.
// Client is responsible for compaction_end storage -- when called,
// *compaction_end should point to valid InternalKey!
Compaction* CompactRange(Version* version, int input_level, int output_level,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);
// Free up the files that participated in a compaction
void ReleaseCompactionFiles(Compaction* c, Status status);
// Return the total amount of data that is undergoing
// compactions per level
void SizeBeingCompacted(std::vector<uint64_t>& sizes);
// Returns maximum total overlap bytes with grandparent
// level (i.e., level+2) before we stop building a single
// file in level->level+1 compaction.
uint64_t MaxGrandParentOverlapBytes(int level);
// Returns maximum total bytes of data on a given level.
double MaxBytesForLevel(int level);
// Get the max file size in a given level.
uint64_t MaxFileSizeForLevel(int level) const;
protected:
int NumberLevels() const { return num_levels_; }
// Stores the minimal range that covers all entries in inputs in
// *smallest, *largest.
// REQUIRES: inputs is not empty
void GetRange(const std::vector<FileMetaData*>& inputs, InternalKey* smallest,
InternalKey* largest);
// Stores the minimal range that covers all entries in inputs1 and inputs2
// in *smallest, *largest.
// REQUIRES: inputs is not empty
void GetRange(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
InternalKey* smallest, InternalKey* largest);
void ExpandWhileOverlapping(Compaction* c);
uint64_t ExpandedCompactionByteSizeLimit(int level);
// Returns true if any one of the specified files are being compacted
bool FilesInCompaction(std::vector<FileMetaData*>& files);
// Returns true if any one of the parent files are being compacted
bool ParentRangeInCompaction(Version* version, const InternalKey* smallest,
const InternalKey* largest, int level,
int* index);
void SetupOtherInputs(Compaction* c);
// record all the ongoing compactions for all levels
std::vector<std::set<Compaction*>> compactions_in_progress_;
// Per-level target file size.
std::unique_ptr<uint64_t[]> max_file_size_;
// Per-level max bytes
std::unique_ptr<uint64_t[]> level_max_bytes_;
const Options* const options_;
private:
void Init();
int num_levels_;
const InternalKeyComparator* const icmp_;
};
class UniversalCompactionPicker : public CompactionPicker {
public:
UniversalCompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version) override;
private:
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(Version* version, double score,
unsigned int ratio,
unsigned int num_files);
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score);
};
class LevelCompactionPicker : public CompactionPicker {
public:
LevelCompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version) override;
private:
// For the specfied level, pick a compaction.
// Returns nullptr if there is no compaction to be done.
// If level is 0 and there is already a compaction on that level, this
// function will return nullptr.
Compaction* PickCompactionBySize(Version* version, int level, double score);
};
} // namespace rocksdb

@ -784,7 +784,7 @@ int Version::PickLevelForMemTableOutput(
}
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const uint64_t sum = TotalFileSize(overlaps);
if (sum > vset_->MaxGrandParentOverlapBytes(level)) {
if (sum > vset_->compaction_picker_->MaxGrandParentOverlapBytes(level)) {
break;
}
level++;
@ -1361,13 +1361,16 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
dummy_versions_(this),
current_(nullptr),
need_slowdown_for_num_level0_files_(false),
compactions_in_progress_(options_->num_levels),
current_version_number_(0),
manifest_file_size_(0),
storage_options_(storage_options),
storage_options_compactions_(storage_options_) {
compact_pointer_ = new std::string[options_->num_levels];
Init(options_->num_levels);
if (options_->compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(new UniversalCompactionPicker(options_, &icmp_));
} else {
compaction_picker_.reset(new LevelCompactionPicker(options_, &icmp_));
}
AppendVersion(new Version(this, current_version_number_++));
}
@ -1379,28 +1382,6 @@ VersionSet::~VersionSet() {
}
obsolete_files_.clear();
delete[] compact_pointer_;
delete[] max_file_size_;
delete[] level_max_bytes_;
}
void VersionSet::Init(int num_levels) {
max_file_size_ = new uint64_t[num_levels];
level_max_bytes_ = new uint64_t[num_levels];
int target_file_size_multiplier = options_->target_file_size_multiplier;
int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
for (int i = 0; i < num_levels; i++) {
if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) {
max_file_size_[i] = ULLONG_MAX;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
} else if (i > 1) {
max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier;
level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier *
options_->max_bytes_for_level_multiplier_additional[i-1];
} else {
max_file_size_[i] = options_->target_file_size_base;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
}
}
}
void VersionSet::AppendVersion(Version* v) {
@ -1479,7 +1460,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
{
// calculate the amount of data being compacted at every level
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
SizeBeingCompacted(size_being_compacted);
compaction_picker_->SizeBeingCompacted(size_being_compacted);
mu->Unlock();
@ -1732,7 +1713,7 @@ Status VersionSet::Recover() {
// Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
SizeBeingCompacted(size_being_compacted);
compaction_picker_->SizeBeingCompacted(size_being_compacted);
v->Finalize(size_being_compacted);
manifest_file_size_ = manifest_file_size;
@ -1864,7 +1845,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
// Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
SizeBeingCompacted(size_being_compacted);
compaction_picker_->SizeBeingCompacted(size_being_compacted);
v->Finalize(size_being_compacted);
AppendVersion(v);
@ -2011,41 +1992,16 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
}
}
// Stores the minimal range that covers all entries in inputs in
// *smallest, *largest.
// REQUIRES: inputs is not empty
void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
InternalKey* smallest,
InternalKey* largest) {
assert(!inputs.empty());
smallest->Clear();
largest->Clear();
for (size_t i = 0; i < inputs.size(); i++) {
FileMetaData* f = inputs[i];
if (i == 0) {
*smallest = f->smallest;
*largest = f->largest;
} else {
if (icmp_.Compare(f->smallest, *smallest) < 0) {
*smallest = f->smallest;
}
if (icmp_.Compare(f->largest, *largest) > 0) {
*largest = f->largest;
}
}
}
Compaction* VersionSet::PickCompaction() {
return compaction_picker_->PickCompaction(current_);
}
// Stores the minimal range that covers all entries in inputs1 and inputs2
// in *smallest, *largest.
// REQUIRES: inputs is not empty
void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
InternalKey* smallest,
InternalKey* largest) {
std::vector<FileMetaData*> all = inputs1;
all.insert(all.end(), inputs2.begin(), inputs2.end());
GetRange(all, smallest, largest);
Compaction* VersionSet::CompactRange(int input_level, int output_level,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
return compaction_picker_->CompactRange(current_, input_level, output_level,
begin, end, compaction_end);
}
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
@ -2085,29 +2041,11 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
}
double VersionSet::MaxBytesForLevel(int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
assert(level >= 0);
assert(level < NumberLevels());
return level_max_bytes_[level];
return compaction_picker_->MaxBytesForLevel(level);
}
uint64_t VersionSet::MaxFileSizeForLevel(int level) {
assert(level >= 0);
assert(level < NumberLevels());
return max_file_size_[level];
}
uint64_t VersionSet::ExpandedCompactionByteSizeLimit(int level) {
uint64_t result = MaxFileSizeForLevel(level);
result *= options_->expanded_compaction_factor;
return result;
}
uint64_t VersionSet::MaxGrandParentOverlapBytes(int level) {
uint64_t result = MaxFileSizeForLevel(level);
result *= options_->max_grandparent_overlap_factor;
return result;
return compaction_picker_->MaxFileSizeForLevel(level);
}
// verify that the files listed in this compaction are present
@ -2158,697 +2096,8 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
return true; // everything good
}
// Clear all files to indicate that they are not being compacted
// Delete this compaction from the list of running compactions.
void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) {
c->MarkFilesBeingCompacted(false);
compactions_in_progress_[c->level()].erase(c);
if (!status.ok()) {
c->ResetNextCompactionIndex();
}
}
// The total size of files that are currently being compacted
// at at every level upto the penultimate level.
void VersionSet::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
for (int level = 0; level < NumberLevels() - 1; level++) {
uint64_t total = 0;
for (std::set<Compaction*>::iterator it =
compactions_in_progress_[level].begin();
it != compactions_in_progress_[level].end();
++it) {
Compaction* c = (*it);
assert(c->level() == level);
for (int i = 0; i < c->num_input_files(0); i++) {
total += c->input(0,i)->file_size;
}
}
sizes[level] = total;
}
}
//
// Look at overall size amplification. If size amplification
// exceeeds the configured value, then do a compaction
// of the candidate files all the way upto the earliest
// base file (overrides configured values of file-size ratios,
// min_merge_width and max_merge_width).
//
Compaction* VersionSet::PickCompactionUniversalSizeAmp(int level,
double score) {
assert (level == 0);
// percentage flexibilty while reducing size amplification
uint64_t ratio = options_->compaction_options_universal.
max_size_amplification_percent;
// The files are sorted from newest first to oldest last.
std::vector<int>& file_by_time = current_->files_by_size_[level];
assert(file_by_time.size() == current_->files_[level].size());
unsigned int candidate_count = 0;
uint64_t candidate_size = 0;
unsigned int start_index = 0;
FileMetaData* f = nullptr;
// Skip files that are already being compacted
for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) {
int index = file_by_time[loop];
f = current_->files_[level][index];
if (!f->being_compacted) {
start_index = loop; // Consider this as the first candidate.
break;
}
Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
(unsigned long)f->number,
loop,
" cannot be a candidate to reduce size amp.\n");
f = nullptr;
}
if (f == nullptr) {
return nullptr; // no candidate files
}
Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
(unsigned long)f->number,
start_index,
" to reduce size amp.\n");
// keep adding up all the remaining files
for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
loop++) {
int index = file_by_time[loop];
f = current_->files_[level][index];
if (f->being_compacted) {
Log(options_->info_log,
"Universal: Possible candidate file %lu[%d] %s.",
(unsigned long)f->number,
loop,
" is already being compacted. No size amp reduction possible.\n");
return nullptr;
}
candidate_size += f->file_size;
candidate_count++;
}
if (candidate_count == 0) {
return nullptr;
}
// size of earliest file
int index = file_by_time[file_by_time.size() - 1];
uint64_t earliest_file_size = current_->files_[level][index]->file_size;
// size amplification = percentage of additional size
if (candidate_size * 100 < ratio * earliest_file_size) {
Log(options_->info_log,
"Universal: size amp not needed. newer-files-total-size %lu "
"earliest-file-size %lu",
(unsigned long)candidate_size,
(unsigned long)earliest_file_size);
return nullptr;
} else {
Log(options_->info_log,
"Universal: size amp needed. newer-files-total-size %lu "
"earliest-file-size %lu",
(unsigned long)candidate_size,
(unsigned long)earliest_file_size);
}
assert(start_index >= 0 && start_index < file_by_time.size() - 1);
// create a compaction request
// We always compact all the files, so always compress.
Compaction* c =
new Compaction(current_, level, level, MaxFileSizeForLevel(level),
LLONG_MAX, false, true);
c->score_ = score;
for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) {
int index = file_by_time[loop];
f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
Log(options_->info_log,
"Universal: size amp picking file %lu[%d] with size %lu",
(unsigned long)f->number,
index,
(unsigned long)f->file_size);
}
return c;
}
//
// Consider compaction files based on their size differences with
// the next file in time order.
//
Compaction* VersionSet::PickCompactionUniversalReadAmp(
int level, double score, unsigned int ratio,
unsigned int max_number_of_files_to_compact) {
unsigned int min_merge_width =
options_->compaction_options_universal.min_merge_width;
unsigned int max_merge_width =
options_->compaction_options_universal.max_merge_width;
// The files are sorted from newest first to oldest last.
std::vector<int>& file_by_time = current_->files_by_size_[level];
FileMetaData* f = nullptr;
bool done = false;
int start_index = 0;
unsigned int candidate_count;
assert(file_by_time.size() == current_->files_[level].size());
unsigned int max_files_to_compact = std::min(max_merge_width,
max_number_of_files_to_compact);
min_merge_width = std::max(min_merge_width, 2U);
// Considers a candidate file only if it is smaller than the
// total size accumulated so far.
for (unsigned int loop = 0; loop < file_by_time.size(); loop++) {
candidate_count = 0;
// Skip files that are already being compacted
for (f = nullptr; loop < file_by_time.size(); loop++) {
int index = file_by_time[loop];
f = current_->files_[level][index];
if (!f->being_compacted) {
candidate_count = 1;
break;
}
Log(options_->info_log,
"Universal: file %lu[%d] being compacted, skipping",
(unsigned long)f->number, loop);
f = nullptr;
}
// This file is not being compacted. Consider it as the
// first candidate to be compacted.
uint64_t candidate_size = f != nullptr? f->file_size : 0;
if (f != nullptr) {
Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
(unsigned long)f->number, loop);
}
// Check if the suceeding files need compaction.
for (unsigned int i = loop+1;
candidate_count < max_files_to_compact && i < file_by_time.size();
i++) {
int index = file_by_time[i];
FileMetaData* f = current_->files_[level][index];
if (f->being_compacted) {
break;
}
// pick files if the total candidate file size (increased by the
// specified ratio) is still larger than the next candidate file.
uint64_t sz = (candidate_size * (100L + ratio)) /100;
if (sz < f->file_size) {
break;
}
candidate_count++;
candidate_size += f->file_size;
}
// Found a series of consecutive files that need compaction.
if (candidate_count >= (unsigned int)min_merge_width) {
start_index = loop;
done = true;
break;
} else {
for (unsigned int i = loop;
i < loop + candidate_count && i < file_by_time.size(); i++) {
int index = file_by_time[i];
FileMetaData* f = current_->files_[level][index];
Log(options_->info_log,
"Universal: Skipping file %lu[%d] with size %lu %d\n",
(unsigned long)f->number,
i,
(unsigned long)f->file_size,
f->being_compacted);
}
}
}
if (!done || candidate_count <= 1) {
return nullptr;
}
unsigned int first_index_after = start_index + candidate_count;
// Compression is enabled if files compacted earlier already reached
// size ratio of compression.
bool enable_compression = true;
int ratio_to_compress =
options_->compaction_options_universal.compression_size_percent;
if (ratio_to_compress >= 0) {
uint64_t total_size = TotalFileSize(current_->files_[level]);
uint64_t older_file_size = 0;
for (unsigned int i = file_by_time.size() - 1; i >= first_index_after;
i--) {
older_file_size += current_->files_[level][file_by_time[i]]->file_size;
if (older_file_size * 100L >= total_size * (long) ratio_to_compress) {
enable_compression = false;
break;
}
}
}
Compaction* c =
new Compaction(current_, level, level, MaxFileSizeForLevel(level),
LLONG_MAX, false, enable_compression);
c->score_ = score;
for (unsigned int i = start_index; i < first_index_after; i++) {
int index = file_by_time[i];
FileMetaData* f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
(unsigned long)f->number,
i,
(unsigned long)f->file_size);
}
return c;
}
//
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
Compaction* VersionSet::PickCompactionUniversal(int level, double score) {
assert (level == 0);
if ((current_->files_[level].size() <
(unsigned int)options_->level0_file_num_compaction_trigger)) {
Log(options_->info_log, "Universal: nothing to do\n");
return nullptr;
}
Version::FileSummaryStorage tmp;
Log(options_->info_log, "Universal: candidate files(%lu): %s\n",
current_->files_[level].size(),
current_->LevelFileSummary(&tmp, 0));
// Check for size amplification first.
Compaction* c = PickCompactionUniversalSizeAmp(level, score);
if (c == nullptr) {
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio = options_->compaction_options_universal.size_ratio;
c = PickCompactionUniversalReadAmp(level, score, ratio, UINT_MAX);
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
if (c == nullptr) {
unsigned int num_files = current_->files_[level].size() -
options_->level0_file_num_compaction_trigger;
c = PickCompactionUniversalReadAmp(level, score, UINT_MAX, num_files);
}
}
if (c == nullptr) {
return nullptr;
}
assert(c->inputs_[0].size() > 1);
// validate that all the chosen files are non overlapping in time
FileMetaData* newerfile __attribute__((unused)) = nullptr;
for (unsigned int i = 0; i < c->inputs_[0].size(); i++) {
FileMetaData* f = c->inputs_[0][i];
assert (f->smallest_seqno <= f->largest_seqno);
assert(newerfile == nullptr ||
newerfile->smallest_seqno > f->largest_seqno);
newerfile = f;
}
// The files are sorted from newest first to oldest last.
std::vector<int>& file_by_time = c->input_version_->files_by_size_[level];
// Is the earliest file part of this compaction?
int last_index = file_by_time[file_by_time.size()-1];
FileMetaData* last_file = c->input_version_->files_[level][last_index];
if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) {
c->bottommost_level_ = true;
}
// update statistics
if (options_->statistics != nullptr) {
options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION,
c->inputs_[0].size());
}
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
// remember this currently undergoing compaction
compactions_in_progress_[level].insert(c);
// Record whether this compaction includes all sst files.
// For now, it is only relevant in universal compaction mode.
c->is_full_compaction_ =
(c->inputs_[0].size() == c->input_version_->files_[0].size());
return c;
}
Compaction* VersionSet::PickCompactionBySize(int level, double score) {
Compaction* c = nullptr;
// level 0 files are overlapping. So we cannot pick more
// than one concurrent compactions at this level. This
// could be made better by looking at key-ranges that are
// being compacted at level 0.
if (level == 0 && compactions_in_progress_[level].size() == 1) {
return nullptr;
}
assert(level >= 0);
assert(level + 1 < current_->NumberLevels());
c = new Compaction(current_, level, level + 1, MaxFileSizeForLevel(level + 1),
MaxGrandParentOverlapBytes(level));
c->score_ = score;
// Pick the largest file in this level that is not already
// being compacted
std::vector<int>& file_size = c->input_version_->files_by_size_[level];
// record the first file that is not yet compacted
int nextIndex = -1;
for (unsigned int i = c->input_version_->next_file_to_compact_by_size_[level];
i < file_size.size(); i++) {
int index = file_size[i];
FileMetaData* f = c->input_version_->files_[level][index];
// check to verify files are arranged in descending size
assert((i == file_size.size() - 1) ||
(i >= Version::number_of_files_to_sort_ - 1) ||
(f->file_size >=
c->input_version_->files_[level][file_size[i + 1]]->file_size));
// do not pick a file to compact if it is being compacted
// from n-1 level.
if (f->being_compacted) {
continue;
}
// remember the startIndex for the next call to PickCompaction
if (nextIndex == -1) {
nextIndex = i;
}
//if (i > Version::number_of_files_to_sort_) {
// Log(options_->info_log, "XXX Looking at index %d", i);
//}
// Do not pick this file if its parents at level+1 are being compacted.
// Maybe we can avoid redoing this work in SetupOtherInputs
int parent_index = -1;
if (ParentRangeInCompaction(&f->smallest, &f->largest, level,
&parent_index)) {
continue;
}
c->inputs_[0].push_back(f);
c->base_index_ = index;
c->parent_index_ = parent_index;
break;
}
if (c->inputs_[0].empty()) {
delete c;
c = nullptr;
}
// store where to start the iteration in the next call to PickCompaction
c->input_version_->next_file_to_compact_by_size_[level] = nextIndex;
return c;
}
Compaction* VersionSet::PickCompaction() {
Compaction* c = nullptr;
int level = -1;
// Compute the compactions needed. It is better to do it here
// and also in LogAndApply(), otherwise the values could be stale.
std::vector<uint64_t> size_being_compacted(NumberLevels()-1);
current_->vset_->SizeBeingCompacted(size_being_compacted);
current_->Finalize(size_being_compacted);
// In universal style of compaction, compact L0 files back into L0.
if (options_->compaction_style == kCompactionStyleUniversal) {
int level = 0;
c = PickCompactionUniversal(level, current_->compaction_score_[level]);
return c;
}
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
//
// Find the compactions by size on all levels.
for (int i = 0; i < NumberLevels()-1; i++) {
assert(i == 0 || current_->compaction_score_[i] <=
current_->compaction_score_[i-1]);
level = current_->compaction_level_[i];
if ((current_->compaction_score_[i] >= 1)) {
c = PickCompactionBySize(level, current_->compaction_score_[i]);
ExpandWhileOverlapping(c);
if (c != nullptr) {
break;
}
}
}
// Find compactions needed by seeks
FileMetaData* f = current_->file_to_compact_;
if (c == nullptr && f != nullptr && !f->being_compacted) {
level = current_->file_to_compact_level_;
int parent_index = -1;
// Only allow one level 0 compaction at a time.
// Do not pick this file if its parents at level+1 are being compacted.
if (level != 0 || compactions_in_progress_[0].empty()) {
if(!ParentRangeInCompaction(&f->smallest, &f->largest, level,
&parent_index)) {
c = new Compaction(current_, level, level + 1,
MaxFileSizeForLevel(level + 1),
MaxGrandParentOverlapBytes(level), true);
c->inputs_[0].push_back(f);
c->parent_index_ = parent_index;
c->input_version_->file_to_compact_ = nullptr;
ExpandWhileOverlapping(c);
}
}
}
if (c == nullptr) {
return nullptr;
}
// Two level 0 compaction won't run at the same time, so don't need to worry
// about files on level 0 being compacted.
if (level == 0) {
assert(compactions_in_progress_[0].empty());
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
c->inputs_[0].clear();
c->input_version_->GetOverlappingInputs(0, &smallest, &largest,
&c->inputs_[0]);
// If we include more L0 files in the same compaction run it can
// cause the 'smallest' and 'largest' key to get extended to a
// larger range. So, re-invoke GetRange to get the new key range
GetRange(c->inputs_[0], &smallest, &largest);
if (ParentRangeInCompaction(&smallest, &largest,
level, &c->parent_index_)) {
delete c;
return nullptr;
}
assert(!c->inputs_[0].empty());
}
// Setup "level+1" files (inputs_[1])
SetupOtherInputs(c);
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
// Is this compaction creating a file at the bottommost level
c->SetupBottomMostLevel(false);
// remember this currently undergoing compaction
compactions_in_progress_[level].insert(c);
return c;
}
// Returns true if any one of the parent files are being compacted
bool VersionSet::ParentRangeInCompaction(const InternalKey* smallest,
const InternalKey* largest, int level,
int* parent_index) {
std::vector<FileMetaData*> inputs;
assert(level + 1 < current_->NumberLevels());
current_->GetOverlappingInputs(level + 1, smallest, largest, &inputs,
*parent_index, parent_index);
return FilesInCompaction(inputs);
}
// Returns true if any one of specified files are being compacted
bool VersionSet::FilesInCompaction(std::vector<FileMetaData*>& files) {
for (unsigned int i = 0; i < files.size(); i++) {
if (files[i]->being_compacted) {
return true;
}
}
return false;
}
// Add more files to the inputs on "level" to make sure that
// no newer version of a key is compacted to "level+1" while leaving an older
// version in a "level". Otherwise, any Get() will search "level" first,
// and will likely return an old/stale value for the key, since it always
// searches in increasing order of level to find the value. This could
// also scramble the order of merge operands. This function should be
// called any time a new Compaction is created, and its inputs_[0] are
// populated.
//
// Will set c to nullptr if it is impossible to apply this compaction.
void VersionSet::ExpandWhileOverlapping(Compaction* c) {
// If inputs are empty then there is nothing to expand.
if (!c || c->inputs_[0].empty()) {
return;
}
// GetOverlappingInputs will always do the right thing for level-0.
// So we don't need to do any expansion if level == 0.
if (c->level() == 0) {
return;
}
const int level = c->level();
InternalKey smallest, largest;
// Keep expanding c->inputs_[0] until we are sure that there is a
// "clean cut" boundary between the files in input and the surrounding files.
// This will ensure that no parts of a key are lost during compaction.
int hint_index = -1;
size_t old_size;
do {
old_size = c->inputs_[0].size();
GetRange(c->inputs_[0], &smallest, &largest);
c->inputs_[0].clear();
c->input_version_->GetOverlappingInputs(
level, &smallest, &largest, &c->inputs_[0], hint_index, &hint_index);
} while(c->inputs_[0].size() > old_size);
// Get the new range
GetRange(c->inputs_[0], &smallest, &largest);
// If, after the expansion, there are files that are already under
// compaction, then we must drop/cancel this compaction.
int parent_index = -1;
if (FilesInCompaction(c->inputs_[0]) ||
(c->level() != c->output_level() &&
ParentRangeInCompaction(&smallest, &largest, level, &parent_index))) {
c->inputs_[0].clear();
c->inputs_[1].clear();
delete c;
c = nullptr;
}
}
// Populates the set of inputs from "level+1" that overlap with "level".
// Will also attempt to expand "level" if that doesn't expand "level+1"
// or cause "level" to include a file for compaction that has an overlapping
// user-key with another file.
void VersionSet::SetupOtherInputs(Compaction* c) {
// If inputs are empty, then there is nothing to expand.
// If both input and output levels are the same, no need to consider
// files at level "level+1"
if (c->inputs_[0].empty() || c->level() == c->output_level()) {
return;
}
const int level = c->level();
InternalKey smallest, largest;
// Get the range one last time.
GetRange(c->inputs_[0], &smallest, &largest);
// Populate the set of next-level files (inputs_[1]) to include in compaction
c->input_version_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1], c->parent_index_,
&c->parent_index_);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
// See if we can further grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up. We also choose NOT
// to expand if this would cause "level" to include some entries for some
// user key, while excluding other entries for the same user key. This
// can happen when one user key spans multiple files.
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
c->input_version_->GetOverlappingInputs(
level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr);
const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const uint64_t expanded0_size = TotalFileSize(expanded0);
uint64_t limit = ExpandedCompactionByteSizeLimit(level);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < limit &&
!FilesInCompaction(expanded0) &&
!c->input_version_->HasOverlappingUserKey(&expanded0, level)) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
c->input_version_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
&expanded1, c->parent_index_,
&c->parent_index_);
if (expanded1.size() == c->inputs_[1].size() &&
!FilesInCompaction(expanded1)) {
Log(options_->info_log,
"Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)"
"\n",
(unsigned long)level,
(unsigned long)(c->inputs_[0].size()),
(unsigned long)(c->inputs_[1].size()),
(unsigned long)inputs0_size,
(unsigned long)inputs1_size,
(unsigned long)(expanded0.size()),
(unsigned long)(expanded1.size()),
(unsigned long)expanded0_size,
(unsigned long)inputs1_size);
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}
}
}
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if (level + 2 < NumberLevels()) {
c->input_version_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}
if (false) {
Log(options_->info_log, "Compacting %d '%s' .. '%s'",
level,
smallest.DebugString().c_str(),
largest.DebugString().c_str());
}
// Update the place where we will do the next compaction for this level.
// We update this immediately instead of waiting for the VersionEdit
// to be applied so that if the compaction fails, we will try a different
// key range next time.
compact_pointer_[level] = largest.Encode().ToString();
c->edit_->SetCompactPointer(level, largest);
compaction_picker_->ReleaseCompactionFiles(c, status);
}
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
@ -2890,69 +2139,4 @@ void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files) {
obsolete_files_.clear();
}
Compaction* VersionSet::CompactRange(int input_level,
int output_level,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
std::vector<FileMetaData*> inputs;
bool covering_the_whole_range = true;
// All files are 'overlapping' in universal style compaction.
// We have to compact the entire range in one shot.
if (options_->compaction_style == kCompactionStyleUniversal) {
begin = nullptr;
end = nullptr;
}
current_->GetOverlappingInputs(input_level, begin, end, &inputs);
if (inputs.empty()) {
return nullptr;
}
// Avoid compacting too much in one shot in case the range is large.
// But we cannot do this for level-0 since level-0 files can overlap
// and we must not pick one file and drop another older file if the
// two files overlap.
if (input_level > 0) {
const uint64_t limit =
MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
uint64_t total = 0;
for (size_t i = 0; i + 1 < inputs.size(); ++i) {
uint64_t s = inputs[i]->file_size;
total += s;
if (total >= limit) {
**compaction_end = inputs[i + 1]->smallest;
covering_the_whole_range = false;
inputs.resize(i + 1);
break;
}
}
}
Compaction* c = new Compaction(current_, input_level, output_level,
MaxFileSizeForLevel(output_level),
MaxGrandParentOverlapBytes(input_level));
c->inputs_[0] = inputs;
ExpandWhileOverlapping(c);
if (c == nullptr) {
Log(options_->info_log, "Could not compact due to expansion failure.\n");
return nullptr;
}
SetupOtherInputs(c);
if (covering_the_whole_range) {
*compaction_end = nullptr;
}
// These files that are to be manaully compacted do not trample
// upon other files because manual compactions are processed when
// the system has a max of 1 background compaction thread.
c->MarkFilesBeingCompacted(true);
// Is this compaction creating a file at the bottommost level
c->SetupBottomMostLevel(true);
return c;
}
} // namespace rocksdb

@ -28,12 +28,14 @@
#include "port/port.h"
#include "db/table_cache.h"
#include "db/compaction.h"
#include "db/compaction_picker.h"
namespace rocksdb {
namespace log { class Writer; }
class Compaction;
class CompactionPicker;
class Iterator;
class MemTable;
class TableCache;
@ -185,6 +187,9 @@ class Version {
friend class Compaction;
friend class VersionSet;
friend class DBImpl;
friend class CompactionPicker;
friend class LevelCompactionPicker;
friend class UniversalCompactionPicker;
class LevelFileNumIterator;
Iterator* NewConcatenatingIterator(const ReadOptions&,
@ -407,35 +412,18 @@ class VersionSet {
// Return the size of the current manifest file
uint64_t ManifestFileSize() const { return manifest_file_size_; }
// For the specfied level, pick a compaction.
// Returns nullptr if there is no compaction to be done.
// If level is 0 and there is already a compaction on that level, this
// function will return nullptr.
Compaction* PickCompactionBySize(int level, double score);
// Pick files to compact in Universal mode
Compaction* PickCompactionUniversal(int level, double score);
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(int level, double score,
unsigned int ratio, unsigned int num_files);
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionUniversalSizeAmp(int level, double score);
// Free up the files that were participated in a compaction
void ReleaseCompactionFiles(Compaction* c, Status status);
// verify that the files that we started with for a compaction
// still exist in the current version and in the same original level.
// This ensures that a concurrent compaction did not erroneously
// pick the same files to compact.
bool VerifyCompactionFileConsistency(Compaction* c);
double MaxBytesForLevel(int level);
// Get the max file size in a given level.
uint64_t MaxFileSizeForLevel(int level);
double MaxBytesForLevel(int level);
void ReleaseCompactionFiles(Compaction* c, Status status);
Status GetMetadataForFile(
uint64_t number, int *filelevel, FileMetaData *metadata);
@ -452,21 +440,6 @@ class VersionSet {
friend class Compaction;
friend class Version;
void Init(int num_levels);
void GetRange(const std::vector<FileMetaData*>& inputs,
InternalKey* smallest,
InternalKey* largest);
void GetRange2(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
InternalKey* smallest,
InternalKey* largest);
void ExpandWhileOverlapping(Compaction* c);
void SetupOtherInputs(Compaction* c);
// Save current contents to *log
Status WriteSnapshot(log::Writer* log);
@ -474,10 +447,6 @@ class VersionSet {
bool ManifestContains(const std::string& record) const;
uint64_t ExpandedCompactionByteSizeLimit(int level);
uint64_t MaxGrandParentOverlapBytes(int level);
Env* const env_;
const std::string dbname_;
const Options* const options_;
@ -504,14 +473,9 @@ class VersionSet {
// Either an empty string, or a valid InternalKey.
std::string* compact_pointer_;
// Per-level target file size.
uint64_t* max_file_size_;
// Per-level max bytes
uint64_t* level_max_bytes_;
// record all the ongoing compactions for all levels
std::vector<std::set<Compaction*> > compactions_in_progress_;
// An object that keeps all the compaction stats
// and picks the next compaction
std::unique_ptr<CompactionPicker> compaction_picker_;
// generates a increasing version number for every new version
uint64_t current_version_number_;
@ -535,17 +499,6 @@ class VersionSet {
VersionSet(const VersionSet&);
void operator=(const VersionSet&);
// Return the total amount of data that is undergoing
// compactions per level
void SizeBeingCompacted(std::vector<uint64_t>&);
// Returns true if any one of the parent files are being compacted
bool ParentRangeInCompaction(const InternalKey* smallest,
const InternalKey* largest, int level, int* index);
// Returns true if any one of the specified files are being compacted
bool FilesInCompaction(std::vector<FileMetaData*>& files);
void LogAndApplyHelper(Builder*b, Version* v,
VersionEdit* edit, port::Mutex* mu);
};

@ -68,11 +68,9 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
current_version->num_levels_ = new_levels;
delete[] compact_pointer_;
delete[] max_file_size_;
delete[] level_max_bytes_;
num_levels_ = new_levels;
compact_pointer_ = new std::string[new_levels];
Init(new_levels);
compaction_picker_->ReduceNumberOfLevels(new_levels);
VersionEdit ve;
st = LogAndApply(&ve, mu, true);
return st;

Loading…
Cancel
Save