commit dec4f06381ec2da5c903074719e8d2f0322db522
parent 133af73a88d86dcdc0bcb01580df9a4c739101ae
Author: Drew DeVault <sir@cmpwn.com>
Date: Thu, 18 Mar 2021 11:28:58 -0400
bufio: implement write
Diffstat:
2 files changed, 113 insertions(+), 32 deletions(-)
diff --git a/bufio/buffered.ha b/bufio/buffered.ha
@@ -1,13 +1,14 @@
use bytes;
use io;
+use strings;
export type bufstream = struct {
stream: io::stream,
source: *io::stream,
rbuffer: []u8,
wbuffer: []u8,
- rfilled: []u8,
- wfilled: []u8,
+ ravail: size,
+ wavail: size,
flush: []u8,
};
@@ -17,24 +18,23 @@ export fn static_buffered(
wbuf: []u8,
s: *bufstream,
) *io::stream = {
+ static let flush_default = ['\n': u32: u8];
*s = bufstream {
stream = io::stream {
name = src.name,
- closer = &buffered_close,
+ closer = &buffered_close_static,
unwrap = &buffered_unwrap,
},
source = src,
rbuffer = rbuf,
wbuffer = wbuf,
- flush = ['\n': u32: u8],
+ flush = flush_default,
...
};
if (len(rbuf) != 0) {
- s.rfilled = rbuf[..0];
s.stream.reader = &buffered_read;
};
if (len(wbuf) != 0) {
- s.wfilled = wbuf[..0];
s.stream.writer = &buffered_write;
};
if (len(rbuf) != 0 && len(wbuf) != 0) {
@@ -53,29 +53,33 @@ export fn static_buffered(
// the desired buffer, or empty slices if read or write functionality is
// disabled. The same buffer may not be used for both reads and writes.
//
-// When the buffered stream is closed, the underlying stream is also closed. The
-// provided buffers are not freed.
+// The caller is responsible for closing the underlying stream and freeing the
+// provided buffers after the buffered stream is closed.
export fn buffered(
src: *io::stream,
rbuf: []u8,
wbuf: []u8,
) *io::stream = {
let s = alloc(bufstream { ... });
- return static_buffered(src, rbuf, wbuf, s);
+ let st = static_buffered(src, rbuf, wbuf, s);
+ st.closer = &buffered_close;
+ return st;
};
// Flushes pending writes to the underlying stream.
-export fn flush(s: *io::stream) void = {
- assert(s.closer == &buffered_close,
+export fn flush(s: *io::stream) (io::error | void) = {
+ assert(s.writer == &buffered_write,
"bufio::flushed used on non-buffered stream");
let s = s: *bufstream;
- abort(); // TODO
+ io::write(s.source, s.wbuffer[..s.wavail])?;
+ s.wavail = 0;
+ return;
};
// Sets the list of bytes which will cause the stream to flush when written. By
// default, the stream will flush when a newline (\n) is written.
export fn set_flush_bytes(s: *io::stream, b: []u8) void = {
- assert(s.closer == &buffered_close,
+ assert(s.writer == &buffered_write,
"bufio::set_flush_bytes used on non-buffered stream");
let s = s: *bufstream;
s.flush = b;
@@ -83,7 +87,7 @@ export fn set_flush_bytes(s: *io::stream, b: []u8) void = {
// Returns true if this is a buffered stream.
export fn is_buffered(s: *io::stream) bool = {
- return s.closer == &buffered_close;
+ return s.reader == &buffered_read || s.writer == &buffered_write;
};
// Returns true if this stream or any underlying streams are buffered.
@@ -99,8 +103,17 @@ export fn is_buffered_any(s: *io::stream) bool = {
fn buffered_close(s: *io::stream) void = {
assert(s.closer == &buffered_close);
- let s = s: *bufstream;
- io::close(s.source);
+ if (s.writer != null) {
+ flush(s);
+ };
+ free(s);
+};
+
+fn buffered_close_static(s: *io::stream) void = {
+ assert(s.closer == &buffered_close_static);
+ if (s.writer != null) {
+ flush(s);
+ };
free(s);
};
@@ -115,31 +128,63 @@ fn buffered_read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = {
let s = s: *bufstream;
let n = if (len(buf) < len(s.rbuffer)) len(buf) else len(s.rbuffer);
- if (n > len(s.rfilled)) {
- let z = match (io::read(s.source, s.rbuffer[len(s.rfilled)..])) {
+ if (n > s.ravail) {
+ let z = match (io::read(s.source, s.rbuffer[s.ravail..])) {
err: io::error => return err,
io::EOF => {
- if (len(s.rfilled) == 0) {
+ if (s.ravail == 0) {
return io::EOF;
};
0z;
},
z: size => z,
};
- s.rfilled = s.rbuffer[..len(s.rfilled) + z];
- n = if (n > len(s.rfilled)) len(s.rfilled) else n;
+ s.ravail += z;
+ n = if (n > s.ravail) s.ravail else n;
assert(n != 0);
};
- buf[..n] = s.rfilled[..n];
+ buf[..n] = s.rbuffer[..n];
s.rbuffer[..len(s.rbuffer) - n] = s.rbuffer[n..];
- s.rfilled = s.rbuffer[..len(s.rfilled) - n];
+ s.ravail -= n;
return n;
};
fn buffered_write(s: *io::stream, buf: const []u8) (size | io::error) = {
assert(s.writer == &buffered_write);
- return io::unsupported; // TODO
+ let s = s: *bufstream;
+ let buf = buf;
+
+ let doflush = false;
+ for (let i = 0z; i < len(buf); i += 1) {
+ for (let j = 0z; j < len(s.flush); j += 1) {
+ if (buf[i] == s.flush[j]) {
+ doflush = true;
+ break;
+ };
+ };
+ };
+
+ let z = 0z;
+ for (len(buf) > 0) {
+ let avail = len(s.wbuffer) - s.wavail;
+ if (avail == 0) {
+ flush(&s.stream)?;
+ avail = len(s.wbuffer);
+ };
+
+ const n = if (avail < len(buf)) avail else len(buf);
+ s.wbuffer[s.wavail..s.wavail + n] = buf[..n];
+ buf = buf[n..];
+ s.wavail += n;
+ z += n;
+ };
+
+ if (doflush) {
+ flush(&s.stream)?;
+ };
+
+ return z;
};
@test fn buffered_read() void = {
@@ -180,3 +225,45 @@ fn buffered_write(s: *io::stream, buf: const []u8) (size | io::error) = {
assert(io::read(f, buf) is io::EOF);
assert(len(fb.buf) == 0);
};
+
+@test fn buffered_write() void = {
+ // Normal case
+ let sink = dynamic(io::mode::WRITE);
+ defer io::close(sink);
+
+ let wbuf: [1024]u8 = [0...];
+ let f = buffered(sink, [], wbuf);
+ defer io::close(f);
+
+ assert(io::write(f, [1, 3, 3, 7]) as size == 4);
+ assert(len(buffer(sink)) == 0);
+ assert(io::write(f, [1, 3, 3, 7]) as size == 4);
+ assert(flush(f) is void);
+ assert(bytes::equal(buffer(sink), [1, 3, 3, 7, 1, 3, 3, 7]));
+
+ // Test flushing via buffer exhaustion
+ let sink = dynamic(io::mode::WRITE);
+ defer io::close(sink);
+
+ let wbuf: [4]u8 = [0...];
+ let f = buffered(sink, [], wbuf);
+
+ assert(io::write(f, [1, 3, 3, 7]) as size == 4);
+ assert(len(buffer(sink)) == 0);
+ assert(io::write(f, [1, 3, 3, 7]) as size == 4);
+ assert(bytes::equal(buffer(sink), [1, 3, 3, 7]));
+ io::close(f); // Should flush
+ assert(bytes::equal(buffer(sink), [1, 3, 3, 7, 1, 3, 3, 7]));
+
+ // Test flushing via flush characters
+ let sink = dynamic(io::mode::WRITE);
+ defer io::close(sink);
+
+ let wbuf: [1024]u8 = [0...];
+ let f = buffered(sink, [], wbuf);
+
+ assert(io::write(f, strings::to_utf8("hello")) as size == 5);
+ assert(len(buffer(sink)) == 0);
+ assert(io::write(f, strings::to_utf8(" world!\n")) as size == 8);
+ assert(bytes::equal(buffer(sink), strings::to_utf8("hello world!\n")));
+};
diff --git a/io/types.ha b/io/types.ha
@@ -4,14 +4,11 @@ export type os_error = struct {
data: *void,
}!;
-// An error indicating that the underlying stream has been closed.
-export type closed = void!;
-
// An error indicating that the requested operation is not supported.
export type unsupported = void!;
// Any error which may be returned from an I/O function.
-export type error = (os_error | closed | unsupported)!;
+export type error = (os_error | unsupported)!;
// Indicates an end-of-file condition.
export type EOF = void;
@@ -21,7 +18,6 @@ export fn errstr(err: error) str = {
return match (err) {
err: os_error => err.string(err.data),
unsupported => "The requested operation is not supported",
- closed => "This stream has been closed",
};
};
@@ -51,9 +47,7 @@ export type reader = fn(s: *stream, buf: []u8) (size | EOF | error);
export type writer = fn(s: *stream, buf: const []u8) (size | error);
// The interface for a stream which can be closed. This function should close
-// the underlying resources and free everything except for the stream pointer
-// itself. The other stream functions may be called after close is called; it is
-// their responsibility to return [io::closed] in this case.
+// and free any underlying resources, and cannot be used again.
export type closer = fn(s: *stream) void;
// The interface for a stream which has first-class support for copying data