X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=io_uring_engine.cpp;h=03160b12a4abb1f04e83a84acfcf84d7589ad1f1;hb=99a0a43a777176583f3525e11ef62185dd2be740;hp=cadc6c33a79abeb12d2f63387b2f1fa6037b25f2;hpb=48be942bea5a1fabb09e9a0333c6dfb0ac7f2ebc;p=plocate diff --git a/io_uring_engine.cpp b/io_uring_engine.cpp index cadc6c3..03160b1 100644 --- a/io_uring_engine.cpp +++ b/io_uring_engine.cpp @@ -1,34 +1,44 @@ +#include +#include +#include #include #ifndef WITHOUT_URING #include #endif +#include "dprintf.h" #include "io_uring_engine.h" #include -#include -#include +#include +#include #include +#include using namespace std; -IOUringEngine::IOUringEngine() +IOUringEngine::IOUringEngine(size_t slop_bytes) + : slop_bytes(slop_bytes) { #ifdef WITHOUT_URING int ret = -1; + dprintf("Compiled without liburing support; not using io_uring.\n"); #else int ret = io_uring_queue_init(queue_depth, &ring, 0); + if (ret < 0) { + dprintf("io_uring_queue_init() failed; not using io_uring.\n"); + } #endif using_uring = (ret >= 0); } -void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function cb) +void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function cb) { if (!using_uring) { // Synchronous read. string s; - s.resize(len); + s.resize(len + slop_bytes); complete_pread(fd, &s[0], len, offset); - cb(move(s)); + cb(string_view(s.data(), len)); return; } @@ -47,10 +57,10 @@ void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function cb) +void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function cb) { void *buf; - if (posix_memalign(&buf, /*alignment=*/4096, len)) { + if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) { fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno)); exit(1); } @@ -71,28 +81,28 @@ void IOUringEngine::finish() #ifndef WITHOUT_URING bool anything_to_submit = true; while (pending_reads > 0) { - io_uring_cqe *cqes[queue_depth]; - int num_sqes = io_uring_peek_batch_cqe(&ring, cqes, queue_depth); - if (num_sqes == 0) { + io_uring_cqe *cqe; + if (io_uring_peek_cqe(&ring, &cqe) != 0) { if (anything_to_submit) { // Nothing ready, so submit whatever is pending and then do a blocking wait. - int ret = io_uring_submit(&ring); + int ret = io_uring_submit_and_wait(&ring, 1); if (ret < 0) { fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret)); exit(1); } anything_to_submit = false; + } else { + int ret = io_uring_wait_cqe(&ring, &cqe); + if (ret < 0) { + fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret)); + exit(1); + } } - int ret = io_uring_wait_cqe(&ring, &cqes[0]); - if (ret < 0) { - fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret)); - exit(1); - } - num_sqes = 1; } - for (int sqe_idx = 0; sqe_idx < num_sqes; ++sqe_idx) { - io_uring_cqe *cqe = cqes[sqe_idx]; + unsigned head; + io_uring_for_each_cqe(&ring, head, cqe) + { PendingRead *pending = reinterpret_cast(cqe->user_data); if (cqe->res <= 0) { fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res)); @@ -119,7 +129,7 @@ void IOUringEngine::finish() --pending_reads; size_t old_pending_reads = pending_reads; - pending->cb(string(reinterpret_cast(pending->buf), pending->len)); + pending->cb(string_view(reinterpret_cast(pending->buf), pending->len)); free(pending->buf); delete pending; @@ -129,19 +139,19 @@ void IOUringEngine::finish() anything_to_submit = true; } } + } - // See if there are any queued reads we can submit now. - while (!queued_reads.empty() && pending_reads < queue_depth) { - io_uring_sqe *sqe = io_uring_get_sqe(&ring); - if (sqe == nullptr) { - fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno)); - exit(1); - } - QueuedRead &qr = queued_reads.front(); - submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb)); - queued_reads.pop(); - anything_to_submit = true; + // See if there are any queued reads we can submit now. + while (!queued_reads.empty() && pending_reads < queue_depth) { + io_uring_sqe *sqe = io_uring_get_sqe(&ring); + if (sqe == nullptr) { + fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno)); + exit(1); } + QueuedRead &qr = queued_reads.front(); + submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb)); + queued_reads.pop(); + anything_to_submit = true; } } #endif