Add a mode to always pick the oldest file to compact for each level

Summary:
Add options.compaction_pri, which specifies the policy about which file to compact first.
kCompactionPriByLargestSeq will compact oldest files first.
Verified the behavior in db_bench but did not write unit tests yet. Also need to make it settable through option string and dynamically changeable.

Test Plan: Will write unit tests

Reviewers: igor, rven, anthony, kradhakrishnan, IslamAbdelRahman, yhchiang, MarkCallaghan

Reviewed By: yhchiang

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D45951
main
sdong 9 years ago
parent dd2e1eeb35
commit f1b9f804e9
  1. 7
      db/compaction_picker.cc
  2. 2
      db/compaction_picker_test.cc
  3. 6
      db/db_bench.cc
  4. 2
      db/version_builder_test.cc
  5. 32
      db/version_set.cc
  6. 27
      db/version_set.h
  7. 12
      include/rocksdb/options.h
  8. 3
      util/mutable_cf_options.h
  9. 4
      util/options.cc

@ -1028,7 +1028,7 @@ bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage,
// Pick the largest file in this level that is not already // Pick the largest file in this level that is not already
// being compacted // being compacted
const std::vector<int>& file_size = vstorage->FilesBySize(level); const std::vector<int>& file_size = vstorage->FilesByCompactionPri(level);
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(level); const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(level);
// record the first file that is not yet compacted // record the first file that is not yet compacted
@ -1039,11 +1039,6 @@ bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage,
int index = file_size[i]; int index = file_size[i];
auto* f = level_files[index]; auto* f = level_files[index];
assert((i == file_size.size() - 1) ||
(i >= VersionStorageInfo::kNumberFilesToSort - 1) ||
(f->compensated_file_size >=
level_files[file_size[i + 1]]->compensated_file_size));
// do not pick a file to compact if it is being compacted // do not pick a file to compact if it is being compacted
// from n-1 level. // from n-1 level.
if (f->being_compacted) { if (f->being_compacted) {

@ -117,7 +117,7 @@ class CompactionPickerTest : public testing::Test {
void UpdateVersionStorageInfo() { void UpdateVersionStorageInfo() {
vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_); vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
vstorage_->UpdateFilesBySize(); vstorage_->UpdateFilesByCompactionPri(mutable_cf_options_);
vstorage_->UpdateNumNonEmptyLevels(); vstorage_->UpdateNumNonEmptyLevels();
vstorage_->GenerateFileIndexer(); vstorage_->GenerateFileIndexer();
vstorage_->GenerateLevelFilesBrief(); vstorage_->GenerateLevelFilesBrief();

@ -314,6 +314,10 @@ static rocksdb::CompactionStyle FLAGS_compaction_style_e;
DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style, DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
"style of compaction: level-based vs universal"); "style of compaction: level-based vs universal");
static rocksdb::CompactionPri FLAGS_compaction_pri_e;
DEFINE_int32(compaction_pri, (int32_t)rocksdb::Options().compaction_style,
"priority of files to compaction: by size or by data age");
DEFINE_int32(universal_size_ratio, 0, DEFINE_int32(universal_size_ratio, 0,
"Percentage flexibility while comparing file size" "Percentage flexibility while comparing file size"
" (for universal compaction only)."); " (for universal compaction only).");
@ -2248,6 +2252,7 @@ class Benchmark {
options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions); options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
options.max_background_flushes = FLAGS_max_background_flushes; options.max_background_flushes = FLAGS_max_background_flushes;
options.compaction_style = FLAGS_compaction_style_e; options.compaction_style = FLAGS_compaction_style_e;
options.compaction_pri = FLAGS_compaction_pri_e;
if (FLAGS_prefix_size != 0) { if (FLAGS_prefix_size != 0) {
options.prefix_extractor.reset( options.prefix_extractor.reset(
NewFixedPrefixTransform(FLAGS_prefix_size)); NewFixedPrefixTransform(FLAGS_prefix_size));
@ -3957,6 +3962,7 @@ int main(int argc, char** argv) {
if (FLAGS_statistics) { if (FLAGS_statistics) {
dbstats = rocksdb::CreateDBStatistics(); dbstats = rocksdb::CreateDBStatistics();
} }
FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;
std::vector<std::string> fanout = rocksdb::StringSplit( std::vector<std::string> fanout = rocksdb::StringSplit(
FLAGS_max_bytes_for_level_multiplier_additional, ','); FLAGS_max_bytes_for_level_multiplier_additional, ',');

@ -77,7 +77,7 @@ class VersionBuilderTest : public testing::Test {
} }
void UpdateVersionStorageInfo() { void UpdateVersionStorageInfo() {
vstorage_.UpdateFilesBySize(); vstorage_.UpdateFilesByCompactionPri(mutable_cf_options_);
vstorage_.UpdateNumNonEmptyLevels(); vstorage_.UpdateNumNonEmptyLevels();
vstorage_.GenerateFileIndexer(); vstorage_.GenerateFileIndexer();
vstorage_.GenerateLevelFilesBrief(); vstorage_.GenerateLevelFilesBrief();

@ -782,7 +782,7 @@ VersionStorageInfo::VersionStorageInfo(
compaction_style_(compaction_style), compaction_style_(compaction_style),
files_(new std::vector<FileMetaData*>[num_levels_]), files_(new std::vector<FileMetaData*>[num_levels_]),
base_level_(num_levels_ == 1 ? -1 : 1), base_level_(num_levels_ == 1 ? -1 : 1),
files_by_size_(num_levels_), files_by_compaction_pri_(num_levels_),
level0_non_overlapping_(false), level0_non_overlapping_(false),
next_file_to_compact_by_size_(num_levels_), next_file_to_compact_by_size_(num_levels_),
compaction_score_(num_levels_), compaction_score_(num_levels_),
@ -923,7 +923,7 @@ void Version::PrepareApply(
UpdateAccumulatedStats(update_stats); UpdateAccumulatedStats(update_stats);
storage_info_.UpdateNumNonEmptyLevels(); storage_info_.UpdateNumNonEmptyLevels();
storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options); storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
storage_info_.UpdateFilesBySize(); storage_info_.UpdateFilesByCompactionPri(mutable_cf_options);
storage_info_.GenerateFileIndexer(); storage_info_.GenerateFileIndexer();
storage_info_.GenerateLevelFilesBrief(); storage_info_.GenerateLevelFilesBrief();
storage_info_.GenerateLevel0NonOverlapping(); storage_info_.GenerateLevel0NonOverlapping();
@ -1227,7 +1227,6 @@ bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
return (first.file->compensated_file_size > return (first.file->compensated_file_size >
second.file->compensated_file_size); second.file->compensated_file_size);
} }
} // anonymous namespace } // anonymous namespace
void VersionStorageInfo::AddFile(int level, FileMetaData* f) { void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
@ -1245,7 +1244,7 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
// following functions called: // following functions called:
// 1. UpdateNumNonEmptyLevels(); // 1. UpdateNumNonEmptyLevels();
// 2. CalculateBaseBytes(); // 2. CalculateBaseBytes();
// 3. UpdateFilesBySize(); // 3. UpdateFilesByCompactionPri();
// 4. GenerateFileIndexer(); // 4. GenerateFileIndexer();
// 5. GenerateLevelFilesBrief(); // 5. GenerateLevelFilesBrief();
// 6. GenerateLevel0NonOverlapping(); // 6. GenerateLevel0NonOverlapping();
@ -1297,7 +1296,8 @@ void VersionStorageInfo::UpdateNumNonEmptyLevels() {
} }
} }
void VersionStorageInfo::UpdateFilesBySize() { void VersionStorageInfo::UpdateFilesByCompactionPri(
const MutableCFOptions& mutable_cf_options) {
if (compaction_style_ == kCompactionStyleFIFO || if (compaction_style_ == kCompactionStyleFIFO ||
compaction_style_ == kCompactionStyleUniversal) { compaction_style_ == kCompactionStyleUniversal) {
// don't need this // don't need this
@ -1306,8 +1306,8 @@ void VersionStorageInfo::UpdateFilesBySize() {
// No need to sort the highest level because it is never compacted. // No need to sort the highest level because it is never compacted.
for (int level = 0; level < num_levels() - 1; level++) { for (int level = 0; level < num_levels() - 1; level++) {
const std::vector<FileMetaData*>& files = files_[level]; const std::vector<FileMetaData*>& files = files_[level];
auto& files_by_size = files_by_size_[level]; auto& files_by_compaction_pri = files_by_compaction_pri_[level];
assert(files_by_size.size() == 0); assert(files_by_compaction_pri.size() == 0);
// populate a temp vector for sorting based on size // populate a temp vector for sorting based on size
std::vector<Fsize> temp(files.size()); std::vector<Fsize> temp(files.size());
@ -1321,16 +1321,28 @@ void VersionStorageInfo::UpdateFilesBySize() {
if (num > temp.size()) { if (num > temp.size()) {
num = temp.size(); num = temp.size();
} }
switch (mutable_cf_options.compaction_pri) {
case kCompactionPriByCompensatedSize:
std::partial_sort(temp.begin(), temp.begin() + num, temp.end(), std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
CompareCompensatedSizeDescending); CompareCompensatedSizeDescending);
break;
case kCompactionPriByLargestSeq:
std::sort(temp.begin(), temp.end(),
[this](const Fsize& f1, const Fsize& f2) -> bool {
return f1.file->largest_seqno < f2.file->largest_seqno;
});
break;
default:
assert(false);
}
assert(temp.size() == files.size()); assert(temp.size() == files.size());
// initialize files_by_size_ // initialize files_by_compaction_pri_
for (unsigned int i = 0; i < temp.size(); i++) { for (unsigned int i = 0; i < temp.size(); i++) {
files_by_size.push_back(temp[i].index); files_by_compaction_pri.push_back(temp[i].index);
} }
next_file_to_compact_by_size_[level] = 0; next_file_to_compact_by_size_[level] = 0;
assert(files_[level].size() == files_by_size_[level].size()); assert(files_[level].size() == files_by_compaction_pri_[level].size());
} }
} }

@ -132,8 +132,9 @@ class VersionStorageInfo {
// Generate level_files_brief_ from files_ // Generate level_files_brief_ from files_
void GenerateLevelFilesBrief(); void GenerateLevelFilesBrief();
// Sort all files for this version based on their file size and // Sort all files for this version based on their file size and
// record results in files_by_size_. The largest files are listed first. // record results in files_by_compaction_pri_. The largest files are listed
void UpdateFilesBySize(); // first.
void UpdateFilesByCompactionPri(const MutableCFOptions& mutable_cf_options);
void GenerateLevel0NonOverlapping(); void GenerateLevel0NonOverlapping();
bool level0_non_overlapping() const { bool level0_non_overlapping() const {
@ -226,9 +227,9 @@ class VersionStorageInfo {
} }
// REQUIRES: This version has been saved (see VersionSet::SaveTo) // REQUIRES: This version has been saved (see VersionSet::SaveTo)
const std::vector<int>& FilesBySize(int level) const { const std::vector<int>& FilesByCompactionPri(int level) const {
assert(finalized_); assert(finalized_);
return files_by_size_[level]; return files_by_compaction_pri_[level];
} }
// REQUIRES: This version has been saved (see VersionSet::SaveTo) // REQUIRES: This version has been saved (see VersionSet::SaveTo)
@ -242,7 +243,7 @@ class VersionStorageInfo {
int base_level() const { return base_level_; } int base_level() const { return base_level_; }
// REQUIRES: lock is held // REQUIRES: lock is held
// Set the index that is used to offset into files_by_size_ to find // Set the index that is used to offset into files_by_compaction_pri_ to find
// the next compaction candidate file. // the next compaction candidate file.
void SetNextCompactionIndex(int level, int index) { void SetNextCompactionIndex(int level, int index) {
next_file_to_compact_by_size_[level] = index; next_file_to_compact_by_size_[level] = index;
@ -259,7 +260,7 @@ class VersionStorageInfo {
return file_indexer_; return file_indexer_;
} }
// Only the first few entries of files_by_size_ are sorted. // Only the first few entries of files_by_compaction_pri_ are sorted.
// There is no need to sort all the files because it is likely // There is no need to sort all the files because it is likely
// that on a running system, we need to look at only the first // that on a running system, we need to look at only the first
// few largest files because a new version is created every few // few largest files because a new version is created every few
@ -299,7 +300,8 @@ class VersionStorageInfo {
uint64_t GetEstimatedActiveKeys() const; uint64_t GetEstimatedActiveKeys() const;
// re-initializes the index that is used to offset into files_by_size_ // re-initializes the index that is used to offset into
// files_by_compaction_pri_
// to find the next compaction candidate file. // to find the next compaction candidate file.
void ResetNextCompactionIndex(int level) { void ResetNextCompactionIndex(int level) {
next_file_to_compact_by_size_[level] = 0; next_file_to_compact_by_size_[level] = 0;
@ -351,16 +353,16 @@ class VersionStorageInfo {
// but files in each level are now sorted based on file // but files in each level are now sorted based on file
// size. The file with the largest size is at the front. // size. The file with the largest size is at the front.
// This vector stores the index of the file from files_. // This vector stores the index of the file from files_.
std::vector<std::vector<int>> files_by_size_; std::vector<std::vector<int>> files_by_compaction_pri_;
// If true, means that files in L0 have keys with non overlapping ranges // If true, means that files in L0 have keys with non overlapping ranges
bool level0_non_overlapping_; bool level0_non_overlapping_;
// An index into files_by_size_ that specifies the first // An index into files_by_compaction_pri_ that specifies the first
// file that is not yet compacted // file that is not yet compacted
std::vector<int> next_file_to_compact_by_size_; std::vector<int> next_file_to_compact_by_size_;
// Only the first few entries of files_by_size_ are sorted. // Only the first few entries of files_by_compaction_pri_ are sorted.
// There is no need to sort all the files because it is likely // There is no need to sort all the files because it is likely
// that on a running system, we need to look at only the first // that on a running system, we need to look at only the first
// few largest files because a new version is created every few // few largest files because a new version is created every few
@ -513,8 +515,9 @@ class Version {
void UpdateAccumulatedStats(bool update_stats); void UpdateAccumulatedStats(bool update_stats);
// Sort all files for this version based on their file size and // Sort all files for this version based on their file size and
// record results in files_by_size_. The largest files are listed first. // record results in files_by_compaction_pri_. The largest files are listed
void UpdateFilesBySize(); // first.
void UpdateFilesByCompactionPri();
ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs
Logger* info_log_; Logger* info_log_;

@ -80,6 +80,13 @@ enum CompactionStyle : char {
kCompactionStyleNone = 0x3, kCompactionStyleNone = 0x3,
}; };
enum CompactionPri : char {
// Slightly Priotize larger files by size compensated by #deletes
kCompactionPriByCompensatedSize = 0x0,
// First compact files whose data is oldest.
kCompactionPriByLargestSeq = 0x1,
};
enum class WALRecoveryMode : char { enum class WALRecoveryMode : char {
// Original levelDB recovery // Original levelDB recovery
// We tolerate incomplete record in trailing data on all logs // We tolerate incomplete record in trailing data on all logs
@ -547,6 +554,11 @@ struct ColumnFamilyOptions {
// The compaction style. Default: kCompactionStyleLevel // The compaction style. Default: kCompactionStyleLevel
CompactionStyle compaction_style; CompactionStyle compaction_style;
// If level compaction_style = kCompactionStyleLevel, for each level,
// which files are prioritized to be picked to compact.
// Default: kCompactionPriByCompensatedSize
CompactionPri compaction_pri;
// If true, compaction will verify checksum on every read that happens // If true, compaction will verify checksum on every read that happens
// as part of compaction // as part of compaction
// //

@ -31,6 +31,7 @@ struct MutableCFOptions {
options.level0_file_num_compaction_trigger), options.level0_file_num_compaction_trigger),
level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger), level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
level0_stop_writes_trigger(options.level0_stop_writes_trigger), level0_stop_writes_trigger(options.level0_stop_writes_trigger),
compaction_pri(options.compaction_pri),
max_grandparent_overlap_factor(options.max_grandparent_overlap_factor), max_grandparent_overlap_factor(options.max_grandparent_overlap_factor),
expanded_compaction_factor(options.expanded_compaction_factor), expanded_compaction_factor(options.expanded_compaction_factor),
source_compaction_factor(options.source_compaction_factor), source_compaction_factor(options.source_compaction_factor),
@ -66,6 +67,7 @@ struct MutableCFOptions {
level0_file_num_compaction_trigger(0), level0_file_num_compaction_trigger(0),
level0_slowdown_writes_trigger(0), level0_slowdown_writes_trigger(0),
level0_stop_writes_trigger(0), level0_stop_writes_trigger(0),
compaction_pri(kCompactionPriByCompensatedSize),
max_grandparent_overlap_factor(0), max_grandparent_overlap_factor(0),
expanded_compaction_factor(0), expanded_compaction_factor(0),
source_compaction_factor(0), source_compaction_factor(0),
@ -117,6 +119,7 @@ struct MutableCFOptions {
int level0_file_num_compaction_trigger; int level0_file_num_compaction_trigger;
int level0_slowdown_writes_trigger; int level0_slowdown_writes_trigger;
int level0_stop_writes_trigger; int level0_stop_writes_trigger;
CompactionPri compaction_pri;
int max_grandparent_overlap_factor; int max_grandparent_overlap_factor;
int expanded_compaction_factor; int expanded_compaction_factor;
int source_compaction_factor; int source_compaction_factor;

@ -109,6 +109,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
disable_auto_compactions(false), disable_auto_compactions(false),
purge_redundant_kvs_while_flush(true), purge_redundant_kvs_while_flush(true),
compaction_style(kCompactionStyleLevel), compaction_style(kCompactionStyleLevel),
compaction_pri(kCompactionPriByCompensatedSize),
verify_checksums_in_compaction(true), verify_checksums_in_compaction(true),
filter_deletes(false), filter_deletes(false),
max_sequential_skip_in_iterations(8), max_sequential_skip_in_iterations(8),
@ -170,6 +171,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
disable_auto_compactions(options.disable_auto_compactions), disable_auto_compactions(options.disable_auto_compactions),
purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush), purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush),
compaction_style(options.compaction_style), compaction_style(options.compaction_style),
compaction_pri(options.compaction_pri),
verify_checksums_in_compaction(options.verify_checksums_in_compaction), verify_checksums_in_compaction(options.verify_checksums_in_compaction),
compaction_options_universal(options.compaction_options_universal), compaction_options_universal(options.compaction_options_universal),
compaction_options_fifo(options.compaction_options_fifo), compaction_options_fifo(options.compaction_options_fifo),
@ -492,6 +494,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
verify_checksums_in_compaction); verify_checksums_in_compaction);
Header(log, " Options.compaction_style: %d", Header(log, " Options.compaction_style: %d",
compaction_style); compaction_style);
Header(log, " Options.compaction_pri: %d",
compaction_pri);
Header(log, " Options.compaction_options_universal.size_ratio: %u", Header(log, " Options.compaction_options_universal.size_ratio: %u",
compaction_options_universal.size_ratio); compaction_options_universal.size_ratio);
Header(log, "Options.compaction_options_universal.min_merge_width: %u", Header(log, "Options.compaction_options_universal.min_merge_width: %u",

Loading…
Cancel
Save