Remove redundant member var and set options (#4631)

Summary:
Previously, both `DBImpl::atomic_flush_` and
`DBImpl::immutable_db_options_.atomic_flush` existed. However,
`immutable_db_options_.atomic_flush` was never set, while `DBImpl::atomic_flush_`
was set correctly and used throughout. This did not cause incorrect behavior,
but it duplicated information.

Since `immutable_db_options_` is always available and already carries
`atomic_flush`, use it as the single source of truth and remove
`DBImpl::atomic_flush_`.
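
The pattern of the change, sketched below as a standalone, hypothetical example
(the `Options`, `ImmutableOptions`, and `Engine` types are illustrative
stand-ins, not RocksDB classes): instead of caching a second copy of the flag
in the object, every call site reads it from the immutable options the object
already owns.

    #include <iostream>

    // Hypothetical stand-in for rocksdb::DBOptions.
    struct Options {
      bool atomic_flush = false;
    };

    // Hypothetical stand-in for rocksdb::ImmutableDBOptions: an immutable copy
    // of the user-supplied options. Every field must be copied explicitly; a
    // field forgotten here silently keeps its default value.
    struct ImmutableOptions {
      explicit ImmutableOptions(const Options& options)
          : atomic_flush(options.atomic_flush) {}
      const bool atomic_flush;
    };

    // Hypothetical stand-in for DBImpl. After the change there is no separate
    // atomic_flush_ member; the flag is read from immutable_options_ everywhere.
    class Engine {
     public:
      explicit Engine(const Options& options) : immutable_options_(options) {}

      void Flush() const {
        if (immutable_options_.atomic_flush) {
          std::cout << "flush all column families atomically\n";
        } else {
          std::cout << "flush a single column family\n";
        }
      }

     private:
      ImmutableOptions immutable_options_;
    };

    int main() {
      Options options;
      options.atomic_flush = true;
      Engine engine(options);
      engine.Flush();  // prints: flush all column families atomically
      return 0;
    }
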
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4631

Differential Revision: D12928371

Pulled By: riversand963

fbshipit-source-id: f85a811959d3828aad4a3a1b05f71facf19c636d
Branch: main
Author: Yanqin Jin
Committed by: Facebook Github Bot
Parent: 09426ae1c7
Commit: 05dec0c7c7
Files changed (lines changed):
  db/db_filesnapshot.cc            2
  db/db_impl.cc                    7
  db/db_impl.h                     3
  db/db_impl_compaction_flush.cc   8
  db/db_impl_write.cc             14
  options/db_options.cc            3
  options/options_helper.cc        1

db/db_filesnapshot.cc
@@ -86,7 +86,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
   if (flush_memtable) {
     // flush all dirty data to disk.
     Status status;
-    if (atomic_flush_) {
+    if (immutable_db_options_.atomic_flush) {
       autovector<ColumnFamilyData*> cfds;
       SelectColumnFamiliesForAtomicFlush(&cfds);
       mutex_.Unlock();

db/db_impl.cc
@@ -220,7 +220,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
       preserve_deletes_(options.preserve_deletes),
       closed_(false),
       error_handler_(this, immutable_db_options_, &mutex_),
-      atomic_flush_(options.atomic_flush),
       atomic_flush_commit_in_progress_(false) {
   // !batch_per_trx_ implies seq_per_batch_ because it is only unset for
   // WriteUnprepared, which should use seq_per_batch_.
@@ -309,7 +308,7 @@ Status DBImpl::ResumeImpl() {
     FlushOptions flush_opts;
     // We allow flush to stall write since we are trying to resume from error.
     flush_opts.allow_write_stall = true;
-    if (atomic_flush_) {
+    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
@@ -401,7 +400,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
   if (!shutting_down_.load(std::memory_order_acquire) &&
       has_unpersisted_data_.load(std::memory_order_relaxed) &&
       !mutable_db_options_.avoid_flush_during_shutdown) {
-    if (atomic_flush_) {
+    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
@@ -3130,7 +3129,7 @@ Status DBImpl::IngestExternalFile(
     TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
                              &need_flush);
     if (status.ok() && need_flush) {
-      if (atomic_flush_) {
+      if (immutable_db_options_.atomic_flush) {
        mutex_.Unlock();
        autovector<ColumnFamilyData*> cfds;
        SelectColumnFamiliesForAtomicFlush(&cfds);

db/db_impl.h
@@ -1611,9 +1611,6 @@ class DBImpl : public DB {
   ErrorHandler error_handler_;
-  // True if DB enables atomic flush.
-  bool atomic_flush_;
   // True if the DB is committing atomic flush.
   // TODO (yanqin) the current impl assumes that the entire DB belongs to
   // a single atomic flush group. In the future we need to add a new class

db/db_impl_compaction_flush.cc
@@ -211,7 +211,7 @@ Status DBImpl::FlushMemTableToOutputFile(
 Status DBImpl::FlushMemTablesToOutputFiles(
     const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
     JobContext* job_context, LogBuffer* log_buffer) {
-  if (atomic_flush_) {
+  if (immutable_db_options_.atomic_flush) {
    return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress,
                                             job_context, log_buffer);
  }
@@ -566,7 +566,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
     if (flush_needed) {
       FlushOptions fo;
       fo.allow_write_stall = options.allow_write_stall;
-      if (atomic_flush_) {
+      if (immutable_db_options_.atomic_flush) {
        autovector<ColumnFamilyData*> cfds;
        SelectColumnFamiliesForAtomicFlush(&cfds);
        s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
@@ -1196,7 +1196,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
   ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
                  cfh->GetName().c_str());
   Status s;
-  if (atomic_flush_) {
+  if (immutable_db_options_.atomic_flush) {
    s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
                             FlushReason::kManualFlush);
  } else {
@@ -1212,7 +1212,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
 Status DBImpl::Flush(const FlushOptions& flush_options,
                      const std::vector<ColumnFamilyHandle*>& column_families) {
   Status s;
-  if (!atomic_flush_) {
+  if (!immutable_db_options_.atomic_flush) {
    for (auto cfh : column_families) {
      s = Flush(flush_options, cfh);
      if (!s.ok()) {

db/db_impl_write.cc
@@ -1029,7 +1029,7 @@ void DBImpl::SelectColumnFamiliesForAtomicFlush(
 // Assign sequence number for atomic flush.
 void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
-  assert(atomic_flush_);
+  assert(immutable_db_options_.atomic_flush);
   auto seq = versions_->LastSequence();
   for (auto cfd : cfds) {
     cfd->imm()->AssignAtomicFlushSeq(seq);
@@ -1085,7 +1085,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
   // no need to refcount because drop is happening in write thread, so can't
   // happen while we're in the write thread
   autovector<ColumnFamilyData*> cfds;
-  if (atomic_flush_) {
+  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
@@ -1106,7 +1106,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
     }
   }
   if (status.ok()) {
-    if (atomic_flush_) {
+    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (auto cfd : cfds) {
@@ -1139,7 +1139,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
   // no need to refcount because drop is happening in write thread, so can't
   // happen while we're in the write thread
   autovector<ColumnFamilyData*> cfds;
-  if (atomic_flush_) {
+  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
  } else {
    ColumnFamilyData* cfd_picked = nullptr;
@@ -1176,7 +1176,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
     }
   }
   if (status.ok()) {
-    if (atomic_flush_) {
+    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    for (const auto cfd : cfds) {
@@ -1307,7 +1307,7 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
 Status DBImpl::ScheduleFlushes(WriteContext* context) {
   autovector<ColumnFamilyData*> cfds;
-  if (atomic_flush_) {
+  if (immutable_db_options_.atomic_flush) {
    SelectColumnFamiliesForAtomicFlush(&cfds);
    for (auto cfd : cfds) {
      cfd->Ref();
@@ -1333,7 +1333,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
     }
   }
   if (status.ok()) {
-    if (atomic_flush_) {
+    if (immutable_db_options_.atomic_flush) {
      AssignAtomicFlushSeq(cfds);
    }
    FlushRequest flush_req;

options/db_options.cc
@@ -85,7 +85,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
       allow_ingest_behind(options.allow_ingest_behind),
       preserve_deletes(options.preserve_deletes),
       two_write_queues(options.two_write_queues),
-      manual_wal_flush(options.manual_wal_flush) {
+      manual_wal_flush(options.manual_wal_flush),
+      atomic_flush(options.atomic_flush) {
 }
 void ImmutableDBOptions::Dump(Logger* log) const {

options/options_helper.cc
@@ -126,6 +126,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
       immutable_db_options.preserve_deletes;
   options.two_write_queues = immutable_db_options.two_write_queues;
   options.manual_wal_flush = immutable_db_options.manual_wal_flush;
+  options.atomic_flush = immutable_db_options.atomic_flush;
   return options;
 }
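
The last two hunks are the two copy points that keep the option visible on both
sides of the options round trip: the `ImmutableDBOptions` constructor copies
`atomic_flush` in from `DBOptions`, and `BuildDBOptions` copies it back out. A
minimal sketch of that round trip, again using hypothetical `Options` and
`ImmutableOptions` stand-ins rather than the real RocksDB types:

    #include <cassert>

    // Hypothetical stand-ins for rocksdb::DBOptions / rocksdb::ImmutableDBOptions.
    struct Options {
      bool manual_wal_flush = false;
      bool atomic_flush = false;
    };

    struct ImmutableOptions {
      explicit ImmutableOptions(const Options& options)
          : manual_wal_flush(options.manual_wal_flush),
            atomic_flush(options.atomic_flush) {}  // copy in (db_options.cc side)
      bool manual_wal_flush;
      bool atomic_flush;
    };

    // Analogous to BuildDBOptions(): rebuild the user-facing options from the
    // immutable copy. Forgetting either copy silently drops the setting.
    Options BuildOptions(const ImmutableOptions& immutable) {
      Options options;
      options.manual_wal_flush = immutable.manual_wal_flush;
      options.atomic_flush = immutable.atomic_flush;  // copy out (options_helper.cc side)
      return options;
    }

    int main() {
      Options options;
      options.atomic_flush = true;
      const ImmutableOptions immutable(options);
      const Options rebuilt = BuildOptions(immutable);
      assert(rebuilt.atomic_flush);  // the flag survives the round trip
      return 0;
    }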
