Python多线程多进程中的⼏个坑
Introduction
本⽂系转载,原⽂: 今天遇到的Python多线程、多进程中的⼏个坑
今天在写oj的判题端的时候犯了⼀个低级错误,就是为了加快判题速度,我就采⽤了多线程多组⽤例同时运⾏的⽅法,但是后来不经意的发现,明明跑的很快的程序到了我这实际运⾏时间就变成了好⼏倍,⽽cpu时间并没有太⼤的变化。
下⾯是我的测试数据,
//多线程
//两组⽤例同时运⾏
{‘cpu_time’: 3543.0, ‘real_time’: 13384.0, ‘test_case_id’: 2}
{‘cpu_time’: 3592.0, ‘real_time’: 13688.0, ‘test_case_id’: 1}
//只有⼀组测试⽤例
{‘cpu_time’: 3612.0, ‘real_time’: 6856.0, ‘test_case_id’: 1}
很明显的结果,下⾯是采⽤了多进程之后的测试数据
//多进程
//两组⽤例同时运⾏
{‘cpu_time’: 4110.0, ‘real_time’: 4250.0, ‘test_case_id’: 2}
{‘cpu_time’: 4121.0, ‘real_time’: 4298.0, ‘test_case_id’: 1}
//⼀组⽤例
{‘cpu_time’: 3861.0, ‘real_time’: 4040.0, ‘test_case_id’: 1}
好了,其实我不是专门想说这个的了,因为这是⼀个愚蠢的问题。我要记录⼀下今天遇到的三个多进程中的问题:
第⼀个问题
PicklingError: Can’t pickle : attribute lookup builtin.instancemethod failed
poc如下
from multiprocessing import Pool
class Runner(object):
def func(self, i):
print i
return i
runner = Runner()
pool = Pool(processes=5)
for i in range(5):
pool.apply_async(runner.func, (i, ))
pool.close()
pool.join()
这个问题只出现在Python2上,Python3没有问题。这是因为多进程之间要使⽤pickle来序列化并传递⼀些数据,但是实例⽅法并不能被pickle,参见 Python⽂档,可以被pickle的类型列表
,还有在Python3中实例⽅法可以被pickle了,见 Python bug list
最简单的解决办法就是写⼀个可以被pickle的函数代理⼀下
from multiprocessing import Pool
def run(cls_instance, i):
return cls_instance.func(i)
print i
return i
runner = Runner()
pool = Pool(processes=5)
for i in range(5):
pool.apply_async(run, (runner, i))
pool.close()
pool.join()
还有⼀个⽅法
已经被指出可能存在缺陷了,就是这个⼈第⼀个例⼦,但是我不知道为什么⼀个类可以被析构多次呢?是不是这个类实例化⼀次以后就被复制到了各个进程上,然后再单独进⾏的析构呢。这个⼈第⼆个例⼦是反驳的 call
⽅法的,我没法运⾏,总是提⽰ Can’t pickle : attribute lookup builtin.instancemethod failed
,估计是⼀样的原因。
第⼆个问题
pool作为实例变量的时候出错 pool objects cannot be passed between processes or pickled
把上⾯的例⼦稍微的改造了⼀下,
from multiprocessing import Pool
def run(cls_instance, i):
return cls_instance.func(i)
class Runner(object):
def init(self):
pool = Pool(processes=5)
for i in range(5):
pool.apply_async(run, (self, i))
pool.close()
pool.join()
def func(self, i):
print i
return i
runner = Runner()
把pool放在实例内部了,使⽤外部函数代理,运⾏正常。但是如果把⾥⾯的pool都换成 self.pool
的话,就会出现上⾯的错误。原因是在pickle传递给pool的对象的时候,这个对象就包含pool这个实例变量,它不能被pickle,造成错误。⽽不使⽤self的话,那就是 init
⽅法的⼀个局部变量,不受影响。解决⽅法就是⾃⼰实现 getstate
⽅法,它是决定什么需要pickle的函数,我们删除掉pool,不让它pickle就好了。 setstate
作⽤是相反的,是⽤来增加实例变量的。
看例⼦
from multiprocessing import Pool
def run(cls_instance, i):
return cls_instance.func(i)
self.pool = Pool(processes=2)
self.var = 10
for i in range(2):
self.pool.apply_async(run, (self, i))
self.pool.close()
self.pool.join()
def func(self, i):
print i
return i
def __getstate__(self):
self_dict = self.__dict__.copy()
print self.__dict__
del self_dict['pool']
return self_dict
def __setstate__(self, state):
print state
self.__dict__.update(state)
runner = Runner()
输出是
{‘var’: 10, ‘pool’: }
import pickle
{‘var’: 10, ‘pool’: }
{‘var’: 10}
{‘var’: 10}
1
也就是实例在unpickle的时候丢了pool这个变量,但是我们也不需要了,所以可以这样解决问题。第三个问题
⼦进程引发的异常怎么消失了?
from multiprocessing import Pool
def run(cls_instance, i):
return cls_instance.func(i)
class Runner(object):
def init(self):
self.pool = Pool(processes=2)
self.var = 10
for i in range(2):
self.pool.apply_async(run, (self, i))
self.pool.close()
self.pool.join()
def func(self, i):
if i == 1:
raise ValueError("xxx")
print i
return i
def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['pool']
return self_dict
def __setstate__(self, state):
self.__dict__.update(state)
runner = Runner()
只能打印出⼀个0来,当i为1的时候有⼀个异常啊,怎么没显⽰出来。在 ⽂档
中这么说的
get([timeout]) Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().
apply_async
返回的是 AsyncResult
,其中出现的异常只有在调⽤()的时候才会被重新引发。
from multiprocessing import Pool
def run(cls_instance, i):
return cls_instance.func(i)
class Runner(object):
def init(self):
self.pool = Pool(processes=2)
results = []
for i in range(2):
results.append(self.pool.apply_async(run, (self, i)))
self.pool.close()
self.pool.join()
for i in range(2):
print results[i].get()
def func(self, i):
if i == 1:
raise ValueError("xxx")
return i
def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['pool']
return self_dict
def __setstate__(self, state):
self.__dict__.update(state)
runner = Runner()

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。