#include "complete_pread.h"
#include "dprintf.h"
#include "io_uring_engine.h"

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <functional>
#include <string>
#include <string_view>
#include <utility>

#ifndef WITHOUT_URING
#include <liburing.h>
#endif

using namespace std;
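// IOUringEngine funnels reads (and optionally statx() calls) through a single
// io_uring, with a synchronous fallback when io_uring is unavailable.
// A rough usage sketch (the call site and constructor argument below are
// illustrative assumptions, not code from this file):
//
//	IOUringEngine engine(/*slop_bytes=*/16);
//	engine.submit_read(fd, len, offset, [](string_view blob) {
//		// Use blob here; the underlying buffer is freed when this returns.
//	});
//	engine.finish();  // Blocks until every callback has run.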
IOUringEngine::IOUringEngine(size_t slop_bytes)
	: slop_bytes(slop_bytes)
{
#ifdef WITHOUT_URING
	int ret = -1;
	dprintf("Compiled without liburing support; not using io_uring.\n");
#else
	int ret = io_uring_queue_init(queue_depth, &ring, 0);
	if (ret < 0) {
		dprintf("io_uring_queue_init() failed; not using io_uring.\n");
	}
#endif
	using_uring = (ret >= 0);
#ifndef WITHOUT_URING
	if (using_uring) {
		io_uring_probe *probe = io_uring_get_probe_ring(&ring);
		supports_stat = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX));
		if (!supports_stat) {
			dprintf("io_uring on this kernel does not support statx(); will do synchronous access checking.\n");
		}
		io_uring_free_probe(probe);
	}
#endif
}
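// Submit an asynchronous statx() for <path>. The callback receives only
// success/failure (typically an existence/access check); if the ring is
// full, the request is queued and submitted later from finish().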
void IOUringEngine::submit_stat(const char *path [[maybe_unused]], std::function<void(bool)> cb [[maybe_unused]])
{
	assert(supports_stat);

#ifndef WITHOUT_URING
	if (pending_reads < queue_depth) {
		io_uring_sqe *sqe = io_uring_get_sqe(&ring);
		if (sqe == nullptr) {
			fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
			exit(1);
		}
		submit_stat_internal(sqe, strdup(path), move(cb));
	} else {
		QueuedStat qs;
		qs.cb = move(cb);
		qs.pathname = strdup(path);
		queued_stats.push(move(qs));
	}
#endif
}
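// Read <len> bytes at <offset>. Without io_uring, this falls back to a
// blocking complete_pread() and invokes the callback immediately; otherwise
// the read is submitted asynchronously (or queued if the ring is full).
// Either way, the string_view handed to the callback is only valid for the
// duration of the call.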
void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
	if (!using_uring) {
		// Synchronous read.
		string s;
		s.resize(len + slop_bytes);
		complete_pread(fd, &s[0], len, offset);
		cb(string_view(s.data(), len));
		return;
	}

#ifndef WITHOUT_URING
	if (pending_reads < queue_depth) {
		io_uring_sqe *sqe = io_uring_get_sqe(&ring);
		if (sqe == nullptr) {
			fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
			exit(1);
		}
		submit_read_internal(sqe, fd, len, offset, move(cb));
	} else {
		queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
	}
#endif
}
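// The read buffer is 4 kB-aligned and carries slop_bytes of extra space at
// the end, presumably so that consumers (e.g., a decompressor) can overread
// slightly past the requested length without a separate allocation.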
#ifndef WITHOUT_URING
void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
	void *buf;
	int err = posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes);
	if (err != 0) {
		// posix_memalign() returns the error directly instead of setting errno.
		fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(err));
		exit(1);
	}

	PendingRead *pending = new PendingRead;
	pending->op = OP_READ;
	pending->read_cb = move(cb);
	pending->read.buf = buf;
	pending->read.len = len;
	pending->read.fd = fd;
	pending->read.offset = offset;
	pending->read.iov = iovec{ buf, len };

	io_uring_prep_readv(sqe, fd, &pending->read.iov, 1, offset);
	io_uring_sqe_set_data(sqe, pending);
	++pending_reads;
}
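// Submit an asynchronous statx(). Only the completion status is ever
// inspected (see finish()); the result buffer itself is thrown away unread,
// which is why the minimal STATX_MODE mask suffices.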
void IOUringEngine::submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void(bool)> cb)
{
	PendingRead *pending = new PendingRead;
	pending->op = OP_STAT;
	pending->stat_cb = move(cb);
	pending->stat.pathname = path;
	pending->stat.buf = new struct statx;

	io_uring_prep_statx(sqe, /*fd=*/-1, pending->stat.pathname, AT_STATX_SYNC_AS_STAT | AT_SYMLINK_NOFOLLOW, STATX_MODE, pending->stat.buf);
	io_uring_sqe_set_data(sqe, pending);
	++pending_reads;
}
#endif
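// Drain the ring: run completions as they arrive, re-submit short reads, and
// top the ring back up from queued_stats/queued_reads until nothing is in
// flight. Callbacks may submit new I/O themselves, which is why pending_reads
// is re-checked after every callback.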
void IOUringEngine::finish()
{
	if (!using_uring) {
		return;
	}

#ifndef WITHOUT_URING
	bool anything_to_submit = true;
	while (pending_reads > 0) {
		io_uring_cqe *cqe;
		if (io_uring_peek_cqe(&ring, &cqe) != 0) {
			if (anything_to_submit) {
				// Nothing ready, so submit whatever is pending and then do a blocking wait.
				int ret = io_uring_submit_and_wait(&ring, 1);
				if (ret < 0) {
					fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
					exit(1);
				}
				anything_to_submit = false;
			} else {
				int ret = io_uring_wait_cqe(&ring, &cqe);
				if (ret < 0) {
					fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
					exit(1);
				}
			}
		}
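		// Reap every completion that is already available; io_uring_for_each_cqe
		// walks the completion ring in user space without further syscalls.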
		unsigned head;
		io_uring_for_each_cqe(&ring, head, cqe)
		{
			PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
			if (pending->op == OP_STAT) {
				io_uring_cqe_seen(&ring, cqe);
				--pending_reads;

				size_t old_pending_reads = pending_reads;
				pending->stat_cb(cqe->res == 0);
				free(pending->stat.pathname);
				delete pending->stat.buf;
				delete pending;

				if (pending_reads != old_pending_reads) {
					// A new read was made in the callback (and not queued),
					// so we need to re-submit.
					anything_to_submit = true;
				}
			} else {
				// OP_READ.
				if (cqe->res <= 0) {
					fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
					exit(1);
				}
				if (size_t(cqe->res) < pending->read.iov.iov_len) {
					// Incomplete read, so resubmit it.
					pending->read.iov.iov_base = (char *)pending->read.iov.iov_base + cqe->res;
					pending->read.iov.iov_len -= cqe->res;
					pending->read.offset += cqe->res;
					io_uring_cqe_seen(&ring, cqe);

					io_uring_sqe *sqe = io_uring_get_sqe(&ring);
					if (sqe == nullptr) {
						fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
						exit(1);
					}
					io_uring_prep_readv(sqe, pending->read.fd, &pending->read.iov, 1, pending->read.offset);
					io_uring_sqe_set_data(sqe, pending);
					anything_to_submit = true;
				} else {
					io_uring_cqe_seen(&ring, cqe);
					--pending_reads;

					size_t old_pending_reads = pending_reads;
					pending->read_cb(string_view(reinterpret_cast<char *>(pending->read.buf), pending->read.len));
					free(pending->read.buf);
					delete pending;

					if (pending_reads != old_pending_reads) {
						// A new read was made in the callback (and not queued),
						// so we need to re-submit.
						anything_to_submit = true;
					}
				}
			}
		}
		// See if there are any queued stats we can submit now.
		// Running a stat means we're very close to printing out a match,
		// which is more important than reading more blocks from disk.
		// (Even if those blocks returned early, they would only generate
		// more matches that would be blocked by this one in Serializer.)
		// Thus, prioritize stats.
		while (!queued_stats.empty() && pending_reads < queue_depth) {
			io_uring_sqe *sqe = io_uring_get_sqe(&ring);
			if (sqe == nullptr) {
				fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
				exit(1);
			}
			QueuedStat &qs = queued_stats.front();
			submit_stat_internal(sqe, qs.pathname, move(qs.cb));
			queued_stats.pop();
			anything_to_submit = true;
		}
		// See if there are any queued reads we can submit now.
		while (!queued_reads.empty() && pending_reads < queue_depth) {
			io_uring_sqe *sqe = io_uring_get_sqe(&ring);
			if (sqe == nullptr) {
				fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
				exit(1);
			}
			QueuedRead &qr = queued_reads.front();
			submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
			queued_reads.pop();
			anything_to_submit = true;
		}
	}
#endif
}