[RocksDB] [Performance Branch] Added dynamic bloom, to be used for memable non-existing key filtering

Summary: as title

Test Plan: dynamic_bloom_test

Reviewers: dhruba, sdong, kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D14385
main
Haobo Xu 11 years ago
parent a82f42b765
commit 3c02c363b3
  1. 4
      Makefile
  2. 92
      db/memtable.cc
  3. 4
      db/memtable.h
  4. 30
      db/prefix_test.cc
  5. 2
      include/rocksdb/memtablerep.h
  6. 8
      include/rocksdb/options.h
  7. 8
      util/bloom_test.cc
  8. 63
      util/dynamic_bloom.cc
  9. 42
      util/dynamic_bloom.h
  10. 113
      util/dynamic_bloom_test.cc
  11. 4
      util/hash_skiplist_rep.cc
  12. 9
      util/options.cc

@ -50,6 +50,7 @@ TESTS = \
auto_roll_logger_test \
block_test \
bloom_test \
dynamic_bloom_test \
c_test \
cache_test \
coding_test \
@ -228,6 +229,9 @@ table_properties_collector_test: db/table_properties_collector_test.o $(LIBOBJEC
bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
dynamic_bloom_test: util/dynamic_bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/dynamic_bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
c_test: db/c_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/c_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

@ -52,7 +52,14 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
mem_logfile_number_(0),
locks_(options.inplace_update_support
? options.inplace_update_num_locks
: 0) { }
: 0),
prefix_extractor_(options.prefix_extractor) {
if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
options.memtable_prefix_bloom_probes));
}
}
MemTable::~MemTable() {
assert(refs_ == 0);
@ -88,27 +95,53 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
MemTableIterator(MemTableRep* table, const ReadOptions& options)
: iter_() {
MemTableIterator(const MemTable& mem, const ReadOptions& options)
: mem_(mem), iter_(), dynamic_prefix_seek_(false), valid_(false) {
if (options.prefix) {
iter_ = table->GetPrefixIterator(*options.prefix);
iter_ = mem_.table_->GetPrefixIterator(*options.prefix);
} else if (options.prefix_seek) {
iter_ = table->GetDynamicPrefixIterator();
dynamic_prefix_seek_ = true;
iter_ = mem_.table_->GetDynamicPrefixIterator();
} else {
iter_ = table->GetIterator();
iter_ = mem_.table_->GetIterator();
}
}
virtual bool Valid() const { return iter_->Valid(); }
virtual void Seek(const Slice& k) { iter_->Seek(k, nullptr); }
virtual void SeekToFirst() { iter_->SeekToFirst(); }
virtual void SeekToLast() { iter_->SeekToLast(); }
virtual void Next() { iter_->Next(); }
virtual void Prev() { iter_->Prev(); }
virtual bool Valid() const { return valid_; }
virtual void Seek(const Slice& k) {
if (dynamic_prefix_seek_ && mem_.prefix_bloom_ &&
!mem_.prefix_bloom_->MayContain(
mem_.prefix_extractor_->Transform(ExtractUserKey(k)))) {
valid_ = false;
return;
}
iter_->Seek(k, nullptr);
valid_ = iter_->Valid();
}
virtual void SeekToFirst() {
iter_->SeekToFirst();
valid_ = iter_->Valid();
}
virtual void SeekToLast() {
iter_->SeekToLast();
valid_ = iter_->Valid();
}
virtual void Next() {
assert(Valid());
iter_->Next();
valid_ = iter_->Valid();
}
virtual void Prev() {
assert(Valid());
iter_->Prev();
valid_ = iter_->Valid();
}
virtual Slice key() const {
assert(Valid());
return GetLengthPrefixedSlice(iter_->key());
}
virtual Slice value() const {
assert(Valid());
Slice key_slice = GetLengthPrefixedSlice(iter_->key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
@ -116,7 +149,10 @@ class MemTableIterator: public Iterator {
virtual Status status() const { return Status::OK(); }
private:
const MemTable& mem_;
std::shared_ptr<MemTableRep::Iterator> iter_;
bool dynamic_prefix_seek_;
bool valid_;
// No copying allowed
MemTableIterator(const MemTableIterator&);
@ -124,7 +160,7 @@ class MemTableIterator: public Iterator {
};
Iterator* MemTable::NewIterator(const ReadOptions& options) {
return new MemTableIterator(table_.get(), options);
return new MemTableIterator(*this, options);
}
port::RWMutex* MemTable::GetLock(const Slice& key) {
@ -132,7 +168,7 @@ port::RWMutex* MemTable::GetLock(const Slice& key) {
}
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key,
const Slice& key, /* user key */
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
@ -156,6 +192,11 @@ void MemTable::Add(SequenceNumber s, ValueType type,
assert((p + val_size) - buf == (unsigned)encoded_len);
table_->Insert(buf);
if (prefix_bloom_) {
assert(prefix_extractor_);
prefix_bloom_->Add(prefix_extractor_->Transform(key));
}
// The first sequence number inserted into the memtable
assert(first_seqno_ == 0 || s > first_seqno_);
if (first_seqno_ == 0) {
@ -168,10 +209,17 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
StopWatchNano memtable_get_timer(options.env, false);
StartPerfTimer(&memtable_get_timer);
Slice memkey = key.memtable_key();
std::shared_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(key.user_key()));
iter->Seek(key.user_key(), memkey.data());
Slice mem_key = key.memtable_key();
Slice user_key = key.user_key();
std::shared_ptr<MemTableRep::Iterator> iter;
if (prefix_bloom_ &&
!prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) {
// iter is null if prefix bloom says the key does not exist
} else {
iter = table_->GetIterator(user_key);
iter->Seek(user_key, mem_key.data());
}
bool merge_in_progress = s->IsMergeInProgress();
auto merge_operator = options.merge_operator.get();
@ -179,7 +227,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
std::string merge_result;
bool found_final_value = false;
for (; !found_final_value && iter->Valid(); iter->Next()) {
for (; !found_final_value && iter && iter->Valid(); iter->Next()) {
// entry format is:
// klength varint32
// userkey char[klength-8]
@ -278,11 +326,12 @@ bool MemTable::Update(SequenceNumber seq, ValueType type,
const Slice& key,
const Slice& value) {
LookupKey lkey(key, seq);
Slice memkey = lkey.memtable_key();
Slice mem_key = lkey.memtable_key();
std::shared_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(lkey.user_key()));
iter->Seek(key, memkey.data());
iter->Seek(key, mem_key.data());
if (iter->Valid()) {
// entry format is:
@ -319,6 +368,7 @@ bool MemTable::Update(SequenceNumber seq, ValueType type,
VarintLength(value.size()) +
value.size())
);
// no need to update bloom, as user key does not change.
return true;
}
}

@ -17,6 +17,7 @@
#include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "util/arena_impl.h"
#include "util/dynamic_bloom.h"
namespace rocksdb {
@ -171,6 +172,9 @@ class MemTable {
// Get the lock associated for the key
port::RWMutex* GetLock(const Slice& key);
const SliceTransform* const prefix_extractor_;
std::unique_ptr<DynamicBloom> prefix_bloom_;
};
extern const char* EncodeKey(std::string* scratch, const Slice& target);

@ -16,12 +16,15 @@ DEFINE_bool(trigger_deadlock, false,
DEFINE_uint64(bucket_count, 100000, "number of buckets");
DEFINE_uint64(num_locks, 10001, "number of locks");
DEFINE_bool(random_prefix, false, "randomize prefix");
DEFINE_uint64(total_prefixes, 1000, "total number of prefixes");
DEFINE_uint64(items_per_prefix, 10, "total number of values per prefix");
DEFINE_int64(write_buffer_size, 1000000000, "");
DEFINE_int64(max_write_buffer_number, 8, "");
DEFINE_int64(min_write_buffer_number_to_merge, 7, "");
DEFINE_uint64(total_prefixes, 100000, "total number of prefixes");
DEFINE_uint64(items_per_prefix, 1, "total number of values per prefix");
DEFINE_int64(write_buffer_size, 33554432, "");
DEFINE_int64(max_write_buffer_number, 2, "");
DEFINE_int64(min_write_buffer_number_to_merge, 1, "");
DEFINE_int32(skiplist_height, 4, "");
DEFINE_int32(memtable_prefix_bloom_bits, 10000000, "");
DEFINE_int32(memtable_prefix_bloom_probes, 10, "");
DEFINE_int32(value_size, 40, "");
// Path to the database on file system
const std::string kDbName = rocksdb::test::TmpDir() + "/prefix_test";
@ -120,6 +123,9 @@ class PrefixTest {
}
}
options.memtable_prefix_bloom_bits = FLAGS_memtable_prefix_bloom_bits;
options.memtable_prefix_bloom_probes = FLAGS_memtable_prefix_bloom_probes;
Status s = DB::Open(options, kDbName, &db);
ASSERT_OK(s);
return std::shared_ptr<DB>(db);
@ -147,18 +153,28 @@ TEST(PrefixTest, DynamicPrefixIterator) {
std::random_shuffle(prefixes.begin(), prefixes.end());
}
HistogramImpl hist_put_time;
HistogramImpl hist_put_comparison;
// insert x random prefix, each with y continuous element.
for (auto prefix : prefixes) {
for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) {
TestKey test_key(prefix, sorted);
Slice key = TestKeyToSlice(test_key);
std::string value(40, 0);
std::string value(FLAGS_value_size, 0);
perf_context.Reset();
StopWatchNano timer(Env::Default(), true);
ASSERT_OK(db->Put(write_options, key, value));
hist_put_time.Add(timer.ElapsedNanos());
hist_put_comparison.Add(perf_context.user_key_comparison_count);
}
}
std::cout << "Put key comparison: \n" << hist_put_comparison.ToString()
<< "Put time: \n" << hist_put_time.ToString();
// test seek existing keys
HistogramImpl hist_seek_time;
HistogramImpl hist_seek_comparison;
@ -200,7 +216,7 @@ TEST(PrefixTest, DynamicPrefixIterator) {
HistogramImpl hist_no_seek_comparison;
for (auto prefix = FLAGS_total_prefixes;
prefix < FLAGS_total_prefixes + 100;
prefix < FLAGS_total_prefixes + 10000;
prefix++) {
TestKey test_key(prefix, 0);
Slice key = TestKeyToSlice(test_key);

@ -99,7 +99,7 @@ class MemTableRep {
virtual void Prev() = 0;
// Advance to the first entry with a key >= target
virtual void Seek(const Slice& user_key, const char* memtable_key) = 0;
virtual void Seek(const Slice& internal_key, const char* memtable_key) = 0;
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.

@ -635,6 +635,14 @@ struct Options {
// Number of locks used for inplace update
// Default: 10000, if inplace_update_support = true, else 0.
size_t inplace_update_num_locks;
// if prefix_extractor is set and bloom_bits is not 0, create prefix bloom
// for memtable
uint32_t memtable_prefix_bloom_bits;
// number of hash probes per key
uint32_t memtable_prefix_bloom_probes;
};
//

@ -7,12 +7,16 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <gflags/gflags.h>
#include "rocksdb/filter_policy.h"
#include "util/logging.h"
#include "util/testharness.h"
#include "util/testutil.h"
DEFINE_int32(bits_per_key, 10, "");
namespace rocksdb {
static const int kVerbose = 1;
@ -29,7 +33,7 @@ class BloomTest {
std::vector<std::string> keys_;
public:
BloomTest() : policy_(NewBloomFilterPolicy(10)) { }
BloomTest() : policy_(NewBloomFilterPolicy(FLAGS_bits_per_key)) { }
~BloomTest() {
delete policy_;
@ -160,5 +164,7 @@ TEST(BloomTest, VaryingLengths) {
} // namespace rocksdb
int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
return rocksdb::test::RunAllTests();
}

@ -0,0 +1,63 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "dynamic_bloom.h"
#include "rocksdb/slice.h"
#include "util/hash.h"
namespace rocksdb {
namespace {
static uint32_t BloomHash(const Slice& key) {
return Hash(key.data(), key.size(), 0xbc9f1d34);
}
}
DynamicBloom::DynamicBloom(uint32_t total_bits,
uint32_t (*hash_func)(const Slice& key),
uint32_t num_probes)
: hash_func_(hash_func),
total_bits_((total_bits + 7) / 8 * 8),
num_probes_(num_probes) {
assert(hash_func_);
assert(num_probes_ > 0);
assert(total_bits_ > 0);
data_.reset(new unsigned char[total_bits_ / 8]());
}
DynamicBloom::DynamicBloom(uint32_t total_bits,
uint32_t num_probes)
: hash_func_(&BloomHash),
total_bits_((total_bits + 7) / 8 * 8),
num_probes_(num_probes) {
assert(num_probes_ > 0);
assert(total_bits_ > 0);
data_.reset(new unsigned char[total_bits_ / 8]());
}
void DynamicBloom::Add(const Slice& key) {
uint32_t h = hash_func_(key);
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (uint32_t i = 0; i < num_probes_; i++) {
const uint32_t bitpos = h % total_bits_;
data_[bitpos/8] |= (1 << (bitpos % 8));
h += delta;
}
}
bool DynamicBloom::MayContain(const Slice& key) {
uint32_t h = hash_func_(key);
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (uint32_t i = 0; i < num_probes_; i++) {
const uint32_t bitpos = h % total_bits_;
if ((data_[bitpos/8] & (1 << (bitpos % 8)))
== 0) return false;
h += delta;
}
return true;
}
}

@ -0,0 +1,42 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <atomic>
#include <memory>
namespace rocksdb {
class Slice;
class DynamicBloom {
public:
// total_bits: fixed total bits for the bloom
// hash_func: customized hash function
// num_probes: number of hash probes for a single key
DynamicBloom(uint32_t total_bits,
uint32_t (*hash_func)(const Slice& key),
uint32_t num_probes = 6);
explicit DynamicBloom(uint32_t total_bits, uint32_t num_probes = 6);
// Assuming single threaded access to Add
void Add(const Slice& key);
// Multithreaded access to MayContain is OK
bool MayContain(const Slice& key);
private:
uint32_t (*hash_func_)(const Slice& key);
uint32_t total_bits_;
uint32_t num_probes_;
std::unique_ptr<unsigned char[]> data_;
};
}

@ -0,0 +1,113 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <gflags/gflags.h>
#include "dynamic_bloom.h"
#include "util/logging.h"
#include "util/testharness.h"
#include "util/testutil.h"
DEFINE_int32(bits_per_key, 10, "");
DEFINE_int32(num_probes, 6, "");
namespace rocksdb {
static Slice Key(int i, char* buffer) {
memcpy(buffer, &i, sizeof(i));
return Slice(buffer, sizeof(i));
}
class DynamicBloomTest {
};
TEST(DynamicBloomTest, EmptyFilter) {
DynamicBloom bloom(100, 2);
ASSERT_TRUE(! bloom.MayContain("hello"));
ASSERT_TRUE(! bloom.MayContain("world"));
}
TEST(DynamicBloomTest, Small) {
DynamicBloom bloom(100, 2);
bloom.Add("hello");
bloom.Add("world");
ASSERT_TRUE(bloom.MayContain("hello"));
ASSERT_TRUE(bloom.MayContain("world"));
ASSERT_TRUE(! bloom.MayContain("x"));
ASSERT_TRUE(! bloom.MayContain("foo"));
}
static int NextLength(int length) {
if (length < 10) {
length += 1;
} else if (length < 100) {
length += 10;
} else if (length < 1000) {
length += 100;
} else {
length += 1000;
}
return length;
}
TEST(DynamicBloomTest, VaryingLengths) {
char buffer[sizeof(int)];
// Count number of filters that significantly exceed the false positive rate
int mediocre_filters = 0;
int good_filters = 0;
fprintf(stderr, "bits_per_key: %d num_probes: %d\n",
FLAGS_bits_per_key, FLAGS_num_probes);
for (int length = 1; length <= 10000; length = NextLength(length)) {
uint32_t bloom_bits = std::max(length * FLAGS_bits_per_key, 64);
DynamicBloom bloom(bloom_bits, FLAGS_num_probes);
for (int i = 0; i < length; i++) {
bloom.Add(Key(i, buffer));
ASSERT_TRUE(bloom.MayContain(Key(i, buffer)));
}
// All added keys must match
for (int i = 0; i < length; i++) {
ASSERT_TRUE(bloom.MayContain(Key(i, buffer)))
<< "Length " << length << "; key " << i;
}
// Check false positive rate
int result = 0;
for (int i = 0; i < 10000; i++) {
if (bloom.MayContain(Key(i + 1000000000, buffer))) {
result++;
}
}
double rate = result / 10000.0;
fprintf(stderr, "False positives: %5.2f%% @ length = %6d ; \n",
rate*100.0, length);
//ASSERT_LE(rate, 0.02); // Must not be over 2%
if (rate > 0.0125)
mediocre_filters++; // Allowed, but not too often
else
good_filters++;
}
fprintf(stderr, "Filters: %d good, %d mediocre\n",
good_filters, mediocre_filters);
ASSERT_LE(mediocre_filters, good_filters/5);
}
// Different bits-per-byte
} // namespace rocksdb
int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
return rocksdb::test::RunAllTests();
}

@ -119,11 +119,11 @@ class HashSkipListRep : public MemTableRep {
}
// Advance to the first entry with a key >= target
virtual void Seek(const Slice& user_key, const char* memtable_key) {
virtual void Seek(const Slice& internal_key, const char* memtable_key) {
if (list_ != nullptr) {
const char* encoded_key =
(memtable_key != nullptr) ?
memtable_key : EncodeKey(&tmp_, user_key);
memtable_key : EncodeKey(&tmp_, internal_key);
iter_.Seek(encoded_key);
}
}

@ -101,7 +101,9 @@ Options::Options()
table_factory(
std::shared_ptr<TableFactory>(new BlockBasedTableFactory())),
inplace_update_support(false),
inplace_update_num_locks(10000) {
inplace_update_num_locks(10000),
memtable_prefix_bloom_bits(0),
memtable_prefix_bloom_probes(6) {
assert(memtable_factory.get() != nullptr);
}
@ -292,6 +294,11 @@ Options::Dump(Logger* log) const
inplace_update_support);
Log(log, " Options.inplace_update_num_locks: %zd",
inplace_update_num_locks);
// TODO: easier config for bloom (maybe based on avg key/value size)
Log(log, " Options.memtable_prefix_bloom_bits: %d",
memtable_prefix_bloom_bits);
Log(log, " Options.memtable_prefix_bloom_probes: %d",
memtable_prefix_bloom_probes);
} // Options::Dump
//

Loading…
Cancel
Save