You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rocksdb/db/compaction/subcompaction_state.h

223 lines
9.0 KiB

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <optional>
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_garbage_meter.h"
#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iterator.h"
#include "db/compaction/compaction_outputs.h"
#include "db/internal_stats.h"
#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
namespace ROCKSDB_NAMESPACE {
// Maintains state and outputs for each sub-compaction
// It contains 2 `CompactionOutputs`:
// 1. one for the normal output files
// 2. another for the penultimate level outputs
// a `current` pointer maintains the current output group, when calling
// `AddToOutput()`, it checks the output of the current compaction_iterator key
// and point `current` to the target output group. By default, it just points to
// normal compaction_outputs, if the compaction_iterator key should be placed on
// the penultimate level, `current` is changed to point to
// `penultimate_level_outputs`.
// The later operations uses `Current()` to get the target group.
//
// +----------+ +-----------------------------+ +---------+
// | *current |--------> | compaction_outputs |----->| output |
// +----------+ +-----------------------------+ +---------+
// | | output |
// | +---------+
// | | ... |
// |
// | +-----------------------------+ +---------+
// +-------------> | penultimate_level_outputs |----->| output |
// +-----------------------------+ +---------+
// | ... |
class SubcompactionState {
public:
const Compaction* compaction;
// The boundaries of the key-range this compaction is interested in. No two
// sub-compactions may have overlapping key-ranges.
// 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
const std::optional<Slice> start, end;
// The return status of this sub-compaction
Status status;
// The return IO Status of this sub-compaction
IOStatus io_status;
// Notify on sub-compaction completion only if listener was notified on
// sub-compaction begin.
bool notify_on_subcompaction_completion = false;
// compaction job stats for this sub-compaction
CompactionJobStats compaction_job_stats;
// sub-compaction job id, which is used to identify different sub-compaction
// within the same compaction job.
const uint32_t sub_job_id;
Slice SmallestUserKey() const;
Slice LargestUserKey() const;
// Get all outputs from the subcompaction. For per_key_placement compaction,
// it returns both the last level outputs and penultimate level outputs.
OutputIterator GetOutputs() const;
// Assign range dels aggregator, for each range_del, it can only be assigned
// to one output level, for per_key_placement, it's going to be the
// penultimate level.
Consider range tombstone in compaction output file cutting (#10802) Summary: This PR is the first step for Issue https://github.com/facebook/rocksdb/issues/4811. Currently compaction output files are cut at point keys, and the decision is made mainly in `CompactionOutputs::ShouldStopBefore()`. This makes it possible for range tombstones to cause large compactions that does not respect `max_compaction_bytes`. For example, we can have a large range tombstone that overlaps with too many files from the next level. Another example is when there is a gap between a range tombstone and another key. The first issue may be more acceptable, as a lot of data is deleted. This PR address the second issue by calling `ShouldStopBefore()` for range tombstone start keys. The main change is for `CompactionIterator` to emit range tombstone start keys to be processed by `CompactionOutputs`. A new `CompactionMergingIterator` is introduced and only used under `CompactionIterator` for this purpose. Further improvement after this PR include 1) cut compaction output at some grandparent boundary key instead of at the next point key or range tombstone start key and 2) cut compaction output file within a large range tombstone (it may be easier and reasonable to only do it for range tombstones at the end of a compaction output). Pull Request resolved: https://github.com/facebook/rocksdb/pull/10802 Test Plan: - added unit tests in db_range_del_test. - stress test: `python3 tools/db_crashtest.py whitebox --[simple|enable_ts] --verify_iterator_with_expected_state_one_in=5 --delrangepercent=5 --prefixpercent=2 --writepercent=58 --readpercen=21 --duration=36000 --range_deletion_width=1000000` Reviewed By: ajkr, jay-zhuang Differential Revision: D40308827 Pulled By: cbi42 fbshipit-source-id: a8fd6f70a3f09d0ef7a40e006f6c964bba8c00df
2 years ago
// TODO: This does not work for per_key_placement + user-defined timestamp +
// DeleteRange() combo. If user-defined timestamp is enabled,
// it is possible for a range tombstone to belong to bottommost level (
// seqno < earliest snapshot) without being dropped (garbage collection
// for user-defined timestamp).
void AssignRangeDelAggregator(
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
if (compaction->SupportsPerKeyPlacement()) {
penultimate_level_outputs_.AssignRangeDelAggregator(
std::move(range_del_agg));
} else {
compaction_outputs_.AssignRangeDelAggregator(std::move(range_del_agg));
}
}
void RemoveLastEmptyOutput() {
compaction_outputs_.RemoveLastEmptyOutput();
penultimate_level_outputs_.RemoveLastEmptyOutput();
}
#ifndef ROCKSDB_LITE
void BuildSubcompactionJobInfo(
SubcompactionJobInfo& subcompaction_job_info) const {
const Compaction* c = compaction;
const ColumnFamilyData* cfd = c->column_family_data();
subcompaction_job_info.cf_id = cfd->GetID();
subcompaction_job_info.cf_name = cfd->GetName();
subcompaction_job_info.status = status;
subcompaction_job_info.subcompaction_job_id = static_cast<int>(sub_job_id);
subcompaction_job_info.base_input_level = c->start_level();
subcompaction_job_info.output_level = c->output_level();
subcompaction_job_info.stats = compaction_job_stats;
}
#endif // !ROCKSDB_LITE
SubcompactionState() = delete;
SubcompactionState(const SubcompactionState&) = delete;
SubcompactionState& operator=(const SubcompactionState&) = delete;
SubcompactionState(Compaction* c, const std::optional<Slice> _start,
const std::optional<Slice> _end, uint32_t _sub_job_id)
: compaction(c),
start(_start),
end(_end),
sub_job_id(_sub_job_id),
compaction_outputs_(c, /*is_penultimate_level=*/false),
penultimate_level_outputs_(c, /*is_penultimate_level=*/true) {
assert(compaction != nullptr);
// Set output split key (used for RoundRobin feature) only for normal
// compaction_outputs, output to penultimate_level feature doesn't support
// RoundRobin feature (and may never going to be supported, because for
// RoundRobin, the data time is mostly naturally sorted, no need to have
// per-key placement with output_to_penultimate_level).
compaction_outputs_.SetOutputSlitKey(start, end);
}
SubcompactionState(SubcompactionState&& state) noexcept
: compaction(state.compaction),
start(state.start),
end(state.end),
status(std::move(state.status)),
io_status(std::move(state.io_status)),
notify_on_subcompaction_completion(
state.notify_on_subcompaction_completion),
compaction_job_stats(std::move(state.compaction_job_stats)),
sub_job_id(state.sub_job_id),
compaction_outputs_(std::move(state.compaction_outputs_)),
penultimate_level_outputs_(std::move(state.penultimate_level_outputs_)),
is_current_penultimate_level_(state.is_current_penultimate_level_),
has_penultimate_level_outputs_(state.has_penultimate_level_outputs_) {
current_outputs_ = is_current_penultimate_level_
? &penultimate_level_outputs_
: &compaction_outputs_;
}
bool HasPenultimateLevelOutputs() const {
return has_penultimate_level_outputs_ ||
penultimate_level_outputs_.HasRangeDel();
}
bool IsCurrentPenultimateLevel() const {
return is_current_penultimate_level_;
}
// Add all the new files from this compaction to version_edit
void AddOutputsEdit(VersionEdit* out_edit) const {
for (const auto& file : penultimate_level_outputs_.outputs_) {
out_edit->AddFile(compaction->GetPenultimateLevel(), file.meta);
}
for (const auto& file : compaction_outputs_.outputs_) {
out_edit->AddFile(compaction->output_level(), file.meta);
}
}
void Cleanup(Cache* cache);
void AggregateCompactionStats(
InternalStats::CompactionStatsFull& compaction_stats) const;
CompactionOutputs& Current() const {
assert(current_outputs_);
return *current_outputs_;
}
// Add compaction_iterator key/value to the `Current` output group.
Status AddToOutput(const CompactionIterator& iter,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func);
// Close all compaction output files, both output_to_penultimate_level outputs
// and normal outputs.
Status CloseCompactionFiles(const Status& curr_status,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) {
// Call FinishCompactionOutputFile() even if status is not ok: it needs to
// close the output file.
// CloseOutput() may open new compaction output files.
is_current_penultimate_level_ = true;
Status s = penultimate_level_outputs_.CloseOutput(
curr_status, open_file_func, close_file_func);
is_current_penultimate_level_ = false;
s = compaction_outputs_.CloseOutput(s, open_file_func, close_file_func);
return s;
}
private:
// State kept for output being generated
CompactionOutputs compaction_outputs_;
CompactionOutputs penultimate_level_outputs_;
CompactionOutputs* current_outputs_ = &compaction_outputs_;
bool is_current_penultimate_level_ = false;
bool has_penultimate_level_outputs_ = false;
};
} // namespace ROCKSDB_NAMESPACE