cache friendly blocked bloomfilter

Summary:
By constraining the probes within cache line(s), we can improve the
cache miss rate thus performance. This probably only makes sense for
in-memory workload so defaults the option to off.

Numbers and comparision can be found in wiki:
https://our.intern.facebook.com/intern/wiki/index.php/Ljin/rocksdb_perf/2014_03_17#Bloom_Filter_Study

Test Plan: benchmarked this change substantially. Will run make all check as well

Reviewers: haobo, igor, dhruba, sdong, yhchiang

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17133
main
Lei Jin 10 years ago
parent 10cebec79e
commit 0d755fff14
  1. 13
      db/db_bench.cc
  2. 1
      db/memtable.cc
  3. 11
      include/rocksdb/options.h
  4. 2
      port/port_posix.h
  5. 4
      table/plain_table_reader.cc
  6. 38
      util/dynamic_bloom.cc
  7. 61
      util/dynamic_bloom.h
  8. 185
      util/dynamic_bloom_test.cc
  9. 1
      util/options.cc

@ -134,6 +134,8 @@ DEFINE_int64(read_range, 1, "When ==1 reads use ::Get, when >1 reads use"
DEFINE_bool(use_prefix_blooms, false, "Whether to place prefixes in blooms");
DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality");
DEFINE_bool(use_prefix_api, false, "Whether to set ReadOptions.prefix for"
" prefixscanrandom. If true, use_prefix_blooms must also be true.");
@ -1543,6 +1545,7 @@ class Benchmark {
NewFixedPrefixTransform(FLAGS_prefix_size));
}
options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits;
options.bloom_locality = FLAGS_bloom_locality;
options.max_open_files = FLAGS_open_files;
options.statistics = dbstats;
options.env = FLAGS_env;
@ -1916,7 +1919,7 @@ class Benchmark {
Duration duration(FLAGS_duration, reads_);
int64_t found = 0;
int64_t read = 0;
if (FLAGS_use_multiget) { // MultiGet
const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group
long keys_left = reads_;
@ -1924,6 +1927,7 @@ class Benchmark {
// Recalculate number of keys per group, and call MultiGet until done
long num_keys;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
read += num_keys;
found +=
MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, "");
thread->stats.FinishedSingleOp(db_);
@ -1937,8 +1941,9 @@ class Benchmark {
std::string key = GenerateKeyFromInt(k, FLAGS_num);
iter->Seek(key);
read++;
if (iter->Valid() && iter->key().compare(Slice(key)) == 0) {
++found;
found++;
}
thread->stats.FinishedSingleOp(db_);
@ -1957,6 +1962,7 @@ class Benchmark {
}
if (FLAGS_read_range < 2) {
read++;
if (db_->Get(options, key, &value).ok()) {
found++;
}
@ -1972,6 +1978,7 @@ class Benchmark {
db_->GetApproximateSizes(&range, 1, &sizes);
}
read += FLAGS_read_range;
for (iter->Seek(key);
iter->Valid() && count <= FLAGS_read_range;
++count, iter->Next()) {
@ -1992,7 +1999,7 @@ class Benchmark {
char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
found, reads_);
found, read);
thread->stats.AddMessage(msg);

@ -52,6 +52,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
assert(!should_flush_);
if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
options.bloom_locality,
options.memtable_prefix_bloom_probes));
}
}

@ -719,6 +719,17 @@ struct Options {
// number of hash probes per key
uint32_t memtable_prefix_bloom_probes;
// Control locality of bloom filter probes to improve cache miss rate.
// This option only applies to memtable prefix bloom and plaintable
// prefix bloom. It essentially limits the max number of cache lines each
// bloom filter check can touch.
// This optimization is turned off when set to 0. The number should never
// be greater than number of probes. This option can boost performance
// for in-memory workload but should use with care since it can cause
// higher false positive rate.
// Default: 0
uint32_t bloom_locality;
// Maximum number of successive merge operations on a key in the memtable.
//
// When a merge operation is added to the memtable and the maximum number of

@ -480,6 +480,8 @@ inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) {
return false;
}
#define CACHE_LINE_SIZE 64U
} // namespace port
} // namespace rocksdb

@ -270,7 +270,7 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes) {
if (options_.prefix_extractor != nullptr) {
uint32_t bloom_total_bits = num_prefixes * kBloomBitsPerKey;
if (bloom_total_bits > 0) {
bloom_.reset(new DynamicBloom(bloom_total_bits));
bloom_.reset(new DynamicBloom(bloom_total_bits, options_.bloom_locality));
}
}
@ -388,7 +388,7 @@ Status PlainTableReader::PopulateIndex() {
if (IsTotalOrderMode()) {
uint32_t num_bloom_bits = table_properties_->num_entries * kBloomBitsPerKey;
if (num_bloom_bits > 0) {
bloom_.reset(new DynamicBloom(num_bloom_bits));
bloom_.reset(new DynamicBloom(num_bloom_bits, options_.bloom_locality));
}
}

@ -5,6 +5,9 @@
#include "dynamic_bloom.h"
#include <algorithm>
#include "port/port.h"
#include "rocksdb/slice.h"
#include "util/hash.h"
@ -17,20 +20,31 @@ static uint32_t BloomHash(const Slice& key) {
}
DynamicBloom::DynamicBloom(uint32_t total_bits,
uint32_t (*hash_func)(const Slice& key),
uint32_t num_probes)
: hash_func_(hash_func),
kTotalBits((total_bits + 7) / 8 * 8),
kNumProbes(num_probes) {
assert(hash_func_);
uint32_t cl_per_block,
uint32_t num_probes,
uint32_t (*hash_func)(const Slice& key))
: kBlocked(cl_per_block > 0),
kBitsPerBlock(std::min(cl_per_block, num_probes) * CACHE_LINE_SIZE * 8),
kTotalBits((kBlocked ? (total_bits + kBitsPerBlock - 1) / kBitsPerBlock
* kBitsPerBlock :
total_bits + 7) / 8 * 8),
kNumBlocks(kBlocked ? kTotalBits / kBitsPerBlock : 1),
kNumProbes(num_probes),
hash_func_(hash_func == nullptr ? &BloomHash : hash_func) {
assert(kBlocked ? kTotalBits > 0 : kTotalBits >= kBitsPerBlock);
assert(kNumProbes > 0);
assert(kTotalBits > 0);
data_.reset(new unsigned char[kTotalBits / 8]());
}
DynamicBloom::DynamicBloom(uint32_t total_bits,
uint32_t num_probes)
: DynamicBloom(total_bits, &BloomHash, num_probes) {
uint32_t sz = kTotalBits / 8;
if (kBlocked) {
sz += CACHE_LINE_SIZE - 1;
}
raw_ = new unsigned char[sz]();
if (kBlocked) {
data_ = raw_ + CACHE_LINE_SIZE -
reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE;
} else {
data_ = raw_;
}
}
} // rocksdb

@ -15,13 +15,17 @@ class Slice;
class DynamicBloom {
public:
// total_bits: fixed total bits for the bloom
// hash_func: customized hash function
// num_probes: number of hash probes for a single key
DynamicBloom(uint32_t total_bits,
uint32_t (*hash_func)(const Slice& key),
uint32_t num_probes = 6);
// cl_per_block: block size in cache lines. When this is non-zero, a
// query/set is done within a block to improve cache locality.
// hash_func: customized hash function
explicit DynamicBloom(uint32_t total_bits, uint32_t cl_per_block = 0,
uint32_t num_probes = 6,
uint32_t (*hash_func)(const Slice& key) = nullptr);
explicit DynamicBloom(uint32_t total_bits, uint32_t num_probes = 6);
~DynamicBloom() {
delete[] raw_;
}
// Assuming single threaded access to this function.
void Add(const Slice& key);
@ -36,10 +40,15 @@ class DynamicBloom {
bool MayContainHash(uint32_t hash);
private:
uint32_t (*hash_func_)(const Slice& key);
const bool kBlocked;
const uint32_t kBitsPerBlock;
const uint32_t kTotalBits;
const uint32_t kNumBlocks;
const uint32_t kNumProbes;
std::unique_ptr<unsigned char[]> data_;
uint32_t (*hash_func_)(const Slice& key);
unsigned char* data_;
unsigned char* raw_;
};
inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); }
@ -50,22 +59,42 @@ inline bool DynamicBloom::MayContain(const Slice& key) {
inline bool DynamicBloom::MayContainHash(uint32_t h) {
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (uint32_t i = 0; i < kNumProbes; i++) {
const uint32_t bitpos = h % kTotalBits;
if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
return false;
if (kBlocked) {
uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock;
for (uint32_t i = 0; i < kNumProbes; ++i) {
const uint32_t bitpos = b + h % kBitsPerBlock;
if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
return false;
}
h += delta;
}
} else {
for (uint32_t i = 0; i < kNumProbes; ++i) {
const uint32_t bitpos = h % kTotalBits;
if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
return false;
}
h += delta;
}
h += delta;
}
return true;
}
inline void DynamicBloom::AddHash(uint32_t h) {
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (uint32_t i = 0; i < kNumProbes; i++) {
const uint32_t bitpos = h % kTotalBits;
data_[bitpos / 8] |= (1 << (bitpos % 8));
h += delta;
if (kBlocked) {
uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock;
for (uint32_t i = 0; i < kNumProbes; ++i) {
const uint32_t bitpos = b + h % kBitsPerBlock;
data_[bitpos / 8] |= (1 << (bitpos % 8));
h += delta;
}
} else {
for (uint32_t i = 0; i < kNumProbes; ++i) {
const uint32_t bitpos = h % kTotalBits;
data_[bitpos / 8] |= (1 << (bitpos % 8));
h += delta;
}
}
}

@ -3,19 +3,23 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <algorithm>
#include <gflags/gflags.h>
#include "dynamic_bloom.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/stop_watch.h"
DEFINE_int32(bits_per_key, 10, "");
DEFINE_int32(num_probes, 6, "");
DEFINE_bool(enable_perf, false, "");
namespace rocksdb {
static Slice Key(int i, char* buffer) {
static Slice Key(uint64_t i, char* buffer) {
memcpy(buffer, &i, sizeof(i));
return Slice(buffer, sizeof(i));
}
@ -24,32 +28,44 @@ class DynamicBloomTest {
};
TEST(DynamicBloomTest, EmptyFilter) {
DynamicBloom bloom(100, 2);
ASSERT_TRUE(! bloom.MayContain("hello"));
ASSERT_TRUE(! bloom.MayContain("world"));
DynamicBloom bloom1(100, 0, 2);
ASSERT_TRUE(!bloom1.MayContain("hello"));
ASSERT_TRUE(!bloom1.MayContain("world"));
DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2);
ASSERT_TRUE(!bloom2.MayContain("hello"));
ASSERT_TRUE(!bloom2.MayContain("world"));
}
TEST(DynamicBloomTest, Small) {
DynamicBloom bloom(100, 2);
bloom.Add("hello");
bloom.Add("world");
ASSERT_TRUE(bloom.MayContain("hello"));
ASSERT_TRUE(bloom.MayContain("world"));
ASSERT_TRUE(! bloom.MayContain("x"));
ASSERT_TRUE(! bloom.MayContain("foo"));
DynamicBloom bloom1(100, 0, 2);
bloom1.Add("hello");
bloom1.Add("world");
ASSERT_TRUE(bloom1.MayContain("hello"));
ASSERT_TRUE(bloom1.MayContain("world"));
ASSERT_TRUE(!bloom1.MayContain("x"));
ASSERT_TRUE(!bloom1.MayContain("foo"));
DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2);
bloom2.Add("hello");
bloom2.Add("world");
ASSERT_TRUE(bloom2.MayContain("hello"));
ASSERT_TRUE(bloom2.MayContain("world"));
ASSERT_TRUE(!bloom2.MayContain("x"));
ASSERT_TRUE(!bloom2.MayContain("foo"));
}
static int NextLength(int length) {
if (length < 10) {
length += 1;
} else if (length < 100) {
length += 10;
} else if (length < 1000) {
length += 100;
static uint32_t NextNum(uint32_t num) {
if (num < 10) {
num += 1;
} else if (num < 100) {
num += 10;
} else if (num < 1000) {
num += 100;
} else {
length += 1000;
num += 1000;
}
return length;
return num;
}
TEST(DynamicBloomTest, VaryingLengths) {
@ -62,47 +78,116 @@ TEST(DynamicBloomTest, VaryingLengths) {
fprintf(stderr, "bits_per_key: %d num_probes: %d\n",
FLAGS_bits_per_key, FLAGS_num_probes);
for (int length = 1; length <= 10000; length = NextLength(length)) {
uint32_t bloom_bits = std::max(length * FLAGS_bits_per_key, 64);
DynamicBloom bloom(bloom_bits, FLAGS_num_probes);
for (int i = 0; i < length; i++) {
bloom.Add(Key(i, buffer));
ASSERT_TRUE(bloom.MayContain(Key(i, buffer)));
}
for (uint32_t cl_per_block = 0; cl_per_block < FLAGS_num_probes;
++cl_per_block) {
for (uint32_t num = 1; num <= 10000; num = NextNum(num)) {
uint32_t bloom_bits = 0;
if (cl_per_block == 0) {
bloom_bits = std::max(num * FLAGS_bits_per_key, 64U);
} else {
bloom_bits = std::max(num * FLAGS_bits_per_key,
cl_per_block * CACHE_LINE_SIZE * 8);
}
DynamicBloom bloom(bloom_bits, cl_per_block, FLAGS_num_probes);
for (uint64_t i = 0; i < num; i++) {
bloom.Add(Key(i, buffer));
ASSERT_TRUE(bloom.MayContain(Key(i, buffer)));
}
// All added keys must match
for (int i = 0; i < length; i++) {
ASSERT_TRUE(bloom.MayContain(Key(i, buffer)))
<< "Length " << length << "; key " << i;
}
// All added keys must match
for (uint64_t i = 0; i < num; i++) {
ASSERT_TRUE(bloom.MayContain(Key(i, buffer)))
<< "Num " << num << "; key " << i;
}
// Check false positive rate
// Check false positive rate
int result = 0;
for (int i = 0; i < 10000; i++) {
if (bloom.MayContain(Key(i + 1000000000, buffer))) {
result++;
int result = 0;
for (uint64_t i = 0; i < 10000; i++) {
if (bloom.MayContain(Key(i + 1000000000, buffer))) {
result++;
}
}
double rate = result / 10000.0;
fprintf(stderr, "False positives: %5.2f%% @ num = %6u, bloom_bits = %6u, "
"cl per block = %u\n", rate*100.0, num, bloom_bits, cl_per_block);
if (rate > 0.0125)
mediocre_filters++; // Allowed, but not too often
else
good_filters++;
}
double rate = result / 10000.0;
fprintf(stderr, "False positives: %5.2f%% @ length = %6d ; \n",
rate*100.0, length);
fprintf(stderr, "Filters: %d good, %d mediocre\n",
good_filters, mediocre_filters);
ASSERT_LE(mediocre_filters, good_filters/5);
}
}
TEST(DynamicBloomTest, perf) {
StopWatchNano timer(Env::Default());
//ASSERT_LE(rate, 0.02); // Must not be over 2%
if (rate > 0.0125)
mediocre_filters++; // Allowed, but not too often
else
good_filters++;
if (!FLAGS_enable_perf) {
return;
}
fprintf(stderr, "Filters: %d good, %d mediocre\n",
good_filters, mediocre_filters);
for (uint64_t m = 1; m <= 8; ++m) {
const uint64_t num_keys = m * 8 * 1024 * 1024;
fprintf(stderr, "testing %luM keys\n", m * 8);
ASSERT_LE(mediocre_filters, good_filters/5);
}
DynamicBloom std_bloom(num_keys * 10, 0, FLAGS_num_probes);
// Different bits-per-byte
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
std_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
}
uint64_t elapsed = timer.ElapsedNanos();
fprintf(stderr, "standard bloom, avg add latency %lu\n",
elapsed / num_keys);
uint64_t count = 0;
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
if (std_bloom.MayContain(Slice(reinterpret_cast<const char*>(&i), 8))) {
++count;
}
}
elapsed = timer.ElapsedNanos();
fprintf(stderr, "standard bloom, avg query latency %lu\n",
elapsed / count);
ASSERT_TRUE(count == num_keys);
for (int cl_per_block = 1; cl_per_block <= FLAGS_num_probes;
++cl_per_block) {
DynamicBloom blocked_bloom(num_keys * 10, cl_per_block, FLAGS_num_probes);
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
blocked_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
}
uint64_t elapsed = timer.ElapsedNanos();
fprintf(stderr, "blocked bloom(%d), avg add latency %lu\n",
cl_per_block, elapsed / num_keys);
uint64_t count = 0;
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
if (blocked_bloom.MayContain(
Slice(reinterpret_cast<const char*>(&i), 8))) {
++count;
}
}
elapsed = timer.ElapsedNanos();
fprintf(stderr, "blocked bloom(%d), avg query latency %lu\n",
cl_per_block, elapsed / count);
ASSERT_TRUE(count == num_keys);
}
}
}
} // namespace rocksdb

@ -112,6 +112,7 @@ Options::Options()
inplace_callback(nullptr),
memtable_prefix_bloom_bits(0),
memtable_prefix_bloom_probes(6),
bloom_locality(0),
max_successive_merges(0),
min_partial_merge_operands(2),
allow_thread_local(true) {

Loading…
Cancel
Save