hare

[hare] The Hare programming language
git clone https://git.torresjrjr.com/hare.git
Log | Files | Refs | README | LICENSE

buffered.ha (9254B)


      1 // License: MPL-2.0
      2 // (c) 2022 Alexey Yerin <yyp@disroot.org>
      3 // (c) 2021 Byron Torres <b@torresjrjr.com>
      4 // (c) 2021 Drew DeVault <sir@cmpwn.com>
      5 // (c) 2021 Eyal Sawady <ecs@d2evs.net>
      6 use bytes;
      7 use encoding::utf8;
      8 use errors;
      9 use io;
     10 use strings;
     11 
     12 const buffered_vtable_r: io::vtable = io::vtable {
     13 	closer = &buffered_close_static,
     14 	reader = &buffered_read,
     15 	...
     16 };
     17 
     18 const buffered_vtable_w: io::vtable = io::vtable {
     19 	closer = &buffered_close_static,
     20 	writer = &buffered_write,
     21 	...
     22 };
     23 
     24 const buffered_vtable_rw: io::vtable = io::vtable {
     25 	closer = &buffered_close_static,
     26 	reader = &buffered_read,
     27 	writer = &buffered_write,
     28 	...
     29 };
     30 
     31 export type bufstream = struct {
     32 	stream: io::stream,
     33 	source: io::handle,
     34 	rbuffer: []u8,
     35 	wbuffer: []u8,
     36 	ravail: size,
     37 	wavail: size,
     38 	flush: []u8,
     39 };
     40 
     41 // Creates a stream which buffers reads and writes for the underlying stream.
     42 // This is generally used to improve performance of small reads/writes for
     43 // sources where I/O operations are costly, such as if they invoke a syscall or
     44 // take place over the network.
     45 //
     46 // The caller should supply one or both of a read and write buffer as a slice of
     47 // the desired buffer, or empty slices if read or write functionality is
     48 // disabled. The same buffer may not be used for both reads and writes.
     49 //
     50 // The caller is responsible for closing the underlying stream, and freeing the
     51 // provided buffers if necessary, after the buffered stream is closed.
     52 //
     53 // 	let rbuf: [os::BUFSIZ]u8 = [0...];
     54 // 	let wbuf: [os::BUFSIZ]u8 = [0...];
     55 // 	let buffered = bufio::buffered(source, rbuf, wbuf);
     56 export fn buffered(
     57 	src: io::handle,
     58 	rbuf: []u8,
     59 	wbuf: []u8,
     60 ) bufstream = {
     61 	static let flush_default = ['\n': u8];
     62 	let s = bufstream {
     63 		source = src,
     64 		rbuffer = rbuf,
     65 		wbuffer = wbuf,
     66 		flush = flush_default,
     67 		...
     68 	};
     69 	if (len(rbuf) != 0 && len(wbuf) != 0) {
     70 		assert(rbuf: *[*]u8 != wbuf: *[*]u8,
     71 			"Cannot use bufio::buffered with same buffer for reads and writes");
     72 		s.stream = &buffered_vtable_rw;
     73 	} else if (len(rbuf) != 0) {
     74 		s.stream = &buffered_vtable_r;
     75 	} else if (len(wbuf) != 0) {
     76 		s.stream = &buffered_vtable_w;
     77 	} else abort("Must provide at least one buffer to bufio::buffered");
     78 	return s;
     79 };
     80 
     81 // Flushes pending writes to the underlying stream.
     82 export fn flush(s: io::handle) (io::error | void) = {
     83 	let s = match (s) {
     84 	case let st: *io::stream =>
     85 		if (st.writer != &buffered_write) {
     86 			return errors::unsupported;
     87 		};
     88 		yield st: *bufstream;
     89 	case =>
     90 		return errors::unsupported;
     91 	};
     92 	if (s.wavail == 0) {
     93 		return;
     94 	};
     95 	io::writeall(s.source, s.wbuffer[..s.wavail])?;
     96 	s.wavail = 0;
     97 	return;
     98 };
     99 
    100 // Sets the list of bytes which will cause the stream to flush when written. By
    101 // default, the stream will flush when a newline (\n) is written.
    102 export fn setflush(s: io::handle, b: []u8) void = {
    103 	let s = match (s) {
    104 	case let st: *io::stream =>
    105 		if (st.writer != &buffered_write) {
    106 			abort("Attempted to set flush bytes on unbuffered stream");
    107 		};
    108 		yield st: *bufstream;
    109 	case =>
    110 		abort("Attempted to set flush bytes on unbuffered stream");
    111 	};
    112 	s.flush = b;
    113 };
    114 
    115 // "Unreads" a slice of bytes, such that the next call to "read" will return
    116 // these bytes before reading any new data from the underlying source. The
    117 // unread data must fit into the read buffer's available space. The amount of
    118 // data which can be unread before the user makes any reads from a buffered
    119 // stream is equal to the length of the read buffer, and otherwise it is equal
    120 // to the length of the return value of the last call to [[io::read]] using this
    121 // buffered stream. Attempting to unread more data than can fit into the read
    122 // buffer will abort the program.
    123 export fn unread(s: io::handle, buf: []u8) void = {
    124 	let s = match (s) {
    125 	case let st: *io::stream =>
    126 		if (st.reader != &buffered_read) {
    127 			abort("Attempted unread on unbuffered stream");
    128 		};
    129 		yield st: *bufstream;
    130 	case =>
    131 		abort("Attempted unread on unbuffered stream");
    132 	};
    133 	assert(len(s.rbuffer) - s.ravail >= len(buf),
    134 		"Attempted to unread more data than buffer has available");
    135 	let sl = s.rbuffer[..s.ravail];
    136 	static insert(sl[0], buf...);
    137 	s.ravail += len(buf);
    138 };
    139 
    140 // Unreads a rune; see [[unread]].
    141 export fn unreadrune(s: io::handle, rn: rune) void = {
    142 	const buf = utf8::encoderune(rn);
    143 	unread(s, buf);
    144 };
    145 
    146 // Returns true if an [[io::handle]] is a [[buffered]] stream.
    147 export fn isbuffered(in: io::handle) bool = {
    148 	match (in) {
    149 	case io::file =>
    150 		return false;
    151 	case let st: *io::stream =>
    152 		return st.reader == &buffered_read || st.writer == &buffered_write;
    153 	};
    154 };
    155 
    156 fn buffered_close_static(s: *io::stream) (void | io::error) = {
    157 	assert(s.closer == &buffered_close_static);
    158 	if (s.writer != null) {
    159 		flush(s: *bufstream)?;
    160 	};
    161 };
    162 
    163 fn buffered_read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = {
    164 	assert(s.reader == &buffered_read);
    165 	let s = s: *bufstream;
    166 	let n = if (len(buf) < len(s.rbuffer)) len(buf) else len(s.rbuffer);
    167 	if (n > s.ravail) {
    168 		let z = match (io::read(s.source, s.rbuffer[s.ravail..])) {
    169 		case let err: io::error =>
    170 			return err;
    171 		case io::EOF =>
    172 			if (s.ravail == 0) {
    173 				return io::EOF;
    174 			};
    175 			yield 0z;
    176 		case let z: size =>
    177 			yield z;
    178 		};
    179 		s.ravail += z;
    180 		n = if (n > s.ravail) s.ravail else n;
    181 		assert(n != 0);
    182 	};
    183 
    184 	buf[..n] = s.rbuffer[..n];
    185 	s.rbuffer[..len(s.rbuffer) - n] = s.rbuffer[n..];
    186 	s.ravail -= n;
    187 	return n;
    188 };
    189 
    190 fn buffered_write(s: *io::stream, buf: const []u8) (size | io::error) = {
    191 	assert(s.writer == &buffered_write);
    192 	let s = s: *bufstream;
    193 	let buf = buf;
    194 
    195 	let doflush = false;
    196 	for (let i = 0z; i < len(buf); i += 1) {
    197 		for (let j = 0z; j < len(s.flush); j += 1) {
    198 			if (buf[i] == s.flush[j]) {
    199 				doflush = true;
    200 				break;
    201 			};
    202 		};
    203 	};
    204 
    205 	let z = 0z;
    206 	for (len(buf) > 0) {
    207 		let avail = len(s.wbuffer) - s.wavail;
    208 		if (avail == 0) {
    209 			flush(s)?;
    210 			avail = len(s.wbuffer);
    211 		};
    212 
    213 		const n = if (avail < len(buf)) avail else len(buf);
    214 		s.wbuffer[s.wavail..s.wavail + n] = buf[..n];
    215 		buf = buf[n..];
    216 		s.wavail += n;
    217 		z += n;
    218 	};
    219 
    220 	if (doflush) {
    221 		flush(s)?;
    222 	};
    223 
    224 	return z;
    225 };
    226 
    227 @test fn buffered_read() void = {
    228 	let sourcebuf: []u8 = [1, 3, 3, 7];
    229 	let source = fixed(sourcebuf, io::mode::READ);
    230 	defer io::close(&source)!;
    231 
    232 	let rbuf: [1024]u8 = [0...];
    233 	let f = buffered(&source, rbuf, []);
    234 	defer io::close(&f)!;
    235 
    236 	let buf: [1024]u8 = [0...];
    237 	assert(io::read(&f, buf[..2]) as size == 2);
    238 	assert(source.pos == len(source.buf), "fixed stream was not fully consumed");
    239 	assert(bytes::equal(buf[..2], [1, 3]));
    240 
    241 	assert(io::read(&f, buf[2..]) as size == 2);
    242 	assert(bytes::equal(buf[..4], [1, 3, 3, 7]));
    243 	assert(io::read(&f, buf) is io::EOF);
    244 
    245 	let sourcebuf: [32]u8 = [1, 3, 3, 7, 0...];
    246 	let source = fixed(sourcebuf, io::mode::READ);
    247 
    248 	let rbuf: [16]u8 = [0...];
    249 	let f = buffered(&source, rbuf, []);
    250 	defer io::close(&f)!;
    251 
    252 	let buf: [32]u8 = [0...];
    253 	assert(io::read(&f, buf) as size == 16);
    254 	assert(source.pos == 16);
    255 
    256 	assert(io::read(&f, buf[16..]) as size == 16);
    257 	assert(bytes::equal(buf, sourcebuf));
    258 	assert(io::read(&f, buf) is io::EOF);
    259 	assert(source.pos == len(source.buf));
    260 };
    261 
    262 @test fn buffered_write() void = {
    263 	// Normal case
    264 	let sink = dynamic(io::mode::WRITE);
    265 	defer io::close(&sink)!;
    266 
    267 	let wbuf: [1024]u8 = [0...];
    268 	let f = buffered(&sink, [], wbuf);
    269 	defer io::close(&f)!;
    270 
    271 	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
    272 	assert(len(buffer(&sink)) == 0);
    273 	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
    274 	assert(flush(&f) is void);
    275 	assert(bytes::equal(buffer(&sink), [1, 3, 3, 7, 1, 3, 3, 7]));
    276 
    277 	// Test flushing via buffer exhaustion
    278 	let sink = dynamic(io::mode::WRITE);
    279 	defer io::close(&sink)!;
    280 
    281 	let wbuf: [4]u8 = [0...];
    282 	let f = buffered(&sink, [], wbuf);
    283 
    284 	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
    285 	assert(len(buffer(&sink)) == 0);
    286 	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
    287 	assert(bytes::equal(buffer(&sink), [1, 3, 3, 7]));
    288 	io::close(&f)!; // Should flush
    289 	assert(bytes::equal(buffer(&sink), [1, 3, 3, 7, 1, 3, 3, 7]));
    290 
    291 	// Test flushing via flush characters
    292 	let sink = dynamic(io::mode::WRITE);
    293 	defer io::close(&sink)!;
    294 
    295 	let wbuf: [1024]u8 = [0...];
    296 	let f = buffered(&sink, [], wbuf);
    297 
    298 	assert(io::writeall(&f, strings::toutf8("hello")) as size == 5);
    299 	assert(len(buffer(&sink)) == 0);
    300 	assert(io::writeall(&f, strings::toutf8(" world!\n")) as size == 8);
    301 	assert(bytes::equal(buffer(&sink), strings::toutf8("hello world!\n")));
    302 };
    303 
    304 @test fn unread() void = {
    305 	let rbuf: [8]u8 = [0...];
    306 	let f = buffered(io::zero, rbuf, []);
    307 
    308 	let buf: [16]u8 = [42...];
    309 	assert(io::read(&f, buf[..4]) as size == 4);
    310 	assert(buf[0] == 0);
    311 	assert(buf[1] == 0);
    312 	assert(buf[2] == 0);
    313 	assert(buf[3] == 0);
    314 	unread(&f, [1, 2, 3, 4]);
    315 
    316 	assert(io::read(&f, buf[..8]) as size == 8);
    317 	assert(buf[0] == 1);
    318 	assert(buf[1] == 2);
    319 	assert(buf[2] == 3);
    320 	assert(buf[3] == 4);
    321 	assert(buf[4] == 0);
    322 	assert(buf[5] == 0);
    323 	assert(buf[6] == 0);
    324 	assert(buf[7] == 0);
    325 
    326 	assert(io::read(&f, buf) as size == 8);
    327 	for (let i = 0z; i < 8; i += 1) {
    328 		assert(buf[i] == 0);
    329 	};
    330 
    331 	let input: []u8 = [1, 2, 3, 4];
    332 	let f = buffered(&fixed(input, io::mode::READ), rbuf, []);
    333 
    334 	assert(io::read(&f, buf) as size == 4);
    335 	unread(&f, [1, 2, 3, 4]);
    336 	assert(io::read(&f, buf) as size == 4);
    337 	assert(io::read(&f, buf) is io::EOF);
    338 };