hare

[hare] The Hare programming language
git clone https://git.torresjrjr.com/hare.git
Log | Files | Refs | README | LICENSE

stream.ha (5889B)


      1 // SPDX-License-Identifier: MPL-2.0
      2 // (c) Hare authors <https://harelang.org>
      3 
      4 use encoding::utf8;
      5 use errors;
      6 use io;
      7 
      8 const vtable_r: io::vtable = io::vtable {
      9 	closer = &close_static,
     10 	reader = &read,
     11 	...
     12 };
     13 
     14 const vtable_w: io::vtable = io::vtable {
     15 	closer = &close_static,
     16 	writer = &write,
     17 	...
     18 };
     19 
     20 const vtable_rw: io::vtable = io::vtable {
     21 	closer = &close_static,
     22 	reader = &read,
     23 	writer = &write,
     24 	...
     25 };
     26 
     27 export type stream = struct {
     28 	stream: io::stream,
     29 	source: io::handle,
     30 	rbuffer: []u8,
     31 	wbuffer: []u8,
     32 	rpos: size,
     33 	ravail: size,
     34 	wavail: size,
     35 	flush: []u8,
     36 };
     37 
     38 // Creates a stream which buffers reads and writes for the underlying stream.
     39 // This is generally used to improve performance of small reads/writes for
     40 // sources where I/O operations are costly, such as if they invoke a syscall or
     41 // take place over the network.
     42 //
     43 // The caller should supply one or both of a read and write buffer as a slice of
     44 // the desired buffer, or empty slices if read or write functionality is
     45 // disabled. The same buffer may not be used for both reads and writes.
     46 //
     47 // The caller is responsible for closing the underlying stream, and freeing the
     48 // provided buffers if necessary, after the buffered stream is closed.
     49 //
     50 // 	let rbuf: [os::BUFSZ]u8 = [0...];
     51 // 	let wbuf: [os::BUFSZ]u8 = [0...];
     52 // 	let buffered = bufio::init(source, rbuf, wbuf);
     53 export fn init(
     54 	src: io::handle,
     55 	rbuf: []u8,
     56 	wbuf: []u8,
     57 ) stream = {
     58 	static let flush_default = ['\n': u8];
     59 
     60 	let st =
     61 		if (len(rbuf) != 0 && len(wbuf) != 0) {
     62 			assert(rbuf: *[*]u8 != wbuf: *[*]u8,
     63 				"Cannot use same buffer for reads and writes");
     64 			yield &vtable_rw;
     65 		} else if (len(rbuf) != 0) {
     66 			yield &vtable_r;
     67 		} else if (len(wbuf) != 0) {
     68 			yield &vtable_w;
     69 		} else {
     70 			abort("Must provide at least one buffer");
     71 		};
     72 
     73 	return stream {
     74 		stream = st,
     75 		source = src,
     76 		rbuffer = rbuf,
     77 		wbuffer = wbuf,
     78 		flush = flush_default,
     79 		rpos = len(rbuf), // necessary for unread() before read()
     80 		...
     81 	};
     82 };
     83 
     84 // Flushes pending writes to the underlying stream.
     85 export fn flush(s: io::handle) (void | io::error) = {
     86 	let s = match (s) {
     87 	case let st: *io::stream =>
     88 		if (st.writer != &write) {
     89 			return errors::unsupported;
     90 		};
     91 		yield st: *stream;
     92 	case =>
     93 		return errors::unsupported;
     94 	};
     95 	if (s.wavail == 0) {
     96 		return;
     97 	};
     98 	io::writeall(s.source, s.wbuffer[..s.wavail])?;
     99 	s.wavail = 0;
    100 	return;
    101 };
    102 
    103 // Sets the list of bytes which will cause the stream to flush when written. By
    104 // default, the stream will flush when a newline (\n) is written.
    105 export fn setflush(s: io::handle, b: []u8) void = {
    106 	let s = match (s) {
    107 	case let st: *io::stream =>
    108 		if (st.writer != &write) {
    109 			abort("Attempted to set flush bytes on unbuffered stream");
    110 		};
    111 		yield st: *stream;
    112 	case =>
    113 		abort("Attempted to set flush bytes on unbuffered stream");
    114 	};
    115 	s.flush = b;
    116 };
    117 
    118 // "Unreads" a slice of bytes, such that the next call to "read" will return
    119 // these bytes before reading any new data from the underlying source. The
    120 // unread data must fit into the read buffer's available space. The amount of
    121 // data which can be unread before the user makes any reads from a buffered
    122 // stream is equal to the length of the read buffer, and otherwise it is equal
    123 // to the length of the return value of the last call to [[io::read]] using this
    124 // buffered stream. Attempting to unread more data than can fit into the read
    125 // buffer will abort the program.
    126 export fn unread(s: io::handle, buf: []u8) void = {
    127 	match (s) {
    128 	case let st: *io::stream =>
    129 		if (st.reader == &read) {
    130 			stream_unread(s: *stream, buf);
    131 		} else if (st.reader == &scan_read) {
    132 			scan_unread(s: *scanner, buf);
    133 		} else {
    134 			abort("Attempted unread on unbuffered stream");
    135 		};
    136 	case =>
    137 		abort("Attempted unread on unbuffered stream");
    138 	};
    139 };
    140 
    141 fn stream_unread(s: *stream, buf: []u8) void = {
    142 	assert(s.rpos >= len(buf),
    143 		"Attempted to unread more data than buffer has available");
    144 	s.rbuffer[s.rpos - len(buf)..s.rpos] = buf;
    145 	s.rpos -= len(buf);
    146 	s.ravail += len(buf);
    147 };
    148 
    149 // Unreads a rune; see [[unread]].
    150 export fn unreadrune(s: io::handle, rn: rune) void = {
    151 	const buf = utf8::encoderune(rn)!;
    152 	unread(s, buf);
    153 };
    154 
    155 // Returns true if an [[io::handle]] is a [[stream]].
    156 export fn isbuffered(in: io::handle) bool = {
    157 	match (in) {
    158 	case io::file =>
    159 		return false;
    160 	case let st: *io::stream =>
    161 		return st.reader == &read || st.writer == &write;
    162 	};
    163 };
    164 
    165 fn close_static(s: *io::stream) (void | io::error) = {
    166 	assert(s.closer == &close_static);
    167 	if (s.writer != null) {
    168 		flush(s: *stream)?;
    169 	};
    170 };
    171 
    172 fn read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = {
    173 	assert(s.reader == &read);
    174 	let s = s: *stream;
    175 
    176 	if (s.ravail < len(buf) && s.ravail < len(s.rbuffer)) {
    177 		s.rbuffer[..s.ravail] = s.rbuffer[s.rpos..s.rpos + s.ravail];
    178 		s.rpos = 0;
    179 		match (io::read(s.source, s.rbuffer[s.ravail..])) {
    180 		case let err: io::error =>
    181 			return err;
    182 		case io::EOF =>
    183 			if (s.ravail == 0) {
    184 				return io::EOF;
    185 			};
    186 		case let z: size =>
    187 			s.ravail += z;
    188 		};
    189 	};
    190 
    191 	const n = if (len(buf) < s.ravail) len(buf) else s.ravail;
    192 	buf[..n] = s.rbuffer[s.rpos..s.rpos + n];
    193 	s.rpos += n;
    194 	s.ravail -= n;
    195 	return n;
    196 };
    197 
    198 fn write(s: *io::stream, buf: const []u8) (size | io::error) = {
    199 	assert(s.writer == &write);
    200 	let s = s: *stream;
    201 	let buf = buf;
    202 
    203 	let doflush = false;
    204 	if (len(s.flush) != 0) {
    205 		for :search (let i = 0z; i < len(buf); i += 1) {
    206 			for (let j = 0z; j < len(s.flush); j += 1) {
    207 				if (buf[i] == s.flush[j]) {
    208 					doflush = true;
    209 					break :search;
    210 				};
    211 			};
    212 		};
    213 	};
    214 
    215 	let z = 0z;
    216 	for (len(buf) > 0) {
    217 		let avail = len(s.wbuffer) - s.wavail;
    218 		if (avail == 0) {
    219 			flush(s)?;
    220 			avail = len(s.wbuffer);
    221 		};
    222 
    223 		const n = if (avail < len(buf)) avail else len(buf);
    224 		s.wbuffer[s.wavail..s.wavail + n] = buf[..n];
    225 		buf = buf[n..];
    226 		s.wavail += n;
    227 		z += n;
    228 	};
    229 
    230 	if (doflush) {
    231 		flush(s)?;
    232 	};
    233 
    234 	return z;
    235 };