文章目录
总的来讲,异步分为
POSIX 异步与内核层异步
.
- POSIX 异步是一种穷人版的异步实现,主要方法就是
库函数aio_read/aio_write 克隆当前进程,让子进程调用同步的read/write,然后父进程结束aio_read/aio_write函数并继续执行,因此,就不用等待子进程的同步操作完成了
.而内核实现了真正的异步.POSIX要比内核异步慢的多. - 只支持直接IO-即
O_DIRECT
,会不使用系统缓存,如果要使用只能程序员自己实现! 真正利用了CPU和IO设备可以异步工作的特性(IO请求提交的过程主要还是在调用者线程上同步完成的,请求提交后由于CPU与IO设备可以并行工作,所以调用流程可以返回,调用者可以继续做其他事情)
API 函数
struct aiocb
/* Asynchronous I/O control block. */
struct aiocb
{
int aio_fildes; /* File desriptor. */
int aio_lio_opcode; /* 要执行的操作。 */
int aio_reqprio; /* Request priority offset. */
volatile void *aio_buf; /* Location of buffer. */
size_t aio_nbytes; /* Length of transfer. */
struct sigevent aio_sigevent; /* Signal number and value. */
// sigevent 结构告诉 AIO 在 I/O 操作完成时应该执行什么操作
/* Internal members. */
struct aiocb *__next_prio;
int __abs_prio;
int __policy;
int __error_code;
__ssize_t __return_value;
#ifndef __USE_FILE_OFFSET64
__off_t aio_offset; /* File offset. */
char __pad[sizeof (__off64_t) - sizeof (__off_t)];
#else
__off64_t aio_offset; /* File offset. */
#endif
char __glibc_reserved[32];
};
aio_read的使用实例(aio_write的使用与之类似)
#include <iostream>
#include <aio.h>
#include <bits/stdc++.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
const int BUFSIZE = 1024;
using namespace std;
int main(void)
{
struct aiocb my_aiocb;
int fd = open("file.txt", O_RDONLY);
if (fd < 0)
perror("open");
bzero((char *)&my_aiocb, sizeof(my_aiocb));
my_aiocb.aio_buf = malloc(BUFSIZE);
if (!my_aiocb.aio_buf)
perror("my_aiocb.aio_buf");
my_aiocb.aio_fildes = fd;
my_aiocb.aio_nbytes = BUFSIZE;
my_aiocb.aio_offset = 0;
int ret = aio_read(&my_aiocb);
if (ret < 0)
perror("aio_read");
//看一下有没有正确的开始传输,如果失败可能需要再次调用等处理哦(和欢神讲的连接建立之后,getsockerror原理一样)
//EINPROGRESS 表示正在处理
//如果去掉你就会发现没有读取到数据,说明读取是异步的!!!
for (int i = 1; aio_error(&my_aiocb) == EINPROGRESS; i++)
{
cout << i << endl;
}
/*更加一般的写法应该是
if(判断是否正常进行) 如果是,进入下面的环节
*/
ret = aio_return(&my_aiocb);
if (ret > 0)
{
cout << ret << endl;
cout << (char *)my_aiocb.aio_buf << endl; //注意这里必须要转换一下
}
else
perror("aio_return");
close(fd);
free((void *)my_aiocb.aio_buf);
return 0;
}
aio_read 的本质实现:
创建一个线程,然后线程其实还是调用了同步的read/write函数,处理完成之后线程退出
aio_suspend 异步阻塞IO
aio_suspend 函数来挂起(或阻塞)调用进程,直到异步请求完成为止,此时会产生一个信号,或者发生其他超时操作。调用者提供了一个 aiocb 引用列表,其中任何一个完成都会导致 aio_suspend 返回
自我觉得这个真的没什么用,因为使用异步IO就是为了不阻塞,但是你这个会导致进程阻塞,就很奇怪,以后用到再说.
lio_listio 同时发起多个异步IO请求
这意味着我们可以在一个系统调用(一次内核上下文切换)中启动大量的 I/O 操作。从性能的角度来看,这非常重要
int lio_listio(int mode, struct aiocb *const aiocb_list[],
int nitems, struct sigevent *sevp);
- 第一个参数 mode :LIO_WAIT(阻塞直到所有的IO都完成) 或 LIO_NOWAIT(不阻塞,在进入排队 队列后就返回)
- 第二个参数 list : 异步IO请求队列.
- 第三个参数 nitems :异步IO请求队列长度
- 第四个参数 sevp :定义了在所有 I/O 操作都完成时产生信号的方法。
#include <iostream>
#include <aio.h>
#include <bits/stdc++.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
using namespace std;
const int BUFSIZE = 1024;
//异步读
void aio_read_file(struct aiocb *cbp, int fd, int size)
{
bzero(cbp, sizeof(cbp));
cbp->aio_buf = malloc(size + 1);
cbp->aio_nbytes = size;
cbp->aio_offset = 0;
cbp->aio_fildes = fd;
cbp->aio_lio_opcode = LIO_READ;
}
int main(void)
{
//异步请求列表
struct aiocb *io_list[2];
struct aiocb cbp1, cbp2;
int fd1 = open("test.txt", O_RDONLY);
if (fd1 < 0)
{
perror("open error\n");
}
aio_read_file(&cbp1, fd1, BUFSIZE);
int fd2 = open("test.txt", O_RDONLY);
if (fd2 < 0)
{
perror("open error\n");
}
aio_read_file(&cbp2, fd2, BUFSIZE * 4);
io_list[0] = &cbp1;
io_list[1] = &cbp2;
lio_listio(LIO_NOWAIT, io_list, 2, NULL);
//sleep(1); 不加就可能会异步IO没有完成,导致没有输出
cout << (char *)cbp1.aio_buf << endl;
cout << "1111111111111111111111111111" << endl
<< endl
<< endl;
cout << (char *)cbp1.aio_buf << endl;
}
对于读操作来说,aio_lio_opcode 域的值为 LIO_READ。
对于写操作来说,我们要使用 LIO_WRITE,
不过 LIO_NOP 对于不执行操作来说也是有效的。
异步IO通知机制
异步与同步的区别就是我们不需要等待异步操作返回就可以继续干其他的事情,当异步操作完成时可以通知我们去处理它。
主要有两种通知方式:
- 信号
- 函数回调
(1)信号通知
在发起异步请求时,可以指定当异步操作完成时给调用进程发送什么信号,这样调用收到此信号就会执行相应的信号处理函数.很明显,这是很不好的,因为在系统中可能会有很多很多信号,而且必须要注意的是在信号处理的时候一定不要有任何阻塞操作,不然就会导致整个进程阻塞(中断)
#include <iostream>
#include <aio.h>
#include <bits/stdc++.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
const int BUFSIZE = 1024;
using namespace std;
void aio_completion_handler(int signo, siginfo_t *info, void *text)
{
struct aiocb *req;
if (info->si_signo == SIGIO)
{
req = (struct aiocb *)info->si_value.sival_ptr; //注意这里
if (aio_error(req) == 0)
{
int ret = aio_return(req);
cout << "ret == " << ret << endl;
cout << (char *)req->aio_buf << endl;
}
}
close(req->aio_fildes);
free((void *)req->aio_buf);
while (1)
{
printf("正在执行回调函数。。。\n");
sleep(1);
}
}
int main(void)
{
struct aiocb my_aiocb;
struct sigaction sig_act;
/*set signal handler */
sigemptyset(&sig_act.sa_mask);
/*sa_flags 加上选项 SA_SIGINFO 的含义仅仅是表明:在处理信号的时候,会附带一个 siginfo_t* 类型的参数*/
sig_act.sa_flags = SA_SIGINFO;
sig_act.sa_sigaction = aio_completion_handler;
int fd = open("file.txt", O_RDONLY);
if (fd < 0)
perror("open");
bzero((char *)&my_aiocb, sizeof(my_aiocb));
my_aiocb.aio_buf = malloc(BUFSIZE);
if (!my_aiocb.aio_buf)
perror("my_aiocb.aio_buf");
my_aiocb.aio_fildes = fd;
my_aiocb.aio_nbytes = BUFSIZE;
my_aiocb.aio_offset = 0;
//装填信号信息
my_aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
my_aiocb.aio_sigevent.sigev_signo = SIGIO;
my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;
/*Map the Signal to the Signal Handler*/
int ret = sigaction(SIGIO, &sig_act, NULL);
ret = aio_read(&my_aiocb);
if (ret < 0)
perror("aio_read");
//调用进程继续执行
while (1)
{
printf("主线程继续执行。。。\n");
sleep(1);
}
return 0;
}
sigaction
可以传递参数,以后也将使用它,而不是signal
函数
(2)线程函数回调
顾名思义...就是我们经常写的回调函数(会另外开一个线程去处理,所以不会阻塞当前进程)
#include <iostream>
#include <aio.h>
#include <bits/stdc++.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
const int BUFSIZE = 1024;
using namespace std;
void aio_completion_handler(sigval_t sigval)
{
struct aiocb *req;
req = (struct aiocb *)sigval.sival_ptr; //注意这里
/*再次检查异步是否完成??*/
if (aio_error(req) == 0)
{
int ret = aio_return(req);
cout << "ret == " << ret << endl;
cout << (char *)req->aio_buf << endl;
}
close(req->aio_fildes);
free((void *)req->aio_buf);
while (1)
{
printf("正在执行回调函数。。。\n");
sleep(1);
}
}
int main(void)
{
struct aiocb my_aiocb;
int fd = open("file.txt", O_RDONLY);
if (fd < 0)
perror("open");
bzero((char *)&my_aiocb, sizeof(my_aiocb));
my_aiocb.aio_buf = malloc(BUFSIZE);
if (!my_aiocb.aio_buf)
perror("my_aiocb.aio_buf");
my_aiocb.aio_fildes = fd;
my_aiocb.aio_nbytes = BUFSIZE;
my_aiocb.aio_offset = 0;
//装填回调信息
/*
使用 SIGEV_THREAD 请求了一个线程回调函数来作为通知方法
*/
my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD;
my_aiocb.aio_sigevent.sigev_notify_function = aio_completion_handler;
my_aiocb.aio_sigevent.sigev_notify_attributes = NULL;
/*
将要传输的上下文加载到处理程序中(在这种情况中,是个对 aiocb 请求自己的引用)。
在这个处理程序中,我们简单地引用到达的 sigval 指针并使用 AIO 函数来验证请求已经完成。
*/
my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;
int ret = aio_read(&my_aiocb);
if (ret < 0)
perror("aio_read");
//调用进程继续执行
while (1)
{
printf("主线程继续执行。。。\n");
sleep(1);
}
return 0;
}
Linux 2.6中的异步IO-libio
API函数
异步IO环境与实现原理
异步IO环境
就是一组数据结构,用于跟踪进程请求的异步IO操作的运行情况.
kioctx
对象
每个AIO
环境与一个kioctx
对象关联,该对象存放了与环境有关的所有信息
一个进程可以创建多个AIO环境,一个进程对应的所有的kioctx
对象存放在一条单链表中,该链表会存放在mm_struct
中.
#endif
#ifdef CONFIG_AIO
spinlock_t ioctx_lock;
struct kioctx_table __rcu *ioctx_table;
#endif
AIO 环
一种用户态进程虚拟地址空间的内存缓冲区(其实就是在虚拟地址空间中-磁盘上),也被所有内核态的进程访问.
kioctx
对象中的ring_info.mmap_base
和ring_info.mmap_size
指向的自然是AIO环
起始地址和长度ring_info.ring_pages
字段就是存放一个数组指针,指针指向所有含AIO环的页框(物理内存)
AIO环实际上是一个环形缓冲区,内核用他来写正在运行的异步IO的完成报告.
AIO环的第一个字节有一个首部(struct rio_ring结构),其余的都是io_event
数据结构,每个都表示一个已经完成的异步IO操作.
AIO环中的页会被映射到用户态地址空间,应用就可以直接检查异步IO操作的,避免系统调用(这句话,我当时看的时候有点蒙蔽,没事,往下去看io_setup
就懂了)
io_setup
为调用进程创建一个的AIO环境.内核会通过mmap
在对应的用户地址空间分配一段内存,然后创建并初始化该AIO环境的kioctx
对象.由 aio_ring_info
结构中的 mmap_base
、mmap_size
描述这个映射对应的位置和大小,由ring_pages
描述实际分配的物理内存页面信息,异步 IO 完成后,内核会将异步 IO 的结果写入其中
int io_setup(unsigned nr_events, aio_context_t *ctx_idp);
// 1. nr_events: 正在运行的异步IO操作的最大数目,会确定AIO环的大小
// 2. 存放环境句柄的指针,AIO环的基地址
使用实例:
确保安装了libaio
和libaio-devel/libaio-dev
#include <iostream>
#include <aio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <libaio.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
const int BUFSIZE = 1024 * 1024;
const int MAX_EVENTS = 32;
using namespace std;
/*
- 或者这样使用
inline static int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
{
return syscall(__NR_io_submit, ctx, nr, iocbpp);
}
*/
int main(void)
{
io_context_t ctx = {0};
//1.初始化 io_context_t
int ret = io_setup(MAX_EVENTS, &ctx);
if (ret != 0)
perror("io_setup");
//2.open 文件取得fd
int fd = open("file.txt", O_RDWR | O_DIRECT);
if (fd < 0)
{
io_destroy(ctx);
perror("open");
}
struct iocb my_iocb;
struct iocb *my_iocb_ptr = &my_iocb;
void *buf = NULL;
/*对于更大的边界,例如页面,程序员需要动态的对齐。虽然动机是多种多样的,
但最常见的是直接块I/O的缓存的对齐或者其它的软件对硬件的交互*/
// 注意:使用 O_DIRECT, 必须要对齐
//获取页面大小
int pagesize = sysconf(_SC_PAGESIZE);
//处理对齐
posix_memalign(&buf, pagesize, BUFSIZE);
memset(buf, 'L', BUFSIZE);
struct io_event e[10];
//3.根据fd,buffer offset等息建立iocb
int n = MAX_EVENTS;
while (n--) //循环 32 次
{
io_prep_pwrite(&my_iocb, fd, buf, BUFSIZE, 0);
//通过io_prep_pwrite和io_prep_pread生成iocb的结构体,做为io_submit的参数。
// 这个结构体中指定了读写类型、起始扇区、长度和设备标志符。
if (io_submit(ctx, 1, &my_iocb_ptr) != 1) //也可以发起多个异步IO
{
io_destroy(ctx);
perror("io_submit");
}
int ret = io_getevents(ctx, 1, 10, e, NULL);
if (ret < 1)
{
perror("io_getevents");
break;
}
cout << "io_getevents :ret " << ret << endl;
}
close(fd);
io_destroy(ctx);
return 0;
}
参考资料:(一定要读!!!因为我写的很水)
Linux异步IO+实例(POSIX IO与 libaio)
使用异步 I/O 大大提高应用程序的性能
linux AIO – libaio 实现的异步 IO 简介及实现原理
异步同步是在通知机制来讲的
阻塞非阻塞是程序执行上来讲的