Fix bug with kHashSearch and changing prefix_extractor with SetOptions (#10128)

Summary:
When opening an SST file created using index_type=kHashSearch,
the *current* prefix_extractor would be saved, and used with hash index
if the *new current* prefix_extractor at query time is compatible with
the SST file. This is a problem if the prefix_extractor at SST open time
is not compatible but SetOptions later changes (back) to one that is
compatible.

This change fixes that by using the known compatible (or missing) prefix
extractor we save for use with prefix filtering. Detail: I have moved the
InternalKeySliceTransform wrapper to avoid some indirection and remove
unnecessary fields.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10128

Test Plan:
expanded unit test (using some logic from https://github.com/facebook/rocksdb/issues/10122) that fails
before fix and probably covers some other previously uncovered cases.

Reviewed By: siying

Differential Revision: D36955738

Pulled By: pdillinger

fbshipit-source-id: 0c78a6b0d24054ef2f3cb237bf010c1c5589fb10
main
Peter Dillinger 3 years ago committed by Facebook GitHub Bot
parent 693dffd8e8
commit d3a3b02134
  1. 1
      HISTORY.md
  2. 36
      db/db_test.cc
  3. 2
      db/dbformat.h
  4. 37
      table/block_based/block_based_table_reader.cc
  5. 6
      table/block_based/block_based_table_reader.h
  6. 20
      table/block_based/block_prefix_index.cc
  7. 14
      table/block_based/block_prefix_index.h
  8. 4
      table/block_based/hash_index_reader.cc

@ -7,6 +7,7 @@
* Fix a race condition in WAL size tracking which is caused by an unsafe iterator access after container is changed. * Fix a race condition in WAL size tracking which is caused by an unsafe iterator access after container is changed.
* Fix unprotected concurrent accesses to `WritableFileWriter::filesize_` by `DB::SyncWAL()` and `DB::Put()` in two write queue mode. * Fix unprotected concurrent accesses to `WritableFileWriter::filesize_` by `DB::SyncWAL()` and `DB::Put()` in two write queue mode.
* Fix a bug in WAL tracking. Before this PR (#10087), calling `SyncWAL()` on the only WAL file of the db will not log the event in MANIFEST, thus allowing a subsequent `DB::Open` even if the WAL file is missing or corrupted. * Fix a bug in WAL tracking. Before this PR (#10087), calling `SyncWAL()` on the only WAL file of the db will not log the event in MANIFEST, thus allowing a subsequent `DB::Open` even if the WAL file is missing or corrupted.
* Fix a bug that could return wrong results with `index_type=kHashSearch` and using `SetOptions` to change the `prefix_extractor`.
* Fixed a bug in WAL tracking with wal_compression. WAL compression writes a kSetCompressionType record which is not associated with any sequence number. As result, WalManager::GetSortedWalsOfType() will skip these WALs and not return them to caller, e.g. Checkpoint, Backup, causing the operations to fail. * Fixed a bug in WAL tracking with wal_compression. WAL compression writes a kSetCompressionType record which is not associated with any sequence number. As result, WalManager::GetSortedWalsOfType() will skip these WALs and not return them to caller, e.g. Checkpoint, Backup, causing the operations to fail.
### Public API changes ### Public API changes

@ -3467,7 +3467,40 @@ TEST_F(DBTest, BlockBasedTablePrefixIndexTest) {
ASSERT_OK(Flush()); ASSERT_OK(Flush());
ASSERT_OK(Put("k2", "v2")); ASSERT_OK(Put("k2", "v2"));
// Reopen it without prefix extractor, make sure everything still works. // Reopen with different prefix extractor, make sure everything still works.
// RocksDB should just fall back to the binary index.
options.prefix_extractor.reset(NewFixedPrefixTransform(2));
Reopen(options);
ASSERT_EQ("v1", Get("k1"));
ASSERT_EQ("v2", Get("k2"));
#ifndef ROCKSDB_LITE
// Back to original
ASSERT_OK(dbfull()->SetOptions({{"prefix_extractor", "fixed:1"}}));
ASSERT_EQ("v1", Get("k1"));
ASSERT_EQ("v2", Get("k2"));
#endif // !ROCKSDB_LITE
// Same if there's a problem initally loading prefix transform
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::Open::ForceNullTablePrefixExtractor",
[&](void* arg) { *static_cast<bool*>(arg) = true; });
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ("v1", Get("k1"));
ASSERT_EQ("v2", Get("k2"));
#ifndef ROCKSDB_LITE
// Change again
ASSERT_OK(dbfull()->SetOptions({{"prefix_extractor", "fixed:2"}}));
ASSERT_EQ("v1", Get("k1"));
ASSERT_EQ("v2", Get("k2"));
#endif // !ROCKSDB_LITE
SyncPoint::GetInstance()->DisableProcessing();
// Reopen with no prefix extractor, make sure everything still works.
// RocksDB should just fall back to the binary index. // RocksDB should just fall back to the binary index.
table_options.index_type = BlockBasedTableOptions::kBinarySearch; table_options.index_type = BlockBasedTableOptions::kBinarySearch;
options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.table_factory.reset(NewBlockBasedTableFactory(table_options));
@ -3477,6 +3510,7 @@ TEST_F(DBTest, BlockBasedTablePrefixIndexTest) {
ASSERT_EQ("v1", Get("k1")); ASSERT_EQ("v1", Get("k1"));
ASSERT_EQ("v2", Get("k2")); ASSERT_EQ("v2", Get("k2"));
} }
TEST_F(DBTest, BlockBasedTablePrefixHashIndexTest) { TEST_F(DBTest, BlockBasedTablePrefixHashIndexTest) {
// create a DB with block prefix index // create a DB with block prefix index
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;

@ -625,7 +625,7 @@ class IterKey {
}; };
// Convert from a SliceTransform of user keys, to a SliceTransform of // Convert from a SliceTransform of user keys, to a SliceTransform of
// user keys. // internal keys.
class InternalKeySliceTransform : public SliceTransform { class InternalKeySliceTransform : public SliceTransform {
public: public:
explicit InternalKeySliceTransform(const SliceTransform* transform) explicit InternalKeySliceTransform(const SliceTransform* transform)

@ -654,17 +654,6 @@ Status BlockBasedTable::Open(
file_size, level, immortal_table); file_size, level, immortal_table);
rep->file = std::move(file); rep->file = std::move(file);
rep->footer = footer; rep->footer = footer;
// We've successfully read the footer. We are ready to serve requests.
// Better not mutate rep_ after the creation. eg. internal_prefix_transform
// raw pointer will be used to create HashIndexReader, whose reset may
// access a dangling pointer.
// We need to wrap data with internal_prefix_transform to make sure it can
// handle prefix correctly.
// FIXME: is changed prefix_extractor handled anywhere for hash index?
if (prefix_extractor != nullptr) {
rep->internal_prefix_transform.reset(
new InternalKeySliceTransform(prefix_extractor.get()));
}
// For fully portable/stable cache keys, we need to read the properties // For fully portable/stable cache keys, we need to read the properties
// block before setting up cache keys. TODO: consider setting up a bootstrap // block before setting up cache keys. TODO: consider setting up a bootstrap
@ -694,8 +683,14 @@ Status BlockBasedTable::Open(
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
if (!PrefixExtractorChangedHelper(rep->table_properties.get(), bool force_null_table_prefix_extractor = false;
prefix_extractor.get())) { TEST_SYNC_POINT_CALLBACK(
"BlockBasedTable::Open::ForceNullTablePrefixExtractor",
&force_null_table_prefix_extractor);
if (force_null_table_prefix_extractor) {
assert(!rep->table_prefix_extractor);
} else if (!PrefixExtractorChangedHelper(rep->table_properties.get(),
prefix_extractor.get())) {
// Establish fast path for unchanged prefix_extractor // Establish fast path for unchanged prefix_extractor
rep->table_prefix_extractor = prefix_extractor; rep->table_prefix_extractor = prefix_extractor;
} else { } else {
@ -2003,7 +1998,7 @@ InternalIterator* BlockBasedTable::NewIterator(
read_options.auto_prefix_mode || PrefixExtractorChanged(prefix_extractor); read_options.auto_prefix_mode || PrefixExtractorChanged(prefix_extractor);
std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(NewIndexIterator( std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(NewIndexIterator(
read_options, read_options,
need_upper_bound_check && /*disable_prefix_seek=*/need_upper_bound_check &&
rep_->index_type == BlockBasedTableOptions::kHashSearch, rep_->index_type == BlockBasedTableOptions::kHashSearch,
/*input_iter=*/nullptr, /*get_context=*/nullptr, &lookup_context)); /*input_iter=*/nullptr, /*get_context=*/nullptr, &lookup_context));
if (arena == nullptr) { if (arena == nullptr) {
@ -2565,18 +2560,10 @@ Status BlockBasedTable::CreateIndexReader(
lookup_context, index_reader); lookup_context, index_reader);
} }
case BlockBasedTableOptions::kHashSearch: { case BlockBasedTableOptions::kHashSearch: {
std::unique_ptr<Block> metaindex_guard; if (!rep_->table_prefix_extractor) {
std::unique_ptr<InternalIterator> metaindex_iter_guard;
bool should_fallback = false;
// FIXME: is changed prefix_extractor handled anywhere for hash index?
if (rep_->internal_prefix_transform.get() == nullptr) {
ROCKS_LOG_WARN(rep_->ioptions.logger, ROCKS_LOG_WARN(rep_->ioptions.logger,
"No prefix extractor passed in. Fall back to binary" "Missing prefix extractor for hash index. Fall back to"
" search index."); " binary search index.");
should_fallback = true;
}
if (should_fallback) {
return BinarySearchIndexReader::Create(this, ro, prefetch_buffer, return BinarySearchIndexReader::Create(this, ro, prefetch_buffer,
use_cache, prefetch, pin, use_cache, prefetch, pin,
lookup_context, index_reader); lookup_context, index_reader);

@ -596,12 +596,6 @@ struct BlockBasedTable::Rep {
BlockBasedTableOptions::IndexType index_type; BlockBasedTableOptions::IndexType index_type;
bool whole_key_filtering; bool whole_key_filtering;
bool prefix_filtering; bool prefix_filtering;
// TODO(kailiu) It is very ugly to use internal key in table, since table
// module should not be relying on db module. However to make things easier
// and compatible with existing code, we introduce a wrapper that allows
// block to extract prefix without knowing if a key is internal or not.
// null if no prefix_extractor is passed in when opening the table reader.
std::unique_ptr<SliceTransform> internal_prefix_transform;
std::shared_ptr<const SliceTransform> table_prefix_extractor; std::shared_ptr<const SliceTransform> table_prefix_extractor;
std::shared_ptr<const FragmentedRangeTombstoneList> fragmented_range_dels; std::shared_ptr<const FragmentedRangeTombstoneList> fragmented_range_dels;

@ -69,9 +69,6 @@ struct PrefixRecord {
class BlockPrefixIndex::Builder { class BlockPrefixIndex::Builder {
public: public:
explicit Builder(const SliceTransform* internal_prefix_extractor)
: internal_prefix_extractor_(internal_prefix_extractor) {}
void Add(const Slice& key_prefix, uint32_t start_block, uint32_t num_blocks) { void Add(const Slice& key_prefix, uint32_t start_block, uint32_t num_blocks) {
PrefixRecord* record = reinterpret_cast<PrefixRecord*>( PrefixRecord* record = reinterpret_cast<PrefixRecord*>(
arena_.AllocateAligned(sizeof(PrefixRecord))); arena_.AllocateAligned(sizeof(PrefixRecord)));
@ -82,7 +79,7 @@ class BlockPrefixIndex::Builder {
prefixes_.push_back(record); prefixes_.push_back(record);
} }
BlockPrefixIndex* Finish() { BlockPrefixIndex* Finish(const SliceTransform* prefix_extractor) {
// For now, use roughly 1:1 prefix to bucket ratio. // For now, use roughly 1:1 prefix to bucket ratio.
uint32_t num_buckets = static_cast<uint32_t>(prefixes_.size()) + 1; uint32_t num_buckets = static_cast<uint32_t>(prefixes_.size()) + 1;
@ -154,25 +151,22 @@ class BlockPrefixIndex::Builder {
assert(offset == total_block_array_entries); assert(offset == total_block_array_entries);
return new BlockPrefixIndex(internal_prefix_extractor_, num_buckets, return new BlockPrefixIndex(prefix_extractor, num_buckets, buckets,
buckets, total_block_array_entries, total_block_array_entries, block_array_buffer);
block_array_buffer);
} }
private: private:
const SliceTransform* internal_prefix_extractor_;
std::vector<PrefixRecord*> prefixes_; std::vector<PrefixRecord*> prefixes_;
Arena arena_; Arena arena_;
}; };
Status BlockPrefixIndex::Create(const SliceTransform* internal_prefix_extractor, Status BlockPrefixIndex::Create(const SliceTransform* prefix_extractor,
const Slice& prefixes, const Slice& prefix_meta, const Slice& prefixes, const Slice& prefix_meta,
BlockPrefixIndex** prefix_index) { BlockPrefixIndex** prefix_index) {
uint64_t pos = 0; uint64_t pos = 0;
auto meta_pos = prefix_meta; auto meta_pos = prefix_meta;
Status s; Status s;
Builder builder(internal_prefix_extractor); Builder builder;
while (!meta_pos.empty()) { while (!meta_pos.empty()) {
uint32_t prefix_size = 0; uint32_t prefix_size = 0;
@ -201,14 +195,14 @@ Status BlockPrefixIndex::Create(const SliceTransform* internal_prefix_extractor,
} }
if (s.ok()) { if (s.ok()) {
*prefix_index = builder.Finish(); *prefix_index = builder.Finish(prefix_extractor);
} }
return s; return s;
} }
uint32_t BlockPrefixIndex::GetBlocks(const Slice& key, uint32_t** blocks) { uint32_t BlockPrefixIndex::GetBlocks(const Slice& key, uint32_t** blocks) {
Slice prefix = internal_prefix_extractor_->Transform(key); Slice prefix = internal_prefix_extractor_.Transform(key);
uint32_t bucket = PrefixToBucket(prefix, num_buckets_); uint32_t bucket = PrefixToBucket(prefix, num_buckets_);
uint32_t block_id = buckets_[bucket]; uint32_t block_id = buckets_[bucket];

@ -5,6 +5,8 @@
#pragma once #pragma once
#include <stdint.h> #include <stdint.h>
#include "db/dbformat.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -31,6 +33,8 @@ class BlockPrefixIndex {
} }
// Create hash index by reading from the metadata blocks. // Create hash index by reading from the metadata blocks.
// Note: table reader (caller) is responsible for keeping shared_ptr to
// underlying prefix extractor
// @params prefixes: a sequence of prefixes. // @params prefixes: a sequence of prefixes.
// @params prefix_meta: contains the "metadata" to of the prefixes. // @params prefix_meta: contains the "metadata" to of the prefixes.
static Status Create(const SliceTransform* hash_key_extractor, static Status Create(const SliceTransform* hash_key_extractor,
@ -46,17 +50,17 @@ class BlockPrefixIndex {
class Builder; class Builder;
friend Builder; friend Builder;
BlockPrefixIndex(const SliceTransform* internal_prefix_extractor, BlockPrefixIndex(const SliceTransform* prefix_extractor, uint32_t num_buckets,
uint32_t num_buckets, uint32_t* buckets, uint32_t* buckets, uint32_t num_block_array_buffer_entries,
uint32_t num_block_array_buffer_entries,
uint32_t* block_array_buffer) uint32_t* block_array_buffer)
: internal_prefix_extractor_(internal_prefix_extractor), : internal_prefix_extractor_(prefix_extractor),
num_buckets_(num_buckets), num_buckets_(num_buckets),
num_block_array_buffer_entries_(num_block_array_buffer_entries), num_block_array_buffer_entries_(num_block_array_buffer_entries),
buckets_(buckets), buckets_(buckets),
block_array_buffer_(block_array_buffer) {} block_array_buffer_(block_array_buffer) {}
const SliceTransform* internal_prefix_extractor_; InternalKeySliceTransform internal_prefix_extractor_;
uint32_t num_buckets_; uint32_t num_buckets_;
uint32_t num_block_array_buffer_entries_; uint32_t num_block_array_buffer_entries_;
uint32_t* buckets_; uint32_t* buckets_;

@ -95,8 +95,8 @@ Status HashIndexReader::Create(const BlockBasedTable* table,
} }
BlockPrefixIndex* prefix_index = nullptr; BlockPrefixIndex* prefix_index = nullptr;
assert(rep->internal_prefix_transform.get() != nullptr); assert(rep->table_prefix_extractor);
s = BlockPrefixIndex::Create(rep->internal_prefix_transform.get(), s = BlockPrefixIndex::Create(rep->table_prefix_extractor.get(),
prefixes_contents.data, prefixes_contents.data,
prefixes_meta_contents.data, &prefix_index); prefixes_meta_contents.data, &prefix_index);
// TODO: log error // TODO: log error

Loading…
Cancel
Save