Configure index partition size

Summary:
Allow the users to specify the target index partition size.

With this patch an index partition is cut before its estimated in-memory size goes above the configured value for metadata_block_size. The filter partitions are still cut right after an index partition is cut.
Closes https://github.com/facebook/rocksdb/pull/2041

Differential Revision: D4780216

Pulled By: maysamyabandeh

fbshipit-source-id: 95a0831
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 69c8d524a3
commit e7731d119a
  1. 4
      db/db_bloom_filter_test.cc
  2. 2
      db/db_test_util.cc
  3. 2
      db/db_test_util.h
  4. 4
      include/rocksdb/flush_block_policy.h
  5. 13
      include/rocksdb/table.h
  6. 6
      table/flush_block_policy.cc
  7. 59
      table/index_builder.cc
  8. 27
      table/index_builder.h
  9. 56
      table/partitioned_filter_block_test.cc
  10. 6
      table/table_test.cc
  11. 4
      util/options_helper.h
  12. 2
      util/options_settable_test.cc

@ -50,7 +50,7 @@ TEST_P(DBBloomFilterTestWithParam, KeyMayExist) {
options_override.filter_policy.reset(
NewBloomFilterPolicy(20, use_block_based_filter_));
options_override.partition_filters = partition_filters_;
options_override.index_per_partition = 2;
options_override.metadata_block_size = 32;
Options options = CurrentOptions(options_override);
if (partition_filters_ &&
static_cast<BlockBasedTableOptions*>(
@ -334,7 +334,7 @@ TEST_P(DBBloomFilterTestWithParam, BloomFilter) {
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
table_options.index_per_partition = 2;
table_options.metadata_block_size = 32;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);

@ -437,7 +437,7 @@ Options DBTestBase::CurrentOptions(
if (options_override.filter_policy) {
table_options.filter_policy = options_override.filter_policy;
table_options.partition_filters = options_override.partition_filters;
table_options.index_per_partition = options_override.index_per_partition;
table_options.metadata_block_size = options_override.metadata_block_size;
}
if (set_block_based_table_factory) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options));

@ -111,7 +111,7 @@ struct OptionsOverride {
std::shared_ptr<const FilterPolicy> filter_policy = nullptr;
// These will be used only if filter_policy is set
bool partition_filters = false;
uint64_t index_per_partition = 1024;
uint64_t metadata_block_size = 1024;
BlockBasedTableOptions::IndexType index_type =
BlockBasedTableOptions::IndexType::kBinarySearch;

@ -55,6 +55,10 @@ class FlushBlockBySizePolicyFactory : public FlushBlockPolicyFactory {
virtual FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& table_options,
const BlockBuilder& data_block_builder) const override;
static FlushBlockPolicy* NewFlushBlockPolicy(
const uint64_t size, const int deviation,
const BlockBuilder& data_block_builder);
};
} // rocksdb

@ -144,9 +144,16 @@ struct BlockBasedTableOptions {
// Same as block_restart_interval but used for the index block.
int index_block_restart_interval = 1;
// Number of index keys per partition of indexes in a multi-level index
// i.e., the number of data blocks covered by each index partition
uint64_t index_per_partition = 1024;
// Block size for partitioned metadata. Currently applied to indexes when
// kTwoLevelIndexSearch is used and to filters when partition_filters is used.
// Note: Since in the current implementation the filters and index partitions
// are aligned, an index/filter block is created when eitehr index or filter
// block size reaches the specified limit.
// Note: this limit is currently applied to only index blocks; a filter
// partition is cut right after an index block is cut
// TODO(myabandeh): remove the note above when filter partitions are cut
// separately
uint64_t metadata_block_size = 4096;
// Note: currently this option requires kTwoLevelIndexSearch to be set as
// well.

@ -71,4 +71,10 @@ FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
data_block_builder);
}
FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
const uint64_t size, const int deviation,
const BlockBuilder& data_block_builder) {
return new FlushBlockBySizePolicy(size, deviation, data_block_builder);
}
} // namespace rocksdb

@ -15,6 +15,7 @@
#include <string>
#include "rocksdb/comparator.h"
#include "rocksdb/flush_block_policy.h"
#include "table/format.h"
#include "table/partitioned_filter_block.h"
@ -60,32 +61,60 @@ PartitionedIndexBuilder::PartitionedIndexBuilder(
const BlockBasedTableOptions& table_opt)
: IndexBuilder(comparator),
index_block_builder_(table_opt.index_block_restart_interval),
table_opt_(table_opt) {
sub_index_builder_ = IndexBuilder::CreateIndexBuilder(sub_type_, comparator_,
nullptr, table_opt_);
}
sub_index_builder_(nullptr),
table_opt_(table_opt) {}
PartitionedIndexBuilder::~PartitionedIndexBuilder() {
delete sub_index_builder_;
}
void PartitionedIndexBuilder::MakeNewSubIndexBuilder() {
assert(sub_index_builder_ == nullptr);
sub_index_builder_ = new ShortenedIndexBuilder(
comparator_, table_opt_.index_block_restart_interval);
flush_policy_.reset(FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
table_opt_.metadata_block_size, table_opt_.block_size_deviation,
sub_index_builder_->index_block_builder_));
}
void PartitionedIndexBuilder::AddIndexEntry(
std::string* last_key_in_current_block,
const Slice* first_key_in_next_block, const BlockHandle& block_handle) {
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
first_key_in_next_block, block_handle);
num_indexes++;
// Note: to avoid two consecuitive flush in the same method call, we do not
// check flush policy when adding the last key
if (UNLIKELY(first_key_in_next_block == nullptr)) { // no more keys
entries_.push_back({std::string(*last_key_in_current_block),
std::unique_ptr<IndexBuilder>(sub_index_builder_)});
if (sub_index_builder_ == nullptr) {
MakeNewSubIndexBuilder();
}
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
first_key_in_next_block, block_handle);
sub_index_last_key_ = std::string(*last_key_in_current_block);
entries_.push_back(
{sub_index_last_key_,
std::unique_ptr<ShortenedIndexBuilder>(sub_index_builder_)});
sub_index_builder_ = nullptr;
cut_filter_block = true;
} else if (num_indexes % table_opt_.index_per_partition == 0) {
entries_.push_back({std::string(*last_key_in_current_block),
std::unique_ptr<IndexBuilder>(sub_index_builder_)});
sub_index_builder_ = IndexBuilder::CreateIndexBuilder(
sub_type_, comparator_, nullptr, table_opt_);
cut_filter_block = true;
} else {
// apply flush policy only to non-empty sub_index_builder_
if (sub_index_builder_ != nullptr) {
std::string handle_encoding;
block_handle.EncodeTo(&handle_encoding);
bool do_flush =
flush_policy_->Update(*last_key_in_current_block, handle_encoding);
if (do_flush) {
entries_.push_back(
{sub_index_last_key_,
std::unique_ptr<ShortenedIndexBuilder>(sub_index_builder_)});
cut_filter_block = true;
sub_index_builder_ = nullptr;
}
}
if (sub_index_builder_ == nullptr) {
MakeNewSubIndexBuilder();
}
sub_index_builder_->AddIndexEntry(last_key_in_current_block,
first_key_in_next_block, block_handle);
sub_index_last_key_ = std::string(*last_key_in_current_block);
}
}

@ -134,6 +134,7 @@ class ShortenedIndexBuilder : public IndexBuilder {
index_block_builder_.Add(*last_key_in_current_block, handle_encoding);
}
using IndexBuilder::Finish;
virtual Status Finish(
IndexBlocks* index_blocks,
const BlockHandle& last_partition_block_handle) override {
@ -145,6 +146,8 @@ class ShortenedIndexBuilder : public IndexBuilder {
return index_block_builder_.CurrentSizeEstimate();
}
friend class PartitionedIndexBuilder;
private:
BlockBuilder index_block_builder_;
};
@ -305,24 +308,26 @@ class PartitionedIndexBuilder : public IndexBuilder {
return false;
}
std::string& GetPartitionKey() { return entries_.back().key; }
std::string& GetPartitionKey() { return sub_index_last_key_; }
private:
static const BlockBasedTableOptions::IndexType sub_type_ =
BlockBasedTableOptions::kBinarySearch;
void MakeNewSubIndexBuilder();
struct Entry {
std::string key;
std::unique_ptr<IndexBuilder> value;
std::unique_ptr<ShortenedIndexBuilder> value;
};
std::list<Entry> entries_; // list of partitioned indexes and their keys
BlockBuilder index_block_builder_; // top-level index builder
IndexBuilder* sub_index_builder_; // the active partition index builder
uint64_t num_indexes = 0;
bool finishing_indexes =
false; // true if Finish is called once but not complete yet.
// the active partition index builder
ShortenedIndexBuilder* sub_index_builder_;
// the last key in the active partition index builder
std::string sub_index_last_key_;
std::unique_ptr<FlushBlockPolicy> flush_policy_;
// true if Finish is called once but not complete yet.
bool finishing_indexes = false;
const BlockBasedTableOptions& table_opt_;
// Filter data
bool cut_filter_block =
false; // true if it should cut the next filter partition block
// true if it should cut the next filter partition block
bool cut_filter_block = false;
};
} // namespace rocksdb

@ -7,6 +7,7 @@
#include "rocksdb/filter_policy.h"
#include "table/index_builder.h"
#include "table/partitioned_filter_block.h"
#include "util/coding.h"
#include "util/hash.h"
@ -51,6 +52,16 @@ class PartitionedFilterBlockTest : public testing::Test {
const std::string keys[4] = {"afoo", "bar", "box", "hello"};
const std::string missing_keys[2] = {"missing", "other"};
uint64_t MaxIndexSize() {
int num_keys = sizeof(keys) / sizeof(*keys);
uint64_t max_key_size = 0;
for (int i = 1; i < num_keys; i++) {
max_key_size = std::max(max_key_size, keys[i].size());
}
uint64_t max_index_size = num_keys * (max_key_size + 8 /*handle*/);
return max_index_size;
}
int last_offset = 10;
BlockHandle Write(const Slice& slice) {
BlockHandle bh(last_offset + 1, slice.size());
@ -122,8 +133,7 @@ class PartitionedFilterBlockTest : public testing::Test {
}
}
void TestBlockPerKey() {
table_options_.index_per_partition = 1;
int TestBlockPerKey() {
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
std::unique_ptr<PartitionedFilterBlockBuilder> builder(
NewBuilder(pib.get()));
@ -142,6 +152,7 @@ class PartitionedFilterBlockTest : public testing::Test {
CutABlock(pib.get(), keys[i]);
VerifyReader(builder.get());
return CountNumOfIndexPartitions(pib.get());
}
void TestBlockPerTwoKeys() {
@ -201,6 +212,18 @@ class PartitionedFilterBlockTest : public testing::Test {
Slice slice = Slice(next_key.data(), next_key.size());
builder->AddIndexEntry(&key, &slice, dont_care_block_handle);
}
int CountNumOfIndexPartitions(PartitionedIndexBuilder* builder) {
IndexBuilder::IndexBlocks dont_care_ib;
BlockHandle dont_care_bh(10, 10);
Status s;
int cnt = 0;
do {
s = builder->Finish(&dont_care_ib, dont_care_bh);
cnt++;
} while (s.IsIncomplete());
return cnt - 1; // 1 is 2nd level index
}
};
TEST_F(PartitionedFilterBlockTest, EmptyBuilder) {
@ -211,29 +234,40 @@ TEST_F(PartitionedFilterBlockTest, EmptyBuilder) {
}
TEST_F(PartitionedFilterBlockTest, OneBlock) {
int num_keys = sizeof(keys) / sizeof(*keys);
for (int i = 1; i < num_keys + 1; i++) {
table_options_.index_per_partition = i;
uint64_t max_index_size = MaxIndexSize();
for (uint64_t i = 1; i < max_index_size + 1; i++) {
table_options_.metadata_block_size = i;
TestBlockPerAllKeys();
}
}
TEST_F(PartitionedFilterBlockTest, TwoBlocksPerKey) {
int num_keys = sizeof(keys) / sizeof(*keys);
for (int i = 1; i < num_keys + 1; i++) {
table_options_.index_per_partition = i;
uint64_t max_index_size = MaxIndexSize();
for (uint64_t i = 1; i < max_index_size + 1; i++) {
table_options_.metadata_block_size = i;
TestBlockPerTwoKeys();
}
}
TEST_F(PartitionedFilterBlockTest, OneBlockPerKey) {
int num_keys = sizeof(keys) / sizeof(*keys);
for (int i = 1; i < num_keys + 1; i++) {
table_options_.index_per_partition = i;
uint64_t max_index_size = MaxIndexSize();
for (uint64_t i = 1; i < max_index_size + 1; i++) {
table_options_.metadata_block_size = i;
TestBlockPerKey();
}
}
TEST_F(PartitionedFilterBlockTest, PartitionCount) {
int num_keys = sizeof(keys) / sizeof(*keys);
table_options_.metadata_block_size = MaxIndexSize();
int partitions = TestBlockPerKey();
ASSERT_EQ(partitions, 1);
// A low number ensures cutting a block after each key
table_options_.metadata_block_size = 1;
partitions = TestBlockPerKey();
ASSERT_EQ(partitions, num_keys - 1 /* last two keys make one flush */);
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -1657,10 +1657,12 @@ TEST_F(TableTest, HashIndexTest) {
TEST_F(TableTest, PartitionIndexTest) {
const int max_index_keys = 5;
for (int i = 1; i <= max_index_keys + 1; i++) {
const int est_max_index_key_value_size = 32;
const int est_max_index_size = max_index_keys * est_max_index_key_value_size;
for (int i = 1; i <= est_max_index_size + 1; i++) {
BlockBasedTableOptions table_options;
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
table_options.index_per_partition = i;
table_options.metadata_block_size = i;
IndexTest(table_options);
}
}

@ -636,8 +636,8 @@ static std::unordered_map<std::string, OptionTypeInfo>
{"index_block_restart_interval",
{offsetof(struct BlockBasedTableOptions, index_block_restart_interval),
OptionType::kInt, OptionVerificationType::kNormal, false, 0}},
{"index_per_partition",
{offsetof(struct BlockBasedTableOptions, index_per_partition),
{"metadata_block_size",
{offsetof(struct BlockBasedTableOptions, metadata_block_size),
OptionType::kUInt64T, OptionVerificationType::kNormal, false, 0}},
{"partition_filters",
{offsetof(struct BlockBasedTableOptions, partition_filters),

@ -155,7 +155,7 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
"checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;"
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
"block_size_deviation=8;block_restart_interval=4; "
"index_per_partition=4;"
"metadata_block_size=1024;"
"partition_filters=false;"
"index_block_restart_interval=4;"
"filter_policy=bloomfilter:4:true;whole_key_filtering=1;"

Loading…
Cancel
Save