使用来自多处理的Pool类,我将数据库search任务拆分为并行进程,每个并行进程都针对一个非常大的数据库运行一组正则expression式,这些数据库已加载到内存中。 该程序运行在一个不错的强大的Windows服务器上,具有60多个内核和大量内存。
我的Python编程经验,特别是多处理,是相当水平的。
当我第一次创build这个程序的时候,一切正常,每个工作人员都很好地处理了这个程序,然后继续下一个。 直到我不得不对数据库查询进行一些格式化修改之前,我没有碰它几个月,但是当我再次启动它时,它运行得太慢了。 在testing中,我确定了我产生的进程数量实际上并没有改变运行速度,而且确实看着任务pipe理器显示所有进程在那里变冷,但其中只有一个实际上显示出任何工作迹象。
def calc(ruleList,record): returnList = [] print(record[5],end = '\r') hits = recordIterator(ruleList,record) for h in hits: returnList.append([record[0],record[1],h]) return returnList nthreads = 48 hname = 'Hits.txt' p = multiprocessing.Pool(processes = nthreads) Hits = [] for record in Records: Hits.append((p.apply_async(calc, (rules, record))).get()) hhandle = open(hname, "w") for hit in Hits: try: for x in hit: hhandle.write(str(x[0])+'|'+str(x[1])+'|'+str(x[2])+'\n') except (UnicodeEncodeError,UnicodeDecodeError): pass hhandle.close()
我不是计算机上的pipe理员,而且我不熟悉如何configuration服务器,但在我看来,Windows根本就没有安排subprocess来分离核心。 我曾尝试以多种不同的方式重新configuration我的代码,以避免潜在的多处理堵塞,但每个function差异最终都会遇到同样的问题。
在我的代码中有没有什么东西是我错过了这个进程? 是否有一些Windows服务器设置可能已被更改为取消我的工作人员使用独立内核的资格?
它看起来像我的代码(p.apply_async(calc, (rules, record))).get()
迫使你的程序一次只运行一个作业。 父进程将在get()
等待上一个作业的结果在启动下一个作业之前变为可用。
尝试用一次调用starmap
替换Records
的循环和多个apply_async
调用:
Hits = p.starmap(calc, ((rules, record) for record in Records))
这将记录传递到池中,并且只有在它们全部被发送后才阻止结果进入。
要充实@ blckknght的答案: apply_async()
提交一个作业,但.get()
立即要求结果。 一个更简单的解决方案是提交所有的工作,然后按照他们进来的每个结果,不管顺序如何。 也就是说,使用imap_unordered()
import multiprocessing def calc(num): return num*2 pool = multiprocessing.Pool(5) for output in pool.imap_unordered(calc, [1,2,3]): print 'output:',output
output: 2 output: 4 output: 6