From 1e34d07e182e4c4e54fc1a8cdd1cf1a228089f06 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Fri, 30 Jun 2017 09:30:03 -0700 Subject: [PATCH] Simplify and document sync rules for logs_ etc Summary: Adding/Correcting inline comments and clarifying the sync rules. To make them simple to reason about, the rules are a bit general, which ended up adding some extra synchronizations. However such synchronizations are not on the fast path, and they are worth the simplicity. Closes https://github.com/facebook/rocksdb/pull/2517 Differential Revision: D5348239 Pulled By: maysamyabandeh fbshipit-source-id: ff2e59fb1e568c122d2cdbf598310f3613b7d212 --- db/db_impl.h | 45 ++++++++++++++++++++++++++++++++++----------- db/db_impl_files.cc | 9 ++++++--- db/db_impl_open.cc | 31 +++++++++++++++++++++++-------- db/db_impl_write.cc | 14 ++++++++++++-- 4 files changed, 75 insertions(+), 24 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index 5cd7d2cfb..22ce36fd3 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -843,11 +843,16 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; - // It protects the back() of logs_ and alive_log_files_. Any push_back to - // these must be under log_write_mutex_ and any access that requires the - // back() to remain the same must also lock log_write_mutex_. + // In addition to mutex_, log_write_mutex_ protects writes to logs_ and + // logfile_number_. With concurrent_prepare it also protects alive_log_files_, + // and log_empty_. Refer to the definition of each variable below for more + // details. InstrumentedMutex log_write_mutex_; // State below is protected by mutex_ + // With concurrent_prepare enabled, some of the variables that are accessed during + // WriteToWAL need different synchronization: log_empty_, alive_log_files_, + // logs_, logfile_number_. Refer to the definition of each variable below for + // more description. 
mutable InstrumentedMutex mutex_; std::atomic shutting_down_; @@ -861,10 +866,20 @@ class DBImpl : public DB { // * whenever there is an error in background purge, flush or compaction // * whenever num_running_ingest_file_ goes to 0. InstrumentedCondVar bg_cv_; + // Writes are protected by locking both mutex_ and log_write_mutex_, and reads + // must be under either mutex_ or log_write_mutex_. Since after ::Open, + // logfile_number_ is currently updated only in write_thread_, it can be read + // from the same write_thread_ without any locks. uint64_t logfile_number_; std::deque log_recycle_files; // a list of log files that we can recycle bool log_dir_synced_; + // Without concurrent_prepare, reads and writes to log_empty_ are protected by + // mutex_. Since it is currently updated/read only in write_thread_, it can be + // accessed from the same write_thread_ without any locks. With + // concurrent_prepare writes, where it can be updated in different threads, + // reads and writes are protected by log_write_mutex_ instead. This is to avoid + // expensive mutex_ lock during WAL write, which updates log_empty_. bool log_empty_; ColumnFamilyHandleImpl* default_cf_handle_; InternalStats* default_cf_internal_stats_; @@ -899,18 +914,26 @@ class DBImpl : public DB { // true for some prefix of logs_ bool getting_synced = false; }; + // Without concurrent_prepare, reads and writes to alive_log_files_ are + // protected by mutex_. However since back() is never popped, and push_back() + // is done only from write_thread_, the same thread can access the item + // referred to by back() without mutex_. With concurrent_prepare_, writes + // are protected by locking both mutex_ and log_write_mutex_, and reads must + // be under either mutex_ or log_write_mutex_. std::deque alive_log_files_; // Log files that aren't fully synced, and the current log file. 
// Synchronization: - // - push_back() is done from write thread with locked mutex_, - // - pop_front() is done from any thread with locked mutex_, + // - push_back() is done from write_thread_ with locked mutex_ and + // log_write_mutex_ + // - pop_front() is done from any thread with locked mutex_ and + // log_write_mutex_ + // - reads are done with either locked mutex_ or log_write_mutex_ // - back() and items with getting_synced=true are not popped, - // - it follows that write thread with unlocked mutex_ can safely access - // back() and items with getting_synced=true. - // -- Update: apparently this was a mistake. back() should be called under - // mute_: https://github.com/facebook/rocksdb/pull/1774 - // - When concurrent write threads is enabled, back(), push_back(), and - // pop_front() must be called within log_write_mutex_ + // - The same thread that sets getting_synced=true will reset it. + // - it follows that the object referred by back() can be safely read from + // the write_thread_ without using mutex + // - it follows that the items with getting_synced=true can be safely read + // from the same thread that has set getting_synced=true std::deque logs_; // Signaled when getting_synced becomes false for some of the logs_. InstrumentedCondVar log_sync_cv_; diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 6d16b8331..d34f46402 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -252,9 +252,12 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } job_context->size_log_to_delete += earliest.size; total_log_size_ -= earliest.size; - { - InstrumentedMutexLock wl(&log_write_mutex_); - alive_log_files_.pop_front(); + if (concurrent_prepare_) { + log_write_mutex_.Lock(); + } + alive_log_files_.pop_front(); + if (concurrent_prepare_) { + log_write_mutex_.Unlock(); } // Current log should always stay alive since it can't have // number < MinLogNumber(). 
diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 52611f172..754fc2ae6 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -811,9 +811,15 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, if (data_seen && !flushed) { // Mark these as alive so they'll be considered for deletion later by // FindObsoleteFiles() + if (concurrent_prepare_) { + log_write_mutex_.Lock(); + } for (auto log_number : log_numbers) { alive_log_files_.push_back(LogFileNumberSize(log_number)); } + if (concurrent_prepare_) { + log_write_mutex_.Unlock(); + } } event_logger_.Log() << "job" << job_id << "event" @@ -986,14 +992,17 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { lfile->SetPreallocationBlockSize( impl->GetWalPreallocateBlockSize(max_write_buffer_size)); - impl->logfile_number_ = new_log_number; - unique_ptr file_writer( - new WritableFileWriter(std::move(lfile), opt_env_options)); - impl->logs_.emplace_back( - new_log_number, - new log::Writer( - std::move(file_writer), new_log_number, - impl->immutable_db_options_.recycle_log_file_num > 0)); + { + InstrumentedMutexLock wl(&impl->log_write_mutex_); + impl->logfile_number_ = new_log_number; + unique_ptr file_writer( + new WritableFileWriter(std::move(lfile), opt_env_options)); + impl->logs_.emplace_back( + new_log_number, + new log::Writer( + std::move(file_writer), new_log_number, + impl->immutable_db_options_.recycle_log_file_num > 0)); + } // set column family handles for (auto cf : column_families) { @@ -1027,8 +1036,14 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, delete impl->InstallSuperVersionAndScheduleWork( cfd, nullptr, *cfd->GetLatestMutableCFOptions()); } + if (impl->concurrent_prepare_) { + impl->log_write_mutex_.Lock(); + } impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); + if (impl->concurrent_prepare_) { + impl->log_write_mutex_.Unlock(); + } impl->DeleteObsoleteFiles(); s = 
impl->directories_.GetDbDir()->Fsync(); } diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 734ec841a..4dfef1bad 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -672,6 +672,8 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, return merged_batch; } +// When concurrent_prepare_ is disabled, this function is called from the only +// write thread. Otherwise this must be called holding log_write_mutex_. Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size) { @@ -683,6 +685,8 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, *log_used = logfile_number_; } total_log_size_ += log_entry.size(); + // TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here + // since alive_log_files_ might be modified concurrently alive_log_files_.back().AddSize(log_entry.size()); log_empty_ = false; return status; @@ -1021,6 +1025,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { mutex_.AssertHeld(); WriteThread::Writer nonmem_w; if (concurrent_prepare_) { + // SwitchMemtable is a rare event. To simplify the reasoning, we make sure + // that there is no concurrent thread writing to WAL. nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } @@ -1037,9 +1043,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { // Attempt to switch to a new memtable and trigger flush of old. // Do this without holding the dbmutex lock. assert(versions_->prev_log_number() == 0); - //log_write_mutex_.Lock(); + if (concurrent_prepare_) { + log_write_mutex_.Lock(); + } bool creating_new_log = !log_empty_; - //log_write_mutex_.Unlock(); + if (concurrent_prepare_) { + log_write_mutex_.Unlock(); + } uint64_t recycle_log_number = 0; if (creating_new_log && immutable_db_options_.recycle_log_file_num && !log_recycle_files.empty()) {