Allow class Compaction to handle input files from multiple levels.

Summary:
Allow class Compaction to handle input files from multiple levels.
This diff is a subset of https://reviews.facebook.net/D19263 where
only db/compaction.cc and db/compaction.h are changed.

Test Plan:
make db_test
export ROCKSDB_TESTS=Compaction
./db_test

Reviewers: igor, sdong, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D19923
main
Yueh-Hsuan Chiang 11 years ago
parent 296e340753
commit 3178510153
  1. 68
      db/compaction.cc
  2. 116
      db/compaction.h
  3. 2
      db/db_impl.cc

@ -26,14 +26,14 @@ static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
return sum; return sum;
} }
Compaction::Compaction(Version* input_version, int level, int out_level, Compaction::Compaction(Version* input_version, int start_level, int out_level,
uint64_t target_file_size, uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes, uint64_t max_grandparent_overlap_bytes,
uint32_t output_path_id, uint32_t output_path_id,
CompressionType output_compression, bool seek_compaction, CompressionType output_compression, bool seek_compaction,
bool deletion_compaction) bool deletion_compaction)
: level_(level), : start_level_(start_level),
out_level_(out_level), output_level_(out_level),
max_output_file_size_(target_file_size), max_output_file_size_(target_file_size),
max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes), max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
input_version_(input_version), input_version_(input_version),
@ -61,8 +61,10 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
for (int i = 0; i < number_levels_; i++) { for (int i = 0; i < number_levels_; i++) {
level_ptrs_[i] = 0; level_ptrs_[i] = 0;
} }
for (int i = 0; i < 2; ++i) { int num_levels = output_level_ - start_level_ + 1;
inputs_[i].level = level_ + i; inputs_.resize(num_levels);
for (int i = 0; i < num_levels; ++i) {
inputs_[i].level = start_level_ + i;
} }
} }
@ -89,39 +91,39 @@ bool Compaction::IsTrivialMove() const {
// Avoid a move if there is lots of overlapping grandparent data. // Avoid a move if there is lots of overlapping grandparent data.
// Otherwise, the move could create a parent file that will require // Otherwise, the move could create a parent file that will require
// a very expensive merge later on. // a very expensive merge later on.
// If level_== out_level_, the purpose is to force compaction filter to be // If start_level_== output_level_, the purpose is to force compaction
// applied to that level, and thus cannot be a trivia move. // filter to be applied to that level, and thus cannot be a trivial move.
return (level_ != out_level_ && return (start_level_ != output_level_ &&
num_input_levels() == 2 &&
num_input_files(0) == 1 && num_input_files(0) == 1 &&
num_input_files(1) == 0 && num_input_files(1) == 0 &&
TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
} }
bool Compaction::IsDeletionCompaction() const { return deletion_compaction_; }
void Compaction::AddInputDeletions(VersionEdit* edit) { void Compaction::AddInputDeletions(VersionEdit* edit) {
for (int which = 0; which < 2; which++) { for (int which = 0; which < num_input_levels(); which++) {
for (size_t i = 0; i < inputs_[which].size(); i++) { for (size_t i = 0; i < inputs_[which].size(); i++) {
edit->DeleteFile(level_ + which, inputs_[which][i]->fd.GetNumber()); edit->DeleteFile(level(which), inputs_[which][i]->fd.GetNumber());
} }
} }
} }
bool Compaction::IsBaseLevelForKey(const Slice& user_key) { bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) {
assert(cfd_->options()->compaction_style != kCompactionStyleFIFO); assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
return bottommost_level_; return bottommost_level_;
} }
// Maybe use binary search to find right entry instead of linear search? // Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = cfd_->user_comparator(); const Comparator* user_cmp = cfd_->user_comparator();
for (int lvl = level_ + 2; lvl < number_levels_; lvl++) { for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
const std::vector<FileMetaData*>& files = input_version_->files_[lvl]; const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
for (; level_ptrs_[lvl] < files.size(); ) { for (; level_ptrs_[lvl] < files.size(); ) {
FileMetaData* f = files[level_ptrs_[lvl]]; FileMetaData* f = files[level_ptrs_[lvl]];
if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) { if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
// We've advanced far enough // We've advanced far enough
if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) { if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
// Key falls in this file's range, so definitely not base level // Key falls in this file's range, so definitely
// exists beyond output level
return false; return false;
} }
break; break;
@ -159,18 +161,18 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) {
} }
// Mark (or clear) each file that is being compacted // Mark (or clear) each file that is being compacted
void Compaction::MarkFilesBeingCompacted(bool value) { void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
for (int i = 0; i < 2; i++) { for (int i = 0; i < num_input_levels(); i++) {
for (unsigned int j = 0; j < inputs_[i].size(); j++) { for (unsigned int j = 0; j < inputs_[i].size(); j++) {
assert(value ? !inputs_[i][j]->being_compacted : assert(mark_as_compacted ? !inputs_[i][j]->being_compacted :
inputs_[i][j]->being_compacted); inputs_[i][j]->being_compacted);
inputs_[i][j]->being_compacted = value; inputs_[i][j]->being_compacted = mark_as_compacted;
} }
} }
} }
// Is this compaction producing files at the bottommost level? // Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(bool isManual) { void Compaction::SetupBottomMostLevel(bool is_manual) {
assert(cfd_->options()->compaction_style != kCompactionStyleFIFO); assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
// If universal compaction style is used and manual // If universal compaction style is used and manual
@ -179,13 +181,14 @@ void Compaction::SetupBottomMostLevel(bool isManual) {
// run. We can safely set bottommost_level_ = true. // run. We can safely set bottommost_level_ = true.
// If it is not manual compaction, then bottommost_level_ // If it is not manual compaction, then bottommost_level_
// is already set when the Compaction was created. // is already set when the Compaction was created.
if (isManual) { if (is_manual) {
bottommost_level_ = true; bottommost_level_ = true;
} }
return; return;
} }
bottommost_level_ = true; bottommost_level_ = true;
for (int i = output_level() + 1; i < number_levels_; i++) { // checks whether there are files living beyond the output_level.
for (int i = output_level_ + 1; i < number_levels_; i++) {
if (input_version_->NumLevelFiles(i) > 0) { if (input_version_->NumLevelFiles(i) > 0) {
bottommost_level_ = false; bottommost_level_ = false;
break; break;
@ -211,7 +214,7 @@ void Compaction::ReleaseCompactionFiles(Status status) {
} }
void Compaction::ResetNextCompactionIndex() { void Compaction::ResetNextCompactionIndex() {
input_version_->ResetNextCompactionIndex(level_); input_version_->ResetNextCompactionIndex(start_level_);
} }
namespace { namespace {
@ -238,25 +241,24 @@ void Compaction::Summary(char* output, int len) {
int write = int write =
snprintf(output, len, "Base version %" PRIu64 snprintf(output, len, "Base version %" PRIu64
" Base level %d, seek compaction:%d, inputs: [", " Base level %d, seek compaction:%d, inputs: [",
input_version_->GetVersionNumber(), level_, seek_compaction_); input_version_->GetVersionNumber(),
if (write < 0 || write >= len) { start_level_, seek_compaction_);
return;
}
write += InputSummary(inputs_[0].files, output + write, len - write);
if (write < 0 || write >= len) { if (write < 0 || write >= len) {
return; return;
} }
for (int level = 0; level < num_input_levels(); ++level) {
if (level > 0) {
write += snprintf(output + write, len - write, "], ["); write += snprintf(output + write, len - write, "], [");
if (write < 0 || write >= len) { if (write < 0 || write >= len) {
return; return;
} }
}
write += InputSummary(inputs_[1].files, output + write, len - write); write += InputSummary(inputs_[level].files, output + write, len - write);
if (write < 0 || write >= len) { if (write < 0 || write >= len) {
return; return;
} }
}
snprintf(output + write, len - write, "]"); snprintf(output + write, len - write, "]");
} }
@ -268,10 +270,12 @@ uint64_t Compaction::OutputFilePreallocationSize() {
preallocation_size = preallocation_size =
cfd_->compaction_picker()->MaxFileSizeForLevel(output_level()); cfd_->compaction_picker()->MaxFileSizeForLevel(output_level());
} else { } else {
for (const auto& f : inputs_[0].files) { for (int level = 0; level < num_input_levels(); ++level) {
for (const auto& f : inputs_[level].files) {
preallocation_size += f->fd.GetFileSize(); preallocation_size += f->fd.GetFileSize();
} }
} }
}
// Over-estimate slightly so we don't end up just barely crossing // Over-estimate slightly so we don't end up just barely crossing
// the threshold // the threshold
return preallocation_size * 1.1; return preallocation_size * 1.1;

@ -14,6 +14,8 @@
namespace rocksdb { namespace rocksdb {
// The structure that manages compaction input files associated
// with the same physical level.
struct CompactionInputFiles { struct CompactionInputFiles {
int level; int level;
std::vector<FileMetaData*> files; std::vector<FileMetaData*> files;
@ -36,35 +38,63 @@ class Compaction {
~Compaction(); ~Compaction();
// Returns the level associated to the specified compaction input level. // Returns the level associated to the specified compaction input level.
// If input_level is not specified, then input_level is set to 0. // If compaction_input_level is not specified, then it defaults to 0.
int level(int input_level = 0) const { return inputs_[input_level].level; } int level(int compaction_input_level = 0) const {
return inputs_[compaction_input_level].level;
}
// Outputs will go to this level // Outputs will go to this level
int output_level() const { return out_level_; } int output_level() const { return output_level_; }
// Returns the number of input levels in this compaction.
int num_input_levels() const { return inputs_.size(); }
// Return the object that holds the edits to the descriptor done // Return the object that holds the edits to the descriptor done
// by this compaction. // by this compaction.
VersionEdit* edit() { return edit_; } VersionEdit* edit() const { return edit_; }
// "which" must be either 0 or 1 // Returns the number of input files associated to the specified
int num_input_files(int which) const { return inputs_[which].size(); } // compaction input level.
// The function will return 0 when "compaction_input_level" < 0
// or "compaction_input_level" >= "num_input_levels()".
int num_input_files(int compaction_input_level) const {
if (compaction_input_level >= 0 &&
compaction_input_level < inputs_.size()) {
return inputs_[compaction_input_level].size();
}
return 0;
}
// Returns input version of the compaction // Returns input version of the compaction
Version* input_version() const { return input_version_; } Version* input_version() const { return input_version_; }
// Returns the ColumnFamilyData associated with the compaction.
ColumnFamilyData* column_family_data() const { return cfd_; } ColumnFamilyData* column_family_data() const { return cfd_; }
// Return the ith input file at "level()+which" ("which" must be 0 or 1). // Returns the file meta data of the 'i'th input file at the
FileMetaData* input(int which, int i) const { return inputs_[which][i]; } // specified compaction input level.
// REQUIREMENT: "compaction_input_level" must be >= 0 and
// < "num_input_levels()"
FileMetaData* input(int compaction_input_level, int i) const {
assert(compaction_input_level < inputs_.size() &&
compaction_input_level >= 0);
return inputs_[compaction_input_level][i];
}
// Returns the list of FileMataData associated with the specified // Returns the list of file meta data of the specified compaction
// compaction input level. // input level.
std::vector<FileMetaData*>* inputs(int which) { // REQUIREMENT: "compaction_input_level" must be >= 0 and
return &inputs_[which].files; // < "num_input_levels()"
std::vector<FileMetaData*>* const inputs(int compaction_input_level) {
assert(compaction_input_level < inputs_.size() &&
compaction_input_level >= 0);
return &inputs_[compaction_input_level].files;
} }
// Return the input_level file // Returns the FileLevel of the specified compaction input level.
FileLevel* input_levels(int which) { return &input_levels_[which]; } FileLevel* input_levels(int compaction_input_level) {
return &input_levels_[compaction_input_level];
}
// Maximum size of files to build during this compaction. // Maximum size of files to build during this compaction.
uint64_t MaxOutputFileSize() const { return max_output_file_size_; } uint64_t MaxOutputFileSize() const { return max_output_file_size_; }
@ -83,16 +113,17 @@ class Compaction {
// moving a single input file to the next level (no merging or splitting) // moving a single input file to the next level (no merging or splitting)
bool IsTrivialMove() const; bool IsTrivialMove() const;
// If true, just delete all files in inputs_[0] // If true, then the compaction can be done by simply deleting input files.
bool IsDeletionCompaction() const; bool IsDeletionCompaction() const {
return deletion_compaction_;
}
// Add all inputs to this compaction as delete operations to *edit. // Add all inputs to this compaction as delete operations to *edit.
void AddInputDeletions(VersionEdit* edit); void AddInputDeletions(VersionEdit* edit);
// Returns true if the information we have available guarantees that // Returns true if the available information we have guarantees that
// the compaction is producing data in "level+1" for which no data exists // the input "user_key" does not exist in any level beyond "output_level()".
// in levels greater than "level+1". bool KeyNotExistsBeyondOutputLevel(const Slice& user_key);
bool IsBaseLevelForKey(const Slice& user_key);
// Returns true iff we should stop building the current output // Returns true iff we should stop building the current output
// before processing "internal_key". // before processing "internal_key".
@ -106,6 +137,9 @@ class Compaction {
// Delete this compaction from the list of running compactions. // Delete this compaction from the list of running compactions.
void ReleaseCompactionFiles(Status status); void ReleaseCompactionFiles(Status status);
// Returns the summary of the compaction in "output" with maximum "len"
// in bytes. The caller is responsible for the memory management of
// "output".
void Summary(char* output, int len); void Summary(char* output, int len);
// Return the score that was used to pick this compaction run. // Return the score that was used to pick this compaction run.
@ -120,9 +154,9 @@ class Compaction {
// Was this compaction triggered manually by the client? // Was this compaction triggered manually by the client?
bool IsManualCompaction() { return is_manual_compaction_; } bool IsManualCompaction() { return is_manual_compaction_; }
// Returns a number of byte that the output file should be preallocated to // Returns the size in bytes that the output file should be preallocated to.
// In level compaction, that is max_file_size_. In universal compaction, that // In level compaction, that is max_file_size_. In universal compaction, that
// is the sum of all input file sizes // is the sum of all input file sizes.
uint64_t OutputFilePreallocationSize(); uint64_t OutputFilePreallocationSize();
private: private:
@ -131,13 +165,13 @@ class Compaction {
friend class FIFOCompactionPicker; friend class FIFOCompactionPicker;
friend class LevelCompactionPicker; friend class LevelCompactionPicker;
Compaction(Version* input_version, int level, int out_level, Compaction(Version* input_version, int start_level, int out_level,
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
uint32_t output_path_id, CompressionType output_compression, uint32_t output_path_id, CompressionType output_compression,
bool seek_compaction = false, bool deletion_compaction = false); bool seek_compaction = false, bool deletion_compaction = false);
int level_; const int start_level_; // the lowest level to be compacted
int out_level_; // levels to which output files are stored const int output_level_; // levels to which output files are stored
uint64_t max_output_file_size_; uint64_t max_output_file_size_;
uint64_t max_grandparent_overlap_bytes_; uint64_t max_grandparent_overlap_bytes_;
Version* input_version_; Version* input_version_;
@ -149,24 +183,26 @@ class Compaction {
uint32_t output_path_id_; uint32_t output_path_id_;
CompressionType output_compression_; CompressionType output_compression_;
bool seek_compaction_; bool seek_compaction_;
// if true, just delete files in inputs_[0] // If true, then the compaction can be done by simply deleting input files.
bool deletion_compaction_; bool deletion_compaction_;
// Each compaction reads inputs from "level_" and "level_+1" // Compaction input files organized by level.
CompactionInputFiles inputs_[2]; // The two sets of inputs autovector<CompactionInputFiles> inputs_;
// A copy of inputs_, organized more closely in memory // A copy of inputs_, organized more closely in memory
autovector<FileLevel, 2> input_levels_; autovector<FileLevel, 2> input_levels_;
// State used to check for number of overlapping grandparent files // State used to check for number of overlapping grandparent files
// (parent == level_ + 1, grandparent == level_ + 2) // (grandparent == "output_level_ + 1")
// This vector is updated by Version::GetOverlappingInputs().
std::vector<FileMetaData*> grandparents_; std::vector<FileMetaData*> grandparents_;
size_t grandparent_index_; // Index in grandparent_starts_ size_t grandparent_index_; // Index in grandparent_starts_
bool seen_key_; // Some output key has been seen bool seen_key_; // Some output key has been seen
uint64_t overlapped_bytes_; // Bytes of overlap between current output uint64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files // and grandparent files
int base_index_; // index of the file in files_[level_] int base_index_; // index of the file in files_[start_level_]
int parent_index_; // index of some file with same range in files_[level_+1] int parent_index_; // index of some file with same range in
// files_[start_level_+1]
double score_; // score that was used to pick this compaction. double score_; // score that was used to pick this compaction.
// Is this compaction creating a file in the bottom most level? // Is this compaction creating a file in the bottom most level?
@ -177,17 +213,21 @@ class Compaction {
// Is this compaction requested by the client? // Is this compaction requested by the client?
bool is_manual_compaction_; bool is_manual_compaction_;
// level_ptrs_ holds indices into input_version_->levels_: our state // "level_ptrs_" holds indices into "input_version_->levels_", where each
// is that we are positioned at one of the file ranges for each // index remembers which file of an associated level we are currently using
// higher level than the ones involved in this compaction (i.e. for // to check KeyNotExistsBeyondOutputLevel() for deletion operation.
// all L >= level_ + 2). // As it is for checking KeyNotExistsBeyondOutputLevel(), it only
// records indices for all levels beyond "output_level_".
std::vector<size_t> level_ptrs_; std::vector<size_t> level_ptrs_;
// mark (or clear) all files that are being compacted // mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool); void MarkFilesBeingCompacted(bool mark_as_compacted);
// Initialize whether compaction producing files at the bottommost level // Initialize whether the compaction is producing files at the
void SetupBottomMostLevel(bool isManual); // bottommost level.
//
// @see BottomMostLevel()
void SetupBottomMostLevel(bool is_manual);
// In case of compaction error, reset the nextIndex that is used // In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_ // to pick up the next file to be compacted from files_by_size_

@ -2670,7 +2670,7 @@ Status DBImpl::ProcessKeyValueCompaction(
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_NEWER_ENTRY); RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_NEWER_ENTRY);
} else if (ikey.type == kTypeDeletion && } else if (ikey.type == kTypeDeletion &&
ikey.sequence <= earliest_snapshot && ikey.sequence <= earliest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) { compact->compaction->KeyNotExistsBeyondOutputLevel(ikey.user_key)) {
// For this user key: // For this user key:
// (1) there is no data in higher levels // (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers // (2) data in lower levels will have larger sequence numbers

Loading…
Cancel
Save