我有一个我需要并行化的代码。 代码本身没有问题。 该代码是一个python类的方法。 例如,
class test: def __init__(self): <...> def method(self): <...>
我这样写,因为完整代码的细节可能不相关,它是很长的。 在开始的时候,我试着并行化这个代码(只有两个实例):
t1=test() t2=test() pr1=Process(target=t1.method, args=(,)) pr2=Process(target=t2.method, args=(,)) pr1.start() pr2.start() pr1.join() pr2.join()
但是这不起作用。 不仅运行速度比运行一个实例还要依次运行慢得多,但也存在类variables未被修改的问题。 最后一个问题得到解决,感谢@MattDMo在这个线程中的回答,通过创build一个共享命名空间,共享variables和共享列表:
import multiprocessing as mp <...> self.manager=mp.Manager() self.shared=self.manager.Namespace() self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0]) self.shared.V=V
但它仍然运行非常缓慢。
一开始我以为是因为我在一台双核笔记本电脑上执行代码,两个内核饱和,但是这两个实例和计算机变慢,因为不能快速执行任何其他任务。 所以我决定在6核(也是一个linux系统)的台式电脑上试用这个代码。 它不能解决问题。 还是平行版本要慢得多。 另一方面,当我使用multithreading执行C编译代码时,台式计算机的CPU不会变得很热。 任何人都知道发生了什么?
完整的代码在这里 ,包括如下:
from math import exp from pylab import * from scipy.stats import norm from scipy.integrate import ode from random import gauss,random from numpy import dot,fft from time import time import multiprocessing as mp from multiprocessing import Pool from multiprocessing import Process from multiprocessing import Queue, Pipe from multiprocessing import Lock, current_process #Global variables sec_steps=1000 #resolution (steps per second) DT=1/float(sec_steps) stdd=20 #standard deviation for retina random input stdd2=20 #standard deviation for sigmoid #FUNCTION TO APPROXIMATE NORMAL CUMULATIVE DISTRIBUTION FUNCTION def sigmoid(x,mu,sigma): beta1=-0.0004406 beta2=0.0418198 beta3=0.9 z=(x-mu)/sigma if z>8: return 1 elif z<-8: return 0 else: return 1/(1+exp(-sqrt(pi)*(beta1*z**5+beta2*z**3+beta3*z))) #CLASSES class retina: ##GAUSSIAN WHITE NOISE GENERATOR def __init__(self,mu,sigma): self.mu=mu self.sigma=sigma def create_pulse(self): def pulse(): return gauss(self.mu,self.sigma) #return uniform(-1,1)*sqrt(3.)*self.sigma+self.mu return pulse def test_white_noise(self,N): #test frequency spectrum of random number generator for N seconds noise=[] pulse=self.create_pulse() steps=sec_steps*N+1 t=linspace(0,N,steps) for i in t: noise.append(pulse()) X=fft(noise) X=[abs(x)/(steps/2.0) for x in X] xlim([0,steps/N]) xlabel("freq. (Hz)") ylabel("Ampl. (V)") plot((t*steps/N**2)[1:],X[1:],color='black') #savefig('./wnoise.eps', format='eps', dpi=1000) show() return noise class cleft: #object: parent class for a synaptic cleft def __init__(self): self.shared=manager.Namespace() self.shared.preV=0.0 #pre-synaptic voltage self.shared.r=0.0 #proportion of channels opened Tmax=1.0 #mM mu=-35.0 #mV sigma=stdd2 #mV def T(self): #Receives presynaptic Voltage preV, returns concentration of the corresponding neurotransmitter. return self.Tmax*sigmoid(self.shared.preV,self.mu,self.sigma) def r_next(self): #Solves kinematic ode -analytical solution- to find r after one time step DT (needs T and alfa and beta parameters) """ runs the ode for one unit of time dt, as specified updates the previous r taken as initial condition """ tau=1.0/(self.alfa*self.T()+self.beta) r_inf=self.alfa*self.T()*tau self.shared.r=r_inf+(self.shared.r-r_inf)*exp(-DT/tau) def DI(self,postV): #Receives PSP and computes resulting change in PSC return self.g*self.shared.r*(postV-self.restV) class ampa_cleft(cleft): #Child class for ampa synaptic connection def __init__(self): self.manager=mp.Manager() self.shared=self.manager.Namespace() self.shared.preV=0.0 self.shared.r=0.5 #initial condition for r self.alfa=2.0 self.beta=0.1 self.restV=0.0 self.g=0.1 class gaba_a_cleft(cleft): #Child class for GABAa synaptic connection def __init__(self): self.shared=manager.Namespace() self.shared.preV=0.0 self.shared.r=0.5 self.alfa=2.0 self.beta=0.08 self.restV=-75.0 self.g=0.2 class gaba_a_cleft_trnTOtrn(cleft): #Child class for GABAa synaptic connection def __init__(self): self.manager=mp.Manager() self.shared=self.manager.Namespace() self.shared.preV=0.0 self.shared.r=0.5 self.alfa=2.0 self.beta=0.08 self.restV=-75.0 self.g=0.2 class gaba_a_cleft_inTOin(cleft): #Child class for GABAa synaptic connection def __init__(self): self.manager=mp.Manager() self.shared=self.manager.Namespace() self.shared.preV=0.0 self.shared.r=0.5 self.alfa=2.0 self.beta=0.08 self.restV=-75.0 self.g=0.2 class gaba_a_cleft_trnTOtcr(cleft): #Child class for GABAa synaptic connection def __init__(self): self.manager=mp.Manager() self.shared=self.manager.Namespace() self.shared.preV=0.0 self.shared.r=0.5 self.alfa=2.0 self.beta=0.08 self.restV=-85.0 self.g=0.1 class gaba_a_cleft_inTOtcr(cleft): #Child class for GABAa synaptic connection def __init__(self): self.manager=mp.Manager() self.shared=self.manager.Namespace() self.shared.preV=0.0 self.shared.r=0.5 self.alfa=2.0 self.beta=0.08 self.restV=-85.0 self.g=0.1 class gaba_b_cleft(cleft): #Child class for GABAa synaptic connection def __init__(self): self.manager=mp.Manager() self.shared=self.manager.Namespace() self.shared.preV=0.0 self.shared.r=0.5 self.shared.R=0.5 self.shared.X=0.5 self.alfa_1=0.02 self.alfa_2=0.03 self.beta_1=0.05 self.beta_2=0.01 self.restV=-100.0 self.g=0.06 self.n=4 self.Kd=100 #Dissociation constant def r_next(self): #Solves kinematic ode SECOND MESSENGER -analytical solution- to find r after one time step DT (needs T and alfa and beta parameters) """ runs the ode for one unit of time dt, as specified updates the previous r taken as initial condition """ Q1=self.alfa_1*self.T() Q2=-Q1-self.beta_1 R0=self.shared.R X0=self.shared.X self.shared.R=(Q1*(exp(Q2*DT)-1)+Q2*R0*exp(Q2*DT))/Q2 self.shared.X=(exp(-self.beta_2*DT)*(self.alfa_2*(self.beta_2*(exp(DT*(self.beta_2+Q2))*(Q1+Q2*R0)+Q1*(-exp(self.beta_2*DT))-Q2*R0)-Q1*Q2*(exp(self.beta_2*DT)-1))+self.beta_2*Q2*X0*(self.beta_2+Q2)))/(self.beta_2*Q2*(self.beta_2+Q2)) self.shared.r=self.shared.X**self.n/(self.shared.X**self.n+self.Kd) ####################################################################################################################################################### class neuronEnsemble: def __init__(self,V): #Parent class for a Neuron ensemble self.manager=mp.Manager() self.shared=self.manager.Namespace() self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0]) #Variables to store changes in PSC produced by synaptic connection self.shared.V=V #Actual state of the membrane potential kappa=1.0 #conductance def V_next(self): #ode analitycally for a single time step DT K1=self.C[0]*self.g/self.kappa K2=(-dot(self.C,self.I)+self.C[0]*self.g*self.restV)/self.kappa self.shared.V=K2/K1+(self.shared.V-K2/K1)*exp(-K1*DT) class TCR_neuronEnsemble(neuronEnsemble): def __init__(self,V): self.manager=mp.Manager() self.shared=self.manager.Namespace() self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0]) #Variables to store changes in PSC produced by synaptic connection self.shared.V=V #Actual state of the membrane potential self.g=0.01 #conductance of leak self.restV=-55.0 #rest of leak self.C=(1.0,7.1,1.0/2.0*30.9/4.0,1.0/2.0*3.0*30.9/4.0,1.0/2.0*30.9) #Cleak,C2,C3,C4,C7!! #connectivity constants to the ensemble #First one is Cleak, the others in same order as in diagram class TRN_neuronEnsemble(neuronEnsemble): def __init__(self,V): self.manager=mp.Manager() self.shared=self.manager.Namespace() self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0]) #Variables to store changes in PSC produced by synaptic connection self.shared.V=V #Actual state of the membrane potential self.g=0.01 #conductance of leak self.restV=-72.5 #rest of leak self.C=(1.0,15.0,35.0,0.0,0.0) #Cleak,C5,C8 #connectivity constants to the ensemble #First one is Cleak, the others in same order as in diagram class IN_neuronEnsemble(neuronEnsemble): #!!! update all parameters !!! def __init__(self,V): self.manager=mp.Manager() self.shared=self.manager.Namespace() self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0]) #Variables to store changes in PSC produced by synaptic connection self.shared.V=V #Actual state of the membrane potential self.g=0.01 #conductance of leak self.restV=-70.0 #rest of leak self.C=(1.0,47.4,23.6,0.0,0.0) #Cleak,C1,C6!! #connectivity constants to the ensemble #First one is Cleak, the others in same order as in diagram ######################################INSTANCE GROUP################################################################# class group: def __init__(self,tcr_V0,trn_V0,in_V0): #Declarations of instances #################### #SYNAPTIC CLEFTS self.cleft_ret_in=ampa_cleft() #cleft between retina and IN ensemble self.cleft_ret_tcr=ampa_cleft() #cleft between retina and TCR ensemble self.cleft_in_in=gaba_a_cleft_inTOin() #cleft between IN and IN ensembles self.cleft_in_tcr=gaba_a_cleft_inTOtcr() #cleft between IN and TCR ensembles self.cleft_tcr_trn=ampa_cleft() #cleft between TCR and TRN ensembles self.cleft_trn_trn=gaba_a_cleft_trnTOtrn() #cleft between TRN and TRN ensembles self.cleft_trn_tcr_a=gaba_a_cleft_trnTOtcr() #cleft between TRN and TCR ensembles GABAa self.cleft_trn_tcr_b=gaba_b_cleft() #cleft between TRN and TCR ensembles GABAb #POPULATIONS self.in_V0=in_V0 #mV ic excitatory potential self.IN=IN_neuronEnsemble(self.in_V0) #create instance of IN ensemble self.tcr_V0=tcr_V0 #mV ic excitatory potential self.TCR=TCR_neuronEnsemble(self.tcr_V0) #create instance of TCR ensemble self.trn_V0=trn_V0 #mV ic inhibitory potential self.TRN=TRN_neuronEnsemble(self.trn_V0) #create instance of TCR ensemble def step(self,p): #makes a step of the circuit for the given instance #UPDATE TRN self.cleft_tcr_trn.shared.preV=self.TCR.shared.V #cleft takes presynaptic V self.cleft_tcr_trn.r_next() #cleft updates r self.TRN.I[2]=self.cleft_tcr_trn.DI(self.TRN.shared.V) #update PSC TCR--->TRN self.cleft_trn_trn.shared.preV=self.TRN.shared.V #cleft takes presynaptic V self.cleft_trn_trn.r_next() #cleft updates r self.TRN.I[1]=self.cleft_trn_trn.DI(self.TRN.shared.V) #update PSC TRN--->TRN self.TRN.V_next() #update PSP in TRN #record retinal pulse ------|> IN AND TCR self.cleft_ret_in.shared.preV=self.cleft_ret_tcr.shared.preV=p #UPDATE TCR self.cleft_ret_tcr.r_next() #cleft updates r self.TCR.I[1]=self.cleft_ret_tcr.DI(self.TCR.shared.V) #update PSC RET---|> TCR self.cleft_trn_tcr_b.shared.preV=self.TRN.shared.V #cleft takes presynaptic V self.cleft_trn_tcr_b.r_next() #cleft updates r self.TCR.I[2]=self.cleft_trn_tcr_b.DI(self.TCR.shared.V) #update PSC self.cleft_trn_tcr_a.shared.preV=self.TRN.shared.V #cleft takes presynaptic V self.cleft_trn_tcr_a.r_next() #cleft updates r self.TCR.I[3]=self.cleft_trn_tcr_a.DI(self.TCR.shared.V) #cleft updates r self.cleft_in_tcr.shared.preV=self.IN.shared.V #cleft takes presynaptic V self.cleft_in_tcr.r_next() #cleft updates r self.TCR.I[4]=self.cleft_in_tcr.DI(self.TCR.shared.V) #update PSC self.TCR.V_next() #UPDATE IN self.cleft_ret_in.r_next() #cleft updates r self.IN.I[1]=self.cleft_ret_in.DI(self.IN.shared.V) #update PSC self.cleft_in_in.shared.preV=self.IN.shared.V #cleft takes presynaptic V self.cleft_in_in.r_next() #cleft updates r self.IN.I[2]=self.cleft_in_in.DI(self.IN.shared.V) #update PSC self.IN.V_next() #---------------------------------------- def stepN(self, p, N, data_Vtcr, data_Vtrn, data_Vin): #makes N steps, receives a vector of N retinal impulses and output lists data_Vtcr.append(self.tcr_V0) data_Vtrn.append(self.trn_V0) data_Vin.append(self.in_V0) for i in xrange(N): self.step(p[i]) data_Vtcr.append(self.TCR.shared.V) #write to output list data_Vtrn.append(self.TRN.shared.V) data_Vin.append(self.IN.shared.V) name=current_process().name print name+" "+str(i) ###################################################################################################################### ############################### CODE THAT RUNS THE SIMULATION OF THE MODEL ########################################### ###################################################################################################################### def run(exec_t): """ runs the simulation for t=exec_t seconds """ t_0=time() mu=-45.0 #mV sigma=stdd #20.0 #mV ret=retina(mu,sigma) #create instance of white noise generator #initial conditions tcr_V0=-61.0 #mV ic excitatory potential trn_V0=-84.0 #mV ic inhibitory potential in_V0=-70.0 #mV ic excitatory potential ###########################LISTS FOR STORING DATA POINTS################################ t=linspace(0.0,exec_t,exec_t*sec_steps+1) # data_Vtcr=[] # data_Vtcr.append(tcr_V0) # # data_Vtrn=[] # data_Vtrn.append(trn_V0) # # data_Vin=[] # data_Vin.append(in_V0) # ###NUMBER OF INSTANCES # N=2 # pulse=ret.create_pulse() # #CREATE INSTANCES # groupN=[] # for i in xrange(N): # g=group(in_V0,tcr_V0,trn_V0) # groupN.append(g) # # for i in t[1:]: # p=pulse() # proc=[] # for j in xrange(N): # pr=Process(name="group_"+str(j),target=groupN[j].step, args=(p,)) # pr.start() # proc.append(pr) # for j in xrange(N): # proc[j].join(N) # # data_Vtcr.append((groupN[0].TCR.shared.V+groupN[1].TCR.shared.V)*0.5) #write to output list # data_Vtrn.append((groupN[0].TRN.shared.V+groupN[1].TRN.shared.V)*0.5) # data_Vin.append((groupN[0].IN.shared.V+groupN[1].IN.shared.V)*0.5) #############FOR LOOPING INSIDE INSTANCE ---FASTER############################################# #CREATE p vector of retinal pulses p=[] pulse=ret.create_pulse() for k in xrange(len(t)-1): p.append(pulse()) #CREATE INSTANCES N=2 groupN=[] proc=[] manager=mp.Manager() #creating a shared namespace data_Vtcr_0 = manager.list() data_Vtrn_0 = manager.list() data_Vin_0 = manager.list() data_Vtcr_1 = manager.list() data_Vtrn_1 = manager.list() data_Vin_1 = manager.list() data_Vtcr=[data_Vtcr_0, data_Vtcr_1] data_Vtrn=[data_Vtrn_0, data_Vtrn_1] data_Vin=[data_Vin_0, data_Vin_1] for j in xrange(N): g=group(tcr_V0,trn_V0,in_V0) groupN.append(g) for j in xrange(N): pr=Process(name="group_"+str(j),target=groupN[j].stepN, args=(p, len(t)-1, data_Vtcr[j], data_Vtrn[j], data_Vin[j],)) pr.start() proc.append(pr) for j in xrange(N): proc[j].join() data_Vtcr_av=[0.5*i for i in map(add, data_Vtcr[0], data_Vtcr[1])] data_Vtrn_av=[0.5*i for i in map(add, data_Vtrn[0], data_Vtrn[1])] data_Vin_av =[0.5*i for i in map(add, data_Vin[0], data_Vin[1])] print len(t), len(data_Vtcr[0]), len(data_Vtcr_av) ##Plotting##################################### subplot(3,1,1) xlabel('t') ylabel('tcr - mV') plot(t[50*sec_steps:],array(data_Vtcr_av)[50*sec_steps:], color='black') subplot(3,1,2) xlabel('t') ylabel('trn - mV') plot(t[50*sec_steps:],array(data_Vtrn_av)[50*sec_steps:], color='magenta') subplot(3,1,3) xlabel('t') ylabel('IN - mV') plot(t[50*sec_steps:],array(data_Vin_av)[50*sec_steps:], color='red') #savefig('./v_tcr.eps', format='eps', dpi=1000) ############################################### t_1=time() #measure elapsed time print "elapsed time: ", t_1-t_0, " seconds." #save data to file FILE=open("./output.dat","w") FILE.write("########################\n") FILE.write("# t V #\n") FILE.write("########################\n") for k in range(len(t)): FILE.write(str(t[k]).zfill(5)+"\t"*3+repr(data_Vtcr_av[k])+"\n") FILE.close() ################# show() return t,array(data_Vtcr) ###################################################################################################################### ###################################################################################################################### if __name__ == "__main__": run(60) #run simulation for 60 seconds
你的问题是你太依赖于multiprocessing.Manager
Proxy
对象来做你的数学计算。 我试图警告你这个multiprocessing.Manager
缺点。 multiprocessing.Manager
在我回答你原来的问题时,但我的措辞不够强大。 我说过这个:
请记住,一个
multiprocessing.Manager
启动一个子进程来管理你创建的所有共享实例,并且每次访问其中一个Proxy
实例时,实际上是对Manager
进程进行IPC调用。
我应该补充说:“在同一个过程中,IPC的调用要比正常的访问要昂贵得多。 你原来的问题并没有真正的说明你将如何使用Manager
实例,所以我没有想到强调它。
考虑一个简单的从一个 Proxy
变量中读取循环的例子:
>>> timeit.timeit("for _ in range(1000): x = v + 2", setup="v = 0", number=1000) 0.040110111236572266 >>> timeit.timeit("for _ in range(1000): x = shared.v + 2", setup="import multiprocessing ; m = multiprocessing.Manager() ; shared = m.Namespace(); shared.v = 0", number=1000) 15.048354864120483
当您引入共享变量时,速度几乎慢了400倍。 现在,这个例子有一点点极端,因为我们在一个紧密的循环中访问共享变量,但是这个点是站立的; 访问Proxy
变量很慢 。 而且你在你的程序中做了很多。 访问Proxy
的额外开销远远大于通过同时运行两个进程而获得的成本。
您将需要重构这些代码,以将Proxy
变量的使用量保持在最低限度。 你可能会发现更多的成功取代了multiprocessing.Namespace
multiprocessing.Value
的多用途。 multiprocessing.Namespace
与multiprocessing.Value
,它们存储在共享内存,而不是在一个单独的进程。 这使得它们快得多(虽然比常规变量还要慢):
>>> timeit.timeit("for _ in range(1000): x = v.value + 2", setup="import multiprocessing ; v = multiprocessing.Value('i', 0)", number=1000) 0.29022717475891113
如果你使用lock=False
初始化,事情会变得更快:
>>> timeit.timeit("for _ in range(1000): x = v.value + 2", setup="import multiprocessing ; v = multiprocessing.Value('i', 0, lock=False)", number=1000) 0.06386399269104004
但是这个Value
不再是自动的过程安全的。 你需要显式的创建一个multiprocessing.Lock
来同步对这两个变量的访问,
multiprocessing.Value
唯一的其他限制是你只能使用ctypes
或array
模块支持的类型。 因为你主要使用整数和浮点数,所以这实际上应该大部分都可以。 您可能需要保留作为Proxy
实例的唯一部分是列表,尽管您也可能使用multiprocessing.Array
。
一些数字python模块,比如numpy,可以改变python解释器的cpu关联性(解释器可以同时使用多少个内核)。 当python模块链接到某些多线程的BLAS库时,通常会发生这种情况。 这个问题可能导致多个python进程只在一个内核上运行,这使得它比单线程版本更慢,特别是在处理共享状态时。
检查你的程序是否没有使用你的核心。 如果没有使用它们,可以通过执行一个系统调用(使用os.system
)到taskset
命令来更改cpu关联,并使用正确的参数。
每个过程需要多长时间 – 他们在做多少工作? 开始一个单独的进程会产生开销 – 当你这样做的时候,必须创建一个新的子进程,并且父进程的环境必须被复制到它。 如果你正在处理非常短暂的工作,那么它可能比一个单一的线程替代方法慢。
如果你的个人工作很短,可以尝试增加工作量,看看你的速度是否有所提高。