Simplify and document sync rules for logs_ etc

Summary:
Adding/Correcting inline comments and clarify the sync rules. To make it simple to reason, the rules are a big general which ended up to some extra synchronizations. However such synchronizations are not on the fast path, and they are worth the simplicity.
Closes https://github.com/facebook/rocksdb/pull/2517

Differential Revision: D5348239

Pulled By: maysamyabandeh

fbshipit-source-id: ff2e59fb1e568c122d2cdbf598310f3613b7d212
main
Maysam Yabandeh 8 years ago committed by Facebook Github Bot
parent d310e0f339
commit 1e34d07e18
  1. 45
      db/db_impl.h
  2. 9
      db/db_impl_files.cc
  3. 31
      db/db_impl_open.cc
  4. 14
      db/db_impl_write.cc

@ -843,11 +843,16 @@ class DBImpl : public DB {
// Lock over the persistent DB state. Non-nullptr iff successfully acquired. // Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_; FileLock* db_lock_;
// It protects the back() of logs_ and alive_log_files_. Any push_back to // In addition to mutex_, log_write_mutex_ protected writes to logs_ and
// these must be under log_write_mutex_ and any access that requires the // logfile_number_. With concurrent_prepare it also protects alive_log_files_,
// back() to remain the same must also lock log_write_mutex_. // and log_empty_. Refer to the definition of each variable below for more
// details.
InstrumentedMutex log_write_mutex_; InstrumentedMutex log_write_mutex_;
// State below is protected by mutex_ // State below is protected by mutex_
// With concurrent_prepare enabled, some of the variables that accessed during
// WriteToWAL need different synchronization: log_empty_, alive_log_files_,
// logs_, logfile_number_. Refer to the definition of each variable below for
// more description.
mutable InstrumentedMutex mutex_; mutable InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_; std::atomic<bool> shutting_down_;
@ -861,10 +866,20 @@ class DBImpl : public DB {
// * whenever there is an error in background purge, flush or compaction // * whenever there is an error in background purge, flush or compaction
// * whenever num_running_ingest_file_ goes to 0. // * whenever num_running_ingest_file_ goes to 0.
InstrumentedCondVar bg_cv_; InstrumentedCondVar bg_cv_;
// Writes are protected by locking both mutex_ and log_write_mutex_, and reads
// must be under either mutex_ or log_write_mutex_. Since after ::Open,
// logfile_number_ is currently updated only in write_thread_, it can be read
// from the same write_thread_ without any locks.
uint64_t logfile_number_; uint64_t logfile_number_;
std::deque<uint64_t> std::deque<uint64_t>
log_recycle_files; // a list of log files that we can recycle log_recycle_files; // a list of log files that we can recycle
bool log_dir_synced_; bool log_dir_synced_;
// Without concurrent_prepare, read and writes to log_empty_ are protected by
// mutex_. Since it is currently updated/read only in write_thread_, it can be
// accessed from the same write_thread_ without any locks. With
// concurrent_prepare writes, where it can be updated in different threads,
// read and writes are protected by log_write_mutex_ instead. This is to avoid
// expesnive mutex_ lock during WAL write, which update log_empty_.
bool log_empty_; bool log_empty_;
ColumnFamilyHandleImpl* default_cf_handle_; ColumnFamilyHandleImpl* default_cf_handle_;
InternalStats* default_cf_internal_stats_; InternalStats* default_cf_internal_stats_;
@ -899,18 +914,26 @@ class DBImpl : public DB {
// true for some prefix of logs_ // true for some prefix of logs_
bool getting_synced = false; bool getting_synced = false;
}; };
// Without concurrent_prepare, read and writes to alive_log_files_ are
// protected by mutex_. However since back() is never popped, and push_back()
// is done only from write_thread_, the same thread can access the item
// reffered by back() without mutex_. With concurrent_prepare_, writes
// are protected by locking both mutex_ and log_write_mutex_, and reads must
// be under either mutex_ or log_write_mutex_.
std::deque<LogFileNumberSize> alive_log_files_; std::deque<LogFileNumberSize> alive_log_files_;
// Log files that aren't fully synced, and the current log file. // Log files that aren't fully synced, and the current log file.
// Synchronization: // Synchronization:
// - push_back() is done from write thread with locked mutex_, // - push_back() is done from write_thread_ with locked mutex_ and
// - pop_front() is done from any thread with locked mutex_, // log_write_mutex_
// - pop_front() is done from any thread with locked mutex_ and
// log_write_mutex_
// - reads are done with either locked mutex_ or log_write_mutex_
// - back() and items with getting_synced=true are not popped, // - back() and items with getting_synced=true are not popped,
// - it follows that write thread with unlocked mutex_ can safely access // - The same thread that sets getting_synced=true will reset it.
// back() and items with getting_synced=true. // - it follows that the object referred by back() can be safely read from
// -- Update: apparently this was a mistake. back() should be called under // the write_thread_ without using mutex
// mute_: https://github.com/facebook/rocksdb/pull/1774 // - it follows that the items with getting_synced=true can be safely read
// - When concurrent write threads is enabled, back(), push_back(), and // from the same thread that has set getting_synced=true
// pop_front() must be called within log_write_mutex_
std::deque<LogWriterNumber> logs_; std::deque<LogWriterNumber> logs_;
// Signaled when getting_synced becomes false for some of the logs_. // Signaled when getting_synced becomes false for some of the logs_.
InstrumentedCondVar log_sync_cv_; InstrumentedCondVar log_sync_cv_;

@ -252,9 +252,12 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
} }
job_context->size_log_to_delete += earliest.size; job_context->size_log_to_delete += earliest.size;
total_log_size_ -= earliest.size; total_log_size_ -= earliest.size;
{ if (concurrent_prepare_) {
InstrumentedMutexLock wl(&log_write_mutex_); log_write_mutex_.Lock();
alive_log_files_.pop_front(); }
alive_log_files_.pop_front();
if (concurrent_prepare_) {
log_write_mutex_.Unlock();
} }
// Current log should always stay alive since it can't have // Current log should always stay alive since it can't have
// number < MinLogNumber(). // number < MinLogNumber().

@ -811,9 +811,15 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
if (data_seen && !flushed) { if (data_seen && !flushed) {
// Mark these as alive so they'll be considered for deletion later by // Mark these as alive so they'll be considered for deletion later by
// FindObsoleteFiles() // FindObsoleteFiles()
if (concurrent_prepare_) {
log_write_mutex_.Lock();
}
for (auto log_number : log_numbers) { for (auto log_number : log_numbers) {
alive_log_files_.push_back(LogFileNumberSize(log_number)); alive_log_files_.push_back(LogFileNumberSize(log_number));
} }
if (concurrent_prepare_) {
log_write_mutex_.Unlock();
}
} }
event_logger_.Log() << "job" << job_id << "event" event_logger_.Log() << "job" << job_id << "event"
@ -986,14 +992,17 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) { if (s.ok()) {
lfile->SetPreallocationBlockSize( lfile->SetPreallocationBlockSize(
impl->GetWalPreallocateBlockSize(max_write_buffer_size)); impl->GetWalPreallocateBlockSize(max_write_buffer_size));
impl->logfile_number_ = new_log_number; {
unique_ptr<WritableFileWriter> file_writer( InstrumentedMutexLock wl(&impl->log_write_mutex_);
new WritableFileWriter(std::move(lfile), opt_env_options)); impl->logfile_number_ = new_log_number;
impl->logs_.emplace_back( unique_ptr<WritableFileWriter> file_writer(
new_log_number, new WritableFileWriter(std::move(lfile), opt_env_options));
new log::Writer( impl->logs_.emplace_back(
std::move(file_writer), new_log_number, new_log_number,
impl->immutable_db_options_.recycle_log_file_num > 0)); new log::Writer(
std::move(file_writer), new_log_number,
impl->immutable_db_options_.recycle_log_file_num > 0));
}
// set column family handles // set column family handles
for (auto cf : column_families) { for (auto cf : column_families) {
@ -1027,8 +1036,14 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
delete impl->InstallSuperVersionAndScheduleWork( delete impl->InstallSuperVersionAndScheduleWork(
cfd, nullptr, *cfd->GetLatestMutableCFOptions()); cfd, nullptr, *cfd->GetLatestMutableCFOptions());
} }
if (impl->concurrent_prepare_) {
impl->log_write_mutex_.Lock();
}
impl->alive_log_files_.push_back( impl->alive_log_files_.push_back(
DBImpl::LogFileNumberSize(impl->logfile_number_)); DBImpl::LogFileNumberSize(impl->logfile_number_));
if (impl->concurrent_prepare_) {
impl->log_write_mutex_.Unlock();
}
impl->DeleteObsoleteFiles(); impl->DeleteObsoleteFiles();
s = impl->directories_.GetDbDir()->Fsync(); s = impl->directories_.GetDbDir()->Fsync();
} }

@ -672,6 +672,8 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
return merged_batch; return merged_batch;
} }
// When concurrent_prepare_ is disabled, this function is called from the only
// write thread. Otherwise this must be called holding log_write_mutex_.
Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
log::Writer* log_writer, uint64_t* log_used, log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size) { uint64_t* log_size) {
@ -683,6 +685,8 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
*log_used = logfile_number_; *log_used = logfile_number_;
} }
total_log_size_ += log_entry.size(); total_log_size_ += log_entry.size();
// TODO(myabandeh): it might be unsafe to access alive_log_files_.back() here
// since alive_log_files_ might be modified concurrently
alive_log_files_.back().AddSize(log_entry.size()); alive_log_files_.back().AddSize(log_entry.size());
log_empty_ = false; log_empty_ = false;
return status; return status;
@ -1021,6 +1025,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
mutex_.AssertHeld(); mutex_.AssertHeld();
WriteThread::Writer nonmem_w; WriteThread::Writer nonmem_w;
if (concurrent_prepare_) { if (concurrent_prepare_) {
// SwitchMemtable is a rare event. To simply the reasoning, we make sure
// that there is no concurrent thread writing to WAL.
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
} }
@ -1037,9 +1043,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
// Attempt to switch to a new memtable and trigger flush of old. // Attempt to switch to a new memtable and trigger flush of old.
// Do this without holding the dbmutex lock. // Do this without holding the dbmutex lock.
assert(versions_->prev_log_number() == 0); assert(versions_->prev_log_number() == 0);
//log_write_mutex_.Lock(); if (concurrent_prepare_) {
log_write_mutex_.Lock();
}
bool creating_new_log = !log_empty_; bool creating_new_log = !log_empty_;
//log_write_mutex_.Unlock(); if (concurrent_prepare_) {
log_write_mutex_.Unlock();
}
uint64_t recycle_log_number = 0; uint64_t recycle_log_number = 0;
if (creating_new_log && immutable_db_options_.recycle_log_file_num && if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
!log_recycle_files.empty()) { !log_recycle_files.empty()) {

Loading…
Cancel
Save