From 02f680a91dea23829fc2cec446072f38ffebfbb3 Mon Sep 17 00:00:00 2001 From: gmega Date: Mon, 20 Jan 2025 15:00:02 -0300 Subject: [PATCH] feat: make deluge agent client part of the experiment environment --- benchmarks/core/experiments/experiments.py | 53 +++++++++++-------- .../experiments/tests/test_experiments.py | 14 +++++ benchmarks/core/utils.py | 4 +- benchmarks/deluge/agent/api.py | 5 ++ benchmarks/deluge/agent/client.py | 13 ++++- benchmarks/deluge/config.py | 11 +++- benchmarks/deluge/deluge_node.py | 4 +- benchmarks/deluge/tests/fixtures.py | 3 +- 8 files changed, 78 insertions(+), 29 deletions(-) diff --git a/benchmarks/core/experiments/experiments.py b/benchmarks/core/experiments/experiments.py index 83e267d..2be2850 100644 --- a/benchmarks/core/experiments/experiments.py +++ b/benchmarks/core/experiments/experiments.py @@ -1,14 +1,15 @@ """Basic definitions for structuring experiments.""" import logging +import random from abc import ABC, abstractmethod from collections.abc import Iterable -from time import time, sleep from typing import List, Optional from typing_extensions import Generic, TypeVar from benchmarks.core.config import Builder +from benchmarks.core.utils import await_predicate logger = logging.getLogger(__name__) @@ -63,41 +64,51 @@ class ExperimentComponent(ABC): pass -class ExperimentEnvironment: +class ExperimentEnvironment(ExperimentComponent): """An :class:`ExperimentEnvironment` is a collection of :class:`ExperimentComponent`s that must be ready before - an :class:`Experiment` can execute.""" + an :class:`Experiment` can execute. Note that we assume that readiness is stable; i.e., if a component is ready + at some point, then it will remain ready for the duration of the experiment.""" def __init__( - self, components: Iterable[ExperimentComponent], polling_interval: float = 0 + self, + components: Iterable[ExperimentComponent], + ping_max: int = 10, + polling_interval: float = 0, ): self.components = components self.polling_interval = polling_interval + self.ping_max = ping_max + self.not_ready = list(components) def await_ready(self, timeout: float = 0) -> bool: """Awaits for all components to be ready, or until a timeout is reached.""" - - start_time = time() - not_ready = [component for component in self.components] - logging.info( - f"Awaiting for components to be ready: {self._component_names(not_ready)}" + f"Awaiting for components to be ready: {self._component_names(self.not_ready)}" ) - while len(not_ready) != 0: - for component in not_ready: - if component.is_ready(): - logger.info(f"Component {str(component)} is ready.") - not_ready.remove(component) - sleep(self.polling_interval) - - if (timeout != 0) and (time() - start_time > timeout): - logger.info( - f"Some components timed out: {self._component_names(not_ready)}" - ) - return False + if not await_predicate(self.is_ready, timeout, self.polling_interval): + logger.info( + f"Some components timed out: {self._component_names(self.not_ready)}" + ) + return False return True + def is_ready(self) -> bool: + for component in self._draw(self.not_ready): + if component.is_ready(): + logger.info(f"Component {str(component)} is ready.") + self.not_ready.remove(component) + + return len(self.not_ready) == 0 + + def _draw(self, components: List[ExperimentComponent]) -> List[ExperimentComponent]: + if len(components) <= self.ping_max: + return components + + random.shuffle(components) + return components[: self.ping_max] + @staticmethod def _component_names(components: List[ExperimentComponent]) -> str: return ", ".join(str(component) for component in components) diff --git a/benchmarks/core/experiments/tests/test_experiments.py b/benchmarks/core/experiments/tests/test_experiments.py index f6dce1e..973ae3c 100644 --- a/benchmarks/core/experiments/tests/test_experiments.py +++ b/benchmarks/core/experiments/tests/test_experiments.py @@ -77,3 +77,17 @@ def test_should_bind_experiment_to_environment(): assert components[0].is_ready() assert components[1].is_ready() + + +def test_should_not_ping_more_than_ping_max_components_per_polling_round(): + components = [ + ExternalComponent(5), + ExternalComponent(3), + ExternalComponent(1), + ] + + env = ExperimentEnvironment(components, ping_max=2, polling_interval=0) + env.is_ready() + + assert len([component for component in components if component.iteration == 1]) == 2 + assert len([component for component in components if component.iteration == 0]) == 1 diff --git a/benchmarks/core/utils.py b/benchmarks/core/utils.py index f94c64f..4f56519 100644 --- a/benchmarks/core/utils.py +++ b/benchmarks/core/utils.py @@ -6,8 +6,8 @@ from typing import Iterator, Optional, Callable, IO def await_predicate( predicate: Callable[[], bool], timeout: float = 0, polling_interval: float = 0 ) -> bool: - current = time() - while (timeout == 0) or ((time() - current) <= timeout): + start_time = time() + while (timeout == 0) or ((time() - start_time) <= timeout): if predicate(): return True sleep(polling_interval) diff --git a/benchmarks/deluge/agent/api.py b/benchmarks/deluge/agent/api.py index 6383ea0..1746cf7 100644 --- a/benchmarks/deluge/agent/api.py +++ b/benchmarks/deluge/agent/api.py @@ -15,6 +15,11 @@ def deluge_agent() -> DelugeAgent: raise Exception("Dependency must be set") +@router.post("/api/v1/hello") +def hello(): + return {"message": "Server is up"} + + @router.post("/api/v1/deluge/torrent") def generate( agent: Annotated[DelugeAgent, Depends(deluge_agent)], diff --git a/benchmarks/deluge/agent/client.py b/benchmarks/deluge/agent/client.py index c7b4608..3bf3a9d 100644 --- a/benchmarks/deluge/agent/client.py +++ b/benchmarks/deluge/agent/client.py @@ -1,13 +1,24 @@ +import socket + import requests from tenacity import stop_after_attempt, wait_exponential, retry from torrentool.torrent import Torrent from urllib3.util import Url +from benchmarks.core.experiments.experiments import ExperimentComponent -class DelugeAgentClient: + +class DelugeAgentClient(ExperimentComponent): def __init__(self, url: Url): self.url = url + def is_ready(self) -> bool: + try: + requests.get(str(self.url._replace(path="/api/v1/hello"))) + return True + except (ConnectionError, socket.gaierror): + return False + def generate(self, size: int, seed: int, name: str) -> Torrent: @retry( stop=stop_after_attempt(5), diff --git a/benchmarks/deluge/config.py b/benchmarks/deluge/config.py index 9ea988f..3456b04 100644 --- a/benchmarks/deluge/config.py +++ b/benchmarks/deluge/config.py @@ -16,6 +16,7 @@ from benchmarks.core.experiments.experiments import ( from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment from benchmarks.core.pydantic import Host from benchmarks.core.utils import sample +from benchmarks.deluge.agent.client import DelugeAgentClient from benchmarks.deluge.deluge_node import DelugeMeta, DelugeNode from benchmarks.deluge.tracker import Tracker @@ -95,13 +96,18 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]): else self.nodes ) + agents = [ + DelugeAgentClient(parse_url(str(node_spec.agent_url))) + for node_spec in nodes_specs + ] + network = [ DelugeNode( name=node_spec.name, volume=self.shared_volume_path, daemon_port=node_spec.daemon_port, daemon_address=str(node_spec.address), - agent_url=parse_url(str(node_spec.agent_url)), + agent=agents[i], ) for i, node_spec in enumerate(nodes_specs) ] @@ -109,7 +115,8 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]): tracker = Tracker(parse_url(str(self.tracker_announce_url))) env = ExperimentEnvironment( - components=network + [tracker], + components=network + agents + [tracker], + ping_max=10, polling_interval=0.5, ) diff --git a/benchmarks/deluge/deluge_node.py b/benchmarks/deluge/deluge_node.py index 316469d..8608371 100644 --- a/benchmarks/deluge/deluge_node.py +++ b/benchmarks/deluge/deluge_node.py @@ -38,7 +38,7 @@ class DelugeNode(ExperimentComponent): name: str, volume: Path, daemon_port: int, - agent_url: Url = Url(scheme="http", host="localhost", port=8000), + agent: DelugeAgentClient, daemon_address: str = "localhost", daemon_username: str = "user", daemon_password: str = "password", @@ -57,7 +57,7 @@ class DelugeNode(ExperimentComponent): "password": daemon_password, } - self.agent = DelugeAgentClient(agent_url) + self.agent = agent @property def name(self) -> str: diff --git a/benchmarks/deluge/tests/fixtures.py b/benchmarks/deluge/tests/fixtures.py index 40c6a41..5458505 100644 --- a/benchmarks/deluge/tests/fixtures.py +++ b/benchmarks/deluge/tests/fixtures.py @@ -6,6 +6,7 @@ import pytest from urllib3.util import parse_url from benchmarks.core.utils import await_predicate +from benchmarks.deluge.agent.client import DelugeAgentClient from benchmarks.deluge.deluge_node import DelugeNode from benchmarks.deluge.tracker import Tracker @@ -18,7 +19,7 @@ def deluge_node( volume=Path("/var/lib/deluge"), daemon_address=address, daemon_port=port, - agent_url=parse_url(agent_url), + agent=DelugeAgentClient(parse_url(agent_url)), ) assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5) node.wipe_all_torrents()