Introduce a new MultiGet batching implementation (#5011)

Summary:
This PR introduces a new MultiGet() API, with the underlying implementation grouping keys based on SST file and batching lookups in a file. The reason for the new API is twofold - the definition allows callers to allocate storage for status and values on stack instead of std::vector, as well as return values as PinnableSlices in order to avoid copying, and it keeps the original MultiGet() implementation intact while we experiment with batching.

Batching is useful when there is some spatial locality to the keys being queries, as well as larger batch sizes. The main benefits are due to -
1. Fewer function calls, especially to BlockBasedTableReader::MultiGet() and FullFilterBlockReader::KeysMayMatch()
2. Bloom filter cachelines can be prefetched, hiding the cache miss latency

The next step is to optimize the binary searches in the level_storage_info, index blocks and data blocks, since we could reduce the number of key comparisons if the keys are relatively close to each other. The batching optimizations also need to be extended to other formats, such as PlainTable and filter formats. This also needs to be added to db_stress.

Benchmark results from db_bench for various batch size/locality of reference combinations are given below. Locality was simulated by offsetting the keys in a batch by a stride length. Each SST file is about 8.6MB uncompressed and key/value size is 16/100 uncompressed. To focus on the cpu benefit of batching, the runs were single threaded and bound to the same cpu to eliminate interference from other system events. The results show a 10-25% improvement in micros/op from smaller to larger batch sizes (4 - 32).

Batch   Sizes

1        | 2        | 4         | 8      | 16  | 32

Random pattern (Stride length 0)
4.158 | 4.109 | 4.026 | 4.05 | 4.1 | 4.074        - Get
4.438 | 4.302 | 4.165 | 4.122 | 4.096 | 4.075 - MultiGet (no batching)
4.461 | 4.256 | 4.277 | 4.11 | 4.182 | 4.14        - MultiGet (w/ batching)

Good locality (Stride length 16)
4.048 | 3.659 | 3.248 | 2.99 | 2.84 | 2.753
4.429 | 3.728 | 3.406 | 3.053 | 2.911 | 2.781
4.452 | 3.45 | 2.833 | 2.451 | 2.233 | 2.135

Good locality (Stride length 256)
4.066 | 3.786 | 3.581 | 3.447 | 3.415 | 3.232
4.406 | 4.005 | 3.644 | 3.49 | 3.381 | 3.268
4.393 | 3.649 | 3.186 | 2.882 | 2.676 | 2.62

Medium locality (Stride length 4096)
4.012 | 3.922 | 3.768 | 3.61 | 3.582 | 3.555
4.364 | 4.057 | 3.791 | 3.65 | 3.57 | 3.465
4.479 | 3.758 | 3.316 | 3.077 | 2.959 | 2.891

dbbench command used (on a DB with 4 levels, 12 million keys)-
TEST_TMPDIR=/dev/shm numactl -C 10  ./db_bench.tmp -use_existing_db=true -benchmarks="readseq,multireadrandom" -write_buffer_size=4194304 -target_file_size_base=4194304 -max_bytes_for_level_base=16777216 -num=12000000 -reads=12000000 -duration=90 -threads=1 -compression_type=none -cache_size=4194304000 -batch_size=32 -disable_auto_compactions=true -bloom_bits=10 -cache_index_and_filter_blocks=true -pin_l0_filter_and_index_blocks_in_cache=true -multiread_batched=true -multiread_stride=4
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5011

Differential Revision: D14348703

Pulled By: anand1976

fbshipit-source-id: 774406dab3776d979c809522a67bedac6c17f84b
main
anand76 6 years ago committed by Facebook Github Bot
parent ed9f5e21aa
commit fefd4b98c5
  1. 77
      db/db_basic_test.cc
  2. 185
      db/db_impl.cc
  3. 19
      db/db_impl.h
  4. 41
      db/db_test.cc
  5. 28
      db/db_test_util.cc
  6. 3
      db/db_test_util.h
  7. 49
      db/dbformat.h
  8. 65
      db/lookup_key.h
  9. 3
      db/merge_context.h
  10. 61
      db/table_cache.cc
  11. 8
      db/table_cache.h
  12. 560
      db/version_set.cc
  13. 6
      db/version_set.h
  14. 40
      include/rocksdb/db.h
  15. 10
      include/rocksdb/filter_policy.h
  16. 3
      include/rocksdb/statistics.h
  17. 9
      include/rocksdb/utilities/stackable_db.h
  18. 2
      monitoring/statistics.cc
  19. 165
      table/block_based_table_reader.cc
  20. 12
      table/block_based_table_reader.h
  21. 27
      table/filter_block.h
  22. 53
      table/full_filter_block.cc
  23. 11
      table/full_filter_block.h
  24. 2
      table/full_filter_block_test.cc
  25. 3
      table/get_context.h
  26. 249
      table/multiget_context.h
  27. 13
      table/table_reader.h
  28. 85
      tools/db_bench_tool.cc
  29. 71
      util/bloom.cc
  30. 2
      util/fault_injection_test_env.h
  31. 10
      utilities/blob_db/blob_db.h

@ -994,7 +994,7 @@ TEST_F(DBBasicTest, MultiGetMultiCF) {
keys.push_back("cf" + std::to_string(i) + "_key");
}
values = MultiGet(cfs, keys);
values = MultiGet(cfs, keys, nullptr);
ASSERT_EQ(values.size(), 8);
for (unsigned int j = 0; j < values.size(); ++j) {
ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val2");
@ -1054,7 +1054,7 @@ TEST_F(DBBasicTest, MultiGetMultiCFMutex) {
keys.push_back("cf" + std::to_string(i) + "_key");
}
values = MultiGet(cfs, keys);
values = MultiGet(cfs, keys, nullptr);
ASSERT_TRUE(last_try);
ASSERT_EQ(values.size(), 8);
for (unsigned int j = 0; j < values.size(); ++j) {
@ -1128,6 +1128,79 @@ TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) {
}
}
TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Reopen(options);
int num_keys = 0;
for (int i = 0; i < 128; ++i) {
ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
num_keys++;
if (num_keys == 8) {
Flush();
num_keys = 0;
}
}
if (num_keys > 0) {
Flush();
num_keys = 0;
}
MoveFilesToLevel(2);
for (int i = 0; i < 128; i += 3) {
ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
num_keys++;
if (num_keys == 8) {
Flush();
num_keys = 0;
}
}
if (num_keys > 0) {
Flush();
num_keys = 0;
}
MoveFilesToLevel(1);
for (int i = 0; i < 128; i += 5) {
ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
num_keys++;
if (num_keys == 8) {
Flush();
num_keys = 0;
}
}
if (num_keys > 0) {
Flush();
num_keys = 0;
}
for (int i = 0; i < 128; i += 9) {
ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
}
std::vector<std::string> keys;
std::vector<std::string> values;
for (int i = 64; i < 80; ++i) {
keys.push_back("key_" + std::to_string(i));
}
values = MultiGet(keys, nullptr);
ASSERT_EQ(values.size(), 16);
for (unsigned int j = 0; j < values.size(); ++j) {
int key = j + 64;
if (key % 9 == 0) {
ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
} else if (key % 5 == 0) {
ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
} else if (key % 3 == 0) {
ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
} else {
ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -76,7 +76,9 @@
#include "rocksdb/write_buffer_manager.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/get_context.h"
#include "table/merging_iterator.h"
#include "table/multiget_context.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "tools/sst_dump_tool_imp.h"
@ -1659,6 +1661,189 @@ std::vector<Status> DBImpl::MultiGet(
return stat_list;
}
// Order keys by CF ID, followed by key contents
struct CompareKeyContext {
inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
const Comparator* comparator = cfd->user_comparator();
int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
if (cmp < 0) {
return true;
}
return false;
}
const ColumnFamilyData* cfd;
};
void DBImpl::MultiGet(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values,
Status* statuses, const bool sorted_input) {
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
for (size_t i = 0; i < num_keys; ++i) {
key_context.emplace_back(keys[i], &values[i], &statuses[i]);
}
MultiGetImpl(read_options, column_family, key_context, sorted_input, nullptr,
nullptr);
}
void DBImpl::MultiGetImpl(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE>& key_context,
bool sorted_input, ReadCallback* callback, bool* is_blob_index) {
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_MULTIGET);
size_t num_keys = key_context.size();
PERF_TIMER_GUARD(get_snapshot_time);
ColumnFamilyHandleImpl* cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
ColumnFamilyData* cfd = cfh->cfd();
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys);
{
size_t index = 0;
for (KeyContext& key : key_context) {
#ifndef NDEBUG
if (index > 0) {
KeyContext* lhs = &key_context[index-1];
KeyContext* rhs = &key_context[index];
const Comparator* comparator = cfd->user_comparator();
int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
assert(cmp <= 0);
}
#endif
sorted_keys[index] = &key;
index++;
}
if (!sorted_input) {
CompareKeyContext sort_comparator;
sort_comparator.cfd = cfd;
std::sort(sorted_keys.begin(), sorted_keys.begin() + index,
sort_comparator);
}
}
// Keep track of bytes that we read for statistics-recording later
PERF_TIMER_STOP(get_snapshot_time);
// Acquire SuperVersion
SuperVersion* super_version = GetAndRefSuperVersion(cfd);
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
// Note: In WritePrepared txns this is not necessary but not harmful
// either. Because prep_seq > snapshot => commit_seq > snapshot so if
// a snapshot is specified we should be fine with skipping seq numbers
// that are greater than that.
//
// In WriteUnprepared, we cannot set snapshot in the lookup key because we
// may skip uncommitted data that should be visible to the transaction for
// reading own writes.
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
if (callback) {
snapshot = std::max(snapshot, callback->max_visible_seq());
}
} else {
// Since we get and reference the super version before getting
// the snapshot number, without a mutex protection, it is possible
// that a memtable switch happened in the middle and not all the
// data for this snapshot is available. But it will contain all
// the data available in the super version we have, which is also
// a valid snapshot to read from.
// We shouldn't get snapshot before finding and referencing the super
// version because a flush happening in between may compact away data for
// the snapshot, but the snapshot is earlier than the data overwriting it,
// so users may see wrong results.
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
}
// For each of the given keys, apply the entire "get" process as follows:
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
size_t keys_left = num_keys;
while (keys_left) {
size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
? MultiGetContext::MAX_BATCH_SIZE
: keys_left;
MultiGetContext ctx(&sorted_keys[num_keys - keys_left], batch_size,
snapshot);
MultiGetRange range = ctx.GetMultiGetRange();
bool lookup_current = false;
keys_left -= batch_size;
for (auto mget_iter = range.begin(); mget_iter != range.end();
++mget_iter) {
MergeContext& merge_context = mget_iter->merge_context;
merge_context.Clear();
Status& s = *mget_iter->s;
PinnableSlice* value = mget_iter->value;
s = Status::OK();
bool skip_memtable =
(read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
if (!skip_memtable) {
if (super_version->mem->Get(*(mget_iter->lkey), value->GetSelf(), &s,
&merge_context,
&mget_iter->max_covering_tombstone_seq,
read_options, callback, is_blob_index)) {
done = true;
value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if (super_version->imm->Get(
*(mget_iter->lkey), value->GetSelf(), &s, &merge_context,
&mget_iter->max_covering_tombstone_seq, read_options,
callback, is_blob_index)) {
done = true;
value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
}
}
if (done) {
range.MarkKeyDone(mget_iter);
} else {
RecordTick(stats_, MEMTABLE_MISS);
lookup_current = true;
}
}
if (lookup_current) {
PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->MultiGet(read_options, &range, callback,
is_blob_index);
}
}
// Post processing (decrement reference counts and record statistics)
PERF_TIMER_GUARD(get_post_process_time);
size_t num_found = 0;
uint64_t bytes_read = 0;
for (KeyContext& key : key_context) {
if (key.s->ok()) {
bytes_read += key.value->size();
num_found++;
}
}
ReturnAndCleanupSuperVersion(cfd, super_version);
RecordTick(stats_, NUMBER_MULTIGET_CALLS);
RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
PERF_TIMER_STOP(get_post_process_time);
}
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family,
ColumnFamilyHandle** handle) {

@ -124,6 +124,25 @@ class DBImpl : public DB {
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
// This MultiGet is a batched version, which may be faster than calling Get
// multiple times, especially if the keys have some spatial locality that
// enables them to be queried in the same SST files/set of files. The larger
// the batch size, the more scope for batching and performance improvement
// The values and statuses parameters are arrays with number of elements
// equal to keys.size(). This allows the storage for those to be alloacted
// by the caller on the stack for small batches
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;
void MultiGetImpl(
const ReadOptions& options, ColumnFamilyHandle* column_family,
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE>& key_context,
bool sorted_input, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family,
ColumnFamilyHandle** handle) override;

@ -2153,6 +2153,7 @@ struct MTState {
struct MTThread {
MTState* state;
int id;
bool multiget_batched;
};
static void MTThreadBody(void* arg) {
@ -2201,8 +2202,28 @@ static void MTThreadBody(void* arg) {
// same)
std::vector<Slice> keys(kColumnFamilies, Slice(keybuf));
std::vector<std::string> values;
std::vector<Status> statuses =
db->MultiGet(ReadOptions(), t->state->test->handles_, keys, &values);
std::vector<Status> statuses;
if (!t->multiget_batched) {
statuses = db->MultiGet(ReadOptions(), t->state->test->handles_, keys,
&values);
} else {
std::vector<PinnableSlice> pin_values(keys.size());
statuses.resize(keys.size());
const Snapshot* snapshot = db->GetSnapshot();
ReadOptions ro;
ro.snapshot = snapshot;
for (int cf = 0; cf < kColumnFamilies; ++cf) {
db->MultiGet(ro, t->state->test->handles_[cf], 1, &keys[cf],
&pin_values[cf], &statuses[cf]);
}
db->ReleaseSnapshot(snapshot);
values.resize(keys.size());
for (int cf = 0; cf < kColumnFamilies; ++cf) {
if (statuses[cf].ok()) {
values[cf].assign(pin_values[cf].data(), pin_values[cf].size());
}
}
}
Status s = statuses[0];
// all statuses have to be the same
for (size_t i = 1; i < statuses.size(); ++i) {
@ -2244,10 +2265,13 @@ static void MTThreadBody(void* arg) {
} // namespace
class MultiThreadedDBTest : public DBTest,
public ::testing::WithParamInterface<int> {
class MultiThreadedDBTest
: public DBTest,
public ::testing::WithParamInterface<std::tuple<int, bool>> {
public:
void SetUp() override { option_config_ = GetParam(); }
void SetUp() override {
std::tie(option_config_, multiget_batched_) = GetParam();
}
static std::vector<int> GenerateOptionConfigs() {
std::vector<int> optionConfigs;
@ -2256,6 +2280,8 @@ class MultiThreadedDBTest : public DBTest,
}
return optionConfigs;
}
bool multiget_batched_;
};
TEST_P(MultiThreadedDBTest, MultiThreaded) {
@ -2282,6 +2308,7 @@ TEST_P(MultiThreadedDBTest, MultiThreaded) {
for (int id = 0; id < kNumThreads; id++) {
thread[id].state = &mt;
thread[id].id = id;
thread[id].multiget_batched = multiget_batched_;
env_->StartThread(MTThreadBody, &thread[id]);
}
@ -2299,7 +2326,9 @@ TEST_P(MultiThreadedDBTest, MultiThreaded) {
INSTANTIATE_TEST_CASE_P(
MultiThreaded, MultiThreadedDBTest,
::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()));
::testing::Combine(
::testing::ValuesIn(MultiThreadedDBTest::GenerateOptionConfigs()),
::testing::Bool()));
#endif // ROCKSDB_LITE
// Group commit test:

@ -780,6 +780,34 @@ std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
return result;
}
std::vector<std::string> DBTestBase::MultiGet(const std::vector<std::string>& k,
const Snapshot* snapshot) {
ReadOptions options;
options.verify_checksums = true;
options.snapshot = snapshot;
std::vector<Slice> keys;
std::vector<std::string> result;
std::vector<Status> statuses(k.size());
std::vector<PinnableSlice> pin_values(k.size());
for (unsigned int i = 0; i < k.size(); ++i) {
keys.push_back(k[i]);
}
db_->MultiGet(options, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), pin_values.data(), statuses.data());
result.resize(k.size());
for (auto iter = result.begin(); iter != result.end(); ++iter) {
iter->assign(pin_values[iter - result.begin()].data(),
pin_values[iter - result.begin()].size());
}
for (unsigned int i = 0; i < statuses.size(); ++i) {
if (statuses[i].IsNotFound()) {
result[i] = "NOT_FOUND";
}
}
return result;
}
Status DBTestBase::Get(const std::string& k, PinnableSlice* v) {
ReadOptions options;
options.verify_checksums = true;

@ -842,6 +842,9 @@ class DBTestBase : public testing::Test {
const std::vector<std::string>& k,
const Snapshot* snapshot = nullptr);
std::vector<std::string> MultiGet(const std::vector<std::string>& k,
const Snapshot* snapshot = nullptr);
uint64_t GetNumSnapshots();
uint64_t GetTimeOldestSnapshots();

@ -9,8 +9,11 @@
#pragma once
#include <stdio.h>
#include <memory>
#include <string>
#include <utility>
#include "db/lookup_key.h"
#include "db/merge_context.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
@ -292,52 +295,6 @@ inline uint64_t GetInternalKeySeqno(const Slice& internal_key) {
return num >> 8;
}
// A helper class useful for DBImpl::Get()
class LookupKey {
public:
// Initialize *this for looking up user_key at a snapshot with
// the specified sequence number.
LookupKey(const Slice& _user_key, SequenceNumber sequence);
~LookupKey();
// Return a key suitable for lookup in a MemTable.
Slice memtable_key() const {
return Slice(start_, static_cast<size_t>(end_ - start_));
}
// Return an internal key (suitable for passing to an internal iterator)
Slice internal_key() const {
return Slice(kstart_, static_cast<size_t>(end_ - kstart_));
}
// Return the user key
Slice user_key() const {
return Slice(kstart_, static_cast<size_t>(end_ - kstart_ - 8));
}
private:
// We construct a char array of the form:
// klength varint32 <-- start_
// userkey char[klength] <-- kstart_
// tag uint64
// <-- end_
// The array is a suitable MemTable key.
// The suffix starting with "userkey" can be used as an InternalKey.
const char* start_;
const char* kstart_;
const char* end_;
char space_[200]; // Avoid allocation for short keys
// No copying allowed
LookupKey(const LookupKey&);
void operator=(const LookupKey&);
};
inline LookupKey::~LookupKey() {
if (start_ != space_) delete[] start_;
}
class IterKey {
public:
IterKey()

@ -0,0 +1,65 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <string>
#include <utility>
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/types.h"
namespace rocksdb {
// A helper class useful for DBImpl::Get()
class LookupKey {
public:
// Initialize *this for looking up user_key at a snapshot with
// the specified sequence number.
LookupKey(const Slice& _user_key, SequenceNumber sequence);
~LookupKey();
// Return a key suitable for lookup in a MemTable.
Slice memtable_key() const {
return Slice(start_, static_cast<size_t>(end_ - start_));
}
// Return an internal key (suitable for passing to an internal iterator)
Slice internal_key() const {
return Slice(kstart_, static_cast<size_t>(end_ - kstart_));
}
// Return the user key
Slice user_key() const {
return Slice(kstart_, static_cast<size_t>(end_ - kstart_ - 8));
}
private:
// We construct a char array of the form:
// klength varint32 <-- start_
// userkey char[klength] <-- kstart_
// tag uint64
// <-- end_
// The array is a suitable MemTable key.
// The suffix starting with "userkey" can be used as an InternalKey.
const char* start_;
const char* kstart_;
const char* end_;
char space_[200]; // Avoid allocation for short keys
// No copying allowed
LookupKey(const LookupKey&);
void operator=(const LookupKey&);
};
inline LookupKey::~LookupKey() {
if (start_ != space_) delete[] start_;
}
} // namespace rocksdb

@ -4,9 +4,10 @@
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include <algorithm>
#include <memory>
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "rocksdb/slice.h"
namespace rocksdb {

@ -19,6 +19,7 @@
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "table/multiget_context.h"
#include "table/table_builder.h"
#include "table/table_reader.h"
#include "util/coding.h"
@ -418,6 +419,66 @@ Status TableCache::Get(const ReadOptions& options,
return s;
}
// Batched version of TableCache::MultiGet.
// TODO: Add support for row cache. As of now, this ignores the row cache
// and directly looks up in the table files
Status TableCache::MultiGet(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
const MultiGetContext::Range* mget_range,
const SliceTransform* prefix_extractor,
HistogramImpl* file_read_hist, bool skip_filters,
int level) {
auto& fd = file_meta.fd;
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
if (s.ok()) {
if (t == nullptr) {
s = FindTable(
env_options_, internal_comparator, fd, &handle, prefix_extractor,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters, level);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
assert(t);
}
}
if (s.ok() && !options.ignore_range_deletions) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
t->NewRangeTombstoneIterator(options));
if (range_del_iter != nullptr) {
for (auto iter = mget_range->begin(); iter != mget_range->end();
++iter) {
const Slice& k = iter->ikey;
SequenceNumber* max_covering_tombstone_seq =
iter->get_context->max_covering_tombstone_seq();
*max_covering_tombstone_seq = std::max(
*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
}
}
}
if (s.ok()) {
t->MultiGet(options, mget_range, prefix_extractor, skip_filters);
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
for (auto iter = mget_range->begin(); iter != mget_range->end(); ++iter) {
Status* status = iter->s;
if (status->IsIncomplete()) {
// Couldn't find Table in cache but treat as kFound if no_io set
iter->get_context->MarkKeyMayExist();
s = Status::OK();
}
}
}
}
if (handle != nullptr) {
ReleaseHandle(handle);
}
return s;
}
Status TableCache::GetTableProperties(
const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,

@ -76,6 +76,14 @@ class TableCache {
HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
int level = -1);
Status MultiGet(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
const MultiGetContext::Range* mget_range,
const SliceTransform* prefix_extractor = nullptr,
HistogramImpl* file_read_hist = nullptr,
bool skip_filters = false, int level = -1);
// Evict any entry for the specified file number
static void Evict(Cache* cache, uint64_t file_number);

@ -16,6 +16,7 @@
#include <inttypes.h>
#include <stdio.h>
#include <algorithm>
#include <array>
#include <list>
#include <map>
#include <set>
@ -42,6 +43,7 @@
#include "table/internal_iterator.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
#include "table/multiget_context.h"
#include "table/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
@ -345,6 +347,407 @@ class FilePicker {
return false;
}
};
class FilePickerMultiGet {
private:
struct FilePickerContext;
public:
FilePickerMultiGet(std::vector<FileMetaData*>* files, MultiGetRange* range,
autovector<LevelFilesBrief>* file_levels,
unsigned int num_levels, FileIndexer* file_indexer,
const Comparator* user_comparator,
const InternalKeyComparator* internal_comparator)
: num_levels_(num_levels),
curr_level_(static_cast<unsigned int>(-1)),
returned_file_level_(static_cast<unsigned int>(-1)),
hit_file_level_(static_cast<unsigned int>(-1)),
range_(range),
batch_iter_(range->begin()),
batch_iter_prev_(range->begin()),
maybe_repeat_key_(false),
current_level_range_(*range, range->begin(), range->end()),
current_file_range_(*range, range->begin(), range->end()),
#ifndef NDEBUG
files_(files),
#endif
level_files_brief_(file_levels),
is_hit_file_last_in_level_(false),
curr_file_level_(nullptr),
file_indexer_(file_indexer),
user_comparator_(user_comparator),
internal_comparator_(internal_comparator) {
#ifdef NDEBUG
(void)files;
#endif
for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
fp_ctx_array_[iter.index()] =
FilePickerContext(0, FileIndexer::kLevelMaxIndex);
}
// Setup member variables to search first level.
search_ended_ = !PrepareNextLevel();
if (!search_ended_) {
// REVISIT
// Prefetch Level 0 table data to avoid cache miss if possible.
// As of now, only PlainTableReader and CuckooTableReader do any
// prefetching. This may not be necessary anymore once we implement
// batching in those table readers
for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
if (r) {
for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
r->Prepare(iter->ikey);
}
}
}
}
}
int GetCurrentLevel() const { return curr_level_; }
// Iterates through files in the current level until it finds a file that
// contains atleast one key from the MultiGet batch
bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
size_t* file_index, FdWithKeyRange** fd,
bool* is_last_key_in_file) {
size_t curr_file_index = *file_index;
FdWithKeyRange* f = nullptr;
bool file_hit = false;
int cmp_largest = -1;
if (curr_file_index >= curr_file_level_->num_files) {
return false;
}
// Loops over keys in the MultiGet batch until it finds a file with
// atleast one of the keys. Then it keeps moving forward until the
// last key in the batch that falls in that file
while (batch_iter_ != current_level_range_.end() &&
(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
curr_file_index ||
!file_hit)) {
struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
Slice& user_key = batch_iter_->ukey;
// Do key range filtering of files or/and fractional cascading if:
// (1) not all the files are in level 0, or
// (2) there are more than 3 current level files
// If there are only 3 or less current level files in the system, we
// skip the key range filtering. In this case, more likely, the system
// is highly tuned to minimize number of tables queried by each query,
// so it is unlikely that key range filtering is more efficient than
// querying the files.
if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
// Check if key is within a file's range. If search left bound and
// right bound point to the same find, we are sure key falls in
// range.
assert(curr_level_ == 0 ||
fp_ctx.curr_index_in_curr_level ==
fp_ctx.start_index_in_curr_level ||
user_comparator_->Compare(user_key,
ExtractUserKey(f->smallest_key)) <= 0);
int cmp_smallest = user_comparator_->Compare(
user_key, ExtractUserKey(f->smallest_key));
if (cmp_smallest >= 0) {
cmp_largest = user_comparator_->Compare(
user_key, ExtractUserKey(f->largest_key));
} else {
cmp_largest = -1;
}
// Setup file search bound for the next level based on the
// comparison results
if (curr_level_ > 0) {
file_indexer_->GetNextLevelIndex(
curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
cmp_largest, &fp_ctx.search_left_bound,
&fp_ctx.search_right_bound);
}
// Key falls out of current file's range
if (cmp_smallest < 0 || cmp_largest > 0) {
next_file_range->SkipKey(batch_iter_);
} else {
file_hit = true;
}
} else {
file_hit = true;
}
#ifndef NDEBUG
// Sanity check to make sure that the files are correctly sorted
if (f != prev_file_) {
if (prev_file_) {
if (curr_level_ != 0) {
int comp_sign = internal_comparator_->Compare(
prev_file_->largest_key, f->smallest_key);
assert(comp_sign < 0);
} else if (fp_ctx.curr_index_in_curr_level > 0) {
// level == 0, the current file cannot be newer than the previous
// one. Use compressed data structure, has no attribute seqNo
assert(!NewestFirstBySeqNo(
files_[0][fp_ctx.curr_index_in_curr_level],
files_[0][fp_ctx.curr_index_in_curr_level - 1]));
}
}
prev_file_ = f;
}
#endif
if (cmp_largest == 0) {
// cmp_largest is 0, which means the next key will not be in this
// file, so stop looking further. Also don't increment megt_iter_
// as we may have to look for this key in the next file if we don't
// find it in this one
break;
} else {
if (curr_level_ == 0) {
// We need to look through all files in level 0
++fp_ctx.curr_index_in_curr_level;
}
++batch_iter_;
}
if (!file_hit) {
curr_file_index =
(batch_iter_ != current_level_range_.end())
? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
: curr_file_level_->num_files;
}
}
*fd = f;
*file_index = curr_file_index;
*is_last_key_in_file = cmp_largest == 0;
return file_hit;
}
FdWithKeyRange* GetNextFile() {
while (!search_ended_) {
// Start searching next level.
if (batch_iter_ == current_level_range_.end()) {
search_ended_ = !PrepareNextLevel();
continue;
} else {
if (maybe_repeat_key_) {
maybe_repeat_key_ = false;
// Check if we found the final value for the last key in the
// previous lookup range. If we did, then there's no need to look
// any further for that key, so advance batch_iter_. Else, keep
// batch_iter_ positioned on that key so we look it up again in
// the next file
if (current_level_range_.CheckKeyDone(batch_iter_)) {
++batch_iter_;
}
}
// batch_iter_prev_ will become the start key for the next file
// lookup
batch_iter_prev_ = batch_iter_;
}
MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
current_level_range_.end());
size_t curr_file_index =
(batch_iter_ != current_level_range_.end())
? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
: curr_file_level_->num_files;
FdWithKeyRange* f;
bool is_last_key_in_file;
if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
&is_last_key_in_file)) {
search_ended_ = !PrepareNextLevel();
} else {
MultiGetRange::Iterator upper_key = batch_iter_;
if (is_last_key_in_file) {
// Since cmp_largest is 0, batch_iter_ still points to the last key
// that falls in this file, instead of the next one. Increment
// upper_key so we can set the range properly for SST MultiGet
++upper_key;
++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
maybe_repeat_key_ = true;
}
// Set the range for this file
current_file_range_ =
MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
returned_file_level_ = curr_level_;
hit_file_level_ = curr_level_;
is_hit_file_last_in_level_ =
curr_file_index == curr_file_level_->num_files - 1;
return f;
}
}
// Search ended
return nullptr;
}
// getter for current file level
// for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
unsigned int GetHitFileLevel() { return hit_file_level_; }
// Returns true if the most recent "hit file" (i.e., one returned by
// GetNextFile()) is at the last index in its level.
bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
const MultiGetRange& CurrentFileRange() { return current_file_range_; }
private:
unsigned int num_levels_;
unsigned int curr_level_;
unsigned int returned_file_level_;
unsigned int hit_file_level_;
struct FilePickerContext {
int32_t search_left_bound;
int32_t search_right_bound;
unsigned int curr_index_in_curr_level;
unsigned int start_index_in_curr_level;
FilePickerContext(int32_t left, int32_t right)
: search_left_bound(left), search_right_bound(right) {}
FilePickerContext() = default;
};
std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
MultiGetRange* range_;
// Iterator to iterate through the keys in a MultiGet batch, that gets reset
// at the beginning of each level. Each call to GetNextFile() will position
// batch_iter_ at or right after the last key that was found in the returned
// SST file
MultiGetRange::Iterator batch_iter_;
// An iterator that records the previous position of batch_iter_, i.e last
// key found in the previous SST file, in order to serve as the start of
// the batch key range for the next SST file
MultiGetRange::Iterator batch_iter_prev_;
bool maybe_repeat_key_;
MultiGetRange current_level_range_;
MultiGetRange current_file_range_;
#ifndef NDEBUG
std::vector<FileMetaData*>* files_;
#endif
autovector<LevelFilesBrief>* level_files_brief_;
bool search_ended_;
bool is_hit_file_last_in_level_;
LevelFilesBrief* curr_file_level_;
FileIndexer* file_indexer_;
const Comparator* user_comparator_;
const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
FdWithKeyRange* prev_file_;
#endif
// Setup local variables to search next level.
// Returns false if there are no more levels to search.
bool PrepareNextLevel() {
if (curr_level_ == 0) {
MultiGetRange::Iterator mget_iter = current_level_range_.begin();
if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
curr_file_level_->num_files) {
#ifndef NDEBUG
prev_file_ = nullptr;
#endif
batch_iter_prev_ = current_level_range_.begin();
batch_iter_ = current_level_range_.begin();
return true;
}
}
curr_level_++;
// Reset key range to saved value
while (curr_level_ < num_levels_) {
bool level_contains_keys = false;
curr_file_level_ = &(*level_files_brief_)[curr_level_];
if (curr_file_level_->num_files == 0) {
// When current level is empty, the search bound generated from upper
// level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
// also empty.
for (auto mget_iter = current_level_range_.begin();
mget_iter != current_level_range_.end(); ++mget_iter) {
struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
assert(fp_ctx.search_left_bound == 0);
assert(fp_ctx.search_right_bound == -1 ||
fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
// Since current level is empty, it will need to search all files in
// the next level
fp_ctx.search_left_bound = 0;
fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
}
// Skip all subsequent empty levels
while ((*level_files_brief_)[++curr_level_].num_files == 0) {
}
}
// Some files may overlap each other. We find
// all files that overlap user_key and process them in order from
// newest to oldest. In the context of merge-operator, this can occur at
// any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
// are always compacted into a single entry).
int32_t start_index = -1;
current_level_range_ =
MultiGetRange(*range_, range_->begin(), range_->end());
for (auto mget_iter = current_level_range_.begin();
mget_iter != current_level_range_.end(); ++mget_iter) {
struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
if (curr_level_ == 0) {
// On Level-0, we read through all files to check for overlap.
start_index = 0;
level_contains_keys = true;
} else {
// On Level-n (n>=1), files are sorted. Binary search to find the
// earliest file whose largest key >= ikey. Search left bound and
// right bound are used to narrow the range.
if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
fp_ctx.search_right_bound =
static_cast<int32_t>(curr_file_level_->num_files) - 1;
}
// `search_right_bound_` is an inclusive upper-bound, but since it
// was determined based on user key, it is still possible the lookup
// key falls to the right of `search_right_bound_`'s corresponding
// file. So, pass a limit one higher, which allows us to detect this
// case.
Slice& ikey = mget_iter->ikey;
start_index = FindFileInRange(
*internal_comparator_, *curr_file_level_, ikey,
static_cast<uint32_t>(fp_ctx.search_left_bound),
static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
if (start_index == fp_ctx.search_right_bound + 1) {
// `ikey_` comes after `search_right_bound_`. The lookup key does
// not exist on this level, so let's skip this level and do a full
// binary search on the next level.
fp_ctx.search_left_bound = 0;
fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
current_level_range_.SkipKey(mget_iter);
continue;
} else {
level_contains_keys = true;
}
} else {
// search_left_bound > search_right_bound, key does not exist in
// this level. Since no comparison is done in this level, it will
// need to search all files in the next level.
fp_ctx.search_left_bound = 0;
fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
current_level_range_.SkipKey(mget_iter);
continue;
}
}
fp_ctx.start_index_in_curr_level = start_index;
fp_ctx.curr_index_in_curr_level = start_index;
}
if (level_contains_keys) {
#ifndef NDEBUG
prev_file_ = nullptr;
#endif
batch_iter_prev_ = current_level_range_.begin();
batch_iter_ = current_level_range_.begin();
return true;
}
curr_level_++;
}
// curr_level_ = num_levels_. So, no more levels to search.
return false;
}
};
} // anonymous namespace
VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
@ -1318,6 +1721,163 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
}
}
void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool* is_blob) {
PinnedIteratorsManager pinned_iters_mgr;
// Pin blocks that we read to hold merge operands
if (merge_operator_) {
pinned_iters_mgr.StartPinning();
}
// Even though we know the batch size won't be > MAX_BATCH_SIZE,
// use autovector in order to avoid unnecessary construction of GetContext
// objects, which is expensive
autovector<GetContext, 16> get_ctx;
for (auto iter = range->begin(); iter != range->end(); ++iter) {
assert(iter->s->ok() || iter->s->IsMergeInProgress());
get_ctx.emplace_back(
user_comparator(), merge_operator_, info_log_, db_statistics_,
iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
iter->value, nullptr, &(iter->merge_context),
&iter->max_covering_tombstone_seq, this->env_, &iter->seq,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
iter->get_context = &get_ctx.back();
}
MultiGetRange file_picker_range(*range, range->begin(), range->end());
FilePickerMultiGet fp(
storage_info_.files_, &file_picker_range,
&storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
&storage_info_.file_indexer_, user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) {
MultiGetRange file_range = fp.CurrentFileRange();
bool timer_enabled =
GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
get_perf_context()->per_level_perf_context_enabled;
StopWatchNano timer(env_, timer_enabled /* auto_start */);
Status s = table_cache_->MultiGet(
read_options, *internal_comparator(), *f->file_metadata, &file_range,
mutable_cf_options_.prefix_extractor.get(),
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
fp.GetCurrentLevel());
// TODO: examine the behavior for corrupted key
if (timer_enabled) {
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
fp.GetCurrentLevel());
}
if (!s.ok()) {
// TODO: Set status for individual keys appropriately
for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
*iter->s = s;
file_range.MarkKeyDone(iter);
}
return;
}
uint64_t batch_size = 0;
for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
GetContext& get_context = *iter->get_context;
Status* status = iter->s;
if (get_context.sample()) {
sample_file_read_inc(f->file_metadata);
}
batch_size++;
// report the counters before returning
if (get_context.State() != GetContext::kNotFound &&
get_context.State() != GetContext::kMerge &&
db_statistics_ != nullptr) {
get_context.ReportCounters();
} else {
if (iter->max_covering_tombstone_seq > 0) {
// The remaining files we look at will only contain covered keys, so
// we stop here for this key
file_picker_range.SkipKey(iter);
}
}
switch (get_context.State()) {
case GetContext::kNotFound:
// Keep searching in other files
break;
case GetContext::kMerge:
// TODO: update per-level perfcontext user_key_return_count for kMerge
break;
case GetContext::kFound:
if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0);
} else if (fp.GetHitFileLevel() == 1) {
RecordTick(db_statistics_, GET_HIT_L1);
} else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
}
PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
fp.GetHitFileLevel());
file_range.MarkKeyDone(iter);
continue;
case GetContext::kDeleted:
// Use empty error message for speed
*status = Status::NotFound();
file_range.MarkKeyDone(iter);
continue;
case GetContext::kCorrupt:
*status =
Status::Corruption("corrupted key for ", iter->lkey->user_key());
file_range.MarkKeyDone(iter);
continue;
case GetContext::kBlobIndex:
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
file_range.MarkKeyDone(iter);
continue;
}
}
RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
if (file_picker_range.empty()) {
break;
}
f = fp.GetNextFile();
}
// Process any left over keys
for (auto iter = range->begin(); iter != range->end(); ++iter) {
GetContext& get_context = *iter->get_context;
Status* status = iter->s;
Slice user_key = iter->lkey->user_key();
if (db_statistics_ != nullptr) {
get_context.ReportCounters();
}
if (GetContext::kMerge == get_context.State()) {
if (!merge_operator_) {
*status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
range->MarkKeyDone(iter);
continue;
}
// merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands;
std::string* str_value =
iter->value != nullptr ? iter->value->GetSelf() : nullptr;
*status = MergeHelper::TimedFullMerge(
merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
str_value, info_log_, db_statistics_, env_,
nullptr /* result_operand */, true);
if (LIKELY(iter->value != nullptr)) {
iter->value->PinSelf();
}
} else {
range->MarkKeyDone(iter);
*status = Status::NotFound(); // Use an empty error message for speed
}
}
}
bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
// Reaching the bottom level implies misses at all upper levels, so we'll
// skip checking the filters when we predict a hit.

@ -44,6 +44,8 @@
#include "options/db_options.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "table/get_context.h"
#include "table/multiget_context.h"
namespace rocksdb {
@ -552,6 +554,7 @@ class VersionStorageInfo {
void operator=(const VersionStorageInfo&) = delete;
};
using MultiGetRange = MultiGetContext::Range;
class Version {
public:
// Append to *iters a sequence of iterators that will
@ -593,6 +596,9 @@ class Version {
SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr,
bool* is_blob = nullptr);
void MultiGet(const ReadOptions&, MultiGetRange* range,
ReadCallback* callback = nullptr, bool* is_blob = nullptr);
// Loads some stats information from files. Call without mutex held. It needs
// to be called before applying the version to the version set.
void PrepareApply(const MutableCFOptions& mutable_cf_options,

@ -421,6 +421,46 @@ class DB {
keys, values);
}
// Overloaded MultiGet API that improves performance by batching operations
// in the read path for greater efficiency. Currently, only the block based
// table format with full filters are supported. Other table formats such
// as plain table, block based table with block based filters and
// partitioned indexes will still work, but will not get any performance
// benefits.
// Parameters -
// options - ReadOptions
// column_family - ColumnFamilyHandle* that the keys belong to. All the keys
// passed to the API are restricted to a single column family
// num_keys - Number of keys to lookup
// keys - Pointer to C style array of key Slices with num_keys elements
// values - Pointer to C style array of PinnableSlices with num_keys elements
// statuses - Pointer to C style array of Status with num_keys elements
// sorted_input - If true, it means the input keys are already sorted by key
// order, so the MultiGet() API doesn't have to sort them
// again. If false, the keys will be copied and sorted
// internally by the API - the input array will not be
// modified
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool /*sorted_input*/ = false) {
std::vector<ColumnFamilyHandle*> cf;
std::vector<Slice> user_keys;
std::vector<Status> status;
std::vector<std::string> vals;
for (size_t i = 0; i < num_keys; ++i) {
cf.emplace_back(column_family);
user_keys.emplace_back(keys[i]);
}
status = MultiGet(options, cf, user_keys, &vals);
std::copy(status.begin(), status.end(), statuses);
for (auto& value : vals) {
values->PinSelf(value);
values++;
}
}
// If the key definitely does not exist in the database, then this method
// returns false, else true. If the caller wants to obtain value when the key
// is found in memory, a bool for 'value_found' must be passed. 'value_found'

@ -24,6 +24,8 @@
#include <stdexcept>
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "table/multiget_context.h"
namespace rocksdb {
@ -64,12 +66,20 @@ class FilterBitsBuilder {
// A class that checks if a key can be in filter
// It should be initialized by Slice generated by BitsBuilder
using MultiGetRange = MultiGetContext::Range;
class FilterBitsReader {
public:
virtual ~FilterBitsReader() {}
// Check if the entry match the bits in filter
virtual bool MayMatch(const Slice& entry) = 0;
// Check if an array of entries match the bits in filter
virtual void MayMatch(int num_keys, Slice** keys, bool* may_match) {
for (int i = 0; i < num_keys; ++i) {
may_match[i] = MayMatch(*keys[i]);
}
}
};
// We add a new format of filter block called full filter block

@ -81,6 +81,8 @@ enum Tickers : uint32_t {
// exist.
BLOOM_FILTER_FULL_TRUE_POSITIVE,
BLOOM_FILTER_MICROS,
// # persistent cache hit
PERSISTENT_CACHE_HIT,
// # persistent cache miss
@ -424,6 +426,7 @@ enum Histograms : uint32_t {
BLOB_DB_DECOMPRESSION_MICROS,
// Time spent flushing memtable to disk
FLUSH_TIME,
SST_BATCH_SIZE,
HISTOGRAM_ENUM_MAX,
};

@ -96,6 +96,15 @@ class StackableDB : public DB {
return db_->MultiGet(options, column_family, keys, values);
}
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override {
return db_->MultiGet(options, column_family, num_keys, keys,
values, statuses, sorted_input);
}
using DB::IngestExternalFile;
virtual Status IngestExternalFile(
ColumnFamilyHandle* column_family,

@ -45,6 +45,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{BLOOM_FILTER_FULL_POSITIVE, "rocksdb.bloom.filter.full.positive"},
{BLOOM_FILTER_FULL_TRUE_POSITIVE,
"rocksdb.bloom.filter.full.true.positive"},
{BLOOM_FILTER_MICROS, "rocksdb.bloom.filter.micros"},
{PERSISTENT_CACHE_HIT, "rocksdb.persistent.cache.hit"},
{PERSISTENT_CACHE_MISS, "rocksdb.persistent.cache.miss"},
{SIM_BLOCK_CACHE_HIT, "rocksdb.sim.block.cache.hit"},
@ -228,6 +229,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{BLOB_DB_COMPRESSION_MICROS, "rocksdb.blobdb.compression.micros"},
{BLOB_DB_DECOMPRESSION_MICROS, "rocksdb.blobdb.decompression.micros"},
{FLUSH_TIME, "rocksdb.db.flush.micros"},
{SST_BATCH_SIZE, "rocksdb.sst.batch.size"},
};
std::shared_ptr<Statistics> CreateDBStatistics() {

@ -39,6 +39,7 @@
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/meta_blocks.h"
#include "table/multiget_context.h"
#include "table/partitioned_filter_block.h"
#include "table/persistent_cache_helper.h"
#include "table/sst_file_writer_collectors.h"
@ -2623,6 +2624,29 @@ bool BlockBasedTable::FullFilterKeyMayMatch(
return may_match;
}
void BlockBasedTable::FullFilterKeysMayMatch(
const ReadOptions& read_options, FilterBlockReader* filter,
MultiGetRange* range, const bool no_io,
const SliceTransform* prefix_extractor) const {
if (filter == nullptr || filter->IsBlockBased()) {
return;
}
if (filter->whole_key_filtering()) {
filter->KeysMayMatch(range, prefix_extractor, kNotValid, no_io);
} else if (!read_options.total_order_seek && prefix_extractor &&
rep_->table_properties->prefix_extractor_name.compare(
prefix_extractor->Name()) == 0) {
for (auto iter = range->begin(); iter != range->end(); ++iter) {
Slice user_key = iter->lkey->user_key();
if (!prefix_extractor->InDomain(user_key)) {
range->SkipKey(iter);
}
}
filter->PrefixesMayMatch(range, prefix_extractor, kNotValid, false);
}
}
Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
GetContext* get_context,
const SliceTransform* prefix_extractor,
@ -2631,17 +2655,22 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
Status s;
const bool no_io = read_options.read_tier == kBlockCacheTier;
CachableEntry<FilterBlockReader> filter_entry;
if (!skip_filters) {
filter_entry =
GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
read_options.read_tier == kBlockCacheTier, get_context);
}
FilterBlockReader* filter = filter_entry.value;
bool may_match;
FilterBlockReader* filter = nullptr;
{
if (!skip_filters) {
filter_entry =
GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
read_options.read_tier == kBlockCacheTier, get_context);
}
filter = filter_entry.value;
// First check the full filter
// If full filter not useful, Then go into each block
if (!FullFilterKeyMayMatch(read_options, filter, key, no_io,
prefix_extractor)) {
// First check the full filter
// If full filter not useful, Then go into each block
may_match = FullFilterKeyMayMatch(read_options, filter, key, no_io,
prefix_extractor);
}
if (!may_match) {
RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL);
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level);
} else {
@ -2747,6 +2776,122 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
return s;
}
using MultiGetRange = MultiGetContext::Range;
void BlockBasedTable::MultiGet(const ReadOptions& read_options,
const MultiGetRange* mget_range,
const SliceTransform* prefix_extractor,
bool skip_filters) {
const bool no_io = read_options.read_tier == kBlockCacheTier;
CachableEntry<FilterBlockReader> filter_entry;
FilterBlockReader* filter = nullptr;
MultiGetRange sst_file_range(*mget_range, mget_range->begin(),
mget_range->end());
{
if (!skip_filters) {
// TODO: Figure out where the stats should go
filter_entry = GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
read_options.read_tier == kBlockCacheTier,
nullptr /*get_context*/);
}
filter = filter_entry.value;
// First check the full filter
// If full filter not useful, Then go into each block
FullFilterKeysMayMatch(read_options, filter, &sst_file_range, no_io,
prefix_extractor);
}
if (skip_filters || !sst_file_range.empty()) {
IndexBlockIter iiter_on_stack;
// if prefix_extractor found in block differs from options, disable
// BlockPrefixIndex. Only do this check when index_type is kHashSearch.
bool need_upper_bound_check = false;
if (rep_->index_type == BlockBasedTableOptions::kHashSearch) {
need_upper_bound_check = PrefixExtractorChanged(
rep_->table_properties.get(), prefix_extractor);
}
auto iiter = NewIndexIterator(
read_options, need_upper_bound_check, &iiter_on_stack,
/* index_entry */ nullptr, sst_file_range.begin()->get_context);
std::unique_ptr<InternalIteratorBase<BlockHandle>> iiter_unique_ptr;
if (iiter != &iiter_on_stack) {
iiter_unique_ptr.reset(iiter);
}
for (auto miter = sst_file_range.begin(); miter != sst_file_range.end();
++miter) {
Status s;
GetContext* get_context = miter->get_context;
const Slice& key = miter->ikey;
bool matched = false; // if such user key matched a key in SST
bool done = false;
for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
DataBlockIter biter;
NewDataBlockIterator<DataBlockIter>(
rep_, read_options, iiter->value(), &biter, false,
true /* key_includes_seq */, get_context);
if (read_options.read_tier == kBlockCacheTier &&
biter.status().IsIncomplete()) {
// couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for
// whether we can guarantee the key is not there when "no_io" is set
get_context->MarkKeyMayExist();
break;
}
if (!biter.status().ok()) {
s = biter.status();
break;
}
bool may_exist = biter.SeekForGet(key);
if (!may_exist) {
// HashSeek cannot find the key this block and the the iter is not
// the end of the block, i.e. cannot be in the following blocks
// either. In this case, the seek_key cannot be found, so we break
// from the top level for-loop.
break;
}
// Call the *saver function on each entry/block until it returns false
for (; biter.Valid(); biter.Next()) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(biter.key(), &parsed_key)) {
s = Status::Corruption(Slice());
}
if (!get_context->SaveValue(
parsed_key, biter.value(), &matched,
biter.IsValuePinned() ? &biter : nullptr)) {
done = true;
break;
}
}
s = biter.status();
if (done) {
// Avoid the extra Next which is expensive in two-level indexes
break;
}
}
if (matched && filter != nullptr && !filter->IsBlockBased()) {
RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_TRUE_POSITIVE);
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1,
rep_->level);
}
if (s.ok()) {
s = iiter->status();
}
*(miter->s) = s;
}
}
// if rep_->filter_entry is not set, we should call Release(); otherwise
// don't call, in this case we have a local copy in rep_->filter_entry,
// it's pinned to the cache and will be released in the destructor
if (!rep_->filter_entry.IsSet()) {
filter_entry.Release(rep_->table_options.block_cache.get());
}
}
Status BlockBasedTable::Prefetch(const Slice* const begin,
const Slice* const end) {
auto& comparator = rep_->internal_comparator;

@ -27,6 +27,8 @@
#include "table/block_based_table_factory.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/multiget_context.h"
#include "table/persistent_cache_helper.h"
#include "table/table_properties_internal.h"
#include "table/table_reader.h"
@ -121,6 +123,11 @@ class BlockBasedTable : public TableReader {
GetContext* get_context, const SliceTransform* prefix_extractor,
bool skip_filters = false) override;
void MultiGet(const ReadOptions& readOptions,
const MultiGetContext::Range* mget_range,
const SliceTransform* prefix_extractor,
bool skip_filters = false) override;
// Pre-fetch the disk blocks that correspond to the key range specified by
// (kbegin, kend). The call will return error status in the event of
// IO or iteration error.
@ -355,6 +362,11 @@ class BlockBasedTable : public TableReader {
const Slice& user_key, const bool no_io,
const SliceTransform* prefix_extractor = nullptr) const;
void FullFilterKeysMayMatch(
const ReadOptions& read_options, FilterBlockReader* filter,
MultiGetRange* range, const bool no_io,
const SliceTransform* prefix_extractor = nullptr) const;
static Status PrefetchTail(
RandomAccessFileReader* file, uint64_t file_size,
TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all,

@ -99,6 +99,19 @@ class FilterBlockReader {
const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) = 0;
virtual void KeysMayMatch(MultiGetRange* range,
const SliceTransform* prefix_extractor,
uint64_t block_offset = kNotValid,
const bool no_io = false) {
for (auto iter = range->begin(); iter != range->end(); ++iter) {
const Slice ukey = iter->ukey;
const Slice ikey = iter->ikey;
if (!KeyMayMatch(ukey, prefix_extractor, block_offset, no_io, &ikey)) {
range->SkipKey(iter);
}
}
}
/**
* no_io and const_ikey_ptr here means the same as in KeyMayMatch
*/
@ -108,6 +121,20 @@ class FilterBlockReader {
const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) = 0;
virtual void PrefixesMayMatch(MultiGetRange* range,
const SliceTransform* prefix_extractor,
uint64_t block_offset = kNotValid,
const bool no_io = false) {
for (auto iter = range->begin(); iter != range->end(); ++iter) {
const Slice ukey = iter->ukey;
const Slice ikey = iter->ikey;
if (!KeyMayMatch(prefix_extractor->Transform(ukey), prefix_extractor,
block_offset, no_io, &ikey)) {
range->SkipKey(iter);
}
}
}
virtual size_t ApproximateMemoryUsage() const = 0;
virtual size_t size() const { return size_; }
virtual Statistics* statistics() const { return statistics_; }

@ -159,6 +159,59 @@ bool FullFilterBlockReader::MayMatch(const Slice& entry) {
return true; // remain the same with block_based filter
}
void FullFilterBlockReader::KeysMayMatch(
MultiGetRange* range, const SliceTransform* /*prefix_extractor*/,
uint64_t block_offset, const bool /*no_io*/) {
#ifdef NDEBUG
(void)range;
(void)block_offset;
#endif
assert(block_offset == kNotValid);
if (!whole_key_filtering_) {
// Simply return. Don't skip any key - consider all keys as likely to be
// present
return;
}
MayMatch(range);
}
void FullFilterBlockReader::PrefixesMayMatch(
MultiGetRange* range, const SliceTransform* /* prefix_extractor */,
uint64_t block_offset, const bool /*no_io*/) {
#ifdef NDEBUG
(void)range;
(void)block_offset;
#endif
assert(block_offset == kNotValid);
MayMatch(range);
}
void FullFilterBlockReader::MayMatch(MultiGetRange* range) {
if (contents_.size() == 0) {
return;
}
// We need to use an array instead of autovector for may_match since
// &may_match[0] doesn't work for autovector<bool> (compiler error). So
// declare both keys and may_match as arrays, which is also slightly less
// expensive compared to autovector
Slice* keys[MultiGetContext::MAX_BATCH_SIZE];
bool may_match[MultiGetContext::MAX_BATCH_SIZE];
int num_keys = 0;
for (auto iter = range->begin(); iter != range->end(); ++iter) {
keys[num_keys++] = &iter->ukey;
}
filter_bits_reader_->MayMatch(num_keys, &keys[0], &may_match[0]);
int i = 0;
for (auto iter = range->begin(); iter != range->end(); ++iter) {
if (!may_match[i]) {
range->SkipKey(iter);
}
++i;
}
}
size_t FullFilterBlockReader::ApproximateMemoryUsage() const {
size_t usage = block_contents_.usable_size();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE

@ -107,6 +107,16 @@ class FullFilterBlockReader : public FilterBlockReader {
const Slice& prefix, const SliceTransform* prefix_extractor,
uint64_t block_offset = kNotValid, const bool no_io = false,
const Slice* const const_ikey_ptr = nullptr) override;
virtual void KeysMayMatch(MultiGetRange* range,
const SliceTransform* prefix_extractor,
uint64_t block_offset = kNotValid,
const bool no_io = false) override;
virtual void PrefixesMayMatch(MultiGetRange* range,
const SliceTransform* prefix_extractor,
uint64_t block_offset = kNotValid,
const bool no_io = false) override;
virtual size_t ApproximateMemoryUsage() const override;
virtual bool RangeMayExist(const Slice* iterate_upper_bound, const Slice& user_key,
const SliceTransform* prefix_extractor,
@ -124,6 +134,7 @@ class FullFilterBlockReader : public FilterBlockReader {
// No copying allowed
FullFilterBlockReader(const FullFilterBlockReader&);
bool MayMatch(const Slice& entry);
void MayMatch(MultiGetRange* range);
void operator=(const FullFilterBlockReader&);
bool IsFilterCompatible(const Slice* iterate_upper_bound,
const Slice& prefix, const Comparator* comparator);

@ -45,6 +45,8 @@ class TestFilterBitsReader : public FilterBitsReader {
explicit TestFilterBitsReader(const Slice& contents)
: data_(contents.data()), len_(static_cast<uint32_t>(contents.size())) {}
// Silence compiler warning about overloaded virtual
using FilterBitsReader::MayMatch;
bool MayMatch(const Slice& entry) override {
uint32_t h = Hash(entry.data(), entry.size(), 1);
for (size_t i = 0; i + 4 <= len_; i += 4) {

@ -5,6 +5,7 @@
#pragma once
#include <string>
#include <db/dbformat.h>
#include "db/merge_context.h"
#include "db/read_callback.h"
#include "rocksdb/env.h"
@ -61,6 +62,8 @@ class GetContext {
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr);
GetContext() = default;
void MarkKeyMayExist();
// Records this key, value, and any meta-data (such as sequence number and

@ -0,0 +1,249 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <algorithm>
#include <string>
#include "db/lookup_key.h"
#include "db/merge_context.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/types.h"
#include "util/autovector.h"
namespace rocksdb {
class GetContext;
struct KeyContext {
const Slice* key;
LookupKey* lkey;
Slice ukey;
Slice ikey;
Status* s;
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq;
bool key_exists;
SequenceNumber seq;
void* cb_arg;
PinnableSlice* value;
GetContext* get_context;
KeyContext(const Slice& user_key, PinnableSlice* val, Status* stat)
: key(&user_key),
lkey(nullptr),
s(stat),
max_covering_tombstone_seq(0),
key_exists(false),
seq(0),
cb_arg(nullptr),
value(val),
get_context(nullptr) {}
KeyContext() = default;
};
// The MultiGetContext class is a container for the sorted list of keys that
// we need to lookup in a batch. Its main purpose is to make batch execution
// easier by allowing various stages of the MultiGet lookups to operate on
// subsets of keys, potentially non-contiguous. In order to accomplish this,
// it defines the following classes -
//
// MultiGetContext::Range
// MultiGetContext::Range::Iterator
// MultiGetContext::Range::IteratorWrapper
//
// Here is an example of how this can be used -
//
// {
// MultiGetContext ctx(...);
// MultiGetContext::Range range = ctx.GetMultiGetRange();
//
// // Iterate to determine some subset of the keys
// MultiGetContext::Range::Iterator start = range.begin();
// MultiGetContext::Range::Iterator end = ...;
//
// // Make a new range with a subset of keys
// MultiGetContext::Range subrange(range, start, end);
//
// // Define an auxillary vector, if needed, to hold additional data for
// // each key
// std::array<Foo, MultiGetContext::MAX_BATCH_SIZE> aux;
//
// // Iterate over the subrange and the auxillary vector simultaneously
// MultiGetContext::Range::Iterator iter = subrange.begin();
// for (; iter != subrange.end(); ++iter) {
// KeyContext& key = *iter;
// Foo& aux_key = aux_iter[iter.index()];
// ...
// }
// }
class MultiGetContext {
public:
// Limit the number of keys in a batch to this number. Benchmarks show that
// there is negligible benefit for batches exceeding this. Keeping this < 64
// simplifies iteration, as well as reduces the amount of stack allocations
// htat need to be performed
static const int MAX_BATCH_SIZE = 32;
MultiGetContext(KeyContext** sorted_keys, size_t num_keys,
SequenceNumber snapshot)
: sorted_keys_(sorted_keys),
num_keys_(num_keys),
value_mask_(0),
lookup_key_ptr_(reinterpret_cast<LookupKey*>(lookup_key_stack_buf)) {
int index = 0;
if (num_keys > MAX_LOOKUP_KEYS_ON_STACK) {
lookup_key_heap_buf.reset(new char[sizeof(LookupKey) * num_keys]);
lookup_key_ptr_ = reinterpret_cast<LookupKey*>(
lookup_key_heap_buf.get());
}
for (size_t iter = 0; iter != num_keys_; ++iter) {
sorted_keys_[iter]->lkey = new (&lookup_key_ptr_[index])
LookupKey(*sorted_keys_[iter]->key, snapshot);
sorted_keys_[iter]->ukey = sorted_keys_[iter]->lkey->user_key();
sorted_keys_[iter]->ikey = sorted_keys_[iter]->lkey->internal_key();
index++;
}
}
~MultiGetContext() {
for (size_t i = 0; i < num_keys_; ++i) {
lookup_key_ptr_[i].~LookupKey();
}
}
private:
static const int MAX_LOOKUP_KEYS_ON_STACK = 16;
alignas(alignof(LookupKey))
char lookup_key_stack_buf[sizeof(LookupKey) * MAX_LOOKUP_KEYS_ON_STACK];
KeyContext** sorted_keys_;
size_t num_keys_;
uint64_t value_mask_;
std::unique_ptr<char> lookup_key_heap_buf;
LookupKey* lookup_key_ptr_;
public:
// MultiGetContext::Range - Specifies a range of keys, by start and end index,
// from the parent MultiGetContext. Each range contains a bit vector that
// indicates whether the corresponding keys need to be processed or skipped.
// A Range object can be copy constructed, and the new object inherits the
// original Range's bit vector. This is useful for progressively skipping
// keys as the lookup goes through various stages. For example, when looking
// up keys in the same SST file, a Range is created excluding keys not
// belonging to that file. A new Range is then copy constructed and individual
// keys are skipped based on bloom filter lookup.
class Range {
public:
// MultiGetContext::Range::Iterator - A forward iterator that iterates over
// non-skippable keys in a Range, as well as keys whose final value has been
// found. The latter is tracked by MultiGetContext::value_mask_
class Iterator {
public:
// -- iterator traits
typedef Iterator self_type;
typedef KeyContext value_type;
typedef KeyContext& reference;
typedef KeyContext* pointer;
typedef int difference_type;
typedef std::forward_iterator_tag iterator_category;
Iterator(const Range* range, size_t idx)
: range_(range), ctx_(range->ctx_), index_(idx) {
while (index_ < range_->end_ &&
(1ull << index_) &
(range_->ctx_->value_mask_ | range_->skip_mask_))
index_++;
}
Iterator(const Iterator&) = default;
Iterator& operator=(const Iterator&) = default;
Iterator& operator++() {
while (++index_ < range_->end_ &&
(1ull << index_) &
(range_->ctx_->value_mask_ | range_->skip_mask_))
;
return *this;
}
bool operator==(Iterator other) const {
assert(range_->ctx_ == other.range_->ctx_);
return index_ == other.index_;
}
bool operator!=(Iterator other) const {
assert(range_->ctx_ == other.range_->ctx_);
return index_ != other.index_;
}
KeyContext& operator*() {
assert(index_ < range_->end_ && index_ >= range_->start_);
return *(ctx_->sorted_keys_[index_]);
}
KeyContext* operator->() {
assert(index_ < range_->end_ && index_ >= range_->start_);
return ctx_->sorted_keys_[index_];
}
size_t index() { return index_; }
private:
friend Range;
const Range* range_;
const MultiGetContext* ctx_;
size_t index_;
};
Range(const Range& mget_range,
const Iterator& first,
const Iterator& last) {
ctx_ = mget_range.ctx_;
start_ = first.index_;
end_ = last.index_;
skip_mask_ = mget_range.skip_mask_;
}
Range() = default;
Iterator begin() const { return Iterator(this, start_); }
Iterator end() const { return Iterator(this, end_); }
bool empty() {
return (((1ull << end_) - 1) & ~((1ull << start_) - 1) &
~(ctx_->value_mask_ | skip_mask_)) == 0;
}
void SkipKey(const Iterator& iter) { skip_mask_ |= 1ull << iter.index_; }
// Update the value_mask_ in MultiGetContext so its
// immediately reflected in all the Range Iterators
void MarkKeyDone(Iterator& iter) {
ctx_->value_mask_ |= (1ull << iter.index_);
}
bool CheckKeyDone(Iterator& iter) {
return ctx_->value_mask_ & (1ull << iter.index_);
}
private:
friend MultiGetContext;
MultiGetContext* ctx_;
size_t start_;
size_t end_;
uint64_t skip_mask_;
Range(MultiGetContext* ctx, size_t num_keys)
: ctx_(ctx), start_(0), end_(num_keys), skip_mask_(0) {}
};
// Return the initial range that encompasses all the keys in the batch
Range GetMultiGetRange() { return Range(this, num_keys_); }
};
} // namespace rocksdb

@ -11,7 +11,9 @@
#include <memory>
#include "db/range_tombstone_fragmenter.h"
#include "rocksdb/slice_transform.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/multiget_context.h"
namespace rocksdb {
@ -22,6 +24,7 @@ class Arena;
struct ReadOptions;
struct TableProperties;
class GetContext;
class MultiGetContext;
// A Table is a sorted map from strings to strings. Tables are
// immutable and persistent. A Table may be safely accessed from
@ -86,6 +89,16 @@ class TableReader {
const SliceTransform* prefix_extractor,
bool skip_filters = false) = 0;
virtual void MultiGet(const ReadOptions& readOptions,
const MultiGetContext::Range* mget_range,
const SliceTransform* prefix_extractor,
bool skip_filters = false) {
for (auto iter = mget_range->begin(); iter != mget_range->end(); ++iter) {
*iter->s = Get(readOptions, iter->ikey, iter->get_context,
prefix_extractor, skip_filters);
}
}
// Prefetch data corresponding to a give range of keys
// Typically this functionality is required for table implementations that
// persists the data on a non volatile storage medium like disk/SSD

@ -1114,6 +1114,9 @@ DEFINE_uint64(stats_persist_period_sec,
DEFINE_uint64(stats_history_buffer_size,
rocksdb::Options().stats_history_buffer_size,
"Max number of stats snapshots to keep in memory");
DEFINE_int64(multiread_stride, 0,
"Stride length for the keys in a MultiGet batch");
DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");
enum RepFactory {
kSkipList,
@ -2700,6 +2703,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
} else if (name == "readreverse") {
method = &Benchmark::ReadReverse;
} else if (name == "readrandom") {
if (FLAGS_multiread_stride) {
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
}
method = &Benchmark::ReadRandom;
} else if (name == "readrandomfast") {
method = &Benchmark::ReadRandomFast;
@ -4531,6 +4538,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
int64_t read = 0;
int64_t found = 0;
int64_t bytes = 0;
int num_keys = 0;
int64_t key_rand = GetRandomKey(&thread->rand);
ReadOptions options(FLAGS_verify_checksum, true);
std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);
@ -4542,8 +4551,21 @@ void VerifyDBFromDB(std::string& truth_db_name) {
// We use same key_rand as seed for key and column family so that we can
// deterministically find the cfh corresponding to a particular key, as it
// is done in DoWrite method.
int64_t key_rand = GetRandomKey(&thread->rand);
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
if (entries_per_batch_ > 1 && FLAGS_multiread_stride) {
if (++num_keys == entries_per_batch_) {
num_keys = 0;
key_rand = GetRandomKey(&thread->rand);
if ((key_rand + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
FLAGS_num) {
key_rand = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
}
} else {
key_rand += FLAGS_multiread_stride;
}
} else {
key_rand = GetRandomKey(&thread->rand);
}
read++;
Status s;
if (FLAGS_num_column_families > 1) {
@ -4595,6 +4617,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
std::vector<Slice> keys;
std::vector<std::unique_ptr<const char[]> > key_guards;
std::vector<std::string> values(entries_per_batch_);
PinnableSlice* pin_values = new PinnableSlice[entries_per_batch_];
std::vector<Status> stat_list(entries_per_batch_);
while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
key_guards.push_back(std::unique_ptr<const char[]>());
keys.push_back(AllocateKey(&key_guards.back()));
@ -4603,21 +4627,52 @@ void VerifyDBFromDB(std::string& truth_db_name) {
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
for (int64_t i = 0; i < entries_per_batch_; ++i) {
GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
if (FLAGS_multiread_stride) {
int64_t key = GetRandomKey(&thread->rand);
if ((key + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
(int64_t)FLAGS_num) {
key = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
}
for (int64_t i = 0; i < entries_per_batch_; ++i) {
GenerateKeyFromInt(key, FLAGS_num, &keys[i]);
key += FLAGS_multiread_stride;
}
} else {
for (int64_t i = 0; i < entries_per_batch_; ++i) {
GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
}
}
std::vector<Status> statuses = db->MultiGet(options, keys, &values);
assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
read += entries_per_batch_;
num_multireads++;
for (int64_t i = 0; i < entries_per_batch_; ++i) {
if (statuses[i].ok()) {
++found;
} else if (!statuses[i].IsNotFound()) {
fprintf(stderr, "MultiGet returned an error: %s\n",
statuses[i].ToString().c_str());
abort();
if (!FLAGS_multiread_batched) {
std::vector<Status> statuses = db->MultiGet(options, keys, &values);
assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
read += entries_per_batch_;
num_multireads++;
for (int64_t i = 0; i < entries_per_batch_; ++i) {
if (statuses[i].ok()) {
++found;
} else if (!statuses[i].IsNotFound()) {
fprintf(stderr, "MultiGet returned an error: %s\n",
statuses[i].ToString().c_str());
abort();
}
}
} else {
db->MultiGet(options, db->DefaultColumnFamily(), keys.size(),
keys.data(), pin_values, stat_list.data());
read += entries_per_batch_;
num_multireads++;
for (int64_t i = 0; i < entries_per_batch_; ++i) {
if (stat_list[i].ok()) {
++found;
} else if (!stat_list[i].IsNotFound()) {
fprintf(stderr, "MultiGet returned an error: %s\n",
stat_list[i].ToString().c_str());
abort();
}
stat_list[i] = Status::OK();
pin_values[i].Reset();
}
}
if (thread->shared->read_rate_limiter.get() != nullptr &&

@ -181,8 +181,37 @@ class FullFilterBitsReader : public FilterBitsReader {
// Other Error params, including a broken filter, regarded as match
if (num_probes_ == 0 || num_lines_ == 0) return true;
uint32_t hash = BloomHash(entry);
return HashMayMatch(hash, Slice(data_, data_len_),
num_probes_, num_lines_);
uint32_t bit_offset;
FilterPrepare(hash, Slice(data_, data_len_), num_lines_, &bit_offset);
return HashMayMatch(hash, Slice(data_, data_len_), num_probes_, bit_offset);
}
virtual void MayMatch(int num_keys, Slice** keys, bool* may_match) override {
if (data_len_ <= 5) { // remain same with original filter
for (int i = 0; i < num_keys; ++i) {
may_match[i] = false;
}
return;
}
for (int i = 0; i < num_keys; ++i) {
may_match[i] = true;
}
// Other Error params, including a broken filter, regarded as match
if (num_probes_ == 0 || num_lines_ == 0) return;
uint32_t hashes[MultiGetContext::MAX_BATCH_SIZE];
uint32_t bit_offsets[MultiGetContext::MAX_BATCH_SIZE];
for (int i = 0; i < num_keys; ++i) {
hashes[i] = BloomHash(*keys[i]);
FilterPrepare(hashes[i], Slice(data_, data_len_), num_lines_,
&bit_offsets[i]);
}
for (int i = 0; i < num_keys; ++i) {
if (!HashMayMatch(hashes[i], Slice(data_, data_len_), num_probes_,
bit_offsets[i])) {
may_match[i] = false;
}
}
}
private:
@ -211,7 +240,10 @@ class FullFilterBitsReader : public FilterBitsReader {
// Before calling this function, need to ensure the input meta data
// is valid.
bool HashMayMatch(const uint32_t& hash, const Slice& filter,
const size_t& num_probes, const uint32_t& num_lines);
const size_t& num_probes, const uint32_t& bit_offset);
void FilterPrepare(const uint32_t& hash, const Slice& filter,
const uint32_t& num_lines, uint32_t* bit_offset);
// No Copy allowed
FullFilterBitsReader(const FullFilterBitsReader&);
@ -232,29 +264,44 @@ void FullFilterBitsReader::GetFilterMeta(const Slice& filter,
*num_lines = DecodeFixed32(filter.data() + len - 4);
}
void FullFilterBitsReader::FilterPrepare(const uint32_t& hash,
const Slice& filter,
const uint32_t& num_lines,
uint32_t* bit_offset) {
uint32_t len = static_cast<uint32_t>(filter.size());
if (len <= 5) return; // remain the same with original filter
// It is ensured the params are valid before calling it
assert(num_lines != 0 && (len - 5) % num_lines == 0);
uint32_t h = hash;
// Left shift by an extra 3 to convert bytes to bits
uint32_t b = (h % num_lines) << (log2_cache_line_size_ + 3);
PREFETCH(&filter.data()[b / 8], 0 /* rw */, 1 /* locality */);
PREFETCH(&filter.data()[b / 8 + (1 << log2_cache_line_size_) - 1],
0 /* rw */, 1 /* locality */);
*bit_offset = b;
}
bool FullFilterBitsReader::HashMayMatch(const uint32_t& hash,
const Slice& filter, const size_t& num_probes,
const uint32_t& num_lines) {
const Slice& filter,
const size_t& num_probes,
const uint32_t& bit_offset) {
uint32_t len = static_cast<uint32_t>(filter.size());
if (len <= 5) return false; // remain the same with original filter
// It is ensured the params are valid before calling it
assert(num_probes != 0);
assert(num_lines != 0 && (len - 5) % num_lines == 0);
const char* data = filter.data();
uint32_t h = hash;
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
// Left shift by an extra 3 to convert bytes to bits
uint32_t b = (h % num_lines) << (log2_cache_line_size_ + 3);
PREFETCH(&data[b / 8], 0 /* rw */, 1 /* locality */);
PREFETCH(&data[b / 8 + (1 << log2_cache_line_size_) - 1], 0 /* rw */,
1 /* locality */);
for (uint32_t i = 0; i < num_probes; ++i) {
// Since CACHE_LINE_SIZE is defined as 2^n, this line will be optimized
// to a simple and operation by compiler.
const uint32_t bitpos = b + (h & ((1 << (log2_cache_line_size_ + 3)) - 1));
const uint32_t bitpos =
bit_offset + (h & ((1 << (log2_cache_line_size_ + 3)) - 1));
if (((data[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
return false;
}

@ -123,6 +123,8 @@ class FaultInjectionTestEnv : public EnvWrapper {
virtual Status RenameFile(const std::string& s,
const std::string& t) override;
// Undef to eliminate clash on Windows
#undef GetFreeSpace
virtual Status GetFreeSpace(const std::string& path,
uint64_t* disk_free) override {
if (!IsFilesystemActive() && error_ == Status::NoSpace()) {

@ -166,6 +166,16 @@ class BlobDB : public StackableDB {
}
return MultiGet(options, keys, values);
}
virtual void MultiGet(const ReadOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const size_t num_keys, const Slice* /*keys*/,
PinnableSlice* /*values*/, Status* statuses,
const bool /*sorted_input*/ = false) override {
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = Status::NotSupported(
"Blob DB doesn't support batched MultiGet");
}
}
using rocksdb::StackableDB::SingleDelete;
virtual Status SingleDelete(const WriteOptions& /*wopts*/,

Loading…
Cancel
Save