feat: add supporting scripts to run workflow retries

This commit is contained in:
gmega 2025-02-25 11:00:12 -03:00
parent 5a9543259b
commit 2061fe6dbe
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
4 changed files with 172 additions and 0 deletions

View File

@ -0,0 +1,52 @@
"""Queries the Argo Workflows API and collects inputs for failed nodes matching a given template name and group ID.
We need this because Argo will not respect parallelism in retries, so we spin the retry as a new workflow."""
import sys
from typing import Dict, Any
import requests
import json
from benchmarks.k8s.parameter_expander import normalize_argo_params
def collect_failed_inputs(group_id: str, template: str, workflows: Dict[str, Any]):
def _belongs_to_group(pars):
for parameter in pars:
if (
parameter.get("name") == "groupId"
and parameter.get("value") == group_id
):
return True
return False
for workflow in workflows["items"]:
for key, node in workflow["status"].get("nodes", {}).items():
if node.get("templateName") != template:
continue
if node.get("phase") != "Failed":
continue
parameters = node.get("inputs", {}).get("parameters", {})
if not parameters:
continue
if not _belongs_to_group(parameters):
continue
yield normalize_argo_params(parameters)
if __name__ == "__main__":
if len(sys.argv) != 5:
print(
"Usage: collect_failed_inputs.py <group_id> <template> <argo_api_host> <argo_api_port>"
)
sys.exit(1)
group_id, template, argo_api_host, argo_api_port = sys.argv[1:]
workflows = requests.get(
f"https://{argo_api_host}:{argo_api_port}/api/v1/workflows/argo", verify=False
).json()
print(json.dumps(list(collect_failed_inputs(group_id, template, workflows))))

View File

@ -0,0 +1,18 @@
import re
import sys
def increment_retry_counter(group_id: str) -> str:
retry = re.match(r"(.+)-r(\d+)$", group_id)
if not retry:
return f"{group_id}-r1"
return f"{retry.group(1)}-r{int(retry.group(2)) + 1}"
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: increment_retry_counter.py <group_id>")
sys.exit(1)
print(increment_retry_counter(sys.argv[1]))

View File

@ -0,0 +1,90 @@
from benchmarks.k8s.collect_failed_inputs import collect_failed_inputs
API_RESPONSE = {
"items": [
{"status": {"no-nodes": []}},
{
"status": {
"nodes": {
"codex-benchmark-4pkjd-1084037939": {
"templateName": "wrapped-benchmark-experiment",
"phase": "Failed",
"inputs": {
"parameters": [
{"name": "groupId", "value": "g1"},
{"name": "seeders", "value": "1"},
{"name": "repetitions", "value": "5"},
]
},
},
"codex-benchmark-4pkjd-1084037941": {
"templateName": "wrapped-benchmark-experiment",
"phase": "Failed",
"inputs": {
"parameters": [
{"name": "groupId", "value": "g3"},
{"name": "seeders", "value": "1"},
{"name": "repetitions", "value": "7"},
]
},
},
"codex-benchmark-4pkjd-1084037940": {
"templateName": "some-other-node",
"phase": "Succeeded",
"inputs": {
"parameters": [
{"name": "groupId", "value": "g1"},
{"name": "seeders", "value": "1"},
{"name": "repetitions", "value": "6"},
]
},
},
"codex-benchmark-4pkjd-1118304667": {
"templateName": "cleanup",
"phase": "Omitted",
},
}
}
},
]
}
def test_should_extract_parameters_for_failed_nodes_matching_template():
assert list(
collect_failed_inputs(
group_id="g1",
template="wrapped-benchmark-experiment",
workflows=API_RESPONSE,
)
) == [{"groupId": "g1", "seeders": 1, "repetitions": 5}]
assert list(
collect_failed_inputs(
group_id="g3",
template="wrapped-benchmark-experiment",
workflows=API_RESPONSE,
)
) == [{"groupId": "g3", "seeders": 1, "repetitions": 7}]
def test_should_return_empty_if_no_failing_nodes():
assert (
list(
collect_failed_inputs(
group_id="g1", template="some-other-node", workflows=API_RESPONSE
)
)
== []
)
def test_should_return_empty_if_no_matching_group_id():
assert (
list(
collect_failed_inputs(
group_id="g5", template="some-other-node", workflows=API_RESPONSE
)
)
== []
)

View File

@ -0,0 +1,12 @@
from benchmarks.k8s.increment_retry_counter import increment_retry_counter
def test_should_add_counter_if_absent():
assert increment_retry_counter("sometestgroup") == "sometestgroup-r1"
def test_should_increment_counter_if_present():
assert increment_retry_counter("sometestgroup-r1") == "sometestgroup-r2"
assert increment_retry_counter("sometestgroup-r2") == "sometestgroup-r3"
assert increment_retry_counter("sometestgroup-r10") == "sometestgroup-r11"
assert increment_retry_counter("sometestgroup-r100") == "sometestgroup-r101"