// Copyright (c) Meta Platforms, Inc. and affiliates. // // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once #include #include "db/blob/blob_file_addition.h" #include "db/blob/blob_garbage_meter.h" #include "db/compaction/compaction.h" #include "db/compaction/compaction_iterator.h" #include "db/compaction/compaction_outputs.h" #include "db/internal_stats.h" #include "db/output_validator.h" #include "db/range_del_aggregator.h" namespace ROCKSDB_NAMESPACE { // Maintains state and outputs for each sub-compaction // It contains 2 `CompactionOutputs`: // 1. one for the normal output files // 2. another for the penultimate level outputs // a `current` pointer maintains the current output group, when calling // `AddToOutput()`, it checks the output of the current compaction_iterator key // and point `current` to the target output group. By default, it just points to // normal compaction_outputs, if the compaction_iterator key should be placed on // the penultimate level, `current` is changed to point to // `penultimate_level_outputs`. // The later operations uses `Current()` to get the target group. // // +----------+ +-----------------------------+ +---------+ // | *current |--------> | compaction_outputs |----->| output | // +----------+ +-----------------------------+ +---------+ // | | output | // | +---------+ // | | ... | // | // | +-----------------------------+ +---------+ // +-------------> | penultimate_level_outputs |----->| output | // +-----------------------------+ +---------+ // | ... | class SubcompactionState { public: const Compaction* compaction; // The boundaries of the key-range this compaction is interested in. No two // sub-compactions may have overlapping key-ranges. // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded const std::optional start, end; // The return status of this sub-compaction Status status; // The return IO Status of this sub-compaction IOStatus io_status; // Notify on sub-compaction completion only if listener was notified on // sub-compaction begin. bool notify_on_subcompaction_completion = false; // compaction job stats for this sub-compaction CompactionJobStats compaction_job_stats; // sub-compaction job id, which is used to identify different sub-compaction // within the same compaction job. const uint32_t sub_job_id; Slice SmallestUserKey() const; Slice LargestUserKey() const; // Get all outputs from the subcompaction. For per_key_placement compaction, // it returns both the last level outputs and penultimate level outputs. OutputIterator GetOutputs() const; // Assign range dels aggregator, for each range_del, it can only be assigned // to one output level, for per_key_placement, it's going to be the // penultimate level. void AssignRangeDelAggregator( std::unique_ptr&& range_del_agg) { if (compaction->SupportsPerKeyPlacement()) { penultimate_level_outputs_.AssignRangeDelAggregator( std::move(range_del_agg)); } else { compaction_outputs_.AssignRangeDelAggregator(std::move(range_del_agg)); } } void RemoveLastEmptyOutput() { compaction_outputs_.RemoveLastEmptyOutput(); penultimate_level_outputs_.RemoveLastEmptyOutput(); } void BuildSubcompactionJobInfo( SubcompactionJobInfo& subcompaction_job_info) const { const Compaction* c = compaction; const ColumnFamilyData* cfd = c->column_family_data(); subcompaction_job_info.cf_id = cfd->GetID(); subcompaction_job_info.cf_name = cfd->GetName(); subcompaction_job_info.status = status; subcompaction_job_info.subcompaction_job_id = static_cast(sub_job_id); subcompaction_job_info.base_input_level = c->start_level(); subcompaction_job_info.output_level = c->output_level(); subcompaction_job_info.stats = compaction_job_stats; } SubcompactionState() = delete; SubcompactionState(const SubcompactionState&) = delete; SubcompactionState& operator=(const SubcompactionState&) = delete; SubcompactionState(Compaction* c, const std::optional _start, const std::optional _end, uint32_t _sub_job_id) : compaction(c), start(_start), end(_end), sub_job_id(_sub_job_id), compaction_outputs_(c, /*is_penultimate_level=*/false), penultimate_level_outputs_(c, /*is_penultimate_level=*/true) { assert(compaction != nullptr); // Set output split key (used for RoundRobin feature) only for normal // compaction_outputs, output to penultimate_level feature doesn't support // RoundRobin feature (and may never going to be supported, because for // RoundRobin, the data time is mostly naturally sorted, no need to have // per-key placement with output_to_penultimate_level). compaction_outputs_.SetOutputSlitKey(start, end); } SubcompactionState(SubcompactionState&& state) noexcept : compaction(state.compaction), start(state.start), end(state.end), status(std::move(state.status)), io_status(std::move(state.io_status)), notify_on_subcompaction_completion( state.notify_on_subcompaction_completion), compaction_job_stats(std::move(state.compaction_job_stats)), sub_job_id(state.sub_job_id), compaction_outputs_(std::move(state.compaction_outputs_)), penultimate_level_outputs_(std::move(state.penultimate_level_outputs_)), is_current_penultimate_level_(state.is_current_penultimate_level_), has_penultimate_level_outputs_(state.has_penultimate_level_outputs_) { current_outputs_ = is_current_penultimate_level_ ? &penultimate_level_outputs_ : &compaction_outputs_; } bool HasPenultimateLevelOutputs() const { return has_penultimate_level_outputs_ || penultimate_level_outputs_.HasRangeDel(); } bool IsCurrentPenultimateLevel() const { return is_current_penultimate_level_; } // Add all the new files from this compaction to version_edit void AddOutputsEdit(VersionEdit* out_edit) const { for (const auto& file : penultimate_level_outputs_.outputs_) { out_edit->AddFile(compaction->GetPenultimateLevel(), file.meta); } for (const auto& file : compaction_outputs_.outputs_) { out_edit->AddFile(compaction->output_level(), file.meta); } } void Cleanup(Cache* cache); void AggregateCompactionStats( InternalStats::CompactionStatsFull& compaction_stats) const; CompactionOutputs& Current() const { assert(current_outputs_); return *current_outputs_; } // Add compaction_iterator key/value to the `Current` output group. Status AddToOutput(const CompactionIterator& iter, const CompactionFileOpenFunc& open_file_func, const CompactionFileCloseFunc& close_file_func); // Close all compaction output files, both output_to_penultimate_level outputs // and normal outputs. Status CloseCompactionFiles(const Status& curr_status, const CompactionFileOpenFunc& open_file_func, const CompactionFileCloseFunc& close_file_func) { // Call FinishCompactionOutputFile() even if status is not ok: it needs to // close the output file. // CloseOutput() may open new compaction output files. is_current_penultimate_level_ = true; Status s = penultimate_level_outputs_.CloseOutput( curr_status, open_file_func, close_file_func); is_current_penultimate_level_ = false; s = compaction_outputs_.CloseOutput(s, open_file_func, close_file_func); return s; } private: // State kept for output being generated CompactionOutputs compaction_outputs_; CompactionOutputs penultimate_level_outputs_; CompactionOutputs* current_outputs_ = &compaction_outputs_; bool is_current_penultimate_level_ = false; bool has_penultimate_level_outputs_ = false; }; } // namespace ROCKSDB_NAMESPACE