commit 78a14ed9397f5fb8ae48cc26695e7610edb993ff
parent 51f2cd3e2644534f8a9dc5b135ebed123dd4fad4
Author: Drew DeVault <sir@cmpwn.com>
Date: Thu, 21 Oct 2021 10:33:48 +0200
iobus: buffer pool management
Signed-off-by: Drew DeVault <sir@cmpwn.com>
Diffstat:
8 files changed, 107 insertions(+), 19 deletions(-)
diff --git a/iobus/io_uring/bus.ha b/iobus/io_uring/bus.ha
@@ -5,6 +5,10 @@ use rt;
def DEFAULT_RING_SIZE: u32 = 256;
+// This global is used for the io_uring SQE user data field to mark SQEs which
+// are used internally by iobus.
+const internal_sqe: int = 0;
+
// Creates a new I/O bus.
export fn new() (*bus | error) = {
let params = io_uring::params { ... };
@@ -53,13 +57,23 @@ export fn submit(bus: *bus, ops: *handle...) (void | error) = {
// Waits until at least one I/O event completes, then returns the result. The
// caller must call [[done]] with the result when done with it.
export fn dispatch(bus: *bus) (result | error) = {
- const res = _dispatch(bus)?;
- const handle = handleof(res);
- for (let i = 0z; i < len(handle.callbacks); i += 1) {
- const cb = handle.callbacks[i];
- cb.0(res, cb.1);
+ for (true) {
+ const res = _dispatch(bus)?;
+ // XXX: Internal SQEs may have to be more sophisticated in the
+ // future, we'll see
+ if (io_uring::get_user(res) == &internal_sqe) {
+ io_uring::result(res)!;
+ done(bus, res);
+ continue;
+ };
+ const handle = handleof(res);
+ for (let i = 0z; i < len(handle.callbacks); i += 1) {
+ const cb = handle.callbacks[i];
+ cb.0(res, cb.1);
+ };
+ return res;
};
- return res;
+ abort();
};
fn _dispatch(bus: *bus) (result | error) = {
diff --git a/iobus/io_uring/pool.ha b/iobus/io_uring/pool.ha
@@ -0,0 +1,65 @@
+use linux::io_uring;
+use fmt;
+
+export type pool = struct {
+ bus: *bus,
+ buffer: []u8,
+ bufsz: size,
+ nbuf: size,
+ group: u16,
+};
+
+export type buffer = u16;
+
+// Creates a new buffer pool for use with read operations. The buffer argument
+// must be a slice whose length is divisible by bufsz, thus providing a buffer
+// pool of N buffers where N is len(buffer) / bufsz.
+export fn newpool(
+ bus: *bus,
+ buffer: []u8,
+ bufsz: size,
+) (pool | queuefull) = {
+ // XXX: If there's an error with the CQE, it will cause an assertion
+ // failure later, which is lame as hell. We should change this to return
+ // the handle and let the user run it through their bus normally.
+ assert(len(buffer) % bufsz == 0, "Invalid buffer pool size");
+ const nbuf = len(buffer) / bufsz;
+ const group = bus.nextgroup;
+ bus.nextgroup += 1;
+
+ let sqe = getsqe(bus)?;
+ io_uring::provide_buffers(sqe, group, buffer, nbuf, bufsz, 0);
+ io_uring::set_user(sqe, &internal_sqe);
+ return pool {
+ bus = bus,
+ buffer = buffer,
+ bufsz = bufsz,
+ nbuf = nbuf,
+ group = group,
+ };
+};
+
+// Configures an operation to use a [[pool]].
+export fn setpool(handle: *handle, pool: pool) void = {
+ io_uring::set_buffer_select(handle.sqe, pool.group);
+ // XXX: Is this always right?
+ handle.sqe.length = pool.bufsz: u32;
+};
+
+// Returns the buffer used by a [[handle]] configured with [[setpool]]. The
+// caller should provide the buffer from the return value to [[release]] when
+// done to return it to the host for future requests. After calling [[release]],
+// the slice reference is no longer valid.
+export fn getbuffer(pool: *pool, result: result) (buffer, []u8) = {
+ let buf: buffer = io_uring::get_buffer_id(result);
+ return (buf, pool.buffer[(buf * pool.bufsz)..((buf + 1) * pool.bufsz)]);
+};
+
+// Returns a buffer from [[getbuffer]] back to the buffer pool.
+export fn release(pool: *pool, buf: buffer) (void | queuefull) = {
+ let sqe = getsqe(pool.bus)?;
+ io_uring::provide_buffers(sqe, pool.group,
+ pool.buffer[(buf * pool.bufsz)..((buf + 1) * pool.bufsz)],
+ 1, pool.bufsz, buf);
+ io_uring::set_user(sqe, &internal_sqe);
+};
diff --git a/iobus/io_uring/types.ha b/iobus/io_uring/types.ha
@@ -17,6 +17,7 @@ export type bus = struct {
uring: io_uring::io_uring,
fdset: []io::file,
lastfd: size,
+ nextgroup: u16,
};
export type result = *io_uring::cqe;
diff --git a/linux/io_uring/cqe.ha b/linux/io_uring/cqe.ha
@@ -41,6 +41,7 @@ export fn get_user(cqe: *cqe) nullable *void =
// [[set_buffer_select]]. Aborts the program if this CQE was not configured to
// use a buffer pool.
export fn get_buffer_id(cqe: *cqe) u16 = {
+ // TODO: Handle ENOBUFS
assert(cqe.flags & cqe_flags::F_BUFFER > 0,
"get_buffer_id called for CQE without buffer");
return (cqe.flags: u32 >> CQE_BUFFER_SHIFT): u16;
diff --git a/linux/io_uring/sqe.ha b/linux/io_uring/sqe.ha
@@ -1,6 +1,7 @@
use endian;
use rt;
use types;
+use fmt;
fn prep(sq: *sqe, op: op, flags: flags...) void = {
rt::memset(sq, 0, size(sqe));
@@ -38,7 +39,7 @@ export fn set_user(sqe: *sqe, user_data: *void) void = {
// retrieve the buffer used from the corresponding [[cqe]].
export fn set_buffer_select(sqe: *sqe, group: u16) void = {
sqe.flags |= flags::BUFFER_SELECT;
- sqe.buf_group = group;
+ sqe.extras.buffers.buf_group = group;
};
// Prepares a no-op "operation" for an [[sqe]].
@@ -109,7 +110,7 @@ export fn read_fixed(
) void = {
assert(count <= types::U32_MAX);
preprw(sqe, op::READ_FIXED, fd, buf, count: u32, 0, flags...);
- sqe.buf_index = index;
+ sqe.extras.buffers.buf_index = index;
};
// Prepares a write for a fixed buffer previously registered with
@@ -125,7 +126,7 @@ export fn write_fixed(
) void = {
assert(count <= types::U32_MAX);
preprw(sqe, op::WRITE_FIXED, fd, buf, count: u32, 0, flags...);
- sqe.buf_index = index;
+ sqe.extras.buffers.buf_index = index;
};
// Prepares an fsync operation for an [[sqe]]. Note that operations are executed
@@ -327,7 +328,7 @@ export fn provide_buffers(
assert(len(pool) == nbuf * bufsz);
preprw(sqe, op::PROVIDE_BUFFERS, nbuf: int, pool: *[*]u8,
bufsz: uint, bufid: uint, flags...);
- sqe.buf_group = group;
+ sqe.extras.buffers.buf_group = group;
};
// Removes buffers previously registered with [[provide_buffers]].
@@ -338,5 +339,5 @@ export fn remove_buffers(
flags: flags...
) void = {
preprw(sqe, op::REMOVE_BUFFERS, nbuf: int, null, 0, 0, flags...);
- sqe.buf_group = group;
+ sqe.extras.buffers.buf_group = group;
};
diff --git a/linux/io_uring/uring.ha b/linux/io_uring/uring.ha
@@ -108,6 +108,7 @@ export type splice_flags = enum u32 {
export type cqe_flags = enum u32 {
NONE = 0,
F_BUFFER = 1 << 0,
+ F_MORE = 1 << 1,
};
// A submission queue entry.
@@ -141,9 +142,11 @@ export type sqe = struct {
splice_flags: splice_flags,
},
user_data: u64,
+ // TODO: Remove the names on these embedded types
+ // See https://todo.sr.ht/~sircmpwn/hare/493
union {
- struct {
- union {
+ extras: struct {
+ buffers: union {
buf_index: u16,
buf_group: u16,
},
diff --git a/scripts/gen-stdlib b/scripts/gen-stdlib
@@ -576,8 +576,9 @@ iobus_io_uring() {
gen_srcs iobus::io_uring \
bus.ha \
handle.ha \
- types.ha \
- ops.ha
+ ops.ha \
+ pool.ha \
+ types.ha
gen_ssa iobus::io_uring errors io linux::io_uring net::ip unix::poll
}
diff --git a/stdlib.mk b/stdlib.mk
@@ -845,8 +845,9 @@ $(HARECACHE)/io/io.ssa: $(stdlib_io_srcs) $(stdlib_rt) $(stdlib_strings) $(stdli
stdlib_iobus_io_uring_srcs= \
$(STDLIB)/iobus/io_uring/bus.ha \
$(STDLIB)/iobus/io_uring/handle.ha \
- $(STDLIB)/iobus/io_uring/types.ha \
- $(STDLIB)/iobus/io_uring/ops.ha
+ $(STDLIB)/iobus/io_uring/ops.ha \
+ $(STDLIB)/iobus/io_uring/pool.ha \
+ $(STDLIB)/iobus/io_uring/types.ha
$(HARECACHE)/iobus/io_uring/iobus_io_uring.ssa: $(stdlib_iobus_io_uring_srcs) $(stdlib_rt) $(stdlib_errors) $(stdlib_io) $(stdlib_linux_io_uring) $(stdlib_net_ip) $(stdlib_unix_poll)
@printf 'HAREC \t$@\n'
@@ -2119,8 +2120,9 @@ $(TESTCACHE)/io/io.ssa: $(testlib_io_srcs) $(testlib_rt) $(testlib_strings) $(te
testlib_iobus_io_uring_srcs= \
$(STDLIB)/iobus/io_uring/bus.ha \
$(STDLIB)/iobus/io_uring/handle.ha \
- $(STDLIB)/iobus/io_uring/types.ha \
- $(STDLIB)/iobus/io_uring/ops.ha
+ $(STDLIB)/iobus/io_uring/ops.ha \
+ $(STDLIB)/iobus/io_uring/pool.ha \
+ $(STDLIB)/iobus/io_uring/types.ha
$(TESTCACHE)/iobus/io_uring/iobus_io_uring.ssa: $(testlib_iobus_io_uring_srcs) $(testlib_rt) $(testlib_errors) $(testlib_io) $(testlib_linux_io_uring) $(testlib_net_ip) $(testlib_unix_poll)
@printf 'HAREC \t$@\n'