commit 64f03254705d37d95ccde4040c9d5b07716b3bd4
parent 80827d904f7bd05b4dbbd04527f579239e427aa5
Author: Drew DeVault <sir@cmpwn.com>
Date: Tue, 2 Feb 2021 15:56:18 -0500
io, os: implement streams with first-class copy
This allows a stream to use a more efficient copy mechanism if it has
one. os::fdstream has been updated to use sendfile based on this.
Diffstat:
6 files changed, 113 insertions(+), 9 deletions(-)
diff --git a/io/copy.ha b/io/copy.ha
@@ -1,6 +1,11 @@
// Copies data from one stream into another. Note that this function will never
// return if the source stream is infinite.
export fn copy(dest: *stream, src: *stream) (error | size) = {
+ match (dest.copier) {
+ null => void,
+ c: *copier => return c(dest, src),
+ };
+
let w = 0z;
let buf: [4096]u8 = [0u8...];
for (true) {
diff --git a/io/stream.ha b/io/stream.ha
@@ -24,6 +24,7 @@ export type stream = struct {
reader: nullable *reader,
writer: nullable *writer,
closer: nullable *closer,
+ copier: nullable *copier,
};
// Reads up to len(buf) bytes from the reader into the given buffer, returning
diff --git a/io/types.ha b/io/types.ha
@@ -46,3 +46,13 @@ export type writer = fn(s: *stream, buf: const []u8) (size | error);
// itself. The other stream functions may be called after close is called; it is
// their responsibility to return [io::closed] in this case.
export type closer = fn(s: *stream) void;
+
+// The interface for a stream which has first-class support for copying data
+// from another stream. Often this only works if the second stream is of the
+// same underlying stream type. This is optional, [io::copy] still works even
+// with a stream which does not implement this (it falls back to calling read
+// and write in a loop).
+//
+// Returns the number of bytes copied, or an error if one occured. Does not
+// close either stream.
+export type copier = fn(to: *stream, from: *stream) (size | error);
diff --git a/os/+linux/fdstream.ha b/os/+linux/fdstream.ha
@@ -5,6 +5,7 @@ use strings;
type fd_stream = struct {
stream: io::stream,
fd: int,
+ offs: size,
};
// Opens a Unix file descriptor as an io::stream. If 'name' is an empty string,
@@ -18,16 +19,25 @@ export fn fdopen(fd: int, name: str) *io::stream = {
reader = &fd_read,
writer = &fd_write,
closer = &fd_close,
+ copier = &fd_copy,
},
fd = fd,
+ offs = 0z,
});
return &stream.stream;
};
+fn is_fdstream(s: *io::stream) bool = {
+ return s.reader == &fd_read
+ || s.writer == &fd_write
+ || s.closer == &fd_close
+ || s.copier == &fd_copy;
+};
+
// Returns the file descriptor for a given [io::stream]. If there is no fd
// associated with this stream, void is returned.
export fn streamfd(s: *io::stream) (int | void) = {
- if (s.reader != &fd_read) {
+ if (!is_fdstream(s)) {
return void;
};
let stream = s: *fd_stream;
@@ -51,7 +61,10 @@ fn fd_read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = {
err: rt::errno => errno_to_io(err),
n: size => switch (n) {
0z => io::EOF,
- * => n,
+ * => {
+ stream.offs += n;
+ n;
+ },
},
};
};
@@ -61,7 +74,10 @@ fn fd_write(s: *io::stream, buf: const []u8) (size | io::error) = {
let r = rt::write(stream.fd, buf: *const [*]u8, len(buf));
return match (rt::wrap_return(r)) {
err: rt::errno => errno_to_io(err),
- n: size => n,
+ n: size => {
+ stream.offs += n;
+ n;
+ },
};
};
@@ -70,3 +86,42 @@ fn fd_close(s: *io::stream) void = {
rt::close(stream.fd);
free(stream);
};
+
+def SENDFILE_MAX: size = 2147479552z;
+
+fn fd_copy(_to: *io::stream, _from: *io::stream) (size | io::error) = {
+ if (!is_fdstream(_from)) {
+ return copy_fallback(_to, _from);
+ };
+
+ let to = _to: *fd_stream, from = _from: *fd_stream;
+ let offs = to.offs;
+ let sum = 0z;
+ for (true) {
+ let r = rt::sendfile(to.fd, from.fd, &offs, SENDFILE_MAX);
+ let n = match(rt::wrap_return(r)) {
+ err: rt::errno => switch (err) {
+ rt::EINVAL => {
+ assert(sum == 0z);
+ return copy_fallback(_to, _from);
+ },
+ * => return errno_to_io(err),
+ },
+ n: size => switch (n) {
+ 0z => return sum,
+ * => n,
+ },
+ };
+ let w = offs - n;
+ sum += w;
+ to.offs += n;
+ from.offs += n;
+ };
+ return sum;
+};
+
+fn copy_fallback(_to: *io::stream, _from: *io::stream) (size | io::error) = {
+ let to = _to: *fd_stream, temp = *to;
+ temp.stream.copier = null;
+ return io::copy(&temp.stream, _from);
+};
diff --git a/os/+linux/open.ha b/os/+linux/open.ha
@@ -2,12 +2,34 @@ use io;
use rt;
use strings;
+export type flags = enum {
+ NONE = 0,
+ CREATE = 0o100,
+ EXCLUSIVE = 0o200,
+ NOCTTY = 0o400,
+ TRUNC = 0o1000,
+ APPEND = 0o2000,
+ NONBLOCK = 0o4000,
+ DSYNC = 0o10000,
+ SYNC = 0o4010000,
+ RSYNC = 0o4010000,
+ DIRECTORY = 0o200000,
+ NOFOLLOW = 0o400000,
+ CLOEXEC = 0o2000000,
+};
+
// Opens a file from the filesystem.
-export fn open(path: (str | []u8), mode: io::mode) (*io::stream | io::error) = {
+//
+// If no flags are provided, the default is NOCTTY | CLOEXEC.
+export fn open(
+ path: (str | []u8),
+ mode: io::mode,
+ flag: flags...
+) (*io::stream | io::error) = {
// Verify that these line up with the Linux ABI:
- static assert(io::mode::RDONLY: uint == 0u);
- static assert(io::mode::WRONLY: uint == 1u);
- static assert(io::mode::RDWR: uint == 2u);
+ static assert(io::mode::RDONLY: int == 0);
+ static assert(io::mode::WRONLY: int == 1);
+ static assert(io::mode::RDWR: int == 2);
const p: []u8 = match (path) {
s: str => strings::to_utf8(s),
@@ -18,12 +40,20 @@ export fn open(path: (str | []u8), mode: io::mode) (*io::stream | io::error) = {
b: []u8 => "<open([]u8)>", // TODO: try to decode it?
};
- let r = rt::open(p: *[*]u8: *const char, mode: int, 0u);
+ let m = mode: int;
+ if (len(flag) == 0z) {
+ m |= flags::NOCTTY: int | flags::CLOEXEC: int;
+ };
+ for (let i = 0z; i < len(flag); i += 1z) {
+ m |= flag[i]: int;
+ };
+
+ let r = rt::open(p: *[*]u8: *const char, m, 0u);
let fd: int = match (rt::wrap_return(r)) {
err: rt::errno => return errno_to_io(err),
n: size => n: int,
};
- // TODO: Pass mode flags along to fd_stream
+ // TODO: Pass stream mode to fd_stream
return fdopen(fd, name);
};
diff --git a/rt/+linux/syscalls.ha b/rt/+linux/syscalls.ha
@@ -19,6 +19,9 @@ export fn close(fd: int) int = syscall1(SYS_close, fd: u64): int;
export fn getpid() int = syscall0(SYS_getpid): int;
+export fn sendfile(out: int, in: int, offs: *size, count: size) size =
+ syscall4(SYS_sendfile, out: u64, in: u64, offs: uintptr: u64, count: u64): size;
+
export @noreturn fn exit(status: int) void = syscall1(SYS_exit, status: u64);
export fn kill(pid: int, signal: int) int =