Articles of 多处理

Windows上的multiprocessing.Pool.apply_async

我正在尝试使用一个池来并行处理一些subprocess调用。 一切工作正常,如果我构build池的整个迭代,并使用imap , map , imap_unordered等,但我不能得到apply_async工作。 例如,这可以正常工作: from subprocess import check_call from multiprocessing import Pool def dispatch_call(file_name): return check_call(…) if __name__ == '__main__': files = (constructed file list) pool = Pool() pool.imap(dispatch_call, files) pool.close() pool.join() 但是,这并不是: from subprocess import check_call from multiprocessing import Pool def dispatch_call(file_name): return check_call(…) if __name__ == '__main__': files = (constructed […]

使用超线程运行模拟运行时

我使用python / numpy / cython编写的模拟。 由于我需要对许多模拟运行进行平均,我使用多处理模块来批量运行所有单独的模拟运行。 在办公室,我有一个HT的i7-920工作站。 在家里我有一个没有的i5-560。 我以为我可以在办公室的每一批中运行两次仿真实例,并将运行时间减半。 令人惊讶的是,每个单独实例的运行时间与在家用工作站上运行的时间相比翻了一番。 在家里并行运行3个模拟实例需要8分钟,而在办公室运行6个实例需要15分钟。 使用'cat / proc / cpuinfo'我validation了'siblings'= 8和'cpu cores'= 4,所以启用了HT。 我没有意识到任何“运行时总体守恒定律”(尽pipe从科学的angular度来看它可能非常有趣:)),而在这里跳来跳去的人可能会对这个难题有所了解。

为什么孩子没有死?

我期望terminate()方法杀死这两个进程: import multiprocessing import time def foo(): while True: time.sleep(1) def bar(): while True: time.sleep(1) if __name__ == '__main__': while True: p_foo = multiprocessing.Process(target=foo, name='foo') p_bar = multiprocessing.Process(target=bar, name='bar') p_foo.start() p_bar.start() time.sleep(1) p_foo.terminate() p_bar.terminate() print p_foo print p_bar 运行代码给出: <Process(foo, started)> <Process(bar, started)> <Process(foo, started)> <Process(bar, started)> … 我期待着: <Process(foo, stopped)> <Process(bar, stopped)> <Process(foo, stopped)> […]

使用Python 3.6.1在Linux / Intel Xeon上使用“fork”上下文块进行多处理?

问题描述 我稍微调整了这个答案的代码(见下文)。 但是,当在Linux上运行这个脚本(如命令行: python script_name.py )时,它将打印所有jobs running: x ,但之后似乎卡住了。 但是,当我使用spawn方法( mp.set_start_method('spawn') )它工作得很好,立即开始打印countervariables的值(请参阅listener方法)。 题 为什么它只在产卵过程中起作用? 我怎样才能调整代码,所以它与Forc的工作(因为它可能更快) 码 import io import csv import multiprocessing as mp NEWLINE = '\n' def file_searcher(file_path): parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t') manager = mp.Manager() q = manager.Queue() pool = mp.Pool(mp.cpu_count()) # put listener to work first watcher = pool.apply_async(listener, (q,)) jobs […]

Python线程与Linux中的多处理

基于这个问题,我认为创build新进程应该和在Linux中创build新线程 一样快 。 但是,一点testing显示非常不同的结果。 这是我的代码: from multiprocessing import Process, Pool from threading import Thread times = 1000 def inc(a): b = 1 return a + b def processes(): for i in xrange(times): p = Process(target=inc, args=(i, )) p.start() p.join() def threads(): for i in xrange(times): t = Thread(target=inc, args=(i, )) t.start() t.join() testing: >>> timeit […]

python Pipes的同步/asynchronous行为

在我的应用程序中,我使用多处理模块中的pipe道在python进程之间进行通信。 最近我观察到一个奇怪的行为取决于我通过他们发送的数据的大小。 根据python文档,这些pipe道是基于连接的,并且应该以asynchronous的方式运行,但是有时他们在发送时会卡住。 如果我在每个连接中启用全双工,一切工作正常,即使我没有使用连接发送和收听。 任何人都可以解释此行为? 100浮点数,全双工禁用 代码工作,利用asynchronous。 100个浮点,全双工启用 该示例正常工作正常。 10000浮点数,全双工禁用 尽pipe数据较小,但执行被永久封锁。 10000浮点,全双工启用 再次罚款。 代码(这不是我的生产代码,它只是说明了我的意思): from collections import deque from multiprocessing import Process, Pipe from numpy.random import randn from os import getpid PROC_NR = 4 DATA_POINTS = 100 # DATA_POINTS = 10000 def arg_passer(pipe_in, pipe_out, list_): my_pid = getpid() print "{}: Before send".format(my_pid) pipe_out.send(list_) print "{}: […]

性能 – multithreading或多进程应用程序

为了在Linux上开发一个高度networking密集的服务器应用程序,什么样的架构是首选? 这个想法是,这个应用程序通常运行在具有多个内核(虚拟或物理)的机器上。 考虑到性能是关键标准,最好去multithreading应用程序还是使用多进程devise的应用程序? 我知道,资源共享和同步从多个进程访问这些资源是很多编程开销,但如前所述,总体性能是关键要求,所以我们可以忽略这些东西。 编程语言是C / C ++。 我听说即使是multithreading应用程序(单进程)也可以利用多个内核,并且独立地在不同的内核上运行每个线程(只要没有同步问题)。 这个调度是由内核完成的。 如果是这样,multithreading应用程序和多进程应用程序在性能上没有太大的区别? Nginx使用多进程架构,并且非常快,但是multithreading应用程序可以获得相同的性能吗? 谢谢。

父进程如何通过调用_exit的subprocess获得终止状态

我已阅读以下声明。 给予_exit()的状态参数定义了进程的终止状态,当它调用wait()时,这个进程的父进程可以使用它。 一个进程总是被_exit()成功终止(即_exit()永远不会返回 )。 题 如果_ exit没有返回,父进程如何通过等待从subprocess中获得终止状态?

Python多处理内存使用情况

我写了一个程序,可以总结如下: def loadHugeData(): #load it return data def processHugeData(data, res_queue): for item in data: #process it res_queue.put(result) res_queue.put("END") def writeOutput(outFile, res_queue): with open(outFile, 'w') as f res=res_queue.get() while res!='END': f.write(res) res=res_queue.get() res_queue = multiprocessing.Queue() if __name__ == '__main__': data=loadHugeData() p = multiprocessing.Process(target=writeOutput, args=(outFile, res_queue)) p.start() processHugeData(data, res_queue) p.join() 真正的代码(特别是'writeOutput()')要复杂得多。 'writeOutput()'只使用这些值作为它的参数(意思是它没有引用ifata) 基本上它将一个巨大的数据集加载到内存中并对其进行处理。 输出的写入委托给一个subprocess(它实际上写入多个文件,这需要很多时间)。 所以每当一个数据项被处理时,它就会被发送到subprocess槽res_queue,然后根据需要将结果写入文件。 subprocess不需要以任何方式访问,读取或修改由'loadHugeData()'加载的数据。 subprocess只需要使用主进程通过“res_queue”发送的内容。 […]

在进程之间进行通信时,队列对pipe道有什么优势?

使用pipe道上的2个队列在进程之间进行通信的好处是什么(如果有的话)? 我打算使用multiprocessing python模块。