multiprocessing.pool.imap是否有一个允许多个参数的变体(如starmap)?

我正在做大量字节集合的计算。 该进程以字节块运行。 我正在尝试使用多处理并行处理来提高性能。 最初我尝试使用pool.map,但只允许一个参数,然后我发现有关pool.starmap。 但是pool.starmap只有在所有进程完成后才会给出结果。 我想要结果,因为他们来了(有点)。 我正在尝试使用pool.imap提供的结果作为进程完成,但不允许多个参数(我的函数需要2个参数)。 而且,结果的顺序很重要。

以下是一些示例代码:

pool = mp.Pool(processes=4) y = [] for x in pool.starmap(f, zip(da, repeat(db))): y.append(x) 

上面的代码可以工作,但是只有在所有的过程完成后才给出结果。 我看不到任何进展。 这就是为什么我试图使用pool.imap,效果很好,但只有一个参数:

 pool = mp.Pool(processes=4) y = [] for x in pool.imap(f, da)): y.append(x) 

在多个参数引起以下exception:

 TypeError: f() missing 1 required positional argument: 'd' 

寻找简单的方法来实现所有3个要求:

  1. 并行处理使用多个参数/参数
  2. 在stream程运行的同时看到进展
  3. 有序的结果。

谢谢!

我可以很快回答前两个问题。 我认为在理解前两句之后你应该能够处理第三个问题。

1.具有多个参数的并行处理

我不确定整个“星图”是否相同,但是这里有一个选择。 我过去所做的就是将我的论据压缩成一个单一的数据对象,如列表。 例如,如果要将三个参数传递给map_function ,可以将这些参数附加到列表中,然后使用.map().imap()函数使用该列表。

 def map_function(combo): a = combo[0] b = combo[1] c = combo[2] return a + b + c if '__name__' == '__main__': combo = [] combo[0] = arg_1 combo[1] = arg_2 combo[2] = arg_3 pool = Pool(processes=4) pool.map(map_function, combo) 

2.跟踪进度

一个好的方法是使用multiprocessing的共享值。 我实际上在一个月前问过这个(几乎)相同的确切问题 。 这使您可以从由map函数创建的不同进程中操作相同的变量。 为了学习,我会让你读一读自己的共享状态解决方案。 如果经过几次尝试仍然有困难,我会非常乐意帮助你,但是我相信自己教导你如何理解一些东西比我给你的答案更有价值。

希望这可以帮助!!

我认为这个解决方案完全符合你的3个要求: https : //stackoverflow.com/a/28382913/2379433

简而言之, p = Pool(); p.imap p = Pool(); p.imap将使您看到进展和维持秩序。 如果你想使用多个参数的map函数,你可以使用一个multiprocessing的分支,它提供了更好的序列化和多个参数。 看到一个例子的链接。