// io_uring_engine.cpp, from plocate release 1.1.22.
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef WITHOUT_URING
#include <liburing.h>
#endif
#include "complete_pread.h"
#include "dprintf.h"
#include "io_uring_engine.h"

#include <functional>
#include <iosfwd>
#include <string>
#include <sys/stat.h>
#include <unistd.h>
#include <utility>

using namespace std;

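// Try to set up an io_uring; if that fails (or if we were built without
// liburing), fall back to fully synchronous I/O throughout. Also probe
// whether the running kernel supports IORING_OP_STATX, so that callers
// know whether submit_stat() can be used.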
IOUringEngine::IOUringEngine(size_t slop_bytes)
        : slop_bytes(slop_bytes)
{
#ifdef WITHOUT_URING
        int ret = -1;
        dprintf("Compiled without liburing support; not using io_uring.\n");
#else
        int ret = io_uring_queue_init(queue_depth, &ring, 0);
        if (ret < 0) {
                dprintf("io_uring_queue_init() failed; not using io_uring.\n");
        }
#endif
        using_uring = (ret >= 0);

#ifndef WITHOUT_URING
        if (using_uring) {
                io_uring_probe *probe = io_uring_get_probe_ring(&ring);
                supports_stat = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX));
                if (!supports_stat) {
                        dprintf("io_uring on this kernel does not support statx(); will do synchronous access checking.\n");
                }
                free(probe);
        }
#endif
}

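// Submit an asynchronous statx() for the given path; cb will eventually be
// called with true if the stat succeeded, false if not. If the ring is full,
// the request is queued and submitted later, from finish(). Only valid to
// call if the constructor found statx() support (see the assert below).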
void IOUringEngine::submit_stat(const char *path [[maybe_unused]], std::function<void(bool)> cb [[maybe_unused]])
{
        assert(supports_stat);

#ifndef WITHOUT_URING
        if (pending_reads < queue_depth) {
                io_uring_sqe *sqe = io_uring_get_sqe(&ring);
                if (sqe == nullptr) {
                        fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
                        exit(1);
                }
                submit_stat_internal(sqe, strdup(path), move(cb));
        } else {
                QueuedStat qs;
                qs.cb = move(cb);
                qs.pathname = strdup(path);
                queued_stats.push(move(qs));
        }
#endif
}

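// Submit an asynchronous read of len bytes at the given offset; cb will
// eventually be called with the data read. Without io_uring, the read is
// simply done synchronously before returning. As with stats, requests that
// do not fit in the ring are queued and submitted later, from finish().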
void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
        if (!using_uring) {
                // Synchronous read.
                string s;
                s.resize(len + slop_bytes);
                complete_pread(fd, &s[0], len, offset);
                cb(string_view(s.data(), len));
                return;
        }

#ifndef WITHOUT_URING
        if (pending_reads < queue_depth) {
                io_uring_sqe *sqe = io_uring_get_sqe(&ring);
                if (sqe == nullptr) {
                        fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
                        exit(1);
                }
                submit_read_internal(sqe, fd, len, offset, move(cb));
        } else {
                queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
        }
#endif
}

#ifndef WITHOUT_URING
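// Fill the given SQE with a readv request. The destination buffer is
// allocated 4 kB-aligned, with slop_bytes of extra room at the end (the
// synchronous path in submit_read() reserves the same); it is freed when
// the completion is reaped in finish().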
void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
        void *buf;
        if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
                fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
                exit(1);
        }

        PendingRead *pending = new PendingRead;
        pending->op = OP_READ;
        pending->read_cb = move(cb);
        pending->read.buf = buf;
        pending->read.len = len;
        pending->read.fd = fd;
        pending->read.offset = offset;
        pending->read.iov = iovec{ buf, len };

        io_uring_prep_readv(sqe, fd, &pending->read.iov, 1, offset);
        io_uring_sqe_set_data(sqe, pending);
        ++pending_reads;
}

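// Fill the given SQE with a statx request, asking only for STATX_MODE and
// not following symlinks. The path copy and the statx buffer are freed when
// the completion is reaped in finish().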
void IOUringEngine::submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void(bool)> cb)
{
        PendingRead *pending = new PendingRead;
        pending->op = OP_STAT;
        pending->stat_cb = move(cb);
        pending->stat.pathname = path;
        pending->stat.buf = new struct statx;

        io_uring_prep_statx(sqe, /*fd=*/-1, pending->stat.pathname, AT_STATX_SYNC_AS_STAT | AT_SYMLINK_NOFOLLOW, STATX_MODE, pending->stat.buf);
        io_uring_sqe_set_data(sqe, pending);
        ++pending_reads;
}
#endif

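// The main loop: Submit everything that is pending, wait for completions
// and run their callbacks (which may themselves submit or queue new
// requests), and repeat until nothing is in flight anymore.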
void IOUringEngine::finish()
{
        if (!using_uring) {
                return;
        }

#ifndef WITHOUT_URING
        bool anything_to_submit = true;
        while (pending_reads > 0) {
                io_uring_cqe *cqe;
                if (io_uring_peek_cqe(&ring, &cqe) != 0) {
                        if (anything_to_submit) {
                                // Nothing ready, so submit whatever is pending and then do a blocking wait.
                                int ret = io_uring_submit_and_wait(&ring, 1);
                                if (ret < 0) {
                                        fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
                                        exit(1);
                                }
                                anything_to_submit = false;
                        } else {
                                int ret = io_uring_wait_cqe(&ring, &cqe);
                                if (ret < 0) {
                                        fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
                                        exit(1);
                                }
                        }
                }

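                // Reap and process every completion that is ready by now.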
                unsigned head;
                io_uring_for_each_cqe(&ring, head, cqe)
                {
                        PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
                        if (pending->op == OP_STAT) {
                                io_uring_cqe_seen(&ring, cqe);
                                --pending_reads;

                                size_t old_pending_reads = pending_reads;
                                pending->stat_cb(cqe->res == 0);
                                free(pending->stat.pathname);
                                delete pending->stat.buf;
                                delete pending;

                                if (pending_reads != old_pending_reads) {
                                        // A new read was made in the callback (and not queued),
                                        // so we need to re-submit.
                                        anything_to_submit = true;
                                }
                        } else {
                                if (cqe->res <= 0) {
                                        fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
                                        exit(1);
                                }

                                if (size_t(cqe->res) < pending->read.iov.iov_len) {
                                        // Incomplete read, so resubmit it.
                                        pending->read.iov.iov_base = (char *)pending->read.iov.iov_base + cqe->res;
                                        pending->read.iov.iov_len -= cqe->res;
                                        pending->read.offset += cqe->res;
                                        io_uring_cqe_seen(&ring, cqe);

                                        io_uring_sqe *sqe = io_uring_get_sqe(&ring);
                                        if (sqe == nullptr) {
                                                fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
                                                exit(1);
                                        }
                                        io_uring_prep_readv(sqe, pending->read.fd, &pending->read.iov, 1, pending->read.offset);
                                        io_uring_sqe_set_data(sqe, pending);
                                        anything_to_submit = true;
                                } else {
                                        io_uring_cqe_seen(&ring, cqe);
                                        --pending_reads;

                                        size_t old_pending_reads = pending_reads;
                                        pending->read_cb(string_view(reinterpret_cast<char *>(pending->read.buf), pending->read.len));
                                        free(pending->read.buf);
                                        delete pending;

                                        if (pending_reads != old_pending_reads) {
                                                // A new read was made in the callback (and not queued),
                                                // so we need to re-submit.
                                                anything_to_submit = true;
                                        }
                                }
                        }
                }

                // See if there are any queued stats we can submit now.
                // Running a stat means we're very close to printing out a match,
                // which is more important than reading more blocks from disk.
                // (Even if those blocks returned early, they would only generate
                // more matches that would be blocked by this one in Serializer.)
                // Thus, prioritize stats.
                while (!queued_stats.empty() && pending_reads < queue_depth) {
                        io_uring_sqe *sqe = io_uring_get_sqe(&ring);
                        if (sqe == nullptr) {
                                fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
                                exit(1);
                        }
                        QueuedStat &qs = queued_stats.front();
                        submit_stat_internal(sqe, qs.pathname, move(qs.cb));
                        queued_stats.pop();
                        anything_to_submit = true;
                }

                // See if there are any queued reads we can submit now.
                while (!queued_reads.empty() && pending_reads < queue_depth) {
                        io_uring_sqe *sqe = io_uring_get_sqe(&ring);
                        if (sqe == nullptr) {
                                fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
                                exit(1);
                        }
                        QueuedRead &qr = queued_reads.front();
                        submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
                        queued_reads.pop();
                        anything_to_submit = true;
                }
        }
#endif
}
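
// A minimal usage sketch (hypothetical caller, not part of plocate itself;
// the fd, lengths, and offsets are made up for illustration):
//
//      IOUringEngine engine(/*slop_bytes=*/0);
//      engine.submit_read(fd, /*len=*/4096, /*offset=*/0, [](string_view s) {
//              fwrite(s.data(), s.size(), 1, stdout);
//      });
//      engine.finish();  // Blocks until the callback above has run.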