single-file bottom-level compaction when snapshot released

Summary:
When snapshots are held for a long time, files may reach the bottom level containing overwritten/deleted keys. We previously had no mechanism to trigger compaction on such files. This particularly impacted DBs that write to different parts of the keyspace over time, as such files would never be naturally compacted due to second-last level files moving down. This PR introduces a mechanism for bottommost files to be recompacted upon releasing all snapshots that prevent them from dropping their deleted/overwritten keys.

- Changed `CompactionPicker` to compact files in `BottommostFilesMarkedForCompaction()`. These are the last choice when picking. Each file will be compacted alone and output to the same level in which it originated. The goal of this type of compaction is to rewrite the data excluding deleted/overwritten keys.
- Changed `ReleaseSnapshot()` to recompute the bottom files marked for compaction when the oldest existing snapshot changes, and schedule a compaction if needed. We cache the value that oldest existing snapshot needs to exceed in order for another file to be marked in `bottommost_files_mark_threshold_`, which allows us to avoid recomputing marked files for most snapshot releases.
- Changed `VersionStorageInfo` to track the list of bottommost files, which is recomputed every time the version changes by `UpdateBottommostFiles()`. The list of marked bottommost files is first computed in `ComputeBottommostFilesMarkedForCompaction()` when the version changes, but may also be recomputed when `ReleaseSnapshot()` is called.
- Extracted core logic of `Compaction::IsBottommostLevel()` into `VersionStorageInfo::RangeMightExistAfterSortedRun()` since logic to check whether a file is bottommost is now necessary outside of compaction.
Closes https://github.com/facebook/rocksdb/pull/3009

Differential Revision: D6062044

Pulled By: ajkr

fbshipit-source-id: 123d201cf140715a7d5928e8b3cb4f9cd9f7ad21
main
Andrew Kryczka 7 years ago committed by Facebook Github Bot
parent 96e3a600ba
commit 9b18cc2363
  1. 1
      HISTORY.md
  2. 40
      db/compaction.cc
  3. 31
      db/compaction_picker.cc
  4. 62
      db/db_compaction_test.cc
  5. 16
      db/db_impl.cc
  6. 86
      db/version_set.cc
  7. 62
      db/version_set.h
  8. 3
      include/rocksdb/listener.h

@ -9,6 +9,7 @@
* Support dynamically changing `ColumnFamilyOptions::compaction_options_fifo`. * Support dynamically changing `ColumnFamilyOptions::compaction_options_fifo`.
* Introduce `EventListener::OnStallConditionsChanged()` callback. Users can implement it to be notified when user writes are stalled, stopped, or resumed. * Introduce `EventListener::OnStallConditionsChanged()` callback. Users can implement it to be notified when user writes are stalled, stopped, or resumed.
* Add a new db property "rocksdb.estimate-oldest-key-time" to return oldest data timestamp. The property is available only for FIFO compaction with compaction_options_fifo.allow_compaction = false. * Add a new db property "rocksdb.estimate-oldest-key-time" to return oldest data timestamp. The property is available only for FIFO compaction with compaction_options_fifo.allow_compaction = false.
* Upon snapshot release, recompact bottommost files containing deleted/overwritten keys that previously could not be dropped due to the snapshot. This alleviates space-amp caused by long-held snapshots.
### Bug Fixes ### Bug Fixes
* Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery. * Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery.

@ -86,35 +86,23 @@ void Compaction::GetBoundaryKeys(
bool Compaction::IsBottommostLevel( bool Compaction::IsBottommostLevel(
int output_level, VersionStorageInfo* vstorage, int output_level, VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs) { const std::vector<CompactionInputFiles>& inputs) {
if (inputs[0].level == 0 && int output_l0_idx;
inputs[0].files.back() != vstorage->LevelFiles(0).back()) { if (output_level == 0) {
return false; output_l0_idx = 0;
for (const auto* file : vstorage->LevelFiles(0)) {
if (inputs[0].files.back() == file) {
break;
} }
++output_l0_idx;
Slice smallest_key, largest_key;
GetBoundaryKeys(vstorage, inputs, &smallest_key, &largest_key);
// Checks whether there are files living beyond the output_level.
// If lower levels have files, it checks for overlap between files
// if the compaction process and those files.
// Bottomlevel optimizations can be made if there are no files in
// lower levels or if there is no overlap with the files in
// the lower levels.
for (int i = output_level + 1; i < vstorage->num_levels(); i++) {
// It is not the bottommost level if there are files in higher
// levels when the output level is 0 or if there are files in
// higher levels which overlap with files to be compacted.
// output_level == 0 means that we want it to be considered
// s the bottommost level only if the last file on the level
// is a part of the files to be compacted - this is verified by
// the first if condition in this function
if (vstorage->NumLevelFiles(i) > 0 &&
(output_level == 0 ||
vstorage->OverlapInLevel(i, &smallest_key, &largest_key))) {
return false;
} }
assert(static_cast<size_t>(output_l0_idx) < vstorage->LevelFiles(0).size());
} else {
output_l0_idx = -1;
} }
return true; Slice smallest_key, largest_key;
GetBoundaryKeys(vstorage, inputs, &smallest_key, &largest_key);
return !vstorage->RangeMightExistAfterSortedRun(smallest_key, largest_key,
output_level, output_l0_idx);
} }
// test function to validate the functionality of IsBottommostLevel() // test function to validate the functionality of IsBottommostLevel()

@ -387,7 +387,10 @@ bool CompactionPicker::SetupOtherInputs(
assert(output_level_inputs->empty()); assert(output_level_inputs->empty());
const int input_level = inputs->level; const int input_level = inputs->level;
const int output_level = output_level_inputs->level; const int output_level = output_level_inputs->level;
assert(input_level != output_level); if (input_level == output_level) {
// no possibility of conflict
return true;
}
// For now, we only support merging two levels, start level and output level. // For now, we only support merging two levels, start level and output level.
// We need to assert other levels are empty. // We need to assert other levels are empty.
@ -938,6 +941,9 @@ void CompactionPicker::UnregisterCompaction(Compaction* c) {
bool LevelCompactionPicker::NeedsCompaction( bool LevelCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage) const { const VersionStorageInfo* vstorage) const {
if (!vstorage->BottommostFilesMarkedForCompaction().empty()) {
return true;
}
if (!vstorage->FilesMarkedForCompaction().empty()) { if (!vstorage->FilesMarkedForCompaction().empty()) {
return true; return true;
} }
@ -1128,7 +1134,28 @@ void LevelCompactionBuilder::SetupInitialFiles() {
is_manual_ = true; is_manual_ = true;
parent_index_ = base_index_ = -1; parent_index_ = base_index_ = -1;
PickFilesMarkedForCompaction(); PickFilesMarkedForCompaction();
if (!start_level_inputs_.empty()) { if (start_level_inputs_.empty()) {
size_t i;
for (i = 0; i < vstorage_->BottommostFilesMarkedForCompaction().size();
++i) {
auto& level_and_file =
vstorage_->BottommostFilesMarkedForCompaction()[i];
assert(!level_and_file.second->being_compacted);
start_level_inputs_.level = output_level_ = start_level_ =
level_and_file.first;
start_level_inputs_.files = {level_and_file.second};
if (compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&start_level_inputs_)) {
break;
}
}
if (i == vstorage_->BottommostFilesMarkedForCompaction().size()) {
start_level_inputs_.clear();
} else {
assert(!start_level_inputs_.empty());
compaction_reason_ = CompactionReason::kBottommostFiles;
}
} else {
compaction_reason_ = CompactionReason::kFilesMarkedForCompaction; compaction_reason_ = CompactionReason::kFilesMarkedForCompaction;
} }
} }

@ -2795,6 +2795,68 @@ TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
ASSERT_EQ("new_val", Get(Key(0))); ASSERT_EQ("new_val", Get(Key(0)));
} }
TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
// bottom-level files may contain deletions due to snapshots protecting the
// deleted keys. Once the snapshot is released, we should see files with many
// such deletions undergo single-file compactions.
const int kNumKeysPerFile = 1024;
const int kNumLevelFiles = 4;
const int kValueSize = 128;
Options options = CurrentOptions();
options.compression = kNoCompression;
options.level0_file_num_compaction_trigger = kNumLevelFiles;
// inflate it a bit to account for key/metadata overhead
options.target_file_size_base = 120 * kNumKeysPerFile * kValueSize / 100;
Reopen(options);
Random rnd(301);
const Snapshot* snapshot = nullptr;
for (int i = 0; i < kNumLevelFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
}
if (i == kNumLevelFiles - 1) {
snapshot = db_->GetSnapshot();
// delete every other key after grabbing a snapshot, so these deletions
// and the keys they cover can't be dropped until after the snapshot is
// released.
for (int j = 0; j < kNumLevelFiles * kNumKeysPerFile; j += 2) {
ASSERT_OK(Delete(Key(j)));
}
}
Flush();
if (i < kNumLevelFiles - 1) {
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
}
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));
std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
db_->GetLiveFilesMetaData(&pre_release_metadata);
// just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST
// files does not need to be preserved in case of a future snapshot.
ASSERT_OK(Put(Key(0), "val"));
// release snapshot and wait for compactions to finish. Single-file
// compactions should be triggered, which reduce the size of each bottom-level
// file without changing file count.
db_->ReleaseSnapshot(snapshot);
dbfull()->TEST_WaitForCompact();
db_->GetLiveFilesMetaData(&post_release_metadata);
ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size());
for (size_t i = 0; i < pre_release_metadata.size(); ++i) {
const auto& pre_file = pre_release_metadata[i];
const auto& post_file = post_release_metadata[i];
ASSERT_EQ(1, pre_file.level);
ASSERT_EQ(1, post_file.level);
// each file is smaller than it was before as it was rewritten without
// deletion markers/deleted keys.
ASSERT_LT(post_file.size, pre_file.size);
}
}
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
::testing::Values(std::make_tuple(1, true), ::testing::Values(std::make_tuple(1, true),
std::make_tuple(1, false), std::make_tuple(1, false),

@ -1616,6 +1616,22 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
snapshots_.Delete(casted_s); snapshots_.Delete(casted_s);
uint64_t oldest_snapshot;
if (snapshots_.empty()) {
oldest_snapshot = versions_->LastSequence();
} else {
oldest_snapshot = snapshots_.oldest()->number_;
}
for (auto* cfd : *versions_->GetColumnFamilySet()) {
cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
if (!cfd->current()
->storage_info()
->BottommostFilesMarkedForCompaction()
.empty()) {
SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
}
}
} }
delete casted_s; delete casted_s;
} }

@ -931,6 +931,7 @@ VersionStorageInfo::VersionStorageInfo(
current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_; current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
current_num_deletions_ = ref_vstorage->current_num_deletions_; current_num_deletions_ = ref_vstorage->current_num_deletions_;
current_num_samples_ = ref_vstorage->current_num_samples_; current_num_samples_ = ref_vstorage->current_num_samples_;
oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
} }
} }
@ -1090,6 +1091,7 @@ void Version::PrepareApply(
storage_info_.GenerateFileIndexer(); storage_info_.GenerateFileIndexer();
storage_info_.GenerateLevelFilesBrief(); storage_info_.GenerateLevelFilesBrief();
storage_info_.GenerateLevel0NonOverlapping(); storage_info_.GenerateLevel0NonOverlapping();
storage_info_.GenerateBottommostFiles();
} }
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) { bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
@ -1447,6 +1449,7 @@ void VersionStorageInfo::ComputeCompactionScore(
} }
} }
ComputeFilesMarkedForCompaction(); ComputeFilesMarkedForCompaction();
ComputeBottommostFilesMarkedForCompaction();
EstimateCompactionBytesNeeded(mutable_cf_options); EstimateCompactionBytesNeeded(mutable_cf_options);
} }
@ -1522,6 +1525,7 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
// 4. GenerateFileIndexer(); // 4. GenerateFileIndexer();
// 5. GenerateLevelFilesBrief(); // 5. GenerateLevelFilesBrief();
// 6. GenerateLevel0NonOverlapping(); // 6. GenerateLevel0NonOverlapping();
// 7. GenerateBottommostFiles();
void VersionStorageInfo::SetFinalized() { void VersionStorageInfo::SetFinalized() {
finalized_ = true; finalized_ = true;
#ifndef NDEBUG #ifndef NDEBUG
@ -1698,6 +1702,58 @@ void VersionStorageInfo::GenerateLevel0NonOverlapping() {
} }
} }
void VersionStorageInfo::GenerateBottommostFiles() {
assert(!finalized_);
assert(bottommost_files_.empty());
for (size_t level = 0; level < level_files_brief_.size(); ++level) {
for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
++file_idx) {
const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
int l0_file_idx;
if (level == 0) {
l0_file_idx = static_cast<int>(file_idx);
} else {
l0_file_idx = -1;
}
if (!RangeMightExistAfterSortedRun(f.smallest_key, f.largest_key,
static_cast<int>(level),
l0_file_idx)) {
bottommost_files_.emplace_back(static_cast<int>(level),
f.file_metadata);
}
}
}
}
void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
assert(seqnum >= oldest_snapshot_seqnum_);
oldest_snapshot_seqnum_ = seqnum;
if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
ComputeBottommostFilesMarkedForCompaction();
}
}
void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
bottommost_files_marked_for_compaction_.clear();
bottommost_files_mark_threshold_ = kMaxSequenceNumber;
for (auto& level_and_file : bottommost_files_) {
if (!level_and_file.second->being_compacted &&
level_and_file.second->largest_seqno != 0 &&
level_and_file.second->num_deletions > 1) {
// largest_seqno might be nonzero due to containing the final key in an
// earlier compaction, whose seqnum we didn't zero out. Multiple deletions
// ensures the file really contains deleted or overwritten keys.
if (level_and_file.second->largest_seqno < oldest_snapshot_seqnum_) {
bottommost_files_marked_for_compaction_.push_back(level_and_file);
} else {
bottommost_files_mark_threshold_ =
std::min(bottommost_files_mark_threshold_,
level_and_file.second->largest_seqno);
}
}
}
}
void Version::Ref() { void Version::Ref() {
++refs_; ++refs_;
} }
@ -2251,6 +2307,36 @@ uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
return size; return size;
} }
bool VersionStorageInfo::RangeMightExistAfterSortedRun(
const Slice& smallest_key, const Slice& largest_key, int last_level,
int last_l0_idx) {
assert((last_l0_idx != -1) == (last_level == 0));
// TODO(ajkr): this preserves earlier behavior where we considered an L0 file
// bottommost only if it's the oldest L0 file and there are no files on older
// levels. It'd be better to consider it bottommost if there's no overlap in
// older levels/files.
if (last_level == 0 &&
last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
return true;
}
// Checks whether there are files living beyond the `last_level`. If lower
// levels have files, it checks for overlap between [`smallest_key`,
// `largest_key`] and those files. Bottomlevel optimizations can be made if
// there are no files in lower levels or if there is no overlap with the files
// in the lower levels.
for (int level = last_level + 1; level < num_levels(); level++) {
// The range is not in the bottommost level if there are files in lower
// levels when the `last_level` is 0 or if there are files in lower levels
// which overlap with [`smallest_key`, `largest_key`].
if (files_[level].size() > 0 &&
(last_level == 0 ||
OverlapInLevel(level, &smallest_key, &largest_key))) {
return true;
}
}
return false;
}
void Version::AddLiveFiles(std::vector<FileDescriptor>* live) { void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
for (int level = 0; level < storage_info_.num_levels(); level++) { for (int level = 0; level < storage_info_.num_levels(); level++) {

@ -135,6 +135,18 @@ class VersionStorageInfo {
// ComputeCompactionScore() // ComputeCompactionScore()
void ComputeFilesMarkedForCompaction(); void ComputeFilesMarkedForCompaction();
// This computes bottommost_files_marked_for_compaction_ and is called by
// ComputeCompactionScore() or UpdateOldestSnapshot().
//
// Among bottommost files (assumes they've already been computed), marks the
// ones that have keys that would be eliminated if recompacted, according to
// the seqnum of the oldest existing snapshot. Must be called every time
// oldest snapshot changes as that is when bottom-level files can become
// eligible for compaction.
//
// REQUIRES: DB mutex held
void ComputeBottommostFilesMarkedForCompaction();
// Generate level_files_brief_ from files_ // Generate level_files_brief_ from files_
void GenerateLevelFilesBrief(); void GenerateLevelFilesBrief();
// Sort all files for this version based on their file size and // Sort all files for this version based on their file size and
@ -147,6 +159,16 @@ class VersionStorageInfo {
return level0_non_overlapping_; return level0_non_overlapping_;
} }
// Check whether each file in this version is bottommost (i.e., nothing in its
// key-range could possibly exist in an older file/level).
// REQUIRES: This version has not been saved
void GenerateBottommostFiles();
// Updates the oldest snapshot and related internal state, like the bottommost
// files marked for compaction.
// REQUIRES: DB mutex held
void UpdateOldestSnapshot(SequenceNumber oldest_snapshot_seqnum);
int MaxInputLevel() const; int MaxInputLevel() const;
int MaxOutputLevel(bool allow_ingest_behind) const; int MaxOutputLevel(bool allow_ingest_behind) const;
@ -264,6 +286,14 @@ class VersionStorageInfo {
return files_marked_for_compaction_; return files_marked_for_compaction_;
} }
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
// REQUIRES: DB mutex held during access
const autovector<std::pair<int, FileMetaData*>>&
BottommostFilesMarkedForCompaction() const {
assert(finalized_);
return bottommost_files_marked_for_compaction_;
}
int base_level() const { return base_level_; } int base_level() const { return base_level_; }
// REQUIRES: lock is held // REQUIRES: lock is held
@ -357,6 +387,16 @@ class VersionStorageInfo {
bool force_consistency_checks() const { return force_consistency_checks_; } bool force_consistency_checks() const { return force_consistency_checks_; }
// Returns whether any key in [`smallest_key`, `largest_key`] could appear in
// an older L0 file than `last_l0_idx` or in a greater level than `last_level`
//
// @param last_level Level after which we check for overlap
// @param last_l0_idx If `last_level == 0`, index of L0 file after which we
// check for overlap; otherwise, must be -1
bool RangeMightExistAfterSortedRun(const Slice& smallest_key,
const Slice& largest_key, int last_level,
int last_l0_idx);
private: private:
const InternalKeyComparator* internal_comparator_; const InternalKeyComparator* internal_comparator_;
const Comparator* user_comparator_; const Comparator* user_comparator_;
@ -406,6 +446,28 @@ class VersionStorageInfo {
// ComputeCompactionScore() // ComputeCompactionScore()
autovector<std::pair<int, FileMetaData*>> files_marked_for_compaction_; autovector<std::pair<int, FileMetaData*>> files_marked_for_compaction_;
// These files are considered bottommost because none of their keys can exist
// at lower levels. They are not necessarily all in the same level. The marked
// ones are eligible for compaction because they contain duplicate key
// versions that are no longer protected by snapshot. These variables are
// protected by DB mutex and are calculated in `GenerateBottommostFiles()` and
// `ComputeBottommostFilesMarkedForCompaction()`.
autovector<std::pair<int, FileMetaData*>> bottommost_files_;
autovector<std::pair<int, FileMetaData*>>
bottommost_files_marked_for_compaction_;
// Threshold for needing to mark another bottommost file. Maintain it so we
// can quickly check when releasing a snapshot whether more bottommost files
// became eligible for compaction. It's defined as the min of the max nonzero
// seqnums of unmarked bottommost files.
SequenceNumber bottommost_files_mark_threshold_ = kMaxSequenceNumber;
// Monotonically increases as we release old snapshots. Zero indicates no
// snapshots have been released yet. When no snapshots remain we set it to the
// current seqnum, which needs to be protected as a snapshot can still be
// created that references it.
SequenceNumber oldest_snapshot_seqnum_ = 0;
// Level that should be compacted next and its compaction score. // Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields // Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize(). // are initialized by Finalize().

@ -77,6 +77,9 @@ enum class CompactionReason {
kManualCompaction, kManualCompaction,
// DB::SuggestCompactRange() marked files for compaction // DB::SuggestCompactRange() marked files for compaction
kFilesMarkedForCompaction, kFilesMarkedForCompaction,
// [Level] Automatic compaction within bottommost level to cleanup duplicate
// versions of same user key, usually due to a released snapshot.
kBottommostFiles,
}; };
enum class BackgroundErrorReason { enum class BackgroundErrorReason {

Loading…
Cancel
Save