Make bytes_per_sync and wal_bytes_per_sync mutable

Summary:
SUMMARY
Moves the bytes_per_sync and wal_bytes_per_sync options from immutableoptions to mutable options. Also if wal_bytes_per_sync is changed, the wal file and memtables are flushed.
TEST PLAN
ran make check
all passed

Two new tests SetBytesPerSync, SetWalBytesPerSync check that after issuing setoptions with a new value for the var, the db options have the new value.
Closes https://github.com/facebook/rocksdb/pull/2893

Reviewed By: yiwu-arbug

Differential Revision: D5845814

Pulled By: TheRushingWookie

fbshipit-source-id: 93b52d779ce623691b546679dcd984a06d2ad1bd
main
Quinn Jarrell 7 years ago committed by Facebook Github Bot
parent ec48e5c77f
commit 6a541afcc4
  1. 1
      HISTORY.md
  2. 5
      db/c.cc
  3. 10
      db/compaction_job.cc
  4. 4
      db/compaction_job.h
  5. 25
      db/db_impl.cc
  6. 7
      db/db_impl.h
  7. 23
      db/db_impl_compaction_flush.cc
  8. 4
      db/db_impl_debug.cc
  9. 4
      db/db_impl_open.cc
  10. 4
      db/db_impl_write.cc
  11. 81
      db/db_options_test.cc
  12. 8
      db/flush_job.cc
  13. 7
      db/flush_job.h
  14. 21
      db/flush_job_test.cc
  15. 19
      db/repair.cc
  16. 2
      include/rocksdb/c.h
  17. 7
      include/rocksdb/env.h
  18. 18
      options/db_options.cc
  19. 4
      options/db_options.h
  20. 4
      options/options_helper.cc
  21. 6
      options/options_helper.h
  22. 4
      utilities/transactions/transaction_test.cc

@ -4,6 +4,7 @@
* `BackupableDBOptions::max_valid_backups_to_open == 0` now means no backups will be opened during BackupEngine initialization. Previously this condition disabled limiting backups opened. * `BackupableDBOptions::max_valid_backups_to_open == 0` now means no backups will be opened during BackupEngine initialization. Previously this condition disabled limiting backups opened.
### New Features ### New Features
* `DBOptions::bytes_per_sync` and `DBOptions::wal_bytes_per_sync` can now be changed dynamically, `DBOptions::wal_bytes_per_sync` will flush all memtables and switch to a new WAL file.
### Bug Fixes ### Bug Fixes
* Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery. * Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery.

@ -2266,6 +2266,11 @@ void rocksdb_options_set_use_adaptive_mutex(
opt->rep.use_adaptive_mutex = v; opt->rep.use_adaptive_mutex = v;
} }
void rocksdb_options_set_wal_bytes_per_sync(
rocksdb_options_t* opt, uint64_t v) {
opt->rep.wal_bytes_per_sync = v;
}
void rocksdb_options_set_bytes_per_sync( void rocksdb_options_set_bytes_per_sync(
rocksdb_options_t* opt, uint64_t v) { rocksdb_options_t* opt, uint64_t v) {
opt->rep.bytes_per_sync = v; opt->rep.bytes_per_sync = v;

@ -38,7 +38,6 @@
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h" #include "monitoring/thread_status_util.h"
#include "port/likely.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
@ -264,7 +263,7 @@ void CompactionJob::AggregateStatistics() {
CompactionJob::CompactionJob( CompactionJob::CompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const EnvOptions& env_options, VersionSet* versions, const EnvOptions env_options, VersionSet* versions,
const std::atomic<bool>* shutting_down, LogBuffer* log_buffer, const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats, Directory* db_directory, Directory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, Status* db_bg_error, InstrumentedMutex* db_mutex, Status* db_bg_error,
@ -1261,11 +1260,10 @@ Status CompactionJob::OpenCompactionOutputFile(
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
// Make the output file // Make the output file
unique_ptr<WritableFile> writable_file; unique_ptr<WritableFile> writable_file;
EnvOptions opt_env_opts = bool syncpoint_arg = env_options_.use_direct_writes;
env_->OptimizeForCompactionTableWrite(env_options_, db_options_);
TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile", TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
&opt_env_opts.use_direct_writes); &syncpoint_arg);
Status s = NewWritableFile(env_, fname, &writable_file, opt_env_opts); Status s = NewWritableFile(env_, fname, &writable_file, env_options_);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_ERROR( ROCKS_LOG_ERROR(
db_options_.info_log, db_options_.info_log,

@ -56,7 +56,7 @@ class CompactionJob {
public: public:
CompactionJob(int job_id, Compaction* compaction, CompactionJob(int job_id, Compaction* compaction,
const ImmutableDBOptions& db_options, const ImmutableDBOptions& db_options,
const EnvOptions& env_options, VersionSet* versions, const EnvOptions env_options, VersionSet* versions,
const std::atomic<bool>* shutting_down, LogBuffer* log_buffer, const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Directory* db_directory, Directory* output_directory,
Statistics* stats, InstrumentedMutex* db_mutex, Statistics* stats, InstrumentedMutex* db_mutex,
@ -127,7 +127,7 @@ class CompactionJob {
// DBImpl state // DBImpl state
const std::string& dbname_; const std::string& dbname_;
const ImmutableDBOptions& db_options_; const ImmutableDBOptions& db_options_;
const EnvOptions& env_options_; const EnvOptions env_options_;
Env* env_; Env* env_;
VersionSet* versions_; VersionSet* versions_;

@ -181,6 +181,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
has_unpersisted_data_(false), has_unpersisted_data_(false),
unable_to_flush_oldest_log_(false), unable_to_flush_oldest_log_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
env_options_,
immutable_db_options_)),
num_running_ingest_file_(0), num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, env_options_), wal_manager_(immutable_db_options_, env_options_),
@ -539,6 +542,7 @@ Status DBImpl::SetDBOptions(
MutableDBOptions new_options; MutableDBOptions new_options;
Status s; Status s;
Status persist_options_status; Status persist_options_status;
bool wal_changed = false;
WriteThread::Writer w; WriteThread::Writer w;
WriteContext write_context; WriteContext write_context;
{ {
@ -557,12 +561,25 @@ Status DBImpl::SetDBOptions(
table_cache_.get()->SetCapacity(new_options.max_open_files == -1 table_cache_.get()->SetCapacity(new_options.max_open_files == -1
? TableCache::kInfiniteCapacity ? TableCache::kInfiniteCapacity
: new_options.max_open_files - 10); : new_options.max_open_files - 10);
wal_changed = mutable_db_options_.wal_bytes_per_sync !=
new_options.wal_bytes_per_sync;
if (new_options.bytes_per_sync == 0) {
new_options.bytes_per_sync = 1024 * 1024;
}
mutable_db_options_ = new_options; mutable_db_options_ = new_options;
env_options_for_compaction_ = EnvOptions(BuildDBOptions(
immutable_db_options_,
mutable_db_options_));
env_options_for_compaction_ = env_->OptimizeForCompactionTableWrite(
env_options_for_compaction_,
immutable_db_options_);
env_options_for_compaction_.bytes_per_sync
= mutable_db_options_.bytes_per_sync;
env_->OptimizeForCompactionTableWrite(env_options_for_compaction_,
immutable_db_options_);
write_thread_.EnterUnbatched(&w, &mutex_); write_thread_.EnterUnbatched(&w, &mutex_);
if (total_log_size_ > GetMaxTotalWalSize()) { if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
Status purge_wal_status = HandleWALFull(&write_context); Status purge_wal_status = SwitchWAL(&write_context);
if (!purge_wal_status.ok()) { if (!purge_wal_status.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log, ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to purge WAL files in SetDBOptions() -- %s", "Unable to purge WAL files in SetDBOptions() -- %s",

@ -332,7 +332,7 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family = nullptr, ColumnFamilyHandle* column_family = nullptr,
bool disallow_trivial_move = false); bool disallow_trivial_move = false);
void TEST_HandleWALFull(); void TEST_SwitchWAL();
bool TEST_UnableToFlushOldestLog() { bool TEST_UnableToFlushOldestLog() {
return unable_to_flush_oldest_log_; return unable_to_flush_oldest_log_;
@ -750,7 +750,7 @@ class DBImpl : public DB {
Status WaitForFlushMemTable(ColumnFamilyData* cfd); Status WaitForFlushMemTable(ColumnFamilyData* cfd);
// REQUIRES: mutex locked // REQUIRES: mutex locked
Status HandleWALFull(WriteContext* write_context); Status SwitchWAL(WriteContext* write_context);
// REQUIRES: mutex locked // REQUIRES: mutex locked
Status HandleWriteBufferFull(WriteContext* write_context); Status HandleWriteBufferFull(WriteContext* write_context);
@ -1168,6 +1168,9 @@ class DBImpl : public DB {
// The options to access storage files // The options to access storage files
const EnvOptions env_options_; const EnvOptions env_options_;
// Additonal options for compaction and flush
EnvOptions env_options_for_compaction_;
// Number of running IngestExternalFile() calls. // Number of running IngestExternalFile() calls.
// REQUIRES: mutex held // REQUIRES: mutex held
int num_running_ingest_file_; int num_running_ingest_file_;

@ -87,10 +87,11 @@ Status DBImpl::FlushMemTableToOutputFile(
snapshots_.GetAll(&earliest_write_conflict_snapshot); snapshots_.GetAll(&earliest_write_conflict_snapshot);
FlushJob flush_job( FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options, env_options_, dbname_, cfd, immutable_db_options_, mutable_cf_options,
versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, env_options_for_compaction_, versions_.get(), &mutex_,
earliest_write_conflict_snapshot, job_context, log_buffer, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
directories_.GetDbDir(), directories_.GetDataDir(0U), job_context, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats); &event_logger_, mutable_cf_options.report_bg_io_stats);
@ -532,8 +533,9 @@ Status DBImpl::CompactFilesImpl(
assert(is_snapshot_supported_ || snapshots_.empty()); assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job( CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_, env_options_, job_context->job_id, c.get(), immutable_db_options_,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), env_options_for_compaction_, versions_.get(), &shutting_down_,
log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_, directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
snapshot_seqs, earliest_write_conflict_snapshot, table_cache_, snapshot_seqs, earliest_write_conflict_snapshot, table_cache_,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks, &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
@ -1676,11 +1678,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
assert(is_snapshot_supported_ || snapshots_.empty()); assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job( CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_, env_options_, job_context->job_id, c.get(), immutable_db_options_,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), env_options_for_compaction_, versions_.get(), &shutting_down_,
log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
&bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, &bg_error_, snapshot_seqs,
table_cache_, &event_logger_, earliest_write_conflict_snapshot, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_, c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats); &compaction_job_stats);

@ -19,10 +19,10 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0); return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
} }
void DBImpl::TEST_HandleWALFull() { void DBImpl::TEST_SwitchWAL() {
WriteContext write_context; WriteContext write_context;
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
HandleWALFull(&write_context); SwitchWAL(&write_context);
} }
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(

@ -884,11 +884,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
std::vector<SequenceNumber> snapshot_seqs = std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot); snapshots_.GetAll(&earliest_write_conflict_snapshot);
EnvOptions optimized_env_options =
env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
s = BuildTable( s = BuildTable(
dbname_, env_, *cfd->ioptions(), mutable_cf_options, dbname_, env_, *cfd->ioptions(), mutable_cf_options,
optimized_env_options, cfd->table_cache(), iter.get(), env_options_for_compaction_, cfd->table_cache(), iter.get(),
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)), std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
&meta, cfd->internal_comparator(), &meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),

@ -619,7 +619,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
if (UNLIKELY(status.ok() && !single_column_family_mode_ && if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
total_log_size_ > GetMaxTotalWalSize())) { total_log_size_ > GetMaxTotalWalSize())) {
status = HandleWALFull(write_context); status = SwitchWAL(write_context);
} }
if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) { if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
@ -830,7 +830,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
return status; return status;
} }
Status DBImpl::HandleWALFull(WriteContext* write_context) { Status DBImpl::SwitchWAL(WriteContext* write_context) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(write_context != nullptr); assert(write_context != nullptr);
Status status; Status status;

@ -117,6 +117,87 @@ TEST_F(DBOptionsTest, GetLatestCFOptions) {
GetMutableCFOptionsMap(dbfull()->GetOptions(handles_[1]))); GetMutableCFOptionsMap(dbfull()->GetOptions(handles_[1])));
} }
TEST_F(DBOptionsTest, SetBytesPerSync) {
const size_t kValueSize = 1024 * 1024 * 8;
Options options;
options.create_if_missing = true;
options.bytes_per_sync = 1024 * 1024;
options.use_direct_reads = false;
options.write_buffer_size = 400 * kValueSize;
options.disable_auto_compactions = true;
options.env = env_;
Reopen(options);
int counter = 0;
int low_bytes_per_sync = 0;
int i = 0;
const std::string kValue(kValueSize, 'v');
ASSERT_EQ(options.bytes_per_sync, dbfull()->GetDBOptions().bytes_per_sync);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::RangeSync:0", [&](void* arg) {
counter++;
});
WriteOptions write_opts;
for (; i < 40; i++) {
Put(Key(i), kValue, write_opts);
}
i = 0;
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
low_bytes_per_sync = counter;
counter = 0;
// 8388608 = 8 * 1024 * 1024
ASSERT_OK(dbfull()->SetDBOptions({{"bytes_per_sync", "8388608"}}));
ASSERT_EQ(8388608, dbfull()->GetDBOptions().bytes_per_sync);
for (; i < 40; i++) {
Put(Key(i), kValue, write_opts);
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_GT(counter, 0);
ASSERT_GT(low_bytes_per_sync, 0);
ASSERT_GT(low_bytes_per_sync, counter);
}
TEST_F(DBOptionsTest, SetWalBytesPerSync) {
const size_t kValueSize = 1024 * 1024 * 3;
Options options;
options.create_if_missing = true;
options.wal_bytes_per_sync = 512;
options.write_buffer_size = 100 * kValueSize;
options.disable_auto_compactions = true;
options.env = env_;
Reopen(options);
ASSERT_EQ(512, dbfull()->GetDBOptions().wal_bytes_per_sync);
int counter = 0;
int low_bytes_per_sync = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::RangeSync:0", [&](void* arg) {
counter++;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
const std::string kValue(kValueSize, 'v');
int i = 0;
for (; i < 10; i++) {
Put(Key(i), kValue);
}
// Do not flush. If we flush here, SwitchWAL will reuse old WAL file since its
// empty and will not get the new wal_bytes_per_sync value.
low_bytes_per_sync = counter;
//5242880 = 1024 * 1024 * 5
ASSERT_OK(dbfull()->SetDBOptions({{"wal_bytes_per_sync", "5242880"}}));
ASSERT_EQ(5242880, dbfull()->GetDBOptions().wal_bytes_per_sync);
counter = 0;
i = 0;
for (; i < 10; i++) {
Put(Key(i), kValue);
}
ASSERT_GT(counter, 0);
ASSERT_GT(low_bytes_per_sync, 0);
ASSERT_GT(low_bytes_per_sync, counter);
}
TEST_F(DBOptionsTest, SetOptionsAndReopen) { TEST_F(DBOptionsTest, SetOptionsAndReopen) {
Random rnd(1044); Random rnd(1044);
auto rand_opts = GetRandomizedMutableCFOptionsMap(&rnd); auto rand_opts = GetRandomizedMutableCFOptionsMap(&rnd);

@ -30,7 +30,6 @@
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h" #include "monitoring/thread_status_util.h"
#include "port/likely.h"
#include "port/port.h" #include "port/port.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -58,7 +57,7 @@ namespace rocksdb {
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options, const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions, const EnvOptions env_options, VersionSet* versions,
InstrumentedMutex* db_mutex, InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down, std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots, std::vector<SequenceNumber> existing_snapshots,
@ -294,16 +293,13 @@ Status FlushJob::WriteLevel0Table() {
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_); &output_compression_);
EnvOptions optimized_env_options =
db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_);
int64_t _current_time = 0; int64_t _current_time = 0;
db_options_.env->GetCurrentTime(&_current_time); // ignore error db_options_.env->GetCurrentTime(&_current_time); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time); const uint64_t current_time = static_cast<uint64_t>(_current_time);
s = BuildTable( s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
optimized_env_options, cfd_->table_cache(), iter.get(), env_options_, cfd_->table_cache(), iter.get(),
std::move(range_del_iter), &meta_, cfd_->internal_comparator(), std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_, cfd_->GetName(), existing_snapshots_,

@ -56,8 +56,9 @@ class FlushJob {
FlushJob(const std::string& dbname, ColumnFamilyData* cfd, FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options, const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions, const EnvOptions env_options,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down, VersionSet* versions, InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots, std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot, SequenceNumber earliest_write_conflict_snapshot,
JobContext* job_context, LogBuffer* log_buffer, JobContext* job_context, LogBuffer* log_buffer,
@ -83,7 +84,7 @@ class FlushJob {
ColumnFamilyData* cfd_; ColumnFamilyData* cfd_;
const ImmutableDBOptions& db_options_; const ImmutableDBOptions& db_options_;
const MutableCFOptions& mutable_cf_options_; const MutableCFOptions& mutable_cf_options_;
const EnvOptions& env_options_; const EnvOptions env_options_;
VersionSet* versions_; VersionSet* versions_;
InstrumentedMutex* db_mutex_; InstrumentedMutex* db_mutex_;
std::atomic<bool>* shutting_down_; std::atomic<bool>* shutting_down_;

@ -94,9 +94,10 @@ TEST_F(FlushJobTest, Empty) {
EventLogger event_logger(db_options_.info_log.get()); EventLogger event_logger(db_options_.info_log.get());
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_,
{}, kMaxSequenceNumber, &job_context, nullptr, nullptr, &shutting_down_, {}, kMaxSequenceNumber, &job_context,
nullptr, kNoCompression, nullptr, &event_logger, false); nullptr, nullptr, nullptr, kNoCompression, nullptr,
&event_logger, false);
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
flush_job.PickMemTable(); flush_job.PickMemTable();
@ -138,9 +139,10 @@ TEST_F(FlushJobTest, NonEmpty) {
EventLogger event_logger(db_options_.info_log.get()); EventLogger event_logger(db_options_.info_log.get());
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_,
{}, kMaxSequenceNumber, &job_context, nullptr, nullptr, &shutting_down_, {}, kMaxSequenceNumber, &job_context,
nullptr, kNoCompression, nullptr, &event_logger, true); nullptr, nullptr, nullptr, kNoCompression, nullptr,
&event_logger, true);
FileMetaData fd; FileMetaData fd;
mutex_.Lock(); mutex_.Lock();
flush_job.PickMemTable(); flush_job.PickMemTable();
@ -204,9 +206,10 @@ TEST_F(FlushJobTest, Snapshots) {
EventLogger event_logger(db_options_.info_log.get()); EventLogger event_logger(db_options_.info_log.get());
FlushJob flush_job( FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, *cfd->GetLatestMutableCFOptions(), env_options_,
&shutting_down_, snapshots, kMaxSequenceNumber, &job_context, nullptr, versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
nullptr, nullptr, kNoCompression, nullptr, &event_logger, true); &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr,
&event_logger, true);
mutex_.Lock(); mutex_.Lock();
flush_job.PickMemTable(); flush_job.PickMemTable();
ASSERT_OK(flush_job.Run()); ASSERT_OK(flush_job.Run());

@ -99,7 +99,7 @@ class Repairer {
env_(db_options.env), env_(db_options.env),
env_options_(), env_options_(),
db_options_(SanitizeOptions(dbname_, db_options)), db_options_(SanitizeOptions(dbname_, db_options)),
immutable_db_options_(db_options_), immutable_db_options_(ImmutableDBOptions(db_options_)),
icmp_(default_cf_opts.comparator), icmp_(default_cf_opts.comparator),
default_cf_opts_(default_cf_opts), default_cf_opts_(default_cf_opts),
default_cf_iopts_( default_cf_iopts_(
@ -397,16 +397,13 @@ class Repairer {
ro.total_order_seek = true; ro.total_order_seek = true;
Arena arena; Arena arena;
ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
EnvOptions optimized_env_options =
env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
int64_t _current_time = 0; int64_t _current_time = 0;
status = env_->GetCurrentTime(&_current_time); // ignore error status = env_->GetCurrentTime(&_current_time); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time); const uint64_t current_time = static_cast<uint64_t>(_current_time);
status = BuildTable( status = BuildTable(
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
optimized_env_options, table_cache_, iter.get(), env_options_, table_cache_, iter.get(),
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)), std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
&meta, cfd->internal_comparator(), &meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
@ -618,11 +615,13 @@ Status GetDefaultCFOptions(
} // anonymous namespace } // anonymous namespace
Status RepairDB(const std::string& dbname, const DBOptions& db_options, Status RepairDB(const std::string& dbname, const DBOptions& db_options,
const std::vector<ColumnFamilyDescriptor>& column_families) { const std::vector<ColumnFamilyDescriptor>& column_families
) {
ColumnFamilyOptions default_cf_opts; ColumnFamilyOptions default_cf_opts;
Status status = GetDefaultCFOptions(column_families, &default_cf_opts); Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
if (status.ok()) { if (status.ok()) {
Repairer repairer(dbname, db_options, column_families, default_cf_opts, Repairer repairer(dbname, db_options, column_families,
default_cf_opts,
ColumnFamilyOptions() /* unknown_cf_opts */, ColumnFamilyOptions() /* unknown_cf_opts */,
false /* create_unknown_cfs */); false /* create_unknown_cfs */);
status = repairer.Run(); status = repairer.Run();
@ -636,7 +635,8 @@ Status RepairDB(const std::string& dbname, const DBOptions& db_options,
ColumnFamilyOptions default_cf_opts; ColumnFamilyOptions default_cf_opts;
Status status = GetDefaultCFOptions(column_families, &default_cf_opts); Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
if (status.ok()) { if (status.ok()) {
Repairer repairer(dbname, db_options, column_families, default_cf_opts, Repairer repairer(dbname, db_options,
column_families, default_cf_opts,
unknown_cf_opts, true /* create_unknown_cfs */); unknown_cf_opts, true /* create_unknown_cfs */);
status = repairer.Run(); status = repairer.Run();
} }
@ -646,7 +646,8 @@ Status RepairDB(const std::string& dbname, const DBOptions& db_options,
Status RepairDB(const std::string& dbname, const Options& options) { Status RepairDB(const std::string& dbname, const Options& options) {
DBOptions db_options(options); DBOptions db_options(options);
ColumnFamilyOptions cf_options(options); ColumnFamilyOptions cf_options(options);
Repairer repairer(dbname, db_options, {}, cf_options /* default_cf_opts */, Repairer repairer(dbname, db_options,
{}, cf_options /* default_cf_opts */,
cf_options /* unknown_cf_opts */, cf_options /* unknown_cf_opts */,
true /* create_unknown_cfs */); true /* create_unknown_cfs */);
return repairer.Run(); return repairer.Run();

@ -854,6 +854,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_adaptive_mutex(
rocksdb_options_t*, unsigned char); rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_bytes_per_sync( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_bytes_per_sync(
rocksdb_options_t*, uint64_t); rocksdb_options_t*, uint64_t);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_wal_bytes_per_sync(
rocksdb_options_t*, uint64_t);
extern ROCKSDB_LIBRARY_API void extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_allow_concurrent_memtable_write(rocksdb_options_t*, rocksdb_options_set_allow_concurrent_memtable_write(rocksdb_options_t*,
unsigned char); unsigned char);

@ -45,6 +45,7 @@ class RandomRWFile;
class Directory; class Directory;
struct DBOptions; struct DBOptions;
struct ImmutableDBOptions; struct ImmutableDBOptions;
struct MutableDBOptions;
class RateLimiter; class RateLimiter;
class ThreadStatusUpdater; class ThreadStatusUpdater;
struct ThreadStatus; struct ThreadStatus;
@ -411,7 +412,7 @@ class Env {
// table files. // table files.
virtual EnvOptions OptimizeForCompactionTableWrite( virtual EnvOptions OptimizeForCompactionTableWrite(
const EnvOptions& env_options, const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const; const ImmutableDBOptions& immutable_ops) const;
// OptimizeForCompactionTableWrite will create a new EnvOptions object that // OptimizeForCompactionTableWrite will create a new EnvOptions object that
// is a copy of the EnvOptions in the parameters, but is optimized for reading // is a copy of the EnvOptions in the parameters, but is optimized for reading
@ -1087,8 +1088,8 @@ class EnvWrapper : public Env {
} }
EnvOptions OptimizeForCompactionTableWrite( EnvOptions OptimizeForCompactionTableWrite(
const EnvOptions& env_options, const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const override { const ImmutableDBOptions& immutable_ops) const override {
return target_->OptimizeForCompactionTableWrite(env_options, db_options); return target_->OptimizeForCompactionTableWrite(env_options, immutable_ops);
} }
EnvOptions OptimizeForCompactionTableRead( EnvOptions OptimizeForCompactionTableRead(
const EnvOptions& env_options, const EnvOptions& env_options,

@ -66,8 +66,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
random_access_max_buffer_size(options.random_access_max_buffer_size), random_access_max_buffer_size(options.random_access_max_buffer_size),
writable_file_max_buffer_size(options.writable_file_max_buffer_size), writable_file_max_buffer_size(options.writable_file_max_buffer_size),
use_adaptive_mutex(options.use_adaptive_mutex), use_adaptive_mutex(options.use_adaptive_mutex),
bytes_per_sync(options.bytes_per_sync),
wal_bytes_per_sync(options.wal_bytes_per_sync),
listeners(options.listeners), listeners(options.listeners),
enable_thread_tracking(options.enable_thread_tracking), enable_thread_tracking(options.enable_thread_tracking),
enable_pipelined_write(options.enable_pipelined_write), enable_pipelined_write(options.enable_pipelined_write),
@ -187,12 +185,6 @@ void ImmutableDBOptions::Dump(Logger* log) const {
Header( Header(
log, " Options.sst_file_manager.rate_bytes_per_sec: %" PRIi64, log, " Options.sst_file_manager.rate_bytes_per_sec: %" PRIi64,
sst_file_manager ? sst_file_manager->GetDeleteRateBytesPerSecond() : 0); sst_file_manager ? sst_file_manager->GetDeleteRateBytesPerSecond() : 0);
ROCKS_LOG_HEADER(log,
" Options.bytes_per_sync: %" PRIu64,
bytes_per_sync);
ROCKS_LOG_HEADER(log,
" Options.wal_bytes_per_sync: %" PRIu64,
wal_bytes_per_sync);
ROCKS_LOG_HEADER(log, " Options.wal_recovery_mode: %d", ROCKS_LOG_HEADER(log, " Options.wal_recovery_mode: %d",
wal_recovery_mode); wal_recovery_mode);
ROCKS_LOG_HEADER(log, " Options.enable_thread_tracking: %d", ROCKS_LOG_HEADER(log, " Options.enable_thread_tracking: %d",
@ -254,7 +246,9 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options)
delete_obsolete_files_period_micros( delete_obsolete_files_period_micros(
options.delete_obsolete_files_period_micros), options.delete_obsolete_files_period_micros),
stats_dump_period_sec(options.stats_dump_period_sec), stats_dump_period_sec(options.stats_dump_period_sec),
max_open_files(options.max_open_files) {} max_open_files(options.max_open_files),
bytes_per_sync(options.bytes_per_sync),
wal_bytes_per_sync(options.wal_bytes_per_sync) {}
void MutableDBOptions::Dump(Logger* log) const { void MutableDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, " Options.max_background_jobs: %d", ROCKS_LOG_HEADER(log, " Options.max_background_jobs: %d",
@ -274,6 +268,12 @@ void MutableDBOptions::Dump(Logger* log) const {
stats_dump_period_sec); stats_dump_period_sec);
ROCKS_LOG_HEADER(log, " Options.max_open_files: %d", ROCKS_LOG_HEADER(log, " Options.max_open_files: %d",
max_open_files); max_open_files);
ROCKS_LOG_HEADER(log,
" Options.bytes_per_sync: %" PRIu64,
bytes_per_sync);
ROCKS_LOG_HEADER(log,
" Options.wal_bytes_per_sync: %" PRIu64,
wal_bytes_per_sync);
} }
} // namespace rocksdb } // namespace rocksdb

@ -59,8 +59,6 @@ struct ImmutableDBOptions {
size_t random_access_max_buffer_size; size_t random_access_max_buffer_size;
size_t writable_file_max_buffer_size; size_t writable_file_max_buffer_size;
bool use_adaptive_mutex; bool use_adaptive_mutex;
uint64_t bytes_per_sync;
uint64_t wal_bytes_per_sync;
std::vector<std::shared_ptr<EventListener>> listeners; std::vector<std::shared_ptr<EventListener>> listeners;
bool enable_thread_tracking; bool enable_thread_tracking;
bool enable_pipelined_write; bool enable_pipelined_write;
@ -100,6 +98,8 @@ struct MutableDBOptions {
uint64_t delete_obsolete_files_period_micros; uint64_t delete_obsolete_files_period_micros;
unsigned int stats_dump_period_sec; unsigned int stats_dump_period_sec;
int max_open_files; int max_open_files;
uint64_t bytes_per_sync;
uint64_t wal_bytes_per_sync;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -56,6 +56,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
mutable_db_options.base_background_compactions; mutable_db_options.base_background_compactions;
options.max_background_compactions = options.max_background_compactions =
mutable_db_options.max_background_compactions; mutable_db_options.max_background_compactions;
options.bytes_per_sync = mutable_db_options.bytes_per_sync;
options.wal_bytes_per_sync = mutable_db_options.wal_bytes_per_sync;
options.max_subcompactions = immutable_db_options.max_subcompactions; options.max_subcompactions = immutable_db_options.max_subcompactions;
options.max_background_flushes = immutable_db_options.max_background_flushes; options.max_background_flushes = immutable_db_options.max_background_flushes;
options.max_log_file_size = immutable_db_options.max_log_file_size; options.max_log_file_size = immutable_db_options.max_log_file_size;
@ -91,8 +93,6 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.writable_file_max_buffer_size = options.writable_file_max_buffer_size =
immutable_db_options.writable_file_max_buffer_size; immutable_db_options.writable_file_max_buffer_size;
options.use_adaptive_mutex = immutable_db_options.use_adaptive_mutex; options.use_adaptive_mutex = immutable_db_options.use_adaptive_mutex;
options.bytes_per_sync = immutable_db_options.bytes_per_sync;
options.wal_bytes_per_sync = immutable_db_options.wal_bytes_per_sync;
options.listeners = immutable_db_options.listeners; options.listeners = immutable_db_options.listeners;
options.enable_thread_tracking = immutable_db_options.enable_thread_tracking; options.enable_thread_tracking = immutable_db_options.enable_thread_tracking;
options.delayed_write_rate = mutable_db_options.delayed_write_rate; options.delayed_write_rate = mutable_db_options.delayed_write_rate;

@ -283,7 +283,8 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
OptionVerificationType::kNormal, false, 0}}, OptionVerificationType::kNormal, false, 0}},
{"bytes_per_sync", {"bytes_per_sync",
{offsetof(struct DBOptions, bytes_per_sync), OptionType::kUInt64T, {offsetof(struct DBOptions, bytes_per_sync), OptionType::kUInt64T,
OptionVerificationType::kNormal, false, 0}}, OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, bytes_per_sync)}},
{"delayed_write_rate", {"delayed_write_rate",
{offsetof(struct DBOptions, delayed_write_rate), OptionType::kUInt64T, {offsetof(struct DBOptions, delayed_write_rate), OptionType::kUInt64T,
OptionVerificationType::kNormal, true, OptionVerificationType::kNormal, true,
@ -301,7 +302,8 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
offsetof(struct MutableDBOptions, max_total_wal_size)}}, offsetof(struct MutableDBOptions, max_total_wal_size)}},
{"wal_bytes_per_sync", {"wal_bytes_per_sync",
{offsetof(struct DBOptions, wal_bytes_per_sync), OptionType::kUInt64T, {offsetof(struct DBOptions, wal_bytes_per_sync), OptionType::kUInt64T,
OptionVerificationType::kNormal, false, 0}}, OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, wal_bytes_per_sync)}},
{"stats_dump_period_sec", {"stats_dump_period_sec",
{offsetof(struct DBOptions, stats_dump_period_sec), OptionType::kUInt, {offsetof(struct DBOptions, stats_dump_period_sec), OptionType::kUInt,
OptionVerificationType::kNormal, true, OptionVerificationType::kNormal, true,

@ -1561,7 +1561,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
// request a flush for all column families such that the earliest // request a flush for all column families such that the earliest
// alive log file can be killed // alive log file can be killed
db_impl->TEST_HandleWALFull(); db_impl->TEST_SwitchWAL();
// log cannot be flushed because txn2 has not been commited // log cannot be flushed because txn2 has not been commited
ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed()); ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog()); ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog());
@ -1576,7 +1576,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
s = txn2->Commit(); s = txn2->Commit();
ASSERT_OK(s); ASSERT_OK(s);
db_impl->TEST_HandleWALFull(); db_impl->TEST_SwitchWAL();
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
// we should see that cfb now has a flush requested // we should see that cfb now has a flush requested

Loading…
Cancel
Save