diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4d130260f..68624d274 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -74,6 +74,7 @@ set(SOURCES
         db/column_family.cc
         db/compacted_db_impl.cc
         db/compaction.cc
+        db/compaction_iterator.cc
        db/compaction_job.cc
        db/compaction_picker.cc
        db/convenience.cc
diff --git a/db/builder.cc b/db/builder.cc
index d7c9f8c85..a40a9564a 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -13,6 +13,7 @@
 #include <deque>
 #include <vector>
 
+#include "db/compaction_iterator.h"
 #include "db/dbformat.h"
 #include "db/filename.h"
 #include "db/internal_stats.h"
@@ -32,28 +33,6 @@
 
 namespace rocksdb {
 
-namespace {
-inline SequenceNumber EarliestVisibleSnapshot(
-    SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
-    SequenceNumber* prev_snapshot) {
-  if (snapshots.empty()) {
-    *prev_snapshot = 0;  // 0 means no previous snapshot
-    return kMaxSequenceNumber;
-  }
-  SequenceNumber prev = 0;
-  for (const auto cur : snapshots) {
-    assert(prev <= cur);
-    if (cur >= in) {
-      *prev_snapshot = prev;
-      return cur;
-    }
-    prev = cur;  // assignment
-  }
-  *prev_snapshot = prev;
-  return kMaxSequenceNumber;
-}
-}  // namespace
-
 class TableFactory;
 
 TableBuilder* NewTableBuilder(
@@ -84,7 +63,6 @@ Status BuildTable(
   const size_t kReportFlushIOStatsEvery = 1048576;
   Status s;
   meta->fd.file_size = 0;
-  meta->smallest_seqno = meta->largest_seqno = 0;
   iter->SeekToFirst();
 
   std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
@@ -107,120 +85,22 @@ Status BuildTable(
           file_writer.get(), compression, compression_opts);
     }
 
-    {
-      // the first key is the smallest key
-      Slice key = iter->key();
-      meta->smallest.DecodeFrom(key);
-      meta->smallest_seqno = GetInternalKeySeqno(key);
-      meta->largest_seqno = meta->smallest_seqno;
-    }
-
     MergeHelper merge(internal_comparator.user_comparator(),
                       ioptions.merge_operator, ioptions.info_log,
                       ioptions.min_partial_merge_operands,
                       true /* internal key corruption is not ok */);
-    IterKey current_user_key;
-    bool has_current_user_key = false;
-    // If has_current_user_key == true, this variable remembers the earliest
-    // snapshot in which this current key already exists. If two internal keys
-    // have the same user key AND the earlier one should be visible in the
-    // snapshot in which we already have a user key, we can drop the earlier
-    // user key
-    SequenceNumber current_user_key_exists_in_snapshot = kMaxSequenceNumber;
-
-    while (iter->Valid()) {
-      // Get current key
-      ParsedInternalKey ikey;
-      Slice key = iter->key();
-      Slice value = iter->value();
-
-      // In-memory key corruption is not ok;
-      // TODO: find a clean way to treat in memory key corruption
-      // Ugly workaround to avoid compiler error for release build
-      bool ok __attribute__((unused)) = true;
-      ok = ParseInternalKey(key, &ikey);
-      assert(ok);
-
-      meta->smallest_seqno = std::min(meta->smallest_seqno, ikey.sequence);
-      meta->largest_seqno = std::max(meta->largest_seqno, ikey.sequence);
-
-      // If the key is the same as the previous key (and it is not the
-      // first key), then we skip it, since it is an older version.
-      // Otherwise we output the key and mark it as the "new" previous key.
-      if (!has_current_user_key ||
-          !internal_comparator.user_comparator()->Equal(
-              ikey.user_key, current_user_key.GetKey())) {
-        // First occurrence of this user key
-        current_user_key.SetKey(ikey.user_key);
-        has_current_user_key = true;
-        current_user_key_exists_in_snapshot = 0;
-      }
-
-      // If there are no snapshots, then this kv affect visibility at tip.
-      // Otherwise, search though all existing snapshots to find
-      // the earliest snapshot that is affected by this kv.
-      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
-      SequenceNumber key_needs_to_exist_in_snapshot =
-          EarliestVisibleSnapshot(ikey.sequence, snapshots, &prev_snapshot);
-
-      if (current_user_key_exists_in_snapshot ==
-          key_needs_to_exist_in_snapshot) {
-        // If this user key already exists in snapshot in which it needs to
-        // exist, we can drop it.
-        // In other words, if the earliest snapshot is which this key is visible
-        // in is the same as the visibily of a previous instance of the
-        // same key, then this kv is not visible in any snapshot.
-        // Hidden by an newer entry for same user key
-        iter->Next();
-      } else if (ikey.type == kTypeMerge) {
-        meta->largest.DecodeFrom(key);
-
-        // TODO(tbd): Add a check here to prevent RocksDB from crash when
-        // reopening a DB w/o properly specifying the merge operator. But
-        // currently we observed a memory leak on failing in RocksDB
-        // recovery, so we decide to let it crash instead of causing
-        // memory leak for now before we have identified the real cause
-        // of the memory leak.
-
-        // Handle merge-type keys using the MergeHelper
-        // TODO: pass statistics to MergeUntil
-        merge.MergeUntil(iter, prev_snapshot, false, nullptr, env);
-        // IMPORTANT: Slice key doesn't point to a valid value anymore!!
-
-        const auto& keys = merge.keys();
-        const auto& values = merge.values();
-        assert(!keys.empty());
-        assert(keys.size() == values.size());
-
-        // largest possible sequence number in a merge queue is already stored
-        // in ikey.sequence. we additionally have to consider the front of the
-        // merge queue, which might have the smallest sequence number (out of
-        // all the merges with the same key)
-        meta->smallest_seqno =
-            std::min(meta->smallest_seqno, GetInternalKeySeqno(keys.front()));
-
-        // We have a list of keys to write, write all keys in the list.
-        for (auto key_iter = keys.rbegin(), value_iter = values.rbegin();
-             key_iter != keys.rend(); key_iter++, value_iter++) {
-          key = Slice(*key_iter);
-          value = Slice(*value_iter);
-          bool valid_key __attribute__((__unused__)) =
-              ParseInternalKey(key, &ikey);
-          // MergeUntil stops when it encounters a corrupt key and does not
-          // include them in the result, so we expect the keys here to valid.
-          assert(valid_key);
-          builder->Add(key, value);
-        }
-      } else {  // just write out the key-value
-        builder->Add(key, value);
-        meta->largest.DecodeFrom(key);
-        iter->Next();
-      }
-
-      current_user_key_exists_in_snapshot = key_needs_to_exist_in_snapshot;
-
+    CompactionIterator c_iter(iter, internal_comparator.user_comparator(),
+                              &merge, kMaxSequenceNumber, &snapshots, env,
+                              true /* internal key corruption is not ok */);
+    c_iter.SeekToFirst();
+    for (; c_iter.Valid(); c_iter.Next()) {
+      const Slice& key = c_iter.key();
+      const Slice& value = c_iter.value();
+      builder->Add(key, value);
+      meta->UpdateBoundaries(key, c_iter.ikey().sequence);
+
+      // TODO(noetzli): Update stats after flush, too.
       if (io_priority == Env::IO_HIGH &&
          IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
        ThreadStatusUtil::IncreaseThreadOperationProperty(
@@ -230,6 +110,7 @@ Status BuildTable(
   }
 
   // Finish and check for builder errors
+  s = c_iter.status();
   if (s.ok()) {
     s = builder->Finish();
   } else {
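With this change the flush path in `BuildTable()` reduces to a plain driver loop over the iterator. A minimal sketch of the consumption pattern, assuming the RocksDB-internal headers from this patch (`DrainToBuilder` is a hypothetical helper, not part of the change):

```cpp
// Sketch only: mirrors the new BuildTable() loop; error handling and
// I/O-stats reporting elided. Compiles in-tree against this patch.
#include "db/compaction_iterator.h"
#include "db/version_edit.h"
#include "table/table_builder.h"

namespace rocksdb {
void DrainToBuilder(CompactionIterator* c_iter, TableBuilder* builder,
                    FileMetaData* meta) {
  c_iter->SeekToFirst();
  for (; c_iter->Valid(); c_iter->Next()) {
    // The iterator has already applied snapshot, merge, and dedup logic,
    // so every record it yields belongs in the output file.
    builder->Add(c_iter->key(), c_iter->value());
    meta->UpdateBoundaries(c_iter->key(), c_iter->ikey().sequence);
  }
  // The caller still checks c_iter->status() before builder->Finish().
}
}  // namespace rocksdb
```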
diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc
new file mode 100644
index 000000000..b6c8c4b73
--- /dev/null
+++ b/db/compaction_iterator.cc
@@ -0,0 +1,266 @@
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include "db/compaction_iterator.h"
+
+namespace rocksdb {
+
+CompactionIterator::CompactionIterator(
+    Iterator* input, const Comparator* cmp, MergeHelper* merge_helper,
+    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
+    Env* env, bool expect_valid_internal_key, Statistics* stats,
+    Compaction* compaction, const CompactionFilter* compaction_filter,
+    LogBuffer* log_buffer)
+    : input_(input),
+      cmp_(cmp),
+      merge_helper_(merge_helper),
+      snapshots_(snapshots),
+      env_(env),
+      expect_valid_internal_key_(expect_valid_internal_key),
+      stats_(stats),
+      compaction_(compaction),
+      compaction_filter_(compaction_filter),
+      log_buffer_(log_buffer),
+      merge_out_iter_(merge_helper_) {
+  assert(compaction_filter_ == nullptr || compaction_ != nullptr);
+  bottommost_level_ =
+      compaction_ == nullptr ? false : compaction_->bottommost_level();
+  if (compaction_ != nullptr) {
+    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
+  }
+
+  if (snapshots_->size() == 0) {
+    // optimize for fast path if there are no snapshots
+    visible_at_tip_ = last_sequence;
+    earliest_snapshot_ = visible_at_tip_;
+    latest_snapshot_ = 0;
+  } else {
+    visible_at_tip_ = 0;
+    earliest_snapshot_ = snapshots_->at(0);
+    latest_snapshot_ = snapshots_->back();
+  }
+}
+
+void CompactionIterator::ResetRecordCounts() {
+  iter_stats_.num_record_drop_user = 0;
+  iter_stats_.num_record_drop_hidden = 0;
+  iter_stats_.num_record_drop_obsolete = 0;
+}
+
+void CompactionIterator::SeekToFirst() {
+  NextFromInput();
+  PrepareOutput();
+}
+
+void CompactionIterator::Next() {
+  // If there is a merge output, return it before continuing to process the
+  // input.
+  if (merge_out_iter_.Valid()) {
+    merge_out_iter_.Next();
+
+    // Check if we returned all records of the merge output.
+    if (merge_out_iter_.Valid()) {
+      key_ = merge_out_iter_.key();
+      value_ = merge_out_iter_.value();
+      bool valid_key __attribute__((__unused__)) =
+          ParseInternalKey(key_, &ikey_);
+      // MergeUntil stops when it encounters a corrupt key and does not
+      // include them in the result, so we expect the keys here to be valid.
+      assert(valid_key);
+      valid_ = true;
+    } else {
+      // MergeHelper moves the iterator to the first record after the merged
+      // records, so even though we reached the end of the merge output, we do
+      // not want to advance the iterator.
+      NextFromInput();
+    }
+  } else {
+    // Only advance the input iterator if there is no merge output.
+    input_->Next();
+    NextFromInput();
+  }
+
+  PrepareOutput();
+}
+
+void CompactionIterator::NextFromInput() {
+  valid_ = false;
+
+  while (input_->Valid()) {
+    key_ = input_->key();
+    value_ = input_->value();
+    iter_stats_.num_input_records++;
+
+    if (!ParseInternalKey(key_, &ikey_)) {
+      // If expect_valid_internal_key_ is false, return the corrupted key
+      // and let the caller decide what to do with it.
+      // TODO(noetzli): Maybe we should have a more elegant solution for this.
+      assert(!expect_valid_internal_key_);
+      current_user_key_.Clear();
+      has_current_user_key_ = false;
+      current_user_key_sequence_ = kMaxSequenceNumber;
+      current_user_key_snapshot_ = 0;
+      iter_stats_.num_input_corrupt_records++;
+      valid_ = true;
+      break;
+    }
+
+    // Update input statistics
+    if (ikey_.type == kTypeDeletion) {
+      iter_stats_.num_input_deletion_records++;
+    }
+    iter_stats_.total_input_raw_key_bytes += key_.size();
+    iter_stats_.total_input_raw_value_bytes += value_.size();
+
+    if (!has_current_user_key_ ||
+        cmp_->Compare(ikey_.user_key, current_user_key_.GetKey()) != 0) {
+      // First occurrence of this user key
+      current_user_key_.SetKey(ikey_.user_key);
+      has_current_user_key_ = true;
+      current_user_key_sequence_ = kMaxSequenceNumber;
+      current_user_key_snapshot_ = 0;
+      // apply the compaction filter to the first occurrence of the user key
+      if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
+          (visible_at_tip_ || ikey_.sequence > latest_snapshot_)) {
+        // If the user has specified a compaction filter and the sequence
+        // number is greater than any external snapshot, then invoke the
+        // filter. If the return value of the compaction filter is true,
+        // replace the entry with a deletion marker.
+        bool value_changed = false;
+        bool to_delete = false;
+        compaction_filter_value_.clear();
+        {
+          StopWatchNano timer(env_, true);
+          to_delete = compaction_filter_->Filter(
+              compaction_->level(), ikey_.user_key, value_,
+              &compaction_filter_value_, &value_changed);
+          iter_stats_.total_filter_time +=
+              env_ != nullptr ? timer.ElapsedNanos() : 0;
+        }
+        if (to_delete) {
+          // make a copy of the original key and convert it to a delete
+          delete_key_.SetInternalKey(ExtractUserKey(key_), ikey_.sequence,
+                                     kTypeDeletion);
+          // anchor the key again
+          key_ = delete_key_.GetKey();
+          // needed because ikey_ is backed by key
+          ParseInternalKey(key_, &ikey_);
+          // no value associated with delete
+          value_.clear();
+          iter_stats_.num_record_drop_user++;
+        } else if (value_changed) {
+          value_ = compaction_filter_value_;
+        }
+      }
+    }
+
+    // If there are no snapshots, then this kv affects visibility at tip.
+    // Otherwise, search through all existing snapshots to find the earliest
+    // snapshot that is affected by this kv.
+    SequenceNumber last_sequence __attribute__((__unused__)) =
+        current_user_key_sequence_;
+    current_user_key_sequence_ = ikey_.sequence;
+    SequenceNumber last_snapshot = current_user_key_snapshot_;
+    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
+    current_user_key_snapshot_ =
+        visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot(
+                                                ikey_.sequence, &prev_snapshot);
+
+    if (last_snapshot == current_user_key_snapshot_) {
+      // If the earliest snapshot in which this key is visible is the same
+      // as the visibility of a previous instance of the same key, then this
+      // kv is not visible in any snapshot.
+      // Hidden by a newer entry for the same user key
+      // TODO: why not > ?
+      assert(last_sequence >= current_user_key_sequence_);
+      ++iter_stats_.num_record_drop_hidden;  // (A)
+    } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
+               ikey_.sequence <= earliest_snapshot_ &&
+               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
+                                                          &level_ptrs_)) {
+      // TODO(noetzli): This is the only place where we use compaction_
+      // (besides the constructor). We should probably get rid of this
+      // dependency and find a way to do similar filtering during flushes.
+      //
+      // For this user key:
+      // (1) there is no data in higher levels
+      // (2) data in lower levels will have larger sequence numbers
+      // (3) data in layers that are being compacted here and have
+      //     smaller sequence numbers will be dropped in the next
+      //     few iterations of this loop (by rule (A) above).
+      // Therefore this deletion marker is obsolete and can be dropped.
+      ++iter_stats_.num_record_drop_obsolete;
+    } else if (ikey_.type == kTypeMerge) {
+      if (!merge_helper_->HasOperator()) {
+        LogToBuffer(log_buffer_, "Options::merge_operator is null.");
+        status_ = Status::InvalidArgument(
+            "merge_operator is not properly initialized.");
+        return;
+      }
+
+      // We know the merge type entry is not hidden, otherwise we would
+      // have hit (A).
+      // We encapsulate the merge related state machine in a different
+      // object to minimize change to the existing flow.
+      merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_,
+                                stats_, env_);
+      merge_out_iter_.SeekToFirst();
+
+      // NOTE: key, value, and ikey_ refer to old entries.
+      // These will be correctly set below.
+      key_ = merge_out_iter_.key();
+      value_ = merge_out_iter_.value();
+      bool valid_key __attribute__((__unused__)) =
+          ParseInternalKey(key_, &ikey_);
+      // MergeUntil stops when it encounters a corrupt key and does not
+      // include them in the result, so we expect the keys here to be valid.
+      assert(valid_key);
+      valid_ = true;
+      break;
+    } else {
+      valid_ = true;
+      break;
+    }
+
+    input_->Next();
+  }
+}
+
+void CompactionIterator::PrepareOutput() {
+  // Zeroing out the sequence number leads to better compression.
+  // If this is the bottommost level (no files in lower levels)
+  // and the earliest snapshot is larger than this seqno
+  // then we can squash the seqno to zero.
+  if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ &&
+      ikey_.type != kTypeMerge) {
+    assert(ikey_.type != kTypeDeletion);
+    // make a copy because updating in place would cause problems
+    // with the priority queue that is managing the input key iterator
+    updated_key_.assign(key_.data(), key_.size());
+    UpdateInternalKey(&updated_key_, (uint64_t)0, ikey_.type);
+    key_ = Slice(updated_key_);
+  }
+}
+
+inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
+    SequenceNumber in, SequenceNumber* prev_snapshot) {
+  assert(snapshots_->size());
+  SequenceNumber prev __attribute__((unused)) = 0;
+  for (const auto cur : *snapshots_) {
+    assert(prev <= cur);
+    if (cur >= in) {
+      *prev_snapshot = prev;
+      return cur;
+    }
+    prev = cur;
+    assert(prev);
+  }
+  *prev_snapshot = prev;
+  return kMaxSequenceNumber;
+}
+
+}  // namespace rocksdb
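The snapshot search that the two call sites previously duplicated is small enough to check in isolation. A standalone sketch of the same logic with a worked case (free-standing names, not the RocksDB API):

```cpp
// Standalone sketch of the earliest-visible-snapshot search; not RocksDB API.
#include <cassert>
#include <cstdint>
#include <vector>

using SequenceNumber = uint64_t;
const SequenceNumber kMaxSequenceNumber = UINT64_MAX;

// Returns the earliest snapshot (>= in) that can see sequence number `in`,
// reporting the closest strictly older snapshot via *prev_snapshot.
// `snaps` must be sorted ascending, as in the real code.
SequenceNumber EarliestVisibleSnapshot(SequenceNumber in,
                                       const std::vector<SequenceNumber>& snaps,
                                       SequenceNumber* prev_snapshot) {
  SequenceNumber prev = 0;  // 0 means no previous snapshot
  for (SequenceNumber cur : snaps) {
    if (cur >= in) {
      *prev_snapshot = prev;
      return cur;
    }
    prev = cur;
  }
  *prev_snapshot = prev;
  return kMaxSequenceNumber;  // visible only at the tip
}

int main() {
  SequenceNumber prev = 0;
  // Snapshots at seqnos 10 and 20: a key written at seqno 15 is first
  // visible to snapshot 20, and snapshot 10 is the one just below it.
  assert(EarliestVisibleSnapshot(15, {10, 20}, &prev) == 20);
  assert(prev == 10);
  // A key newer than every snapshot is visible only at the tip.
  assert(EarliestVisibleSnapshot(25, {10, 20}, &prev) == kMaxSequenceNumber);
  return 0;
}
```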
diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h
new file mode 100644
index 000000000..d683d5626
--- /dev/null
+++ b/db/compaction_iterator.h
@@ -0,0 +1,126 @@
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#pragma once
+
+#include <algorithm>
+#include <deque>
+#include <string>
+#include <vector>
+
+#include "db/compaction.h"
+#include "db/merge_helper.h"
+#include "rocksdb/compaction_filter.h"
+#include "util/log_buffer.h"
+
+namespace rocksdb {
+
+struct CompactionIteratorStats {
+  // Compaction statistics
+  int64_t num_record_drop_user = 0;
+  int64_t num_record_drop_hidden = 0;
+  int64_t num_record_drop_obsolete = 0;
+  uint64_t total_filter_time = 0;
+
+  // Input statistics
+  // TODO(noetzli): The stats are incomplete. They are lacking everything
+  // consumed by MergeHelper.
+  uint64_t num_input_records = 0;
+  uint64_t num_input_deletion_records = 0;
+  uint64_t num_input_corrupt_records = 0;
+  uint64_t total_input_raw_key_bytes = 0;
+  uint64_t total_input_raw_value_bytes = 0;
+};
+
+class CompactionIterator {
+ public:
+  CompactionIterator(Iterator* input, const Comparator* cmp,
+                     MergeHelper* merge_helper, SequenceNumber last_sequence,
+                     std::vector<SequenceNumber>* snapshots, Env* env,
+                     bool expect_valid_internal_key,
+                     Statistics* stats = nullptr,
+                     Compaction* compaction = nullptr,
+                     const CompactionFilter* compaction_filter = nullptr,
+                     LogBuffer* log_buffer = nullptr);
+
+  void ResetRecordCounts();
+
+  // Seek to the beginning of the compaction iterator output.
+  //
+  // REQUIRED: Call only once.
+  void SeekToFirst();
+
+  // Produces the next record in the compaction.
+  //
+  // REQUIRED: SeekToFirst() has been called.
+  void Next();
+
+  // Getters
+  const Slice& key() const { return key_; }
+  const Slice& value() const { return value_; }
+  const Status& status() const { return status_; }
+  const ParsedInternalKey& ikey() const { return ikey_; }
+  bool Valid() const { return valid_; }
+  Slice user_key() const { return current_user_key_.GetKey(); }
+  const CompactionIteratorStats& iter_stats() const { return iter_stats_; }
+
+ private:
+  // Processes the input stream to find the next output
+  void NextFromInput();
+
+  // Do last preparations before presenting the output to the caller. At this
+  // point this only zeroes out the sequence number if possible for better
+  // compression.
+  void PrepareOutput();
+
+  // Given a sequence number, return the sequence number of the
+  // earliest snapshot that this sequence number is visible in.
+  // The snapshots themselves are arranged in ascending order of
+  // sequence numbers.
+  // Employ a sequential search because the total number of
+  // snapshots is typically small.
+  inline SequenceNumber findEarliestVisibleSnapshot(
+      SequenceNumber in, SequenceNumber* prev_snapshot);
+
+  Iterator* input_;
+  const Comparator* cmp_;
+  MergeHelper* merge_helper_;
+  const std::vector<SequenceNumber>* snapshots_;
+  Env* env_;
+  bool expect_valid_internal_key_ __attribute__((__unused__));
+  Statistics* stats_;
+  Compaction* compaction_;
+  const CompactionFilter* compaction_filter_;
+  LogBuffer* log_buffer_;
+  bool bottommost_level_;
+  bool valid_ = false;
+  SequenceNumber visible_at_tip_;
+  SequenceNumber earliest_snapshot_;
+  SequenceNumber latest_snapshot_;
+
+  // State
+  Slice key_;
+  Slice value_;
+  Status status_;
+  ParsedInternalKey ikey_;
+  bool has_current_user_key_ = false;
+  IterKey current_user_key_;
+  SequenceNumber current_user_key_sequence_;
+  SequenceNumber current_user_key_snapshot_;
+  MergeOutputIterator merge_out_iter_;
+  std::string updated_key_;
+  std::string compaction_filter_value_;
+  IterKey delete_key_;
+  // "level_ptrs_" holds indices that remember which file of an associated
+  // level we were last checking during the last call to compaction->
+  // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
+  // to pick up where it left off, since each subcompaction's key range is
+  // increasing, so a later call to the function must be looking for a key
+  // that is in or beyond the last file checked during the previous call.
+  std::vector<size_t> level_ptrs_;
+  CompactionIteratorStats iter_stats_;
+};
+}  // namespace rocksdb
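This header fixes the contract both call sites rely on: construct, call SeekToFirst() once, then alternate Valid()/key()/value()/Next(), checking status() at the end. A hedged sketch of how the two call sites configure the constructor differently (`MakeFlushIterator`/`MakeCompactionIterator` are hypothetical wrappers; all parameters are assumed to be set up by the caller exactly as BuildTable()/ProcessKeyValueCompaction() do):

```cpp
#include <memory>
#include <vector>

#include "db/compaction_iterator.h"

namespace rocksdb {
std::unique_ptr<CompactionIterator> MakeFlushIterator(
    Iterator* input, const Comparator* ucmp, MergeHelper* merge,
    std::vector<SequenceNumber>* snapshots, Env* env) {
  // Flush: no Compaction object, so no compaction filter, no obsolete
  // deletion-marker dropping, no bottommost-level seqno zeroing. Corrupt
  // internal keys are a hard error (expect_valid_internal_key == true).
  return std::unique_ptr<CompactionIterator>(new CompactionIterator(
      input, ucmp, merge, kMaxSequenceNumber, snapshots, env,
      true /* expect_valid_internal_key */));
}

std::unique_ptr<CompactionIterator> MakeCompactionIterator(
    Iterator* input, const Comparator* ucmp, MergeHelper* merge,
    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
    Env* env, Statistics* stats, Compaction* compaction,
    const CompactionFilter* filter) {
  // Compaction: the Compaction object enables deletion-marker dropping via
  // KeyNotExistsBeyondOutputLevel(); corrupt keys pass through and are
  // counted in iter_stats().num_input_corrupt_records.
  return std::unique_ptr<CompactionIterator>(new CompactionIterator(
      input, ucmp, merge, last_sequence, snapshots, env,
      false /* expect_valid_internal_key */, stats, compaction, filter));
}
}  // namespace rocksdb
```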
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index b1a0c4878..7eb2361e8 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -31,12 +31,12 @@
 #include "db/log_reader.h"
 #include "db/log_writer.h"
 #include "db/memtable.h"
-#include "db/merge_helper.h"
 #include "db/memtable_list.h"
 #include "db/merge_context.h"
+#include "db/merge_helper.h"
 #include "db/version_set.h"
-#include "port/port.h"
 #include "port/likely.h"
+#include "port/port.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
 #include "rocksdb/statistics.h"
@@ -46,14 +46,13 @@
 #include "table/block_based_table_factory.h"
 #include "table/merger.h"
 #include "table/table_builder.h"
-#include "table/two_level_iterator.h"
 #include "util/coding.h"
 #include "util/file_reader_writer.h"
-#include "util/logging.h"
+#include "util/iostats_context_imp.h"
 #include "util/log_buffer.h"
+#include "util/logging.h"
 #include "util/mutexlock.h"
 #include "util/perf_context_imp.h"
-#include "util/iostats_context_imp.h"
 #include "util/stop_watch.h"
 #include "util/string_util.h"
 #include "util/sync_point.h"
@@ -64,6 +63,7 @@ namespace rocksdb {
 // Maintains state for each sub-compaction
 struct CompactionJob::SubcompactionState {
   Compaction* compaction;
+  std::unique_ptr<CompactionIterator> c_iter;
 
   // The boundaries of the key-range this compaction is interested in. No two
   // subcompactions may have overlapping key-ranges.
@@ -75,12 +75,8 @@ struct CompactionJob::SubcompactionState {
 
   // Files produced by this subcompaction
   struct Output {
-    uint64_t number;
-    uint32_t path_id;
-    uint64_t file_size;
-    InternalKey smallest, largest;
-    SequenceNumber smallest_seqno, largest_seqno;
-    bool need_compaction;
+    FileMetaData meta;
+    bool finished;
   };
 
   // State kept for output being generated
@@ -89,7 +85,7 @@ struct CompactionJob::SubcompactionState {
   std::unique_ptr<TableBuilder> builder;
   Output* current_output() {
     if (outputs.empty()) {
-      // This subcompaction's ouptut could be empty if compaction was aborted
+      // This subcompaction's output could be empty if compaction was aborted
       // before this subcompaction had a chance to generate any output files.
       // When subcompactions are executed sequentially this is more likely and
       // will be particulalry likely for the later subcompactions to be empty.
@@ -107,14 +103,6 @@ struct CompactionJob::SubcompactionState {
   CompactionJobStats compaction_job_stats;
   uint64_t approx_size;
 
-  // "level_ptrs" holds indices that remember which file of an associated
-  // level we were last checking during the last call to compaction->
-  // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
-  // to pick off where it left off since each subcompaction's key range is
-  // increasing so a later call to the function must be looking for a key that
-  // is in or beyond the last file checked during the previous call
-  std::vector<size_t> level_ptrs;
-
   SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
                      uint64_t size = 0)
       : compaction(c),
@@ -127,7 +115,6 @@ struct CompactionJob::SubcompactionState {
         num_output_records(0),
         approx_size(size) {
     assert(compaction != nullptr);
-    level_ptrs = std::vector<size_t>(compaction->number_levels(), 0);
   }
 
   SubcompactionState(SubcompactionState&& o) { *this = std::move(o); }
@@ -145,7 +132,6 @@ struct CompactionJob::SubcompactionState {
     num_output_records = std::move(o.num_output_records);
     compaction_job_stats = std::move(o.compaction_job_stats);
     approx_size = std::move(o.approx_size);
-    level_ptrs = std::move(o.level_ptrs);
     return *this;
   }
@@ -183,21 +169,25 @@ struct CompactionJob::CompactionState {
   }
 
   Slice SmallestUserKey() {
-    for (auto& s : sub_compact_states) {
-      if (!s.outputs.empty()) {
-        return s.outputs[0].smallest.user_key();
+    for (const auto& sub_compact_state : sub_compact_states) {
+      if (!sub_compact_state.outputs.empty() &&
+          sub_compact_state.outputs[0].finished) {
+        return sub_compact_state.outputs[0].meta.smallest.user_key();
       }
     }
+    // If there is no finished output, return an empty slice.
     return Slice(nullptr, 0);
   }
 
   Slice LargestUserKey() {
-    for (int i = static_cast<int>(sub_compact_states.size() - 1); i >= 0; i--) {
-      if (!sub_compact_states[i].outputs.empty()) {
-        assert(sub_compact_states[i].current_output() != nullptr);
-        return sub_compact_states[i].current_output()->largest.user_key();
+    for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
+         ++it) {
+      if (!it->outputs.empty() && it->current_output()->finished) {
+        assert(it->current_output() != nullptr);
+        return it->current_output()->meta.largest.user_key();
       }
     }
+    // If there is no finished output, return an empty slice.
     return Slice(nullptr, 0);
   }
 };
@@ -313,22 +303,6 @@ void CompactionJob::Prepare() {
   // Is this compaction producing files at the bottommost level?
   bottommost_level_ = c->bottommost_level();
 
-  // Initialize subcompaction states
-  latest_snapshot_ = 0;
-  visible_at_tip_ = 0;
-
-  if (existing_snapshots_.size() == 0) {
-    // optimize for fast path if there are no snapshots
-    visible_at_tip_ = versions_->LastSequence();
-    earliest_snapshot_ = visible_at_tip_;
-  } else {
-    latest_snapshot_ = existing_snapshots_.back();
-    // Add the current seqno as the 'latest' virtual
-    // snapshot to the end of this list.
-    existing_snapshots_.push_back(versions_->LastSequence());
-    earliest_snapshot_ = existing_snapshots_[0];
-  }
-
   if (c->ShouldFormSubcompactions()) {
     const uint64_t start_micros = env_->NowMicros();
     GenSubcompactionBoundaries();
@@ -597,15 +571,15 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
 
 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   assert(sub_compact != nullptr);
-  std::unique_ptr<Iterator> input_ptr(
+  std::unique_ptr<Iterator> input(
       versions_->MakeInputIterator(sub_compact->compaction));
-  Iterator* input = input_ptr.get();
 
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
 
   // I/O measurement variables
   PerfLevel prev_perf_level = PerfLevel::kEnableTime;
+  const uint64_t kRecordStatsEvery = 1000;
   uint64_t prev_write_nanos = 0;
   uint64_t prev_fsync_nanos = 0;
   uint64_t prev_range_sync_nanos = 0;
@@ -619,17 +593,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     prev_prepare_write_nanos = iostats_context.prepare_write_nanos;
   }
 
-  // Variables used inside the loop
-  Status status;
-  std::string compaction_filter_value;
-  ParsedInternalKey ikey;
-  IterKey current_user_key;
-  bool has_current_user_key = false;
-  IterKey delete_key;
-
-  SequenceNumber last_sequence_for_key __attribute__((unused)) =
-      kMaxSequenceNumber;
-  SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
   MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator,
                     db_options_.info_log.get(),
@@ -645,200 +608,93 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
 
   TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
 
-  int64_t key_drop_user = 0;
-  int64_t key_drop_newer_entry = 0;
-  int64_t key_drop_obsolete = 0;
-  int64_t loop_cnt = 0;
-
-  StopWatchNano timer(env_, stats_ != nullptr);
-  uint64_t total_filter_time = 0;
-
   Slice* start = sub_compact->start;
   Slice* end = sub_compact->end;
   if (start != nullptr) {
     IterKey start_iter;
     start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
-    Slice start_key = start_iter.GetKey();
-    input->Seek(start_key);
+    input->Seek(start_iter.GetKey());
   } else {
     input->SeekToFirst();
   }
 
+  Status status;
+  sub_compact->c_iter.reset(new CompactionIterator(
+      input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
+      &existing_snapshots_, env_, false, db_options_.statistics.get(),
+      sub_compact->compaction, compaction_filter));
+  auto c_iter = sub_compact->c_iter.get();
+  c_iter->SeekToFirst();
+  const auto& c_iter_stats = c_iter->iter_stats();
+
   // TODO(noetzli): check whether we could check !shutting_down_->... only
   // occasionally (see diff D42687)
-  while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
-         !cfd->IsDropped() && status.ok()) {
-    Slice key = input->key();
-    Slice value = input->value();
-
-    // First check that the key is parseable before performing the comparison
-    // to determine if it's within the range we want. Parsing may fail if the
-    // key being passed in is a user key without any internal key component
-    if (!ParseInternalKey(key, &ikey)) {
-      // Do not hide error keys
-      // TODO: error key stays in db forever? Figure out the rationale
-      // v10 error v8 : we cannot hide v8 even though it's pretty obvious.
-      current_user_key.Clear();
-      has_current_user_key = false;
-      last_sequence_for_key = kMaxSequenceNumber;
-      visible_in_snapshot = kMaxSequenceNumber;
-      sub_compact->compaction_job_stats.num_corrupt_keys++;
-
-      status = WriteKeyValue(key, value, ikey, input->status(), sub_compact);
-      input->Next();
-      continue;
-    }
+  while (status.ok() && !shutting_down_->load(std::memory_order_acquire) &&
+         !cfd->IsDropped() && c_iter->Valid()) {
+    // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
+    // returns true.
+    const Slice& key = c_iter->key();
+    const Slice& value = c_iter->value();
 
     // If an end key (exclusive) is specified, check if the current key is
     // >= than it and exit if it is because the iterator is out of its range
     if (end != nullptr &&
-        cfd->user_comparator()->Compare(ikey.user_key, *end) >= 0) {
+        cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
       break;
-    }
-
-    sub_compact->num_input_records++;
-    if (++loop_cnt > 1000) {
-      RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry,
-                        &key_drop_obsolete,
-                        &sub_compact->compaction_job_stats);
-      RecordCompactionIOStats();
-      loop_cnt = 0;
-    }
-
-    sub_compact->compaction_job_stats.total_input_raw_key_bytes += key.size();
-    sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
-        value.size();
-
-    if (sub_compact->compaction->ShouldStopBefore(key) &&
-        sub_compact->builder != nullptr) {
+    } else if (sub_compact->compaction->ShouldStopBefore(key) &&
+               sub_compact->builder != nullptr) {
       status = FinishCompactionOutputFile(input->status(), sub_compact);
       if (!status.ok()) {
         break;
       }
     }
 
-    if (ikey.type == kTypeDeletion) {
-      sub_compact->compaction_job_stats.num_input_deletion_records++;
-    }
-
-    if (!has_current_user_key ||
-        !cfd->user_comparator()->Equal(ikey.user_key,
-                                       current_user_key.GetKey())) {
-      // First occurrence of this user key
-      current_user_key.SetKey(ikey.user_key);
-      has_current_user_key = true;
-      last_sequence_for_key = kMaxSequenceNumber;
-      visible_in_snapshot = kMaxSequenceNumber;
-      // apply the compaction filter to the first occurrence of the user key
-      if (compaction_filter && ikey.type == kTypeValue &&
-          (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
-        // If the user has specified a compaction filter and the sequence
-        // number is greater than any external snapshot, then invoke the
-        // filter. If the return value of the compaction filter is true,
-        // replace the entry with a deletion marker.
-        bool value_changed = false;
-        compaction_filter_value.clear();
-        if (stats_ != nullptr) {
-          timer.Start();
-        }
-        bool to_delete = compaction_filter->Filter(
-            sub_compact->compaction->level(), ikey.user_key, value,
-            &compaction_filter_value, &value_changed);
-        total_filter_time += timer.ElapsedNanos();
-        if (to_delete) {
-          // make a copy of the original key and convert it to a delete
-          delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
-                                    kTypeDeletion);
-          // anchor the key again
-          key = delete_key.GetKey();
-          // needed because ikey is backed by key
-          ParseInternalKey(key, &ikey);
-          // no value associated with delete
-          value.clear();
-          ++key_drop_user;
-        } else if (value_changed) {
-          value = compaction_filter_value;
-        }
-      }
+    if (c_iter_stats.num_input_records % kRecordStatsEvery ==
+        kRecordStatsEvery - 1) {
+      RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
+      c_iter->ResetRecordCounts();
+      RecordCompactionIOStats();
     }
 
-    // If there are no snapshots, then this kv affect visibility at tip.
-    // Otherwise, search though all existing snapshots to find
-    // the earlist snapshot that is affected by this kv.
-    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
-    SequenceNumber visible =
-        visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot(
-                                                ikey.sequence, &prev_snapshot);
-
-    if (visible_in_snapshot == visible) {
-      // If the earliest snapshot is which this key is visible in
-      // is the same as the visibily of a previous instance of the
-      // same key, then this kv is not visible in any snapshot.
-      // Hidden by an newer entry for same user key
-      // TODO: why not > ?
-      assert(last_sequence_for_key >= ikey.sequence);
-      ++key_drop_newer_entry;
-      input->Next();  // (A)
-    } else if (ikey.type == kTypeDeletion &&
-               ikey.sequence <= earliest_snapshot_ &&
-               sub_compact->compaction->KeyNotExistsBeyondOutputLevel(
-                   ikey.user_key, &sub_compact->level_ptrs)) {
-      // For this user key:
-      // (1) there is no data in higher levels
-      // (2) data in lower levels will have larger sequence numbers
-      // (3) data in layers that are being compacted here and have
-      //     smaller sequence numbers will be dropped in the next
-      //     few iterations of this loop (by rule (A) above).
-      // Therefore this deletion marker is obsolete and can be dropped.
-      ++key_drop_obsolete;
-      input->Next();
-    } else if (ikey.type == kTypeMerge) {
-      if (!merge.HasOperator()) {
-        LogToBuffer(log_buffer_, "Options::merge_operator is null.");
-        status = Status::InvalidArgument(
-            "merge_operator is not properly initialized.");
+    // Open output file if necessary
+    if (sub_compact->builder == nullptr) {
+      status = OpenCompactionOutputFile(sub_compact);
+      if (!status.ok()) {
         break;
       }
-      // We know the merge type entry is not hidden, otherwise we would
-      // have hit (A)
-      // We encapsulate the merge related state machine in a different
-      // object to minimize change to the existing flow. Turn out this
-      // logic could also be nicely re-used for memtable flush purge
-      // optimization in BuildTable.
-      merge.MergeUntil(input, prev_snapshot, bottommost_level_,
-                       db_options_.statistics.get(), env_);
-
-      // NOTE: key, value, and ikey refer to old entries.
-      // These will be correctly set below.
-      const auto& keys = merge.keys();
-      const auto& values = merge.values();
-      assert(!keys.empty());
-      assert(keys.size() == values.size());
-
-      // We have a list of keys to write, write all keys in the list.
-      for (auto key_iter = keys.rbegin(), value_iter = values.rbegin();
-           !status.ok() || key_iter != keys.rend(); key_iter++, value_iter++) {
-        key = Slice(*key_iter);
-        value = Slice(*value_iter);
-        bool valid_key __attribute__((__unused__)) =
-            ParseInternalKey(key, &ikey);
-        // MergeUntil stops when it encounters a corrupt key and does not
-        // include them in the result, so we expect the keys here to valid.
-        assert(valid_key);
-        status = WriteKeyValue(key, value, ikey, input->status(), sub_compact);
-      }
-    } else {
-      status = WriteKeyValue(key, value, ikey, input->status(), sub_compact);
-      input->Next();
+    }
+    assert(sub_compact->builder != nullptr);
+    assert(sub_compact->current_output() != nullptr);
+    sub_compact->builder->Add(key, value);
+    sub_compact->current_output()->meta.UpdateBoundaries(
+        key, c_iter->ikey().sequence);
+    sub_compact->num_output_records++;
+
+    // Close output file if it is big enough
+    // TODO(aekmekji): determine if file should be closed earlier than this
+    // during subcompactions (i.e. if output size, estimated by input size, is
+    // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
+    // and 0.6MB instead of 1MB and 0.2MB)
+    if (sub_compact->builder->FileSize() >=
+        sub_compact->compaction->max_output_file_size()) {
+      status = FinishCompactionOutputFile(input->status(), sub_compact);
     }
 
-    last_sequence_for_key = ikey.sequence;
-    visible_in_snapshot = visible;
+    c_iter->Next();
   }
 
-  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
-  RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete,
-                    &sub_compact->compaction_job_stats);
+  sub_compact->num_input_records = c_iter_stats.num_input_records;
+  sub_compact->compaction_job_stats.num_input_deletion_records =
+      c_iter_stats.num_input_deletion_records;
+  sub_compact->compaction_job_stats.num_corrupt_keys =
+      c_iter_stats.num_input_corrupt_records;
+  sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
+      c_iter_stats.total_input_raw_key_bytes;
+  sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
+      c_iter_stats.total_input_raw_value_bytes;
+
+  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
+             c_iter_stats.total_filter_time);
+  RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
   RecordCompactionIOStats();
 
   if (status.ok() &&
@@ -867,91 +723,33 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     }
   }
 
-  input_ptr.reset();
+  sub_compact->c_iter.reset();
+  input.reset();
   sub_compact->status = status;
 }
 
-Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value,
-                                    const ParsedInternalKey& ikey,
-                                    const Status& input_status,
-                                    SubcompactionState* sub_compact) {
-  Slice newkey(key.data(), key.size());
-  std::string kstr;
-
-  // Zeroing out the sequence number leads to better compression.
-  // If this is the bottommost level (no files in lower levels)
-  // and the earliest snapshot is larger than this seqno
-  // then we can squash the seqno to zero.
-  if (bottommost_level_ && ikey.sequence < earliest_snapshot_ &&
-      ikey.type != kTypeMerge) {
-    assert(ikey.type != kTypeDeletion);
-    // make a copy because updating in place would cause problems
-    // with the priority queue that is managing the input key iterator
-    kstr.assign(key.data(), key.size());
-    UpdateInternalKey(&kstr, (uint64_t)0, ikey.type);
-    newkey = Slice(kstr);
-  }
-
-  // Open output file if necessary
-  if (sub_compact->builder == nullptr) {
-    Status status = OpenCompactionOutputFile(sub_compact);
-    if (!status.ok()) {
-      return status;
-    }
-  }
-  assert(sub_compact->builder != nullptr);
-  assert(sub_compact->current_output() != nullptr);
-
-  SequenceNumber seqno = GetInternalKeySeqno(newkey);
-  if (sub_compact->builder->NumEntries() == 0) {
-    sub_compact->current_output()->smallest.DecodeFrom(newkey);
-    sub_compact->current_output()->smallest_seqno = seqno;
-  } else {
-    sub_compact->current_output()->smallest_seqno =
-        std::min(sub_compact->current_output()->smallest_seqno, seqno);
-  }
-  sub_compact->current_output()->largest.DecodeFrom(newkey);
-  sub_compact->builder->Add(newkey, value);
-  sub_compact->num_output_records++;
-  sub_compact->current_output()->largest_seqno =
-      std::max(sub_compact->current_output()->largest_seqno, seqno);
-
-  // Close output file if it is big enough
-  // TODO(aekmekji): determine if file should be closed earlier than this
-  // during subcompactions (i.e. if output size, estimated by input size, is
-  // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
-  // and 0.6MB instead of 1MB and 0.2MB)
-  Status status;
-  if (sub_compact->builder->FileSize() >=
-      sub_compact->compaction->max_output_file_size()) {
-    status = FinishCompactionOutputFile(input_status, sub_compact);
-  }
-
-  return status;
-}
-
 void CompactionJob::RecordDroppedKeys(
-    int64_t* key_drop_user,
-    int64_t* key_drop_newer_entry,
-    int64_t* key_drop_obsolete,
+    const CompactionIteratorStats& c_iter_stats,
     CompactionJobStats* compaction_job_stats) {
-  if (*key_drop_user > 0) {
-    RecordTick(stats_, COMPACTION_KEY_DROP_USER, *key_drop_user);
-    *key_drop_user = 0;
+  if (c_iter_stats.num_record_drop_user > 0) {
+    RecordTick(stats_, COMPACTION_KEY_DROP_USER,
+               c_iter_stats.num_record_drop_user);
   }
-  if (*key_drop_newer_entry > 0) {
-    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, *key_drop_newer_entry);
+  if (c_iter_stats.num_record_drop_hidden > 0) {
+    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
+               c_iter_stats.num_record_drop_hidden);
     if (compaction_job_stats) {
-      compaction_job_stats->num_records_replaced += *key_drop_newer_entry;
+      compaction_job_stats->num_records_replaced +=
+          c_iter_stats.num_record_drop_hidden;
     }
-    *key_drop_newer_entry = 0;
   }
-  if (*key_drop_obsolete > 0) {
-    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, *key_drop_obsolete);
+  if (c_iter_stats.num_record_drop_obsolete > 0) {
+    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
+               c_iter_stats.num_record_drop_obsolete);
    if (compaction_job_stats) {
-      compaction_job_stats->num_expired_deletion_records += *key_drop_obsolete;
+      compaction_job_stats->num_expired_deletion_records +=
+          c_iter_stats.num_record_drop_obsolete;
     }
-    *key_drop_obsolete = 0;
   }
 }
@@ -964,23 +762,23 @@ Status CompactionJob::FinishCompactionOutputFile(
   assert(sub_compact->builder != nullptr);
   assert(sub_compact->current_output() != nullptr);
 
-  const uint64_t output_number = sub_compact->current_output()->number;
-  const uint32_t output_path_id = sub_compact->current_output()->path_id;
+  uint64_t output_number =
+      sub_compact->current_output()->meta.fd.GetNumber();
   assert(output_number != 0);
 
   TableProperties table_properties;
   // Check for iterator errors
   Status s = input_status;
+  auto meta = &sub_compact->current_output()->meta;
   const uint64_t current_entries = sub_compact->builder->NumEntries();
-  sub_compact->current_output()->need_compaction =
-      sub_compact->builder->NeedCompact();
+  meta->marked_for_compaction = sub_compact->builder->NeedCompact();
   if (s.ok()) {
     s = sub_compact->builder->Finish();
   } else {
     sub_compact->builder->Abandon();
   }
   const uint64_t current_bytes = sub_compact->builder->FileSize();
-  sub_compact->current_output()->file_size = current_bytes;
+  meta->fd.file_size = current_bytes;
+  sub_compact->current_output()->finished = true;
   sub_compact->total_bytes += current_bytes;
 
   // Finish and check for file errors
@@ -996,11 +794,10 @@ Status CompactionJob::FinishCompactionOutputFile(
   if (s.ok() && current_entries > 0) {
     // Verify that the table is usable
     ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
-    FileDescriptor fd(output_number, output_path_id, current_bytes);
     Iterator* iter = cfd->table_cache()->NewIterator(
-        ReadOptions(), env_options_, cfd->internal_comparator(), fd, nullptr,
-        cfd->internal_stats()->GetFileReadHist(
-            compact_->compaction->output_level()),
+        ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
+        nullptr, cfd->internal_stats()->GetFileReadHist(
+                     compact_->compaction->output_level()),
         false);
     s = iter->status();
 
@@ -1014,19 +811,19 @@ Status CompactionJob::FinishCompactionOutputFile(
       TableFileCreationInfo info(sub_compact->builder->GetTableProperties());
       info.db_name = dbname_;
       info.cf_name = cfd->GetName();
-      info.file_path = TableFileName(cfd->ioptions()->db_paths,
-                                     fd.GetNumber(), fd.GetPathId());
-      info.file_size = fd.GetFileSize();
+      info.file_path =
+          TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
+                        meta->fd.GetPathId());
+      info.file_size = meta->fd.GetFileSize();
       info.job_id = job_id_;
       Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
           "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
          " keys, %" PRIu64 " bytes%s",
          cfd->GetName().c_str(), job_id_, output_number, current_entries,
          current_bytes,
-          sub_compact->current_output()->need_compaction ? " (need compaction)"
-                                                         : "");
+          meta->marked_for_compaction ? " (need compaction)" : "");
       EventHelpers::LogAndNotifyTableFileCreation(
-          event_logger_, cfd->ioptions()->listeners, fd, info);
+          event_logger_, cfd->ioptions()->listeners, meta->fd, info);
     }
   }
   sub_compact->builder.reset();
@@ -1063,13 +860,9 @@ Status CompactionJob::InstallCompactionResults(
 
   // Add compaction outputs
   compaction->AddInputDeletions(compact_->compaction->edit());
-  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
-    for (size_t i = 0; i < sub_compact.outputs.size(); i++) {
-      const SubcompactionState::Output& out = sub_compact.outputs[i];
-      compaction->edit()->AddFile(compaction->output_level(), out.number,
-                                  out.path_id, out.file_size, out.smallest,
-                                  out.largest, out.smallest_seqno,
-                                  out.largest_seqno, out.need_compaction);
+  for (const auto& sub_compact : compact_->sub_compact_states) {
+    for (const auto& out : sub_compact.outputs) {
+      compaction->edit()->AddFile(compaction->output_level(), out.meta);
     }
   }
   return versions_->LogAndApply(compaction->column_family_data(),
@@ -1077,34 +870,6 @@ Status CompactionJob::InstallCompactionResults(
                                 db_mutex, db_directory_);
 }
 
-// Given a sequence number, return the sequence number of the
-// earliest snapshot that this sequence number is visible in.
-// The snapshots themselves are arranged in ascending order of
-// sequence numbers.
-// Employ a sequential search because the total number of
-// snapshots are typically small.
-inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot(
-    SequenceNumber in, SequenceNumber* prev_snapshot) {
-  assert(existing_snapshots_.size());
-  SequenceNumber prev __attribute__((unused)) = 0;
-  for (const auto cur : existing_snapshots_) {
-    assert(prev <= cur);
-    if (cur >= in) {
-      *prev_snapshot = prev;
-      return cur;
-    }
-    prev = cur;  // assignment
-    assert(prev);
-  }
-  Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
-      "CompactionJob is not able to find snapshot"
-      " with SeqId later than %" PRIu64
-      ": current MaxSeqId is %" PRIu64 "",
-      in, existing_snapshots_[existing_snapshots_.size() - 1]);
-  assert(0);
-  return 0;
-}
-
 void CompactionJob::RecordCompactionIOStats() {
   RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
   ThreadStatusUtil::IncreaseThreadOperationProperty(
@@ -1137,11 +902,9 @@ Status CompactionJob::OpenCompactionOutputFile(
     return s;
   }
   SubcompactionState::Output out;
-  out.number = file_number;
-  out.path_id = sub_compact->compaction->output_path_id();
-  out.smallest.Clear();
-  out.largest.Clear();
-  out.smallest_seqno = out.largest_seqno = 0;
+  out.meta.fd =
+      FileDescriptor(file_number, sub_compact->compaction->output_path_id(), 0);
+  out.finished = false;
   sub_compact->outputs.push_back(out);
 
   writable_file->SetIOPriority(Env::IO_LOW);
@@ -1151,16 +914,11 @@ Status CompactionJob::OpenCompactionOutputFile(
       new WritableFileWriter(std::move(writable_file), env_options_));
   ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
 
-  bool skip_filters = false;
-
   // If the Column family flag is to only optimize filters for hits,
   // we can skip creating filters if this is the bottommost_level where
   // data is going to be found
-  //
-  if (cfd->ioptions()->optimize_filters_for_hits && bottommost_level_) {
-    skip_filters = true;
-  }
-
+  bool skip_filters =
+      cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
   sub_compact->builder.reset(NewTableBuilder(
       *cfd->ioptions(), cfd->internal_comparator(),
       cfd->int_tbl_prop_collector_factories(), sub_compact->outfile.get(),
@@ -1181,13 +939,11 @@ void CompactionJob::CleanupCompaction() {
     } else {
       assert(!sub_status.ok() || sub_compact.outfile == nullptr);
     }
-    for (size_t i = 0; i < sub_compact.outputs.size(); i++) {
-      const SubcompactionState::Output& out = sub_compact.outputs[i];
-
+    for (const auto& out : sub_compact.outputs) {
       // If this file was inserted into the table cache then remove
       // them here because this compaction was not committed.
       if (!sub_status.ok()) {
-        TableCache::Evict(table_cache_.get(), out.number);
+        TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
      }
     }
   }
@@ -1237,8 +993,8 @@ void CompactionJob::UpdateCompactionStats() {
   }
   compaction_stats_.num_output_files += static_cast<int>(num_output_files);
 
-  for (size_t i = 0; i < num_output_files; i++) {
-    compaction_stats_.bytes_written += sub_compact.outputs[i].file_size;
+  for (const auto& out : sub_compact.outputs) {
+    compaction_stats_.bytes_written += out.meta.fd.file_size;
   }
   if (sub_compact.num_input_records > sub_compact.num_output_records) {
     compaction_stats_.num_dropped_records +=
diff --git a/db/compaction_job.h b/db/compaction_job.h
index b3dd3e717..1054fecc9 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -18,6 +18,7 @@
 #include <vector>
 
 #include "db/column_family.h"
+#include "db/compaction_iterator.h"
 #include "db/dbformat.h"
 #include "db/flush_scheduler.h"
 #include "db/internal_stats.h"
@@ -91,25 +92,16 @@ class CompactionJob {
   // kv-pairs
   void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
 
-  Status WriteKeyValue(const Slice& key, const Slice& value,
-                       const ParsedInternalKey& ikey,
-                       const Status& input_status,
-                       SubcompactionState* sub_compact);
-
   Status FinishCompactionOutputFile(const Status& input_status,
                                     SubcompactionState* sub_compact);
   Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options,
                                   InstrumentedMutex* db_mutex);
-  SequenceNumber findEarliestVisibleSnapshot(SequenceNumber in,
-                                             SequenceNumber* prev_snapshot);
   void RecordCompactionIOStats();
   Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
   void CleanupCompaction();
   void UpdateCompactionJobStats(
       const InternalStats::CompactionStats& stats) const;
-  void RecordDroppedKeys(int64_t* key_drop_user,
-                         int64_t* key_drop_newer_entry,
-                         int64_t* key_drop_obsolete,
+  void RecordDroppedKeys(const CompactionIteratorStats& c_iter_stats,
                          CompactionJobStats* compaction_job_stats = nullptr);
 
   void UpdateCompactionStats();
@@ -124,15 +116,8 @@ class CompactionJob {
   struct CompactionState;
   CompactionState* compact_;
   CompactionJobStats* compaction_job_stats_;
-
-  bool bottommost_level_;
-
   InternalStats::CompactionStats compaction_stats_;
-  SequenceNumber earliest_snapshot_;
-  SequenceNumber latest_snapshot_;
-  SequenceNumber visible_at_tip_;
-
   // DBImpl state
   const std::string& dbname_;
   const DBOptions& db_options_;
@@ -153,6 +138,7 @@ class CompactionJob {
 
   EventLogger* event_logger_;
 
+  bool bottommost_level_;
   bool paranoid_file_checks_;
   bool measure_io_stats_;
   // Stores the Slices that designate the boundaries for each subcompaction
diff --git a/db/dbformat.h b/db/dbformat.h
index f54c8e226..c3ff2ef2f 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -166,6 +166,7 @@ class InternalKey {
   }
 
   Slice user_key() const { return ExtractUserKey(rep_); }
+  size_t size() { return rep_.size(); }
 
   void SetFrom(const ParsedInternalKey& p) {
     rep_.clear();
diff --git a/db/merge_helper.cc b/db/merge_helper.cc
index 7d5f4c1e9..a87584647 100644
--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -222,4 +222,24 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before,
   return s;
 }
 
+MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper)
+    : merge_helper_(merge_helper) {
+  it_keys_ = merge_helper_->keys().rend();
+  it_values_ = merge_helper_->values().rend();
+}
+
+void MergeOutputIterator::SeekToFirst() {
+  const auto& keys = merge_helper_->keys();
+  const auto& values = merge_helper_->values();
+  assert(keys.size() > 0);
+  assert(keys.size() == values.size());
+  it_keys_ = keys.rbegin();
+  it_values_ = values.rbegin();
+}
+
+void MergeOutputIterator::Next() {
+  ++it_keys_;
+  ++it_values_;
+}
+
 }  // namespace rocksdb
diff --git a/db/merge_helper.h b/db/merge_helper.h
index d88f3e356..a65639025 100644
--- a/db/merge_helper.h
+++ b/db/merge_helper.h
@@ -114,6 +114,27 @@ class MergeHelper {
   std::deque<std::string> operands_;  // Parallel with keys_; stores the values
 };
 
+// MergeOutputIterator can be used to iterate over the result of a merge.
+class MergeOutputIterator {
+ public:
+  // The MergeOutputIterator is bound to a MergeHelper instance.
+  explicit MergeOutputIterator(const MergeHelper* merge_helper);
+
+  // Seeks to the first record in the output.
+  void SeekToFirst();
+  // Advances to the next record in the output.
+  void Next();
+
+  Slice key() { return Slice(*it_keys_); }
+  Slice value() { return Slice(*it_values_); }
+  bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }
+
+ private:
+  const MergeHelper* merge_helper_;
+  std::deque<std::string>::const_reverse_iterator it_keys_;
+  std::deque<std::string>::const_reverse_iterator it_values_;
+};
+
 }  // namespace rocksdb
 
 #endif
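MergeOutputIterator encapsulates a reverse walk: MergeHelper keeps merge results newest-first in parallel deques, so emitting them in write order means iterating from rbegin() to rend(). The same pattern on plain deques, as a standalone illustration (hypothetical data, not the RocksDB API):

```cpp
// SeekToFirst() == rbegin(); Next() == ++; Valid() == it != rend().
#include <cassert>
#include <deque>
#include <string>

int main() {
  // Stand-ins for MergeHelper::keys()/values() after MergeUntil():
  // the newest entry sits at the front of each deque.
  std::deque<std::string> keys = {"k@seq3", "k@seq2", "k@seq1"};
  std::deque<std::string> values = {"v3", "v2", "v1"};
  assert(keys.size() == values.size());

  std::string emitted;
  auto it_keys = keys.rbegin();
  auto it_values = values.rbegin();
  for (; it_keys != keys.rend(); ++it_keys, ++it_values) {
    emitted += *it_keys + "=" + *it_values + " ";
  }
  // Oldest entry comes out first, matching the order records were written.
  assert(emitted == "k@seq1=v1 k@seq2=v2 k@seq3=v3 ");
  return 0;
}
```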
diff --git a/db/version_edit.h b/db/version_edit.h
index ea3a308b9..5c558409a 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -93,6 +93,8 @@ struct FileMetaData {
   FileMetaData()
       : refs(0),
         being_compacted(false),
+        smallest_seqno(kMaxSequenceNumber),
+        largest_seqno(0),
         table_reader_handle(nullptr),
         compensated_file_size(0),
         num_entries(0),
@@ -101,6 +103,17 @@ struct FileMetaData {
         raw_value_size(0),
         init_stats_from_file(false),
         marked_for_compaction(false) {}
+
+  // REQUIRED: Keys must be given to the function in sorted order (it expects
+  // the last key to be the largest).
+  void UpdateBoundaries(const Slice& key, SequenceNumber seqno) {
+    if (smallest.size() == 0) {
+      smallest.DecodeFrom(key);
+    }
+    largest.DecodeFrom(key);
+    smallest_seqno = std::min(smallest_seqno, seqno);
+    largest_seqno = std::max(largest_seqno, seqno);
+  }
 };
 
 // A compressed copy of file meta data that just contain
@@ -179,7 +192,12 @@ class VersionEdit {
     f.smallest_seqno = smallest_seqno;
     f.largest_seqno = largest_seqno;
     f.marked_for_compaction = marked_for_compaction;
-    new_files_.push_back(std::make_pair(level, f));
+    new_files_.emplace_back(level, f);
+  }
+
+  void AddFile(int level, const FileMetaData& f) {
+    assert(f.smallest_seqno <= f.largest_seqno);
+    new_files_.emplace_back(level, f);
   }
 
   // Delete the specified "file" from the specified "level".
diff --git a/src.mk b/src.mk
index cd0849d57..22b8ff9fa 100644
--- a/src.mk
+++ b/src.mk
@@ -5,6 +5,7 @@ LIB_SOURCES = \
   db/column_family.cc \
   db/compacted_db_impl.cc \
   db/compaction.cc \
+  db/compaction_iterator.cc \
   db/compaction_job.cc \
   db/compaction_picker.cc \
   db/convenience.cc \
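FileMetaData::UpdateBoundaries() in the version_edit.h hunk above replaces the scattered smallest/largest bookkeeping that builder.cc and compaction_job.cc each maintained by hand; seeding the seqnos with {kMaxSequenceNumber, 0} makes the min/max folds correct from the first key, so only the smallest key needs a special case. A toy model with hypothetical standalone types:

```cpp
// Standalone model of UpdateBoundaries(); not the RocksDB struct.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>

const uint64_t kMaxSequenceNumber = UINT64_MAX;

struct FileMeta {
  std::string smallest, largest;
  uint64_t smallest_seqno = kMaxSequenceNumber;
  uint64_t largest_seqno = 0;

  // Keys must arrive in sorted order: the last one seen is the largest.
  void UpdateBoundaries(const std::string& key, uint64_t seqno) {
    if (smallest.empty()) smallest = key;
    largest = key;
    smallest_seqno = std::min(smallest_seqno, seqno);
    largest_seqno = std::max(largest_seqno, seqno);
  }
};

int main() {
  FileMeta meta;
  meta.UpdateBoundaries("apple", 7);
  meta.UpdateBoundaries("pear", 3);
  meta.UpdateBoundaries("zoo", 9);
  assert(meta.smallest == "apple" && meta.largest == "zoo");
  assert(meta.smallest_seqno == 3 && meta.largest_seqno == 9);
  return 0;
}
```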