在Direct IO模式下,異步是非常有必要的(因為繞過了pagecache,直接和磁盤交互)。linux Native AIO正是基于這種場景設計的,具體的介紹見:KernelAsynchronousI/O (AIO)SupportforLinux。下面我們就來分析一下AIO編程的相關知識。
阻塞模式下的IO過程如下:
int fd = open(const char *pathname, int flags, mode_t mode);
ssize_t pread(int fd, void *buf, size_t count, off_t offset);
ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset);
int close(int fd);
因為整個過程會等待read/write的返回,所以不需要任何額外的數據結構。但異步IO的思想是:應用程序不能阻塞在昂貴的系統調用上讓CPU睡大覺,而是將IO操作抽象成一個個的任務單元提交給內核,內核完成IO任務后將結果放在應用程序可以取到的地方。這樣在底層做I/O的這段時間內,CPU可以去干其他的計算任務。但異步的IO任務批量的提交和完成,必須有自身可描述的結構,最重要的兩個就是iocb和io_event。
libaio中的structs
struct iocb {
void *data; /* Return in the io completion event */
unsigned key; /*r use in identifying io requests */
short aio_lio_opcode;
short aio_reqprio;
int aio_fildes;
union {
struct io_iocb_common c;
struct io_iocb_vector v;
struct io_iocb_poll poll;
struct io_iocb_sockaddr saddr;
} u;
};
struct io_iocb_common {
void *buf;
unsigned long nbytes;
long long offset;
unsigned flags;
unsigned resfd;
};
iocb是提交IO任務時用到的,可以完整地描述一個IO請求:
data是留給用來自定義的指針:可以設置為IO完成后的callback函數;
aio_lio_opcode表示操作的類型:IO_CMD_PWRITE | IO_CMD_PREAD;
aio_fildes是要操作的文件:fd;
io_iocb_common中的buf, nbytes, offset分別記錄的IO請求的mem buffer,大小和偏移。
struct io_event {
void *data;
struct iocb *obj;
unsigned long res;
unsigned long res2;
};
io_event是用來描述返回結果的:
obj就是之前提交IO任務時的iocb;
res和res2來表示IO任務完成的狀態。
libaio提供的API和完成IO的過程
libaio提供的API有:io_setup, io_submit, io_getevents, io_destroy。
- 建立IO任務
int io_setup (int maxevents, io_context_t *ctxp);
io_context_t對應內核中一個結構,為異步IO請求提供上下文環境。注意在setup前必須將io_context_t初始化為0。
當然,這里也需要open需要操作的文件,注意設置O_DIRECT標志。
2.提交IO任務
long io_submit (aio_context_t ctx_id, long nr, struct iocb **iocbpp);
提交任務之前必須先填充iocb結構體,libaio提供的包裝函數說明了需要完成的工作:
void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
memset(iocb, 0, sizeof(*iocb));
iocb- >aio_fildes = fd;
iocb- >aio_lio_opcode = IO_CMD_PREAD;
iocb- >aio_reqprio = 0;
iocb- >u.c.buf = buf;
iocb- >u.c.nbytes = count;
iocb- >u.c.offset = offset;
}
void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset)
{
memset(iocb, 0, sizeof(*iocb));
iocb- >aio_fildes = fd;
iocb- >aio_lio_opcode = IO_CMD_PWRITE;
iocb- >aio_reqprio = 0;
iocb- >u.c.buf = buf;
iocb- >u.c.nbytes = count;
iocb- >u.c.offset = offset;
}
這里注意讀寫的buf都必須是按扇區對齊的,可以用posix_memalign來分配。
3.獲取完成的IO
long io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);
這里最重要的就是提供一個io_event數組給內核來copy完成的IO請求到這里,數組的大小是io_setup時指定的maxevents。
timeout是指等待IO完成的超時時間,設置為NULL表示一直等待所有到IO的完成。
4.銷毀IO任務
int io_destroy (io_context_t ctx);
libaio和epoll的結合
在異步編程中,任何一個環節的阻塞都會導致整個程序的阻塞,所以一定要避免在io_getevents調用時阻塞式的等待。還記得io_iocb_common中的flags和resfd嗎?看看libaio是如何提供io_getevents和事件循環的結合:
void io_set_eventfd(struct iocb *iocb, int eventfd)
{
iocb- >u.c.flags |= (1 < < 0) /* IOCB_FLAG_RESFD */;
iocb- >u.c.resfd = eventfd;
}
這里的resfd是通過系統調用eventfd生成的。
int eventfd(unsigned int initval, int flags);
eventfd是linux 2.6.22內核之后加進來的syscall,作用是內核用來通知應用程序發生的事件的數量,從而使應用程序不用頻繁地去輪詢內核是否有時間發生,而是由內核將發生事件的數量寫入到該fd,應用程序發現fd可讀后,從fd讀取該數值,并馬上去內核讀取。
有了eventfd,就可以很好地將libaio和epoll事件循環結合起來:
- 創建一個eventfd
efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
- 將eventfd設置到iocb中
io_set_eventfd(iocb, efd);
- 交接AIO請求
io_submit(ctx, NUM_EVENTS, iocb);
- 創建一個epollfd,并將eventfd加到epoll中
epfd = epoll_create(1);
epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent);
epoll_wait(epfd, &epevent, 1, -1);
- 當eventfd可讀時,從eventfd讀出完成IO請求的數量,并調用io_getevents獲取這些IO
read(efd, &finished_aio, sizeof(finished_aio);
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
異步非阻塞IO模型的流程圖
一個完整的編程實例
#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS
#include < stdio.h >
#include < errno.h >
#include < libaio.h >
#include < sys/eventfd.h >
#include < sys/epoll.h >
#include < stdlib.h >
#include < sys/types.h >
#include < unistd.h >
#include < stdint.h >
#include < sys/stat.h >
#include < fcntl.h >
#include < inttypes.h >
#define TEST_FILE "aio_test_file"
#define TEST_FILE_SIZE (127 * 1024)
#define NUM_EVENTS 128
#define ALIGN_SIZE 512
#define RD_WR_SIZE 1024
struct custom_iocb
{
struct iocb iocb;
int nth_request;
};
void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ldn",
iocbp- >nth_request, (iocb- >aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",
iocb- >u.c.offset, iocb- >u.c.nbytes, res, res2);
}
int main(int argc, char *argv[])
{
int efd, fd, epfd;
io_context_t ctx;
struct timespec tms;
struct io_event events[NUM_EVENTS];
struct custom_iocb iocbs[NUM_EVENTS];
struct iocb *iocbps[NUM_EVENTS];
struct custom_iocb *iocbp;
int i, j, r;
void *buf;
struct epoll_event epevent;
efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (efd == -1) {
perror("eventfd");
return 2;
}
fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
if (fd == -1) {
perror("open");
return 3;
}
ftruncate(fd, TEST_FILE_SIZE);
ctx = 0;
if (io_setup(8192, &ctx)) {
perror("io_setup");
return 4;
}
if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) {
perror("posix_memalign");
return 5;
}
printf("buf: %pn", buf);
for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
iocbps[i] = &iocbp- >iocb;
io_prep_pread(&iocbp- >iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
io_set_eventfd(&iocbp- >iocb, efd);
io_set_callback(&iocbp- >iocb, aio_callback);
iocbp- >nth_request = i + 1;
}
if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
perror("io_submit");
return 6;
}
epfd = epoll_create(1);
if (epfd == -1) {
perror("epoll_create");
return 7;
}
epevent.events = EPOLLIN | EPOLLET;
epevent.data.ptr = NULL;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
perror("epoll_ctl");
return 8;
}
i = 0;
while (i < NUM_EVENTS) {
uint64_t finished_aio;
if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
perror("epoll_wait");
return 9;
}
if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
perror("read");
return 10;
}
printf("finished io number: %"PRIu64"n", finished_aio);
while (finished_aio > 0) {
tms.tv_sec = 0;
tms.tv_nsec = 0;
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
if (r > 0) {
for (j = 0; j < r; ++j) {
((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2);
}
i += r;
finished_aio -= r;
}
}
}
close(epfd);
free(buf);
io_destroy(ctx);
close(fd);
close(efd);
remove(TEST_FILE);
return 0;
}
說明:
- 在centos 6.2 (libaio-devel 0.3.107-10) 上運行通過
- struct io_event中的res字段表示讀到的字節數或者一個負數錯誤碼。在后一種情況下,-res表示對應的
errno。res2字段為0表示成功,否則失敗
- iocb在aio請求執行過程中必須是valid的
- 在上面的程序中,通過擴展iocb結構來保存額外的信息(nth_request),并使用iocb.data
來保存回調函數的地址。如果回調函數是固定的,那么也可以使用iocb.data來保存額外信息。
-
cpu
+關注
關注
68文章
10863瀏覽量
211763 -
編程
+關注
關注
88文章
3616瀏覽量
93735 -
數據結構
+關注
關注
3文章
573瀏覽量
40130 -
AIO
+關注
關注
1文章
61瀏覽量
9985
發布評論請先 登錄
相關推薦
評論