diff --git a/db/db_impl.cc b/db/db_impl.cc
index 291493e7e..133297c1e 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -224,6 +224,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       log_dir_synced_(false),
       log_empty_(true),
       default_cf_handle_(nullptr),
+      log_sync_cv_(&mutex_),
       total_log_size_(0),
       max_total_in_memory_state_(0),
       is_snapshot_supported_(true),
@@ -367,6 +368,7 @@ DBImpl::~DBImpl() {
   for (auto l : logs_to_free_) {
     delete l;
   }
+  logs_.clear();

   // versions need to be destroyed before table_cache since it can hold
   // references to table_cache.
@@ -535,8 +537,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   versions_->GetObsoleteFiles(&job_context->sst_delete_files,
                               job_context->min_pending_output);

-  uint64_t min_log_number = versions_->MinLogNumber();
   if (!alive_log_files_.empty()) {
+    uint64_t min_log_number = versions_->MinLogNumber();
     // find newly obsoleted log files
     while (alive_log_files_.begin()->number < min_log_number) {
       auto& earliest = *alive_log_files_.begin();
@@ -547,6 +549,18 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
       // number < MinLogNumber().
       assert(alive_log_files_.size());
     }
+    while (!logs_.empty() && logs_.front().number < min_log_number) {
+      auto& log = logs_.front();
+      if (log.getting_synced) {
+        log_sync_cv_.Wait();
+        // logs_ could have changed while we were waiting.
+        continue;
+      }
+      logs_to_free_.push_back(log.writer.release());
+      logs_.pop_front();
+    }
+    // Current log cannot be obsolete.
+    assert(!logs_.empty());
   }

   // We're just cleaning up for DB::Write().
@@ -597,6 +611,11 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   }
 }

+Status DBImpl::SyncLog(log::Writer* log) {
+  assert(log);
+  return log->file()->Sync(db_options_.use_fsync);
+}
+
 namespace {
 bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
                           const JobContext::CandidateFileInfo& second) {
@@ -3448,11 +3467,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   uint64_t last_sequence = versions_->LastSequence();
   WriteThread::Writer* last_writer = &w;
   autovector<WriteBatch*> write_batch_group;
+  bool need_wal_sync = !write_options.disableWAL && write_options.sync;

   if (status.ok()) {
     last_batch_group_size_ =
         write_thread_.BuildBatchGroup(&last_writer, &write_batch_group);

+    if (need_wal_sync) {
+      while (logs_.front().getting_synced) {
+        log_sync_cv_.Wait();
+      }
+      for (auto& log : logs_) {
+        assert(!log.getting_synced);
+        log.getting_synced = true;
+      }
+    }
+
     // Add to log and apply to memtable.  We can release the lock
     // during this phase since &w is currently responsible for logging
     // and protects against concurrent loggers and concurrent writes
@@ -3499,16 +3529,28 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       if (!write_options.disableWAL) {
         PERF_TIMER_GUARD(write_wal_time);
         Slice log_entry = WriteBatchInternal::Contents(updates);
-        status = log_->AddRecord(log_entry);
+        status = logs_.back().writer->AddRecord(log_entry);
         total_log_size_ += log_entry.size();
         alive_log_files_.back().AddSize(log_entry.size());
         log_empty_ = false;
         log_size = log_entry.size();
         RecordTick(stats_, WAL_FILE_BYTES, log_size);
-        if (status.ok() && write_options.sync) {
+        if (status.ok() && need_wal_sync) {
           RecordTick(stats_, WAL_FILE_SYNCED);
           StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
-          status = log_->file()->Sync(db_options_.use_fsync);
+          // It's safe to access logs_ with unlocked mutex_ here because:
+          //  - we've set getting_synced=true for all logs,
+          //    so other threads won't pop from logs_ while we're here,
+          //  - only writer thread can push to logs_, and we're in
+          //    writer thread, so no one will push to logs_,
+          //  - as long as other threads don't modify it, it's safe to read
+          //    from std::deque from multiple threads concurrently.
+          for (auto& log : logs_) {
+            status = SyncLog(log.writer.get());
+            if (!status.ok()) {
+              break;
+            }
+          }
           if (status.ok() && !log_dir_synced_) {
             // We only sync WAL directory the first time WAL syncing is
             // requested, so that in case users never turn on WAL sync,
@@ -3557,14 +3599,27 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   } else {
     // Operation failed. Make sure mutex is held for cleanup code below.
     mutex_.Lock();
-    }
-    if (db_options_.paranoid_checks && !status.ok() &&
-        !status.IsBusy() && bg_error_.ok()) {
+  }
+  if (db_options_.paranoid_checks && !status.ok() &&
+      !status.IsBusy() && bg_error_.ok()) {
     bg_error_ = status; // stop compaction & fail any further writes
   }

   mutex_.AssertHeld();
+
+  if (need_wal_sync) {
+    while (logs_.size() > 1) {
+      auto& log = logs_.front();
+      assert(log.getting_synced);
+      logs_to_free_.push_back(log.writer.release());
+      logs_.pop_front();
+    }
+    assert(logs_.back().getting_synced);
+    logs_.back().getting_synced = false;
+    log_sync_cv_.SignalAll();
+  }
+
   write_thread_.ExitWriteThread(&w, last_writer, status);
   mutex_.Unlock();
@@ -3675,9 +3730,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   if (creating_new_log) {
     logfile_number_ = new_log_number;
     assert(new_log != nullptr);
-    logs_to_free_.push_back(log_.release());
-    log_.reset(new_log);
     log_empty_ = true;
+    logs_.emplace_back(logfile_number_, std::unique_ptr<log::Writer>(new_log));
     alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
     for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
       // all this is just optimization to delete logs that
@@ -4210,7 +4264,10 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
     impl->logfile_number_ = new_log_number;
     unique_ptr<WritableFileWriter> file_writer(
         new WritableFileWriter(std::move(lfile), opt_env_options));
-    impl->log_.reset(new log::Writer(std::move(file_writer)));
+    impl->logs_.emplace_back(
+        new_log_number,
+        std::unique_ptr<log::Writer>(
+            new log::Writer(std::move(file_writer))));

     // set column family handles
     for (auto cf : column_families) {
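The synchronization dance introduced above is spread across three functions, so here is a minimal standalone model of it in one place. This is an illustration, not code from the patch: std::mutex and std::condition_variable stand in for InstrumentedMutex and InstrumentedCondVar, Entry stands in for LogWriterNumber, and all function names are invented.

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>

// Toy stand-in for LogWriterNumber; the real entry also owns the writer.
struct Entry {
  uint64_t number;
  bool getting_synced = false;
};

std::mutex mu;               // models mutex_
std::condition_variable cv;  // models log_sync_cv_
std::deque<Entry> logs;      // models logs_

// Writer thread, before releasing the lock for WAL I/O: wait out any sync
// already in flight, then claim every log so no other thread pops entries
// from logs while the mutex is unlocked.
void MarkAllGettingSynced(std::unique_lock<std::mutex>& lock) {
  while (logs.front().getting_synced) {
    cv.wait(lock);
  }
  for (auto& e : logs) {
    e.getting_synced = true;
  }
}

// Writer thread, after reacquiring the lock: everything except the current
// log is now fully synced and can be retired; clear the flag on back() and
// wake anyone blocked in cv.wait().
void FinishSync() {
  while (logs.size() > 1) {
    logs.pop_front();
  }
  logs.back().getting_synced = false;
  cv.notify_all();
}

// FindObsoleteFiles-style cleanup: never pop an entry a writer is still
// syncing; wait for the flag to clear and re-check from scratch, since the
// deque may have changed while we slept.
void RetireObsolete(std::unique_lock<std::mutex>& lock, uint64_t min_log) {
  while (!logs.empty() && logs.front().number < min_log) {
    if (logs.front().getting_synced) {
      cv.wait(lock);
      continue;
    }
    logs.pop_front();
  }
}

int main() {
  std::unique_lock<std::mutex> lock(mu);
  logs.push_back(Entry{1});
  logs.push_back(Entry{2});
  MarkAllGettingSynced(lock);
  // ... the lock would be released here while the files are fsynced ...
  FinishSync();
  RetireObsolete(lock, /*min_log=*/2);  // current log (back()) is never popped
  return 0;
}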
diff --git a/db/db_impl.h b/db/db_impl.h
index 5f6a09348..4d63f6bb3 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -308,6 +308,8 @@ class DBImpl : public DB {
   // It is not necessary to hold the mutex when invoking this
   // method.
   void PurgeObsoleteFiles(const JobContext& background_context);

+  Status SyncLog(log::Writer* log);
+
   ColumnFamilyHandle* DefaultColumnFamily() const override;
   const SnapshotList& snapshots() const { return snapshots_; }
@@ -514,7 +516,6 @@ class DBImpl : public DB {
   // * whenever there is an error in background flush or compaction
   InstrumentedCondVar bg_cv_;
   uint64_t logfile_number_;
-  unique_ptr<log::Writer> log_;
   bool log_dir_synced_;
   bool log_empty_;
   ColumnFamilyHandleImpl* default_cf_handle_;
@@ -522,13 +523,31 @@ class DBImpl : public DB {
   unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
   struct LogFileNumberSize {
     explicit LogFileNumberSize(uint64_t _number)
-        : number(_number), size(0), getting_flushed(false) {}
+        : number(_number) {}
     void AddSize(uint64_t new_size) { size += new_size; }
     uint64_t number;
-    uint64_t size;
-    bool getting_flushed;
+    uint64_t size = 0;
+    bool getting_flushed = false;
+  };
+  struct LogWriterNumber {
+    LogWriterNumber(uint64_t _number, std::unique_ptr<log::Writer> _writer)
+        : number(_number), writer(std::move(_writer)) {}
+    uint64_t number;
+    std::unique_ptr<log::Writer> writer;
+    // true for some prefix of logs_
+    bool getting_synced = false;
   };
   std::deque<LogFileNumberSize> alive_log_files_;
+  // Log files that aren't fully synced, and the current log file.
+  // Synchronization:
+  //  - push_back() is done from write thread with locked mutex_,
+  //  - pop_front() is done from any thread with locked mutex_,
+  //  - back() and items with getting_synced=true are not popped,
+  //  - it follows that write thread with unlocked mutex_ can safely access
+  //    back() and items with getting_synced=true.
+  std::deque<LogWriterNumber> logs_;
+  // Signaled when getting_synced becomes false for some of the logs_.
+  InstrumentedCondVar log_sync_cv_;
   uint64_t total_log_size_;
   // only used for dynamically adjusting max_total_wal_size. it is a sum of
   // [write_buffer_size * max_write_buffer_number] over all column families
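A detail worth noting in the header change: logs_ owns each writer through std::unique_ptr, but retirement hands the raw pointer to logs_to_free_ via release(), so the pop under mutex_ stays cheap and the actual delete happens later, outside the lock. Below is a small self-contained sketch of that hand-off; Writer is a toy stand-in for log::Writer and the rest of the names are invented.

#include <cstdint>
#include <deque>
#include <memory>
#include <vector>

struct Writer {};  // toy stand-in for log::Writer

struct LogWriterNumber {
  LogWriterNumber(uint64_t n, std::unique_ptr<Writer> w)
      : number(n), writer(std::move(w)) {}
  uint64_t number;
  std::unique_ptr<Writer> writer;
  bool getting_synced = false;  // true only for a prefix of the deque
};

int main() {
  std::deque<LogWriterNumber> logs;
  std::vector<Writer*> logs_to_free;  // deleted later, outside the mutex

  // SwitchMemtable-style push: the newest log is always back().
  logs.emplace_back(1, std::unique_ptr<Writer>(new Writer));
  logs.emplace_back(2, std::unique_ptr<Writer>(new Writer));

  // Retirement: ownership leaves the deque via release(), so popping under
  // the mutex is a pointer move and destruction can be deferred.
  while (logs.size() > 1) {
    logs_to_free.push_back(logs.front().writer.release());
    logs.pop_front();
  }

  for (Writer* w : logs_to_free) {
    delete w;
  }
  return 0;
}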
diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc
index 29d5d7387..9ddad2dd5 100644
--- a/db/fault_injection_test.cc
+++ b/db/fault_injection_test.cc
@@ -807,10 +807,8 @@ class SleepingBackgroundTask {
   bool done_with_sleep_;
 };

-// Disable the test because it is not passing.
 // Previous log file is not fsynced if sync is forced after log rolling.
-// TODO(FB internal task#6730880) Fix the bug
-TEST_P(FaultInjectionTest, DISABLED_WriteOptionSyncTest) {
+TEST_P(FaultInjectionTest, WriteOptionSyncTest) {
   SleepingBackgroundTask sleeping_task_low;
   env_->SetBackgroundThreads(1, Env::HIGH);
   // Block the job queue to prevent flush job from running.
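For reference, the user-visible guarantee the re-enabled test exercises is that a write with WriteOptions::sync == true makes earlier unsynced WAL records durable as well, even when the log rolled in between. A hedged usage sketch against the public API (the database path and keys are invented, and error handling is trimmed to asserts):

#include <cassert>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/wal_sync_demo", &db);
  assert(s.ok());

  // Unsynced write: appended to the current WAL, possibly still sitting in
  // OS buffers. If the WAL rolls after this point, the record lives in an
  // older, not-yet-synced log file.
  s = db->Put(rocksdb::WriteOptions(), "k1", "v1");
  assert(s.ok());

  // Synced write: with this patch the sync covers every file in logs_, so
  // "k1" is durable after this call even if it is in a previous WAL.
  rocksdb::WriteOptions sync_options;
  sync_options.sync = true;
  s = db->Put(sync_options, "k2", "v2");
  assert(s.ok());

  delete db;
  return 0;
}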