Add support for Merge with base value during Compaction in IntegratedBlobDB (#8445)

Summary:
Provide support for Merge operation with base values during
Compaction in IntegratedBlobDB.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8445

Test Plan: Add new unit test

Reviewed By: ltamasi

Differential Revision: D29343949

Pulled By: akankshamahajan15

fbshipit-source-id: 844f6f02f93388a11e6e08bda7bb3a2a28e47c70
main
Akanksha Mahajan 4 years ago committed by Facebook GitHub Bot
parent 66b62a123a
commit 95d0ee95fa
  1. 26
      db/blob/db_blob_compaction_test.cc
  2. 8
      db/compaction/compaction_iterator.cc
  3. 17
      db/merge_helper.cc
  4. 3
      db/merge_helper.h

@ -565,6 +565,32 @@ TEST_F(DBBlobCompactionTest, TrackGarbage) {
} }
} }
TEST_F(DBBlobCompactionTest, MergeBlobWithBase) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
options.disable_auto_compactions = true;
Reopen(options);
ASSERT_OK(Put("Key1", "v1_1"));
ASSERT_OK(Put("Key2", "v2_1"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("Key1", "v1_2"));
ASSERT_OK(Merge("Key2", "v2_2"));
ASSERT_OK(Flush());
ASSERT_OK(Merge("Key1", "v1_3"));
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
/*end=*/nullptr));
ASSERT_EQ(Get("Key1"), "v1_1,v1_2,v1_3");
ASSERT_EQ(Get("Key2"), "v2_1,v2_2");
Close();
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -754,13 +754,15 @@ void CompactionIterator::NextFromInput() {
} }
pinned_iters_mgr_.StartPinning(); pinned_iters_mgr_.StartPinning();
Version* version = compaction_ ? compaction_->input_version() : nullptr;
// We know the merge type entry is not hidden, otherwise we would // We know the merge type entry is not hidden, otherwise we would
// have hit (A) // have hit (A)
// We encapsulate the merge related state machine in a different // We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow. // object to minimize change to the existing flow.
Status s = Status s = merge_helper_->MergeUntil(&input_, range_del_agg_,
merge_helper_->MergeUntil(&input_, range_del_agg_, prev_snapshot, prev_snapshot, bottommost_level_,
bottommost_level_, allow_data_in_errors_); allow_data_in_errors_, version);
merge_out_iter_.SeekToFirst(); merge_out_iter_.SeekToFirst();
if (!s.ok() && !s.IsMergeInProgress()) { if (!s.ok() && !s.IsMergeInProgress()) {

@ -7,6 +7,7 @@
#include <string> #include <string>
#include "db/blob/blob_fetcher.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
@ -119,7 +120,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
CompactionRangeDelAggregator* range_del_agg, CompactionRangeDelAggregator* range_del_agg,
const SequenceNumber stop_before, const SequenceNumber stop_before,
const bool at_bottom, const bool at_bottom,
const bool allow_data_in_errors) { const bool allow_data_in_errors,
Version* version) {
// Get a copy of the internal key, before it's invalidated by iter->Next() // Get a copy of the internal key, before it's invalidated by iter->Next()
// Also maintain the list of merge operands seen. // Also maintain the list of merge operands seen.
assert(HasOperator()); assert(HasOperator());
@ -203,12 +205,23 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// want. Also if we're in compaction and it's a put, it would be nice to // want. Also if we're in compaction and it's a put, it would be nice to
// run compaction filter on it. // run compaction filter on it.
const Slice val = iter->value(); const Slice val = iter->value();
PinnableSlice blob_value;
const Slice* val_ptr; const Slice* val_ptr;
if (kTypeValue == ikey.type && if ((kTypeValue == ikey.type || kTypeBlobIndex == ikey.type) &&
(range_del_agg == nullptr || (range_del_agg == nullptr ||
!range_del_agg->ShouldDelete( !range_del_agg->ShouldDelete(
ikey, RangeDelPositioningMode::kForwardTraversal))) { ikey, RangeDelPositioningMode::kForwardTraversal))) {
if (ikey.type == kTypeBlobIndex) {
assert(version);
BlobFetcher blob_fetcher(version, ReadOptions());
s = blob_fetcher.FetchBlob(ikey.user_key, val, &blob_value);
if (!s.ok()) {
return s;
}
val_ptr = &blob_value;
} else {
val_ptr = &val; val_ptr = &val;
}
} else { } else {
val_ptr = nullptr; val_ptr = nullptr;
} }

@ -84,7 +84,8 @@ class MergeHelper {
CompactionRangeDelAggregator* range_del_agg = nullptr, CompactionRangeDelAggregator* range_del_agg = nullptr,
const SequenceNumber stop_before = 0, const SequenceNumber stop_before = 0,
const bool at_bottom = false, const bool at_bottom = false,
const bool allow_data_in_errors = false); const bool allow_data_in_errors = false,
Version* version = nullptr);
// Filters a merge operand using the compaction filter specified // Filters a merge operand using the compaction filter specified
// in the constructor. Returns the decision that the filter made. // in the constructor. Returns the decision that the filter made.

Loading…
Cancel
Save