From df680b24ef2bbd8c9ee8cf882baa7636bcc21be4 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Wed, 1 Feb 2023 10:03:07 -0800 Subject: [PATCH] Clean up InvokeFilterIfNeeded a bit (#11174) Summary: The patch makes some code quality enhancements in `CompactionIterator::InvokeFilterIfNeeded` including the renaming of `filter` (which is most likely a remnant of the days before the `FilterV2` API when the compaction filter used to return a boolean) to `decision`, the removal of some outdated comments, the elimination of an `error` flag which was only used in one failure case out of many, as well as some small stylistic improvements. (Some the above will also come in handy when adding compaction filter support for wide-column entities.) Pull Request resolved: https://github.com/facebook/rocksdb/pull/11174 Test Plan: `make check` Reviewed By: akankshamahajan15 Differential Revision: D42901408 Pulled By: ltamasi fbshipit-source-id: ab382d59a4990c5dfe1cee219d49e1d80902b666 --- db/compaction/compaction_iterator.cc | 79 ++++++++++++++++------------ 1 file changed, 44 insertions(+), 35 deletions(-) diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index e1bdddcb7..00191978f 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -221,39 +221,43 @@ void CompactionIterator::Next() { bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until) { + if (!compaction_filter_) { + return true; + } + // TODO: support compaction filter for wide-column entities - if (!compaction_filter_ || - (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) { + if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex) { return true; } - bool error = false; - // If the user has specified a compaction filter and the sequence - // number is greater than any external snapshot, then invoke the - // filter. If the return value of the compaction filter is true, - // replace the entry with a deletion marker. - CompactionFilter::Decision filter = CompactionFilter::Decision::kUndetermined; - compaction_filter_value_.clear(); - compaction_filter_skip_until_.Clear(); + + CompactionFilter::Decision decision = + CompactionFilter::Decision::kUndetermined; CompactionFilter::ValueType value_type = ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue : CompactionFilter::ValueType::kBlobIndex; + // Hack: pass internal key to BlobIndexCompactionFilter since it needs // to get sequence number. assert(compaction_filter_); - Slice& filter_key = - (ikey_.type == kTypeValue || + const Slice& filter_key = + (ikey_.type != kTypeBlobIndex || !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) ? ikey_.user_key : key_; + + compaction_filter_value_.clear(); + compaction_filter_skip_until_.Clear(); + { StopWatchNano timer(clock_, report_detailed_time_); - if (kTypeBlobIndex == ikey_.type) { - filter = compaction_filter_->FilterBlobByKey( + + if (ikey_.type == kTypeBlobIndex) { + decision = compaction_filter_->FilterBlobByKey( level_, filter_key, &compaction_filter_value_, compaction_filter_skip_until_.rep()); - if (CompactionFilter::Decision::kUndetermined == filter && + if (decision == CompactionFilter::Decision::kUndetermined && !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { - if (compaction_ == nullptr) { + if (!compaction_) { status_ = Status::Corruption("Unexpected blob index outside of compaction"); validity_info_.Invalidate(); @@ -299,17 +303,18 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, value_type = CompactionFilter::ValueType::kValue; } } - if (CompactionFilter::Decision::kUndetermined == filter) { - filter = compaction_filter_->FilterV2( + if (decision == CompactionFilter::Decision::kUndetermined) { + decision = compaction_filter_->FilterV2( level_, filter_key, value_type, blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_, compaction_filter_skip_until_.rep()); } + iter_stats_.total_filter_time += env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0; } - if (CompactionFilter::Decision::kUndetermined == filter) { + if (decision == CompactionFilter::Decision::kUndetermined) { // Should not reach here, since FilterV2 should never return kUndetermined. status_ = Status::NotSupported("FilterV2() should never return kUndetermined"); @@ -317,15 +322,15 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, return false; } - if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil && + if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil && cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= 0) { // Can't skip to a key smaller than the current one. // Keep the key as per FilterV2 documentation. - filter = CompactionFilter::Decision::kKeep; + decision = CompactionFilter::Decision::kKeep; } - if (filter == CompactionFilter::Decision::kRemove) { + if (decision == CompactionFilter::Decision::kRemove) { // convert the current key to a delete; key_ is pointing into // current_key_ at this point, so updating current_key_ updates key() ikey_.type = kTypeDeletion; @@ -333,7 +338,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, // no value associated with delete value_.clear(); iter_stats_.num_record_drop_user++; - } else if (filter == CompactionFilter::Decision::kPurge) { + } else if (decision == CompactionFilter::Decision::kPurge) { // convert the current key to a single delete; key_ is pointing into // current_key_ at this point, so updating current_key_ updates key() ikey_.type = kTypeSingleDeletion; @@ -341,19 +346,19 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, // no value associated with single delete value_.clear(); iter_stats_.num_record_drop_user++; - } else if (filter == CompactionFilter::Decision::kChangeValue) { - if (ikey_.type == kTypeBlobIndex) { - // value transfer from blob file to inlined data + } else if (decision == CompactionFilter::Decision::kChangeValue) { + if (ikey_.type != kTypeValue) { ikey_.type = kTypeValue; - current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + current_key_.UpdateInternalKey(ikey_.sequence, kTypeValue); } + value_ = compaction_filter_value_; - } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { + } else if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil) { *need_skip = true; compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, kValueTypeForSeek); *skip_until = compaction_filter_skip_until_.Encode(); - } else if (filter == CompactionFilter::Decision::kChangeBlobIndex) { + } else if (decision == CompactionFilter::Decision::kChangeBlobIndex) { // Only the StackableDB-based BlobDB impl's compaction filter should return // kChangeBlobIndex. Decision about rewriting blob and changing blob index // in the integrated BlobDB impl is made in subsequent call to @@ -365,23 +370,27 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, validity_info_.Invalidate(); return false; } - if (ikey_.type == kTypeValue) { - // value transfer from inlined data to blob file + + if (ikey_.type != kTypeBlobIndex) { ikey_.type = kTypeBlobIndex; - current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + current_key_.UpdateInternalKey(ikey_.sequence, kTypeBlobIndex); } + value_ = compaction_filter_value_; - } else if (filter == CompactionFilter::Decision::kIOError) { + } else if (decision == CompactionFilter::Decision::kIOError) { if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { status_ = Status::NotSupported( "CompactionFilter for integrated BlobDB should not return kIOError"); validity_info_.Invalidate(); return false; } + status_ = Status::IOError("Failed to access blob during compaction filter"); - error = true; + validity_info_.Invalidate(); + return false; } - return !error; + + return true; } void CompactionIterator::NextFromInput() {