L0 Subcompaction to trim input files (#9802)

Summary:
When sub compaction is decided for L0->L1 compaction, most of the cases, all L0 files will be involved in all sub compactions. However, it is not always the case. When files are generally (but not strictly) inserted in sequential order, there can be a subset of L0 files invovled. Yet RocksDB always open all those L0 files, and build an iterator, read many of the files' first of last block with expensive readahead. We trim some input files to reduce overhead a little bit.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9802

Test Plan: Add a unit test to cover this case and manually validate the behavior while running the test.

Reviewed By: ajkr

Differential Revision: D35371031

fbshipit-source-id: 701ed7375b5cbe41672e93b38fe8a1503dad08b6
main
sdong 3 years ago committed by Facebook GitHub Bot
parent 8ce7cea93f
commit e03f8a0c12
  1. 12
      db/compaction/compaction_job.cc
  2. 68
      db/db_compaction_test.cc
  3. 24
      db/version_set.cc
  4. 6
      db/version_set.h

@ -1341,6 +1341,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(), CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
existing_snapshots_); existing_snapshots_);
// TODO: since we already use C++17, should use
// std::optional<const Slice> instead.
const Slice* const start = sub_compact->start; const Slice* const start = sub_compact->start;
const Slice* const end = sub_compact->end; const Slice* const end = sub_compact->end;
@ -1362,9 +1364,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
// Although the v2 aggregator is what the level iterator(s) know about, // Although the v2 aggregator is what the level iterator(s) know about,
// the AddTombstones calls will be propagated down to the v1 aggregator. // the AddTombstones calls will be propagated down to the v1 aggregator.
std::unique_ptr<InternalIterator> raw_input( std::unique_ptr<InternalIterator> raw_input(versions_->MakeInputIterator(
versions_->MakeInputIterator(read_options, sub_compact->compaction, read_options, sub_compact->compaction, &range_del_agg,
&range_del_agg, file_options_for_read_)); file_options_for_read_,
(start == nullptr) ? std::optional<const Slice>{}
: std::optional<const Slice>{*start},
(end == nullptr) ? std::optional<const Slice>{}
: std::optional<const Slice>{*end}));
InternalIterator* input = raw_input.get(); InternalIterator* input = raw_input.get();
IterKey start_ikey; IterKey start_ikey;

@ -1354,6 +1354,74 @@ TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
} }
} }
TEST_P(DBCompactionTestWithParam, PartialOverlappingL0) {
class SubCompactionEventListener : public EventListener {
public:
void OnSubcompactionCompleted(const SubcompactionJobInfo&) override {
sub_compaction_finished_++;
}
std::atomic<int> sub_compaction_finished_{0};
};
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.write_buffer_size = 10 * 1024 * 1024;
options.max_subcompactions = max_subcompactions_;
SubCompactionEventListener* listener = new SubCompactionEventListener();
options.listeners.emplace_back(listener);
DestroyAndReopen(options);
// For subcompactino to trigger, output level needs to be non-empty.
ASSERT_OK(Put("key", ""));
ASSERT_OK(Put("kez", ""));
ASSERT_OK(Flush());
ASSERT_OK(Put("key", ""));
ASSERT_OK(Put("kez", ""));
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Ranges that are only briefly overlapping so that they won't be trivially
// moved but subcompaction ranges would only contain a subset of files.
std::vector<std::pair<int32_t, int32_t>> ranges = {
{100, 199}, {198, 399}, {397, 600}, {598, 800}, {799, 900}, {895, 999},
};
int32_t value_size = 10 * 1024; // 10 KB
Random rnd(301);
std::map<int32_t, std::string> values;
for (size_t i = 0; i < ranges.size(); i++) {
for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
values[j] = rnd.RandomString(value_size);
ASSERT_OK(Put(Key(j), values[j]));
}
ASSERT_OK(Flush());
}
int32_t level0_files = NumTableFilesAtLevel(0, 0);
ASSERT_EQ(level0_files, ranges.size()); // Multiple files in L0
ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1); // One file in L1
listener->sub_compaction_finished_ = 0;
ASSERT_OK(db_->EnableAutoCompaction({db_->DefaultColumnFamily()}));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
if (max_subcompactions_ > 3) {
// RocksDB might not generate the exact number of sub compactions.
// Here we validate that at least subcompaction happened.
ASSERT_GT(listener->sub_compaction_finished_.load(), 2);
}
// We expect that all the files were compacted to L1
ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
ASSERT_GT(NumTableFilesAtLevel(1, 0), 1);
for (size_t i = 0; i < ranges.size(); i++) {
for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
ASSERT_EQ(Get(Key(j)), values[j]);
}
}
}
TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
int32_t trivial_move = 0; int32_t trivial_move = 0;
int32_t non_trivial_move = 0; int32_t non_trivial_move = 0;

@ -27,6 +27,7 @@
#include "db/blob/blob_log_format.h" #include "db/blob/blob_log_format.h"
#include "db/compaction/compaction.h" #include "db/compaction/compaction.h"
#include "db/compaction/file_pri.h" #include "db/compaction/file_pri.h"
#include "db/dbformat.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/log_writer.h" #include "db/log_writer.h"
@ -5816,7 +5817,9 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
InternalIterator* VersionSet::MakeInputIterator( InternalIterator* VersionSet::MakeInputIterator(
const ReadOptions& read_options, const Compaction* c, const ReadOptions& read_options, const Compaction* c,
RangeDelAggregator* range_del_agg, RangeDelAggregator* range_del_agg,
const FileOptions& file_options_compactions) { const FileOptions& file_options_compactions,
const std::optional<const Slice>& start,
const std::optional<const Slice>& end) {
auto cfd = c->column_family_data(); auto cfd = c->column_family_data();
// Level-0 files have to be merged together. For other levels, // Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level. // we will make a concatenating iterator per level.
@ -5831,10 +5834,25 @@ InternalIterator* VersionSet::MakeInputIterator(
if (c->level(which) == 0) { if (c->level(which) == 0) {
const LevelFilesBrief* flevel = c->input_levels(which); const LevelFilesBrief* flevel = c->input_levels(which);
for (size_t i = 0; i < flevel->num_files; i++) { for (size_t i = 0; i < flevel->num_files; i++) {
const FileMetaData& fmd = *flevel->files[i].file_metadata;
if (start.has_value() &&
cfd->user_comparator()->Compare(start.value(),
fmd.largest.user_key()) > 0) {
continue;
}
// We should be able to filter out the case where the end key
// equals to the end boundary, since the end key is exclusive.
// We try to be extra safe here.
if (end.has_value() &&
cfd->user_comparator()->Compare(end.value(),
fmd.smallest.user_key()) < 0) {
continue;
}
list[num++] = cfd->table_cache()->NewIterator( list[num++] = cfd->table_cache()->NewIterator(
read_options, file_options_compactions, read_options, file_options_compactions,
cfd->internal_comparator(), *flevel->files[i].file_metadata, cfd->internal_comparator(), fmd, range_del_agg,
range_del_agg, c->mutable_cf_options()->prefix_extractor, c->mutable_cf_options()->prefix_extractor,
/*table_reader_ptr=*/nullptr, /*table_reader_ptr=*/nullptr,
/*file_read_hist=*/nullptr, TableReaderCaller::kCompaction, /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
/*arena=*/nullptr, /*arena=*/nullptr,

@ -24,6 +24,7 @@
#include <limits> #include <limits>
#include <map> #include <map>
#include <memory> #include <memory>
#include <optional>
#include <set> #include <set>
#include <string> #include <string>
#include <unordered_set> #include <unordered_set>
@ -1261,10 +1262,13 @@ class VersionSet {
// Create an iterator that reads over the compaction inputs for "*c". // Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed. // The caller should delete the iterator when no longer needed.
// @param read_options Must outlive the returned iterator. // @param read_options Must outlive the returned iterator.
// @param start, end indicates compaction range
InternalIterator* MakeInputIterator( InternalIterator* MakeInputIterator(
const ReadOptions& read_options, const Compaction* c, const ReadOptions& read_options, const Compaction* c,
RangeDelAggregator* range_del_agg, RangeDelAggregator* range_del_agg,
const FileOptions& file_options_compactions); const FileOptions& file_options_compactions,
const std::optional<const Slice>& start,
const std::optional<const Slice>& end);
// Add all files listed in any live version to *live_table_files and // Add all files listed in any live version to *live_table_files and
// *live_blob_files. Note that these lists may contain duplicates. // *live_blob_files. Note that these lists may contain duplicates.

Loading…
Cancel
Save