diff --git a/CMakeLists.txt b/CMakeLists.txt index 41a6f710b..ccbe14a00 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -503,6 +503,7 @@ set(SOURCES db/merge_helper.cc db/merge_operator.cc db/range_del_aggregator.cc + db/range_tombstone_fragmenter.cc db/repair.cc db/snapshot_impl.cc db/table_cache.cc @@ -901,6 +902,8 @@ if(WITH_TESTS) db/perf_context_test.cc db/plain_table_db_test.cc db/prefix_test.cc + db/range_del_aggregator_test.cc + db/range_tombstone_fragmenter_test.cc db/repair_test.cc db/table_properties_collector_test.cc db/version_builder_test.cc diff --git a/Makefile b/Makefile index 6cabfaded..6281dbd45 100644 --- a/Makefile +++ b/Makefile @@ -530,6 +530,7 @@ TESTS = \ db_universal_compaction_test \ trace_analyzer_test \ repeatable_thread_test \ + range_tombstone_fragmenter_test \ PARALLEL_TEST = \ backupable_db_test \ @@ -1556,6 +1557,9 @@ blob_db_test: utilities/blob_db/blob_db_test.o $(LIBOBJECTS) $(TESTHARNESS) repeatable_thread_test: util/repeatable_thread_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/TARGETS b/TARGETS index 436b566fc..68a8e6f7f 100644 --- a/TARGETS +++ b/TARGETS @@ -123,6 +123,7 @@ cpp_library( "db/merge_helper.cc", "db/merge_operator.cc", "db/range_del_aggregator.cc", + "db/range_tombstone_fragmenter.cc", "db/repair.cc", "db/snapshot_impl.cc", "db/table_cache.cc", diff --git a/db/db_impl.cc b/db/db_impl.cc index 1c09e22d0..def0cd9ab 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1240,7 +1240,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // Prepare to store a list of merge operations if merge occurs. 
MergeContext merge_context; - RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); + SequenceNumber max_covering_tombstone_seq = 0; Status s; // First look in the memtable, then in the immutable memtable (if any). @@ -1254,13 +1254,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, bool done = false; if (!skip_memtable) { if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &range_del_agg, read_options, callback, is_blob_index)) { + &max_covering_tombstone_seq, read_options, callback, + is_blob_index)) { done = true; pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } else if ((s.ok() || s.IsMergeInProgress()) && sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &range_del_agg, read_options, callback, + &max_covering_tombstone_seq, read_options, callback, is_blob_index)) { done = true; pinnable_val->PinSelf(); @@ -1274,8 +1275,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, - &range_del_agg, value_found, nullptr, nullptr, callback, - is_blob_index); + &max_covering_tombstone_seq, value_found, nullptr, nullptr, + callback, is_blob_index); RecordTick(stats_, MEMTABLE_MISS); } @@ -1357,8 +1358,7 @@ std::vector DBImpl::MultiGet( LookupKey lkey(keys[i], snapshot); auto cfh = reinterpret_cast(column_family[i]); - RangeDelAggregator range_del_agg(cfh->cfd()->internal_comparator(), - snapshot); + SequenceNumber max_covering_tombstone_seq = 0; auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); assert(mgd_iter != multiget_cf_data.end()); auto mgd = mgd_iter->second; @@ -1369,11 +1369,12 @@ std::vector DBImpl::MultiGet( bool done = false; if (!skip_memtable) { if (super_version->mem->Get(lkey, value, &s, &merge_context, - &range_del_agg, read_options)) { + &max_covering_tombstone_seq, read_options)) { done = true; RecordTick(stats_, MEMTABLE_HIT); } else 
if (super_version->imm->Get(lkey, value, &s, &merge_context, - &range_del_agg, read_options)) { + &max_covering_tombstone_seq, + read_options)) { done = true; RecordTick(stats_, MEMTABLE_HIT); } @@ -1382,7 +1383,7 @@ std::vector DBImpl::MultiGet( PinnableSlice pinnable_val; PERF_TIMER_GUARD(get_from_output_files_time); super_version->current->Get(read_options, lkey, &pinnable_val, &s, - &merge_context, &range_del_agg); + &merge_context, &max_covering_tombstone_seq); value->assign(pinnable_val.data(), pinnable_val.size()); RecordTick(stats_, MEMTABLE_MISS); } @@ -2913,8 +2914,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, bool* is_blob_index) { Status s; MergeContext merge_context; - RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(), - kMaxSequenceNumber); + SequenceNumber max_covering_tombstone_seq = 0; ReadOptions read_options; SequenceNumber current_seq = versions_->LastSequence(); @@ -2924,8 +2924,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, *found_record_for_key = false; // Check if there is a record for this key in the latest memtable - sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, - read_options, nullptr /*read_callback*/, is_blob_index); + sv->mem->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq, + seq, read_options, nullptr /*read_callback*/, is_blob_index); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading memtable. 
@@ -2943,8 +2943,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, } // Check if there is a record for this key in the immutable memtables - sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, - read_options, nullptr /*read_callback*/, is_blob_index); + sv->imm->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq, + seq, read_options, nullptr /*read_callback*/, is_blob_index); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading memtable. @@ -2962,8 +2962,9 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, } // Check if there is a record for this key in the immutable memtables - sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, &range_del_agg, - seq, read_options, is_blob_index); + sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, + &max_covering_tombstone_seq, seq, read_options, + is_blob_index); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading memtable. 
@@ -2986,7 +2987,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, if (!cache_only) { // Check tables sv->current->Get(read_options, lkey, nullptr, &s, &merge_context, - &range_del_agg, nullptr /* value_found */, + &max_covering_tombstone_seq, nullptr /* value_found */, found_record_for_key, seq, nullptr /*read_callback*/, is_blob_index); diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index e32afd209..5ca120853 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -45,17 +45,17 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options, } SuperVersion* super_version = cfd->GetSuperVersion(); MergeContext merge_context; - RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); + SequenceNumber max_covering_tombstone_seq = 0; LookupKey lkey(key, snapshot); PERF_TIMER_STOP(get_snapshot_time); if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &range_del_agg, read_options)) { + &max_covering_tombstone_seq, read_options)) { pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } else { PERF_TIMER_GUARD(get_from_output_files_time); super_version->current->Get(read_options, lkey, pinnable_val, &s, - &merge_context, &range_del_agg); + &merge_context, &max_covering_tombstone_seq); RecordTick(stats_, MEMTABLE_MISS); } RecordTick(stats_, NUMBER_KEYS_READ); diff --git a/db/memtable.cc b/db/memtable.cc index e2400414d..cbd8acedc 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -17,6 +17,7 @@ #include "db/merge_context.h" #include "db/merge_helper.h" #include "db/pinned_iterators_manager.h" +#include "db/range_tombstone_fragmenter.h" #include "db/read_callback.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" @@ -570,7 +571,7 @@ struct Saver { const MergeOperator* merge_operator; // the merge operations encountered; MergeContext* merge_context; - RangeDelAggregator* range_del_agg; + SequenceNumber max_covering_tombstone_seq; MemTable* 
mem; Logger* logger; Statistics* statistics; @@ -592,10 +593,10 @@ static bool SaveValue(void* arg, const char* entry) { Saver* s = reinterpret_cast(arg); assert(s != nullptr); MergeContext* merge_context = s->merge_context; - RangeDelAggregator* range_del_agg = s->range_del_agg; + SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq; const MergeOperator* merge_operator = s->merge_operator; - assert(merge_context != nullptr && range_del_agg != nullptr); + assert(merge_context != nullptr); // entry format is: // klength varint32 @@ -623,7 +624,7 @@ static bool SaveValue(void* arg, const char* entry) { s->seq = seq; if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && - range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) { + max_covering_tombstone_seq > seq) { type = kTypeRangeDeletion; } switch (type) { @@ -720,9 +721,9 @@ static bool SaveValue(void* arg, const char* entry) { bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, - RangeDelAggregator* range_del_agg, SequenceNumber* seq, - const ReadOptions& read_opts, ReadCallback* callback, - bool* is_blob_index) { + SequenceNumber* max_covering_tombstone_seq, + SequenceNumber* seq, const ReadOptions& read_opts, + ReadCallback* callback, bool* is_blob_index) { // The sequence number is updated synchronously in version_set.h if (IsEmpty()) { // Avoiding recording stats for speed. 
@@ -732,11 +733,13 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, std::unique_ptr range_del_iter( NewRangeTombstoneIterator(read_opts)); - Status status = range_del_agg->AddTombstones(std::move(range_del_iter)); - if (!status.ok()) { - *s = status; - return false; - } + SequenceNumber snapshot = GetInternalKeySeqno(key.internal_key()); + FragmentedRangeTombstoneIterator fragment_iter( + std::move(range_del_iter), comparator_.comparator, snapshot); + *max_covering_tombstone_seq = std::max( + *max_covering_tombstone_seq, + MaxCoveringTombstoneSeqnum(&fragment_iter, key.internal_key(), + comparator_.comparator.user_comparator())); Slice user_key = key.user_key(); bool found_final_value = false; @@ -762,7 +765,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.seq = kMaxSequenceNumber; saver.mem = this; saver.merge_context = merge_context; - saver.range_del_agg = range_del_agg; + saver.max_covering_tombstone_seq = *max_covering_tombstone_seq; saver.merge_operator = moptions_.merge_operator; saver.logger = moptions_.info_log; saver.inplace_update_support = moptions_.inplace_update_support; diff --git a/db/memtable.h b/db/memtable.h index 57751c922..be1960515 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -187,17 +187,19 @@ class MemTable { // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other // status returned indicates a corruption or other unexpected error. 
bool Get(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context, RangeDelAggregator* range_del_agg, - SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, + const ReadOptions& read_opts, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr); bool Get(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context, RangeDelAggregator* range_del_agg, + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts, ReadCallback* callback = nullptr, bool* is_blob_index = nullptr) { SequenceNumber seq; - return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts, - callback, is_blob_index); + return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq, + read_opts, callback, is_blob_index); } // Attempts to update the new_value inplace, else does normal Add diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 79e87c03f..e9842c823 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -104,34 +104,36 @@ int MemTableList::NumFlushed() const { // Operands stores the list of merge operations to apply, so far. 
bool MemTableListVersion::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, - RangeDelAggregator* range_del_agg, + SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) { - return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg, - seq, read_opts, callback, is_blob_index); + return GetFromList(&memlist_, key, value, s, merge_context, + max_covering_tombstone_seq, seq, read_opts, callback, + is_blob_index); } bool MemTableListVersion::GetFromHistory( const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context, RangeDelAggregator* range_del_agg, + MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) { return GetFromList(&memlist_history_, key, value, s, merge_context, - range_del_agg, seq, read_opts, nullptr /*read_callback*/, - is_blob_index); + max_covering_tombstone_seq, seq, read_opts, + nullptr /*read_callback*/, is_blob_index); } bool MemTableListVersion::GetFromList( std::list* list, const LookupKey& key, std::string* value, - Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback, - bool* is_blob_index) { + Status* s, MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, + const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) { *seq = kMaxSequenceNumber; for (auto& memtable : *list) { SequenceNumber current_seq = kMaxSequenceNumber; - bool done = memtable->Get(key, value, s, merge_context, range_del_agg, - &current_seq, read_opts, callback, is_blob_index); + bool done = + memtable->Get(key, value, s, merge_context, max_covering_tombstone_seq, + &current_seq, read_opts, callback, is_blob_index); if (*seq == kMaxSequenceNumber) { // Store the most recent sequence
number of any operation on this key. // Since we only care about the most recent change, we only need to diff --git a/db/memtable_list.h b/db/memtable_list.h index 3f6f399a1..75f5cd56c 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -55,17 +55,19 @@ class MemTableListVersion { // will be stored in *seq on success (regardless of whether true/false is // returned). Otherwise, *seq will be set to kMaxSequenceNumber. bool Get(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context, RangeDelAggregator* range_del_agg, - SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, + const ReadOptions& read_opts, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr); bool Get(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context, RangeDelAggregator* range_del_agg, + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts, ReadCallback* callback = nullptr, bool* is_blob_index = nullptr) { SequenceNumber seq; - return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts, - callback, is_blob_index); + return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq, + read_opts, callback, is_blob_index); } // Similar to Get(), but searches the Memtable history of memtables that @@ -74,17 +76,18 @@ class MemTableListVersion { // writes that are also present in the SST files. 
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, - RangeDelAggregator* range_del_agg, SequenceNumber* seq, - const ReadOptions& read_opts, + SequenceNumber* max_covering_tombstone_seq, + SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index = nullptr); bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, - RangeDelAggregator* range_del_agg, + SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts, bool* is_blob_index = nullptr) { SequenceNumber seq; - return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq, - read_opts, is_blob_index); + return GetFromHistory(key, value, s, merge_context, + max_covering_tombstone_seq, &seq, read_opts, + is_blob_index); } Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena, @@ -123,8 +126,8 @@ class MemTableListVersion { bool GetFromList(std::list* list, const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, - RangeDelAggregator* range_del_agg, SequenceNumber* seq, - const ReadOptions& read_opts, + SequenceNumber* max_covering_tombstone_seq, + SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index c6c5486d7..7482b6611 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -198,12 +198,12 @@ TEST_F(MemTableListTest, GetTest) { Status s; MergeContext merge_context; InternalKeyComparator ikey_cmp(options.comparator); - RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */); + SequenceNumber max_covering_tombstone_seq = 0; autovector to_delete; LookupKey lkey("key1", seq); bool found = list.current()->Get(lkey, &value, &s, &merge_context, - &range_del_agg, ReadOptions()); + &max_covering_tombstone_seq, ReadOptions()); ASSERT_FALSE(found); // Create a MemTable @@ -226,19 +226,19 @@ 
TEST_F(MemTableListTest, GetTest) { // Fetch the newly written keys merge_context.Clear(); found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context, - &range_del_agg, ReadOptions()); + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ(value, "value1"); merge_context.Clear(); found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context, - &range_del_agg, ReadOptions()); + &max_covering_tombstone_seq, ReadOptions()); // MemTable found out that this key is *not* found (at this sequence#) ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context, - &range_del_agg, ReadOptions()); + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ(value, "value2.2"); @@ -264,25 +264,28 @@ TEST_F(MemTableListTest, GetTest) { // Fetch keys via MemTableList merge_context.Clear(); - found = list.current()->Get(LookupKey("key1", seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + found = + list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + &merge_context, &max_covering_tombstone_seq, + ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ("value1", value); merge_context.Clear(); - found = list.current()->Get(LookupKey("key2", seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + found = + list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ(value, "value2.3"); merge_context.Clear(); found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context, - &range_del_agg, ReadOptions()); + &max_covering_tombstone_seq, ReadOptions()); 
ASSERT_FALSE(found); ASSERT_EQ(2, list.NumNotFlushed()); @@ -305,12 +308,12 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { Status s; MergeContext merge_context; InternalKeyComparator ikey_cmp(options.comparator); - RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */); + SequenceNumber max_covering_tombstone_seq = 0; autovector to_delete; LookupKey lkey("key1", seq); bool found = list.current()->Get(lkey, &value, &s, &merge_context, - &range_del_agg, ReadOptions()); + &max_covering_tombstone_seq, ReadOptions()); ASSERT_FALSE(found); // Create a MemTable @@ -332,13 +335,13 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Fetch the newly written keys merge_context.Clear(); found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context, - &range_del_agg, ReadOptions()); + &max_covering_tombstone_seq, ReadOptions()); // MemTable found out that this key is *not* found (at this sequence#) ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context, - &range_del_agg, ReadOptions()); + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ(value, "value2.2"); @@ -348,13 +351,15 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Fetch keys via MemTableList merge_context.Clear(); - found = list.current()->Get(LookupKey("key1", seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + found = + list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); - found = list.current()->Get(LookupKey("key2", seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + found = + list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ("value2.2", value); @@ -374,26 +379,28 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // 
Verify keys are no longer in MemTableList merge_context.Clear(); - found = list.current()->Get(LookupKey("key1", seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + found = + list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_FALSE(found); merge_context.Clear(); - found = list.current()->Get(LookupKey("key2", seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + found = + list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_FALSE(found); // Verify keys are present in history merge_context.Clear(); - found = list.current()->GetFromHistory(LookupKey("key1", seq), &value, &s, - &merge_context, &range_del_agg, - ReadOptions()); + found = list.current()->GetFromHistory( + LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); - found = list.current()->GetFromHistory(LookupKey("key2", seq), &value, &s, - &merge_context, &range_del_agg, - ReadOptions()); + found = list.current()->GetFromHistory( + LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(found); ASSERT_EQ("value2.2", value); @@ -434,38 +441,42 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Verify keys are no longer in MemTableList merge_context.Clear(); - found = list.current()->Get(LookupKey("key1", seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + found = + list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_FALSE(found); merge_context.Clear(); - found = list.current()->Get(LookupKey("key2", seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + found = + list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, 
ReadOptions()); ASSERT_FALSE(found); merge_context.Clear(); - found = list.current()->Get(LookupKey("key3", seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + found = + list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_FALSE(found); // Verify that the second memtable's keys are in the history merge_context.Clear(); - found = list.current()->GetFromHistory(LookupKey("key1", seq), &value, &s, - &merge_context, &range_del_agg, - ReadOptions()); + found = list.current()->GetFromHistory( + LookupKey("key1", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); - found = list.current()->GetFromHistory(LookupKey("key3", seq), &value, &s, - &merge_context, &range_del_agg, - ReadOptions()); + found = list.current()->GetFromHistory( + LookupKey("key3", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_TRUE(found); ASSERT_EQ("value3", value); // Verify that key2 from the first memtable is no longer in the history merge_context.Clear(); - found = list.current()->Get(LookupKey("key2", seq), &value, &s, - &merge_context, &range_del_agg, ReadOptions()); + found = + list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &max_covering_tombstone_seq, ReadOptions()); ASSERT_FALSE(found); // Cleanup diff --git a/db/range_tombstone_fragmenter.cc b/db/range_tombstone_fragmenter.cc new file mode 100644 index 000000000..cc3d26e06 --- /dev/null +++ b/db/range_tombstone_fragmenter.cc @@ -0,0 +1,277 @@ +// Copyright (c) 2018-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include "db/range_tombstone_fragmenter.h" + +#include +#include +#include + +#include +#include + +#include "util/kv_map.h" +#include "util/vector_iterator.h" + +namespace rocksdb { + +namespace { + +struct ParsedInternalKeyComparator { + explicit ParsedInternalKeyComparator(const InternalKeyComparator* c) + : cmp(c) {} + + bool operator()(const ParsedInternalKey& a, + const ParsedInternalKey& b) const { + return cmp->Compare(a, b) < 0; + } + + const InternalKeyComparator* cmp; +}; + +} // anonymous namespace + +FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( + std::unique_ptr unfragmented_tombstones, + const InternalKeyComparator& icmp, SequenceNumber snapshot) + : tombstone_cmp_(icmp.user_comparator()), + icmp_(&icmp), + ucmp_(icmp.user_comparator()) { + if (unfragmented_tombstones == nullptr) { + pos_ = tombstones_.end(); + return; + } + bool is_sorted = true; + int num_tombstones = 0; + InternalKey pinned_last_start_key; + Slice last_start_key; + for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid(); + unfragmented_tombstones->Next(), num_tombstones++) { + if (num_tombstones > 0 && + icmp_->Compare(last_start_key, unfragmented_tombstones->key()) > 0) { + is_sorted = false; + break; + } + if (unfragmented_tombstones->IsKeyPinned()) { + last_start_key = unfragmented_tombstones->key(); + } else { + pinned_last_start_key.DecodeFrom(unfragmented_tombstones->key()); + last_start_key = pinned_last_start_key.Encode(); + } + } + if (is_sorted) { + FragmentTombstones(std::move(unfragmented_tombstones), snapshot); + return; + } + + // Sort the tombstones before fragmenting them. 
+ std::vector keys, values; + keys.reserve(num_tombstones); + values.reserve(num_tombstones); + for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid(); + unfragmented_tombstones->Next()) { + keys.emplace_back(unfragmented_tombstones->key().data(), + unfragmented_tombstones->key().size()); + values.emplace_back(unfragmented_tombstones->value().data(), + unfragmented_tombstones->value().size()); + } + // VectorIterator implicitly sorts by key during construction. + auto iter = std::unique_ptr( + new VectorIterator(std::move(keys), std::move(values), icmp_)); + FragmentTombstones(std::move(iter), snapshot); +} + +void FragmentedRangeTombstoneIterator::FragmentTombstones( + std::unique_ptr unfragmented_tombstones, + SequenceNumber snapshot) { + Slice cur_start_key(nullptr, 0); + auto cmp = ParsedInternalKeyComparator(icmp_); + + // Stores the end keys and sequence numbers of range tombstones with a start + // key less than or equal to cur_start_key. Provides an ordering by end key + // for use in flush_current_tombstones. + std::set cur_end_keys(cmp); + + // Given the next start key in unfragmented_tombstones, + // flush_current_tombstones writes every tombstone fragment that starts + // and ends with a key before next_start_key, and starts with a key greater + // than or equal to cur_start_key. + auto flush_current_tombstones = [&](const Slice& next_start_key) { + auto it = cur_end_keys.begin(); + bool reached_next_start_key = false; + for (; it != cur_end_keys.end() && !reached_next_start_key; ++it) { + Slice cur_end_key = it->user_key; + if (icmp_->user_comparator()->Compare(cur_start_key, cur_end_key) == 0) { + // Empty tombstone. + continue; + } + if (icmp_->user_comparator()->Compare(next_start_key, cur_end_key) <= 0) { + // All of the end keys in [it, cur_end_keys.end()) are after + // next_start_key, so the tombstones they represent can be used in + // fragments that start with keys greater than or equal to + // next_start_key. 
However, the end keys we already passed will not be + // used in any more tombstone fragments. + // + // Remove the fully fragmented tombstones and stop iteration after a + // final round of flushing to preserve the tombstones we can create more + // fragments from. + reached_next_start_key = true; + cur_end_keys.erase(cur_end_keys.begin(), it); + cur_end_key = next_start_key; + } + + // Flush a range tombstone fragment [cur_start_key, cur_end_key), which + // should not overlap with the last-flushed tombstone fragment. + assert(tombstones_.empty() || + icmp_->user_comparator()->Compare(tombstones_.back().end_key_, + cur_start_key) <= 0); + + SequenceNumber max_seqnum = 0; + for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) { + max_seqnum = std::max(max_seqnum, flush_it->sequence); + } + // Flush only the tombstone fragment with the highest sequence + // number. + tombstones_.push_back( + RangeTombstone(cur_start_key, cur_end_key, max_seqnum)); + cur_start_key = cur_end_key; + } + if (!reached_next_start_key) { + // There is a gap between the last flushed tombstone fragment and + // the next tombstone's start key. Remove all the end keys in + // the working set, since we have fully fragmented their corresponding + // tombstones. + cur_end_keys.clear(); + } + cur_start_key = next_start_key; + }; + + pinned_iters_mgr_.StartPinning(); + + bool no_tombstones = true; + for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid(); + unfragmented_tombstones->Next()) { + const Slice& ikey = unfragmented_tombstones->key(); + Slice tombstone_start_key = ExtractUserKey(ikey); + SequenceNumber tombstone_seq = GetInternalKeySeqno(ikey); + if (tombstone_seq > snapshot) { + // The tombstone is not visible by this snapshot. 
+ continue; + } + no_tombstones = false; + + Slice tombstone_end_key = unfragmented_tombstones->value(); + if (!unfragmented_tombstones->IsValuePinned()) { + pinned_slices_.emplace_back(tombstone_end_key.data(), + tombstone_end_key.size()); + tombstone_end_key = pinned_slices_.back(); + } + if (!cur_end_keys.empty() && icmp_->user_comparator()->Compare( + cur_start_key, tombstone_start_key) != 0) { + // The start key has changed. Flush all tombstones that start before + // this new start key. + flush_current_tombstones(tombstone_start_key); + } + if (unfragmented_tombstones->IsKeyPinned()) { + cur_start_key = tombstone_start_key; + } else { + pinned_slices_.emplace_back(tombstone_start_key.data(), + tombstone_start_key.size()); + cur_start_key = pinned_slices_.back(); + } + + cur_end_keys.emplace(tombstone_end_key, tombstone_seq, kTypeRangeDeletion); + } + if (!cur_end_keys.empty()) { + ParsedInternalKey last_end_key = *std::prev(cur_end_keys.end()); + flush_current_tombstones(last_end_key.user_key); + } + + if (!no_tombstones) { + pinned_iters_mgr_.PinIterator(unfragmented_tombstones.release(), + false /* arena */); + } + + // With this, the caller must Seek before the iterator is valid. 
+  pos_ = tombstones_.end();
+  pinned_pos_ = tombstones_.end();
+}
+
+// Positions the iterator on the first tombstone fragment. With no fragments
+// begin() == end(), so the iterator is left invalid.
+void FragmentedRangeTombstoneIterator::SeekToFirst() {
+  pos_ = tombstones_.begin();
+}
+
+// Positions the iterator on the last tombstone fragment. Prev() maps begin()
+// to end(), so an empty fragment list leaves the iterator invalid.
+void FragmentedRangeTombstoneIterator::SeekToLast() {
+  pos_ = tombstones_.end();
+  Prev();
+}
+
+// Positions the iterator on the first fragment that does not order before
+// `target` under tombstone_cmp_. Only the user key and the sequence number of
+// `target` take part in the comparison.
+void FragmentedRangeTombstoneIterator::Seek(const Slice& target) {
+  if (tombstones_.empty()) {
+    pos_ = tombstones_.end();
+    return;
+  }
+  RangeTombstone search(ExtractUserKey(target), ExtractUserKey(target),
+                        GetInternalKeySeqno(target));
+  pos_ = std::lower_bound(tombstones_.begin(), tombstones_.end(), search,
+                          tombstone_cmp_);
+}
+
+// Positions the iterator on the last fragment whose internal start key is at
+// or before `target`. The iterator is left invalid when no fragment
+// qualifies, which includes the case of an empty fragment list.
+void FragmentedRangeTombstoneIterator::SeekForPrev(const Slice& target) {
+  ParsedInternalKey parsed_target;
+  if (!ParseInternalKey(target, &parsed_target)) {
+    // A target that does not parse cannot be ordered against any fragment.
+    // Fail loudly in debug builds; otherwise leave the iterator invalid
+    // instead of comparing against a key that failed to parse.
+    assert(false);
+    pos_ = tombstones_.end();
+    return;
+  }
+  Seek(target);
+  if (!Valid()) {
+    SeekToLast();
+  }
+  // Walk backwards while the current fragment starts after the target.
+  // ParseKey() dereferences pos_, so it may only run while Valid(): Prev()
+  // parks pos_ on end() once it steps off the front, and tombstones_ may be
+  // empty.
+  ParsedInternalKey parsed_start_key;
+  while (Valid()) {
+    ParseKey(&parsed_start_key);
+    if (icmp_->Compare(parsed_target, parsed_start_key) >= 0) {
+      break;
+    }
+    Prev();
+  }
+}
+
+// Advances to the next fragment. Incrementing past end() is undefined, so
+// this must only be called while Valid().
+void FragmentedRangeTombstoneIterator::Next() { ++pos_; }
+
+// Steps back one fragment. Stepping back from the first fragment makes the
+// iterator invalid; stepping back from end() on a non-empty list lands on the
+// last fragment.
+void FragmentedRangeTombstoneIterator::Prev() {
+  if (pos_ == tombstones_.begin()) {
+    pos_ = tombstones_.end();
+    return;
+  }
+  --pos_;
+}
+
+bool FragmentedRangeTombstoneIterator::Valid() const {
+  return pos_ != tombstones_.end();
+}
+
+// Returns the highest sequence number of a tombstone fragment that contains
+// the user key of `lookup_key` (start key <= user key < end key, compared
+// with `ucmp`) and whose sequence number does not exceed the one encoded in
+// `lookup_key`. Returns 0 when no such fragment is found. Repositions
+// `tombstone_iter` as a side effect.
+SequenceNumber MaxCoveringTombstoneSeqnum(
+    FragmentedRangeTombstoneIterator* tombstone_iter, const Slice& lookup_key,
+    const Comparator* ucmp) {
+  SequenceNumber snapshot = GetInternalKeySeqno(lookup_key);
+  Slice user_key = ExtractUserKey(lookup_key);
+
+  tombstone_iter->Seek(lookup_key);
+  SequenceNumber highest_covering_seqnum = 0;
+  if (!tombstone_iter->Valid()) {
+    // Seeked past the last tombstone
+    tombstone_iter->Prev();
+  }
+  // Scan backwards for as long as the fragment's end key is still beyond the
+  // user key; a fragment counts only if it also starts at or before the key.
+  while (tombstone_iter->Valid() &&
+         ucmp->Compare(user_key, tombstone_iter->value()) < 0) {
+    if (tombstone_iter->seq() <= snapshot &&
+        ucmp->Compare(tombstone_iter->user_key(), user_key) <= 0) {
+      highest_covering_seqnum =
+          std::max(highest_covering_seqnum, tombstone_iter->seq());
+    }
+    tombstone_iter->Prev();
+  }
+  return highest_covering_seqnum;
+}
+
+}  // namespace rocksdb
diff --git a/db/range_tombstone_fragmenter.h b/db/range_tombstone_fragmenter.h
new file mode 100644
index 000000000..e82171787
--- /dev/null
+++ b/db/range_tombstone_fragmenter.h
@@ -0,0 +1,103 @@
+// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "db/dbformat.h"
+#include "db/pinned_iterators_manager.h"
+#include "rocksdb/status.h"
+#include "table/internal_iterator.h"
+
+namespace rocksdb {
+
+// FragmentedRangeTombstoneIterator converts an InternalIterator of a range-del
+// meta block into an iterator over non-overlapping tombstone fragments. The
+// tombstone fragmentation process should be more efficient than the range
+// tombstone collapsing algorithm in RangeDelAggregator because this leverages
+// the internal key ordering already provided by the input iterator, if
+// applicable (when the iterator is unsorted, a new sorted iterator is created
+// before proceeding). If there are few overlaps, creating a
+// FragmentedRangeTombstoneIterator should be O(n), while the RangeDelAggregator
+// tombstone collapsing is always O(n log n).
+class FragmentedRangeTombstoneIterator : public InternalIterator {
+ public:
+  // NOTE(review): the constructor is defined in the .cc file; presumably it
+  // takes ownership of unfragmented_tombstones and runs FragmentTombstones()
+  // with the given snapshot -- confirm.
+  FragmentedRangeTombstoneIterator(
+      std::unique_ptr unfragmented_tombstones,
+      const InternalKeyComparator& icmp, SequenceNumber snapshot);
+  // InternalIterator positioning interface; positions refer to entries of
+  // tombstones_.
+  void SeekToFirst() override;
+  void SeekToLast() override;
+  void Seek(const Slice& target) override;
+  void SeekForPrev(const Slice& target) override;
+  void Next() override;
+  void Prev() override;
+  bool Valid() const override;
+  // Returns the encoded internal key (start key, sequence number,
+  // kTypeRangeDeletion) of the current fragment. The encoding is cached in
+  // current_start_key_ and rebuilt only when the position has changed.
+  Slice key() const override {
+    MaybePinKey();
+    return current_start_key_.Encode();
+  }
+  // Returns the end key of the current fragment. Dereferences pos_, so the
+  // iterator must be Valid().
+  Slice value() const override { return pos_->end_key_; }
+  bool IsKeyPinned() const override { return false; }
+  bool IsValuePinned() const override { return true; }
+  // Always reports OK; no error state is tracked by this class.
+  Status status() const override { return Status::OK(); }
+
+  // Start user key and sequence number of the current fragment. Both
+  // dereference pos_, so the iterator must be Valid().
+  Slice user_key() const { return pos_->start_key_; }
+  SequenceNumber seq() const { return pos_->seq_; }
+
+ private:
+  // Strict weak ordering over fragments: ascending start key under the user
+  // comparator, ties broken by descending sequence number.
+  struct FragmentedRangeTombstoneComparator {
+    explicit FragmentedRangeTombstoneComparator(const Comparator* c) : cmp(c) {}
+
+    bool operator()(const RangeTombstone& a, const RangeTombstone& b) const {
+      int user_key_cmp = cmp->Compare(a.start_key_, b.start_key_);
+      if (user_key_cmp != 0) {
+        return user_key_cmp < 0;
+      }
+      return a.seq_ > b.seq_;
+    }
+
+    const Comparator* cmp;
+  };
+
+  // Re-encodes current_start_key_ for the current position if it is valid and
+  // differs from the position encoded last time. Const because it only
+  // touches mutable cache members.
+  void MaybePinKey() const {
+    if (pos_ != tombstones_.end() && pinned_pos_ != pos_) {
+      current_start_key_.Set(pos_->start_key_, pos_->seq_, kTypeRangeDeletion);
+      pinned_pos_ = pos_;
+    }
+  }
+
+  // Fills *parsed with the internal start key of the current fragment.
+  // Dereferences pos_, so it must not be called when pos_ is end().
+  void ParseKey(ParsedInternalKey* parsed) const {
+    parsed->user_key = pos_->start_key_;
+    parsed->sequence = pos_->seq_;
+    parsed->type = kTypeRangeDeletion;
+  }
+
+  // Given an ordered range tombstone iterator unfragmented_tombstones,
+  // "fragment" the tombstones into non-overlapping pieces, and store them in
+  // tombstones_.
+  void FragmentTombstones(
+      std::unique_ptr unfragmented_tombstones,
+      SequenceNumber snapshot);
+
+  const FragmentedRangeTombstoneComparator tombstone_cmp_;
+  const InternalKeyComparator* icmp_;
+  const Comparator* ucmp_;
+  // The fragments produced by FragmentTombstones().
+  std::vector tombstones_;
+  // Storage for keys copied during fragmentation (presumably so the slices
+  // held by tombstones_ stay valid -- confirm against the .cc file).
+  std::list pinned_slices_;
+  // Current position; end() means the iterator is invalid.
+  std::vector::const_iterator pos_;
+  // Position whose key is currently encoded in current_start_key_.
+  mutable std::vector::const_iterator pinned_pos_;
+  mutable InternalKey current_start_key_;
+  PinnedIteratorsManager pinned_iters_mgr_;
+};
+
+SequenceNumber MaxCoveringTombstoneSeqnum(
+    FragmentedRangeTombstoneIterator* tombstone_iter, const Slice& key,
+    const Comparator* ucmp);
+
+}  // namespace rocksdb
diff --git a/db/range_tombstone_fragmenter_test.cc b/db/range_tombstone_fragmenter_test.cc
new file mode 100644
index 000000000..6f0036cb4
--- /dev/null
+++ b/db/range_tombstone_fragmenter_test.cc
@@ -0,0 +1,275 @@
+// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/range_tombstone_fragmenter.h"
+
+#include "db/db_test_util.h"
+#include "rocksdb/comparator.h"
+#include "util/testutil.h"
+
+namespace rocksdb {
+
+class RangeTombstoneFragmenterTest : public testing::Test {};
+
+namespace {
+
+static auto bytewise_icmp = InternalKeyComparator(BytewiseComparator());
+
+// Serializes each range tombstone into a key/value pair and wraps the pairs
+// in a test::VectorIterator, in the order given.
+std::unique_ptr MakeRangeDelIter(
+    const std::vector& range_dels) {
+  std::vector keys, values;
+  for (const auto& range_del : range_dels) {
+    auto key_and_value = range_del.Serialize();
+    keys.push_back(key_and_value.first.Encode().ToString());
+    values.push_back(key_and_value.second.ToString());
+  }
+  return std::unique_ptr(
+      new test::VectorIterator(keys, values));
+}
+
+// Checks that iterating `iter` from the start yields exactly
+// `expected_tombstones`: same fragments, same order, nothing missing and
+// nothing extra.
+void VerifyFragmentedRangeDels(
+    FragmentedRangeTombstoneIterator* iter,
+    const std::vector& expected_tombstones) {
+  iter->SeekToFirst();
+  for (size_t i = 0; i < expected_tombstones.size(); i++, iter->Next()) {
+    // Running out of fragments early is a failure, not a reason to stop
+    // comparing; without this check a short (or empty) iterator would pass.
+    ASSERT_TRUE(iter->Valid());
+    EXPECT_EQ(ExtractUserKey(iter->key()), expected_tombstones[i].start_key_);
+    EXPECT_EQ(iter->value(), expected_tombstones[i].end_key_);
+    EXPECT_EQ(GetInternalKeySeqno(iter->key()), expected_tombstones[i].seq_);
+  }
+  EXPECT_FALSE(iter->Valid());
+}
+
+// One SeekForPrev expectation: either the fragment the iterator must land on,
+// or out_of_range == true when the iterator must end up invalid.
+struct SeekForPrevTestCase {
+  Slice seek_target;
+  RangeTombstone expected_position;
+  bool out_of_range;
+};
+
+// Seeks to (seek_target, seqno 0, kTypeRangeDeletion) for every case and
+// checks the resulting position.
+void VerifySeekForPrev(FragmentedRangeTombstoneIterator* iter,
+                       const std::vector& cases) {
+  for (const auto& testcase : cases) {
+    InternalKey ikey_seek_target(testcase.seek_target, 0, kTypeRangeDeletion);
+    iter->SeekForPrev(ikey_seek_target.Encode());
+    if (testcase.out_of_range) {
+      ASSERT_FALSE(iter->Valid());
+    } else {
+      ASSERT_TRUE(iter->Valid());
+      EXPECT_EQ(ExtractUserKey(iter->key()),
+                testcase.expected_position.start_key_);
+      EXPECT_EQ(iter->value(), testcase.expected_position.end_key_);
+      EXPECT_EQ(GetInternalKeySeqno(iter->key()),
+                testcase.expected_position.seq_);
+    }
+  }
+}
+
+// Expected MaxCoveringTombstoneSeqnum() result for a user key. `result` uses
+// the function's return type so the comparison is not signed vs. unsigned.
+struct MaxCoveringTombstoneSeqnumTestCase {
+  Slice user_key;
+  SequenceNumber result;
+};
+
+// Looks up every user key at kMaxSequenceNumber and checks the covering
+// sequence number.
+void VerifyMaxCoveringTombstoneSeqnum(
+    FragmentedRangeTombstoneIterator* iter, const Comparator* ucmp,
+    const std::vector& cases) {
+  for (const auto& testcase : cases) {
+    InternalKey key_and_snapshot(testcase.user_key, kMaxSequenceNumber,
+                                 kTypeValue);
+    EXPECT_EQ(testcase.result, MaxCoveringTombstoneSeqnum(
+                                   iter, key_and_snapshot.Encode(), ucmp));
+  }
+}
+
+}  // anonymous namespace
+
+TEST_F(RangeTombstoneFragmenterTest, NonOverlappingTombstones) {
+  auto range_del_iter = MakeRangeDelIter({{"a", "b", 10}, {"c", "d", 5}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifyFragmentedRangeDels(&iter, {{"a", "b", 10}, {"c", "d", 5}});
+  VerifyMaxCoveringTombstoneSeqnum(&iter, bytewise_icmp.user_comparator(),
+                                   {{"", 0}, {"a", 10}, {"b", 0}, {"c", 5}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, OverlappingTombstones) {
+  auto range_del_iter = MakeRangeDelIter({{"a", "e", 10}, {"c", "g", 15}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifyFragmentedRangeDels(&iter,
+                            {{"a", "c", 10}, {"c", "e", 15}, {"e", "g", 15}});
+  VerifyMaxCoveringTombstoneSeqnum(&iter, bytewise_icmp.user_comparator(),
+                                   {{"a", 10}, {"c", 15}, {"e", 15}, {"g", 0}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, ContiguousTombstones) {
+  auto range_del_iter = MakeRangeDelIter(
+      {{"a", "c", 10}, {"c", "e", 20}, {"c", "e", 5}, {"e", "g", 15}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifyFragmentedRangeDels(&iter,
+                            {{"a", "c", 10}, {"c", "e", 20}, {"e", "g", 15}});
+  VerifyMaxCoveringTombstoneSeqnum(&iter, bytewise_icmp.user_comparator(),
+                                   {{"a", 10}, {"c", 20}, {"e", 15}, {"g", 0}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, RepeatedStartAndEndKey) {
+  auto range_del_iter =
+      MakeRangeDelIter({{"a", "c", 10}, {"a", "c", 7}, {"a", "c", 3}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifyFragmentedRangeDels(&iter, {{"a", "c", 10}});
+  VerifyMaxCoveringTombstoneSeqnum(&iter, bytewise_icmp.user_comparator(),
+                                   {{"a", 10}, {"b", 10}, {"c", 0}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, RepeatedStartKeyDifferentEndKeys) {
+  auto range_del_iter =
+      MakeRangeDelIter({{"a", "e", 10}, {"a", "g", 7}, {"a", "c", 3}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifyFragmentedRangeDels(&iter,
+                            {{"a", "c", 10}, {"c", "e", 10}, {"e", "g", 7}});
+  VerifyMaxCoveringTombstoneSeqnum(&iter, bytewise_icmp.user_comparator(),
+                                   {{"a", 10}, {"c", 10}, {"e", 7}, {"g", 0}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, RepeatedStartKeyMixedEndKeys) {
+  auto range_del_iter = MakeRangeDelIter({{"a", "c", 30},
+                                          {"a", "g", 20},
+                                          {"a", "e", 10},
+                                          {"a", "g", 7},
+                                          {"a", "c", 3}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifyFragmentedRangeDels(&iter,
+                            {{"a", "c", 30}, {"c", "e", 20}, {"e", "g", 20}});
+  VerifyMaxCoveringTombstoneSeqnum(&iter, bytewise_icmp.user_comparator(),
+                                   {{"a", 30}, {"c", 20}, {"e", 20}, {"g", 0}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKey) {
+  auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
+                                          {"c", "g", 8},
+                                          {"c", "i", 6},
+                                          {"j", "n", 4},
+                                          {"j", "l", 2}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifyFragmentedRangeDels(&iter, {{"a", "c", 10},
+                                    {"c", "e", 10},
+                                    {"e", "g", 8},
+                                    {"g", "i", 6},
+                                    {"j", "l", 4},
+                                    {"l", "n", 4}});
+  VerifyMaxCoveringTombstoneSeqnum(
+      &iter, bytewise_icmp.user_comparator(),
+      {{"a", 10}, {"c", 10}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKeyWithSnapshot) {
+  auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
+                                          {"c", "g", 8},
+                                          {"c", "i", 6},
+                                          {"j", "n", 4},
+                                          {"j", "l", 2}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, 9);
+  VerifyFragmentedRangeDels(
+      &iter, {{"c", "g", 8}, {"g", "i", 6}, {"j", "l", 4}, {"l", "n", 4}});
+  VerifyMaxCoveringTombstoneSeqnum(
+      &iter, bytewise_icmp.user_comparator(),
+      {{"a", 0}, {"c", 8}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, OverlapAndRepeatedStartKeyUnordered) {
+  auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
+                                          {"j", "n", 4},
+                                          {"c", "i", 6},
+                                          {"c", "g", 8},
+                                          {"j", "l", 2}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, 9);
+  VerifyFragmentedRangeDels(
+      &iter, {{"c", "g", 8}, {"g", "i", 6}, {"j", "l", 4}, {"l", "n", 4}});
+  VerifyMaxCoveringTombstoneSeqnum(
+      &iter, bytewise_icmp.user_comparator(),
+      {{"a", 0}, {"c", 8}, {"e", 8}, {"i", 0}, {"j", 4}, {"m", 4}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, SeekForPrevStartKey) {
+  // Same tombstones as OverlapAndRepeatedStartKey.
+  auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
+                                          {"c", "g", 8},
+                                          {"c", "i", 6},
+                                          {"j", "n", 4},
+                                          {"j", "l", 2}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifySeekForPrev(
+      &iter,
+      {{"a", {"a", "c", 10}}, {"e", {"e", "g", 8}}, {"l", {"l", "n", 4}}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, SeekForPrevCovered) {
+  // Same tombstones as OverlapAndRepeatedStartKey.
+  auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
+                                          {"c", "g", 8},
+                                          {"c", "i", 6},
+                                          {"j", "n", 4},
+                                          {"j", "l", 2}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifySeekForPrev(
+      &iter,
+      {{"b", {"a", "c", 10}}, {"f", {"e", "g", 8}}, {"m", {"l", "n", 4}}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, SeekForPrevEndKey) {
+  // Same tombstones as OverlapAndRepeatedStartKey.
+  auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
+                                          {"c", "g", 8},
+                                          {"c", "i", 6},
+                                          {"j", "n", 4},
+                                          {"j", "l", 2}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifySeekForPrev(&iter, {{"c", {"c", "e", 10}},
+                            {"g", {"g", "i", 6}},
+                            {"i", {"g", "i", 6}},
+                            {"n", {"l", "n", 4}}});
+}
+
+TEST_F(RangeTombstoneFragmenterTest, SeekForPrevOutOfBounds) {
+  // Same tombstones as OverlapAndRepeatedStartKey.
+  auto range_del_iter = MakeRangeDelIter({{"a", "e", 10},
+                                          {"c", "g", 8},
+                                          {"c", "i", 6},
+                                          {"j", "n", 4},
+                                          {"j", "l", 2}});
+
+  FragmentedRangeTombstoneIterator iter(std::move(range_del_iter),
+                                        bytewise_icmp, kMaxSequenceNumber);
+  VerifySeekForPrev(&iter,
+                    {{"", {}, true /* out of range */}, {"z", {"l", "n", 4}}});
+}
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/db/table_cache.cc b/db/table_cache.cc index d28d067c3..94901a75d 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -10,6 +10,7 @@ #include "db/table_cache.h" #include "db/dbformat.h" +#include "db/range_tombstone_fragmenter.h" #include "db/version_edit.h" #include "util/filename.h" @@ -372,19 +373,19 @@ Status TableCache::Get(const ReadOptions& options, t = GetTableReaderFromHandle(handle); } } - if (s.ok() && get_context->range_del_agg() != nullptr && + SequenceNumber* max_covering_tombstone_seq = + get_context->max_covering_tombstone_seq(); + if (s.ok() && max_covering_tombstone_seq != nullptr && !options.ignore_range_deletions) { std::unique_ptr range_del_iter( t->NewRangeTombstoneIterator(options)); - if (range_del_iter != nullptr) { - s = range_del_iter->status(); - } - if (s.ok()) { - s = get_context->range_del_agg()->AddTombstones( - std::move(range_del_iter), - &file_meta.smallest, - &file_meta.largest); - } + FragmentedRangeTombstoneIterator fragment_iter(std::move(range_del_iter), + internal_comparator, +
GetInternalKeySeqno(k)); + *max_covering_tombstone_seq = std::max( + *max_covering_tombstone_seq, + MaxCoveringTombstoneSeqnum(&fragment_iter, k, + internal_comparator.user_comparator())); } if (s.ok()) { get_context->SetReplayLog(row_cache_entry); // nullptr if no cache. diff --git a/db/version_set.cc b/db/version_set.cc index f25f579cf..e782a438e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1177,7 +1177,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, void Version::Get(const ReadOptions& read_options, const LookupKey& k, PinnableSlice* value, Status* status, MergeContext* merge_context, - RangeDelAggregator* range_del_agg, bool* value_found, + SequenceNumber* max_covering_tombstone_seq, bool* value_found, bool* key_exists, SequenceNumber* seq, ReadCallback* callback, bool* is_blob) { Slice ikey = k.internal_key(); @@ -1194,8 +1194,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, - value, value_found, merge_context, range_del_agg, this->env_, seq, - merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob); + value, value_found, merge_context, max_covering_tombstone_seq, this->env_, + seq, merge_operator_ ? 
&pinned_iters_mgr : nullptr, callback, is_blob); // Pin blocks that we read to hold merge operands if (merge_operator_) { diff --git a/db/version_set.h b/db/version_set.h index 814c5a0e4..a26254eec 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -567,9 +567,10 @@ class Version { // REQUIRES: lock is not held void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value, Status* status, MergeContext* merge_context, - RangeDelAggregator* range_del_agg, bool* value_found = nullptr, - bool* key_exists = nullptr, SequenceNumber* seq = nullptr, - ReadCallback* callback = nullptr, bool* is_blob = nullptr); + SequenceNumber* max_covering_tombstone_seq, + bool* value_found = nullptr, bool* key_exists = nullptr, + SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr, + bool* is_blob = nullptr); // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set. diff --git a/src.mk b/src.mk index d98ac1ba0..7ebd93a15 100644 --- a/src.mk +++ b/src.mk @@ -43,6 +43,7 @@ LIB_SOURCES = \ db/merge_helper.cc \ db/merge_operator.cc \ db/range_del_aggregator.cc \ + db/range_tombstone_fragmenter.cc \ db/repair.cc \ db/snapshot_impl.cc \ db/table_cache.cc \ @@ -329,6 +330,7 @@ MAIN_SOURCES = \ db/repair_test.cc \ db/range_del_aggregator_test.cc \ db/range_del_aggregator_bench.cc \ + db/range_tombstone_fragmenter_test.cc \ db/table_properties_collector_test.cc \ db/util_merge_operators_test.cc \ db/version_builder_test.cc \ diff --git a/table/data_block_hash_index_test.cc b/table/data_block_hash_index_test.cc index dc62917f2..f25147079 100644 --- a/table/data_block_hash_index_test.cc +++ b/table/data_block_hash_index_test.cc @@ -7,12 +7,14 @@ #include #include +#include "db/table_properties_collector.h" #include "rocksdb/slice.h" #include "table/block.h" #include "table/block_based_table_reader.h" #include "table/block_builder.h" #include "table/data_block_hash_index.h" #include 
"table/get_context.h" +#include "table/table_builder.h" #include "util/testharness.h" #include "util/testutil.h" diff --git a/table/get_context.cc b/table/get_context.cc index 0aa75b607..6f0bd2ebb 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -43,7 +43,7 @@ GetContext::GetContext(const Comparator* ucmp, Statistics* statistics, GetState init_state, const Slice& user_key, PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, - RangeDelAggregator* _range_del_agg, Env* env, + SequenceNumber* _max_covering_tombstone_seq, Env* env, SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback, bool* is_blob_index) @@ -56,7 +56,7 @@ GetContext::GetContext(const Comparator* ucmp, pinnable_val_(pinnable_val), value_found_(value_found), merge_context_(merge_context), - range_del_agg_(_range_del_agg), + max_covering_tombstone_seq_(_max_covering_tombstone_seq), env_(env), seq_(seq), replay_log_(nullptr), @@ -185,7 +185,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, auto type = parsed_key.type; // Key matches. 
Process it if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && - range_del_agg_ != nullptr && range_del_agg_->ShouldDelete(parsed_key)) { + max_covering_tombstone_seq_ != nullptr && + *max_covering_tombstone_seq_ > parsed_key.sequence) { type = kTypeRangeDeletion; } switch (type) { diff --git a/table/get_context.h b/table/get_context.h index 066be104b..407473808 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -6,7 +6,6 @@ #pragma once #include #include "db/merge_context.h" -#include "db/range_del_aggregator.h" #include "db/read_callback.h" #include "rocksdb/env.h" #include "rocksdb/statistics.h" @@ -52,8 +51,9 @@ class GetContext { GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, const Slice& user_key, PinnableSlice* value, bool* value_found, - MergeContext* merge_context, RangeDelAggregator* range_del_agg, - Env* env, SequenceNumber* seq = nullptr, + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, Env* env, + SequenceNumber* seq = nullptr, PinnedIteratorsManager* _pinned_iters_mgr = nullptr, ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); @@ -76,7 +76,9 @@ class GetContext { GetState State() const { return state_; } - RangeDelAggregator* range_del_agg() { return range_del_agg_; } + SequenceNumber* max_covering_tombstone_seq() { + return max_covering_tombstone_seq_; + } PinnedIteratorsManager* pinned_iters_mgr() { return pinned_iters_mgr_; } @@ -111,7 +113,7 @@ class GetContext { PinnableSlice* pinnable_val_; bool* value_found_; // Is value set correctly? 
Used by KeyMayExist MergeContext* merge_context_; - RangeDelAggregator* range_del_agg_; + SequenceNumber* max_covering_tombstone_seq_; Env* env_; // If a key is found, seq_ will be set to the SequenceNumber of most recent // write to the key or kMaxSequenceNumber if unknown diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index 4032c4a5a..43e5b2f0b 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -170,12 +170,12 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, if (!through_db) { PinnableSlice value; MergeContext merge_context; - RangeDelAggregator range_del_agg(ikc, {} /* snapshots */); + SequenceNumber max_covering_tombstone_seq = 0; GetContext get_context(ioptions.user_comparator, ioptions.merge_operator, ioptions.info_log, ioptions.statistics, GetContext::kNotFound, Slice(key), &value, nullptr, &merge_context, - &range_del_agg, env); + &max_covering_tombstone_seq, env); s = table_reader->Get(read_options, key, &get_context, nullptr); } else { s = db->Get(read_options, key, &result);
diff --git a/util/vector_iterator.h b/util/vector_iterator.h
new file mode 100644
index 000000000..da60eb229
--- /dev/null
+++ b/util/vector_iterator.h
@@ -0,0 +1,118 @@
+#pragma once
+
+#include
+#include
+#include
+
+#include "db/dbformat.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/slice.h"
+#include "table/internal_iterator.h"
+
+namespace rocksdb {
+
+// Iterator over a vector of keys/values. The entries are visited in the order
+// defined by the InternalKeyComparator passed to the constructor, regardless
+// of the order in which they were supplied.
+class VectorIterator : public InternalIterator {
+ public:
+  // Takes ownership of `keys` and `values`, which must have the same length
+  // (values[i] belongs to keys[i]). `icmp` must outlive the iterator. The
+  // iterator starts out invalid; call one of the Seek* methods first.
+  VectorIterator(std::vector keys, std::vector values,
+                 const InternalKeyComparator* icmp)
+      : keys_(std::move(keys)),
+        values_(std::move(values)),
+        indexed_cmp_(icmp, &keys_),
+        // Use the member, not the parameter: `keys` has already been moved
+        // from by the time this initializer runs.
+        current_(keys_.size()) {
+    assert(keys_.size() == values_.size());
+
+    // Sort a permutation of indices rather than the keys themselves so that
+    // keys_ and values_ stay aligned.
+    indices_.reserve(keys_.size());
+    for (size_t i = 0; i < keys_.size(); i++) {
+      indices_.push_back(i);
+    }
+    std::sort(indices_.begin(), indices_.end(), indexed_cmp_);
+  }
+
+  virtual bool Valid() const override {
+    return !indices_.empty() && current_ < indices_.size();
+  }
+
+  virtual void SeekToFirst() override { current_ = 0; }
+  // On an empty vector size() - 1 wraps around; Valid() still reports false.
+  virtual void SeekToLast() override { current_ = indices_.size() - 1; }
+
+  // Positions on the first entry whose key is >= target.
+  virtual void Seek(const Slice& target) override {
+    current_ = std::lower_bound(indices_.begin(), indices_.end(), target,
+                                indexed_cmp_) -
+               indices_.begin();
+  }
+
+  // Positions on the last entry whose key is <= target, or leaves the
+  // iterator invalid when every key is greater than target.
+  virtual void SeekForPrev(const Slice& target) override {
+    // upper_bound yields the first entry strictly greater than target, so the
+    // entry just before it is the answer. lower_bound would step past an
+    // entry equal to target.
+    const size_t first_greater =
+        std::upper_bound(indices_.begin(), indices_.end(), target,
+                         indexed_cmp_) -
+        indices_.begin();
+    current_ = first_greater == 0 ? indices_.size() : first_greater - 1;
+  }
+
+  virtual void Next() override { current_++; }
+  // Stepping back from the first entry wraps current_ around, which Valid()
+  // reports as invalid.
+  virtual void Prev() override { current_--; }
+
+  virtual Slice key() const override {
+    return Slice(keys_[indices_[current_]]);
+  }
+  virtual Slice value() const override {
+    return Slice(values_[indices_[current_]]);
+  }
+
+  virtual Status status() const override { return Status::OK(); }
+
+  // Keys and values live in keys_/values_ for the iterator's lifetime.
+  virtual bool IsKeyPinned() const override { return true; }
+  virtual bool IsValuePinned() const override { return true; }
+
+ private:
+  // Compares entries of `keys` by index, or an index against a raw key, so the
+  // same functor serves std::sort, std::lower_bound and std::upper_bound.
+  struct IndexedKeyComparator {
+    IndexedKeyComparator(const InternalKeyComparator* c,
+                         const std::vector* ks)
+        : cmp(c), keys(ks) {}
+
+    bool operator()(size_t a, size_t b) const {
+      return cmp->Compare((*keys)[a], (*keys)[b]) < 0;
+    }
+
+    bool operator()(size_t a, const Slice& b) const {
+      return cmp->Compare((*keys)[a], b) < 0;
+    }
+
+    bool operator()(const Slice& a, size_t b) const {
+      return cmp->Compare(a, (*keys)[b]) < 0;
+    }
+
+    const InternalKeyComparator* cmp;
+    const std::vector* keys;
+  };
+
+  std::vector keys_;
+  std::vector values_;
+  IndexedKeyComparator indexed_cmp_;
+  std::vector indices_;  // positions into keys_/values_, sorted by key
+  size_t current_;       // index into indices_; >= indices_.size() is invalid
+};
+
+}  // namespace rocksdb