From 9b51987521bd7256db06a194863963457ed92648 Mon Sep 17 00:00:00 2001 From: Marton Trencseni Date: Fri, 1 Apr 2016 10:42:39 -0700 Subject: [PATCH] Adding pin_l0_filter_and_index_blocks_in_cache feature and related fixes. Summary: When a block based table file is opened, if prefetch_index_and_filter is true, it will prefetch the index and filter blocks, putting them into the block cache. What this feature adds: when a L0 block based table file is opened, if pin_l0_filter_and_index_blocks_in_cache is true in the options (and prefetch_index_and_filter is true), then the filter and index blocks aren't released back to the block cache at the end of BlockBasedTableReader::Open(). Instead the table reader takes ownership of them, hence pinning them, ie. the LRU cache will never push them out. Meanwhile in the table reader, further accesses will not hit the block cache, thus avoiding lock contention. Test Plan: 'export TEST_TMPDIR=/dev/shm/ && DISABLE_JEMALLOC=1 OPT=-g make all valgrind_check -j32' is OK. I didn't run the Java tests, I don't have Java set up on my devserver. Reviewers: sdong Reviewed By: sdong Subscribers: andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D56133 --- db/builder.cc | 5 +- db/builder.h | 2 +- db/c.cc | 5 + db/column_family.h | 2 + db/db_impl.cc | 15 +++ db/db_test2.cc | 118 ++++++++++++++++ db/flush_job.cc | 16 +-- db/table_cache.cc | 28 ++-- db/table_cache.h | 15 ++- db/version_builder.cc | 10 +- db/version_set.cc | 35 +++-- examples/rocksdb_option_file_example.ini | 1 + include/rocksdb/c.h | 3 + include/rocksdb/cache.h | 4 + include/rocksdb/table.h | 6 + java/rocksjni/table.cc | 5 +- .../org/rocksdb/BlockBasedTableConfig.java | 33 ++++- table/block_based_table_factory.cc | 12 +- table/block_based_table_reader.cc | 126 +++++++++++++----- table/block_based_table_reader.h | 9 +- table/table_builder.h | 7 +- table/table_reader.h | 2 + table/table_test.cc | 23 +++- tools/benchmark.sh | 1 + tools/db_bench_tool.cc | 5 + tools/sst_dump_tool_imp.h | 8 +- util/cache.cc | 32 +++++ util/options_helper.h | 103 +++++++------- util/options_test.cc | 4 +- util/testutil.cc | 1 + 30 files changed, 504 insertions(+), 132 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index ae6015003..317c9b054 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -63,7 +63,7 @@ Status BuildTable( const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, const Env::IOPriority io_priority, - TableProperties* table_properties) { + TableProperties* table_properties, int level) { // Reports the IOStats for flush for every following bytes. const size_t kReportFlushIOStatsEvery = 1048576; Status s; @@ -149,7 +149,8 @@ Status BuildTable( ReadOptions(), env_options, internal_comparator, meta->fd, nullptr, (internal_stats == nullptr) ? nullptr : internal_stats->GetFileReadHist(0), - false)); + false /* for_compaction */, nullptr /* arena */, + false /* skip_filter */, level)); s = it->status(); if (s.ok() && paranoid_file_checks) { for (it->SeekToFirst(); it->Valid(); it->Next()) { diff --git a/db/builder.h b/db/builder.h index b4b72b7d7..1eba6da9c 100644 --- a/db/builder.h +++ b/db/builder.h @@ -61,6 +61,6 @@ extern Status BuildTable( const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, const Env::IOPriority io_priority = Env::IO_HIGH, - TableProperties* table_properties = nullptr); + TableProperties* table_properties = nullptr, int level = -1); } // namespace rocksdb diff --git a/db/c.cc b/db/c.cc index 85e911491..9f49aba23 100644 --- a/db/c.cc +++ b/db/c.cc @@ -1288,6 +1288,11 @@ void rocksdb_block_based_options_set_cache_index_and_filter_blocks( options->rep.cache_index_and_filter_blocks = v; } +void rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache( + rocksdb_block_based_table_options_t* options, unsigned char v) { + options->rep.pin_l0_filter_and_index_blocks_in_cache = v; +} + void rocksdb_block_based_options_set_skip_table_builder_flush( rocksdb_block_based_table_options_t* options, unsigned char v) { options->rep.skip_table_builder_flush = v; diff --git a/db/column_family.h b/db/column_family.h index ce5f409c4..1a4036e60 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -465,6 +465,8 @@ class ColumnFamilySet { // Don't call while iterating over ColumnFamilySet void FreeDeadColumnFamilies(); + Cache* get_table_cache() { return table_cache_; } + private: friend class ColumnFamilyData; // helper function that gets called from cfd destructor diff --git a/db/db_impl.cc b/db/db_impl.cc index 6124313b8..db5390bca 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -405,6 +405,21 @@ DBImpl::~DBImpl() { } logs_.clear(); + // Table cache may have table handles holding blocks from the block cache. + // We need to release them before the block cache is destroyed. The block + // cache may be destroyed inside versions_.reset(), when column family data + // list is destroyed, so leaving handles in table cache after + // versions_.reset() may cause issues. + // Here we clean all unreferenced handles in table cache. + // Now we assume all user queries have finished, so only version set itself + // can possibly hold the blocks from block cache. After releasing unreferenced + // handles here, only handles held by version set left and inside + // versions_.reset(), we will release them. There, we need to make sure every + // time a handle is released, we erase it from the cache too. By doing that, + // we can guarantee that after versions_.reset(), table cache is empty + // so the cache can be safely destroyed. + table_cache_->EraseUnRefEntries(); + // versions need to be destroyed before table_cache since it can hold // references to table_cache. versions_.reset(); diff --git a/db/db_test2.cc b/db/db_test2.cc index f7d18dae2..595b8bede 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -13,6 +13,11 @@ namespace rocksdb { +static uint64_t TestGetTickerCount(const Options& options, + Tickers ticker_type) { + return options.statistics->getTickerCount(ticker_type); +} + class DBTest2 : public DBTestBase { public: DBTest2() : DBTestBase("/db_test2") {} @@ -675,6 +680,119 @@ TEST_F(DBTest2, DISABLED_FirstSnapshotTest) { db_->ReleaseSnapshot(s1); } + +class PinL0IndexAndFilterBlocksTest : public DBTestBase, + public testing::WithParamInterface { + public: + PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {} + virtual void SetUp() override { infinite_max_files_ = GetParam(); } + + bool infinite_max_files_; +}; + +TEST_P(PinL0IndexAndFilterBlocksTest, + IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) { + Options options = CurrentOptions(); + if (infinite_max_files_) { + options.max_open_files = -1; + } + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.cache_index_and_filter_blocks = true; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + table_options.filter_policy.reset(NewBloomFilterPolicy(20)); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + CreateAndReopenWithCF({"pikachu"}, options); + + ASSERT_OK(Put(1, "key", "val")); + // Create a new table. + ASSERT_OK(Flush(1)); + + // index/filter blocks added to block cache right after table creation. + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); + + // only index/filter were added + ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS)); + + std::string value; + // Miss and hit count should remain the same, they're all pinned. + db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); + + // Miss and hit count should remain the same, they're all pinned. + value = Get(1, "key"); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); +} + +TEST_P(PinL0IndexAndFilterBlocksTest, + MultiLevelIndexAndFilterBlocksCachedWithPinning) { + Options options = CurrentOptions(); + if (infinite_max_files_) { + options.max_open_files = -1; + } + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.cache_index_and_filter_blocks = true; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + table_options.filter_policy.reset(NewBloomFilterPolicy(20)); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + CreateAndReopenWithCF({"pikachu"}, options); + + Put(1, "a", "begin"); + Put(1, "z", "end"); + ASSERT_OK(Flush(1)); + // move this table to L1 + dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); + + // reset block cache + table_options.block_cache = NewLRUCache(64 * 1024); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + TryReopenWithColumnFamilies({"default", "pikachu"}, options); + // create new table at L0 + Put(1, "a2", "begin2"); + Put(1, "z2", "end2"); + ASSERT_OK(Flush(1)); + + table_options.block_cache->EraseUnRefEntries(); + + // get base cache values + uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS); + uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT); + uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS); + uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT); + + std::string value; + // this should be read from L0 + // so cache values don't change + value = Get(1, "a2"); + ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); + + // this should be read from L1 + // the file is opened, prefetching results in a cache filter miss + // the block is loaded and added to the cache, + // then the get results in a cache hit for L1 + value = Get(1, "a"); + ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); +} + +INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest, + PinL0IndexAndFilterBlocksTest, ::testing::Bool()); } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/flush_job.cc b/db/flush_job.cc index b83f9dbe6..b4e5b307f 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -234,14 +234,14 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", &output_compression_); - s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_, - cfd_->table_cache(), iter.get(), meta, - cfd_->internal_comparator(), - cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), - existing_snapshots_, earliest_write_conflict_snapshot_, - output_compression_, cfd_->ioptions()->compression_opts, - mutable_cf_options_.paranoid_file_checks, - cfd_->internal_stats(), Env::IO_HIGH, &table_properties_); + s = BuildTable( + dbname_, db_options_.env, *cfd_->ioptions(), env_options_, + cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(), + cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), + existing_snapshots_, earliest_write_conflict_snapshot_, + output_compression_, cfd_->ioptions()->compression_opts, + mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), + Env::IO_HIGH, &table_properties_, 0 /* level */); info.table_properties = table_properties_; LogFlush(db_options_.info_log); } diff --git a/db/table_cache.cc b/db/table_cache.cc index 2a4621b7e..f8c81f9f9 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -88,7 +88,7 @@ Status TableCache::GetTableReader( const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist, - unique_ptr* table_reader, bool skip_filters) { + unique_ptr* table_reader, bool skip_filters, int level) { std::string fname = TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); unique_ptr file; @@ -109,18 +109,26 @@ Status TableCache::GetTableReader( file_read_hist)); s = ioptions_.table_factory->NewTableReader( TableReaderOptions(ioptions_, env_options, internal_comparator, - skip_filters), + skip_filters, level), std::move(file_reader), fd.GetFileSize(), table_reader); TEST_SYNC_POINT("TableCache::GetTableReader:0"); } return s; } +void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) { + ReleaseHandle(handle); + uint64_t number = fd.GetNumber(); + Slice key = GetSliceForFileNumber(&number); + cache_->Erase(key); +} + Status TableCache::FindTable(const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, Cache::Handle** handle, const bool no_io, bool record_read_stats, - HistogramImpl* file_read_hist, bool skip_filters) { + HistogramImpl* file_read_hist, bool skip_filters, + int level) { PERF_TIMER_GUARD(find_table_nanos); Status s; uint64_t number = fd.GetNumber(); @@ -136,7 +144,7 @@ Status TableCache::FindTable(const EnvOptions& env_options, unique_ptr table_reader; s = GetTableReader(env_options, internal_comparator, fd, false /* sequential mode */, record_read_stats, - file_read_hist, &table_reader, skip_filters); + file_read_hist, &table_reader, skip_filters, level); if (!s.ok()) { assert(table_reader == nullptr); RecordTick(ioptions_.statistics, NO_FILE_ERRORS); @@ -158,7 +166,7 @@ InternalIterator* TableCache::NewIterator( const ReadOptions& options, const EnvOptions& env_options, const InternalKeyComparator& icomparator, const FileDescriptor& fd, TableReader** table_reader_ptr, HistogramImpl* file_read_hist, - bool for_compaction, Arena* arena, bool skip_filters) { + bool for_compaction, Arena* arena, bool skip_filters, int level) { PERF_TIMER_GUARD(new_table_iterator_nanos); if (table_reader_ptr != nullptr) { @@ -173,7 +181,8 @@ InternalIterator* TableCache::NewIterator( unique_ptr table_reader_unique_ptr; Status s = GetTableReader( env_options, icomparator, fd, /* sequential mode */ true, - /* record stats */ false, nullptr, &table_reader_unique_ptr); + /* record stats */ false, nullptr, &table_reader_unique_ptr, + false /* skip_filters */, level); if (!s.ok()) { return NewErrorInternalIterator(s, arena); } @@ -184,7 +193,7 @@ InternalIterator* TableCache::NewIterator( Status s = FindTable(env_options, icomparator, fd, &handle, options.read_tier == kBlockCacheTier /* no_io */, !for_compaction /* record read_stats */, - file_read_hist, skip_filters); + file_read_hist, skip_filters, level); if (!s.ok()) { return NewErrorInternalIterator(s, arena); } @@ -216,7 +225,7 @@ Status TableCache::Get(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, const Slice& k, GetContext* get_context, HistogramImpl* file_read_hist, - bool skip_filters) { + bool skip_filters, int level) { TableReader* t = fd.table_reader; Status s; Cache::Handle* handle = nullptr; @@ -265,7 +274,8 @@ Status TableCache::Get(const ReadOptions& options, if (!t) { s = FindTable(env_options_, internal_comparator, fd, &handle, options.read_tier == kBlockCacheTier /* no_io */, - true /* record_read_stats */, file_read_hist, skip_filters); + true /* record_read_stats */, file_read_hist, skip_filters, + level); if (s.ok()) { t = GetTableReaderFromHandle(handle); } diff --git a/db/table_cache.h b/db/table_cache.h index f8416e0b4..fbb7cacbf 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -45,34 +45,41 @@ class TableCache { // the cache and should not be deleted, and is valid for as long as the // returned iterator is live. // @param skip_filters Disables loading/accessing the filter block + // @param level The level this table is at, -1 for "not set / don't know" InternalIterator* NewIterator( const ReadOptions& options, const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr, HistogramImpl* file_read_hist = nullptr, bool for_compaction = false, - Arena* arena = nullptr, bool skip_filters = false); + Arena* arena = nullptr, bool skip_filters = false, int level = -1); // If a seek to internal key "k" in specified file finds an entry, // call (*handle_result)(arg, found_key, found_value) repeatedly until // it returns false. // @param skip_filters Disables loading/accessing the filter block + // @param level The level this table is at, -1 for "not set / don't know" Status Get(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, const Slice& k, GetContext* get_context, HistogramImpl* file_read_hist = nullptr, - bool skip_filters = false); + bool skip_filters = false, int level = -1); // Evict any entry for the specified file number static void Evict(Cache* cache, uint64_t file_number); + // Clean table handle and erase it from the table cache + // Used in DB close, or the file is not live anymore. + void EraseHandle(const FileDescriptor& fd, Cache::Handle* handle); + // Find table reader // @param skip_filters Disables loading/accessing the filter block + // @param level == -1 means not specified Status FindTable(const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, Cache::Handle**, const bool no_io = false, bool record_read_stats = true, HistogramImpl* file_read_hist = nullptr, - bool skip_filters = false); + bool skip_filters = false, int level = -1); // Get TableReader from a cache handle. TableReader* GetTableReaderFromHandle(Cache::Handle* handle); @@ -106,7 +113,7 @@ class TableCache { const FileDescriptor& fd, bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist, unique_ptr* table_reader, - bool skip_filters = false); + bool skip_filters = false, int level = -1); const ImmutableCFOptions& ioptions_; const EnvOptions& env_options_; diff --git a/db/version_builder.cc b/db/version_builder.cc index d0e7640fd..d73b33d8e 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -309,11 +309,11 @@ class VersionBuilder::Rep { auto* file_meta = files_meta[file_idx].first; int level = files_meta[file_idx].second; - table_cache_->FindTable(env_options_, - *(base_vstorage_->InternalComparator()), - file_meta->fd, &file_meta->table_reader_handle, - false /*no_io */, true /* record_read_stats */, - internal_stats->GetFileReadHist(level)); + table_cache_->FindTable( + env_options_, *(base_vstorage_->InternalComparator()), + file_meta->fd, &file_meta->table_reader_handle, false /*no_io */, + true /* record_read_stats */, + internal_stats->GetFileReadHist(level), false, level); if (file_meta->table_reader_handle != nullptr) { // Load table_reader file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle( diff --git a/db/version_set.cc b/db/version_set.cc index 75140ed42..f1dbb94e0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -92,6 +92,7 @@ class FilePicker { const InternalKeyComparator* internal_comparator) : num_levels_(num_levels), curr_level_(static_cast(-1)), + returned_file_level_(static_cast(-1)), hit_file_level_(static_cast(-1)), search_left_bound_(0), search_right_bound_(FileIndexer::kLevelMaxIndex), @@ -118,6 +119,8 @@ class FilePicker { } } + int GetCurrentLevel() { return returned_file_level_; } + FdWithKeyRange* GetNextFile() { while (!search_ended_) { // Loops over different levels. while (curr_index_in_curr_level_ < curr_file_level_->num_files) { @@ -190,6 +193,7 @@ class FilePicker { } prev_file_ = f; #endif + returned_file_level_ = curr_level_; if (curr_level_ > 0 && cmp_largest < 0) { // No more files to search in this level. search_ended_ = !PrepareNextLevel(); @@ -216,6 +220,7 @@ class FilePicker { private: unsigned int num_levels_; unsigned int curr_level_; + unsigned int returned_file_level_; unsigned int hit_file_level_; int32_t search_left_bound_; int32_t search_right_bound_; @@ -322,7 +327,7 @@ Version::~Version() { f->refs--; if (f->refs <= 0) { if (f->table_reader_handle) { - cfd_->table_cache()->ReleaseHandle(f->table_reader_handle); + cfd_->table_cache()->EraseHandle(f->fd, f->table_reader_handle); f->table_reader_handle = nullptr; } vset_->obsolete_files_.push_back(f); @@ -486,7 +491,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { const EnvOptions& env_options, const InternalKeyComparator& icomparator, HistogramImpl* file_read_hist, bool for_compaction, - bool prefix_enabled, bool skip_filters) + bool prefix_enabled, bool skip_filters, int level) : TwoLevelIteratorState(prefix_enabled), table_cache_(table_cache), read_options_(read_options), @@ -494,7 +499,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState { icomparator_(icomparator), file_read_hist_(file_read_hist), for_compaction_(for_compaction), - skip_filters_(skip_filters) {} + skip_filters_(skip_filters), + level_(level) {} InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override { if (meta_handle.size() != sizeof(FileDescriptor)) { @@ -506,7 +512,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { return table_cache_->NewIterator( read_options_, env_options_, icomparator_, *fd, nullptr /* don't need reference to table*/, file_read_hist_, - for_compaction_, nullptr /* arena */, skip_filters_); + for_compaction_, nullptr /* arena */, skip_filters_, level_); } } @@ -522,6 +528,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { HistogramImpl* file_read_hist_; bool for_compaction_; bool skip_filters_; + int level_; }; // A wrapper of version builder which references the current version in @@ -789,7 +796,8 @@ void Version::AddIterators(const ReadOptions& read_options, const auto& file = storage_info_.LevelFilesBrief(0).files[i]; merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr, - cfd_->internal_stats()->GetFileReadHist(0), false, arena)); + cfd_->internal_stats()->GetFileReadHist(0), false, arena, + false /* skip_filters */, 0 /* level */)); } // For levels > 0, we can use a concatenating iterator that sequentially @@ -804,7 +812,7 @@ void Version::AddIterators(const ReadOptions& read_options, cfd_->internal_stats()->GetFileReadHist(level), false /* for_compaction */, cfd_->ioptions()->prefix_extractor != nullptr, - IsFilterSkipped(level)); + IsFilterSkipped(level), level); mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); auto* first_level_iter = new (mem) LevelFileNumIterator( cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level)); @@ -909,7 +917,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, read_options, *internal_comparator(), f->fd, ikey, &get_context, cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), IsFilterSkipped(static_cast(fp.GetHitFileLevel()), - fp.IsHitFileLastInLevel())); + fp.IsHitFileLastInLevel()), + fp.GetCurrentLevel()); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; @@ -2062,9 +2071,16 @@ VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options, env_options_(storage_options), env_options_compactions_(env_options_) {} +void CloseTables(void* ptr, size_t) { + TableReader* table_reader = reinterpret_cast(ptr); + table_reader->Close(); +} + VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on // VersionSet + column_family_set_->get_table_cache()->ApplyToAllCacheEntries(&CloseTables, + false); column_family_set_.reset(); for (auto file : obsolete_files_) { delete file; @@ -3275,7 +3291,8 @@ InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) { read_options, env_options_compactions_, cfd->internal_comparator(), flevel->files[i].fd, nullptr, nullptr, /* no per level latency histogram*/ - true /* for compaction */); + true /* for_compaction */, nullptr /* arena */, + false /* skip_filters */, (int)which /* level */); } } else { // Create concatenating iterator for the files from this level @@ -3285,7 +3302,7 @@ InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) { cfd->internal_comparator(), nullptr /* no per level latency histogram */, true /* for_compaction */, false /* prefix enabled */, - false /* skip_filters */), + false /* skip_filters */, (int)which /* level */), new LevelFileNumIterator(cfd->internal_comparator(), c->input_levels(which))); } diff --git a/examples/rocksdb_option_file_example.ini b/examples/rocksdb_option_file_example.ini index 838afe8eb..7dc070429 100644 --- a/examples/rocksdb_option_file_example.ini +++ b/examples/rocksdb_option_file_example.ini @@ -138,6 +138,7 @@ block_size=8192 block_restart_interval=16 cache_index_and_filter_blocks=false + pin_l0_filter_and_index_blocks_in_cache=false index_type=kBinarySearch hash_index_allow_collision=true flush_block_policy_factory=FlushBlockBySizePolicyFactory diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 4bb870e20..6e52d20af 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -451,6 +451,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_cache_index_and_filter_blocks( rocksdb_block_based_table_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void +rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache( + rocksdb_block_based_table_options_t*, unsigned char); +extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_skip_table_builder_flush( rocksdb_block_based_table_options_t* options, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_block_based_table_factory( diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 327270e34..638928ff5 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -146,6 +146,10 @@ class Cache { virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), bool thread_safe) = 0; + // Remove all entries. + // Prerequisit: no entry is referenced. + virtual void EraseUnRefEntries() = 0; + private: void LRU_Remove(Handle* e); void LRU_Append(Handle* e); diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index cb4d850e8..8aba3a153 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -64,6 +64,12 @@ struct BlockBasedTableOptions { // block during table initialization. bool cache_index_and_filter_blocks = false; + // if cache_index_and_filter_blocks is true and the below is true, then + // filter and index blocks are stored in the cache, but a reference is + // held in the "table reader" object so the blocks are pinned and only + // evicted from cache when the table reader is freed. + bool pin_l0_filter_and_index_blocks_in_cache = false; + // The index type that will be used for this table. enum IndexType : char { // A space efficient index block that is optimized for diff --git a/java/rocksjni/table.cc b/java/rocksjni/table.cc index 97aef9888..204d1ba38 100644 --- a/java/rocksjni/table.cc +++ b/java/rocksjni/table.cc @@ -38,13 +38,14 @@ jlong Java_org_rocksdb_PlainTableConfig_newTableFactoryHandle( /* * Class: org_rocksdb_BlockBasedTableConfig * Method: newTableFactoryHandle - * Signature: (ZJIJIIZIZZJIBBI)J + * Signature: (ZJIJIIZIZZZJIBBI)J */ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle( JNIEnv* env, jobject jobj, jboolean no_block_cache, jlong block_cache_size, jint block_cache_num_shardbits, jlong block_size, jint block_size_deviation, jint block_restart_interval, jboolean whole_key_filtering, jlong jfilterPolicy, jboolean cache_index_and_filter_blocks, + jboolean pin_l0_filter_and_index_blocks_in_cache, jboolean hash_index_allow_collision, jlong block_cache_compressed_size, jint block_cache_compressd_num_shard_bits, jbyte jchecksum_type, jbyte jindex_type, jint jformat_version) { @@ -70,6 +71,8 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle( options.filter_policy = *pFilterPolicy; } options.cache_index_and_filter_blocks = cache_index_and_filter_blocks; + options.pin_l0_filter_and_index_blocks_in_cache = + pin_l0_filter_and_index_blocks_in_cache; options.hash_index_allow_collision = hash_index_allow_collision; if (block_cache_compressed_size > 0) { if (block_cache_compressd_num_shard_bits > 0) { diff --git a/java/src/main/java/org/rocksdb/BlockBasedTableConfig.java b/java/src/main/java/org/rocksdb/BlockBasedTableConfig.java index f569e6f42..050eff1c8 100644 --- a/java/src/main/java/org/rocksdb/BlockBasedTableConfig.java +++ b/java/src/main/java/org/rocksdb/BlockBasedTableConfig.java @@ -21,6 +21,7 @@ public class BlockBasedTableConfig extends TableFormatConfig { wholeKeyFiltering_ = true; filter_ = null; cacheIndexAndFilterBlocks_ = false; + pinL0FilterAndIndexBlocksInCache_ = false; hashIndexAllowCollision_ = true; blockCacheCompressedSize_ = 0; blockCacheCompressedNumShardBits_ = 0; @@ -226,6 +227,29 @@ public class BlockBasedTableConfig extends TableFormatConfig { return this; } + /** + * Indicating if we'd like to pin L0 index/filter blocks to the block cache. + If not specified, defaults to false. + * + * @return if L0 index and filter blocks should be pinned to the block cache. + */ + public boolean pinL0FilterAndIndexBlocksInCache() { + return pinL0FilterAndIndexBlocksInCache_; + } + + /** + * Indicating if we'd like to pin L0 index/filter blocks to the block cache. + If not specified, defaults to false. + * + * @param pinL0FilterAndIndexBlocksInCache pin blocks in block cache + * @return the reference to the current config. + */ + public BlockBasedTableConfig setPinL0FilterAndIndexBlocksInCache( + final boolean pinL0FilterAndIndexBlocksInCache) { + pinL0FilterAndIndexBlocksInCache_ = pinL0FilterAndIndexBlocksInCache; + return this; + } + /** * Influence the behavior when kHashSearch is used. if false, stores a precise prefix to block range mapping @@ -393,6 +417,7 @@ public class BlockBasedTableConfig extends TableFormatConfig { blockCacheNumShardBits_, blockSize_, blockSizeDeviation_, blockRestartInterval_, wholeKeyFiltering_, filterHandle, cacheIndexAndFilterBlocks_, + pinL0FilterAndIndexBlocksInCache_, hashIndexAllowCollision_, blockCacheCompressedSize_, blockCacheCompressedNumShardBits_, checksumType_.getValue(), indexType_.getValue(), @@ -403,11 +428,13 @@ public class BlockBasedTableConfig extends TableFormatConfig { boolean noBlockCache, long blockCacheSize, int blockCacheNumShardBits, long blockSize, int blockSizeDeviation, int blockRestartInterval, boolean wholeKeyFiltering, long filterPolicyHandle, - boolean cacheIndexAndFilterBlocks, boolean hashIndexAllowCollision, - long blockCacheCompressedSize, int blockCacheCompressedNumShardBits, - byte checkSumType, byte indexType, int formatVersion); + boolean cacheIndexAndFilterBlocks, boolean pinL0FilterAndIndexBlocksInCache, + boolean hashIndexAllowCollision, long blockCacheCompressedSize, + int blockCacheCompressedNumShardBits, byte checkSumType, + byte indexType, int formatVersion); private boolean cacheIndexAndFilterBlocks_; + private boolean pinL0FilterAndIndexBlocksInCache_; private IndexType indexType_; private boolean hashIndexAllowCollision_; private ChecksumType checksumType_; diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 75917232d..c2617b168 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -64,7 +64,7 @@ Status BlockBasedTableFactory::NewTableReader( table_reader_options.ioptions, table_reader_options.env_options, table_options_, table_reader_options.internal_comparator, std::move(file), file_size, table_reader, prefetch_enabled, - table_reader_options.skip_filters); + table_reader_options.skip_filters, table_reader_options.level); } TableBuilder* BlockBasedTableFactory::NewTableBuilder( @@ -94,6 +94,12 @@ Status BlockBasedTableFactory::SanitizeOptions( return Status::InvalidArgument("Enable cache_index_and_filter_blocks, " ", but block cache is disabled"); } + if (table_options_.pin_l0_filter_and_index_blocks_in_cache && + table_options_.no_block_cache) { + return Status::InvalidArgument( + "Enable pin_l0_filter_and_index_blocks_in_cache, " + ", but block cache is disabled"); + } if (!BlockBasedTableSupportedVersion(table_options_.format_version)) { return Status::InvalidArgument( "Unsupported BlockBasedTable format_version. Please check " @@ -115,6 +121,10 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const { snprintf(buffer, kBufferSize, " cache_index_and_filter_blocks: %d\n", table_options_.cache_index_and_filter_blocks); ret.append(buffer); + snprintf(buffer, kBufferSize, + " pin_l0_filter_and_index_blocks_in_cache: %d\n", + table_options_.pin_l0_filter_and_index_blocks_in_cache); + ret.append(buffer); snprintf(buffer, kBufferSize, " index_type: %d\n", table_options_.index_type); ret.append(buffer); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index e48eea694..f0b192f7f 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -340,6 +340,28 @@ class HashIndexReader : public IndexReader { BlockContents prefixes_contents_; }; +// CachableEntry represents the entries that *may* be fetched from block cache. +// field `value` is the item we want to get. +// field `cache_handle` is the cache handle to the block cache. If the value +// was not read from cache, `cache_handle` will be nullptr. +template +struct BlockBasedTable::CachableEntry { + CachableEntry(TValue* _value, Cache::Handle* _cache_handle) + : value(_value), cache_handle(_cache_handle) {} + CachableEntry() : CachableEntry(nullptr, nullptr) {} + void Release(Cache* cache) { + if (cache_handle) { + cache->Release(cache_handle); + value = nullptr; + cache_handle = nullptr; + } + } + bool IsSet() const { return cache_handle != nullptr; } + + TValue* value = nullptr; + // if the entry is from the cache, cache_handle will be populated. + Cache::Handle* cache_handle = nullptr; +}; struct BlockBasedTable::Rep { Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options, @@ -394,34 +416,21 @@ struct BlockBasedTable::Rep { // and compatible with existing code, we introduce a wrapper that allows // block to extract prefix without knowing if a key is internal or not. unique_ptr internal_prefix_transform; + + // only used in level 0 files: + // when pin_l0_filter_and_index_blocks_in_cache is true, we do use the + // LRU cache, but we always keep the filter & idndex block's handle checked + // out here (=we don't call Release()), plus the parsed out objects + // the LRU cache will never push flush them out, hence they're pinned + CachableEntry filter_entry; + CachableEntry index_entry; }; BlockBasedTable::~BlockBasedTable() { + Close(); delete rep_; } -// CachableEntry represents the entries that *may* be fetched from block cache. -// field `value` is the item we want to get. -// field `cache_handle` is the cache handle to the block cache. If the value -// was not read from cache, `cache_handle` will be nullptr. -template -struct BlockBasedTable::CachableEntry { - CachableEntry(TValue* _value, Cache::Handle* _cache_handle) - : value(_value), cache_handle(_cache_handle) {} - CachableEntry() : CachableEntry(nullptr, nullptr) {} - void Release(Cache* cache) { - if (cache_handle) { - cache->Release(cache_handle); - value = nullptr; - cache_handle = nullptr; - } - } - - TValue* value = nullptr; - // if the entry is from the cache, cache_handle will be populated. - Cache::Handle* cache_handle = nullptr; -}; - // Helper function to setup the cache key's prefix for the Table. void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) { assert(kMaxCacheKeyPrefixSize >= 10); @@ -498,7 +507,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, uint64_t file_size, unique_ptr* table_reader, const bool prefetch_index_and_filter, - const bool skip_filters) { + const bool skip_filters, const int level) { table_reader->reset(); Footer footer; @@ -594,14 +603,33 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, assert(table_options.block_cache != nullptr); // Hack: Call NewIndexIterator() to implicitly add index to the // block_cache + + // if pin_l0_filter_and_index_blocks_in_cache is true and this is + // a level0 file, then we will pass in this pointer to rep->index + // to NewIndexIterator(), which will save the index block in there + // else it's a nullptr and nothing special happens + CachableEntry* index_entry = nullptr; + if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache && + level == 0) { + index_entry = &rep->index_entry; + } unique_ptr iter( - new_table->NewIndexIterator(ReadOptions())); + new_table->NewIndexIterator(ReadOptions(), nullptr, index_entry)); s = iter->status(); if (s.ok()) { // Hack: Call GetFilter() to implicitly add filter to the block_cache auto filter_entry = new_table->GetFilter(); - filter_entry.Release(table_options.block_cache.get()); + // if pin_l0_filter_and_index_blocks_in_cache is true, and this is + // a level0 file, then save it in rep_->filter_entry; it will be + // released in the destructor only, hence it will be pinned in the + // cache until this reader is alive + if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache && + level == 0) { + rep->filter_entry = filter_entry; + } else { + filter_entry.Release(table_options.block_cache.get()); + } } } else { // If we don't use block cache for index/filter blocks access, we'll @@ -886,14 +914,19 @@ BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( return {rep_->filter.get(), nullptr /* cache handle */}; } - PERF_TIMER_GUARD(read_filter_block_nanos); - Cache* block_cache = rep_->table_options.block_cache.get(); if (rep_->filter_policy == nullptr /* do not use filter */ || block_cache == nullptr /* no block cache at all */) { return {nullptr /* filter */, nullptr /* cache handle */}; } + // we have a pinned filter block + if (rep_->filter_entry.IsSet()) { + return rep_->filter_entry; + } + + PERF_TIMER_GUARD(read_filter_block_nanos); + // Fetching from the cache char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, @@ -935,12 +968,19 @@ BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( } InternalIterator* BlockBasedTable::NewIndexIterator( - const ReadOptions& read_options, BlockIter* input_iter) { + const ReadOptions& read_options, BlockIter* input_iter, + CachableEntry* index_entry) { // index reader has already been pre-populated. if (rep_->index_reader) { return rep_->index_reader->NewIterator( input_iter, read_options.total_order_seek); } + // we have a pinned index block + if (rep_->index_entry.IsSet()) { + return rep_->index_entry.value->NewIterator(input_iter, + read_options.total_order_seek); + } + PERF_TIMER_GUARD(read_index_block_nanos); bool no_io = read_options.read_tier == kBlockCacheTier; @@ -996,7 +1036,15 @@ InternalIterator* BlockBasedTable::NewIndexIterator( assert(cache_handle); auto* iter = index_reader->NewIterator( input_iter, read_options.total_order_seek); - iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); + + // the caller would like to take ownership of the index block + // don't call RegisterCleanup() in this case, the caller will take care of it + if (index_entry != nullptr) { + *index_entry = {index_reader, cache_handle}; + } else { + iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); + } + return iter; } @@ -1224,7 +1272,13 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) { RecordTick(statistics, BLOOM_FILTER_PREFIX_USEFUL); } - filter_entry.Release(rep_->table_options.block_cache.get()); + // if rep_->filter_entry is not set, we should call Release(); otherwise + // don't call, in this case we have a local copy in rep_->filter_entry, + // it's pinned to the cache and will be released in the destructor + if (!rep_->filter_entry.IsSet()) { + filter_entry.Release(rep_->table_options.block_cache.get()); + } + return may_match; } @@ -1324,7 +1378,12 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, } } - filter_entry.Release(rep_->table_options.block_cache.get()); + // if rep_->filter_entry is not set, we should call Release(); otherwise + // don't call, in this case we have a local copy in rep_->filter_entry, + // it's pinned to the cache and will be released in the destructor + if (!rep_->filter_entry.IsSet()) { + filter_entry.Release(rep_->table_options.block_cache.get()); + } return s; } @@ -1612,6 +1671,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { return s; } +void BlockBasedTable::Close() { + rep_->filter_entry.Release(rep_->table_options.block_cache.get()); + rep_->index_entry.Release(rep_->table_options.block_cache.get()); +} + Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) { out_file->Append( "Index Details:\n" diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 600ca18a3..6a88d9d9a 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -76,7 +76,7 @@ class BlockBasedTable : public TableReader { unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader, bool prefetch_index_and_filter = true, - bool skip_filters = false); + bool skip_filters = false, int level = -1); bool PrefixMayMatch(const Slice& internal_key); @@ -119,6 +119,8 @@ class BlockBasedTable : public TableReader { // convert SST file to a human readable form Status DumpTable(WritableFile* out_file) override; + void Close() override; + ~BlockBasedTable(); bool TEST_filter_block_preloaded() const; @@ -155,8 +157,9 @@ class BlockBasedTable : public TableReader { // 2. index is not present in block cache. // 3. We disallowed any io to be performed, that is, read_options == // kBlockCacheTier - InternalIterator* NewIndexIterator(const ReadOptions& read_options, - BlockIter* input_iter = nullptr); + InternalIterator* NewIndexIterator( + const ReadOptions& read_options, BlockIter* input_iter = nullptr, + CachableEntry* index_entry = nullptr); // Read block cache from block caches (if set): block_cache and // block_cache_compressed. diff --git a/table/table_builder.h b/table/table_builder.h index ed79bed0e..274245f08 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -29,17 +29,20 @@ struct TableReaderOptions { TableReaderOptions(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options, const InternalKeyComparator& _internal_comparator, - bool _skip_filters = false) + bool _skip_filters = false, int _level = -1) : ioptions(_ioptions), env_options(_env_options), internal_comparator(_internal_comparator), - skip_filters(_skip_filters) {} + skip_filters(_skip_filters), + level(_level) {} const ImmutableCFOptions& ioptions; const EnvOptions& env_options; const InternalKeyComparator& internal_comparator; // This is only used for BlockBasedTable (reader) bool skip_filters; + // what level this table/file is on, -1 for "not set, don't know" + int level; }; struct TableBuilderOptions { diff --git a/table/table_reader.h b/table/table_reader.h index 5751ab03f..c047bf8cb 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -91,6 +91,8 @@ class TableReader { virtual Status DumpTable(WritableFile* out_file) { return Status::NotSupported("DumpTable() not supported"); } + + virtual void Close() {} }; } // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index 3cc7d0dc7..1ce61cadf 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -333,6 +333,8 @@ class TableConstructor: public Constructor { return convert_to_internal_key_; } + void ResetTableReader() { table_reader_.reset(); } + private: void Reset() { uniq_id_ = 0; @@ -1017,6 +1019,7 @@ TEST_F(BlockBasedTableTest, BasicBlockBasedTableProperties) { } Slice content = block_builder.Finish(); ASSERT_EQ(content.size() + kBlockTrailerSize, props.data_size); + c.ResetTableReader(); } TEST_F(BlockBasedTableTest, FilterPolicyNameProperties) { @@ -1034,6 +1037,7 @@ TEST_F(BlockBasedTableTest, FilterPolicyNameProperties) { GetPlainInternalComparator(options.comparator), &keys, &kvmap); auto& props = *c.GetTableReader()->GetTableProperties(); ASSERT_EQ("rocksdb.BuiltinBloomFilter", props.filter_policy_name); + c.ResetTableReader(); } // @@ -1075,6 +1079,7 @@ void PrefetchRange(TableConstructor* c, Options* opt, // assert our expectation in cache warmup AssertKeysInCache(table_reader, keys_in_cache, keys_not_in_cache); + c->ResetTableReader(); } TEST_F(BlockBasedTableTest, PrefetchTest) { @@ -1102,6 +1107,7 @@ TEST_F(BlockBasedTableTest, PrefetchTest) { stl_wrappers::KVMap kvmap; const ImmutableCFOptions ioptions(opt); c.Finish(opt, ioptions, table_options, *ikc, &keys, &kvmap); + c.ResetTableReader(); // We get the following data spread : // @@ -1157,6 +1163,7 @@ TEST_F(BlockBasedTableTest, PrefetchTest) { PrefetchRange(&c, &opt, &table_options, keys, "k06", "k00", {}, {}, Status::InvalidArgument(Slice("k06 "), Slice("k07"))); + c.ResetTableReader(); } TEST_F(BlockBasedTableTest, TotalOrderSeekOnHashIndex) { @@ -1400,6 +1407,7 @@ TEST_F(TableTest, HashIndexTest) { ASSERT_TRUE(BytewiseComparator()->Compare(prefix, ukey_prefix) < 0); } } + c.ResetTableReader(); } // It's very hard to figure out the index block size of a block accurately. @@ -1440,6 +1448,7 @@ TEST_F(BlockBasedTableTest, IndexSizeStat) { auto index_size = c.GetTableReader()->GetTableProperties()->index_size; ASSERT_GT(index_size, last_index_size); last_index_size = index_size; + c.ResetTableReader(); } } @@ -1466,6 +1475,7 @@ TEST_F(BlockBasedTableTest, NumBlockStat) { GetPlainInternalComparator(options.comparator), &ks, &kvmap); ASSERT_EQ(kvmap.size(), c.GetTableReader()->GetTableProperties()->num_data_blocks); + c.ResetTableReader(); } // A simple tool that takes the snapshot of block cache statistics. @@ -1662,6 +1672,8 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { // release the iterator so that the block cache can reset correctly. iter.reset(); + c.ResetTableReader(); + // -- PART 2: Open with very small block cache // In this test, no block will ever get hit since the block cache is // too small to fit even one entry. @@ -1702,6 +1714,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ASSERT_EQ(props.GetCacheBytesRead(), 0); } iter.reset(); + c.ResetTableReader(); // -- PART 3: Open table with bloom filter enabled but not in SST file table_options.block_cache = NewLRUCache(4096); @@ -1715,7 +1728,9 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ImmutableCFOptions ioptions3(options); // Generate table without filter policy c3.Finish(options, ioptions3, table_options, - GetPlainInternalComparator(options.comparator), &keys, &kvmap); + GetPlainInternalComparator(options.comparator), &keys, &kvmap); + c3.ResetTableReader(); + // Open table with filter policy table_options.filter_policy.reset(NewBloomFilterPolicy(1)); options.table_factory.reset(new BlockBasedTableFactory(table_options)); @@ -1732,6 +1747,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ASSERT_EQ(value, "hello"); BlockCachePropertiesSnapshot props(options.statistics.get()); props.AssertFilterBlockStat(0, 0); + c3.ResetTableReader(); } void ValidateBlockSizeDeviation(int value, int expected) { @@ -1894,6 +1910,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) { for (const std::string& key : keys) { ASSERT_TRUE(table_reader->TEST_KeyInCache(ReadOptions(), key)); } + c.ResetTableReader(); // rerun with different block cache table_options.block_cache = NewLRUCache(16 * 1024 * 1024); @@ -1904,6 +1921,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) { for (const std::string& key : keys) { ASSERT_TRUE(!table_reader->TEST_KeyInCache(ReadOptions(), key)); } + c.ResetTableReader(); } // Plain table is not supported in ROCKSDB_LITE @@ -1991,6 +2009,7 @@ TEST_F(GeneralTableTest, ApproximateOffsetOfPlain) { ASSERT_TRUE(Between(c.ApproximateOffsetOf("k06"), 510000, 511000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("k07"), 510000, 511000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000)); + c.ResetTableReader(); } static void DoCompressionTest(CompressionType comp) { @@ -2017,6 +2036,7 @@ static void DoCompressionTest(CompressionType comp) { ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), 2000, 3000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), 2000, 3000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 4000, 6100)); + c.ResetTableReader(); } TEST_F(GeneralTableTest, ApproximateOffsetOfCompressed) { @@ -2342,6 +2362,7 @@ TEST_P(IndexBlockRestartIntervalTest, IndexBlockRestartInterval) { kv_iter++; } ASSERT_EQ(kv_iter, kvmap.end()); + c.ResetTableReader(); } class PrefixTest : public testing::Test { diff --git a/tools/benchmark.sh b/tools/benchmark.sh index e4729e7f3..d28aeb271 100755 --- a/tools/benchmark.sh +++ b/tools/benchmark.sh @@ -74,6 +74,7 @@ const_params=" --level_compaction_dynamic_level_bytes=true \ --bytes_per_sync=$((8 * M)) \ --cache_index_and_filter_blocks=0 \ + --pin_l0_filter_and_index_blocks_in_cache=1 \ --benchmark_write_rate_limit=$(( 1024 * 1024 * $mb_written_per_sec )) \ \ --hard_rate_limit=3 \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index e3e11e17b..2e1a83237 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -340,6 +340,9 @@ DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed" DEFINE_bool(cache_index_and_filter_blocks, false, "Cache index/filter blocks in block cache."); +DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false, + "Pin index/filter blocks of L0 files in block cache."); + DEFINE_int32(block_size, static_cast(rocksdb::BlockBasedTableOptions().block_size), "Number of bytes in a block."); @@ -2511,6 +2514,8 @@ class Benchmark { } block_based_options.cache_index_and_filter_blocks = FLAGS_cache_index_and_filter_blocks; + block_based_options.pin_l0_filter_and_index_blocks_in_cache = + FLAGS_pin_l0_filter_and_index_blocks_in_cache; block_based_options.block_cache = cache_; block_based_options.block_cache_compressed = compressed_cache_; block_based_options.block_size = FLAGS_block_size; diff --git a/tools/sst_dump_tool_imp.h b/tools/sst_dump_tool_imp.h index 6bbc4d676..b518866cc 100644 --- a/tools/sst_dump_tool_imp.h +++ b/tools/sst_dump_tool_imp.h @@ -59,12 +59,14 @@ class SstFileReader { bool output_hex_; EnvOptions soptions_; - Status init_result_; - unique_ptr table_reader_; - unique_ptr file_; // options_ and internal_comparator_ will also be used in // ReadSequential internally (specifically, seek-related operations) Options options_; + + Status init_result_; + unique_ptr table_reader_; + unique_ptr file_; + const ImmutableCFOptions ioptions_; InternalKeyComparator internal_comparator_; unique_ptr table_properties_; diff --git a/util/cache.cc b/util/cache.cc index 6015644f6..ed9ff870c 100644 --- a/util/cache.cc +++ b/util/cache.cc @@ -225,6 +225,8 @@ class LRUCache { void ApplyToAllCacheEntries(void (*callback)(void*, size_t), bool thread_safe); + void EraseUnRefEntries(); + private: void LRU_Remove(LRUHandle* e); void LRU_Append(LRUHandle* e); @@ -280,6 +282,29 @@ bool LRUCache::Unref(LRUHandle* e) { // Call deleter and free +void LRUCache::EraseUnRefEntries() { + autovector last_reference_list; + { + MutexLock l(&mutex_); + while (lru_.next != &lru_) { + LRUHandle* old = lru_.next; + assert(old->in_cache); + assert(old->refs == + 1); // LRU list contains elements which may be evicted + LRU_Remove(old); + table_.Remove(old->key(), old->hash); + old->in_cache = false; + Unref(old); + usage_ -= old->charge; + last_reference_list.push_back(old); + } + } + + for (auto entry : last_reference_list) { + entry->Free(); + } +} + void LRUCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t), bool thread_safe) { if (thread_safe) { @@ -615,6 +640,13 @@ class ShardedLRUCache : public Cache { shards_[s].ApplyToAllCacheEntries(callback, thread_safe); } } + + virtual void EraseUnRefEntries() override { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + shards_[s].EraseUnRefEntries(); + } + } }; } // end anonymous namespace diff --git a/util/options_helper.h b/util/options_helper.h index b0864442c..f2e4878b1 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -479,54 +479,61 @@ static std::unordered_map cf_options_type_info = { {offsetof(struct ColumnFamilyOptions, compaction_style), OptionType::kCompactionStyle, OptionVerificationType::kNormal}}}; -static std::unordered_map block_based_table_type_info = { - /* currently not supported - std::shared_ptr block_cache = nullptr; - std::shared_ptr block_cache_compressed = nullptr; - */ - {"flush_block_policy_factory", - {offsetof(struct BlockBasedTableOptions, flush_block_policy_factory), - OptionType::kFlushBlockPolicyFactory, OptionVerificationType::kByName}}, - {"cache_index_and_filter_blocks", - {offsetof(struct BlockBasedTableOptions, cache_index_and_filter_blocks), - OptionType::kBoolean, OptionVerificationType::kNormal}}, - {"index_type", - {offsetof(struct BlockBasedTableOptions, index_type), - OptionType::kBlockBasedTableIndexType, OptionVerificationType::kNormal}}, - {"hash_index_allow_collision", - {offsetof(struct BlockBasedTableOptions, hash_index_allow_collision), - OptionType::kBoolean, OptionVerificationType::kNormal}}, - {"checksum", - {offsetof(struct BlockBasedTableOptions, checksum), - OptionType::kChecksumType, OptionVerificationType::kNormal}}, - {"no_block_cache", - {offsetof(struct BlockBasedTableOptions, no_block_cache), - OptionType::kBoolean, OptionVerificationType::kNormal}}, - {"block_size", - {offsetof(struct BlockBasedTableOptions, block_size), OptionType::kSizeT, - OptionVerificationType::kNormal}}, - {"block_size_deviation", - {offsetof(struct BlockBasedTableOptions, block_size_deviation), - OptionType::kInt, OptionVerificationType::kNormal}}, - {"block_restart_interval", - {offsetof(struct BlockBasedTableOptions, block_restart_interval), - OptionType::kInt, OptionVerificationType::kNormal}}, - {"index_block_restart_interval", - {offsetof(struct BlockBasedTableOptions, index_block_restart_interval), - OptionType::kInt, OptionVerificationType::kNormal}}, - {"filter_policy", - {offsetof(struct BlockBasedTableOptions, filter_policy), - OptionType::kFilterPolicy, OptionVerificationType::kByName}}, - {"whole_key_filtering", - {offsetof(struct BlockBasedTableOptions, whole_key_filtering), - OptionType::kBoolean, OptionVerificationType::kNormal}}, - {"skip_table_builder_flush", - {offsetof(struct BlockBasedTableOptions, skip_table_builder_flush), - OptionType::kBoolean, OptionVerificationType::kNormal}}, - {"format_version", - {offsetof(struct BlockBasedTableOptions, format_version), - OptionType::kUInt32T, OptionVerificationType::kNormal}}}; +static std::unordered_map + block_based_table_type_info = { + /* currently not supported + std::shared_ptr block_cache = nullptr; + std::shared_ptr block_cache_compressed = nullptr; + */ + {"flush_block_policy_factory", + {offsetof(struct BlockBasedTableOptions, flush_block_policy_factory), + OptionType::kFlushBlockPolicyFactory, + OptionVerificationType::kByName}}, + {"cache_index_and_filter_blocks", + {offsetof(struct BlockBasedTableOptions, + cache_index_and_filter_blocks), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"pin_l0_filter_and_index_blocks_in_cache", + {offsetof(struct BlockBasedTableOptions, + pin_l0_filter_and_index_blocks_in_cache), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"index_type", + {offsetof(struct BlockBasedTableOptions, index_type), + OptionType::kBlockBasedTableIndexType, + OptionVerificationType::kNormal}}, + {"hash_index_allow_collision", + {offsetof(struct BlockBasedTableOptions, hash_index_allow_collision), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"checksum", + {offsetof(struct BlockBasedTableOptions, checksum), + OptionType::kChecksumType, OptionVerificationType::kNormal}}, + {"no_block_cache", + {offsetof(struct BlockBasedTableOptions, no_block_cache), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"block_size", + {offsetof(struct BlockBasedTableOptions, block_size), + OptionType::kSizeT, OptionVerificationType::kNormal}}, + {"block_size_deviation", + {offsetof(struct BlockBasedTableOptions, block_size_deviation), + OptionType::kInt, OptionVerificationType::kNormal}}, + {"block_restart_interval", + {offsetof(struct BlockBasedTableOptions, block_restart_interval), + OptionType::kInt, OptionVerificationType::kNormal}}, + {"index_block_restart_interval", + {offsetof(struct BlockBasedTableOptions, index_block_restart_interval), + OptionType::kInt, OptionVerificationType::kNormal}}, + {"filter_policy", + {offsetof(struct BlockBasedTableOptions, filter_policy), + OptionType::kFilterPolicy, OptionVerificationType::kByName}}, + {"whole_key_filtering", + {offsetof(struct BlockBasedTableOptions, whole_key_filtering), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"skip_table_builder_flush", + {offsetof(struct BlockBasedTableOptions, skip_table_builder_flush), + OptionType::kBoolean, OptionVerificationType::kNormal}}, + {"format_version", + {offsetof(struct BlockBasedTableOptions, format_version), + OptionType::kUInt32T, OptionVerificationType::kNormal}}}; static std::unordered_map plain_table_type_info = { {"user_key_len", diff --git a/util/options_test.cc b/util/options_test.cc index 7076ea41e..bb5cc96e4 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -1602,7 +1602,9 @@ TEST_F(OptionsParserTest, BlockBasedTableOptionsAllFieldsSettable) { // Need to update the option string if a new option is added. ASSERT_OK(GetBlockBasedTableOptionsFromString( *bbto, - "cache_index_and_filter_blocks=1;index_type=kHashSearch;" + "cache_index_and_filter_blocks=1;" + "pin_l0_filter_and_index_blocks_in_cache=1;" + "index_type=kHashSearch;" "checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;" "block_cache=1M;block_cache_compressed=1k;block_size=1024;" "block_size_deviation=8;block_restart_interval=4; " diff --git a/util/testutil.cc b/util/testutil.cc index b8190faf7..8c587511f 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -193,6 +193,7 @@ const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined) { BlockBasedTableOptions RandomBlockBasedTableOptions(Random* rnd) { BlockBasedTableOptions opt; opt.cache_index_and_filter_blocks = rnd->Uniform(2); + opt.pin_l0_filter_and_index_blocks_in_cache = rnd->Uniform(2); opt.index_type = rnd->Uniform(2) ? BlockBasedTableOptions::kBinarySearch : BlockBasedTableOptions::kHashSearch; opt.hash_index_allow_collision = rnd->Uniform(2);