python Pipes的同步/asynchronous行为

在我的应用程序中,我使用多处理模块中的pipe道在python进程之间进行通信。 最近我观察到一个奇怪的行为取决于我通过他们发送的数据的大小。 根据python文档,这些pipe道是基于连接的,并且应该以asynchronous的方式运行,但是有时他们在发送时会卡住。 如果我在每个连接中启用全双工,一切工作正常,即使我没有使用连接发送和收听。 任何人都可以解释此行为?

  1. 100浮点数,全双工禁用
    代码工作,利用asynchronous。
  2. 100个浮点,全双工启用
    该示例正常工作正常。
  3. 10000浮点数,全双工禁用
    尽pipe数据较小,但执行被永久封锁。
  4. 10000浮点,全双工启用
    再次罚款。

代码(这不是我的生产代码,它只是说明了我的意思):

from collections import deque from multiprocessing import Process, Pipe from numpy.random import randn from os import getpid PROC_NR = 4 DATA_POINTS = 100 # DATA_POINTS = 10000 def arg_passer(pipe_in, pipe_out, list_): my_pid = getpid() print "{}: Before send".format(my_pid) pipe_out.send(list_) print "{}: After send, before recv".format(my_pid) buf = pipe_in.recv() print "{}: After recv".format(my_pid) if __name__ == "__main__": pipes = [Pipe(False) for _ in range(PROC_NR)] # pipes = [Pipe(True) for _ in range(PROC_NR)] pipes_in = deque(p[0] for p in pipes) pipes_out = deque(p[1] for p in pipes) pipes_in.rotate(1) pipes_out.rotate(-1) data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)] processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo])) for foo in xrange(PROC_NR)] for proc in processes: proc.start() for proc in processes: proc.join() 

首先,值得注意的是multiprocessing.Pipe类的实现…

 def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' if duplex: s1, s2 = socket.socketpair() s1.setblocking(True) s2.setblocking(True) c1 = _multiprocessing.Connection(os.dup(s1.fileno())) c2 = _multiprocessing.Connection(os.dup(s2.fileno())) s1.close() s2.close() else: fd1, fd2 = os.pipe() c1 = _multiprocessing.Connection(fd1, writable=False) c2 = _multiprocessing.Connection(fd2, readable=False) return c1, c2 

不同之处在于半双工“Pipes”使用匿名管道 ,但全双工“Pipes”实际上使用Unix域套接字 ,因为匿名管道本质上是单向的。

在这种情况下,我不确定“异步”这个词是什么意思。 如果你的意思是“非阻塞I / O”,那么值得注意的是,这两个实现默认使用阻塞I / O。


其次,值得注意的是你试图发送的数据的大小

 >>> from numpy.random import randn >>> from cPickle import dumps >>> len(dumps(randn(100))) 2479 >>> len(dumps(randn(10000))) 237154 

第三,从pipe(7) manpage …

管道容量

管道的容量有限。 如果管道已满,则根据是否设置了O_NONBLOCK标志(见下文),写入(2)将会阻塞或失败。 不同的实现对管道容量有不同的限制。 应用程序不应该依赖于特定的容量:应该设计一个应用程序,以便读取过程在可用时立即消耗数据,这样写入过程不会被阻塞。

在2.6.11之前的Linux版本中,管道的容量与系统页面大小相同(例如i386上的4096字节)。 从Linux 2.6.11开始,管道容量是65536字节。


所以,实际上,在pipe_out.send()调用中,所有的子pipe_out.send()都被阻塞,并且没有一个可以从其他进程接收任何数据,因为你发送了所有237154字节的数据一击,填满了65536字节的缓冲区。

您可能会试图使用Unix域套接字版本,但是目前唯一的原因是它的缓冲区大小比管道大,如果增加DATA_POINTS的数量,您会发现解决方案也会失败到10万。

“quick n'dirty hack”解决方案是将数据分割成更小的块进行发送,但依赖于特定大小的缓冲区不是好习惯。

更好的解决方案是在pipe_out.send()调用上使用非阻塞I / O,尽管我对multiprocessing模块不熟悉,无法确定使用该模块实现它的最佳方式。

伪代码将沿着…的线

 while 1: if we have sent all data and received all data: break send as much data as we can without blocking receive as much data as we can without blocking if we didn't send or receive anything in this iteration: sleep for a bit so we don't waste CPU time continue 

…或者你可以使用Python select模块来避免长时间睡眠,但同样可以将它与multiprocessing.Pipe进行整合。 multiprocessing.Pipe可能会非常棘手。

multiprocessing.Queue类可能会为你做所有这些,但是我以前从来没有用过,所以你需要做一些实验。