]> git.sesse.net Git - plocate/blobdiff - io_uring_engine.cpp
Add debug output if io_uring initialization fails.
[plocate] / io_uring_engine.cpp
index cadc6c33a79abeb12d2f63387b2f1fa6037b25f2..03160b12a4abb1f04e83a84acfcf84d7589ad1f1 100644 (file)
@@ -1,34 +1,44 @@
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
 #include <string.h>
 #ifndef WITHOUT_URING
 #include <liburing.h>
 #endif
+#include "dprintf.h"
 #include "io_uring_engine.h"
 
 #include <functional>
-#include <memory>
-#include <stdint.h>
+#include <iosfwd>
+#include <string>
 #include <unistd.h>
+#include <utility>
 
 using namespace std;
 
-IOUringEngine::IOUringEngine()
+IOUringEngine::IOUringEngine(size_t slop_bytes)
+       : slop_bytes(slop_bytes)
 {
 #ifdef WITHOUT_URING
        int ret = -1;
+       dprintf("Compiled without liburing support; not using io_uring.\n");
 #else
        int ret = io_uring_queue_init(queue_depth, &ring, 0);
+       if (ret < 0) {
+               dprintf("io_uring_queue_init() failed; not using io_uring.\n");
+       }
 #endif
        using_uring = (ret >= 0);
 }
 
-void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string)> cb)
+void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
 {
        if (!using_uring) {
                // Synchronous read.
                string s;
-               s.resize(len);
+               s.resize(len + slop_bytes);
                complete_pread(fd, &s[0], len, offset);
-               cb(move(s));
+               cb(string_view(s.data(), len));
                return;
        }
 
@@ -47,10 +57,10 @@ void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(
 }
 
 #ifndef WITHOUT_URING
-void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string)> cb)
+void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
 {
        void *buf;
-       if (posix_memalign(&buf, /*alignment=*/4096, len)) {
+       if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
                fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
                exit(1);
        }
@@ -71,28 +81,28 @@ void IOUringEngine::finish()
 #ifndef WITHOUT_URING
        bool anything_to_submit = true;
        while (pending_reads > 0) {
-               io_uring_cqe *cqes[queue_depth];
-               int num_sqes = io_uring_peek_batch_cqe(&ring, cqes, queue_depth);
-               if (num_sqes == 0) {
+               io_uring_cqe *cqe;
+               if (io_uring_peek_cqe(&ring, &cqe) != 0) {
                        if (anything_to_submit) {
                                // Nothing ready, so submit whatever is pending and then do a blocking wait.
-                               int ret = io_uring_submit(&ring);
+                               int ret = io_uring_submit_and_wait(&ring, 1);
                                if (ret < 0) {
                                        fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
                                        exit(1);
                                }
                                anything_to_submit = false;
+                       } else {
+                               int ret = io_uring_wait_cqe(&ring, &cqe);
+                               if (ret < 0) {
+                                       fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
+                                       exit(1);
+                               }
                        }
-                       int ret = io_uring_wait_cqe(&ring, &cqes[0]);
-                       if (ret < 0) {
-                               fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
-                               exit(1);
-                       }
-                       num_sqes = 1;
                }
 
-               for (int sqe_idx = 0; sqe_idx < num_sqes; ++sqe_idx) {
-                       io_uring_cqe *cqe = cqes[sqe_idx];
+               unsigned head;
+               io_uring_for_each_cqe(&ring, head, cqe)
+               {
                        PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
                        if (cqe->res <= 0) {
                                fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
@@ -119,7 +129,7 @@ void IOUringEngine::finish()
                                --pending_reads;
 
                                size_t old_pending_reads = pending_reads;
-                               pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
+                               pending->cb(string_view(reinterpret_cast<char *>(pending->buf), pending->len));
                                free(pending->buf);
                                delete pending;
 
@@ -129,19 +139,19 @@ void IOUringEngine::finish()
                                        anything_to_submit = true;
                                }
                        }
+               }
 
-                       // See if there are any queued reads we can submit now.
-                       while (!queued_reads.empty() && pending_reads < queue_depth) {
-                               io_uring_sqe *sqe = io_uring_get_sqe(&ring);
-                               if (sqe == nullptr) {
-                                       fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
-                                       exit(1);
-                               }
-                               QueuedRead &qr = queued_reads.front();
-                               submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
-                               queued_reads.pop();
-                               anything_to_submit = true;
+               // See if there are any queued reads we can submit now.
+               while (!queued_reads.empty() && pending_reads < queue_depth) {
+                       io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+                       if (sqe == nullptr) {
+                               fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+                               exit(1);
                        }
+                       QueuedRead &qr = queued_reads.front();
+                       submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
+                       queued_reads.pop();
+                       anything_to_submit = true;
                }
        }
 #endif