You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
rocksdb/util/timer_queue.h

231 lines
7.2 KiB

// Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Borrowed from
// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
// Timer Queue
//
// License
//
// The source code in this article is licensed under the CC0 license, so feel
// free to copy, modify, share, do whatever you want with it.
// No attribution is required, but Ill be happy if you do.
// CC0 license
// The person who associated a work with this deed has dedicated the work to the
// public domain by waiving all of his or her rights to the work worldwide
// under copyright law, including all related and neighboring rights, to the
// extent allowed by law. You can copy, modify, distribute and perform the
// work, even for commercial purposes, all without asking permission.
#pragma once
#include <assert.h>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#include "port/port.h"
#include "test_util/sync_point.h"
// Allows execution of handlers at a specified time in the future
// Guarantees:
// - All handlers are executed ONCE, even if cancelled (aborted parameter will
// be set to true)
// - If TimerQueue is destroyed, it will cancel all handlers.
// - Handlers are ALWAYS executed in the Timer Queue worker thread.
// - Handlers execution order is NOT guaranteed
//
////////////////////////////////////////////////////////////////////////////////
// borrowed from
// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
class TimerQueue {
public:
TimerQueue() : m_th(&TimerQueue::run, this) {}
~TimerQueue() { shutdown(); }
// This function is not thread-safe.
void shutdown() {
if (closed_) {
return;
}
cancelAll();
// Abusing the timer queue to trigger the shutdown.
add(0, [this](bool) {
m_finish = true;
return std::make_pair(false, 0);
});
m_th.join();
closed_ = true;
}
// Adds a new timer
// \return
// Returns the ID of the new timer. You can use this ID to cancel the
// timer
uint64_t add(int64_t milliseconds,
std::function<std::pair<bool, int64_t>(bool)> handler) {
WorkItem item;
Clock::time_point tp = Clock::now();
item.end = tp + std::chrono::milliseconds(milliseconds);
TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end);
item.period = milliseconds;
item.handler = std::move(handler);
std::unique_lock<std::mutex> lk(m_mtx);
uint64_t id = ++m_idcounter;
item.id = id;
m_items.push(std::move(item));
// Something changed, so wake up timer thread
m_checkWork.notify_one();
return id;
}
// Cancels the specified timer
// \return
// 1 if the timer was cancelled.
// 0 if you were too late to cancel (or the timer ID was never valid to
// start with)
size_t cancel(uint64_t id) {
// Instead of removing the item from the container (thus breaking the
// heap integrity), we set the item as having no handler, and put
// that handler on a new item at the top for immediate execution
// The timer thread will then ignore the original item, since it has no
// handler.
std::unique_lock<std::mutex> lk(m_mtx);
for (auto&& item : m_items.getContainer()) {
if (item.id == id && item.handler) {
WorkItem newItem;
// Zero time, so it stays at the top for immediate execution
newItem.end = Clock::time_point();
newItem.id = 0; // Means it is a canceled item
// Move the handler from item to newitem (thus clearing item)
newItem.handler = std::move(item.handler);
m_items.push(std::move(newItem));
// Something changed, so wake up timer thread
m_checkWork.notify_one();
return 1;
}
}
return 0;
}
// Cancels all timers
// \return
// The number of timers cancelled
size_t cancelAll() {
// Setting all "end" to 0 (for immediate execution) is ok,
// since it maintains the heap integrity
std::unique_lock<std::mutex> lk(m_mtx);
m_cancel = true;
for (auto&& item : m_items.getContainer()) {
if (item.id && item.handler) {
item.end = Clock::time_point();
item.id = 0;
}
}
auto ret = m_items.size();
m_checkWork.notify_one();
return ret;
}
private:
using Clock = std::chrono::steady_clock;
TimerQueue(const TimerQueue&) = delete;
TimerQueue& operator=(const TimerQueue&) = delete;
void run() {
std::unique_lock<std::mutex> lk(m_mtx);
while (!m_finish) {
auto end = calcWaitTime_lock();
if (end.first) {
// Timers found, so wait until it expires (or something else
// changes)
m_checkWork.wait_until(lk, end.second);
} else {
// No timers exist, so wait forever until something changes
m_checkWork.wait(lk);
}
// Check and execute as much work as possible, such as, all expired
// timers
checkWork(&lk);
}
// If we are shutting down, we should not have any items left,
// since the shutdown cancels all items
assert(m_items.size() == 0);
}
std::pair<bool, Clock::time_point> calcWaitTime_lock() {
while (m_items.size()) {
if (m_items.top().handler) {
// Item present, so return the new wait time
return std::make_pair(true, m_items.top().end);
} else {
// Discard empty handlers (they were cancelled)
m_items.pop();
}
}
// No items found, so return no wait time (causes the thread to wait
// indefinitely)
return std::make_pair(false, Clock::time_point());
}
void checkWork(std::unique_lock<std::mutex>* lk) {
while (m_items.size() && m_items.top().end <= Clock::now()) {
WorkItem item(m_items.top());
m_items.pop();
if (item.handler) {
(*lk).unlock();
auto reschedule_pair = item.handler(item.id == 0);
(*lk).lock();
if (!m_cancel && reschedule_pair.first) {
int64_t new_period = (reschedule_pair.second == -1)
? item.period
: reschedule_pair.second;
item.period = new_period;
item.end = Clock::now() + std::chrono::milliseconds(new_period);
m_items.push(std::move(item));
}
}
}
}
bool m_finish = false;
bool m_cancel = false;
uint64_t m_idcounter = 0;
std::condition_variable m_checkWork;
struct WorkItem {
Clock::time_point end;
int64_t period;
uint64_t id; // id==0 means it was cancelled
std::function<std::pair<bool, int64_t>(bool)> handler;
bool operator>(const WorkItem& other) const { return end > other.end; }
};
std::mutex m_mtx;
// Inheriting from priority_queue, so we can access the internal container
class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>,
std::greater<WorkItem>> {
public:
std::vector<WorkItem>& getContainer() { return this->c; }
} m_items;
ROCKSDB_NAMESPACE::port::Thread m_th;
bool closed_ = false;
};