hare

[hare] The Hare programming language
git clone https://git.torresjrjr.com/hare.git
Log | Files | Refs | README | LICENSE

queue.ha (9174B)


      1 // SPDX-License-Identifier: GPL-3.0-only
      2 // (c) Hare authors <https://harelang.org>
      3 
      4 use crypto::sha256;
      5 use encoding::hex;
      6 use errors;
      7 use fmt;
      8 use fs;
      9 use hare::module;
     10 use hare::unparse;
     11 use hash;
     12 use io;
     13 use memio;
     14 use os;
     15 use os::exec;
     16 use path;
     17 use shlex;
     18 use sort;
     19 use strings;
     20 use unix::tty;
     21 
     22 // a command which is currently running
     23 type job = struct {
     24 	pid: exec::process,
     25 	task: *task,
     26 	// fd to be closed once the job has finished, in order to release the
     27 	// [[io::lock]] on it
     28 	lock: io::file,
     29 };
     30 
     31 export fn execute(ctx: *context) (str | error) = {
     32 	let q: []*task = [];
     33 	defer free(q);
     34 	defer for (let t .. q) {
     35 		free_task(t);
     36 	};
     37 
     38 	// stage::TD (typedef files) are generated by the SSA stage (harec)
     39 	const goal = if (ctx.goal == stage::TD) stage::SSA else ctx.goal;
     40 	queue(ctx, &q, goal, ctx.top);
     41 	// sort by stage, harec then qbe then as then ld, and keep reverse
     42 	// topo sort within each stage
     43 	sort::sort(q, size(*task), &task_cmp);
     44 	ctx.total = len(q);
     45 
     46 	let jobs: []job = alloc([], ctx.jobs);
     47 	defer free(jobs);
     48 
     49 	if (len(os::tryenv("NO_COLOR", "")) == 0
     50 			&& os::getenv("HAREC_COLOR") is void
     51 			&& tty::isatty(os::stderr_file)) {
     52 		os::setenv("HAREC_COLOR", "1")!;
     53 	};
     54 
     55 	for (let i = 0z; len(q) != 0; i += 1) {
     56 		if (i == len(q)) {
     57 			await_task(ctx, &jobs)?;
     58 			i = 0;
     59 		};
     60 		if (run_task(ctx, &jobs, q[i])?) {
     61 			delete(q[i]);
     62 			i = -1;
     63 		};
     64 	};
     65 	for (await_task(ctx, &jobs) is size) void;
     66 	if (ctx.mode == output::DEFAULT && ctx.total != 0) {
     67 		fmt::errorln()?;
     68 	};
     69 
     70 	return get_cache(ctx, ctx.top, ctx.goal)?;
     71 };
     72 
     73 fn task_cmp(a: const *opaque, b: const *opaque) int = {
     74 	let a = a: const **task, b = b: const **task;
     75 	return a.kind - b.kind;
     76 };
     77 
     78 fn queue(ctx: *context, q: *[]*task, kind: stage, idx: size) *task = {
     79 	// return already existing task to avoid creating duplicates
     80 	for (let t .. *q) {
     81 		if (t.kind == kind && t.idx == idx) {
     82 			return t;
     83 		};
     84 	};
     85 	let t = alloc(task {
     86 		kind = kind,
     87 		idx = idx,
     88 		...
     89 	});
     90 	switch (kind) {
     91 	case stage::BIN =>
     92 		t.ndeps = len(ctx.mods);
     93 		for (let i = 0z; i < len(ctx.mods); i += 1) {
     94 			append(queue(ctx, q, stage::O, i).rdeps, t);
     95 		};
     96 	case stage::O, stage::S =>
     97 		t.ndeps = 1;
     98 		append(queue(ctx, q, kind - 1, idx).rdeps, t);
     99 	case stage::SSA =>
    100 		t.ndeps = len(ctx.mods[idx].deps);
    101 		for (let (dep_idx, _) .. ctx.mods[idx].deps) {
    102 			append(queue(ctx, q, stage::SSA, dep_idx).rdeps, t);
    103 		};
    104 	case stage::TD => abort();
    105 	};
    106 	append(q, t);
    107 	return t;
    108 };
    109 
    110 // returns true if the task was executed. returns false if the task cannot be
    111 // executed (because it is waiting for dependencies) or if the task is already
    112 // running (possibly in another instance of this build driver)
    113 fn run_task(ctx: *context, jobs: *[]job, t: *task) (bool | error) = {
    114 	if (len(jobs) == ctx.jobs) {
    115 		await_task(ctx, jobs)?;
    116 	};
    117 	if (t.ndeps != 0) {
    118 		return false;
    119 	};
    120 	let mod = ctx.mods[t.idx];
    121 	let deps = get_deps(ctx, t);
    122 	defer strings::freeall(deps);
    123 	let flags = get_flags(ctx, t)?;
    124 	defer strings::freeall(flags);
    125 	ctx.hashes[t.idx][t.kind] = get_hash(ctx, deps, flags, t);
    126 
    127 	os::mkdirs(module::get_cache(ctx.ctx.harecache, mod.path)?, 0o755)!;
    128 	let out = get_cache(ctx, t.idx, t.kind)?;
    129 	defer free(out);
    130 
    131 	path::set(&buf, out)?;
    132 	let lock = path::push_ext(&buf, "lock")?;
    133 	let lock = os::create(lock, 0o644, fs::flag::WRONLY)?;
    134 	if (!io::lock(lock, false, io::lockop::EXCLUSIVE)?) {
    135 		io::close(lock)?;
    136 		return false;
    137 	};
    138 
    139 	path::set(&buf, out)?;
    140 	let tmp = path::push_ext(&buf, "tmp")?;
    141 	// TODO: use os::mkfile once that's supported on freebsd and openbsd
    142 	io::close(os::create(tmp, 0o644)?)?;
    143 
    144 	let args = get_args(ctx, tmp, flags, t);
    145 	defer strings::freeall(args);
    146 
    147 	path::set(&buf, out)?;
    148 	write_args(ctx, path::push_ext(&buf, "txt")?, args, t)?;
    149 
    150 	let outdated = module::outdated(out, deps, mod.srcs.mtime);
    151 	let exec = t.kind != stage::SSA || len(mod.srcs.ha) != 0;
    152 	if (!exec || !outdated) {
    153 		if (outdated) {
    154 			cleanup_task(ctx, t)?;
    155 		} else if (t.kind == stage::SSA) {
    156 			get_td(ctx, t.idx)?;
    157 		};
    158 		io::close(lock)?;
    159 		free_task(t);
    160 		ctx.total -= 1;
    161 		return true;
    162 	};
    163 
    164 	switch (ctx.mode) {
    165 	case output::DEFAULT, output::SILENT => void;
    166 	case output::VERBOSE =>
    167 		if (tty::isatty(os::stderr_file)) {
    168 			fmt::errorfln("\x1b[1m{}\x1b[0m\t{}",
    169 				ctx.cmds[t.kind], mod.name)?;
    170 		} else {
    171 			fmt::errorfln("{}\t{}", ctx.cmds[t.kind], mod.name)?;
    172 		};
    173 	case output::VVERBOSE =>
    174 		fmt::error(ctx.cmds[t.kind])?;
    175 		for (let arg .. args) {
    176 			fmt::error(" ")?;
    177 			shlex::quote(os::stderr, arg)?;
    178 		};
    179 		fmt::errorln()?;
    180 	};
    181 
    182 	let cmd = match(exec::cmd(ctx.cmds[t.kind], args...)) {
    183 	case exec::nocmd =>
    184 		fmt::fatalf("Error: Command not found: {}", ctx.cmds[t.kind]);
    185 	case let e: exec::error =>
    186 		return e;
    187 	case let c: exec::command =>
    188 		yield c;
    189 	};
    190 	path::set(&buf, out)?;
    191 	let output = os::create(path::push_ext(&buf, "log")?, 0o644)?;
    192 	defer io::close(output)!;
    193 	exec::addfile(&cmd, os::stdout_file, output);
    194 	exec::addfile(&cmd, os::stderr_file, output);
    195 	static append(jobs, job {
    196 		pid = exec::start(&cmd)?,
    197 		task = t,
    198 		lock = lock,
    199 	});
    200 	return true;
    201 };
    202 
    203 fn await_task(ctx: *context, jobs: *[]job) (size | void | error) = {
    204 	if (ctx.mode == output::DEFAULT && ctx.total != 0) {
    205 		fmt::errorf("\x1b[G\x1b[2K{}/{} tasks completed ({}%)",
    206 			ctx.completed, ctx.total,
    207 			ctx.completed * 100 / ctx.total)?;
    208 	};
    209 	if (len(jobs) == 0) {
    210 		return;
    211 	};
    212 
    213 	let (proc, status) = exec::waitany()?;
    214 	let i = 0z;
    215 	for (i < len(jobs) && jobs[i].pid != proc; i += 1) void;
    216 	assert(i < len(jobs), "Unknown PID returned from waitany");
    217 	let j = jobs[i];
    218 	let t = j.task;
    219 	static delete(jobs[i]);
    220 
    221 	let out = get_cache(ctx, t.idx, t.kind)?;
    222 	defer free(out);
    223 	path::set(&buf, out)?;
    224 
    225 	let output = os::open(path::push_ext(&buf, "log")?)?;
    226 	defer io::close(output)!;
    227 	let output = io::drain(output)?;
    228 	defer free(output);
    229 	if (len(output) > 0) {
    230 		if (ctx.mode == output::DEFAULT) {
    231 			fmt::errorln()?;
    232 		};
    233 		io::writeall(os::stderr, output)?;
    234 	};
    235 
    236 	match (exec::check(&status)) {
    237 	case void => void;
    238 	case let e: !exec::exit_status =>
    239 		if (ctx.mode == output::DEFAULT) {
    240 			fmt::errorln()?;
    241 		};
    242 		if (len(ctx.mods[t.idx].ns) > 0) {
    243 			fmt::fatalf("{} for {} ({}) {}",
    244 				ctx.cmds[t.kind], ctx.mods[t.idx].name,
    245 				ctx.mods[t.idx].path, exec::exitstr(e));
    246 		} else {
    247 			fmt::fatal(ctx.cmds[t.kind], "for",
    248 				ctx.mods[t.idx].name, exec::exitstr(e));
    249 		};
    250 	};
    251 
    252 	cleanup_task(ctx, t)?;
    253 	free_task(t);
    254 	io::close(j.lock)?;
    255 	ctx.completed += 1;
    256 	return i;
    257 };
    258 
    259 // update the cache after a task has been run
    260 fn cleanup_task(ctx: *context, t: *task) (void | error) = {
    261 	let out = get_cache(ctx, t.idx, t.kind)?;
    262 	defer free(out);
    263 
    264 	if (t.kind == stage::SSA) {
    265 		cleanup_ssa_task(ctx, t, out)?;
    266 	};
    267 
    268 	let tmp = strings::concat(out, ".tmp");
    269 	defer free(tmp);
    270 	os::move(tmp, out)?;
    271 };
    272 
    273 fn cleanup_ssa_task(ctx: *context, t: *task, out: str) (void | error) = {
    274 	// td file is hashed solely based on its contents. not worth doing this
    275 	// for other types of outputs, but it gets us better caching behavior
    276 	// for tds since we need to include the dependency tds in the ssa hash
    277 	// see design.txt for more details
    278 	let tmp = strings::concat(out, ".td.tmp");
    279 	defer free(tmp);
    280 
    281 	let f = match (os::create(tmp, 0o644, fs::flag::RDWR)) {
    282 	case let f: io::file =>
    283 		yield f;
    284 	case let err: fs::error =>
    285 		return err;
    286 	};
    287 	defer io::close(f)!;
    288 	let h = sha256::sha256();
    289 	io::copy(&h, f)!;
    290 	let prefix: [sha256::SZ]u8 = [0...];
    291 	hash::sum(&h, prefix);
    292 	ctx.hashes[t.idx][stage::TD] = prefix;
    293 
    294 	let ptr = strings::concat(out, ".td");
    295 	defer free(ptr);
    296 	let ptr = os::create(ptr, 0o644)?;
    297 	defer io::close(ptr)!;
    298 	hex::encode(ptr, prefix)?;
    299 
    300 	let td = update_env(ctx, t.idx)?;
    301 	defer free(td);
    302 	if (os::exists(td)) {
    303 		os::remove(tmp)?;
    304 	} else {
    305 		os::move(tmp, td)?;
    306 	};
    307 };
    308 
    309 // get the td for a module whose harec has been skipped
    310 fn get_td(ctx: *context, idx: size) (void | error) = {
    311 	let ssa = get_cache(ctx, idx, stage::SSA)?;
    312 	defer free(ssa);
    313 	let ptr = strings::concat(ssa, ".td");
    314 	defer free(ptr);
    315 	let ptr = match (os::open(ptr)) {
    316 	case fs::error =>
    317 		return;
    318 	case let ptr: io::file =>
    319 		yield ptr;
    320 	};
    321 	defer io::close(ptr)!;
    322 
    323 	let ptr = hex::newdecoder(ptr);
    324 	let prefix: [sha256::SZ]u8 = [0...];
    325 	io::readall(&ptr, prefix)?;
    326 	ctx.hashes[idx][stage::TD] = prefix;
    327 
    328 	free(update_env(ctx, idx)?);
    329 };
    330 
    331 // set $HARE_TD_<module>, returning the path to the module's td
    332 fn update_env(ctx: *context, idx: size) (str | error) = {
    333 	let path = get_cache(ctx, idx, stage::TD)?;
    334 	let ns = unparse::identstr(ctx.mods[idx].ns);
    335 	defer free(ns);
    336 	if (ctx.mode == output::VVERBOSE) {
    337 		fmt::errorfln("# HARE_TD_{}={}", ns, path)?;
    338 	};
    339 	let var = strings::concat("HARE_TD_", ns);
    340 	defer free(var);
    341 	os::setenv(var, path)!;
    342 	return path;
    343 };
    344 
    345 fn get_cache(ctx: *context, idx: size, kind: stage) (str | error) = {
    346 	let prefix = match (ctx.hashes[idx][kind]) {
    347 	case void => abort("expected non-void prefix in get_cache()");
    348 	case let prefix: [sha256::SZ]u8 =>
    349 		yield prefix;
    350 	};
    351 	let s = memio::dynamic();
    352 	memio::concat(&s, module::get_cache(ctx.ctx.harecache,
    353 		ctx.mods[idx].path)?)!;
    354 	memio::concat(&s, "/")!;
    355 	hex::encode(&s, prefix)!;
    356 	memio::concat(&s, ".", stage_ext[kind])!;
    357 	return memio::string(&s)!;
    358 };