Allow IntraL0 compaction in FIFO Compaction

Summary:
Add an option that lets users run some compaction under FIFO compaction, paying some write amplification in exchange for fewer files.
Closes https://github.com/facebook/rocksdb/pull/2163

Differential Revision: D4895953

Pulled By: siying

fbshipit-source-id: a1ab608dd0627211f3e1f588a2e97159646e1231
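
For context, a minimal sketch of how a user opts in to the new behavior. The option names come from this patch; the surrounding setup (DB path, trigger value) is illustrative only:

    #include <rocksdb/db.h>
    #include <rocksdb/options.h>

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.compaction_style = rocksdb::kCompactionStyleFIFO;
      // Keep at most ~1GB of table files, FIFO-style.
      options.compaction_options_fifo.max_table_files_size = 1ull << 30;
      // New in this patch: allow intra-L0 compaction under FIFO.
      options.compaction_options_fifo.allow_compaction = true;
      // With allow_compaction=true, this also sets the minimum number of
      // files for one intra-L0 compaction under FIFO.
      options.level0_file_num_compaction_trigger = 8;

      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/fifo_example", &db);
      delete db;  // safe even if Open failed and db stayed null
      return s.ok() ? 0 : 1;
    }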
Branch: main
Author: Siying Dong (committed by Facebook GitHub Bot)
parent 8c3a180e83
commit 264d3f540c
Files changed (lines changed):
  1. HISTORY.md (4)
  2. db/column_family.cc (1)
  3. db/compaction_picker.cc (81)
  4. db/db_test.cc (42)
  5. db/version_set.cc (7)
  6. include/rocksdb/advanced_options.h (14)
  7. include/rocksdb/listener.h (2)
  8. options/options.cc (3)
  9. tools/db_bench_tool.cc (5)

HISTORY.md
@@ -1,9 +1,9 @@
 # Rocksdb Change Log
 ## Unreleased
 ### Public API Change
-* Introduce WriteBatch::PopSavePoint to pop the most recent save point explicitly.
+* Introduce WriteBatch::PopSavePoint to pop the most recent save point explicitly
 ### New Features
+* FIFO compaction to support Intra L0 compaction too with CompactionOptionsFIFO.allow_compaction=true.
 * DB::ResetStats() to reset internal stats.
 * Statistics::Reset() to reset user stats.
 * ldb add option --try_load_options, which will open DB with its own option file.

db/column_family.cc
@@ -198,7 +198,6 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
     result.num_levels = 1;
     // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
-    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
     result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
     result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
   }
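
(The level0_file_num_compaction_trigger sanitization is removed here deliberately: with allow_compaction=true, FIFO now reads that option as the minimum number of files for an intra-L0 compaction, as the compaction_picker.cc and version_set.cc hunks below show.)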

db/compaction_picker.cc
@@ -38,6 +38,39 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
   }
   return sum;
 }

+bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files,
+                           size_t min_files_to_compact,
+                           uint64_t max_compact_bytes_per_del_file,
+                           CompactionInputFiles* comp_inputs) {
+  size_t compact_bytes = level_files[0]->fd.file_size;
+  size_t compact_bytes_per_del_file = port::kMaxSizet;
+  // compaction range will be [0, span_len).
+  size_t span_len;
+  // pull in files until the amount of compaction work per deleted file begins
+  // increasing.
+  size_t new_compact_bytes_per_del_file = 0;
+  for (span_len = 1; span_len < level_files.size(); ++span_len) {
+    compact_bytes += level_files[span_len]->fd.file_size;
+    new_compact_bytes_per_del_file = compact_bytes / span_len;
+    if (level_files[span_len]->being_compacted ||
+        new_compact_bytes_per_del_file > compact_bytes_per_del_file) {
+      break;
+    }
+    compact_bytes_per_del_file = new_compact_bytes_per_del_file;
+  }
+
+  if (span_len >= min_files_to_compact &&
+      new_compact_bytes_per_del_file < max_compact_bytes_per_del_file) {
+    assert(comp_inputs != nullptr);
+    comp_inputs->level = 0;
+    for (size_t i = 0; i < span_len; ++i) {
+      comp_inputs->files.push_back(level_files[i]);
+    }
+    return true;
+  }
+  return false;
+}
 } // anonymous namespace

 // Determine compression type, based on user options, level of the output
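A worked example of the search loop, with hypothetical L0 file sizes (newest first) of 2MB, 2MB, 2MB, 10MB: compact_bytes starts at 2MB; span_len=1 adds the second file (4MB total, 4MB of work per deleted file), span_len=2 adds the third (6MB total, 3MB per deleted file, still shrinking), and span_len=3 pulls in the 10MB file (16MB total, about 5.3MB per deleted file), which is an increase, so the loop breaks and the candidate span is the first three files. The final acceptance check then requires span_len >= min_files_to_compact and compares the last computed new_compact_bytes_per_del_file against max_compact_bytes_per_del_file.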
@@ -1348,31 +1381,8 @@ bool LevelCompactionBuilder::PickIntraL0Compaction() {
     // resort to L0->L0 compaction yet.
     return false;
   }
-
-  size_t compact_bytes = level_files[0]->fd.file_size;
-  size_t compact_bytes_per_del_file = port::kMaxSizet;
-  // compaction range will be [0, span_len).
-  size_t span_len;
-  // pull in files until the amount of compaction work per deleted file begins
-  // increasing.
-  for (span_len = 1; span_len < level_files.size(); ++span_len) {
-    compact_bytes += level_files[span_len]->fd.file_size;
-    size_t new_compact_bytes_per_del_file = compact_bytes / span_len;
-    if (level_files[span_len]->being_compacted ||
-        new_compact_bytes_per_del_file > compact_bytes_per_del_file) {
-      break;
-    }
-    compact_bytes_per_del_file = new_compact_bytes_per_del_file;
-  }
-
-  if (span_len >= kMinFilesForIntraL0Compaction) {
-    start_level_inputs_.level = 0;
-    for (size_t i = 0; i < span_len; ++i) {
-      start_level_inputs_.files.push_back(level_files[i]);
-    }
-    return true;
-  }
-  return false;
+  return FindIntraL0Compaction(level_files, kMinFilesForIntraL0Compaction,
+                               port::kMaxUint64, &start_level_inputs_);
 }

 } // namespace
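
This hunk is a behavior-preserving refactor: the loop moves into the shared FindIntraL0Compaction helper above, and passing port::kMaxUint64 as max_compact_bytes_per_del_file means the level-compaction path keeps its old, unbounded acceptance of compaction work per deleted file; only the new FIFO caller below passes a finite bound.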
@@ -1405,6 +1415,27 @@ Compaction* FIFOCompactionPicker::PickCompaction(
   if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size ||
       level_files.size() == 0) {
     // total size not exceeded
+    if (ioptions_.compaction_options_fifo.allow_compaction &&
+        level_files.size() > 0) {
+      CompactionInputFiles comp_inputs;
+      if (FindIntraL0Compaction(
+              level_files,
+              mutable_cf_options
+                  .level0_file_num_compaction_trigger /* min_files_to_compact */,
+              mutable_cf_options.write_buffer_size, &comp_inputs)) {
+        Compaction* c = new Compaction(
+            vstorage, ioptions_, mutable_cf_options, {comp_inputs}, 0,
+            16 * 1024 * 1024 /* output file size limit */,
+            0 /* max compaction bytes, not applicable */,
+            0 /* output path ID */, mutable_cf_options.compression, {},
+            /* is manual */ false, vstorage->CompactionScore(0),
+            /* is deletion compaction */ false,
+            CompactionReason::kFIFOReduceNumFiles);
+        RegisterCompaction(c);
+        return c;
+      }
+    }
+
     ROCKS_LOG_BUFFER(log_buffer,
                      "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
                      ", max size %" PRIu64 "\n",
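
In summary, the new FIFO path only attempts an intra-L0 compaction while total size is still under max_table_files_size, and only when allow_compaction is set, the candidate span reaches level0_file_num_compaction_trigger files, and the compaction work per deleted file stays under write_buffer_size (the guard that keeps already-compacted large files from being rewritten). The 16MB output file size limit appears here as a hardcoded constant, so a single such compaction cuts its output into files of at most roughly 16MB.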

db/db_test.cc
@@ -2767,6 +2767,48 @@ TEST_P(DBTestWithParam, FIFOCompactionTest) {
     }
   }
 }
+
+TEST_F(DBTest, FIFOCompactionTestWithCompaction) {
+  Options options;
+  options.compaction_style = kCompactionStyleFIFO;
+  options.write_buffer_size = 20 << 10;  // 20KB
+  options.arena_block_size = 4096;
+  options.compaction_options_fifo.max_table_files_size = 1500 << 10;  // 1.5MB
+  options.compaction_options_fifo.allow_compaction = true;
+  options.level0_file_num_compaction_trigger = 6;
+  options.compression = kNoCompression;
+  options.create_if_missing = true;
+  options = CurrentOptions(options);
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+  for (int i = 0; i < 60; i++) {
+    // Generate and flush a file about 20KB.
+    for (int j = 0; j < 20; j++) {
+      ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+    }
+    Flush();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  }
+  // It should be compacted to 10 files.
+  ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+  for (int i = 0; i < 60; i++) {
+    // Generate and flush a file about 20KB.
+    for (int j = 0; j < 20; j++) {
+      ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980)));
+    }
+    Flush();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  }
+
+  // File count should stay bounded: more than 10 but fewer than 18 files.
+  ASSERT_GT(NumTableFilesAtLevel(0), 10);
+  ASSERT_LT(NumTableFilesAtLevel(0), 18);
+  // Size limit is still guaranteed.
+  ASSERT_LE(SizeAtLevel(0),
+            options.compaction_options_fifo.max_table_files_size);
+}
 #endif  // ROCKSDB_LITE

 #ifndef ROCKSDB_LITE
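
For scale: each pass writes roughly 60 * 20KB, about 1.2MB, so the first pass stays under the 1.5MB cap and the drop from 60 flushed files to exactly 10 demonstrates intra-L0 merging; by the end of the second pass about 2.4MB has been written in total, so the final ASSERT_LE additionally shows that FIFO's oldest-file deletion still enforces max_table_files_size.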

db/version_set.cc
@@ -1312,6 +1312,13 @@ void VersionStorageInfo::ComputeCompactionScore(
        score =
            static_cast<double>(total_size) /
            immutable_cf_options.compaction_options_fifo.max_table_files_size;
+        if (immutable_cf_options.compaction_options_fifo.allow_compaction) {
+          score = std::max(
+              static_cast<double>(num_sorted_runs) /
+                  mutable_cf_options.level0_file_num_compaction_trigger,
+              score);
+        }
      } else {
        score = static_cast<double>(num_sorted_runs) /
                mutable_cf_options.level0_file_num_compaction_trigger;
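A quick worked example of the new score, assuming max_table_files_size = 1GB, total L0 size = 512MB, nine sorted runs, and level0_file_num_compaction_trigger = 6: the size-based score is 512/1024 = 0.5, the file-count score is 9/6 = 1.5, and with allow_compaction the final score is max(1.5, 0.5) = 1.5, so FIFO becomes eligible for compaction on file count alone, not just on total size.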

include/rocksdb/advanced_options.h
@@ -62,9 +62,19 @@ struct CompactionOptionsFIFO {
   // Default: 1GB
   uint64_t max_table_files_size;

+  // If true, try to do compaction to compact smaller files into larger ones.
+  // The minimum number of files to compact follows
+  // options.level0_file_num_compaction_trigger, and compaction won't trigger
+  // if the average compact bytes per deleted file would be larger than
+  // options.write_buffer_size. This is to protect large files from being
+  // compacted again.
+  // Default: false
+  bool allow_compaction = false;
+
   CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {}
-  CompactionOptionsFIFO(uint64_t _max_table_files_size) :
-      max_table_files_size(_max_table_files_size) {}
+  CompactionOptionsFIFO(uint64_t _max_table_files_size, bool _allow_compaction)
+      : max_table_files_size(_max_table_files_size),
+        allow_compaction(_allow_compaction) {}
 };

 // Compression options for different compression algorithms like Zlib
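A minimal usage sketch of the new two-argument constructor (values illustrative; this is the form db_bench uses below):

    rocksdb::Options options;
    options.compaction_style = rocksdb::kCompactionStyleFIFO;
    options.compaction_options_fifo = rocksdb::CompactionOptionsFIFO(
        512ull * 1024 * 1024 /* max_table_files_size: 512MB */,
        true /* allow_compaction */);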

include/rocksdb/listener.h
@@ -69,6 +69,8 @@ enum class CompactionReason {
   kUniversalSortedRunNum,
   // [FIFO] total size > max_table_files_size
   kFIFOMaxSize,
+  // [FIFO] reduce number of files.
+  kFIFOReduceNumFiles,
   // Manual compaction
   kManualCompaction,
   // DB::SuggestCompactRange() marked files for compaction
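
Downstream, users can observe the new reason through the event listener API. A sketch, assuming CompactionJobInfo::compaction_reason, which this listener.h exposes:

    #include <atomic>
    #include <rocksdb/listener.h>

    // Sketch only, not part of this patch: counts FIFO intra-L0 compactions.
    class FifoIntraL0Counter : public rocksdb::EventListener {
     public:
      void OnCompactionCompleted(rocksdb::DB* /*db*/,
                                 const rocksdb::CompactionJobInfo& info) override {
        if (info.compaction_reason ==
            rocksdb::CompactionReason::kFIFOReduceNumFiles) {
          count_.fetch_add(1);
        }
      }
      std::atomic<int> count_{0};
    };

It would be registered the usual way, e.g. options.listeners.emplace_back(std::make_shared<FifoIntraL0Counter>()).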

options/options.cc
@@ -348,6 +348,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
   ROCKS_LOG_HEADER(
       log, "Options.compaction_options_fifo.max_table_files_size: %" PRIu64,
       compaction_options_fifo.max_table_files_size);
+  ROCKS_LOG_HEADER(log,
+                   "Options.compaction_options_fifo.allow_compaction: %d",
+                   compaction_options_fifo.allow_compaction);
   std::string collector_names;
   for (const auto& collector_factory : table_properties_collector_factories) {
     collector_names.append(collector_factory->Name());

tools/db_bench_tool.cc
@@ -620,6 +620,8 @@ DEFINE_string(
 DEFINE_uint64(fifo_compaction_max_table_files_size_mb, 0,
               "The limit of total table file sizes to trigger FIFO compaction");

+DEFINE_bool(fifo_compaction_allow_compaction, true,
+            "Allow compaction in FIFO compaction.");
 #endif  // ROCKSDB_LITE

 DEFINE_bool(report_bg_io_stats, false,

@@ -2823,7 +2825,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         FLAGS_use_direct_io_for_flush_and_compaction;
 #ifndef ROCKSDB_LITE
     options.compaction_options_fifo = CompactionOptionsFIFO(
-        FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024);
+        FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024,
+        FLAGS_fifo_compaction_allow_compaction);
 #endif  // ROCKSDB_LITE
     if (FLAGS_prefix_size != 0) {
       options.prefix_extractor.reset(
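
An illustrative db_bench invocation exercising the new flag (the two fifo_* flags are from this diff; --compaction_style=2 selects FIFO in db_bench, and the workload flags are arbitrary):

    ./db_bench --benchmarks=fillrandom --num=1000000 \
        --compaction_style=2 \
        --fifo_compaction_max_table_files_size_mb=1024 \
        --fifo_compaction_allow_compaction=true

Note that the db_bench flag defaults to true, unlike the library option, whose default is false.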
