SpringBoot整合MQTT
SpringBoot整合MQTT
公司的项⽬重新进⾏了优化,换成了SpringBoot的架构。之前写的⼀个SSM整合MQTT的demo不能使⽤了,其实之前的代码写的也是有点烂,所以就来了解⼀下SpringBoot整合MQTT,顺便写⼀个⽐较好的代码。下⾯直接开始。
需要添加pom依赖,这个是官⽅整合的依赖,官⽅也有例⼦(虽然不怎么样)。依赖如下:
<!-- MQTT依赖 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.3.1.RELEASE</version>
</dependency>
因为之前是使⽤xml⽂件对mqtt适配器进⾏配置,所以这个新项⽬⾮常不想这样做,最后采⽤JavaConfig的形式配置。官⽅的例⼦写的很清楚了。我的⼊站适配器和出站适配器写在⼀起了。MQTT的配置⽂件如下:
package fig;
lipse.paho.client.mqttv3.MqttConnectOptions;
import t.annotation.Bean;
import t.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.MessageProducer;
import org.springframework.DefaultMqttPahoClientFactory;
import org.springframework.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import ssaging.Message;
import ssaging.MessageChannel;
import ssaging.MessageHandler;
import ssaging.MessagingException;
import ssaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/
**
* MQTTConfig.java
* Description: MQTT的配置类,主要配置出⼊站适配器
*
* @author Peng Shiquan
* @date 2020/7/13
*/
@Configuration
public class MQTTConfig {
/**
* =====================================⼊站适配器1=====================================
*/
@Bean
public MessageChannel mqttInputChannel(){
return new DirectChannel();
}
@Bean
public MessageProducer inbound(){
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883","testClient1",
"testTopic2");
"testTopic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel ="mqttInputChannel")
public MessageHandler handler(){
return new MessageHandler(){
@Override
public void handleMessage(Message<?> message)throws MessagingException {
springboot是啥System.out.Payload()+"====="+ Headers().get("mqtt_receivedTopic"));
}
};
}
/**
* =====================================⼊站适配器2=====================================
*/
@Bean
public MessageChannel mqttInputChannelTwo(){
return new DirectChannel();
}
@Bean
public MessageProducer inboundTwo(){
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883","testClient3",
"testTopic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannelTwo());
return adapter;
}
@Bean
@ServiceActivator(inputChannel ="mqttInputChannelTwo")
public MessageHandler handlerTwo(){
return new MessageHandler(){
@Override
public void handleMessage(Message<?> message)throws MessagingException {
System.out.Payload()+"====="+ Headers().get("mqtt_receivedTopic"));
}
};
}
/**
* =====================================出站适配器=====================================
*/
@Bean
public MqttPahoClientFactory mqttClientFactory(){
DefaultMqttPahoClientFactory factory =new DefaultMqttPahoClientFactory();
MqttConnectOptions options =new MqttConnectOptions();
options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"});
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel ="mqttOutboundChannel")
public MessageHandler mqttOutbound(){
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient2",mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("test");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel(){
return new DirectChannel();
}
/**
* =====================================发送的接⼝=====================================
*/
@Component
@MessagingGateway(defaultRequestChannel ="mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
}
}
上⾯就是代码,解释都在官⽅⽂档⾥⾯了,⼤家搞个翻译,翻译⼀下官⽅⽂档就⼤致明⽩了。这⾥说⼀些注意的点。
代码中配置了两个⼊站适配器,⼀个出站适配器,这三个适配器的ClientID最好不要⼀样,要不然会⼀直出现断连重连现象。
发送的接⼝最好分离出去⼀个单独的类,我写在⼀起,但是实际使⽤的时候⼀直出问题,因为时间问题也没有去原因。
代码的这种写法不是MQTT的单独的写法,我了解到这个好像是Spring的⼀种模式吧,没有来得及了解,后续肯定会去了解⼀下。
SpringBoot的这种整合有⼀些BUG,好像是MQTT官⽅解决了,但是SpringBoot官⽅还存在这个问题。具体没有遇到这个问题,所以不清楚现象,留个坑。
官⽅还可以写⼀个监听函数,⽤来监听连接成功和断连的时候,这⾥没有上代码,没有啥可以讲的。在官⽅⽂档的最后⾯有提到。
下⾯就是上调⽤的代码,这⾥直接使⽤controller调⽤了,没有啥复杂的。
@RequestMapping(value ="/mqtt", method = RequestMethod.GET)
public String sendMQTTMsg(){
myGateway.sendToMqtt("testTopic1","hello1");
myGateway.sendToMqtt("testTopic2","hello2");
return"hello";
}
剩下就没有啥可以讲的了,时间赶的急,后⾯遇到再说吧,再次留个坑。
就这样吧,结束。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论