X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=io_uring_engine.cpp;h=fc51abab6a75e23143235f555e8edec82547529b;hb=8c2030fd894ac1539a46a976fe0e551b143a6140;hp=b10639df526d49e306f7a499f42917a5010e0d10;hpb=009ba1838c9185844acd34458ef863828b8ab143;p=plocate diff --git a/io_uring_engine.cpp b/io_uring_engine.cpp index b10639d..fc51aba 100644 --- a/io_uring_engine.cpp +++ b/io_uring_engine.cpp @@ -1,31 +1,43 @@ +#include +#include +#include #include +#ifndef WITHOUT_URING #include -#include -#include -#include -#include - +#endif #include "io_uring_engine.h" +#include +#include +#include +#include +#include + using namespace std; -IOUringEngine::IOUringEngine() +IOUringEngine::IOUringEngine(size_t slop_bytes) + : slop_bytes(slop_bytes) { +#ifdef WITHOUT_URING + int ret = -1; +#else int ret = io_uring_queue_init(queue_depth, &ring, 0); +#endif using_uring = (ret >= 0); } -void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function cb) +void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function cb) { if (!using_uring) { // Synchronous read. string s; - s.resize(len); + s.resize(len + slop_bytes); complete_pread(fd, &s[0], len, offset); - cb(move(s)); + cb(string_view(s.data(), len)); return; } +#ifndef WITHOUT_URING if (pending_reads < queue_depth) { io_uring_sqe *sqe = io_uring_get_sqe(&ring); if (sqe == nullptr) { @@ -36,12 +48,14 @@ void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function cb) +#ifndef WITHOUT_URING +void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function cb) { void *buf; - if (posix_memalign(&buf, /*alignment=*/4096, len)) { + if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) { fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno)); exit(1); } @@ -51,6 +65,7 @@ void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, io_uring_sqe_set_data(sqe, pending); ++pending_reads; } +#endif void IOUringEngine::finish() { @@ -58,54 +73,66 @@ void IOUringEngine::finish() return; } - int ret = io_uring_submit(&ring); - if (ret < 0) { - fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret)); - exit(1); - } - bool anything_to_submit = false; +#ifndef WITHOUT_URING + bool anything_to_submit = true; while (pending_reads > 0) { io_uring_cqe *cqe; - ret = io_uring_wait_cqe(&ring, &cqe); - if (ret < 0) { - fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret)); - exit(1); - } - - PendingRead *pending = reinterpret_cast(cqe->user_data); - if (cqe->res <= 0) { - fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res)); - exit(1); + if (io_uring_peek_cqe(&ring, &cqe) != 0) { + if (anything_to_submit) { + // Nothing ready, so submit whatever is pending and then do a blocking wait. + int ret = io_uring_submit_and_wait(&ring, 1); + if (ret < 0) { + fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret)); + exit(1); + } + anything_to_submit = false; + } else { + int ret = io_uring_wait_cqe(&ring, &cqe); + if (ret < 0) { + fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret)); + exit(1); + } + } } - if (size_t(cqe->res) < pending->iov.iov_len) { - // Incomplete read, so resubmit it. - pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res; - pending->iov.iov_len -= cqe->res; - pending->offset += cqe->res; - io_uring_cqe_seen(&ring, cqe); - - io_uring_sqe *sqe = io_uring_get_sqe(&ring); - if (sqe == nullptr) { - fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n"); + unsigned head; + io_uring_for_each_cqe(&ring, head, cqe) + { + PendingRead *pending = reinterpret_cast(cqe->user_data); + if (cqe->res <= 0) { + fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res)); exit(1); } - io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset); - io_uring_sqe_set_data(sqe, pending); - anything_to_submit = true; - } else { - io_uring_cqe_seen(&ring, cqe); - --pending_reads; - - size_t old_pending_reads = pending_reads; - pending->cb(string(reinterpret_cast(pending->buf), pending->len)); - free(pending->buf); - delete pending; - - if (pending_reads != old_pending_reads) { - // A new read was made in the callback (and not queued), - // so we need to re-submit. + + if (size_t(cqe->res) < pending->iov.iov_len) { + // Incomplete read, so resubmit it. + pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res; + pending->iov.iov_len -= cqe->res; + pending->offset += cqe->res; + io_uring_cqe_seen(&ring, cqe); + + io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (sqe == nullptr) { + fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n"); + exit(1); + } + io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset); + io_uring_sqe_set_data(sqe, pending); anything_to_submit = true; + } else { + io_uring_cqe_seen(&ring, cqe); + --pending_reads; + + size_t old_pending_reads = pending_reads; + pending->cb(string_view(reinterpret_cast(pending->buf), pending->len)); + free(pending->buf); + delete pending; + + if (pending_reads != old_pending_reads) { + // A new read was made in the callback (and not queued), + // so we need to re-submit. + anything_to_submit = true; + } } } @@ -121,18 +148,8 @@ void IOUringEngine::finish() queued_reads.pop(); anything_to_submit = true; } - - if (anything_to_submit) { - // A new read was made, so we need to re-submit. - int ret = io_uring_submit(&ring); - if (ret < 0) { - fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret)); - exit(1); - } else { - anything_to_submit = false; - } - } } +#endif } void complete_pread(int fd, void *ptr, size_t len, off_t offset) @@ -151,5 +168,3 @@ void complete_pread(int fd, void *ptr, size_t len, off_t offset) offset -= ret; } } - -