Add compaction filter support for wide-column entities (#11196)

Summary:
The patch adds compaction filter support for wide-column entities by introducing
a new `CompactionFilter` API called `FilterV3`. This API is called for regular
key-values, merge operands, and wide-column entities as well. It is passed the
existing value/operand or wide-column structure and it can update the value or
columns or keep/delete/etc. the key-value as usual. For compatibility, the default
implementation of `FilterV3` keeps all wide-column entities and falls back to calling
`FilterV2` for plain old key-values and merge operands.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11196

Test Plan: `make check`

Reviewed By: akankshamahajan15

Differential Revision: D43094147

Pulled By: ltamasi

fbshipit-source-id: 75acabe9a35254f7f404ba6173ee9c2774382ebd
oxigraph-8.1.1
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent 6650ca244e
commit 876d281592
  1. 3
      HISTORY.md
  2. 81
      db/compaction/compaction_iterator.cc
  3. 9
      db/merge_helper.cc
  4. 297
      db/wide/db_wide_basic_test.cc
  5. 52
      include/rocksdb/compaction_filter.h

@ -28,6 +28,9 @@
### Build Changes
* The `make` build now builds a shared library by default instead of a static library. Use `LIB_MODE=static` to override.
### New Features
* Compaction filters are now supported for wide-column entities by means of the `FilterV3` API. See the comment of the API for more details.
## 7.10.0 (01/23/2023)
### Behavior changes
* Make best-efforts recovery verify SST unique ID before Version construction (#10962)

@ -13,6 +13,7 @@
#include "db/blob/blob_index.h"
#include "db/blob/prefetch_buffer_collection.h"
#include "db/snapshot_checker.h"
#include "db/wide/wide_column_serialization.h"
#include "logging/logging.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
@ -225,8 +226,8 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
return true;
}
// TODO: support compaction filter for wide-column entities
if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex) {
if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
ikey_.type != kTypeWideColumnEntity) {
return true;
}
@ -234,7 +235,9 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
CompactionFilter::Decision::kUndetermined;
CompactionFilter::ValueType value_type =
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
: CompactionFilter::ValueType::kBlobIndex;
: ikey_.type == kTypeBlobIndex
? CompactionFilter::ValueType::kBlobIndex
: CompactionFilter::ValueType::kWideColumnEntity;
// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// to get sequence number.
@ -248,6 +251,8 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear();
std::vector<std::pair<std::string, std::string>> new_columns;
{
StopWatchNano timer(clock_, report_detailed_time_);
@ -303,10 +308,36 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
value_type = CompactionFilter::ValueType::kValue;
}
}
if (decision == CompactionFilter::Decision::kUndetermined) {
decision = compaction_filter_->FilterV2(
level_, filter_key, value_type,
blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_,
const Slice* existing_val = nullptr;
const WideColumns* existing_col = nullptr;
WideColumns existing_columns;
if (ikey_.type != kTypeWideColumnEntity) {
if (!blob_value_.empty()) {
existing_val = &blob_value_;
} else {
existing_val = &value_;
}
} else {
Slice value_copy = value_;
const Status s =
WideColumnSerialization::Deserialize(value_copy, existing_columns);
if (!s.ok()) {
status_ = s;
validity_info_.Invalidate();
return false;
}
existing_col = &existing_columns;
}
decision = compaction_filter_->FilterV3(
level_, filter_key, value_type, existing_val, existing_col,
&compaction_filter_value_, &new_columns,
compaction_filter_skip_until_.rep());
}
@ -315,9 +346,10 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
}
if (decision == CompactionFilter::Decision::kUndetermined) {
// Should not reach here, since FilterV2 should never return kUndetermined.
status_ =
Status::NotSupported("FilterV2() should never return kUndetermined");
// Should not reach here, since FilterV2/FilterV3 should never return
// kUndetermined.
status_ = Status::NotSupported(
"FilterV2/FilterV3 should never return kUndetermined");
validity_info_.Invalidate();
return false;
}
@ -326,7 +358,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
0) {
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2 documentation.
// Keep the key as per FilterV2/FilterV3 documentation.
decision = CompactionFilter::Decision::kKeep;
}
@ -388,6 +420,35 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
status_ = Status::IOError("Failed to access blob during compaction filter");
validity_info_.Invalidate();
return false;
} else if (decision == CompactionFilter::Decision::kChangeWideColumnEntity) {
WideColumns sorted_columns;
sorted_columns.reserve(new_columns.size());
for (const auto& column : new_columns) {
sorted_columns.emplace_back(column.first, column.second);
}
std::sort(sorted_columns.begin(), sorted_columns.end(),
[](const WideColumn& lhs, const WideColumn& rhs) {
return lhs.name().compare(rhs.name()) < 0;
});
{
const Status s = WideColumnSerialization::Serialize(
sorted_columns, compaction_filter_value_);
if (!s.ok()) {
status_ = s;
validity_info_.Invalidate();
return false;
}
}
if (ikey_.type != kTypeWideColumnEntity) {
ikey_.type = kTypeWideColumnEntity;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeWideColumnEntity);
}
value_ = compaction_filter_value_;
}
return true;

@ -578,14 +578,15 @@ CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key,
}
compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear();
auto ret = compaction_filter_->FilterV2(
level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice,
&compaction_filter_value_, compaction_filter_skip_until_.rep());
auto ret = compaction_filter_->FilterV3(
level_, user_key, CompactionFilter::ValueType::kMergeOperand,
&value_slice, /* existing_columns */ nullptr, &compaction_filter_value_,
/* new_columns */ nullptr, compaction_filter_skip_until_.rep());
if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) {
if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(),
user_key) <= 0) {
// Invalid skip_until returned from compaction filter.
// Keep the key as per FilterV2 documentation.
// Keep the key as per FilterV2/FilterV3 documentation.
ret = CompactionFilter::Decision::kKeep;
} else {
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,

@ -4,6 +4,7 @@
// (found in the LICENSE.Apache file in the root directory).
#include <array>
#include <cctype>
#include <memory>
#include "db/db_test_util.h"
@ -593,6 +594,302 @@ TEST_F(DBWideBasicTest, MergeEntity) {
verify_merge_ops_post_compaction();
}
TEST_F(DBWideBasicTest, CompactionFilter) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
// Wide-column entity with default column
constexpr char first_key[] = "first";
WideColumns first_columns{{kDefaultWideColumnName, "a"},
{"attr_name1", "foo"},
{"attr_name2", "bar"}};
WideColumns first_columns_uppercase{{kDefaultWideColumnName, "A"},
{"attr_name1", "FOO"},
{"attr_name2", "BAR"}};
// Wide-column entity without default column
constexpr char second_key[] = "second";
WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
WideColumns second_columns_uppercase{{"attr_one", "TWO"},
{"attr_three", "FOUR"}};
// Plain old key-value
constexpr char last_key[] = "last";
constexpr char last_value[] = "baz";
constexpr char last_value_uppercase[] = "BAZ";
auto write = [&] {
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns));
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
second_key, second_columns));
ASSERT_OK(Flush());
ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), last_key,
last_value));
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr,
/* end */ nullptr));
};
// Test a compaction filter that keeps all entries
{
class KeepFilter : public CompactionFilter {
public:
Decision FilterV3(
int /* level */, const Slice& /* key */, ValueType /* value_type */,
const Slice* /* existing_value */,
const WideColumns* /* existing_columns */,
std::string* /* new_value */,
std::vector<std::pair<std::string, std::string>>* /* new_columns */,
std::string* /* skip_until */) const override {
return Decision::kKeep;
}
const char* Name() const override { return "KeepFilter"; }
};
KeepFilter filter;
options.compaction_filter = &filter;
DestroyAndReopen(options);
write();
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &result));
ASSERT_EQ(result.columns(), first_columns);
}
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &result));
ASSERT_EQ(result.columns(), second_columns);
}
// Note: GetEntity should return an entity with a single default column,
// since last_key is a plain key-value
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
last_key, &result));
WideColumns expected_columns{{kDefaultWideColumnName, last_value}};
ASSERT_EQ(result.columns(), expected_columns);
}
}
// Test a compaction filter that removes all entries
{
class RemoveFilter : public CompactionFilter {
public:
Decision FilterV3(
int /* level */, const Slice& /* key */, ValueType /* value_type */,
const Slice* /* existing_value */,
const WideColumns* /* existing_columns */,
std::string* /* new_value */,
std::vector<std::pair<std::string, std::string>>* /* new_columns */,
std::string* /* skip_until */) const override {
return Decision::kRemove;
}
const char* Name() const override { return "RemoveFilter"; }
};
RemoveFilter filter;
options.compaction_filter = &filter;
DestroyAndReopen(options);
write();
{
PinnableWideColumns result;
ASSERT_TRUE(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &result)
.IsNotFound());
}
{
PinnableWideColumns result;
ASSERT_TRUE(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &result)
.IsNotFound());
}
{
PinnableWideColumns result;
ASSERT_TRUE(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
last_key, &result)
.IsNotFound());
}
}
// Test a compaction filter that changes the values of entries to uppercase.
// The new entry is always a plain key-value; if the existing entry is a
// wide-column entity, only the value of its first column is kept.
{
class ChangeValueFilter : public CompactionFilter {
public:
Decision FilterV3(
int /* level */, const Slice& /* key */, ValueType value_type,
const Slice* existing_value, const WideColumns* existing_columns,
std::string* new_value,
std::vector<std::pair<std::string, std::string>>* /* new_columns */,
std::string* /* skip_until */) const override {
assert(new_value);
auto upper = [](const std::string& str) {
std::string result(str);
for (char& c : result) {
c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
}
return result;
};
if (value_type == ValueType::kWideColumnEntity) {
assert(existing_columns);
if (!existing_columns->empty()) {
*new_value = upper(existing_columns->front().value().ToString());
}
} else {
assert(existing_value);
*new_value = upper(existing_value->ToString());
}
return Decision::kChangeValue;
}
const char* Name() const override { return "ChangeValueFilter"; }
};
ChangeValueFilter filter;
options.compaction_filter = &filter;
DestroyAndReopen(options);
write();
// Note: GetEntity should return entities with a single default column,
// since all entries are now plain key-values
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &result));
WideColumns expected_columns{
{kDefaultWideColumnName, first_columns_uppercase[0].value()}};
ASSERT_EQ(result.columns(), expected_columns);
}
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &result));
WideColumns expected_columns{
{kDefaultWideColumnName, second_columns_uppercase[0].value()}};
ASSERT_EQ(result.columns(), expected_columns);
}
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
last_key, &result));
WideColumns expected_columns{
{kDefaultWideColumnName, last_value_uppercase}};
ASSERT_EQ(result.columns(), expected_columns);
}
}
// Test a compaction filter that changes the column values of entries to
// uppercase. The new entry is always a wide-column entity; if the existing
// entry is a plain key-value, it is converted to a wide-column entity with a
// single default column.
{
class ChangeEntityFilter : public CompactionFilter {
public:
Decision FilterV3(
int /* level */, const Slice& /* key */, ValueType value_type,
const Slice* existing_value, const WideColumns* existing_columns,
std::string* /* new_value */,
std::vector<std::pair<std::string, std::string>>* new_columns,
std::string* /* skip_until */) const override {
assert(new_columns);
auto upper = [](const std::string& str) {
std::string result(str);
for (char& c : result) {
c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
}
return result;
};
if (value_type == ValueType::kWideColumnEntity) {
assert(existing_columns);
for (const auto& column : *existing_columns) {
new_columns->emplace_back(column.name().ToString(),
upper(column.value().ToString()));
}
} else {
assert(existing_value);
new_columns->emplace_back(kDefaultWideColumnName.ToString(),
upper(existing_value->ToString()));
}
return Decision::kChangeWideColumnEntity;
}
const char* Name() const override { return "ChangeEntityFilter"; }
};
ChangeEntityFilter filter;
options.compaction_filter = &filter;
DestroyAndReopen(options);
write();
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
first_key, &result));
ASSERT_EQ(result.columns(), first_columns_uppercase);
}
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &result));
ASSERT_EQ(result.columns(), second_columns_uppercase);
}
{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
last_key, &result));
WideColumns expected_columns{
{kDefaultWideColumnName, last_value_uppercase}};
ASSERT_EQ(result.columns(), expected_columns);
}
}
}
TEST_F(DBWideBasicTest, PutEntityTimestampError) {
// Note: timestamps are currently not supported

@ -11,11 +11,13 @@
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "rocksdb/customizable.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/types.h"
#include "rocksdb/wide_columns.h"
namespace ROCKSDB_NAMESPACE {
@ -34,6 +36,7 @@ class CompactionFilter : public Customizable {
kValue,
kMergeOperand,
kBlobIndex, // used internally by BlobDB.
kWideColumnEntity,
};
enum class Decision {
@ -44,6 +47,7 @@ class CompactionFilter : public Customizable {
kChangeBlobIndex, // used internally by BlobDB.
kIOError, // used internally by BlobDB.
kPurge, // used for keys that can only be SingleDelete'ed
kChangeWideColumnEntity,
kUndetermined,
};
@ -176,15 +180,57 @@ class CompactionFilter : public Customizable {
}
return value_changed ? Decision::kChangeValue : Decision::kKeep;
}
case ValueType::kMergeOperand: {
bool rv = FilterMergeOperand(level, key, existing_value);
return rv ? Decision::kRemove : Decision::kKeep;
}
case ValueType::kBlobIndex:
return Decision::kKeep;
default:
assert(false);
return Decision::kKeep;
}
assert(false);
return Decision::kKeep;
}
// Wide column aware API. Called for plain values, merge operands, and
// wide-column entities; the `value_type` parameter indicates the type of the
// key-value. When the key-value is a plain value or a merge operand, the
// `existing_value` parameter contains the existing value and the
// `existing_columns` parameter is invalid (nullptr). When the key-value is a
// wide-column entity, the `existing_columns` parameter contains the wide
// columns of the existing entity and the `existing_value` parameter is
// invalid (nullptr). The output parameters `new_value` and `new_columns` can
// be used to change the value or wide columns of the key-value when
// `kChangeValue` or `kChangeWideColumnEntity` is returned. See above for more
// information on the semantics of the potential return values.
//
// For compatibility, the default implementation keeps all wide-column
// entities, and falls back to FilterV2 for plain values and merge operands.
// If you override this method, there is no need to override FilterV2 (or
// Filter/FilterMergeOperand).
virtual Decision FilterV3(
int level, const Slice& key, ValueType value_type,
const Slice* existing_value, const WideColumns* existing_columns,
std::string* new_value,
std::vector<std::pair<std::string, std::string>>* /* new_columns */,
std::string* skip_until) const {
#ifdef NDEBUG
(void)existing_columns;
#endif
assert(!existing_value || !existing_columns);
assert(value_type == ValueType::kWideColumnEntity || existing_value);
assert(value_type != ValueType::kWideColumnEntity || existing_columns);
if (value_type == ValueType::kWideColumnEntity) {
return Decision::kKeep;
}
return FilterV2(level, key, value_type, *existing_value, new_value,
skip_until);
}
// Internal (BlobDB) use only. Do not override in application code.
@ -211,7 +257,7 @@ class CompactionFilter : public Customizable {
// In the case of BlobDB, it may be possible to reach a decision with only
// the key without reading the actual value. Keys whose value_type is
// kBlobIndex will be checked by this method.
// Returning kUndetermined will cause FilterV2() to be called to make a
// Returning kUndetermined will cause FilterV3() to be called to make a
// decision as usual.
virtual Decision FilterBlobByKey(int /*level*/, const Slice& /*key*/,
std::string* /*new_value*/,

Loading…
Cancel
Save