queue.ha (9174B)
// SPDX-License-Identifier: GPL-3.0-only
// (c) Hare authors <https://harelang.org>

use crypto::sha256;
use encoding::hex;
use errors;
use fmt;
use fs;
use hare::module;
use hare::unparse;
use hash;
use io;
use memio;
use os;
use os::exec;
use path;
use shlex;
use sort;
use strings;
use unix::tty;

// a command which is currently running
type job = struct {
	// the child process, as returned by [[exec::start]]
	pid: exec::process,
	// the task which this process is carrying out
	task: *task,
	// fd to be closed once the job has finished, in order to release the
	// [[io::lock]] on it
	lock: io::file,
};

// Queues the tasks needed to bring the module at index ctx.top up to
// ctx.goal, runs them with at most ctx.jobs commands in flight at once, and
// returns the cache path of ctx.top for ctx.goal. The return value is
// heap-allocated (see get_cache) and should be freed by the caller.
export fn execute(ctx: *context) (str | error) = {
	let q: []*task = [];
	defer free(q);
	// tasks which are still queued when we leave (e.g. because of an
	// error) haven't been freed by run_task or await_task
	defer for (let t .. q) {
		free_task(t);
	};

	// stage::TD (typedef files) are generated by the SSA stage (harec)
	const goal = if (ctx.goal == stage::TD) stage::SSA else ctx.goal;
	queue(ctx, &q, goal, ctx.top);
	// sort by stage, harec then qbe then as then ld, and keep reverse
	// topo sort within each stage
	sort::sort(q, size(*task), &task_cmp);
	ctx.total = len(q);

	// capacity is fixed at ctx.jobs: run_task waits for a job to finish
	// before adding one once len(jobs) == ctx.jobs, so the static
	// append/delete used on this slice never have to reallocate it
	let jobs: []job = alloc([], ctx.jobs);
	defer free(jobs);

	// set HAREC_COLOR=1 if stderr is a tty, unless NO_COLOR is set to a
	// non-empty value or HAREC_COLOR is already set
	if (len(os::tryenv("NO_COLOR", "")) == 0
			&& os::getenv("HAREC_COLOR") is void
			&& tty::isatty(os::stderr_file)) {
		os::setenv("HAREC_COLOR", "1")!;
	};

	// repeatedly scan the queue from the front for a task which can be
	// started right now
	for (let i = 0z; len(q) != 0; i += 1) {
		if (i == len(q)) {
			// went through the whole queue without starting
			// anything: wait for a job before scanning again
			await_task(ctx, &jobs)?;
			i = 0;
		};
		if (run_task(ctx, &jobs, q[i])?) {
			delete(q[i]);
			// restart the scan; the loop's i += 1 wraps this back
			// around to 0
			i = -1;
		};
	};

	// wait for the jobs which are still running. errors are propagated
	// here rather than being mistaken for "no jobs left", so that a
	// failure while finishing a job can't be reported as a successful
	// build
	for (await_task(ctx, &jobs)? is size) void;
	if (ctx.mode == output::DEFAULT && ctx.total != 0) {
		// terminate the progress line written by await_task
		fmt::errorln()?;
	};

	return get_cache(ctx, ctx.top, ctx.goal)?;
};

// comparison function for [[sort::sort]] over a []*task: orders tasks by
// ascending kind (stage)
fn task_cmp(a: const *opaque, b: const *opaque) int = {
	let a = a: const **task, b = b: const **task;
	return a.kind - b.kind;
};

// returns the task for stage "kind" of the module at index "idx". if q doesn't
// contain such a task yet, it is created along with (recursively) the tasks it
// depends on: the new task is added to the rdeps of each of them, its ndeps is
// set to the number of them, and it is appended to q after them
fn queue(ctx: *context, q: *[]*task, kind: stage, idx: size) *task = {
	// return already existing task to avoid creating duplicates
	for (let t .. *q) {
		if (t.kind == kind && t.idx == idx) {
			return t;
		};
	};
	let t = alloc(task {
		kind = kind,
		idx = idx,
		...
	});
	switch (kind) {
	case stage::BIN =>
		// depends on the O task of every module
		t.ndeps = len(ctx.mods);
		for (let i = 0z; i < len(ctx.mods); i += 1) {
			append(queue(ctx, q, stage::O, i).rdeps, t);
		};
	case stage::O, stage::S =>
		// depends on the stage numbered just before this one, for the
		// same module
		t.ndeps = 1;
		append(queue(ctx, q, kind - 1, idx).rdeps, t);
	case stage::SSA =>
		// depends on the SSA task of each of the module's dependencies
		t.ndeps = len(ctx.mods[idx].deps);
		for (let (dep_idx, _) .. ctx.mods[idx].deps) {
			append(queue(ctx, q, stage::SSA, dep_idx).rdeps, t);
		};
	case stage::TD => abort();
	};
	append(q, t);
	return t;
};

// returns true if the task was executed. returns false if the task cannot be
// executed (because it is waiting for dependencies) or if the task is already
// running (possibly in another instance of this build driver)
//
// "executed" includes the case where nothing had to be run; the task is then
// freed here and removed from ctx.total. otherwise a job is added to jobs, and
// the task is finished and freed by await_task
fn run_task(ctx: *context, jobs: *[]job, t: *task) (bool | error) = {
	// all job slots are in use: wait for one to be released
	if (len(jobs) == ctx.jobs) {
		await_task(ctx, jobs)?;
	};
	if (t.ndeps != 0) {
		return false;
	};
	let mod = ctx.mods[t.idx];
	let deps = get_deps(ctx, t);
	defer strings::freeall(deps);
	let flags = get_flags(ctx, t)?;
	defer strings::freeall(flags);
	// get_cache below requires the hash to be set
	ctx.hashes[t.idx][t.kind] = get_hash(ctx, deps, flags, t);

	os::mkdirs(module::get_cache(ctx.ctx.harecache, mod.path)?, 0o755)!;
	let out = get_cache(ctx, t.idx, t.kind)?;
	defer free(out);

	// buf is a path buffer declared elsewhere in this module; it is reset
	// to out before each extension is pushed

	// <out>.lock: if the exclusive lock can't be taken, report the task as
	// not executed
	path::set(&buf, out)?;
	let lockpath = path::push_ext(&buf, "lock")?;
	let lock = os::create(lockpath, 0o644, fs::flag::WRONLY)?;
	if (!io::lock(lock, false, io::lockop::EXCLUSIVE)?) {
		io::close(lock)?;
		return false;
	};

	// <out>.tmp: the path handed to get_args; cleanup_task moves it to out
	path::set(&buf, out)?;
	let tmp = path::push_ext(&buf, "tmp")?;
	// TODO: use os::mkfile once that's supported on freebsd and openbsd
	io::close(os::create(tmp, 0o644)?)?;

	let args = get_args(ctx, tmp, flags, t);
	defer strings::freeall(args);

	// <out>.txt
	path::set(&buf, out)?;
	write_args(ctx, path::push_ext(&buf, "txt")?, args, t)?;

	let outdated = module::outdated(out, deps, mod.srcs.mtime);
	// the SSA command is only run for modules which have .ha sources
	let needs_cmd = t.kind != stage::SSA || len(mod.srcs.ha) != 0;
	if (!needs_cmd || !outdated) {
		// nothing to run
		if (outdated) {
			cleanup_task(ctx, t)?;
		} else if (t.kind == stage::SSA) {
			get_td(ctx, t.idx)?;
		};
		io::close(lock)?;
		free_task(t);
		ctx.total -= 1;
		return true;
	};

	switch (ctx.mode) {
	case output::DEFAULT, output::SILENT => void;
	case output::VERBOSE =>
		// command and module name, with the command in bold on a tty
		if (tty::isatty(os::stderr_file)) {
			fmt::errorfln("\x1b[1m{}\x1b[0m\t{}",
				ctx.cmds[t.kind], mod.name)?;
		} else {
			fmt::errorfln("{}\t{}", ctx.cmds[t.kind], mod.name)?;
		};
	case output::VVERBOSE =>
		// the full command line, shell-quoted
		fmt::error(ctx.cmds[t.kind])?;
		for (let arg .. args) {
			fmt::error(" ")?;
			shlex::quote(os::stderr, arg)?;
		};
		fmt::errorln()?;
	};

	let cmd = match(exec::cmd(ctx.cmds[t.kind], args...)) {
	case exec::nocmd =>
		fmt::fatalf("Error: Command not found: {}",
			ctx.cmds[t.kind]);
	case let e: exec::error =>
		return e;
	case let c: exec::command =>
		yield c;
	};
	// <out>.log: the command's stdout and stderr are both sent here, and
	// await_task copies it to our stderr once the command has exited
	path::set(&buf, out)?;
	let logfile = os::create(path::push_ext(&buf, "log")?, 0o644)?;
	defer io::close(logfile)!;
	exec::addfile(&cmd, os::stdout_file, logfile);
	exec::addfile(&cmd, os::stderr_file, logfile);
	// lock stays open until await_task closes it
	static append(jobs, job {
		pid = exec::start(&cmd)?,
		task = t,
		lock = lock,
	});
	return true;
};

// prints the progress line (in output::DEFAULT mode), then waits for one of
// the running jobs to exit and finishes its task. returns void if there are no
// running jobs, otherwise the index which the finished job had in jobs. the
// job's log is copied to stderr. if the command failed, the program is
// terminated with fmt::fatal or fmt::fatalf
fn await_task(ctx: *context, jobs: *[]job) (size | void | error) = {
	if (ctx.mode == output::DEFAULT && ctx.total != 0) {
		// \x1b[G\x1b[2K: go to the first column and erase the line, so
		// that the previous progress line is overwritten
		fmt::errorf("\x1b[G\x1b[2K{}/{} tasks completed ({}%)",
			ctx.completed, ctx.total,
			ctx.completed * 100 / ctx.total)?;
	};
	if (len(jobs) == 0) {
		return;
	};

	let (proc, status) = exec::waitany()?;
	// find the job which this process belongs to
	let i = 0z;
	for (i < len(jobs) && jobs[i].pid != proc; i += 1) void;
	assert(i < len(jobs), "Unknown PID returned from waitany");
	let j = jobs[i];
	let t = j.task;
	static delete(jobs[i]);

	let out = get_cache(ctx, t.idx, t.kind)?;
	defer free(out);
	path::set(&buf, out)?;

	// forward whatever the command wrote to <out>.log
	let logfile = os::open(path::push_ext(&buf, "log")?)?;
	defer io::close(logfile)!;
	let logdata = io::drain(logfile)?;
	defer free(logdata);
	if (len(logdata) > 0) {
		if (ctx.mode == output::DEFAULT) {
			// don't write on the progress line
			fmt::errorln()?;
		};
		io::writeall(os::stderr, logdata)?;
	};

	match (exec::check(&status)) {
	case void => void;
	case let e: !exec::exit_status =>
		if (ctx.mode == output::DEFAULT) {
			fmt::errorln()?;
		};
		// the module path is only included in the message if the
		// module has a non-empty namespace
		if (len(ctx.mods[t.idx].ns) > 0) {
			fmt::fatalf("{} for {} ({}) {}",
				ctx.cmds[t.kind], ctx.mods[t.idx].name,
				ctx.mods[t.idx].path, exec::exitstr(e));
		} else {
			fmt::fatal(ctx.cmds[t.kind], "for",
				ctx.mods[t.idx].name, exec::exitstr(e));
		};
	};

	cleanup_task(ctx, t)?;
	free_task(t);
	// releases the lock taken in run_task
	io::close(j.lock)?;
	ctx.completed += 1;
	return i;
};

// update the cache after a task has been run: moves <out>.tmp to <out>, after
// handling the typedef file if this is an SSA task
fn cleanup_task(ctx: *context, t: *task) (void | error) = {
	let out = get_cache(ctx, t.idx, t.kind)?;
	defer free(out);

	if (t.kind == stage::SSA) {
		cleanup_ssa_task(ctx, t, out)?;
	};

	let tmp = strings::concat(out, ".tmp");
	defer free(tmp);
	os::move(tmp, out)?;
};

// hashes <out>.td.tmp, stores the hash as the module's stage::TD hash, writes
// it hex-encoded to <out>.td, and puts the td file at its content-addressed
// cache path (see update_env) unless a file already exists there
fn cleanup_ssa_task(ctx: *context, t: *task, out: str) (void | error) = {
	// td file is hashed solely based on its contents. not worth doing this
	// for other types of outputs, but it gets us better caching behavior
	// for tds since we need to include the dependency tds in the ssa hash
	// see design.txt for more details
	let tmp = strings::concat(out, ".td.tmp");
	defer free(tmp);

	// NOTE(review): os::create rather than os::open: presumably so that
	// an empty td is used when harec wasn't run and the file doesn't
	// exist (see the !needs_cmd case in run_task) -- confirm
	let f = os::create(tmp, 0o644, fs::flag::RDWR)?;
	defer io::close(f)!;
	let h = sha256::sha256();
	io::copy(&h, f)!;
	let prefix: [sha256::SZ]u8 = [0...];
	hash::sum(&h, prefix);
	ctx.hashes[t.idx][stage::TD] = prefix;

	// <out>.td holds the hex-encoded hash, and is read back by get_td
	let ptr_path = strings::concat(out, ".td");
	defer free(ptr_path);
	let ptr = os::create(ptr_path, 0o644)?;
	defer io::close(ptr)!;
	hex::encode(ptr, prefix)?;

	let td = update_env(ctx, t.idx)?;
	defer free(td);
	if (os::exists(td)) {
		// the path is derived from the hash of the contents, so the
		// existing file is kept and the new copy is discarded
		os::remove(tmp)?;
	} else {
		os::move(tmp, td)?;
	};
};

// get the td for a module whose harec has been skipped: reads the hash stored
// in <ssa>.td, sets it as the module's stage::TD hash, and updates the
// environment (see update_env). returns without doing anything if that file
// can't be opened
fn get_td(ctx: *context, idx: size) (void | error) = {
	let ssa = get_cache(ctx, idx, stage::SSA)?;
	defer free(ssa);
	let ptr_path = strings::concat(ssa, ".td");
	defer free(ptr_path);
	let ptr = match (os::open(ptr_path)) {
	case fs::error =>
		return;
	case let f: io::file =>
		yield f;
	};
	defer io::close(ptr)!;

	// the file contains the hash in hex, as written by cleanup_ssa_task
	let dec = hex::newdecoder(ptr);
	let prefix: [sha256::SZ]u8 = [0...];
	io::readall(&dec, prefix)?;
	ctx.hashes[idx][stage::TD] = prefix;

	// only the environment update is wanted here, not the path
	free(update_env(ctx, idx)?);
};

// set $HARE_TD_<module>, returning the path to the module's td
//
// the module's stage::TD hash must have been set beforehand (see get_cache).
// the return value should be freed by the caller
fn update_env(ctx: *context, idx: size) (str | error) = {
	let path = get_cache(ctx, idx, stage::TD)?;
	let ns = unparse::identstr(ctx.mods[idx].ns);
	defer free(ns);
	if (ctx.mode == output::VVERBOSE) {
		fmt::errorfln("# HARE_TD_{}={}", ns, path)?;
	};
	let var = strings::concat("HARE_TD_", ns);
	defer free(var);
	os::setenv(var, path)!;
	return path;
};

// returns the path in the cache for stage "kind" of the module at index "idx":
// <module cache directory>/<hash in hex>.<stage_ext[kind]>. aborts if the hash
// for that module and stage hasn't been set yet. the return value is
// heap-allocated and should be freed by the caller
fn get_cache(ctx: *context, idx: size, kind: stage) (str | error) = {
	let prefix = match (ctx.hashes[idx][kind]) {
	case void => abort("expected non-void prefix in get_cache()");
	case let prefix: [sha256::SZ]u8 =>
		yield prefix;
	};
	let s = memio::dynamic();
	memio::concat(&s, module::get_cache(ctx.ctx.harecache,
		ctx.mods[idx].path)?)!;
	memio::concat(&s, "/")!;
	hex::encode(&s, prefix)!;
	memio::concat(&s, ".", stage_ext[kind])!;
	return memio::string(&s)!;
};