python做MQTT性能测试⼩解
⼀、概念与原理
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是⼀种基于发布/订阅(publish/subscribe)模式的轻量级协议,该协议构建于TCP/IP协议之上,MQTT最⼤优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为⼀种低开销、低带宽占⽤的即时通讯协议,使其在物联⽹、⼩型设备、移动应⽤等⽅⾯有较⼴泛的应⽤。
MQTT是⼀个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适⽤范围⾮常⼴泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联⽹(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及⼀些⼩型化设备中已⼴泛使⽤。
MQTT协议⼯作在低带宽、不可靠的⽹络的远程传感器和控制设备通讯⽽设计的协议,它具有以下主要的⼏项特性:
(1)使⽤发布/订阅消息模式,提供⼀对多的消息发布,解除应⽤程序耦合。
(2)对负载内容屏蔽的消息传输。
(3)使⽤TCP/IP提供⽹络连接。
主流的MQTT是基于TCP连接进⾏数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接⽅式,优缺点⾃然也就各有不同了。
(4)有三种消息发布服务质量(qss):
“⾄多⼀次”,消息发布完全依赖底层TCP/IP⽹络。会发⽣消息丢失或重复。这⼀级别可⽤于如下情况,环境传感器数据,丢失⼀次读记录⽆所谓,因为不久后还会有第⼆次发送。这⼀种⽅式主要普通APP的推送,倘若你的智能设备在消息推送时未联⽹,推送过去没收到,再次联⽹也就收不到了。
“⾄少⼀次”,确保消息到达,但消息重复可能会发⽣。
“只有⼀次”,确保消息到达⼀次。在⼀些要求⽐较严格的计费系统中,可以使⽤此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最⾼质量的消息发布服务还可以⽤于即时通讯类的APP的推送,确保⽤户收到且只会收到⼀次。
当然了这个数字越⼩,带宽带站越低
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种⾝份:发布者(
Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使⽤的内容。
⼆、环境准备
1、安装paho-mqtt
命令 pip install paho-mqtt
2、安装peewee
pip install peewee
运⽤命令python -m pwiz -e mysql -H 127.0.0.1 -p 3306 -u root -P dbname >model.py
python -m pwiz -e mysql -H {主机地址} -p 3306 -u root -- password {数据库名称} > {⽣成的代码⽂件 例model.py}
-e数据库类型 ⽐如mysql
-H数据库ip
-p端⼝
-u数据库⽤户名
-P密码
dbname为db名
db.py⾃动⽣成的py⽂件名
因为我做的时候,订阅跟推送都需要从数据库取公司编码以及推送还需要从数据库取得⽤户ID 三、订阅
啥也不说 直接上代码:
"""
@Desc: 订阅
@Time: 2021/8/8 10:00
@Author: kally
@Version: 1.0
@FileName: subscribe.py
"""
# coding=utf-8
import datetime
import time
import paho.mqtt.client as mqtt
import logging
from models.dev_weigh import CoTpSiteProgress #利⽤peewee⽣成的.py⽂件
logging.basicConfig(filename='', level='INFO')
file = open("", 'w').close() # 清空⽂件之前的内容
def findStr(str):
开源mqtt服务器with open("", 'r') as f:
counts = 0
for line adlines():
time = unt(str)
counts += time
print("%s出现的次数:%d" % (str, counts))
num = 1
host = "*****" # 主机地址
port = 1883
client = mqtt.Client()
def get_data():
'''从数据库获取公司编码拼接成需要订阅的主题列表'''
cocode_list = []
sub_topic = []
driver = CoTpSiteProgress.select().where((CoTpSiteProgress.status == 1))
# 遍历数据
for p in driver:
cocode_list._code)
print(f"⼀共{len(cocode_list)}条数据")
cocode_list = set(cocode_list)
cocode_list = list(cocode_list)
for i in range(len(cocode_list)):
coCode = cocode_list[i]
sub_topic.append("LMS/uat10/toClient/{}/1".format(coCode)) # 要订阅的主题列表
print(sub_topic)
return sub_topic
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
print("Connected with result code: " + str(rc))
def on_log(client, obj, level, string):
print("Log:" + string)
def on_message(client, obj, msg):
'''接受到的服务器返回的消息格式处理'''
global num
curtime = w()
strcurtime = curtime.strftime("%Y-%m-%d %H:%M:%S")
print("index: " + str(num) + " " + strcurtime + ": " + pic + " " + str(msg.qos) + " " + str(msg.payload)) logging.info(
"index: " + str(num) + " " + strcurtime + ": " + pic + " " + str(msg.qos) + " " + str(msg.payload)) on_exec(str(msg.payload))
num = num + 1
def on_exec(strcmd):
print("Exec:", strcmd)
def mqtt_connect():
'''链接到MQTT服务器'''
client.username_pw_set("user_test", "user_test")
# client.username_pw_set("admin", "public")
<_connect = on_connect
<_message = on_message
time.sleep(2)
def mqtt_sub():
'''循环⽅式订阅所有的主题函数'''
sub_topic = get_data()
mqtt_connect()
print("beign sub")
for i in range(len(sub_topic)):
client.subscribe(sub_topic[i], qos=2)
print("index: " + str(i + 1) + " " + sub_topic[i])
print(f"订阅了{len(sub_topic)}个公司")
print("end sub")
client.loop_forever() # 保持连接
if __name__ == '__main__':
mqtt_sub()
findStr("ack")
四、推送
"""
@Desc: 推送
@Time: 2021/8/8 10:00
@Author: kally
@Version: 1.0
@FileName: locustfile.py
"""
# coding=utf-8
import datetime
import os
import random
import paho.mqtt.client as mqtt
import time
import logging
import sys
import queue
from locust import task, HttpUser, constant, TaskSet, between
import json
from models.dev_weigh import CoTpSiteProgress
host = "*******" # 主机地址
port = 1883 # 端⼝号
client = mqtt.Client()
file = open("locust.log", 'w').close() # 清空⽂件之前的内容
def get_XS_one_data(): # 销售
driverIdCard_list = []
cocode_list = []
driver = CoTpSiteProgress.select().where(
(CoTpSiteProgress.status == 1) & (de_now == "040") & (CoTpSiteProgress.business_type == 2)) # 遍历数据
for p in driver:
driverIdCard_list.append(p.driver_id_card)
cocode_list._code)
print(cocode_list)
print(driverIdCard_list)
print(len(cocode_list))
return cocode_list, driverIdCard_list
def get_XS_two_data(): # 销售⼆次过磅
driverIdCard_list = []
cocode_list = []
driver = CoTpSiteProgress.select().where(
(CoTpSiteProgress.status == 1) & (de_now == "070") & (CoTpSiteProgress.business_type == 2)) # 遍历数据
for p in driver:
driverIdCard_list.append(p.driver_id_card)
cocode_list._code)
print(cocode_list)
print(len(cocode_list))
return cocode_list, driverIdCard_list
def get_CG_one_data(): # 采购1次过磅
driverIdCard_list = []
cocode_list = []
driver = CoTpSiteProgress.select().where(
(CoTpSiteProgress.status == 1) & (de_now == "040") & (CoTpSiteProgress.business_type == 1)) # 遍历数据
for p in driver:
driverIdCard_list.append(p.driver_id_card)
cocode_list._code)
cocode_list._code)
print(cocode_list)
print(len(cocode_list))
return cocode_list, driverIdCard_list
def get_CG_two_data(): # 采购⼆次过磅
driverIdCard_list = []
cocode_list = []
driver = CoTpSiteProgress.select().where(
(CoTpSiteProgress.status == 1) & (de_now == "070") & (CoTpSiteProgress.business_type == 1))
# 遍历数据
for p in driver:
driverIdCard_list.append(p.driver_id_card)
cocode_list._code)
print(cocode_list)
print(len(cocode_list))
return cocode_list, driverIdCard_list
def get_DD_1_one_data(): # 短倒先⽑后⽪1次过磅 # 短导⼀次过磅,先⽑后⽪短倒⼀次过磅先⽪后⽑过磅类型 1=仙茅后⽪,2=先⽪后⽑,3=单次计量 driverIdCard_list = []
cocode_list = []
driver = CoTpSiteProgress.select().where(
(CoTpSiteProgress.status == 1) & (de_now == "040") & (
CoTpSiteProgress.business_type == 3) & (CoTpSiteProgress.weigh_type == 1))
# 遍历数据
for p in driver:
driverIdCard_list.append(p.driver_id_card)
cocode_list._code)
print(cocode_list)
print(len(cocode_list))
return cocode_list, driverIdCard_list
def get_DD_1_two_data(): # 短倒先⽑后⽪2次过磅
driverIdCard_list = []
cocode_list = []
driver = CoTpSiteProgress.select().where(
(CoTpSiteProgress.status == 1) & (de_now == "070") & (
CoTpSiteProgress.business_type == 3) & (CoTpSiteProgress.weigh_type == 1))
# 遍历数据
for p in driver:
driverIdCard_list.append(p.driver_id_card)
cocode_list._code)
print(cocode_list)
print(len(cocode_list))
return cocode_list, driverIdCard_list
def get_DD_2_one_data(): # 短导⼀次过磅,先⽑后⽪短倒⼀次过磅先⽪后⽑过磅类型 1=仙茅后⽪,2=先⽪后⽑,3=单次计量 #短倒先⽪后⽑1次过磅 driverIdCard_list = []
cocode_list = []
driver = CoTpSiteProgress.select().where(
(CoTpSiteProgress.status == 1) & (de_now == "040") & (
CoTpSiteProgress.business_type == 3) & (CoTpSiteProgress.weigh_type == 2))
# 遍历数据
for p in driver:
driverIdCard_list.append(p.driver_id_card)
cocode_list._code)
print(cocode_list)
print(len(cocode_list))
return cocode_list, driverIdCard_list
def get_DD_2_two_data(): # 短倒先⽪后⽑2次过磅
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论