feat: add timing logs to downloads

This commit is contained in:
gmega 2025-06-20 14:35:51 -03:00
parent 9fcc15f79b
commit 7d6e1da293
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
5 changed files with 163 additions and 35 deletions

View File

@ -24,12 +24,24 @@ fi
# Output folders # Output folders
_cdx_output=$(clh_output_folder "codex") _cdx_output=$(clh_output_folder "codex")
# generated files
_cdx_genfiles="${_cdx_output}/genfiles" _cdx_genfiles="${_cdx_output}/genfiles"
# downloaded files, per node. File names are CIDs
_cdx_downloads="${_cdx_output}/downloads" _cdx_downloads="${_cdx_output}/downloads"
# SHA1 of uploaded files, per node. File names are CIDs
_cdx_uploads="${_cdx_output}/uploads" _cdx_uploads="${_cdx_output}/uploads"
# Codex node logs, per node
_cdx_logs="${_cdx_output}/logs" _cdx_logs="${_cdx_output}/logs"
# Codex data directories, per node
_cdx_data="${_cdx_output}/data" _cdx_data="${_cdx_output}/data"
# Partial timings, per operation per node
_cdx_timing_partials="${_cdx_output}/timing"
# Custom prefix for timing logs
_cdx_timing_prefix=""
# Log file where timings are aggregated
_cdx_timing_log="/dev/null"
# Base ports and timeouts # Base ports and timeouts
_cdx_base_api_port=8080 _cdx_base_api_port=8080
_cdx_base_disc_port=8190 _cdx_base_disc_port=8190
@ -104,7 +116,8 @@ cdx_get_spr() {
cdx_launch_node() { cdx_launch_node() {
local node_index="$1" local node_index="$1"
_cdx_ensure_outputs "${node_index}" || return 1 _cdx_init_global_outputs || return 1
_cdx_init_node_outputs "${node_index}" || return 1
local codex_cmd local codex_cmd
codex_cmd=$(cdx_cmdline "$@") || return 1 codex_cmd=$(cdx_cmdline "$@") || return 1
@ -172,15 +185,22 @@ cdx_ensure_ready() {
done done
} }
_cdx_ensure_outputs() { _cdx_init_node_outputs() {
local node_index="$1" local node_index="$1"
mkdir -p "${_cdx_logs}" || return 1
mkdir -p "${_cdx_data}/codex-${node_index}" || return 1 mkdir -p "${_cdx_data}/codex-${node_index}" || return 1
mkdir -p "${_cdx_genfiles}" || return 1
mkdir -p "${_cdx_downloads}/codex-${node_index}" || return 1 mkdir -p "${_cdx_downloads}/codex-${node_index}" || return 1
mkdir -p "${_cdx_uploads}/codex-${node_index}" || return 1 mkdir -p "${_cdx_uploads}/codex-${node_index}" || return 1
} }
# XXX: output initialization is a bit of a pain. Right now it's
# being piggybacked on cdx_launch_node and cdx_log_timings_start
# so we don't have to add extra initialization calls.
_cdx_init_global_outputs() {
mkdir -p "${_cdx_logs}" || return 1
mkdir -p "${_cdx_genfiles}" || return 1
mkdir -p "${_cdx_timing_partials}" || return 1
}
cdx_generate_file() { cdx_generate_file() {
local size_mb="${1}" filename local size_mb="${1}" filename
filename="${_cdx_genfiles}/file-$(date +%s).bin" filename="${_cdx_genfiles}/file-$(date +%s).bin"
@ -208,19 +228,21 @@ cdx_upload_file() {
} }
cdx_download_file() { cdx_download_file() {
local node_index="$1" cid="$2" local node_index="$1" cid="$2" timestamp
curl --silent --fail\ timestamp="$(date +%s)" || return 1
TIMEFORMAT="${_cdx_timing_prefix}download,${node_index},${cid},%E,%U,%S"
# Note that timing partial filenames are constructed so that lexicographic sorting
# puts the most recent entries first, while at the same time breaking ties arbitrarily
# for entries that happen within the same second.
{ time curl --silent --fail\
-XGET "http://localhost:$(_cdx_api_port "$node_index")/api/codex/v1/data/$cid/network/stream"\ -XGET "http://localhost:$(_cdx_api_port "$node_index")/api/codex/v1/data/$cid/network/stream"\
-o "${_cdx_downloads}/codex-${node_index}/$cid" || return 1 -o "${_cdx_downloads}/codex-${node_index}/$cid" ; } 2> \
"${_cdx_timing_partials}/codex-${node_index}-${timestamp}-${RANDOM}.csv"
} }
cdx_download_file_async() { cdx_download_file_async() {
( pm_async cdx_download_file "$@"
cdx_download_file "$@"
pm_job_exit $?
) &
pm_track_last_job
echo $!
} }
cdx_upload_sha1() { cdx_upload_sha1() {
@ -250,4 +272,32 @@ cdx_check_download() {
return 1 return 1
fi fi
return 0 return 0
}
cdx_log_timings_start() {
_cdx_init_global_outputs || return 1
local log_file="$1" prefix="$2"
touch "$log_file" || return 1
_cdx_timing_log="$log_file"
if [[ ! "$prefix" =~ ',$' ]]; then
prefix="$prefix,"
fi
_cdx_timing_prefix="$prefix"
}
cdx_flush_partial_timings() {
for file in "${_cdx_timing_partials}"/*; do
cat "$file" >> "${_cdx_timing_log}" || return 1
rm "$file"
done
}
cdx_log_timings_end() {
cdx_flush_partial_timings
_cdx_timing_log="/dev/null"
_cdx_timing_prefix=""
} }

View File

@ -1,4 +1,10 @@
#!/usr/bin/env bash #!/usr/bin/env bash
#
# procmon is a process monitor that tracks a set (group) of processes
# and kills the entire process group and all of its descendants if one
# of them fails or gets killed. It is used to ensure that no processes
# from failed experiments are left behind.
#
set -o pipefail set -o pipefail
LIB_SRC=${LIB_SRC:-$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)} LIB_SRC=${LIB_SRC:-$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)}
@ -32,14 +38,14 @@ pm_start() {
while true; do while true; do
pm_known_pids pm_known_pids
for pid in "${result[@]}"; do for pid in "${result[@]}"; do
if kill -0 "${pid}"; then if kill -0 "${pid}" 2> /dev/null; then
continue continue
fi fi
exit_code=$(cat "${_pm_output}/${pid}.pid") exit_code=$(cat "${_pm_output}/${pid}.pid")
# If the cat fails, this means the file was deleted. This will typically # If the cat fails, this means the file was deleted, which means
# only happen this way if the process was removed from tracking and then # the process is no longer being tracked but the call to pm_stop_tracking
# killed right after. # happened after we called pm_known_pids last. Simply ignore the process.
# #
# shellcheck disable=SC2181 # shellcheck disable=SC2181
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
@ -47,17 +53,22 @@ pm_start() {
continue continue
fi fi
# Parent process crashed or got killed. We won't get a return code in
# these cases.
if [ -z "$exit_code" ]; then if [ -z "$exit_code" ]; then
echoerr "[procmon] ${pid} died with unknown exit code. Aborting." echoerr "[procmon] ${pid} died with unknown exit code. Aborting."
_pm_halt "halted_no_return" _pm_halt "halted_no_return"
fi fi
# Parent process exited successfully, all good.
if [ "$exit_code" -eq 0 ]; then if [ "$exit_code" -eq 0 ]; then
echoerr "[procmon] ${pid} died with exit code $exit_code." echoerr "[procmon] ${pid} died with exit code $exit_code."
rm "${_pm_output}/${pid}.pid" rm "${_pm_output}/${pid}.pid"
continue continue
fi fi
# If we got thus far, the parent process died with a non-zero exit code,
# so we kill the whole process group.
echoerr "[procmon] ${pid} is dead with exit code $exit_code. Aborting." echoerr "[procmon] ${pid} is dead with exit code $exit_code. Aborting."
_pm_halt "halted_process_failure" _pm_halt "halted_process_failure"
done done
@ -89,17 +100,13 @@ pm_track_last_job() {
fi fi
} }
# Stops tracking a given PID. # Stops tracking a given PID. This means that the process dying or exiting
# with an error code will no longer stop the whole process group.
# Arguments: # Arguments:
# $1: PID to stop tracking # $1: PID to stop tracking
# Returns: # Returns:
# 1 if the process monitor is not running # 1 if the process monitor is not running
# 0 otherwise # 0 otherwise
# Note:
# This function is flaky. The process monitor
# might still see the PID as tracked after this
# function returns for a short period of time,
# so do not rely too much on it.
pm_stop_tracking() { pm_stop_tracking() {
_pm_assert_state "running" || return 1 _pm_assert_state "running" || return 1
@ -172,10 +179,18 @@ pm_stop() {
_pm_halt "halted" _pm_halt "halted"
} }
# Waits for the process monitor to exit. Returns immediately if
# the process monitor is not running.
# Arguments:
# $1: timeout in seconds
pm_join() { pm_join() {
await "$_pm_pid" "$1" await "$_pm_pid" "$1"
} }
# This function is called by the shell running a background job before
# it exits to communicate the exit code to the process monitor.
# Arguments:
# $1: exit code
pm_job_exit() { pm_job_exit() {
local pid_file="${_pm_output}/${BASHPID}.pid" exit_code=$1 local pid_file="${_pm_output}/${BASHPID}.pid" exit_code=$1
# If the process is not tracked, don't write down an exit code. # If the process is not tracked, don't write down an exit code.
@ -187,6 +202,10 @@ pm_job_exit() {
exit "$exit_code" exit "$exit_code"
} }
# Kills a process and all of its descendants. This is full of caveats
# so make sure you see `test_procmon` for an example of how to use it.
# Arguments:
# $1: process ID
pm_kill_rec() { pm_kill_rec() {
local parent="$1" descendant local parent="$1" descendant
@ -209,6 +228,15 @@ pm_list_descendants() {
_pm_list_descendants "$@" _pm_list_descendants "$@"
} }
pm_async() {
(
"$@"
pm_job_exit "$?"
) &
pm_track_last_job
echo $!
}
_pm_list_descendants() { _pm_list_descendants() {
local parent="$1" local parent="$1"
result+=("${parent}") result+=("${parent}")

View File

@ -28,13 +28,14 @@ echoerr() {
await() { await() {
local pid=$1 timeout=${2:-30} start="${SECONDS}" local pid=$1 timeout=${2:-30} start="${SECONDS}"
while kill -0 "$pid"; do while kill -0 "$pid" 2> /dev/null; do
if ((SECONDS - start > timeout)); then if ((SECONDS - start > timeout)); then
echoerr "Error: timeout waiting for process $pid to exit" echoerr "Error: timeout waiting for process $pid to exit"
return 1 return 1
fi fi
sleep 0.1 sleep 0.1
done done
echoerr "Process $pid exited"
return 0 return 0
} }

View File

@ -4,10 +4,12 @@ setup() {
load test_helper/common_setup load test_helper/common_setup
common_setup common_setup
# shellcheck source=./src/codex.bash
source "${LIB_SRC}/codex.bash" source "${LIB_SRC}/codex.bash"
} }
@test "should generate the correct Codex command line for node 0" { @test "should generate the correct Codex command line for node 0" {
# shellcheck disable=SC2140
assert_equal "$(cdx_cmdline 0)" "${_cdx_binary} --nat:none"\ assert_equal "$(cdx_cmdline 0)" "${_cdx_binary} --nat:none"\
" --log-file=${_cdx_output}/logs/codex-0.log"\ " --log-file=${_cdx_output}/logs/codex-0.log"\
" --data-dir=${_cdx_output}/data/codex-0"\ " --data-dir=${_cdx_output}/data/codex-0"\
@ -15,6 +17,7 @@ setup() {
} }
@test "should generate the correct Codex command line for node 1" { @test "should generate the correct Codex command line for node 1" {
# shellcheck disable=SC2140
assert_equal "$(cdx_cmdline 1 '--bootstrap-node' 'node-spr')" "${_cdx_binary} --nat:none"\ assert_equal "$(cdx_cmdline 1 '--bootstrap-node' 'node-spr')" "${_cdx_binary} --nat:none"\
" --bootstrap-node=node-spr --log-file=${_cdx_output}/logs/codex-1.log"\ " --bootstrap-node=node-spr --log-file=${_cdx_output}/logs/codex-1.log"\
" --data-dir=${_cdx_output}/data/codex-1"\ " --data-dir=${_cdx_output}/data/codex-1"\
@ -27,6 +30,7 @@ setup() {
} }
@test "should generate metrics options when metrics enabled for node" { @test "should generate metrics options when metrics enabled for node" {
# shellcheck disable=SC2140
assert_equal "$(cdx_cmdline 0 --metrics)" "${_cdx_binary} --nat:none"\ assert_equal "$(cdx_cmdline 0 --metrics)" "${_cdx_binary} --nat:none"\
" --metrics --metrics-port=8290 --metrics-address=0.0.0.0"\ " --metrics --metrics-port=8290 --metrics-address=0.0.0.0"\
" --log-file=${_cdx_output}/logs/codex-0.log"\ " --log-file=${_cdx_output}/logs/codex-0.log"\
@ -69,7 +73,8 @@ setup() {
refute [ -f "${_cdx_output}/logs/codex-0.log" ] refute [ -f "${_cdx_output}/logs/codex-0.log" ]
assert [ -z "${_cdx_pids[0]}" ] assert [ -z "${_cdx_pids[0]}" ]
assert $(! kill -0 "$pid") # Node should already be dead.
refute kill -0 "$pid"
pm_stop pm_stop
} }
@ -81,7 +86,7 @@ setup() {
filename=$(cdx_generate_file 10) filename=$(cdx_generate_file 10)
echo "$(sha1 "$filename")" > "${_cdx_uploads}/codex-0/fakecid.sha1" sha1 "$filename" > "${_cdx_uploads}/codex-0/fakecid.sha1"
cp "$filename" "${_cdx_downloads}/codex-1/fakecid" cp "$filename" "${_cdx_downloads}/codex-1/fakecid"
# Checks that the file uploaded at 0 matches the file downloaded at 1. # Checks that the file uploaded at 0 matches the file downloaded at 1.
@ -114,7 +119,7 @@ setup() {
cid=$(cdx_upload_file 0 "$filename") cid=$(cdx_upload_file 0 "$filename")
handle=$(cdx_download_file_async 0 "$cid") handle=$(cdx_download_file_async 0 "$cid")
await $handle 3 await "$handle" 3
assert cdx_check_download 0 0 "$cid" assert cdx_check_download 0 0 "$cid"
@ -131,7 +136,7 @@ setup() {
handles=() handles=()
for i in {1..4}; do for i in {1..4}; do
handles+=($(cdx_download_file_async "$i" "$cid")) handles+=("$(cdx_download_file_async "$i" "$cid")")
done done
assert await_all "${handles[@]}" assert await_all "${handles[@]}"
@ -143,6 +148,48 @@ setup() {
pm_stop pm_stop
} }
@test "should log download timing information when requested" {
pm_start
cdx_log_timings_start "${_cdx_output}/experiment-0.csv" "experiment-0,100MB"
assert cdx_launch_network 5
filename=$(cdx_generate_file 10)
cid=$(cdx_upload_file 0 "$filename")
handles=()
for i in {1..4}; do
handles+=("$(cdx_download_file_async "$i" "$cid")")
done
assert await_all "${handles[@]}"
for i in {1..4}; do
assert cdx_check_download 0 "$i" "$cid"
done
cdx_log_timings_end
pm_stop
decimal_regex='^[0-9]+(\.[0-9]+)?$'
while IFS=',' read -r experiment file_size operation node_index recorded_cid wallclock user system; do
assert [ "$experiment" = "experiment-0" ]
assert [ "$file_size" = "100MB" ]
assert [ "$recorded_cid" = "$cid" ]
assert [ "$operation" = "download" ]
# We can't use asserts for regex matches so use "raw" bats
# assertions.
[[ "$node_index" =~ [1-4] ]]
[[ "$wallclock" =~ $decimal_regex ]]
[[ "$user" =~ $decimal_regex ]]
[[ "$system" =~ $decimal_regex ]]
done < "${_cdx_output}/experiment-0.csv"
}
teardown() { teardown() {
clh_clear_outputs clh_clear_outputs
} }

View File

@ -2,6 +2,7 @@ setup() {
load test_helper/common_setup load test_helper/common_setup
common_setup common_setup
# shellcheck source=./src/procmon.bash
source "${LIB_SRC}/procmon.bash" source "${LIB_SRC}/procmon.bash"
} }
@ -46,15 +47,15 @@ setup() {
} }
@test "should not start process monitor twice" { @test "should not start process monitor twice" {
assert_equal $(pm_state) "halted" assert_equal "$(pm_state)" "halted"
assert pm_start assert pm_start
assert_equal $(pm_state) "running" assert_equal "$(pm_state)" "running"
refute pm_start refute pm_start
assert pm_stop assert pm_stop
assert_equal $(pm_state) "halted" assert_equal "$(pm_state)" "halted"
} }
@test "should not stop the process monitor if it wasn't started" { @test "should not stop the process monitor if it wasn't started" {
@ -132,7 +133,7 @@ setup() {
pm_join 3 pm_join 3
assert_equal $(pm_state) "halted_process_failure" assert_equal "$(pm_state)" "halted_process_failure"
} }
@test "should no longer track a process if requested" { @test "should no longer track a process if requested" {
@ -158,5 +159,6 @@ setup() {
pm_stop pm_stop
pm_join 3 pm_join 3
assert_equal $(pm_state) "halted" assert_equal "$(pm_state)" "halted"
} }