From b17a855f6e75a4e7c401adb1a3e241a361c51871 Mon Sep 17 00:00:00 2001 From: gmega Date: Sat, 14 Dec 2024 06:34:11 -0300 Subject: [PATCH] run ruff formatter --- benchmarks/cli.py | 74 ++++++---- benchmarks/core/config.py | 9 +- benchmarks/core/experiments/experiments.py | 24 ++-- .../core/experiments/static_experiment.py | 129 +++++++++++------- .../experiments/tests/test_experiments.py | 7 +- .../tests/test_static_experiment.py | 73 +++++----- benchmarks/core/logging.py | 68 +++++---- benchmarks/core/network.py | 10 +- benchmarks/core/pydantic.py | 10 +- benchmarks/core/tests/fixtures.py | 2 +- benchmarks/core/tests/test_config.py | 12 +- benchmarks/core/tests/test_logging.py | 83 ++++++----- benchmarks/core/tests/test_pydantic.py | 36 ++--- benchmarks/core/utils.py | 15 +- benchmarks/deluge/config.py | 65 ++++++--- benchmarks/deluge/deluge_node.py | 49 +++---- benchmarks/deluge/logging.py | 2 +- benchmarks/deluge/tests/fixtures.py | 26 +++- benchmarks/deluge/tests/test_config.py | 58 ++++---- benchmarks/deluge/tests/test_deluge_node.py | 37 +++-- .../tests/test_deluge_static_experiment.py | 52 ++++--- benchmarks/deluge/tracker.py | 3 +- benchmarks/tests/utils.py | 5 +- 23 files changed, 494 insertions(+), 355 deletions(-) diff --git a/benchmarks/cli.py b/benchmarks/cli.py index 0e8856f..736c28e 100644 --- a/benchmarks/cli.py +++ b/benchmarks/cli.py @@ -8,7 +8,12 @@ from pydantic_core import ValidationError from benchmarks.core.config import ConfigParser, ExperimentBuilder from benchmarks.core.experiments.experiments import Experiment -from benchmarks.core.logging import basic_log_parser, LogSplitter, LogEntry, LogSplitterFormats +from benchmarks.core.logging import ( + basic_log_parser, + LogSplitter, + LogEntry, + LogSplitterFormats, +) from benchmarks.deluge.config import DelugeExperimentConfig from benchmarks.deluge.logging import DelugeTorrentDownload @@ -25,14 +30,14 @@ logger = logging.getLogger(__name__) def cmd_list(experiments: Dict[str, ExperimentBuilder[Experiment]], _): - print('Available experiments are:') + print("Available experiments are:") for experiment in experiments.keys(): - print(f' - {experiment}') + print(f" - {experiment}") def cmd_run(experiments: Dict[str, ExperimentBuilder[Experiment]], args): if args.experiment not in experiments: - print(f'Experiment {args.experiment} not found.') + print(f"Experiment {args.experiment} not found.") sys.exit(-1) experiment = experiments[args.experiment] @@ -42,9 +47,9 @@ def cmd_run(experiments: Dict[str, ExperimentBuilder[Experiment]], args): def cmd_describe(args): if not args.type: - print('Available experiment types are:') + print("Available experiment types are:") for experiment in config_parser.experiment_types.keys(): - print(f' - {experiment}') + print(f" - {experiment}") return print(config_parser.experiment_types[args.type].schema_json(indent=2)) @@ -52,34 +57,36 @@ def cmd_describe(args): def cmd_logs(log: Path, output: Path): if not log.exists(): - print(f'Log file {log} does not exist.') + print(f"Log file {log} does not exist.") sys.exit(-1) if not output.parent.exists(): - print(f'Folder {output.parent} does not exist.') + print(f"Folder {output.parent} does not exist.") sys.exit(-1) output.mkdir(exist_ok=True) def output_factory(event_type: str, format: LogSplitterFormats): - return (output / f'{event_type}.{format.value}').open('w', encoding='utf-8') + return (output / f"{event_type}.{format.value}").open("w", encoding="utf-8") - with (log.open('r', encoding='utf-8') as istream, - LogSplitter(output_factory) as splitter): + with ( + log.open("r", encoding="utf-8") as istream, + LogSplitter(output_factory) as splitter, + ): splitter.set_format(DECLogEntry, LogSplitterFormats.jsonl) splitter.split(log_parser.parse(istream)) def _parse_config(config: Path) -> Dict[str, ExperimentBuilder[Experiment]]: if not config.exists(): - print(f'Config file {config} does not exist.') + print(f"Config file {config} does not exist.") sys.exit(-1) - with config.open(encoding='utf-8') as infile: + with config.open(encoding="utf-8") as infile: try: return config_parser.parse(infile) except ValidationError as e: - print('There were errors parsing the config file.') + print("There were errors parsing the config file.") for error in e.errors(): print(f' - {error["loc"]}: {error["msg"]} {error["input"]}') sys.exit(-1) @@ -90,7 +97,7 @@ def _init_logging(): logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) @@ -99,26 +106,39 @@ def main(): commands = parser.add_subparsers(required=True) - experiments = commands.add_parser('experiments', help='List or run experiments in config file.') - experiments.add_argument('config', type=Path, help='Path to the experiment configuration file.') + experiments = commands.add_parser( + "experiments", help="List or run experiments in config file." + ) + experiments.add_argument( + "config", type=Path, help="Path to the experiment configuration file." + ) experiment_commands = experiments.add_subparsers(required=True) - list_cmd = experiment_commands.add_parser('list', help='Lists available experiments.') + list_cmd = experiment_commands.add_parser( + "list", help="Lists available experiments." + ) list_cmd.set_defaults(func=lambda args: cmd_list(_parse_config(args.config), args)) - run_cmd = experiment_commands.add_parser('run', help='Runs an experiment') - run_cmd.add_argument('experiment', type=str, help='Name of the experiment to run.') + run_cmd = experiment_commands.add_parser("run", help="Runs an experiment") + run_cmd.add_argument("experiment", type=str, help="Name of the experiment to run.") run_cmd.set_defaults(func=lambda args: cmd_run(_parse_config(args.config), args)) - describe = commands.add_parser('describe', help='Shows the JSON schema for the various experiment types.') - describe.add_argument('type', type=str, help='Type of the experiment to describe.', - choices=config_parser.experiment_types.keys(), nargs='?') + describe = commands.add_parser( + "describe", help="Shows the JSON schema for the various experiment types." + ) + describe.add_argument( + "type", + type=str, + help="Type of the experiment to describe.", + choices=config_parser.experiment_types.keys(), + nargs="?", + ) describe.set_defaults(func=cmd_describe) - logs = commands.add_parser('logs', help='Parse logs.') - logs.add_argument('log', type=Path, help='Path to the log file.') - logs.add_argument('output_dir', type=Path, help='Path to an output folder.') + logs = commands.add_parser("logs", help="Parse logs.") + logs.add_argument("log", type=Path, help="Path to the log file.") + logs.add_argument("output_dir", type=Path, help="Path to an output folder.") logs.set_defaults(func=lambda args: cmd_logs(args.log, args.output_dir)) args = parser.parse_args() @@ -128,5 +148,5 @@ def main(): args.func(args) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/benchmarks/core/config.py b/benchmarks/core/config.py index deadef3..9e17abd 100644 --- a/benchmarks/core/config.py +++ b/benchmarks/core/config.py @@ -1,4 +1,5 @@ """Basic utilities for structuring experiment configurations based on Pydantic schemas.""" + import os from abc import abstractmethod from io import TextIOBase @@ -12,7 +13,7 @@ from benchmarks.core.pydantic import SnakeCaseModel class ExperimentBuilder(SnakeCaseModel, Generic[TExperiment]): - """:class:`ExperimentBuilders` can build real :class:`Experiment`s out of :class:`ConfigModel`s. """ + """:class:`ExperimentBuilders` can build real :class:`Experiment`s out of :class:`ConfigModel`s.""" @abstractmethod def build(self) -> TExperiment: @@ -32,12 +33,10 @@ class ConfigParser: self.experiment_types[root.alias()] = root @overload - def parse(self, data: dict) -> Dict[str, ExperimentBuilder[TExperiment]]: - ... + def parse(self, data: dict) -> Dict[str, ExperimentBuilder[TExperiment]]: ... @overload - def parse(self, data: TextIO) -> Dict[str, ExperimentBuilder[TExperiment]]: - ... + def parse(self, data: TextIO) -> Dict[str, ExperimentBuilder[TExperiment]]: ... def parse(self, data: dict | TextIO) -> Dict[str, ExperimentBuilder[TExperiment]]: if isinstance(data, TextIOBase): diff --git a/benchmarks/core/experiments/experiments.py b/benchmarks/core/experiments/experiments.py index b0edc0c..bc0b588 100644 --- a/benchmarks/core/experiments/experiments.py +++ b/benchmarks/core/experiments/experiments.py @@ -21,7 +21,7 @@ class Experiment(ABC): pass -TExperiment = TypeVar('TExperiment', bound=Experiment) +TExperiment = TypeVar("TExperiment", bound=Experiment) class ExperimentComponent(ABC): @@ -38,7 +38,9 @@ class ExperimentEnvironment: """An :class:`ExperimentEnvironment` is a collection of :class:`ExperimentComponent`s that must be ready before an :class:`Experiment` can execute.""" - def __init__(self, components: Iterable[ExperimentComponent], polling_interval: float = 0): + def __init__( + self, components: Iterable[ExperimentComponent], polling_interval: float = 0 + ): self.components = components self.polling_interval = polling_interval @@ -48,33 +50,39 @@ class ExperimentEnvironment: start_time = time() not_ready = [component for component in self.components] - logging.info(f'Awaiting for components to be ready: {self._component_names(not_ready)}') + logging.info( + f"Awaiting for components to be ready: {self._component_names(not_ready)}" + ) while len(not_ready) != 0: for component in not_ready: if component.is_ready(): - logger.info(f'Component {str(component)} is ready.') + logger.info(f"Component {str(component)} is ready.") not_ready.remove(component) sleep(self.polling_interval) if (timeout != 0) and (time() - start_time > timeout): - logger.info(f'Some components timed out: {self._component_names(not_ready)}') + logger.info( + f"Some components timed out: {self._component_names(not_ready)}" + ) return False return True @staticmethod def _component_names(components: List[ExperimentComponent]) -> str: - return ', '.join(str(component) for component in components) + return ", ".join(str(component) for component in components) def run(self, experiment: Experiment): """Runs the :class:`Experiment` within this :class:`ExperimentEnvironment`.""" if not self.await_ready(): - raise RuntimeError('One or more environment components were not get ready in time') + raise RuntimeError( + "One or more environment components were not get ready in time" + ) experiment.run() - def bind(self, experiment: TExperiment) -> 'BoundExperiment[TExperiment]': + def bind(self, experiment: TExperiment) -> "BoundExperiment[TExperiment]": return BoundExperiment(experiment, self) diff --git a/benchmarks/core/experiments/static_experiment.py b/benchmarks/core/experiments/static_experiment.py index a9c89cb..ba47e83 100644 --- a/benchmarks/core/experiments/static_experiment.py +++ b/benchmarks/core/experiments/static_experiment.py @@ -6,98 +6,121 @@ from typing_extensions import Generic, List, Tuple from benchmarks.core.experiments.experiments import Experiment from benchmarks.core.logging import RequestEvent, RequestEventType -from benchmarks.core.network import TInitialMetadata, TNetworkHandle, Node, DownloadHandle +from benchmarks.core.network import ( + TInitialMetadata, + TNetworkHandle, + Node, + DownloadHandle, +) from benchmarks.core.utils import ExperimentData logger = logging.getLogger(__name__) -class StaticDisseminationExperiment(Generic[TNetworkHandle, TInitialMetadata], Experiment): +class StaticDisseminationExperiment( + Generic[TNetworkHandle, TInitialMetadata], Experiment +): def __init__( - self, - network: Sequence[Node[TNetworkHandle, TInitialMetadata]], - seeders: List[int], - data: ExperimentData[TInitialMetadata], - concurrency: Optional[int] = None + self, + network: Sequence[Node[TNetworkHandle, TInitialMetadata]], + seeders: List[int], + data: ExperimentData[TInitialMetadata], + concurrency: Optional[int] = None, ): self.nodes = network self.seeders = seeders self.data = data - self._pool = ThreadPool(processes=len(network) - len(seeders) if concurrency is None else concurrency) + self._pool = ThreadPool( + processes=len(network) - len(seeders) + if concurrency is None + else concurrency + ) def run(self, run: int = 0): seeders, leechers = self._split_nodes() - logger.info('Running experiment with %d seeders and %d leechers', - len(seeders), len(leechers)) + logger.info( + "Running experiment with %d seeders and %d leechers", + len(seeders), + len(leechers), + ) with self.data as (meta, data): cid = None for node in seeders: - logger.info(RequestEvent( - node='runner', - destination=node.name, - name='seed', - request_id=str(meta), - type=RequestEventType.start - )) + logger.info( + RequestEvent( + node="runner", + destination=node.name, + name="seed", + request_id=str(meta), + type=RequestEventType.start, + ) + ) cid = node.seed(data, meta if cid is None else cid) - logger.info(RequestEvent( - node='runner', - destination=node.name, - name='seed', - request_id=str(meta), - type=RequestEventType.end - )) + logger.info( + RequestEvent( + node="runner", + destination=node.name, + name="seed", + request_id=str(meta), + type=RequestEventType.end, + ) + ) assert cid is not None # to please mypy - logger.info(f'Setting up leechers: {[str(leecher) for leecher in leechers]}') + logger.info( + f"Setting up leechers: {[str(leecher) for leecher in leechers]}" + ) def _leech(leecher): - logger.info(RequestEvent( - node='runner', - destination=leecher.name, - name='leech', - request_id=str(meta), - type=RequestEventType.start - )) + logger.info( + RequestEvent( + node="runner", + destination=leecher.name, + name="leech", + request_id=str(meta), + type=RequestEventType.start, + ) + ) download = leecher.leech(cid) - logger.info(RequestEvent( - node='runner', - destination=leecher.name, - name='leech', - request_id=str(meta), - type=RequestEventType.end - )) + logger.info( + RequestEvent( + node="runner", + destination=leecher.name, + name="leech", + request_id=str(meta), + type=RequestEventType.end, + ) + ) return download downloads = list(self._pool.imap_unordered(_leech, leechers)) - logger.info('Now waiting for downloads to complete') + logger.info("Now waiting for downloads to complete") def _await_for_download(element: Tuple[int, DownloadHandle]) -> int: index, download = element download.await_for_completion() return index - for i in self._pool.imap_unordered(_await_for_download, enumerate(downloads)): - logger.info('Download %d / %d completed', i + 1, len(downloads)) + for i in self._pool.imap_unordered( + _await_for_download, enumerate(downloads) + ): + logger.info("Download %d / %d completed", i + 1, len(downloads)) - logger.info('Shut down thread pool.') + logger.info("Shut down thread pool.") self._pool.close() self._pool.join() - logger.info('Done.') + logger.info("Done.") - def _split_nodes(self) -> Tuple[ + def _split_nodes( + self, + ) -> Tuple[ + List[Node[TNetworkHandle, TInitialMetadata]], List[Node[TNetworkHandle, TInitialMetadata]], - List[Node[TNetworkHandle, TInitialMetadata]] ]: - return [ - self.nodes[i] - for i in self.seeders - ], [ - self.nodes[i] - for i in range(0, len(self.nodes)) - if i not in self.seeders + return [self.nodes[i] for i in self.seeders], [ + self.nodes[i] for i in range(0, len(self.nodes)) if i not in self.seeders ] diff --git a/benchmarks/core/experiments/tests/test_experiments.py b/benchmarks/core/experiments/tests/test_experiments.py index a3deba3..f6dce1e 100644 --- a/benchmarks/core/experiments/tests/test_experiments.py +++ b/benchmarks/core/experiments/tests/test_experiments.py @@ -1,11 +1,14 @@ from time import sleep from typing import List -from benchmarks.core.experiments.experiments import ExperimentComponent, ExperimentEnvironment, Experiment +from benchmarks.core.experiments.experiments import ( + ExperimentComponent, + ExperimentEnvironment, + Experiment, +) class ExternalComponent(ExperimentComponent): - @property def readiness_timeout(self) -> float: return 0.1 diff --git a/benchmarks/core/experiments/tests/test_static_experiment.py b/benchmarks/core/experiments/tests/test_static_experiment.py index dbea51b..7c515f5 100644 --- a/benchmarks/core/experiments/tests/test_static_experiment.py +++ b/benchmarks/core/experiments/tests/test_static_experiment.py @@ -20,8 +20,7 @@ class MockHandle: class MockNode(Node[MockHandle, str]): - - def __init__(self, name='mock_node') -> None: + def __init__(self, name="mock_node") -> None: self._name = name self.seeding: Optional[Tuple[MockHandle, Path]] = None self.leeching: Optional[MockHandle] = None @@ -31,12 +30,7 @@ class MockNode(Node[MockHandle, str]): def name(self) -> str: return self._name - def seed( - self, - file: Path, - handle: Union[str, MockHandle] - ) -> MockHandle: - + def seed(self, file: Path, handle: Union[str, MockHandle]) -> MockHandle: if isinstance(handle, MockHandle): self.seeding = (handle, file) else: @@ -45,7 +39,6 @@ class MockNode(Node[MockHandle, str]): return self.seeding[0] def leech(self, handle: MockHandle): - self.leeching = handle return MockDownloadHandle(self) @@ -60,12 +53,12 @@ class MockDownloadHandle(DownloadHandle): def mock_network(n: int) -> List[MockNode]: - return [MockNode(f'node-{i}') for i in range(n)] + return [MockNode(f"node-{i}") for i in range(n)] def test_should_place_seeders(): network = mock_network(n=13) - data = MockExperimentData(meta='data', data=Path('/path/to/data')) + data = MockExperimentData(meta="data", data=Path("/path/to/data")) seeders = [9, 6, 3] experiment = StaticDisseminationExperiment( @@ -87,7 +80,7 @@ def test_should_place_seeders(): def test_should_download_at_remaining_nodes(): network = mock_network(n=13) - data = MockExperimentData(meta='data', data=Path('/path/to/data')) + data = MockExperimentData(meta="data", data=Path("/path/to/data")) seeders = [9, 6, 3] experiment = StaticDisseminationExperiment( @@ -112,7 +105,7 @@ def test_should_download_at_remaining_nodes(): def test_should_delete_generated_file_at_end_of_experiment(): network = mock_network(n=2) - data = MockExperimentData(meta='data', data=Path('/path/to/data')) + data = MockExperimentData(meta="data", data=Path("/path/to/data")) seeders = [1] experiment = StaticDisseminationExperiment( @@ -128,9 +121,9 @@ def test_should_delete_generated_file_at_end_of_experiment(): def test_should_log_requests_to_seeders_and_leechers(mock_logger): logger, output = mock_logger - with patch('benchmarks.core.experiments.static_experiment.logger', logger): + with patch("benchmarks.core.experiments.static_experiment.logger", logger): network = mock_network(n=3) - data = MockExperimentData(meta='dataset-1', data=Path('/path/to/data')) + data = MockExperimentData(meta="dataset-1", data=Path("/path/to/data")) seeders = [1] experiment = StaticDisseminationExperiment( @@ -149,51 +142,51 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger): assert events == [ RequestEvent( - destination='node-1', - node='runner', - name='seed', - request_id='dataset-1', + destination="node-1", + node="runner", + name="seed", + request_id="dataset-1", type=RequestEventType.start, timestamp=events[0].timestamp, ), RequestEvent( - destination='node-1', - node='runner', - name='seed', - request_id='dataset-1', + destination="node-1", + node="runner", + name="seed", + request_id="dataset-1", type=RequestEventType.end, timestamp=events[1].timestamp, ), RequestEvent( - destination='node-0', - node='runner', - name='leech', - request_id='dataset-1', + destination="node-0", + node="runner", + name="leech", + request_id="dataset-1", type=RequestEventType.start, timestamp=events[2].timestamp, ), RequestEvent( - destination='node-0', - node='runner', - name='leech', - request_id='dataset-1', + destination="node-0", + node="runner", + name="leech", + request_id="dataset-1", type=RequestEventType.end, timestamp=events[3].timestamp, ), RequestEvent( - destination='node-2', - node='runner', - name='leech', - request_id='dataset-1', + destination="node-2", + node="runner", + name="leech", + request_id="dataset-1", type=RequestEventType.start, timestamp=events[4].timestamp, ), RequestEvent( - destination='node-2', - node='runner', - name='leech', - request_id='dataset-1', + destination="node-2", + node="runner", + name="leech", + request_id="dataset-1", type=RequestEventType.end, timestamp=events[5].timestamp, - ) + ), ] diff --git a/benchmarks/core/logging.py b/benchmarks/core/logging.py index aa8d7fe..bdb35d0 100644 --- a/benchmarks/core/logging.py +++ b/benchmarks/core/logging.py @@ -11,7 +11,7 @@ from pydantic import ValidationError, computed_field, Field from benchmarks.core.pydantic import SnakeCaseModel -MARKER = '>>' +MARKER = ">>" logger = logging.getLogger(__name__) @@ -40,7 +40,7 @@ class LogEntry(SnakeCaseModel): return self.alias() @classmethod - def adapt(cls, model: Type[SnakeCaseModel]) -> Type['AdaptedLogEntry']: + def adapt(cls, model: Type[SnakeCaseModel]) -> Type["AdaptedLogEntry"]: """Adapts an existing Pydantic model to a LogEntry. This is useful for when you have a model that you want to log and later recover from logs using :class:`LogParser` or :class:`LogSplitter`.""" @@ -51,23 +51,22 @@ class LogEntry(SnakeCaseModel): return model.model_validate(self.model_dump()) adapted = type( - f'{model.__name__}LogEntry', + f"{model.__name__}LogEntry", (LogEntry,), { - '__annotations__': model.__annotations__, - 'adapt_instance': classmethod(adapt_instance), - 'recover_instance': recover_instance, - } + "__annotations__": model.__annotations__, + "adapt_instance": classmethod(adapt_instance), + "recover_instance": recover_instance, + }, ) return cast(Type[AdaptedLogEntry], adapted) class AdaptedLogEntry(LogEntry, ABC): - @classmethod @abstractmethod - def adapt_instance(cls, data: SnakeCaseModel) -> 'AdaptedLogEntry': + def adapt_instance(cls, data: SnakeCaseModel) -> "AdaptedLogEntry": pass @abstractmethod @@ -95,11 +94,11 @@ class LogParser: if index == -1: continue - type_tag = '' # just to calm down mypy + type_tag = "" # just to calm down mypy try: # Should probably test this against a regex for the type tag to see which is faster. - json_line = json.loads(line[index + marker_len:]) - type_tag = json_line.get('entry_type') + json_line = json.loads(line[index + marker_len :]) + type_tag = json_line.get("entry_type") if not type_tag or (type_tag not in self.entry_types): continue yield self.entry_types[type_tag].model_validate(json_line) @@ -110,26 +109,33 @@ class LogParser: # that we know, then we should probably be able to parse it. self.warn_counts -= 1 # avoid flooding everything with warnings if self.warn_counts > 0: - logger.warning(f"Schema failed for line with known type tag {type_tag}: {err}") + logger.warning( + f"Schema failed for line with known type tag {type_tag}: {err}" + ) elif self.warn_counts == 0: - logger.warning("Too many errors: suppressing further schema warnings.") + logger.warning( + "Too many errors: suppressing further schema warnings." + ) class LogSplitterFormats(Enum): - jsonl = 'jsonl' - csv = 'csv' + jsonl = "jsonl" + csv = "csv" class LogSplitter: """:class:`LogSplitter` will split parsed logs into different files based on the entry type. The output format can be set for each entry type.""" - def __init__(self, output_factory=Callable[[str, LogSplitterFormats], TextIO], - output_entry_type=False) -> None: + def __init__( + self, + output_factory=Callable[[str, LogSplitterFormats], TextIO], + output_entry_type=False, + ) -> None: self.output_factory = output_factory self.outputs: Dict[str, Tuple[Callable[[LogEntry], None], TextIO]] = {} self.formats: Dict[str, LogSplitterFormats] = {} - self.exclude = {'entry_type'} if not output_entry_type else set() + self.exclude = {"entry_type"} if not output_entry_type else set() def set_format(self, entry_type: Type[LogEntry], output_format: LogSplitterFormats): self.formats[entry_type.alias()] = output_format @@ -139,7 +145,9 @@ class LogSplitter: write, _ = self.outputs.get(entry.entry_type, (None, None)) if write is None: - output_format = self.formats.get(entry.entry_type, LogSplitterFormats.csv) + output_format = self.formats.get( + entry.entry_type, LogSplitterFormats.csv + ) output_stream = self.output_factory(entry.entry_type, output_format) write = self._formatting_writer(entry, output_stream, output_format) @@ -148,19 +156,19 @@ class LogSplitter: write(entry) def _formatting_writer( - self, - entry: LogEntry, - output_stream: TextIO, - output_format: LogSplitterFormats + self, entry: LogEntry, output_stream: TextIO, output_format: LogSplitterFormats ) -> Callable[[LogEntry], None]: if output_format == LogSplitterFormats.csv: - writer = DictWriter(output_stream, fieldnames=entry.model_dump(exclude=self.exclude).keys()) + writer = DictWriter( + output_stream, fieldnames=entry.model_dump(exclude=self.exclude).keys() + ) writer.writeheader() return lambda x: writer.writerow(x.model_dump(exclude=self.exclude)) elif output_format == LogSplitterFormats.jsonl: + def write_jsonl(x: LogEntry): - output_stream.write(x.model_dump_json(exclude=self.exclude) + '\n') + output_stream.write(x.model_dump_json(exclude=self.exclude) + "\n") return write_jsonl @@ -181,7 +189,9 @@ type NodeId = str class Event(LogEntry): node: NodeId name: str # XXX this ends up being redundant for custom event schemas... need to think of a better solution. - timestamp: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now(datetime.UTC)) + timestamp: datetime.datetime = Field( + default_factory=lambda: datetime.datetime.now(datetime.UTC) + ) class Metric(Event): @@ -189,8 +199,8 @@ class Metric(Event): class RequestEventType(Enum): - start = 'start' - end = 'end' + start = "start" + end = "end" class RequestEvent(Event): diff --git a/benchmarks/core/network.py b/benchmarks/core/network.py index bdf3f75..62c0650 100644 --- a/benchmarks/core/network.py +++ b/benchmarks/core/network.py @@ -4,8 +4,8 @@ from pathlib import Path from typing_extensions import Generic, TypeVar, Union -TNetworkHandle = TypeVar('TNetworkHandle') -TInitialMetadata = TypeVar('TInitialMetadata') +TNetworkHandle = TypeVar("TNetworkHandle") +TInitialMetadata = TypeVar("TInitialMetadata") class DownloadHandle(ABC): @@ -31,9 +31,9 @@ class Node(ABC, Generic[TNetworkHandle, TInitialMetadata]): @abstractmethod def seed( - self, - file: Path, - handle: Union[TInitialMetadata, TNetworkHandle], + self, + file: Path, + handle: Union[TInitialMetadata, TNetworkHandle], ) -> TNetworkHandle: """ Makes the current :class:`Node` a seeder for the specified file. diff --git a/benchmarks/core/pydantic.py b/benchmarks/core/pydantic.py index b8a45b6..c0340f3 100644 --- a/benchmarks/core/pydantic.py +++ b/benchmarks/core/pydantic.py @@ -5,21 +5,19 @@ from pydantic import BaseModel, AfterValidator, IPvAnyAddress def drop_config_suffix(name: str) -> str: - return name[:-6] if name.endswith('Config') else name + return name[:-6] if name.endswith("Config") else name def to_snake_case(name: str) -> str: - return re.sub(r'(? Generator[Tuple[logging.Logger, StringIO], None, None]: output = StringIO() - logger = logging.getLogger('test_logger') + logger = logging.getLogger("test_logger") logger.setLevel(logging.INFO) for handler in logger.handlers: logger.removeHandler(handler) diff --git a/benchmarks/core/tests/test_config.py b/benchmarks/core/tests/test_config.py index 68f76cc..23f63ef 100644 --- a/benchmarks/core/tests/test_config.py +++ b/benchmarks/core/tests/test_config.py @@ -32,8 +32,8 @@ def test_should_parse_multiple_roots(): conf = parser.parse(yaml.safe_load(config_file)) - assert cast(Root1, conf['root1']).index == 1 - assert cast(Root2, conf['root2']).name == 'root2' + assert cast(Root1, conf["root1"]).index == 1 + assert cast(Root2, conf["root2"]).name == "root2" def test_should_expand_env_vars_when_fed_a_config_file(): @@ -44,13 +44,13 @@ def test_should_expand_env_vars_when_fed_a_config_file(): name: "My name is ${BTB_NAME}" """) - os.environ['BTB_MY_INDEX'] = '10' - os.environ['BTB_NAME'] = 'John Doe' + os.environ["BTB_MY_INDEX"] = "10" + os.environ["BTB_NAME"] = "John Doe" parser = ConfigParser() parser.register(Root1) parser.register(Root2) conf = parser.parse(config_file) - assert cast(Root1, conf['root1']).index == 10 - assert cast(Root2, conf['root2']).name == 'My name is John Doe' + assert cast(Root1, conf["root1"]).index == 10 + assert cast(Root2, conf["root2"]).name == "My name is John Doe" diff --git a/benchmarks/core/tests/test_logging.py b/benchmarks/core/tests/test_logging.py index c33bbd5..88fe801 100644 --- a/benchmarks/core/tests/test_logging.py +++ b/benchmarks/core/tests/test_logging.py @@ -16,17 +16,16 @@ class MetricsEvent(LogEntry): def test_log_entry_should_serialize_to_expected_format(): event = MetricsEvent( - name='download', - timestamp=datetime.datetime( - 2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc - ), + name="download", + timestamp=datetime.datetime(2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc), value=0.245, - node='node1', + node="node1", ) - assert str( - event) == ('>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,' - '"node":"node1","entry_type":"metrics_event"}') + assert str(event) == ( + '>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,' + '"node":"node1","entry_type":"metrics_event"}' + ) def test_should_parse_logs(): @@ -40,20 +39,20 @@ def test_should_parse_logs(): assert list(parser.parse(log)) == [ MetricsEvent( - name='download', + name="download", timestamp=datetime.datetime( 2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc ), value=0.245, - node='node1', + node="node1", ), MetricsEvent( - name='download', + name="download", timestamp=datetime.datetime( 2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc ), value=0.246, - node='node2', + node="node2", ), ] @@ -72,20 +71,20 @@ def test_should_skip_unparseable_lines(): assert list(parser.parse(log)) == [ MetricsEvent( - name='download', + name="download", timestamp=datetime.datetime( 2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc ), value=0.246, - node='node2', + node="node2", ), MetricsEvent( - name='download', + name="download", timestamp=datetime.datetime( 2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc ), value=0.246, - node='node3', + node="node3", ), ] @@ -99,16 +98,16 @@ def test_should_recover_logged_events_at_parsing(mock_logger): logger, output = mock_logger events = [ - StateChangeEvent(old='stopped', new='started'), + StateChangeEvent(old="stopped", new="started"), MetricsEvent( - name='download', + name="download", timestamp=datetime.datetime( 2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc ), value=0.246, - node='node3', + node="node3", ), - StateChangeEvent(old='started', new='stopped'), + StateChangeEvent(old="started", new="stopped"), ] for event in events: @@ -152,21 +151,27 @@ def test_should_split_intertwined_logs_by_entry_type(): splitter.split(parser.parse(log)) - assert compact(outputs['metrics_event'].getvalue()) == (compact(""" + assert compact(outputs["metrics_event"].getvalue()) == ( + compact(""" name,timestamp,value,node download,2021-01-01 00:00:00+00:00,0.246,node2 - """)) + """) + ) - assert compact(outputs['simple_event'].getvalue()) == (compact(""" + assert compact(outputs["simple_event"].getvalue()) == ( + compact(""" name,timestamp start,2021-01-01 00:00:00+00:00 start2,2021-01-01 00:00:00+00:00 - """)) + """) + ) - assert compact(outputs['person'].getvalue()) == (compact(""" + assert compact(outputs["person"].getvalue()) == ( + compact(""" name,surname John,Doe - """)) + """) + ) class SomeConfig(SnakeCaseModel): @@ -176,16 +181,20 @@ class SomeConfig(SnakeCaseModel): def test_should_adapt_existing_model_types_to_logging_types(): SomeConfigLogEntry = LogEntry.adapt(SomeConfig) - assert (str(SomeConfigLogEntry(some_field='value')) == - '>>{"some_field":"value","entry_type":"some_config_log_entry"}') + assert ( + str(SomeConfigLogEntry(some_field="value")) + == '>>{"some_field":"value","entry_type":"some_config_log_entry"}' + ) def test_should_adapt_existing_model_instances_to_logging_instances(): SomeConfigLogEntry = LogEntry.adapt(SomeConfig) - instance = SomeConfig(some_field='value') + instance = SomeConfig(some_field="value") - assert (str(SomeConfigLogEntry.adapt_instance(instance)) == - '>>{"some_field":"value","entry_type":"some_config_log_entry"}') + assert ( + str(SomeConfigLogEntry.adapt_instance(instance)) + == '>>{"some_field":"value","entry_type":"some_config_log_entry"}' + ) def test_should_store_split_logs_as_jsonl_for_requested_types(): @@ -209,12 +218,16 @@ def test_should_store_split_logs_as_jsonl_for_requested_types(): splitter.split(parser.parse(log)) - assert compact(outputs['metrics_event'].getvalue()) == (compact(""" + assert compact(outputs["metrics_event"].getvalue()) == ( + compact(""" name,timestamp,value,node download,2021-01-01 00:00:00+00:00,0.246,node2 - """)) + """) + ) - assert compact(outputs['simple_event'].getvalue()) == (compact(""" + assert compact(outputs["simple_event"].getvalue()) == ( + compact(""" {"name":"start","timestamp":"2021-01-01T00:00:00Z"} {"name":"start2","timestamp":"2021-01-01T00:00:00Z"} - """)) + """) + ) diff --git a/benchmarks/core/tests/test_pydantic.py b/benchmarks/core/tests/test_pydantic.py index 3583e8a..c3fe260 100644 --- a/benchmarks/core/tests/test_pydantic.py +++ b/benchmarks/core/tests/test_pydantic.py @@ -7,41 +7,41 @@ from benchmarks.core.pydantic import DomainName, Host def test_should_parse_ipv4_address(): - h = TypeAdapter(Host).validate_strings('192.168.1.1') - assert h == IPv4Address('192.168.1.1') + h = TypeAdapter(Host).validate_strings("192.168.1.1") + assert h == IPv4Address("192.168.1.1") def test_should_parse_ipv6_address(): - h = TypeAdapter(Host).validate_strings('2001:0000:130F:0000:0000:09C0:876A:130B') - assert h == IPv6Address('2001:0000:130F:0000:0000:09C0:876A:130B') + h = TypeAdapter(Host).validate_strings("2001:0000:130F:0000:0000:09C0:876A:130B") + assert h == IPv6Address("2001:0000:130F:0000:0000:09C0:876A:130B") def test_should_parse_simple_dns_names(): - h = TypeAdapter(Host).validate_strings('node-1.local.svc') - assert h == DomainName('node-1.local.svc') + h = TypeAdapter(Host).validate_strings("node-1.local.svc") + assert h == DomainName("node-1.local.svc") def test_should_parse_localhost(): - h = TypeAdapter(Host).validate_strings('localhost') - assert h == DomainName('localhost') + h = TypeAdapter(Host).validate_strings("localhost") + assert h == DomainName("localhost") def test_should_return_correct_string_representation_for_addresses(): - h = TypeAdapter(Host).validate_strings('localhost') - assert h == DomainName('localhost') + h = TypeAdapter(Host).validate_strings("localhost") + assert h == DomainName("localhost") - h = TypeAdapter(Host).validate_strings('192.168.1.1') - assert h == IPv4Address('192.168.1.1') + h = TypeAdapter(Host).validate_strings("192.168.1.1") + assert h == IPv4Address("192.168.1.1") def test_should_fail_invalid_names(): invalid_names = [ - '-node-1.local.svc', - 'node-1.local..svc', - 'node-1.local.svc.', - 'node-1.local.reallylongsubdomain', - 'node-1.local.s-dash', - 'notlocalhost', + "-node-1.local.svc", + "node-1.local..svc", + "node-1.local.svc.", + "node-1.local.reallylongsubdomain", + "node-1.local.s-dash", + "notlocalhost", ] for invalid_name in invalid_names: diff --git a/benchmarks/core/utils.py b/benchmarks/core/utils.py index d4e09fa..949c633 100644 --- a/benchmarks/core/utils.py +++ b/benchmarks/core/utils.py @@ -16,7 +16,7 @@ from benchmarks.core.network import TInitialMetadata @dataclass class ExperimentData(Generic[TInitialMetadata], AbstractContextManager, ABC): """:class:`ExperimentData` provides a context for providing and wiping out - data and metadata objects, usually within the scope of an experiment. """ + data and metadata objects, usually within the scope of an experiment.""" @abstractmethod def __enter__(self) -> Tuple[TInitialMetadata, Path]: @@ -30,7 +30,6 @@ class ExperimentData(Generic[TInitialMetadata], AbstractContextManager, ABC): class RandomTempData(ExperimentData[TInitialMetadata]): - def __init__(self, size: int, meta: TInitialMetadata): self.meta = meta self.size = size @@ -38,9 +37,9 @@ class RandomTempData(ExperimentData[TInitialMetadata]): def __enter__(self) -> Tuple[TInitialMetadata, Path]: if self._context is not None: - raise Exception('Cannot enter context twice') + raise Exception("Cannot enter context twice") - self._context = temp_random_file(self.size, 'data.bin') + self._context = temp_random_file(self.size, "data.bin") return self.meta, self._context.__enter__() @@ -49,18 +48,20 @@ class RandomTempData(ExperimentData[TInitialMetadata]): @contextmanager -def temp_random_file(size: int, name: str = 'data.bin'): +def temp_random_file(size: int, name: str = "data.bin"): with tempfile.TemporaryDirectory() as temp_dir_str: temp_dir = Path(temp_dir_str) random_file = temp_dir / name random_bytes = os.urandom(size) - with random_file.open('wb') as outfile: + with random_file.open("wb") as outfile: outfile.write(random_bytes) yield random_file -def await_predicate(predicate: Callable[[], bool], timeout: float = 0, polling_interval: float = 0) -> bool: +def await_predicate( + predicate: Callable[[], bool], timeout: float = 0, polling_interval: float = 0 +) -> bool: current = time() while (timeout == 0) or ((time() - current) <= timeout): if predicate(): diff --git a/benchmarks/deluge/config.py b/benchmarks/deluge/config.py index 0ee95b6..78b3726 100644 --- a/benchmarks/deluge/config.py +++ b/benchmarks/deluge/config.py @@ -7,7 +7,11 @@ from torrentool.torrent import Torrent from urllib3.util import parse_url from benchmarks.core.config import ExperimentBuilder -from benchmarks.core.experiments.experiments import IteratedExperiment, ExperimentEnvironment, BoundExperiment +from benchmarks.core.experiments.experiments import ( + IteratedExperiment, + ExperimentEnvironment, + BoundExperiment, +) from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment from benchmarks.core.pydantic import Host from benchmarks.core.utils import sample, RandomTempData @@ -31,7 +35,7 @@ class DelugeNodeSetConfig(BaseModel): first_node_index: int = 1 nodes: List[DelugeNodeConfig] = [] - @model_validator(mode='after') + @model_validator(mode="after") def expand_nodes(self): self.nodes = [ DelugeNodeConfig( @@ -40,29 +44,46 @@ class DelugeNodeSetConfig(BaseModel): daemon_port=self.daemon_port, listen_ports=self.listen_ports, ) - for i in range(self.first_node_index, self.first_node_index + self.network_size) + for i in range( + self.first_node_index, self.first_node_index + self.network_size + ) ] return self -DelugeDisseminationExperiment = IteratedExperiment[BoundExperiment[StaticDisseminationExperiment[Torrent, DelugeMeta]]] +DelugeDisseminationExperiment = IteratedExperiment[ + BoundExperiment[StaticDisseminationExperiment[Torrent, DelugeMeta]] +] class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]): - seeder_sets: int = Field(gt=0, default=1, description='Number of distinct seeder sets to experiment with') - seeders: int = Field(gt=0, description='Number of seeders per seeder set') + seeder_sets: int = Field( + gt=0, default=1, description="Number of distinct seeder sets to experiment with" + ) + seeders: int = Field(gt=0, description="Number of seeders per seeder set") - repetitions: int = Field(gt=0, description='How many experiment repetitions to run for each seeder set') - file_size: int = Field(gt=0, description='File size, in bytes') + repetitions: int = Field( + gt=0, description="How many experiment repetitions to run for each seeder set" + ) + file_size: int = Field(gt=0, description="File size, in bytes") - shared_volume_path: Path = Field(description='Path to the volume shared between clients and experiment runner') - tracker_announce_url: HttpUrl = Field(description='URL to the tracker announce endpoint') + shared_volume_path: Path = Field( + description="Path to the volume shared between clients and experiment runner" + ) + tracker_announce_url: HttpUrl = Field( + description="URL to the tracker announce endpoint" + ) nodes: List[DelugeNodeConfig] | DelugeNodeSetConfig = Field( - description='Configuration for the nodes that make up the network') + description="Configuration for the nodes that make up the network" + ) def build(self) -> DelugeDisseminationExperiment: - nodes_specs = self.nodes.nodes if isinstance(self.nodes, DelugeNodeSetConfig) else self.nodes + nodes_specs = ( + self.nodes.nodes + if isinstance(self.nodes, DelugeNodeSetConfig) + else self.nodes + ) network = [ DelugeNode( @@ -85,12 +106,18 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]): for seeder_set in range(self.seeder_sets): seeders = list(islice(sample(len(network)), self.seeders)) for experiment_run in range(self.repetitions): - yield env.bind(StaticDisseminationExperiment( - network=network, - seeders=seeders, - data=RandomTempData(size=self.file_size, - meta=DelugeMeta(f'dataset-{seeder_set}-{experiment_run}', - announce_url=tracker.announce_url)) - )) + yield env.bind( + StaticDisseminationExperiment( + network=network, + seeders=seeders, + data=RandomTempData( + size=self.file_size, + meta=DelugeMeta( + f"dataset-{seeder_set}-{experiment_run}", + announce_url=tracker.announce_url, + ), + ), + ) + ) return IteratedExperiment(repetitions()) diff --git a/benchmarks/deluge/deluge_node.py b/benchmarks/deluge/deluge_node.py index fd490a1..e546271 100644 --- a/benchmarks/deluge/deluge_node.py +++ b/benchmarks/deluge/deluge_node.py @@ -23,33 +23,33 @@ logger = logging.getLogger(__name__) class DelugeMeta: """:class:`DelugeMeta` represents the initial metadata required so that a :class:`DelugeNode` can introduce a file into the network, becoming its initial seeder.""" + name: str announce_url: Url class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent): - def __init__( - self, - name: str, - volume: Path, - daemon_port: int, - daemon_address: str = 'localhost', - daemon_username: str = 'user', - daemon_password: str = 'password', + self, + name: str, + volume: Path, + daemon_port: int, + daemon_address: str = "localhost", + daemon_username: str = "user", + daemon_password: str = "password", ) -> None: if not pathvalidate.is_valid_filename(name): raise ValueError(f'Node name must be a valid filename (bad name: "{name}")') self._name = name - self.downloads_root = volume / name / 'downloads' + self.downloads_root = volume / name / "downloads" self._rpc: Optional[DelugeRPCClient] = None self.daemon_args = { - 'host': daemon_address, - 'port': daemon_port, - 'username': daemon_username, - 'password': daemon_password, + "host": daemon_address, + "port": daemon_port, + "username": daemon_username, + "password": daemon_password, } super().__init__(self.downloads_root) @@ -65,7 +65,7 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent): if torrent_ids: errors = self.rpc.core.remove_torrents(torrent_ids, remove_data=True) if errors: - raise Exception(f'There were errors removing torrents: {errors}') + raise Exception(f"There were errors removing torrents: {errors}") # Wipe download folder to get rid of files that got uploaded but failed # seeding or deletes. @@ -80,9 +80,9 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent): self._init_folders() def seed( - self, - file: Path, - handle: Union[DelugeMeta, Torrent], + self, + file: Path, + handle: Union[DelugeMeta, Torrent], ) -> Torrent: data_root = self.downloads_root / handle.name data_root.mkdir(parents=True, exist_ok=False) @@ -97,7 +97,7 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent): torrent = handle self.rpc.core.add_torrent_file( - filename=f'{handle.name}.torrent', + filename=f"{handle.name}.torrent", filedump=self._b64dump(torrent), options=dict(), ) @@ -106,7 +106,7 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent): def leech(self, handle: Torrent) -> DownloadHandle: self.rpc.core.add_torrent_file( - filename=f'{handle.name}.torrent', + filename=f"{handle.name}.torrent", filedump=self._b64dump(handle), options=dict(), ) @@ -117,7 +117,7 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent): ) def torrent_info(self, name: str) -> List[Dict[bytes, Any]]: - return list(self.rpc.core.get_torrents_status({'name': name}, []).values()) + return list(self.rpc.core.get_torrents_status({"name": name}, []).values()) @property def rpc(self) -> DelugeRPCClient: @@ -152,7 +152,6 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent): class DelugeDownloadHandle(DownloadHandle): - def __init__(self, torrent: Torrent, node: DelugeNode) -> None: self.node = node self.torrent = torrent @@ -161,11 +160,13 @@ class DelugeDownloadHandle(DownloadHandle): name = self.torrent.name def _predicate(): - response = self.node.rpc.core.get_torrents_status({'name': name}, []) + response = self.node.rpc.core.get_torrents_status({"name": name}, []) if len(response) > 1: - logger.warning(f'Client has multiple torrents matching name {name}. Returning the first one.') + logger.warning( + f"Client has multiple torrents matching name {name}. Returning the first one." + ) status = list(response.values())[0] - return status[b'is_seed'] + return status[b"is_seed"] return await_predicate(_predicate, timeout=timeout) diff --git a/benchmarks/deluge/logging.py b/benchmarks/deluge/logging.py index 33b5a35..5a0643e 100644 --- a/benchmarks/deluge/logging.py +++ b/benchmarks/deluge/logging.py @@ -2,4 +2,4 @@ from benchmarks.core.tests.test_logging import MetricsEvent class DelugeTorrentDownload(MetricsEvent): - torrent_name: str \ No newline at end of file + torrent_name: str diff --git a/benchmarks/deluge/tests/fixtures.py b/benchmarks/deluge/tests/fixtures.py index e723c99..afaa9bd 100644 --- a/benchmarks/deluge/tests/fixtures.py +++ b/benchmarks/deluge/tests/fixtures.py @@ -12,8 +12,12 @@ from benchmarks.deluge.tracker import Tracker from benchmarks.tests.utils import shared_volume -def deluge_node(name: str, address: str, port: int) -> Generator[DelugeNode, None, None]: - node = DelugeNode(name, volume=shared_volume(), daemon_address=address, daemon_port=port) +def deluge_node( + name: str, address: str, port: int +) -> Generator[DelugeNode, None, None]: + node = DelugeNode( + name, volume=shared_volume(), daemon_address=address, daemon_port=port + ) assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5) node.wipe_all_torrents() try: @@ -24,17 +28,23 @@ def deluge_node(name: str, address: str, port: int) -> Generator[DelugeNode, Non @pytest.fixture def deluge_node1() -> Generator[DelugeNode, None, None]: - yield from deluge_node('deluge-1', os.environ.get('DELUGE_NODE_1', 'localhost'), 6890) + yield from deluge_node( + "deluge-1", os.environ.get("DELUGE_NODE_1", "localhost"), 6890 + ) @pytest.fixture def deluge_node2() -> Generator[DelugeNode, None, None]: - yield from deluge_node('deluge-2', os.environ.get('DELUGE_NODE_2', 'localhost'), 6893) + yield from deluge_node( + "deluge-2", os.environ.get("DELUGE_NODE_2", "localhost"), 6893 + ) @pytest.fixture def deluge_node3() -> Generator[DelugeNode, None, None]: - yield from deluge_node('deluge-3', os.environ.get('DELUGE_NODE_3', 'localhost'), 6896) + yield from deluge_node( + "deluge-3", os.environ.get("DELUGE_NODE_3", "localhost"), 6896 + ) @pytest.fixture @@ -45,4 +55,8 @@ def temp_random_file() -> Generator[Path, None, None]: @pytest.fixture def tracker() -> Tracker: - return Tracker(parse_url(os.environ.get('TRACKER_ANNOUNCE_URL', 'http://127.0.0.1:8000/announce'))) + return Tracker( + parse_url( + os.environ.get("TRACKER_ANNOUNCE_URL", "http://127.0.0.1:8000/announce") + ) + ) diff --git a/benchmarks/deluge/tests/test_config.py b/benchmarks/deluge/tests/test_config.py index d84baf0..f861a20 100644 --- a/benchmarks/deluge/tests/test_config.py +++ b/benchmarks/deluge/tests/test_config.py @@ -5,41 +5,45 @@ from unittest.mock import patch import yaml from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment -from benchmarks.deluge.config import DelugeNodeSetConfig, DelugeNodeConfig, DelugeExperimentConfig +from benchmarks.deluge.config import ( + DelugeNodeSetConfig, + DelugeNodeConfig, + DelugeExperimentConfig, +) from benchmarks.deluge.deluge_node import DelugeNode def test_should_expand_node_sets_into_simple_nodes(): nodeset = DelugeNodeSetConfig( - name='custom-{node_index}', - address='deluge-{node_index}.local.svc', + name="custom-{node_index}", + address="deluge-{node_index}.local.svc", network_size=4, daemon_port=6080, - listen_ports=[6081, 6082] + listen_ports=[6081, 6082], ) assert nodeset.nodes == [ DelugeNodeConfig( - name='custom-1', - address='deluge-1.local.svc', + name="custom-1", + address="deluge-1.local.svc", daemon_port=6080, listen_ports=[6081, 6082], ), DelugeNodeConfig( - name='custom-2', - address='deluge-2.local.svc', + name="custom-2", + address="deluge-2.local.svc", daemon_port=6080, listen_ports=[6081, 6082], ), DelugeNodeConfig( - name='custom-3', - address='deluge-3.local.svc', + name="custom-3", + address="deluge-3.local.svc", daemon_port=6080, listen_ports=[6081, 6082], ), DelugeNodeConfig( - name='custom-4', - address='deluge-4.local.svc', + name="custom-4", + address="deluge-4.local.svc", daemon_port=6080, listen_ports=[6081, 6082], ), @@ -48,24 +52,24 @@ def test_should_expand_node_sets_into_simple_nodes(): def test_should_respect_first_node_index(): nodeset = DelugeNodeSetConfig( - name='deluge-{node_index}', - address='deluge-{node_index}.local.svc', + name="deluge-{node_index}", + address="deluge-{node_index}.local.svc", network_size=2, daemon_port=6080, listen_ports=[6081, 6082], - first_node_index=5 + first_node_index=5, ) assert nodeset.nodes == [ DelugeNodeConfig( - name='deluge-5', - address='deluge-5.local.svc', + name="deluge-5", + address="deluge-5.local.svc", daemon_port=6080, listen_ports=[6081, 6082], ), DelugeNodeConfig( - name='deluge-6', - address='deluge-6.local.svc', + name="deluge-6", + address="deluge-6.local.svc", daemon_port=6080, listen_ports=[6081, 6082], ), @@ -89,17 +93,21 @@ def test_should_build_experiment_from_config(): listen_ports: [ 6891, 6892 ] """) - config = DelugeExperimentConfig.model_validate(yaml.safe_load(config_file)['deluge_experiment']) + config = DelugeExperimentConfig.model_validate( + yaml.safe_load(config_file)["deluge_experiment"] + ) # Need to patch mkdir, or we'll try to actually create the folder when DelugeNode gets initialized. - with patch('pathlib.Path.mkdir'): + with patch("pathlib.Path.mkdir"): experiment = config.build() repetitions = list(experiment.experiments) assert len(repetitions) == 3 assert len(repetitions[0].experiment.nodes) == 10 - assert cast(DelugeNode, repetitions[0].experiment.nodes[5]).daemon_args['port'] == 6890 + assert ( + cast(DelugeNode, repetitions[0].experiment.nodes[5]).daemon_args["port"] == 6890 + ) def test_should_create_n_repetitions_per_seeder_set(): @@ -120,10 +128,12 @@ def test_should_create_n_repetitions_per_seeder_set(): listen_ports: [ 6891, 6892 ] """) - config = DelugeExperimentConfig.model_validate(yaml.safe_load(config_file)['deluge_experiment']) + config = DelugeExperimentConfig.model_validate( + yaml.safe_load(config_file)["deluge_experiment"] + ) # Need to patch mkdir, or we'll try to actually create the folder when DelugeNode gets initialized. - with patch('pathlib.Path.mkdir'): + with patch("pathlib.Path.mkdir"): experiment = config.build() repetitions = list(experiment.experiments) diff --git a/benchmarks/deluge/tests/test_deluge_node.py b/benchmarks/deluge/tests/test_deluge_node.py index d44419a..1f7e01d 100644 --- a/benchmarks/deluge/tests/test_deluge_node.py +++ b/benchmarks/deluge/tests/test_deluge_node.py @@ -13,29 +13,40 @@ def assert_is_seed(node: DelugeNode, name: str, size: int): assert len(response) == 1 info = response[0] - assert info[b'name'] == name.encode('utf-8') # not sure that this works for ANY name... - assert info[b'total_size'] == size - assert info[b'is_seed'] + assert info[b"name"] == name.encode( + "utf-8" + ) # not sure that this works for ANY name... + assert info[b"total_size"] == size + assert info[b"is_seed"] @pytest.mark.integration -def test_should_seed_files(deluge_node1: DelugeNode, temp_random_file: Path, tracker: Tracker): - assert not deluge_node1.torrent_info(name='dataset1') +def test_should_seed_files( + deluge_node1: DelugeNode, temp_random_file: Path, tracker: Tracker +): + assert not deluge_node1.torrent_info(name="dataset1") - deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker.announce_url)) - assert_is_seed(deluge_node1, name='dataset1', size=megabytes(1)) + deluge_node1.seed( + temp_random_file, DelugeMeta(name="dataset1", announce_url=tracker.announce_url) + ) + assert_is_seed(deluge_node1, name="dataset1", size=megabytes(1)) @pytest.mark.integration def test_should_download_files( - deluge_node1: DelugeNode, deluge_node2: DelugeNode, - temp_random_file: Path, tracker: Tracker): - assert not deluge_node1.torrent_info(name='dataset1') - assert not deluge_node2.torrent_info(name='dataset1') + deluge_node1: DelugeNode, + deluge_node2: DelugeNode, + temp_random_file: Path, + tracker: Tracker, +): + assert not deluge_node1.torrent_info(name="dataset1") + assert not deluge_node2.torrent_info(name="dataset1") - torrent = deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker.announce_url)) + torrent = deluge_node1.seed( + temp_random_file, DelugeMeta(name="dataset1", announce_url=tracker.announce_url) + ) handle = deluge_node2.leech(torrent) assert handle.await_for_completion(5) - assert_is_seed(deluge_node2, name='dataset1', size=megabytes(1)) + assert_is_seed(deluge_node2, name="dataset1", size=megabytes(1)) diff --git a/benchmarks/deluge/tests/test_deluge_static_experiment.py b/benchmarks/deluge/tests/test_deluge_static_experiment.py index 81b4cb0..b8e4110 100644 --- a/benchmarks/deluge/tests/test_deluge_static_experiment.py +++ b/benchmarks/deluge/tests/test_deluge_static_experiment.py @@ -8,48 +8,56 @@ from benchmarks.deluge.tests.test_deluge_node import assert_is_seed @pytest.mark.integration -def test_should_run_with_a_single_seeder(tracker, deluge_node1, deluge_node2, deluge_node3): +def test_should_run_with_a_single_seeder( + tracker, deluge_node1, deluge_node2, deluge_node3 +): size = megabytes(10) env = ExperimentEnvironment( components=[deluge_node1, deluge_node2, deluge_node3, tracker], polling_interval=0.5, ) - experiment = env.bind(StaticDisseminationExperiment( - network=[deluge_node1, deluge_node2, deluge_node3], - seeders=[1], - data=RandomTempData( - size=size, - meta=DelugeMeta('dataset-1', announce_url=tracker.announce_url) + experiment = env.bind( + StaticDisseminationExperiment( + network=[deluge_node1, deluge_node2, deluge_node3], + seeders=[1], + data=RandomTempData( + size=size, + meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url), + ), ) - )) + ) experiment.run() - assert_is_seed(deluge_node1, 'dataset-1', size) - assert_is_seed(deluge_node2, 'dataset-1', size) - assert_is_seed(deluge_node3, 'dataset-1', size) + assert_is_seed(deluge_node1, "dataset-1", size) + assert_is_seed(deluge_node2, "dataset-1", size) + assert_is_seed(deluge_node3, "dataset-1", size) @pytest.mark.integration -def test_should_run_with_multiple_seeders(tracker, deluge_node1, deluge_node2, deluge_node3): +def test_should_run_with_multiple_seeders( + tracker, deluge_node1, deluge_node2, deluge_node3 +): size = megabytes(10) env = ExperimentEnvironment( components=[deluge_node1, deluge_node2, deluge_node3, tracker], polling_interval=0.5, ) - experiment = env.bind(StaticDisseminationExperiment( - network=[deluge_node1, deluge_node2, deluge_node3], - seeders=[1, 2], - data=RandomTempData( - size=size, - meta=DelugeMeta('dataset-1', announce_url=tracker.announce_url) + experiment = env.bind( + StaticDisseminationExperiment( + network=[deluge_node1, deluge_node2, deluge_node3], + seeders=[1, 2], + data=RandomTempData( + size=size, + meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url), + ), ) - )) + ) experiment.run() - assert_is_seed(deluge_node1, 'dataset-1', size) - assert_is_seed(deluge_node2, 'dataset-1', size) - assert_is_seed(deluge_node3, 'dataset-1', size) + assert_is_seed(deluge_node1, "dataset-1", size) + assert_is_seed(deluge_node2, "dataset-1", size) + assert_is_seed(deluge_node3, "dataset-1", size) diff --git a/benchmarks/deluge/tracker.py b/benchmarks/deluge/tracker.py index 6333d70..7f67ef7 100644 --- a/benchmarks/deluge/tracker.py +++ b/benchmarks/deluge/tracker.py @@ -7,7 +7,6 @@ from benchmarks.core.experiments.experiments import ExperimentComponent class Tracker(ExperimentComponent): - def __init__(self, announce_url: Url): self.announce_url = announce_url @@ -19,4 +18,4 @@ class Tracker(ExperimentComponent): return False def __str__(self) -> str: - return f'Tracker({self.announce_url})' + return f"Tracker({self.announce_url})" diff --git a/benchmarks/tests/utils.py b/benchmarks/tests/utils.py index 485c13f..e3add7d 100644 --- a/benchmarks/tests/utils.py +++ b/benchmarks/tests/utils.py @@ -2,7 +2,8 @@ from pathlib import Path def shared_volume() -> Path: - return Path(__file__).parent.parent.parent.joinpath('volume') + return Path(__file__).parent.parent.parent.joinpath("volume") + def compact(a_string: str) -> str: - return '\n'.join([line.strip() for line in a_string.splitlines() if line.strip()]) \ No newline at end of file + return "\n".join([line.strip() for line in a_string.splitlines() if line.strip()])