]> git.sesse.net Git - plocate/blob - io_uring_engine.cpp
Make the searcher ASan-clean.
[plocate] / io_uring_engine.cpp
1 #include <string.h>
2 #ifndef WITHOUT_URING
3 #include <liburing.h>
4 #endif
5 #include "io_uring_engine.h"
6
7 #include <functional>
8 #include <memory>
9 #include <stdint.h>
10 #include <unistd.h>
11
12 using namespace std;
13
14 IOUringEngine::IOUringEngine(size_t slop_bytes)
15         : slop_bytes(slop_bytes)
16 {
17 #ifdef WITHOUT_URING
18         int ret = -1;
19 #else
20         int ret = io_uring_queue_init(queue_depth, &ring, 0);
21 #endif
22         using_uring = (ret >= 0);
23 }
24
25 void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
26 {
27         if (!using_uring) {
28                 // Synchronous read.
29                 string s;
30                 s.resize(len + slop_bytes);
31                 complete_pread(fd, &s[0], len, offset);
32                 cb(string_view(s.data(), len));
33                 return;
34         }
35
36 #ifndef WITHOUT_URING
37         if (pending_reads < queue_depth) {
38                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
39                 if (sqe == nullptr) {
40                         fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
41                         exit(1);
42                 }
43                 submit_read_internal(sqe, fd, len, offset, move(cb));
44         } else {
45                 queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
46         }
47 #endif
48 }
49
50 #ifndef WITHOUT_URING
51 void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
52 {
53         void *buf;
54         if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
55                 fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
56                 exit(1);
57         }
58         PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
59
60         io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
61         io_uring_sqe_set_data(sqe, pending);
62         ++pending_reads;
63 }
64 #endif
65
66 void IOUringEngine::finish()
67 {
68         if (!using_uring) {
69                 return;
70         }
71
72 #ifndef WITHOUT_URING
73         bool anything_to_submit = true;
74         while (pending_reads > 0) {
75                 io_uring_cqe *cqe;
76                 if (io_uring_peek_cqe(&ring, &cqe) != 0) {
77                         if (anything_to_submit) {
78                                 // Nothing ready, so submit whatever is pending and then do a blocking wait.
79                                 int ret = io_uring_submit_and_wait(&ring, 1);
80                                 if (ret < 0) {
81                                         fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
82                                         exit(1);
83                                 }
84                                 anything_to_submit = false;
85                         } else {
86                                 int ret = io_uring_wait_cqe(&ring, &cqe);
87                                 if (ret < 0) {
88                                         fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
89                                         exit(1);
90                                 }
91                         }
92                 }
93
94                 unsigned head;
95                 io_uring_for_each_cqe(&ring, head, cqe)
96                 {
97                         PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
98                         if (cqe->res <= 0) {
99                                 fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
100                                 exit(1);
101                         }
102
103                         if (size_t(cqe->res) < pending->iov.iov_len) {
104                                 // Incomplete read, so resubmit it.
105                                 pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
106                                 pending->iov.iov_len -= cqe->res;
107                                 pending->offset += cqe->res;
108                                 io_uring_cqe_seen(&ring, cqe);
109
110                                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
111                                 if (sqe == nullptr) {
112                                         fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
113                                         exit(1);
114                                 }
115                                 io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
116                                 io_uring_sqe_set_data(sqe, pending);
117                                 anything_to_submit = true;
118                         } else {
119                                 io_uring_cqe_seen(&ring, cqe);
120                                 --pending_reads;
121
122                                 size_t old_pending_reads = pending_reads;
123                                 pending->cb(string_view(reinterpret_cast<char *>(pending->buf), pending->len));
124                                 free(pending->buf);
125                                 delete pending;
126
127                                 if (pending_reads != old_pending_reads) {
128                                         // A new read was made in the callback (and not queued),
129                                         // so we need to re-submit.
130                                         anything_to_submit = true;
131                                 }
132                         }
133                 }
134
135                 // See if there are any queued reads we can submit now.
136                 while (!queued_reads.empty() && pending_reads < queue_depth) {
137                         io_uring_sqe *sqe = io_uring_get_sqe(&ring);
138                         if (sqe == nullptr) {
139                                 fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
140                                 exit(1);
141                         }
142                         QueuedRead &qr = queued_reads.front();
143                         submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
144                         queued_reads.pop();
145                         anything_to_submit = true;
146                 }
147         }
148 #endif
149 }
150
151 void complete_pread(int fd, void *ptr, size_t len, off_t offset)
152 {
153         while (len > 0) {
154                 ssize_t ret = pread(fd, ptr, len, offset);
155                 if (ret == -1 && errno == EINTR) {
156                         continue;
157                 }
158                 if (ret <= 0) {
159                         perror("pread");
160                         exit(1);
161                 }
162                 ptr = reinterpret_cast<char *>(ptr) + ret;
163                 len -= ret;
164                 offset -= ret;
165         }
166 }