commit 4517c10f0a3b8d6f38be8c392d8d7ca56bbe721b
parent d33cf1238fa6b9af957f3de07fdc14fdc30469fb
Author: Drew DeVault <sir@cmpwn.com>
Date: Sun, 14 Mar 2021 16:13:34 -0400
io: add stream unwrapping
Diffstat:
4 files changed, 54 insertions(+), 17 deletions(-)
diff --git a/bufio/buffered.ha b/bufio/buffered.ha
@@ -1,7 +1,7 @@
use bytes;
use io;
-type bufstream = struct {
+export type bufstream = struct {
stream: io::stream,
source: *io::stream,
rbuffer: []u8,
@@ -11,33 +11,24 @@ type bufstream = struct {
flush: []u8,
};
-// Creates a stream which buffers reads and writes for the underlying stream.
-// This is generally used to improve performance of small reads/writes for
-// sources where I/O operations are costly, such as if they invoke a syscall or
-// take place over the network.
-//
-// The caller should supply one or both of a read and write buffer as a slice of
-// the desired buffer, or empty slices if read or write functionality is
-// disabled. The same buffer may not be used for both reads and writes.
-//
-// When the buffered stream is closed, the underlying stream is also closed. The
-// provided buffers are not freed.
-export fn buffered(
+export fn static_buffered(
src: *io::stream,
rbuf: []u8,
wbuf: []u8,
+ s: *bufstream,
) *io::stream = {
- let s = alloc(bufstream {
+ *s = bufstream {
stream = io::stream {
name = src.name,
closer = &buffered_close,
+ unwrap = &buffered_unwrap,
},
source = src,
rbuffer = rbuf,
wbuffer = wbuf,
flush = ['\n': u32: u8],
...
- });
+ };
if (len(rbuf) != 0) {
s.rfilled = rbuf[..0];
s.stream.reader = &buffered_read;
@@ -53,6 +44,26 @@ export fn buffered(
return &s.stream;
};
+// Creates a stream which buffers reads and writes for the underlying stream.
+// This is generally used to improve performance of small reads/writes for
+// sources where I/O operations are costly, such as if they invoke a syscall or
+// take place over the network.
+//
+// The caller should supply one or both of a read and write buffer as a slice of
+// the desired buffer, or empty slices if read or write functionality is
+// disabled. The same buffer may not be used for both reads and writes.
+//
+// When the buffered stream is closed, the underlying stream is also closed. The
+// provided buffers are not freed.
+export fn buffered(
+ src: *io::stream,
+ rbuf: []u8,
+ wbuf: []u8,
+) *io::stream = {
+ let s = alloc(bufstream { ... });
+ return static_buffered(src, rbuf, wbuf, s);
+};
+
// Flushes pending writes to the underlying stream.
export fn flush(s: *io::stream) void = {
assert(s.closer == &buffered_close,
@@ -77,6 +88,12 @@ fn buffered_close(s: *io::stream) void = {
free(s);
};
+fn buffered_unwrap(s: *io::stream) *io::stream = {
+ assert(s.unwrap == &buffered_unwrap);
+ let s = s: *bufstream;
+ return s.source;
+};
+
fn buffered_read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = {
assert(s.reader == &buffered_read);
let s = s: *bufstream;
@@ -113,9 +130,11 @@ fn buffered_write(s: *io::stream, buf: const []u8) (size | io::error) = {
let sourcebuf: []u8 = [1, 3, 3, 7];
let source = fixed(sourcebuf, io::mode::READ);
let fb = source: *fixed_stream;
+ defer io::close(source);
let rbuf: [1024]u8 = [0...];
let f = buffered(source, rbuf, []);
+ defer io::close(f);
let buf: [1024]u8 = [0...];
assert(io::read(f, buf[..2]) as size == 2);
@@ -130,9 +149,11 @@ fn buffered_write(s: *io::stream, buf: const []u8) (size | io::error) = {
sourcebuf[32..36] = [7, 3, 3, 1];
let source = fixed(sourcebuf, io::mode::READ);
let fb = source: *fixed_stream;
+ defer io::close(source);
let rbuf: [16]u8 = [0...];
let f = buffered(source, rbuf, []);
+ defer io::close(f);
let buf: [32]u8 = [0...];
assert(io::read(f, buf) as size == 16);
diff --git a/io/stream.ha b/io/stream.ha
@@ -26,6 +26,7 @@ export type stream = struct {
closer: nullable *closer,
copier: nullable *copier,
seeker: nullable *seeker,
+ unwrap: nullable *unwrap,
};
// Reads up to len(buf) bytes from the reader into the given buffer, returning
@@ -70,6 +71,14 @@ export fn tell(s: *stream) (off | error) = {
};
};
+// Returns the underlying stream for a stream which wraps another stream.
+export fn source(s: *stream) (*io::stream | unsupported) = {
+ return match (s.unwrap) {
+ null => unsupported,
+ uw: *unwrap => uw(s),
+ };
+};
+
let _empty: io::stream = io::stream {
reader = &empty_read,
writer = &empty_write,
diff --git a/io/types.ha b/io/types.ha
@@ -76,3 +76,7 @@ export type copier = fn(to: *stream, from: *stream) (size | error);
//
// Returns the new offset relative to the start or an error.
export type seeker = fn(s: *stream, off: off, w: whence) (off | error);
+
+// The interface for a stream which wraps another kind of stream. Returns the
+// underlying stream.
+export type unwrap = fn(s: *stream) *stream;
diff --git a/os/+linux/fdstream.ha b/os/+linux/fdstream.ha
@@ -48,8 +48,11 @@ fn is_fdstream(s: *io::stream) bool = {
// Returns the file descriptor for a given [io::stream]. If there is no fd
// associated with this stream, void is returned.
export fn streamfd(s: *io::stream) (int | void) = {
- if (!is_fdstream(s)) {
- return;
+ for (!is_fdstream(s)) {
+ s = match (io::source(s)) {
+ io::unsupported => return,
+ s: *io::stream => s,
+ };
};
let stream = s: *fd_stream;
return stream.fd;