Do not hold the mutex when writing keys if not necessary (#7516)

Summary:
## Problem Summary
RocksDB acquires the global mutex of the db instance every time a user calls `Write`. When RocksDB schedules a lot of compaction jobs, they compete for that mutex with the write thread, which hurts write performance.

## Problem Solution:
Use `log_write_mutex` in place of the global mutex in most cases, so that the write thread does not acquire the global mutex unless a write-stall event or a write-buffer-full event occurs.
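The intent can be pictured with a minimal standalone sketch (the class and member names below are hypothetical and are not the actual `DBImpl` code): the common write path only takes a dedicated log mutex, and the global db mutex is taken only when a memtable switch or stall handling is needed.

```cpp
#include <atomic>
#include <mutex>
#include <string>

// Illustrative sketch only: WritePathSketch, NeedsSwitchOrStall and
// AppendToWal are made-up names and do not exist in RocksDB.
class WritePathSketch {
 public:
  void Write(const std::string& key, const std::string& value) {
    // Heavy path: only when a wal/memtable switch or a stall must be handled
    // does the writer touch the global db mutex.
    if (NeedsSwitchOrStall()) {
      std::lock_guard<std::mutex> db_lock(db_mutex_);
      HandleSwitchOrStall();
    }
    // Common path: WAL bookkeeping is protected by the dedicated log mutex,
    // so background work holding db_mutex_ no longer blocks writers.
    std::lock_guard<std::mutex> log_lock(log_write_mutex_);
    AppendToWal(key, value);
  }

 private:
  bool NeedsSwitchOrStall() const {
    return pending_switch_.load(std::memory_order_acquire);
  }
  void HandleSwitchOrStall() {
    pending_switch_.store(false, std::memory_order_release);
  }
  void AppendToWal(const std::string& /*key*/, const std::string& /*value*/) {}

  std::mutex db_mutex_;         // stands in for the global DB mutex
  std::mutex log_write_mutex_;  // stands in for log_write_mutex_
  std::atomic<bool> pending_switch_{false};
};
```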

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7516

Test Plan:
1. make check
2. CI
3. COMPILE_WITH_TSAN=1 make db_stress
make crash_test
make crash_test_with_multiops_wp_txn
make crash_test_with_multiops_wc_txn
make crash_test_with_atomic_flush

Reviewed By: siying

Differential Revision: D36908702

Pulled By: riversand963

fbshipit-source-id: 59b13881f4f5c0a58fd3ca79128a396d9cd98efe
Branch: main
Author: Wallace (committed by Facebook GitHub Bot), 2 years ago
Parent: a0c63083d3
Commit: 1e9bf25f61
17 changed files (changed line counts in parentheses):

 1. HISTORY.md (1)
 2. db/db_compaction_test.cc (14)
 3. db/db_impl/db_impl.cc (93)
 4. db/db_impl/db_impl.h (121)
 5. db/db_impl/db_impl_compaction_flush.cc (32)
 6. db/db_impl/db_impl_debug.cc (2)
 7. db/db_impl/db_impl_files.cc (53)
 8. db/db_impl/db_impl_open.cc (14)
 9. db/db_impl/db_impl_write.cc (117)
10. db/error_handler.cc (7)
11. db/error_handler.h (19)
12. db/perf_context_test.cc (2)
13. db/version_set_test.cc (14)
14. db/wal_edit.cc (37)
15. db/wal_edit.h (10)
16. db/wal_edit_test.cc (9)
17. utilities/backup/backup_engine_test.cc (3)

HISTORY.md
@@ -99,6 +99,7 @@
 ### Behavior changes
 * DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
+* DB::Write does not hold global `mutex_` if this db instance does not need to switch wal and mem-table (#7516).
 * Removed support for reading Bloom filters using obsolete block-based filter format. (Support for writing such filters was dropped in 7.0.) For good read performance on old DBs using these filters, a full compaction is required.
 * Per KV checksum in write batch is verified before a write batch is written to WAL to detect any corruption to the write batch (#10114).

db/db_compaction_test.cc
@@ -5659,18 +5659,10 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
     for (int j = 0; j != kNumKeysPerFile; ++j) {
       ASSERT_OK(Put(Key(j), rnd.RandomString(990)));
     }
-    if (0 == i) {
-      // When we reach here, the memtables have kNumKeysPerFile keys. Note that
-      // flush is not yet triggered. We need to write an extra key so that the
-      // write path will call PreprocessWrite and flush the previous key-value
-      // pairs to e flushed. After that, there will be the newest key in the
-      // memtable, and a bunch of L0 files. Since there is already one key in
-      // the memtable, then for i = 1, 2, ..., we do not have to write this
-      // extra key to trigger flush.
-      ASSERT_OK(Put("", ""));
+    if (i > 0) {
+      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+      ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i);
     }
-    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
-    ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
   }
   // When we reach this point, there will be level0_stop_writes_trigger L0
   // files and one extra key (99) in memory, which overlaps with the external

db/db_impl/db_impl.cc
@@ -185,7 +185,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
       log_dir_synced_(false),
       log_empty_(true),
       persist_stats_cf_handle_(nullptr),
-      log_sync_cv_(&mutex_),
+      log_sync_cv_(&log_write_mutex_),
       total_log_size_(0),
       is_snapshot_supported_(true),
       write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
@@ -273,6 +273,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
   mutable_db_options_.Dump(immutable_db_options_.info_log.get());
   DumpSupportInfo(immutable_db_options_.info_log.get());
 
+  max_total_wal_size_.store(mutable_db_options_.max_total_wal_size,
+                            std::memory_order_relaxed);
   if (write_buffer_manager_) {
     wbm_stall_.reset(new WBMStallInterface());
   }
@@ -625,26 +627,28 @@ Status DBImpl::CloseHelper() {
     job_context.Clean();
     mutex_.Lock();
   }
-
-  for (auto l : logs_to_free_) {
-    delete l;
-  }
-  for (auto& log : logs_) {
-    uint64_t log_number = log.writer->get_log_number();
-    Status s = log.ClearWriter();
-    if (!s.ok()) {
-      ROCKS_LOG_WARN(
-          immutable_db_options_.info_log,
-          "Unable to Sync WAL file %s with error -- %s",
-          LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
-          s.ToString().c_str());
-      // Retain the first error
-      if (ret.ok()) {
-        ret = s;
-      }
-    }
-  }
-  logs_.clear();
+  {
+    InstrumentedMutexLock lock(&log_write_mutex_);
+    for (auto l : logs_to_free_) {
+      delete l;
+    }
+    for (auto& log : logs_) {
+      uint64_t log_number = log.writer->get_log_number();
+      Status s = log.ClearWriter();
+      if (!s.ok()) {
+        ROCKS_LOG_WARN(
+            immutable_db_options_.info_log,
+            "Unable to Sync WAL file %s with error -- %s",
+            LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
+            s.ToString().c_str());
+        // Retain the first error
+        if (ret.ok()) {
+          ret = s;
+        }
+      }
+    }
+    logs_.clear();
+  }
 
   // Table cache may have table handles holding blocks from the block cache.
   // We need to release them before the block cache is destroyed. The block
@@ -1108,6 +1112,7 @@ Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
 }
 
 void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
+  mutex_.AssertHeld();
   if (!job_context->logs_to_free.empty()) {
     for (auto l : job_context->logs_to_free) {
       AddToLogsToFreeQueue(l);
@@ -1285,6 +1290,11 @@ Status DBImpl::SetDBOptions(
                          new_options.stats_persist_period_sec);
       mutex_.Lock();
     }
+    if (new_options.max_total_wal_size !=
+        mutable_db_options_.max_total_wal_size) {
+      max_total_wal_size_.store(new_options.max_total_wal_size,
+                                std::memory_order_release);
+    }
     write_controller_.set_max_delayed_write_rate(
         new_options.delayed_write_rate);
     table_cache_.get()->SetCapacity(new_options.max_open_files == -1
@@ -1405,7 +1415,7 @@ Status DBImpl::SyncWAL() {
   uint64_t current_log_number;
 
   {
-    InstrumentedMutexLock l(&mutex_);
+    InstrumentedMutexLock l(&log_write_mutex_);
     assert(!logs_.empty());
 
     // This SyncWAL() call only cares about logs up to this number.
@@ -1462,19 +1472,37 @@ Status DBImpl::SyncWAL() {
   TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
 
   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
+  VersionEdit synced_wals;
   {
-    InstrumentedMutexLock l(&mutex_);
+    InstrumentedMutexLock l(&log_write_mutex_);
     if (status.ok()) {
-      status = MarkLogsSynced(current_log_number, need_log_dir_sync);
+      MarkLogsSynced(current_log_number, need_log_dir_sync, &synced_wals);
     } else {
      MarkLogsNotSynced(current_log_number);
    }
   }
+  if (status.ok() && synced_wals.IsWalAddition()) {
+    InstrumentedMutexLock l(&mutex_);
+    status = ApplyWALToManifest(&synced_wals);
+  }
   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
 
   return status;
 }
 
+Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) {
+  // not empty, write to MANIFEST.
+  mutex_.AssertHeld();
+  Status status =
+      versions_->LogAndApplyToDefaultColumnFamily(synced_wals, &mutex_);
+  if (!status.ok() && versions_->io_status().IsIOError()) {
+    status = error_handler_.SetBGError(versions_->io_status(),
+                                       BackgroundErrorReason::kManifestWrite);
+  }
+  return status;
+}
+
 Status DBImpl::LockWAL() {
   log_write_mutex_.Lock();
   auto cur_log_writer = logs_.back().writer;
@@ -1494,12 +1522,12 @@ Status DBImpl::UnlockWAL() {
   return Status::OK();
 }
 
-Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
-  mutex_.AssertHeld();
+void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
+                            VersionEdit* synced_wals) {
+  log_write_mutex_.AssertHeld();
   if (synced_dir && logfile_number_ == up_to) {
     log_dir_synced_ = true;
   }
-  VersionEdit synced_wals;
   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
     auto& wal = *it;
     assert(wal.IsSyncing());
@@ -1507,11 +1535,9 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
     if (logs_.size() > 1) {
       if (immutable_db_options_.track_and_verify_wals_in_manifest &&
           wal.GetPreSyncSize() > 0) {
-        synced_wals.AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
+        synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
       }
       logs_to_free_.push_back(wal.ReleaseWriter());
-      // To modify logs_ both mutex_ and log_write_mutex_ must be held
-      InstrumentedMutexLock l(&log_write_mutex_);
       it = logs_.erase(it);
     } else {
       wal.FinishSync();
@@ -1520,22 +1546,11 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
     }
   }
   assert(logs_.empty() || logs_[0].number > up_to ||
          (logs_.size() == 1 && !logs_[0].IsSyncing()));
-  Status s;
-  if (synced_wals.IsWalAddition()) {
-    // not empty, write to MANIFEST.
-    s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
-    if (!s.ok() && versions_->io_status().IsIOError()) {
-      s = error_handler_.SetBGError(versions_->io_status(),
-                                    BackgroundErrorReason::kManifestWrite);
-    }
-  }
   log_sync_cv_.SignalAll();
-  return s;
 }
 
 void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
-  mutex_.AssertHeld();
+  log_write_mutex_.AssertHeld();
   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
        ++it) {
     auto& wal = *it;

db/db_impl/db_impl.h
@@ -998,6 +998,7 @@ class DBImpl : public DB {
   }
 
   void AddToLogsToFreeQueue(log::Writer* log_writer) {
+    mutex_.AssertHeld();
     logs_to_free_queue_.push_back(log_writer);
   }
@@ -1298,7 +1299,7 @@ class DBImpl : public DB {
   // only used for dynamically adjusting max_total_wal_size. it is a sum of
   // [write_buffer_size * max_write_buffer_number] over all column families
-  uint64_t max_total_in_memory_state_;
+  std::atomic<uint64_t> max_total_in_memory_state_;
   // The options to access storage files
   const FileOptions file_options_;
@@ -1648,6 +1649,15 @@ class DBImpl : public DB {
     uint64_t pre_sync_size = 0;
   };
 
+  struct LogContext {
+    explicit LogContext(bool need_sync = false)
+        : need_log_sync(need_sync), need_log_dir_sync(need_sync) {}
+    bool need_log_sync = false;
+    bool need_log_dir_sync = false;
+    log::Writer* writer = nullptr;
+    LogFileNumberSize* log_file_number_size = nullptr;
+  };
+
   // PurgeFileInfo is a structure to hold information of files to be deleted in
   // purge_files_
   struct PurgeFileInfo {
@@ -1801,7 +1811,7 @@ class DBImpl : public DB {
   void ReleaseFileNumberFromPendingOutputs(
       std::unique_ptr<std::list<uint64_t>::iterator>& v);
 
-  IOStatus SyncClosedLogs(JobContext* job_context);
+  IOStatus SyncClosedLogs(JobContext* job_context, VersionEdit* synced_wals);
 
   // Flush the in-memory write buffer to storage. Switches to a new
   // log-file/memtable and writes a new descriptor iff successful. Then
@@ -1961,8 +1971,8 @@ class DBImpl : public DB {
   Status HandleWriteBufferManagerFlush(WriteContext* write_context);
 
   // REQUIRES: mutex locked
-  Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
-                         WriteContext* write_context);
+  Status PreprocessWrite(const WriteOptions& write_options,
+                         LogContext* log_context, WriteContext* write_context);
 
   // Merge write batches in the write group into merged_batch.
   // Returns OK if merge is successful.
@@ -2101,7 +2111,8 @@ class DBImpl : public DB {
       std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer);
 
   // helper function to call after some of the logs_ were synced
-  Status MarkLogsSynced(uint64_t up_to, bool synced_dir);
+  void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit);
+  Status ApplyWALToManifest(VersionEdit* edit);
 
   // WALs with log number up to up_to are not synced successfully.
   void MarkLogsNotSynced(uint64_t up_to);
@@ -2307,8 +2318,9 @@ class DBImpl : public DB {
   // logfile_number_ is currently updated only in write_thread_, it can be read
   // from the same write_thread_ without any locks.
   uint64_t logfile_number_;
-  std::deque<uint64_t>
-      log_recycle_files_;  // a list of log files that we can recycle
+  // Log files that we can recycle. Must be protected by db mutex_.
+  std::deque<uint64_t> log_recycle_files_;
+  // Protected by log_write_mutex_.
   bool log_dir_synced_;
   // Without two_write_queues, read and writes to log_empty_ are protected by
   // mutex_. Since it is currently updated/read only in write_thread_, it can be
@@ -2322,26 +2334,93 @@ class DBImpl : public DB {
   bool persistent_stats_cfd_exists_ = true;
 
-  // Without two_write_queues, read and writes to alive_log_files_ are
-  // protected by mutex_. With two_write_queues_, writes
-  // are protected by locking both mutex_ and log_write_mutex_, and reads must
-  // be under either mutex_ or log_write_mutex_.
+  // alive_log_files_ is protected by mutex_ and log_write_mutex_ with details
+  // as follows:
+  // 1. read by FindObsoleteFiles() which can be called in either application
+  //    thread or RocksDB bg threads, both mutex_ and log_write_mutex_ are
+  //    held.
+  // 2. pop_front() by FindObsoleteFiles(), both mutex_ and log_write_mutex_
+  //    are held.
+  // 3. push_back() by DBImpl::Open() and DBImpl::RestoreAliveLogFiles()
+  //    (actually called by Open()), only mutex_ is held because at this point,
+  //    the DB::Open() call has not returned success to application, and the
+  //    only other thread(s) that can conflict are bg threads calling
+  //    FindObsoleteFiles() which ensure that both mutex_ and log_write_mutex_
+  //    are held when accessing alive_log_files_.
+  // 4. read by DBImpl::Open() is protected by mutex_.
+  // 5. push_back() by SwitchMemtable(). Both mutex_ and log_write_mutex_ are
+  //    held. This is done by the write group leader. Note that in the case of
+  //    two-write-queues, another WAL-only write thread can be writing to the
+  //    WAL concurrently. See 9.
+  // 6. read by SwitchWAL() with both mutex_ and log_write_mutex_ held. This is
+  //    done by write group leader.
+  // 7. read by ConcurrentWriteToWAL() by the write group leader in the case of
+  //    two-write-queues. Only log_write_mutex_ is held to protect concurrent
+  //    pop_front() by FindObsoleteFiles().
+  // 8. read by PreprocessWrite() by the write group leader. log_write_mutex_
+  //    is held to protect the data structure from concurrent pop_front() by
+  //    FindObsoleteFiles().
+  // 9. read by ConcurrentWriteToWAL() by a WAL-only write thread in the case
+  //    of two-write-queues. Only log_write_mutex_ is held. This suffices to
+  //    protect the data structure from concurrent push_back() by current
+  //    write group leader as well as pop_front() by FindObsoleteFiles().
   std::deque<LogFileNumberSize> alive_log_files_;
 
   // Log files that aren't fully synced, and the current log file.
   // Synchronization:
-  //  - push_back() is done from write_thread_ with locked mutex_ and
-  //  log_write_mutex_
-  //  - pop_front() is done from any thread with locked mutex_ and
-  //  log_write_mutex_
-  //  - reads are done with either locked mutex_ or log_write_mutex_
+  // 1. read by FindObsoleteFiles() which can be called either in application
+  //    thread or RocksDB bg threads. log_write_mutex_ is always held, while
+  //    some reads are performed without mutex_.
+  // 2. pop_front() by FindObsoleteFiles() with only log_write_mutex_ held.
+  // 3. read by DBImpl::Open() with both mutex_ and log_write_mutex_.
+  // 4. emplace_back() by DBImpl::Open() with both mutex_ and log_write_mutex.
+  //    Note that at this point, DB::Open() has not returned success to
+  //    application, thus the only other thread(s) that can conflict are bg
+  //    threads calling FindObsoleteFiles(). See 1.
+  // 5. iteration and clear() from CloseHelper() always hold log_write_mutex
+  //    and mutex_.
+  // 6. back() called by APIs FlushWAL() and LockWAL() are protected by only
+  //    log_write_mutex_. These two can be called by application threads after
+  //    DB::Open() returns success to applications.
+  // 7. read by SyncWAL(), another API, protected by only log_write_mutex_.
+  // 8. read by MarkLogsNotSynced() and MarkLogsSynced() are protected by
+  //    log_write_mutex_.
+  // 9. erase() by MarkLogsSynced() protected by log_write_mutex_.
+  // 10. read by SyncClosedLogs() protected by only log_write_mutex_. This can
+  //     happen in bg flush threads after DB::Open() returns success to
+  //     applications.
+  // 11. reads, e.g. front(), iteration, and back() called by PreprocessWrite()
+  //     holds only the log_write_mutex_. This is done by the write group
+  //     leader. A bg thread calling FindObsoleteFiles() or MarkLogsSynced()
+  //     can happen concurrently. This is fine because log_write_mutex_ is used
+  //     by all parties. See 2, 5, 9.
+  // 12. reads, empty(), back() called by SwitchMemtable() hold both mutex_ and
+  //     log_write_mutex_. This happens in the write group leader.
+  // 13. emplace_back() by SwitchMemtable() hold both mutex_ and
+  //     log_write_mutex_. This happens in the write group leader. Can conflict
+  //     with bg threads calling FindObsoleteFiles(), MarkLogsSynced(),
+  //     SyncClosedLogs(), etc. as well as application threads calling
+  //     FlushWAL(), SyncWAL(), LockWAL(). This is fine because all parties
+  //     require at least log_write_mutex_.
+  // 14. iteration called in WriteToWAL(write_group) protected by
+  //     log_write_mutex_. This is done by write group leader when
+  //     two-write-queues is disabled and write needs to sync logs.
+  // 15. back() called in ConcurrentWriteToWAL() protected by log_write_mutex_.
+  //     This can be done by the write group leader if two-write-queues is
+  //     enabled. It can also be done by another WAL-only write thread.
+  //
+  // Other observations:
   //  - back() and items with getting_synced=true are not popped,
   //  - The same thread that sets getting_synced=true will reset it.
-  //  - it follows that the object referred by back() can be safely read from
-  //  the write_thread_ without using mutex
+  //  - it follows that the object referred by back() can be safely read from
+  //    the write_thread_ without using mutex. Note that calling back() without
+  //    mutex may be unsafe because different implementations of deque::back()
+  //    may access other member variables of deque, causing undefined behaviors.
+  //    Generally, do not access stl containers without proper synchronization.
   //  - it follows that the items with getting_synced=true can be safely read
   //  from the same thread that has set getting_synced=true
   std::deque<LogWriterNumber> logs_;
   // Signaled when getting_synced becomes false for some of the logs_.
   InstrumentedCondVar log_sync_cv_;
   // This is the app-level state that is written to the WAL but will be used
@@ -2356,7 +2435,7 @@ class DBImpl : public DB {
   std::atomic<uint64_t> total_log_size_;
 
   // If this is non-empty, we need to delete these log files in background
-  // threads. Protected by db mutex.
+  // threads. Protected by log_write_mutex_.
   autovector<log::Writer*> logs_to_free_;
 
   bool is_snapshot_supported_;
@@ -2436,10 +2515,13 @@ class DBImpl : public DB {
   // JobContext. Current implementation tracks table and blob files only.
   std::unordered_set<uint64_t> files_grabbed_for_purge_;
 
-  // A queue to store log writers to close
+  // A queue to store log writers to close. Protected by db mutex_.
   std::deque<log::Writer*> logs_to_free_queue_;
+
   std::deque<SuperVersion*> superversions_to_free_queue_;
+
   int unscheduled_flushes_;
+
   int unscheduled_compactions_;
 
   // count how many background compactions are running or have been scheduled in
@@ -2592,6 +2674,7 @@ class DBImpl : public DB {
   InstrumentedCondVar atomic_flush_install_cv_;
 
   bool wal_in_db_path_;
+  std::atomic<uint64_t> max_total_wal_size_;
 
   BlobFileCompletionCallback blob_callback_;

db/db_impl/db_impl_compaction_flush.cc
@@ -82,9 +82,10 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
   return false;
 }
 
-IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
+IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
+                                VersionEdit* synced_wals) {
   TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
-  mutex_.AssertHeld();
+  InstrumentedMutexLock l(&log_write_mutex_);
   autovector<log::Writer*, 1> logs_to_sync;
   uint64_t current_log_number = logfile_number_;
   while (logs_.front().number < current_log_number &&
@@ -100,7 +101,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
   IOStatus io_s;
   if (!logs_to_sync.empty()) {
-    mutex_.Unlock();
+    log_write_mutex_.Unlock();
 
     assert(job_context);
@@ -128,12 +129,12 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
     TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedLogs:BeforeReLock",
                              /*arg=*/nullptr);
-    mutex_.Lock();
+    log_write_mutex_.Lock();
 
     // "number <= current_log_number - 1" is equivalent to
     // "number < current_log_number".
     if (io_s.ok()) {
-      io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true));
+      MarkLogsSynced(current_log_number - 1, true, synced_wals);
     } else {
       MarkLogsNotSynced(current_log_number - 1);
     }
@@ -220,8 +221,16 @@ Status DBImpl::FlushMemTableToOutputFile(
   bool need_cancel = false;
   IOStatus log_io_s = IOStatus::OK();
   if (needs_to_sync_closed_wals) {
-    // SyncClosedLogs() may unlock and re-lock the db_mutex.
-    log_io_s = SyncClosedLogs(job_context);
+    // SyncClosedLogs() may unlock and re-lock the log_write_mutex multiple
+    // times.
+    VersionEdit synced_wals;
+    mutex_.Unlock();
+    log_io_s = SyncClosedLogs(job_context, &synced_wals);
+    mutex_.Lock();
+    if (log_io_s.ok() && synced_wals.IsWalAddition()) {
+      log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals));
+    }
+
    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
        !log_io_s.IsColumnFamilyDropped()) {
      error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
@@ -474,7 +483,14 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
     if (logfile_number_ > 0) {
       // TODO (yanqin) investigate whether we should sync the closed logs for
       // single column family case.
-      log_io_s = SyncClosedLogs(job_context);
+      VersionEdit synced_wals;
+      mutex_.Unlock();
+      log_io_s = SyncClosedLogs(job_context, &synced_wals);
+      mutex_.Lock();
+      if (log_io_s.ok() && synced_wals.IsWalAddition()) {
+        log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals));
+      }
+
       if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
           !log_io_s.IsColumnFamilyDropped()) {
         if (total_log_size_ > 0) {

db/db_impl/db_impl_debug.cc
@@ -223,7 +223,7 @@ void DBImpl::TEST_EndWrite(void* w) {
 }
 
 size_t DBImpl::TEST_LogsToFreeSize() {
-  InstrumentedMutexLock l(&mutex_);
+  InstrumentedMutexLock l(&log_write_mutex_);
   return logs_to_free_.size();
 }

db/db_impl/db_impl_files.cc
@@ -271,6 +271,15 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   // logs_ is empty when called during recovery, in which case there can't yet
   // be any tracked obsolete logs
+  log_write_mutex_.Lock();
+
+  if (alive_log_files_.empty() || logs_.empty()) {
+    mutex_.AssertHeld();
+    // We may reach here if the db is DBImplSecondary
+    log_write_mutex_.Unlock();
+    return;
+  }
+
   if (!alive_log_files_.empty() && !logs_.empty()) {
     uint64_t min_log_number = job_context->log_number;
     size_t num_alive_log_files = alive_log_files_.size();
@@ -292,17 +301,15 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
       }
       job_context->size_log_to_delete += earliest.size;
       total_log_size_ -= earliest.size;
-      if (two_write_queues_) {
-        log_write_mutex_.Lock();
-      }
       alive_log_files_.pop_front();
-      if (two_write_queues_) {
-        log_write_mutex_.Unlock();
-      }
+
       // Current log should always stay alive since it can't have
       // number < MinLogNumber().
       assert(alive_log_files_.size());
     }
+    log_write_mutex_.Unlock();
+    mutex_.Unlock();
+    log_write_mutex_.Lock();
     while (!logs_.empty() && logs_.front().number < min_log_number) {
       auto& log = logs_.front();
       if (log.IsSyncing()) {
@@ -311,10 +318,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
         continue;
       }
       logs_to_free_.push_back(log.ReleaseWriter());
-      {
-        InstrumentedMutexLock wl(&log_write_mutex_);
-        logs_.pop_front();
-      }
+      logs_.pop_front();
     }
     // Current log cannot be obsolete.
     assert(!logs_.empty());
@@ -323,23 +327,13 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
     // We're just cleaning up for DB::Write().
     assert(job_context->logs_to_free.empty());
     job_context->logs_to_free = logs_to_free_;
+    logs_to_free_.clear();
+    log_write_mutex_.Unlock();
+    mutex_.Lock();
     job_context->log_recycle_files.assign(log_recycle_files_.begin(),
                                           log_recycle_files_.end());
-    logs_to_free_.clear();
   }
 }
-
-namespace {
-bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
-                          const JobContext::CandidateFileInfo& second) {
-  if (first.file_name > second.file_name) {
-    return true;
-  } else if (first.file_name < second.file_name) {
-    return false;
-  } else {
-    return (first.file_path > second.file_path);
-  }
-}
-}  // namespace
 
 // Delete obsolete files and log status and information of file deletion
 void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
@@ -445,7 +439,16 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
   // dedup state.candidate_files so we don't try to delete the same
   // file twice
   std::sort(candidate_files.begin(), candidate_files.end(),
-            CompareCandidateFile);
+            [](const JobContext::CandidateFileInfo& lhs,
+               const JobContext::CandidateFileInfo& rhs) {
+              if (lhs.file_name > rhs.file_name) {
+                return true;
+              } else if (lhs.file_name < rhs.file_name) {
+                return false;
+              } else {
+                return (lhs.file_path > rhs.file_path);
+              }
+            });
   candidate_files.erase(
       std::unique(candidate_files.begin(), candidate_files.end()),
       candidate_files.end());

db/db_impl/db_impl_open.cc
@@ -1459,9 +1459,6 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
   Status s;
   mutex_.AssertHeld();
   assert(immutable_db_options_.avoid_flush_during_recovery);
-  if (two_write_queues_) {
-    log_write_mutex_.Lock();
-  }
   // Mark these as alive so they'll be considered for deletion later by
   // FindObsoleteFiles()
   total_log_size_ = 0;
@@ -1486,9 +1483,6 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
     total_log_size_ += log.size;
     alive_log_files_.push_back(log);
   }
-  if (two_write_queues_) {
-    log_write_mutex_.Unlock();
-  }
   return s;
 }
 
@@ -1871,16 +1865,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
   }
 
   if (s.ok()) {
-    if (impl->two_write_queues_) {
-      impl->log_write_mutex_.Lock();
-    }
     impl->alive_log_files_.push_back(
         DBImpl::LogFileNumberSize(impl->logfile_number_));
-    if (impl->two_write_queues_) {
-      impl->log_write_mutex_.Unlock();
-    }
-  }
-  if (s.ok()) {
     // In WritePrepared there could be gap in sequence numbers. This breaks
     // the trick we use in kPointInTimeRecovery which assumes the first seq in
     // the log right after the corrupted log is one larger than the last seq

db/db_impl/db_impl_write.cc
@@ -349,14 +349,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   // when it finds suitable, and finish them in the same write batch.
   // This is how a write job could be done by the other writer.
   WriteContext write_context;
+  LogContext log_context(write_options.sync);
   WriteThread::WriteGroup write_group;
   bool in_parallel_group = false;
   uint64_t last_sequence = kMaxSequenceNumber;
 
-  mutex_.Lock();
-
-  bool need_log_sync = write_options.sync;
-  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
   assert(!two_write_queues_ || !disable_memtable);
   {
     // With concurrent writes we do preprocess only in the write thread that
@@ -366,7 +363,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     // PreprocessWrite does its own perf timing.
     PERF_TIMER_STOP(write_pre_and_post_process_time);
 
-    status = PreprocessWrite(write_options, &need_log_sync, &write_context);
+    status = PreprocessWrite(write_options, &log_context, &write_context);
     if (!two_write_queues_) {
       // Assign it after ::PreprocessWrite since the sequence might advance
       // inside it by WriteRecoverableState
@@ -376,13 +373,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     PERF_TIMER_START(write_pre_and_post_process_time);
   }
 
-  log::Writer* log_writer = logs_.back().writer;
-  LogFileNumberSize& log_file_number_size = alive_log_files_.back();
-  assert(log_writer->get_log_number() == log_file_number_size.number);
-
-  mutex_.Unlock();
-
   // Add to log and apply to memtable. We can release the lock
   // during this phase since &w is currently responsible for logging
   // and protects against concurrent loggers and concurrent writes
@@ -477,10 +467,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     if (!two_write_queues_) {
       if (status.ok() && !write_options.disableWAL) {
+        assert(log_context.log_file_number_size);
+        LogFileNumberSize& log_file_number_size =
+            *(log_context.log_file_number_size);
         PERF_TIMER_GUARD(write_wal_time);
-        io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
-                          need_log_dir_sync, last_sequence + 1,
-                          log_file_number_size);
+        io_s =
+            WriteToWAL(write_group, log_context.writer, log_used,
+                       log_context.need_log_sync, log_context.need_log_dir_sync,
+                       last_sequence + 1, log_file_number_size);
       }
     } else {
       if (status.ok() && !write_options.disableWAL) {
@@ -582,14 +576,21 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       assert(pre_release_cb_status.ok());
     }
 
-    if (need_log_sync) {
-      mutex_.Lock();
+    if (log_context.need_log_sync) {
+      VersionEdit synced_wals;
+      log_write_mutex_.Lock();
       if (status.ok()) {
-        status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
+        MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync,
+                       &synced_wals);
      } else {
        MarkLogsNotSynced(logfile_number_);
      }
-      mutex_.Unlock();
+      log_write_mutex_.Unlock();
+      if (status.ok() && synced_wals.IsWalAddition()) {
+        InstrumentedMutexLock l(&mutex_);
+        status = ApplyWALToManifest(&synced_wals);
+      }
+
       // Requesting sync with two_write_queues_ is expected to be very rare. We
       // hence provide a simple implementation that is not necessarily efficient.
       if (two_write_queues_) {
@@ -652,19 +653,11 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
   if (w.callback && !w.callback->AllowWriteBatching()) {
     write_thread_.WaitForMemTableWriters();
   }
-  mutex_.Lock();
-  bool need_log_sync = !write_options.disableWAL && write_options.sync;
-  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
+  LogContext log_context(!write_options.disableWAL && write_options.sync);
   // PreprocessWrite does its own perf timing.
   PERF_TIMER_STOP(write_pre_and_post_process_time);
-  w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
+  w.status = PreprocessWrite(write_options, &log_context, &write_context);
   PERF_TIMER_START(write_pre_and_post_process_time);
-  log::Writer* log_writer = logs_.back().writer;
-  LogFileNumberSize& log_file_number_size = alive_log_files_.back();
-  assert(log_writer->get_log_number() == log_file_number_size.number);
-  mutex_.Unlock();
 
   // This can set non-OK status if callback fail.
   last_batch_group_size_ =
@@ -727,9 +720,13 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                                 wal_write_group.size - 1);
         RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
       }
+      assert(log_context.log_file_number_size);
+      LogFileNumberSize& log_file_number_size =
+          *(log_context.log_file_number_size);
       io_s =
-          WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
-                     need_log_dir_sync, current_sequence, log_file_number_size);
+          WriteToWAL(wal_write_group, log_context.writer, log_used,
+                     log_context.need_log_sync, log_context.need_log_dir_sync,
+                     current_sequence, log_file_number_size);
       w.status = io_s;
     }
@@ -740,16 +737,20 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
       WriteStatusCheck(w.status);
     }
 
-    if (need_log_sync) {
-      mutex_.Lock();
+    VersionEdit synced_wals;
+    if (log_context.need_log_sync) {
+      InstrumentedMutexLock l(&log_write_mutex_);
      if (w.status.ok()) {
-        w.status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
+        MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync,
+                       &synced_wals);
      } else {
        MarkLogsNotSynced(logfile_number_);
      }
-      mutex_.Unlock();
     }
+    if (w.status.ok() && synced_wals.IsWalAddition()) {
+      InstrumentedMutexLock l(&mutex_);
+      w.status = ApplyWALToManifest(&synced_wals);
+    }
 
     write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
   }
@@ -893,9 +894,8 @@ Status DBImpl::WriteImplWALOnly(
   // TODO(myabandeh): Make preliminary checks thread-safe so we could do them
   // without paying the cost of obtaining the mutex.
   if (status.ok()) {
-    InstrumentedMutexLock l(&mutex_);
-    bool need_log_sync = false;
-    status = PreprocessWrite(write_options, &need_log_sync, &write_context);
+    LogContext log_context;
+    status = PreprocessWrite(write_options, &log_context, &write_context);
     WriteStatusCheckOnLocked(status);
   }
   if (!status.ok()) {
@@ -1057,9 +1057,8 @@ Status DBImpl::WriteImplWALOnly(
 void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
   // Is setting bg_error_ enough here? This will at least stop
   // compaction and fail any further writes.
-  // Caller must hold mutex_.
+  InstrumentedMutexLock l(&mutex_);
   assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
-  mutex_.AssertHeld();
   if (immutable_db_options_.paranoid_checks && !status.ok() &&
       !status.IsBusy() && !status.IsIncomplete()) {
     // Maybe change the return status to void?
@@ -1110,13 +1109,13 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) {
 }
 
 Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
-                               bool* need_log_sync,
+                               LogContext* log_context,
                                WriteContext* write_context) {
-  mutex_.AssertHeld();
-  assert(write_context != nullptr && need_log_sync != nullptr);
+  assert(write_context != nullptr && log_context != nullptr);
   Status status;
 
   if (error_handler_.IsDBStopped()) {
+    InstrumentedMutexLock l(&mutex_);
     status = error_handler_.GetBGError();
   }
@@ -1124,11 +1123,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
   if (UNLIKELY(status.ok() && total_log_size_ > GetMaxTotalWalSize())) {
     assert(versions_);
+    InstrumentedMutexLock l(&mutex_);
     const ColumnFamilySet* const column_families =
         versions_->GetColumnFamilySet();
     assert(column_families);
     size_t num_cfs = column_families->NumberOfColumnFamilies();
     assert(num_cfs >= 1);
     if (num_cfs > 1) {
       WaitForPendingWrites();
@@ -1142,15 +1141,18 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
     // thread is writing to another DB with the same write buffer, they may also
     // be flushed. We may end up with flushing much more DBs than needed. It's
     // suboptimal but still correct.
+    InstrumentedMutexLock l(&mutex_);
     WaitForPendingWrites();
     status = HandleWriteBufferManagerFlush(write_context);
   }
 
   if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
+    InstrumentedMutexLock l(&mutex_);
     status = TrimMemtableHistory(write_context);
   }
 
   if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
+    InstrumentedMutexLock l(&mutex_);
     WaitForPendingWrites();
     status = ScheduleFlushes(write_context);
   }
@@ -1166,6 +1168,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
     // for previous one. It might create a fairness issue that expiration
     // might happen for smaller writes but larger writes can go through.
     // Can optimize it if it is an issue.
+    InstrumentedMutexLock l(&mutex_);
     status = DelayWrite(last_batch_group_size_, write_options);
     PERF_TIMER_START(write_pre_and_post_process_time);
   }
@@ -1180,11 +1183,12 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
     if (write_options.no_slowdown) {
       status = Status::Incomplete("Write stall");
     } else {
+      InstrumentedMutexLock l(&mutex_);
       WriteBufferManagerStallWrites();
     }
   }
-
-  if (status.ok() && *need_log_sync) {
+  InstrumentedMutexLock l(&log_write_mutex_);
+  if (status.ok() && log_context->need_log_sync) {
     // Wait until the parallel syncs are finished. Any sync process has to sync
     // the front log too so it is enough to check the status of front()
     // We do a while loop since log_sync_cv_ is signalled when any sync is
@@ -1204,8 +1208,12 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
       log.PrepareForSync();
     }
   } else {
-    *need_log_sync = false;
+    log_context->need_log_sync = false;
   }
+  log_context->writer = logs_.back().writer;
+  log_context->need_log_dir_sync =
+      log_context->need_log_dir_sync && !log_dir_synced_;
+  log_context->log_file_number_size = std::addressof(alive_log_files_.back());
 
   return status;
 }
@@ -1714,10 +1722,12 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
 }
 
 uint64_t DBImpl::GetMaxTotalWalSize() const {
-  mutex_.AssertHeld();
-  return mutable_db_options_.max_total_wal_size == 0
-             ? 4 * max_total_in_memory_state_
-             : mutable_db_options_.max_total_wal_size;
+  uint64_t max_total_wal_size =
+      max_total_wal_size_.load(std::memory_order_acquire);
+  if (max_total_wal_size > 0) {
+    return max_total_wal_size;
+  }
+  return 4 * max_total_in_memory_state_.load(std::memory_order_acquire);
 }
 
 // REQUIRES: mutex_ is held
@@ -2065,7 +2075,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
     log_recycle_files_.pop_front();
   }
   if (s.ok() && creating_new_log) {
-    log_write_mutex_.Lock();
+    InstrumentedMutexLock l(&log_write_mutex_);
     assert(new_log != nullptr);
     if (!logs_.empty()) {
       // Alway flush the buffer of the last log before switching to a new one
@@ -2089,7 +2099,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
       logs_.emplace_back(logfile_number_, new_log);
       alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
     }
-    log_write_mutex_.Unlock();
   }
 
   if (!s.ok()) {

db/error_handler.cc
@@ -358,6 +358,9 @@ const Status& ErrorHandler::HandleKnownErrors(const Status& bg_err,
       RecoverFromNoSpace();
     }
   }
+  if (bg_error_.severity() >= Status::Severity::kHardError) {
+    is_db_stopped_.store(true, std::memory_order_release);
+  }
   return bg_error_;
 }
 
@@ -736,6 +739,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() {
       // the bg_error and notify user.
       TEST_SYNC_POINT("RecoverFromRetryableBGIOError:RecoverSuccess");
       Status old_bg_error = bg_error_;
+      is_db_stopped_.store(false, std::memory_order_release);
       bg_error_ = Status::OK();
       bg_error_.PermitUncheckedError();
       EventHelpers::NotifyOnErrorRecoveryEnd(
@@ -792,6 +796,9 @@ void ErrorHandler::CheckAndSetRecoveryAndBGError(const Status& bg_err) {
     if (bg_err.severity() > bg_error_.severity()) {
       bg_error_ = bg_err;
     }
+    if (bg_error_.severity() >= Status::Severity::kHardError) {
+      is_db_stopped_.store(true, std::memory_order_release);
+    }
     return;
   }

db/error_handler.h
@@ -38,6 +38,7 @@ class ErrorHandler {
         auto_recovery_(false),
         recovery_in_prog_(false),
         soft_error_no_bg_work_(false),
+        is_db_stopped_(false),
         bg_error_stats_(db_options.statistics) {
     // Clear the checked flag for uninitialized errors
     bg_error_.PermitUncheckedError();
@@ -59,16 +60,15 @@ class ErrorHandler {
   Status ClearBGError();
 
-  bool IsDBStopped() {
-    return !bg_error_.ok() &&
-           bg_error_.severity() >= Status::Severity::kHardError;
-  }
+  bool IsDBStopped() { return is_db_stopped_.load(std::memory_order_acquire); }
 
   bool IsBGWorkStopped() {
+    assert(db_mutex_);
+    db_mutex_->AssertHeld();
     return !bg_error_.ok() &&
            (bg_error_.severity() >= Status::Severity::kHardError ||
             !auto_recovery_ || soft_error_no_bg_work_);
   }
 
   bool IsSoftErrorNoBGWork() { return soft_error_no_bg_work_; }
@@ -105,6 +105,7 @@ class ErrorHandler {
   // Used to store the context for recover, such as flush reason.
   DBRecoverContext recover_context_;
+  std::atomic<bool> is_db_stopped_;
 
   // The pointer of DB statistics.
   std::shared_ptr<Statistics> bg_error_stats_;

db/perf_context_test.cc
@@ -390,7 +390,7 @@ void ProfileQueries(bool enabled_time = false) {
   EXPECT_GT(hist_write_scheduling_time.Average(), 0);
 
 #ifndef NDEBUG
-  ASSERT_GT(total_db_mutex_nanos, 2000U);
+  ASSERT_LT(total_db_mutex_nanos, 100U);
 #endif
 }

db/version_set_test.cc
@@ -1978,6 +1978,7 @@ TEST_F(VersionSetTest, WalCreateAfterClose) {
 
 TEST_F(VersionSetTest, AddWalWithSmallerSize) {
   NewDB();
+  assert(versions_);
 
   constexpr WalNumber kLogNumber = 10;
   constexpr uint64_t kSizeInBytes = 111;
@@ -1990,6 +1991,9 @@ TEST_F(VersionSetTest, AddWalWithSmallerSize) {
     ASSERT_OK(LogAndApplyToDefaultCF(edit));
   }
 
+  // Copy for future comparison.
+  const std::map<WalNumber, WalMetadata> wals1 =
+      versions_->GetWalSet().GetWals();
   {
     // Add the same WAL with smaller synced size.
@@ -1998,13 +2002,11 @@ TEST_F(VersionSetTest, AddWalWithSmallerSize) {
     edit.AddWal(kLogNumber, wal);
 
     Status s = LogAndApplyToDefaultCF(edit);
-    ASSERT_TRUE(s.IsCorruption());
-    ASSERT_TRUE(
-        s.ToString().find(
-            "WAL 10 must not have smaller synced size than previous one") !=
-        std::string::npos)
-        << s.ToString();
+    ASSERT_OK(s);
   }
+  const std::map<WalNumber, WalMetadata> wals2 =
+      versions_->GetWalSet().GetWals();
+  ASSERT_EQ(wals1, wals2);
 }
 
 TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {

db/wal_edit.cc
@@ -112,26 +112,33 @@ Status WalSet::AddWal(const WalAddition& wal) {
   auto it = wals_.lower_bound(wal.GetLogNumber());
   bool existing = it != wals_.end() && it->first == wal.GetLogNumber();
-  if (existing && !wal.GetMetadata().HasSyncedSize()) {
-    std::stringstream ss;
-    ss << "WAL " << wal.GetLogNumber() << " is created more than once";
-    return Status::Corruption("WalSet::AddWal", ss.str());
-  }
-  // If the WAL has synced size, it must >= the previous size.
-  if (wal.GetMetadata().HasSyncedSize() && existing &&
-      it->second.HasSyncedSize() &&
-      wal.GetMetadata().GetSyncedSizeInBytes() <
-          it->second.GetSyncedSizeInBytes()) {
-    std::stringstream ss;
-    ss << "WAL " << wal.GetLogNumber()
-       << " must not have smaller synced size than previous one";
-    return Status::Corruption("WalSet::AddWal", ss.str());
-  }
-  if (existing) {
-    it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes());
-  } else {
-    wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()});
-  }
+
+  if (!existing) {
+    wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()});
+    return Status::OK();
+  }
+
+  assert(existing);
+  if (!wal.GetMetadata().HasSyncedSize()) {
+    std::stringstream ss;
+    ss << "WAL " << wal.GetLogNumber() << " is created more than once";
+    return Status::Corruption("WalSet::AddWal", ss.str());
+  }
+
+  assert(wal.GetMetadata().HasSyncedSize());
+  if (it->second.HasSyncedSize() && wal.GetMetadata().GetSyncedSizeInBytes() <=
+                                        it->second.GetSyncedSizeInBytes()) {
+    // This is possible because version edits with different synced WAL sizes
+    // for the same WAL can be committed out-of-order. For example, thread
+    // 1 synces the first 10 bytes of 1.log, while thread 2 synces the first 20
+    // bytes of 1.log. It's possible that thread 1 calls LogAndApply() after
+    // thread 2.
+    // In this case, just return ok.
+    return Status::OK();
+  }
+
+  // Update synced size for the given WAL.
+  it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes());
   return Status::OK();
 }

db/wal_edit.h
@@ -42,6 +42,8 @@ class WalMetadata {
   uint64_t GetSyncedSizeInBytes() const { return synced_size_bytes_; }
 
  private:
+  friend bool operator==(const WalMetadata& lhs, const WalMetadata& rhs);
+  friend bool operator!=(const WalMetadata& lhs, const WalMetadata& rhs);
+
   // The size of WAL is unknown, used when the WAL is not synced yet or is
   // empty.
   constexpr static uint64_t kUnknownWalSize =
@@ -51,6 +53,14 @@ class WalMetadata {
   uint64_t synced_size_bytes_ = kUnknownWalSize;
 };
 
+inline bool operator==(const WalMetadata& lhs, const WalMetadata& rhs) {
+  return lhs.synced_size_bytes_ == rhs.synced_size_bytes_;
+}
+
+inline bool operator!=(const WalMetadata& lhs, const WalMetadata& rhs) {
+  return !(lhs == rhs);
+}
+
 // These tags are persisted to MANIFEST, so it's part of the user API.
 enum class WalAdditionTag : uint32_t {
   // Indicates that there are no more tags.

db/wal_edit_test.cc
@@ -54,12 +54,11 @@ TEST(WalSet, SmallerSyncedSize) {
   constexpr uint64_t kBytes = 100;
   WalSet wals;
   ASSERT_OK(wals.AddWal(WalAddition(kNumber, WalMetadata(kBytes))));
+  const auto wals1 = wals.GetWals();
   Status s = wals.AddWal(WalAddition(kNumber, WalMetadata(0)));
-  ASSERT_TRUE(s.IsCorruption());
-  ASSERT_TRUE(
-      s.ToString().find(
-          "WAL 100 must not have smaller synced size than previous one") !=
-      std::string::npos);
+  const auto wals2 = wals.GetWals();
+  ASSERT_OK(s);
+  ASSERT_EQ(wals1, wals2);
 }
 
 TEST(WalSet, CreateTwice) {

utilities/backup/backup_engine_test.cc
@@ -1115,7 +1115,8 @@ TEST_P(BackupEngineTestWithParam, OfflineIntegrationTest) {
     destroy_data = false;
     // kAutoFlushOnly to preserve legacy test behavior (consider updating)
     FillDB(db_.get(), keys_iteration * i, fill_up_to, kAutoFlushOnly);
-    ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), iter == 0));
+    ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), iter == 0))
+        << "iter: " << iter << ", idx: " << i;
     CloseDBAndBackupEngine();
 
     DestroyDB(dbname_, options_);
