一瞥io_uring
LI Rui

当谈到Linux你会想到什么?想到一些Ubuntu、Manjaro、Arch等知名Flavors,想到巨量的C语言代码,想到某些令人生畏的东西?

谈到Linux我会想到调用内核API以做到我们想做的事情,同时需要面临冗长的代码和文档。但有些东西总要想办法啃下来,就像高等数学、离散数学、概率论一样(至少不挂科)。对于io_uring,我们至少也要搞明白怎么用。

什么是io_uring

2019年1月12日,Jens Axboe介绍了一个名为“io_uring”的东西,原文如下:

io_uring is a submission queue (SQ) and completion queue (CQ) pair that an application can use to communicate with the kernel for doing IO. This isn’t aio/libaio, but it provides a similar set of features, as well as some new ones[omitted]

大致翻译过来就是io_uring由一对队列组成,分为提交队列和完成队列。我们可以往提交队列里面添加我们的任务,然后交给操作系统去完成,当有任务完成后我们就能从完成队列中获取到最新的状态。

Okay,这就是io_uring。

怎么用io_uring

使用linux/io_uring.h

panic!()

使用liburing

liburing是对于linux/io_uring.h的高级封装,这使得我们能够以友好的方式去使用io_uring,下面是我写的一个echo程序,尽量添加了注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/mman.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <liburing.h>

// Compile-time settings for the echo server.
// PORT is converted with htons() and bound to in main().
#define PORT 2255 // Port to listen on
// DEPTH is the entry count handed to io_uring_queue_init() in main().
#define DEPTH 256 // Number of entries in the ring
// BUFSIZE is the byte size of the buffer allocated for every read request.
#define BUFSIZE 4096 // Size of the buffer

// The one io_uring instance of the program. It is initialised in main() and
// used by every submit_* helper, by server() and by the SIGINT handler.
struct io_uring ring;

// Kind of operation a Request was queued for. The submit_* helpers store it
// in the request, and server() switches on it when the completion arrives.
enum RequestStatus {
    Accept = 0, // waiting for a new client connection
    Read = 1,   // waiting for data from a client socket
    Write = 2,  // waiting for the echo to be written back
};

// Per-operation context. A pointer to it is attached to each SQE as user data
// and read back from the matching CQE in server().
struct Request {
// Which operation this request was queued for.
enum RequestStatus status;
// Client socket the operation works on; only meaningful for Read and Write.
int fd;
// NOTE(review): appears unused by the code in this file - confirm before relying on it.
int count;
// Flexible array member. Accept requests are allocated with no elements;
// Read and Write requests are allocated with exactly one, which describes the
// data buffer.
struct iovec iov[];
};

// Queue an accept on server_fd and submit it to the ring.
//
// server_fd: listening socket.
// addr, addrlen: passed through to io_uring_prep_accept(); they must stay
//                valid until the matching completion has been handled.
//
// A heap-allocated Request tagged Accept is attached as user data; whoever
// handles the completion is expected to free it.
//
// Without a pending accept the server cannot make progress, so any failure
// here is reported on stderr and terminates the process.
void submit_accept(int server_fd, struct sockaddr_in *addr, socklen_t *addrlen) {
    // Allocate before taking an SQE: a slot that has been handed out but never
    // filled in would still go to the kernel with the next submit.
    struct Request *req = malloc(sizeof(*req));
    if (req == NULL) {
        perror("Failed to allocate accept request");
        exit(EXIT_FAILURE);
    }
    req->status = Accept;

    // Get a free submission queue entry
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    if (sqe == NULL) {
        fprintf(stderr, "Submission queue is full, cannot queue accept\n");
        free(req);
        exit(EXIT_FAILURE);
    }

    // Set sqe with accept like operation
    io_uring_prep_accept(sqe, server_fd, (struct sockaddr *)addr, addrlen, 0);
    // Set user data
    io_uring_sqe_set_data(sqe, req);

    // Submit sqe to the ring
    int ret = io_uring_submit(&ring);
    if (ret < 0) {
        fprintf(stderr, "io_uring_submit failed: %s\n", strerror(-ret));
        exit(EXIT_FAILURE);
    }
}

// Queue a read of up to BUFSIZE bytes from socket_fd and submit it.
//
// A Request tagged Read, carrying one zero-filled BUFSIZE-byte buffer, is
// attached as user data; whoever handles the completion is expected to free
// both the buffer and the Request.
//
// If the request cannot be queued (out of memory or no free SQE), the failure
// is logged and socket_fd is closed, so the client is dropped rather than
// leaked.
void submit_read(int socket_fd) {
    // Allocate everything before taking an SQE: a slot that has been handed
    // out but never filled in would still go to the kernel with the next
    // submit.
    struct Request *req = malloc(sizeof(*req) + sizeof(struct iovec));
    void *buf = calloc(1, BUFSIZE);
    struct io_uring_sqe *sqe = NULL;
    if (req != NULL && buf != NULL) {
        sqe = io_uring_get_sqe(&ring);
    }
    if (sqe == NULL) {
        fprintf(stderr, "Failed to queue read on fd %d\n", socket_fd);
        free(buf);
        free(req);
        close(socket_fd);
        return;
    }

    req->status = Read;
    req->fd = socket_fd;
    req->iov[0].iov_base = buf;
    req->iov[0].iov_len = BUFSIZE;

    // Set sqe with readv like operation
    io_uring_prep_readv(sqe, socket_fd, req->iov, 1, 0);
    // Set user data
    io_uring_sqe_set_data(sqe, req);

    // Submit sqe to the ring
    int ret = io_uring_submit(&ring);
    if (ret < 0) {
        // Do not free req here: the SQE has been filled in and still points
        // at it.
        fprintf(stderr, "io_uring_submit failed: %s\n", strerror(-ret));
    }
}

// Queue a write that echoes the data held by req back to req->fd.
//
// req: a completed Read request. req->iov[0].iov_len must be the number of
//      bytes to echo and req->iov[0].iov_base the buffer holding them.
//
// The data is copied into a fresh Request tagged Write, so the caller stays
// free to release req right after this call. Whoever handles the write
// completion is expected to free the copy.
//
// If the request cannot be queued (out of memory or no free SQE), the failure
// is logged and req->fd is closed, so the client is dropped rather than
// leaked.
void submit_write(struct Request *req) {
    size_t len = req->iov[0].iov_len;

    // Allocate everything before taking an SQE: a slot that has been handed
    // out but never filled in would still go to the kernel with the next
    // submit.
    struct Request *cloned_req = malloc(sizeof(*cloned_req) + sizeof(struct iovec));
    void *buf = malloc(len > 0 ? len : 1);
    struct io_uring_sqe *sqe = NULL;
    if (cloned_req != NULL && buf != NULL) {
        sqe = io_uring_get_sqe(&ring);
    }
    if (sqe == NULL) {
        fprintf(stderr, "Failed to queue write on fd %d\n", req->fd);
        free(buf);
        free(cloned_req);
        close(req->fd);
        return;
    }

    // Copy only the bytes that are to be echoed, not the whole read buffer.
    memcpy(buf, req->iov[0].iov_base, len);
    cloned_req->status = Write;
    cloned_req->fd = req->fd;
    cloned_req->iov[0].iov_base = buf;
    cloned_req->iov[0].iov_len = len;

    // Set sqe with writev like operation
    io_uring_prep_writev(sqe, cloned_req->fd, cloned_req->iov, 1, 0);
    io_uring_sqe_set_data(sqe, cloned_req);

    int ret = io_uring_submit(&ring);
    if (ret < 0) {
        // Do not free cloned_req here: the SQE has been filled in and still
        // points at it.
        fprintf(stderr, "io_uring_submit failed: %s\n", strerror(-ret));
    }
}

// Event loop of the echo server.
//
// server_fd: listening socket.
//
// Each client walks through Accept -> Read -> Write -> close. The loop waits
// for one completion at a time and queues the next step for that client.
//
// Returns only when waiting for a completion fails or when an accept fails.
// A failed read or write only drops the client it belongs to.
void server(int server_fd) {
    struct io_uring_cqe *cqe = NULL;
    struct sockaddr_in addr;
    socklen_t addr_len = sizeof(addr);

    submit_accept(server_fd, &addr, &addr_len);

    while (1) {
        int ret = io_uring_wait_cqe(&ring, &cqe);
        // Check the return code first: cqe must not be touched when the wait
        // itself failed.
        if (ret) {
            printf("io_uring_wait_cqe failed: %d\n", ret);
            break;
        }

        struct Request *req = io_uring_cqe_get_data(cqe);
        int res = cqe->res;
        // Everything needed has been copied out of the cqe, so mark it as seen
        // now; no branch below can then forget to do it.
        io_uring_cqe_seen(&ring, cqe);

        if (res < 0) {
            printf("%d failed: %d\n", req->status, res);
            if (req->status == Accept) {
                free(req);
                break;
            }
            // A client-side error only ends that one connection.
            close(req->fd);
            free(req->iov[0].iov_base);
            free(req);
            continue;
        }

        switch (req->status) {
        case Accept:
            // addr_len is an in/out argument of accept, so restore it before
            // it is reused for the next accept.
            addr_len = sizeof(addr);
            // Submit accept to get more connections
            submit_accept(server_fd, &addr, &addr_len);
            // cqe res for client socket fd
            submit_read(res);
            free(req);
            break;
        case Read:
            if (res == 0) {
                // End of stream: the peer closed without sending anything.
                close(req->fd);
            } else {
                // Echo exactly the bytes that were received.
                req->iov[0].iov_len = (size_t)res;
                submit_write(req);
            }
            free(req->iov[0].iov_base);
            free(req);
            break;
        case Write:
            close(req->fd);
            free(req->iov[0].iov_base);
            free(req);
            break;
        default:
            break;
        }
    }
}

// SIGINT handler: announce the shutdown, tear down the ring and terminate.
//
// signo: signal number (unused).
//
// printf() and exit() are not async-signal-safe, so the message is emitted
// with write() and the process ends with _exit(). As a consequence, stdio
// buffers are not flushed and atexit handlers do not run.
void sigint_handler(int signo) {
    static const char msg[] = "Shutting down\n";
    (void)signo;

    // Nothing useful can be done if the write fails while shutting down.
    ssize_t written = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
    (void)written;

    // NOTE(review): assumes io_uring_queue_exit() only unmaps the rings and
    // closes the ring fd - confirm against the liburing version in use.
    io_uring_queue_exit(&ring);
    _exit(0);
}

// Entry point: create the listening TCP socket on PORT, initialise the ring,
// install the SIGINT handler and run the event loop.
//
// Exits with EXIT_FAILURE if any setup step fails. server() only returns
// after an error, so reaching the end of main() also yields EXIT_FAILURE.
int main(void) {
    int ret;

    // Create server socket
    int server_fd = socket(AF_INET, SOCK_STREAM, 0); // IP TCP Auto Protocol
    if (server_fd < 0) {
        perror("Failed to create socket");
        exit(EXIT_FAILURE);
    }

    // Zero the whole structure so the padding bytes are not left
    // uninitialised.
    struct sockaddr_in server_addr = {0};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(PORT);

    // Make socket re-usable
    int reuse = 1;
    ret = setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
    if (ret < 0) {
        perror("Failed to set socket options");
        exit(EXIT_FAILURE);
    }

    // Bind socket to local host and port
    ret = bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
    if (ret < 0) {
        perror("Failed to bind");
        exit(EXIT_FAILURE);
    }

    // Listen to incoming connections
    ret = listen(server_fd, 10);
    if (ret < 0) {
        perror("Failed to listen");
        exit(EXIT_FAILURE);
    }
    printf("Listening on port %d\n", PORT);
    // Flush now so the message shows up promptly even when stdout is not a
    // terminal.
    fflush(stdout);

    // Initialize ring; on failure the return value is a negated errno
    ret = io_uring_queue_init(DEPTH, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "Failed to initialize io_uring: %s\n", strerror(-ret));
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    if (signal(SIGINT, sigint_handler) == SIG_ERR) {
        perror("Failed to install SIGINT handler");
        io_uring_queue_exit(&ring);
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    server(server_fd);

    // server() has given up: release the ring and the listening socket.
    io_uring_queue_exit(&ring);
    close(server_fd);
    return EXIT_FAILURE;
}

一共172行。和普通IO一样,我们先设置了监听的socket,然后标记socket可以复用(即端口释放后立即可用,不需要等待)。之后我们通过io_uring_queue_init去初始化io_uring的两个队列,这里队列的长度为常量DEPTH。

进入我们正式的循环,这实际上是一个状态机,我们按Accept->Read->Write->(close)的顺序进行状态的转移。我们先通过submit_accept函数向提交队列中添加一个accept任务(即一个sqe),并附上了我们自定义的结构体Request作为状态的跟踪和数据保存。io_uring_sqe结构体中会保存一个指向用户数据的指针,在任务完成的时候通过对应的cqe返回。

当accept任务完成,我们从cqe完成队列取得返回的Request结构体,判断状态是Accept,下面我们就提交read任务,并新添加一个accept任务来接受新的连接。write也大致类似。

是不是看上去并不复杂呢?我们不断向提交队列中添加我们需要的操作,然后从完成队列中知道哪些任务完成了。需要注意的是,我们一定要在处理完每个cqe之后调用io_uring_cqe_seen,表示我们用户程序已经处理了这个cqe,否则程序将会出现意外情况。

Rust下面怎么用呢

如果是基于io_uring的异步runtime,推荐两个:

如果想直接调用io_uring相关API,有两个思路,这也是上面的Runtime的方法:

  • 使用Crates.io中现成的bindings
  • 将liburing编译成动态链接库,使用libc去调用
  • 本文标题:一瞥io_uring
  • 本文作者:LI Rui
  • 创建时间:2022-01-22 22:28:42
  • 本文链接:https://www.lirui.tech/post/2022/6562d2d06d6d.html
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-SA 许可协议。转载请注明出处!