在Windows 10上使用pool.apply_async创build新线程的Python 3.6多处理在多次迭代之后停止工作

我最近开始使用pythons多处理库,并决定使用Pool()和apply_async()方法是最适合我的问题。 代码很长,但是对于这个问题,我压缩了所有与函数中的多处理无关的东西。

背景信息

基本上,我的程序应该采取一些数据结构,并将其发送到另一个程序来处理它,并将结果写入一个txt文件。 我有几千个这样的结构(N * M),并且有很大的块(M)是独立的,可以按任何顺序处理。 我创build了一个工作池来处理这些M结构,然后检索下一个块。 为了处理一个结构,必须为外部程序创build一个新的线程来运行。 处理期间在外部程序之外花费的时间less于20%,所以如果我检查任务pipe理器,我可以看到在进程下运行的外部程序。

实际的问题

这个效果很好,但是在许多处理过的结构(5000到20000之间的任何数目)之后,突然间,外部程序停止出现在任务pipe理器中,并且python孩子在个人最高性能(〜13%cpu)任何更多的结果。 我不明白这个问题可能是什么。 还有足够的内存,每个孩子只能使用90 Mb左右。 也很奇怪,它工作了一段时间,然后停下来。 如果我使用ctrl-c,几分钟后它会停止,所以它对用户input没有反应。

有一个想法是,当超时的外部程序线程被杀死(时不时发生),也许东西没有被正确closures,以便subprocess正在等待一些它找不到的东西? 如果是这样,是否有更好的办法处理超时的外部进程?

from multiprocessing import Pool, TimeoutError N = 500 # Number of chunks of data that can be multiprocessed M = 80 # Independed chunk of data timeout = 100 # Higher than any of the value for dataStructures.timeout if __name__ == "__main__": results = [None]*M savedData = [] with Pool(processes=4) as pool: for iteration in range(N): dataStructures = [generate_data_structure(i) for i in range(M)] #---Process data structures--- for iS, dataStructure in enumerate(dataStructures): results[iS] = pool.apply_async(processing_func,(dataStructure,)) #---Extract processed data--- for iR, result in enumerate(results): try: processedData = result.get(timeout=timeout) except TimeoutError: print("Got TimeoutError.") if processedData.someBool: savedData.append(processedData) 

这也是为外部程序创build新线程的函数。

 import subprocess as sp import win32api as wa import threading def processing_func(dataStructure): # Call another program that processes the data, and wait until it is finished/timed out timedOut = RunCmd(dataStructure.command).start_process(dataStructure.timeout) # Read the data from the other program, stored in a text file if not timedOut: processedData = extract_data_from_finished_thread() else: processedData = 0. return processedData class RunCmd(threading.Thread): CREATE_NO_WINDOW = 0x08000000 def __init__(self, cmd): threading.Thread.__init__(self) self.cmd = cmd self.p = None def run(self): self.p = sp.Popen(self.cmd, creationflags=self.CREATE_NO_WINDOW) self.p.wait() def start_process(self, timeout): self.start() self.join(timeout) timedOut = self.is_alive() # Kills the thread if timeout limit is reached if timedOut: wa.TerminateProcess(self.p._handle,-1) self.join() return timedOut