python---Asynchronous IO (asyncio) Coroutines
A quick overview
The asyncio module is built into Python 3. Its programming model is a message loop (event loop).
A look inside the module (asyncio/__init__.py):
from .base_events import *
from .coroutines import *  # coroutine module; can decorate a function into a coroutine
from .events import *  # event module; the event loop and task scheduling both rely on it
from .futures import *   # async concurrency module; a Future wraps many methods around a task and represents the result of work that may or may not have run yet. It is essentially no different from Task
from .locks import *  # synchronization primitives for async resources
from .protocols import *
from .queues import *
from .streams import *
from .subprocess import *
from .tasks import *  # creates tasks; a Task wraps a coroutine so its state can be inspected, and tasks can be grouped together
from .transports import *
Calling steps:
1. When we add the async keyword to a function, or decorate it with the asyncio.coroutine decorator, it becomes an asynchronous function (coroutine).
2. Each thread has one event loop; the main thread creates it when asyncio.get_event_loop() is called.
3. Bundle the tasks together with asyncio.gather(*args), then pass them into the event loop as a group.
4. Hand the asynchronous tasks to the loop's run_until_complete method; the event loop will schedule the coroutines. As the name says, this method blocks until the asynchronous tasks have completely finished. Simple usage:
import asyncio,time

@asyncio.coroutine  # mark as an asynchronous function (generator-based coroutine)
def func1(num):
    print(num,'before---func1----')
    yield from asyncio.sleep(5)
    print(num,'after---func1----')

task = [func1(1),func1(2)]

if __name__ == "__main__":
    begin = time.time()
    loop = asyncio.get_event_loop()  # get the event loop
    loop.run_until_complete(asyncio.gather(*task))  # register the coroutines with the event loop
    loop.close()
    end = time.time()
    print(end-begin)
1 before---func1----
2 before---func1----
1 after---func1----
2 after---func1----
5.00528621673584
(output)
Defining a coroutine (different from the example above)
import asyncio,time

async def func1(num):  # async defines a coroutine; a coroutine is an object that cannot run directly and must be added to the event loop before it can be called
    print(num,'before---func1----')

if __name__ == "__main__":
    begin = time.time()
    coroutine = func1(2)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine)
    loop.close()
    end = time.time()
    print(end-begin)
func1(2)  # because of the async keyword, the function cannot be run directly
  D:/MyPython/day25/mq/multhread.py:15: RuntimeWarning: coroutine 'func1' was never awaited
    func1(2)
print(type(func1),type(coroutine))  #<class 'function'> <class 'coroutine'>
Equivalently:
We could also drive the coroutine with send(None) (not used here); in this post the coroutine is put into the event loop for processing instead. A runnable sketch of the send(None) approach follows the snippet below.
coroutine = func1(2)
try:
    coroutine.send(None)
except StopIteration:
    pass
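As a rough, self-contained sketch of the send(None) approach (my own illustration, not from the original post; it reuses a one-shot func1 with no awaits, so a single send drives it to completion):

async def func1(num):
    print(num, 'before---func1----')
    return "recv num %s" % num

coroutine = func1(2)
try:
    # Advance the coroutine by hand; with nothing to await it runs to the end
    # and raises StopIteration carrying the return value.
    coroutine.send(None)
except StopIteration as e:
    print(e.value)  # recv num 2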
Creating a task (a further wrapper around the coroutine that lets us inspect its state, etc.)
A coroutine object cannot run by itself; when it is registered with the event loop, the run_until_complete method actually wraps the coroutine in a task (Task) object.
Task is a subclass of Future; it stores the coroutine's state after it runs and is used to retrieve the coroutine's result later.
A look at the run_until_complete method:
class BaseEventLoop(events.AbstractEventLoop):
    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()

        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()
From the source we can see that a coroutine is automatically wrapped in a Task once registered, so we do not have to pass in a Task ourselves. Still, creating a Task object explicitly helps us understand the coroutine's state.
import asyncio,time

async def func1(num):
    print(num,'before---func1----')

if __name__ == "__main__":
    begin = time.time()
    coroutine = func1(2)
    loop = asyncio.get_event_loop()
    task = loop.create_task(coroutine)  # create the task
    print(task)  # pending
    loop.run_until_complete(task)
    loop.close()
    print(task)  # finished
    end = time.time()
    print(end-begin)
A coroutine has four states (CORO_CREATED, CORO_RUNNING, CORO_SUSPENDED, CORO_CLOSED), reported by inspect.getcoroutinestate(); a fuller runnable sketch follows the output below:
from inspect import getcoroutinestate

print(task)  # pending
print(getcoroutinestate(coroutine))
loop.run_until_complete(task)
loop.close()
print(task)  # finished
print(getcoroutinestate(coroutine))
CORO_CREATED
2 before---func1----
<Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/multhread.py:4> result=None>
CORO_CLOSED
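As a rough, self-contained sketch (my own, not from the original post) that surfaces three of the four states; CORO_RUNNING is only observable from inside the coroutine while it executes, and the watcher helper is added purely for illustration:

import asyncio
from inspect import getcoroutinestate

async def func1(num):
    print(num, 'before---func1----')
    await asyncio.sleep(1)           # suspension point: control is handed back to the loop
    print(num, 'after---func1----')

async def watcher(coro, task):
    # func1's task was scheduled first, so by now it has started and is parked at its await
    print(getcoroutinestate(coro))   # CORO_SUSPENDED
    await task                       # wait for func1 to finish

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    coroutine = func1(2)
    print(getcoroutinestate(coroutine))  # CORO_CREATED: not started yet
    task = loop.create_task(coroutine)
    loop.run_until_complete(watcher(coroutine, task))
    loop.close()
    print(getcoroutinestate(coroutine))  # CORO_CLOSED: finished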
A deeper look:
Prefer asyncio.ensure_future: it is the outermost helper, it calls create_task() internally, and it covers the most cases; instantiating Task directly is not recommended by the official docs (see the sketch below).
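A small sketch of the three ways to turn a coroutine into a Task (my own illustration; the variable names are arbitrary):

import asyncio

async def func1(num):
    return "recv num %s" % num

loop = asyncio.get_event_loop()

task_a = asyncio.ensure_future(func1(1))  # outermost helper: accepts coroutines and futures alike
task_b = loop.create_task(func1(2))       # what ensure_future calls for a plain coroutine
task_c = asyncio.Task(func1(3))           # direct instantiation; not recommended by the docs

print(type(task_a), type(task_b), type(task_c))  # all three are Task instances
loop.run_until_complete(asyncio.gather(task_a, task_b, task_c))
loop.close()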
Binding a callback with add_done_callback
import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    return "recv num %s"%num

def callback(future):
    print(future.result())

if __name__ == "__main__":
    begin = time.time()
    coroutine1 = func1(1)
    loop = asyncio.get_event_loop()
    task1 = asyncio.ensure_future(coroutine1)
    task1.add_done_callback(callback)
    loop.run_until_complete(task1)
    loop.close()
    end = time.time()
    print(end-begin)
1 before---func1----
recv num 1
0.004000186920166016
As you can see, the callback is invoked when the coroutine finishes, and the coroutine's result is retrieved through the future parameter. The task we created and the future object passed into the callback are in fact the same object (see the sketch below).
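A quick sketch that checks this identity claim directly (my own; the identity print is added for illustration):

import asyncio

async def func1(num):
    return "recv num %s" % num

def callback(future):
    print(future is task1)   # True: the callback receives the very Task we registered
    print(future.result())   # recv num 1

loop = asyncio.get_event_loop()
task1 = asyncio.ensure_future(func1(1))
task1.add_done_callback(callback)
loop.run_until_complete(task1)
loop.close()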
We can also skip the callback and simply fetch the return value.
When the task's state is finished, we can call the result method (defined in the futures module) directly to get the return value.
import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    return "recv num %s"%num

if __name__ == "__main__":
    begin = time.time()
    coroutine1 = func1(1)
    loop = asyncio.get_event_loop()
    task1 = asyncio.ensure_future(coroutine1)
    loop.run_until_complete(task1)
    print(task1)
    print(task1.result())
    loop.close()
    end = time.time()
    print(end-begin)
1 before---func1----
<Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/multhread.py:6> result='recv num 1'>
recv num 1
0.0030002593994140625
Blocking and await
In a coroutine defined with the async keyword, await suspends the coroutine on a time-consuming operation and gives up control (it replaces the yield from used in generator-based coroutines, which is not allowed inside a native coroutine). When a coroutine hits an await, the event loop suspends it and runs other coroutines, until those in turn suspend or finish, and then the next runnable coroutine is executed.
We use asyncio.sleep to simulate a blocking operation.
import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

if __name__ == "__main__":
    begin = time.time()
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    loop = asyncio.get_event_loop()
    task1 = asyncio.ensure_future(coroutine1)
    task2 = asyncio.ensure_future(coroutine2)
    tasks = asyncio.gather(*[task1,task2])    # gather registers multiple tasks at once for concurrency; wait is used the same way
    loop.run_until_complete(tasks)
    loop.close()
    end = time.time()
    print(end-begin)
Concurrency: with gather or wait we can register multiple tasks at once and run them concurrently.
gather: Return a future aggregating results from the given coroutines or futures.  # i.e. it returns the results
task1 = asyncio.ensure_future(coroutine1)
task2 = asyncio.ensure_future(coroutine2)
tasks = asyncio.gather(*[task1,task2])
loop.run_until_complete(tasks)
wait: Returns two sets of Future: (done, pending).   # done is the set of finished tasks, pending the set of unfinished ones; both are sets
task1 = asyncio.ensure_future(coroutine1)
task2 = asyncio.ensure_future(coroutine2)
tasks = asyncio.wait([task1,task2])
loop.run_until_complete(tasks)
Usage:
    done, pending = yield from asyncio.wait(fs)
wait takes a list, while gather takes the tasks as separate positional arguments.
Their return values also differ (see the sketch below).
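A small sketch contrasting the two return shapes (my own, reusing the func1 pattern from above):

import asyncio

async def func1(num):
    await asyncio.sleep(num)
    return "recv num %s" % num

loop = asyncio.get_event_loop()

# gather: one awaitable whose result is a list, in the order the awaitables were passed in
results = loop.run_until_complete(asyncio.gather(func1(2), func1(1)))
print(results)                      # ['recv num 2', 'recv num 1']

# wait: a (done, pending) pair of sets; results are pulled out of each finished task
done, pending = loop.run_until_complete(asyncio.wait([func1(2), func1(1)]))
print([t.result() for t in done])   # set order is not guaranteed

loop.close()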
Nesting coroutines: wrapping multiple coroutines inside one main coroutine
import asyncio,aiohttp

async def fetch_async(url):
    print(url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.status)
            print(await resp.text())

tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.cnblogs.com/ssyfj/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
The nesting is quite visible in the aiohttp example above; the following plain asyncio example makes the nesting pattern even more explicit.
import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

async def main():
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    dones, pendings = await asyncio.wait(tasks)
    for task in dones:  # iterate over the set of finished tasks
        print("Task ret: ",task.result())

if __name__ == "__main__":
    begin = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    end = time.time()
    print(end-begin)
5 before---func1----
3 before---func1----
4 before---func1----
Task ret:  recv num 4
Task ret:  recv num 5
Task ret:  recv num 3
5.000285863876343
We can also use gather to get the values directly:
    results = await asyncio.gather(*tasks)
    for result in results:
        print("Task ret: ",result)
Or, instead of handling the results inside main, we can return them to the caller for processing:
async def main():
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    begin = time.time()
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main())
    for result in results:
        print("Task ret: ",result)
    loop.close()
    end = time.time()
    print(end-begin)
Or suspend on await asyncio.wait and return the (done, pending) sets to the caller; a sketch follows.
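The post stops here; a rough sketch of that wait variant, following the same structure as the examples above (my own completion, not the author's code):

import asyncio

async def func1(num):
    await asyncio.sleep(num)
    return "recv num %s" % num

async def main():
    tasks = [
        asyncio.ensure_future(func1(5)),
        asyncio.ensure_future(func1(3)),
        asyncio.ensure_future(func1(4)),
    ]
    # Suspend main on wait and hand the (done, pending) sets back to the caller
    return await asyncio.wait(tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    dones, pendings = loop.run_until_complete(main())
    for task in dones:
        print("Task ret: ", task.result())
    loop.close()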
