Python多线程教程:并发与并⾏
在批评Python的讨论中,常常说起Python多线程是多么的难⽤。还有⼈对 global interpreter lock(也被亲切的称为“GIL”)指指点点,说它阻碍了Python的多线程程序同时运⾏。因此,如果你是从其他语⾔(⽐如C++或Java)转过来的话,Python线程模块并不会像你想象的那样去运⾏。必须要说明的是,我们还是可以⽤Python写出能并发或并⾏的代码,并且能带来性能的显著提升,只要你能顾及到⼀些事情。如果你还没看过的话,我建议你看看Eqbal Quran的⽂章《Ruby中的并发和并⾏》。
在本⽂中,我们将会写⼀个⼩的Python脚本,⽤于下载Imgur上最热门的图⽚。我们将会从⼀个按顺序下载图⽚的版本开始做起,即⼀个⼀个地下载。在那之前,你得注册⼀个Imgur上的应⽤。如果你还没有Imgur账户,请先注册⼀个。
本⽂中的脚本在Python3.4.2中测试通过。稍微改⼀下,应该也能在Python2中运⾏——urllib是两个版本中区别最⼤的部分。
快速使⽤Romanysoft LAB的技术实现 HTML 开发Mac OS App,并销售到苹果应⽤商店中。
《HTML开发Mac OS App 视频教程》
⼟⾖⽹同步更新:
百度⽹盘同步:
分享  [中⽂纪录⽚]互联⽹时代
官⽅QQ:(申请加⼊,说是我推荐的)
App实践出真知434558944
App学习交流452180823
1、开始动⼿
让我们从创建⼀个叫“download.py”的Python模块开始。这个⽂件包含了获取图⽚列表以及下载这些图⽚所需的所有函数。我们将这些功能分成三个单独的函数:
get_links
download_link
setup_download_dir
第三个函数,“setup_download_dir”,⽤于创建下载的⽬标⽬录(如果不存在的话)。
Imgur的API要求HTTP请求能⽀持带有client ID的“Authorization”头部。你可以从你注册的Imgur应⽤的⾯板上到这个client ID,⽽响应会以JSON进⾏编码。我们可以使⽤Python的标准JSON库去解码。下载图⽚更简单,你只需要根据它们的URL获取图⽚,然后写⼊到⼀个⽂件即可。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19import json
import logging
import os
from pathlib import Path
quest import urlopen, Request
logger =Logger(__name__)
def get_links(client_id):
headers ={'Authorization': 'Client-ID {}'.format(client_id)}
req =Request('api.imgur/3/gallery/', headers=headers, method='GET')  with urlopen(req) as resp:
data =json.adall().decode('utf-8'))
return map(lambda item: item['link'], data['data'])
def download_link(directory, link):
logger.info('Downloading %s', link)
download_path =directory /os.path.basename(link)
with urlopen(link) as image, download_path.open('wb') as f:
f.adall())
19 20 21 22 23 24 25 26      f.adall())
def setup_download_dir():
download_dir =Path('images')  if not ists():      download_dir.mkdir()
return download_dir
接下来,你需要写⼀个模块,利⽤这些函数去逐个下载图⽚。我们给它命名为“single.py”。它包含了我们最原始版本的Imgur图⽚下载器的主要函数。这个模块将会通过环境变量“IMGUR_CLIENT_ID”去获取Imgur的client ID。它将会调⽤“setup_download_dir”去创建下载⽬录。最后,使⽤get_links函数去获取图⽚的列表,过滤掉所有的GIF和专辑URL,然后⽤“download_link”去将图⽚下载并保存在磁盘中。下⾯
是“single.py”的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23import logging
import os
from time import time
from download import setup_download_dir, get_links, download_link
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') Logger('requests').setLevel(logging.CRITICAL)
logger =Logger(__name__)
def main():
ts =time()
client_id =os.getenv('IMGUR_CLIENT_ID')
navicat离线激活
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir =setup_download_dir()
links =[l for l in get_links(client_id) dswith('.jpg')]
for link in links:
download_link(download_dir, link)
print('Took {}s'.format(time() -ts))
if__name__ =='__main__':
main()
注:为了测试⽅便,上⾯两段代码可以⽤如下代码替代演⽰:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25# coding=utf-8
#测试utf-8编码
from time import sleep, time
import sys, threading
reload(sys)
sys.setdefaultencoding('utf-8')
def getNums(N):
return xrange(N)
def processNum(num):
num_add =num +1
sleep(1)
print str(threading.current_thread()) +": "+str(num) +" → "+str(num_add)
if__name__ =="__main__":
t1 =time()
for i in getNums(3):
processNum(i)
print"cost time is: {:.2f}s".format(time() -t1)
结果:
webservice搭建
1 2 3<_MainThread(MainThread, started 4436)>: 0 → 1 <_MainThread(MainThread, started 4436)>: 1 → 2 <_MainThread(MainThread, started 4436)>: 2 → 3
3 4<_MainThread(MainThread, started 4436)>: 2 → 3 cost time is: 3.00s
在我的笔记本上,这个脚本花了19.4秒去下载91张图⽚。请注意这些数字在不同的⽹络上也会有所不同。19.4秒并不是⾮常的长,但是如果我们要下载更多的图⽚怎么办呢?或许是900张⽽不是90张。平均下载⼀张图⽚要0.2秒,900张的话⼤概需要3分钟。那么9000张图⽚将会花掉30分钟。好消息是使⽤了并发或者并⾏后,我们可以将这个速度显著地提⾼。
接下来的代码⽰例将只会显⽰导⼊特有模块和新模块的import语句。所有相关的Python脚本都可以在这⽅便地到。
2、使⽤线程
线程是最出名的实现并发和并⾏的⽅式之⼀。操作系统⼀般提供了线程的特性。线程⽐进程要⼩,⽽且共享同⼀块内存空间。
在这⾥,我们将写⼀个替代“single.py”的新模块。它将创建⼀个有⼋个线程的池,加上主线程的话总共就是九个线程。之所以是⼋个线程,是因为我的电脑有8个CPU内核,⽽⼀个⼯作线程对应⼀个内核看起来还不错。在实践中,线程的数量是仔细考究的,需要考虑到其他的因素,⽐如在同⼀台机器上跑的的其他应⽤和服务。
python入门教程视屏
下⾯的脚本⼏乎跟之前的⼀样,除了我们现在有个新的类,DownloadWorker,⼀个Thread类的⼦类。运⾏⽆限循环的run⽅法已经被重写。在每次迭代时,它调⽤“()”试图从⼀个线程安全的队列⾥获取⼀个URL。它将会⼀直堵塞,直到队列中出现⼀个要处理元素。⼀旦⼯作线程从队列中得到⼀个元素,它将会调⽤之前脚本中⽤来下载图⽚到⽬录中所⽤到的“download_link”⽅法。下载完成之后,⼯作线程向队列发送任务完成的信号。这⾮常重要,因为队列⼀直在跟踪队列中的任务数。如果⼯作线程没有发出任务完成的信
号,“queue.join()”的调⽤将会令整个主线程都在阻塞状态。
1
2
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42from queue import Queue
from threading import Thread
class DownloadWorker(Thread):
def__init__(self, queue):
Thread.__init__(self)
self.queue =queue
def run(self):
while True:
# Get the work from the queue and expand the tuple
# 从队列中获取任务并扩展tuple
directory, link =()
download_link(directory, link)
self.queue.task_done()
def main():
ts =time()
client_id =os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir =setup_download_dir()
links =[l for l in get_links(client_id) dswith('.jpg')]
# Create a queue to communicate with the worker threads
queue =Queue()
# Create 8 worker threads
# 创建⼋个⼯作线程
for x in range(8):
worker =DownloadWorker(queue)
# Setting daemon to True will let the main thread exit even though the workers are blocking        # 将daemon设置为True将会使主线程退出,即使worker都阻塞了
worker.daemon =True
worker.start()
# Put the tasks into the queue as a tuple
# 将任务以tuple的形式放⼊队列中
for link in links:
logger.info('Queueing {}'.format(link))
queue.put((download_dir, link))
# Causes the main thread to wait for the queue to finish processing all the tasks
# 让主线程等待队列完成所有的任务
queue.join()
print('Took {}'.format(time() -ts))
注:为了测试⽅便,上⾯的代码可以⽤如下代码替代演⽰:1
1 2
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48# coding=utf-8
#测试utf-8编码
from Queue import Queue
from threading import Thread
from single import*
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
sql语句中distinct是什么意思
class ProcessWorker(Thread):
def__init__(self, queue):
Thread.__init__(self)
php empty判断0self.queue =queue
def run(self):
while True:
# Get the work from the queue
num =()
processNum(num)
self.queue.task_done()
def main():
ts =time()
nums =getNums(4)
# Create a queue to communicate with the worker threads
queue =Queue()
# Create 4 worker threads
# 创建四个⼯作线程
for x in range(4):
worker =ProcessWorker(queue)
# Setting daemon to True will let the main thread exit even though the workers are blocking        # 将daemon设置为True将会使主线程退出,即使worker都阻塞了
worker.daemon =True
worker.start()
# Put the tasks into the queue
for num in nums:
queue.put(num)
# Causes the main thread to wait for the queue to finish processing all the tasks
# 让主线程等待队列完成所有的任务
queue.join()
print("cost time is: {:.2f}s".format(time() -ts))
if__name__ =="__main__":
main()
结果:
1 2 3 4 5<ProcessWorker(Thread-4, started daemon 3900)>: 3 → 4<ProcessWorker(Thread-1, started daemon 3436)>: 2 → 3<ProcessWorker(Thread-3, started daemon 4576)>: 1 → 2
<ProcessWorker(Thread-2, started daemon 396)>: 0 → 1
cost time is: 1.01s
在同⼀个机器上运⾏这个脚本,下载时间变成了4.1秒!即⽐之前的例⼦快4.7倍。虽然这快了很多,但还是要提⼀下,由于GIL的缘故,在这个进程中同⼀时间只有⼀个线程在运⾏。因此,这段代码是并发的但不是并⾏的。⽽它仍然变快的原因是这是⼀个IO密集型的任务。进程下载图⽚时根本毫不费⼒,⽽主要的时间都花在了等待⽹络上。这就是为什么线程可以提供很⼤的速度提升。每当线程中的⼀个准备⼯作时,进程可以不断转换线程。使⽤Python或其他有GIL的解释型语⾔中的线程模块实际上会降低性能。如果你的代码执⾏的是CPU密集型的任务,例如解压gzip⽂件,使⽤线程模块将会导致执⾏时间变长。对于CPU密集型任务和真正的并⾏执⾏,我们可以使⽤多进程(multiprocessing)模块。
官⽅的Python实现——CPython——带有GIL,但不是所有的Python实现都是这样的。⽐如,IronPython,使⽤.NET框架实现的Python就没有GIL,基于Java实现的Jython也同样没有。你可以查看现有的Python实现。
3、⽣成多进程
多进程模块⽐线程模块更易使⽤,因为我们不需要像线程⽰例那样新增⼀个类。我们唯⼀需要做的改变在主函数中。
表达式int为了使⽤多进程,我们得建⽴⼀个多进程池。通过它提供的map⽅法,我们把URL列表传给池,然后8个新进程就会⽣成,它们将并⾏地去下载图⽚。这就是真正的并⾏,不过这是有代价的。整个脚本的内存将会被拷贝到各个⼦进程中。在我们的例⼦中这不算什么,但是在⼤型程序中它很容易导致严重的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14from functools import partial
from multiprocessing.pool import Pool
def main():
ts =time()
client_id =os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")  download_dir =setup_download_dir()
links =[l for l in get_links(client_id) dswith('.jpg')]
download =partial(download_link, download_dir)
with Pool(8) as p:
p.map(download, links)
print('Took {}s'.format(time() -ts))
注:为了测试⽅便,上⾯的代码可以⽤如下代码替代演⽰:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23# coding=utf-8
#测试utf-8编码
from functools import partial
from multiprocessing.pool import Pool
from single import*
from time import time
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
def main():
ts =time()
nums =getNums(4)
p =Pool(4)
p.map(processNum, nums)
print("cost time is: {:.2f}s".format(time() -ts)) if__name__ =="__main__":
main()
结果:
1 2 3 4 5<_MainThread(MainThread, started 6188)>: 0 → 1
<_MainThread(MainThread, started 3584)>: 1 → 2
<_MainThread(MainThread, started 2572)>: 3 → 4<_MainThread(MainThread, started 4692)>: 2 → 3 cost time is: 1.21s
4、分布式任务
你已经知道了线程和多进程模块可以给你⾃⼰的电脑跑脚本时提供很⼤的帮助,那么在你想要在不同的机器上执⾏任务,或者在你需要扩⼤规模⽽超过⼀台机器的的能⼒范围时,你该怎么办呢?⼀个很好的使⽤案例是⽹络应⽤的长时间后台任务。如果你有⼀些很耗时的任务,你不会希望在同⼀台机器上占⽤⼀些其他的应⽤代码所需要的⼦进程或线程。这将会使你的应⽤的性能下降,影响到你的⽤户们。如果能在另外⼀台甚⾄很多台其他的机器上跑这些任务就好了。
Python库⾮常适⽤于这类任务。它是⼀个简单却很强⼤的库。⾸先将⼀个函数和它的参数放⼊队列中。
它将函数调⽤的表⽰,然后将这些表⽰添加到⼀个列表中。任务进⼊队列只是第⼀步,什么都还没有做。我们⾄少还需要⼀个能去监听任务队列的worker(⼯作线程)。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。