在python中的多处理,以加快function

我很困惑Python多处理

我试图加快从数据库处理string的function,但是我一定误解了多处理的工作原理,因为与“正常处理”相比,这个函数在给一个工作者池时需要更长的时间。

这里是我试图实现的一个例子。

from time import clock, time from multiprocessing import Pool, freeze_support from random import choice def foo(x): TupWerteMany = [] for i in range(0,len(x)): TupWerte = [] s = list(x[i][3]) NewValue = choice(s)+choice(s)+choice(s)+choice(s) TupWerte.append(NewValue) TupWerte = tuple(TupWerte) TupWerteMany.append(TupWerte) return TupWerteMany if __name__ == '__main__': start_time = time() List = [(u'1', u'aa', u'Jacob', u'Emily'), (u'2', u'bb', u'Ethan', u'Kayla')] List1 = List*1000000 # METHOD 1 : NORMAL (takes 20 seconds) x2 = foo(List1) print x2[1:3] # METHOD 2 : APPLY_ASYNC (takes 28 seconds) # pool = Pool(4) # Werte = pool.apply_async(foo, args=(List1,)) # x2 = Werte.get() # print '--------' # print x2[1:3] # print '--------' # METHOD 3: MAP (!! DOES NOT WORK !!) # pool = Pool(4) # Werte = pool.map(foo, args=(List1,)) # x2 = Werte.get() # print '--------' # print x2[1:3] # print '--------' print 'Time Elaspse: ', time() - start_time 

我的问题:

  1. 为什么apply_async比“正常的方式”花费的时间更长?
  2. 我在做什么地图错了?
  3. 通过多处理加速这样的任务是否有意义?
  4. 最后:毕竟我在这里阅读,我想知道python中的多处理在Windows上工作吗?

所以你的第一个问题是foo(x)没有实际的并行性,你将整个列表传递给函数一次。

1)进程池的想法是有许多进程在一些数据的不同位上进行计算。

  # METHOD 2 : APPLY_ASYNC jobs = 4 size = len(List1) pool = Pool(4) results = [] # split the list into 4 equally sized chunks and submit those to the pool heads = range(size/jobs, size, size/jobs) + [size] tails = range(0,size,size/jobs) for tail,head in zip(tails, heads): werte = pool.apply_async(foo, args=(List1[tail:head],)) results.append(werte) pool.close() pool.join() # wait for the pool to be done for result in results: werte = result.get() # get the return value from the sub jobs 

如果处理每个块需要的时间大于启动过程所花的时间,那么这只会给你一个实际的加速,在四个进程和四个工作要完成的情况下,当然这些动态变化,已经有4个流程和100个工作要做。 请记住,您正在创建一个全新的python解释器四次,这不是免费的。

2)你在map中遇到的问题是它在单独的进程中将foo应用于List1中的EVERY元素,这将花费相当长的一段时间。 所以如果你的pool有4个进程, map会弹出列表中的一个项目,并把它发送到一个进程来处理 – 等待进程结束 – 弹出一些列表的更多东西 – 等待进程完成。 这只有在处理单个项目需要很长时间时才有意义,例如,如果每个项目都是指向一个千兆字节文本文件的文件名称。 但是,因为它的地图将只需要一个列表中的单个字符串,并将其传递给fooapply_async将获取列表的一部分。 尝试下面的代码

 def foo(thing): print thing map(foo, ['a','b','c','d']) 

这是内置的Python映射,将运行一个进程,但这个想法对于多进程版本是完全一样的。

根据JFSebastian的评论添加:然而,您可以使用chunksize参数来map以指定每个块的近似大小。

 pool.map(foo, List1, chunksize=size/jobs) 

我不知道,如果在Windows上的map有问题,因为我没有一个可用于测试。

3)是的,因为你的问题已经足够大,可以证明你已经找到了新的python解释器

4)不能给你一个明确的答案,因为它取决于核心/处理器的数量等,但一般来说,它应该在Windows上罚款。

问题(2)在Dougal和Matti的指导下,我找出了什么地方出了问题。 原来的foo函数处理列表清单,而map需要一个函数来处理单个元素。

新的功能应该是

 def foo2 (x): TupWerte = [] s = list(x[3]) NewValue = choice(s)+choice(s)+choice(s)+choice(s) TupWerte.append(NewValue) TupWerte = tuple(TupWerte) return TupWerte 

和块来调用它:

 jobs = 4 size = len(List1) pool = Pool() #Werte = pool.map(foo2, List1, chunksize=size/jobs) Werte = pool.map(foo2, List1) pool.close() print Werte[1:3] 

感谢所有帮助我理解这一点的人。

所有方法的结果:对于列表* 2 Mio记录:正常13.3秒,与异步平行:7.5秒,与chuncksize:7.3并行,没有chunksize 5.2秒

如果你有兴趣,这是一个通用的多处理模板。

 import multiprocessing as mp import time def worker(x): time.sleep(0.2) print "x= %s, x squared = %s" % (x, x*x) return x*x def apply_async(): pool = mp.Pool() for i in range(100): pool.apply_async(worker, args = (i, )) pool.close() pool.join() if __name__ == '__main__': apply_async() 

输出如下所示:

 x= 0, x squared = 0 x= 1, x squared = 1 x= 2, x squared = 4 x= 3, x squared = 9 x= 4, x squared = 16 x= 6, x squared = 36 x= 5, x squared = 25 x= 7, x squared = 49 x= 8, x squared = 64 x= 10, x squared = 100 x= 11, x squared = 121 x= 9, x squared = 81 x= 12, x squared = 144 

正如你所看到的,这些数字并不是按顺序排列的,因为它们是异步执行的。