fork of https://github.com/rust-rocksdb/rust-rocksdb for nextgraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
413 lines
15 KiB
413 lines
15 KiB
2 years ago
|
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
||
|
//
|
||
|
// This source code is licensed under both the GPLv2 (found in the
|
||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||
|
// (found in the LICENSE.Apache file in the root directory).
|
||
|
//
|
||
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||
|
// Use of this source code is governed by a BSD-style license that can be
|
||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||
|
|
||
|
#pragma once
|
||
|
|
||
|
#include "db/blob/blob_garbage_meter.h"
|
||
|
#include "db/compaction/compaction.h"
|
||
|
#include "db/compaction/compaction_iterator.h"
|
||
|
#include "db/internal_stats.h"
|
||
|
#include "db/output_validator.h"
|
||
|
|
||
|
namespace ROCKSDB_NAMESPACE {
|
||
|
|
||
|
class CompactionOutputs;
|
||
|
using CompactionFileOpenFunc = std::function<Status(CompactionOutputs&)>;
|
||
|
using CompactionFileCloseFunc =
|
||
|
std::function<Status(CompactionOutputs&, const Status&, const Slice&)>;
|
||
|
|
||
|
// Files produced by subcompaction, most of the functions are used by
|
||
|
// compaction_job Open/Close compaction file functions.
|
||
|
class CompactionOutputs {
|
||
|
public:
|
||
|
// compaction output file
|
||
|
struct Output {
|
||
|
Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
|
||
|
bool _enable_order_check, bool _enable_hash, bool _finished,
|
||
|
uint64_t precalculated_hash)
|
||
|
: meta(std::move(_meta)),
|
||
|
validator(_icmp, _enable_order_check, _enable_hash,
|
||
|
precalculated_hash),
|
||
|
finished(_finished) {}
|
||
|
FileMetaData meta;
|
||
|
OutputValidator validator;
|
||
|
bool finished;
|
||
|
std::shared_ptr<const TableProperties> table_properties;
|
||
|
};
|
||
|
|
||
|
CompactionOutputs() = delete;
|
||
|
|
||
|
explicit CompactionOutputs(const Compaction* compaction,
|
||
|
const bool is_penultimate_level);
|
||
|
|
||
|
// Add generated output to the list
|
||
|
void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp,
|
||
|
bool enable_order_check, bool enable_hash,
|
||
|
bool finished = false, uint64_t precalculated_hash = 0) {
|
||
|
outputs_.emplace_back(std::move(meta), icmp, enable_order_check,
|
||
|
enable_hash, finished, precalculated_hash);
|
||
|
}
|
||
|
|
||
|
// Set new table builder for the current output
|
||
|
void NewBuilder(const TableBuilderOptions& tboptions);
|
||
|
|
||
|
// Assign a new WritableFileWriter to the current output
|
||
|
void AssignFileWriter(WritableFileWriter* writer) {
|
||
|
file_writer_.reset(writer);
|
||
|
}
|
||
|
|
||
|
// TODO: Remove it when remote compaction support tiered compaction
|
||
|
void SetTotalBytes(uint64_t bytes) { stats_.bytes_written += bytes; }
|
||
|
void SetNumOutputRecords(uint64_t num) { stats_.num_output_records = num; }
|
||
|
|
||
|
// TODO: Move the BlobDB builder into CompactionOutputs
|
||
|
const std::vector<BlobFileAddition>& GetBlobFileAdditions() const {
|
||
|
if (is_penultimate_level_) {
|
||
|
assert(blob_file_additions_.empty());
|
||
|
}
|
||
|
return blob_file_additions_;
|
||
|
}
|
||
|
|
||
|
std::vector<BlobFileAddition>* GetBlobFileAdditionsPtr() {
|
||
|
assert(!is_penultimate_level_);
|
||
|
return &blob_file_additions_;
|
||
|
}
|
||
|
|
||
|
bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); }
|
||
|
|
||
|
BlobGarbageMeter* CreateBlobGarbageMeter() {
|
||
|
assert(!is_penultimate_level_);
|
||
|
blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>();
|
||
|
return blob_garbage_meter_.get();
|
||
|
}
|
||
|
|
||
|
BlobGarbageMeter* GetBlobGarbageMeter() const {
|
||
|
if (is_penultimate_level_) {
|
||
|
// blobdb doesn't support per_key_placement yet
|
||
|
assert(blob_garbage_meter_ == nullptr);
|
||
|
return nullptr;
|
||
|
}
|
||
|
return blob_garbage_meter_.get();
|
||
|
}
|
||
|
|
||
|
void UpdateBlobStats() {
|
||
|
assert(!is_penultimate_level_);
|
||
|
stats_.num_output_files_blob = blob_file_additions_.size();
|
||
|
for (const auto& blob : blob_file_additions_) {
|
||
|
stats_.bytes_written_blob += blob.GetTotalBlobBytes();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Finish the current output file
|
||
|
Status Finish(const Status& intput_status,
|
||
|
const SeqnoToTimeMapping& seqno_time_mapping);
|
||
|
|
||
|
// Update output table properties from table builder
|
||
|
void UpdateTableProperties() {
|
||
|
current_output().table_properties =
|
||
|
std::make_shared<TableProperties>(GetTableProperties());
|
||
|
}
|
||
|
|
||
|
IOStatus WriterSyncClose(const Status& intput_status, SystemClock* clock,
|
||
|
Statistics* statistics, bool use_fsync);
|
||
|
|
||
|
TableProperties GetTableProperties() {
|
||
|
return builder_->GetTableProperties();
|
||
|
}
|
||
|
|
||
|
Slice SmallestUserKey() const {
|
||
|
if (!outputs_.empty() && outputs_[0].finished) {
|
||
|
return outputs_[0].meta.smallest.user_key();
|
||
|
} else {
|
||
|
return Slice{nullptr, 0};
|
||
|
}
|
||
|
}
|
||
|
|
||
|
Slice LargestUserKey() const {
|
||
|
if (!outputs_.empty() && outputs_.back().finished) {
|
||
|
return outputs_.back().meta.largest.user_key();
|
||
|
} else {
|
||
|
return Slice{nullptr, 0};
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// In case the last output file is empty, which doesn't need to keep.
|
||
|
void RemoveLastEmptyOutput() {
|
||
|
if (!outputs_.empty() && !outputs_.back().meta.fd.file_size) {
|
||
|
// An error occurred, so ignore the last output.
|
||
|
outputs_.pop_back();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Remove the last output, for example the last output doesn't have data (no
|
||
|
// entry and no range-dels), but file_size might not be 0, as it has SST
|
||
|
// metadata.
|
||
|
void RemoveLastOutput() {
|
||
|
assert(!outputs_.empty());
|
||
|
outputs_.pop_back();
|
||
|
}
|
||
|
|
||
|
bool HasBuilder() const { return builder_ != nullptr; }
|
||
|
|
||
|
FileMetaData* GetMetaData() { return ¤t_output().meta; }
|
||
|
|
||
|
bool HasOutput() const { return !outputs_.empty(); }
|
||
|
|
||
|
uint64_t NumEntries() const { return builder_->NumEntries(); }
|
||
|
|
||
|
void ResetBuilder() {
|
||
|
builder_.reset();
|
||
|
current_output_file_size_ = 0;
|
||
|
}
|
||
|
|
||
|
// Add range deletions from the range_del_agg_ to the current output file.
|
||
|
// Input parameters, `range_tombstone_lower_bound_` and current output's
|
||
|
// metadata determine the bounds on range deletions to add. Updates output
|
||
|
// file metadata boundary if extended by range tombstones.
|
||
|
//
|
||
|
// @param comp_start_user_key and comp_end_user_key include timestamp if
|
||
|
// user-defined timestamp is enabled. Their timestamp should be max timestamp.
|
||
|
// @param next_table_min_key internal key lower bound for the next compaction
|
||
|
// output.
|
||
|
// @param full_history_ts_low used for range tombstone garbage collection.
|
||
|
Status AddRangeDels(const Slice* comp_start_user_key,
|
||
|
const Slice* comp_end_user_key,
|
||
|
CompactionIterationStats& range_del_out_stats,
|
||
|
bool bottommost_level, const InternalKeyComparator& icmp,
|
||
|
SequenceNumber earliest_snapshot,
|
||
|
const Slice& next_table_min_key,
|
||
|
const std::string& full_history_ts_low);
|
||
|
|
||
|
// if the outputs have range delete, range delete is also data
|
||
|
bool HasRangeDel() const {
|
||
|
return range_del_agg_ && !range_del_agg_->IsEmpty();
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
friend class SubcompactionState;
|
||
|
|
||
|
void FillFilesToCutForTtl();
|
||
|
|
||
|
void SetOutputSlitKey(const std::optional<Slice> start,
|
||
|
const std::optional<Slice> end) {
|
||
|
const InternalKeyComparator* icmp =
|
||
|
&compaction_->column_family_data()->internal_comparator();
|
||
|
|
||
|
const InternalKey* output_split_key = compaction_->GetOutputSplitKey();
|
||
|
// Invalid output_split_key indicates that we do not need to split
|
||
|
if (output_split_key != nullptr) {
|
||
|
// We may only split the output when the cursor is in the range. Split
|
||
|
if ((!end.has_value() ||
|
||
|
icmp->user_comparator()->Compare(
|
||
|
ExtractUserKey(output_split_key->Encode()), *end) < 0) &&
|
||
|
(!start.has_value() ||
|
||
|
icmp->user_comparator()->Compare(
|
||
|
ExtractUserKey(output_split_key->Encode()), *start) > 0)) {
|
||
|
local_output_split_key_ = output_split_key;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Returns true iff we should stop building the current output
|
||
|
// before processing the current key in compaction iterator.
|
||
|
bool ShouldStopBefore(const CompactionIterator& c_iter);
|
||
|
|
||
|
void Cleanup() {
|
||
|
if (builder_ != nullptr) {
|
||
|
// May happen if we get a shutdown call in the middle of compaction
|
||
|
builder_->Abandon();
|
||
|
builder_.reset();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Updates states related to file cutting for TTL.
|
||
|
// Returns a boolean value indicating whether the current
|
||
|
// compaction output file should be cut before `internal_key`.
|
||
|
//
|
||
|
// @param internal_key the current key to be added to output.
|
||
|
bool UpdateFilesToCutForTTLStates(const Slice& internal_key);
|
||
|
|
||
|
// update tracked grandparents information like grandparent index, if it's
|
||
|
// in the gap between 2 grandparent files, accumulated grandparent files size
|
||
|
// etc.
|
||
|
// It returns how many boundaries it crosses by including current key.
|
||
|
size_t UpdateGrandparentBoundaryInfo(const Slice& internal_key);
|
||
|
|
||
|
// helper function to get the overlapped grandparent files size, it's only
|
||
|
// used for calculating the first key's overlap.
|
||
|
uint64_t GetCurrentKeyGrandparentOverlappedBytes(
|
||
|
const Slice& internal_key) const;
|
||
|
|
||
|
// Add current key from compaction_iterator to the output file. If needed
|
||
|
// close and open new compaction output with the functions provided.
|
||
|
Status AddToOutput(const CompactionIterator& c_iter,
|
||
|
const CompactionFileOpenFunc& open_file_func,
|
||
|
const CompactionFileCloseFunc& close_file_func);
|
||
|
|
||
|
// Close the current output. `open_file_func` is needed for creating new file
|
||
|
// for range-dels only output file.
|
||
|
Status CloseOutput(const Status& curr_status,
|
||
|
const CompactionFileOpenFunc& open_file_func,
|
||
|
const CompactionFileCloseFunc& close_file_func) {
|
||
|
Status status = curr_status;
|
||
|
// handle subcompaction containing only range deletions
|
||
|
if (status.ok() && !HasBuilder() && !HasOutput() && HasRangeDel()) {
|
||
|
status = open_file_func(*this);
|
||
|
}
|
||
|
if (HasBuilder()) {
|
||
|
const Slice empty_key{};
|
||
|
Status s = close_file_func(*this, status, empty_key);
|
||
|
if (!s.ok() && status.ok()) {
|
||
|
status = s;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return status;
|
||
|
}
|
||
|
|
||
|
// This subcompaction's output could be empty if compaction was aborted before
|
||
|
// this subcompaction had a chance to generate any output files. When
|
||
|
// subcompactions are executed sequentially this is more likely and will be
|
||
|
// particularly likely for the later subcompactions to be empty. Once they are
|
||
|
// run in parallel however it should be much rarer.
|
||
|
// It's caller's responsibility to make sure it's not empty.
|
||
|
Output& current_output() {
|
||
|
assert(!outputs_.empty());
|
||
|
return outputs_.back();
|
||
|
}
|
||
|
|
||
|
// Assign the range_del_agg to the target output level. There's only one
|
||
|
// range-del-aggregator per compaction outputs, for
|
||
|
// output_to_penultimate_level compaction it is only assigned to the
|
||
|
// penultimate level.
|
||
|
void AssignRangeDelAggregator(
|
||
|
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
|
||
|
assert(range_del_agg_ == nullptr);
|
||
|
range_del_agg_ = std::move(range_del_agg);
|
||
|
}
|
||
|
|
||
|
const Compaction* compaction_;
|
||
|
|
||
|
// current output builder and writer
|
||
|
std::unique_ptr<TableBuilder> builder_;
|
||
|
std::unique_ptr<WritableFileWriter> file_writer_;
|
||
|
uint64_t current_output_file_size_ = 0;
|
||
|
|
||
|
// all the compaction outputs so far
|
||
|
std::vector<Output> outputs_;
|
||
|
|
||
|
// BlobDB info
|
||
|
std::vector<BlobFileAddition> blob_file_additions_;
|
||
|
std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;
|
||
|
|
||
|
// Basic compaction output stats for this level's outputs
|
||
|
InternalStats::CompactionOutputsStats stats_;
|
||
|
|
||
|
// indicate if this CompactionOutputs obj for penultimate_level, should always
|
||
|
// be false if per_key_placement feature is not enabled.
|
||
|
const bool is_penultimate_level_;
|
||
|
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_ = nullptr;
|
||
|
|
||
|
// partitioner information
|
||
|
std::string last_key_for_partitioner_;
|
||
|
std::unique_ptr<SstPartitioner> partitioner_;
|
||
|
|
||
|
// A flag determines if this subcompaction has been split by the cursor
|
||
|
// for RoundRobin compaction
|
||
|
bool is_split_ = false;
|
||
|
|
||
|
// We also maintain the output split key for each subcompaction to avoid
|
||
|
// repetitive comparison in ShouldStopBefore()
|
||
|
const InternalKey* local_output_split_key_ = nullptr;
|
||
|
|
||
|
// Some identified files with old oldest ancester time and the range should be
|
||
|
// isolated out so that the output file(s) in that range can be merged down
|
||
|
// for TTL and clear the timestamps for the range.
|
||
|
std::vector<FileMetaData*> files_to_cut_for_ttl_;
|
||
|
int cur_files_to_cut_for_ttl_ = -1;
|
||
|
int next_files_to_cut_for_ttl_ = 0;
|
||
|
|
||
|
// An index that used to speed up ShouldStopBefore().
|
||
|
size_t grandparent_index_ = 0;
|
||
|
|
||
|
// if the output key is being grandparent files gap, so:
|
||
|
// key > grandparents[grandparent_index_ - 1].largest &&
|
||
|
// key < grandparents[grandparent_index_].smallest
|
||
|
bool being_grandparent_gap_ = true;
|
||
|
|
||
|
// The number of bytes overlapping between the current output and
|
||
|
// grandparent files used in ShouldStopBefore().
|
||
|
uint64_t grandparent_overlapped_bytes_ = 0;
|
||
|
|
||
|
// A flag determines whether the key has been seen in ShouldStopBefore()
|
||
|
bool seen_key_ = false;
|
||
|
|
||
|
// for the current output file, how many file boundaries has it crossed,
|
||
|
// basically number of files overlapped * 2
|
||
|
size_t grandparent_boundary_switched_num_ = 0;
|
||
|
|
||
|
// The smallest key of the current output file, this is set when current
|
||
|
// output file's smallest key is a range tombstone start key.
|
||
|
InternalKey range_tombstone_lower_bound_;
|
||
|
|
||
|
// Used for calls to compaction->KeyRangeNotExistsBeyondOutputLevel() in
|
||
|
// CompactionOutputs::AddRangeDels().
|
||
|
// level_ptrs_[i] holds index of the file that was checked during the last
|
||
|
// call to compaction->KeyRangeNotExistsBeyondOutputLevel(). This allows
|
||
|
// future calls to the function to pick up where it left off, since each
|
||
|
// range tombstone added to output file within each subcompaction is in
|
||
|
// increasing key range.
|
||
|
std::vector<size_t> level_ptrs_;
|
||
|
};
|
||
|
|
||
|
// helper struct to concatenate the last level and penultimate level outputs
|
||
|
// which could be replaced by std::ranges::join_view() in c++20
|
||
|
struct OutputIterator {
|
||
|
public:
|
||
|
explicit OutputIterator(const std::vector<CompactionOutputs::Output>& a,
|
||
|
const std::vector<CompactionOutputs::Output>& b)
|
||
|
: a_(a), b_(b) {
|
||
|
within_a = !a_.empty();
|
||
|
idx_ = 0;
|
||
|
}
|
||
|
|
||
|
OutputIterator begin() { return *this; }
|
||
|
|
||
|
OutputIterator end() { return *this; }
|
||
|
|
||
|
size_t size() { return a_.size() + b_.size(); }
|
||
|
|
||
|
const CompactionOutputs::Output& operator*() const {
|
||
|
return within_a ? a_[idx_] : b_[idx_];
|
||
|
}
|
||
|
|
||
|
OutputIterator& operator++() {
|
||
|
idx_++;
|
||
|
if (within_a && idx_ >= a_.size()) {
|
||
|
within_a = false;
|
||
|
idx_ = 0;
|
||
|
}
|
||
|
assert(within_a || idx_ <= b_.size());
|
||
|
return *this;
|
||
|
}
|
||
|
|
||
|
bool operator!=(const OutputIterator& /*rhs*/) const {
|
||
|
return within_a || idx_ < b_.size();
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
const std::vector<CompactionOutputs::Output>& a_;
|
||
|
const std::vector<CompactionOutputs::Output>& b_;
|
||
|
bool within_a;
|
||
|
size_t idx_;
|
||
|
};
|
||
|
|
||
|
} // namespace ROCKSDB_NAMESPACE
|