diff --git a/db/builder.cc b/db/builder.cc index ae6015003..317c9b054 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -63,7 +63,7 @@ Status BuildTable( const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, const Env::IOPriority io_priority, - TableProperties* table_properties) { + TableProperties* table_properties, int level) { // Reports the IOStats for flush for every following bytes. const size_t kReportFlushIOStatsEvery = 1048576; Status s; @@ -149,7 +149,8 @@ Status BuildTable( ReadOptions(), env_options, internal_comparator, meta->fd, nullptr, (internal_stats == nullptr) ? nullptr : internal_stats->GetFileReadHist(0), - false)); + false /* for_compaction */, nullptr /* arena */, + false /* skip_filters */, level)); s = it->status(); if (s.ok() && paranoid_file_checks) { for (it->SeekToFirst(); it->Valid(); it->Next()) { diff --git a/db/builder.h b/db/builder.h index b4b72b7d7..1eba6da9c 100644 --- a/db/builder.h +++ b/db/builder.h @@ -61,6 +61,6 @@ extern Status BuildTable( const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, const Env::IOPriority io_priority = Env::IO_HIGH, - TableProperties* table_properties = nullptr); + TableProperties* table_properties = nullptr, int level = -1); } // namespace rocksdb diff --git a/db/c.cc b/db/c.cc index 85e911491..9f49aba23 100644 --- a/db/c.cc +++ b/db/c.cc @@ -1288,6 +1288,11 @@ void rocksdb_block_based_options_set_cache_index_and_filter_blocks( rocksdb_block_based_table_options_t* options, unsigned char v) { options->rep.cache_index_and_filter_blocks = v; } +void rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache( + rocksdb_block_based_table_options_t* options, unsigned char v) { + options->rep.pin_l0_filter_and_index_blocks_in_cache = v; +} + void rocksdb_block_based_options_set_skip_table_builder_flush( rocksdb_block_based_table_options_t* options, unsigned char v) { options->rep.skip_table_builder_flush = v; diff --git a/db/column_family.h b/db/column_family.h index ce5f409c4..1a4036e60 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -465,6 +465,8 @@ class ColumnFamilySet { // Don't call while iterating over ColumnFamilySet void FreeDeadColumnFamilies(); + Cache* get_table_cache() { return table_cache_; } + private: friend class ColumnFamilyData; // helper function that gets called from cfd destructor diff --git a/db/db_impl.cc b/db/db_impl.cc index 6124313b8..db5390bca 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -405,6 +405,21 @@ DBImpl::~DBImpl() { } logs_.clear(); + // Table cache may have table handles holding blocks from the block cache. + // We need to release them before the block cache is destroyed. The block + // cache may be destroyed inside versions_.reset(), when the column family + // data list is destroyed, so leaving handles in the table cache after + // versions_.reset() may cause issues. + // Here we clean up all unreferenced handles in the table cache. + // At this point all user queries have finished, so only the version set + // itself can still hold blocks from the block cache. After releasing the + // unreferenced handles here, the only remaining handles are the ones held + // by the version set, and versions_.reset() releases those. There we make + // sure that every time a handle is released, it is also erased from the + // cache. This guarantees that the table cache is empty after + // versions_.reset(), so the block cache can be destroyed safely.
+ table_cache_->EraseUnRefEntries(); + // versions need to be destroyed before table_cache since it can hold // references to table_cache. versions_.reset(); diff --git a/db/db_test2.cc b/db/db_test2.cc index f7d18dae2..595b8bede 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -13,6 +13,11 @@ namespace rocksdb { +static uint64_t TestGetTickerCount(const Options& options, + Tickers ticker_type) { + return options.statistics->getTickerCount(ticker_type); +} + class DBTest2 : public DBTestBase { public: DBTest2() : DBTestBase("/db_test2") {} @@ -675,6 +680,119 @@ TEST_F(DBTest2, DISABLED_FirstSnapshotTest) { db_->ReleaseSnapshot(s1); } + +class PinL0IndexAndFilterBlocksTest : public DBTestBase, + public testing::WithParamInterface<bool> { + public: + PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {} + virtual void SetUp() override { infinite_max_files_ = GetParam(); } + + bool infinite_max_files_; +}; + +TEST_P(PinL0IndexAndFilterBlocksTest, + IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) { + Options options = CurrentOptions(); + if (infinite_max_files_) { + options.max_open_files = -1; + } + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.cache_index_and_filter_blocks = true; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + table_options.filter_policy.reset(NewBloomFilterPolicy(20)); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + CreateAndReopenWithCF({"pikachu"}, options); + + ASSERT_OK(Put(1, "key", "val")); + // Create a new table. + ASSERT_OK(Flush(1)); + + // index/filter blocks added to block cache right after table creation. + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); + + // only index/filter were added + ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS)); + + std::string value; + // Miss and hit count should remain the same, they're all pinned. + db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); + + // Miss and hit count should remain the same, they're all pinned.
+ value = Get(1, "key"); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); +} + +TEST_P(PinL0IndexAndFilterBlocksTest, + MultiLevelIndexAndFilterBlocksCachedWithPinning) { + Options options = CurrentOptions(); + if (infinite_max_files_) { + options.max_open_files = -1; + } + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.cache_index_and_filter_blocks = true; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + table_options.filter_policy.reset(NewBloomFilterPolicy(20)); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + CreateAndReopenWithCF({"pikachu"}, options); + + Put(1, "a", "begin"); + Put(1, "z", "end"); + ASSERT_OK(Flush(1)); + // move this table to L1 + dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); + + // reset block cache + table_options.block_cache = NewLRUCache(64 * 1024); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + TryReopenWithColumnFamilies({"default", "pikachu"}, options); + // create new table at L0 + Put(1, "a2", "begin2"); + Put(1, "z2", "end2"); + ASSERT_OK(Flush(1)); + + table_options.block_cache->EraseUnRefEntries(); + + // get base cache values + uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS); + uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT); + uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS); + uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT); + + std::string value; + // this should be read from L0 + // so cache values don't change + value = Get(1, "a2"); + ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); + + // this should be read from L1 + // the file is opened, prefetching results in a cache filter miss + // the block is loaded and added to the cache, + // then the get results in a cache hit for L1 + value = Get(1, "a"); + ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); +} + +INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest, + PinL0IndexAndFilterBlocksTest, ::testing::Bool()); } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/flush_job.cc b/db/flush_job.cc index b83f9dbe6..b4e5b307f 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -234,14 +234,14 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", &output_compression_); - s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_, - cfd_->table_cache(), iter.get(), meta, - cfd_->internal_comparator(), - cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), - existing_snapshots_, earliest_write_conflict_snapshot_, - output_compression_, cfd_->ioptions()->compression_opts, - mutable_cf_options_.paranoid_file_checks, - cfd_->internal_stats(), Env::IO_HIGH, &table_properties_); + s = BuildTable( + dbname_, db_options_.env, *cfd_->ioptions(), env_options_, + cfd_->table_cache(), iter.get(), 
meta, cfd_->internal_comparator(), + cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), + existing_snapshots_, earliest_write_conflict_snapshot_, + output_compression_, cfd_->ioptions()->compression_opts, + mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), + Env::IO_HIGH, &table_properties_, 0 /* level */); info.table_properties = table_properties_; LogFlush(db_options_.info_log); } diff --git a/db/table_cache.cc b/db/table_cache.cc index 2a4621b7e..f8c81f9f9 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -88,7 +88,7 @@ Status TableCache::GetTableReader( const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist, - unique_ptr<TableReader>* table_reader, bool skip_filters) { + unique_ptr<TableReader>* table_reader, bool skip_filters, int level) { std::string fname = TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); unique_ptr<RandomAccessFile> file; @@ -109,18 +109,26 @@ file_read_hist)); s = ioptions_.table_factory->NewTableReader( TableReaderOptions(ioptions_, env_options, internal_comparator, - skip_filters), + skip_filters, level), std::move(file_reader), fd.GetFileSize(), table_reader); TEST_SYNC_POINT("TableCache::GetTableReader:0"); } return s; } +void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) { + ReleaseHandle(handle); + uint64_t number = fd.GetNumber(); + Slice key = GetSliceForFileNumber(&number); + cache_->Erase(key); +} + Status TableCache::FindTable(const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, Cache::Handle** handle, const bool no_io, bool record_read_stats, - HistogramImpl* file_read_hist, bool skip_filters) { + HistogramImpl* file_read_hist, bool skip_filters, + int level) { PERF_TIMER_GUARD(find_table_nanos); Status s; uint64_t number = fd.GetNumber(); @@ -136,7 +144,7 @@ Status TableCache::FindTable(const EnvOptions& env_options, unique_ptr<TableReader> table_reader; s = GetTableReader(env_options, internal_comparator, fd, false /* sequential mode */, record_read_stats, - file_read_hist, &table_reader, skip_filters); + file_read_hist, &table_reader, skip_filters, level); if (!s.ok()) { assert(table_reader == nullptr); RecordTick(ioptions_.statistics, NO_FILE_ERRORS); @@ -158,7 +166,7 @@ InternalIterator* TableCache::NewIterator( const ReadOptions& options, const EnvOptions& env_options, const InternalKeyComparator& icomparator, const FileDescriptor& fd, TableReader** table_reader_ptr, HistogramImpl* file_read_hist, - bool for_compaction, Arena* arena, bool skip_filters) { + bool for_compaction, Arena* arena, bool skip_filters, int level) { PERF_TIMER_GUARD(new_table_iterator_nanos); if (table_reader_ptr != nullptr) { @@ -173,7 +181,8 @@ unique_ptr<TableReader> table_reader_unique_ptr; Status s = GetTableReader( env_options, icomparator, fd, /* sequential mode */ true, - /* record stats */ false, nullptr, &table_reader_unique_ptr); + /* record stats */ false, nullptr, &table_reader_unique_ptr, + false /* skip_filters */, level); if (!s.ok()) { return NewErrorInternalIterator(s, arena); } @@ -184,7 +193,7 @@ Status s = FindTable(env_options, icomparator, fd, &handle, options.read_tier == kBlockCacheTier /* no_io */, !for_compaction /* record read_stats */, - file_read_hist, skip_filters); + file_read_hist, skip_filters, level); if (!s.ok()) { return
NewErrorInternalIterator(s, arena); } @@ -216,7 +225,7 @@ Status TableCache::Get(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, const Slice& k, GetContext* get_context, HistogramImpl* file_read_hist, - bool skip_filters) { + bool skip_filters, int level) { TableReader* t = fd.table_reader; Status s; Cache::Handle* handle = nullptr; @@ -265,7 +274,8 @@ Status TableCache::Get(const ReadOptions& options, if (!t) { s = FindTable(env_options_, internal_comparator, fd, &handle, options.read_tier == kBlockCacheTier /* no_io */, - true /* record_read_stats */, file_read_hist, skip_filters); + true /* record_read_stats */, file_read_hist, skip_filters, + level); if (s.ok()) { t = GetTableReaderFromHandle(handle); } diff --git a/db/table_cache.h b/db/table_cache.h index f8416e0b4..fbb7cacbf 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -45,34 +45,41 @@ class TableCache { // the cache and should not be deleted, and is valid for as long as the // returned iterator is live. // @param skip_filters Disables loading/accessing the filter block + // @param level The level this table is at, -1 for "not set / don't know" InternalIterator* NewIterator( const ReadOptions& options, const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr, HistogramImpl* file_read_hist = nullptr, bool for_compaction = false, - Arena* arena = nullptr, bool skip_filters = false); + Arena* arena = nullptr, bool skip_filters = false, int level = -1); // If a seek to internal key "k" in specified file finds an entry, // call (*handle_result)(arg, found_key, found_value) repeatedly until // it returns false. // @param skip_filters Disables loading/accessing the filter block + // @param level The level this table is at, -1 for "not set / don't know" Status Get(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, const Slice& k, GetContext* get_context, HistogramImpl* file_read_hist = nullptr, - bool skip_filters = false); + bool skip_filters = false, int level = -1); // Evict any entry for the specified file number static void Evict(Cache* cache, uint64_t file_number); + // Clean table handle and erase it from the table cache + // Used in DB close, or the file is not live anymore. + void EraseHandle(const FileDescriptor& fd, Cache::Handle* handle); + // Find table reader // @param skip_filters Disables loading/accessing the filter block + // @param level == -1 means not specified Status FindTable(const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, Cache::Handle**, const bool no_io = false, bool record_read_stats = true, HistogramImpl* file_read_hist = nullptr, - bool skip_filters = false); + bool skip_filters = false, int level = -1); // Get TableReader from a cache handle. 
TableReader* GetTableReaderFromHandle(Cache::Handle* handle); @@ -106,7 +113,7 @@ class TableCache { const FileDescriptor& fd, bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader, - bool skip_filters = false); + bool skip_filters = false, int level = -1); const ImmutableCFOptions& ioptions_; const EnvOptions& env_options_; diff --git a/db/version_builder.cc b/db/version_builder.cc index d0e7640fd..d73b33d8e 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -309,11 +309,11 @@ class VersionBuilder::Rep { auto* file_meta = files_meta[file_idx].first; int level = files_meta[file_idx].second; - table_cache_->FindTable(env_options_, - *(base_vstorage_->InternalComparator()), - file_meta->fd, &file_meta->table_reader_handle, - false /*no_io */, true /* record_read_stats */, - internal_stats->GetFileReadHist(level)); + table_cache_->FindTable( + env_options_, *(base_vstorage_->InternalComparator()), + file_meta->fd, &file_meta->table_reader_handle, false /*no_io */, + true /* record_read_stats */, + internal_stats->GetFileReadHist(level), false, level); if (file_meta->table_reader_handle != nullptr) { // Load table_reader file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle( diff --git a/db/version_set.cc b/db/version_set.cc index 75140ed42..f1dbb94e0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -92,6 +92,7 @@ class FilePicker { const InternalKeyComparator* internal_comparator) : num_levels_(num_levels), curr_level_(static_cast<unsigned int>(-1)), + returned_file_level_(static_cast<unsigned int>(-1)), hit_file_level_(static_cast<unsigned int>(-1)), search_left_bound_(0), search_right_bound_(FileIndexer::kLevelMaxIndex), @@ -118,6 +119,8 @@ } } + int GetCurrentLevel() { return returned_file_level_; } + FdWithKeyRange* GetNextFile() { while (!search_ended_) { // Loops over different levels. while (curr_index_in_curr_level_ < curr_file_level_->num_files) { @@ -190,6 +193,7 @@ } prev_file_ = f; #endif + returned_file_level_ = curr_level_; if (curr_level_ > 0 && cmp_largest < 0) { // No more files to search in this level.
search_ended_ = !PrepareNextLevel(); @@ -216,6 +220,7 @@ class FilePicker { private: unsigned int num_levels_; unsigned int curr_level_; + unsigned int returned_file_level_; unsigned int hit_file_level_; int32_t search_left_bound_; int32_t search_right_bound_; @@ -322,7 +327,7 @@ Version::~Version() { f->refs--; if (f->refs <= 0) { if (f->table_reader_handle) { - cfd_->table_cache()->ReleaseHandle(f->table_reader_handle); + cfd_->table_cache()->EraseHandle(f->fd, f->table_reader_handle); f->table_reader_handle = nullptr; } vset_->obsolete_files_.push_back(f); @@ -486,7 +491,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { const EnvOptions& env_options, const InternalKeyComparator& icomparator, HistogramImpl* file_read_hist, bool for_compaction, - bool prefix_enabled, bool skip_filters) + bool prefix_enabled, bool skip_filters, int level) : TwoLevelIteratorState(prefix_enabled), table_cache_(table_cache), read_options_(read_options), @@ -494,7 +499,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState { icomparator_(icomparator), file_read_hist_(file_read_hist), for_compaction_(for_compaction), - skip_filters_(skip_filters) {} + skip_filters_(skip_filters), + level_(level) {} InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override { if (meta_handle.size() != sizeof(FileDescriptor)) { @@ -506,7 +512,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { return table_cache_->NewIterator( read_options_, env_options_, icomparator_, *fd, nullptr /* don't need reference to table*/, file_read_hist_, - for_compaction_, nullptr /* arena */, skip_filters_); + for_compaction_, nullptr /* arena */, skip_filters_, level_); } } @@ -522,6 +528,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { HistogramImpl* file_read_hist_; bool for_compaction_; bool skip_filters_; + int level_; }; // A wrapper of version builder which references the current version in @@ -789,7 +796,8 @@ void Version::AddIterators(const ReadOptions& read_options, const auto& file = storage_info_.LevelFilesBrief(0).files[i]; merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr, - cfd_->internal_stats()->GetFileReadHist(0), false, arena)); + cfd_->internal_stats()->GetFileReadHist(0), false, arena, + false /* skip_filters */, 0 /* level */)); } // For levels > 0, we can use a concatenating iterator that sequentially @@ -804,7 +812,7 @@ void Version::AddIterators(const ReadOptions& read_options, cfd_->internal_stats()->GetFileReadHist(level), false /* for_compaction */, cfd_->ioptions()->prefix_extractor != nullptr, - IsFilterSkipped(level)); + IsFilterSkipped(level), level); mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); auto* first_level_iter = new (mem) LevelFileNumIterator( cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level)); @@ -909,7 +917,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, read_options, *internal_comparator(), f->fd, ikey, &get_context, cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), IsFilterSkipped(static_cast(fp.GetHitFileLevel()), - fp.IsHitFileLastInLevel())); + fp.IsHitFileLastInLevel()), + fp.GetCurrentLevel()); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; @@ -2062,9 +2071,16 @@ VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options, env_options_(storage_options), env_options_compactions_(env_options_) {} +void CloseTables(void* ptr, 
size_t) { + TableReader* table_reader = reinterpret_cast<TableReader*>(ptr); + table_reader->Close(); +} + VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on // VersionSet + column_family_set_->get_table_cache()->ApplyToAllCacheEntries(&CloseTables, + false); column_family_set_.reset(); for (auto file : obsolete_files_) { delete file; } @@ -3275,7 +3291,8 @@ InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) { read_options, env_options_compactions_, cfd->internal_comparator(), flevel->files[i].fd, nullptr, nullptr, /* no per level latency histogram*/ - true /* for compaction */); + true /* for_compaction */, nullptr /* arena */, + false /* skip_filters */, (int)which /* level */); } } else { // Create concatenating iterator for the files from this level @@ -3285,7 +3302,7 @@ cfd->internal_comparator(), nullptr /* no per level latency histogram */, true /* for_compaction */, false /* prefix enabled */, - false /* skip_filters */), + false /* skip_filters */, (int)which /* level */), new LevelFileNumIterator(cfd->internal_comparator(), c->input_levels(which))); } diff --git a/examples/rocksdb_option_file_example.ini b/examples/rocksdb_option_file_example.ini index 838afe8eb..7dc070429 100644 --- a/examples/rocksdb_option_file_example.ini +++ b/examples/rocksdb_option_file_example.ini @@ -138,6 +138,7 @@ block_size=8192 block_restart_interval=16 cache_index_and_filter_blocks=false + pin_l0_filter_and_index_blocks_in_cache=false index_type=kBinarySearch hash_index_allow_collision=true flush_block_policy_factory=FlushBlockBySizePolicyFactory diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 4bb870e20..6e52d20af 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -451,6 +451,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_cache_index_and_filter_blocks( rocksdb_block_based_table_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void +rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache( + rocksdb_block_based_table_options_t*, unsigned char); +extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_skip_table_builder_flush( rocksdb_block_based_table_options_t* options, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_block_based_table_factory( diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 327270e34..638928ff5 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -146,6 +146,10 @@ class Cache { virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), bool thread_safe) = 0; + // Remove all entries. + // Prerequisite: no entry is referenced. + virtual void EraseUnRefEntries() = 0; + private: void LRU_Remove(Handle* e); void LRU_Append(Handle* e); diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index cb4d850e8..8aba3a153 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -64,6 +64,12 @@ struct BlockBasedTableOptions { // block during table initialization. bool cache_index_and_filter_blocks = false; + // if cache_index_and_filter_blocks is true and the below is true, then + // filter and index blocks are stored in the cache, but a reference is + // held in the "table reader" object so the blocks are pinned and only + // evicted from cache when the table reader is freed. + bool pin_l0_filter_and_index_blocks_in_cache = false; + // The index type that will be used for this table.
enum IndexType : char { // A space efficient index block that is optimized for diff --git a/java/rocksjni/table.cc b/java/rocksjni/table.cc index 97aef9888..204d1ba38 100644 --- a/java/rocksjni/table.cc +++ b/java/rocksjni/table.cc @@ -38,13 +38,14 @@ jlong Java_org_rocksdb_PlainTableConfig_newTableFactoryHandle( /* * Class: org_rocksdb_BlockBasedTableConfig * Method: newTableFactoryHandle - * Signature: (ZJIJIIZIZZJIBBI)J + * Signature: (ZJIJIIZIZZZJIBBI)J */ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle( JNIEnv* env, jobject jobj, jboolean no_block_cache, jlong block_cache_size, jint block_cache_num_shardbits, jlong block_size, jint block_size_deviation, jint block_restart_interval, jboolean whole_key_filtering, jlong jfilterPolicy, jboolean cache_index_and_filter_blocks, + jboolean pin_l0_filter_and_index_blocks_in_cache, jboolean hash_index_allow_collision, jlong block_cache_compressed_size, jint block_cache_compressd_num_shard_bits, jbyte jchecksum_type, jbyte jindex_type, jint jformat_version) { @@ -70,6 +71,8 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle( options.filter_policy = *pFilterPolicy; } options.cache_index_and_filter_blocks = cache_index_and_filter_blocks; + options.pin_l0_filter_and_index_blocks_in_cache = + pin_l0_filter_and_index_blocks_in_cache; options.hash_index_allow_collision = hash_index_allow_collision; if (block_cache_compressed_size > 0) { if (block_cache_compressd_num_shard_bits > 0) { diff --git a/java/src/main/java/org/rocksdb/BlockBasedTableConfig.java b/java/src/main/java/org/rocksdb/BlockBasedTableConfig.java index f569e6f42..050eff1c8 100644 --- a/java/src/main/java/org/rocksdb/BlockBasedTableConfig.java +++ b/java/src/main/java/org/rocksdb/BlockBasedTableConfig.java @@ -21,6 +21,7 @@ public class BlockBasedTableConfig extends TableFormatConfig { wholeKeyFiltering_ = true; filter_ = null; cacheIndexAndFilterBlocks_ = false; + pinL0FilterAndIndexBlocksInCache_ = false; hashIndexAllowCollision_ = true; blockCacheCompressedSize_ = 0; blockCacheCompressedNumShardBits_ = 0; @@ -226,6 +227,29 @@ public class BlockBasedTableConfig extends TableFormatConfig { return this; } + /** + * Indicating if we'd like to pin L0 index/filter blocks to the block cache. + If not specified, defaults to false. + * + * @return if L0 index and filter blocks should be pinned to the block cache. + */ + public boolean pinL0FilterAndIndexBlocksInCache() { + return pinL0FilterAndIndexBlocksInCache_; + } + + /** + * Indicating if we'd like to pin L0 index/filter blocks to the block cache. + If not specified, defaults to false. + * + * @param pinL0FilterAndIndexBlocksInCache pin blocks in block cache + * @return the reference to the current config. + */ + public BlockBasedTableConfig setPinL0FilterAndIndexBlocksInCache( + final boolean pinL0FilterAndIndexBlocksInCache) { + pinL0FilterAndIndexBlocksInCache_ = pinL0FilterAndIndexBlocksInCache; + return this; + } + /** * Influence the behavior when kHashSearch is used. 
if false, stores a precise prefix to block range mapping @@ -393,6 +417,7 @@ public class BlockBasedTableConfig extends TableFormatConfig { blockCacheNumShardBits_, blockSize_, blockSizeDeviation_, blockRestartInterval_, wholeKeyFiltering_, filterHandle, cacheIndexAndFilterBlocks_, + pinL0FilterAndIndexBlocksInCache_, hashIndexAllowCollision_, blockCacheCompressedSize_, blockCacheCompressedNumShardBits_, checksumType_.getValue(), indexType_.getValue(), @@ -403,11 +428,13 @@ public class BlockBasedTableConfig extends TableFormatConfig { boolean noBlockCache, long blockCacheSize, int blockCacheNumShardBits, long blockSize, int blockSizeDeviation, int blockRestartInterval, boolean wholeKeyFiltering, long filterPolicyHandle, - boolean cacheIndexAndFilterBlocks, boolean hashIndexAllowCollision, - long blockCacheCompressedSize, int blockCacheCompressedNumShardBits, - byte checkSumType, byte indexType, int formatVersion); + boolean cacheIndexAndFilterBlocks, boolean pinL0FilterAndIndexBlocksInCache, + boolean hashIndexAllowCollision, long blockCacheCompressedSize, + int blockCacheCompressedNumShardBits, byte checkSumType, + byte indexType, int formatVersion); private boolean cacheIndexAndFilterBlocks_; + private boolean pinL0FilterAndIndexBlocksInCache_; private IndexType indexType_; private boolean hashIndexAllowCollision_; private ChecksumType checksumType_; diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 75917232d..c2617b168 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -64,7 +64,7 @@ Status BlockBasedTableFactory::NewTableReader( table_reader_options.ioptions, table_reader_options.env_options, table_options_, table_reader_options.internal_comparator, std::move(file), file_size, table_reader, prefetch_enabled, - table_reader_options.skip_filters); + table_reader_options.skip_filters, table_reader_options.level); } TableBuilder* BlockBasedTableFactory::NewTableBuilder( @@ -94,6 +94,12 @@ Status BlockBasedTableFactory::SanitizeOptions( return Status::InvalidArgument("Enable cache_index_and_filter_blocks, " ", but block cache is disabled"); } + if (table_options_.pin_l0_filter_and_index_blocks_in_cache && + table_options_.no_block_cache) { + return Status::InvalidArgument( + "Enable pin_l0_filter_and_index_blocks_in_cache, " + ", but block cache is disabled"); + } if (!BlockBasedTableSupportedVersion(table_options_.format_version)) { return Status::InvalidArgument( "Unsupported BlockBasedTable format_version. Please check " @@ -115,6 +121,10 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const { snprintf(buffer, kBufferSize, " cache_index_and_filter_blocks: %d\n", table_options_.cache_index_and_filter_blocks); ret.append(buffer); + snprintf(buffer, kBufferSize, + " pin_l0_filter_and_index_blocks_in_cache: %d\n", + table_options_.pin_l0_filter_and_index_blocks_in_cache); + ret.append(buffer); snprintf(buffer, kBufferSize, " index_type: %d\n", table_options_.index_type); ret.append(buffer); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index e48eea694..f0b192f7f 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -340,6 +340,28 @@ class HashIndexReader : public IndexReader { BlockContents prefixes_contents_; }; +// CachableEntry represents the entries that *may* be fetched from block cache. +// field `value` is the item we want to get. +// field `cache_handle` is the cache handle to the block cache. 
If the value +// was not read from cache, `cache_handle` will be nullptr. +template <class TValue> +struct BlockBasedTable::CachableEntry { + CachableEntry(TValue* _value, Cache::Handle* _cache_handle) + : value(_value), cache_handle(_cache_handle) {} + CachableEntry() : CachableEntry(nullptr, nullptr) {} + void Release(Cache* cache) { + if (cache_handle) { + cache->Release(cache_handle); + value = nullptr; + cache_handle = nullptr; + } + } + bool IsSet() const { return cache_handle != nullptr; } + + TValue* value = nullptr; + // if the entry is from the cache, cache_handle will be populated. + Cache::Handle* cache_handle = nullptr; +}; struct BlockBasedTable::Rep { Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options, @@ -394,34 +416,21 @@ // and compatible with existing code, we introduce a wrapper that allows // block to extract prefix without knowing if a key is internal or not. unique_ptr<InternalKeySliceTransform> internal_prefix_transform; + + // only used in level 0 files: + // when pin_l0_filter_and_index_blocks_in_cache is true, we do use the + // LRU cache, but we always keep the filter & index block's handle checked + // out here (i.e. we don't call Release()), plus the parsed-out objects; + // the LRU cache will therefore never evict them, hence they're pinned + CachableEntry<FilterBlockReader> filter_entry; + CachableEntry<IndexReader> index_entry; }; BlockBasedTable::~BlockBasedTable() { + Close(); delete rep_; } -// CachableEntry represents the entries that *may* be fetched from block cache. -// field `value` is the item we want to get. -// field `cache_handle` is the cache handle to the block cache. If the value -// was not read from cache, `cache_handle` will be nullptr. -template <class TValue> -struct BlockBasedTable::CachableEntry { - CachableEntry(TValue* _value, Cache::Handle* _cache_handle) - : value(_value), cache_handle(_cache_handle) {} - CachableEntry() : CachableEntry(nullptr, nullptr) {} - void Release(Cache* cache) { - if (cache_handle) { - cache->Release(cache_handle); - value = nullptr; - cache_handle = nullptr; - } - } - - TValue* value = nullptr; - // if the entry is from the cache, cache_handle will be populated. - Cache::Handle* cache_handle = nullptr; -}; - // Helper function to setup the cache key's prefix for the Table.
void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) { assert(kMaxCacheKeyPrefixSize >= 10); @@ -498,7 +507,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, uint64_t file_size, unique_ptr<TableReader>* table_reader, const bool prefetch_index_and_filter, - const bool skip_filters) { + const bool skip_filters, const int level) { table_reader->reset(); Footer footer; @@ -594,14 +603,33 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, assert(table_options.block_cache != nullptr); // Hack: Call NewIndexIterator() to implicitly add index to the // block_cache + + // if pin_l0_filter_and_index_blocks_in_cache is true and this is + // a level0 file, then we will pass a pointer to rep->index_entry to + // NewIndexIterator(), which will save the index block there; + // otherwise it's a nullptr and nothing special happens + CachableEntry<IndexReader>* index_entry = nullptr; + if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache && + level == 0) { + index_entry = &rep->index_entry; + } unique_ptr<InternalIterator> iter( - new_table->NewIndexIterator(ReadOptions())); + new_table->NewIndexIterator(ReadOptions(), nullptr, index_entry)); s = iter->status(); if (s.ok()) { // Hack: Call GetFilter() to implicitly add filter to the block_cache auto filter_entry = new_table->GetFilter(); - filter_entry.Release(table_options.block_cache.get()); + // if pin_l0_filter_and_index_blocks_in_cache is true and this is + // a level0 file, then save it in rep_->filter_entry; it will be + // released only in the destructor, hence it stays pinned in the + // cache for as long as this reader is alive + if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache && + level == 0) { + rep->filter_entry = filter_entry; + } else { + filter_entry.Release(table_options.block_cache.get()); + } } } else { // If we don't use block cache for index/filter blocks access, we'll @@ -886,14 +914,19 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter( return {rep_->filter.get(), nullptr /* cache handle */}; } - PERF_TIMER_GUARD(read_filter_block_nanos); - Cache* block_cache = rep_->table_options.block_cache.get(); if (rep_->filter_policy == nullptr /* do not use filter */ || block_cache == nullptr /* no block cache at all */) { return {nullptr /* filter */, nullptr /* cache handle */}; } + // we have a pinned filter block + if (rep_->filter_entry.IsSet()) { + return rep_->filter_entry; + } + + PERF_TIMER_GUARD(read_filter_block_nanos); + // Fetching from the cache char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, @@ -935,12 +968,19 @@ } InternalIterator* BlockBasedTable::NewIndexIterator( - const ReadOptions& read_options, BlockIter* input_iter) { + const ReadOptions& read_options, BlockIter* input_iter, + CachableEntry<IndexReader>* index_entry) { // index reader has already been pre-populated.
if (rep_->index_reader) { return rep_->index_reader->NewIterator( input_iter, read_options.total_order_seek); } + // we have a pinned index block + if (rep_->index_entry.IsSet()) { + return rep_->index_entry.value->NewIterator(input_iter, + read_options.total_order_seek); + } + PERF_TIMER_GUARD(read_index_block_nanos); bool no_io = read_options.read_tier == kBlockCacheTier; @@ -996,7 +1036,15 @@ InternalIterator* BlockBasedTable::NewIndexIterator( assert(cache_handle); auto* iter = index_reader->NewIterator( input_iter, read_options.total_order_seek); - iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); + + // the caller wants to take ownership of the index block; don't call + // RegisterCleanup() in this case, the caller will take care of releasing it + if (index_entry != nullptr) { + *index_entry = {index_reader, cache_handle}; + } else { + iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); + } + return iter; } @@ -1224,7 +1272,13 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) { RecordTick(statistics, BLOOM_FILTER_PREFIX_USEFUL); } - filter_entry.Release(rep_->table_options.block_cache.get()); + // if rep_->filter_entry is not set, we should call Release(); otherwise + // don't, because then the filter is held in rep_->filter_entry, which + // keeps it pinned in the cache and releases it in the destructor + if (!rep_->filter_entry.IsSet()) { + filter_entry.Release(rep_->table_options.block_cache.get()); + } + return may_match; } @@ -1324,7 +1378,12 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, } } - filter_entry.Release(rep_->table_options.block_cache.get()); + // if rep_->filter_entry is not set, we should call Release(); otherwise + // don't, because then the filter is held in rep_->filter_entry, which + // keeps it pinned in the cache and releases it in the destructor + if (!rep_->filter_entry.IsSet()) { + filter_entry.Release(rep_->table_options.block_cache.get()); + } return s; } @@ -1612,6 +1671,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { return s; } +void BlockBasedTable::Close() { + rep_->filter_entry.Release(rep_->table_options.block_cache.get()); + rep_->index_entry.Release(rep_->table_options.block_cache.get()); +} + Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) { out_file->Append( "Index Details:\n" diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 600ca18a3..6a88d9d9a 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -76,7 +76,7 @@ class BlockBasedTable : public TableReader { unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size, unique_ptr<TableReader>* table_reader, bool prefetch_index_and_filter = true, - bool skip_filters = false); + bool skip_filters = false, int level = -1); bool PrefixMayMatch(const Slice& internal_key); @@ -119,6 +119,8 @@ // convert SST file to a human readable form Status DumpTable(WritableFile* out_file) override; + void Close() override; + ~BlockBasedTable(); bool TEST_filter_block_preloaded() const; @@ -155,8 +157,9 @@ // 2. index is not present in block cache. // 3.
We disallowed any io to be performed, that is, read_options == // kBlockCacheTier - InternalIterator* NewIndexIterator(const ReadOptions& read_options, - BlockIter* input_iter = nullptr); + InternalIterator* NewIndexIterator( + const ReadOptions& read_options, BlockIter* input_iter = nullptr, + CachableEntry* index_entry = nullptr); // Read block cache from block caches (if set): block_cache and // block_cache_compressed. diff --git a/table/table_builder.h b/table/table_builder.h index ed79bed0e..274245f08 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -29,17 +29,20 @@ struct TableReaderOptions { TableReaderOptions(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options, const InternalKeyComparator& _internal_comparator, - bool _skip_filters = false) + bool _skip_filters = false, int _level = -1) : ioptions(_ioptions), env_options(_env_options), internal_comparator(_internal_comparator), - skip_filters(_skip_filters) {} + skip_filters(_skip_filters), + level(_level) {} const ImmutableCFOptions& ioptions; const EnvOptions& env_options; const InternalKeyComparator& internal_comparator; // This is only used for BlockBasedTable (reader) bool skip_filters; + // what level this table/file is on, -1 for "not set, don't know" + int level; }; struct TableBuilderOptions { diff --git a/table/table_reader.h b/table/table_reader.h index 5751ab03f..c047bf8cb 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -91,6 +91,8 @@ class TableReader { virtual Status DumpTable(WritableFile* out_file) { return Status::NotSupported("DumpTable() not supported"); } + + virtual void Close() {} }; } // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index 3cc7d0dc7..1ce61cadf 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -333,6 +333,8 @@ class TableConstructor: public Constructor { return convert_to_internal_key_; } + void ResetTableReader() { table_reader_.reset(); } + private: void Reset() { uniq_id_ = 0; @@ -1017,6 +1019,7 @@ TEST_F(BlockBasedTableTest, BasicBlockBasedTableProperties) { } Slice content = block_builder.Finish(); ASSERT_EQ(content.size() + kBlockTrailerSize, props.data_size); + c.ResetTableReader(); } TEST_F(BlockBasedTableTest, FilterPolicyNameProperties) { @@ -1034,6 +1037,7 @@ TEST_F(BlockBasedTableTest, FilterPolicyNameProperties) { GetPlainInternalComparator(options.comparator), &keys, &kvmap); auto& props = *c.GetTableReader()->GetTableProperties(); ASSERT_EQ("rocksdb.BuiltinBloomFilter", props.filter_policy_name); + c.ResetTableReader(); } // @@ -1075,6 +1079,7 @@ void PrefetchRange(TableConstructor* c, Options* opt, // assert our expectation in cache warmup AssertKeysInCache(table_reader, keys_in_cache, keys_not_in_cache); + c->ResetTableReader(); } TEST_F(BlockBasedTableTest, PrefetchTest) { @@ -1102,6 +1107,7 @@ TEST_F(BlockBasedTableTest, PrefetchTest) { stl_wrappers::KVMap kvmap; const ImmutableCFOptions ioptions(opt); c.Finish(opt, ioptions, table_options, *ikc, &keys, &kvmap); + c.ResetTableReader(); // We get the following data spread : // @@ -1157,6 +1163,7 @@ TEST_F(BlockBasedTableTest, PrefetchTest) { PrefetchRange(&c, &opt, &table_options, keys, "k06", "k00", {}, {}, Status::InvalidArgument(Slice("k06 "), Slice("k07"))); + c.ResetTableReader(); } TEST_F(BlockBasedTableTest, TotalOrderSeekOnHashIndex) { @@ -1400,6 +1407,7 @@ TEST_F(TableTest, HashIndexTest) { ASSERT_TRUE(BytewiseComparator()->Compare(prefix, ukey_prefix) < 0); } } + c.ResetTableReader(); } // It's very hard to figure out the index 
block size of a block accurately. @@ -1440,6 +1448,7 @@ TEST_F(BlockBasedTableTest, IndexSizeStat) { auto index_size = c.GetTableReader()->GetTableProperties()->index_size; ASSERT_GT(index_size, last_index_size); last_index_size = index_size; + c.ResetTableReader(); } } @@ -1466,6 +1475,7 @@ TEST_F(BlockBasedTableTest, NumBlockStat) { GetPlainInternalComparator(options.comparator), &ks, &kvmap); ASSERT_EQ(kvmap.size(), c.GetTableReader()->GetTableProperties()->num_data_blocks); + c.ResetTableReader(); } // A simple tool that takes the snapshot of block cache statistics. @@ -1662,6 +1672,8 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { // release the iterator so that the block cache can reset correctly. iter.reset(); + c.ResetTableReader(); + // -- PART 2: Open with very small block cache // In this test, no block will ever get hit since the block cache is // too small to fit even one entry. @@ -1702,6 +1714,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ASSERT_EQ(props.GetCacheBytesRead(), 0); } iter.reset(); + c.ResetTableReader(); // -- PART 3: Open table with bloom filter enabled but not in SST file table_options.block_cache = NewLRUCache(4096); @@ -1715,7 +1728,9 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ImmutableCFOptions ioptions3(options); // Generate table without filter policy c3.Finish(options, ioptions3, table_options, - GetPlainInternalComparator(options.comparator), &keys, &kvmap); + GetPlainInternalComparator(options.comparator), &keys, &kvmap); + c3.ResetTableReader(); + // Open table with filter policy table_options.filter_policy.reset(NewBloomFilterPolicy(1)); options.table_factory.reset(new BlockBasedTableFactory(table_options)); @@ -1732,6 +1747,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ASSERT_EQ(value, "hello"); BlockCachePropertiesSnapshot props(options.statistics.get()); props.AssertFilterBlockStat(0, 0); + c3.ResetTableReader(); } void ValidateBlockSizeDeviation(int value, int expected) { @@ -1894,6 +1910,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) { for (const std::string& key : keys) { ASSERT_TRUE(table_reader->TEST_KeyInCache(ReadOptions(), key)); } + c.ResetTableReader(); // rerun with different block cache table_options.block_cache = NewLRUCache(16 * 1024 * 1024); @@ -1904,6 +1921,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) { for (const std::string& key : keys) { ASSERT_TRUE(!table_reader->TEST_KeyInCache(ReadOptions(), key)); } + c.ResetTableReader(); } // Plain table is not supported in ROCKSDB_LITE @@ -1991,6 +2009,7 @@ TEST_F(GeneralTableTest, ApproximateOffsetOfPlain) { ASSERT_TRUE(Between(c.ApproximateOffsetOf("k06"), 510000, 511000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("k07"), 510000, 511000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000)); + c.ResetTableReader(); } static void DoCompressionTest(CompressionType comp) { @@ -2017,6 +2036,7 @@ static void DoCompressionTest(CompressionType comp) { ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), 2000, 3000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), 2000, 3000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 4000, 6100)); + c.ResetTableReader(); } TEST_F(GeneralTableTest, ApproximateOffsetOfCompressed) { @@ -2342,6 +2362,7 @@ TEST_P(IndexBlockRestartIntervalTest, IndexBlockRestartInterval) { kv_iter++; } ASSERT_EQ(kv_iter, kvmap.end()); + c.ResetTableReader(); } class PrefixTest : public testing::Test { diff --git a/tools/benchmark.sh b/tools/benchmark.sh index e4729e7f3..d28aeb271 100755 --- 
a/tools/benchmark.sh +++ b/tools/benchmark.sh @@ -74,6 +74,7 @@ const_params=" --level_compaction_dynamic_level_bytes=true \ --bytes_per_sync=$((8 * M)) \ --cache_index_and_filter_blocks=0 \ + --pin_l0_filter_and_index_blocks_in_cache=1 \ --benchmark_write_rate_limit=$(( 1024 * 1024 * $mb_written_per_sec )) \ \ --hard_rate_limit=3 \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index e3e11e17b..2e1a83237 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -340,6 +340,9 @@ DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed" DEFINE_bool(cache_index_and_filter_blocks, false, "Cache index/filter blocks in block cache."); +DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false, + "Pin index/filter blocks of L0 files in block cache."); + DEFINE_int32(block_size, static_cast(rocksdb::BlockBasedTableOptions().block_size), "Number of bytes in a block."); @@ -2511,6 +2514,8 @@ class Benchmark { } block_based_options.cache_index_and_filter_blocks = FLAGS_cache_index_and_filter_blocks; + block_based_options.pin_l0_filter_and_index_blocks_in_cache = + FLAGS_pin_l0_filter_and_index_blocks_in_cache; block_based_options.block_cache = cache_; block_based_options.block_cache_compressed = compressed_cache_; block_based_options.block_size = FLAGS_block_size; diff --git a/tools/sst_dump_tool_imp.h b/tools/sst_dump_tool_imp.h index 6bbc4d676..b518866cc 100644 --- a/tools/sst_dump_tool_imp.h +++ b/tools/sst_dump_tool_imp.h @@ -59,12 +59,14 @@ class SstFileReader { bool output_hex_; EnvOptions soptions_; - Status init_result_; - unique_ptr table_reader_; - unique_ptr file_; // options_ and internal_comparator_ will also be used in // ReadSequential internally (specifically, seek-related operations) Options options_; + + Status init_result_; + unique_ptr table_reader_; + unique_ptr file_; + const ImmutableCFOptions ioptions_; InternalKeyComparator internal_comparator_; unique_ptr table_properties_; diff --git a/util/cache.cc b/util/cache.cc index 6015644f6..ed9ff870c 100644 --- a/util/cache.cc +++ b/util/cache.cc @@ -225,6 +225,8 @@ class LRUCache { void ApplyToAllCacheEntries(void (*callback)(void*, size_t), bool thread_safe); + void EraseUnRefEntries(); + private: void LRU_Remove(LRUHandle* e); void LRU_Append(LRUHandle* e); @@ -280,6 +282,29 @@ bool LRUCache::Unref(LRUHandle* e) { // Call deleter and free +void LRUCache::EraseUnRefEntries() { + autovector last_reference_list; + { + MutexLock l(&mutex_); + while (lru_.next != &lru_) { + LRUHandle* old = lru_.next; + assert(old->in_cache); + assert(old->refs == + 1); // LRU list contains elements which may be evicted + LRU_Remove(old); + table_.Remove(old->key(), old->hash); + old->in_cache = false; + Unref(old); + usage_ -= old->charge; + last_reference_list.push_back(old); + } + } + + for (auto entry : last_reference_list) { + entry->Free(); + } +} + void LRUCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t), bool thread_safe) { if (thread_safe) { @@ -615,6 +640,13 @@ class ShardedLRUCache : public Cache { shards_[s].ApplyToAllCacheEntries(callback, thread_safe); } } + + virtual void EraseUnRefEntries() override { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + shards_[s].EraseUnRefEntries(); + } + } }; } // end anonymous namespace diff --git a/util/options_helper.h b/util/options_helper.h index b0864442c..f2e4878b1 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -479,54 +479,61 @@ static std::unordered_map cf_options_type_info = { 
{offsetof(struct ColumnFamilyOptions, compaction_style), OptionType::kCompactionStyle, OptionVerificationType::kNormal}}}; -static std::unordered_map block_based_table_type_info = { - /* currently not supported - std::shared_ptr block_cache = nullptr; - std::shared_ptr block_cache_compressed = nullptr; - */ - {"flush_block_policy_factory", - {offsetof(struct BlockBasedTableOptions, flush_block_policy_factory), - OptionType::kFlushBlockPolicyFactory, OptionVerificationType::kByName}}, - {"cache_index_and_filter_blocks", - {offsetof(struct BlockBasedTableOptions, cache_index_and_filter_blocks), - OptionType::kBoolean, OptionVerificationType::kNormal}}, - {"index_type", - {offsetof(struct BlockBasedTableOptions, index_type), - OptionType::kBlockBasedTableIndexType, OptionVerificationType::kNormal}}, - {"hash_index_allow_collision", - {offsetof(struct BlockBasedTableOptions, hash_index_allow_collision), - OptionType::kBoolean, OptionVerificationType::kNormal}}, - {"checksum", - {offsetof(struct BlockBasedTableOptions, checksum), - OptionType::kChecksumType, OptionVerificationType::kNormal}}, - {"no_block_cache", - {offsetof(struct BlockBasedTableOptions, no_block_cache), - OptionType::kBoolean, OptionVerificationType::kNormal}}, - {"block_size", - {offsetof(struct BlockBasedTableOptions, block_size), OptionType::kSizeT, - OptionVerificationType::kNormal}}, - {"block_size_deviation", - {offsetof(struct BlockBasedTableOptions, block_size_deviation), - OptionType::kInt, OptionVerificationType::kNormal}}, - {"block_restart_interval", - {offsetof(struct BlockBasedTableOptions, block_restart_interval), - OptionType::kInt, OptionVerificationType::kNormal}}, - {"index_block_restart_interval", - {offsetof(struct BlockBasedTableOptions, index_block_restart_interval), - OptionType::kInt, OptionVerificationType::kNormal}}, - {"filter_policy", - {offsetof(struct BlockBasedTableOptions, filter_policy), - OptionType::kFilterPolicy, OptionVerificationType::kByName}}, - {"whole_key_filtering", - {offsetof(struct BlockBasedTableOptions, whole_key_filtering), - OptionType::kBoolean, OptionVerificationType::kNormal}}, - {"skip_table_builder_flush", - {offsetof(struct BlockBasedTableOptions, skip_table_builder_flush), - OptionType::kBoolean, OptionVerificationType::kNormal}}, - {"format_version", - {offsetof(struct BlockBasedTableOptions, format_version), - OptionType::kUInt32T, OptionVerificationType::kNormal}}}; +static std::unordered_map + block_based_table_type_info = { + /* currently not supported + std::shared_ptr block_cache = nullptr; + std::shared_ptr block_cache_compressed = nullptr; + */ + {"flush_block_policy_factory", + {offsetof(struct BlockBasedTableOptions, flush_block_policy_factory), + OptionType::kFlushBlockPolicyFactory, + OptionVerificationType::kByName}}, + {"cache_index_and_filter_blocks", + {offsetof(struct BlockBasedTableOptions, + cache_index_and_filter_blocks), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"pin_l0_filter_and_index_blocks_in_cache", + {offsetof(struct BlockBasedTableOptions, + pin_l0_filter_and_index_blocks_in_cache), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"index_type", + {offsetof(struct BlockBasedTableOptions, index_type), + OptionType::kBlockBasedTableIndexType, + OptionVerificationType::kNormal}}, + {"hash_index_allow_collision", + {offsetof(struct BlockBasedTableOptions, hash_index_allow_collision), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"checksum", + {offsetof(struct BlockBasedTableOptions, checksum), + 
OptionType::kChecksumType, OptionVerificationType::kNormal}}, + {"no_block_cache", + {offsetof(struct BlockBasedTableOptions, no_block_cache), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"block_size", + {offsetof(struct BlockBasedTableOptions, block_size), + OptionType::kSizeT, OptionVerificationType::kNormal}}, + {"block_size_deviation", + {offsetof(struct BlockBasedTableOptions, block_size_deviation), + OptionType::kInt, OptionVerificationType::kNormal}}, + {"block_restart_interval", + {offsetof(struct BlockBasedTableOptions, block_restart_interval), + OptionType::kInt, OptionVerificationType::kNormal}}, + {"index_block_restart_interval", + {offsetof(struct BlockBasedTableOptions, index_block_restart_interval), + OptionType::kInt, OptionVerificationType::kNormal}}, + {"filter_policy", + {offsetof(struct BlockBasedTableOptions, filter_policy), + OptionType::kFilterPolicy, OptionVerificationType::kByName}}, + {"whole_key_filtering", + {offsetof(struct BlockBasedTableOptions, whole_key_filtering), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"skip_table_builder_flush", + {offsetof(struct BlockBasedTableOptions, skip_table_builder_flush), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"format_version", + {offsetof(struct BlockBasedTableOptions, format_version), + OptionType::kUInt32T, OptionVerificationType::kNormal}}}; static std::unordered_map plain_table_type_info = { {"user_key_len", diff --git a/util/options_test.cc b/util/options_test.cc index 7076ea41e..bb5cc96e4 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -1602,7 +1602,9 @@ TEST_F(OptionsParserTest, BlockBasedTableOptionsAllFieldsSettable) { // Need to update the option string if a new option is added. ASSERT_OK(GetBlockBasedTableOptionsFromString( *bbto, - "cache_index_and_filter_blocks=1;index_type=kHashSearch;" + "cache_index_and_filter_blocks=1;" + "pin_l0_filter_and_index_blocks_in_cache=1;" + "index_type=kHashSearch;" "checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;" "block_cache=1M;block_cache_compressed=1k;block_size=1024;" "block_size_deviation=8;block_restart_interval=4; " diff --git a/util/testutil.cc b/util/testutil.cc index b8190faf7..8c587511f 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -193,6 +193,7 @@ const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined) { BlockBasedTableOptions RandomBlockBasedTableOptions(Random* rnd) { BlockBasedTableOptions opt; opt.cache_index_and_filter_blocks = rnd->Uniform(2); + opt.pin_l0_filter_and_index_blocks_in_cache = rnd->Uniform(2); opt.index_type = rnd->Uniform(2) ? BlockBasedTableOptions::kBinarySearch : BlockBasedTableOptions::kHashSearch; opt.hash_index_allow_collision = rnd->Uniform(2);
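Usage notes (for context, not part of the patch). A minimal sketch of how the new option is enabled from C++, mirroring the setup in PinL0IndexAndFilterBlocksTest above; `pin_l0_filter_and_index_blocks_in_cache` only takes effect when `cache_index_and_filter_blocks` is also set, and SanitizeOptions() above rejects it when the block cache is disabled. The DB path and the cache/filter sizes here are arbitrary, chosen only for illustration:

```cpp
#include <cassert>

#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

int main() {
  rocksdb::BlockBasedTableOptions table_options;
  // Required: index/filter blocks must be routed through the block cache
  // for pinning to apply at all.
  table_options.cache_index_and_filter_blocks = true;
  // New in this patch: L0 table readers keep their index/filter cache
  // handles checked out, so those blocks cannot be evicted while the
  // reader is alive.
  table_options.pin_l0_filter_and_index_blocks_in_cache = true;
  table_options.block_cache = rocksdb::NewLRUCache(64 << 20);
  table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10));

  rocksdb::Options options;
  options.create_if_missing = true;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/pin_l0_example", &db);
  assert(s.ok());
  // ... reads of L0 data are now served from pinned index/filter blocks ...
  delete db;  // Close() releases the pinned handles before the cache dies.
  return 0;
}
```

The same switch is exposed through the other surfaces this patch touches: `rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache()` in the C API, `BlockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache()` in Java, `pin_l0_filter_and_index_blocks_in_cache` in option files, and `--pin_l0_filter_and_index_blocks_in_cache` in db_bench.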
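Also for context, a sketch of the contract behind the new Cache::EraseUnRefEntries() call in DBImpl::~DBImpl above: only entries with no outstanding references are freed, while entries whose handles are still checked out (the way pinned L0 index/filter blocks are) survive. This assumes the Insert()/Lookup() signatures of this vintage of the Cache interface, where Insert returns a Handle*:

```cpp
#include <cassert>
#include <memory>
#include <string>

#include "rocksdb/cache.h"

// Deleter the cache invokes when an entry is finally freed.
static void DeleteString(const rocksdb::Slice& /*key*/, void* value) {
  delete static_cast<std::string*>(value);
}

int main() {
  std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(1 << 20);

  // Keep a reference to one entry; release the other right away.
  rocksdb::Cache::Handle* pinned =
      cache->Insert("pinned", new std::string("p"), 1, &DeleteString);
  rocksdb::Cache::Handle* transient =
      cache->Insert("transient", new std::string("t"), 1, &DeleteString);
  cache->Release(transient);

  // Frees only the unreferenced entry; the referenced one must survive,
  // just as handles still held by the version set survive the call in
  // ~DBImpl.
  cache->EraseUnRefEntries();
  assert(cache->Lookup("transient") == nullptr);

  rocksdb::Cache::Handle* again = cache->Lookup("pinned");
  assert(again != nullptr);
  cache->Release(again);
  cache->Release(pinned);  // Drop the last reference before destruction.
  return 0;
}
```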
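Finally, the pinning is observable from application code with the same statistics tickers the new tests assert on: after a flush, repeated reads of L0 keys should leave the index/filter miss and hit counters flat, because pinned blocks are served without going back to the block cache. A sketch of such a probe (it assumes `options.statistics = rocksdb::CreateDBStatistics()` was set before opening the DB):

```cpp
#include <iostream>

#include "rocksdb/options.h"
#include "rocksdb/statistics.h"

// Print the index/filter block cache counters; with pinning enabled these
// should not move between repeated reads of level-0 data.
void PrintIndexFilterCounters(const rocksdb::Options& options) {
  rocksdb::Statistics* stats = options.statistics.get();
  std::cout << "filter miss: "
            << stats->getTickerCount(rocksdb::BLOCK_CACHE_FILTER_MISS)
            << " filter hit: "
            << stats->getTickerCount(rocksdb::BLOCK_CACHE_FILTER_HIT)
            << " index miss: "
            << stats->getTickerCount(rocksdb::BLOCK_CACHE_INDEX_MISS)
            << " index hit: "
            << stats->getTickerCount(rocksdb::BLOCK_CACHE_INDEX_HIT)
            << std::endl;
}
```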