我对multiprocessing
模块很multiprocessing
。 我只是试图创build以下内容:我有一个进程是从RabbitMQ获取消息并将其传递给内部队列( multiprocessing.Queue
)。 然后,我想要做的是:当新消息进来时产生一个进程。它可以工作,但是在作业完成后,它会留下一个没有被父节点终止的僵尸进程。 这是我的代码:
主要stream程:
#!/usr/bin/env python import multiprocessing import logging import consumer import producer import worker import time import base conf = base.get_settings() logger = base.logger(identity='launcher') request_order_q = multiprocessing.Queue() result_order_q = multiprocessing.Queue() request_status_q = multiprocessing.Queue() result_status_q = multiprocessing.Queue() CONSUMER_KEYS = [{'queue':'product.order', 'routing_key':'product.order', 'internal_q':request_order_q}] # {'queue':'product.status', # 'routing_key':'product.status', # 'internal_q':request_status_q}] def main(): # Launch consumers for key in CONSUMER_KEYS: cons = consumer.RabbitConsumer(rabbit_q=key['queue'], routing_key=key['routing_key'], internal_q=key['internal_q']) cons.start() # Check reques_order_q if not empty spaw a process and process message while True: time.sleep(0.5) if not request_order_q.empty(): handler = worker.Worker(request_order_q.get()) logger.info('Launching Worker') handler.start() if __name__ == "__main__": main()
这是我的工作者:
import multiprocessing import sys import time import base conf = base.get_settings() logger = base.logger(identity='worker') class Worker(multiprocessing.Process): def __init__(self, msg): super(Worker, self).__init__() self.msg = msg self.daemon = True def run(self): logger.info('%s' % self.msg) time.sleep(10) sys.exit(1)
所以在处理所有的消息之后,我可以用ps aux
命令看到进程。 但是我真的很希望他们一旦完成就终止。 谢谢。
几件事情:
确保父母joins
其子女,以避免僵尸。 请参阅Python多处理杀死进程
你可以用is_alive()
成员函数检查一个小孩是否还在运行。 请参阅http://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process
使用multiprocessing.active_children
比Process.join
更好。 函数active_children
清除自从上次调用active_children
创建的所有僵尸。 join
方法等待选定的进程。 在此期间,其他进程可以终止并成为僵尸,但是父进程不会注意到,直到等待的方法被加入。 要看到这个行动:
import multiprocessing as mp import time def main(): n = 3 c = list() for i in xrange(n): d = dict(i=i) p = mp.Process(target=count, kwargs=d) p.start() c.append(p) for p in reversed(c): p.join() print('joined') def count(i): print('{i} going to sleep'.format(i=i)) time.sleep(i * 10) print('{i} woke up'.format(i=i)) if __name__ == '__main__': main()
以上将创建3个进程,每个进程终止10秒。 正如代码所示,最后一个进程首先被连接,所以之前终止的另外两个将是僵尸20秒。 你可以看到他们:
ps aux | grep Z
如果进程按照它们将要终止的顺序等待,将不会有僵尸。 删除reversed
看到这种情况。 但是,在实际的应用中,我们很少知道孩子会终止的顺序,所以使用join
会导致一些僵尸。
替代active_children
不会留下任何僵尸。 在上面的例子中,用for p in reversed(c):
替换for p in reversed(c):
的循环for p in reversed(c):
with:
while True: time.sleep(1) if not mp.active_children(): break
看看会发生什么
使用active_children。 multiprocessing.active_children