stream.ha (5903B)
1 // SPDX-License-Identifier: MPL-2.0 2 // (c) Hare authors <https://harelang.org> 3 4 use bytes; 5 use encoding::utf8; 6 use errors; 7 use io; 8 use strings; 9 10 const vtable_r: io::vtable = io::vtable { 11 closer = &close_static, 12 reader = &read, 13 ... 14 }; 15 16 const vtable_w: io::vtable = io::vtable { 17 closer = &close_static, 18 writer = &write, 19 ... 20 }; 21 22 const vtable_rw: io::vtable = io::vtable { 23 closer = &close_static, 24 reader = &read, 25 writer = &write, 26 ... 27 }; 28 29 export type stream = struct { 30 stream: io::stream, 31 source: io::handle, 32 rbuffer: []u8, 33 wbuffer: []u8, 34 rpos: size, 35 ravail: size, 36 wavail: size, 37 flush: []u8, 38 }; 39 40 // Creates a stream which buffers reads and writes for the underlying stream. 41 // This is generally used to improve performance of small reads/writes for 42 // sources where I/O operations are costly, such as if they invoke a syscall or 43 // take place over the network. 44 // 45 // The caller should supply one or both of a read and write buffer as a slice of 46 // the desired buffer, or empty slices if read or write functionality is 47 // disabled. The same buffer may not be used for both reads and writes. 48 // 49 // The caller is responsible for closing the underlying stream, and freeing the 50 // provided buffers if necessary, after the buffered stream is closed. 51 // 52 // let rbuf: [os::BUFSZ]u8 = [0...]; 53 // let wbuf: [os::BUFSZ]u8 = [0...]; 54 // let buffered = bufio::init(source, rbuf, wbuf); 55 export fn init( 56 src: io::handle, 57 rbuf: []u8, 58 wbuf: []u8, 59 ) stream = { 60 static let flush_default = ['\n': u8]; 61 62 let st = 63 if (len(rbuf) != 0 && len(wbuf) != 0) { 64 assert(rbuf: *[*]u8 != wbuf: *[*]u8, 65 "Cannot use same buffer for reads and writes"); 66 yield &vtable_rw; 67 } else if (len(rbuf) != 0) { 68 yield &vtable_r; 69 } else if (len(wbuf) != 0) { 70 yield &vtable_w; 71 } else { 72 abort("Must provide at least one buffer"); 73 }; 74 75 return stream { 76 stream = st, 77 source = src, 78 rbuffer = rbuf, 79 wbuffer = wbuf, 80 flush = flush_default, 81 rpos = len(rbuf), // necessary for unread() before read() 82 ... 83 }; 84 }; 85 86 // Flushes pending writes to the underlying stream. 87 export fn flush(s: io::handle) (void | io::error) = { 88 let s = match (s) { 89 case let st: *io::stream => 90 if (st.writer != &write) { 91 return errors::unsupported; 92 }; 93 yield st: *stream; 94 case => 95 return errors::unsupported; 96 }; 97 if (s.wavail == 0) { 98 return; 99 }; 100 io::writeall(s.source, s.wbuffer[..s.wavail])?; 101 s.wavail = 0; 102 return; 103 }; 104 105 // Sets the list of bytes which will cause the stream to flush when written. By 106 // default, the stream will flush when a newline (\n) is written. 107 export fn setflush(s: io::handle, b: []u8) void = { 108 let s = match (s) { 109 case let st: *io::stream => 110 if (st.writer != &write) { 111 abort("Attempted to set flush bytes on unbuffered stream"); 112 }; 113 yield st: *stream; 114 case => 115 abort("Attempted to set flush bytes on unbuffered stream"); 116 }; 117 s.flush = b; 118 }; 119 120 // "Unreads" a slice of bytes, such that the next call to "read" will return 121 // these bytes before reading any new data from the underlying source. The 122 // unread data must fit into the read buffer's available space. The amount of 123 // data which can be unread before the user makes any reads from a buffered 124 // stream is equal to the length of the read buffer, and otherwise it is equal 125 // to the length of the return value of the last call to [[io::read]] using this 126 // buffered stream. Attempting to unread more data than can fit into the read 127 // buffer will abort the program. 128 export fn unread(s: io::handle, buf: []u8) void = { 129 match (s) { 130 case let st: *io::stream => 131 switch (st.reader) { 132 case &read => 133 stream_unread(s: *stream, buf); 134 case &scan_read => 135 scan_unread(s: *scanner, buf); 136 case => 137 abort("Attempted unread on unbuffered stream"); 138 }; 139 case => 140 abort("Attempted unread on unbuffered stream"); 141 }; 142 }; 143 144 fn stream_unread(s: *stream, buf: []u8) void = { 145 assert(s.rpos >= len(buf), 146 "Attempted to unread more data than buffer has available"); 147 s.rbuffer[s.rpos - len(buf)..s.rpos] = buf; 148 s.rpos -= len(buf); 149 s.ravail += len(buf); 150 }; 151 152 // Unreads a rune; see [[unread]]. 153 export fn unreadrune(s: io::handle, rn: rune) void = { 154 const buf = utf8::encoderune(rn); 155 unread(s, buf); 156 }; 157 158 // Returns true if an [[io::handle]] is a [[stream]]. 159 export fn isbuffered(in: io::handle) bool = { 160 match (in) { 161 case io::file => 162 return false; 163 case let st: *io::stream => 164 return st.reader == &read || st.writer == &write; 165 }; 166 }; 167 168 fn close_static(s: *io::stream) (void | io::error) = { 169 assert(s.closer == &close_static); 170 if (s.writer != null) { 171 flush(s: *stream)?; 172 }; 173 }; 174 175 fn read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = { 176 assert(s.reader == &read); 177 let s = s: *stream; 178 179 if (s.ravail < len(buf) && s.ravail < len(s.rbuffer)) { 180 s.rbuffer[..s.ravail] = s.rbuffer[s.rpos..s.rpos + s.ravail]; 181 s.rpos = 0; 182 match (io::read(s.source, s.rbuffer[s.ravail..])) { 183 case let err: io::error => 184 return err; 185 case io::EOF => 186 if (s.ravail == 0) { 187 return io::EOF; 188 }; 189 case let z: size => 190 s.ravail += z; 191 }; 192 }; 193 194 const n = if (len(buf) < s.ravail) len(buf) else s.ravail; 195 buf[..n] = s.rbuffer[s.rpos..s.rpos + n]; 196 s.rpos += n; 197 s.ravail -= n; 198 return n; 199 }; 200 201 fn write(s: *io::stream, buf: const []u8) (size | io::error) = { 202 assert(s.writer == &write); 203 let s = s: *stream; 204 let buf = buf; 205 206 let doflush = false; 207 if (len(s.flush) != 0) { 208 for :search (let i = 0z; i < len(buf); i += 1) { 209 for (let j = 0z; j < len(s.flush); j += 1) { 210 if (buf[i] == s.flush[j]) { 211 doflush = true; 212 break :search; 213 }; 214 }; 215 }; 216 }; 217 218 let z = 0z; 219 for (len(buf) > 0) { 220 let avail = len(s.wbuffer) - s.wavail; 221 if (avail == 0) { 222 flush(s)?; 223 avail = len(s.wbuffer); 224 }; 225 226 const n = if (avail < len(buf)) avail else len(buf); 227 s.wbuffer[s.wavail..s.wavail + n] = buf[..n]; 228 buf = buf[n..]; 229 s.wavail += n; 230 z += n; 231 }; 232 233 if (doflush) { 234 flush(s)?; 235 }; 236 237 return z; 238 };