Fix range deletion handling in async MultiGet (#10534)

Summary:
The fix in https://github.com/facebook/rocksdb/issues/10513 was incomplete with respect to range deletion handling. It did not handle the case where a file containing a range tombstone that covers one key in the batch also overlaps another key in the batch. In that case, `mget_range` would be non-empty, but it would contain only the second key (the first having been removed by filtering), so the first key would be skipped when iterating through the range tombstones in `TableCache::MultiGet`.

Test plan -
1. Add a unit test
2. Run stress tests

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10534

Reviewed By: akankshamahajan15

Differential Revision: D38773880

Pulled By: anand1976

fbshipit-source-id: dae491dbe52e18bbce5179b77b63f20771a66c00
main
anand76 2 years ago committed by Facebook GitHub Bot
parent 275cd80cdb
commit 65814a4ae6
  1. 30
      db/db_basic_test.cc
  2. 10
      db/table_cache.cc
  3. 3
      db/table_cache.h
  4. 6
      db/table_cache_sync_and_async.h
  5. 11
      db/version_set.cc
  6. 3
      db/version_set.h
  7. 4
      db/version_set_sync_and_async.h

@ -2407,6 +2407,36 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
// Bloom filters in L0/L1 will avoid the coroutine calls in those levels // Bloom filters in L0/L1 will avoid the coroutine calls in those levels
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2); ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
} }
// Issues one async MultiGet for a batch of three keys: two that are expected
// to come back NotFound and one that is expected to be returned with its L1
// value, then checks the coroutine ticker.
TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) {
  // 139 and 163 are in L2, but overlap with range deletes in L1
  const std::vector<std::string> lookup_strs{Key(139), Key(144), Key(163)};

  // The Slice views are built only after the backing strings are final, so
  // they keep pointing at valid storage for the duration of the test.
  std::vector<Slice> lookup_keys(lookup_strs.begin(), lookup_strs.end());
  const auto num_keys = lookup_keys.size();
  std::vector<PinnableSlice> results(num_keys);
  std::vector<Status> result_statuses(num_keys);

  ReadOptions read_opts;
  read_opts.async_io = true;
  dbfull()->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), num_keys,
                     lookup_keys.data(), results.data(),
                     result_statuses.data());

  ASSERT_EQ(results.size(), lookup_keys.size());
  ASSERT_EQ(result_statuses[0], Status::NotFound());
  ASSERT_EQ(result_statuses[1], Status::OK());
  ASSERT_EQ(results[1], "val_l1_" + std::to_string(144));
  ASSERT_EQ(result_statuses[2], Status::NotFound());

  // Bloom filters in L0/L1 will avoid the coroutine calls in those levels
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
}
#endif // USE_COROUTINES #endif // USE_COROUTINES
TEST_F(DBBasicTest, MultiGetStats) { TEST_F(DBBasicTest, MultiGetStats) {

@ -557,18 +557,16 @@ Status TableCache::MultiGetFilter(
if (s.ok()) { if (s.ok()) {
s = t->MultiGetFilter(options, prefix_extractor.get(), mget_range); s = t->MultiGetFilter(options, prefix_extractor.get(), mget_range);
} }
if (mget_range->empty()) {
if (s.ok() && !options.ignore_range_deletions) { if (s.ok() && !options.ignore_range_deletions) {
// If all the keys have been filtered out by the bloom filter, then // Update the range tombstone sequence numbers for the keys here
// update the range tombstone sequence numbers for the keys as // as TableCache::MultiGet may or may not be called, and even if it
// MultiGet() will not be called for this set of keys. // is, it may be called with fewer keys in the range due to filtering.
UpdateRangeTombstoneSeqnums(options, t, tombstone_range); UpdateRangeTombstoneSeqnums(options, t, tombstone_range);
} }
if (handle) { if (mget_range->empty() && handle) {
ReleaseHandle(handle); ReleaseHandle(handle);
*table_handle = nullptr; *table_handle = nullptr;
} }
}
return s; return s;
} }

@ -135,7 +135,8 @@ class TableCache {
const FileMetaData& file_meta, const MultiGetContext::Range* mget_range, const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr, const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
HistogramImpl* file_read_hist = nullptr, bool skip_filters = false, HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
int level = -1, Cache::Handle* table_handle = nullptr); bool skip_range_deletions = false, int level = -1,
Cache::Handle* table_handle = nullptr);
// Evict any entry for the specified file number // Evict any entry for the specified file number
static void Evict(Cache* cache, uint64_t file_number); static void Evict(Cache* cache, uint64_t file_number);

@ -18,8 +18,8 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
(const ReadOptions& options, const InternalKeyComparator& internal_comparator, (const ReadOptions& options, const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const MultiGetContext::Range* mget_range, const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
const std::shared_ptr<const SliceTransform>& prefix_extractor, const std::shared_ptr<const SliceTransform>& prefix_extractor,
HistogramImpl* file_read_hist, bool skip_filters, int level, HistogramImpl* file_read_hist, bool skip_filters, bool skip_range_deletions,
Cache::Handle* table_handle) { int level, Cache::Handle* table_handle) {
auto& fd = file_meta.fd; auto& fd = file_meta.fd;
Status s; Status s;
TableReader* t = fd.table_reader; TableReader* t = fd.table_reader;
@ -79,7 +79,7 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
assert(t); assert(t);
} }
} }
if (s.ok() && !options.ignore_range_deletions) { if (s.ok() && !options.ignore_range_deletions && !skip_range_deletions) {
UpdateRangeTombstoneSeqnums(options, t, table_range); UpdateRangeTombstoneSeqnums(options, t, table_range);
} }
if (s.ok()) { if (s.ok()) {

@ -2219,7 +2219,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
static_cast<int>(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel()); static_cast<int>(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel());
// Call MultiGetFromSST for looking up a single file // Call MultiGetFromSST for looking up a single file
s = MultiGetFromSST(read_options, fp.CurrentFileRange(), s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
fp.GetHitFileLevel(), skip_filters, f, blob_ctxs, fp.GetHitFileLevel(), skip_filters,
/*skip_range_deletions=*/false, f, blob_ctxs,
/*table_handle=*/nullptr, num_filter_read, /*table_handle=*/nullptr, num_filter_read,
num_index_read, num_sst_read); num_index_read, num_sst_read);
if (fp.GetHitFileLevel() == 0) { if (fp.GetHitFileLevel() == 0) {
@ -2237,12 +2238,14 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
Cache::Handle* table_handle = nullptr; Cache::Handle* table_handle = nullptr;
bool skip_filters = IsFilterSkipped( bool skip_filters = IsFilterSkipped(
static_cast<int>(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel()); static_cast<int>(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel());
bool skip_range_deletions = false;
if (!skip_filters) { if (!skip_filters) {
Status status = table_cache_->MultiGetFilter( Status status = table_cache_->MultiGetFilter(
read_options, *internal_comparator(), *f->file_metadata, read_options, *internal_comparator(), *f->file_metadata,
mutable_cf_options_.prefix_extractor, mutable_cf_options_.prefix_extractor,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
fp.GetHitFileLevel(), &file_range, &table_handle); fp.GetHitFileLevel(), &file_range, &table_handle);
skip_range_deletions = true;
if (status.ok()) { if (status.ok()) {
skip_filters = true; skip_filters = true;
} else if (!status.IsNotSupported()) { } else if (!status.IsNotSupported()) {
@ -2256,9 +2259,9 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
if (!file_range.empty()) { if (!file_range.empty()) {
mget_tasks.emplace_back(MultiGetFromSSTCoroutine( mget_tasks.emplace_back(MultiGetFromSSTCoroutine(
read_options, file_range, fp.GetHitFileLevel(), skip_filters, f, read_options, file_range, fp.GetHitFileLevel(), skip_filters,
blob_ctxs, table_handle, num_filter_read, num_index_read, skip_range_deletions, f, blob_ctxs, table_handle, num_filter_read,
num_sst_read)); num_index_read, num_sst_read));
} }
if (fp.KeyMaySpanNextFile()) { if (fp.KeyMaySpanNextFile()) {
break; break;

@ -990,7 +990,8 @@ class Version {
DECLARE_SYNC_AND_ASYNC( DECLARE_SYNC_AND_ASYNC(
/* ret_type */ Status, /* func_name */ MultiGetFromSST, /* ret_type */ Status, /* func_name */ MultiGetFromSST,
const ReadOptions& read_options, MultiGetRange file_range, const ReadOptions& read_options, MultiGetRange file_range,
int hit_file_level, bool skip_filters, FdWithKeyRange* f, int hit_file_level, bool skip_filters, bool skip_range_deletions,
FdWithKeyRange* f,
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs, std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
Cache::Handle* table_handle, uint64_t& num_filter_read, Cache::Handle* table_handle, uint64_t& num_filter_read,
uint64_t& num_index_read, uint64_t& num_sst_read); uint64_t& num_index_read, uint64_t& num_sst_read);

@ -14,7 +14,7 @@ namespace ROCKSDB_NAMESPACE {
// Lookup a batch of keys in a single SST file // Lookup a batch of keys in a single SST file
DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST) DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
(const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level, (const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level,
bool skip_filters, FdWithKeyRange* f, bool skip_filters, bool skip_range_deletions, FdWithKeyRange* f,
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs, std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
Cache::Handle* table_handle, uint64_t& num_filter_read, Cache::Handle* table_handle, uint64_t& num_filter_read,
uint64_t& num_index_read, uint64_t& num_sst_read) { uint64_t& num_index_read, uint64_t& num_sst_read) {
@ -27,7 +27,7 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
read_options, *internal_comparator(), *f->file_metadata, &file_range, read_options, *internal_comparator(), *f->file_metadata, &file_range,
mutable_cf_options_.prefix_extractor, mutable_cf_options_.prefix_extractor,
cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters, cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters,
hit_file_level, table_handle); skip_range_deletions, hit_file_level, table_handle);
// TODO: examine the behavior for corrupted key // TODO: examine the behavior for corrupted key
if (timer_enabled) { if (timer_enabled) {
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(), PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),

Loading…
Cancel
Save