class ExperimentComponent(ABC):
    """An :class:`ExperimentComponent` is a part of the environment for an experiment. These could be databases,
    network nodes, etc."""

    @abstractmethod
    def is_ready(self) -> bool:
        """Returns whether this component is ready or not."""
        pass


class ExperimentEnvironment:
    """An :class:`ExperimentEnvironment` is a collection of :class:`ExperimentComponent`s that must be ready before
    an :class:`Experiment` can execute.

    :param components: the components that make up the environment.
    :param polling_interval: value forwarded as ``polling_interval`` to ``await_predicate`` on every readiness wait.
    :param ready_timeout: value forwarded as ``timeout`` to :meth:`await_ready` by :meth:`run`. The default of ``0``
        preserves the previous behaviour of :meth:`run`, which always awaited with a timeout of ``0``.
    """

    def __init__(
            self,
            components: Iterable[ExperimentComponent],
            polling_interval: float = 0,
            ready_timeout: float = 0,
    ):
        # Materialize the iterable once: await_ready() walks the components on every poll, and a one-shot
        # iterable (e.g. a generator) would be exhausted after the first poll, making all(...) vacuously
        # true from the second poll onwards.
        self.components = list(components)
        self.polling_interval = polling_interval
        self.ready_timeout = ready_timeout

    def await_ready(self, timeout: float = 0) -> bool:
        """Awaits for all components to be ready, or until a timeout is reached.

        :param timeout: forwarded unchanged as the ``timeout`` of ``await_predicate``.
        :return: ``True`` if all components reported ready, ``False`` if ``await_predicate`` gave up first.
        """
        # TODO we should probably have per-component timeouts, or at least provide feedback
        #  as to what was the completion state of each component.
        return await_predicate(
            lambda: all(component.is_ready() for component in self.components),
            timeout=timeout,
            polling_interval=self.polling_interval,
        )

    def run(self, experiment: Experiment):
        """Runs the :class:`Experiment` within this :class:`ExperimentEnvironment`.

        :raises RuntimeError: if :meth:`await_ready` reports that the components did not become ready within
            ``ready_timeout``.
        """
        # Pass the configured timeout explicitly; calling await_ready() with its default made the
        # failure branch below unreachable in practice.
        if not self.await_ready(timeout=self.ready_timeout):
            raise RuntimeError('One or more environment components did not get ready in time')

        experiment.run()

    def bind(self, experiment: TExperiment) -> Experiment:
        """Wraps ``experiment`` so that calling its ``run`` goes through :meth:`run` of this environment."""
        return _BoundExperiment(experiment, self)


class _BoundExperiment(Experiment):
    """An :class:`Experiment` that delegates its execution to an :class:`ExperimentEnvironment`."""

    def __init__(self, experiment: Experiment, env: ExperimentEnvironment):
        self.experiment = experiment
        self.env = env

    def run(self):
        self.env.run(self.experiment)
leechers', len(seeders), len(leechers)) @@ -51,3 +41,16 @@ class StaticDisseminationExperiment(Generic[TNetworkHandle, TInitialMetadata], E for i, download in enumerate(downloads): download.await_for_completion() logger.info('Download %d / %d completed', i + 1, len(downloads)) + + def _split_nodes(self) -> Tuple[ + List[Node[TNetworkHandle, TInitialMetadata]], + List[Node[TNetworkHandle, TInitialMetadata]] + ]: + return [ + self.nodes[i] + for i in self.seeders + ], [ + self.nodes[i] + for i in range(0, len(self.nodes)) + if i not in self.seeders + ] diff --git a/benchmarks/core/experiments/tests/test_experiments.py b/benchmarks/core/experiments/tests/test_experiments.py new file mode 100644 index 0000000..3126c00 --- /dev/null +++ b/benchmarks/core/experiments/tests/test_experiments.py @@ -0,0 +1,74 @@ +from time import sleep +from typing import List + +from benchmarks.core.experiments.experiments import ExperimentComponent, ExperimentEnvironment, Experiment + + +class ExternalComponent(ExperimentComponent): + + @property + def readiness_timeout(self) -> float: + return 0.1 + + def __init__(self, loops: int, wait_time: float = 0.0): + self.loops = loops + self.iteration = 0 + self.wait_time = wait_time + + def is_ready(self) -> bool: + sleep(self.wait_time) + if self.iteration < self.loops: + self.iteration += 1 + return False + + return True + + +def test_should_await_until_components_are_ready(): + components = [ + ExternalComponent(5), + ExternalComponent(3), + ] + + environment = ExperimentEnvironment(components, polling_interval=0) + assert environment.await_ready() + + assert components[0].iteration == 5 + assert components[1].iteration == 3 + + +def test_should_timeout_if_component_takes_too_long(): + components = [ + ExternalComponent(5), + ExternalComponent(3, wait_time=0.1), + ] + + environment = ExperimentEnvironment(components, polling_interval=0) + assert not environment.await_ready(0.1) + + assert components[0].iteration == 5 + assert 
components[1].iteration < 3 + + +class ExperimentThatReliesOnComponents(Experiment): + def __init__(self, components: List[ExperimentComponent]): + self.components = components + + def run(self): + assert all(component.is_ready() for component in self.components) + + +def test_should_bind_experiment_to_environment(): + components = [ + ExternalComponent(5), + ExternalComponent(3), + ] + + env = ExperimentEnvironment(components, polling_interval=0) + experiment = ExperimentThatReliesOnComponents(components) + bound = env.bind(experiment) + + bound.run() + + assert components[0].is_ready() + assert components[1].is_ready() diff --git a/benchmarks/core/experiments/tests/test_static_experiment.py b/benchmarks/core/experiments/tests/test_static_experiment.py index 89974e4..3674efc 100644 --- a/benchmarks/core/experiments/tests/test_static_experiment.py +++ b/benchmarks/core/experiments/tests/test_static_experiment.py @@ -2,9 +2,9 @@ from dataclasses import dataclass from pathlib import Path from typing import Optional, List, Tuple, Union -from benchmarks.core.network import Node, DownloadHandle from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment from benchmarks.core.experiments.tests.utils import MockExperimentData +from benchmarks.core.network import Node, DownloadHandle @dataclass @@ -29,6 +29,7 @@ class MockNode(Node[MockHandle, str]): file: Path, handle: Union[str, MockHandle] ) -> MockHandle: + if isinstance(handle, MockHandle): self.seeding = (handle, file) else: @@ -37,6 +38,7 @@ class MockNode(Node[MockHandle, str]): return self.seeding[0] def leech(self, handle: MockHandle): + self.leeching = handle return MockDownloadHandle(self) diff --git a/benchmarks/core/utils.py b/benchmarks/core/utils.py index 02e7673..d4e09fa 100644 --- a/benchmarks/core/utils.py +++ b/benchmarks/core/utils.py @@ -5,7 +5,8 @@ from abc import ABC, abstractmethod from contextlib import contextmanager, AbstractContextManager from dataclasses import 
def await_predicate(predicate: Callable[[], bool], timeout: float = 0, polling_interval: float = 0) -> bool:
    """Polls ``predicate`` until it returns a truthy value or until ``timeout`` seconds have elapsed.

    :param predicate: zero-argument callable that is invoked once per poll.
    :param timeout: maximum time to keep polling, in seconds. A value of ``0`` (the default) disables the
        deadline, so polling continues until ``predicate`` succeeds.
    :param polling_interval: time to sleep between two consecutive polls, in seconds.
    :return: ``True`` as soon as ``predicate`` returns a truthy value, ``False`` if the timeout elapsed first.
    """
    # Imported locally so the function does not depend on which names the module imports from ``time``.
    from time import monotonic, sleep

    # A monotonic clock is immune to wall-clock adjustments (NTP, DST, manual changes), which could
    # otherwise shorten or extend the timeout arbitrarily.
    started = monotonic()
    while (timeout == 0) or ((monotonic() - started) <= timeout):
        if predicate():
            return True

        pause = polling_interval
        if timeout != 0:
            # Never sleep past the deadline: a long polling interval should not delay the timeout.
            remaining = timeout - (monotonic() - started)
            pause = min(pause, max(remaining, 0))
        sleep(pause)

    return False
def await_for_completion(self, timeout: float = 0, polling_interval: float = 0.5) -> bool:
    """Waits until the torrent of this handle is reported as seeding by the node.

    :param timeout: forwarded unchanged as the ``timeout`` of ``await_predicate``.
    :param polling_interval: seconds to wait between two status queries. Defaults to ``0.5``, the pause this
        method used between queries before it was rewritten on top of ``await_predicate``.
    :return: the result of ``await_predicate``: ``True`` once the status has ``is_seed`` set, ``False`` if it
        gave up first.
    """
    name = self.torrent.name

    def _is_seeding():
        response = self.node.rpc.core.get_torrents_status({'name': name}, [])
        if len(response) > 1:
            logger.warning('Client has multiple torrents matching name %s. Returning the first one.', name)

        # NOTE(review): an empty response raises IndexError here, as it did before - confirm whether a
        # torrent that is not registered yet should count as "not complete" instead.
        status = list(response.values())[0]
        return status[b'is_seed']

    # Pass the polling interval explicitly: await_predicate defaults to 0, which would query the
    # daemon in a tight loop.
    return await_predicate(_is_seeding, timeout=timeout, polling_interval=polling_interval)
import Url from benchmarks.core.utils import megabytes from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta +from benchmarks.deluge.tracker import Tracker + @pytest.mark.integration def assert_is_seed(node: DelugeNode, name: str, size: int): @@ -12,25 +13,27 @@ def assert_is_seed(node: DelugeNode, name: str, size: int): assert len(response) == 1 info = response[0] - assert info[b'name'] == name.encode('utf-8') # not sure that this works for ANY name... + assert info[b'name'] == name.encode('utf-8') # not sure that this works for ANY name... assert info[b'total_size'] == size assert info[b'is_seed'] == True + @pytest.mark.integration -def test_should_seed_files(deluge_node1: DelugeNode, temp_random_file: Path, tracker: Url): +def test_should_seed_files(deluge_node1: DelugeNode, temp_random_file: Path, tracker: Tracker): assert not deluge_node1.torrent_info(name='dataset1') - deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker)) + deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker.announce_url)) assert_is_seed(deluge_node1, name='dataset1', size=megabytes(1)) + @pytest.mark.integration def test_should_download_files( deluge_node1: DelugeNode, deluge_node2: DelugeNode, - temp_random_file: Path, tracker: Url): + temp_random_file: Path, tracker: Tracker): assert not deluge_node1.torrent_info(name='dataset1') assert not deluge_node2.torrent_info(name='dataset1') - torrent = deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker)) + torrent = deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker.announce_url)) handle = deluge_node2.leech(torrent) assert handle.await_for_completion(5) diff --git a/benchmarks/deluge/tests/test_deluge_static_experiment.py b/benchmarks/deluge/tests/test_deluge_static_experiment.py index db6369e..81b4cb0 100644 --- a/benchmarks/deluge/tests/test_deluge_static_experiment.py +++ 
b/benchmarks/deluge/tests/test_deluge_static_experiment.py @@ -1,21 +1,28 @@ import pytest +from benchmarks.core.experiments.experiments import ExperimentEnvironment from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment from benchmarks.core.utils import RandomTempData, megabytes from benchmarks.deluge.deluge_node import DelugeMeta from benchmarks.deluge.tests.test_deluge_node import assert_is_seed + @pytest.mark.integration def test_should_run_with_a_single_seeder(tracker, deluge_node1, deluge_node2, deluge_node3): size = megabytes(10) - experiment = StaticDisseminationExperiment( + env = ExperimentEnvironment( + components=[deluge_node1, deluge_node2, deluge_node3, tracker], + polling_interval=0.5, + ) + + experiment = env.bind(StaticDisseminationExperiment( network=[deluge_node1, deluge_node2, deluge_node3], seeders=[1], data=RandomTempData( size=size, - meta=DelugeMeta('dataset-1', announce_url=tracker) + meta=DelugeMeta('dataset-1', announce_url=tracker.announce_url) ) - ) + )) experiment.run() @@ -23,20 +30,26 @@ def test_should_run_with_a_single_seeder(tracker, deluge_node1, deluge_node2, de assert_is_seed(deluge_node2, 'dataset-1', size) assert_is_seed(deluge_node3, 'dataset-1', size) + @pytest.mark.integration def test_should_run_with_multiple_seeders(tracker, deluge_node1, deluge_node2, deluge_node3): size = megabytes(10) - experiment = StaticDisseminationExperiment( + env = ExperimentEnvironment( + components=[deluge_node1, deluge_node2, deluge_node3, tracker], + polling_interval=0.5, + ) + + experiment = env.bind(StaticDisseminationExperiment( network=[deluge_node1, deluge_node2, deluge_node3], seeders=[1, 2], data=RandomTempData( size=size, - meta=DelugeMeta('dataset-1', announce_url=tracker) + meta=DelugeMeta('dataset-1', announce_url=tracker.announce_url) ) - ) + )) experiment.run() assert_is_seed(deluge_node1, 'dataset-1', size) assert_is_seed(deluge_node2, 'dataset-1', size) - assert_is_seed(deluge_node3, 
class Tracker(ExperimentComponent):
    """An :class:`ExperimentComponent` for a tracker that is probed over HTTP at its announce URL.

    :param announce_url: the announce URL of the tracker.
    :param request_timeout: seconds to wait for the tracker to answer a readiness probe.
    """

    def __init__(self, announce_url: Url, request_timeout: float = 5.0):
        self.announce_url = announce_url
        self.request_timeout = request_timeout

    def is_ready(self) -> bool:
        """Returns whether an HTTP GET to the announce URL gets any response.

        The response status is not inspected: any answer counts as ready.
        """
        try:
            # Without an explicit timeout, requests may block indefinitely on an unresponsive host.
            requests.get(str(self.announce_url), timeout=self.request_timeout)
            return True
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            # requests raises its own exception types; the builtin ConnectionError is unrelated to
            # requests.exceptions.ConnectionError and would never match here.
            return False
"typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, ] +[[package]] +name = "urllib3" +version = "2.2.3" +description = "HTTP library with thread-safe connection pooling, file post, and more." +optional = false +python-versions = ">=3.8" +files = [ + {file = "urllib3-2.2.3-py3-none-any.whl", hash = "sha256:ca899ca043dcb1bafa3e262d73aa25c465bfb49e0bd9dd5d59f1d0acba2f8fac"}, + {file = "urllib3-2.2.3.tar.gz", hash = "sha256:e7d814a81dad81e6caf2ec9fdedb284ecc9c73076b62654547cc64ccdcae26e9"}, +] + +[package.extras] +brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] +h2 = ["h2 (>=4,<5)"] +socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] +zstd = ["zstandard (>=0.18.0)"] + [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "c10ab6006a3097ae8fcbac02448e98cf18f61146ab311979e1e9d5e735e2369d" +content-hash = "53a8f96ed6933280837a914e11a80694a9739726697a4b9bcfdf4981c9e6eeca" diff --git a/pyproject.toml b/pyproject.toml index b090005..a999677 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ pyyaml = "^6.0.2" pytest = "^8.3.3" mypy = "^1.13.0" types-pyyaml = "^6.0.12.20240917" +types-requests = "^2.32.0.20241016" [tool.pytest.ini_options] markers = [