Articles of 多处理

如何编程以便不同的进程在不同的CPU核上运行?

我正在写2个进程的Linux C程序。 我将在不同的机器上运行程序。 这些机器可能有多个CPU核心。 当我运行程序时,系统会为不同的进程分配不同的CPU核心吗? 或者我需要编写一些代码,以充分利用CPU核心?

locking内存分配争用 – multithreading与多进程

我们已经开发了一个大型C ++应用程序,可以在大型Linux和Solaris机箱(多达160个CPU内核甚至更多)上的多个站点上运行得令人满意。 这是一个严重的multithreading(1000多个线程),单进程体系结构,消耗大量的内存(200 GB +)。 我们是LD_PRELO在谷歌Perftool的tcmalloc(或Solaris上的libumem / mtmalloc),以避免内存分配性能瓶颈,总体上好的结果。 然而,我们已经开始看到在一些更大的安装中,在内存分配/重新分配期间锁争用的不利影响,特别是在进程已经运行一段时间(暗示分配器的老化/分段影响)之后。 我们正在考虑改用多进程/共享内存架构(在共享内存中不会发生严重的分配/释放,而是在常规的堆上)。 所以,最后,我们的问题是:我们能否假设现代Linux内核的虚拟内存pipe理器能够有效地将内存分发到数百个并发进程? 或者我们不得不期望在单进程/multithreading环境中遇到与内存分配争用相同的问题? 我倾向于希望获得更好的整体系统性能,因为我们不再局限于单个地址空间,并且拥有多个独立的地址空间将会减less对虚拟内存pipe理器的locking。 任何人都有比较multithreading和多进程内存分配的实际经验或性能数据?

自定义对象是由Python的多处理进程共享的吗?

我有一个初始化方法,它初始化各种原始和复杂的数据types和对象。 在由multiprocessing.Process产生的每个进程中,我从init()方法和初始化对象的地址打印一个variables。 我得到了variables的不同实例,但对象的地址保持不变。 所以,想知道多处理过程中父类的成员究竟发生了什么。进程调用? def __init__(self): self.count = 0 self.db = pymongo.MongoClient() def consumerManager(self): for i in range(4): p = multiprocessing.Process(target = self.consumer, args = (i,)) def consumer(self, i): while(1): time.sleep(i) self.count += 1 print self.count print os.getpid() print id(self.db) 如果它正在做对象的深层副本,那么id(self.db)应该在每个进程中打印一个不同的id,这不会发生。 这是如何完成的?

发送TERM信号给在父进程的另一个线程中产生的subprocess

我在Linux平台上使用Perl。 首先我创build了一个线程,并在这个新线程中派生了一个subprocess。 当新线程中的父进程返回并join到主线程时,我想将TERM信号发送到创build线程中产生的subprocess,但信号处理程序不起作用,subprocess变成僵尸。 这是我的代码: use strict; use warnings; use Thread 'async'; use POSIX; my $thrd = async { my $pid = fork(); if ($pid == 0) { $SIG{TERM} = \&child_exit; `echo $$ > 1`; for (1..5) { print "in child process: cycle $_\n"; sleep 2; } exit(0); } else { $SIG{CHLD} = \&reaper; } }; […]

在C中同步进程和信号量和信号

我必须在Linux上用C编写程序。 它必须有3个进程 – 首先从STDIN中读取,通过FIFO发送消息到第二个进程,计算接收到的消息的长度,并将结果发送到第三个进程(也是通过FIFO),将其显示在STDOUT上。 我必须使用信号量来同步它。 另外,我还要添加信号处理(我正在使用共享内存) – 一个信号结束程序,第二个停止它,第三个恢复。 信号可以发送到任何进程。 我已经有一些代码,但它不能正常工作。 第一个问题是同步 – 正如你通过运行它可以看到的,第一个消息被第二个进程接收,但是然后它被卡住了。 第一个和第二个过程显示他们的信息,但不是第三个。 有非常相似的,所以很混乱。 我必须发送另一条消息,然后P3显示前一个的长度。 第二个问题是信号 – 发送一个后,我必须按回车键(SIGUSR)或发送信息(SIGINT)才能送达。 任何想法有什么不对? 我之前发布的内容有一些改进,但仍然不能正常工作,我没有太多的时间来完成(直到星期一)。 我知道这是很多代码,但如果有人能分析第二和第三个进程的通信,我将非常感激。 #include <stdio.h> #include <stdlib.h> #include <fcntl.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/ipc.h> #include <unistd.h> #include <sys/sem.h> #include <signal.h> #include <sys/shm.h> #include <sys/wait.h> #include <string.h> #include <errno.h> #define WRITE 1 #define READ 0 #define […]

实现与gevent兼容的工作进程池的最佳方式是什么?

脚本 我有一个在Python和gevent中实现的服务器进程,它通过TCP / IP连接到后端服务,并为许多Web客户端(每个进程约1000个)提供服务,并根据后端服务提供的更改快速更改dynamic内容。 该服务在Ubuntu 10.04上运行,不会在Windows上运行,因此目标平台已经修复。 目前我们正在使用Python 2.6.6。 问题 将更改提供给多个客户端可能会导致处理后端发送的更改滞后,因此我的计划是将服务器分成多个进程。 许多工作进程将为Web客户端服务,而主进程仍将连接到后端服务。 我已经使用一个单独的greenlet池来为Web客户端服务,但是他们需要被放入工作进程。 题 您可以请我指出一个gevent的工作进程池实现,或者找出如何以正确的方式使用Python自己的多处理模块? 限制 我想避免在我们的进程中引入Python线程,因为这会给GIL争用留出空间,从而通过引入延迟来降低性能。 所以如果可能的话,这将是一个干净的多处理+ gevent解决scheme。

我如何循环pipe道中的数据?

我发现一些Perl中的进程可以通过pipe道进行通信的代码。 例: if ($pid = fork) { close $reader; print $writer "Parent Pid $$ is sending this\n"; close $writer; waitpid($pid,0); } else { close $writer; chomp($line = <$reader>); print "Child Pid $$ just read this: `$line'\n"; close $reader; exit; } 现在我有以下问题: 是否有可能让读者从pipe道中读取,然后阻塞,直到新的数据从pipe道中像循环一样来? 如果是,当父进程没有数据发送时,杀死subprocess的方式是什么? 每个程序有多less个打开的读/写pipe道是有限制的? 例如,如果我叉十进程,并有20个pipe道(10读/ 10写)这是一个坏主意? 如果问题太基本,我很抱歉,但我的经验是用另一种语言的线程。

如何使用subprocess停止由单个脚本产生的所有subprocess

我需要在我的包的一部分上运行一个unit testing,对我来说最好的办法是控制启动它,然后杀死multiprocessing模块产生的所有进程。 以下是我正在谈论的内容: test.py import logging import multiprocessing import time import random log = logging.getLogger(__name__) def start_consumers(conf, worker_count=5): manager = WorkerManager(conf, worker_count) manager.start() class WorkerManager(): def __init__(self, conf, worker_count=5): self.workers = [] for num in range(worker_count): self.workers.append(WorkHandler(conf)) def start(self): for worker in self.workers: worker.daemon = True worker.start() print 'started' for worker in self.workers: worker.join() class […]

如何在Linux上超时运行一个正在运行的函数以及它的subprocess?

如何强制一个函数和所有的subprocess在Linux上超时? 例如,如何在10秒后强制multiprocessed_func完成: import time def multiprocessed_func(seconds): # Assume this a long running function which uses # multiprocessing internally and returns None. time.sleep(seconds) try: multiprocessed_func(600) except: print('took too long')

使用python在linux中合​​并文件时文件大小的巨大减less

我写了一个脚本,其中包含一个文件的文件夹,并使用Python的多处理池库将它们组合成最大大小为500MB的文件。 脚本获取文件夹中的文件列表,并将其分成16个列表,每个列表映射到一个进程。 在每个进程中,组合的临时文件由每个列表中的一组文件组成。 在获得所有这16个文件后,我将这16个文件按顺序合并,并删除临时文件。 我在ext4文件系统的CentOS系统上运行这个,并且我传递了一个大小为930 MB的文件夹,其中186147个文件分布在50个子文件夹中,它给了我一个单一的文件作为输出,大小为346 MB。 我很困惑如何减less文件大小如此之多。 请注意,这些186147文件中的每一个在开始时都有一个额外的头文件,在最终的组合文件创build过程中被忽略,但只有头文件只有233个字节。 为了检查我的脚本是否正确,我检查了合并文件(3083015)中的总行数,它匹配186147个文件(3269162)中的行数总和(186147)。 我也试图猫单个文件和行看起来是完整的,但我没有通过整个文件。 有什么我在这里失踪? 这是我使用的并行函数: curr_write_file_name = os.path.join(output_folder, str(list_index) + '_' + "00000.flows") curr_write_file = open(curr_write_file_name, 'w') curr_write_file.write(header) curr_write_count = 1 for curr_file in file_list: print('Processing', curr_file) netflow_read = open(curr_file, 'r') for index, line in enumerate(netflow_read): if index == 0: continue else: curr_write_file.write(line) if os.stat(curr_file).st_size >= 500000000: […]