diff --git a/HISTORY.md b/HISTORY.md index a115c051a..45e52b4cb 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,6 +11,7 @@ * Fix a bug that could return wrong results with `index_type=kHashSearch` and using `SetOptions` to change the `prefix_extractor`. * Fixed a bug in WAL tracking with wal_compression. WAL compression writes a kSetCompressionType record which is not associated with any sequence number. As result, WalManager::GetSortedWalsOfType() will skip these WALs and not return them to caller, e.g. Checkpoint, Backup, causing the operations to fail. * Avoid a crash if the IDENTITY file is accidentally truncated to empty. A new DB ID will be written and generated on Open. +* Fixed a possible corruption for users of `manual_wal_flush` and/or `FlushWAL(true /* sync */)`, together with `track_and_verify_wals_in_manifest == true`. For those users, losing unsynced data (e.g., due to power loss) could make future DB opens fail with a `Status::Corruption` complaining about missing WAL data. ### Public API changes * Add new API GetUnixTime in Snapshot class which returns the unix time at which Snapshot is taken. diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index d3e0ff0c2..58c3035c9 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1367,6 +1367,7 @@ Status DBImpl::FlushWAL(bool sync) { } Status DBImpl::SyncWAL() { + TEST_SYNC_POINT("DBImpl::SyncWAL:Begin"); autovector logs_to_sync; bool need_log_dir_sync; uint64_t current_log_number; @@ -1379,7 +1380,7 @@ Status DBImpl::SyncWAL() { current_log_number = logfile_number_; while (logs_.front().number <= current_log_number && - logs_.front().getting_synced) { + logs_.front().IsSyncing()) { log_sync_cv_.Wait(); } // First check that logs are safe to sync in background. @@ -1396,8 +1397,7 @@ Status DBImpl::SyncWAL() { for (auto it = logs_.begin(); it != logs_.end() && it->number <= current_log_number; ++it) { auto& log = *it; - assert(!log.getting_synced); - log.getting_synced = true; + log.PrepareForSync(); logs_to_sync.push_back(log.writer); } @@ -1470,11 +1470,10 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) { VersionEdit synced_wals; for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { auto& wal = *it; - assert(wal.getting_synced); + assert(wal.IsSyncing()); if (immutable_db_options_.track_and_verify_wals_in_manifest && - wal.writer->file()->GetFileSize() > 0) { - synced_wals.AddWal(wal.number, - WalMetadata(wal.writer->file()->GetFileSize())); + wal.GetPreSyncSize() > 0) { + synced_wals.AddWal(wal.number, WalMetadata(wal.GetPreSyncSize())); } if (logs_.size() > 1) { @@ -1483,12 +1482,12 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) { InstrumentedMutexLock l(&log_write_mutex_); it = logs_.erase(it); } else { - wal.getting_synced = false; + wal.FinishSync(); ++it; } } assert(logs_.empty() || logs_[0].number > up_to || - (logs_.size() == 1 && !logs_[0].getting_synced)); + (logs_.size() == 1 && !logs_[0].IsSyncing())); Status s; if (synced_wals.IsWalAddition()) { @@ -1508,8 +1507,7 @@ void DBImpl::MarkLogsNotSynced(uint64_t up_to) { for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to; ++it) { auto& wal = *it; - assert(wal.getting_synced); - wal.getting_synced = false; + wal.FinishSync(); } log_sync_cv_.SignalAll(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 27a3384ec..fa570a368 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1593,12 +1593,38 @@ class DBImpl : public DB { return s; } + bool IsSyncing() { return getting_synced; } + + uint64_t GetPreSyncSize() { + assert(getting_synced); + return pre_sync_size; + } + + void PrepareForSync() { + assert(!getting_synced); + // Size is expected to be monotonically increasing. + assert(writer->file()->GetFlushedSize() >= pre_sync_size); + getting_synced = true; + pre_sync_size = writer->file()->GetFlushedSize(); + } + + void FinishSync() { + assert(getting_synced); + getting_synced = false; + } + uint64_t number; // Visual Studio doesn't support deque's member to be noncopyable because // of a std::unique_ptr as a member. log::Writer* writer; // own + + private: // true for some prefix of logs_ bool getting_synced = false; + // The size of the file before the sync happens. This amount is guaranteed + // to be persisted even if appends happen during sync so it can be used for + // tracking the synced size in MANIFEST. + uint64_t pre_sync_size = 0; }; // PurgeFileInfo is a structure to hold information of files to be deleted in diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 16d8c8f35..f79b7bee8 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -88,14 +88,13 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { autovector logs_to_sync; uint64_t current_log_number = logfile_number_; while (logs_.front().number < current_log_number && - logs_.front().getting_synced) { + logs_.front().IsSyncing()) { log_sync_cv_.Wait(); } for (auto it = logs_.begin(); it != logs_.end() && it->number < current_log_number; ++it) { auto& log = *it; - assert(!log.getting_synced); - log.getting_synced = true; + log.PrepareForSync(); logs_to_sync.push_back(log.writer); } diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 596d3aa73..f5ca248be 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -288,7 +288,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } while (!logs_.empty() && logs_.front().number < min_log_number) { auto& log = logs_.front(); - if (log.getting_synced) { + if (log.IsSyncing()) { log_sync_cv_.Wait(); // logs_ could have changed while we were waiting. continue; diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 09287a4ef..e65ecf75a 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1181,17 +1181,16 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, // Note: there does not seem to be a reason to wait for parallel sync at // this early step but it is not important since parallel sync (SyncWAL) and // need_log_sync are usually not used together. - while (logs_.front().getting_synced) { + while (logs_.front().IsSyncing()) { log_sync_cv_.Wait(); } for (auto& log : logs_) { - assert(!log.getting_synced); // This is just to prevent the logs to be synced by a parallel SyncWAL // call. We will do the actual syncing later after we will write to the // WAL. // Note: there does not seem to be a reason to set this early before we // actually write to the WAL - log.getting_synced = true; + log.PrepareForSync(); } } else { *need_log_sync = false; diff --git a/db/db_write_test.cc b/db/db_write_test.cc index aae97ef0c..7107d2bb2 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -334,6 +334,41 @@ TEST_P(DBWriteTest, ManualWalFlushInEffect) { ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); } +TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) { + // Repro race condition bug where unflushed WAL data extended the synced size + // recorded to MANIFEST despite being unrecoverable. + Options options = GetOptions(); + std::unique_ptr fault_env( + new FaultInjectionTestEnv(env_)); + options.env = fault_env.get(); + options.manual_wal_flush = true; + options.track_and_verify_wals_in_manifest = true; + Reopen(options); + + ASSERT_OK(Put("key1", "val1")); + + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::SyncWAL:Begin", + [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db_->FlushWAL(true /* sync */)); + + // Ensure callback ran. + ASSERT_EQ("val2", Get("key2")); + + Close(); + + // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the + // DB WAL. + fault_env->DropUnsyncedFileData(); + + Reopen(options); + + // Need to close before `fault_env` goes out of scope. + Close(); +} + TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) { std::unique_ptr mock_env( new FaultInjectionTestEnv(env_)); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index ce1b7bc3d..3b98ea658 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -585,6 +585,8 @@ IOStatus WritableFileWriter::WriteBuffered( left -= allowed; src += allowed; + uint64_t cur_size = flushed_size_.load(std::memory_order_acquire); + flushed_size_.store(cur_size + allowed, std::memory_order_release); } buf_.Size(0); buffered_data_crc32c_checksum_ = 0; @@ -675,6 +677,8 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( // the corresponding checksum value buf_.Size(0); buffered_data_crc32c_checksum_ = 0; + uint64_t cur_size = flushed_size_.load(std::memory_order_acquire); + flushed_size_.store(cur_size + left, std::memory_order_release); return s; } @@ -782,6 +786,8 @@ IOStatus WritableFileWriter::WriteDirect( left -= size; src += size; write_offset += size; + uint64_t cur_size = flushed_size_.load(std::memory_order_acquire); + flushed_size_.store(cur_size + size, std::memory_order_release); assert((next_write_offset_ % alignment) == 0); } @@ -884,6 +890,8 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( IOSTATS_ADD(bytes_written, left); assert((next_write_offset_ % alignment) == 0); + uint64_t cur_size = flushed_size_.load(std::memory_order_acquire); + flushed_size_.store(cur_size + left, std::memory_order_release); if (s.ok()) { // Move the tail to the beginning of the buffer diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 2cbc715b3..bb9e5a6a1 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -143,6 +143,7 @@ class WritableFileWriter { // Actually written data size can be used for truncate // not counting padding data std::atomic filesize_; + std::atomic flushed_size_; #ifndef ROCKSDB_LITE // This is necessary when we use unbuffered access // and writes must happen on aligned offsets @@ -180,6 +181,7 @@ class WritableFileWriter { buf_(), max_buffer_size_(options.writable_file_max_buffer_size), filesize_(0), + flushed_size_(0), #ifndef ROCKSDB_LITE next_write_offset_(0), #endif // ROCKSDB_LITE @@ -259,6 +261,14 @@ class WritableFileWriter { return filesize_.load(std::memory_order_acquire); } + // Returns the size of data flushed to the underlying `FSWritableFile`. + // Expected to match `writable_file()->GetFileSize()`. + // The return value can serve as a lower-bound for the amount of data synced + // by a future call to `SyncWithoutFlush()`. + uint64_t GetFlushedSize() const { + return flushed_size_.load(std::memory_order_acquire); + } + IOStatus InvalidateCache(size_t offset, size_t length) { return writable_file_->InvalidateCache(offset, length); } diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc index 18c437dbb..10aaf16ba 100644 --- a/utilities/checkpoint/checkpoint_test.cc +++ b/utilities/checkpoint/checkpoint_test.cc @@ -915,6 +915,51 @@ TEST_F(CheckpointTest, CheckpointWithDbPath) { delete checkpoint; } +TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) { + // Repro for a race condition where a user write comes in after the checkpoint + // syncs WAL for `track_and_verify_wals_in_manifest` but before the + // corresponding MANIFEST update. With the bug, that scenario resulted in an + // unopenable DB with error "Corruption: Size mismatch: WAL ...". + Options options = CurrentOptions(); + std::unique_ptr fault_env( + new FaultInjectionTestEnv(env_)); + options.env = fault_env.get(); + options.track_and_verify_wals_in_manifest = true; + Reopen(options); + + ASSERT_OK(Put("key1", "val1")); + + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::SyncWAL:BeforeMarkLogsSynced:1", + [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + std::unique_ptr checkpoint; + { + Checkpoint* checkpoint_ptr; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr)); + checkpoint.reset(checkpoint_ptr); + } + + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_)); + + // Ensure callback ran. + ASSERT_EQ("val2", Get("key2")); + + Close(); + + // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the + // DB WAL. + fault_env->DropUnsyncedFileData(); + + // Before the bug fix, reopening the DB would fail because the MANIFEST's + // AddWal entry indicated the WAL should be synced through "key2" -> "val2". + Reopen(options); + + // Need to close before `fault_env` goes out of scope. + Close(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {