hare

[hare] The Hare programming language
git clone https://git.torresjrjr.com/hare.git
Log | Files | Refs | README | LICENSE

stream.ha (5903B)


      1 // SPDX-License-Identifier: MPL-2.0
      2 // (c) Hare authors <https://harelang.org>
      3 
      4 use bytes;
      5 use encoding::utf8;
      6 use errors;
      7 use io;
      8 use strings;
      9 
     10 const vtable_r: io::vtable = io::vtable {
     11 	closer = &close_static,
     12 	reader = &read,
     13 	...
     14 };
     15 
     16 const vtable_w: io::vtable = io::vtable {
     17 	closer = &close_static,
     18 	writer = &write,
     19 	...
     20 };
     21 
     22 const vtable_rw: io::vtable = io::vtable {
     23 	closer = &close_static,
     24 	reader = &read,
     25 	writer = &write,
     26 	...
     27 };
     28 
// State of a buffered stream; see [[init]].
export type stream = struct {
	// Embedded [[io::stream]]. [[init]] stores the vtable pointer here, and
	// the rest of this module casts between *io::stream and *stream.
	stream: io::stream,
	// Underlying handle which is read from to fill the read buffer and
	// written to when the write buffer is flushed.
	source: io::handle,
	// Caller-provided read buffer.
	rbuffer: []u8,
	// Caller-provided write buffer.
	wbuffer: []u8,
	// Offset into rbuffer of the next byte to hand out to a reader.
	rpos: size,
	// Number of buffered bytes in rbuffer, starting at rpos, which have not
	// been handed out yet.
	ravail: size,
	// Number of bytes at the start of wbuffer which are pending a flush.
	wavail: size,
	// Bytes which cause a flush when they appear in written data; see
	// [[setflush]].
	flush: []u8,
};
     39 
     40 // Creates a stream which buffers reads and writes for the underlying stream.
     41 // This is generally used to improve performance of small reads/writes for
     42 // sources where I/O operations are costly, such as if they invoke a syscall or
     43 // take place over the network.
     44 //
     45 // The caller should supply one or both of a read and write buffer as a slice of
     46 // the desired buffer, or empty slices if read or write functionality is
     47 // disabled. The same buffer may not be used for both reads and writes.
     48 //
     49 // The caller is responsible for closing the underlying stream, and freeing the
     50 // provided buffers if necessary, after the buffered stream is closed.
     51 //
     52 // 	let rbuf: [os::BUFSZ]u8 = [0...];
     53 // 	let wbuf: [os::BUFSZ]u8 = [0...];
     54 // 	let buffered = bufio::init(source, rbuf, wbuf);
     55 export fn init(
     56 	src: io::handle,
     57 	rbuf: []u8,
     58 	wbuf: []u8,
     59 ) stream = {
     60 	static let flush_default = ['\n': u8];
     61 
     62 	let st =
     63 		if (len(rbuf) != 0 && len(wbuf) != 0) {
     64 			assert(rbuf: *[*]u8 != wbuf: *[*]u8,
     65 				"Cannot use same buffer for reads and writes");
     66 			yield &vtable_rw;
     67 		} else if (len(rbuf) != 0) {
     68 			yield &vtable_r;
     69 		} else if (len(wbuf) != 0) {
     70 			yield &vtable_w;
     71 		} else {
     72 			abort("Must provide at least one buffer");
     73 		};
     74 
     75 	return stream {
     76 		stream = st,
     77 		source = src,
     78 		rbuffer = rbuf,
     79 		wbuffer = wbuf,
     80 		flush = flush_default,
     81 		rpos = len(rbuf), // necessary for unread() before read()
     82 		...
     83 	};
     84 };
     85 
     86 // Flushes pending writes to the underlying stream.
     87 export fn flush(s: io::handle) (void | io::error) = {
     88 	let s = match (s) {
     89 	case let st: *io::stream =>
     90 		if (st.writer != &write) {
     91 			return errors::unsupported;
     92 		};
     93 		yield st: *stream;
     94 	case =>
     95 		return errors::unsupported;
     96 	};
     97 	if (s.wavail == 0) {
     98 		return;
     99 	};
    100 	io::writeall(s.source, s.wbuffer[..s.wavail])?;
    101 	s.wavail = 0;
    102 	return;
    103 };
    104 
    105 // Sets the list of bytes which will cause the stream to flush when written. By
    106 // default, the stream will flush when a newline (\n) is written.
    107 export fn setflush(s: io::handle, b: []u8) void = {
    108 	let s = match (s) {
    109 	case let st: *io::stream =>
    110 		if (st.writer != &write) {
    111 			abort("Attempted to set flush bytes on unbuffered stream");
    112 		};
    113 		yield st: *stream;
    114 	case =>
    115 		abort("Attempted to set flush bytes on unbuffered stream");
    116 	};
    117 	s.flush = b;
    118 };
    119 
    120 // "Unreads" a slice of bytes, such that the next call to "read" will return
    121 // these bytes before reading any new data from the underlying source. The
    122 // unread data must fit into the read buffer's available space. The amount of
    123 // data which can be unread before the user makes any reads from a buffered
    124 // stream is equal to the length of the read buffer, and otherwise it is equal
    125 // to the length of the return value of the last call to [[io::read]] using this
    126 // buffered stream. Attempting to unread more data than can fit into the read
    127 // buffer will abort the program.
    128 export fn unread(s: io::handle, buf: []u8) void = {
    129 	match (s) {
    130 	case let st: *io::stream =>
    131 		switch (st.reader) {
    132 		case &read =>
    133 			stream_unread(s: *stream, buf);
    134 		case &scan_read =>
    135 			scan_unread(s: *scanner, buf);
    136 		case =>
    137 			abort("Attempted unread on unbuffered stream");
    138 		};
    139 	case =>
    140 		abort("Attempted unread on unbuffered stream");
    141 	};
    142 };
    143 
    144 fn stream_unread(s: *stream, buf: []u8) void = {
    145 	assert(s.rpos >= len(buf),
    146 		"Attempted to unread more data than buffer has available");
    147 	s.rbuffer[s.rpos - len(buf)..s.rpos] = buf;
    148 	s.rpos -= len(buf);
    149 	s.ravail += len(buf);
    150 };
    151 
    152 // Unreads a rune; see [[unread]].
    153 export fn unreadrune(s: io::handle, rn: rune) void = {
    154 	const buf = utf8::encoderune(rn);
    155 	unread(s, buf);
    156 };
    157 
    158 // Returns true if an [[io::handle]] is a [[stream]].
    159 export fn isbuffered(in: io::handle) bool = {
    160 	match (in) {
    161 	case io::file =>
    162 		return false;
    163 	case let st: *io::stream =>
    164 		return st.reader == &read || st.writer == &write;
    165 	};
    166 };
    167 
    168 fn close_static(s: *io::stream) (void | io::error) = {
    169 	assert(s.closer == &close_static);
    170 	if (s.writer != null) {
    171 		flush(s: *stream)?;
    172 	};
    173 };
    174 
    175 fn read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = {
    176 	assert(s.reader == &read);
    177 	let s = s: *stream;
    178 
    179 	if (s.ravail < len(buf) && s.ravail < len(s.rbuffer)) {
    180 		s.rbuffer[..s.ravail] = s.rbuffer[s.rpos..s.rpos + s.ravail];
    181 		s.rpos = 0;
    182 		match (io::read(s.source, s.rbuffer[s.ravail..])) {
    183 		case let err: io::error =>
    184 			return err;
    185 		case io::EOF =>
    186 			if (s.ravail == 0) {
    187 				return io::EOF;
    188 			};
    189 		case let z: size =>
    190 			s.ravail += z;
    191 		};
    192 	};
    193 
    194 	const n = if (len(buf) < s.ravail) len(buf) else s.ravail;
    195 	buf[..n] = s.rbuffer[s.rpos..s.rpos + n];
    196 	s.rpos += n;
    197 	s.ravail -= n;
    198 	return n;
    199 };
    200 
    201 fn write(s: *io::stream, buf: const []u8) (size | io::error) = {
    202 	assert(s.writer == &write);
    203 	let s = s: *stream;
    204 	let buf = buf;
    205 
    206 	let doflush = false;
    207 	if (len(s.flush) != 0) {
    208 		for :search (let i = 0z; i < len(buf); i += 1) {
    209 			for (let j = 0z; j < len(s.flush); j += 1) {
    210 				if (buf[i] == s.flush[j]) {
    211 					doflush = true;
    212 					break :search;
    213 				};
    214 			};
    215 		};
    216 	};
    217 
    218 	let z = 0z;
    219 	for (len(buf) > 0) {
    220 		let avail = len(s.wbuffer) - s.wavail;
    221 		if (avail == 0) {
    222 			flush(s)?;
    223 			avail = len(s.wbuffer);
    224 		};
    225 
    226 		const n = if (avail < len(buf)) avail else len(buf);
    227 		s.wbuffer[s.wavail..s.wavail + n] = buf[..n];
    228 		buf = buf[n..];
    229 		s.wavail += n;
    230 		z += n;
    231 	};
    232 
    233 	if (doflush) {
    234 		flush(s)?;
    235 	};
    236 
    237 	return z;
    238 };