DeleteRange Get support

Summary:
During Get()/MultiGet(), build up a RangeDelAggregator with range
tombstones as we search through live memtable, immutable memtables, and
SST files. This aggregator is then used by memtable.cc's SaveValue() and
GetContext::SaveValue() to check whether keys are covered.

added tests for Get on memtables/files; end-to-end tests mainly in https://reviews.facebook.net/D64761
Closes https://github.com/facebook/rocksdb/pull/1456

Differential Revision: D4111271

Pulled By: ajkr

fbshipit-source-id: 6e388d4
main
Andrew Kryczka 8 years ago committed by Facebook Github Bot
parent 879f366366
commit f998c9790f
  1. 4
      db/compacted_db_impl.cc
  2. 9
      db/db_compaction_test.cc
  3. 42
      db/db_impl.cc
  4. 10
      db/db_impl_readonly.cc
  5. 27
      db/memtable.cc
  6. 9
      db/memtable.h
  7. 21
      db/memtable_list.cc
  8. 23
      db/memtable_list.h
  9. 83
      db/memtable_list_test.cc
  10. 5
      db/range_del_aggregator.cc
  11. 4
      db/range_del_aggregator.h
  12. 17
      db/version_set.cc
  13. 5
      db/version_set.h
  14. 6
      include/rocksdb/options.h
  15. 12
      table/cuckoo_table_reader_test.cc
  16. 20
      table/get_context.cc
  17. 6
      table/get_context.h
  18. 10
      table/table_reader_bench.cc
  19. 8
      table/table_test.cc
  20. 6
      util/options.cc

@ -46,7 +46,7 @@ Status CompactedDBImpl::Get(const ReadOptions& options,
ColumnFamilyHandle*, const Slice& key, std::string* value) {
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr,
nullptr);
nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber);
files_.files[FindFile(key)].fd.table_reader->Get(
options, lkey.internal_key(), &get_context);
@ -77,7 +77,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
if (r != nullptr) {
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &(*values)[idx],
nullptr, nullptr, nullptr);
nullptr, nullptr, nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(), &get_context);
if (get_context.State() == GetContext::kFound) {

@ -294,7 +294,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
num_new_table_reader = 0;
ASSERT_EQ(Key(k), Get(Key(k)));
// lookup iterator from table cache and no need to create a new one.
ASSERT_EQ(num_table_cache_lookup, 1);
// a second table cache iterator is created for range tombstones
ASSERT_EQ(num_table_cache_lookup, 2);
ASSERT_EQ(num_new_table_reader, 0);
}
}
@ -317,7 +318,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
num_table_cache_lookup = 0;
num_new_table_reader = 0;
ASSERT_EQ(Key(1), Get(Key(1)));
ASSERT_EQ(num_table_cache_lookup, 1);
// a second table cache iterator is created for range tombstones
ASSERT_EQ(num_table_cache_lookup, 2);
ASSERT_EQ(num_new_table_reader, 0);
num_table_cache_lookup = 0;
@ -335,7 +337,8 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
num_table_cache_lookup = 0;
num_new_table_reader = 0;
ASSERT_EQ(Key(1), Get(Key(1)));
ASSERT_EQ(num_table_cache_lookup, 1);
// a second table cache iterator is created for range tombstones
ASSERT_EQ(num_table_cache_lookup, 2);
ASSERT_EQ(num_new_table_reader, 0);
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

@ -51,6 +51,7 @@
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/range_del_aggregator.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/transaction_log_impl.h"
@ -3961,6 +3962,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
SuperVersion* sv = GetAndRefSuperVersion(cfd);
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
RangeDelAggregator range_del_agg(cfd->internal_comparator(), {snapshot});
Status s;
// First look in the memtable, then in the immutable memtable (if any).
@ -3973,18 +3975,24 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
(read_options.read_tier == kPersistedTier && has_unpersisted_data_);
bool done = false;
if (!skip_memtable) {
if (sv->mem->Get(lkey, value, &s, &merge_context)) {
if (sv->mem->Get(lkey, value, &s, &merge_context, &range_del_agg,
read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if (sv->imm->Get(lkey, value, &s, &merge_context)) {
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, value, &s, &merge_context, &range_del_agg,
read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
return s;
}
}
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(read_options, lkey, value, &s, &merge_context,
value_found);
&range_del_agg, value_found);
RecordTick(stats_, MEMTABLE_MISS);
}
@ -4062,6 +4070,8 @@ std::vector<Status> DBImpl::MultiGet(
LookupKey lkey(keys[i], snapshot);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
RangeDelAggregator range_del_agg(cfh->cfd()->internal_comparator(),
{snapshot});
auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
assert(mgd_iter != multiget_cf_data.end());
auto mgd = mgd_iter->second;
@ -4070,18 +4080,20 @@ std::vector<Status> DBImpl::MultiGet(
(read_options.read_tier == kPersistedTier && has_unpersisted_data_);
bool done = false;
if (!skip_memtable) {
if (super_version->mem->Get(lkey, value, &s, &merge_context)) {
if (super_version->mem->Get(lkey, value, &s, &merge_context,
&range_del_agg, read_options)) {
done = true;
// TODO(?): RecordTick(stats_, MEMTABLE_HIT)?
} else if (super_version->imm->Get(lkey, value, &s, &merge_context)) {
} else if (super_version->imm->Get(lkey, value, &s, &merge_context,
&range_del_agg, read_options)) {
done = true;
// TODO(?): RecordTick(stats_, MEMTABLE_HIT)?
}
}
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->Get(read_options, lkey, value, &s,
&merge_context);
super_version->current->Get(read_options, lkey, value, &s, &merge_context,
&range_del_agg);
// TODO(?): RecordTick(stats_, MEMTABLE_MISS)?
}
@ -6293,7 +6305,9 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
bool* found_record_for_key) {
Status s;
MergeContext merge_context;
RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(), {});
ReadOptions read_options;
SequenceNumber current_seq = versions_->LastSequence();
LookupKey lkey(key, current_seq);
@ -6301,7 +6315,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
*found_record_for_key = false;
// Check if there is a record for this key in the latest memtable
sv->mem->Get(lkey, nullptr, &s, &merge_context, seq);
sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq,
read_options);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
@ -6319,7 +6334,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
}
// Check if there is a record for this key in the immutable memtables
sv->imm->Get(lkey, nullptr, &s, &merge_context, seq);
sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq,
read_options);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
@ -6337,7 +6353,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
}
// Check if there is a record for this key in the immutable memtables
sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, seq);
sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, &range_del_agg,
seq, read_options);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
@ -6358,10 +6375,9 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
// SST files if cache_only=true?
if (!cache_only) {
// Check tables
ReadOptions read_options;
sv->current->Get(read_options, lkey, nullptr, &s, &merge_context,
nullptr /* value_found */, found_record_for_key, seq);
&range_del_agg, nullptr /* value_found */,
found_record_for_key, seq);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading SST files

@ -8,8 +8,9 @@
#include "db/compacted_db_impl.h"
#include "db/db_impl.h"
#include "db/merge_context.h"
#include "db/db_iter.h"
#include "db/merge_context.h"
#include "db/range_del_aggregator.h"
#include "util/perf_context_imp.h"
namespace rocksdb {
@ -37,11 +38,14 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
auto cfd = cfh->cfd();
SuperVersion* super_version = cfd->GetSuperVersion();
MergeContext merge_context;
RangeDelAggregator range_del_agg(cfd->internal_comparator(), {snapshot});
LookupKey lkey(key, snapshot);
if (super_version->mem->Get(lkey, value, &s, &merge_context)) {
if (super_version->mem->Get(lkey, value, &s, &merge_context, &range_del_agg,
read_options)) {
} else {
PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->Get(read_options, lkey, value, &s, &merge_context);
super_version->current->Get(read_options, lkey, value, &s, &merge_context,
&range_del_agg);
}
return s;
}

@ -24,6 +24,7 @@
#include "rocksdb/slice_transform.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "table/merger.h"
#include "util/arena.h"
#include "util/coding.h"
@ -366,6 +367,9 @@ InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
InternalIterator* MemTable::NewRangeTombstoneIterator(
const ReadOptions& read_options, Arena* arena) {
assert(arena != nullptr);
if (read_options.ignore_range_deletions) {
return NewEmptyInternalIterator(arena);
}
auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
return new (mem) MemTableIterator(*this, read_options, arena,
true /* use_range_del_table */);
@ -500,6 +504,7 @@ struct Saver {
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
RangeDelAggregator* range_del_agg;
MemTable* mem;
Logger* logger;
Statistics* statistics;
@ -511,9 +516,10 @@ struct Saver {
static bool SaveValue(void* arg, const char* entry) {
Saver* s = reinterpret_cast<Saver*>(arg);
MergeContext* merge_context = s->merge_context;
RangeDelAggregator* range_del_agg = s->range_del_agg;
const MergeOperator* merge_operator = s->merge_operator;
assert(s != nullptr && merge_context != nullptr);
assert(s != nullptr && merge_context != nullptr && range_del_agg != nullptr);
// entry format is:
// klength varint32
@ -533,6 +539,10 @@ static bool SaveValue(void* arg, const char* entry) {
ValueType type;
UnPackSequenceAndType(tag, &s->seq, &type);
if ((type == kTypeValue || type == kTypeDeletion) &&
range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) {
type = kTypeRangeDeletion;
}
switch (type) {
case kTypeValue: {
if (s->inplace_update_support) {
@ -555,7 +565,8 @@ static bool SaveValue(void* arg, const char* entry) {
return false;
}
case kTypeDeletion:
case kTypeSingleDeletion: {
case kTypeSingleDeletion:
case kTypeRangeDeletion: {
if (*(s->merge_in_progress)) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), nullptr,
@ -595,7 +606,9 @@ static bool SaveValue(void* arg, const char* entry) {
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* seq) {
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
@ -618,6 +631,13 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
if (prefix_bloom_) {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
ScopedArenaIterator range_del_iter(
NewRangeTombstoneIterator(read_opts, range_del_agg->GetArena()));
Status status = range_del_agg->AddTombstones(std::move(range_del_iter));
if (!status.ok()) {
*s = status;
return false;
}
Saver saver;
saver.status = s;
saver.found_final_value = &found_final_value;
@ -627,6 +647,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
saver.range_del_agg = range_del_agg;
saver.merge_operator = moptions_.merge_operator;
saver.logger = moptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;

@ -16,6 +16,7 @@
#include <vector>
#include "db/dbformat.h"
#include "db/memtable_allocator.h"
#include "db/range_del_aggregator.h"
#include "db/skiplist.h"
#include "db/version_edit.h"
#include "rocksdb/db.h"
@ -185,12 +186,14 @@ class MemTable {
// On success, *s may be set to OK, NotFound, or MergeInProgress. Any other
// status returned indicates a corruption or other unexpected error.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* seq);
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts);
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context) {
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts) {
SequenceNumber seq;
return Get(key, value, s, merge_context, &seq);
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts);
}
// Attempts to update the new_value inplace, else does normal Add

@ -102,27 +102,36 @@ int MemTableList::NumFlushed() const {
// Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context,
SequenceNumber* seq) {
return GetFromList(&memlist_, key, value, s, merge_context, seq);
RangeDelAggregator* range_del_agg,
SequenceNumber* seq,
const ReadOptions& read_opts) {
return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg,
seq, read_opts);
}
bool MemTableListVersion::GetFromHistory(const LookupKey& key,
std::string* value, Status* s,
MergeContext* merge_context,
SequenceNumber* seq) {
return GetFromList(&memlist_history_, key, value, s, merge_context, seq);
RangeDelAggregator* range_del_agg,
SequenceNumber* seq,
const ReadOptions& read_opts) {
return GetFromList(&memlist_history_, key, value, s, merge_context,
range_del_agg, seq, read_opts);
}
bool MemTableListVersion::GetFromList(std::list<MemTable*>* list,
const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context,
SequenceNumber* seq) {
RangeDelAggregator* range_del_agg,
SequenceNumber* seq,
const ReadOptions& read_opts) {
*seq = kMaxSequenceNumber;
for (auto& memtable : *list) {
SequenceNumber current_seq = kMaxSequenceNumber;
bool done = memtable->Get(key, value, s, merge_context, &current_seq);
bool done = memtable->Get(key, value, s, merge_context, range_del_agg,
&current_seq, read_opts);
if (*seq == kMaxSequenceNumber) {
// Store the most recent sequence number of any operation on this key.
// Since we only care about the most recent change, we only need to

@ -14,6 +14,7 @@
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/memtable.h"
#include "db/range_del_aggregator.h"
#include "db/skiplist.h"
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
@ -53,12 +54,14 @@ class MemTableListVersion {
// will be stored in *seq on success (regardless of whether true/false is
// returned). Otherwise, *seq will be set to kMaxSequenceNumber.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* seq);
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts);
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context) {
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts) {
SequenceNumber seq;
return Get(key, value, s, merge_context, &seq);
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts);
}
// Similar to Get(), but searches the Memtable history of memtables that
@ -66,11 +69,16 @@ class MemTableListVersion {
// queries (such as Transaction validation) as the history may contain
// writes that are also present in the SST files.
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* seq);
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts);
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context) {
MergeContext* merge_context,
RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts) {
SequenceNumber seq;
return GetFromHistory(key, value, s, merge_context, &seq);
return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq,
read_opts);
}
void AddIterators(const ReadOptions& options,
@ -102,7 +110,8 @@ class MemTableListVersion {
bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
std::string* value, Status* s, MergeContext* merge_context,
SequenceNumber* seq);
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts);
void AddMemTable(MemTable* m);

@ -8,6 +8,7 @@
#include <string>
#include <vector>
#include "db/merge_context.h"
#include "db/range_del_aggregator.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "rocksdb/db.h"
@ -116,10 +117,13 @@ TEST_F(MemTableListTest, GetTest) {
std::string value;
Status s;
MergeContext merge_context;
InternalKeyComparator ikey_cmp(options.comparator);
RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */);
autovector<MemTable*> to_delete;
LookupKey lkey("key1", seq);
bool found = list.current()->Get(lkey, &value, &s, &merge_context);
bool found = list.current()->Get(lkey, &value, &s, &merge_context,
&range_del_agg, ReadOptions());
ASSERT_FALSE(found);
// Create a MemTable
@ -141,17 +145,20 @@ TEST_F(MemTableListTest, GetTest) {
// Fetch the newly written keys
merge_context.Clear();
found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context);
found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
&range_del_agg, ReadOptions());
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value1");
merge_context.Clear();
found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context);
found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context,
&range_del_agg, ReadOptions());
// MemTable found out that this key is *not* found (at this sequence#)
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context);
found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
&range_del_agg, ReadOptions());
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.2");
@ -177,24 +184,25 @@ TEST_F(MemTableListTest, GetTest) {
// Fetch keys via MemTableList
merge_context.Clear();
found =
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key1", seq), &value, &s,
&merge_context, &range_del_agg, ReadOptions());
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s,
&merge_context);
&merge_context, &range_del_agg, ReadOptions());
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ("value1", value);
merge_context.Clear();
found =
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key2", seq), &value, &s,
&merge_context, &range_del_agg, ReadOptions());
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.3");
merge_context.Clear();
found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context,
&range_del_agg, ReadOptions());
ASSERT_FALSE(found);
ASSERT_EQ(2, list.NumNotFlushed());
@ -216,10 +224,13 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
std::string value;
Status s;
MergeContext merge_context;
InternalKeyComparator ikey_cmp(options.comparator);
RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */);
autovector<MemTable*> to_delete;
LookupKey lkey("key1", seq);
bool found = list.current()->Get(lkey, &value, &s, &merge_context);
bool found = list.current()->Get(lkey, &value, &s, &merge_context,
&range_del_agg, ReadOptions());
ASSERT_FALSE(found);
// Create a MemTable
@ -240,12 +251,14 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Fetch the newly written keys
merge_context.Clear();
found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context);
found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
&range_del_agg, ReadOptions());
// MemTable found out that this key is *not* found (at this sequence#)
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context);
found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
&range_del_agg, ReadOptions());
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ(value, "value2.2");
@ -255,13 +268,13 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Fetch keys via MemTableList
merge_context.Clear();
found =
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key1", seq), &value, &s,
&merge_context, &range_del_agg, ReadOptions());
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
found =
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key2", seq), &value, &s,
&merge_context, &range_del_agg, ReadOptions());
ASSERT_TRUE(s.ok() && found);
ASSERT_EQ("value2.2", value);
@ -280,24 +293,26 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Verify keys are no longer in MemTableList
merge_context.Clear();
found =
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key1", seq), &value, &s,
&merge_context, &range_del_agg, ReadOptions());
ASSERT_FALSE(found);
merge_context.Clear();
found =
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key2", seq), &value, &s,
&merge_context, &range_del_agg, ReadOptions());
ASSERT_FALSE(found);
// Verify keys are present in history
merge_context.Clear();
found = list.current()->GetFromHistory(LookupKey("key1", seq), &value, &s,
&merge_context);
&merge_context, &range_del_agg,
ReadOptions());
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
found = list.current()->GetFromHistory(LookupKey("key2", seq), &value, &s,
&merge_context);
&merge_context, &range_del_agg,
ReadOptions());
ASSERT_TRUE(found);
ASSERT_EQ("value2.2", value);
@ -338,36 +353,38 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
// Verify keys are no longer in MemTableList
merge_context.Clear();
found =
list.current()->Get(LookupKey("key1", seq), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key1", seq), &value, &s,
&merge_context, &range_del_agg, ReadOptions());
ASSERT_FALSE(found);
merge_context.Clear();
found =
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key2", seq), &value, &s,
&merge_context, &range_del_agg, ReadOptions());
ASSERT_FALSE(found);
merge_context.Clear();
found =
list.current()->Get(LookupKey("key3", seq), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key3", seq), &value, &s,
&merge_context, &range_del_agg, ReadOptions());
ASSERT_FALSE(found);
// Verify that the second memtable's keys are in the history
merge_context.Clear();
found = list.current()->GetFromHistory(LookupKey("key1", seq), &value, &s,
&merge_context);
&merge_context, &range_del_agg,
ReadOptions());
ASSERT_TRUE(found && s.IsNotFound());
merge_context.Clear();
found = list.current()->GetFromHistory(LookupKey("key3", seq), &value, &s,
&merge_context);
&merge_context, &range_del_agg,
ReadOptions());
ASSERT_TRUE(found);
ASSERT_EQ("value3", value);
// Verify that key2 from the first memtable is no longer in the history
merge_context.Clear();
found =
list.current()->Get(LookupKey("key2", seq), &value, &s, &merge_context);
found = list.current()->Get(LookupKey("key2", seq), &value, &s,
&merge_context, &range_del_agg, ReadOptions());
ASSERT_FALSE(found);
// Cleanup

@ -29,6 +29,11 @@ bool RangeDelAggregator::ShouldDelete(const Slice& internal_key,
if (!ParseInternalKey(internal_key, &parsed)) {
assert(false);
}
return ShouldDelete(parsed, for_compaction);
}
bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed,
bool for_compaction /* = false */) {
assert(IsValueType(parsed.type));
// Starting point is the snapshot stripe in which the key lives, then need to

@ -42,6 +42,8 @@ class RangeDelAggregator {
// Returns whether the key should be deleted, which is the case when it is
// covered by a range tombstone residing in the same snapshot stripe.
bool ShouldDelete(const ParsedInternalKey& parsed,
bool for_compaction = false);
bool ShouldDelete(const Slice& internal_key, bool for_compaction = false);
bool ShouldAddTombstones(bool bottommost_level = false);
@ -69,6 +71,7 @@ class RangeDelAggregator {
void AddToBuilder(TableBuilder* builder, bool extend_before_min_key,
const Slice* next_table_min_key, FileMetaData* meta,
bool bottommost_level = false);
Arena* GetArena() { return &arena_; }
private:
// Maps tombstone start key -> tombstone object
@ -84,5 +87,6 @@ class RangeDelAggregator {
PinnedIteratorsManager pinned_iters_mgr_;
StripeMap stripe_map_;
const InternalKeyComparator icmp_;
Arena arena_;
};
} // namespace rocksdb

@ -928,7 +928,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
std::string* value, Status* status,
MergeContext* merge_context, bool* value_found,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, bool* value_found,
bool* key_exists, SequenceNumber* seq) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
@ -944,7 +945,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
value, value_found, merge_context, this->env_, seq,
value, value_found, merge_context, range_del_agg, this->env_, seq,
merge_operator_ ? &pinned_iters_mgr : nullptr);
// Pin blocks that we read to hold merge operands
@ -958,6 +959,18 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) {
if (!read_options.ignore_range_deletions) {
table_cache_->NewIterator(
read_options, vset_->env_options(), *internal_comparator(), f->fd,
nullptr /* table_reader_ptr */,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
false /* for_compaction */, nullptr /* arena */,
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
fp.GetCurrentLevel() /* level */, range_del_agg,
true /* is_range_del_only */);
}
*status = table_cache_->Get(
read_options, *internal_comparator(), f->fd, ikey, &get_context,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),

@ -34,6 +34,7 @@
#include "db/dbformat.h"
#include "db/file_indexer.h"
#include "db/log_reader.h"
#include "db/range_del_aggregator.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/version_edit.h"
@ -457,8 +458,8 @@ class Version {
// REQUIRES: lock is not held
void Get(const ReadOptions&, const LookupKey& key, std::string* val,
Status* status, MergeContext* merge_context,
bool* value_found = nullptr, bool* key_exists = nullptr,
SequenceNumber* seq = nullptr);
RangeDelAggregator* range_del_agg, bool* value_found = nullptr,
bool* key_exists = nullptr, SequenceNumber* seq = nullptr);
// Loads some stats information from files. Call without mutex held. It needs
// to be called before applying the version to the version set.

@ -1526,6 +1526,12 @@ struct ReadOptions {
// Default: 0
size_t readahead_size;
// If true, keys deleted using the DeleteRange() API will be visible to
// readers until they are naturally deleted during compaction. This improves
// read performance in DBs with many range deletions.
// Default: false
bool ignore_range_deletions;
ReadOptions();
ReadOptions(bool cksum, bool cache);
};

@ -126,7 +126,7 @@ class CuckooReaderTest : public testing::Test {
std::string value;
GetContext get_context(ucomp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(user_keys[i]), &value,
nullptr, nullptr, nullptr);
nullptr, nullptr, nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(keys[i]), &get_context));
ASSERT_EQ(values[i], value);
}
@ -336,7 +336,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
std::string value;
GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound,
Slice(not_found_key), &value, nullptr, nullptr,
nullptr);
nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(not_found_key), &get_context));
ASSERT_TRUE(value.empty());
ASSERT_OK(reader.status());
@ -348,7 +348,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
AppendInternalKey(&not_found_key2, ikey2);
GetContext get_context2(ucmp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(not_found_key2), &value,
nullptr, nullptr, nullptr);
nullptr, nullptr, nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(not_found_key2), &get_context2));
ASSERT_TRUE(value.empty());
ASSERT_OK(reader.status());
@ -362,7 +362,7 @@ TEST_F(CuckooReaderTest, WhenKeyNotFound) {
kNumHashFunc, kNumHashFunc);
GetContext get_context3(ucmp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(unused_key), &value,
nullptr, nullptr, nullptr);
nullptr, nullptr, nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(unused_key), &get_context3));
ASSERT_TRUE(value.empty());
ASSERT_OK(reader.status());
@ -437,7 +437,7 @@ void WriteFile(const std::vector<std::string>& keys,
// Assume only the fast path is triggered
GetContext get_context(nullptr, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), &value, nullptr,
nullptr, nullptr);
nullptr, nullptr, nullptr);
for (uint64_t i = 0; i < num; ++i) {
value.clear();
ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &get_context));
@ -484,7 +484,7 @@ void ReadKeys(uint64_t num, uint32_t batch_size) {
// Assume only the fast path is triggered
GetContext get_context(nullptr, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), &value, nullptr,
nullptr, nullptr);
nullptr, nullptr, nullptr);
uint64_t start_time = env->NowMicros();
if (batch_size > 0) {
for (uint64_t i = 0; i < num; i += batch_size) {

@ -36,7 +36,8 @@ GetContext::GetContext(const Comparator* ucmp,
const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state,
const Slice& user_key, std::string* ret_value,
bool* value_found, MergeContext* merge_context, Env* env,
bool* value_found, MergeContext* merge_context,
RangeDelAggregator* range_del_agg, Env* env,
SequenceNumber* seq,
PinnedIteratorsManager* _pinned_iters_mgr)
: ucmp_(ucmp),
@ -48,6 +49,7 @@ GetContext::GetContext(const Comparator* ucmp,
value_(ret_value),
value_found_(value_found),
merge_context_(merge_context),
range_del_agg_(range_del_agg),
env_(env),
seq_(seq),
replay_log_(nullptr),
@ -93,8 +95,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
}
}
auto type = parsed_key.type;
// Key matches. Process it
switch (parsed_key.type) {
if ((type == kTypeValue || type == kTypeMerge) &&
range_del_agg_ != nullptr && range_del_agg_->ShouldDelete(parsed_key)) {
type = kTypeRangeDeletion;
}
switch (type) {
case kTypeValue:
assert(state_ == kNotFound || state_ == kMerge);
if (kNotFound == state_) {
@ -106,10 +113,10 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
assert(merge_operator_ != nullptr);
state_ = kFound;
if (value_ != nullptr) {
Status merge_status =
MergeHelper::TimedFullMerge(merge_operator_, user_key_, &value,
merge_context_->GetOperands(),
value_, logger_, statistics_, env_);
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator_, user_key_, &value,
merge_context_->GetOperands(), value_, logger_, statistics_,
env_);
if (!merge_status.ok()) {
state_ = kCorrupt;
}
@ -119,6 +126,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
case kTypeDeletion:
case kTypeSingleDeletion:
case kTypeRangeDeletion:
// TODO(noetzli): Verify correctness once merge of single-deletes
// is supported
assert(state_ == kNotFound || state_ == kMerge);

@ -6,6 +6,7 @@
#pragma once
#include <string>
#include "db/merge_context.h"
#include "db/range_del_aggregator.h"
#include "rocksdb/env.h"
#include "rocksdb/types.h"
@ -26,8 +27,8 @@ class GetContext {
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics, GetState init_state,
const Slice& user_key, std::string* ret_value, bool* value_found,
MergeContext* merge_context, Env* env,
SequenceNumber* seq = nullptr,
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
Env* env, SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr);
void MarkKeyMayExist();
@ -68,6 +69,7 @@ class GetContext {
std::string* value_;
bool* value_found_; // Is value set correctly? Used by KeyMayExist
MergeContext* merge_context_;
RangeDelAggregator* range_del_agg_;
Env* env_;
// If a key is found, seq_ will be set to the SequenceNumber of most recent
// write to the key or kMaxSequenceNumber if unknown

@ -168,10 +168,12 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
if (!through_db) {
std::string value;
MergeContext merge_context;
GetContext get_context(
ioptions.user_comparator, ioptions.merge_operator,
ioptions.info_log, ioptions.statistics, GetContext::kNotFound,
Slice(key), &value, nullptr, &merge_context, env);
RangeDelAggregator range_del_agg(ikc, {} /* snapshots */);
GetContext get_context(ioptions.user_comparator,
ioptions.merge_operator, ioptions.info_log,
ioptions.statistics, GetContext::kNotFound,
Slice(key), &value, nullptr, &merge_context,
&range_del_agg, env);
s = table_reader->Get(read_options, key, &get_context);
} else {
s = db->Get(read_options, key, &result);

@ -1799,7 +1799,7 @@ TEST_F(BlockBasedTableTest, BlockCacheDisabledTest) {
{
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), nullptr, nullptr,
nullptr, nullptr);
nullptr, nullptr, nullptr);
// a hack that just to trigger BlockBasedTable::GetFilter.
reader->Get(ReadOptions(), "non-exist-key", &get_context);
BlockCachePropertiesSnapshot props(options.statistics.get());
@ -1965,7 +1965,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) {
std::string value;
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr);
nullptr, nullptr, nullptr);
ASSERT_OK(reader->Get(ReadOptions(), user_key, &get_context));
ASSERT_EQ(value, "hello");
BlockCachePropertiesSnapshot props(options.statistics.get());
@ -2048,7 +2048,7 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) {
std::string value;
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr);
nullptr, nullptr, nullptr);
perf_context.Reset();
ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context));
if (index_and_filter_in_cache) {
@ -2068,7 +2068,7 @@ TEST_F(BlockBasedTableTest, BlockReadCountTest) {
get_context = GetContext(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, user_key, &value, nullptr,
nullptr, nullptr);
nullptr, nullptr, nullptr);
perf_context.Reset();
ASSERT_OK(reader->Get(ReadOptions(), encoded_key, &get_context));
ASSERT_EQ(get_context.State(), GetContext::kNotFound);

@ -768,7 +768,8 @@ ReadOptions::ReadOptions()
prefix_same_as_start(false),
pin_data(false),
background_purge_on_iterator_cleanup(false),
readahead_size(0) {
readahead_size(0),
ignore_range_deletions(false) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}
@ -785,7 +786,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
prefix_same_as_start(false),
pin_data(false),
background_purge_on_iterator_cleanup(false),
readahead_size(0) {
readahead_size(0),
ignore_range_deletions(false) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}

Loading…
Cancel
Save