buffered.ha (9384B)
// License: MPL-2.0
// (c) 2022 Alexey Yerin <yyp@disroot.org>
// (c) 2021 Byron Torres <b@torresjrjr.com>
// (c) 2021 Drew DeVault <sir@cmpwn.com>
// (c) 2021 Ember Sawady <ecs@d2evs.net>
use bytes;
use encoding::utf8;
use errors;
use io;
use strings;

// vtable installed when only a read buffer was supplied.
const buffered_vtable_r: io::vtable = io::vtable {
	reader = &buffered_read,
	closer = &buffered_close_static,
	...
};

// vtable installed when only a write buffer was supplied.
const buffered_vtable_w: io::vtable = io::vtable {
	writer = &buffered_write,
	closer = &buffered_close_static,
	...
};

// vtable installed when both a read and a write buffer were supplied.
const buffered_vtable_rw: io::vtable = io::vtable {
	reader = &buffered_read,
	writer = &buffered_write,
	closer = &buffered_close_static,
	...
};

// State of a buffered stream; see [[buffered]].
export type bufstream = struct {
	// Must remain the first field: the implementation casts between
	// *io::stream and *bufstream.
	stream: io::stream,
	// The underlying handle which is being buffered.
	source: io::handle,
	// Caller-provided storage for buffered reads.
	rbuffer: []u8,
	// Caller-provided storage for buffered writes.
	wbuffer: []u8,
	// Offset into rbuffer of the next byte handed out by a read.
	rpos: size,
	// Number of buffered bytes available to read, starting at rpos.
	ravail: size,
	// Number of bytes at the start of wbuffer which are pending a flush.
	wavail: size,
	// Writing any of these bytes causes the stream to be flushed.
	flush: []u8,
};

// Creates a stream which buffers reads and writes for the underlying stream.
// This is generally used to improve performance of small reads/writes for
// sources where I/O operations are costly, such as if they invoke a syscall or
// take place over the network.
//
// The caller should supply one or both of a read and write buffer as a slice of
// the desired buffer, or empty slices if read or write functionality is
// disabled. The same buffer may not be used for both reads and writes.
//
// The caller is responsible for closing the underlying stream, and freeing the
// provided buffers if necessary, after the buffered stream is closed.
//
// 	let rbuf: [os::BUFSIZ]u8 = [0...];
// 	let wbuf: [os::BUFSIZ]u8 = [0...];
// 	let buffered = bufio::buffered(source, rbuf, wbuf);
export fn buffered(
	src: io::handle,
	rbuf: []u8,
	wbuf: []u8,
) bufstream = {
	static let flush_default = ['\n': u8];

	// Select the vtable which matches the buffers that were supplied, so
	// that the stream only advertises the operations it can buffer.
	let stream =
		if (len(rbuf) != 0 && len(wbuf) != 0) {
			// NOTE(review): only the start addresses are compared;
			// partially overlapping buffers are not detected.
			assert(rbuf: *[*]u8 != wbuf: *[*]u8,
				"Cannot use bufio::buffered with same buffer for reads and writes");
			yield &buffered_vtable_rw;
		} else if (len(rbuf) != 0) {
			yield &buffered_vtable_r;
		} else if (len(wbuf) != 0) {
			yield &buffered_vtable_w;
		} else {
			abort("Must provide at least one buffer to bufio::buffered");
		};

	return bufstream {
		stream = stream,
		source = src,
		rbuffer = rbuf,
		wbuffer = wbuf,
		flush = flush_default,
		rpos = len(rbuf), // necessary for unread() before read()
		...
	};
};

// Flushes pending writes to the underlying stream. Returns
// [[errors::unsupported]] if the handle is not a buffered stream with write
// support.
//
// If the underlying stream fails after accepting only part of the pending
// data, the bytes which were already written are dropped from the buffer, so
// that a later call to flush resumes at the first unwritten byte rather than
// sending the same data twice.
export fn flush(s: io::handle) (void | io::error) = {
	let s = match (s) {
	case let st: *io::stream =>
		if (st.writer != &buffered_write) {
			return errors::unsupported;
		};
		yield st: *bufstream;
	case =>
		return errors::unsupported;
	};

	// Number of pending bytes the underlying stream has accepted so far.
	let done = 0z;
	for (done < s.wavail) {
		match (io::write(s.source, s.wbuffer[done..s.wavail])) {
		case let n: size =>
			done += n;
		case let err: io::error =>
			// Keep only the unwritten tail, moved to the front of
			// the buffer, so the pending data stays consistent.
			const rest = s.wavail - done;
			s.wbuffer[..rest] = s.wbuffer[done..s.wavail];
			s.wavail = rest;
			return err;
		};
	};
	s.wavail = 0;
	return;
};

// Sets the list of bytes which will cause the stream to flush when written. By
// default, the stream will flush when a newline (\n) is written.
export fn setflush(s: io::handle, b: []u8) void = {
	// Only a stream whose writer is ours can be a write-enabled bufstream.
	const st = match (s) {
	case io::file =>
		abort("Attempted to set flush bytes on unbuffered stream");
	case let st: *io::stream =>
		yield st;
	};
	if (st.writer != &buffered_write) {
		abort("Attempted to set flush bytes on unbuffered stream");
	};

	let bs = st: *bufstream;
	// The slice is stored as-is, not copied.
	bs.flush = b;
};

// "Unreads" a slice of bytes, such that the next call to "read" will return
// these bytes before reading any new data from the underlying source. The
// unread data must fit into the read buffer's available space. The amount of
// data which can be unread before the user makes any reads from a buffered
// stream is equal to the length of the read buffer, and otherwise it is equal
// to the length of the return value of the last call to [[io::read]] using this
// buffered stream. Attempting to unread more data than can fit into the read
// buffer will abort the program.
export fn unread(s: io::handle, buf: []u8) void = {
	// Only a stream whose reader is ours can be a read-enabled bufstream.
	const st = match (s) {
	case io::file =>
		abort("Attempted unread on unbuffered stream");
	case let st: *io::stream =>
		yield st;
	};
	if (st.reader != &buffered_read) {
		abort("Attempted unread on unbuffered stream");
	};
	let bs = st: *bufstream;

	const n = len(buf);
	assert(bs.rpos >= n,
		"Attempted to unread more data than buffer has available");

	// Place the data directly in front of the current read position and
	// move the position back over it.
	const start = bs.rpos - n;
	bs.rbuffer[start..bs.rpos] = buf;
	bs.rpos = start;
	bs.ravail += n;
};

// Unreads a rune; see [[unread]].
export fn unreadrune(s: io::handle, rn: rune) void = {
	unread(s, utf8::encoderune(rn));
};

// Returns true if an [[io::handle]] is a [[buffered]] stream.
export fn isbuffered(in: io::handle) bool = {
	match (in) {
	case let st: *io::stream =>
		return st.reader == &buffered_read
			|| st.writer == &buffered_write;
	case io::file =>
		return false;
	};
};

// Closer for buffered streams: flushes pending writes when the stream has a
// writer. Does not close the underlying source.
fn buffered_close_static(s: *io::stream) (void | io::error) = {
	assert(s.closer == &buffered_close_static);
	if (s.writer == null) {
		return;
	};
	flush(s: *bufstream)?;
};

// Reader for buffered streams. Serves the request from the read buffer,
// refilling it from the source first when it holds fewer bytes than requested
// and is not already full.
fn buffered_read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = {
	assert(s.reader == &buffered_read);
	let bs = s: *bufstream;
	const want = len(buf);

	if (bs.ravail < want && bs.ravail < len(bs.rbuffer)) {
		// Move the unread bytes to the front of the buffer, then top
		// up the space behind them from the source.
		bs.rbuffer[..bs.ravail] = bs.rbuffer[bs.rpos..bs.rpos + bs.ravail];
		bs.rpos = 0;
		match (io::read(bs.source, bs.rbuffer[bs.ravail..])) {
		case let got: size =>
			bs.ravail += got;
		case io::EOF =>
			// Only report EOF once the buffered data is used up.
			if (bs.ravail == 0) {
				return io::EOF;
			};
		case let err: io::error =>
			return err;
		};
	};

	// Hand out as much as was requested, or whatever is buffered.
	const n = if (bs.ravail < want) bs.ravail else want;
	buf[..n] = bs.rbuffer[bs.rpos..bs.rpos + n];
	bs.rpos += n;
	bs.ravail -= n;
	return n;
};

// Writer for buffered streams. Copies the data into the write buffer, flushing
// whenever the buffer is full, and flushes once more at the end if the data
// contained any of the stream's flush bytes.
fn buffered_write(s: *io::stream, buf: const []u8) (size | io::error) = {
	assert(s.writer == &buffered_write);
	let bs = s: *bufstream;
	let rest = buf;

	// Decide up front whether this write contains a flush byte.
	let wantflush = false;
	if (len(bs.flush) != 0) {
		for (let i = 0z; i < len(rest); i += 1) :search {
			for (let j = 0z; j < len(bs.flush); j += 1) {
				if (rest[i] == bs.flush[j]) {
					wantflush = true;
					break :search;
				};
			};
		};
	};

	let total = 0z;
	for (len(rest) > 0) {
		let room = len(bs.wbuffer) - bs.wavail;
		if (room == 0) {
			// Buffer is full: empty it to make space.
			flush(bs)?;
			room = len(bs.wbuffer);
		};

		const n = if (len(rest) > room) room else len(rest);
		bs.wbuffer[bs.wavail..bs.wavail + n] = rest[..n];
		rest = rest[n..];
		bs.wavail += n;
		total += n;
	};

	if (wantflush) {
		flush(bs)?;
	};

	return total;
};

@test fn buffered_read() void = {
	// Source smaller than the read buffer: the first read drains it.
	let sourcebuf: []u8 = [1, 3, 3, 7];
	let source = fixed(sourcebuf, io::mode::READ);
	defer io::close(&source)!;

	let rbuf: [1024]u8 = [0...];
	let f = buffered(&source, rbuf, []);
	defer io::close(&f)!;

	let buf: [1024]u8 = [0...];
	assert(io::read(&f, buf[..2]) as size == 2);
	assert(source.pos == len(source.buf), "fixed stream was not fully consumed");
	assert(bytes::equal(buf[..2], [1, 3]));

	assert(io::read(&f, buf[2..]) as size == 2);
	assert(bytes::equal(buf[..4], [1, 3, 3, 7]));
	assert(io::read(&f, buf) is io::EOF);

	// Source larger than the read buffer: reads come in buffer-sized
	// pieces.
	let sourcebuf: [32]u8 = [1, 3, 3, 7, 0...];
	let source = fixed(sourcebuf, io::mode::READ);

	let rbuf: [16]u8 = [0...];
	let f = buffered(&source, rbuf, []);
	defer io::close(&f)!;

	let buf: [32]u8 = [0...];
	assert(io::read(&f, buf) as size == 16);
	assert(source.pos == 16);

	assert(io::read(&f, buf[16..]) as size == 16);
	assert(bytes::equal(buf, sourcebuf));
	assert(io::read(&f, buf) is io::EOF);
	assert(source.pos == len(source.buf));
};

@test fn buffered_write() void = {
	// Normal case
	let sink = dynamic(io::mode::WRITE);
	defer io::close(&sink)!;

	let wbuf: [1024]u8 = [0...];
	let f = buffered(&sink, [], wbuf);
	defer io::close(&f)!;

	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
	assert(len(buffer(&sink)) == 0);
	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
	assert(flush(&f) is void);
	assert(bytes::equal(buffer(&sink), [1, 3, 3, 7, 1, 3, 3, 7]));

	// Test flushing via buffer exhaustion
	let sink = dynamic(io::mode::WRITE);
	defer io::close(&sink)!;

	let wbuf: [4]u8 = [0...];
	let f = buffered(&sink, [], wbuf);

	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
	assert(len(buffer(&sink)) == 0);
	assert(io::writeall(&f, [1, 3, 3, 7]) as size == 4);
	assert(bytes::equal(buffer(&sink), [1, 3, 3, 7]));
	io::close(&f)!; // Should flush
	assert(bytes::equal(buffer(&sink), [1, 3, 3, 7, 1, 3, 3, 7]));

	// Test flushing via flush characters
	let sink = dynamic(io::mode::WRITE);
	defer io::close(&sink)!;

	let wbuf: [1024]u8 = [0...];
	let f = buffered(&sink, [], wbuf);

	assert(io::writeall(&f, strings::toutf8("hello")) as size == 5);
	assert(len(buffer(&sink)) == 0);
	assert(io::writeall(&f, strings::toutf8(" world!\n")) as size == 8);
	assert(bytes::equal(buffer(&sink), strings::toutf8("hello world!\n")));
};

@test fn unread() void = {
	let rbuf: [8]u8 = [0...];
	let f = buffered(io::zero, rbuf, []);

	let buf: [16]u8 = [42...];
	assert(io::read(&f, buf[..4]) as size == 4);
	assert(bytes::equal(buf[..4], [0, 0, 0, 0]));
	unread(&f, [1, 2, 3, 4]);

	// The unread bytes come back first, followed by buffered zeroes.
	assert(io::read(&f, buf[..8]) as size == 8);
	assert(bytes::equal(buf[..8], [1, 2, 3, 4, 0, 0, 0, 0]));

	assert(io::read(&f, buf) as size == 8);
	for (let i = 0z; i < 8; i += 1) {
		assert(buf[i] == 0);
	};

	// Unread data is still returned once the source has hit EOF.
	let input: []u8 = [1, 2, 3, 4];
	let f = buffered(&fixed(input, io::mode::READ), rbuf, []);

	assert(io::read(&f, buf) as size == 4);
	unread(&f, [1, 2, 3, 4]);
	assert(io::read(&f, buf) as size == 4);
	assert(io::read(&f, buf) is io::EOF);
};