make leech requests concurrent to reduce noise

This commit is contained in:
gmega 2024-12-10 15:29:33 -03:00
parent b971efcf69
commit 278007cdce
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18

View File

@ -1,10 +1,11 @@
import logging import logging
from multiprocessing.pool import ThreadPool
from typing import Sequence from typing import Sequence
from typing_extensions import Generic, List, Tuple from typing_extensions import Generic, List, Tuple
from benchmarks.core.experiments.experiments import Experiment from benchmarks.core.experiments.experiments import Experiment
from benchmarks.core.network import TInitialMetadata, TNetworkHandle, Node from benchmarks.core.network import TInitialMetadata, TNetworkHandle, Node, DownloadHandle
from benchmarks.core.utils import ExperimentData from benchmarks.core.utils import ExperimentData
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -20,6 +21,7 @@ class StaticDisseminationExperiment(Generic[TNetworkHandle, TInitialMetadata], E
self.nodes = network self.nodes = network
self.seeders = seeders self.seeders = seeders
self.data = data self.data = data
self._pool = ThreadPool(processes=len(network) - len(seeders))
def run(self, run: int = 0): def run(self, run: int = 0):
seeders, leechers = self._split_nodes() seeders, leechers = self._split_nodes()
@ -35,14 +37,24 @@ class StaticDisseminationExperiment(Generic[TNetworkHandle, TInitialMetadata], E
assert cid is not None # to please mypy assert cid is not None # to please mypy
logger.info(f'Setting up leechers: {str(leechers)}') logger.info(f'Setting up leechers: {[str(leecher) for leecher in leechers]}')
downloads = [node.leech(cid) for node in leechers] downloads = list(self._pool.imap_unordered(lambda leecher: leecher.leech(cid), leechers))
logger.info('Now waiting for downloads to complete') logger.info('Now waiting for downloads to complete')
for i, download in enumerate(downloads):
def _await_for_download(element: Tuple[int, DownloadHandle]) -> int:
index, download = element
download.await_for_completion() download.await_for_completion()
return index
for i in self._pool.imap_unordered(_await_for_download, enumerate(downloads)):
logger.info('Download %d / %d completed', i + 1, len(downloads)) logger.info('Download %d / %d completed', i + 1, len(downloads))
logger.info('Shut down thread pool.')
self._pool.close()
self._pool.join()
logger.info('Done.')
def _split_nodes(self) -> Tuple[ def _split_nodes(self) -> Tuple[
List[Node[TNetworkHandle, TInitialMetadata]], List[Node[TNetworkHandle, TInitialMetadata]],
List[Node[TNetworkHandle, TInitialMetadata]] List[Node[TNetworkHandle, TInitialMetadata]]