feat: make experiments cleanup downloaded/seeded files after each run

This commit is contained in:
gmega 2025-01-06 16:49:16 -03:00
parent 0fa4f99e35
commit 59dcba5282
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
7 changed files with 167 additions and 82 deletions

View File

@ -4,11 +4,10 @@ import logging
from abc import ABC, abstractmethod
from collections.abc import Iterable
from time import time, sleep
from typing import List
from typing import List, Optional
from typing_extensions import Generic, TypeVar
logger = logging.getLogger(__name__)
@ -24,6 +23,32 @@ class Experiment(ABC):
TExperiment = TypeVar("TExperiment", bound=Experiment)
class ExperimentWithLifecycle(Experiment):
    """An :class:`ExperimentWithLifecycle` is a basic implementation of an :class:`Experiment` with overridable
    lifecycle hooks: :meth:`setup` runs before :meth:`do_run`, and :meth:`teardown`
    runs exactly once afterwards, whether the run succeeded or failed."""

    def setup(self):
        """Hook that runs before the experiment."""
        pass

    def run(self):
        try:
            self.setup()
            self.do_run()
        except Exception as ex:
            # Failure path: let the experiment clean up, passing along the cause,
            # then re-raise with the original traceback (bare `raise`).
            self.teardown(ex)
            raise
        else:
            # Success path, outside the try block so that an exception raised by
            # teardown() itself does not trigger a second teardown(ex) call from
            # the except handler above.
            self.teardown()

    def do_run(self):
        """The main body of the experiment."""
        pass

    def teardown(self, exception: Optional[Exception] = None):
        """Hook that runs after the experiment.

        :param exception: the exception that aborted the run, or ``None`` if the
            run completed normally.
        """
        pass
class ExperimentComponent(ABC):
"""An :class:`ExperimentComponent` is a part of the environment for an experiment. These could be databases,
network nodes, etc."""

View File

@ -4,7 +4,7 @@ from typing import Sequence, Optional
from typing_extensions import Generic, List, Tuple
from benchmarks.core.experiments.experiments import Experiment
from benchmarks.core.experiments.experiments import ExperimentWithLifecycle
from benchmarks.core.logging import RequestEvent, RequestEventType
from benchmarks.core.network import (
TInitialMetadata,
@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
class StaticDisseminationExperiment(
Generic[TNetworkHandle, TInitialMetadata], Experiment
Generic[TNetworkHandle, TInitialMetadata], ExperimentWithLifecycle
):
def __init__(
self,
@ -26,7 +26,7 @@ class StaticDisseminationExperiment(
seeders: List[int],
data: ExperimentData[TInitialMetadata],
concurrency: Optional[int] = None,
):
) -> None:
self.nodes = network
self.seeders = seeders
self.data = data
@ -35,8 +35,12 @@ class StaticDisseminationExperiment(
if concurrency is None
else concurrency
)
self._cid: Optional[TNetworkHandle] = None
def run(self, run: int = 0):
def setup(self):
pass
def do_run(self, run: int = 0):
seeders, leechers = self._split_nodes()
logger.info(
@ -46,54 +50,21 @@ class StaticDisseminationExperiment(
)
with self.data as (meta, data):
cid = None
for node in seeders:
logger.info(
RequestEvent(
node="runner",
destination=node.name,
name="seed",
request_id=str(meta),
type=RequestEventType.start,
)
)
cid = node.seed(data, meta if cid is None else cid)
logger.info(
RequestEvent(
node="runner",
destination=node.name,
name="seed",
request_id=str(meta),
type=RequestEventType.end,
)
)
_log_request(node, "seed", str(meta), RequestEventType.start)
self._cid = node.seed(data, meta if self._cid is None else self._cid)
_log_request(node, "seed", str(meta), RequestEventType.end)
assert cid is not None # to please mypy
assert self._cid is not None # to please mypy
logger.info(
f"Setting up leechers: {[str(leecher) for leecher in leechers]}"
)
def _leech(leecher):
logger.info(
RequestEvent(
node="runner",
destination=leecher.name,
name="leech",
request_id=str(meta),
type=RequestEventType.start,
)
)
download = leecher.leech(cid)
logger.info(
RequestEvent(
node="runner",
destination=leecher.name,
name="leech",
request_id=str(meta),
type=RequestEventType.end,
)
)
_log_request(leecher, "leech", str(meta), RequestEventType.start)
download = leecher.leech(self._cid)
_log_request(leecher, "leech", str(meta), RequestEventType.end)
return download
downloads = list(self._pool.imap_unordered(_leech, leechers))
@ -110,6 +81,18 @@ class StaticDisseminationExperiment(
):
logger.info("Download %d / %d completed", i + 1, len(downloads))
def teardown(self, exception: Optional[Exception] = None):
    """Removes the seeded/downloaded file from every node, then shuts down the
    thread pool.

    :param exception: the exception that aborted the run, if any.
    """

    def _remove(element: Tuple[int, Node[TNetworkHandle, TInitialMetadata]]):
        index, node = element
        node.remove(self._cid)
        return index

    try:
        # If the run failed before anything was seeded, self._cid is still None
        # and there is nothing to remove. The original asserted _cid was not
        # None, which would raise here and mask the exception that aborted the
        # run in the first place.
        if self._cid is not None:
            for i in self._pool.imap_unordered(_remove, enumerate(self.nodes)):
                logger.info("Node %d removed file", i + 1)
    finally:
        # The pool must be shut down even if removal fails on some node.
        logger.info("Shut down thread pool.")
        self._pool.close()
        self._pool.join()
@ -124,3 +107,20 @@ class StaticDisseminationExperiment(
return [self.nodes[i] for i in self.seeders], [
self.nodes[i] for i in range(0, len(self.nodes)) if i not in self.seeders
]
def _log_request(
    node: Node[TNetworkHandle, TInitialMetadata],
    name: str,
    request_id: str,
    event_type: RequestEventType,
):
    """Emits a :class:`RequestEvent` recording that the runner issued the named
    request (start or end) against `node`."""
    event = RequestEvent(
        node="runner",
        destination=node.name,
        name=name,
        request_id=request_id,
        type=event_type,
    )
    logger.info(event)

View File

@ -25,6 +25,7 @@ class MockNode(Node[MockHandle, str]):
self.seeding: Optional[Tuple[MockHandle, Path]] = None
self.leeching: Optional[MockHandle] = None
self.download_was_awaited = False
self.cleanup_was_called = False
@property
def name(self) -> str:
@ -42,6 +43,11 @@ class MockNode(Node[MockHandle, str]):
self.leeching = handle
return MockDownloadHandle(self)
def remove(self, handle: MockHandle):
    """Records that removal was requested for `handle`.

    NOTE(review): __init__ initializes `cleanup_was_called`, but this method
    sets `remove_was_called` — the flag read by the tests. As written,
    `remove_was_called` only exists after remove() runs, so reading it on a
    node whose remove() was never called raises AttributeError rather than
    returning False. Confirm whether the __init__ flag name is a typo.
    """
    # NOTE(review): this assumes the node has seeded something; a pure leecher
    # has `self.seeding is None` and would fail this assert — verify against
    # callers (teardown removes from every node, seeders and leechers alike).
    assert self.seeding is not None
    assert self.leeching == handle or self.seeding[0] == handle
    self.remove_was_called = True
class MockDownloadHandle(DownloadHandle):
def __init__(self, parent: MockNode) -> None:
@ -190,3 +196,20 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
timestamp=events[5].timestamp,
),
]
def test_should_delete_file_from_nodes_at_the_end_of_the_experiment():
    # Two-node network with node 1 as the sole seeder; after run() completes,
    # teardown should have asked every node to remove the file.
    network = mock_network(n=2)
    experiment = StaticDisseminationExperiment(
        seeders=[1],
        network=network,
        data=MockExperimentData(meta="data", data=Path("/path/to/data")),
    )

    experiment.run()

    assert all(node.remove_was_called for node in network)

View File

@ -58,6 +58,13 @@ class Node(ABC, Generic[TNetworkHandle, TInitialMetadata]):
"""
pass
@abstractmethod
def remove(self, handle: TNetworkHandle) -> None:
    """Removes the file associated with the handle from this node. For seeders, this means the node will stop
    seeding it. For leechers, it will stop downloading it. In both cases, the file will be removed from the node's
    storage."""
    pass
class SharedFSNode(Node[TNetworkHandle, TInitialMetadata], ABC):
"""A `SharedFSNode` is a :class:`Node` which shares a network volume with us. This means

View File

@ -116,6 +116,9 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
torrent=handle,
)
def remove(self, handle: Torrent):
    """Removes the torrent identified by `handle` from this Deluge node,
    also deleting its data from the node's storage (`remove_data=True`)."""
    self.rpc.core.remove_torrent(handle.info_hash, remove_data=True)
def torrent_info(self, name: str) -> List[Dict[bytes, Any]]:
return list(self.rpc.core.get_torrents_status({"name": name}, []).values())

View File

@ -2,22 +2,28 @@ from pathlib import Path
import pytest
from benchmarks.core.utils import megabytes
from benchmarks.core.utils import megabytes, await_predicate
from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta
from benchmarks.deluge.tracker import Tracker
@pytest.mark.integration
def assert_is_seed(node: DelugeNode, name: str, size: int):
response = node.torrent_info(name=name)
assert len(response) == 1
info = response[0]
def _is_seed():
response = node.torrent_info(name=name)
if len(response) == 0:
return False
assert info[b"name"] == name.encode(
"utf-8"
) # not sure that this works for ANY name...
assert info[b"total_size"] == size
assert info[b"is_seed"]
assert len(response) == 1
info = response[0]
assert info[b"name"] == name.encode(
"utf-8"
) # not sure that this works for ANY name...
assert info[b"total_size"] == size
assert info[b"is_seed"]
return True
assert await_predicate(_is_seed, timeout=5)
@pytest.mark.integration
@ -50,3 +56,18 @@ def test_should_download_files(
assert handle.await_for_completion(5)
assert_is_seed(deluge_node2, name="dataset1", size=megabytes(1))
@pytest.mark.integration
def test_should_remove_files(
    deluge_node1: DelugeNode, temp_random_file: Path, tracker: Tracker
):
    # Sanity check: the node starts with no torrent named "dataset1".
    assert not deluge_node1.torrent_info(name="dataset1")
    torrent = deluge_node1.seed(
        temp_random_file, DelugeMeta(name="dataset1", announce_url=tracker.announce_url)
    )
    assert_is_seed(deluge_node1, name="dataset1", size=megabytes(1))
    deluge_node1.remove(torrent)
    # remove() should leave no trace of the torrent on the node.
    assert not deluge_node1.torrent_info(name="dataset1")

View File

@ -17,22 +17,25 @@ def test_should_run_with_a_single_seeder(
polling_interval=0.5,
)
experiment = env.bind(
StaticDisseminationExperiment(
network=[deluge_node1, deluge_node2, deluge_node3],
seeders=[1],
data=RandomTempData(
size=size,
meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
),
)
experiment = StaticDisseminationExperiment(
network=[deluge_node1, deluge_node2, deluge_node3],
seeders=[1],
data=RandomTempData(
size=size,
meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
),
)
experiment.run()
env.await_ready()
try:
experiment.setup()
experiment.do_run()
assert_is_seed(deluge_node1, "dataset-1", size)
assert_is_seed(deluge_node2, "dataset-1", size)
assert_is_seed(deluge_node3, "dataset-1", size)
assert_is_seed(deluge_node1, "dataset-1", size)
assert_is_seed(deluge_node2, "dataset-1", size)
assert_is_seed(deluge_node3, "dataset-1", size)
finally:
experiment.teardown()
@pytest.mark.integration
@ -45,19 +48,22 @@ def test_should_run_with_multiple_seeders(
polling_interval=0.5,
)
experiment = env.bind(
StaticDisseminationExperiment(
network=[deluge_node1, deluge_node2, deluge_node3],
seeders=[1, 2],
data=RandomTempData(
size=size,
meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
),
)
experiment = StaticDisseminationExperiment(
network=[deluge_node1, deluge_node2, deluge_node3],
seeders=[1, 2],
data=RandomTempData(
size=size,
meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
),
)
experiment.run()
env.await_ready()
try:
experiment.setup()
experiment.do_run()
assert_is_seed(deluge_node1, "dataset-1", size)
assert_is_seed(deluge_node2, "dataset-1", size)
assert_is_seed(deluge_node3, "dataset-1", size)
assert_is_seed(deluge_node1, "dataset-1", size)
assert_is_seed(deluge_node2, "dataset-1", size)
assert_is_seed(deluge_node3, "dataset-1", size)
finally:
experiment.teardown()