diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index fc7e56fae..d43651667 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -128,7 +128,7 @@ void CompactionIterator::NextFromInput() { current_user_key_snapshot_ = 0; // apply the compaction filter to the first occurrence of the user key if (compaction_filter_ != nullptr && ikey_.type == kTypeValue && - (visible_at_tip_ || latest_snapshot_)) { + (visible_at_tip_ || ikey_.sequence > latest_snapshot_)) { // If the user has specified a compaction filter and the sequence // number is greater than any external snapshot, then invoke the // filter. If the return value of the compaction filter is true, diff --git a/db/db_compaction_filter_test.cc b/db/db_compaction_filter_test.cc index 34e2a85b6..055c1bb76 100644 --- a/db/db_compaction_filter_test.cc +++ b/db/db_compaction_filter_test.cc @@ -537,6 +537,41 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) { } } +// Compaction filters should only be applied to records that are newer than the +// latest snapshot. This test inserts records and applies a delete filter. +TEST_F(DBTestCompactionFilter, CompactionFilterSnapshot) { + Options options; + options.compaction_filter_factory = std::make_shared(); + options.disable_auto_compactions = true; + options.create_if_missing = true; + options = CurrentOptions(options); + DestroyAndReopen(options); + + // Put some data. + const Snapshot* snapshot; + for (int table = 0; table < 4; ++table) { + for (int i = 0; i < 10; ++i) { + Put(ToString(table * 100 + i), "val"); + } + Flush(); + + if (table == 0) { + snapshot = db_->GetSnapshot(); + } + } + + cfilter_count = 0; + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // The filter should delete 10 records. + ASSERT_EQ(30U, cfilter_count); + + // Release the snapshot and compact again -> now all records should be + // removed. + db_->ReleaseSnapshot(snapshot); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ(0U, CountLiveFiles()); +} + } // namespace rocksdb int main(int argc, char** argv) {