From: Steinar H. Gunderson Date: Thu, 1 Oct 2020 20:53:17 +0000 (+0200) Subject: Support batch io_uring completions. X-Git-Tag: 1.0.0~69 X-Git-Url: https://git.sesse.net/?a=commitdiff_plain;h=48be942bea5a1fabb09e9a0333c6dfb0ac7f2ebc;p=plocate Support batch io_uring completions. Should save a fair number of syscalls in heavy situations. --- diff --git a/io_uring_engine.cpp b/io_uring_engine.cpp index fb0a4de..cadc6c3 100644 --- a/io_uring_engine.cpp +++ b/io_uring_engine.cpp @@ -69,78 +69,78 @@ void IOUringEngine::finish() } #ifndef WITHOUT_URING - int ret = io_uring_submit(&ring); - if (ret < 0) { - fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret)); - exit(1); - } - bool anything_to_submit = false; + bool anything_to_submit = true; while (pending_reads > 0) { - io_uring_cqe *cqe; - ret = io_uring_wait_cqe(&ring, &cqe); - if (ret < 0) { - fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret)); - exit(1); - } - - PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data); - if (cqe->res <= 0) { - fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res)); - exit(1); - } - - if (size_t(cqe->res) < pending->iov.iov_len) { - // Incomplete read, so resubmit it. - pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res; - pending->iov.iov_len -= cqe->res; - pending->offset += cqe->res; - io_uring_cqe_seen(&ring, cqe); - - io_uring_sqe *sqe = io_uring_get_sqe(&ring); - if (sqe == nullptr) { - fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n"); - exit(1); + io_uring_cqe *cqes[queue_depth]; + int num_sqes = io_uring_peek_batch_cqe(&ring, cqes, queue_depth); + if (num_sqes == 0) { + if (anything_to_submit) { + // Nothing ready, so submit whatever is pending and then do a blocking wait.
+ int ret = io_uring_submit(&ring); + if (ret < 0) { + fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret)); + exit(1); + } + anything_to_submit = false; } - io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset); - io_uring_sqe_set_data(sqe, pending); - anything_to_submit = true; - } else { - io_uring_cqe_seen(&ring, cqe); - --pending_reads; - - size_t old_pending_reads = pending_reads; - pending->cb(string(reinterpret_cast<const char *>(pending->buf), pending->len)); - free(pending->buf); - delete pending; - - if (pending_reads != old_pending_reads) { - // A new read was made in the callback (and not queued), - // so we need to re-submit. - anything_to_submit = true; + int ret = io_uring_wait_cqe(&ring, &cqes[0]); + if (ret < 0) { + fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret)); + exit(1); } + num_sqes = 1; } - // See if there are any queued reads we can submit now. - while (!queued_reads.empty() && pending_reads < queue_depth) { - io_uring_sqe *sqe = io_uring_get_sqe(&ring); - if (sqe == nullptr) { - fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno)); + for (int sqe_idx = 0; sqe_idx < num_sqes; ++sqe_idx) { + io_uring_cqe *cqe = cqes[sqe_idx]; + PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data); + if (cqe->res <= 0) { + fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res)); exit(1); } - QueuedRead &qr = queued_reads.front(); - submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb)); - queued_reads.pop(); - anything_to_submit = true; - } - if (anything_to_submit) { - // A new read was made, so we need to re-submit. - int ret = io_uring_submit(&ring); - if (ret < 0) { - fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret)); - exit(1); + if (size_t(cqe->res) < pending->iov.iov_len) { + // Incomplete read, so resubmit it.
+ pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res; + pending->iov.iov_len -= cqe->res; + pending->offset += cqe->res; + io_uring_cqe_seen(&ring, cqe); + + io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (sqe == nullptr) { + fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n"); + exit(1); + } + io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset); + io_uring_sqe_set_data(sqe, pending); + anything_to_submit = true; } else { - anything_to_submit = false; + io_uring_cqe_seen(&ring, cqe); + --pending_reads; + + size_t old_pending_reads = pending_reads; + pending->cb(string(reinterpret_cast<const char *>(pending->buf), pending->len)); + free(pending->buf); + delete pending; + + if (pending_reads != old_pending_reads) { + // A new read was made in the callback (and not queued), + // so we need to re-submit. + anything_to_submit = true; + } + } + + // See if there are any queued reads we can submit now. + while (!queued_reads.empty() && pending_reads < queue_depth) { + io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (sqe == nullptr) { + fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno)); + exit(1); + } + QueuedRead &qr = queued_reads.front(); + submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb)); + queued_reads.pop(); + anything_to_submit = true; } } }