Fix a bug for SeekForPrev with partitioned filter and prefix (#8137)

Summary:
According to https://github.com/facebook/rocksdb/issues/5907, each filter partition "should include the bloom of the prefix of the last
key in the previous partition" so that SeekForPrev() in prefix mode can return correct results.
The last key in the previous partition does not necessarily have the same prefix as the first key
in the current partition, so the prefix of the last key in the previous partition should be added
regardless of the first key in the current partition. The existing code, however, does not follow
this. Furthermore, there is another issue: when finishing the current filter partition,
`FullFilterBlockBuilder::AddPrefix()` is called for the first key in the next filter partition, which effectively
overwrites `last_prefix_str_` prematurely. Consequently, when the filter block builder proceeds
to the next partition, `last_prefix_str_` holds the prefix of that partition's first key, leaving no way to add
the bloom of the prefix of the last key of the previous partition.
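
To make the intended invariant concrete, here is a toy sketch (not the RocksDB classes; `ToyPartitionedPrefixFilter`, its members, and the two-byte prefix rule are made up for illustration): when a new filter partition starts, it must still record the prefix of the last key of the previous partition, even if its own first key has a different prefix.
```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Toy model only; the real logic lives in FullFilterBlockBuilder and
// PartitionedFilterBlockBuilder.
struct ToyPartitionedPrefixFilter {
  explicit ToyPartitionedPrefixFilter(size_t len)
      : prefix_len(len), partitions(1) {}

  void Add(const std::string& key) {
    const std::string prefix = key.substr(0, prefix_len);
    if (partitions.back().empty() && !last_prefix.empty()) {
      // A new partition is starting: carry over the prefix of the last key
      // of the previous partition, regardless of the current key's prefix.
      partitions.back().insert(last_prefix);
    }
    partitions.back().insert(prefix);
    last_prefix = prefix;
  }

  void CutPartition() { partitions.emplace_back(); }

  size_t prefix_len;
  std::vector<std::set<std::string>> partitions;  // one "bloom" per partition
  std::string last_prefix;  // prefix of the last key added so far
};

int main() {
  ToyPartitionedPrefixFilter filter(2);
  filter.Add("aaa");       // first partition records prefix "aa"
  filter.CutPartition();
  filter.Add("bbb");       // second partition records "bb" *and* the
                           // carried-over "aa"
  assert(filter.partitions[1].count("aa") == 1);
  assert(filter.partitions[1].count("bb") == 1);
  return 0;
}
```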

For example, suppose the prefix extractor is FixedLength.2 and the filter partitions are laid out as follows:
```
[  filter part 1   ]    [  filter part 2    ]
                  abc    d
```
When calling SeekForPrev("abcd"), the filter-partition lookup lands on filter part 2 because "abcd" > "abc"
but "abcd" < "d".
If the bloom in filter part 2 happens to return false for the test of prefix "ab", then SeekForPrev("abcd") will build
an incorrect iterator tree in non-total-order mode.
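
A minimal read-side sketch of this failure mode (the helper function is hypothetical; it assumes a DB opened with a two-byte fixed prefix extractor and partitioned filters, already containing the keys "abc" and "d" from the layout above):
```cpp
#include <cassert>
#include <memory>

#include "rocksdb/db.h"

// Hypothetical helper; assumes options.prefix_extractor was set to
// NewFixedPrefixTransform(2) and partition_filters was enabled when the DB
// containing keys "abc" and "d" was written.
void SeekForPrevAcrossPartitions(rocksdb::DB* db) {
  rocksdb::ReadOptions read_opts;
  // Legacy, implicit prefix seek: neither total-order nor auto prefix mode.
  read_opts.total_order_seek = false;
  read_opts.auto_prefix_mode = false;

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(read_opts));
  // "abcd" maps to filter part 2, but its prefix "ab" was only added to
  // filter part 1. If part 2's bloom rejects "ab", the iterator tree is
  // built incorrectly and "abc" is missed.
  it->SeekForPrev("abcd");
  assert(it->status().ok());
  assert(it->Valid() && it->key() == rocksdb::Slice("abc"));
}
```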

Also fix a unit test that starts to fail after this PR: `InDomain` should not fail with an assertion
error when checking an arbitrary key.
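
For reference, a hedged sketch of a prefix extractor that follows this rule (the class name and the "at least four bytes" domain rule are made up; only the pattern of `InDomain` returning a validity check instead of asserting mirrors the `table/table_test.cc` change below):
```cpp
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"

// Hypothetical extractor: keys shorter than four bytes are out of domain.
class FourBytePrefixExtractor : public rocksdb::SliceTransform {
 public:
  const char* Name() const override { return "FourBytePrefixExtractor"; }

  rocksdb::Slice Transform(const rocksdb::Slice& src) const override {
    // Only called for keys where InDomain() returned true.
    return rocksdb::Slice(src.data(), 4);
  }

  bool InDomain(const rocksdb::Slice& src) const override {
    // InDomain() is a query: report out-of-domain keys instead of asserting,
    // since callers (such as the filter block builder) may probe it with
    // arbitrary keys.
    return src.size() >= 4;
  }
};
```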

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8137

Test Plan:
```
make check
```

Without this fix, the following command will fail pretty soon.
```
./db_stress --acquire_snapshot_one_in=10000 --avoid_flush_during_recovery=0 \
--avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=0 \
--batch_protection_bytes_per_key=0 --block_size=16384 --bloom_bits=17 \
--bottommost_compression_type=disable --cache_index_and_filter_blocks=1 --cache_size=1048576 \
--checkpoint_one_in=0 --checksum_type=kxxHash64 --clear_column_family_one_in=0 \
--compact_files_one_in=1000000 --compact_range_one_in=1000000 --compaction_ttl=0 \
--compression_max_dict_buffer_bytes=0 --compression_max_dict_bytes=0 \
--compression_parallel_threads=1 --compression_type=zstd --compression_zstd_max_train_bytes=0 \
--continuous_verification_interval=0 --db=/dev/shm/rocksdb/rocksdb_crashtest_whitebox \
--db_write_buffer_size=8388608 --delpercent=5 --delrangepercent=0 --destroy_db_initially=0 --enable_blob_files=0 \
--enable_compaction_filter=0 --enable_pipelined_write=1 --file_checksum_impl=big --flush_one_in=1000000 \
--format_version=5 --get_current_wal_file_one_in=0 --get_live_files_one_in=1000000 --get_property_one_in=1000000 \
--get_sorted_wal_files_one_in=0 --index_block_restart_interval=4 --index_type=2 --ingest_external_file_one_in=0 \
--iterpercent=10 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True \
--log2_keys_per_lock=10 --long_running_snapshots=1 --mark_for_compaction_one_file_in=0 \
--max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=100000000 --max_key_len=3 \
--max_manifest_file_size=1073741824 --max_write_batch_group_size_bytes=16777216 --max_write_buffer_number=3 \
--max_write_buffer_size_to_maintain=8388608 --memtablerep=skip_list --mmap_read=1 --mock_direct_io=False \
--nooverwritepercent=0 --open_files=500000 --ops_per_thread=20000000 --optimize_filters_for_memory=0 --paranoid_file_checks=1 --partition_filters=1 --partition_pinning=0 --pause_background_one_in=1000000 \
--periodic_compaction_seconds=0 --prefixpercent=5 --progress_reports=0 --read_fault_one_in=0 --read_only=0 \
--readpercent=45 --recycle_log_file_num=0 --reopen=20 --secondary_catch_up_one_in=0 \
--snapshot_hold_ops=100000 --sst_file_manager_bytes_per_sec=104857600 \
--sst_file_manager_bytes_per_truncate=0 --subcompactions=2 --sync=0 --sync_fault_injection=False \
--target_file_size_base=2097152 --target_file_size_multiplier=2 --test_batches_snapshots=0 --test_cf_consistency=0 \
--top_level_index_pinning=0 --unpartitioned_pinning=1 --use_blob_db=0 --use_block_based_filter=0 \
--use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_merge=0 \
--use_multiget=0 --use_ribbon_filter=0 --use_txn=0 --user_timestamp_size=8 --verify_checksum=1 \
--verify_checksum_one_in=1000000 --verify_db_one_in=100000 --write_buffer_size=4194304 \
--write_dbid_to_manifest=1 --writepercent=35
```

Reviewed By: pdillinger

Differential Revision: D27553054

Pulled By: riversand963

fbshipit-source-id: 60e391e4a2d8d98a9a3172ec5d6176b90ec3de98
commit 09528f9fa1 (parent c4d0e66d65, branch main)
Author: Yanqin Jin
Committed by: Facebook GitHub Bot
Files changed (lines changed):
  1. HISTORY.md (1)
  2. db/db_bloom_filter_test.cc (51)
  3. table/block_based/full_filter_block.cc (14)
  4. table/block_based/full_filter_block.h (7)
  5. table/block_based/partitioned_filter_block.cc (15)
  6. table/table_test.cc (3)
  7. tools/db_crashtest.py (1)

HISTORY.md
@@ -9,6 +9,7 @@
 * Fixed a potential hang in shutdown for a DB whose `Env` has high-pri thread pool disabled (`Env::GetBackgroundThreads(Env::Priority::HIGH) == 0`)
 * Made BackupEngine thread-safe and added documentation comments to clarify what is safe for multiple BackupEngine objects accessing the same backup directory.
 * Fixed crash (divide by zero) when compression dictionary is applied to a file containing only range tombstones.
+* Fixed a backward iteration bug with partitioned filter enabled: not including the prefix of the last key of the previous filter partition in current filter partition can cause wrong iteration result.
 
 ### Performance Improvements
 * On ARM platform, use `yield` instead of `wfe` to relax cpu to gain better performance.

db/db_bloom_filter_test.cc
@@ -7,6 +7,9 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
+#include <iomanip>
+#include <sstream>
+
 #include "db/db_test_util.h"
 #include "options/options_helper.h"
 #include "port/stack_trace.h"
@@ -2117,6 +2120,54 @@ TEST_F(DBBloomFilterTest, DynamicBloomFilterOptions) {
   }
 }
 
+TEST_F(DBBloomFilterTest, SeekForPrevWithPartitionedFilters) {
+  Options options = CurrentOptions();
+  constexpr size_t kNumKeys = 10000;
+  static_assert(kNumKeys <= 10000, "kNumKeys have to be <= 10000");
+  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeys + 10));
+  options.create_if_missing = true;
+  constexpr size_t kPrefixLength = 4;
+  options.prefix_extractor.reset(NewFixedPrefixTransform(kPrefixLength));
+  options.compression = kNoCompression;
+  BlockBasedTableOptions bbto;
+  bbto.filter_policy.reset(NewBloomFilterPolicy(50));
+  bbto.index_shortening =
+      BlockBasedTableOptions::IndexShorteningMode::kNoShortening;
+  bbto.block_size = 128;
+  bbto.metadata_block_size = 128;
+  bbto.partition_filters = true;
+  bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  DestroyAndReopen(options);
+
+  const std::string value(64, '\0');
+
+  WriteOptions write_opts;
+  write_opts.disableWAL = true;
+  for (size_t i = 0; i < kNumKeys; ++i) {
+    std::ostringstream oss;
+    oss << std::setfill('0') << std::setw(4) << std::fixed << i;
+    ASSERT_OK(db_->Put(write_opts, oss.str(), value));
+  }
+  ASSERT_OK(Flush());
+
+  ReadOptions read_opts;
+  // Use legacy, implicit prefix seek
+  read_opts.total_order_seek = false;
+  read_opts.auto_prefix_mode = false;
+  std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+  for (size_t i = 0; i < kNumKeys; ++i) {
+    // Seek with a key after each one added but with same prefix. One will
+    // surely cross a partition boundary.
+    std::ostringstream oss;
+    oss << std::setfill('0') << std::setw(4) << std::fixed << i << "a";
+    it->SeekForPrev(oss.str());
+    ASSERT_OK(it->status());
+    ASSERT_TRUE(it->Valid());
+  }
+  it.reset();
+}
+
 #endif  // ROCKSDB_LITE
 
 }  // namespace ROCKSDB_NAMESPACE

table/block_based/full_filter_block.cc
@@ -22,6 +22,7 @@ FullFilterBlockBuilder::FullFilterBlockBuilder(
       whole_key_filtering_(whole_key_filtering),
       last_whole_key_recorded_(false),
       last_prefix_recorded_(false),
+      last_key_in_domain_(false),
       num_added_(0) {
   assert(filter_bits_builder != nullptr);
   filter_bits_builder_.reset(filter_bits_builder);
@@ -30,6 +31,15 @@ FullFilterBlockBuilder::FullFilterBlockBuilder(
 void FullFilterBlockBuilder::Add(const Slice& key_without_ts) {
   const bool add_prefix =
       prefix_extractor_ && prefix_extractor_->InDomain(key_without_ts);
+
+  if (!last_prefix_recorded_ && last_key_in_domain_) {
+    // We can reach here when a new filter partition starts in partitioned
+    // filter. The last prefix in the previous partition should be added if
+    // necessary regardless of key_without_ts, to support prefix SeekForPrev.
+    AddKey(last_prefix_str_);
+    last_prefix_recorded_ = true;
+  }
+
   if (whole_key_filtering_) {
     if (!add_prefix) {
       AddKey(key_without_ts);
@@ -49,7 +59,10 @@ void FullFilterBlockBuilder::Add(const Slice& key_without_ts) {
     }
   }
   if (add_prefix) {
+    last_key_in_domain_ = true;
     AddPrefix(key_without_ts);
+  } else {
+    last_key_in_domain_ = false;
   }
 }
 
@@ -61,6 +74,7 @@ inline void FullFilterBlockBuilder::AddKey(const Slice& key) {
 
 // Add prefix to filter if needed
 void FullFilterBlockBuilder::AddPrefix(const Slice& key) {
+  assert(prefix_extractor_ && prefix_extractor_->InDomain(key));
   Slice prefix = prefix_extractor_->Transform(key);
   if (whole_key_filtering_) {
     // if both whole_key and prefix are added to bloom then we will have whole

table/block_based/full_filter_block.h
@@ -61,6 +61,7 @@ class FullFilterBlockBuilder : public FilterBlockBuilder {
   virtual void Reset();
   void AddPrefix(const Slice& key);
   const SliceTransform* prefix_extractor() { return prefix_extractor_; }
+  const std::string& last_prefix_str() const { return last_prefix_str_; }
 
  private:
   // important: all of these might point to invalid addresses
@@ -72,10 +73,14 @@ class FullFilterBlockBuilder : public FilterBlockBuilder {
   std::string last_whole_key_str_;
   bool last_prefix_recorded_;
   std::string last_prefix_str_;
+  // Whether prefix_extractor_->InDomain(last_whole_key_) is true.
+  // Used in partitioned filters so that the last prefix from the previous
+  // filter partition will be added to the current partition if
+  // last_key_in_domain_ is true, regardless of the current key.
+  bool last_key_in_domain_;
   uint32_t num_added_;
   std::unique_ptr<const char[]> filter_data_;
 };
 
 // A FilterBlockReader is used to parse filter from SST table.

table/block_based/partitioned_filter_block.cc
@@ -73,13 +73,16 @@ void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock(
   }
   filter_gc.push_back(std::unique_ptr<const char[]>(nullptr));
 
-  // Add the prefix of the next key before finishing the partition. This hack,
-  // fixes a bug with format_verison=3 where seeking for the prefix would lead
-  // us to the previous partition.
-  const bool add_prefix =
+  // Add the prefix of the next key before finishing the partition without
+  // updating last_prefix_str_. This hack, fixes a bug with format_verison=3
+  // where seeking for the prefix would lead us to the previous partition.
+  const bool maybe_add_prefix =
       next_key && prefix_extractor() && prefix_extractor()->InDomain(*next_key);
-  if (add_prefix) {
-    FullFilterBlockBuilder::AddPrefix(*next_key);
+  if (maybe_add_prefix) {
+    const Slice next_key_prefix = prefix_extractor()->Transform(*next_key);
+    if (next_key_prefix.compare(last_prefix_str()) != 0) {
+      AddKey(next_key_prefix);
+    }
   }
 
   Slice filter = filter_bits_builder_->Finish(&filter_gc.back());

table/table_test.cc
@@ -3988,8 +3988,7 @@ class TestPrefixExtractor : public ROCKSDB_NAMESPACE::SliceTransform {
   }
 
   bool InDomain(const ROCKSDB_NAMESPACE::Slice& src) const override {
-    assert(IsValid(src));
-    return true;
+    return IsValid(src);
   }
 
   bool InRange(const ROCKSDB_NAMESPACE::Slice& /*dst*/) const override {

tools/db_crashtest.py
@@ -299,7 +299,6 @@ ts_params = {
     "use_blob_db": 0,
     "enable_compaction_filter": 0,
     "ingest_external_file_one_in": 0,
-    "partition_filters": 0,
 }
 
 def finalize_and_sanitize(src_params):
