diff --git a/HISTORY.md b/HISTORY.md index 6e64270ef..7c1aaaba0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ * `BackupableDBOptions::max_valid_backups_to_open == 0` now means no backups will be opened during BackupEngine initialization. Previously this condition disabled limiting backups opened. ### New Features +* `DBOptions::bytes_per_sync` and `DBOptions::wal_bytes_per_sync` can now be changed dynamically; changing `DBOptions::wal_bytes_per_sync` will flush all memtables and switch to a new WAL file so that the new value takes effect. ### Bug Fixes * Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery. diff --git a/db/c.cc b/db/c.cc index 65e79c01f..5486e9602 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2023,8 +2023,8 @@ void rocksdb_options_set_paranoid_checks( opt->rep.paranoid_checks = v; } -void rocksdb_options_set_db_paths(rocksdb_options_t* opt, - const rocksdb_dbpath_t** dbpath_values, +void rocksdb_options_set_db_paths(rocksdb_options_t* opt, + const rocksdb_dbpath_t** dbpath_values, size_t num_paths) { std::vector db_paths(num_paths); for (size_t i = 0; i < num_paths; ++i) { @@ -2266,6 +2266,11 @@ void rocksdb_options_set_use_adaptive_mutex( opt->rep.use_adaptive_mutex = v; } +void rocksdb_options_set_wal_bytes_per_sync( + rocksdb_options_t* opt, uint64_t v) { + opt->rep.wal_bytes_per_sync = v; +} + void rocksdb_options_set_bytes_per_sync( rocksdb_options_t* opt, uint64_t v) { opt->rep.bytes_per_sync = v; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 13a43a00a..2e4bb367b 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -38,7 +38,6 @@ #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" #include "monitoring/thread_status_util.h" -#include "port/likely.h" #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -264,7 +263,7 @@ void CompactionJob::AggregateStatistics() { CompactionJob::CompactionJob( int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, - const EnvOptions& env_options, VersionSet* versions, + const EnvOptions env_options, VersionSet* versions, const std::atomic* shutting_down, LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, Status* db_bg_error, @@ -1261,11 +1260,10 @@ Status CompactionJob::OpenCompactionOutputFile( #endif // !ROCKSDB_LITE // Make the output file unique_ptr writable_file; - EnvOptions opt_env_opts = - env_->OptimizeForCompactionTableWrite(env_options_, db_options_); + bool syncpoint_arg = env_options_.use_direct_writes; TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile", - &opt_env_opts.use_direct_writes); - Status s = NewWritableFile(env_, fname, &writable_file, opt_env_opts); + &syncpoint_arg); + Status s = NewWritableFile(env_, fname, &writable_file, env_options_); if (!s.ok()) { ROCKS_LOG_ERROR( db_options_.info_log, diff --git a/db/compaction_job.h b/db/compaction_job.h index 6ca5d627a..22b35a5f3 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -56,7 +56,7 @@ class CompactionJob { public: CompactionJob(int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, - const EnvOptions& env_options, VersionSet* versions, + const EnvOptions env_options, VersionSet* versions, const std::atomic* shutting_down, LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, @@ -127,7 +127,7 @@ class
CompactionJob { // DBImpl state const std::string& dbname_; const ImmutableDBOptions& db_options_; - const EnvOptions& env_options_; + const EnvOptions env_options_; Env* env_; VersionSet* versions_; diff --git a/db/db_impl.cc b/db/db_impl.cc index 19a394c74..101c3fe7c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -181,6 +181,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) has_unpersisted_data_(false), unable_to_flush_oldest_log_(false), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), + env_options_for_compaction_(env_->OptimizeForCompactionTableWrite( + env_options_, + immutable_db_options_)), num_running_ingest_file_(0), #ifndef ROCKSDB_LITE wal_manager_(immutable_db_options_, env_options_), @@ -539,6 +542,7 @@ Status DBImpl::SetDBOptions( MutableDBOptions new_options; Status s; Status persist_options_status; + bool wal_changed = false; WriteThread::Writer w; WriteContext write_context; { @@ -557,12 +561,25 @@ Status DBImpl::SetDBOptions( table_cache_.get()->SetCapacity(new_options.max_open_files == -1 ? TableCache::kInfiniteCapacity : new_options.max_open_files - 10); - + wal_changed = mutable_db_options_.wal_bytes_per_sync != + new_options.wal_bytes_per_sync; + if (new_options.bytes_per_sync == 0) { + new_options.bytes_per_sync = 1024 * 1024; + } mutable_db_options_ = new_options; - + env_options_for_compaction_ = EnvOptions(BuildDBOptions( + immutable_db_options_, + mutable_db_options_)); + env_options_for_compaction_ = env_->OptimizeForCompactionTableWrite( + env_options_for_compaction_, + immutable_db_options_); + env_options_for_compaction_.bytes_per_sync + = mutable_db_options_.bytes_per_sync; + env_->OptimizeForCompactionTableWrite(env_options_for_compaction_, + immutable_db_options_); write_thread_.EnterUnbatched(&w, &mutex_); - if (total_log_size_ > GetMaxTotalWalSize()) { - Status purge_wal_status = HandleWALFull(&write_context); + if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) { + Status purge_wal_status = SwitchWAL(&write_context); if (!purge_wal_status.ok()) { ROCKS_LOG_WARN(immutable_db_options_.info_log, "Unable to purge WAL files in SetDBOptions() -- %s", diff --git a/db/db_impl.h b/db/db_impl.h index 64f5dee67..9ffd9f1ea 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -332,7 +332,7 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family = nullptr, bool disallow_trivial_move = false); - void TEST_HandleWALFull(); + void TEST_SwitchWAL(); bool TEST_UnableToFlushOldestLog() { return unable_to_flush_oldest_log_; @@ -750,7 +750,7 @@ class DBImpl : public DB { Status WaitForFlushMemTable(ColumnFamilyData* cfd); // REQUIRES: mutex locked - Status HandleWALFull(WriteContext* write_context); + Status SwitchWAL(WriteContext* write_context); // REQUIRES: mutex locked Status HandleWriteBufferFull(WriteContext* write_context); @@ -1168,6 +1168,9 @@ class DBImpl : public DB { // The options to access storage files const EnvOptions env_options_; + // Additional options for compaction and flush + EnvOptions env_options_for_compaction_; + // Number of running IngestExternalFile() calls.
// REQUIRES: mutex held int num_running_ingest_file_; diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 8106b3e2d..db2988131 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -87,10 +87,11 @@ Status DBImpl::FlushMemTableToOutputFile( snapshots_.GetAll(&earliest_write_conflict_snapshot); FlushJob flush_job( - dbname_, cfd, immutable_db_options_, mutable_cf_options, env_options_, - versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, - earliest_write_conflict_snapshot, job_context, log_buffer, - directories_.GetDbDir(), directories_.GetDataDir(0U), + dbname_, cfd, immutable_db_options_, mutable_cf_options, + env_options_for_compaction_, versions_.get(), &mutex_, + &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, + job_context, log_buffer, directories_.GetDbDir(), + directories_.GetDataDir(0U), GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats); @@ -532,8 +533,9 @@ Status DBImpl::CompactFilesImpl( assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( - job_context->job_id, c.get(), immutable_db_options_, env_options_, - versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), + job_context->job_id, c.get(), immutable_db_options_, + env_options_for_compaction_, versions_.get(), &shutting_down_, + log_buffer, directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, @@ -1676,11 +1678,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( - job_context->job_id, c.get(), immutable_db_options_, env_options_, - versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), + job_context->job_id, c.get(), immutable_db_options_, + env_options_for_compaction_, versions_.get(), &shutting_down_, + log_buffer, directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, - &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, - table_cache_, &event_logger_, + &bg_error_, snapshot_seqs, + earliest_write_conflict_snapshot, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats); diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index de5b66f2a..2ed72a395 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -19,10 +19,10 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() { return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0); } -void DBImpl::TEST_HandleWALFull() { +void DBImpl::TEST_SwitchWAL() { WriteContext write_context; InstrumentedMutexLock l(&mutex_); - HandleWALFull(&write_context); + SwitchWAL(&write_context); } int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 1da1465c4..9a273b850 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -884,11 +884,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, std::vector snapshot_seqs = snapshots_.GetAll(&earliest_write_conflict_snapshot); - EnvOptions optimized_env_options = - env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_); s = BuildTable( dbname_, env_, *cfd->ioptions(), mutable_cf_options, - 
optimized_env_options, cfd->table_cache(), iter.get(), + env_options_for_compaction_, cfd->table_cache(), iter.get(), std::unique_ptr(mem->NewRangeTombstoneIterator(ro)), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 246d39a47..71304dcc2 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -619,7 +619,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); if (UNLIKELY(status.ok() && !single_column_family_mode_ && total_log_size_ > GetMaxTotalWalSize())) { - status = HandleWALFull(write_context); + status = SwitchWAL(write_context); } if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) { @@ -830,7 +830,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, return status; } -Status DBImpl::HandleWALFull(WriteContext* write_context) { +Status DBImpl::SwitchWAL(WriteContext* write_context) { mutex_.AssertHeld(); assert(write_context != nullptr); Status status; diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 243748f9f..2b54bc7e0 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -117,6 +117,87 @@ TEST_F(DBOptionsTest, GetLatestCFOptions) { GetMutableCFOptionsMap(dbfull()->GetOptions(handles_[1]))); } +TEST_F(DBOptionsTest, SetBytesPerSync) { + const size_t kValueSize = 1024 * 1024 * 8; + Options options; + options.create_if_missing = true; + options.bytes_per_sync = 1024 * 1024; + options.use_direct_reads = false; + options.write_buffer_size = 400 * kValueSize; + options.disable_auto_compactions = true; + options.env = env_; + Reopen(options); + int counter = 0; + int low_bytes_per_sync = 0; + int i = 0; + const std::string kValue(kValueSize, 'v'); + ASSERT_EQ(options.bytes_per_sync, dbfull()->GetDBOptions().bytes_per_sync); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WritableFileWriter::RangeSync:0", [&](void* arg) { + counter++; + }); + + WriteOptions write_opts; + for (; i < 40; i++) { + Put(Key(i), kValue, write_opts); + } + i = 0; + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + low_bytes_per_sync = counter; + counter = 0; + // 8388608 = 8 * 1024 * 1024 + ASSERT_OK(dbfull()->SetDBOptions({{"bytes_per_sync", "8388608"}})); + ASSERT_EQ(8388608, dbfull()->GetDBOptions().bytes_per_sync); + for (; i < 40; i++) { + Put(Key(i), kValue, write_opts); + } + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_GT(counter, 0); + ASSERT_GT(low_bytes_per_sync, 0); + ASSERT_GT(low_bytes_per_sync, counter); +} + +TEST_F(DBOptionsTest, SetWalBytesPerSync) { + const size_t kValueSize = 1024 * 1024 * 3; + Options options; + options.create_if_missing = true; + options.wal_bytes_per_sync = 512; + options.write_buffer_size = 100 * kValueSize; + options.disable_auto_compactions = true; + options.env = env_; + Reopen(options); + ASSERT_EQ(512, dbfull()->GetDBOptions().wal_bytes_per_sync); + int counter = 0; + int low_bytes_per_sync = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WritableFileWriter::RangeSync:0", [&](void* arg) { + counter++; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + const std::string kValue(kValueSize, 'v'); + int i = 0; + for (; i < 10; i++) 
{ + Put(Key(i), kValue); + } + // Do not flush. If we flush here, SwitchWAL will reuse old WAL file since it's + // empty and will not get the new wal_bytes_per_sync value. + low_bytes_per_sync = counter; + // 5242880 = 1024 * 1024 * 5 + ASSERT_OK(dbfull()->SetDBOptions({{"wal_bytes_per_sync", "5242880"}})); + ASSERT_EQ(5242880, dbfull()->GetDBOptions().wal_bytes_per_sync); + counter = 0; + i = 0; + for (; i < 10; i++) { + Put(Key(i), kValue); + } + ASSERT_GT(counter, 0); + ASSERT_GT(low_bytes_per_sync, 0); + ASSERT_GT(low_bytes_per_sync, counter); +} + TEST_F(DBOptionsTest, SetOptionsAndReopen) { Random rnd(1044); auto rand_opts = GetRandomizedMutableCFOptionsMap(&rnd); diff --git a/db/flush_job.cc b/db/flush_job.cc index 846edb407..6c2478058 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -30,7 +30,6 @@ #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" #include "monitoring/thread_status_util.h" -#include "port/likely.h" #include "port/port.h" #include "db/memtable.h" #include "rocksdb/db.h" @@ -58,7 +57,7 @@ namespace rocksdb { FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, const ImmutableDBOptions& db_options, const MutableCFOptions& mutable_cf_options, - const EnvOptions& env_options, VersionSet* versions, + const EnvOptions env_options, VersionSet* versions, InstrumentedMutex* db_mutex, std::atomic* shutting_down, std::vector existing_snapshots, @@ -294,16 +293,13 @@ Status FlushJob::WriteLevel0Table() { TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", &output_compression_); - EnvOptions optimized_env_options = - db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_); - int64_t _current_time = 0; db_options_.env->GetCurrentTime(&_current_time); // ignore error const uint64_t current_time = static_cast(_current_time); s = BuildTable( dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, - optimized_env_options, cfd_->table_cache(), iter.get(), + env_options_, cfd_->table_cache(), iter.get(), std::move(range_del_iter), &meta_, cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), cfd_->GetName(), existing_snapshots_, diff --git a/db/flush_job.h b/db/flush_job.h index 4698ae7b0..37275581a 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -56,8 +56,9 @@ class FlushJob { FlushJob(const std::string& dbname, ColumnFamilyData* cfd, const ImmutableDBOptions& db_options, const MutableCFOptions& mutable_cf_options, - const EnvOptions& env_options, VersionSet* versions, - InstrumentedMutex* db_mutex, std::atomic* shutting_down, + const EnvOptions env_options, + VersionSet* versions, InstrumentedMutex* db_mutex, + std::atomic* shutting_down, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, JobContext* job_context, LogBuffer* log_buffer, @@ -83,7 +84,7 @@ class FlushJob { ColumnFamilyData* cfd_; const ImmutableDBOptions& db_options_; const MutableCFOptions& mutable_cf_options_; - const EnvOptions& env_options_; + const EnvOptions env_options_; VersionSet* versions_; InstrumentedMutex* db_mutex_; std::atomic* shutting_down_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 34a3c983c..9d55b7f73 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -94,9 +94,10 @@ TEST_F(FlushJobTest, Empty) { EventLogger event_logger(db_options_.info_log.get()); FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), - env_options_,
versions_.get(), &mutex_, &shutting_down_, - {}, kMaxSequenceNumber, &job_context, nullptr, nullptr, - nullptr, kNoCompression, nullptr, &event_logger, false); + env_options_, versions_.get(), &mutex_, + &shutting_down_, {}, kMaxSequenceNumber, &job_context, + nullptr, nullptr, nullptr, kNoCompression, nullptr, + &event_logger, false); { InstrumentedMutexLock l(&mutex_); flush_job.PickMemTable(); @@ -138,9 +139,10 @@ TEST_F(FlushJobTest, NonEmpty) { EventLogger event_logger(db_options_.info_log.get()); FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), - env_options_, versions_.get(), &mutex_, &shutting_down_, - {}, kMaxSequenceNumber, &job_context, nullptr, nullptr, - nullptr, kNoCompression, nullptr, &event_logger, true); + env_options_, versions_.get(), &mutex_, + &shutting_down_, {}, kMaxSequenceNumber, &job_context, + nullptr, nullptr, nullptr, kNoCompression, nullptr, + &event_logger, true); FileMetaData fd; mutex_.Lock(); flush_job.PickMemTable(); @@ -204,9 +206,10 @@ TEST_F(FlushJobTest, Snapshots) { EventLogger event_logger(db_options_.info_log.get()); FlushJob flush_job( dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, - &shutting_down_, snapshots, kMaxSequenceNumber, &job_context, nullptr, - nullptr, nullptr, kNoCompression, nullptr, &event_logger, true); + *cfd->GetLatestMutableCFOptions(), env_options_, + versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, + &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr, + &event_logger, true); mutex_.Lock(); flush_job.PickMemTable(); ASSERT_OK(flush_job.Run()); diff --git a/db/repair.cc b/db/repair.cc index b3862baab..284efd4e3 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -99,7 +99,7 @@ class Repairer { env_(db_options.env), env_options_(), db_options_(SanitizeOptions(dbname_, db_options)), - immutable_db_options_(db_options_), + immutable_db_options_(ImmutableDBOptions(db_options_)), icmp_(default_cf_opts.comparator), default_cf_opts_(default_cf_opts), default_cf_iopts_( @@ -397,16 +397,13 @@ class Repairer { ro.total_order_seek = true; Arena arena; ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); - EnvOptions optimized_env_options = - env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_); - int64_t _current_time = 0; status = env_->GetCurrentTime(&_current_time); // ignore error const uint64_t current_time = static_cast(_current_time); status = BuildTable( dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), - optimized_env_options, table_cache_, iter.get(), + env_options_, table_cache_, iter.get(), std::unique_ptr(mem->NewRangeTombstoneIterator(ro)), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), @@ -618,11 +615,13 @@ Status GetDefaultCFOptions( } // anonymous namespace Status RepairDB(const std::string& dbname, const DBOptions& db_options, - const std::vector& column_families) { + const std::vector& column_families + ) { ColumnFamilyOptions default_cf_opts; Status status = GetDefaultCFOptions(column_families, &default_cf_opts); if (status.ok()) { - Repairer repairer(dbname, db_options, column_families, default_cf_opts, + Repairer repairer(dbname, db_options, column_families, + default_cf_opts, ColumnFamilyOptions() /* unknown_cf_opts */, false /* create_unknown_cfs */); status = repairer.Run(); @@ -636,7 +635,8 @@ Status 
RepairDB(const std::string& dbname, const DBOptions& db_options, ColumnFamilyOptions default_cf_opts; Status status = GetDefaultCFOptions(column_families, &default_cf_opts); if (status.ok()) { - Repairer repairer(dbname, db_options, column_families, default_cf_opts, + Repairer repairer(dbname, db_options, + column_families, default_cf_opts, unknown_cf_opts, true /* create_unknown_cfs */); status = repairer.Run(); } @@ -646,7 +646,8 @@ Status RepairDB(const std::string& dbname, const DBOptions& db_options, Status RepairDB(const std::string& dbname, const Options& options) { DBOptions db_options(options); ColumnFamilyOptions cf_options(options); - Repairer repairer(dbname, db_options, {}, cf_options /* default_cf_opts */, + Repairer repairer(dbname, db_options, + {}, cf_options /* default_cf_opts */, cf_options /* unknown_cf_opts */, true /* create_unknown_cfs */); return repairer.Run(); diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 0b1dc3db4..1918e6027 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -854,6 +854,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_adaptive_mutex( rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_bytes_per_sync( rocksdb_options_t*, uint64_t); +extern ROCKSDB_LIBRARY_API void rocksdb_options_set_wal_bytes_per_sync( + rocksdb_options_t*, uint64_t); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_concurrent_memtable_write(rocksdb_options_t*, unsigned char); diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index eac97a496..0e70ef372 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -45,6 +45,7 @@ class RandomRWFile; class Directory; struct DBOptions; struct ImmutableDBOptions; +struct MutableDBOptions; class RateLimiter; class ThreadStatusUpdater; struct ThreadStatus; @@ -411,7 +412,7 @@ class Env { // table files. virtual EnvOptions OptimizeForCompactionTableWrite( const EnvOptions& env_options, - const ImmutableDBOptions& db_options) const; + const ImmutableDBOptions& immutable_ops) const; // OptimizeForCompactionTableWrite will create a new EnvOptions object that // is a copy of the EnvOptions in the parameters, but is optimized for reading @@ -1087,8 +1088,8 @@ class EnvWrapper : public Env { } EnvOptions OptimizeForCompactionTableWrite( const EnvOptions& env_options, - const ImmutableDBOptions& db_options) const override { - return target_->OptimizeForCompactionTableWrite(env_options, db_options); + const ImmutableDBOptions& immutable_ops) const override { + return target_->OptimizeForCompactionTableWrite(env_options, immutable_ops); } EnvOptions OptimizeForCompactionTableRead( const EnvOptions& env_options, diff --git a/options/db_options.cc b/options/db_options.cc index c8c55f43e..4a1d49f81 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -66,8 +66,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) random_access_max_buffer_size(options.random_access_max_buffer_size), writable_file_max_buffer_size(options.writable_file_max_buffer_size), use_adaptive_mutex(options.use_adaptive_mutex), - bytes_per_sync(options.bytes_per_sync), - wal_bytes_per_sync(options.wal_bytes_per_sync), listeners(options.listeners), enable_thread_tracking(options.enable_thread_tracking), enable_pipelined_write(options.enable_pipelined_write), @@ -187,12 +185,6 @@ void ImmutableDBOptions::Dump(Logger* log) const { Header( log, " Options.sst_file_manager.rate_bytes_per_sec: %" PRIi64, sst_file_manager ? 
sst_file_manager->GetDeleteRateBytesPerSecond() : 0); - ROCKS_LOG_HEADER(log, - " Options.bytes_per_sync: %" PRIu64, - bytes_per_sync); - ROCKS_LOG_HEADER(log, - " Options.wal_bytes_per_sync: %" PRIu64, - wal_bytes_per_sync); ROCKS_LOG_HEADER(log, " Options.wal_recovery_mode: %d", wal_recovery_mode); ROCKS_LOG_HEADER(log, " Options.enable_thread_tracking: %d", @@ -254,7 +246,9 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options) delete_obsolete_files_period_micros( options.delete_obsolete_files_period_micros), stats_dump_period_sec(options.stats_dump_period_sec), - max_open_files(options.max_open_files) {} + max_open_files(options.max_open_files), + bytes_per_sync(options.bytes_per_sync), + wal_bytes_per_sync(options.wal_bytes_per_sync) {} void MutableDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.max_background_jobs: %d", @@ -274,6 +268,12 @@ void MutableDBOptions::Dump(Logger* log) const { stats_dump_period_sec); ROCKS_LOG_HEADER(log, " Options.max_open_files: %d", max_open_files); + ROCKS_LOG_HEADER(log, + " Options.bytes_per_sync: %" PRIu64, + bytes_per_sync); + ROCKS_LOG_HEADER(log, + " Options.wal_bytes_per_sync: %" PRIu64, + wal_bytes_per_sync); } } // namespace rocksdb diff --git a/options/db_options.h b/options/db_options.h index 99ca7a595..adf7ef2f5 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -59,8 +59,6 @@ struct ImmutableDBOptions { size_t random_access_max_buffer_size; size_t writable_file_max_buffer_size; bool use_adaptive_mutex; - uint64_t bytes_per_sync; - uint64_t wal_bytes_per_sync; std::vector> listeners; bool enable_thread_tracking; bool enable_pipelined_write; @@ -100,6 +98,8 @@ struct MutableDBOptions { uint64_t delete_obsolete_files_period_micros; unsigned int stats_dump_period_sec; int max_open_files; + uint64_t bytes_per_sync; + uint64_t wal_bytes_per_sync; }; } // namespace rocksdb diff --git a/options/options_helper.cc b/options/options_helper.cc index 5cf548fb9..7b3dd5560 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -56,6 +56,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, mutable_db_options.base_background_compactions; options.max_background_compactions = mutable_db_options.max_background_compactions; + options.bytes_per_sync = mutable_db_options.bytes_per_sync; + options.wal_bytes_per_sync = mutable_db_options.wal_bytes_per_sync; options.max_subcompactions = immutable_db_options.max_subcompactions; options.max_background_flushes = immutable_db_options.max_background_flushes; options.max_log_file_size = immutable_db_options.max_log_file_size; @@ -91,8 +93,6 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.writable_file_max_buffer_size = immutable_db_options.writable_file_max_buffer_size; options.use_adaptive_mutex = immutable_db_options.use_adaptive_mutex; - options.bytes_per_sync = immutable_db_options.bytes_per_sync; - options.wal_bytes_per_sync = immutable_db_options.wal_bytes_per_sync; options.listeners = immutable_db_options.listeners; options.enable_thread_tracking = immutable_db_options.enable_thread_tracking; options.delayed_write_rate = mutable_db_options.delayed_write_rate; diff --git a/options/options_helper.h b/options/options_helper.h index c59367779..e1d4a67e8 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -283,7 +283,8 @@ static std::unordered_map db_options_type_info = { OptionVerificationType::kNormal, false, 0}}, {"bytes_per_sync", {offsetof(struct DBOptions, 
bytes_per_sync), OptionType::kUInt64T, - OptionVerificationType::kNormal, false, 0}}, + OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, bytes_per_sync)}}, {"delayed_write_rate", {offsetof(struct DBOptions, delayed_write_rate), OptionType::kUInt64T, OptionVerificationType::kNormal, true, @@ -301,7 +302,8 @@ static std::unordered_map db_options_type_info = { offsetof(struct MutableDBOptions, max_total_wal_size)}}, {"wal_bytes_per_sync", {offsetof(struct DBOptions, wal_bytes_per_sync), OptionType::kUInt64T, - OptionVerificationType::kNormal, false, 0}}, + OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, wal_bytes_per_sync)}}, {"stats_dump_period_sec", {offsetof(struct DBOptions, stats_dump_period_sec), OptionType::kUInt, OptionVerificationType::kNormal, true, diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 8c099244f..df8ba5c86 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -1561,7 +1561,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { // request a flush for all column families such that the earliest // alive log file can be killed - db_impl->TEST_HandleWALFull(); + db_impl->TEST_SwitchWAL(); // log cannot be flushed because txn2 has not been commited ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed()); ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog()); @@ -1576,7 +1576,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { s = txn2->Commit(); ASSERT_OK(s); - db_impl->TEST_HandleWALFull(); + db_impl->TEST_SwitchWAL(); ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); // we should see that cfb now has a flush requested
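
Usage note (not part of the patch): the sketch below shows how the now-mutable sync options can be tuned at runtime through DB::SetDBOptions(), mirroring what the new DBOptionsTest cases exercise. The database path and the byte values are illustrative assumptions, not taken from the patch.

// Minimal usage sketch for dynamically changing bytes_per_sync and
// wal_bytes_per_sync. Assumes a local path "/tmp/rocksdb_sync_demo".
#include <cassert>
#include <unordered_map>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.bytes_per_sync = 1024 * 1024;  // still settable at open time
  options.wal_bytes_per_sync = 512;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/rocksdb_sync_demo", &db);
  assert(s.ok());

  // Raise both limits without reopening the DB. Changing wal_bytes_per_sync
  // flushes all memtables and switches to a new WAL file so the new value
  // applies to freshly written WAL data.
  s = db->SetDBOptions({{"bytes_per_sync", "8388608"},        // 8 * 1024 * 1024
                        {"wal_bytes_per_sync", "5242880"}});  // 5 * 1024 * 1024
  assert(s.ok());
  assert(db->GetDBOptions().bytes_per_sync == 8388608);
  assert(db->GetDBOptions().wal_bytes_per_sync == 5242880);

  delete db;
  return 0;
}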