Python多线程——线程间通信与同步机制
线程间通信
1.Queue
使⽤线程队列有⼀个要注意的问题是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引⽤。如果你担⼼对象的共享状态,那你最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者⼀个对象的深拷贝。
Queue 对象提供⼀些在当前上下⽂很有⽤的附加特性。⽐如在创建 Queue 对象时提供可选的 size 参数来限制可以添加到队列中的元素数量。对于“⽣产者”与“消费者”速度有差异的情况,为队列中的元素数量添加上限是有意义的。⽐如,⼀个“⽣产者”产⽣项⽬的速度⽐“消费者”“消费”的速度快,那么使⽤固定⼤⼩的队列就可以在队列已满的时候阻塞队列,以免未预期的连锁效应扩散整个程序造成死锁或者程序运⾏失常。在通信的线程之间进⾏“流量控制”是⼀个看起来容易实现起来困难的问题。如果你发现⾃⼰曾经试图通过摆弄队列⼤⼩来解决⼀个问题,这也许就标志着你的程序可能存在脆弱设计或者固有的可伸缩问题。 get() 和 put() ⽅法都⽀持⾮阻塞⽅式和设定超时。
import queue
q = queue.Queue()
try:
data = q.get(block=False)
except queue.Empty:
...
try:
q.put(item, block=False)进程间通信和线程间通信的区别
except queue.Full:
...
try:
data = q.get(timeout=5.0)
except queue.Empty:
...
def producer(q):
...
try:
q.put(item, block=False)
except queue.Full:
log.warning('queued item %r discarded!', item)
_running = True
def consumer(q):
while _running:
try:
item = q.get(timeout=5.0)
# Process item
...
except queue.Empty:
pass
最后,有 q.qsize() , q.full() , q.empty() 等实⽤⽅法可以获取⼀个队列的当前⼤⼩和状态。但要注意,这些⽅法都不是线程安全的。可能你对⼀个队列使⽤empty() 判断出这个队列为空,但同时另外⼀个线程可能已经向这个队列中插⼊⼀个数据项。所以,你最好不要在你的代码中使⽤这些⽅法。
为了避免出现死锁的情况,使⽤锁机制的程序应该设定为每个线程⼀次只允许获取⼀个锁。如果不能这样做的话,你就需要更⾼级的死锁避免机制。在 threading 库中还提供了其他的同步原语,⽐如 RLock 和 Semaphore 对象。
Queue提供的⽅法:
task_done()
意味着之前⼊队的⼀个任务已经完成。由队列的消费者线程调⽤。每⼀个get()调⽤得到⼀个任务,接下来的task_done()调⽤告诉队列该任务已经处理完毕。
如果当前⼀个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执⾏(即每⼀个由put()调⽤⼊队的任务都有⼀个对应的task_done()调⽤)。
join()
阻塞调⽤线程,直到队列中的所有任务被处理掉。
只要有数据被加⼊队列,未完成的任务数就会增加。当消费者线程调⽤task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
put(item[, block[, timeout]])
将item放⼊队列中。
1.如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调⽤,⽆超时)。
2.如果timeout是个正整数,阻塞调⽤进程最多timeout秒,如果⼀直⽆空空间可⽤,抛出Full异常(带超时的阻塞调⽤)。
3.如果block为False,如果有空闲空间可⽤将数据放⼊队列,否则⽴即抛出Full异常
其⾮阻塞版本为put_nowait等同于put(item, False)
get([block[, timeout]])
从队列中移除并返回⼀个数据。block跟timeout参数同put⽅法
其⾮阻塞⽅法为get_nowait()相当与get(False)
empty()
如果队列为空,返回True,反之返回False
同步机制
Event
线程的⼀个关键特性是每个线程都是独⽴运⾏且状态不可预测。如果程序中的其他线程需要通过断某个线程的状态来确定⾃⼰下⼀步的操作,这时线程同步问题就会变得⾮常棘⼿。为了解决这些问题,我们需要使⽤ threading 库中的 Event 对象。Event 对象包含⼀个可由线程设置的信号标志,它允许线程等待某些事件的发⽣。在初始情况下,event 对象中的信号标志被设置假。如果有线程等待⼀个 event 对象,⽽这个 event 对象的标志为假,那么这个线程将会被⼀直阻塞直⾄该标志为真。⼀个线程如果将⼀个 event 对象的信号标志设置为真,它将唤醒所有等待个 event 对象的线程。如果⼀个线程等待⼀个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执⾏。
from threading import Thread, Event
import time
def countdown(n, start_evt):
print('countdown ')
start_evt.set()
while n > 0:
print('T-minus', n)
n -= 1
time.sleep(5)
start_evt = Event() # 可通过Event 判断线程的是否已运⾏
t = Thread(target=countdown, args=(10, start_evt))
t.start()
print('')
start_evt.wait() # 等待countdown执⾏
# event 对象的⼀个重要特点是当它被设置为真时会唤醒所有等待它的线程
print('countdown ')
Semaphore(信号量)
在多线程编程中,为了防⽌不同的线程同时对⼀个公⽤的资源(⽐如全部变量)进⾏修改,需要进⾏同
时访问的数量(通常是1)的限制。信号量同步基于内部计数器,每调⽤⼀次acquire(),计数器减1;每调⽤⼀次release(),计数器加1.当计数器为0时,acquire()调⽤被阻塞。
from threading import Semaphore, Lock, RLock, Condition, Event, Thread
import time
# 信号量
sema = Semaphore(3) #限制同时能访问资源的数量为3
def foo(tid):
with sema:
print('{} acquire sema'.format(tid))
time.sleep(1)
print('{} release sema'.format(tid))
threads = []
for i in range(5):
t = Thread(target=foo, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
Lock(锁)
互斥锁为资源引⼊⼀个状态:锁定/⾮锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“⾮锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有⼀个线程进⾏写⼊操作,从⽽保证
了多线程情况下数据的正确性。
#创建锁
mutex = threading.Lock()
#锁定
mutex.acquire([timeout])
#释放
RLock(可重⼊锁)
为了⽀持在同⼀线程中多次请求同⼀资源,python提供了“可重⼊锁”:threading.RLock。RLock内部维护着⼀个Lock和⼀个counter变
量,counter记录了acquire的次数,从⽽使得资源可以被多次acquire。直到⼀个线程所有的acquire都被release,其他的线程才能获得资源。
import threading
import time
class MyThread(threading.Thread):
def run(self):
global num
time.sleep(1)
if mutex.acquire(1):
num = num+1
msg = self.name+' set num to '+str(num)
print msg
mutex.acquire()
num = 0
mutex = threading.RLock()
def test():
for i in range(5):
t = MyThread()
t.start()
if __name__ == '__main__':
test()
Condition(条件变量)
Condition被称为条件变量,除了提供与Lock类似的acquire和release⽅法外,还提供了wait和notify⽅法。线程⾸先acquire⼀个条件变量,然后判断⼀些条件。如果条件不满⾜则wait;如果条件满⾜,进⾏⼀些处理改变条件后,通过notify⽅法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条
件。不断的重复这⼀过程,从⽽解决复杂的同步问题。
可以认为Condition对象维护了⼀个锁(Lock/RLock)和⼀个waiting池。线程通过acquire获得Condition对象,当调⽤wait⽅法时,线程会释放Condition内部的锁并进⼊blocked状态,同时在waiting池中记录这个线程。当调⽤notify⽅法时,Condition对象会从waiting池中挑选⼀个线程,通知其调⽤acquire⽅法尝试取到锁。
Condition对象的构造函数可以接受⼀个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部⾃⾏创建⼀个RLock。
除了notify⽅法外,Condition对象还提供了notifyAll⽅法,可以通知waiting池中的所有线程尝试acquire内部锁。由于上述机制,处于waiting 状态的线程只能通过notify⽅法唤醒,所以notifyAll的作⽤在于防⽌有线程永远处于沉默状态。
import threading
import time
class Producer:
def run(self):
global count
while True:
if con.acquire():
if count > 1000:
con.wait()
else:
count += 100
msg = threading.current_thread().name + ' produce 100, count=' + str(count)
print(msg)
time.sleep(1)
count = 0
con = threading.Condition()
class Consumer:
def run(self):
global count
while True:
if con.acquire():
if count < 100:
con.wait()
else:
count -= 3
msg = threading.current_thread().name + ' consumer 3, count=' + str(count) print(msg)
time.sleep(3)
producer = Producer()
consumer = Consumer()
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论