Compaction filter support for (new) BlobDB (#7974)

Summary:
Allow applications to implement a custom compaction filter and pass it to BlobDB.

The compaction filter's custom logic can operate on blobs.
To do so, application needs to subclass `CompactionFilter` abstract class and implement `FilterV2()` method.
Optionally, a method called `ShouldFilterBlobByKey()` can be implemented if application's custom logic rely solely
on the key to make a decision without reading the blob, thus saving extra IO. Examples can be found in
db/blob/db_blob_compaction_test.cc.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7974

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D26509280

Pulled By: riversand963

fbshipit-source-id: 59f9ae5614c4359de32f4f2b16684193cc537b39
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent 2772eb7735
commit cef4a6c49f
  1. 1
      CMakeLists.txt
  2. 5
      HISTORY.md
  3. 4
      Makefile
  4. 7
      TARGETS
  5. 53
      db/blob/db_blob_basic_test.cc
  6. 366
      db/blob/db_blob_compaction_test.cc
  7. 189
      db/compaction/compaction_iterator.cc
  8. 2
      db/db_impl/db_impl_open.cc
  9. 16
      include/rocksdb/compaction_filter.h
  10. 1
      src.mk
  11. 2
      utilities/blob_db/blob_compaction_filter.h

@ -1065,6 +1065,7 @@ if(WITH_TESTS)
db/blob/blob_file_garbage_test.cc
db/blob/blob_file_reader_test.cc
db/blob/db_blob_basic_test.cc
db/blob/db_blob_compaction_test.cc
db/blob/db_blob_corruption_test.cc
db/blob/db_blob_index_test.cc
db/column_family_test.cc

@ -7,6 +7,10 @@
* Add a new option BlockBasedTableOptions::max_auto_readahead_size. RocksDB does auto-readahead for iterators on noticing more than two reads for a table file if user doesn't provide readahead_size. The readahead starts at 8KB and doubles on every additional read upto max_auto_readahead_size and now max_auto_readahead_size can be configured dynamically as well. Found that 256 KB readahead size provides the best performance, based on experiments, for auto readahead. Experiment data is in PR #3282. If value is set 0 then no automatic prefetching will be done by rocksdb. Also changing the value will only affect files opened after the change.
* Add suppport to extend DB::VerifyFileChecksums API to also verify blob files checksum.
### New Features
* Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicated that further action is necessary for compaction filter to make a decision.
## 6.18.0 (02/19/2021)
### Behavior Changes
* When retryable IO error occurs during compaction, it is mapped to soft error and set the BG error. However, auto resume is not called to clean the soft error since compaction will reschedule by itself. In this change, When retryable IO error occurs during compaction, BG error is not set. User will be informed the error via EventHelper.
@ -30,6 +34,7 @@
* Add new Append and PositionedAppend APIs to FileSystem to bring the data verification information (data checksum information) from upper layer (e.g., WritableFileWriter) to the storage layer. In this way, the customized FileSystem is able to verify the correctness of data being written to the storage on time. Add checksum_handoff_file_types to DBOptions. User can use this option to control which file types (Currently supported file tyes: kWALFile, kTableFile, kDescriptorFile.) should use the new Append and PositionedAppend APIs to handoff the verification information. Currently, RocksDB only use crc32c to calculate the checksum for write handoff.
* Add an option, `CompressionOptions::max_dict_buffer_bytes`, to limit the in-memory buffering for selecting samples for generating/training a dictionary. The limit is currently loosely adhered to.
## 6.17.0 (01/15/2021)
### Behavior Changes
* When verifying full file checksum with `DB::VerifyFileChecksums()`, we now fail with `Status::InvalidArgument` if the name of the checksum generator used for verification does not match the name of the checksum generator used for protecting the file when it was created.

@ -592,6 +592,7 @@ ifdef ASSERT_STATUS_CHECKED
db_log_iter_test \
db_bloom_filter_test \
db_blob_basic_test \
db_blob_compaction_test \
db_blob_corruption_test \
db_blob_index_test \
db_block_cache_test \
@ -1569,6 +1570,9 @@ db_basic_test: $(OBJ_DIR)/db/db_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
db_blob_basic_test: $(OBJ_DIR)/db/blob/db_blob_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
db_blob_compaction_test: $(OBJ_DIR)/db/blob/db_blob_compaction_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
db_with_timestamp_basic_test: $(OBJ_DIR)/db/db_with_timestamp_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

@ -1149,6 +1149,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"db_blob_compaction_test",
"db/blob/db_blob_compaction_test.cc",
"serial",
[],
[],
],
[
"db_blob_corruption_test",
"db/blob/db_blob_corruption_test.cc",

@ -355,6 +355,59 @@ TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) {
ASSERT_TRUE(statuses[1].IsIOError());
}
namespace {
class ReadBlobCompactionFilter : public CompactionFilter {
public:
ReadBlobCompactionFilter() = default;
const char* Name() const override {
return "rocksdb.compaction.filter.read.blob";
}
CompactionFilter::Decision FilterV2(
int /*level*/, const Slice& /*key*/, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const override {
if (value_type != CompactionFilter::ValueType::kValue) {
return CompactionFilter::Decision::kKeep;
}
assert(new_value);
new_value->assign(existing_value.data(), existing_value.size());
return CompactionFilter::Decision::kChangeValue;
}
};
} // anonymous namespace
TEST_P(DBBlobBasicIOErrorTest, CompactionFilterReadBlob_IOError) {
Options options = GetDefaultOptions();
options.env = fault_injection_env_.get();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ReadBlobCompactionFilter);
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
constexpr char key[] = "foo";
constexpr char blob_value[] = "foo_blob_value";
ASSERT_OK(Put(key, blob_value));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_->SetFilesystemActive(false,
Status::IOError(sync_point_));
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr)
.IsIOError());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -0,0 +1,366 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_index.h"
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "utilities/fault_injection_env.h"
namespace ROCKSDB_NAMESPACE {
class DBBlobCompactionTest : public DBTestBase {
public:
explicit DBBlobCompactionTest()
: DBTestBase("/db_blob_compaction_test", /*env_do_fsync=*/false) {}
// TODO: copied from DBCompactionTest. Should be de-duplicated in the future.
std::vector<uint64_t> GetBlobFileNumbers() {
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& blob_files = storage_info->GetBlobFiles();
std::vector<uint64_t> result;
result.reserve(blob_files.size());
for (const auto& blob_file : blob_files) {
result.emplace_back(blob_file.first);
}
return result;
}
};
namespace {
class FilterByKeyLength : public CompactionFilter {
public:
explicit FilterByKeyLength(size_t len) : length_threshold_(len) {}
const char* Name() const override {
return "rocksdb.compaction.filter.by.key.length";
}
CompactionFilter::Decision FilterBlobByKey(
int /*level*/, const Slice& key, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
if (key.size() < length_threshold_) {
return CompactionFilter::Decision::kRemove;
}
return CompactionFilter::Decision::kKeep;
}
private:
size_t length_threshold_;
};
class BadBlobCompactionFilter : public CompactionFilter {
public:
explicit BadBlobCompactionFilter(std::string prefix,
CompactionFilter::Decision filter_by_key,
CompactionFilter::Decision filter_v2)
: prefix_(std::move(prefix)),
filter_blob_by_key_(filter_by_key),
filter_v2_(filter_v2) {}
const char* Name() const override { return "rocksdb.compaction.filter.bad"; }
CompactionFilter::Decision FilterBlobByKey(
int /*level*/, const Slice& key, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
if (key.size() >= prefix_.size() &&
0 == strncmp(prefix_.data(), key.data(), prefix_.size())) {
return CompactionFilter::Decision::kUndetermined;
}
return filter_blob_by_key_;
}
CompactionFilter::Decision FilterV2(
int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
return filter_v2_;
}
private:
const std::string prefix_;
const CompactionFilter::Decision filter_blob_by_key_;
const CompactionFilter::Decision filter_v2_;
};
class ValueBlindWriteFilter : public CompactionFilter {
public:
explicit ValueBlindWriteFilter(std::string new_val)
: new_value_(std::move(new_val)) {}
const char* Name() const override {
return "rocksdb.compaction.filter.blind.write";
}
CompactionFilter::Decision FilterBlobByKey(
int level, const Slice& key, std::string* new_value,
std::string* skip_until) const override;
private:
const std::string new_value_;
};
CompactionFilter::Decision ValueBlindWriteFilter::FilterBlobByKey(
int /*level*/, const Slice& /*key*/, std::string* new_value,
std::string* /*skip_until*/) const {
assert(new_value);
new_value->assign(new_value_);
return CompactionFilter::Decision::kChangeValue;
}
class ValueMutationFilter : public CompactionFilter {
public:
explicit ValueMutationFilter(std::string padding)
: padding_(std::move(padding)) {}
const char* Name() const override {
return "rocksdb.compaction.filter.value.mutation";
}
CompactionFilter::Decision FilterV2(int level, const Slice& key,
ValueType value_type,
const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const override;
private:
const std::string padding_;
};
CompactionFilter::Decision ValueMutationFilter::FilterV2(
int /*level*/, const Slice& /*key*/, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const {
assert(CompactionFilter::ValueType::kBlobIndex != value_type);
if (CompactionFilter::ValueType::kValue != value_type) {
return CompactionFilter::Decision::kKeep;
}
assert(new_value);
new_value->assign(existing_value.data(), existing_value.size());
new_value->append(padding_);
return CompactionFilter::Decision::kChangeValue;
}
class AlwaysKeepFilter : public CompactionFilter {
public:
explicit AlwaysKeepFilter() = default;
const char* Name() const override {
return "rocksdb.compaction.filter.always.keep";
}
CompactionFilter::Decision FilterV2(
int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
return CompactionFilter::Decision::kKeep;
}
};
} // anonymous namespace
class DBBlobBadCompactionFilterTest
: public DBBlobCompactionTest,
public testing::WithParamInterface<
std::tuple<std::string, CompactionFilter::Decision,
CompactionFilter::Decision>> {
public:
explicit DBBlobBadCompactionFilterTest()
: compaction_filter_guard_(new BadBlobCompactionFilter(
std::get<0>(GetParam()), std::get<1>(GetParam()),
std::get<2>(GetParam()))) {}
protected:
std::unique_ptr<CompactionFilter> compaction_filter_guard_;
};
INSTANTIATE_TEST_CASE_P(
BadCompactionFilter, DBBlobBadCompactionFilterTest,
testing::Combine(
testing::Values("a"),
testing::Values(CompactionFilter::Decision::kChangeBlobIndex,
CompactionFilter::Decision::kIOError),
testing::Values(CompactionFilter::Decision::kUndetermined,
CompactionFilter::Decision::kChangeBlobIndex,
CompactionFilter::Decision::kIOError)));
TEST_F(DBBlobCompactionTest, FilterByKeyLength) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
constexpr size_t kKeyLength = 2;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new FilterByKeyLength(kKeyLength));
options.compaction_filter = compaction_filter_guard.get();
constexpr char short_key[] = "a";
constexpr char long_key[] = "abc";
constexpr char blob_value[] = "value";
DestroyAndReopen(options);
ASSERT_OK(Put(short_key, blob_value));
ASSERT_OK(Put(long_key, blob_value));
ASSERT_OK(Flush());
CompactRangeOptions cro;
ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), short_key, &value).IsNotFound());
value.clear();
ASSERT_OK(db_->Get(ReadOptions(), long_key, &value));
ASSERT_EQ("value", value);
Close();
}
TEST_F(DBBlobCompactionTest, BlindWriteFilter) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
constexpr char new_blob_value[] = "new_blob_value";
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ValueBlindWriteFilter(new_blob_value));
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
const std::vector<std::string> keys = {"a", "b", "c"};
const std::vector<std::string> values = {"a_value", "b_value", "c_value"};
assert(keys.size() == values.size());
for (size_t i = 0; i < keys.size(); ++i) {
ASSERT_OK(Put(keys[i], values[i]));
}
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
for (const auto& key : keys) {
ASSERT_EQ(new_blob_value, Get(key));
}
Close();
}
TEST_P(DBBlobBadCompactionFilterTest, BadDecisionFromCompactionFilter) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
options.compaction_filter = compaction_filter_guard_.get();
DestroyAndReopen(options);
ASSERT_OK(Put("b", "value"));
ASSERT_OK(Flush());
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr)
.IsNotSupported());
Close();
DestroyAndReopen(options);
std::string key(std::get<0>(GetParam()));
ASSERT_OK(Put(key, "value"));
ASSERT_OK(Flush());
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr)
.IsNotSupported());
Close();
}
TEST_F(DBBlobCompactionTest, CompactionFilter_InlinedTTLIndex) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.enable_blob_files = true;
options.min_blob_size = 0;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ValueMutationFilter(""));
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
constexpr char key[] = "key";
constexpr char blob[] = "blob";
// Fake an inlined TTL blob index.
std::string blob_index;
constexpr uint64_t expiration = 1234567890;
BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
WriteBatch batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr)
.IsCorruption());
Close();
}
TEST_F(DBBlobCompactionTest, CompactionFilter) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.enable_blob_files = true;
options.min_blob_size = 0;
constexpr char padding[] = "_delta";
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ValueMutationFilter(padding));
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
const std::vector<std::pair<std::string, std::string>> kvs = {
{"a", "a_value"}, {"b", "b_value"}, {"c", "c_value"}};
for (const auto& kv : kvs) {
ASSERT_OK(Put(kv.first, kv.second));
}
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
for (const auto& kv : kvs) {
ASSERT_EQ(kv.second + std::string(padding), Get(kv.first));
}
Close();
}
TEST_F(DBBlobCompactionTest, CorruptedBlobIndex) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.enable_blob_files = true;
options.min_blob_size = 0;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new ValueMutationFilter(""));
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
// Mock a corrupted blob index
constexpr char key[] = "key";
std::string blob_idx("blob_idx");
WriteBatch write_batch;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&write_batch, 0, key, blob_idx));
ASSERT_OK(db_->Write(WriteOptions(), &write_batch));
ASSERT_OK(Flush());
ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr)
.IsCorruption());
Close();
}
TEST_F(DBBlobCompactionTest, CompactionFilterReadBlobAndKeep) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.enable_blob_files = true;
options.min_blob_size = 0;
std::unique_ptr<CompactionFilter> compaction_filter_guard(
new AlwaysKeepFilter());
options.compaction_filter = compaction_filter_guard.get();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "foo_value"));
ASSERT_OK(Flush());
std::vector<uint64_t> blob_files = GetBlobFileNumbers();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
ASSERT_EQ(blob_files, GetBlobFileNumbers());
Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -204,72 +204,140 @@ void CompactionIterator::Next() {
bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Slice* skip_until) {
if (compaction_filter_ != nullptr &&
(ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
// replace the entry with a deletion marker.
CompactionFilter::Decision filter;
compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear();
CompactionFilter::ValueType value_type =
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
: CompactionFilter::ValueType::kBlobIndex;
// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// to get sequence number.
Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_;
{
StopWatchNano timer(clock_, report_detailed_time_);
if (!compaction_filter_ ||
(ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) {
return true;
}
bool error = false;
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
// replace the entry with a deletion marker.
CompactionFilter::Decision filter = CompactionFilter::Decision::kUndetermined;
compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear();
CompactionFilter::ValueType value_type =
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
: CompactionFilter::ValueType::kBlobIndex;
// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// to get sequence number.
assert(compaction_filter_);
Slice& filter_key =
(ikey_.type == kTypeValue ||
!compaction_filter_->IsStackedBlobDbInternalCompactionFilter())
? ikey_.user_key
: key_;
{
StopWatchNano timer(clock_, report_detailed_time_);
if (kTypeBlobIndex == ikey_.type) {
blob_value_.Reset();
filter = compaction_filter_->FilterBlobByKey(
compaction_->level(), filter_key, &compaction_filter_value_,
compaction_filter_skip_until_.rep());
if (CompactionFilter::Decision::kUndetermined == filter &&
!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
// For integrated BlobDB impl, CompactionIterator reads blob value.
// For Stacked BlobDB impl, the corresponding CompactionFilter's
// FilterV2 method should read the blob value.
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(value_);
if (!s.ok()) {
status_ = s;
valid_ = false;
return false;
}
if (blob_index.HasTTL() || blob_index.IsInlined()) {
status_ = Status::Corruption("Unexpected TTL/inlined blob index");
valid_ = false;
return false;
}
const Version* const version = compaction_->input_version();
assert(version);
s = version->GetBlob(ReadOptions(), ikey_.user_key, blob_index,
&blob_value_);
if (!s.ok()) {
status_ = s;
valid_ = false;
return false;
}
value_type = CompactionFilter::ValueType::kValue;
}
}
if (CompactionFilter::Decision::kUndetermined == filter) {
filter = compaction_filter_->FilterV2(
compaction_->level(), filter_key, value_type, value_,
&compaction_filter_value_, compaction_filter_skip_until_.rep());
iter_stats_.total_filter_time +=
env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
compaction_->level(), filter_key, value_type,
blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_,
compaction_filter_skip_until_.rep());
}
iter_stats_.total_filter_time +=
env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
}
if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
0) {
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2 documentation.
filter = CompactionFilter::Decision::kKeep;
}
if (CompactionFilter::Decision::kUndetermined == filter) {
// Should not reach here, since FilterV2 should never return kUndetermined.
status_ =
Status::NotSupported("FilterV2() should never return kUndetermined");
valid_ = false;
return false;
}
if (filter == CompactionFilter::Decision::kRemove) {
// convert the current key to a delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (filter == CompactionFilter::Decision::kChangeValue) {
if (ikey_.type == kTypeBlobIndex) {
// value transfer from blob file to inlined data
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
*need_skip = true;
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
kValueTypeForSeek);
*skip_until = compaction_filter_skip_until_.Encode();
} else if (filter == CompactionFilter::Decision::kChangeBlobIndex) {
if (ikey_.type == kTypeValue) {
// value transfer from inlined data to blob file
ikey_.type = kTypeBlobIndex;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kIOError) {
status_ =
Status::IOError("Failed to access blob during compaction filter");
if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
0) {
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2 documentation.
filter = CompactionFilter::Decision::kKeep;
}
if (filter == CompactionFilter::Decision::kRemove) {
// convert the current key to a delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (filter == CompactionFilter::Decision::kChangeValue) {
if (ikey_.type == kTypeBlobIndex) {
// value transfer from blob file to inlined data
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
*need_skip = true;
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
kValueTypeForSeek);
*skip_until = compaction_filter_skip_until_.Encode();
} else if (filter == CompactionFilter::Decision::kChangeBlobIndex) {
// Only the StackableDB-based BlobDB impl's compaction filter should return
// kChangeBlobIndex. Decision about rewriting blob and changing blob index
// in the integrated BlobDB impl is made in subsequent call to
// PrepareOutput() and its callees.
if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
status_ = Status::NotSupported(
"Only stacked BlobDB's internal compaction filter can return "
"kChangeBlobIndex.");
valid_ = false;
return false;
}
if (ikey_.type == kTypeValue) {
// value transfer from inlined data to blob file
ikey_.type = kTypeBlobIndex;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kIOError) {
if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
status_ = Status::NotSupported(
"CompactionFilter for integrated BlobDB should not return kIOError");
valid_ = false;
return false;
}
status_ = Status::IOError("Failed to access blob during compaction filter");
error = true;
}
return true;
return !error;
}
void CompactionIterator::NextFromInput() {
@ -840,7 +908,8 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {
}
// GC for stacked BlobDB
if (compaction_filter_) {
if (compaction_filter_ &&
compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
const auto blob_decision = compaction_filter_->PrepareBlobOutput(
user_key(), value_, &compaction_filter_value_);

@ -175,7 +175,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
NewSstFileManager(result.env, result.info_log));
result.sst_file_manager = sst_file_manager;
}
#endif
#endif // !ROCKSDB_LITE
if (!result.paranoid_checks) {
result.skip_checking_sst_file_sizes_on_db_open = true;

@ -47,6 +47,7 @@ class CompactionFilter {
kRemoveAndSkipUntil,
kChangeBlobIndex, // used internally by BlobDB.
kIOError, // used internally by BlobDB.
kUndetermined,
};
enum class BlobDecision { kKeep, kChangeValue, kCorruption, kIOError };
@ -150,6 +151,7 @@ class CompactionFilter {
// - If you use kRemoveAndSkipUntil, consider also reducing
// compaction_readahead_size option.
//
// Should never return kUndetermined.
// Note: If you are using a TransactionDB, it is not recommended to filter
// out or modify merge operands (ValueType::kMergeOperand).
// If a merge operation is filtered out, TransactionDB may not realize there
@ -196,6 +198,20 @@ class CompactionFilter {
// Returns a name that identifies this compaction filter.
// The name will be printed to LOG file on start up for diagnosis.
virtual const char* Name() const = 0;
// Internal (BlobDB) use only. Do not override in application code.
virtual bool IsStackedBlobDbInternalCompactionFilter() const { return false; }
// In the case of BlobDB, it may be possible to reach a decision with only
// the key without reading the actual value. Keys whose value_type is
// kBlobIndex will be checked by this method.
// Returning kUndetermined will cause FilterV2() to be called to make a
// decision as usual.
virtual Decision FilterBlobByKey(int /*level*/, const Slice& /*key*/,
std::string* /*new_value*/,
std::string* /*skip_until*/) const {
return Decision::kUndetermined;
}
};
// Each compaction will create a new CompactionFilter allowing the

@ -377,6 +377,7 @@ TEST_MAIN_SOURCES = \
db/blob/blob_file_garbage_test.cc \
db/blob/blob_file_reader_test.cc \
db/blob/db_blob_basic_test.cc \
db/blob/db_blob_compaction_test.cc \
db/blob/db_blob_corruption_test.cc \
db/blob/db_blob_index_test.cc \
db/column_family_test.cc \

@ -54,6 +54,8 @@ class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase {
const Slice& value, std::string* new_value,
std::string* skip_until) const override;
bool IsStackedBlobDbInternalCompactionFilter() const override { return true; }
protected:
bool IsBlobFileOpened() const;
virtual bool OpenNewBlobFileIfNeeded() const;

Loading…
Cancel
Save