diff --git a/db/db_test.cc b/db/db_test.cc index a15a0a714..b7aa39ef9 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8377,6 +8377,62 @@ TEST_F(DBTest, UnsupportedManualSync) { ASSERT_TRUE(s.IsNotSupported()); } +TEST_F(DBTest, OpenDBWithInfiniteMaxOpenFiles) { + // Open DB with infinite max open files + // - First iteration use 1 thread to open files + // - Second iteration use 5 threads to open files + for (int iter = 0; iter < 2; iter++) { + Options options; + options.create_if_missing = true; + options.write_buffer_size = 100000; + options.disable_auto_compactions = true; + options.max_open_files = -1; + if (iter == 0) { + options.max_file_opening_threads = 1; + } else { + options.max_file_opening_threads = 5; + } + options = CurrentOptions(options); + DestroyAndReopen(options); + + // Create 12 Files in L0 (then move then to L2) + for (int i = 0; i < 12; i++) { + std::string k = "L2_" + Key(i); + ASSERT_OK(Put(k, k + std::string(1000, 'a'))); + ASSERT_OK(Flush()); + } + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 2; + db_->CompactRange(compact_options, nullptr, nullptr); + + // Create 12 Files in L0 + for (int i = 0; i < 12; i++) { + std::string k = "L0_" + Key(i); + ASSERT_OK(Put(k, k + std::string(1000, 'a'))); + ASSERT_OK(Flush()); + } + Close(); + + // Reopening the DB will load all exisitng files + Reopen(options); + ASSERT_EQ("12,0,12", FilesPerLevel(0)); + std::vector> files; + dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files); + + for (const auto& level : files) { + for (const auto& file : level) { + ASSERT_TRUE(file.table_reader_handle != nullptr); + } + } + + for (int i = 0; i < 12; i++) { + ASSERT_EQ(Get("L0_" + Key(i)), "L0_" + Key(i) + std::string(1000, 'a')); + ASSERT_EQ(Get("L2_" + Key(i)), "L2_" + Key(i) + std::string(1000, 'a')); + } + } +} + INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam, ::testing::Values(1, 4)); diff --git a/db/version_builder.cc b/db/version_builder.cc index c010ee429..9ef952a0f 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -15,7 +15,9 @@ #include #include +#include #include +#include #include #include #include @@ -278,12 +280,26 @@ class VersionBuilder::Rep { CheckConsistency(vstorage); } - void LoadTableHandlers() { + void LoadTableHandlers(int max_threads) { assert(table_cache_ != nullptr); + std::vector files_meta; for (int level = 0; level < base_vstorage_->num_levels(); level++) { for (auto& file_meta_pair : levels_[level].added_files) { auto* file_meta = file_meta_pair.second; assert(!file_meta->table_reader_handle); + files_meta.push_back(file_meta); + } + } + + std::atomic next_file_meta_idx(0); + std::function load_handlers_func = [&]() { + while (true) { + size_t file_idx = next_file_meta_idx.fetch_add(1); + if (file_idx >= files_meta.size()) { + break; + } + + auto* file_meta = files_meta[file_idx]; table_cache_->FindTable( env_options_, *(base_vstorage_->InternalComparator()), file_meta->fd, &file_meta->table_reader_handle, false); @@ -293,6 +309,19 @@ class VersionBuilder::Rep { file_meta->table_reader_handle); } } + }; + + if (max_threads <= 1) { + load_handlers_func(); + } else { + std::vector threads; + for (int i = 0; i < max_threads; i++) { + threads.emplace_back(load_handlers_func); + } + + for (auto& t : threads) { + t.join(); + } } } @@ -321,7 +350,9 @@ void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); } void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { rep_->SaveTo(vstorage); } -void VersionBuilder::LoadTableHandlers() { rep_->LoadTableHandlers(); } +void VersionBuilder::LoadTableHandlers(int max_threads) { + rep_->LoadTableHandlers(max_threads); +} void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) { rep_->MaybeAddFile(vstorage, level, f); diff --git a/db/version_builder.h b/db/version_builder.h index 452604f17..b31b7a78a 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -30,7 +30,7 @@ class VersionBuilder { int level); void Apply(VersionEdit* edit); void SaveTo(VersionStorageInfo* vstorage); - void LoadTableHandlers(); + void LoadTableHandlers(int max_threads = 1); void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f); private: diff --git a/db/version_set.cc b/db/version_set.cc index f4a2b045c..c142cb985 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2393,9 +2393,9 @@ Status VersionSet::Recover( auto* builder = builders_iter->second->version_builder(); if (db_options_->max_open_files == -1) { - // unlimited table cache. Pre-load table handle now. - // Need to do it out of the mutex. - builder->LoadTableHandlers(); + // unlimited table cache. Pre-load table handle now. + // Need to do it out of the mutex. + builder->LoadTableHandlers(db_options_->max_file_opening_threads); } Version* v = new Version(cfd, this, current_version_number_++); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 72202753b..51c7cb956 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -810,6 +810,11 @@ struct DBOptions { // Default: 5000 int max_open_files; + // If max_open_files is -1, DB will open all files on DB::Open(). You can + // use this option to increase the number of threads used to open the files. + // Default: 1 + int max_file_opening_threads; + // Once write-ahead logs exceed this size, we will start forcing the flush of // column families whose memtables are backed by the oldest live WAL file // (i.e. the ones that are causing all the space amplification). If set to 0 diff --git a/util/options.cc b/util/options.cc index abfc55171..552d4f8ec 100644 --- a/util/options.cc +++ b/util/options.cc @@ -208,6 +208,7 @@ DBOptions::DBOptions() info_log_level(DEBUG_LEVEL), #endif // NDEBUG max_open_files(5000), + max_file_opening_threads(1), max_total_wal_size(0), statistics(nullptr), disableDataSync(false), @@ -256,6 +257,7 @@ DBOptions::DBOptions(const Options& options) info_log(options.info_log), info_log_level(options.info_log_level), max_open_files(options.max_open_files), + max_file_opening_threads(options.max_file_opening_threads), max_total_wal_size(options.max_total_wal_size), statistics(options.statistics), disableDataSync(options.disableDataSync), @@ -306,6 +308,7 @@ void DBOptions::Dump(Logger* log) const { Warn(log, " Options.env: %p", env); Warn(log, " Options.info_log: %p", info_log.get()); Warn(log, " Options.max_open_files: %d", max_open_files); + Warn(log, "Options.max_file_opening_threads: %d", max_file_opening_threads); Warn(log, " Options.max_total_wal_size: %" PRIu64, max_total_wal_size); Warn(log, " Options.disableDataSync: %d", disableDataSync); Warn(log, " Options.use_fsync: %d", use_fsync);