From 54c224d7601a731d00787978a6282f06bae60abe Mon Sep 17 00:00:00 2001 From: gmega Date: Sun, 12 Jan 2025 19:33:14 -0300 Subject: [PATCH] fix: improve experiment robustness, fix test assertions --- .../core/experiments/static_experiment.py | 15 ++++++++++++++- benchmarks/deluge/config.py | 7 +++++++ benchmarks/deluge/deluge_node.py | 2 ++ benchmarks/deluge/tests/test_deluge_node.py | 5 ++++- experiments.k8s.yaml | 1 + poetry.lock | 17 ++++++++++++++++- pyproject.toml | 1 + 7 files changed, 45 insertions(+), 3 deletions(-) diff --git a/benchmarks/core/experiments/static_experiment.py b/benchmarks/core/experiments/static_experiment.py index d6da758..22923ac 100644 --- a/benchmarks/core/experiments/static_experiment.py +++ b/benchmarks/core/experiments/static_experiment.py @@ -1,5 +1,6 @@ import logging from multiprocessing.pool import ThreadPool +from time import sleep from typing import Sequence, Optional from typing_extensions import Generic, List, Tuple @@ -26,6 +27,7 @@ class StaticDisseminationExperiment( seeders: List[int], data: ExperimentData[TInitialMetadata], concurrency: Optional[int] = None, + logging_cooldown: int = 0, ) -> None: self.nodes = network self.seeders = seeders @@ -36,6 +38,7 @@ class StaticDisseminationExperiment( else concurrency ) self._cid: Optional[TNetworkHandle] = None + self.logging_cooldown = logging_cooldown def setup(self): pass @@ -73,7 +76,10 @@ class StaticDisseminationExperiment( def _await_for_download(element: Tuple[int, DownloadHandle]) -> int: index, download = element - download.await_for_completion() + if not download.await_for_completion(): + raise Exception( + f"Download ({index}, {str(download)}) did not complete in time." + ) return index for i in self._pool.imap_unordered( @@ -81,6 +87,13 @@ class StaticDisseminationExperiment( ): logger.info("Download %d / %d completed", i + 1, len(downloads)) + # FIXME this is a hack to ensure that nodes get a chance to log their data before we + # run the teardown hook and remove the torrents. + logger.info( + f"Waiting for {self.logging_cooldown} seconds before teardown..." + ) + sleep(self.logging_cooldown) + def teardown(self, exception: Optional[Exception] = None): def _remove(element: Tuple[int, Node[TNetworkHandle, TInitialMetadata]]): index, node = element diff --git a/benchmarks/deluge/config.py b/benchmarks/deluge/config.py index 78b3726..04f8c9d 100644 --- a/benchmarks/deluge/config.py +++ b/benchmarks/deluge/config.py @@ -78,6 +78,12 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]): description="Configuration for the nodes that make up the network" ) + logging_cooldown: int = Field( + gt=0, + default=0, + description="Time to wait after the last download completes before tearing down the experiment.", + ) + def build(self) -> DelugeDisseminationExperiment: nodes_specs = ( self.nodes.nodes @@ -117,6 +123,7 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]): announce_url=tracker.announce_url, ), ), + logging_cooldown=self.logging_cooldown, ) ) diff --git a/benchmarks/deluge/deluge_node.py b/benchmarks/deluge/deluge_node.py index 3566ff6..fae4058 100644 --- a/benchmarks/deluge/deluge_node.py +++ b/benchmarks/deluge/deluge_node.py @@ -9,6 +9,7 @@ from typing import List, Union, Optional, Self, Dict, Any import pathvalidate from deluge_client import DelugeRPCClient +from tenacity import retry, wait_exponential from torrentool.torrent import Torrent from urllib3.util import Url @@ -128,6 +129,7 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent): self.connect() return self._rpc + @retry(wait=wait_exponential(multiplier=1, min=4, max=16)) def connect(self) -> Self: client = DelugeRPCClient(**self.daemon_args) client.connect() diff --git a/benchmarks/deluge/tests/test_deluge_node.py b/benchmarks/deluge/tests/test_deluge_node.py index abc0109..061d009 100644 --- a/benchmarks/deluge/tests/test_deluge_node.py +++ b/benchmarks/deluge/tests/test_deluge_node.py @@ -16,11 +16,14 @@ def assert_is_seed(node: DelugeNode, name: str, size: int): assert len(response) == 1 info = response[0] + if not info[b"is_seed"]: + return False + assert info[b"name"] == name.encode( "utf-8" ) # not sure that this works for ANY name... assert info[b"total_size"] == size - assert info[b"is_seed"] + return True assert await_predicate(_is_seed, timeout=5) diff --git a/experiments.k8s.yaml b/experiments.k8s.yaml index bd25bcb..d91bf6b 100644 --- a/experiments.k8s.yaml +++ b/experiments.k8s.yaml @@ -5,6 +5,7 @@ deluge_experiment: file_size: ${FILE_SIZE} repetitions: ${REPETITIONS} shared_volume_path: ${SHARED_VOLUME_PATH} + logging_cooldown: 10 nodes: network_size: ${NETWORK_SIZE} diff --git a/poetry.lock b/poetry.lock index 0e4eb55..b1087bc 100644 --- a/poetry.lock +++ b/poetry.lock @@ -640,6 +640,21 @@ files = [ {file = "ruff-0.8.6.tar.gz", hash = "sha256:dcad24b81b62650b0eb8814f576fc65cfee8674772a6e24c9b747911801eeaa5"}, ] +[[package]] +name = "tenacity" +version = "9.0.0" +description = "Retry code until it succeeds" +optional = false +python-versions = ">=3.8" +files = [ + {file = "tenacity-9.0.0-py3-none-any.whl", hash = "sha256:93de0c98785b27fcf659856aa9f54bfbd399e29969b0621bc7f762bd441b4539"}, + {file = "tenacity-9.0.0.tar.gz", hash = "sha256:807f37ca97d62aa361264d497b0e31e92b8027044942bfa756160d908320d73b"}, +] + +[package.extras] +doc = ["reno", "sphinx"] +test = ["pytest", "tornado (>=4.5)", "typeguard"] + [[package]] name = "torrentool" version = "1.2.0" @@ -730,4 +745,4 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "5dfb4405c6c8fa4c8651e17ee88b1eea15461eeb4748ddf53584ea97a98a291c" +content-hash = "ecc038d5e0e05d072a39a294ef96e7045554678cfc870bafe13b07385208e0f2" diff --git a/pyproject.toml b/pyproject.toml index 9bc1ea0..94b69e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ pydantic = "^2.10.2" pyyaml = "^6.0.2" requests = "^2.32.3" ruff = "^0.8.6" +tenacity = "^9.0.0" [tool.poetry.group.test.dependencies] pytest = "^8.3.3"