// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once

#include <algorithm>
#include <deque>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iteration_stats.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"

namespace ROCKSDB_NAMESPACE {

class BlobFileBuilder;

class CompactionIterator {
 public:
  // A wrapper around Compaction. Has a much smaller interface, only what
  // CompactionIterator uses. Tests can override it.
  class CompactionProxy {
   public:
    explicit CompactionProxy(const Compaction* compaction)
        : compaction_(compaction) {}

    virtual ~CompactionProxy() = default;
    virtual int level(size_t /*compaction_input_level*/ = 0) const {
      return compaction_->level();
    }
    virtual bool KeyNotExistsBeyondOutputLevel(
        const Slice& user_key, std::vector<size_t>* level_ptrs) const {
      return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
    }
    virtual bool bottommost_level() const {
      return compaction_->bottommost_level();
    }
    virtual int number_levels() const { return compaction_->number_levels(); }
    virtual Slice GetLargestUserKey() const {
      return compaction_->GetLargestUserKey();
    }
    virtual bool allow_ingest_behind() const {
      return compaction_->immutable_cf_options()->allow_ingest_behind;
    }
    virtual bool preserve_deletes() const {
      return compaction_->immutable_cf_options()->preserve_deletes;
    }

   protected:
    CompactionProxy() = default;

   private:
    const Compaction* compaction_;
  };

  CompactionIterator(InternalIterator* input, const Comparator* cmp,
                     MergeHelper* merge_helper, SequenceNumber last_sequence,
                     std::vector<SequenceNumber>* snapshots,
                     SequenceNumber earliest_write_conflict_snapshot,
                     const SnapshotChecker* snapshot_checker, Env* env,
                     bool report_detailed_time, bool expect_valid_internal_key,
                     CompactionRangeDelAggregator* range_del_agg,
                     BlobFileBuilder* blob_file_builder,
                     bool allow_data_in_errors,
                     const Compaction* compaction = nullptr,
                     const CompactionFilter* compaction_filter = nullptr,
                     const std::atomic<bool>* shutting_down = nullptr,
                     const SequenceNumber preserve_deletes_seqnum = 0,
                     const std::atomic<int>* manual_compaction_paused = nullptr,
                     const std::shared_ptr<Logger> info_log = nullptr,
                     const std::string* full_history_ts_low = nullptr);

  // Constructor with custom CompactionProxy, used for tests.
  CompactionIterator(InternalIterator* input, const Comparator* cmp,
                     MergeHelper* merge_helper, SequenceNumber last_sequence,
                     std::vector<SequenceNumber>* snapshots,
                     SequenceNumber earliest_write_conflict_snapshot,
                     const SnapshotChecker* snapshot_checker, Env* env,
                     bool report_detailed_time, bool expect_valid_internal_key,
                     CompactionRangeDelAggregator* range_del_agg,
                     BlobFileBuilder* blob_file_builder,
                     bool allow_data_in_errors,
                     std::unique_ptr<CompactionProxy> compaction,
                     const CompactionFilter* compaction_filter = nullptr,
                     const std::atomic<bool>* shutting_down = nullptr,
                     const SequenceNumber preserve_deletes_seqnum = 0,
                     const std::atomic<int>* manual_compaction_paused = nullptr,
                     const std::shared_ptr<Logger> info_log = nullptr,
                     const std::string* full_history_ts_low = nullptr);

  ~CompactionIterator();

  void ResetRecordCounts();

  // Seek to the beginning of the compaction iterator output.
  //
  // REQUIRED: Call only once.
  void SeekToFirst();
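
  // Typical driving loop, shown as a sketch only: `c_iter` is an already
  // constructed CompactionIterator and `builder` stands for whatever consumes
  // the output (both names are illustrative, not part of this class).
  //
  //   c_iter.SeekToFirst();
  //   while (c_iter.Valid()) {
  //     // Each key()/value() pair is one record of the compaction output.
  //     builder->Add(c_iter.key(), c_iter.value());
  //     c_iter.Next();
  //   }
  //   // Once Valid() returns false, check status() to distinguish normal
  //   // exhaustion of the input from an error.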

  // Produces the next record in the compaction.
  //
  // REQUIRED: SeekToFirst() has been called.
  void Next();

  // Getters
  const Slice& key() const { return key_; }
  const Slice& value() const { return value_; }
  const Status& status() const { return status_; }
  const ParsedInternalKey& ikey() const { return ikey_; }
  bool Valid() const { return valid_; }
  const Slice& user_key() const { return current_user_key_; }
  const CompactionIterationStats& iter_stats() const { return iter_stats_; }

 private:
  // Processes the input stream to find the next output.
  void NextFromInput();

  // Do the last preparations before presenting the output to the caller. At
  // this point this only zeroes out the sequence number if possible for better
  // compression.
  void PrepareOutput();

  // Invoke the compaction filter if needed.
  // Returns true on success, false on failure (e.g. kIOError).
  bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);

  // Given a sequence number, return the sequence number of the
  // earliest snapshot that this sequence number is visible in.
  // The snapshots themselves are arranged in ascending order of
  // sequence numbers.
  // Uses a sequential search because the total number of snapshots is
  // typically small.
  // (For example, with snapshots {5, 12, 20} and in == 7, this returns 12 and
  // sets *prev_snapshot to 5.)
  inline SequenceNumber findEarliestVisibleSnapshot(
      SequenceNumber in, SequenceNumber* prev_snapshot);

  // Checks whether the currently seen ikey_ is needed for an incremental
  // (differential) snapshot and hence can't be dropped, nor can its seqnum be
  // zeroed out, even if all other conditions for doing so are met.
  inline bool ikeyNotNeededForIncrementalSnapshot();

  inline bool KeyCommitted(SequenceNumber sequence) {
    return snapshot_checker_ == nullptr ||
           snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) ==
               SnapshotCheckerResult::kInSnapshot;
  }

  bool IsInEarliestSnapshot(SequenceNumber sequence);

  // Extract the user-defined timestamp from the user key if possible and
  // compare it with *full_history_ts_low_ if applicable.
  inline void UpdateTimestampAndCompareWithFullHistoryLow() {
    if (!timestamp_size_) {
      return;
    }
    Slice ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
    curr_ts_.assign(ts.data(), ts.size());
    if (full_history_ts_low_) {
      cmp_with_history_ts_low_ =
          cmp_->CompareTimestamp(ts, *full_history_ts_low_);
    }
  }

  InternalIterator* input_;
  const Comparator* cmp_;
  MergeHelper* merge_helper_;
  const std::vector<SequenceNumber>* snapshots_;
  // List of snapshots released during compaction.
  // findEarliestVisibleSnapshot() finds them out from the return value of
  // snapshot_checker_, and makes sure they will not be returned as the
  // earliest visible snapshot of an older value.
  // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
  std::unordered_set<SequenceNumber> released_snapshots_;
  std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
  const SequenceNumber earliest_write_conflict_snapshot_;
  const SnapshotChecker* const snapshot_checker_;
  Env* env_;
  bool report_detailed_time_;
  bool expect_valid_internal_key_;
  CompactionRangeDelAggregator* range_del_agg_;
  BlobFileBuilder* blob_file_builder_;
  std::unique_ptr<CompactionProxy> compaction_;
  const CompactionFilter* compaction_filter_;
  const std::atomic<bool>* shutting_down_;
  const std::atomic<int>* manual_compaction_paused_;
  const SequenceNumber preserve_deletes_seqnum_;
  bool bottommost_level_;
  bool valid_ = false;
  bool visible_at_tip_;
  SequenceNumber earliest_snapshot_;
  SequenceNumber latest_snapshot_;

  std::shared_ptr<Logger> info_log_;

  bool allow_data_in_errors_;

  // Comes from comparator.
  const size_t timestamp_size_;

  // Lower bound timestamp to retain full history in terms of user-defined
  // timestamp. If a key's timestamp is older than full_history_ts_low_, then
  // the key *may* be eligible for garbage collection (GC). The skipping logic
  // is in `NextFromInput()` and `PrepareOutput()`.
  // If nullptr, NO GC will be performed and all history will be preserved.
  const std::string* const full_history_ts_low_;
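
  // Illustration only (timestamps shown as plain integers, though they are
  // really comparator-defined byte strings): if *full_history_ts_low_ is 100,
  // a version of a key written at timestamp 80 (older than the low bound) may
  // be garbage collected by NextFromInput()/PrepareOutput(), while a version
  // written at timestamp 120 is retained to keep full history. If
  // full_history_ts_low_ is nullptr, both versions are kept.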

  // State
  //
  // Points to a copy of the current compaction iterator output (current_key_)
  // if valid_.
  Slice key_;
  // Points to the value in the underlying iterator that corresponds to the
  // current output.
  Slice value_;
  // The status is OK unless the compaction iterator encounters a merge operand
  // while not having a merge operator defined.
  Status status_;
  // Stores the user key, sequence number and type of the current compaction
  // iterator output (or current key in the underlying iterator during
  // NextFromInput()).
  ParsedInternalKey ikey_;
  // Stores whether ikey_.user_key is valid. If set to false, the user key is
  // not compared against the current key in the underlying iterator.
  bool has_current_user_key_ = false;
  // If false, the iterator holds a copy of the current compaction iterator
  // output (or current key in the underlying iterator during NextFromInput()).
  bool at_next_ = false;
  IterKey current_key_;
  Slice current_user_key_;
  std::string curr_ts_;
  SequenceNumber current_user_key_sequence_;
  SequenceNumber current_user_key_snapshot_;

  // True if the iterator has already returned a record for the current key.
  bool has_outputted_key_ = false;

  // If true, truncate the value of the next key and output it without
  // applying any compaction rules. This is used for outputting a Put after a
  // SingleDelete.
  bool clear_and_output_next_key_ = false;

  MergeOutputIterator merge_out_iter_;
  // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
  // merge operands, and then releasing them after consuming them.
  PinnedIteratorsManager pinned_iters_mgr_;

  std::string blob_index_;
  std::string compaction_filter_value_;
  InternalKey compaction_filter_skip_until_;
  // "level_ptrs" holds indices that remember which file of an associated
  // level we were last checking during the last call to compaction->
  // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
  // to pick up where the previous call left off: each subcompaction's key
  // range is increasing, so a later call must be looking for a key that is in
  // or beyond the last file checked during the previous call.
  std::vector<size_t> level_ptrs_;
  CompactionIterationStats iter_stats_;

  // Used to avoid purging uncommitted values. The application can specify
  // uncommitted values by providing a SnapshotChecker object.
  bool current_key_committed_;

  // Saved result of cmp_->CompareTimestamp(curr_ts_, *full_history_ts_low_).
  int cmp_with_history_ts_low_;

  bool IsShuttingDown() {
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
    return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
  }

  bool IsPausingManualCompaction() {
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
    return manual_compaction_paused_ &&
           manual_compaction_paused_->load(std::memory_order_relaxed) > 0;
  }
};

}  // namespace ROCKSDB_NAMESPACE
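
// Example of overriding CompactionProxy for tests (a sketch only; the class
// name FakeCompaction and the behavior of each override are illustrative, not
// part of this header). Such a proxy is passed to the second constructor via
// the std::unique_ptr<CompactionProxy> parameter:
//
//   class FakeCompaction : public CompactionIterator::CompactionProxy {
//    public:
//     int level(size_t /*compaction_input_level*/ = 0) const override {
//       return 0;
//     }
//     bool KeyNotExistsBeyondOutputLevel(
//         const Slice& /*user_key*/,
//         std::vector<size_t>* /*level_ptrs*/) const override {
//       return key_not_exists_beyond_output_level;
//     }
//     bool bottommost_level() const override { return is_bottommost_level; }
//     int number_levels() const override { return 1; }
//     Slice GetLargestUserKey() const override {
//       return "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
//     }
//     bool allow_ingest_behind() const override { return false; }
//     bool preserve_deletes() const override { return false; }
//
//     bool key_not_exists_beyond_output_level = false;
//     bool is_bottommost_level = false;
//   };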