diff --git a/HISTORY.md b/HISTORY.md index 91e3e3bee..842a9f3ec 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### New Features +* CompactionFilter has new member function called IgnoreSnapshots which allows CompactionFilter to be called even if there are snapshots later than the key. * RocksDB will now persist options under the same directory as the RocksDB database on successful DB::Open, CreateColumnFamily, DropColumnFamily, and SetOptions. * Introduce LoadLatestOptions() in rocksdb/utilities/options_util.h. This function can construct the latest DBOptions / ColumnFamilyOptions used by the specified RocksDB intance. * Introduce CheckOptionsCompatibility() in rocksdb/utilities/options_util.h. This function checks whether the input set of options is able to open the specified DB successfully. diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 278c1cd75..8d0dc7f62 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -42,6 +42,11 @@ CompactionIterator::CompactionIterator( earliest_snapshot_ = snapshots_->at(0); latest_snapshot_ = snapshots_->back(); } + if (compaction_filter_ != nullptr && compaction_filter_->IgnoreSnapshots()) { + ignore_snapshots_ = true; + } else { + ignore_snapshots_ = false; + } } void CompactionIterator::ResetRecordCounts() { @@ -140,7 +145,8 @@ void CompactionIterator::NextFromInput() { current_user_key_snapshot_ = 0; // apply the compaction filter to the first occurrence of the user key if (compaction_filter_ != nullptr && ikey_.type == kTypeValue && - (visible_at_tip_ || ikey_.sequence > latest_snapshot_)) { + (visible_at_tip_ || ikey_.sequence > latest_snapshot_ || + ignore_snapshots_)) { // If the user has specified a compaction filter and the sequence // number is greater than any external snapshot, then invoke the // filter. If the return value of the compaction filter is true, @@ -170,6 +176,9 @@ void CompactionIterator::NextFromInput() { } else { // Update the current key to reflect the new sequence number/type without // copying the user key. + // TODO(rven): Compaction filter does not process keys in this path + // Need to have the compaction filter process multiple versions + // if we have versions on both sides of a snapshot current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); key_ = current_key_.GetKey(); ikey_.user_key = current_key_.GetUserKey(); diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index bd256439c..fdb8a7bf5 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -98,6 +98,7 @@ class CompactionIterator { SequenceNumber visible_at_tip_; SequenceNumber earliest_snapshot_; SequenceNumber latest_snapshot_; + bool ignore_snapshots_; // State // diff --git a/db/db_compaction_filter_test.cc b/db/db_compaction_filter_test.cc index e99db81f7..2b9a1a0df 100644 --- a/db/db_compaction_filter_test.cc +++ b/db/db_compaction_filter_test.cc @@ -47,6 +47,24 @@ class DeleteFilter : public CompactionFilter { virtual const char* Name() const override { return "DeleteFilter"; } }; +class DeleteISFilter : public CompactionFilter { + public: + virtual bool Filter(int level, const Slice& key, const Slice& value, + std::string* new_value, + bool* value_changed) const override { + cfilter_count++; + int i = std::stoi(key.ToString()); + if (i > 5 && i <= 105) { + return true; + } + return false; + } + + virtual bool IgnoreSnapshots() const override { return true; } + + virtual const char* Name() const override { return "DeleteFilter"; } +}; + class DelayFilter : public CompactionFilter { public: explicit DelayFilter(DBTestBase* d) : db_test(d) {} @@ -141,6 +159,21 @@ class DeleteFilterFactory : public CompactionFilterFactory { virtual const char* Name() const override { return "DeleteFilterFactory"; } }; +// Delete Filter Factory which ignores snapshots +class DeleteISFilterFactory : public CompactionFilterFactory { + public: + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) override { + if (context.is_manual_compaction) { + return std::unique_ptr(new DeleteISFilter()); + } else { + return std::unique_ptr(nullptr); + } + } + + virtual const char* Name() const override { return "DeleteFilterFactory"; } +}; + class DelayFilterFactory : public CompactionFilterFactory { public: explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {} @@ -620,6 +653,68 @@ TEST_F(DBTestCompactionFilter, CompactionFilterSnapshot) { ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_EQ(0U, CountLiveFiles()); } + +// Compaction filters should only be applied to records that are newer than the +// latest snapshot. However, if the compaction filter asks to ignore snapshots +// records newer than the snapshot will also be processed +TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) { + std::string five = ToString(5); + Options options; + options.compaction_filter_factory = std::make_shared(); + options.disable_auto_compactions = true; + options.create_if_missing = true; + options = CurrentOptions(options); + DestroyAndReopen(options); + + // Put some data. + const Snapshot* snapshot = nullptr; + for (int table = 0; table < 4; ++table) { + for (int i = 0; i < 10; ++i) { + Put(ToString(table * 100 + i), "val"); + } + Flush(); + + if (table == 0) { + snapshot = db_->GetSnapshot(); + } + } + assert(snapshot != nullptr); + + cfilter_count = 0; + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // The filter should delete 40 records. + ASSERT_EQ(40U, cfilter_count); + + { + // Scan the entire database as of the snapshot to ensure + // that nothing is left + ReadOptions read_options; + read_options.snapshot = snapshot; + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + int count = 0; + while (iter->Valid()) { + count++; + iter->Next(); + } + ASSERT_EQ(count, 6); + read_options.snapshot = 0; + std::unique_ptr iter1(db_->NewIterator(read_options)); + iter1->SeekToFirst(); + count = 0; + while (iter1->Valid()) { + count++; + iter1->Next(); + } + // We have deleted 10 keys from 40 using the compaction filter + // Keys 6-9 before the snapshot and 100-105 after the snapshot + ASSERT_EQ(count, 30); + } + + // Release the snapshot and compact again -> now all records should be + // removed. + db_->ReleaseSnapshot(snapshot); +} #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 89558f208..3bb250e9c 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -98,6 +98,16 @@ class CompactionFilter { return false; } + // By default, compaction will only call Filter() on keys written after the + // most recent call to GetSnapshot(). However, if the compaction filter + // overrides IgnoreSnapshots to make it return false, the compaction filter + // will be called even if the keys were written before the last snapshot. + // This behavior is to be used only when we want to delete a set of keys + // irrespective of snapshots. In particular, care should be taken + // to understand that the values of thesekeys will change even if we are + // using a snapshot. + virtual bool IgnoreSnapshots() const { return false; } + // Returns a name that identifies this compaction filter. // The name will be printed to LOG file on start up for diagnosis. virtual const char* Name() const = 0;