Refactor compaction picker code

Summary:
1. Move universal compaction picker to separate files compaction_picker_universal.cc and compaction_picker_universal.h.
2. Rename some functions to make the code easier to understand.
3. Move leveled compaction picking code to a dedicated class, so that we we don't need to pass some common variable around when calling functions. It also allowed us to break down LevelCompactionPicker::PickCompaction() to smaller functions.
Closes https://github.com/facebook/rocksdb/pull/2100

Differential Revision: D4845948

Pulled By: siying

fbshipit-source-id: efa0ab4
main
Siying Dong 7 years ago committed by Facebook Github Bot
parent 9e72939029
commit ff97287016
  1. 1
      CMakeLists.txt
  2. 1
      db/column_family.cc
  3. 1096
      db/compaction_picker.cc
  4. 154
      db/compaction_picker.h
  5. 3
      db/compaction_picker_test.cc
  6. 735
      db/compaction_picker_universal.cc
  7. 91
      db/compaction_picker_universal.h
  8. 8
      db/db_impl_compaction_flush.cc
  9. 2
      db/db_impl_open.cc
  10. 2
      db/db_impl_write.cc
  11. 1
      src.mk

@ -291,6 +291,7 @@ set(SOURCES
db/compaction_iterator.cc
db/compaction_job.cc
db/compaction_picker.cc
db/compaction_picker_universal.cc
db/convenience.cc
db/db_filesnapshot.cc
db/db_impl.cc

@ -20,6 +20,7 @@
#include <limits>
#include "db/compaction_picker.h"
#include "db/compaction_picker_universal.h"
#include "db/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"

File diff suppressed because it is too large Load Diff

@ -84,15 +84,15 @@ class CompactionPicker {
void ReleaseCompactionFiles(Compaction* c, Status status);
// Returns true if any one of the specified files are being compacted
bool FilesInCompaction(const std::vector<FileMetaData*>& files);
bool AreFilesInCompaction(const std::vector<FileMetaData*>& files);
// Takes a list of CompactionInputFiles and returns a (manual) Compaction
// object.
Compaction* FormCompaction(
const CompactionOptions& compact_options,
const std::vector<CompactionInputFiles>& input_files, int output_level,
VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
uint32_t output_path_id);
Compaction* CompactFiles(const CompactionOptions& compact_options,
const std::vector<CompactionInputFiles>& input_files,
int output_level, VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options,
uint32_t output_path_id);
// Converts a set of compaction input file numbers into
// a list of CompactionInputFiles.
@ -102,12 +102,6 @@ class CompactionPicker {
const VersionStorageInfo* vstorage,
const CompactionOptions& compact_options) const;
// Used in universal compaction when the enabled_trivial_move
// option is set. Checks whether there are any overlapping files
// in the input. Returns true if the input files are non
// overlapping.
bool IsInputNonOverlapping(Compaction* c);
// Is there currently a compaction involving level 0 taking place
bool IsLevel0CompactionInProgress() const {
return !level0_compactions_in_progress_.empty();
@ -138,7 +132,6 @@ class CompactionPicker {
void GetRange(const std::vector<CompactionInputFiles>& inputs,
InternalKey* smallest, InternalKey* largest) const;
protected:
int NumberLevels() const { return ioptions_.num_levels; }
// Add more files to the inputs on "level" to make sure that
@ -151,14 +144,14 @@ class CompactionPicker {
// populated.
//
// Will return false if it is impossible to apply this compaction.
bool ExpandWhileOverlapping(const std::string& cf_name,
bool ExpandInputsToCleanCut(const std::string& cf_name,
VersionStorageInfo* vstorage,
CompactionInputFiles* inputs);
// Returns true if any one of the parent files are being compacted
bool RangeInCompaction(VersionStorageInfo* vstorage,
const InternalKey* smallest,
const InternalKey* largest, int level, int* index);
bool IsRangeInCompaction(VersionStorageInfo* vstorage,
const InternalKey* smallest,
const InternalKey* largest, int level, int* index);
// Returns true if the key range that `inputs` files cover overlap with the
// key range of a currently running compaction.
@ -177,6 +170,20 @@ class CompactionPicker {
const CompactionInputFiles& output_level_inputs,
std::vector<FileMetaData*>* grandparents);
// Register this compaction in the set of running compactions
void RegisterCompaction(Compaction* c);
// Remove this compaction from the set of running compactions
void UnregisterCompaction(Compaction* c);
std::set<Compaction*>* level0_compactions_in_progress() {
return &level0_compactions_in_progress_;
}
std::unordered_set<Compaction*>* compactions_in_progress() {
return &compactions_in_progress_;
}
protected:
const ImmutableCFOptions& ioptions_;
// A helper function to SanitizeCompactionInputFiles() that
@ -187,12 +194,6 @@ class CompactionPicker {
const ColumnFamilyMetaData& cf_meta, const int output_level) const;
#endif // ROCKSDB_LITE
// Register this compaction in the set of running compactions
void RegisterCompaction(Compaction* c);
// Remove this compaction from the set of running compactions
void UnregisterCompaction(Compaction* c);
// Keeps track of all compactions that are running on Level0.
// Protected by DB mutex
std::set<Compaction*> level0_compactions_in_progress_;
@ -216,116 +217,9 @@ class LevelCompactionPicker : public CompactionPicker {
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
// Pick a path ID to place a newly generated file, with its level
static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
int level);
private:
// For the specfied level, pick a file that we want to compact.
// Returns false if there is no file to compact.
// If it returns true, inputs->files.size() will be exactly one.
// If level is 0 and there is already a compaction on that level, this
// function will return false.
bool PickCompactionBySize(VersionStorageInfo* vstorage, int level,
int output_level, CompactionInputFiles* inputs,
int* parent_index, int* base_index);
// For L0->L0, picks the longest span of files that aren't currently
// undergoing compaction for which work-per-deleted-file decreases. The span
// always starts from the newest L0 file.
//
// Intra-L0 compaction is independent of all other files, so it can be
// performed even when L0->base_level compactions are blocked.
//
// Returns true if `inputs` is populated with a span of files to be compacted;
// otherwise, returns false.
bool PickIntraL0Compaction(VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options,
CompactionInputFiles* inputs);
// If there is any file marked for compaction, put put it into inputs.
// This is still experimental. It will return meaningful results only if
// clients call experimental feature SuggestCompactRange()
void PickFilesMarkedForCompactionExperimental(const std::string& cf_name,
VersionStorageInfo* vstorage,
CompactionInputFiles* inputs,
int* level, int* output_level);
static const int kMinFilesForIntraL0Compaction = 4;
};
#ifndef ROCKSDB_LITE
class UniversalCompactionPicker : public CompactionPicker {
public:
UniversalCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
virtual int MaxOutputLevel() const override { return NumberLevels() - 1; }
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
private:
struct SortedRun {
SortedRun(int _level, FileMetaData* _file, uint64_t _size,
uint64_t _compensated_file_size, bool _being_compacted)
: level(_level),
file(_file),
size(_size),
compensated_file_size(_compensated_file_size),
being_compacted(_being_compacted) {
assert(compensated_file_size > 0);
assert(level != 0 || file != nullptr);
}
void Dump(char* out_buf, size_t out_buf_size,
bool print_path = false) const;
// sorted_run_count is added into the string to print
void DumpSizeInfo(char* out_buf, size_t out_buf_size,
size_t sorted_run_count) const;
int level;
// `file` Will be null for level > 0. For level = 0, the sorted run is
// for this file.
FileMetaData* file;
// For level > 0, `size` and `compensated_file_size` are sum of sizes all
// files in the level. `being_compacted` should be the same for all files
// in a non-zero level. Use the value here.
uint64_t size;
uint64_t compensated_file_size;
bool being_compacted;
};
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score, unsigned int ratio,
unsigned int num_files, const std::vector<SortedRun>& sorted_runs,
LogBuffer* log_buffer);
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionUniversalSizeAmp(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer);
static std::vector<SortedRun> CalculateSortedRuns(
const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions);
// Pick a path ID to place a newly generated file, with its estimated file
// size.
static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
uint64_t file_size);
};
class FIFOCompactionPicker : public CompactionPicker {
public:
FIFOCompactionPicker(const ImmutableCFOptions& ioptions,

@ -3,11 +3,12 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/compaction.h"
#include "db/compaction_picker.h"
#include <limits>
#include <string>
#include <utility>
#include "db/compaction.h"
#include "db/compaction_picker_universal.h"
#include "util/logging.h"
#include "util/string_util.h"

@ -0,0 +1,735 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction_picker_universal.h"
#ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <limits>
#include <queue>
#include <string>
#include <utility>
#include "db/column_family.h"
#include "monitoring/statistics.h"
#include "util/filename.h"
#include "util/log_buffer.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
namespace rocksdb {
namespace {
// Used in universal compaction when trivial move is enabled.
// This structure is used for the construction of min heap
// that contains the file meta data, the level of the file
// and the index of the file in that level
struct InputFileInfo {
InputFileInfo() : f(nullptr) {}
FileMetaData* f;
size_t level;
size_t index;
};
// Used in universal compaction when trivial move is enabled.
// This comparator is used for the construction of min heap
// based on the smallest key of the file.
struct SmallestKeyHeapComparator {
explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; }
bool operator()(InputFileInfo i1, InputFileInfo i2) const {
return (ucmp_->Compare(i1.f->smallest.user_key(),
i2.f->smallest.user_key()) > 0);
}
private:
const Comparator* ucmp_;
};
typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
SmallestKeyHeapComparator>
SmallestKeyHeap;
// This function creates the heap that is used to find if the files are
// overlapping during universal compaction when the allow_trivial_move
// is set.
SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
SmallestKeyHeap smallest_key_priority_q =
SmallestKeyHeap(SmallestKeyHeapComparator(ucmp));
InputFileInfo input_file;
for (size_t l = 0; l < c->num_input_levels(); l++) {
if (c->num_input_files(l) != 0) {
if (l == 0 && c->start_level() == 0) {
for (size_t i = 0; i < c->num_input_files(0); i++) {
input_file.f = c->input(0, i);
input_file.level = 0;
input_file.index = i;
smallest_key_priority_q.push(std::move(input_file));
}
} else {
input_file.f = c->input(l, 0);
input_file.level = l;
input_file.index = 0;
smallest_key_priority_q.push(std::move(input_file));
}
}
}
return smallest_key_priority_q;
}
#ifndef NDEBUG
// smallest_seqno and largest_seqno are set iff. `files` is not empty.
void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
SequenceNumber* smallest_seqno,
SequenceNumber* largest_seqno) {
bool is_first = true;
for (FileMetaData* f : files) {
assert(f->smallest_seqno <= f->largest_seqno);
if (is_first) {
is_first = false;
*smallest_seqno = f->smallest_seqno;
*largest_seqno = f->largest_seqno;
} else {
if (f->smallest_seqno < *smallest_seqno) {
*smallest_seqno = f->smallest_seqno;
}
if (f->largest_seqno > *largest_seqno) {
*largest_seqno = f->largest_seqno;
}
}
}
}
#endif
} // namespace
// Algorithm that checks to see if there are any overlapping
// files in the input
bool UniversalCompactionPicker::IsInputFilesNonOverlapping(Compaction* c) {
auto comparator = icmp_->user_comparator();
int first_iter = 1;
InputFileInfo prev, curr, next;
SmallestKeyHeap smallest_key_priority_q =
create_level_heap(c, icmp_->user_comparator());
while (!smallest_key_priority_q.empty()) {
curr = smallest_key_priority_q.top();
smallest_key_priority_q.pop();
if (first_iter) {
prev = curr;
first_iter = 0;
} else {
if (comparator->Compare(prev.f->largest.user_key(),
curr.f->smallest.user_key()) >= 0) {
// found overlapping files, return false
return false;
}
assert(comparator->Compare(curr.f->largest.user_key(),
prev.f->largest.user_key()) > 0);
prev = curr;
}
next.f = nullptr;
if (curr.level != 0 && curr.index < c->num_input_files(curr.level) - 1) {
next.f = c->input(curr.level, curr.index + 1);
next.level = curr.level;
next.index = curr.index + 1;
}
if (next.f) {
smallest_key_priority_q.push(std::move(next));
}
}
return true;
}
bool UniversalCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage) const {
const int kLevel0 = 0;
return vstorage->CompactionScore(kLevel0) >= 1;
}
void UniversalCompactionPicker::SortedRun::Dump(char* out_buf,
size_t out_buf_size,
bool print_path) const {
if (level == 0) {
assert(file != nullptr);
if (file->fd.GetPathId() == 0 || !print_path) {
snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
} else {
snprintf(out_buf, out_buf_size, "file %" PRIu64
"(path "
"%" PRIu32 ")",
file->fd.GetNumber(), file->fd.GetPathId());
}
} else {
snprintf(out_buf, out_buf_size, "level %d", level);
}
}
void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
if (level == 0) {
assert(file != nullptr);
snprintf(out_buf, out_buf_size,
"file %" PRIu64 "[%" ROCKSDB_PRIszt
"] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
file->compensated_file_size);
} else {
snprintf(out_buf, out_buf_size,
"level %d[%" ROCKSDB_PRIszt
"] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
level, sorted_run_count, size, compensated_file_size);
}
}
std::vector<UniversalCompactionPicker::SortedRun>
UniversalCompactionPicker::CalculateSortedRuns(
const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions) {
std::vector<UniversalCompactionPicker::SortedRun> ret;
for (FileMetaData* f : vstorage.LevelFiles(0)) {
ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
f->being_compacted);
}
for (int level = 1; level < vstorage.num_levels(); level++) {
uint64_t total_compensated_size = 0U;
uint64_t total_size = 0U;
bool being_compacted = false;
bool is_first = true;
for (FileMetaData* f : vstorage.LevelFiles(level)) {
total_compensated_size += f->compensated_file_size;
total_size += f->fd.GetFileSize();
if (ioptions.compaction_options_universal.allow_trivial_move == true) {
if (f->being_compacted) {
being_compacted = f->being_compacted;
}
} else {
// Compaction always includes all files for a non-zero level, so for a
// non-zero level, all the files should share the same being_compacted
// value.
// This assumption is only valid when
// ioptions.compaction_options_universal.allow_trivial_move is false
assert(is_first || f->being_compacted == being_compacted);
}
if (is_first) {
being_compacted = f->being_compacted;
is_first = false;
}
}
if (total_compensated_size > 0) {
ret.emplace_back(level, nullptr, total_size, total_compensated_size,
being_compacted);
}
}
return ret;
}
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
Compaction* UniversalCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
const int kLevel0 = 0;
double score = vstorage->CompactionScore(kLevel0);
std::vector<SortedRun> sorted_runs =
CalculateSortedRuns(*vstorage, ioptions_);
if (sorted_runs.size() == 0 ||
sorted_runs.size() <
(unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) {
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n",
cf_name.c_str());
TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
nullptr);
return nullptr;
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER_MAX_SZ(
log_buffer, 3072,
"[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n",
cf_name.c_str(), sorted_runs.size(), vstorage->LevelSummary(&tmp));
// Check for size amplification first.
Compaction* c;
if ((c = PickCompactionToReduceSizeAmp(cf_name, mutable_cf_options, vstorage,
score, sorted_runs, log_buffer)) !=
nullptr) {
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n",
cf_name.c_str());
} else {
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio = ioptions_.compaction_options_universal.size_ratio;
if ((c = PickCompactionToReduceSortedRuns(
cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX,
sorted_runs, log_buffer)) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Universal: compacting for size ratio\n",
cf_name.c_str());
} else {
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
// This is guaranteed by NeedsCompaction()
assert(sorted_runs.size() >=
static_cast<size_t>(
mutable_cf_options.level0_file_num_compaction_trigger));
// Get the total number of sorted runs that are not being compacted
int num_sr_not_compacted = 0;
for (size_t i = 0; i < sorted_runs.size(); i++) {
if (sorted_runs[i].being_compacted == false) {
num_sr_not_compacted++;
}
}
// The number of sorted runs that are not being compacted is greater than
// the maximum allowed number of sorted runs
if (num_sr_not_compacted >
mutable_cf_options.level0_file_num_compaction_trigger) {
unsigned int num_files =
num_sr_not_compacted -
mutable_cf_options.level0_file_num_compaction_trigger + 1;
if ((c = PickCompactionToReduceSortedRuns(
cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
num_files, sorted_runs, log_buffer)) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Universal: compacting for file num -- %u\n",
cf_name.c_str(), num_files);
}
}
}
}
if (c == nullptr) {
TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
nullptr);
return nullptr;
}
if (ioptions_.compaction_options_universal.allow_trivial_move == true) {
c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
}
// validate that all the chosen files of L0 are non overlapping in time
#ifndef NDEBUG
SequenceNumber prev_smallest_seqno = 0U;
bool is_first = true;
size_t level_index = 0U;
if (c->start_level() == 0) {
for (auto f : *c->inputs(0)) {
assert(f->smallest_seqno <= f->largest_seqno);
if (is_first) {
is_first = false;
} else {
assert(prev_smallest_seqno > f->largest_seqno);
}
prev_smallest_seqno = f->smallest_seqno;
}
level_index = 1U;
}
for (; level_index < c->num_input_levels(); level_index++) {
if (c->num_input_files(level_index) != 0) {
SequenceNumber smallest_seqno = 0U;
SequenceNumber largest_seqno = 0U;
GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
&largest_seqno);
if (is_first) {
is_first = false;
} else if (prev_smallest_seqno > 0) {
// A level is considered as the bottommost level if there are
// no files in higher levels or if files in higher levels do
// not overlap with the files being compacted. Sequence numbers
// of files in bottommost level can be set to 0 to help
// compression. As a result, the following assert may not hold
// if the prev_smallest_seqno is 0.
assert(prev_smallest_seqno > largest_seqno);
}
prev_smallest_seqno = smallest_seqno;
}
}
#endif
// update statistics
MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
c->inputs(0)->size());
RegisterCompaction(c);
TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
c);
return c;
}
uint32_t UniversalCompactionPicker::GetPathId(
const ImmutableCFOptions& ioptions, uint64_t file_size) {
// Two conditions need to be satisfied:
// (1) the target path needs to be able to hold the file's size
// (2) Total size left in this and previous paths need to be not
// smaller than expected future file size before this new file is
// compacted, which is estimated based on size_ratio.
// For example, if now we are compacting files of size (1, 1, 2, 4, 8),
// we will make sure the target file, probably with size of 16, will be
// placed in a path so that eventually when new files are generated and
// compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
// before the path we chose.
//
// TODO(sdong): now the case of multiple column families is not
// considered in this algorithm. So the target size can be violated in
// that case. We need to improve it.
uint64_t accumulated_size = 0;
uint64_t future_size =
file_size * (100 - ioptions.compaction_options_universal.size_ratio) /
100;
uint32_t p = 0;
assert(!ioptions.db_paths.empty());
for (; p < ioptions.db_paths.size() - 1; p++) {
uint64_t target_size = ioptions.db_paths[p].target_size;
if (target_size > file_size &&
accumulated_size + (target_size - file_size) > future_size) {
return p;
}
accumulated_size += target_size;
}
return p;
}
//
// Consider compaction files based on their size differences with
// the next file in time order.
//
Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score, unsigned int ratio,
unsigned int max_number_of_files_to_compact,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
unsigned int min_merge_width =
ioptions_.compaction_options_universal.min_merge_width;
unsigned int max_merge_width =
ioptions_.compaction_options_universal.max_merge_width;
const SortedRun* sr = nullptr;
bool done = false;
size_t start_index = 0;
unsigned int candidate_count = 0;
unsigned int max_files_to_compact =
std::min(max_merge_width, max_number_of_files_to_compact);
min_merge_width = std::max(min_merge_width, 2U);
// Caller checks the size before executing this function. This invariant is
// important because otherwise we may have a possible integer underflow when
// dealing with unsigned types.
assert(sorted_runs.size() > 0);
// Considers a candidate file only if it is smaller than the
// total size accumulated so far.
for (size_t loop = 0; loop < sorted_runs.size(); loop++) {
candidate_count = 0;
// Skip files that are already being compacted
for (sr = nullptr; loop < sorted_runs.size(); loop++) {
sr = &sorted_runs[loop];
if (!sr->being_compacted) {
candidate_count = 1;
break;
}
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf));
ROCKS_LOG_BUFFER(log_buffer,
"[%s] Universal: %s"
"[%d] being compacted, skipping",
cf_name.c_str(), file_num_buf, loop);
sr = nullptr;
}
// This file is not being compacted. Consider it as the
// first candidate to be compacted.
uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
if (sr != nullptr) {
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Possible candidate %s[%d].",
cf_name.c_str(), file_num_buf, loop);
}
// Check if the succeeding files need compaction.
for (size_t i = loop + 1;
candidate_count < max_files_to_compact && i < sorted_runs.size();
i++) {
const SortedRun* succeeding_sr = &sorted_runs[i];
if (succeeding_sr->being_compacted) {
break;
}
// Pick files if the total/last candidate file size (increased by the
// specified ratio) is still larger than the next candidate file.
// candidate_size is the total size of files picked so far with the
// default kCompactionStopStyleTotalSize; with
// kCompactionStopStyleSimilarSize, it's simply the size of the last
// picked file.
double sz = candidate_size * (100.0 + ratio) / 100.0;
if (sz < static_cast<double>(succeeding_sr->size)) {
break;
}
if (ioptions_.compaction_options_universal.stop_style ==
kCompactionStopStyleSimilarSize) {
// Similar-size stopping rule: also check the last picked file isn't
// far larger than the next candidate file.
sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
if (sz < static_cast<double>(candidate_size)) {
// If the small file we've encountered begins a run of similar-size
// files, we'll pick them up on a future iteration of the outer
// loop. If it's some lonely straggler, it'll eventually get picked
// by the last-resort read amp strategy which disregards size ratios.
break;
}
candidate_size = succeeding_sr->compensated_file_size;
} else { // default kCompactionStopStyleTotalSize
candidate_size += succeeding_sr->compensated_file_size;
}
candidate_count++;
}
// Found a series of consecutive files that need compaction.
if (candidate_count >= (unsigned int)min_merge_width) {
start_index = loop;
done = true;
break;
} else {
for (size_t i = loop;
i < loop + candidate_count && i < sorted_runs.size(); i++) {
const SortedRun* skipping_sr = &sorted_runs[i];
char file_num_buf[256];
skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Skipping %s",
cf_name.c_str(), file_num_buf);
}
}
}
if (!done || candidate_count <= 1) {
return nullptr;
}
size_t first_index_after = start_index + candidate_count;
// Compression is enabled if files compacted earlier already reached
// size ratio of compression.
bool enable_compression = true;
int ratio_to_compress =
ioptions_.compaction_options_universal.compression_size_percent;
if (ratio_to_compress >= 0) {
uint64_t total_size = 0;
for (auto& sorted_run : sorted_runs) {
total_size += sorted_run.compensated_file_size;
}
uint64_t older_file_size = 0;
for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) {
older_file_size += sorted_runs[i].size;
if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
enable_compression = false;
break;
}
}
}
uint64_t estimated_total_size = 0;
for (unsigned int i = 0; i < first_index_after; i++) {
estimated_total_size += sorted_runs[i].size;
}
uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
int start_level = sorted_runs[start_index].level;
int output_level;
if (first_index_after == sorted_runs.size()) {
output_level = vstorage->num_levels() - 1;
} else if (sorted_runs[first_index_after].level == 0) {
output_level = 0;
} else {
output_level = sorted_runs[first_index_after].level - 1;
}
std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
for (size_t i = start_index; i < first_index_after; i++) {
auto& picking_sr = sorted_runs[i];
if (picking_sr.level == 0) {
FileMetaData* picking_file = picking_sr.file;
inputs[0].files.push_back(picking_file);
} else {
auto& files = inputs[picking_sr.level - start_level].files;
for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
files.push_back(f);
}
}
char file_num_buf[256];
picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(),
file_num_buf);
}
CompactionReason compaction_reason;
if (max_number_of_files_to_compact == UINT_MAX) {
compaction_reason = CompactionReason::kUniversalSortedRunNum;
} else {
compaction_reason = CompactionReason::kUniversalSizeRatio;
}
return new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level,
1, enable_compression),
/* grandparents */ {}, /* is manual */ false, score,
false /* deletion_compaction */, compaction_reason);
}
// Look at overall size amplification. If size amplification
// exceeeds the configured value, then do a compaction
// of the candidate files all the way upto the earliest
// base file (overrides configured values of file-size ratios,
// min_merge_width and max_merge_width).
//
Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
// percentage flexibilty while reducing size amplification
uint64_t ratio =
ioptions_.compaction_options_universal.max_size_amplification_percent;
unsigned int candidate_count = 0;
uint64_t candidate_size = 0;
size_t start_index = 0;
const SortedRun* sr = nullptr;
// Skip files that are already being compacted
for (size_t loop = 0; loop < sorted_runs.size() - 1; loop++) {
sr = &sorted_runs[loop];
if (!sr->being_compacted) {
start_index = loop; // Consider this as the first candidate.
break;
}
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: skipping %s[%d] compacted %s",
cf_name.c_str(), file_num_buf, loop,
" cannot be a candidate to reduce size amp.\n");
sr = nullptr;
}
if (sr == nullptr) {
return nullptr; // no candidate files
}
{
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
cf_name.c_str(), file_num_buf, start_index, " to reduce size amp.\n");
}
// keep adding up all the remaining files
for (size_t loop = start_index; loop < sorted_runs.size() - 1; loop++) {
sr = &sorted_runs[loop];
if (sr->being_compacted) {
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
ROCKS_LOG_BUFFER(
log_buffer, "[%s] Universal: Possible candidate %s[%d] %s",
cf_name.c_str(), file_num_buf, start_index,
" is already being compacted. No size amp reduction possible.\n");
return nullptr;
}
candidate_size += sr->compensated_file_size;
candidate_count++;
}
if (candidate_count == 0) {
return nullptr;
}
// size of earliest file
uint64_t earliest_file_size = sorted_runs.back().size;
// size amplification = percentage of additional size
if (candidate_size * 100 < ratio * earliest_file_size) {
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
" earliest-file-size %" PRIu64,
cf_name.c_str(), candidate_size, earliest_file_size);
return nullptr;
} else {
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
" earliest-file-size %" PRIu64,
cf_name.c_str(), candidate_size, earliest_file_size);
}
assert(start_index < sorted_runs.size() - 1);
// Estimate total file size
uint64_t estimated_total_size = 0;
for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
estimated_total_size += sorted_runs[loop].size;
}
uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
int start_level = sorted_runs[start_index].level;
std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
// We always compact all the files, so always compress.
for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
auto& picking_sr = sorted_runs[loop];
if (picking_sr.level == 0) {
FileMetaData* f = picking_sr.file;
inputs[0].files.push_back(f);
} else {
auto& files = inputs[picking_sr.level - start_level].files;
for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
files.push_back(f);
}
}
char file_num_buf[256];
picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: size amp picking %s",
cf_name.c_str(), file_num_buf);
}
return new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs),
vstorage->num_levels() - 1,
mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
vstorage->num_levels() - 1, 1),
/* grandparents */ {}, /* is manual */ false, score,
false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification);
}
} // namespace rocksdb
#endif // !ROCKSDB_LITE

@ -0,0 +1,91 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#ifndef ROCKSDB_LITE
#include "db/compaction_picker.h"
namespace rocksdb {
class UniversalCompactionPicker : public CompactionPicker {
public:
UniversalCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
virtual int MaxOutputLevel() const override { return NumberLevels() - 1; }
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
private:
struct SortedRun {
SortedRun(int _level, FileMetaData* _file, uint64_t _size,
uint64_t _compensated_file_size, bool _being_compacted)
: level(_level),
file(_file),
size(_size),
compensated_file_size(_compensated_file_size),
being_compacted(_being_compacted) {
assert(compensated_file_size > 0);
assert(level != 0 || file != nullptr);
}
void Dump(char* out_buf, size_t out_buf_size,
bool print_path = false) const;
// sorted_run_count is added into the string to print
void DumpSizeInfo(char* out_buf, size_t out_buf_size,
size_t sorted_run_count) const;
int level;
// `file` Will be null for level > 0. For level = 0, the sorted run is
// for this file.
FileMetaData* file;
// For level > 0, `size` and `compensated_file_size` are sum of sizes all
// files in the level. `being_compacted` should be the same for all files
// in a non-zero level. Use the value here.
uint64_t size;
uint64_t compensated_file_size;
bool being_compacted;
};
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionToReduceSortedRuns(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score, unsigned int ratio,
unsigned int num_files, const std::vector<SortedRun>& sorted_runs,
LogBuffer* log_buffer);
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionToReduceSizeAmp(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer);
// Used in universal compaction when the enabled_trivial_move
// option is set. Checks whether there are any overlapping files
// in the input. Returns true if the input files are non
// overlapping.
bool IsInputFilesNonOverlapping(Compaction* c);
static std::vector<SortedRun> CalculateSortedRuns(
const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions);
// Pick a path ID to place a newly generated file, with its estimated file
// size.
static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
uint64_t file_size);
};
} // namespace rocksdb
#endif // !ROCKSDB_LITE

@ -16,10 +16,10 @@
#include "db/builder.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "util/sst_file_manager_impl.h"
#include "util/sync_point.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "util/sst_file_manager_impl.h"
#include "util/sync_point.h"
namespace rocksdb {
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
@ -425,7 +425,7 @@ Status DBImpl::CompactFilesImpl(
}
for (auto inputs : input_files) {
if (cfd->compaction_picker()->FilesInCompaction(inputs.files)) {
if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
return Status::Aborted(
"Some of the necessary compaction input "
"files are already being compacted");
@ -437,7 +437,7 @@ Status DBImpl::CompactFilesImpl(
unique_ptr<Compaction> c;
assert(cfd->compaction_picker());
c.reset(cfd->compaction_picker()->FormCompaction(
c.reset(cfd->compaction_picker()->CompactFiles(
compact_options, input_files, output_level, version->storage_info(),
*cfd->GetLatestMutableCFOptions(), output_path_id));
if (!c) {

@ -14,8 +14,8 @@
#include <inttypes.h>
#include "db/builder.h"
#include "rocksdb/wal_filter.h"
#include "options/options_helper.h"
#include "rocksdb/wal_filter.h"
#include "util/sst_file_manager_impl.h"
#include "util/sync_point.h"

@ -12,8 +12,8 @@
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "options/options_helper.h"
#include "monitoring/perf_context_imp.h"
#include "options/options_helper.h"
#include "util/sync_point.h"
namespace rocksdb {

@ -11,6 +11,7 @@ LIB_SOURCES = \
db/compaction_iterator.cc \
db/compaction_job.cc \
db/compaction_picker.cc \
db/compaction_picker_universal.cc \
db/convenience.cc \
db/db_filesnapshot.cc \
db/db_impl.cc \

Loading…
Cancel
Save