fix: fix pm_async interface so its not accidentally sequential, procmon overhaul

This commit is contained in:
gmega 2025-06-20 21:15:57 -03:00
parent c4d1a0681e
commit 070b3d188a
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
4 changed files with 134 additions and 104 deletions

View File

@ -122,11 +122,10 @@ cdx_launch_node() {
local codex_cmd
codex_cmd=$(cdx_cmdline "$@") || return 1
(
$codex_cmd
pm_job_exit $?
)&
pm_track_last_job
cmd_array=()
IFS=' ' read -r -a cmd_array <<<"$codex_cmd"
pm_async "${cmd_array[@]}" -%- "codex"
_cdx_pids[$node_index]=$!
cdx_ensure_ready "$node_index"

View File

@ -39,6 +39,7 @@ pm_start() {
_pm_pid=${BASHPID}
while true; do
pm_known_pids
echoerr "Known PIDs:" "${result[@]}"
for pid in "${result[@]}"; do
if kill -0 "${pid}" 2> /dev/null; then
continue
@ -82,26 +83,6 @@ pm_start() {
return 0
}
# Tracks the last job started by the shell as part of a monitored group.
# If a tracked process dies:
# 1. without an error code (e.g. it is killed);
# 2. with a non-zero error code.
# Then all processes in the process group and their descendants (with caveats)
# are also killed. This makes sure that the harness does not leave processes
# behind.
#
# Returns:
# 1 if the process monitor is not running
# 0 otherwise
pm_track_last_job() {
_pm_assert_state "running" || return 1
local pid=$!
if [ ! -f "${_pm_output}/${pid}.pid" ]; then
touch "${_pm_output}/${pid}.pid"
fi
}
# Stops tracking a given PID. This means that the process dying or exiting
# with an error code will no longer stop the whole process group.
# Arguments:
@ -189,21 +170,6 @@ pm_join() {
await "$_pm_pid" "$1"
}
# This function is called by the shell running a background job before
# it exits to communicate the exit code to the process monitor.
# Arguments:
# $1: exit code
pm_job_exit() {
local pid_file="${_pm_output}/${BASHPID}.pid" exit_code=$1
# If the process is not tracked, don't write down an exit code.
if [ ! -f "$pid_file" ]; then
echoerr "[procmon] no PID file found for process $BASHPID"
else
echo "$exit_code" > "${_pm_output}/${BASHPID}.pid"
fi
exit "$exit_code"
}
# Kills a process and all of its descendants. This is full of caveats
# so make sure you see `test_procmon` for an example of how to use it.
# Arguments:
@ -248,14 +214,38 @@ pm_async() {
done
(
_pm_invoke_callback "start" "$proc_type" "$BASHPID"
set +e
_pm_job_started "${BASHPID}" "$proc_type"
trap '_pm_job_exited "${BASHPID}" "$proc_type" "killed"' TERM
trap '_pm_job_exited "${BASHPID}" "$proc_type" "$?"' EXIT
"${command[@]}"
exit_code=$?
_pm_invoke_callback "exit" "$proc_type" "$BASHPID" "$exit_code"
pm_job_exit "$exit_code"
) &
pm_track_last_job
echo $!
result=("$!")
}
_pm_job_started() {
local pid=$1 proc_type=$2
echoerr "[procmon] job started: $pid ($proc_type)"
if [ ! -f "${_pm_output}/${pid}.pid" ]; then
touch "${_pm_output}/${pid}.pid"
fi
_pm_invoke_callback "start" "$proc_type" "$pid"
}
_pm_job_exited() {
local pid=$1\
proc_type=$2\
exit_code=$3
local pid_file="${_pm_output}/${pid}.pid"
# If the process is not tracked, don't write down an exit code.
if [ ! -f "$pid_file" ]; then
echoerr "[procmon] no PID file found for process $pid"
else
echo "$exit_code" > "$pid_file"
fi
_pm_invoke_callback "exit" "$proc_type" "$pid" "$exit_code"
}
pm_register_callback() {

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bats
# shellcheck disable=SC2128
setup() {
load test_helper/common_setup
common_setup
@ -118,7 +118,9 @@ setup() {
filename=$(cdx_generate_file 10)
cid=$(cdx_upload_file 0 "$filename")
handle=$(cdx_download_file_async 0 "$cid")
cdx_download_file_async 0 "$cid"
handle=$result
await "$handle" 3
assert cdx_check_download 0 0 "$cid"
@ -136,7 +138,8 @@ setup() {
handles=()
for i in {1..4}; do
handles+=("$(cdx_download_file_async "$i" "$cid")")
cdx_download_file_async "$i" "$cid"
handles+=("$result")
done
assert await_all "${handles[@]}"
@ -160,7 +163,8 @@ setup() {
handles=()
for i in {1..4}; do
handles+=("$(cdx_download_file_async "$i" "$cid")")
cdx_download_file_async "$i" "$cid"
handles+=("$result")
done
assert await_all "${handles[@]}"
@ -170,9 +174,11 @@ setup() {
done
cdx_log_timings_end
pm_stop
assert [ -f "${_cdx_output}/experiment-0.csv" ]
assert [ "$(<"${_cdx_output}/experiment-0.csv" wc -l)" -eq 4 ]
decimal_regex='^[0-9]+(\.[0-9]+)?$'
while IFS=',' read -r experiment file_size operation node_index recorded_cid wallclock user system; do
@ -190,6 +196,21 @@ setup() {
done < "${_cdx_output}/experiment-0.csv"
}
# @test "should add a prometheus target for each Codex node when requested" {
# pm_start
# cdx_enable_prometheus "anexperiment" "84858"
# cdx_launch_node 0
# config_file="${_prom_output}/8290-anexperiment-84858-node-1-codex.json"
# assert [ -f "$config_file" ]
# cdx_destroy_node 0
# assert [ ! -f "$config_file" ]
# pm_stop
# }
teardown() {
clh_clear_outputs
}

View File

@ -1,3 +1,4 @@
# shellcheck disable=SC2128
setup() {
load test_helper/common_setup
common_setup
@ -68,23 +69,18 @@ setup() {
pm_known_pids
assert [ ${#result[@]} -eq 0 ]
(
# shellcheck disable=SC2317
job() {
while [ ! -f "${_pm_output}/sync" ]; do
sleep 0.1
done
pm_job_exit 0
) &
pm_track_last_job
p1=$!
}
(
while [ ! -f "${_pm_output}/sync" ]; do
sleep 0.1
done
pm_job_exit 0
) &
pm_track_last_job
p2=$!
pm_async job
p1=$result
pm_async job
p2=$result
pm_known_pids
assert [ ${#result[@]} -eq 2 ]
@ -108,23 +104,20 @@ setup() {
@test "should stop the monitor and all other processes if one process fails" {
assert pm_start
(
# shellcheck disable=SC2317
job() {
exit_code=$1
while [ ! -f "${_pm_output}/sync" ]; do
sleep 0.1
done
pm_job_exit 1
) &
pm_track_last_job
p1=$!
return 1
}
(
while [ ! -f "${_pm_output}/sync" ]; do
sleep 1
done
pm_job_exit 0
) &
pm_track_last_job
p2=$!
pm_async job 0
p1=$result
pm_async job 1
p2=$result
touch "${_pm_output}/sync"
@ -139,48 +132,54 @@ setup() {
@test "should no longer track a process if requested" {
assert pm_start
(
while true; do
sleep 1
done
pm_job_exit 1
) &
pid1=$!
pm_track_last_job
job() {
echoerr "starting job"
touch "${_pm_output}/sync"
sleep 50
}
pm_stop_tracking $pid1
kill -SIGKILL $pid1
pm_async job
pid1=$result
while [ ! -f "${_pm_output}/sync" ]; do
sleep 0.1
done
pm_stop_tracking "$pid1" # remove this and the test should fail
pm_kill_rec "$pid1"
await "$pid1"
# Again, we need to allow time for the procmon
# to pick up on the kill.
sleep 1
# Sleeps a bit to let the procmon catch up.
sleep 3
pm_stop
pm_join 3
assert_equal "$(pm_state)" "halted"
}
@test "should call lifecycle callbacks when processes start and stop" {
callback() {
local event="$1" proc_type="$2" pid="$3" exit_code
callback() {
local event="$1" proc_type="$2" pid="$3" exit_code
if [ "$event" = "start" ]; then
touch "${_pm_output}/${pid}-${proc_type}-start"
elif [ "$event" = "exit" ]; then
exit_code="$4"
touch "${_pm_output}/${pid}-${proc_type}-${exit_code}-exit"
fi
}
if [ "$event" = "start" ]; then
touch "${_pm_output}/${pid}-${proc_type}-start"
elif [ "$event" = "exit" ]; then
exit_code="$4"
touch "${_pm_output}/${pid}-${proc_type}-${exit_code}-exit"
fi
}
@test "should call lifecycle callbacks when processes start and stop" {
pm_register_callback "sleepy" "callback"
pm_start
pid1=$(pm_async sleep 0.1 -%- "sleepy")
pid2=$(pm_async sleep 0.1 -%- "sleepy")
pid3=$(pm_async sleep 0.1 -%- "awake")
pm_async sleep 0.1 -%- "sleepy"
pid1=$result
pm_async sleep 0.1 -%- "sleepy"
pid2=$result
pm_async sleep 0.1 -%- "awake"
pid3=$result
await "$pid1"
await "$pid2"
@ -197,3 +196,24 @@ setup() {
assert [ ! -f "${_pm_output}/${pid3}-awake-start" ]
assert [ ! -f "${_pm_output}/${pid3}-awake-0-exit" ]
}
@test "should invoke lifecycle callback when process is killed" {
pm_register_callback "sleepy" "callback"
pm_start
pm_async sleep 10 -%- "sleepy"
pid1=$result
echoerr "Run this line"
pm_async false -%- "sleepy"
pid2=$result
pm_join
assert_equal "$(pm_state)" "halted_process_failure"
assert [ -f "${_pm_output}/${pid1}-sleepy-start" ]
assert [ -f "${_pm_output}/${pid1}-sleepy-killed-exit" ]
assert [ -f "${_pm_output}/${pid2}-sleepy-start" ]
assert [ -f "${_pm_output}/${pid2}-sleepy-1-exit" ]
}