feat: add ordering to parameter matrix expander

This commit is contained in:
gmega 2025-01-22 17:12:46 -03:00
parent aad78b9faa
commit 8096c9f4e0
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
3 changed files with 60 additions and 10 deletions

View File

@ -4,13 +4,18 @@ import itertools
import json import json
import sys import sys
from json import JSONDecodeError from json import JSONDecodeError
from typing import Dict, Any, List, Tuple from typing import Dict, Any, List, Tuple, Optional
def expand(parameters: Dict[str, Any], run_id: bool = False) -> List[Dict[str, Any]]: def expand(
parameters: Dict[str, Any],
order_by: Optional[List[str]] = None,
run_id: bool = False,
) -> List[Dict[str, Any]]:
simple = {} simple = {}
constrained = {} constrained = {}
fixed = {} fixed = {}
order_by = [] if order_by is None else order_by
for k, v in parameters.items(): for k, v in parameters.items():
if not isinstance(v, list): if not isinstance(v, list):
@ -37,6 +42,10 @@ def expand(parameters: Dict[str, Any], run_id: bool = False) -> List[Dict[str, A
for i, item in enumerate(final_expansion, start=1): for i, item in enumerate(final_expansion, start=1):
item["runId"] = i item["runId"] = i
# Sort is stable, so we can just order in reverse.
for key in reversed(order_by):
final_expansion.sort(key=lambda x: x[key])
return final_expansion return final_expansion
@ -90,15 +99,19 @@ def normalize_argo_params(argo_params: List[Dict[str, Any]]) -> Dict[str, Any]:
return {param["name"]: param["value"] for param in argo_params} return {param["name"]: param["value"] for param in argo_params}
def process_argo_input(input: str, run_id: bool = False) -> List[Dict[str, Any]]:
    """Expand a JSON-encoded Argo parameter list into a concrete run matrix.

    :param input: JSON string with Argo-style parameters (a list of
        ``{"name": ..., "value": ...}`` objects). An optional ``orderBy``
        parameter is consumed here and forwarded to :func:`expand`.
        (Note: the parameter name shadows the ``input`` builtin; kept for
        backward compatibility with existing callers.)
    :param run_id: when True, each expanded item gets a sequential ``runId``.
    :return: the expanded list of parameter dictionaries.
    :raises SystemExit: if ``input`` is not valid JSON (exit code 1).
    """
    try:
        params = normalize_argo_params(json.loads(input))
        order_by = params.pop("orderBy", None)
        # Argo parameter values arrive as strings, so the orderBy key list is
        # itself JSON-encoded (e.g. '["networkSize", "fileSize"]') and must be
        # decoded before expand() can iterate it as a list of keys. Already
        # decoded lists are passed through unchanged.
        if isinstance(order_by, str):
            order_by = json.loads(order_by)
        return expand(params, order_by=order_by, run_id=run_id)
    except JSONDecodeError as err:
        print("Error decoding JSON: ", err)
        # Bug fix: previously printed sys.argv[1], which is wrong (or raises
        # IndexError) when this function is called with any other input source.
        print("Input:", input)
        sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":
if len(sys.argv) < 2: if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} '<json_string>'") print(f"Usage: {sys.argv[0]} '<json_string>'")
sys.exit(1) sys.exit(1)
try: print(json.dumps(process_argo_input(sys.argv[1], run_id=True)))
params = normalize_argo_params(json.loads(sys.argv[1]))
print(json.dumps(expand(params, run_id=True)))
except JSONDecodeError as err:
print("Error decoding JSON: ", err)
print("Input:", sys.argv[1])
sys.exit(1)

View File

@ -1,7 +1,7 @@
import json import json
from benchmarks.k8s import parameter_expander as expander from benchmarks.k8s import parameter_expander as expander
from benchmarks.k8s.parameter_expander import normalize_argo_params from benchmarks.k8s.parameter_expander import normalize_argo_params, process_argo_input
def test_should_expand_simple_parameter_lists(): def test_should_expand_simple_parameter_lists():
@ -70,3 +70,35 @@ def test_should_find_and_pre_expand_lists_encoded_as_strings():
"b": [1, [2, 3]], "b": [1, [2, 3]],
"c": "foo", "c": "foo",
} }
def test_should_respect_the_specified_product_order():
    """Expansion rows must vary the last order_by key fastest: c slowest, a fastest."""
    matrix = {"a": [1, 2], "b": [3, 4], "c": [5, 6], "d": "foo"}
    # Build the expected rows in the declared product order: later keys in
    # order_by cycle faster than earlier ones.
    expected = [
        {"a": a, "b": b, "c": c, "d": "foo"}
        for c in (5, 6)
        for b in (3, 4)
        for a in (1, 2)
    ]
    assert expander.expand(matrix, order_by=["c", "b", "a", "d"]) == expected
def test_should_handle_order_by_when_consuming_argo_input():
    """The JSON-encoded orderBy Argo parameter must drive the expansion order."""
    argo_input = (
        '[{"name":"repetitions","value":"1"},{"name":"fileSize","value":"[\\"100MB\\", \\"500MB\\"]"},'
        '{"name":"networkSize","value":"[2, 10, 15]"},{"name":"seeders","value":"1"},'
        '{"name": "orderBy", "value": "[\\"networkSize\\", \\"fileSize\\"]"}]'
    )
    # networkSize is the primary sort key, fileSize the secondary one.
    expected = [
        {"repetitions": 1, "fileSize": size, "networkSize": n, "seeders": 1}
        for n in (2, 10, 15)
        for size in ("100MB", "500MB")
    ]
    assert process_argo_input(argo_input) == expected

View File

@ -20,6 +20,11 @@ spec:
- name: maxExperimentDuration - name: maxExperimentDuration
value: 144h value: 144h
# Groups the expansion such that all experiments with a given networkSize run together, smallest
# to largest. This can save significant amounts of time when running on a cluster with autoscaling.
- name: orderBy
value: "['networkSize', 'seeders', 'fileSize']"
templates: templates:
- name: benchmark-workflow - name: benchmark-workflow
parallelism: 1 parallelism: 1