hare

The Hare programming language
git clone https://git.torresjrjr.com/hare.git
Log | Files | Refs | README | LICENSE

commit 8e70cbf8cfa6051866bb354b2875d99ff23897e0
parent 345522f00a4fbca5485d10437d2f105dc83dc7a4
Author: Drew DeVault <sir@cmpwn.com>
Date:   Mon, 18 Apr 2022 14:38:51 +0200

iobus: remove module

Moved to https://git.sr.ht/~sircmpwn/hare-iobus

Signed-off-by: Drew DeVault <sir@cmpwn.com>

Diffstat:
Dcmd/iobus/main.ha | 85-------------------------------------------------------------------------------
Mio/README | 3+--
Diobus/+linux | 2--
Diobus/README | 35-----------------------------------
Diobus/io_uring/bus.ha | 143-------------------------------------------------------------------------------
Diobus/io_uring/handle.ha | 72------------------------------------------------------------------------
Diobus/io_uring/ops.ha | 352-------------------------------------------------------------------------------
Diobus/io_uring/pool.ha | 86-------------------------------------------------------------------------------
Diobus/io_uring/types.ha | 76----------------------------------------------------------------------------
Mlinux/io_uring/README | 3---
Mscripts/gen-docs | 2+-
Mscripts/gen-stdlib | 11-----------
Mscripts/install-mods | 1-
Mstdlib.mk | 36------------------------------------
14 files changed, 2 insertions(+), 905 deletions(-)

diff --git a/cmd/iobus/main.ha b/cmd/iobus/main.ha @@ -1,85 +0,0 @@ -// License: GPL-3.0 -// (c) 2021 Drew DeVault <sir@cmpwn.com> -// (c) 2021 Eyal Sawady <ecs@d2evs.net> -use bufio; -use fmt; -use io; -use iobus; -use os; -use strings; -use unix::poll; - -type state = struct { - mainbus: *iobus::bus, - userbus: *iobus::bus, -}; - -export fn main() void = { - let mainbus = iobus::new()!; - defer iobus::destroy(mainbus); - let userbus = iobus::new()!; - defer iobus::destroy(userbus); - - const userfd = iobus::busfile(userbus); - defer io::close(userfd); - - const in = iobus::register_file(mainbus, os::stdin_file)!; - // XXX: io_uring has a somewhat arbitrary limitation on registering - // another uring as a file, so we can't do this. - //const userfd = iobus::register_file(mainbus, userfd)!; - - let pollin = iobus::poll(mainbus, in, poll::event::POLLIN)!; - let polluser = iobus::poll(mainbus, userfd, poll::event::POLLIN)!; - - iobus::enqueue(mainbus, &pollin, &polluser); - fmt::error("> ")!; - - const write = iobus::write(userbus, os::stdout_file, - strings::toutf8("hi!\n"), 0)!; - iobus::submit(userbus, &write)!; - - let state = state { - mainbus = mainbus, - userbus = userbus, - }; - - for (true) { - const res = iobus::dispatch(mainbus)!; - defer iobus::done(mainbus, res); - - if (iobus::handleof(res) == &pollin) { - iobus::endpoll(mainbus, res)!; - if (!readcmd(&state)) { - fmt::errorln("exit")!; - break; - }; - pollin = iobus::poll(mainbus, in, poll::event::POLLIN)!; - iobus::enqueue(mainbus, &pollin); - }; - if (iobus::handleof(res) == &polluser) { - iobus::endpoll(mainbus, res)!; - polluser = iobus::poll(mainbus, userfd, poll::event::POLLIN)!; - iobus::enqueue(mainbus, &polluser); - - // TODO: Something more sophisticated than this - const res = iobus::dispatch(userbus)!; - iobus::done(userbus, res); - fmt::errorln("\rcompletion event received")!; - fmt::error("> ")!; - }; - }; -}; - -fn readcmd(state: *state) bool = { - const line = match (bufio::scanline(os::stdin)!) { - case let line: []u8 => - yield strings::fromutf8(line); - case io::EOF => - return false; - }; - defer free(line); - - fmt::errorfln("{}", line)!; - fmt::error("> ")!; - return true; -}; diff --git a/io/README b/io/README @@ -11,8 +11,7 @@ unless they specifically require the special semantics of one of its subtypes. The [[file]] type provides access to an object, usually a file descriptor, which is provided by the host operating system. It represents objects such as a file on disk, an open network connection, and so on. The use of [[file]] is generally -required when working with host I/O, such as for modules like [[iobus]] or -[[unix::poll]]. +required when working with host I/O, such as with [[unix::poll]]. The [[stream]] type is an abstraction that allows Hare programs to implement their own I/O objects by providing implementations of [[read]], [[write]], and diff --git a/iobus/+linux b/iobus/+linux @@ -1 +0,0 @@ -io_uring -\ No newline at end of file diff --git a/iobus/README b/iobus/README @@ -1,35 +0,0 @@ -The iobus module provides a multiplexer for asynchronous I/O operations. - -A [[bus]] object manages the submission of I/O operations to the host operating -system and maintains the state associated with these. The implementation is -abstracted over various backend implementations which are selected from based on -the host platform, such as io_uring or kqueue. - -The core of iobus is the event loop. The application using iobus will generally -follow these steps: - -- Create an iobus with [[new]], and optionally register a set of [[io::file]]s or buffers with the bus using [[register_file]] or [[register_buffer]]. -- Create I/O handles (e.g. via [[read]]) and [[enqueue]] or [[submit]] them. -- Dispatch the bus with [[dispatch]], blocking until at least one operation is complete and returning its [[result]]. -- Process the results of the completed I/O operation (e.g. with [[endread]]), mark it as [[done]], then re-enter the event loop. - -The event loop will resemble the following: - - let bus = iobus::new(); - defer iobus::destroy(bus); - - // Prepare and submit/enqueue I/O handles as necessary - - for (true) { - const res = iobus::dispatch(); - // Dispatch the result back to the subsystem which created the - // handle, or allow callbacks to process it. - iobus::done(res); - }; - -It is the user's responsibility to ensure that each [[handle]]'s lifetime lasts -for the full duration of the I/O operation. The return value from [[read]] et al -is stack-allocated, and you may wish to pass this to "alloc" (freeing it later, -perhaps in the [[callback]] via [[handleof]]) to persist it. Alternative memory -management strategies may be more suited to your needs, this is left at the -programmer's discretion. diff --git a/iobus/io_uring/bus.ha b/iobus/io_uring/bus.ha @@ -1,143 +0,0 @@ -// License: MPL-2.0 -// (c) 2021-2022 Alexey Yerin <yyp@disroot.org> -// (c) 2022 Bor Grošelj Simić <bor.groseljsimic@telemach.net> -// (c) 2021 Drew DeVault <sir@cmpwn.com> -// (c) 2021 Eyal Sawady <ecs@d2evs.net> -use errors; -use io; -use linux::io_uring; -use rt; - -def DEFAULT_RING_SIZE: u32 = 256; - -// This global is used for the io_uring SQE user data field to mark SQEs which -// are used internally by iobus. -const internal_sqe: int = 0; - -// Creates a new I/O bus. -export fn new() (*bus | error) = { - let params = io_uring::params { ... }; - match (io_uring::setup(DEFAULT_RING_SIZE, &params)) { - case let err: io_uring::error => - return err: error; - case let ring: io_uring::io_uring => - return alloc(bus { - uring = ring, - ... - }); - }; -}; - -// Destroys an I/O bus. -export fn destroy(bus: *bus) void = { - io_uring::finish(&bus.uring); - free(bus); -}; - -// Returns an [[io::file]] for the bus itself. -export fn busfile(bus: *bus) io::file = { - return bus.uring.fd; -}; - -// Registers a set of I/O operations with the queue, without submitting them. -// Must call [[submit]] later submit them to the host. -export fn enqueue(bus: *bus, ops: *handle...) void = { - for (let i = 0z; i < len(ops); i += 1) { - switch (ops[i].sqe.opcode) { - case io_uring::op::ACCEPT => - ops[i].sqe.addr = &ops[i].accept.sockaddr; - ops[i].sqe.addr2 = &ops[i].accept.sockaddr_sz; - case => void; - }; - io_uring::set_user(ops[i].sqe, ops[i]); - }; -}; - -// Submits all pending I/O to the host, without blocking, optionally enqueing -// any number of additional operations prior to submission. -export fn submit(bus: *bus, ops: *handle...) (void | error) = { - enqueue(bus, ops...); - io_uring::submit(&bus.uring)?; - return; -}; - -// Waits until at least one I/O event completes, then returns the result. The -// caller must call [[done]] with the result when done with it. -export fn dispatch(bus: *bus) (result | error) = { - for (true) { - const res = _dispatch(bus)?; - // XXX: Internal SQEs may have to be more sophisticated in the - // future, we'll see - if (io_uring::get_user(res) == &internal_sqe) { - io_uring::result(res)!; // Assert success - io_uring::cqe_seen(&bus.uring, res); - continue; - }; - const handle = handleof(res); - for (let i = 0z; i < len(handle.callbacks); i += 1) { - const cb = handle.callbacks[i]; - cb.0(res, cb.1); - }; - return res; - }; - abort(); -}; - -fn _dispatch(bus: *bus) (result | error) = { - match (io_uring::peek(&bus.uring)?) { - case null => void; - case let cqe: *io_uring::cqe => - return cqe: result; - }; - match (io_uring::submit_wait(&bus.uring, 1)) { - case let err: io_uring::error => - return err: error; - case uint => void; - }; - return io_uring::peek(&bus.uring)? as *io_uring::cqe; -}; - -// Registers a file with the iobus, returning a [[registered_file]] object to -// use for I/O operations. It is not necessary to register files to use them for -// I/O, but it improves performance if you do. -export fn register_file(bus: *bus, file: io::file) (registered_file | error) = { - let registered: int = if (bus.lastfd >= len(bus.fdset)) { - static const init: [256]io::file= [-1...]; - append(bus.fdset, init...); - bus.fdset[bus.lastfd] = file; - io_uring::register_files(&bus.uring, bus.fdset)?; - yield bus.lastfd: int; - } else { - let updates = [ - io_uring::files_update { - offs = bus.lastfd: u32, - fds = &bus.fdset[bus.lastfd], - ... - }, - ]; - bus.fdset[bus.lastfd] = file; - io_uring::register_files_update(&bus.uring, updates)?; - yield bus.lastfd: int; - }; - for (bus.fdset[bus.lastfd] != -1; bus.lastfd += 1) void; - return registered; -}; - -// Unregisters a file previously registered with [[register_file]]. -export fn unregister_file(bus: *bus, file: file) void = { - const reg = match (file) { - case let reg: registered_file => - yield reg; - case => abort("Cannot unregister non-registered file"); - }; - let updates = [ - io_uring::files_update { - offs = reg: u32, - fds = &bus.fdset[reg], - ... - }, - ]; - bus.fdset[reg] = -1; - bus.lastfd = reg: size; - io_uring::register_files_update(&bus.uring, updates)!; -}; diff --git a/iobus/io_uring/handle.ha b/iobus/io_uring/handle.ha @@ -1,72 +0,0 @@ -// License: MPL-2.0 -// (c) 2021 Alexey Yerin <yyp@disroot.org> -// (c) 2021 Drew DeVault <sir@cmpwn.com> -// (c) 2021 Eyal Sawady <ecs@d2evs.net> -use linux::io_uring; - -// Registers a callback to occur following the completion of an I/O operation. -// Note that this is generally less efficient than using an internal dispatch in -// your Hare program based on the handle returned from [[dispatch]]. -export fn callback( - op: *handle, - cb: *fn(res: result, data: *void) void, - data: *void, -) void = { - append(op.callbacks, (cb, data)); -}; - -// Sets an arbitrary user data field for this handle. -export fn setuser(handle: *handle, user: *void) void = { - handle.user = user; -}; - -// Returns the value, if any, of the user data field set by [[setuser]]. -export fn getuser(handle: *handle) nullable *void = { - return handle.user; -}; - -// Returns the [[handle]] which this [[result]] corresponds to. -export fn handleof(res: result) *handle = { - match (io_uring::get_user(res)) { - case null => - abort("I/O submission was prepared, but not enqueued. Did you pass it to iobus::enqueue or iobus::submit?"); - case let v: *void => - return v: *handle; - }; -}; - -// Discards state associated with a [[result]]. -export fn done(bus: *bus, res: result) void = { - const handle = handleof(res); - free(handle.callbacks); - io_uring::cqe_seen(&bus.uring, res); -}; - -// Causes a series of I/O handles to be executed serially, waiting for the Nth -// to complete before starting the N+1th. If any of the I/O operations fail, all -// subsequent I/O handles in the chain will be cancelled, returning -// [[errors::cancelled]]. A short read or write (including [[io::EOF]]) is -// considered a failure for this purpose. -// -// All of the I/O handles must be provided as parameters in the order that they -// were created, and must form a continuous sequence of I/O handles as obtained -// from this bus. Discontinuous sequences are not permitted, such that 'h1 = -// read(); h2 = read(); h3 = read(); serial(h1, h3)' is invalid. This function -// must be called prior to any of the handles being submitted to the bus. -export fn chain(items: *handle...) void = { - // TODO: Add assertions to enforce the constraints of this function - for (let i = 0z; i < len(items) - 1; i += 1) { - items[i].sqe.flags |= io_uring::flags::IO_LINK; - }; -}; - -// Serialize behaves similarly to [[chain]], except that an error in I/O -// execution does not cause the remainder of the chain to be cancelled. However, -// a poorly formed I/O submission (such as for an invalid file handle) will -// still cause the chain to be cancelled. -export fn serialize(items: *handle...) void = { - // TODO: Add assertions to enforce the constraints of this function - for (let i = 0z; i < len(items) - 1; i += 1) { - items[i].sqe.flags |= io_uring::flags::IO_HARDLINK; - }; -}; diff --git a/iobus/io_uring/ops.ha b/iobus/io_uring/ops.ha @@ -1,352 +0,0 @@ -// License: MPL-2.0 -// (c) 2021 Alexey Yerin <yyp@disroot.org> -// (c) 2021 Drew DeVault <sir@cmpwn.com> -// (c) 2021 Eyal Sawady <ecs@d2evs.net> -use fs; -use io; -use linux::io_uring; -use net::ip; -use os; -use rt; -use strings; -use unix::poll; - -fn getsqe(bus: *bus) (*io_uring::sqe | queuefull) = { - match (io_uring::get_sqe(&bus.uring)) { - case null => - return queuefull; - case let sqe: *io_uring::sqe => - return sqe; - }; -}; - -// This value may be passed to [[read]] and [[write]] to use (and update) the -// current file offset, with the caveat that parallel reads or writes may clash. -export def CUR: u64 = -1: u64; - -// Prepares an asynchronous read operation. The offset value may be set to -1 to -// advance the host's internal file offset, and must be set to zero for -// non-seekable files (such as network connections). -export fn read( - bus: *bus, - file: file, - buf: []u8, - offs: u64, -) (handle | queuefull) = { - let sqe = getsqe(bus)?; - let flags = io_uring::flags::NONE; - let fd = match (file) { - case let file: io::file => - yield file: i32; - case let file: registered_file => - flags |= io_uring::flags::FIXED_FILE; - yield file: i32; - }; - io_uring::read(sqe, fd, buf: *[*]u8, len(buf), offs, flags); - return handle { sqe = sqe, ... }; -}; - -// Returns the result of a completed read operation. -export fn endread(res: result) (size | io::EOF | error) = { - assert(handleof(res).sqe.opcode == io_uring::op::READ, - "endread called for non-read iobus::result"); - let z = cqe_result(res)?: size; - if (z == 0) { - return io::EOF; - }; - return z; -}; - -// Prepares an asynchronous write operation. The offset value may be set to -1 -// to advance the host's internal file offset, and must be set to zero for -// non-seekable files (such as network connections). -export fn write( - bus: *bus, - file: file, - buf: []u8, - offs: u64, -) (handle | queuefull) = { - let sqe = getsqe(bus)?; - let flags = io_uring::flags::NONE; - let fd = match (file) { - case let file: io::file => - yield file: i32; - case let file: registered_file => - flags |= io_uring::flags::FIXED_FILE; - yield file: i32; - }; - io_uring::write(sqe, fd, buf: *[*]u8, len(buf), offs, flags); - return handle { sqe = sqe, ... }; -}; - -// Returns the result of a completed write operation. -export fn endwrite(res: result) (size | error) = { - assert(handleof(res).sqe.opcode == io_uring::op::WRITE, - "endwrite called for non-write iobus::result"); - return cqe_result(res)?: size; -}; - -// Prepares an asynchronous close operation. -export fn close(bus: *bus, file: file) (handle | queuefull) = { - let sqe = getsqe(bus)?; - let flags = io_uring::flags::NONE; - let fd = match (file) { - case let file: io::file => - yield file: i32; - case let file: registered_file => - flags |= io_uring::flags::FIXED_FILE; - yield file: i32; - }; - io_uring::close(sqe, fd, flags); - return handle { sqe = sqe, ... }; -}; - -// Returns the result of a completed close operation. -export fn endclose(res: result) (void | error) = { - assert(handleof(res).sqe.opcode == io_uring::op::CLOSE, - "endclose called for non-close iobus::result"); - cqe_result(res)?; - return; -}; - -// List of options for [[endaccept]]. -export type accept_opts = (*ip::addr | *u16); - -// Prepares an asynchronous operation to accept a new connection from a network -// socket. -export fn accept(bus: *bus, file: file) (handle | queuefull) = { - let sqe = getsqe(bus)?; - let flags = io_uring::flags::NONE; - let fd = match (file) { - case let file: io::file => - yield file: i32; - case let file: registered_file => - flags |= io_uring::flags::FIXED_FILE; - yield file: i32; - }; - let handle = handle { - sqe = sqe, - accept = accept_handle { - sockaddr_sz = size(rt::sockaddr): uint, - ... - }, - ... - }; - // sockaddr fields are set up in [[enqueue]] - io_uring::accept(sqe, fd, null, null, rt::SOCK_CLOEXEC: uint, flags); - return handle; -}; - -// Returns the result of a completed accept operation. Accepts a list of -// [[accept_opts]] which can be used to retrieve the peer's address or port. -export fn endaccept(res: result, opts: accept_opts...) (io::file | error) = { - let handle = handleof(res); - assert(handle.sqe.opcode == io_uring::op::ACCEPT, - "endaccept called for non-accept iobus::result"); - const file = cqe_result(res)?: io::file; - for (let i = 0z; i < len(opts); i += 1) { - match (opts[i]) { - case let addr: *ip::addr => - let peer = ip::from_native(handle.accept.sockaddr); - *addr = peer.0; - case let port: *u16 => - let peer = ip::from_native(handle.accept.sockaddr); - *port = peer.1; - }; - }; - return file; -}; - -// Prepares an asynchronous poll operation for a file. -export fn poll( - bus: *bus, - file: file, - events: poll::event, -) (handle | queuefull) = { - // TODO: Support IORING_POLL_ADD_MULTI - let sqe = getsqe(bus)?; - let flags = io_uring::flags::NONE; - let fd = match (file) { - case let file: io::file => - yield file: i32; - case let file: registered_file => - flags |= io_uring::flags::FIXED_FILE; - yield file: i32; - }; - io_uring::poll_add(sqe, fd, events: uint, flags); - return handle { sqe = sqe, ... }; -}; - -// Returns the revents for a [[poll]] operation. -export fn endpoll(bus: *bus, res: result) (poll::event | error) = { - let handle = handleof(res); - assert(handle.sqe.opcode == io_uring::op::POLL_ADD, - "endpoll called for non-poll iobus::result"); - return cqe_result(res)?: poll::event; -}; - -// TODO: poll_remove - -// Prepares an asynchronous open operation. -// -// If no flags are provided, [[fs::flags::RDONLY]], [[fs::flags::NOCTTY]], -// [[fs::flags::CLOEXEC]] are used when opening the file. If you pass your own -// flags, it is recommended that you add the latter two unless you know that you -// do not want them. -export fn open( - bus: *bus, - path: str, - flags: fs::flags... -) (handle | queuefull) = { - let sqe = getsqe(bus)?; - let handle = handle { - sqe = sqe, - cstring = strings::to_c(path), - ... - }; - let oflags = 0; - if (len(flags) == 0z) { - oflags |= (fs::flags::NOCTTY - | fs::flags::CLOEXEC - | fs::flags::RDONLY): int; - }; - for (let i = 0z; i < len(flags); i += 1z) { - oflags |= flags[i]: int; - }; - io_uring::openat(sqe, os::dirfile(os::cwd), handle.cstring, oflags, 0); - return handle; -}; - -// Returns the new file handle from an [[open]] operation. -export fn endopen(bus: *bus, res: result) (io::file | error) = { - let handle = handleof(res); - assert(handle.sqe.opcode == io_uring::op::OPENAT, - "endopen called for non-open iobus::result"); - free(handle.cstring); - return cqe_result(res)?: io::file; -}; - -// Prepares an asynchronous file creation operation. -// -// If no flags are provided, [[fs::flags::WRONLY]], [[fs::flags::NOCTTY]], -// [[fs::flags::CLOEXEC]] are used when opening the file. If you pass your own -// flags, it is recommended that you add the latter two unless you know that you -// do not want them. -export fn create( - bus: *bus, - path: str, - mode: fs::mode, - flags: fs::flags... -) (handle | queuefull) = { - let sqe = getsqe(bus)?; - let oflags = 0; - if (len(flags) == 0z) { - oflags |= (fs::flags::NOCTTY - | fs::flags::CLOEXEC - | fs::flags::WRONLY): int; - }; - for (let i = 0z; i < len(flags); i += 1z) { - oflags |= flags[i]: int; - }; - oflags |= fs::flags::CREATE: int; - let handle = handle { - sqe = sqe, - cstring = strings::to_c(path), - ... - }; - io_uring::openat(sqe, os::dirfile(os::cwd), handle.cstring, oflags, mode); - return handle; -}; - -// Returns the new file handle from a [[create]] operation. -export fn endcreate(bus: *bus, res: result) (io::file | error) = { - let handle = handleof(res); - assert(handle.sqe.opcode == io_uring::op::OPENAT, - "endcreate called for non-create iobus::result"); - free(handle.cstring); - return cqe_result(res)?: io::file; -}; - -// Flags for the [[recv]] and [[send]] family of operations. Only PEEK, OOB, -// NOSIGNAL, and WAITALL are defined by POSIX. -export type msgflag = enum int { - OOB = rt::MSG_OOB, - PEEK = rt::MSG_PEEK, - DONTROUTE = rt::MSG_DONTROUTE, - TRYHARD = rt::MSG_TRYHARD, - CTRUNC = rt::MSG_CTRUNC, - PROBE = rt::MSG_PROBE, - TRUNC = rt::MSG_TRUNC, - DONTWAIT = rt::MSG_DONTWAIT, - EOR = rt::MSG_EOR, - WAITALL = rt::MSG_WAITALL, - FIN = rt::MSG_FIN, - SYN = rt::MSG_SYN, - CONFIRM = rt::MSG_CONFIRM, - RST = rt::MSG_RST, - ERRQUEUE = rt::MSG_ERRQUEUE, - NOSIGNAL = rt::MSG_NOSIGNAL, - MORE = rt::MSG_MORE, - WAITFORONE = rt::MSG_WAITFORONE, - SENDPAGE_NOPOLICY = rt::MSG_SENDPAGE_NOPOLICY, - SENDPAGE_NOTLAST = rt::MSG_SENDPAGE_NOTLAST, - BATCH = rt::MSG_BATCH, - EOF = rt::MSG_EOF, - NO_SHARED_FRAGS = rt::MSG_NO_SHARED_FRAGS, - SENDPAGE_DECRYPTED = rt::MSG_SENDPAGE_DECRYPTED, - ZEROCOPY = rt::MSG_ZEROCOPY, - FASTOPEN = rt::MSG_FASTOPEN, - CMSG_CLOEXEC = rt::MSG_CMSG_CLOEXEC, -}; - -// Prepares an operation to receive data from a socket. -export fn recv( - bus: *bus, - file: file, - buf: []u8, - flags: msgflag... -) (handle | queuefull) = { - let rflags = 0; - for (let i = 0z; i < len(flags); i += 1) { - rflags |= flags[i]; - }; - - let sqe = getsqe(bus)?; - let iflags = io_uring::flags::NONE; - let fd = match (file) { - case let file: io::file => - yield file: i32; - case let file: registered_file => - iflags |= io_uring::flags::FIXED_FILE; - yield file: i32; - }; - - io_uring::recv(sqe, fd, buf: *[*]u8, len(buf), rflags, iflags); - return handle { sqe = sqe, ... }; -}; - -// Prepares an operation to send data to a socket. -export fn send( - bus: *bus, - file: file, - buf: []u8, - flags: msgflag... -) (handle | queuefull) = { - let sflags = 0; - for (let i = 0z; i < len(flags); i += 1) { - sflags |= flags[i]; - }; - - let sqe = getsqe(bus)?; - let iflags = io_uring::flags::NONE; - let fd = match (file) { - case let file: io::file => - yield file: i32; - case let file: registered_file => - iflags |= io_uring::flags::FIXED_FILE; - yield file: i32; - }; - - io_uring::send(sqe, fd, buf: *[*]u8, len(buf), sflags, iflags); - return handle { sqe = sqe, ... }; -}; diff --git a/iobus/io_uring/pool.ha b/iobus/io_uring/pool.ha @@ -1,86 +0,0 @@ -// License: MPL-2.0 -// (c) 2021 Drew DeVault <sir@cmpwn.com> -use linux::io_uring; - -export type pool = struct { - bus: *bus, - buffer: []u8, - bufsz: size, - nbuf: size, - group: u16, -}; - -export type buffer = u16; - -// Creates a new buffer pool for use with read operations. The user can request -// pool use for an I/O request via [[setpool]], and the host will select a -// suitable buffer from the pool to perform the read request. The selected -// buffer can be determined from the [[result]] using [[getbuffer]]. The user -// should, after processing the results of the I/O operation, return the buffer -// to the pool with [[release]]. If no buffers were available, [[endread]] (or -// whichever "end" function is appropriate) will return [[nobuffers]]; in this -// case the user should release some buffers and re-submit the read later. -// -// The utility of this functionality is that the user may submit more reads than -// they have buffers allocated. For instance, a network server may allocate a -// pool of X buffers for reads, and submit reads for Y client connections, where -// X < Y. This allows the server software to support more connections without a -// corresponding increase in RAM utilization, albiet at the cost of limiting -// active connections to the number of available buffers. -// -// The buffer argument must be a slice whose length is divisible by bufsz, thus -// providing a buffer pool of N buffers where N is len(buffer) / bufsz. -export fn newpool( - bus: *bus, - buffer: []u8, - bufsz: size, -) (pool | queuefull) = { - // XXX: If there's an error with the CQE, it will cause an assertion - // failure later, which is lame as hell. We should change this to return - // the handle and let the user run it through their bus normally. - assert(len(buffer) % bufsz == 0, "Invalid buffer pool size"); - const nbuf = len(buffer) / bufsz; - const group = bus.nextgroup; - bus.nextgroup += 1; - - let sqe = getsqe(bus)?; - io_uring::provide_buffers(sqe, group, buffer, nbuf, bufsz, 0); - io_uring::set_user(sqe, &internal_sqe); - return pool { - bus = bus, - buffer = buffer, - bufsz = bufsz, - nbuf = nbuf, - group = group, - }; -}; - -// Configures an operation to use a [[pool]]. -export fn setpool(handle: *handle, pool: pool) void = { - io_uring::set_buffer_select(handle.sqe, pool.group); - // XXX: Is this always right? - handle.sqe.length = pool.bufsz: u32; -}; - -// Returns the buffer used by a [[handle]] configured with [[setpool]]. The -// caller should provide the buffer from the return value to [[release]] when -// done to return it to the host for future requests. After calling [[release]], -// the slice reference is no longer valid. -// -// Note that I/O operations which completed with an error condition will not -// have a buffer assigned, even if [[setpool]] was called for the original -// handle. You must thus test for errors prior to calling [[getbuffer]], or risk -// causing an assertion failure. -export fn getbuffer(pool: *pool, result: result) (buffer, []u8) = { - let buf: buffer = io_uring::get_buffer_id(result); - return (buf, pool.buffer[(buf * pool.bufsz)..((buf + 1) * pool.bufsz)]); -}; - -// Returns a buffer from [[getbuffer]] back to the buffer pool for future reads. -export fn release(pool: *pool, buf: buffer) (void | queuefull) = { - let sqe = getsqe(pool.bus)?; - io_uring::provide_buffers(sqe, pool.group, - pool.buffer[(buf * pool.bufsz)..((buf + 1) * pool.bufsz)], - 1, pool.bufsz, buf); - io_uring::set_user(sqe, &internal_sqe); -}; diff --git a/iobus/io_uring/types.ha b/iobus/io_uring/types.ha @@ -1,76 +0,0 @@ -// License: MPL-2.0 -// (c) 2021 Drew DeVault <sir@cmpwn.com> -// (c) 2021 Eyal Sawady <ecs@d2evs.net> -use errors; -use io; -use linux::io_uring; -use rt; - -// All errors which may be raised by iobus. -export type error = !(io_uring::error | nobuffers); - -// Returned from [[endread]] et al when an operation was configured to use a -// buffer pool, but the buffer pool has been exhausted. -export type nobuffers = !void; - -// Converts an [[error]] into a user-friendly string. -export fn strerror(err: error) const str = { - match (err) { - case nobuffers => - return "iobus buffer pool exhausted"; - case let err: io_uring::error => - return io_uring::strerror(err); - }; -}; - -fn wraperror(err: io_uring::error) error = { - match (err) { - case io_uring::nobuffers => - return nobuffers; - case => - return err; - }; -}; - -fn cqe_result(res: *io_uring::cqe) (int | error) = { - match (io_uring::result(res)) { - case let err: io_uring::error => - return wraperror(err); - case let n: int => - return n; - }; -}; - -export type bus = struct { - uring: io_uring::io_uring, - fdset: []io::file, - lastfd: size, - nextgroup: u16, -}; - -export type result = *io_uring::cqe; - -export type handle = struct { - sqe: *io_uring::sqe, - user: nullable *void, - callbacks: [](*fn(res: result, data: *void) void, *void), - union { - accept: accept_handle, - cstring: *char, - }, -}; - -export type accept_handle = struct { - sockaddr: rt::sockaddr, - sockaddr_sz: uint, -}; - -export type registered_file = int; - -// A file object which accepts multiplexed I/O operations. -export type file = (registered_file | io::file); - -// Returned if the submission queue is full and new I/O submissions are not -// available. The user should call [[submit]] before attempting to submit new -// I/O. -export type queuefull = !void; diff --git a/linux/io_uring/README b/linux/io_uring/README @@ -4,6 +4,3 @@ familiar with io_uring. Thus, it is recommended that a reading of this module is paired with the Linux documentation, which may be available from your local liburing package under the io_uring_setup, io_uring_enter, and io_uring_register man pages. - -For a high-level I/O multiplexing interface, which takes advantage of io_uring -if available, users are encouraged to use [[iobus]] instead. diff --git a/scripts/gen-docs b/scripts/gen-docs @@ -24,7 +24,7 @@ modules="$(getmods)" mkdir -p docs/html/ haredoc -Fhtml > docs/html/index.html -for mod in $modules compress format encoding math crypto hare rt iobus +for mod in $modules compress format encoding math crypto hare rt do echo $mod path="$(echo $mod | sed -e 's?::?/?g')" diff --git a/scripts/gen-stdlib b/scripts/gen-stdlib @@ -817,16 +817,6 @@ io() { gen_ssa -pfreebsd io strings errors } -iobus_io_uring() { - gen_srcs -plinux iobus::io_uring \ - bus.ha \ - handle.ha \ - ops.ha \ - pool.ha \ - types.ha - gen_ssa -plinux iobus::io_uring errors io linux::io_uring net::ip unix::poll -} - linux() { gen_srcs -plinux linux \ start.ha \ @@ -1367,7 +1357,6 @@ hash::crc32 hash::crc64 hash::fnv io linux freebsd -iobus::io_uring linux linux linux linux::keyctl linux linux::io_uring linux diff --git a/scripts/install-mods b/scripts/install-mods @@ -18,7 +18,6 @@ glob hare hash io -iobus linux log math diff --git a/stdlib.mk b/stdlib.mk @@ -458,10 +458,6 @@ stdlib_deps_linux+=$(stdlib_io_linux) stdlib_io_freebsd=$(HARECACHE)/io/io-freebsd.o stdlib_deps_freebsd+=$(stdlib_io_freebsd) -# gen_lib iobus::io_uring (linux) -stdlib_iobus_io_uring_linux=$(HARECACHE)/iobus/io_uring/iobus_io_uring-linux.o -stdlib_deps_linux+=$(stdlib_iobus_io_uring_linux) - # gen_lib linux (linux) stdlib_linux_linux=$(HARECACHE)/linux/linux-linux.o stdlib_deps_linux+=$(stdlib_linux_linux) @@ -1363,20 +1359,6 @@ $(HARECACHE)/io/io-freebsd.ssa: $(stdlib_io_freebsd_srcs) $(stdlib_rt) $(stdlib_ @HARECACHE=$(HARECACHE) $(HAREC) $(HAREFLAGS) -o $@ -Nio \ -t$(HARECACHE)/io/io.td $(stdlib_io_freebsd_srcs) -# iobus::io_uring (+linux) -stdlib_iobus_io_uring_linux_srcs= \ - $(STDLIB)/iobus/io_uring/bus.ha \ - $(STDLIB)/iobus/io_uring/handle.ha \ - $(STDLIB)/iobus/io_uring/ops.ha \ - $(STDLIB)/iobus/io_uring/pool.ha \ - $(STDLIB)/iobus/io_uring/types.ha - -$(HARECACHE)/iobus/io_uring/iobus_io_uring-linux.ssa: $(stdlib_iobus_io_uring_linux_srcs) $(stdlib_rt) $(stdlib_errors_$(PLATFORM)) $(stdlib_io_$(PLATFORM)) $(stdlib_linux_io_uring_$(PLATFORM)) $(stdlib_net_ip_$(PLATFORM)) $(stdlib_unix_poll_$(PLATFORM)) - @printf 'HAREC \t$@\n' - @mkdir -p $(HARECACHE)/iobus/io_uring - @HARECACHE=$(HARECACHE) $(HAREC) $(HAREFLAGS) -o $@ -Niobus::io_uring \ - -t$(HARECACHE)/iobus/io_uring/iobus_io_uring.td $(stdlib_iobus_io_uring_linux_srcs) - # linux (+linux) stdlib_linux_linux_srcs= \ $(STDLIB)/linux/start.ha \ @@ -2464,10 +2446,6 @@ testlib_deps_linux+=$(testlib_io_linux) testlib_io_freebsd=$(TESTCACHE)/io/io-freebsd.o testlib_deps_freebsd+=$(testlib_io_freebsd) -# gen_lib iobus::io_uring (linux) -testlib_iobus_io_uring_linux=$(TESTCACHE)/iobus/io_uring/iobus_io_uring-linux.o -testlib_deps_linux+=$(testlib_iobus_io_uring_linux) - # gen_lib linux (linux) testlib_linux_linux=$(TESTCACHE)/linux/linux-linux.o testlib_deps_linux+=$(testlib_linux_linux) @@ -3407,20 +3385,6 @@ $(TESTCACHE)/io/io-freebsd.ssa: $(testlib_io_freebsd_srcs) $(testlib_rt) $(testl @HARECACHE=$(TESTCACHE) $(HAREC) $(TESTHAREFLAGS) -o $@ -Nio \ -t$(TESTCACHE)/io/io.td $(testlib_io_freebsd_srcs) -# iobus::io_uring (+linux) -testlib_iobus_io_uring_linux_srcs= \ - $(STDLIB)/iobus/io_uring/bus.ha \ - $(STDLIB)/iobus/io_uring/handle.ha \ - $(STDLIB)/iobus/io_uring/ops.ha \ - $(STDLIB)/iobus/io_uring/pool.ha \ - $(STDLIB)/iobus/io_uring/types.ha - -$(TESTCACHE)/iobus/io_uring/iobus_io_uring-linux.ssa: $(testlib_iobus_io_uring_linux_srcs) $(testlib_rt) $(testlib_errors_$(PLATFORM)) $(testlib_io_$(PLATFORM)) $(testlib_linux_io_uring_$(PLATFORM)) $(testlib_net_ip_$(PLATFORM)) $(testlib_unix_poll_$(PLATFORM)) - @printf 'HAREC \t$@\n' - @mkdir -p $(TESTCACHE)/iobus/io_uring - @HARECACHE=$(TESTCACHE) $(HAREC) $(TESTHAREFLAGS) -o $@ -Niobus::io_uring \ - -t$(TESTCACHE)/iobus/io_uring/iobus_io_uring.td $(testlib_iobus_io_uring_linux_srcs) - # linux (+linux) testlib_linux_linux_srcs= \ $(STDLIB)/linux/start.ha \