From 32e31d49d1319cc20f2dc7c91a91db88c13f5c7e Mon Sep 17 00:00:00 2001 From: Zhongyi Xie Date: Thu, 16 Nov 2017 17:46:43 -0800 Subject: [PATCH] Make DBOption compaction_readahead_size dynamic Summary: Closes https://github.com/facebook/rocksdb/pull/3004 Differential Revision: D6056141 Pulled By: miasantreble fbshipit-source-id: 56df1630f464fd56b07d25d38161f699e0528b7f --- HISTORY.md | 2 +- db/compaction_job.cc | 7 ++++--- db/compaction_job.h | 2 ++ db/db_impl.cc | 12 +++++++----- db/db_impl_compaction_flush.cc | 8 ++++---- db/db_options_test.cc | 26 ++++++++++++++++++++++++++ db/db_test_util.h | 5 +++++ db/table_cache.cc | 4 +++- db/version_set.cc | 13 ++++++------- db/version_set.h | 9 +++------ options/cf_options.cc | 1 - options/cf_options.h | 2 -- options/db_options.cc | 13 +++++++------ options/db_options.h | 2 +- options/options_helper.cc | 2 +- options/options_helper.h | 3 ++- 16 files changed, 72 insertions(+), 39 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 0ffe60d48..5bd3e837e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -15,7 +15,7 @@ ### New Features * `DBOptions::writable_file_max_buffer_size` can now be changed dynamically. -* `DBOptions::bytes_per_sync` and `DBOptions::wal_bytes_per_sync` can now be changed dynamically, `DBOptions::wal_bytes_per_sync` will flush all memtables and switch to a new WAL file. +* `DBOptions::bytes_per_sync`, `DBOptions::compaction_readahead_size`, and `DBOptions::wal_bytes_per_sync` can now be changed dynamically, `DBOptions::wal_bytes_per_sync` will flush all memtables and switch to a new WAL file. * Support dynamic adjustment of rate limit according to demand for background I/O. It can be enabled by passing `true` to the `auto_tuned` parameter in `NewGenericRateLimiter()`. The value passed as `rate_bytes_per_sec` will still be respected as an upper-bound. * Support dynamically changing `ColumnFamilyOptions::compaction_options_fifo`. * Introduce `EventListener::OnStallConditionsChanged()` callback. Users can implement it to be notified when user writes are stalled, stopped, or resumed. diff --git a/db/compaction_job.cc b/db/compaction_job.cc index ba8ec1be4..c9c71d60c 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -265,8 +265,7 @@ CompactionJob::CompactionJob( int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, const EnvOptions env_options, VersionSet* versions, const std::atomic* shutting_down, - const SequenceNumber preserve_deletes_seqnum, - LogBuffer* log_buffer, + const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, Status* db_bg_error, std::vector existing_snapshots, @@ -282,6 +281,8 @@ CompactionJob::CompactionJob( db_options_(db_options), env_options_(env_options), env_(db_options.env), + env_optiosn_for_read_( + env_->OptimizeForCompactionTableRead(env_options, db_options_)), versions_(versions), shutting_down_(shutting_down), preserve_deletes_seqnum_(preserve_deletes_seqnum), @@ -680,7 +681,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { std::unique_ptr range_del_agg( new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_)); std::unique_ptr input(versions_->MakeInputIterator( - sub_compact->compaction, range_del_agg.get())); + sub_compact->compaction, range_del_agg.get(), env_optiosn_for_read_)); AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PROCESS_KV); diff --git a/db/compaction_job.h b/db/compaction_job.h index e92991d49..353a53d5c 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -134,6 +134,8 @@ class CompactionJob { const EnvOptions env_options_; Env* env_; + // env_option optimized for compaction table reads + EnvOptions env_optiosn_for_read_; VersionSet* versions_; const std::atomic* shutting_down_; const SequenceNumber preserve_deletes_seqnum_; diff --git a/db/db_impl.cc b/db/db_impl.cc index 35383aada..a2c72f0d9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -587,13 +587,15 @@ Status DBImpl::SetDBOptions( new_options.bytes_per_sync = 1024 * 1024; } mutable_db_options_ = new_options; - env_options_for_compaction_ = EnvOptions(BuildDBOptions( - immutable_db_options_, - mutable_db_options_)); + env_options_for_compaction_ = EnvOptions( + BuildDBOptions(immutable_db_options_, mutable_db_options_)); env_options_for_compaction_ = env_->OptimizeForCompactionTableWrite( - env_options_for_compaction_, - immutable_db_options_); + env_options_for_compaction_, immutable_db_options_); versions_->ChangeEnvOptions(mutable_db_options_); + env_options_for_compaction_ = env_->OptimizeForCompactionTableRead( + env_options_for_compaction_, immutable_db_options_); + env_options_for_compaction_.compaction_readahead_size = + mutable_db_options_.compaction_readahead_size; write_thread_.EnterUnbatched(&w, &mutex_); if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) { Status purge_wal_status = SwitchWAL(&write_context); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 6706826c0..02f35df21 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -544,10 +544,10 @@ Status DBImpl::CompactFilesImpl( job_context->job_id, c.get(), immutable_db_options_, env_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), - directories_.GetDataDir(c->output_path_id()), - stats_, &mutex_, &bg_error_, snapshot_seqs, - earliest_write_conflict_snapshot, snapshot_checker, table_cache_, - &event_logger_, c->mutable_cf_options()->paranoid_file_checks, + directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_, + snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, + table_cache_, &event_logger_, + c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, nullptr); // Here we pass a nullptr for CompactionJobStats because // CompactFiles does not trigger OnCompactionCompleted(), diff --git a/db/db_options_test.cc b/db/db_options_test.cc index bafa8e55a..81bcc888c 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -692,6 +692,32 @@ TEST_F(DBOptionsTest, SetFIFOCompactionOptions) { ASSERT_LE(NumTableFilesAtLevel(0), 5); } +TEST_F(DBOptionsTest, CompactionReadaheadSizeChange) { + SpecialEnv env(env_); + Options options; + options.env = &env; + + options.compaction_readahead_size = 0; + options.new_table_reader_for_compaction_inputs = true; + options.level0_file_num_compaction_trigger = 2; + const std::string kValue(1024, 'v'); + Reopen(options); + + ASSERT_EQ(0, dbfull()->GetDBOptions().compaction_readahead_size); + ASSERT_OK(dbfull()->SetDBOptions({{"compaction_readahead_size", "256"}})); + ASSERT_EQ(256, dbfull()->GetDBOptions().compaction_readahead_size); + for (int i = 0; i < 1024; i++) { + Put(Key(i), kValue); + } + Flush(); + for (int i = 0; i < 1024 * 2; i++) { + Put(Key(i), kValue); + } + Flush(); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(256, env_->compaction_readahead_size_); + Close(); +} #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/db/db_test_util.h b/db/db_test_util.h index 89083b501..cca776157 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -445,6 +445,9 @@ class SpecialEnv : public EnvWrapper { r->reset(new CountingFile(std::move(*r), &random_read_counter_, &random_read_bytes_counter_)); } + if (s.ok() && soptions.compaction_readahead_size > 0) { + compaction_readahead_size_ = soptions.compaction_readahead_size; + } return s; } @@ -570,6 +573,8 @@ class SpecialEnv : public EnvWrapper { bool no_slowdown_; std::atomic is_wal_sync_thread_safe_{true}; + + size_t compaction_readahead_size_; }; class MockTimeEnv : public EnvWrapper { diff --git a/db/table_cache.cc b/db/table_cache.cc index ca9245391..07daec2d6 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -191,7 +191,9 @@ InternalIterator* TableCache::NewIterator( &use_direct_reads_for_compaction); #endif // !NDEBUG if (ioptions_.new_table_reader_for_compaction_inputs) { - readahead = ioptions_.compaction_readahead_size; + // get compaction_readahead_size from env_options allows us to set the + // value dynamically + readahead = env_options.compaction_readahead_size; create_new_table_reader = true; } } else { diff --git a/db/version_set.cc b/db/version_set.cc index e494f231f..f432a56c0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2418,9 +2418,7 @@ VersionSet::VersionSet(const std::string& dbname, prev_log_number_(0), current_version_number_(0), manifest_file_size_(0), - env_options_(storage_options), - env_options_compactions_( - env_->OptimizeForCompactionTableRead(env_options_, *db_options_)) {} + env_options_(storage_options) {} void CloseTables(void* ptr, size_t) { TableReader* table_reader = reinterpret_cast(ptr); @@ -3688,7 +3686,8 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { } InternalIterator* VersionSet::MakeInputIterator( - const Compaction* c, RangeDelAggregator* range_del_agg) { + const Compaction* c, RangeDelAggregator* range_del_agg, + const EnvOptions& env_options_compactions) { auto cfd = c->column_family_data(); ReadOptions read_options; read_options.verify_checksums = true; @@ -3713,8 +3712,8 @@ InternalIterator* VersionSet::MakeInputIterator( const LevelFilesBrief* flevel = c->input_levels(which); for (size_t i = 0; i < flevel->num_files; i++) { list[num++] = cfd->table_cache()->NewIterator( - read_options, env_options_compactions_, - cfd->internal_comparator(), flevel->files[i].fd, range_del_agg, + read_options, env_options_compactions, cfd->internal_comparator(), + flevel->files[i].fd, range_del_agg, nullptr /* table_reader_ptr */, nullptr /* no per level latency histogram */, true /* for_compaction */, nullptr /* arena */, @@ -3724,7 +3723,7 @@ InternalIterator* VersionSet::MakeInputIterator( // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( new LevelFileIteratorState( - cfd->table_cache(), read_options, env_options_compactions_, + cfd->table_cache(), read_options, env_options_compactions, cfd->internal_comparator(), nullptr /* no per level latency histogram */, true /* for_compaction */, false /* prefix enabled */, diff --git a/db/version_set.h b/db/version_set.h index 59378d878..7942ce7f1 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -812,8 +812,9 @@ class VersionSet { // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. - InternalIterator* MakeInputIterator(const Compaction* c, - RangeDelAggregator* range_del_agg); + InternalIterator* MakeInputIterator( + const Compaction* c, RangeDelAggregator* range_del_agg, + const EnvOptions& env_options_compactions); // Add all files listed in any live version to *live. void AddLiveFiles(std::vector* live_list); @@ -916,10 +917,6 @@ class VersionSet { // env options for all reads and writes except compactions EnvOptions env_options_; - // env options used for compactions. This is a copy of - // env_options_ but with readaheads set to readahead_compactions_. - const EnvOptions env_options_compactions_; - // No copying allowed VersionSet(const VersionSet&); void operator=(const VersionSet&); diff --git a/options/cf_options.cc b/options/cf_options.cc index 049199d2f..f8e4e8dfe 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -65,7 +65,6 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, db_options.access_hint_on_compaction_start), new_table_reader_for_compaction_inputs( db_options.new_table_reader_for_compaction_inputs), - compaction_readahead_size(db_options.compaction_readahead_size), num_levels(cf_options.num_levels), optimize_filters_for_hits(cf_options.optimize_filters_for_hits), force_consistency_checks(cf_options.force_consistency_checks), diff --git a/options/cf_options.h b/options/cf_options.h index 682e5bf7f..e24a756fe 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -101,8 +101,6 @@ struct ImmutableCFOptions { bool new_table_reader_for_compaction_inputs; - size_t compaction_readahead_size; - int num_levels; bool optimize_filters_for_hits; diff --git a/options/db_options.cc b/options/db_options.cc index a052a01b0..fd3cdcccd 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -62,7 +62,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) access_hint_on_compaction_start(options.access_hint_on_compaction_start), new_table_reader_for_compaction_inputs( options.new_table_reader_for_compaction_inputs), - compaction_readahead_size(options.compaction_readahead_size), random_access_max_buffer_size(options.random_access_max_buffer_size), use_adaptive_mutex(options.use_adaptive_mutex), listeners(options.listeners), @@ -168,9 +167,6 @@ void ImmutableDBOptions::Dump(Logger* log) const { static_cast(access_hint_on_compaction_start)); ROCKS_LOG_HEADER(log, " Options.new_table_reader_for_compaction_inputs: %d", new_table_reader_for_compaction_inputs); - ROCKS_LOG_HEADER( - log, " Options.compaction_readahead_size: %" ROCKSDB_PRIszt, - compaction_readahead_size); ROCKS_LOG_HEADER( log, " Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt, random_access_max_buffer_size); @@ -234,7 +230,8 @@ MutableDBOptions::MutableDBOptions() stats_dump_period_sec(600), max_open_files(-1), bytes_per_sync(0), - wal_bytes_per_sync(0) {} + wal_bytes_per_sync(0), + compaction_readahead_size(0) {} MutableDBOptions::MutableDBOptions(const DBOptions& options) : max_background_jobs(options.max_background_jobs), @@ -249,7 +246,8 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options) stats_dump_period_sec(options.stats_dump_period_sec), max_open_files(options.max_open_files), bytes_per_sync(options.bytes_per_sync), - wal_bytes_per_sync(options.wal_bytes_per_sync) {} + wal_bytes_per_sync(options.wal_bytes_per_sync), + compaction_readahead_size(options.compaction_readahead_size) {} void MutableDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.max_background_jobs: %d", @@ -278,6 +276,9 @@ void MutableDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.wal_bytes_per_sync: %" PRIu64, wal_bytes_per_sync); + ROCKS_LOG_HEADER(log, + " Options.compaction_readahead_size: %" ROCKSDB_PRIszt, + compaction_readahead_size); } } // namespace rocksdb diff --git a/options/db_options.h b/options/db_options.h index 399b38443..107d35c87 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -55,7 +55,6 @@ struct ImmutableDBOptions { std::shared_ptr write_buffer_manager; DBOptions::AccessHint access_hint_on_compaction_start; bool new_table_reader_for_compaction_inputs; - size_t compaction_readahead_size; size_t random_access_max_buffer_size; bool use_adaptive_mutex; std::vector> listeners; @@ -100,6 +99,7 @@ struct MutableDBOptions { int max_open_files; uint64_t bytes_per_sync; uint64_t wal_bytes_per_sync; + size_t compaction_readahead_size; }; } // namespace rocksdb diff --git a/options/options_helper.cc b/options/options_helper.cc index cbb389153..40c0539e1 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -87,7 +87,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.new_table_reader_for_compaction_inputs = immutable_db_options.new_table_reader_for_compaction_inputs; options.compaction_readahead_size = - immutable_db_options.compaction_readahead_size; + mutable_db_options.compaction_readahead_size; options.random_access_max_buffer_size = immutable_db_options.random_access_max_buffer_size; options.writable_file_max_buffer_size = diff --git a/options/options_helper.h b/options/options_helper.h index ebcec7b2a..4f37c4482 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -216,7 +216,8 @@ static std::unordered_map db_options_type_info = { OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, {"compaction_readahead_size", {offsetof(struct DBOptions, compaction_readahead_size), OptionType::kSizeT, - OptionVerificationType::kNormal, false, 0}}, + OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, compaction_readahead_size)}}, {"random_access_max_buffer_size", {offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal, false, 0}},