From 8b0097b49bc64b25f40c8c25a07f48c049860384 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Wed, 22 Mar 2017 09:11:23 -0700 Subject: [PATCH] Readers for partition filter Summary: This is the last split of this pull request: https://github.com/facebook/rocksdb/pull/1891 which includes the reader part as well as the tests. Closes https://github.com/facebook/rocksdb/pull/1961 Differential Revision: D4672216 Pulled By: maysamyabandeh fbshipit-source-id: 6a2b829 --- Makefile | 4 + db/db_bloom_filter_test.cc | 512 +++++++++++++++---------- db/db_test2.cc | 12 +- db/db_test_util.cc | 12 + db/db_test_util.h | 6 + table/block_based_filter_block.cc | 10 +- table/block_based_filter_block.h | 12 +- table/block_based_table_reader.cc | 297 ++++++++++---- table/block_based_table_reader.h | 63 ++- table/filter_block.h | 26 +- table/full_filter_block.cc | 9 +- table/full_filter_block.h | 12 +- table/partitioned_filter_block.cc | 150 ++++++++ table/partitioned_filter_block.h | 41 +- table/partitioned_filter_block_test.cc | 242 ++++++++++++ 15 files changed, 1078 insertions(+), 330 deletions(-) create mode 100644 table/partitioned_filter_block_test.cc diff --git a/Makefile b/Makefile index a0fa4028f..c798159a0 100644 --- a/Makefile +++ b/Makefile @@ -354,6 +354,7 @@ TESTS = \ file_reader_writer_test \ block_based_filter_block_test \ full_filter_block_test \ + partitioned_filter_block_test \ hash_table_test \ histogram_test \ log_test \ @@ -1158,6 +1159,9 @@ block_based_filter_block_test: table/block_based_filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) full_filter_block_test: table/full_filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +partitioned_filter_block_test: table/partitioned_filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/db_bloom_filter_test.cc b/db/db_bloom_filter_test.cc index 2cc14dba8..11e1b0646 100644 --- a/db/db_bloom_filter_test.cc +++ b/db/db_bloom_filter_test.cc @@ -20,17 +20,23 @@ class DBBloomFilterTest : public DBTestBase { DBBloomFilterTest() : DBTestBase("/db_bloom_filter_test") {} }; -class DBBloomFilterTestWithParam : public DBTestBase, - public testing::WithParamInterface<bool> { +class DBBloomFilterTestWithParam + : public DBTestBase, + public testing::WithParamInterface<std::tuple<bool, bool>> { + // public testing::WithParamInterface<bool> { protected: bool use_block_based_filter_; + bool partition_filters_; public: DBBloomFilterTestWithParam() : DBTestBase("/db_bloom_filter_tests") {} ~DBBloomFilterTestWithParam() {} - void SetUp() override { use_block_based_filter_ = GetParam(); } + void SetUp() override { + use_block_based_filter_ = std::get<0>(GetParam()); + partition_filters_ = std::get<1>(GetParam()); + } }; // KeyMayExist can lead to a few false positives, but not false negatives.
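For context, a minimal sketch (not part of this patch) of how a user enables the partitioned filters exercised by the tests below; every option name is taken from the hunks in this PR, and partitioned filters require a full (non-block-based) filter plus the two-level partitioned index:

  // Sketch only; option names as used throughout this PR.
  BlockBasedTableOptions bbto;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false /* full filter */));
  bbto.partition_filters = true;  // depends on the partitioned index
  bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  bbto.index_per_partition = 2;   // tiny partitions, as the tests use
  Options options;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));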
@@ -43,7 +49,17 @@ TEST_P(DBBloomFilterTestWithParam, KeyMayExist) { anon::OptionsOverride options_override; options_override.filter_policy.reset( NewBloomFilterPolicy(20, use_block_based_filter_)); + options_override.partition_filters = partition_filters_; + options_override.index_per_partition = 2; Options options = CurrentOptions(options_override); + if (partition_filters_ && + static_cast<BlockBasedTableOptions*>( + options.table_factory->GetOptions()) + ->index_type != BlockBasedTableOptions::kTwoLevelIndexSearch) { + // In the current implementation partitioned filters depend on partitioned + // indexes + continue; + } options.statistics = rocksdb::CreateDBStatistics(); CreateAndReopenWithCF({"pikachu"}, options); @@ -102,192 +118,204 @@ TEST_P(DBBloomFilterTestWithParam, KeyMayExist) { } TEST_F(DBBloomFilterTest, GetFilterByPrefixBloom) { - Options options = last_options_; - options.prefix_extractor.reset(NewFixedPrefixTransform(8)); - options.statistics = rocksdb::CreateDBStatistics(); - BlockBasedTableOptions bbto; - bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); - bbto.whole_key_filtering = false; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - DestroyAndReopen(options); + for (bool partition_filters : {true, false}) { + Options options = last_options_; + options.prefix_extractor.reset(NewFixedPrefixTransform(8)); + options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions bbto; + bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); + if (partition_filters) { + bbto.partition_filters = true; + bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + } + bbto.whole_key_filtering = false; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + DestroyAndReopen(options); - WriteOptions wo; - ReadOptions ro; - FlushOptions fo; - fo.wait = true; - std::string value; + WriteOptions wo; + ReadOptions ro; + FlushOptions fo; + fo.wait = true; + std::string value; - ASSERT_OK(dbfull()->Put(wo, "barbarbar", "foo")); - ASSERT_OK(dbfull()->Put(wo, "barbarbar2", "foo2")); - ASSERT_OK(dbfull()->Put(wo, "foofoofoo", "bar")); + ASSERT_OK(dbfull()->Put(wo, "barbarbar", "foo")); + ASSERT_OK(dbfull()->Put(wo, "barbarbar2", "foo2")); + ASSERT_OK(dbfull()->Put(wo, "foofoofoo", "bar")); - dbfull()->Flush(fo); + dbfull()->Flush(fo); - ASSERT_EQ("foo", Get("barbarbar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); - ASSERT_EQ("foo2", Get("barbarbar2")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); - ASSERT_EQ("NOT_FOUND", Get("barbarbar3")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); + ASSERT_EQ("foo", Get("barbarbar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); + ASSERT_EQ("foo2", Get("barbarbar2")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); + ASSERT_EQ("NOT_FOUND", Get("barbarbar3")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); - ASSERT_EQ("NOT_FOUND", Get("barfoofoo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + ASSERT_EQ("NOT_FOUND", Get("barfoofoo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - ASSERT_EQ("NOT_FOUND", Get("foobarbar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 2); + ASSERT_EQ("NOT_FOUND", Get("foobarbar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 2); - ro.total_order_seek = true; - ASSERT_TRUE(db_->Get(ro, "foobarbar", &value).IsNotFound()); - ASSERT_EQ(TestGetTickerCount(options,
BLOOM_FILTER_USEFUL), 2); + ro.total_order_seek = true; + ASSERT_TRUE(db_->Get(ro, "foobarbar", &value).IsNotFound()); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 2); + } } TEST_F(DBBloomFilterTest, WholeKeyFilterProp) { - Options options = last_options_; - options.prefix_extractor.reset(NewFixedPrefixTransform(3)); - options.statistics = rocksdb::CreateDBStatistics(); + for (bool partition_filters : {true, false}) { + Options options = last_options_; + options.prefix_extractor.reset(NewFixedPrefixTransform(3)); + options.statistics = rocksdb::CreateDBStatistics(); - BlockBasedTableOptions bbto; - bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); - bbto.whole_key_filtering = false; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - DestroyAndReopen(options); + BlockBasedTableOptions bbto; + bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); + bbto.whole_key_filtering = false; + if (partition_filters) { + bbto.partition_filters = true; + bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + } + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + DestroyAndReopen(options); - WriteOptions wo; - ReadOptions ro; - FlushOptions fo; - fo.wait = true; - std::string value; - - ASSERT_OK(dbfull()->Put(wo, "foobar", "foo")); - // Needs insert some keys to make sure files are not filtered out by key - // ranges. - ASSERT_OK(dbfull()->Put(wo, "aaa", "")); - ASSERT_OK(dbfull()->Put(wo, "zzz", "")); - dbfull()->Flush(fo); - - Reopen(options); - ASSERT_EQ("NOT_FOUND", Get("foo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); - ASSERT_EQ("NOT_FOUND", Get("bar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - ASSERT_EQ("foo", Get("foobar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - - // Reopen with whole key filtering enabled and prefix extractor - // NULL. Bloom filter should be off for both of whole key and - // prefix bloom. - bbto.whole_key_filtering = true; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - options.prefix_extractor.reset(); - Reopen(options); - - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - ASSERT_EQ("NOT_FOUND", Get("foo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - ASSERT_EQ("NOT_FOUND", Get("bar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - ASSERT_EQ("foo", Get("foobar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - // Write DB with only full key filtering. - ASSERT_OK(dbfull()->Put(wo, "foobar", "foo")); - // Needs insert some keys to make sure files are not filtered out by key - // ranges. - ASSERT_OK(dbfull()->Put(wo, "aaa", "")); - ASSERT_OK(dbfull()->Put(wo, "zzz", "")); - db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); - - // Reopen with both of whole key off and prefix extractor enabled. - // Still no bloom filter should be used. 
- options.prefix_extractor.reset(NewFixedPrefixTransform(3)); - bbto.whole_key_filtering = false; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - Reopen(options); - - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - ASSERT_EQ("NOT_FOUND", Get("foo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - ASSERT_EQ("NOT_FOUND", Get("bar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - ASSERT_EQ("foo", Get("foobar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - - // Try to create a DB with mixed files: - ASSERT_OK(dbfull()->Put(wo, "foobar", "foo")); - // Needs insert some keys to make sure files are not filtered out by key - // ranges. - ASSERT_OK(dbfull()->Put(wo, "aaa", "")); - ASSERT_OK(dbfull()->Put(wo, "zzz", "")); - db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); - - options.prefix_extractor.reset(); - bbto.whole_key_filtering = true; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - Reopen(options); - - // Try to create a DB with mixed files. - ASSERT_OK(dbfull()->Put(wo, "barfoo", "bar")); - // In this case needs insert some keys to make sure files are - // not filtered out by key ranges. - ASSERT_OK(dbfull()->Put(wo, "aaa", "")); - ASSERT_OK(dbfull()->Put(wo, "zzz", "")); - Flush(); + WriteOptions wo; + ReadOptions ro; + FlushOptions fo; + fo.wait = true; + std::string value; - // Now we have two files: - // File 1: An older file with prefix bloom. - // File 2: A newer file with whole bloom filter. - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); - ASSERT_EQ("NOT_FOUND", Get("foo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 2); - ASSERT_EQ("NOT_FOUND", Get("bar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 3); - ASSERT_EQ("foo", Get("foobar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 4); - ASSERT_EQ("bar", Get("barfoo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 4); - - // Reopen with the same setting: only whole key is used - Reopen(options); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 4); - ASSERT_EQ("NOT_FOUND", Get("foo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 5); - ASSERT_EQ("NOT_FOUND", Get("bar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 6); - ASSERT_EQ("foo", Get("foobar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 7); - ASSERT_EQ("bar", Get("barfoo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 7); - - // Restart with both filters are allowed - options.prefix_extractor.reset(NewFixedPrefixTransform(3)); - bbto.whole_key_filtering = true; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - Reopen(options); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 7); - // File 1 will has it filtered out. - // File 2 will not, as prefix `foo` exists in the file. - ASSERT_EQ("NOT_FOUND", Get("foo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 8); - ASSERT_EQ("NOT_FOUND", Get("bar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 10); - ASSERT_EQ("foo", Get("foobar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 11); - ASSERT_EQ("bar", Get("barfoo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 11); - - // Restart with only prefix bloom is allowed. 
- options.prefix_extractor.reset(NewFixedPrefixTransform(3)); - bbto.whole_key_filtering = false; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - Reopen(options); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 11); - ASSERT_EQ("NOT_FOUND", Get("foo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 11); - ASSERT_EQ("NOT_FOUND", Get("bar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 12); - ASSERT_EQ("foo", Get("foobar")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 12); - ASSERT_EQ("bar", Get("barfoo")); - ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 12); + ASSERT_OK(dbfull()->Put(wo, "foobar", "foo")); + // Need to insert some keys to make sure files are not filtered out by key + // ranges. + ASSERT_OK(dbfull()->Put(wo, "aaa", "")); + ASSERT_OK(dbfull()->Put(wo, "zzz", "")); + dbfull()->Flush(fo); + + Reopen(options); + ASSERT_EQ("NOT_FOUND", Get("foo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); + ASSERT_EQ("NOT_FOUND", Get("bar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + ASSERT_EQ("foo", Get("foobar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + + // Reopen with whole key filtering enabled and prefix extractor + // NULL. Bloom filter should be off for both of whole key and + // prefix bloom. + bbto.whole_key_filtering = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + options.prefix_extractor.reset(); + Reopen(options); + + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + ASSERT_EQ("NOT_FOUND", Get("foo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + ASSERT_EQ("NOT_FOUND", Get("bar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + ASSERT_EQ("foo", Get("foobar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + // Write DB with only full key filtering. + ASSERT_OK(dbfull()->Put(wo, "foobar", "foo")); + // Need to insert some keys to make sure files are not filtered out by key + // ranges. + ASSERT_OK(dbfull()->Put(wo, "aaa", "")); + ASSERT_OK(dbfull()->Put(wo, "zzz", "")); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + // Reopen with whole key filtering off and prefix extractor enabled. + // Still no bloom filter should be used. + options.prefix_extractor.reset(NewFixedPrefixTransform(3)); + bbto.whole_key_filtering = false; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Reopen(options); + + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + ASSERT_EQ("NOT_FOUND", Get("foo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + ASSERT_EQ("NOT_FOUND", Get("bar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + ASSERT_EQ("foo", Get("foobar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + + // Try to create a DB with mixed files: + ASSERT_OK(dbfull()->Put(wo, "foobar", "foo")); + // Need to insert some keys to make sure files are not filtered out by key + // ranges. + ASSERT_OK(dbfull()->Put(wo, "aaa", "")); + ASSERT_OK(dbfull()->Put(wo, "zzz", "")); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + options.prefix_extractor.reset(); + bbto.whole_key_filtering = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Reopen(options); + + // Try to create a DB with mixed files.
+ ASSERT_OK(dbfull()->Put(wo, "barfoo", "bar")); + // In this case we need to insert some keys to make sure files are + // not filtered out by key ranges. + ASSERT_OK(dbfull()->Put(wo, "aaa", "")); + ASSERT_OK(dbfull()->Put(wo, "zzz", "")); + Flush(); + + // Now we have two files: + // File 1: An older file with prefix bloom. + // File 2: A newer file with whole bloom filter. + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 1); + ASSERT_EQ("NOT_FOUND", Get("foo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 2); + ASSERT_EQ("NOT_FOUND", Get("bar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 3); + ASSERT_EQ("foo", Get("foobar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 4); + ASSERT_EQ("bar", Get("barfoo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 4); + + // Reopen with the same setting: only whole key is used + Reopen(options); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 4); + ASSERT_EQ("NOT_FOUND", Get("foo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 5); + ASSERT_EQ("NOT_FOUND", Get("bar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 6); + ASSERT_EQ("foo", Get("foobar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 7); + ASSERT_EQ("bar", Get("barfoo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 7); + + // Restart with both filters allowed + options.prefix_extractor.reset(NewFixedPrefixTransform(3)); + bbto.whole_key_filtering = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Reopen(options); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 7); + // File 1 will have it filtered out. + // File 2 will not, as prefix `foo` exists in the file. + ASSERT_EQ("NOT_FOUND", Get("foo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 8); + ASSERT_EQ("NOT_FOUND", Get("bar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 10); + ASSERT_EQ("foo", Get("foobar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 11); + ASSERT_EQ("bar", Get("barfoo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 11); + + // Restart with only prefix bloom allowed.
+ options.prefix_extractor.reset(NewFixedPrefixTransform(3)); + bbto.whole_key_filtering = false; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Reopen(options); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 11); + ASSERT_EQ("NOT_FOUND", Get("foo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 11); + ASSERT_EQ("NOT_FOUND", Get("bar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 12); + ASSERT_EQ("foo", Get("foobar")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 12); + ASSERT_EQ("bar", Get("barfoo")); + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 12); + } } TEST_P(DBBloomFilterTestWithParam, BloomFilter) { @@ -301,6 +329,12 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) { table_options.no_block_cache = true; table_options.filter_policy.reset( NewBloomFilterPolicy(10, use_block_based_filter_)); + table_options.partition_filters = partition_filters_; + if (partition_filters_) { + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + } + table_options.index_per_partition = 2; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); CreateAndReopenWithCF({"pikachu"}, options); @@ -327,7 +361,13 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) { int reads = env_->random_read_counter_.Read(); fprintf(stderr, "%d present => %d reads\n", N, reads); ASSERT_GE(reads, N); - ASSERT_LE(reads, N + 2 * N / 100); + if (partition_filters_) { + // Without block cache, we read an extra partition filter per level for + // each read, plus a partition index per read + ASSERT_LE(reads, 4 * N + 2 * N / 100); + } else { + ASSERT_LE(reads, N + 2 * N / 100); + } // Lookup present keys. Should rarely read from either sstable. env_->random_read_counter_.Reset(); @@ -336,7 +376,13 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) { } reads = env_->random_read_counter_.Read(); fprintf(stderr, "%d missing => %d reads\n", N, reads); - ASSERT_LE(reads, 3 * N / 100); + if (partition_filters_) { + // With partitioned filters we read one extra filter per level for each + // missed read.
+ ASSERT_LE(reads, 2 * N + 3 * N / 100); + } else { + ASSERT_LE(reads, 3 * N / 100); + } env_->delay_sstable_sync_.store(false, std::memory_order_release); Close(); @@ -344,7 +390,9 @@ } INSTANTIATE_TEST_CASE_P(DBBloomFilterTestWithParam, DBBloomFilterTestWithParam, - ::testing::Bool()); + ::testing::Values(std::make_tuple(true, false), + std::make_tuple(false, true), + std::make_tuple(false, false))); TEST_F(DBBloomFilterTest, BloomFilterRate) { while (ChangeFilterOptions()) { @@ -401,28 +449,13 @@ TEST_F(DBBloomFilterTest, BloomFilterCompatibility) { ASSERT_EQ(Key(i), Get(1, Key(i))); } ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); -} -TEST_F(DBBloomFilterTest, BloomFilterReverseCompatibility) { - Options options = CurrentOptions(); - options.statistics = rocksdb::CreateDBStatistics(); - BlockBasedTableOptions table_options; + // Check db with partitioned full filter + table_options.partition_filters = true; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; table_options.filter_policy.reset(NewBloomFilterPolicy(10, false)); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); - - // Create with full filter - CreateAndReopenWithCF({"pikachu"}, options); - - const int maxKey = 10000; - for (int i = 0; i < maxKey; i++) { - ASSERT_OK(Put(1, Key(i), Key(i))); - } - ASSERT_OK(Put(1, Key(maxKey + 55555), Key(maxKey + 55555))); - Flush(1); - - // Check db with block_based filter - table_options.filter_policy.reset(NewBloomFilterPolicy(10, true)); - options.table_factory.reset(NewBlockBasedTableFactory(table_options)); ReopenWithColumnFamilies({"default", "pikachu"}, options); // Check if they can be found @@ -432,6 +465,43 @@ ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); } +TEST_F(DBBloomFilterTest, BloomFilterReverseCompatibility) { + for (bool partition_filters : {true, false}) { + Options options = CurrentOptions(); + options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions table_options; + if (partition_filters) { + table_options.partition_filters = true; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + } + table_options.filter_policy.reset(NewBloomFilterPolicy(10, false)); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + DestroyAndReopen(options); + + // Create with full filter + CreateAndReopenWithCF({"pikachu"}, options); + + const int maxKey = 10000; + for (int i = 0; i < maxKey; i++) { + ASSERT_OK(Put(1, Key(i), Key(i))); + } + ASSERT_OK(Put(1, Key(maxKey + 55555), Key(maxKey + 55555))); + Flush(1); + + // Check db with block_based filter + table_options.filter_policy.reset(NewBloomFilterPolicy(10, true)); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + + // Check if they can be found + for (int i = 0; i < maxKey; i++) { + ASSERT_EQ(Key(i), Get(1, Key(i))); + } + ASSERT_EQ(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0); + } +} + namespace { // A wrapped bloom over default FilterPolicy class WrappedBloom : public FilterPolicy { @@ -586,11 +656,12 @@ TEST_F(DBBloomFilterTest, PrefixExtractorBlockFilter) { #ifndef ROCKSDB_LITE class BloomStatsTestWithParam : public DBBloomFilterTest, - public testing::WithParamInterface<std::tuple<bool, bool>> { + public testing::WithParamInterface<std::tuple<bool, bool, bool>> { public: BloomStatsTestWithParam() {
use_block_table_ = std::get<0>(GetParam()); use_block_based_builder_ = std::get<1>(GetParam()); + partition_filters_ = std::get<2>(GetParam()); options_.create_if_missing = true; options_.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(4)); @@ -599,10 +670,17 @@ class BloomStatsTestWithParam if (use_block_table_) { BlockBasedTableOptions table_options; table_options.hash_index_allow_collision = false; + if (partition_filters_) { + assert(!use_block_based_builder_); + table_options.partition_filters = partition_filters_; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + } table_options.filter_policy.reset( NewBloomFilterPolicy(10, use_block_based_builder_)); options_.table_factory.reset(NewBlockBasedTableFactory(table_options)); } else { + assert(!partition_filters_); // not supported in plain table PlainTableOptions table_options; options_.table_factory.reset(NewPlainTableFactory(table_options)); } @@ -623,6 +701,7 @@ class BloomStatsTestWithParam bool use_block_table_; bool use_block_based_builder_; + bool partition_filters_; Options options_; }; @@ -717,25 +796,44 @@ TEST_P(BloomStatsTestWithParam, BloomStatsTestWithIter) { ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ(value1, iter->value().ToString()); - ASSERT_EQ(1, perf_context.bloom_sst_hit_count); + if (partition_filters_) { + ASSERT_EQ(0, perf_context.bloom_sst_hit_count); // no_io + ASSERT_EQ(0, perf_context.bloom_sst_miss_count); // no_io + } else { + ASSERT_EQ(1, perf_context.bloom_sst_hit_count); + } iter->Seek(key3); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); ASSERT_EQ(value3, iter->value().ToString()); - ASSERT_EQ(2, perf_context.bloom_sst_hit_count); + if (partition_filters_) { + ASSERT_EQ(0, perf_context.bloom_sst_hit_count); // no_io + ASSERT_EQ(0, perf_context.bloom_sst_miss_count); // no_io + } else { + ASSERT_EQ(2, perf_context.bloom_sst_hit_count); + } iter->Seek(key2); ASSERT_OK(iter->status()); - ASSERT_TRUE(!iter->Valid()); - ASSERT_EQ(1, perf_context.bloom_sst_miss_count); - ASSERT_EQ(2, perf_context.bloom_sst_hit_count); + if (partition_filters_) { + // iter is still valid since filter did not reject the key2 + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(0, perf_context.bloom_sst_hit_count); // no_io + ASSERT_EQ(0, perf_context.bloom_sst_miss_count); // no_io + } else { + ASSERT_TRUE(!iter->Valid()); + ASSERT_EQ(1, perf_context.bloom_sst_miss_count); + ASSERT_EQ(2, perf_context.bloom_sst_hit_count); + } } INSTANTIATE_TEST_CASE_P(BloomStatsTestWithParam, BloomStatsTestWithParam, - ::testing::Values(std::make_tuple(true, true), - std::make_tuple(true, false), - std::make_tuple(false, false))); + ::testing::Values(std::make_tuple(true, true, false), + std::make_tuple(true, false, false), + std::make_tuple(true, false, true), + std::make_tuple(false, false, + false))); namespace { void PrefixScanInit(DBBloomFilterTest* dbtest) { diff --git a/db/db_test2.cc b/db/db_test2.cc index 109a550ac..bd22ba228 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -1221,7 +1221,7 @@ class PinL0IndexAndFilterBlocksTest : public DBTestBase, PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {} virtual void SetUp() override { infinite_max_files_ = GetParam(); } - void CreateTwoLevels(Options* options) { + void CreateTwoLevels(Options* options, bool close_afterwards) { if (infinite_max_files_) { options->max_open_files = -1; } @@ -1249,6 +1249,9 @@ class PinL0IndexAndFilterBlocksTest : public DBTestBase, Put(1, "z2", "end2"); 
ASSERT_OK(Flush(1)); + if (close_afterwards) { + Close(); // This ensures that there is no ref to block cache entries + } table_options.block_cache->EraseUnRefEntries(); } @@ -1303,7 +1306,7 @@ TEST_P(PinL0IndexAndFilterBlocksTest, TEST_P(PinL0IndexAndFilterBlocksTest, MultiLevelIndexAndFilterBlocksCachedWithPinning) { Options options = CurrentOptions(); - PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options); + PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, false); // get base cache values uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS); uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT); @@ -1332,7 +1335,10 @@ TEST_P(PinL0IndexAndFilterBlocksTest, TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) { Options options = CurrentOptions(); - PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options); + // This ensures that db does not ref anything in the block cache, so + // EraseUnRefEntries can clear them up. + bool close_afterwards = true; + PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options, close_afterwards); // Get base cache values uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS); diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 4f476d76a..a41a76f24 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -230,6 +230,8 @@ bool DBTestBase::ChangeFilterOptions() { option_config_ = kFilter; } else if (option_config_ == kFilter) { option_config_ = kFullFilterWithNewTableReaderForCompactions; + } else if (option_config_ == kFullFilterWithNewTableReaderForCompactions) { + option_config_ = kPartitionedFilterWithNewTableReaderForCompactions; } else { return false; } @@ -325,6 +327,14 @@ Options DBTestBase::CurrentOptions( options.new_table_reader_for_compaction_inputs = true; options.compaction_readahead_size = 10 * 1024 * 1024; break; + case kPartitionedFilterWithNewTableReaderForCompactions: + table_options.filter_policy.reset(NewBloomFilterPolicy(10, false)); + table_options.partition_filters = true; + table_options.index_type = + BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + options.new_table_reader_for_compaction_inputs = true; + options.compaction_readahead_size = 10 * 1024 * 1024; + break; case kUncompressed: options.compression = kNoCompression; break; @@ -426,6 +436,8 @@ Options DBTestBase::CurrentOptions( if (options_override.filter_policy) { table_options.filter_policy = options_override.filter_policy; + table_options.partition_filters = options_override.partition_filters; + table_options.index_per_partition = options_override.index_per_partition; } if (set_block_based_table_factory) { options.table_factory.reset(NewBlockBasedTableFactory(table_options)); diff --git a/db/db_test_util.h b/db/db_test_util.h index 050b8a97a..4fb26d232 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -109,6 +109,11 @@ class AtomicCounter { struct OptionsOverride { std::shared_ptr<const FilterPolicy> filter_policy = nullptr; + // These will be used only if filter_policy is set + bool partition_filters = false; + uint64_t index_per_partition = 1024; + BlockBasedTableOptions::IndexType index_type = + BlockBasedTableOptions::IndexType::kBinarySearch; // Used as a bit mask of individual enums in which to skip an XF test point int skip_policy = 0; @@ -617,6 +622,7 @@ class DBTestBase : public testing::Test { kUniversalSubcompactions = 32, kBlockBasedTableWithIndexRestartInterval = 33, kBlockBasedTableWithPartitionedIndex = 34, + kPartitionedFilterWithNewTableReaderForCompactions = 35, }; int
option_config_; diff --git a/table/block_based_filter_block.cc b/table/block_based_filter_block.cc index ab2b8085e..343ac9aac 100644 --- a/table/block_based_filter_block.cc +++ b/table/block_based_filter_block.cc @@ -184,8 +184,9 @@ BlockBasedFilterBlockReader::BlockBasedFilterBlockReader( num_ = (n - 5 - last_word) / 4; } -bool BlockBasedFilterBlockReader::KeyMayMatch(const Slice& key, - uint64_t block_offset) { +bool BlockBasedFilterBlockReader::KeyMayMatch( + const Slice& key, uint64_t block_offset, const bool no_io, + const Slice* const const_ikey_ptr) { assert(block_offset != kNotValid); if (!whole_key_filtering_) { return true; @@ -193,8 +194,9 @@ bool BlockBasedFilterBlockReader::KeyMayMatch(const Slice& key, return MayMatch(key, block_offset); } -bool BlockBasedFilterBlockReader::PrefixMayMatch(const Slice& prefix, - uint64_t block_offset) { +bool BlockBasedFilterBlockReader::PrefixMayMatch( + const Slice& prefix, uint64_t block_offset, const bool no_io, + const Slice* const const_ikey_ptr) { assert(block_offset != kNotValid); if (!prefix_extractor_) { return true; diff --git a/table/block_based_filter_block.h b/table/block_based_filter_block.h index 666831970..a5b7dfdf8 100644 --- a/table/block_based_filter_block.h +++ b/table/block_based_filter_block.h @@ -81,10 +81,14 @@ class BlockBasedFilterBlockReader : public FilterBlockReader { bool whole_key_filtering, BlockContents&& contents, Statistics* statistics); virtual bool IsBlockBased() override { return true; } - virtual bool KeyMayMatch(const Slice& key, - uint64_t block_offset = kNotValid) override; - virtual bool PrefixMayMatch(const Slice& prefix, - uint64_t block_offset = kNotValid) override; + virtual bool KeyMayMatch( + const Slice& key, uint64_t block_offset = kNotValid, + const bool no_io = false, + const Slice* const const_ikey_ptr = nullptr) override; + virtual bool PrefixMayMatch( + const Slice& prefix, uint64_t block_offset = kNotValid, + const bool no_io = false, + const Slice* const const_ikey_ptr = nullptr) override; virtual size_t ApproximateMemoryUsage() const override; // convert this object to a human readable form diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index b8b09d770..eebe91d6a 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -37,6 +37,7 @@ #include "table/get_context.h" #include "table/internal_iterator.h" #include "table/meta_blocks.h" +#include "table/partitioned_filter_block.h" #include "table/persistent_cache_helper.h" #include "table/sst_file_writer_collectors.h" #include "table/two_level_iterator.h" @@ -149,7 +150,7 @@ Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, } // namespace // Index that allows binary search lookup in a two-level index structure. -class PartitionIndexReader : public IndexReader { +class PartitionIndexReader : public IndexReader, public Cleanable { public: // Read the partition index from the file and create an instance for // `PartitionIndexReader`. 
@@ -159,7 +160,8 @@ class PartitionIndexReader : public IndexReader { const Footer& footer, const BlockHandle& index_handle, const ImmutableCFOptions& ioptions, const Comparator* comparator, IndexReader** index_reader, - const PersistentCacheOptions& cache_options) { + const PersistentCacheOptions& cache_options, + const int level) { std::unique_ptr<Block> index_block; auto s = ReadBlockFromFile( file, footer, ReadOptions(), index_handle, &index_block, ioptions, @@ -167,8 +169,9 @@ class PartitionIndexReader : public IndexReader { kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); if (s.ok()) { - *index_reader = new PartitionIndexReader( - table, comparator, std::move(index_block), ioptions.statistics); + *index_reader = + new PartitionIndexReader(table, comparator, std::move(index_block), + ioptions.statistics, level); } return s; @@ -177,10 +180,25 @@ class PartitionIndexReader : public IndexReader { // return a two-level iterator: first level is on the partition index virtual InternalIterator* NewIterator(BlockIter* iter = nullptr, bool dont_care = true) override { + // Filters are already checked before seeking the index + const bool skip_filters = true; + const bool is_index = true; + Cleanable* block_cache_cleaner = nullptr; + const bool pin_cached_indexes = + level_ == 0 && + table_->rep_->table_options.pin_l0_filter_and_index_blocks_in_cache; + if (pin_cached_indexes) { + // Keep partition indexes in the cache as long as the partition index + // reader object is alive + block_cache_cleaner = this; + } return NewTwoLevelIterator( - new BlockBasedTable::BlockEntryIteratorState(table_, ReadOptions(), - false), - index_block_->NewIterator(comparator_, iter, true)); + new BlockBasedTable::BlockEntryIteratorState( + table_, ReadOptions(), skip_filters, is_index, block_cache_cleaner), + index_block_->NewIterator(comparator_, nullptr, true)); + // TODO(myabandeh): Update TwoLevelIterator to be able to make use of an + // on-stack BlockIter while the state is on heap } virtual size_t size() const override { return index_block_->size(); } @@ -195,14 +213,17 @@ class PartitionIndexReader : public IndexReader { private: PartitionIndexReader(BlockBasedTable* table, const Comparator* comparator, - std::unique_ptr<Block>&& index_block, Statistics* stats) + std::unique_ptr<Block>&& index_block, Statistics* stats, + const int level) : IndexReader(comparator, stats), table_(table), - index_block_(std::move(index_block)) { + index_block_(std::move(index_block)), + level_(level) { assert(index_block_ != nullptr); } BlockBasedTable* table_; std::unique_ptr<Block> index_block_; + int level_; }; // Index that allows binary search lookup for the first key of each block.
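The pinning in PartitionIndexReader::NewIterator above relies on cleanup delegation: each secondary iterator hands its block-cache release callbacks to the reader, which is itself a Cleanable, so cached index partitions stay referenced until the reader is destroyed. A self-contained sketch of that mechanism in plain C++ (RocksDB's actual Cleanable interface differs in detail):

  #include <functional>
  #include <utility>
  #include <vector>

  class Cleanable {
   public:
    ~Cleanable() {
      // Run any cleanups still owned by this object.
      for (auto& fn : cleanups_) fn();
    }
    void RegisterCleanup(std::function<void()> fn) {
      cleanups_.push_back(std::move(fn));
    }
    // Move pending cleanups to `other`; they now run when `other` dies,
    // which is how partition blocks outlive their short-lived iterators.
    void DelegateCleanupsTo(Cleanable* other) {
      for (auto& fn : cleanups_) other->RegisterCleanup(std::move(fn));
      cleanups_.clear();
    }
   private:
    std::vector<std::function<void()>> cleanups_;
  };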
@@ -555,14 +576,28 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // Find filter handle and filter type if (rep->filter_policy) { - for (auto prefix : {kFullFilterBlockPrefix, kFilterBlockPrefix}) { + for (auto filter_type : + {Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter, + Rep::FilterType::kBlockFilter}) { + std::string prefix; + switch (filter_type) { + case Rep::FilterType::kFullFilter: + prefix = kFullFilterBlockPrefix; + break; + case Rep::FilterType::kPartitionedFilter: + prefix = kPartitionedFilterBlockPrefix; + break; + case Rep::FilterType::kBlockFilter: + prefix = kFilterBlockPrefix; + break; + default: + assert(0); + } std::string filter_block_key = prefix; filter_block_key.append(rep->filter_policy->Name()); if (FindMetaBlock(meta_iter.get(), filter_block_key, &rep->filter_handle) .ok()) { - rep->filter_type = (prefix == kFullFilterBlockPrefix) - ? Rep::FilterType::kFullFilter - : Rep::FilterType::kBlockFilter; + rep->filter_type = filter_type; break; } } @@ -697,6 +732,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache && level == 0) { rep->filter_entry = filter_entry; + rep->filter_entry.value->SetLevel(level); } else { filter_entry.Release(table_options.block_cache.get()); } @@ -707,14 +743,19 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // pre-load these blocks, which will kept in member variables in Rep // and with a same life-time as this table object. IndexReader* index_reader = nullptr; - s = new_table->CreateIndexReader(&index_reader, meta_iter.get()); + s = new_table->CreateIndexReader(&index_reader, meta_iter.get(), level); if (s.ok()) { rep->index_reader.reset(index_reader); // Set filter block if (rep->filter_policy) { - rep->filter.reset(new_table->ReadFilter(rep)); + const bool is_a_filter_partition = true; + rep->filter.reset( + new_table->ReadFilter(rep->filter_handle, !is_a_filter_partition)); + if (rep->filter.get()) { + rep->filter->SetLevel(level); + } } } else { delete index_reader; @@ -798,7 +839,8 @@ Status BlockBasedTable::GetDataBlockFromCache( Cache* block_cache, Cache* block_cache_compressed, const ImmutableCFOptions& ioptions, const ReadOptions& read_options, BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version, - const Slice& compression_dict, size_t read_amp_bytes_per_bit) { + const Slice& compression_dict, size_t read_amp_bytes_per_bit, + bool is_index) { Status s; Block* compressed_block = nullptr; Cache::Handle* block_cache_compressed_handle = nullptr; @@ -806,9 +848,10 @@ Status BlockBasedTable::GetDataBlockFromCache( // Lookup uncompressed cache first if (block_cache != nullptr) { - block->cache_handle = - GetEntryFromCache(block_cache, block_cache_key, BLOCK_CACHE_DATA_MISS, - BLOCK_CACHE_DATA_HIT, statistics); + block->cache_handle = GetEntryFromCache( + block_cache, block_cache_key, + is_index ? BLOCK_CACHE_INDEX_MISS : BLOCK_CACHE_DATA_MISS, + is_index ?
BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT, statistics); if (block->cache_handle != nullptr) { block->value = reinterpret_cast<Block*>(block_cache->Value(block->cache_handle)); @@ -860,9 +903,15 @@ Status BlockBasedTable::GetDataBlockFromCache( &DeleteCachedEntry<Block>, &(block->cache_handle)); if (s.ok()) { RecordTick(statistics, BLOCK_CACHE_ADD); - RecordTick(statistics, BLOCK_CACHE_DATA_ADD); - RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, - block->value->usable_size()); + if (is_index) { + RecordTick(statistics, BLOCK_CACHE_INDEX_ADD); + RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, + block->value->usable_size()); + } else { + RecordTick(statistics, BLOCK_CACHE_DATA_ADD); + RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, + block->value->usable_size()); + } RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, block->value->usable_size()); } else { @@ -883,7 +932,8 @@ Status BlockBasedTable::PutDataBlockToCache( Cache* block_cache, Cache* block_cache_compressed, const ReadOptions& read_options, const ImmutableCFOptions& ioptions, CachableEntry<Block>* block, Block* raw_block, uint32_t format_version, - const Slice& compression_dict, size_t read_amp_bytes_per_bit) { + const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index, + Cache::Priority priority) { assert(raw_block->compression_type() == kNoCompression || block_cache_compressed != nullptr); @@ -929,15 +979,21 @@ Status BlockBasedTable::PutDataBlockToCache( // insert into uncompressed block cache assert((block->value->compression_type() == kNoCompression)); if (block_cache != nullptr && block->value->cachable()) { - s = block_cache->Insert(block_cache_key, block->value, - block->value->usable_size(), - &DeleteCachedEntry<Block>, &(block->cache_handle)); + s = block_cache->Insert( + block_cache_key, block->value, block->value->usable_size(), + &DeleteCachedEntry<Block>, &(block->cache_handle), priority); if (s.ok()) { assert(block->cache_handle != nullptr); RecordTick(statistics, BLOCK_CACHE_ADD); - RecordTick(statistics, BLOCK_CACHE_DATA_ADD); - RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, - block->value->usable_size()); + if (is_index) { + RecordTick(statistics, BLOCK_CACHE_INDEX_ADD); + RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, + block->value->usable_size()); + } else { + RecordTick(statistics, BLOCK_CACHE_DATA_ADD); + RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, + block->value->usable_size()); + } RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, block->value->usable_size()); assert(reinterpret_cast<Block*>( @@ -952,7 +1008,9 @@ return s; } -FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep) const { +FilterBlockReader* BlockBasedTable::ReadFilter( + const BlockHandle& filter_handle, const bool is_a_filter_partition) const { + auto& rep = rep_; // TODO: We might want to unify with ReadBlockFromFile() if we start // requiring checksum verification in Table::Open.
if (rep->filter_type == Rep::FilterType::kNoFilter) { @@ -960,7 +1018,7 @@ FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep) const { } BlockContents block; if (!ReadBlockContents(rep->file.get(), rep->footer, ReadOptions(), - rep->filter_handle, &block, rep->ioptions, + filter_handle, &block, rep->ioptions, false /* decompress */, Slice() /*compression dict*/, rep->persistent_cache_options) .ok()) { @@ -970,35 +1028,60 @@ FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep) const { assert(rep->filter_policy); - if (rep->filter_type == Rep::FilterType::kBlockFilter) { - return new BlockBasedFilterBlockReader( - rep->prefix_filtering ? rep->ioptions.prefix_extractor : nullptr, - rep->table_options, rep->whole_key_filtering, std::move(block), - rep->ioptions.statistics); - } else if (rep->filter_type == Rep::FilterType::kFullFilter) { - auto filter_bits_reader = - rep->filter_policy->GetFilterBitsReader(block.data); - if (filter_bits_reader != nullptr) { + auto filter_type = rep->filter_type; + if (rep->filter_type == Rep::FilterType::kPartitionedFilter && + is_a_filter_partition) { + filter_type = Rep::FilterType::kFullFilter; + } + + switch (filter_type) { + case Rep::FilterType::kPartitionedFilter: { + return new PartitionedFilterBlockReader( + rep->prefix_filtering ? rep->ioptions.prefix_extractor : nullptr, + rep->whole_key_filtering, std::move(block), nullptr, + rep->ioptions.statistics, rep->internal_comparator, this); + } + + case Rep::FilterType::kBlockFilter: + return new BlockBasedFilterBlockReader( + rep->prefix_filtering ? rep->ioptions.prefix_extractor : nullptr, + rep->table_options, rep->whole_key_filtering, std::move(block), + rep->ioptions.statistics); + + case Rep::FilterType::kFullFilter: { + auto filter_bits_reader = + rep->filter_policy->GetFilterBitsReader(block.data); + assert(filter_bits_reader != nullptr); return new FullFilterBlockReader( rep->prefix_filtering ? rep->ioptions.prefix_extractor : nullptr, rep->whole_key_filtering, std::move(block), filter_bits_reader, rep->ioptions.statistics); } - } - // filter_type is either kNoFilter (exited the function at the first if), - // kBlockFilter or kFullFilter. there is no way for the execution to come here - assert(false); - return nullptr; + default: + // filter_type is either kNoFilter (exited the function at the first if), + // or it must be covered in this switch block + assert(false); + return nullptr; + } } BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter( bool no_io) const { + const BlockHandle& filter_blk_handle = rep_->filter_handle; + const bool is_a_filter_partition = true; + return GetFilter(filter_blk_handle, !is_a_filter_partition, no_io); +} + +BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter( + const BlockHandle& filter_blk_handle, const bool is_a_filter_partition, + bool no_io) const { // If cache_index_and_filter_blocks is false, filter should be pre-populated. // We will return rep_->filter anyway. rep_->filter can be nullptr if filter // read fails at Open() time. We don't want to reload again since it will // most probably fail again.
- if (!rep_->table_options.cache_index_and_filter_blocks) { + if (!is_a_filter_partition && + !rep_->table_options.cache_index_and_filter_blocks) { return {rep_->filter.get(), nullptr /* cache handle */}; } @@ -1008,8 +1091,7 @@ BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( return {nullptr /* filter */, nullptr /* cache handle */}; } - // we have a pinned filter block - if (rep_->filter_entry.IsSet()) { + if (!is_a_filter_partition && rep_->filter_entry.IsSet()) { return rep_->filter_entry; } @@ -1018,8 +1100,7 @@ // Fetching from the cache char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, - rep_->footer.metaindex_handle(), - cache_key); + filter_blk_handle, cache_key); Statistics* statistics = rep_->ioptions.statistics; auto cache_handle = @@ -1034,7 +1115,7 @@ // Do not invoke any io. return CachableEntry<FilterBlockReader>(); } else { - filter = ReadFilter(rep_); + filter = ReadFilter(filter_blk_handle, is_a_filter_partition); if (filter != nullptr) { assert(filter->size() > 0); Status s = block_cache->Insert( @@ -1074,7 +1155,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator( PERF_TIMER_GUARD(read_index_block_nanos); - bool no_io = read_options.read_tier == kBlockCacheTier; + const bool no_io = read_options.read_tier == kBlockCacheTier; Cache* block_cache = rep_->table_options.block_cache.get(); char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; auto key = @@ -1153,29 +1234,36 @@ return iter; } +InternalIterator* BlockBasedTable::NewDataBlockIterator( + Rep* rep, const ReadOptions& ro, const Slice& index_value, + BlockIter* input_iter, bool is_index) { + BlockHandle handle; + Slice input = index_value; + // We intentionally allow extra stuff in index_value so that we + // can add more features in the future. + Status s = handle.DecodeFrom(&input); + return NewDataBlockIterator(rep, ro, handle, input_iter, is_index, s); +} + // Convert an index iterator value (i.e., an encoded BlockHandle) // into an iterator over the contents of the corresponding block. // If input_iter is null, new a iterator // If input_iter is not null, update this iter and return it InternalIterator* BlockBasedTable::NewDataBlockIterator( - Rep* rep, const ReadOptions& ro, const Slice& index_value, - BlockIter* input_iter) { + Rep* rep, const ReadOptions& ro, const BlockHandle& handle, + BlockIter* input_iter, bool is_index, Status s) { PERF_TIMER_GUARD(new_table_block_iter_nanos); const bool no_io = (ro.read_tier == kBlockCacheTier); Cache* block_cache = rep->table_options.block_cache.get(); CachableEntry<Block> block; - BlockHandle handle; - Slice input = index_value; - // We intentionally allow extra stuff in index_value so that we - // can add more features in the future. - Status s = handle.DecodeFrom(&input); Slice compression_dict; if (s.ok()) { if (rep->compression_dict_block) { compression_dict = rep->compression_dict_block->data; } - s = MaybeLoadDataBlockToCache(rep, ro, handle, compression_dict, &block); + s = MaybeLoadDataBlockToCache(rep, ro, handle, compression_dict, &block, + is_index); } // Didn't get any data from block caches.
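Note the GetFilter change above: the block-cache key is now derived from the filter partition's own BlockHandle rather than the table's metaindex handle, since each partition needs a distinct cache entry. A hedged sketch of that key construction in plain C++ (RocksDB's GetCacheKey helper differs in detail): the key is a per-file prefix plus a varint encoding of the block offset.

  #include <cstdint>
  #include <string>

  // Sketch: append a varint encoding of the partition's offset to the
  // per-file cache prefix so every filter partition gets its own key.
  std::string MakeBlockCacheKey(const std::string& file_prefix,
                                uint64_t block_offset) {
    std::string key = file_prefix;
    while (block_offset >= 128) {
      key.push_back(static_cast<char>((block_offset & 0x7f) | 0x80));
      block_offset >>= 7;
    }
    key.push_back(static_cast<char>(block_offset));
    return key;
  }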
@@ -1224,7 +1312,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( Status BlockBasedTable::MaybeLoadDataBlockToCache( Rep* rep, const ReadOptions& ro, const BlockHandle& handle, - Slice compression_dict, CachableEntry<Block>* block_entry) { + Slice compression_dict, CachableEntry<Block>* block_entry, bool is_index) { const bool no_io = (ro.read_tier == kBlockCacheTier); Cache* block_cache = rep->table_options.block_cache.get(); Cache* block_cache_compressed = @@ -1254,7 +1342,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache( s = GetDataBlockFromCache( key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro, block_entry, rep->table_options.format_version, compression_dict, - rep->table_options.read_amp_bytes_per_bit); + rep->table_options.read_amp_bytes_per_bit, is_index); if (block_entry->value == nullptr && !no_io && ro.fill_cache) { std::unique_ptr<Block> raw_block; @@ -1271,7 +1359,13 @@ s = PutDataBlockToCache( key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions, block_entry, raw_block.release(), rep->table_options.format_version, - compression_dict, rep->table_options.read_amp_bytes_per_bit); + compression_dict, rep->table_options.read_amp_bytes_per_bit, + is_index, + is_index && + rep->table_options + .cache_index_and_filter_blocks_with_high_priority + ? Cache::Priority::HIGH + : Cache::Priority::LOW); } } } @@ -1279,17 +1373,39 @@ } BlockBasedTable::BlockEntryIteratorState::BlockEntryIteratorState( - BlockBasedTable* table, const ReadOptions& read_options, bool skip_filters) + BlockBasedTable* table, const ReadOptions& read_options, bool skip_filters, + bool is_index, Cleanable* block_cache_cleaner) : TwoLevelIteratorState(table->rep_->ioptions.prefix_extractor != nullptr), table_(table), read_options_(read_options), - skip_filters_(skip_filters) {} + skip_filters_(skip_filters), + is_index_(is_index), + block_cache_cleaner_(block_cache_cleaner) {} InternalIterator* BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator( const Slice& index_value) { // Return a block iterator on the index partition - return NewDataBlockIterator(table_->rep_, read_options_, index_value); + BlockHandle handle; + Slice input = index_value; + Status s = handle.DecodeFrom(&input); + auto iter = NewDataBlockIterator(table_->rep_, read_options_, handle, nullptr, + is_index_, s); + if (block_cache_cleaner_) { + uint64_t offset = handle.offset(); + { + ReadLock rl(&cleaner_mu); + if (cleaner_set.find(offset) != cleaner_set.end()) { + // already have a reference to the block cache objects + return iter; + } + } + WriteLock wl(&cleaner_mu); + cleaner_set.insert(offset); + // Keep the data in cache until the cleaner cleans up + iter->DelegateCleanupsTo(block_cache_cleaner_); + } + return iter; } bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch( @@ -1325,25 +1441,29 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) { return true; } auto prefix = rep_->ioptions.prefix_extractor->Transform(user_key); - InternalKey internal_key_prefix(prefix, kMaxSequenceNumber, kTypeValue); - auto internal_prefix = internal_key_prefix.Encode(); bool may_match = true; Status s; - // To prevent any io operation in this method, we set `read_tier` to make - // sure we always read index or filter only when they have already been - // loaded to memory.
- ReadOptions no_io_read_options; - no_io_read_options.read_tier = kBlockCacheTier; - // First, try check with full filter - auto filter_entry = GetFilter(true /* no io */); + const bool no_io = true; + auto filter_entry = GetFilter(no_io); FilterBlockReader* filter = filter_entry.value; if (filter != nullptr) { if (!filter->IsBlockBased()) { - may_match = filter->PrefixMayMatch(prefix); + const Slice* const const_ikey_ptr = &internal_key; + may_match = + filter->PrefixMayMatch(prefix, kNotValid, no_io, const_ikey_ptr); } else { + InternalKey internal_key_prefix(prefix, kMaxSequenceNumber, kTypeValue); + auto internal_prefix = internal_key_prefix.Encode(); + + // To prevent any io operation in this method, we set `read_tier` to make + // sure we always read index or filter only when they have already been + // loaded to memory. + ReadOptions no_io_read_options; + no_io_read_options.read_tier = kBlockCacheTier; + // Then, try find it within each block unique_ptr<InternalIterator> iiter(NewIndexIterator(no_io_read_options)); iiter->Seek(internal_prefix); @@ -1438,20 +1558,23 @@ InternalIterator* BlockBasedTable::NewRangeTombstoneIterator( bool BlockBasedTable::FullFilterKeyMayMatch(const ReadOptions& read_options, FilterBlockReader* filter, - const Slice& internal_key) const { + const Slice& internal_key, + const bool no_io) const { if (filter == nullptr || filter->IsBlockBased()) { return true; } Slice user_key = ExtractUserKey(internal_key); + const Slice* const const_ikey_ptr = &internal_key; if (filter->whole_key_filtering()) { - return filter->KeyMayMatch(user_key); + return filter->KeyMayMatch(user_key, kNotValid, no_io, const_ikey_ptr); } if (!read_options.total_order_seek && rep_->ioptions.prefix_extractor && rep_->table_properties->prefix_extractor_name.compare( rep_->ioptions.prefix_extractor->Name()) == 0 && rep_->ioptions.prefix_extractor->InDomain(user_key) && !filter->PrefixMayMatch( - rep_->ioptions.prefix_extractor->Transform(user_key))) { + rep_->ioptions.prefix_extractor->Transform(user_key), kNotValid, + false, const_ikey_ptr)) { return false; } return true; @@ -1460,6 +1583,7 @@ bool BlockBasedTable::FullFilterKeyMayMatch(const ReadOptions& read_options, Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, GetContext* get_context, bool skip_filters) { Status s; + const bool no_io = read_options.read_tier == kBlockCacheTier; CachableEntry<FilterBlockReader> filter_entry; if (!skip_filters) { filter_entry = GetFilter(read_options.read_tier == kBlockCacheTier); @@ -1468,14 +1592,14 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, // First check the full filter // If full filter not useful, Then go into each block - if (!FullFilterKeyMayMatch(read_options, filter, key)) { + if (!FullFilterKeyMayMatch(read_options, filter, key, no_io)) { RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL); } else { BlockIter iiter_on_stack; auto iiter = NewIndexIterator(read_options, &iiter_on_stack); std::unique_ptr<InternalIterator> iiter_unique_ptr; if (iiter != &iiter_on_stack) { - iiter_unique_ptr = std::unique_ptr<InternalIterator>(iiter); + iiter_unique_ptr.reset(iiter); } bool done = false; @@ -1486,7 +1610,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, bool not_exist_in_filter = filter != nullptr && filter->IsBlockBased() == true && handle.DecodeFrom(&handle_value).ok() && - !filter->KeyMayMatch(ExtractUserKey(key), handle.offset()); + !filter->KeyMayMatch(ExtractUserKey(key), handle.offset(), no_io); if (not_exist_in_filter) { // Not found @@ -1525,6
+1649,10 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, } s = biter.status(); } + if (done) { + // Avoid the extra Next which is expensive in two-level indexes + break; + } } if (s.ok()) { s = iiter->status(); @@ -1631,7 +1759,8 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, // 4. internal_comparator // 5. index_type Status BlockBasedTable::CreateIndexReader( - IndexReader** index_reader, InternalIterator* preloaded_meta_index_iter) { + IndexReader** index_reader, InternalIterator* preloaded_meta_index_iter, + int level) { // Some old version of block-based tables don't have index type present in // table properties. If that's the case we can safely use the kBinarySearch. auto index_type_on_file = BlockBasedTableOptions::kBinarySearch; @@ -1660,7 +1789,7 @@ Status BlockBasedTable::CreateIndexReader( case BlockBasedTableOptions::kTwoLevelIndexSearch: { return PartitionIndexReader::Create( this, file, footer, footer.index_handle(), rep_->ioptions, comparator, - index_reader, rep_->persistent_cache_options); + index_reader, rep_->persistent_cache_options, level); } case BlockBasedTableOptions::kBinarySearch: { return BinarySearchIndexReader::Create( diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index db6cb4911..0305f2076 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -11,6 +11,7 @@ #include <stdint.h> #include <memory> +#include <set> #include <string> #include <utility> #include <vector> @@ -193,17 +194,29 @@ class BlockBasedTable : public TableReader { class BlockEntryIteratorState; - private: + friend class PartitionIndexReader; + + protected: template <class TValue> struct CachableEntry; struct Rep; Rep* rep_; + explicit BlockBasedTable(Rep* rep) + : rep_(rep), compaction_optimized_(false) {} + + private: bool compaction_optimized_; // input_iter: if it is not null, update this one and return it as Iterator - static InternalIterator* NewDataBlockIterator( Rep* rep, const ReadOptions& ro, const Slice& index_value, BlockIter* input_iter = nullptr); + static InternalIterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro, const Slice& index_value, BlockIter* input_iter = nullptr, bool is_index = false); + static InternalIterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro, const BlockHandle& block_handle, BlockIter* input_iter = nullptr, bool is_index = false, Status s = Status()); // If block cache enabled (compressed or uncompressed), looks for the block identified by handle in (1) uncompressed cache, (2) compressed cache, and then (3) file. If found, inserts into the cache(s) that were searched // @param block_entry value is set to the uncompressed block if found. If in uncompressed block cache, also sets cache_handle to reference that block. - static Status MaybeLoadDataBlockToCache( Rep* rep, const ReadOptions& ro, const BlockHandle& handle, Slice compression_dict, CachableEntry<Block>* block_entry); + static Status MaybeLoadDataBlockToCache(Rep* rep, const ReadOptions& ro, const BlockHandle& handle, Slice compression_dict, CachableEntry<Block>* block_entry, bool is_index = false); // For the following two functions: // if `no_io == true`, we will not try to read filter/index from sst file were they not present in cache yet.
   CachableEntry<FilterBlockReader> GetFilter(bool no_io = false) const;
+  virtual CachableEntry<FilterBlockReader> GetFilter(
+      const BlockHandle& filter_blk_handle, const bool is_a_filter_partition,
+      bool no_io) const;
 
   // Get the iterator from the index reader.
   // If input_iter is not set, return new Iterator
@@ -247,7 +265,8 @@
       Cache* block_cache, Cache* block_cache_compressed,
       const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
       BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
-      const Slice& compression_dict, size_t read_amp_bytes_per_bit);
+      const Slice& compression_dict, size_t read_amp_bytes_per_bit,
+      bool is_index = false);
 
   // Put a raw block (maybe compressed) to the corresponding block caches.
   // This method will perform decompression against raw_block if needed and then
@@ -264,7 +283,8 @@
       Cache* block_cache, Cache* block_cache_compressed,
       const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
       CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
-      const Slice& compression_dict, size_t read_amp_bytes_per_bit);
+      const Slice& compression_dict, size_t read_amp_bytes_per_bit,
+      bool is_index = false, Cache::Priority pri = Cache::Priority::LOW);
 
   // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
   // after a call to Seek(key), until handle_result returns false.
@@ -280,24 +300,23 @@
   // helps avoid re-reading meta index block if caller already created one.
   Status CreateIndexReader(
       IndexReader** index_reader,
-      InternalIterator* preloaded_meta_index_iter = nullptr);
+      InternalIterator* preloaded_meta_index_iter = nullptr,
+      const int level = -1);
 
   bool FullFilterKeyMayMatch(const ReadOptions& read_options,
-                             FilterBlockReader* filter,
-                             const Slice& user_key) const;
+                             FilterBlockReader* filter, const Slice& user_key,
+                             const bool no_io) const;
 
   // Read the meta block from sst.
   static Status ReadMetaBlock(Rep* rep, std::unique_ptr<Block>* meta_block,
                               std::unique_ptr<InternalIterator>* iter);
 
   // Create the filter from the filter block.
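The level parameter introduced on CreateIndexReader above, together with the
GetLevel()/SetLevel() accessors added to FilterBlockReader below, is how the
partitioned reader can recognize level-0 tables and pin their filter
partitions when the pre-existing pin_l0_filter_and_index_blocks_in_cache
option is set (see GetFilterPartition later in this patch). A hedged
configuration sketch; the option name is real, the helper is illustrative:

#include "rocksdb/options.h"
#include "rocksdb/table.h"

// Sketch only: with this option set, filter partitions read for level-0
// files are additionally retained by the reader itself (filter_cache_ in
// PartitionedFilterBlockReader), so hot point lookups skip the block-cache
// lookup and its locking.
rocksdb::Options PinL0Filters(rocksdb::Options options) {
  rocksdb::BlockBasedTableOptions bbto;
  bbto.pin_l0_filter_and_index_blocks_in_cache = true;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbto));
  return options;
}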
-  FilterBlockReader* ReadFilter(Rep* rep) const;
+  FilterBlockReader* ReadFilter(const BlockHandle& filter_handle,
+                                const bool is_a_filter_partition) const;
 
   static void SetupCacheKeyPrefix(Rep* rep, uint64_t file_size);
 
-  explicit BlockBasedTable(Rep* rep)
-      : rep_(rep), compaction_optimized_(false) {}
-
   // Generate a cache key prefix from the file
   static void GenerateCachePrefix(Cache* cc, RandomAccessFile* file,
                                   char* buffer, size_t* size);
@@ -313,13 +332,18 @@ class BlockBasedTable : public TableReader {
   // No copying allowed
   explicit BlockBasedTable(const TableReader&) = delete;
   void operator=(const TableReader&) = delete;
+
+  friend class PartitionedFilterBlockReader;
+  friend class PartitionedFilterBlockTest;
 };
 
 // Maintaining state of a two-level iteration on a partitioned index structure
 class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
  public:
   BlockEntryIteratorState(BlockBasedTable* table,
-                          const ReadOptions& read_options, bool skip_filters);
+                          const ReadOptions& read_options, bool skip_filters,
+                          bool is_index = false,
+                          Cleanable* block_cache_cleaner = nullptr);
   InternalIterator* NewSecondaryIterator(const Slice& index_value) override;
   bool PrefixMayMatch(const Slice& internal_key) override;
 
@@ -328,6 +352,11 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
   BlockBasedTable* table_;
   const ReadOptions read_options_;
   bool skip_filters_;
+  // true if the 2nd level iterator is on indexes instead of on user data.
+  bool is_index_;
+  Cleanable* block_cache_cleaner_;
+  std::set<uint64_t> cleaner_set;
+  port::RWMutex cleaner_mu;
 };
 
 // CachableEntry represents the entries that *may* be fetched from block cache.
diff --git a/table/filter_block.h b/table/filter_block.h
index 788f145bb..dc2417571 100644
--- a/table/filter_block.h
+++ b/table/filter_block.h
@@ -82,16 +82,35 @@ class FilterBlockReader {
   virtual ~FilterBlockReader() {}
 
   virtual bool IsBlockBased() = 0;  // If this is a block-based filter
-  virtual bool KeyMayMatch(const Slice& key,
-                           uint64_t block_offset = kNotValid) = 0;
+  /**
+   * If no_io is set, returns true whenever the query cannot be answered
+   * without reading data from disk. This is used in
+   * PartitionedFilterBlockReader to avoid reading partitions that are not
+   * already in the block cache.
+   *
+   * Normally filters are built on only the user keys and the InternalKey is
+   * not needed for a query. The index in PartitionedFilterBlockReader,
+   * however, is built on InternalKeys, so one must be provided via
+   * const_ikey_ptr when running queries.
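+   *
+   * Illustrative call pattern (an example sketch, not code added by this
+   * patch), mirroring the call sites elsewhere in this PR:
+   *
+   *   InternalKey ikey(user_key, sequence, kTypeValue);
+   *   const Slice ikey_slice(*ikey.rep());
+   *   bool may_match =
+   *       filter->KeyMayMatch(user_key, kNotValid, no_io, &ikey_slice);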
+ */ + virtual bool KeyMayMatch(const Slice& key, uint64_t block_offset = kNotValid, + const bool no_io = false, + const Slice* const const_ikey_ptr = nullptr) = 0; + /** + * no_io and const_ikey_ptr here means the same as in KeyMayMatch + */ virtual bool PrefixMayMatch(const Slice& prefix, - uint64_t block_offset = kNotValid) = 0; + uint64_t block_offset = kNotValid, + const bool no_io = false, + const Slice* const const_ikey_ptr = nullptr) = 0; virtual size_t ApproximateMemoryUsage() const = 0; virtual size_t size() const { return size_; } virtual Statistics* statistics() const { return statistics_; } bool whole_key_filtering() const { return whole_key_filtering_; } + int GetLevel() const { return level_; } + void SetLevel(int level) { level_ = level; } + // convert this object to a human readable form virtual std::string ToString() const { std::string error_msg("Unsupported filter \n"); @@ -107,6 +126,7 @@ class FilterBlockReader { void operator=(const FilterBlockReader&); size_t size_; Statistics* statistics_; + int level_ = -1; }; } // namespace rocksdb diff --git a/table/full_filter_block.cc b/table/full_filter_block.cc index 08dc52c01..267057915 100644 --- a/table/full_filter_block.cc +++ b/table/full_filter_block.cc @@ -73,8 +73,9 @@ FullFilterBlockReader::FullFilterBlockReader( block_contents_ = std::move(contents); } -bool FullFilterBlockReader::KeyMayMatch(const Slice& key, - uint64_t block_offset) { +bool FullFilterBlockReader::KeyMayMatch(const Slice& key, uint64_t block_offset, + const bool no_io, + const Slice* const const_ikey_ptr) { assert(block_offset == kNotValid); if (!whole_key_filtering_) { return true; @@ -83,7 +84,9 @@ bool FullFilterBlockReader::KeyMayMatch(const Slice& key, } bool FullFilterBlockReader::PrefixMayMatch(const Slice& prefix, - uint64_t block_offset) { + uint64_t block_offset, + const bool no_io, + const Slice* const const_ikey_ptr) { assert(block_offset == kNotValid); if (!prefix_extractor_) { return true; diff --git a/table/full_filter_block.h b/table/full_filter_block.h index 66475b4d3..119bd26da 100644 --- a/table/full_filter_block.h +++ b/table/full_filter_block.h @@ -91,10 +91,14 @@ class FullFilterBlockReader : public FilterBlockReader { ~FullFilterBlockReader() {} virtual bool IsBlockBased() override { return false; } - virtual bool KeyMayMatch(const Slice& key, - uint64_t block_offset = kNotValid) override; - virtual bool PrefixMayMatch(const Slice& prefix, - uint64_t block_offset = kNotValid) override; + virtual bool KeyMayMatch( + const Slice& key, uint64_t block_offset = kNotValid, + const bool no_io = false, + const Slice* const const_ikey_ptr = nullptr) override; + virtual bool PrefixMayMatch( + const Slice& prefix, uint64_t block_offset = kNotValid, + const bool no_io = false, + const Slice* const const_ikey_ptr = nullptr) override; virtual size_t ApproximateMemoryUsage() const override; private: diff --git a/table/partitioned_filter_block.cc b/table/partitioned_filter_block.cc index 28e52dd16..97e3d9884 100644 --- a/table/partitioned_filter_block.cc +++ b/table/partitioned_filter_block.cc @@ -5,7 +5,12 @@ #include "table/partitioned_filter_block.h" +#include + #include "port/port.h" +#include "rocksdb/filter_policy.h" +#include "table/block.h" +#include "table/block_based_table_reader.h" #include "util/coding.h" namespace rocksdb { @@ -67,4 +72,149 @@ Slice PartitionedFilterBlockBuilder::Finish( } } +PartitionedFilterBlockReader::PartitionedFilterBlockReader( + const SliceTransform* prefix_extractor, bool _whole_key_filtering, + 
BlockContents&& contents, FilterBitsReader* filter_bits_reader, + Statistics* stats, const Comparator& comparator, + const BlockBasedTable* table) + : FilterBlockReader(contents.data.size(), stats, _whole_key_filtering), + prefix_extractor_(prefix_extractor), + comparator_(comparator), + table_(table) { + idx_on_fltr_blk_.reset(new Block(std::move(contents), + kDisableGlobalSequenceNumber, + 0 /* read_amp_bytes_per_bit */, stats)); +} + +PartitionedFilterBlockReader::~PartitionedFilterBlockReader() { + ReadLock rl(&mu_); + for (auto it = handle_list_.begin(); it != handle_list_.end(); ++it) { + table_->rep_->table_options.block_cache.get()->Release(*it); + } +} + +bool PartitionedFilterBlockReader::KeyMayMatch( + const Slice& key, uint64_t block_offset, const bool no_io, + const Slice* const const_ikey_ptr) { + assert(const_ikey_ptr != nullptr); + assert(block_offset == kNotValid); + if (!whole_key_filtering_) { + return true; + } + if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) { + return true; + } + // This is the user key vs. the full key in the partition index. We assume + // that user key <= full key + auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr); + if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range + return false; + } + bool cached = false; + auto filter_partition = GetFilterPartition(&filter_handle, no_io, &cached); + if (UNLIKELY(!filter_partition.value)) { + return true; + } + auto res = filter_partition.value->KeyMayMatch(key, block_offset, no_io); + if (cached) { + return res; + } + if (LIKELY(filter_partition.IsSet())) { + filter_partition.Release(table_->rep_->table_options.block_cache.get()); + } else { + delete filter_partition.value; + } + return res; +} + +bool PartitionedFilterBlockReader::PrefixMayMatch( + const Slice& prefix, uint64_t block_offset, const bool no_io, + const Slice* const const_ikey_ptr) { + assert(const_ikey_ptr != nullptr); + assert(block_offset == kNotValid); + if (!prefix_extractor_) { + return true; + } + if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) { + return true; + } + auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr); + if (UNLIKELY(filter_handle.size() == 0)) { // prefix is out of range + return false; + } + bool cached = false; + auto filter_partition = GetFilterPartition(&filter_handle, no_io, &cached); + if (UNLIKELY(!filter_partition.value)) { + return true; + } + auto res = filter_partition.value->PrefixMayMatch(prefix, kNotValid, no_io); + if (cached) { + return res; + } + if (LIKELY(filter_partition.IsSet())) { + filter_partition.Release(table_->rep_->table_options.block_cache.get()); + } else { + delete filter_partition.value; + } + return res; +} + +Slice PartitionedFilterBlockReader::GetFilterPartitionHandle( + const Slice& entry) { + BlockIter iter; + idx_on_fltr_blk_->NewIterator(&comparator_, &iter, true); + iter.Seek(entry); + if (UNLIKELY(!iter.Valid())) { + return Slice(); + } + assert(iter.Valid()); + Slice handle_value = iter.value(); + return handle_value; +} + +BlockBasedTable::CachableEntry +PartitionedFilterBlockReader::GetFilterPartition(Slice* handle_value, + const bool no_io, + bool* cached) { + BlockHandle fltr_blk_handle; + auto s = fltr_blk_handle.DecodeFrom(handle_value); + assert(s.ok()); + const bool is_a_filter_partition = true; + auto block_cache = table_->rep_->table_options.block_cache.get(); + if (LIKELY(block_cache != nullptr)) { + bool pin_cached_filters = + GetLevel() == 0 && + table_->rep_->table_options.pin_l0_filter_and_index_blocks_in_cache; + if 
(pin_cached_filters) { + ReadLock rl(&mu_); + auto iter = filter_cache_.find(fltr_blk_handle.offset()); + if (iter != filter_cache_.end()) { + RecordTick(statistics(), BLOCK_CACHE_FILTER_HIT); + *cached = true; + return {iter->second, nullptr}; + } + } + auto filter = + table_->GetFilter(fltr_blk_handle, is_a_filter_partition, no_io); + if (pin_cached_filters && filter.IsSet()) { + WriteLock wl(&mu_); + std::pair pair(fltr_blk_handle.offset(), + filter.value); + auto succ = filter_cache_.insert(pair).second; + if (succ) { + handle_list_.push_back(filter.cache_handle); + } // Otherwise it is already inserted by a concurrent thread + *cached = true; + } + return filter; + } else { + auto filter = table_->ReadFilter(fltr_blk_handle, is_a_filter_partition); + return {filter, nullptr}; + } +} + +size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const { + return idx_on_fltr_blk_->size(); +} + } // namespace rocksdb diff --git a/table/partitioned_filter_block.h b/table/partitioned_filter_block.h index 680075c2b..2fe4957fa 100644 --- a/table/partitioned_filter_block.h +++ b/table/partitioned_filter_block.h @@ -7,14 +7,17 @@ #include #include -#include +#include #include "db/dbformat.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" +#include "table/block.h" +#include "table/block_based_table_reader.h" #include "table/full_filter_block.h" #include "table/index_builder.h" +#include "util/autovector.h" namespace rocksdb { @@ -49,4 +52,40 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder { PartitionedIndexBuilder* const p_index_builder_; }; +class PartitionedFilterBlockReader : public FilterBlockReader { + public: + explicit PartitionedFilterBlockReader(const SliceTransform* prefix_extractor, + bool whole_key_filtering, + BlockContents&& contents, + FilterBitsReader* filter_bits_reader, + Statistics* stats, + const Comparator& comparator, + const BlockBasedTable* table); + virtual ~PartitionedFilterBlockReader(); + + virtual bool IsBlockBased() override { return false; } + virtual bool KeyMayMatch( + const Slice& key, uint64_t block_offset = kNotValid, + const bool no_io = false, + const Slice* const const_ikey_ptr = nullptr) override; + virtual bool PrefixMayMatch( + const Slice& prefix, uint64_t block_offset = kNotValid, + const bool no_io = false, + const Slice* const const_ikey_ptr = nullptr) override; + virtual size_t ApproximateMemoryUsage() const override; + + private: + Slice GetFilterPartitionHandle(const Slice& entry); + BlockBasedTable::CachableEntry GetFilterPartition( + Slice* handle, const bool no_io, bool* cached); + + const SliceTransform* prefix_extractor_; + std::unique_ptr idx_on_fltr_blk_; + const Comparator& comparator_; + const BlockBasedTable* table_; + std::unordered_map filter_cache_; + autovector handle_list_; + port::RWMutex mu_; +}; + } // namespace rocksdb diff --git a/table/partitioned_filter_block_test.cc b/table/partitioned_filter_block_test.cc new file mode 100644 index 000000000..e4483014b --- /dev/null +++ b/table/partitioned_filter_block_test.cc @@ -0,0 +1,242 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
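The lookup at the core of the reader, GetFilterPartitionHandle() above, relies
on the partition index being keyed by the last internal key each filter
partition covers, so BlockIter::Seek behaves as a lower-bound search. A
standalone analogue with simplified stand-in types (illustrative only, not
from the patch):

#include <cstdint>
#include <map>
#include <string>

// Stand-in for BlockHandle: where a filter partition lives in the file.
struct PartitionHandle {
  uint64_t offset;
  uint64_t size;
};

// The index maps the last internal key covered by each partition to that
// partition's handle; lower_bound() finds the only partition that can contain
// the query key. Returning nullptr mirrors the "out of range" case in which
// KeyMayMatch can safely answer false.
const PartitionHandle* FindPartition(
    const std::map<std::string, PartitionHandle>& index,
    const std::string& internal_key) {
  auto it = index.lower_bound(internal_key);
  return it == index.end() ? nullptr : &it->second;
}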
+ +#include + +#include "rocksdb/filter_policy.h" + +#include "table/partitioned_filter_block.h" +#include "util/coding.h" +#include "util/hash.h" +#include "util/logging.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace rocksdb { + +std::map slices; + +class MockedBlockBasedTable : public BlockBasedTable { + public: + explicit MockedBlockBasedTable(Rep* rep) : BlockBasedTable(rep) {} + + virtual CachableEntry GetFilter( + const BlockHandle& filter_blk_handle, const bool is_a_filter_partition, + bool no_io) const override { + Slice slice = slices[filter_blk_handle.offset()]; + auto obj = new FullFilterBlockReader( + nullptr, true, BlockContents(slice, false, kNoCompression), + rep_->table_options.filter_policy->GetFilterBitsReader(slice), nullptr); + return {obj, nullptr}; + } +}; + +class PartitionedFilterBlockTest : public testing::Test { + public: + BlockBasedTableOptions table_options_; + InternalKeyComparator icomp = InternalKeyComparator(BytewiseComparator()); + + PartitionedFilterBlockTest() { + table_options_.filter_policy.reset(NewBloomFilterPolicy(10, false)); + table_options_.no_block_cache = true; // Otherwise BlockBasedTable::Close + // will access variable that are not + // initialized in our mocked version + } + + std::shared_ptr cache_; + ~PartitionedFilterBlockTest() {} + + const std::string keys[4] = {"afoo", "bar", "box", "hello"}; + const std::string missing_keys[2] = {"missing", "other"}; + + int last_offset = 10; + BlockHandle Write(const Slice& slice) { + BlockHandle bh(last_offset + 1, slice.size()); + slices[bh.offset()] = slice; + last_offset += bh.size(); + return bh; + } + + PartitionedIndexBuilder* NewIndexBuilder() { + return PartitionedIndexBuilder::CreateIndexBuilder(&icomp, table_options_); + } + + PartitionedFilterBlockBuilder* NewBuilder( + PartitionedIndexBuilder* const p_index_builder) { + return new PartitionedFilterBlockBuilder( + nullptr, table_options_.whole_key_filtering, + table_options_.filter_policy->GetFilterBitsBuilder(), + table_options_.index_block_restart_interval, p_index_builder); + } + + std::unique_ptr table; + + PartitionedFilterBlockReader* NewReader( + PartitionedFilterBlockBuilder* builder) { + BlockHandle bh; + Status status; + Slice slice; + do { + slice = builder->Finish(bh, &status); + bh = Write(slice); + } while (status.IsIncomplete()); + const Options options; + const ImmutableCFOptions ioptions(options); + const EnvOptions env_options; + table.reset(new MockedBlockBasedTable(new BlockBasedTable::Rep( + ioptions, env_options, table_options_, icomp, false))); + auto reader = new PartitionedFilterBlockReader( + nullptr, true, BlockContents(slice, false, kNoCompression), nullptr, + nullptr, *icomp.user_comparator(), table.get()); + return reader; + } + + void VerifyReader(PartitionedFilterBlockBuilder* builder, + bool empty = false) { + std::unique_ptr reader(NewReader(builder)); + // Querying added keys + const bool no_io = true; + for (auto key : keys) { + auto ikey = InternalKey(key, 0, ValueType::kTypeValue); + const Slice ikey_slice = Slice(*ikey.rep()); + ASSERT_TRUE(reader->KeyMayMatch(key, kNotValid, !no_io, &ikey_slice)); + } + { + // querying a key twice + auto ikey = InternalKey(keys[0], 0, ValueType::kTypeValue); + const Slice ikey_slice = Slice(*ikey.rep()); + ASSERT_TRUE(reader->KeyMayMatch(keys[0], kNotValid, !no_io, &ikey_slice)); + } + // querying missing keys + for (auto key : missing_keys) { + auto ikey = InternalKey(key, 0, ValueType::kTypeValue); + const Slice ikey_slice = 
Slice(*ikey.rep()); + if (empty) { + ASSERT_TRUE(reader->KeyMayMatch(key, kNotValid, !no_io, &ikey_slice)); + } else { + // assuming a good hash function + ASSERT_FALSE(reader->KeyMayMatch(key, kNotValid, !no_io, &ikey_slice)); + } + } + } + + void TestBlockPerKey() { + table_options_.index_per_partition = 1; + std::unique_ptr pib(NewIndexBuilder()); + std::unique_ptr builder( + NewBuilder(pib.get())); + int i = 0; + builder->Add(keys[i]); + CutABlock(pib.get(), keys[i], keys[i + 1]); + i++; + builder->Add(keys[i]); + CutABlock(pib.get(), keys[i], keys[i + 1]); + i++; + builder->Add(keys[i]); + builder->Add(keys[i]); + CutABlock(pib.get(), keys[i], keys[i + 1]); + i++; + builder->Add(keys[i]); + CutABlock(pib.get(), keys[i]); + + VerifyReader(builder.get()); + } + + void TestBlockPerTwoKeys() { + std::unique_ptr pib(NewIndexBuilder()); + std::unique_ptr builder( + NewBuilder(pib.get())); + int i = 0; + builder->Add(keys[i]); + i++; + builder->Add(keys[i]); + CutABlock(pib.get(), keys[i], keys[i + 1]); + i++; + builder->Add(keys[i]); + builder->Add(keys[i]); + i++; + builder->Add(keys[i]); + CutABlock(pib.get(), keys[i]); + + VerifyReader(builder.get()); + } + + void TestBlockPerAllKeys() { + std::unique_ptr pib(NewIndexBuilder()); + std::unique_ptr builder( + NewBuilder(pib.get())); + int i = 0; + builder->Add(keys[i]); + i++; + builder->Add(keys[i]); + i++; + builder->Add(keys[i]); + builder->Add(keys[i]); + i++; + builder->Add(keys[i]); + CutABlock(pib.get(), keys[i]); + + VerifyReader(builder.get()); + } + + void CutABlock(PartitionedIndexBuilder* builder, + const std::string& user_key) { + // Assuming a block is cut, add an entry to the index + std::string key = + std::string(*InternalKey(user_key, 0, ValueType::kTypeValue).rep()); + BlockHandle dont_care_block_handle(1, 1); + builder->AddIndexEntry(&key, nullptr, dont_care_block_handle); + } + + void CutABlock(PartitionedIndexBuilder* builder, const std::string& user_key, + const std::string& next_user_key) { + // Assuming a block is cut, add an entry to the index + std::string key = + std::string(*InternalKey(user_key, 0, ValueType::kTypeValue).rep()); + std::string next_key = std::string( + *InternalKey(next_user_key, 0, ValueType::kTypeValue).rep()); + BlockHandle dont_care_block_handle(1, 1); + Slice slice = Slice(next_key.data(), next_key.size()); + builder->AddIndexEntry(&key, &slice, dont_care_block_handle); + } +}; + +TEST_F(PartitionedFilterBlockTest, EmptyBuilder) { + std::unique_ptr pib(NewIndexBuilder()); + std::unique_ptr builder(NewBuilder(pib.get())); + const bool empty = true; + VerifyReader(builder.get(), empty); +} + +TEST_F(PartitionedFilterBlockTest, OneBlock) { + int num_keys = sizeof(keys) / sizeof(*keys); + for (int i = 1; i < num_keys + 1; i++) { + table_options_.index_per_partition = i; + TestBlockPerAllKeys(); + } +} + +TEST_F(PartitionedFilterBlockTest, TwoBlocksPerKey) { + int num_keys = sizeof(keys) / sizeof(*keys); + for (int i = 1; i < num_keys + 1; i++) { + table_options_.index_per_partition = i; + TestBlockPerTwoKeys(); + } +} + +TEST_F(PartitionedFilterBlockTest, OneBlockPerKey) { + int num_keys = sizeof(keys) / sizeof(*keys); + for (int i = 1; i < num_keys + 1; i++) { + table_options_.index_per_partition = i; + TestBlockPerKey(); + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}
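End to end, the feature is enabled purely through table options; since the
reader only reaches filter partitions through the two-level index path,
kTwoLevelIndexSearch is a prerequisite. A hedged sketch (the partition
granularity value is illustrative):

#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

// Sketch only: a block-based table configured for partitioned Bloom filters.
rocksdb::Options PartitionedFilterOptions() {
  rocksdb::BlockBasedTableOptions bbto;
  bbto.index_type = rocksdb::BlockBasedTableOptions::kTwoLevelIndexSearch;
  bbto.partition_filters = true;  // introduced by this PR series
  bbto.index_per_partition = 64;  // illustrative partition granularity
  bbto.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
  rocksdb::Options options;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbto));
  return options;
}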