]> git.sesse.net Git - plocate/blob - io_uring_engine.cpp
clang-format again (IWYU and clang-format seemingly disagree).
[plocate] / io_uring_engine.cpp
1 #include <errno.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #ifndef WITHOUT_URING
6 #include <liburing.h>
7 #endif
8 #include "io_uring_engine.h"
9
10 #include <functional>
11 #include <iosfwd>
12 #include <string>
13 #include <unistd.h>
14 #include <utility>
15
16 using namespace std;
17
18 IOUringEngine::IOUringEngine(size_t slop_bytes)
19         : slop_bytes(slop_bytes)
20 {
21 #ifdef WITHOUT_URING
22         int ret = -1;
23 #else
24         int ret = io_uring_queue_init(queue_depth, &ring, 0);
25 #endif
26         using_uring = (ret >= 0);
27 }
28
29 void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
30 {
31         if (!using_uring) {
32                 // Synchronous read.
33                 string s;
34                 s.resize(len + slop_bytes);
35                 complete_pread(fd, &s[0], len, offset);
36                 cb(string_view(s.data(), len));
37                 return;
38         }
39
40 #ifndef WITHOUT_URING
41         if (pending_reads < queue_depth) {
42                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
43                 if (sqe == nullptr) {
44                         fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
45                         exit(1);
46                 }
47                 submit_read_internal(sqe, fd, len, offset, move(cb));
48         } else {
49                 queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
50         }
51 #endif
52 }
53
54 #ifndef WITHOUT_URING
55 void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
56 {
57         void *buf;
58         if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
59                 fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
60                 exit(1);
61         }
62         PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
63
64         io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
65         io_uring_sqe_set_data(sqe, pending);
66         ++pending_reads;
67 }
68 #endif
69
70 void IOUringEngine::finish()
71 {
72         if (!using_uring) {
73                 return;
74         }
75
76 #ifndef WITHOUT_URING
77         bool anything_to_submit = true;
78         while (pending_reads > 0) {
79                 io_uring_cqe *cqe;
80                 if (io_uring_peek_cqe(&ring, &cqe) != 0) {
81                         if (anything_to_submit) {
82                                 // Nothing ready, so submit whatever is pending and then do a blocking wait.
83                                 int ret = io_uring_submit_and_wait(&ring, 1);
84                                 if (ret < 0) {
85                                         fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
86                                         exit(1);
87                                 }
88                                 anything_to_submit = false;
89                         } else {
90                                 int ret = io_uring_wait_cqe(&ring, &cqe);
91                                 if (ret < 0) {
92                                         fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
93                                         exit(1);
94                                 }
95                         }
96                 }
97
98                 unsigned head;
99                 io_uring_for_each_cqe(&ring, head, cqe)
100                 {
101                         PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
102                         if (cqe->res <= 0) {
103                                 fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
104                                 exit(1);
105                         }
106
107                         if (size_t(cqe->res) < pending->iov.iov_len) {
108                                 // Incomplete read, so resubmit it.
109                                 pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
110                                 pending->iov.iov_len -= cqe->res;
111                                 pending->offset += cqe->res;
112                                 io_uring_cqe_seen(&ring, cqe);
113
114                                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
115                                 if (sqe == nullptr) {
116                                         fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
117                                         exit(1);
118                                 }
119                                 io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
120                                 io_uring_sqe_set_data(sqe, pending);
121                                 anything_to_submit = true;
122                         } else {
123                                 io_uring_cqe_seen(&ring, cqe);
124                                 --pending_reads;
125
126                                 size_t old_pending_reads = pending_reads;
127                                 pending->cb(string_view(reinterpret_cast<char *>(pending->buf), pending->len));
128                                 free(pending->buf);
129                                 delete pending;
130
131                                 if (pending_reads != old_pending_reads) {
132                                         // A new read was made in the callback (and not queued),
133                                         // so we need to re-submit.
134                                         anything_to_submit = true;
135                                 }
136                         }
137                 }
138
139                 // See if there are any queued reads we can submit now.
140                 while (!queued_reads.empty() && pending_reads < queue_depth) {
141                         io_uring_sqe *sqe = io_uring_get_sqe(&ring);
142                         if (sqe == nullptr) {
143                                 fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
144                                 exit(1);
145                         }
146                         QueuedRead &qr = queued_reads.front();
147                         submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
148                         queued_reads.pop();
149                         anything_to_submit = true;
150                 }
151         }
152 #endif
153 }
154
155 void complete_pread(int fd, void *ptr, size_t len, off_t offset)
156 {
157         while (len > 0) {
158                 ssize_t ret = pread(fd, ptr, len, offset);
159                 if (ret == -1 && errno == EINTR) {
160                         continue;
161                 }
162                 if (ret <= 0) {
163                         perror("pread");
164                         exit(1);
165                 }
166                 ptr = reinterpret_cast<char *>(ptr) + ret;
167                 len -= ret;
168                 offset -= ret;
169         }
170 }