From d739de63e5be4c14d2bd6702f572cdb46ee24579 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 3 Jun 2022 16:33:00 -0700 Subject: [PATCH] Fix a bug in WAL tracking (#10087) Summary: Closing https://github.com/facebook/rocksdb/issues/10080 When `SyncWAL()` calls `MarkLogsSynced()`, even if there is only one active WAL file, this event should still be added to the MANIFEST. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10087 Test Plan: make check Reviewed By: ajkr Differential Revision: D36797580 Pulled By: riversand963 fbshipit-source-id: 24184c9dd606b3939a454ed41de6e868d1519999 --- HISTORY.md | 7 ++++--- db/db_basic_test.cc | 21 +++++++++++++++++++++ db/db_impl/db_impl.cc | 11 ++++++----- file/writable_file_writer.cc | 16 ++++++++++------ file/writable_file_writer.h | 6 ++++-- 5 files changed, 45 insertions(+), 16 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 2d845b019..8caf7ce03 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,10 @@ * Fixed a bug where a snapshot taken during SST file ingestion would be unstable. * Fixed a bug for non-TransactionDB with avoid_flush_during_recovery = true and TransactionDB where in case of crash, min_log_number_to_keep may not change on recovery and persisting a new MANIFEST with advanced log_numbers for some column families, results in "column family inconsistency" error on second recovery. As a solution, RocksDB will persist the new MANIFEST after successfully syncing the new WAL. If a future recovery starts from the new MANIFEST, then it means the new WAL is successfully synced. Due to the sentinel empty write batch at the beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point. If future recovery starts from the old MANIFEST, it means the writing the new MANIFEST failed. We won't have the "SST ahead of WAL" error. * Fixed a bug where RocksDB DB::Open() may creates and writes to two new MANIFEST files even before recovery succeeds. Now writes to MANIFEST are persisted only after recovery is successful. +* Fix a race condition in WAL size tracking which is caused by an unsafe iterator access after container is changed. +* Fix unprotected concurrent accesses to `WritableFileWriter::filesize_` by `DB::SyncWAL()` and `DB::Put()` in two write queue mode. +* Fix a bug in WAL tracking. Before this PR (#10087), calling `SyncWAL()` on the only WAL file of the db will not log the event in MANIFEST, thus allowing a subsequent `DB::Open` even if the WAL file is missing or corrupted. + ### Public API changes * Add new API GetUnixTime in Snapshot class which returns the unix time at which Snapshot is taken. @@ -27,9 +31,6 @@ ### Behavior changes * DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984) -### Bug Fixes -* Fix a race condition in WAL size tracking which is caused by an unsafe iterator access after container is changed. - ## 7.3.0 (05/20/2022) ### Bug Fixes * Fixed a bug where manual flush would block forever even though flush options had wait=false. diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index f13fe0df5..b41f48686 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -4151,6 +4151,27 @@ TEST_F(DBBasicTest, VerifyFileChecksums) { Reopen(options); ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument()); } + +TEST_F(DBBasicTest, ManualWalSync) { + Options options = CurrentOptions(); + options.track_and_verify_wals_in_manifest = true; + options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; + DestroyAndReopen(options); + + ASSERT_OK(Put("x", "y")); + // This does not create a new WAL. + ASSERT_OK(db_->SyncWAL()); + EXPECT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty()); + + std::unique_ptr wal; + Status s = db_->GetCurrentWalFile(&wal); + ASSERT_OK(s); + Close(); + + EXPECT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber()))); + + ASSERT_TRUE(TryReopen(options).IsCorruption()); +} #endif // !ROCKSDB_LITE // A test class for intercepting random reads and injecting artificial diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 42543a671..f82f20f33 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1446,12 +1446,13 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) { for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { auto& wal = *it; assert(wal.getting_synced); + if (immutable_db_options_.track_and_verify_wals_in_manifest && + wal.writer->file()->GetFileSize() > 0) { + synced_wals.AddWal(wal.number, + WalMetadata(wal.writer->file()->GetFileSize())); + } + if (logs_.size() > 1) { - if (immutable_db_options_.track_and_verify_wals_in_manifest && - wal.writer->file()->GetFileSize() > 0) { - synced_wals.AddWal(wal.number, - WalMetadata(wal.writer->file()->GetFileSize())); - } logs_to_free_.push_back(wal.ReleaseWriter()); // To modify logs_ both mutex_ and log_write_mutex_ must be held InstrumentedMutexLock l(&log_write_mutex_); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index e56742f7f..ce1b7bc3d 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -163,7 +163,8 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, TEST_KILL_RANDOM("WritableFileWriter::Append:1"); if (s.ok()) { - filesize_ += data.size(); + uint64_t cur_size = filesize_.load(std::memory_order_acquire); + filesize_.store(cur_size + data.size(), std::memory_order_release); } return s; } @@ -191,7 +192,8 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, cap = buf_.Capacity() - buf_.CurrentSize(); } pending_sync_ = true; - filesize_ += pad_bytes; + uint64_t cur_size = filesize_.load(std::memory_order_acquire); + filesize_.store(cur_size + pad_bytes, std::memory_order_release); if (perform_data_verification_) { buffered_data_crc32c_checksum_ = crc32c::Extend(buffered_data_crc32c_checksum_, @@ -227,14 +229,15 @@ IOStatus WritableFileWriter::Close() { start_ts = FileOperationInfo::StartNow(); } #endif - interim = writable_file_->Truncate(filesize_, io_options, nullptr); + uint64_t filesz = filesize_.load(std::memory_order_acquire); + interim = writable_file_->Truncate(filesz, io_options, nullptr); #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = FileOperationInfo::FinishNow(); NotifyOnFileTruncateFinish(start_ts, finish_ts, s); if (!interim.ok()) { NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(), - filesize_); + filesz); } } #endif @@ -372,8 +375,9 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { const uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced. const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB. - if (filesize_ > kBytesNotSyncRange) { - uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange; + uint64_t cur_size = filesize_.load(std::memory_order_acquire); + if (cur_size > kBytesNotSyncRange) { + uint64_t offset_sync_to = cur_size - kBytesNotSyncRange; offset_sync_to -= offset_sync_to % kBytesAlignWhenSync; assert(offset_sync_to >= last_sync_size_); if (offset_sync_to > 0 && diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 000159faa..2cbc715b3 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -142,7 +142,7 @@ class WritableFileWriter { size_t max_buffer_size_; // Actually written data size can be used for truncate // not counting padding data - uint64_t filesize_; + std::atomic filesize_; #ifndef ROCKSDB_LITE // This is necessary when we use unbuffered access // and writes must happen on aligned offsets @@ -255,7 +255,9 @@ class WritableFileWriter { // returns NotSupported status. IOStatus SyncWithoutFlush(bool use_fsync); - uint64_t GetFileSize() const { return filesize_; } + uint64_t GetFileSize() const { + return filesize_.load(std::memory_order_acquire); + } IOStatus InvalidateCache(size_t offset, size_t length) { return writable_file_->InvalidateCache(offset, length);