Universal Compactions with Small Files

Summary:
With this change, we use L1 and up to store compaction outputs in universal compaction.
The compaction pick logic stays the same. Outputs are stored in the largest "level" as possible.

If options.num_levels=1, it behaves all the same as now.

Test Plan:
1) convert most of existing unit tests for universal comapaction to include the option of one level and multiple levels.
2) add a unit test to cover parallel compaction in universal compaction and run it in one level and multiple levels
3) add unit test to migrate from multiple level setting back to one level setting
4) add a unit test to insert keys to trigger multiple rounds of compactions and verify results.

Reviewers: rven, kradhakrishnan, yhchiang, igor

Reviewed By: igor

Subscribers: meyering, leveldb, MarkCallaghan, dhruba

Differential Revision: https://reviews.facebook.net/D34539
main
sdong 9 years ago
parent 2511b7d947
commit b23bbaa82a
  1. 1
      HISTORY.md
  2. 10
      db/column_family.cc
  3. 1
      db/column_family_test.cc
  4. 411
      db/compaction_picker.cc
  5. 48
      db/compaction_picker.h
  6. 12
      db/db_impl.cc
  7. 491
      db/db_test.cc
  8. 63
      db/version_set.cc
  9. 12
      db/version_set.h
  10. 7
      util/mutable_cf_options.h
  11. 7
      util/options.cc

@ -4,6 +4,7 @@
### New Features
* Added an experimental API for handling flashcache devices (blacklists background threads from caching their reads) -- NewFlashcacheAwareEnv
* If universal compaction is used and options.num_levels > 1, compact files are tried to be stored in none-L0 with smaller files based on options.target_file_size_base. The limitation of DB size when using universal compaction is greatly mitigated by using more levels. You can set num_levels = 1 to make universal compaction behave as before. If you set num_levels > 1 and want to roll back to a previous version, you need to compact all files to a big file in level 0 (by setting target_file_size_base to be large and CompactRange(<cf_handle>, nullptr, nullptr, true, 0) and reopen the DB with the same version to rewrite the manifest, and then you can open it using previous releases.
## 3.10.0 (3/24/2015)
### New Features

@ -426,18 +426,18 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"(waiting for flush), max_write_buffer_number is set to %d",
name_.c_str(), imm()->size(),
mutable_cf_options.max_write_buffer_number);
} else if (vstorage->NumLevelFiles(0) >=
} else if (vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), vstorage->NumLevelFiles(0));
name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
vstorage->NumLevelFiles(0) >=
vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_slowdown_writes_trigger) {
uint64_t slowdown =
SlowdownAmount(vstorage->NumLevelFiles(0),
SlowdownAmount(vstorage->l0_delay_trigger_count(),
mutable_cf_options.level0_slowdown_writes_trigger,
mutable_cf_options.level0_stop_writes_trigger);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
@ -445,7 +445,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stalling writes because we have %d level-0 files (%" PRIu64
"us)",
name_.c_str(), vstorage->NumLevelFiles(0), slowdown);
name_.c_str(), vstorage->l0_delay_trigger_count(), slowdown);
} else if (mutable_cf_options.hard_rate_limit > 1.0 &&
score > mutable_cf_options.hard_rate_limit) {
uint64_t kHardLimitSlowdown = 1000;

@ -807,6 +807,7 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) {
default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options));
one.compaction_style = kCompactionStyleUniversal;
one.num_levels = 1;
// trigger compaction if there are >= 4 files
one.level0_file_num_compaction_trigger = 4;
one.write_buffer_size = 100000;

@ -902,6 +902,101 @@ bool UniversalCompactionPicker::NeedsCompaction(
return vstorage->CompactionScore(kLevel0) >= 1;
}
void UniversalCompactionPicker::SortedRun::Dump(char* out_buf,
size_t out_buf_size,
bool print_path) const {
if (level == 0) {
assert(file != nullptr);
if (file->fd.GetPathId() == 0 || !print_path) {
snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
} else {
snprintf(out_buf, out_buf_size, "file %" PRIu64
"(path "
"%" PRIu32 ")",
file->fd.GetNumber(), file->fd.GetPathId());
}
} else {
snprintf(out_buf, out_buf_size, "level %d", level);
}
}
void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
char* out_buf, size_t out_buf_size, int sorted_run_count) const {
if (level == 0) {
assert(file != nullptr);
snprintf(out_buf, out_buf_size,
"file %" PRIu64
"[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
file->compensated_file_size);
} else {
snprintf(out_buf, out_buf_size,
"level %d[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
level, sorted_run_count, size, compensated_file_size);
}
}
std::vector<UniversalCompactionPicker::SortedRun>
UniversalCompactionPicker::CalculateSortedRuns(
const VersionStorageInfo& vstorage) {
std::vector<UniversalCompactionPicker::SortedRun> ret;
for (FileMetaData* f : vstorage.LevelFiles(0)) {
ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
f->being_compacted);
}
for (int level = 1; level < vstorage.num_levels(); level++) {
uint64_t total_compensated_size = 0U;
uint64_t total_size = 0U;
bool being_compacted = false;
bool is_first = true;
for (FileMetaData* f : vstorage.LevelFiles(level)) {
total_compensated_size += f->compensated_file_size;
total_size += f->fd.GetFileSize();
// Compaction always includes all files for a non-zero level, so for a
// non-zero level, all the files should share the same being_compacted
// value.
assert(is_first || f->being_compacted == being_compacted);
if (is_first) {
being_compacted = f->being_compacted;
is_first = false;
}
}
if (total_compensated_size > 0) {
ret.emplace_back(level, nullptr, total_size, total_compensated_size,
being_compacted);
}
}
return ret;
}
#ifndef NDEBUG
namespace {
// smallest_seqno and largest_seqno are set iff. `files` is not empty.
void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
SequenceNumber* smallest_seqno,
SequenceNumber* largest_seqno) {
bool is_first = true;
for (FileMetaData* f : files) {
assert(f->smallest_seqno <= f->largest_seqno);
if (is_first) {
is_first = false;
*smallest_seqno = f->smallest_seqno;
*largest_seqno = f->largest_seqno;
} else {
if (f->smallest_seqno < *smallest_seqno) {
*smallest_seqno = f->smallest_seqno;
}
if (f->largest_seqno > *largest_seqno) {
*largest_seqno = f->largest_seqno;
}
}
}
}
} // namespace
#endif
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
@ -910,22 +1005,23 @@ Compaction* UniversalCompactionPicker::PickCompaction(
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
const int kLevel0 = 0;
double score = vstorage->CompactionScore(kLevel0);
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
std::vector<SortedRun> sorted_runs = CalculateSortedRuns(*vstorage);
if ((level_files.size() <
(unsigned int)mutable_cf_options.level0_file_num_compaction_trigger)) {
if (sorted_runs.size() <
(unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) {
LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n", cf_name.c_str());
return nullptr;
}
VersionStorageInfo::FileSummaryStorage tmp;
LogToBuffer(log_buffer, 3072, "[%s] Universal: candidate files(%zu): %s\n",
cf_name.c_str(), level_files.size(),
vstorage->LevelFileSummary(&tmp, kLevel0));
VersionStorageInfo::LevelSummaryStorage tmp;
LogToBuffer(log_buffer, 3072, "[%s] Universal: sorted runs files(%zu): %s\n",
cf_name.c_str(), sorted_runs.size(),
vstorage->LevelSummary(&tmp));
// Check for size amplification first.
Compaction* c;
if ((c = PickCompactionUniversalSizeAmp(cf_name, mutable_cf_options, vstorage,
score, log_buffer)) != nullptr) {
score, sorted_runs, log_buffer)) !=
nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for size amp\n",
cf_name.c_str());
} else {
@ -933,9 +1029,9 @@ Compaction* UniversalCompactionPicker::PickCompaction(
// amplification while maintaining file size ratios.
unsigned int ratio = ioptions_.compaction_options_universal.size_ratio;
if ((c = PickCompactionUniversalReadAmp(cf_name, mutable_cf_options,
vstorage, score, ratio, UINT_MAX,
log_buffer)) != nullptr) {
if ((c = PickCompactionUniversalReadAmp(
cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX,
sorted_runs, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for size ratio\n",
cf_name.c_str());
} else {
@ -944,11 +1040,11 @@ Compaction* UniversalCompactionPicker::PickCompaction(
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
unsigned int num_files =
static_cast<unsigned int>(level_files.size()) -
static_cast<unsigned int>(sorted_runs.size()) -
mutable_cf_options.level0_file_num_compaction_trigger;
if ((c = PickCompactionUniversalReadAmp(
cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
num_files, log_buffer)) != nullptr) {
num_files, sorted_runs, log_buffer)) != nullptr) {
LogToBuffer(log_buffer,
"[%s] Universal: compacting for file num -- %u\n",
cf_name.c_str(), num_files);
@ -958,21 +1054,55 @@ Compaction* UniversalCompactionPicker::PickCompaction(
if (c == nullptr) {
return nullptr;
}
assert(c->inputs_[kLevel0].size() > 1);
// validate that all the chosen files are non overlapping in time
FileMetaData* newerfile __attribute__((unused)) = nullptr;
for (unsigned int i = 0; i < c->inputs_[kLevel0].size(); i++) {
FileMetaData* f = c->inputs_[kLevel0][i];
assert (f->smallest_seqno <= f->largest_seqno);
assert(newerfile == nullptr ||
newerfile->smallest_seqno > f->largest_seqno);
newerfile = f;
}
// validate that all the chosen files of L0 are non overlapping in time
#ifndef NDEBUG
SequenceNumber prev_smallest_seqno = 0U;
bool is_first = true;
// Is the earliest file part of this compaction?
FileMetaData* last_file = level_files.back();
c->bottommost_level_ = c->inputs_[kLevel0].files.back() == last_file;
size_t level_index = 0U;
if (c->start_level() == 0) {
for (unsigned int i = 0; i < c->inputs_[0].size(); i++) {
FileMetaData* f = c->inputs_[0][i];
assert(f->smallest_seqno <= f->largest_seqno);
if (is_first) {
is_first = false;
} else {
assert(prev_smallest_seqno > f->largest_seqno);
}
prev_smallest_seqno = f->smallest_seqno;
}
level_index = 1U;
}
for (; level_index < c->num_input_levels(); level_index++) {
if (c->num_input_files(level_index) != 0) {
SequenceNumber smallest_seqno = 0U;
SequenceNumber largest_seqno = 0U;
GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
&largest_seqno);
if (is_first) {
is_first = false;
} else {
assert(prev_smallest_seqno > largest_seqno);
}
prev_smallest_seqno = smallest_seqno;
}
}
#endif
auto& last_sr = sorted_runs.back();
if (c->output_level() < last_sr.level) {
// If it doesn't compact to the last level that has file, it's not the last
// bottom most.
c->bottommost_level_ = false;
} else if (last_sr.level == 0) {
// All files are level 0. Then the compaction is bottom most iff it includes
// the last file.
c->bottommost_level_ = c->inputs_[kLevel0].files.back() == last_sr.file;
} else {
// In this case, it's not level 0 only and the compaction includes the last
// level having files. It is bottom most.
c->bottommost_level_ = true;
}
// update statistics
MeasureTime(ioptions_.statistics,
@ -985,7 +1115,15 @@ Compaction* UniversalCompactionPicker::PickCompaction(
// Record whether this compaction includes all sst files.
// For now, it is only relevant in universal compaction mode.
c->is_full_compaction_ = (c->inputs_[kLevel0].size() == level_files.size());
int num_files_in_compaction = 0;
int total_num_files = 0;
for (int level = 0; level < vstorage->num_levels(); level++) {
total_num_files += vstorage->NumLevelFiles(level);
}
for (size_t i = 0; i < c->num_input_levels(); i++) {
num_files_in_compaction += c->num_input_files(i);
}
c->is_full_compaction_ = (num_files_in_compaction == total_num_files);
c->mutable_cf_options_ = mutable_cf_options;
return c;
@ -1030,18 +1168,14 @@ uint32_t UniversalCompactionPicker::GetPathId(
Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score, unsigned int ratio,
unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) {
const int kLevel0 = 0;
unsigned int max_number_of_files_to_compact,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
unsigned int min_merge_width =
ioptions_.compaction_options_universal.min_merge_width;
unsigned int max_merge_width =
ioptions_.compaction_options_universal.max_merge_width;
// The files are sorted from newest first to oldest last.
const auto& files = vstorage->LevelFiles(kLevel0);
FileMetaData* f = nullptr;
const SortedRun* sr = nullptr;
bool done = false;
int start_index = 0;
unsigned int candidate_count = 0;
@ -1052,40 +1186,43 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// Considers a candidate file only if it is smaller than the
// total size accumulated so far.
for (unsigned int loop = 0; loop < files.size(); loop++) {
for (unsigned int loop = 0; loop < sorted_runs.size(); loop++) {
candidate_count = 0;
// Skip files that are already being compacted
for (f = nullptr; loop < files.size(); loop++) {
f = files[loop];
for (sr = nullptr; loop < sorted_runs.size(); loop++) {
sr = &sorted_runs[loop];
if (!f->being_compacted) {
if (!sr->being_compacted) {
candidate_count = 1;
break;
}
LogToBuffer(log_buffer, "[%s] Universal: file %" PRIu64
"[%d] being compacted, skipping",
cf_name.c_str(), f->fd.GetNumber(), loop);
f = nullptr;
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf));
LogToBuffer(log_buffer,
"[%s] Universal: %s"
"[%d] being compacted, skipping",
cf_name.c_str(), file_num_buf, loop);
sr = nullptr;
}
// This file is not being compacted. Consider it as the
// first candidate to be compacted.
uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0;
if (f != nullptr) {
uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
if (sr != nullptr) {
char file_num_buf[kFormatFileNumberBufSize];
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf,
sizeof(file_num_buf));
LogToBuffer(log_buffer, "[%s] Universal: Possible candidate file %s[%d].",
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
LogToBuffer(log_buffer, "[%s] Universal: Possible candidate %s[%d].",
cf_name.c_str(), file_num_buf, loop);
}
// Check if the suceeding files need compaction.
for (unsigned int i = loop + 1;
candidate_count < max_files_to_compact && i < files.size(); i++) {
FileMetaData* suceeding_file = files[i];
if (suceeding_file->being_compacted) {
candidate_count < max_files_to_compact && i < sorted_runs.size();
i++) {
const SortedRun* suceeding_sr = &sorted_runs[i];
if (suceeding_sr->being_compacted) {
break;
}
// Pick files if the total/last candidate file size (increased by the
@ -1095,14 +1232,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// kCompactionStopStyleSimilarSize, it's simply the size of the last
// picked file.
double sz = candidate_size * (100.0 + ratio) / 100.0;
if (sz < static_cast<double>(suceeding_file->fd.GetFileSize())) {
if (sz < static_cast<double>(suceeding_sr->size)) {
break;
}
if (ioptions_.compaction_options_universal.stop_style ==
kCompactionStopStyleSimilarSize) {
// Similar-size stopping rule: also check the last picked file isn't
// far larger than the next candidate file.
sz = (suceeding_file->fd.GetFileSize() * (100.0 + ratio)) / 100.0;
sz = (suceeding_sr->size * (100.0 + ratio)) / 100.0;
if (sz < static_cast<double>(candidate_size)) {
// If the small file we've encountered begins a run of similar-size
// files, we'll pick them up on a future iteration of the outer
@ -1110,9 +1247,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// by the last-resort read amp strategy which disregards size ratios.
break;
}
candidate_size = suceeding_file->compensated_file_size;
candidate_size = suceeding_sr->compensated_file_size;
} else { // default kCompactionStopStyleTotalSize
candidate_size += suceeding_file->compensated_file_size;
candidate_size += suceeding_sr->compensated_file_size;
}
candidate_count++;
}
@ -1124,15 +1261,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
break;
} else {
for (unsigned int i = loop;
i < loop + candidate_count && i < files.size(); i++) {
FileMetaData* skipping_file = files[i];
LogToBuffer(log_buffer, "[%s] Universal: Skipping file %" PRIu64
"[%d] with size %" PRIu64
" (compensated size %" PRIu64 ") %d\n",
cf_name.c_str(), f->fd.GetNumber(), i,
skipping_file->fd.GetFileSize(),
skipping_file->compensated_file_size,
skipping_file->being_compacted);
i < loop + candidate_count && i < sorted_runs.size(); i++) {
const SortedRun* skipping_sr = &sorted_runs[i];
char file_num_buf[256];
skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
LogToBuffer(log_buffer, "[%s] Universal: Skipping %s", cf_name.c_str(),
file_num_buf);
}
}
}
@ -1146,10 +1280,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int ratio_to_compress =
ioptions_.compaction_options_universal.compression_size_percent;
if (ratio_to_compress >= 0) {
uint64_t total_size = vstorage->NumLevelBytes(kLevel0);
uint64_t total_size = 0;
for (auto& sorted_run : sorted_runs) {
total_size += sorted_run.compensated_file_size;
}
uint64_t older_file_size = 0;
for (size_t i = files.size() - 1; i >= first_index_after; i--) {
older_file_size += files[i]->fd.GetFileSize();
for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) {
older_file_size += sorted_runs[i].size;
if (older_file_size * 100L >= total_size * (long) ratio_to_compress) {
enable_compression = false;
break;
@ -1159,28 +1297,40 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
uint64_t estimated_total_size = 0;
for (unsigned int i = 0; i < first_index_after; i++) {
estimated_total_size += files[i]->fd.GetFileSize();
estimated_total_size += sorted_runs[i].size;
}
uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
int start_level = sorted_runs[start_index].level;
int output_level;
if (first_index_after == sorted_runs.size()) {
output_level = vstorage->num_levels() - 1;
} else if (sorted_runs[first_index_after].level == 0) {
output_level = 0;
} else {
output_level = sorted_runs[first_index_after].level - 1;
}
Compaction* c = new Compaction(
vstorage->num_levels(), kLevel0, kLevel0,
mutable_cf_options.MaxFileSizeForLevel(kLevel0), LLONG_MAX, path_id,
GetCompressionType(ioptions_, kLevel0, 1, enable_compression));
vstorage->num_levels(), start_level, output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
GetCompressionType(ioptions_, start_level, 1, enable_compression));
c->score_ = score;
for (unsigned int i = start_index; i < first_index_after; i++) {
FileMetaData* picking_file = files[i];
c->inputs_[0].files.push_back(picking_file);
char file_num_buf[kFormatFileNumberBufSize];
FormatFileNumber(picking_file->fd.GetNumber(), picking_file->fd.GetPathId(),
file_num_buf, sizeof(file_num_buf));
LogToBuffer(log_buffer,
"[%s] Universal: Picking file %s[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")\n",
cf_name.c_str(), file_num_buf, i,
picking_file->fd.GetFileSize(),
picking_file->compensated_file_size);
auto& picking_sr = sorted_runs[i];
if (picking_sr.level == 0) {
FileMetaData* picking_file = picking_sr.file;
c->inputs_[0].files.push_back(picking_file);
} else {
auto& files = c->inputs_[picking_sr.level - start_level].files;
for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
files.push_back(f);
}
}
char file_num_buf[256];
picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
LogToBuffer(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(),
file_num_buf);
}
return c;
}
@ -1193,61 +1343,56 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score, LogBuffer* log_buffer) {
const int kLevel = 0;
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
// percentage flexibilty while reducing size amplification
uint64_t ratio = ioptions_.compaction_options_universal.
max_size_amplification_percent;
// The files are sorted from newest first to oldest last.
const auto& files = vstorage->LevelFiles(kLevel);
unsigned int candidate_count = 0;
uint64_t candidate_size = 0;
unsigned int start_index = 0;
FileMetaData* f = nullptr;
const SortedRun* sr = nullptr;
// Skip files that are already being compacted
for (unsigned int loop = 0; loop < files.size() - 1; loop++) {
f = files[loop];
if (!f->being_compacted) {
for (unsigned int loop = 0; loop < sorted_runs.size() - 1; loop++) {
sr = &sorted_runs[loop];
if (!sr->being_compacted) {
start_index = loop; // Consider this as the first candidate.
break;
}
char file_num_buf[kFormatFileNumberBufSize];
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf,
sizeof(file_num_buf));
LogToBuffer(log_buffer, "[%s] Universal: skipping file %s[%d] compacted %s",
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
LogToBuffer(log_buffer, "[%s] Universal: skipping %s[%d] compacted %s",
cf_name.c_str(), file_num_buf, loop,
" cannot be a candidate to reduce size amp.\n");
f = nullptr;
sr = nullptr;
}
if (f == nullptr) {
if (sr == nullptr) {
return nullptr; // no candidate files
}
char file_num_buf[kFormatFileNumberBufSize];
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf,
sizeof(file_num_buf));
LogToBuffer(log_buffer, "[%s] Universal: First candidate file %s[%d] %s",
cf_name.c_str(), file_num_buf, start_index,
" to reduce size amp.\n");
{
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
LogToBuffer(log_buffer, "[%s] Universal: First candidate %s[%d] %s",
cf_name.c_str(), file_num_buf, start_index,
" to reduce size amp.\n");
}
// keep adding up all the remaining files
for (unsigned int loop = start_index; loop < files.size() - 1; loop++) {
f = files[loop];
if (f->being_compacted) {
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf,
sizeof(file_num_buf));
for (unsigned int loop = start_index; loop < sorted_runs.size() - 1; loop++) {
sr = &sorted_runs[loop];
if (sr->being_compacted) {
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
LogToBuffer(
log_buffer, "[%s] Universal: Possible candidate file %s[%d] %s.",
cf_name.c_str(), file_num_buf, loop,
log_buffer, "[%s] Universal: Possible candidate %s[%d] %s",
cf_name.c_str(), file_num_buf, start_index,
" is already being compacted. No size amp reduction possible.\n");
return nullptr;
}
candidate_size += f->compensated_file_size;
candidate_size += sr->compensated_file_size;
candidate_count++;
}
if (candidate_count == 0) {
@ -1255,7 +1400,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
}
// size of earliest file
uint64_t earliest_file_size = files.back()->fd.GetFileSize();
uint64_t earliest_file_size = sorted_runs.back().size;
// size amplification = percentage of additional size
if (candidate_size * 100 < ratio * earliest_file_size) {
@ -1272,31 +1417,39 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
"earliest-file-size %" PRIu64,
cf_name.c_str(), candidate_size, earliest_file_size);
}
assert(start_index < files.size() - 1);
assert(start_index < sorted_runs.size() - 1);
// Estimate total file size
uint64_t estimated_total_size = 0;
for (unsigned int loop = start_index; loop < files.size(); loop++) {
estimated_total_size += files[loop]->fd.GetFileSize();
for (unsigned int loop = start_index; loop < sorted_runs.size(); loop++) {
estimated_total_size += sorted_runs[loop].size;
}
uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
int start_level = sorted_runs[start_index].level;
// create a compaction request
// We always compact all the files, so always compress.
Compaction* c =
new Compaction(vstorage->num_levels(), kLevel, kLevel,
mutable_cf_options.MaxFileSizeForLevel(kLevel), LLONG_MAX,
path_id, GetCompressionType(ioptions_, kLevel, 1));
Compaction* c = new Compaction(
vstorage->num_levels(), start_level, vstorage->num_levels() - 1,
mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage->num_levels() - 1, 1));
c->score_ = score;
for (unsigned int loop = start_index; loop < files.size(); loop++) {
f = files[loop];
c->inputs_[0].files.push_back(f);
LogToBuffer(log_buffer,
"[%s] Universal: size amp picking file %" PRIu64
"[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
cf_name.c_str(), f->fd.GetNumber(), loop, f->fd.GetFileSize(),
f->compensated_file_size);
for (unsigned int loop = start_index; loop < sorted_runs.size(); loop++) {
auto& picking_sr = sorted_runs[loop];
if (picking_sr.level == 0) {
FileMetaData* f = picking_sr.file;
c->inputs_[0].files.push_back(f);
} else {
auto& files = c->inputs_[picking_sr.level - start_level].files;
for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
files.push_back(f);
}
}
char file_num_buf[256];
sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
LogToBuffer(log_buffer, "[%s] Universal: size amp picking %s",
cf_name.c_str(), file_num_buf);
}
return c;
}

@ -210,30 +210,62 @@ class UniversalCompactionPicker : public CompactionPicker {
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
// The maxinum allowed input level. Always returns 0.
virtual int MaxInputLevel(int current_num_levels) const override {
return 0;
return NumberLevels() - 2;
}
// The maximum allowed output level. Always returns 0.
virtual int MaxOutputLevel() const override {
return 0;
}
virtual int MaxOutputLevel() const override { return NumberLevels() - 1; }
virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const
override;
private:
struct SortedRun {
SortedRun(int _level, FileMetaData* _file, uint64_t _size,
uint64_t _compensated_file_size, bool _being_compacted)
: level(_level),
file(_file),
size(_size),
compensated_file_size(_compensated_file_size),
being_compacted(_being_compacted) {
assert(compensated_file_size > 0);
assert(level != 0 || file != nullptr);
}
void Dump(char* out_buf, size_t out_buf_size,
bool print_path = false) const;
// sorted_run_count is added into the string to print
void DumpSizeInfo(char* out_buf, size_t out_buf_size,
int sorted_run_count) const;
int level;
// `file` Will be null for level > 0. For level = 0, the sorted run is
// for this file.
FileMetaData* file;
// For level > 0, `size` and `compensated_file_size` are sum of sizes all
// files in the level. `being_compacted` should be the same for all files
// in a non-zero level. Use the value here.
uint64_t size;
uint64_t compensated_file_size;
bool being_compacted;
};
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score, unsigned int ratio,
unsigned int num_files, LogBuffer* log_buffer);
unsigned int num_files, const std::vector<SortedRun>& sorted_runs,
LogBuffer* log_buffer);
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionUniversalSizeAmp(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score, LogBuffer* log_buffer);
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer);
static std::vector<SortedRun> CalculateSortedRuns(
const VersionStorageInfo& vstorage);
// Pick a path ID to place a newly generated file, with its estimated file
// size.

@ -1284,6 +1284,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
(level == max_level_with_files && level > 0)) {
s = RunManualCompaction(cfd, level, level, target_path_id, begin, end);
} else {
// TODO(sdong) Skip empty levels if possible.
s = RunManualCompaction(cfd, level, level + 1, target_path_id, begin,
end);
}
@ -2370,7 +2371,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
// Universal and FIFO compactions should always compact the whole range
assert(m->cfd->ioptions()->compaction_style != kCompactionStyleUniversal);
assert(m->cfd->ioptions()->compaction_style !=
kCompactionStyleUniversal ||
m->cfd->ioptions()->num_levels > 1);
assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
m->tmp_storage = *manual_end;
m->begin = &m->tmp_storage;
@ -3303,6 +3306,7 @@ Status DBImpl::DelayWrite(uint64_t expiration_time) {
mutex_.Unlock();
delayed = true;
// hopefully we don't have to sleep more than 2 billion microseconds
TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
env_->SleepForMicroseconds(static_cast<int>(delay));
mutex_.Lock();
}
@ -3310,6 +3314,7 @@ Status DBImpl::DelayWrite(uint64_t expiration_time) {
while (bg_error_.ok() && write_controller_.IsStopped()) {
delayed = true;
if (has_timeout) {
TEST_SYNC_POINT("DBImpl::DelayWrite:TimedWait");
bg_cv_.TimedWait(expiration_time);
if (env_->NowMicros() > expiration_time) {
timed_out = true;
@ -3930,15 +3935,14 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
auto* vstorage = cfd->current()->storage_info();
for (int i = 1; i < vstorage->num_levels(); ++i) {
int num_files = vstorage->NumLevelFiles(i);
if (num_files > 0) {
s = Status::InvalidArgument(
"Not all files are at level 0. Cannot "
"open with universal or FIFO compaction style.");
"open with FIFO compaction style.");
break;
}
}

@ -426,12 +426,13 @@ class DBTest : public testing::Test {
kDeletesFilterFirst = 19,
kHashSkipList = 20,
kUniversalCompaction = 21,
kCompressedBlockCache = 22,
kInfiniteMaxOpenFiles = 23,
kxxHashChecksum = 24,
kFIFOCompaction = 25,
kOptimizeFiltersForHits = 26,
kEnd = 27
kUniversalCompactionMultiLevel = 22,
kCompressedBlockCache = 23,
kInfiniteMaxOpenFiles = 24,
kxxHashChecksum = 25,
kFIFOCompaction = 26,
kOptimizeFiltersForHits = 27,
kEnd = 28
};
int option_config_;
@ -499,7 +500,8 @@ class DBTest : public testing::Test {
continue;
}
if ((skip_mask & kSkipUniversalCompaction) &&
option_config_ == kUniversalCompaction) {
(option_config_ == kUniversalCompaction ||
option_config_ == kUniversalCompactionMultiLevel)) {
continue;
}
if ((skip_mask & kSkipMergePut) && option_config_ == kMergePut) {
@ -555,6 +557,13 @@ class DBTest : public testing::Test {
options.create_if_missing = true;
TryReopen(options);
return true;
} else if (option_config_ == kUniversalCompaction) {
option_config_ = kUniversalCompactionMultiLevel;
Destroy(last_options_);
auto options = CurrentOptions();
options.create_if_missing = true;
TryReopen(options);
return true;
} else {
return false;
}
@ -674,6 +683,11 @@ class DBTest : public testing::Test {
break;
case kUniversalCompaction:
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
break;
case kUniversalCompactionMultiLevel:
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 8;
break;
case kCompressedBlockCache:
options.allow_mmap_writes = true;
@ -964,6 +978,32 @@ class DBTest : public testing::Test {
return result;
}
int NumSortedRuns(int cf = 0) {
ColumnFamilyMetaData cf_meta;
if (cf == 0) {
db_->GetColumnFamilyMetaData(&cf_meta);
} else {
db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
}
int num_sr = static_cast<int>(cf_meta.levels[0].files.size());
for (size_t i = 1U; i < cf_meta.levels.size(); i++) {
if (cf_meta.levels[i].files.size() > 0) {
num_sr++;
}
}
return num_sr;
}
uint64_t TotalSize(int cf = 0) {
ColumnFamilyMetaData cf_meta;
if (cf == 0) {
db_->GetColumnFamilyMetaData(&cf_meta);
} else {
db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
}
return cf_meta.size;
}
int NumTableFilesAtLevel(int level, int cf = 0) {
std::string property;
if (cf == 0) {
@ -990,6 +1030,20 @@ class DBTest : public testing::Test {
return sum;
}
int TotalLiveFiles(int cf = 0) {
ColumnFamilyMetaData cf_meta;
if (cf == 0) {
db_->GetColumnFamilyMetaData(&cf_meta);
} else {
db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
}
int num_files = 0;
for (auto& level : cf_meta.levels) {
num_files += level.files.size();
}
return num_files;
}
int TotalTableFiles(int cf = 0, int levels = -1) {
if (levels == -1) {
levels = CurrentOptions().num_levels;
@ -3886,14 +3940,26 @@ class ChangeFilterFactory : public CompactionFilterFactory {
virtual const char* Name() const override { return "ChangeFilterFactory"; }
};
class DBTestUniversalCompactionBase
: public DBTest,
public ::testing::WithParamInterface<int> {
public:
virtual void SetUp() override { num_levels_ = GetParam(); }
int num_levels_;
};
class DBTestUniversalCompaction : public DBTestUniversalCompactionBase {};
// TODO(kailiu) The tests on UniversalCompaction has some issues:
// 1. A lot of magic numbers ("11" or "12").
// 2. Made assumption on the memtable flush conditions, which may change from
// time to time.
TEST_F(DBTest, UniversalCompactionTrigger) {
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrigger) {
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100<<10; //100KB
options.num_levels = num_levels_;
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
// trigger compaction if there are >= 4 files
options.level0_file_num_compaction_trigger = 4;
KeepFilterFactory* filter = new KeepFilterFactory(true);
@ -3901,6 +3967,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) {
options.compaction_filter_factory.reset(filter);
options = CurrentOptions(options);
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
@ -3918,7 +3985,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) {
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1);
ASSERT_EQ(NumSortedRuns(1), num + 1);
}
// Generate one more file at level-0, which should trigger level-0
@ -3931,10 +3998,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) {
// Suppose each file flushed from mem table has size 1. Now we compact
// (level0_file_num_compaction_trigger+1)=4 files and should have a big
// file of size 4.
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
}
ASSERT_EQ(NumSortedRuns(1), 1);
// Stage 2:
// Now we have one file at level 0, with size 4. We also have some data in
@ -3953,7 +4017,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) {
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 3);
ASSERT_EQ(NumSortedRuns(1), num + 3);
}
// Generate one more file at level-0, which should trigger level-0
@ -3965,10 +4029,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) {
dbfull()->TEST_WaitForCompact();
// Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
// After compaction, we should have 2 files, with size 4, 2.4.
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 2);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
}
ASSERT_EQ(NumSortedRuns(1), 2);
// Stage 3:
// Now we have 2 files at level 0, with size 4 and 2.4. Continue
@ -3981,7 +4042,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) {
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 3);
ASSERT_EQ(NumSortedRuns(1), num + 3);
}
// Generate one more file at level-0, which should trigger level-0
@ -3993,10 +4054,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) {
dbfull()->TEST_WaitForCompact();
// Before compaction, we have 4 files at level 0, with size 4, 2.4, 1, 1.
// After compaction, we should have 3 files, with size 4, 2.4, 2.
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
}
ASSERT_EQ(NumSortedRuns(1), 3);
// Stage 4:
// Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a
@ -4007,10 +4065,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) {
}
dbfull()->TEST_WaitForCompact();
// Level-0 compaction is triggered, but no file will be picked up.
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 4);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
}
ASSERT_EQ(NumSortedRuns(1), 4);
// Stage 5:
// Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
@ -4022,18 +4077,18 @@ TEST_F(DBTest, UniversalCompactionTrigger) {
}
dbfull()->TEST_WaitForCompact();
// All files at level 0 will be compacted into a single one.
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
}
ASSERT_EQ(NumSortedRuns(1), 1);
}
TEST_F(DBTest, UniversalCompactionSizeAmplification) {
TEST_P(DBTestUniversalCompaction, UniversalCompactionSizeAmplification) {
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100<<10; //100KB
options.num_levels = num_levels_;
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
options.level0_file_num_compaction_trigger = 3;
options = CurrentOptions(options);
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Trigger compaction if size amplification exceeds 110%
@ -4053,9 +4108,9 @@ TEST_F(DBTest, UniversalCompactionSizeAmplification) {
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1);
ASSERT_EQ(NumSortedRuns(1), num + 1);
}
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 2);
ASSERT_EQ(NumSortedRuns(1), 2);
// Flush whatever is remaining in memtable. This is typically
// small, which should not trigger size ratio based compaction
@ -4065,17 +4120,123 @@ TEST_F(DBTest, UniversalCompactionSizeAmplification) {
dbfull()->TEST_WaitForCompact();
// Verify that size amplification did occur
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);
ASSERT_EQ(NumSortedRuns(1), 1);
}
TEST_F(DBTest, UniversalCompactionOptions) {
class DBTestUniversalCompactionMultiLevels
: public DBTestUniversalCompactionBase {};
TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) {
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100<<10; //100KB
options.num_levels = num_levels_;
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 8;
options.max_background_compactions = 3;
options.target_file_size_base = 32 * 1024;
options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Trigger compaction if size amplification exceeds 110%
options.compaction_options_universal.max_size_amplification_percent = 110;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Random rnd(301);
int num_keys = 100000;
for (int i = 0; i < num_keys * 2; i++) {
ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
}
dbfull()->TEST_WaitForCompact();
for (int i = num_keys; i < num_keys * 2; i++) {
ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
}
}
INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels,
DBTestUniversalCompactionMultiLevels,
::testing::Values(3, 20));
class DBTestUniversalCompactionParallel : public DBTestUniversalCompactionBase {
};
TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.write_buffer_size = 1 << 10; // 1KB
options.level0_file_num_compaction_trigger = 3;
options.max_background_compactions = 3;
options.max_background_flushes = 3;
options.target_file_size_base = 1 * 1024;
options.compaction_options_universal.max_size_amplification_percent = 110;
options = CurrentOptions(options);
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Delay every compaction so multiple compactions will happen.
std::atomic<int> num_compactions_running(0);
std::atomic<bool> has_parallel(false);
rocksdb::SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Start",
[&]() {
if (num_compactions_running.fetch_add(1) > 0) {
has_parallel.store(true);
return;
}
for (int nwait = 0; nwait < 20000; nwait++) {
if (has_parallel.load() || num_compactions_running.load() > 1) {
has_parallel.store(true);
break;
}
env_->SleepForMicroseconds(1000);
}
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():End",
[&]() { num_compactions_running.fetch_add(-1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Random rnd(301);
int num_keys = 30000;
for (int i = 0; i < num_keys * 2; i++) {
ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
}
dbfull()->TEST_WaitForCompact();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_EQ(num_compactions_running.load(), 0);
ASSERT_TRUE(has_parallel.load());
for (int i = num_keys; i < num_keys * 2; i++) {
ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
}
// Reopen and check.
ReopenWithColumnFamilies({"default", "pikachu"}, options);
for (int i = num_keys; i < num_keys * 2; i++) {
ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
}
}
INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel,
DBTestUniversalCompactionParallel,
::testing::Values(1, 10));
TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) {
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
options.level0_file_num_compaction_trigger = 4;
options.num_levels = 1;
options.num_levels = num_levels_;
options.compaction_options_universal.compression_size_percent = -1;
options = CurrentOptions(options);
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
@ -4090,27 +4251,26 @@ TEST_F(DBTest, UniversalCompactionOptions) {
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
if (num < options.level0_file_num_compaction_trigger - 1) {
ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1);
ASSERT_EQ(NumSortedRuns(1), num + 1);
}
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1);
for (int i = 1; i < options.num_levels ; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
}
ASSERT_EQ(NumSortedRuns(1), 1);
}
TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) {
TEST_P(DBTestUniversalCompaction, UniversalCompactionStopStyleSimilarSize) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100<<10; //100KB
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
// trigger compaction if there are >= 4 files
options.level0_file_num_compaction_trigger = 4;
options.compaction_options_universal.size_ratio = 10;
options.compaction_options_universal.stop_style = kCompactionStopStyleSimilarSize;
options.num_levels=1;
Reopen(options);
options.compaction_options_universal.stop_style =
kCompactionStopStyleSimilarSize;
options.num_levels = num_levels_;
DestroyAndReopen(options);
Random rnd(301);
int key_idx = 0;
@ -4118,8 +4278,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) {
// Stage 1:
// Generate a set of files at level 0, but don't trigger level-0
// compaction.
for (int num = 0;
num < options.level0_file_num_compaction_trigger-1;
for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
num++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
@ -4127,7 +4286,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) {
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0), num + 1);
ASSERT_EQ(NumSortedRuns(), num + 1);
}
// Generate one more file at level-0, which should trigger level-0
@ -4140,7 +4299,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) {
// Suppose each file flushed from mem table has size 1. Now we compact
// (level0_file_num_compaction_trigger+1)=4 files and should have a big
// file of size 4.
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_EQ(NumSortedRuns(), 1);
// Stage 2:
// Now we have one file at level 0, with size 4. We also have some data in
@ -4150,8 +4309,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) {
// a level-0 file, with size around 0.4 (according to previously written
// data amount).
dbfull()->Flush(FlushOptions());
for (int num = 0;
num < options.level0_file_num_compaction_trigger-3;
for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
num++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
@ -4159,7 +4317,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) {
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0), num + 3);
ASSERT_EQ(NumSortedRuns(), num + 3);
}
// Generate one more file at level-0, which should trigger level-0
@ -4171,7 +4329,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) {
dbfull()->TEST_WaitForCompact();
// Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
// After compaction, we should have 3 files, with size 4, 0.4, 2.
ASSERT_EQ(NumTableFilesAtLevel(0), 3);
ASSERT_EQ(NumSortedRuns(), 3);
// Stage 3:
// Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one
// more file at level-0, which should trigger level-0 compaction.
@ -4181,7 +4339,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) {
}
dbfull()->TEST_WaitForCompact();
// Level-0 compaction is triggered, but no file will be picked up.
ASSERT_EQ(NumTableFilesAtLevel(0), 4);
ASSERT_EQ(NumSortedRuns(), 4);
}
TEST_F(DBTest, CompressedCache) {
@ -4308,18 +4466,20 @@ static std::string CompressibleString(Random* rnd, int len) {
return r;
}
TEST_F(DBTest, UniversalCompactionCompressRatio1) {
TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio1) {
if (!SnappyCompressionSupported()) {
return;
}
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100<<10; //100KB
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 1;
options.num_levels = num_levels_;
options.compaction_options_universal.compression_size_percent = 70;
options = CurrentOptions(options);
Reopen(options);
DestroyAndReopen(options);
Random rnd(301);
int key_idx = 0;
@ -4334,7 +4494,7 @@ TEST_F(DBTest, UniversalCompactionCompressRatio1) {
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 2 * 0.9);
ASSERT_LT(TotalSize(), 110000U * 2 * 0.9);
// The second compaction (4) is compressed
for (int num = 0; num < 2; num++) {
@ -4346,7 +4506,7 @@ TEST_F(DBTest, UniversalCompactionCompressRatio1) {
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 4 * 0.9);
ASSERT_LT(TotalSize(), 110000 * 4 * 0.9);
// The third compaction (2 4) is compressed since this time it is
// (1 1 3.2) and 3.2/5.2 doesn't reach ratio.
@ -4359,7 +4519,7 @@ TEST_F(DBTest, UniversalCompactionCompressRatio1) {
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 6 * 0.9);
ASSERT_LT(TotalSize(), 110000 * 6 * 0.9);
// When we start for the compaction up to (2 4 8), the latest
// compressed is not compressed.
@ -4372,22 +4532,22 @@ TEST_F(DBTest, UniversalCompactionCompressRatio1) {
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_GT((int)dbfull()->TEST_GetLevel0TotalSize(),
110000 * 11 * 0.8 + 110000 * 2);
ASSERT_GT(TotalSize(), 110000 * 11 * 0.8 + 110000 * 2);
}
TEST_F(DBTest, UniversalCompactionCompressRatio2) {
TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio2) {
if (!SnappyCompressionSupported()) {
return;
}
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100<<10; //100KB
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 1;
options.num_levels = num_levels_;
options.compaction_options_universal.compression_size_percent = 95;
options = CurrentOptions(options);
Reopen(options);
DestroyAndReopen(options);
Random rnd(301);
int key_idx = 0;
@ -4403,10 +4563,12 @@ TEST_F(DBTest, UniversalCompactionCompressRatio2) {
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(),
120000 * 12 * 0.8 + 120000 * 2);
ASSERT_LT(TotalSize(), 120000U * 12 * 0.8 + 120000 * 2);
}
INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction,
::testing::Values(1, 3, 5));
TEST_F(DBTest, FailMoreDbPaths) {
Options options = CurrentOptions();
options.db_paths.emplace_back(dbname_, 10000000);
@ -4908,6 +5070,7 @@ TEST_F(DBTest, ConvertCompactionStyle) {
// Stage 2: reopen with universal compaction - should fail
options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
options = CurrentOptions(options);
Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
ASSERT_TRUE(s.IsInvalidArgument());
@ -4919,6 +5082,7 @@ TEST_F(DBTest, ConvertCompactionStyle) {
options.target_file_size_multiplier = 1;
options.max_bytes_for_level_base = INT_MAX;
options.max_bytes_for_level_multiplier = 1;
options.num_levels = 4;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
@ -4937,11 +5101,15 @@ TEST_F(DBTest, ConvertCompactionStyle) {
// Stage 4: re-open in universal compaction style and do some db operations
options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 4;
options.write_buffer_size = 100<<10; //100KB
options.level0_file_num_compaction_trigger = 3;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
options.num_levels = 1;
ReopenWithColumnFamilies({"default", "pikachu"}, options);
for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) {
ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
}
@ -4972,6 +5140,99 @@ TEST_F(DBTest, ConvertCompactionStyle) {
ASSERT_EQ(keys_in_db, expected_keys);
}
TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) {
std::function<void(int)> verify_func = [&](int num_keys_in_db) {
std::string keys_in_db;
Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
keys_in_db.append(iter->key().ToString());
keys_in_db.push_back(',');
}
delete iter;
std::string expected_keys;
for (int i = 0; i <= num_keys_in_db; i++) {
expected_keys.append(Key(i));
expected_keys.push_back(',');
}
ASSERT_EQ(keys_in_db, expected_keys);
};
Random rnd(301);
int max_key1 = 200;
int max_key2 = 600;
int max_key3 = 800;
// Stage 1: open a DB with universal compaction, num_levels=1
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 3;
options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options);
for (int i = 0; i <= max_key1; i++) {
// each value is 10K
ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
}
ASSERT_OK(Flush(1));
dbfull()->TEST_WaitForCompact();
int non_level0_num_files = 0;
for (int i = 1; i < options.num_levels; i++) {
non_level0_num_files += NumTableFilesAtLevel(i, 1);
}
ASSERT_EQ(non_level0_num_files, 0);
// Stage 2: reopen with universal compaction, num_levels=4
options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 4;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
verify_func(max_key1);
// Insert more keys
for (int i = max_key1 + 1; i <= max_key2; i++) {
// each value is 10K
ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
}
ASSERT_OK(Flush(1));
dbfull()->TEST_WaitForCompact();
verify_func(max_key2);
// Compaction to non-L0 has happened.
ASSERT_GT(NumTableFilesAtLevel(options.num_levels - 1, 1), 0);
// Stage 3: Revert it back to one level and revert to num_levels=1.
options.num_levels = 4;
options.target_file_size_base = INT_MAX;
ReopenWithColumnFamilies({"default", "pikachu"}, options);
// Compact all to level 0
dbfull()->CompactRange(handles_[1], nullptr, nullptr, true /* reduce level */,
0 /* reduce to level 0 */);
// Need to restart it once to remove higher level records in manifest.
ReopenWithColumnFamilies({"default", "pikachu"}, options);
// Final reopen
options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
// Insert more keys
for (int i = max_key2 + 1; i <= max_key3; i++) {
// each value is 10K
ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
}
ASSERT_OK(Flush(1));
dbfull()->TEST_WaitForCompact();
verify_func(max_key3);
}
namespace {
void MinLevelHelper(DBTest* self, Options& options) {
Random rnd(301);
@ -5596,7 +5857,7 @@ TEST_F(DBTest, CompactionFilterContextManual) {
// set this flag.
dbfull()->CompactRange(nullptr, nullptr);
ASSERT_EQ(cfilter_count, 700);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_EQ(NumSortedRuns(0), 1);
// Verify total number of keys is correct after manual compaction.
{
@ -5770,7 +6031,7 @@ TEST_F(DBTest, CompactionFilterV2) {
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_EQ(NumSortedRuns(0), 1);
// All the files are in the lowest level.
int count = 0;
@ -6568,46 +6829,52 @@ TEST_F(DBTest, ManualCompaction) {
}
TEST_F(DBTest, ManualCompactionOutputPathId) {
class DBTestUniversalManualCompactionOutputPathId
: public DBTestUniversalCompactionBase {};
TEST_P(DBTestUniversalManualCompactionOutputPathId,
ManualCompactionOutputPathId) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.db_paths.emplace_back(dbname_, 1000000000);
options.db_paths.emplace_back(dbname_ + "_2", 1000000000);
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.target_file_size_base = 1 << 30; // Big size
options.level0_file_num_compaction_trigger = 10;
Destroy(options);
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
MakeTables(3, "p", "q", 1);
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("3", FilesPerLevel(1));
ASSERT_EQ(3, TotalLiveFiles(1));
ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));
// Full compaction to DB path 0
db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 1);
ASSERT_EQ("1", FilesPerLevel(1));
ASSERT_EQ(1, TotalLiveFiles(1));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ("1", FilesPerLevel(1));
ASSERT_EQ(1, TotalLiveFiles(1));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
MakeTables(1, "p", "q", 1);
ASSERT_EQ("2", FilesPerLevel(1));
ASSERT_EQ(2, TotalLiveFiles(1));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ("2", FilesPerLevel(1));
ASSERT_EQ(2, TotalLiveFiles(1));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
// Full compaction to DB path 0
db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 0);
ASSERT_EQ("1", FilesPerLevel(1));
ASSERT_EQ(1, TotalLiveFiles(1));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));
@ -6616,6 +6883,10 @@ TEST_F(DBTest, ManualCompactionOutputPathId) {
.IsInvalidArgument());
}
INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId,
DBTestUniversalManualCompactionOutputPathId,
::testing::Values(1, 8));
TEST_F(DBTest, ManualLevelCompactionOutputPathId) {
Options options = CurrentOptions();
options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
@ -6796,7 +7067,10 @@ TEST_F(DBTest, DropWrites) {
env_->drop_writes_.store(true, std::memory_order_release);
env_->sleep_counter_.Reset();
for (int i = 0; i < 5; i++) {
for (int level = 0; level < dbfull()->NumberLevels() - 1; level++) {
for (int level = 0; level < dbfull()->NumberLevels(); level++) {
if (level > 0 && level == dbfull()->NumberLevels() - 1) {
break;
}
dbfull()->TEST_CompactRange(level, nullptr, nullptr);
}
}
@ -9693,6 +9967,7 @@ TEST_F(DBTest, CompactFilesOnUniversalCompaction) {
options.create_if_missing = true;
options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
options.compaction_style = kCompactionStyleLevel;
options.num_levels = 1;
options.target_file_size_base = options.write_buffer_size;
options.compression = kNoCompression;
options = CurrentOptions(options);
@ -10063,11 +10338,19 @@ TEST_F(DBTest, DynamicMemtableOptions) {
int count = 0;
Random rnd(301);
WriteOptions wo;
wo.timeout_hint_us = 1000;
wo.timeout_hint_us = 100000; // Reasonabley long timeout to make sure sleep
// triggers but not forever.
std::atomic<int> sleep_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:TimedWait",
[&]() { sleep_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 256) {
count++;
}
ASSERT_GT(sleep_count.load(), 0);
ASSERT_GT(static_cast<double>(count), 128 * 0.8);
ASSERT_LT(static_cast<double>(count), 128 * 1.2);
@ -10085,9 +10368,11 @@ TEST_F(DBTest, DynamicMemtableOptions) {
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2,
Env::Priority::LOW);
count = 0;
sleep_count.store(0);
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) {
count++;
}
ASSERT_GT(sleep_count.load(), 0);
ASSERT_GT(static_cast<double>(count), 512 * 0.8);
ASSERT_LT(static_cast<double>(count), 512 * 1.2);
sleeping_task_low2.WakeUp();
@ -10103,14 +10388,19 @@ TEST_F(DBTest, DynamicMemtableOptions) {
SleepingBackgroundTask sleeping_task_low3;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low3,
Env::Priority::LOW);
count = 0;
sleep_count.store(0);
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) {
count++;
}
ASSERT_GT(sleep_count.load(), 0);
ASSERT_GT(static_cast<double>(count), 256 * 0.8);
ASSERT_LT(static_cast<double>(count), 266 * 1.2);
sleeping_task_low3.WakeUp();
sleeping_task_low3.WaitUntilDone();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
#if ROCKSDB_USING_THREAD_STATUS
@ -11173,17 +11463,29 @@ TEST_F(DBTest, DynamicCompactionOptions) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024)));
dbfull()->TEST_FlushMemTable(true);
std::atomic<int> sleep_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Sleep",
[&]() { sleep_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Hard rate limit slow down for 1000 us, so default 10ms should be ok
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok());
wo.timeout_hint_us = 500;
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).IsTimedOut());
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
sleep_count.store(0);
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
ASSERT_GT(sleep_count.load(), 0);
// Lift the limit and no timeout
ASSERT_OK(dbfull()->SetOptions({
{"hard_rate_limit", "100"}
}));
dbfull()->TEST_FlushMemTable(true);
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok());
sleep_count.store(0);
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
// Technically, time out is still possible for timing issue.
ASSERT_EQ(sleep_count.load(), 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// Test max_mem_compaction_level.
// Destory DB and start from scratch
@ -11195,8 +11497,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_TRUE(Put("max_mem_compaction_level_key",
RandomString(&rnd, 8)).ok());
ASSERT_OK(Put("max_mem_compaction_level_key", RandomString(&rnd, 8)));
dbfull()->TEST_FlushMemTable(true);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
@ -11886,8 +12187,8 @@ TEST_F(DBTest, MergeTestTime) {
std::string result;
db_->Get(opt, "foo", &result);
ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 2100000);
ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 1900000);
ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 2800000);
ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 1200000);
ReadOptions read_options;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
@ -11899,8 +12200,8 @@ TEST_F(DBTest, MergeTestTime) {
ASSERT_EQ(1, count);
ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 4200000);
ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 3800000);
ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 6000000);
ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 3200000);
}
TEST_F(DBTest, MergeCompactionTimeTest) {

@ -726,11 +726,12 @@ VersionStorageInfo::VersionStorageInfo(
file_indexer_(user_comparator),
compaction_style_(compaction_style),
files_(new std::vector<FileMetaData*>[num_levels_]),
base_level_(1),
base_level_(num_levels_ == 1 ? -1 : 1),
files_by_size_(num_levels_),
next_file_to_compact_by_size_(num_levels_),
compaction_score_(num_levels_),
compaction_level_(num_levels_),
l0_delay_trigger_count_(0),
accumulated_file_size_(0),
accumulated_raw_key_size_(0),
accumulated_raw_value_size_(0),
@ -991,25 +992,37 @@ void VersionStorageInfo::ComputeCompactionScore(
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
int numfiles = 0;
int num_sorted_runs = 0;
uint64_t total_size = 0;
for (unsigned int i = 0; i < files_[level].size(); i++) {
if (!files_[level][i]->being_compacted) {
total_size += files_[level][i]->compensated_file_size;
numfiles++;
num_sorted_runs++;
}
}
if (compaction_style_ == kCompactionStyleUniversal) {
// For universal compaction, we use level0 score to indicate
// compaction score for the whole DB. Adding other levels as if
// they are L0 files.
for (int i = 1; i < num_levels(); i++) {
if (!files_[i].empty() && !files_[i][0]->being_compacted) {
num_sorted_runs++;
}
}
}
if (compaction_style_ == kCompactionStyleFIFO) {
score = static_cast<double>(total_size) /
compaction_options_fifo.max_table_files_size;
} else if (numfiles >= mutable_cf_options.level0_stop_writes_trigger) {
} else if (num_sorted_runs >=
mutable_cf_options.level0_stop_writes_trigger) {
// If we are slowing down writes, then we better compact that first
score = 1000000;
} else if (numfiles >=
mutable_cf_options.level0_slowdown_writes_trigger) {
} else if (num_sorted_runs >=
mutable_cf_options.level0_slowdown_writes_trigger) {
score = 10000;
} else {
score = static_cast<double>(numfiles) /
score = static_cast<double>(num_sorted_runs) /
mutable_cf_options.level0_file_num_compaction_trigger;
}
} else {
@ -1090,7 +1103,12 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
void VersionStorageInfo::SetFinalized() {
finalized_ = true;
#ifndef NDEBUG
assert(base_level_ >= 1 && (num_levels() <= 1 || base_level_ < num_levels()));
if (compaction_style_ != kCompactionStyleLevel) {
// Not level based compaction.
return;
}
assert(base_level_ < 0 || num_levels() == 1 ||
(base_level_ >= 1 && base_level_ < num_levels()));
// Verify all levels newer than base_level are empty except L0
for (int level = 1; level < base_level(); level++) {
assert(NumLevelBytes(level) == 0);
@ -1442,14 +1460,14 @@ uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
const char* VersionStorageInfo::LevelSummary(
LevelSummaryStorage* scratch) const {
int len = 0;
if (num_levels() > 1) {
if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
len = snprintf(scratch->buffer, sizeof(scratch->buffer),
"base level %d max bytes base %" PRIu64, base_level_,
"base level %d max bytes base %" PRIu64 " ", base_level_,
level_max_bytes_[base_level_]);
}
len +=
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " files[");
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
for (int i = 0; i < num_levels(); i++) {
int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
@ -1512,9 +1530,24 @@ uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
const MutableCFOptions& options) {
// Special logic to set number of sorted runs.
// It is to match the previous behavior when all files are in L0.
int num_l0_count = static_cast<int>(files_[0].size());
if (compaction_style_ == kCompactionStyleUniversal) {
// For universal compaction, we use level0 score to indicate
// compaction score for the whole DB. Adding other levels as if
// they are L0 files.
for (int i = 1; i < num_levels(); i++) {
if (!files_[i].empty()) {
num_l0_count++;
}
}
}
set_l0_delay_trigger_count(num_l0_count);
level_max_bytes_.resize(ioptions.num_levels);
if (!ioptions.level_compaction_dynamic_level_bytes) {
base_level_ = 1;
base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
// Calculate for static bytes base case
for (int i = 0; i < ioptions.num_levels; ++i) {
@ -1524,7 +1557,7 @@ void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
level_max_bytes_[i] = MultiplyCheckOverflow(
MultiplyCheckOverflow(level_max_bytes_[i - 1],
options.max_bytes_for_level_multiplier),
options.max_bytes_for_level_multiplier_additional[i - 1]);
options.MaxBytesMultiplerAdditional(i - 1));
} else {
level_max_bytes_[i] = options.max_bytes_for_level_base;
}
@ -2868,7 +2901,9 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
"[%s] compaction output being applied to a different base version from"
" input version",
c->column_family_data()->GetName().c_str());
if (c->start_level() == 0 && c->num_input_levels() > 2U) {
if (vstorage->compaction_style_ == kCompactionStyleLevel &&
c->start_level() == 0 && c->num_input_levels() > 2U) {
// We are doing a L0->base_level compaction. The assumption is if
// base level is not L1, levels from L1 to base_level - 1 is empty.
// This is ensured by having one compaction from L0 going on at the

@ -189,6 +189,14 @@ class VersionStorageInfo {
return num_non_empty_levels_;
}
// REQUIRES: This version has been finalized.
// (CalculateBaseBytes() is called)
// This may or may not return number of level files. It is to keep backward
// compatible behavior in universal compaction.
int l0_delay_trigger_count() const { return l0_delay_trigger_count_; }
void set_l0_delay_trigger_count(int v) { l0_delay_trigger_count_ = v; }
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
int NumLevelFiles(int level) const {
assert(finalized_);
@ -312,7 +320,7 @@ class VersionStorageInfo {
std::vector<FileMetaData*>* files_;
// Level that L0 data should be compacted to. All levels < base_level_ should
// be empty.
// be empty. -1 if it is not level-compaction so it's not applicable.
int base_level_;
// A list for the same set of files that are stored in files_,
@ -341,6 +349,8 @@ class VersionStorageInfo {
std::vector<int> compaction_level_;
double max_compaction_score_ = 0.0; // max score in l1 to ln-1
int max_compaction_score_level_ = 0; // level on which max score occurs
int l0_delay_trigger_count_ = 0; // Count used to trigger slow down and stop
// for number of L0 files.
// the following are the sampled temporary stats.
// the current accumulated size of sampled files.

@ -84,6 +84,13 @@ struct MutableCFOptions {
// file in level->level+1 compaction.
uint64_t MaxGrandParentOverlapBytes(int level) const;
uint64_t ExpandedCompactionByteSizeLimit(int level) const;
int MaxBytesMultiplerAdditional(int level) const {
if (level >=
static_cast<int>(max_bytes_for_level_multiplier_additional.size())) {
return 1;
}
return max_bytes_for_level_multiplier_additional[level];
}
void Dump(Logger* log) const;

@ -420,9 +420,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
level_compaction_dynamic_level_bytes);
Log(log," Options.max_bytes_for_level_multiplier: %d",
max_bytes_for_level_multiplier);
for (int i = 0; i < num_levels; i++) {
Log(log,"Options.max_bytes_for_level_multiplier_addtl[%d]: %d",
i, max_bytes_for_level_multiplier_additional[i]);
for (size_t i = 0; i < max_bytes_for_level_multiplier_additional.size();
i++) {
Log(log, "Options.max_bytes_for_level_multiplier_addtl[%zu]: %d", i,
max_bytes_for_level_multiplier_additional[i]);
}
Log(log," Options.max_sequential_skip_in_iterations: %" PRIu64,
max_sequential_skip_in_iterations);

Loading…
Cancel
Save