Integrating MQTT with Spring Boot (using the official demo)
Dependencies
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.3</version>
</dependency>
Configuration
spring:
  mqtt:
    clientId: test1
    url: tcp://192.168.1.24:1883
    username: admin
    password: 123456
Configuration class
MyMqttClient.java
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Component
public class MyMqttClient {
    public static MqttClient mqttClient = null;
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;

    @Autowired
    private MqttRecieveCallback mqttRecieveCallback;
    @Autowired
    private MqttTwoRecieveCallback mqttTwoRecieveCallback;

    @Value("${spring.mqtt.url}")
    private String serverURI;
    @Value("${spring.mqtt.clientId}")
    private String clientId;
    @Value("${spring.mqtt.username}")
    private String username;
    @Value("${spring.mqtt.password}")
    private String password;

    @PostConstruct
    public void init() {
        // Initialize the connection options
        mqttConnectOptions = new MqttConnectOptions();
        // With cleanSession = true all state is discarded when the client disconnects,
        // so in-memory persistence is safe to use
        mqttConnectOptions.setCleanSession(true);
        // Connection timeout, in seconds
        mqttConnectOptions.setConnectionTimeout(10);
        // Username and password (uncomment if the broker requires authentication)
        // mqttConnectOptions.setUserName(username);
        // mqttConnectOptions.setPassword(password.toCharArray());
        // Persistence
        memoryPersistence = new MemoryPersistence();
        // Initialize the MqttClient
        try {
            mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        if (null == mqttClient) {
            System.out.println("mqttClient is null");
            return;
        }
        // Connect
        if (!mqttClient.isConnected()) {
            // A client-wide callback can be registered here instead of per-topic listeners:
            // mqttClient.setCallback(mqttRecieveCallback);
            try {
                System.out.println("Connecting");
                mqttClient.connect(mqttConnectOptions);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
        System.out.println(mqttClient.isConnected());
        if (mqttClient.isConnected()) {
            try {
                // Subscribe with callback 1
                mqttClient.subscribe("topic/test1", 2, mqttRecieveCallback);
                // Subscribe with callback 2
                mqttClient.subscribe("topic/test2", 2, mqttTwoRecieveCallback);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    // Close the connection
    @PreDestroy
    public void closeConnect() {
        // Close the persistence store
        if (null != memoryPersistence) {
            try {
                memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("memoryPersistence is null");
        }
        // Disconnect and release the client
        if (null != mqttClient) {
            if (mqttClient.isConnected()) {
                try {
                    mqttClient.disconnect();
                    mqttClient.close();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println("mqttClient is not connected");
            }
        } else {
            System.out.println("mqttClient is null");
        }
    }

    // Publish a message
    public void publishMessage(String pubTopic, String message, int qos, Boolean retained) {
        if (null != mqttClient && mqttClient.isConnected()) {
            System.out.println("Publishing, connected: " + mqttClient.isConnected());
            System.out.println("id:" + mqttClient.getClientId());
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setPayload(message.getBytes());
            mqttMessage.setRetained(retained);
            MqttTopic topic = mqttClient.getTopic(pubTopic);
            if (null != topic) {
                try {
                    // publish() returns a token without waiting for delivery to finish
                    MqttDeliveryToken publish = topic.publish(mqttMessage);
                    if (!publish.isComplete()) {
                        System.out.println("Message submitted, delivery in progress");
                    }
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        } else {
            // Not connected: try to reconnect (this message itself is not sent)
            reConnect();
        }
    }

    // Reconnect
    public void reConnect() {
        if (null != mqttClient) {
            if (!mqttClient.isConnected()) {
                if (null != mqttConnectOptions) {
                    try {
                        mqttClient.connect(mqttConnectOptions);
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("mqttConnectOptions is null");
                }
            } else {
                System.out.println("mqttClient is already connected");
            }
        } else {
            init();
        }
    }

    // Subscribe to a topic
    public void subTopic(String topic) {
        if (null != mqttClient && mqttClient.isConnected()) {
            try {
                mqttClient.subscribe(topic, 1);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("mqttClient is null or not connected");
        }
    }

    // Unsubscribe from a topic
    public void cleanTopic(String topic) {
        // Unsubscribing requires a live connection
        if (null != mqttClient && mqttClient.isConnected()) {
            try {
                mqttClient.unsubscribe(topic);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("mqttClient is null or not connected");
        }
    }
}
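The reConnect() method above is only called when a publish finds the client disconnected, and it does not retry on failure. One possible way to make reconnection more robust is to retry with exponential backoff. The sketch below uses only the JDK; the class Backoff and its methods are hypothetical helpers for illustration and are not part of Paho or of the original demo.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: retries a connection attempt with exponential backoff.
final class Backoff {

    // Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxMillis.
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // Calls `connect` until it returns true or maxAttempts is reached.
    // Exceptions thrown by `connect` count as failed attempts.
    static boolean retry(Callable<Boolean> connect, int maxAttempts,
                         long baseMillis, long maxMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                if (connect.call()) {
                    return true;
                }
            } catch (Exception e) {
                // Failed attempt (for example an MqttException); fall through and wait
            }
            if (attempt < maxAttempts - 1) {
                try {
                    Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
        }
        return false;
    }
}
```

Inside MyMqttClient this could be used as Backoff.retry(() -> { mqttClient.connect(mqttConnectOptions); return mqttClient.isConnected(); }, 5, 1000, 30000). Paho also provides MqttConnectOptions.setAutomaticReconnect(true), which may be the simpler choice; note that with cleanSession set to true the subscriptions have to be re-created after a reconnect.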
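Configuration parameter notes:
cleanSession: when cleanSession is set to false, the broker does not discard the session after the client goes offline. After reconnecting, the client still receives messages for the topics it subscribed to earlier, including QoS 1 and QoS 2 messages that were published while it was offline (note: the clientId must not change between connections).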
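To make the cleanSession behaviour concrete, here is a toy in-memory model of what a broker keeps per client. It is a sketch under simplifying assumptions, not broker or Paho code: the class SessionModel is hypothetical, topics are matched by exact string, QoS is ignored, and delivery to online clients is not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical toy model of broker-side session state.
final class SessionModel {

    private static final class Session {
        final Set<String> topics = new HashSet<>();
        final Queue<String> pending = new ArrayDeque<>();
        boolean online;
        boolean cleanSession;
    }

    private final Map<String, Session> sessions = new HashMap<>();

    // Connects a client and returns the messages queued while it was offline.
    List<String> connect(String clientId, boolean cleanSession) {
        Session s = sessions.get(clientId);
        if (s == null || cleanSession) {
            s = new Session(); // clean session: any earlier state is dropped
            sessions.put(clientId, s);
        }
        s.online = true;
        s.cleanSession = cleanSession;
        List<String> backlog = new ArrayList<>(s.pending);
        s.pending.clear();
        return backlog;
    }

    // The client must be connected before subscribing.
    void subscribe(String clientId, String topic) {
        sessions.get(clientId).topics.add(topic);
    }

    void disconnect(String clientId) {
        Session s = sessions.get(clientId);
        if (s == null) {
            return;
        }
        if (s.cleanSession) {
            sessions.remove(clientId); // subscriptions are forgotten
        } else {
            s.online = false;          // subscriptions survive the disconnect
        }
    }

    // Queues the message for every offline persistent session subscribed to the topic.
    void publish(String topic, String payload) {
        for (Session s : sessions.values()) {
            if (!s.online && s.topics.contains(topic)) {
                s.pending.add(payload);
            }
        }
    }
}
```

In this model a client that connects with cleanSession = false, subscribes, and disconnects gets the missed messages back from its next connect call, while a client using cleanSession = true gets an empty list. The session is looked up by clientId, which is why the clientId must stay the same.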
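Retained: if the RETAIN flag of a PUBLISH message is set to 1, the message is a "retained message". The broker stores only the last retained message for each topic, together with its QoS, and delivers it to any client that subsequently subscribes to that topic. A new subscriber therefore gets the publisher's latest state immediately instead of waiting for the next publish.
Deleting a retained message
1. Option 1: publish a retained message with an empty payload;
2. Option 2: publish a new retained message to overwrite the previous one (recommended);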
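The retained-message rules above can be sketched as a small in-memory store. This is an illustrative model only, not how any particular broker is implemented: the class RetainedStore and its method names are hypothetical, and topics are matched by exact string.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of a broker's retained-message store.
final class RetainedStore {

    // At most one retained payload per topic
    private final Map<String, byte[]> retained = new HashMap<>();

    // Called for every PUBLISH the broker receives.
    void onPublish(String topic, byte[] payload, boolean retain) {
        if (!retain) {
            return; // ordinary messages never touch the store
        }
        if (payload.length == 0) {
            retained.remove(topic);               // option 1: empty payload clears it
        } else {
            retained.put(topic, payload.clone()); // option 2: newer message overwrites
        }
    }

    // Called when a client subscribes; returns the retained message or null.
    String onSubscribe(String topic) {
        byte[] payload = retained.get(topic);
        return payload == null ? null : new String(payload, StandardCharsets.UTF_8);
    }
}
```

With the Paho client from this article, option 1 would correspond to a call such as mqttClient.publish(topic, new byte[0], 0, true), and option 2 to calling publishMessage with retained set to true and the new value.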
Callback class 1
MqttRecieveCallback.java
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqttRecieveCallback implements MqttCallback, IMqttMessageListener {
    @Autowired
    private MyMqttClient client;

    @Override
    public void connectionLost(Throwable cause) {
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("Client received topic : " + topic);
        System.out.println("Client received QoS : " + message.getQos());
        System.out.println("Client received payload : " + new String(message.getPayload()));
        // Forward a message to the second topic
        client.publishMessage("topic/test2", "2", 2, false);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }
}
Callback class 2
MqttTwoRecieveCallback.java
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
@Component
public class MqttTwoRecieveCallback implements MqttCallback, IMqttMessageListener {
    @Override
    public void connectionLost(Throwable cause) {
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Client2 received topic : " + topic);
        System.out.println("Client2 received QoS : " + message.getQos());
        System.out.println("Client2 received payload : " + new String(message.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }
}
Usage
@Autowired
private MyMqttClient myMqttClient;
myMqttClient.publishMessage("tra_topic",text,2,false);
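If you get the error MQTT(32202): Too many publishes in progress
raise the in-flight limit in the connection options (the Paho default is 10):
mqttConnectOptions.setMaxInflight(1000);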
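To illustrate why this error appears, the sketch below models the in-flight limit as a simple counter: each publish takes a slot, each completed delivery frees one, and a publish is rejected when no slot is free. It is a simplified model of the idea, not Paho's actual implementation; the class InflightWindow and its methods are hypothetical.

```java
// Hypothetical toy model of a client-side in-flight window.
final class InflightWindow {

    private final int maxInflight;
    private int inflight;

    InflightWindow(int maxInflight) {
        if (maxInflight < 1) {
            throw new IllegalArgumentException("maxInflight must be >= 1");
        }
        this.maxInflight = maxInflight;
    }

    // Tries to reserve a slot for a new publish.
    // Returning false corresponds to the "too many publishes in progress" error.
    synchronized boolean tryPublish() {
        if (inflight >= maxInflight) {
            return false;
        }
        inflight++;
        return true;
    }

    // Called when a delivery completes and its slot becomes free again.
    synchronized void onDeliveryComplete() {
        if (inflight > 0) {
            inflight--;
        }
    }

    synchronized int inflight() {
        return inflight;
    }
}
```

A larger setMaxInflight value widens the window. If the publisher is consistently faster than the broker acknowledges, slowing the publish rate or waiting on the delivery token may be needed as well.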