From 9e44531803348fff8a7b8878dd4230fddabf9e8d Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Tue, 4 Apr 2017 10:19:33 -0700
Subject: [PATCH] Refactor WriteImpl (pipeline write part 1)

Summary:
Refactor WriteImpl() so that when I plug in the pipeline write code (which is
an alternative approach for WriteThread), some of the logic can be reused.
I split out the following methods from WriteImpl():

* PreprocessWrite()
* HandleWALFull() (previously MaybeFlushColumnFamilies())
* HandleWriteBufferFull()
* WriteToWAL()

Also add a constructor to WriteThread::Writer, and move WriteContext into
db_impl.h.

No real logic change in this patch. (A short illustrative sketch of the
resulting write path follows the patch.)
Closes https://github.com/facebook/rocksdb/pull/2042

Differential Revision: D4781014

Pulled By: yiwu-arbug

fbshipit-source-id: d45ca18
---
 db/db_impl.cc                              | 470 +++++++++++----------
 db/db_impl.h                               |  36 +-
 db/db_impl_debug.cc                        |   5 +-
 db/write_thread.cc                         |   2 +-
 db/write_thread.h                          |  24 +-
 utilities/transactions/transaction_test.cc |   4 +-
 6 files changed, 292 insertions(+), 249 deletions(-)

diff --git a/db/db_impl.cc b/db/db_impl.cc
index a86e7d481..8c9aa5f12 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -106,20 +106,6 @@ const std::string kDefaultColumnFamilyName("default");
 
 void DumpRocksDBBuildVersion(Logger * log);
 
-struct DBImpl::WriteContext {
-  autovector<SuperVersion*> superversions_to_free_;
-  autovector<MemTable*> memtables_to_free_;
-
-  ~WriteContext() {
-    for (auto& sv : superversions_to_free_) {
-      delete sv;
-    }
-    for (auto& m : memtables_to_free_) {
-      delete m;
-    }
-  }
-};
-
 Options SanitizeOptions(const std::string& dbname,
                         const Options& src) {
   auto db_options = SanitizeOptions(dbname, DBOptions(src));
@@ -2459,6 +2445,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
   MutableCFOptions new_options;
   Status s;
   Status persist_options_status;
+  WriteThread::Writer w;
   {
     InstrumentedMutexLock l(&mutex_);
     s = cfd->SetOptions(options_map);
@@ -2475,7 +2462,9 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
       InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
       delete old_sv;
 
-      persist_options_status = PersistOptions();
+      write_thread_.EnterUnbatched(&w, &mutex_);
+      persist_options_status = WriteOptionsFile();
+      write_thread_.ExitUnbatched(&w);
     }
   }
 
@@ -2523,6 +2512,8 @@ Status DBImpl::SetDBOptions(
   MutableDBOptions new_options;
   Status s;
   Status persist_options_status;
+  WriteThread::Writer w;
+  WriteContext write_context;
   {
     InstrumentedMutexLock l(&mutex_);
     s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
@@ -2539,11 +2530,17 @@ Status DBImpl::SetDBOptions(
       mutable_db_options_ = new_options;
 
+      write_thread_.EnterUnbatched(&w, &mutex_);
       if (total_log_size_ > GetMaxTotalWalSize()) {
-        MaybeFlushColumnFamilies();
+        Status purge_wal_status = HandleWALFull(&write_context);
+        if (!purge_wal_status.ok()) {
+          ROCKS_LOG_WARN(immutable_db_options_.info_log,
+                         "Unable to purge WAL files in SetDBOptions() -- %s",
+                         purge_wal_status.ToString().c_str());
+        }
       }
-
-      persist_options_status = PersistOptions();
+      persist_options_status = WriteOptionsFile();
+      write_thread_.ExitUnbatched(&w);
     }
   }
 
   ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
@@ -2572,15 +2569,6 @@ Status DBImpl::SetDBOptions(
 #endif  // ROCKSDB_LITE
 }
 
-Status DBImpl::PersistOptions() {
-  mutex_.AssertHeld();
-  WriteThread::Writer w;
-  write_thread_.EnterUnbatched(&w, &mutex_);
-  Status s = WriteOptionsFile();
-  write_thread_.ExitUnbatched(&w);
-  return s;
-}
-
 // return the same level if it cannot be moved
 int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
     const MutableCFOptions& mutable_cf_options, int level) {
@@ -4674,14 +4662,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   Status status;
 
   PERF_TIMER_GUARD(write_pre_and_post_process_time);
-  WriteThread::Writer w;
-  w.batch = my_batch;
-  w.sync = write_options.sync;
-  w.disableWAL = write_options.disableWAL;
-  w.disable_memtable = disable_memtable;
-  w.in_batch_group = false;
-  w.callback = callback;
-  w.log_ref = log_ref;
+  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
+                        disable_memtable);
 
   if (!write_options.disableWAL) {
     RecordTick(stats_, WRITE_WITH_WAL);
@@ -4694,10 +4676,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     // we are a non-leader in a parallel group
     PERF_TIMER_GUARD(write_memtable_time);
 
-    if (log_used != nullptr) {
-      *log_used = w.log_used;
-    }
-
     if (w.ShouldWriteToMemtable()) {
       ColumnFamilyMemTablesImpl column_family_memtables(
           versions_->GetColumnFamilySet());
@@ -4724,123 +4702,35 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       *log_used = w.log_used;
     }
     // write is complete and leader has updated sequence
-    RecordTick(stats_, WRITE_DONE_BY_OTHER);
     return w.FinalStatus();
   }
   // else we are the leader of the write batch group
   assert(w.state == WriteThread::STATE_GROUP_LEADER);
 
-  WriteContext context;
-  mutex_.Lock();
-
-  if (!write_options.disableWAL) {
-    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
-  }
-
-  RecordTick(stats_, WRITE_DONE_BY_SELF);
-  default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
-
   // Once reaches this point, the current writer "w" will try to do its write
   // job.  It may also pick up some of the remaining writers in the "writers_"
   // when it finds suitable, and finish them in the same write batch.
   // This is how a write job could be done by the other writer.
-
-  assert(!single_column_family_mode_ ||
-         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
-
-  if (UNLIKELY(!single_column_family_mode_ &&
-               total_log_size_ > GetMaxTotalWalSize())) {
-    MaybeFlushColumnFamilies();
-  }
-  if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
-    // Before a new memtable is added in SwitchMemtable(),
-    // write_buffer_manager_->ShouldFlush() will keep returning true. If another
-    // thread is writing to another DB with the same write buffer, they may also
-    // be flushed. We may end up with flushing much more DBs than needed. It's
-    // suboptimal but still correct.
-    ROCKS_LOG_INFO(
-        immutable_db_options_.info_log,
-        "Flushing column family with largest mem table size. Write buffer is "
-        "using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
-        write_buffer_manager_->memory_usage(),
-        write_buffer_manager_->buffer_size());
-    // no need to refcount because drop is happening in write thread, so can't
-    // happen while we're in the write thread
-    ColumnFamilyData* cfd_picked = nullptr;
-    SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
-
-    for (auto cfd : *versions_->GetColumnFamilySet()) {
-      if (cfd->IsDropped()) {
-        continue;
-      }
-      if (!cfd->mem()->IsEmpty()) {
-        // We only consider active mem table, hoping immutable memtable is
-        // already in the process of flushing.
-        uint64_t seq = cfd->mem()->GetCreationSeq();
-        if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
-          cfd_picked = cfd;
-          seq_num_for_cf_picked = seq;
-        }
-      }
-    }
-    if (cfd_picked != nullptr) {
-      status = SwitchMemtable(cfd_picked, &context);
-      if (status.ok()) {
-        cfd_picked->imm()->FlushRequested();
-        SchedulePendingFlush(cfd_picked);
-        MaybeScheduleFlushOrCompaction();
-      }
-    }
-  }
-
-  if (UNLIKELY(status.ok() && !bg_error_.ok())) {
-    status = bg_error_;
-  }
-
-  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
-    status = ScheduleFlushes(&context);
-  }
-
-  if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
-                               write_controller_.NeedsDelay()))) {
-    PERF_TIMER_STOP(write_pre_and_post_process_time);
-    PERF_TIMER_GUARD(write_delay_time);
-    // We don't know size of curent batch so that we always use the size
-    // for previous one. It might create a fairness issue that expiration
-    // might happen for smaller writes but larger writes can go through.
-    // Can optimize it if it is an issue.
-    status = DelayWrite(last_batch_group_size_, write_options);
-    PERF_TIMER_START(write_pre_and_post_process_time);
-  }
-
-  uint64_t last_sequence = versions_->LastSequence();
+  WriteContext write_context;
   WriteThread::Writer* last_writer = &w;
   autovector<WriteThread::Writer*> write_group;
-  bool need_log_sync = !write_options.disableWAL && write_options.sync;
-  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
-  bool logs_getting_synced = false;
-  if (status.ok()) {
-    if (need_log_sync) {
-      while (logs_.front().getting_synced) {
-        log_sync_cv_.Wait();
-      }
-      for (auto& log : logs_) {
-        assert(!log.getting_synced);
-        log.getting_synced = true;
-      }
-      logs_getting_synced = true;
-    }
-    // Add to log and apply to memtable. We can release the lock
-    // during this phase since &w is currently responsible for logging
-    // and protects against concurrent loggers and concurrent writes
-    // into memtables
-  }
+  bool logs_getting_synced = false;
+
+  mutex_.Lock();
+
+  bool need_log_sync = !write_options.disableWAL && write_options.sync;
+  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
+  status = PreprocessWrite(write_options, need_log_sync, &logs_getting_synced,
+                           &write_context);
+  uint64_t last_sequence = versions_->LastSequence();
   log::Writer* cur_log_writer = logs_.back().writer;
 
   mutex_.Unlock();
 
-  // At this point the mutex is unlocked
+  // Add to log and apply to memtable. We can release the lock
+  // during this phase since &w is currently responsible for logging
+  // and protects against concurrent loggers and concurrent writes
+  // into memtables
 
   bool exit_completed_early = false;
   last_batch_group_size_ =
@@ -4881,110 +4771,44 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
 
     const SequenceNumber current_sequence = last_sequence + 1;
     last_sequence += total_count;
 
-    // Record statistics
+    // Update stats while we are an exclusive group leader, so we know
+    // that nobody else can be writing to these particular stats.
+    // We're optimistic, updating the stats before we successfully
+    // commit. That lets us release our leader status early in
+    // some cases.
+    auto stats = default_cf_internal_stats_;
+    stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
     RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
+    stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
     RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
+    stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
+    RecordTick(stats_, WRITE_DONE_BY_SELF);
+    auto write_done_by_other = write_group.size() - 1;
+    if (write_done_by_other > 0) {
+      stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
+                        write_done_by_other);
+      RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
+    }
     MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
-    PERF_TIMER_STOP(write_pre_and_post_process_time);
 
     if (write_options.disableWAL) {
       has_unpersisted_data_.store(true, std::memory_order_relaxed);
     }
 
-    uint64_t log_size = 0;
-    if (!write_options.disableWAL) {
-      PERF_TIMER_GUARD(write_wal_time);
-
-      WriteBatch* merged_batch = nullptr;
-      if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
-          write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
-        // we simply write the first WriteBatch to WAL if the group only
-        // contains one batch, that batch should be written to the WAL,
-        // and the batch is not wanting to be truncated
-        merged_batch = write_group[0]->batch;
-        write_group[0]->log_used = logfile_number_;
-      } else {
-        // WAL needs all of the batches flattened into a single batch.
-        // We could avoid copying here with an iov-like AddRecord
-        // interface
-        merged_batch = &tmp_batch_;
-        for (auto writer : write_group) {
-          if (writer->ShouldWriteToWAL()) {
-            WriteBatchInternal::Append(merged_batch, writer->batch,
-                                       /*WAL_only*/ true);
-          }
-          writer->log_used = logfile_number_;
-        }
-      }
+    PERF_TIMER_STOP(write_pre_and_post_process_time);
+    if (status.ok() && !write_options.disableWAL) {
+      PERF_TIMER_GUARD(write_wal_time);
+      status = WriteToWAL(write_group, cur_log_writer, need_log_sync,
+                          need_log_dir_sync, current_sequence);
       if (log_used != nullptr) {
         *log_used = logfile_number_;
       }
-
-      WriteBatchInternal::SetSequence(merged_batch, current_sequence);
-
-      Slice log_entry = WriteBatchInternal::Contents(merged_batch);
-      status = cur_log_writer->AddRecord(log_entry);
-      total_log_size_ += log_entry.size();
-      alive_log_files_.back().AddSize(log_entry.size());
-      log_empty_ = false;
-      log_size = log_entry.size();
-      RecordTick(stats_, WAL_FILE_BYTES, log_size);
-      if (status.ok() && need_log_sync) {
-        RecordTick(stats_, WAL_FILE_SYNCED);
-        StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
-        // It's safe to access logs_ with unlocked mutex_ here because:
-        //  - we've set getting_synced=true for all logs,
-        //    so other threads won't pop from logs_ while we're here,
-        //  - only writer thread can push to logs_, and we're in
-        //    writer thread, so no one will push to logs_,
-        //  - as long as other threads don't modify it, it's safe to read
-        //    from std::deque from multiple threads concurrently.
-        for (auto& log : logs_) {
-          status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
-          if (!status.ok()) {
-            break;
-          }
-        }
-        if (status.ok() && need_log_dir_sync) {
-          // We only sync WAL directory the first time WAL syncing is
-          // requested, so that in case users never turn on WAL sync,
-          // we can avoid the disk I/O in the write code path.
-          status = directories_.GetWalDir()->Fsync();
-        }
-      }
-
-      if (merged_batch == &tmp_batch_) {
-        tmp_batch_.Clear();
-      }
     }
+
     if (status.ok()) {
       PERF_TIMER_GUARD(write_memtable_time);
 
-      {
-        // Update stats while we are an exclusive group leader, so we know
-        // that nobody else can be writing to these particular stats.
-        // We're optimistic, updating the stats before we successfully
-        // commit. That lets us release our leader status early in
-        // some cases.
-        auto stats = default_cf_internal_stats_;
-        stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
-        stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
-        if (!write_options.disableWAL) {
-          if (write_options.sync) {
-            stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
-          }
-          stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
-        }
-        uint64_t for_other = write_group.size() - 1;
-        if (for_other > 0) {
-          stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, for_other);
-          if (!write_options.disableWAL) {
-            stats->AddDBStats(InternalStats::WRITE_WITH_WAL, for_other);
-          }
-        }
-      }
-
       if (!parallel) {
         status = WriteBatchInternal::InsertInto(
             write_group, current_sequence, column_family_memtables_.get(),
@@ -5070,11 +4894,147 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   return status;
 }
 
-void DBImpl::MaybeFlushColumnFamilies() {
+Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
+                               bool need_log_sync, bool* logs_getting_synced,
+                               WriteContext* write_context) {
   mutex_.AssertHeld();
+  assert(write_context != nullptr && logs_getting_synced != nullptr);
+  Status status;
+
+  assert(!single_column_family_mode_ ||
+         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
+  if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
+               total_log_size_ > GetMaxTotalWalSize())) {
+    status = HandleWALFull(write_context);
+  }
+
+  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
+    // Before a new memtable is added in SwitchMemtable(),
+    // write_buffer_manager_->ShouldFlush() will keep returning true. If another
+    // thread is writing to another DB with the same write buffer, they may also
+    // be flushed. We may end up with flushing much more DBs than needed. It's
+    // suboptimal but still correct.
+    status = HandleWriteBufferFull(write_context);
+  }
+
+  if (UNLIKELY(status.ok() && !bg_error_.ok())) {
+    status = bg_error_;
+  }
+
+  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
+    status = ScheduleFlushes(write_context);
+  }
+
+  if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
+                               write_controller_.NeedsDelay()))) {
+    PERF_TIMER_GUARD(write_delay_time);
+    // We don't know size of current batch so that we always use the size
+    // for previous one. It might create a fairness issue that expiration
+    // might happen for smaller writes but larger writes can go through.
+    // Can optimize it if it is an issue.
+    status = DelayWrite(last_batch_group_size_, write_options);
+  }
+
+  if (status.ok() && need_log_sync) {
+    while (logs_.front().getting_synced) {
+      log_sync_cv_.Wait();
+    }
+    for (auto& log : logs_) {
+      assert(!log.getting_synced);
+      log.getting_synced = true;
+    }
+    *logs_getting_synced = true;
+  }
+
+  return status;
+}
+
+Status DBImpl::WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
+                          log::Writer* log_writer, bool need_log_sync,
+                          bool need_log_dir_sync, SequenceNumber sequence) {
+  Status status;
+
+  WriteBatch* merged_batch = nullptr;
+  size_t write_with_wal = 0;
+  if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
+      write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
+    // we simply write the first WriteBatch to WAL if the group only
+    // contains one batch, that batch should be written to the WAL,
+    // and the batch is not wanting to be truncated
+    merged_batch = write_group[0]->batch;
+    write_group[0]->log_used = logfile_number_;
+    write_with_wal = 1;
+  } else {
+    // WAL needs all of the batches flattened into a single batch.
+    // We could avoid copying here with an iov-like AddRecord
+    // interface
+    merged_batch = &tmp_batch_;
+    for (auto writer : write_group) {
+      if (writer->ShouldWriteToWAL()) {
+        WriteBatchInternal::Append(merged_batch, writer->batch,
+                                   /*WAL_only*/ true);
+        write_with_wal++;
+      }
+      writer->log_used = logfile_number_;
+    }
+  }
+
+  WriteBatchInternal::SetSequence(merged_batch, sequence);
+
+  Slice log_entry = WriteBatchInternal::Contents(merged_batch);
+  status = log_writer->AddRecord(log_entry);
+  total_log_size_ += log_entry.size();
+  alive_log_files_.back().AddSize(log_entry.size());
+  log_empty_ = false;
+  uint64_t log_size = log_entry.size();
+
+  if (status.ok() && need_log_sync) {
+    StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
+    // It's safe to access logs_ with unlocked mutex_ here because:
+    //  - we've set getting_synced=true for all logs,
+    //    so other threads won't pop from logs_ while we're here,
+    //  - only writer thread can push to logs_, and we're in
+    //    writer thread, so no one will push to logs_,
+    //  - as long as other threads don't modify it, it's safe to read
+    //    from std::deque from multiple threads concurrently.
+    for (auto& log : logs_) {
+      status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
+      if (!status.ok()) {
+        break;
+      }
+    }
+    if (status.ok() && need_log_dir_sync) {
+      // We only sync WAL directory the first time WAL syncing is
+      // requested, so that in case users never turn on WAL sync,
+      // we can avoid the disk I/O in the write code path.
+      status = directories_.GetWalDir()->Fsync();
+    }
+  }
+
+  if (merged_batch == &tmp_batch_) {
+    tmp_batch_.Clear();
+  }
+  if (status.ok()) {
+    auto stats = default_cf_internal_stats_;
+    if (need_log_sync) {
+      stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
+      RecordTick(stats_, WAL_FILE_SYNCED);
+    }
+    stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
+    RecordTick(stats_, WAL_FILE_BYTES, log_size);
+    stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal);
+    RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
+  }
+  return status;
+}
+
+Status DBImpl::HandleWALFull(WriteContext* write_context) {
+  mutex_.AssertHeld();
+  assert(write_context != nullptr);
+  Status status;
   if (alive_log_files_.begin()->getting_flushed) {
-    return;
+    return status;
   }
 
   auto oldest_alive_log = alive_log_files_.begin()->number;
@@ -5088,7 +5048,7 @@ void DBImpl::MaybeFlushColumnFamilies() {
     // the oldest alive log but the log still contained uncommited transactions.
     // the oldest alive log STILL contains uncommited transaction so there
     // is still nothing that we can do.
-    return;
+    return status;
   } else {
     ROCKS_LOG_WARN(
         immutable_db_options_.info_log,
@@ -5103,8 +5063,6 @@ void DBImpl::MaybeFlushColumnFamilies() {
     alive_log_files_.begin()->getting_flushed = true;
   }
 
-  WriteContext context;
-
   ROCKS_LOG_INFO(immutable_db_options_.info_log,
                  "Flushing all column families with data in WAL number %" PRIu64
                  ". Total log size is %" PRIu64
@@ -5117,7 +5075,7 @@ void DBImpl::MaybeFlushColumnFamilies() {
       continue;
     }
     if (cfd->OldestLogToKeep() <= oldest_alive_log) {
-      auto status = SwitchMemtable(cfd, &context);
+      status = SwitchMemtable(cfd, write_context);
       if (!status.ok()) {
        break;
      }
@@ -5126,7 +5084,53 @@ void DBImpl::MaybeFlushColumnFamilies() {
     }
   }
   MaybeScheduleFlushOrCompaction();
+  return status;
+}
+
+Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
+  mutex_.AssertHeld();
+  assert(write_context != nullptr);
+  Status status;
+  // Before a new memtable is added in SwitchMemtable(),
+  // write_buffer_manager_->ShouldFlush() will keep returning true. If another
+  // thread is writing to another DB with the same write buffer, they may also
+  // be flushed. We may end up with flushing much more DBs than needed. It's
+  // suboptimal but still correct.
+  ROCKS_LOG_INFO(
+      immutable_db_options_.info_log,
+      "Flushing column family with largest mem table size. Write buffer is "
+      "using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
+      write_buffer_manager_->memory_usage(),
+      write_buffer_manager_->buffer_size());
+  // no need to refcount because drop is happening in write thread, so can't
+  // happen while we're in the write thread
+  ColumnFamilyData* cfd_picked = nullptr;
+  SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
+
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    if (cfd->IsDropped()) {
+      continue;
+    }
+    if (!cfd->mem()->IsEmpty()) {
+      // We only consider active mem table, hoping immutable memtable is
+      // already in the process of flushing.
+      uint64_t seq = cfd->mem()->GetCreationSeq();
+      if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
+        cfd_picked = cfd;
+        seq_num_for_cf_picked = seq;
+      }
+    }
+  }
+  if (cfd_picked != nullptr) {
+    status = SwitchMemtable(cfd_picked, write_context);
+    if (status.ok()) {
+      cfd_picked->imm()->FlushRequested();
+      SchedulePendingFlush(cfd_picked);
+      MaybeScheduleFlushOrCompaction();
+    }
+  }
+  return status;
 }
 
 uint64_t DBImpl::GetMaxTotalWalSize() const {
diff --git a/db/db_impl.h b/db/db_impl.h
index 0fa231d07..2be81374e 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -314,7 +314,7 @@ class DBImpl : public DB {
       ColumnFamilyHandle* column_family = nullptr,
       bool disallow_trivial_move = false);
 
-  void TEST_MaybeFlushColumnFamilies();
+  void TEST_HandleWALFull();
 
   bool TEST_UnableToFlushOldestLog() {
     return unable_to_flush_oldest_log_;
@@ -600,7 +600,19 @@ class DBImpl : public DB {
 #endif
 
   struct CompactionState;
-  struct WriteContext;
+  struct WriteContext {
+    autovector<SuperVersion*> superversions_to_free_;
+    autovector<MemTable*> memtables_to_free_;
+
+    ~WriteContext() {
+      for (auto& sv : superversions_to_free_) {
+        delete sv;
+      }
+      for (auto& m : memtables_to_free_) {
+        delete m;
+      }
+    }
+  };
 
   struct PurgeFileInfo;
 
@@ -677,6 +689,20 @@ class DBImpl : public DB {
   // Wait for memtable flushed
   Status WaitForFlushMemTable(ColumnFamilyData* cfd);
 
+  // REQUIRES: mutex locked
+  Status HandleWALFull(WriteContext* write_context);
+
+  // REQUIRES: mutex locked
+  Status HandleWriteBufferFull(WriteContext* write_context);
+
+  // REQUIRES: mutex locked
+  Status PreprocessWrite(const WriteOptions& write_options, bool need_log_sync,
+                         bool* logs_getting_synced, WriteContext* write_context);
+
+  Status WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
+                    log::Writer* log_writer, bool need_log_sync,
+                    bool need_log_dir_sync, SequenceNumber sequence);
+
 #ifndef ROCKSDB_LITE
   Status CompactFilesImpl(const CompactionOptions& compact_options,
@@ -740,12 +766,6 @@ class DBImpl : public DB {
 
   const Snapshot* GetSnapshotImpl(bool is_write_conflict_boundary);
 
-  // Persist RocksDB options under the single write thread
-  // REQUIRES: mutex locked
-  Status PersistOptions();
-
-  void MaybeFlushColumnFamilies();
-
   uint64_t GetMaxTotalWalSize() const;
 
   // table_cache_ provides its own synchronization
diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc
index a69d2658d..8df5601d5 100644
--- a/db/db_impl_debug.cc
+++ b/db/db_impl_debug.cc
@@ -19,9 +19,10 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
   return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
 }
 
-void DBImpl::TEST_MaybeFlushColumnFamilies() {
+void DBImpl::TEST_HandleWALFull() {
+  WriteContext write_context;
   InstrumentedMutexLock l(&mutex_);
-  MaybeFlushColumnFamilies();
+  HandleWALFull(&write_context);
 }
 
 int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
diff --git a/db/write_thread.cc b/db/write_thread.cc
index 66cec0e1b..048eca42c 100644
--- a/db/write_thread.cc
+++ b/db/write_thread.cc
@@ -274,7 +274,7 @@ size_t WriteThread::EnterAsBatchGroupLeader(
       break;
     }
 
-    if (!w->disableWAL && leader->disableWAL) {
+    if (!w->disable_wal && leader->disable_wal) {
      // Do not include a write that needs WAL into a batch that has
      // WAL disabled.
      break;
diff --git a/db/write_thread.h b/db/write_thread.h
index 7cad1d0d5..5af6547b9 100644
--- a/db/write_thread.h
+++ b/db/write_thread.h
@@ -15,6 +15,7 @@
 #include <vector>
 
 #include "db/write_callback.h"
+#include "rocksdb/options.h"
 #include "rocksdb/status.h"
 #include "rocksdb/types.h"
 #include "rocksdb/write_batch.h"
@@ -80,7 +81,7 @@ class WriteThread {
     WriteBatch* batch;
     bool sync;
     bool no_slowdown;
-    bool disableWAL;
+    bool disable_wal;
     bool disable_memtable;
     uint64_t log_used;  // log number that this batch was inserted into
     uint64_t log_ref;   // log number that memtable insert should reference
@@ -101,7 +102,7 @@ class WriteThread {
         : batch(nullptr),
           sync(false),
           no_slowdown(false),
-          disableWAL(false),
+          disable_wal(false),
           disable_memtable(false),
           log_used(0),
           log_ref(0),
@@ -113,6 +114,23 @@ class WriteThread {
           link_older(nullptr),
           link_newer(nullptr) {}
 
+    Writer(const WriteOptions& write_options, WriteBatch* _batch,
+           WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable)
+        : batch(_batch),
+          sync(write_options.sync),
+          no_slowdown(write_options.no_slowdown),
+          disable_wal(write_options.disableWAL),
+          disable_memtable(_disable_memtable),
+          log_used(0),
+          log_ref(_log_ref),
+          in_batch_group(false),
+          callback(_callback),
+          made_waitable(false),
+          state(STATE_INIT),
+          parallel_group(nullptr),
+          link_older(nullptr),
+          link_newer(nullptr) {}
+
     ~Writer() {
       if (made_waitable) {
        StateMutex().~mutex();
@@ -166,7 +184,7 @@ class WriteThread {
       return !CallbackFailed() && !disable_memtable;
     }
 
-    bool ShouldWriteToWAL() { return !CallbackFailed() && !disableWAL; }
+    bool ShouldWriteToWAL() { return !CallbackFailed() && !disable_wal; }
 
     // No other mutexes may be acquired while holding StateMutex(), it is
     // always last in the order
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index 0866f4ffd..2138d48a4 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -1429,7 +1429,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
 
   // request a flush for all column families such that the earliest
   // alive log file can be killed
-  db_impl->TEST_MaybeFlushColumnFamilies();
+  db_impl->TEST_HandleWALFull();
   // log cannot be flushed because txn2 has not been commited
   ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
   ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog());
@@ -1444,7 +1444,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
   s = txn2->Commit();
   ASSERT_OK(s);
 
-  db_impl->TEST_MaybeFlushColumnFamilies();
+  db_impl->TEST_HandleWALFull();
   ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
 
   // we should see that cfb now has a flush requested
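
For readers who want the shape of the change without walking the whole diff, here is a small,
self-contained C++ sketch. It is not RocksDB code: Status, WriteBatch, and every function body
below are illustrative stand-ins that only mirror the control flow the patch gives the group
leader in WriteImpl() -- preprocess (WAL full / write buffer full handling), write the batch
group to the WAL, then apply it to the memtables, with each stage reporting a Status.

// Illustrative sketch only -- NOT RocksDB code. All names are stand-ins that
// mirror the refactored WriteImpl(): PreprocessWrite -> WriteToWAL -> memtable
// insert, each stage returning a Status so later stages can be skipped on error.
#include <iostream>
#include <string>
#include <vector>

struct Status {                       // stand-in for rocksdb::Status
  bool ok_ = true;
  bool ok() const { return ok_; }
};

struct WriteBatch {                   // stand-in for rocksdb::WriteBatch
  std::vector<std::string> entries;
};

// Mirrors PreprocessWrite(): handle "WAL too large" and "write buffer full"
// conditions before the group leader proceeds with the write.
Status PreprocessWrite(bool wal_full, bool write_buffer_full) {
  if (wal_full) {
    // HandleWALFull() in the patch: switch memtables so old WAL files can be
    // purged once their data is flushed.
    std::cout << "handle WAL full: switch memtables, purge old logs\n";
  }
  if (write_buffer_full) {
    // HandleWriteBufferFull() in the patch: flush the column family whose
    // active memtable was created earliest.
    std::cout << "handle write buffer full: flush oldest memtable\n";
  }
  return Status{};
}

// Mirrors WriteToWAL(): append the (merged) batch group to the log.
Status WriteToWAL(const std::vector<WriteBatch*>& write_group) {
  for (const WriteBatch* b : write_group) {
    std::cout << "WAL <- batch with " << b->entries.size() << " entries\n";
  }
  return Status{};
}

// Mirrors the leader path of the refactored WriteImpl().
Status WriteImplSketch(WriteBatch* my_batch) {
  std::vector<WriteBatch*> write_group{my_batch};  // leader builds the group
  Status s = PreprocessWrite(/*wal_full=*/false, /*write_buffer_full=*/false);
  if (s.ok()) {
    s = WriteToWAL(write_group);
  }
  if (s.ok()) {
    // Memtable insert would happen here.
    std::cout << "memtable <- " << my_batch->entries.size() << " entries\n";
  }
  return s;
}

int main() {
  WriteBatch batch{{"put k1=v1", "put k2=v2"}};
  return WriteImplSketch(&batch).ok() ? 0 : 1;
}

The point of the decomposition is the same in the sketch as in the patch: each stage can fail
independently, and the upcoming pipelined-write code can reuse the stages without duplicating
WriteImpl()'s logic.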