diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc index 69835def5..9038c93ff 100644 --- a/db/blob/db_blob_compaction_test.cc +++ b/db/blob/db_blob_compaction_test.cc @@ -565,6 +565,32 @@ TEST_F(DBBlobCompactionTest, TrackGarbage) { } } +TEST_F(DBBlobCompactionTest, MergeBlobWithBase) { + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + options.disable_auto_compactions = true; + + Reopen(options); + ASSERT_OK(Put("Key1", "v1_1")); + ASSERT_OK(Put("Key2", "v2_1")); + ASSERT_OK(Flush()); + + ASSERT_OK(Merge("Key1", "v1_2")); + ASSERT_OK(Merge("Key2", "v2_2")); + ASSERT_OK(Flush()); + + ASSERT_OK(Merge("Key1", "v1_3")); + ASSERT_OK(Flush()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, + /*end=*/nullptr)); + ASSERT_EQ(Get("Key1"), "v1_1,v1_2,v1_3"); + ASSERT_EQ(Get("Key2"), "v2_1,v2_2"); + Close(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 1869020f3..3ce38f90b 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -754,13 +754,15 @@ void CompactionIterator::NextFromInput() { } pinned_iters_mgr_.StartPinning(); + Version* version = compaction_ ? compaction_->input_version() : nullptr; + // We know the merge type entry is not hidden, otherwise we would // have hit (A) // We encapsulate the merge related state machine in a different // object to minimize change to the existing flow. - Status s = - merge_helper_->MergeUntil(&input_, range_del_agg_, prev_snapshot, - bottommost_level_, allow_data_in_errors_); + Status s = merge_helper_->MergeUntil(&input_, range_del_agg_, + prev_snapshot, bottommost_level_, + allow_data_in_errors_, version); merge_out_iter_.SeekToFirst(); if (!s.ok() && !s.IsMergeInProgress()) { diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 31cd3b6c5..388f4f1f2 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -7,6 +7,7 @@ #include +#include "db/blob/blob_fetcher.h" #include "db/dbformat.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" @@ -119,7 +120,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, CompactionRangeDelAggregator* range_del_agg, const SequenceNumber stop_before, const bool at_bottom, - const bool allow_data_in_errors) { + const bool allow_data_in_errors, + Version* version) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. assert(HasOperator()); @@ -203,12 +205,23 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // want. Also if we're in compaction and it's a put, it would be nice to // run compaction filter on it. const Slice val = iter->value(); + PinnableSlice blob_value; const Slice* val_ptr; - if (kTypeValue == ikey.type && + if ((kTypeValue == ikey.type || kTypeBlobIndex == ikey.type) && (range_del_agg == nullptr || !range_del_agg->ShouldDelete( ikey, RangeDelPositioningMode::kForwardTraversal))) { - val_ptr = &val; + if (ikey.type == kTypeBlobIndex) { + assert(version); + BlobFetcher blob_fetcher(version, ReadOptions()); + s = blob_fetcher.FetchBlob(ikey.user_key, val, &blob_value); + if (!s.ok()) { + return s; + } + val_ptr = &blob_value; + } else { + val_ptr = &val; + } } else { val_ptr = nullptr; } diff --git a/db/merge_helper.h b/db/merge_helper.h index f3bcd948b..7827babad 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -84,7 +84,8 @@ class MergeHelper { CompactionRangeDelAggregator* range_del_agg = nullptr, const SequenceNumber stop_before = 0, const bool at_bottom = false, - const bool allow_data_in_errors = false); + const bool allow_data_in_errors = false, + Version* version = nullptr); // Filters a merge operand using the compaction filter specified // in the constructor. Returns the decision that the filter made.