]> git.sesse.net Git - plocate/blob - io_uring_engine.cpp
Replace mmap with io_uring.
[plocate] / io_uring_engine.cpp
1 #include <string.h>
2 #include <liburing.h>
3 #include <stdint.h>
4 #include <unistd.h>
5 #include <memory>
6 #include <functional>
7
8 #include "io_uring_engine.h"
9
10 using namespace std;
11
12 IOUringEngine::IOUringEngine()
13 {
14         int ret = io_uring_queue_init(queue_depth, &ring, 0);
15         using_uring = (ret >= 0);
16 }
17
18 void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string)> cb)
19 {
20         if (!using_uring) {
21                 // Synchronous read.
22                 string s;
23                 s.resize(len);
24                 complete_pread(fd, &s[0], len, offset);
25                 cb(move(s));
26                 return;
27         }
28
29         if (pending_reads < queue_depth) {
30                 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
31                 if (sqe == nullptr) {
32                         fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
33                         exit(1);
34                 }
35                 submit_read_internal(sqe, fd, len, offset, move(cb));
36         } else {
37                 queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
38         }
39 }
40
41 void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string)> cb)
42 {
43         void *buf;
44         if (posix_memalign(&buf, /*alignment=*/4096, len)) {
45                 fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
46                 exit(1);
47         }
48         PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
49
50         io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
51         io_uring_sqe_set_data(sqe, pending);
52         ++pending_reads;
53 }
54
55 void IOUringEngine::finish()
56 {
57         if (!using_uring) {
58                 return;
59         }
60
61         int ret = io_uring_submit(&ring);
62         if (ret < 0) {
63                 fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
64                 exit(1);
65         }
66         bool anything_to_submit = false;
67         while (pending_reads > 0) {
68                 io_uring_cqe *cqe;
69                 ret = io_uring_wait_cqe(&ring, &cqe);
70                 if (ret < 0) {
71                         fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
72                         exit(1);
73                 }
74
75                 PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
76                 if (cqe->res <= 0) {
77                         fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
78                         exit(1);
79                 }
80
81                 if (size_t(cqe->res) < pending->iov.iov_len) {
82                         // Incomplete read, so resubmit it.
83                         pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
84                         pending->iov.iov_len -= cqe->res;
85                         pending->offset += cqe->res;
86                         io_uring_cqe_seen(&ring, cqe);
87
88                         io_uring_sqe *sqe = io_uring_get_sqe(&ring);
89                         if (sqe == nullptr) {
90                                 fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
91                                 exit(1);
92                         }
93                         io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
94                         io_uring_sqe_set_data(sqe, pending);
95                         anything_to_submit = true;
96                 } else {
97                         io_uring_cqe_seen(&ring, cqe);
98                         --pending_reads;
99
100                         size_t old_pending_reads = pending_reads;
101                         pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
102                         free(pending->buf);
103                         delete pending;
104
105                         if (pending_reads != old_pending_reads) {
106                                 // A new read was made in the callback (and not queued),
107                                 // so we need to re-submit.
108                                 anything_to_submit = true;
109                         }
110                 }
111
112                 // See if there are any queued reads we can submit now.
113                 while (!queued_reads.empty() && pending_reads < queue_depth) {
114                         io_uring_sqe *sqe = io_uring_get_sqe(&ring);
115                         if (sqe == nullptr) {
116                                 fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
117                                 exit(1);
118                         }
119                         QueuedRead &qr = queued_reads.front();
120                         submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
121                         queued_reads.pop();
122                         anything_to_submit = true;
123                 }
124
125                 if (anything_to_submit) {
126                         // A new read was made, so we need to re-submit.
127                         int ret = io_uring_submit(&ring);
128                         if (ret < 0) {
129                                 fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
130                                 exit(1);
131                         } else {
132                                 anything_to_submit = false;
133                         }
134                 }
135         }
136 }
137
// Reads exactly <len> bytes from <fd>, starting at file offset <offset>,
// into <ptr>. Retries on EINTR and continues after short reads until the
// whole range has been read. On a read error, or if end-of-file is hit
// before <len> bytes were read, prints a message to stderr and exits.
void complete_pread(int fd, void *ptr, size_t len, off_t offset)
{
        while (len > 0) {
                ssize_t ret = pread(fd, ptr, len, offset);
                if (ret == -1 && errno == EINTR) {
                        continue;
                }
                if (ret == 0) {
                        // End of file; errno is not set in this case, so perror()
                        // would print an unrelated message.
                        fprintf(stderr, "pread: unexpected end of file\n");
                        exit(1);
                }
                if (ret < 0) {
                        perror("pread");
                        exit(1);
                }
                // Short read: advance both the destination and the file
                // position past the bytes we already have.
                ptr = reinterpret_cast<char *>(ptr) + ret;
                len -= ret;
                offset += ret;
        }
}
154
155