Python并⾏编程实战(第2版)—— Python 并⾏编程笔记
threading 线程
线程由3个元素组成:程序计数器、寄存器和堆栈。
同⼀个进程内的多个线程可以共享当前进程资源包括:数据和操作系统资源。
线程有⾃⼰的状态: 就绪(ready)、运⾏(running)、阻塞(blocked)。
创建线程 --> 就绪
就绪 --> 操作系统调度线程 --> 运⾏
运⾏ --> 超时 --> 就绪
运⾏ --> 线程等待某些条件 --> 阻塞
运⾏ --> 条件得到满⾜ --> 就绪
运⾏ --> 运⾏完成,线程终⽌
threading.Thread 类
__init__(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
参数
group: 线程组,必须为None,为将来的实现保留的
target: ⽬标函数
name: 线程名
args: ⽬标函数的参数元组
kwargs: ⽬标函数的关键字参数字典
daemon: 是否为后台线程
⼀般主线程退出后,⼦线程还是在运⾏,如果希望主线程退出后⼦线程⾃动退出,则daemon就可以设置为True
start()
启动线程
join()
等待线程结束
run()
业务逻辑,定义Thread⼦类时,可覆盖此⽅法
threading.currentThread 函数(已弃⽤别名,推荐使⽤ threading.current_thread)
获取当前线程
threading.Lock 类
同⼀时刻只能有⼀个线程获取到锁;释放锁不要求是获取锁的线程,其他任意线程都可以释放锁
acquire()
获得锁
release()
释放锁
locked()
判断锁是否已被获取
threading.RLock 类
重⼊锁,⼀个线程可多次获取锁,每个acquire()都要有对应的release(),只有得到锁的线程才能释放锁
acquire()
获得锁
release()
释放锁
⽰例1
import datetime
import random
import threading
import time
def print_(content):
    """Print *content* prefixed with the current timestamp.

    BUG FIX (extraction damage): the original line called an undefined
    name ``w()``. ``datetime`` is imported at the top of the example, so
    the intent was clearly a timestamp prefix.
    """
    print(f'{datetime.datetime.now()} {content}')
def work():
    """Simulate one unit of work: log start, sleep 1-5 s, log completion."""
    name = threading.current_thread().name
    print_(f'thread {name} start working ...')
    time.sleep(random.randint(1, 5))
    print_(f'thread {name} over.')
# Module-level lock shared by all work_lock() threads.
lock = threading.Lock()


def work_lock():
    """Worker that serializes its whole body with the module-level Lock.

    Demonstrates that a plain Lock makes threads run strictly one at a
    time.  Where release() is placed matters: releasing right after the
    first log line would serialize only the start messages and let the
    'over.' messages interleave randomly.

    Note: any thread may release a Lock, not only the one that acquired it.
    """
    thread_name = threading.current_thread().name
    lock.acquire()
    try:
        print_(f'thread {thread_name} start working ...')
        time.sleep(random.randint(1, 5))
        print_(f'thread {thread_name} over.')
    finally:
        # BUG FIX: the original never released the lock (the release()
        # calls were garbled to 'lease()'), so every thread after the
        # first blocked forever in acquire().
        lock.release()
# Re-entrant lock shared by all work_r_lock() threads.
r_lock = threading.RLock()


def work_r_lock():
    """Worker demonstrating re-entrant locking with RLock.

    The same thread acquires the RLock twice; each acquire() must be
    paired with a release(), and only the owning thread may release.
    The lock is fully released only after the second release(), so the
    ten demo threads still run one at a time.
    """
    thread_name = threading.current_thread().name
    r_lock.acquire()
    print_(f'thread {thread_name} start working ...')
    r_lock.acquire()  # re-entrant: same thread may acquire again
    print_(f'thread {thread_name} second require lock')
    print_(f'thread {thread_name} first release lock')
    # BUG FIX (extraction damage): both release() calls below were
    # garbled to the undefined name 'lease()'.
    r_lock.release()
    time.sleep(random.randint(1, 3))
    print_(f'thread {thread_name} over.')
    r_lock.release()
def main():
    """Spawn ten RLock-demo workers, wait for all of them, log the end.

    Swap the target for ``work`` or ``work_lock`` to run the other demos.
    """
    workers = [
        threading.Thread(target=work_r_lock, name=f'worker_r_lock_{i}')
        for i in range(10)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print_('main end')
def run_or_daemon():
    """Thread body for the daemon demo.

    The thread named 'background_process' counts 0-4 with a 3 s pause
    (slow, meant to be killed when the main thread exits if marked as a
    daemon); any other thread counts 5-9 with a 1 s pause.

    NOTE(review): the messages say 'process' although these are threads;
    kept as-is apart from the typo fix below.
    """
    name = threading.current_thread().name
    print_(f'process {name} start ...')
    if name == 'background_process':
        for i in range(0, 5):
            print_(f'process {name} ---> {i}')
            time.sleep(3)
    else:
        for i in range(5, 10):
            print_(f'process {name} ---> {i}')
            time.sleep(1)
    # BUG FIX: log message typo 'exist.' -> 'exit.'
    print_(f'process {name} exit.')
def main_daemon():
    """Start one daemon and one non-daemon thread, then return at once.

    Because neither thread is joined, the main thread finishes
    immediately; the daemon thread dies with the process while the
    non-daemon thread keeps the interpreter alive until it completes.
    """
    background_proc = threading.Thread(
        target=run_or_daemon, name='background_process', daemon=True)
    no_background_proc = threading.Thread(
        target=run_or_daemon, name='no_background_process')
    no_background_proc.start()
    background_proc.start()
    print_('main_daemon end.')
# Script entry point: runs the daemon-thread demo; switch to main() for
# the Lock/RLock demos.
if __name__ == '__main__':
    # main()
    main_daemon()
threading.Semaphore 类
信号量,内部维护⼀个计数器,默认计数器的值为1,当调⽤acquire()⽅法时若计数器值等于零,则线程将被阻塞,直到其他线程调⽤release(),使计数器的值⼤于零
acquire()
计数器减1
参数
blocking: bool 是否阻塞
timeout: 阻塞时长
返回:bool
release()
计数器加1
⽰例1
import logging
import random
import threading
import time
LOG_FORMAT = '%(asctime)s %(threadName)-17s %(levelname)-8s %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

# BUG FIX: start the counter at 0 so the consumer blocks until the
# producer has actually produced something.  The original used the
# default of 1, which let the first consumer run before any item existed.
semaphore = threading.Semaphore(0)
# Shared item handed from producer to consumer.
item = 0


def consumer():
    """Block on the semaphore until the producer signals an item."""
    logging.info('Consumer is waiting')
    semaphore.acquire()  # decrements; blocks while the counter is 0
    logging.info(f'Consumer notify: item is {item}')


def producer():
    """Produce a random item, then release the semaphore to wake a consumer."""
    global item
    time.sleep(3)
    item = random.randint(0, 1000)
    # typo fix: 'nodify' -> 'notify'
    logging.info(f'Producer notify: item is {item}')
    # BUG FIX: the original never called release(), so consumers blocked
    # in acquire() forever once the counter reached zero.
    semaphore.release()
def main():
    """Run ten consumer/producer rounds, waiting for each pair to finish."""
    for round_no in range(10):
        consumer_thread = threading.Thread(
            target=consumer, name=f'consumer_{round_no}')
        producer_thread = threading.Thread(
            target=producer, name=f'producer_{round_no}')
        consumer_thread.start()
        producer_thread.start()
        consumer_thread.join()
        producer_thread.join()
# Script entry point for the semaphore demo.
if __name__ == '__main__':
    main()
threading.Condition 类
条件锁,内部维护了⼀个锁变量和⼀个waiter池
有个疑问:为什么⼀个线程acquire后没有release,但是其他线程也可以继续acquire执⾏??(解答:Condition.wait()在等待期间会释放内部锁,被唤醒返回前再重新获取)
acquire()
获取锁,调⽤内部锁变量的acquire⽅法
release()
释放锁,调⽤内部锁变量的release⽅法
wait()
等待,加⼊waiter池
notify(n=1)
唤醒,从waiter池取n(默认为1)个进⾏唤醒
notify_all()
唤醒全部,避免有的线程⼀直得不到唤醒
⽰例1
import logging
import random
import threading
import time
# Log format: timestamp, thread name, level, message.
LOG_FORMAT = '%(asctime)s %(threadName)-17s %(levelname)-8s %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
# A Condition creates its own RLock when none is passed in.
# lock = threading.Lock()
# condition = threading.Condition(lock)
condition = threading.Condition()
# Shared buffer guarded by the condition.
items = []
# Answer to the question below: Condition.wait() releases the underlying
# lock while waiting and re-acquires it before returning, which is why
# another thread can still acquire() while this one is blocked in wait().
"""
有个疑问:为什么consume没有release,但是produce也可以继续执⾏??
"""
class Consumer(threading.Thread):
    """Consumer thread: pops items from the shared list under `condition`."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def consume(self):
        """Take one item, waiting while the shared buffer is empty."""
        # `with condition` acquires and — BUG FIX — releases the lock;
        # the original acquired and never released, so the producer
        # blocked forever after the first consume.
        with condition:
            # 'while' (not 'if') guards against another consumer draining
            # the list between notify and wake-up.
            while len(items) == 0:
                logging.info('no items to consume')
                condition.wait()
            item = items.pop()
            logging.info(f'Consume item {item}')
            # BUG FIX: wake a waiter (the producer may be blocked on a
            # full buffer); the original never notified.
            condition.notify()

    def run(self):
        for _ in range(20):
            # time.sleep(random.randint(1, 3))
            time.sleep(2)
            # BUG FIX (extraction damage): the original loop only slept
            # and never actually called consume().
            self.consume()
class Producer(threading.Thread):
    """Producer thread: appends items to the shared list under `condition`."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def produce(self, index):
        """Add one item, waiting while the buffer already holds 10."""
        # `with condition` acquires and — BUG FIX — releases the lock;
        # the original acquired and never released it.
        with condition:
            while len(items) == 10:
                logging.info(f'items produced {len(items)}. Stopped')
                condition.wait()
            items.append(index)
            logging.info(f'Produce item {index}, now total is {len(items)}')
            # BUG FIX: the original never notified, so consumers blocked
            # in wait() were never woken.
            condition.notify()

    def run(self):
        for i in range(20):
            # time.sleep(random.randint(1, 3))
            time.sleep(0.5)
            self.produce(i)
def main():
    """Run three consumers against one producer and wait for them all."""
    consumers = [Consumer(), Consumer(), Consumer()]
    producer = Producer()
    for consumer in consumers:
        consumer.start()
    producer.start()
    for consumer in consumers:
        consumer.join()
    producer.join()
# Script entry point for the condition-variable demo.
if __name__ == '__main__':
    main()
threading.Event 类
事件,内部维护了⼀个Condition和⼀个flag
wait()
等待事件发⽣
set()
触发事件,flag设置为True,所有wait的线程都会激活
clear()
恢复flag为False,⼀般调⽤了set()之后需要调⽤clear(),否则wait()将不会被阻塞
⽰例1
import logging
import random
import threading
import time
# Log format: timestamp, thread name, level, message.
LOG_FORMAT = '%(asctime)s %(threadName)-17s %(levelname)-8s %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
# Event the producer pulses (set + clear) to wake the consumers.
event = threading.Event()
# Shared list the producer appends to.
items = []
class Consumer(threading.Thread):
    """Consumer thread: waits for the producer's event, then logs a consume.

    Runs forever — the enclosing script's join() on this thread never
    returns.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        while True:
            # BUG FIX (extraction damage): the original logged empty
            # strings here, making the demo output meaningless.
            logging.info('Consumer waiting for event')
            event.wait()  # blocks until the producer calls event.set()
            logging.info('Consumer received event')
            # time.sleep(random.randint(1, 3))
            time.sleep(1)
            # items.pop() stays disabled: three consumers wake per event,
            # so popping could raise IndexError on an emptied list.
            # item = items.pop()
            logging.info(f'Consume item {1}')
class Producer(threading.Thread):
    """Producer thread: appends 20 items, pulsing the event after each one."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        for index in range(20):
            # time.sleep(random.randint(1, 3))
            time.sleep(3)
            items.append(index)
            logging.info(f'Produce item {index}, now total is {len(items)}')
            event.set()    # wake every thread blocked in wait()
            event.clear()  # reset the flag so the next wait() blocks again
def main():
    # NOTE(review): Consumer.run loops forever, so the consumer join()
    # calls below never return and the demo must be stopped manually.
    t1 = Consumer()
    t1_2 = Consumer()
    t1_3 = Consumer()
    t2 = Producer()
    t1.start()
    t1_2.start()
    t1_3.start()
    t2.start()
    t1.join()
    t1_2.join()
    t1_3.join()
    t2.join()
# Script entry point for the event demo.
if __name__ == '__main__':
    main()
threading.Barrier 类
屏障,内部维护了⼀个Condition,计数count,条件数量parties
init
参数
parties: 条件数量,当调⽤wait的数量达到了此值,就会释放⼀次
wait()
等待,计数count会加1,当wait的数量达到了指定的parties,就会释放,同时count会相应的减1
⽰例
import datetime
import random
import threading
import time
# Number of runners == the Barrier's parties: the barrier releases only
# once all three threads are waiting on it.
num_runners = 3
finish_line = threading.Barrier(num_runners)
# Each thread pops one name; exactly as many names as threads.
runners = ['Huey', 'Dewey', 'Louie']
def print_(content):
    """Print *content* prefixed with the current timestamp.

    BUG FIX (extraction damage): the original called an undefined name
    ``w()``; ``datetime`` is imported by the example, so a timestamp
    prefix was clearly intended.
    """
    print(f'{datetime.datetime.now()} {content}')
def run():
    # Each thread takes one distinct runner name off the shared list
    # (safe here because there are exactly as many threads as names and
    # list.pop() is atomic under the GIL).
    runner = runners.pop()
    time.sleep(random.randint(1, 3))
    print_(f'{runner} reached the finish line.')
    # Barrier.wait() blocks until all parties arrive; it returns a unique
    # integer index in range(parties) for each thread.
    print_(f'{runner} index: {finish_line.wait()}')
def main():
    """Start one thread per runner and report when the race is over."""
    threads = []
    # BUG FIX (extraction damage): the original printed an empty string
    # here; restore a start message to mirror the 'end race.' message.
    print_('start race.')
    for i in range(num_runners):
        threads.append(threading.Thread(target=run))
    for i in range(num_runners):
        threads[i].start()
    for i in range(num_runners):
        threads[i].join()
    print_('end race.')


if __name__ == '__main__':
    main()
使⽤queue.Queue实现线程之间数据共享
queue.Queue
线程安全的队列
put(): 把数据放⼊队列。如果队列满了,就等待。
get(): 从队列中拿出数据。如果队列空了,就等待。
task_done(): 调⽤了get之后,再调⽤task_done,这样在等待中的线程可以继续put
put_nowait(): 把数据放⼊队列。如果队列满了,就报异常。
get_nowait(): 从队列中拿出数据。如果队列空了,就报异常。
⽰例1
import datetime
import threading
import time
from queue import Queue
def print_(content):
    """Print *content* prefixed with the current timestamp.

    BUG FIX (extraction damage): the original called an undefined name
    ``w()``; ``datetime`` is imported by the example, so a timestamp
    prefix was clearly intended.
    """
    print(f'{datetime.datetime.now()} {content}')
class Producer(threading.Thread):
    """Producer thread: puts the integers 0-19 on the shared queue,
    one per second."""

    def __init__(self, queue):
        super().__init__()
        # The queue.Queue shared with the consumers.
        self.queue = queue

    def run(self):
        for item in range(20):
            self.queue.put(item)
            print_(f'Producer {self.name} produce item {item}')
            time.sleep(1)
class Consumer(threading.Thread):
    """Consumer thread: takes items off the shared queue forever."""

    def __init__(self, queue):
        threading.Thread.__init__(self)
        # The queue.Queue shared with the producer.
        self.queue = queue

    def run(self):
        while True:
            print_(f'Consumer {self.name} ')
            # BUG FIX (extraction damage): the original read
            # ``item = ()``; restore the Queue.get() call — get()
            # blocks while the queue is empty.
            item = self.queue.get()
            print_(f'Consumer {self.name} consume item {item}')
            # Mark the item done so queue.join()/put() bookkeeping works.
            self.queue.task_done()
if __name__ == '__main__':
    # Thread-safe queue connecting one producer to three consumers.
    queue = Queue()
    p_1 = Producer(queue)
    c_1 = Consumer(queue)
    c_2 = Consumer(queue)
    c_3 = Consumer(queue)
    p_1.start()
    c_1.start()
    c_2.start()
    c_3.start()
    # NOTE(review): Consumer.run loops forever, so the c_*.join() calls
    # never return; the demo must be interrupted manually.
    p_1.join()
    c_1.join()
    c_2.join()
    c_3.join()
multiprocessing 进程
multiprocessing.Process 类
__init__(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
参数
group: 进程组,必须为None,为将来的实现保留的
target: ⽬标函数
name: 进程名
args: ⽬标函数的参数元组
kwargs: ⽬标函数的关键字参数字典
daemon: 是否为后台进程
在windows系统上的结果是:不管是否设置为True,主进程退出后⼦进程都会退出。⽬前的处理⽅式是:对必须完整执⾏的进程调⽤join()⽅法,其他进程可以随主进程⼀起退出
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论