MQTT简单demo(java)
上次已经简单的谈了⼀些MQTT协议的⼀些知识,今天就来就上次的知识具体的Java实现。
现在就来具体说说实现这⼀步吧。中间的时间也是有点久。
MQTT消息的发送和订阅都是依赖MQTT服务器的,没有MQTT服务器,你的客户端是⽆法订阅和发送消息的。所以在最开始的时候,可以选择性的在你的电脑上⾯安装⼀个MQTT服务器。MQTT服务器有很多,⼤家也可以在⽹上去⼀些安装教程,这⾥因为和我要讲内容关系不⼤,所以不再累述。
MQTT协议中是没有发送者和接收者·的概念,所有的连接都是⽤户,所以⼀个MQTT连接既可以发送消息,也可以接收消息。就等于所有的连接都是客户端。下⾯我的客户端代码也是如此,因为公司这边接收的信息先是要进⾏认证,认证成功后再接收有⽤的信息。这时,客户端在根据设备的信息来控制⽹关上⾯的设备,达到远程控制设备的⽬的。因为要使⽤服务器来转发消息,所以对于服务器的测试也是⽐较重要的,但是我使⽤的是公司的服务器,所以这⼀块我的了解⽐较少。但是我这边有⼀些⼯具,⾕歌浏览器的插件MQTTLens。可能会帮助你。(需要翻阅墙体)
下⾯就说⼀说具体的思路,这边我的代码是基于公司的⽹关需求,所以先说⼀说公司⽹关的具体流程。⾸先,⽹关会⼀直发送⾝份验证消息,等待客户端认证,客户端认证通过后,会发送具体有⽤的信息。
客户端这时在根据⽹关信息发送控制命令,到达控制的⽬的。在这个过程中,客户端有订阅和发送,所以⼀个客户端就练习了发送消息和订阅消息。这就是公司的具体操作流程。下⾯就说⼀说代码的流程。
运⾏时要使⽤jar包,也可使⽤maven,但是使⽤maven时要注意版本。
依赖为:
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.12</version>
</dependency>
下⾯开始编写demo
⾸先先要配置MQTT的⼀些配置,配置⽐较多,也很繁琐。
主要是配置主机号和端⼝号,根据⾃⼰的配置编写代码,在配置其他的⼀些细节配置,主要是和连接有关的。
代码如下:
// MQTT设置说明
// 设置主机号
mqtt.setHost("服务器地址和端⼝号");
// ⽤于设置客户端会话的ID。在setCleanSession(false);被调⽤时,MQTT服务器利⽤该ID获得相应的会话。此ID应少于23个字符,默认根据本机地址、端⼝和时间⾃ mqtt.setClientId("876543210");
// 若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true
mqtt.setCleanSession(false);
// 定义客户端传来消息的最⼤时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从⽽避免TCP/IP超时的长时间等待
mqtt.setKeepAlive((short) 60);
// 服务器认证⽤户名
mqtt.setUserName("admin");
// 服务器认证密码
mqtt.setPassword("admin");
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
mqtt.setWillTopic("willTopic");
// 设置“遗嘱”消息的内容,默认是长度为零的消息
mqtt.setWillMessage("willMessage");
// 设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE
mqtt.setWillQos(QoS.AT_LEAST_ONCE);
/
/ 若想要在发布“遗嘱”消息时拥有retain选项,则为true
mqtt.setWillRetain(true);
// 设置版本
mqtt.setVersion("3.1.1");
// 失败重连接设置说明
// 客户端⾸次连接到服务器时,连接的最⼤重试次数,超出该次数客户端将返回错误。-1意为⽆重试上限,默认为-1
mqtt.setConnectAttemptsMax(10L);
// 客户端已经连接到服务器,但因某种原因连接断开时的最⼤重试次数,超出该次数客户端将返回错误。-1意为⽆重试上限,默认为-1
mqtt.setReconnectAttemptsMax(3L);
// ⾸次重连接间隔毫秒数,默认为10ms
mqtt.setReconnectDelay(10L);
// 重连接间隔毫秒数,默认为30000ms
mqtt.setReconnectDelayMax(30000L);
// 设置重连接指数回归。设置为1则停⽤指数回归,默认为2
mqtt.setReconnectBackOffMultiplier(2);
// Socket设置说明
// 设置socket接收缓冲区⼤⼩,默认为65536(64k)
mqtt.setReceiveBufferSize(65536);
// 设置socket发送缓冲区⼤⼩,默认为65536(64k)
mqtt.setSendBufferSize(65536);
// 设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最⼤化传输
mqtt.setTrafficClass(8);
// 带宽限制设置说明
// 设置连接的最⼤接收速率,单位为bytes/s。默认为0,即⽆限制
mqtt.setMaxReadRate(0);
// 设置连接的最⼤发送速率,单位为bytes/s。默认为0,即⽆限制
mqtt.setMaxWriteRate(0);
// 选择消息分发队列
// 若没有调⽤⽅法setDispatchQueue,客户端将为连接新建⼀个队列。如果想实现多个连接使⽤公⽤的队列,显式地指定队列是⼀个⾮常⽅便的实现⽅法
mqtt.ateQueue("foo"));
下⾯开始讲讲连接和订阅和发送主题开源mqtt服务器
fusesource提供三种mqtt client api,分别为阻塞API,基于Futur的API和回调API。
其中,阻塞API是在tBlocking⽅法建⽴连接和提供阻断API的连接。
基于Futur的API则是:在tFuture⽅法建⽴连接,为您提供了⼀个与结合Futur的连接。所有操作的连接是⽆阻塞的,
并且经由返回的结果。
回调API是最复杂的也是性能最好的,另外两种均是对回调API的封装。
因为回调API有些复杂,现在只是介绍回调API的封装。就是前两个,前两个的区别是第⼀个为阻塞的,第⼆个不是阻塞。下⾯开始代码演⽰。
第⼀个阻塞API。代码如下:
// 接收消息
Message message = ive();
// 将收到的消息封装为消息类
ReceiveRealValiedateVO realValiedateVO = JSON.PayloadBuffer().toByte
Array(),
ReceiveRealValiedateVO.class);
System.out.G_id());
if (S_id().equals("1") && S_id().equals("1")) {
// 新建json
JSONObject jsonParam = new JSONObject();
// 封装json
jsonParam.put("s_id", S_id());
jsonParam.put("g_id", G_id());
jsonParam.put("seq", Seq());
jsonParam.put("type", Type());
jsonParam.put("valid", "true");
String s = String();
// 发送特定主题的消息
connection.publish("主題", s.getBytes(), QoS.AT_LEAST_ONCE, false);
}
// 打印主题
System.out.Topic());
// byte[] payload = Payload();
System.out.println(String.PayloadBuffer()));
// process the message then:
message.ack();
/
/ 连接断开
// connection.disconnect();
具体的解释都在代码⾥了,剩下就没有什么了。
要注意的点就是连接中断的处理,和对于服务器的处理。
第⼆种就是使⽤future连接,代码如下:
// 使⽤future连接
FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = t();
f1.await();
// 订阅消息
Future<byte[]> f2 = connection.subscribe(new Topic[] { new Topic("主题", QoS.AT_LEAST_ONCE) });
/
/
byte[] qoses = f2.await();
// 发送⾝份验证消息.
// Future<Void> f3 = connection.publish("foo", "Hello".getBytes(),
// QoS.AT_LEAST_ONCE, false);
// 接收订阅消息..
Future<Message> receive = ive();
// 打印消息.
Message message = receive.await();
System.out.println(String.PayloadBuffer()));
// 回应
message.ack();
//
Future<Void> f4 = connection.disconnect();
f4.await();
第三个是最难的,我这边的代码也是有点乱,直接上代码吧。
// 监听
connection.listener(new Listener() {
@Override
public void onPublish(UTF8Buffer topicmsg, Buffer msg, Runnable ack) {
// utf-8 is used for dealing with the garbled
String topic = topicmsg.utf8().toString();
String payload = msg.utf8().toString();
System.out.println(topic + " " + payload);
String Amsg = AuthenticationSendDemo.Authentication(topic, payload);
if (topic.equals("主题")) {
// 重起⼀个阻塞线程
public void run() {
connection.publish("主题", Bytes(), QoS.AT_LEAST_ONCE, false,
new Callback<Void>() {
@Override
public void onSuccess(Void args) {
/
/ 表⽰发布主题成功
System.out.println("发布成功!");
System.out.println("发布的消息" + Amsg);
}
@Override
public void onFailure(Throwable throwable) {
// 表⽰发布主题失败
System.out.println("发布失败!");
}
});
}
});
}
// 表⽰监听成功
ack.run();
}
@Override
public void onFailure(Throwable value) {
// 表⽰监听失败
}
// execute only once when connection is ended
@Override
public void onDisconnected() {
// 表⽰监听到断开连接
System.out.println("断开连接!!");
}
// execute only once when connecting started
@Override
public void onConnected() {
// 表⽰监听到连接成功
System.out.println("haha");
System.out.println();
}
});
因为代码中使⽤到了线程和回调,我对于这两个掌握的也不是很好,也不再这⾥乱扯,有⼤佬知道⽐较好的写法最好指点⼀下。在这⾥感谢。
三种写法都写完了,下⾯谈⼀谈感想和中间遇到的问题。
以为看具体的⽂档实在太多了,现在公司还在忙着赶项⽬,我这边时间也不是很多,代码的整理以后有时间在说。我感觉最重要的还是对于协议的⼀些掌握和体会,这些要⽐上⾯的代码重要的多,因为你最终的代码还是要和项⽬整合的,和Spring整合的时候你会发现这些都是框架提供好了,你需要做的就是填参数,但是整合中遇到的问题的解决办法都是你从写上⾯的代码中得到的。
因为刚开始写代码,所以代码中的注释也是⾮常多的,这⾥也不再累述。写上⾯的代码的时候遇到了很多的问题,解决的⽹站都在我第⼀篇MQTT博客中,⽐如MQTT的官⽹,⽹上的⽂章都是抄的,要不就是⼀知半解(我也是)。最终还是看⾃⼰的深⼊体会。
就这样吧,结束。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论