Fix wrong value passed in compaction filter in BlobDB (#10391)

Summary:
New BlobDB has a bug in its compaction filter: `blob_value_` is not reset before the next iterated key. As a result, `blob_value_` stays non-empty and the value previously read from a blob is passed into the filter function for the next key, even when that key's value is not stored in a blob. Fixed by resetting `blob_value_` for every key, regardless of key type.

Test Case:
Added a `FilterByValueLength` test case to `DBBlobCompactionTest`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10391

Reviewed By: riversand963

Differential Revision: D38629900

Pulled By: ltamasi

fbshipit-source-id: 47d23ff2e5ec697958a210db9e6ceeb8b2fc49fa
main
sherriiiliu 2 years ago committed by Facebook GitHub Bot
parent f42fec2fab
commit 4753e5a2e9
  1. 69
      db/blob/db_blob_compaction_test.cc
  2. 2
      db/compaction/compaction_iterator.cc

@ -54,6 +54,26 @@ class FilterByKeyLength : public CompactionFilter {
size_t length_threshold_; size_t length_threshold_;
}; };
// Compaction filter that discards every entry whose value is shorter than a
// configured byte threshold; entries at or above the threshold are kept.
// Used by tests to exercise value-based filtering of blob entries.
class FilterByValueLength : public CompactionFilter {
 public:
  explicit FilterByValueLength(size_t len) : min_value_length_(len) {}

  const char* Name() const override {
    return "rocksdb.compaction.filter.by.value.length";
  }

  CompactionFilter::Decision FilterV2(
      int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
      const Slice& existing_value, std::string* /*new_value*/,
      std::string* /*skip_until*/) const override {
    // Drop values strictly below the threshold, keep everything else.
    return existing_value.size() < min_value_length_
               ? CompactionFilter::Decision::kRemove
               : CompactionFilter::Decision::kKeep;
  }

 private:
  // Minimum value size (in bytes) an entry must have to survive compaction.
  size_t min_value_length_;
};
class BadBlobCompactionFilter : public CompactionFilter { class BadBlobCompactionFilter : public CompactionFilter {
public: public:
explicit BadBlobCompactionFilter(std::string prefix, explicit BadBlobCompactionFilter(std::string prefix,
@ -243,6 +263,55 @@ TEST_F(DBBlobCompactionTest, FilterByKeyLength) {
Close(); Close();
} }
// Verifies that a value-based compaction filter sees the actual (blob) value
// for every key: short values are removed, long values survive, and the
// compaction reads blobs without rewriting them.
TEST_F(DBBlobCompactionTest, FilterByValueLength) {
  Options options = GetDefaultOptions();
  options.enable_blob_files = true;
  options.min_blob_size = 5;
  options.create_if_missing = true;
  constexpr size_t kValueLength = 5;
  std::unique_ptr<CompactionFilter> compaction_filter_guard(
      new FilterByValueLength(kValueLength));
  options.compaction_filter = compaction_filter_guard.get();

  // Short values (below min_blob_size) stay inline; long values go to blobs.
  const std::vector<std::string> short_value_keys = {"a", "e", "j"};
  constexpr char short_value[] = "val";
  const std::vector<std::string> long_value_keys = {"b", "f", "k"};
  constexpr char long_value[] = "valuevalue";

  DestroyAndReopen(options);
  // Bug fix: the second loop was bounded by short_value_keys.size() while
  // indexing long_value_keys — correct only by coincidence of equal sizes.
  // Range-for ties each loop to its own container.
  for (const auto& key : short_value_keys) {
    ASSERT_OK(Put(key, short_value));
  }
  for (const auto& key : long_value_keys) {
    ASSERT_OK(Put(key, long_value));
  }
  ASSERT_OK(Flush());
  CompactRangeOptions cro;
  ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));

  std::string value;
  // All short values are below the threshold and must have been removed.
  for (const auto& key : short_value_keys) {
    ASSERT_TRUE(db_->Get(ReadOptions(), key, &value).IsNotFound());
    value.clear();
  }
  // All long values meet the threshold and must still be readable.
  for (const auto& key : long_value_keys) {
    ASSERT_OK(db_->Get(ReadOptions(), key, &value));
    ASSERT_EQ(long_value, value);
  }
#ifndef ROCKSDB_LITE
  const auto& compaction_stats = GetCompactionStats();
  ASSERT_GE(compaction_stats.size(), 2);
  // Filter decides between kKeep and kRemove based on value;
  // this involves reading but not writing blobs
  ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
  ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
#endif  // ROCKSDB_LITE
  Close();
}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TEST_F(DBBlobCompactionTest, BlobCompactWithStartingLevel) { TEST_F(DBBlobCompactionTest, BlobCompactWithStartingLevel) {
Options options = GetDefaultOptions(); Options options = GetDefaultOptions();

@ -222,7 +222,6 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
{ {
StopWatchNano timer(clock_, report_detailed_time_); StopWatchNano timer(clock_, report_detailed_time_);
if (kTypeBlobIndex == ikey_.type) { if (kTypeBlobIndex == ikey_.type) {
blob_value_.Reset();
filter = compaction_filter_->FilterBlobByKey( filter = compaction_filter_->FilterBlobByKey(
level_, filter_key, &compaction_filter_value_, level_, filter_key, &compaction_filter_value_,
compaction_filter_skip_until_.rep()); compaction_filter_skip_until_.rep());
@ -367,6 +366,7 @@ void CompactionIterator::NextFromInput() {
!IsShuttingDown()) { !IsShuttingDown()) {
key_ = input_.key(); key_ = input_.key();
value_ = input_.value(); value_ = input_.value();
blob_value_.Reset();
iter_stats_.num_input_records++; iter_stats_.num_input_records++;
Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_); Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);

Loading…
Cancel
Save