From fefd4b98c572a09da47fb100da9e1472d28c0d08 Mon Sep 17 00:00:00 2001
From: anand76
Date: Thu, 11 Apr 2019 14:24:09 -0700
Subject: [PATCH] Introduce a new MultiGet batching implementation (#5011)

Summary:
This PR introduces a new MultiGet() API, with the underlying implementation grouping keys based on SST file and batching lookups in a file. The reason for the new API is twofold: the new definition allows callers to allocate storage for statuses and values on the stack instead of in std::vector, and to receive values as PinnableSlices in order to avoid copying; it also keeps the original MultiGet() implementation intact while we experiment with batching.

Batching is useful when there is some spatial locality to the keys being queried, as well as with larger batch sizes. The main benefits are due to:
1. Fewer function calls, especially to BlockBasedTableReader::MultiGet() and FullFilterBlockReader::KeysMayMatch()
2. Bloom filter cachelines can be prefetched, hiding the cache miss latency

The next step is to optimize the binary searches in the level_storage_info, index blocks and data blocks, since we could reduce the number of key comparisons if the keys are relatively close to each other. The batching optimizations also need to be extended to other formats, such as PlainTable and filter formats. This also needs to be added to db_stress.
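To make the new calling convention concrete, here is a minimal usage sketch against the MultiGet() overload this patch adds to include/rocksdb/db.h. The database handle, key names, and batch size are illustrative assumptions, not part of the patch:

```cpp
#include <array>
#include "rocksdb/db.h"

// Minimal sketch: look up four (hypothetical) keys with the batched
// MultiGet(). Statuses and values are plain stack arrays rather than
// std::vector, and values come back as PinnableSlices, so no copy is
// made if the data is already pinned in the block cache.
void BatchedLookup(rocksdb::DB* db) {
  constexpr size_t kNumKeys = 4;
  std::array<rocksdb::Slice, kNumKeys> keys = {
      {"key_0", "key_1", "key_2", "key_3"}};
  std::array<rocksdb::PinnableSlice, kNumKeys> values;
  std::array<rocksdb::Status, kNumKeys> statuses;

  // sorted_input=true because the keys above are already in key order;
  // pass false and the API will sort an internal copy of the keys itself.
  db->MultiGet(rocksdb::ReadOptions(), db->DefaultColumnFamily(), kNumKeys,
               keys.data(), values.data(), statuses.data(),
               /*sorted_input=*/true);

  for (size_t i = 0; i < kNumKeys; ++i) {
    if (statuses[i].ok()) {
      // values[i] references the cached block directly; consume it before
      // the PinnableSlice goes out of scope or is Reset().
    }
  }
}
```

On the DB side this lands in DBImpl::MultiGet()/MultiGetImpl() below; a default DB::MultiGet() overload in include/rocksdb/db.h gives non-batching implementations the same signature.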
Benchmark results from db_bench for various batch size/locality of reference combinations are given below. Locality was simulated by offsetting the keys in a batch by a stride length. Each SST file is about 8.6MB uncompressed and key/value size is 16/100 uncompressed. To focus on the cpu benefit of batching, the runs were single threaded and bound to the same cpu to eliminate interference from other system events. The results show a 10-25% improvement in micros/op from smaller to larger batch sizes (4 - 32).

Micros/op by batch size: 1 | 2 | 4 | 8 | 16 | 32

Random pattern (Stride length 0)
4.158 | 4.109 | 4.026 | 4.05  | 4.1   | 4.074 - Get
4.438 | 4.302 | 4.165 | 4.122 | 4.096 | 4.075 - MultiGet (no batching)
4.461 | 4.256 | 4.277 | 4.11  | 4.182 | 4.14  - MultiGet (w/ batching)

Good locality (Stride length 16)
4.048 | 3.659 | 3.248 | 2.99  | 2.84  | 2.753 - Get
4.429 | 3.728 | 3.406 | 3.053 | 2.911 | 2.781 - MultiGet (no batching)
4.452 | 3.45  | 2.833 | 2.451 | 2.233 | 2.135 - MultiGet (w/ batching)

Good locality (Stride length 256)
4.066 | 3.786 | 3.581 | 3.447 | 3.415 | 3.232 - Get
4.406 | 4.005 | 3.644 | 3.49  | 3.381 | 3.268 - MultiGet (no batching)
4.393 | 3.649 | 3.186 | 2.882 | 2.676 | 2.62  - MultiGet (w/ batching)

Medium locality (Stride length 4096)
4.012 | 3.922 | 3.768 | 3.61  | 3.582 | 3.555 - Get
4.364 | 4.057 | 3.791 | 3.65  | 3.57  | 3.465 - MultiGet (no batching)
4.479 | 3.758 | 3.316 | 3.077 | 2.959 | 2.891 - MultiGet (w/ batching)

db_bench command used (on a DB with 4 levels, 12 million keys):
TEST_TMPDIR=/dev/shm numactl -C 10 ./db_bench.tmp -use_existing_db=true -benchmarks="readseq,multireadrandom" -write_buffer_size=4194304 -target_file_size_base=4194304 -max_bytes_for_level_base=16777216 -num=12000000 -reads=12000000 -duration=90 -threads=1 -compression_type=none -cache_size=4194304000 -batch_size=32 -disable_auto_compactions=true -bloom_bits=10 -cache_index_and_filter_blocks=true -pin_l0_filter_and_index_blocks_in_cache=true -multiread_batched=true -multiread_stride=4

Pull Request resolved: https://github.com/facebook/rocksdb/pull/5011

Differential Revision: D14348703

Pulled By: anand1976

fbshipit-source-id: 774406dab3776d979c809522a67bedac6c17f84b
---
 db/db_basic_test.cc                      |  77 +++-
 db/db_impl.cc                            | 185 ++++++++
 db/db_impl.h                             |  19 +
 db/db_test.cc                            |  41 +-
 db/db_test_util.cc                       |  28 ++
 db/db_test_util.h                        |   3 +
 db/dbformat.h                            |  49 +-
 db/lookup_key.h                          |  65 +++
 db/merge_context.h                       |   3 +-
 db/table_cache.cc                        |  61 +++
 db/table_cache.h                         |   8 +
 db/version_set.cc                        | 560 +++++++++++++++++++++++
 db/version_set.h                         |   6 +
 include/rocksdb/db.h                     |  40 ++
 include/rocksdb/filter_policy.h          |  10 +
 include/rocksdb/statistics.h             |   3 +
 include/rocksdb/utilities/stackable_db.h |   9 +
 monitoring/statistics.cc                 |   2 +
 table/block_based_table_reader.cc        | 165 ++++++-
 table/block_based_table_reader.h         |  12 +
 table/filter_block.h                     |  27 ++
 table/full_filter_block.cc               |  53 +++
 table/full_filter_block.h                |  11 +
 table/full_filter_block_test.cc          |   2 +
 table/get_context.h                      |   3 +
 table/multiget_context.h                 | 249 ++++++++++
 table/table_reader.h                     |  13 +
 tools/db_bench_tool.cc                   |  85 +++-
 util/bloom.cc                            |  71 ++-
 util/fault_injection_test_env.h          |   2 +
 utilities/blob_db/blob_db.h              |  10 +
 31 files changed, 1780 insertions(+), 92 deletions(-)
 create mode 100644 db/lookup_key.h
 create mode 100644 table/multiget_context.h

diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index c93a5e436..5d016f5f0 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -994,7 +994,7 @@ TEST_F(DBBasicTest, MultiGetMultiCF) {
     keys.push_back("cf" + std::to_string(i) + "_key");
   }
 
-  values = MultiGet(cfs, keys);
+  values = MultiGet(cfs, keys, nullptr);
   ASSERT_EQ(values.size(), 8);
   for (unsigned int j = 0; j < values.size(); ++j) {
     ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val2");
@@ -1054,7 +1054,7 @@ TEST_F(DBBasicTest, MultiGetMultiCFMutex) {
     keys.push_back("cf" + std::to_string(i) + "_key");
   }
 
-  values = MultiGet(cfs, keys);
+  values = MultiGet(cfs, keys, nullptr);
   ASSERT_TRUE(last_try);
   ASSERT_EQ(values.size(), 8);
   for (unsigned int j = 0; j < values.size(); ++j) {
@@ -1128,6 +1128,79 @@ TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) {
   }
 }
 
+TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  Reopen(options);
+  int num_keys = 0;
+
+  for (int i = 0; i < 128; ++i) {
+    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+  MoveFilesToLevel(2);
+
+  for (int i = 0; i < 128; i += 3) {
+    ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+  MoveFilesToLevel(1);
+
+  for (int i = 0; i < 128; i += 5) {
+    ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+
+  for (int i = 0; i < 128; i += 9) {
+    ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
+  }
+
+  std::vector<std::string> keys;
+  std::vector<std::string> values;
+
+  for (int i = 64; i < 80; ++i) {
+    keys.push_back("key_" + std::to_string(i));
+  }
+
+  values = MultiGet(keys, nullptr);
+  ASSERT_EQ(values.size(), 16);
+  for (unsigned int j = 0; j < values.size(); ++j) {
+    int key = j + 64;
+    if (key % 9 == 0) {
+      ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
+    } else if (key % 5 == 0) {
+      ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
+    } else if (key % 3 == 0) {
+      ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
+    } else {
+      ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
+    }
+  }
+}
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 8180564c2..97cd2ac79 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -76,7 +76,9 @@
 #include "rocksdb/write_buffer_manager.h"
 #include "table/block.h"
 #include "table/block_based_table_factory.h"
+#include "table/get_context.h"
 #include "table/merging_iterator.h"
+#include "table/multiget_context.h"
 #include "table/table_builder.h"
 #include "table/two_level_iterator.h"
 #include "tools/sst_dump_tool_imp.h"
@@ -1659,6 +1661,189 @@ std::vector<Status> DBImpl::MultiGet(
   return stat_list;
 }
 
+// Order keys by key contents, using the column family's user comparator
+struct CompareKeyContext {
+  inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
+    const Comparator* comparator = cfd->user_comparator();
+    int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
+    if (cmp < 0) {
+      return true;
+    }
+    return false;
+  }
+  const ColumnFamilyData* cfd;
+};
+
+void DBImpl::MultiGet(const ReadOptions& read_options,
+                      ColumnFamilyHandle* column_family, const size_t num_keys,
+                      const Slice* keys, PinnableSlice* values,
+                      Status* statuses, const bool sorted_input) {
+  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
+  for (size_t i = 0; i < num_keys; ++i) {
+    key_context.emplace_back(keys[i], &values[i], &statuses[i]);
+  }
+
+  MultiGetImpl(read_options, column_family, key_context, sorted_input, nullptr,
+               nullptr);
+}
+
+void DBImpl::MultiGetImpl(
+    const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+    autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE>& key_context,
+    bool sorted_input, ReadCallback* callback, bool* is_blob_index) {
+  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
+  StopWatch sw(env_, stats_, DB_MULTIGET);
+  size_t num_keys = key_context.size();
+
+  PERF_TIMER_GUARD(get_snapshot_time);
+
+  ColumnFamilyHandleImpl* cfh =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  ColumnFamilyData* cfd = cfh->cfd();
+
+  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
+  sorted_keys.resize(num_keys);
+  {
+    size_t index = 0;
+    for (KeyContext& key : key_context) {
+#ifndef NDEBUG
+      if (index > 0) {
+        KeyContext* lhs = &key_context[index - 1];
+        KeyContext* rhs = &key_context[index];
+        const Comparator* comparator = cfd->user_comparator();
+        int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
+        assert(cmp <= 0);
+      }
+#endif
+
+      sorted_keys[index] = &key;
+      index++;
+    }
+    if (!sorted_input) {
+      CompareKeyContext sort_comparator;
+      sort_comparator.cfd = cfd;
+      std::sort(sorted_keys.begin(), sorted_keys.begin() + index,
+                sort_comparator);
+    }
+  }
+
+  // Keep track of bytes that we read for statistics-recording later
+  PERF_TIMER_STOP(get_snapshot_time);
+
+  // Acquire SuperVersion
+  SuperVersion* super_version = GetAndRefSuperVersion(cfd);
+  SequenceNumber snapshot;
+  if (read_options.snapshot != nullptr) {
+    // Note: In WritePrepared txns this is not necessary but not harmful
+    // either. Because prep_seq > snapshot => commit_seq > snapshot so if
+    // a snapshot is specified we should be fine with skipping seq numbers
+    // that are greater than that.
+    //
+    // In WriteUnprepared, we cannot set snapshot in the lookup key because we
+    // may skip uncommitted data that should be visible to the transaction for
+    // reading own writes.
+    snapshot =
+        reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
+    if (callback) {
+      snapshot = std::max(snapshot, callback->max_visible_seq());
+    }
+  } else {
+    // Since we get and reference the super version before getting
+    // the snapshot number, without a mutex protection, it is possible
+    // that a memtable switch happened in the middle and not all the
+    // data for this snapshot is available. But it will contain all
+    // the data available in the super version we have, which is also
+    // a valid snapshot to read from.
+    // We shouldn't get snapshot before finding and referencing the super
+    // version because a flush happening in between may compact away data for
+    // the snapshot, but the snapshot is earlier than the data overwriting it,
+    // so users may see wrong results.
+    snapshot = last_seq_same_as_publish_seq_
+                   ? versions_->LastSequence()
+                   : versions_->LastPublishedSequence();
+  }
+
+  // For each of the given keys, apply the entire "get" process as follows:
+  // First look in the memtable, then in the immutable memtable (if any).
+  // s is both in/out. When in, s could either be OK or MergeInProgress.
+  // merge_operands will contain the sequence of merges in the latter case.
+  size_t keys_left = num_keys;
+  while (keys_left) {
+    size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
+                            ? MultiGetContext::MAX_BATCH_SIZE
+                            : keys_left;
+    MultiGetContext ctx(&sorted_keys[num_keys - keys_left], batch_size,
+                        snapshot);
+    MultiGetRange range = ctx.GetMultiGetRange();
+    bool lookup_current = false;
+
+    keys_left -= batch_size;
+    for (auto mget_iter = range.begin(); mget_iter != range.end();
+         ++mget_iter) {
+      MergeContext& merge_context = mget_iter->merge_context;
+      merge_context.Clear();
+      Status& s = *mget_iter->s;
+      PinnableSlice* value = mget_iter->value;
+      s = Status::OK();
+
+      bool skip_memtable =
+          (read_options.read_tier == kPersistedTier &&
+           has_unpersisted_data_.load(std::memory_order_relaxed));
+      bool done = false;
+      if (!skip_memtable) {
+        if (super_version->mem->Get(*(mget_iter->lkey), value->GetSelf(), &s,
+                                    &merge_context,
+                                    &mget_iter->max_covering_tombstone_seq,
+                                    read_options, callback, is_blob_index)) {
+          done = true;
+          value->PinSelf();
+          RecordTick(stats_, MEMTABLE_HIT);
+        } else if (super_version->imm->Get(
+                       *(mget_iter->lkey), value->GetSelf(), &s,
+                       &merge_context, &mget_iter->max_covering_tombstone_seq,
+                       read_options, callback, is_blob_index)) {
+          done = true;
+          value->PinSelf();
+          RecordTick(stats_, MEMTABLE_HIT);
+        }
+      }
+      if (done) {
+        range.MarkKeyDone(mget_iter);
+      } else {
+        RecordTick(stats_, MEMTABLE_MISS);
+        lookup_current = true;
+      }
+    }
+
+    if (lookup_current) {
+      PERF_TIMER_GUARD(get_from_output_files_time);
+      super_version->current->MultiGet(read_options, &range, callback,
+                                       is_blob_index);
+    }
+  }
+
+  // Post processing (decrement reference counts and record statistics)
+  PERF_TIMER_GUARD(get_post_process_time);
+  size_t num_found = 0;
+  uint64_t bytes_read = 0;
+  for (KeyContext& key : key_context) {
+    if (key.s->ok()) {
+      bytes_read += key.value->size();
+      num_found++;
+    }
+  }
+
+  ReturnAndCleanupSuperVersion(cfd, super_version);
+
+  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
+  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
+  RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
+  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
+  RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
+  PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
+  PERF_TIMER_STOP(get_post_process_time);
+}
+
 Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                   const std::string& column_family,
                                   ColumnFamilyHandle** handle) {
diff --git a/db/db_impl.h b/db/db_impl.h
index e834e0fbe..8b434c118 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -124,6 +124,25 @@ class DBImpl : public DB {
       const std::vector<Slice>& keys,
       std::vector<std::string>* values) override;
 
+  // This MultiGet is a batched version, which may be faster than calling Get
+  // multiple times, especially if the keys have some spatial locality that
+  // enables them to be queried in the same SST files/set of files. The larger
+  // the batch size, the more scope for batching and performance improvement.
+  // The values and statuses parameters are arrays with number of elements
+  // equal to keys.size(). This allows the storage for those to be allocated
+  // by the caller on the stack for small batches
+  virtual void MultiGet(const ReadOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const size_t num_keys, const Slice* keys,
+                        PinnableSlice* values, Status* statuses,
+                        const bool sorted_input = false) override;
+
+  void MultiGetImpl(
+      const ReadOptions& options, ColumnFamilyHandle* column_family,
+      autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE>& key_context,
+      bool sorted_input, ReadCallback* callback = nullptr,
+      bool* is_blob_index = nullptr);
+
   virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                     const std::string& column_family,
                                     ColumnFamilyHandle** handle) override;
diff --git a/db/db_test.cc b/db/db_test.cc
index 60e66c6c3..8a112e48f 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -2153,6 +2153,7 @@ struct MTState {
 struct MTThread {
   MTState* state;
   int id;
+  bool multiget_batched;
 };
 
 static void MTThreadBody(void* arg) {
@@ -2201,8 +2202,28 @@ static void MTThreadBody(void* arg) {
       // same)
       std::vector<Slice> keys(kColumnFamilies, Slice(keybuf));
       std::vector<std::string> values;
-      std::vector<Status> statuses =
-          db->MultiGet(ReadOptions(), t->state->test->handles_, keys, &values);
+      std::vector<Status> statuses;
+      if (!t->multiget_batched) {
+        statuses = db->MultiGet(ReadOptions(), t->state->test->handles_, keys,
+                                &values);
+      } else {
+        std::vector<PinnableSlice> pin_values(keys.size());
+        statuses.resize(keys.size());
+        const Snapshot* snapshot = db->GetSnapshot();
+        ReadOptions ro;
+        ro.snapshot = snapshot;
+        for (int cf = 0; cf < kColumnFamilies; ++cf) {
+          db->MultiGet(ro, t->state->test->handles_[cf], 1, &keys[cf],
+                       &pin_values[cf], &statuses[cf]);
+        }
+        db->ReleaseSnapshot(snapshot);
+        values.resize(keys.size());
+        for (int cf = 0; cf < kColumnFamilies; ++cf) {
+          if (statuses[cf].ok()) {
+            values[cf].assign(pin_values[cf].data(), pin_values[cf].size());
+          }
+        }
+      }
 
       Status s = statuses[0];
       // all statuses have to be the same
       for (size_t i = 1; i < statuses.size(); ++i) {
@@ -2244,10 +2265,13 @@ static void MTThreadBody(void* arg) {
 }  // namespace
 
-class MultiThreadedDBTest : public DBTest,
-                            public ::testing::WithParamInterface<int> {
+class MultiThreadedDBTest
+    : public DBTest,
+      public ::testing::WithParamInterface<std::tuple<int, bool>> {
  public:
-  void SetUp() override { option_config_ = GetParam(); }
+  void SetUp() override {
+    std::tie(option_config_, multiget_batched_) = GetParam();
+  }
 
   static std::vector<int> GenerateOptionConfigs() {
     std::vector<int> optionConfigs;
@@ -2256,6 +2280,8 @@ class MultiThreadedDBTest : public DBTest,
     }
     return optionConfigs;
   }
+
+  bool multiget_batched_;
 };
 
 TEST_P(MultiThreadedDBTest, MultiThreaded) {
@@ -2282,6 +2308,7 @@ TEST_P(MultiThreadedDBTest, MultiThreaded) {
   for (int id = 0; id < kNumThreads; id++) {
     thread[id].state = &mt;
     thread[id].id = id;
+    thread[id].multiget_batched = multiget_batched_;
     env_->StartThread(MTThreadBody, &thread[id]);
   }
 
@@ -2299,7 +2326,9 @@ TEST_P(MultiThreadedDBTest, MultiThreaded) {
 
 INSTANTIATE_TEST_CASE_P(
     MultiThreaded, MultiThreadedDBTest,
-    ::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()));
+    ::testing::Combine(
+        ::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()),
+        ::testing::Bool()));
 #endif  // ROCKSDB_LITE
 
 // Group commit test:
diff --git a/db/db_test_util.cc b/db/db_test_util.cc
index 9ef82fd2e..bee6b81d5 100644
--- a/db/db_test_util.cc
+++ b/db/db_test_util.cc
@@ -780,6 +780,34 @@ std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
   return result;
 }
 
+std::vector<std::string> DBTestBase::MultiGet(
+    const std::vector<std::string>& k, const Snapshot* snapshot) {
+  ReadOptions options;
+  options.verify_checksums = true;
+  options.snapshot = snapshot;
+  std::vector<Slice> keys;
+  std::vector<std::string> result;
+  std::vector<Status> statuses(k.size());
+  std::vector<PinnableSlice> pin_values(k.size());
+
+  for (unsigned int i = 0; i < k.size(); ++i) {
+    keys.push_back(k[i]);
+  }
+  db_->MultiGet(options, dbfull()->DefaultColumnFamily(), keys.size(),
+                keys.data(), pin_values.data(), statuses.data());
+  result.resize(k.size());
+  for (auto iter = result.begin(); iter != result.end(); ++iter) {
+    iter->assign(pin_values[iter - result.begin()].data(),
+                 pin_values[iter - result.begin()].size());
+  }
+  for (unsigned int i = 0; i < statuses.size(); ++i) {
+    if (statuses[i].IsNotFound()) {
+      result[i] = "NOT_FOUND";
+    }
+  }
+  return result;
+}
+
 Status DBTestBase::Get(const std::string& k, PinnableSlice* v) {
   ReadOptions options;
   options.verify_checksums = true;
diff --git a/db/db_test_util.h b/db/db_test_util.h
index 1ba1f0a96..36f3813c9 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -842,6 +842,9 @@ class DBTestBase : public testing::Test {
       const std::vector<std::string>& k,
       const Snapshot* snapshot = nullptr);
 
+  std::vector<std::string> MultiGet(const std::vector<std::string>& k,
+                                    const Snapshot* snapshot = nullptr);
+
   uint64_t GetNumSnapshots();
 
   uint64_t GetTimeOldestSnapshots();
diff --git a/db/dbformat.h b/db/dbformat.h
index 7a5ddc1ad..c850adcb0 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -9,8 +9,11 @@
 #pragma once
 #include <stdio.h>
+#include <memory>
 #include <string>
 #include <utility>
+#include "db/lookup_key.h"
+#include "db/merge_context.h"
 #include "monitoring/perf_context_imp.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/db.h"
@@ -292,52 +295,6 @@ inline uint64_t GetInternalKeySeqno(const Slice& internal_key) {
   return num >> 8;
 }
 
-// A helper class useful for DBImpl::Get()
-class LookupKey {
- public:
-  // Initialize *this for looking up user_key at a snapshot with
-  // the specified sequence number.
-  LookupKey(const Slice& _user_key, SequenceNumber sequence);
-
-  ~LookupKey();
-
-  // Return a key suitable for lookup in a MemTable.
-  Slice memtable_key() const {
-    return Slice(start_, static_cast<size_t>(end_ - start_));
-  }
-
-  // Return an internal key (suitable for passing to an internal iterator)
-  Slice internal_key() const {
-    return Slice(kstart_, static_cast<size_t>(end_ - kstart_));
-  }
-
-  // Return the user key
-  Slice user_key() const {
-    return Slice(kstart_, static_cast<size_t>(end_ - kstart_ - 8));
-  }
-
- private:
-  // We construct a char array of the form:
-  //    klength  varint32            <-- start_
-  //    userkey  char[klength]       <-- kstart_
-  //    tag      uint64
-  //                                 <-- end_
-  // The array is a suitable MemTable key.
-  // The suffix starting with "userkey" can be used as an InternalKey.
-  const char* start_;
-  const char* kstart_;
-  const char* end_;
-  char space_[200];  // Avoid allocation for short keys
-
-  // No copying allowed
-  LookupKey(const LookupKey&);
-  void operator=(const LookupKey&);
-};
-
-inline LookupKey::~LookupKey() {
-  if (start_ != space_) delete[] start_;
-}
-
 class IterKey {
  public:
   IterKey()
diff --git a/db/lookup_key.h b/db/lookup_key.h
new file mode 100644
index 000000000..ddf4ff0e9
--- /dev/null
+++ b/db/lookup_key.h
@@ -0,0 +1,65 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <string>
+#include <utility>
+#include "rocksdb/db.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/types.h"
+
+namespace rocksdb {
+
+// A helper class useful for DBImpl::Get()
+class LookupKey {
+ public:
+  // Initialize *this for looking up user_key at a snapshot with
+  // the specified sequence number.
+  LookupKey(const Slice& _user_key, SequenceNumber sequence);
+
+  ~LookupKey();
+
+  // Return a key suitable for lookup in a MemTable.
+  Slice memtable_key() const {
+    return Slice(start_, static_cast<size_t>(end_ - start_));
+  }
+
+  // Return an internal key (suitable for passing to an internal iterator)
+  Slice internal_key() const {
+    return Slice(kstart_, static_cast<size_t>(end_ - kstart_));
+  }
+
+  // Return the user key
+  Slice user_key() const {
+    return Slice(kstart_, static_cast<size_t>(end_ - kstart_ - 8));
+  }
+
+ private:
+  // We construct a char array of the form:
+  //    klength  varint32            <-- start_
+  //    userkey  char[klength]       <-- kstart_
+  //    tag      uint64
+  //                                 <-- end_
+  // The array is a suitable MemTable key.
+  // The suffix starting with "userkey" can be used as an InternalKey.
+  const char* start_;
+  const char* kstart_;
+  const char* end_;
+  char space_[200];  // Avoid allocation for short keys
+
+  // No copying allowed
+  LookupKey(const LookupKey&);
+  void operator=(const LookupKey&);
+};
+
+inline LookupKey::~LookupKey() {
+  if (start_ != space_) delete[] start_;
+}
+
+}  // namespace rocksdb
diff --git a/db/merge_context.h b/db/merge_context.h
index fd06441f7..884682eec 100644
--- a/db/merge_context.h
+++ b/db/merge_context.h
@@ -4,9 +4,10 @@
 // (found in the LICENSE.Apache file in the root directory).
 //
 #pragma once
+#include <algorithm>
+#include <memory>
 #include <string>
 #include <vector>
-#include "db/dbformat.h"
 #include "rocksdb/slice.h"
 
 namespace rocksdb {
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 764c05bfa..2eb742e24 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -19,6 +19,7 @@
 #include "table/get_context.h"
 #include "table/internal_iterator.h"
 #include "table/iterator_wrapper.h"
+#include "table/multiget_context.h"
 #include "table/table_builder.h"
 #include "table/table_reader.h"
 #include "util/coding.h"
@@ -418,6 +419,66 @@ Status TableCache::Get(const ReadOptions& options,
   return s;
 }
 
+// Batched version of TableCache::Get().
+// TODO: Add support for row cache. As of now, this ignores the row cache
+// and directly looks up in the table files
+Status TableCache::MultiGet(const ReadOptions& options,
+                            const InternalKeyComparator& internal_comparator,
+                            const FileMetaData& file_meta,
+                            const MultiGetContext::Range* mget_range,
+                            const SliceTransform* prefix_extractor,
+                            HistogramImpl* file_read_hist, bool skip_filters,
+                            int level) {
+  auto& fd = file_meta.fd;
+  Status s;
+  TableReader* t = fd.table_reader;
+  Cache::Handle* handle = nullptr;
+  if (s.ok()) {
+    if (t == nullptr) {
+      s = FindTable(
+          env_options_, internal_comparator, fd, &handle, prefix_extractor,
+          options.read_tier == kBlockCacheTier /* no_io */,
+          true /* record_read_stats */, file_read_hist, skip_filters, level);
+      if (s.ok()) {
+        t = GetTableReaderFromHandle(handle);
+        assert(t);
+      }
+    }
+    if (s.ok() && !options.ignore_range_deletions) {
+      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
+          t->NewRangeTombstoneIterator(options));
+      if (range_del_iter != nullptr) {
+        for (auto iter = mget_range->begin(); iter != mget_range->end();
+             ++iter) {
+          const Slice& k = iter->ikey;
+          SequenceNumber* max_covering_tombstone_seq =
+              iter->get_context->max_covering_tombstone_seq();
+          *max_covering_tombstone_seq = std::max(
+              *max_covering_tombstone_seq,
+              range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
+        }
+      }
+    }
+    if (s.ok()) {
+      t->MultiGet(options, mget_range, prefix_extractor, skip_filters);
+    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
+      for (auto iter = mget_range->begin(); iter != mget_range->end(); ++iter) {
+        Status* status = iter->s;
+        if (status->IsIncomplete()) {
+          // Couldn't find Table in cache but treat as kFound if no_io set
+          iter->get_context->MarkKeyMayExist();
+          s = Status::OK();
+        }
+      }
+    }
+  }
+
+  if (handle != nullptr) {
+    ReleaseHandle(handle);
+  }
+  return s;
+}
+
 Status TableCache::GetTableProperties(
     const EnvOptions& env_options,
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
diff --git a/db/table_cache.h b/db/table_cache.h
index 180ebc6bd..1e96dfa1b 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -76,6 +76,14 @@ class TableCache {
              HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
              int level = -1);
 
+  Status MultiGet(const ReadOptions& options,
+                  const InternalKeyComparator& internal_comparator,
+                  const FileMetaData& file_meta,
+                  const MultiGetContext::Range* mget_range,
+                  const SliceTransform* prefix_extractor = nullptr,
+                  HistogramImpl* file_read_hist = nullptr,
+                  bool skip_filters = false, int level = -1);
+
   // Evict any entry for the specified file number
   static void Evict(Cache* cache, uint64_t file_number);
 
diff --git a/db/version_set.cc b/db/version_set.cc
index 4e8026e96..08286c41a 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -16,6 +16,7 @@
 #include <inttypes.h>
 #include <stdio.h>
 #include <algorithm>
+#include <array>
 #include <list>
 #include <map>
 #include <set>
@@ -42,6 +43,7 @@
 #include "table/internal_iterator.h"
 #include "table/merging_iterator.h"
 #include "table/meta_blocks.h"
+#include "table/multiget_context.h"
 #include "table/plain_table_factory.h"
 #include "table/table_reader.h"
 #include "table/two_level_iterator.h"
@@ -345,6 +347,407 @@ class FilePicker {
     return false;
   }
 };
+
+class FilePickerMultiGet {
+ private:
+  struct FilePickerContext;
+
+ public:
+  FilePickerMultiGet(std::vector<FileMetaData*>* files, MultiGetRange* range,
+                     autovector<LevelFilesBrief>* file_levels,
+                     unsigned int num_levels, FileIndexer* file_indexer,
+                     const Comparator* user_comparator,
+                     const InternalKeyComparator* internal_comparator)
+      : num_levels_(num_levels),
+        curr_level_(static_cast<unsigned int>(-1)),
+        returned_file_level_(static_cast<unsigned int>(-1)),
+        hit_file_level_(static_cast<unsigned int>(-1)),
+        range_(range),
+        batch_iter_(range->begin()),
+        batch_iter_prev_(range->begin()),
+        maybe_repeat_key_(false),
+        current_level_range_(*range, range->begin(), range->end()),
+        current_file_range_(*range, range->begin(), range->end()),
+#ifndef NDEBUG
+        files_(files),
+#endif
+        level_files_brief_(file_levels),
+        is_hit_file_last_in_level_(false),
+        curr_file_level_(nullptr),
+        file_indexer_(file_indexer),
+        user_comparator_(user_comparator),
+        internal_comparator_(internal_comparator) {
+#ifdef NDEBUG
+    (void)files;
+#endif
+    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
+      fp_ctx_array_[iter.index()] =
+          FilePickerContext(0, FileIndexer::kLevelMaxIndex);
+    }
+
+    // Setup member variables to search first level.
+    search_ended_ = !PrepareNextLevel();
+    if (!search_ended_) {
+      // REVISIT
+      // Prefetch Level 0 table data to avoid cache miss if possible.
+      // As of now, only PlainTableReader and CuckooTableReader do any
+      // prefetching. This may not be necessary anymore once we implement
+      // batching in those table readers
+      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
+        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
+        if (r) {
+          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
+            r->Prepare(iter->ikey);
+          }
+        }
+      }
+    }
+  }
+
+  int GetCurrentLevel() const { return curr_level_; }
+
+  // Iterates through files in the current level until it finds a file that
+  // contains at least one key from the MultiGet batch
+  bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
+                                  size_t* file_index, FdWithKeyRange** fd,
+                                  bool* is_last_key_in_file) {
+    size_t curr_file_index = *file_index;
+    FdWithKeyRange* f = nullptr;
+    bool file_hit = false;
+    int cmp_largest = -1;
+    if (curr_file_index >= curr_file_level_->num_files) {
+      return false;
+    }
+    // Loops over keys in the MultiGet batch until it finds a file with
+    // at least one of the keys. Then it keeps moving forward until the
+    // last key in the batch that falls in that file
+    while (batch_iter_ != current_level_range_.end() &&
+           (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
+                curr_file_index ||
+            !file_hit)) {
+      struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
+      f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
+      Slice& user_key = batch_iter_->ukey;
+
+      // Do key range filtering of files and/or fractional cascading if:
+      // (1) not all the files are in level 0, or
+      // (2) there are more than 3 current level files
+      // If there are only 3 or less current level files in the system, we
+      // skip the key range filtering. In this case, more likely, the system
+      // is highly tuned to minimize number of tables queried by each query,
+      // so it is unlikely that key range filtering is more efficient than
+      // querying the files.
+      if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
+        // Check if key is within a file's range. If search left bound and
+        // right bound point to the same file, we are sure key falls in
+        // range.
+        assert(curr_level_ == 0 ||
+               fp_ctx.curr_index_in_curr_level ==
+                   fp_ctx.start_index_in_curr_level ||
+               user_comparator_->Compare(user_key,
+                                         ExtractUserKey(f->smallest_key)) <= 0);
+
+        int cmp_smallest = user_comparator_->Compare(
+            user_key, ExtractUserKey(f->smallest_key));
+        if (cmp_smallest >= 0) {
+          cmp_largest = user_comparator_->Compare(
+              user_key, ExtractUserKey(f->largest_key));
+        } else {
+          cmp_largest = -1;
+        }
+
+        // Setup file search bound for the next level based on the
+        // comparison results
+        if (curr_level_ > 0) {
+          file_indexer_->GetNextLevelIndex(
+              curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
+              cmp_largest, &fp_ctx.search_left_bound,
+              &fp_ctx.search_right_bound);
+        }
+        // Key falls out of current file's range
+        if (cmp_smallest < 0 || cmp_largest > 0) {
+          next_file_range->SkipKey(batch_iter_);
+        } else {
+          file_hit = true;
+        }
+      } else {
+        file_hit = true;
+      }
+#ifndef NDEBUG
+      // Sanity check to make sure that the files are correctly sorted
+      if (f != prev_file_) {
+        if (prev_file_) {
+          if (curr_level_ != 0) {
+            int comp_sign = internal_comparator_->Compare(
+                prev_file_->largest_key, f->smallest_key);
+            assert(comp_sign < 0);
+          } else if (fp_ctx.curr_index_in_curr_level > 0) {
+            // level == 0, the current file cannot be newer than the previous
+            // one. The compressed data structure has no seqNo attribute, so
+            // compare the full FileMetaData entries instead
+            assert(!NewestFirstBySeqNo(
+                files_[0][fp_ctx.curr_index_in_curr_level],
+                files_[0][fp_ctx.curr_index_in_curr_level - 1]));
+          }
+        }
+        prev_file_ = f;
+      }
+#endif
+      if (cmp_largest == 0) {
+        // cmp_largest is 0, which means the next key will not be in this
+        // file, so stop looking further. Also don't increment batch_iter_,
+        // as we may have to look for this key in the next file if we don't
+        // find it in this one
+        break;
+      } else {
+        if (curr_level_ == 0) {
+          // We need to look through all files in level 0
+          ++fp_ctx.curr_index_in_curr_level;
+        }
+        ++batch_iter_;
+      }
+      if (!file_hit) {
+        curr_file_index =
+            (batch_iter_ != current_level_range_.end())
+                ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
+                : curr_file_level_->num_files;
+      }
+    }
+
+    *fd = f;
+    *file_index = curr_file_index;
+    *is_last_key_in_file = cmp_largest == 0;
+    return file_hit;
+  }
+
+  FdWithKeyRange* GetNextFile() {
+    while (!search_ended_) {
+      // Start searching next level.
+      if (batch_iter_ == current_level_range_.end()) {
+        search_ended_ = !PrepareNextLevel();
+        continue;
+      } else {
+        if (maybe_repeat_key_) {
+          maybe_repeat_key_ = false;
+          // Check if we found the final value for the last key in the
+          // previous lookup range. If we did, then there's no need to look
+          // any further for that key, so advance batch_iter_. Else, keep
+          // batch_iter_ positioned on that key so we look it up again in
+          // the next file
+          if (current_level_range_.CheckKeyDone(batch_iter_)) {
+            ++batch_iter_;
+          }
+        }
+        // batch_iter_prev_ will become the start key for the next file
+        // lookup
+        batch_iter_prev_ = batch_iter_;
+      }
+
+      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
+                                    current_level_range_.end());
+      size_t curr_file_index =
+          (batch_iter_ != current_level_range_.end())
+              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
+              : curr_file_level_->num_files;
+      FdWithKeyRange* f;
+      bool is_last_key_in_file;
+      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
+                                      &is_last_key_in_file)) {
+        search_ended_ = !PrepareNextLevel();
+      } else {
+        MultiGetRange::Iterator upper_key = batch_iter_;
+        if (is_last_key_in_file) {
+          // Since cmp_largest is 0, batch_iter_ still points to the last key
+          // that falls in this file, instead of the next one. Increment
+          // upper_key so we can set the range properly for SST MultiGet
+          ++upper_key;
+          ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
+          maybe_repeat_key_ = true;
+        }
+        // Set the range for this file
+        current_file_range_ =
+            MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
+        returned_file_level_ = curr_level_;
+        hit_file_level_ = curr_level_;
+        is_hit_file_last_in_level_ =
+            curr_file_index == curr_file_level_->num_files - 1;
+        return f;
+      }
+    }
+
+    // Search ended
+    return nullptr;
+  }
+
+  // getter for current file level
+  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
+  unsigned int GetHitFileLevel() { return hit_file_level_; }
+
+  // Returns true if the most recent "hit file" (i.e., one returned by
+  // GetNextFile()) is at the last index in its level.
+  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
+
+  const MultiGetRange& CurrentFileRange() { return current_file_range_; }
+
+ private:
+  unsigned int num_levels_;
+  unsigned int curr_level_;
+  unsigned int returned_file_level_;
+  unsigned int hit_file_level_;
+
+  struct FilePickerContext {
+    int32_t search_left_bound;
+    int32_t search_right_bound;
+    unsigned int curr_index_in_curr_level;
+    unsigned int start_index_in_curr_level;
+
+    FilePickerContext(int32_t left, int32_t right)
+        : search_left_bound(left), search_right_bound(right) {}
+
+    FilePickerContext() = default;
+  };
+  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
+  MultiGetRange* range_;
+  // Iterator to iterate through the keys in a MultiGet batch, that gets reset
+  // at the beginning of each level. Each call to GetNextFile() will position
+  // batch_iter_ at or right after the last key that was found in the returned
+  // SST file
+  MultiGetRange::Iterator batch_iter_;
+  // An iterator that records the previous position of batch_iter_, i.e. the
+  // last key found in the previous SST file, in order to serve as the start
+  // of the batch key range for the next SST file
+  MultiGetRange::Iterator batch_iter_prev_;
+  bool maybe_repeat_key_;
+  MultiGetRange current_level_range_;
+  MultiGetRange current_file_range_;
+#ifndef NDEBUG
+  std::vector<FileMetaData*>* files_;
+#endif
+  autovector<LevelFilesBrief>* level_files_brief_;
+  bool search_ended_;
+  bool is_hit_file_last_in_level_;
+  LevelFilesBrief* curr_file_level_;
+  FileIndexer* file_indexer_;
+  const Comparator* user_comparator_;
+  const InternalKeyComparator* internal_comparator_;
+#ifndef NDEBUG
+  FdWithKeyRange* prev_file_;
+#endif
+
+  // Setup local variables to search next level.
+  // Returns false if there are no more levels to search.
+  bool PrepareNextLevel() {
+    if (curr_level_ == 0) {
+      MultiGetRange::Iterator mget_iter = current_level_range_.begin();
+      if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
+          curr_file_level_->num_files) {
+#ifndef NDEBUG
+        prev_file_ = nullptr;
+#endif
+        batch_iter_prev_ = current_level_range_.begin();
+        batch_iter_ = current_level_range_.begin();
+        return true;
+      }
+    }
+
+    curr_level_++;
+    // Reset key range to saved value
+    while (curr_level_ < num_levels_) {
+      bool level_contains_keys = false;
+      curr_file_level_ = &(*level_files_brief_)[curr_level_];
+      if (curr_file_level_->num_files == 0) {
+        // When current level is empty, the search bound generated from upper
+        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
+        // also empty.
+
+        for (auto mget_iter = current_level_range_.begin();
+             mget_iter != current_level_range_.end(); ++mget_iter) {
+          struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
+
+          assert(fp_ctx.search_left_bound == 0);
+          assert(fp_ctx.search_right_bound == -1 ||
+                 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
+          // Since current level is empty, it will need to search all files in
+          // the next level
+          fp_ctx.search_left_bound = 0;
+          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+        }
+        // Skip all subsequent empty levels
+        while ((*level_files_brief_)[++curr_level_].num_files == 0) {
+        }
+      }
+
+      // Some files may overlap each other. We find
+      // all files that overlap user_key and process them in order from
+      // newest to oldest. In the context of merge-operator, this can occur at
+      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
+      // are always compacted into a single entry).
+      int32_t start_index = -1;
+      current_level_range_ =
+          MultiGetRange(*range_, range_->begin(), range_->end());
+      for (auto mget_iter = current_level_range_.begin();
+           mget_iter != current_level_range_.end(); ++mget_iter) {
+        struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
+        if (curr_level_ == 0) {
+          // On Level-0, we read through all files to check for overlap.
+          start_index = 0;
+          level_contains_keys = true;
+        } else {
+          // On Level-n (n>=1), files are sorted. Binary search to find the
+          // earliest file whose largest key >= ikey. Search left bound and
+          // right bound are used to narrow the range.
+          if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
+            if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
+              fp_ctx.search_right_bound =
+                  static_cast<int32_t>(curr_file_level_->num_files) - 1;
+            }
+            // `search_right_bound_` is an inclusive upper-bound, but since it
+            // was determined based on user key, it is still possible the
+            // lookup key falls to the right of `search_right_bound_`'s
+            // corresponding file. So, pass a limit one higher, which allows
+            // us to detect this case.
+            Slice& ikey = mget_iter->ikey;
+            start_index = FindFileInRange(
+                *internal_comparator_, *curr_file_level_, ikey,
+                static_cast<uint32_t>(fp_ctx.search_left_bound),
+                static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
+            if (start_index == fp_ctx.search_right_bound + 1) {
+              // `ikey_` comes after `search_right_bound_`. The lookup key
+              // does not exist on this level, so let's skip this level and
+              // do a full binary search on the next level.
+              fp_ctx.search_left_bound = 0;
+              fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+              current_level_range_.SkipKey(mget_iter);
+              continue;
+            } else {
+              level_contains_keys = true;
+            }
+          } else {
+            // search_left_bound > search_right_bound, key does not exist in
+            // this level. Since no comparison is done in this level, it will
+            // need to search all files in the next level.
+            fp_ctx.search_left_bound = 0;
+            fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+            current_level_range_.SkipKey(mget_iter);
+            continue;
+          }
+        }
+        fp_ctx.start_index_in_curr_level = start_index;
+        fp_ctx.curr_index_in_curr_level = start_index;
+      }
+      if (level_contains_keys) {
+#ifndef NDEBUG
+        prev_file_ = nullptr;
+#endif
+        batch_iter_prev_ = current_level_range_.begin();
+        batch_iter_ = current_level_range_.begin();
+        return true;
+      }
+      curr_level_++;
+    }
+    // curr_level_ = num_levels_. So, no more levels to search.
+    return false;
+  }
+};
 }  // anonymous namespace
 
 VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
@@ -1318,6 +1721,163 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
   }
 }
 
+void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
+                       ReadCallback* callback, bool* is_blob) {
+  PinnedIteratorsManager pinned_iters_mgr;
+
+  // Pin blocks that we read to hold merge operands
+  if (merge_operator_) {
+    pinned_iters_mgr.StartPinning();
+  }
+
+  // Even though we know the batch size won't be > MAX_BATCH_SIZE,
+  // use autovector in order to avoid unnecessary construction of GetContext
+  // objects, which is expensive
+  autovector<GetContext, MultiGetContext::MAX_BATCH_SIZE> get_ctx;
+  for (auto iter = range->begin(); iter != range->end(); ++iter) {
+    assert(iter->s->ok() || iter->s->IsMergeInProgress());
+    get_ctx.emplace_back(
+        user_comparator(), merge_operator_, info_log_, db_statistics_,
+        iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
+        iter->value, nullptr, &(iter->merge_context),
+        &iter->max_covering_tombstone_seq, this->env_, &iter->seq,
+        merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
+    iter->get_context = &get_ctx.back();
+  }
+
+  MultiGetRange file_picker_range(*range, range->begin(), range->end());
+  FilePickerMultiGet fp(
+      storage_info_.files_, &file_picker_range,
+      &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
+      &storage_info_.file_indexer_, user_comparator(), internal_comparator());
+  FdWithKeyRange* f = fp.GetNextFile();
+
+  while (f != nullptr) {
+    MultiGetRange file_range = fp.CurrentFileRange();
+    bool timer_enabled =
+        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
+        get_perf_context()->per_level_perf_context_enabled;
+    StopWatchNano timer(env_, timer_enabled /* auto_start */);
+    Status s = table_cache_->MultiGet(
+        read_options, *internal_comparator(), *f->file_metadata, &file_range,
+        mutable_cf_options_.prefix_extractor.get(),
+        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
+        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
+                        fp.IsHitFileLastInLevel()),
+        fp.GetCurrentLevel());
+    // TODO: examine the behavior for corrupted key
+    if (timer_enabled) {
+      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
+                                fp.GetCurrentLevel());
+    }
+    if (!s.ok()) {
+      // TODO: Set status for individual keys appropriately
+      for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+        *iter->s = s;
+        file_range.MarkKeyDone(iter);
+      }
+      return;
+    }
+    uint64_t batch_size = 0;
+    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+      GetContext& get_context = *iter->get_context;
+      Status* status = iter->s;
+
+      if (get_context.sample()) {
+        sample_file_read_inc(f->file_metadata);
+      }
+      batch_size++;
+      // report the counters before returning
+      if (get_context.State() != GetContext::kNotFound &&
+          get_context.State() != GetContext::kMerge &&
+          db_statistics_ != nullptr) {
+        get_context.ReportCounters();
+      } else {
+        if (iter->max_covering_tombstone_seq > 0) {
+          // The remaining files we look at will only contain covered keys, so
+          // we stop here for this key
+          file_picker_range.SkipKey(iter);
+        }
+      }
+      switch (get_context.State()) {
+        case GetContext::kNotFound:
+          // Keep searching in other files
+          break;
+        case GetContext::kMerge:
+          // TODO: update per-level perfcontext user_key_return_count for kMerge
+          break;
+        case GetContext::kFound:
+          if (fp.GetHitFileLevel() == 0) {
+            RecordTick(db_statistics_, GET_HIT_L0);
+          } else if (fp.GetHitFileLevel() == 1) {
+            RecordTick(db_statistics_, GET_HIT_L1);
+          } else if (fp.GetHitFileLevel() >= 2) {
+            RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
+          }
+          PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
+                                    fp.GetHitFileLevel());
+          file_range.MarkKeyDone(iter);
+          continue;
+        case GetContext::kDeleted:
+          // Use empty error message for speed
+          *status = Status::NotFound();
+          file_range.MarkKeyDone(iter);
+          continue;
+        case GetContext::kCorrupt:
+          *status =
+              Status::Corruption("corrupted key for ", iter->lkey->user_key());
+          file_range.MarkKeyDone(iter);
+          continue;
+        case GetContext::kBlobIndex:
+          ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
+          *status = Status::NotSupported(
+              "Encounter unexpected blob index. Please open DB with "
+              "rocksdb::blob_db::BlobDB instead.");
+          file_range.MarkKeyDone(iter);
+          continue;
+      }
+    }
+    RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
+    if (file_picker_range.empty()) {
+      break;
+    }
+    f = fp.GetNextFile();
+  }
+
+  // Process any left over keys
+  for (auto iter = range->begin(); iter != range->end(); ++iter) {
+    GetContext& get_context = *iter->get_context;
+    Status* status = iter->s;
+    Slice user_key = iter->lkey->user_key();
+
+    if (db_statistics_ != nullptr) {
+      get_context.ReportCounters();
+    }
+    if (GetContext::kMerge == get_context.State()) {
+      if (!merge_operator_) {
+        *status = Status::InvalidArgument(
+            "merge_operator is not properly initialized.");
+        range->MarkKeyDone(iter);
+        continue;
+      }
+      // merge_operands are in saver and we hit the beginning of the key
+      // history; do a final merge of nullptr and operands
+      std::string* str_value =
+          iter->value != nullptr ? iter->value->GetSelf() : nullptr;
+      *status = MergeHelper::TimedFullMerge(
+          merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
+          str_value, info_log_, db_statistics_, env_,
+          nullptr /* result_operand */, true);
+      if (LIKELY(iter->value != nullptr)) {
+        iter->value->PinSelf();
+      }
+    } else {
+      range->MarkKeyDone(iter);
+      *status = Status::NotFound();  // Use an empty error message for speed
+    }
+  }
+}
+
 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
   // Reaching the bottom level implies misses at all upper levels, so we'll
   // skip checking the filters when we predict a hit.
diff --git a/db/version_set.h b/db/version_set.h
index 16b7b4347..b2b6cc863 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -44,6 +44,8 @@
 #include "options/db_options.h"
 #include "port/port.h"
 #include "rocksdb/env.h"
+#include "table/get_context.h"
+#include "table/multiget_context.h"
 
 namespace rocksdb {
 
@@ -552,6 +554,7 @@ class VersionStorageInfo {
   void operator=(const VersionStorageInfo&) = delete;
 };
 
+using MultiGetRange = MultiGetContext::Range;
 class Version {
  public:
   // Append to *iters a sequence of iterators that will
@@ -593,6 +596,9 @@ class Version {
            SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr,
            bool* is_blob = nullptr);
 
+  void MultiGet(const ReadOptions&, MultiGetRange* range,
+                ReadCallback* callback = nullptr, bool* is_blob = nullptr);
+
   // Loads some stats information from files. Call without mutex held. It needs
   // to be called before applying the version to the version set.
   void PrepareApply(const MutableCFOptions& mutable_cf_options,
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index b40af20e2..8bec4a56f 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -421,6 +421,46 @@ class DB {
                      keys, values);
   }
 
+  // Overloaded MultiGet API that improves performance by batching operations
+  // in the read path for greater efficiency. Currently, only the block based
+  // table format with full filters is supported. Other table formats such
+  // as plain table, block based table with block based filters and
+  // partitioned indexes will still work, but will not get any performance
+  // benefits.
+  // Parameters -
+  // options - ReadOptions
+  // column_family - ColumnFamilyHandle* that the keys belong to. All the keys
+  //                 passed to the API are restricted to a single column
+  //                 family
+  // num_keys - Number of keys to lookup
+  // keys - Pointer to C style array of key Slices with num_keys elements
+  // values - Pointer to C style array of PinnableSlices with num_keys
+  //          elements
+  // statuses - Pointer to C style array of Status with num_keys elements
+  // sorted_input - If true, it means the input keys are already sorted by key
+  //                order, so the MultiGet() API doesn't have to sort them
+  //                again. If false, the keys will be copied and sorted
+  //                internally by the API - the input array will not be
+  //                modified
+  virtual void MultiGet(const ReadOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const size_t num_keys, const Slice* keys,
+                        PinnableSlice* values, Status* statuses,
+                        const bool /*sorted_input*/ = false) {
+    std::vector<ColumnFamilyHandle*> cf;
+    std::vector<Slice> user_keys;
+    std::vector<Status> status;
+    std::vector<std::string> vals;
+
+    for (size_t i = 0; i < num_keys; ++i) {
+      cf.emplace_back(column_family);
+      user_keys.emplace_back(keys[i]);
+    }
+    status = MultiGet(options, cf, user_keys, &vals);
+    std::copy(status.begin(), status.end(), statuses);
+    for (auto& value : vals) {
+      values->PinSelf(value);
+      values++;
+    }
+  }
+
   // If the key definitely does not exist in the database, then this method
   // returns false, else true. If the caller wants to obtain value when the key
   // is found in memory, a bool for 'value_found' must be passed. 'value_found'
diff --git a/include/rocksdb/filter_policy.h b/include/rocksdb/filter_policy.h
index 5d465b782..d2da38b51 100644
--- a/include/rocksdb/filter_policy.h
+++ b/include/rocksdb/filter_policy.h
@@ -24,6 +24,8 @@
 #include <memory>
 #include <string>
 #include <vector>
+#include "db/dbformat.h"
+#include "table/multiget_context.h"
 
 namespace rocksdb {
 
@@ -64,12 +66,20 @@ class FilterBitsBuilder {
 
 // A class that checks if a key can be in filter
 // It should be initialized by Slice generated by BitsBuilder
+using MultiGetRange = MultiGetContext::Range;
 class FilterBitsReader {
 public:
  virtual ~FilterBitsReader() {}
 
  // Check if the entry matches the bits in filter
  virtual bool MayMatch(const Slice& entry) = 0;
+
+  // Check if an array of entries match the bits in filter
+  virtual void MayMatch(int num_keys, Slice** keys, bool* may_match) {
+    for (int i = 0; i < num_keys; ++i) {
+      may_match[i] = MayMatch(*keys[i]);
+    }
+  }
 };
 
 // We add a new format of filter block called full filter block
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index bad1c87ec..3b2b2e048 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -81,6 +81,8 @@ enum Tickers : uint32_t {
   // exist.
   BLOOM_FILTER_FULL_TRUE_POSITIVE,
 
+  BLOOM_FILTER_MICROS,
+
   // # persistent cache hit
   PERSISTENT_CACHE_HIT,
   // # persistent cache miss
@@ -424,6 +426,7 @@ enum Histograms : uint32_t {
   BLOB_DB_DECOMPRESSION_MICROS,
   // Time spent flushing memtable to disk
   FLUSH_TIME,
+  SST_BATCH_SIZE,
 
   HISTOGRAM_ENUM_MAX,
 };
diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h
index 8fef9b3e8..dd3e09016 100644
--- a/include/rocksdb/utilities/stackable_db.h
+++ b/include/rocksdb/utilities/stackable_db.h
@@ -96,6 +96,15 @@ class StackableDB : public DB {
     return db_->MultiGet(options, column_family, keys, values);
   }
 
+  virtual void MultiGet(const ReadOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const size_t num_keys, const Slice* keys,
+                        PinnableSlice* values, Status* statuses,
+                        const bool sorted_input = false) override {
+    return db_->MultiGet(options, column_family, num_keys, keys,
+                         values, statuses, sorted_input);
+  }
+
   using DB::IngestExternalFile;
   virtual Status IngestExternalFile(
       ColumnFamilyHandle* column_family,
diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc
index adb8cbfed..fe2f2e25a 100644
--- a/monitoring/statistics.cc
+++ b/monitoring/statistics.cc
@@ -45,6 +45,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
     {BLOOM_FILTER_FULL_POSITIVE, "rocksdb.bloom.filter.full.positive"},
     {BLOOM_FILTER_FULL_TRUE_POSITIVE,
      "rocksdb.bloom.filter.full.true.positive"},
+    {BLOOM_FILTER_MICROS, "rocksdb.bloom.filter.micros"},
    {PERSISTENT_CACHE_HIT, "rocksdb.persistent.cache.hit"},
    {PERSISTENT_CACHE_MISS, "rocksdb.persistent.cache.miss"},
    {SIM_BLOCK_CACHE_HIT, "rocksdb.sim.block.cache.hit"},
@@ -228,6 +229,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
    {BLOB_DB_COMPRESSION_MICROS, "rocksdb.blobdb.compression.micros"},
    {BLOB_DB_DECOMPRESSION_MICROS, "rocksdb.blobdb.decompression.micros"},
    {FLUSH_TIME, "rocksdb.db.flush.micros"},
+   {SST_BATCH_SIZE, "rocksdb.sst.batch.size"},
 };
 
 std::shared_ptr<Statistics> CreateDBStatistics() {
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index dc2d4263e..b58960a85 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -39,6 +39,7 @@
 #include "table/get_context.h"
 #include "table/internal_iterator.h"
 #include "table/meta_blocks.h"
+#include "table/multiget_context.h"
 #include "table/partitioned_filter_block.h"
 #include "table/persistent_cache_helper.h"
 #include "table/sst_file_writer_collectors.h"
@@ -2623,6 +2624,29 @@ bool BlockBasedTable::FullFilterKeyMayMatch(
   return may_match;
 }
 
+void BlockBasedTable::FullFilterKeysMayMatch(
+    const ReadOptions& read_options, FilterBlockReader* filter,
+    MultiGetRange* range, const bool no_io,
+    const SliceTransform* prefix_extractor) const {
+  if (filter == nullptr || filter->IsBlockBased()) {
+    return;
+  }
+  if (filter->whole_key_filtering()) {
+    filter->KeysMayMatch(range, prefix_extractor, kNotValid, no_io);
+  } else if (!read_options.total_order_seek && prefix_extractor &&
+             rep_->table_properties->prefix_extractor_name.compare(
+                 prefix_extractor->Name()) == 0) {
+    for (auto iter = range->begin(); iter != range->end(); ++iter) {
+      Slice user_key = iter->lkey->user_key();
+
+      if (!prefix_extractor->InDomain(user_key)) {
+        range->SkipKey(iter);
+      }
+    }
+    filter->PrefixesMayMatch(range, prefix_extractor, kNotValid, false);
+  }
+}
+
 Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
                             GetContext* get_context,
                             const SliceTransform* prefix_extractor,
@@ -2631,17 +2655,22 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
   Status s;
   const bool no_io = read_options.read_tier == kBlockCacheTier;
   CachableEntry<FilterBlockReader> filter_entry;
-  if (!skip_filters) {
-    filter_entry =
-        GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
-                  read_options.read_tier == kBlockCacheTier, get_context);
-  }
-  FilterBlockReader* filter = filter_entry.value;
+  bool may_match;
+  FilterBlockReader* filter = nullptr;
+  {
+    if (!skip_filters) {
+      filter_entry =
+          GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
+                    read_options.read_tier == kBlockCacheTier, get_context);
+    }
+    filter = filter_entry.value;
 
-  // First check the full filter
-  // If full filter not useful, Then go into each block
-  if (!FullFilterKeyMayMatch(read_options, filter, key, no_io,
-                             prefix_extractor)) {
+    // First check the full filter
+    // If full filter not useful, Then go into each block
+    may_match = FullFilterKeyMayMatch(read_options, filter, key, no_io,
+                                      prefix_extractor);
+  }
+  if (!may_match) {
     RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL);
     PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level);
   } else {
@@ -2747,6 +2776,122 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
   return s;
 }
 
+using MultiGetRange = MultiGetContext::Range;
+void BlockBasedTable::MultiGet(const ReadOptions& read_options,
+                               const MultiGetRange* mget_range,
+                               const SliceTransform* prefix_extractor,
+                               bool skip_filters) {
+  const bool no_io = read_options.read_tier == kBlockCacheTier;
+  CachableEntry<FilterBlockReader> filter_entry;
+  FilterBlockReader* filter = nullptr;
+  MultiGetRange sst_file_range(*mget_range, mget_range->begin(),
+                               mget_range->end());
+  {
+    if (!skip_filters) {
+      // TODO: Figure out where the stats should go
+      filter_entry = GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
+                               read_options.read_tier == kBlockCacheTier,
+                               nullptr /*get_context*/);
+    }
+    filter = filter_entry.value;
+
+    // First check the full filter
+    // If full filter not useful, Then go into each block
+    FullFilterKeysMayMatch(read_options, filter, &sst_file_range, no_io,
+                           prefix_extractor);
+  }
+  if (skip_filters || !sst_file_range.empty()) {
+    IndexBlockIter iiter_on_stack;
+    // if prefix_extractor found in block differs from options, disable
+    // BlockPrefixIndex. Only do this check when index_type is kHashSearch.
+ bool need_upper_bound_check = false; + if (rep_->index_type == BlockBasedTableOptions::kHashSearch) { + need_upper_bound_check = PrefixExtractorChanged( + rep_->table_properties.get(), prefix_extractor); + } + auto iiter = NewIndexIterator( + read_options, need_upper_bound_check, &iiter_on_stack, + /* index_entry */ nullptr, sst_file_range.begin()->get_context); + std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter_unique_ptr; + if (iiter != &iiter_on_stack) { + iiter_unique_ptr.reset(iiter); + } + + for (auto miter = sst_file_range.begin(); miter != sst_file_range.end(); + ++miter) { + Status s; + GetContext* get_context = miter->get_context; + const Slice& key = miter->ikey; + bool matched = false; // if such user key matched a key in SST + bool done = false; + for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { + DataBlockIter biter; + NewDataBlockIterator<DataBlockIter>( + rep_, read_options, iiter->value(), &biter, false, + true /* key_includes_seq */, get_context); + + if (read_options.read_tier == kBlockCacheTier && + biter.status().IsIncomplete()) { + // couldn't get block from block_cache + // Update Saver.state to Found because we are only looking for + // whether we can guarantee the key is not there when "no_io" is set + get_context->MarkKeyMayExist(); + break; + } + if (!biter.status().ok()) { + s = biter.status(); + break; + } + + bool may_exist = biter.SeekForGet(key); + if (!may_exist) { + // HashSeek cannot find the key in this block, and the iter is not at + // the end of the block, i.e. the key cannot be in the following + // blocks either. In this case, the seek_key cannot be found, so we + // break from the top level for-loop. + break; + } + + // Call the *saver function on each entry/block until it returns false + for (; biter.Valid(); biter.Next()) { + ParsedInternalKey parsed_key; + if (!ParseInternalKey(biter.key(), &parsed_key)) { + s = Status::Corruption(Slice()); + } + + if (!get_context->SaveValue( + parsed_key, biter.value(), &matched, + biter.IsValuePinned() ?
&biter : nullptr)) { + done = true; + break; + } + } + s = biter.status(); + if (done) { + // Avoid the extra Next which is expensive in two-level indexes + break; + } + } + if (matched && filter != nullptr && !filter->IsBlockBased()) { + RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_TRUE_POSITIVE); + PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1, + rep_->level); + } + if (s.ok()) { + s = iiter->status(); + } + *(miter->s) = s; + } + } + + // If rep_->filter_entry is not set, we should call Release(); otherwise + // don't, since in that case we have a copy in rep_->filter_entry that is + // pinned to the cache and will be released in the destructor + if (!rep_->filter_entry.IsSet()) { + filter_entry.Release(rep_->table_options.block_cache.get()); + } +} + Status BlockBasedTable::Prefetch(const Slice* const begin, const Slice* const end) { auto& comparator = rep_->internal_comparator; diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index f0b5cdb1b..e75a71cda 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -27,6 +27,8 @@ #include "table/block_based_table_factory.h" #include "table/filter_block.h" #include "table/format.h" +#include "table/get_context.h" +#include "table/multiget_context.h" #include "table/persistent_cache_helper.h" #include "table/table_properties_internal.h" #include "table/table_reader.h" @@ -121,6 +123,11 @@ class BlockBasedTable : public TableReader { GetContext* get_context, const SliceTransform* prefix_extractor, bool skip_filters = false) override; + void MultiGet(const ReadOptions& readOptions, + const MultiGetContext::Range* mget_range, + const SliceTransform* prefix_extractor, + bool skip_filters = false) override; + // Pre-fetch the disk blocks that correspond to the key range specified by // (kbegin, kend). The call will return error status in the event of // IO or iteration error.
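// An illustrative sketch (not from the patch) of how a caller might drive
// this new entry point; `sorted_keys`, `num_keys`, `snapshot`, and `table`
// are hypothetical:
//
//   MultiGetContext ctx(sorted_keys, num_keys, snapshot);
//   MultiGetContext::Range range = ctx.GetMultiGetRange();
//   table->MultiGet(read_options, &range, prefix_extractor);
//   // On return, each KeyContext's Status reflects the outcome for its key.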
@@ -355,6 +362,11 @@ class BlockBasedTable : public TableReader { const Slice& user_key, const bool no_io, const SliceTransform* prefix_extractor = nullptr) const; + void FullFilterKeysMayMatch( + const ReadOptions& read_options, FilterBlockReader* filter, + MultiGetRange* range, const bool no_io, + const SliceTransform* prefix_extractor = nullptr) const; + static Status PrefetchTail( RandomAccessFileReader* file, uint64_t file_size, TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all, diff --git a/table/filter_block.h b/table/filter_block.h index a93049547..fe1678908 100644 --- a/table/filter_block.h +++ b/table/filter_block.h @@ -99,6 +99,19 @@ class FilterBlockReader { const bool no_io = false, const Slice* const const_ikey_ptr = nullptr) = 0; + virtual void KeysMayMatch(MultiGetRange* range, + const SliceTransform* prefix_extractor, + uint64_t block_offset = kNotValid, + const bool no_io = false) { + for (auto iter = range->begin(); iter != range->end(); ++iter) { + const Slice ukey = iter->ukey; + const Slice ikey = iter->ikey; + if (!KeyMayMatch(ukey, prefix_extractor, block_offset, no_io, &ikey)) { + range->SkipKey(iter); + } + } + } + /** * no_io and const_ikey_ptr here means the same as in KeyMayMatch */ @@ -108,6 +121,20 @@ class FilterBlockReader { const bool no_io = false, const Slice* const const_ikey_ptr = nullptr) = 0; + virtual void PrefixesMayMatch(MultiGetRange* range, + const SliceTransform* prefix_extractor, + uint64_t block_offset = kNotValid, + const bool no_io = false) { + for (auto iter = range->begin(); iter != range->end(); ++iter) { + const Slice ukey = iter->ukey; + const Slice ikey = iter->ikey; + if (!KeyMayMatch(prefix_extractor->Transform(ukey), prefix_extractor, + block_offset, no_io, &ikey)) { + range->SkipKey(iter); + } + } + } + virtual size_t ApproximateMemoryUsage() const = 0; virtual size_t size() const { return size_; } virtual Statistics* statistics() const { return statistics_; } diff --git a/table/full_filter_block.cc b/table/full_filter_block.cc index a7491a716..34012fd82 100644 --- a/table/full_filter_block.cc +++ b/table/full_filter_block.cc @@ -159,6 +159,59 @@ bool FullFilterBlockReader::MayMatch(const Slice& entry) { return true; // remain the same with block_based filter } +void FullFilterBlockReader::KeysMayMatch( + MultiGetRange* range, const SliceTransform* /*prefix_extractor*/, + uint64_t block_offset, const bool /*no_io*/) { +#ifdef NDEBUG + (void)range; + (void)block_offset; +#endif + assert(block_offset == kNotValid); + if (!whole_key_filtering_) { + // Simply return. Don't skip any key - consider all keys as likely to be + // present + return; + } + MayMatch(range); +} + +void FullFilterBlockReader::PrefixesMayMatch( + MultiGetRange* range, const SliceTransform* /* prefix_extractor */, + uint64_t block_offset, const bool /*no_io*/) { +#ifdef NDEBUG + (void)range; + (void)block_offset; +#endif + assert(block_offset == kNotValid); + MayMatch(range); +} + +void FullFilterBlockReader::MayMatch(MultiGetRange* range) { + if (contents_.size() == 0) { + return; + } + + // We need to use an array instead of autovector for may_match since + // &may_match[0] doesn't work for autovector (compiler error). 
So + // declare both keys and may_match as arrays, which is also slightly less + // expensive compared to autovector + Slice* keys[MultiGetContext::MAX_BATCH_SIZE]; + bool may_match[MultiGetContext::MAX_BATCH_SIZE]; + int num_keys = 0; + for (auto iter = range->begin(); iter != range->end(); ++iter) { + keys[num_keys++] = &iter->ukey; + } + filter_bits_reader_->MayMatch(num_keys, &keys[0], &may_match[0]); + + int i = 0; + for (auto iter = range->begin(); iter != range->end(); ++iter) { + if (!may_match[i]) { + range->SkipKey(iter); + } + ++i; + } +} + size_t FullFilterBlockReader::ApproximateMemoryUsage() const { size_t usage = block_contents_.usable_size(); #ifdef ROCKSDB_MALLOC_USABLE_SIZE diff --git a/table/full_filter_block.h b/table/full_filter_block.h index e4384c91a..f97952a7c 100644 --- a/table/full_filter_block.h +++ b/table/full_filter_block.h @@ -107,6 +107,16 @@ class FullFilterBlockReader : public FilterBlockReader { const Slice& prefix, const SliceTransform* prefix_extractor, uint64_t block_offset = kNotValid, const bool no_io = false, const Slice* const const_ikey_ptr = nullptr) override; + + virtual void KeysMayMatch(MultiGetRange* range, + const SliceTransform* prefix_extractor, + uint64_t block_offset = kNotValid, + const bool no_io = false) override; + + virtual void PrefixesMayMatch(MultiGetRange* range, + const SliceTransform* prefix_extractor, + uint64_t block_offset = kNotValid, + const bool no_io = false) override; virtual size_t ApproximateMemoryUsage() const override; virtual bool RangeMayExist(const Slice* iterate_upper_bound, const Slice& user_key, const SliceTransform* prefix_extractor, @@ -124,6 +134,7 @@ class FullFilterBlockReader : public FilterBlockReader { // No copying allowed FullFilterBlockReader(const FullFilterBlockReader&); bool MayMatch(const Slice& entry); + void MayMatch(MultiGetRange* range); void operator=(const FullFilterBlockReader&); bool IsFilterCompatible(const Slice* iterate_upper_bound, const Slice& prefix, const Comparator* comparator); diff --git a/table/full_filter_block_test.cc b/table/full_filter_block_test.cc index f01ae52bf..3abae979a 100644 --- a/table/full_filter_block_test.cc +++ b/table/full_filter_block_test.cc @@ -45,6 +45,8 @@ class TestFilterBitsReader : public FilterBitsReader { explicit TestFilterBitsReader(const Slice& contents) : data_(contents.data()), len_(static_cast<uint32_t>(contents.size())) {} + // Silence compiler warning about overloaded virtual + using FilterBitsReader::MayMatch; bool MayMatch(const Slice& entry) override { uint32_t h = Hash(entry.data(), entry.size(), 1); for (size_t i = 0; i + 4 <= len_; i += 4) { diff --git a/table/get_context.h b/table/get_context.h index d7d0e9808..7ed316f0e 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -5,6 +5,7 @@ #pragma once #include +#include #include "db/merge_context.h" #include "db/read_callback.h" #include "rocksdb/env.h" @@ -61,6 +62,8 @@ class GetContext { PinnedIteratorsManager* _pinned_iters_mgr = nullptr, ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); + GetContext() = default; + void MarkKeyMayExist(); // Records this key, value, and any meta-data (such as sequence number and diff --git a/table/multiget_context.h b/table/multiget_context.h new file mode 100644 index 000000000..d3a8d0946 --- /dev/null +++ b/table/multiget_context.h @@ -0,0 +1,249 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include +#include +#include "db/lookup_key.h" +#include "db/merge_context.h" +#include "rocksdb/env.h" +#include "rocksdb/statistics.h" +#include "rocksdb/types.h" +#include "util/autovector.h" + +namespace rocksdb { +class GetContext; + +struct KeyContext { + const Slice* key; + LookupKey* lkey; + Slice ukey; + Slice ikey; + Status* s; + MergeContext merge_context; + SequenceNumber max_covering_tombstone_seq; + bool key_exists; + SequenceNumber seq; + void* cb_arg; + PinnableSlice* value; + GetContext* get_context; + + KeyContext(const Slice& user_key, PinnableSlice* val, Status* stat) + : key(&user_key), + lkey(nullptr), + s(stat), + max_covering_tombstone_seq(0), + key_exists(false), + seq(0), + cb_arg(nullptr), + value(val), + get_context(nullptr) {} + + KeyContext() = default; +}; + +// The MultiGetContext class is a container for the sorted list of keys that +// we need to look up in a batch. Its main purpose is to make batch execution +// easier by allowing various stages of the MultiGet lookups to operate on +// subsets of keys, potentially non-contiguous. In order to accomplish this, +// it defines the following classes - +// +// MultiGetContext::Range +// MultiGetContext::Range::Iterator +// MultiGetContext::Range::IteratorWrapper +// +// Here is an example of how this can be used - +// +// { +// MultiGetContext ctx(...); +// MultiGetContext::Range range = ctx.GetMultiGetRange(); +// +// // Iterate to determine some subset of the keys +// MultiGetContext::Range::Iterator start = range.begin(); +// MultiGetContext::Range::Iterator end = ...; +// +// // Make a new range with a subset of keys +// MultiGetContext::Range subrange(range, start, end); +// +// // Define an auxiliary vector, if needed, to hold additional data for +// // each key +// std::array<Foo, MAX_BATCH_SIZE> aux; +// +// // Iterate over the subrange and the auxiliary vector simultaneously +// MultiGetContext::Range::Iterator iter = subrange.begin(); +// for (; iter != subrange.end(); ++iter) { +// KeyContext& key = *iter; +// Foo& aux_key = aux[iter.index()]; +// ... +// } +// } +class MultiGetContext { + public: + // Limit the number of keys in a batch to this number. Benchmarks show that + // there is negligible benefit for batches exceeding this.
Keeping this < 64 (so the skip and + // value bit masks fit in a single uint64_t) simplifies iteration, as well + // as reduces the number of stack allocations that need to be performed + static const int MAX_BATCH_SIZE = 32; + + MultiGetContext(KeyContext** sorted_keys, size_t num_keys, + SequenceNumber snapshot) + : sorted_keys_(sorted_keys), + num_keys_(num_keys), + value_mask_(0), + lookup_key_ptr_(reinterpret_cast<LookupKey*>(lookup_key_stack_buf)) { + int index = 0; + + if (num_keys > MAX_LOOKUP_KEYS_ON_STACK) { + lookup_key_heap_buf.reset(new char[sizeof(LookupKey) * num_keys]); + lookup_key_ptr_ = reinterpret_cast<LookupKey*>( + lookup_key_heap_buf.get()); + } + + for (size_t iter = 0; iter != num_keys_; ++iter) { + sorted_keys_[iter]->lkey = new (&lookup_key_ptr_[index]) + LookupKey(*sorted_keys_[iter]->key, snapshot); + sorted_keys_[iter]->ukey = sorted_keys_[iter]->lkey->user_key(); + sorted_keys_[iter]->ikey = sorted_keys_[iter]->lkey->internal_key(); + index++; + } + } + + ~MultiGetContext() { + for (size_t i = 0; i < num_keys_; ++i) { + lookup_key_ptr_[i].~LookupKey(); + } + } + + private: + static const int MAX_LOOKUP_KEYS_ON_STACK = 16; + alignas(alignof(LookupKey)) + char lookup_key_stack_buf[sizeof(LookupKey) * MAX_LOOKUP_KEYS_ON_STACK]; + KeyContext** sorted_keys_; + size_t num_keys_; + uint64_t value_mask_; + std::unique_ptr<char[]> lookup_key_heap_buf; + LookupKey* lookup_key_ptr_; + + public: + // MultiGetContext::Range - Specifies a range of keys, by start and end index, + // from the parent MultiGetContext. Each range contains a bit vector that + // indicates whether the corresponding keys need to be processed or skipped. + // A Range object can be copy constructed, and the new object inherits the + // original Range's bit vector. This is useful for progressively skipping + // keys as the lookup goes through various stages. For example, when looking + // up keys in the same SST file, a Range is created excluding keys not + // belonging to that file. A new Range is then copy constructed and individual + // keys are skipped based on bloom filter lookup. + class Range { + public: + // MultiGetContext::Range::Iterator - A forward iterator that iterates over + // non-skipped keys in a Range, excluding keys whose final value has already been + // found.
The latter is tracked by MultiGetContext::value_mask_ + class Iterator { + public: + // -- iterator traits + typedef Iterator self_type; + typedef KeyContext value_type; + typedef KeyContext& reference; + typedef KeyContext* pointer; + typedef int difference_type; + typedef std::forward_iterator_tag iterator_category; + + Iterator(const Range* range, size_t idx) + : range_(range), ctx_(range->ctx_), index_(idx) { + while (index_ < range_->end_ && + (1ull << index_) & + (range_->ctx_->value_mask_ | range_->skip_mask_)) + index_++; + } + + Iterator(const Iterator&) = default; + Iterator& operator=(const Iterator&) = default; + + Iterator& operator++() { + while (++index_ < range_->end_ && + (1ull << index_) & + (range_->ctx_->value_mask_ | range_->skip_mask_)) + ; + return *this; + } + + bool operator==(Iterator other) const { + assert(range_->ctx_ == other.range_->ctx_); + return index_ == other.index_; + } + + bool operator!=(Iterator other) const { + assert(range_->ctx_ == other.range_->ctx_); + return index_ != other.index_; + } + + KeyContext& operator*() { + assert(index_ < range_->end_ && index_ >= range_->start_); + return *(ctx_->sorted_keys_[index_]); + } + + KeyContext* operator->() { + assert(index_ < range_->end_ && index_ >= range_->start_); + return ctx_->sorted_keys_[index_]; + } + + size_t index() { return index_; } + + private: + friend Range; + const Range* range_; + const MultiGetContext* ctx_; + size_t index_; + }; + + Range(const Range& mget_range, + const Iterator& first, + const Iterator& last) { + ctx_ = mget_range.ctx_; + start_ = first.index_; + end_ = last.index_; + skip_mask_ = mget_range.skip_mask_; + } + + Range() = default; + + Iterator begin() const { return Iterator(this, start_); } + + Iterator end() const { return Iterator(this, end_); } + + bool empty() { + return (((1ull << end_) - 1) & ~((1ull << start_) - 1) & + ~(ctx_->value_mask_ | skip_mask_)) == 0; + } + + void SkipKey(const Iterator& iter) { skip_mask_ |= 1ull << iter.index_; } + + // Update the value_mask_ in MultiGetContext so it's + // immediately reflected in all the Range Iterators + void MarkKeyDone(Iterator& iter) { + ctx_->value_mask_ |= (1ull << iter.index_); + } + + bool CheckKeyDone(Iterator& iter) { + return ctx_->value_mask_ & (1ull << iter.index_); + } + + private: + friend MultiGetContext; + MultiGetContext* ctx_; + size_t start_; + size_t end_; + uint64_t skip_mask_; + + Range(MultiGetContext* ctx, size_t num_keys) + : ctx_(ctx), start_(0), end_(num_keys), skip_mask_(0) {} + }; + + // Return the initial range that encompasses all the keys in the batch + Range GetMultiGetRange() { return Range(this, num_keys_); } +}; + +} // namespace rocksdb diff --git a/table/table_reader.h b/table/table_reader.h index a5f15e130..bd6071d9c 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -11,7 +11,9 @@ #include #include "db/range_tombstone_fragmenter.h" #include "rocksdb/slice_transform.h" +#include "table/get_context.h" #include "table/internal_iterator.h" +#include "table/multiget_context.h" namespace rocksdb { @@ -22,6 +24,7 @@ class Arena; struct ReadOptions; struct TableProperties; class GetContext; +class MultiGetContext; // A Table is a sorted map from strings to strings. Tables are // immutable and persistent.
A Table may be safely accessed from @@ -86,6 +89,16 @@ class TableReader { const SliceTransform* prefix_extractor, bool skip_filters = false) = 0; + virtual void MultiGet(const ReadOptions& readOptions, + const MultiGetContext::Range* mget_range, + const SliceTransform* prefix_extractor, + bool skip_filters = false) { + for (auto iter = mget_range->begin(); iter != mget_range->end(); ++iter) { + *iter->s = Get(readOptions, iter->ikey, iter->get_context, + prefix_extractor, skip_filters); + } + } + // Prefetch data corresponding to a give range of keys // Typically this functionality is required for table implementations that // persists the data on a non volatile storage medium like disk/SSD diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 0cb4e0eb2..88222664b 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1114,6 +1114,9 @@ DEFINE_uint64(stats_persist_period_sec, DEFINE_uint64(stats_history_buffer_size, rocksdb::Options().stats_history_buffer_size, "Max number of stats snapshots to keep in memory"); +DEFINE_int64(multiread_stride, 0, + "Stride length for the keys in a MultiGet batch"); +DEFINE_bool(multiread_batched, false, "Use the new MultiGet API"); enum RepFactory { kSkipList, @@ -2700,6 +2703,10 @@ void VerifyDBFromDB(std::string& truth_db_name) { } else if (name == "readreverse") { method = &Benchmark::ReadReverse; } else if (name == "readrandom") { + if (FLAGS_multiread_stride) { + fprintf(stderr, "entries_per_batch = %" PRIi64 "\n", + entries_per_batch_); + } method = &Benchmark::ReadRandom; } else if (name == "readrandomfast") { method = &Benchmark::ReadRandomFast; @@ -4531,6 +4538,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { int64_t read = 0; int64_t found = 0; int64_t bytes = 0; + int num_keys = 0; + int64_t key_rand = GetRandomKey(&thread->rand); ReadOptions options(FLAGS_verify_checksum, true); std::unique_ptr<const char[]> key_guard; Slice key = AllocateKey(&key_guard); @@ -4542,8 +4551,21 @@ void VerifyDBFromDB(std::string& truth_db_name) { // We use same key_rand as seed for key and column family so that we can // deterministically find the cfh corresponding to a particular key, as it // is done in DoWrite method.
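// Illustrative walk-through (not from the patch): with entries_per_batch_ = 4
// and --multiread_stride=16, the block below issues reads at keys base,
// base+16, base+32, base+48, picks a fresh random base every fourth read, and
// clamps the base so that the last key of a batch stays below FLAGS_num.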
- int64_t key_rand = GetRandomKey(&thread->rand); GenerateKeyFromInt(key_rand, FLAGS_num, &key); + if (entries_per_batch_ > 1 && FLAGS_multiread_stride) { + if (++num_keys == entries_per_batch_) { + num_keys = 0; + key_rand = GetRandomKey(&thread->rand); + if ((key_rand + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >= + FLAGS_num) { + key_rand = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride; + } + } else { + key_rand += FLAGS_multiread_stride; + } + } else { + key_rand = GetRandomKey(&thread->rand); + } read++; Status s; if (FLAGS_num_column_families > 1) { @@ -4595,6 +4617,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { std::vector<Slice> keys; std::vector<std::unique_ptr<const char[]> > key_guards; std::vector<std::string> values(entries_per_batch_); + PinnableSlice* pin_values = new PinnableSlice[entries_per_batch_]; + std::vector<Status> stat_list(entries_per_batch_); while (static_cast<int64_t>(keys.size()) < entries_per_batch_) { key_guards.push_back(std::unique_ptr<const char[]>()); keys.push_back(AllocateKey(&key_guards.back())); @@ -4603,21 +4627,52 @@ Duration duration(FLAGS_duration, reads_); while (!duration.Done(1)) { DB* db = SelectDB(thread); - for (int64_t i = 0; i < entries_per_batch_; ++i) { - GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]); + if (FLAGS_multiread_stride) { + int64_t key = GetRandomKey(&thread->rand); + if ((key + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >= + (int64_t)FLAGS_num) { + key = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride; + } + for (int64_t i = 0; i < entries_per_batch_; ++i) { + GenerateKeyFromInt(key, FLAGS_num, &keys[i]); + key += FLAGS_multiread_stride; + } + } else { + for (int64_t i = 0; i < entries_per_batch_; ++i) { + GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]); + } } - std::vector<Status> statuses = db->MultiGet(options, keys, &values); - assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_); - - read += entries_per_batch_; - num_multireads++; - for (int64_t i = 0; i < entries_per_batch_; ++i) { - if (statuses[i].ok()) { - ++found; - } else if (!statuses[i].IsNotFound()) { - fprintf(stderr, "MultiGet returned an error: %s\n", - statuses[i].ToString().c_str()); - abort(); + if (!FLAGS_multiread_batched) { + std::vector<Status> statuses = db->MultiGet(options, keys, &values); + assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_); + + read += entries_per_batch_; + num_multireads++; + for (int64_t i = 0; i < entries_per_batch_; ++i) { + if (statuses[i].ok()) { + ++found; + } else if (!statuses[i].IsNotFound()) { + fprintf(stderr, "MultiGet returned an error: %s\n", + statuses[i].ToString().c_str()); + abort(); + } + } + } else { + db->MultiGet(options, db->DefaultColumnFamily(), keys.size(), + keys.data(), pin_values, stat_list.data()); + + read += entries_per_batch_; + num_multireads++; + for (int64_t i = 0; i < entries_per_batch_; ++i) { + if (stat_list[i].ok()) { + ++found; + } else if (!stat_list[i].IsNotFound()) { + fprintf(stderr, "MultiGet returned an error: %s\n", + stat_list[i].ToString().c_str()); + abort(); + } + stat_list[i] = Status::OK(); + pin_values[i].Reset(); } } if (thread->shared->read_rate_limiter.get() != nullptr && diff --git a/util/bloom.cc b/util/bloom.cc index 9c05f7107..1da4f2aa4 100644 --- a/util/bloom.cc +++ b/util/bloom.cc @@ -181,8 +181,37 @@ class FullFilterBitsReader : public FilterBitsReader { // Other Error params, including a broken filter, regarded as match if (num_probes_ == 0 || num_lines_ == 0) return true; uint32_t hash = BloomHash(entry); -
return HashMayMatch(hash, Slice(data_, data_len_), - num_probes_, num_lines_); + uint32_t bit_offset; + FilterPrepare(hash, Slice(data_, data_len_), num_lines_, &bit_offset); + return HashMayMatch(hash, Slice(data_, data_len_), num_probes_, bit_offset); + } + + virtual void MayMatch(int num_keys, Slice** keys, bool* may_match) override { + if (data_len_ <= 5) { // remain the same with original filter + for (int i = 0; i < num_keys; ++i) { + may_match[i] = false; + } + return; + } + for (int i = 0; i < num_keys; ++i) { + may_match[i] = true; + } + // Other Error params, including a broken filter, regarded as match + if (num_probes_ == 0 || num_lines_ == 0) return; + uint32_t hashes[MultiGetContext::MAX_BATCH_SIZE]; + uint32_t bit_offsets[MultiGetContext::MAX_BATCH_SIZE]; + for (int i = 0; i < num_keys; ++i) { + hashes[i] = BloomHash(*keys[i]); + FilterPrepare(hashes[i], Slice(data_, data_len_), num_lines_, + &bit_offsets[i]); + } + + for (int i = 0; i < num_keys; ++i) { + if (!HashMayMatch(hashes[i], Slice(data_, data_len_), num_probes_, + bit_offsets[i])) { + may_match[i] = false; + } + } } private: @@ -211,7 +240,10 @@ class FullFilterBitsReader : public FilterBitsReader { // Before calling this function, need to ensure the input meta data // is valid. bool HashMayMatch(const uint32_t& hash, const Slice& filter, - const size_t& num_probes, const uint32_t& num_lines); + const size_t& num_probes, const uint32_t& bit_offset); + + void FilterPrepare(const uint32_t& hash, const Slice& filter, + const uint32_t& num_lines, uint32_t* bit_offset); // No Copy allowed FullFilterBitsReader(const FullFilterBitsReader&); @@ -232,29 +264,44 @@ void FullFilterBitsReader::GetFilterMeta(const Slice& filter, *num_lines = DecodeFixed32(filter.data() + len - 4); } +void FullFilterBitsReader::FilterPrepare(const uint32_t& hash, + const Slice& filter, + const uint32_t& num_lines, + uint32_t* bit_offset) { + uint32_t len = static_cast<uint32_t>(filter.size()); + if (len <= 5) return; // remain the same with original filter + + // It is ensured the params are valid before calling it + assert(num_lines != 0 && (len - 5) % num_lines == 0); + + uint32_t h = hash; + // Left shift by an extra 3 to convert bytes to bits + uint32_t b = (h % num_lines) << (log2_cache_line_size_ + 3); + PREFETCH(&filter.data()[b / 8], 0 /* rw */, 1 /* locality */); + PREFETCH(&filter.data()[b / 8 + (1 << log2_cache_line_size_) - 1], + 0 /* rw */, 1 /* locality */); + *bit_offset = b; +} + bool FullFilterBitsReader::HashMayMatch(const uint32_t& hash, - const Slice& filter, const size_t& num_probes, - const uint32_t& num_lines) { + const Slice& filter, + const size_t& num_probes, + const uint32_t& bit_offset) { uint32_t len = static_cast<uint32_t>(filter.size()); if (len <= 5) return false; // remain the same with original filter // It is ensured the params are valid before calling it assert(num_probes != 0); - assert(num_lines != 0 && (len - 5) % num_lines == 0); const char* data = filter.data(); uint32_t h = hash; const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits - // Left shift by an extra 3 to convert bytes to bits - uint32_t b = (h % num_lines) << (log2_cache_line_size_ + 3); - PREFETCH(&data[b / 8], 0 /* rw */, 1 /* locality */); - PREFETCH(&data[b / 8 + (1 << log2_cache_line_size_) - 1], 0 /* rw */, - 1 /* locality */); for (uint32_t i = 0; i < num_probes; ++i) { // Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized // to a simple and operation by compiler.
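// (A worked example, assuming 64-byte cache lines: log2_cache_line_size_ is
// 6, so the mask below is (1 << (6 + 3)) - 1 = 511; every probe therefore
// stays within the 512 bits of the single cache line that FilterPrepare()
// selected and prefetched.)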
- const uint32_t bitpos = b + (h & ((1 << (log2_cache_line_size_ + 3)) - 1)); + const uint32_t bitpos = + bit_offset + (h & ((1 << (log2_cache_line_size_ + 3)) - 1)); if (((data[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { return false; } diff --git a/util/fault_injection_test_env.h b/util/fault_injection_test_env.h index 7c5a080f7..a39e5b71e 100644 --- a/util/fault_injection_test_env.h +++ b/util/fault_injection_test_env.h @@ -123,6 +123,8 @@ class FaultInjectionTestEnv : public EnvWrapper { virtual Status RenameFile(const std::string& s, const std::string& t) override; +// Undef to eliminate clash on Windows +#undef GetFreeSpace virtual Status GetFreeSpace(const std::string& path, uint64_t* disk_free) override { if (!IsFilesystemActive() && error_ == Status::NoSpace()) { diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 3beb74fc9..8a7f7e084 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -166,6 +166,16 @@ class BlobDB : public StackableDB { } return MultiGet(options, keys, values); } + virtual void MultiGet(const ReadOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const size_t num_keys, const Slice* /*keys*/, + PinnableSlice* /*values*/, Status* statuses, + const bool /*sorted_input*/ = false) override { + for (size_t i = 0; i < num_keys; ++i) { + statuses[i] = Status::NotSupported( + "Blob DB doesn't support batched MultiGet"); + } + } using rocksdb::StackableDB::SingleDelete; virtual Status SingleDelete(const WriteOptions& /*wopts*/,
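// A minimal usage sketch of the new batched API (not part of the patch);
// `db` is assumed to be an open rocksdb::DB and the keys are hypothetical:
//
//   constexpr size_t kNumKeys = 3;
//   Slice keys[kNumKeys] = {"k1", "k2", "k3"};
//   PinnableSlice values[kNumKeys];
//   Status statuses[kNumKeys];
//   db->MultiGet(ReadOptions(), db->DefaultColumnFamily(), kNumKeys, keys,
//                values, statuses, /*sorted_input=*/false);
//   for (size_t i = 0; i < kNumKeys; ++i) {
//     if (statuses[i].ok()) {
//       // values[i] references the stored value without a copy
//     }
//   }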