多重处理在Ubuntu中工作,而不在Windows中

我试图使用这个例子作为我的cherrypy应用程序的排队系统的模板。

我能够将其从Python 2转换为Python 3( from Queue import Empty变为from queue import Empty ),并在Ubuntu中执行它。 但是,当我在Windows中执行它,我得到以下错误:

 F:\workspace\test>python test.py Traceback (most recent call last): File "test.py", line 112, in <module> broker.start() File "C:\Anaconda3\lib\multiprocessing\process.py", line 105, in start self._popen = self._Popen(self) File "C:\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen return _default_context.get_context().Process._Popen(process_obj) File "C:\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen return Popen(process_obj) File "C:\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__ reduction.dump(process_obj, to_child) File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump ForkingPickler(file, protocol).dump(obj) TypeError: cannot serialize '_io.TextIOWrapper' object F:\workspace\test>Traceback (most recent call last): File "<string>", line 1, in <module> File "C:\Anaconda3\lib\multiprocessing\spawn.py", line 100, in spawn_main new_handle = steal_handle(parent_pid, pipe_handle) File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 81, in steal_handle _winapi.PROCESS_DUP_HANDLE, False, source_pid) OSError: [WinError 87] The parameter is incorrect 

以下是完整的代码:

 # from http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html import sys import logging from logging import handlers from cherrypy.process import wspbus class MyBus(wspbus.Bus): def __init__(self, name=""): wspbus.Bus.__init__(self) self.open_logger(name) self.subscribe("log", self._log) def exit(self): wspbus.Bus.exit(self) self.close_logger() def open_logger(self, name=""): logger = logging.getLogger(name) logger.setLevel(logging.INFO) h = logging.StreamHandler(sys.stdout) h.setLevel(logging.INFO) h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s")) logger.addHandler(h) self.logger = logger def close_logger(self): for handler in self.logger.handlers: handler.flush() handler.close() def _log(self, msg="", level=logging.INFO): self.logger.log(level, msg) import random import string from multiprocessing import Process class Bank(object): def __init__(self, queue): self.bus = MyBus(Bank.__name__) self.queue = queue self.bus.subscribe("main", self.randomly_place_order) self.bus.subscribe("exit", self.terminate) def randomly_place_order(self): order = random.sample(['BUY', 'SELL'], 1)[0] code = random.sample(string.ascii_uppercase, 4) amount = random.randint(0, 100) message = "%s %s %d" % (order, ''.join(code), amount) self.bus.log("Placing order: %s" % message) self.queue.put(message) def run(self): self.bus.start() self.bus.block(interval=0.01) def terminate(self): self.bus.unsubscribe("main", self.randomly_place_order) self.bus.unsubscribe("exit", self.terminate) from queue import Empty class Broker(Process): def __init__(self, queue): Process.__init__(self) self.queue = queue self.bus = MyBus(Broker.__name__) self.bus.subscribe("main", self.check) def check(self): try: message = self.queue.get_nowait() except Empty: return if message == "stop": self.bus.unsubscribe("main", self.check) self.bus.exit() elif message.startswith("BUY"): self.buy(*message.split(' ', 2)[1:]) elif message.startswith("SELL"): self.sell(*message.split(' ', 2)[1:]) def run(self): self.bus.start() self.bus.block(interval=0.01) def stop(self): self.queue.put("stop") def buy(self, code, amount): self.bus.log("BUY order placed for %s %s" % (amount, code)) def sell(self, code, amount): self.bus.log("SELL order placed for %s %s" % (amount, code)) if __name__ == '__main__': from multiprocessing import Queue queue = Queue() broker = Broker(queue) broker.start() bank = Bank(queue) bank.run() 

Solutions Collecting From Web of "多重处理在Ubuntu中工作,而不在Windows中"

问题是MyBus对象的某些部分是不可MyBus ,而且你正在将一个MyBus的实例MyBus到你的Broker实例中。 由于Windows缺少fork()支持,因此在调用broker.start() ,必须在broker进程中重新创建broker的整个状态,以便multiprocessing执行broker.run 。 它在Linux上工作,因为Linux支持fork ; 在这种情况下,不需要腌制任何东西 – 子进程只要分叉就包含父进程的完整状态。

解决这个问题有两种方法。 第一个也是比较困难的方法是让你的broker实例可供选择。 要做到这一点,你需要使MyBus 。 您现在得到的错误是指MyBus上的logger属性,这是不可选的。 那个很容易修复; 只需将__getstate__ / __setstate__方法添加到MyBus ,这些方法用于控制对象如何__getstate__ / __setstate__ MyBus 。 如果我们在清理时删除记录器,并在解开时重新创建记录器,我们将解决这个问题:

 class MyBus(wspbus.Bus): ... def __getstate__(self): self_dict = self.__dict__ del self_dict['logger'] return self_dict def __setstate__(self, d): self.__dict__.update(d) self.open_logger() 

这工作,但后来我们再次酸洗错误:

 Traceback (most recent call last): File "async2.py", line 121, in <module> broker.start() File "C:\python34\lib\multiprocessing\process.py", line 105, in start self._popen = self._Popen(self) File "C:\python34\lib\multiprocessing\context.py", line 212, in _Popen return _default_context.get_context().Process._Popen(process_obj) File "C:\python34\lib\multiprocessing\context.py", line 313, in _Popen return Popen(process_obj) File "C:\python34\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__ reduction.dump(process_obj, to_child) File "C:\python34\lib\multiprocessing\reduction.py", line 60, in dump ForkingPickler(file, protocol).dump(obj) _pickle.PicklingError: Can't pickle <class 'cherrypy.process.wspbus._StateEnum.State'>: attribute lookup State on cherrypy.process.wspbus failed 

结果是cherrypy.process.wspbus._StateEnum.State ,它是由MyBus继承的wspbus.Bus类的一个属性,是一个嵌套类,嵌套类不能被腌制:

 class _StateEnum(object): class State(object): name = None def __repr__(self): return "states.%s" % self.name 

State对象(惊喜)用于跟踪Bus实例的状态。 由于在启动总线之前我们正在进行酸洗,所以我们可以在pickle时从对象中删除state属性,并在unpickle时将其设置为States.STOPPED。

 class MyBus(wspbus.Bus): def __init__(self, name=""): wspbus.Bus.__init__(self) self.open_logger(name) self.subscribe("log", self._log) def __getstate__(self): self_dict = self.__dict__ del self_dict['logger'] del self_dict['state'] return self_dict def __setstate__(self, d): self.__dict__.update(d) self.open_logger() self.state = wspbus.states.STOPPED # Initialize to STOPPED 

通过这些更改,代码可以按预期工作! 唯一的限制是,如果公共汽车还没有启动,那么腌制MyBus是安全的,这对你的用例来说是很好的。

再次,这是困难的方式。 简单的方法是删除需要腌制MyBus实例。 您可以在子进程中创建MyBus实例,而不是在父进程中:

 class Broker(Process): def __init__(self, queue): Process.__init__(self) self.queue = queue ... def run(self): self.bus = MyBus(Broker.__name__) # Create the instance here, in the child self.bus.subscribe("main", self.check) self.bus.start() self.bus.block(interval=0.01) 

只要你不需要访问父母的broker.bus ,这是更简单的选择。