// io_uring_engine.cpp (from plocate): asynchronous disk I/O engine built on
// io_uring, with a synchronous fallback when liburing is unavailable or the
// kernel does not support the required opcodes.
1 #include <assert.h>
2 #include <errno.h>
3 #include <fcntl.h>
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #ifndef WITHOUT_URING
8 #include <liburing.h>
9 #endif
10 #include "dprintf.h"
11 #include "io_uring_engine.h"
12
13 #include <functional>
14 #include <iosfwd>
15 #include <string>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #include <utility>
19
20 using namespace std;
21
// Sets up the io_uring submission/completion ring (when compiled in and the
// kernel accepts it) and probes for IORING_OP_STATX support. On any failure,
// falls back to fully synchronous operation (using_uring = false).
//
// <slop_bytes> is extra space allocated past the end of every read buffer;
// presumably so consumers can safely touch a few bytes past the requested
// length — confirm against callers.
IOUringEngine::IOUringEngine(size_t slop_bytes)
	: slop_bytes(slop_bytes)
{
#ifdef WITHOUT_URING
	int ret = -1;
	dprintf("Compiled without liburing support; not using io_uring.\n");
#else
	int ret = io_uring_queue_init(queue_depth, &ring, 0);
	if (ret < 0) {
		dprintf("io_uring_queue_init() failed; not using io_uring.\n");
	}
#endif
	using_uring = (ret >= 0);

#ifndef WITHOUT_URING
	if (using_uring) {
		// Ask the kernel which opcodes this ring supports; statx arrived
		// later than plain reads, so it has to be probed separately.
		io_uring_probe *probe = io_uring_get_probe_ring(&ring);
		supports_stat = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX));
		if (!supports_stat) {
			dprintf("io_uring on this kernel does not support statx(); will do synchronous access checking.\n");
		}
		free(probe);  // free(nullptr) is a no-op, so this is safe even if the probe failed.
	}
#endif
}
47
48 void IOUringEngine::submit_stat(const char *path, std::function<void()> cb)
49 {
50         assert(supports_stat);
51
52 #ifndef WITHOUT_URING
53         if (pending_reads < queue_depth) {
54                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
55                 if (sqe == nullptr) {
56                         fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
57                         exit(1);
58                 }
59                 submit_stat_internal(sqe, strdup(path), move(cb));
60         } else {
61                 QueuedStat qs;
62                 qs.cb = move(cb);
63                 qs.pathname = strdup(path);
64                 queued_stats.push(move(qs));
65         }
66 #endif
67 }
68
69 void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
70 {
71         if (!using_uring) {
72                 // Synchronous read.
73                 string s;
74                 s.resize(len + slop_bytes);
75                 complete_pread(fd, &s[0], len, offset);
76                 cb(string_view(s.data(), len));
77                 return;
78         }
79
80 #ifndef WITHOUT_URING
81         if (pending_reads < queue_depth) {
82                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
83                 if (sqe == nullptr) {
84                         fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
85                         exit(1);
86                 }
87                 submit_read_internal(sqe, fd, len, offset, move(cb));
88         } else {
89                 queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
90         }
91 #endif
92 }
93
94 #ifndef WITHOUT_URING
95 void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
96 {
97         void *buf;
98         if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
99                 fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
100                 exit(1);
101         }
102
103         PendingRead *pending = new PendingRead;
104         pending->op = OP_READ;
105         pending->read_cb = move(cb);
106         pending->read.buf = buf;
107         pending->read.len = len;
108         pending->read.fd = fd;
109         pending->read.offset = offset;
110         pending->read.iov = iovec{ buf, len };
111
112         io_uring_prep_readv(sqe, fd, &pending->read.iov, 1, offset);
113         io_uring_sqe_set_data(sqe, pending);
114         ++pending_reads;
115 }
116
117 void IOUringEngine::submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void()> cb)
118 {
119         PendingRead *pending = new PendingRead;
120         pending->op = OP_STAT;
121         pending->stat_cb = move(cb);
122         pending->stat.pathname = path;
123         pending->stat.buf = new struct statx;
124
125         io_uring_prep_statx(sqe, /*fd=*/-1, pending->stat.pathname, AT_STATX_SYNC_AS_STAT, STATX_MODE, pending->stat.buf);
126         io_uring_sqe_set_data(sqe, pending);
127         ++pending_reads;
128 }
129 #endif
130
// Submits all pending SQEs and drains the ring until every in-flight
// operation has completed and its callback has run. Short reads are
// transparently resubmitted. Callbacks may themselves submit new reads/stats;
// those are picked up and submitted on the next loop iteration.
// No-op when io_uring is not in use (synchronous paths complete immediately).
void IOUringEngine::finish()
{
	if (!using_uring) {
		return;
	}

#ifndef WITHOUT_URING
	bool anything_to_submit = true;
	while (pending_reads > 0) {
		io_uring_cqe *cqe;
		if (io_uring_peek_cqe(&ring, &cqe) != 0) {
			if (anything_to_submit) {
				// Nothing ready, so submit whatever is pending and then do a blocking wait.
				int ret = io_uring_submit_and_wait(&ring, 1);
				if (ret < 0) {
					fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
					exit(1);
				}
				anything_to_submit = false;
			} else {
				// Nothing new to submit; just block until a completion arrives.
				int ret = io_uring_wait_cqe(&ring, &cqe);
				if (ret < 0) {
					fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
					exit(1);
				}
			}
		}

		// Process every completion currently visible in the CQ ring.
		unsigned head;
		io_uring_for_each_cqe(&ring, head, cqe)
		{
			PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
			if (pending->op == OP_STAT) {
				// Mark the CQE consumed *before* the callback, since the
				// callback may submit new work that touches the ring.
				io_uring_cqe_seen(&ring, cqe);
				--pending_reads;

				size_t old_pending_reads = pending_reads;
				pending->stat_cb();
				free(pending->stat.pathname);  // strdup()ed in submit_stat().
				delete pending->stat.buf;
				delete pending;

				if (pending_reads != old_pending_reads) {
					// A new read was made in the callback (and not queued),
					// so we need to re-submit.
					anything_to_submit = true;
				}
			} else {
				if (cqe->res <= 0) {
					// res == 0 is EOF, which callers cannot handle either;
					// negative res is a negated errno.
					fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
					exit(1);
				}

				if (size_t(cqe->res) < pending->read.iov.iov_len) {
					// Incomplete read, so resubmit it.
					pending->read.iov.iov_base = (char *)pending->read.iov.iov_base + cqe->res;
					pending->read.iov.iov_len -= cqe->res;
					pending->read.offset += cqe->res;
					io_uring_cqe_seen(&ring, cqe);

					io_uring_sqe *sqe = io_uring_get_sqe(&ring);
					if (sqe == nullptr) {
						// We just consumed a CQE for this request, so a free
						// SQE slot should always exist.
						fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
						exit(1);
					}
					io_uring_prep_readv(sqe, pending->read.fd, &pending->read.iov, 1, pending->read.offset);
					io_uring_sqe_set_data(sqe, pending);
					anything_to_submit = true;
				} else {
					io_uring_cqe_seen(&ring, cqe);
					--pending_reads;

					size_t old_pending_reads = pending_reads;
					// The view covers the original request length, even though
					// the buffer has slop_bytes of extra allocation behind it.
					pending->read_cb(string_view(reinterpret_cast<char *>(pending->read.buf), pending->read.len));
					free(pending->read.buf);
					delete pending;

					if (pending_reads != old_pending_reads) {
						// A new read was made in the callback (and not queued),
						// so we need to re-submit.
						anything_to_submit = true;
					}
				}
			}
		}

		// See if there are any queued stats we can submit now.
		// Running a stat means we're very close to printing out a match,
		// which is more important than reading more blocks from disk.
		// (Even if those blocks returned early, they would only generate
		// more matches that would be blocked by this one in Serializer.)
		// Thus, prioritize stats.
		while (!queued_stats.empty() && pending_reads < queue_depth) {
			io_uring_sqe *sqe = io_uring_get_sqe(&ring);
			if (sqe == nullptr) {
				fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
				exit(1);
			}
			QueuedStat &qs = queued_stats.front();
			submit_stat_internal(sqe, qs.pathname, move(qs.cb));
			queued_stats.pop();
			anything_to_submit = true;
		}

		// See if there are any queued reads we can submit now.
		while (!queued_reads.empty() && pending_reads < queue_depth) {
			io_uring_sqe *sqe = io_uring_get_sqe(&ring);
			if (sqe == nullptr) {
				fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
				exit(1);
			}
			QueuedRead &qr = queued_reads.front();
			submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
			queued_reads.pop();
			anything_to_submit = true;
		}
	}
#endif
}
250
// Reads exactly <len> bytes from <fd> at <offset> into <ptr>, retrying on
// EINTR and on short reads. Exits the process on any other error, or on EOF
// before <len> bytes have arrived (callers cannot handle partial data).
//
// Bug fix: the offset was previously *decremented* by the number of bytes
// read (offset -= ret), so after a short read the retry would re-read
// earlier file contents instead of continuing where it left off.
void complete_pread(int fd, void *ptr, size_t len, off_t offset)
{
	while (len > 0) {
		ssize_t ret = pread(fd, ptr, len, offset);
		if (ret == -1 && errno == EINTR) {
			continue;  // Interrupted by a signal; just retry.
		}
		if (ret <= 0) {
			perror("pread");
			exit(1);
		}
		ptr = reinterpret_cast<char *>(ptr) + ret;
		len -= ret;
		offset += ret;  // Advance past the bytes we just read.
	}
}