Ensure writes to WAL tail during `FlushWAL(true /* sync */)` will be synced (#10560)

Summary:
WAL append and switch can both happen between `FlushWAL(true /* sync */)`'s sync operations and its call to `MarkLogsSynced()`. We permit this since locks need to be released for the sync operations. Such an appended/switched WAL is both inactive and incompletely synced at the time `MarkLogsSynced()` processes it.

Prior to this PR, `MarkLogsSynced()` assumed all inactive WALs were fully synced and removed them from consideration for future syncs. That was wrong in the scenario described above and led to the latest append(s) never being synced. This PR changes `MarkLogsSynced()` to only remove inactive WALs from consideration for which all flushed data has been synced.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10560

Test Plan: repro unit test for the scenario described above. Without this PR, it fails on "key2" not found

Reviewed By: riversand963

Differential Revision: D38957391

Pulled By: ajkr

fbshipit-source-id: da77175eba97ff251a4219b227b3bb2d4843ed26
main
Andrew Kryczka 2 years ago committed by Facebook GitHub Bot
parent 7fbee01f0c
commit 7ad4b38617
  1. 2
      HISTORY.md
  2. 14
      db/db_impl/db_impl.cc
  3. 45
      db/db_write_test.cc

@ -1,5 +1,7 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### Bug Fixes
* Fixed bug where `FlushWAL(true /* sync */)` (used by `GetLiveFilesStorageInfo()`, which is used by checkpoint and backup) could cause parallel writes at the tail of a WAL file to never be synced.
### Public API changes ### Public API changes
* Add `rocksdb_column_family_handle_get_id`, `rocksdb_column_family_handle_get_name` to get name, id of column family in C API * Add `rocksdb_column_family_handle_get_id`, `rocksdb_column_family_handle_get_name` to get name, id of column family in C API

@ -1532,20 +1532,28 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
auto& wal = *it; auto& wal = *it;
assert(wal.IsSyncing()); assert(wal.IsSyncing());
if (logs_.size() > 1) { if (wal.number < logs_.back().number) {
// Inactive WAL
if (immutable_db_options_.track_and_verify_wals_in_manifest && if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.GetPreSyncSize() > 0) { wal.GetPreSyncSize() > 0) {
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize())); synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
} }
if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
// Fully synced
logs_to_free_.push_back(wal.ReleaseWriter()); logs_to_free_.push_back(wal.ReleaseWriter());
it = logs_.erase(it); it = logs_.erase(it);
} else { } else {
assert(wal.GetPreSyncSize() < wal.writer->file()->GetFlushedSize());
wal.FinishSync();
++it;
}
} else {
assert(wal.number == logs_.back().number);
// Active WAL
wal.FinishSync(); wal.FinishSync();
++it; ++it;
} }
} }
assert(logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].IsSyncing()));
log_sync_cv_.SignalAll(); log_sync_cv_.SignalAll();
} }

@ -499,6 +499,51 @@ TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
Close(); Close();
} }
TEST_P(DBWriteTest, InactiveWalFullySyncedBeforeUntracked) {
// Repro bug where a WAL is appended and switched after
// `FlushWAL(true /* sync */)`'s sync finishes and before it untracks fully
// synced inactive logs. Previously such a WAL would be wrongly untracked
// so the final append would never be synced.
Options options = GetOptions();
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
options.env = fault_env.get();
Reopen(options);
ASSERT_OK(Put("key1", "val1"));
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncWAL:BeforeMarkLogsSynced:1", [this](void* /* arg */) {
ASSERT_OK(Put("key2", "val2"));
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->FlushWAL(true /* sync */));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_OK(Put("key3", "val3"));
ASSERT_OK(db_->FlushWAL(true /* sync */));
Close();
// Simulate full loss of unsynced data. This should drop nothing since we did
// `FlushWAL(true /* sync */)` before `Close()`.
fault_env->DropUnsyncedFileData();
Reopen(options);
ASSERT_EQ("val1", Get("key1"));
ASSERT_EQ("val2", Get("key2"));
ASSERT_EQ("val3", Get("key3"));
// Need to close before `fault_env` goes out of scope.
Close();
}
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) { TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
std::unique_ptr<FaultInjectionTestEnv> mock_env( std::unique_ptr<FaultInjectionTestEnv> mock_env(
new FaultInjectionTestEnv(env_)); new FaultInjectionTestEnv(env_));

Loading…
Cancel
Save