10 #include "complete_pread.h"
12 #include "io_uring_engine.h"
IOUringEngine::IOUringEngine(size_t slop_bytes)
    : slop_bytes(slop_bytes)
{
#ifdef WITHOUT_URING
    int ret = -1;
    dprintf("Compiled without liburing support; not using io_uring.\n");
#else
    int ret = io_uring_queue_init(queue_depth, &ring, 0);
    if (ret < 0) {
        dprintf("io_uring_queue_init() failed; not using io_uring.\n");
    }
#endif
    using_uring = (ret >= 0);

#ifndef WITHOUT_URING
    if (using_uring) {
        io_uring_probe *probe = io_uring_get_probe_ring(&ring);
        supports_stat = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX));
        if (!supports_stat) {
            dprintf("io_uring on this kernel does not support statx(); will do synchronous access checking.\n");
        }
        io_uring_free_probe(probe);  // Accepts nullptr.
    }
#endif
}

void IOUringEngine::submit_stat(const char *path, std::function<void(bool)> cb)
{
    assert(supports_stat);

#ifndef WITHOUT_URING
    if (pending_reads < queue_depth) {
        io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        if (sqe == nullptr) {
            fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
            exit(1);
        }
        submit_stat_internal(sqe, strdup(path), move(cb));
    } else {
        QueuedStat qs;
        qs.cb = move(cb);
        qs.pathname = strdup(path);
        queued_stats.push(move(qs));
    }
#else
    /* unused parameters */
    (void)path;
    (void)cb;
#endif
}

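// Read [offset, offset + len) from fd and hand the result to cb. Without
// io_uring the read happens synchronously right here; with a full ring it
// is queued and submitted later from finish().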
void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
    if (!using_uring) {
        // Synchronous fallback.
        string s;
        s.resize(len + slop_bytes);
        complete_pread(fd, &s[0], len, offset);
        cb(string_view(s.data(), len));
        return;
    }

#ifndef WITHOUT_URING
    if (pending_reads < queue_depth) {
        io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        if (sqe == nullptr) {
            fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
            exit(1);
        }
        submit_read_internal(sqe, fd, len, offset, move(cb));
    } else {
        queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
    }
#endif
}

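// Fill in an SQE for an asynchronous read into a fresh 4 kB-aligned buffer.
// The PendingRead is attached as user_data so finish() can find the buffer
// and callback again when the completion arrives.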
#ifndef WITHOUT_URING
void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
    void *buf;
    int err = posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes);
    if (err != 0) {
        // posix_memalign() returns the error instead of setting errno.
        fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len + slop_bytes, strerror(err));
        exit(1);
    }

    PendingRead *pending = new PendingRead;
    pending->op = OP_READ;
    pending->read_cb = move(cb);
    pending->read.buf = buf;
    pending->read.len = len;
    pending->read.fd = fd;
    pending->read.offset = offset;
    pending->read.iov = iovec{ buf, len };

    io_uring_prep_readv(sqe, fd, &pending->read.iov, 1, offset);
    io_uring_sqe_set_data(sqe, pending);
    ++pending_reads;
}

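// Fill in an SQE for an asynchronous statx(). Takes ownership of the
// malloc-ed path, which finish() frees once the completion is processed.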
void IOUringEngine::submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void(bool)> cb)
{
    PendingRead *pending = new PendingRead;
    pending->op = OP_STAT;
    pending->stat_cb = move(cb);
    pending->stat.pathname = path;
    pending->stat.buf = new struct statx;

    io_uring_prep_statx(sqe, /*dirfd=*/-1, pending->stat.pathname, AT_STATX_SYNC_AS_STAT | AT_SYMLINK_NOFOLLOW, STATX_MODE, pending->stat.buf);
    io_uring_sqe_set_data(sqe, pending);
    ++pending_reads;
}
#endif  // !WITHOUT_URING

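// Drain the ring: submit anything still queued, reap completions and run
// their callbacks, and keep going until no reads or stats are in flight.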
void IOUringEngine::finish()
{
    if (!using_uring) {
        return;
    }

#ifndef WITHOUT_URING
    bool anything_to_submit = true;
    while (pending_reads > 0) {
        io_uring_cqe *cqe;
        if (io_uring_peek_cqe(&ring, &cqe) != 0) {
            if (anything_to_submit) {
                // Nothing ready, so submit whatever is pending and then do a blocking wait.
                int ret = io_uring_submit_and_wait(&ring, 1);
                if (ret < 0) {
                    fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
                    exit(1);
                }
                anything_to_submit = false;
            } else {
                int ret = io_uring_wait_cqe(&ring, &cqe);
                if (ret < 0) {
                    fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
                    exit(1);
                }
            }
        }

        unsigned head;
        io_uring_for_each_cqe(&ring, head, cqe)
        {
            PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
            if (pending->op == OP_STAT) {
                io_uring_cqe_seen(&ring, cqe);
                --pending_reads;

                size_t old_pending_reads = pending_reads;
                pending->stat_cb(cqe->res == 0);
                free(pending->stat.pathname);
                delete pending->stat.buf;
                delete pending;

                if (pending_reads != old_pending_reads) {
                    // A new read was made in the callback (and not queued),
                    // so we need to re-submit.
                    anything_to_submit = true;
                }
            } else {
                if (cqe->res <= 0) {
                    fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
                    exit(1);
                }
                if (size_t(cqe->res) < pending->read.iov.iov_len) {
                    // Incomplete read, so resubmit it.
                    pending->read.iov.iov_base = (char *)pending->read.iov.iov_base + cqe->res;
                    pending->read.iov.iov_len -= cqe->res;
                    pending->read.offset += cqe->res;
                    io_uring_cqe_seen(&ring, cqe);

                    io_uring_sqe *sqe = io_uring_get_sqe(&ring);
                    if (sqe == nullptr) {
                        fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
                        exit(1);
                    }
                    io_uring_prep_readv(sqe, pending->read.fd, &pending->read.iov, 1, pending->read.offset);
                    io_uring_sqe_set_data(sqe, pending);
                    anything_to_submit = true;
                } else {
                    io_uring_cqe_seen(&ring, cqe);
                    --pending_reads;

                    size_t old_pending_reads = pending_reads;
                    pending->read_cb(string_view(reinterpret_cast<char *>(pending->read.buf), pending->read.len));
                    free(pending->read.buf);
                    delete pending;

                    if (pending_reads != old_pending_reads) {
                        // A new read was made in the callback (and not queued),
                        // so we need to re-submit.
                        anything_to_submit = true;
                    }
                }
            }
        }

        // See if there are any queued stats we can submit now.
        // Running a stat means we're very close to printing out a match,
        // which is more important than reading more blocks from disk.
        // (Even if those blocks returned early, they would only generate
        // more matches that would be blocked by this one in Serializer.)
        // Thus, prioritize stats.
        while (!queued_stats.empty() && pending_reads < queue_depth) {
            io_uring_sqe *sqe = io_uring_get_sqe(&ring);
            if (sqe == nullptr) {
                fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
                exit(1);
            }
            QueuedStat &qs = queued_stats.front();
            submit_stat_internal(sqe, qs.pathname, move(qs.cb));
            queued_stats.pop();
            anything_to_submit = true;
        }

        // See if there are any queued reads we can submit now.
        while (!queued_reads.empty() && pending_reads < queue_depth) {
            io_uring_sqe *sqe = io_uring_get_sqe(&ring);
            if (sqe == nullptr) {
                fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
                exit(1);
            }
            QueuedRead &qr = queued_reads.front();
            submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
            queued_reads.pop();
            anything_to_submit = true;
        }
    }
#endif
}
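
// Example usage (a minimal sketch; `fd`, the length, and the offset are made
// up for illustration):
//
//     IOUringEngine engine(/*slop_bytes=*/0);
//     engine.submit_read(fd, /*len=*/4096, /*offset=*/0, [](string_view blob) {
//         // Use the data; submitting more reads from inside the callback is
//         // fine, and finish() will pick them up and re-submit.
//     });
//     engine.finish();  // Blocks until every pending read/stat has completed.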