libevent与多线程通信
多线程之间的通信常见的⽅式包括共享内存,消息队列,管道等。基于libevent开发的程序,bufferevent_pair也可作为多线程通信的⼀种⽅式。这⾥简单讲解这⼏种⽅式在libevent中的使⽤。
共享内存
共享内存是多线程通信中最常⽤的⼀种⽅式,⽐如共享⼀个结构体,⼀个数组,⼀个链表等等。使⽤这种⽅式唯⼀要注意的便是对共享内存操作时需要有锁的保护。另外在libevent中,通常会增加⼀个定时器事件,定期从共享内存中取出数据进⾏相应处理。
简单⽰例代码:
#include "stdlib.h"
#include "stdio.h"
#include "string.h"
#include "unistd.h"
#include "event.h"
#include "sys/queue.h"
#define BUF_MAX_LEN 1024
#define BUF_BODY "hello world\n"
struct msg_item
{
TAILQ_ENTRY(msg_item) next;
char buf[BUF_MAX_LEN];
int len;
};
struct msg_queue
{
TAILQ_HEAD(msg_q, msg_item) entries
pthread_mutex_t lock;
};
void do_write( void * pArg )
{
struct msg_queue * pMsgQueue = (struct msg_queue *)pArg;
while( 1 )
{
struct msg_item *pMsg = (struct msg_item*)malloc(sizeof(struct msg_item));
memset(pMsg, 0, sizeof(struct msg_item));
int nLength = strlen(BUF_BODY);
memcpy(pMsg->buf, BUF_BODY, nLength);
pMsg->len = nLength;
pthread_mutex_lock(&(pMsgQueue-lock));
TAILQ_INSERT_TAIL(&(pMsgQueue->entries),pMsg,next);
pthread_mutex_unlock(&(pMsgQueue->lock));
sleep(5);
}
}
void do_read(int fd, short sWhat, void * pArg)
{
struct msg_queue *pMsgQueue = (struct msg_queue*)pArg;
struct msg_item *pMsg;
pthread_mutex_lock(&(pMsgQueue->lock));
while(NULL != (pMsg=TAILQ_FIRST(&(pMsgQueue->entries))) )
{
printf("Length:%d\tContent: %s\n", pMsg->len, pMsg->buf);
TAILQ_REMOVE(&(pMsgQueue->entries), pMsg, next);
free(pMsg);
}
pthread_mutex_unlock(&(pMsgQueue->lock));
return ;
}
int main(void)
{
struct event_base * pEventBase = NULL;
pEventBase = event_base_new();
struct msg_queue *pMsgQueue = (struct msg_queue*)malloc(sizeof(struct msg_queue));
TAILQ_INIT(&(pMsgQueue->entries));
pthread_mutex_init(&(pMsgQueue->lock));
struct event eTimeout;
struct timeval tTimeout = {1, 0};
event_assing(&eTimeout, pEventBase, -1, EV_PERSIST, do_read, pMsgQueue);
evtimer_add(&eTimeout, &tTimeout);
pthread_t tid;
pthread_create(&tid, NULL, do_write, pMsgQueue);
event_base_dispatch(pEventBase);
event_base_free(pEventBase);
return 0;
}
系统的消息队列
前⾯提到共享内存的⽅式需要有锁的保护,那么使⽤系统的消息队列虽然本质上仍旧是共享内存,但是系统(内核)帮我们完成了锁的操作,因此我们只需要向队列发送数据,或者取出数据进⾏相应处理。当然,定时器事件仍旧是不可缺少的。
代码⽚段:
void do_write(void *pArg)
{
mqd_t nMQHandle = *(mqd_t *)pArg;
int nLength = strlen(BUF_BODY);
while(1)
{
mq_send(nMQHandle, BUF_BODY, nLength, 0);
}
return ;
}
void do_read(int fd, short sWhat, void * pArg)
{
mqd_t nMQHandle = *(mqd_t *)pArg;
char buf[50] = {0};
int nLen = mq_receive(nMQHandle, buf, sizeof(buf), NULL);
if( -1 == nLen )
{
printf("mq_receive failed\n");
return ;
}
printf("receive %d byte: %s\n", nLen, buf);
return ;
}
pipe管道
管道通常⽤于⽗⼦进程间的通信,但是它也是⼀种多线程之间通信的⽅式。在libevent中使⽤这种⽅式,只要给相应的fd设置相应的可读可写事件及其回调,就可以通过libevent进⾏事件驱动。
简单⽰例代码:
void do_write(void * pArg)
{
int fd = *(int *)pArg;
while( 1 )
{
write(fd, BUF_BODY, strlen(BUF_BODY));
sleep(5);
}
return ;
}
void do_read(int fd, short sWhat, void * pArg)
{
char buf[20] = {0};
int nLen = read(fd, buf, 20);
printf("read %d bytes: %s\n", nLen, buf);
}
int main( void )
{
int fds[2];
if(0 > pipe(fds))
{
return ;
}
struct event_base * pEventBase = NULL;
pEventBase = event_base_new();
if( NULL == pEventBase )
{
printf("event_base_new failed\n");
return 0;
}
struct event eRead;
event_assign(&eRead, pEventBase, fds[0], EV_PERSIST|EV_READ, do_read, NULL);
event_add(&eRead, NULL);
pthread_t tid;
pthread_create(&tid, NULL, do_write, &fds[1]);
event_base_dispatch(pEventBase);
event_base_free(pEventBase);
return 0;
}
bufferevent_pair
bufferevent_pair是⼀个成对的bufferevent,在任意⼀个bufferevent中写⼊数据,会⾃动拷贝到成对的另⼀个bufferevent中。它实现了⼀个类似socketpair的功能,我们先看看bufferevent_pair在多线程中的使⽤。
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "unistd.h"
#include "event.h"
#define BUF_BODY "hello world\n"
void do_write( void * pArg )
{
struct bufferevent *pBufEv = (struct bufferevent *)pArg;
while(1)
{
bufferevent_write(pBufEv,BUF_BODY,strlen(BUF_BODY));
sleep(5);
}
}
void do_read( struct bufferevent * pBEv, void * pArg )
{
struct evbuffer * pInput = bufferevent_get_input(pBEv);
size_t nLen = evbuffer_get_length(pInput);
if( nLen > 0 )
{
char * pData = (char *)malloc(nLen+1);
evbuffer_remove(pInput, pData, nLen);
pData[nLen] ='\0';
printf("%s\n", pData);
free(pData);
}
return ;
}
int main( void )
{
struct event_base * pEventBase = NULL;
struct bufferevent * aPair[2] = {NULL};
pEventBase = event_base_new();
进程间通信 共享内存if( NULL == pEventBase )
{
printf("event_base_new() 返回NULL值\n");
return 0;
}
int nRet = bufferevent_pair_new(pEventBase, 0, aPair);
if( 0 != nRet )
{
printf("bufferevent_pair_new()返回值为%d\n", nRet);
event_base_free(pEventBase);
return 0;
}
bufferevent_setcb(aPair[1], do_read, NULL, NULL, NULL);
bufferevent_enable(aPair[1], EV_READ);
bufferevent_enable(aPair[0], EV_WRITE);
pthread_t tid;
pthread_create(&tid, NULL, do_write, aPair[0]);
event_base_dispatch(pEventBase);
return 0;
}
程序中,创建⼀个线程往bufferevent[0]中写⼊数据,对bufferevent[1]注册可读,并在回调函数中取出数据打印。
从官⽅的⽂档中可以知道,通过bufferevent_pair_new创建的两个bufferevent都具有默认的属性BEV_OPT_DEFER_CALLBACKS。为什么需要这个属性?官⽅给出的解释是为了防⽌回调的层次太多,从⽽导致栈溢出,并且所有的回调都要求是可重⼊的,因此需要延迟回调。分析完相关代码后,可以认为延迟回调也恰好是可以⽤于多线程通信的关键。
在⽂章中简单介绍了延迟回调,让我们再回顾下延迟回调的过程。
这幅图应该⽐较形象的描述了延迟回调的过程,那我们再来看看bufferevent_pair和延迟回调怎么完成多线程通信的。当我们在bufferevent[0]中写⼊数据后,会⽴即将数据拷贝到bufferevent[1]的输⼊缓存区中,然后分别触发bufferevent[0]和bufferevent[1]的可写,可读回调。但由于两个bufferevent都属于延迟回调,因此并不⽴即回调,⽽是添加到延迟队列中,注意这些动作都是在写bufferevent[0]操作的线程中完成的。在事件驱动的线程中,对延迟队列中的事件进⾏回调处理,这样就完成了多线程之间的通信。
虽然bufferevent_pair看似是实现了socketpair的功能,但本质上应该还是属于内存共享的⽅式。这样,锁是不可或缺的。实际上,在整个过程中,对输出缓存,输⼊缓存,延迟队列的操作都会先进⾏上锁,另外对bufferevent本⾝也使⽤了锁和引⽤计数,保证原⼦操作。
总结
libevent中的多线程通信⽅式各有优缺点,⽐如pipe有⼤⼩限制,共享内存有锁的开销,系统消息队列每次receive都只取⼀条消息,bufferevent_pair有⼀定的延迟等等,我们需要根据实际的业务合理采⽤不同的⽅式。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论