MQTT在Python中的使⽤mqtt-paho(简单实例,回调函数,回调
参数,qos安全。。。
⽬录
haskell list⼀.  mqtt概念
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是⼀种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布
由于物联⽹的环境是⾮常特别的,所以MQTT遵循以下设计原则:
(1)精简,不添加可有可⽆的功能;
(2)发布/订阅(Pub/Sub)模式,⽅便消息在传感器之间传递;
(3)允许⽤户动态创建主题,零运维成本;
(4)把传输量降到最低以提⾼传输效率;
(5)把低带宽、⾼延迟、不稳定的⽹络等因素考虑在内;
(6)⽀持连续的会话控制;
(7)理解客户端计算能⼒可能很低;
(8)提供服务质量管理;
(9)假设数据不可知,不强求传输数据的类型与格式,保持灵活性。
⼆. mqtt的实现
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种⾝份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使⽤的内容。
当应⽤数据通过MQTT⽹络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。
python下载mqtt包:
pip install paho-mqtt
mqtt简单应⽤(实例)
发布端
# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
print "链接"
print("Connected with result code: " + str(rc))
def on_message(client, userdata, msg):
print "消息内容"
pic + " " + str(msg.payload))
#  订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
print "订阅"
print("On Subscribed: qos = %d" % granted_qos)
pass
#  取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
print "取消订阅"
print("On unSubscribed: qos = %d" % granted_qos)
pass
#  发布消息回调
def on_publish(client, userdata, mid):
print "发布消息"
print("On onPublish: qos = %d" % mid)
pass
subtotal函数求和#  断开链接回调
def on_disconnect(client, userdata, rc):
print "断开链接"
print("Unexpected disconnection rc = " + str(rc))
pass
client = mqtt.Client()
<_connect = on_connect
<_message = on_message
<_publish = on_publishfileinfo
<_disconnect = on_disconnect
<_unsubscribe = on_unsubscribe
<_subscribe = on_subscribe
while True:
client.publish(topic='mqtt11', payload='amazing', qos=0, retain=False)
time.sleep(2)
此处也可以将mqtt⽅法封装为⼀个类,使⽤会更⽅便⼀些
参数解释:
keepalive =>  ⼼跳间隔,单位是秒,如果 broker 和 client 在这段时间内没有任何通讯,client 会给 broker 发送⼀个 ping 消息retain  =>  如果设为 Ture ,这条消息会被设为保留消息
payload  => 消息内容,字符串类型,如果设为 None ,会发送⼀条长度为 0 消息。如果设置了 int 或者 3. float 类型的值,会当做字符串发送,如果你想发送真正的 int 或者 float 值,需要⽤ struct.pack() ⽣成消息, mqtt的publish 只⽀持None, string, int, float 类型的数据,  如果需要发送json类型数据可以通过json.dumps()将数据进⾏转换后在发送, 接收端在on_message()回调函数中通过json.loads() 将数据解析就可以了
topic =>  这条消息所属的话题
qos  => 消息的安全等级
qos=0    QoS0,At most once,⾄多⼀次;
QoS0 代表,Sender 发送的⼀条消息,Receiver 最多能收到⼀次,也就是说 Sender 尽⼒向 Receiver 发送消息,如果发
送失败,也就算了;
qos=1    QoS1,At least once,⾄少⼀次;
QoS1 代表,Sender 发送的⼀条消息,Receiver ⾄少能收到⼀次,也就是说 Sender 向 Receiver 发送消息,如果发送失
败,会继续重试,直到 Receiver 收到消息为⽌,但是因为重传的原因,Receiver 有可能会收到重复的消息;
qos=2    QoS2,Exactly once,确保只有⼀次
QoS1 代表,Sender 发送的⼀条消息,Receiver ⾄少能收到⼀次,也就是说 Sender 向 Receiver 发送消息,如果发送失
败,会继续重试,直到 Receiver 收到消息为⽌,但是因为重传的原因,Receiver 有可能会收到重复的消息
qos安全等级需要注意的点:
1. python中下载的paho-mqtt包中,默认qos=0。
2. 不论是sub还是pub都需要指定qos安全等级。
pub指定的qos是服务器肯定按此规则接收,但是最终订阅者不⼀定。
sub指定的qos表⽰订阅者可以接收的最⾼消息等级,也就是可能收到更低等级的消息
也就是服务器只会按pub和sub两者qos等级最⼩的那个qos规则来发送消息
订阅端
# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code: " + str(rc))
def on_message(client, userdata, msg):
pic + " " + str(msg.payload))
#  订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
web服务框架print("On Subscribed: qos = %d" % granted_qos)
pass
#  取消订阅回调
def on_unsubscribe(client, userdata, mid):
print "取消订阅"
print("On unSubscribed: qos = %d" % mid)
pass
#  发布消息回调
def on_publish(client, userdata, mid):
print "发布消息"
print("On onPublish: qos = %d" % mid)
pass
#  断开链接回调
def on_disconnect( client, userdata, rc):
print "断开链接"
print("Unexpected disconnection rc = " + str(rc))
pass
client = mqtt.Client()
<_connect = on_connect
<_message = on_message
<_publish = on_publish
<_disconnect = on_disconnect
<_unsubscribe = on_unsubscribe
<_subscribe = on_subscribe
client.subscribe('mqtt11', qos=0)
client.loop_forever() # 保持连接
订阅服务client.subscribe("mqtt11", qos=0) 也可以将改订阅放在on_connect() 回调函数中,程序在建⽴连接成功后⾸先后执⾏
on_connect() , 可将整个订阅端封装为⼀个类使⽤
参数解释:
keepalive =>  ⼼跳间隔,单位是秒,如果 broker 和 client 在这段时间内没有任何通讯,client 会给 broker 发送⼀个 ping 消息loop_forever()  =>  该函数是保持永久连接, 阻塞式,可结合多线程或多进程的⽅式使⽤
注意: 同⼀个mqtt服务即可以是发布端,也可以是订阅端,也可以订阅⾃⼰发布的内容,这⾥之所以分开是为了看起来更直观⼀些mqtt⽅法可以封装为如下类
class MqttRoad(object):
def __init__(self, mqtt_host, mqtt_port, mqtt_keepalive):
super(MqttRoad, self).__init__()
client = mqtt.Client()
<_connect = _connect
<_message = _message
<_publish = _publish
client.loop_forever()  # 保持连接
def on_connect(self, client, userdata, flags, rc):
print("Connected with result code: " + str(rc))
# 订阅
client.subscribe("mqtt11")
def on_message(self, client, userdata, msg):
print("on_message topic:" + pic + " message:" + str(msg.payload.decode('utf-8')))
#  订阅回调
def on_subscribe(self, client, userdata, mid, granted_qos):kotlin lateinit
print("On Subscribed: qos = %d" % granted_qos)
pass
#  取消订阅回调
def on_unsubscribe(self, client, userdata, mid):
python请求并解析json数据# print("取消订阅")
print("On unSubscribed: qos = %d" % mid)
pass
#  发布消息回调
def on_publish(self, client, userdata, mid):
# print("发布消息")
print("On onPublish: qos = %d" % mid)
pass
#  断开链接回调
def on_disconnect(self, client, userdata, rc):
# print("断开链接")
print("Unexpected disconnection rc = " + str(rc))
pass
if __name__ == '__main__':
MqttRoad("172.0.0.1", 1883, 600)
⼀个简单的发布订阅就完成了,通过回调函数,可以对相应的值进⾏操作
三. mqtt回调函数
回调函数的应⽤是⾮常有必要的,回调函数有很多种,我们可能根据不同的业务场景采⽤不同的回调函数进⾏数据的处理,可以达到代码的⾼可⽤,减少代码的冗余。
回调函数只是业务程序的中转站
在这⾥有⼀个特别要注意的点,回调函数可以获取到请求的响应数据,但回调函数并不是适合作为对响应结果进⾏处理的地⽅
举个例⼦:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。