zmq消息传输基本功能的实现、传输模式
zmq的基本功能:
(1)将消息快速⾼效地发送给其他节点,这⾥的节点可以是线程、进程、或是其他计算机;
(2)zmq为应⽤程序提供了⼀套简单的套结字API,不⽤考虑实际使⽤的协议类型(进程内、进程间、TPC、或⼴播);
(3)当节点调动时,zmq会⾃动进⾏链接或者重连;
(4)⽆论是发送消息还是接收消息,zmq都会先将消息放⼊队列中,并保证进程不会因为内存溢出⽽崩溃,适时地将消息写⼊磁盘;
(5)zmq会处理套接字异常;
(6)所有的I/O操作都在后台进⾏;
(7)zmq不会产⽣死锁。
zmq的使⽤:
在链接两个节点时,其中⼀个需要使⽤zmq_bind(),另⼀个则使⽤zmq_connect()。通常来讲,使⽤zmq_bind()链接的节点称之为服务端,它有着⼀个较为固定的⽹络地址;使⽤zmq_connect()链接的节点称为客户端,其地址不固定。每当客户端使⽤zmq_connect()链接上述某个端点时,服务端就会⾃动创建链接。zmq没有对链接数量进⾏限制。此外,客户端节点也可以使⽤⼀个套接字同时建⽴多个链接。
发送和接收消息使⽤的是zmq_send()和zmq_recv()这两个函数。zmq套接字可以发送消息给多个端点(扇出模型),或从多个端点中接收消息(扇⼊模型)。所以,向套接字写⼊⼀个消息时可能会将消息发送给很多节点,相应的,套接字⼜会从所有已建⽴的链接中接收消息。
zmq_recv()⽅法使⽤了公平队列的算法来决定接收哪个链接的消息。
调⽤zmq_send()⽅法时其实并没有真正将消息发送给套接字链接。消息会在⼀个内存队列中保存下来,并由后台的I/O线程异步地进⾏发送。如果不出意外情况,这⼀⾏为是⾮阻塞的。所以说,即便zmq_send()有返回值,并不能代表消息已经发送。已发送消息不能重复使⽤。
使⽤举例:
//
/
/ 管道模式 - 结构收集器设计2
// 添加发布-订阅消息流,⽤以向worker发送⾃杀信号
//
#include "zhelpers.h"
int main (void)
{
void *context = zmq_init (1);
// ⽤于接收消息的套接字
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");
// ⽤以发送控制信息的套接字
void *controller = zmq_socket (context, ZMQ_PUB);
zmq_bind (controller, "tcp://*:5559");
// 等待任务开始
char *string = s_recv (receiver);
free (string);
// 开始计时
int64_t start_time = s_clock ();
// 确认100个任务处理完毕
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if ((task_nbr / 10) * 10 == task_nbr)
printf (":");
else
printf (".");
fflush (stdout);
}
printf ("总执⾏时间: %d msec\n",
(int) (s_clock () - start_time));
// 发送⾃杀消息给worker
s_send (controller, "KILL");
/
/ 结束
sleep (1); // 等待发送完毕
zmq_close (receiver);
zmq_close (controller);
zmq_term (context);
return0;
}
zmq发送和接收消息C++代码
///接收消息:
zmq_msg_t message; // 创建消息结构
zmq_msg_init (&message); // 初始化空消息
zmq_recv (socket, &message, 0); // 接收消息
int size = zmq_msg_size (&message); // 计算消息的⼤⼩
char *string = malloc (size + 1); // 分配string为指向size + 1⼤⼩的heap空间,那个多出来的1字节是'\0'的空间
memcpy (string, zmq_msg_data (&message), size); // 通过zmq_msg_data(1)获得消息的data地址,拷贝到字符串中
zmq_msg_close (&message); // 释放或销毁消息
string [size] = 0; // 设置'\0'
return (string);
///发送消息:
int rc;
zmq_msg_t message; // 创建消息结构
zmq_msg_init_size (&message, strlen (string)); // 以字符串长度(不包括'\0')初始化成消息
memcpy (zmq_msg_data (&message), string, strlen (string)); // 将字符串的内容(不包括'\0')拷贝给消息
rc = zmq_send (socket, &message, 0); // 发送消息
assert (!rc);
zmq_msg_close (&message); // 释放和销毁消息
return (rc);
socket types: zmq⼀共具有12种类型的socket,5种消息模式。
(1)请求/应答模式:ZMQ_REQ、ZMQ_REP、ZMQ_DEALER、ZMQ_ROUTER
(2)发布/订阅模式:ZMQ_PUB、ZMQ_SUB、ZMQ_XPUB、ZMQ_XSUB
(3)管道模式:ZMQ_PUSH、ZMQ_PULL
recv函数(4)本地模式:ZMQ_STREAM
请求/应答模式
ZMQ_REQ
⼀般⽤于客户端发送请求消息,此类型的socket必须严格遵循先发送後接收的顺序,如果发⽣异常或者当前没有可⽤的服务(连
接),socket会阻塞,直到有可⽤的服务(新连接的到来),再把消息发送出去。REQ类型的socket不会丢弃信息。
ZMQ_REP发送消息时会⾃动在消息顶部插⼊⼀个空帧。
特点总结:
(1)可兼容的socket types: ZMQ_REP, ZMQ_ROUTER
(2)数据传输:双向
(3)发送/接收模式:发送a接收a
(4)发送路由策略:Round-robin(循环队列)
(5)接收路由策略:Last peer
(6)进⼊mute状态后:阻塞。
ZMQ_REP
⼀般⽤于服务端接收消息,此类型的socket必须严格遵循先接收後发送的顺序,从客户端接收请求消息使⽤了公平队列,回应客户端时,所有的reply都会被路由到最后下达请求的客户端。
如果发⽣异常或者当前没有可⽤的客户端链接,所有消息都会毫⽆提⽰的被丢弃,不会发⽣阻塞。
特点总结:
(1)可兼容的socket types:ZMQ_REQ,ZMQ_DEALER
(2)数据传输:双向
(3)发送/接收模式:接收a发送a
(4)发送路由策略:Last peer
(5)接收路由策略:Fair-queued(公平队列)
ZMQ_DEALER
DEALER是⼀种⽤于请求/答应模式的更⾼级的扩展Socket,它可以⾃由的收发消息,没有ZMQ_REP/ZMQ_REQ那样的限制。
对于每⼀个连接,接收消息也是使⽤了公平队列,发送使⽤了循环队列(RR)。
ZMQ_DEALERE受ZMQ_RCVHW和ZMQ_SHDHW两个阈值影响(可通过zmq_setsockopt函数设置),⼀旦发送或接收消息队列达到阈值,socket就会进⼊mute状态,此时对DEALER的任何xmq_send操作都会阻塞,直到mute状态结束。
如果当前没有有效的链接,zmq_send操作也会阻塞,直到有新的链接到来为⽌。
DEALER发⽣阻塞并不会丢弃消息
注意:如果zmq_DEALER连接到ZMQ_REP,每⼀个消息包必须包含⼀个空帧,然后再紧跟着数据包体。
特点总结:
(1)可兼容的Socket types:ZMQ_ROUTER, ZMQ_REP, ZMQ_DEALER
(2)数据传输:双向
(3)发送/接收模式:⽆限制
(4)发送路由策略:Round-robin(循环队列)
(5)接收路由策略:Faie-queued(公平队列)
(6)进⼊mute状态後:阻塞
ZMQ_ROUTER
ZMQ_ROUTER是⼀种⽤于请求/答应模式的更⾼级的扩展Socket,它可以⾃由的收发消息。
当ZMQ_ROUTER接收到消息时,会⾃动在消息顶部加⼊来源地址标识符,接收消息使⽤了公平队列。当发送消息时,ZMQ_ROUTER⼜会⾃动去掉这个标识符,并且根据这个标识符路由到相应的端点。
如果此地址标识的端点不存在,默认会毫⽆征兆的丢弃消息,除⾮将ZMQ_ROUTE_MANDATORY选项设置为1。
当队列达到阈值时,socket就会进⼊mute状态,此时所有后续发送到此Socket的消息都会被丢弃,直到
mute状态结束。同样的,如果对端的接收队列达到了阈值,消息也会被丢弃。
如果ZMQ_REQ连接到ZMQ_ROUTER,从ZMQ_ROUTER接收到ZMQ_REQ消息时,除了会在消息前加上来源地址标识符之外,还会加上⼀个空帧与原消息分隔。因此消息可以包含多个地址标识符和多个数据包体。地址和数据体之间必须⽤空帧分割;发送回应消息给
ZMQ_REQ时,必须⾄少包含⼀个空帧;发送消息时,ZMQ_ROUTER会根据第⼀个地址标识符路由到对应的端点。
特点总结:
(1)可兼容的Sockte types:ZMQ_DEALER, ZMQ_REQ, ZMQ_ROUTER
(2)数据传输:双向
(3)发送/接收模式:⽆限制
(4)接收路由策略:Fair-queued(公平队列)
(5)进⼊mute状态後:丢弃消息
发布/订阅模式
ZMQ_PUB
ZMQ_PUB类型的Socket以发布者的⾝份向订阅者分发消息,消息以扇出的形式发送给所有订阅者链接。
ZMQ_PUB类型的Socket没有实现zmq_recv函数,所以不能对其调⽤zmq_recv函数!
当ZMQ_PUB Socket达到阈值时进⼊mute状态,此时后续发送的消息会被丢弃,直到mute状态结束。
对ZMQ_PUB Socket调⽤zmq_send永远不会发⽣阻塞。
特点总结:
(1)可兼容的Socket types: ZMQ_SUB, ZMQ_XSUB
(2)数据传输:单向
(3)发送/接收模式:只能发送
(4)接收路由策略:Fan out(扇出)
(5)进⼊mute状态後:丢弃消息
ZMQ_SUB
ZMQ_SUB类型的Socket以订阅者的⾝份接收消息。初始的ZMQ_SUB Socket没有订阅任何消息,可以通过设置ZMQ_SUBSRIBE选项来指定需要订阅的消息。
ZMQ_SUB Socket没有实现zmq_send函数,所以不能对其调⽤zmq_send函数!
特点总结:
(1)可兼容的Socket types: ZMQ_PUB, ZMQ_XPUB
(2)数据传输:单向
(3)发送/接收模式:只能接收
(4)接收路由策略:Fair-queued(公平队列)
Socket options(部分)
概要:通过zmq_setsockopt和zmq_getsocketopt函数来设置/读取指定选项
ZMQ_SNDHWM
设置指定Socket的发送消息队列在⾼⽔位标识(阈值),ZMQ会严格限制发送队列的上限数。0表⽰⽆限制。
如果达到了这个限制,socket就会进⼊异常状态,zmq此时会采取适当的措施——阻塞或丢弃消息,这取决于socket的类型。
Note: ZMQ不保证socket⼀定能接收ZMQ_SNDHWM这么多的消息,甚⾄可能会低60%-70%,这取决于socket上的信息流。
ZMQ_RCVHWM
设置指定Socket的接收消息队列的⾼⽔位标识(阈值),ZMQ会严格限制接收队列的上限数。0表⽰⽆限制。
如果达到了这个限制,socket就会进⼊异常状态,ZMQ此时会采取适当的措施——阻塞或丢弃消息,这取决于socket的类型
ZMQ_SUBSCRIBE
ZMQ_SUBSCRIBE选项会在ZMQ_SUB socket上建⽴⼀个消息过滤器。初始的ZMQ_SUB Socket会过滤掉所有的消息,因此必须设置这个选项,否则将收不到任何消息。
如果设置⼀个0长度的空值,ZMQ_SUB Socket会接受所有的消息。设置⼀个⾮空值将接受指定的消息。可以在同⼀个ZMQ_SUB Socket上设置多个过滤器,它将会接受⾄少⼀个匹配的消息。
ZMQ_UNSUBSCRIBE
此选项⽤来删除ZMQ_SUB Socket上通过ZMQ_SUBSCRIBE设置过的消息过滤器。如果Socket有多个实例有相同的过滤器,只删除其中⼀个。
ZMQ_IDENTITY
此选项⽤来设置Socket的⾝份标识,只能⽤于请求/答应模式。ROUTER Socket可以根据这个⾝份来标识路由信息。
⾝份标识的长度规定在1-255bytes, 由⼆进制零开头的标识符为ZMQ保留使⽤。
如果两个⾝份标识相同的Socket链接到同⼀个对端(ROUTER),结果⾏为是未定义的。
ZMQ_RECVTIMEO
设置Socket的receive操作的超时。如果为0,则zmq_recv会⽴即返回,如果没有接收到消息,会返回⼀个EAGAIN错误;如果为-1,Socket 会阻塞到有可⽤消息为⽌;如果为其他值,Socket要么阻塞达到指定的时间还没接收到可⽤的消息,返回⼀个EAGAIN错误,要么在指定时间前接收到可⽤消息。
ZMQ_SENDTIMEO
设置Socket的Send操作的超时。如果为0,则zmq_send会⽴即返回,如果消息没有发送成功,会返回⼀个EAGAIN错误;如果为-1,Socket会抑制阻塞到消息发送完毕;
如果为其他值,Socket要么阻塞达到指定的时间还没发送完成,返回⼀个EAGAIN错误,要么在指定时间前发送完消息。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论