// io_uring_engine.cpp — asynchronous read/stat engine for plocate, using
// io_uring where available and falling back to synchronous I/O otherwise.
1 #include <assert.h>
2 #include <errno.h>
3 #include <fcntl.h>
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <string.h>
7 #ifndef WITHOUT_URING
8 #include <liburing.h>
9 #endif
10 #include "complete_pread.h"
11 #include "dprintf.h"
12 #include "io_uring_engine.h"
13
14 #include <functional>
15 #include <iosfwd>
16 #include <string>
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #include <utility>
20
21 using namespace std;
22
IOUringEngine::IOUringEngine(size_t slop_bytes)
	: slop_bytes(slop_bytes)
{
	// Try to bring up an io_uring instance. If liburing was compiled out,
	// or the kernel rejects the ring, we fall back to synchronous I/O
	// (using_uring = false) and all submit_* calls degrade gracefully.
#ifdef WITHOUT_URING
	int ret = -1;
	dprintf("Compiled without liburing support; not using io_uring.\n");
#else
	int ret = io_uring_queue_init(queue_depth, &ring, 0);
	if (ret < 0) {
		dprintf("io_uring_queue_init() failed; not using io_uring.\n");
	}
#endif
	using_uring = (ret >= 0);

#ifndef WITHOUT_URING
	if (using_uring) {
		// Probe whether this kernel's io_uring supports IORING_OP_STATX;
		// if not, callers must do access checking synchronously instead
		// (see supports_stat accessor / submit_stat's precondition).
		io_uring_probe *probe = io_uring_get_probe_ring(&ring);
		supports_stat = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX));
		if (!supports_stat) {
			dprintf("io_uring on this kernel does not support statx(); will do synchronous access checking.\n");
		}
		free(probe);  // probe may be nullptr; free(nullptr) is a no-op.
	}
#endif
}
48
49 void IOUringEngine::submit_stat(const char *path, std::function<void(bool)> cb)
50 {
51         assert(supports_stat);
52
53 #ifndef WITHOUT_URING
54         if (pending_reads < queue_depth) {
55                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
56                 if (sqe == nullptr) {
57                         fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
58                         exit(1);
59                 }
60                 submit_stat_internal(sqe, strdup(path), move(cb));
61         } else {
62                 QueuedStat qs;
63                 qs.cb = move(cb);
64                 qs.pathname = strdup(path);
65                 queued_stats.push(move(qs));
66         }
67 #else
68         /* unused parameters */
69         (void)path;
70         (void)cb;
71 #endif
72 }
73
74 void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
75 {
76         if (!using_uring) {
77                 // Synchronous read.
78                 string s;
79                 s.resize(len + slop_bytes);
80                 complete_pread(fd, &s[0], len, offset);
81                 cb(string_view(s.data(), len));
82                 return;
83         }
84
85 #ifndef WITHOUT_URING
86         if (pending_reads < queue_depth) {
87                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
88                 if (sqe == nullptr) {
89                         fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
90                         exit(1);
91                 }
92                 submit_read_internal(sqe, fd, len, offset, move(cb));
93         } else {
94                 queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
95         }
96 #endif
97 }
98
99 #ifndef WITHOUT_URING
100 void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
101 {
102         void *buf;
103         if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
104                 fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
105                 exit(1);
106         }
107
108         PendingRead *pending = new PendingRead;
109         pending->op = OP_READ;
110         pending->read_cb = move(cb);
111         pending->read.buf = buf;
112         pending->read.len = len;
113         pending->read.fd = fd;
114         pending->read.offset = offset;
115         pending->read.iov = iovec{ buf, len };
116
117         io_uring_prep_readv(sqe, fd, &pending->read.iov, 1, offset);
118         io_uring_sqe_set_data(sqe, pending);
119         ++pending_reads;
120 }
121
122 void IOUringEngine::submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void(bool)> cb)
123 {
124         PendingRead *pending = new PendingRead;
125         pending->op = OP_STAT;
126         pending->stat_cb = move(cb);
127         pending->stat.pathname = path;
128         pending->stat.buf = new struct statx;
129
130         io_uring_prep_statx(sqe, /*fd=*/-1, pending->stat.pathname, AT_STATX_SYNC_AS_STAT | AT_SYMLINK_NOFOLLOW, STATX_MODE, pending->stat.buf);
131         io_uring_sqe_set_data(sqe, pending);
132         ++pending_reads;
133 }
134 #endif
135
// Submit everything still pending and drain the ring until every
// outstanding operation (read or stat) has completed and its callback has
// run. Callbacks may themselves submit new work; that is detected via the
// pending_reads counter and triggers a re-submit. No-op without io_uring.
void IOUringEngine::finish()
{
	if (!using_uring) {
		return;
	}

#ifndef WITHOUT_URING
	// True whenever there are SQEs prepared but not yet submitted to the
	// kernel; starts true since submit_* only prepares, never submits.
	bool anything_to_submit = true;
	while (pending_reads > 0) {
		io_uring_cqe *cqe;
		if (io_uring_peek_cqe(&ring, &cqe) != 0) {
			if (anything_to_submit) {
				// Nothing ready, so submit whatever is pending and then do a blocking wait.
				int ret = io_uring_submit_and_wait(&ring, 1);
				if (ret < 0) {
					fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
					exit(1);
				}
				anything_to_submit = false;
			} else {
				// Nothing new to submit; just block for a completion.
				int ret = io_uring_wait_cqe(&ring, &cqe);
				if (ret < 0) {
					fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
					exit(1);
				}
			}
		}

		// Process every completion currently available in the ring.
		unsigned head;
		io_uring_for_each_cqe(&ring, head, cqe)
		{
			PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
			if (pending->op == OP_STAT) {
				// Stat completion: res == 0 means success. The
				// pathname was strdup()ed and the statx buffer
				// new-ed at submit time; release both here.
				io_uring_cqe_seen(&ring, cqe);
				--pending_reads;

				size_t old_pending_reads = pending_reads;
				pending->stat_cb(cqe->res == 0);
				free(pending->stat.pathname);
				delete pending->stat.buf;
				delete pending;

				if (pending_reads != old_pending_reads) {
					// A new read was made in the callback (and not queued),
					// so we need to re-submit.
					anything_to_submit = true;
				}
			} else {
				// Read completion. res < 0 is -errno; res == 0
				// (unexpected EOF) is also treated as fatal.
				if (cqe->res <= 0) {
					fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
					exit(1);
				}

				if (size_t(cqe->res) < pending->read.iov.iov_len) {
					// Incomplete read, so resubmit it.
					// Advance the iovec/offset past the bytes
					// already read and reuse the same PendingRead.
					pending->read.iov.iov_base = (char *)pending->read.iov.iov_base + cqe->res;
					pending->read.iov.iov_len -= cqe->res;
					pending->read.offset += cqe->res;
					io_uring_cqe_seen(&ring, cqe);

					io_uring_sqe *sqe = io_uring_get_sqe(&ring);
					if (sqe == nullptr) {
						// Cannot happen: we just freed a CQE slot
						// for this same in-flight operation.
						fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
						exit(1);
					}
					io_uring_prep_readv(sqe, pending->read.fd, &pending->read.iov, 1, pending->read.offset);
					io_uring_sqe_set_data(sqe, pending);
					anything_to_submit = true;
				} else {
					// Fully read; hand the original (unadvanced)
					// buffer and length to the callback, then free.
					io_uring_cqe_seen(&ring, cqe);
					--pending_reads;

					size_t old_pending_reads = pending_reads;
					pending->read_cb(string_view(reinterpret_cast<char *>(pending->read.buf), pending->read.len));
					free(pending->read.buf);
					delete pending;

					if (pending_reads != old_pending_reads) {
						// A new read was made in the callback (and not queued),
						// so we need to re-submit.
						anything_to_submit = true;
					}
				}
			}
		}

		// See if there are any queued stats we can submit now.
		// Running a stat means we're very close to printing out a match,
		// which is more important than reading more blocks from disk.
		// (Even if those blocks returned early, they would only generate
		// more matches that would be blocked by this one in Serializer.)
		// Thus, prioritize stats.
		while (!queued_stats.empty() && pending_reads < queue_depth) {
			io_uring_sqe *sqe = io_uring_get_sqe(&ring);
			if (sqe == nullptr) {
				fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
				exit(1);
			}
			QueuedStat &qs = queued_stats.front();
			submit_stat_internal(sqe, qs.pathname, move(qs.cb));
			queued_stats.pop();
			anything_to_submit = true;
		}

		// See if there are any queued reads we can submit now.
		while (!queued_reads.empty() && pending_reads < queue_depth) {
			io_uring_sqe *sqe = io_uring_get_sqe(&ring);
			if (sqe == nullptr) {
				fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
				exit(1);
			}
			QueuedRead &qr = queued_reads.front();
			submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
			queued_reads.pop();
			anything_to_submit = true;
		}
	}
#endif
}
252         }
253 #endif
254 }