⽤Python实现Modbus-RTU协议及串⼝调试(⼀)
⽤Python实现Modbus-RTU协议及串⼝调试(⼀)
最近由于要测试⼏块客户使⽤的现场仪表的通信(Modbus-RTU协议),就⽤Python写了个Modbus-RTU协议的串⼝调试模块,主要涉及了bytes类型字节串的使⽤,串⼝模块pyserial的使⽤,循环冗余校验CRC计算模块crcmod的使⽤,以及struct内置模块的使⽤。如果没有安装以上模块请按下⾯命令安装。
pip install pyserial
pip install crcmod
实现CRC16校验
⾸先按照Modbus-RTU协议的规范,所有通信帧末尾都要有2个字节的CRC16字节的校验码,以保证通信的可靠性。所以⾸先要实现CRC16校验算法,我们可以直接使⽤crcmod模块的已有算法,代码如下:
import crcmod
# CRC16校验,返回整型数
def crc16(veritydata):
if not veritydata:
return
crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
return crc16(veritydata)
其中函数参数veritydata是⼀个字节串bytes变量,表⽰要进⾏校验的数据。
当然你也可以⾃⼰按Modbus-RTU协议给出的c语⾔算法⾃⼰⽤Python写⼀个。
由于客户仪表主要使⽤了Modbus_RTU协议的03和04功能号,通过03和04功能号主机读取仪表从机的实时数据,⽤Python主要实现这两个功能号的协议,03和04功能号协议具体可以参考Modbus-RTU标准⽂档。
⽤函数实现Modbus-RTU的03和04功能主机->从机的命令帧
代码如下:
def mmodbus03or04(add, startregadd, regnum, funcode=3):
if add <0or add >0xFF or startregadd <0or startregadd >0xFFFF or regnum <1or regnum >0x7D:
print("Error: parameter error")
return
if funcode !=3and funcode !=4:
print("Error: parameter error")
return
sendbytes = _bytes(1, byteorder="big", signed=False)
sendbytes = sendbytes + _bytes(1, byteorder="big", signed=False)+ _bytes(2, byteorder="big", signed=False)+ \
<_bytes(2, byteorder="big", signed=False)
crcres = crc16(sendbytes)
crc16bytes = _bytes(2, byteorder="little", signed=False)
sendbytes = sendbytes + crc16bytes
return sendbytes
此函数⼀共有4个参数,含义如下。
add:Modbus从站地址(仪表地址)
startregadd:要读取的开始寄存器地址(从0开始的绝对地址)
regnum:要读取的寄存器个数
funcode:功能号,默认值是3
此函数⾸先判断了从站地址是否超限,开始寄存器地址是否超限,寄存器个数是否超限,功能号是否正确等。
然后将⼏个参数以及CRC16的校验码形成⼀个可以发送串⼝的字节串后返回。注意int整形的to_bytes成员函数,此函数⽤于将整形数转换为字节码,第⼀个参数是转换字节码的字节个数,第⼆个参数是
转换是按⼤端模式(big)还是⼩端模式(little)。第三个参数标识整形数按有符号整形转换还是⽆符号整形转换。
将Modbus从机返回的数据帧的解析为数值的函数
代码如下:
def smodbus03or04(recvdata, valueformat=0, intsigned=False):
if not recvdata:
print("Error: data error")
return
if not checkcrc(recvdata):
print("Error: crc error")
return
datalist =list(recvdata)
if datalist[1]!=0x3and datalist[1]!=0x4:
print("Error: recv data funcode error")
return
bytenums = datalist[2]
if bytenums %2!=0:
print("Error: recv data reg data error")
return
retdata =[]
if valueformat ==0:
floatnums = bytenums /4
print("float nums: ",str(floatnums))
floatlist =[0,0,0,0]
for i in range(int(floatnums)):
floatlist[1]= datalist[3+i*4]
floatlist[0]= datalist[4+i*4]
floatlist[3]= datalist[5+i*4]
floatlist[2]= datalist[6+i*4]
bfloatdata =bytes(floatlist)
[fvalue]= struct.unpack('f', bfloatdata)
retdata.append(fvalue)
print(f'Data{i+1}: {fvalue:.3f}')
elif valueformat ==1:
shortintnums = bytenums /2
print("short int nums: ",str(shortintnums))
for i in range(int(shortintnums)):
btemp = recvdata[3+i*2:5+i*2]
shortvalue =int.from_bytes(btemp, byteorder="big", signed=intsigned)
retdata.append(shortvalue)
print(f"Data{i+1}: {shortvalue}")
return retdata
此函数的3个参数含义如下:
recvdata:Modbus-RTU从站在接收03和04功能号命令帧后的发回主站的数据帧,bytes字节串类型。
valueformat:寄存器中值的格式,0代表⽤2个寄存器4个字节表⽰⼀个单精度浮点数,1代表1个寄存器(2字节)存放1个16位整形值,默认为0,(仪表⽤的是单精度浮点数)
intsigned:当寄存器数据值格式是整形时,true则按照有符号整形转换,false则按⽆符号整形转换。
⾸先是对接收的数据帧的各种错误判断,CRC16校验值核对等判断。如果数据值格式是单精度浮点数则按照仪表规定的浮点数格式进⾏转换,先将接收的字节串转换为整形列表,然后根据浮点数个数进⾏迭代,将接收的数据转换为浮点数,并放⼊列表中返回,要注意的是如何将bytes类型转换为⼀个浮点数,这⾥使⽤了struct模块的unpack函数。将浮点数转换为bytes类型则使⽤struct模块的pack函数。具体使⽤⽅法请参考说明⽂档。当数据值格式是16位整形时,则将切⽚出来每个值对应的bytes字节串转换为整形数,并放到列表中返回。
⽤串⼝读取仪表数据
串⼝读写使⽤pyserial模块,代码如下:
if __name__ =='__main__':
slaveadd =1# modbus从站地址
startreg =0# 开始寄存器地址
regnums =40# 寄存器个数
send_data = mmodbus03or04(slaveadd, startreg, regnums)
print("send data : ", send_data.hex())
com = serial.Serial("com3",9600, timeout=0.8)
starttime = time.time()
com.write(send_data)
recv_data = ad(regnums*2+5)
python怎么读取串口数据endtime = time.time()
if len(recv_data)>0:
print("recv: ", recv_data.hex())
print(f"used time: {endtime-starttime:.3f}")
com.close()
smodbus03or04(recv_data)
serial模块的Serial函数常⽤的3个参数是:串⼝号、波特率、超时时间(s)。这⾥在发送了命令帧后,根据命令帧中的寄存器个数计算出需要读取多少个字节的串⼝数据。读取后⽤解析函数得到具体数值。
完整的代码如下:
import serial
import crcmod
import time
import struct
# CRC16校验,返回整型数
def crc16(veritydata):
if not veritydata:
return
crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
return crc16(veritydata)
# 校验数据帧的CRC码是否正确
def checkcrc(data):
if not data:
return False
if len(data)<=2:
return False
nocrcdata = data[:-2]
oldcrc16 = data[-2:]
oldcrclist =list(oldcrc16)
crcres = crc16(nocrcdata)
crc16byts = _bytes(2, byteorder="little", signed=False)
# print("CRC16:", crc16byts.hex())
crclist =list(crc16byts)
if oldcrclist[0]!= crclist[0]or oldcrclist[1]!= crclist[1]:
return False
return True
# Modbus-RTU协议的03或04读取保存或输⼊寄存器功能主-》从命令帧
def mmodbus03or04(add, startregadd, regnum, funcode=3):
if add <0or add >0xFF or startregadd <0or startregadd >0xFFFF or regnum <1or regnum >0x7D:
print("Error: parameter error")
return
if funcode !=3and funcode !=4:
print("Error: parameter error")
return
sendbytes = _bytes(1, byteorder="big", signed=False)
sendbytes = sendbytes + _bytes(1, byteorder="big", signed=False)+ _bytes(2, byteorder="big", signed=False)+ \
<_bytes(2, byteorder="big", signed=False)
crcres = crc16(sendbytes)
crc16bytes = _bytes(2, byteorder="little", signed=False)
sendbytes = sendbytes + crc16bytes
return sendbytes
# Modbus-RTU协议的03或04读取保持或输⼊寄存器功能从-》主的数据帧解析(浮点数2,1,4,3格式,16位短整形(定义正负数))def smodbus03or04(recvdata, valueformat=0, intsigned=False):
if not recvdata:
print("Error: data error")
return
if not checkcrc(recvdata):
print("Error: crc error")
return
datalist =list(recvdata)
if datalist[1]!=0x3and datalist[1]!=0x4:
print("Error: recv data funcode error")
return
bytenums = datalist[2]
if bytenums %2!=0:
print("Error: recv data reg data error")
return
retdata =[]
if valueformat ==0:
floatnums = bytenums /4
print("float nums: ",str(floatnums))
floatlist =[0,0,0,0]
for i in range(int(floatnums)):
floatlist[1]= datalist[3+i*4]
floatlist[0]= datalist[4+i*4]
floatlist[3]= datalist[5+i*4]
floatlist[2]= datalist[6+i*4]
bfloatdata =bytes(floatlist)
[fvalue]= struct.unpack('f', bfloatdata)
retdata.append(fvalue)
print(f'Data{i+1}: {fvalue:.3f}')
elif valueformat ==1:
shortintnums = bytenums /2
print("short int nums: ",str(shortintnums))
for i in range(int(shortintnums)):
btemp = recvdata[3+i*2:5+i*2]
shortvalue =int.from_bytes(btemp, byteorder="big", signed=intsigned)
retdata.append(shortvalue)
print(f"Data{i+1}: {shortvalue}")
return retdata
if __name__ =='__main__':
slaveadd =1
startreg =0
regnums =40
send_data = mmodbus03or04(slaveadd, startreg, regnums)
print("send data : ", send_data.hex())
com = serial.Serial("com3",9600, timeout=0.8)
starttime = time.time()
com.write(send_data)
recv_data = ad(regnums*2+5)
endtime = time.time()
if len(recv_data)>0:
print("recv: ", recv_data.hex())
print(f"used time: {endtime-starttime:.3f}")
com.close()
smodbus03or04(recv_data)
码字不易,如果本⽂对您有⽤请随⼿点个赞,谢谢!^_^
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论