Python_Modbus_RTU_通讯Demo
之前利⽤Python进⾏Modbus_TCP进⾏数据接收,本次需要利⽤串⼝进⾏数据传输,学习Modbus_RTU的简单实现
⾸先要在创建两个虚拟串⼝,利⽤VSPD⼯具即可。在⼀台电脑上实现数据的发送和接收
进⼊Python IDE进⾏slave端的编写
import serial
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu
import time
def ModbusRTU_Slave():
try:
# 设定串⼝为从站
# 外置参数包括端⼝ port = "COM3" 波特率:9600
server= modbus_rtu.RtuServer(serial.Serial(port="com3",baudrate=9600, bytesize=8, parity='N', stopbits=1))
server.start()
print("")
SLAVE1 = server.add_slave(1)
SLAVE1.add_block('A', cst.HOLDING_REGISTERS, 0, 4)  # 地址0,长度4
increase_num = 17000
for i in range(300):
SLAVE1.set_values('A', 0, [17000, i + 1, i + 2])  # 改变在地址0处的寄存器的值
time.sleep(0.1)
for i in range(1500):
increase_num = increase_num - 1
SLAVE1.set_values('A', 0, [increase_num, i + 1, i + 2])  # 改变在地址0处的寄存器的值
time.sleep(0.1)
for i in range(800):
SLAVE1.set_values('A', 0, [10000, i + 1, i + 2])  # 改变在地址0处的寄存器的值
time.sleep(0.1)
for i in range(800):
SLAVE1.set_values('A', 0, [17000, i + 1, i + 2])  # 改变在地址0处的寄存器的值
time.sleep(0.1)
except Exception as exc:
print(str(exc))
再进⾏master端的编写
# 数据接收端
def ModbusRTU_Master():
try:
# 设定串⼝为从站
# 外置参数包括端⼝ port = "COM3" 波特率:9600
master = modbus_rtu.RtuMaster(serial.Serial(port="com4",baudrate=9600, bytesize=8, parity='N', stopbits=1))
master.set_timeout(1.0)
master.set_verbose(True)
# 读保持寄存器
read = ute(1, cst.HOLDING_REGISTERS, 0, 4)  # 这⾥可以修改需要读取的功能码
print(red)
except Exception as exc:
print(str(exc))
补充功能码
功能代码cst
1~255中1~21是常⽤的,以下是读写专⽤的功能码
READ_COILS = 01 读线圈,位操作
READ_DISCRETE_INPUTS = 02 读离散输⼊状态,位操作
READ_HOLDING_REGISTERS = 03 读保持寄存器,字操作
READ_INPUT_REGISTERS = 04 读输⼊寄存器,字操作
WRITE_SINGLE_COIL = 05 写单线圈,位操作
WRITE_SINGLE_REGISTER = 06 写单⼀寄存器,字操作
WRITE_MULTIPLE_COILS = 15 写多个线圈【强制多点线圈】,位操作
WRITE_MULTIPLE_REGISTERS = 16 写多寄存器【写乘法寄存器】,字操作
演⽰Demo 创建⼀个项⽬ 1. 通过Modbus_RTU读取数据在通过Modbus_RTU将数据发出
⾸先创建四个虚拟串⼝
利⽤Modbus ⼯具模拟数据发送和接收【注意】:modbus poll 设置中的数据长度⼀定要和程序中推送的长度⼀致,在这个问题上浪费了好长时间哎
Python程序,其中包含了tcp 和 rtu的内容可以通过JSON配置⽂件实现协议切换
import time
from datetime import datetime
import modbus_tk
import modbus_tk.defines as cst
import dbus_tcp as modbus_tcp
from modbus_tk import modbus_rtu
import serial
import json
# JSON ⽂件初始化
def InitJSON():
global filejson
try:
with open('20191219_1443.json', 'r') as f:
filejson = json.load(f)
except FileNotFoundError:
print('⽆法打开指定的⽂件!')
except LookupError:
print('指定了未知的编码!')
except UnicodeDecodeError:
print('读取⽂件时解码错误!')
# modbus 相关参数初始化
def ModbusInit():
# ⾸先判断数据传输协议 Modbus_TCP  还是 RTU
python怎么读取串口数据
if filejson['ModbusInit']['Modbus_Mode'] == "Modbus_TCP":
print("Modbus_Mode = Modbus_TCP")
Modbus_master_IP = filejson['ModbusInit']['Modbus_master_IP']
Modbus_master_port = filejson['ModbusInit']['Modbus_master_port']
MASTER = modbus_tcp.TcpMaster(host=Modbus_master_IP, port=Modbus_master_port)
MASTER.set_timeout(5.0)
Modbus_slave_IP = filejson['ModbusInit']['Modbus_slave_IP']
Modbus_slave_port = filejson['ModbusInit']['Modbus_slave_port']
SLAVE = modbus_tcp.TcpServer(address=Modbus_slave_IP, port=Modbus_slave_port)
# SLAVE.set_timeout(5.0)
else:
print("Modbus_Mode = Modbus_RTU")
Modbus_master_PORT = filejson['ModbusInit']['Modbus_master_PORT']
Modbus_master_baudrate = filejson['ModbusInit']['Modbus_master_baudrate']
print(Modbus_master_PORT)
MASTER = modbus_rtu.RtuMaster(serial.Serial(port=Modbus_master_PORT, baudrate=Modbus_master_baudrate))
MASTER.set_timeout(5.0)
Modbus_slave_PORT = filejson['ModbusInit']['Modbus_slave_PORT']
print(Modbus_slave_PORT)
Modbus_slave_baudrate = filejson['ModbusInit']['Modbus_slave_baudrate']
SLAVE = modbus_rtu.RtuServer(serial.Serial(port=Modbus_slave_PORT, baudrate=Modbus_slave_baudrate, bytesize=8, parity='N', stopbits=1)) # SLAVE.set_timeout(5.0)
SLAVE.start()
print("")
SLAVE1 = SLAVE.add_slave(1)
SLAVE1.add_block('warning', cst.HOLDING_REGISTERS, 0, 4)  # 地址0,长度1
return MASTER,SLAVE1
def main():
# 初始化JSON ⽂件
InitJSON()
# modbus 初始化
MASTER, SLAVE1 = ModbusInit()
a = filejson['DistanceParam']['EquationParamA']
b = filejson['DistanceParam']['EquationParamB']
print("⽅程参数a = {},b= {}\n".format(a, b))
while(1):
# 测试⽤来获取数据
num = ute(1, cst.READ_HOLDING_REGISTERS, 0, 1)[0]
# 获取数据
# num = ute(1, cst.READ_INPUT_REGISTERS, 0, 1)[0]
print("GetV={}".format(num))
SLAVE1.set_values('warning', 0, num)  # 改变在地址0处的寄存器的值
time.sleep(filejson['WhileTime'])
if__name__ == '__main__':
main()
View Code

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。