diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 2165044a9..97a24c5ca 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -210,29 +210,15 @@ Status CompactionJob::Run() {
       ThreadStatus::STAGE_COMPACTION_RUN);
   TEST_SYNC_POINT("CompactionJob::Run():Start");
   log_buffer_->FlushBufferToLog();
-  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
   auto* compaction = compact_->compaction;
-  LogCompaction(cfd, compaction);
+  LogCompaction(compaction->column_family_data(), compaction);
+  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
   const uint64_t start_micros = env_->NowMicros();
-  std::unique_ptr<Iterator> input(
-      versions_->MakeInputIterator(compact_->compaction));
-  input->SeekToFirst();
-  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
+  std::unique_ptr<Iterator> input(versions_->MakeInputIterator(compaction));
+  input->SeekToFirst();
   auto status = ProcessKeyValueCompaction(&imm_micros, input.get());
-
-  if (status.ok() &&
-      (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
-    status = Status::ShutdownInProgress(
-        "Database shutdown or Column family drop during compaction");
-  }
-  if (status.ok() && compact_->builder != nullptr) {
-    status = FinishCompactionOutputFile(input->status());
-  }
-  if (status.ok()) {
-    status = input->status();
-  }
   input.reset();

   if (output_directory_ && !db_options_.disableDataSync) {
@@ -331,7 +317,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
       false /* internal key corruption is expected */);
   auto compaction_filter = cfd->ioptions()->compaction_filter;
   std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
-  if (!compaction_filter) {
+  if (compaction_filter == nullptr) {
     compaction_filter_from_factory =
         compact_->compaction->CreateCompactionFilter();
     compaction_filter = compaction_filter_from_factory.get();
@@ -346,6 +332,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
   StopWatchNano timer(env_, stats_ != nullptr);
   uint64_t total_filter_time = 0;
+
+  // TODO(noetzli): check whether we could check !shutting_down_->... only
+  // occasionally (see diff D42687)
   while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
          !cfd->IsDropped() && status.ok()) {
     compact_->num_input_records++;
@@ -360,10 +349,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
     Slice value = input->value();

     if (compaction_job_stats_ != nullptr) {
-      compaction_job_stats_->total_input_raw_key_bytes +=
-          input->key().size();
-      compaction_job_stats_->total_input_raw_value_bytes +=
-          input->value().size();
+      compaction_job_stats_->total_input_raw_key_bytes += key.size();
+      compaction_job_stats_->total_input_raw_value_bytes += value.size();
     }

     if (compact_->compaction->ShouldStopBefore(key) &&
@@ -375,8 +362,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
     }

     // Handle key/value, add to state, etc.
-    bool drop = false;
-    bool current_entry_is_merging = false;
     if (!ParseInternalKey(key, &ikey)) {
       // Do not hide error keys
       // TODO: error key stays in db forever?
      //       Figure out the intention/rationale
@@ -389,177 +374,157 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
       if (compaction_job_stats_ != nullptr) {
         compaction_job_stats_->num_corrupt_keys++;
       }
-    } else {
-      if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) {
-        compaction_job_stats_->num_input_deletion_records++;
-      }
-      if (!has_current_user_key ||
-          cfd->user_comparator()->Compare(ikey.user_key,
-                                          current_user_key.GetKey()) != 0) {
-        // First occurrence of this user key
-        current_user_key.SetKey(ikey.user_key);
-        has_current_user_key = true;
-        last_sequence_for_key = kMaxSequenceNumber;
-        visible_in_snapshot = kMaxSequenceNumber;
-        // apply the compaction filter to the first occurrence of the user key
-        if (compaction_filter && ikey.type == kTypeValue &&
-            (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
-          // If the user has specified a compaction filter and the sequence
-          // number is greater than any external snapshot, then invoke the
-          // filter.
-          // If the return value of the compaction filter is true, replace
-          // the entry with a delete marker.
-          bool value_changed = false;
-          compaction_filter_value.clear();
-          if (stats_ != nullptr) {
-            timer.Start();
-          }
-          bool to_delete = compaction_filter->Filter(
-              compact_->compaction->level(), ikey.user_key, value,
-              &compaction_filter_value, &value_changed);
-          total_filter_time += timer.ElapsedNanos();
-          if (to_delete) {
-            // make a copy of the original key and convert it to a delete
-            delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
-                                      kTypeDeletion);
-            // anchor the key again
-            key = delete_key.GetKey();
-            // needed because ikey is backed by key
-            ParseInternalKey(key, &ikey);
-            // no value associated with delete
-            value.clear();
-            ++key_drop_user;
-          } else if (value_changed) {
-            value = compaction_filter_value;
-          }
-        }
-      }
+      status = WriteKeyValue(key, value, ikey, input->status());
+      input->Next();
+      continue;
+    }

-      // If there are no snapshots, then this kv affect visibility at tip.
-      // Otherwise, search though all existing snapshots to find
-      // the earlist snapshot that is affected by this kv.
-      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
-      SequenceNumber visible =
-          visible_at_tip_
-              ? visible_at_tip_
-              : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
-                                            &prev_snapshot);
-
-      if (visible_in_snapshot == visible) {
-        // If the earliest snapshot is which this key is visible in
-        // is the same as the visibily of a previous instance of the
-        // same key, then this kv is not visible in any snapshot.
-        // Hidden by an newer entry for same user key
-        // TODO: why not > ?
-        assert(last_sequence_for_key >= ikey.sequence);
-        drop = true;  // (A)
-        ++key_drop_newer_entry;
-      } else if (ikey.type == kTypeDeletion &&
-                 ikey.sequence <= earliest_snapshot_ &&
-                 compact_->compaction->KeyNotExistsBeyondOutputLevel(
-                     ikey.user_key)) {
-        // For this user key:
-        // (1) there is no data in higher levels
-        // (2) data in lower levels will have larger sequence numbers
-        // (3) data in layers that are being compacted here and have
-        //     smaller sequence numbers will be dropped in the next
-        //     few iterations of this loop (by rule (A) above).
-        // Therefore this deletion marker is obsolete and can be dropped.
-        drop = true;
-        ++key_drop_obsolete;
-      } else if (ikey.type == kTypeMerge) {
-        if (!merge.HasOperator()) {
-          LogToBuffer(log_buffer_, "Options::merge_operator is null.");
-          status = Status::InvalidArgument(
-              "merge_operator is not properly initialized.");
-          break;
+    if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) {
+      compaction_job_stats_->num_input_deletion_records++;
+    }
+
+    if (!has_current_user_key ||
+        cfd->user_comparator()->Compare(ikey.user_key,
+                                        current_user_key.GetKey()) != 0) {
+      // First occurrence of this user key
+      current_user_key.SetKey(ikey.user_key);
+      has_current_user_key = true;
+      last_sequence_for_key = kMaxSequenceNumber;
+      visible_in_snapshot = kMaxSequenceNumber;
+      // apply the compaction filter to the first occurrence of the user key
+      if (compaction_filter && ikey.type == kTypeValue &&
+          (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
+        // If the user has specified a compaction filter and the sequence
+        // number is greater than any external snapshot, then invoke the
+        // filter.
+        // If the return value of the compaction filter is true, replace
+        // the entry with a delete marker.
+        bool value_changed = false;
+        compaction_filter_value.clear();
+        if (stats_ != nullptr) {
+          timer.Start();
         }
-        // We know the merge type entry is not hidden, otherwise we would
-        // have hit (A)
-        // We encapsulate the merge related state machine in a different
-        // object to minimize change to the existing flow. Turn out this
-        // logic could also be nicely re-used for memtable flush purge
-        // optimization in BuildTable.
-        merge.MergeUntil(input, prev_snapshot, bottommost_level_,
-                         db_options_.statistics.get(), nullptr, env_);
-
-        current_entry_is_merging = true;
-        if (merge.IsSuccess()) {
-          // Successfully found Put/Delete/(end-of-key-range) while merging
-          // Get the merge result
-          key = merge.key();
+        bool to_delete = compaction_filter->Filter(
+            compact_->compaction->level(), ikey.user_key, value,
+            &compaction_filter_value, &value_changed);
+        total_filter_time += timer.ElapsedNanos();
+        if (to_delete) {
+          // make a copy of the original key and convert it to a delete
+          delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
+                                    kTypeDeletion);
+          // anchor the key again
+          key = delete_key.GetKey();
+          // needed because ikey is backed by key
           ParseInternalKey(key, &ikey);
-          value = merge.value();
-        } else {
-          // Did not find a Put/Delete/(end-of-key-range) while merging
-          // We now have some stack of merge operands to write out.
-          // NOTE: key,value, and ikey are now referring to old entries.
-          // These will be correctly set below.
-          assert(!merge.keys().empty());
-          assert(merge.keys().size() == merge.values().size());
-
-          // Hack to make sure last_sequence_for_key is correct
-          ParseInternalKey(merge.keys().front(), &ikey);
+          // no value associated with delete
+          value.clear();
+          ++key_drop_user;
+        } else if (value_changed) {
+          value = compaction_filter_value;
         }
       }
-
-      last_sequence_for_key = ikey.sequence;
-      visible_in_snapshot = visible;
     }

-    if (!drop) {
-      // We may write a single key (e.g.: for Put/Delete or successful merge).
-      // Or we may instead have to write a sequence/list of keys.
-      // We have to write a sequence iff we have an unsuccessful merge
-      if (current_entry_is_merging && !merge.IsSuccess()) {
+    // If there are no snapshots, then this kv affects visibility at tip.
+    // Otherwise, search through all existing snapshots to find
+    // the earliest snapshot that is affected by this kv.
+    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
+    SequenceNumber visible =
+        visible_at_tip_
+            ? visible_at_tip_
+            : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
+                                          &prev_snapshot);
+
+    if (visible_in_snapshot == visible) {
+      // If the earliest snapshot in which this key is visible is the same
+      // as the visibility of a previous instance of the same key, then this
+      // kv is not visible in any snapshot.
+      // Hidden by a newer entry for the same user key
+      // TODO: why not > ?
+      assert(last_sequence_for_key >= ikey.sequence);
+      ++key_drop_newer_entry;
+      input->Next();  // (A)
+    } else if (ikey.type == kTypeDeletion &&
+               ikey.sequence <= earliest_snapshot_ &&
+               compact_->compaction->KeyNotExistsBeyondOutputLevel(
+                   ikey.user_key)) {
+      // For this user key:
+      // (1) there is no data in higher levels
+      // (2) data in lower levels will have larger sequence numbers
+      // (3) data in layers that are being compacted here and have
+      //     smaller sequence numbers will be dropped in the next
+      //     few iterations of this loop (by rule (A) above).
+      // Therefore this deletion marker is obsolete and can be dropped.
+      ++key_drop_obsolete;
+      input->Next();
+    } else if (ikey.type == kTypeMerge) {
+      if (!merge.HasOperator()) {
+        LogToBuffer(log_buffer_, "Options::merge_operator is null.");
+        status = Status::InvalidArgument(
+            "merge_operator is not properly initialized.");
+        break;
+      }
+      // We know the merge type entry is not hidden, otherwise we would
+      // have hit (A)
+      // We encapsulate the merge related state machine in a different
+      // object to minimize change to the existing flow. Turns out this
+      // logic could also be nicely re-used for memtable flush purge
+      // optimization in BuildTable.
+      merge.MergeUntil(input, prev_snapshot, bottommost_level_,
+                       db_options_.statistics.get(), env_);
+
+      if (merge.IsSuccess()) {
+        // Successfully found Put/Delete/(end-of-key-range) while merging
+        // Get the merge result
+        key = merge.key();
+        ParseInternalKey(key, &ikey);
+        status = WriteKeyValue(key, merge.value(), ikey, input->status());
+      } else {
+        // Did not find a Put/Delete/(end-of-key-range) while merging
+        // We now have some stack of merge operands to write out.
+        // NOTE: key, value, and ikey are now referring to old entries.
+        // These will be correctly set below.
         const auto& keys = merge.keys();
         const auto& values = merge.values();
-        std::deque<std::string>::const_reverse_iterator key_iter =
-            keys.rbegin();  // The back (*rbegin()) is the first key
-        std::deque<std::string>::const_reverse_iterator value_iter =
-            values.rbegin();
-
-        key = Slice(*key_iter);
-        value = Slice(*value_iter);
-
-        // We have a list of keys to write, traverse the list.
-        while (true) {
-          status = WriteKeyValue(key, value, ikey, input->status());
-          if (!status.ok()) {
-            break;
-          }
-
-          ++key_iter;
-          ++value_iter;
+        assert(!keys.empty());
+        assert(keys.size() == values.size());

-          // If at end of list
-          if (key_iter == keys.rend() || value_iter == values.rend()) {
-            // Sanity Check: if one ends, then both end
-            assert(key_iter == keys.rend() && value_iter == values.rend());
-            break;
-          }
-
-          // Otherwise not at end of list. Update key, value, and ikey.
+        // We have a list of keys to write, write all keys in the list.
+        for (auto key_iter = keys.rbegin(), value_iter = values.rbegin();
+             status.ok() && key_iter != keys.rend();
+             key_iter++, value_iter++) {
           key = Slice(*key_iter);
           value = Slice(*value_iter);
           ParseInternalKey(key, &ikey);
+          status = WriteKeyValue(key, value, ikey, input->status());
         }
-      } else {
-        // There is only one item to be written out
-        status = WriteKeyValue(key, value, ikey, input->status());
       }
-    }  // if (!drop)
-
-    // MergeUntil has moved input to the next entry
-    if (!current_entry_is_merging) {
+    } else {
+      status = WriteKeyValue(key, value, ikey, input->status());
       input->Next();
     }
+
+    last_sequence_for_key = ikey.sequence;
+    visible_in_snapshot = visible;
   }
+
   RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
   RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete);
   RecordCompactionIOStats();

+  if (status.ok() &&
+      (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
+    status = Status::ShutdownInProgress(
+        "Database shutdown or Column family drop during compaction");
+  }
+  if (status.ok() && compact_->builder != nullptr) {
+    status = FinishCompactionOutputFile(input->status());
+  }
+  if (status.ok()) {
+    status = input->status();
+  }
+
   return status;
 }
diff --git a/db/merge_helper.cc b/db/merge_helper.cc
index cf58c6812..c8a4b140c 100644
--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -58,8 +58,8 @@ Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value,
 // keys_ stores the list of keys encountered while merging.
 // operands_ stores the list of merge operands encountered while merging.
 // keys_[i] corresponds to operands_[i] for each i.
-void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
-                             bool at_bottom, Statistics* stats, int* steps,
+void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before,
+                             const bool at_bottom, Statistics* stats,
                              Env* env_) {
   // Get a copy of the internal key, before it's invalidated by iter->Next()
   // Also maintain the list of merge operands seen.
@@ -81,9 +81,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
   ParseInternalKey(keys_.back(), &orig_ikey);

   bool hit_the_next_user_key = false;
-  if (steps) {
-    ++(*steps);
-  }
   for (iter->Next(); iter->Valid(); iter->Next()) {
     ParsedInternalKey ikey;
     assert(operands_.size() >= 1);  // Should be invariants!
@@ -138,9 +135,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,

         // move iter to the next entry
         iter->Next();
-        if (steps) {
-          ++(*steps);
-        }
         return;
       } else {
         // hit a merge
@@ -153,9 +147,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
       // request or later did a partial merge.
       keys_.push_front(iter->key().ToString());
       operands_.push_front(iter->value().ToString());
-      if (steps) {
-        ++(*steps);
-      }
     }
   }
diff --git a/db/merge_helper.h b/db/merge_helper.h
index 7722446dd..8ad6acc07 100644
--- a/db/merge_helper.h
+++ b/db/merge_helper.h
@@ -6,11 +6,12 @@
 #ifndef MERGE_HELPER_H
 #define MERGE_HELPER_H

-#include "db/dbformat.h"
-#include "rocksdb/slice.h"
-#include <string>
 #include <deque>
+#include <string>
+
+#include "db/dbformat.h"
 #include "rocksdb/env.h"
+#include "rocksdb/slice.h"

 namespace rocksdb {

@@ -56,9 +57,9 @@ class MergeHelper {
   //                0 means no restriction
   //   at_bottom:   (IN) true if the iterator covers the bottem level, which means
   //                we could reach the start of the history of this user key.
-  void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
-                  bool at_bottom = false, Statistics* stats = nullptr,
-                  int* steps = nullptr, Env* env_ = nullptr);
+  void MergeUntil(Iterator* iter, const SequenceNumber stop_before = 0,
+                  const bool at_bottom = false, Statistics* stats = nullptr,
+                  Env* env_ = nullptr);

   // Query the merge result
   // These are valid until the next MergeUntil call
diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc
index a1ac91014..1dd17fb5f 100644
--- a/db/merge_helper_test.cc
+++ b/db/merge_helper_test.cc
@@ -18,7 +18,7 @@ namespace rocksdb {

 class MergeHelperTest : public testing::Test {
  public:
-  MergeHelperTest() : steps_(0) {}
+  MergeHelperTest() = default;
   ~MergeHelperTest() = default;

   void RunUInt64MergeHelper(SequenceNumber stop_before, bool at_bottom) {
@@ -27,7 +27,7 @@ class MergeHelperTest : public testing::Test {
     merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(),
                                         nullptr, 2U, true));
     merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr,
-                              &steps_, Env::Default());
+                              Env::Default());
   }

   void RunStringAppendMergeHelper(SequenceNumber stop_before, bool at_bottom) {
@@ -36,7 +36,7 @@ class MergeHelperTest : public testing::Test {
     merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(),
                                         nullptr, 2U, true));
     merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr,
-                              &steps_, Env::Default());
+                              Env::Default());
   }

   std::string Key(const std::string& user_key, const SequenceNumber& seq,
@@ -63,9 +63,8 @@ class MergeHelperTest : public testing::Test {
     return result;
   }

-  void CheckState(bool success, int steps, int iter_pos) {
+  void CheckState(bool success, int iter_pos) {
     ASSERT_EQ(success, merge_helper_->IsSuccess());
-    ASSERT_EQ(steps, steps_);
     if (iter_pos == -1) {
       ASSERT_FALSE(iter_->Valid());
     } else {
@@ -78,7 +77,6 @@ class MergeHelperTest : public testing::Test {
   std::unique_ptr<MergeHelper> merge_helper_;
   std::vector<std::string> ks_;
   std::vector<std::string> vs_;
-  int steps_;
 };

 // If MergeHelper encounters a new key on the last level, we know that
@@ -89,7 +87,7 @@ TEST_F(MergeHelperTest, MergeAtBottomSuccess) {
   AddKeyVal("b", 10, kTypeMerge, EncodeInt(4U));  // <- Iterator after merge

   RunUInt64MergeHelper(0, true);
-  CheckState(true, 2, 2);
+  CheckState(true, 2);
   ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->key());
   ASSERT_EQ(EncodeInt(4U), merge_helper_->value());
 }
@@ -102,7 +100,7 @@ TEST_F(MergeHelperTest, MergeValue) {
   AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U));

   RunUInt64MergeHelper(0, false);
-  CheckState(true, 3, 3);
+  CheckState(true, 3);
   ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->key());
   ASSERT_EQ(EncodeInt(8U), merge_helper_->value());
 }
@@ -116,7 +114,7 @@ TEST_F(MergeHelperTest, SnapshotBeforeValue) {
   AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U));

   RunUInt64MergeHelper(31, true);
-  CheckState(false, 2, 2);
+  CheckState(false, 2);
   ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]);
   ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]);
 }
@@ -129,7 +127,7 @@ TEST_F(MergeHelperTest, NoPartialMerge) {
   AddKeyVal("a", 30, kTypeMerge, "v");

   RunStringAppendMergeHelper(31, true);
-  CheckState(false, 2, 2);
+  CheckState(false, 2);
   ASSERT_EQ(Key("a", 40, kTypeMerge), merge_helper_->keys()[0]);
   ASSERT_EQ("v", merge_helper_->values()[0]);
   ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[1]);
@@ -142,7 +140,7 @@ TEST_F(MergeHelperTest, MergeDeletion) {
   AddKeyVal("a", 20, kTypeDeletion, "");

   RunUInt64MergeHelper(15, false);
-  CheckState(true, 2, -1);
+  CheckState(true, -1);
   ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->key());
   ASSERT_EQ(EncodeInt(3U), merge_helper_->value());
 }
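
The drop rule (A) in the rewritten loop hinges on snapshot bucketing: each version of a user key maps to the earliest snapshot that can see it, and when two consecutive versions of the same key map to the same snapshot, the older version is visible in no snapshot and can be dropped. The standalone C++ sketch below illustrates that bucketing; it is not the RocksDB implementation, and the sorted-vector snapshot list, the function signature, and the main() driver are assumptions for illustration only.

#include <cassert>
#include <cstdint>
#include <iostream>
#include <vector>

using SequenceNumber = uint64_t;

// Returns the sequence number of the earliest snapshot that can see an entry
// written at `seq`; also reports the newest snapshot older than `seq` through
// *prev_snapshot (0 if there is none). `snapshots` must be sorted ascending,
// mirroring how existing_snapshots_ is kept ordered.
SequenceNumber FindEarliestVisibleSnapshot(
    SequenceNumber seq, const std::vector<SequenceNumber>& snapshots,
    SequenceNumber* prev_snapshot) {
  *prev_snapshot = 0;
  for (SequenceNumber snapshot : snapshots) {
    if (snapshot >= seq) {
      return snapshot;  // first snapshot whose read view includes seq
    }
    *prev_snapshot = snapshot;  // newest snapshot still older than seq
  }
  return UINT64_MAX;  // stand-in for kMaxSequenceNumber: visible only at tip
}

int main() {
  const std::vector<SequenceNumber> snapshots = {10, 20, 30};
  SequenceNumber prev = 0;
  // Versions of one user key at seq 25 and seq 22 are both first visible in
  // snapshot 30, so the older one (22) is shadowed in every snapshot and a
  // compaction may drop it -- the ++key_drop_newer_entry case in the patch.
  assert(FindEarliestVisibleSnapshot(25, snapshots, &prev) ==
         FindEarliestVisibleSnapshot(22, snapshots, &prev));
  std::cout << "seq 22 is shadowed by seq 25 in every snapshot\n";
  return 0;
}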
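The operand write-out for an unsuccessful merge relies on MergeUntil pushing each operand onto the front of its keys_/operands_ deques, so the back (*rbegin()) holds the first, highest-sequence entry; iterating rbegin() to rend() therefore emits operands in descending sequence order and stops at the first failed write. Below is a self-contained sketch of that ordering under those assumptions, with a hypothetical bool-returning WriteKeyValue standing in for the Status-returning CompactionJob::WriteKeyValue.

#include <deque>
#include <iostream>
#include <string>
#include <utility>

// Hypothetical stub: the real CompactionJob::WriteKeyValue returns a Status.
bool WriteKeyValue(const std::string& key, const std::string& value) {
  std::cout << key << " => " << value << "\n";
  return true;
}

int main() {
  std::deque<std::string> keys;
  std::deque<std::string> values;
  // MergeUntil scans versions newest-to-oldest and push_fronts each operand,
  // leaving the first (newest) entry at the back of the deques.
  const std::pair<const char*, const char*> seen[] = {
      {"a@50", "op3"}, {"a@40", "op2"}, {"a@30", "op1"}};
  for (const auto& kv : seen) {
    keys.push_front(kv.first);
    values.push_front(kv.second);
  }
  // Reverse iteration writes a@50, a@40, a@30 -- descending sequence order --
  // and bails out as soon as a write fails, like the patched for loop.
  bool ok = true;
  for (auto key_iter = keys.rbegin(), value_iter = values.rbegin();
       ok && key_iter != keys.rend(); ++key_iter, ++value_iter) {
    ok = WriteKeyValue(*key_iter, *value_iter);
  }
  return ok ? 0 : 1;
}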