springboot+mqtt物联网开发
最近这一年里,在项目实战的时候,遇到了mqtt开发,今天我就大致地来总结下mqtt在spring boot中的使用。
1、引用jar
<!-- mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
2、项目启动建立连接
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import com.slife.cws.mqtt.component.MqttPushClient;
import com.slife.fig.MqttConfig;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class MqttApplicationRunner implements ApplicationRunner {
@Autowired
private MqttConfig mqttConfig;
@Override
public void run(ApplicationArguments args) throws Exception {
if (log.isInfoEnabled()) {
log.info("===============>>>Mqtt is run starting:<<==================");
}
MqttPushClient mqttPushClient = new MqttPushClient();
// 订阅主题
mqttPushClient.Topic(), Qos());
}
}
3.相关配置及实现类
①、配置
#spring.mqtt.url=tcp://127.0.0.1
spring.mqtt.url=tcp://127.0.0.1
spring.mqtt.username= nbew
spring.mqtt.password= 123456
spring.mqtt.client-id= 100201101
spring.mqtt.topics= top
spring.mqtt.completion-timeout= 3000
spring.mqtt.timeout= 120
spring.mqtt.keep-alive= 20
spring.mqtt.qos= 1,1
spring:
  mqtt:
    url: tcp://loudapp:1883
    username: 0ba851e2e83609b9
    password: 81757448a4df0d73
    client-id: 0ba851e2e83609b9
    id: test10000011
    topics: v4/p/post/thing/live/json/1.1
    completion-timeout: 3000
    timeout: 30
    keep-alive: 60
    qos: 1
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import lombok.Data;
//@ConfigurationProperties(prefix = "spring.mqtt")
/**
 * MQTT settings, each field injected from a {@code spring.mqtt.*} property.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@Configuration
public class MqttConfig {

    /** Broker connection URL. */
    @Value("${spring.mqtt.url}")
    private String url;

    /** User name. */
    @Value("${spring.mqtt.username}")
    private String username;

    /** Password. */
    @Value("${spring.mqtt.password}")
    private String password;

    /** Client id. */
    @Value("${spring.mqtt.client-id}")
    private String clientId;

    /** Communication identifier. */
    @Value("${spring.mqtt.id}")
    private String id;

    /** Topics. The placeholder in the listing had been garbled to {@code ${pics}}. */
    @Value("${spring.mqtt.topics}")
    private String[] topic;

    /** Timeout. */
    @Value("${spring.mqtt.timeout}")
    private int timeout;

    /** Keep-alive (heartbeat) interval. */
    @Value("${spring.mqtt.keep-alive}")
    private int keepAlive;

    /** QoS values; supplied together with the topics when subscribing. */
    @Value("${spring.mqtt.qos}")
    private int[] qos;

    /**
     * Completion timeout. Previously unbound, so it was always 0 even though the sample
     * configuration defines {@code completion-timeout}; falls back to 3000 when the
     * property is absent. No code visible in this listing reads it.
     */
    @Value("${spring.mqtt.completion-timeout:3000}")
    private int completionTimeout;
}
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @Package com.slife.cws.mqttponent
* @ClassName: Mqttbean
* @Description: 客户端
* @Author youli
* @date 2021年2⽉16⽇
* @CopyRight:上海成⽣科技有限公司
*/
@Component
public class Mqttbean {
@Bean("mqttPushClient")
public MqttPushClient getMqttPushClient() {
MqttPushClient mqttPushClient = new MqttPushClient();
spring framework是哪个公司return mqttPushClient;
}
}
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import com.slife.fig.MqttConfig;
import lombok.extern.slf4j.Slf4j;
/**
* @Package com.shhw.mqttponent
* @ClassName: MqttPushClient
* @Description: MqttClient客户端代码
* @Author youli
* @date 2020年10⽉16⽇
* @CopyRight:上海成⽣科技有限公司
*/
@Slf4j
public class MqttPushClient {
private static MqttClient client;
public static MqttClient getClient() {
return client;
}
public static void setClient(MqttClient client) {
MqttPushClient.client = client;
}
private MqttConnectOptions getOption(String userName, String password, int outTime, int KeepAlive) {
// MQTT连接设置
MqttConnectOptions option = new MqttConnectOptions();
// 设置是否清空session,false表⽰服务器会保留客户端的连接记录,true表⽰每次连接到服务器都以新的⾝份连接
option.setCleanSession(false);
// 设置连接的⽤户名
option.setUserName(userName);
// 设置连接的密码
option.CharArray());
// 设置超时时间单位为秒
option.setConnectionTimeout(outTime);
// 设置会话⼼跳时间单位为秒服务器会每隔(1.5*keepTime)秒的时间向客户端发送个消息判断客户端是否在线,但这个⽅法并没有重连的机制 option.setKeepAliveInterval(KeepAlive);
// setWill⽅法,如果项⽬中需要知道客户端是否掉线可以调⽤该⽅法。设置最终端⼝的通知消息
// option.setWill(topic, "close".getBytes(), 2, true);
option.setMaxInflight(1000);
log.info("================>>>MQTT连接认证成功<<======================");
return option;
}
/**
* 连接
*/
public void connect(MqttConfig mqttConfig) {
MqttClient client;
try {
String clientId = ClientId();
clientId += System.currentTimeMillis();
client = new Url(), clientId, new MemoryPersistence());
MqttConnectOptions options = Username(), Password(),
MqttPushClient.setClient(client);
try {
client.setCallback(new PushCallback<Object>(this, mqttConfig));
if (!client.isConnected()) {
log.info("================>>>MQTT连接成功<<======================");
//订阅主题
Topic(), Qos());
} else {// 这⾥的逻辑是如果连接不成功就重新连接
client.disconnect();
log.info("===================>>>MQTT断连成功<<<======================");
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 断线重连
*
* @throws Exception
*/
public Boolean reConnect() throws Exception {
Boolean isConnected = false;
if (null != client) {
if (client.isConnected()) {
isConnected = true;
}
}
return isConnected;
}
/
**
* 发布,默认qos为0,⾮持久化
*
* @param topic
* @param pushMessage
*/
public void publish(String topic, String pushMessage) {
publish(0, false, topic, pushMessage);
}
/**
* 发布
*
* @param qos
* @param retained
* @param topic
* @param pushMessage
*/
public void publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.Bytes());
MqttTopic mTopic = Client().getTopic(topic);
if (null == mTopic) {
<("===============>>>MQTT topic 不存在<<=======================");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 发布消息的服务质量(推荐为:2-确保消息到达⼀次。0-⾄多⼀次到达;1-⾄少⼀次到达,可能重复), retained
* 默认:false-⾮持久化(是指⼀条消息消费完,就会被删除;持久化,消费完,还会保存在服务器中,当新的订阅者出现,继续给新订阅者消费) *
* @param topic
* @param pushMessage
*/
public void publish(int qos, String topic, String pushMessage) {
publish(qos, false, topic, pushMessage);
}
/**
* 订阅某个主题,qos默认为0
*
* @param topic
*/
public void subscribe(String[] topic) {
subscribe(topic, null);
}
/**
* 订阅某个主题
*
* @param topic
* @param qos
*/
public void subscribe(String[] topic, int[] qos) {
try {
} catch (MqttException e) {
e.printStackTrace();
}
}
}
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/
**
* @Package com.shhw.mqttponent
* @ClassName: MqttSender
* @Description: 主题发布
* @Author youli
* @date 2020年10⽉16⽇
* @CopyRight:上海成⽣科技有限公司
*/
@Component(value = "mqttSender")
@Slf4j
public class MqttSender {
@Async
public void send(String queueName, String msg) {
log.debug("=====================>>>>发送主题:{}, msg:{}", queueName,msg); publish(2, queueName, msg);
}
/**
* 发布,默认qos为0,⾮持久化
*
* @param topic
* @param pushMessage
*/
public void publish(String topic, String pushMessage) {
publish(1, false, topic, pushMessage);
}
/**
* 发布
*
* @param qos
* @param retained
* @param topic
* @param pushMessage
*/
public void publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.Bytes());
MqttTopic mTopic = Client().getTopic(topic);
if (null == mTopic) {
<("===================>>>MQTT topic 不存在<<================="); }
MqttDeliveryToken token;
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论