+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
#ifndef WITHOUT_URING
#include <liburing.h>
#include "io_uring_engine.h"
#include <functional>
-#include <memory>
-#include <stdint.h>
+#include <iosfwd>
+#include <string>
#include <unistd.h>
+#include <utility>
using namespace std;
-IOUringEngine::IOUringEngine()
+IOUringEngine::IOUringEngine(size_t slop_bytes)
+ : slop_bytes(slop_bytes)
{
#ifdef WITHOUT_URING
int ret = -1;
using_uring = (ret >= 0);
}
-void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string)> cb)
+void IOUringEngine::submit_read(int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
if (!using_uring) {
// Synchronous read.
string s;
- s.resize(len);
+ s.resize(len + slop_bytes);
complete_pread(fd, &s[0], len, offset);
- cb(move(s));
+ cb(string_view(s.data(), len));
return;
}
}
#ifndef WITHOUT_URING
-void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string)> cb)
+void IOUringEngine::submit_read_internal(io_uring_sqe *sqe, int fd, size_t len, off_t offset, function<void(string_view)> cb)
{
void *buf;
- if (posix_memalign(&buf, /*alignment=*/4096, len)) {
+ if (posix_memalign(&buf, /*alignment=*/4096, len + slop_bytes)) {
fprintf(stderr, "Couldn't allocate %zu bytes: %s\n", len, strerror(errno));
exit(1);
}
#ifndef WITHOUT_URING
bool anything_to_submit = true;
while (pending_reads > 0) {
- io_uring_cqe *cqes[queue_depth];
- int num_sqes = io_uring_peek_batch_cqe(&ring, cqes, queue_depth);
- if (num_sqes == 0) {
+ io_uring_cqe *cqe;
+ if (io_uring_peek_cqe(&ring, &cqe) != 0) {
if (anything_to_submit) {
// Nothing ready, so submit whatever is pending and then do a blocking wait.
- int ret = io_uring_submit(&ring);
+ int ret = io_uring_submit_and_wait(&ring, 1);
if (ret < 0) {
fprintf(stderr, "io_uring_submit(queued): %s\n", strerror(-ret));
exit(1);
}
anything_to_submit = false;
+ } else {
+ int ret = io_uring_wait_cqe(&ring, &cqe);
+ if (ret < 0) {
+ fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
+ exit(1);
+ }
}
- int ret = io_uring_wait_cqe(&ring, &cqes[0]);
- if (ret < 0) {
- fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
- exit(1);
- }
- num_sqes = 1;
}
- for (int sqe_idx = 0; sqe_idx < num_sqes; ++sqe_idx) {
- io_uring_cqe *cqe = cqes[sqe_idx];
+ unsigned head;
+ io_uring_for_each_cqe(&ring, head, cqe)
+ {
PendingRead *pending = reinterpret_cast<PendingRead *>(cqe->user_data);
if (cqe->res <= 0) {
fprintf(stderr, "async read failed: %s\n", strerror(-cqe->res));
--pending_reads;
size_t old_pending_reads = pending_reads;
- pending->cb(string(reinterpret_cast<char *>(pending->buf), pending->len));
+ pending->cb(string_view(reinterpret_cast<char *>(pending->buf), pending->len));
free(pending->buf);
delete pending;
anything_to_submit = true;
}
}
+ }
- // See if there are any queued reads we can submit now.
- while (!queued_reads.empty() && pending_reads < queue_depth) {
- io_uring_sqe *sqe = io_uring_get_sqe(&ring);
- if (sqe == nullptr) {
- fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
- exit(1);
- }
- QueuedRead &qr = queued_reads.front();
- submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
- queued_reads.pop();
- anything_to_submit = true;
+ // See if there are any queued reads we can submit now.
+ while (!queued_reads.empty() && pending_reads < queue_depth) {
+ io_uring_sqe *sqe = io_uring_get_sqe(&ring);
+ if (sqe == nullptr) {
+ fprintf(stderr, "io_uring_get_sqe: %s\n", strerror(errno));
+ exit(1);
}
+ QueuedRead &qr = queued_reads.front();
+ submit_read_internal(sqe, qr.fd, qr.len, qr.offset, move(qr.cb));
+ queued_reads.pop();
+ anything_to_submit = true;
}
}
#endif