commit 0900dbb51f946b46630a740ef701727c9fdee7e2
parent f1c3648a68335cc5a9065f1e3e4d4bb946102547
Author: Drew DeVault <sir@cmpwn.com>
Date: Sat, 15 May 2021 11:19:31 -1000
linux::uring: implement basic submitter
Signed-off-by: Drew DeVault <sir@cmpwn.com>
Diffstat:
6 files changed, 96 insertions(+), 16 deletions(-)
diff --git a/linux/uring/queue.ha b/linux/uring/queue.ha
@@ -6,7 +6,7 @@ use rt;
// Returns the next available [[sqe]] for this [[io_uring]], or null if the
// queue is full.
export fn get_sqe(ring: *io_uring) nullable *sqe = {
- const sq = ring.sq;
+ const sq = &ring.sq;
const head = *sq.khead, next = sq.sqe_tail + 1;
if (next - head <= *sq.kring_entries) {
let sqe = &sq.sqes[sq.sqe_tail & *sq.kring_mask];
@@ -16,14 +16,6 @@ export fn get_sqe(ring: *io_uring) nullable *sqe = {
return null;
};
-// Like [[get_sqe]], but aborts the program if the queue is full.
-export fn must_get_sqe(ring: *io_uring) *sqe = {
- return match (get_sqe(ring)) {
- sq: *sqe => sq,
- null => abort("I/O submission queue full"),
- };
-};
-
fn needs_enter(ring: *io_uring, flags: *enter_flags) bool = {
if (ring.flags & setup_flags::IOPOLL == setup_flags::IOPOLL) {
return true;
@@ -40,6 +32,56 @@ fn needs_enter(ring: *io_uring, flags: *enter_flags) bool = {
fn needs_flush(ring: *io_uring) bool =
*ring.sq.kflags & sqring_flags::CQ_OVERFLOW == sqring_flags::CQ_OVERFLOW;
+// Submits queued I/O asynchronously. Returns the number of submissions accepted
+// by the kernel.
+export fn submit(ring: *io_uring) (uint | errors::opaque) =
+ do_submit(ring, flush_sq(ring), 0u);
+
+// Submits queued I/O asynchronously and blocks until at least "wait" events are
+// complete. If setup_flags::IOPOLL was configured for this ring, the meaning of
+// the "wait" parameter is different: a non-zero value will block until at least
+// one event is completed.
+//
+// Returns the number of submissions accepted by the kernel.
+export fn submit_wait(ring: *io_uring, wait: uint) (uint | errors::opaque) =
+ do_submit(ring, flush_sq(ring), wait);
+
+fn flush_sq(ring: *io_uring) uint = {
+ let sq = &ring.sq;
+ let ktail = *sq.ktail;
+ const mask = *sq.kring_mask;
+
+ if (sq.sqe_head == sq.sqe_tail) {
+ return ktail - *sq.khead;
+ };
+
+ for (let n = sq.sqe_tail - sq.sqe_head; n > 0; n -= 1u) {
+ sq.array[ktail & mask] = sq.sqe_head & mask;
+ ktail += 1u;
+ sq.sqe_head += 1u;
+ };
+
+ *sq.ktail = ktail;
+ return ktail - *sq.khead;
+};
+
+fn do_submit(
+ ring: *io_uring,
+ submitted: uint,
+ wait: uint,
+) (uint | errors::opaque) = {
+ let flags: enter_flags = enter_flags::GETEVENTS;
+ if (needs_enter(ring, &flags) || wait != 0) {
+ return match (rt::io_uring_enter(ring.fd,
+ submitted, wait, flags, null)) {
+ err: rt::errno => errors::errno(err),
+ n: uint => n,
+ };
+ } else {
+ return submitted;
+ };
+};
+
fn peek_cqe(ring: *io_uring) (nullable *cqe, uint) = {
let head = *ring.cq.khead;
let tail = *ring.cq.ktail;
@@ -51,7 +93,7 @@ fn peek_cqe(ring: *io_uring) (nullable *cqe, uint) = {
return (&ring.cq.cqes[head & mask], avail);
};
-fn get_cqe(
+export fn get_cqe(
ring: *io_uring,
submit: uint,
wait: uint,
diff --git a/linux/uring/setup.ha b/linux/uring/setup.ha
@@ -55,7 +55,7 @@ export fn setup(entries: u32, params: *params) (io_uring | error) = {
sq.kring_entries = (ring_ptr + params.sq_off.ring_entries: uintptr): *uint;
sq.kflags = (ring_ptr + params.sq_off.flags: uintptr): *sqring_flags;
sq.kdropped = (ring_ptr + params.sq_off.dropped: uintptr): *uint;
- sq.array = (ring_ptr + params.sq_off.array: uintptr): *uint;
+ sq.array = (ring_ptr + params.sq_off.array: uintptr): *[*]uint;
sq.sqes = match (rt::mmap(null,
params.sq_entries * size(sqe),
rt::PROT_READ | rt::PROT_WRITE,
diff --git a/linux/uring/sqe.ha b/linux/uring/sqe.ha
@@ -1 +1,39 @@
-// TODO
+use types;
+
+fn prep(sq: *sqe, op: op, flags: sqe_flags...) void = {
+ // XXX: Is this compatible with the spec?
+ *sq = sqe { opcode = op, ... };
+ for (let i = 0z; i < len(flags); i += 1) {
+ sq.flags |= flags[i];
+ };
+};
+
+// Prepares a read operation for an [[sqe]].
+export fn read(
+ sqe: *sqe,
+ fd: int,
+ buf: *void,
+ count: size,
+ flags: sqe_flags...,
+) void = {
+ prep(sqe, op::READ, flags...);
+ sqe.fd = fd;
+ sqe.addr = buf: uintptr: u64;
+ assert(count <= types::U32_MAX);
+ sqe.length = count: u32;
+};
+
+// Prepares a write operation for an [[sqe]].
+export fn write(
+ sqe: *sqe,
+ fd: int,
+ buf: *void,
+ count: size,
+ flags: sqe_flags...,
+) void = {
+ prep(sqe, op::WRITE, flags...);
+ sqe.fd = fd;
+ sqe.addr = buf: uintptr: u64;
+ assert(count <= types::U32_MAX);
+ sqe.length = count: u32;
+};
diff --git a/linux/uring/uring.ha b/linux/uring/uring.ha
@@ -315,7 +315,7 @@ export type sq = struct {
kring_entries: *uint,
kflags: *sqring_flags,
kdropped: *uint,
- array: *uint,
+ array: *[*]uint,
sqes: *[*]sqe,
sqe_head: uint,
sqe_tail: uint,
diff --git a/scripts/gen-stdlib b/scripts/gen-stdlib
@@ -523,7 +523,7 @@ linux_uring() {
setup.ha \
sqe.ha \
uring.ha
- gen_ssa linux::uring errors
+ gen_ssa linux::uring errors types
}
linux_vdso() {
diff --git a/stdlib.mk b/stdlib.mk
@@ -703,7 +703,7 @@ stdlib_linux_uring_srcs= \
$(STDLIB)/linux/uring/sqe.ha \
$(STDLIB)/linux/uring/uring.ha
-$(HARECACHE)/linux/uring/linux_uring.ssa: $(stdlib_linux_uring_srcs) $(stdlib_rt) $(stdlib_errors)
+$(HARECACHE)/linux/uring/linux_uring.ssa: $(stdlib_linux_uring_srcs) $(stdlib_rt) $(stdlib_errors) $(stdlib_types)
@printf 'HAREC \t$@\n'
@mkdir -p $(HARECACHE)/linux/uring
@HARECACHE=$(HARECACHE) $(HAREC) $(HAREFLAGS) -o $@ -Nlinux::uring \
@@ -1695,7 +1695,7 @@ testlib_linux_uring_srcs= \
$(STDLIB)/linux/uring/sqe.ha \
$(STDLIB)/linux/uring/uring.ha
-$(TESTCACHE)/linux/uring/linux_uring.ssa: $(testlib_linux_uring_srcs) $(testlib_rt) $(testlib_errors)
+$(TESTCACHE)/linux/uring/linux_uring.ssa: $(testlib_linux_uring_srcs) $(testlib_rt) $(testlib_errors) $(testlib_types)
@printf 'HAREC \t$@\n'
@mkdir -p $(TESTCACHE)/linux/uring
@HARECACHE=$(TESTCACHE) $(HAREC) $(TESTHAREFLAGS) -o $@ -Nlinux::uring \