Redo SyncPoints for flush while rolling test

Summary:
There was a race condition in the test where the rolling thread
acquired the mutex before the flush thread pinned the logger. Rather than add
more complicated synchronization to fix it, I followed Siying's suggestion to
use SyncPoint in the test code.

Comments in the LoadDependency() invocation explain the reason for each of the
sync points.

Test Plan:
Ran test 1000 times for tsan/asan. Will wait for all sandcastle tests
to finish before committing since this is a tricky test.

Reviewers: IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D54615
main
Andrew Kryczka 9 years ago
parent 291ae4c206
commit b046916656
  1. 2
      db/auto_roll_logger.cc
  2. 2
      db/auto_roll_logger.h
  3. 60
      db/auto_roll_logger_test.cc
  4. 3
      util/posix_logger.h

@ -77,8 +77,6 @@ void AutoRollLogger::Logv(const char* format, va_list ap) {
if ((kLogFileTimeToRoll > 0 && LogExpired()) || if ((kLogFileTimeToRoll > 0 && LogExpired()) ||
(kMaxLogFileSize > 0 && logger_->GetLogFileSize() >= kMaxLogFileSize)) { (kMaxLogFileSize > 0 && logger_->GetLogFileSize() >= kMaxLogFileSize)) {
RollLogFile(); RollLogFile();
TEST_SYNC_POINT_CALLBACK("AutoRollLogger::Logv:BeforeResetLogger",
logger_.get());
Status s = ResetLogger(); Status s = ResetLogger();
if (!s.ok()) { if (!s.ok()) {
// can't really log the error if creating a new LOG file failed // can't really log the error if creating a new LOG file failed

@ -72,7 +72,7 @@ class AutoRollLogger : public Logger {
// pin down the current logger_ instance before releasing the mutex. // pin down the current logger_ instance before releasing the mutex.
logger = logger_; logger = logger_;
} }
TEST_SYNC_POINT_CALLBACK("AutoRollLogger::Flush:PinnedLogger", nullptr); TEST_SYNC_POINT("AutoRollLogger::Flush:PinnedLogger");
if (logger) { if (logger) {
logger->Flush(); logger->Flush();
} }

@ -13,7 +13,6 @@
#include <algorithm> #include <algorithm>
#include "db/auto_roll_logger.h" #include "db/auto_roll_logger.h"
#include "port/port.h" #include "port/port.h"
#include "util/mutexlock.h"
#include "util/sync_point.h" #include "util/sync_point.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -275,51 +274,42 @@ TEST_F(AutoRollLoggerTest, LogFlushWhileRolling) {
AutoRollLogger* auto_roll_logger = AutoRollLogger* auto_roll_logger =
dynamic_cast<AutoRollLogger*>(logger.get()); dynamic_cast<AutoRollLogger*>(logger.get());
ASSERT_TRUE(auto_roll_logger); ASSERT_TRUE(auto_roll_logger);
std::thread flush_thread;
// The test is split into two parts, with the below callback happening between
// them:
// (1) Before ResetLogger() is reached, the log rolling test code occasionally
// invokes PosixLogger::Flush(). For this part, dependencies should not be
// enforced.
// (2) After ResetLogger() has begun, any calls to PosixLogger::Flush() will
// be from threads other than the log rolling thread. We want to only
// enforce dependencies for this part.
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"AutoRollLogger::Logv:BeforeResetLogger", [&](void* arg) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({ rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"PosixLogger::Flush:1", // Need to pin the old logger before beginning the roll, as rolling grabs
// the mutex, which would prevent us from accessing the old logger.
{"AutoRollLogger::Flush:PinnedLogger",
"AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit"},
// Need to finish the flush thread init before this callback because the
// callback accesses flush_thread.get_id() in order to apply certain sync
// points only to the flush thread.
{"AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit",
"AutoRollLoggerTest::LogFlushWhileRolling:FlushCallbackBegin"},
// Need to reset logger at this point in Flush() to exercise a race
// condition case, which is executing the flush with the pinned (old)
// logger after the roll has cut over to a new logger.
{"AutoRollLoggerTest::LogFlushWhileRolling:FlushCallback1",
"AutoRollLogger::ResetLogger:BeforeNewLogger"}, "AutoRollLogger::ResetLogger:BeforeNewLogger"},
{"AutoRollLogger::ResetLogger:AfterNewLogger", {"AutoRollLogger::ResetLogger:AfterNewLogger",
"PosixLogger::Flush:2"}, "AutoRollLoggerTest::LogFlushWhileRolling:FlushCallback2"},
});
}); });
port::Mutex flush_thread_mutex;
port::CondVar flush_thread_cv{&flush_thread_mutex};
std::thread flush_thread;
// Additionally, to exercise the edge case, we need to ensure the old logger
// is used. For this, we pause after pinning the logger until dependencies
// have probably been loaded.
const int kWaitForDepsSeconds = 1;
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
"AutoRollLogger::Flush:PinnedLogger", [&](void* arg) { "PosixLogger::Flush:BeginCallback", [&](void* arg) {
MutexLock ml{&flush_thread_mutex}; TEST_SYNC_POINT(
while (flush_thread.get_id() == std::thread::id()) { "AutoRollLoggerTest::LogFlushWhileRolling:FlushCallbackBegin");
flush_thread_cv.Wait();
}
if (std::this_thread::get_id() == flush_thread.get_id()) { if (std::this_thread::get_id() == flush_thread.get_id()) {
Env::Default()->SleepForMicroseconds(kWaitForDepsSeconds * 1000 * 1000); TEST_SYNC_POINT(
sleep(1); "AutoRollLoggerTest::LogFlushWhileRolling:FlushCallback1");
TEST_SYNC_POINT(
"AutoRollLoggerTest::LogFlushWhileRolling:FlushCallback2");
} }
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
{
MutexLock ml{&flush_thread_mutex};
flush_thread = std::thread([&]() { auto_roll_logger->Flush(); });
flush_thread_cv.Signal();
}
flush_thread = std::thread([&]() { auto_roll_logger->Flush(); });
TEST_SYNC_POINT(
"AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit");
RollLogFileBySizeTest(auto_roll_logger, options.max_log_file_size, RollLogFileBySizeTest(auto_roll_logger, options.max_log_file_size,
kSampleMessage + ":LogFlushWhileRolling"); kSampleMessage + ":LogFlushWhileRolling");
flush_thread.join(); flush_thread.join();

@ -57,8 +57,7 @@ class PosixLogger : public Logger {
fclose(file_); fclose(file_);
} }
virtual void Flush() override { virtual void Flush() override {
TEST_SYNC_POINT("PosixLogger::Flush:1"); TEST_SYNC_POINT_CALLBACK("PosixLogger::Flush:BeginCallback", nullptr);
TEST_SYNC_POINT("PosixLogger::Flush:2");
if (flush_pending_) { if (flush_pending_) {
flush_pending_ = false; flush_pending_ = false;
fflush(file_); fflush(file_);

Loading…
Cancel
Save