add whole key bloom filter support in memtables (#4985)

Summary:
MyRocks calls `GetForUpdate` on `INSERT`, for unique key check, and in almost all cases GetForUpdate returns empty result. For such cases, whole key bloom filter is helpful.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4985

Differential Revision: D14118257

Pulled By: miasantreble

fbshipit-source-id: d35cb7109c62fd5ad541a26968e3a3e16d3e85ea
main
Zhongyi Xie 6 years ago committed by Facebook Github Bot
parent c2affccc18
commit ed995c6a69
  1. 3
      HISTORY.md
  2. 50
      db/db_bloom_filter_test.cc
  3. 4
      db/external_sst_file_test.cc
  4. 51
      db/memtable.cc
  5. 5
      db/memtable.h
  6. 9
      include/rocksdb/advanced_options.h
  7. 2
      options/cf_options.cc
  8. 3
      options/cf_options.h
  9. 4
      options/options.cc
  10. 6
      options/options_helper.cc
  11. 1
      options/options_settable_test.cc
  12. 2
      options/options_test.cc
  13. 5
      table/block_based_table_builder.cc
  14. 3
      tools/db_bench_tool.cc
  15. 1
      tools/db_bench_tool_test.cc
  16. 6
      tools/db_stress.cc
  17. 1
      util/testutil.cc

@ -11,10 +11,11 @@
* Add support for trace sampling.
* Enable properties block checksum verification for block-based tables.
* For all users of dictionary compression, we now generate a separate dictionary for compressing each bottom-level SST file. Previously we reused a single dictionary for a whole compaction to bottom level. The new approach achieves better compression ratios; however, it uses more memory and CPU for buffering/sampling data blocks and training dictionaries.
* Add whole key bloom filter support in memtable.
* Files written by `SstFileWriter` will now use dictionary compression if it is configured in the file writer's `CompressionOptions`.
### Public API Change
* Disallow CompactionFilter::IgnoreSnapshots() = false, because it is not very useful and the behavior is confusing. The filter will filter everything if there is no snapshot declared by the time the compaction starts. However, users can define a snapshot after the compaction starts and before it finishes and this new snapshot won't be repeatable, because after the compaction finishes, some keys may be dropped.
* Disallow CompactionFilter::IgnoreSnapshots() = false, because it is not very useful and the behavior is confusing. The filter will filter everything if there is no snapshot declared by the time the compaction starts. However, users can define a snapshot after the compaction starts and before it finishes and this new snapshot won't be repeatable, because after the compaction finishes, some keys may be dropped.
* CompactionPri = kMinOverlappingRatio also uses compensated file size, which boosts file with lots of tombstones to be compacted first.
* Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot before doing the read. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_tracked with default value of false. If true it indicates that call is assumed to be after a ::GetForUpdate.
* `TableProperties::num_entries` and `TableProperties::num_deletions` now also account for number of range tombstones.

@ -786,6 +786,56 @@ TEST_F(DBBloomFilterTest, PrefixExtractorBlockFilter) {
delete iter;
}
TEST_F(DBBloomFilterTest, MemtableWholeKeyBloomFilter) {
// regression test for #2743. the range delete tombstones in memtable should
// be added even when Get() skips searching due to its prefix bloom filter
const int kMemtableSize = 1 << 20; // 1MB
const int kMemtablePrefixFilterSize = 1 << 13; // 8KB
const int kPrefixLen = 4;
Options options = CurrentOptions();
options.memtable_prefix_bloom_size_ratio =
static_cast<double>(kMemtablePrefixFilterSize) / kMemtableSize;
options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(kPrefixLen));
options.write_buffer_size = kMemtableSize;
options.memtable_whole_key_filtering = false;
Reopen(options);
std::string key1("AAAABBBB");
std::string key2("AAAACCCC"); // not in DB
std::string key3("AAAADDDD");
std::string key4("AAAAEEEE");
std::string value1("Value1");
std::string value3("Value3");
std::string value4("Value4");
ASSERT_OK(Put(key1, value1, WriteOptions()));
// check memtable bloom stats
ASSERT_EQ("NOT_FOUND", Get(key2));
ASSERT_EQ(0, get_perf_context()->bloom_memtable_miss_count);
// same prefix, bloom filter false positive
ASSERT_EQ(1, get_perf_context()->bloom_memtable_hit_count);
// enable whole key bloom filter
options.memtable_whole_key_filtering = true;
Reopen(options);
// check memtable bloom stats
ASSERT_OK(Put(key3, value3, WriteOptions()));
ASSERT_EQ("NOT_FOUND", Get(key2));
// whole key bloom filter kicks in and determines it's a miss
ASSERT_EQ(1, get_perf_context()->bloom_memtable_miss_count);
ASSERT_EQ(1, get_perf_context()->bloom_memtable_hit_count);
// verify whole key filtering does not depend on prefix_extractor
options.prefix_extractor.reset();
Reopen(options);
// check memtable bloom stats
ASSERT_OK(Put(key4, value4, WriteOptions()));
ASSERT_EQ("NOT_FOUND", Get(key2));
// whole key bloom filter kicks in and determines it's a miss
ASSERT_EQ(2, get_perf_context()->bloom_memtable_miss_count);
ASSERT_EQ(1, get_perf_context()->bloom_memtable_hit_count);
}
#ifndef ROCKSDB_LITE
class BloomStatsTestWithParam
: public DBBloomFilterTest,

@ -2333,9 +2333,7 @@ TEST_F(ExternalSSTFileTest, IngestFileWrittenWithCompressionDictionary) {
std::atomic<int> num_compression_dicts(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
[&](void* /* arg */) {
++num_compression_dicts;
});
[&](void* /* arg */) { ++num_compression_dicts; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);

@ -51,6 +51,8 @@ ImmutableMemTableOptions::ImmutableMemTableOptions(
mutable_cf_options.memtable_prefix_bloom_size_ratio) *
8u),
memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
memtable_whole_key_filtering(
mutable_cf_options.memtable_whole_key_filtering),
inplace_update_support(ioptions.inplace_update_support),
inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
inplace_callback(ioptions.inplace_callback),
@ -109,8 +111,10 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(
// use bloom_filter_ for both whole key and prefix bloom filter
if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) &&
moptions_.memtable_prefix_bloom_bits > 0) {
bloom_filter_.reset(
new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
ioptions.bloom_locality, 6 /* hard coded 6 probes */,
moptions_.memtable_huge_page_size, ioptions.info_log));
@ -282,7 +286,7 @@ class MemTableIterator : public InternalIterator {
if (use_range_del_table) {
iter_ = mem.range_del_table_->GetIterator(arena);
} else if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
bloom_ = mem.prefix_bloom_.get();
bloom_ = mem.bloom_filter_.get();
iter_ = mem.table_->GetDynamicPrefixIterator(arena);
} else {
iter_ = mem.table_->GetIterator(arena);
@ -313,7 +317,8 @@ class MemTableIterator : public InternalIterator {
void Seek(const Slice& k) override {
PERF_TIMER_GUARD(seek_on_memtable_time);
PERF_COUNTER_ADD(seek_on_memtable_count, 1);
if (bloom_ != nullptr) {
if (bloom_) {
// iterator should only use prefix bloom filter
if (!bloom_->MayContain(
prefix_extractor_->Transform(ExtractUserKey(k)))) {
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
@ -329,7 +334,7 @@ class MemTableIterator : public InternalIterator {
void SeekForPrev(const Slice& k) override {
PERF_TIMER_GUARD(seek_on_memtable_time);
PERF_COUNTER_ADD(seek_on_memtable_count, 1);
if (bloom_ != nullptr) {
if (bloom_) {
if (!bloom_->MayContain(
prefix_extractor_->Transform(ExtractUserKey(k)))) {
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
@ -515,9 +520,11 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
std::memory_order_relaxed);
}
if (prefix_bloom_) {
assert(prefix_extractor_);
prefix_bloom_->Add(prefix_extractor_->Transform(key));
if (bloom_filter_ && prefix_extractor_) {
bloom_filter_->Add(prefix_extractor_->Transform(key));
}
if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
bloom_filter_->Add(key);
}
// The first sequence number inserted into the memtable
@ -546,9 +553,11 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
post_process_info->num_deletes++;
}
if (prefix_bloom_) {
assert(prefix_extractor_);
prefix_bloom_->AddConcurrently(prefix_extractor_->Transform(key));
if (bloom_filter_ && prefix_extractor_) {
bloom_filter_->AddConcurrently(prefix_extractor_->Transform(key));
}
if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
bloom_filter_->AddConcurrently(key);
}
// atomically update first_seqno_ and earliest_seqno_.
@ -755,16 +764,24 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
Slice user_key = key.user_key();
bool found_final_value = false;
bool merge_in_progress = s->IsMergeInProgress();
bool const may_contain =
nullptr == prefix_bloom_
? false
: prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key));
if (prefix_bloom_ && !may_contain) {
bool may_contain = true;
if (bloom_filter_) {
// when both memtable_whole_key_filtering and prefix_extractor_ are set,
// only do whole key filtering for Get() to save CPU
if (moptions_.memtable_whole_key_filtering) {
may_contain = bloom_filter_->MayContain(user_key);
} else {
assert(prefix_extractor_);
may_contain =
bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
}
}
if (bloom_filter_ && !may_contain) {
// iter is null if prefix bloom says the key does not exist
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
*seq = kMaxSequenceNumber;
} else {
if (prefix_bloom_) {
if (bloom_filter_) {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
Saver saver;

@ -41,6 +41,7 @@ struct ImmutableMemTableOptions {
size_t arena_block_size;
uint32_t memtable_prefix_bloom_bits;
size_t memtable_huge_page_size;
bool memtable_whole_key_filtering;
bool inplace_update_support;
size_t inplace_update_num_locks;
UpdateStatus (*inplace_callback)(char* existing_value,
@ -274,7 +275,7 @@ class MemTable {
// memtable prefix bloom is disabled, since we can't easily allocate more
// space.
void UpdateWriteBufferSize(size_t new_write_buffer_size) {
if (prefix_bloom_ == nullptr ||
if (bloom_filter_ == nullptr ||
new_write_buffer_size < write_buffer_size_) {
write_buffer_size_.store(new_write_buffer_size,
std::memory_order_relaxed);
@ -454,7 +455,7 @@ class MemTable {
std::vector<port::RWMutex> locks_;
const SliceTransform* const prefix_extractor_;
std::unique_ptr<DynamicBloom> prefix_bloom_;
std::unique_ptr<DynamicBloom> bloom_filter_;
std::atomic<FlushStateEnum> flush_state_;

@ -272,6 +272,15 @@ struct AdvancedColumnFamilyOptions {
// Dynamically changeable through SetOptions() API
double memtable_prefix_bloom_size_ratio = 0.0;
// Enable whole key bloom filter in memtable. Note this will only take effect
// if memtable_prefix_bloom_size_ratio is not 0. Enabling whole key filtering
// can potentially reduce CPU usage for point-look-ups.
//
// Default: false (disable)
//
// Dynamically changeable through SetOptions() API
bool memtable_whole_key_filtering = false;
// Page size for huge page for the arena used by the memtable. If <=0, it
// won't allocate from huge page but from malloc.
// Users are responsible to reserve huge pages for it to be allocated. For

@ -135,6 +135,8 @@ void MutableCFOptions::Dump(Logger* log) const {
arena_block_size);
ROCKS_LOG_INFO(log, " memtable_prefix_bloom_ratio: %f",
memtable_prefix_bloom_size_ratio);
ROCKS_LOG_INFO(log, " memtable_whole_key_filtering: %d",
memtable_whole_key_filtering);
ROCKS_LOG_INFO(log,
" memtable_huge_page_size: %" ROCKSDB_PRIszt,
memtable_huge_page_size);

@ -131,6 +131,7 @@ struct MutableCFOptions {
arena_block_size(options.arena_block_size),
memtable_prefix_bloom_size_ratio(
options.memtable_prefix_bloom_size_ratio),
memtable_whole_key_filtering(options.memtable_whole_key_filtering),
memtable_huge_page_size(options.memtable_huge_page_size),
max_successive_merges(options.max_successive_merges),
inplace_update_num_locks(options.inplace_update_num_locks),
@ -167,6 +168,7 @@ struct MutableCFOptions {
max_write_buffer_number(0),
arena_block_size(0),
memtable_prefix_bloom_size_ratio(0),
memtable_whole_key_filtering(false),
memtable_huge_page_size(0),
max_successive_merges(0),
inplace_update_num_locks(0),
@ -213,6 +215,7 @@ struct MutableCFOptions {
int max_write_buffer_number;
size_t arena_block_size;
double memtable_prefix_bloom_size_ratio;
bool memtable_whole_key_filtering;
size_t memtable_huge_page_size;
size_t max_successive_merges;
size_t inplace_update_num_locks;

@ -51,6 +51,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
inplace_callback(options.inplace_callback),
memtable_prefix_bloom_size_ratio(
options.memtable_prefix_bloom_size_ratio),
memtable_whole_key_filtering(options.memtable_whole_key_filtering),
memtable_huge_page_size(options.memtable_huge_page_size),
memtable_insert_with_hint_prefix_extractor(
options.memtable_insert_with_hint_prefix_extractor),
@ -325,6 +326,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(
log, " Options.memtable_prefix_bloom_size_ratio: %f",
memtable_prefix_bloom_size_ratio);
ROCKS_LOG_HEADER(log,
" Options.memtable_whole_key_filtering: %d",
memtable_whole_key_filtering);
ROCKS_LOG_HEADER(log, " Options.memtable_huge_page_size: %" ROCKSDB_PRIszt,
memtable_huge_page_size);

@ -142,6 +142,8 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
cf_opts.arena_block_size = mutable_cf_options.arena_block_size;
cf_opts.memtable_prefix_bloom_size_ratio =
mutable_cf_options.memtable_prefix_bloom_size_ratio;
cf_opts.memtable_whole_key_filtering =
mutable_cf_options.memtable_whole_key_filtering;
cf_opts.memtable_huge_page_size = mutable_cf_options.memtable_huge_page_size;
cf_opts.max_successive_merges = mutable_cf_options.max_successive_merges;
cf_opts.inplace_update_num_locks =
@ -1801,6 +1803,10 @@ std::unordered_map<std::string, OptionTypeInfo>
{"memtable_prefix_bloom_probes",
{0, OptionType::kUInt32T, OptionVerificationType::kDeprecated, true,
0}},
{"memtable_whole_key_filtering",
{offset_of(&ColumnFamilyOptions::memtable_whole_key_filtering),
OptionType::kBoolean, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, memtable_whole_key_filtering)}},
{"min_partial_merge_operands",
{0, OptionType::kUInt32T, OptionVerificationType::kDeprecated, true,
0}},

@ -435,6 +435,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"max_write_buffer_number_to_maintain=84;"
"merge_operator=aabcxehazrMergeOperator;"
"memtable_prefix_bloom_size_ratio=0.4642;"
"memtable_whole_key_filtering=true;"
"memtable_insert_with_hint_prefix_extractor=rocksdb.CappedPrefix.13;"
"paranoid_file_checks=true;"
"force_consistency_checks=true;"

@ -90,6 +90,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"compaction_measure_io_stats", "false"},
{"inplace_update_num_locks", "25"},
{"memtable_prefix_bloom_size_ratio", "0.26"},
{"memtable_whole_key_filtering", "true"},
{"memtable_huge_page_size", "28"},
{"bloom_locality", "29"},
{"max_successive_merges", "30"},
@ -195,6 +196,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.inplace_update_support, true);
ASSERT_EQ(new_cf_opt.inplace_update_num_locks, 25U);
ASSERT_EQ(new_cf_opt.memtable_prefix_bloom_size_ratio, 0.26);
ASSERT_EQ(new_cf_opt.memtable_whole_key_filtering, true);
ASSERT_EQ(new_cf_opt.memtable_huge_page_size, 28U);
ASSERT_EQ(new_cf_opt.bloom_locality, 29U);
ASSERT_EQ(new_cf_opt.max_successive_merges, 30U);

@ -354,9 +354,8 @@ struct BlockBasedTableBuilder::Rep {
compression_dict(),
compression_ctx(_compression_type),
verify_dict(),
state((_compression_opts.max_dict_bytes > 0)
? State::kBuffered
: State::kUnbuffered),
state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
: State::kUnbuffered),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align),
compressed_cache_key_prefix_size(0),

@ -514,6 +514,8 @@ DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
DEFINE_double(memtable_bloom_size_ratio, 0,
"Ratio of memtable size used for bloom filter. 0 means no bloom "
"filter.");
DEFINE_bool(memtable_whole_key_filtering, false,
"Try to use whole key bloom filter in memtables.");
DEFINE_bool(memtable_use_huge_page, false,
"Try to use huge page in memtables.");
@ -3247,6 +3249,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
}
options.memtable_huge_page_size = FLAGS_memtable_use_huge_page ? 2048 : 0;
options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_bloom_size_ratio;
options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
if (FLAGS_memtable_insert_with_hint_prefix_size > 0) {
options.memtable_insert_with_hint_prefix_extractor.reset(
NewCappedPrefixTransform(

@ -248,6 +248,7 @@ const std::string options_file_content = R"OPTIONS_FILE(
verify_checksums_in_compaction=true
merge_operator=nullptr
memtable_prefix_bloom_bits=0
memtable_whole_key_filtering=true
paranoid_file_checks=false
inplace_update_num_locks=10000
optimize_filters_for_hits=false

@ -210,6 +210,10 @@ DEFINE_double(memtable_prefix_bloom_size_ratio,
"creates prefix blooms for memtables, each with size "
"`write_buffer_size * memtable_prefix_bloom_size_ratio`.");
DEFINE_bool(memtable_whole_key_filtering,
rocksdb::Options().memtable_whole_key_filtering,
"Enable whole key filtering in memtables.");
DEFINE_int32(open_files, rocksdb::Options().max_open_files,
"Maximum number of files to keep open at the same time "
"(use default if == 0)");
@ -2583,6 +2587,8 @@ class StressTest {
FLAGS_max_write_buffer_number_to_maintain;
options_.memtable_prefix_bloom_size_ratio =
FLAGS_memtable_prefix_bloom_size_ratio;
options_.memtable_whole_key_filtering =
FLAGS_memtable_whole_key_filtering;
options_.max_background_compactions = FLAGS_max_background_compactions;
options_.max_background_flushes = FLAGS_max_background_flushes;
options_.compaction_style =

@ -306,6 +306,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, Random* rnd) {
cf_opt->purge_redundant_kvs_while_flush = rnd->Uniform(2);
cf_opt->force_consistency_checks = rnd->Uniform(2);
cf_opt->compaction_options_fifo.allow_compaction = rnd->Uniform(2);
cf_opt->memtable_whole_key_filtering = rnd->Uniform(2);
// double options
cf_opt->hard_rate_limit = static_cast<double>(rnd->Uniform(10000)) / 13;

Loading…
Cancel
Save