git.sesse.net Git - plocate/blobdiff - io_uring_engine.cpp
Release plocate 1.1.22.
[plocate] / io_uring_engine.cpp
index 7714cdb4f8295f69f6909e712eaf3a9c2523e98f..be9366037ae0dbd9e3ad2f34f7c9995513f1d20a 100644 (file)
@@ -1,34 +1,80 @@
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
 #include <string.h>
 #ifndef WITHOUT_URING
 #include <liburing.h>
 #endif
-#include <stdint.h>
-#include <unistd.h>
-#include <memory>
-#include <functional>
-
+#include "complete_pread.h"
+#include "dprintf.h"
 #include "io_uring_engine.h"
 
+#include <functional>
+#include <iosfwd>
+#include <string>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <utility>
+
 using namespace std;
 
-IOUringEngine::IOUringEngine()
+IOUringEngine::IOUringEngine(size_t slop_bytes)
+       : slop_bytes(slop_bytes)
 {
 #ifdef WITHOUT_URING
        int ret = -1;
+       dprintf("Compiled without liburing support; not using io_uring.\n");
 #else
        int ret = io_uring_queue_init(queue_depth, &ring, 0);
+       if (ret < 0) {
+               dprintf("io_uring_queue_init() failed; not using io_uring.\n");
+       }
 #endif
        using_uring = (ret >= 0);
+
+#ifndef WITHOUT_URING
+       if (using_uring) {
+               io_uring_probe *probe = io_uring_get_probe_ring(&ring);
+               supports_stat = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX));
+               if (!supports_stat) {
+                       dprintf("io_uring on this kernel does not support statx(); will do synchronous access checking.\n");
+               }
+               io_uring_free_probe(probe);
+       }
+#endif
 }
 
-void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string)> cb)
+void IOUringEngine::submit_stat(const char *path [[maybe_unused]], std::function<void(bool)> cb [[maybe_unused]])
+{
+       assert(supports_stat);
+
+#ifndef WITHOUT_URING
+       if (pending_reads < queue_depth) {
+               io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+               if (sqe == nullptr) {
+                       fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+                       exit(1);
+               }
+               submit_stat_internal(sqe, strdup(path), move(cb));
+       } else {
+               QueuedStat qs;
+               qs.cb = move(cb);
+               qs.pathname = strdup(path);
+               queued_stats.push(move(qs));
+       }
+#endif
+}
+
+void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
 {
        if (!using_uring) {
                // Synchronous read.
                string s;
-               s.resize(len);
+               s.resize(len + slop_bytes);
                complete_pread(fd, &s[0], len, offset);
-               cb(move(s));
+               cb(string_view(s.data(), len));
                return;
        }
 
@@ -47,16 +93,37 @@ void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(
 }
 
 #ifndef WITHOUT_URING
-void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string)> cb)
+void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
 {
        void *buf;
-       if (posix_memalign(&buf, /*alignment=*/4096, len)) {
+       if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
                fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
                exit(1);
        }
-       PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } };
 
-       io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset);
+       PendingRead *pending = new PendingRead;
+       pending->op = OP_READ;
+       pending->read_cb = move(cb);
+       pending->read.buf = buf;
+       pending->read.len = len;
+       pending->read.fd = fd;
+       pending->read.offset = offset;
+       pending->read.iov = iovec{ buf, len };
+
+       io_uring_prep_readv(sqe, fd, &pending->read.iov, 1, offset);
+       io_uring_sqe_set_data(sqe, pending);
+       ++pending_reads;
+}
+
+void IOUringEngine::submit_stat_internal(io_uring_sqe *sqe, char *path, std::function<void(bool)> cb)
+{
+       PendingRead *pending = new PendingRead;
+       pending->op = OP_STAT;
+       pending->stat_cb = move(cb);
+       pending->stat.pathname = path;
+       pending->stat.buf = new struct statx;
+
+       io_uring_prep_statx(sqe, /*fd=*/-1, pending->stat.pathname, AT_STATX_SYNC_AS_STAT | AT_SYMLINK_NOFOLLOW, STATX_MODE, pending->stat.buf);
        io_uring_sqe_set_data(sqe, pending);
        ++pending_reads;
 }
@@ -69,55 +136,101 @@ void IOUringEngine::finish()
        }
 
 #ifndef WITHOUT_URING
-       int ret = io_uring_submit(&ring);
-       if (ret < 0) {
-               fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
-               exit(1);
-       }
-       bool anything_to_submit = false;
+       bool anything_to_submit = true;
        while (pending_reads > 0) {
                io_uring_cqe *cqe;
-               ret = io_uring_wait_cqe(&ring, &cqe);
-               if (ret < 0) {
-                       fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
-                       exit(1);
+               if (io_uring_peek_cqe(&ring, &cqe) != 0) {
+                       if (anything_to_submit) {
+                               // Nothing ready, so submit whatever is pending and then do a blocking wait.
+                               int ret = io_uring_submit_and_wait(&ring, 1);
+                               if (ret < 0) {
+                                       fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
+                                       exit(1);
+                               }
+                               anything_to_submit = false;
+                       } else {
+                               int ret = io_uring_wait_cqe(&ring, &cqe);
+                               if (ret < 0) {
+                                       fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
+                                       exit(1);
+                               }
+                       }
                }
 
-               PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
-               if (cqe->res <= 0) {
-                       fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
-                       exit(1);
+               unsigned head;
+               io_uring_for_each_cqe(&ring, head, cqe)
+               {
+                       PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
+                       if (pending->op == OP_STAT) {
+                               io_uring_cqe_seen(&ring, cqe);
+                               --pending_reads;
+
+                               size_t old_pending_reads = pending_reads;
+                               pending->stat_cb(cqe->res == 0);
+                               free(pending->stat.pathname);
+                               delete pending->stat.buf;
+                               delete pending;
+
+                               if (pending_reads != old_pending_reads) {
+                                       // A new read was made in the callback (and not queued),
+                                       // so we need to re-submit.
+                                       anything_to_submit = true;
+                               }
+                       } else {
+                               if (cqe->res <= 0) {
+                                       fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
+                                       exit(1);
+                               }
+
+                               if (size_t(cqe->res) < pending->read.iov.iov_len) {
+                                       // Incomplete read, so resubmit it.
+                                       pending->read.iov.iov_base = (char *)pending->read.iov.iov_base + cqe->res;
+                                       pending->read.iov.iov_len -= cqe->res;
+                                       pending->read.offset += cqe->res;
+                                       io_uring_cqe_seen(&ring, cqe);
+
+                                       io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+                                       if (sqe == nullptr) {
+                                               fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
+                                               exit(1);
+                                       }
+                                       io_uring_prep_readv(sqe, pending->read.fd, &pending->read.iov, 1, pending->read.offset);
+                                       io_uring_sqe_set_data(sqe, pending);
+                                       anything_to_submit = true;
+                               } else {
+                                       io_uring_cqe_seen(&ring, cqe);
+                                       --pending_reads;
+
+                                       size_t old_pending_reads = pending_reads;
+                                       pending->read_cb(string_view(reinterpret_cast<char *>(pending->read.buf), pending->read.len));
+                                       free(pending->read.buf);
+                                       delete pending;
+
+                                       if (pending_reads != old_pending_reads) {
+                                               // A new read was made in the callback (and not queued),
+                                               // so we need to re-submit.
+                                               anything_to_submit = true;
+                                       }
+                               }
+                       }
                }
 
-               if (size_t(cqe->res) < pending->iov.iov_len) {
-                       // Incomplete read, so resubmit it.
-                       pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
-                       pending->iov.iov_len -= cqe->res;
-                       pending->offset += cqe->res;
-                       io_uring_cqe_seen(&ring, cqe);
-
+               // See if there are any queued stats we can submit now.
+               // Running a stat means we're very close to printing out a match,
+               // which is more important than reading more blocks from disk.
+               // (Even if those blocks returned early, they would only generate
+               // more matches that would be blocked by this one in Serializer.)
+               // Thus, prioritize stats.
+               while (!queued_stats.empty() && pending_reads < queue_depth) {
                        io_uring_sqe *sqe = io_uring_get_sqe(&ring);
                        if (sqe == nullptr) {
-                               fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
+                               fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
                                exit(1);
                        }
-                       io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
-                       io_uring_sqe_set_data(sqe, pending);
+                       QueuedStat &qs = queued_stats.front();
+                       submit_stat_internal(sqe, qs.pathname, move(qs.cb));
+                       queued_stats.pop();
                        anything_to_submit = true;
-               } else {
-                       io_uring_cqe_seen(&ring, cqe);
-                       --pending_reads;
-
-                       size_t old_pending_reads = pending_reads;
-                       pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
-                       free(pending->buf);
-                       delete pending;
-
-                       if (pending_reads != old_pending_reads) {
-                               // A new read was made in the callback (and not queued),
-                               // so we need to re-submit.
-                               anything_to_submit = true;
-                       }
                }
 
                // See if there are any queued reads we can submit now.
@@ -132,36 +245,6 @@ void IOUringEngine::finish()
                        queued_reads.pop();
                        anything_to_submit = true;
                }
-
-               if (anything_to_submit) {
-                       // A new read was made, so we need to re-submit.
-                       int ret = io_uring_submit(&ring);
-                       if (ret < 0) {
-                               fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
-                               exit(1);
-                       } else {
-                               anything_to_submit = false;
-                       }
-               }
        }
 #endif
 }
-
-void complete_pread(int fd, void *ptr, size_t len, off_t offset)
-{
-       while (len > 0) {
-               ssize_t ret = pread(fd, ptr, len, offset);
-               if (ret == -1 && errno == EINTR) {
-                       continue;
-               }
-               if (ret <= 0) {
-                       perror("pread");
-                       exit(1);
-               }
-               ptr = reinterpret_cast<char *>(ptr) + ret;
-               len -= ret;
-               offset -= ret;
-       }
-}
-
-