// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "table/block_based/partitioned_filter_block.h" #ifdef ROCKSDB_MALLOC_USABLE_SIZE #ifdef OS_FREEBSD #include #else #include #endif #endif #include #include "monitoring/perf_context_imp.h" #include "port/port.h" #include "rocksdb/filter_policy.h" #include "table/block_based/block.h" #include "table/block_based/block_based_table_reader.h" #include "util/coding.h" namespace rocksdb { PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder( const SliceTransform* prefix_extractor, bool whole_key_filtering, FilterBitsBuilder* filter_bits_builder, int index_block_restart_interval, const bool use_value_delta_encoding, PartitionedIndexBuilder* const p_index_builder, const uint32_t partition_size) : FullFilterBlockBuilder(prefix_extractor, whole_key_filtering, filter_bits_builder), index_on_filter_block_builder_(index_block_restart_interval, true /*use_delta_encoding*/, use_value_delta_encoding), index_on_filter_block_builder_without_seq_(index_block_restart_interval, true /*use_delta_encoding*/, use_value_delta_encoding), p_index_builder_(p_index_builder), filters_in_partition_(0), num_added_(0) { filters_per_partition_ = filter_bits_builder_->CalculateNumEntry(partition_size); } PartitionedFilterBlockBuilder::~PartitionedFilterBlockBuilder() {} void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock() { // Use == to send the request only once if (filters_in_partition_ == filters_per_partition_) { // Currently only index builder is in charge of cutting a partition. We keep // requesting until it is granted. p_index_builder_->RequestPartitionCut(); } if (!p_index_builder_->ShouldCutFilterBlock()) { return; } filter_gc.push_back(std::unique_ptr(nullptr)); Slice filter = filter_bits_builder_->Finish(&filter_gc.back()); std::string& index_key = p_index_builder_->GetPartitionKey(); filters.push_back({index_key, filter}); filters_in_partition_ = 0; Reset(); } void PartitionedFilterBlockBuilder::AddKey(const Slice& key) { MaybeCutAFilterBlock(); filter_bits_builder_->AddKey(key); filters_in_partition_++; num_added_++; } Slice PartitionedFilterBlockBuilder::Finish( const BlockHandle& last_partition_block_handle, Status* status) { if (finishing_filters == true) { // Record the handle of the last written filter block in the index FilterEntry& last_entry = filters.front(); std::string handle_encoding; last_partition_block_handle.EncodeTo(&handle_encoding); std::string handle_delta_encoding; PutVarsignedint64( &handle_delta_encoding, last_partition_block_handle.size() - last_encoded_handle_.size()); last_encoded_handle_ = last_partition_block_handle; const Slice handle_delta_encoding_slice(handle_delta_encoding); index_on_filter_block_builder_.Add(last_entry.key, handle_encoding, &handle_delta_encoding_slice); if (!p_index_builder_->seperator_is_key_plus_seq()) { index_on_filter_block_builder_without_seq_.Add( ExtractUserKey(last_entry.key), handle_encoding, &handle_delta_encoding_slice); } filters.pop_front(); } else { MaybeCutAFilterBlock(); } // If there is no filter partition left, then return the index on filter // partitions if (UNLIKELY(filters.empty())) { *status = Status::OK(); if (finishing_filters) { if (p_index_builder_->seperator_is_key_plus_seq()) { return index_on_filter_block_builder_.Finish(); } else { return index_on_filter_block_builder_without_seq_.Finish(); } } else { // This is the rare case where no key was added to the filter return Slice(); } } else { // Return the next filter partition in line and set Incomplete() status to // indicate we expect more calls to Finish *status = Status::Incomplete(); finishing_filters = true; return filters.front().filter; } } PartitionedFilterBlockReader::PartitionedFilterBlockReader( const SliceTransform* prefix_extractor, bool _whole_key_filtering, BlockContents&& contents, FilterBitsReader* /*filter_bits_reader*/, Statistics* stats, const InternalKeyComparator comparator, const BlockBasedTable* table, const bool index_key_includes_seq, const bool index_value_is_full) : FilterBlockReader(contents.data.size(), stats, _whole_key_filtering), prefix_extractor_(prefix_extractor), comparator_(comparator), table_(table), index_key_includes_seq_(index_key_includes_seq), index_value_is_full_(index_value_is_full) { idx_on_fltr_blk_.reset(new Block(std::move(contents), kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, stats)); } PartitionedFilterBlockReader::~PartitionedFilterBlockReader() { // TODO(myabandeh): if instead of filter object we store only the blocks in // block cache, then we don't have to manually earse them from block cache // here. auto block_cache = table_->rep_->table_options.block_cache.get(); if (UNLIKELY(block_cache == nullptr)) { return; } char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length]; IndexBlockIter biter; BlockHandle handle; Statistics* kNullStats = nullptr; idx_on_fltr_blk_->NewIndexIterator( &comparator_, comparator_.user_comparator(), &biter, kNullStats, true, /* have_first_key */ false, index_key_includes_seq_, index_value_is_full_); biter.SeekToFirst(); for (; biter.Valid(); biter.Next()) { handle = biter.value().handle; auto key = BlockBasedTable::GetCacheKey(table_->rep_->cache_key_prefix, table_->rep_->cache_key_prefix_size, handle, cache_key); block_cache->Erase(key); } } bool PartitionedFilterBlockReader::KeyMayMatch( const Slice& key, const SliceTransform* prefix_extractor, uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr, BlockCacheLookupContext* context) { assert(const_ikey_ptr != nullptr); assert(block_offset == kNotValid); if (!whole_key_filtering_) { return true; } if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) { return true; } auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr); if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range return false; } auto filter_partition = GetFilterPartition(/*prefetch_buffer=*/nullptr, filter_handle, no_io, prefix_extractor, context); if (UNLIKELY(!filter_partition.GetValue())) { return true; } return filter_partition.GetValue()->KeyMayMatch( key, prefix_extractor, block_offset, no_io, /*const_ikey_ptr=*/nullptr, context); } bool PartitionedFilterBlockReader::PrefixMayMatch( const Slice& prefix, const SliceTransform* prefix_extractor, uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr, BlockCacheLookupContext* context) { #ifdef NDEBUG (void)block_offset; #endif assert(const_ikey_ptr != nullptr); assert(block_offset == kNotValid); if (!prefix_extractor_ && !prefix_extractor) { return true; } if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) { return true; } auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr); if (UNLIKELY(filter_handle.size() == 0)) { // prefix is out of range return false; } auto filter_partition = GetFilterPartition(/*prefetch_buffer=*/nullptr, filter_handle, no_io, prefix_extractor, context); if (UNLIKELY(!filter_partition.GetValue())) { return true; } return filter_partition.GetValue()->PrefixMayMatch( prefix, prefix_extractor, kNotValid, no_io, /*const_ikey_ptr=*/nullptr, context); } BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle( const Slice& entry) { IndexBlockIter iter; Statistics* kNullStats = nullptr; idx_on_fltr_blk_->NewIndexIterator( &comparator_, comparator_.user_comparator(), &iter, kNullStats, true, /* have_first_key */ false, index_key_includes_seq_, index_value_is_full_); iter.Seek(entry); if (UNLIKELY(!iter.Valid())) { return BlockHandle(0, 0); } assert(iter.Valid()); BlockHandle fltr_blk_handle = iter.value().handle; return fltr_blk_handle; } CachableEntry PartitionedFilterBlockReader::GetFilterPartition( FilePrefetchBuffer* prefetch_buffer, BlockHandle& fltr_blk_handle, const bool no_io, const SliceTransform* prefix_extractor, BlockCacheLookupContext* context) { const bool is_a_filter_partition = true; auto block_cache = table_->rep_->table_options.block_cache.get(); if (LIKELY(block_cache != nullptr)) { if (filter_map_.size() != 0) { auto iter = filter_map_.find(fltr_blk_handle.offset()); // This is a possible scenario since block cache might not have had space // for the partition if (iter != filter_map_.end()) { return {iter->second.GetValue(), nullptr /* cache */, nullptr /* cache_handle */, false /* own_value */}; } } return table_->GetFilter(/*prefetch_buffer=*/nullptr, fltr_blk_handle, is_a_filter_partition, no_io, /*get_context=*/nullptr, context, prefix_extractor); } else { auto filter = table_->ReadFilter(prefetch_buffer, fltr_blk_handle, is_a_filter_partition, prefix_extractor); return {filter, nullptr /* cache */, nullptr /* cache_handle */, true /* own_value */}; } } size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const { size_t usage = idx_on_fltr_blk_->usable_size(); #ifdef ROCKSDB_MALLOC_USABLE_SIZE usage += malloc_usable_size((void*)this); #else usage += sizeof(*this); #endif // ROCKSDB_MALLOC_USABLE_SIZE return usage; // TODO(myabandeh): better estimation for filter_map_ size } // TODO(myabandeh): merge this with the same function in IndexReader void PartitionedFilterBlockReader::CacheDependencies( bool pin, const SliceTransform* prefix_extractor) { // Before read partitions, prefetch them to avoid lots of IOs BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch}; IndexBlockIter biter; Statistics* kNullStats = nullptr; idx_on_fltr_blk_->NewIndexIterator( &comparator_, comparator_.user_comparator(), &biter, kNullStats, true, /* have_first_key */ false, index_key_includes_seq_, index_value_is_full_); // Index partitions are assumed to be consecuitive. Prefetch them all. // Read the first block offset biter.SeekToFirst(); BlockHandle handle = biter.value().handle; uint64_t prefetch_off = handle.offset(); // Read the last block's offset biter.SeekToLast(); handle = biter.value().handle; uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize; uint64_t prefetch_len = last_off - prefetch_off; std::unique_ptr prefetch_buffer; auto& file = table_->rep_->file; prefetch_buffer.reset(new FilePrefetchBuffer()); Status s; s = prefetch_buffer->Prefetch(file.get(), prefetch_off, static_cast(prefetch_len)); // After prefetch, read the partitions one by one biter.SeekToFirst(); for (; biter.Valid(); biter.Next()) { handle = biter.value().handle; const bool no_io = true; const bool is_a_filter_partition = true; auto filter = table_->GetFilter( prefetch_buffer.get(), handle, is_a_filter_partition, !no_io, /*get_context=*/nullptr, &lookup_context, prefix_extractor); if (LIKELY(filter.IsCached())) { if (pin) { filter_map_[handle.offset()] = std::move(filter); } } } } } // namespace rocksdb