]> git.sesse.net Git - plocate/commitdiff
Support batch io_uring completions.
authorSteinar H. Gunderson <steinar+git@gunderson.no>
Thu, 1 Oct 2020 20:53:17 +0000 (22:53 +0200)
committerSteinar H. Gunderson <steinar+git@gunderson.no>
Thu, 1 Oct 2020 21:04:17 +0000 (23:04 +0200)
Should save a fair amount of syscalls in heavy situations.

io_uring_engine.cpp

index fb0a4de443d25ec99ff13ff6c1b11ca6a5897962..cadc6c33a79abeb12d2f63387b2f1fa6037b25f2 100644 (file)
@@ -69,78 +69,78 @@ void IOUringEngine::finish()
        }
 
 #ifndef WITHOUT_URING
-       int ret = io_uring_submit(&ring);
-       if (ret < 0) {
-               fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
-               exit(1);
-       }
-       bool anything_to_submit = false;
+       bool anything_to_submit = true;
        while (pending_reads > 0) {
-               io_uring_cqe *cqe;
-               ret = io_uring_wait_cqe(&ring, &cqe);
-               if (ret < 0) {
-                       fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
-                       exit(1);
-               }
-
-               PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
-               if (cqe->res <= 0) {
-                       fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
-                       exit(1);
-               }
-
-               if (size_t(cqe->res) < pending->iov.iov_len) {
-                       // Incomplete read, so resubmit it.
-                       pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
-                       pending->iov.iov_len -= cqe->res;
-                       pending->offset += cqe->res;
-                       io_uring_cqe_seen(&ring, cqe);
-
-                       io_uring_sqe *sqe = io_uring_get_sqe(&ring);
-                       if (sqe == nullptr) {
-                               fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
-                               exit(1);
+               io_uring_cqe *cqes[queue_depth];
+               int num_sqes = io_uring_peek_batch_cqe(&ring, cqes, queue_depth);
+               if (num_sqes == 0) {
+                       if (anything_to_submit) {
+                               // Nothing ready, so submit whatever is pending and then do a blocking wait.
+                               int ret = io_uring_submit(&ring);
+                               if (ret < 0) {
+                                       fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
+                                       exit(1);
+                               }
+                               anything_to_submit = false;
                        }
-                       io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
-                       io_uring_sqe_set_data(sqe, pending);
-                       anything_to_submit = true;
-               } else {
-                       io_uring_cqe_seen(&ring, cqe);
-                       --pending_reads;
-
-                       size_t old_pending_reads = pending_reads;
-                       pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
-                       free(pending->buf);
-                       delete pending;
-
-                       if (pending_reads != old_pending_reads) {
-                               // A new read was made in the callback (and not queued),
-                               // so we need to re-submit.
-                               anything_to_submit = true;
+                       int ret = io_uring_wait_cqe(&ring, &cqes[0]);
+                       if (ret < 0) {
+                               fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
+                               exit(1);
                        }
+                       num_sqes = 1;
                }
 
-               // See if there are any queued reads we can submit now.
-               while (!queued_reads.empty() && pending_reads < queue_depth) {
-                       io_uring_sqe *sqe = io_uring_get_sqe(&ring);
-                       if (sqe == nullptr) {
-                               fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+               for (int sqe_idx = 0; sqe_idx < num_sqes; ++sqe_idx) {
+                       io_uring_cqe *cqe = cqes[sqe_idx];
+                       PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
+                       if (cqe->res <= 0) {
+                               fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
                                exit(1);
                        }
-                       QueuedRead &qr = queued_reads.front();
-                       submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
-                       queued_reads.pop();
-                       anything_to_submit = true;
-               }
 
-               if (anything_to_submit) {
-                       // A new read was made, so we need to re-submit.
-                       int ret = io_uring_submit(&ring);
-                       if (ret < 0) {
-                               fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
-                               exit(1);
+                       if (size_t(cqe->res) < pending->iov.iov_len) {
+                               // Incomplete read, so resubmit it.
+                               pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
+                               pending->iov.iov_len -= cqe->res;
+                               pending->offset += cqe->res;
+                               io_uring_cqe_seen(&ring, cqe);
+
+                               io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+                               if (sqe == nullptr) {
+                                       fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
+                                       exit(1);
+                               }
+                               io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
+                               io_uring_sqe_set_data(sqe, pending);
+                               anything_to_submit = true;
                        } else {
-                               anything_to_submit = false;
+                               io_uring_cqe_seen(&ring, cqe);
+                               --pending_reads;
+
+                               size_t old_pending_reads = pending_reads;
+                               pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
+                               free(pending->buf);
+                               delete pending;
+
+                               if (pending_reads != old_pending_reads) {
+                                       // A new read was made in the callback (and not queued),
+                                       // so we need to re-submit.
+                                       anything_to_submit = true;
+                               }
+                       }
+
+                       // See if there are any queued reads we can submit now.
+                       while (!queued_reads.empty() && pending_reads < queue_depth) {
+                               io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+                               if (sqe == nullptr) {
+                                       fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+                                       exit(1);
+                               }
+                               QueuedRead &qr = queued_reads.front();
+                               submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
+                               queued_reads.pop();
+                               anything_to_submit = true;
                        }
                }
        }