低级select.poll()从subprocess读取

我使用selectos模块中的底层POSIX工具从连接到正在运行的shell进程的pipe道读取数据。 为避免无限制地阻塞,我使用fcntl模块将pipe道进程的stdout文件描述符设置为非阻塞模式,然后使用select.poll轮询文件描述符,直到数据可供读取。 一旦数据可用,我使用os.read()从pipe道读取一些数据,然后继续循环,直到os.read()返回一个空的bytes对象或发生一些错误。

我有它的工作,除了一些原因,我最终从pipe道读取的数据被截断。 我读了pipe道进程的预期输出的一半,然后os.read()返回一个空bytes对象。 我不知道为什么我失去了其余的数据。

基本上,我有一个run_poll_once()函数,对poll对象的poll()方法进行一次调用。 如果我们应该继续轮询更多的数据,函数返回True ,如果我们应该停止,则返回False 。 该function如下(为了清晰和相关性,删除和编辑错误检查):

 def run_poll_once(poll): events = poll.poll(0.10) for fd, event in events: if event & select.POLLERR: return False if (event & select.POLLIN) or (event & select.POLLHUP): data = os.read(fd, READ_SIZE) print("Read:", data) if len(data) == 0: return False # ... do stuff with data return True 

然后我调用这个函数就像:

 with subprocess.Popen( ["ls", "-lh"], stdin = None, stdout = subprocess.PIPE, bufsize = 0 ) as proc: # --- snip setting proc.stdout.fileno() to non-blocking mode poll = select.poll() event_mask = select.POLLIN | select.POLLERR | select.POLLHUP poll.register(proc.stdout.fileno(), event_mask) while run_poll_once(poll): pass 

因此,这使我得到了pipe道进程( ls -lh )的预期输出的一半,然后os.read()过早地返回一个空bytes对象。 那么我在这里做错了什么?

好的,回答我自己的问题。

所以,正如评论中提到的,我之前发布了一个答案,然后将其删除。 我删除的答案是:

我想通了:显然proc.stdout流对象是自动做自己的内部缓冲,尽管传递给proc.stdoutbufsize = 0参数。 流对象似乎自动缓冲可用​​于读取幕后管道的stdout文件描述符的数据。

所以基本上,我不能使用os.read直接从底层描述符中读取,因为proc.stdout BufferedReader通过读取底层描述符自动进行缓冲。 为了得到这个工作,我可以直接调用proc.stdout.read(READ_SIZE)而不是os.read(fd, READ_SIZE) ,在poll()指示有数据要读取之后。 这按预期工作。

我删除它,因为最终我意识到这个解决方案也不是很正确。 问题在于,即使它可能大部分时间工作,也没有真正的保证这将工作,因为调用poll()将只返回POLLIN事件,当一个实际的低级操作系统中断发生指示数据是可以在内核缓冲区中读取。 但是调用proc.stdout.read()并不是直接从内核缓冲区中读取的,而是从一些内部的Python缓冲区读取的。 所以POLLIN事件和我们实际阅读的决定之间是不一致的。 事实上,它们是完全不相关的 – 所以不能保证我们的轮询工作正常,因此不保证对proc.stdout.read()的调用不会阻塞或丢失字节。

但是如果我们使用os.read() ,则不能保证我们对os.read()调用将始终能够直接从内核缓冲区中读取所有字节,因为Python BufferedReader对象基本上是“与我们对抗”它是自己的缓冲。 我们都在同一个底层内核缓冲区中争斗,并且在我们能够通过调用os.read()来提取这些字节之前,Python BufferedReader有时可能会提取字节进行自己的缓冲。 特别是,我观察到,如果子进程意外退出或中止,Python BufferedReader将立即从内核读取缓冲区中消耗所有剩余的字节( 即使将bufsize设置为0),这也是我失去部分输出的原因ls -lh

对于任何人在重现这个问题上有困难时,确保你使用的子进程输出大量的数据,至少在15K左右。

那么,解决方案是什么?

解决方案1:

我意识到,试图通过使用我自己的底层系统调用来绕过Python缓冲来尝试与Python自己的缓冲设施作斗争是一个不起眼的尝试。 所以,使用subprocess模块基本上是。 我通过os模块直接使用低级别的操作系统来重新实现这个功能。 基本上,我做了C中经常做的事情:使用os.pipe()调用,然后使用os.dup()创建管道文件描述符,然后使用os.dup()将管道的读取端指向子进程的sys.stdout.fileno()描述符。 最后,调用子进程中的一个os.exec函数来开始执行实际的子进程。

除了这个不是100%正确的。 这几乎是所有的时间,除非你碰巧创建一个开始向sys.stdout.fileno()输出大量字节的子进程。 在这种情况下,你遇到了OS管道缓冲区的问题,它有一些限制(我认为它在Linux上是65K)。 一旦操作系统管道缓冲区填满,进程可能会挂起,因为子进程用于执行I / O的任何库可能正在做它自己的缓冲。

就我而言,子进程使用C ++ <ostream>设施来完成I / O。 这也是自己的缓冲,所以在管道缓冲区填满的时候,子进程就会挂起。 我从来没有弄清楚究竟是什么原因。 据推测,如果管道缓冲区已满,应该挂起 – 但我会想,如果父进程(我控制)在管道的读取端调用os.read() ,子进程可以恢复输出。 我怀疑这是孩子进程自己缓冲的另一个问题。 C / C ++标准库输出函数(如printf中的printf或C ++中的std::cout )不会直接写入stdout ,而是执行自己的内部缓冲。 我怀疑发生了什么事是管道缓冲区填满了,所以一些调用printfstd::cout简单地挂起后,无法完全刷新缓冲区。

所以这带给我…

解决方案2:

所以事实证明,使用管道来做到这一点真的是从根本上打破。 似乎没有人在数以千计的教程中这样说过,所以也许我错了,但我声称使用管道与子进程通信是一个根本性的破坏方法。 在不同层次上进行的各种缓冲都有可能出错的事情太多了。 如果你完全控制了子进程,你可以直接使用(在Python中)类似os.write(1, mybuffer)直接写入stdout ,但大多数情况下你不能控制子进程,大多数程序将不会直接写入stdout ,而是会使用一些标准的I / O设备,这些设备有自己的缓冲方式。

所以,忘记管道。 真正的做法是使用伪终端 。 这可能不是可移植的,但它应该适用于大多数符合POSIX的平台。 伪终端基本上是一个类似于管道的I / O对象,其行为与标准控制台输出描述符stdoutstderr相似。 重要的是,对于伪终端,低级别的iocontrol系统调用isatty返回true ,因此标准I / O设施(如C语言中的stdio.h )将把线路视为线路缓冲控制台。

在Python中,您可以使用pty模块创建一个伪终端。 要创建一个子进程,然后把它的stdout挂接到父进程中的一个伪终端,你可以这样做:

 out_master, out_slave = pty.openpty() os.set_inheritable(out_master, True) os.set_inheritable(out_slave, True) pid = os.fork() if pid == 0: # child process try: assert(os.isatty(out_slave)) os.dup2(out_slave, sys.stdout.fileno()) os.close(out_master) os.execlp(name_of_child_process, shell_command_to_execute_child_process) except Exception as e: os._exit(os.EX_OSERR) else: # parent process os.close(out_slave) 

现在,您可以从out_master读取以获取从子进程写入到stdout ,而且由于您使用的是伪终端,因此子进程的行为与输出到控制台的行为完全一样 – 因此它可以很好地工作没有缓冲问题。 当然,你也可以用stderr做同样的事情。

令人惊讶的是,这个解决方案很简单,但是我必须自己去发现它,因为互联网上几乎所有关于与子进程通信的教程或指南都会坚持使用管道,这似乎是一个根本性的破坏方法。