feat: make deluge agent client part of the experiment environment

This commit is contained in:
gmega 2025-01-20 15:00:02 -03:00
parent 94556d7a53
commit 02f680a91d
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
8 changed files with 78 additions and 29 deletions

View File

@ -1,14 +1,15 @@
"""Basic definitions for structuring experiments."""
import logging
import random
from abc import ABC, abstractmethod
from collections.abc import Iterable
from time import time, sleep
from typing import List, Optional
from typing_extensions import Generic, TypeVar
from benchmarks.core.config import Builder
from benchmarks.core.utils import await_predicate
logger = logging.getLogger(__name__)
@ -63,41 +64,51 @@ class ExperimentComponent(ABC):
pass
class ExperimentEnvironment:
class ExperimentEnvironment(ExperimentComponent):
"""An :class:`ExperimentEnvironment` is a collection of :class:`ExperimentComponent`s that must be ready before
an :class:`Experiment` can execute."""
an :class:`Experiment` can execute. Note that we assume that readiness is stable; i.e., if a component is ready
at some point, then it will remain ready for the duration of the experiment."""
def __init__(
self, components: Iterable[ExperimentComponent], polling_interval: float = 0
self,
components: Iterable[ExperimentComponent],
ping_max: int = 10,
polling_interval: float = 0,
):
self.components = components
self.polling_interval = polling_interval
self.ping_max = ping_max
self.not_ready = list(components)
def await_ready(self, timeout: float = 0) -> bool:
"""Awaits for all components to be ready, or until a timeout is reached."""
start_time = time()
not_ready = [component for component in self.components]
logging.info(
f"Awaiting for components to be ready: {self._component_names(not_ready)}"
f"Awaiting for components to be ready: {self._component_names(self.not_ready)}"
)
while len(not_ready) != 0:
for component in not_ready:
if component.is_ready():
logger.info(f"Component {str(component)} is ready.")
not_ready.remove(component)
sleep(self.polling_interval)
if (timeout != 0) and (time() - start_time > timeout):
logger.info(
f"Some components timed out: {self._component_names(not_ready)}"
)
return False
if not await_predicate(self.is_ready, timeout, self.polling_interval):
logger.info(
f"Some components timed out: {self._component_names(self.not_ready)}"
)
return False
return True
def is_ready(self) -> bool:
for component in self._draw(self.not_ready):
if component.is_ready():
logger.info(f"Component {str(component)} is ready.")
self.not_ready.remove(component)
return len(self.not_ready) == 0
def _draw(self, components: List[ExperimentComponent]) -> List[ExperimentComponent]:
if len(components) <= self.ping_max:
return components
random.shuffle(components)
return components[: self.ping_max]
@staticmethod
def _component_names(components: List[ExperimentComponent]) -> str:
return ", ".join(str(component) for component in components)

View File

@ -77,3 +77,17 @@ def test_should_bind_experiment_to_environment():
assert components[0].is_ready()
assert components[1].is_ready()
def test_should_not_ping_more_than_ping_max_components_per_polling_round():
components = [
ExternalComponent(5),
ExternalComponent(3),
ExternalComponent(1),
]
env = ExperimentEnvironment(components, ping_max=2, polling_interval=0)
env.is_ready()
assert len([component for component in components if component.iteration == 1]) == 2
assert len([component for component in components if component.iteration == 0]) == 1

View File

@ -6,8 +6,8 @@ from typing import Iterator, Optional, Callable, IO
def await_predicate(
predicate: Callable[[], bool], timeout: float = 0, polling_interval: float = 0
) -> bool:
current = time()
while (timeout == 0) or ((time() - current) <= timeout):
start_time = time()
while (timeout == 0) or ((time() - start_time) <= timeout):
if predicate():
return True
sleep(polling_interval)

View File

@ -15,6 +15,11 @@ def deluge_agent() -> DelugeAgent:
raise Exception("Dependency must be set")
@router.post("/api/v1/hello")
def hello():
return {"message": "Server is up"}
@router.post("/api/v1/deluge/torrent")
def generate(
agent: Annotated[DelugeAgent, Depends(deluge_agent)],

View File

@ -1,13 +1,24 @@
import socket
import requests
from tenacity import stop_after_attempt, wait_exponential, retry
from torrentool.torrent import Torrent
from urllib3.util import Url
from benchmarks.core.experiments.experiments import ExperimentComponent
class DelugeAgentClient:
class DelugeAgentClient(ExperimentComponent):
def __init__(self, url: Url):
self.url = url
def is_ready(self) -> bool:
try:
requests.get(str(self.url._replace(path="/api/v1/hello")))
return True
except (ConnectionError, socket.gaierror):
return False
def generate(self, size: int, seed: int, name: str) -> Torrent:
@retry(
stop=stop_after_attempt(5),

View File

@ -16,6 +16,7 @@ from benchmarks.core.experiments.experiments import (
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.pydantic import Host
from benchmarks.core.utils import sample
from benchmarks.deluge.agent.client import DelugeAgentClient
from benchmarks.deluge.deluge_node import DelugeMeta, DelugeNode
from benchmarks.deluge.tracker import Tracker
@ -95,13 +96,18 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]):
else self.nodes
)
agents = [
DelugeAgentClient(parse_url(str(node_spec.agent_url)))
for node_spec in nodes_specs
]
network = [
DelugeNode(
name=node_spec.name,
volume=self.shared_volume_path,
daemon_port=node_spec.daemon_port,
daemon_address=str(node_spec.address),
agent_url=parse_url(str(node_spec.agent_url)),
agent=agents[i],
)
for i, node_spec in enumerate(nodes_specs)
]
@ -109,7 +115,8 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]):
tracker = Tracker(parse_url(str(self.tracker_announce_url)))
env = ExperimentEnvironment(
components=network + [tracker],
components=network + agents + [tracker],
ping_max=10,
polling_interval=0.5,
)

View File

@ -38,7 +38,7 @@ class DelugeNode(ExperimentComponent):
name: str,
volume: Path,
daemon_port: int,
agent_url: Url = Url(scheme="http", host="localhost", port=8000),
agent: DelugeAgentClient,
daemon_address: str = "localhost",
daemon_username: str = "user",
daemon_password: str = "password",
@ -57,7 +57,7 @@ class DelugeNode(ExperimentComponent):
"password": daemon_password,
}
self.agent = DelugeAgentClient(agent_url)
self.agent = agent
@property
def name(self) -> str:

View File

@ -6,6 +6,7 @@ import pytest
from urllib3.util import parse_url
from benchmarks.core.utils import await_predicate
from benchmarks.deluge.agent.client import DelugeAgentClient
from benchmarks.deluge.deluge_node import DelugeNode
from benchmarks.deluge.tracker import Tracker
@ -18,7 +19,7 @@ def deluge_node(
volume=Path("/var/lib/deluge"),
daemon_address=address,
daemon_port=port,
agent_url=parse_url(agent_url),
agent=DelugeAgentClient(parse_url(agent_url)),
)
assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5)
node.wipe_all_torrents()