Android MQTT客户端开发实例
1.添加mqtt依赖
app的build中添加依赖如下
//mqtt
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
2.manifest文件中添加权限
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
...
<application
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:theme="@style/AppTheme">
<activity android:name=".MainActivity">
<intent-filter>
<action android:name="android.intent.action.MAIN" />
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
<!--mqtt service-->
<service android:name="org.eclipse.paho.android.service.MqttService" />
</application>
3.自定义MQTT工具类
package nano.dev.zjhi.utils;
import android.content.Context;
import android.os.Handler;
import android.os.Message;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.nio.charset.StandardCharsets;
/**
* CreateTime 2019/8/8 16:11
* Author LiuShiHua
* Description:
*/
public class MqttUtil {
private final String TAG = "------------->mqtt";
private static MqttUtil mqttUtil;
private Context context;
private Context context;
private MqttAndroidClient mqttAndroidClient;
private MqttConnectOptions mMqttConnectOptions;
private boolean isConnectSuccess = false, isConnectionLost = true;
//MQTT相关配置
private final String CLIENTID = "AndroidUser";
public String HOST = "tcp://117.237.101.28:1883";//服务器地址(协议+地址+端⼝号)
public String USERNAME = "root";//⽤户名
public String PASSWORD = "root";//密码
public static String PUBLISH_TOPIC = "theme/city_id/country_id/company_id/sn";//发布主题
public static String RESPONSE_TOPIC = "theme/city_id/country_id/company_id/sn";//订阅主题
/**
* QUALITY_OF_SERVICE
* ⾄多⼀次,消息发布完全依赖底层 TCP/IP ⽹络。会发⽣消息丢失或重复。这⼀级别可⽤于如下情况,环境传感器数据,丢失⼀次读记录⽆所谓,因为不久后还会    * ⾄少⼀次,确保消息到达,但消息重复可能会发⽣。
* 只有⼀次,确保消息到达⼀次。这⼀级别可⽤于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果
*/
private final int QUALITY_OF_SERVICE = 0;//服务质量,0最多⼀次,1最少⼀次,2只⼀次
private final int HAND_RECONNECT = 1;//重连hand
private final int RECONNECT_TIME_CONFIG = 10 * 1000;//重连时间间隔为10秒
private Handler handler = new Handler() {
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
switch (msg.what) {
case HAND_RECONNECT:
if (!isConnectSuccess)
doClientConnection();//连接失败,重连(可关闭服务器进⾏模拟)
break;
}
}
};
//MQTT是否连接成功的监听
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken arg0) {
Log.i(TAG, "连接成功 ");
isConnectSuccess = true;
try {
mqttAndroidClient.subscribe(PUBLISH_TOPIC, QUALITY_OF_SERVICE);//订阅主题,参数:主题、服务质量
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
arg1.printStackTrace();
Log.i(TAG, "onFailure 连接失败:" + Message());
isConnectSuccess = false;
handler.sendEmptyMessageDelayed(HAND_RECONNECT, RECONNECT_TIME_CONFIG);
}
};
//订阅主题的回调
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG, "收到消息: " + new Payload()) + "\tToString:" + String());
//收到其他客户端的消息后,响应给对⽅告知消息已到达或者消息有问题等
//收到其他客户端的消息后,响应给对⽅告知消息已到达或者消息有问题等
//response("message arrived:"+message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
Log.i(TAG, "deliveryComplete");
}
@Override
public void connectionLost(Throwable arg0) {
Log.i(TAG, "连接断开");
doClientConnection();//连接断开,重连
}
};
//单例模式
public static MqttUtil getInstance(Context context) {
if (mqttUtil == null) {
mqttUtil = new MqttUtil(context);
}
return mqttUtil;
}
private MqttUtil(Context context) {
initMqtt();
}
private void initMqtt() {
String serverURI = HOST; //服务器地址(协议+地址+端⼝号)
mqttAndroidClient = new MqttAndroidClient(context, serverURI, CLIENTID);
mqttAndroidClient.setCallback(mqttCallback); //设置订阅消息的回调
mMqttConnectOptions = new MqttConnectOptions();
mMqttConnectOptions.setCleanSession(true); //设置是否清除缓存
mMqttConnectOptions.setConnectionTimeout(10); //设置超时时间,单位:秒
mMqttConnectOptions.setKeepAliveInterval(20); //设置⼼跳包发送间隔,单位:秒
mMqttConnectOptions.setUserName(USERNAME); //设置⽤户名
mMqttConnectOptions.CharArray()); //设置密码
// last will message
boolean doConnect = true;
String message = "{\"terminal_uid\":\"" + CLIENTID + "\"}";
String topic = PUBLISH_TOPIC;
Integer qos = QUALITY_OF_SERVICE;
Boolean retained = false;
if ((!message.equals("")) || (!topic.equals(""))) {
try {
mMqttConnectOptions.setWill(topic, Bytes(), qos.intValue(), retained.booleanValue());            } catch (Exception e) {
Log.i(TAG, "setWill Exception Occured:" + e.getMessage(), e);
doConnect = false;
}
}
if (doConnect) {
L.d("mMqttConnectOptions.setWill Success");
doClientConnection();
}
}
/**
* 连接MQTT服务器
*/
private void doClientConnection() {
L.d("是否链接成功:" + mqttAndroidClient.isConnected());
L.d("是否链接成功:" + mqttAndroidClient.isConnected());
if (!mqttAndroidClient.isConnected() && Tools.isInternetConnect(context)) {
try {
} catch (MqttException e) {
L.d("doClientConnection:" + e.getMessage());
e.printStackTrace();
}
}
}
/**
* 发布消息
*
* @param message 消息
*/
public void publish(String message) {
String topic = PUBLISH_TOPIC;
Integer qos = QUALITY_OF_SERVICE;
Boolean retained = false;
try {
if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
//参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后⼀条消息       
        mqttAndroidClient.publish(topic, Bytes(), qos.intValue(), retained.booleanValue());            } else {
L.d("mqttAndroidClient is Null");
}
} catch (MqttException e) {
L.d("publish MqttException:" + e.getMessage());
e.printStackTrace();
}
}
public void response(String message) {
String topic = RESPONSE_TOPIC;
Integer qos = QUALITY_OF_SERVICE;
Boolean retained = false;
try {
//参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后⼀条消息            mqttAndroidClient.publish(topic, Bytes(), qos.intValue(), retained.booleanValue());        } catch (MqttException e) {
L.d("publish:" + e.getMessage());
e.printStackTrace();
}
}
//断开链接
public void disconnect() {
try {
if (mqttAndroidClient != null){
mqttAndroidClient.unregisterResources();
mqttAndroidClient.disconnect();
}
} catch (MqttException e) {
e.printStackTrace();
}
}
}
4.用mqtt来发送Android手机加速度传感器的检测值
这里用来发送每100毫秒的Y轴的加速度平均值
package nano.dev.zjhi.service;
import android.app.IntentService;
import android.content.Context;
import android.content.Intent;
import android.hardware.Sensor;
import android.hardware.SensorEvent;
import android.hardware.SensorEventListener;
import android.hardware.SensorManager;
import android.os.Handler;
import android.os.Message;
import nano.dev.zjhi.utils.MqttUtil;
/**
* CreateTime 2019/8/7 11:00
* Author LiuShiHua
* Description:
*/
public class SensorService extends IntentService {
private Context context;
private SensorManager sensorManager;
private Sensor rotationVectorSensor;
private double yt100 = 0.0; //记录100ms平均加速度
private long startYt100Date = 0;
private boolean yt100Reset = false;
private int yt100times = 0;
private MqttUtil mqttUtil;
public SensorService() {
super("SensorService");
}
public static void startService(Context context) {
context.startService(new Intent(context, SensorService.class));
}
private final int HAND_100ms = 1;//100ms计算⼀次平均加速度
private final int HAND_1000ms = 2;//1000ms计算⼀次平均加速度
private Handler handler = new Handler() {
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
switch (msg.what) {
case HAND_100ms:
yt100Reset = true;
break;
case HAND_1000ms:
break;
}
}
};
@Override
public void onCreate() {
context = this;
mqttUtil = Instance(context);
sensorManager = (SensorManager) getSystemService(SENSOR_SERVICE);
// 获取加速度传感器
rotationVectorSensor = DefaultSensor(Sensor.TYPE_ACCELEROMETER);        startYt100Date = System.currentTimeMillis();
handler.sendEmptyMessageDelayed(HAND_100ms, 100);

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。