commit b383ccb3c8e336849466fe821bec05b3d0421ee0
parent a8cb40960b965934add5bece1c8cfdcfa5f10d82
Author: Drew DeVault <sir@cmpwn.com>
Date: Tue, 19 Oct 2021 11:24:26 +0200
iobus: add CQE callbacks
This also expands the handle type to set aside some additional storage
for us to use, such as the list of callbacks.
Signed-off-by: Drew DeVault <sir@cmpwn.com>
Diffstat:
6 files changed, 71 insertions(+), 21 deletions(-)
diff --git a/iobus/io_uring/bus.ha b/iobus/io_uring/bus.ha
@@ -23,14 +23,23 @@ export fn destroy(bus: *bus) void = {
io_uring::finish(&bus.uring);
};
-// Submits all pending I/O to the host, without blocking.
-export fn submit(bus: *bus) void = {
- io_uring::submit(&bus.uring)!;
+// Registers a set of I/O operations with the queue, without submitting them.
+// Must call [[submit]] later submit them to the host.
+export fn enqueue(bus: *bus, ops: *handle...) void = {
+ for (let i = 0z; i < len(ops); i += 1) {
+ io_uring::set_user(ops[i].sqe, ops[i]);
+ };
};
-// Waits until at least one I/O event completes, then returns the result. The
-// caller must call [[done]] with the result when done with it.
-export fn dispatch(bus: *bus) (result | error) = {
+// Submits all pending I/O to the host, without blocking, optionally enqueing
+// any number of additional operations prior to submission.
+export fn submit(bus: *bus, ops: *handle...) (void | error) = {
+ enqueue(bus, ops...);
+ io_uring::submit(&bus.uring)?;
+ return;
+};
+
+fn _dispatch(bus: *bus) (result | error) = {
match (io_uring::peek(&bus.uring)?) {
case null => void;
case cqe: *io_uring::cqe =>
@@ -44,14 +53,16 @@ export fn dispatch(bus: *bus) (result | error) = {
};
};
-// Returns the [[handle]] which this [[result]] corresponds to.
-export fn handleof(res: result) handle = {
- return io_uring::get_user(res): handle;
-};
-
-// Discards states associated with an [[result]].
-export fn done(bus: *bus, res: result) void = {
- io_uring::cqe_seen(&bus.uring, res);
+// Waits until at least one I/O event completes, then returns the result. The
+// caller must call [[done]] with the result when done with it.
+export fn dispatch(bus: *bus) (result | error) = {
+ const res = _dispatch(bus)?;
+ const handle = handleof(res);
+ for (let i = 0z; i < len(handle.callbacks); i += 1) {
+ const cb = handle.callbacks[i];
+ cb.0(res, cb.1);
+ };
+ return res;
};
// Registers a file with the iobus, returning a [[registered_file]] object to
diff --git a/iobus/io_uring/handle.ha b/iobus/io_uring/handle.ha
@@ -0,0 +1,29 @@
+use linux::io_uring;
+
+// Registers a callback to occur following the completion of an I/O operation.
+// Note that this is generally less efficient than using an internal dispatch in
+// your Hare program based on the handle returned from [[dispatch]].
+export fn callback(
+ op: *handle,
+ cb: *fn(res: result, data: *void) void,
+ data: *void,
+) void = {
+ append(op.callbacks, (cb, data));
+};
+
+// Returns the [[handle]] which this [[result]] corresponds to.
+export fn handleof(res: result) *handle = {
+ match (io_uring::get_user(res)) {
+ case null =>
+ abort("I/O submission was prepared, but not enqueued. Did you pass it to iobus::enqueue or iobus::submit?");
+ case v: *void =>
+ return v: *handle;
+ };
+};
+
+// Discards states associated with an [[result]].
+export fn done(bus: *bus, res: result) void = {
+ const handle = handleof(res);
+ free(handle.callbacks);
+ io_uring::cqe_seen(&bus.uring, res);
+};
diff --git a/iobus/io_uring/ops.ha b/iobus/io_uring/ops.ha
@@ -18,13 +18,15 @@ export fn read(bus: *bus, file: file, buf: []u8) (handle | queuefull) = {
yield file: i32;
};
io_uring::read(sqe, fd, buf: *[*]u8, len(buf), -1, flags);
- io_uring::set_user(sqe, sqe);
- return sqe: handle;
+ return handle {
+ sqe = sqe,
+ ...
+ };
};
// Returns the result of a completed read operation.
export fn endread(res: result) (size | io::EOF | error) = {
- assert(handleof(res).opcode == io_uring::op::READ,
+ assert(handleof(res).sqe.opcode == io_uring::op::READ,
"endread called for non-read iobus::result");
let z = io_uring::result(res)?: size;
if (z == 0) {
@@ -50,13 +52,15 @@ export fn write(bus: *bus, file: file, buf: []u8) (handle | queuefull) = {
yield file: i32;
};
io_uring::write(sqe, fd, buf: *[*]u8, len(buf), 0, flags);
- io_uring::set_user(sqe, sqe);
- return sqe: handle;
+ return handle {
+ sqe = sqe,
+ ...
+ };
};
// Returns the result of a completed write operation.
export fn endwrite(res: result) (size | error) = {
- assert(handleof(res).opcode == io_uring::op::WRITE,
+ assert(handleof(res).sqe.opcode == io_uring::op::WRITE,
"endwrite called for non-write iobus::result");
return io_uring::result(res)?: size;
};
diff --git a/iobus/io_uring/types.ha b/iobus/io_uring/types.ha
@@ -18,7 +18,10 @@ export type bus = struct {
export type result = *io_uring::cqe;
// An I/O operation handle.
-export type handle = *io_uring::sqe;
+export type handle = struct {
+ sqe: *io_uring::sqe,
+ callbacks: [](*fn(res: result, data: *void) void, *void),
+};
export type registered_file = int;
diff --git a/scripts/gen-stdlib b/scripts/gen-stdlib
@@ -575,6 +575,7 @@ io() {
iobus_io_uring() {
gen_srcs iobus::io_uring \
bus.ha \
+ handle.ha \
types.ha \
ops.ha
gen_ssa iobus::io_uring errors io linux::io_uring
diff --git a/stdlib.mk b/stdlib.mk
@@ -844,6 +844,7 @@ $(HARECACHE)/io/io.ssa: $(stdlib_io_srcs) $(stdlib_rt) $(stdlib_strings) $(stdli
# iobus::io_uring
stdlib_iobus_io_uring_srcs= \
$(STDLIB)/iobus/io_uring/bus.ha \
+ $(STDLIB)/iobus/io_uring/handle.ha \
$(STDLIB)/iobus/io_uring/types.ha \
$(STDLIB)/iobus/io_uring/ops.ha
@@ -2117,6 +2118,7 @@ $(TESTCACHE)/io/io.ssa: $(testlib_io_srcs) $(testlib_rt) $(testlib_strings) $(te
# iobus::io_uring
testlib_iobus_io_uring_srcs= \
$(STDLIB)/iobus/io_uring/bus.ha \
+ $(STDLIB)/iobus/io_uring/handle.ha \
$(STDLIB)/iobus/io_uring/types.ha \
$(STDLIB)/iobus/io_uring/ops.ha