X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=io_uring_engine.cpp;h=63a3af78a86256ad4469443e6f872b4f890a43bc;hb=fd6198891d6fd9642effc0843fef6f23b991af3e;hp=fc51abab6a75e23143235f555e8edec82547529b;hpb=2fcf8490a3a7a2e1d434f33383b5d33a0bc3ac03;p=plocate diff --git a/io_uring_engine.cpp b/io_uring_engine.cpp index fc51aba..63a3af7 100644 --- a/io_uring_engine.cpp +++ b/io_uring_engine.cpp @@ -1,15 +1,20 @@ +#include #include +#include #include #include #include #ifndef WITHOUT_URING #include #endif +#include "complete_pread.h" +#include "dprintf.h" #include "io_uring_engine.h" #include #include #include +#include #include #include @@ -20,10 +25,46 @@ IOUringEngine::IOUringEngine(size_t slop_bytes) { #ifdef WITHOUT_URING int ret = -1; + dprintf("Compiled without liburing support; not using io_uring.\n"); #else int ret = io_uring_queue_init(queue_depth, &ring, 0); + if (ret < 0) { + dprintf("io_uring_queue_init() failed; not using io_uring.\n"); + } #endif using_uring = (ret >= 0); + +#ifndef WITHOUT_URING + if (using_uring) { + io_uring_probe *probe = io_uring_get_probe_ring(&ring); + supports_stat = (probe != nullptr && io_uring_opcode_supported(probe, IORING_OP_STATX)); + if (!supports_stat) { + dprintf("io_uring on this kernel does not support statx(); will do synchronous access checking.\n"); + } + free(probe); + } +#endif +} + +void IOUringEngine::submit_stat(const char *path [[maybe_unused]], std::function cb [[maybe_unused]]) +{ + assert(supports_stat); + +#ifndef WITHOUT_URING + if (pending_reads < queue_depth) { + io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (sqe == nullptr) { + fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno)); + exit(1); + } + submit_stat_internal(sqe, strdup(path), move(cb)); + } else { + QueuedStat qs; + qs.cb = move(cb); + qs.pathname = strdup(path); + queued_stats.push(move(qs)); + } +#endif } void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function cb) @@ -59,9 +100,30 @@ void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno)); exit(1); } - PendingRead *pending = new PendingRead{ buf, len, move(cb), fd, offset, { buf, len } }; - io_uring_prep_readv(sqe, fd, &pending->iov, 1, offset); + PendingRead *pending = new PendingRead; + pending->op = OP_READ; + pending->read_cb = move(cb); + pending->read.buf = buf; + pending->read.len = len; + pending->read.fd = fd; + pending->read.offset = offset; + pending->read.iov = iovec{ buf, len }; + + io_uring_prep_readv(sqe, fd, &pending->read.iov, 1, offset); + io_uring_sqe_set_data(sqe, pending); + ++pending_reads; +} + +void IOUringEngine::submit_stat_internal(io_uring_sqe *sqe, char *path, std::function cb) +{ + PendingRead *pending = new PendingRead; + pending->op = OP_STAT; + pending->stat_cb = move(cb); + pending->stat.pathname = path; + pending->stat.buf = new struct statx; + + io_uring_prep_statx(sqe, /*fd=*/-1, pending->stat.pathname, AT_STATX_SYNC_AS_STAT | AT_SYMLINK_NOFOLLOW, STATX_MODE, pending->stat.buf); io_uring_sqe_set_data(sqe, pending); ++pending_reads; } @@ -99,33 +161,14 @@ void IOUringEngine::finish() io_uring_for_each_cqe(&ring, head, cqe) { PendingRead *pending = reinterpret_cast(cqe->user_data); - if (cqe->res <= 0) { - fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res)); - exit(1); - } - - if (size_t(cqe->res) < pending->iov.iov_len) { - // Incomplete read, so resubmit it. - pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res; - pending->iov.iov_len -= cqe->res; - pending->offset += cqe->res; - io_uring_cqe_seen(&ring, cqe); - - io_uring_sqe *sqe = io_uring_get_sqe(&ring); - if (sqe == nullptr) { - fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n"); - exit(1); - } - io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset); - io_uring_sqe_set_data(sqe, pending); - anything_to_submit = true; - } else { + if (pending->op == OP_STAT) { io_uring_cqe_seen(&ring, cqe); --pending_reads; size_t old_pending_reads = pending_reads; - pending->cb(string_view(reinterpret_cast(pending->buf), pending->len)); - free(pending->buf); + pending->stat_cb(cqe->res == 0); + free(pending->stat.pathname); + delete pending->stat.buf; delete pending; if (pending_reads != old_pending_reads) { @@ -133,7 +176,61 @@ void IOUringEngine::finish() // so we need to re-submit. anything_to_submit = true; } + } else { + if (cqe->res <= 0) { + fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res)); + exit(1); + } + + if (size_t(cqe->res) < pending->read.iov.iov_len) { + // Incomplete read, so resubmit it. + pending->read.iov.iov_base = (char *)pending->read.iov.iov_base + cqe->res; + pending->read.iov.iov_len -= cqe->res; + pending->read.offset += cqe->res; + io_uring_cqe_seen(&ring, cqe); + + io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (sqe == nullptr) { + fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n"); + exit(1); + } + io_uring_prep_readv(sqe, pending->read.fd, &pending->read.iov, 1, pending->read.offset); + io_uring_sqe_set_data(sqe, pending); + anything_to_submit = true; + } else { + io_uring_cqe_seen(&ring, cqe); + --pending_reads; + + size_t old_pending_reads = pending_reads; + pending->read_cb(string_view(reinterpret_cast(pending->read.buf), pending->read.len)); + free(pending->read.buf); + delete pending; + + if (pending_reads != old_pending_reads) { + // A new read was made in the callback (and not queued), + // so we need to re-submit. + anything_to_submit = true; + } + } + } + } + + // See if there are any queued stats we can submit now. + // Running a stat means we're very close to printing out a match, + // which is more important than reading more blocks from disk. + // (Even if those blocks returned early, they would only generate + // more matches that would be blocked by this one in Serializer.) + // Thus, prioritize stats. + while (!queued_stats.empty() && pending_reads < queue_depth) { + io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (sqe == nullptr) { + fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno)); + exit(1); } + QueuedStat &qs = queued_stats.front(); + submit_stat_internal(sqe, qs.pathname, move(qs.cb)); + queued_stats.pop(); + anything_to_submit = true; } // See if there are any queued reads we can submit now. @@ -151,20 +248,3 @@ void IOUringEngine::finish() } #endif } - -void complete_pread(int fd, void *ptr, size_t len, off_t offset) -{ - while (len > 0) { - ssize_t ret = pread(fd, ptr, len, offset); - if (ret == -1 && errno == EINTR) { - continue; - } - if (ret <= 0) { - perror("pread"); - exit(1); - } - ptr = reinterpret_cast(ptr) + ret; - len -= ret; - offset -= ret; - } -}