hare

[hare] The Hare programming language
git clone https://git.torresjrjr.com/hare.git
Log | Files | Refs | README | LICENSE

buffered.ha (9384B)


      1 // License: MPL-2.0
      2 // (c) 2022 Alexey Yerin <yyp@disroot.org>
      3 // (c) 2021 Byron Torres <b@torresjrjr.com>
      4 // (c) 2021 Drew DeVault <sir@cmpwn.com>
      5 // (c) 2021 Ember Sawady <ecs@d2evs.net>
      6 use bytes;
      7 use encoding::utf8;
      8 use errors;
      9 use io;
     10 use strings;
     11 
     12 const buffered_vtable_r: io::vtable = io::vtable {
        	// Read-only variant: buffered() selects it when only a read buffer is
        	// supplied. No writer is set here.
     13 	closer = &buffered_close_static,
     14 	reader = &buffered_read,
     15 	...
     16 };
     17 
     18 const buffered_vtable_w: io::vtable = io::vtable {
        	// Write-only variant: buffered() selects it when only a write buffer
        	// is supplied. No reader is set here.
     19 	closer = &buffered_close_static,
     20 	writer = &buffered_write,
     21 	...
     22 };
     23 
     24 const buffered_vtable_rw: io::vtable = io::vtable {
        	// Read-write variant: buffered() selects it when both a read buffer
        	// and a write buffer are supplied.
     25 	closer = &buffered_close_static,
     26 	reader = &buffered_read,
     27 	writer = &buffered_write,
     28 	...
     29 };
     30 
     31 export type bufstream = struct {
        	// State of a stream created by buffered().
        	//
        	// Vtable pointer; buffered() sets it to one of the buffered_vtable_*
        	// constants. Code in this file casts *io::stream to *bufstream.
     32 	stream: io::stream,
        	// Underlying handle which reads are filled from and writes are
        	// flushed to.
     33 	source: io::handle,
        	// Caller-supplied buffers; an empty slice disables that direction.
     34 	rbuffer: []u8,
     35 	wbuffer: []u8,
        	// Index into rbuffer of the next byte to hand out.
     36 	rpos: size,
        	// Number of buffered bytes available in rbuffer, starting at rpos.
     37 	ravail: size,
        	// Number of bytes in wbuffer which have not been flushed yet.
     38 	wavail: size,
        	// Bytes which trigger a flush when written; see setflush().
     39 	flush: []u8,
     40 };
     41 
     42 // Wraps a handle in a stream which buffers its reads and/or writes. This helps
     43 // when many small reads or writes hit a source whose I/O operations are costly,
     44 // such as one which invokes a syscall or goes over the network.
     45 //
     46 // Pass a slice for the read buffer, the write buffer, or both; pass an empty
     47 // slice to leave that direction disabled. At least one buffer is required, and
     48 // reads and writes must not share the same buffer.
     49 //
     50 // Closing the buffered stream neither closes the underlying stream nor frees
     51 // the buffers; the caller does both, if necessary, after closing it.
     52 //
     53 // 	let rbuf: [os::BUFSIZ]u8 = [0...];
     54 // 	let wbuf: [os::BUFSIZ]u8 = [0...];
     55 // 	let buffered = bufio::buffered(source, rbuf, wbuf);
     56 export fn buffered(
     57 	src: io::handle,
     58 	rbuf: []u8,
     59 	wbuf: []u8,
     60 ) bufstream = {
     61 	static let flush_default = ['\n': u8];
     62 
     63 	const reads = len(rbuf) != 0;
     64 	const writes = len(wbuf) != 0;
     65 	if (!reads && !writes) {
     66 		abort("Must provide at least one buffer to bufio::buffered");
     67 	};
     68 	if (reads && writes) {
     69 		assert(rbuf: *[*]u8 != wbuf: *[*]u8,
     70 			"Cannot use bufio::buffered with same buffer for reads and writes");
     71 	};
     72 
     73 	const vt = if (!writes) &buffered_vtable_r
     74 		else if (!reads) &buffered_vtable_w
     75 		else &buffered_vtable_rw;
     76 
     77 	return bufstream {
     78 		stream = vt,
     79 		source = src,
     80 		rbuffer = rbuf,
     81 		wbuffer = wbuf,
     82 		flush = flush_default,
     83 		rpos = len(rbuf), // lets unread() work before the first read()
     84 		...
     85 	};
     86 };
     87 
     88 // Writes any bytes pending in the write buffer to the underlying stream.
     89 // Returns [[errors::unsupported]] if s is not a writable buffered stream.
     90 export fn flush(s: io::handle) (void | io::error) = {
     91 	const st = match (s) {
     92 	case let strm: *io::stream =>
     93 		yield strm;
     94 	case io::file =>
     95 		return errors::unsupported;
     96 	};
     97 	if (st.writer != &buffered_write) {
     98 		return errors::unsupported;
     99 	};
    100 	let bs = st: *bufstream;
    101 	if (bs.wavail != 0) {
    102 		io::writeall(bs.source, bs.wbuffer[..bs.wavail])?;
    103 		bs.wavail = 0;
    104 	};
    105 };
    106 
    107 // Sets the bytes which trigger a flush when written to the stream (a newline by
    108 // default). Aborts if s is not a writable buffered stream.
    109 export fn setflush(s: io::handle, b: []u8) void = {
    110 	match (s) {
    111 	case let st: *io::stream =>
    112 		if (st.writer != &buffered_write) {
    113 			abort("Attempted to set flush bytes on unbuffered stream");
    114 		};
    115 		let bs = st: *bufstream;
    116 		bs.flush = b;
    117 	case io::file =>
    118 		abort("Attempted to set flush bytes on unbuffered stream");
    119 	};
    120 };
    121 
    122 // Pushes bytes back onto a buffered stream so that the next [[io::read]] call
    123 // yields them ahead of any new data from the underlying source. Before any
    124 // read has been made, up to the full length of the read buffer may be unread;
    125 // afterwards, at most as many bytes as the last [[io::read]] on this stream
    126 // returned. Unreading more than fits, or using a handle which is not a
    127 // readable buffered stream, aborts the program.
    128 export fn unread(s: io::handle, buf: []u8) void = {
    129 	let bs = match (s) {
    130 	case let st: *io::stream =>
    131 		if (st.reader != &buffered_read) {
    132 			abort("Attempted unread on unbuffered stream");
    133 		};
    134 		yield st: *bufstream;
    135 	case io::file =>
    136 		abort("Attempted unread on unbuffered stream");
    137 	};
    138 	const n = len(buf);
    139 	assert(bs.rpos >= n,
    140 		"Attempted to unread more data than buffer has available");
    141 	const start = bs.rpos - n;
    142 	bs.rbuffer[start..bs.rpos] = buf;
    143 	bs.rpos = start;
    144 	bs.ravail += n;
    145 };
    146 
    147 // Encodes a rune as UTF-8 and unreads the resulting bytes; see [[unread]]
    148 // for the limits which apply.
    149 export fn unreadrune(s: io::handle, rn: rune) void = {
    150 	unread(s, utf8::encoderune(rn));
    151 };
    152 
    153 // Reports whether an [[io::handle]] refers to a [[buffered]] stream.
    154 export fn isbuffered(in: io::handle) bool = {
    155 	return match (in) {
    156 	case let st: *io::stream =>
    157 		yield st.reader == &buffered_read || st.writer == &buffered_write;
    158 	case io::file =>
    159 		yield false;
    160 	};
    161 };
    162 
    163 // Closer for buffered streams: flushes pending writes if there is a writer.
    164 fn buffered_close_static(s: *io::stream) (void | io::error) = {
    165 	assert(s.closer == &buffered_close_static);
    166 	if (s.writer == null) return;
    167 	flush(s: *bufstream)?;
    168 };
    169 
    170 fn buffered_read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = {
        	// Reader for buffered streams: fills buf from the read buffer,
        	// first refilling the read buffer from the underlying source when
        	// it holds fewer bytes than were requested. Returns the number of
        	// bytes copied into buf, io::EOF, or the error from the source.
    171 	assert(s.reader == &buffered_read);
    172 	let s = s: *bufstream;
    173 
        	// Refill only when fewer bytes are buffered than were asked for and
        	// the read buffer is not already full.
    174 	if (s.ravail < len(buf) && s.ravail < len(s.rbuffer)) {
        		// Move the unconsumed bytes to the front of the read buffer so
        		// the source can be read into the space after them.
    175 		s.rbuffer[..s.ravail] = s.rbuffer[s.rpos..s.rpos + s.ravail];
    176 		s.rpos = 0;
    177 		match (io::read(s.source, s.rbuffer[s.ravail..])) {
    178 		case let err: io::error =>
    179 			return err;
    180 		case io::EOF =>
        			// Report EOF only once nothing is left buffered;
        			// otherwise fall through and return what remains.
    181 			if (s.ravail == 0) {
    182 				return io::EOF;
    183 			};
    184 		case let z: size =>
    185 			s.ravail += z;
    186 		};
    187 	};
    188 
        	// Hand over as much as is buffered, up to len(buf).
    189 	const n = if (len(buf) < s.ravail) len(buf) else s.ravail;
    190 	buf[..n] = s.rbuffer[s.rpos..s.rpos + n];
    191 	s.rpos += n;
    192 	s.ravail -= n;
    193 	return n;
    194 };
    195 
    196 // Writer for buffered streams: appends buf to the write buffer, flushing to
    197 // the underlying source each time the buffer fills up, and once more at the
    198 // end if buf contains any of the stream's flush bytes.
    199 fn buffered_write(s: *io::stream, buf: const []u8) (size | io::error) = {
    200 	assert(s.writer == &buffered_write);
    201 	let bs = s: *bufstream;
    202 	const total = len(buf);
    203 
    204 	// Decide up front whether this write contains a flush byte.
    205 	let doflush = false;
    206 	if (len(bs.flush) != 0) {
    207 		for (let i = 0z; i < total; i += 1) :search {
    208 			for (let k = 0z; k < len(bs.flush); k += 1) {
    209 				if (bs.flush[k] == buf[i]) {
    210 					doflush = true;
    211 					break :search;
    212 				};
    213 			};
    214 		};
    215 	};
    216 
    217 	let rest = buf;
    218 	for (len(rest) != 0) {
    219 		if (bs.wavail == len(bs.wbuffer)) {
    220 			flush(bs)?;
    221 		};
    222 		const room = len(bs.wbuffer) - bs.wavail;
    223 		const n = if (len(rest) < room) len(rest) else room;
    224 		bs.wbuffer[bs.wavail..bs.wavail + n] = rest[..n];
    225 		bs.wavail += n;
    226 		rest = rest[n..];
    227 	};
    228 
    229 	if (doflush) {
    230 		flush(bs)?;
    231 	};
    232 	return total;
    233 };
    234 
    235 @test fn buffered_read() void = {
        	// Case 1: the read buffer is larger than the source, so the first
        	// read pulls the whole source into the buffer.
    236 	let sourcebuf: []u8 = [1, 3, 3, 7];
    237 	let source = fixed(sourcebuf, io::mode::READ);
    238 	defer io::close(&source)!;
    239 
    240 	let rbuf: [1024]u8 = [0...];
    241 	let f = buffered(&source, rbuf, []);
    242 	defer io::close(&f)!;
    243 
    244 	let buf: [1024]u8 = [0...];
    245 	assert(io::read(&f, buf[..2]) as size == 2);
    246 	assert(source.pos == len(source.buf), "fixed stream was not fully consumed");
    247 	assert(bytes::equal(buf[..2], [1, 3]));
    248 
        	// The remaining two bytes are served from the buffer, then EOF.
    249 	assert(io::read(&f, buf[2..]) as size == 2);
    250 	assert(bytes::equal(buf[..4], [1, 3, 3, 7]));
    251 	assert(io::read(&f, buf) is io::EOF);
    252 
        	// Case 2: the read buffer (16 bytes) is smaller than the source
        	// (32 bytes), so each read returns at most one buffer's worth.
    253 	let sourcebuf: [32]u8 = [1, 3, 3, 7, 0...];
    254 	let source = fixed(sourcebuf, io::mode::READ);
    255 
    256 	let rbuf: [16]u8 = [0...];
    257 	let f = buffered(&source, rbuf, []);
    258 	defer io::close(&f)!;
    259 
    260 	let buf: [32]u8 = [0...];
    261 	assert(io::read(&f, buf) as size == 16);
    262 	assert(source.pos == 16);
    263 
    264 	assert(io::read(&f, buf[16..]) as size == 16);
    265 	assert(bytes::equal(buf, sourcebuf));
    266 	assert(io::read(&f, buf) is io::EOF);
    267 	assert(source.pos == len(source.buf));
    268 };
    269 
    270 @test fn buffered_write() void = {
    271 	// Normal case
    272 	let sink = dynamic(io::mode::WRITE);
    273 	defer io::close(&sink)!;
    274 
    275 	let wbuf: [1024]u8 = [0...];
    276 	let f = buffered(&sink, [], wbuf);
    277 	defer io::close(&f)!;
    278 
        	// Writes stay in the write buffer; the sink is empty until flush().
    279 	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
    280 	assert(len(buffer(&sink)) == 0);
    281 	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
    282 	assert(flush(&f) is void);
    283 	assert(bytes::equal(buffer(&sink), [1, 3, 3, 7, 1, 3, 3, 7]));
    284 
    285 	// Test flushing via buffer exhaustion
    286 	let sink = dynamic(io::mode::WRITE);
    287 	defer io::close(&sink)!;
    288 
    289 	let wbuf: [4]u8 = [0...];
    290 	let f = buffered(&sink, [], wbuf);
    291 
        	// The first write exactly fills the 4-byte buffer without flushing;
        	// the second write finds it full and flushes the first four bytes.
    292 	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
    293 	assert(len(buffer(&sink)) == 0);
    294 	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
    295 	assert(bytes::equal(buffer(&sink), [1, 3, 3, 7]));
    296 	io::close(&f)!; // Should flush
    297 	assert(bytes::equal(buffer(&sink), [1, 3, 3, 7, 1, 3, 3, 7]));
    298 
    299 	// Test flushing via flush characters
    300 	let sink = dynamic(io::mode::WRITE);
    301 	defer io::close(&sink)!;
    302 
    303 	let wbuf: [1024]u8 = [0...];
    304 	let f = buffered(&sink, [], wbuf);
    305 
        	// "hello" has no flush byte and stays buffered; the write containing
        	// the newline pushes everything through to the sink.
    306 	assert(io::writeall(&f, strings::toutf8("hello")) as size == 5);
    307 	assert(len(buffer(&sink)) == 0);
    308 	assert(io::writeall(&f, strings::toutf8(" world!\n")) as size == 8);
    309 	assert(bytes::equal(buffer(&sink), strings::toutf8("hello world!\n")));
    310 };
    311 
    312 @test fn unread() void = {
        	// Part 1: unread against io::zero with an 8-byte read buffer.
    313 	let rbuf: [8]u8 = [0...];
    314 	let f = buffered(io::zero, rbuf, []);
    315 
        	// Read four zero bytes, then push four different bytes back.
    316 	let buf: [16]u8 = [42...];
    317 	assert(io::read(&f, buf[..4]) as size == 4);
    318 	assert(buf[0] == 0);
    319 	assert(buf[1] == 0);
    320 	assert(buf[2] == 0);
    321 	assert(buf[3] == 0);
    322 	unread(&f, [1, 2, 3, 4]);
    323 
        	// The unread bytes come back first, followed by data that was
        	// already buffered from the source.
    324 	assert(io::read(&f, buf[..8]) as size == 8);
    325 	assert(buf[0] == 1);
    326 	assert(buf[1] == 2);
    327 	assert(buf[2] == 3);
    328 	assert(buf[3] == 4);
    329 	assert(buf[4] == 0);
    330 	assert(buf[5] == 0);
    331 	assert(buf[6] == 0);
    332 	assert(buf[7] == 0);
    333 
        	// A 16-byte request is capped at the 8-byte read buffer, and the
        	// unread bytes do not reappear.
    334 	assert(io::read(&f, buf) as size == 8);
    335 	for (let i = 0z; i < 8; i += 1) {
    336 		assert(buf[i] == 0);
    337 	};
    338 
        	// Part 2: unread data is returned before EOF is reported for a
        	// source which has already been fully consumed.
    339 	let input: []u8 = [1, 2, 3, 4];
    340 	let f = buffered(&fixed(input, io::mode::READ), rbuf, []);
    341 
    342 	assert(io::read(&f, buf) as size == 4);
    343 	unread(&f, [1, 2, 3, 4]);
    344 	assert(io::read(&f, buf) as size == 4);
    345 	assert(io::read(&f, buf) is io::EOF);
    346 };