Python 之数据流(stream )
本⽂参考Python 官⽅⽂档针对官⽅⽂档⽰例进⾏解析,解析不完整只为了便于理解
流是⽤于处理⽹络连接的⾼级async/await-ready 原语。流允许发送和接收数据,⽽不需要使⽤回调或低级协议和传输。
Stream 下⾯的⾼级 asyncio 函数可以⽤来创建和处理
流:coroutine asyncio.open_connection (host=None , port=None , *, loop=None , limit=None , ssl=None , family=0, proto=0, flags=0, sock=None , local_addr=None , server_hostname=None , ssl_handshake_timeo 建⽴⽹络连接并返回⼀对 (reader, writer) 对象。
返回的 reader 和 writer 对象是 和 类的实例。
注意:使⽤ayncio.open_connection()⽅法创建和处理流时只有在await 时才返回reader 和writer
对象为了⽅便测试我们在本地搭建⼀个nginx 服务器,⾸页index.html 内容为“Hello World”
⽰例:
1
234567891011121314151617
import asyncio
async def wget(host):
connect = asyncio.open_connection(host,80)
print(type(connect))
reader,writer = await connect print(type(reader),type(writer)) async def main(): # 获取表头主机列表 hosts = ['192.168.1.100'] # 根据主机列表获取⼀个tasks 列表
tasks = [ate_task(wget(host)) for host in hosts]
# 等待任务列表执⾏结果
await asyncio.gather(*tasks)
# 运⾏asyncio.run(main()
运⾏输出如下
1
2
3
4
5
6
7
8
910111213
<class 'coroutine'><class 'asyncio.streams.StreamReader'> <class 'asyncio.streams.StreamWriter'>Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000236AA956F70>Traceback (most recent call last): File "C:\ProgramData\Anaconda3\lib\asyncio\proactor_events.py", line 116, in __del__ self.close() File "C:\ProgramData\Anaconda3\lib\asyncio\proactor_events.py", line 108, in close self._loop.call_soon(self._call_connection_lost, None) File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 719, in call_soon
self._check_closed()
File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 508, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
解析:运⾏报错是因为获取了writer 对象但是并没有写数据
打印类型可以看到
完善代码使⽤writer对象发送请求⾄服务器,然后reader对象就可以收取服务器发送过来的数据1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28# asyncio.open_connection创建数据流 start
import asyncio
async def wget(host):
connect =asyncio.open_connection(host,80)
print(type(connect))
reader,writer =await connect
print(type(reader),type(writer))
# 定义请求头部,格式是固定格式
header ='GET / HTTP/1.0\r\n Host:{0}\r\n\r\n'.format(host)
# 通过writer对象往http服务器发送请求,请求是⼆进制格式的需要使⽤encode()⽅法编码
writer.de('utf-8'))
# writer.write⽅法需要与drain()⽅法⼀起使⽤
await writer.drain()
# 阻塞获取服务器发送过来的所有数据,read()⽅法⼀次性获取所有数据,数据多可以使⽤readline()⽅法⼀⾏⾏获取 data =ad()
# 打印获取的数据,获取数据为⼆进制格式不加decode()解码则打印原始数据
print(data.decode())
# 关闭writer需要和writer.wait_closed()⼀起使⽤,这⾥可以省略
writer.close()
await writer.wait_closed()
async def main():
hosts =['192.168.1.100']
tasks =[ate_task(wget(host)) for host in hosts]
await asyncio.gather(*tasks)
asyncio.run(main())
# asyncio.open_connection创建数据流 end
输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13<class'coroutine'>
<class'asyncio.streams.StreamReader'> <class'asyncio.streams.StreamWriter'> HTTP/1.1200OK
Server: nginx/1.14.0
Date: Sat, 30Oct202109:37:17GMT
Content-Type: text/html
Content-Length: 12
Last-Modified: Fri, 29Oct202107:41:00GMT
Connection: close
ETag: "617ba58c-c"
Accept-Ranges: bytes
Hello World
本次代码演⽰了连接http服务器并且向服务器发送⼀个GET请求,服务器收到GET请求以后把数据返回给客户,然后通过reader对象获取到服务器发送过来的数据。 注意:本次发送的是⼀个GET请求,格式是固定的
本次发送的完整数据为
1'GET / HTTP/1.0\r\n Host:192.168.1.100\r\n\r\n'
对应关系如下图
拆分解析如下
1
234567891011
GET / HTTP/1.0\r\n Host:192.168.1.100\r\n\r\n
GET #请求⽅法为GET # 空格/ # 请求URL 为/即根⽬录HTTP/1.0 # 协议版本\r\n # 回车符和换⾏符Host # 头部字段为Host
: # 固定格式的符号:
192.168.1.100 # Host 的值,即本次请求的主机值
\r\n # 请求头部的回车符和换⾏符\r\n # 最后的回车符和换⾏符 使⽤流的TCP 使⽤流的TCP 回显客户端
tcp_stream_client.py
1
234567891011121314151617181920
import asyncio # 回显客户端协程函数,传递参数message 发送给服务器端,服务器端接收信息原样返回async def tcp_echo_client(message): # 创建reader ,writer 对象分别⽤于接收和发送信息 reade
r,writer = await asyncio.open_connection('127.0.0.1',8888) print(f'Send:{message!r}')
# 往服务器端写信息,需要编码后发送
writer.de())
# await writer.drain() # 从服务器端读取信息读取100个字节 data = ad(100) # 打印解码后的信息
print(f'Received:{data.decode()!r}')
print('Close the connection')
# 关闭 writer.close() # await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
注意:这⾥没有使⽤writer.drain()和writer.wait_closed()也可以
使⽤流的TCP 回显服务器端
tcp_stream_server.py
1
2
3
456import asyncio
# 启动服务后当客户端建⽴新连接时调⽤该函数
# 接受参数为reader ,writer
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43# reader是类StreamReader的实例,⽽writer是类StreamWriter的实例
# 即客户端和服务器端的reader和writer是⼀⼀对应的,分别⽤于接收对⽅数据流和往对⽅发送数据流
async def handle_echo(reader, writer):
# 服务器从客户端读取信息
# 即客户端通过writer往服务器写的信息
data =ad(100)
# 信息解码
message =data.decode()
# 该⽅法获取客户端的ip地址信息
addr =_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
# 服务器端把从客户端读取的信息⼜发送给客户端
writer.write(data)
await writer.drain()
# 关闭连接
python index函数print("Close the connection")
writer.close()
async def main():
# start_server()⽅法启动套接字服务,返回⼀个server对象
# 当⼀个新的客户端连接被建⽴时,回调函数会被调⽤。该函数会接收到⼀对参数(reader,writer)
# reader是类StreamReader的实例,⽽writer是类StreamWriter的实例
# client_connected_cb 即可以是普通的可调⽤对象也可以是⼀个协程函数; 如果它是⼀个协程函数,它将⾃动作为 Task 被调度。 server =await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
# 以下⽅法可以获取启动的ip地址和端⼝信息返回⼀个元组其实即使start_server⽅法传递的ip和端⼝信息('127.0.0.1',8888)
addr =server.sockets[0].getsockname()
print(f'Serving on {addr}')
# 启动服务端
# Server对象是异步上下⽂管理器。当⽤于async with语句时,异步上下⽂管理器可以确保Server对象被关闭
# 并且在async with完成后不接受新的连接。
async with server:
# server_forver()⽅法
# 开始接受连接,直到协程被取消。server_forever任务的取消将导致服务器被关闭
await server.serve_forever()
asyncio.run(main())
打开两个窗⼝,先启动服务器端
服务器端开启了8888端⼝等待客户端连接
运⾏客户端
运⾏客户端的时候客户端和服务器端建⽴了连接才启动协程函数handle_echo
服务器端接收的信息为
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论