stream.ha (5889B)
1 // SPDX-License-Identifier: MPL-2.0 2 // (c) Hare authors <https://harelang.org> 3 4 use encoding::utf8; 5 use errors; 6 use io; 7 8 const vtable_r: io::vtable = io::vtable { 9 closer = &close_static, 10 reader = &read, 11 ... 12 }; 13 14 const vtable_w: io::vtable = io::vtable { 15 closer = &close_static, 16 writer = &write, 17 ... 18 }; 19 20 const vtable_rw: io::vtable = io::vtable { 21 closer = &close_static, 22 reader = &read, 23 writer = &write, 24 ... 25 }; 26 27 export type stream = struct { 28 stream: io::stream, 29 source: io::handle, 30 rbuffer: []u8, 31 wbuffer: []u8, 32 rpos: size, 33 ravail: size, 34 wavail: size, 35 flush: []u8, 36 }; 37 38 // Creates a stream which buffers reads and writes for the underlying stream. 39 // This is generally used to improve performance of small reads/writes for 40 // sources where I/O operations are costly, such as if they invoke a syscall or 41 // take place over the network. 42 // 43 // The caller should supply one or both of a read and write buffer as a slice of 44 // the desired buffer, or empty slices if read or write functionality is 45 // disabled. The same buffer may not be used for both reads and writes. 46 // 47 // The caller is responsible for closing the underlying stream, and freeing the 48 // provided buffers if necessary, after the buffered stream is closed. 49 // 50 // let rbuf: [os::BUFSZ]u8 = [0...]; 51 // let wbuf: [os::BUFSZ]u8 = [0...]; 52 // let buffered = bufio::init(source, rbuf, wbuf); 53 export fn init( 54 src: io::handle, 55 rbuf: []u8, 56 wbuf: []u8, 57 ) stream = { 58 static let flush_default = ['\n': u8]; 59 60 let st = 61 if (len(rbuf) != 0 && len(wbuf) != 0) { 62 assert(rbuf: *[*]u8 != wbuf: *[*]u8, 63 "Cannot use same buffer for reads and writes"); 64 yield &vtable_rw; 65 } else if (len(rbuf) != 0) { 66 yield &vtable_r; 67 } else if (len(wbuf) != 0) { 68 yield &vtable_w; 69 } else { 70 abort("Must provide at least one buffer"); 71 }; 72 73 return stream { 74 stream = st, 75 source = src, 76 rbuffer = rbuf, 77 wbuffer = wbuf, 78 flush = flush_default, 79 rpos = len(rbuf), // necessary for unread() before read() 80 ... 81 }; 82 }; 83 84 // Flushes pending writes to the underlying stream. 85 export fn flush(s: io::handle) (void | io::error) = { 86 let s = match (s) { 87 case let st: *io::stream => 88 if (st.writer != &write) { 89 return errors::unsupported; 90 }; 91 yield st: *stream; 92 case => 93 return errors::unsupported; 94 }; 95 if (s.wavail == 0) { 96 return; 97 }; 98 io::writeall(s.source, s.wbuffer[..s.wavail])?; 99 s.wavail = 0; 100 return; 101 }; 102 103 // Sets the list of bytes which will cause the stream to flush when written. By 104 // default, the stream will flush when a newline (\n) is written. 105 export fn setflush(s: io::handle, b: []u8) void = { 106 let s = match (s) { 107 case let st: *io::stream => 108 if (st.writer != &write) { 109 abort("Attempted to set flush bytes on unbuffered stream"); 110 }; 111 yield st: *stream; 112 case => 113 abort("Attempted to set flush bytes on unbuffered stream"); 114 }; 115 s.flush = b; 116 }; 117 118 // "Unreads" a slice of bytes, such that the next call to "read" will return 119 // these bytes before reading any new data from the underlying source. The 120 // unread data must fit into the read buffer's available space. The amount of 121 // data which can be unread before the user makes any reads from a buffered 122 // stream is equal to the length of the read buffer, and otherwise it is equal 123 // to the length of the return value of the last call to [[io::read]] using this 124 // buffered stream. Attempting to unread more data than can fit into the read 125 // buffer will abort the program. 126 export fn unread(s: io::handle, buf: []u8) void = { 127 match (s) { 128 case let st: *io::stream => 129 if (st.reader == &read) { 130 stream_unread(s: *stream, buf); 131 } else if (st.reader == &scan_read) { 132 scan_unread(s: *scanner, buf); 133 } else { 134 abort("Attempted unread on unbuffered stream"); 135 }; 136 case => 137 abort("Attempted unread on unbuffered stream"); 138 }; 139 }; 140 141 fn stream_unread(s: *stream, buf: []u8) void = { 142 assert(s.rpos >= len(buf), 143 "Attempted to unread more data than buffer has available"); 144 s.rbuffer[s.rpos - len(buf)..s.rpos] = buf; 145 s.rpos -= len(buf); 146 s.ravail += len(buf); 147 }; 148 149 // Unreads a rune; see [[unread]]. 150 export fn unreadrune(s: io::handle, rn: rune) void = { 151 const buf = utf8::encoderune(rn)!; 152 unread(s, buf); 153 }; 154 155 // Returns true if an [[io::handle]] is a [[stream]]. 156 export fn isbuffered(in: io::handle) bool = { 157 match (in) { 158 case io::file => 159 return false; 160 case let st: *io::stream => 161 return st.reader == &read || st.writer == &write; 162 }; 163 }; 164 165 fn close_static(s: *io::stream) (void | io::error) = { 166 assert(s.closer == &close_static); 167 if (s.writer != null) { 168 flush(s: *stream)?; 169 }; 170 }; 171 172 fn read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = { 173 assert(s.reader == &read); 174 let s = s: *stream; 175 176 if (s.ravail < len(buf) && s.ravail < len(s.rbuffer)) { 177 s.rbuffer[..s.ravail] = s.rbuffer[s.rpos..s.rpos + s.ravail]; 178 s.rpos = 0; 179 match (io::read(s.source, s.rbuffer[s.ravail..])) { 180 case let err: io::error => 181 return err; 182 case io::EOF => 183 if (s.ravail == 0) { 184 return io::EOF; 185 }; 186 case let z: size => 187 s.ravail += z; 188 }; 189 }; 190 191 const n = if (len(buf) < s.ravail) len(buf) else s.ravail; 192 buf[..n] = s.rbuffer[s.rpos..s.rpos + n]; 193 s.rpos += n; 194 s.ravail -= n; 195 return n; 196 }; 197 198 fn write(s: *io::stream, buf: const []u8) (size | io::error) = { 199 assert(s.writer == &write); 200 let s = s: *stream; 201 let buf = buf; 202 203 let doflush = false; 204 if (len(s.flush) != 0) { 205 for :search (let i = 0z; i < len(buf); i += 1) { 206 for (let j = 0z; j < len(s.flush); j += 1) { 207 if (buf[i] == s.flush[j]) { 208 doflush = true; 209 break :search; 210 }; 211 }; 212 }; 213 }; 214 215 let z = 0z; 216 for (len(buf) > 0) { 217 let avail = len(s.wbuffer) - s.wavail; 218 if (avail == 0) { 219 flush(s)?; 220 avail = len(s.wbuffer); 221 }; 222 223 const n = if (avail < len(buf)) avail else len(buf); 224 s.wbuffer[s.wavail..s.wavail + n] = buf[..n]; 225 buf = buf[n..]; 226 s.wavail += n; 227 z += n; 228 }; 229 230 if (doflush) { 231 flush(s)?; 232 }; 233 234 return z; 235 };