Make Compaction class easier to use

Summary:
The goal of this diff is to make Compaction class easier to use. This should also make new compaction algorithms easier to write (like CompactFiles from @yhchiang and dynamic leveled and multi-leveled universal from @sdong).

Here are couple of things demonstrating that Compaction class is hard to use:
1. we have two constructors of Compaction class
2. there's this thing called grandparents_, but it appears to only be setup for leveled compaction and not compactfiles
3. it's easy to introduce a subtle and dangerous bug like this: D36225
4. SetupBottomMostLevel() is hard to understand and it shouldn't be. See this comment: afbafeaeae/db/compaction.cc (L236-L241). It also made it harder for @yhchiang to write CompactFiles, as evidenced by this: afbafeaeae/db/compaction_picker.cc (L204-L210)

The problem is that we create Compaction object, which holds a lot of state, and then pass it around to some functions. After those functions are done mutating, then we call couple of functions on Compaction object, like SetupBottommostLevel() and MarkFilesBeingCompacted(). It is very hard to see what's happening with all that Compaction's state while it's travelling across different functions. If you're writing a new PickCompaction() function you need to try really hard to understand what are all the functions you need to run on Compaction object and what state you need to setup.

My proposed solution is to make important parts of Compaction immutable after construction. PickCompaction() should calculate compaction inputs and then pass them onto Compaction object once they are finalized. That makes it easy to create a new compaction -- just provide all the parameters to the constructor and you're done. No need to call confusing functions after you created your object.

This diff doesn't fully achieve that goal, but it comes pretty close. Here are some of the changes:
* have one Compaction constructor instead of two.
* inputs_ is constant after construction
* MarkFilesBeingCompacted() is now private to Compaction class and automatically called on construction/destruction.
* SetupBottommostLevel() is gone. Compaction figures it out on its own based on the input.
* CompactionPicker's functions are not passing around Compaction object anymore. They are only passing around the state that they need.

Test Plan:
make check
make asan_check
make valgrind_check

Reviewers: rven, anthony, sdong, yhchiang

Reviewed By: yhchiang

Subscribers: sdong, yhchiang, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D36687
main
Igor Canadi 9 years ago
parent 753dd1fdd0
commit 47b8743984
  1. 192
      db/compaction.cc
  2. 90
      db/compaction.h
  3. 1
      db/compaction_job.cc
  4. 19
      db/compaction_job_test.cc
  5. 465
      db/compaction_picker.cc
  6. 36
      db/compaction_picker.h
  7. 15
      db/compaction_picker_test.cc
  8. 8
      db/db_impl.cc
  9. 1
      db/db_test.cc

@ -36,83 +36,91 @@ void Compaction::SetInputVersion(Version* _input_version) {
cfd_->Ref(); cfd_->Ref();
input_version_->Ref(); input_version_->Ref();
edit_ = new VersionEdit(); edit_.SetColumnFamily(cfd_->GetID());
edit_->SetColumnFamily(cfd_->GetID());
} }
Compaction::Compaction(int number_levels, int _start_level, int out_level, // helper function to determine if compaction is creating files at the
uint64_t target_file_size, // bottommost level
uint64_t max_grandparent_overlap_bytes, bool Compaction::IsBottommostLevel(
uint32_t output_path_id, int output_level, VersionStorageInfo* vstorage,
CompressionType output_compression, bool seek_compaction, const std::vector<CompactionInputFiles>& inputs) {
bool deletion_compaction) if (inputs[0].level == 0 &&
: start_level_(_start_level), inputs[0].files.back() != vstorage->LevelFiles(0).back()) {
output_level_(out_level), return false;
max_output_file_size_(target_file_size), }
max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
input_version_(nullptr), // checks whether there are files living beyond the output_level.
edit_(nullptr), for (int i = output_level + 1; i < vstorage->num_levels(); i++) {
number_levels_(number_levels), if (vstorage->NumLevelFiles(i) > 0) {
cfd_(nullptr), return false;
output_path_id_(output_path_id), }
output_compression_(output_compression),
seek_compaction_(seek_compaction),
deletion_compaction_(deletion_compaction),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0),
base_index_(-1),
parent_index_(-1),
score_(0),
bottommost_level_(false),
is_full_compaction_(false),
is_manual_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {
for (int i = 0; i < number_levels_; i++) {
level_ptrs_[i] = 0;
} }
int num_levels = output_level_ - start_level_ + 1; return true;
input_levels_.resize(num_levels); }
inputs_.resize(num_levels);
for (int i = 0; i < num_levels; ++i) { bool Compaction::IsFullCompaction(
inputs_[i].level = start_level_ + i; VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs) {
int num_files_in_compaction = 0;
int total_num_files = 0;
for (int l = 0; l < vstorage->num_levels(); l++) {
total_num_files += vstorage->NumLevelFiles(l);
} }
for (size_t i = 0; i < inputs.size(); i++) {
num_files_in_compaction += inputs[i].size();
}
return num_files_in_compaction == total_num_files;
} }
Compaction::Compaction(VersionStorageInfo* vstorage, Compaction::Compaction(VersionStorageInfo* vstorage,
const autovector<CompactionInputFiles>& _inputs, const MutableCFOptions& _mutable_cf_options,
int _start_level, int _output_level, std::vector<CompactionInputFiles> _inputs,
uint64_t _max_grandparent_overlap_bytes, int _output_level, uint64_t _target_file_size,
const CompactionOptions& _options, uint64_t _max_grandparent_overlap_bytes,
bool _deletion_compaction) uint32_t _output_path_id, CompressionType _compression,
: start_level_(_start_level), std::vector<FileMetaData*> _grandparents,
bool _manual_compaction, double _score,
bool _deletion_compaction)
: start_level_(_inputs[0].level),
output_level_(_output_level), output_level_(_output_level),
max_output_file_size_(_options.output_file_size_limit), max_output_file_size_(_target_file_size),
max_grandparent_overlap_bytes_(_max_grandparent_overlap_bytes), max_grandparent_overlap_bytes_(_max_grandparent_overlap_bytes),
mutable_cf_options_(_mutable_cf_options),
input_version_(nullptr), input_version_(nullptr),
number_levels_(vstorage->num_levels()), number_levels_(vstorage->num_levels()),
cfd_(nullptr), cfd_(nullptr),
output_compression_(_options.compression), output_path_id_(_output_path_id),
seek_compaction_(false), output_compression_(_compression),
deletion_compaction_(_deletion_compaction), deletion_compaction_(_deletion_compaction),
inputs_(_inputs), inputs_(std::move(_inputs)),
grandparents_(std::move(_grandparents)),
grandparent_index_(0), grandparent_index_(0),
seen_key_(false), seen_key_(false),
overlapped_bytes_(0), overlapped_bytes_(0),
base_index_(-1), score_(_score),
parent_index_(-1), bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
score_(0), is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
bottommost_level_(false), is_manual_compaction_(_manual_compaction),
is_full_compaction_(false), level_ptrs_(std::vector<size_t>(number_levels_, 0)) {
is_manual_compaction_(false), MarkFilesBeingCompacted(true);
level_ptrs_(std::vector<size_t>(number_levels_)) {
for (int i = 0; i < number_levels_; i++) { #ifndef NDEBUG
level_ptrs_[i] = 0; for (size_t i = 1; i < inputs_.size(); ++i) {
assert(inputs_[i].level > inputs_[i - 1].level);
}
#endif
// setup input_levels_
{
input_levels_.resize(num_input_levels());
for (size_t which = 0; which < num_input_levels(); which++) {
DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files,
&arena_);
}
} }
} }
Compaction::~Compaction() { Compaction::~Compaction() {
delete edit_;
if (input_version_ != nullptr) { if (input_version_ != nullptr) {
input_version_->Unref(); input_version_->Unref();
} }
@ -123,14 +131,6 @@ Compaction::~Compaction() {
} }
} }
void Compaction::GenerateFileLevels() {
input_levels_.resize(num_input_levels());
for (size_t which = 0; which < num_input_levels(); which++) {
DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files,
&arena_);
}
}
bool Compaction::InputCompressionMatchesOutput() const { bool Compaction::InputCompressionMatchesOutput() const {
int base_level = input_version_->storage_info()->base_level(); int base_level = input_version_->storage_info()->base_level();
bool matches = (GetCompressionType(*cfd_->ioptions(), start_level_, bool matches = (GetCompressionType(*cfd_->ioptions(), start_level_,
@ -144,25 +144,14 @@ bool Compaction::InputCompressionMatchesOutput() const {
} }
bool Compaction::IsTrivialMove() const { bool Compaction::IsTrivialMove() const {
// If start_level_== output_level_, the purpose is to force compaction
// filter to be applied to that level, and thus cannot be a trivia move.
if (start_level_ == output_level_) {
return false;
}
// If compaction involves more than one file, it is not trivial move.
if (num_input_files(0) != 1) {
return false;
}
for (size_t l = 1u; l < num_input_levels(); l++) {
if (num_input_files(l) != 0) {
return false;
}
}
// Avoid a move if there is lots of overlapping grandparent data. // Avoid a move if there is lots of overlapping grandparent data.
// Otherwise, the move could create a parent file that will require // Otherwise, the move could create a parent file that will require
// a very expensive merge later on. // a very expensive merge later on.
return (input(0, 0)->fd.GetPathId() == GetOutputPathId() && // If start_level_== output_level_, the purpose is to force compaction
// filter to be applied to that level, and thus cannot be a trivia move.
return (start_level_ != output_level_ && num_input_levels() == 1 &&
num_input_files(0) == 1 &&
input(0, 0)->fd.GetPathId() == GetOutputPathId() &&
InputCompressionMatchesOutput() && InputCompressionMatchesOutput() &&
TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
} }
@ -240,31 +229,6 @@ void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
} }
} }
// Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(VersionStorageInfo* vstorage,
bool is_manual, bool level0_only) {
if (level0_only) {
// If universal compaction style is used and manual
// compaction is occuring, then we are guaranteed that
// all files will be picked in a single compaction
// run. We can safely set bottommost_level_ = true.
// If it is not manual compaction, then bottommost_level_
// is already set when the Compaction was created.
if (is_manual) {
bottommost_level_ = true;
}
return;
}
bottommost_level_ = true;
// checks whether there are files living beyond the output_level.
for (int i = output_level_ + 1; i < number_levels_; i++) {
if (vstorage->NumLevelFiles(i) > 0) {
bottommost_level_ = false;
break;
}
}
}
// Sample output: // Sample output:
// If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5, // If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5,
// print: "3@0 + 2@3 + 1@4 files to L5" // print: "3@0 + 2@3 + 1@4 files to L5"
@ -292,6 +256,7 @@ const char* Compaction::InputLevelSummary(
} }
void Compaction::ReleaseCompactionFiles(Status status) { void Compaction::ReleaseCompactionFiles(Status status) {
MarkFilesBeingCompacted(false);
cfd_->compaction_picker()->ReleaseCompactionFiles(this, status); cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
} }
@ -323,9 +288,9 @@ int InputSummary(const std::vector<FileMetaData*>& files, char* output,
void Compaction::Summary(char* output, int len) { void Compaction::Summary(char* output, int len) {
int write = int write =
snprintf(output, len, "Base version %" PRIu64 snprintf(output, len, "Base version %" PRIu64
" Base level %d, seek compaction:%d, inputs: [", " Base level %d, inputs: [",
input_version_->GetVersionNumber(), input_version_->GetVersionNumber(),
start_level_, seek_compaction_); start_level_);
if (write < 0 || write >= len) { if (write < 0 || write >= len) {
return; return;
} }
@ -365,15 +330,4 @@ uint64_t Compaction::OutputFilePreallocationSize(
return preallocation_size * 1.1; return preallocation_size * 1.1;
} }
Compaction* Compaction::TEST_NewCompaction(
int num_levels, int start_level, int out_level, uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes, uint32_t output_path_id,
CompressionType output_compression, bool seek_compaction,
bool deletion_compaction) {
return new Compaction(num_levels, start_level, out_level, target_file_size,
max_grandparent_overlap_bytes, output_path_id,
output_compression, seek_compaction,
deletion_compaction);
}
} // namespace rocksdb } // namespace rocksdb

@ -34,11 +34,13 @@ class VersionStorageInfo;
class Compaction { class Compaction {
public: public:
Compaction(VersionStorageInfo* input_version, Compaction(VersionStorageInfo* input_version,
const autovector<CompactionInputFiles>& inputs, const MutableCFOptions& mutable_cf_options,
int start_level, int output_level, std::vector<CompactionInputFiles> inputs, int output_level,
uint64_t max_grandparent_overlap_bytes, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
const CompactionOptions& options, uint32_t output_path_id, CompressionType compression,
bool deletion_compaction); std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, double score = -1,
bool deletion_compaction = false);
// No copying allowed // No copying allowed
Compaction(const Compaction&) = delete; Compaction(const Compaction&) = delete;
@ -62,7 +64,7 @@ class Compaction {
// Return the object that holds the edits to the descriptor done // Return the object that holds the edits to the descriptor done
// by this compaction. // by this compaction.
VersionEdit* edit() const { return edit_; } VersionEdit* edit() { return &edit_; }
// Returns the number of input files associated to the specified // Returns the number of input files associated to the specified
// compaction input level. // compaction input level.
@ -113,10 +115,6 @@ class Compaction {
// Whether need to write output file to second DB path. // Whether need to write output file to second DB path.
uint32_t GetOutputPathId() const { return output_path_id_; } uint32_t GetOutputPathId() const { return output_path_id_; }
// Generate input_levels_ from inputs_
// Should be called when inputs_ is stable
void GenerateFileLevels();
// Is this a trivial compaction that can be implemented by just // Is this a trivial compaction that can be implemented by just
// moving a single input file to the next level (no merging or splitting) // moving a single input file to the next level (no merging or splitting)
bool IsTrivialMove() const; bool IsTrivialMove() const;
@ -158,8 +156,6 @@ class Compaction {
// Was this compaction triggered manually by the client? // Was this compaction triggered manually by the client?
bool IsManualCompaction() { return is_manual_compaction_; } bool IsManualCompaction() { return is_manual_compaction_; }
void SetOutputPathId(uint32_t path_id) { output_path_id_ = path_id; }
// Return the MutableCFOptions that should be used throughout the compaction // Return the MutableCFOptions that should be used throughout the compaction
// procedure // procedure
const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; } const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; }
@ -171,42 +167,27 @@ class Compaction {
void SetInputVersion(Version* input_version); void SetInputVersion(Version* input_version);
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
// Initialize whether the compaction is producing files at the
// bottommost level.
//
// @see BottomMostLevel()
void SetupBottomMostLevel(VersionStorageInfo* vstorage, bool is_manual,
bool level0_only);
static Compaction* TEST_NewCompaction(
int num_levels, int start_level, int out_level, uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes, uint32_t output_path_id,
CompressionType output_compression, bool seek_compaction = false,
bool deletion_compaction = false);
CompactionInputFiles* TEST_GetInputFiles(int l) {
return &inputs_[l];
}
struct InputLevelSummaryBuffer { struct InputLevelSummaryBuffer {
char buffer[128]; char buffer[128];
}; };
const char* InputLevelSummary(InputLevelSummaryBuffer* scratch) const; const char* InputLevelSummary(InputLevelSummaryBuffer* scratch) const;
// In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_
void ResetNextCompactionIndex();
private: private:
friend class CompactionPicker; // mark (or clear) all files that are being compacted
friend class UniversalCompactionPicker; void MarkFilesBeingCompacted(bool mark_as_compacted);
friend class FIFOCompactionPicker;
friend class LevelCompactionPicker;
Compaction(int num_levels, int start_level, int out_level, // helper function to determine if compaction with inputs and storage is
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, // bottommost
uint32_t output_path_id, CompressionType output_compression, static bool IsBottommostLevel(
bool seek_compaction = false, bool deletion_compaction = false); int output_level, VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs);
static bool IsFullCompaction(VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs);
const int start_level_; // the lowest level to be compacted const int start_level_; // the lowest level to be compacted
const int output_level_; // levels to which output files are stored const int output_level_; // levels to which output files are stored
@ -214,43 +195,38 @@ class Compaction {
uint64_t max_grandparent_overlap_bytes_; uint64_t max_grandparent_overlap_bytes_;
MutableCFOptions mutable_cf_options_; MutableCFOptions mutable_cf_options_;
Version* input_version_; Version* input_version_;
VersionEdit* edit_; VersionEdit edit_;
int number_levels_; const int number_levels_;
ColumnFamilyData* cfd_; ColumnFamilyData* cfd_;
Arena arena_; // Arena used to allocate space for file_levels_ Arena arena_; // Arena used to allocate space for file_levels_
uint32_t output_path_id_; const uint32_t output_path_id_;
CompressionType output_compression_; CompressionType output_compression_;
bool seek_compaction_;
// If true, then the comaction can be done by simply deleting input files. // If true, then the comaction can be done by simply deleting input files.
bool deletion_compaction_; const bool deletion_compaction_;
// Compaction input files organized by level. // Compaction input files organized by level. Constant after construction
autovector<CompactionInputFiles> inputs_; const std::vector<CompactionInputFiles> inputs_;
// A copy of inputs_, organized more closely in memory // A copy of inputs_, organized more closely in memory
autovector<LevelFilesBrief, 2> input_levels_; autovector<LevelFilesBrief, 2> input_levels_;
// State used to check for number of of overlapping grandparent files // State used to check for number of of overlapping grandparent files
// (grandparent == "output_level_ + 1") // (grandparent == "output_level_ + 1")
// This vector is updated by Version::GetOverlappingInputs().
std::vector<FileMetaData*> grandparents_; std::vector<FileMetaData*> grandparents_;
size_t grandparent_index_; // Index in grandparent_starts_ size_t grandparent_index_; // Index in grandparent_starts_
bool seen_key_; // Some output key has been seen bool seen_key_; // Some output key has been seen
uint64_t overlapped_bytes_; // Bytes of overlap between current output uint64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files // and grandparent files
int base_index_; // index of the file in files_[start_level_] const double score_; // score that was used to pick this compaction.
int parent_index_; // index of some file with same range in
// files_[start_level_+1]
double score_; // score that was used to pick this compaction.
// Is this compaction creating a file in the bottom most level? // Is this compaction creating a file in the bottom most level?
bool bottommost_level_; const bool bottommost_level_;
// Does this compaction include all sst files? // Does this compaction include all sst files?
bool is_full_compaction_; const bool is_full_compaction_;
// Is this compaction requested by the client? // Is this compaction requested by the client?
bool is_manual_compaction_; const bool is_manual_compaction_;
// "level_ptrs_" holds indices into "input_version_->levels_", where each // "level_ptrs_" holds indices into "input_version_->levels_", where each
// index remembers which file of an associated level we are currently used // index remembers which file of an associated level we are currently used
@ -259,10 +235,6 @@ class Compaction {
// records indices for all levels beyond "output_level_". // records indices for all levels beyond "output_level_".
std::vector<size_t> level_ptrs_; std::vector<size_t> level_ptrs_;
// In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_
void ResetNextCompactionIndex();
// Does input compression match the output compression? // Does input compression match the output compression?
bool InputCompressionMatchesOutput() const; bool InputCompressionMatchesOutput() const;
}; };

@ -245,7 +245,6 @@ void CompactionJob::Prepare() {
auto* compaction = compact_->compaction; auto* compaction = compact_->compaction;
// Generate file_levels_ for compaction berfore making Iterator // Generate file_levels_ for compaction berfore making Iterator
compaction->GenerateFileLevels();
ColumnFamilyData* cfd = compact_->compaction->column_family_data(); ColumnFamilyData* cfd = compact_->compaction->column_family_data();
assert(cfd != nullptr); assert(cfd != nullptr);
{ {

@ -66,6 +66,9 @@ class CompactionJobTest : public testing::Test {
auto key = ToString(i * (kKeysPerFile / 2) + k); auto key = ToString(i * (kKeysPerFile / 2) + k);
auto value = ToString(i * kKeysPerFile + k); auto value = ToString(i * kKeysPerFile + k);
InternalKey internal_key(key, ++sequence_number, kTypeValue); InternalKey internal_key(key, ++sequence_number, kTypeValue);
// This is how the key will look like once it's written in bottommost
// file
InternalKey bottommost_internal_key(key, 0, kTypeValue);
if (k == 0) { if (k == 0) {
smallest = internal_key; smallest = internal_key;
smallest_seqno = sequence_number; smallest_seqno = sequence_number;
@ -74,7 +77,7 @@ class CompactionJobTest : public testing::Test {
largest_seqno = sequence_number; largest_seqno = sequence_number;
} }
std::pair<std::string, std::string> key_value( std::pair<std::string, std::string> key_value(
{internal_key.Encode().ToString(), value}); {bottommost_internal_key.Encode().ToString(), value});
contents.insert(key_value); contents.insert(key_value);
if (i == 1 || k < kKeysPerFile / 2) { if (i == 1 || k < kKeysPerFile / 2) {
expected_results.insert(key_value); expected_results.insert(key_value);
@ -143,15 +146,15 @@ TEST_F(CompactionJobTest, Simple) {
auto files = cfd->current()->storage_info()->LevelFiles(0); auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size()); ASSERT_EQ(2U, files.size());
std::unique_ptr<Compaction> compaction(Compaction::TEST_NewCompaction( CompactionInputFiles compaction_input_files;
7, 0, 1, 1024 * 1024, 10, 0, kNoCompression)); compaction_input_files.level = 0;
compaction_input_files.files.push_back(files[0]);
compaction_input_files.files.push_back(files[1]);
std::unique_ptr<Compaction> compaction(new Compaction(
cfd->current()->storage_info(), *cfd->GetLatestMutableCFOptions(),
{compaction_input_files}, 1, 1024 * 1024, 10, 0, kNoCompression, {}));
compaction->SetInputVersion(cfd->current()); compaction->SetInputVersion(cfd->current());
auto compaction_input_files = compaction->TEST_GetInputFiles(0);
compaction_input_files->level = 0;
compaction_input_files->files.push_back(files[0]);
compaction_input_files->files.push_back(files[1]);
SnapshotList snapshots; SnapshotList snapshots;
int yield_callback_called = 0; int yield_callback_called = 0;
std::function<uint64_t()> yield_callback = [&]() { std::function<uint64_t()> yield_callback = [&]() {

@ -70,11 +70,9 @@ CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
CompactionPicker::~CompactionPicker() {} CompactionPicker::~CompactionPicker() {}
// Clear all files to indicate that they are not being compacted
// Delete this compaction from the list of running compactions. // Delete this compaction from the list of running compactions.
void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) { void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
c->MarkFilesBeingCompacted(false); if (c->start_level() == 0) {
if (c->level() == 0) {
level0_compactions_in_progress_.erase(c); level0_compactions_in_progress_.erase(c);
} }
if (!status.ok()) { if (!status.ok()) {
@ -113,61 +111,43 @@ void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs1,
bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name,
VersionStorageInfo* vstorage, VersionStorageInfo* vstorage,
Compaction* c) { CompactionInputFiles* inputs) {
assert(c != nullptr); // This isn't good compaction
// If inputs are empty then there is nothing to expand. assert(!inputs->empty());
if (c->inputs_[0].empty()) {
assert(c->inputs(c->num_input_levels() - 1)->empty());
// This isn't good compaction
return false;
}
const int level = inputs->level;
// GetOverlappingInputs will always do the right thing for level-0. // GetOverlappingInputs will always do the right thing for level-0.
// So we don't need to do any expansion if level == 0. // So we don't need to do any expansion if level == 0.
if (c->level() == 0) { if (level == 0) {
return true; return true;
} }
const int level = c->level();
InternalKey smallest, largest; InternalKey smallest, largest;
// Keep expanding c->inputs_[0] until we are sure that there is a // Keep expanding inputs until we are sure that there is a "clean cut"
// "clean cut" boundary between the files in input and the surrounding files. // boundary between the files in input and the surrounding files.
// This will ensure that no parts of a key are lost during compaction. // This will ensure that no parts of a key are lost during compaction.
int hint_index = -1; int hint_index = -1;
size_t old_size; size_t old_size;
do { do {
old_size = c->inputs_[0].size(); old_size = inputs->size();
GetRange(c->inputs_[0].files, &smallest, &largest); GetRange(inputs->files, &smallest, &largest);
c->inputs_[0].clear(); inputs->clear();
vstorage->GetOverlappingInputs(level, &smallest, &largest, vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files,
&c->inputs_[0].files, hint_index, hint_index, &hint_index);
&hint_index); } while (inputs->size() > old_size);
} while(c->inputs_[0].size() > old_size);
// Get the new range // we started off with inputs non-empty and the previous loop only grew
GetRange(c->inputs_[0].files, &smallest, &largest); // inputs. thus, inputs should be non-empty here
assert(!inputs->empty());
// If, after the expansion, there are files that are already under // If, after the expansion, there are files that are already under
// compaction, then we must drop/cancel this compaction. // compaction, then we must drop/cancel this compaction.
int parent_index = -1; if (FilesInCompaction(inputs->files)) {
if (c->inputs_[0].empty()) {
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] ExpandWhileOverlapping() failure because zero input files", "[%s] ExpandWhileOverlapping() failure because some of the necessary"
" compaction input files are currently being compacted.",
cf_name.c_str()); cf_name.c_str());
}
if (c->inputs_[0].empty() || FilesInCompaction(c->inputs_[0].files) ||
(c->level() != c->output_level() &&
RangeInCompaction(vstorage, &smallest, &largest, c->output_level(),
&parent_index))) {
c->inputs_[0].clear();
c->inputs_[c->num_input_levels() - 1].clear();
if (!c->inputs_[0].empty()) {
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] ExpandWhileOverlapping() failure because some of the necessary"
" compaction input files are currently being compacted.",
c->column_family_data()->GetName().c_str());
}
return false; return false;
} }
return true; return true;
@ -185,38 +165,23 @@ bool CompactionPicker::FilesInCompaction(
} }
Compaction* CompactionPicker::FormCompaction( Compaction* CompactionPicker::FormCompaction(
const CompactionOptions& compact_options, const CompactionOptions& compact_options,
const autovector<CompactionInputFiles>& input_files, const std::vector<CompactionInputFiles>& input_files, int output_level,
int output_level, VersionStorageInfo* vstorage, VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
const MutableCFOptions& mutable_cf_options) const { uint32_t output_path_id) const {
uint64_t max_grandparent_overlap_bytes = uint64_t max_grandparent_overlap_bytes =
output_level + 1 < vstorage->num_levels() ? output_level + 1 < vstorage->num_levels() ?
mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) : mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) :
std::numeric_limits<uint64_t>::max(); std::numeric_limits<uint64_t>::max();
assert(input_files.size()); assert(input_files.size());
auto c = new Compaction(vstorage, input_files, return new Compaction(vstorage, mutable_cf_options, input_files, output_level,
input_files[0].level, output_level, compact_options.output_file_size_limit,
max_grandparent_overlap_bytes, max_grandparent_overlap_bytes, output_path_id,
compact_options, false); compact_options.compression, /* grandparents */ {});
c->mutable_cf_options_ = mutable_cf_options;
c->MarkFilesBeingCompacted(true);
// TODO(yhchiang): complete the SetBottomMostLevel as follows
// If there is no any key of the range in DB that is older than the
// range to compact, it is bottom most. For leveled compaction,
// if number-of_level-1 is empty, and output is going to number-of_level-2,
// it is also bottom-most. On the other hand, if number of level=1 (
// something like universal), the compaction is only "bottom-most" if
// the oldest file is involved.
c->SetupBottomMostLevel(
vstorage,
(output_level == vstorage->num_levels() - 1),
(output_level == 0));
return c;
} }
Status CompactionPicker::GetCompactionInputsFromFileNumbers( Status CompactionPicker::GetCompactionInputsFromFileNumbers(
autovector<CompactionInputFiles>* input_files, std::vector<CompactionInputFiles>* input_files,
std::unordered_set<uint64_t>* input_set, std::unordered_set<uint64_t>* input_set,
const VersionStorageInfo* vstorage, const VersionStorageInfo* vstorage,
const CompactionOptions& compact_options) const { const CompactionOptions& compact_options) const {
@ -226,7 +191,7 @@ Status CompactionPicker::GetCompactionInputsFromFileNumbers(
} }
assert(input_files); assert(input_files);
autovector<CompactionInputFiles> matched_input_files; std::vector<CompactionInputFiles> matched_input_files;
matched_input_files.resize(vstorage->num_levels()); matched_input_files.resize(vstorage->num_levels());
int first_non_empty_level = -1; int first_non_empty_level = -1;
int last_non_empty_level = -1; int last_non_empty_level = -1;
@ -286,88 +251,100 @@ bool CompactionPicker::RangeInCompaction(VersionStorageInfo* vstorage,
// Will also attempt to expand "start level" if that doesn't expand // Will also attempt to expand "start level" if that doesn't expand
// "output level" or cause "level" to include a file for compaction that has an // "output level" or cause "level" to include a file for compaction that has an
// overlapping user-key with another file. // overlapping user-key with another file.
void CompactionPicker::SetupOtherInputs( // REQUIRES: input_level and output_level are different
// REQUIRES: inputs->empty() == false
// Returns false if files on parent level are currently in compaction, which
// means that we can't compact them
bool CompactionPicker::SetupOtherInputs(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, Compaction* c) { VersionStorageInfo* vstorage, CompactionInputFiles* inputs,
// If inputs are empty, then there is nothing to expand. CompactionInputFiles* output_level_inputs, int* parent_index,
// If both input and output levels are the same, no need to consider int base_index) {
// files at level "level+1" assert(!inputs->empty());
if (c->inputs_[0].empty() || c->level() == c->output_level()) { assert(output_level_inputs->empty());
return; const int input_level = inputs->level;
} const int output_level = output_level_inputs->level;
assert(input_level != output_level);
// For now, we only support merging two levels, start level and output level. // For now, we only support merging two levels, start level and output level.
// We need to assert other levels are empty. // We need to assert other levels are empty.
for (int l = c->start_level() + 1; l < c->output_level(); l++) { for (int l = input_level + 1; l < output_level; l++) {
assert(vstorage->NumLevelFiles(l) == 0); assert(vstorage->NumLevelFiles(l) == 0);
} }
const int level = c->level();
InternalKey smallest, largest; InternalKey smallest, largest;
// Get the range one last time. // Get the range one last time.
GetRange(c->inputs_[0].files, &smallest, &largest); GetRange(inputs->files, &smallest, &largest);
// Populate the set of next-level files (inputs_GetOutputLevelInputs()) to // Populate the set of next-level files (inputs_GetOutputLevelInputs()) to
// include in compaction // include in compaction
vstorage->GetOverlappingInputs(c->output_level(), &smallest, &largest, vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
&c->inputs_[c->num_input_levels() - 1].files, &output_level_inputs->files, *parent_index,
c->parent_index_, &c->parent_index_); parent_index);
// Get entire range covered by compaction if (FilesInCompaction(output_level_inputs->files)) {
InternalKey all_start, all_limit; return false;
GetRange(c->inputs_[0].files, c->inputs_[c->num_input_levels() - 1].files, }
&all_start, &all_limit);
// See if we can further grow the number of inputs in "level" without // See if we can further grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up. We also choose NOT // changing the number of "level+1" files we pick up. We also choose NOT
// to expand if this would cause "level" to include some entries for some // to expand if this would cause "level" to include some entries for some
// user key, while excluding other entries for the same user key. This // user key, while excluding other entries for the same user key. This
// can happen when one user key spans multiple files. // can happen when one user key spans multiple files.
if (!c->inputs(c->num_input_levels() - 1)->empty()) { if (!output_level_inputs->empty()) {
std::vector<FileMetaData*> expanded0; std::vector<FileMetaData*> expanded0;
vstorage->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0, // Get entire range covered by compaction
c->base_index_, nullptr); InternalKey all_start, all_limit;
const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0].files); GetRange(inputs->files, output_level_inputs->files, &all_start, &all_limit);
vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
&expanded0, base_index, nullptr);
const uint64_t inputs0_size = TotalCompensatedFileSize(inputs->files);
const uint64_t inputs1_size = const uint64_t inputs1_size =
TotalCompensatedFileSize(c->inputs_[c->num_input_levels() - 1].files); TotalCompensatedFileSize(output_level_inputs->files);
const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0); const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0);
uint64_t limit = mutable_cf_options.ExpandedCompactionByteSizeLimit(level); uint64_t limit =
if (expanded0.size() > c->inputs_[0].size() && mutable_cf_options.ExpandedCompactionByteSizeLimit(input_level);
if (expanded0.size() > inputs->size() &&
inputs1_size + expanded0_size < limit && inputs1_size + expanded0_size < limit &&
!FilesInCompaction(expanded0) && !FilesInCompaction(expanded0) &&
!vstorage->HasOverlappingUserKey(&expanded0, level)) { !vstorage->HasOverlappingUserKey(&expanded0, input_level)) {
InternalKey new_start, new_limit; InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit); GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1; std::vector<FileMetaData*> expanded1;
vstorage->GetOverlappingInputs(c->output_level(), &new_start, &new_limit, vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
&expanded1, c->parent_index_, &expanded1, *parent_index, parent_index);
&c->parent_index_); if (expanded1.size() == output_level_inputs->size() &&
if (expanded1.size() == c->inputs(c->num_input_levels() - 1)->size() &&
!FilesInCompaction(expanded1)) { !FilesInCompaction(expanded1)) {
Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log, Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
"[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64 "[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64
" bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n", " bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n",
cf_name.c_str(), level, c->inputs_[0].size(), cf_name.c_str(), input_level, inputs->size(),
c->inputs(c->num_input_levels() - 1)->size(), inputs0_size, output_level_inputs->size(), inputs0_size, inputs1_size,
inputs1_size, expanded0.size(), expanded1.size(), expanded0_size, expanded0.size(), expanded1.size(), expanded0_size, inputs1_size);
inputs1_size);
smallest = new_start; smallest = new_start;
largest = new_limit; largest = new_limit;
c->inputs_[0].files = expanded0; inputs->files = expanded0;
c->inputs_[c->num_input_levels() - 1].files = expanded1; output_level_inputs->files = expanded1;
GetRange(c->inputs_[0].files,
c->inputs_[c->num_input_levels() - 1].files, &all_start,
&all_limit);
} }
} }
} }
return true;
}
void CompactionPicker::GetGrandparents(
VersionStorageInfo* vstorage, const CompactionInputFiles& inputs,
const CompactionInputFiles& output_level_inputs,
std::vector<FileMetaData*>* grandparents) {
InternalKey start, limit;
GetRange(inputs.files, output_level_inputs.files, &start, &limit);
// Compute the set of grandparent files that overlap this compaction // Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2) // (parent == level+1; grandparent == level+2)
if (c->output_level() + 1 < NumberLevels()) { if (output_level_inputs.level + 1 < NumberLevels()) {
vstorage->GetOverlappingInputs(c->output_level() + 1, &all_start, vstorage->GetOverlappingInputs(output_level_inputs.level + 1, &start,
&all_limit, &c->grandparents_); &limit, grandparents);
} }
} }
@ -379,7 +356,8 @@ Compaction* CompactionPicker::CompactRange(
// CompactionPickerFIFO has its own implementation of compact range // CompactionPickerFIFO has its own implementation of compact range
assert(ioptions_.compaction_style != kCompactionStyleFIFO); assert(ioptions_.compaction_style != kCompactionStyleFIFO);
std::vector<FileMetaData*> inputs; CompactionInputFiles inputs;
inputs.level = input_level;
bool covering_the_whole_range = true; bool covering_the_whole_range = true;
// All files are 'overlapping' in universal style compaction. // All files are 'overlapping' in universal style compaction.
@ -389,7 +367,7 @@ Compaction* CompactionPicker::CompactRange(
end = nullptr; end = nullptr;
} }
vstorage->GetOverlappingInputs(input_level, begin, end, &inputs); vstorage->GetOverlappingInputs(input_level, begin, end, &inputs.files);
if (inputs.empty()) { if (inputs.empty()) {
return nullptr; return nullptr;
} }
@ -408,46 +386,51 @@ Compaction* CompactionPicker::CompactRange(
if (total >= limit) { if (total >= limit) {
**compaction_end = inputs[i + 1]->smallest; **compaction_end = inputs[i + 1]->smallest;
covering_the_whole_range = false; covering_the_whole_range = false;
inputs.resize(i + 1); inputs.files.resize(i + 1);
break; break;
} }
} }
} }
assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size())); assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
Compaction* c = new Compaction(
vstorage->num_levels(), input_level, output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
output_path_id,
GetCompressionType(ioptions_, output_level, vstorage->base_level()));
c->inputs_[0].files = inputs; if (ExpandWhileOverlapping(cf_name, vstorage, &inputs) == false) {
if (ExpandWhileOverlapping(cf_name, vstorage, c) == false) { // manual compaction is currently single-threaded, so it should never
delete c; // happen that ExpandWhileOverlapping fails
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, assert(false);
"[%s] Could not compact due to expansion failure.\n", cf_name.c_str());
return nullptr; return nullptr;
} }
SetupOtherInputs(cf_name, mutable_cf_options, vstorage, c);
if (covering_the_whole_range) { if (covering_the_whole_range) {
*compaction_end = nullptr; *compaction_end = nullptr;
} }
// These files that are to be manaully compacted do not trample CompactionInputFiles output_level_inputs;
// upon other files because manual compactions are processed when output_level_inputs.level = output_level;
// the system has a max of 1 background compaction thread. if (input_level != output_level) {
c->MarkFilesBeingCompacted(true); int parent_index = -1;
if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
// Is this compaction creating a file at the bottommost level &output_level_inputs, &parent_index, -1)) {
c->SetupBottomMostLevel( // manual compaction is currently single-threaded, so it should never
vstorage, true, ioptions_.compaction_style == kCompactionStyleUniversal); // happen that SetupOtherInputs fails
assert(false);
return nullptr;
}
}
c->is_manual_compaction_ = true; std::vector<CompactionInputFiles> compaction_inputs({inputs});
c->mutable_cf_options_ = mutable_cf_options; if (!output_level_inputs.empty()) {
compaction_inputs.push_back(output_level_inputs);
}
return c; std::vector<FileMetaData*> grandparents;
GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
return new Compaction(
vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
output_path_id,
GetCompressionType(ioptions_, output_level, vstorage->base_level()),
std::move(grandparents), /* is manual compaction */ true);
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -694,63 +677,80 @@ bool LevelCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage)
Compaction* LevelCompactionPicker::PickCompaction( Compaction* LevelCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) { VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
Compaction* c = nullptr;
int level = -1; int level = -1;
int output_level = -1;
int parent_index = -1;
int base_index = -1;
CompactionInputFiles inputs;
double score;
// Find the compactions by size on all levels. // Find the compactions by size on all levels.
for (int i = 0; i < NumberLevels() - 1; i++) { for (int i = 0; i < NumberLevels() - 1; i++) {
double score = vstorage->CompactionScore(i); score = vstorage->CompactionScore(i);
level = vstorage->CompactionScoreLevel(i); level = vstorage->CompactionScoreLevel(i);
assert(i == 0 || score <= vstorage->CompactionScore(i - 1)); assert(i == 0 || score <= vstorage->CompactionScore(i - 1));
if ((score >= 1)) { if (score >= 1) {
c = PickCompactionBySize(mutable_cf_options, vstorage, level, score); output_level = (level == 0) ? vstorage->base_level() : level + 1;
if (c == nullptr || if (PickCompactionBySize(vstorage, level, output_level, &inputs,
ExpandWhileOverlapping(cf_name, vstorage, c) == false) { &parent_index, &base_index) &&
delete c; ExpandWhileOverlapping(cf_name, vstorage, &inputs)) {
c = nullptr; // found the compaction!
} else {
break; break;
} }
} }
} }
if (c == nullptr) { if (inputs.empty()) {
return nullptr; return nullptr;
} }
assert(level >= 0 && output_level >= 0);
// Two level 0 compaction won't run at the same time, so don't need to worry // Two level 0 compaction won't run at the same time, so don't need to worry
// about files on level 0 being compacted. // about files on level 0 being compacted.
if (level == 0) { if (level == 0) {
assert(level0_compactions_in_progress_.empty()); assert(level0_compactions_in_progress_.empty());
InternalKey smallest, largest; InternalKey smallest, largest;
GetRange(c->inputs_[0].files, &smallest, &largest); GetRange(inputs.files, &smallest, &largest);
// Note that the next call will discard the file we placed in // Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set // c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file. // which will include the picked file.
c->inputs_[0].clear(); inputs.files.clear();
vstorage->GetOverlappingInputs(0, &smallest, &largest, vstorage->GetOverlappingInputs(0, &smallest, &largest, &inputs.files);
&c->inputs_[0].files);
// If we include more L0 files in the same compaction run it can // If we include more L0 files in the same compaction run it can
// cause the 'smallest' and 'largest' key to get extended to a // cause the 'smallest' and 'largest' key to get extended to a
// larger range. So, re-invoke GetRange to get the new key range // larger range. So, re-invoke GetRange to get the new key range
GetRange(c->inputs_[0].files, &smallest, &largest); GetRange(inputs.files, &smallest, &largest);
if (RangeInCompaction(vstorage, &smallest, &largest, c->output_level(), if (RangeInCompaction(vstorage, &smallest, &largest, output_level,
&c->parent_index_)) { &parent_index)) {
delete c;
return nullptr; return nullptr;
} }
assert(!c->inputs_[0].empty()); assert(!inputs.files.empty());
} }
// Setup input files from output level // Setup input files from output level
SetupOtherInputs(cf_name, mutable_cf_options, vstorage, c); CompactionInputFiles output_level_inputs;
output_level_inputs.level = output_level;
if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
&output_level_inputs, &parent_index, base_index)) {
return nullptr;
}
// mark all the files that are being compacted std::vector<CompactionInputFiles> compaction_inputs({inputs});
c->MarkFilesBeingCompacted(true); if (!output_level_inputs.empty()) {
compaction_inputs.push_back(output_level_inputs);
}
// Is this compaction creating a file at the bottommost level std::vector<FileMetaData*> grandparents;
c->SetupBottomMostLevel(vstorage, false, false); GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
auto c = new Compaction(
vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(level),
GetPathId(ioptions_, mutable_cf_options, output_level),
GetCompressionType(ioptions_, output_level, vstorage->base_level()),
std::move(grandparents),
/* is manual */ false, score);
// If it's level 0 compaction, make sure we don't execute any other level 0 // If it's level 0 compaction, make sure we don't execute any other level 0
// compactions in parallel // compactions in parallel
@ -758,8 +758,6 @@ Compaction* LevelCompactionPicker::PickCompaction(
level0_compactions_in_progress_.insert(c); level0_compactions_in_progress_.insert(c);
} }
c->mutable_cf_options_ = mutable_cf_options;
// Creating a compaction influences the compaction score because the score // Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already // takes running compactions into account (by skipping files that are already
// being compacted). Since we just changed compaction score, we recalculate it // being compacted). Since we just changed compaction score, we recalculate it
@ -811,11 +809,11 @@ uint32_t LevelCompactionPicker::GetPathId(
return p; return p;
} }
Compaction* LevelCompactionPicker::PickCompactionBySize( bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, int level, int output_level,
int level, double score) { CompactionInputFiles* inputs,
Compaction* c = nullptr; int* parent_index,
int* base_index) {
// level 0 files are overlapping. So we cannot pick more // level 0 files are overlapping. So we cannot pick more
// than one concurrent compactions at this level. This // than one concurrent compactions at this level. This
// could be made better by looking at key-ranges that are // could be made better by looking at key-ranges that are
@ -825,21 +823,6 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
} }
assert(level >= 0); assert(level >= 0);
int output_level;
if (level == 0) {
output_level = vstorage->base_level();
} else {
output_level = level + 1;
}
assert(output_level < NumberLevels());
c = new Compaction(
vstorage->num_levels(), level, output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(level),
GetPathId(ioptions_, mutable_cf_options, output_level),
GetCompressionType(ioptions_, output_level, vstorage->base_level()));
c->score_ = score;
// Pick the largest file in this level that is not already // Pick the largest file in this level that is not already
// being compacted // being compacted
@ -852,7 +835,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
for (unsigned int i = vstorage->NextCompactionIndex(level); for (unsigned int i = vstorage->NextCompactionIndex(level);
i < file_size.size(); i++) { i < file_size.size(); i++) {
int index = file_size[i]; int index = file_size[i];
FileMetaData* f = level_files[index]; auto* f = level_files[index];
assert((i == file_size.size() - 1) || assert((i == file_size.size() - 1) ||
(i >= VersionStorageInfo::kNumberFilesToSort - 1) || (i >= VersionStorageInfo::kNumberFilesToSort - 1) ||
@ -872,26 +855,21 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
// Do not pick this file if its parents at level+1 are being compacted. // Do not pick this file if its parents at level+1 are being compacted.
// Maybe we can avoid redoing this work in SetupOtherInputs // Maybe we can avoid redoing this work in SetupOtherInputs
int parent_index = -1; *parent_index = -1;
if (RangeInCompaction(vstorage, &f->smallest, &f->largest, if (RangeInCompaction(vstorage, &f->smallest, &f->largest, output_level,
c->output_level(), &parent_index)) { parent_index)) {
continue; continue;
} }
c->inputs_[0].files.push_back(f); inputs->files.push_back(f);
c->base_index_ = index; inputs->level = level;
c->parent_index_ = parent_index; *base_index = index;
break; break;
} }
if (c->inputs_[0].empty()) {
delete c;
c = nullptr;
}
// store where to start the iteration in the next call to PickCompaction // store where to start the iteration in the next call to PickCompaction
vstorage->SetNextCompactionIndex(level, nextIndex); vstorage->SetNextCompactionIndex(level, nextIndex);
return c; return inputs->size() > 0;
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -1061,8 +1039,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(
size_t level_index = 0U; size_t level_index = 0U;
if (c->start_level() == 0) { if (c->start_level() == 0) {
for (unsigned int i = 0; i < c->inputs_[0].size(); i++) { for (auto f : *c->inputs(0)) {
FileMetaData* f = c->inputs_[0][i];
assert(f->smallest_seqno <= f->largest_seqno); assert(f->smallest_seqno <= f->largest_seqno);
if (is_first) { if (is_first) {
is_first = false; is_first = false;
@ -1088,43 +1065,12 @@ Compaction* UniversalCompactionPicker::PickCompaction(
} }
} }
#endif #endif
auto& last_sr = sorted_runs.back();
if (c->output_level() < last_sr.level) {
// If it doesn't compact to the last level that has file, it's not the last
// bottom most.
c->bottommost_level_ = false;
} else if (last_sr.level == 0) {
// All files are level 0. Then the compaction is bottom most iff it includes
// the last file.
c->bottommost_level_ = c->inputs_[kLevel0].files.back() == last_sr.file;
} else {
// In this case, it's not level 0 only and the compaction includes the last
// level having files. It is bottom most.
c->bottommost_level_ = true;
}
// update statistics // update statistics
MeasureTime(ioptions_.statistics, MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
NUM_FILES_IN_SINGLE_COMPACTION, c->inputs_[kLevel0].size()); c->inputs(0)->size());
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
level0_compactions_in_progress_.insert(c); level0_compactions_in_progress_.insert(c);
// Record whether this compaction includes all sst files.
// For now, it is only relevant in universal compaction mode.
int num_files_in_compaction = 0;
int total_num_files = 0;
for (int level = 0; level < vstorage->num_levels(); level++) {
total_num_files += vstorage->NumLevelFiles(level);
}
for (size_t i = 0; i < c->num_input_levels(); i++) {
num_files_in_compaction += c->num_input_files(i);
}
c->is_full_compaction_ = (num_files_in_compaction == total_num_files);
c->mutable_cf_options_ = mutable_cf_options;
return c; return c;
} }
@ -1309,19 +1255,17 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
output_level = sorted_runs[first_index_after].level - 1; output_level = sorted_runs[first_index_after].level - 1;
} }
Compaction* c = new Compaction( std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
vstorage->num_levels(), start_level, output_level, for (size_t i = 0; i < inputs.size(); ++i) {
mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id, inputs[i].level = start_level + static_cast<int>(i);
GetCompressionType(ioptions_, start_level, 1, enable_compression)); }
c->score_ = score;
for (unsigned int i = start_index; i < first_index_after; i++) { for (unsigned int i = start_index; i < first_index_after; i++) {
auto& picking_sr = sorted_runs[i]; auto& picking_sr = sorted_runs[i];
if (picking_sr.level == 0) { if (picking_sr.level == 0) {
FileMetaData* picking_file = picking_sr.file; FileMetaData* picking_file = picking_sr.file;
c->inputs_[0].files.push_back(picking_file); inputs[0].files.push_back(picking_file);
} else { } else {
auto& files = c->inputs_[picking_sr.level - start_level].files; auto& files = inputs[picking_sr.level - start_level].files;
for (auto* f : vstorage->LevelFiles(picking_sr.level)) { for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
files.push_back(f); files.push_back(f);
} }
@ -1331,7 +1275,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
LogToBuffer(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(), LogToBuffer(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(),
file_num_buf); file_num_buf);
} }
return c;
return new Compaction(
vstorage, mutable_cf_options, std::move(inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
GetCompressionType(ioptions_, start_level, 1, enable_compression),
/* grandparents */ {}, /* is manual */ false, score);
} }
// Look at overall size amplification. If size amplification // Look at overall size amplification. If size amplification
@ -1426,21 +1375,18 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
uint32_t path_id = GetPathId(ioptions_, estimated_total_size); uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
int start_level = sorted_runs[start_index].level; int start_level = sorted_runs[start_index].level;
// create a compaction request std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
// We always compact all the files, so always compress. // We always compact all the files, so always compress.
Compaction* c = new Compaction(
vstorage->num_levels(), start_level, vstorage->num_levels() - 1,
mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage->num_levels() - 1, 1));
c->score_ = score;
for (unsigned int loop = start_index; loop < sorted_runs.size(); loop++) { for (unsigned int loop = start_index; loop < sorted_runs.size(); loop++) {
auto& picking_sr = sorted_runs[loop]; auto& picking_sr = sorted_runs[loop];
if (picking_sr.level == 0) { if (picking_sr.level == 0) {
FileMetaData* f = picking_sr.file; FileMetaData* f = picking_sr.file;
c->inputs_[0].files.push_back(f); inputs[0].files.push_back(f);
} else { } else {
auto& files = c->inputs_[picking_sr.level - start_level].files; auto& files = inputs[picking_sr.level - start_level].files;
for (auto* f : vstorage->LevelFiles(picking_sr.level)) { for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
files.push_back(f); files.push_back(f);
} }
@ -1450,7 +1396,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
LogToBuffer(log_buffer, "[%s] Universal: size amp picking %s", LogToBuffer(log_buffer, "[%s] Universal: size amp picking %s",
cf_name.c_str(), file_num_buf); cf_name.c_str(), file_num_buf);
} }
return c;
return new Compaction(
vstorage, mutable_cf_options, std::move(inputs),
vstorage->num_levels() - 1,
mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage->num_levels() - 1, 1),
/* grandparents */ {}, /* is manual */ false, score);
} }
bool FIFOCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage) bool FIFOCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage)
@ -1489,13 +1442,14 @@ Compaction* FIFOCompactionPicker::PickCompaction(
return nullptr; return nullptr;
} }
Compaction* c = new Compaction(1, 0, 0, 0, 0, 0, kNoCompression, false, std::vector<CompactionInputFiles> inputs;
true /* is deletion compaction */); inputs.emplace_back();
inputs[0].level = 0;
// delete old files (FIFO) // delete old files (FIFO)
for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
auto f = *ritr; auto f = *ritr;
total_size -= f->compensated_file_size; total_size -= f->compensated_file_size;
c->inputs_[0].files.push_back(f); inputs[0].files.push_back(f);
char tmp_fsize[16]; char tmp_fsize[16];
AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64 LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
@ -1505,10 +1459,11 @@ Compaction* FIFOCompactionPicker::PickCompaction(
break; break;
} }
} }
Compaction* c = new Compaction(
c->MarkFilesBeingCompacted(true); vstorage, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0),
/* is deletion compaction */ true);
level0_compactions_in_progress_.insert(c); level0_compactions_in_progress_.insert(c);
c->mutable_cf_options_ = mutable_cf_options;
return c; return c;
} }
@ -1523,10 +1478,6 @@ Compaction* FIFOCompactionPicker::CompactRange(
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
Compaction* c = Compaction* c =
PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer); PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer);
if (c != nullptr) {
assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
c->output_path_id_ = output_path_id;
}
log_buffer.FlushBufferToLog(); log_buffer.FlushBufferToLog();
return c; return c;
} }

@ -97,14 +97,14 @@ class CompactionPicker {
// Takes a list of CompactionInputFiles and returns a Compaction object. // Takes a list of CompactionInputFiles and returns a Compaction object.
Compaction* FormCompaction( Compaction* FormCompaction(
const CompactionOptions& compact_options, const CompactionOptions& compact_options,
const autovector<CompactionInputFiles>& input_files, const std::vector<CompactionInputFiles>& input_files, int output_level,
int output_level, VersionStorageInfo* vstorage, VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
const MutableCFOptions& mutable_cf_options) const; uint32_t output_path_id) const;
// Converts a set of compaction input file numbers into // Converts a set of compaction input file numbers into
// a list of CompactionInputFiles. // a list of CompactionInputFiles.
Status GetCompactionInputsFromFileNumbers( Status GetCompactionInputsFromFileNumbers(
autovector<CompactionInputFiles>* input_files, std::vector<CompactionInputFiles>* input_files,
std::unordered_set<uint64_t>* input_set, std::unordered_set<uint64_t>* input_set,
const VersionStorageInfo* vstorage, const VersionStorageInfo* vstorage,
const CompactionOptions& compact_options) const; const CompactionOptions& compact_options) const;
@ -136,16 +136,25 @@ class CompactionPicker {
// //
// Will return false if it is impossible to apply this compaction. // Will return false if it is impossible to apply this compaction.
bool ExpandWhileOverlapping(const std::string& cf_name, bool ExpandWhileOverlapping(const std::string& cf_name,
VersionStorageInfo* vstorage, Compaction* c); VersionStorageInfo* vstorage,
CompactionInputFiles* inputs);
// Returns true if any one of the parent files are being compacted // Returns true if any one of the parent files are being compacted
bool RangeInCompaction(VersionStorageInfo* vstorage, bool RangeInCompaction(VersionStorageInfo* vstorage,
const InternalKey* smallest, const InternalKey* smallest,
const InternalKey* largest, int level, int* index); const InternalKey* largest, int level, int* index);
void SetupOtherInputs(const std::string& cf_name, bool SetupOtherInputs(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, Compaction* c); VersionStorageInfo* vstorage,
CompactionInputFiles* inputs,
CompactionInputFiles* output_level_inputs,
int* parent_index, int base_index);
void GetGrandparents(VersionStorageInfo* vstorage,
const CompactionInputFiles& inputs,
const CompactionInputFiles& output_level_inputs,
std::vector<FileMetaData*>* grandparents);
const ImmutableCFOptions& ioptions_; const ImmutableCFOptions& ioptions_;
@ -190,13 +199,14 @@ class LevelCompactionPicker : public CompactionPicker {
int level); int level);
private: private:
// For the specfied level, pick a compaction. // For the specfied level, pick a file that we want to compact.
// Returns nullptr if there is no compaction to be done. // Returns false if there is no file to compact.
// If it returns true, inputs->files.size() will be exactly one.
// If level is 0 and there is already a compaction on that level, this // If level is 0 and there is already a compaction on that level, this
// function will return nullptr. // function will return false.
Compaction* PickCompactionBySize(const MutableCFOptions& mutable_cf_options, bool PickCompactionBySize(VersionStorageInfo* vstorage, int level,
VersionStorageInfo* vstorage, int level, int output_level, CompactionInputFiles* inputs,
double score); int* parent_index, int* base_index);
}; };
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE

@ -229,7 +229,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic) {
ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(num_levels, static_cast<int>(compaction->num_input_levels())); ASSERT_EQ(1, static_cast<int>(compaction->num_input_levels()));
ASSERT_EQ(num_levels - 1, compaction->output_level()); ASSERT_EQ(num_levels - 1, compaction->output_level());
} }
@ -253,7 +253,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic2) {
ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(num_levels - 1, static_cast<int>(compaction->num_input_levels())); ASSERT_EQ(1, static_cast<int>(compaction->num_input_levels()));
ASSERT_EQ(num_levels - 2, compaction->output_level()); ASSERT_EQ(num_levels - 2, compaction->output_level());
} }
@ -278,7 +278,7 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic3) {
ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(num_levels - 2, static_cast<int>(compaction->num_input_levels())); ASSERT_EQ(1, static_cast<int>(compaction->num_input_levels()));
ASSERT_EQ(num_levels - 3, compaction->output_level()); ASSERT_EQ(num_levels - 3, compaction->output_level());
} }
@ -306,10 +306,11 @@ TEST_F(CompactionPickerTest, Level0TriggerDynamic4) {
ASSERT_EQ(2U, compaction->num_input_files(0)); ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
ASSERT_EQ(2U, compaction->num_input_files(num_levels - 3)); ASSERT_EQ(2U, compaction->num_input_files(1));
ASSERT_EQ(5U, compaction->input(num_levels - 3, 0)->fd.GetNumber()); ASSERT_EQ(num_levels - 3, compaction->level(1));
ASSERT_EQ(6U, compaction->input(num_levels - 3, 1)->fd.GetNumber()); ASSERT_EQ(5U, compaction->input(1, 0)->fd.GetNumber());
ASSERT_EQ(num_levels - 2, static_cast<int>(compaction->num_input_levels())); ASSERT_EQ(6U, compaction->input(1, 1)->fd.GetNumber());
ASSERT_EQ(2, static_cast<int>(compaction->num_input_levels()));
ASSERT_EQ(num_levels - 3, compaction->output_level()); ASSERT_EQ(num_levels - 3, compaction->output_level());
} }

@ -1399,7 +1399,7 @@ Status DBImpl::CompactFilesImpl(
return s; return s;
} }
autovector<CompactionInputFiles> input_files; std::vector<CompactionInputFiles> input_files;
s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers( s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
&input_files, &input_set, version->storage_info(), compact_options); &input_files, &input_set, version->storage_info(), compact_options);
if (!s.ok()) { if (!s.ok()) {
@ -1420,12 +1420,10 @@ Status DBImpl::CompactFilesImpl(
unique_ptr<Compaction> c; unique_ptr<Compaction> c;
assert(cfd->compaction_picker()); assert(cfd->compaction_picker());
c.reset(cfd->compaction_picker()->FormCompaction( c.reset(cfd->compaction_picker()->FormCompaction(
compact_options, input_files, compact_options, input_files, output_level, version->storage_info(),
output_level, version->storage_info(), *cfd->GetLatestMutableCFOptions(), output_path_id));
*cfd->GetLatestMutableCFOptions()));
assert(c); assert(c);
c->SetInputVersion(version); c->SetInputVersion(version);
c->SetOutputPathId(static_cast<uint32_t>(output_path_id));
// deletion compaction currently not allowed in CompactFiles. // deletion compaction currently not allowed in CompactFiles.
assert(!c->IsDeletionCompaction()); assert(!c->IsDeletionCompaction());

@ -12457,6 +12457,7 @@ TEST_F(DBTest, CompressLevelCompaction) {
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }

Loading…
Cancel
Save