stream.ha (8392B)
// SPDX-License-Identifier: MPL-2.0
// (c) Hare authors <https://harelang.org>

use bytes;
use encoding::utf8;
use errors;
use io;
use strings;

// An in-memory stream. The first field is the [[io::stream]] vtable pointer,
// "buf" is the backing storage, and "pos" is the read-write cursor expressed
// as an index into "buf".
export type stream = struct {
	stream: io::stream,
	buf: []u8,
	pos: size,
};

// Operations for streams created with [[fixed]]. No closer is installed.
const fixed_vt: io::vtable = io::vtable {
	seeker = &seek,
	copier = &copy,
	reader = &read,
	writer = &fixed_write,
	...
};

// Operations for streams created with [[dynamic]] or [[dynamic_from]]. These
// differ from the fixed variant in the writer, which grows the buffer, and in
// having a closer, which frees it.
const dynamic_vt: io::vtable = io::vtable {
	seeker = &seek,
	copier = &copy,
	reader = &read,
	writer = &dynamic_write,
	closer = &dynamic_close,
	...
};

// Creates a stream for a fixed, caller-supplied buffer. Seeking a stream will
// cause subsequent writes to overwrite existing contents of the buffer.
// A write which does not fit is truncated to the space remaining after the
// cursor; once the cursor has reached the end of the buffer, non-empty writes
// fail with [[errors::overflow]]. The stream doesn't have to be closed.
export fn fixed(in: []u8) stream = stream {
	stream = &fixed_vt,
	buf = in,
	pos = 0,
};

// Creates an [[io::stream]] which dynamically allocates a buffer to store
// writes into. Seeking the stream and reading will read the written data.
// Calling [[io::close]] on this stream will free the buffer. If a stream's
// data is referenced via [[buffer]], the stream shouldn't be closed as
// long as the data is used.
export fn dynamic() stream = dynamic_from([]);

// Like [[dynamic]], but takes an existing slice as input. Writes will
// overwrite the buffer and reads consume bytes from the initial buffer.
// Ownership of the provided slice is transferred to the returned [[stream]].
// Calling [[io::close]] will free the buffer.
export fn dynamic_from(in: []u8) stream = stream {
	stream = &dynamic_vt,
	buf = in,
	pos = 0,
};

// Returns a stream's buffer, up to the current cursor position.
// [[io::seek]] to the end first in order to return the entire buffer.
// The return value is borrowed from the input.
export fn buffer(in: *stream) []u8 = {
	return in.buf[..in.pos];
};

// Returns a stream's buffer, up to the current cursor position, as a string.
// [[io::seek]] to the end first in order to return the entire buffer.
// The return value is borrowed from the input. The result of
// [[strings::fromutf8]] is returned as-is, including [[utf8::invalid]].
export fn string(in: *stream) (str | utf8::invalid) = {
	return strings::fromutf8(in.buf[..in.pos]);
};

// A convenience function that sets the read-write cursor to zero, so that
// the buffer can be overwritten and reused. The buffer's length is truncated
// to zero as well, so previously written data can no longer be read back.
//
// NOTE(review): because the length is truncated, a stream created with
// [[fixed]] rejects every non-empty write after a reset (the fixed writer
// reports errors::overflow once the cursor is at the end of the buffer).
// Confirm whether reset is meant to be used with fixed streams.
export fn reset(in: *stream) void = {
	in.pos = 0;
	in.buf = in.buf[..0];
};

// Reads data from a [[dynamic]] or [[fixed]] stream and returns a slice
// borrowed from the internal buffer. Exactly "amt" bytes are returned and the
// cursor advances past them; if fewer than "amt" bytes remain after the
// cursor, [[io::EOF]] is returned and the cursor is left where it was.
export fn borrowedread(st: *stream, amt: size) ([]u8 | io::EOF) = {
	if (len(st.buf) - st.pos < amt) {
		return io::EOF;
	};
	let buf = st.buf[st.pos..st.pos + amt];
	st.pos += len(buf);
	return buf;
};

// Reader shared by both stream kinds. Copies as many bytes as are available
// after the cursor, up to len(buf), into "buf" and advances the cursor.
// Returns [[io::EOF]] when the cursor is at the end of the buffer.
fn read(s: *io::stream, buf: []u8) (size | io::EOF | io::error) = {
	let s = s: *stream;
	if (len(s.buf) == s.pos) {
		return io::EOF;
	};
	// n = min(bytes remaining, len(buf))
	const n = if (len(s.buf) - s.pos < len(buf)) {
		yield len(s.buf) - s.pos;
	} else {
		yield len(buf);
	};
	assert(s.pos + n <= len(s.buf));
	buf[..n] = s.buf[s.pos..s.pos + n];
	s.pos += n;
	return n;
};

// Seeker shared by both stream kinds. Moves the cursor to "off" bytes relative
// to the start of the buffer, the current cursor, or the end of the buffer,
// depending on "w". Returns the new cursor position, or [[errors::invalid]] if
// the target lies before the start or past the end of the buffer.
fn seek(
	s: *io::stream,
	off: io::off,
	w: io::whence
) (io::off | io::error) = {
	let s = s: *stream;
	let start = switch (w) {
	case io::whence::SET => yield 0z;
	case io::whence::CUR => yield s.pos;
	case io::whence::END => yield len(s.buf);
	};
	// Range-check before moving, so that the cursor never leaves
	// [0, len(s.buf)].
	if (off < 0) {
		if (start < (-off): size) return errors::invalid;
	} else {
		if (len(s.buf) - start < off: size) return errors::invalid;
	};
	// NOTE(review): for a negative "off" this presumably relies on the
	// cast and the addition both being modular in size arithmetic.
	s.pos = start + off: size;
	return s.pos: io::off;
};

// Copier shared by both stream kinds. When the source is one of this module's
// streams (recognised by its reader) and the destination has a writer, the
// unread remainder of the source buffer is handed to the destination's writer
// in a single call; otherwise [[errors::unsupported]] is returned.
//
// The source cursor is advanced by the number of bytes the writer accepted,
// so that data which has been copied is not read again from the source.
// Returns the number of bytes written, which may be less than the number of
// bytes that were available if the destination performed a short write.
fn copy(dest: *io::stream, src: *io::stream) (size | io::error) = {
	if (src.reader != &read || dest.writer == null) {
		return errors::unsupported;
	};
	let src = src: *stream;
	const n = (dest.writer: *io::writer)(dest, src.buf[src.pos..])?;
	src.pos += n;
	return n;
};

// Writer for [[fixed]] streams. Overwrites the buffer starting at the cursor
// and advances the cursor. The write is truncated to the space remaining in
// the buffer; if no space remains at all, [[errors::overflow]] is returned.
// Empty writes always succeed and return zero.
fn fixed_write(s: *io::stream, buf: const []u8) (size | io::error) = {
	if (len(buf) == 0) {
		return 0z;
	};
	let s = s: *stream;
	if (s.pos >= len(s.buf)) {
		return errors::overflow;
	};
	// n = min(len(buf), space remaining after the cursor)
	const n = if (len(buf) > len(s.buf[s.pos..])) {
		yield len(s.buf[s.pos..]);
	} else {
		yield len(buf);
	};
	s.buf[s.pos..s.pos+n] = buf[..n];
	s.pos += n;
	return n;
};

// Writer for [[dynamic]] streams. Bytes which fit between the cursor and the
// end of the buffer overwrite the existing contents; the rest is appended,
// growing the buffer. The cursor ends up just past the written data and the
// full len(buf) is always reported as written.
fn dynamic_write(s: *io::stream, buf: const []u8) (size | io::error) = {
	let s = s: *stream;
	let spare = len(s.buf) - s.pos;
	// Number of bytes which overwrite existing buffer contents.
	let bufend = if (spare < len(buf)) spare else len(buf);
	s.buf[s.pos..s.pos+bufend] = buf[..bufend];
	s.pos += bufend;
	if (bufend < len(buf)) {
		append(s.buf, buf[bufend..]...);
		s.pos += len(buf[bufend..]);
	};
	return len(buf);
};

// Closer for [[dynamic]] streams. Frees the buffer and returns the stream to
// its initial empty state, so it may be written to again afterwards.
fn dynamic_close(s: *io::stream) (void | io::error) = {
	const s = s: *stream;
	free(s.buf);
	s.buf = [];
	s.pos = 0;
};

@test fn fixed() void = {
	let buf: [1024]u8 = [0...];
	let stream = fixed(buf);
	defer io::close(&stream)!;

	let n = 0z;
	n += io::writeall(&stream, strings::toutf8("hello ")) as size;
	n += io::writeall(&stream, strings::toutf8("world")) as size;
	assert(bytes::equal(buf[..n], strings::toutf8("hello world")));
	assert(io::seek(&stream, 6, io::whence::SET) as io::off == 6: io::off);
	io::writeall(&stream, strings::toutf8("asdf")) as size;
	assert(bytes::equal(buf[..n], strings::toutf8("hello asdfd")));

	let out: [2]u8 = [0...];
	let s = fixed([1u8, 2u8]);
	defer io::close(&s)!;
	assert(io::read(&s, out[..1]) as size == 1 && out[0] == 1);
	assert(io::seek(&s, 1, io::whence::CUR) as io::off == 2: io::off);
	assert(io::read(&s, buf[..]) is io::EOF);
	assert(io::writeall(&s, [1, 2]) as io::error is errors::overflow);

	let in: [6]u8 = [0, 1, 2, 3, 4, 5];
	let out: [6]u8 = [0...];
	let source = fixed(in);
	let sink = fixed(out);
	io::copy(&sink, &source)!;
	assert(bytes::equal(in, out));
	// The copy must have consumed the source.
	let scratch: [1]u8 = [0...];
	assert(io::read(&source, scratch[..]) is io::EOF);

	assert(io::write(&sink, [])! == 0);

	static let buf: [1024]u8 = [0...];
	let stream = fixed(buf);
	assert(string(&stream)! == "");
	io::writeall(&stream, strings::toutf8("hello ")) as size;
	assert(string(&stream)! == "hello ");
	io::writeall(&stream, strings::toutf8("world")) as size;
	assert(string(&stream)! == "hello world");
};

@test fn dynamic() void = {
	let s = dynamic();
	defer io::close(&s)!;
	assert(io::writeall(&s, [1, 2, 3]) as size == 3);
	assert(bytes::equal(buffer(&s), [1, 2, 3]));
	assert(io::writeall(&s, [4, 5]) as size == 2);
	assert(bytes::equal(buffer(&s), [1, 2, 3, 4, 5]));
	let buf: [2]u8 = [0...];
	assert(io::seek(&s, 0, io::whence::SET) as io::off == 0: io::off);
	assert(io::read(&s, buf[..]) as size == 2 && bytes::equal(buf, [1, 2]));
	assert(io::read(&s, buf[..]) as size == 2 && bytes::equal(buf, [3, 4]));
	assert(io::read(&s, buf[..]) as size == 1 && buf[0] == 5);
	assert(io::read(&s, buf[..]) is io::EOF);
	assert(io::writeall(&s, [6, 7, 8]) as size == 3);
	assert(bytes::equal(buffer(&s), [1, 2, 3, 4, 5, 6, 7, 8]));
	reset(&s);
	assert(len(buffer(&s)) == 0);
	assert(io::writeall(&s, [1, 2, 3]) as size == 3);

	let sl: []u8 = alloc([1, 2, 3]);
	let s = dynamic_from(sl);
	defer io::close(&s)!;
	assert(io::writeall(&s, [0, 0]) as size == 2);
	assert(io::seek(&s, 0, io::whence::END) as io::off == 3: io::off);
	assert(io::writeall(&s, [4, 5, 6]) as size == 3);
	assert(bytes::equal(buffer(&s), [0, 0, 3, 4, 5, 6]));
	assert(io::read(&s, buf[..]) is io::EOF);

	sl = alloc([1, 2]);
	let s = dynamic_from(sl);
	defer io::close(&s)!;
	assert(io::read(&s, buf[..1]) as size == 1 && buf[0] == 1);
	assert(io::seek(&s, 1, io::whence::CUR) as io::off == 2: io::off);
	assert(io::read(&s, buf[..]) is io::EOF);
	assert(io::writeall(&s, [3, 4]) as size == 2 && bytes::equal(buffer(&s), [1, 2, 3, 4]));
	// A closed dynamic stream starts over with an empty buffer.
	io::close(&s)!;
	assert(io::writeall(&s, [5, 6]) as size == 2 && bytes::equal(buffer(&s), [5, 6]));

	let in: [6]u8 = [0, 1, 2, 3, 4, 5];
	let source = dynamic_from(in);
	let sink = dynamic();
	defer io::close(&sink)!;
	io::copy(&sink, &source)!;
	assert(bytes::equal(in, buffer(&sink)));
	// The copy must have consumed the source.
	assert(io::read(&source, buf[..]) is io::EOF);

	let in: [6]u8 = [0, 1, 2, 3, 4, 5];
	let source = dynamic_from(in);
	const borrowed = borrowedread(&source, len(in)-1) as []u8;
	assert(bytes::equal(borrowed, [0, 1, 2, 3, 4]));
	let source = dynamic_from(in);
	const borrowed = borrowedread(&source, len(in)) as []u8;
	assert(bytes::equal(borrowed, [0, 1, 2, 3, 4, 5]));
	let source = dynamic_from(in);
	assert(borrowedread(&source, len(in)+1) is io::EOF);

	let stream = dynamic();
	defer io::close(&stream)!;
	assert(string(&stream)! == "");
	io::writeall(&stream, strings::toutf8("hello ")) as size;
	assert(string(&stream)! == "hello ");
	io::writeall(&stream, strings::toutf8("world")) as size;
	assert(string(&stream)! == "hello world");
};