]> git.sesse.net Git - plocate/blob - io_uring_engine.cpp
Support batch io_uring completions.
[plocate] / io_uring_engine.cpp
1 #include <string.h>
2 #ifndef WITHOUT_URING
3 #include <liburing.h>
4 #endif
5 #include "io_uring_engine.h"
6
7 #include <functional>
8 #include <memory>
9 #include <stdint.h>
10 #include <unistd.h>
11
12 using namespace std;
13
14 IOUringEngine::IOUringEngine()
15 {
16 #ifdef WITHOUT_URING
17         int ret = -1;
18 #else
19         int ret = io_uring_queue_init(queue_depth, &ring, 0);
20 #endif
21         using_uring = (ret >= 0);
22 }
23
24 void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string)> cb)
25 {
26         if (!using_uring) {
27                 // Synchronous read.
28                 string s;
29                 s.resize(len);
30                 complete_pread(fd, &s[0], len, offset);
31                 cb(move(s));
32                 return;
33         }
34
35 #ifndef WITHOUT_URING
36         if (pending_reads < queue_depth) {
37                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
38                 if (sqe == nullptr) {
39                         fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
40                         exit(1);
41                 }
42                 submit_read_internal(sqe, fd, len, offset, move(cb));
43         } else {
44                 queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
45         }
46 #endif
47 }
48
49 #ifndef WITHOUT_URING
50 void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string)> cb)
51 {
52         void *buf;
53         if (posix_memalign(&buf, /*alignment=*/4096, len)) {
54                 fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
55                 exit(1);
56         }
57         PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
58
59         io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
60         io_uring_sqe_set_data(sqe, pending);
61         ++pending_reads;
62 }
63 #endif
64
65 void IOUringEngine::finish()
66 {
67         if (!using_uring) {
68                 return;
69         }
70
71 #ifndef WITHOUT_URING
72         bool anything_to_submit = true;
73         while (pending_reads > 0) {
74                 io_uring_cqe *cqes[queue_depth];
75                 int num_sqes = io_uring_peek_batch_cqe(&ring, cqes, queue_depth);
76                 if (num_sqes == 0) {
77                         if (anything_to_submit) {
78                                 // Nothing ready, so submit whatever is pending and then do a blocking wait.
79                                 int ret = io_uring_submit(&ring);
80                                 if (ret < 0) {
81                                         fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
82                                         exit(1);
83                                 }
84                                 anything_to_submit = false;
85                         }
86                         int ret = io_uring_wait_cqe(&ring, &cqes[0]);
87                         if (ret < 0) {
88                                 fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
89                                 exit(1);
90                         }
91                         num_sqes = 1;
92                 }
93
94                 for (int sqe_idx = 0; sqe_idx < num_sqes; ++sqe_idx) {
95                         io_uring_cqe *cqe = cqes[sqe_idx];
96                         PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
97                         if (cqe->res <= 0) {
98                                 fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
99                                 exit(1);
100                         }
101
102                         if (size_t(cqe->res) < pending->iov.iov_len) {
103                                 // Incomplete read, so resubmit it.
104                                 pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
105                                 pending->iov.iov_len -= cqe->res;
106                                 pending->offset += cqe->res;
107                                 io_uring_cqe_seen(&ring, cqe);
108
109                                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
110                                 if (sqe == nullptr) {
111                                         fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
112                                         exit(1);
113                                 }
114                                 io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
115                                 io_uring_sqe_set_data(sqe, pending);
116                                 anything_to_submit = true;
117                         } else {
118                                 io_uring_cqe_seen(&ring, cqe);
119                                 --pending_reads;
120
121                                 size_t old_pending_reads = pending_reads;
122                                 pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
123                                 free(pending->buf);
124                                 delete pending;
125
126                                 if (pending_reads != old_pending_reads) {
127                                         // A new read was made in the callback (and not queued),
128                                         // so we need to re-submit.
129                                         anything_to_submit = true;
130                                 }
131                         }
132
133                         // See if there are any queued reads we can submit now.
134                         while (!queued_reads.empty() && pending_reads < queue_depth) {
135                                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
136                                 if (sqe == nullptr) {
137                                         fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
138                                         exit(1);
139                                 }
140                                 QueuedRead &qr = queued_reads.front();
141                                 submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
142                                 queued_reads.pop();
143                                 anything_to_submit = true;
144                         }
145                 }
146         }
147 #endif
148 }
149
// Reads exactly len bytes from fd, starting at file offset offset, into ptr.
// Retries on EINTR and continues after short reads. Exits the process if
// pread() fails or if end-of-file is reached before len bytes were read.
void complete_pread(int fd, void *ptr, size_t len, off_t offset)
{
        while (len > 0) {
                ssize_t ret = pread(fd, ptr, len, offset);
                if (ret == -1 && errno == EINTR) {
                        continue;
                }
                if (ret < 0) {
                        perror("pread");
                        exit(1);
                }
                if (ret == 0) {
                        // EOF before the full range was read; errno is not meaningful here.
                        fprintf(stderr, "pread: unexpected end of file\n");
                        exit(1);
                }
                ptr = reinterpret_cast<char *>(ptr) + ret;
                len -= ret;
                // Advance (not rewind) the file offset, so the next pread()
                // continues right after the bytes just read.
                offset += ret;
        }
}