我想用C ++创build一个非常高效的任务调度器系统。
基本的想法是这样的:
class Task { public: virtual void run() = 0; }; class Scheduler { public: void add(Task &task, double delayToRun); };
在Scheduler
后面,应该有一个固定大小的线程池,它运行任务(我不想为每个任务创build一个线程)。 delayToRun
意味着该task
不会立即执行,而是delayToRun
秒后(从添加到Scheduler
的点开始测量)。
( delayToRun
是一个“至less”的值,如果系统被加载,或者如果我们向调度器请求不可能的话,它将无法处理我们的请求,但是它应该尽可能地做到这一点)
这是我的问题。 如何高效地实现delayToRun
function? 我正在尝试使用互斥锁和条件variables来解决这个问题。
我看到两种方式:
调度程序包含两个队列: allTasksQueue
和tasksReadyToRunQueue
。 在Scheduler::add
allTasksQueue
任务添加到allTasksQueue
中。 有一个pipe理器线程,它等待最less的时间,这样就可以将任务从allTasksQueue
放到tasksReadyToRunQueue
。 工作线程等待tasksReadyToRunQueue
可用的任务。
如果Scheduler::add
在allTasksQueue
前面Scheduler::add
了一个任务(一个任务,其值为delayToRun
所以它应该在当前最快的任务之前),那么pipe理器任务需要被唤醒,所以它可以更新等待的时间。
这个方法可以被认为是低效的,因为它需要两个队列,并且需要两个condvar.signals来完成一个任务(一个用于allTasksQueue
– > tasksReadyToRunQueue
,另一个用于发信号tasksReadyToRunQueue
工作线程实际运行任务)
调度器中有一个队列。 在Scheduler::add
任务添加到此队列中。 工作线程检查队列。 如果是空的,则等待没有时间限制。 如果不是空的,则等待最快的任务。
如果只有一个工作线程正在等待的条件variables:这个方法可以被认为是低效的,因为如果一个任务添加在队列前(前面的意思是,如果有N个工作线程,那么任务索引<N)那么所有的工作线程都需要被唤醒来更新他们正在等待的时间。
如果每个线程都有一个单独的条件variables,那么我们可以控制哪个线程被唤醒,所以在这种情况下我们不需要唤醒所有的线程(我们只需要唤醒等待时间最长的线程,所以我们需要pipe理这个值)。 我正在考虑实施这个,但是确切的细节是很复杂的。 有没有关于这个方法的build议/想法/文件?
有没有更好的解决这个问题? 我试图使用标准的C ++function,但是如果它们提供了更好的解决scheme,我愿意使用平台相关的(我的主要平台是linux)工具(如pthreads),甚至是linux特定的工具(如futexes)。
您可以避免使用单独的“管理器”线程,并且在下一个运行任务更改时必须唤醒大量任务,方法是使用单个线程池线程等待“下一个运行”任务的设计(如果有的话)一个条件变量,其余的池线程无限期地等待第二个条件变量。
池线程将沿着这些行执行伪代码:
pthread_mutex_lock(&queue_lock); while (running) { if (head task is ready to run) { dequeue head task; if (task_thread == 1) pthread_cond_signal(&task_cv); else pthread_cond_signal(&queue_cv); pthread_mutex_unlock(&queue_lock); run dequeued task; pthread_mutex_lock(&queue_lock); } else if (!queue_empty && task_thread == 0) { task_thread = 1; pthread_cond_timedwait(&task_cv, &queue_lock, time head task is ready to run); task_thread = 0; } else { pthread_cond_wait(&queue_cv, &queue_lock); } } pthread_mutex_unlock(&queue_lock);
如果您更改下一个要运行的任务,则执行:
if (task_thread == 1) pthread_cond_signal(&task_cv); else pthread_cond_signal(&queue_cv);
与queue_lock
举行。
在这个方案下,所有的唤醒都只在一个线程中,只有一个任务的优先级队列,并且不需要管理线程。
你的规范有点太强大了:
delayToRun
表示该任务不会立即执行,而是delayToRun
数秒后执行
你忘了添加“至少”:
delayToRun
几秒钟 重点是如果一万个任务都安排了0.1
delayToRun,他们肯定不会同时运行。
有了这样的更正,你只需要维护一些队列(或议程)(计划启动时间,关闭运行),你保持该队列排序,并开始N
(一些固定数量)的线程,原子弹出的第一个元素议程并运行它。
那么所有的工作线程都需要被唤醒来更新他们正在等待的时间。
不, 一些工作线程会被唤醒。
阅读关于条件变量和广播。
您也可以使用POSIX计时器,请参阅timer_create(2)或Linux特定的fd计时器,请参阅timerfd_create(2)
你可能会避免在你的线程中运行阻塞系统调用,并有一些中心线程使用一些事件循环来管理它们(参见poll(2) …); 否则,如果你有一百个运行sleep(100)
任务sleep(100)
,一个任务计划在半秒内运行,它将在一百秒之前运行。
您可能想了解关于continuation-passing风格的编程(它-CPS-高度相关)。 阅读Juliusz Chroboczek 关于延续传球C的论文。
再看看Qt线程 。
你也可以考虑Go的编码(和它的Goroutines)。
这是您提供的与“ With manager thread ”描述最接近的界面的示例实现。
它使用一个单线程( timer_thread
)来管理一个队列( allTasksQueue
),这个队列是根据任务启动的实际时间( std::chrono::time_point
)来std::chrono::time_point
。
“队列”是一个std::priority_queue
(保持其time_point
键元素排序)。
timer_thread
通常是暂停的,直到下一个任务开始或添加新的任务。
当任务即将运行时,它将被放置在tasksReadyToRunQueue
,其中一个工作线程将被发信号通知,唤醒,将其从队列中移除并开始处理任务。
请注意,线程池对于线程数有一个编译时间上限(40)。 如果您安排的任务多于可分派给工作人员的任务,则新任务将阻塞,直到线程再次可用。
你说这种方法效率不高,但总的来说,对我来说似乎是合理的。 这一切都是由事件驱动的,而不是由于不必要的旋转而浪费CPU周期。 当然,这只是一个例子,优化是可能的(注意: std::multimap
已被替换为std::priority_queue
)。
实现是C ++ 11兼容
#include <iostream> #include <chrono> #include <queue> #include <unistd.h> #include <vector> #include <thread> #include <condition_variable> #include <mutex> #include <memory> class Task { public: virtual void run() = 0; virtual ~Task() { } }; class Scheduler { public: Scheduler(); ~Scheduler(); void add(Task &task, double delayToRun); private: using timepoint = std::chrono::time_point<std::chrono::steady_clock>; struct key { timepoint tp; Task *taskp; }; struct TScomp { bool operator()(const key &a, const key &b) const { return a.tp > b.tp; } }; const int ThreadPoolSize = 40; std::vector<std::thread> ThreadPool; std::vector<Task *> tasksReadyToRunQueue; std::priority_queue<key, std::vector<key>, TScomp> allTasksQueue; std::thread TimerThr; std::mutex TimerMtx, WorkerMtx; std::condition_variable TimerCV, WorkerCV; bool WorkerIsRunning = true; bool TimerIsRunning = true; void worker_thread(); void timer_thread(); }; Scheduler::Scheduler() { for (int i = 0; i <ThreadPoolSize; ++i) ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this)); TimerThr = std::thread(&Scheduler::timer_thread, this); } Scheduler::~Scheduler() { { std::lock_guard<std::mutex> lck{TimerMtx}; TimerIsRunning = false; TimerCV.notify_one(); } TimerThr.join(); { std::lock_guard<std::mutex> lck{WorkerMtx}; WorkerIsRunning = false; WorkerCV.notify_all(); } for (auto &t : ThreadPool) t.join(); } void Scheduler::add(Task &task, double delayToRun) { auto now = std::chrono::steady_clock::now(); long delay_ms = delayToRun * 1000; std::chrono::milliseconds duration (delay_ms); timepoint tp = now + duration; if (now >= tp) { /* * This is a short-cut * When time is due, the task is directly dispatched to the workers */ std::lock_guard<std::mutex> lck{WorkerMtx}; tasksReadyToRunQueue.push_back(&task); WorkerCV.notify_one(); } else { std::lock_guard<std::mutex> lck{TimerMtx}; allTasksQueue.push({tp, &task}); TimerCV.notify_one(); } } void Scheduler::worker_thread() { for (;;) { std::unique_lock<std::mutex> lck{WorkerMtx}; WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 || !WorkerIsRunning; } ); if (!WorkerIsRunning) break; Task *p = tasksReadyToRunQueue.back(); tasksReadyToRunQueue.pop_back(); lck.unlock(); p->run(); delete p; // delete Task } } void Scheduler::timer_thread() { for (;;) { std::unique_lock<std::mutex> lck{TimerMtx}; if (!TimerIsRunning) break; auto duration = std::chrono::nanoseconds(1000000000); if (allTasksQueue.size() != 0) { auto now = std::chrono::steady_clock::now(); auto head = allTasksQueue.top(); Task *p = head.taskp; duration = head.tp - now; if (now >= head.tp) { /* * A Task is due, pass to worker threads */ std::unique_lock<std::mutex> ulck{WorkerMtx}; tasksReadyToRunQueue.push_back(p); WorkerCV.notify_one(); ulck.unlock(); allTasksQueue.pop(); } } TimerCV.wait_for(lck, duration); } } /* * End sample implementation */ class DemoTask : public Task { int n; public: DemoTask(int n=0) : n{n} { } void run() override { std::cout << "Start task " << n << std::endl;; std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << " Stop task " << n << std::endl;; } }; int main() { Scheduler sched; Task *t0 = new DemoTask{0}; Task *t1 = new DemoTask{1}; Task *t2 = new DemoTask{2}; Task *t3 = new DemoTask{3}; Task *t4 = new DemoTask{4}; Task *t5 = new DemoTask{5}; sched.add(*t0, 7.313); sched.add(*t1, 2.213); sched.add(*t2, 0.713); sched.add(*t3, 1.243); sched.add(*t4, 0.913); sched.add(*t5, 3.313); std::this_thread::sleep_for(std::chrono::seconds(10)); }
这意味着您想要使用某种顺序连续运行所有任务。
你可以创建一些类型的任务的延迟堆栈(甚至链表)。 当新任务即将到来时,您应该根据延迟时间将其插入到位置(只需高效地计算该位置并高效地插入新任务)。
运行从任务堆栈(或列表)头部开始的所有任务。
C ++ 11的核心代码:
#include <thread> #include <queue> #include <chrono> #include <mutex> #include <atomic> using namespace std::chrono; using namespace std; class Task { public: virtual void run() = 0; }; template<typename T, typename = enable_if<std::is_base_of<Task, T>::value>> class SchedulerItem { public: T task; time_point<steady_clock> startTime; int delay; SchedulerItem(T t, time_point<steady_clock> s, int d) : task(t), startTime(s), delay(d){} }; template<typename T, typename = enable_if<std::is_base_of<Task, T>::value>> class Scheduler { public: queue<SchedulerItem<T>> pool; mutex mtx; atomic<bool> running; Scheduler() : running(false){} void add(T task, double delayMsToRun) { lock_guard<mutex> lock(mtx); pool.push(SchedulerItem<T>(task, high_resolution_clock::now(), delayMsToRun)); if (running == false) runNext(); } void runNext(void) { running = true; auto th = [this]() { mtx.lock(); auto item = pool.front(); pool.pop(); mtx.unlock(); auto remaining = (item.startTime + milliseconds(item.delay)) - high_resolution_clock::now(); if(remaining.count() > 0) this_thread::sleep_for(remaining); item.task.run(); if(pool.size() > 0) runNext(); else running = false; }; thread t(th); t.detach(); } };
测试代码:
class MyTask : Task { public: virtual void run() override { printf("mytask \n"); }; }; int main() { Scheduler<MyTask> s; s.add(MyTask(), 0); s.add(MyTask(), 2000); s.add(MyTask(), 2500); s.add(MyTask(), 6000); std::this_thread::sleep_for(std::chrono::seconds(10)); }