feat: add explicit logging for experiment stage boundaries

This commit is contained in:
gmega 2025-02-19 14:00:27 -03:00
parent 242097abd3
commit 9563a96373
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
9 changed files with 254 additions and 69 deletions

View File

@ -8,13 +8,14 @@ from typing_extensions import Generic, List, Tuple
from benchmarks.core.concurrency import ensure_successful
from benchmarks.core.experiments.experiments import ExperimentWithLifecycle
from benchmarks.core.experiments.logging import experiment_stage
from benchmarks.core.network import (
TInitialMetadata,
TNetworkHandle,
Node,
DownloadHandle,
)
from benchmarks.logging.logging import RequestEvent, RequestEventType
from benchmarks.logging.logging import RequestEvent, EventBoundary
logger = logging.getLogger(__name__)
@ -31,12 +32,14 @@ class StaticDisseminationExperiment(
seed: int,
concurrency: Optional[int] = None,
logging_cooldown: int = 0,
experiment_id: Optional[str] = None,
) -> None:
self.nodes = network
self.seeders = seeders
self.meta = meta
self.file_size = file_size
self.seed = seed
self._experiment_id = experiment_id
self._executor = ThreadPoolExecutor(
max_workers=len(network) - len(seeders)
@ -46,66 +49,76 @@ class StaticDisseminationExperiment(
self._cid: Optional[TNetworkHandle] = None
self.logging_cooldown = logging_cooldown
def experiment_id(self) -> Optional[str]:
return self._experiment_id
def setup(self):
pass
def do_run(self, run: int = 0):
seeders, leechers = self._split_nodes()
logger.info(
"Running experiment with %d seeders and %d leechers",
len(seeders),
len(leechers),
)
for node in seeders:
_log_request(node, "genseed", str(self.meta), RequestEventType.start)
self._cid = node.genseed(self.file_size, self.seed, self.meta)
_log_request(node, "genseed", str(self.meta), RequestEventType.end)
assert self._cid is not None # to please mypy
logger.info(f"Setting up leechers: {[str(leecher) for leecher in leechers]}")
def _leech(leecher):
_log_request(leecher, "leech", str(self.meta), RequestEventType.start)
download = leecher.leech(self._cid)
_log_request(leecher, "leech", str(self.meta), RequestEventType.end)
return download
logger.info("Now waiting for downloads to complete")
downloads = ensure_successful(
[self._executor.submit(_leech, leecher) for leecher in leechers]
)
def _await_for_download(
element: Tuple[int, DownloadHandle],
) -> Tuple[int, DownloadHandle]:
index, download = element
if not download.await_for_completion():
raise Exception(
f"Download ({index}, {str(download)}) did not complete in time."
)
with experiment_stage(self, "seeding"):
logger.info(
"Download %d / %d completed (node: %s)",
index + 1,
len(downloads),
download.node.name,
"Running experiment with %d seeders and %d leechers",
len(seeders),
len(leechers),
)
return element
ensure_successful(
[
self._executor.submit(_await_for_download, (i, download))
for i, download in enumerate(downloads)
]
)
for node in seeders:
_log_request(node, "genseed", str(self.meta), EventBoundary.start)
self._cid = node.genseed(self.file_size, self.seed, self.meta)
_log_request(node, "genseed", str(self.meta), EventBoundary.end)
# FIXME this is a hack to ensure that nodes get a chance to log their data before we
# run the teardown hook and remove the torrents.
logger.info(f"Waiting for {self.logging_cooldown} seconds before teardown...")
sleep(self.logging_cooldown)
assert self._cid is not None # to please mypy
with experiment_stage(self, "leeching"):
logger.info(
f"Setting up leechers: {[str(leecher) for leecher in leechers]}"
)
def _leech(leecher):
_log_request(leecher, "leech", str(self.meta), EventBoundary.start)
download = leecher.leech(self._cid)
_log_request(leecher, "leech", str(self.meta), EventBoundary.end)
return download
downloads = ensure_successful(
[self._executor.submit(_leech, leecher) for leecher in leechers]
)
with experiment_stage(self, "downloading"):
def _await_for_download(
element: Tuple[int, DownloadHandle],
) -> Tuple[int, DownloadHandle]:
index, download = element
if not download.await_for_completion():
raise Exception(
f"Download ({index}, {str(download)}) did not complete in time."
)
logger.info(
"Download %d / %d completed (node: %s)",
index + 1,
len(downloads),
download.node.name,
)
return element
ensure_successful(
[
self._executor.submit(_await_for_download, (i, download))
for i, download in enumerate(downloads)
]
)
with experiment_stage(self, "log_cooldown"):
# FIXME this is a hack to ensure that nodes get a chance to log their data before we
# run the teardown hook and remove the torrents.
logger.info(
f"Waiting for {self.logging_cooldown} seconds before teardown..."
)
sleep(self.logging_cooldown)
def teardown(self, exception: Optional[Exception] = None):
logger.info("Tearing down experiment.")
@ -123,12 +136,13 @@ class StaticDisseminationExperiment(
return element
try:
ensure_successful(
[
self._executor.submit(_remove, (i, node))
for i, node in enumerate(self.nodes)
]
)
with experiment_stage(self, "deleting"):
ensure_successful(
[
self._executor.submit(_remove, (i, node))
for i, node in enumerate(self.nodes)
]
)
finally:
logger.info("Shut down thread pool.")
self._executor.shutdown(wait=True)
@ -149,7 +163,7 @@ def _log_request(
node: Node[TNetworkHandle, TInitialMetadata],
name: str,
request_id: str,
event_type: RequestEventType,
event_type: EventBoundary,
):
logger.info(
RequestEvent(

View File

@ -17,6 +17,12 @@ logger = logging.getLogger(__name__)
class Experiment(ABC):
"""Base interface for an executable :class:`Experiment`."""
@abstractmethod
def experiment_id(self) -> Optional[str]:
"""A meaningful identifier for the experiment. What this means is experiment-specific.
Anonymous experiments can simply return None."""
pass
@abstractmethod
def run(self):
"""Synchronously runs the experiment, blocking the current thread until it's done."""
@ -28,7 +34,7 @@ TExperiment = TypeVar("TExperiment", bound=Experiment)
ExperimentBuilder = Builder[TExperiment]
class ExperimentWithLifecycle(Experiment):
class ExperimentWithLifecycle(Experiment, ABC):
"""An :class:`ExperimentWithLifecycle` is a basic implementation of an :class:`Experiment` with overridable
lifecycle hooks."""
@ -131,5 +137,8 @@ class BoundExperiment(Experiment, Generic[TExperiment]):
self.experiment = experiment
self.env = env
def experiment_id(self) -> Optional[str]:
return self.experiment.experiment_id()
def run(self):
self.env.run(self.experiment)

View File

@ -25,6 +25,9 @@ class IteratedExperiment(Experiment, Generic[TExperiment]):
self.raise_when_failures = raise_when_failures
self.experiments = experiments
def experiment_id(self) -> str:
return self.experiment_set_id
def run(self):
for i, experiment in enumerate(self.experiments):
start = time.time()

View File

@ -0,0 +1,39 @@
from contextlib import contextmanager
import logging
from benchmarks.core.experiments.experiments import Experiment
from benchmarks.logging.logging import ExperimentStage, EventBoundary
logger = logging.getLogger(__name__)
@contextmanager
def experiment_stage(experiment: Experiment, name: str):
    """Log start/end boundary events around an experiment stage.

    Emits an :class:`ExperimentStage` event with ``EventBoundary.start`` on
    entry and one with ``EventBoundary.end`` on exit. If the body raises an
    :class:`Exception`, the end event carries ``str(exc)`` in its ``error``
    field and the exception is re-raised.

    :param experiment: the experiment whose ``experiment_id()`` names the
        events (``""`` is used for anonymous experiments returning ``None``).
    :param name: the stage label recorded in the events.
    """

    # Single place where the stage event is assembled so the start, error-end
    # and success-end variants cannot drift apart.
    def _log_boundary(boundary, error=None):
        logger.info(
            ExperimentStage(
                name=experiment.experiment_id() or "",
                stage=name,
                type=boundary,
                error=error,
            )
        )

    _log_boundary(EventBoundary.start)
    try:
        yield
    except Exception as exc:
        # NOTE(review): BaseException (e.g. KeyboardInterrupt) is deliberately
        # not caught, so an aborted run leaves no end event — confirm intended.
        _log_boundary(EventBoundary.end, error=str(exc))
        raise
    else:
        _log_boundary(EventBoundary.end)

View File

@ -1,5 +1,5 @@
from time import sleep
from typing import List
from typing import List, Optional
from benchmarks.core.experiments.experiments import (
ExperimentComponent,
@ -59,6 +59,9 @@ class ExperimentThatReliesOnComponents(Experiment):
def __init__(self, components: List[ExperimentComponent]):
self.components = components
def experiment_id(self) -> Optional[str]:
return None
def run(self):
assert all(component.is_ready() for component in self.components)

View File

@ -1,3 +1,5 @@
from typing import Optional
from benchmarks.core.experiments.experiments import Experiment
from benchmarks.core.experiments.iterated_experiment import IteratedExperiment
@ -6,6 +8,9 @@ class SimpleExperiment(Experiment):
def __init__(self):
self.ran = False
def experiment_id(self) -> Optional[str]:
return None
def run(self):
self.ran = True
@ -27,6 +32,9 @@ def test_should_run_experiment_repetitions():
def test_should_register_failed_repetitions():
class FailingExperiment(Experiment):
def experiment_id(self) -> Optional[str]:
return None
def run(self):
raise RuntimeError("This experiment failed.")

View File

@ -0,0 +1,102 @@
from io import StringIO
from unittest.mock import patch
from benchmarks.core.experiments.experiments import Experiment
from benchmarks.core.experiments.logging import experiment_stage
from benchmarks.logging.logging import LogParser, ExperimentStage, EventBoundary
class SomeExperiment(Experiment):
    """Two-stage dummy experiment; the second stage optionally fails."""

    def __init__(self, should_raise=False):
        self.should_raise = should_raise

    def experiment_id(self):
        return "some-experiment"

    def run(self):
        # Drive both stages in order; only the second one can fail.
        for stage in ("stage1", "stage2"):
            with experiment_stage(self, stage):
                if stage == "stage2" and self.should_raise:
                    raise RuntimeError("Error in experiment")
def test_should_log_experiment_stages(mock_logger):
    """A clean run logs a start/end event pair for each stage, in order."""
    logger, output = mock_logger

    with patch("benchmarks.core.experiments.logging.logger", logger):
        SomeExperiment().run()

    parser = LogParser()
    parser.register(ExperimentStage)

    events = list(parser.parse(StringIO(output.getvalue())))

    # Local builder keeps the expected events to one line each; timestamps are
    # copied from the parsed events since they are not predictable.
    def stage_event(stage, boundary, ts):
        return ExperimentStage(
            name="some-experiment",
            stage=stage,
            type=boundary,
            timestamp=ts,
        )

    assert events == [
        stage_event("stage1", EventBoundary.start, events[0].timestamp),
        stage_event("stage1", EventBoundary.end, events[1].timestamp),
        stage_event("stage2", EventBoundary.start, events[2].timestamp),
        stage_event("stage2", EventBoundary.end, events[3].timestamp),
    ]
def test_should_log_errors_when_thrown(mock_logger):
    """A failing stage logs its end event with the exception message attached."""
    logger, output = mock_logger

    with patch("benchmarks.core.experiments.logging.logger", logger):
        failing = SomeExperiment(should_raise=True)
        try:
            failing.run()
        except RuntimeError:
            # The failure is expected; we only care about the logged events.
            pass

    parser = LogParser()
    parser.register(ExperimentStage)

    events = list(parser.parse(StringIO(output.getvalue())))

    # Timestamps are copied from the parsed events since they are not
    # predictable; only the failing end event carries an error message.
    def stage_event(stage, boundary, ts, **extra):
        return ExperimentStage(
            name="some-experiment",
            stage=stage,
            type=boundary,
            timestamp=ts,
            **extra,
        )

    assert events == [
        stage_event("stage1", EventBoundary.start, events[0].timestamp),
        stage_event("stage1", EventBoundary.end, events[1].timestamp),
        stage_event("stage2", EventBoundary.start, events[2].timestamp),
        stage_event(
            "stage2",
            EventBoundary.end,
            events[3].timestamp,
            error="Error in experiment",
        ),
    ]

View File

@ -8,7 +8,7 @@ from benchmarks.core.experiments.dissemination_experiment.static import (
StaticDisseminationExperiment,
)
from benchmarks.core.network import Node, DownloadHandle
from benchmarks.logging.logging import LogParser, RequestEvent, RequestEventType
from benchmarks.logging.logging import LogParser, RequestEvent, EventBoundary
@dataclass
@ -178,7 +178,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
node="runner",
name="genseed",
request_id="dataset-1",
type=RequestEventType.start,
type=EventBoundary.start,
timestamp=events[0].timestamp,
),
RequestEvent(
@ -186,7 +186,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
node="runner",
name="genseed",
request_id="dataset-1",
type=RequestEventType.end,
type=EventBoundary.end,
timestamp=events[1].timestamp,
),
RequestEvent(
@ -194,7 +194,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
node="runner",
name="leech",
request_id="dataset-1",
type=RequestEventType.start,
type=EventBoundary.start,
timestamp=events[2].timestamp,
),
RequestEvent(
@ -202,7 +202,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
node="runner",
name="leech",
request_id="dataset-1",
type=RequestEventType.end,
type=EventBoundary.end,
timestamp=events[3].timestamp,
),
RequestEvent(
@ -210,7 +210,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
node="runner",
name="leech",
request_id="dataset-1",
type=RequestEventType.start,
type=EventBoundary.start,
timestamp=events[4].timestamp,
),
RequestEvent(
@ -218,7 +218,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
node="runner",
name="leech",
request_id="dataset-1",
type=RequestEventType.end,
type=EventBoundary.end,
timestamp=events[5].timestamp,
),
]

View File

@ -247,7 +247,7 @@ class DownloadMetric(Metric):
dataset_name: str
class RequestEventType(Enum):
class EventBoundary(Enum):
    """Marks whether a logged event records the start or the end of an operation."""

    start = "start"
    end = "end"
@ -255,7 +255,7 @@ class RequestEventType(Enum):
class RequestEvent(NodeEvent):
destination: NodeId
request_id: str
type: RequestEventType
type: EventBoundary
class ExperimentStatus(Event):
@ -264,6 +264,12 @@ class ExperimentStatus(Event):
error: Optional[str] = None
class ExperimentStage(Event):
    """Event emitted at the boundaries of a named experiment stage."""

    # Label of the stage (e.g. "seeding", "leeching" — see StaticDisseminationExperiment).
    stage: str
    # Whether this event marks the start or the end of the stage.
    type: EventBoundary
    # Exception message when the stage ended due to an error; None on success
    # and on start events.
    error: Optional[str] = None
def basic_log_parser() -> LogParser:
"""Constructs a basic log parser which can understand some common log entry types."""
parser = LogParser()
@ -273,4 +279,5 @@ def basic_log_parser() -> LogParser:
parser.register(DownloadMetric)
parser.register(RequestEvent)
parser.register(ExperimentStatus)
parser.register(ExperimentStage)
return parser