From 278007cdce415833d2c94ad4dc613270628a882c Mon Sep 17 00:00:00 2001
From: gmega
Date: Tue, 10 Dec 2024 15:29:33 -0300
Subject: [PATCH] make leech requests concurrent to reduce noise

---
 .../core/experiments/static_experiment.py     | 20 +++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/benchmarks/core/experiments/static_experiment.py b/benchmarks/core/experiments/static_experiment.py
index dba8849..b0c6e67 100644
--- a/benchmarks/core/experiments/static_experiment.py
+++ b/benchmarks/core/experiments/static_experiment.py
@@ -1,10 +1,11 @@
 import logging
+from multiprocessing.pool import ThreadPool
 from typing import Sequence
 
 from typing_extensions import Generic, List, Tuple
 
 from benchmarks.core.experiments.experiments import Experiment
-from benchmarks.core.network import TInitialMetadata, TNetworkHandle, Node
+from benchmarks.core.network import TInitialMetadata, TNetworkHandle, Node, DownloadHandle
 from benchmarks.core.utils import ExperimentData
 
 logger = logging.getLogger(__name__)
@@ -20,6 +21,7 @@ class StaticDisseminationExperiment(Generic[TNetworkHandle, TInitialMetadata], E
         self.nodes = network
         self.seeders = seeders
         self.data = data
+        self._pool = ThreadPool(processes=len(network) - len(seeders))
 
     def run(self, run: int = 0):
         seeders, leechers = self._split_nodes()
@@ -35,14 +37,24 @@ class StaticDisseminationExperiment(Generic[TNetworkHandle, TInitialMetadata], E
 
         assert cid is not None  # to please mypy
 
-        logger.info(f'Setting up leechers: {str(leechers)}')
-        downloads = [node.leech(cid) for node in leechers]
+        logger.info(f'Setting up leechers: {[str(leecher) for leecher in leechers]}')
+        downloads = list(self._pool.imap_unordered(lambda leecher: leecher.leech(cid), leechers))
 
         logger.info('Now waiting for downloads to complete')
-        for i, download in enumerate(downloads):
+
+        def _await_for_download(element: Tuple[int, DownloadHandle]) -> int:
+            index, download = element
             download.await_for_completion()
+            return index
+
+        for i in self._pool.imap_unordered(_await_for_download, enumerate(downloads)):
             logger.info('Download %d / %d completed', i + 1, len(downloads))
 
+        logger.info('Shut down thread pool.')
+        self._pool.close()
+        self._pool.join()
+        logger.info('Done.')
+
     def _split_nodes(self) -> Tuple[
         List[Node[TNetworkHandle, TInitialMetadata]],
         List[Node[TNetworkHandle, TInitialMetadata]]