java实现简单的MQTT协议
简单实现⼀个MQTT协议,需要⼀个本地服务器
下载EMQX ,在本地安装好并运⾏起来,与Tomcat服务器类似,不过⽐Tomcat服务器安装的步骤少且简单不少。
java实现MQTT协议需要有三个类,⼀个客户端类,⼀个服务端类,还有⼀个回调函数类。
第⼀步:创建maven⼯程,添加maven依赖
<dependencies>
<dependency>
<groupId&lipse.paho</groupId>
<artifactId&lipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
第⼆步:创建三个类(包名随意)
服务端类
import*;
import MemoryPersistence;
import Scanner;
public class ServiceMQTT {
public static final String HOST ="tcp://localhost:1883";
private String ServiceID ="ServiceFirst";
private String topic;
private MqttClient client;
private MqttTopic mqttTopic;
private MqttConnectOptions options;
private String user ="admin";
private String password ="public";
private MqttMessage message;
public ServiceMQTT()throws MqttException {
//创建连接
client =new MqttClient(HOST,ServiceID,new MemoryPersistence());
options =new MqttConnectOptions();
options.setCleanSession(false);
options.setKeepAliveInterval(20);
options.setConnectionTimeout(50);
options.setUserName(user);
options.CharArray());
message =new MqttMessage();
}
public void getConnect(){
try{
client.setCallback(new PublishCallBack());
mqttTopic = Topic(topic);
}catch(MqttException e){
e.printStackTrace();
}
}
public void publish(MqttTopic topic, MqttMessage message)throws MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println("消息推送的状态--->"+token.isComplete());
System.out.println("消息推送的状态--->"+token.isComplete());
}
public static void main(String[] args)throws MqttException {
ServiceMQTT service =new ServiceMQTT();
Scanner input =new Scanner(System.in);
System.out.print("请输⼊消息的主题:");
System.out.print("请输⼊消息的内容:");
String messageVal = ();
service.publish(service.ssage);
System.out.println("消息的保持状态:"+ssage.isRetained()); }
}
客户端类
import*;
import MemoryPersistence;
import Scanner;
public class ClientMQTT {
public static final String HOST ="tcp://localhost:1883";
private static final String clientID ="clientFirst";
private String TOPIC;
private MqttClient client;
private MqttConnectOptions options;
private String user ="admin";
private String password ="public";
public void clientStart(){
try{
client =new MqttClient(HOST,clientID,new MemoryPersistence());
options =new MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(10);
options.setConnectionTimeout(50);
options.setUserName(user);
options.CharArray());
client.setCallback(new PublishCallBack());
Scanner input =new Scanner(System.in);
System.out.print("请输⼊订阅的主题:");
TOPIC = ();
MqttTopic topic = Topic(TOPIC);
//setWill⽅法,如果项⽬中需要知道客户端是否掉线可以调⽤该⽅法。设置最终端⼝的通知消息            options.setWill(topic,"close".getBytes(),1,true);
int[] Qos ={1};
String[] topic1 ={TOPIC};
client.subscribe(topic1,Qos);
}catch(MqttException e){
e.printStackTrace();
}
}
public static void main(String[] args)throws MqttException {
ClientMQTT clientMQTT =new ClientMQTT();
clientMQTT.clientStart();
}
}
回调函数类
import IMqttDeliveryToken;
import MqttCallback;
import MqttMessage;
public class PublishCallBack implements MqttCallback {
public void connectionLost(Throwable throwable){
开源mqtt服务器//连接断掉会执⾏到这⾥
System.out.println("连接以断,请重新连接");
}
public void messageArrived(String s, MqttMessage mqttMessage)throws Exception {
//subscribe后会执⾏到这⾥
System.out.println("消息的主题是:"+s);
System.out.println("消息的Qos是:"+Qos());
System.out.println("消息的ID是:"+Id());
System.out.println("消息的内容是:"+new Payload()));
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken){
//publish可以执⾏到这⾥
System.out.println("This is deliveryComplete method----->"+iMqttDeliveryToken.isComplete()); }
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。