mqtt物联⽹协议:java实现d create mybroker
然后会⾃动创建⼀个mybroker的⽂件夹:为刚刚命令后取的名字
3、再进⼊mybroke⽂件夹⾥⾯,再执⾏cmd 命令:
代理服务器就启动成功了!
⼆、实现java代码:
其实服务端和客户端都是mqtt服务器的客户,只是⼀个发布,⼀个接收:
依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="/POM/4.0.0"xsi="/2001/XMLSchema-instance"
schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
<relativePath/><!-- lookup parent from repository -->
</parent>
<groupId&le</groupId>
<artifactId>dtest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>dtest</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.4.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
服务端代码:
package;
import*;
import MemoryPersistence;
public class MqttService {
//tcp: MQTT安装的服务器地址:MQTT定义的端⼝
public static final String HOST ="tcp://0.0.0.0:61613";
//定义⼀个主题
public static final String TOPIC ="topic11";
//定义MQTT的ID,可以在MQTT服务配置中指定
public static final String clentid ="server11";
private MqttClient client;
private MqttTopic topic11;
private String userName ="admin";
private String passWord ="password";
private MqttMessage message;
/**
* 构造函数
* @throws MqttService
*/
public MqttService()throws MqttException {
/
/ MemoryPersistence设置clientid的保存形式,默认为以内存保存
client =new MqttClient(HOST, clentid,new MemoryPersistence());
connect();
}
/**
* 连接服务器
*/
private void connect(){
MqttConnectOptions options =new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.CharArray());
//设置超时时间
options.setConnectionTimeout(10);
//设置回话⼼跳时间
options.setKeepAliveInterval(20);
try{
client.setCallback(new PushCallback());
topic11 = Topic(TOPIC);
}catch(MqttException e){
e.printStackTrace();
}
}
/**
*
* @param topic
* @param message
* @throws MqttPersistenceException
* @throws MqttException
*/
public void publish(MqttTopic topic,MqttMessage message)throws MqttException { MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println("message is published completely!");
}
/**
* 启动⼊⼝
* @param args
* @throws MqttException
*/
public static void main(String[] args)throws MqttException {
MqttService server =new MqttService();
server.pic11 , ssage);
System.out.ssage.isRetained()+"------ratained状态");
}
}
客户端代码:
package;
import MqttClient;
import MqttConnectOptions;
import MqttException;
import MqttTopic;
import MemoryPersistence;
import ScheduledExecutorService;
public class MyMqttClient {
public static final String HOST ="tcp://0.0.0.0:61613";
public static final String TOPIC ="topic11";
private static final String clientid ="client11";
private MqttClient client;
private MqttConnectOptions options;
private String userName ="admin";
private String passWord ="password";
private ScheduledExecutorService scheduler;
private void start(){
try{
// host为主机名,clientid即连接MQTT的客户端ID,⼀般以唯⼀标识符表⽰,
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
client =new MqttClient(HOST,clientid,new MemoryPersistence());
// MQTT的连接设置
options =new MqttConnectOptions();
// 设置是否清空session,这⾥如果设置为false表⽰服务器会保留客户端的连接记录,
// 这⾥设置为true表⽰每次连接到服务器都以新的⾝份连接
options.setCleanSession(true);
/
/ 设置连接的⽤户名
开源mqtt服务器options.setUserName(userName);
// 设置连接的密码
options.CharArray());
// 设置超时时间单位为秒
options.setConnectionTimeout(10);
// 设置会话⼼跳时间单位为秒服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,// 但这个⽅法并没有重连的机制
// 但这个⽅法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置回调
client.setCallback(new PushCallback());
MqttTopic topic = Topic(TOPIC);
//setWill⽅法,如果项⽬中需要知道客户端是否掉线可以调⽤该⽅法。设置最终端⼝的通知消息 options.setWill(topic,"close".getBytes(),2,true);
int[] Qos ={1};
String[] topic1 ={TOPIC};
client.subscribe(topic1,Qos);
}catch(MqttException e){
e.printStackTrace();
}
}
public static void main(String[] args){
MyMqttClient client =new MyMqttClient();
client.start();
}
}
回滚代码:⼀般⾮阻塞⽅式,都是要使⽤回滚形式的:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论