Shells in OCaml

Improve process group intialisation

Previously we relied on spawning a long-running [sleep] process in order
to have a process to make the process group leader. The issue was that
Eio was eager to reap child processes but that meant we could not
attach a bunch of processes in a pipeline to the pid of the first
process without race conditions. Now we explicitly allow users to hold
off on reaping children until a promise is resolved.

+109 -109
+14 -17
src/lib/built_ins.ml
··· 196 196 end 197 197 198 198 module Command = struct 199 - open Cmdliner 199 + (* Given the nature of how command works, we need to parse the strings 200 + of the arguments by ourself. *) 200 201 201 - let args = 202 - let doc = "Arguments to command." in 203 - Arg.(value & pos_all string [] & info [] ~docv:"ARGS" ~doc) 204 - 205 - let print_command = 206 - let doc = "Write a string to stdout of the command we would use." in 207 - Arg.(value & flag & info [ "v"; "V" ] ~docv:"V" ~doc) 208 - 209 - let t = 210 - let make_command print_command args = Command { print_command; args } in 211 - let term = Term.(const make_command $ print_command $ args) in 212 - let info = 213 - let doc = "Execute a simple command." in 214 - Cmd.info "command" ~doc 202 + let of_strings args = 203 + let any_flags, args = 204 + let rec loop acc = function 205 + | [] -> (acc, []) 206 + | arg :: args -> 207 + if String.starts_with ~prefix:"-" arg then loop (arg :: acc) args 208 + else (List.rev acc, arg :: args) 209 + in 210 + loop [] args 215 211 in 216 - Cmd.v info term 212 + let print_command = List.mem "-v" any_flags in 213 + Some (Ok (Command { args; print_command })) 217 214 end 218 215 219 216 module Make_dot (T : sig ··· 281 278 | "." :: _ as cmd -> exec_cmd cmd Dot.t 282 279 | "unset" :: _ as cmd -> exec_cmd cmd Unset.t 283 280 | "hash" :: _ as cmd -> exec_cmd cmd Hash.t 284 - | "command" :: _ as cmd -> exec_cmd cmd Command.t 281 + | "command" :: cmd -> Command.of_strings cmd 285 282 | "alias" :: _ -> Some (Ok Alias) 286 283 | "unalias" :: _ -> Some (Ok Unalias) 287 284 | "eval" :: _ as cmd -> exec_cmd cmd Eval.t
+65 -70
src/lib/eval.ml
··· 235 235 let s_len = String.length s in 236 236 if s.[0] = '"' && s.[s_len - 1] = '"' then String.sub s 1 (s_len - 2) else s 237 237 238 - let rec handle_pipeline ~async initial_ctx pipeline_switch p : ctx Exit.t = 239 - let mode = if async then Types.Async else Types.Switched pipeline_switch in 238 + let rec handle_pipeline ~async initial_ctx p : ctx Exit.t = 240 239 let set_last_background ~async process ctx = 241 240 if async then 242 241 { ctx with last_background_process = string_of_int (E.pid process) } ··· 248 247 | None -> ctx 249 248 | Some process -> set_last_background ~async process ctx 250 249 in 251 - let handle_job ~pgid j p = 252 - match (j, p) with 253 - | None, _ -> 254 - Option.some @@ J.make ~state:`Running pgid (Nlist.Singleton p) 255 - | Some j, `Process p -> Option.some @@ J.add_process p j 256 - | Some j, `Built_in p -> Option.some @@ J.add_built_in p j 257 - | Some j, `Error p -> Option.some @@ J.add_error p j 250 + let handle_job j p = 251 + match p with 252 + (* | None, _ -> *) 253 + (* let pgid = match pgid with Some p -> p | None -> Unix.getpid () in *) 254 + (* Option.some *) 255 + (* @@ J.make ~state:`Running ~reap:(Option.get reap) pgid *) 256 + (* (Nlist.Singleton p) *) 257 + | `Process p -> J.add_process p j 258 + | `Built_in p -> J.add_built_in p j 259 + | `Error p -> J.add_error p j 258 260 in 259 261 let close_stdout ~is_global some_write = 260 262 if not is_global then begin 261 263 Eio.Flow.close some_write 262 264 end 263 265 in 264 - let exec_process ctx job ?fds ?stdin ~stdout ~pgid executable args = 266 + let exec_process ~sw ctx job ?fds ?stdin ~stdout ?pgid executable args = 267 + let pgid = match pgid with None -> 0 | Some p -> p in 268 + let reap = J.get_reaper job in 269 + let mode = if async then Types.Async else Types.Switched sw in 265 270 let ctx, process = 266 271 match (executable, resolve_program ctx executable) with 267 272 | _, (ctx, None) | "", (ctx, _) -> ··· 274 279 (ctx, Error (127, `Not_found)) 275 280 | _, (ctx, Some full_path) -> 276 281 ( ctx, 277 - E.exec ctx.executor ?fds ?stdin ~stdout ~pgid ~mode 278 - ~cwd:(cwd_of_ctx ctx) 282 + E.exec ctx.executor ~delay_reap:(fst reap) ?fds ?stdin ~stdout 283 + ~pgid ~mode ~cwd:(cwd_of_ctx ctx) 279 284 ~env:(get_env ~extra:ctx.local_state ctx) 280 285 ~executable:full_path (executable :: args) ) 281 286 in 282 287 match process with 283 288 | Error (n, _) -> 284 - let job = handle_job ~pgid job (`Error n) in 289 + let job = handle_job job (`Error n) in 285 290 (on_process ~async ctx, job) 286 291 | Ok process -> 287 - let job = handle_job ~pgid job (`Process process) in 292 + let pgid = if Int.equal pgid 0 then E.pid process else pgid in 293 + let job = 294 + handle_job job (`Process process) |> fun j -> { j with id = pgid } 295 + in 288 296 (on_process ~async ~process ctx, job) 289 297 in 290 - let rec loop (ctx : ctx) (job : J.t option) 291 - ((pgid, stdout_of_previous) : 292 - int * Eio_unix.source_ty Eio_unix.source option) : 293 - Ast.command list -> ctx * J.t option = 298 + let job_pgid (t : J.t) = t.id in 299 + let rec loop pipeline_switch (ctx : ctx) (job : J.t) 300 + (stdout_of_previous : Eio_unix.source_ty Eio_unix.source option) : 301 + Ast.command list -> ctx * J.t = 294 302 fun c -> 303 + let loop = loop pipeline_switch in 295 304 match c with 296 305 | Ast.SimpleCommand (Prefixed (prefix, None, _suffix)) :: rest -> 297 306 let ctx = collect_assignments ctx prefix in 298 - loop ctx job (pgid, stdout_of_previous) rest 307 + loop ctx job stdout_of_previous rest 299 308 | Ast.SimpleCommand (Prefixed (prefix, Some executable, suffix)) :: rest 300 309 -> 301 310 let ctx = collect_assignments ~update:false ctx prefix in 302 - loop ctx job (pgid, stdout_of_previous) 311 + loop ctx job stdout_of_previous 303 312 (Ast.SimpleCommand (Named (executable, suffix)) :: rest) 304 313 | Ast.SimpleCommand (Named (executable, suffix)) :: rest -> ( 305 314 let ctx, executable = expand_cst ctx executable in ··· 345 354 in 346 355 match Built_ins.of_args (executable :: args_as_strings) with 347 356 | Some (Error _) -> 348 - (ctx, handle_job ~pgid job (`Built_in (Exit.nonzero () 1))) 357 + (ctx, handle_job job (`Built_in (Exit.nonzero () 1))) 349 358 | (None | Some (Ok (Command _))) as v -> ( 350 359 let is_command, command_args, print_command = 351 360 match v with ··· 359 368 | "export" -> 360 369 let updated = handle_export ctx args in 361 370 let job = 362 - handle_job ~pgid job (`Built_in (updated >|= fun _ -> ())) 371 + handle_job job (`Built_in (updated >|= fun _ -> ())) 363 372 in 364 - loop (Exit.value updated) job (pgid, stdout_of_previous) rest 373 + loop (Exit.value updated) job stdout_of_previous rest 365 374 | _ -> ( 366 375 let saved_ctx = ctx in 367 376 let func_app = ··· 376 385 close_stdout ~is_global some_write; 377 386 (* TODO: Proper job stuff and redirects etc. *) 378 387 let job = 379 - handle_job ~pgid job (`Built_in (ctx >|= fun _ -> ())) 388 + handle_job job (`Built_in (ctx >|= fun _ -> ())) 380 389 in 381 - loop saved_ctx job (pgid, some_read) rest 390 + loop saved_ctx job some_read rest 382 391 | None -> ( 383 392 match Built_ins.of_args command_args with 384 393 | Some (Error _) -> 385 - ( ctx, 386 - handle_job ~pgid job (`Built_in (Exit.nonzero () 1)) 387 - ) 394 + (ctx, handle_job job (`Built_in (Exit.nonzero () 1))) 388 395 | Some (Ok bi) -> 389 396 let ctx = 390 397 handle_built_in ~rdrs ~stdout:some_write ctx bi 391 398 in 392 399 close_stdout ~is_global some_write; 393 400 let built_in = ctx >|= fun _ -> () in 394 - let job = handle_job ~pgid job (`Built_in built_in) in 395 - loop (Exit.value ctx) job (pgid, some_read) rest 401 + let job = handle_job job (`Built_in built_in) in 402 + loop (Exit.value ctx) job some_read rest 396 403 | _ -> ( 397 404 let exec_and_args = 398 405 if is_command then begin ··· 412 419 match exec_and_args with 413 420 | Exit.Nonzero _ as v -> 414 421 let job = 415 - handle_job ~pgid job 416 - (`Built_in (v >|= fun _ -> ())) 422 + handle_job job (`Built_in (v >|= fun _ -> ())) 417 423 in 418 - loop ctx job (pgid, some_read) rest 424 + loop ctx job some_read rest 419 425 | Exit.Zero (executable, args) -> ( 420 426 match stdout_of_previous with 421 427 | None -> 422 428 let ctx, job = 423 - exec_process ctx job ~fds:rdrs 424 - ~stdout:some_write ~pgid executable args 429 + exec_process ~sw:pipeline_switch ctx job 430 + ~fds:rdrs ~stdout:some_write 431 + ~pgid:(job_pgid job) executable args 425 432 in 426 433 close_stdout ~is_global some_write; 427 - loop ctx job (pgid, some_read) rest 434 + loop ctx job some_read rest 428 435 | Some stdout -> 429 436 let ctx, job = 430 - exec_process ctx job ~fds:rdrs ~stdin:stdout 431 - ~stdout:some_write ~pgid executable 437 + exec_process ~sw:pipeline_switch ctx job 438 + ~fds:rdrs ~stdin:stdout ~stdout:some_write 439 + ~pgid:(job_pgid job) executable 432 440 args_as_strings 433 441 in 434 442 close_stdout ~is_global some_write; 435 - loop ctx job (pgid, some_read) rest))))) 443 + loop ctx job some_read rest))))) 436 444 | Some (Ok bi) -> 437 445 let ctx = handle_built_in ~rdrs ~stdout:some_write ctx bi in 438 446 close_stdout ~is_global some_write; 439 447 let built_in = ctx >|= fun _ -> () in 440 - let job = handle_job ~pgid job (`Built_in built_in) in 441 - loop (Exit.value ctx) job (pgid, some_read) rest) 448 + let job = handle_job job (`Built_in built_in) in 449 + loop (Exit.value ctx) job some_read rest) 442 450 | CompoundCommand (c, rdrs) :: rest -> 443 451 let _rdrs = 444 452 List.map (handle_one_redirection ~sw:pipeline_switch ctx) rdrs 445 453 in 446 454 (* TODO: No way this is right *) 447 455 let ctx = handle_compound_command ctx c in 448 - let job = handle_job ~pgid job (`Built_in (ctx >|= fun _ -> ())) in 449 - loop (Exit.value ctx) job (pgid, None) rest 456 + let job = handle_job job (`Built_in (ctx >|= fun _ -> ())) in 457 + loop (Exit.value ctx) job None rest 450 458 | FunctionDefinition (name, (body, _rdrs)) :: rest -> 451 459 let ctx = { ctx with functions = (name, body) :: ctx.functions } in 452 - loop ctx job (pgid, None) rest 460 + loop ctx job None rest 453 461 | [] -> (clear_local_state ctx, job) 454 462 in 455 463 (* HACK: when running the pipeline, we need a process group to 456 464 put everything in. Eio's model of execution is nice, but we cannot 457 465 safely delay execution of a process. So instead we create a ghost 458 466 process that last just until all of the processes are setup. *) 459 - let ctx, job = 460 - let ghost_process = 461 - match resolve_program ~update:false initial_ctx "sleep" with 462 - | _, None -> Fmt.failwith "Sleep not found\n%!" 463 - | ctx, Some sleep -> ( 464 - E.exec ~mode:(Types.Switched pipeline_switch) ~pgid:0 465 - ~cwd:(cwd_of_ctx ctx) ctx.executor ~executable:sleep 466 - [ "sleep"; "99999999" ] 467 - |> function 468 - | Ok p -> p 469 - | Error (n, `Not_found) -> 470 - Fmt.epr "Interal error ghost process: not found"; 471 - exit n) 472 - in 473 - loop initial_ctx None (E.pid ghost_process, None) p 474 - in 475 - match job with 476 - | None -> Exit.zero ctx 477 - | Some job -> 467 + Eio.Switch.run @@ fun sw -> 468 + let initial_job = J.make 0 [] in 469 + let ctx, job = loop sw initial_ctx initial_job None p in 470 + match job.processes with 471 + | [] -> Exit.zero ctx 472 + | _ :: _ -> 478 473 if not async then begin 479 474 J.await_exit ~pipefail:false ~interactive:ctx.interactive job 480 475 >|= fun () -> ctx ··· 670 665 expand_redirects (ctx, v :: acc) rest 671 666 | s :: rest -> expand_redirects (ctx, s :: acc) rest 672 667 673 - and handle_and_or ~sw ~async ctx c = 668 + and handle_and_or ~sw:_ ~async ctx c = 674 669 let pipeline = function 675 670 | Ast.Pipeline p -> (Fun.id, p) 676 671 | Ast.Pipeline_Bang p -> (Exit.not, p) ··· 684 679 match exit_so_far with 685 680 | Exit.Zero ctx -> 686 681 let f, p = pipeline p in 687 - f @@ handle_pipeline ~async ctx sw p 682 + f @@ handle_pipeline ~async ctx p 688 683 | v -> v) 689 684 | Or, Nlist.Singleton (p, _) -> ( 690 685 match exit_so_far with 691 686 | Exit.Zero _ as ctx -> ctx 692 687 | _ -> 693 688 let f, p = pipeline p in 694 - f @@ handle_pipeline ~async ctx sw p) 689 + f @@ handle_pipeline ~async ctx p) 695 690 | Noand_or, Nlist.Singleton (p, _) -> 696 691 let f, p = pipeline p in 697 - f @@ handle_pipeline ~async ctx sw p 692 + f @@ handle_pipeline ~async ctx p 698 693 | Noand_or, Nlist.Cons ((p, next_sep), rest) -> 699 694 let f, p = pipeline p in 700 - let exit_status = f (handle_pipeline ~async ctx sw p) in 695 + let exit_status = f (handle_pipeline ~async ctx p) in 701 696 fold (next_sep, exit_status) rest 702 697 | And, Nlist.Cons ((p, next_sep), rest) -> ( 703 698 match exit_so_far with 704 699 | Exit.Zero ctx -> 705 700 let f, p = pipeline p in 706 - fold (next_sep, f (handle_pipeline ~async ctx sw p)) rest 701 + fold (next_sep, f (handle_pipeline ~async ctx p)) rest 707 702 | Exit.Nonzero _ as v -> v) 708 703 | Or, Nlist.Cons ((p, next_sep), rest) -> ( 709 704 match exit_so_far with 710 705 | Exit.Zero _ as exit_so_far -> fold (next_sep, exit_so_far) rest 711 706 | Exit.Nonzero _ -> 712 707 let f, p = pipeline p in 713 - fold (next_sep, f (handle_pipeline ~async ctx sw p)) rest) 708 + fold (next_sep, f (handle_pipeline ~async ctx p)) rest) 714 709 in 715 710 fold (Noand_or, Exit.zero ctx) c 716 711
+15 -11
src/lib/job.ml
··· 1 - open Import 2 - 3 1 module Make (E : Types.Exec) = struct 4 2 type t = { 5 3 state : [ `Running ]; 4 + reap : unit Eio.Promise.t * unit Eio.Promise.u; 6 5 id : int; 7 6 (* Process list is in reverse order *) 8 7 processes : 9 - [ `Process of E.process | `Built_in of unit Exit.t | `Error of int ] 10 - Nlist.t; 8 + [ `Process of E.process | `Built_in of unit Exit.t | `Error of int ] list; 11 9 } 12 10 13 - let make ?(state = `Running) id processes = { state; id; processes } 11 + let get_reaper t = t.reap 12 + 13 + let make ?(state = `Running) id processes = 14 + let reap = Eio.Promise.create () in 15 + { state; id; processes; reap } 14 16 15 17 let add_process proc t = 16 - { t with processes = Nlist.cons (`Process proc) t.processes } 18 + { t with processes = List.cons (`Process proc) t.processes } 17 19 18 20 let add_built_in b t = 19 - { t with processes = Nlist.cons (`Built_in b) t.processes } 21 + { t with processes = List.cons (`Built_in b) t.processes } 20 22 21 - let add_error b t = { t with processes = Nlist.cons (`Error b) t.processes } 23 + let add_error b t = { t with processes = List.cons (`Error b) t.processes } 22 24 23 25 (* Section 2.9.2 https://pubs.opengroup.org/onlinepubs/9799919799/ *) 24 26 let await_exit ~pipefail ~interactive t = 27 + Eio.Promise.resolve (snd t.reap) (); 28 + Eio.Fiber.yield (); 25 29 let await = function 26 30 | `Process p -> 27 31 if interactive then ··· 30 34 | `Built_in b -> b 31 35 | `Error n -> Exit.nonzero () n 32 36 in 33 - match pipefail with 34 - | false -> await (Nlist.hd t.processes) 35 - | _ -> Fmt.failwith "TODO: pipefail" 37 + match (pipefail, t.processes) with 38 + | false, x :: _ -> await x 39 + | _ -> Fmt.failwith "TODO: pipefail or no processes" 36 40 end
+10 -7
src/lib/posix/exec.ml
··· 70 70 | Merry.Types.Async -> () 71 71 | Merry.Types.Switched sw -> f sw 72 72 73 - let spawn ~mode actions = 73 + let spawn ?delay_reap ~mode actions = 74 74 with_pipe @@ fun errors_r errors_w -> 75 75 Eio_unix.Private.Fork_action.with_actions actions @@ fun c_actions -> 76 76 iter_switch ~f:Switch.check mode; ··· 98 98 reap t set_exit_status) 99 99 in 100 100 Fiber.fork_daemon ~sw (fun () -> 101 + Option.iter Eio.Promise.await delay_reap; 101 102 reap t set_exit_status; 102 103 Switch.remove_hook hook; 103 104 `Stop_daemon)) ··· 207 208 Eio_unix.Private.Fork_action. 208 209 { run = (fun k -> k (Obj.repr (action_dups, plan, blocking))) } 209 210 210 - let spawn_unix () ~mode ~fork_actions ?pgid ?uid ?gid ~env ~fds ~executable ~cwd 211 - args = 211 + let spawn_unix () ?delay_reap ~mode ~fork_actions ?pgid ?uid ?gid ~env ~fds 212 + ~executable ~cwd args = 212 213 let open Eio_posix in 213 214 let actions = 214 215 [ ··· 248 249 in 249 250 fn (Low_level.Process.Fork_action.fchdir cwd :: actions) 250 251 in 251 - with_actions cwd @@ fun actions -> process (Process.spawn ~mode actions) 252 + with_actions cwd @@ fun actions -> 253 + process (Process.spawn ?delay_reap ~mode actions) 252 254 253 255 let fd_equal_int fd i = 254 256 Eio_unix.Fd.use_exn "fd_equal_int" fd @@ fun ufd -> ··· 257 259 258 260 let pp_redirections ppf (i, fd, _) = Fmt.pf ppf "(%i,%a)" i Eio_unix.Fd.pp fd 259 261 260 - let run ~mode _ ?stdin ?stdout ?stderr ?(fds = []) ?(fork_actions = []) ~pgid 261 - ~cwd ?env ?executable args = 262 + let run ~mode ?delay_reap _ ?stdin ?stdout ?stderr ?(fds = []) 263 + ?(fork_actions = []) ~pgid ~cwd ?env ?executable args = 262 264 with_close_list @@ fun to_close -> 263 265 let check_fd n = function 264 266 | Merry.Types.Redirect (m, _, _) -> Int.equal n m ··· 302 304 let fds = std_fds @ fds in 303 305 let executable = get_executable executable ~args in 304 306 let env = get_env env in 305 - spawn_unix ~mode ~fork_actions ~cwd ~pgid ~fds ~env ~executable () args 307 + spawn_unix ?delay_reap ~mode ~fork_actions ~cwd ~pgid ~fds ~env ~executable () 308 + args
+4 -4
src/lib/posix/merry_posix.ml
··· 15 15 | `Exited n -> Merry.Exit.nonzero () n 16 16 | `Signaled n -> Merry.Exit.nonzero () n 17 17 18 - let exec ?(fork_actions = []) ?(fds = []) ?stdin ?stdout ?stderr ?env ~mode 19 - ~pgid ~cwd ~executable t args = 18 + let exec ?delay_reap ?(fork_actions = []) ?(fds = []) ?stdin ?stdout ?stderr 19 + ?env ~mode ~pgid ~cwd ~executable t args = 20 20 let env = 21 21 Option.map 22 22 (fun lst -> List.map (fun (a, b) -> a ^ "=" ^ b) lst |> Array.of_list) ··· 24 24 in 25 25 try 26 26 Ok 27 - (Exec.run ~fork_actions ~mode ~fds ~pgid ~cwd ?stdin ?stdout ?stderr 28 - ?env t ~executable args) 27 + (Exec.run ?delay_reap ~fork_actions ~mode ~fds ~pgid ~cwd ?stdin ?stdout 28 + ?stderr ?env t ~executable args) 29 29 with Eio.Io (Eio.Process.E (Eio.Process.Executable_not_found m), _ctx) -> 30 30 Fmt.epr "msh: command not found: %s\n%!" m; 31 31 Error (127, `Not_found)
+1
src/lib/types.ml
··· 61 61 val pid : process -> int 62 62 63 63 val exec : 64 + ?delay_reap:unit Eio.Promise.t -> 64 65 ?fork_actions:Eio_unix__.Fork_action.t list -> 65 66 ?fds:redirect list -> 66 67 ?stdin:_ Eio.Flow.source ->