diff --git a/db/db_impl.cc b/db/db_impl.cc index 3e5f962c8..90a6cae51 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1641,12 +1641,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { SequenceNumber visible_at_tip = 0; SequenceNumber earliest_snapshot; + SequenceNumber latest_snapshot = 0; snapshots_.getAll(compact->existing_snapshots); if (compact->existing_snapshots.size() == 0) { // optimize for fast path if there are no snapshots visible_at_tip = versions_->LastSequence(); earliest_snapshot = visible_at_tip; } else { + latest_snapshot = compact->existing_snapshots.back(); // Add the current seqno as the 'latest' virtual // snapshot to the end of this list. compact->existing_snapshots.push_back(versions_->LastSequence()); @@ -1680,6 +1682,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { kMaxSequenceNumber; SequenceNumber visible_in_snapshot = kMaxSequenceNumber; std::string compaction_filter_value; + std::vector delete_key; // for compaction filter MergeHelper merge(user_comparator(), options_.merge_operator, options_.info_log.get(), false /* internal key corruption is expected */); @@ -1727,6 +1730,41 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { has_current_user_key = true; last_sequence_for_key = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber; + + // apply the compaction filter to the first occurrence of the user key + if (options_.compaction_filter && + ikey.type == kTypeValue && + (visible_at_tip || ikey.sequence > latest_snapshot)) { + // If the user has specified a compaction filter and the sequence + // number is greater than any external snapshot, then invoke the + // filter. + // If the return value of the compaction filter is true, replace + // the entry with a delete marker. + bool value_changed = false; + compaction_filter_value.clear(); + bool to_delete = + options_.compaction_filter->Filter(compact->compaction->level(), + ikey.user_key, value, + &compaction_filter_value, + &value_changed); + if (to_delete) { + // make a copy of the original key + delete_key.assign(key.data(), key.data() + key.size()); + // convert it to a delete + UpdateInternalKey(&delete_key[0], delete_key.size(), + ikey.sequence, kTypeDeletion); + // anchor the key again + key = Slice(&delete_key[0], delete_key.size()); + // needed because ikey is backed by key + ParseInternalKey(key, &ikey); + // no value associated with delete + value.clear(); + RecordTick(options_.statistics, COMPACTION_KEY_DROP_USER); + } else if (value_changed) { + value = compaction_filter_value; + } + } + } // If there are no snapshots, then this kv affect visibility at tip. @@ -1773,31 +1811,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { key = merge.key(); ParseInternalKey(key, &ikey); value = merge.value(); - } else if (options_.compaction_filter && - ikey.type != kTypeDeletion && - visible_at_tip) { - // If the user has specified a compaction filter and there are no - // outstanding external snapshots, then invoke the filter. - // If the return value of the compaction filter is true, - // drop this key from the output. - assert(ikey.type == kTypeValue); - assert(!drop); - bool value_changed = false; - compaction_filter_value.clear(); - drop = options_.compaction_filter->Filter(compact->compaction->level(), - ikey.user_key, value, - &compaction_filter_value, - &value_changed); - // Another example of statistics update without holding the lock - // TODO: clean it up - if (drop) { - RecordTick(options_.statistics, COMPACTION_KEY_DROP_USER); - } - - // If the application wants to change the value, then do so here. - if (value_changed) { - value = compaction_filter_value; - } } last_sequence_for_key = ikey.sequence;