MQTT--PahoCClient的实现和详解
概述
  在⽂章Paho - MQTT C Cient的实现中,我介绍了如何使⽤Paho开源项⽬创建MQTTClient_pulish客户端。但只是简单的介绍了使⽤⽅法,⽽且客户端的结果与之前介绍的并不吻合,今天我就结合新的例⼦,给⼤家讲解⼀下Paho使⽤MQTT客户端的主要过程。
  如同前⾯介绍的,MQTT客户端分为同步客户端和异步客户端。今天主要讲解的是同步客户端,结构还是如同步客户端中介绍的:
  1.创建⼀个客户端对象;
  2.设置连接MQTT服务器的选项;
  3.如果多线程(异步模式)操作被使⽤则设置回调函数(详见 Asynchronous >vs synchronous client applications);
  4.订阅客户端需要接收的任意话题;
  5.重复以下操作直到结束:
    a.发布客户端需要的任意信息;
    b.处理所有接收到的信息;
  6.断开客户端连接;
  7.释放客户端使⽤的所有内存。
实现
  好,直接上代码,MQTT简单的同步客户端。
1 #include <pthread.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include "MQTTClient.h"
6#if !defined(WIN32)
7 #include <unistd.h>
8#else
9 #include <windows.h>
10#endif
11
12#define NUM_THREADS 2
13#define ADDRESS "tcp://localhost:1883" //更改此处地址
14#define CLIENTID "aaabbbccc_pub" //更改此处客户端ID
15#define SUB_CLIENTID "aaabbbccc_sub" //更改此处客户端ID
16#define TOPIC "topic01" //更改发送的话题
17#define PAYLOAD "Hello Man, Can you see me ?!" //
18#define QOS 1
19#define TIMEOUT 10000L
20#define USERNAME "test_user"
21#define PASSWORD "jim777"
22#define DISCONNECT "out"
23
24int CONNECT = 1;
25volatile MQTTClient_deliveryToken deliveredtoken;
26
27void delivered(void *context, MQTTClient_deliveryToken dt)
28 {
29 printf("Message with token value %d delivery confirmed\n", dt);
30 deliveredtoken = dt;
31 }
32
33int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
34 {
35int i;
36char* payloadptr;
37
38 printf("Message arrived\n");
39 printf(" topic: %s\n", topicName);
40 printf(" message: ");
41
42 payloadptr = message->payload;
43if(strcmp(payloadptr, DISCONNECT) == 0){
44 printf(" \n out!!");
45 CONNECT = 0;
46 }
47
48for(i=0; i<message->payloadlen; i++)
49 {
50 putchar(*payloadptr++);
51 }
52 printf("\n");
53
54 MQTTClient_freeMessage(&message);开源mqtt服务器
55 MQTTClient_free(topicName);
56return1;
57 }
58
59void connlost(void *context, char *cause)
60 {
61 printf("\nConnection lost\n");
62 printf(" cause: %s\n", cause);
63 }
64
65void *subClient(void *threadid){
66long tid;
67 tid = (long)threadid;
68 printf("Hello World! It's me, thread #%ld!\n", tid);
69
70 MQTTClient client;
71 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
72int rc;
73int ch;
74
75 MQTTClient_create(&client, ADDRESS, SUB_CLIENTID,
76 MQTTCLIENT_PERSISTENCE_NONE, NULL);
77 conn_opts.keepAliveInterval = 20;
78 conn_opts.cleansession = 1;
79 conn_opts.username = USERNAME;
80 conn_opts.password = PASSWORD;
81
82 MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
83
84if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
85 {
86 printf("Failed to connect, return code %d\n", rc);
87 exit(EXIT_FAILURE);
88 }
89 printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
90"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
91 MQTTClient_subscribe(client, TOPIC, QOS);
92
93do
94 {
95 ch = getchar();
96 } while(ch!='Q' && ch != 'q');
97
98 MQTTClient_unsubscribe(client, TOPIC);
99 MQTTClient_disconnect(client, 10000);
100 MQTTClient_destroy(&client);
101
102 pthread_exit(NULL);
103 }
104void *pubClient(void *threadid){
105long tid;
106 tid = (long)threadid;
107int count = 0;
108 printf("Hello World! It's me, thread #%ld!\n", tid);
109//声明⼀个MQTTClient
110 MQTTClient client;
111//初始化MQTT Client选项
112 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
113//#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 }
114 MQTTClient_message pubmsg = MQTTClient_message_initializer;
115//声明消息token
116 MQTTClient_deliveryToken token;
117int rc;
118//使⽤参数创建⼀个client,并将其赋值给之前声明的client
119 MQTTClient_create(&client, ADDRESS, CLIENTID,
120 MQTTCLIENT_PERSISTENCE_NONE, NULL);
121 conn_opts.keepAliveInterval = 20;
122 conn_opts.cleansession = 1;
123 conn_opts.username = USERNAME;
124 conn_opts.password = PASSWORD;
125//使⽤MQTTClient_connect将client连接到服务器,使⽤指定的连接选项。成功则返回MQTTCLIENT_SUCCESS
126if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
127 {
128 printf("Failed to connect, return code %d\n", rc);
129 exit(EXIT_FAILURE);
130 }
131 pubmsg.payload = PAYLOAD;
132 pubmsg.payloadlen = strlen(PAYLOAD);
133 pubmsg.qos = QOS;
ained = 0;
135while(CONNECT){
136 MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
137 printf("Waiting for up to %d seconds for publication of %s\n"
138"on topic %s for client with ClientID: %s\n",
139 (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
140 rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
141 printf("Message with delivery token %d delivered\n", token);
142 usleep(3000000L);
143 }
144
145
146 MQTTClient_disconnect(client, 10000);
147 MQTTClient_destroy(&client);
148 }
149int main(int argc, char* argv[])
150 {
151 pthread_t threads[NUM_THREADS];
152long t;
153 pthread_create(&threads[0], NULL, subClient, (void *)0);
154 pthread_create(&threads[1], NULL, pubClient, (void *)1);
155 pthread_exit(NULL);
156 }
  在代码中,我创建了两个线程,分别⽤来处理订阅客户端和发布客户端。
整体详解
接下来我讲解⼀下这个简单的客户端,其中,⼤体的流程如下:
  ⼤体的流程如图所⽰,在客户端启动之后,会启动线程,创建⼀个订阅客户端,它会监听消息的到达,在消息到达之后会触发相应的回调函数以对消息进⾏处理;后在启动⼀个线程,创建⼀个发送客户端,⽤来发送消息的,每次发送消息之前会判断是否要掉线,如CONNECT=0则会掉线,否则发送消息给topic01。
订阅客户端详解
  以下函数完成的是订阅的功能。
void *subClient(void *threadid)
过程⼤概如下:
  第⼀步:声明客户端,并通过函数给其赋值;
MQTTClient client;
MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
  第⼆步:设置连接MQTT服务器的选项;
1 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  第三步:设置回调函数;
1 MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
2//相应的回调函数connlost,msgarrvd,delivered我的代码中都有
  第四步:使⽤客户端和连接选项连接服务器;
1 MQTTClient_connect(client, &conn_opts))
  第五步订阅话题;
1 MQTTClient_subscribe(client, TOPIC, QOS);
  第六步⼀直等待,直到输⼊’Q’ 或’q’;
1do
2 {
3 ch = getchar();
4 } while(ch!='Q' && ch != 'q');
  第七步取消订阅;
1 MQTTClient_unsubscribe(client, TOPIC);
  第⼋步.断开客户端连接;
1 MQTTClient_disconnect(client, 10000);
  第九步.释放客户端使⽤的所有内存;
MQTTClient_destroy(&client);
  ⾄此,订阅客户端就结束了。⼀般订阅客户端的⼤体结构都是这样。不同的是回调函数的个性化上。
发送客户端详解
  以下函数完成的是发送的功能。
void *pubClient(void *threadid)
过程⼤概如下:
  第⼀步:声明客户端,并通过函数给其赋值;
1 MQTTClient client;
2 MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
  第⼆步:设置连接MQTT服务器的选项;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  第三步:使⽤客户端和连接选项连接服务器;
MQTTClient_connect(client, &conn_opts)
  第四步设置发送消息的属性;
1 pubmsg.payload = PAYLOAD;
2 pubmsg.payloadlen = strlen(PAYLOAD);
3 pubmsg.qos = QOS;
ained = 0;
  第五步循环发送消息;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
  第六步⼀直等待,当CONNECT=0时退出该客户端;
  第七步.断开客户端连接;
MQTTClient_disconnect(client, 10000);
  第⼋步.释放客户端使⽤的所有内存;
MQTTClient_destroy(&client);
  ⾄此,发送客户端就结束了。⼀般的发送客户端⼤体结构也如此,但异步客户端可能有些许不同,⽆
⾮就是设计回调函数,然后在连接,断开连接等时可以使⽤回调函数做⼀些操作⽽已,具体的可以⾃⼰研究。
  为了让⼤家能够更深⼊了解,我把⾃⼰学到的⼀些函数和结构体⼤致在下⾯讲解了⼀下。
相关结构体
MQTTClient
定义:typedef void* MQTTClient;
含义:代表MQTT客户端的句柄。成功调⽤MQTTClient_create()后,可以得到有效的客户端句柄。
MQTTClient_connectOptions
定义:
typedef struct
{
char struct_id[4];//结构体的识别序列,必须为MQTC
int struct_version;//结构体版本
/**
在0,1,2,3,4,5中取值:
0-表⽰没有SSL选项且没有serverURIs;
1-表⽰没有serverURIs;
2-表⽰没有MQTTVersion
3-表⽰没有返回值;
4-表⽰没有⼆进制密码选项
*/
int keepAliveInterval;
/**
在这段时间内没有数据相关的消息时,客户端发送⼀个⾮常⼩的MQTT“ping”消息,服务器将会确认这个消息
*/
int cleansession;
/**
当cleansession为true时,会话状态信息在连接和断开连接时被丢弃。将cleansession设置为false将保留会话状态信息
*/
int reliable;
/*
将该值设置为true意味着必须完成发布的消息(已收到确认),才能发送另⼀个消息
*/
MQTTClient_willOptions* will;
/*
如果程序不使⽤最后的意愿和遗嘱功能,请将此指针设置为NULL。
*/
const char* username;//⽤户名
const char* password;//密码
int connectTimeout;//允许尝试连接的过时时间
int retryInterval;//尝试重连的时间
MQTTClient_SSLOptions* ssl;
/*
如果程序不使⽤最后的ssl,请将此指针设置为NULL。
*/
int serverURIcount;
char* const* serverURIs;
/*
连接服务器的url,以protocol:// host:port为格式
*/
int MQTTVersion;
/*
MQTT的版本,MQTTVERSION_3_1(3),MQTTVERSION_3_1_1 (4)
*/
struct
{
const char* serverURI;
int MQTTVersion;
int sessionPresent;
} returned;
struct {
int len;
const void* data;
} binarypwd;
} MQTTClient_connectOptions;
含义:⽤来设置MQTTClient的连接选项的结构体。
MQTTClient_message
定义:
typedef struct
{
char struct_id[4];//结构体的识别序列,必须为MQTM
int struct_version;//结构体的版本,必须为0
int payloadlen;//MQTT信息的长度
void* payload;//指向消息负载的指针
int qos;//服务质量
int retained;//保留标志
int dup;dup//标志指⽰这个消息是否是重复的。只有在收到QoS1消息时才有意义。如果为true,则客户端应⽤程序应采取适当的措施来处理重复的消息。
int msgid;//消息标识符通常保留供MQTT客户端和服务器内部使⽤。
} MQTTClient_message;
含义:代表MQTT信息的结构体。
相关函数详解
MQTTClient_create
定义:
DLLExport int MQTTClient_create(
MQTTClient * handle,
const char * serverURI,
const char * clientId,
int persistence_type,
void * persistence_context
)
作⽤:该函数创建了⼀个⽤于连接到特定服务器,使⽤特定持久存储的MQTT客户端。
参数含义
handle 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引⽤所填充
serverURI 以空结尾的字符串,其指定客户端将连接到的服务器。其格式为protocol://host:port。现在的(protocol)协议必须是tcp或ssl,⽽host可以指定为IP地址或域名。例如, 要使⽤默认 MQTT 端⼝连clientId 客户端标识符(clientId)是⼀个以空结尾的 UTF-8 编码字符串,客户端连接到服务器时将它传递过去。
persistence_type 客户端所使⽤的持久类型。MQTTCLIENT_PERSISTENCE_NONE-使⽤内存持久化。如果客户端运⾏的设备或系统出故障或关闭, 则任何正在运⾏的消息的当前状态都将丢失, 甚⾄在 Q persistence_context 如果应⽤程序使⽤的是MQTTCLIENT_PERSISTENCE_NONE持久化,该参数不使⽤,⽽且值应该设置为NULL。对于MQTTCLIENT_PERSISTENCE_DEFAULT持久化,应该设置MQTTClient_setCallbacks
MQTTClient_setCallbacks
定义:
DLLExport int MQTTClient_setCallbacks (
MQTTClient handle,
void * context,
MQTTClient_connectionLost * cl,
MQTTClient_messageArrived * ma,
MQTTClient_deliveryComplete * dc
)
作⽤:该函数为特定的客户端创建回调函数。如果您的客户端应⽤程序不使⽤特定的回调函数,请将相关参数设置为NULL。调⽤MQTTClient_setCallbacks()使客户端进⼊多线程模式。任何必要的消注意:在调⽤该函数时,MQTT客户端必须断开连接。(即先要调⽤该函数在连接客户端)。
参数含义
handle    指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引⽤所填充
context    指向任何应⽤程序特定上下⽂的指针。上下⽂指针被传递给每个回调函数,以提供对回调中的上下⽂信息的访问。
cl    指向MQTTClient_connectionLost()回调函数的指针。如果您的应⽤程序不处理断开连接,您可以将其设置为NULL。
ma    指向MQTTClient_messageArrived()回调函数的指针。当您调⽤MQTTClient_setCallbacks()时,必须指定此回调函数。
dc    指向MQTTClient_deliveryComplete()回调函数的指针。如果您的应⽤程序同步发布,或者您不想检查是否成功发送,则可以将其设置为NULL。
MQTTClient_connect
定义:
DLLExport int MQTTClient_connect (
MQTTClient handle,
MQTTClient_connectOptions * options
)
作⽤:此函数尝试使⽤指定的选项将先前创建的客户端连接到MQTT服务器。
参数含义
handle    指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引⽤所填充
options    指向有效的MQTTClient_connectOptions结构的指针。
返回值含义
0连接成功
1拒绝连接:不可接受的协议版本。
2拒绝连接:标识符被拒绝。
3拒绝连接:服务器不可⽤。
4拒绝连接:⽤户名或密码错误。
5拒绝连接:未经授权。
6保留给未来⽤。
MQTTClient_subscribe
定义:
DLLExport int MQTTClient_subscribe (
MQTTClient handle,
const char * topic,
int qos
)
作⽤:此功能尝试将客户订阅到单个主题,该主题可能包含通配符。此函数还指定服务质量。
参数含义
handle    指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引⽤所填充
topic    订阅的主题,可使⽤通配符。
qos    订阅的请求服务质量
DLLExport int MQTTClient_publishMessage (
MQTTClient handle,
const char * topicName,
MQTTClient_message * msg,
MQTTClient_deliveryToken * dt
)
作⽤:此功能尝试将客户订阅到单个主题,该主题可能包含通配符。此函数还指定服务质量。
参数含义
handle    指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引⽤所填充
topicName    与信息相关的主题。
msg    指向有效的 MQTTClient_message 结构的指针, 其中包含要发布消息的有效负载和属性
dt    指向MQTTClient_deliveryToken的指针。当函数成功返回时,dt会被赋值为代表消息的token。如果程序中没有使⽤传递token,将其设置为NULL。
MQTTClient_waitForCompletion
定义:
DLLExport int MQTTClient_waitForCompletion (
MQTTClient handle,
MQTTClient_deliveryToken dt,
unsigned long timeout
)
作⽤:客户端应⽤程序调⽤此函数来将主线程的执⾏与消息的完成发布同步。被调⽤时,MQTTClient_waitForCompletion()阻塞执⾏,直到消息成功传递或已超过指定的时间。参数含义
handle    指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引⽤所填充
dt    代表消息的MQTTClient_deliveryToken⽤来检测是否成功传递。传递token由发布函数MQTTClient_publish () 和 MQTTClient_publishMessage ()所产⽣。
timeout    等待的最⼤毫秒数。
返回值:
消息成功传递则返回MQTTCLIENT_SUCCESS(0) ,如果时间已过期或检测token时出问题,则返回错误码。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。