From 9563a963734c06bf4eb67602d6e9b718a5d1ee12 Mon Sep 17 00:00:00 2001 From: gmega Date: Wed, 19 Feb 2025 14:00:27 -0300 Subject: [PATCH] feat: add explicit logging for experiment stage boundaries --- .../dissemination_experiment/static.py | 130 ++++++++++-------- benchmarks/core/experiments/experiments.py | 11 +- .../core/experiments/iterated_experiment.py | 3 + benchmarks/core/experiments/logging.py | 39 ++++++ .../experiments/tests/test_experiments.py | 5 +- .../tests/test_iterated_experiment.py | 8 ++ .../core/experiments/tests/test_logging.py | 102 ++++++++++++++ .../tests/test_static_experiment.py | 14 +- benchmarks/logging/logging.py | 11 +- 9 files changed, 254 insertions(+), 69 deletions(-) create mode 100644 benchmarks/core/experiments/logging.py create mode 100644 benchmarks/core/experiments/tests/test_logging.py diff --git a/benchmarks/core/experiments/dissemination_experiment/static.py b/benchmarks/core/experiments/dissemination_experiment/static.py index 2cd4247..4b05049 100644 --- a/benchmarks/core/experiments/dissemination_experiment/static.py +++ b/benchmarks/core/experiments/dissemination_experiment/static.py @@ -8,13 +8,14 @@ from typing_extensions import Generic, List, Tuple from benchmarks.core.concurrency import ensure_successful from benchmarks.core.experiments.experiments import ExperimentWithLifecycle +from benchmarks.core.experiments.logging import experiment_stage from benchmarks.core.network import ( TInitialMetadata, TNetworkHandle, Node, DownloadHandle, ) -from benchmarks.logging.logging import RequestEvent, RequestEventType +from benchmarks.logging.logging import RequestEvent, EventBoundary logger = logging.getLogger(__name__) @@ -31,12 +32,14 @@ class StaticDisseminationExperiment( seed: int, concurrency: Optional[int] = None, logging_cooldown: int = 0, + experiment_id: Optional[str] = None, ) -> None: self.nodes = network self.seeders = seeders self.meta = meta self.file_size = file_size self.seed = seed + self._experiment_id = experiment_id self._executor = ThreadPoolExecutor( max_workers=len(network) - len(seeders) @@ -46,66 +49,76 @@ class StaticDisseminationExperiment( self._cid: Optional[TNetworkHandle] = None self.logging_cooldown = logging_cooldown + def experiment_id(self) -> Optional[str]: + return self._experiment_id + def setup(self): pass def do_run(self, run: int = 0): seeders, leechers = self._split_nodes() - logger.info( - "Running experiment with %d seeders and %d leechers", - len(seeders), - len(leechers), - ) - - for node in seeders: - _log_request(node, "genseed", str(self.meta), RequestEventType.start) - self._cid = node.genseed(self.file_size, self.seed, self.meta) - _log_request(node, "genseed", str(self.meta), RequestEventType.end) - - assert self._cid is not None # to please mypy - - logger.info(f"Setting up leechers: {[str(leecher) for leecher in leechers]}") - - def _leech(leecher): - _log_request(leecher, "leech", str(self.meta), RequestEventType.start) - download = leecher.leech(self._cid) - _log_request(leecher, "leech", str(self.meta), RequestEventType.end) - return download - - logger.info("Now waiting for downloads to complete") - - downloads = ensure_successful( - [self._executor.submit(_leech, leecher) for leecher in leechers] - ) - - def _await_for_download( - element: Tuple[int, DownloadHandle], - ) -> Tuple[int, DownloadHandle]: - index, download = element - if not download.await_for_completion(): - raise Exception( - f"Download ({index}, {str(download)}) did not complete in time." - ) + with experiment_stage(self, "seeding"): logger.info( - "Download %d / %d completed (node: %s)", - index + 1, - len(downloads), - download.node.name, + "Running experiment with %d seeders and %d leechers", + len(seeders), + len(leechers), ) - return element - ensure_successful( - [ - self._executor.submit(_await_for_download, (i, download)) - for i, download in enumerate(downloads) - ] - ) + for node in seeders: + _log_request(node, "genseed", str(self.meta), EventBoundary.start) + self._cid = node.genseed(self.file_size, self.seed, self.meta) + _log_request(node, "genseed", str(self.meta), EventBoundary.end) - # FIXME this is a hack to ensure that nodes get a chance to log their data before we - # run the teardown hook and remove the torrents. - logger.info(f"Waiting for {self.logging_cooldown} seconds before teardown...") - sleep(self.logging_cooldown) + assert self._cid is not None # to please mypy + + with experiment_stage(self, "leeching"): + logger.info( + f"Setting up leechers: {[str(leecher) for leecher in leechers]}" + ) + + def _leech(leecher): + _log_request(leecher, "leech", str(self.meta), EventBoundary.start) + download = leecher.leech(self._cid) + _log_request(leecher, "leech", str(self.meta), EventBoundary.end) + return download + + downloads = ensure_successful( + [self._executor.submit(_leech, leecher) for leecher in leechers] + ) + + with experiment_stage(self, "downloading"): + + def _await_for_download( + element: Tuple[int, DownloadHandle], + ) -> Tuple[int, DownloadHandle]: + index, download = element + if not download.await_for_completion(): + raise Exception( + f"Download ({index}, {str(download)}) did not complete in time." + ) + logger.info( + "Download %d / %d completed (node: %s)", + index + 1, + len(downloads), + download.node.name, + ) + return element + + ensure_successful( + [ + self._executor.submit(_await_for_download, (i, download)) + for i, download in enumerate(downloads) + ] + ) + + with experiment_stage(self, "log_cooldown"): + # FIXME this is a hack to ensure that nodes get a chance to log their data before we + # run the teardown hook and remove the torrents. + logger.info( + f"Waiting for {self.logging_cooldown} seconds before teardown..." + ) + sleep(self.logging_cooldown) def teardown(self, exception: Optional[Exception] = None): logger.info("Tearing down experiment.") @@ -123,12 +136,13 @@ class StaticDisseminationExperiment( return element try: - ensure_successful( - [ - self._executor.submit(_remove, (i, node)) - for i, node in enumerate(self.nodes) - ] - ) + with experiment_stage(self, "deleting"): + ensure_successful( + [ + self._executor.submit(_remove, (i, node)) + for i, node in enumerate(self.nodes) + ] + ) finally: logger.info("Shut down thread pool.") self._executor.shutdown(wait=True) @@ -149,7 +163,7 @@ def _log_request( node: Node[TNetworkHandle, TInitialMetadata], name: str, request_id: str, - event_type: RequestEventType, + event_type: EventBoundary, ): logger.info( RequestEvent( diff --git a/benchmarks/core/experiments/experiments.py b/benchmarks/core/experiments/experiments.py index 5188f0d..e545bea 100644 --- a/benchmarks/core/experiments/experiments.py +++ b/benchmarks/core/experiments/experiments.py @@ -17,6 +17,12 @@ logger = logging.getLogger(__name__) class Experiment(ABC): """Base interface for an executable :class:`Experiment`.""" + @abstractmethod + def experiment_id(self) -> Optional[str]: + """A meaningful identifier for the experiment. What this means is experiment-specific. + Anonymous experiments can simply return None.""" + pass + @abstractmethod def run(self): """Synchronously runs the experiment, blocking the current thread until it's done.""" @@ -28,7 +34,7 @@ TExperiment = TypeVar("TExperiment", bound=Experiment) ExperimentBuilder = Builder[TExperiment] -class ExperimentWithLifecycle(Experiment): +class ExperimentWithLifecycle(Experiment, ABC): """An :class:`ExperimentWithLifecycle` is a basic implementation of an :class:`Experiment` with overridable lifecycle hooks.""" @@ -131,5 +137,8 @@ class BoundExperiment(Experiment, Generic[TExperiment]): self.experiment = experiment self.env = env + def experiment_id(self) -> Optional[str]: + return self.experiment.experiment_id() + def run(self): self.env.run(self.experiment) diff --git a/benchmarks/core/experiments/iterated_experiment.py b/benchmarks/core/experiments/iterated_experiment.py index 31b0891..4c26d15 100644 --- a/benchmarks/core/experiments/iterated_experiment.py +++ b/benchmarks/core/experiments/iterated_experiment.py @@ -25,6 +25,9 @@ class IteratedExperiment(Experiment, Generic[TExperiment]): self.raise_when_failures = raise_when_failures self.experiments = experiments + def experiment_id(self) -> str: + return self.experiment_set_id + def run(self): for i, experiment in enumerate(self.experiments): start = time.time() diff --git a/benchmarks/core/experiments/logging.py b/benchmarks/core/experiments/logging.py new file mode 100644 index 0000000..d7e0da9 --- /dev/null +++ b/benchmarks/core/experiments/logging.py @@ -0,0 +1,39 @@ +from contextlib import contextmanager +import logging + +from benchmarks.core.experiments.experiments import Experiment +from benchmarks.logging.logging import ExperimentStage, EventBoundary + +logger = logging.getLogger(__name__) + + +@contextmanager +def experiment_stage(experiment: Experiment, name: str): + logger.info( + ExperimentStage( + name=experiment.experiment_id() or "", + stage=name, + type=EventBoundary.start, + ) + ) + + try: + yield + except Exception as exc: + logger.info( + ExperimentStage( + name=experiment.experiment_id() or "", + stage=name, + type=EventBoundary.end, + error=str(exc), + ) + ) + raise + + logger.info( + ExperimentStage( + name=experiment.experiment_id() or "", + stage=name, + type=EventBoundary.end, + ) + ) diff --git a/benchmarks/core/experiments/tests/test_experiments.py b/benchmarks/core/experiments/tests/test_experiments.py index 973ae3c..3acc2df 100644 --- a/benchmarks/core/experiments/tests/test_experiments.py +++ b/benchmarks/core/experiments/tests/test_experiments.py @@ -1,5 +1,5 @@ from time import sleep -from typing import List +from typing import List, Optional from benchmarks.core.experiments.experiments import ( ExperimentComponent, @@ -59,6 +59,9 @@ class ExperimentThatReliesOnComponents(Experiment): def __init__(self, components: List[ExperimentComponent]): self.components = components + def experiment_id(self) -> Optional[str]: + return None + def run(self): assert all(component.is_ready() for component in self.components) diff --git a/benchmarks/core/experiments/tests/test_iterated_experiment.py b/benchmarks/core/experiments/tests/test_iterated_experiment.py index 95dc7ce..aa21f25 100644 --- a/benchmarks/core/experiments/tests/test_iterated_experiment.py +++ b/benchmarks/core/experiments/tests/test_iterated_experiment.py @@ -1,3 +1,5 @@ +from typing import Optional + from benchmarks.core.experiments.experiments import Experiment from benchmarks.core.experiments.iterated_experiment import IteratedExperiment @@ -6,6 +8,9 @@ class SimpleExperiment(Experiment): def __init__(self): self.ran = False + def experiment_id(self) -> Optional[str]: + return None + def run(self): self.ran = True @@ -27,6 +32,9 @@ def test_should_run_experiment_repetitions(): def test_should_register_failed_repetitions(): class FailingExperiment(Experiment): + def experiment_id(self) -> Optional[str]: + return None + def run(self): raise RuntimeError("This experiment failed.") diff --git a/benchmarks/core/experiments/tests/test_logging.py b/benchmarks/core/experiments/tests/test_logging.py new file mode 100644 index 0000000..c28b0fd --- /dev/null +++ b/benchmarks/core/experiments/tests/test_logging.py @@ -0,0 +1,102 @@ +from io import StringIO +from unittest.mock import patch + +from benchmarks.core.experiments.experiments import Experiment +from benchmarks.core.experiments.logging import experiment_stage +from benchmarks.logging.logging import LogParser, ExperimentStage, EventBoundary + + +class SomeExperiment(Experiment): + def __init__(self, should_raise=False): + self.should_raise = should_raise + + def experiment_id(self): + return "some-experiment" + + def run(self): + with experiment_stage(self, "stage1"): + pass + + with experiment_stage(self, "stage2"): + if self.should_raise: + raise RuntimeError("Error in experiment") + + +def test_should_log_experiment_stages(mock_logger): + logger, output = mock_logger + with patch("benchmarks.core.experiments.logging.logger", logger): + experiment = SomeExperiment() + experiment.run() + + parser = LogParser() + parser.register(ExperimentStage) + events = list(parser.parse(StringIO(output.getvalue()))) + + assert events == [ + ExperimentStage( + name="some-experiment", + stage="stage1", + type=EventBoundary.start, + timestamp=events[0].timestamp, + ), + ExperimentStage( + name="some-experiment", + stage="stage1", + type=EventBoundary.end, + timestamp=events[1].timestamp, + ), + ExperimentStage( + name="some-experiment", + stage="stage2", + type=EventBoundary.start, + timestamp=events[2].timestamp, + ), + ExperimentStage( + name="some-experiment", + stage="stage2", + type=EventBoundary.end, + timestamp=events[3].timestamp, + ), + ] + + +def test_should_log_errors_when_thrown(mock_logger): + logger, output = mock_logger + with patch("benchmarks.core.experiments.logging.logger", logger): + experiment = SomeExperiment(should_raise=True) + try: + experiment.run() + except RuntimeError: + pass + + parser = LogParser() + parser.register(ExperimentStage) + events = list(parser.parse(StringIO(output.getvalue()))) + + assert events == [ + ExperimentStage( + name="some-experiment", + stage="stage1", + type=EventBoundary.start, + timestamp=events[0].timestamp, + ), + ExperimentStage( + name="some-experiment", + stage="stage1", + type=EventBoundary.end, + timestamp=events[1].timestamp, + ), + ExperimentStage( + name="some-experiment", + stage="stage2", + type=EventBoundary.start, + timestamp=events[2].timestamp, + ), + ExperimentStage( + name="some-experiment", + stage="stage2", + type=EventBoundary.end, + error="Error in experiment", + timestamp=events[3].timestamp, + ), + ] diff --git a/benchmarks/core/experiments/tests/test_static_experiment.py b/benchmarks/core/experiments/tests/test_static_experiment.py index cadc412..06e3594 100644 --- a/benchmarks/core/experiments/tests/test_static_experiment.py +++ b/benchmarks/core/experiments/tests/test_static_experiment.py @@ -8,7 +8,7 @@ from benchmarks.core.experiments.dissemination_experiment.static import ( StaticDisseminationExperiment, ) from benchmarks.core.network import Node, DownloadHandle -from benchmarks.logging.logging import LogParser, RequestEvent, RequestEventType +from benchmarks.logging.logging import LogParser, RequestEvent, EventBoundary @dataclass @@ -178,7 +178,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger): node="runner", name="genseed", request_id="dataset-1", - type=RequestEventType.start, + type=EventBoundary.start, timestamp=events[0].timestamp, ), RequestEvent( @@ -186,7 +186,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger): node="runner", name="genseed", request_id="dataset-1", - type=RequestEventType.end, + type=EventBoundary.end, timestamp=events[1].timestamp, ), RequestEvent( @@ -194,7 +194,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger): node="runner", name="leech", request_id="dataset-1", - type=RequestEventType.start, + type=EventBoundary.start, timestamp=events[2].timestamp, ), RequestEvent( @@ -202,7 +202,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger): node="runner", name="leech", request_id="dataset-1", - type=RequestEventType.end, + type=EventBoundary.end, timestamp=events[3].timestamp, ), RequestEvent( @@ -210,7 +210,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger): node="runner", name="leech", request_id="dataset-1", - type=RequestEventType.start, + type=EventBoundary.start, timestamp=events[4].timestamp, ), RequestEvent( @@ -218,7 +218,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger): node="runner", name="leech", request_id="dataset-1", - type=RequestEventType.end, + type=EventBoundary.end, timestamp=events[5].timestamp, ), ] diff --git a/benchmarks/logging/logging.py b/benchmarks/logging/logging.py index f93e9ed..3084355 100644 --- a/benchmarks/logging/logging.py +++ b/benchmarks/logging/logging.py @@ -247,7 +247,7 @@ class DownloadMetric(Metric): dataset_name: str -class RequestEventType(Enum): +class EventBoundary(Enum): start = "start" end = "end" @@ -255,7 +255,7 @@ class RequestEventType(Enum): class RequestEvent(NodeEvent): destination: NodeId request_id: str - type: RequestEventType + type: EventBoundary class ExperimentStatus(Event): @@ -264,6 +264,12 @@ class ExperimentStatus(Event): error: Optional[str] = None +class ExperimentStage(Event): + stage: str + type: EventBoundary + error: Optional[str] = None + + def basic_log_parser() -> LogParser: """Constructs a basic log parser which can understand some common log entry types.""" parser = LogParser() @@ -273,4 +279,5 @@ def basic_log_parser() -> LogParser: parser.register(DownloadMetric) parser.register(RequestEvent) parser.register(ExperimentStatus) + parser.register(ExperimentStage) return parser