From f998c9790f8cb78145a67c33da61578fe5051b0f Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Thu, 3 Nov 2016 18:40:23 -0700 Subject: [PATCH] DeleteRange Get support Summary: During Get()/MultiGet(), build up a RangeDelAggregator with range tombstones as we search through live memtable, immutable memtables, and SST files. This aggregator is then used by memtable.cc's SaveValue() and GetContext::SaveValue() to check whether keys are covered. added tests for Get on memtables/files; end-to-end tests mainly in https://reviews.facebook.net/D64761 Closes https://github.com/facebook/rocksdb/pull/1456 Differential Revision: D4111271 Pulled By: ajkr fbshipit-source-id: 6e388d4 --- db/compacted_db_impl.cc | 4 +- db/db_compaction_test.cc | 9 ++-- db/db_impl.cc | 42 +++++++++++----- db/db_impl_readonly.cc | 10 ++-- db/memtable.cc | 27 ++++++++-- db/memtable.h | 9 ++-- db/memtable_list.cc | 21 +++++--- db/memtable_list.h | 23 ++++++--- db/memtable_list_test.cc | 83 +++++++++++++++++++------------ db/range_del_aggregator.cc | 5 ++ db/range_del_aggregator.h | 4 ++ db/version_set.cc | 17 ++++++- db/version_set.h | 5 +- include/rocksdb/options.h | 6 +++ table/cuckoo_table_reader_test.cc | 12 ++--- table/get_context.cc | 20 +++++--- table/get_context.h | 6 ++- table/table_reader_bench.cc | 10 ++-- table/table_test.cc | 8 +-- util/options.cc | 6 ++- 20 files changed, 226 insertions(+), 101 deletions(-) diff --git a/db/compacted_db_impl.cc b/db/compacted_db_impl.cc index f59ccd2c9..e6a8a9e38 100644 --- a/db/compacted_db_impl.cc +++ b/db/compacted_db_impl.cc @@ -46,7 +46,7 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*, const Slice& key, std::string* value) { GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound, key, value, nullptr, nullptr, - nullptr); + nullptr, nullptr); LookupKey lkey(key, kMaxSequenceNumber); files_.files[FindFile(key)].fd.table_reader->Get( options, lkey.internal_key(), &get_context); @@ 
-77,7 +77,7 @@ std::vector CompactedDBImpl::MultiGet(const ReadOptions& options, if (r != nullptr) { GetContext get_context(user_comparator_, nullptr, nullptr, nullptr, GetContext::kNotFound, keys[idx], &(*values)[idx], - nullptr, nullptr, nullptr); + nullptr, nullptr, nullptr, nullptr); LookupKey lkey(keys[idx], kMaxSequenceNumber); r->Get(options, lkey.internal_key(), &get_context); if (get_context.State() == GetContext::kFound) { diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index c50e6748d..80fc12ff4 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -294,7 +294,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { num_new_table_reader = 0; ASSERT_EQ(Key(k), Get(Key(k))); // lookup iterator from table cache and no need to create a new one. - ASSERT_EQ(num_table_cache_lookup, 1); + // a second table cache iterator is created for range tombstones + ASSERT_EQ(num_table_cache_lookup, 2); ASSERT_EQ(num_new_table_reader, 0); } } @@ -317,7 +318,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { num_table_cache_lookup = 0; num_new_table_reader = 0; ASSERT_EQ(Key(1), Get(Key(1))); - ASSERT_EQ(num_table_cache_lookup, 1); + // a second table cache iterator is created for range tombstones + ASSERT_EQ(num_table_cache_lookup, 2); ASSERT_EQ(num_new_table_reader, 0); num_table_cache_lookup = 0; @@ -335,7 +337,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { num_table_cache_lookup = 0; num_new_table_reader = 0; ASSERT_EQ(Key(1), Get(Key(1))); - ASSERT_EQ(num_table_cache_lookup, 1); + // a second table cache iterator is created for range tombstones + ASSERT_EQ(num_table_cache_lookup, 2); ASSERT_EQ(num_new_table_reader, 0); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); diff --git a/db/db_impl.cc b/db/db_impl.cc index ad04bc396..0948039a0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -51,6 +51,7 @@ #include "db/memtable_list.h" #include "db/merge_context.h" #include "db/merge_helper.h" +#include 
"db/range_del_aggregator.h" #include "db/table_cache.h" #include "db/table_properties_collector.h" #include "db/transaction_log_impl.h" @@ -3961,6 +3962,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, SuperVersion* sv = GetAndRefSuperVersion(cfd); // Prepare to store a list of merge operations if merge occurs. MergeContext merge_context; + RangeDelAggregator range_del_agg(cfd->internal_comparator(), {snapshot}); Status s; // First look in the memtable, then in the immutable memtable (if any). @@ -3973,18 +3975,24 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, (read_options.read_tier == kPersistedTier && has_unpersisted_data_); bool done = false; if (!skip_memtable) { - if (sv->mem->Get(lkey, value, &s, &merge_context)) { + if (sv->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, + read_options)) { done = true; RecordTick(stats_, MEMTABLE_HIT); - } else if (sv->imm->Get(lkey, value, &s, &merge_context)) { + } else if ((s.ok() || s.IsMergeInProgress()) && + sv->imm->Get(lkey, value, &s, &merge_context, &range_del_agg, + read_options)) { done = true; RecordTick(stats_, MEMTABLE_HIT); } + if (!done && !s.ok() && !s.IsMergeInProgress()) { + return s; + } } if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); sv->current->Get(read_options, lkey, value, &s, &merge_context, - value_found); + &range_del_agg, value_found); RecordTick(stats_, MEMTABLE_MISS); } @@ -4062,6 +4070,8 @@ std::vector DBImpl::MultiGet( LookupKey lkey(keys[i], snapshot); auto cfh = reinterpret_cast(column_family[i]); + RangeDelAggregator range_del_agg(cfh->cfd()->internal_comparator(), + {snapshot}); auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); assert(mgd_iter != multiget_cf_data.end()); auto mgd = mgd_iter->second; @@ -4070,18 +4080,20 @@ std::vector DBImpl::MultiGet( (read_options.read_tier == kPersistedTier && has_unpersisted_data_); bool done = false; if (!skip_memtable) { - if (super_version->mem->Get(lkey, value, &s, &merge_context)) { + if 
(super_version->mem->Get(lkey, value, &s, &merge_context, + &range_del_agg, read_options)) { done = true; // TODO(?): RecordTick(stats_, MEMTABLE_HIT)? - } else if (super_version->imm->Get(lkey, value, &s, &merge_context)) { + } else if (super_version->imm->Get(lkey, value, &s, &merge_context, + &range_del_agg, read_options)) { done = true; // TODO(?): RecordTick(stats_, MEMTABLE_HIT)? } } if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(read_options, lkey, value, &s, - &merge_context); + super_version->current->Get(read_options, lkey, value, &s, &merge_context, + &range_del_agg); // TODO(?): RecordTick(stats_, MEMTABLE_MISS)? } @@ -6293,7 +6305,9 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, bool* found_record_for_key) { Status s; MergeContext merge_context; + RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(), {}); + ReadOptions read_options; SequenceNumber current_seq = versions_->LastSequence(); LookupKey lkey(key, current_seq); @@ -6301,7 +6315,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, *found_record_for_key = false; // Check if there is a record for this key in the latest memtable - sv->mem->Get(lkey, nullptr, &s, &merge_context, seq); + sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, + read_options); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading memtable. @@ -6319,7 +6334,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, } // Check if there is a record for this key in the immutable memtables - sv->imm->Get(lkey, nullptr, &s, &merge_context, seq); + sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq, + read_options); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading memtable. 
@@ -6337,7 +6353,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, } // Check if there is a record for this key in the immutable memtables - sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, seq); + sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, &range_del_agg, + seq, read_options); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading memtable. @@ -6358,10 +6375,9 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, // SST files if cache_only=true? if (!cache_only) { // Check tables - ReadOptions read_options; - sv->current->Get(read_options, lkey, nullptr, &s, &merge_context, - nullptr /* value_found */, found_record_for_key, seq); + &range_del_agg, nullptr /* value_found */, + found_record_for_key, seq); if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { // unexpected error reading SST files diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 6d2437c17..3e185288d 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -8,8 +8,9 @@ #include "db/compacted_db_impl.h" #include "db/db_impl.h" -#include "db/merge_context.h" #include "db/db_iter.h" +#include "db/merge_context.h" +#include "db/range_del_aggregator.h" #include "util/perf_context_imp.h" namespace rocksdb { @@ -37,11 +38,14 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options, auto cfd = cfh->cfd(); SuperVersion* super_version = cfd->GetSuperVersion(); MergeContext merge_context; + RangeDelAggregator range_del_agg(cfd->internal_comparator(), {snapshot}); LookupKey lkey(key, snapshot); - if (super_version->mem->Get(lkey, value, &s, &merge_context)) { + if (super_version->mem->Get(lkey, value, &s, &merge_context, &range_del_agg, + read_options)) { } else { PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(read_options, lkey, value, &s, &merge_context); + super_version->current->Get(read_options, lkey, value, &s, &merge_context, + 
&range_del_agg); } return s; } diff --git a/db/memtable.cc b/db/memtable.cc index 338dcf69e..baac08dc6 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -24,6 +24,7 @@ #include "rocksdb/slice_transform.h" #include "rocksdb/write_buffer_manager.h" #include "table/internal_iterator.h" +#include "table/iterator_wrapper.h" #include "table/merger.h" #include "util/arena.h" #include "util/coding.h" @@ -366,6 +367,9 @@ InternalIterator* MemTable::NewIterator(const ReadOptions& read_options, InternalIterator* MemTable::NewRangeTombstoneIterator( const ReadOptions& read_options, Arena* arena) { assert(arena != nullptr); + if (read_options.ignore_range_deletions) { + return NewEmptyInternalIterator(arena); + } auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); return new (mem) MemTableIterator(*this, read_options, arena, true /* use_range_del_table */); @@ -500,6 +504,7 @@ struct Saver { const MergeOperator* merge_operator; // the merge operations encountered; MergeContext* merge_context; + RangeDelAggregator* range_del_agg; MemTable* mem; Logger* logger; Statistics* statistics; @@ -511,9 +516,10 @@ struct Saver { static bool SaveValue(void* arg, const char* entry) { Saver* s = reinterpret_cast(arg); MergeContext* merge_context = s->merge_context; + RangeDelAggregator* range_del_agg = s->range_del_agg; const MergeOperator* merge_operator = s->merge_operator; - assert(s != nullptr && merge_context != nullptr); + assert(s != nullptr && merge_context != nullptr && range_del_agg != nullptr); // entry format is: // klength varint32 @@ -533,6 +539,10 @@ static bool SaveValue(void* arg, const char* entry) { ValueType type; UnPackSequenceAndType(tag, &s->seq, &type); + if ((type == kTypeValue || type == kTypeDeletion) && + range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) { + type = kTypeRangeDeletion; + } switch (type) { case kTypeValue: { if (s->inplace_update_support) { @@ -555,7 +565,8 @@ static bool SaveValue(void* arg, const char* entry) { return false; } 
case kTypeDeletion: - case kTypeSingleDeletion: { + case kTypeSingleDeletion: + case kTypeRangeDeletion: { if (*(s->merge_in_progress)) { *(s->status) = MergeHelper::TimedFullMerge( merge_operator, s->key->user_key(), nullptr, @@ -595,7 +606,9 @@ static bool SaveValue(void* arg, const char* entry) { } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context, SequenceNumber* seq) { + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, SequenceNumber* seq, + const ReadOptions& read_opts) { // The sequence number is updated synchronously in version_set.h if (IsEmpty()) { // Avoiding recording stats for speed. @@ -618,6 +631,13 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, if (prefix_bloom_) { PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); } + ScopedArenaIterator range_del_iter( + NewRangeTombstoneIterator(read_opts, range_del_agg->GetArena())); + Status status = range_del_agg->AddTombstones(std::move(range_del_iter)); + if (!status.ok()) { + *s = status; + return false; + } Saver saver; saver.status = s; saver.found_final_value = &found_final_value; @@ -627,6 +647,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.seq = kMaxSequenceNumber; saver.mem = this; saver.merge_context = merge_context; + saver.range_del_agg = range_del_agg; saver.merge_operator = moptions_.merge_operator; saver.logger = moptions_.info_log; saver.inplace_update_support = moptions_.inplace_update_support; diff --git a/db/memtable.h b/db/memtable.h index 652a81711..28ece7d2b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -16,6 +16,7 @@ #include #include "db/dbformat.h" #include "db/memtable_allocator.h" +#include "db/range_del_aggregator.h" #include "db/skiplist.h" #include "db/version_edit.h" #include "rocksdb/db.h" @@ -185,12 +186,14 @@ class MemTable { // On success, *s may be set to OK, NotFound, or MergeInProgress. 
Any other // status returned indicates a corruption or other unexpected error. bool Get(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context, SequenceNumber* seq); + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + SequenceNumber* seq, const ReadOptions& read_opts); bool Get(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context) { + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { SequenceNumber seq; - return Get(key, value, s, merge_context, &seq); + return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); } // Attempts to update the new_value inplace, else does normal Add diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 799887ed1..1c460324b 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -102,27 +102,36 @@ int MemTableList::NumFlushed() const { // Operands stores the list of merge operations to apply, so far. bool MemTableListVersion::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, - SequenceNumber* seq) { - return GetFromList(&memlist_, key, value, s, merge_context, seq); + RangeDelAggregator* range_del_agg, + SequenceNumber* seq, + const ReadOptions& read_opts) { + return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg, + seq, read_opts); } bool MemTableListVersion::GetFromHistory(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, - SequenceNumber* seq) { - return GetFromList(&memlist_history_, key, value, s, merge_context, seq); + RangeDelAggregator* range_del_agg, + SequenceNumber* seq, + const ReadOptions& read_opts) { + return GetFromList(&memlist_history_, key, value, s, merge_context, + range_del_agg, seq, read_opts); } bool MemTableListVersion::GetFromList(std::list* list, const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, - SequenceNumber* seq) { + RangeDelAggregator* 
range_del_agg, + SequenceNumber* seq, + const ReadOptions& read_opts) { *seq = kMaxSequenceNumber; for (auto& memtable : *list) { SequenceNumber current_seq = kMaxSequenceNumber; - bool done = memtable->Get(key, value, s, merge_context, &current_seq); + bool done = memtable->Get(key, value, s, merge_context, range_del_agg, + &current_seq, read_opts); if (*seq == kMaxSequenceNumber) { // Store the most recent sequence number of any operation on this key. // Since we only care about the most recent change, we only need to diff --git a/db/memtable_list.h b/db/memtable_list.h index 7f633f026..a8f2b38c7 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -14,6 +14,7 @@ #include "db/dbformat.h" #include "db/filename.h" #include "db/memtable.h" +#include "db/range_del_aggregator.h" #include "db/skiplist.h" #include "rocksdb/db.h" #include "rocksdb/iterator.h" @@ -53,12 +54,14 @@ class MemTableListVersion { // will be stored in *seq on success (regardless of whether true/false is // returned). Otherwise, *seq will be set to kMaxSequenceNumber. bool Get(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context, SequenceNumber* seq); + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + SequenceNumber* seq, const ReadOptions& read_opts); bool Get(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context) { + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { SequenceNumber seq; - return Get(key, value, s, merge_context, &seq); + return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); } // Similar to Get(), but searches the Memtable history of memtables that @@ -66,11 +69,16 @@ class MemTableListVersion { // queries (such as Transaction validation) as the history may contain // writes that are also present in the SST files. 
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context, SequenceNumber* seq); + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, SequenceNumber* seq, + const ReadOptions& read_opts); bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, - MergeContext* merge_context) { + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { SequenceNumber seq; - return GetFromHistory(key, value, s, merge_context, &seq); + return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq, + read_opts); } void AddIterators(const ReadOptions& options, @@ -102,7 +110,8 @@ class MemTableListVersion { bool GetFromList(std::list* list, const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, - SequenceNumber* seq); + RangeDelAggregator* range_del_agg, SequenceNumber* seq, + const ReadOptions& read_opts); void AddMemTable(MemTable* m); diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 5309ffff9..625b86922 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -8,6 +8,7 @@ #include #include #include "db/merge_context.h" +#include "db/range_del_aggregator.h" #include "db/version_set.h" #include "db/write_controller.h" #include "rocksdb/db.h" @@ -116,10 +117,13 @@ TEST_F(MemTableListTest, GetTest) { std::string value; Status s; MergeContext merge_context; + InternalKeyComparator ikey_cmp(options.comparator); + RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */); autovector to_delete; LookupKey lkey("key1", seq); - bool found = list.current()->Get(lkey, &value, &s, &merge_context); + bool found = list.current()->Get(lkey, &value, &s, &merge_context, + &range_del_agg, ReadOptions()); ASSERT_FALSE(found); // Create a MemTable @@ -141,17 +145,20 @@ TEST_F(MemTableListTest, GetTest) { // Fetch the newly written keys merge_context.Clear(); - found = mem->Get(LookupKey("key1", seq), &value, 
&s, &merge_context); + found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &range_del_agg, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ(value, "value1"); merge_context.Clear(); - found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context); + found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context, + &range_del_agg, ReadOptions()); // MemTable found out that this key is *not* found (at this sequence#) ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); - found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context); + found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &range_del_agg, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ(value, "value2.2"); @@ -177,24 +184,25 @@ TEST_F(MemTableListTest, GetTest) { // Fetch keys via MemTableList merge_context.Clear(); - found = - list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key1", seq), &value, &s, + &merge_context, &range_del_agg, ReadOptions()); ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s, - &merge_context); + &merge_context, &range_del_agg, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ("value1", value); merge_context.Clear(); - found = - list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key2", seq), &value, &s, + &merge_context, &range_del_agg, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ(value, "value2.3"); merge_context.Clear(); - found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context, + &range_del_agg, ReadOptions()); ASSERT_FALSE(found); ASSERT_EQ(2, list.NumNotFlushed()); @@ -216,10 +224,13 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { std::string value; Status s; MergeContext 
merge_context; + InternalKeyComparator ikey_cmp(options.comparator); + RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */); autovector to_delete; LookupKey lkey("key1", seq); - bool found = list.current()->Get(lkey, &value, &s, &merge_context); + bool found = list.current()->Get(lkey, &value, &s, &merge_context, + &range_del_agg, ReadOptions()); ASSERT_FALSE(found); // Create a MemTable @@ -240,12 +251,14 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Fetch the newly written keys merge_context.Clear(); - found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context); + found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context, + &range_del_agg, ReadOptions()); // MemTable found out that this key is *not* found (at this sequence#) ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); - found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context); + found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context, + &range_del_agg, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ(value, "value2.2"); @@ -255,13 +268,13 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Fetch keys via MemTableList merge_context.Clear(); - found = - list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key1", seq), &value, &s, + &merge_context, &range_del_agg, ReadOptions()); ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); - found = - list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key2", seq), &value, &s, + &merge_context, &range_del_agg, ReadOptions()); ASSERT_TRUE(s.ok() && found); ASSERT_EQ("value2.2", value); @@ -280,24 +293,26 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Verify keys are no longer in MemTableList merge_context.Clear(); - found = - list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key1", seq), 
&value, &s, + &merge_context, &range_del_agg, ReadOptions()); ASSERT_FALSE(found); merge_context.Clear(); - found = - list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key2", seq), &value, &s, + &merge_context, &range_del_agg, ReadOptions()); ASSERT_FALSE(found); // Verify keys are present in history merge_context.Clear(); found = list.current()->GetFromHistory(LookupKey("key1", seq), &value, &s, - &merge_context); + &merge_context, &range_del_agg, + ReadOptions()); ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); found = list.current()->GetFromHistory(LookupKey("key2", seq), &value, &s, - &merge_context); + &merge_context, &range_del_agg, + ReadOptions()); ASSERT_TRUE(found); ASSERT_EQ("value2.2", value); @@ -338,36 +353,38 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Verify keys are no longer in MemTableList merge_context.Clear(); - found = - list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key1", seq), &value, &s, + &merge_context, &range_del_agg, ReadOptions()); ASSERT_FALSE(found); merge_context.Clear(); - found = - list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key2", seq), &value, &s, + &merge_context, &range_del_agg, ReadOptions()); ASSERT_FALSE(found); merge_context.Clear(); - found = - list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key3", seq), &value, &s, + &merge_context, &range_del_agg, ReadOptions()); ASSERT_FALSE(found); // Verify that the second memtable's keys are in the history merge_context.Clear(); found = list.current()->GetFromHistory(LookupKey("key1", seq), &value, &s, - &merge_context); + &merge_context, &range_del_agg, + ReadOptions()); ASSERT_TRUE(found && s.IsNotFound()); merge_context.Clear(); found = list.current()->GetFromHistory(LookupKey("key3", seq), 
&value, &s, - &merge_context); + &merge_context, &range_del_agg, + ReadOptions()); ASSERT_TRUE(found); ASSERT_EQ("value3", value); // Verify that key2 from the first memtable is no longer in the history merge_context.Clear(); - found = - list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context); + found = list.current()->Get(LookupKey("key2", seq), &value, &s, + &merge_context, &range_del_agg, ReadOptions()); ASSERT_FALSE(found); // Cleanup diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc index f79ce48b2..ba874c02b 100644 --- a/db/range_del_aggregator.cc +++ b/db/range_del_aggregator.cc @@ -29,6 +29,11 @@ bool RangeDelAggregator::ShouldDelete(const Slice& internal_key, if (!ParseInternalKey(internal_key, &parsed)) { assert(false); } + return ShouldDelete(parsed, for_compaction); +} + +bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed, + bool for_compaction /* = false */) { assert(IsValueType(parsed.type)); // Starting point is the snapshot stripe in which the key lives, then need to diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h index d356ee139..64d1becab 100644 --- a/db/range_del_aggregator.h +++ b/db/range_del_aggregator.h @@ -42,6 +42,8 @@ class RangeDelAggregator { // Returns whether the key should be deleted, which is the case when it is // covered by a range tombstone residing in the same snapshot stripe. 
+ bool ShouldDelete(const ParsedInternalKey& parsed, + bool for_compaction = false); bool ShouldDelete(const Slice& internal_key, bool for_compaction = false); bool ShouldAddTombstones(bool bottommost_level = false); @@ -69,6 +71,7 @@ class RangeDelAggregator { void AddToBuilder(TableBuilder* builder, bool extend_before_min_key, const Slice* next_table_min_key, FileMetaData* meta, bool bottommost_level = false); + Arena* GetArena() { return &arena_; } private: // Maps tombstone start key -> tombstone object @@ -84,5 +87,6 @@ class RangeDelAggregator { PinnedIteratorsManager pinned_iters_mgr_; StripeMap stripe_map_; const InternalKeyComparator icmp_; + Arena arena_; }; } // namespace rocksdb diff --git a/db/version_set.cc b/db/version_set.cc index 7600b0439..a78db579e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -928,7 +928,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, void Version::Get(const ReadOptions& read_options, const LookupKey& k, std::string* value, Status* status, - MergeContext* merge_context, bool* value_found, + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, bool* value_found, bool* key_exists, SequenceNumber* seq) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); @@ -944,7 +945,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, - value, value_found, merge_context, this->env_, seq, + value, value_found, merge_context, range_del_agg, this->env_, seq, merge_operator_ ? 
&pinned_iters_mgr : nullptr); // Pin blocks that we read to hold merge operands @@ -958,6 +959,18 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, user_comparator(), internal_comparator()); FdWithKeyRange* f = fp.GetNextFile(); while (f != nullptr) { + if (!read_options.ignore_range_deletions) { + table_cache_->NewIterator( + read_options, vset_->env_options(), *internal_comparator(), f->fd, + nullptr /* table_reader_ptr */, + cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), + false /* for_compaction */, nullptr /* arena */, + IsFilterSkipped(static_cast(fp.GetHitFileLevel()), + fp.IsHitFileLastInLevel()), + fp.GetCurrentLevel() /* level */, range_del_agg, + true /* is_range_del_only */); + } + *status = table_cache_->Get( read_options, *internal_comparator(), f->fd, ikey, &get_context, cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), diff --git a/db/version_set.h b/db/version_set.h index ceeca3d5b..2842ff5dc 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -34,6 +34,7 @@ #include "db/dbformat.h" #include "db/file_indexer.h" #include "db/log_reader.h" +#include "db/range_del_aggregator.h" #include "db/table_cache.h" #include "db/version_builder.h" #include "db/version_edit.h" @@ -457,8 +458,8 @@ class Version { // REQUIRES: lock is not held void Get(const ReadOptions&, const LookupKey& key, std::string* val, Status* status, MergeContext* merge_context, - bool* value_found = nullptr, bool* key_exists = nullptr, - SequenceNumber* seq = nullptr); + RangeDelAggregator* range_del_agg, bool* value_found = nullptr, + bool* key_exists = nullptr, SequenceNumber* seq = nullptr); // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set. 
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 2d7272d48..75fdab101 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1526,6 +1526,12 @@ struct ReadOptions { // Default: 0 size_t readahead_size; + // If true, keys deleted using the DeleteRange() API will be visible to + // readers until they are naturally deleted during compaction. This improves + // read performance in DBs with many range deletions. + // Default: false + bool ignore_range_deletions; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/table/cuckoo_table_reader_test.cc b/table/cuckoo_table_reader_test.cc index e440af90a..e207c62bd 100644 --- a/table/cuckoo_table_reader_test.cc +++ b/table/cuckoo_table_reader_test.cc @@ -126,7 +126,7 @@ class CuckooReaderTest : public testing::Test { std::string value; GetContext get_context(ucomp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(user_keys[i]), &value, - nullptr, nullptr, nullptr); + nullptr, nullptr, nullptr, nullptr); ASSERT_OK(reader.Get(ReadOptions(), Slice(keys[i]), &get_context)); ASSERT_EQ(values[i], value); } @@ -336,7 +336,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { std::string value; GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(not_found_key), &value, nullptr, nullptr, - nullptr); + nullptr, nullptr); ASSERT_OK(reader.Get(ReadOptions(), Slice(not_found_key), &get_context)); ASSERT_TRUE(value.empty()); ASSERT_OK(reader.status()); @@ -348,7 +348,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { AppendInternalKey(&not_found_key2, ikey2); GetContext get_context2(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(not_found_key2), &value, - nullptr, nullptr, nullptr); + nullptr, nullptr, nullptr, nullptr); ASSERT_OK(reader.Get(ReadOptions(), Slice(not_found_key2), &get_context2)); ASSERT_TRUE(value.empty()); ASSERT_OK(reader.status()); @@ -362,7 +362,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) { kNumHashFunc, 
kNumHashFunc); GetContext get_context3(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(unused_key), &value, - nullptr, nullptr, nullptr); + nullptr, nullptr, nullptr, nullptr); ASSERT_OK(reader.Get(ReadOptions(), Slice(unused_key), &get_context3)); ASSERT_TRUE(value.empty()); ASSERT_OK(reader.status()); @@ -437,7 +437,7 @@ void WriteFile(const std::vector& keys, // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, - nullptr, nullptr); + nullptr, nullptr, nullptr); for (uint64_t i = 0; i < num; ++i) { value.clear(); ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &get_context)); @@ -484,7 +484,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) { // Assume only the fast path is triggered GetContext get_context(nullptr, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), &value, nullptr, - nullptr, nullptr); + nullptr, nullptr, nullptr); uint64_t start_time = env->NowMicros(); if (batch_size > 0) { for (uint64_t i = 0; i < num; i += batch_size) { diff --git a/table/get_context.cc b/table/get_context.cc index 4a7a9693b..662833f4c 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -36,7 +36,8 @@ GetContext::GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, const Slice& user_key, std::string* ret_value, - bool* value_found, MergeContext* merge_context, Env* env, + bool* value_found, MergeContext* merge_context, + RangeDelAggregator* range_del_agg, Env* env, SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr) : ucmp_(ucmp), @@ -48,6 +49,7 @@ GetContext::GetContext(const Comparator* ucmp, value_(ret_value), value_found_(value_found), merge_context_(merge_context), + range_del_agg_(range_del_agg), env_(env), seq_(seq), replay_log_(nullptr), @@ -93,8 +95,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } } + auto type = 
parsed_key.type; // Key matches. Process it - switch (parsed_key.type) { + if ((type == kTypeValue || type == kTypeMerge) && + range_del_agg_ != nullptr && range_del_agg_->ShouldDelete(parsed_key)) { + type = kTypeRangeDeletion; + } + switch (type) { case kTypeValue: assert(state_ == kNotFound || state_ == kMerge); if (kNotFound == state_) { @@ -106,10 +113,10 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(merge_operator_ != nullptr); state_ = kFound; if (value_ != nullptr) { - Status merge_status = - MergeHelper::TimedFullMerge(merge_operator_, user_key_, &value, - merge_context_->GetOperands(), - value_, logger_, statistics_, env_); + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator_, user_key_, &value, + merge_context_->GetOperands(), value_, logger_, statistics_, + env_); if (!merge_status.ok()) { state_ = kCorrupt; } @@ -119,6 +126,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, case kTypeDeletion: case kTypeSingleDeletion: + case kTypeRangeDeletion: // TODO(noetzli): Verify correctness once merge of single-deletes // is supported assert(state_ == kNotFound || state_ == kMerge); diff --git a/table/get_context.h b/table/get_context.h index 4cee09a8d..f76c9b48f 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -6,6 +6,7 @@ #pragma once #include #include "db/merge_context.h" +#include "db/range_del_aggregator.h" #include "rocksdb/env.h" #include "rocksdb/types.h" @@ -26,8 +27,8 @@ class GetContext { GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, const Slice& user_key, std::string* ret_value, bool* value_found, - MergeContext* merge_context, Env* env, - SequenceNumber* seq = nullptr, + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + Env* env, SequenceNumber* seq = nullptr, PinnedIteratorsManager* _pinned_iters_mgr = nullptr); void MarkKeyMayExist(); @@ -68,6 +69,7 @@ class GetContext 
{ std::string* value_; bool* value_found_; // Is value set correctly? Used by KeyMayExist MergeContext* merge_context_; + RangeDelAggregator* range_del_agg_; Env* env_; // If a key is found, seq_ will be set to the SequenceNumber of most recent // write to the key or kMaxSequenceNumber if unknown diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index ccdd5c146..77adb8877 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -168,10 +168,12 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, if (!through_db) { std::string value; MergeContext merge_context; - GetContext get_context( - ioptions.user_comparator, ioptions.merge_operator, - ioptions.info_log, ioptions.statistics, GetContext::kNotFound, - Slice(key), &value, nullptr, &merge_context, env); + RangeDelAggregator range_del_agg(ikc, {} /* snapshots */); + GetContext get_context(ioptions.user_comparator, + ioptions.merge_operator, ioptions.info_log, + ioptions.statistics, GetContext::kNotFound, + Slice(key), &value, nullptr, &merge_context, + &range_del_agg, env); s = table_reader->Get(read_options, key, &get_context); } else { s = db->Get(read_options, key, &result); diff --git a/table/table_test.cc b/table/table_test.cc index f6dc75f81..ca9892316 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1799,7 +1799,7 @@ TEST_F(BlockBasedTableTest, BlockCacheDisabledTest) { { GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, Slice(), nullptr, nullptr, - nullptr, nullptr); + nullptr, nullptr, nullptr); // a hack that just to trigger BlockBasedTable::GetFilter. 
reader->Get(ReadOptions(), "non-exist-key", &get_context); BlockCachePropertiesSnapshot props(options.statistics.get()); @@ -1965,7 +1965,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { std::string value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr); + nullptr, nullptr, nullptr); ASSERT_OK(reader->Get(ReadOptions(), user_key, &get_context)); ASSERT_EQ(value, "hello"); BlockCachePropertiesSnapshot props(options.statistics.get()); @@ -2048,7 +2048,7 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) { std::string value; GetContext get_context(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr); + nullptr, nullptr, nullptr); perf_context.Reset(); ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context)); if (index_and_filter_in_cache) { @@ -2068,7 +2068,7 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) { get_context = GetContext(options.comparator, nullptr, nullptr, nullptr, GetContext::kNotFound, user_key, &value, nullptr, - nullptr, nullptr); + nullptr, nullptr, nullptr); perf_context.Reset(); ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context)); ASSERT_EQ(get_context.State(), GetContext::kNotFound); diff --git a/util/options.cc b/util/options.cc index bdacb7bf2..da15667bc 100644 --- a/util/options.cc +++ b/util/options.cc @@ -768,7 +768,8 @@ ReadOptions::ReadOptions() prefix_same_as_start(false), pin_data(false), background_purge_on_iterator_cleanup(false), - readahead_size(0) { + readahead_size(0), + ignore_range_deletions(false) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast<void*>(this)); } @@ -785,7 +786,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache) prefix_same_as_start(false), pin_data(false), background_purge_on_iterator_cleanup(false), - readahead_size(0) { + readahead_size(0), + ignore_range_deletions(false) { XFUNC_TEST("", 
"managed_options", managed_options, xf_manage_options, reinterpret_cast<void*>(this)); }