Python多处理使用共享variables的Pool.apply_async(值)

对于我的大学项目,我试图开发一个基于python的stream量生成器。我在vmware上创build了2个CentOS机器,我使用1作为我的客户机,1作为我的服务器机器。 我使用IP别名技术来增加客户端和服务器的数量,只使用一台客户机/服务器机器。 现在,我已经在客户机上创build了50个IP别名,在我的服务器上创build了10个IP别名。 我也使用多处理模块来从所有50个客户端并发地向所有10个服务器产生通信量。 我也在我的服务器上开发了几个configuration文件(1kb,10kb,50kb,100kb,500kb,1mb)(我在使用Apache服务器时在/ var / www / html目录中),并使用urllib2向这些configuration文件发送请求我的客户机。 我使用httplib + urllib2首先绑定到任何一个源别名ip,然后使用urllib2从这个ip发送请求。 这里要增加我的TCP连接数 ,我正在尝试使用multiprocessing.Pool.apply_async模块。 但是我在运行脚本的时候遇到了这个错误“RuntimeError:同步对象只能通过inheritance在进程之间共享”。 经过一些debugging,我发现这个错误是由于使用multiprocessing.Value引起的。 但我想分享我的进程之间的一些variables,我也想增加我的TCP连接数。 在这里可以使用其他模块(multiprocessing.Value除外)来共享一些常用variables吗? 或者有没有其他的解决scheme,这个查询?

''' Traffic Generator Script: Here I have used IP Aliasing to create multiple clients on single vm machine. Same I have done on server side to create multiple servers. I have around 50 clients and 10 servers ''' import multiprocessing import urllib2 import random import myurllist #list of all destination urls for all 10 servers import time import socbindtry #script that binds various virtual/aliased client ips to the script m=multiprocessing.Manager() response_time=m.list() #some shared variables error_count=multiprocessing.Value('i',0) def send_request3(): #function to send requests from alias client ip 1 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3) #bind to alias client ip1 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: error_count.value=error_count.value+1 def send_request4(): #function to send requests from alias client ip 2 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4) #bind to alias client ip2 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: error_count.value=error_count.value+1 #50 such functions are defined here for 50 clients def func(): pool=multiprocessing.Pool(processes=750) for i in range(5): pool.apply_async(send_request3) pool.apply_async(send_request4) pool.apply_async(send_request5) #append 50 functions here pool.close() pool.join() print"All work Done..!!" return start=float(time.time()) func() end=float(time.time())-start print end 

Solutions Collecting From Web of "Python多处理使用共享variables的Pool.apply_async(值)"

如错误消息所述,您不能通过pickle传递一个multiprocessing.Value 。 但是,您可以使用multiprocessing.Manager().Value

 import multiprocessing import urllib2 import random import myurllist #list of all destination urls for all 10 servers import time import socbindtry #script that binds various virtual/aliased client ips to the script def send_request3(response_time, error_count): #function to send requests from alias client ip 1 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3) #bind to alias client ip1 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: with error_count.get_lock(): error_count.value += 1 def send_request4(response_time, error_count): #function to send requests from alias client ip 2 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4) #bind to alias client ip2 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: with error_count.get_lock(): error_count.value += 1 #50 such functions are defined here for 50 clients def func(response_time, error_count): pool=multiprocessing.Pool(processes=2*multiprocessing.cpu_count()) args = (response_time, error_count) for i in range(5): pool.apply_async(send_request3, args=args) pool.apply_async(send_request4, args=args) #append 50 functions here pool.close() pool.join() print"All work Done..!!" return if __name__ == "__main__": m=multiprocessing.Manager() response_time=m.list() #some shared variables error_count=m.Value('i',0) start=float(time.time()) func(response_time, error_count) end=float(time.time())-start print end 

其他一些注意事项在这里:

  1. 使用具有750个进程的Pool不是一个好主意。 除非你使用一个拥有数百个CPU内核的服务器,否则这将会让你的机器瘫痪。 这会更快,并减少您的机器使用少得多的进程。 更像2 * multiprocessing.cpu_count()
  2. 作为最佳实践,您应该明确地将您需要使用的所有共享参数传递给子进程,而不是使用全局变量。 这增加了代码在Windows上工作的机会。
  3. 它看起来像所有的send_request*函数几乎完全相同的事情。 为什么不只是做一个函数,并使用一个变量来决定使用哪个socbindtry.BindableHTTPHandler ? 这样做可以避免大量的代码重复。
  4. 你增加error_count方式不是进程/线程安全的,并且容易受竞争条件的影响。 你需要用锁来保护增量(就像我在上面的例子中所做的那样)。

可能是因为Python多进程在Windows和Linux之间的区别 (我认真的,不知道多处理在VM中是如何工作的,就像这里的情况一样)。

这可能工作;

 import multiprocessing import random import myurllist #list of all destination urls for all 10 servers import time def send_request3(response_time, error_count): #function to send requests from alias client ip 1 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3) #bind to alias client ip1 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: error_count.value=error_count.value+1 def send_request4(response_time, error_count): #function to send requests from alias client ip 2 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4) #bind to alias client ip2 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: error_count.value=error_count.value+1 #50 such functions are defined here for 50 clients def func(): m=multiprocessing.Manager() response_time=m.list() #some shared variables error_count=multiprocessing.Value('i',0) pool=multiprocessing.Pool(processes=750) for i in range(5): pool.apply_async(send_request3, [response_time, error_count]) pool.apply_async(send_request4, [response_time, error_count]) # pool.apply_async(send_request5) #append 50 functions here pool.close() pool.join() print"All work Done..!!" return start=float(time.time()) func() end=float(time.time())-start print end