9 #include "io_uring_engine.h"
// Constructs the engine. Stores slop_bytes (extra bytes added to the size of
// every read buffer allocated by this class) and records in using_uring
// whether the io_uring ring could be set up.
// NOTE(review): the branch/preprocessor structure between the statements
// below is not visible in this excerpt — confirm which ones are conditional.
19 IOUringEngine::IOUringEngine(size_t slop_bytes)
20 : slop_bytes(slop_bytes)
// Logged when the build has no liburing support (per the message text).
24 dprintf("Compiled without liburing support; not using io_uring.\n");
// Create the ring with queue_depth entries and flags = 0.
26 int ret = io_uring_queue_init(queue_depth, &ring, 0);
// Logged when ring setup fails; presumably guarded by a check on ret.
28 dprintf("io_uring_queue_init() failed; not using io_uring.\n");
// A negative return from io_uring_queue_init() disables io_uring use.
31 using_uring = (ret >= 0);
// Requests a read of len bytes from fd at offset; cb receives a string_view
// over the bytes read.
//
// Three outcomes are visible here:
//   - a synchronous read via complete_pread(), with cb invoked immediately
//     (presumably taken when io_uring is not in use — the guarding condition
//     is not visible in this excerpt);
//   - immediate preparation of an SQE via submit_read_internal() when fewer
//     than queue_depth reads are pending;
//   - otherwise, the request is pushed onto queued_reads for finish() to
//     submit once a slot frees up.
34 void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
// The buffer is allocated with slop_bytes of extra room, but only len bytes
// are read into it and only len bytes are exposed to the callback.
39 s.resize(len + slop_bytes);
40 complete_pread(fd, &s[0], len, offset);
41 cb(string_view(s.data(), len));
// Only take an SQE if the number of in-flight reads is below queue_depth.
46 if (pending_reads < queue_depth) {
47 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
// NOTE(review): io_uring_get_sqe() signals failure by returning a null
// pointer; it is not clear that errno is meaningful here — confirm.
49 fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
52 submit_read_internal(sqe, fd, len, offset, move(cb));
// Too many reads in flight: park the request until finish() can submit it.
54 queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
// Prepares (but does not itself submit) a readv request on the given SQE.
// Allocates a 4096-byte-aligned buffer of len + slop_bytes bytes, wraps the
// request state in a heap-allocated PendingRead, and attaches that object to
// the SQE as user data so finish() can recover it from the completion.
// NOTE(review): where buf and the PendingRead are released is not visible in
// this excerpt — confirm against the completion path.
60 void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
// posix_memalign() returns nonzero on failure.
63 if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
// NOTE(review): posix_memalign() reports its error through the return value
// rather than errno, so strerror(errno) may print an unrelated message; the
// size printed is also len, not the len + slop_bytes actually requested.
64 fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
// The trailing { buf, len } initializes the iovec: it covers only len bytes,
// so the slop_bytes tail of the buffer is allocated but never read into.
67 PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
// The iovec handed to the ring is the one stored inside *pending; finish()
// later adjusts that same iovec when it has to resubmit a short read.
69 io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
70 io_uring_sqe_set_data(sqe, pending);
// Drives all outstanding reads to completion. Loops while pending_reads is
// nonzero: waits for completions, resubmits reads that came back short,
// invokes the callback of each finished read, and moves requests from
// queued_reads onto the ring as capacity allows.
// NOTE(review): several lines of this function (braces, error exits, counter
// updates) are not visible in this excerpt; comments below describe only the
// visible statements.
75 void IOUringEngine::finish()
// Tracks whether there are prepared SQEs that have not been submitted yet.
// It is set whenever a new SQE is prepared below and cleared after a submit.
82 bool anything_to_submit = true;
83 while (pending_reads > 0) {
// Non-blocking check; a nonzero return means no completion is ready yet.
85 if (io_uring_peek_cqe(&ring, &cqe) != 0) {
86 if (anything_to_submit) {
87 // Nothing ready, so submit whatever is pending and then do a blocking wait.
88 int ret = io_uring_submit_and_wait(&ring, 1);
// NOTE(review): the message names io_uring_submit although the call above is
// io_uring_submit_and_wait. ret is negated for strerror(), i.e. treated as a
// negative errno value.
90 fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
93 anything_to_submit = false;
// Nothing new to submit, so just block until a completion arrives.
95 int ret = io_uring_wait_cqe(&ring, &cqe);
97 fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
// Process every completion currently available in the ring.
104 io_uring_for_each_cqe(&ring, head, cqe)
// user_data is the PendingRead pointer attached in submit_read_internal().
106 PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
// cqe->res is negated for strerror(), i.e. treated as a negative errno value;
// presumably guarded by a check for res < 0 on a line not visible here.
108 fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
// Fewer bytes than requested were read.
// NOTE(review): a result of 0 (end of file) with bytes still remaining would
// also take this path — confirm this cannot resubmit indefinitely.
112 if (size_t(cqe->res) < pending->iov.iov_len) {
113 // Incomplete read, so resubmit it.
// Advance the iovec and the file offset past the bytes already received, so
// the resubmitted read fills in only the remainder of the same buffer.
114 pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
115 pending->iov.iov_len -= cqe->res;
116 pending->offset += cqe->res;
// Mark the completion consumed before asking for a fresh SQE.
117 io_uring_cqe_seen(&ring, cqe);
119 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
120 if (sqe == nullptr) {
121 fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
// Re-queue the remainder, reusing the same PendingRead as user data.
124 io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
125 io_uring_sqe_set_data(sqe, pending);
126 anything_to_submit = true;
// Complete read: mark the completion consumed, then run the callback.
128 io_uring_cqe_seen(&ring, cqe);
// Snapshot the counter so a read started from inside the callback can be
// detected afterwards.
131 size_t old_pending_reads = pending_reads;
// The callback sees the whole buffer at its original length (pending->len),
// not the iovec length, which may have been shrunk by resubmits.
132 pending->cb(string_view(reinterpret_cast<char *>(pending->buf), pending->len));
136 if (pending_reads != old_pending_reads) {
137 // A new read was made in the callback (and not queued),
138 // so we need to re-submit.
139 anything_to_submit = true;
144 // See if there are any queued reads we can submit now.
145 while (!queued_reads.empty() && pending_reads < queue_depth) {
146 io_uring_sqe *sqe = io_uring_get_sqe(&ring);
147 if (sqe == nullptr) {
// NOTE(review): as in submit_read(), it is not clear that errno is
// meaningful after io_uring_get_sqe() returns null — confirm.
148 fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
// qr is a reference into the queue and its callback is moved out below; the
// element is presumably popped on a line not visible here.
151 QueuedRead &qr = queued_reads.front();
152 submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
154 anything_to_submit = true;
160 void complete_pread(int fd, void *ptr, size_t len, off_t offset)
163 ssize_t ret = pread(fd, ptr, len, offset);
164 if (ret == -1 && errno == EINTR) {
171 ptr = reinterpret_cast<char *>(ptr) + ret;