diff --git a/HISTORY.md b/HISTORY.md
index e904aee00..44e408485 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -6,6 +6,7 @@
 ### Bug Fixes
 * Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.
+* Fix WAL corruption caused by a race condition between the user write thread and FlushWAL when two_write_queues is not set.
 
 ### Java API Changes
 * Add `BlockBasedTableConfig.setBlockCache` to allow sharing a block cache across DB instances.
diff --git a/db/db_impl.cc b/db/db_impl.cc
index bab63ac90..f7ba90f52 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -693,7 +693,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(
 }
 
 Status DBImpl::FlushWAL(bool sync) {
-  {
+  if (manual_wal_flush_) {
     // We need to lock log_write_mutex_ since logs_ might change concurrently
     InstrumentedMutexLock wl(&log_write_mutex_);
     log::Writer* cur_log_writer = logs_.back().writer;
@@ -707,6 +707,9 @@ Status DBImpl::FlushWAL(bool sync) {
       return s;
     }
   }
+  if (!sync) {
+    return Status::OK();
+  }
   // sync = true
   ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
   return SyncWAL();
diff --git a/db/db_impl.h b/db/db_impl.h
index aa97245ea..33e44bf4d 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -737,6 +737,7 @@ class DBImpl : public DB {
 #endif
   friend struct SuperVersion;
   friend class CompactedDBImpl;
+  friend class DBTest_ConcurrentFlushWAL_Test;
 #ifndef NDEBUG
   friend class DBTest2_ReadCallbackTest_Test;
   friend class WriteCallbackTest_WriteWithCallbackTest_Test;
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
index 0c3085b5e..b4b92567a 100644
--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -808,7 +808,21 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
   assert(log_size != nullptr);
   Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
   *log_size = log_entry.size();
+  // When two_write_queues_ is set, WriteToWAL has to be protected from
+  // concurrent calls from the two queues anyway, and log_write_mutex_ is
+  // already held. Otherwise, if manual_wal_flush_ is enabled we need to protect
+  // log_writer->AddRecord from concurrent FlushWAL calls by the application.
+  const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
+  // Due to performance concerns about missed branch prediction, penalize the
+  // new manual_wal_flush_ feature (with UNLIKELY) rather than the more common
+  // case when we do not need any locking.
+  if (UNLIKELY(needs_locking)) {
+    log_write_mutex_.Lock();
+  }
   Status status = log_writer->AddRecord(log_entry);
+  if (UNLIKELY(needs_locking)) {
+    log_write_mutex_.Unlock();
+  }
   if (log_used != nullptr) {
     *log_used = logfile_number_;
   }
diff --git a/db/db_test.cc b/db/db_test.cc
index 9ee5f54db..119883a28 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -3348,6 +3348,56 @@ TEST_F(DBTest, WriteSingleThreadEntry) {
   }
 }
 
+TEST_F(DBTest, ConcurrentFlushWAL) {
+  const size_t cnt = 100;
+  Options options;
+  WriteOptions wopt;
+  ReadOptions ropt;
+  for (bool two_write_queues : {false, true}) {
+    for (bool manual_wal_flush : {false, true}) {
+      options.two_write_queues = two_write_queues;
+      options.manual_wal_flush = manual_wal_flush;
+      options.create_if_missing = true;
+      DestroyAndReopen(options);
+      std::vector<port::Thread> threads;
+      threads.emplace_back([&] {
+        for (size_t i = 0; i < cnt; i++) {
+          auto istr = ToString(i);
+          db_->Put(wopt, db_->DefaultColumnFamily(), "a" + istr, "b" + istr);
+        }
+      });
+      if (two_write_queues) {
+        threads.emplace_back([&] {
+          for (size_t i = cnt; i < 2 * cnt; i++) {
+            auto istr = ToString(i);
+            WriteBatch batch;
+            batch.Put("a" + istr, "b" + istr);
+            dbfull()->WriteImpl(wopt, &batch, nullptr, nullptr, 0, true);
+          }
+        });
+      }
+      threads.emplace_back([&] {
+        for (size_t i = 0; i < cnt * 100; i++) {  // FlushWAL is faster than Put
+          db_->FlushWAL(false);
+        }
+      });
+      for (auto& t : threads) {
+        t.join();
+      }
+      options.create_if_missing = false;
+      // Recover from the WAL and make sure that it is not corrupted
+      Reopen(options);
+      for (size_t i = 0; i < cnt; i++) {
+        PinnableSlice pval;
+        auto istr = ToString(i);
+        ASSERT_OK(
+            db_->Get(ropt, db_->DefaultColumnFamily(), "a" + istr, &pval));
+        ASSERT_TRUE(pval == ("b" + istr));
+      }
+    }
+  }
+}
+
 #ifndef ROCKSDB_LITE
 TEST_F(DBTest, DynamicMemtableOptions) {
   const uint64_t k64KB = 1 << 16;
diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc
index 9b0568c22..9135dbabf 100644
--- a/utilities/checkpoint/checkpoint_impl.cc
+++ b/utilities/checkpoint/checkpoint_impl.cc
@@ -222,9 +222,7 @@ Status CheckpointImpl::CreateCustomCheckpoint(
     TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
     TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
-    if (db_options.manual_wal_flush) {
-      db_->FlushWAL(false /* sync */);
-    }
+    db_->FlushWAL(false /* sync */);
   }
   // if we have more than one column family, we need to also get WAL files
   if (s.ok()) {
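
For context, a minimal, self-contained sketch (not part of the patch) of the application-level pattern the fix targets: one thread issuing Puts while another calls FlushWAL(false) on a DB opened with manual_wal_flush enabled and two_write_queues off. The database path, loop counts, and the omission of status checks in the loops are illustrative assumptions, not taken from the diff.

```cpp
#include <string>
#include <thread>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.manual_wal_flush = true;   // WAL is buffered until FlushWAL is called
  options.two_write_queues = false;  // the configuration affected by the race

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/flushwal_race_demo", &db);
  if (!s.ok()) {
    return 1;
  }

  // Writer thread: each Put appends a record to the buffered WAL.
  std::thread writer([db] {
    rocksdb::WriteOptions wopt;
    for (int i = 0; i < 1000; i++) {
      db->Put(wopt, "key" + std::to_string(i), "value" + std::to_string(i));
    }
  });

  // Flusher thread: concurrently pushes the buffered WAL to the file system.
  std::thread flusher([db] {
    for (int i = 0; i < 1000; i++) {
      db->FlushWAL(false /* sync */);
    }
  });

  writer.join();
  flusher.join();
  delete db;
  return 0;
}
```

With the patch, WriteToWAL briefly takes log_write_mutex_ around log_writer->AddRecord in exactly this configuration, so a concurrent FlushWAL no longer races with the write path; FlushWAL(false) also becomes a cheap no-op when manual_wal_flush is off, which is why the checkpoint code can now call it unconditionally.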