From 04828514e4b5ff989e631cc7c19e1dacd888a821 Mon Sep 17 00:00:00 2001 From: gmega Date: Mon, 9 Jun 2025 20:06:19 -0300 Subject: [PATCH] feat: port bug fixes and features from swarm branch --- benchmarks/codex/agent/agent.py | 3 +- benchmarks/codex/agent/tests/test_api.py | 1 + .../codex/agent/tests/test_codex_agent.py | 55 +++++++++++++++---- benchmarks/codex/codex_node.py | 17 ++++-- benchmarks/codex/tests/test_codex_node.py | 8 +++ k8s/argo-workflows/benchmark-workflow.yaml | 24 ++++++-- .../log-parsing-workflow-template.yaml | 9 ++- 7 files changed, 89 insertions(+), 28 deletions(-) diff --git a/benchmarks/codex/agent/agent.py b/benchmarks/codex/agent/agent.py index 12261e7..cb995a8 100644 --- a/benchmarks/codex/agent/agent.py +++ b/benchmarks/codex/agent/agent.py @@ -47,7 +47,7 @@ class DownloadHandle: return self.download_task async def _download_loop(self): - step_size = int(self.manifest.datasetSize * self.read_increment) + step_size = max(1, int(self.manifest.datasetSize * self.read_increment)) async with self.parent.client.download( self.manifest.cid, @@ -72,7 +72,6 @@ class DownloadHandle: logger.info( DownloadMetric( dataset_name=self.manifest.filename, - handle=self.manifest.cid, value=step_size * logged_step, node=self.parent.node_id, ) diff --git a/benchmarks/codex/agent/tests/test_api.py b/benchmarks/codex/agent/tests/test_api.py index c5b76ae..7f3cae3 100644 --- a/benchmarks/codex/agent/tests/test_api.py +++ b/benchmarks/codex/agent/tests/test_api.py @@ -25,6 +25,7 @@ async def test_should_create_file(): ) assert response.status_code == 200 + assert response.headers["content-type"] == "text/plain" assert response.charset_encoding == "utf-8" manifest = await codex_client.manifest(response.text) diff --git a/benchmarks/codex/agent/tests/test_codex_agent.py b/benchmarks/codex/agent/tests/test_codex_agent.py index 9aea254..cf4102a 100644 --- a/benchmarks/codex/agent/tests/test_codex_agent.py +++ b/benchmarks/codex/agent/tests/test_codex_agent.py @@ -99,35 +99,30 @@ async def test_should_log_download_progress_as_metric_in_discrete_steps(mock_log assert metrics == [ DownloadMetric( dataset_name="dataset-1", - handle=cid, value=200, node=codex_agent.node_id, timestamp=metrics[0].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=400, node=codex_agent.node_id, timestamp=metrics[1].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=600, node=codex_agent.node_id, timestamp=metrics[2].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=800, node=codex_agent.node_id, timestamp=metrics[3].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=1000, node=codex_agent.node_id, timestamp=metrics[4].timestamp, @@ -171,35 +166,30 @@ async def test_should_log_download_progress_as_discrete_steps_even_when_underlyi assert metrics == [ DownloadMetric( dataset_name="dataset-1", - handle=cid, value=200, node=codex_agent.node_id, timestamp=metrics[0].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=400, node=codex_agent.node_id, timestamp=metrics[1].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=600, node=codex_agent.node_id, timestamp=metrics[2].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=800, node=codex_agent.node_id, timestamp=metrics[3].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=1000, node=codex_agent.node_id, timestamp=metrics[4].timestamp, @@ -207,6 +197,51 @@ async def test_should_log_download_progress_as_discrete_steps_even_when_underlyi ] +@pytest.mark.asyncio +async def test_should_log_download_progress_even_when_log_granularity_larger_than_number_of_bytes( + mock_logger, +): + logger, output = mock_logger + + with patch("benchmarks.codex.agent.agent.logger", logger): + client = FakeCodex() + codex_agent = CodexAgent(client) + cid = await codex_agent.create_dataset(size=3, name="dataset-1", seed=1234) + download_stream = client.create_download_stream(cid) + handle = await codex_agent.download(cid, read_increment=0.1) + + download_stream.feed_data(b"0" * 3) + download_stream.feed_eof() + + await handle.download_task + + parser = LogParser() + parser.register(DownloadMetric) + + metrics = list(parser.parse(StringIO(output.getvalue()))) + + assert metrics == [ + DownloadMetric( + dataset_name="dataset-1", + value=1, + node=codex_agent.node_id, + timestamp=metrics[0].timestamp, + ), + DownloadMetric( + dataset_name="dataset-1", + value=2, + node=codex_agent.node_id, + timestamp=metrics[1].timestamp, + ), + DownloadMetric( + dataset_name="dataset-1", + value=3, + node=codex_agent.node_id, + timestamp=metrics[2].timestamp, + ), + ] + + @pytest.mark.asyncio async def test_should_track_download_handles(): client = FakeCodex() diff --git a/benchmarks/codex/codex_node.py b/benchmarks/codex/codex_node.py index 3c2e924..4e51dd7 100644 --- a/benchmarks/codex/codex_node.py +++ b/benchmarks/codex/codex_node.py @@ -35,11 +35,14 @@ class CodexMeta: class CodexNode(Node[Cid, CodexMeta], ExperimentComponent): - def __init__(self, codex_api_url: Url, agent: CodexAgentClient) -> None: + def __init__( + self, codex_api_url: Url, agent: CodexAgentClient, remove_data: bool = True + ) -> None: self.codex_api_url = codex_api_url self.agent = agent # Lightweight tracking of datasets created by this node. It's OK if we lose them. self.hosted_datasets: Set[Cid] = set() + self.remove_data = remove_data def is_ready(self) -> bool: try: @@ -70,12 +73,14 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent): return CodexDownloadHandle(parent=self, monitor_url=self.agent.download(handle)) def remove(self, handle: Cid) -> bool: - response = requests.delete( - str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}")), - timeout=DELETE_TIMEOUT, - ) + if self.remove_data: + response = requests.delete( + str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}")), + timeout=DELETE_TIMEOUT, + ) + + response.raise_for_status() - response.raise_for_status() return True def exists_local(self, handle: Cid) -> bool: diff --git a/benchmarks/codex/tests/test_codex_node.py b/benchmarks/codex/tests/test_codex_node.py index 7c8ea38..db0096f 100644 --- a/benchmarks/codex/tests/test_codex_node.py +++ b/benchmarks/codex/tests/test_codex_node.py @@ -35,6 +35,14 @@ def test_should_remove_file(codex_node1: CodexNode): assert not codex_node1.exists_local(cid) +@pytest.mark.codex_integration +def test_should_turn_remove_file_into_noop_if_remove_file_flag_is_false( + codex_node1: CodexNode, +): + codex_node1.remove_data = False + assert codex_node1.remove("malformed-and-invalid-cid") + + @pytest.mark.codex_integration def test_should_download_file_from_local_node(codex_node1: CodexNode): cid = codex_node1.genseed( diff --git a/k8s/argo-workflows/benchmark-workflow.yaml b/k8s/argo-workflows/benchmark-workflow.yaml index 2ea36b1..d159b0f 100644 --- a/k8s/argo-workflows/benchmark-workflow.yaml +++ b/k8s/argo-workflows/benchmark-workflow.yaml @@ -10,7 +10,7 @@ spec: ######################################## Global Settings ############################################## # What are we benchmarking (one of: codex, deluge)? - name: system - value: "deluge" + value: "codex" ################################ Experiment Parameter Matrix ########################################## # Parameters in the experiment parameter matrix will be expanded, and can be set @@ -35,6 +35,17 @@ spec: # off of a branch. - name: nodeTag value: "latest" + # Which tag to use for the experiment runner. Useful if you want to run something off of a branch. + - name: runnerTag + value: "latest" + # Which tag to use for the workflow runner. Useful if you want to run something off of a branch. + - name: workflowRunnerTag + value: "latest" + - name: removeData + value: "false" + - name: codexLogLevel + value: "INFO" + # value: "DEBUG;trace:swarm\\,blockexcnetworkpeer" # make sure to escape commas or helm will fail ###################################### Experiment Retries ############################################# # Allow the workflow to replay failed experiments from a previous run instead of running a new set. @@ -85,7 +96,7 @@ spec: # If set to false, does not parse/upload logs at the end of the experiment. You'll probably want to # disable this when running local experiments. - name: parseLogs - value: "true" + value: "false" ####################################################################################################### @@ -181,7 +192,7 @@ spec: when: '{{workflow.parameters.parseLogs}} == true' - name: retry-benchmark-run - parallelism: 2 + parallelism: 1 inputs: parameters: - name: runnerImage @@ -272,7 +283,7 @@ spec: echo "Never" > /tmp/imagePullPolicy.txt else echo "NOT using Minikube env" - echo "codexstorage/bittorrent-benchmarks-workflows:latest" > /tmp/image.txt + echo "codexstorage/bittorrent-benchmarks-workflows:{{workflow.parameters.workflowRunnerTag}}" > /tmp/image.txt echo "Always" > /tmp/imagePullPolicy.txt fi outputs: @@ -468,7 +479,7 @@ spec: imagePullPolicy: '{{inputs.parameters.imagePullPolicy}}' command: [ "/bin/bash" ] source: | - set -e + set -e if [[ "{{workflow.parameters.minikubeEnv}}" == "false" ]]; then echo "Using devnet cluster values for deploy." @@ -482,9 +493,12 @@ spec: --set experiment.fileSize={{inputs.parameters.fileSize}}\ --set experiment.networkSize={{inputs.parameters.networkSize}}\ --set experiment.seeders={{inputs.parameters.seeders}}\ + --set "experiment.codexLogLevel={{workflow.parameters.codexLogLevel}}"\ --set experiment.seederSets={{inputs.parameters.seederSets}}\ --set deployment.minikubeEnv={{workflow.parameters.minikubeEnv}}\ + --set deployment.removeData={{workflow.parameters.removeData}}\ --set deployment.nodeTag={{workflow.parameters.nodeTag}}\ + --set deployment.runnerTag={{workflow.parameters.runnerTag}}\ --set deployment.region={{workflow.parameters.region}} - name: wait-for-experiment diff --git a/k8s/argo-workflows/log-parsing-workflow-template.yaml b/k8s/argo-workflows/log-parsing-workflow-template.yaml index 76fabfd..acb5901 100644 --- a/k8s/argo-workflows/log-parsing-workflow-template.yaml +++ b/k8s/argo-workflows/log-parsing-workflow-template.yaml @@ -62,7 +62,7 @@ spec: command: ["/bin/bash"] source: | set -e - + if [ -z "$(ls /var/logs/{{workflow.parameters.experimentGroupId}})" ]; then echo "No logs found." exit 1 @@ -76,16 +76,15 @@ spec: echo "Configure s3 alias for endpoint ${AWS_ENDPOINT_URL}." mc alias set s3 "${AWS_ENDPOINT_URL}" "${AWS_ACCESS_KEY_ID}" "${AWS_SECRET_ACCESS_KEY}" - + echo "Copy logs." mc cp "/var/logs/{{workflow.parameters.experimentGroupId}}.tar.gz"\ "s3/{{workflow.parameters.bucket}}/logs/{{workflow.parameters.experimentGroupId}}.tar.gz" - + envFrom: - secretRef: name: s3-codex-benchmarks - + volumeMounts: - name: logs mountPath: "/var/logs" - \ No newline at end of file