#include <string.h>
+#ifndef WITHOUT_URING
#include <liburing.h>
+#endif
+#include "io_uring_engine.h"
+
+#include <functional>
+#include <memory>
#include <stdint.h>
#include <unistd.h>
-#include <memory>
-#include <functional>
-
-#include "io_uring_engine.h"
using namespace std;
IOUringEngine::IOUringEngine()
{
+#ifdef WITHOUT_URING
+ int ret = -1;
+#else
int ret = io_uring_queue_init(queue_depth, &ring, 0);
+#endif
using_uring = (ret >= 0);
}
return;
}
+#ifndef WITHOUT_URING
if (pending_reads < queue_depth) {
io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (sqe == nullptr) {
} else {
queued_reads.push(QueuedRead{ fd, len, offset, move(cb) });
}
+#endif
}
+#ifndef WITHOUT_URING
void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string)> cb)
{
void *buf;
io_uring_sqe_set_data(sqe, pending);
++pending_reads;
}
+#endif
void IOUringEngine::finish()
{
return;
}
- int ret = io_uring_submit(&ring);
- if (ret < 0) {
- fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
- exit(1);
- }
- bool anything_to_submit = false;
+#ifndef WITHOUT_URING
+ bool anything_to_submit = true;
while (pending_reads > 0) {
io_uring_cqe *cqe;
- ret = io_uring_wait_cqe(&ring, &cqe);
- if (ret < 0) {
- fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
- exit(1);
- }
-
- PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
- if (cqe->res <= 0) {
- fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
- exit(1);
+ if (io_uring_peek_cqe(&ring, &cqe) != 0) {
+ if (anything_to_submit) {
+ // Nothing ready, so submit whatever is pending and then do a blocking wait.
+ int ret = io_uring_submit_and_wait(&ring, 1);
+ if (ret < 0) {
+ fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
+ exit(1);
+ }
+ anything_to_submit = false;
+ } else {
+ int ret = io_uring_wait_cqe(&ring, &cqe);
+ if (ret < 0) {
+ fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
+ exit(1);
+ }
+ }
}
- if (size_t(cqe->res) < pending->iov.iov_len) {
- // Incomplete read, so resubmit it.
- pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
- pending->iov.iov_len -= cqe->res;
- pending->offset += cqe->res;
- io_uring_cqe_seen(&ring, cqe);
-
- io_uring_sqe *sqe = io_uring_get_sqe(&ring);
- if (sqe == nullptr) {
- fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
+ unsigned head;
+ io_uring_for_each_cqe(&ring, head, cqe) {
+ PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
+ if (cqe->res <= 0) {
+ fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
exit(1);
}
- io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
- io_uring_sqe_set_data(sqe, pending);
- anything_to_submit = true;
- } else {
- io_uring_cqe_seen(&ring, cqe);
- --pending_reads;
-
- size_t old_pending_reads = pending_reads;
- pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
- free(pending->buf);
- delete pending;
-
- if (pending_reads != old_pending_reads) {
- // A new read was made in the callback (and not queued),
- // so we need to re-submit.
+
+ if (size_t(cqe->res) < pending->iov.iov_len) {
+ // Incomplete read, so resubmit it.
+ pending->iov.iov_base = (char *)pending->iov.iov_base + cqe->res;
+ pending->iov.iov_len -= cqe->res;
+ pending->offset += cqe->res;
+ io_uring_cqe_seen(&ring, cqe);
+
+ io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+ if (sqe == nullptr) {
+ fprintf(stderr, "No free SQE for resubmit; this shouldn't happen.\n");
+ exit(1);
+ }
+ io_uring_prep_readv(sqe, pending->fd, &pending->iov, 1, pending->offset);
+ io_uring_sqe_set_data(sqe, pending);
anything_to_submit = true;
+ } else {
+ io_uring_cqe_seen(&ring, cqe);
+ --pending_reads;
+
+ size_t old_pending_reads = pending_reads;
+ pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
+ free(pending->buf);
+ delete pending;
+
+ if (pending_reads != old_pending_reads) {
+ // A new read was made in the callback (and not queued),
+ // so we need to re-submit.
+ anything_to_submit = true;
+ }
}
}
queued_reads.pop();
anything_to_submit = true;
}
-
- if (anything_to_submit) {
- // A new read was made, so we need to re-submit.
- int ret = io_uring_submit(&ring);
- if (ret < 0) {
- fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
- exit(1);
- } else {
- anything_to_submit = false;
- }
- }
}
+#endif
}
void complete_pread(int fd, void *ptr, size_t len, off_t offset)
offset -= ret;
}
}
-
-