这是在Linux,Python 3.5.1上。
我正在用asyncio
开发一个监视器进程,在各个地方的任务await
asyncio.sleep
调用各种持续时间。
有些时候,我希望能够中断所有的asyncio.sleep
调用,并让所有任务正常进行,但我找不到如何做到这一点。 一个例子是正常closures监视器进程。
我认为我可以发送一个ALRM信号,但是这个过程会死亡。 我试图捕捉ALRM信号:
def sigalrm_sent(signum, frame): tse.logger.info("got SIGALRM") signal.signal(signal.SIGALRM, sigalrm_sent)
然后我得到有关捕获SIGALRM的日志行,但是asyncio.sleep
调用不会中断。
在这一点上,我将所有的asyncio.sleep
调用replace为这个协程的调用:
async def interruptible_sleep(seconds): while seconds > 0 and not tse.stop_requested: duration = min(seconds, tse.TIME_QUANTUM) await asyncio.sleep(duration) seconds -= duration
所以我只需要选一个不算小也不要太大的TIME_QUANTUM
。
有没有办法中断所有正在运行的asyncio.sleep
调用,我错过了?
中断asyncio.sleep
所有正在运行的调用似乎有点危险,因为它可以用于代码的其他部分,用于其他目的。 相反,我会做一个专门的sleep
协程,跟踪它的运行调用。 然后可以通过取消相应的任务来中断它们:
def make_sleep(): async def sleep(delay, result=None, *, loop=None): coro = asyncio.sleep(delay, result=result, loop=loop) task = asyncio.ensure_future(coro) sleep.tasks.add(task) try: return await task except asyncio.CancelledError: return result finally: sleep.tasks.remove(task) sleep.tasks = set() sleep.cancel_all = lambda: sum(task.cancel() for task in sleep.tasks) return sleep
例:
async def main(sleep, loop): for i in range(10): loop.create_task(sleep(i)) await sleep(3) nb_cancelled = sleep.cancel_all() await asyncio.wait(sleep.tasks) return nb_cancelled sleep = make_sleep() loop = asyncio.get_event_loop() result = loop.run_until_complete(main(sleep, loop)) print(result) # Print '6'
出于调试的目的, loop.time = lambda: float('inf')
也可以。
根据Vincent的回答,我使用了下面的类(每个类的实例都可以取消所有正在运行的.sleep
任务,从而实现更好的划分):
class Sleeper: "Group sleep calls allowing instant cancellation of all" def __init__(self, loop): self.loop = loop self.tasks = set() async def sleep(self, delay, result=None): coro = aio.sleep(delay, result=result, loop=self.loop) task = aio.ensure_future(coro) self.tasks.add(task) try: return await task except aio.CancelledError: return result finally: self.tasks.remove(task) def cancel_all_helper(self): "Cancel all pending sleep tasks" cancelled = set() for task in self.tasks: if task.cancel(): cancelled.add(task) return cancelled async def cancel_all(self): "Coroutine cancelling tasks" cancelled = self.cancel_all_helper() await aio.wait(self.tasks) self.tasks -= cancelled return len(cancelled)