From cef4a6c49f993590755289854aad109fbe8de629 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 25 Feb 2021 16:30:27 -0800 Subject: [PATCH] Compaction filter support for (new) BlobDB (#7974) Summary: Allow applications to implement a custom compaction filter and pass it to BlobDB. The compaction filter's custom logic can operate on blobs. To do so, application needs to subclass `CompactionFilter` abstract class and implement `FilterV2()` method. Optionally, a method called `FilterBlobByKey()` can be implemented if application's custom logic relies solely on the key to make a decision without reading the blob, thus saving extra IO. Examples can be found in db/blob/db_blob_compaction_test.cc. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7974 Test Plan: make check Reviewed By: ltamasi Differential Revision: D26509280 Pulled By: riversand963 fbshipit-source-id: 59f9ae5614c4359de32f4f2b16684193cc537b39 --- CMakeLists.txt | 1 + HISTORY.md | 5 + Makefile | 4 + TARGETS | 7 + db/blob/db_blob_basic_test.cc | 53 +++ db/blob/db_blob_compaction_test.cc | 366 +++++++++++++++++++++ db/compaction/compaction_iterator.cc | 189 +++++++---- db/db_impl/db_impl_open.cc | 2 +- include/rocksdb/compaction_filter.h | 16 + src.mk | 1 + utilities/blob_db/blob_compaction_filter.h | 2 + 11 files changed, 585 insertions(+), 61 deletions(-) create mode 100644 db/blob/db_blob_compaction_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index eb92a22d5..ff86426b8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1065,6 +1065,7 @@ if(WITH_TESTS) db/blob/blob_file_garbage_test.cc db/blob/blob_file_reader_test.cc db/blob/db_blob_basic_test.cc + db/blob/db_blob_compaction_test.cc db/blob/db_blob_corruption_test.cc db/blob/db_blob_index_test.cc db/column_family_test.cc diff --git a/HISTORY.md b/HISTORY.md index 30b236f32..3177854f1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,10 @@ * Add a new option BlockBasedTableOptions::max_auto_readahead_size. 
RocksDB does auto-readahead for iterators on noticing more than two reads for a table file if user doesn't provide readahead_size. The readahead starts at 8KB and doubles on every additional read upto max_auto_readahead_size and now max_auto_readahead_size can be configured dynamically as well. Found that 256 KB readahead size provides the best performance, based on experiments, for auto readahead. Experiment data is in PR #3282. If value is set 0 then no automatic prefetching will be done by rocksdb. Also changing the value will only affect files opened after the change. * Add suppport to extend DB::VerifyFileChecksums API to also verify blob files checksum. +### New Features +* Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicate that further action is necessary for compaction filter to make a decision. + + ## 6.18.0 (02/19/2021) ### Behavior Changes * When retryable IO error occurs during compaction, it is mapped to soft error and set the BG error. However, auto resume is not called to clean the soft error since compaction will reschedule by itself. In this change, When retryable IO error occurs during compaction, BG error is not set. User will be informed the error via EventHelper. @@ -30,6 +34,7 @@ * Add new Append and PositionedAppend APIs to FileSystem to bring the data verification information (data checksum information) from upper layer (e.g., WritableFileWriter) to the storage layer. In this way, the customized FileSystem is able to verify the correctness of data being written to the storage on time. Add checksum_handoff_file_types to DBOptions. User can use this option to control which file types (Currently supported file tyes: kWALFile, kTableFile, kDescriptorFile.) 
should use the new Append and PositionedAppend APIs to handoff the verification information. Currently, RocksDB only use crc32c to calculate the checksum for write handoff. * Add an option, `CompressionOptions::max_dict_buffer_bytes`, to limit the in-memory buffering for selecting samples for generating/training a dictionary. The limit is currently loosely adhered to. + ## 6.17.0 (01/15/2021) ### Behavior Changes * When verifying full file checksum with `DB::VerifyFileChecksums()`, we now fail with `Status::InvalidArgument` if the name of the checksum generator used for verification does not match the name of the checksum generator used for protecting the file when it was created. diff --git a/Makefile b/Makefile index 8f42af139..b63c10b89 100644 --- a/Makefile +++ b/Makefile @@ -592,6 +592,7 @@ ifdef ASSERT_STATUS_CHECKED db_log_iter_test \ db_bloom_filter_test \ db_blob_basic_test \ + db_blob_compaction_test \ db_blob_corruption_test \ db_blob_index_test \ db_block_cache_test \ @@ -1569,6 +1570,9 @@ db_basic_test: $(OBJ_DIR)/db/db_basic_test.o $(TEST_LIBRARY) $(LIBRARY) db_blob_basic_test: $(OBJ_DIR)/db/blob/db_blob_basic_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +db_blob_compaction_test: $(OBJ_DIR)/db/blob/db_blob_compaction_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + db_with_timestamp_basic_test: $(OBJ_DIR)/db/db_with_timestamp_basic_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 30372b725..e891c3775 100644 --- a/TARGETS +++ b/TARGETS @@ -1149,6 +1149,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "db_blob_compaction_test", + "db/blob/db_blob_compaction_test.cc", + "serial", + [], + [], + ], [ "db_blob_corruption_test", "db/blob/db_blob_corruption_test.cc", diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc index 61555e0a6..2f20b9500 100644 --- a/db/blob/db_blob_basic_test.cc +++ b/db/blob/db_blob_basic_test.cc @@ -355,6 +355,59 @@ TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) { 
ASSERT_TRUE(statuses[1].IsIOError()); } +namespace { + +class ReadBlobCompactionFilter : public CompactionFilter { + public: + ReadBlobCompactionFilter() = default; + const char* Name() const override { + return "rocksdb.compaction.filter.read.blob"; + } + CompactionFilter::Decision FilterV2( + int /*level*/, const Slice& /*key*/, ValueType value_type, + const Slice& existing_value, std::string* new_value, + std::string* /*skip_until*/) const override { + if (value_type != CompactionFilter::ValueType::kValue) { + return CompactionFilter::Decision::kKeep; + } + assert(new_value); + new_value->assign(existing_value.data(), existing_value.size()); + return CompactionFilter::Decision::kChangeValue; + } +}; + +} // anonymous namespace + +TEST_P(DBBlobBasicIOErrorTest, CompactionFilterReadBlob_IOError) { + Options options = GetDefaultOptions(); + options.env = fault_injection_env_.get(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.create_if_missing = true; + std::unique_ptr compaction_filter_guard( + new ReadBlobCompactionFilter); + options.compaction_filter = compaction_filter_guard.get(); + + DestroyAndReopen(options); + constexpr char key[] = "foo"; + constexpr char blob_value[] = "foo_blob_value"; + ASSERT_OK(Put(key, blob_value)); + ASSERT_OK(Flush()); + + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { + fault_injection_env_->SetFilesystemActive(false, + Status::IOError(sync_point_)); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr) + .IsIOError()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc new file mode 100644 index 000000000..6e699b09c --- /dev/null +++ b/db/blob/db_blob_compaction_test.cc @@ 
-0,0 +1,366 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_index.h" +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "test_util/sync_point.h" +#include "utilities/fault_injection_env.h" + +namespace ROCKSDB_NAMESPACE { + +class DBBlobCompactionTest : public DBTestBase { + public: + explicit DBBlobCompactionTest() + : DBTestBase("/db_blob_compaction_test", /*env_do_fsync=*/false) {} + + // TODO: copied from DBCompactionTest. Should be de-duplicated in the future. + std::vector GetBlobFileNumbers() { + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& blob_files = storage_info->GetBlobFiles(); + + std::vector result; + result.reserve(blob_files.size()); + + for (const auto& blob_file : blob_files) { + result.emplace_back(blob_file.first); + } + + return result; + } +}; + +namespace { + +class FilterByKeyLength : public CompactionFilter { + public: + explicit FilterByKeyLength(size_t len) : length_threshold_(len) {} + const char* Name() const override { + return "rocksdb.compaction.filter.by.key.length"; + } + CompactionFilter::Decision FilterBlobByKey( + int /*level*/, const Slice& key, std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + if (key.size() < length_threshold_) { + return CompactionFilter::Decision::kRemove; + } + return CompactionFilter::Decision::kKeep; + } + + private: + size_t length_threshold_; +}; + +class BadBlobCompactionFilter : public CompactionFilter { + 
public: + explicit BadBlobCompactionFilter(std::string prefix, + CompactionFilter::Decision filter_by_key, + CompactionFilter::Decision filter_v2) + : prefix_(std::move(prefix)), + filter_blob_by_key_(filter_by_key), + filter_v2_(filter_v2) {} + const char* Name() const override { return "rocksdb.compaction.filter.bad"; } + CompactionFilter::Decision FilterBlobByKey( + int /*level*/, const Slice& key, std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + if (key.size() >= prefix_.size() && + 0 == strncmp(prefix_.data(), key.data(), prefix_.size())) { + return CompactionFilter::Decision::kUndetermined; + } + return filter_blob_by_key_; + } + CompactionFilter::Decision FilterV2( + int /*level*/, const Slice& /*key*/, ValueType /*value_type*/, + const Slice& /*existing_value*/, std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + return filter_v2_; + } + + private: + const std::string prefix_; + const CompactionFilter::Decision filter_blob_by_key_; + const CompactionFilter::Decision filter_v2_; +}; + +class ValueBlindWriteFilter : public CompactionFilter { + public: + explicit ValueBlindWriteFilter(std::string new_val) + : new_value_(std::move(new_val)) {} + const char* Name() const override { + return "rocksdb.compaction.filter.blind.write"; + } + CompactionFilter::Decision FilterBlobByKey( + int level, const Slice& key, std::string* new_value, + std::string* skip_until) const override; + + private: + const std::string new_value_; +}; + +CompactionFilter::Decision ValueBlindWriteFilter::FilterBlobByKey( + int /*level*/, const Slice& /*key*/, std::string* new_value, + std::string* /*skip_until*/) const { + assert(new_value); + new_value->assign(new_value_); + return CompactionFilter::Decision::kChangeValue; +} + +class ValueMutationFilter : public CompactionFilter { + public: + explicit ValueMutationFilter(std::string padding) + : padding_(std::move(padding)) {} + const char* Name() const override { + return 
"rocksdb.compaction.filter.value.mutation"; + } + CompactionFilter::Decision FilterV2(int level, const Slice& key, + ValueType value_type, + const Slice& existing_value, + std::string* new_value, + std::string* skip_until) const override; + + private: + const std::string padding_; +}; + +CompactionFilter::Decision ValueMutationFilter::FilterV2( + int /*level*/, const Slice& /*key*/, ValueType value_type, + const Slice& existing_value, std::string* new_value, + std::string* /*skip_until*/) const { + assert(CompactionFilter::ValueType::kBlobIndex != value_type); + if (CompactionFilter::ValueType::kValue != value_type) { + return CompactionFilter::Decision::kKeep; + } + assert(new_value); + new_value->assign(existing_value.data(), existing_value.size()); + new_value->append(padding_); + return CompactionFilter::Decision::kChangeValue; +} + +class AlwaysKeepFilter : public CompactionFilter { + public: + explicit AlwaysKeepFilter() = default; + const char* Name() const override { + return "rocksdb.compaction.filter.always.keep"; + } + CompactionFilter::Decision FilterV2( + int /*level*/, const Slice& /*key*/, ValueType /*value_type*/, + const Slice& /*existing_value*/, std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + return CompactionFilter::Decision::kKeep; + } +}; +} // anonymous namespace + +class DBBlobBadCompactionFilterTest + : public DBBlobCompactionTest, + public testing::WithParamInterface< + std::tuple> { + public: + explicit DBBlobBadCompactionFilterTest() + : compaction_filter_guard_(new BadBlobCompactionFilter( + std::get<0>(GetParam()), std::get<1>(GetParam()), + std::get<2>(GetParam()))) {} + + protected: + std::unique_ptr compaction_filter_guard_; +}; + +INSTANTIATE_TEST_CASE_P( + BadCompactionFilter, DBBlobBadCompactionFilterTest, + testing::Combine( + testing::Values("a"), + testing::Values(CompactionFilter::Decision::kChangeBlobIndex, + CompactionFilter::Decision::kIOError), + 
testing::Values(CompactionFilter::Decision::kUndetermined, + CompactionFilter::Decision::kChangeBlobIndex, + CompactionFilter::Decision::kIOError))); + +TEST_F(DBBlobCompactionTest, FilterByKeyLength) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.create_if_missing = true; + constexpr size_t kKeyLength = 2; + std::unique_ptr compaction_filter_guard( + new FilterByKeyLength(kKeyLength)); + options.compaction_filter = compaction_filter_guard.get(); + + constexpr char short_key[] = "a"; + constexpr char long_key[] = "abc"; + constexpr char blob_value[] = "value"; + + DestroyAndReopen(options); + ASSERT_OK(Put(short_key, blob_value)); + ASSERT_OK(Put(long_key, blob_value)); + ASSERT_OK(Flush()); + CompactRangeOptions cro; + ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr)); + std::string value; + ASSERT_TRUE(db_->Get(ReadOptions(), short_key, &value).IsNotFound()); + value.clear(); + ASSERT_OK(db_->Get(ReadOptions(), long_key, &value)); + ASSERT_EQ("value", value); + Close(); +} + +TEST_F(DBBlobCompactionTest, BlindWriteFilter) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.create_if_missing = true; + constexpr char new_blob_value[] = "new_blob_value"; + std::unique_ptr compaction_filter_guard( + new ValueBlindWriteFilter(new_blob_value)); + options.compaction_filter = compaction_filter_guard.get(); + DestroyAndReopen(options); + const std::vector keys = {"a", "b", "c"}; + const std::vector values = {"a_value", "b_value", "c_value"}; + assert(keys.size() == values.size()); + for (size_t i = 0; i < keys.size(); ++i) { + ASSERT_OK(Put(keys[i], values[i])); + } + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr)); + for (const auto& key : keys) { + ASSERT_EQ(new_blob_value, Get(key)); + } + Close(); +} + +TEST_P(DBBlobBadCompactionFilterTest, 
BadDecisionFromCompactionFilter) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.create_if_missing = true; + options.compaction_filter = compaction_filter_guard_.get(); + DestroyAndReopen(options); + ASSERT_OK(Put("b", "value")); + ASSERT_OK(Flush()); + ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr) + .IsNotSupported()); + Close(); + + DestroyAndReopen(options); + std::string key(std::get<0>(GetParam())); + ASSERT_OK(Put(key, "value")); + ASSERT_OK(Flush()); + ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr) + .IsNotSupported()); + Close(); +} + +TEST_F(DBBlobCompactionTest, CompactionFilter_InlinedTTLIndex) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.enable_blob_files = true; + options.min_blob_size = 0; + std::unique_ptr compaction_filter_guard( + new ValueMutationFilter("")); + options.compaction_filter = compaction_filter_guard.get(); + DestroyAndReopen(options); + constexpr char key[] = "key"; + constexpr char blob[] = "blob"; + // Fake an inlined TTL blob index. 
+ std::string blob_index; + constexpr uint64_t expiration = 1234567890; + BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob); + WriteBatch batch; + ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(Flush()); + ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr) + .IsCorruption()); + Close(); +} + +TEST_F(DBBlobCompactionTest, CompactionFilter) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.enable_blob_files = true; + options.min_blob_size = 0; + constexpr char padding[] = "_delta"; + std::unique_ptr compaction_filter_guard( + new ValueMutationFilter(padding)); + options.compaction_filter = compaction_filter_guard.get(); + DestroyAndReopen(options); + const std::vector> kvs = { + {"a", "a_value"}, {"b", "b_value"}, {"c", "c_value"}}; + for (const auto& kv : kvs) { + ASSERT_OK(Put(kv.first, kv.second)); + } + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr)); + for (const auto& kv : kvs) { + ASSERT_EQ(kv.second + std::string(padding), Get(kv.first)); + } + Close(); +} + +TEST_F(DBBlobCompactionTest, CorruptedBlobIndex) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.enable_blob_files = true; + options.min_blob_size = 0; + std::unique_ptr compaction_filter_guard( + new ValueMutationFilter("")); + options.compaction_filter = compaction_filter_guard.get(); + DestroyAndReopen(options); + // Mock a corrupted blob index + constexpr char key[] = "key"; + std::string blob_idx("blob_idx"); + WriteBatch write_batch; + ASSERT_OK(WriteBatchInternal::PutBlobIndex(&write_batch, 0, key, blob_idx)); + ASSERT_OK(db_->Write(WriteOptions(), &write_batch)); + ASSERT_OK(Flush()); + ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr) + .IsCorruption()); + Close(); +} + 
+TEST_F(DBBlobCompactionTest, CompactionFilterReadBlobAndKeep) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.enable_blob_files = true; + options.min_blob_size = 0; + std::unique_ptr compaction_filter_guard( + new AlwaysKeepFilter()); + options.compaction_filter = compaction_filter_guard.get(); + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "foo_value")); + ASSERT_OK(Flush()); + std::vector blob_files = GetBlobFileNumbers(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr)); + ASSERT_EQ(blob_files, GetBlobFileNumbers()); + + Close(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 02ce7818e..8064474da 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -204,72 +204,140 @@ void CompactionIterator::Next() { bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until) { - if (compaction_filter_ != nullptr && - (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) { - // If the user has specified a compaction filter and the sequence - // number is greater than any external snapshot, then invoke the - // filter. If the return value of the compaction filter is true, - // replace the entry with a deletion marker. - CompactionFilter::Decision filter; - compaction_filter_value_.clear(); - compaction_filter_skip_until_.Clear(); - CompactionFilter::ValueType value_type = - ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue - : CompactionFilter::ValueType::kBlobIndex; - // Hack: pass internal key to BlobIndexCompactionFilter since it needs - // to get sequence number. 
- Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_; - { - StopWatchNano timer(clock_, report_detailed_time_); + if (!compaction_filter_ || + (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) { + return true; + } + bool error = false; + // If the user has specified a compaction filter and the sequence + // number is greater than any external snapshot, then invoke the + // filter. If the return value of the compaction filter is true, + // replace the entry with a deletion marker. + CompactionFilter::Decision filter = CompactionFilter::Decision::kUndetermined; + compaction_filter_value_.clear(); + compaction_filter_skip_until_.Clear(); + CompactionFilter::ValueType value_type = + ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue + : CompactionFilter::ValueType::kBlobIndex; + // Hack: pass internal key to BlobIndexCompactionFilter since it needs + // to get sequence number. + assert(compaction_filter_); + Slice& filter_key = + (ikey_.type == kTypeValue || + !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) + ? ikey_.user_key + : key_; + { + StopWatchNano timer(clock_, report_detailed_time_); + if (kTypeBlobIndex == ikey_.type) { + blob_value_.Reset(); + filter = compaction_filter_->FilterBlobByKey( + compaction_->level(), filter_key, &compaction_filter_value_, + compaction_filter_skip_until_.rep()); + if (CompactionFilter::Decision::kUndetermined == filter && + !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { + // For integrated BlobDB impl, CompactionIterator reads blob value. + // For Stacked BlobDB impl, the corresponding CompactionFilter's + // FilterV2 method should read the blob value. 
+ BlobIndex blob_index; + Status s = blob_index.DecodeFrom(value_); + if (!s.ok()) { + status_ = s; + valid_ = false; + return false; + } + if (blob_index.HasTTL() || blob_index.IsInlined()) { + status_ = Status::Corruption("Unexpected TTL/inlined blob index"); + valid_ = false; + return false; + } + const Version* const version = compaction_->input_version(); + assert(version); + s = version->GetBlob(ReadOptions(), ikey_.user_key, blob_index, + &blob_value_); + if (!s.ok()) { + status_ = s; + valid_ = false; + return false; + } + value_type = CompactionFilter::ValueType::kValue; + } + } + if (CompactionFilter::Decision::kUndetermined == filter) { filter = compaction_filter_->FilterV2( - compaction_->level(), filter_key, value_type, value_, - &compaction_filter_value_, compaction_filter_skip_until_.rep()); - iter_stats_.total_filter_time += - env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0; + compaction_->level(), filter_key, value_type, + blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_, + compaction_filter_skip_until_.rep()); } + iter_stats_.total_filter_time += + env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0; + } - if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil && - cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= - 0) { - // Can't skip to a key smaller than the current one. - // Keep the key as per FilterV2 documentation. - filter = CompactionFilter::Decision::kKeep; - } + if (CompactionFilter::Decision::kUndetermined == filter) { + // Should not reach here, since FilterV2 should never return kUndetermined. 
+ status_ = + Status::NotSupported("FilterV2() should never return kUndetermined"); + valid_ = false; + return false; + } - if (filter == CompactionFilter::Decision::kRemove) { - // convert the current key to a delete; key_ is pointing into - // current_key_ at this point, so updating current_key_ updates key() - ikey_.type = kTypeDeletion; - current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion); - // no value associated with delete - value_.clear(); - iter_stats_.num_record_drop_user++; - } else if (filter == CompactionFilter::Decision::kChangeValue) { - if (ikey_.type == kTypeBlobIndex) { - // value transfer from blob file to inlined data - ikey_.type = kTypeValue; - current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); - } - value_ = compaction_filter_value_; - } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { - *need_skip = true; - compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, - kValueTypeForSeek); - *skip_until = compaction_filter_skip_until_.Encode(); - } else if (filter == CompactionFilter::Decision::kChangeBlobIndex) { - if (ikey_.type == kTypeValue) { - // value transfer from inlined data to blob file - ikey_.type = kTypeBlobIndex; - current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); - } - value_ = compaction_filter_value_; - } else if (filter == CompactionFilter::Decision::kIOError) { - status_ = - Status::IOError("Failed to access blob during compaction filter"); + if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil && + cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= + 0) { + // Can't skip to a key smaller than the current one. + // Keep the key as per FilterV2 documentation. 
+ filter = CompactionFilter::Decision::kKeep; + } + + if (filter == CompactionFilter::Decision::kRemove) { + // convert the current key to a delete; key_ is pointing into + // current_key_ at this point, so updating current_key_ updates key() + ikey_.type = kTypeDeletion; + current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion); + // no value associated with delete + value_.clear(); + iter_stats_.num_record_drop_user++; + } else if (filter == CompactionFilter::Decision::kChangeValue) { + if (ikey_.type == kTypeBlobIndex) { + // value transfer from blob file to inlined data + ikey_.type = kTypeValue; + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + } + value_ = compaction_filter_value_; + } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { + *need_skip = true; + compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, + kValueTypeForSeek); + *skip_until = compaction_filter_skip_until_.Encode(); + } else if (filter == CompactionFilter::Decision::kChangeBlobIndex) { + // Only the StackableDB-based BlobDB impl's compaction filter should return + // kChangeBlobIndex. Decision about rewriting blob and changing blob index + // in the integrated BlobDB impl is made in subsequent call to + // PrepareOutput() and its callees. 
+ if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { + status_ = Status::NotSupported( + "Only stacked BlobDB's internal compaction filter can return " + "kChangeBlobIndex."); + valid_ = false; + return false; + } + if (ikey_.type == kTypeValue) { + // value transfer from inlined data to blob file + ikey_.type = kTypeBlobIndex; + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + } + value_ = compaction_filter_value_; + } else if (filter == CompactionFilter::Decision::kIOError) { + if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { + status_ = Status::NotSupported( + "CompactionFilter for integrated BlobDB should not return kIOError"); + valid_ = false; return false; } + status_ = Status::IOError("Failed to access blob during compaction filter"); + error = true; } - return true; + return !error; } void CompactionIterator::NextFromInput() { @@ -840,7 +908,8 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { } // GC for stacked BlobDB - if (compaction_filter_) { + if (compaction_filter_ && + compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { const auto blob_decision = compaction_filter_->PrepareBlobOutput( user_key(), value_, &compaction_filter_value_); diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index fd63052de..98757297d 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -175,7 +175,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { NewSstFileManager(result.env, result.info_log)); result.sst_file_manager = sst_file_manager; } -#endif +#endif // !ROCKSDB_LITE if (!result.paranoid_checks) { result.skip_checking_sst_file_sizes_on_db_open = true; diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 9ffd776ab..f1aed7468 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -47,6 +47,7 @@ class CompactionFilter { kRemoveAndSkipUntil, 
kChangeBlobIndex, // used internally by BlobDB. kIOError, // used internally by BlobDB. + kUndetermined, }; enum class BlobDecision { kKeep, kChangeValue, kCorruption, kIOError }; @@ -150,6 +151,7 @@ class CompactionFilter { // - If you use kRemoveAndSkipUntil, consider also reducing // compaction_readahead_size option. // + // Should never return kUndetermined. // Note: If you are using a TransactionDB, it is not recommended to filter // out or modify merge operands (ValueType::kMergeOperand). // If a merge operation is filtered out, TransactionDB may not realize there @@ -196,6 +198,20 @@ class CompactionFilter { // Returns a name that identifies this compaction filter. // The name will be printed to LOG file on start up for diagnosis. virtual const char* Name() const = 0; + + // Internal (BlobDB) use only. Do not override in application code. + virtual bool IsStackedBlobDbInternalCompactionFilter() const { return false; } + + // In the case of BlobDB, it may be possible to reach a decision with only + // the key without reading the actual value. Keys whose value_type is + // kBlobIndex will be checked by this method. + // Returning kUndetermined will cause FilterV2() to be called to make a + // decision as usual. 
+ virtual Decision FilterBlobByKey(int /*level*/, const Slice& /*key*/, + std::string* /*new_value*/, + std::string* /*skip_until*/) const { + return Decision::kUndetermined; + } }; // Each compaction will create a new CompactionFilter allowing the diff --git a/src.mk b/src.mk index d4c867c89..15c109470 100644 --- a/src.mk +++ b/src.mk @@ -377,6 +377,7 @@ TEST_MAIN_SOURCES = \ db/blob/blob_file_garbage_test.cc \ db/blob/blob_file_reader_test.cc \ db/blob/db_blob_basic_test.cc \ + db/blob/db_blob_compaction_test.cc \ db/blob/db_blob_corruption_test.cc \ db/blob/db_blob_index_test.cc \ db/column_family_test.cc \ diff --git a/utilities/blob_db/blob_compaction_filter.h b/utilities/blob_db/blob_compaction_filter.h index ab1be03d7..ae6a0bc8e 100644 --- a/utilities/blob_db/blob_compaction_filter.h +++ b/utilities/blob_db/blob_compaction_filter.h @@ -54,6 +54,8 @@ class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase { const Slice& value, std::string* new_value, std::string* skip_until) const override; + bool IsStackedBlobDbInternalCompactionFilter() const override { return true; } + protected: bool IsBlobFileOpened() const; virtual bool OpenNewBlobFileIfNeeded() const;