#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
#ifndef WITHOUT_URING
#include <liburing.h>
#endif

#include "dprintf.h"
#include "io_uring_engine.h"

using namespace std;

IOUringEngine::IOUringEngine(size_t slop_bytes)
	: slop_bytes(slop_bytes)
{
#ifdef WITHOUT_URING
	int ret = -1;
	dprintf("Compiled without liburing support; not using io_uring.\n");
#else
	int ret = io_uring_queue_init(queue_depth, &ring, 0);
	if (ret < 0) {
		dprintf("io_uring_queue_init() failed; not using io_uring.\n");
	}
#endif
	using_uring = (ret >= 0);

#ifndef WITHOUT_URING
	if (using_uring) {
		io_uring_probe *probe = io_uring_get_probe_ring(&ring);
		supports_stat = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX));
		if (!supports_stat) {
			dprintf("io_uring on this kernel does not support statx(); will do synchronous access checking.\n");
		}
	}
#endif
}
void IOUringEngine::submit_stat(const char *path, std::function<void()> cb)
{
	assert(supports_stat);

#ifndef WITHOUT_URING
	if (pending_reads < queue_depth) {
		io_uring_sqe *sqe = io_uring_get_sqe(&ring);
		if (sqe == nullptr) {
			fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
			exit(1);
		}
		submit_stat_internal(sqe, strdup(path), move(cb));
	} else {
		QueuedStat qs;
		qs.cb = move(cb);
		qs.pathname = strdup(path);
		queued_stats.push(move(qs));
	}
#endif
}
void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
	if (!using_uring) {
		// io_uring is unavailable, so read synchronously and call back at once.
		string s;
		s.resize(len + slop_bytes);
		complete_pread(fd, &s[0], len, offset);
		cb(string_view(s.data(), len));
		return;
	}

#ifndef WITHOUT_URING
	if (pending_reads < queue_depth) {
		io_uring_sqe *sqe = io_uring_get_sqe(&ring);
		if (sqe == nullptr) {
			fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
			exit(1);
		}
		submit_read_internal(sqe, fd, len, offset, move(cb));
	} else {
		queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
	}
#endif
}
#ifndef WITHOUT_URING
void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
	void *buf;
	if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
		fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
		exit(1);
	}

	PendingRead *pending = new PendingRead;
	pending->op = OP_READ;
	pending->read_cb = move(cb);
	pending->read.buf = buf;
	pending->read.len = len;
	pending->read.fd = fd;
	pending->read.offset = offset;
	pending->read.iov = iovec{ buf, len };

	io_uring_prep_readv(sqe, fd, &pending->read.iov, 1, offset);
	io_uring_sqe_set_data(sqe, pending);
	++pending_reads;
}
void IOUringEngine::submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void()> cb)
{
	PendingRead *pending = new PendingRead;
	pending->op = OP_STAT;
	pending->stat_cb = move(cb);
	pending->stat.pathname = path;
	pending->stat.buf = new struct statx;

	io_uring_prep_statx(sqe, /*fd=*/-1, pending->stat.pathname, AT_STATX_SYNC_AS_STAT, STATX_MODE, pending->stat.buf);
	io_uring_sqe_set_data(sqe, pending);
	++pending_reads;
}
#endif
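// Ownership sketch (as implemented above): each submission allocates one
// PendingRead and attaches it to the SQE via io_uring_sqe_set_data(); finish()
// recovers it from cqe->user_data, runs the callback, and then frees the read
// buffer (or pathname and statx buffer) together with the PendingRead itself.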
void IOUringEngine::finish()
{
	if (!using_uring) {
		return;
	}

#ifndef WITHOUT_URING
	bool anything_to_submit = true;
	while (pending_reads > 0) {
		io_uring_cqe *cqe;
		if (io_uring_peek_cqe(&ring, &cqe) != 0) {
			if (anything_to_submit) {
				// Nothing ready, so submit whatever is pending and then do a blocking wait.
				int ret = io_uring_submit_and_wait(&ring, 1);
				if (ret < 0) {
					fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
					exit(1);
				}
				anything_to_submit = false;
			} else {
				int ret = io_uring_wait_cqe(&ring, &cqe);
				if (ret < 0) {
					fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
					exit(1);
				}
			}
		}

		unsigned head;
		io_uring_for_each_cqe(&ring, head, cqe)
		{
			PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
			if (pending->op == OP_STAT) {
				io_uring_cqe_seen(&ring, cqe);
				--pending_reads;

				size_t old_pending_reads = pending_reads;
				pending->stat_cb();
				free(pending->stat.pathname);
				delete pending->stat.buf;
				delete pending;

				if (pending_reads != old_pending_reads) {
					// A new read was made in the callback (and not queued),
					// so we need to re-submit.
					anything_to_submit = true;
				}
			} else {
				if (cqe->res <= 0) {
					fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
					exit(1);
				}

				if (size_t(cqe->res) < pending->read.iov.iov_len) {
					// Incomplete read, so resubmit it.
					pending->read.iov.iov_base = (char *)pending->read.iov.iov_base + cqe->res;
					pending->read.iov.iov_len -= cqe->res;
					pending->read.offset += cqe->res;
					io_uring_cqe_seen(&ring, cqe);

					io_uring_sqe *sqe = io_uring_get_sqe(&ring);
					if (sqe == nullptr) {
						fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
						exit(1);
					}
					io_uring_prep_readv(sqe, pending->read.fd, &pending->read.iov, 1, pending->read.offset);
					io_uring_sqe_set_data(sqe, pending);
					anything_to_submit = true;
				} else {
					io_uring_cqe_seen(&ring, cqe);
					--pending_reads;

					size_t old_pending_reads = pending_reads;
					pending->read_cb(string_view(reinterpret_cast<char *>(pending->read.buf), pending->read.len));
					free(pending->read.buf);
					delete pending;

					if (pending_reads != old_pending_reads) {
						// A new read was made in the callback (and not queued),
						// so we need to re-submit.
						anything_to_submit = true;
					}
				}
			}
		}

		// See if there are any queued stats we can submit now.
		// Running a stat means we're very close to printing out a match,
		// which is more important than reading more blocks from disk.
		// (Even if those blocks returned early, they would only generate
		// more matches that would be blocked by this one in Serializer.)
		// Thus, prioritize stats.
		while (!queued_stats.empty() && pending_reads < queue_depth) {
			io_uring_sqe *sqe = io_uring_get_sqe(&ring);
			if (sqe == nullptr) {
				fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
				exit(1);
			}
			QueuedStat &qs = queued_stats.front();
			submit_stat_internal(sqe, qs.pathname, move(qs.cb));
			queued_stats.pop();
			anything_to_submit = true;
		}

		// See if there are any queued reads we can submit now.
		while (!queued_reads.empty() && pending_reads < queue_depth) {
			io_uring_sqe *sqe = io_uring_get_sqe(&ring);
			if (sqe == nullptr) {
				fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
				exit(1);
			}
			QueuedRead &qr = queued_reads.front();
			submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
			queued_reads.pop();
			anything_to_submit = true;
		}
	}
#endif
}
void complete_pread(int fd, void *ptr, size_t len, off_t offset)
{
	// Keep issuing pread() until the whole range is read, retrying on EINTR
	// and advancing past short reads.
	while (len > 0) {
		ssize_t ret = pread(fd, ptr, len, offset);
		if (ret == -1 && errno == EINTR) {
			continue;
		}
		if (ret <= 0) {
			perror("pread");
			exit(1);
		}
		ptr = reinterpret_cast<char *>(ptr) + ret;
		len -= ret;
		offset += ret;
	}
}
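// Minimal usage sketch (illustrative only; the calling program and the file
// descriptor below are hypothetical, but the member functions are the ones
// defined in this file):
//
//   IOUringEngine engine(/*slop_bytes=*/0);
//   engine.submit_read(fd, /*len=*/4096, /*offset=*/0, [](std::string_view data) {
//           // Use the block here; the buffer is freed right after the callback returns.
//   });
//   engine.finish();  // Submits anything still queued and waits for all callbacks to run.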