2024-11-28 09:46:21 -03:00
|
|
|
"""Basic definitions for structuring experiments."""
|
|
|
|
|
|
2024-12-03 17:50:27 -03:00
|
|
|
import logging
|
2024-11-28 09:46:21 -03:00
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
|
from collections.abc import Iterable
|
2024-12-03 17:50:27 -03:00
|
|
|
from typing import Optional
|
2024-11-28 09:46:21 -03:00
|
|
|
|
2024-12-02 17:10:18 -03:00
|
|
|
from typing_extensions import Generic, TypeVar
|
2024-11-28 09:46:21 -03:00
|
|
|
|
2024-12-03 17:50:27 -03:00
|
|
|
from benchmarks.core.utils import await_predicate
|
|
|
|
|
|
2024-11-28 09:46:21 -03:00
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Experiment(ABC):
    """Abstract contract for anything that can be executed as an :class:`Experiment`.

    Concrete subclasses must override :meth:`run`; the class itself cannot be instantiated.
    """

    @abstractmethod
    def run(self):
        """Execute the experiment synchronously.

        The call blocks the current thread and returns only once the experiment is done.
        """
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Type variable bounded by :class:`Experiment`, i.e. it ranges over Experiment and its subclasses.
TExperiment = TypeVar('TExperiment', bound=Experiment)
|
|
|
|
|
|
|
|
|
|
|
2024-12-03 17:50:27 -03:00
|
|
|
class ExperimentComponent(ABC):
    """One piece of the environment an experiment runs in.

    Examples of components are databases, network nodes, and the like. Concrete subclasses
    must override :meth:`is_ready`.
    """

    @abstractmethod
    def is_ready(self) -> bool:
        """Report whether this component is currently ready.

        :return: ``True`` when the component is ready, ``False`` otherwise.
        """
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ExperimentEnvironment:
    """An :class:`ExperimentEnvironment` is a collection of :class:`ExperimentComponent` instances that
    must be ready before an :class:`Experiment` can execute."""

    def __init__(self, components: Iterable[ExperimentComponent], polling_interval: float = 0):
        """
        :param components: the components whose readiness gates experiment execution.
        :param polling_interval: forwarded as ``polling_interval`` to ``await_predicate`` on every
            readiness wait.
        """
        # Snapshot into a list: readiness is polled repeatedly, and a one-shot iterable (e.g. a
        # generator) would be exhausted by the first poll. Every later ``all(...)`` would then run
        # over nothing and report the environment as ready vacuously.
        self.components = list(components)
        self.polling_interval = polling_interval

    def await_ready(self, timeout: float = 0) -> bool:
        """Awaits for all components to be ready, or until a timeout is reached.

        :param timeout: forwarded as ``timeout`` to ``await_predicate``. The meaning of the default
            ``0`` is defined by that helper (presumably "no timeout" -- TODO confirm).
        :return: ``True`` if ``await_predicate`` reported success, ``False`` otherwise.
        """
        # TODO we should probably have per-component timeouts, or at least provide feedback
        #   as to what was the completion state of each component.
        return bool(
            await_predicate(
                lambda: all(component.is_ready() for component in self.components),
                timeout=timeout,
                polling_interval=self.polling_interval,
            )
        )

    def run(self, experiment: Experiment):
        """Runs the :class:`Experiment` within this :class:`ExperimentEnvironment`.

        Waits for the environment with :meth:`await_ready` (using its default timeout) before
        running the experiment.

        :raises RuntimeError: if :meth:`await_ready` returns ``False``.
        """
        if not self.await_ready():
            raise RuntimeError('One or more environment components did not become ready in time')

        experiment.run()

    def bind(self, experiment: TExperiment) -> 'BoundExperiment[TExperiment]':
        """Wraps ``experiment`` together with this environment in a :class:`BoundExperiment`."""
        return BoundExperiment(experiment, self)
|
2024-12-03 17:50:27 -03:00
|
|
|
|
|
|
|
|
|
2024-12-03 18:32:48 -03:00
|
|
|
class BoundExperiment(Experiment, Generic[TExperiment]):
    """An :class:`Experiment` paired with the :class:`ExperimentEnvironment` it is to be run in."""

    def __init__(self, experiment: TExperiment, env: ExperimentEnvironment):
        """
        :param experiment: the experiment to wrap. Annotated with the class' type parameter so that
            type checkers can infer ``BoundExperiment[T]`` from the argument.
        :param env: the environment the experiment will be handed to when :meth:`run` is called.
        """
        self.experiment = experiment
        self.env = env

    def run(self):
        """Runs the wrapped experiment by delegating to the environment's ``run`` method."""
        self.env.run(self.experiment)
|
|
|
|
|
|
|
|
|
|
|
2024-11-28 09:46:21 -03:00
|
|
|
class IteratedExperiment(Experiment, Generic[TExperiment]):
    """An :class:`IteratedExperiment` will run a sequence of :class:`Experiment` instances."""

    def __init__(self, experiments: Iterable[TExperiment]):
        """
        :param experiments: the experiments to run, in iteration order. The iterable is stored
            as-is and only consumed when :meth:`run` is called.
        """
        # Counters are updated by run(); they are never reset, so they accumulate across calls.
        self.successful_runs = 0
        self.failed_runs = 0
        self.experiments = experiments

    def run(self):
        """Runs every experiment in turn, tallying successes and failures.

        An experiment that raises an :class:`Exception` is counted in ``failed_runs`` and logged;
        it does not stop the remaining experiments from running. Experiments that complete
        without raising are counted in ``successful_runs``.
        """
        for experiment in self.experiments:
            try:
                experiment.run()
            except Exception as ex:
                self.failed_runs += 1
                # logger.exception records the traceback as well; a bare logger.error(ex) would
                # log only the exception message, which makes failures hard to diagnose.
                logger.exception('Experiment failed: %s', ex)
            else:
                self.successful_runs += 1
|