Refactor WriteImpl (pipeline write part 1)

Summary:
Refactor WriteImpl() so when I plug-in the pipeline write code (which is
an alternative approach for WriteThread), some of the logic can be
reuse. I split out the following methods from WriteImpl():

* PreprocessWrite()
* HandleWALFull() (previous MaybeFlushColumnFamilies())
* HandleWriteBufferFull()
* WriteToWAL()

Also adding a constructor to WriteThread::Writer, and move WriteContext into db_impl.h.
No real logic change in this patch.
Closes https://github.com/facebook/rocksdb/pull/2042

Differential Revision: D4781014

Pulled By: yiwu-arbug

fbshipit-source-id: d45ca18
main
Yi Wu 8 years ago committed by Facebook Github Bot
parent 6ef8c620d3
commit 9e44531803
  1. 464
      db/db_impl.cc
  2. 36
      db/db_impl.h
  3. 5
      db/db_impl_debug.cc
  4. 2
      db/write_thread.cc
  5. 24
      db/write_thread.h
  6. 4
      utilities/transactions/transaction_test.cc

@ -106,20 +106,6 @@ const std::string kDefaultColumnFamilyName("default");
void DumpRocksDBBuildVersion(Logger * log); void DumpRocksDBBuildVersion(Logger * log);
struct DBImpl::WriteContext {
autovector<SuperVersion*> superversions_to_free_;
autovector<MemTable*> memtables_to_free_;
~WriteContext() {
for (auto& sv : superversions_to_free_) {
delete sv;
}
for (auto& m : memtables_to_free_) {
delete m;
}
}
};
Options SanitizeOptions(const std::string& dbname, Options SanitizeOptions(const std::string& dbname,
const Options& src) { const Options& src) {
auto db_options = SanitizeOptions(dbname, DBOptions(src)); auto db_options = SanitizeOptions(dbname, DBOptions(src));
@ -2459,6 +2445,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
MutableCFOptions new_options; MutableCFOptions new_options;
Status s; Status s;
Status persist_options_status; Status persist_options_status;
WriteThread::Writer w;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
s = cfd->SetOptions(options_map); s = cfd->SetOptions(options_map);
@ -2475,7 +2462,9 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options); InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
delete old_sv; delete old_sv;
persist_options_status = PersistOptions(); write_thread_.EnterUnbatched(&w, &mutex_);
persist_options_status = WriteOptionsFile();
write_thread_.ExitUnbatched(&w);
} }
} }
@ -2523,6 +2512,8 @@ Status DBImpl::SetDBOptions(
MutableDBOptions new_options; MutableDBOptions new_options;
Status s; Status s;
Status persist_options_status; Status persist_options_status;
WriteThread::Writer w;
WriteContext write_context;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map, s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
@ -2539,11 +2530,17 @@ Status DBImpl::SetDBOptions(
mutable_db_options_ = new_options; mutable_db_options_ = new_options;
write_thread_.EnterUnbatched(&w, &mutex_);
if (total_log_size_ > GetMaxTotalWalSize()) { if (total_log_size_ > GetMaxTotalWalSize()) {
MaybeFlushColumnFamilies(); Status purge_wal_status = HandleWALFull(&write_context);
if (!purge_wal_status.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to purge WAL files in SetDBOptions() -- %s",
purge_wal_status.ToString().c_str());
} }
}
persist_options_status = PersistOptions(); persist_options_status = WriteOptionsFile();
write_thread_.ExitUnbatched(&w);
} }
} }
ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:"); ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
@ -2572,15 +2569,6 @@ Status DBImpl::SetDBOptions(
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
Status DBImpl::PersistOptions() {
mutex_.AssertHeld();
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
Status s = WriteOptionsFile();
write_thread_.ExitUnbatched(&w);
return s;
}
// return the same level if it cannot be moved // return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, int level) { const MutableCFOptions& mutable_cf_options, int level) {
@ -4674,14 +4662,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
Status status; Status status;
PERF_TIMER_GUARD(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w; WriteThread::Writer w(write_options, my_batch, callback, log_ref,
w.batch = my_batch; disable_memtable);
w.sync = write_options.sync;
w.disableWAL = write_options.disableWAL;
w.disable_memtable = disable_memtable;
w.in_batch_group = false;
w.callback = callback;
w.log_ref = log_ref;
if (!write_options.disableWAL) { if (!write_options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL); RecordTick(stats_, WRITE_WITH_WAL);
@ -4694,10 +4676,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// we are a non-leader in a parallel group // we are a non-leader in a parallel group
PERF_TIMER_GUARD(write_memtable_time); PERF_TIMER_GUARD(write_memtable_time);
if (log_used != nullptr) {
*log_used = w.log_used;
}
if (w.ShouldWriteToMemtable()) { if (w.ShouldWriteToMemtable()) {
ColumnFamilyMemTablesImpl column_family_memtables( ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet()); versions_->GetColumnFamilySet());
@ -4724,123 +4702,35 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
*log_used = w.log_used; *log_used = w.log_used;
} }
// write is complete and leader has updated sequence // write is complete and leader has updated sequence
RecordTick(stats_, WRITE_DONE_BY_OTHER);
return w.FinalStatus(); return w.FinalStatus();
} }
// else we are the leader of the write batch group // else we are the leader of the write batch group
assert(w.state == WriteThread::STATE_GROUP_LEADER); assert(w.state == WriteThread::STATE_GROUP_LEADER);
WriteContext context;
mutex_.Lock();
if (!write_options.disableWAL) {
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
}
RecordTick(stats_, WRITE_DONE_BY_SELF);
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
// Once reaches this point, the current writer "w" will try to do its write // Once reaches this point, the current writer "w" will try to do its write
// job. It may also pick up some of the remaining writers in the "writers_" // job. It may also pick up some of the remaining writers in the "writers_"
// when it finds suitable, and finish them in the same write batch. // when it finds suitable, and finish them in the same write batch.
// This is how a write job could be done by the other writer. // This is how a write job could be done by the other writer.
assert(!single_column_family_mode_ || WriteContext write_context;
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
if (UNLIKELY(!single_column_family_mode_ &&
total_log_size_ > GetMaxTotalWalSize())) {
MaybeFlushColumnFamilies();
}
if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
// Before a new memtable is added in SwitchMemtable(),
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
// thread is writing to another DB with the same write buffer, they may also
// be flushed. We may end up with flushing much more DBs than needed. It's
// suboptimal but still correct.
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Flushing column family with largest mem table size. Write buffer is "
"using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
write_buffer_manager_->memory_usage(),
write_buffer_manager_->buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
ColumnFamilyData* cfd_picked = nullptr;
SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (!cfd->mem()->IsEmpty()) {
// We only consider active mem table, hoping immutable memtable is
// already in the process of flushing.
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
cfd_picked = cfd;
seq_num_for_cf_picked = seq;
}
}
}
if (cfd_picked != nullptr) {
status = SwitchMemtable(cfd_picked, &context);
if (status.ok()) {
cfd_picked->imm()->FlushRequested();
SchedulePendingFlush(cfd_picked);
MaybeScheduleFlushOrCompaction();
}
}
}
if (UNLIKELY(status.ok() && !bg_error_.ok())) {
status = bg_error_;
}
if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
status = ScheduleFlushes(&context);
}
if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
write_controller_.NeedsDelay()))) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_delay_time);
// We don't know size of curent batch so that we always use the size
// for previous one. It might create a fairness issue that expiration
// might happen for smaller writes but larger writes can go through.
// Can optimize it if it is an issue.
status = DelayWrite(last_batch_group_size_, write_options);
PERF_TIMER_START(write_pre_and_post_process_time);
}
uint64_t last_sequence = versions_->LastSequence();
WriteThread::Writer* last_writer = &w; WriteThread::Writer* last_writer = &w;
autovector<WriteThread::Writer*> write_group; autovector<WriteThread::Writer*> write_group;
bool logs_getting_synced = false;
mutex_.Lock();
bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_; bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
status = PreprocessWrite(write_options, need_log_sync, &logs_getting_synced,
&write_context);
uint64_t last_sequence = versions_->LastSequence();
log::Writer* cur_log_writer = logs_.back().writer;
bool logs_getting_synced = false; mutex_.Unlock();
if (status.ok()) {
if (need_log_sync) {
while (logs_.front().getting_synced) {
log_sync_cv_.Wait();
}
for (auto& log : logs_) {
assert(!log.getting_synced);
log.getting_synced = true;
}
logs_getting_synced = true;
}
// Add to log and apply to memtable. We can release the lock // Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging // during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes // and protects against concurrent loggers and concurrent writes
// into memtables // into memtables
}
log::Writer* cur_log_writer = logs_.back().writer;
mutex_.Unlock();
// At this point the mutex is unlocked
bool exit_completed_early = false; bool exit_completed_early = false;
last_batch_group_size_ = last_batch_group_size_ =
@ -4881,110 +4771,44 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
const SequenceNumber current_sequence = last_sequence + 1; const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += total_count; last_sequence += total_count;
// Record statistics // Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully
// commit. That lets us release our leader status early in
// some cases.
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size); RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
RecordTick(stats_, WRITE_DONE_BY_SELF);
auto write_done_by_other = write_group.size() - 1;
if (write_done_by_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
write_done_by_other);
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
}
MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
PERF_TIMER_STOP(write_pre_and_post_process_time);
if (write_options.disableWAL) { if (write_options.disableWAL) {
has_unpersisted_data_.store(true, std::memory_order_relaxed); has_unpersisted_data_.store(true, std::memory_order_relaxed);
} }
uint64_t log_size = 0; PERF_TIMER_STOP(write_pre_and_post_process_time);
if (!write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
WriteBatch* merged_batch = nullptr;
if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
// we simply write the first WriteBatch to WAL if the group only
// contains one batch, that batch should be written to the WAL,
// and the batch is not wanting to be truncated
merged_batch = write_group[0]->batch;
write_group[0]->log_used = logfile_number_;
} else {
// WAL needs all of the batches flattened into a single batch.
// We could avoid copying here with an iov-like AddRecord
// interface
merged_batch = &tmp_batch_;
for (auto writer : write_group) {
if (writer->ShouldWriteToWAL()) {
WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true);
}
writer->log_used = logfile_number_;
}
}
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
status = WriteToWAL(write_group, cur_log_writer, need_log_sync,
need_log_dir_sync, current_sequence);
if (log_used != nullptr) { if (log_used != nullptr) {
*log_used = logfile_number_; *log_used = logfile_number_;
} }
WriteBatchInternal::SetSequence(merged_batch, current_sequence);
Slice log_entry = WriteBatchInternal::Contents(merged_batch);
status = cur_log_writer->AddRecord(log_entry);
total_log_size_ += log_entry.size();
alive_log_files_.back().AddSize(log_entry.size());
log_empty_ = false;
log_size = log_entry.size();
RecordTick(stats_, WAL_FILE_BYTES, log_size);
if (status.ok() && need_log_sync) {
RecordTick(stats_, WAL_FILE_SYNCED);
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
// It's safe to access logs_ with unlocked mutex_ here because:
// - we've set getting_synced=true for all logs,
// so other threads won't pop from logs_ while we're here,
// - only writer thread can push to logs_, and we're in
// writer thread, so no one will push to logs_,
// - as long as other threads don't modify it, it's safe to read
// from std::deque from multiple threads concurrently.
for (auto& log : logs_) {
status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
if (!status.ok()) {
break;
}
}
if (status.ok() && need_log_dir_sync) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
// we can avoid the disk I/O in the write code path.
status = directories_.GetWalDir()->Fsync();
}
} }
if (merged_batch == &tmp_batch_) {
tmp_batch_.Clear();
}
}
if (status.ok()) { if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time); PERF_TIMER_GUARD(write_memtable_time);
{
// Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully
// commit. That lets us release our leader status early in
// some cases.
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
if (!write_options.disableWAL) {
if (write_options.sync) {
stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
}
stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
}
uint64_t for_other = write_group.size() - 1;
if (for_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, for_other);
if (!write_options.disableWAL) {
stats->AddDBStats(InternalStats::WRITE_WITH_WAL, for_other);
}
}
}
if (!parallel) { if (!parallel) {
status = WriteBatchInternal::InsertInto( status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(), write_group, current_sequence, column_family_memtables_.get(),
@ -5070,11 +4894,147 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return status; return status;
} }
void DBImpl::MaybeFlushColumnFamilies() { Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
bool need_log_sync, bool* logs_getting_synced,
WriteContext* write_context) {
mutex_.AssertHeld();
assert(write_context != nullptr && logs_getting_synced != nullptr);
Status status;
assert(!single_column_family_mode_ ||
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
total_log_size_ > GetMaxTotalWalSize())) {
status = HandleWALFull(write_context);
}
if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
// Before a new memtable is added in SwitchMemtable(),
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
// thread is writing to another DB with the same write buffer, they may also
// be flushed. We may end up with flushing much more DBs than needed. It's
// suboptimal but still correct.
status = HandleWriteBufferFull(write_context);
}
if (UNLIKELY(status.ok() && !bg_error_.ok())) {
status = bg_error_;
}
if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
status = ScheduleFlushes(write_context);
}
if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
write_controller_.NeedsDelay()))) {
PERF_TIMER_GUARD(write_delay_time);
// We don't know size of curent batch so that we always use the size
// for previous one. It might create a fairness issue that expiration
// might happen for smaller writes but larger writes can go through.
// Can optimize it if it is an issue.
status = DelayWrite(last_batch_group_size_, write_options);
}
if (status.ok() && need_log_sync) {
while (logs_.front().getting_synced) {
log_sync_cv_.Wait();
}
for (auto& log : logs_) {
assert(!log.getting_synced);
log.getting_synced = true;
}
*logs_getting_synced = true;
}
return status;
}
Status DBImpl::WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
log::Writer* log_writer, bool need_log_sync,
bool need_log_dir_sync, SequenceNumber sequence) {
Status status;
WriteBatch* merged_batch = nullptr;
size_t write_with_wal = 0;
if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
// we simply write the first WriteBatch to WAL if the group only
// contains one batch, that batch should be written to the WAL,
// and the batch is not wanting to be truncated
merged_batch = write_group[0]->batch;
write_group[0]->log_used = logfile_number_;
write_with_wal = 1;
} else {
// WAL needs all of the batches flattened into a single batch.
// We could avoid copying here with an iov-like AddRecord
// interface
merged_batch = &tmp_batch_;
for (auto writer : write_group) {
if (writer->ShouldWriteToWAL()) {
WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true);
write_with_wal++;
}
writer->log_used = logfile_number_;
}
}
WriteBatchInternal::SetSequence(merged_batch, sequence);
Slice log_entry = WriteBatchInternal::Contents(merged_batch);
status = log_writer->AddRecord(log_entry);
total_log_size_ += log_entry.size();
alive_log_files_.back().AddSize(log_entry.size());
log_empty_ = false;
uint64_t log_size = log_entry.size();
if (status.ok() && need_log_sync) {
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
// It's safe to access logs_ with unlocked mutex_ here because:
// - we've set getting_synced=true for all logs,
// so other threads won't pop from logs_ while we're here,
// - only writer thread can push to logs_, and we're in
// writer thread, so no one will push to logs_,
// - as long as other threads don't modify it, it's safe to read
// from std::deque from multiple threads concurrently.
for (auto& log : logs_) {
status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
if (!status.ok()) {
break;
}
}
if (status.ok() && need_log_dir_sync) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
// we can avoid the disk I/O in the write code path.
status = directories_.GetWalDir()->Fsync();
}
}
if (merged_batch == &tmp_batch_) {
tmp_batch_.Clear();
}
if (status.ok()) {
auto stats = default_cf_internal_stats_;
if (need_log_sync) {
stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
RecordTick(stats_, WAL_FILE_SYNCED);
}
stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
RecordTick(stats_, WAL_FILE_BYTES, log_size);
stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
}
return status;
}
Status DBImpl::HandleWALFull(WriteContext* write_context) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(write_context != nullptr);
Status status;
if (alive_log_files_.begin()->getting_flushed) { if (alive_log_files_.begin()->getting_flushed) {
return; return status;
} }
auto oldest_alive_log = alive_log_files_.begin()->number; auto oldest_alive_log = alive_log_files_.begin()->number;
@ -5088,7 +5048,7 @@ void DBImpl::MaybeFlushColumnFamilies() {
// the oldest alive log but the log still contained uncommited transactions. // the oldest alive log but the log still contained uncommited transactions.
// the oldest alive log STILL contains uncommited transaction so there // the oldest alive log STILL contains uncommited transaction so there
// is still nothing that we can do. // is still nothing that we can do.
return; return status;
} else { } else {
ROCKS_LOG_WARN( ROCKS_LOG_WARN(
immutable_db_options_.info_log, immutable_db_options_.info_log,
@ -5103,8 +5063,6 @@ void DBImpl::MaybeFlushColumnFamilies() {
alive_log_files_.begin()->getting_flushed = true; alive_log_files_.begin()->getting_flushed = true;
} }
WriteContext context;
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Flushing all column families with data in WAL number %" PRIu64 "Flushing all column families with data in WAL number %" PRIu64
". Total log size is %" PRIu64 ". Total log size is %" PRIu64
@ -5117,7 +5075,7 @@ void DBImpl::MaybeFlushColumnFamilies() {
continue; continue;
} }
if (cfd->OldestLogToKeep() <= oldest_alive_log) { if (cfd->OldestLogToKeep() <= oldest_alive_log) {
auto status = SwitchMemtable(cfd, &context); status = SwitchMemtable(cfd, write_context);
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
@ -5126,7 +5084,53 @@ void DBImpl::MaybeFlushColumnFamilies() {
} }
} }
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
return status;
}
Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
mutex_.AssertHeld();
assert(write_context != nullptr);
Status status;
// Before a new memtable is added in SwitchMemtable(),
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
// thread is writing to another DB with the same write buffer, they may also
// be flushed. We may end up with flushing much more DBs than needed. It's
// suboptimal but still correct.
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Flushing column family with largest mem table size. Write buffer is "
"using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
write_buffer_manager_->memory_usage(),
write_buffer_manager_->buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
ColumnFamilyData* cfd_picked = nullptr;
SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (!cfd->mem()->IsEmpty()) {
// We only consider active mem table, hoping immutable memtable is
// already in the process of flushing.
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
cfd_picked = cfd;
seq_num_for_cf_picked = seq;
}
}
}
if (cfd_picked != nullptr) {
status = SwitchMemtable(cfd_picked, write_context);
if (status.ok()) {
cfd_picked->imm()->FlushRequested();
SchedulePendingFlush(cfd_picked);
MaybeScheduleFlushOrCompaction();
}
}
return status;
} }
uint64_t DBImpl::GetMaxTotalWalSize() const { uint64_t DBImpl::GetMaxTotalWalSize() const {

@ -314,7 +314,7 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family = nullptr, ColumnFamilyHandle* column_family = nullptr,
bool disallow_trivial_move = false); bool disallow_trivial_move = false);
void TEST_MaybeFlushColumnFamilies(); void TEST_HandleWALFull();
bool TEST_UnableToFlushOldestLog() { bool TEST_UnableToFlushOldestLog() {
return unable_to_flush_oldest_log_; return unable_to_flush_oldest_log_;
@ -600,7 +600,19 @@ class DBImpl : public DB {
#endif #endif
struct CompactionState; struct CompactionState;
struct WriteContext; struct WriteContext {
autovector<SuperVersion*> superversions_to_free_;
autovector<MemTable*> memtables_to_free_;
~WriteContext() {
for (auto& sv : superversions_to_free_) {
delete sv;
}
for (auto& m : memtables_to_free_) {
delete m;
}
}
};
struct PurgeFileInfo; struct PurgeFileInfo;
@ -677,6 +689,20 @@ class DBImpl : public DB {
// Wait for memtable flushed // Wait for memtable flushed
Status WaitForFlushMemTable(ColumnFamilyData* cfd); Status WaitForFlushMemTable(ColumnFamilyData* cfd);
// REQUIRES: mutex locked
Status HandleWALFull(WriteContext* write_context);
// REQUIRES: mutex locked
Status HandleWriteBufferFull(WriteContext* write_context);
// REQUIRES: mutex locked
Status PreprocessWrite(const WriteOptions& write_options, bool need_log_sync,
bool* logs_getting_syned, WriteContext* write_context);
Status WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
log::Writer* log_writer, bool need_log_sync,
bool need_log_dir_sync, SequenceNumber sequence);
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
Status CompactFilesImpl(const CompactionOptions& compact_options, Status CompactFilesImpl(const CompactionOptions& compact_options,
@ -740,12 +766,6 @@ class DBImpl : public DB {
const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary); const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary);
// Persist RocksDB options under the single write thread
// REQUIRES: mutex locked
Status PersistOptions();
void MaybeFlushColumnFamilies();
uint64_t GetMaxTotalWalSize() const; uint64_t GetMaxTotalWalSize() const;
// table_cache_ provides its own synchronization // table_cache_ provides its own synchronization

@ -19,9 +19,10 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0); return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
} }
void DBImpl::TEST_MaybeFlushColumnFamilies() { void DBImpl::TEST_HandleWALFull() {
WriteContext write_context;
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
MaybeFlushColumnFamilies(); HandleWALFull(&write_context);
} }
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(

@ -274,7 +274,7 @@ size_t WriteThread::EnterAsBatchGroupLeader(
break; break;
} }
if (!w->disableWAL && leader->disableWAL) { if (!w->disable_wal && leader->disable_wal) {
// Do not include a write that needs WAL into a batch that has // Do not include a write that needs WAL into a batch that has
// WAL disabled. // WAL disabled.
break; break;

@ -15,6 +15,7 @@
#include <vector> #include <vector>
#include "db/write_callback.h" #include "db/write_callback.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/types.h" #include "rocksdb/types.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
@ -80,7 +81,7 @@ class WriteThread {
WriteBatch* batch; WriteBatch* batch;
bool sync; bool sync;
bool no_slowdown; bool no_slowdown;
bool disableWAL; bool disable_wal;
bool disable_memtable; bool disable_memtable;
uint64_t log_used; // log number that this batch was inserted into uint64_t log_used; // log number that this batch was inserted into
uint64_t log_ref; // log number that memtable insert should reference uint64_t log_ref; // log number that memtable insert should reference
@ -101,7 +102,7 @@ class WriteThread {
: batch(nullptr), : batch(nullptr),
sync(false), sync(false),
no_slowdown(false), no_slowdown(false),
disableWAL(false), disable_wal(false),
disable_memtable(false), disable_memtable(false),
log_used(0), log_used(0),
log_ref(0), log_ref(0),
@ -113,6 +114,23 @@ class WriteThread {
link_older(nullptr), link_older(nullptr),
link_newer(nullptr) {} link_newer(nullptr) {}
Writer(const WriteOptions& write_options, WriteBatch* _batch,
WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable)
: batch(_batch),
sync(write_options.sync),
no_slowdown(write_options.no_slowdown),
disable_wal(write_options.disableWAL),
disable_memtable(_disable_memtable),
log_used(0),
log_ref(_log_ref),
in_batch_group(false),
callback(_callback),
made_waitable(false),
state(STATE_INIT),
parallel_group(nullptr),
link_older(nullptr),
link_newer(nullptr) {}
~Writer() { ~Writer() {
if (made_waitable) { if (made_waitable) {
StateMutex().~mutex(); StateMutex().~mutex();
@ -166,7 +184,7 @@ class WriteThread {
return !CallbackFailed() && !disable_memtable; return !CallbackFailed() && !disable_memtable;
} }
bool ShouldWriteToWAL() { return !CallbackFailed() && !disableWAL; } bool ShouldWriteToWAL() { return !CallbackFailed() && !disable_wal; }
// No other mutexes may be acquired while holding StateMutex(), it is // No other mutexes may be acquired while holding StateMutex(), it is
// always last in the order // always last in the order

@ -1429,7 +1429,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
// request a flush for all column families such that the earliest // request a flush for all column families such that the earliest
// alive log file can be killed // alive log file can be killed
db_impl->TEST_MaybeFlushColumnFamilies(); db_impl->TEST_HandleWALFull();
// log cannot be flushed because txn2 has not been commited // log cannot be flushed because txn2 has not been commited
ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed()); ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog()); ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog());
@ -1444,7 +1444,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
s = txn2->Commit(); s = txn2->Commit();
ASSERT_OK(s); ASSERT_OK(s);
db_impl->TEST_MaybeFlushColumnFamilies(); db_impl->TEST_HandleWALFull();
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
// we should see that cfb now has a flush requested // we should see that cfb now has a flush requested

Loading…
Cancel
Save