diff --git a/benchmarks/k8s/parameter_expander.py b/benchmarks/k8s/parameter_expander.py index e9b52e7..e28158a 100644 --- a/benchmarks/k8s/parameter_expander.py +++ b/benchmarks/k8s/parameter_expander.py @@ -4,13 +4,18 @@ import itertools import json import sys from json import JSONDecodeError -from typing import Dict, Any, List, Tuple +from typing import Dict, Any, List, Tuple, Optional -def expand(parameters: Dict[str, Any], run_id: bool = False) -> List[Dict[str, Any]]: +def expand( +    parameters: Dict[str, Any], +    order_by: Optional[List[str]] = None, +    run_id: bool = False, +) -> List[Dict[str, Any]]: simple = {} constrained = {} fixed = {} + order_by = [] if order_by is None else order_by for k, v in parameters.items(): if not isinstance(v, list): @@ -37,6 +42,10 @@ def expand(parameters: Dict[str, Any], run_id: bool = False) -> List[Dict[str, A for i, item in enumerate(final_expansion, start=1): item["runId"] = i + # Sort is stable, so we can just order in reverse. + for key in reversed(order_by): + final_expansion.sort(key=lambda x: x[key]) + return final_expansion @@ -90,15 +99,19 @@ def normalize_argo_params(argo_params: List[Dict[str, Any]]) -> Dict[str, Any]: return {param["name"]: param["value"] for param in argo_params} +def process_argo_input(input: str, run_id: bool = False) -> List[Dict[str, Any]]: + try: + params = normalize_argo_params(json.loads(input)) + return expand(params, order_by=params.pop("orderBy", None), run_id=run_id) + except JSONDecodeError as err: + print("Error decoding JSON: ", err) + print("Input:", input) + sys.exit(1) + + if __name__ == "__main__": if len(sys.argv) < 2: print(f"Usage: {sys.argv[0]} ''") sys.exit(1) - try: - params = normalize_argo_params(json.loads(sys.argv[1])) - print(json.dumps(expand(params, run_id=True))) - except JSONDecodeError as err: - print("Error decoding JSON: ", err) - print("Input:", sys.argv[1]) - sys.exit(1) + print(json.dumps(process_argo_input(sys.argv[1], run_id=True))) diff --git 
a/benchmarks/k8s/tests/test_parameter_expander.py b/benchmarks/k8s/tests/test_parameter_expander.py index d9926d4..9d30244 100644 --- a/benchmarks/k8s/tests/test_parameter_expander.py +++ b/benchmarks/k8s/tests/test_parameter_expander.py @@ -1,7 +1,7 @@ import json from benchmarks.k8s import parameter_expander as expander -from benchmarks.k8s.parameter_expander import normalize_argo_params +from benchmarks.k8s.parameter_expander import normalize_argo_params, process_argo_input def test_should_expand_simple_parameter_lists(): @@ -70,3 +70,35 @@ def test_should_find_and_pre_expand_lists_encoded_as_strings(): "b": [1, [2, 3]], "c": "foo", } + + +def test_should_respect_the_specified_product_order(): + matrix = {"a": [1, 2], "b": [3, 4], "c": [5, 6], "d": "foo"} + + assert expander.expand(matrix, order_by=["c", "b", "a", "d"]) == [ + {"a": 1, "b": 3, "c": 5, "d": "foo"}, + {"a": 2, "b": 3, "c": 5, "d": "foo"}, + {"a": 1, "b": 4, "c": 5, "d": "foo"}, + {"a": 2, "b": 4, "c": 5, "d": "foo"}, + {"a": 1, "b": 3, "c": 6, "d": "foo"}, + {"a": 2, "b": 3, "c": 6, "d": "foo"}, + {"a": 1, "b": 4, "c": 6, "d": "foo"}, + {"a": 2, "b": 4, "c": 6, "d": "foo"}, + ] + + +def test_should_handle_order_by_when_consuming_argo_input(): + argo_input = ( + '[{"name":"repetitions","value":"1"},{"name":"fileSize","value":"[\\"100MB\\", \\"500MB\\"]"},' + '{"name":"networkSize","value":"[2, 10, 15]"},{"name":"seeders","value":"1"},' + '{"name": "orderBy", "value": "[\\"networkSize\\", \\"fileSize\\"]"}]' + ) + + assert process_argo_input(argo_input) == [ + {"repetitions": 1, "fileSize": "100MB", "networkSize": 2, "seeders": 1}, + {"repetitions": 1, "fileSize": "500MB", "networkSize": 2, "seeders": 1}, + {"repetitions": 1, "fileSize": "100MB", "networkSize": 10, "seeders": 1}, + {"repetitions": 1, "fileSize": "500MB", "networkSize": 10, "seeders": 1}, + {"repetitions": 1, "fileSize": "100MB", "networkSize": 15, "seeders": 1}, + {"repetitions": 1, "fileSize": "500MB", "networkSize": 15, "seeders": 
1}, + ] diff --git a/k8s/argo-workflows/deluge-benchmark-workflow.yaml b/k8s/argo-workflows/deluge-benchmark-workflow.yaml index 04cd9da..a9ce9ed 100644 --- a/k8s/argo-workflows/deluge-benchmark-workflow.yaml +++ b/k8s/argo-workflows/deluge-benchmark-workflow.yaml @@ -20,6 +20,11 @@ spec: - name: maxExperimentDuration value: 144h + # Groups the expansion such that all experiments with a given networkSize run together, smallest + # to largest. This can save significant amounts of time when running on a cluster with autoscaling. + - name: orderBy + value: '["networkSize", "seeders", "fileSize"]' + templates: - name: benchmark-workflow parallelism: 1