diff --git a/HISTORY.md b/HISTORY.md index 26b6efcdb..6d6ad040e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### New Features * Enabled checkpoint on readonly db (DBImplReadOnly). * Make DB ignore dropped column families while committing results of atomic flush. +* RocksDB may choose to preopen some files even if options.max_open_files != -1. This may make DB open slightly longer. ### Public API Change * Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot before doing the read. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_tracked with default value of false. If true it indicates that call is assumed to be after a ::GetForUpdate. diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index c8c30d8cb..b9dcd4040 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -414,6 +414,7 @@ TEST_F(DBCompactionTest, SkipStatsUpdateTest) { // Reopen the DB with stats-update disabled options.skip_stats_update_on_db_open = true; + options.max_open_files = 20; env_->random_file_open_counter_.store(0); Reopen(options); @@ -439,7 +440,7 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { Options options = CurrentOptions(); options.env = env_; options.new_table_reader_for_compaction_inputs = true; - options.max_open_files = 100; + options.max_open_files = 20; options.level0_file_num_compaction_trigger = 3; DestroyAndReopen(options); Random rnd(301); @@ -468,15 +469,16 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { Flush(); dbfull()->TEST_WaitForCompact(); // preloading iterator issues one table cache lookup and create - // a new table reader. - ASSERT_EQ(num_table_cache_lookup, 1); + // a new table reader, if not preloaded. 
+ int old_num_table_cache_lookup = num_table_cache_lookup; + ASSERT_GE(num_table_cache_lookup, 1); ASSERT_EQ(num_new_table_reader, 1); num_table_cache_lookup = 0; num_new_table_reader = 0; ASSERT_EQ(Key(k), Get(Key(k))); // lookup iterator from table cache and no need to create a new one. - ASSERT_EQ(num_table_cache_lookup, 1); + ASSERT_EQ(old_num_table_cache_lookup + num_table_cache_lookup, 2); ASSERT_EQ(num_new_table_reader, 0); } } @@ -489,7 +491,10 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { // a new table reader. One file is created for flush and one for compaction. // Compaction inputs make no table cache look-up for data/range deletion // iterators - ASSERT_EQ(num_table_cache_lookup, 2); + // May preload table cache too. + ASSERT_GE(num_table_cache_lookup, 2); + int old_num_table_cache_lookup2 = num_table_cache_lookup; + // Create new iterator for: // (1) 1 for verifying flush results // (2) 3 for compaction input files @@ -499,7 +504,7 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { num_table_cache_lookup = 0; num_new_table_reader = 0; ASSERT_EQ(Key(1), Get(Key(1))); - ASSERT_EQ(num_table_cache_lookup, 1); + ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3); ASSERT_EQ(num_new_table_reader, 0); num_table_cache_lookup = 0; @@ -511,14 +516,16 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { db_->CompactRange(cro, nullptr, nullptr); // Only verifying compaction outputs issues one table cache lookup // for both data block and range deletion block). - ASSERT_EQ(num_table_cache_lookup, 1); + // May preload table cache too. + ASSERT_GE(num_table_cache_lookup, 1); + old_num_table_cache_lookup2 = num_table_cache_lookup; // One for compaction input, one for verifying compaction results. 
ASSERT_EQ(num_new_table_reader, 2); num_table_cache_lookup = 0; num_new_table_reader = 0; ASSERT_EQ(Key(1), Get(Key(1))); - ASSERT_EQ(num_table_cache_lookup, 1); + ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 2); ASSERT_EQ(num_new_table_reader, 0); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 5196be7ba..2a3374156 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -41,6 +41,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { max_max_open_files = 0x400000; } ClipToRange(&result.max_open_files, 20, max_max_open_files); + TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles", + &result.max_open_files); } if (result.info_log == nullptr) { diff --git a/db/db_properties_test.cc b/db/db_properties_test.cc index 3745135bf..f8733934f 100644 --- a/db/db_properties_test.cc +++ b/db/db_properties_test.cc @@ -393,7 +393,15 @@ TEST_F(DBPropertiesTest, ReadLatencyHistogramByLevel) { options.target_file_size_base = 98 << 10; options.max_write_buffer_number = 2; options.statistics = rocksdb::CreateDBStatistics(); - options.max_open_files = 100; + options.max_open_files = 11; // Make sure no preloading of table readers + + // RocksDB sanitize max open files to at least 20. Modify it back. + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = static_cast<int*>(arg); + *max_open_files = 11; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); BlockBasedTableOptions table_options; table_options.no_block_cache = true; @@ -441,6 +449,7 @@ TEST_F(DBPropertiesTest, ReadLatencyHistogramByLevel) { // Reopen and issue iterating. 
See thee latency tracked ReopenWithColumnFamilies({"default", "pikachu"}, options); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); ASSERT_TRUE(dbfull()->GetProperty("rocksdb.cf-file-histogram", &prop)); ASSERT_EQ(std::string::npos, prop.find("** Level 0 read latency histogram")); ASSERT_EQ(std::string::npos, prop.find("** Level 1 read latency histogram")); diff --git a/db/db_test2.cc b/db/db_test2.cc index e7a7bfe82..7060eef17 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -342,6 +342,7 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { ASSERT_GE(cache->GetUsage(), 1024 * 1024); Close(); options.write_buffer_manager.reset(); + last_options_.write_buffer_manager.reset(); ASSERT_LT(cache->GetUsage(), 1024 * 1024); } rocksdb::SyncPoint::GetInstance()->DisableProcessing(); @@ -1476,11 +1477,26 @@ TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) { uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS); uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT); + if (!infinite_max_files_) { + // Now we have two files. We narrow the max open files to allow 3 entries + // so that preloading SST files won't happen. + options.max_open_files = 13; + // RocksDB sanitize max open files to at least 20. Modify it back. + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = static_cast<int*>(arg); + *max_open_files = 13; + }); + } + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // Reopen database. If max_open_files is set as -1, table readers will be // preloaded. This will trigger a BlockBasedTable::Open() and prefetch // L0 index and filter. 
Level 1's prefetching is disabled in DB::Open() TryReopenWithColumnFamilies({"default", "pikachu"}, options); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + if (infinite_max_files_) { // After reopen, cache miss are increased by one because we read (and only // read) filter and index on L0 @@ -1650,9 +1666,11 @@ class MockPersistentCache : public PersistentCache { // Make sure that in CPU time perf context counters, Env::NowCPUNanos() // is used, rather than Env::CPUNanos(); TEST_F(DBTest2, TestPerfContextCpuTime) { + // force resizing table cache so table handle is not preloaded so that + // we can measure find_table_nanos during Get(). + dbfull()->TEST_table_cache()->SetCapacity(0); ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Flush()); - env_->now_cpu_count_.store(0); // CPU timing is not enabled with kEnableTimeExceptForMutex diff --git a/db/db_test_util.cc b/db/db_test_util.cc index de096d254..d78fbfca9 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -604,6 +604,7 @@ Status DBTestBase::TryReopenWithColumnFamilies( column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i])); } DBOptions db_opts = DBOptions(options[0]); + last_options_ = options[0]; return DB::Open(db_opts, dbname_, column_families, &handles_, &db_); } diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 1ba502458..78f72b4a0 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -283,7 +283,31 @@ TEST_F(DBWALTest, RecoverWithTableHandle) { ASSERT_OK(Put(1, "bar", "v4")); ASSERT_OK(Flush(1)); ASSERT_OK(Put(1, "big", std::string(100, 'a'))); - ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); + + options = CurrentOptions(); + const int kSmallMaxOpenFiles = 13; + if (option_config_ == kDBLogDir) { + // Use this option to check not preloading files + // Set the max open files to be small enough so no preload will + // happen. + options.max_open_files = kSmallMaxOpenFiles; + // RocksDB sanitize max open files to at least 20. Modify it back. 
+ rocksdb::SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = static_cast<int*>(arg); + *max_open_files = kSmallMaxOpenFiles; + }); + + } else if (option_config_ == kWalDirAndMmapReads) { + // Use this option to check always loading all files. + options.max_open_files = 100; + } else { + options.max_open_files = -1; + } + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); std::vector<std::vector<FileMetaData>> files; dbfull()->TEST_GetFilesMetaData(handles_[1], &files); @@ -294,10 +318,10 @@ TEST_F(DBWALTest, RecoverWithTableHandle) { ASSERT_EQ(total_files, 3); for (const auto& level : files) { for (const auto& file : level) { - if (kInfiniteMaxOpenFiles == option_config_) { - ASSERT_TRUE(file.table_reader_handle != nullptr); - } else { + if (options.max_open_files == kSmallMaxOpenFiles) { ASSERT_TRUE(file.table_reader_handle == nullptr); + } else { + ASSERT_TRUE(file.table_reader_handle != nullptr); } } } diff --git a/db/table_cache.h b/db/table_cache.h index e3936ab44..180ebc6bd 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -122,6 +122,8 @@ class TableCache { // Release the handle from a cache void ReleaseHandle(Cache::Handle* handle); + Cache* get_cache() const { return cache_; } + // Capacity of the backing Cache that indicates inifinite TableCache capacity. // For example when max_open_files is -1 we set the backing Cache to this. 
static const int kInfiniteCapacity = 0x400000; diff --git a/db/version_builder.cc b/db/version_builder.cc index 5d0d6d927..7b45347c1 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -366,8 +366,40 @@ class VersionBuilder::Rep { void LoadTableHandlers(InternalStats* internal_stats, int max_threads, bool prefetch_index_and_filter_in_cache, + bool is_initial_load, const SliceTransform* prefix_extractor) { assert(table_cache_ != nullptr); + + size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity(); + bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity); + size_t max_load = port::kMaxSizet; + + if (!always_load) { + // If it is initial loading and not set to always loading all the + // files, we only load up to kInitialLoadLimit files, to limit the + // time reopening the DB. + const size_t kInitialLoadLimit = 16; + size_t load_limit; + // If the table cache is not 1/4 full, we pin the table handle to + // file metadata to avoid the cache read costs when reading the file. + // The downside of pinning those files is that LRU won't be followed + // for those files. This doesn't matter much because if number of files + // of the DB exceeds table cache capacity, eventually no table reader + // will be pinned and LRU will be followed. 
+ if (is_initial_load) { + load_limit = std::min(kInitialLoadLimit, table_cache_capacity / 4); + } else { + load_limit = table_cache_capacity / 4; + } + + size_t table_cache_usage = table_cache_->get_cache()->GetUsage(); + if (table_cache_usage >= load_limit) { + return; + } else { + max_load = load_limit - table_cache_usage; + } + } + // std::vector<std::pair<FileMetaData*, int>> files_meta; for (int level = 0; level < num_levels_; level++) { @@ -375,6 +407,12 @@ auto* file_meta = file_meta_pair.second; assert(!file_meta->table_reader_handle); files_meta.emplace_back(file_meta, level); + if (files_meta.size() >= max_load) { + break; + } + } + if (files_meta.size() >= max_load) { + break; } } @@ -452,9 +490,11 @@ void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats, int max_threads, bool prefetch_index_and_filter_in_cache, + bool is_initial_load, const SliceTransform* prefix_extractor) { rep_->LoadTableHandlers(internal_stats, max_threads, - prefetch_index_and_filter_in_cache, prefix_extractor); + prefetch_index_and_filter_in_cache, is_initial_load, + prefix_extractor); } void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, diff --git a/db/version_builder.h b/db/version_builder.h index 4d32aec2b..d6ee37e08 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -35,6 +35,7 @@ class VersionBuilder { void SaveTo(VersionStorageInfo* vstorage); void LoadTableHandlers(InternalStats* internal_stats, int max_threads, bool prefetch_index_and_filter_in_cache, + bool is_initial_load, const SliceTransform* prefix_extractor); void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f); diff --git a/db/version_set.cc b/db/version_set.cc index 2d6997601..8de8c86b5 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3002,9 +3002,7 @@ Status VersionSet::ProcessManifestWrites( mu->Unlock(); TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest"); - if 
(!first_writer.edit_list.front()->IsColumnFamilyManipulation() && - column_family_set_->get_table_cache()->GetCapacity() == - TableCache::kInfiniteCapacity) { + if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) { for (int i = 0; i < static_cast<int>(versions.size()); ++i) { assert(!builder_guards.empty() && builder_guards.size() == versions.size()); ColumnFamilyData* cfd = versions[i]->cfd_; builder_guards[i]->version_builder()->LoadTableHandlers( cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits, - true /* prefetch_index_and_filter_in_cache */, + this->GetColumnFamilySet()->get_table_cache()->GetCapacity() == + TableCache:: + kInfiniteCapacity /* prefetch_index_and_filter_in_cache */, + false /* is_initial_load */, mutable_cf_options_ptrs[i]->prefix_extractor.get()); } } @@ -3671,15 +3672,13 @@ Status VersionSet::Recover( assert(builders_iter != builders.end()); auto* builder = builders_iter->second->version_builder(); - if (GetColumnFamilySet()->get_table_cache()->GetCapacity() == - TableCache::kInfiniteCapacity) { - // unlimited table cache. Pre-load table handle now. - // Need to do it out of the mutex. - builder->LoadTableHandlers( - cfd->internal_stats(), db_options_->max_file_opening_threads, - false /* prefetch_index_and_filter_in_cache */, - cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); - } + // unlimited table cache. Pre-load table handle now. + // Need to do it out of the mutex. + builder->LoadTableHandlers( + cfd->internal_stats(), db_options_->max_file_opening_threads, + false /* prefetch_index_and_filter_in_cache */, + true /* is_initial_load */, + cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); Version* v = new Version(cfd, this, env_options_, *cfd->GetLatestMutableCFOptions(),