diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index c93a5e436..5d016f5f0 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -994,7 +994,7 @@ TEST_F(DBBasicTest, MultiGetMultiCF) {
     keys.push_back("cf" + std::to_string(i) + "_key");
   }

-  values = MultiGet(cfs, keys);
+  values = MultiGet(cfs, keys, nullptr);
   ASSERT_EQ(values.size(), 8);
   for (unsigned int j = 0; j < values.size(); ++j) {
     ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val2");
@@ -1054,7 +1054,7 @@ TEST_F(DBBasicTest, MultiGetMultiCFMutex) {
     keys.push_back("cf" + std::to_string(i) + "_key");
   }

-  values = MultiGet(cfs, keys);
+  values = MultiGet(cfs, keys, nullptr);
   ASSERT_TRUE(last_try);
   ASSERT_EQ(values.size(), 8);
   for (unsigned int j = 0; j < values.size(); ++j) {
@@ -1128,6 +1128,79 @@ TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) {
   }
 }

+TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  Reopen(options);
+  int num_keys = 0;
+
+  for (int i = 0; i < 128; ++i) {
+    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+  MoveFilesToLevel(2);
+
+  for (int i = 0; i < 128; i += 3) {
+    ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+  MoveFilesToLevel(1);
+
+  for (int i = 0; i < 128; i += 5) {
+    ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+
+  for (int i = 0; i < 128; i += 9) {
+    ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
+  }
+
+  std::vector<std::string> keys;
+  std::vector<std::string> values;
+
+  for (int i = 64; i < 80; ++i) {
+    keys.push_back("key_" + std::to_string(i));
+  }
+
+  values = MultiGet(keys, nullptr);
+  ASSERT_EQ(values.size(), 16);
+  for (unsigned int j = 0; j < values.size(); ++j) {
+    int key = j + 64;
+    if (key % 9 == 0) {
+      ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
+    } else if (key % 5 == 0) {
+      ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
+    } else if (key % 3 == 0) {
+      ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
+    } else {
+      ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
+    }
+  }
+}
 }  // namespace rocksdb

 int main(int argc, char** argv) {
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 8180564c2..97cd2ac79 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -76,7 +76,9 @@
 #include "rocksdb/write_buffer_manager.h"
 #include "table/block.h"
 #include "table/block_based_table_factory.h"
+#include "table/get_context.h"
 #include "table/merging_iterator.h"
+#include "table/multiget_context.h"
 #include "table/table_builder.h"
 #include "table/two_level_iterator.h"
 #include "tools/sst_dump_tool_imp.h"
@@ -1659,6 +1661,189 @@ std::vector<Status> DBImpl::MultiGet(
   return stat_list;
 }

+// Order keys by key contents; all the keys in a batch belong to the same
+// column family
+struct CompareKeyContext {
+  inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
+    const Comparator* comparator = cfd->user_comparator();
+    int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
+    if (cmp < 0) {
+      return true;
+    }
+    return false;
+  }
+  const ColumnFamilyData* cfd;
+};
+
+void DBImpl::MultiGet(const ReadOptions& read_options,
+                      ColumnFamilyHandle* column_family, const size_t num_keys,
+                      const Slice* keys, PinnableSlice* values,
+                      Status* statuses, const bool sorted_input) {
+  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
+  for (size_t i = 0; i < num_keys; ++i) {
+    key_context.emplace_back(keys[i], &values[i], &statuses[i]);
+  }
+
+  MultiGetImpl(read_options, column_family, key_context, sorted_input, nullptr,
+               nullptr);
+}
+
+void DBImpl::MultiGetImpl(
+    const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+    autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE>& key_context,
+    bool sorted_input, ReadCallback* callback, bool* is_blob_index) {
+  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
+  StopWatch sw(env_, stats_, DB_MULTIGET);
+  size_t num_keys = key_context.size();
+
+  PERF_TIMER_GUARD(get_snapshot_time);
+
+  ColumnFamilyHandleImpl* cfh =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  ColumnFamilyData* cfd = cfh->cfd();
+
+  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
+  sorted_keys.resize(num_keys);
+  {
+    size_t index = 0;
+    for (KeyContext& key : key_context) {
+#ifndef NDEBUG
+      // Only check ordering when the caller claims the input is sorted
+      if (index > 0 && sorted_input) {
+        KeyContext* lhs = &key_context[index - 1];
+        KeyContext* rhs = &key_context[index];
+        const Comparator* comparator = cfd->user_comparator();
+        int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
+        assert(cmp <= 0);
+      }
+#endif
+
+      sorted_keys[index] = &key;
+      index++;
+    }
+    if (!sorted_input) {
+      CompareKeyContext sort_comparator;
+      sort_comparator.cfd = cfd;
+      std::sort(sorted_keys.begin(), sorted_keys.begin() + index,
+                sort_comparator);
+    }
+  }
+
+  // Keep track of bytes that we read for statistics-recording later
+  PERF_TIMER_STOP(get_snapshot_time);
+
+  // Acquire SuperVersion
+  SuperVersion* super_version = GetAndRefSuperVersion(cfd);
+  SequenceNumber snapshot;
+  if (read_options.snapshot != nullptr) {
+    // Note: In WritePrepared txns this is not necessary but not harmful
+    // either. Because prep_seq > snapshot => commit_seq > snapshot so if
+    // a snapshot is specified we should be fine with skipping seq numbers
+    // that are greater than that.
+    //
+    // In WriteUnprepared, we cannot set snapshot in the lookup key because we
+    // may skip uncommitted data that should be visible to the transaction for
+    // reading own writes.
+    snapshot =
+        reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
+    if (callback) {
+      snapshot = std::max(snapshot, callback->max_visible_seq());
+    }
+  } else {
+    // Since we get and reference the super version before getting
+    // the snapshot number, without a mutex protection, it is possible
+    // that a memtable switch happened in the middle and not all the
+    // data for this snapshot is available. But it will contain all
+    // the data available in the super version we have, which is also
+    // a valid snapshot to read from.
+    // We shouldn't get snapshot before finding and referencing the super
+    // version because a flush happening in between may compact away data for
+    // the snapshot, but the snapshot is earlier than the data overwriting it,
+    // so users may see wrong results.
+    snapshot = last_seq_same_as_publish_seq_
+                   ? versions_->LastSequence()
+                   : versions_->LastPublishedSequence();
+  }
+
+  // For each of the given keys, apply the entire "get" process as follows:
+  // First look in the memtable, then in the immutable memtable (if any).
+  // s is both in/out. When in, s could either be OK or MergeInProgress.
+  // merge_operands will contain the sequence of merges in the latter case.
+  size_t keys_left = num_keys;
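+  // The batch is processed in chunks of at most
+  // MultiGetContext::MAX_BATCH_SIZE keys at a time, which bounds the per-key
+  // state (lookup keys, file picker context, etc.) that MultiGetContext and
+  // the file picker keep in fixed-size storage.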
+  while (keys_left) {
+    size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
+                            ? MultiGetContext::MAX_BATCH_SIZE
+                            : keys_left;
+    MultiGetContext ctx(&sorted_keys[num_keys - keys_left], batch_size,
+                        snapshot);
+    MultiGetRange range = ctx.GetMultiGetRange();
+    bool lookup_current = false;
+
+    keys_left -= batch_size;
+    for (auto mget_iter = range.begin(); mget_iter != range.end();
+         ++mget_iter) {
+      MergeContext& merge_context = mget_iter->merge_context;
+      merge_context.Clear();
+      Status& s = *mget_iter->s;
+      PinnableSlice* value = mget_iter->value;
+      s = Status::OK();
+
+      bool skip_memtable =
+          (read_options.read_tier == kPersistedTier &&
+           has_unpersisted_data_.load(std::memory_order_relaxed));
+      bool done = false;
+      if (!skip_memtable) {
+        if (super_version->mem->Get(*(mget_iter->lkey), value->GetSelf(), &s,
+                                    &merge_context,
+                                    &mget_iter->max_covering_tombstone_seq,
+                                    read_options, callback, is_blob_index)) {
+          done = true;
+          value->PinSelf();
+          RecordTick(stats_, MEMTABLE_HIT);
+        } else if (super_version->imm->Get(
+                       *(mget_iter->lkey), value->GetSelf(), &s,
+                       &merge_context, &mget_iter->max_covering_tombstone_seq,
+                       read_options, callback, is_blob_index)) {
+          done = true;
+          value->PinSelf();
+          RecordTick(stats_, MEMTABLE_HIT);
+        }
+      }
+      if (done) {
+        range.MarkKeyDone(mget_iter);
+      } else {
+        RecordTick(stats_, MEMTABLE_MISS);
+        lookup_current = true;
+      }
+    }
+
+    if (lookup_current) {
+      PERF_TIMER_GUARD(get_from_output_files_time);
+      super_version->current->MultiGet(read_options, &range, callback,
+                                       is_blob_index);
+    }
+  }
+
+  // Post processing (decrement reference counts and record statistics)
+  PERF_TIMER_GUARD(get_post_process_time);
+  size_t num_found = 0;
+  uint64_t bytes_read = 0;
+  for (KeyContext& key : key_context) {
+    if (key.s->ok()) {
+      bytes_read += key.value->size();
+      num_found++;
+    }
+  }
+
+  ReturnAndCleanupSuperVersion(cfd, super_version);
+
+  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
+  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
+  RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
+  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
+  RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
+  PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
+  PERF_TIMER_STOP(get_post_process_time);
+}
+
 Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                   const std::string& column_family,
                                   ColumnFamilyHandle** handle) {
diff --git a/db/db_impl.h b/db/db_impl.h
index e834e0fbe..8b434c118 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -124,6 +124,25 @@ class DBImpl : public DB {
       const std::vector<Slice>& keys,
       std::vector<std::string>* values) override;

+  // This MultiGet is a batched version, which may be faster than calling Get
+  // multiple times, especially if the keys have some spatial locality that
+  // enables them to be queried in the same SST files/set of files. The larger
+  // the batch size, the more scope for batching and performance improvement.
+  // The values and statuses parameters are arrays with number of elements
+  // equal to keys.size().
+  // This allows the storage for those to be allocated
+  // by the caller on the stack for small batches
+  virtual void MultiGet(const ReadOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const size_t num_keys, const Slice* keys,
+                        PinnableSlice* values, Status* statuses,
+                        const bool sorted_input = false) override;
+
+  void MultiGetImpl(
+      const ReadOptions& options, ColumnFamilyHandle* column_family,
+      autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE>& key_context,
+      bool sorted_input, ReadCallback* callback = nullptr,
+      bool* is_blob_index = nullptr);
+
   virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                     const std::string& column_family,
                                     ColumnFamilyHandle** handle) override;
diff --git a/db/db_test.cc b/db/db_test.cc
index 60e66c6c3..8a112e48f 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -2153,6 +2153,7 @@ struct MTState {
 struct MTThread {
   MTState* state;
   int id;
+  bool multiget_batched;
 };

 static void MTThreadBody(void* arg) {
@@ -2201,8 +2202,28 @@ static void MTThreadBody(void* arg) {
       // same)
       std::vector<Slice> keys(kColumnFamilies, Slice(keybuf));
       std::vector<std::string> values;
-      std::vector<Status> statuses =
-          db->MultiGet(ReadOptions(), t->state->test->handles_, keys, &values);
+      std::vector<Status> statuses;
+      if (!t->multiget_batched) {
+        statuses = db->MultiGet(ReadOptions(), t->state->test->handles_, keys,
+                                &values);
+      } else {
+        std::vector<PinnableSlice> pin_values(keys.size());
+        statuses.resize(keys.size());
+        const Snapshot* snapshot = db->GetSnapshot();
+        ReadOptions ro;
+        ro.snapshot = snapshot;
+        for (int cf = 0; cf < kColumnFamilies; ++cf) {
+          db->MultiGet(ro, t->state->test->handles_[cf], 1, &keys[cf],
+                       &pin_values[cf], &statuses[cf]);
+        }
+        db->ReleaseSnapshot(snapshot);
+        values.resize(keys.size());
+        for (int cf = 0; cf < kColumnFamilies; ++cf) {
+          if (statuses[cf].ok()) {
+            values[cf].assign(pin_values[cf].data(), pin_values[cf].size());
+          }
+        }
+      }

       Status s = statuses[0];
       // all statuses have to be the same
       for (size_t i = 1; i < statuses.size(); ++i) {
@@ -2244,10 +2265,13 @@ static void MTThreadBody(void* arg) {
 }
 }  // namespace

-class MultiThreadedDBTest : public DBTest,
-                            public ::testing::WithParamInterface<int> {
+class MultiThreadedDBTest
+    : public DBTest,
+      public ::testing::WithParamInterface<std::tuple<int, bool>> {
  public:
-  void SetUp() override { option_config_ = GetParam(); }
+  void SetUp() override {
+    std::tie(option_config_, multiget_batched_) = GetParam();
+  }

   static std::vector<int> GenerateOptionConfigs() {
     std::vector<int> optionConfigs;
@@ -2256,6 +2280,8 @@ class MultiThreadedDBTest : public DBTest,
     }
     return optionConfigs;
   }
+
+  bool multiget_batched_;
 };

 TEST_P(MultiThreadedDBTest, MultiThreaded) {
@@ -2282,6 +2308,7 @@ TEST_P(MultiThreadedDBTest, MultiThreaded) {
   for (int id = 0; id < kNumThreads; id++) {
     thread[id].state = &mt;
     thread[id].id = id;
+    thread[id].multiget_batched = multiget_batched_;
     env_->StartThread(MTThreadBody, &thread[id]);
   }

@@ -2299,7 +2326,9 @@ TEST_P(MultiThreadedDBTest, MultiThreaded) {

 INSTANTIATE_TEST_CASE_P(
     MultiThreaded, MultiThreadedDBTest,
-    ::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()));
+    ::testing::Combine(
+        ::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()),
+        ::testing::Bool()));
 #endif  // ROCKSDB_LITE

 // Group commit test:
diff --git a/db/db_test_util.cc b/db/db_test_util.cc
index 9ef82fd2e..bee6b81d5 100644
--- a/db/db_test_util.cc
+++ b/db/db_test_util.cc
@@ -780,6 +780,34 @@ std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
   return result;
 }

+std::vector<std::string> DBTestBase::MultiGet(const std::vector<std::string>& k,
+                                              const Snapshot* snapshot) {
+  ReadOptions options;
+  options.verify_checksums = true;
+  options.snapshot = snapshot;
+  std::vector<Slice> keys;
+  std::vector<std::string> result;
+  std::vector<Status> statuses(k.size());
+  std::vector<PinnableSlice> pin_values(k.size());
+
+  for (unsigned int i = 0; i < k.size(); ++i) {
+    keys.push_back(k[i]);
+  }
+  db_->MultiGet(options, dbfull()->DefaultColumnFamily(), keys.size(),
+                keys.data(), pin_values.data(), statuses.data());
+  result.resize(k.size());
+  for (auto iter = result.begin(); iter != result.end(); ++iter) {
+    iter->assign(pin_values[iter - result.begin()].data(),
+                 pin_values[iter - result.begin()].size());
+  }
+  for (unsigned int i = 0; i < statuses.size(); ++i) {
+    if (statuses[i].IsNotFound()) {
+      result[i] = "NOT_FOUND";
+    }
+  }
+  return result;
+}
+
 Status DBTestBase::Get(const std::string& k, PinnableSlice* v) {
   ReadOptions options;
   options.verify_checksums = true;
diff --git a/db/db_test_util.h b/db/db_test_util.h
index 1ba1f0a96..36f3813c9 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -842,6 +842,9 @@ class DBTestBase : public testing::Test {
       const std::vector<std::string>& k,
       const Snapshot* snapshot = nullptr);

+  std::vector<std::string> MultiGet(const std::vector<std::string>& k,
+                                    const Snapshot* snapshot = nullptr);
+
   uint64_t GetNumSnapshots();

   uint64_t GetTimeOldestSnapshots();
diff --git a/db/dbformat.h b/db/dbformat.h
index 7a5ddc1ad..c850adcb0 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -9,8 +9,11 @@
 #pragma once
 #include <stdio.h>
+#include <memory>
 #include <string>
 #include <utility>
+#include "db/lookup_key.h"
+#include "db/merge_context.h"
 #include "monitoring/perf_context_imp.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/db.h"
@@ -292,52 +295,6 @@ inline uint64_t GetInternalKeySeqno(const Slice& internal_key) {
   return num >> 8;
 }

-// A helper class useful for DBImpl::Get()
-class LookupKey {
- public:
-  // Initialize *this for looking up user_key at a snapshot with
-  // the specified sequence number.
-  LookupKey(const Slice& _user_key, SequenceNumber sequence);
-
-  ~LookupKey();
-
-  // Return a key suitable for lookup in a MemTable.
-  Slice memtable_key() const {
-    return Slice(start_, static_cast<size_t>(end_ - start_));
-  }
-
-  // Return an internal key (suitable for passing to an internal iterator)
-  Slice internal_key() const {
-    return Slice(kstart_, static_cast<size_t>(end_ - kstart_));
-  }
-
-  // Return the user key
-  Slice user_key() const {
-    return Slice(kstart_, static_cast<size_t>(end_ - kstart_ - 8));
-  }
-
- private:
-  // We construct a char array of the form:
-  //    klength  varint32               <-- start_
-  //    userkey  char[klength]          <-- kstart_
-  //    tag      uint64
-  //                                    <-- end_
-  // The array is a suitable MemTable key.
-  // The suffix starting with "userkey" can be used as an InternalKey.
-  const char* start_;
-  const char* kstart_;
-  const char* end_;
-  char space_[200];  // Avoid allocation for short keys
-
-  // No copying allowed
-  LookupKey(const LookupKey&);
-  void operator=(const LookupKey&);
-};
-
-inline LookupKey::~LookupKey() {
-  if (start_ != space_) delete[] start_;
-}
-
 class IterKey {
  public:
   IterKey()
diff --git a/db/lookup_key.h b/db/lookup_key.h
new file mode 100644
index 000000000..ddf4ff0e9
--- /dev/null
+++ b/db/lookup_key.h
@@ -0,0 +1,65 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <string>
+#include <utility>
+#include "rocksdb/db.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/types.h"
+
+namespace rocksdb {
+
+// A helper class useful for DBImpl::Get()
+class LookupKey {
+ public:
+  // Initialize *this for looking up user_key at a snapshot with
+  // the specified sequence number.
+  LookupKey(const Slice& _user_key, SequenceNumber sequence);
+
+  ~LookupKey();
+
+  // Return a key suitable for lookup in a MemTable.
+  Slice memtable_key() const {
+    return Slice(start_, static_cast<size_t>(end_ - start_));
+  }
+
+  // Return an internal key (suitable for passing to an internal iterator)
+  Slice internal_key() const {
+    return Slice(kstart_, static_cast<size_t>(end_ - kstart_));
+  }
+
+  // Return the user key
+  Slice user_key() const {
+    return Slice(kstart_, static_cast<size_t>(end_ - kstart_ - 8));
+  }
+
+ private:
+  // We construct a char array of the form:
+  //    klength  varint32               <-- start_
+  //    userkey  char[klength]          <-- kstart_
+  //    tag      uint64
+  //                                    <-- end_
+  // The array is a suitable MemTable key.
+  // The suffix starting with "userkey" can be used as an InternalKey.
+  const char* start_;
+  const char* kstart_;
+  const char* end_;
+  char space_[200];  // Avoid allocation for short keys
+
+  // No copying allowed
+  LookupKey(const LookupKey&);
+  void operator=(const LookupKey&);
+};
+
+inline LookupKey::~LookupKey() {
+  if (start_ != space_) delete[] start_;
+}
+
+}  // namespace rocksdb
diff --git a/db/merge_context.h b/db/merge_context.h
index fd06441f7..884682eec 100644
--- a/db/merge_context.h
+++ b/db/merge_context.h
@@ -4,9 +4,10 @@
 // (found in the LICENSE.Apache file in the root directory).
 //
 #pragma once
+#include <algorithm>
+#include <memory>
 #include <string>
 #include <vector>
-#include "db/dbformat.h"
 #include "rocksdb/slice.h"

 namespace rocksdb {
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 764c05bfa..2eb742e24 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -19,6 +19,7 @@
 #include "table/get_context.h"
 #include "table/internal_iterator.h"
 #include "table/iterator_wrapper.h"
+#include "table/multiget_context.h"
 #include "table/table_builder.h"
 #include "table/table_reader.h"
 #include "util/coding.h"
@@ -418,6 +419,66 @@ Status TableCache::Get(const ReadOptions& options,
   return s;
 }

+// Batched version of TableCache::Get.
+// TODO: Add support for row cache. As of now, this ignores the row cache
+// and directly looks up in the table files
+Status TableCache::MultiGet(const ReadOptions& options,
+                            const InternalKeyComparator& internal_comparator,
+                            const FileMetaData& file_meta,
+                            const MultiGetContext::Range* mget_range,
+                            const SliceTransform* prefix_extractor,
+                            HistogramImpl* file_read_hist, bool skip_filters,
+                            int level) {
+  auto& fd = file_meta.fd;
+  Status s;
+  TableReader* t = fd.table_reader;
+  Cache::Handle* handle = nullptr;
+  if (s.ok()) {
+    if (t == nullptr) {
+      s = FindTable(
+          env_options_, internal_comparator, fd, &handle, prefix_extractor,
+          options.read_tier == kBlockCacheTier /* no_io */,
+          true /* record_read_stats */, file_read_hist, skip_filters, level);
+      if (s.ok()) {
+        t = GetTableReaderFromHandle(handle);
+        assert(t);
+      }
+    }
+    if (s.ok() && !options.ignore_range_deletions) {
+      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
+          t->NewRangeTombstoneIterator(options));
+      if (range_del_iter != nullptr) {
+        for (auto iter = mget_range->begin(); iter != mget_range->end();
+             ++iter) {
+          const Slice& k = iter->ikey;
+          SequenceNumber* max_covering_tombstone_seq =
+              iter->get_context->max_covering_tombstone_seq();
+          *max_covering_tombstone_seq = std::max(
+              *max_covering_tombstone_seq,
+              range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
+        }
+      }
+    }
+    if (s.ok()) {
+      t->MultiGet(options, mget_range, prefix_extractor, skip_filters);
+    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
+      for (auto iter = mget_range->begin(); iter != mget_range->end(); ++iter) {
+        Status* status = iter->s;
+        if (status->IsIncomplete()) {
+          // Couldn't find Table in cache but treat as kFound if no_io set
+          iter->get_context->MarkKeyMayExist();
+          s = Status::OK();
+        }
+      }
+    }
+  }
+
+  if (handle != nullptr) {
+    ReleaseHandle(handle);
+  }
+  return s;
+}
+
 Status TableCache::GetTableProperties(
     const EnvOptions& env_options,
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
diff --git a/db/table_cache.h b/db/table_cache.h
index 180ebc6bd..1e96dfa1b 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -76,6 +76,14 @@ class TableCache {
              HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
              int level = -1);

+  Status MultiGet(const ReadOptions& options,
+                  const InternalKeyComparator& internal_comparator,
+                  const FileMetaData& file_meta,
+                  const MultiGetContext::Range* mget_range,
+                  const SliceTransform* prefix_extractor = nullptr,
+                  HistogramImpl* file_read_hist = nullptr,
+                  bool skip_filters = false, int level = -1);
+
   // Evict any entry for the specified file number
   static void Evict(Cache* cache, uint64_t file_number);

diff --git a/db/version_set.cc b/db/version_set.cc
index 4e8026e96..08286c41a 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -16,6 +16,7 @@
 #include <inttypes.h>
 #include <stdio.h>
 #include <algorithm>
+#include <array>
 #include <map>
 #include <set>
 #include <string>
@@ -42,6 +43,7 @@
 #include "table/internal_iterator.h"
 #include "table/merging_iterator.h"
 #include "table/meta_blocks.h"
+#include "table/multiget_context.h"
 #include "table/plain_table_factory.h"
 #include "table/table_reader.h"
 #include "table/two_level_iterator.h"
@@ -345,6 +347,407 @@ class FilePicker {
     return false;
   }
 };
+
+class FilePickerMultiGet {
+ private:
+  struct FilePickerContext;
+
+ public:
+  FilePickerMultiGet(std::vector<FileMetaData*>* files, MultiGetRange* range,
+                     autovector<LevelFilesBrief>* file_levels,
+                     unsigned int num_levels, FileIndexer* file_indexer,
+                     const Comparator* user_comparator,
+                     const InternalKeyComparator* internal_comparator)
+      : num_levels_(num_levels),
+        curr_level_(static_cast<unsigned int>(-1)),
+        returned_file_level_(static_cast<unsigned int>(-1)),
+        hit_file_level_(static_cast<unsigned int>(-1)),
+        range_(range),
+        batch_iter_(range->begin()),
+        batch_iter_prev_(range->begin()),
+        maybe_repeat_key_(false),
+        current_level_range_(*range, range->begin(), range->end()),
+        current_file_range_(*range, range->begin(), range->end()),
+#ifndef NDEBUG
+        files_(files),
+#endif
+        level_files_brief_(file_levels),
+        is_hit_file_last_in_level_(false),
+        curr_file_level_(nullptr),
+        file_indexer_(file_indexer),
+        user_comparator_(user_comparator),
+        internal_comparator_(internal_comparator) {
+#ifdef NDEBUG
+    (void)files;
+#endif
+    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
+      fp_ctx_array_[iter.index()] =
+          FilePickerContext(0, FileIndexer::kLevelMaxIndex);
+    }
+
+    // Setup member variables to search first level.
+    search_ended_ = !PrepareNextLevel();
+    if (!search_ended_) {
+      // REVISIT
+      // Prefetch Level 0 table data to avoid cache miss if possible.
+      // As of now, only PlainTableReader and CuckooTableReader do any
+      // prefetching. This may not be necessary anymore once we implement
+      // batching in those table readers
+      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
+        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
+        if (r) {
+          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
+            r->Prepare(iter->ikey);
+          }
+        }
+      }
+    }
+  }
+
+  int GetCurrentLevel() const { return curr_level_; }
+
+  // Iterates through files in the current level until it finds a file that
+  // contains at least one key from the MultiGet batch
+  bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
+                                  size_t* file_index, FdWithKeyRange** fd,
+                                  bool* is_last_key_in_file) {
+    size_t curr_file_index = *file_index;
+    FdWithKeyRange* f = nullptr;
+    bool file_hit = false;
+    int cmp_largest = -1;
+    if (curr_file_index >= curr_file_level_->num_files) {
+      return false;
+    }
+    // Loops over keys in the MultiGet batch until it finds a file with
+    // at least one of the keys. Then it keeps moving forward until the
+    // last key in the batch that falls in that file
+    while (batch_iter_ != current_level_range_.end() &&
+           (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
+                curr_file_index ||
+            !file_hit)) {
+      struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
+      f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
+      Slice& user_key = batch_iter_->ukey;
+
+      // Do key range filtering of files and/or fractional cascading if:
+      // (1) not all the files are in level 0, or
+      // (2) there are more than 3 current level files
+      // If there are only 3 or less current level files in the system, we
+      // skip the key range filtering. In this case, more likely, the system
+      // is highly tuned to minimize number of tables queried by each query,
+      // so it is unlikely that key range filtering is more efficient than
+      // querying the files.
+      if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
+        // Check if key is within a file's range. If search left bound and
+        // right bound point to the same file, we are sure key falls in
+        // range.
+        assert(curr_level_ == 0 ||
+               fp_ctx.curr_index_in_curr_level ==
+                   fp_ctx.start_index_in_curr_level ||
+               user_comparator_->Compare(user_key,
+                                         ExtractUserKey(f->smallest_key)) <= 0);
+
+        int cmp_smallest = user_comparator_->Compare(
+            user_key, ExtractUserKey(f->smallest_key));
+        if (cmp_smallest >= 0) {
+          cmp_largest = user_comparator_->Compare(
+              user_key, ExtractUserKey(f->largest_key));
+        } else {
+          cmp_largest = -1;
+        }
+
+        // Setup file search bound for the next level based on the
+        // comparison results
+        if (curr_level_ > 0) {
+          file_indexer_->GetNextLevelIndex(
+              curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
+              cmp_largest, &fp_ctx.search_left_bound,
+              &fp_ctx.search_right_bound);
+        }
+        // Key falls out of current file's range
+        if (cmp_smallest < 0 || cmp_largest > 0) {
+          next_file_range->SkipKey(batch_iter_);
+        } else {
+          file_hit = true;
+        }
+      } else {
+        file_hit = true;
+      }
+#ifndef NDEBUG
+      // Sanity check to make sure that the files are correctly sorted
+      if (f != prev_file_) {
+        if (prev_file_) {
+          if (curr_level_ != 0) {
+            int comp_sign = internal_comparator_->Compare(
+                prev_file_->largest_key, f->smallest_key);
+            assert(comp_sign < 0);
+          } else if (fp_ctx.curr_index_in_curr_level > 0) {
+            // level == 0; the current file cannot be newer than the previous
+            // one. The compressed data structure has no seqNo attribute
+            assert(!NewestFirstBySeqNo(
+                files_[0][fp_ctx.curr_index_in_curr_level],
+                files_[0][fp_ctx.curr_index_in_curr_level - 1]));
+          }
+        }
+        prev_file_ = f;
+      }
+#endif
+      if (cmp_largest == 0) {
+        // cmp_largest is 0, which means the next key will not be in this
+        // file, so stop looking further. Also don't increment batch_iter_,
+        // as we may have to look for this key in the next file if we don't
+        // find it in this one
+        break;
+      } else {
+        if (curr_level_ == 0) {
+          // We need to look through all files in level 0
+          ++fp_ctx.curr_index_in_curr_level;
+        }
+        ++batch_iter_;
+      }
+      if (!file_hit) {
+        curr_file_index =
+            (batch_iter_ != current_level_range_.end())
+                ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
+                : curr_file_level_->num_files;
+      }
+    }
+
+    *fd = f;
+    *file_index = curr_file_index;
+    *is_last_key_in_file = cmp_largest == 0;
+    return file_hit;
+  }
+
+  FdWithKeyRange* GetNextFile() {
+    while (!search_ended_) {
+      // Start searching next level.
+      if (batch_iter_ == current_level_range_.end()) {
+        search_ended_ = !PrepareNextLevel();
+        continue;
+      } else {
+        if (maybe_repeat_key_) {
+          maybe_repeat_key_ = false;
+          // Check if we found the final value for the last key in the
+          // previous lookup range. If we did, then there's no need to look
+          // any further for that key, so advance batch_iter_. Else, keep
+          // batch_iter_ positioned on that key so we look it up again in
+          // the next file
+          if (current_level_range_.CheckKeyDone(batch_iter_)) {
+            ++batch_iter_;
+          }
+        }
+        // batch_iter_prev_ will become the start key for the next file
+        // lookup
+        batch_iter_prev_ = batch_iter_;
+      }
+
+      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
+                                    current_level_range_.end());
+      size_t curr_file_index =
+          (batch_iter_ != current_level_range_.end())
+              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
+              : curr_file_level_->num_files;
+      FdWithKeyRange* f;
+      bool is_last_key_in_file;
+      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
+                                      &is_last_key_in_file)) {
+        search_ended_ = !PrepareNextLevel();
+      } else {
+        MultiGetRange::Iterator upper_key = batch_iter_;
+        if (is_last_key_in_file) {
+          // Since cmp_largest is 0, batch_iter_ still points to the last key
+          // that falls in this file, instead of the next one. Increment
+          // upper_key so we can set the range properly for SST MultiGet
+          ++upper_key;
+          ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
+          maybe_repeat_key_ = true;
+        }
+        // Set the range for this file
+        current_file_range_ =
+            MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
+        returned_file_level_ = curr_level_;
+        hit_file_level_ = curr_level_;
+        is_hit_file_last_in_level_ =
+            curr_file_index == curr_file_level_->num_files - 1;
+        return f;
+      }
+    }
+
+    // Search ended
+    return nullptr;
+  }
+
+  // getter for current file level
+  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
+  unsigned int GetHitFileLevel() { return hit_file_level_; }
+
+  // Returns true if the most recent "hit file" (i.e., one returned by
+  // GetNextFile()) is at the last index in its level.
+  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
+
+  const MultiGetRange& CurrentFileRange() { return current_file_range_; }
+
+ private:
+  unsigned int num_levels_;
+  unsigned int curr_level_;
+  unsigned int returned_file_level_;
+  unsigned int hit_file_level_;
+
+  struct FilePickerContext {
+    int32_t search_left_bound;
+    int32_t search_right_bound;
+    unsigned int curr_index_in_curr_level;
+    unsigned int start_index_in_curr_level;
+
+    FilePickerContext(int32_t left, int32_t right)
+        : search_left_bound(left), search_right_bound(right) {}
+
+    FilePickerContext() = default;
+  };
+  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
+  MultiGetRange* range_;
+  // Iterator to iterate through the keys in a MultiGet batch, that gets reset
+  // at the beginning of each level. Each call to GetNextFile() will position
+  // batch_iter_ at or right after the last key that was found in the returned
+  // SST file
+  MultiGetRange::Iterator batch_iter_;
+  // An iterator that records the previous position of batch_iter_, i.e. the
+  // last key found in the previous SST file, in order to serve as the start of
+  // the batch key range for the next SST file
+  MultiGetRange::Iterator batch_iter_prev_;
+  bool maybe_repeat_key_;
+  MultiGetRange current_level_range_;
+  MultiGetRange current_file_range_;
+#ifndef NDEBUG
+  std::vector<FileMetaData*>* files_;
+#endif
+  autovector<LevelFilesBrief>* level_files_brief_;
+  bool search_ended_;
+  bool is_hit_file_last_in_level_;
+  LevelFilesBrief* curr_file_level_;
+  FileIndexer* file_indexer_;
+  const Comparator* user_comparator_;
+  const InternalKeyComparator* internal_comparator_;
+#ifndef NDEBUG
+  FdWithKeyRange* prev_file_;
+#endif
+
+  // Setup local variables to search next level.
+  // Returns false if there are no more levels to search.
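+  // If some L0 files remain unsearched, this restarts the batch iterators
+  // over level 0; otherwise it advances curr_level_ to the next level that
+  // has at least one file possibly overlapping a key still in the batch,
+  // narrowing each key's file search bounds using the FileIndexer results
+  // recorded while scanning the previous level, and repositions batch_iter_
+  // and batch_iter_prev_ to the beginning of the surviving key range.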
+  bool PrepareNextLevel() {
+    if (curr_level_ == 0) {
+      MultiGetRange::Iterator mget_iter = current_level_range_.begin();
+      if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
+          curr_file_level_->num_files) {
+#ifndef NDEBUG
+        prev_file_ = nullptr;
+#endif
+        batch_iter_prev_ = current_level_range_.begin();
+        batch_iter_ = current_level_range_.begin();
+        return true;
+      }
+    }
+
+    curr_level_++;
+    // Reset key range to saved value
+    while (curr_level_ < num_levels_) {
+      bool level_contains_keys = false;
+      curr_file_level_ = &(*level_files_brief_)[curr_level_];
+      if (curr_file_level_->num_files == 0) {
+        // When current level is empty, the search bound generated from upper
+        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
+        // also empty.
+
+        for (auto mget_iter = current_level_range_.begin();
+             mget_iter != current_level_range_.end(); ++mget_iter) {
+          struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
+
+          assert(fp_ctx.search_left_bound == 0);
+          assert(fp_ctx.search_right_bound == -1 ||
+                 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
+          // Since current level is empty, it will need to search all files in
+          // the next level
+          fp_ctx.search_left_bound = 0;
+          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+        }
+        // Skip all subsequent empty levels
+        while ((*level_files_brief_)[++curr_level_].num_files == 0) {
+        }
+      }
+
+      // Some files may overlap each other. We find
+      // all files that overlap user_key and process them in order from
+      // newest to oldest. In the context of merge-operator, this can occur at
+      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
+      // are always compacted into a single entry).
+      int32_t start_index = -1;
+      current_level_range_ =
+          MultiGetRange(*range_, range_->begin(), range_->end());
+      for (auto mget_iter = current_level_range_.begin();
+           mget_iter != current_level_range_.end(); ++mget_iter) {
+        struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
+        if (curr_level_ == 0) {
+          // On Level-0, we read through all files to check for overlap.
+          start_index = 0;
+          level_contains_keys = true;
+        } else {
+          // On Level-n (n>=1), files are sorted. Binary search to find the
+          // earliest file whose largest key >= ikey. Search left bound and
+          // right bound are used to narrow the range.
+          if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
+            if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
+              fp_ctx.search_right_bound =
+                  static_cast<int32_t>(curr_file_level_->num_files) - 1;
+            }
+            // `search_right_bound` is an inclusive upper bound, but since it
+            // was determined based on user key, it is still possible the
+            // lookup key falls to the right of `search_right_bound`'s
+            // corresponding file. So, pass a limit one higher, which allows
+            // us to detect this case.
+            Slice& ikey = mget_iter->ikey;
+            start_index = FindFileInRange(
+                *internal_comparator_, *curr_file_level_, ikey,
+                static_cast<uint32_t>(fp_ctx.search_left_bound),
+                static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
+            if (start_index == fp_ctx.search_right_bound + 1) {
+              // `ikey` comes after `search_right_bound`. The lookup key does
+              // not exist on this level, so let's skip this level and do a
+              // full binary search on the next level.
+              fp_ctx.search_left_bound = 0;
+              fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+              current_level_range_.SkipKey(mget_iter);
+              continue;
+            } else {
+              level_contains_keys = true;
+            }
+          } else {
+            // search_left_bound > search_right_bound, key does not exist in
+            // this level. Since no comparison is done in this level, it will
+            // need to search all files in the next level.
+            fp_ctx.search_left_bound = 0;
+            fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
+            current_level_range_.SkipKey(mget_iter);
+            continue;
+          }
+        }
+        fp_ctx.start_index_in_curr_level = start_index;
+        fp_ctx.curr_index_in_curr_level = start_index;
+      }
+      if (level_contains_keys) {
+#ifndef NDEBUG
+        prev_file_ = nullptr;
+#endif
+        batch_iter_prev_ = current_level_range_.begin();
+        batch_iter_ = current_level_range_.begin();
+        return true;
+      }
+      curr_level_++;
+    }
+    // curr_level_ = num_levels_. So, no more levels to search.
+    return false;
+  }
+};
 }  // anonymous namespace

 VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
@@ -1318,6 +1721,163 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
   }
 }

+void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
+                       ReadCallback* callback, bool* is_blob) {
+  PinnedIteratorsManager pinned_iters_mgr;
+
+  // Pin blocks that we read to hold merge operands
+  if (merge_operator_) {
+    pinned_iters_mgr.StartPinning();
+  }
+
+  // Even though we know the batch size won't be > MAX_BATCH_SIZE,
+  // use autovector in order to avoid unnecessary construction of GetContext
+  // objects, which is expensive
+  autovector<GetContext, 16> get_ctx;
+  for (auto iter = range->begin(); iter != range->end(); ++iter) {
+    assert(iter->s->ok() || iter->s->IsMergeInProgress());
+    get_ctx.emplace_back(
+        user_comparator(), merge_operator_, info_log_, db_statistics_,
+        iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
+        iter->value, nullptr, &(iter->merge_context),
+        &iter->max_covering_tombstone_seq, this->env_, &iter->seq,
+        merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
+    iter->get_context = &get_ctx.back();
+  }
+
+  MultiGetRange file_picker_range(*range, range->begin(), range->end());
+  FilePickerMultiGet fp(
+      storage_info_.files_, &file_picker_range,
+      &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
+      &storage_info_.file_indexer_, user_comparator(), internal_comparator());
+  FdWithKeyRange* f = fp.GetNextFile();
+
+  while (f != nullptr) {
+    MultiGetRange file_range = fp.CurrentFileRange();
+    bool timer_enabled =
+        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
+        get_perf_context()->per_level_perf_context_enabled;
+    StopWatchNano timer(env_, timer_enabled /* auto_start */);
+    Status s = table_cache_->MultiGet(
+        read_options, *internal_comparator(), *f->file_metadata, &file_range,
+        mutable_cf_options_.prefix_extractor.get(),
+        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
+        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
+                        fp.IsHitFileLastInLevel()),
+        fp.GetCurrentLevel());
+    // TODO: examine the behavior for corrupted key
+    if (timer_enabled) {
+      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
+                                fp.GetCurrentLevel());
+    }
+    if (!s.ok()) {
+      // TODO: Set status for individual keys appropriately
+      for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+        *iter->s = s;
+        file_range.MarkKeyDone(iter);
+      }
+      return;
+    }
+    uint64_t batch_size = 0;
+    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
+      GetContext& get_context = *iter->get_context;
+      Status* status = iter->s;
+
+      if (get_context.sample()) {
+        sample_file_read_inc(f->file_metadata);
+      }
+      batch_size++;
+      // report the counters before returning
+      if (get_context.State() != GetContext::kNotFound &&
+          get_context.State() != GetContext::kMerge &&
+          db_statistics_ != nullptr) {
+        get_context.ReportCounters();
+      } else {
+        if (iter->max_covering_tombstone_seq > 0) {
+          // The remaining files we look at will only contain covered keys, so
+          // we stop here for this key
+          file_picker_range.SkipKey(iter);
+        }
+      }
+      switch (get_context.State()) {
+        case GetContext::kNotFound:
+          // Keep searching in other files
+          break;
+        case GetContext::kMerge:
+          // TODO: update per-level perfcontext user_key_return_count for
+          // kMerge
+          break;
+        case GetContext::kFound:
+          if (fp.GetHitFileLevel() == 0) {
+            RecordTick(db_statistics_, GET_HIT_L0);
+          } else if (fp.GetHitFileLevel() == 1) {
+            RecordTick(db_statistics_, GET_HIT_L1);
+          } else if (fp.GetHitFileLevel() >= 2) {
+            RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
+          }
+          PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
+                                    fp.GetHitFileLevel());
+          file_range.MarkKeyDone(iter);
+          continue;
+        case GetContext::kDeleted:
+          // Use empty error message for speed
+          *status = Status::NotFound();
+          file_range.MarkKeyDone(iter);
+          continue;
+        case GetContext::kCorrupt:
+          *status =
+              Status::Corruption("corrupted key for ", iter->lkey->user_key());
+          file_range.MarkKeyDone(iter);
+          continue;
+        case GetContext::kBlobIndex:
+          ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
+          *status = Status::NotSupported(
+              "Encounter unexpected blob index. Please open DB with "
+              "rocksdb::blob_db::BlobDB instead.");
+          file_range.MarkKeyDone(iter);
+          continue;
+      }
+    }
+    RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
+    if (file_picker_range.empty()) {
+      break;
+    }
+    f = fp.GetNextFile();
+  }
+
+  // Process any left over keys
+  for (auto iter = range->begin(); iter != range->end(); ++iter) {
+    GetContext& get_context = *iter->get_context;
+    Status* status = iter->s;
+    Slice user_key = iter->lkey->user_key();
+
+    if (db_statistics_ != nullptr) {
+      get_context.ReportCounters();
+    }
+    if (GetContext::kMerge == get_context.State()) {
+      if (!merge_operator_) {
+        *status = Status::InvalidArgument(
+            "merge_operator is not properly initialized.");
+        range->MarkKeyDone(iter);
+        continue;
+      }
+      // merge_operands are in saver and we hit the beginning of the key
+      // history; do a final merge of nullptr and operands
+      std::string* str_value =
+          iter->value != nullptr ? iter->value->GetSelf() : nullptr;
+      *status = MergeHelper::TimedFullMerge(
+          merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
+          str_value, info_log_, db_statistics_, env_,
+          nullptr /* result_operand */, true);
+      if (LIKELY(iter->value != nullptr)) {
+        iter->value->PinSelf();
+      }
+    } else {
+      range->MarkKeyDone(iter);
+      *status = Status::NotFound();  // Use an empty error message for speed
+    }
+  }
+}
+
 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
   // Reaching the bottom level implies misses at all upper levels, so we'll
   // skip checking the filters when we predict a hit.
diff --git a/db/version_set.h b/db/version_set.h
index 16b7b4347..b2b6cc863 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -44,6 +44,8 @@
 #include "options/db_options.h"
 #include "port/port.h"
 #include "rocksdb/env.h"
+#include "table/get_context.h"
+#include "table/multiget_context.h"

 namespace rocksdb {

@@ -552,6 +554,7 @@ class VersionStorageInfo {
   void operator=(const VersionStorageInfo&) = delete;
 };

+using MultiGetRange = MultiGetContext::Range;
 class Version {
  public:
   // Append to *iters a sequence of iterators that will
@@ -593,6 +596,9 @@ class Version {
            SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr,
            bool* is_blob = nullptr);

+  void MultiGet(const ReadOptions&, MultiGetRange* range,
+                ReadCallback* callback = nullptr, bool* is_blob = nullptr);
+
   // Loads some stats information from files. Call without mutex held. It needs
   // to be called before applying the version to the version set.
   void PrepareApply(const MutableCFOptions& mutable_cf_options,
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index b40af20e2..8bec4a56f 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -421,6 +421,46 @@ class DB {
                     keys, values);
   }

+  // Overloaded MultiGet API that improves performance by batching operations
+  // in the read path for greater efficiency. Currently, only the block based
+  // table format with full filters is supported. Other table formats such
+  // as plain table, block based table with block based filters and
+  // partitioned indexes will still work, but will not get any performance
+  // benefits.
+  // Parameters -
+  // options - ReadOptions
+  // column_family - ColumnFamilyHandle* that the keys belong to.
+  //                 All the keys passed to the API are restricted to a single
+  //                 column family
+  // num_keys - Number of keys to lookup
+  // keys - Pointer to C style array of key Slices with num_keys elements
+  // values - Pointer to C style array of PinnableSlices with num_keys elements
+  // statuses - Pointer to C style array of Status with num_keys elements
+  // sorted_input - If true, it means the input keys are already sorted by key
+  //                order, so the MultiGet() API doesn't have to sort them
+  //                again. If false, the keys will be copied and sorted
+  //                internally by the API - the input array will not be
+  //                modified
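+  //
+  // A minimal usage sketch (illustrative only; "db" is an already-open DB
+  // instance and the key/value/status arrays live on the caller's stack):
+  //
+  //   Slice keys[2] = {"k1", "k2"};
+  //   PinnableSlice values[2];
+  //   Status statuses[2];
+  //   db->MultiGet(ReadOptions(), db->DefaultColumnFamily(), 2, keys,
+  //                values, statuses);
+  //   for (int i = 0; i < 2; ++i) {
+  //     if (statuses[i].ok()) { /* use values[i] */ }
+  //   }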
+  virtual void MultiGet(const ReadOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const size_t num_keys, const Slice* keys,
+                        PinnableSlice* values, Status* statuses,
+                        const bool /*sorted_input*/ = false) {
+    std::vector<ColumnFamilyHandle*> cf;
+    std::vector<Slice> user_keys;
+    std::vector<Status> status;
+    std::vector<std::string> vals;
+
+    for (size_t i = 0; i < num_keys; ++i) {
+      cf.emplace_back(column_family);
+      user_keys.emplace_back(keys[i]);
+    }
+    status = MultiGet(options, cf, user_keys, &vals);
+    std::copy(status.begin(), status.end(), statuses);
+    for (auto& value : vals) {
+      values->PinSelf(value);
+      values++;
+    }
+  }

   // If the key definitely does not exist in the database, then this method
   // returns false, else true. If the caller wants to obtain value when the key
   // is found in memory, a bool for 'value_found' must be passed. 'value_found'
diff --git a/include/rocksdb/filter_policy.h b/include/rocksdb/filter_policy.h
index 5d465b782..d2da38b51 100644
--- a/include/rocksdb/filter_policy.h
+++ b/include/rocksdb/filter_policy.h
@@ -24,6 +24,8 @@
 #include
 #include
 #include
+#include "db/dbformat.h"
+#include "table/multiget_context.h"

 namespace rocksdb {

@@ -64,12 +66,20 @@ class FilterBitsBuilder {

 // A class that checks if a key can be in filter
 // It should be initialized by Slice generated by BitsBuilder
+using MultiGetRange = MultiGetContext::Range;
 class FilterBitsReader {
  public:
   virtual ~FilterBitsReader() {}

   // Check if the entry match the bits in filter
   virtual bool MayMatch(const Slice& entry) = 0;
+
+  // Check if an array of entries match the bits in filter
+  virtual void MayMatch(int num_keys, Slice** keys, bool* may_match) {
+    for (int i = 0; i < num_keys; ++i) {
+      may_match[i] = MayMatch(*keys[i]);
+    }
+  }
 };

 // We add a new format of filter block called full filter block
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index bad1c87ec..3b2b2e048 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -81,6 +81,8 @@ enum Tickers : uint32_t {
   // exist.
   BLOOM_FILTER_FULL_TRUE_POSITIVE,

+  BLOOM_FILTER_MICROS,
+
   // # persistent cache hit
   PERSISTENT_CACHE_HIT,
   // # persistent cache miss
@@ -424,6 +426,7 @@ enum Histograms : uint32_t {
   BLOB_DB_DECOMPRESSION_MICROS,
   // Time spent flushing memtable to disk
   FLUSH_TIME,
+  SST_BATCH_SIZE,

   HISTOGRAM_ENUM_MAX,
 };
diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h
index 8fef9b3e8..dd3e09016 100644
--- a/include/rocksdb/utilities/stackable_db.h
+++ b/include/rocksdb/utilities/stackable_db.h
@@ -96,6 +96,15 @@ class StackableDB : public DB {
     return db_->MultiGet(options, column_family, keys, values);
   }

+  virtual void MultiGet(const ReadOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const size_t num_keys, const Slice* keys,
+                        PinnableSlice* values, Status* statuses,
+                        const bool sorted_input = false) override {
+    return db_->MultiGet(options, column_family, num_keys, keys,
+                         values, statuses, sorted_input);
+  }
+
   using DB::IngestExternalFile;
   virtual Status IngestExternalFile(
       ColumnFamilyHandle* column_family,
diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc
index adb8cbfed..fe2f2e25a 100644
--- a/monitoring/statistics.cc
+++ b/monitoring/statistics.cc
@@ -45,6 +45,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
    {BLOOM_FILTER_FULL_POSITIVE, "rocksdb.bloom.filter.full.positive"},
    {BLOOM_FILTER_FULL_TRUE_POSITIVE,
     "rocksdb.bloom.filter.full.true.positive"},
+    {BLOOM_FILTER_MICROS, "rocksdb.bloom.filter.micros"},
    {PERSISTENT_CACHE_HIT, "rocksdb.persistent.cache.hit"},
    {PERSISTENT_CACHE_MISS, "rocksdb.persistent.cache.miss"},
    {SIM_BLOCK_CACHE_HIT, "rocksdb.sim.block.cache.hit"},
@@ -228,6 +229,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
    {BLOB_DB_COMPRESSION_MICROS, "rocksdb.blobdb.compression.micros"},
    {BLOB_DB_DECOMPRESSION_MICROS, "rocksdb.blobdb.decompression.micros"},
    {FLUSH_TIME, "rocksdb.db.flush.micros"},
+    {SST_BATCH_SIZE, "rocksdb.sst.batch.size"},
 };

 std::shared_ptr<Statistics> CreateDBStatistics() {
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index dc2d4263e..b58960a85 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -39,6 +39,7 @@
 #include "table/get_context.h"
 #include "table/internal_iterator.h"
 #include "table/meta_blocks.h"
+#include "table/multiget_context.h"
 #include "table/partitioned_filter_block.h"
 #include "table/persistent_cache_helper.h"
 #include "table/sst_file_writer_collectors.h"
@@ -2623,6 +2624,29 @@ bool BlockBasedTable::FullFilterKeyMayMatch(
   return may_match;
 }

+void BlockBasedTable::FullFilterKeysMayMatch(
+    const ReadOptions& read_options, FilterBlockReader* filter,
+    MultiGetRange* range, const bool no_io,
+    const SliceTransform* prefix_extractor) const {
+  if (filter == nullptr || filter->IsBlockBased()) {
+    return;
+  }
+  if (filter->whole_key_filtering()) {
+    filter->KeysMayMatch(range, prefix_extractor, kNotValid, no_io);
+  } else if (!read_options.total_order_seek && prefix_extractor &&
+             rep_->table_properties->prefix_extractor_name.compare(
+                 prefix_extractor->Name()) == 0) {
+    for (auto iter = range->begin(); iter != range->end(); ++iter) {
+      Slice user_key = iter->lkey->user_key();
+
+      if (!prefix_extractor->InDomain(user_key)) {
+        range->SkipKey(iter);
+      }
+    }
+    filter->PrefixesMayMatch(range, prefix_extractor, kNotValid, false);
+  }
+}
+
 Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
                             GetContext* get_context,
                             const SliceTransform* prefix_extractor,
@@ -2631,17 +2655,22 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
   Status s;
   const bool no_io = read_options.read_tier == kBlockCacheTier;
   CachableEntry<FilterBlockReader> filter_entry;
-  if (!skip_filters) {
-    filter_entry =
-        GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
-                  read_options.read_tier == kBlockCacheTier, get_context);
-  }
-  FilterBlockReader* filter = filter_entry.value;
+  bool may_match;
+  FilterBlockReader* filter = nullptr;
+  {
+    if (!skip_filters) {
+      filter_entry =
+          GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
+                    read_options.read_tier == kBlockCacheTier, get_context);
+    }
+    filter = filter_entry.value;

-  // First check the full filter
-  // If full filter not useful, Then go into each block
-  if (!FullFilterKeyMayMatch(read_options, filter, key, no_io,
-                             prefix_extractor)) {
+    // First check the full filter
+    // If full filter not useful, Then go into each block
+    may_match = FullFilterKeyMayMatch(read_options, filter, key, no_io,
+                                      prefix_extractor);
+  }
+  if (!may_match) {
     RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL);
     PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level);
   } else {
@@ -2747,6 +2776,122 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
   return s;
 }

+using MultiGetRange = MultiGetContext::Range;
+void BlockBasedTable::MultiGet(const ReadOptions& read_options,
+                               const MultiGetRange* mget_range,
+                               const SliceTransform* prefix_extractor,
+                               bool skip_filters) {
+  const bool no_io = read_options.read_tier == kBlockCacheTier;
+  CachableEntry<FilterBlockReader> filter_entry;
+  FilterBlockReader* filter = nullptr;
+  MultiGetRange sst_file_range(*mget_range, mget_range->begin(),
+                               mget_range->end());
+  {
+    if (!skip_filters) {
+      // TODO: Figure out where the stats should go
+      filter_entry = GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
+                               read_options.read_tier == kBlockCacheTier,
+                               nullptr /*get_context*/);
+    }
+    filter = filter_entry.value;

+    // First check the full filter
+    // If full filter not useful, Then go into each block
+    FullFilterKeysMayMatch(read_options, filter, &sst_file_range, no_io,
+                           prefix_extractor);
+  }
+  if (skip_filters || !sst_file_range.empty()) {
+    IndexBlockIter iiter_on_stack;
+    // if prefix_extractor found in block differs from options, disable
+    // BlockPrefixIndex. Only do this check when index_type is kHashSearch.
+    bool need_upper_bound_check = false;
+    if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
+      need_upper_bound_check = PrefixExtractorChanged(
+          rep_->table_properties.get(), prefix_extractor);
+    }
+    auto iiter = NewIndexIterator(
+        read_options, need_upper_bound_check, &iiter_on_stack,
+        /* index_entry */ nullptr, sst_file_range.begin()->get_context);
+    std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter_unique_ptr;
+    if (iiter != &iiter_on_stack) {
+      iiter_unique_ptr.reset(iiter);
+    }
+
+    for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
+         ++miter) {
+      Status s;
+      GetContext* get_context = miter->get_context;
+      const Slice& key = miter->ikey;
+      bool matched = false;  // if such user key matched a key in SST
+      bool done = false;
+      for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
+        DataBlockIter biter;
+        NewDataBlockIterator<DataBlockIter>(
+            rep_, read_options, iiter->value(), &biter, false,
+            true /* key_includes_seq */, get_context);
+
+        if (read_options.read_tier == kBlockCacheTier &&
+            biter.status().IsIncomplete()) {
+          // couldn't get block from block_cache
+          // Update Saver.state to Found because we are only looking for
+          // whether we can guarantee the key is not there when "no_io" is set
+          get_context->MarkKeyMayExist();
+          break;
+        }
+        if (!biter.status().ok()) {
+          s = biter.status();
+          break;
+        }
+
+        bool may_exist = biter.SeekForGet(key);
+        if (!may_exist) {
+          // HashSeek cannot find the key in this block and the iter is not
+          // at the end of the block, i.e. the key cannot be in the following
+          // blocks either. In this case, the seek_key cannot be found, so we
+          // break from the top level for-loop.
+          break;
+        }
+
+        // Call the *saver function on each entry/block until it returns false
+        for (; biter.Valid(); biter.Next()) {
+          ParsedInternalKey parsed_key;
+          if (!ParseInternalKey(biter.key(), &parsed_key)) {
+            s = Status::Corruption(Slice());
+          }
+
+          if (!get_context->SaveValue(
+                  parsed_key, biter.value(), &matched,
+                  biter.IsValuePinned() ? &biter : nullptr)) {
+            done = true;
+            break;
+          }
+        }
+        s = biter.status();
+        if (done) {
+          // Avoid the extra Next which is expensive in two-level indexes
+          break;
+        }
+      }
+      if (matched && filter != nullptr && !filter->IsBlockBased()) {
+        RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_TRUE_POSITIVE);
+        PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
+                                  rep_->level);
+      }
+      if (s.ok()) {
+        s = iiter->status();
+      }
+      *(miter->s) = s;
+    }
+  }
+
+  // if rep_->filter_entry is not set, we should call Release(); otherwise
+  // don't call, in this case we have a local copy in rep_->filter_entry,
+  // it's pinned to the cache and will be released in the destructor
+  if (!rep_->filter_entry.IsSet()) {
+    filter_entry.Release(rep_->table_options.block_cache.get());
+  }
+}
+
 Status BlockBasedTable::Prefetch(const Slice* const begin,
                                  const Slice* const end) {
   auto& comparator = rep_->internal_comparator;
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index f0b5cdb1b..e75a71cda 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -27,6 +27,8 @@
 #include "table/block_based_table_factory.h"
 #include "table/filter_block.h"
 #include "table/format.h"
+#include "table/get_context.h"
+#include "table/multiget_context.h"
 #include "table/persistent_cache_helper.h"
 #include "table/table_properties_internal.h"
 #include "table/table_reader.h"
@@ -121,6 +123,11 @@ class BlockBasedTable : public TableReader {
              GetContext* get_context, const SliceTransform* prefix_extractor,
             bool skip_filters = false) override;

+  void MultiGet(const ReadOptions& readOptions,
+                const MultiGetContext::Range* mget_range,
+                const SliceTransform* prefix_extractor,
+                bool skip_filters = false) override;
+
   // Pre-fetch the disk blocks that correspond to the key range specified by
   // (kbegin, kend). The call will return error status in the event of
   // IO or iteration error.
@@ -355,6 +362,11 @@ class BlockBasedTable : public TableReader {
       const Slice& user_key, const bool no_io,
       const SliceTransform* prefix_extractor = nullptr) const;

+  void FullFilterKeysMayMatch(
+      const ReadOptions& read_options, FilterBlockReader* filter,
+      MultiGetRange* range, const bool no_io,
+      const SliceTransform* prefix_extractor = nullptr) const;
+
   static Status PrefetchTail(
       RandomAccessFileReader* file, uint64_t file_size,
       TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all,
diff --git a/table/filter_block.h b/table/filter_block.h
index a93049547..fe1678908 100644
--- a/table/filter_block.h
+++ b/table/filter_block.h
@@ -99,6 +99,19 @@ class FilterBlockReader {
                           const bool no_io = false,
                           const Slice* const const_ikey_ptr = nullptr) = 0;

+  virtual void KeysMayMatch(MultiGetRange* range,
+                            const SliceTransform* prefix_extractor,
+                            uint64_t block_offset = kNotValid,
+                            const bool no_io = false) {
+    for (auto iter = range->begin(); iter != range->end(); ++iter) {
+      const Slice ukey = iter->ukey;
+      const Slice ikey = iter->ikey;
+      if (!KeyMayMatch(ukey, prefix_extractor, block_offset, no_io, &ikey)) {
+        range->SkipKey(iter);
+      }
+    }
+  }
+
   /**
    * no_io and const_ikey_ptr here means the same as in KeyMayMatch
    */
@@ -108,6 +121,20 @@ class FilterBlockReader {
                               const bool no_io = false,
                               const Slice* const const_ikey_ptr = nullptr) = 0;

+  virtual void PrefixesMayMatch(MultiGetRange* range,
+                                const SliceTransform* prefix_extractor,
+                                uint64_t block_offset = kNotValid,
+                                const bool no_io = false) {
+    for (auto iter = range->begin(); iter != range->end(); ++iter) {
+      const Slice ukey = iter->ukey;
+      const Slice ikey = iter->ikey;
+      if (!KeyMayMatch(prefix_extractor->Transform(ukey), prefix_extractor,
+                       block_offset, no_io, &ikey)) {
+        range->SkipKey(iter);
+      }
+    }
+  }
+
   virtual size_t ApproximateMemoryUsage() const = 0;
   virtual size_t size() const { return size_; }
   virtual Statistics* statistics() const { return statistics_; }
diff --git a/table/full_filter_block.cc b/table/full_filter_block.cc
index a7491a716..34012fd82 100644
--- a/table/full_filter_block.cc
+++ b/table/full_filter_block.cc
@@ -159,6 +159,59 @@ bool FullFilterBlockReader::MayMatch(const Slice& entry) {
   return true;  // remain the same with block_based filter
 }

+void FullFilterBlockReader::KeysMayMatch(
+    MultiGetRange* range, const SliceTransform* /*prefix_extractor*/,
+    uint64_t block_offset, const bool /*no_io*/) {
+#ifdef NDEBUG
+  (void)range;
+  (void)block_offset;
+#endif
+  assert(block_offset == kNotValid);
+  if (!whole_key_filtering_) {
+    // Simply return. Don't skip any key - consider all keys as likely to be
+    // present
+    return;
+  }
+  MayMatch(range);
+}
+
+void FullFilterBlockReader::PrefixesMayMatch(
+    MultiGetRange* range, const SliceTransform* /* prefix_extractor */,
+    uint64_t block_offset, const bool /*no_io*/) {
+#ifdef NDEBUG
+  (void)range;
+  (void)block_offset;
+#endif
+  assert(block_offset == kNotValid);
+  MayMatch(range);
+}
+
+void FullFilterBlockReader::MayMatch(MultiGetRange* range) {
+  if (contents_.size() == 0) {
+    return;
+  }
+
+  // We need to use an array instead of autovector for may_match since
+  // &may_match[0] doesn't work for autovector (compiler error).
So + // declare both keys and may_match as arrays, which is also slightly less + // expensive compared to autovector + Slice* keys[MultiGetContext::MAX_BATCH_SIZE]; + bool may_match[MultiGetContext::MAX_BATCH_SIZE]; + int num_keys = 0; + for (auto iter = range->begin(); iter != range->end(); ++iter) { + keys[num_keys++] = &iter->ukey; + } + filter_bits_reader_->MayMatch(num_keys, &keys[0], &may_match[0]); + + int i = 0; + for (auto iter = range->begin(); iter != range->end(); ++iter) { + if (!may_match[i]) { + range->SkipKey(iter); + } + ++i; + } +} + size_t FullFilterBlockReader::ApproximateMemoryUsage() const { size_t usage = block_contents_.usable_size(); #ifdef ROCKSDB_MALLOC_USABLE_SIZE diff --git a/table/full_filter_block.h b/table/full_filter_block.h index e4384c91a..f97952a7c 100644 --- a/table/full_filter_block.h +++ b/table/full_filter_block.h @@ -107,6 +107,16 @@ class FullFilterBlockReader : public FilterBlockReader { const Slice& prefix, const SliceTransform* prefix_extractor, uint64_t block_offset = kNotValid, const bool no_io = false, const Slice* const const_ikey_ptr = nullptr) override; + + virtual void KeysMayMatch(MultiGetRange* range, + const SliceTransform* prefix_extractor, + uint64_t block_offset = kNotValid, + const bool no_io = false) override; + + virtual void PrefixesMayMatch(MultiGetRange* range, + const SliceTransform* prefix_extractor, + uint64_t block_offset = kNotValid, + const bool no_io = false) override; virtual size_t ApproximateMemoryUsage() const override; virtual bool RangeMayExist(const Slice* iterate_upper_bound, const Slice& user_key, const SliceTransform* prefix_extractor, @@ -124,6 +134,7 @@ class FullFilterBlockReader : public FilterBlockReader { // No copying allowed FullFilterBlockReader(const FullFilterBlockReader&); bool MayMatch(const Slice& entry); + void MayMatch(MultiGetRange* range); void operator=(const FullFilterBlockReader&); bool IsFilterCompatible(const Slice* iterate_upper_bound, const Slice& prefix, const Comparator* comparator); diff --git a/table/full_filter_block_test.cc b/table/full_filter_block_test.cc index f01ae52bf..3abae979a 100644 --- a/table/full_filter_block_test.cc +++ b/table/full_filter_block_test.cc @@ -45,6 +45,8 @@ class TestFilterBitsReader : public FilterBitsReader { explicit TestFilterBitsReader(const Slice& contents) : data_(contents.data()), len_(static_cast(contents.size())) {} + // Silence compiler warning about overloaded virtual + using FilterBitsReader::MayMatch; bool MayMatch(const Slice& entry) override { uint32_t h = Hash(entry.data(), entry.size(), 1); for (size_t i = 0; i + 4 <= len_; i += 4) { diff --git a/table/get_context.h b/table/get_context.h index d7d0e9808..7ed316f0e 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -5,6 +5,7 @@ #pragma once #include +#include #include "db/merge_context.h" #include "db/read_callback.h" #include "rocksdb/env.h" @@ -61,6 +62,8 @@ class GetContext { PinnedIteratorsManager* _pinned_iters_mgr = nullptr, ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); + GetContext() = default; + void MarkKeyMayExist(); // Records this key, value, and any meta-data (such as sequence number and diff --git a/table/multiget_context.h b/table/multiget_context.h new file mode 100644 index 000000000..d3a8d0946 --- /dev/null +++ b/table/multiget_context.h @@ -0,0 +1,249 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
diff --git a/table/multiget_context.h b/table/multiget_context.h
new file mode 100644
index 000000000..d3a8d0946
--- /dev/null
+++ b/table/multiget_context.h
@@ -0,0 +1,249 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include
+#include
+#include "db/lookup_key.h"
+#include "db/merge_context.h"
+#include "rocksdb/env.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/types.h"
+#include "util/autovector.h"
+
+namespace rocksdb {
+class GetContext;
+
+struct KeyContext {
+  const Slice* key;
+  LookupKey* lkey;
+  Slice ukey;
+  Slice ikey;
+  Status* s;
+  MergeContext merge_context;
+  SequenceNumber max_covering_tombstone_seq;
+  bool key_exists;
+  SequenceNumber seq;
+  void* cb_arg;
+  PinnableSlice* value;
+  GetContext* get_context;
+
+  KeyContext(const Slice& user_key, PinnableSlice* val, Status* stat)
+      : key(&user_key),
+        lkey(nullptr),
+        s(stat),
+        max_covering_tombstone_seq(0),
+        key_exists(false),
+        seq(0),
+        cb_arg(nullptr),
+        value(val),
+        get_context(nullptr) {}
+
+  KeyContext() = default;
+};
+
+// The MultiGetContext class is a container for the sorted list of keys that
+// we need to look up in a batch. Its main purpose is to make batch execution
+// easier by allowing various stages of the MultiGet lookups to operate on
+// subsets of keys, potentially non-contiguous. In order to accomplish this,
+// it defines the following classes -
+//
+// MultiGetContext::Range
+// MultiGetContext::Range::Iterator
+// MultiGetContext::Range::IteratorWrapper
+//
+// Here is an example of how this can be used -
+//
+// {
+//    MultiGetContext ctx(...);
+//    MultiGetContext::Range range = ctx.GetMultiGetRange();
+//
+//    // Iterate to determine some subset of the keys
+//    MultiGetContext::Range::Iterator start = range.begin();
+//    MultiGetContext::Range::Iterator end = ...;
+//
+//    // Make a new range with a subset of keys
+//    MultiGetContext::Range subrange(range, start, end);
+//
+//    // Define an auxiliary vector, if needed, to hold additional data for
+//    // each key
+//    std::array<Foo, MAX_BATCH_SIZE> aux;
+//
+//    // Iterate over the subrange and the auxiliary vector simultaneously
+//    MultiGetContext::Range::Iterator iter = subrange.begin();
+//    for (; iter != subrange.end(); ++iter) {
+//      KeyContext& key = *iter;
+//      Foo& aux_key = aux[iter.index()];
+//      ...
+//    }
+// }
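The usage comment above can be made a little more concrete. The following is still a sketch, not code from the patch: it compiles only inside the RocksDB tree, assumes the caller already allocated per-key Status/PinnableSlice storage and sorted the KeyContext pointers, and the filter decision is a placeholder.

```cpp
// Hedged sketch of driving a Range: skip some keys, mark the rest done.
#include "table/multiget_context.h"

namespace rocksdb {

void ProcessBatch(KeyContext** sorted_keys, size_t num_keys,
                  SequenceNumber snapshot) {
  MultiGetContext ctx(sorted_keys, num_keys, snapshot);
  MultiGetContext::Range range = ctx.GetMultiGetRange();

  // Drop every key a (hypothetical) filter rules out; SkipKey() only sets a
  // bit in this Range's skip_mask_, which copies of the Range inherit
  for (auto iter = range.begin(); iter != range.end(); ++iter) {
    if (/* filter says definitely absent */ false) {
      range.SkipKey(iter);
    }
  }
  // MarkKeyDone() flips a bit in the shared value_mask_, so every Range
  // built from this context stops visiting the key
  for (auto iter = range.begin(); iter != range.end(); ++iter) {
    range.MarkKeyDone(iter);
    *iter->s = Status::OK();
  }
}

}  // namespace rocksdb
```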
+class MultiGetContext {
+ public:
+  // Limit the number of keys in a batch to this number. Benchmarks show that
+  // there is negligible benefit for batches exceeding this. Keeping this < 64
+  // lets the skip/done bitmasks fit in a single uint64_t, which simplifies
+  // iteration and reduces the amount of stack allocation that needs to be
+  // performed
+  static const int MAX_BATCH_SIZE = 32;
+
+  MultiGetContext(KeyContext** sorted_keys, size_t num_keys,
+                  SequenceNumber snapshot)
+      : sorted_keys_(sorted_keys),
+        num_keys_(num_keys),
+        value_mask_(0),
+        lookup_key_ptr_(reinterpret_cast<LookupKey*>(lookup_key_stack_buf)) {
+    int index = 0;
+
+    if (num_keys > MAX_LOOKUP_KEYS_ON_STACK) {
+      lookup_key_heap_buf.reset(new char[sizeof(LookupKey) * num_keys]);
+      lookup_key_ptr_ =
+          reinterpret_cast<LookupKey*>(lookup_key_heap_buf.get());
+    }
+
+    for (size_t iter = 0; iter != num_keys_; ++iter) {
+      sorted_keys_[iter]->lkey = new (&lookup_key_ptr_[index])
+          LookupKey(*sorted_keys_[iter]->key, snapshot);
+      sorted_keys_[iter]->ukey = sorted_keys_[iter]->lkey->user_key();
+      sorted_keys_[iter]->ikey = sorted_keys_[iter]->lkey->internal_key();
+      index++;
+    }
+  }
+
+  ~MultiGetContext() {
+    for (size_t i = 0; i < num_keys_; ++i) {
+      lookup_key_ptr_[i].~LookupKey();
+    }
+  }
+
+ private:
+  static const int MAX_LOOKUP_KEYS_ON_STACK = 16;
+  alignas(alignof(LookupKey))
+  char lookup_key_stack_buf[sizeof(LookupKey) * MAX_LOOKUP_KEYS_ON_STACK];
+  KeyContext** sorted_keys_;
+  size_t num_keys_;
+  uint64_t value_mask_;
+  std::unique_ptr<char[]> lookup_key_heap_buf;
+  LookupKey* lookup_key_ptr_;
+ public:
+  // MultiGetContext::Range - Specifies a range of keys, by start and end
+  // index, from the parent MultiGetContext. Each range contains a bit vector
+  // that indicates whether the corresponding keys need to be processed or
+  // skipped. A Range object can be copy constructed, and the new object
+  // inherits the original Range's bit vector. This is useful for
+  // progressively skipping keys as the lookup goes through various stages.
+  // For example, when looking up keys in the same SST file, a Range is
+  // created excluding keys not belonging to that file. A new Range is then
+  // copy constructed and individual keys are skipped based on bloom filter
+  // lookup.
+  class Range {
+   public:
+    // MultiGetContext::Range::Iterator - A forward iterator that skips over
+    // keys marked as skippable, as well as keys whose final value has been
+    // found. The latter is tracked by MultiGetContext::value_mask_
+    class Iterator {
+     public:
+      // -- iterator traits
+      typedef Iterator self_type;
+      typedef KeyContext value_type;
+      typedef KeyContext& reference;
+      typedef KeyContext* pointer;
+      typedef int difference_type;
+      typedef std::forward_iterator_tag iterator_category;
+
+      Iterator(const Range* range, size_t idx)
+          : range_(range), ctx_(range->ctx_), index_(idx) {
+        while (index_ < range_->end_ &&
+               (1ull << index_) &
+                   (range_->ctx_->value_mask_ | range_->skip_mask_))
+          index_++;
+      }
+
+      Iterator(const Iterator&) = default;
+      Iterator& operator=(const Iterator&) = default;
+
+      Iterator& operator++() {
+        while (++index_ < range_->end_ &&
+               (1ull << index_) &
+                   (range_->ctx_->value_mask_ | range_->skip_mask_))
+          ;
+        return *this;
+      }
+
+      bool operator==(Iterator other) const {
+        assert(range_->ctx_ == other.range_->ctx_);
+        return index_ == other.index_;
+      }
+
+      bool operator!=(Iterator other) const {
+        assert(range_->ctx_ == other.range_->ctx_);
+        return index_ != other.index_;
+      }
+
+      KeyContext& operator*() {
+        assert(index_ < range_->end_ && index_ >= range_->start_);
+        return *(ctx_->sorted_keys_[index_]);
+      }
+
+      KeyContext* operator->() {
+        assert(index_ < range_->end_ && index_ >= range_->start_);
+        return ctx_->sorted_keys_[index_];
+      }
+
+      size_t index() { return index_; }
+
+     private:
+      friend Range;
+      const Range* range_;
+      const MultiGetContext* ctx_;
+      size_t index_;
+    };
+
+    Range(const Range& mget_range,
+          const Iterator& first,
+          const Iterator& last) {
+      ctx_ = mget_range.ctx_;
+      start_ = first.index_;
+      end_ = last.index_;
+      skip_mask_ = mget_range.skip_mask_;
+    }
+
+    Range() = default;
+
+    Iterator begin() const { return Iterator(this, start_); }
+
+    Iterator end() const { return Iterator(this, end_); }
+
+    bool empty() {
+      return (((1ull << end_) - 1) & ~((1ull << start_) - 1) &
+              ~(ctx_->value_mask_ | skip_mask_)) == 0;
+    }
+
+    void SkipKey(const Iterator& iter) { skip_mask_ |= 1ull << iter.index_; }
+
+    // Update the value_mask_ in MultiGetContext so it's
+    // immediately reflected in all the Range Iterators
+    void MarkKeyDone(Iterator& iter) {
+      ctx_->value_mask_ |= (1ull << iter.index_);
+    }
+
+    bool CheckKeyDone(Iterator& iter) {
+      return ctx_->value_mask_ & (1ull << iter.index_);
+    }
+
+   private:
+    friend MultiGetContext;
+    MultiGetContext* ctx_;
+    size_t start_;
+    size_t end_;
+    uint64_t skip_mask_;
+
+    Range(MultiGetContext* ctx, size_t num_keys)
+        : ctx_(ctx), start_(0), end_(num_keys), skip_mask_(0) {}
+  };
+
+  // Return the initial range that encompasses all the keys in the batch
+  Range GetMultiGetRange() { return Range(this, num_keys_); }
+};
+
+}  // namespace rocksdb
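The two bit vectors in this header do different jobs: skip_mask_ is per-Range and is inherited by copies, while value_mask_ lives in the shared context, so MarkKeyDone() is visible to every Range at once. A self-contained toy model of those semantics; Ctx, Range, done_mask, and visible() here are illustrative names, not the real classes.

```cpp
// Toy model: per-range skip bits copied on construction, shared done bits.
#include <cstdint>
#include <iostream>

struct Ctx {
  uint64_t done_mask = 0;  // plays the role of value_mask_
};

struct Range {
  Ctx* ctx;
  uint64_t skip_mask;  // plays the role of skip_mask_
  size_t n;
  bool visible(size_t i) const {
    return !((ctx->done_mask | skip_mask) & (uint64_t{1} << i));
  }
};

int main() {
  Ctx ctx;
  Range r1{&ctx, 0, 4};
  r1.skip_mask |= uint64_t{1} << 1;  // SkipKey(key 1): local to r1 and copies
  Range r2 = r1;                     // copy inherits r1's skip bits
  ctx.done_mask |= uint64_t{1} << 2; // MarkKeyDone(key 2): seen everywhere
  for (size_t i = 0; i < r2.n; ++i) {
    std::cout << "key " << i
              << (r2.visible(i) ? " pending\n" : " skipped/done\n");
  }
}
```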
A Table may be safely accessed from
@@ -86,6 +89,16 @@ class TableReader {
                      const SliceTransform* prefix_extractor,
                      bool skip_filters = false) = 0;
 
+  virtual void MultiGet(const ReadOptions& readOptions,
+                        const MultiGetContext::Range* mget_range,
+                        const SliceTransform* prefix_extractor,
+                        bool skip_filters = false) {
+    for (auto iter = mget_range->begin(); iter != mget_range->end(); ++iter) {
+      *iter->s = Get(readOptions, iter->ikey, iter->get_context,
+                     prefix_extractor, skip_filters);
+    }
+  }
+
   // Prefetch data corresponding to a given range of keys
   // Typically this functionality is required for table implementations that
   // persist the data on a non-volatile storage medium like disk/SSD
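For reference, this is roughly what the new batched call looks like from application code, mirroring the db_bench usage below. A sketch, assuming the overload is exposed on rocksdb::DB itself (as the BlobDB override later in this patch implies), with error handling elided.

```cpp
// Hedged sketch of the array-based MultiGet overload added by this patch.
#include <cstddef>
#include "rocksdb/db.h"

void BatchedLookup(rocksdb::DB* db) {
  constexpr size_t kNumKeys = 4;
  rocksdb::Slice keys[kNumKeys] = {"k0", "k1", "k2", "k3"};
  rocksdb::PinnableSlice values[kNumKeys];
  rocksdb::Status statuses[kNumKeys];

  // One call for the whole batch; per-key results land in statuses[]
  db->MultiGet(rocksdb::ReadOptions(), db->DefaultColumnFamily(), kNumKeys,
               keys, values, statuses, /*sorted_input=*/false);

  for (size_t i = 0; i < kNumKeys; ++i) {
    if (statuses[i].ok()) {
      // values[i] pins the underlying data; Reset() releases it for reuse
      values[i].Reset();
    }
  }
}
```

Compared with the vector-returning overload, this form avoids per-call vector and std::string allocations: results are pinned in place and statuses are written into caller-owned storage.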
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 0cb4e0eb2..88222664b 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -1114,6 +1114,9 @@ DEFINE_uint64(stats_persist_period_sec,
 DEFINE_uint64(stats_history_buffer_size,
               rocksdb::Options().stats_history_buffer_size,
               "Max number of stats snapshots to keep in memory");
+DEFINE_int64(multiread_stride, 0,
+             "Stride length for the keys in a MultiGet batch");
+DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");
 
 enum RepFactory {
   kSkipList,
@@ -2700,6 +2703,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     } else if (name == "readreverse") {
       method = &Benchmark::ReadReverse;
     } else if (name == "readrandom") {
+      if (FLAGS_multiread_stride) {
+        fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
+                entries_per_batch_);
+      }
       method = &Benchmark::ReadRandom;
     } else if (name == "readrandomfast") {
       method = &Benchmark::ReadRandomFast;
@@ -4531,6 +4538,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     int64_t read = 0;
     int64_t found = 0;
     int64_t bytes = 0;
+    int num_keys = 0;
+    int64_t key_rand = GetRandomKey(&thread->rand);
     ReadOptions options(FLAGS_verify_checksum, true);
     std::unique_ptr<const char[]> key_guard;
     Slice key = AllocateKey(&key_guard);
@@ -4542,8 +4551,21 @@ void VerifyDBFromDB(std::string& truth_db_name) {
       // We use same key_rand as seed for key and column family so that we can
       // deterministically find the cfh corresponding to a particular key, as it
       // is done in DoWrite method.
-      int64_t key_rand = GetRandomKey(&thread->rand);
       GenerateKeyFromInt(key_rand, FLAGS_num, &key);
+      if (entries_per_batch_ > 1 && FLAGS_multiread_stride) {
+        if (++num_keys == entries_per_batch_) {
+          num_keys = 0;
+          key_rand = GetRandomKey(&thread->rand);
+          if ((key_rand + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
+              FLAGS_num) {
+            key_rand = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
+          }
+        } else {
+          key_rand += FLAGS_multiread_stride;
+        }
+      } else {
+        key_rand = GetRandomKey(&thread->rand);
+      }
       read++;
       Status s;
       if (FLAGS_num_column_families > 1) {
@@ -4595,6 +4617,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     std::vector<Slice> keys;
     std::vector<std::unique_ptr<const char[]> > key_guards;
     std::vector<std::string> values(entries_per_batch_);
+    PinnableSlice* pin_values = new PinnableSlice[entries_per_batch_];
+    std::vector<Status> stat_list(entries_per_batch_);
     while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
       key_guards.push_back(std::unique_ptr<const char[]>());
       keys.push_back(AllocateKey(&key_guards.back()));
@@ -4603,21 +4627,52 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     Duration duration(FLAGS_duration, reads_);
     while (!duration.Done(1)) {
       DB* db = SelectDB(thread);
-      for (int64_t i = 0; i < entries_per_batch_; ++i) {
-        GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
+      if (FLAGS_multiread_stride) {
+        int64_t key = GetRandomKey(&thread->rand);
+        if ((key + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
+            (int64_t)FLAGS_num) {
+          key = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
+        }
+        for (int64_t i = 0; i < entries_per_batch_; ++i) {
+          GenerateKeyFromInt(key, FLAGS_num, &keys[i]);
+          key += FLAGS_multiread_stride;
+        }
+      } else {
+        for (int64_t i = 0; i < entries_per_batch_; ++i) {
+          GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
+        }
       }
-      std::vector<Status> statuses = db->MultiGet(options, keys, &values);
-      assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
-
-      read += entries_per_batch_;
-      num_multireads++;
-      for (int64_t i = 0; i < entries_per_batch_; ++i) {
-        if (statuses[i].ok()) {
-          ++found;
-        } else if (!statuses[i].IsNotFound()) {
-          fprintf(stderr, "MultiGet returned an error: %s\n",
-                  statuses[i].ToString().c_str());
-          abort();
+      if (!FLAGS_multiread_batched) {
+        std::vector<Status> statuses = db->MultiGet(options, keys, &values);
+        assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
+
+        read += entries_per_batch_;
+        num_multireads++;
+        for (int64_t i = 0; i < entries_per_batch_; ++i) {
+          if (statuses[i].ok()) {
+            ++found;
+          } else if (!statuses[i].IsNotFound()) {
+            fprintf(stderr, "MultiGet returned an error: %s\n",
+                    statuses[i].ToString().c_str());
+            abort();
+          }
+        }
+      } else {
+        db->MultiGet(options, db->DefaultColumnFamily(), keys.size(),
+                     keys.data(), pin_values, stat_list.data());
+
+        read += entries_per_batch_;
+        num_multireads++;
+        for (int64_t i = 0; i < entries_per_batch_; ++i) {
+          if (stat_list[i].ok()) {
+            ++found;
+          } else if (!stat_list[i].IsNotFound()) {
+            fprintf(stderr, "MultiGet returned an error: %s\n",
+                    stat_list[i].ToString().c_str());
+            abort();
+          }
+          stat_list[i] = Status::OK();
+          pin_values[i].Reset();
         }
       }
       if (thread->shared->read_rate_limiter.get() != nullptr &&
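The same clamping arithmetic appears twice above, in ReadRandom and MultiReadRandom: a random batch start is pulled back so that the last strided key stays below the key count. A self-contained check of that arithmetic; ClampBatchStart is an illustrative name, not a db_bench function, and the clamp mirrors the diff's use of batch * stride rather than (batch - 1) * stride.

```cpp
// Toy check of the stride-clamping logic used by the benchmark above.
#include <cstdint>
#include <iostream>

int64_t ClampBatchStart(int64_t start, int64_t num_keys, int64_t batch,
                        int64_t stride) {
  // If the last key of the batch would run past num_keys, pull start back
  if (start + (batch - 1) * stride >= num_keys) {
    start = num_keys - batch * stride;
  }
  return start;
}

int main() {
  // 100 keys, batches of 8, stride 4: a start of 95 is pulled back to 68
  std::cout << ClampBatchStart(95, 100, 8, 4) << "\n";  // prints 68
  std::cout << ClampBatchStart(10, 100, 8, 4) << "\n";  // prints 10
}
```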
diff --git a/util/bloom.cc b/util/bloom.cc
index 9c05f7107..1da4f2aa4 100644
--- a/util/bloom.cc
+++ b/util/bloom.cc
@@ -181,8 +181,37 @@ class FullFilterBitsReader : public FilterBitsReader {
     // Other Error params, including a broken filter, regarded as match
     if (num_probes_ == 0 || num_lines_ == 0) return true;
     uint32_t hash = BloomHash(entry);
-    return HashMayMatch(hash, Slice(data_, data_len_),
-                        num_probes_, num_lines_);
+    uint32_t bit_offset;
+    FilterPrepare(hash, Slice(data_, data_len_), num_lines_, &bit_offset);
+    return HashMayMatch(hash, Slice(data_, data_len_), num_probes_,
+                        bit_offset);
+  }
+
+  virtual void MayMatch(int num_keys, Slice** keys, bool* may_match) override {
+    if (data_len_ <= 5) {  // remain the same with original filter
+      for (int i = 0; i < num_keys; ++i) {
+        may_match[i] = false;
+      }
+      return;
+    }
+    for (int i = 0; i < num_keys; ++i) {
+      may_match[i] = true;
+    }
+    // Other Error params, including a broken filter, regarded as match
+    if (num_probes_ == 0 || num_lines_ == 0) return;
+    uint32_t hashes[MultiGetContext::MAX_BATCH_SIZE];
+    uint32_t bit_offsets[MultiGetContext::MAX_BATCH_SIZE];
+    for (int i = 0; i < num_keys; ++i) {
+      hashes[i] = BloomHash(*keys[i]);
+      FilterPrepare(hashes[i], Slice(data_, data_len_), num_lines_,
+                    &bit_offsets[i]);
+    }
+
+    for (int i = 0; i < num_keys; ++i) {
+      if (!HashMayMatch(hashes[i], Slice(data_, data_len_), num_probes_,
+                        bit_offsets[i])) {
+        may_match[i] = false;
+      }
+    }
   }
 
  private:
@@ -211,7 +240,10 @@ class FullFilterBitsReader : public FilterBitsReader {
   // Before calling this function, need to ensure the input meta data
   // is valid.
   bool HashMayMatch(const uint32_t& hash, const Slice& filter,
-                    const size_t& num_probes, const uint32_t& num_lines);
+                    const size_t& num_probes, const uint32_t& bit_offset);
+
+  void FilterPrepare(const uint32_t& hash, const Slice& filter,
+                     const uint32_t& num_lines, uint32_t* bit_offset);
 
   // No Copy allowed
   FullFilterBitsReader(const FullFilterBitsReader&);
@@ -232,29 +264,44 @@ void FullFilterBitsReader::GetFilterMeta(const Slice& filter,
   *num_lines = DecodeFixed32(filter.data() + len - 4);
 }
 
+void FullFilterBitsReader::FilterPrepare(const uint32_t& hash,
+                                         const Slice& filter,
+                                         const uint32_t& num_lines,
+                                         uint32_t* bit_offset) {
+  uint32_t len = static_cast<uint32_t>(filter.size());
+  if (len <= 5) return;  // remain the same with original filter
+
+  // It is ensured the params are valid before calling it
+  assert(num_lines != 0 && (len - 5) % num_lines == 0);
+
+  uint32_t h = hash;
+  // Left shift by an extra 3 to convert bytes to bits
+  uint32_t b = (h % num_lines) << (log2_cache_line_size_ + 3);
+  PREFETCH(&filter.data()[b / 8], 0 /* rw */, 1 /* locality */);
+  PREFETCH(&filter.data()[b / 8 + (1 << log2_cache_line_size_) - 1],
+           0 /* rw */, 1 /* locality */);
+  *bit_offset = b;
+}
+
 bool FullFilterBitsReader::HashMayMatch(const uint32_t& hash,
-                                        const Slice& filter,
-                                        const size_t& num_probes,
-                                        const uint32_t& num_lines) {
+                                        const Slice& filter,
+                                        const size_t& num_probes,
+                                        const uint32_t& bit_offset) {
   uint32_t len = static_cast<uint32_t>(filter.size());
   if (len <= 5) return false;  // remain the same with original filter
 
   // It is ensured the params are valid before calling it
   assert(num_probes != 0);
-  assert(num_lines != 0 && (len - 5) % num_lines == 0);
 
   const char* data = filter.data();
   uint32_t h = hash;
   const uint32_t delta = (h >> 17) | (h << 15);  // Rotate right 17 bits
-  // Left shift by an extra 3 to convert bytes to bits
-  uint32_t b = (h % num_lines) << (log2_cache_line_size_ + 3);
-  PREFETCH(&data[b / 8], 0 /* rw */, 1 /* locality */);
-  PREFETCH(&data[b / 8 + (1 << log2_cache_line_size_) - 1], 0 /* rw */,
-           1 /* locality */);
 
   for (uint32_t i = 0; i < num_probes; ++i) {
     // Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized
     // to a simple and operation by compiler.
-    const uint32_t bitpos = b + (h & ((1 << (log2_cache_line_size_ + 3)) - 1));
+    const uint32_t bitpos =
+        bit_offset + (h & ((1 << (log2_cache_line_size_ + 3)) - 1));
     if (((data[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
       return false;
     }
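The point of splitting FilterPrepare() out of HashMayMatch() is that the batched MayMatch() can issue all the cache-line prefetches first and only then start probing, so the memory latency of the whole batch overlaps instead of serializing. A self-contained model of that two-pass shape; ToyFilter, Prepare, and Probe are illustrative, __builtin_prefetch assumes GCC/Clang (the real code hides this behind the PREFETCH macro), and the toy does a single probe per key instead of num_probes_.

```cpp
// Toy model of the prepare/probe split: pass 1 prefetches, pass 2 probes.
#include <cstdint>
#include <iostream>
#include <vector>

static const uint32_t kLineBytes = 64;  // one cache line per key

struct ToyFilter {
  std::vector<unsigned char> data;  // num_lines * kLineBytes bytes
  uint32_t num_lines;

  uint32_t Prepare(uint32_t hash) const {  // like FilterPrepare()
    uint32_t byte_offset = (hash % num_lines) * kLineBytes;
    __builtin_prefetch(&data[byte_offset]);  // warm the line before probing
    return byte_offset;
  }
  bool Probe(uint32_t hash, uint32_t byte_offset) const {  // like HashMayMatch()
    uint32_t bit = byte_offset * 8 + (hash % (kLineBytes * 8));
    return (data[bit / 8] & (1u << (bit % 8))) != 0;
  }
};

int main() {
  ToyFilter f{std::vector<unsigned char>(4 * kLineBytes, 0xff), 4};
  std::vector<uint32_t> hashes = {11, 22, 33, 44};
  std::vector<uint32_t> offsets(hashes.size());
  std::vector<bool> may_match(hashes.size());
  for (size_t i = 0; i < hashes.size(); ++i)
    offsets[i] = f.Prepare(hashes[i]);              // prefetch pass
  for (size_t i = 0; i < hashes.size(); ++i)
    may_match[i] = f.Probe(hashes[i], offsets[i]);  // probe pass
  std::cout << may_match[0] << "\n";  // all bits set, so prints 1
}
```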
diff --git a/util/fault_injection_test_env.h b/util/fault_injection_test_env.h
index 7c5a080f7..a39e5b71e 100644
--- a/util/fault_injection_test_env.h
+++ b/util/fault_injection_test_env.h
@@ -123,6 +123,8 @@ class FaultInjectionTestEnv : public EnvWrapper {
   virtual Status RenameFile(const std::string& s,
                             const std::string& t) override;
 
+// Undef to eliminate clash on Windows
+#undef GetFreeSpace
   virtual Status GetFreeSpace(const std::string& path,
                               uint64_t* disk_free) override {
     if (!IsFilesystemActive() && error_ == Status::NoSpace()) {
diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h
index 3beb74fc9..8a7f7e084 100644
--- a/utilities/blob_db/blob_db.h
+++ b/utilities/blob_db/blob_db.h
@@ -166,6 +166,16 @@ class BlobDB : public StackableDB {
     }
     return MultiGet(options, keys, values);
   }
+  virtual void MultiGet(const ReadOptions& /*options*/,
+                        ColumnFamilyHandle* /*column_family*/,
+                        const size_t num_keys, const Slice* /*keys*/,
+                        PinnableSlice* /*values*/, Status* statuses,
+                        const bool /*sorted_input*/ = false) override {
+    for (size_t i = 0; i < num_keys; ++i) {
+      statuses[i] =
+          Status::NotSupported("Blob DB doesn't support batched MultiGet");
+    }
+  }
   using rocksdb::StackableDB::SingleDelete;
   virtual Status SingleDelete(const WriteOptions& /*wopts*/,