commit d5072e7cefad607d45b817cd3826b1f772b5f767
parent c5c9138b05c5495f73aaf6734bf594ec1e710016
Author: Drew DeVault <sir@cmpwn.com>
Date: Fri, 22 Oct 2021 09:51:57 +0200
iobus: implement nobuffers error
Signed-off-by: Drew DeVault <sir@cmpwn.com>
Diffstat:
4 files changed, 49 insertions(+), 13 deletions(-)
diff --git a/iobus/io_uring/ops.ha b/iobus/io_uring/ops.ha
@@ -15,6 +15,10 @@ fn getsqe(bus: *bus) (*io_uring::sqe | queuefull) = {
};
};
+// This value may be passed to [[read]] and [[write]] to use (and update) the
+// current file offset, with the caveat that parallel reads or writes may clash.
+export def CUR: u64 = -1: u64;
+
// Prepares an asynchronous read operation. The offset value may be set to -1 to
// advance the host's internal file offset, and must be set to zero for
// non-seekable files (such as network connections).
@@ -41,7 +45,7 @@ export fn read(
export fn endread(res: result) (size | io::EOF | error) = {
assert(handleof(res).sqe.opcode == io_uring::op::READ,
"endread called for non-read iobus::result");
- let z = io_uring::result(res)?: size;
+ let z = cqe_result(res)?: size;
if (z == 0) {
return io::EOF;
};
@@ -74,7 +78,7 @@ export fn write(
export fn endwrite(res: result) (size | error) = {
assert(handleof(res).sqe.opcode == io_uring::op::WRITE,
"endwrite called for non-write iobus::result");
- return io_uring::result(res)?: size;
+ return cqe_result(res)?: size;
};
// Prepares an asynchronous close operation.
@@ -96,7 +100,7 @@ export fn close(bus: *bus, file: file) (handle | queuefull) = {
export fn endclose(res: result) (void | error) = {
assert(handleof(res).sqe.opcode == io_uring::op::CLOSE,
"endclose called for non-close iobus::result");
- io_uring::result(res)?;
+ cqe_result(res)?;
return;
};
@@ -134,7 +138,7 @@ export fn endaccept(res: result, opts: accept_opts...) (io::file | error) = {
let handle = handleof(res);
assert(handle.sqe.opcode == io_uring::op::ACCEPT,
"endaccept called for non-accept iobus::result");
- const file = io_uring::result(res)?: io::file;
+ const file = cqe_result(res)?: io::file;
for (let i = 0z; i < len(opts); i += 1) {
match (opts[i]) {
case addr: *ip::addr =>
@@ -171,7 +175,7 @@ export fn poll(
export fn endpoll(bus: *bus, res: result) (poll::event | error) = {
let handle = handleof(res);
assert(handle.sqe.opcode == io_uring::op::POLL_ADD);
- return io_uring::result(res)?: poll::event;
+ return cqe_result(res)?: poll::event;
};
// TODO: poll_remove
diff --git a/iobus/io_uring/pool.ha b/iobus/io_uring/pool.ha
@@ -1,5 +1,4 @@
use linux::io_uring;
-use fmt;
export type pool = struct {
bus: *bus,
diff --git a/iobus/io_uring/types.ha b/iobus/io_uring/types.ha
@@ -3,14 +3,39 @@ use io;
use linux::io_uring;
use rt;
-// TODO: Add our own nobuffers error
-
// All errors which may be raised by iobus.
-export type error = io_uring::error;
+export type error = !(io_uring::error | nobuffers);
+
+// Returned from [[endread]] et al when an operation was configured to use a
+// buffer pool, but the buffer pool has been exhausted.
+export type nobuffers = !void;
// Converts an [[error]] into a user-friendly string.
export fn strerror(err: error) const str = {
- return io_uring::strerror(err);
+ match (err) {
+ case nobuffers =>
+ return "iobus buffer pool exhausted";
+ case err: io_uring::error =>
+ return io_uring::strerror(err);
+ };
+};
+
+fn wraperror(err: io_uring::error) error = {
+ match (err) {
+ case io_uring::nobuffers =>
+ return nobuffers;
+ case =>
+ return err;
+ };
+};
+
+fn cqe_result(res: *io_uring::cqe) (int | error) = {
+ match (io_uring::result(res)) {
+ case err: io_uring::error =>
+ return wraperror(err);
+ case n: int =>
+ return n;
+ };
};
export type bus = struct {
diff --git a/linux/io_uring/cqe.ha b/linux/io_uring/cqe.ha
@@ -28,9 +28,17 @@ export fn wait(ring: *io_uring) (*cqe | error) = {
export fn peek(ring: *io_uring) (nullable *cqe | error) = get_cqe(ring, 0, 0);
// Returns the result of a [[cqe]], or an error if unsuccessful.
-export fn result(cqe: *cqe) (int | error) =
- if (cqe.res < 0) errors::errno(rt::wrap_errno(-cqe.res))
- else cqe.res;
+export fn result(cqe: *cqe) (int | error) = {
+ if (cqe.res >= 0) {
+ return cqe.res;
+ };
+ switch (-cqe.res) {
+ case rt::ENOBUFS =>
+ return nobuffers;
+ case =>
+ return errors::errno(rt::wrap_errno(-cqe.res));
+ };
+};
// Gets the user data field of a [[cqe]]. See [[set_user]] for the corresponding
// SQE function.