Add prefetching (batched MultiGet) for experimental Ribbon filter (#7889)

Summary:
Adds support for prefetching data in Ribbon queries,
which especially optimizes batched Ribbon queries for MultiGet
(~222ns/key to ~97ns/key) but also single key queries on cold memory
(~333ns to ~226ns) because many queries span more than one cache line.

This required some refactoring of the query algorithm, and there
does not appear to be a noticeable regression in "hot memory" query
times (perhaps from 48ns to 50ns).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7889

Test Plan:
existing unit tests, plus performance validation with
filter_bench:

Each data point is the best of two runs. I saturated the machine
CPUs with other filter_bench runs in the background.

Before:

    $ ./filter_bench -impl=3 -m_keys_total_max=200 -average_keys_per_filter=100000 -m_queries=50
    WARNING: Assertions are enabled; benchmarks unnecessarily slow
    Building...
    Build avg ns/key: 125.86
    Number of filters: 1993
    Total size (MB): 168.166
    Reported total allocated memory (MB): 183.211
    Reported internal fragmentation: 8.94626%
    Bits/key stored: 7.05341
    Prelim FP rate %: 0.951827
    ----------------------------
    Mixed inside/outside queries...
      Single filter net ns/op: 48.0111
      Batched, prepared net ns/op: 222.384
      Batched, unprepared net ns/op: 343.908
      Skewed 50% in 1% net ns/op: 252.916
      Skewed 80% in 20% net ns/op: 320.579
      Random filter net ns/op: 332.957

After:

    $ ./filter_bench -impl=3 -m_keys_total_max=200 -average_keys_per_filter=100000 -m_queries=50
    WARNING: Assertions are enabled; benchmarks unnecessarily slow
    Building...
    Build avg ns/key: 128.117
    Number of filters: 1993
    Total size (MB): 168.166
    Reported total allocated memory (MB): 183.211
    Reported internal fragmentation: 8.94626%
    Bits/key stored: 7.05341
    Prelim FP rate %: 0.951827
    ----------------------------
    Mixed inside/outside queries...
      Single filter net ns/op: 49.8812
      Batched, prepared net ns/op: 97.1514
      Batched, unprepared net ns/op: 222.025
      Skewed 50% in 1% net ns/op: 197.48
      Skewed 80% in 20% net ns/op: 212.457
      Random filter net ns/op: 226.464

Bloom comparison, for reference:

    $ ./filter_bench -impl=2 -m_keys_total_max=200 -average_keys_per_filter=100000 -m_queries=50
    WARNING: Assertions are enabled; benchmarks unnecessarily slow
    Building...
    Build avg ns/key: 35.3042
    Number of filters: 1993
    Total size (MB): 238.488
    Reported total allocated memory (MB): 262.875
    Reported internal fragmentation: 10.2255%
    Bits/key stored: 10.0029
    Prelim FP rate %: 0.965327
    ----------------------------
    Mixed inside/outside queries...
      Single filter net ns/op: 9.09931
      Batched, prepared net ns/op: 34.21
      Batched, unprepared net ns/op: 88.8564
      Skewed 50% in 1% net ns/op: 139.75
      Skewed 80% in 20% net ns/op: 181.264
      Random filter net ns/op: 173.88

Reviewed By: jay-zhuang

Differential Revision: D26378710

Pulled By: pdillinger

fbshipit-source-id: 058428967c55ed763698284cd3b4bbe3351b6e69
main
Peter Dillinger 4 years ago committed by Facebook GitHub Bot
parent 14fbb43f3e
commit e4f1e64c30
  1. 1
      HISTORY.md
  2. 17
      table/block_based/filter_policy.cc
  3. 119
      util/ribbon_alg.h
  4. 34
      util/ribbon_impl.h

@ -7,6 +7,7 @@
* Add support for key-value integrity protection in live updates from the user buffers provided to `WriteBatch` through the write to RocksDB's in-memory update buffer (memtable). This is intended to detect some cases of in-memory data corruption, due to either software or hardware errors. Users can enable protection by constructing their `WriteBatch` with `protection_bytes_per_key == 8`. * Add support for key-value integrity protection in live updates from the user buffers provided to `WriteBatch` through the write to RocksDB's in-memory update buffer (memtable). This is intended to detect some cases of in-memory data corruption, due to either software or hardware errors. Users can enable protection by constructing their `WriteBatch` with `protection_bytes_per_key == 8`.
* Add support for updating `full_history_ts_low` option in manual compaction, which is for old timestamp data GC. * Add support for updating `full_history_ts_low` option in manual compaction, which is for old timestamp data GC.
* Add a mechanism for using Makefile to build external plugin code into the RocksDB libraries/binaries. This intends to simplify compatibility and distribution for plugins (e.g., special-purpose `FileSystem`s) whose source code resides outside the RocksDB repo. See "plugin/README.md" for developer details, and "PLUGINS.md" for a listing of available plugins. * Add a mechanism for using Makefile to build external plugin code into the RocksDB libraries/binaries. This intends to simplify compatibility and distribution for plugins (e.g., special-purpose `FileSystem`s) whose source code resides outside the RocksDB repo. See "plugin/README.md" for developer details, and "PLUGINS.md" for a listing of available plugins.
* Added memory pre-fetching for experimental Ribbon filter, which especially optimizes performance with batched MultiGet.
### Bug Fixes ### Bug Fixes
* Since 6.15.0, `TransactionDB` returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. There are certain cases where range deletion can still be used on such DBs; see the API doc on `TransactionDB::DeleteRange()` for details. * Since 6.15.0, `TransactionDB` returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. There are certain cases where range deletion can still be used on such DBs; see the API doc on `TransactionDB::DeleteRange()` for details.

@ -725,13 +725,22 @@ class Standard128RibbonBitsReader : public FilterBitsReader {
} }
virtual void MayMatch(int num_keys, Slice** keys, bool* may_match) override { virtual void MayMatch(int num_keys, Slice** keys, bool* may_match) override {
std::array<uint64_t, MultiGetContext::MAX_BATCH_SIZE> hashes; struct SavedData {
uint64_t seeded_hash;
uint32_t segment_num;
uint32_t num_columns;
uint32_t start_bits;
};
std::array<SavedData, MultiGetContext::MAX_BATCH_SIZE> saved;
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
hashes[i] = GetSliceHash64(*keys[i]); ribbon::InterleavedPrepareQuery(
// FIXME: batched get optimization GetSliceHash64(*keys[i]), hasher_, soln_, &saved[i].seeded_hash,
&saved[i].segment_num, &saved[i].num_columns, &saved[i].start_bits);
} }
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
may_match[i] = soln_.FilterQuery(hashes[i], hasher_); may_match[i] = ribbon::InterleavedFilterQuery(
saved[i].seeded_hash, saved[i].segment_num, saved[i].num_columns,
saved[i].start_bits, hasher_, soln_);
} }
} }

@ -1050,13 +1050,13 @@ void InterleavedBackSubst(InterleavedSolutionStorage *iss,
std::unique_ptr<CoeffRow[]> state{new CoeffRow[num_columns]()}; std::unique_ptr<CoeffRow[]> state{new CoeffRow[num_columns]()};
Index block = num_blocks; Index block = num_blocks;
Index segment = num_segments; Index segment_num = num_segments;
while (block > upper_start_block) { while (block > upper_start_block) {
--block; --block;
BackSubstBlock(state.get(), num_columns, bs, block * kCoeffBits); BackSubstBlock(state.get(), num_columns, bs, block * kCoeffBits);
segment -= num_columns; segment_num -= num_columns;
for (Index i = 0; i < num_columns; ++i) { for (Index i = 0; i < num_columns; ++i) {
iss->StoreSegment(segment + i, state[i]); iss->StoreSegment(segment_num + i, state[i]);
} }
} }
// Now (if applicable), region using lower number of columns // Now (if applicable), region using lower number of columns
@ -1066,60 +1066,92 @@ void InterleavedBackSubst(InterleavedSolutionStorage *iss,
while (block > 0) { while (block > 0) {
--block; --block;
BackSubstBlock(state.get(), num_columns, bs, block * kCoeffBits); BackSubstBlock(state.get(), num_columns, bs, block * kCoeffBits);
segment -= num_columns; segment_num -= num_columns;
for (Index i = 0; i < num_columns; ++i) { for (Index i = 0; i < num_columns; ++i) {
iss->StoreSegment(segment + i, state[i]); iss->StoreSegment(segment_num + i, state[i]);
} }
} }
// Verify everything processed // Verify everything processed
assert(block == 0); assert(block == 0);
assert(segment == 0); assert(segment_num == 0);
} }
// General PHSF query a key from InterleavedSolutionStorage. // Prefetch memory for a key in InterleavedSolutionStorage.
template <typename InterleavedSolutionStorage, typename PhsfQueryHasher> template <typename InterleavedSolutionStorage, typename PhsfQueryHasher>
typename InterleavedSolutionStorage::ResultRow InterleavedPhsfQuery( inline void InterleavedPrepareQuery(
const typename PhsfQueryHasher::Key &key, const PhsfQueryHasher &hasher, const typename PhsfQueryHasher::Key &key, const PhsfQueryHasher &hasher,
const InterleavedSolutionStorage &iss) { const InterleavedSolutionStorage &iss,
typename PhsfQueryHasher::Hash *saved_hash,
typename InterleavedSolutionStorage::Index *saved_segment_num,
typename InterleavedSolutionStorage::Index *saved_num_columns,
typename InterleavedSolutionStorage::Index *saved_start_bit) {
using Hash = typename PhsfQueryHasher::Hash; using Hash = typename PhsfQueryHasher::Hash;
using CoeffRow = typename InterleavedSolutionStorage::CoeffRow; using CoeffRow = typename InterleavedSolutionStorage::CoeffRow;
using Index = typename InterleavedSolutionStorage::Index; using Index = typename InterleavedSolutionStorage::Index;
using ResultRow = typename InterleavedSolutionStorage::ResultRow;
static_assert(sizeof(Index) == sizeof(typename PhsfQueryHasher::Index), static_assert(sizeof(Index) == sizeof(typename PhsfQueryHasher::Index),
"must be same"); "must be same");
static_assert(sizeof(CoeffRow) == sizeof(typename PhsfQueryHasher::CoeffRow),
"must be same");
constexpr auto kCoeffBits = static_cast<Index>(sizeof(CoeffRow) * 8U);
const Hash hash = hasher.GetHash(key); const Hash hash = hasher.GetHash(key);
const Index start_slot = hasher.GetStart(hash, iss.GetNumStarts()); const Index start_slot = hasher.GetStart(hash, iss.GetNumStarts());
constexpr auto kCoeffBits = static_cast<Index>(sizeof(CoeffRow) * 8U);
const Index upper_start_block = iss.GetUpperStartBlock(); const Index upper_start_block = iss.GetUpperStartBlock();
Index num_columns = iss.GetUpperNumColumns(); Index num_columns = iss.GetUpperNumColumns();
Index start_block_num = start_slot / kCoeffBits; Index start_block_num = start_slot / kCoeffBits;
Index segment = start_block_num * num_columns - Index segment_num = start_block_num * num_columns -
std::min(start_block_num, upper_start_block); std::min(start_block_num, upper_start_block);
// Change to lower num columns if applicable. // Change to lower num columns if applicable.
// (This should not compile to a conditional branch.) // (This should not compile to a conditional branch.)
num_columns -= (start_block_num < upper_start_block) ? 1 : 0; num_columns -= (start_block_num < upper_start_block) ? 1 : 0;
const CoeffRow cr = hasher.GetCoeffRow(hash);
Index start_bit = start_slot % kCoeffBits; Index start_bit = start_slot % kCoeffBits;
Index segment_count = num_columns + (start_bit == 0 ? 0 : num_columns);
iss.PrefetchSegmentRange(segment_num, segment_num + segment_count);
*saved_hash = hash;
*saved_segment_num = segment_num;
*saved_num_columns = num_columns;
*saved_start_bit = start_bit;
}
// General PHSF query from InterleavedSolutionStorage, using data for
// the query key from InterleavedPrepareQuery
template <typename InterleavedSolutionStorage, typename PhsfQueryHasher>
inline typename InterleavedSolutionStorage::ResultRow InterleavedPhsfQuery(
typename PhsfQueryHasher::Hash hash,
typename InterleavedSolutionStorage::Index segment_num,
typename InterleavedSolutionStorage::Index num_columns,
typename InterleavedSolutionStorage::Index start_bit,
const PhsfQueryHasher &hasher, const InterleavedSolutionStorage &iss) {
using CoeffRow = typename InterleavedSolutionStorage::CoeffRow;
using Index = typename InterleavedSolutionStorage::Index;
using ResultRow = typename InterleavedSolutionStorage::ResultRow;
static_assert(sizeof(Index) == sizeof(typename PhsfQueryHasher::Index),
"must be same");
static_assert(sizeof(CoeffRow) == sizeof(typename PhsfQueryHasher::CoeffRow),
"must be same");
constexpr auto kCoeffBits = static_cast<Index>(sizeof(CoeffRow) * 8U);
const CoeffRow cr = hasher.GetCoeffRow(hash);
ResultRow sr = 0; ResultRow sr = 0;
const CoeffRow cr_left = cr << start_bit; const CoeffRow cr_left = cr << static_cast<unsigned>(start_bit);
for (Index i = 0; i < num_columns; ++i) { for (Index i = 0; i < num_columns; ++i) {
sr ^= BitParity(iss.LoadSegment(segment + i) & cr_left) << i; sr ^= BitParity(iss.LoadSegment(segment_num + i) & cr_left) << i;
} }
if (start_bit > 0) { if (start_bit > 0) {
segment += num_columns; segment_num += num_columns;
const CoeffRow cr_right = cr >> (kCoeffBits - start_bit); const CoeffRow cr_right =
cr >> static_cast<unsigned>(kCoeffBits - start_bit);
for (Index i = 0; i < num_columns; ++i) { for (Index i = 0; i < num_columns; ++i) {
sr ^= BitParity(iss.LoadSegment(segment + i) & cr_right) << i; sr ^= BitParity(iss.LoadSegment(segment_num + i) & cr_right) << i;
} }
} }
@ -1128,12 +1160,12 @@ typename InterleavedSolutionStorage::ResultRow InterleavedPhsfQuery(
// Filter query a key from InterleavedFilterQuery. // Filter query a key from InterleavedFilterQuery.
template <typename InterleavedSolutionStorage, typename FilterQueryHasher> template <typename InterleavedSolutionStorage, typename FilterQueryHasher>
bool InterleavedFilterQuery(const typename FilterQueryHasher::Key &key, inline bool InterleavedFilterQuery(
const FilterQueryHasher &hasher, typename FilterQueryHasher::Hash hash,
const InterleavedSolutionStorage &iss) { typename InterleavedSolutionStorage::Index segment_num,
// BEGIN mostly copied from InterleavedPhsfQuery typename InterleavedSolutionStorage::Index num_columns,
using Hash = typename FilterQueryHasher::Hash; typename InterleavedSolutionStorage::Index start_bit,
const FilterQueryHasher &hasher, const InterleavedSolutionStorage &iss) {
using CoeffRow = typename InterleavedSolutionStorage::CoeffRow; using CoeffRow = typename InterleavedSolutionStorage::CoeffRow;
using Index = typename InterleavedSolutionStorage::Index; using Index = typename InterleavedSolutionStorage::Index;
using ResultRow = typename InterleavedSolutionStorage::ResultRow; using ResultRow = typename InterleavedSolutionStorage::ResultRow;
@ -1149,41 +1181,28 @@ bool InterleavedFilterQuery(const typename FilterQueryHasher::Key &key,
constexpr auto kCoeffBits = static_cast<Index>(sizeof(CoeffRow) * 8U); constexpr auto kCoeffBits = static_cast<Index>(sizeof(CoeffRow) * 8U);
const Hash hash = hasher.GetHash(key);
const Index start_slot = hasher.GetStart(hash, iss.GetNumStarts());
const Index upper_start_block = iss.GetUpperStartBlock();
Index num_columns = iss.GetUpperNumColumns();
Index start_block_num = start_slot / kCoeffBits;
Index segment = start_block_num * num_columns -
std::min(start_block_num, upper_start_block);
// Change to lower num columns if applicable.
// (This should not compile to a conditional branch.)
num_columns -= (start_block_num < upper_start_block) ? 1 : 0;
const CoeffRow cr = hasher.GetCoeffRow(hash); const CoeffRow cr = hasher.GetCoeffRow(hash);
Index start_bit = start_slot % kCoeffBits;
// END mostly copied from InterleavedPhsfQuery.
const ResultRow expected = hasher.GetResultRowFromHash(hash); const ResultRow expected = hasher.GetResultRowFromHash(hash);
// TODO: consider optimizations such as // TODO: consider optimizations such as
// * mask fetched values and shift cr, rather than shifting fetched values
// * get rid of start_bit == 0 condition with careful fetching & shifting // * get rid of start_bit == 0 condition with careful fetching & shifting
if (start_bit == 0) { if (start_bit == 0) {
for (Index i = 0; i < num_columns; ++i) { for (Index i = 0; i < num_columns; ++i) {
if (BitParity(iss.LoadSegment(segment + i) & cr) != if (BitParity(iss.LoadSegment(segment_num + i) & cr) !=
(static_cast<int>(expected >> i) & 1)) { (static_cast<int>(expected >> i) & 1)) {
return false; return false;
} }
} }
} else { } else {
const CoeffRow cr_left = cr << static_cast<unsigned>(start_bit);
const CoeffRow cr_right =
cr >> static_cast<unsigned>(kCoeffBits - start_bit);
for (Index i = 0; i < num_columns; ++i) { for (Index i = 0; i < num_columns; ++i) {
CoeffRow soln_col = CoeffRow soln_data =
(iss.LoadSegment(segment + i) >> static_cast<unsigned>(start_bit)) | (iss.LoadSegment(segment_num + i) & cr_left) ^
(iss.LoadSegment(segment + num_columns + i) (iss.LoadSegment(segment_num + num_columns + i) & cr_right);
<< static_cast<unsigned>(kCoeffBits - start_bit)); if (BitParity(soln_data) != (static_cast<int>(expected >> i) & 1)) {
if (BitParity(soln_col & cr) != (static_cast<int>(expected >> i) & 1)) {
return false; return false;
} }
} }

@ -810,6 +810,20 @@ class SerializableInterleavedSolution {
assert(data_ != nullptr); // suppress clang analyzer report assert(data_ != nullptr); // suppress clang analyzer report
EncodeFixedGeneric(data_ + segment_num * sizeof(CoeffRow), val); EncodeFixedGeneric(data_ + segment_num * sizeof(CoeffRow), val);
} }
void PrefetchSegmentRange(Index begin_segment_num,
Index end_segment_num) const {
if (end_segment_num == begin_segment_num) {
// Nothing to do
return;
}
char* cur = data_ + begin_segment_num * sizeof(CoeffRow);
char* last = data_ + (end_segment_num - 1) * sizeof(CoeffRow);
while (cur < last) {
PREFETCH(cur, 0 /* rw */, 1 /* locality */);
cur += CACHE_LINE_SIZE;
}
PREFETCH(last, 0 /* rw */, 1 /* locality */);
}
// ******************************************************************** // ********************************************************************
// High-level API // High-level API
@ -846,7 +860,15 @@ class SerializableInterleavedSolution {
return 0; return 0;
} else { } else {
// Normal // Normal
return InterleavedPhsfQuery(input, hasher, *this); // NOTE: not using a struct to encourage compiler optimization
Hash hash;
Index segment_num;
Index num_columns;
Index start_bit;
InterleavedPrepareQuery(input, hasher, *this, &hash, &segment_num,
&num_columns, &start_bit);
return InterleavedPhsfQuery(hash, segment_num, num_columns, start_bit,
hasher, *this);
} }
} }
@ -859,7 +881,15 @@ class SerializableInterleavedSolution {
} else { } else {
// Normal, or upper_num_columns_ == 0 means "no space for data" and // Normal, or upper_num_columns_ == 0 means "no space for data" and
// thus will always return true. // thus will always return true.
return InterleavedFilterQuery(input, hasher, *this); // NOTE: not using a struct to encourage compiler optimization
Hash hash;
Index segment_num;
Index num_columns;
Index start_bit;
InterleavedPrepareQuery(input, hasher, *this, &hash, &segment_num,
&num_columns, &start_bit);
return InterleavedFilterQuery(hash, segment_num, num_columns, start_bit,
hasher, *this);
} }
} }

Loading…
Cancel
Save