diff --git a/HISTORY.md b/HISTORY.md index fc6c55874..a07337055 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,7 @@ # Rocksdb Change Log ## Unreleased + +## 5.7.8 (08/14/2017) ### New Features * Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators. * Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1. diff --git a/include/rocksdb/version.h b/include/rocksdb/version.h index fb920cf2e..dd11ea7e8 100644 --- a/include/rocksdb/version.h +++ b/include/rocksdb/version.h @@ -5,7 +5,7 @@ #pragma once #define ROCKSDB_MAJOR 5 -#define ROCKSDB_MINOR 7 +#define ROCKSDB_MINOR 8 #define ROCKSDB_PATCH 0 // Do not use these. We made the mistake of declaring macros starting with diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 89e0c7354..7e236e8bf 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -186,23 +186,87 @@ class PartitionIndexReader : public IndexReader, public Cleanable { // Filters are already checked before seeking the index const bool skip_filters = true; const bool is_index = true; - Cleanable* block_cache_cleaner = nullptr; - const bool pin_cached_indexes = - level_ == 0 && - table_->rep_->table_options.pin_l0_filter_and_index_blocks_in_cache; - if (pin_cached_indexes) { - // Keep partition indexes into the cache as long as the partition index - // reader object is alive - block_cache_cleaner = this; - } return NewTwoLevelIterator( new BlockBasedTable::BlockEntryIteratorState( table_, ReadOptions(), icomparator_, skip_filters, is_index, - block_cache_cleaner), + partition_map_.size() ? &partition_map_ : nullptr), index_block_->NewIterator(icomparator_, nullptr, true)); // TODO(myabandeh): Update TwoLevelIterator to be able to make use of - // on-stack - // BlockIter while the state is on heap + // on-stack BlockIter while the state is on heap. Currentlly it assumes + // the first level iter is always on heap and will attempt to delete it + // in its destructor. + } + + virtual void CacheDependencies(bool pin) override { + // Before read partitions, prefetch them to avoid lots of IOs + auto rep = table_->rep_; + BlockIter biter; + BlockHandle handle; + index_block_->NewIterator(icomparator_, &biter, true); + // Index partitions are assumed to be consecuitive. Prefetch them all. + // Read the first block offset + biter.SeekToFirst(); + Slice input = biter.value(); + Status s = handle.DecodeFrom(&input); + assert(s.ok()); + if (!s.ok()) { + ROCKS_LOG_WARN(rep->ioptions.info_log, + "Could not read first index partition"); + return; + } + uint64_t prefetch_off = handle.offset(); + + // Read the last block's offset + biter.SeekToLast(); + input = biter.value(); + s = handle.DecodeFrom(&input); + assert(s.ok()); + if (!s.ok()) { + ROCKS_LOG_WARN(rep->ioptions.info_log, + "Could not read last index partition"); + return; + } + uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize; + uint64_t prefetch_len = last_off - prefetch_off; + std::unique_ptr prefetch_buffer; + auto& file = table_->rep_->file; + prefetch_buffer.reset(new FilePrefetchBuffer()); + s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len); + + // After prefetch, read the partitions one by one + biter.SeekToFirst(); + auto ro = ReadOptions(); + Cache* block_cache = rep->table_options.block_cache.get(); + for (; biter.Valid(); biter.Next()) { + input = biter.value(); + s = handle.DecodeFrom(&input); + assert(s.ok()); + if (!s.ok()) { + ROCKS_LOG_WARN(rep->ioptions.info_log, + "Could not read index partition"); + continue; + } + + BlockBasedTable::CachableEntry block; + Slice compression_dict; + if (rep->compression_dict_block) { + compression_dict = rep->compression_dict_block->data; + } + const bool is_index = true; + s = table_->MaybeLoadDataBlockToCache(prefetch_buffer.get(), rep, ro, + handle, compression_dict, &block, + is_index); + + if (s.ok() && block.value != nullptr) { + assert(block.cache_handle != nullptr); + if (pin) { + partition_map_[handle.offset()] = block; + RegisterCleanup(&ReleaseCachedEntry, block_cache, block.cache_handle); + } else { + block_cache->Release(block.cache_handle); + } + } + } } virtual size_t size() const override { return index_block_->size(); } @@ -222,13 +286,12 @@ class PartitionIndexReader : public IndexReader, public Cleanable { const int level) : IndexReader(icomparator, stats), table_(table), - index_block_(std::move(index_block)), - level_(level) { + index_block_(std::move(index_block)) { assert(index_block_ != nullptr); } BlockBasedTable* table_; std::unique_ptr index_block_; - int level_; + std::map> partition_map_; }; // Index that allows binary search lookup for the first key of each block. @@ -708,10 +771,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, } else { if (found_range_del_block && !rep->range_del_handle.IsNull()) { ReadOptions read_options; - // TODO: try to use prefetched buffer too. - s = MaybeLoadDataBlockToCache(rep, read_options, rep->range_del_handle, - Slice() /* compression_dict */, - &rep->range_del_entry); + s = MaybeLoadDataBlockToCache( + prefetch_buffer.get(), rep, read_options, rep->range_del_handle, + Slice() /* compression_dict */, &rep->range_del_entry); if (!s.ok()) { ROCKS_LOG_WARN( rep->ioptions.info_log, @@ -740,21 +802,22 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // Always prefetch index and filter for level 0 if (table_options.cache_index_and_filter_blocks) { if (prefetch_index_and_filter_in_cache || level == 0) { + const bool pin = + rep->table_options.pin_l0_filter_and_index_blocks_in_cache && + level == 0; assert(table_options.block_cache != nullptr); // Hack: Call NewIndexIterator() to implicitly add index to the // block_cache - // if pin_l0_filter_and_index_blocks_in_cache is true and this is - // a level0 file, then we will pass in this pointer to rep->index - // to NewIndexIterator(), which will save the index block in there - // else it's a nullptr and nothing special happens - CachableEntry* index_entry = nullptr; - if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache && - level == 0) { - index_entry = &rep->index_entry; - } + CachableEntry index_entry; unique_ptr iter( - new_table->NewIndexIterator(ReadOptions(), nullptr, index_entry)); + new_table->NewIndexIterator(ReadOptions(), nullptr, &index_entry)); + index_entry.value->CacheDependencies(pin); + if (pin) { + rep->index_entry = std::move(index_entry); + } else { + index_entry.Release(table_options.block_cache.get()); + } s = iter->status(); if (s.ok()) { @@ -764,8 +827,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // a level0 file, then save it in rep_->filter_entry; it will be // released in the destructor only, hence it will be pinned in the // cache while this reader is alive - if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache && - level == 0) { + if (pin) { rep->filter_entry = filter_entry; if (rep->filter_entry.value != nullptr) { rep->filter_entry.value->SetLevel(level); @@ -1305,8 +1367,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( if (rep->compression_dict_block) { compression_dict = rep->compression_dict_block->data; } - s = MaybeLoadDataBlockToCache(rep, ro, handle, compression_dict, &block, - is_index); + s = MaybeLoadDataBlockToCache(nullptr /*prefetch_buffer*/, rep, ro, handle, + compression_dict, &block, is_index); } // Didn't get any data from block caches. @@ -1355,8 +1417,9 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( } Status BlockBasedTable::MaybeLoadDataBlockToCache( - Rep* rep, const ReadOptions& ro, const BlockHandle& handle, - Slice compression_dict, CachableEntry* block_entry, bool is_index) { + FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro, + const BlockHandle& handle, Slice compression_dict, + CachableEntry* block_entry, bool is_index) { const bool no_io = (ro.read_tier == kBlockCacheTier); Cache* block_cache = rep->table_options.block_cache.get(); Cache* block_cache_compressed = @@ -1392,12 +1455,11 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache( std::unique_ptr raw_block; { StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS); - s = ReadBlockFromFile(rep->file.get(), nullptr /* prefetch_buffer*/, - rep->footer, ro, handle, &raw_block, - rep->ioptions, block_cache_compressed == nullptr, - compression_dict, rep->persistent_cache_options, - rep->global_seqno, - rep->table_options.read_amp_bytes_per_bit); + s = ReadBlockFromFile( + rep->file.get(), prefetch_buffer, rep->footer, ro, handle, + &raw_block, rep->ioptions, block_cache_compressed == nullptr, + compression_dict, rep->persistent_cache_options, rep->global_seqno, + rep->table_options.read_amp_bytes_per_bit); } if (s.ok()) { @@ -1420,14 +1482,14 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache( BlockBasedTable::BlockEntryIteratorState::BlockEntryIteratorState( BlockBasedTable* table, const ReadOptions& read_options, const InternalKeyComparator* icomparator, bool skip_filters, bool is_index, - Cleanable* block_cache_cleaner) + std::map>* block_map) : TwoLevelIteratorState(table->rep_->ioptions.prefix_extractor != nullptr), table_(table), read_options_(read_options), icomparator_(icomparator), skip_filters_(skip_filters), is_index_(is_index), - block_cache_cleaner_(block_cache_cleaner) {} + block_map_(block_map) {} InternalIterator* BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator( @@ -1436,23 +1498,15 @@ BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator( BlockHandle handle; Slice input = index_value; Status s = handle.DecodeFrom(&input); - auto iter = NewDataBlockIterator(table_->rep_, read_options_, handle, nullptr, - is_index_, s); - if (block_cache_cleaner_) { - uint64_t offset = handle.offset(); - { - ReadLock rl(&cleaner_mu); - if (cleaner_set.find(offset) != cleaner_set.end()) { - // already have a reference to the block cache objects - return iter; - } - } - WriteLock wl(&cleaner_mu); - cleaner_set.insert(offset); - // Keep the data into cache until the cleaner cleansup - iter->DelegateCleanupsTo(block_cache_cleaner_); - } - return iter; + auto rep = table_->rep_; + if (block_map_) { + auto block = block_map_->find(handle.offset()); + assert(block != block_map_->end()); + return block->second.value->NewIterator(&rep->internal_comparator, nullptr, + true, rep->ioptions.statistics); + } + return NewDataBlockIterator(rep, read_options_, handle, nullptr, is_index_, + s); } bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch( diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 457edce22..640a70656 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -181,6 +181,8 @@ class BlockBasedTable : public TableReader { // that was allocated in block cache. virtual size_t ApproximateMemoryUsage() const = 0; + virtual void CacheDependencies(bool /* unused */) {} + protected: const InternalKeyComparator* icomparator_; @@ -227,7 +229,8 @@ class BlockBasedTable : public TableReader { // @param block_entry value is set to the uncompressed block if found. If // in uncompressed block cache, also sets cache_handle to reference that // block. - static Status MaybeLoadDataBlockToCache(Rep* rep, const ReadOptions& ro, + static Status MaybeLoadDataBlockToCache(FilePrefetchBuffer* prefetch_buffer, + Rep* rep, const ReadOptions& ro, const BlockHandle& handle, Slice compression_dict, CachableEntry* block_entry, @@ -345,11 +348,11 @@ class BlockBasedTable : public TableReader { // Maitaning state of a two-level iteration on a partitioned index structure class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState { public: - BlockEntryIteratorState(BlockBasedTable* table, - const ReadOptions& read_options, - const InternalKeyComparator* icomparator, - bool skip_filters, bool is_index = false, - Cleanable* block_cache_cleaner = nullptr); + BlockEntryIteratorState( + BlockBasedTable* table, const ReadOptions& read_options, + const InternalKeyComparator* icomparator, bool skip_filters, + bool is_index = false, + std::map>* block_map = nullptr); InternalIterator* NewSecondaryIterator(const Slice& index_value) override; bool PrefixMayMatch(const Slice& internal_key) override; bool KeyReachedUpperBound(const Slice& internal_key) override; @@ -362,8 +365,7 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState { bool skip_filters_; // true if the 2nd level iterator is on indexes instead of on user data. bool is_index_; - Cleanable* block_cache_cleaner_; - std::set cleaner_set; + std::map>* block_map_; port::RWMutex cleaner_mu; }; diff --git a/table/table_test.cc b/table/table_test.cc index c55eb4255..178cf4243 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -2174,7 +2174,7 @@ std::map MockCache::marked_data_in_cache_; // Block cache can contain raw data blocks as well as general objects. If an // object depends on the table to be live, it then must be destructed before the -// table is closed. This test makese sure that the only items remains in the +// table is closed. This test makes sure that the only items remains in the // cache after the table is closed are raw data blocks. TEST_F(BlockBasedTableTest, NoObjectInCacheAfterTableClose) { for (auto index_type :