Linux⽹络编程的5种IO模型:异步IO模型
Linux ⽹络编程的5种IO模型:异步IO模型
资料已经整理好,但是还有未竟之业:复习多路复⽤epoll 阅读例程, 异步IO 函数实现
背景
上⼀讲《》我们已经介绍了信号驱动模型,以及带有BUG的例程。
前⾯四种IO模型实际上都属于同步IO,只有最后⼀种是真正的异步IO,因为⽆论是多路复⽤IO还是信号驱动模型,IO操作的第2个阶段都会引起⽤户线程阻塞,也就是内核进⾏数据拷贝的过程都会让⽤户线程阻塞。
这⼀讲我们来介绍最后⼀种IO模型。
导⾔
两种⾼性能IO设计模式
在传统的⽹络服务设计模式中,有两种⽐较经典的模式:多线程,与线程池。
多线程
对于多线程模式,也就说来了client,服务器就会新建⼀个线程来处理该client的读写事件,如下图所⽰:
这种模式虽然处理起来简单⽅便,但是由于服务器为每个client的连接都采⽤⼀个线程去处理,使得资源占⽤⾮常⼤。因此,当连接数量达到上限时,再有⽤户请求连接,直接会导致资源瓶颈,严重的可能会直接导致服务器崩溃。
线程池
因此,为了解决这种⼀个线程对应⼀个客户端模式带来的问题,提出了采⽤线程池的⽅式,也就说创建⼀个固定⼤⼩的线程池,来⼀个客户端,就从线程池取⼀个空闲线程来处理,当客户端处理完读写操作之后,就交出对线程的占⽤。因此这样就避免为每⼀个客户端都要创建线程带来的资源浪费,使得线程可以重⽤。
但是线程池也有它的弊端,如果连接⼤多是长连接,因此可能会导致在⼀段时间内,线程池中的线程都被占⽤,那么当再有⽤户请求连接时,由于没有可⽤的空闲线程来处理,就会导致客户端连接失败,从⽽影响⽤户体验。因此,线程池⽐较适合⼤量的短连接应⽤。
⾼性能IO模型
因此便出现了下⾯的两种⾼性能IO设计模式:Reactor和Proactor。
Reactor
在Reactor模式中,会先对每个client注册感兴趣的事件,然后有⼀个线程专门去轮询每个client是否有事件发⽣,当有事件发⽣时,便顺序处理每个事件,当所有事件处理完之后,便再转去继续轮询,如下图所⽰:
从这⾥可以看出,多路复⽤IO就是采⽤Reactor模式。
注意,上⾯的图中展⽰的是顺序处理每个事件,当然为了提⾼事件处理速度,可以通过多线程或者线程池的⽅式来处理事件。
Proactor
在Proactor模式中:当检测到有事件发⽣时,会新起⼀个异步操作,然后交由内核线程去处理,当内核线程完成IO操作之后,发送⼀个通知告知操作已完成;可以得知,异步IO 模型采⽤的就是Proactor模式。
Linux异步IO模型与有关函数
异步IO模型是⽐较理想的IO模型,在异步IO模型中,当⽤户线程发起read操作之后,⽴刻就可以开始去做其它的事。⽽另⼀⽅⾯,从内核的⾓度,当它受到⼀个asynchronous read之后,它会⽴刻返回,说明read请求已经成功发起了,因此不会对⽤户线程产⽣任何block。然后,内核会等待数据准备完成,然
后将数据拷贝到⽤户线程,当这⼀切都完成之后,内核会给⽤户线程发送⼀个信号,告诉它read操作完成了。也就说⽤户线程完全不需要关⼼实际的整个IO操作是如何进⾏的,只需要先发起⼀个请求,当接收内核返回的成功信号时表⽰IO操作已经完成,可以直接去使⽤数据了。
也就说在异步IO模型中,IO操作的两个阶段都不会阻塞⽤户线程,这两个阶段都是由内核⾃动完成,然后发送⼀个信号告知⽤户线程操作已完成。⽤户线程中不需要再次调⽤IO 函数进⾏具体的读写。
这点是和信号驱动模型有所不同的,在信号驱动模型中,当⽤户线程接收到信号表⽰数据已经就绪,然后需要⽤户线程调⽤IO函数进⾏实际的读写操作;⽽在异步IO 模型中,收到信号表⽰IO操作已经完成,不需要再在⽤户线程中调⽤iO函数进⾏实际的读写操作。
%% 时序图 sequenceDiagram title : 异步IO模型 participant application participant kernel Note right of application: 应⽤程序调⽤系统调⽤ application ->> kernel: aio_read kernel ->> application: 返回 kernel ->> kernel: 准备好数据,拷贝到⽤户空间 kernel ->> application: 递交到aio_read指定的信号中 application ->> application : 信号处理
#include <aio.h>
int aio_read(struct aiocb *__aiocbp);
int aio_write(struct aiocb *__aiocbp);
Link with -lrt.
/* 有关结构体,能够使⽤的成员已经标出 */
struct aiocb
{
▲ int aio_fildes; /* 对哪个⽂件进⾏读写. */
▲ int aio_lio_opcode; /* 要执⾏的操作 */
int aio_reqprio; /* Request priority offset. */
▲ volatile void *aio_buf; /* 读写⽤的buffer */
▲ size_t aio_nbytes; /* Length of transfer. */
▲ struct sigevent aio_sigevent; /* 告诉 AIO 在 I/O 操作完成时应该执⾏什么操作。 */
/
* Internal members. */
struct aiocb *__next_prio;
int __abs_prio;
int __policy;
int __error_code;
__ssize_t __return_value;
#ifndef __USE_FILE_OFFSET64 // 针对⼤⽂件的⽀持
▲ __off_t aio_offset; /* 在传统的 read 调⽤中,偏移量是在⽂件描述符上下⽂中进⾏维护的, */
char __pad[sizeof (__off64_t) - sizeof (__off_t)];
#else
▲ __off64_t aio_offset; /* 对于异步 I/O 操作来说这是不可能的,因为我们可以同时执⾏很多读请求,因此必须为每个特定的读请求都指定偏移量。 */
#endif
char __glibc_reserved[32];
};
struct sigevent {
int sigev_notify; /* 通知⽅式:为SIGEV_NONE、SIGEV_SIGNAL、SIGEV_THREAD、SIGEV_THREAD_ID(只针对linux)当中的⼀个; */
int sigev_signo; /* 为signal的值,当sigev_notify为SIGEV_SIGNAL时,会将这个signal发送给进程; */
union sigval sigev_value; /* 信号传递的数据 */
void (*sigev_notify_function) (union sigval);/* 当sigev_notify为SIGEV_THREAD时,处理线程将调⽤这个处理函数 (SIGEV_THREAD) */
void *sigev_notify_attributes;/* sigev_notify_function的参数 (SIGEV_THREAD) */
pid_t sigev_notify_thread_id; /* 当sigev_notify为SIGEV_THREAD_ID时的处理线程ID (SIGEV_THREA
D_ID) */
};
union sigval { /*传递的参数*/
int sival_int; /* 信号机制传递的参数 */
void *sival_ptr; /* 若是线程机制传递的参数 */
};
// 什么时候使⽤ AIO ?了解 AIO 机制之后,不难发现, AIO 其实是⽤于解决⼤量 IO 并发操作⽽出现的,牺牲⼀些信号处理耗时,⽤多线程的⽅式加速 IO ,提⾼并⾏效率。
函数作⽤
请求异步读操作
检查异步请求的状态
获得完成的异步请求的返回状态
请求异步写操作
挂起调⽤进程,直到⼀个或多个异步请求已经完成(或失败)
取消异步 I/O 请求
强制同步
发起⼀系列 I/O 操作
aio_read
#include <aio.h>
int aio_read( struct aiocb *aiocbp );
描述:请求⼀个异步写操作。
返回值:成功返回值 0;出错返回值 -1,并设置 errno的值。
aio_read 例⼦
#include <unistd.h>
#include <stdio.h>
#include <aio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/timeb.h>
#define BUFFER_SIZE 1024*1024
void ptime(const char* tip){
struct timeb tb;
ftime(&tb);
fprintf(stdout, "%s %u : %u\n", tip, tb.time, tb.millitm);
}
int main(){
/* 句柄,返回码 */
int fd = -1, ret = -1;
fd = open("./", O_RDONLY);
if(fd <= 0){
fprintf(stderr, "open file errro: %s\n", strerror(errno));
return -1;
}
/* aio控制结构 */
aiocb my_aiocb;
memset(&my_aiocb, 0, sizeof(my_aiocb));
/* 初始化 */
my_aiocb.aio_fildes = fd;
my_aiocb.aio_reqprio = 0;
my_aiocb.aio_nbytes = BUFFER_SIZE;
char buf[BUFFER_SIZE + 1] = {0};
my_aiocb.aio_buf = buf;
ptime("start read");
/
* aio 读 */
ret = aio_read(&my_aiocb);
if(ret < 0){
fprintf(stderr, "aio read error: %s\n", strerror(errno));
return -2;
}
ptime("reading");
/* 检查状态 */
while(aio_error(&my_aiocb) == EINPROGRESS);
/* (这种做法不是最有效的,只是为了演⽰aio_error如何使⽤)可以调⽤ aio_error 来确定 aio_read 的状态。只要状态是 EINPROGRESS,就⼀直忙碌等待,直到状态发⽣变化为⽌。请求可能成功,也可能失败。*/ ptime("after read");
if ((ret = aio_return( &my_iocb )) > 0) {
/* got ret bytes on the read */
fprintf(stdout, "read: %10.10s\n", my_aiocb.aio_buf);
} else {
/* read failed, consult errno */
fprintf(stderr, "return: %d\n", ret);
}
close(fd);
return 0;
}
aio_error
int aio_error( struct aiocb *aiocbp );
描述:⽤来确定请求的状态。
返回值:
EINPROGRESS,说明请求尚未完成
ECANCELLED,说明请求被应⽤程序取消了
-1,说明发⽣了错误,具体错误原因可以查阅 errno
aio_return
ssize_t aio_return( struct aiocb *aiocbp );
描述:获得完成的异步请求的返回状态。
异步 I/O 和标准 I/O 之间的另外⼀个区别是我们不能⽴即访问这个函数的返回状态,因为我们并没有阻塞在read调⽤上。在标准的read调⽤中,返回状态是在该函数返回时提供的。但是在异步 I/O 中,我们要使⽤aio_return函数。
只有在aio_error调⽤确定请求已经完成(可能成功,也可能发⽣了错误)之后,才会调⽤这个函数。
返回值:所传输的字节数,如果出错,返回-1(等价于read或write系统调⽤的返回值)。
aio_write
int aio_write( struct aiocb *aiocbp );
描述:请求⼀个异步写操作。
aio_write函数会⽴即返回,说明请求已经进⾏排队(成功时返回值为0,失败时返回值为-1,并相应地设置errno)。
这与aio_read类似,但是在偏移量上有⼀点不⼀样:对于write来说,这个偏移量只有在没有设置O_APPEND选项的⽂件上下⽂中才会⾮常重要。
如果设置了O_APPEND,那么这个偏移量就会被忽略,数据都会被附加到⽂件的末尾。否则,aio_offset域就确定了数据在要写⼊的⽂件中的偏移量。
aio_suspend
int aio_suspend( const struct aiocb *const aiocb_list[],
int nitems, const struct timespec *timeout );
描述:挂起(或阻塞)调⽤进程,直到以下情况发⽣:
⼀个或多个处于 aiocb_list中的异步请求完成
有信号递达
调⽤时指定的时间已到,发⽣超时
调⽤者提供了⼀个aiocb引⽤列表,其中任何⼀个完成都会导致aio_suspend返回。
参数解析:
cblist:⼀组异步IO请求(aiocb_list中任何NULL元素都会被忽略)
nitems:该组的成员数量
timeout:超时时间,NULL代表永远阻塞
返回值:成功返回0;失败返回-1,设置errno:
EAGAIN :超时,希望程序重试。
EINTR :被信号中断(也有可能是等待的某个操作的完成信号)
ENOSYS :这个功能未被当前系统⽀持(未实现)
aio_suspend 例程
使⽤⾮常简单。我们要提供⼀个aiocb引⽤列表。
...
struct aioct *cblist[MAX_LIST];
/* Clear the list. */
bzero( (char *)cblist, sizeof(cblist) );
/* Load one or more references into the list */
cblist[0] = &my_aiocb;
...
for(i = 0; i < ..; i++)
{
}
ret = aio_read( &my_aiocb );
...
ret = aio_suspend(cblist, MAX_LIST, NULL );
...
aio_cancel
int aio_cancel( int fd, struct aiocb *aiocbp);
描述:允许我们取消对某个⽂件描述符执⾏的⼀个或所有 I/O 请求。
参数解析:
fd :与读写请求有关的⽂件描述符
aiocbp:读写请求(为NULL时,取消所有请求)
返回值:成功取消返回AIO_CANCELED,请求被完成时返回AIO_NOTCANCELED。
要取消对某个给定⽂件描述符的所有请求,我们需要提供这个⽂件的描述符,以及⼀个对aiocbp的NULL引⽤。
如果所有的请求都取消了,这个函数就会返回AIO_CANCELED;
如果⾄少有⼀个请求没有被取消,那么这个函数就会返回AIO_NOT_CANCELED;
如果没有⼀个请求可以被取消,那么这个函数就会返回AIO_ALLDONE。
可以使⽤aio_error来验证每个 AIO 请求。如果这个请求已经被取消了,那么aio_error就会返回-1,并且errno会被设置为ECANCELED。
aio_fsync
int aio_fsync(int op, struct aiocb *aiocbp);
描述:在AIO是交给其他线程来完成的,如果需要⼿动执⾏同步,则需要调⽤这个函数。函数执⾏时,将强制完成该AIO上的所有操作。与⼀般⽂件IO的fsync⽤法基本⼀致。
如果想要所有等待的异步操作不等待⽽写⼊持久化的存储中,可以设⽴⼀个AIO控制板并调⽤该函数。
aio_read、aio_write函数会进⾏数据的缓冲。使⽤了aio_fsync就不必再去使⽤aio_read和aio_write了
参数解析:
op: operation为操作码
O_SYNC :同步异步IO数据,当前所有IO操作均将完成
O_DSYNC:同步⼀个IO请求,并不等待所有的IO完成(相当于调⽤fdatasync函数)
aiocbp:异步请求
lio_listio
int lio_listio( int mode, struct aiocb *aiocb_list[], int nitems,
struct sigevent *sevp);
#include <signal.h>
union sigval { /*传递的参数*/
int sival_int; /* 信号机制传递的参数 */
void *sival_ptr; /* 若是线程机制传递的参数 */
};
struct sigevent {
int sigev_notify; /* 设置通知机制⽅法,线程为SIGEV_THREAD,信号为SIGEV_SIGNAL*/
int sigev_signo; /* 若是信号机制,该参数设置为触发的信号 */
union sigval sigev_value;/* 传递的参数*/
void (*sigev_notify_function)(union sigval);
/* 若是线程机制,该参数为线程函数*/
void *sigev_notify_attributes;
/* 线程函数的属性 */
};
描述:同时发起多个传输。
意味着我们可以在⼀个系统调⽤(⼀次内核上下⽂切换)中启动⼤量的 I/O 操作。从性能的⾓度来看,⼤⼤提⾼了效率。
参数解析:
mode:
LIO_WAIT:阻塞这个调⽤,直到所有的 I/O 都完成为⽌。
LIO_NOWAIT:操作进⾏排队之后,⽴即返回。
list:⼀组异步IO请求(aiocb_list中任何NULL元素都会被忽略)
nitems:请求的个数
sigevent:在所有 I/O 操作都完成时产⽣信号的⽅法。
注意:
对于lio_listio的请求与传统的read或write请求在必须指定的操作⽅⾯稍有不同。
对于读操作来说,aio_lio_opcode域的值为LIO_READ
对于写操作来说,我们要使⽤LIO_WRITE;
允许LIO_NOP(不执⾏)
lio_listio 例程
struct aiocb aiocb1, aiocb2;
struct aiocb *list[MAX_LIST];
.
..
/* Prepare the first aiocb */
aiocb1.aio_fildes = fd;
reactor线程模型 javaaiocb1.aio_buf = malloc( BUFSIZE+1 );
aiocb1.aio_nbytes = BUFSIZE;
aiocb1.aio_offset = next_offset;
aiocb1.aio_lio_opcode = LIO_READ;
...
bzero( (char *)list, sizeof(list) );
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论