Fix MultiGet unable to query timestamp data issue (#7589)

Summary:
The filter query key should not contain timestamp. The timestamp is
stripped for Get(), but not MultiGet().

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7589

Reviewed By: riversand963

Differential Revision: D24494661

Pulled By: jay-zhuang

fbshipit-source-id: fc5ff40f9d683a89a760c6ff0ab3aed05a70c317
main
Jay Zhuang 4 years ago committed by Facebook GitHub Bot
parent c992eb118b
commit 881e0dcc09
  1. 1
      HISTORY.md
  2. 222
      db/db_with_timestamp_basic_test.cc
  3. 13
      db/memtable.cc
  4. 12
      db/table_cache.cc
  5. 29
      db/version_set.cc
  6. 15
      table/block_based/block_based_table_reader.cc
  7. 16
      table/block_based/filter_block.h
  8. 6
      table/block_based/full_filter_block.cc
  9. 10
      table/multiget_context.h

@ -10,6 +10,7 @@
* Perform post-flush updates of in-memory data structures of memtable list in a callback that will be called by the thread writing to MANIFEST file before signaling other threads and releasing db mutex.
* Reverted a behavior change silently introduced in 6.14.2, in which the effects of the `ignore_unknown_options` flag (used in option parsing/loading functions) changed.
* Reverted a behavior change silently introduced in 6.14, in which options parsing/loading functions began returning `NotFound` instead of `InvalidArgument` for option names not available in the present version.
* Fixed MultiGet bugs it doesn't return valid data with user defined timestamp.
### Public API Change
* Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options.

@ -492,6 +492,228 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
Close();
}
TEST_F(DBBasicTestWithTimestamp, MultiGetWithFastLocalBloom) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.cache_index_and_filter_blocks = true;
bbto.whole_key_filtering = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
// Write any value
WriteOptions write_opts;
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, "foo", "bar"));
Flush();
// Read with MultiGet
ReadOptions read_opts;
read_opts.timestamp = &ts;
size_t batch_size = 1;
std::vector<Slice> keys(batch_size);
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
keys[0] = "foo";
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
statuses.data());
ASSERT_OK(statuses[0]);
Close();
}
TEST_F(DBBasicTestWithTimestamp, MultiGetWithPrefix) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.prefix_extractor.reset(NewCappedPrefixTransform(5));
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.cache_index_and_filter_blocks = true;
bbto.whole_key_filtering = false;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
// Write any value
WriteOptions write_opts;
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, "foo", "bar"));
Flush();
// Read with MultiGet
ReadOptions read_opts;
read_opts.timestamp = &ts;
size_t batch_size = 1;
std::vector<Slice> keys(batch_size);
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
keys[0] = "foo";
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
statuses.data());
ASSERT_OK(statuses[0]);
Close();
}
TEST_F(DBBasicTestWithTimestamp, MultiGetWithMemBloomFilter) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.prefix_extractor.reset(NewCappedPrefixTransform(5));
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.cache_index_and_filter_blocks = true;
bbto.whole_key_filtering = false;
options.memtable_prefix_bloom_size_ratio = 0.1;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
// Write any value
WriteOptions write_opts;
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, "foo", "bar"));
// Read with MultiGet
ts_str = Timestamp(2, 0);
ts = ts_str;
ReadOptions read_opts;
read_opts.timestamp = &ts;
size_t batch_size = 1;
std::vector<Slice> keys(batch_size);
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
keys[0] = "foo";
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
statuses.data());
ASSERT_OK(statuses[0]);
Close();
}
TEST_F(DBBasicTestWithTimestamp, MultiGetRangeFiltering) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.cache_index_and_filter_blocks = true;
bbto.whole_key_filtering = false;
options.memtable_prefix_bloom_size_ratio = 0.1;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
// Write any value
WriteOptions write_opts;
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
// random data
for (int i = 0; i < 3; i++) {
auto key = ToString(i * 10);
auto value = ToString(i * 10);
Slice key_slice = key;
Slice value_slice = value;
ASSERT_OK(db_->Put(write_opts, key_slice, value_slice));
Flush();
}
// Make num_levels to 2 to do key range filtering of sst files
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(db_->Put(write_opts, "foo", "bar"));
Flush();
// Read with MultiGet
ts_str = Timestamp(2, 0);
ts = ts_str;
ReadOptions read_opts;
read_opts.timestamp = &ts;
size_t batch_size = 1;
std::vector<Slice> keys(batch_size);
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
keys[0] = "foo";
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
db_->MultiGet(read_opts, cfh, batch_size, keys.data(), values.data(),
statuses.data());
ASSERT_OK(statuses[0]);
Close();
}
TEST_F(DBBasicTestWithTimestamp, MultiGetPrefixFilter) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.prefix_extractor.reset(NewCappedPrefixTransform(5));
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.cache_index_and_filter_blocks = true;
bbto.whole_key_filtering = false;
options.memtable_prefix_bloom_size_ratio = 0.1;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
WriteOptions write_opts;
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, "foo", "bar"));
Flush();
// Read with MultiGet
ts_str = Timestamp(2, 0);
ts = ts_str;
ReadOptions read_opts;
read_opts.timestamp = &ts;
size_t batch_size = 1;
std::vector<Slice> keys(batch_size);
std::vector<std::string> values(batch_size);
std::vector<std::string> timestamps(batch_size);
keys[0] = "foo";
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
std::vector<ColumnFamilyHandle*> cfhs(keys.size(), cfh);
std::vector<Status> statuses =
db_->MultiGet(read_opts, cfhs, keys, &values, &timestamps);
ASSERT_OK(statuses[0]);
Close();
}
TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
Options options = CurrentOptions();
options.env = env_;

@ -541,7 +541,8 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
if (bloom_filter_ && prefix_extractor_ &&
prefix_extractor_->InDomain(key)) {
bloom_filter_->Add(prefix_extractor_->Transform(key));
bloom_filter_->Add(
prefix_extractor_->Transform(StripTimestampFromUserKey(key, ts_sz)));
}
if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
bloom_filter_->Add(StripTimestampFromUserKey(key, ts_sz));
@ -908,16 +909,18 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
int num_keys = 0;
for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
if (!prefix_extractor_) {
keys[num_keys++] = &iter->ukey;
} else if (prefix_extractor_->InDomain(iter->ukey)) {
prefixes.emplace_back(prefix_extractor_->Transform(iter->ukey));
keys[num_keys++] = &iter->ukey_without_ts;
} else if (prefix_extractor_->InDomain(iter->ukey_without_ts)) {
prefixes.emplace_back(
prefix_extractor_->Transform(iter->ukey_without_ts));
keys[num_keys++] = &prefixes.back();
}
}
bloom_filter_->MayContain(num_keys, &keys[0], &may_match[0]);
int idx = 0;
for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
if (prefix_extractor_ && !prefix_extractor_->InDomain(iter->ukey)) {
if (prefix_extractor_ &&
!prefix_extractor_->InDomain(iter->ukey_without_ts)) {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
continue;
}

@ -502,8 +502,8 @@ Status TableCache::MultiGet(const ReadOptions& options,
for (auto miter = table_range.begin(); miter != table_range.end();
++miter) {
const Slice& user_key = miter->ukey;
;
const Slice& user_key = miter->ukey_with_ts;
GetContext* get_context = miter->get_context;
if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
@ -539,9 +539,9 @@ Status TableCache::MultiGet(const ReadOptions& options,
++iter) {
SequenceNumber* max_covering_tombstone_seq =
iter->get_context->max_covering_tombstone_seq();
*max_covering_tombstone_seq =
std::max(*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey));
*max_covering_tombstone_seq = std::max(
*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts));
}
}
}
@ -566,7 +566,7 @@ Status TableCache::MultiGet(const ReadOptions& options,
for (auto miter = table_range.begin(); miter != table_range.end();
++miter) {
std::string& row_cache_entry = row_cache_entries[row_idx++];
const Slice& user_key = miter->ukey;
const Slice& user_key = miter->ukey_with_ts;
;
GetContext* get_context = miter->get_context;

@ -440,7 +440,7 @@ class FilePickerMultiGet {
!file_hit)) {
struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
Slice& user_key = batch_iter_->ukey;
Slice& user_key = batch_iter_->ukey_without_ts;
// Do key range filtering of files or/and fractional cascading if:
// (1) not all the files are in level 0, or
@ -454,17 +454,17 @@ class FilePickerMultiGet {
// Check if key is within a file's range. If search left bound and
// right bound point to the same find, we are sure key falls in
// range.
int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
user_key, false, ExtractUserKey(f->smallest_key), true);
assert(curr_level_ == 0 ||
fp_ctx.curr_index_in_curr_level ==
fp_ctx.start_index_in_curr_level ||
user_comparator_->Compare(user_key,
ExtractUserKey(f->smallest_key)) <= 0);
cmp_smallest <= 0);
int cmp_smallest = user_comparator_->Compare(
user_key, ExtractUserKey(f->smallest_key));
if (cmp_smallest >= 0) {
cmp_largest = user_comparator_->Compare(
user_key, ExtractUserKey(f->largest_key));
cmp_largest = user_comparator_->CompareWithoutTimestamp(
user_key, false, ExtractUserKey(f->largest_key), true);
} else {
cmp_largest = -1;
}
@ -497,8 +497,9 @@ class FilePickerMultiGet {
upper_key_ = batch_iter_;
++upper_key_;
while (upper_key_ != current_level_range_.end() &&
user_comparator_->Compare(batch_iter_->ukey, upper_key_->ukey) ==
0) {
user_comparator_->CompareWithoutTimestamp(
batch_iter_->ukey_without_ts, false,
upper_key_->ukey_without_ts, false) == 0) {
++upper_key_;
}
break;
@ -2017,11 +2018,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
assert(iter->s->ok() || iter->s->IsMergeInProgress());
get_ctx.emplace_back(
user_comparator(), merge_operator_, info_log_, db_statistics_,
iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
iter->value, iter->timestamp, nullptr, &(iter->merge_context), true,
&iter->max_covering_tombstone_seq, this->env_, nullptr,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
tracing_mget_id);
iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
&(iter->merge_context), true, &iter->max_covering_tombstone_seq,
this->env_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr,
callback, is_blob, tracing_mget_id);
// MergeInProgress status, if set, has been transferred to the get_context
// state, so we set status to ok here. From now on, the iter status will
// be used for IO errors, and get_context state will be used for any

@ -2208,21 +2208,20 @@ bool BlockBasedTable::FullFilterKeyMayMatch(
Slice user_key = ExtractUserKey(internal_key);
const Slice* const const_ikey_ptr = &internal_key;
bool may_match = true;
size_t ts_sz = rep_->internal_comparator.user_comparator()->timestamp_size();
Slice user_key_without_ts = StripTimestampFromUserKey(user_key, ts_sz);
if (rep_->whole_key_filtering) {
size_t ts_sz =
rep_->internal_comparator.user_comparator()->timestamp_size();
Slice user_key_without_ts = StripTimestampFromUserKey(user_key, ts_sz);
may_match =
filter->KeyMayMatch(user_key_without_ts, prefix_extractor, kNotValid,
no_io, const_ikey_ptr, get_context, lookup_context);
} else if (!read_options.total_order_seek && prefix_extractor &&
rep_->table_properties->prefix_extractor_name.compare(
prefix_extractor->Name()) == 0 &&
prefix_extractor->InDomain(user_key) &&
!filter->PrefixMayMatch(prefix_extractor->Transform(user_key),
prefix_extractor, kNotValid, no_io,
const_ikey_ptr, get_context,
lookup_context)) {
prefix_extractor->InDomain(user_key_without_ts) &&
!filter->PrefixMayMatch(
prefix_extractor->Transform(user_key_without_ts),
prefix_extractor, kNotValid, no_io, const_ikey_ptr,
get_context, lookup_context)) {
may_match = false;
}
if (may_match) {

@ -108,11 +108,11 @@ class FilterBlockReader {
uint64_t block_offset, const bool no_io,
BlockCacheLookupContext* lookup_context) {
for (auto iter = range->begin(); iter != range->end(); ++iter) {
const Slice ukey = iter->ukey;
const Slice ukey_without_ts = iter->ukey_without_ts;
const Slice ikey = iter->ikey;
GetContext* const get_context = iter->get_context;
if (!KeyMayMatch(ukey, prefix_extractor, block_offset, no_io, &ikey,
get_context, lookup_context)) {
if (!KeyMayMatch(ukey_without_ts, prefix_extractor, block_offset, no_io,
&ikey, get_context, lookup_context)) {
range->SkipKey(iter);
}
}
@ -133,13 +133,13 @@ class FilterBlockReader {
uint64_t block_offset, const bool no_io,
BlockCacheLookupContext* lookup_context) {
for (auto iter = range->begin(); iter != range->end(); ++iter) {
const Slice ukey = iter->ukey;
const Slice ukey_without_ts = iter->ukey_without_ts;
const Slice ikey = iter->ikey;
GetContext* const get_context = iter->get_context;
if (prefix_extractor->InDomain(ukey) &&
!PrefixMayMatch(prefix_extractor->Transform(ukey), prefix_extractor,
block_offset, no_io, &ikey, get_context,
lookup_context)) {
if (prefix_extractor->InDomain(ukey_without_ts) &&
!PrefixMayMatch(prefix_extractor->Transform(ukey_without_ts),
prefix_extractor, block_offset, no_io, &ikey,
get_context, lookup_context)) {
range->SkipKey(iter);
}
}

@ -245,9 +245,9 @@ void FullFilterBlockReader::MayMatch(
MultiGetRange filter_range(*range, range->begin(), range->end());
for (auto iter = filter_range.begin(); iter != filter_range.end(); ++iter) {
if (!prefix_extractor) {
keys[num_keys++] = &iter->ukey;
} else if (prefix_extractor->InDomain(iter->ukey)) {
prefixes.emplace_back(prefix_extractor->Transform(iter->ukey));
keys[num_keys++] = &iter->ukey_without_ts;
} else if (prefix_extractor->InDomain(iter->ukey_without_ts)) {
prefixes.emplace_back(prefix_extractor->Transform(iter->ukey_without_ts));
keys[num_keys++] = &prefixes.back();
} else {
filter_range.SkipKey(iter);

@ -7,6 +7,8 @@
#include <algorithm>
#include <array>
#include <string>
#include "db/dbformat.h"
#include "db/lookup_key.h"
#include "db/merge_context.h"
#include "rocksdb/env.h"
@ -21,7 +23,8 @@ class GetContext;
struct KeyContext {
const Slice* key;
LookupKey* lkey;
Slice ukey;
Slice ukey_with_ts;
Slice ukey_without_ts;
Slice ikey;
ColumnFamilyHandle* column_family;
Status* s;
@ -110,7 +113,10 @@ class MultiGetContext {
sorted_keys_[iter] = (*sorted_keys)[begin + iter];
sorted_keys_[iter]->lkey = new (&lookup_key_ptr_[iter])
LookupKey(*sorted_keys_[iter]->key, snapshot, read_opts.timestamp);
sorted_keys_[iter]->ukey = sorted_keys_[iter]->lkey->user_key();
sorted_keys_[iter]->ukey_with_ts = sorted_keys_[iter]->lkey->user_key();
sorted_keys_[iter]->ukey_without_ts = StripTimestampFromUserKey(
sorted_keys_[iter]->lkey->user_key(),
read_opts.timestamp == nullptr ? 0 : read_opts.timestamp->size());
sorted_keys_[iter]->ikey = sorted_keys_[iter]->lkey->internal_key();
}
}

Loading…
Cancel
Save