python︱⽤asyncio、aiohttp实现异步及相关案例
Asyncio 是并发(concurrency)的⼀种⽅式。对 Python 来说,并发还可以通过线程(threading)和多进程(multiprocessing)来实现。Asyncio 并不能带来真正的并⾏(parallelism)。当然,因为 GIL(全局解释器锁)的存在,Python 的多线程也不能带来真正的并⾏。
.
⼀、asyncio的异步
主要来源:
1、定义协程
import asyncio
async def hello1():
print("1, Hello world!")
#r = await asyncio.sleep(1)
print("1, Hello again!")
for i in range(5):
print(i)
async def hello2():
print("2, Hello world!")
#r = await asyncio.sleep(1)
print("2, Hello again!")
for i in range(5,10):
print(i)
协程于我的理解是跟yield ⼀致的,协程可以做哪些事。协程可以:
等待⼀个 future 结束
等待另⼀个协程(产⽣⼀个结果,或引发⼀个异常)
产⽣⼀个结果给正在等它的协程
引发⼀个异常给正在等它的协程
.
2、运⾏协程
要让这个协程对象运⾏的话,有两种⽅式:
在另⼀个已经运⾏的协程中⽤ await 等待它
通过 ensure_future 函数计划它的执⾏
简单来说,只有 loop 运⾏了,协程才可能运⾏。
async def doSomething():
.
..
pass
async def other_field():
await doSomething()
...
pass
async: 写在你要指定异步的⽅法def之前,等同于@utine
await: 写在调⽤此⽅法前,等同于yield from
# 执⾏
loop = _event_loop()
loop.run_until_complete(hello1())
>>> 1, Hello world!
>>> 1, Hello again!
>>> 0
>>> 1
>>> 2
>>> 3
>>> 4
.
3、回调函数
回调函数, 执⾏且按照顺序, 假如协程是⼀个 IO 的读操作,等它读完数据后,我们希望得到通知,以便下⼀步数据的处理。这⼀需求可以通过往 future 添加回调来实现。
def done_callback1(futu): # futu是异步的函数名称
print('Done1')
def done_callback2(futu):
print('Done2')
futu = sure_future(hello1())
futu.add_done_callback(done_callback1)
futu = sure_future(hello2())
futu.add_done_callback(done_callback2)
loop.run_until_complete(futu)
>>> 1, Hello world!
>>> 1, Hello again!
>>> 0
>>> 1
>>> 2
>>> 3
>>> 4
>>> 2, Hello world!
>>> 2, Hello again!
>>> 5
>>> 6
>>> 7
>>> 8
>>> 9
>>> Done1
>>> Done2
.
4、多个协程共同运⾏run_until_complete
有以下三种写法:
# 多个协程同步执⾏
# 第⼀种写法
loop.run_until_complete(asyncio.gather(hello1(), hello2()))
# 第⼆种写法
coros = [hello1(), hello2()]
await和async使用方法loop.run_until_complete(asyncio.gather(*coros))
# 第三种写法
futus = [sure_future(hello1()),
loop.run_until_complete(asyncio.gather(*futus))
gather 起聚合的作⽤,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。
run_until_complete 是⼀个阻塞(blocking)调⽤,直到协程运⾏结束,它才返回。这⼀点从函数名不难看出。
run_until_complete 的参数是⼀个 future,但是我们这⾥传给它的却是协程对象,之所以能这样,是因为它在内部做了检查,通过ensure_future 函数把协程对象包装(wrap)成了 future。
.
5、⼀直执⾏的run_forever
import functools
from retry import retry
ptions import ConnectTimeout
async def do_some_work(loop, x):
print('Waiting ' + str(x))
await asyncio.sleep(x)
print ('Done')
def done_callback(loop, futu):
loop.stop()
loop = _event_loop()
futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))
futus.add_done_callback(functools.partial(done_callback, loop))
loop.run_forever()
>>> Waiting 3
>>> Waiting 1
>>> Done
>>> Done
loop.run_forever()执⾏之后,会按照顺序执⾏,完成之后,程序不会关闭,仍然处于开启状态。
来⼀个例⼦:
@utine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(2.0)
return x + y
@utine
def print_sum(x, y):
result = yield from compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = _event_loop()
tasks = [print_sum(1, 2), print_sum(3, 4), print_sum(5, 6)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# OUTPUT
Compute 3 + 4 ...
Compute 5 + 6 ...
Compute 1 + 2 ...
# ⼤约 1 秒以后
3 +
4 = 7
5 +
6 = 11
1 +
2 = 3
.
⼆、aiohttp
asyncio可以实现单线程并发IO操作。如果仅⽤在客户端,发挥的威⼒不⼤。如果把asyncio⽤在服务器端,例如Web服务器,由于HTTP 连接就是IO操作,因此可以⽤单线程+coroutine实现多⽤户的⾼并发⽀持。()
后续内容主要来源:
.
1、基本⽤法
with aiohttp.Timeout(0.001):
async ('github') as r:
(encoding='windows-1251')
aiohttp中设置了timeout,请求了github中的内容。
.
2、session获取数据
aiohttp.ClientSession. ⾸先要建⽴⼀个session对象,然后⽤该session对象去打开⽹页。session可以进⾏多项操作,⽐如post, get, put, head。
async with aiohttp.ClientSession() as session:
async ('api.github/events') as resp:
print(resp.status)
print(())
如果要使⽤post⽅法,则相应的语句要改成
session.post('/post', data=b'data')
官⽅例⼦:
import asyncio
import async_timeout
async def fetch(session, url):
with async_timeout.timeout(10):
async (url) as response:
return ()
async def main(loop):
async with aiohttp.ClientSession(loop=loop) as session:
html = await fetch(session, '')
print(html)
loop = _event_loop()
loop.run_until_complete(main(loop))
.
三、asyncio、aiohttp结合案例
1、采集ReadHub案例
此案例来源:
Sanic是⼀个异步框架,为了更好的发挥它的性能,有些操作最好也要⽤异步的, ⽐如这⾥发起请求就必须要⽤异步请求框架aiohttp
import aiohttp
async def get_news(size=10):
all_news, readhub_api = [], "/topic"
# # conn = aiohttp.ProxyConnector(proxy="127.0.0.1:8087")
async with aiohttp.ClientSession() as client:
headers = {'content-type': 'application/json'}
params = {'pageSize': size}
async (readhub_api, params=params, headers=headers) as response: # 启动
assert response.status == 200
result = await response.json() # 获取的数据
for value ('data', []): # 稍微整理数据
each_data = {}
each_data['title'] = ('title')
each_data['summary'] = ('summary')
each_data['news_info'] = ('newsArray')
each_data['updated_at'] = ('updatedAt')
all_news.append(each_data)
return all_news
async def index_json():
nums = 2
# 获取数据
all_news = await get_news()
return all_news
<中,可以设置参数params ,以及⾃定义header头(需以dict的形式),当然也可以设置代理conn 。
在index_json()函数中,可以了解到,如何在另⼀个协程中使⽤前⾯⼀个协程,可以使⽤await ,且await 只在async (异步)中才有效。参考:
.
2、asyncio并发
来源:
并发和并⾏⼀直是容易混淆的概念。并发通常指有多个任务需要同时进⾏,并⾏则是同⼀时刻有多个任务执⾏。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论