使用Python的select模块检查是否有更多数据要从文件描述符中读取

我有一个程序在一个线程中创build一个subprocess,以便线程可以不断检查特定的输出条件(从标准输出或标准错误),并调用适当的callback,而其余的程序继续。 这是这个代码的简化版本:

import select import subprocess import threading def run_task(): command = ['python', 'a-script-that-outputs-lines.py'] proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE) while True: ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1) if proc.stdout in ready: next_line_to_process = proc.stdout.readline() # process the output if proc.stderr in ready: next_line_to_process = proc.stderr.readline() # process the output if not ready and proc.poll() is not None: break thread = threading.Thread(target = run_task) thread.run() 

它工作得相当好,但我希望线程退出,一旦满足两个条件:正在运行的subprocess已完成,stdout和stderr中的所有数据已被处理。

我的困难是,如果我的最后一个条件是上面的( if not ready and proc.poll() is not None ),那么线程永远不会退出,因为一旦stdout和stderr的文件描述符被标记为准备就绪, (即使所有的数据已经从它们读取,并且read()会挂起或者readline()会返回一个空string)。

如果我改变这个条件, if proc.poll() is not None ,那么当程序退出的时候循环就会存在,我不能保证看到所有需要处理的数据。

这只是错误的方法,还是有办法可靠地确定什么时候你已经读取了所有将被写入文件描述符的数据? 或者这是一个特定的尝试从一个subprocess的stderr /标准输出读取的问题?

我一直在Python 2.5上运行(在OS X上运行),并且还尝试了Python 2.6(基于2.6内核的Debian上运行select.epoll()基于select.epoll()select.epoll()的变体。

select模块是合适的,如果你想知道你是否可以从一个管道读取没有阻塞。

if proc.poll() is not None: break和call rest = [pipe.read() for pipe in [p.stdout, p.stderr]]循环。

子进程在关闭之前不太可能关闭它的stdout / stderr,因此为了简单起见你可以跳过处理EOF的逻辑。


不要直接调用Thread.run() ,而是使用Thread.start() 。 你可能根本不需要单独的线程。

p.stdout.readline()之后不要调用p.stdout.readline() ,它可能会阻塞, os.read(p.stdout.fileno(), limit)使用os.read(p.stdout.fileno(), limit) 。 空字节串表示相应管道的EOF。


作为替代或补充,您可以使用fcntl模块使管道无阻塞:

 import os from fcntl import fcntl, F_GETFL, F_SETFL def make_nonblocking(fd): return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK) 

并在阅读时处理io / os错误。

正如我上面提到的,我最终的解决方案如下,以防止这对任何人都有帮助。 我认为这是正确的方法,因为我现在97.2%肯定你不能只用select() / poll()read()来做到这一点:

 import select import subprocess import threading def run_task(): command = ['python', 'a-script-that-outputs-lines.py'] proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE) while True: ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1) if proc.stdout in ready: next_line_to_process = proc.stdout.readline() if next_line_to_process: # process the output elif proc.returncode is not None: # The program has exited, and we have read everything written to stdout ready = filter(lambda x: x is not proc.stdout, ready) if proc.stderr in ready: next_line_to_process = proc.stderr.readline() if next_line_to_process: # process the output elif proc.returncode is not None: # The program has exited, and we have read everything written to stderr ready = filter(lambda x: x is not proc.stderr, ready) if proc.poll() is not None and not ready: break thread = threading.Thread(target = run_task) thread.run() 

你可以在管道的文件描述符上做一个原始的os.read(fd, size) ,而不是使用readline() 。 这是一个非阻塞操作,它也可以检测到EOF(在这种情况下,它返回一个空的字符串或字节对象)。 你必须实现分行和缓冲自己。 使用这样的东西:

 class NonblockingReader(): def __init__(self, pipe): self.fd = pipe.fileno() self.buffer = "" def readlines(self): data = os.read(self.fd, 2048) if not data: return None self.buffer += data if os.linesep in self.buffer: lines = self.buffer.split(os.linesep) self.buffer = lines[-1] return lines[:-1] else: return []