Level Compaction with TTL

Summary:
Level Compaction with TTL.

As of today, a file could exist in the LSM tree without going through the compaction process for a really long time if there are no updates to the data in the file's key range. For example, in certain use cases, the keys are not actually "deleted"; instead they are just set to empty values. There might not be any more writes to this "deleted" key range, and if so, such data could remain in the LSM for a really long time resulting in wasted space.

Introducing a TTL could solve this problem. Files (and, in turn, data) older than TTL will be scheduled for compaction when there is no other background work. This will make the data go through the regular compaction process and get rid of old unwanted data.
This also has the (good) side-effect of all the data in the non-bottommost level being newer than ttl, and all data in the bottommost level older than ttl. It could lead to more writes while reducing space.

This functionality can be controlled by the newly introduced column family option -- ttl.

TODO for later:
- Make ttl mutable
- Extend TTL to Universal compaction as well? (TTL is already supported in FIFO)
- Maybe deprecate CompactionOptionsFIFO.ttl in favor of this new ttl option.
Closes https://github.com/facebook/rocksdb/pull/3591

Differential Revision: D7275442

Pulled By: sagar0

fbshipit-source-id: dcba484717341200d419b0953dafcdf9eb2f0267
main
Sagar Vemuri 7 years ago committed by Facebook Github Bot
parent df14424410
commit 04c11b867d
  1. 1
      HISTORY.md
  2. 91
      db/compaction_picker.cc
  3. 52
      db/db_compaction_test.cc
  4. 7
      db/db_impl_open.cc
  5. 31
      db/version_set.cc
  6. 13
      db/version_set.h
  7. 7
      include/rocksdb/advanced_options.h
  8. 1
      include/rocksdb/listener.h
  9. 3
      options/cf_options.cc
  10. 2
      options/cf_options.h
  11. 4
      options/options.cc
  12. 5
      options/options_helper.cc
  13. 1
      options/options_settable_test.cc
  14. 1
      util/testutil.cc

@ -4,6 +4,7 @@
* Add a BlockBasedTableOption to align uncompressed data blocks on the smaller of block size or page size boundary, to reduce flash reads by avoiding reads spanning 4K pages. * Add a BlockBasedTableOption to align uncompressed data blocks on the smaller of block size or page size boundary, to reduce flash reads by avoiding reads spanning 4K pages.
### New Features ### New Features
* * Introduce TTL for level compaction so that all files older than ttl go through the compaction process to get rid of old data.
### Bug Fixes ### Bug Fixes
* Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob. * Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.

@ -941,6 +941,9 @@ void CompactionPicker::UnregisterCompaction(Compaction* c) {
bool LevelCompactionPicker::NeedsCompaction( bool LevelCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage) const { const VersionStorageInfo* vstorage) const {
if (!vstorage->ExpiredTtlFiles().empty()) {
return true;
}
if (!vstorage->BottommostFilesMarkedForCompaction().empty()) { if (!vstorage->BottommostFilesMarkedForCompaction().empty()) {
return true; return true;
} }
@ -1010,6 +1013,8 @@ class LevelCompactionBuilder {
// If there is any file marked for compaction, put put it into inputs. // If there is any file marked for compaction, put put it into inputs.
void PickFilesMarkedForCompaction(); void PickFilesMarkedForCompaction();
void PickExpiredTtlFiles();
const std::string& cf_name_; const std::string& cf_name_;
VersionStorageInfo* vstorage_; VersionStorageInfo* vstorage_;
CompactionPicker* compaction_picker_; CompactionPicker* compaction_picker_;
@ -1080,6 +1085,42 @@ void LevelCompactionBuilder::PickFilesMarkedForCompaction() {
start_level_inputs_.files.clear(); start_level_inputs_.files.clear();
} }
void LevelCompactionBuilder::PickExpiredTtlFiles() {
if (vstorage_->ExpiredTtlFiles().empty()) {
return;
}
auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
// If it's being compacted it has nothing to do here.
// If this assert() fails that means that some function marked some
// files as being_compacted, but didn't call ComputeCompactionScore()
assert(!level_file.second->being_compacted);
start_level_ = level_file.first;
output_level_ =
(start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
if ((start_level_ == vstorage_->num_non_empty_levels() - 1) ||
(start_level_ == 0 &&
!compaction_picker_->level0_compactions_in_progress()->empty())) {
return false;
}
start_level_inputs_.files = {level_file.second};
start_level_inputs_.level = start_level_;
return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&start_level_inputs_);
};
for (auto& level_file : vstorage_->ExpiredTtlFiles()) {
if (continuation(level_file)) {
// found the compaction!
return;
}
}
start_level_inputs_.files.clear();
}
void LevelCompactionBuilder::SetupInitialFiles() { void LevelCompactionBuilder::SetupInitialFiles() {
// Find the compactions by size on all levels. // Find the compactions by size on all levels.
bool skipped_l0_to_base = false; bool skipped_l0_to_base = false;
@ -1133,30 +1174,38 @@ void LevelCompactionBuilder::SetupInitialFiles() {
if (start_level_inputs_.empty()) { if (start_level_inputs_.empty()) {
is_manual_ = true; is_manual_ = true;
parent_index_ = base_index_ = -1; parent_index_ = base_index_ = -1;
PickFilesMarkedForCompaction(); PickFilesMarkedForCompaction();
if (start_level_inputs_.empty()) { if (!start_level_inputs_.empty()) {
size_t i; compaction_reason_ = CompactionReason::kFilesMarkedForCompaction;
for (i = 0; i < vstorage_->BottommostFilesMarkedForCompaction().size(); return;
++i) { }
auto& level_and_file =
vstorage_->BottommostFilesMarkedForCompaction()[i]; size_t i;
assert(!level_and_file.second->being_compacted); for (i = 0; i < vstorage_->BottommostFilesMarkedForCompaction().size();
start_level_inputs_.level = output_level_ = start_level_ = ++i) {
level_and_file.first; auto& level_and_file = vstorage_->BottommostFilesMarkedForCompaction()[i];
start_level_inputs_.files = {level_and_file.second}; assert(!level_and_file.second->being_compacted);
if (compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, start_level_inputs_.level = output_level_ = start_level_ =
&start_level_inputs_)) { level_and_file.first;
break; start_level_inputs_.files = {level_and_file.second};
} if (compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
} &start_level_inputs_)) {
if (i == vstorage_->BottommostFilesMarkedForCompaction().size()) { break;
start_level_inputs_.clear();
} else {
assert(!start_level_inputs_.empty());
compaction_reason_ = CompactionReason::kBottommostFiles;
} }
}
if (i == vstorage_->BottommostFilesMarkedForCompaction().size()) {
start_level_inputs_.clear();
} else { } else {
compaction_reason_ = CompactionReason::kFilesMarkedForCompaction; assert(!start_level_inputs_.empty());
compaction_reason_ = CompactionReason::kBottommostFiles;
return;
}
assert(start_level_inputs_.empty());
PickExpiredTtlFiles();
if (!start_level_inputs_.empty()) {
compaction_reason_ = CompactionReason::kTtl;
} }
} }
} }

@ -3106,6 +3106,58 @@ TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
} }
} }
TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
const int kNumKeysPerFile = 32;
const int kNumLevelFiles = 2;
const int kValueSize = 1024;
Options options = CurrentOptions();
options.compression = kNoCompression;
options.ttl = 24 * 60 * 60; // 24 hours
options.max_open_files = -1;
env_->time_elapse_only_sleep_ = false;
options.env = env_;
env_->addon_time_.store(0);
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < kNumLevelFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
}
Flush();
}
Flush();
dbfull()->TEST_WaitForCompact();
MoveFilesToLevel(3);
ASSERT_EQ("0,0,0,2", FilesPerLevel());
for (int i = 0; i < kNumLevelFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
// Overwrite previous keys with smaller, but predictable, values.
ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
}
Flush();
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("2,0,0,2", FilesPerLevel());
MoveFilesToLevel(1);
ASSERT_EQ("0,2,0,2", FilesPerLevel());
env_->addon_time_.fetch_add(36 * 60 * 60); // 36 hours
ASSERT_EQ("0,2,0,2", FilesPerLevel());
// Just do a siimple write + flush so that the Ttl expired files get
// compacted.
ASSERT_OK(Put("a", "1"));
Flush();
dbfull()->TEST_WaitForCompact();
// All non-L0 files are deleted, as they contained only deleted data.
ASSERT_EQ("1", FilesPerLevel());
}
TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) { TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
// Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
// compaction only triggers flush after it's sure stall won't be triggered for // compaction only triggers flush after it's sure stall won't be triggered for

@ -174,17 +174,16 @@ static Status ValidateOptions(
"universal and level compaction styles. "); "universal and level compaction styles. ");
} }
} }
if (cfd.options.compaction_options_fifo.ttl > 0) { if (cfd.options.ttl > 0 || cfd.options.compaction_options_fifo.ttl > 0) {
if (db_options.max_open_files != -1) { if (db_options.max_open_files != -1) {
return Status::NotSupported( return Status::NotSupported(
"FIFO Compaction with TTL is only supported when files are always " "TTL is only supported when files are always "
"kept open (set max_open_files = -1). "); "kept open (set max_open_files = -1). ");
} }
if (cfd.options.table_factory->Name() != if (cfd.options.table_factory->Name() !=
BlockBasedTableFactory().Name()) { BlockBasedTableFactory().Name()) {
return Status::NotSupported( return Status::NotSupported(
"FIFO Compaction with TTL is only supported in " "TTL is only supported in Block-Based Table format. ");
"Block-Based Table format. ");
} }
} }
} }

@ -1667,6 +1667,9 @@ void VersionStorageInfo::ComputeCompactionScore(
} }
ComputeFilesMarkedForCompaction(); ComputeFilesMarkedForCompaction();
ComputeBottommostFilesMarkedForCompaction(); ComputeBottommostFilesMarkedForCompaction();
if (immutable_cf_options.ttl > 0) {
ComputeExpiredTtlFiles(immutable_cf_options);
}
EstimateCompactionBytesNeeded(mutable_cf_options); EstimateCompactionBytesNeeded(mutable_cf_options);
} }
@ -1693,6 +1696,34 @@ void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
} }
} }
void VersionStorageInfo::ComputeExpiredTtlFiles(
const ImmutableCFOptions& ioptions) {
assert(ioptions.ttl > 0);
expired_ttl_files_.clear();
int64_t _current_time;
auto status = ioptions.env->GetCurrentTime(&_current_time);
if (!status.ok()) {
return;
}
const uint64_t current_time = static_cast<uint64_t>(_current_time);
for (int level = 0; level < num_levels() - 1; level++) {
for (auto f : files_[level]) {
if (!f->being_compacted && f->fd.table_reader != nullptr &&
f->fd.table_reader->GetTableProperties() != nullptr) {
auto creation_time =
f->fd.table_reader->GetTableProperties()->creation_time;
if (creation_time > 0 &&
creation_time < (current_time - ioptions.ttl)) {
expired_ttl_files_.emplace_back(level, f);
}
}
}
}
}
namespace { namespace {
// used to sort files by size // used to sort files by size

@ -135,6 +135,10 @@ class VersionStorageInfo {
// ComputeCompactionScore() // ComputeCompactionScore()
void ComputeFilesMarkedForCompaction(); void ComputeFilesMarkedForCompaction();
// This computes ttl_expired_files_ and is called by
// ComputeCompactionScore()
void ComputeExpiredTtlFiles(const ImmutableCFOptions& ioptions);
// This computes bottommost_files_marked_for_compaction_ and is called by // This computes bottommost_files_marked_for_compaction_ and is called by
// ComputeCompactionScore() or UpdateOldestSnapshot(). // ComputeCompactionScore() or UpdateOldestSnapshot().
// //
@ -286,6 +290,13 @@ class VersionStorageInfo {
return files_marked_for_compaction_; return files_marked_for_compaction_;
} }
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
// REQUIRES: DB mutex held during access
const autovector<std::pair<int, FileMetaData*>>& ExpiredTtlFiles() const {
assert(finalized_);
return expired_ttl_files_;
}
// REQUIRES: This version has been saved (see VersionSet::SaveTo) // REQUIRES: This version has been saved (see VersionSet::SaveTo)
// REQUIRES: DB mutex held during access // REQUIRES: DB mutex held during access
const autovector<std::pair<int, FileMetaData*>>& const autovector<std::pair<int, FileMetaData*>>&
@ -446,6 +457,8 @@ class VersionStorageInfo {
// ComputeCompactionScore() // ComputeCompactionScore()
autovector<std::pair<int, FileMetaData*>> files_marked_for_compaction_; autovector<std::pair<int, FileMetaData*>> files_marked_for_compaction_;
autovector<std::pair<int, FileMetaData*>> expired_ttl_files_;
// These files are considered bottommost because none of their keys can exist // These files are considered bottommost because none of their keys can exist
// at lower levels. They are not necessarily all in the same level. The marked // at lower levels. They are not necessarily all in the same level. The marked
// ones are eligible for compaction because they contain duplicate key // ones are eligible for compaction because they contain duplicate key

@ -570,6 +570,13 @@ struct AdvancedColumnFamilyOptions {
// Default: false // Default: false
bool report_bg_io_stats = false; bool report_bg_io_stats = false;
// Non-bottom-level files older than TTL will go through the compaction
// process. This needs max_open_files to be set to -1.
// Enabled only for level compaction for now.
//
// Default: 0 (disabled)
uint64_t ttl = 0;
// Create ColumnFamilyOptions with default values for all fields // Create ColumnFamilyOptions with default values for all fields
AdvancedColumnFamilyOptions(); AdvancedColumnFamilyOptions();
// Create ColumnFamilyOptions from Options // Create ColumnFamilyOptions from Options

@ -80,6 +80,7 @@ enum class CompactionReason {
// [Level] Automatic compaction within bottommost level to cleanup duplicate // [Level] Automatic compaction within bottommost level to cleanup duplicate
// versions of same user key, usually due to a released snapshot. // versions of same user key, usually due to a released snapshot.
kBottommostFiles, kBottommostFiles,
kTtl,
}; };
enum class FlushReason : int { enum class FlushReason : int {

@ -74,7 +74,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
row_cache(db_options.row_cache), row_cache(db_options.row_cache),
max_subcompactions(db_options.max_subcompactions), max_subcompactions(db_options.max_subcompactions),
memtable_insert_with_hint_prefix_extractor( memtable_insert_with_hint_prefix_extractor(
cf_options.memtable_insert_with_hint_prefix_extractor.get()) {} cf_options.memtable_insert_with_hint_prefix_extractor.get()),
ttl(cf_options.ttl) {}
// Multiple two operands. If they overflow, return op1. // Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) { uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) {

@ -118,6 +118,8 @@ struct ImmutableCFOptions {
uint32_t max_subcompactions; uint32_t max_subcompactions;
const SliceTransform* memtable_insert_with_hint_prefix_extractor; const SliceTransform* memtable_insert_with_hint_prefix_extractor;
uint64_t ttl;
}; };
struct MutableCFOptions { struct MutableCFOptions {

@ -85,7 +85,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
optimize_filters_for_hits(options.optimize_filters_for_hits), optimize_filters_for_hits(options.optimize_filters_for_hits),
paranoid_file_checks(options.paranoid_file_checks), paranoid_file_checks(options.paranoid_file_checks),
force_consistency_checks(options.force_consistency_checks), force_consistency_checks(options.force_consistency_checks),
report_bg_io_stats(options.report_bg_io_stats) { report_bg_io_stats(options.report_bg_io_stats),
ttl(options.ttl) {
assert(memtable_factory.get() != nullptr); assert(memtable_factory.get() != nullptr);
if (max_bytes_for_level_multiplier_additional.size() < if (max_bytes_for_level_multiplier_additional.size() <
static_cast<unsigned int>(num_levels)) { static_cast<unsigned int>(num_levels)) {
@ -321,6 +322,7 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
force_consistency_checks); force_consistency_checks);
ROCKS_LOG_HEADER(log, " Options.report_bg_io_stats: %d", ROCKS_LOG_HEADER(log, " Options.report_bg_io_stats: %d",
report_bg_io_stats); report_bg_io_stats);
ROCKS_LOG_HEADER(log, " Options.ttl: %d", ttl);
} // ColumnFamilyOptions::Dump } // ColumnFamilyOptions::Dump
void Options::Dump(Logger* log) const { void Options::Dump(Logger* log) const {

@ -1819,7 +1819,10 @@ std::unordered_map<std::string, OptionTypeInfo>
{offset_of(&ColumnFamilyOptions::compaction_options_universal), {offset_of(&ColumnFamilyOptions::compaction_options_universal),
OptionType::kCompactionOptionsUniversal, OptionType::kCompactionOptionsUniversal,
OptionVerificationType::kNormal, true, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, compaction_options_universal)}}}; offsetof(struct MutableCFOptions, compaction_options_universal)}},
{"ttl",
{offset_of(&ColumnFamilyOptions::ttl), OptionType::kUInt64T,
OptionVerificationType::kNormal, false, 0}}};
std::unordered_map<std::string, OptionTypeInfo> std::unordered_map<std::string, OptionTypeInfo>
OptionsHelper::fifo_compaction_options_type_info = { OptionsHelper::fifo_compaction_options_type_info = {

@ -437,6 +437,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"hard_pending_compaction_bytes_limit=0;" "hard_pending_compaction_bytes_limit=0;"
"disable_auto_compactions=false;" "disable_auto_compactions=false;"
"report_bg_io_stats=true;" "report_bg_io_stats=true;"
"ttl=60;"
"compaction_options_fifo={max_table_files_size=3;ttl=100;allow_" "compaction_options_fifo={max_table_files_size=3;ttl=100;allow_"
"compaction=false;};", "compaction=false;};",
new_options)); new_options));

@ -349,6 +349,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, Random* rnd) {
// uint64_t options // uint64_t options
static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX); static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX);
cf_opt->ttl = uint_max + rnd->Uniform(10000);
cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000); cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000);
cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000); cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000);
cf_opt->max_compaction_bytes = cf_opt->max_compaction_bytes =

Loading…
Cancel
Save