SpringBoot集成MQTT配置
⽬录
1. 前⾔
公司的主要采⽤MQTT(消息队列遥测传输)对底层的驱动做命令下发和数据采集。也⽤到了redis、zeroMQ、nats等消息中间件。今天先整理笔记和⼯作中遇到的问题。
2. MQTT介绍
MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
MQTT除了具备⼤部分消息中间件拥有的功能外,其最⼤的特点就是⼩型传输。以减少开销,减低⽹络流量的⽅式去满⾜低带宽、不稳定的⽹络远程传输。
MQTT服务器有很多,⽐如Apache-Apollo和EMQX,⽬前使⽤的时EMQX作为MQTT的服务器。使⽤也很简单,下载解压后,进⼊bin⽬录执⾏emqx console 启动服务。MQTT调试⼯具可以⽤MQTTBox
3. SpringBoot 集成MQTT
3.1 导⼊mqtt库
第⼀步:导⼊⾯向企业应⽤集成库和对应mqtt集成库
compile('org.springframework.boot:spring-boot-starter-integration')
compile('org.springframework.integration:spring-integration-mqtt')
这⾥要注意spring-integration-mqtt的版本。因为会存在lipse.paho.client.mqttv3修复了⼀些bug,并迭代了新版本。但spring-integration-mqtt并没有及时更新的情况。修改⽅法如下
compile("org.springframework.integration:spring-integration-mqtt") {
exclude group: "lipse.paho" , module: "lipse.paho.client.mqttv3"
}
compile("lipse.lipse.paho.client.mqttv3:1.2.2")
第⼆步:MQTT连接配置⽂件
# MQTT Config
mqtt.server=tcp://:1883
mqtt.username=xxx
mqtt.password=xxx
mqtt.client-id=clientID
mqtt.cache-number=100
3.2 配置MQTT订阅者
第⼀步:配置MQTT客户端⼯⼚类DefaultMqttPahoClientFactory
第⼆步:配置MQTT⼊站消息适配器MqttPahoMessageDrivenChannelAdapter
第三步:定义MQTT⼊站消息通道MessageChannel
第四步:声明MQTT⼊站消息处理器MessageHandler
以下有些配置是冲突或者重复的,主要是体现⼀些重要配置。
package com.fig
import com.ssage.ITDragonMQTTMessageHandler
lipse.paho.client.mqttv3.MqttConnectOptions
import org.springframework.beans.factory.annotation.Value
import t.annotation.Bean
import t.annotation.Configuration
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.integration.channel.DirectChannel
import org.MessageProducer
import org.springframework.DefaultMqttPahoClientFactory
import org.springframework.MqttPahoClientFactory
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter
import ssaging.MessageChannel
import ssaging.MessageHandler
import java.time.Instant
@Configuration
class MQTTConfig {
@Value("\${mqtt.server}")
lateinit var mqttServer: String
@Value("\${mqtt.user-name}")
lateinit var mqttUserName: String
@Value("\${mqtt.password}")
lateinit var mqttUserPassword: String
@Value("\${mqtt.client-id}")
lateinit var clientID: String
@Value("\${mqtt.cache-number}")
lateinit var maxMessageInFlight: String
@Value("\${pic}")
lateinit var messageTopic: String
/**
* 配置DefaultMqttPahoClientFactory
* 1. 配置基本的链接信息
* 2. 配置maxInflight,在mqtt消息量⽐较⼤的情况下将值设⼤
*/
fun mqttClientFactory(): MqttPahoClientFactory {
val mqttConnectOptions = MqttConnectOptions()
// 配置mqtt服务端地址,登录账号和密码
mqttConnectOptions.serverURIs = arrayOf(mqttServer)
mqttConnectOptions.userName = mqttUserName
mqttConnectOptions.password = CharArray()
// 配置最⼤不确定接收消息数量,默认值10,qos!=0 时⽣效
mqttConnectOptions.maxInflight = Int()
val factory = DefaultMqttPahoClientFactory()
return factory
}
/**
* 配置Inbound⼊站,消费者基本连接配置
* 1. 通过DefaultMqttPahoClientFactory 初始化⼊站通道适配器
* 2. 配置超时时长,默认30000毫秒
* 3. 配置Paho消息转换器
* 4. 配置发送数据的服务质量 0~2
* 5. 配置订阅通道
*/
@Bean
fun itDragonMqttInbound(): MessageProducer {
// 初始化⼊站通道适配器,使⽤的是Eclipse Paho MQTT客户端库
val adapter = MqttPahoMessageDrivenChannelAdapter(clientID + w().toEpochMilli(), mqttClientFactory(), messageTopic) // 设置连接超时时长(默认30000毫秒)
adapter.setCompletionTimeout(30000)
// 配置默认Paho消息转换器(qos=0, retain=false, charset=UTF-8)
adapter.setConverter(DefaultPahoMessageConverter())
// 设置服务质量
// 0 最多⼀次,数据可能丢失;
// 1 ⾄少⼀次,数据可能重复;
// 2 只有⼀次,有且只有⼀次;最耗性能
adapter.setQos(0)
// 设置订阅通道
adapter.outputChannel = itDragonMqttInputChannel()
return adapter
}
/**
* 配置Inbound⼊站,消费者订阅的消息通道
*/
@Bean
fun itDragonMqttInputChannel(): MessageChannel {
return DirectChannel()
}
/**
* 配置Inbound⼊站,消费者的消息处理器
* 1. 使⽤@ServiceActivator注解,表明所修饰的⽅法⽤于消息处理
* 2. 使⽤inputChannel值,表明从指定通道中取值
* 3. 利⽤函数式编程的思路,解耦MessageHandler的业务逻辑
*/
@Bean
@ServiceActivator(inputChannel = "itDragonMqttInputChannel")
fun commandDataHandler(): MessageHandler {
/*return MessageHandler { message ->
println(message.payload)
}*/
return ITDragonMQTTMessageHandler()
}
}
注意:
1)MQTT的客户端ID要唯⼀。
2)MQTT在消息量⼤的情况下会出现消息丢失的情况。
3)MessageHandler注意解耦问题。
3.3 配置MQTT发布者
第⼀步:配置Outbound出站,出站通道适配器
第⼆步:配置Outbound出站,发布者发送的消息通道
第三步:对外提供推送消息的接⼝
在原有的MQTTConfig配置类的集成上补充以下内容
/**
* 配置Outbound出站,出站通道适配器
* 1. 通过MqttPahoMessageHandler 初始化出站通道适配器
* 2. 配置异步发送
* 3. 配置默认的服务质量
*/
@Bean
@ServiceActivator(inputChannel = "itDragonMqttOutputChannel")
fun itDragonMqttOutbound(): MqttPahoMessageHandler {
// 初始化出站通道适配器,使⽤的是Eclipse Paho MQTT客户端库
val messageHandler = MqttPahoMessageHandler(clientID + w().toEpochMilli() + "_set", mqttClientFactory())
// 设置异步发送,默认是false(发送时阻塞)
messageHandler.setAsync(true)
// 设置默认的服务质量
messageHandler.setDefaultQos(0)
return messageHandler
}
/**
* 配置Outbound出站,发布者发送的消息通道
*/
@Bean
fun itDragonMqttOutputChannel(): MessageChannel {
return DirectChannel()
}
/**
* 对外提供推送消息的接⼝
* 1. 使⽤@MessagingGateway注解,配置MQTTMessageGateway消息推送接⼝
* 2. 使⽤defaultRequestChannel值,调⽤时将向其发送消息的默认通道
* 3. 配置灵活的topic主题
*/
@MessagingGateway(defaultRequestChannel = "itDragonMqttOutputChannel")
interface MQTTMessageGateway {
fun sendToMqtt(data: String, @Header(MqttHeaders.TOPIC) topic: String)
fun sendToMqtt(data: String, @Header(MqttHeaders.QOS) qos: Int, @Header(MqttHeaders.TOPIC) topic: String)
}
注意:
1)发布者和订阅者的客户端ID不能相同。
2)消息的推送建议采⽤异步的⽅式。
3)消息的推送⽅法可以只传payload消息体,但需要配置setDefaultTopic。
3.4 MQTT消息处理和发送
3.4.1 消息处理
为了让消息处理函数和MQTT配置解耦,这⾥提供MessageHandler 注册类,将消息处理的业务逻辑以函数式编程的思维注册到Handler中。package com.ssage
import ssaging.Message
import ssaging.MessageHandler
class ITDragonMQTTMessageHandler : MessageHandler {
private var handler: ((String) -> Unit)? = null
fun registerHandler(handler: (String) -> Unit) {
this.handler = handler
}
override fun handleMessage(message: Message<*>) {
handler?.run { this.invoke(String()) }
}
}
注册MessageHandler
package com.ssage
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
@Service
class ITDragonMessageDispatcher {
private val logger = Logger(ITDragonMessageDispatcher::class.java)
@Autowired
lateinit var itDragonMQTTMessageHandler: ITDragonMQTTMessageHandler
@PostConstruct
fun init() {
}
fun itDragonMsgHandler(message: String) {
logger.info("itdragon mqtt receive message: $message")
try {
// todo
}catch (ex: Exception) {
ex.printStackTrace()
}
}
}
3.4.1 消息发送
注⼊MQTT的MessageGateway,然后推送消息。
@Autowired
lateinit var mqttGateway: MQTTConfig.MQTTMessageGateway
@Scheduled(fixedDelay = 10*1000)
spring frameworkfun sendMessage() {
mqttGateway.sendToMqtt("Hello ITDragon ${w()}", "itDragon/tags/cov/set")
}
4. 开发常见问题
4.1 MQTT每次重连失败都会增长线程数
项⽬上线⼀段时间后,客户的服务器严重卡顿。原因是客户服务断⽹后,MQTT在每次尝试重连的过程中⼀直在创建新的线程,导致⼀个Java服务创建了上万个线程。解决⽅案是更新了lipse.paho.client.mqttv3的版本,也是 "3.1 导⼊mqtt库" 中提到的。后续就没有出现这个问题了。
4.2 MQTT消息量⼤存在消息丢失的情况
MQTT的消息量⼤的情况下,既要保障数据的完整,⼜要保障性能的稳定。光从MQTT本⾝上来说,很难做到鱼和熊掌不可兼得。先要理清需求:
1)数据的完整性,主要⽤于能耗的统计、报警的分析
2)性能的稳定性,服务器不挂
在消息量⼤的情况下,可以将服务质量设置成0(最多⼀次)以减少消息确认的开销,⽤来保证系统的稳定性。
将消息的服务质量设置成0后,会让消息的丢失可能性变得更⼤,如何保证数据的完整性?其实可以在往MQTT通道推送消息之前,先将底层驱动采集的数据先异步保存到中。
还有就是每次发送消息量不能太⼤,太⼤也会导致消息丢失。最直接的就是后端报错,⽐如:java.io.EOFException 和 too large message: xxx bytes 。但是有的场景后端没有报错,前端订阅的mqtt也没收到消息。最⿇烦的是mqttbox⼯具因为数据量太⼤直接卡死。⼀时间真不知道把锅甩给谁。其实可以将消息拆包⼀批批发送。可以缓解这个问题 。
其实采集的数据消息,若在这⼀批推送过程中丢失。也会在下⼀批推送过程中补上。命令下发也是⼀样,如果下发失败,再重写下发⼀次。毕竟消息的丢失并不是必现的情况。也是⼩概率事件,系统的稳定性才是最重要的。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论