python2.7multiprocessing共享字典dict中的进程安全性线程安全性测试
在开发过程中,需要使⽤多进程多线程来进⾏⾼性能开发,⽬的是cpu跑满,带宽跑满,但是在使⽤过程中发现很多共享变量、调⽤优先级的问题。
本⽂对python 2.7 的multiprocessing模块中的共享字典的线程安全性进⾏测试。直接上完成代码进⾏分析。
#!/usr/bin/python
# coding=utf-8
'''
测试 multiprocessing 中 dict 的共享特征
'''
import json
import threading
import os
from multiprocessing import Process, Lock, Manager
# 测试共享 dict 中常规 list 如何能够添加
def deal(lock, share_dict):
share_pid()]=["a"]
lock.acquire()
mydict =dict(share_dict)# 注意,共享dict⽆法直接dumps,会报类型错误,必须先转换为普通字典
json.dumps(mydict)
print mydict
# 测试共享 dict 中常规 list 如何能够添加
def deal2(lock, share_dict):
for i in range(10):
# lock.acquire()
share_dict["hello"]+=1
# lease()
lock.acquire()
mydict =dict(share_dict)# 注意,共享dict⽆法直接dumps,会报类型错误,必须先转换为普通字典
json.dumps(mydict)
print mydict
# 测试线程
def thread_test(share_dcit):
share_dcit["index"]+=1
pass
# 测试在多线程下共享变量的线程安全性
def deal3(share_dcit):
threads_num =20
for i in range(threads_num):
t = threading.Thread(target=thread_test, args=(share_dcit,))
t.daemon =True
t.start()
pass
def test():
p_num =10
process =list()
lock = Lock()
m = Manager()
m = Manager()
share_dict = m.dict()# 多进程共享变量字典
for i in xrange(p_num):
process.append(Process(target=deal, args=(lock, share_dict,)))
# share_dict["hello"] = 0
# process.append(Process(target=deal2, args=(lock, share_dict, )))
# share_dict["index"] = 100
# process.append(Process(target=deal3, args=(share_dict, )))
for p in process:
p.start()
for p in process:
p.join()
print share_dict
if __name__ =='__main__':
test()
本段代码主要进⾏了三段测试,deal() 函数测试共享dict中常规list如何能够添加问题,deal2()测试共享dict中的进程安全性,deal3()测试共享dict的线程安全性。
deal() 共享dict中list添加元素
在进⾏功能测试时,发现对于共享dict中的list元素,使⽤append是没有办法添加新元素的
def deal(lock, share_dict):
share_pid()]=["a"]
share_pid()].append("b")
lock.acquire()
mydict =dict(share_dict)# 注意,共享dict⽆法直接dumps,会报类型错误,必须先转换为普通字典
json.dumps(mydict)
print mydict
使⽤append后可以看到共享dict中没有添加进“b”。
将append()函数替换为 + 后,可以正常添加元素
def deal(lock, share_dict):
share_pid()]=["a"]
# share_pid()].append("b")
share_pid()]+=["b"]
lock.acquire()
mydict =dict(share_dict)# 注意,共享dict⽆法直接dumps,会报类型错误,必须先转换为普通字典
json.dumps(mydict)
print mydict
共享dict的进程安全性
将⽰例代码的注释进⾏修改
# process.append(Process(target=deal, args=(lock, share_dict, )))
share_dict["hello"]=0
process.append(Process(target=deal2, args=(lock, share_dict,)))
# share_dict["index"] = 100
# process.append(Process(target=deal3, args=(share_dict, )))
此段代码修改共享变量中 share_dict[“hello”] 计数器,在不适⽤lock机制的情况下,可以看到执⾏图如下图,程序设置的10个进程,每个进程中循环⾃增了10次,正产情况计数器应该在100,⼆现在只到
了83,说明在进⾏⾃增计算时出现了并发错误。
使⽤lock机制加锁,可以看到计数器正常了,到了100.
lock.acquire()
share_dict["hello"]+=1
共享dict的线程安全
将⽰例代码的注释进⾏修改
# process.append(Process(target=deal, args=(lock, share_dict, )))
# share_dict["hello"] = 0
# process.append(Process(target=deal2, args=(lock, share_dict, )))
share_dict["index"]=100
process.append(Process(target=deal3, args=(share_dict,)))
线程安全测试中,使⽤了10个进程,每个进程启动20个线程,按照正常计算,计数器share_dcit[“index”]应该到200,看⼀下现在计数结果。
计数器最终只到了77,说明在线程运⾏过程中,出现了⼤量的并发问题。
使⽤进程所 进程锁进⾏加锁时,即
def thread_test(lock, share_dcit):
lock.acquire()
share_dcit["index"]+=1
pass
发现并发问题更重了,可以看到,进程所对线程并不起作⽤
想到了将线程锁传⼊进程中使⽤,如下图,可以看到使⽤线程锁python会报错。
结论
1. multiprocesses中共享变量不具备进程安全和线程安全性,在使⽤过程中要⾃⼰进⾏加锁操作。同时,对某些特殊属性深拷贝机制也
需要在后⾯的使⽤中进⼀步研究。
2. 多个进程中同时开启多个线程,⽬前没有到python直接可以进⾏变量控制的⽅法,会继续在⽹上查,可以采⽤的替代⼿段是直接
将结果保存在dict或者其他共享变量中,然后在主进程在对此变量进⾏统计,⽽不采⽤在直接的计数器⽅式。
去看了看源代码,发现共享dict的注册源码是
AcquirerProxy)
# types returned by methods of PoolProxy
注册函数如下,看的不是很懂,也不想研究了-_-,没有到有类似Lock机制的地⽅
@classmethod
def register(cls, typeid,callable=None, proxytype=None, exposed=None,
method_to_typeid=None, create_method=True):
'''
Register a typeid with the manager type
'''
if'_registry'not in cls.__dict__:
cls._registry = cls._py()
if proxytype is None:
proxytype = AutoProxy
exposed = exposed or getattr(proxytype,'_exposed_',None)
method_to_typeid = method_to_typeid or \
getattr(proxytype,'_method_to_typeid_',None)
if method_to_typeid:
for key, value in method_to_typeid.items():
assert type(key)is str,'%r is not a string'% key
assert type(value)is str,'%r is not a string'% value
cls._registry[typeid]=(
callable, exposed, method_to_typeid, proxytype
)
if create_method:
def temp(self,*args,**kwds):
util.debug('requesting creation of a shared %r object', typeid)
token, exp = self._create(typeid,*args,**kwds)
proxy = proxytype(
token, self._serializer, manager=self,
authkey=self._authkey, exposed=exp
)
conn = self._Client(token.address, authkey=self._authkey)
dispatch(conn,None,'decref',(token.id,))
一个线程可以包含多个进程return proxy
temp.__name__ = typeid
setattr(cls, typeid, temp)
在这两天的⼯作中,想到了⼀个可以控制多进程中多线程共享变量访问的⽅法。⽅法是在使⽤共享变
量时,先⽤单个进程内的线程锁加锁,再⽤进程锁加锁,这样就能保证共享变量在同⼀时间只被⼀个线程访问。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论