]> git.sesse.net Git - plocate/blob - io_uring_engine.cpp
Add debug output if io_uring initialization fails.
[plocate] / io_uring_engine.cpp
1 #include <errno.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #ifndef WITHOUT_URING
6 #include <liburing.h>
7 #endif
8 #include "dprintf.h"
9 #include "io_uring_engine.h"
10
11 #include <functional>
12 #include <iosfwd>
13 #include <string>
14 #include <unistd.h>
15 #include <utility>
16
17 using namespace std;
18
19 IOUringEngine::IOUringEngine(size_t slop_bytes)
20         : slop_bytes(slop_bytes)
21 {
22 #ifdef WITHOUT_URING
23         int ret = -1;
24         dprintf("Compiled without liburing support; not using io_uring.\n");
25 #else
26         int ret = io_uring_queue_init(queue_depth, &ring, 0);
27         if (ret < 0) {
28                 dprintf("io_uring_queue_init() failed; not using io_uring.\n");
29         }
30 #endif
31         using_uring = (ret >= 0);
32 }
33
34 void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
35 {
36         if (!using_uring) {
37                 // Synchronous read.
38                 string s;
39                 s.resize(len + slop_bytes);
40                 complete_pread(fd, &s[0], len, offset);
41                 cb(string_view(s.data(), len));
42                 return;
43         }
44
45 #ifndef WITHOUT_URING
46         if (pending_reads < queue_depth) {
47                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
48                 if (sqe == nullptr) {
49                         fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
50                         exit(1);
51                 }
52                 submit_read_internal(sqe, fd, len, offset, move(cb));
53         } else {
54                 queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
55         }
56 #endif
57 }
58
59 #ifndef WITHOUT_URING
60 void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
61 {
62         void *buf;
63         if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
64                 fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
65                 exit(1);
66         }
67         PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
68
69         io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
70         io_uring_sqe_set_data(sqe, pending);
71         ++pending_reads;
72 }
73 #endif
74
75 void IOUringEngine::finish()
76 {
77         if (!using_uring) {
78                 return;
79         }
80
81 #ifndef WITHOUT_URING
82         bool anything_to_submit = true;
83         while (pending_reads > 0) {
84                 io_uring_cqe *cqe;
85                 if (io_uring_peek_cqe(&ring, &cqe) != 0) {
86                         if (anything_to_submit) {
87                                 // Nothing ready, so submit whatever is pending and then do a blocking wait.
88                                 int ret = io_uring_submit_and_wait(&ring, 1);
89                                 if (ret < 0) {
90                                         fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
91                                         exit(1);
92                                 }
93                                 anything_to_submit = false;
94                         } else {
95                                 int ret = io_uring_wait_cqe(&ring, &cqe);
96                                 if (ret < 0) {
97                                         fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
98                                         exit(1);
99                                 }
100                         }
101                 }
102
103                 unsigned head;
104                 io_uring_for_each_cqe(&ring, head, cqe)
105                 {
106                         PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
107                         if (cqe->res <= 0) {
108                                 fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
109                                 exit(1);
110                         }
111
112                         if (size_t(cqe->res) < pending->iov.iov_len) {
113                                 // Incomplete read, so resubmit it.
114                                 pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
115                                 pending->iov.iov_len -= cqe->res;
116                                 pending->offset += cqe->res;
117                                 io_uring_cqe_seen(&ring, cqe);
118
119                                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
120                                 if (sqe == nullptr) {
121                                         fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
122                                         exit(1);
123                                 }
124                                 io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
125                                 io_uring_sqe_set_data(sqe, pending);
126                                 anything_to_submit = true;
127                         } else {
128                                 io_uring_cqe_seen(&ring, cqe);
129                                 --pending_reads;
130
131                                 size_t old_pending_reads = pending_reads;
132                                 pending->cb(string_view(reinterpret_cast<char *>(pending->buf), pending->len));
133                                 free(pending->buf);
134                                 delete pending;
135
136                                 if (pending_reads != old_pending_reads) {
137                                         // A new read was made in the callback (and not queued),
138                                         // so we need to re-submit.
139                                         anything_to_submit = true;
140                                 }
141                         }
142                 }
143
144                 // See if there are any queued reads we can submit now.
145                 while (!queued_reads.empty() && pending_reads < queue_depth) {
146                         io_uring_sqe *sqe = io_uring_get_sqe(&ring);
147                         if (sqe == nullptr) {
148                                 fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
149                                 exit(1);
150                         }
151                         QueuedRead &qr = queued_reads.front();
152                         submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
153                         queued_reads.pop();
154                         anything_to_submit = true;
155                 }
156         }
157 #endif
158 }
159
160 void complete_pread(int fd, void *ptr, size_t len, off_t offset)
161 {
162         while (len > 0) {
163                 ssize_t ret = pread(fd, ptr, len, offset);
164                 if (ret == -1 && errno == EINTR) {
165                         continue;
166                 }
167                 if (ret <= 0) {
168                         perror("pread");
169                         exit(1);
170                 }
171                 ptr = reinterpret_cast<char *>(ptr) + ret;
172                 len -= ret;
173                 offset -= ret;
174         }
175 }