From bcefc59e9fff388b11f2645a934e52d957829408 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Wed, 27 May 2020 13:03:08 -0700 Subject: [PATCH] Allow MultiGet users to limit cumulative value size (#6826) Summary: 1. Add a value_size in read options which limits the cumulative value size of keys read in batches. Once the size exceeds read_options.value_size, all the remaining keys are returned with status Abort without further fetching any key. 2. Add a unit test case MultiGetBatchedValueSizeSimple the reads keys from memory and sst files. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6826 Test Plan: 1. make check -j64 2. Add a new unit test case Reviewed By: anand1976 Differential Revision: D21471483 Pulled By: akankshamahajan15 fbshipit-source-id: dea51b8e76d5d1df38ece8cdb29933b1d798b900 --- HISTORY.md | 1 + db/db_basic_test.cc | 215 ++++++++++++++++++++++++++++++++++++++ db/db_impl/db_impl.cc | 25 ++++- db/memtable.cc | 10 ++ db/version_set.cc | 26 ++++- include/rocksdb/options.h | 7 ++ options/options.cc | 6 +- table/multiget_context.h | 6 ++ 8 files changed, 285 insertions(+), 11 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index d5d79301e..db22132c2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -19,6 +19,7 @@ ### New Feature * sst_dump to add a new --readahead_size argument. Users can specify read size when scanning the data. Sst_dump also tries to prefetch tail part of the SST files so usually some number of I/Os are saved there too. * Generate file checksum in SstFileWriter if Options.file_checksum_gen_factory is set. The checksum and checksum function name are stored in ExternalSstFileInfo after the sst file write is finished. +* Add a value_size_soft_limit in read options which limits the cumulative value size of keys read in batches in MultiGet. Once the cumulative value size of found keys exceeds read_options.value_size_soft_limit, all the remaining keys are returned with status Abort without further finding their values. 
By default the value_size_soft_limit is std::numeric_limits<uint64_t>::max(). ## 6.10 (5/2/2020) ### Bug Fixes diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index a585c15c5..323bed873 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1501,6 +1501,221 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) { } } +TEST_F(DBBasicTest, MultiGetBatchedValueSizeInMemory) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + SetPerfLevel(kEnableCount); + ASSERT_OK(Put(1, "k1", "v_1")); + ASSERT_OK(Put(1, "k2", "v_2")); + ASSERT_OK(Put(1, "k3", "v_3")); + ASSERT_OK(Put(1, "k4", "v_4")); + ASSERT_OK(Put(1, "k5", "v_5")); + ASSERT_OK(Put(1, "k6", "v_6")); + std::vector<Slice> keys = {"k1", "k2", "k3", "k4", "k5", "k6"}; + std::vector<PinnableSlice> values(keys.size()); + std::vector<Status> s(keys.size()); + std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]); + + get_perf_context()->Reset(); + ReadOptions ro; + ro.value_size_soft_limit = 11; + db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(), + s.data(), false); + + ASSERT_EQ(values.size(), keys.size()); + for (unsigned int i = 0; i < 4; i++) { + ASSERT_EQ(std::string(values[i].data(), values[i].size()), + "v_" + std::to_string(i + 1)); + } + + for (unsigned int i = 4; i < 6; i++) { + ASSERT_TRUE(s[i].IsAborted()); + } + + ASSERT_EQ(12, (int)get_perf_context()->multiget_read_bytes); + SetPerfLevel(kDisable); +} + +TEST_F(DBBasicTest, MultiGetBatchedValueSize) { + do { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + SetPerfLevel(kEnableCount); + + ASSERT_OK(Put(1, "k6", "v6")); + ASSERT_OK(Put(1, "k7", "v7_")); + ASSERT_OK(Put(1, "k3", "v3_")); + ASSERT_OK(Put(1, "k4", "v4")); + Flush(1); + ASSERT_OK(Delete(1, "k4")); + ASSERT_OK(Put(1, "k11", "v11")); + ASSERT_OK(Delete(1, "no_key")); + ASSERT_OK(Put(1, "k8", "v8_")); + ASSERT_OK(Put(1, "k13", "v13")); + ASSERT_OK(Put(1, "k14", "v14")); + ASSERT_OK(Put(1, "k15", "v15")); + ASSERT_OK(Put(1, "k16", "v16")); + ASSERT_OK(Put(1, "k17", "v17")); + Flush(1); + + 
ASSERT_OK(Put(1, "k1", "v1_")); + ASSERT_OK(Put(1, "k2", "v2_")); + ASSERT_OK(Put(1, "k5", "v5_")); + ASSERT_OK(Put(1, "k9", "v9_")); + ASSERT_OK(Put(1, "k10", "v10")); + ASSERT_OK(Delete(1, "k2")); + ASSERT_OK(Delete(1, "k6")); + + get_perf_context()->Reset(); + + std::vector<Slice> keys({"k1", "k10", "k11", "k12", "k13", "k14", "k15", + "k16", "k17", "k2", "k3", "k4", "k5", "k6", "k7", + "k8", "k9", "no_key"}); + std::vector<PinnableSlice> values(keys.size()); + std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]); + std::vector<Status> s(keys.size()); + + ReadOptions ro; + ro.value_size_soft_limit = 20; + db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(), + s.data(), false); + + ASSERT_EQ(values.size(), keys.size()); + + // In memory keys + ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1_"); + ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v10"); + ASSERT_TRUE(s[9].IsNotFound()); // k2 + ASSERT_EQ(std::string(values[12].data(), values[12].size()), "v5_"); + ASSERT_TRUE(s[13].IsNotFound()); // k6 + ASSERT_EQ(std::string(values[16].data(), values[16].size()), "v9_"); + + // In sst files + ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v11"); + ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v13"); + ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v14"); + + // Remaining aborted after value_size exceeds. + ASSERT_TRUE(s[3].IsAborted()); + ASSERT_TRUE(s[6].IsAborted()); + ASSERT_TRUE(s[7].IsAborted()); + ASSERT_TRUE(s[8].IsAborted()); + ASSERT_TRUE(s[10].IsAborted()); + ASSERT_TRUE(s[11].IsAborted()); + ASSERT_TRUE(s[14].IsAborted()); + ASSERT_TRUE(s[15].IsAborted()); + ASSERT_TRUE(s[17].IsAborted()); + + // 6 kv pairs * 3 bytes per value (i.e. 
18) + ASSERT_EQ(21, (int)get_perf_context()->multiget_read_bytes); + SetPerfLevel(kDisable); + } while (ChangeCompactOptions()); +} + +TEST_F(DBBasicTest, MultiGetBatchedValueSizeMultiLevelMerge) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + BlockBasedTableOptions bbto; + bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Reopen(options); + int num_keys = 0; + + for (int i = 0; i < 64; ++i) { + ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i))); + num_keys++; + if (num_keys == 8) { + Flush(); + num_keys = 0; + } + } + if (num_keys > 0) { + Flush(); + num_keys = 0; + } + MoveFilesToLevel(2); + + for (int i = 0; i < 64; i += 3) { + ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i))); + num_keys++; + if (num_keys == 8) { + Flush(); + num_keys = 0; + } + } + if (num_keys > 0) { + Flush(); + num_keys = 0; + } + MoveFilesToLevel(1); + + for (int i = 0; i < 64; i += 5) { + ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i))); + num_keys++; + if (num_keys == 8) { + Flush(); + num_keys = 0; + } + } + if (num_keys > 0) { + Flush(); + num_keys = 0; + } + ASSERT_EQ(0, num_keys); + + for (int i = 0; i < 64; i += 9) { + ASSERT_OK( + Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i))); + } + + std::vector<std::string> keys_str; + for (int i = 10; i < 50; ++i) { + keys_str.push_back("key_" + std::to_string(i)); + } + + std::vector<Slice> keys(keys_str.size()); + for (int i = 0; i < 40; i++) { + keys[i] = Slice(keys_str[i]); + } + + std::vector<PinnableSlice> values(keys_str.size()); + std::vector<Status> statuses(keys_str.size()); + ReadOptions read_options; + read_options.verify_checksums = true; + read_options.value_size_soft_limit = 380; + db_->MultiGet(read_options, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), 
statuses.data()); + + ASSERT_EQ(values.size(), keys.size()); + + uint64_t curr_value_size = 0; + for (unsigned int j = 0; j < 26; ++j) { + int key = j + 10; + std::string value; + value.append("val_l2_" + std::to_string(key)); + if (key % 3 == 0) { + value.append(","); + value.append("val_l1_" + std::to_string(key)); + } + if (key % 5 == 0) { + value.append(","); + value.append("val_l0_" + std::to_string(key)); + } + if (key % 9 == 0) { + value.append(","); + value.append("val_mem_" + std::to_string(key)); + } + curr_value_size += value.size(); + ASSERT_EQ(values[j], value); + ASSERT_OK(statuses[j]); + } + // ASSERT_TRUE(curr_value_size <= read_options.value_size_soft_limit); + + // All remaining keys' status is set to Status::Aborted + for (unsigned int j = 26; j < 40; j++) { + ASSERT_TRUE(statuses[j].IsAborted()); + } +} + // Test class for batched MultiGet with prefix extractor // Param bool - If true, use partitioned filters // If false, use full filter block diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index d7a040885..c9d7aab3a 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1787,6 +1787,7 @@ std::vector<Status> DBImpl::MultiGet( // merge_operands will contain the sequence of merges in the latter case. 
size_t num_found = 0; size_t keys_read; + uint64_t curr_value_size = 0; for (keys_read = 0; keys_read < num_keys; ++keys_read) { merge_context.Clear(); Status& s = stat_list[keys_read]; @@ -1830,6 +1831,13 @@ std::vector DBImpl::MultiGet( if (s.ok()) { bytes_read += value->size(); num_found++; + curr_value_size += value->size(); + if (curr_value_size > read_options.value_size_soft_limit) { + while (++keys_read < num_keys) { + stat_list[keys_read] = Status::Aborted(); + } + break; + } } if (read_options.deadline.count() && @@ -2084,11 +2092,11 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, } } if (!s.ok()) { - assert(s.IsTimedOut()); + assert(s.IsTimedOut() || s.IsAborted()); for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) { for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys; ++i) { - *sorted_keys[i]->s = Status::TimedOut(); + *sorted_keys[i]->s = s; } } } @@ -2243,7 +2251,7 @@ void DBImpl::MultiGetWithCallback( Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys, multiget_cf_data[0].super_version, consistent_seqnum, nullptr, nullptr); - assert(s.ok() || s.IsTimedOut()); + assert(s.ok() || s.IsTimedOut() || s.IsAborted()); ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd, multiget_cf_data[0].super_version); } @@ -2271,6 +2279,7 @@ Status DBImpl::MultiGetImpl( // merge_operands will contain the sequence of merges in the latter case. 
size_t keys_left = num_keys; Status s; + uint64_t curr_value_size = 0; while (keys_left) { if (read_options.deadline.count() && env_->NowMicros() > @@ -2285,6 +2294,7 @@ Status DBImpl::MultiGetImpl( MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left, batch_size, snapshot, read_options); MultiGetRange range = ctx.GetMultiGetRange(); + range.AddValueSize(curr_value_size); bool lookup_current = false; keys_left -= batch_size; @@ -2315,6 +2325,11 @@ Status DBImpl::MultiGetImpl( super_version->current->MultiGet(read_options, &range, callback, is_blob_index); } + curr_value_size = range.GetValueSize(); + if (curr_value_size > read_options.value_size_soft_limit) { + s = Status::Aborted(); + break; + } } // Post processing (decrement reference counts and record statistics) @@ -2329,11 +2344,11 @@ Status DBImpl::MultiGetImpl( } } if (keys_left) { - assert(s.IsTimedOut()); + assert(s.IsTimedOut() || s.IsAborted()); for (size_t i = start_key + num_keys - keys_left; i < start_key + num_keys; ++i) { KeyContext* key = (*sorted_keys)[i]; - *key->s = Status::TimedOut(); + *key->s = s; } } diff --git a/db/memtable.cc b/db/memtable.cc index 6e44d5658..82b577ed8 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -929,8 +929,18 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, if (found_final_value) { iter->value->PinSelf(); + range->AddValueSize(iter->value->size()); range->MarkKeyDone(iter); RecordTick(moptions_.statistics, MEMTABLE_HIT); + if (range->GetValueSize() > read_options.value_size_soft_limit) { + // Set all remaining keys in range to Abort + for (auto range_iter = range->begin(); range_iter != range->end(); + ++range_iter) { + range->MarkKeyDone(range_iter); + *(range_iter->s) = Status::Aborted(); + } + break; + } } } PERF_COUNTER_ADD(get_from_memtable_count, 1); diff --git a/db/version_set.cc b/db/version_set.cc index 5a4eab343..38d0e9ed0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1932,6 +1932,7 @@ void 
Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_, user_comparator(), internal_comparator()); FdWithKeyRange* f = fp.GetNextFile(); + Status s; while (f != nullptr) { MultiGetRange file_range = fp.CurrentFileRange(); @@ -1939,7 +1940,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex && get_perf_context()->per_level_perf_context_enabled; StopWatchNano timer(env_, timer_enabled /* auto_start */); - Status s = table_cache_->MultiGet( + s = table_cache_->MultiGet( read_options, *internal_comparator(), *f->file_metadata, &file_range, mutable_cf_options_.prefix_extractor.get(), cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), @@ -1960,7 +1961,8 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, return; } uint64_t batch_size = 0; - for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) { + for (auto iter = file_range.begin(); s.ok() && iter != file_range.end(); + ++iter) { GetContext& get_context = *iter->get_context; Status* status = iter->s; // The Status in the KeyContext takes precedence over GetContext state @@ -2006,7 +2008,12 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, } PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, fp.GetHitFileLevel()); + file_range.AddValueSize(iter->value->size()); file_range.MarkKeyDone(iter); + if (file_range.GetValueSize() > read_options.value_size_soft_limit) { + s = Status::Aborted(); + break; + } continue; case GetContext::kDeleted: // Use empty error message for speed @@ -2028,14 +2035,14 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, } } RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size); - if (file_picker_range.empty()) { + if (!s.ok() || file_picker_range.empty()) { break; } f = 
fp.GetNextFile(); } // Process any left over keys - for (auto iter = range->begin(); iter != range->end(); ++iter) { + for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) { GetContext& get_context = *iter->get_context; Status* status = iter->s; Slice user_key = iter->lkey->user_key(); @@ -2060,12 +2067,23 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, nullptr /* result_operand */, true); if (LIKELY(iter->value != nullptr)) { iter->value->PinSelf(); + range->AddValueSize(iter->value->size()); + range->MarkKeyDone(iter); + if (range->GetValueSize() > read_options.value_size_soft_limit) { + s = Status::Aborted(); + break; + } } } else { range->MarkKeyDone(iter); *status = Status::NotFound(); // Use an empty error message for speed } } + + for (auto iter = range->begin(); iter != range->end(); ++iter) { + range->MarkKeyDone(iter); + *(iter->s) = s; + } } bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 10b68ff4e..1404031a0 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1356,6 +1356,13 @@ struct ReadOptions { // processing a batch std::chrono::microseconds deadline; + // It limits the maximum cumulative value size of the keys in batch while + // reading through MultiGet. Once the cumulative value size exceeds this + // soft limit then all the remaining keys are returned with status Aborted. 
+ // + // Default: std::numeric_limits<uint64_t>::max() + uint64_t value_size_soft_limit; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/options/options.cc b/options/options.cc index 0e2094d4c..f9983d069 100644 --- a/options/options.cc +++ b/options/options.cc @@ -608,7 +608,8 @@ ReadOptions::ReadOptions() iter_start_seqnum(0), timestamp(nullptr), iter_start_ts(nullptr), - deadline(std::chrono::microseconds::zero()) {} + deadline(std::chrono::microseconds::zero()), + value_size_soft_limit(std::numeric_limits<uint64_t>::max()) {} ReadOptions::ReadOptions(bool cksum, bool cache) : snapshot(nullptr), @@ -630,6 +631,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache) iter_start_seqnum(0), timestamp(nullptr), iter_start_ts(nullptr), - deadline(std::chrono::microseconds::zero()) {} + deadline(std::chrono::microseconds::zero()), + value_size_soft_limit(std::numeric_limits<uint64_t>::max()) {} } // namespace ROCKSDB_NAMESPACE diff --git a/table/multiget_context.h b/table/multiget_context.h index fdf72771d..964544d07 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -97,6 +97,7 @@ class MultiGetContext { const ReadOptions& read_opts) : num_keys_(num_keys), value_mask_(0), + value_size_(0), lookup_key_ptr_(reinterpret_cast<LookupKey*>(lookup_key_stack_buf)) { if (num_keys > MAX_LOOKUP_KEYS_ON_STACK) { lookup_key_heap_buf.reset(new char[sizeof(LookupKey) * num_keys]); @@ -127,6 +128,7 @@ class MultiGetContext { std::array<KeyContext*, MAX_BATCH_SIZE> sorted_keys_; size_t num_keys_; uint64_t value_mask_; + uint64_t value_size_; std::unique_ptr<char[]> lookup_key_heap_buf; LookupKey* lookup_key_ptr_; @@ -243,6 +245,10 @@ class MultiGetContext { skip_mask_ |= other.skip_mask_; } + uint64_t GetValueSize() { return ctx_->value_size_; } + + void AddValueSize(uint64_t value_size) { ctx_->value_size_ += value_size; } + private: friend MultiGetContext; MultiGetContext* ctx_;