From 48bc0c6ad39b69ba118435e1bcfc69b23606cb10 Mon Sep 17 00:00:00 2001 From: Haobo Xu Date: Sun, 23 Mar 2014 21:49:14 -0700 Subject: [PATCH] [RocksDB] Fix a race condition in GetSortedWalFiles Summary: This patch fixed a race condition where a log file is moved to archived dir in the middle of GetSortedWalFiles. Without the fix, the log file would be missed in the result, which leads to transaction log iterator gap. A test utility SyncPoint is added to help reproducing the race condition. Test Plan: TransactionLogIteratorRace; make check Reviewers: dhruba, ljin Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D17121 --- db/db_filesnapshot.cc | 48 ++++++++++++++++++++++---- db/db_impl.cc | 19 +++++------ db/db_impl.h | 6 ++-- db/db_test.cc | 46 +++++++++++++++++++++++++ util/sync_point.cc | 62 +++++++++++++++++++++++++++++++++ util/sync_point.h | 79 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 241 insertions(+), 19 deletions(-) create mode 100644 util/sync_point.cc create mode 100644 util/sync_point.h diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 04d6d0e17..89db22f43 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -17,6 +17,7 @@ #include "rocksdb/env.h" #include "port/port.h" #include "util/mutexlock.h" +#include "util/sync_point.h" namespace rocksdb { @@ -95,20 +96,55 @@ Status DBImpl::GetLiveFiles(std::vector& ret, } Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { - // First get sorted files in archive dir, then append sorted files from main - // dir to maintain sorted order + // First get sorted files in db dir, then get sorted files from archived + // dir, to avoid a race condition where a log file is moved to archived + // dir in between. + Status s; + // list wal files in main db dir. 
+ VectorLogPtr logs; + s = GetSortedWalsOfType(options_.wal_dir, logs, kAliveLogFile); + if (!s.ok()) { + return s; + } + // Reproduce the race condition where a log file is moved + // to archived dir, between these two sync points, used in + // (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:1"); + TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:2"); + + files.clear(); // list wal files in archive dir. - Status s; std::string archivedir = ArchivalDirectory(options_.wal_dir); if (env_->FileExists(archivedir)) { - s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile); + s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile); if (!s.ok()) { return s; } } - // list wal files in main db dir. - return AppendSortedWalsOfType(options_.wal_dir, files, kAliveLogFile); + + uint64_t latest_archived_log_number = 0; + if (!files.empty()) { + latest_archived_log_number = files.back()->LogNumber(); + Log(options_.info_log, "Latest Archived log: %lu", + latest_archived_log_number); + } + + files.reserve(files.size() + logs.size()); + for (auto& log : logs) { + if (log->LogNumber() > latest_archived_log_number) { + files.push_back(std::move(log)); + } else { + // When the race condition happens, we could see the + // same log in both db dir and archived dir. Simply + // ignore the one in db dir. Note that, if we read + // archived dir first, we would have missed the log file. 
+ Log(options_.info_log, "%s already moved to archive", + log->PathName().c_str()); + } + } + + return s; } } diff --git a/db/db_impl.cc b/db/db_impl.cc index bb1f839a9..b813efc49 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -64,6 +64,7 @@ #include "util/mutexlock.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" +#include "util/sync_point.h" namespace rocksdb { @@ -872,7 +873,11 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { if (type == kLogFile && (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) { auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number); + // The sync point below is used in (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:1"); Status s = env_->RenameFile(fname, archived_log_name); + // The sync point below is used in (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:2"); Log(options_.info_log, "Move log file %s to %s -- %s\n", fname.c_str(), archived_log_name.c_str(), s.ToString().c_str()); @@ -1020,7 +1025,7 @@ void DBImpl::PurgeObsoleteWALFiles() { size_t files_del_num = log_files_num - files_keep_num; VectorLogPtr archived_logs; - AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); + GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); if (files_del_num > archived_logs.size()) { Log(options_.info_log, "Trying to delete more archived log files than " @@ -1791,20 +1796,14 @@ struct CompareLogByPointer { } }; -Status DBImpl::AppendSortedWalsOfType(const std::string& path, +Status DBImpl::GetSortedWalsOfType(const std::string& path, VectorLogPtr& log_files, WalFileType log_type) { std::vector all_files; const Status status = env_->GetChildren(path, &all_files); if (!status.ok()) { return status; } - log_files.reserve(log_files.size() + all_files.size()); - VectorLogPtr::iterator pos_start; - if (!log_files.empty()) { - pos_start = log_files.end() - 1; - } else { - pos_start = 
log_files.begin(); - } + log_files.reserve(all_files.size()); for (const auto& f : all_files) { uint64_t number; FileType type; @@ -1830,7 +1829,7 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path, } } CompareLogByPointer compare_log_files; - std::sort(pos_start, log_files.end(), compare_log_files); + std::sort(log_files.begin(), log_files.end(), compare_log_files); return status; } diff --git a/db/db_impl.h b/db/db_impl.h index 4cfb6ecaf..3eb557a02 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -394,9 +394,9 @@ class DBImpl : public DB { void PurgeObsoleteWALFiles(); - Status AppendSortedWalsOfType(const std::string& path, - VectorLogPtr& log_files, - WalFileType type); + Status GetSortedWalsOfType(const std::string& path, + VectorLogPtr& log_files, + WalFileType type); // Requires: all_logs should be sorted with earliest log file first // Retains all log files in all_logs which contain updates with seq no. diff --git a/db/db_test.cc b/db/db_test.cc index 0695b5cc7..f707eb97c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -37,6 +37,7 @@ #include "util/mutexlock.h" #include "util/statistics.h" #include "util/testharness.h" +#include "util/sync_point.h" #include "util/testutil.h" namespace rocksdb { @@ -5189,6 +5190,51 @@ TEST(DBTest, TransactionLogIterator) { } while (ChangeCompactOptions()); } +TEST(DBTest, TransactionLogIteratorRace) { + // Setup sync point dependency to reproduce the race condition of + // a log file moved to archived dir, in the middle of GetSortedWalFiles + rocksdb::SyncPoint::GetInstance()->LoadDependency( + { { "DBImpl::GetSortedWalFiles:1", "DBImpl::PurgeObsoleteFiles:1" }, + { "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalFiles:2" }, + }); + + do { + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + Put("key1", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key2", 
DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key3", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key4", DummyString(1024)); + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U); + + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(4, iter); + } + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // trigger async flush, and log move. Well, log move will + // wait until the GetSortedWalFiles:1 to reproduce the race + // condition + FlushOptions flush_options; + flush_options.wait = false; + dbfull()->Flush(flush_options); + + // "key5" would be written in a new memtable and log + Put("key5", DummyString(1024)); + { + // this iter would miss "key4" if not fixed + auto iter = OpenTransactionLogIter(0); + ExpectRecords(5, iter); + } + } while (ChangeCompactOptions()); +} + TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) { do { Options options = OptionsForLogIterTest(); diff --git a/util/sync_point.cc b/util/sync_point.cc new file mode 100644 index 000000000..5d0ac2dd6 --- /dev/null +++ b/util/sync_point.cc @@ -0,0 +1,62 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#include "util/sync_point.h" + +namespace rocksdb { + +SyncPoint* SyncPoint::GetInstance() { + static SyncPoint sync_point; + return &sync_point; +} + +void SyncPoint::LoadDependency(const std::vector& dependencies) { + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } +} + +bool SyncPoint::PredecessorsAllCleared(const std::string& point) { + for (const auto& pred : predecessors_[point]) { + if (cleared_points_.count(pred) == 0) { + return false; + } + } + return true; +} + +void SyncPoint::EnableProcessing() { + std::unique_lock lock(mutex_); + enabled_ = true; +} + +void SyncPoint::DisableProcessing() { + std::unique_lock lock(mutex_); + enabled_ = false; +} + +void SyncPoint::ClearTrace() { + std::unique_lock lock(mutex_); + cleared_points_.clear(); +} + +void SyncPoint::Process(const std::string& point) { + std::unique_lock lock(mutex_); + + if (!enabled_) return; + + while (!PredecessorsAllCleared(point)) { + cv_.wait(lock); + } + + cleared_points_.insert(point); + cv_.notify_all(); +} + +} // namespace rocksdb diff --git a/util/sync_point.h b/util/sync_point.h new file mode 100644 index 000000000..3cc892370 --- /dev/null +++ b/util/sync_point.h @@ -0,0 +1,79 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace rocksdb { + +// This class provides facility to reproduce race conditions deterministically +// in unit tests. +// Developer could specify sync points in the codebase via TEST_SYNC_POINT. 
+// Each sync point represents a position in the execution stream of a thread. +// In the unit test, 'Happens After' relationship among sync points could be +// setup via SyncPoint::LoadDependency, to reproduce a desired interleave of +// threads execution. +// Refer to (DBTest,TransactionLogIteratorRace), for an example use case. + +class SyncPoint { + public: + static SyncPoint* GetInstance(); + + struct Dependency { + std::string predecessor; + std::string successor; + }; + // call once at the beginning of a test to setup the dependency between + // sync points + void LoadDependency(const std::vector& dependencies); + + // enable sync point processing (disabled on startup) + void EnableProcessing(); + + // disable sync point processing + void DisableProcessing(); + + // remove the execution trace of all sync points + void ClearTrace(); + + // triggered by TEST_SYNC_POINT, blocking execution until all predecessors + // are executed. + void Process(const std::string& point); + + // TODO: it might be useful to provide a function that blocks until all + // sync points are cleared. + + private: + bool PredecessorsAllCleared(const std::string& point); + + // successor/predecessor map loaded from LoadDependency + std::unordered_map> successors_; + std::unordered_map> predecessors_; + + std::mutex mutex_; + std::condition_variable cv_; + // sync points that have been passed through + std::unordered_set cleared_points_; + bool enabled_ = false; +}; + +} // namespace rocksdb + +// Use TEST_SYNC_POINT to specify sync points inside code base. +// Sync points can have happens-after dependency on other sync points, +// configured at runtime via SyncPoint::LoadDependency. This could be +// utilized to reproduce race conditions between threads. +// See TransactionLogIteratorRace in db_test.cc for an example use case. +// TEST_SYNC_POINT is no op in release build. 
+#ifdef NDEBUG +#define TEST_SYNC_POINT(x) +#else +#define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x) +#endif