fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
329 lines
11 KiB
329 lines
11 KiB
2 years ago
|
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
||
|
//
|
||
|
// This source code is licensed under both the GPLv2 (found in the
|
||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||
|
// (found in the LICENSE.Apache file in the root directory).
|
||
|
//
|
||
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||
|
// Use of this source code is governed by a BSD-style license that can be
|
||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||
|
|
||
|
#pragma once
|
||
|
|
||
|
#include "db/blob/blob_garbage_meter.h"
|
||
|
#include "db/compaction/compaction.h"
|
||
|
#include "db/compaction/compaction_iterator.h"
|
||
|
#include "db/internal_stats.h"
|
||
|
#include "db/output_validator.h"
|
||
|
|
||
|
namespace ROCKSDB_NAMESPACE {
|
||
|
|
||
|
class CompactionOutputs;
|
||
|
using CompactionFileOpenFunc = std::function<Status(CompactionOutputs&)>;
|
||
|
using CompactionFileCloseFunc =
|
||
|
std::function<Status(CompactionOutputs&, const Status&, const Slice&)>;
|
||
|
|
||
|
// Files produced by subcompaction, most of the functions are used by
|
||
|
// compaction_job Open/Close compaction file functions.
|
||
|
class CompactionOutputs {
|
||
|
public:
|
||
|
// compaction output file
|
||
|
struct Output {
|
||
|
Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
|
||
|
bool _enable_order_check, bool _enable_hash, bool _finished,
|
||
|
uint64_t precalculated_hash)
|
||
|
: meta(std::move(_meta)),
|
||
|
validator(_icmp, _enable_order_check, _enable_hash,
|
||
|
precalculated_hash),
|
||
|
finished(_finished) {}
|
||
|
FileMetaData meta;
|
||
|
OutputValidator validator;
|
||
|
bool finished;
|
||
|
std::shared_ptr<const TableProperties> table_properties;
|
||
|
};
|
||
|
|
||
|
CompactionOutputs() = delete;
|
||
|
|
||
|
explicit CompactionOutputs(const Compaction* compaction,
|
||
|
const bool is_penultimate_level)
|
||
|
: compaction_(compaction), is_penultimate_level_(is_penultimate_level) {
|
||
|
partitioner_ = compaction->output_level() == 0
|
||
|
? nullptr
|
||
|
: compaction->CreateSstPartitioner();
|
||
|
}
|
||
|
|
||
|
// Add generated output to the list
|
||
|
void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp,
|
||
|
bool enable_order_check, bool enable_hash,
|
||
|
bool finished = false, uint64_t precalculated_hash = 0) {
|
||
|
outputs_.emplace_back(std::move(meta), icmp, enable_order_check,
|
||
|
enable_hash, finished, precalculated_hash);
|
||
|
}
|
||
|
|
||
|
// Set new table builder for the current output
|
||
|
void NewBuilder(const TableBuilderOptions& tboptions);
|
||
|
|
||
|
// Assign a new WritableFileWriter to the current output
|
||
|
void AssignFileWriter(WritableFileWriter* writer) {
|
||
|
file_writer_.reset(writer);
|
||
|
}
|
||
|
|
||
|
// TODO: Remove it when remote compaction support tiered compaction
|
||
|
void SetTotalBytes(uint64_t bytes) { stats_.bytes_written += bytes; }
|
||
|
void SetNumOutputRecords(uint64_t num) { stats_.num_output_records = num; }
|
||
|
|
||
|
// TODO: Move the BlobDB builder into CompactionOutputs
|
||
|
const std::vector<BlobFileAddition>& GetBlobFileAdditions() const {
|
||
|
if (is_penultimate_level_) {
|
||
|
assert(blob_file_additions_.empty());
|
||
|
}
|
||
|
return blob_file_additions_;
|
||
|
}
|
||
|
|
||
|
std::vector<BlobFileAddition>* GetBlobFileAdditionsPtr() {
|
||
|
assert(!is_penultimate_level_);
|
||
|
return &blob_file_additions_;
|
||
|
}
|
||
|
|
||
|
bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); }
|
||
|
|
||
|
BlobGarbageMeter* CreateBlobGarbageMeter() {
|
||
|
assert(!is_penultimate_level_);
|
||
|
blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>();
|
||
|
return blob_garbage_meter_.get();
|
||
|
}
|
||
|
|
||
|
BlobGarbageMeter* GetBlobGarbageMeter() const {
|
||
|
if (is_penultimate_level_) {
|
||
|
// blobdb doesn't support per_key_placement yet
|
||
|
assert(blob_garbage_meter_ == nullptr);
|
||
|
return nullptr;
|
||
|
}
|
||
|
return blob_garbage_meter_.get();
|
||
|
}
|
||
|
|
||
|
void UpdateBlobStats() {
|
||
|
assert(!is_penultimate_level_);
|
||
|
stats_.num_output_files_blob = blob_file_additions_.size();
|
||
|
for (const auto& blob : blob_file_additions_) {
|
||
|
stats_.bytes_written_blob += blob.GetTotalBlobBytes();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Finish the current output file
|
||
|
Status Finish(const Status& intput_status);
|
||
|
|
||
|
// Update output table properties from table builder
|
||
|
void UpdateTableProperties() {
|
||
|
current_output().table_properties =
|
||
|
std::make_shared<TableProperties>(GetTableProperties());
|
||
|
}
|
||
|
|
||
|
IOStatus WriterSyncClose(const Status& intput_status, SystemClock* clock,
|
||
|
Statistics* statistics, bool use_fsync);
|
||
|
|
||
|
TableProperties GetTableProperties() {
|
||
|
return builder_->GetTableProperties();
|
||
|
}
|
||
|
|
||
|
Slice SmallestUserKey() const {
|
||
|
if (!outputs_.empty() && outputs_[0].finished) {
|
||
|
return outputs_[0].meta.smallest.user_key();
|
||
|
} else {
|
||
|
return Slice{nullptr, 0};
|
||
|
}
|
||
|
}
|
||
|
|
||
|
Slice LargestUserKey() const {
|
||
|
if (!outputs_.empty() && outputs_.back().finished) {
|
||
|
return outputs_.back().meta.largest.user_key();
|
||
|
} else {
|
||
|
return Slice{nullptr, 0};
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// In case the last output file is empty, which doesn't need to keep.
|
||
|
void RemoveLastEmptyOutput() {
|
||
|
if (!outputs_.empty() && !outputs_.back().meta.fd.file_size) {
|
||
|
// An error occurred, so ignore the last output.
|
||
|
outputs_.pop_back();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Remove the last output, for example the last output doesn't have data (no
|
||
|
// entry and no range-dels), but file_size might not be 0, as it has SST
|
||
|
// metadata.
|
||
|
void RemoveLastOutput() {
|
||
|
assert(!outputs_.empty());
|
||
|
outputs_.pop_back();
|
||
|
}
|
||
|
|
||
|
bool HasBuilder() const { return builder_ != nullptr; }
|
||
|
|
||
|
FileMetaData* GetMetaData() { return ¤t_output().meta; }
|
||
|
|
||
|
bool HasOutput() const { return !outputs_.empty(); }
|
||
|
|
||
|
uint64_t NumEntries() const { return builder_->NumEntries(); }
|
||
|
|
||
|
void ResetBuilder() {
|
||
|
builder_.reset();
|
||
|
current_output_file_size_ = 0;
|
||
|
}
|
||
|
|
||
|
// Add range-dels from the aggregator to the current output file
|
||
|
Status AddRangeDels(const Slice* comp_start, const Slice* comp_end,
|
||
|
CompactionIterationStats& range_del_out_stats,
|
||
|
bool bottommost_level, const InternalKeyComparator& icmp,
|
||
|
SequenceNumber earliest_snapshot,
|
||
|
const Slice& next_table_min_key);
|
||
|
|
||
|
// Is the current file is already pending for close
|
||
|
bool IsPendingClose() const { return pending_close_; }
|
||
|
|
||
|
// Current file should close before adding a new key
|
||
|
void SetPendingClose() { pending_close_ = true; }
|
||
|
|
||
|
// if the outputs have range delete, range delete is also data
|
||
|
bool HasRangeDel() const {
|
||
|
return range_del_agg_ && !range_del_agg_->IsEmpty();
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
friend class SubcompactionState;
|
||
|
|
||
|
void Cleanup() {
|
||
|
if (builder_ != nullptr) {
|
||
|
// May happen if we get a shutdown call in the middle of compaction
|
||
|
builder_->Abandon();
|
||
|
builder_.reset();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
uint64_t GetCurrentOutputFileSize() const {
|
||
|
return current_output_file_size_;
|
||
|
}
|
||
|
|
||
|
// Add curent key from compaction_iterator to the output file. If needed
|
||
|
// close and open new compaction output with the functions provided.
|
||
|
Status AddToOutput(const CompactionIterator& c_iter,
|
||
|
const CompactionFileOpenFunc& open_file_func,
|
||
|
const CompactionFileCloseFunc& close_file_func);
|
||
|
|
||
|
// Close the current output. `open_file_func` is needed for creating new file
|
||
|
// for range-dels only output file.
|
||
|
Status CloseOutput(const Status& curr_status,
|
||
|
const CompactionFileOpenFunc& open_file_func,
|
||
|
const CompactionFileCloseFunc& close_file_func) {
|
||
|
Status status = curr_status;
|
||
|
// handle subcompaction containing only range deletions
|
||
|
if (status.ok() && !HasBuilder() && !HasOutput() && HasRangeDel()) {
|
||
|
status = open_file_func(*this);
|
||
|
}
|
||
|
if (HasBuilder()) {
|
||
|
const Slice empty_key{};
|
||
|
Status s = close_file_func(*this, status, empty_key);
|
||
|
if (!s.ok() && status.ok()) {
|
||
|
status = s;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return status;
|
||
|
}
|
||
|
|
||
|
// This subcompaction's output could be empty if compaction was aborted before
|
||
|
// this subcompaction had a chance to generate any output files. When
|
||
|
// subcompactions are executed sequentially this is more likely and will be
|
||
|
// particularly likely for the later subcompactions to be empty. Once they are
|
||
|
// run in parallel however it should be much rarer.
|
||
|
// It's caller's responsibility to make sure it's not empty.
|
||
|
Output& current_output() {
|
||
|
assert(!outputs_.empty());
|
||
|
return outputs_.back();
|
||
|
}
|
||
|
|
||
|
// Assign the range_del_agg to the target output level. There's only one
|
||
|
// range-del-aggregator per compaction outputs, for
|
||
|
// output_to_penultimate_level compaction it is only assigned to the
|
||
|
// penultimate level.
|
||
|
void AssignRangeDelAggregator(
|
||
|
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
|
||
|
assert(range_del_agg_ == nullptr);
|
||
|
range_del_agg_ = std::move(range_del_agg);
|
||
|
}
|
||
|
|
||
|
const Compaction* compaction_;
|
||
|
|
||
|
// The current file is pending close, which needs to run `close_file_func()`
|
||
|
// first to add a new key.
|
||
|
bool pending_close_ = false;
|
||
|
|
||
|
// current output builder and writer
|
||
|
std::unique_ptr<TableBuilder> builder_;
|
||
|
std::unique_ptr<WritableFileWriter> file_writer_;
|
||
|
uint64_t current_output_file_size_ = 0;
|
||
|
|
||
|
// all the compaction outputs so far
|
||
|
std::vector<Output> outputs_;
|
||
|
|
||
|
// BlobDB info
|
||
|
std::vector<BlobFileAddition> blob_file_additions_;
|
||
|
std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;
|
||
|
|
||
|
// Basic compaction output stats for this level's outputs
|
||
|
InternalStats::CompactionOutputsStats stats_;
|
||
|
|
||
|
// indicate if this CompactionOutputs obj for penultimate_level, should always
|
||
|
// be false if per_key_placement feature is not enabled.
|
||
|
const bool is_penultimate_level_;
|
||
|
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_ = nullptr;
|
||
|
|
||
|
// partitioner information
|
||
|
std::string last_key_for_partitioner_;
|
||
|
std::unique_ptr<SstPartitioner> partitioner_;
|
||
|
};
|
||
|
|
||
|
// helper struct to concatenate the last level and penultimate level outputs
|
||
|
// which could be replaced by std::ranges::join_view() in c++20
|
||
|
struct OutputIterator {
|
||
|
public:
|
||
|
explicit OutputIterator(const std::vector<CompactionOutputs::Output>& a,
|
||
|
const std::vector<CompactionOutputs::Output>& b)
|
||
|
: a_(a), b_(b) {
|
||
|
within_a = !a_.empty();
|
||
|
idx_ = 0;
|
||
|
}
|
||
|
|
||
|
OutputIterator begin() { return *this; }
|
||
|
|
||
|
OutputIterator end() { return *this; }
|
||
|
|
||
|
size_t size() { return a_.size() + b_.size(); }
|
||
|
|
||
|
const CompactionOutputs::Output& operator*() const {
|
||
|
return within_a ? a_[idx_] : b_[idx_];
|
||
|
}
|
||
|
|
||
|
OutputIterator& operator++() {
|
||
|
idx_++;
|
||
|
if (within_a && idx_ >= a_.size()) {
|
||
|
within_a = false;
|
||
|
idx_ = 0;
|
||
|
}
|
||
|
assert(within_a || idx_ <= b_.size());
|
||
|
return *this;
|
||
|
}
|
||
|
|
||
|
bool operator!=(const OutputIterator& /*rhs*/) const {
|
||
|
return within_a || idx_ < b_.size();
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
const std::vector<CompactionOutputs::Output>& a_;
|
||
|
const std::vector<CompactionOutputs::Output>& b_;
|
||
|
bool within_a;
|
||
|
size_t idx_;
|
||
|
};
|
||
|
|
||
|
} // namespace ROCKSDB_NAMESPACE
|