fork of https://github.com/rust-rocksdb/rust-rocksdb for nextgraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1446 lines
57 KiB
1446 lines
57 KiB
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/compaction/compaction_iterator.h"
|
|
|
|
#include <iterator>
|
|
#include <limits>
|
|
|
|
#include "db/blob/blob_fetcher.h"
|
|
#include "db/blob/blob_file_builder.h"
|
|
#include "db/blob/blob_index.h"
|
|
#include "db/blob/prefetch_buffer_collection.h"
|
|
#include "db/snapshot_checker.h"
|
|
#include "db/wide/wide_column_serialization.h"
|
|
#include "logging/logging.h"
|
|
#include "port/likely.h"
|
|
#include "rocksdb/listener.h"
|
|
#include "table/internal_iterator.h"
|
|
#include "test_util/sync_point.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
// Convenience constructor for callers that hold a raw `const Compaction*`.
// The pointer is wrapped in a heap-allocated RealCompaction proxy (or left as
// a null proxy when `compaction` is null) and every argument is forwarded
// unchanged to the CompactionProxy-taking constructor.
CompactionIterator::CompactionIterator(
    InternalIterator* input,
    const Comparator* cmp,
    MergeHelper* merge_helper,
    SequenceNumber last_sequence,
    std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    SequenceNumber job_snapshot,
    const SnapshotChecker* snapshot_checker,
    Env* env,
    bool report_detailed_time,
    bool expect_valid_internal_key,
    CompactionRangeDelAggregator* range_del_agg,
    BlobFileBuilder* blob_file_builder,
    bool allow_data_in_errors,
    bool enforce_single_del_contracts,
    const std::atomic<bool>& manual_compaction_canceled,
    bool must_count_input_entries,
    const Compaction* compaction,
    const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const std::shared_ptr<Logger> info_log,
    const std::string* full_history_ts_low,
    const SequenceNumber preserve_time_min_seqno,
    const SequenceNumber preclude_last_level_min_seqno)
    : CompactionIterator(
          input, cmp, merge_helper, last_sequence, snapshots,
          earliest_write_conflict_snapshot, job_snapshot, snapshot_checker,
          env, report_detailed_time, expect_valid_internal_key, range_del_agg,
          blob_file_builder, allow_data_in_errors,
          enforce_single_del_contracts, manual_compaction_canceled,
          // Ownership of the proxy passes to the delegated-to constructor.
          std::unique_ptr<CompactionProxy>(
              compaction != nullptr ? new RealCompaction(compaction)
                                    : nullptr),
          must_count_input_entries, compaction_filter, shutting_down,
          info_log, full_history_ts_low, preserve_time_min_seqno,
          preclude_last_level_min_seqno) {}
|
|
|
|
// Main constructor: stores the collaborators, derives the cached flags used
// during iteration, and takes ownership of the compaction proxy.
//
// `last_sequence` is accepted for interface compatibility but is unused.
// `snapshots` must be non-null and, as the debug-only checks below require,
// strictly ascending. `preserve_time_min_seqno` must not exceed
// `preclude_last_level_min_seqno` (asserted).
CompactionIterator::CompactionIterator(
    InternalIterator* input,
    const Comparator* cmp,
    MergeHelper* merge_helper,
    SequenceNumber /*last_sequence*/,
    std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    SequenceNumber job_snapshot,
    const SnapshotChecker* snapshot_checker,
    Env* env,
    bool report_detailed_time,
    bool expect_valid_internal_key,
    CompactionRangeDelAggregator* range_del_agg,
    BlobFileBuilder* blob_file_builder,
    bool allow_data_in_errors,
    bool enforce_single_del_contracts,
    const std::atomic<bool>& manual_compaction_canceled,
    std::unique_ptr<CompactionProxy> compaction,
    bool must_count_input_entries,
    const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const std::shared_ptr<Logger> info_log,
    const std::string* full_history_ts_low,
    const SequenceNumber preserve_time_min_seqno,
    const SequenceNumber preclude_last_level_min_seqno)
    : input_(input, cmp, must_count_input_entries),
      cmp_(cmp),
      merge_helper_(merge_helper),
      snapshots_(snapshots),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      job_snapshot_(job_snapshot),
      snapshot_checker_(snapshot_checker),
      env_(env),
      clock_(env_->GetSystemClock().get()),
      report_detailed_time_(report_detailed_time),
      expect_valid_internal_key_(expect_valid_internal_key),
      range_del_agg_(range_del_agg),
      blob_file_builder_(blob_file_builder),
      compaction_(std::move(compaction)),
      compaction_filter_(compaction_filter),
      shutting_down_(shutting_down),
      manual_compaction_canceled_(manual_compaction_canceled),
      // Without a compaction proxy the iterator is never treated as running
      // at the bottommost level.
      bottommost_level_(compaction_ && compaction_->bottommost_level() &&
                        !compaction_->allow_ingest_behind()),
      // A null snapshots_ is tolerated here only so that the assert in the
      // constructor body is what reports the contract violation.
      visible_at_tip_(snapshots_ != nullptr && snapshots_->empty()),
      earliest_snapshot_((snapshots_ != nullptr && !snapshots_->empty())
                             ? snapshots_->at(0)
                             : kMaxSequenceNumber),
      info_log_(info_log),
      allow_data_in_errors_(allow_data_in_errors),
      enforce_single_del_contracts_(enforce_single_del_contracts),
      timestamp_size_(cmp_ != nullptr ? cmp_->timestamp_size() : 0),
      full_history_ts_low_(full_history_ts_low),
      current_user_key_sequence_(0),
      current_user_key_snapshot_(0),
      merge_out_iter_(merge_helper_),
      blob_garbage_collection_cutoff_file_number_(
          ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())),
      blob_fetcher_(CreateBlobFetcherIfNeeded(compaction_.get())),
      prefetch_buffers_(
          CreatePrefetchBufferCollectionIfNeeded(compaction_.get())),
      current_key_committed_(false),
      cmp_with_history_ts_low_(0),
      level_(compaction_ ? compaction_->level() : 0),
      preserve_time_min_seqno_(preserve_time_min_seqno),
      preclude_last_level_min_seqno_(preclude_last_level_min_seqno) {
  assert(snapshots_ != nullptr);
  assert(preserve_time_min_seqno_ <= preclude_last_level_min_seqno_);

  // One cursor per level, all starting at zero; only meaningful when a
  // compaction is attached.
  if (compaction_ != nullptr) {
    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
  }

#ifndef NDEBUG
  // findEarliestVisibleSnapshot relies on the snapshot list being strictly
  // increasing, so verify each adjacent pair.
  for (size_t idx = 0; idx + 1 < snapshots_->size(); ++idx) {
    assert(snapshots_->at(idx) < snapshots_->at(idx + 1));
  }
  // When timestamps are enabled and a history cutoff is supplied, the cutoff
  // must be exactly one timestamp wide.
  assert(timestamp_size_ == 0 || !full_history_ts_low_ ||
         timestamp_size_ == full_history_ts_low_->size());
#endif

  input_.SetPinnedItersMgr(&pinned_iters_mgr_);

  // The default-constructed `merge_until_status_` is replaced as soon as
  // `MergeUntil()` runs, so there is nothing to check in its initial value.
  merge_until_status_.PermitUncheckedError();

  TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
}
|
|
|
|
// Detaches the pinned-iterators manager from the input wrapper before the
// members are destroyed.
CompactionIterator::~CompactionIterator() {
  // The input iterator lives longer than pinned_iters_mgr_, so it must not be
  // left holding a pointer to it.
  input_.SetPinnedItersMgr(nullptr);
}
|
|
|
|
// Zeroes the record-drop counters kept in iter_stats_. Other statistics in
// iter_stats_ are left untouched.
void CompactionIterator::ResetRecordCounts() {
  // Point-record drop counters.
  iter_stats_.num_record_drop_user = 0;
  iter_stats_.num_record_drop_hidden = 0;
  iter_stats_.num_record_drop_obsolete = 0;
  iter_stats_.num_record_drop_range_del = 0;

  // Obsolete-drop counters for range deletions and optimized deletions.
  iter_stats_.num_range_del_drop_obsolete = 0;
  iter_stats_.num_optimized_del_drop_obsolete = 0;
}
|
|
|
|
// Starts iteration: pulls the first candidate record from the input via
// NextFromInput() and then runs PrepareOutput() on the result.
void CompactionIterator::SeekToFirst() {
  NextFromInput();
  PrepareOutput();
}
|
|
|
|
void CompactionIterator::Next() {
|
|
// If there is a merge output, return it before continuing to process the
|
|
// input.
|
|
if (merge_out_iter_.Valid()) {
|
|
merge_out_iter_.Next();
|
|
|
|
// Check if we returned all records of the merge output.
|
|
if (merge_out_iter_.Valid()) {
|
|
key_ = merge_out_iter_.key();
|
|
value_ = merge_out_iter_.value();
|
|
Status s = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
|
|
// MergeUntil stops when it encounters a corrupt key and does not
|
|
// include them in the result, so we expect the keys here to be valid.
|
|
if (!s.ok()) {
|
|
ROCKS_LOG_FATAL(
|
|
info_log_, "Invalid ikey %s in compaction. %s",
|
|
allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden",
|
|
s.getState());
|
|
assert(false);
|
|
}
|
|
|
|
// Keep current_key_ in sync.
|
|
if (0 == timestamp_size_) {
|
|
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
|
|
} else {
|
|
Slice ts = ikey_.GetTimestamp(timestamp_size_);
|
|
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type, &ts);
|
|
}
|
|
key_ = current_key_.GetInternalKey();
|
|
ikey_.user_key = current_key_.GetUserKey();
|
|
validity_info_.SetValid(ValidContext::kMerge1);
|
|
} else {
|
|
if (merge_until_status_.IsMergeInProgress()) {
|
|
// `Status::MergeInProgress()` tells us that the previous `MergeUntil()`
|
|
// produced only merge operands. Those merge operands were accessed and
|
|
// written out using `merge_out_iter_`. Since `merge_out_iter_` is
|
|
// exhausted at this point, all merge operands have been written out.
|
|
//
|
|
// Still, there may be a base value (PUT, DELETE, SINGLEDEL, etc.) that
|
|
// needs to be written out. Normally, `CompactionIterator` would skip it
|
|
// on the basis that it has already output something in the same
|
|
// snapshot stripe. To prevent this, we reset `has_current_user_key_` to
|
|
// trick the future iteration from finding out the snapshot stripe is
|
|
// unchanged.
|
|
has_current_user_key_ = false;
|
|
}
|
|
// We consumed all pinned merge operands, release pinned iterators
|
|
pinned_iters_mgr_.ReleasePinnedData();
|
|
// MergeHelper moves the iterator to the first record after the merged
|
|
// records, so even though we reached the end of the merge output, we do
|
|
// not want to advance the iterator.
|
|
NextFromInput();
|
|
}
|
|
} else {
|
|
// Only advance the input iterator if there is no merge output and the
|
|
// iterator is not already at the next record.
|
|
if (!at_next_) {
|
|
AdvanceInputIter();
|
|
}
|
|
NextFromInput();
|
|
}
|
|
|
|
if (Valid()) {
|
|
// Record that we've outputted a record for the current key.
|
|
has_outputted_key_ = true;
|
|
}
|
|
|
|
PrepareOutput();
|
|
}
|
|
|
|
bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
|
|
Slice* skip_until) {
|
|
if (!compaction_filter_) {
|
|
return true;
|
|
}
|
|
|
|
if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
|
|
ikey_.type != kTypeWideColumnEntity) {
|
|
return true;
|
|
}
|
|
|
|
CompactionFilter::Decision decision =
|
|
CompactionFilter::Decision::kUndetermined;
|
|
CompactionFilter::ValueType value_type =
|
|
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
|
|
: ikey_.type == kTypeBlobIndex
|
|
? CompactionFilter::ValueType::kBlobIndex
|
|
: CompactionFilter::ValueType::kWideColumnEntity;
|
|
|
|
// Hack: pass internal key to BlobIndexCompactionFilter since it needs
|
|
// to get sequence number.
|
|
assert(compaction_filter_);
|
|
const Slice& filter_key =
|
|
(ikey_.type != kTypeBlobIndex ||
|
|
!compaction_filter_->IsStackedBlobDbInternalCompactionFilter())
|
|
? ikey_.user_key
|
|
: key_;
|
|
|
|
compaction_filter_value_.clear();
|
|
compaction_filter_skip_until_.Clear();
|
|
|
|
std::vector<std::pair<std::string, std::string>> new_columns;
|
|
|
|
{
|
|
StopWatchNano timer(clock_, report_detailed_time_);
|
|
|
|
if (ikey_.type == kTypeBlobIndex) {
|
|
decision = compaction_filter_->FilterBlobByKey(
|
|
level_, filter_key, &compaction_filter_value_,
|
|
compaction_filter_skip_until_.rep());
|
|
if (decision == CompactionFilter::Decision::kUndetermined &&
|
|
!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
|
|
if (!compaction_) {
|
|
status_ =
|
|
Status::Corruption("Unexpected blob index outside of compaction");
|
|
validity_info_.Invalidate();
|
|
return false;
|
|
}
|
|
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
|
|
&value_);
|
|
|
|
// For integrated BlobDB impl, CompactionIterator reads blob value.
|
|
// For Stacked BlobDB impl, the corresponding CompactionFilter's
|
|
// FilterV2 method should read the blob value.
|
|
BlobIndex blob_index;
|
|
Status s = blob_index.DecodeFrom(value_);
|
|
if (!s.ok()) {
|
|
status_ = s;
|
|
validity_info_.Invalidate();
|
|
return false;
|
|
}
|
|
|
|
FilePrefetchBuffer* prefetch_buffer =
|
|
prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
|
|
blob_index.file_number())
|
|
: nullptr;
|
|
|
|
uint64_t bytes_read = 0;
|
|
|
|
assert(blob_fetcher_);
|
|
|
|
s = blob_fetcher_->FetchBlob(ikey_.user_key, blob_index,
|
|
prefetch_buffer, &blob_value_,
|
|
&bytes_read);
|
|
if (!s.ok()) {
|
|
status_ = s;
|
|
validity_info_.Invalidate();
|
|
return false;
|
|
}
|
|
|
|
++iter_stats_.num_blobs_read;
|
|
iter_stats_.total_blob_bytes_read += bytes_read;
|
|
|
|
value_type = CompactionFilter::ValueType::kValue;
|
|
}
|
|
}
|
|
|
|
if (decision == CompactionFilter::Decision::kUndetermined) {
|
|
const Slice* existing_val = nullptr;
|
|
const WideColumns* existing_col = nullptr;
|
|
|
|
WideColumns existing_columns;
|
|
|
|
if (ikey_.type != kTypeWideColumnEntity) {
|
|
if (!blob_value_.empty()) {
|
|
existing_val = &blob_value_;
|
|
} else {
|
|
existing_val = &value_;
|
|
}
|
|
} else {
|
|
Slice value_copy = value_;
|
|
const Status s =
|
|
WideColumnSerialization::Deserialize(value_copy, existing_columns);
|
|
|
|
if (!s.ok()) {
|
|
status_ = s;
|
|
validity_info_.Invalidate();
|
|
return false;
|
|
}
|
|
|
|
existing_col = &existing_columns;
|
|
}
|
|
|
|
decision = compaction_filter_->FilterV3(
|
|
level_, filter_key, value_type, existing_val, existing_col,
|
|
&compaction_filter_value_, &new_columns,
|
|
compaction_filter_skip_until_.rep());
|
|
}
|
|
|
|
iter_stats_.total_filter_time +=
|
|
env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
|
|
}
|
|
|
|
if (decision == CompactionFilter::Decision::kUndetermined) {
|
|
// Should not reach here, since FilterV2/FilterV3 should never return
|
|
// kUndetermined.
|
|
status_ = Status::NotSupported(
|
|
"FilterV2/FilterV3 should never return kUndetermined");
|
|
validity_info_.Invalidate();
|
|
return false;
|
|
}
|
|
|
|
if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil &&
|
|
cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
|
|
0) {
|
|
// Can't skip to a key smaller than the current one.
|
|
// Keep the key as per FilterV2/FilterV3 documentation.
|
|
decision = CompactionFilter::Decision::kKeep;
|
|
}
|
|
|
|
if (decision == CompactionFilter::Decision::kRemove) {
|
|
// convert the current key to a delete; key_ is pointing into
|
|
// current_key_ at this point, so updating current_key_ updates key()
|
|
ikey_.type = kTypeDeletion;
|
|
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
|
|
// no value associated with delete
|
|
value_.clear();
|
|
iter_stats_.num_record_drop_user++;
|
|
} else if (decision == CompactionFilter::Decision::kPurge) {
|
|
// convert the current key to a single delete; key_ is pointing into
|
|
// current_key_ at this point, so updating current_key_ updates key()
|
|
ikey_.type = kTypeSingleDeletion;
|
|
current_key_.UpdateInternalKey(ikey_.sequence, kTypeSingleDeletion);
|
|
// no value associated with single delete
|
|
value_.clear();
|
|
iter_stats_.num_record_drop_user++;
|
|
} else if (decision == CompactionFilter::Decision::kChangeValue) {
|
|
if (ikey_.type != kTypeValue) {
|
|
ikey_.type = kTypeValue;
|
|
current_key_.UpdateInternalKey(ikey_.sequence, kTypeValue);
|
|
}
|
|
|
|
value_ = compaction_filter_value_;
|
|
} else if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil) {
|
|
*need_skip = true;
|
|
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
|
|
kValueTypeForSeek);
|
|
*skip_until = compaction_filter_skip_until_.Encode();
|
|
} else if (decision == CompactionFilter::Decision::kChangeBlobIndex) {
|
|
// Only the StackableDB-based BlobDB impl's compaction filter should return
|
|
// kChangeBlobIndex. Decision about rewriting blob and changing blob index
|
|
// in the integrated BlobDB impl is made in subsequent call to
|
|
// PrepareOutput() and its callees.
|
|
if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
|
|
status_ = Status::NotSupported(
|
|
"Only stacked BlobDB's internal compaction filter can return "
|
|
"kChangeBlobIndex.");
|
|
validity_info_.Invalidate();
|
|
return false;
|
|
}
|
|
|
|
if (ikey_.type != kTypeBlobIndex) {
|
|
ikey_.type = kTypeBlobIndex;
|
|
current_key_.UpdateInternalKey(ikey_.sequence, kTypeBlobIndex);
|
|
}
|
|
|
|
value_ = compaction_filter_value_;
|
|
} else if (decision == CompactionFilter::Decision::kIOError) {
|
|
if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
|
|
status_ = Status::NotSupported(
|
|
"CompactionFilter for integrated BlobDB should not return kIOError");
|
|
validity_info_.Invalidate();
|
|
return false;
|
|
}
|
|
|
|
status_ = Status::IOError("Failed to access blob during compaction filter");
|
|
validity_info_.Invalidate();
|
|
return false;
|
|
} else if (decision == CompactionFilter::Decision::kChangeWideColumnEntity) {
|
|
WideColumns sorted_columns;
|
|
|
|
sorted_columns.reserve(new_columns.size());
|
|
for (const auto& column : new_columns) {
|
|
sorted_columns.emplace_back(column.first, column.second);
|
|
}
|
|
|
|
std::sort(sorted_columns.begin(), sorted_columns.end(),
|
|
[](const WideColumn& lhs, const WideColumn& rhs) {
|
|
return lhs.name().compare(rhs.name()) < 0;
|
|
});
|
|
|
|
{
|
|
const Status s = WideColumnSerialization::Serialize(
|
|
sorted_columns, compaction_filter_value_);
|
|
if (!s.ok()) {
|
|
status_ = s;
|
|
validity_info_.Invalidate();
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if (ikey_.type != kTypeWideColumnEntity) {
|
|
ikey_.type = kTypeWideColumnEntity;
|
|
current_key_.UpdateInternalKey(ikey_.sequence, kTypeWideColumnEntity);
|
|
}
|
|
|
|
value_ = compaction_filter_value_;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void CompactionIterator::NextFromInput() {
|
|
at_next_ = false;
|
|
validity_info_.Invalidate();
|
|
|
|
while (!Valid() && input_.Valid() && !IsPausingManualCompaction() &&
|
|
!IsShuttingDown()) {
|
|
key_ = input_.key();
|
|
value_ = input_.value();
|
|
blob_value_.Reset();
|
|
iter_stats_.num_input_records++;
|
|
is_range_del_ = input_.IsDeleteRangeSentinelKey();
|
|
|
|
Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
|
|
if (!pik_status.ok()) {
|
|
iter_stats_.num_input_corrupt_records++;
|
|
|
|
// If `expect_valid_internal_key_` is false, return the corrupted key
|
|
// and let the caller decide what to do with it.
|
|
if (expect_valid_internal_key_) {
|
|
status_ = pik_status;
|
|
return;
|
|
}
|
|
key_ = current_key_.SetInternalKey(key_);
|
|
has_current_user_key_ = false;
|
|
current_user_key_sequence_ = kMaxSequenceNumber;
|
|
current_user_key_snapshot_ = 0;
|
|
validity_info_.SetValid(ValidContext::kParseKeyError);
|
|
break;
|
|
}
|
|
TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
|
|
if (is_range_del_) {
|
|
validity_info_.SetValid(kRangeDeletion);
|
|
break;
|
|
}
|
|
// Update input statistics
|
|
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
|
|
ikey_.type == kTypeDeletionWithTimestamp) {
|
|
iter_stats_.num_input_deletion_records++;
|
|
}
|
|
iter_stats_.total_input_raw_key_bytes += key_.size();
|
|
iter_stats_.total_input_raw_value_bytes += value_.size();
|
|
|
|
// If need_skip is true, we should seek the input iterator
|
|
// to internal key skip_until and continue from there.
|
|
bool need_skip = false;
|
|
// Points either into compaction_filter_skip_until_ or into
|
|
// merge_helper_->compaction_filter_skip_until_.
|
|
Slice skip_until;
|
|
|
|
bool user_key_equal_without_ts = false;
|
|
int cmp_ts = 0;
|
|
if (has_current_user_key_) {
|
|
user_key_equal_without_ts =
|
|
cmp_->EqualWithoutTimestamp(ikey_.user_key, current_user_key_);
|
|
// if timestamp_size_ > 0, then curr_ts_ has been initialized by a
|
|
// previous key.
|
|
cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
|
|
ExtractTimestampFromUserKey(
|
|
ikey_.user_key, timestamp_size_),
|
|
curr_ts_)
|
|
: 0;
|
|
}
|
|
|
|
// Check whether the user key changed. After this if statement current_key_
|
|
// is a copy of the current input key (maybe converted to a delete by the
|
|
// compaction filter). ikey_.user_key is pointing to the copy.
|
|
if (!has_current_user_key_ || !user_key_equal_without_ts || cmp_ts != 0) {
|
|
// First occurrence of this user key
|
|
// Copy key for output
|
|
key_ = current_key_.SetInternalKey(key_, &ikey_);
|
|
|
|
int prev_cmp_with_ts_low =
|
|
!full_history_ts_low_ ? 0
|
|
: curr_ts_.empty()
|
|
? 0
|
|
: cmp_->CompareTimestamp(curr_ts_, *full_history_ts_low_);
|
|
|
|
// If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for the use
|
|
// in next iteration to compare with the timestamp of next key.
|
|
UpdateTimestampAndCompareWithFullHistoryLow();
|
|
|
|
// If
|
|
// (1) !has_current_user_key_, OR
|
|
// (2) timestamp is disabled, OR
|
|
// (3) all history will be preserved, OR
|
|
// (4) user key (excluding timestamp) is different from previous key, OR
|
|
// (5) timestamp is NO older than *full_history_ts_low_, OR
|
|
// (6) timestamp is the largest one older than full_history_ts_low_,
|
|
// then current_user_key_ must be treated as a different user key.
|
|
// This means, if a user key (excluding ts) is the same as the previous
|
|
// user key, and its ts is older than *full_history_ts_low_, then we
|
|
// consider this key for GC, e.g. it may be dropped if certain conditions
|
|
// match.
|
|
if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
|
|
!user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0 ||
|
|
prev_cmp_with_ts_low >= 0) {
|
|
// Initialize for future comparison for rule (A) and etc.
|
|
current_user_key_sequence_ = kMaxSequenceNumber;
|
|
current_user_key_snapshot_ = 0;
|
|
has_current_user_key_ = true;
|
|
}
|
|
current_user_key_ = ikey_.user_key;
|
|
|
|
has_outputted_key_ = false;
|
|
|
|
last_key_seq_zeroed_ = false;
|
|
|
|
current_key_committed_ = KeyCommitted(ikey_.sequence);
|
|
|
|
// Apply the compaction filter to the first committed version of the user
|
|
// key.
|
|
if (current_key_committed_ &&
|
|
!InvokeFilterIfNeeded(&need_skip, &skip_until)) {
|
|
break;
|
|
}
|
|
} else {
|
|
// Update the current key to reflect the new sequence number/type without
|
|
// copying the user key.
|
|
// TODO(rven): Compaction filter does not process keys in this path
|
|
// Need to have the compaction filter process multiple versions
|
|
// if we have versions on both sides of a snapshot
|
|
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
|
|
key_ = current_key_.GetInternalKey();
|
|
ikey_.user_key = current_key_.GetUserKey();
|
|
|
|
// Note that newer version of a key is ordered before older versions. If a
|
|
// newer version of a key is committed, so as the older version. No need
|
|
// to query snapshot_checker_ in that case.
|
|
if (UNLIKELY(!current_key_committed_)) {
|
|
assert(snapshot_checker_ != nullptr);
|
|
current_key_committed_ = KeyCommitted(ikey_.sequence);
|
|
// Apply the compaction filter to the first committed version of the
|
|
// user key.
|
|
if (current_key_committed_ &&
|
|
!InvokeFilterIfNeeded(&need_skip, &skip_until)) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (UNLIKELY(!current_key_committed_)) {
|
|
assert(snapshot_checker_ != nullptr);
|
|
validity_info_.SetValid(ValidContext::kCurrentKeyUncommitted);
|
|
break;
|
|
}
|
|
|
|
// If there are no snapshots, then this kv affect visibility at tip.
|
|
// Otherwise, search though all existing snapshots to find the earliest
|
|
// snapshot that is affected by this kv.
|
|
SequenceNumber last_sequence = current_user_key_sequence_;
|
|
current_user_key_sequence_ = ikey_.sequence;
|
|
SequenceNumber last_snapshot = current_user_key_snapshot_;
|
|
SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
|
|
current_user_key_snapshot_ =
|
|
visible_at_tip_
|
|
? earliest_snapshot_
|
|
: findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
|
|
|
|
if (need_skip) {
|
|
// This case is handled below.
|
|
} else if (clear_and_output_next_key_) {
|
|
// In the previous iteration we encountered a single delete that we could
|
|
// not compact out. We will keep this Put, but can drop it's data.
|
|
// (See Optimization 3, below.)
|
|
if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
|
|
ikey_.type != kTypeWideColumnEntity) {
|
|
ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output",
|
|
ikey_.DebugString(allow_data_in_errors_, true).c_str());
|
|
assert(false);
|
|
}
|
|
if (current_user_key_snapshot_ < last_snapshot) {
|
|
ROCKS_LOG_FATAL(info_log_,
|
|
"key %s, current_user_key_snapshot_ (%" PRIu64
|
|
") < last_snapshot (%" PRIu64 ")",
|
|
ikey_.DebugString(allow_data_in_errors_, true).c_str(),
|
|
current_user_key_snapshot_, last_snapshot);
|
|
assert(false);
|
|
}
|
|
|
|
if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity) {
|
|
ikey_.type = kTypeValue;
|
|
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
|
|
}
|
|
|
|
value_.clear();
|
|
validity_info_.SetValid(ValidContext::kKeepSDAndClearPut);
|
|
clear_and_output_next_key_ = false;
|
|
} else if (ikey_.type == kTypeSingleDeletion) {
|
|
// We can compact out a SingleDelete if:
|
|
// 1) We encounter the corresponding PUT -OR- we know that this key
|
|
// doesn't appear past this output level
|
|
// =AND=
|
|
// 2) We've already returned a record in this snapshot -OR-
|
|
// there are no earlier earliest_write_conflict_snapshot.
|
|
//
|
|
// A note about 2) above:
|
|
// we try to determine whether there is any earlier write conflict
|
|
// checking snapshot by calling DefinitelyInSnapshot() with seq and
|
|
// earliest_write_conflict_snapshot as arguments. For write-prepared
|
|
// and write-unprepared transactions, if earliest_write_conflict_snapshot
|
|
// is evicted from WritePreparedTxnDB::commit_cache, then
|
|
// DefinitelyInSnapshot(seq, earliest_write_conflict_snapshot) returns
|
|
// false, even if the seq is actually visible within
|
|
// earliest_write_conflict_snapshot. Consequently, CompactionIterator
|
|
// may try to zero out its sequence number, thus hitting assertion error
|
|
// in debug mode or cause incorrect DBIter return result.
|
|
// We observe that earliest_write_conflict_snapshot >= earliest_snapshot,
|
|
// and the seq zeroing logic depends on
|
|
// DefinitelyInSnapshot(seq, earliest_snapshot). Therefore, if we cannot
|
|
// determine whether seq is **definitely** in
|
|
// earliest_write_conflict_snapshot, then we can additionally check if
|
|
// seq is definitely in earliest_snapshot. If the latter holds, then the
|
|
// former holds too.
|
|
//
|
|
// Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
|
|
// allow Transactions to do write-conflict checking (if we compacted away
|
|
// all keys, then we wouldn't know that a write happened in this
|
|
// snapshot). If there is no earlier snapshot, then we know that there
|
|
// are no active transactions that need to know about any writes.
|
|
//
|
|
// Optimization 3:
|
|
// If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
|
|
// true, then we must output a SingleDelete. In this case, we will decide
|
|
// to also output the PUT. While we are compacting less by outputting the
|
|
// PUT now, hopefully this will lead to better compaction in the future
|
|
// when Rule 2 is later true (Ie, We are hoping we can later compact out
|
|
// both the SingleDelete and the Put, while we couldn't if we only
|
|
// outputted the SingleDelete now).
|
|
// In this case, we can save space by removing the PUT's value as it will
|
|
// never be read.
|
|
//
|
|
// Deletes and Merges are not supported on the same key that has a
|
|
// SingleDelete as it is not possible to correctly do any partial
|
|
// compaction of such a combination of operations. The result of mixing
|
|
// those operations for a given key is documented as being undefined. So
|
|
// we can choose how to handle such a combinations of operations. We will
|
|
// try to compact out as much as we can in these cases.
|
|
// We will report counts on these anomalous cases.
|
|
//
|
|
// Note: If timestamp is enabled, then record will be eligible for
|
|
// deletion, only if, along with above conditions (Rule 1 and Rule 2)
|
|
// full_history_ts_low_ is specified and timestamp for that key is less
|
|
// than *full_history_ts_low_. If it's not eligible for deletion, then we
|
|
// will output the SingleDelete. For Optimization 3 also, if
|
|
// full_history_ts_low_ is specified and timestamp for the key is less
|
|
// than *full_history_ts_low_ then only optimization will be applied.
|
|
|
|
// The easiest way to process a SingleDelete during iteration is to peek
|
|
// ahead at the next key.
|
|
const bool is_timestamp_eligible_for_gc =
|
|
(timestamp_size_ == 0 ||
|
|
(full_history_ts_low_ && cmp_with_history_ts_low_ < 0));
|
|
|
|
ParsedInternalKey next_ikey;
|
|
AdvanceInputIter();
|
|
while (input_.Valid() && input_.IsDeleteRangeSentinelKey() &&
|
|
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
|
|
.ok() &&
|
|
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
|
|
// skip range tombstone start keys with the same user key
|
|
// since they are not "real" point keys.
|
|
AdvanceInputIter();
|
|
}
|
|
|
|
// Check whether the next key exists, is not corrupt, and is the same key
|
|
// as the single delete.
|
|
if (input_.Valid() &&
|
|
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
|
|
.ok() &&
|
|
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
|
|
assert(!input_.IsDeleteRangeSentinelKey());
|
|
#ifndef NDEBUG
|
|
const Compaction* c =
|
|
compaction_ ? compaction_->real_compaction() : nullptr;
|
|
#endif
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"CompactionIterator::NextFromInput:SingleDelete:1",
|
|
const_cast<Compaction*>(c));
|
|
if (last_key_seq_zeroed_) {
|
|
++iter_stats_.num_record_drop_hidden;
|
|
++iter_stats_.num_record_drop_obsolete;
|
|
assert(bottommost_level_);
|
|
AdvanceInputIter();
|
|
} else if (prev_snapshot == 0 ||
|
|
DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
|
|
// Check whether the next key belongs to the same snapshot as the
|
|
// SingleDelete.
|
|
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
|
|
if (next_ikey.type == kTypeSingleDeletion) {
|
|
// We encountered two SingleDeletes for same key in a row. This
|
|
// could be due to unexpected user input. If write-(un)prepared
|
|
// transaction is used, this could also be due to releasing an old
|
|
// snapshot between a Put and its matching SingleDelete.
|
|
// Skip the first SingleDelete and let the next iteration decide
|
|
// how to handle the second SingleDelete.
|
|
|
|
// First SingleDelete has been skipped since we already called
|
|
// input_.Next().
|
|
++iter_stats_.num_record_drop_obsolete;
|
|
++iter_stats_.num_single_del_mismatch;
|
|
} else if (next_ikey.type == kTypeDeletion) {
|
|
std::ostringstream oss;
|
|
oss << "Found SD and type: " << static_cast<int>(next_ikey.type)
|
|
<< " on the same key, violating the contract "
|
|
"of SingleDelete. Check your application to make sure the "
|
|
"application does not mix SingleDelete and Delete for "
|
|
"the same key. If you are using "
|
|
"write-prepared/write-unprepared transactions, and use "
|
|
"SingleDelete to delete certain keys, then make sure "
|
|
"TransactionDBOptions::rollback_deletion_type_callback is "
|
|
"configured properly. Mixing SD and DEL can lead to "
|
|
"undefined behaviors";
|
|
++iter_stats_.num_record_drop_obsolete;
|
|
++iter_stats_.num_single_del_mismatch;
|
|
if (enforce_single_del_contracts_) {
|
|
ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str());
|
|
validity_info_.Invalidate();
|
|
status_ = Status::Corruption(oss.str());
|
|
return;
|
|
}
|
|
ROCKS_LOG_WARN(info_log_, "%s", oss.str().c_str());
|
|
} else if (!is_timestamp_eligible_for_gc) {
|
|
// We cannot drop the SingleDelete as timestamp is enabled, and
|
|
// timestamp of this key is greater than or equal to
|
|
// *full_history_ts_low_. We will output the SingleDelete.
|
|
validity_info_.SetValid(ValidContext::kKeepTsHistory);
|
|
} else if (has_outputted_key_ ||
|
|
DefinitelyInSnapshot(ikey_.sequence,
|
|
earliest_write_conflict_snapshot_) ||
|
|
(earliest_snapshot_ < earliest_write_conflict_snapshot_ &&
|
|
DefinitelyInSnapshot(ikey_.sequence,
|
|
earliest_snapshot_))) {
|
|
// Found a matching value, we can drop the single delete and the
|
|
// value. It is safe to drop both records since we've already
|
|
// outputted a key in this snapshot, or there is no earlier
|
|
// snapshot (Rule 2 above).
|
|
|
|
// Note: it doesn't matter whether the second key is a Put or if it
|
|
// is an unexpected Merge or Delete. We will compact it out
|
|
// either way. We will maintain counts of how many mismatches
|
|
// happened
|
|
if (next_ikey.type != kTypeValue &&
|
|
next_ikey.type != kTypeBlobIndex &&
|
|
next_ikey.type != kTypeWideColumnEntity) {
|
|
++iter_stats_.num_single_del_mismatch;
|
|
}
|
|
|
|
++iter_stats_.num_record_drop_hidden;
|
|
++iter_stats_.num_record_drop_obsolete;
|
|
// Already called input_.Next() once. Call it a second time to
|
|
// skip past the second key.
|
|
AdvanceInputIter();
|
|
} else {
|
|
// Found a matching value, but we cannot drop both keys since
|
|
// there is an earlier snapshot and we need to leave behind a record
|
|
// to know that a write happened in this snapshot (Rule 2 above).
|
|
// Clear the value and output the SingleDelete. (The value will be
|
|
// outputted on the next iteration.)
|
|
|
|
// Setting valid_ to true will output the current SingleDelete
|
|
validity_info_.SetValid(ValidContext::kKeepSDForConflictCheck);
|
|
|
|
// Set up the Put to be outputted in the next iteration.
|
|
// (Optimization 3).
|
|
clear_and_output_next_key_ = true;
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"CompactionIterator::NextFromInput:KeepSDForWW",
|
|
/*arg=*/nullptr);
|
|
}
|
|
} else {
|
|
// We hit the next snapshot without hitting a put, so the iterator
|
|
// returns the single delete.
|
|
validity_info_.SetValid(ValidContext::kKeepSDForSnapshot);
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"CompactionIterator::NextFromInput:SingleDelete:3",
|
|
const_cast<Compaction*>(c));
|
|
}
|
|
} else {
|
|
// We are at the end of the input, could not parse the next key, or hit
|
|
// a different key. The iterator returns the single delete if the key
|
|
// possibly exists beyond the current output level. We set
|
|
// has_current_user_key to false so that if the iterator is at the next
|
|
// key, we do not compare it again against the previous key at the next
|
|
// iteration. If the next key is corrupt, we return before the
|
|
// comparison, so the value of has_current_user_key does not matter.
|
|
has_current_user_key_ = false;
|
|
if (compaction_ != nullptr &&
|
|
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
|
|
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
|
|
&level_ptrs_) &&
|
|
is_timestamp_eligible_for_gc) {
|
|
// Key doesn't exist outside of this range.
|
|
// Can compact out this SingleDelete.
|
|
++iter_stats_.num_record_drop_obsolete;
|
|
++iter_stats_.num_single_del_fallthru;
|
|
if (!bottommost_level_) {
|
|
++iter_stats_.num_optimized_del_drop_obsolete;
|
|
}
|
|
} else if (last_key_seq_zeroed_) {
|
|
// Skip.
|
|
++iter_stats_.num_record_drop_hidden;
|
|
++iter_stats_.num_record_drop_obsolete;
|
|
assert(bottommost_level_);
|
|
} else {
|
|
// Output SingleDelete
|
|
validity_info_.SetValid(ValidContext::kKeepSD);
|
|
}
|
|
}
|
|
|
|
if (Valid()) {
|
|
at_next_ = true;
|
|
}
|
|
} else if (last_snapshot == current_user_key_snapshot_ ||
|
|
(last_snapshot > 0 &&
|
|
last_snapshot < current_user_key_snapshot_)) {
|
|
// If the earliest snapshot in which this key is visible
|
|
// is the same as the visibility of a previous instance of the
|
|
// same key, then this kv is not visible in any snapshot.
|
|
// Hidden by a newer entry for the same user key
|
|
//
|
|
// Note: Dropping this key will not affect TransactionDB write-conflict
|
|
// checking since there has already been a record returned for this key
|
|
// in this snapshot.
|
|
if (last_sequence < current_user_key_sequence_) {
|
|
ROCKS_LOG_FATAL(info_log_,
|
|
"key %s, last_sequence (%" PRIu64
|
|
") < current_user_key_sequence_ (%" PRIu64 ")",
|
|
ikey_.DebugString(allow_data_in_errors_, true).c_str(),
|
|
last_sequence, current_user_key_sequence_);
|
|
assert(false);
|
|
}
|
|
|
|
++iter_stats_.num_record_drop_hidden; // rule (A)
|
|
AdvanceInputIter();
|
|
} else if (compaction_ != nullptr &&
|
|
(ikey_.type == kTypeDeletion ||
|
|
(ikey_.type == kTypeDeletionWithTimestamp &&
|
|
cmp_with_history_ts_low_ < 0)) &&
|
|
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
|
|
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
|
|
&level_ptrs_)) {
|
|
// TODO(noetzli): This is the only place where we use compaction_
|
|
// (besides the constructor). We should probably get rid of this
|
|
// dependency and find a way to do similar filtering during flushes.
|
|
//
|
|
// For this user key:
|
|
// (1) there is no data in higher levels
|
|
// (2) data in lower levels will have larger sequence numbers
|
|
// (3) data in layers that are being compacted here and have
|
|
// smaller sequence numbers will be dropped in the next
|
|
// few iterations of this loop (by rule (A) above).
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
//
|
|
// Note: Dropping this Delete will not affect TransactionDB
|
|
// write-conflict checking since it is earlier than any snapshot.
|
|
//
|
|
// It seems that we can also drop deletion later than earliest snapshot
|
|
// given that:
|
|
// (1) The deletion is earlier than earliest_write_conflict_snapshot, and
|
|
// (2) No value exist earlier than the deletion.
|
|
//
|
|
// Note also that a deletion marker of type kTypeDeletionWithTimestamp
|
|
// will be treated as a different user key unless the timestamp is older
|
|
// than *full_history_ts_low_.
|
|
++iter_stats_.num_record_drop_obsolete;
|
|
if (!bottommost_level_) {
|
|
++iter_stats_.num_optimized_del_drop_obsolete;
|
|
}
|
|
AdvanceInputIter();
|
|
} else if ((ikey_.type == kTypeDeletion ||
|
|
(ikey_.type == kTypeDeletionWithTimestamp &&
|
|
cmp_with_history_ts_low_ < 0)) &&
|
|
bottommost_level_) {
|
|
// Handle the case where we have a delete key at the bottom most level
|
|
// We can skip outputting the key iff there are no subsequent puts for
|
|
// this key
|
|
assert(!compaction_ || compaction_->KeyNotExistsBeyondOutputLevel(
|
|
ikey_.user_key, &level_ptrs_));
|
|
ParsedInternalKey next_ikey;
|
|
AdvanceInputIter();
|
|
#ifndef NDEBUG
|
|
const Compaction* c =
|
|
compaction_ ? compaction_->real_compaction() : nullptr;
|
|
#endif
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"CompactionIterator::NextFromInput:BottommostDelete:1",
|
|
const_cast<Compaction*>(c));
|
|
// Skip over all versions of this key that happen to occur in the same
|
|
// snapshot range as the delete.
|
|
//
|
|
// Note that a deletion marker of type kTypeDeletionWithTimestamp will be
|
|
// considered to have a different user key unless the timestamp is older
|
|
// than *full_history_ts_low_.
|
|
//
|
|
// Range tombstone start keys are skipped as they are not "real" keys.
|
|
while (!IsPausingManualCompaction() && !IsShuttingDown() &&
|
|
input_.Valid() &&
|
|
(ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
|
|
.ok()) &&
|
|
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
|
|
(prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() ||
|
|
DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
|
|
AdvanceInputIter();
|
|
}
|
|
// If you find you still need to output a row with this key, we need to
|
|
// output the delete too
|
|
if (input_.Valid() &&
|
|
(ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
|
|
.ok()) &&
|
|
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
|
|
validity_info_.SetValid(ValidContext::kKeepDel);
|
|
at_next_ = true;
|
|
}
|
|
} else if (ikey_.type == kTypeMerge) {
|
|
if (!merge_helper_->HasOperator()) {
|
|
status_ = Status::InvalidArgument(
|
|
"merge_operator is not properly initialized.");
|
|
return;
|
|
}
|
|
|
|
pinned_iters_mgr_.StartPinning();
|
|
|
|
// We know the merge type entry is not hidden, otherwise we would
|
|
// have hit (A)
|
|
// We encapsulate the merge related state machine in a different
|
|
// object to minimize change to the existing flow.
|
|
merge_until_status_ = merge_helper_->MergeUntil(
|
|
&input_, range_del_agg_, prev_snapshot, bottommost_level_,
|
|
allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_,
|
|
prefetch_buffers_.get(), &iter_stats_);
|
|
merge_out_iter_.SeekToFirst();
|
|
|
|
if (!merge_until_status_.ok() &&
|
|
!merge_until_status_.IsMergeInProgress()) {
|
|
status_ = merge_until_status_;
|
|
return;
|
|
} else if (merge_out_iter_.Valid()) {
|
|
// NOTE: key, value, and ikey_ refer to old entries.
|
|
// These will be correctly set below.
|
|
key_ = merge_out_iter_.key();
|
|
value_ = merge_out_iter_.value();
|
|
pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
|
|
// MergeUntil stops when it encounters a corrupt key and does not
|
|
// include them in the result, so we expect the keys here to be valid.
|
|
if (!pik_status.ok()) {
|
|
ROCKS_LOG_FATAL(
|
|
info_log_, "Invalid key %s in compaction. %s",
|
|
allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden",
|
|
pik_status.getState());
|
|
assert(false);
|
|
}
|
|
// Keep current_key_ in sync.
|
|
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
|
|
key_ = current_key_.GetInternalKey();
|
|
ikey_.user_key = current_key_.GetUserKey();
|
|
validity_info_.SetValid(ValidContext::kMerge2);
|
|
} else {
|
|
// all merge operands were filtered out. reset the user key, since the
|
|
// batch consumed by the merge operator should not shadow any keys
|
|
// coming after the merges
|
|
has_current_user_key_ = false;
|
|
pinned_iters_mgr_.ReleasePinnedData();
|
|
|
|
if (merge_helper_->FilteredUntil(&skip_until)) {
|
|
need_skip = true;
|
|
}
|
|
}
|
|
} else {
|
|
// 1. new user key -OR-
|
|
// 2. different snapshot stripe
|
|
// If user-defined timestamp is enabled, we consider keys for GC if they
|
|
// are below history_ts_low_. CompactionRangeDelAggregator::ShouldDelete()
|
|
// only considers range deletions that are at or below history_ts_low_ and
|
|
// trim_ts_. We drop keys here that are below history_ts_low_ and are
|
|
// covered by a range tombstone that is at or below history_ts_low_ and
|
|
// trim_ts.
|
|
bool should_delete = false;
|
|
if (!timestamp_size_ || cmp_with_history_ts_low_ < 0) {
|
|
should_delete = range_del_agg_->ShouldDelete(
|
|
key_, RangeDelPositioningMode::kForwardTraversal);
|
|
}
|
|
if (should_delete) {
|
|
++iter_stats_.num_record_drop_hidden;
|
|
++iter_stats_.num_record_drop_range_del;
|
|
AdvanceInputIter();
|
|
} else {
|
|
validity_info_.SetValid(ValidContext::kNewUserKey);
|
|
}
|
|
}
|
|
|
|
if (need_skip) {
|
|
SkipUntil(skip_until);
|
|
}
|
|
}
|
|
|
|
if (!Valid() && IsShuttingDown()) {
|
|
status_ = Status::ShutdownInProgress();
|
|
}
|
|
|
|
if (IsPausingManualCompaction()) {
|
|
status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
|
|
}
|
|
|
|
// Propagate corruption status from memtable iterator
|
|
if (!input_.Valid() && input_.status().IsCorruption()) {
|
|
status_ = input_.status();
|
|
}
|
|
}
|
|
|
|
bool CompactionIterator::ExtractLargeValueIfNeededImpl() {
|
|
if (!blob_file_builder_) {
|
|
return false;
|
|
}
|
|
|
|
blob_index_.clear();
|
|
const Status s = blob_file_builder_->Add(user_key(), value_, &blob_index_);
|
|
|
|
if (!s.ok()) {
|
|
status_ = s;
|
|
validity_info_.Invalidate();
|
|
|
|
return false;
|
|
}
|
|
|
|
if (blob_index_.empty()) {
|
|
return false;
|
|
}
|
|
|
|
value_ = blob_index_;
|
|
|
|
return true;
|
|
}
|
|
|
|
void CompactionIterator::ExtractLargeValueIfNeeded() {
|
|
assert(ikey_.type == kTypeValue);
|
|
|
|
if (!ExtractLargeValueIfNeededImpl()) {
|
|
return;
|
|
}
|
|
|
|
ikey_.type = kTypeBlobIndex;
|
|
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
|
|
}
|
|
|
|
void CompactionIterator::GarbageCollectBlobIfNeeded() {
|
|
assert(ikey_.type == kTypeBlobIndex);
|
|
|
|
if (!compaction_) {
|
|
return;
|
|
}
|
|
|
|
// GC for integrated BlobDB
|
|
if (compaction_->enable_blob_garbage_collection()) {
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"CompactionIterator::GarbageCollectBlobIfNeeded::TamperWithBlobIndex",
|
|
&value_);
|
|
|
|
BlobIndex blob_index;
|
|
|
|
{
|
|
const Status s = blob_index.DecodeFrom(value_);
|
|
|
|
if (!s.ok()) {
|
|
status_ = s;
|
|
validity_info_.Invalidate();
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (blob_index.file_number() >=
|
|
blob_garbage_collection_cutoff_file_number_) {
|
|
return;
|
|
}
|
|
|
|
FilePrefetchBuffer* prefetch_buffer =
|
|
prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
|
|
blob_index.file_number())
|
|
: nullptr;
|
|
|
|
uint64_t bytes_read = 0;
|
|
|
|
{
|
|
assert(blob_fetcher_);
|
|
|
|
const Status s = blob_fetcher_->FetchBlob(
|
|
user_key(), blob_index, prefetch_buffer, &blob_value_, &bytes_read);
|
|
|
|
if (!s.ok()) {
|
|
status_ = s;
|
|
validity_info_.Invalidate();
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
++iter_stats_.num_blobs_read;
|
|
iter_stats_.total_blob_bytes_read += bytes_read;
|
|
|
|
++iter_stats_.num_blobs_relocated;
|
|
iter_stats_.total_blob_bytes_relocated += blob_index.size();
|
|
|
|
value_ = blob_value_;
|
|
|
|
if (ExtractLargeValueIfNeededImpl()) {
|
|
return;
|
|
}
|
|
|
|
ikey_.type = kTypeValue;
|
|
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
|
|
|
|
return;
|
|
}
|
|
|
|
// GC for stacked BlobDB
|
|
if (compaction_filter_ &&
|
|
compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
|
|
const auto blob_decision = compaction_filter_->PrepareBlobOutput(
|
|
user_key(), value_, &compaction_filter_value_);
|
|
|
|
if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
|
|
status_ =
|
|
Status::Corruption("Corrupted blob reference encountered during GC");
|
|
validity_info_.Invalidate();
|
|
|
|
return;
|
|
}
|
|
|
|
if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
|
|
status_ = Status::IOError("Could not relocate blob during GC");
|
|
validity_info_.Invalidate();
|
|
|
|
return;
|
|
}
|
|
|
|
if (blob_decision == CompactionFilter::BlobDecision::kChangeValue) {
|
|
value_ = compaction_filter_value_;
|
|
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
void CompactionIterator::DecideOutputLevel() {
|
|
assert(compaction_->SupportsPerKeyPlacement());
|
|
output_to_penultimate_level_ = false;
|
|
// if the key is newer than the cutoff sequence or within the earliest
|
|
// snapshot, it should output to the penultimate level.
|
|
if (ikey_.sequence > preclude_last_level_min_seqno_ ||
|
|
ikey_.sequence > earliest_snapshot_) {
|
|
output_to_penultimate_level_ = true;
|
|
}
|
|
|
|
#ifndef NDEBUG
|
|
// Could be overridden by unittest
|
|
PerKeyPlacementContext context(level_, ikey_.user_key, value_, ikey_.sequence,
|
|
output_to_penultimate_level_);
|
|
TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput.context",
|
|
&context);
|
|
if (ikey_.sequence > earliest_snapshot_) {
|
|
output_to_penultimate_level_ = true;
|
|
}
|
|
#endif // NDEBUG
|
|
|
|
if (output_to_penultimate_level_) {
|
|
// If it's decided to output to the penultimate level, but unsafe to do so,
|
|
// still output to the last level. For example, moving the data from a lower
|
|
// level to a higher level outside of the higher-level input key range is
|
|
// considered unsafe, because the key may conflict with higher-level SSTs
|
|
// not from this compaction.
|
|
// TODO: add statistic for declined output_to_penultimate_level
|
|
bool safe_to_penultimate_level =
|
|
compaction_->WithinPenultimateLevelOutputRange(ikey_.user_key);
|
|
if (!safe_to_penultimate_level) {
|
|
output_to_penultimate_level_ = false;
|
|
// It could happen when disable/enable `last_level_temperature` while
|
|
// holding a snapshot. When `last_level_temperature` is not set
|
|
// (==kUnknown), the data newer than any snapshot is pushed to the last
|
|
// level, but when the per_key_placement feature is enabled on the fly,
|
|
// the data later than the snapshot has to be moved to the penultimate
|
|
// level, which may or may not be safe. So the user needs to make sure all
|
|
// snapshot is released before enabling `last_level_temperature` feature
|
|
// We will migrate the feature to `last_level_temperature` and maybe make
|
|
// it not dynamically changeable.
|
|
if (ikey_.sequence > earliest_snapshot_) {
|
|
status_ = Status::Corruption(
|
|
"Unsafe to store Seq later than snapshot in the last level if "
|
|
"per_key_placement is enabled");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void CompactionIterator::PrepareOutput() {
|
|
if (Valid()) {
|
|
if (LIKELY(!is_range_del_)) {
|
|
if (ikey_.type == kTypeValue) {
|
|
ExtractLargeValueIfNeeded();
|
|
} else if (ikey_.type == kTypeBlobIndex) {
|
|
GarbageCollectBlobIfNeeded();
|
|
}
|
|
}
|
|
|
|
if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) {
|
|
DecideOutputLevel();
|
|
}
|
|
|
|
// Zeroing out the sequence number leads to better compression.
|
|
// If this is the bottommost level (no files in lower levels)
|
|
// and the earliest snapshot is larger than this seqno
|
|
// and the userkey differs from the last userkey in compaction
|
|
// then we can squash the seqno to zero.
|
|
//
|
|
// This is safe for TransactionDB write-conflict checking since transactions
|
|
// only care about sequence number larger than any active snapshots.
|
|
//
|
|
// Can we do the same for levels above bottom level as long as
|
|
// KeyNotExistsBeyondOutputLevel() return true?
|
|
if (Valid() && compaction_ != nullptr &&
|
|
!compaction_->allow_ingest_behind() && bottommost_level_ &&
|
|
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
|
|
ikey_.type != kTypeMerge && current_key_committed_ &&
|
|
!output_to_penultimate_level_ &&
|
|
ikey_.sequence < preserve_time_min_seqno_ && !is_range_del_) {
|
|
if (ikey_.type == kTypeDeletion ||
|
|
(ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
|
|
ROCKS_LOG_FATAL(
|
|
info_log_,
|
|
"Unexpected key %s for seq-zero optimization. "
|
|
"earliest_snapshot %" PRIu64
|
|
", earliest_write_conflict_snapshot %" PRIu64
|
|
" job_snapshot %" PRIu64
|
|
". timestamp_size: %d full_history_ts_low_ %s. validity %x",
|
|
ikey_.DebugString(allow_data_in_errors_, true).c_str(),
|
|
earliest_snapshot_, earliest_write_conflict_snapshot_,
|
|
job_snapshot_, static_cast<int>(timestamp_size_),
|
|
full_history_ts_low_ != nullptr
|
|
? Slice(*full_history_ts_low_).ToString(true).c_str()
|
|
: "null",
|
|
validity_info_.rep);
|
|
assert(false);
|
|
}
|
|
ikey_.sequence = 0;
|
|
last_key_seq_zeroed_ = true;
|
|
TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput:ZeroingSeq",
|
|
&ikey_);
|
|
if (!timestamp_size_) {
|
|
current_key_.UpdateInternalKey(0, ikey_.type);
|
|
} else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
|
|
// We can also zero out timestamp for better compression.
|
|
// For the same user key (excluding timestamp), the timestamp-based
|
|
// history can be collapsed to save some space if the timestamp is
|
|
// older than *full_history_ts_low_.
|
|
const std::string kTsMin(timestamp_size_, static_cast<char>(0));
|
|
const Slice ts_slice = kTsMin;
|
|
ikey_.SetTimestamp(ts_slice);
|
|
current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Returns the earliest snapshot in *snapshots_ in which sequence number `in`
// is visible, or kMaxSequenceNumber if there is none.
//
// *prev_snapshot receives the snapshot immediately preceding the lower bound
// of `in` (0 if there is none). When a snapshot checker is present, it is
// further advanced to each snapshot that was examined and did not report
// `in` as visible.
//
// Without a snapshot checker the answer is simply the first snapshot that is
// >= in. With one, candidates are examined in order; snapshots recorded in
// released_snapshots_ are skipped, and snapshots the checker reports as
// released are added to that set.
inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
    SequenceNumber in, SequenceNumber* prev_snapshot) {
  assert(snapshots_->size());
  if (snapshots_->empty()) {
    ROCKS_LOG_FATAL(info_log_,
                    "No snapshot left in findEarliestVisibleSnapshot");
  }

  const auto snapshots_end = snapshots_->end();
  auto candidate = std::lower_bound(snapshots_->begin(), snapshots_end, in);

  assert(prev_snapshot != nullptr);
  if (candidate != snapshots_->begin()) {
    *prev_snapshot = *std::prev(candidate);
    if (*prev_snapshot >= in) {
      ROCKS_LOG_FATAL(info_log_,
                      "*prev_snapshot (%" PRIu64 ") >= in (%" PRIu64
                      ") in findEarliestVisibleSnapshot",
                      *prev_snapshot, in);
      assert(false);
    }
  } else {
    *prev_snapshot = 0;
  }

  if (snapshot_checker_ == nullptr) {
    if (candidate == snapshots_end) {
      return kMaxSequenceNumber;
    }
    return *candidate;
  }

  // Evaluated once up front; it is intentionally not refreshed when the loop
  // below records additional released snapshots.
  const bool any_released_known = !released_snapshots_.empty();

  for (; candidate != snapshots_end; ++candidate) {
    const SequenceNumber cur = *candidate;
    if (in > cur) {
      ROCKS_LOG_FATAL(info_log_,
                      "in (%" PRIu64 ") > cur (%" PRIu64
                      ") in findEarliestVisibleSnapshot",
                      in, cur);
      assert(false);
    }

    // Snapshots already recorded as released are skipped without touching
    // *prev_snapshot.
    const bool already_released =
        any_released_known &&
        released_snapshots_.find(cur) != released_snapshots_.end();
    if (!already_released) {
      const auto verdict = snapshot_checker_->CheckInSnapshot(in, cur);
      if (verdict == SnapshotCheckerResult::kInSnapshot) {
        return cur;
      }
      if (verdict == SnapshotCheckerResult::kSnapshotReleased) {
        released_snapshots_.insert(cur);
      }
      *prev_snapshot = cur;
    }
  }

  return kMaxSequenceNumber;
}
|
|
|
|
uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber(
|
|
const CompactionProxy* compaction) {
|
|
if (!compaction) {
|
|
return 0;
|
|
}
|
|
|
|
if (!compaction->enable_blob_garbage_collection()) {
|
|
return 0;
|
|
}
|
|
|
|
const Version* const version = compaction->input_version();
|
|
assert(version);
|
|
|
|
const VersionStorageInfo* const storage_info = version->storage_info();
|
|
assert(storage_info);
|
|
|
|
const auto& blob_files = storage_info->GetBlobFiles();
|
|
|
|
const size_t cutoff_index = static_cast<size_t>(
|
|
compaction->blob_garbage_collection_age_cutoff() * blob_files.size());
|
|
|
|
if (cutoff_index >= blob_files.size()) {
|
|
return std::numeric_limits<uint64_t>::max();
|
|
}
|
|
|
|
const auto& meta = blob_files[cutoff_index];
|
|
assert(meta);
|
|
|
|
return meta->GetBlobFileNumber();
|
|
}
|
|
|
|
std::unique_ptr<BlobFetcher> CompactionIterator::CreateBlobFetcherIfNeeded(
|
|
const CompactionProxy* compaction) {
|
|
if (!compaction) {
|
|
return nullptr;
|
|
}
|
|
|
|
const Version* const version = compaction->input_version();
|
|
if (!version) {
|
|
return nullptr;
|
|
}
|
|
|
|
ReadOptions read_options;
|
|
read_options.io_activity = Env::IOActivity::kCompaction;
|
|
read_options.fill_cache = false;
|
|
|
|
return std::unique_ptr<BlobFetcher>(new BlobFetcher(version, read_options));
|
|
}
|
|
|
|
std::unique_ptr<PrefetchBufferCollection>
|
|
CompactionIterator::CreatePrefetchBufferCollectionIfNeeded(
|
|
const CompactionProxy* compaction) {
|
|
if (!compaction) {
|
|
return nullptr;
|
|
}
|
|
|
|
if (!compaction->input_version()) {
|
|
return nullptr;
|
|
}
|
|
|
|
if (compaction->allow_mmap_reads()) {
|
|
return nullptr;
|
|
}
|
|
|
|
const uint64_t readahead_size = compaction->blob_compaction_readahead_size();
|
|
if (!readahead_size) {
|
|
return nullptr;
|
|
}
|
|
|
|
return std::unique_ptr<PrefetchBufferCollection>(
|
|
new PrefetchBufferCollection(readahead_size));
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|