Python中快速的方法调用调度

对于我的项目的一部分,我需要一个进程本地调度系统,这将允许我延迟几秒钟的方法执行。 我有这个系统的成千上万的“客户”,所以使用threading.Timer每个延迟是一个坏主意,因为我会很快达到操作系统线程的限制。 我已经实现了一个只使用一个线程进行时序控制的系统。

主要思想是保持sorting的任务(时间+ func + args + kwargs)队列,并使用单线程。定时器调度/取消该队列头部的执行。 这个scheme很有效,但是我对表演不满意。 大约2000个客户端每10秒计划虚拟任务会导致进程花费40%的CPU时间。 看看分析器的输出,我发现所有的时间都花在新的threading.Timer 。定时器的构build,它的开始,特别是新线程的创build。

我相信还有更好的办法。 现在我想重写LightTimer以便有一个执行线程可以通过threading.Event和几个定时线程来控制事件。 例如:

  • 我安排一个任务在10秒内打电话。 该任务被添加到队列中。 定时线程#1在event.set()之前启动event.set() time.sleep(10) event.set()
  • 然后我安排一个任务,在11秒内打电话。 该任务被添加到队列中。 定时线程没有任何反应,唤醒后会注意到新的任务。
  • 然后我安排一个任务在5秒内打电话。 任务被预先添加到队列中。 定时线程#2启动time.sleep(5)因为#1已经睡了一段较长的时间间隔。

我希望你已经发现了这个想法。 你怎么看待这种方式? 有没有更好的办法? 也许我可以利用一些Linux系统function来制定最佳解决scheme?

您可以使用的替代实现是使用time.time()方法来计算每个排队函数应该执行的绝对时间。 将此时间和要调用的函数放在一个对象包装器中,该对象包装器使用执行时间来覆盖比较运算符以确定顺序。 然后使用heapq模块来维护最小堆。 这将为您提供一个高效的数据结构,其中堆的元素0始终是您的下一个事件。

实现实际调用的一种方法是使用单独的线程来执行回调。 堆将需要用互斥锁保护,您可以使用条件变量来实现计划。 在一个无限循环中,只需查找下一次执行函数(堆的元素0),并使用条件变量的wait()方法,将超时设置为下一个执行时间。 如果新插入的函数应该在堆中最早的函数之前发生,那么您的堆插入方法可以使用条件变量的notify()方法提前唤醒调度线程。

你看过Python标准库中的sched模块吗? 在一个专用的线程上运行调度器(并且让所有的调度动作“将一个绑定的方法和它的参数放在一个队列中”,从这个线程池中的线程剥离并执行它 – 正如我在线程的“果壳”一章中所写的那样,除了在那种情况下没有调度)应该做你想要的。

“几千客户”不太可能达到操作系统线程限制; 尽管所有这些线程的堆栈可能会消耗大量不必要的内存。

看看扭曲了什么,它允许一个过程,以证明在大量事件中工作得很好的方式来复用很多事件(包括定时器)。

你也可以结合事件驱动和多进程模型,每台机器运行多个进程,并在每个进程中执行事件驱动的逻辑 – 比如一个进程可以处理2000个客户,你仍然可以运行30个进程(假设有足够的整体资源),并获得更好的吞吐量,特别是在现代多核硬件上。