python的多线程及线程同步⽅式
1.线程执⾏
join与setDaemon
1.⼦线程在主线程运⾏结束后,会继续执⾏完,如果给⼦线程设置为守护线程(setDaemon=True),主线程运⾏结束⼦线程即结束;
2 .如果join()线程,那么主线程会等待⼦线程执⾏完再执⾏。
import threading
import time
def get_thread_a():
print("get thread A started")
time.sleep(3)
print("get thread A end")
def get_thread_b():
print("get thread B started")
time.sleep(5)
print("get thread B end")
if __name__ =="__main__":
thread_a = threading.Thread(target=get_thread_a)
thread_b = threading.Thread(target=get_thread_b)
start_time = time.time()
thread_b.setDaemon(True)
thread_a.start()
thread_b.start()
thread_a.join()
end_time = time.time()
print("execution time: {}".format(end_time - start_time))
thread_a是join,⾸先⼦线程thread_a执⾏,thread_b是守护线程,当主线程执⾏完后,thread_b不会再执⾏
执⾏结果如下:
get thread A started
get thread B started
get thread A end
execution time: 3.003199815750122
2.线程同步
多线程间共享全局变量,多个线程对该变量执⾏不同的操作时,该变量最终的结果可能是不确定的(每次线程执⾏后的结果不同),如:对count变量执⾏加减操作 ,count的值是不确定的,要想count的值是⼀个确定的需对线程执⾏的代码段加锁。
3.线程同步的⽅式
3.1 锁机制
python对线程加锁主要有Lock和Rlock模块
Lock:
from threading import Lock
lock = Lock()
lock.acquire()
Lock有acquire()和release()⽅法,这两个⽅法必须是成对出现的,acquire()后⾯必须release()后才能再a
cquire(),否则会造成死锁Rlock:
鉴于Lock可能会造成死锁的情况,RLock(可重⼊锁)对Lock进⾏了改进,RLock可以在同⼀个线程⾥⾯连续调⽤多次acquire(),但必须再执⾏相同次数的release()
from threading import RLock
lock = RLock()
lock.acquire()
lock.acquire()
当⼀个线程调⽤锁的acquire()⽅法获得锁时,锁就进⼊“locked”状态。每次只有⼀个线程可以获得锁。如果此时另⼀个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”(参见多线程的基本概念)。
直到拥有锁的线程调⽤锁的release()⽅法释放锁之后,锁进⼊“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择⼀个来获得锁,并使得该线程进⼊运⾏(running)状态。
3.2 Semaphore(信号量)
信号量也提供acquire⽅法和release⽅法,每当调⽤acquire⽅法的时候,如果内部计数器⼤于0,则将其减1,如果内部计数器等于0,则会阻塞该线程,直到有线程调⽤了release⽅法将内部计数器更新到⼤于1位置。
Semaphore(信号量)是计算机科学史上最古⽼的同步指令之⼀。Semaphore管理⼀个内置的计数器,每当调⽤acquire()时-1,调⽤release() 时+1。计数器不能⼩于0;当计数器为0时,acquire()将阻塞线程⾄同步锁定状态,直到其他线程调⽤release()。
基于这个特点,Semaphore经常⽤来同步⼀些有“访客上限”的对象,⽐如连接池。
BoundedSemaphore 与Semaphore的唯⼀区别在于前者将在调⽤release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出⼀个异常。
构造⽅法:
Semaphore(value=1): value是计数器的初始值。
import time
import threading
def get_thread_a(semaphore,i):
time.sleep(1)
print("get thread : {}".format(i))
def get_thread_b(semaphore):
for i in range(10):
semaphore.acquire()
thread_a = threading.Thread(target=get_thread_a, args=(semaphore,i))
thread_a.start()
if __name__ =="__main__":thread技术
semaphore = threading.Semaphore(2)
thread_b = threading.Thread(target=get_thread_b, args=(semaphore,))
thread_b.start()
3.3 条件判断
所谓条件变量,即这种机制是在满⾜了特定的条件后,线程才可以访问相关的数据。
它使⽤Condition类来完成,由于它也可以像锁机制那样⽤,所以它也有acquire⽅法和release⽅法,⽽且它还有wait,notify,notifyAll ⽅法。
"""
⼀个简单的⽣产消费者模型,通过条件变量的控制产品数量的增减,调⽤⼀次⽣产者产品就是+1,调⽤⼀次消费者产品就会-1.
"""
"""
使⽤ Condition 类来完成,由于它也可以像锁机制那样⽤,所以它也有 acquire ⽅法和 release ⽅法,⽽且它还有
wait, notify, notifyAll ⽅法。
"""
import threading
import queue,time,random
class Goods:#产品类
def__init__(self):
def add(self,num =1):
def sub(self):
def sub(self):
unt>=0:
def empty(self):
unt <=0
class Producer(threading.Thread):#⽣产者类
def__init__(self,condition,goods,sleeptime =1):#sleeptime=1
threading.Thread.__init__(self)
self.sleeptime = sleeptime
def run(self):
cond = d
goods = ds
while True:
cond.acquire()#锁住资源
goods.add()
print("产品数量:",unt,"⽣产者线程")
time.sleep(self.sleeptime)
class Consumer(threading.Thread):#消费者类
def__init__(self,condition,goods,sleeptime =2):#sleeptime=2
threading.Thread.__init__(self)
self.sleeptime = sleeptime
def run(self):
cond = d
goods = ds
while True:
time.sleep(self.sleeptime)
cond.acquire()#锁住资源
pty():#如⽆产品则让线程等待
cond.wait()
goods.sub()
print("产品数量:",unt,"消费者线程")
g = Goods()
c = threading.Condition()
pro = Producer(c,g)
pro.start()
con = Consumer(c,g)
con.start()
Condition内部有⼀把锁,默认是RLock,在调⽤wait()和notify()之前必须先调⽤acquire()获取这个锁,才能继续执⾏;当wait()和notify()执⾏完后,需调⽤release()释放这个锁,在执⾏with condition时,会先执⾏acquire(),with结束时,执⾏了release();所以condition有两层锁,最底层锁在调⽤wait()时会释放,同时会加⼀把锁到等待队列,等待notify()唤醒释放锁
wait() :允许等待某个条件变量的通知,notify()可唤醒
notify(): 唤醒等待队列wait()
# encoding: UTF-8
import threading
import time
# 商品
product =None
# 条件变量
con = threading.Condition()
# ⽣产者⽅法
def produce():
global product
if con.acquire():
while True:
if product is None:
'
product ='anything'
# 通知消费者,商品已经⽣产 ify()
# 等待通知
con.wait()
time.sleep(2)
# 消费者⽅法
def consume():
global product
if con.acquire():
while True:
if product is not None:
'
product =None
# 通知⽣产者,商品已经没了 ify()
# 等待通知
con.wait()
time.sleep(2)
t1 = threading.Thread(target=produce) t2 = threading.Thread(target=consume) t2.start()
t1.start()
3.4 同步队列
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论