diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc index eac93f232..f3fe3c03b 100644 --- a/db/blob/db_blob_compaction_test.cc +++ b/db/blob/db_blob_compaction_test.cc @@ -54,6 +54,26 @@ class FilterByKeyLength : public CompactionFilter { size_t length_threshold_; }; +class FilterByValueLength : public CompactionFilter { + public: + explicit FilterByValueLength(size_t len) : length_threshold_(len) {} + const char* Name() const override { + return "rocksdb.compaction.filter.by.value.length"; + } + CompactionFilter::Decision FilterV2( + int /*level*/, const Slice& /*key*/, ValueType /*value_type*/, + const Slice& existing_value, std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + if (existing_value.size() < length_threshold_) { + return CompactionFilter::Decision::kRemove; + } + return CompactionFilter::Decision::kKeep; + } + + private: + size_t length_threshold_; +}; + class BadBlobCompactionFilter : public CompactionFilter { public: explicit BadBlobCompactionFilter(std::string prefix, @@ -243,6 +263,55 @@ TEST_F(DBBlobCompactionTest, FilterByKeyLength) { Close(); } +TEST_F(DBBlobCompactionTest, FilterByValueLength) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 5; + options.create_if_missing = true; + constexpr size_t kValueLength = 5; + std::unique_ptr compaction_filter_guard( + new FilterByValueLength(kValueLength)); + options.compaction_filter = compaction_filter_guard.get(); + + const std::vector short_value_keys = {"a", "e", "j"}; + constexpr char short_value[] = "val"; + const std::vector long_value_keys = {"b", "f", "k"}; + constexpr char long_value[] = "valuevalue"; + + DestroyAndReopen(options); + for (size_t i = 0; i < short_value_keys.size(); ++i) { + ASSERT_OK(Put(short_value_keys[i], short_value)); + } + for (size_t i = 0; i < short_value_keys.size(); ++i) { + ASSERT_OK(Put(long_value_keys[i], long_value)); + } + ASSERT_OK(Flush()); + CompactRangeOptions cro; + ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); + std::string value; + for (size_t i = 0; i < short_value_keys.size(); ++i) { + ASSERT_TRUE( + db_->Get(ReadOptions(), short_value_keys[i], &value).IsNotFound()); + value.clear(); + } + for (size_t i = 0; i < long_value_keys.size(); ++i) { + ASSERT_OK(db_->Get(ReadOptions(), long_value_keys[i], &value)); + ASSERT_EQ(long_value, value); + } + +#ifndef ROCKSDB_LITE + const auto& compaction_stats = GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + // Filter decides between kKeep and kRemove based on value; + // this involves reading but not writing blobs + ASSERT_GT(compaction_stats[1].bytes_read_blob, 0); + ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); +#endif // ROCKSDB_LITE + + Close(); +} + #ifndef ROCKSDB_LITE TEST_F(DBBlobCompactionTest, BlobCompactWithStartingLevel) { Options options = GetDefaultOptions(); diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 6b0db5e06..cfdd0b033 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -222,7 +222,6 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, { StopWatchNano timer(clock_, report_detailed_time_); if (kTypeBlobIndex == ikey_.type) { - blob_value_.Reset(); filter = compaction_filter_->FilterBlobByKey( level_, filter_key, &compaction_filter_value_, compaction_filter_skip_until_.rep()); @@ -367,6 +366,7 @@ void CompactionIterator::NextFromInput() { !IsShuttingDown()) { key_ = input_.key(); value_ = input_.value(); + blob_value_.Reset(); iter_stats_.num_input_records++; Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);