Fix a timer crash caused by invalid memory management (#9656)

Summary:
Timer crash when multiple DB instances doing heavy DB open and close
operations concurrently. Which is caused by adding a timer task with
smaller timestamp than the current running task. Fix it by moving the
getting new task timestamp part within timer mutex protection.
And other fixes:
- Disallow adding duplicated function name to timer
- Fix a minor memory leak in timer when a running task is cancelled

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9656

Reviewed By: ajkr

Differential Revision: D34626296

Pulled By: jay-zhuang

fbshipit-source-id: 6b6d96a5149746bf503546244912a9e41a0c5f6b
main
Jay Zhuang 2 years ago committed by Facebook GitHub Bot
parent 91372328ef
commit 09b0e8f2c7
  1. 1
      db/column_family.cc
  2. 1
      db/compaction/compaction_picker_universal.cc
  3. 4
      db/db_impl/compacted_db_impl.cc
  4. 9
      db/db_impl/db_impl.cc
  5. 2
      db/db_impl/db_impl.h
  6. 2
      db/db_impl/db_impl_compaction_flush.cc
  7. 5
      db/db_impl/db_impl_open.cc
  8. 38
      db/periodic_work_scheduler.cc
  9. 4
      db/periodic_work_scheduler.h
  10. 71
      util/timer.h
  11. 21
      util/timer_test.cc

@ -989,6 +989,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
GetL0ThresholdSpeedupCompaction( GetL0ThresholdSpeedupCompaction(
mutable_cf_options.level0_file_num_compaction_trigger, mutable_cf_options.level0_file_num_compaction_trigger,
mutable_cf_options.level0_slowdown_writes_trigger)) { mutable_cf_options.level0_slowdown_writes_trigger)) {
fprintf(stdout, "JJJ2\n");
write_controller_token_ = write_controller_token_ =
write_controller->GetCompactionPressureToken(); write_controller->GetCompactionPressureToken();
ROCKS_LOG_INFO( ROCKS_LOG_INFO(

@ -372,6 +372,7 @@ Compaction* UniversalCompactionBuilder::PickCompaction() {
const int kLevel0 = 0; const int kLevel0 = 0;
score_ = vstorage_->CompactionScore(kLevel0); score_ = vstorage_->CompactionScore(kLevel0);
sorted_runs_ = CalculateSortedRuns(*vstorage_); sorted_runs_ = CalculateSortedRuns(*vstorage_);
fprintf(stdout, "JJJ1\n");
if (sorted_runs_.size() == 0 || if (sorted_runs_.size() == 0 ||
(vstorage_->FilesMarkedForPeriodicCompaction().empty() && (vstorage_->FilesMarkedForPeriodicCompaction().empty() &&

@ -171,7 +171,9 @@ Status CompactedDBImpl::Open(const Options& options,
std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname)); std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
Status s = db->Init(options); Status s = db->Init(options);
if (s.ok()) { if (s.ok()) {
db->StartPeriodicWorkScheduler(); s = db->StartPeriodicWorkScheduler();
}
if (s.ok()) {
ROCKS_LOG_INFO(db->immutable_db_options_.info_log, ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
"Opened the db as fully compacted mode"); "Opened the db as fully compacted mode");
LogFlush(db->immutable_db_options_.info_log); LogFlush(db->immutable_db_options_.info_log);

@ -768,7 +768,7 @@ void DBImpl::PrintStatistics() {
} }
} }
void DBImpl::StartPeriodicWorkScheduler() { Status DBImpl::StartPeriodicWorkScheduler() {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#ifndef NDEBUG #ifndef NDEBUG
@ -778,7 +778,7 @@ void DBImpl::StartPeriodicWorkScheduler() {
"DBImpl::StartPeriodicWorkScheduler:DisableScheduler", "DBImpl::StartPeriodicWorkScheduler:DisableScheduler",
&disable_scheduler); &disable_scheduler);
if (disable_scheduler) { if (disable_scheduler) {
return; return Status::OK();
} }
#endif // !NDEBUG #endif // !NDEBUG
@ -789,10 +789,11 @@ void DBImpl::StartPeriodicWorkScheduler() {
&periodic_work_scheduler_); &periodic_work_scheduler_);
} }
periodic_work_scheduler_->Register( return periodic_work_scheduler_->Register(
this, mutable_db_options_.stats_dump_period_sec, this, mutable_db_options_.stats_dump_period_sec,
mutable_db_options_.stats_persist_period_sec); mutable_db_options_.stats_persist_period_sec);
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
return Status::OK();
} }
// esitmate the total size of stats_history_ // esitmate the total size of stats_history_
@ -1226,7 +1227,7 @@ Status DBImpl::SetDBOptions(
mutable_db_options_.stats_persist_period_sec) { mutable_db_options_.stats_persist_period_sec) {
mutex_.Unlock(); mutex_.Unlock();
periodic_work_scheduler_->Unregister(this); periodic_work_scheduler_->Unregister(this);
periodic_work_scheduler_->Register( s = periodic_work_scheduler_->Register(
this, new_options.stats_dump_period_sec, this, new_options.stats_dump_period_sec,
new_options.stats_persist_period_sec); new_options.stats_persist_period_sec);
mutex_.Lock(); mutex_.Lock();

@ -1872,7 +1872,7 @@ class DBImpl : public DB {
LogBuffer* log_buffer); LogBuffer* log_buffer);
// Schedule background tasks // Schedule background tasks
void StartPeriodicWorkScheduler(); Status StartPeriodicWorkScheduler();
void PrintStatistics(); void PrintStatistics();

@ -285,6 +285,7 @@ Status DBImpl::FlushMemTableToOutputFile(
assert(storage_info); assert(storage_info);
VersionStorageInfo::LevelSummaryStorage tmp; VersionStorageInfo::LevelSummaryStorage tmp;
fprintf(stdout, "JJJ4\n");
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
column_family_name.c_str(), column_family_name.c_str(),
storage_info->LevelSummary(&tmp)); storage_info->LevelSummary(&tmp));
@ -730,6 +731,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
assert(storage_info); assert(storage_info);
VersionStorageInfo::LevelSummaryStorage tmp; VersionStorageInfo::LevelSummaryStorage tmp;
fprintf(stdout, "JJJ3\n");
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
column_family_name.c_str(), column_family_name.c_str(),
storage_info->LevelSummary(&tmp)); storage_info->LevelSummary(&tmp));

@ -1956,8 +1956,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
persist_options_status.ToString().c_str()); persist_options_status.ToString().c_str());
} }
if (s.ok()) { if (s.ok()) {
impl->StartPeriodicWorkScheduler(); s = impl->StartPeriodicWorkScheduler();
} else { }
if (!s.ok()) {
for (auto* h : *handles) { for (auto* h : *handles) {
delete h; delete h;
} }

@ -16,31 +16,41 @@ PeriodicWorkScheduler::PeriodicWorkScheduler(
timer = std::unique_ptr<Timer>(new Timer(clock.get())); timer = std::unique_ptr<Timer>(new Timer(clock.get()));
} }
void PeriodicWorkScheduler::Register(DBImpl* dbi, Status PeriodicWorkScheduler::Register(DBImpl* dbi,
unsigned int stats_dump_period_sec, unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec) { unsigned int stats_persist_period_sec) {
MutexLock l(&timer_mu_); MutexLock l(&timer_mu_);
static std::atomic<uint64_t> initial_delay(0); static std::atomic<uint64_t> initial_delay(0);
timer->Start(); timer->Start();
if (stats_dump_period_sec > 0) { if (stats_dump_period_sec > 0) {
timer->Add([dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"), bool succeeded = timer->Add(
initial_delay.fetch_add(1) % [dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"),
static_cast<uint64_t>(stats_dump_period_sec) * initial_delay.fetch_add(1) %
kMicrosInSecond, static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond,
static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond); static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond);
if (!succeeded) {
return Status::Aborted("Unable to add periodic task DumpStats");
}
} }
if (stats_persist_period_sec > 0) { if (stats_persist_period_sec > 0) {
timer->Add( bool succeeded = timer->Add(
[dbi]() { dbi->PersistStats(); }, GetTaskName(dbi, "pst_st"), [dbi]() { dbi->PersistStats(); }, GetTaskName(dbi, "pst_st"),
initial_delay.fetch_add(1) % initial_delay.fetch_add(1) %
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond, static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond,
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond); static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond);
if (!succeeded) {
return Status::Aborted("Unable to add periodic task PersistStats");
}
}
bool succeeded = timer->Add(
[dbi]() { dbi->FlushInfoLog(); }, GetTaskName(dbi, "flush_info_log"),
initial_delay.fetch_add(1) % kDefaultFlushInfoLogPeriodSec *
kMicrosInSecond,
kDefaultFlushInfoLogPeriodSec * kMicrosInSecond);
if (!succeeded) {
return Status::Aborted("Unable to add periodic task PersistStats");
} }
timer->Add([dbi]() { dbi->FlushInfoLog(); }, return Status::OK();
GetTaskName(dbi, "flush_info_log"),
initial_delay.fetch_add(1) % kDefaultFlushInfoLogPeriodSec *
kMicrosInSecond,
kDefaultFlushInfoLogPeriodSec * kMicrosInSecond);
} }
void PeriodicWorkScheduler::Unregister(DBImpl* dbi) { void PeriodicWorkScheduler::Unregister(DBImpl* dbi) {

@ -30,8 +30,8 @@ class PeriodicWorkScheduler {
PeriodicWorkScheduler& operator=(const PeriodicWorkScheduler&) = delete; PeriodicWorkScheduler& operator=(const PeriodicWorkScheduler&) = delete;
PeriodicWorkScheduler& operator=(PeriodicWorkScheduler&&) = delete; PeriodicWorkScheduler& operator=(PeriodicWorkScheduler&&) = delete;
void Register(DBImpl* dbi, unsigned int stats_dump_period_sec, Status Register(DBImpl* dbi, unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec); unsigned int stats_persist_period_sec);
void Unregister(DBImpl* dbi); void Unregister(DBImpl* dbi);

@ -48,36 +48,38 @@ class Timer {
~Timer() { Shutdown(); } ~Timer() { Shutdown(); }
// Add a new function to run. // Add a new function to run.
// fn_name has to be identical, otherwise, the new one overrides the existing // fn_name has to be identical, otherwise it will fail to add and return false
// one, regardless if the function is pending removed (invalid) or not.
// start_after_us is the initial delay. // start_after_us is the initial delay.
// repeat_every_us is the interval between ending time of the last call and // repeat_every_us is the interval between ending time of the last call and
// starting time of the next call. For example, repeat_every_us = 2000 and // starting time of the next call. For example, repeat_every_us = 2000 and
// the function takes 1000us to run. If it starts at time [now]us, then it // the function takes 1000us to run. If it starts at time [now]us, then it
// finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us. // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us.
// repeat_every_us == 0 means do not repeat. // repeat_every_us == 0 means do not repeat.
void Add(std::function<void()> fn, bool Add(std::function<void()> fn, const std::string& fn_name,
const std::string& fn_name, uint64_t start_after_us, uint64_t repeat_every_us) {
uint64_t start_after_us, auto fn_info = std::make_unique<FunctionInfo>(std::move(fn), fn_name, 0,
uint64_t repeat_every_us) { repeat_every_us);
std::unique_ptr<FunctionInfo> fn_info(new FunctionInfo( InstrumentedMutexLock l(&mutex_);
std::move(fn), fn_name, clock_->NowMicros() + start_after_us, // Assign time within mutex to make sure the next_run_time is larger than
repeat_every_us)); // the current running one
{ fn_info->next_run_time_us = clock_->NowMicros() + start_after_us;
InstrumentedMutexLock l(&mutex_); // the new task start time should never before the current task executing
auto it = map_.find(fn_name); // time, as the executing task can only be running if it's next_run_time_us
if (it == map_.end()) { // is due (<= clock_->NowMicros()).
heap_.push(fn_info.get()); if (executing_task_ &&
map_.emplace(std::make_pair(fn_name, std::move(fn_info))); fn_info->next_run_time_us < heap_.top()->next_run_time_us) {
} else { return false;
// If it already exists, overriding it. }
it->second->fn = std::move(fn_info->fn); auto it = map_.find(fn_name);
it->second->valid = true; if (it == map_.end()) {
it->second->next_run_time_us = clock_->NowMicros() + start_after_us; heap_.push(fn_info.get());
it->second->repeat_every_us = repeat_every_us; map_.try_emplace(fn_name, std::move(fn_info));
} } else {
// timer doesn't support duplicated function name
return false;
} }
cond_var_.SignalAll(); cond_var_.SignalAll();
return true;
} }
void Cancel(const std::string& fn_name) { void Cancel(const std::string& fn_name) {
@ -116,7 +118,7 @@ class Timer {
} }
running_ = true; running_ = true;
thread_.reset(new port::Thread(&Timer::Run, this)); thread_ = std::make_unique<port::Thread>(&Timer::Run, this);
return true; return true;
} }
@ -140,8 +142,8 @@ class Timer {
bool HasPendingTask() const { bool HasPendingTask() const {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
for (auto it = map_.begin(); it != map_.end(); it++) { for (const auto& fn_info : map_) {
if (it->second->IsValid()) { if (fn_info.second->IsValid()) {
return true; return true;
} }
} }
@ -155,7 +157,7 @@ class Timer {
// here to bump current time and trigger Timer. See timer_test for example. // here to bump current time and trigger Timer. See timer_test for example.
// //
// Note: only support one caller of this method. // Note: only support one caller of this method.
void TEST_WaitForRun(std::function<void()> callback = nullptr) { void TEST_WaitForRun(const std::function<void()>& callback = nullptr) {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
// It act as a spin lock // It act as a spin lock
while (executing_task_ || while (executing_task_ ||
@ -177,8 +179,8 @@ class Timer {
size_t TEST_GetPendingTaskNum() const { size_t TEST_GetPendingTaskNum() const {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
size_t ret = 0; size_t ret = 0;
for (auto it = map_.begin(); it != map_.end(); it++) { for (const auto& fn_info : map_) {
if (it->second->IsValid()) { if (fn_info.second->IsValid()) {
ret++; ret++;
} }
} }
@ -220,10 +222,13 @@ class Timer {
executing_task_ = false; executing_task_ = false;
cond_var_.SignalAll(); cond_var_.SignalAll();
// Remove the work from the heap once it is done executing. // Remove the work from the heap once it is done executing, make sure
// it's the same function after executing the work while mutex is
// released.
// Note that we are just removing the pointer from the heap. Its // Note that we are just removing the pointer from the heap. Its
// memory is still managed in the map (as it holds a unique ptr). // memory is still managed in the map (as it holds a unique ptr).
// So current_fn is still a valid ptr. // So current_fn is still a valid ptr.
assert(heap_.top() == current_fn);
heap_.pop(); heap_.pop();
// current_fn may be cancelled already. // current_fn may be cancelled already.
@ -234,6 +239,10 @@ class Timer {
// Schedule new work into the heap with new time. // Schedule new work into the heap with new time.
heap_.push(current_fn); heap_.push(current_fn);
} else {
// if current_fn is cancelled or no need to repeat, remove it from the
// map to avoid leak.
map_.erase(current_fn->name);
} }
} else { } else {
cond_var_.TimedWait(current_fn->next_run_time_us); cond_var_.TimedWait(current_fn->next_run_time_us);
@ -280,10 +289,10 @@ class Timer {
// calls `Cancel()`. // calls `Cancel()`.
bool valid; bool valid;
FunctionInfo(std::function<void()>&& _fn, const std::string& _name, FunctionInfo(std::function<void()>&& _fn, std::string _name,
const uint64_t _next_run_time_us, uint64_t _repeat_every_us) const uint64_t _next_run_time_us, uint64_t _repeat_every_us)
: fn(std::move(_fn)), : fn(std::move(_fn)),
name(_name), name(std::move(_name)),
next_run_time_us(_next_run_time_us), next_run_time_us(_next_run_time_us),
repeat_every_us(_repeat_every_us), repeat_every_us(_repeat_every_us),
valid(true) {} valid(true) {}

@ -273,33 +273,32 @@ TEST_F(TimerTest, AddSameFuncName) {
ASSERT_TRUE(timer.Start()); ASSERT_TRUE(timer.Start());
int func_counter1 = 0; int func_counter1 = 0;
timer.Add([&] { func_counter1++; }, "duplicated_func", kInitDelayUs, ASSERT_TRUE(timer.Add([&] { func_counter1++; }, "duplicated_func",
kRepeat1Us); kInitDelayUs, kRepeat1Us));
int func2_counter = 0; int func2_counter = 0;
timer.Add([&] { func2_counter++; }, "func2", kInitDelayUs, kRepeat2Us); ASSERT_TRUE(
timer.Add([&] { func2_counter++; }, "func2", kInitDelayUs, kRepeat2Us));
// New function with the same name should override the existing one // New function with the same name should fail to add
int func_counter2 = 0; int func_counter2 = 0;
timer.Add([&] { func_counter2++; }, "duplicated_func", kInitDelayUs, ASSERT_FALSE(timer.Add([&] { func_counter2++; }, "duplicated_func",
kRepeat1Us); kInitDelayUs, kRepeat1Us));
ASSERT_EQ(0, func_counter1); ASSERT_EQ(0, func_counter1);
ASSERT_EQ(0, func2_counter); ASSERT_EQ(0, func2_counter);
ASSERT_EQ(0, func_counter2);
timer.TEST_WaitForRun( timer.TEST_WaitForRun(
[&] { mock_clock_->SleepForMicroseconds(kInitDelayUs); }); [&] { mock_clock_->SleepForMicroseconds(kInitDelayUs); });
ASSERT_EQ(0, func_counter1); ASSERT_EQ(1, func_counter1);
ASSERT_EQ(1, func2_counter); ASSERT_EQ(1, func2_counter);
ASSERT_EQ(1, func_counter2);
timer.TEST_WaitForRun([&] { mock_clock_->SleepForMicroseconds(kRepeat1Us); }); timer.TEST_WaitForRun([&] { mock_clock_->SleepForMicroseconds(kRepeat1Us); });
ASSERT_EQ(0, func_counter1); ASSERT_EQ(2, func_counter1);
ASSERT_EQ(2, func2_counter); ASSERT_EQ(2, func2_counter);
ASSERT_EQ(2, func_counter2); ASSERT_EQ(0, func_counter2);
ASSERT_TRUE(timer.Shutdown()); ASSERT_TRUE(timer.Shutdown());
} }

Loading…
Cancel
Save