Allow MultiGet users to limit cumulative value size (#6826)

Summary:
1. Add a value_size in read options which limits the cumulative value size of keys read in batches. Once the size exceeds read_options.value_size, all the remaining keys are returned with status Abort without further fetching any key.
2. Add a unit test case MultiGetBatchedValueSizeSimple the reads keys from memory and sst files.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6826

Test Plan:
1. make check -j64
	   2. Add a new unit test case

Reviewed By: anand1976

Differential Revision: D21471483

Pulled By: akankshamahajan15

fbshipit-source-id: dea51b8e76d5d1df38ece8cdb29933b1d798b900
main
Akanksha Mahajan 4 years ago committed by Facebook GitHub Bot
parent 9060e6fa79
commit bcefc59e9f
  1. 1
      HISTORY.md
  2. 215
      db/db_basic_test.cc
  3. 25
      db/db_impl/db_impl.cc
  4. 10
      db/memtable.cc
  5. 26
      db/version_set.cc
  6. 7
      include/rocksdb/options.h
  7. 6
      options/options.cc
  8. 6
      table/multiget_context.h

@ -19,6 +19,7 @@
### New Feature
* sst_dump to add a new --readahead_size argument. Users can specify read size when scanning the data. Sst_dump also tries to prefetch tail part of the SST files so usually some number of I/Os are saved there too.
* Generate file checksum in SstFileWriter if Options.file_checksum_gen_factory is set. The checksum and checksum function name are stored in ExternalSstFileInfo after the sst file write is finished.
* Add a value_size_soft_limit in read options which limits the cumulative value size of keys read in batches in MultiGet. Once the cumulative value size of found keys exceeds read_options.value_size_soft_limit, all the remaining keys are returned with status Abort without further finding their values. By default the value_size_soft_limit is std::numeric_limits<uint64_t>::max().
## 6.10 (5/2/2020)
### Bug Fixes

@ -1501,6 +1501,221 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
}
}
TEST_F(DBBasicTest, MultiGetBatchedValueSizeInMemory) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
SetPerfLevel(kEnableCount);
ASSERT_OK(Put(1, "k1", "v_1"));
ASSERT_OK(Put(1, "k2", "v_2"));
ASSERT_OK(Put(1, "k3", "v_3"));
ASSERT_OK(Put(1, "k4", "v_4"));
ASSERT_OK(Put(1, "k5", "v_5"));
ASSERT_OK(Put(1, "k6", "v_6"));
std::vector<Slice> keys = {"k1", "k2", "k3", "k4", "k5", "k6"};
std::vector<PinnableSlice> values(keys.size());
std::vector<Status> s(keys.size());
std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
get_perf_context()->Reset();
ReadOptions ro;
ro.value_size_soft_limit = 11;
db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
s.data(), false);
ASSERT_EQ(values.size(), keys.size());
for (unsigned int i = 0; i < 4; i++) {
ASSERT_EQ(std::string(values[i].data(), values[i].size()),
"v_" + std::to_string(i + 1));
}
for (unsigned int i = 4; i < 6; i++) {
ASSERT_TRUE(s[i].IsAborted());
}
ASSERT_EQ(12, (int)get_perf_context()->multiget_read_bytes);
SetPerfLevel(kDisable);
}
TEST_F(DBBasicTest, MultiGetBatchedValueSize) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
SetPerfLevel(kEnableCount);
ASSERT_OK(Put(1, "k6", "v6"));
ASSERT_OK(Put(1, "k7", "v7_"));
ASSERT_OK(Put(1, "k3", "v3_"));
ASSERT_OK(Put(1, "k4", "v4"));
Flush(1);
ASSERT_OK(Delete(1, "k4"));
ASSERT_OK(Put(1, "k11", "v11"));
ASSERT_OK(Delete(1, "no_key"));
ASSERT_OK(Put(1, "k8", "v8_"));
ASSERT_OK(Put(1, "k13", "v13"));
ASSERT_OK(Put(1, "k14", "v14"));
ASSERT_OK(Put(1, "k15", "v15"));
ASSERT_OK(Put(1, "k16", "v16"));
ASSERT_OK(Put(1, "k17", "v17"));
Flush(1);
ASSERT_OK(Put(1, "k1", "v1_"));
ASSERT_OK(Put(1, "k2", "v2_"));
ASSERT_OK(Put(1, "k5", "v5_"));
ASSERT_OK(Put(1, "k9", "v9_"));
ASSERT_OK(Put(1, "k10", "v10"));
ASSERT_OK(Delete(1, "k2"));
ASSERT_OK(Delete(1, "k6"));
get_perf_context()->Reset();
std::vector<Slice> keys({"k1", "k10", "k11", "k12", "k13", "k14", "k15",
"k16", "k17", "k2", "k3", "k4", "k5", "k6", "k7",
"k8", "k9", "no_key"});
std::vector<PinnableSlice> values(keys.size());
std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
std::vector<Status> s(keys.size());
ReadOptions ro;
ro.value_size_soft_limit = 20;
db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
s.data(), false);
ASSERT_EQ(values.size(), keys.size());
// In memory keys
ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1_");
ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v10");
ASSERT_TRUE(s[9].IsNotFound()); // k2
ASSERT_EQ(std::string(values[12].data(), values[12].size()), "v5_");
ASSERT_TRUE(s[13].IsNotFound()); // k6
ASSERT_EQ(std::string(values[16].data(), values[16].size()), "v9_");
// In sst files
ASSERT_EQ(std::string(values[2].data(), values[1].size()), "v11");
ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v13");
ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v14");
// Remaining aborted after value_size exceeds.
ASSERT_TRUE(s[3].IsAborted());
ASSERT_TRUE(s[6].IsAborted());
ASSERT_TRUE(s[7].IsAborted());
ASSERT_TRUE(s[8].IsAborted());
ASSERT_TRUE(s[10].IsAborted());
ASSERT_TRUE(s[11].IsAborted());
ASSERT_TRUE(s[14].IsAborted());
ASSERT_TRUE(s[15].IsAborted());
ASSERT_TRUE(s[17].IsAborted());
// 6 kv pairs * 3 bytes per value (i.e. 18)
ASSERT_EQ(21, (int)get_perf_context()->multiget_read_bytes);
SetPerfLevel(kDisable);
} while (ChangeCompactOptions());
}
TEST_F(DBBasicTest, MultiGetBatchedValueSizeMultiLevelMerge) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
int num_keys = 0;
for (int i = 0; i < 64; ++i) {
ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
num_keys++;
if (num_keys == 8) {
Flush();
num_keys = 0;
}
}
if (num_keys > 0) {
Flush();
num_keys = 0;
}
MoveFilesToLevel(2);
for (int i = 0; i < 64; i += 3) {
ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
num_keys++;
if (num_keys == 8) {
Flush();
num_keys = 0;
}
}
if (num_keys > 0) {
Flush();
num_keys = 0;
}
MoveFilesToLevel(1);
for (int i = 0; i < 64; i += 5) {
ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
num_keys++;
if (num_keys == 8) {
Flush();
num_keys = 0;
}
}
if (num_keys > 0) {
Flush();
num_keys = 0;
}
ASSERT_EQ(0, num_keys);
for (int i = 0; i < 64; i += 9) {
ASSERT_OK(
Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
}
std::vector<std::string> keys_str;
for (int i = 10; i < 50; ++i) {
keys_str.push_back("key_" + std::to_string(i));
}
std::vector<Slice> keys(keys_str.size());
for (int i = 0; i < 40; i++) {
keys[i] = Slice(keys_str[i]);
}
std::vector<PinnableSlice> values(keys_str.size());
std::vector<Status> statuses(keys_str.size());
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.value_size_soft_limit = 380;
db_->MultiGet(read_options, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data());
ASSERT_EQ(values.size(), keys.size());
uint64_t curr_value_size = 0;
for (unsigned int j = 0; j < 26; ++j) {
int key = j + 10;
std::string value;
value.append("val_l2_" + std::to_string(key));
if (key % 3 == 0) {
value.append(",");
value.append("val_l1_" + std::to_string(key));
}
if (key % 5 == 0) {
value.append(",");
value.append("val_l0_" + std::to_string(key));
}
if (key % 9 == 0) {
value.append(",");
value.append("val_mem_" + std::to_string(key));
}
curr_value_size += value.size();
ASSERT_EQ(values[j], value);
ASSERT_OK(statuses[j]);
}
// ASSERT_TRUE(curr_value_size <= read_options.value_size_hard_limit);
// All remaning keys status is set Status::Abort
for (unsigned int j = 26; j < 40; j++) {
ASSERT_TRUE(statuses[j].IsAborted());
}
}
// Test class for batched MultiGet with prefix extractor
// Param bool - If true, use partitioned filters
// If false, use full filter block

@ -1787,6 +1787,7 @@ std::vector<Status> DBImpl::MultiGet(
// merge_operands will contain the sequence of merges in the latter case.
size_t num_found = 0;
size_t keys_read;
uint64_t curr_value_size = 0;
for (keys_read = 0; keys_read < num_keys; ++keys_read) {
merge_context.Clear();
Status& s = stat_list[keys_read];
@ -1830,6 +1831,13 @@ std::vector<Status> DBImpl::MultiGet(
if (s.ok()) {
bytes_read += value->size();
num_found++;
curr_value_size += value->size();
if (curr_value_size > read_options.value_size_soft_limit) {
while (++keys_read < num_keys) {
stat_list[keys_read] = Status::Aborted();
}
break;
}
}
if (read_options.deadline.count() &&
@ -2084,11 +2092,11 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
}
}
if (!s.ok()) {
assert(s.IsTimedOut());
assert(s.IsTimedOut() || s.IsAborted());
for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) {
for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys;
++i) {
*sorted_keys[i]->s = Status::TimedOut();
*sorted_keys[i]->s = s;
}
}
}
@ -2243,7 +2251,7 @@ void DBImpl::MultiGetWithCallback(
Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
multiget_cf_data[0].super_version, consistent_seqnum,
nullptr, nullptr);
assert(s.ok() || s.IsTimedOut());
assert(s.ok() || s.IsTimedOut() || s.IsAborted());
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
multiget_cf_data[0].super_version);
}
@ -2271,6 +2279,7 @@ Status DBImpl::MultiGetImpl(
// merge_operands will contain the sequence of merges in the latter case.
size_t keys_left = num_keys;
Status s;
uint64_t curr_value_size = 0;
while (keys_left) {
if (read_options.deadline.count() &&
env_->NowMicros() >
@ -2285,6 +2294,7 @@ Status DBImpl::MultiGetImpl(
MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
batch_size, snapshot, read_options);
MultiGetRange range = ctx.GetMultiGetRange();
range.AddValueSize(curr_value_size);
bool lookup_current = false;
keys_left -= batch_size;
@ -2315,6 +2325,11 @@ Status DBImpl::MultiGetImpl(
super_version->current->MultiGet(read_options, &range, callback,
is_blob_index);
}
curr_value_size = range.GetValueSize();
if (curr_value_size > read_options.value_size_soft_limit) {
s = Status::Aborted();
break;
}
}
// Post processing (decrement reference counts and record statistics)
@ -2329,11 +2344,11 @@ Status DBImpl::MultiGetImpl(
}
}
if (keys_left) {
assert(s.IsTimedOut());
assert(s.IsTimedOut() || s.IsAborted());
for (size_t i = start_key + num_keys - keys_left; i < start_key + num_keys;
++i) {
KeyContext* key = (*sorted_keys)[i];
*key->s = Status::TimedOut();
*key->s = s;
}
}

@ -929,8 +929,18 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
if (found_final_value) {
iter->value->PinSelf();
range->AddValueSize(iter->value->size());
range->MarkKeyDone(iter);
RecordTick(moptions_.statistics, MEMTABLE_HIT);
if (range->GetValueSize() > read_options.value_size_soft_limit) {
// Set all remaining keys in range to Abort
for (auto range_iter = range->begin(); range_iter != range->end();
++range_iter) {
range->MarkKeyDone(range_iter);
*(range_iter->s) = Status::Aborted();
}
break;
}
}
}
PERF_COUNTER_ADD(get_from_memtable_count, 1);

@ -1932,6 +1932,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
&storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
&storage_info_.file_indexer_, user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
Status s;
while (f != nullptr) {
MultiGetRange file_range = fp.CurrentFileRange();
@ -1939,7 +1940,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
get_perf_context()->per_level_perf_context_enabled;
StopWatchNano timer(env_, timer_enabled /* auto_start */);
Status s = table_cache_->MultiGet(
s = table_cache_->MultiGet(
read_options, *internal_comparator(), *f->file_metadata, &file_range,
mutable_cf_options_.prefix_extractor.get(),
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
@ -1960,7 +1961,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
return;
}
uint64_t batch_size = 0;
for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
++iter) {
GetContext& get_context = *iter->get_context;
Status* status = iter->s;
// The Status in the KeyContext takes precedence over GetContext state
@ -2006,7 +2008,12 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
}
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
fp.GetHitFileLevel());
file_range.AddValueSize(iter->value->size());
file_range.MarkKeyDone(iter);
if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
s = Status::Aborted();
break;
}
continue;
case GetContext::kDeleted:
// Use empty error message for speed
@ -2028,14 +2035,14 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
}
}
RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
if (file_picker_range.empty()) {
if (!s.ok() || file_picker_range.empty()) {
break;
}
f = fp.GetNextFile();
}
// Process any left over keys
for (auto iter = range->begin(); iter != range->end(); ++iter) {
for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
GetContext& get_context = *iter->get_context;
Status* status = iter->s;
Slice user_key = iter->lkey->user_key();
@ -2060,12 +2067,23 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
nullptr /* result_operand */, true);
if (LIKELY(iter->value != nullptr)) {
iter->value->PinSelf();
range->AddValueSize(iter->value->size());
range->MarkKeyDone(iter);
if (range->GetValueSize() > read_options.value_size_soft_limit) {
s = Status::Aborted();
break;
}
}
} else {
range->MarkKeyDone(iter);
*status = Status::NotFound(); // Use an empty error message for speed
}
}
for (auto iter = range->begin(); iter != range->end(); ++iter) {
range->MarkKeyDone(iter);
*(iter->s) = s;
}
}
bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {

@ -1356,6 +1356,13 @@ struct ReadOptions {
// processing a batch
std::chrono::microseconds deadline;
// It limits the maximum cumulative value size of the keys in batch while
// reading through MultiGet. Once the cumulative value size exceeds this
// soft limit then all the remaining keys are returned with status Aborted.
//
// Default: std::numeric_limits<uint64_t>::max()
uint64_t value_size_soft_limit;
ReadOptions();
ReadOptions(bool cksum, bool cache);
};

@ -608,7 +608,8 @@ ReadOptions::ReadOptions()
iter_start_seqnum(0),
timestamp(nullptr),
iter_start_ts(nullptr),
deadline(std::chrono::microseconds::zero()) {}
deadline(std::chrono::microseconds::zero()),
value_size_soft_limit(std::numeric_limits<uint64_t>::max()) {}
ReadOptions::ReadOptions(bool cksum, bool cache)
: snapshot(nullptr),
@ -630,6 +631,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
iter_start_seqnum(0),
timestamp(nullptr),
iter_start_ts(nullptr),
deadline(std::chrono::microseconds::zero()) {}
deadline(std::chrono::microseconds::zero()),
value_size_soft_limit(std::numeric_limits<uint64_t>::max()) {}
} // namespace ROCKSDB_NAMESPACE

@ -97,6 +97,7 @@ class MultiGetContext {
const ReadOptions& read_opts)
: num_keys_(num_keys),
value_mask_(0),
value_size_(0),
lookup_key_ptr_(reinterpret_cast<LookupKey*>(lookup_key_stack_buf)) {
if (num_keys > MAX_LOOKUP_KEYS_ON_STACK) {
lookup_key_heap_buf.reset(new char[sizeof(LookupKey) * num_keys]);
@ -127,6 +128,7 @@ class MultiGetContext {
std::array<KeyContext*, MAX_BATCH_SIZE> sorted_keys_;
size_t num_keys_;
uint64_t value_mask_;
uint64_t value_size_;
std::unique_ptr<char[]> lookup_key_heap_buf;
LookupKey* lookup_key_ptr_;
@ -243,6 +245,10 @@ class MultiGetContext {
skip_mask_ |= other.skip_mask_;
}
uint64_t GetValueSize() { return ctx_->value_size_; }
void AddValueSize(uint64_t value_size) { ctx_->value_size_ += value_size; }
private:
friend MultiGetContext;
MultiGetContext* ctx_;

Loading…
Cancel
Save