From 59dcba5282b9e9a87b38b703fe1e94f715865d23 Mon Sep 17 00:00:00 2001 From: gmega Date: Mon, 6 Jan 2025 16:49:16 -0300 Subject: [PATCH] feat: make experiments cleanup downloaded/seeded files after each run --- benchmarks/core/experiments/experiments.py | 29 +++++- .../core/experiments/static_experiment.py | 88 +++++++++---------- .../tests/test_static_experiment.py | 23 +++++ benchmarks/core/network.py | 7 ++ benchmarks/deluge/deluge_node.py | 3 + benchmarks/deluge/tests/test_deluge_node.py | 41 ++++++--- .../tests/test_deluge_static_experiment.py | 58 ++++++------ 7 files changed, 167 insertions(+), 82 deletions(-) diff --git a/benchmarks/core/experiments/experiments.py b/benchmarks/core/experiments/experiments.py index bc0b588..69555bc 100644 --- a/benchmarks/core/experiments/experiments.py +++ b/benchmarks/core/experiments/experiments.py @@ -4,11 +4,10 @@ import logging from abc import ABC, abstractmethod from collections.abc import Iterable from time import time, sleep -from typing import List +from typing import List, Optional from typing_extensions import Generic, TypeVar - logger = logging.getLogger(__name__) @@ -24,6 +23,32 @@ class Experiment(ABC): TExperiment = TypeVar("TExperiment", bound=Experiment) +class ExperimentWithLifecycle(Experiment): + """An :class:`ExperimentWithLifecycle` is a basic implementation of an :class:`Experiment` with overridable + lifecycle hooks.""" + + def setup(self): + """Hook that runs before the experiment.""" + pass + + def run(self): + try: + self.setup() + self.do_run() + self.teardown() + except Exception as ex: + self.teardown(ex) + raise ex + + def do_run(self): + """The main body of the experiment.""" + pass + + def teardown(self, exception: Optional[Exception] = None): + """Hook that runs after the experiment.""" + pass + + class ExperimentComponent(ABC): """An :class:`ExperimentComponent` is a part of the environment for an experiment. 
These could be databases, network nodes, etc.""" diff --git a/benchmarks/core/experiments/static_experiment.py b/benchmarks/core/experiments/static_experiment.py index ba47e83..e440c81 100644 --- a/benchmarks/core/experiments/static_experiment.py +++ b/benchmarks/core/experiments/static_experiment.py @@ -4,7 +4,7 @@ from typing import Sequence, Optional from typing_extensions import Generic, List, Tuple -from benchmarks.core.experiments.experiments import Experiment +from benchmarks.core.experiments.experiments import ExperimentWithLifecycle from benchmarks.core.logging import RequestEvent, RequestEventType from benchmarks.core.network import ( TInitialMetadata, @@ -18,7 +18,7 @@ logger = logging.getLogger(__name__) class StaticDisseminationExperiment( - Generic[TNetworkHandle, TInitialMetadata], Experiment + Generic[TNetworkHandle, TInitialMetadata], ExperimentWithLifecycle ): def __init__( self, @@ -26,7 +26,7 @@ class StaticDisseminationExperiment( seeders: List[int], data: ExperimentData[TInitialMetadata], concurrency: Optional[int] = None, - ): + ) -> None: self.nodes = network self.seeders = seeders self.data = data @@ -35,8 +35,12 @@ class StaticDisseminationExperiment( if concurrency is None else concurrency ) + self._cid: Optional[TNetworkHandle] = None - def run(self, run: int = 0): + def setup(self): + pass + + def do_run(self, run: int = 0): seeders, leechers = self._split_nodes() logger.info( @@ -46,54 +50,21 @@ class StaticDisseminationExperiment( ) with self.data as (meta, data): - cid = None for node in seeders: - logger.info( - RequestEvent( - node="runner", - destination=node.name, - name="seed", - request_id=str(meta), - type=RequestEventType.start, - ) - ) - cid = node.seed(data, meta if cid is None else cid) - logger.info( - RequestEvent( - node="runner", - destination=node.name, - name="seed", - request_id=str(meta), - type=RequestEventType.end, - ) - ) + _log_request(node, "seed", str(meta), RequestEventType.start) + self._cid = 
node.seed(data, meta if self._cid is None else self._cid) + _log_request(node, "seed", str(meta), RequestEventType.end) - assert cid is not None # to please mypy + assert self._cid is not None # to please mypy logger.info( f"Setting up leechers: {[str(leecher) for leecher in leechers]}" ) def _leech(leecher): - logger.info( - RequestEvent( - node="runner", - destination=leecher.name, - name="leech", - request_id=str(meta), - type=RequestEventType.start, - ) - ) - download = leecher.leech(cid) - logger.info( - RequestEvent( - node="runner", - destination=leecher.name, - name="leech", - request_id=str(meta), - type=RequestEventType.end, - ) - ) + _log_request(leecher, "leech", str(meta), RequestEventType.start) + download = leecher.leech(self._cid) + _log_request(leecher, "leech", str(meta), RequestEventType.end) return download downloads = list(self._pool.imap_unordered(_leech, leechers)) @@ -110,6 +81,18 @@ class StaticDisseminationExperiment( ): logger.info("Download %d / %d completed", i + 1, len(downloads)) + def teardown(self, exception: Optional[Exception] = None): + + def _remove(element: Tuple[int, Node[TNetworkHandle, TInitialMetadata]]): + index, node = element + assert self._cid is not None # to please mypy + node.remove(self._cid) + return index + + try: + for i in self._pool.imap_unordered(_remove, enumerate(self.nodes)): + logger.info("Node %d removed file", i + 1) + finally: logger.info("Shut down thread pool.") self._pool.close() self._pool.join() @@ -124,3 +107,20 @@ class StaticDisseminationExperiment( return [self.nodes[i] for i in self.seeders], [ self.nodes[i] for i in range(0, len(self.nodes)) if i not in self.seeders ] + + +def _log_request( + node: Node[TNetworkHandle, TInitialMetadata], + name: str, + request_id: str, + event_type: RequestEventType, +): + logger.info( + RequestEvent( + node="runner", + destination=node.name, + name=name, + request_id=request_id, + type=event_type, + ) + ) diff --git 
a/benchmarks/core/experiments/tests/test_static_experiment.py b/benchmarks/core/experiments/tests/test_static_experiment.py
index 7c515f5..39ca98a 100644
--- a/benchmarks/core/experiments/tests/test_static_experiment.py
+++ b/benchmarks/core/experiments/tests/test_static_experiment.py
@@ -25,6 +25,7 @@ class MockNode(Node[MockHandle, str]):
         self.seeding: Optional[Tuple[MockHandle, Path]] = None
         self.leeching: Optional[MockHandle] = None
         self.download_was_awaited = False
+        self.remove_was_called = False
 
     @property
     def name(self) -> str:
@@ -42,6 +43,11 @@ class MockNode(Node[MockHandle, str]):
         self.leeching = handle
         return MockDownloadHandle(self)
 
+    def remove(self, handle: MockHandle):
+        assert self.seeding is not None
+        assert self.leeching == handle or self.seeding[0] == handle
+        self.remove_was_called = True
+
 
 class MockDownloadHandle(DownloadHandle):
     def __init__(self, parent: MockNode) -> None:
@@ -190,3 +196,20 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
             timestamp=events[5].timestamp,
         ),
     ]
+
+
+def test_should_delete_file_from_nodes_at_the_end_of_the_experiment():
+    network = mock_network(n=2)
+    data = MockExperimentData(meta="data", data=Path("/path/to/data"))
+    seeders = [1]
+
+    experiment = StaticDisseminationExperiment(
+        seeders=seeders,
+        network=network,
+        data=data,
+    )
+
+    experiment.run()
+
+    assert network[0].remove_was_called
+    assert network[1].remove_was_called
diff --git a/benchmarks/core/network.py b/benchmarks/core/network.py
index 62c0650..7116a02 100644
--- a/benchmarks/core/network.py
+++ b/benchmarks/core/network.py
@@ -58,6 +58,13 @@ class Node(ABC, Generic[TNetworkHandle, TInitialMetadata]):
         """
         pass
 
+    @abstractmethod
+    def remove(self, handle: TNetworkHandle):
+        """Removes the file associated with the handle from this node. For seeders, this means the node will stop
+        seeding it. For leechers, it will stop downloading it. 
In both cases, the file will be removed from the node's + storage.""" + pass + class SharedFSNode(Node[TNetworkHandle, TInitialMetadata], ABC): """A `SharedFSNode` is a :class:`Node` which shares a network volume with us. This means diff --git a/benchmarks/deluge/deluge_node.py b/benchmarks/deluge/deluge_node.py index e546271..3566ff6 100644 --- a/benchmarks/deluge/deluge_node.py +++ b/benchmarks/deluge/deluge_node.py @@ -116,6 +116,9 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent): torrent=handle, ) + def remove(self, handle: Torrent): + self.rpc.core.remove_torrent(handle.info_hash, remove_data=True) + def torrent_info(self, name: str) -> List[Dict[bytes, Any]]: return list(self.rpc.core.get_torrents_status({"name": name}, []).values()) diff --git a/benchmarks/deluge/tests/test_deluge_node.py b/benchmarks/deluge/tests/test_deluge_node.py index 1f7e01d..abc0109 100644 --- a/benchmarks/deluge/tests/test_deluge_node.py +++ b/benchmarks/deluge/tests/test_deluge_node.py @@ -2,22 +2,28 @@ from pathlib import Path import pytest -from benchmarks.core.utils import megabytes +from benchmarks.core.utils import megabytes, await_predicate from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta from benchmarks.deluge.tracker import Tracker -@pytest.mark.integration def assert_is_seed(node: DelugeNode, name: str, size: int): - response = node.torrent_info(name=name) - assert len(response) == 1 - info = response[0] + def _is_seed(): + response = node.torrent_info(name=name) + if len(response) == 0: + return False - assert info[b"name"] == name.encode( - "utf-8" - ) # not sure that this works for ANY name... - assert info[b"total_size"] == size - assert info[b"is_seed"] + assert len(response) == 1 + info = response[0] + + assert info[b"name"] == name.encode( + "utf-8" + ) # not sure that this works for ANY name... 
+ assert info[b"total_size"] == size + assert info[b"is_seed"] + return True + + assert await_predicate(_is_seed, timeout=5) @pytest.mark.integration @@ -50,3 +56,18 @@ def test_should_download_files( assert handle.await_for_completion(5) assert_is_seed(deluge_node2, name="dataset1", size=megabytes(1)) + + +@pytest.mark.integration +def test_should_remove_files( + deluge_node1: DelugeNode, temp_random_file: Path, tracker: Tracker +): + assert not deluge_node1.torrent_info(name="dataset1") + + torrent = deluge_node1.seed( + temp_random_file, DelugeMeta(name="dataset1", announce_url=tracker.announce_url) + ) + assert_is_seed(deluge_node1, name="dataset1", size=megabytes(1)) + + deluge_node1.remove(torrent) + assert not deluge_node1.torrent_info(name="dataset1") diff --git a/benchmarks/deluge/tests/test_deluge_static_experiment.py b/benchmarks/deluge/tests/test_deluge_static_experiment.py index b8e4110..2591c5d 100644 --- a/benchmarks/deluge/tests/test_deluge_static_experiment.py +++ b/benchmarks/deluge/tests/test_deluge_static_experiment.py @@ -17,22 +17,25 @@ def test_should_run_with_a_single_seeder( polling_interval=0.5, ) - experiment = env.bind( - StaticDisseminationExperiment( - network=[deluge_node1, deluge_node2, deluge_node3], - seeders=[1], - data=RandomTempData( - size=size, - meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url), - ), - ) + experiment = StaticDisseminationExperiment( + network=[deluge_node1, deluge_node2, deluge_node3], + seeders=[1], + data=RandomTempData( + size=size, + meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url), + ), ) - experiment.run() + env.await_ready() + try: + experiment.setup() + experiment.do_run() - assert_is_seed(deluge_node1, "dataset-1", size) - assert_is_seed(deluge_node2, "dataset-1", size) - assert_is_seed(deluge_node3, "dataset-1", size) + assert_is_seed(deluge_node1, "dataset-1", size) + assert_is_seed(deluge_node2, "dataset-1", size) + assert_is_seed(deluge_node3, "dataset-1", size) + 
finally: + experiment.teardown() @pytest.mark.integration @@ -45,19 +48,22 @@ def test_should_run_with_multiple_seeders( polling_interval=0.5, ) - experiment = env.bind( - StaticDisseminationExperiment( - network=[deluge_node1, deluge_node2, deluge_node3], - seeders=[1, 2], - data=RandomTempData( - size=size, - meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url), - ), - ) + experiment = StaticDisseminationExperiment( + network=[deluge_node1, deluge_node2, deluge_node3], + seeders=[1, 2], + data=RandomTempData( + size=size, + meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url), + ), ) - experiment.run() + env.await_ready() + try: + experiment.setup() + experiment.do_run() - assert_is_seed(deluge_node1, "dataset-1", size) - assert_is_seed(deluge_node2, "dataset-1", size) - assert_is_seed(deluge_node3, "dataset-1", size) + assert_is_seed(deluge_node1, "dataset-1", size) + assert_is_seed(deluge_node2, "dataset-1", size) + assert_is_seed(deluge_node3, "dataset-1", size) + finally: + experiment.teardown()