diff --git a/HISTORY.md b/HISTORY.md index 8a500037d..cacf30fd1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * Return an error on write if write_options.sync = true and write_options.disableWAL = true to warn user of inconsistent options. Previously we will not write to WAL and not respecting the sync options in this case. ### New Features +* `DBOptions::writable_file_max_buffer_size` can now be changed dynamically. * `DBOptions::bytes_per_sync` and `DBOptions::wal_bytes_per_sync` can now be changed dynamically, `DBOptions::wal_bytes_per_sync` will flush all memtables and switch to a new WAL file. * Support dynamic adjustment of rate limit according to demand for background I/O. It can be enabled by passing `true` to the `auto_tuned` parameter in `NewGenericRateLimiter()`. The value passed as `rate_bytes_per_sec` will still be respected as an upper-bound. * Support dynamically changing `ColumnFamilyOptions::compaction_options_fifo`. diff --git a/db/c.cc b/db/c.cc index 5486e9602..2e8d0db3c 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2276,6 +2276,11 @@ void rocksdb_options_set_bytes_per_sync( opt->rep.bytes_per_sync = v; } +void rocksdb_options_set_writable_file_max_buffer_size(rocksdb_options_t* opt, + uint64_t v) { + opt->rep.writable_file_max_buffer_size = v; +} + void rocksdb_options_set_allow_concurrent_memtable_write(rocksdb_options_t* opt, unsigned char v) { opt->rep.allow_concurrent_memtable_write = v; diff --git a/db/db_impl.cc b/db/db_impl.cc index 84cc7b9cf..465ca7306 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -574,6 +574,7 @@ Status DBImpl::SetDBOptions( env_options_for_compaction_ = env_->OptimizeForCompactionTableWrite( env_options_for_compaction_, immutable_db_options_); + versions_->ChangeEnvOptions(mutable_db_options_); write_thread_.EnterUnbatched(&w, &mutex_); if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) { Status purge_wal_status = SwitchWAL(&write_context); diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 3cd4dcdc5..bafa8e55a 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -208,6 +208,58 @@ TEST_F(DBOptionsTest, SetWalBytesPerSync) { ASSERT_GT(low_bytes_per_sync, counter); } +TEST_F(DBOptionsTest, WritableFileMaxBufferSize) { + Options options; + options.create_if_missing = true; + options.writable_file_max_buffer_size = 1024 * 1024; + options.level0_file_num_compaction_trigger = 3; + options.max_manifest_file_size = 1; + options.env = env_; + int buffer_size = 1024 * 1024; + Reopen(options); + ASSERT_EQ(buffer_size, + dbfull()->GetDBOptions().writable_file_max_buffer_size); + + std::atomic match_cnt(0); + std::atomic unmatch_cnt(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WritableFileWriter::WritableFileWriter:0", [&](void* arg) { + int value = static_cast(reinterpret_cast(arg)); + if (value == buffer_size) { + match_cnt++; + } else { + unmatch_cnt++; + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + int i = 0; + for (; i < 3; i++) { + ASSERT_OK(Put("foo", ToString(i))); + ASSERT_OK(Put("bar", ToString(i))); + Flush(); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(unmatch_cnt, 0); + ASSERT_GE(match_cnt, 11); + + buffer_size = 512 * 1024; + match_cnt = 0; + unmatch_cnt = 0; + ASSERT_OK( + dbfull()->SetDBOptions({{"writable_file_max_buffer_size", "524288"}})); + ASSERT_EQ(buffer_size, + dbfull()->GetDBOptions().writable_file_max_buffer_size); + i = 0; + for (; i < 3; i++) { + ASSERT_OK(Put("foo", ToString(i))); + ASSERT_OK(Put("bar", ToString(i))); + Flush(); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(unmatch_cnt, 0); + ASSERT_GE(match_cnt, 11); +} + TEST_F(DBOptionsTest, SetOptionsAndReopen) { Random rnd(1044); auto rand_opts = GetRandomizedMutableCFOptionsMap(&rnd); diff --git a/db/version_set.cc b/db/version_set.cc index 7ccf94e9c..b52eae03a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -576,7 +576,7 @@ Status Version::GetTableProperties(std::shared_ptr* tp, auto table_cache = cfd_->table_cache(); auto ioptions = cfd_->ioptions(); Status s = table_cache->GetTableProperties( - vset_->env_options_, cfd_->internal_comparator(), file_meta->fd, + env_options_, cfd_->internal_comparator(), file_meta->fd, tp, true /* no io */); if (s.ok()) { return s; @@ -599,7 +599,7 @@ Status Version::GetTableProperties(std::shared_ptr* tp, TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(), file_meta->fd.GetPathId()); } - s = ioptions->env->NewRandomAccessFile(file_name, &file, vset_->env_options_); + s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_); if (!s.ok()) { return s; } @@ -711,7 +711,7 @@ size_t Version::GetMemoryUsageByTableReaders() { for (auto& file_level : storage_info_.level_files_brief_) { for (size_t i = 0; i < file_level.num_files; i++) { total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader( - vset_->env_options_, cfd_->internal_comparator(), + env_options_, cfd_->internal_comparator(), file_level.files[i].fd); } } @@ -936,7 +936,7 @@ VersionStorageInfo::VersionStorageInfo( } Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, - uint64_t version_number) + const EnvOptions& env_opt, uint64_t version_number) : env_(vset->env_), cfd_(column_family_data), info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log), @@ -959,6 +959,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, next_(this), prev_(this), refs_(0), + env_options_(env_opt), version_number_(version_number) {} void Version::Get(const ReadOptions& read_options, const LookupKey& k, @@ -2532,7 +2533,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, LogAndApplyCFHelper(w.edit_list.front()); batch_edits.push_back(w.edit_list.front()); } else { - v = new Version(column_family_data, this, current_version_number_++); + v = new Version(column_family_data, this, env_options_, + current_version_number_++); builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data)); auto* builder = builder_guard->version_builder(); for (const auto& writer : manifest_writers_) { @@ -2577,7 +2579,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, // Unlock during expensive operations. New writes cannot get here // because &w is ensuring that all new writes get queued. { - + EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_); mu->Unlock(); TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest"); @@ -2599,7 +2601,6 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n", pending_manifest_file_number_); unique_ptr descriptor_file; - EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_); s = NewWritableFile( env_, DescriptorFileName(dbname_, pending_manifest_file_number_), &descriptor_file, opt_env_opts); @@ -3064,7 +3065,8 @@ Status VersionSet::Recover( false /* prefetch_index_and_filter_in_cache */); } - Version* v = new Version(cfd, this, current_version_number_++); + Version* v = + new Version(cfd, this, env_options_, current_version_number_++); builder->SaveTo(v->storage_info()); // Install recovered version @@ -3422,7 +3424,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, assert(builders_iter != builders.end()); auto builder = builders_iter->second->version_builder(); - Version* v = new Version(cfd, this, current_version_number_++); + Version* v = + new Version(cfd, this, env_options_, current_version_number_++); builder->SaveTo(v->storage_info()); v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false); @@ -3634,7 +3637,7 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f, // approximate offset of "key" within the table. TableReader* table_reader_ptr; InternalIterator* iter = v->cfd_->table_cache()->NewIterator( - ReadOptions(), env_options_, v->cfd_->internal_comparator(), f.fd, + ReadOptions(), v->env_options_, v->cfd_->internal_comparator(), f.fd, nullptr /* range_del_agg */, &table_reader_ptr); if (table_reader_ptr != nullptr) { result = table_reader_ptr->ApproximateOffsetOf(key); @@ -3865,7 +3868,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( const ColumnFamilyOptions& cf_options, VersionEdit* edit) { assert(edit->is_column_family_add_); - Version* dummy_versions = new Version(nullptr, this); + Version* dummy_versions = new Version(nullptr, this, env_options_); // Ref() dummy version once so that later we can call Unref() to delete it // by avoiding calling "delete" explicitly (~Version is private) dummy_versions->Ref(); @@ -3873,7 +3876,8 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( edit->column_family_name_, edit->column_family_, dummy_versions, cf_options); - Version* v = new Version(new_cfd, this, current_version_number_++); + Version* v = + new Version(new_cfd, this, env_options_, current_version_number_++); // Fill level target base information. v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(), diff --git a/db/version_set.h b/db/version_set.h index 11e175b5d..f83099f23 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -663,12 +663,14 @@ class Version { Version* next_; // Next version in linked list Version* prev_; // Previous version in linked list int refs_; // Number of live refs to this version + const EnvOptions env_options_; // A version number that uniquely represents this version. This is // used for debugging and logging purposes only. uint64_t version_number_; - Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0); + Version(ColumnFamilyData* cfd, VersionSet* vset, const EnvOptions& env_opt, + uint64_t version_number = 0); ~Version(); @@ -844,6 +846,10 @@ class VersionSet { ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); } const EnvOptions& env_options() { return env_options_; } + void ChangeEnvOptions(const MutableDBOptions& new_options) { + env_options_.writable_file_max_buffer_size = + new_options.writable_file_max_buffer_size; + } static uint64_t GetNumLiveVersions(Version* dummy_versions); @@ -908,7 +914,7 @@ class VersionSet { std::vector obsolete_manifests_; // env options for all reads and writes except compactions - const EnvOptions& env_options_; + EnvOptions env_options_; // env options used for compactions. This is a copy of // env_options_ but with readaheads set to readahead_compactions_. diff --git a/env/env.cc b/env/env.cc index ae0b111be..0b2a6cf98 100644 --- a/env/env.cc +++ b/env/env.cc @@ -333,6 +333,8 @@ EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options, const DBOptions& db_options) const { EnvOptions optimized_env_options(env_options); optimized_env_options.bytes_per_sync = db_options.wal_bytes_per_sync; + optimized_env_options.writable_file_max_buffer_size = + db_options.writable_file_max_buffer_size; return optimized_env_options; } diff --git a/env/env_posix.cc b/env/env_posix.cc index 5644c5a8a..b30a00e4b 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -832,6 +832,8 @@ class PosixEnv : public Env { // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit // test and make this false optimized.fallocate_with_keep_size = true; + optimized.writable_file_max_buffer_size = + db_options.writable_file_max_buffer_size; return optimized; } diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 1918e6027..267f1dc1f 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -857,6 +857,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_bytes_per_sync( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_wal_bytes_per_sync( rocksdb_options_t*, uint64_t); extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_writable_file_max_buffer_size(rocksdb_options_t*, uint64_t); +extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_concurrent_memtable_write(rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void diff --git a/options/db_options.cc b/options/db_options.cc index af3f799e2..53b0acece 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -64,7 +64,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) options.new_table_reader_for_compaction_inputs), compaction_readahead_size(options.compaction_readahead_size), random_access_max_buffer_size(options.random_access_max_buffer_size), - writable_file_max_buffer_size(options.writable_file_max_buffer_size), use_adaptive_mutex(options.use_adaptive_mutex), listeners(options.listeners), enable_thread_tracking(options.enable_thread_tracking), @@ -175,9 +174,6 @@ void ImmutableDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER( log, " Options.random_access_max_buffer_size: %" ROCKSDB_PRIszt, random_access_max_buffer_size); - ROCKS_LOG_HEADER( - log, " Options.writable_file_max_buffer_size: %" ROCKSDB_PRIszt, - writable_file_max_buffer_size); ROCKS_LOG_HEADER(log, " Options.use_adaptive_mutex: %d", use_adaptive_mutex); ROCKS_LOG_HEADER(log, " Options.rate_limiter: %p", @@ -230,6 +226,7 @@ MutableDBOptions::MutableDBOptions() base_background_compactions(-1), max_background_compactions(-1), avoid_flush_during_shutdown(false), + writable_file_max_buffer_size(1024 * 1024), delayed_write_rate(2 * 1024U * 1024U), max_total_wal_size(0), delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000), @@ -243,6 +240,7 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options) base_background_compactions(options.base_background_compactions), max_background_compactions(options.max_background_compactions), avoid_flush_during_shutdown(options.avoid_flush_during_shutdown), + writable_file_max_buffer_size(options.writable_file_max_buffer_size), delayed_write_rate(options.delayed_write_rate), max_total_wal_size(options.max_total_wal_size), delete_obsolete_files_period_micros( @@ -259,6 +257,9 @@ void MutableDBOptions::Dump(Logger* log) const { max_background_compactions); ROCKS_LOG_HEADER(log, " Options.avoid_flush_during_shutdown: %d", avoid_flush_during_shutdown); + ROCKS_LOG_HEADER( + log, " Options.writable_file_max_buffer_size: %" ROCKSDB_PRIszt, + writable_file_max_buffer_size); ROCKS_LOG_HEADER(log, " Options.delayed_write_rate : %" PRIu64, delayed_write_rate); ROCKS_LOG_HEADER(log, " Options.max_total_wal_size: %" PRIu64, diff --git a/options/db_options.h b/options/db_options.h index adf7ef2f5..653448c44 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -57,7 +57,6 @@ struct ImmutableDBOptions { bool new_table_reader_for_compaction_inputs; size_t compaction_readahead_size; size_t random_access_max_buffer_size; - size_t writable_file_max_buffer_size; bool use_adaptive_mutex; std::vector> listeners; bool enable_thread_tracking; @@ -93,6 +92,7 @@ struct MutableDBOptions { int base_background_compactions; int max_background_compactions; bool avoid_flush_during_shutdown; + size_t writable_file_max_buffer_size; uint64_t delayed_write_rate; uint64_t max_total_wal_size; uint64_t delete_obsolete_files_period_micros; diff --git a/options/options_helper.cc b/options/options_helper.cc index c4d2beb8f..6d95dbc6a 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -91,7 +91,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.random_access_max_buffer_size = immutable_db_options.random_access_max_buffer_size; options.writable_file_max_buffer_size = - immutable_db_options.writable_file_max_buffer_size; + mutable_db_options.writable_file_max_buffer_size; options.use_adaptive_mutex = immutable_db_options.use_adaptive_mutex; options.listeners = immutable_db_options.listeners; options.enable_thread_tracking = immutable_db_options.enable_thread_tracking; diff --git a/options/options_helper.h b/options/options_helper.h index 8993daeef..c97cd320d 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -220,9 +220,6 @@ static std::unordered_map db_options_type_info = { {"random_access_max_buffer_size", {offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal, false, 0}}, - {"writable_file_max_buffer_size", - {offsetof(struct DBOptions, writable_file_max_buffer_size), - OptionType::kSizeT, OptionVerificationType::kNormal, false, 0}}, {"use_adaptive_mutex", {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, @@ -351,6 +348,10 @@ static std::unordered_map db_options_type_info = { {offsetof(struct DBOptions, avoid_flush_during_shutdown), OptionType::kBoolean, OptionVerificationType::kNormal, true, offsetof(struct MutableDBOptions, avoid_flush_during_shutdown)}}, + {"writable_file_max_buffer_size", + {offsetof(struct DBOptions, writable_file_max_buffer_size), + OptionType::kSizeT, OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, writable_file_max_buffer_size)}}, {"allow_ingest_behind", {offsetof(struct DBOptions, allow_ingest_behind), OptionType::kBoolean, OptionVerificationType::kNormal, false, diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 3e84d06bd..87b4eb159 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -786,6 +786,8 @@ EnvOptions WinEnvIO::OptimizeForLogWrite(const EnvOptions& env_options, // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit // test and make this false optimized.fallocate_with_keep_size = true; + optimized.writable_file_max_buffer_size = + db_options.writable_file_max_buffer_size; return optimized; } diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 9be692458..41bf6a309 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -13,6 +13,7 @@ #include "rocksdb/env.h" #include "rocksdb/rate_limiter.h" #include "util/aligned_buffer.h" +#include "util/sync_point.h" namespace rocksdb { @@ -151,6 +152,8 @@ class WritableFileWriter { bytes_per_sync_(options.bytes_per_sync), rate_limiter_(options.rate_limiter), stats_(stats) { + TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", + reinterpret_cast(max_buffer_size_)); buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); }