diff --git a/.gitignore b/.gitignore
index 703d3e2..620f85d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,7 +2,6 @@
 .idea
 .vscode
 /volume/deluge*
-*.timestamp
 .RData
 .Rhistory
 .Rproj.user
diff --git a/Makefile b/Makefile
index 5ab47ca..eac50ad 100644
--- a/Makefile
+++ b/Makefile
@@ -7,10 +7,9 @@ SHELL := bash
 	harness-stop \
 	integration \
 	tests \
-	unit-docker \
 	integration-docker \
-	image-test \
 	image-release \
+	image-test \
 	clean

 # Runs the unit tests locally.
@@ -19,7 +18,6 @@ unit:

 # Starts the local integration harness. This is required for running pytest with the "integration" marker.
 harness-start:
-	rm -rf ${PWD}/volume/deluge-{1,2,3}
 	docker compose -f docker-compose.local.yaml up

 # Stops the local integration harness.
@@ -33,7 +31,6 @@ integration:

 tests: unit integration

-# Builds the test image required for local dockerized integration tests.
 image-test:
 	docker build -t bittorrent-benchmarks:test -f ./docker/bittorrent-benchmarks.Dockerfile .

@@ -41,19 +38,11 @@ image-release:
 	docker build -t bittorrent-benchmarks:test --build-arg BUILD_TYPE="release" \
 		-f ./docker/bittorrent-benchmarks.Dockerfile .

-# Runs the unit tests in a docker container.
-unit-docker: image-test
-	docker run --entrypoint poetry --rm bittorrent-benchmarks:test run pytest -m "not integration"
-
 # Runs the integration tests in a docker container.
-integration-docker: image-test
+integration-docker:
 	docker compose -f docker-compose.local.yaml -f docker-compose.ci.yaml down --volumes --remove-orphans
 	docker compose -f docker-compose.local.yaml -f docker-compose.ci.yaml up \
 		--abort-on-container-exit --exit-code-from test-runner

-tests-docker: unit-docker integration-docker
-
 clean:
-	rm -rf docker/.lastbuilt*
-	rm -rf volume/deluge-{1,2,3}
 	docker compose -f docker-compose.local.yaml -f docker-compose.ci.yaml down --volumes --rmi all --remove-orphans
diff --git a/benchmarks/cli.py b/benchmarks/cli.py
index cbcd8f5..d576634 100644
--- a/benchmarks/cli.py
+++ b/benchmarks/cli.py
@@ -4,26 +4,34 @@ import sys
 from pathlib import Path
 from typing import Dict

+import uvicorn
+from pydantic import IPvAnyAddress
 from pydantic_core import ValidationError
+from typing_extensions import TypeVar

-from benchmarks.core.config import ConfigParser
+from benchmarks.core.agent import AgentBuilder
+from benchmarks.core.config import ConfigParser, Builder
 from benchmarks.core.experiments.experiments import Experiment, ExperimentBuilder
+from benchmarks.deluge.agent.api import DelugeAgentConfig
+from benchmarks.deluge.config import DelugeExperimentConfig
+from benchmarks.deluge.logging import DelugeTorrentDownload
 from benchmarks.logging.logging import (
     basic_log_parser,
     LogSplitter,
     LogEntry,
     LogSplitterFormats,
 )
-from benchmarks.deluge.config import DelugeExperimentConfig
-from benchmarks.deluge.logging import DelugeTorrentDownload
 from benchmarks.logging.sources import (
     VectorFlatFileSource,
     FSOutputManager,
     split_logs_in_source,
 )

-config_parser = ConfigParser[ExperimentBuilder]()
-config_parser.register(DelugeExperimentConfig)
+experiment_config_parser = ConfigParser[ExperimentBuilder]()
+experiment_config_parser.register(DelugeExperimentConfig)
+
+agent_config_parser = ConfigParser[AgentBuilder]()
+agent_config_parser.register(DelugeAgentConfig)

 log_parser = basic_log_parser()
 log_parser.register(DelugeTorrentDownload)
@@ -34,13 +42,13 @@ log_parser.register(DECLogEntry)
 logger = logging.getLogger(__name__)


-def cmd_list(experiments: Dict[str, ExperimentBuilder[Experiment]], _):
+def cmd_list_experiment(experiments: Dict[str, ExperimentBuilder[Experiment]], _):
     print("Available experiments are:")
     for experiment in experiments.keys():
         print(f"  - {experiment}")


-def cmd_run(experiments: Dict[str, ExperimentBuilder[Experiment]], args):
+def cmd_run_experiment(experiments: Dict[str, ExperimentBuilder[Experiment]], args):
     if args.experiment not in experiments:
         print(f"Experiment {args.experiment} not found.")
         sys.exit(-1)
@@ -50,14 +58,14 @@ def cmd_run(experiments: Dict[str, ExperimentBuilder[Experiment]], args):
     experiment.build().run()


-def cmd_describe(args):
+def cmd_describe_experiment(args):
     if not args.type:
         print("Available experiment types are:")
-        for experiment in config_parser.experiment_types.keys():
+        for experiment in experiment_config_parser.experiment_types.keys():
             print(f"  - {experiment}")
         return

-    print(config_parser.experiment_types[args.type].schema_json(indent=2))
+    print(experiment_config_parser.experiment_types[args.type].schema_json(indent=2))


 def cmd_parse_single_log(log: Path, output: Path):
@@ -107,14 +115,33 @@ def cmd_parse_log_source(group_id: str, source_file: Path, output_dir: Path):
     )


-def _parse_config(config: Path) -> Dict[str, ExperimentBuilder[Experiment]]:
+def cmd_run_agent(agents: Dict[str, AgentBuilder], args):
+    if args.agent not in agents:
+        print(f"Agent type {args.agent} not found.")
+        sys.exit(-1)
+
+    uvicorn.run(
+        agents[args.agent].build(),
+        host=str(args.host),
+        port=args.port,
+        reload=False,
+        workers=1,
+    )
+
+
+T = TypeVar("T")
+
+
+def _parse_config(
+    config: Path, parser: ConfigParser[Builder[T]]
+) -> Dict[str, Builder[T]]:
     if not config.exists():
         print(f"Config file {config} does not exist.")
         sys.exit(-1)

     with config.open(encoding="utf-8") as infile:
         try:
-            return config_parser.parse(infile)
+            return parser.parse(infile)
         except ValidationError as e:
             print("There were errors parsing the config file.")
             for error in e.errors():
@@ -147,11 +174,17 @@ def main():
     list_cmd = experiment_commands.add_parser(
         "list", help="Lists available experiments."
     )
-    list_cmd.set_defaults(func=lambda args: cmd_list(_parse_config(args.config), args))
+    list_cmd.set_defaults(
+        func=lambda args: cmd_list_experiment(
+            _parse_config(args.config, experiment_config_parser), args
+        )
+    )

     run_cmd = experiment_commands.add_parser("run", help="Runs an experiment")
     run_cmd.add_argument("experiment", type=str, help="Name of the experiment to run.")
-    run_cmd.set_defaults(func=lambda args: cmd_run(_parse_config(args.config), args))
+    run_cmd.set_defaults(
+        func=lambda args: cmd_run_experiment(
+            _parse_config(args.config, experiment_config_parser), args
+        )
+    )

     describe_cmd = commands.add_parser(
         "describe", help="Shows the JSON schema for the various experiment types."
     )
@@ -160,11 +193,11 @@
         "type",
         type=str,
         help="Type of the experiment to describe.",
-        choices=config_parser.experiment_types.keys(),
+        choices=experiment_config_parser.experiment_types.keys(),
         nargs="?",
     )

-    describe_cmd.set_defaults(func=cmd_describe)
+    describe_cmd.set_defaults(func=cmd_describe_experiment)

     logs_cmd = commands.add_parser("logs", help="Parse logs.")
     log_subcommands = logs_cmd.add_subparsers(required=True)
@@ -198,6 +231,27 @@ def main():
         )
     )

+    agent_cmd = commands.add_parser("agent", help="Starts a local agent.")
+    agent_cmd.add_argument(
+        "config", type=Path, help="Path to the agent configuration file."
+    )
+    agent_cmd.add_argument("agent", type=str, help="Name of the agent to run.")
+    agent_cmd.add_argument(
+        "--host",
+        type=IPvAnyAddress,
+        help="IP address to bind to.",
+        default=IPvAnyAddress("0.0.0.0"),
+    )
+    agent_cmd.add_argument(
+        "--port", type=int, help="Port to listen on for connections.", default=9001
+    )
+
+    agent_cmd.set_defaults(
+        func=lambda args: cmd_run_agent(
+            _parse_config(args.config, agent_config_parser), args
+        )
+    )
+
     args = parser.parse_args()

     _init_logging()
diff --git a/benchmarks/core/agent.py b/benchmarks/core/agent.py
new file mode 100644
index 0000000..c9f2aeb
--- /dev/null
+++ b/benchmarks/core/agent.py
@@ -0,0 +1,7 @@
+from fastapi import FastAPI
+
+from benchmarks.core.config import Builder
+
+# Agents are auxiliary containers deployed alongside a :class:`Node` which allow us to implement
+# arbitrary actions, like generating files for experiments, close to the node itself.
+AgentBuilder = Builder[FastAPI]
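For reference, the wiring behind the new `agent` subcommand is thin enough to sketch in a few lines. This mirrors what `cmd_run_agent` above does when invoked as `bittorrent-benchmarks agent experiments.local.yaml deluge_agent` (the file name, agent name, and port are taken from the compose setup further down; the sketch itself is illustrative, not part of the patch):

    import uvicorn

    from benchmarks.core.agent import AgentBuilder
    from benchmarks.core.config import ConfigParser
    from benchmarks.deluge.agent.api import DelugeAgentConfig

    # Build a parser that only knows about agent sections of the config file.
    agent_config_parser = ConfigParser[AgentBuilder]()
    agent_config_parser.register(DelugeAgentConfig)

    with open("experiments.local.yaml", encoding="utf-8") as infile:
        agents = agent_config_parser.parse(infile)

    # "deluge_agent" is the section name under which DelugeAgentConfig is
    # registered; build() returns a FastAPI app that uvicorn serves directly.
    uvicorn.run(agents["deluge_agent"].build(), host="0.0.0.0", port=9001)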
diff --git a/benchmarks/core/config.py b/benchmarks/core/config.py
index fde9fb8..57bf48d 100644
--- a/benchmarks/core/config.py
+++ b/benchmarks/core/config.py
@@ -30,8 +30,9 @@ class ConfigParser(Generic[TBuilder]):
     Currently, each :class:`Builder` type can appear at most once in the config file.
     """

-    def __init__(self):
-        self.experiment_types = {}
+    def __init__(self, ignore_unknown: bool = True) -> None:
+        self.experiment_types: Dict[str, Type[TBuilder]] = {}
+        self.ignore_unknown = ignore_unknown

     def register(self, root: Type[TBuilder]):
         self.experiment_types[root.alias()] = root
@@ -51,4 +52,5 @@ class ConfigParser(Generic[TBuilder]):
         return {
             tag: self.experiment_types[tag].model_validate(config)
             for tag, config in entries.items()
+            if tag in self.experiment_types or not self.ignore_unknown
        }
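Because `ignore_unknown` defaults to `True`, two parsers can share one config file, each picking out only the sections registered with it; this is what lets `experiments.local.yaml` carry both a `deluge_experiment` and a `deluge_agent` section. A minimal sketch of that behavior (the `unknown_section` key is made up for illustration):

    from io import StringIO

    from benchmarks.core.config import ConfigParser
    from benchmarks.deluge.agent.api import DelugeAgentConfig

    config = StringIO(
        "unknown_section:\n"
        "  some_key: some_value\n"
        "deluge_agent:\n"
        "  torrents_path: /var/lib/deluge/downloads\n"
    )

    parser = ConfigParser()  # ignore_unknown=True by default
    parser.register(DelugeAgentConfig)

    # Only the registered section survives; with ignore_unknown=False the stray
    # section would instead trigger a KeyError inside parse().
    assert list(parser.parse(config)) == ["deluge_agent"]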
_log_request(node, "genseed", str(self.meta), RequestEventType.end) - assert self._cid is not None # to please mypy + assert self._cid is not None # to please mypy - logger.info( - f"Setting up leechers: {[str(leecher) for leecher in leechers]}" - ) + logger.info(f"Setting up leechers: {[str(leecher) for leecher in leechers]}") - def _leech(leecher): - _log_request(leecher, "leech", str(meta), RequestEventType.start) - download = leecher.leech(self._cid) - _log_request(leecher, "leech", str(meta), RequestEventType.end) - return download + def _leech(leecher): + _log_request(leecher, "leech", str(self.meta), RequestEventType.start) + download = leecher.leech(self._cid) + _log_request(leecher, "leech", str(self.meta), RequestEventType.end) + return download - downloads = list(self._pool.imap_unordered(_leech, leechers)) + downloads = list(self._pool.imap_unordered(_leech, leechers)) - logger.info("Now waiting for downloads to complete") + logger.info("Now waiting for downloads to complete") - def _await_for_download(element: Tuple[int, DownloadHandle]) -> int: - index, download = element - if not download.await_for_completion(): - raise Exception( - f"Download ({index}, {str(download)}) did not complete in time." - ) - return index + def _await_for_download(element: Tuple[int, DownloadHandle]) -> int: + index, download = element + if not download.await_for_completion(): + raise Exception( + f"Download ({index}, {str(download)}) did not complete in time." + ) + return index - for i in self._pool.imap_unordered( - _await_for_download, enumerate(downloads) - ): - logger.info("Download %d / %d completed", i + 1, len(downloads)) + for i in self._pool.imap_unordered(_await_for_download, enumerate(downloads)): + logger.info("Download %d / %d completed", i + 1, len(downloads)) - # FIXME this is a hack to ensure that nodes get a chance to log their data before we - # run the teardown hook and remove the torrents. - logger.info( - f"Waiting for {self.logging_cooldown} seconds before teardown..." - ) - sleep(self.logging_cooldown) + # FIXME this is a hack to ensure that nodes get a chance to log their data before we + # run the teardown hook and remove the torrents. 
+ logger.info(f"Waiting for {self.logging_cooldown} seconds before teardown...") + sleep(self.logging_cooldown) def teardown(self, exception: Optional[Exception] = None): def _remove(element: Tuple[int, Node[TNetworkHandle, TInitialMetadata]]): diff --git a/benchmarks/core/experiments/tests/test_static_experiment.py b/benchmarks/core/experiments/tests/test_static_experiment.py index 1f27ef0..16397b2 100644 --- a/benchmarks/core/experiments/tests/test_static_experiment.py +++ b/benchmarks/core/experiments/tests/test_static_experiment.py @@ -1,29 +1,28 @@ from dataclasses import dataclass from io import StringIO -from pathlib import Path -from typing import Optional, List, Tuple, Union +from typing import Optional, List from unittest.mock import patch from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment -from benchmarks.core.experiments.tests.utils import MockExperimentData -from benchmarks.logging.logging import LogParser, RequestEvent, RequestEventType from benchmarks.core.network import Node, DownloadHandle +from benchmarks.logging.logging import LogParser, RequestEvent, RequestEventType @dataclass -class MockHandle: - path: Path +class MockGenData: + size: int + seed: int name: str def __str__(self): return self.name -class MockNode(Node[MockHandle, str]): +class MockNode(Node[MockGenData, str]): def __init__(self, name="mock_node") -> None: self._name = name - self.seeding: Optional[Tuple[MockHandle, Path]] = None - self.leeching: Optional[MockHandle] = None + self.seeding: Optional[MockGenData] = None + self.leeching: Optional[MockGenData] = None self.download_was_awaited = False self.cleanup_was_called = False @@ -31,23 +30,19 @@ class MockNode(Node[MockHandle, str]): def name(self) -> str: return self._name - def seed(self, file: Path, handle: Union[str, MockHandle]) -> MockHandle: - if isinstance(handle, MockHandle): - self.seeding = (handle, file) - else: - self.seeding = (MockHandle(name=handle, path=file), file) + def genseed(self, size: int, seed: int, meta: str) -> MockGenData: + self.seeding = MockGenData(size=size, seed=seed, name=meta) + return self.seeding - return self.seeding[0] - - def leech(self, handle: MockHandle): + def leech(self, handle: MockGenData): self.leeching = handle return MockDownloadHandle(self) - def remove(self, handle: MockHandle): + def remove(self, handle: MockGenData): if self.leeching is not None: assert self.leeching == handle elif self.seeding is not None: - assert self.seeding[0] == handle + assert self.seeding == handle else: raise Exception( "Either leech or seed must be called before attempting a remove" @@ -69,15 +64,13 @@ def mock_network(n: int) -> List[MockNode]: return [MockNode(f"node-{i}") for i in range(n)] -def test_should_place_seeders(): +def test_should_generate_correct_data_and_seed(): network = mock_network(n=13) - data = MockExperimentData(meta="data", data=Path("/path/to/data")) + gendata = MockGenData(size=1000, seed=12, name="dataset1") seeders = [9, 6, 3] experiment = StaticDisseminationExperiment( - seeders=seeders, - network=network, - data=data, + seeders=seeders, network=network, meta="dataset1", file_size=1000, seed=12 ) experiment.run() @@ -86,20 +79,22 @@ def test_should_place_seeders(): for index, node in enumerate(network): if node.seeding is not None: actual_seeders.add(index) - assert node.seeding[0] == MockHandle(name=data.meta, path=data.data) + assert node.seeding == gendata assert actual_seeders == set(seeders) def test_should_download_at_remaining_nodes(): network = 
mock_network(n=13) - data = MockExperimentData(meta="data", data=Path("/path/to/data")) + gendata = MockGenData(size=1000, seed=12, name="dataset1") seeders = [9, 6, 3] experiment = StaticDisseminationExperiment( seeders=seeders, network=network, - data=data, + meta="dataset1", + file_size=1000, + seed=12, ) experiment.run() @@ -107,8 +102,7 @@ def test_should_download_at_remaining_nodes(): actual_leechers = set() for index, node in enumerate(network): if node.leeching is not None: - assert node.leeching.path == data.data - assert node.leeching.name == data.meta + assert node.leeching == gendata assert node.seeding is None assert node.download_was_awaited actual_leechers.add(index) @@ -116,34 +110,18 @@ def test_should_download_at_remaining_nodes(): assert actual_leechers == set(range(13)) - set(seeders) -def test_should_delete_generated_file_at_end_of_experiment(): - network = mock_network(n=2) - data = MockExperimentData(meta="data", data=Path("/path/to/data")) - seeders = [1] - - experiment = StaticDisseminationExperiment( - seeders=seeders, - network=network, - data=data, - ) - - experiment.run() - - assert data.cleanup_called - - def test_should_log_requests_to_seeders_and_leechers(mock_logger): logger, output = mock_logger with patch("benchmarks.core.experiments.static_experiment.logger", logger): network = mock_network(n=3) - data = MockExperimentData(meta="dataset-1", data=Path("/path/to/data")) seeders = [1] experiment = StaticDisseminationExperiment( seeders=seeders, network=network, - data=data, - concurrency=1, + meta="dataset-1", + file_size=1000, + seed=12, ) experiment.run() @@ -157,7 +135,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger): RequestEvent( destination="node-1", node="runner", - name="seed", + name="genseed", request_id="dataset-1", type=RequestEventType.start, timestamp=events[0].timestamp, @@ -165,7 +143,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger): RequestEvent( destination="node-1", node="runner", - name="seed", + name="genseed", request_id="dataset-1", type=RequestEventType.end, timestamp=events[1].timestamp, @@ -207,13 +185,14 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger): def test_should_delete_file_from_nodes_at_the_end_of_the_experiment(): network = mock_network(n=2) - data = MockExperimentData(meta="data", data=Path("/path/to/data")) seeders = [1] experiment = StaticDisseminationExperiment( seeders=seeders, network=network, - data=data, + meta="dataset-1", + file_size=1000, + seed=12, ) experiment.run() diff --git a/benchmarks/core/experiments/tests/test_utils.py b/benchmarks/core/experiments/tests/test_utils.py new file mode 100644 index 0000000..ceeab6c --- /dev/null +++ b/benchmarks/core/experiments/tests/test_utils.py @@ -0,0 +1,31 @@ +from io import BytesIO + +from benchmarks.core.utils import random_data + + +def test_should_generate_the_requested_amount_of_bytes(): + f = BytesIO() + + random_data(size=1024, outfile=f) + + assert len(f.getvalue()) == 1024 + + +def test_should_generate_equal_files_for_equal_seeds(): + f1 = BytesIO() + f2 = BytesIO() + + random_data(size=1024, outfile=f1, seed=1234) + random_data(size=1024, outfile=f2, seed=1234) + + assert f1.getvalue() == f2.getvalue() + + +def test_should_generate_different_files_for_different_seeds(): + f1 = BytesIO() + f2 = BytesIO() + + random_data(size=1024, outfile=f1, seed=1234) + random_data(size=1024, outfile=f2, seed=1235) + + assert f1.getvalue() != f2.getvalue() diff --git 
diff --git a/benchmarks/core/experiments/tests/utils.py b/benchmarks/core/experiments/tests/utils.py
deleted file mode 100644
index c928daf..0000000
--- a/benchmarks/core/experiments/tests/utils.py
+++ /dev/null
@@ -1,18 +0,0 @@
-from pathlib import Path
-from typing import Tuple
-
-from benchmarks.core.network import TInitialMetadata
-from benchmarks.core.utils import ExperimentData
-
-
-class MockExperimentData(ExperimentData[TInitialMetadata]):
-    def __init__(self, meta: TInitialMetadata, data: Path):
-        self.cleanup_called = False
-        self.meta = meta
-        self.data = data
-
-    def __enter__(self) -> Tuple[TInitialMetadata, Path]:
-        return self.meta, self.data
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.cleanup_called = True
diff --git a/benchmarks/core/network.py b/benchmarks/core/network.py
index 7116a02..efef44b 100644
--- a/benchmarks/core/network.py
+++ b/benchmarks/core/network.py
@@ -1,8 +1,6 @@
-import shutil
 from abc import abstractmethod, ABC
-from pathlib import Path

-from typing_extensions import Generic, TypeVar, Union
+from typing_extensions import Generic, TypeVar

 TNetworkHandle = TypeVar("TNetworkHandle")
 TInitialMetadata = TypeVar("TInitialMetadata")
@@ -30,20 +28,20 @@ class Node(ABC, Generic[TNetworkHandle, TInitialMetadata]):
         pass

     @abstractmethod
-    def seed(
+    def genseed(
         self,
-        file: Path,
-        handle: Union[TInitialMetadata, TNetworkHandle],
+        size: int,
+        seed: int,
+        meta: TInitialMetadata,
     ) -> TNetworkHandle:
         """
-        Makes the current :class:`Node` a seeder for the specified file.
+        Generates a random file of given size and makes the current node a seeder for it. Identical seeds,
+        metadata, and sizes should result in identical network handles.

-        :param file: local path to the file to seed.
-        :param handle: file sharing typically requires some initial metadata when a file is first uploaded into the
-            network, and this will typically then result into a compact representation such as a manifest CID (Codex)
-            or a Torrent file (Bittorrent) which other nodes can then use to identify and locate both the file and its
-            metadata within the network. When doing an initial seed, this method should be called with the initial
-            metadata (TInitialMetadata). Subsequent calls should use the network handle (TNetworkHandle).
+        :param size: The size of the file to be seeded.
+        :param seed: The seed for the random number generator producing the file.
+        :param meta: Additional, client-specific metadata relevant to the seeding process. For torrents,
+            this could be the name of the torrent.

         :return: The network handle (TNetworkHandle) for this file. This handle should be used for subsequent
-            calls to :meth:`seed`.
+            calls to :meth:`leech` and :meth:`remove`.
@@ -64,22 +62,3 @@ class Node(ABC, Generic[TNetworkHandle, TInitialMetadata]):
         seeding it. For leechers, it will stop downloading it. In both cases, the file will be removed from
         the node's storage."""
         pass
-
-
-class SharedFSNode(Node[TNetworkHandle, TInitialMetadata], ABC):
-    """A `SharedFSNode` is a :class:`Node` which shares a network volume with us. This means
-    we are able to upload files to it by means of simple file copies."""
-
-    def __init__(self, volume: Path):
-        self.volume = volume
-
-    def upload(self, local: Path, name: str) -> Path:
-        target_path = self.volume / name
-        target_path.mkdir(parents=True, exist_ok=True)
-        target = target_path / local.name
-        if local.is_dir():
-            shutil.copytree(local, target)
-        else:
-            shutil.copy(local, target)
-
-        return target
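The determinism requirement in the `genseed` docstring can be stated as a one-line property. A sketch using `MockNode` from the tests earlier in this diff (the test itself is hypothetical, not part of the patch):

    from benchmarks.core.experiments.tests.test_static_experiment import MockNode

    def test_genseed_is_deterministic_across_nodes():
        # Same (size, seed, meta) on two distinct nodes must yield equal handles;
        # MockGenData is a dataclass, so equality here is structural.
        assert MockNode("a").genseed(1024, 1234, "dataset-1") == MockNode(
            "b"
        ).genseed(1024, 1234, "dataset-1")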
diff --git a/benchmarks/core/utils.py b/benchmarks/core/utils.py
index 92fde77..f94c64f 100644
--- a/benchmarks/core/utils.py
+++ b/benchmarks/core/utils.py
@@ -1,50 +1,6 @@
-import os
 import random
-import tempfile
-from abc import ABC, abstractmethod
-from contextlib import contextmanager, AbstractContextManager
-from dataclasses import dataclass
-from pathlib import Path
 from time import time, sleep
-from typing import Iterator, Tuple, ContextManager, Optional, Callable
-
-from typing_extensions import Generic
-
-from benchmarks.core.network import TInitialMetadata
-
-
-@dataclass
-class ExperimentData(Generic[TInitialMetadata], AbstractContextManager, ABC):
-    """:class:`ExperimentData` provides a context for providing and wiping out
-    data and metadata objects, usually within the scope of an experiment."""
-
-    @abstractmethod
-    def __enter__(self) -> Tuple[TInitialMetadata, Path]:
-        """Generates new data and metadata and returns it."""
-        pass
-
-    @abstractmethod
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """Wipes out data and metadata."""
-        pass
-
-
-class RandomTempData(ExperimentData[TInitialMetadata]):
-    def __init__(self, size: int, meta: TInitialMetadata):
-        self.meta = meta
-        self.size = size
-        self._context: Optional[ContextManager[Tuple[TInitialMetadata, Path]]] = None
-
-    def __enter__(self) -> Tuple[TInitialMetadata, Path]:
-        if self._context is not None:
-            raise Exception("Cannot enter context twice")
-
-        self._context = temp_random_file(self.size, "data.bin")
-
-        return self.meta, self._context.__enter__()
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self._context.__exit__(exc_type, exc_val, exc_tb)
+from typing import Iterator, Optional, Callable, IO


 def await_predicate(
@@ -78,19 +34,12 @@ def megabytes(n: int) -> int:
     return kilobytes(n) * 1024


-@contextmanager
-def temp_random_file(
-    size: int, name: str = "data.bin", batch_size: int = megabytes(50)
+def random_data(
+    size: int, outfile: IO, batch_size: int = megabytes(50), seed: Optional[int] = None
 ):
-    with tempfile.TemporaryDirectory() as temp_dir_str:
-        temp_dir = Path(temp_dir_str)
-        random_file = temp_dir / name
-
-        with random_file.open("wb") as outfile:
-            while size > 0:
-                batch = min(size, batch_size)
-                random_bytes = os.urandom(batch)
-                outfile.write(random_bytes)
-                size -= batch
-
-        yield random_file
+    rnd = random.Random(seed) if seed is not None else random
+    while size > 0:
+        batch = min(size, batch_size)
+        random_bytes = rnd.randbytes(batch)
+        outfile.write(random_bytes)
+        size -= batch
diff --git a/volume/.marker b/benchmarks/deluge/agent/__init__.py
similarity index 100%
rename from volume/.marker
rename to benchmarks/deluge/agent/__init__.py
diff --git a/benchmarks/deluge/agent/agent.py b/benchmarks/deluge/agent/agent.py
new file mode 100644
index 0000000..b3ddfd6
--- /dev/null
+++ b/benchmarks/deluge/agent/agent.py
@@ -0,0 +1,25 @@
+from pathlib import Path
+from typing import Optional
+
+from torrentool.torrent import Torrent
+
+from benchmarks.core.utils import random_data, megabytes
+
+
+class DelugeAgent:
+    def __init__(self, torrents_path: Path, batch_size: int = megabytes(50)):
+        self.torrents_path = torrents_path
+        self.batch_size = batch_size
+
+    def create_torrent(self, name: str, size: int, seed: Optional[int]) -> Torrent:
+        torrent_path = self.torrents_path / name
+        torrent_path.mkdir(parents=True, exist_ok=False)
+
+        file_path = torrent_path / "datafile.bin"
+        with file_path.open(mode="wb") as output:
+            random_data(
+                size=size, outfile=output, seed=seed, batch_size=self.batch_size
+            )
+
+        torrent = Torrent.create_from(torrent_path)
+        torrent.name = name
+
+        return torrent
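`random_data` now writes to any binary stream instead of managing a temporary directory itself, which is what allows `DelugeAgent` above to stream straight into the torrent's data file. A small usage sketch (the output path is hypothetical):

    from pathlib import Path

    from benchmarks.core.utils import megabytes, random_data

    datafile = Path("/tmp/datafile.bin")  # hypothetical location
    with datafile.open("wb") as outfile:
        # Fixing the seed makes the call reproducible, which is what lets two
        # agents on different hosts produce byte-identical torrent payloads.
        random_data(size=megabytes(1), outfile=outfile, seed=1234)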
diff --git a/benchmarks/deluge/agent/api.py b/benchmarks/deluge/agent/api.py
new file mode 100644
index 0000000..6383ea0
--- /dev/null
+++ b/benchmarks/deluge/agent/api.py
@@ -0,0 +1,43 @@
+from pathlib import Path
+from typing import Annotated, Optional
+
+from fastapi import FastAPI, Depends, APIRouter, Response
+
+from benchmarks.core.agent import AgentBuilder
+
+from benchmarks.core.utils import megabytes
+from benchmarks.deluge.agent.agent import DelugeAgent
+
+router = APIRouter()
+
+
+def deluge_agent() -> DelugeAgent:
+    raise Exception("Dependency must be set")
+
+
+@router.post("/api/v1/deluge/torrent")
+def generate(
+    agent: Annotated[DelugeAgent, Depends(deluge_agent)],
+    name: str,
+    size: int,
+    seed: Optional[int],
+):
+    return Response(
+        agent.create_torrent(name=name, size=size, seed=seed).to_string(),
+        media_type="application/octet-stream",
+    )
+
+
+class DelugeAgentConfig(AgentBuilder):
+    torrents_path: Path
+    batch_size: int = megabytes(50)
+
+    def build(self) -> FastAPI:
+        app = FastAPI()
+        app.include_router(router)
+        agent = DelugeAgent(
+            torrents_path=self.torrents_path,
+            batch_size=self.batch_size,
+        )
+        app.dependency_overrides[deluge_agent] = lambda: agent
+        return app
diff --git a/benchmarks/deluge/agent/client.py b/benchmarks/deluge/agent/client.py
new file mode 100644
index 0000000..c7b4608
--- /dev/null
+++ b/benchmarks/deluge/agent/client.py
@@ -0,0 +1,27 @@
+import requests
+from tenacity import stop_after_attempt, wait_exponential, retry
+from torrentool.torrent import Torrent
+from urllib3.util import Url
+
+
+class DelugeAgentClient:
+    def __init__(self, url: Url):
+        self.url = url
+
+    def generate(self, size: int, seed: int, name: str) -> Torrent:
+        @retry(
+            stop=stop_after_attempt(5),
+            wait=wait_exponential(multiplier=1, min=4, max=16),
+        )
+        def _request():
+            return requests.post(
+                url=self.url._replace(path="/api/v1/deluge/torrent").url,
+                params={
+                    "size": size,
+                    "seed": seed,
+                    "name": name,
+                },
+            )
+
+        torrent = Torrent.from_string(_request().content)
+        return torrent
diff --git a/benchmarks/deluge/agent/tests/__init__.py b/benchmarks/deluge/agent/tests/__init__.py
new file mode 100644
index 0000000..e69de29
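For completeness, a sketch of how a caller talks to a colocated agent through `DelugeAgentClient`; the URL matches the `agent-1` service in the local compose file further down, and the retry decorator inside `generate` is what tolerates an agent that is still starting up:

    from urllib3.util import parse_url

    from benchmarks.deluge.agent.client import DelugeAgentClient

    client = DelugeAgentClient(parse_url("http://localhost:9001"))
    torrent = client.generate(size=1024, seed=12, name="dataset-1")
    print(torrent.name, torrent.total_size)  # -> dataset-1 1024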
diff --git a/benchmarks/deluge/agent/tests/test_agent.py b/benchmarks/deluge/agent/tests/test_agent.py
new file mode 100644
index 0000000..10e09d5
--- /dev/null
+++ b/benchmarks/deluge/agent/tests/test_agent.py
@@ -0,0 +1,79 @@
+import tempfile
+from pathlib import Path
+
+import pytest
+from torrentool.torrent import TorrentFile
+
+from benchmarks.deluge.agent.agent import DelugeAgent
+
+
+@pytest.fixture
+def temp_dir():
+    with tempfile.TemporaryDirectory() as temp_dir:
+        yield Path(temp_dir)
+
+
+def test_should_create_torrent_at_specified_location(temp_dir):
+    agent = DelugeAgent(
+        torrents_path=temp_dir,
+    )
+
+    torrent_file = agent.create_torrent(
+        name="dataset-1",
+        size=1024,
+        seed=12,
+    )
+
+    assert torrent_file.name == "dataset-1"
+    assert torrent_file.total_size == 1024
+    assert torrent_file.files == [TorrentFile("dataset-1/datafile.bin", 1024)]
+
+    assert (temp_dir / "dataset-1" / "datafile.bin").stat().st_size == 1024
+
+
+def test_should_generate_identical_torrent_files_for_identical_seeds(temp_dir):
+    agent1 = DelugeAgent(
+        torrents_path=temp_dir / "d1",
+    )
+
+    torrent_file1 = agent1.create_torrent(
+        name="dataset-1",
+        size=1024,
+        seed=12,
+    )
+
+    agent2 = DelugeAgent(
+        torrents_path=temp_dir / "d2",
+    )
+
+    torrent_file2 = agent2.create_torrent(
+        name="dataset-1",
+        size=1024,
+        seed=12,
+    )
+
+    assert torrent_file1.to_string() == torrent_file2.to_string()
+
+
+def test_should_generate_different_torrent_files_for_different_seeds(temp_dir):
+    agent1 = DelugeAgent(
+        torrents_path=temp_dir / "d1",
+    )
+
+    torrent_file1 = agent1.create_torrent(
+        name="dataset-1",
+        size=1024,
+        seed=12,
+    )
+
+    agent2 = DelugeAgent(
+        torrents_path=temp_dir / "d2",
+    )
+
+    torrent_file2 = agent2.create_torrent(
+        name="dataset-1",
+        size=1024,
+        seed=13,
+    )
+
+    assert torrent_file1.to_string() != torrent_file2.to_string()
diff --git a/benchmarks/deluge/agent/tests/test_api.py b/benchmarks/deluge/agent/tests/test_api.py
new file mode 100644
index 0000000..a255a2c
--- /dev/null
+++ b/benchmarks/deluge/agent/tests/test_api.py
@@ -0,0 +1,25 @@
+from fastapi import FastAPI
+from starlette.testclient import TestClient
+from torrentool.torrent import Torrent
+
+from benchmarks.deluge.agent import api
+from benchmarks.deluge.agent.agent import DelugeAgent
+from benchmarks.deluge.agent.api import deluge_agent
+
+
+def test_should_return_a_valid_byte_encoded_torrent_object(tmp_path):
+    app = FastAPI()
+    app.include_router(api.router)
+    app.dependency_overrides[deluge_agent] = lambda: DelugeAgent(tmp_path)
+
+    client = TestClient(app)
+    response = client.post(
+        "/api/v1/deluge/torrent",
+        params={"name": "dataset-1", "size": 1024, "seed": 12},
+    )
+
+    assert response.status_code == 200
+    torrent = Torrent.from_string(response.content)
+
+    assert torrent.name == "dataset-1"
+    assert torrent.total_size == 1024
diff --git a/benchmarks/deluge/config.py b/benchmarks/deluge/config.py
index 85b1bd3..3fab27b 100644
--- a/benchmarks/deluge/config.py
+++ b/benchmarks/deluge/config.py
@@ -1,3 +1,4 @@
+import random
 from itertools import islice
 from pathlib import Path
 from typing import List
@@ -14,7 +15,7 @@ from benchmarks.core.experiments.experiments import (
 )
 from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
 from benchmarks.core.pydantic import Host
-from benchmarks.core.utils import sample, RandomTempData
+from benchmarks.core.utils import sample
 from benchmarks.deluge.deluge_node import DelugeMeta, DelugeNode
 from benchmarks.deluge.tracker import Tracker

@@ -24,6 +25,7 @@ class DelugeNodeConfig(BaseModel):
     address: Host
     daemon_port: int
     listen_ports: list[int] = Field(min_length=2, max_length=2)
+    agent_url: HttpUrl


 class DelugeNodeSetConfig(BaseModel):
@@ -34,6 +36,7 @@ class DelugeNodeSetConfig(BaseModel):
     listen_ports: list[int] = Field(min_length=2, max_length=2)
     first_node_index: int = 1
     nodes: List[DelugeNodeConfig] = []
+    agent_url: HttpUrl

     @model_validator(mode="after")
     def expand_nodes(self):
@@ -43,6 +46,7 @@ class DelugeNodeSetConfig(BaseModel):
                 address=self.address.format(node_index=str(i)),
                 daemon_port=self.daemon_port,
                 listen_ports=self.listen_ports,
+                agent_url=self.agent_url,
             )
             for i in range(
                 self.first_node_index, self.first_node_index + self.network_size
@@ -97,6 +101,7 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]):
                 volume=self.shared_volume_path,
                 daemon_port=node_spec.daemon_port,
                 daemon_address=str(node_spec.address),
+                agent_url=parse_url(str(node_spec.agent_url)),
             )
             for i, node_spec in enumerate(nodes_specs)
         ]
@@ -116,12 +121,11 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]):
                 StaticDisseminationExperiment(
                     network=network,
                     seeders=seeders,
-                    data=RandomTempData(
-                        size=self.file_size,
-                        meta=DelugeMeta(
-                            f"dataset-{seeder_set}-{experiment_run}",
-                            announce_url=tracker.announce_url,
-                        ),
+                    file_size=self.file_size,
+                    seed=random.randint(0, 2**16),
+                    meta=DelugeMeta(
+                        f"dataset-{seeder_set}-{experiment_run}",
+                        announce_url=tracker.announce_url,
                     ),
                     logging_cooldown=self.logging_cooldown,
                 )
diff --git a/benchmarks/deluge/deluge_node.py b/benchmarks/deluge/deluge_node.py
index 4f40b81..316469d 100644
--- a/benchmarks/deluge/deluge_node.py
+++ b/benchmarks/deluge/deluge_node.py
@@ -5,7 +5,7 @@ import socket
 from dataclasses import dataclass
 from io import BytesIO
 from pathlib import Path
-from typing import List, Union, Optional, Self, Dict, Any
+from typing import List, Optional, Self, Dict, Any

 import pathvalidate
 from deluge_client import DelugeRPCClient
@@ -16,8 +16,9 @@ from torrentool.torrent import Torrent
 from urllib3.util import Url

 from benchmarks.core.experiments.experiments import ExperimentComponent
-from benchmarks.core.network import SharedFSNode, DownloadHandle
+from benchmarks.core.network import DownloadHandle
 from benchmarks.core.utils import await_predicate
+from benchmarks.deluge.agent.client import DelugeAgentClient

 logger = logging.getLogger(__name__)

@@ -31,12 +32,13 @@ class DelugeMeta:
     announce_url: Url


-class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
+class DelugeNode(ExperimentComponent):
     def __init__(
         self,
         name: str,
         volume: Path,
         daemon_port: int,
+        agent_url: Url = Url(scheme="http", host="localhost", port=8000),
         daemon_address: str = "localhost",
         daemon_username: str = "user",
         daemon_password: str = "password",
@@ -45,7 +47,7 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
             raise ValueError(f'Node name must be a valid filename (bad name: "{name}")')

         self._name = name
-        self.downloads_root = volume / name / "downloads"
+        self.downloads_root = volume / "downloads"
         self._rpc: Optional[DelugeRPCClient] = None

         self.daemon_args = {
@@ -55,9 +57,7 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
             "password": daemon_password,
         }

-        super().__init__(self.downloads_root)
-
-        self._init_folders()
+        self.agent = DelugeAgentClient(agent_url)

     @property
     def name(self) -> str:
@@ -80,27 +80,17 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
             #  folder after your check, so this is the only sane way to do it.
             pass

-        self._init_folders()
-
-    def seed(
+    def genseed(
         self,
-        file: Path,
-        handle: Union[DelugeMeta, Torrent],
+        size: int,
+        seed: int,
+        meta: DelugeMeta,
     ) -> Torrent:
-        data_root = self.downloads_root / handle.name
-        data_root.mkdir(parents=True, exist_ok=False)
-
-        target = self.upload(local=file, name=handle.name)
-
-        if isinstance(handle, DelugeMeta):
-            torrent = Torrent.create_from(target.parent)
-            torrent.announce_urls = handle.announce_url.url
-            torrent.name = handle.name
-        else:
-            torrent = handle
+        torrent = self.agent.generate(size, seed, meta.name)
+        torrent.announce_urls = [str(meta.announce_url)]

         self.rpc.core.add_torrent_file(
-            filename=f"{handle.name}.torrent",
+            filename=f"{meta.name}.torrent",
             filedump=self._b64dump(torrent),
             options=dict(),
         )
@@ -154,9 +144,6 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
         except (ConnectionRefusedError, socket.gaierror):
             return False

-    def _init_folders(self):
-        self.downloads_root.mkdir(parents=True, exist_ok=True)
-
     @staticmethod
     def _b64dump(handle: Torrent) -> bytes:
         buffer = BytesIO()
diff --git a/benchmarks/deluge/tests/fixtures.py b/benchmarks/deluge/tests/fixtures.py
index afaa9bd..40c6a41 100644
--- a/benchmarks/deluge/tests/fixtures.py
+++ b/benchmarks/deluge/tests/fixtures.py
@@ -5,18 +5,20 @@ from typing import Generator
 import pytest
 from urllib3.util import parse_url

-from benchmarks.core import utils
-from benchmarks.core.utils import megabytes, await_predicate
+from benchmarks.core.utils import await_predicate
 from benchmarks.deluge.deluge_node import DelugeNode
 from benchmarks.deluge.tracker import Tracker
-from benchmarks.tests.utils import shared_volume


 def deluge_node(
-    name: str, address: str, port: int
+    name: str, address: str, port: int, agent_url: str
 ) -> Generator[DelugeNode, None, None]:
     node = DelugeNode(
-        name, volume=shared_volume(), daemon_address=address, daemon_port=port
+        name,
+        volume=Path("/var/lib/deluge"),
+        daemon_address=address,
+        daemon_port=port,
+        agent_url=parse_url(agent_url),
     )
     assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5)
     node.wipe_all_torrents()
@@ -29,30 +31,33 @@ def deluge_node(
 @pytest.fixture
 def deluge_node1() -> Generator[DelugeNode, None, None]:
     yield from deluge_node(
-        "deluge-1", os.environ.get("DELUGE_NODE_1", "localhost"), 6890
+        "deluge-1",
+        os.environ.get("DELUGE_NODE_1", "localhost"),
+        6890,
+        os.environ.get("DELUGE_AGENT_1", "http://localhost:9001"),
     )


 @pytest.fixture
 def deluge_node2() -> Generator[DelugeNode, None, None]:
     yield from deluge_node(
-        "deluge-2", os.environ.get("DELUGE_NODE_2", "localhost"), 6893
+        "deluge-2",
+        os.environ.get("DELUGE_NODE_2", "localhost"),
+        6893,
+        os.environ.get("DELUGE_AGENT_2", "http://localhost:9002"),
     )


 @pytest.fixture
 def deluge_node3() -> Generator[DelugeNode, None, None]:
     yield from deluge_node(
-        "deluge-3", os.environ.get("DELUGE_NODE_3", "localhost"), 6896
+        "deluge-3",
+        os.environ.get("DELUGE_NODE_3", "localhost"),
+        6896,
+        os.environ.get("DELUGE_AGENT_3", "http://localhost:9003"),
    )


-@pytest.fixture
-def temp_random_file() -> Generator[Path, None, None]:
-    with utils.temp_random_file(size=megabytes(1)) as random_file:
-        yield random_file
-
-
 @pytest.fixture
 def tracker() -> Tracker:
     return Tracker(
diff --git a/benchmarks/deluge/tests/test_config.py b/benchmarks/deluge/tests/test_config.py
index f861a20..7cd974d 100644
--- a/benchmarks/deluge/tests/test_config.py
+++ b/benchmarks/deluge/tests/test_config.py
@@ -20,6 +20,7 @@ def test_should_expand_node_sets_into_simple_nodes():
         network_size=4,
         daemon_port=6080,
         listen_ports=[6081, 6082],
+        agent_url="http://localhost:8000",
     )

     assert nodeset.nodes == [
@@ -28,24 +29,28 @@ def test_should_expand_node_sets_into_simple_nodes():
             address="deluge-1.local.svc",
             daemon_port=6080,
             listen_ports=[6081, 6082],
+            agent_url="http://localhost:8000",
         ),
         DelugeNodeConfig(
             name="custom-2",
             address="deluge-2.local.svc",
             daemon_port=6080,
             listen_ports=[6081, 6082],
+            agent_url="http://localhost:8000",
         ),
         DelugeNodeConfig(
             name="custom-3",
             address="deluge-3.local.svc",
             daemon_port=6080,
             listen_ports=[6081, 6082],
+            agent_url="http://localhost:8000",
         ),
         DelugeNodeConfig(
             name="custom-4",
             address="deluge-4.local.svc",
             daemon_port=6080,
             listen_ports=[6081, 6082],
+            agent_url="http://localhost:8000",
         ),
     ]

@@ -58,6 +63,7 @@ def test_should_respect_first_node_index():
         daemon_port=6080,
         listen_ports=[6081, 6082],
         first_node_index=5,
+        agent_url="http://localhost:8000",
     )

     assert nodeset.nodes == [
@@ -66,12 +72,14 @@ def test_should_respect_first_node_index():
             address="deluge-5.local.svc",
             daemon_port=6080,
             listen_ports=[6081, 6082],
+            agent_url="http://localhost:8000",
         ),
         DelugeNodeConfig(
             name="deluge-6",
             address="deluge-6.local.svc",
             daemon_port=6080,
             listen_ports=[6081, 6082],
+            agent_url="http://localhost:8000",
         ),
     ]

@@ -91,6 +99,7 @@ def test_should_build_experiment_from_config():
           address: 'node-{node_index}.deluge.codexbenchmarks.svc.cluster.local'
           daemon_port: 6890
           listen_ports: [ 6891, 6892 ]
+          agent_url: http://localhost:8080
     """)

     config = DelugeExperimentConfig.model_validate(
@@ -126,6 +135,7 @@ def test_should_create_n_repetitions_per_seeder_set():
           address: 'node-{node_index}.deluge.codexbenchmarks.svc.cluster.local'
          daemon_port: 6890
          listen_ports: [ 6891, 6892 ]
+          agent_url: http://localhost:8080
     """)

     config = DelugeExperimentConfig.model_validate(
diff --git a/benchmarks/deluge/tests/test_deluge_node.py b/benchmarks/deluge/tests/test_deluge_node.py
index d991804..a28a746 100644
--- a/benchmarks/deluge/tests/test_deluge_node.py
+++ b/benchmarks/deluge/tests/test_deluge_node.py
@@ -1,5 +1,3 @@
-from pathlib import Path
-
 import pytest
 from tenacity import wait_incrementing, stop_after_attempt, RetryError

@@ -31,14 +29,15 @@ def assert_is_seed(node: DelugeNode, name: str, size: int):


 @pytest.mark.integration
-def test_should_seed_files(
-    deluge_node1: DelugeNode, temp_random_file: Path, tracker: Tracker
-):
+def test_should_seed_files(deluge_node1: DelugeNode, tracker: Tracker):
     assert not deluge_node1.torrent_info(name="dataset1")

-    deluge_node1.seed(
-        temp_random_file, DelugeMeta(name="dataset1", announce_url=tracker.announce_url)
+    deluge_node1.genseed(
+        size=megabytes(1),
+        seed=1234,
+        meta=DelugeMeta(name="dataset1", announce_url=tracker.announce_url),
     )
+
     assert_is_seed(deluge_node1, name="dataset1", size=megabytes(1))


@@ -46,14 +45,15 @@ def test_should_seed_files(deluge_node1: DelugeNode, tracker: Tracker):
 def test_should_download_files(
     deluge_node1: DelugeNode,
     deluge_node2: DelugeNode,
-    temp_random_file: Path,
     tracker: Tracker,
 ):
     assert not deluge_node1.torrent_info(name="dataset1")
     assert not deluge_node2.torrent_info(name="dataset1")

-    torrent = deluge_node1.seed(
-        temp_random_file, DelugeMeta(name="dataset1", announce_url=tracker.announce_url)
+    torrent = deluge_node1.genseed(
+        size=megabytes(1),
+        seed=1234,
+        meta=DelugeMeta(name="dataset1", announce_url=tracker.announce_url),
     )

     handle = deluge_node2.leech(torrent)
@@ -63,13 +63,13 @@ def test_should_download_files(


 @pytest.mark.integration
-def test_should_remove_files(
-    deluge_node1: DelugeNode, temp_random_file: Path, tracker: Tracker
-):
+def test_should_remove_files(deluge_node1: DelugeNode, tracker: Tracker):
     assert not deluge_node1.torrent_info(name="dataset1")

-    torrent = deluge_node1.seed(
-        temp_random_file, DelugeMeta(name="dataset1", announce_url=tracker.announce_url)
+    torrent = deluge_node1.genseed(
+        size=megabytes(1),
+        seed=1234,
+        meta=DelugeMeta(name="dataset1", announce_url=tracker.announce_url),
     )

     assert_is_seed(deluge_node1, name="dataset1", size=megabytes(1))
diff --git a/benchmarks/deluge/tests/test_deluge_static_experiment.py b/benchmarks/deluge/tests/test_deluge_static_experiment.py
index 2591c5d..1cbd8df 100644
--- a/benchmarks/deluge/tests/test_deluge_static_experiment.py
+++ b/benchmarks/deluge/tests/test_deluge_static_experiment.py
@@ -2,7 +2,7 @@ import pytest

 from benchmarks.core.experiments.experiments import ExperimentEnvironment
 from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
-from benchmarks.core.utils import RandomTempData, megabytes
+from benchmarks.core.utils import megabytes
 from benchmarks.deluge.deluge_node import DelugeMeta
 from benchmarks.deluge.tests.test_deluge_node import assert_is_seed

@@ -20,10 +20,9 @@ def test_should_run_with_a_single_seeder(
     experiment = StaticDisseminationExperiment(
         network=[deluge_node1, deluge_node2, deluge_node3],
         seeders=[1],
-        data=RandomTempData(
-            size=size,
-            meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
-        ),
+        file_size=size,
+        seed=1234,
+        meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
     )

     env.await_ready()
@@ -51,10 +50,9 @@ def test_should_run_with_multiple_seeders(
     experiment = StaticDisseminationExperiment(
         network=[deluge_node1, deluge_node2, deluge_node3],
         seeders=[1, 2],
-        data=RandomTempData(
-            size=size,
-            meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
-        ),
+        file_size=size,
+        seed=1234,
+        meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
     )

     env.await_ready()
diff --git a/benchmarks/logging/sources.py b/benchmarks/logging/sources.py
index 8b6fdac..8117c8a 100644
--- a/benchmarks/logging/sources.py
+++ b/benchmarks/logging/sources.py
@@ -8,7 +8,7 @@ from collections.abc import Iterator
 from contextlib import AbstractContextManager
 from json import JSONDecodeError
 from pathlib import Path
-from typing import TextIO, Optional, Tuple, List, Dict, Type
+from typing import TextIO, Optional, Tuple, List, Dict, Type, IO

 from benchmarks.logging.logging import (
     LogParser,
@@ -51,14 +51,14 @@ class OutputManager(AbstractContextManager):
     """An :class:`OutputManager` is responsible for managing output locations for log splitting
     operations. :class:`OutputManager`s must be closed after use, and implement the context manager
     interface to that end."""

-    def open(self, relative_path: Path) -> TextIO:
+    def open(self, relative_path: Path, mode: str = "w", encoding="utf-8") -> IO:
         """Opens a file for writing within a relative abstract path."""
         if relative_path.is_absolute():
             raise ValueError(f"Path {relative_path} must be relative.")
-        return self._open(relative_path)
+        return self._open(relative_path, mode, encoding)

     @abstractmethod
-    def _open(self, relative_path: Path) -> TextIO:
+    def _open(self, relative_path: Path, mode: str, encoding: str) -> IO:
         pass

@@ -67,13 +67,13 @@ class FSOutputManager(OutputManager):
     def __init__(self, root: Path) -> None:
         self.root = root
-        self.open_files: List[TextIO] = []
+        self.open_files: List[IO] = []

-    def _open(self, relative_path: Path) -> TextIO:
+    def _open(self, relative_path: Path, mode: str, encoding: str) -> IO:
         fullpath = self.root / relative_path
         parent = fullpath.parent
         parent.mkdir(parents=True, exist_ok=True)
-        f = fullpath.open("w", encoding="utf-8")
+        f = fullpath.open(mode, encoding=encoding)
         self.open_files.append(f)
         return f
diff --git a/benchmarks/logging/tests/test_split_logs_in_source.py b/benchmarks/logging/tests/test_split_logs_in_source.py
index 9b5005d..d0ba992 100644
--- a/benchmarks/logging/tests/test_split_logs_in_source.py
+++ b/benchmarks/logging/tests/test_split_logs_in_source.py
@@ -1,13 +1,12 @@
 import datetime
 from io import StringIO

+from benchmarks.logging.logging import LogEntry, LogParser
 from benchmarks.logging.sources import (
     VectorFlatFileSource,
-    OutputManager,
     split_logs_in_source,
 )
-
-from benchmarks.logging.logging import LogEntry, LogParser
+from benchmarks.logging.tests.utils import InMemoryOutputManager
 from benchmarks.tests.utils import make_jsonl, compact

 EXPERIMENT_LOG = [
@@ -64,27 +63,6 @@ EXPERIMENT_LOG = [
 ]


-class InMemoryOutputManager(OutputManager):
-    def __init__(self):
-        self.fs = {}
-
-    def _open(self, relative_path):
-        root = self.fs
-        for element in relative_path.parts[:-1]:
-            subtree = root.get(element)
-            if subtree is None:
-                subtree = {}
-                root[element] = subtree
-            root = subtree
-
-        output = StringIO()
-        root[relative_path.parts[-1]] = output
-        return output
-
-    def __exit__(self, exc_type, exc_value, traceback, /):
-        pass
-
-
 class MetricsEvent(LogEntry):
     name: str
     timestamp: datetime.datetime
diff --git a/benchmarks/logging/tests/utils.py b/benchmarks/logging/tests/utils.py
new file mode 100644
index 0000000..88b350a
--- /dev/null
+++ b/benchmarks/logging/tests/utils.py
@@ -0,0 +1,24 @@
+from io import StringIO
+
+from benchmarks.logging.sources import OutputManager
+
+
+class InMemoryOutputManager(OutputManager):
+    def __init__(self):
+        self.fs = {}
+
+    def _open(self, relative_path, mode: str, encoding: str):
+        root = self.fs
+        for element in relative_path.parts[:-1]:
+            subtree = root.get(element)
+            if subtree is None:
+                subtree = {}
+                root[element] = subtree
+            root = subtree
+
+        output = StringIO()
+        root[relative_path.parts[-1]] = output
+        return output
+
+    def __exit__(self, exc_type, exc_value, traceback, /):
+        pass
diff --git a/docker-compose.ci.yaml b/docker-compose.ci.yaml
index 89bdee4..8b32861 100644
--- a/docker-compose.ci.yaml
+++ b/docker-compose.ci.yaml
@@ -3,47 +3,15 @@ services:
   test-runner:
     image: bittorrent-benchmarks:test
     container_name: test-runner
-    volumes:
-      - shared-volume:/opt/bittorrent-benchmarks/volume
-    entrypoint: [ "bash", "-c", "/opt/bittorrent-benchmarks/docker/bin/run-tests.sh" ]
-    healthcheck:
-      test: stat /opt/bittorrent-benchmarks/volume/.initialized
-      interval: 1s
-      timeout: 5s
-      retries: 150
-
-  deluge-1:
-    volumes: !override
-      - type: volume
-        source: shared-volume
-        target: /var/lib/deluge
-        volume:
-          subpath: deluge-1
+    entrypoint: [ "poetry", "run", "pytest", "--exitfirst" ]
+    environment:
+      - DELUGE_NODE_1=deluge-1
+      - DELUGE_NODE_2=deluge-2
+      - DELUGE_NODE_3=deluge-3
+      - DELUGE_AGENT_1=http://agent-1:9001/
+      - DELUGE_AGENT_2=http://agent-2:9002/
+      - DELUGE_AGENT_3=http://agent-3:9003/
+      - TRACKER_ANNOUNCE_URL=http://tracker:8000/announce
     depends_on:
-      test-runner:
+      clean-volumes:
         condition: service_healthy
-
-  deluge-2:
-    volumes: !override
-      - type: volume
-        source: shared-volume
-        target: /var/lib/deluge
-        volume:
-          subpath: deluge-2
-    depends_on:
-      test-runner:
-        condition: service_healthy
-
-  deluge-3:
-    volumes: !override
-      - type: volume
-        source: shared-volume
-        target: /var/lib/deluge
-        volume:
-          subpath: deluge-3
-    depends_on:
-      test-runner:
-        condition: service_healthy
-
-volumes:
-  shared-volume:
diff --git a/docker-compose.local.yaml b/docker-compose.local.yaml
index a43e873..d385a11 100644
--- a/docker-compose.local.yaml
+++ b/docker-compose.local.yaml
@@ -1,11 +1,27 @@
 # This compose spec contains the basic setup for running integration tests with the
-# test runner outside of a container and bind mounts for data so they can be inspected.
-# This is ideal for development.
-
-# You will need [rootless Docker](https://docs.docker.com/engine/security/rootless/)
-# for this to work cause the tests rely on user-writable bind mounts.
+# test runner outside of a container. This is ideal for local development.

 services:
+  clean-volumes:
+    image: alpine
+    container_name: clean-volumes
+    entrypoint:
+      - /bin/sh
+      - -c
+      - |
+        rm -rf /var/lib/deluge1/* /var/lib/deluge2/* /var/lib/deluge3/*
+        touch /.done
+        sleep infinity
+    volumes:
+      - shared-volume-1:/var/lib/deluge1
+      - shared-volume-2:/var/lib/deluge2
+      - shared-volume-3:/var/lib/deluge3
+    healthcheck:
+      timeout: 10s
+      test: [ "CMD", "test", "-f", "/.done" ]
+      retries: 10
+      interval: 1s
+
   deluge-1:
     image: codexstorage/deluge
     container_name: deluge-1
@@ -15,11 +31,25 @@
       - DELUGE_LOG_LEVEL=${DELUGE_LOG_LEVEL:-info}
       - DELUGE_NODE_ID=deluge-1
     volumes:
-      - ./volume/deluge-1:/var/lib/deluge
-      - ./volume/deluge-1/downloads:/var/lib/deluge/downloads
+      - shared-volume-1:/var/lib/deluge
     ports:
       - "6890:6890"
       - "6891-6892:6891-6892"
+    depends_on:
+      clean-volumes:
+        condition: service_healthy
+
+  agent-1:
+    image: bittorrent-benchmarks:test
+    container_name: agent-1
+    entrypoint: [ "poetry", "run", "bittorrent-benchmarks",
+                  "agent", "experiments.local.yaml", "deluge_agent", "--port", "9001" ]
+    environment:
+      - TORRENTS_ROOT=/var/lib/deluge/downloads
+    volumes:
+      - shared-volume-1:/var/lib/deluge
+    ports:
+      - "9001:9001"

   deluge-2:
     image: codexstorage/deluge
     container_name: deluge-2
@@ -30,11 +60,25 @@
       - DELUGE_LOG_LEVEL=${DELUGE_LOG_LEVEL:-info}
      - DELUGE_NODE_ID=deluge-2
     volumes:
-      - ./volume/deluge-2:/var/lib/deluge
-      - ./volume/deluge-2/downloads:/var/lib/deluge/downloads
+      - shared-volume-2:/var/lib/deluge
     ports:
       - "6893:6893"
       - "6894-6895:6894-6895"
+    depends_on:
+      clean-volumes:
+        condition: service_healthy
+
+  agent-2:
+    image: bittorrent-benchmarks:test
+    container_name: agent-2
+    entrypoint: [ "poetry", "run", "bittorrent-benchmarks", "agent",
+                  "experiments.local.yaml", "deluge_agent", "--port", "9002" ]
+    environment:
+      - TORRENTS_ROOT=/var/lib/deluge/downloads
+    volumes:
+      - shared-volume-2:/var/lib/deluge
+    ports:
+      - "9002:9002"

   deluge-3:
     image: codexstorage/deluge
     container_name: deluge-3
@@ -45,14 +89,33 @@
       - DELUGE_LOG_LEVEL=${DELUGE_LOG_LEVEL:-info}
       - DELUGE_NODE_ID=deluge-3
     volumes:
-      - ./volume/deluge-3:/var/lib/deluge
-      - ./volume/deluge-3/downloads:/var/lib/deluge/downloads
+      - shared-volume-3:/var/lib/deluge
     ports:
       - "6896:6896"
       - "6897-6898:6897-6898"
+    depends_on:
+      clean-volumes:
+        condition: service_healthy
+
+  agent-3:
+    image: bittorrent-benchmarks:test
+    container_name: agent-3
+    entrypoint: [ "poetry", "run", "bittorrent-benchmarks", "agent", "experiments.local.yaml",
+                  "deluge_agent", "--port", "9003" ]
+    environment:
+      - TORRENTS_ROOT=/var/lib/deluge/downloads
+    volumes:
+      - shared-volume-3:/var/lib/deluge
+    ports:
+      - "9003:9003"

   tracker:
     image: codexstorage/bittorrent-tracker
     container_name: tracker
     ports:
-      - "8000:8000"
\ No newline at end of file
+      - "8000:8000"
+
+volumes:
+  shared-volume-1:
+  shared-volume-2:
+  shared-volume-3:
\ No newline at end of file
diff --git a/docker/bin/run-tests.sh b/docker/bin/run-tests.sh
deleted file mode 100755
index 3fdbf65..0000000
--- a/docker/bin/run-tests.sh
+++ /dev/null
@@ -1,16 +0,0 @@
-#!/usr/bin/env bash
-set -e
-
-export DELUGE_NODE_1=deluge-1
-export DELUGE_NODE_2=deluge-2
-export DELUGE_NODE_3=deluge-3
-export TRACKER_ANNOUNCE_URL=http://tracker:8000/announce
-
-# Initializes the shared volume.
-echo "Initializing shared volume."
-mkdir -p /opt/bittorrent-benchmarks/volume/deluge-{1,2,3}
-touch /opt/bittorrent-benchmarks/volume/.initialized
-
-echo "Launching tests."
-cd /opt/bittorrent-benchmarks
-poetry run pytest --exitfirst
diff --git a/docker/bittorrent-benchmarks.Dockerfile b/docker/bittorrent-benchmarks.Dockerfile
index 07fa149..de18331 100644
--- a/docker/bittorrent-benchmarks.Dockerfile
+++ b/docker/bittorrent-benchmarks.Dockerfile
@@ -10,7 +10,7 @@ WORKDIR /opt/bittorrent-benchmarks
 COPY pyproject.toml poetry.lock ./
 RUN if [ "$BUILD_TYPE" = "release" ]; then \
       echo "Image is a release build"; \
-      poetry install --only main --no-root; \
+      poetry install --without dev --no-root; \
     else \
       echo "Image is a test build"; \
       poetry install --no-root; \
diff --git a/experiments.local.yaml b/experiments.local.yaml
index 35470cd..201bd15 100644
--- a/experiments.local.yaml
+++ b/experiments.local.yaml
@@ -12,11 +12,17 @@ deluge_experiment:
       address: ${DELUGE_NODE_1:-localhost}
       daemon_port: 6890
       listen_ports: [ 6891, 6892 ]
+      agent_url: http://${DELUGE_AGENT_1:-localhost}:9001/
     - name: deluge-2
       address: ${DELUGE_NODE_2:-localhost}
       daemon_port: 6893
       listen_ports: [ 6894, 6895 ]
-    - name: deluge-2
+      agent_url: http://${DELUGE_AGENT_2:-localhost}:9002/
+    - name: deluge-3
       address: ${DELUGE_NODE_3:-localhost}
       daemon_port: 6896
       listen_ports: [ 6897, 6898 ]
+      agent_url: http://${DELUGE_AGENT_3:-localhost}:9003/
+
+deluge_agent:
+  torrents_path: /var/lib/deluge/downloads
"sha256:1d9fe889df5212298c0c0723fa20479d1b94883a2df44bd3897aa91083316f7a"}, +] + +[package.dependencies] +idna = ">=2.8" +sniffio = ">=1.1" +typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""} + +[package.extras] +doc = ["Sphinx (>=7.4,<8.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx_rtd_theme"] +test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore (>=0.9.1)", "uvloop (>=0.21)"] +trio = ["trio (>=0.26.1)"] + [[package]] name = "certifi" version = "2024.12.14" @@ -134,6 +155,20 @@ files = [ {file = "charset_normalizer-3.4.1.tar.gz", hash = "sha256:44251f18cd68a75b56585dd00dae26183e102cd5e0f9f1466e6df5da2ed64ea3"}, ] +[[package]] +name = "click" +version = "8.1.8" +description = "Composable command line interface toolkit" +optional = false +python-versions = ">=3.7" +files = [ + {file = "click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2"}, + {file = "click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "platform_system == \"Windows\""} + [[package]] name = "colorama" version = "0.4.6" @@ -167,6 +202,26 @@ files = [ {file = "distlib-0.3.9.tar.gz", hash = "sha256:a60f20dea646b8a33f3e7772f74dc0b2d0772d2837ee1342a00645c81edf9403"}, ] +[[package]] +name = "fastapi" +version = "0.115.6" +description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production" +optional = false +python-versions = ">=3.8" +files = [ + {file = "fastapi-0.115.6-py3-none-any.whl", hash = "sha256:e9240b29e36fa8f4bb7290316988e90c381e5092e0cbe84e7818cc3713bcf305"}, + {file = "fastapi-0.115.6.tar.gz", hash = "sha256:9ec46f7addc14ea472958a96aae5b5de65f39721a46aaf5705c480d9a8b76654"}, +] + +[package.dependencies] +pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.1.0 || >2.1.0,<3.0.0" +starlette = ">=0.40.0,<0.42.0" +typing-extensions = ">=4.8.0" + +[package.extras] +all = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "itsdangerous (>=1.1.0)", "jinja2 (>=2.11.2)", "orjson (>=3.2.1)", "pydantic-extra-types (>=2.0.0)", "pydantic-settings (>=2.0.0)", "python-multipart (>=0.0.7)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "uvicorn[standard] (>=0.12.0)"] +standard = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "jinja2 (>=2.11.2)", "python-multipart (>=0.0.7)", "uvicorn[standard] (>=0.12.0)"] + [[package]] name = "filelock" version = "3.16.1" @@ -183,6 +238,62 @@ docs = ["furo (>=2024.8.6)", "sphinx (>=8.0.2)", "sphinx-autodoc-typehints (>=2. 
testing = ["covdefaults (>=2.3)", "coverage (>=7.6.1)", "diff-cover (>=9.2)", "pytest (>=8.3.3)", "pytest-asyncio (>=0.24)", "pytest-cov (>=5)", "pytest-mock (>=3.14)", "pytest-timeout (>=2.3.1)", "virtualenv (>=20.26.4)"] typing = ["typing-extensions (>=4.12.2)"] +[[package]] +name = "h11" +version = "0.14.0" +description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" +optional = false +python-versions = ">=3.7" +files = [ + {file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"}, + {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"}, +] + +[[package]] +name = "httpcore" +version = "1.0.7" +description = "A minimal low-level HTTP client." +optional = false +python-versions = ">=3.8" +files = [ + {file = "httpcore-1.0.7-py3-none-any.whl", hash = "sha256:a3fff8f43dc260d5bd363d9f9cf1830fa3a458b332856f34282de498ed420edd"}, + {file = "httpcore-1.0.7.tar.gz", hash = "sha256:8551cb62a169ec7162ac7be8d4817d561f60e08eaa485234898414bb5a8a0b4c"}, +] + +[package.dependencies] +certifi = "*" +h11 = ">=0.13,<0.15" + +[package.extras] +asyncio = ["anyio (>=4.0,<5.0)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +trio = ["trio (>=0.22.0,<1.0)"] + +[[package]] +name = "httpx" +version = "0.28.1" +description = "The next generation HTTP client." +optional = false +python-versions = ">=3.8" +files = [ + {file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"}, + {file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"}, +] + +[package.dependencies] +anyio = "*" +certifi = "*" +httpcore = "==1.*" +idna = "*" + +[package.extras] +brotli = ["brotli", "brotlicffi"] +cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +zstd = ["zstandard (>=0.18.0)"] + [[package]] name = "identify" version = "2.6.5" @@ -640,6 +751,34 @@ files = [ {file = "ruff-0.8.6.tar.gz", hash = "sha256:dcad24b81b62650b0eb8814f576fc65cfee8674772a6e24c9b747911801eeaa5"}, ] +[[package]] +name = "sniffio" +version = "1.3.1" +description = "Sniff out which async library your code is running under" +optional = false +python-versions = ">=3.7" +files = [ + {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, + {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, +] + +[[package]] +name = "starlette" +version = "0.41.3" +description = "The little ASGI library that shines." +optional = false +python-versions = ">=3.8" +files = [ + {file = "starlette-0.41.3-py3-none-any.whl", hash = "sha256:44cedb2b7c77a9de33a8b74b2b90e9f50d11fcf25d8270ea525ad71a25374ff7"}, + {file = "starlette-0.41.3.tar.gz", hash = "sha256:0e4ab3d16522a255be6b28260b938eae2482f98ce5cc934cb08dce8dc3ba5835"}, +] + +[package.dependencies] +anyio = ">=3.4.0,<5" + +[package.extras] +full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.7)", "pyyaml"] + [[package]] name = "tenacity" version = "9.0.0" @@ -722,6 +861,24 @@ h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] +[[package]] +name = "uvicorn" +version = "0.34.0" +description = "The lightning-fast ASGI server." 
+optional = false
+python-versions = ">=3.9"
+files = [
+    {file = "uvicorn-0.34.0-py3-none-any.whl", hash = "sha256:023dc038422502fa28a09c7a30bf2b6991512da7dcdb8fd35fe57cfc154126f4"},
+    {file = "uvicorn-0.34.0.tar.gz", hash = "sha256:404051050cd7e905de2c9a7e61790943440b3416f49cb409f965d9dcd0fa73e9"},
+]
+
+[package.dependencies]
+click = ">=7.0"
+h11 = ">=0.8"
+
+[package.extras]
+standard = ["colorama (>=0.4)", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"]
+
 [[package]]
 name = "virtualenv"
 version = "20.28.1"
@@ -745,4 +902,4 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.12"
-content-hash = "ecc038d5e0e05d072a39a294ef96e7045554678cfc870bafe13b07385208e0f2"
+content-hash = "1cb8a00bb7c5eebcafcd9415d1513517c35dd450ca0d4e93a2584f81b08412fe"
diff --git a/pyproject.toml b/pyproject.toml
index 94b69e3..fd91e18 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,17 +16,23 @@ pyyaml = "^6.0.2"
 requests = "^2.32.3"
 ruff = "^0.8.6"
 tenacity = "^9.0.0"
+fastapi = "^0.115.6"
 
 [tool.poetry.group.test.dependencies]
 pytest = "^8.3.3"
 mypy = "^1.13.0"
 types-pyyaml = "^6.0.12.20240917"
 types-requests = "^2.32.0.20241016"
+httpx = "^0.28.1"
 
 [tool.poetry.group.dev.dependencies]
 pre-commit = "^4.0.1"
 
+
+[tool.poetry.group.agent.dependencies]
+uvicorn = "^0.34.0"
+
 [tool.pytest.ini_options]
 markers = [
     "integration: marks tests as integration tests"
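With httpx joining the test dependency group and the existing "integration" pytest marker, a dockerized integration test could probe an agent sidecar roughly like this. The environment variable and endpoint below are assumptions for illustration, mirroring the agent_url convention in experiments.local.yaml, not a confirmed API:

# Hypothetical smoke test for an agent sidecar. Assumes the harness
# exports DELUGE_AGENT_1 (falling back to localhost) and that the agent
# exposes a /torrents endpoint; both names are illustrative.
import os

import httpx
import pytest


@pytest.mark.integration
def test_agent_responds():
    agent_url = f"http://{os.environ.get('DELUGE_AGENT_1', 'localhost')}:9001/"
    response = httpx.get(agent_url + "torrents")
    assert response.status_code == 200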