commit a83568fc4b19c53123a10d37576a5dfb357cde78
parent b383ccb3c8e336849466fe821bec05b3d0421ee0
Author: Drew DeVault <sir@cmpwn.com>
Date: Tue, 19 Oct 2021 12:57:33 +0200
iobus: implement accept
Signed-off-by: Drew DeVault <sir@cmpwn.com>
Diffstat:
5 files changed, 148 insertions(+), 25 deletions(-)
diff --git a/iobus/io_uring/bus.ha b/iobus/io_uring/bus.ha
@@ -27,6 +27,12 @@ export fn destroy(bus: *bus) void = {
// Must call [[submit]] later submit them to the host.
export fn enqueue(bus: *bus, ops: *handle...) void = {
for (let i = 0z; i < len(ops); i += 1) {
+ switch (ops[i].sqe.opcode) {
+ case io_uring::op::ACCEPT =>
+ ops[i].sqe.addr = &ops[i].accept.sockaddr;
+ ops[i].sqe.addr2 = &ops[i].accept.sockaddr_sz;
+ case => void;
+ };
io_uring::set_user(ops[i].sqe, ops[i]);
};
};
@@ -45,9 +51,13 @@ fn _dispatch(bus: *bus) (result | error) = {
case cqe: *io_uring::cqe =>
return cqe: result;
};
- match (io_uring::wait(&bus.uring)) {
+ match (io_uring::submit_wait(&bus.uring, 1)) {
case err: io_uring::error =>
return err: error;
+ case uint => void;
+ };
+ match (io_uring::peek(&bus.uring)?) {
+ case null => abort();
case cqe: *io_uring::cqe =>
return cqe: result;
};
@@ -80,7 +90,7 @@ export fn register_file(bus: *bus, file: io::file) registered_file = {
let updates = [
io_uring::files_update {
offs = bus.lastfd: u32,
- fds = &bus.fdset[bus.lastfd],
+ fds = &bus.fdset[0],
},
];
bus.fdset[bus.lastfd] = file;
@@ -90,3 +100,21 @@ export fn register_file(bus: *bus, file: io::file) registered_file = {
for (bus.fdset[bus.lastfd] != -1; bus.lastfd += 1) void;
return registered;
};
+
+// Unregisters a file previously registered with [[register_file]].
+export fn unregister_file(bus: *bus, file: file) void = {
+ const reg = match (file) {
+ case reg: registered_file =>
+ yield reg;
+ case => abort("Cannot unregister non-registered file");
+ };
+ let updates = [
+ io_uring::files_update {
+ offs = reg: u32,
+ fds = &bus.fdset[0],
+ },
+ ];
+ bus.fdset[reg] = -1;
+ bus.lastfd = reg: size;
+ io_uring::register_files_update(&bus.uring, updates)!;
+};
diff --git a/iobus/io_uring/ops.ha b/iobus/io_uring/ops.ha
@@ -1,14 +1,29 @@
use io;
use linux::io_uring;
+use net::ip;
+use net::unix;
+use rt;
+// TODO: Seek to de-duplicate some of the SQE prep code
-// Prepares an asynchronous read operation.
-export fn read(bus: *bus, file: file, buf: []u8) (handle | queuefull) = {
- let sqe = match (io_uring::get_sqe(&bus.uring)) {
+fn getsqe(bus: *bus) (*io_uring::sqe | queuefull) = {
+ match (io_uring::get_sqe(&bus.uring)) {
case null =>
return queuefull;
case sqe: *io_uring::sqe =>
- yield sqe;
+ return sqe;
};
+};
+
+// Prepares an asynchronous read operation. The offset value may be set to -1 to
+// advance the host's internal file offset, and must be set to zero for
+// non-seekable files (such as network connections).
+export fn read(
+ bus: *bus,
+ file: file,
+ buf: []u8,
+ offs: u64,
+) (handle | queuefull) = {
+ let sqe = getsqe(bus)?;
let flags = io_uring::flags::NONE;
let fd = match (file) {
case file: io::file =>
@@ -17,11 +32,8 @@ export fn read(bus: *bus, file: file, buf: []u8) (handle | queuefull) = {
flags |= io_uring::flags::FIXED_FILE;
yield file: i32;
};
- io_uring::read(sqe, fd, buf: *[*]u8, len(buf), -1, flags);
- return handle {
- sqe = sqe,
- ...
- };
+ io_uring::read(sqe, fd, buf: *[*]u8, len(buf), offs, flags);
+ return handle { sqe = sqe, ... };
};
// Returns the result of a completed read operation.
@@ -35,14 +47,16 @@ export fn endread(res: result) (size | io::EOF | error) = {
return z;
};
-// Prepares an asynchronous write operation.
-export fn write(bus: *bus, file: file, buf: []u8) (handle | queuefull) = {
- let sqe = match (io_uring::get_sqe(&bus.uring)) {
- case null =>
- return queuefull;
- case sqe: *io_uring::sqe =>
- yield sqe;
- };
+// Prepares an asynchronous write operation. The offset value may be set to -1
+// to advance the host's internal file offset, and must be set to zero for
+// non-seekable files (such as network connections).
+export fn write(
+ bus: *bus,
+ file: file,
+ buf: []u8,
+ offs: u64,
+) (handle | queuefull) = {
+ let sqe = getsqe(bus)?;
let flags = io_uring::flags::NONE;
let fd = match (file) {
case file: io::file =>
@@ -51,11 +65,8 @@ export fn write(bus: *bus, file: file, buf: []u8) (handle | queuefull) = {
flags |= io_uring::flags::FIXED_FILE;
yield file: i32;
};
- io_uring::write(sqe, fd, buf: *[*]u8, len(buf), 0, flags);
- return handle {
- sqe = sqe,
- ...
- };
+ io_uring::write(sqe, fd, buf: *[*]u8, len(buf), offs, flags);
+ return handle { sqe = sqe, ... };
};
// Returns the result of a completed write operation.
@@ -64,3 +75,74 @@ export fn endwrite(res: result) (size | error) = {
"endwrite called for non-write iobus::result");
return io_uring::result(res)?: size;
};
+
+// Prepares an asyncronous close operation.
+export fn close(bus: *bus, file: file) (handle | queuefull) = {
+ let sqe = getsqe(bus)?;
+ let flags = io_uring::flags::NONE;
+ let fd = match (file) {
+ case file: io::file =>
+ yield file: i32;
+ case file: registered_file =>
+ flags |= io_uring::flags::FIXED_FILE;
+ yield file: i32;
+ };
+ io_uring::close(sqe, fd, flags);
+ return handle { sqe = sqe, ... };
+};
+
+// Returns the result of a completed close operation.
+export fn endclose(res: result) (void | error) = {
+ assert(handleof(res).sqe.opcode == io_uring::op::CLOSE,
+ "endclose called for non-close iobus::result");
+ io_uring::result(res)?;
+ return;
+};
+
+// List of options for [[endaccept]].
+export type accept_opts = (*ip::addr | *u16);
+
+// Prepares an asynchronous operation to accept a new connection from a network
+// socket.
+export fn accept(bus: *bus, file: file) (handle | queuefull) = {
+ let sqe = getsqe(bus)?;
+ let flags = io_uring::flags::NONE;
+ let fd = match (file) {
+ case file: io::file =>
+ yield file: i32;
+ case file: registered_file =>
+ flags |= io_uring::flags::FIXED_FILE;
+ yield file: i32;
+ };
+ let handle = handle {
+ sqe = sqe,
+ accept = accept_handle {
+ sockaddr_sz = size(rt::sockaddr): uint,
+ ...
+ },
+ ...
+ };
+ // sockaddr fields are set up in [[enqueue]]
+ io_uring::accept(sqe, fd, null, null, rt::SOCK_CLOEXEC: uint, flags);
+ return handle;
+};
+
+// Returns the result of a completed accept operation. Accepts a list of
+// [[accept_opts]] which can be used to retrieve the peer's address or port.
+export fn endaccept(res: result, opts: accept_opts...) (io::file | error) = {
+ let handle = handleof(res);
+ assert(handle.sqe.opcode == io_uring::op::ACCEPT,
+ "endaccept called for non-accept iobus::result");
+ const file = io_uring::result(res)?: io::file;
+ for (let i = 0z; i < len(opts); i += 1) {
+ match (opts[i]) {
+ case addr: *ip::addr =>
+ let peer = ip::from_native(handle.accept.sockaddr);
+ *addr = peer.0;
+ case port: *u16 =>
+ let peer = ip::from_native(handle.accept.sockaddr);
+ *port = peer.1;
+ };
+ };
+ return file;
+};
diff --git a/iobus/io_uring/types.ha b/iobus/io_uring/types.ha
@@ -1,6 +1,9 @@
use errors;
use io;
use linux::io_uring;
+use net::ip;
+use net::unix;
+use rt;
export type error = io_uring::error;
@@ -21,6 +24,14 @@ export type result = *io_uring::cqe;
export type handle = struct {
sqe: *io_uring::sqe,
callbacks: [](*fn(res: result, data: *void) void, *void),
+ union {
+ accept: accept_handle,
+ },
+};
+
+export type accept_handle = struct {
+ sockaddr: rt::sockaddr,
+ sockaddr_sz: uint,
};
export type registered_file = int;
diff --git a/net/+linux.ha b/net/+linux.ha
@@ -13,7 +13,7 @@ export type stream_listener = struct {
};
// Gets the fd of the listener's socket. This function is not portable.
-export fn listenerfd(l: *listener) (int | void) = {
+export fn listenerfd(l: *listener) (io::file | void) = {
if (l.accept == &stream_accept) {
return (l: *stream_listener).fd;
};
diff --git a/rt/+linux/socket.ha b/rt/+linux/socket.ha
@@ -99,6 +99,8 @@ export def SOCK_RDM: int = 4;
export def SOCK_SEQPACKET: int = 5;
export def SOCK_DCCP: int = 6;
export def SOCK_PACKET: int = 10;
+export def SOCK_NONBLOCK: int = 0o4000;
+export def SOCK_CLOEXEC: int = 0o2000000;
// protocol for socket(2)
export def IPPROTO_IP: int = 0; // Dummy protocol for TCP