From e8441b7bea640b8c6172fa86df8f676467fdb598 Mon Sep 17 00:00:00 2001
From: gmega
Date: Fri, 14 Feb 2025 15:59:28 -0300
Subject: [PATCH] fix: respect logger increments even when stream returns less data than expected

---
 benchmarks/codex/agent/agent.py               | 15 +++--
 .../codex/agent/tests/test_codex_agent.py     | 55 +++++++++++++++++++
 ...-workflow.yaml => benchmark-workflow.yaml} |  0
 3 files changed, 64 insertions(+), 6 deletions(-)
 rename k8s/argo-workflows/{deluge-benchmark-workflow.yaml => benchmark-workflow.yaml} (100%)

diff --git a/benchmarks/codex/agent/agent.py b/benchmarks/codex/agent/agent.py
index 90e030c..6141a99 100644
--- a/benchmarks/codex/agent/agent.py
+++ b/benchmarks/codex/agent/agent.py
@@ -47,6 +47,7 @@ class DownloadHandle:
         step_size = int(self.manifest.datasetSize * self.read_increment)
 
         async with self.parent.client.download(self.manifest.cid) as download_stream:
+            logged_step = 0
             while not download_stream.at_eof():
                 step = min(step_size, self.manifest.datasetSize - self.bytes_downloaded)
                 bytes_read = await download_stream.read(step)
@@ -56,13 +57,15 @@ class DownloadHandle:
                     await asyncio.sleep(EMPTY_STREAM_BACKOFF)
 
                 self.bytes_downloaded += len(bytes_read)
-                logger.info(
-                    CodexDownloadMetric(
-                        cid=self.manifest.cid,
-                        value=self.bytes_downloaded,
-                        node=self.parent.node_id,
+                if int(self.bytes_downloaded / step_size) > logged_step:
+                    logged_step += 1
+                    logger.info(
+                        CodexDownloadMetric(
+                            cid=self.manifest.cid,
+                            value=step_size * logged_step,
+                            node=self.parent.node_id,
+                        )
                     )
-                )
 
             if self.bytes_downloaded < self.manifest.datasetSize:
                 raise EOFError(
diff --git a/benchmarks/codex/agent/tests/test_codex_agent.py b/benchmarks/codex/agent/tests/test_codex_agent.py
index 667ed43..fb94007 100644
--- a/benchmarks/codex/agent/tests/test_codex_agent.py
+++ b/benchmarks/codex/agent/tests/test_codex_agent.py
@@ -154,6 +154,61 @@ async def test_should_log_download_progress_as_metric_in_discrete_steps(mock_log
     ]
 
 
+@pytest.mark.asyncio
+async def test_should_log_download_progress_as_discrete_steps_even_when_underlying_stream_is_choppy(
+    mock_logger,
+):
+    logger, output = mock_logger
+
+    with patch("benchmarks.codex.agent.agent.logger", logger):
+        client = FakeCodexClient()
+        codex_agent = CodexAgent(client)
+        cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234)
+        download_stream = client.create_download_stream(cid)
+        handle = await codex_agent.download(cid, read_increment=0.2)
+
+        # Simulates a choppy download which returns a lot less than the logging step size every time.
+        fed = 0
+        step = 37
+        while fed < 1000:
+            to_feed = min(step, 1000 - fed)
+            download_stream.feed_data(b"0" * to_feed)
+            fed += to_feed
+            assert await await_predicate_async(
+                lambda: handle.progress() == DownloadStatus(downloaded=fed, total=1000),
+                timeout=5,
+            )
+
+        download_stream.feed_eof()
+        await handle.download_task
+
+        parser = LogParser()
+        parser.register(CodexDownloadMetric)
+
+        metrics = list(parser.parse(StringIO(output.getvalue())))
+
+        assert metrics == [
+            CodexDownloadMetric(
+                cid=cid, value=200, node=codex_agent.node_id, timestamp=metrics[0].timestamp
+            ),
+            CodexDownloadMetric(
+                cid=cid, value=400, node=codex_agent.node_id, timestamp=metrics[1].timestamp
+            ),
+            CodexDownloadMetric(
+                cid=cid, value=600, node=codex_agent.node_id, timestamp=metrics[2].timestamp
+            ),
+            CodexDownloadMetric(
+                cid=cid, value=800, node=codex_agent.node_id, timestamp=metrics[3].timestamp
+            ),
+            CodexDownloadMetric(
+                cid=cid,
+                value=1000,
+                node=codex_agent.node_id,
+                timestamp=metrics[4].timestamp,
+            ),
+        ]
+
+
 @pytest.mark.asyncio
 async def test_should_track_download_handles():
     client = FakeCodexClient()
diff --git a/k8s/argo-workflows/deluge-benchmark-workflow.yaml b/k8s/argo-workflows/benchmark-workflow.yaml
similarity index 100%
rename from k8s/argo-workflows/deluge-benchmark-workflow.yaml
rename to k8s/argo-workflows/benchmark-workflow.yaml