import base64 import logging import socket from dataclasses import dataclass from io import BytesIO from typing import List, Optional, Self, Dict, Any import pathvalidate from deluge_client import DelugeRPCClient from deluge_client.client import RemoteException from tenacity import ( retry, wait_exponential, stop_after_attempt, retry_if_not_exception_type, after_log, ) from tenacity.stop import stop_base from tenacity.wait import wait_base from torrentool.torrent import Torrent from urllib3.util import Url from benchmarks.core.concurrency import await_predicate from benchmarks.core.experiments.experiments import ExperimentComponent from benchmarks.core.network import DownloadHandle, Node from benchmarks.deluge.agent.deluge_agent_client import DelugeAgentClient logger = logging.getLogger(__name__) STOP_POLICY = stop_after_attempt(10) WAIT_POLICY = wait_exponential(exp_base=2, min=4, max=16) @dataclass(frozen=True) class DelugeMeta: """:class:`DelugeMeta` represents the initial metadata required so that a :class:`DelugeNode` can introduce a file into the network, becoming its initial seeder.""" name: str announce_url: Url class DelugeNode(Node[Torrent, DelugeMeta], ExperimentComponent): def __init__( self, name: str, daemon_port: int, agent: DelugeAgentClient, daemon_address: str = "localhost", daemon_username: str = "user", daemon_password: str = "password", ) -> None: if not pathvalidate.is_valid_filename(name): raise ValueError(f'Node name must be a valid filename (bad name: "{name}")') self._name = name self._rpc: Optional[DelugeRPCClient] = None self.daemon_args = { "host": daemon_address, "port": daemon_port, "username": daemon_username, "password": daemon_password, } self.agent = agent @property def name(self) -> str: return self._name def wipe_all_torrents(self): torrent_ids = list(self.rpc.core.get_torrents_status({}, []).keys()) if torrent_ids: errors = self.rpc.core.remove_torrents(torrent_ids, remove_data=True) if errors: raise Exception(f"There were errors removing torrents: {errors}") def genseed( self, size: int, seed: int, meta: DelugeMeta, ) -> Torrent: torrent = self.agent.generate(size, seed, meta.name) torrent.announce_urls = [str(meta.announce_url)] self.rpc.core.add_torrent_file( filename=f"{meta.name}.torrent", filedump=self._b64dump(torrent), options=dict(), ) return torrent def leech(self, handle: Torrent) -> DownloadHandle: self.rpc.core.add_torrent_file( filename=f"{handle.name}.torrent", filedump=self._b64dump(handle), options=dict(), ) return DelugeDownloadHandle( node=self, torrent=handle, ) def remove(self, handle: Torrent): try: self.rpc.core.remove_torrent(handle.info_hash, remove_data=True) return True except RemoteException as ex: # DelugeRPCClient creates remote exception types dynamically, so there's # actually no way of testing for them other than this. exception_type = str(ex.__class__) if "deluge_client.client.InvalidTorrentError" in exception_type: # This might happen when we retry a failed delete - maybe we got a bad response back, # but the node managed to delete it already. logger.warning(f"Torrent {handle.name} was not found on {self.name}.") return False else: raise ex def torrent_info(self, name: str) -> List[Dict[bytes, Any]]: return list(self.rpc.core.get_torrents_status({"name": name}, []).values()) @property def rpc(self) -> DelugeRPCClient: if self._rpc is None: self.connect() return self._rpc @retry( stop=STOP_POLICY, wait=WAIT_POLICY, after=after_log(logger, logging.WARNING), ) def connect(self) -> Self: return self._raw_connect() def _raw_connect(self): client = DelugeRPCClient(**self.daemon_args) client.connect() self._rpc = ResilientCallWrapper( client, wait_policy=WAIT_POLICY, stop_policy=STOP_POLICY, ) return self def is_ready(self) -> bool: try: self._raw_connect() return True except (ConnectionRefusedError, socket.gaierror): return False @staticmethod def _b64dump(handle: Torrent) -> bytes: buffer = BytesIO() buffer.write(handle.to_string()) return base64.b64encode(buffer.getvalue()) def __str__(self): return f"DelugeNode({self.name}, {self.daemon_args['host']}:{self.daemon_args['port']})" class ResilientCallWrapper: def __init__(self, node: Any, wait_policy: wait_base, stop_policy: stop_base): self.node = node self.wait_policy = wait_policy self.stop_policy = stop_policy def __call__(self, *args, **kwargs): @retry( wait=self.wait_policy, stop=self.stop_policy, retry=retry_if_not_exception_type(RemoteException), after=after_log(logger, logging.WARNING), ) def _resilient_wrapper(): return self.node(*args, **kwargs) return _resilient_wrapper() def __getattr__(self, item): return ResilientCallWrapper( getattr(self.node, item), wait_policy=self.wait_policy, stop_policy=self.stop_policy, ) class DelugeDownloadHandle(DownloadHandle): def __init__(self, torrent: Torrent, node: DelugeNode) -> None: self._node = node self.torrent = torrent @property def node(self) -> DelugeNode: return self._node def await_for_completion(self, timeout: float = 0) -> bool: name = self.torrent.name def _predicate(): response = self.node.rpc.core.get_torrents_status({"name": name}, []) if len(response) > 1: logger.warning( f"Client has multiple torrents matching name {name}. Returning the first one." ) if len(response) == 0: raise ValueError( f"Client {self._node.name} has no torrents matching name {name}." ) status = list(response.values())[0] return status[b"is_seed"] return await_predicate(_predicate, timeout=timeout)