From 4753e5a2e925f3f1393f13da3b3bd25cd994d479 Mon Sep 17 00:00:00 2001 From: sherriiiliu <79488180+sherriiiliu@users.noreply.github.com> Date: Thu, 11 Aug 2022 13:55:28 -0700 Subject: [PATCH] Fix wrong value passed in compaction filter in BlobDB (#10391) Summary: New blobdb has a bug in compaction filter, where `blob_value_` is not reset for next iterated key. This will cause blob_value_ not empty and previous value read from blob is passed into the filter function for next key, even if its value is not in blob. Fixed by reseting regardless of key type. Test Case: Add `FilterByValueLength` test case in `DBBlobCompactionTest` Pull Request resolved: https://github.com/facebook/rocksdb/pull/10391 Reviewed By: riversand963 Differential Revision: D38629900 Pulled By: ltamasi fbshipit-source-id: 47d23ff2e5ec697958a210db9e6ceeb8b2fc49fa --- db/blob/db_blob_compaction_test.cc | 69 ++++++++++++++++++++++++++++ db/compaction/compaction_iterator.cc | 2 +- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc index eac93f232..f3fe3c03b 100644 --- a/db/blob/db_blob_compaction_test.cc +++ b/db/blob/db_blob_compaction_test.cc @@ -54,6 +54,26 @@ class FilterByKeyLength : public CompactionFilter { size_t length_threshold_; }; +class FilterByValueLength : public CompactionFilter { + public: + explicit FilterByValueLength(size_t len) : length_threshold_(len) {} + const char* Name() const override { + return "rocksdb.compaction.filter.by.value.length"; + } + CompactionFilter::Decision FilterV2( + int /*level*/, const Slice& /*key*/, ValueType /*value_type*/, + const Slice& existing_value, std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + if (existing_value.size() < length_threshold_) { + return CompactionFilter::Decision::kRemove; + } + return CompactionFilter::Decision::kKeep; + } + + private: + size_t length_threshold_; +}; + class BadBlobCompactionFilter : public CompactionFilter { public: explicit BadBlobCompactionFilter(std::string prefix, @@ -243,6 +263,55 @@ TEST_F(DBBlobCompactionTest, FilterByKeyLength) { Close(); } +TEST_F(DBBlobCompactionTest, FilterByValueLength) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 5; + options.create_if_missing = true; + constexpr size_t kValueLength = 5; + std::unique_ptr compaction_filter_guard( + new FilterByValueLength(kValueLength)); + options.compaction_filter = compaction_filter_guard.get(); + + const std::vector short_value_keys = {"a", "e", "j"}; + constexpr char short_value[] = "val"; + const std::vector long_value_keys = {"b", "f", "k"}; + constexpr char long_value[] = "valuevalue"; + + DestroyAndReopen(options); + for (size_t i = 0; i < short_value_keys.size(); ++i) { + ASSERT_OK(Put(short_value_keys[i], short_value)); + } + for (size_t i = 0; i < short_value_keys.size(); ++i) { + ASSERT_OK(Put(long_value_keys[i], long_value)); + } + ASSERT_OK(Flush()); + CompactRangeOptions cro; + ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); + std::string value; + for (size_t i = 0; i < short_value_keys.size(); ++i) { + ASSERT_TRUE( + db_->Get(ReadOptions(), short_value_keys[i], &value).IsNotFound()); + value.clear(); + } + for (size_t i = 0; i < long_value_keys.size(); ++i) { + ASSERT_OK(db_->Get(ReadOptions(), long_value_keys[i], &value)); + ASSERT_EQ(long_value, value); + } + +#ifndef ROCKSDB_LITE + const auto& compaction_stats = GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + // Filter decides between kKeep and kRemove based on value; + // this involves reading but not writing blobs + ASSERT_GT(compaction_stats[1].bytes_read_blob, 0); + ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); +#endif // ROCKSDB_LITE + + Close(); +} + #ifndef ROCKSDB_LITE TEST_F(DBBlobCompactionTest, BlobCompactWithStartingLevel) { Options options = GetDefaultOptions(); diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 6b0db5e06..cfdd0b033 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -222,7 +222,6 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, { StopWatchNano timer(clock_, report_detailed_time_); if (kTypeBlobIndex == ikey_.type) { - blob_value_.Reset(); filter = compaction_filter_->FilterBlobByKey( level_, filter_key, &compaction_filter_value_, compaction_filter_skip_until_.rep()); @@ -367,6 +366,7 @@ void CompactionIterator::NextFromInput() { !IsShuttingDown()) { key_ = input_.key(); value_ = input_.value(); + blob_value_.Reset(); iter_stats_.num_input_records++; Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);