add iterated experiment

This commit is contained in:
gmega 2024-11-28 09:46:21 -03:00
parent 205d12ebe7
commit eab4759b93
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
6 changed files with 73 additions and 37 deletions

View File

@ -7,7 +7,7 @@ from typing import Annotated
from pydantic import BaseModel, IPvAnyAddress, AfterValidator
from typing_extensions import Generic
from benchmarks.core.experiments.experiment import TExperiment
from benchmarks.core.experiments.experiments import TExperiment
def drop_config_suffix(name: str) -> str:

View File

@ -1,17 +0,0 @@
"""Basic definitions for structuring experiments."""
from abc import ABC, abstractmethod
from mypy.graph_utils import TypeVar
class Experiment(ABC):
"""An :class:`Experiment` is an arbitrary piece of code that can be run and measured."""
@abstractmethod
def run(self):
"""Synchronously runs the experiment, blocking the current thread until it's done."""
pass
TExperiment = TypeVar('TExperiment', bound=Experiment)

View File

@ -0,0 +1,40 @@
"""Basic definitions for structuring experiments."""
from abc import ABC, abstractmethod
from collections.abc import Iterable
from mypy.graph_utils import TypeVar
import logging
from typing_extensions import Generic
logger = logging.getLogger(__name__)
class Experiment(ABC):
@abstractmethod
def run(self):
"""Synchronously runs the experiment, blocking the current thread until it's done."""
pass
TExperiment = TypeVar('TExperiment', bound=Experiment)
class IteratedExperiment(Experiment, Generic[TExperiment]):
"""An :class:`IteratedExperiment` will a sequence of :class:`Experiment`s."""
def __init__(self, experiments: Iterable[TExperiment]):
self.successful_runs = 0
self.failed_runs = 0
self.experiments = experiments
def run(self):
for experiment in self.experiments:
try:
experiment.run()
self.successful_runs += 1
except Exception as ex:
self.failed_runs += 1
logger.error(ex)

View File

@ -1,6 +1,6 @@
from typing_extensions import Generic, List
from benchmarks.core.experiments.experiment import Experiment
from benchmarks.core.experiments.experiments import Experiment
from benchmarks.core.network import TInitialMetadata, TNetworkHandle, Node
from benchmarks.core.utils import ExperimentData
@ -16,7 +16,7 @@ class StaticDisseminationExperiment(Generic[TNetworkHandle, TInitialMetadata], E
self.seeders = seeders
self.data = data
def run(self):
def run(self, run: int = 0):
seeders, leechers = (
[
self.nodes[i]

View File

@ -7,6 +7,7 @@ from torrentool.torrent import Torrent
from urllib3.util import parse_url
from benchmarks.core.config import Host, ExperimentBuilder
from benchmarks.core.experiments.experiments import IteratedExperiment
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.utils import sample, RandomTempData
from benchmarks.deluge.deluge_node import DelugeMeta, DelugeNode as RealDelugeNode
@ -38,10 +39,11 @@ class DelugeNodeSetConfig(BaseModel):
return self
DelugeDisseminationExperiment = StaticDisseminationExperiment[Torrent, DelugeMeta]
DelugeDisseminationExperiment = IteratedExperiment[StaticDisseminationExperiment[Torrent, DelugeMeta]]
class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]):
repetitions: int = Field(gt=0)
file_size: int = Field(gt=0)
seeders: int = Field(gt=0)
shared_volume_path: Path
@ -50,17 +52,23 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]):
def build(self) -> DelugeDisseminationExperiment:
nodes = self.nodes.nodes if isinstance(self.nodes, DelugeNodeSetConfig) else self.nodes
return StaticDisseminationExperiment(
network=[
RealDelugeNode(
name=f'deluge-{i}',
volume=self.shared_volume_path / f'deluge-{i}',
daemon_port=node.daemon_port,
daemon_address=str(node.address.address),
)
for i, node in enumerate(nodes)
],
seeders=list(islice(sample(len(nodes)), self.seeders)),
data=RandomTempData(size=self.file_size,
meta=DelugeMeta('dataset-1', announce_url=parse_url(str(self.tracker_announce_url))))
repetitions = (
StaticDisseminationExperiment(
network=[
RealDelugeNode(
name=f'deluge-{i}',
volume=self.shared_volume_path / f'deluge-{i}',
daemon_port=node.daemon_port,
daemon_address=str(node.address.address),
)
for i, node in enumerate(nodes)
],
seeders=list(islice(sample(len(nodes)), self.seeders)),
data=RandomTempData(size=self.file_size,
meta=DelugeMeta(f'dataset-{experiment_run}',
announce_url=parse_url(str(self.tracker_announce_url))))
)
for experiment_run in range(self.repetitions)
)
return IteratedExperiment(repetitions)

View File

@ -1,10 +1,12 @@
from io import StringIO
from typing import cast
from unittest.mock import patch
import yaml
from benchmarks.core.config import Host
from benchmarks.deluge.config import DelugeNodeSetConfig, DelugeNodeConfig, DelugeExperimentConfig
from benchmarks.deluge.deluge_node import DelugeNode
def test_should_expand_node_sets_into_simple_nodes():
@ -42,6 +44,7 @@ def test_should_expand_node_sets_into_simple_nodes():
def test_should_build_experiment_from_config():
config_file = StringIO("""
deluge_experiment:
repetitions: 3
seeders: 3
tracker_announce_url: http://localhost:2020/announce
file_size: 1024
@ -59,8 +62,10 @@ def test_should_build_experiment_from_config():
# Need to patch mkdir, or we'll try to actually create the folder when DelugeNode gets initialized.
with patch('pathlib.Path.mkdir'):
experiment = config.build()
assert len(experiment.nodes) == 10
repetitions = list(experiment.experiments)
assert len(repetitions) == 3
assert len(repetitions[0].nodes) == 10
assert cast(DelugeNode, repetitions[0].nodes[5]).daemon_args['port'] == 6890