Python实现串⼝通信(pyserial)
pyserial模块封装了对串⼝的访问,兼容各种平台。
安装
pip insatll pyserial
初始化
简单初始化⽰例
import serial
ser = serial.Serial('com1', 9600, timeout=1)
所有参数
ser = serial.Serial(
port=None,              # number of device, numbering starts at
# zero. if everything fails, the user
# can specify a device string, note
# that this isn't portable anymore
# if no port is specified an unconfigured
# an closed serial port object is created
baudrate=9600,          # baud rate
bytesize=EIGHTBITS,    # number of databits
parity=PARITY_NONE,    # enable parity checkingpython怎么读取串口数据
stopbits=STOPBITS_ONE,  # number of stopbits
timeout=None,          # set a timeout value, None for waiting forever
xonxoff=0,              # enable software flow control
rtscts=0,              # enable RTS/CTS flow control
interCharTimeout=None  # Inter-character timeout, None to disable
)
不同平台下初始化
ser=serial.Serial("/dev/ttyUSB0",9600,timeout=0.5) #使⽤USB连接串⾏⼝
ser=serial.Serial("/dev/ttyAMA0",9600,timeout=0.5) #使⽤树莓派的GPIO⼝连接串⾏⼝
ser=serial.Serial(1,9600,timeout=0.5)#winsows系统使⽤com1⼝连接串⾏⼝
ser=serial.Serial("com1",9600,timeout=0.5)#winsows系统使⽤com1⼝连接串⾏⼝
ser=serial.Serial("/dev/ttyS1",9600,timeout=0.5)#Linux系统使⽤com1⼝连接串⾏⼝
serial.Serial类(另外初始化的⽅法)
class serial.Serial()
{
def__init__(port=None, baudrate=9600, bytesize=EIGHTBITS,parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=None, xonxoff=False, rtscts=False, writeTimeout=None, dsrdtr=False, interCharTimeout=None) }
ser对象属性
name:设备名字
port:读或者写端⼝
baudrate:波特率
bytesize:字节⼤⼩
parity:校验位
stopbits:停⽌位
timeout:读超时设置
writeTimeout:写超时
xonxoff:软件流控
rtscts:硬件流控
dsrdtr:硬件流控
interCharTimeout:字符间隔超时
ser对象常⽤⽅法
ser.isOpen():查看端⼝是否被打开。
ser.open() :打开端⼝‘。
ser.close():关闭端⼝。
ser.write("hello"):向端⼝写数据。
in_waiting():返回接收缓存中的字节数。
flush():等待所有数据写出。
flushInput():丢弃接收缓存中的所有数据。
flushOutput():终⽌当前写操作,并丢弃发送缓存中的数据。
封装参考
import serial
ls.list_ports
class Communication():
#初始化
def__init__(self,com,bps,timeout):
self.port = com
self.bps = bps
self.timeout =timeout
global Ret
try:
# 打开串⼝,并得到串⼝对象
self.main_engine= serial.Serial(self.port,self.bps,timeout=self.timeout)
# 判断是否打开成功
if (self.main_engine.is_open):
Ret = True
except Exception as e:
print("---异常---:", e)
# 打印设备基本信息
def Print_Name(self):
print(self.main_engine.name) #设备名字
print(self.main_engine.port)#读或者写端⼝
print(self.main_engine.baudrate)#波特率
print(self.main_engine.bytesize)#字节⼤⼩
print(self.main_engine.parity)#校验位
print(self.main_engine.stopbits)#停⽌位
print(self.main_engine.timeout)#读超时设置
print(self.main_engine.writeTimeout)#写超时
print(self.ff)#软件流控
print(self.scts)#软件流控
print(self.main_engine.dsrdtr)#硬件流控
print(self.main_engine.interCharTimeout)#字符间隔超时
#打开串⼝
def Open_Engine(self):
self.main_engine.open()
#关闭串⼝
def Close_Engine(self):
self.main_engine.close()
print(self.main_engine.is_open)  # 检验串⼝是否打开
# 打印可⽤串⼝列表
@staticmethod
def Print_Used_Com():
port_list = ls.list_portsports())
print(port_list)
#接收指定⼤⼩的数据
#从串⼝读size个字节。如果指定超时,则可能在超时后返回较少的字节;如果没有指定超时,则会⼀直等到收完指定的字节数。
def Read_Size(self,size):
return self.ad(size=size)
#接收⼀⾏数据
# 使⽤readline()时应该注意:打开串⼝时应该指定超时,否则如果串⼝没有收到新⾏,则会⼀直等待。
# 如果没有超时,readline会报异常。
def Read_Line(self):
return self.adline()
#发数据
def Send_data(self,data):
self.main_engine.write(data)
#更多⽰例
# self.main_engine.write(chr(0x06).encode("utf-8"))  # ⼗六制发送⼀个数据
# print(self.ad().hex())  #  # ⼗六进制的读取读⼀个字节
# print(self.ad())#读⼀个字节
# print(self.ad(10).decode("gbk"))#读⼗个字节
# print(self.adline().decode("gbk"))#读⼀⾏
# print(self.adlines())#读取多⾏,返回列表,必须匹配超时(timeout)使⽤
# print(self.main_engine.in_waiting)#获取输⼊缓冲区的剩余字节数
# print(self.main_engine.out_waiting)#获取输出缓冲区的字节数
# print(self.adall())#读取全部字符。
#接收数据
#⼀个整型数据占两个字节
#⼀个字符占⼀个字节
def Recive_data(self,way):
# 循环接收数据,此为死循环,可⽤线程实现
print("开始接收数据:")
while True:
try:
# ⼀个字节⼀个字节的接收
if self.main_engine.in_waiting:
if(way == 0):
for i in range(self.main_engine.in_waiting):
print("接收ascii数据:"+str(self.Read_Size(1)))
data1 = self.Read_Size(1).hex()#转为⼗六进制
data2 = int(data1,16)#转为⼗进制print("收到数据⼗六进制:"+data1+"  收到数据⼗进制:"+str(data2)) if(way == 1):
#整体接收
# data = self.ad(self.main_engine.in_waiting).decode("utf-8")#⽅式⼀                        data = self.ad_all()#⽅式⼆print("接收ascii数据:", data) except Exception as e:
print("异常报错:",e)
Communication.Print_Used_Com()
Ret =False #是否创建成功标志
Engine1 = Communication("com12",115200,0.5)
if (Ret):
Engine1.Recive_data(0)
while(1)
{
//发送测试
uint8_t a = 61;
delayms(300);
printf("%c", a);
}
开始接收数据:
接收ascii数据:b'='
收到数据⼗六进制:3d  收到数据⼗进制:61

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。