diff --git a/HISTORY.md b/HISTORY.md index 0637747a0..316e5cb7a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -28,6 +28,9 @@ ### Build Changes * The `make` build now builds a shared library by default instead of a static library. Use `LIB_MODE=static` to override. +### New Features +* Compaction filters are now supported for wide-column entities by means of the `FilterV3` API. See the comment of the API for more details. + ## 7.10.0 (01/23/2023) ### Behavior changes * Make best-efforts recovery verify SST unique ID before Version construction (#10962) diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 00191978f..1f104c4af 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -13,6 +13,7 @@ #include "db/blob/blob_index.h" #include "db/blob/prefetch_buffer_collection.h" #include "db/snapshot_checker.h" +#include "db/wide/wide_column_serialization.h" #include "logging/logging.h" #include "port/likely.h" #include "rocksdb/listener.h" @@ -225,8 +226,8 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, return true; } - // TODO: support compaction filter for wide-column entities - if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex) { + if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex && + ikey_.type != kTypeWideColumnEntity) { return true; } @@ -234,7 +235,9 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, CompactionFilter::Decision::kUndetermined; CompactionFilter::ValueType value_type = ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue - : CompactionFilter::ValueType::kBlobIndex; + : ikey_.type == kTypeBlobIndex + ? CompactionFilter::ValueType::kBlobIndex + : CompactionFilter::ValueType::kWideColumnEntity; // Hack: pass internal key to BlobIndexCompactionFilter since it needs // to get sequence number. @@ -248,6 +251,8 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, compaction_filter_value_.clear(); compaction_filter_skip_until_.Clear(); + std::vector> new_columns; + { StopWatchNano timer(clock_, report_detailed_time_); @@ -303,10 +308,36 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, value_type = CompactionFilter::ValueType::kValue; } } + if (decision == CompactionFilter::Decision::kUndetermined) { - decision = compaction_filter_->FilterV2( - level_, filter_key, value_type, - blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_, + const Slice* existing_val = nullptr; + const WideColumns* existing_col = nullptr; + + WideColumns existing_columns; + + if (ikey_.type != kTypeWideColumnEntity) { + if (!blob_value_.empty()) { + existing_val = &blob_value_; + } else { + existing_val = &value_; + } + } else { + Slice value_copy = value_; + const Status s = + WideColumnSerialization::Deserialize(value_copy, existing_columns); + + if (!s.ok()) { + status_ = s; + validity_info_.Invalidate(); + return false; + } + + existing_col = &existing_columns; + } + + decision = compaction_filter_->FilterV3( + level_, filter_key, value_type, existing_val, existing_col, + &compaction_filter_value_, &new_columns, compaction_filter_skip_until_.rep()); } @@ -315,9 +346,10 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, } if (decision == CompactionFilter::Decision::kUndetermined) { - // Should not reach here, since FilterV2 should never return kUndetermined. - status_ = - Status::NotSupported("FilterV2() should never return kUndetermined"); + // Should not reach here, since FilterV2/FilterV3 should never return + // kUndetermined. + status_ = Status::NotSupported( + "FilterV2/FilterV3 should never return kUndetermined"); validity_info_.Invalidate(); return false; } @@ -326,7 +358,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= 0) { // Can't skip to a key smaller than the current one. - // Keep the key as per FilterV2 documentation. + // Keep the key as per FilterV2/FilterV3 documentation. decision = CompactionFilter::Decision::kKeep; } @@ -388,6 +420,35 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, status_ = Status::IOError("Failed to access blob during compaction filter"); validity_info_.Invalidate(); return false; + } else if (decision == CompactionFilter::Decision::kChangeWideColumnEntity) { + WideColumns sorted_columns; + + sorted_columns.reserve(new_columns.size()); + for (const auto& column : new_columns) { + sorted_columns.emplace_back(column.first, column.second); + } + + std::sort(sorted_columns.begin(), sorted_columns.end(), + [](const WideColumn& lhs, const WideColumn& rhs) { + return lhs.name().compare(rhs.name()) < 0; + }); + + { + const Status s = WideColumnSerialization::Serialize( + sorted_columns, compaction_filter_value_); + if (!s.ok()) { + status_ = s; + validity_info_.Invalidate(); + return false; + } + } + + if (ikey_.type != kTypeWideColumnEntity) { + ikey_.type = kTypeWideColumnEntity; + current_key_.UpdateInternalKey(ikey_.sequence, kTypeWideColumnEntity); + } + + value_ = compaction_filter_value_; } return true; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index e29d9c5ba..fcff8f0ac 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -578,14 +578,15 @@ CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key, } compaction_filter_value_.clear(); compaction_filter_skip_until_.Clear(); - auto ret = compaction_filter_->FilterV2( - level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice, - &compaction_filter_value_, compaction_filter_skip_until_.rep()); + auto ret = compaction_filter_->FilterV3( + level_, user_key, CompactionFilter::ValueType::kMergeOperand, + &value_slice, /* existing_columns */ nullptr, &compaction_filter_value_, + /* new_columns */ nullptr, compaction_filter_skip_until_.rep()); if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) { if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(), user_key) <= 0) { // Invalid skip_until returned from compaction filter. - // Keep the key as per FilterV2 documentation. + // Keep the key as per FilterV2/FilterV3 documentation. ret = CompactionFilter::Decision::kKeep; } else { compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, diff --git a/db/wide/db_wide_basic_test.cc b/db/wide/db_wide_basic_test.cc index 1ffe314fe..2c345d464 100644 --- a/db/wide/db_wide_basic_test.cc +++ b/db/wide/db_wide_basic_test.cc @@ -4,6 +4,7 @@ // (found in the LICENSE.Apache file in the root directory). #include +#include #include #include "db/db_test_util.h" @@ -593,6 +594,302 @@ TEST_F(DBWideBasicTest, MergeEntity) { verify_merge_ops_post_compaction(); } +TEST_F(DBWideBasicTest, CompactionFilter) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + + // Wide-column entity with default column + constexpr char first_key[] = "first"; + WideColumns first_columns{{kDefaultWideColumnName, "a"}, + {"attr_name1", "foo"}, + {"attr_name2", "bar"}}; + WideColumns first_columns_uppercase{{kDefaultWideColumnName, "A"}, + {"attr_name1", "FOO"}, + {"attr_name2", "BAR"}}; + + // Wide-column entity without default column + constexpr char second_key[] = "second"; + WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; + WideColumns second_columns_uppercase{{"attr_one", "TWO"}, + {"attr_three", "FOUR"}}; + + // Plain old key-value + constexpr char last_key[] = "last"; + constexpr char last_value[] = "baz"; + constexpr char last_value_uppercase[] = "BAZ"; + + auto write = [&] { + ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + first_key, first_columns)); + ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + second_key, second_columns)); + + ASSERT_OK(Flush()); + + ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), last_key, + last_value)); + + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr, + /* end */ nullptr)); + }; + + // Test a compaction filter that keeps all entries + { + class KeepFilter : public CompactionFilter { + public: + Decision FilterV3( + int /* level */, const Slice& /* key */, ValueType /* value_type */, + const Slice* /* existing_value */, + const WideColumns* /* existing_columns */, + std::string* /* new_value */, + std::vector>* /* new_columns */, + std::string* /* skip_until */) const override { + return Decision::kKeep; + } + + const char* Name() const override { return "KeepFilter"; } + }; + + KeepFilter filter; + options.compaction_filter = &filter; + + DestroyAndReopen(options); + + write(); + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &result)); + ASSERT_EQ(result.columns(), first_columns); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &result)); + ASSERT_EQ(result.columns(), second_columns); + } + + // Note: GetEntity should return an entity with a single default column, + // since last_key is a plain key-value + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + last_key, &result)); + + WideColumns expected_columns{{kDefaultWideColumnName, last_value}}; + ASSERT_EQ(result.columns(), expected_columns); + } + } + + // Test a compaction filter that removes all entries + { + class RemoveFilter : public CompactionFilter { + public: + Decision FilterV3( + int /* level */, const Slice& /* key */, ValueType /* value_type */, + const Slice* /* existing_value */, + const WideColumns* /* existing_columns */, + std::string* /* new_value */, + std::vector>* /* new_columns */, + std::string* /* skip_until */) const override { + return Decision::kRemove; + } + + const char* Name() const override { return "RemoveFilter"; } + }; + + RemoveFilter filter; + options.compaction_filter = &filter; + + DestroyAndReopen(options); + + write(); + + { + PinnableWideColumns result; + ASSERT_TRUE(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &result) + .IsNotFound()); + } + + { + PinnableWideColumns result; + ASSERT_TRUE(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &result) + .IsNotFound()); + } + + { + PinnableWideColumns result; + ASSERT_TRUE(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + last_key, &result) + .IsNotFound()); + } + } + + // Test a compaction filter that changes the values of entries to uppercase. + // The new entry is always a plain key-value; if the existing entry is a + // wide-column entity, only the value of its first column is kept. + { + class ChangeValueFilter : public CompactionFilter { + public: + Decision FilterV3( + int /* level */, const Slice& /* key */, ValueType value_type, + const Slice* existing_value, const WideColumns* existing_columns, + std::string* new_value, + std::vector>* /* new_columns */, + std::string* /* skip_until */) const override { + assert(new_value); + + auto upper = [](const std::string& str) { + std::string result(str); + + for (char& c : result) { + c = static_cast(std::toupper(static_cast(c))); + } + + return result; + }; + + if (value_type == ValueType::kWideColumnEntity) { + assert(existing_columns); + + if (!existing_columns->empty()) { + *new_value = upper(existing_columns->front().value().ToString()); + } + } else { + assert(existing_value); + + *new_value = upper(existing_value->ToString()); + } + + return Decision::kChangeValue; + } + + const char* Name() const override { return "ChangeValueFilter"; } + }; + + ChangeValueFilter filter; + options.compaction_filter = &filter; + + DestroyAndReopen(options); + + write(); + + // Note: GetEntity should return entities with a single default column, + // since all entries are now plain key-values + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &result)); + + WideColumns expected_columns{ + {kDefaultWideColumnName, first_columns_uppercase[0].value()}}; + ASSERT_EQ(result.columns(), expected_columns); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &result)); + + WideColumns expected_columns{ + {kDefaultWideColumnName, second_columns_uppercase[0].value()}}; + ASSERT_EQ(result.columns(), expected_columns); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + last_key, &result)); + + WideColumns expected_columns{ + {kDefaultWideColumnName, last_value_uppercase}}; + ASSERT_EQ(result.columns(), expected_columns); + } + } + + // Test a compaction filter that changes the column values of entries to + // uppercase. The new entry is always a wide-column entity; if the existing + // entry is a plain key-value, it is converted to a wide-column entity with a + // single default column. + { + class ChangeEntityFilter : public CompactionFilter { + public: + Decision FilterV3( + int /* level */, const Slice& /* key */, ValueType value_type, + const Slice* existing_value, const WideColumns* existing_columns, + std::string* /* new_value */, + std::vector>* new_columns, + std::string* /* skip_until */) const override { + assert(new_columns); + + auto upper = [](const std::string& str) { + std::string result(str); + + for (char& c : result) { + c = static_cast(std::toupper(static_cast(c))); + } + + return result; + }; + + if (value_type == ValueType::kWideColumnEntity) { + assert(existing_columns); + + for (const auto& column : *existing_columns) { + new_columns->emplace_back(column.name().ToString(), + upper(column.value().ToString())); + } + } else { + assert(existing_value); + + new_columns->emplace_back(kDefaultWideColumnName.ToString(), + upper(existing_value->ToString())); + } + + return Decision::kChangeWideColumnEntity; + } + + const char* Name() const override { return "ChangeEntityFilter"; } + }; + + ChangeEntityFilter filter; + options.compaction_filter = &filter; + + DestroyAndReopen(options); + + write(); + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + first_key, &result)); + ASSERT_EQ(result.columns(), first_columns_uppercase); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &result)); + ASSERT_EQ(result.columns(), second_columns_uppercase); + } + + { + PinnableWideColumns result; + ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), + last_key, &result)); + + WideColumns expected_columns{ + {kDefaultWideColumnName, last_value_uppercase}}; + ASSERT_EQ(result.columns(), expected_columns); + } + } +} + TEST_F(DBWideBasicTest, PutEntityTimestampError) { // Note: timestamps are currently not supported diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 9c6a9c30d..aeeb3c47b 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -11,11 +11,13 @@ #include #include #include +#include #include #include "rocksdb/customizable.h" #include "rocksdb/rocksdb_namespace.h" #include "rocksdb/types.h" +#include "rocksdb/wide_columns.h" namespace ROCKSDB_NAMESPACE { @@ -34,6 +36,7 @@ class CompactionFilter : public Customizable { kValue, kMergeOperand, kBlobIndex, // used internally by BlobDB. + kWideColumnEntity, }; enum class Decision { @@ -44,6 +47,7 @@ class CompactionFilter : public Customizable { kChangeBlobIndex, // used internally by BlobDB. kIOError, // used internally by BlobDB. kPurge, // used for keys that can only be SingleDelete'ed + kChangeWideColumnEntity, kUndetermined, }; @@ -176,15 +180,57 @@ class CompactionFilter : public Customizable { } return value_changed ? Decision::kChangeValue : Decision::kKeep; } + case ValueType::kMergeOperand: { bool rv = FilterMergeOperand(level, key, existing_value); return rv ? Decision::kRemove : Decision::kKeep; } + case ValueType::kBlobIndex: return Decision::kKeep; + + default: + assert(false); + return Decision::kKeep; } - assert(false); - return Decision::kKeep; + } + + // Wide column aware API. Called for plain values, merge operands, and + // wide-column entities; the `value_type` parameter indicates the type of the + // key-value. When the key-value is a plain value or a merge operand, the + // `existing_value` parameter contains the existing value and the + // `existing_columns` parameter is invalid (nullptr). When the key-value is a + // wide-column entity, the `existing_columns` parameter contains the wide + // columns of the existing entity and the `existing_value` parameter is + // invalid (nullptr). The output parameters `new_value` and `new_columns` can + // be used to change the value or wide columns of the key-value when + // `kChangeValue` or `kChangeWideColumnEntity` is returned. See above for more + // information on the semantics of the potential return values. + // + // For compatibility, the default implementation keeps all wide-column + // entities, and falls back to FilterV2 for plain values and merge operands. + // If you override this method, there is no need to override FilterV2 (or + // Filter/FilterMergeOperand). + virtual Decision FilterV3( + int level, const Slice& key, ValueType value_type, + const Slice* existing_value, const WideColumns* existing_columns, + std::string* new_value, + std::vector>* /* new_columns */, + std::string* skip_until) const { +#ifdef NDEBUG + (void)existing_columns; +#endif + + assert(!existing_value || !existing_columns); + assert(value_type == ValueType::kWideColumnEntity || existing_value); + assert(value_type != ValueType::kWideColumnEntity || existing_columns); + + if (value_type == ValueType::kWideColumnEntity) { + return Decision::kKeep; + } + + return FilterV2(level, key, value_type, *existing_value, new_value, + skip_until); } // Internal (BlobDB) use only. Do not override in application code. @@ -211,7 +257,7 @@ class CompactionFilter : public Customizable { // In the case of BlobDB, it may be possible to reach a decision with only // the key without reading the actual value. Keys whose value_type is // kBlobIndex will be checked by this method. - // Returning kUndetermined will cause FilterV2() to be called to make a + // Returning kUndetermined will cause FilterV3() to be called to make a // decision as usual. virtual Decision FilterBlobByKey(int /*level*/, const Slice& /*key*/, std::string* /*new_value*/,