Allow ttl to be changed dynamically (#4133)

Summary:
Allow ttl to be changed dynamically.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4133

Differential Revision: D8845440

Pulled By: sagar0

fbshipit-source-id: c8c87ae643b3a8c4123e4c037c4645efc094a2d3
main
Sagar Vemuri 7 years ago committed by Facebook Github Bot
parent 8f06b4fa01
commit 991120fa10
  1. 3
      HISTORY.md
  2. 56
      db/db_compaction_test.cc
  3. 11
      db/version_set.cc
  4. 3
      db/version_set.h
  5. 2
      include/rocksdb/advanced_options.h
  6. 3
      options/cf_options.cc
  7. 5
      options/cf_options.h
  8. 4
      options/options_helper.cc

@ -11,7 +11,8 @@
* Add a new table property, "rocksdb.num.range-deletions", which counts the number of range deletion tombstones in the table. * Add a new table property, "rocksdb.num.range-deletions", which counts the number of range deletion tombstones in the table.
* Improve the performance of iterators doing long range scans by using readahead, when using direct IO. * Improve the performance of iterators doing long range scans by using readahead, when using direct IO.
* pin_top_level_index_and_filter (default true) in BlockBasedTableOptions can be used in combination with cache_index_and_filter_blocks to prefetch and pin the top-level index of partitioned index and filter blocks in cache. It has no impact when cache_index_and_filter_blocks is false. * pin_top_level_index_and_filter (default true) in BlockBasedTableOptions can be used in combination with cache_index_and_filter_blocks to prefetch and pin the top-level index of partitioned index and filter blocks in cache. It has no impact when cache_index_and_filter_blocks is false.
* Avoid memcpy when reading mmap files with OpenReadOnly and max_open_files==-1 * Avoid memcpy when reading mmap files with OpenReadOnly and max_open_files==-1.
* Support dynamically changing `ColumnFamilyOptions::ttl` via `SetOptions()`.
### Bug Fixes ### Bug Fixes
* fix deadlock with enable_pipelined_write=true and max_successive_merges > 0 * fix deadlock with enable_pipelined_write=true and max_successive_merges > 0

@ -3376,14 +3376,13 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
} }
Flush(); Flush();
} }
Flush();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
MoveFilesToLevel(3); MoveFilesToLevel(3);
ASSERT_EQ("0,0,0,2", FilesPerLevel()); ASSERT_EQ("0,0,0,2", FilesPerLevel());
// Delete previously written keys.
for (int i = 0; i < kNumLevelFiles; ++i) { for (int i = 0; i < kNumLevelFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) { for (int j = 0; j < kNumKeysPerFile; ++j) {
// Overwrite previous keys with smaller, but predictable, values.
ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j))); ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
} }
Flush(); Flush();
@ -3396,7 +3395,7 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
env_->addon_time_.fetch_add(36 * 60 * 60); // 36 hours env_->addon_time_.fetch_add(36 * 60 * 60); // 36 hours
ASSERT_EQ("0,2,0,2", FilesPerLevel()); ASSERT_EQ("0,2,0,2", FilesPerLevel());
// Just do a siimple write + flush so that the Ttl expired files get // Just do a simple write + flush so that the Ttl expired files get
// compacted. // compacted.
ASSERT_OK(Put("a", "1")); ASSERT_OK(Put("a", "1"));
Flush(); Flush();
@ -3410,6 +3409,57 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
// All non-L0 files are deleted, as they contained only deleted data. // All non-L0 files are deleted, as they contained only deleted data.
ASSERT_EQ("1", FilesPerLevel()); ASSERT_EQ("1", FilesPerLevel());
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// Test dynamically changing ttl.
env_->addon_time_.store(0);
DestroyAndReopen(options);
for (int i = 0; i < kNumLevelFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
}
Flush();
}
dbfull()->TEST_WaitForCompact();
MoveFilesToLevel(3);
ASSERT_EQ("0,0,0,2", FilesPerLevel());
// Delete previously written keys.
for (int i = 0; i < kNumLevelFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
}
Flush();
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("2,0,0,2", FilesPerLevel());
MoveFilesToLevel(1);
ASSERT_EQ("0,2,0,2", FilesPerLevel());
// Move time forward by 12 hours, and make sure that compaction still doesn't
// trigger as ttl is set to 24 hours.
env_->addon_time_.fetch_add(12 * 60 * 60);
ASSERT_OK(Put("a", "1"));
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("1,2,0,2", FilesPerLevel());
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Dynamically change ttl to 10 hours.
// This should trigger a ttl compaction, as 12 hours have already passed.
ASSERT_OK(dbfull()->SetOptions({{"ttl", "36000"}}));
dbfull()->TEST_WaitForCompact();
// All non-L0 files are deleted, as they contained only deleted data.
ASSERT_EQ("1", FilesPerLevel());
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) { TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {

@ -1665,8 +1665,8 @@ void VersionStorageInfo::ComputeCompactionScore(
} }
ComputeFilesMarkedForCompaction(); ComputeFilesMarkedForCompaction();
ComputeBottommostFilesMarkedForCompaction(); ComputeBottommostFilesMarkedForCompaction();
if (immutable_cf_options.ttl > 0) { if (mutable_cf_options.ttl > 0) {
ComputeExpiredTtlFiles(immutable_cf_options); ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
} }
EstimateCompactionBytesNeeded(mutable_cf_options); EstimateCompactionBytesNeeded(mutable_cf_options);
} }
@ -1695,8 +1695,8 @@ void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
} }
void VersionStorageInfo::ComputeExpiredTtlFiles( void VersionStorageInfo::ComputeExpiredTtlFiles(
const ImmutableCFOptions& ioptions) { const ImmutableCFOptions& ioptions, const uint64_t ttl) {
assert(ioptions.ttl > 0); assert(ttl > 0);
expired_ttl_files_.clear(); expired_ttl_files_.clear();
@ -1713,8 +1713,7 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(
f->fd.table_reader->GetTableProperties() != nullptr) { f->fd.table_reader->GetTableProperties() != nullptr) {
auto creation_time = auto creation_time =
f->fd.table_reader->GetTableProperties()->creation_time; f->fd.table_reader->GetTableProperties()->creation_time;
if (creation_time > 0 && if (creation_time > 0 && creation_time < (current_time - ttl)) {
creation_time < (current_time - ioptions.ttl)) {
expired_ttl_files_.emplace_back(level, f); expired_ttl_files_.emplace_back(level, f);
} }
} }

@ -137,7 +137,8 @@ class VersionStorageInfo {
// This computes ttl_expired_files_ and is called by // This computes ttl_expired_files_ and is called by
// ComputeCompactionScore() // ComputeCompactionScore()
void ComputeExpiredTtlFiles(const ImmutableCFOptions& ioptions); void ComputeExpiredTtlFiles(const ImmutableCFOptions& ioptions,
const uint64_t ttl);
// This computes bottommost_files_marked_for_compaction_ and is called by // This computes bottommost_files_marked_for_compaction_ and is called by
// ComputeCompactionScore() or UpdateOldestSnapshot(). // ComputeCompactionScore() or UpdateOldestSnapshot().

@ -596,6 +596,8 @@ struct AdvancedColumnFamilyOptions {
// Enabled only for level compaction for now. // Enabled only for level compaction for now.
// //
// Default: 0 (disabled) // Default: 0 (disabled)
//
// Dynamically changeable through SetOptions() API
uint64_t ttl = 0; uint64_t ttl = 0;
// Create ColumnFamilyOptions with default values for all fields // Create ColumnFamilyOptions with default values for all fields

@ -75,7 +75,6 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
max_subcompactions(db_options.max_subcompactions), max_subcompactions(db_options.max_subcompactions),
memtable_insert_with_hint_prefix_extractor( memtable_insert_with_hint_prefix_extractor(
cf_options.memtable_insert_with_hint_prefix_extractor.get()), cf_options.memtable_insert_with_hint_prefix_extractor.get()),
ttl(cf_options.ttl),
cf_paths(cf_options.cf_paths) {} cf_paths(cf_options.cf_paths) {}
// Multiple two operands. If they overflow, return op1. // Multiple two operands. If they overflow, return op1.
@ -168,6 +167,8 @@ void MutableCFOptions::Dump(Logger* log) const {
max_bytes_for_level_base); max_bytes_for_level_base);
ROCKS_LOG_INFO(log, " max_bytes_for_level_multiplier: %f", ROCKS_LOG_INFO(log, " max_bytes_for_level_multiplier: %f",
max_bytes_for_level_multiplier); max_bytes_for_level_multiplier);
ROCKS_LOG_INFO(log, " ttl: %" PRIu64,
ttl);
std::string result; std::string result;
char buf[10]; char buf[10];
for (const auto m : max_bytes_for_level_multiplier_additional) { for (const auto m : max_bytes_for_level_multiplier_additional) {

@ -119,8 +119,6 @@ struct ImmutableCFOptions {
const SliceTransform* memtable_insert_with_hint_prefix_extractor; const SliceTransform* memtable_insert_with_hint_prefix_extractor;
uint64_t ttl;
std::vector<DbPath> cf_paths; std::vector<DbPath> cf_paths;
}; };
@ -149,6 +147,7 @@ struct MutableCFOptions {
target_file_size_multiplier(options.target_file_size_multiplier), target_file_size_multiplier(options.target_file_size_multiplier),
max_bytes_for_level_base(options.max_bytes_for_level_base), max_bytes_for_level_base(options.max_bytes_for_level_base),
max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier), max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
ttl(options.ttl),
max_bytes_for_level_multiplier_additional( max_bytes_for_level_multiplier_additional(
options.max_bytes_for_level_multiplier_additional), options.max_bytes_for_level_multiplier_additional),
compaction_options_fifo(options.compaction_options_fifo), compaction_options_fifo(options.compaction_options_fifo),
@ -181,6 +180,7 @@ struct MutableCFOptions {
target_file_size_multiplier(0), target_file_size_multiplier(0),
max_bytes_for_level_base(0), max_bytes_for_level_base(0),
max_bytes_for_level_multiplier(0), max_bytes_for_level_multiplier(0),
ttl(0),
compaction_options_fifo(), compaction_options_fifo(),
max_sequential_skip_in_iterations(0), max_sequential_skip_in_iterations(0),
paranoid_file_checks(false), paranoid_file_checks(false),
@ -228,6 +228,7 @@ struct MutableCFOptions {
int target_file_size_multiplier; int target_file_size_multiplier;
uint64_t max_bytes_for_level_base; uint64_t max_bytes_for_level_base;
double max_bytes_for_level_multiplier; double max_bytes_for_level_multiplier;
uint64_t ttl;
std::vector<int> max_bytes_for_level_multiplier_additional; std::vector<int> max_bytes_for_level_multiplier_additional;
CompactionOptionsFIFO compaction_options_fifo; CompactionOptionsFIFO compaction_options_fifo;
CompactionOptionsUniversal compaction_options_universal; CompactionOptionsUniversal compaction_options_universal;

@ -168,6 +168,7 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
mutable_cf_options.max_bytes_for_level_base; mutable_cf_options.max_bytes_for_level_base;
cf_opts.max_bytes_for_level_multiplier = cf_opts.max_bytes_for_level_multiplier =
mutable_cf_options.max_bytes_for_level_multiplier; mutable_cf_options.max_bytes_for_level_multiplier;
cf_opts.ttl = mutable_cf_options.ttl;
cf_opts.max_bytes_for_level_multiplier_additional.clear(); cf_opts.max_bytes_for_level_multiplier_additional.clear();
for (auto value : for (auto value :
@ -1867,7 +1868,8 @@ std::unordered_map<std::string, OptionTypeInfo>
offsetof(struct MutableCFOptions, compaction_options_universal)}}, offsetof(struct MutableCFOptions, compaction_options_universal)}},
{"ttl", {"ttl",
{offset_of(&ColumnFamilyOptions::ttl), OptionType::kUInt64T, {offset_of(&ColumnFamilyOptions::ttl), OptionType::kUInt64T,
OptionVerificationType::kNormal, false, 0}}}; OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, ttl)}}};
std::unordered_map<std::string, OptionTypeInfo> std::unordered_map<std::string, OptionTypeInfo>
OptionsHelper::fifo_compaction_options_type_info = { OptionsHelper::fifo_compaction_options_type_info = {

Loading…
Cancel
Save