diff --git a/HISTORY.md b/HISTORY.md
index 80ffc7b23..78bf8c6c4 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,5 +1,7 @@
 # Rocksdb Change Log
 ## Unreleased
+### Bug Fixes
+* Fixed bug where `FlushWAL(true /* sync */)` (used by `GetLiveFilesStorageInfo()`, which is used by checkpoint and backup) could cause parallel writes at the tail of a WAL file to never be synced.
 ### Public API changes
 * Add `rocksdb_column_family_handle_get_id`, `rocksdb_column_family_handle_get_name` to get name, id of column family in C API
 
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index ef0dc2fbc..74925a942 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -1532,20 +1532,28 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
     auto& wal = *it;
     assert(wal.IsSyncing());
 
-    if (logs_.size() > 1) {
+    if (wal.number < logs_.back().number) {
+      // Inactive WAL
       if (immutable_db_options_.track_and_verify_wals_in_manifest &&
           wal.GetPreSyncSize() > 0) {
         synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
       }
-      logs_to_free_.push_back(wal.ReleaseWriter());
-      it = logs_.erase(it);
+      if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
+        // Fully synced
+        logs_to_free_.push_back(wal.ReleaseWriter());
+        it = logs_.erase(it);
+      } else {
+        assert(wal.GetPreSyncSize() < wal.writer->file()->GetFlushedSize());
+        wal.FinishSync();
+        ++it;
+      }
     } else {
+      assert(wal.number == logs_.back().number);
+      // Active WAL
       wal.FinishSync();
       ++it;
     }
   }
-  assert(logs_.empty() || logs_[0].number > up_to ||
-         (logs_.size() == 1 && !logs_[0].IsSyncing()));
   log_sync_cv_.SignalAll();
 }
 
diff --git a/db/db_write_test.cc b/db/db_write_test.cc
index deee6efc3..a068041c2 100644
--- a/db/db_write_test.cc
+++ b/db/db_write_test.cc
@@ -499,6 +499,51 @@ TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
   Close();
 }
 
+TEST_P(DBWriteTest, InactiveWalFullySyncedBeforeUntracked) {
+  // Repro bug where a WAL is appended and switched after
+  // `FlushWAL(true /* sync */)`'s sync finishes and before it untracks fully
+  // synced inactive logs. Previously such a WAL would be wrongly untracked
+  // so the final append would never be synced.
+  Options options = GetOptions();
+  std::unique_ptr<FaultInjectionTestEnv> fault_env(
+      new FaultInjectionTestEnv(env_));
+  options.env = fault_env.get();
+  Reopen(options);
+
+  ASSERT_OK(Put("key1", "val1"));
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncWAL:BeforeMarkLogsSynced:1", [this](void* /* arg */) {
+        ASSERT_OK(Put("key2", "val2"));
+        ASSERT_OK(dbfull()->TEST_SwitchMemtable());
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db_->FlushWAL(true /* sync */));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  ASSERT_OK(Put("key3", "val3"));
+
+  ASSERT_OK(db_->FlushWAL(true /* sync */));
+
+  Close();
+
+  // Simulate full loss of unsynced data. This should drop nothing since we did
+  // `FlushWAL(true /* sync */)` before `Close()`.
+  fault_env->DropUnsyncedFileData();
+
+  Reopen(options);
+
+  ASSERT_EQ("val1", Get("key1"));
+  ASSERT_EQ("val2", Get("key2"));
+  ASSERT_EQ("val3", Get("key3"));
+
+  // Need to close before `fault_env` goes out of scope.
+  Close();
+}
+
 TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
   std::unique_ptr<FaultInjectionTestEnv> mock_env(
       new FaultInjectionTestEnv(env_));
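For reference, below is a minimal self-contained model of the retention rule this diff introduces in `MarkLogsSynced()`. The names `WalModel` and `MarkSyncedUpTo` are invented for illustration and are not RocksDB APIs; the real code additionally coordinates with `FinishSync()`, `logs_to_free_`, and manifest WAL tracking under the DB mutex.

```cpp
// Hypothetical sketch of the fixed untracking rule: only drop a WAL from
// tracking when the completed sync covered every byte appended so far.
#include <cassert>
#include <cstdint>
#include <deque>
#include <iostream>

struct WalModel {
  uint64_t number;         // WAL file number
  uint64_t pre_sync_size;  // bytes covered by the sync that just completed
  uint64_t flushed_size;   // bytes appended to the file so far
};

// A parallel write that lands after the pre-sync size snapshot leaves
// pre_sync_size < flushed_size, so the WAL stays tracked and its tail is
// covered by a later FlushWAL(true /* sync */).
void MarkSyncedUpTo(std::deque<WalModel>& logs, uint64_t up_to) {
  for (auto it = logs.begin(); it != logs.end() && it->number <= up_to;) {
    const bool active = (it->number == logs.back().number);
    if (!active && it->pre_sync_size == it->flushed_size) {
      it = logs.erase(it);  // fully synced inactive WAL: safe to untrack
    } else {
      ++it;  // active WAL, or inactive WAL with an unsynced tail
    }
  }
}

int main() {
  // WAL 1: inactive, fully covered by the completed sync -> untracked.
  // WAL 2: inactive, but a parallel write landed after its size was
  //        snapshotted for syncing -> must stay tracked.
  // WAL 3: the active WAL -> always stays tracked.
  std::deque<WalModel> logs = {{1, /*pre_sync=*/7, /*flushed=*/7},
                               {2, /*pre_sync=*/10, /*flushed=*/14},
                               {3, /*pre_sync=*/0, /*flushed=*/5}};
  MarkSyncedUpTo(logs, /*up_to=*/3);
  assert(logs.size() == 2);
  assert(logs.front().number == 2);  // WAL 2's 4-byte tail still needs a sync
  std::cout << "kept " << logs.size() << " WALs; WAL 2 tail syncs later\n";
}
```

The key change mirrored here is the erase condition: before the fix, any inactive WAL reached by the sync was untracked unconditionally (the old `logs_.size() > 1` branch), so bytes appended in parallel after the pre-sync size snapshot could never be synced by a later `FlushWAL(true /* sync */)`.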