mqtt协议与emqx相关使⽤
MQTT协议
直接照着百度相关内容可以看出来,mqtt协议其实就是⼀个及时通讯协议,跟rocketMQ类似,也可以说是⼀个消息中间件.
作为⼀个传递消息的协议,mqtt是基于⼀个"发布者->代理服务器->消费者"的⼀个流程进⾏的
发布者负责消息的发布,定制好对应的消息就可以根据topic来把消息发送到服务器上,然后消费者就可以根据对应的topic来实现消息的读取,这样的⼀个流程就是mqtt发送消息到接收消费消息的⼀个过程.
EMQX
emqx是实现mqtt的⼀个消息中间件,当然还有别的⼀些实现,笔者这⾥没有使⽤过,因此就不做记录,为什么会使⽤到emqx作为消息中间件呢,因为⽬前做的⼀个项⽬上,涉及到这样⼀个需求:“设备接⼊”,外部设备接⼊到⽬前开发的系统中,并且设备接⼊后,要保证设备发送的数据实时存储并更新到后台界⾯,有⼀个直观的展⽰,所以在设备接⼊完成后,就需要⼀个渠道来实现消息的发送.我们就采⽤了emqx作为设备和平台数据交互的⼀个中间件.
平台是基于springboot开发的⼀个maven项⽬,关于mqtt和springboot的集成,请⾃⾏百度,⽹上的例⼦很多,这⾥就不进⾏过多的赘述.
值得⼀提的是,关于⽣成mqtt的bean对象的过程,因为使⽤了springboot,并且也没有涉及到集相关的内容,所以直接就把mqtt的初始化对象做成了单例并放⼊了springboot启动的过程中(springboot启动时就保证mqtt也注册到相应的服务器上),下⾯给出代码
config
@Configuration//使⽤@Configuration 的注解类表⽰这个类可以使⽤ Spring IoC容器作为bean 定义的来源
public class MqttConfig {
/**
* 代理服务器ip地址
*/
@Value("${mqtt.url}")//都是直接从配置⽂件中读数据(不会请⾃⾏百度)
public String MQTT_BROKER_HOST;
/**
* qos
*/
@Value("${mqtt.qos}")
public int QOS;
/
**
* topic
*/
private static final String TOPIC ="xxxx/#";//topic  前缀可⾃定义  "/"可作为分隔符  "#"代表接收所有的数据
@Bean//@Bean注解告诉 Spring,⼀个带有 @Bean 的注解⽅法将返回⼀个对象,该对象应该被注册为在 Spring 应⽤程序上下⽂中的 bean public void startMqttPushClient(){
MqttPushClient.MQTT_HOST = MQTT_BROKER_HOST;
MqttPushClient.MQTT_CLIENTID = System.currentTimeMillis()+"";
MqttPushClient instance = Instance();
instance.subscribe(TOPIC, QOS);
}
}
client
public class MqttPushClient {
private static final Logger log = Logger(MqttPushClient.class);
public static String MQTT_HOST ="";
public static String MQTT_CLIENTID ="";
public static String MQTT_USERNAME ="";
public static String MQTT_PASSWORD ="";
public static int MQTT_TIMEOUT =10;
public static int MQTT_KEEPALIVE =10;
private MqttClient client;
private static volatile MqttPushClient mqttClient = null;
//获得实例(单例)
public static MqttPushClient getInstance(){
if(mqttClient == null){
synchronized(MqttPushClient.class){
if(mqttClient == null){
mqttClient =new MqttPushClient();
}
}
}
return mqttClient;
}
private MqttPushClient(){
log.info("Connect MQTT: "+this);
connect();
}
private void connect(){
try{
client =new MqttClient(MQTT_HOST, MQTT_CLIENTID,new MemoryPersistence());
MqttConnectOptions option =new MqttConnectOptions();
option.setCleanSession(true);
// 设置⽤户名
/
/ 设置⽤户名
//            option.setUserName(MQTT_USERNAME);
// 设置密码
//            option.setPassword(CharArray());            option.setConnectionTimeout(MQTT_TIMEOUT);
option.setKeepAliveInterval(MQTT_KEEPALIVE);
option.setAutomaticReconnect(true);
try{
client.setCallback(new MqttPushCallback());//回调
}catch(Exception e){
e.printStackTrace();
}
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 订阅某个主题 qos默认为1
*
* @param topic
*/
public void subscribe(String topic){
subscribe(topic,1);
}
/**
* 订阅某个主题
*
* @param topic
* @param qos
*/
public void subscribe(String topic,int qos){
try{
client.subscribe(topic, qos);
}catch(Exception e){
e.printStackTrace();
}
}
}
callback mqtt回调
public class MqttPushCallback implements MqttCallback {//⼀定要实现MqttCallback接⼝
private static final Logger log = Logger(MqttPushCallback.class);
@Override
public void connectionLost(Throwable cause){
log.info("连接断开,正在尝试重新连接");
cause.printStackTrace();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token){
}
/**
* 处理接收到的消息
*
* @param topic
* @param message开源mqtt服务器
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message)throws Exception {
//接收到订阅的消息+topic  可以在这⾥进⾏消息的逻辑处理
}
}
这就是springboot整合mqtt后的⼀个流程,因为项⽬中只需要接收消息,所以就只给出了接收消息的相关代码,消息的发送可以⾃⾏去⽹上查.

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。