springboot整合mqtt实现消息发送和消费,以及客户端断线重连
之后的消息恢复
参考资料:
MQTT简介
MQTT是⼀种基于发布/订阅模式的轻量级通讯协议,该协议构建在TCP/IP协议上。 MQTT最⼤的有点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为⼀种低开销、低带宽占⽤的即时通讯协议,MQTT在物联⽹、⼩型设备、移动应⽤等⽅⾯有⼴泛应⽤。
特点
开放消息协议,简单易实现
发布订阅模式,⼀对多消息发布
基于TCP/IP⽹络连接,提供有序,⽆损,双向连接
2字节固定报头,2字节⼼跳报⽂,最⼩化传输开销和协议交换,有效减少⽹络流量
消息QoS⽀持,可靠传输保证
应⽤
物联⽹M2M通信,物联⽹⼤数据采集
Android消息推送,WEB消息推送
智能硬件、智能家具、智能电器
车联⽹通信,电动车站桩采集
智慧城市、远程医疗、远程教育
电⼒、⽯油与能源等⾏业市场
MQTT控制报⽂的结构
MQTT通过交换⼀些预定义的MQTT控制报⽂来⼯作,每条MQTT命令消息的消息头都包含⼀个固定的报头,有些消息会携带⼀个可变报⽂头和⼀个负荷。消息格式如下:
|固定包头,存在于所有MQTT控制包
|可变包头,存在于某些MQTT控制包
|载荷,存在于某些MQTT控制包
固定报⽂头(Fixed Header)
MQTT固定报⽂头最少有两个字节,第⼀个字节包含消息类型(Message Type)和QoS级别等标志位。第⼆个字节开始是剩余长度字段,该长度是后⾯的可变报⽂头加消息负载的总长度,该字段最多允许四个字节。
剩余长度使⽤了⼀种可变长度的结构来编码,这种结构使⽤单⼀字节表⽰0-127的值。⼤于127的值如下处理。每个字节的低7位⽤来编码数据,最⾼位⽤来表⽰是否还有后续字节。因此每个字节可以编码128个值,再加上⼀个标识位。剩余长度最多可以⽤四个字节来表⽰。
例如⼗进制的数字64可以被编码成⼀个单独的字节,⼗进制为64,⼋进制为0x40。⼗进制数字321(=65+2×128)被编码为两个字节,低位在前。第⼀个字节是65+128 = 193。注意最⾼位的128表⽰后⾯⾄少还有⼀个字节。第⼆个字节是2,表⽰2*127。(翻译注:321 = 11000001 00000010,第⼀个字节是“标识符后⾯还有⼀个字节”+65,第⼆个字节是“标识符后⾯没有字节了”+256)。
可变报⽂头(Variable Header)
可变报⽂头主要包含协议名、协议版本、连接标志(Connect Flags)、⼼跳间隔时间(Keep Alive timer)、连接返回码(Connect Return Code)、主题名(Topic Name)等
有效负荷(Payload)
可以理解为消息主题(body)
当MQTT发送的消息类型是CONNECT(连接)、PUBLISH(发布)、SUBSCRIBE(订阅)、SUBACK(订阅确认)、则会带有负荷。MQTT的消息类型(Message Type)(控制报⽂类型)
名字值报⽂流动⽅向描述
Reserved0禁⽌保留
CONNECT1客户端到服务端客户端请求连接服务端
CONNACK2服务端到客户端连接报⽂确认
PUBLISH3两个⽅向都允许发布消息
PUBACK4两个⽅向都允许QoS 1消息发布收到确认
名字值报⽂流动⽅向描述
PUBREC5两个⽅向都允许发布收到(保证交付第⼀步)
PUBREL6两个⽅向都允许发布释放(保证交付第⼆步)
PUBCOMP7两个⽅向都允许QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE8客户端到服务端客户端订阅请求
SUBACK9服务端到客户端订阅请求报⽂确认
UNSUBSCRIBE10客户端到服务端客户端取消订阅请求
UNSUBACK11服务端到客户端取消订阅报⽂确认
PINGREQ12客户端到服务端⼼跳请求
PINGRESP13服务端到客户端⼼跳响应
DISCONNECT14客户端到服务端客户端断开连接
Reserved15禁⽌保留
消息质量(QoS)
QoS 0:最多分发⼀次。消息的传递完全依赖于底层的TCP/IP协议,协议⾥没有定义应答和重试,消息要么只会到达服务端⼀次,要么根本没有到达。
QoS 1:⾄少分发⼀次。服务器的消息接收由PUBACK消息进⾏确认,如果通信链路或发送设备异常,或者指定时间内没有收到确认消息,发送端会重发这条在消息头中设置了DUP位的消息。QoS 2:只分发⼀次。这是最⾼级别的消息传递,消息丢失和重复都是不可接受的,使⽤这个服务质量等级会有额外的开销。
通过下⾯的例⼦可以更深刻的理解上⾯三个传输质量等级。
⽐如⽬前流⾏的共享单车智能锁,智能锁可以定时使⽤QoS level 0质量消息请求服务器,发送单车的当前位置,如果服务器没收到也没关系,反正过⼀段时间⼜会再发送⼀次。之后⽤户可以通过App查询周围单车位置,到单车后需要进⾏解锁,这时候可以使⽤QoS level 1质量消息,⼿机App不断的发送解锁消息给单车锁,确保有⼀次消息能达到以解锁单车。最后⽤户⽤完单车后,需要提交付款表单,可以使⽤QoS level 2质量消息,这样确保只传递⼀次数据,否则⽤户就会多付钱了。
Springboot整合MQTT实现消息发布和订阅
⼀、在Linux上搭建MQTT服务
1.1、打开EMQ官⽹:
1.2、点击开始试⽤
1.3、选择服务器对应版本
1.4、复制下载命令到ssh⼯具中执⾏
下载完成
1.5、下载完成后执⾏安装命令
1.6、安装成功后执⾏命令:
sudo emqx start
出现以下信息表⽰启动成功
1.7、测试
浏览器访问ip:18083进⼊管理界⾯,默认账号为admin,密码为public
⼆、MQTT服务搭建完成后使⽤Springboot整合MQTT协议
2.1、创建⼀个maven项⽬
2.2、在⽗⼯程下创建⼀个Springboot项⽬作为消息提供者,导⼊以下依赖
<!--mqtt相关依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.3、修改配置⽂件
spring:
application:
name: provider
#MQTT配置信息
mqtt:
#MQTT服务端地址,端⼝默认为1883,如果有多个,⽤逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883 url: tcp://ip:1883
#⽤户名
username: admin
#密码
password: public
#客户端id(不能重复)
client:
id: provider-id
#MQTT默认的消息推送主题,实际可在调⽤接⼝时指定
default:
topic: topic
server:
port: 8081
2.4、消息发布者客户端配置
qttprovider.mqtt;
slf4j.Slf4j;
lipse.paho.client.mqttv3.*;
lipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import t.annotation.Configuration;
/
**
* @Author: xct
* @Date: 2021/7/30 15:32
* @Description:
*/
@Configuration
@Slf4j
public class MqttProviderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.pic}")
private String defaultTopic;
/**
* 客户端对象
*/
private MqttClient client;
/
**
* 客户端连接服务端
* @author xct
* @param
* @return void
* @date 2021/7/30 16:01
*/
public void connect(){
try {
//创建MQTT客户端对象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
/
/连接设置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,设置为false表⽰服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表⽰每次连接到服务端都是以新的⾝份
options.setCleanSession(true);
//设置连接⽤户名
options.setUserName(username);
//设置连接密码
options.CharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(100);
//设置⼼跳时间单位为秒,表⽰服务器每隔1.5*20秒的时间向客户端发送⼼跳判断客户端是否在线
options.setKeepAliveInterval(20);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
//设置回调
client.setCallback(new MqttProviderCallBack());
} catch (MqttException e) {
e.printStackTrace();
}
}
public void publish(int qos,boolean retained,String topic,String message){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.Bytes());
//主题⽬的地,⽤于发布/订阅消息
MqttTopic mqttTopic = Topic(topic);
//提供⼀种机制来跟踪消息的传递进度。
//⽤于在以⾮阻塞⽅式(在后台运⾏)执⾏发布时跟踪消息的传递进度
MqttDeliveryToken token;
try {
/
/将指定消息发布到主题,但不等待消息传递完成。返回的token可⽤于跟踪消息的传递状态。
//⼀旦此⽅法⼲净地返回,消息就已被客户端接受发布。当连接可⽤时,将在后台完成消息传递。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
2.5、消息发布客户端回调
qttprovider.mqtt;
lipse.paho.client.mqttv3.IMqttAsyncClient;
lipse.paho.client.mqttv3.IMqttDeliveryToken;
lipse.paho.client.mqttv3.MqttCallback;
lipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import t.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
* @Author: xct
* @Date: 2021/7/30 16:00
* @Description:
*/
@Configuration
public class MqttProviderCallBack implements MqttCallback {
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 与服务器断开连接的回调
* @author xct
* @param throwable
* @return void
* @date 2021/7/30 16:19
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println(clientId + "与服务器断开连接");
}
/**
* 消息到达的回调
* @author xct
* @param s
* @param mqttMessage
* @return void
* @date 2021/7/30 16:19
*/
spring boot选择题@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
/**
* 消息发布成功的回调
* @author xct
* @param iMqttDeliveryToken
* @return void
* @date 2021/7/30 16:20
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
IMqttAsyncClient client = Client();
System.out.ClientId() + "发布消息成功!");
}
}
2.6、创建控制器测试发布消息
ller;
qttprovider.mqtt.MqttProviderConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* @Author: xct
* @Date: 2021/7/30 16:26
* @Description:
*/
@Controller
public class SendController {
@Autowired
private MqttProviderConfig providerClient;
@RequestMapping("/sendMessage")
@ResponseBody
public String sendMessage(int qos,boolean retained,String topic,String message){
try {
providerClient.publish(qos,retained,topic,message);
return "发送成功";
}catch (Exception e){
e.printStackTrace();
return "发送失败";
}
}
}
2.7、在⽗⼯程下创建⼀个Springboot项⽬作为消息消费者,导⼊以下依赖<!--mqtt相关依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2.8、配置⽂件
spring:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论