revert to simpler experiment interface, roll out deluge dissemination experiment

This commit is contained in:
gmega 2024-11-25 19:03:54 -03:00
parent b706663219
commit 61e115281e
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
9 changed files with 130 additions and 168 deletions

View File

@@ -1,30 +0,0 @@
from abc import ABC, abstractmethod
from typing_extensions import Generic, TypeVar
TRunnableExperiment = TypeVar('TRunnableExperiment', bound='RunnableExperiment')
class Experiment(Generic[TRunnableExperiment]):
"""An :class:`Experiment` represents a self-contained experimental unit which may be repeated
multiple times. :class:`Experiment`s, unlike tests, have the generation of metrics as a side effect
as their main outcome."""
@abstractmethod
def setup(self) -> TRunnableExperiment:
pass
class RunnableExperiment(ABC):
def run(self):
try:
self._run()
finally:
self.teardown()
@abstractmethod
def _run(self):
pass
def teardown(self):
pass

View File

@@ -1,22 +1,21 @@
from typing_extensions import Generic, List
from benchmarks.core.network import TInitialMetadata, TNetworkHandle, Node
from benchmarks.core.utils import Sampler, DataGenerator, DataHandle
from benchmarks.core.experiments.experiments import Experiment, RunnableExperiment
from benchmarks.core.utils import ExperimentData
class _RunnableSDE(RunnableExperiment, Generic[TNetworkHandle, TInitialMetadata]):
class StaticDisseminationExperiment(Generic[TNetworkHandle, TInitialMetadata]):
def __init__(
self,
network: List[Node[TNetworkHandle, TInitialMetadata]],
seeders: List[int],
data_handle: DataHandle[TInitialMetadata],
data: ExperimentData[TInitialMetadata],
):
self.nodes = network
self.seeders = seeders
self.data_handle = data_handle
self.data = data
def _run(self):
def run(self):
seeders, leechers = (
[
self.nodes[i]
@@ -29,35 +28,11 @@ class _RunnableSDE(RunnableExperiment, Generic[TNetworkHandle, TInitialMetadata]
]
)
handle = self.data_handle.meta
for node in seeders:
handle = node.seed(self.data_handle.data, handle)
with self.data as (meta, data):
handle = meta
for node in seeders:
handle = node.seed(data, handle)
handles = [node.leech(handle) for node in leechers]
for handle in handles:
handle.await_for_completion()
def teardown(self):
self.data_handle.cleanup()
class StaticDisseminationExperiment(Experiment[_RunnableSDE[TNetworkHandle, TInitialMetadata]]):
def __init__(
self,
network: List[Node[TNetworkHandle, TInitialMetadata]],
seeders: int,
sampler: Sampler,
generator: DataGenerator[TInitialMetadata],
):
self.nodes = network
self.sampler = sampler
self.generator = generator
self.seeders = seeders
def setup(self) -> _RunnableSDE[TNetworkHandle, TInitialMetadata]:
sample = self.sampler(len(self.nodes))
return _RunnableSDE(
network=self.nodes,
seeders=[next(sample) for _ in range(0, self.seeders)],
data_handle=self.generator.generate()
)
handles = [node.leech(handle) for node in leechers]
for handle in handles:
handle.await_for_completion()

View File

@@ -4,7 +4,7 @@ from typing import Optional, List, Tuple, Union
from benchmarks.core.network import Node, DownloadHandle
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.experiments.tests.utils import mock_sampler, MockGenerator
from benchmarks.core.experiments.tests.utils import MockExperimentData
@dataclass
@@ -52,67 +52,62 @@ def mock_network(n: int) -> List[MockNode]:
def test_should_place_seeders():
network = mock_network(n=13)
generator = MockGenerator(meta='data', data=Path('/path/to/data'))
seeder_indexes = [9, 6, 3]
data = MockExperimentData(meta='data', data=Path('/path/to/data'))
seeders = [9, 6, 3]
experiment = StaticDisseminationExperiment(
seeders=3,
sampler=mock_sampler(seeder_indexes),
seeders=seeders,
network=network,
generator=generator,
data=data,
)
runnable = experiment.setup()
runnable.run()
experiment.run()
actual_seeders = set()
for index, node in enumerate(network):
if node.seeding is not None:
actual_seeders.add(index)
assert node.seeding[0] == MockHandle(name=generator.meta, path=generator.data)
assert node.seeding[0] == MockHandle(name=data.meta, path=data.data)
assert actual_seeders == set(seeder_indexes)
assert actual_seeders == set(seeders)
def test_should_download_at_remaining_nodes():
network = mock_network(n=13)
generator = MockGenerator(meta='data', data=Path('/path/to/data'))
seeder_indexes = [9, 6, 3]
data = MockExperimentData(meta='data', data=Path('/path/to/data'))
seeders = [9, 6, 3]
experiment = StaticDisseminationExperiment(
seeders=3,
sampler=mock_sampler(seeder_indexes),
seeders=seeders,
network=network,
generator=generator,
data=data,
)
runnable = experiment.setup()
runnable.run()
experiment.run()
actual_leechers = set()
for index, node in enumerate(network):
if node.leeching is not None:
assert node.leeching.path == generator.data
assert node.leeching.name == generator.meta
assert node.leeching.path == data.data
assert node.leeching.name == data.meta
assert node.seeding is None
assert node.download_was_awaited
actual_leechers.add(index)
assert actual_leechers == set(range(13)) - set(seeder_indexes)
assert actual_leechers == set(range(13)) - set(seeders)
def test_should_delete_generated_file_at_end_of_experiment():
network = mock_network(n=2)
generator = MockGenerator(meta='data', data=Path('/path/to/data'))
seeder_indexes = [1]
data = MockExperimentData(meta='data', data=Path('/path/to/data'))
seeders = [1]
experiment = StaticDisseminationExperiment(
seeders=1,
sampler=mock_sampler(seeder_indexes),
seeders=seeders,
network=network,
generator=generator,
data=data,
)
runnable = experiment.setup()
runnable.run()
experiment.run()
assert generator.cleanup_called
assert data.cleanup_called

View File

@@ -1,30 +1,18 @@
from pathlib import Path
from typing import List
from typing import Tuple
from benchmarks.core.network import TInitialMetadata
from benchmarks.core.utils import Sampler, DataGenerator, DataHandle
from benchmarks.core.utils import ExperimentData
def mock_sampler(elements: List[int]) -> Sampler:
return lambda _: iter(elements)
class MockGenerator(DataGenerator[TInitialMetadata]):
class MockExperimentData(ExperimentData[TInitialMetadata]):
def __init__(self, meta: TInitialMetadata, data: Path):
self.cleanup_called = False
self.meta = meta
self.data = data
def generate(self) -> DataHandle[TInitialMetadata]:
return MockHandle(self.meta, self.data, self)
def __enter__(self) -> Tuple[TInitialMetadata, Path]:
return self.meta, self.data
class MockHandle(DataHandle[TInitialMetadata]):
def __init__(self, meta: TInitialMetadata, data: Path, parent: MockGenerator):
self.meta = meta
self.data = data
self.parent = parent
def cleanup(self):
assert not self.parent.cleanup_called
self.parent.cleanup_called = True
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup_called = True

View File

@@ -2,39 +2,50 @@ import os
import random
import tempfile
from abc import ABC, abstractmethod
from contextlib import contextmanager
from contextlib import contextmanager, AbstractContextManager
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterator, Tuple
from typing import Iterator, Tuple, ContextManager, Optional
from typing_extensions import Generic
from benchmarks.core.network import TInitialMetadata
# A Sampler samples without replacement from [0, ..., n].
type Sampler = Callable[[int], Iterator[int]]
@dataclass
class DataHandle(Generic[TInitialMetadata], ABC):
"""A :class:`DataHandle` knows how to clean up data and metadata that has been generated
by a :class:`DataGenerator`."""
meta: TInitialMetadata
data: Path
def cleanup(self):
if self.data.exists():
self.data.unlink()
class DataGenerator(Generic[TInitialMetadata], ABC):
"""A :class:`DataGenerator` knows how to generate data for an :class:`Experiment`."""
class ExperimentData(Generic[TInitialMetadata], AbstractContextManager, ABC):
""":class:`ExperimentData` provides a context for providing and wiping out
data and metadata objects, usually within the scope of an experiment. """
@abstractmethod
def generate(self) -> DataHandle[TInitialMetadata]:
"""Generates fresh data and metadata and returns a :class:`DataHandle`."""
def __enter__(self) -> Tuple[TInitialMetadata, Path]:
"""Generates new data and metadata and returns it."""
pass
@abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb):
"""Wipes out data and metadata."""
pass
class RandomTempData(ExperimentData[TInitialMetadata]):
def __init__(self, size: int, meta: TInitialMetadata):
self.meta = meta
self.size = size
self._context: Optional[ContextManager[Tuple[TInitialMetadata, Path]]] = None
def __enter__(self) -> Tuple[TInitialMetadata, Path]:
if self._context is not None:
raise Exception('Cannot enter context twice')
self._context = temp_random_file(self.size, 'data.bin')
return self.meta, self._context.__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
self._context.__exit__(exc_type, exc_val, exc_tb)
@contextmanager
def temp_random_file(size: int, name: str = 'data.bin'):

View File

@@ -137,13 +137,13 @@ class DelugeDownloadHandle(DownloadHandle):
def await_for_completion(self, timeout: float = 0) -> bool:
name = self.torrent.name
current = time()
while (time() - current) <= timeout:
while (timeout == 0) or ((time() - current) <= timeout):
response = self.node.rpc.core.get_torrents_status({'name': name}, [])
if len(response) > 1:
logger.warning(f'Client has multiple torrents matching name {name}. Returning the first one.')
status = list(response.values())[0]
if status[b'is_finished']:
if status[b'is_seed']:
return True
return False

View File

@@ -6,17 +6,21 @@ from benchmarks.core.utils import megabytes
from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta
def assert_is_seed(node: DelugeNode, name: str, size: int):
response = node.torrent_info(name=name)
assert len(response) == 1
info = response[0]
assert info[b'name'] == name.encode('utf-8') # not sure that this works for ANY name...
assert info[b'total_size'] == size
assert info[b'is_seed'] == True
def test_should_seed_files(deluge_node1: DelugeNode, temp_random_file: Path, tracker: Url):
assert not deluge_node1.torrent_info(name='dataset1')
deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker))
response = deluge_node1.torrent_info(name='dataset1')
assert len(response) == 1
info = response[0]
assert info[b'name'] == b'dataset1'
assert info[b'total_size'] == megabytes(1)
assert info[b'is_seed'] == True
assert_is_seed(deluge_node1, name='dataset1', size=megabytes(1))
def test_should_download_files(
@@ -30,10 +34,4 @@ def test_should_download_files(
assert handle.await_for_completion(5)
response = deluge_node2.torrent_info(name='dataset1')
assert len(response) == 1
info = response[0]
assert info[b'name'] == b'dataset1'
assert info[b'total_size'] == megabytes(1)
assert info[b'is_seed'] == True
assert_is_seed(deluge_node2, name='dataset1', size=megabytes(1))

View File

@@ -1,16 +1,40 @@
# from benchmarks.core.utils import megabytes
# from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
# from benchmarks.core.experiments.tests.utils import mock_sampler
#
#
# def test_should_run_with_a_single_seeder(deluge_node1, deluge_node2, deluge_node3):
# network = [deluge_node1, deluge_node2, deluge_node3]
# experiment = StaticDisseminationExperiment(
# network=network,
# seeders=1,
# sampler=mock_sampler([1]),
# generator=RandomTempFileGenerator(size=megabytes(50))
# )
#
# ready = experiment.setup()
# ready.run()
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.utils import RandomTempData, megabytes
from benchmarks.deluge.deluge_node import DelugeMeta
from benchmarks.deluge.tests.test_deluge_node import assert_is_seed
def test_should_run_with_a_single_seeder(tracker, deluge_node1, deluge_node2, deluge_node3):
size = megabytes(10)
experiment = StaticDisseminationExperiment(
network=[deluge_node1, deluge_node2, deluge_node3],
seeders=[1],
data=RandomTempData(
size=size,
meta=DelugeMeta('dataset-1', announce_url=tracker)
)
)
experiment.run()
assert_is_seed(deluge_node1, 'dataset-1', size)
assert_is_seed(deluge_node2, 'dataset-1', size)
assert_is_seed(deluge_node3, 'dataset-1', size)
def test_should_run_with_multiple_seeders(tracker, deluge_node1, deluge_node2, deluge_node3):
size = megabytes(10)
experiment = StaticDisseminationExperiment(
network=[deluge_node1, deluge_node2, deluge_node3],
seeders=[1, 2],
data=RandomTempData(
size=size,
meta=DelugeMeta('dataset-1', announce_url=tracker)
)
)
experiment.run()
assert_is_seed(deluge_node1, 'dataset-1', size)
assert_is_seed(deluge_node2, 'dataset-1', size)
assert_is_seed(deluge_node3, 'dataset-1', size)

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env bash
set -e
# These have to be wiped out before we boot the containers.
rm -rf ./volume/{deluge-1,deluge-2}
# These have to be wiped out before we boot the containers. Note that this will only work
# if you've set up rootless Docker.
rm -rf ./volume/{deluge-1,deluge-2,deluge-3}
docker compose up