[Core] Document all exported core methods with type hints
Standardize docstrings in core.py to google standard. Remove type hints in docstrings in favor of the ones in method signatures. Use function signature type hints in autodoc generated docs. Change Deferred type hints to strings. Older versions of twisted (<21.7) don't support generic Deferred type hinting, this prevents crashes on those versions. Closes: https://github.com/deluge-torrent/deluge/pull/359
This commit is contained in:
parent
aa74261d50
commit
dabb505376
|
@ -14,6 +14,7 @@ import shutil
|
|||
import tempfile
|
||||
import threading
|
||||
from base64 import b64decode, b64encode
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
from urllib.request import URLError, urlopen
|
||||
|
||||
from twisted.internet import defer, reactor, task
|
||||
|
@ -240,13 +241,12 @@ class Core(component.Component):
|
|||
"""Apply libtorrent session settings.
|
||||
|
||||
Args:
|
||||
settings (dict): A dict of lt session settings to apply.
|
||||
|
||||
settings: A dict of lt session settings to apply.
|
||||
"""
|
||||
self.session.apply_settings(settings)
|
||||
|
||||
@staticmethod
|
||||
def _create_peer_id(version):
|
||||
def _create_peer_id(version: str) -> str:
|
||||
"""Create a peer_id fingerprint.
|
||||
|
||||
This creates the peer_id and modifies the release char to identify
|
||||
|
@ -261,11 +261,10 @@ class Core(component.Component):
|
|||
``--DE201b--`` (beta pre-release of v2.0.1)
|
||||
|
||||
Args:
|
||||
version (str): The version string in PEP440 dotted notation.
|
||||
version: The version string in PEP440 dotted notation.
|
||||
|
||||
Returns:
|
||||
str: The formatted peer_id with Deluge prefix e.g. '--DE200s--'
|
||||
|
||||
The formatted peer_id with Deluge prefix e.g. '--DE200s--'
|
||||
"""
|
||||
split = deluge.common.VersionSplit(version)
|
||||
# Fill list with zeros to length of 4 and use lt to create fingerprint.
|
||||
|
@ -314,12 +313,11 @@ class Core(component.Component):
|
|||
log.info('Restoring backup of %s from: %s', filename, filepath_bak)
|
||||
shutil.move(filepath_bak, filepath)
|
||||
|
||||
def _load_session_state(self):
|
||||
def _load_session_state(self) -> dict:
|
||||
"""Loads the libtorrent session state
|
||||
|
||||
Returns:
|
||||
dict: A libtorrent sesion state, empty dict if unable to load it.
|
||||
|
||||
A libtorrent sesion state, empty dict if unable to load it.
|
||||
"""
|
||||
filename = 'session.state'
|
||||
filepath = get_config_dir(filename)
|
||||
|
@ -401,18 +399,19 @@ class Core(component.Component):
|
|||
|
||||
# Exported Methods
|
||||
@export
|
||||
def add_torrent_file_async(self, filename, filedump, options, save_state=True):
|
||||
def add_torrent_file_async(
|
||||
self, filename: str, filedump: str, options: dict, save_state: bool = True
|
||||
) -> 'defer.Deferred[Optional[str]]':
|
||||
"""Adds a torrent file to the session asynchronously.
|
||||
|
||||
Args:
|
||||
filename (str): The filename of the torrent.
|
||||
filedump (str): A base64 encoded string of torrent file contents.
|
||||
options (dict): The options to apply to the torrent upon adding.
|
||||
save_state (bool): If the state should be saved after adding the file.
|
||||
filename: The filename of the torrent.
|
||||
filedump: A base64 encoded string of torrent file contents.
|
||||
options: The options to apply to the torrent upon adding.
|
||||
save_state: If the state should be saved after adding the file.
|
||||
|
||||
Returns:
|
||||
Deferred: The torrent ID or None.
|
||||
|
||||
The torrent ID or None.
|
||||
"""
|
||||
try:
|
||||
filedump = b64decode(filedump)
|
||||
|
@ -433,7 +432,9 @@ class Core(component.Component):
|
|||
return d
|
||||
|
||||
@export
|
||||
def prefetch_magnet_metadata(self, magnet, timeout=30):
|
||||
def prefetch_magnet_metadata(
|
||||
self, magnet: str, timeout: int = 30
|
||||
) -> 'defer.Deferred[Tuple[str, bytes]]':
|
||||
"""Download magnet metadata without adding to Deluge session.
|
||||
|
||||
Used by UIs to get magnet files for selection before adding to session.
|
||||
|
@ -441,12 +442,11 @@ class Core(component.Component):
|
|||
The metadata is bencoded and for transfer base64 encoded.
|
||||
|
||||
Args:
|
||||
magnet (str): The magnet URI.
|
||||
timeout (int): Number of seconds to wait before canceling request.
|
||||
magnet: The magnet URI.
|
||||
timeout: Number of seconds to wait before canceling request.
|
||||
|
||||
Returns:
|
||||
Deferred: A tuple of (torrent_id (str), metadata (str)) for the magnet.
|
||||
|
||||
A tuple of (torrent_id (str), metadata (str)) for the magnet.
|
||||
"""
|
||||
|
||||
def on_metadata(result, result_d):
|
||||
|
@ -461,16 +461,18 @@ class Core(component.Component):
|
|||
return result_d
|
||||
|
||||
@export
|
||||
def add_torrent_file(self, filename, filedump, options):
|
||||
def add_torrent_file(
|
||||
self, filename: str, filedump: Union[str, bytes], options: dict
|
||||
) -> Optional[str]:
|
||||
"""Adds a torrent file to the session.
|
||||
|
||||
Args:
|
||||
filename (str): The filename of the torrent.
|
||||
filedump (str): A base64 encoded string of the torrent file contents.
|
||||
options (dict): The options to apply to the torrent upon adding.
|
||||
filename: The filename of the torrent.
|
||||
filedump: A base64 encoded string of the torrent file contents.
|
||||
options: The options to apply to the torrent upon adding.
|
||||
|
||||
Returns:
|
||||
str: The torrent_id or None.
|
||||
The torrent_id or None.
|
||||
"""
|
||||
try:
|
||||
filedump = b64decode(filedump)
|
||||
|
@ -486,16 +488,17 @@ class Core(component.Component):
|
|||
raise
|
||||
|
||||
@export
|
||||
def add_torrent_files(self, torrent_files):
|
||||
def add_torrent_files(
|
||||
self, torrent_files: List[Tuple[str, Union[str, bytes], dict]]
|
||||
) -> 'defer.Deferred[List[AddTorrentError]]':
|
||||
"""Adds multiple torrent files to the session asynchronously.
|
||||
|
||||
Args:
|
||||
torrent_files (list of tuples): Torrent files as tuple of
|
||||
``(filename, filedump, options)``.
|
||||
torrent_files: Torrent files as tuple of
|
||||
``(filename, filedump, options)``.
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
|
||||
A list of errors (if there were any)
|
||||
"""
|
||||
|
||||
@maybe_coroutine
|
||||
|
@ -515,19 +518,19 @@ class Core(component.Component):
|
|||
return task.deferLater(reactor, 0, add_torrents)
|
||||
|
||||
@export
|
||||
def add_torrent_url(self, url, options, headers=None):
|
||||
"""
|
||||
Adds a torrent from a URL. Deluge will attempt to fetch the torrent
|
||||
def add_torrent_url(
|
||||
self, url: str, options: dict, headers: dict = None
|
||||
) -> 'defer.Deferred[Optional[str]]':
|
||||
"""Adds a torrent from a URL. Deluge will attempt to fetch the torrent
|
||||
from the URL prior to adding it to the session.
|
||||
|
||||
:param url: the URL pointing to the torrent file
|
||||
:type url: string
|
||||
:param options: the options to apply to the torrent on add
|
||||
:type options: dict
|
||||
:param headers: any optional headers to send
|
||||
:type headers: dict
|
||||
Args:
|
||||
url: the URL pointing to the torrent file
|
||||
options: the options to apply to the torrent on add
|
||||
headers: any optional headers to send
|
||||
|
||||
:returns: a Deferred which returns the torrent_id as a str or None
|
||||
Returns:
|
||||
a Deferred which returns the torrent_id as a str or None
|
||||
"""
|
||||
log.info('Attempting to add URL %s', url)
|
||||
|
||||
|
@ -553,55 +556,52 @@ class Core(component.Component):
|
|||
return d
|
||||
|
||||
@export
|
||||
def add_torrent_magnet(self, uri, options):
|
||||
"""
|
||||
Adds a torrent from a magnet link.
|
||||
def add_torrent_magnet(self, uri: str, options: dict) -> str:
|
||||
"""Adds a torrent from a magnet link.
|
||||
|
||||
:param uri: the magnet link
|
||||
:type uri: string
|
||||
:param options: the options to apply to the torrent on add
|
||||
:type options: dict
|
||||
|
||||
:returns: the torrent_id
|
||||
:rtype: string
|
||||
Args:
|
||||
uri: the magnet link
|
||||
options: the options to apply to the torrent on add
|
||||
|
||||
Returns:
|
||||
the torrent_id
|
||||
"""
|
||||
log.debug('Attempting to add by magnet URI: %s', uri)
|
||||
|
||||
return self.torrentmanager.add(magnet=uri, options=options)
|
||||
|
||||
@export
|
||||
def remove_torrent(self, torrent_id, remove_data):
|
||||
def remove_torrent(self, torrent_id: str, remove_data: bool) -> bool:
|
||||
"""Removes a single torrent from the session.
|
||||
|
||||
Args:
|
||||
torrent_id (str): The torrent ID to remove.
|
||||
remove_data (bool): If True, also remove the downloaded data.
|
||||
torrent_id: The torrent ID to remove.
|
||||
remove_data: If True, also remove the downloaded data.
|
||||
|
||||
Returns:
|
||||
bool: True if removed successfully.
|
||||
True if removed successfully.
|
||||
|
||||
Raises:
|
||||
InvalidTorrentError: If the torrent ID does not exist in the session.
|
||||
|
||||
"""
|
||||
log.debug('Removing torrent %s from the core.', torrent_id)
|
||||
return self.torrentmanager.remove(torrent_id, remove_data)
|
||||
|
||||
@export
|
||||
def remove_torrents(self, torrent_ids, remove_data):
|
||||
def remove_torrents(
|
||||
self, torrent_ids: List[str], remove_data: bool
|
||||
) -> 'defer.Deferred[List[Tuple[str, str]]]':
|
||||
"""Remove multiple torrents from the session.
|
||||
|
||||
Args:
|
||||
torrent_ids (list): The torrent IDs to remove.
|
||||
remove_data (bool): If True, also remove the downloaded data.
|
||||
torrent_ids: The torrent IDs to remove.
|
||||
remove_data: If True, also remove the downloaded data.
|
||||
|
||||
Returns:
|
||||
list: An empty list if no errors occurred otherwise the list contains
|
||||
tuples of strings, a torrent ID and an error message. For example:
|
||||
|
||||
[('<torrent_id>', 'Error removing torrent')]
|
||||
An empty list if no errors occurred otherwise the list contains
|
||||
tuples of strings, a torrent ID and an error message. For example:
|
||||
|
||||
[('<torrent_id>', 'Error removing torrent')]
|
||||
"""
|
||||
log.info('Removing %d torrents from core.', len(torrent_ids))
|
||||
|
||||
|
@ -625,17 +625,17 @@ class Core(component.Component):
|
|||
return task.deferLater(reactor, 0, do_remove_torrents)
|
||||
|
||||
@export
|
||||
def get_session_status(self, keys):
|
||||
def get_session_status(self, keys: List[str]) -> Dict[str, Union[int, float]]:
|
||||
"""Gets the session status values for 'keys', these keys are taking
|
||||
from libtorrent's session status.
|
||||
|
||||
See: http://www.rasterbar.com/products/libtorrent/manual.html#status
|
||||
|
||||
:param keys: the keys for which we want values
|
||||
:type keys: list
|
||||
:returns: a dictionary of {key: value, ...}
|
||||
:rtype: dict
|
||||
Args:
|
||||
keys: the keys for which we want values
|
||||
|
||||
Returns:
|
||||
a dictionary of {key: value, ...}
|
||||
"""
|
||||
if not keys:
|
||||
return self.session_status
|
||||
|
@ -656,13 +656,13 @@ class Core(component.Component):
|
|||
return status
|
||||
|
||||
@export
|
||||
def force_reannounce(self, torrent_ids):
|
||||
def force_reannounce(self, torrent_ids: List[str]) -> None:
|
||||
log.debug('Forcing reannouncment to: %s', torrent_ids)
|
||||
for torrent_id in torrent_ids:
|
||||
self.torrentmanager[torrent_id].force_reannounce()
|
||||
|
||||
@export
|
||||
def pause_torrent(self, torrent_id):
|
||||
def pause_torrent(self, torrent_id: str) -> None:
|
||||
"""Pauses a torrent"""
|
||||
log.debug('Pausing: %s', torrent_id)
|
||||
if not isinstance(torrent_id, str):
|
||||
|
@ -671,7 +671,7 @@ class Core(component.Component):
|
|||
self.torrentmanager[torrent_id].pause()
|
||||
|
||||
@export
|
||||
def pause_torrents(self, torrent_ids=None):
|
||||
def pause_torrents(self, torrent_ids: List[str] = None) -> None:
|
||||
"""Pauses a list of torrents"""
|
||||
if not torrent_ids:
|
||||
torrent_ids = self.torrentmanager.get_torrent_list()
|
||||
|
@ -679,27 +679,27 @@ class Core(component.Component):
|
|||
self.pause_torrent(torrent_id)
|
||||
|
||||
@export
|
||||
def connect_peer(self, torrent_id, ip, port):
|
||||
def connect_peer(self, torrent_id: str, ip: str, port: int):
|
||||
log.debug('adding peer %s to %s', ip, torrent_id)
|
||||
if not self.torrentmanager[torrent_id].connect_peer(ip, port):
|
||||
log.warning('Error adding peer %s:%s to %s', ip, port, torrent_id)
|
||||
|
||||
@export
|
||||
def move_storage(self, torrent_ids, dest):
|
||||
def move_storage(self, torrent_ids: List[str], dest: str):
|
||||
log.debug('Moving storage %s to %s', torrent_ids, dest)
|
||||
for torrent_id in torrent_ids:
|
||||
if not self.torrentmanager[torrent_id].move_storage(dest):
|
||||
log.warning('Error moving torrent %s to %s', torrent_id, dest)
|
||||
|
||||
@export
|
||||
def pause_session(self):
|
||||
def pause_session(self) -> None:
|
||||
"""Pause the entire session"""
|
||||
if not self.session.is_paused():
|
||||
self.session.pause()
|
||||
component.get('EventManager').emit(SessionPausedEvent())
|
||||
|
||||
@export
|
||||
def resume_session(self):
|
||||
def resume_session(self) -> None:
|
||||
"""Resume the entire session"""
|
||||
if self.session.is_paused():
|
||||
self.session.resume()
|
||||
|
@ -708,12 +708,12 @@ class Core(component.Component):
|
|||
component.get('EventManager').emit(SessionResumedEvent())
|
||||
|
||||
@export
|
||||
def is_session_paused(self):
|
||||
def is_session_paused(self) -> bool:
|
||||
"""Returns the activity of the session"""
|
||||
return self.session.is_paused()
|
||||
|
||||
@export
|
||||
def resume_torrent(self, torrent_id):
|
||||
def resume_torrent(self, torrent_id: str) -> None:
|
||||
"""Resumes a torrent"""
|
||||
log.debug('Resuming: %s', torrent_id)
|
||||
if not isinstance(torrent_id, str):
|
||||
|
@ -722,7 +722,7 @@ class Core(component.Component):
|
|||
self.torrentmanager[torrent_id].resume()
|
||||
|
||||
@export
|
||||
def resume_torrents(self, torrent_ids=None):
|
||||
def resume_torrents(self, torrent_ids: List[str] = None) -> None:
|
||||
"""Resumes a list of torrents"""
|
||||
if not torrent_ids:
|
||||
torrent_ids = self.torrentmanager.get_torrent_list()
|
||||
|
@ -755,7 +755,9 @@ class Core(component.Component):
|
|||
return status
|
||||
|
||||
@export
|
||||
def get_torrent_status(self, torrent_id, keys, diff=False):
|
||||
def get_torrent_status(
|
||||
self, torrent_id: str, keys: List[str], diff: bool = False
|
||||
) -> dict:
|
||||
torrent_keys, plugin_keys = self.torrentmanager.separate_keys(
|
||||
keys, [torrent_id]
|
||||
)
|
||||
|
@ -770,10 +772,10 @@ class Core(component.Component):
|
|||
|
||||
@export
|
||||
@maybe_coroutine
|
||||
async def get_torrents_status(self, filter_dict, keys, diff=False):
|
||||
"""
|
||||
returns all torrents , optionally filtered by filter_dict.
|
||||
"""
|
||||
async def get_torrents_status(
|
||||
self, filter_dict: dict, keys: List[str], diff: bool = False
|
||||
) -> dict:
|
||||
"""returns all torrents , optionally filtered by filter_dict."""
|
||||
all_keys = not keys
|
||||
torrent_ids = self.filtermanager.filter_torrent_ids(filter_dict)
|
||||
status_dict, plugin_keys = await self.torrentmanager.torrents_status_update(
|
||||
|
@ -786,36 +788,37 @@ class Core(component.Component):
|
|||
return status_dict
|
||||
|
||||
@export
|
||||
def get_filter_tree(self, show_zero_hits=True, hide_cat=None):
|
||||
"""
|
||||
returns {field: [(value,count)] }
|
||||
def get_filter_tree(
|
||||
self, show_zero_hits: bool = True, hide_cat: List[str] = None
|
||||
) -> Dict:
|
||||
"""returns {field: [(value,count)] }
|
||||
for use in sidebar(s)
|
||||
"""
|
||||
return self.filtermanager.get_filter_tree(show_zero_hits, hide_cat)
|
||||
|
||||
@export
|
||||
def get_session_state(self):
|
||||
def get_session_state(self) -> List[str]:
|
||||
"""Returns a list of torrent_ids in the session."""
|
||||
# Get the torrent list from the TorrentManager
|
||||
return self.torrentmanager.get_torrent_list()
|
||||
|
||||
@export
|
||||
def get_config(self):
|
||||
def get_config(self) -> dict:
|
||||
"""Get all the preferences as a dictionary"""
|
||||
return self.config.config
|
||||
|
||||
@export
|
||||
def get_config_value(self, key):
|
||||
def get_config_value(self, key: str) -> Any:
|
||||
"""Get the config value for key"""
|
||||
return self.config.get(key)
|
||||
|
||||
@export
|
||||
def get_config_values(self, keys):
|
||||
def get_config_values(self, keys: List[str]) -> Dict[str, Any]:
|
||||
"""Get the config values for the entered keys"""
|
||||
return {key: self.config.get(key) for key in keys}
|
||||
|
||||
@export
|
||||
def set_config(self, config):
|
||||
def set_config(self, config: Dict[str, Any]):
|
||||
"""Set the config with values from dictionary"""
|
||||
# Load all the values into the configuration
|
||||
for key in config:
|
||||
|
@ -824,21 +827,20 @@ class Core(component.Component):
|
|||
self.config[key] = config[key]
|
||||
|
||||
@export
|
||||
def get_listen_port(self):
|
||||
def get_listen_port(self) -> int:
|
||||
"""Returns the active listen port"""
|
||||
return self.session.listen_port()
|
||||
|
||||
@export
|
||||
def get_proxy(self):
|
||||
def get_proxy(self) -> Dict[str, Any]:
|
||||
"""Returns the proxy settings
|
||||
|
||||
Returns:
|
||||
dict: Contains proxy settings.
|
||||
Proxy settings.
|
||||
|
||||
Notes:
|
||||
Proxy type names:
|
||||
0: None, 1: Socks4, 2: Socks5, 3: Socks5 w Auth, 4: HTTP, 5: HTTP w Auth, 6: I2P
|
||||
|
||||
"""
|
||||
|
||||
settings = self.session.get_settings()
|
||||
|
@ -861,36 +863,38 @@ class Core(component.Component):
|
|||
return proxy_dict
|
||||
|
||||
@export
|
||||
def get_available_plugins(self):
|
||||
def get_available_plugins(self) -> List[str]:
|
||||
"""Returns a list of plugins available in the core"""
|
||||
return self.pluginmanager.get_available_plugins()
|
||||
|
||||
@export
|
||||
def get_enabled_plugins(self):
|
||||
def get_enabled_plugins(self) -> List[str]:
|
||||
"""Returns a list of enabled plugins in the core"""
|
||||
return self.pluginmanager.get_enabled_plugins()
|
||||
|
||||
@export
|
||||
def enable_plugin(self, plugin):
|
||||
def enable_plugin(self, plugin: str) -> 'defer.Deferred[bool]':
|
||||
return self.pluginmanager.enable_plugin(plugin)
|
||||
|
||||
@export
|
||||
def disable_plugin(self, plugin):
|
||||
def disable_plugin(self, plugin: str) -> 'defer.Deferred[bool]':
|
||||
return self.pluginmanager.disable_plugin(plugin)
|
||||
|
||||
@export
|
||||
def force_recheck(self, torrent_ids):
|
||||
def force_recheck(self, torrent_ids: List[str]) -> None:
|
||||
"""Forces a data recheck on torrent_ids"""
|
||||
for torrent_id in torrent_ids:
|
||||
self.torrentmanager[torrent_id].force_recheck()
|
||||
|
||||
@export
|
||||
def set_torrent_options(self, torrent_ids, options):
|
||||
def set_torrent_options(
|
||||
self, torrent_ids: List[str], options: Dict[str, Any]
|
||||
) -> None:
|
||||
"""Sets the torrent options for torrent_ids
|
||||
|
||||
Args:
|
||||
torrent_ids (list): A list of torrent_ids to set the options for.
|
||||
options (dict): A dict of torrent options to set. See
|
||||
torrent_ids: A list of torrent_ids to set the options for.
|
||||
options: A dict of torrent options to set. See
|
||||
``torrent.TorrentOptions`` class for valid keys.
|
||||
"""
|
||||
if 'owner' in options and not self.authmanager.has_account(options['owner']):
|
||||
|
@ -903,12 +907,14 @@ class Core(component.Component):
|
|||
self.torrentmanager[torrent_id].set_options(options)
|
||||
|
||||
@export
|
||||
def set_torrent_trackers(self, torrent_id, trackers):
|
||||
def set_torrent_trackers(
|
||||
self, torrent_id: str, trackers: List[Dict[str, Any]]
|
||||
) -> None:
|
||||
"""Sets a torrents tracker list. trackers will be ``[{"url", "tier"}]``"""
|
||||
return self.torrentmanager[torrent_id].set_trackers(trackers)
|
||||
|
||||
@export
|
||||
def get_magnet_uri(self, torrent_id):
|
||||
def get_magnet_uri(self, torrent_id: str) -> str:
|
||||
return self.torrentmanager[torrent_id].get_magnet_uri()
|
||||
|
||||
@deprecated
|
||||
|
@ -1056,7 +1062,7 @@ class Core(component.Component):
|
|||
self.add_torrent_file(os.path.split(target)[1], filedump, options)
|
||||
|
||||
@export
|
||||
def upload_plugin(self, filename, filedump):
|
||||
def upload_plugin(self, filename: str, filedump: Union[str, bytes]) -> None:
|
||||
"""This method is used to upload new plugins to the daemon. It is used
|
||||
when connecting to the daemon remotely and installing a new plugin on
|
||||
the client side. ``plugin_data`` is a ``xmlrpc.Binary`` object of the file data,
|
||||
|
@ -1074,26 +1080,24 @@ class Core(component.Component):
|
|||
component.get('CorePluginManager').scan_for_plugins()
|
||||
|
||||
@export
|
||||
def rescan_plugins(self):
|
||||
"""
|
||||
Re-scans the plugin folders for new plugins
|
||||
"""
|
||||
def rescan_plugins(self) -> None:
|
||||
"""Re-scans the plugin folders for new plugins"""
|
||||
component.get('CorePluginManager').scan_for_plugins()
|
||||
|
||||
@export
|
||||
def rename_files(self, torrent_id, filenames):
|
||||
"""
|
||||
Rename files in ``torrent_id``. Since this is an asynchronous operation by
|
||||
def rename_files(
|
||||
self, torrent_id: str, filenames: List[Tuple[int, str]]
|
||||
) -> defer.Deferred:
|
||||
"""Rename files in ``torrent_id``. Since this is an asynchronous operation by
|
||||
libtorrent, watch for the TorrentFileRenamedEvent to know when the
|
||||
files have been renamed.
|
||||
|
||||
:param torrent_id: the torrent_id to rename files
|
||||
:type torrent_id: string
|
||||
:param filenames: a list of index, filename pairs
|
||||
:type filenames: ((index, filename), ...)
|
||||
|
||||
:raises InvalidTorrentError: if torrent_id is invalid
|
||||
Args:
|
||||
torrent_id: the torrent_id to rename files
|
||||
filenames: a list of index, filename pairs
|
||||
|
||||
Raises:
|
||||
InvalidTorrentError: if torrent_id is invalid
|
||||
"""
|
||||
if torrent_id not in self.torrentmanager.torrents:
|
||||
raise InvalidTorrentError('torrent_id is not in session')
|
||||
|
@ -1104,21 +1108,20 @@ class Core(component.Component):
|
|||
return task.deferLater(reactor, 0, rename)
|
||||
|
||||
@export
|
||||
def rename_folder(self, torrent_id, folder, new_folder):
|
||||
"""
|
||||
Renames the 'folder' to 'new_folder' in 'torrent_id'. Watch for the
|
||||
def rename_folder(
|
||||
self, torrent_id: str, folder: str, new_folder: str
|
||||
) -> defer.Deferred:
|
||||
"""Renames the 'folder' to 'new_folder' in 'torrent_id'. Watch for the
|
||||
TorrentFolderRenamedEvent which is emitted when the folder has been
|
||||
renamed successfully.
|
||||
|
||||
:param torrent_id: the torrent to rename folder in
|
||||
:type torrent_id: string
|
||||
:param folder: the folder to rename
|
||||
:type folder: string
|
||||
:param new_folder: the new folder name
|
||||
:type new_folder: string
|
||||
|
||||
:raises InvalidTorrentError: if the torrent_id is invalid
|
||||
Args:
|
||||
torrent_id: the torrent to rename folder in
|
||||
folder: the folder to rename
|
||||
new_folder: the new folder name
|
||||
|
||||
Raises:
|
||||
InvalidTorrentError: if the torrent_id is invalid
|
||||
"""
|
||||
if torrent_id not in self.torrentmanager.torrents:
|
||||
raise InvalidTorrentError('torrent_id is not in session')
|
||||
|
@ -1126,7 +1129,7 @@ class Core(component.Component):
|
|||
return self.torrentmanager[torrent_id].rename_folder(folder, new_folder)
|
||||
|
||||
@export
|
||||
def queue_top(self, torrent_ids):
|
||||
def queue_top(self, torrent_ids: List[str]) -> None:
|
||||
log.debug('Attempting to queue %s to top', torrent_ids)
|
||||
# torrent_ids must be sorted in reverse before moving to preserve order
|
||||
for torrent_id in sorted(
|
||||
|
@ -1140,7 +1143,7 @@ class Core(component.Component):
|
|||
log.warning('torrent_id: %s does not exist in the queue', torrent_id)
|
||||
|
||||
@export
|
||||
def queue_up(self, torrent_ids):
|
||||
def queue_up(self, torrent_ids: List[str]) -> None:
|
||||
log.debug('Attempting to queue %s to up', torrent_ids)
|
||||
torrents = (
|
||||
(self.torrentmanager.get_queue_position(torrent_id), torrent_id)
|
||||
|
@ -1165,7 +1168,7 @@ class Core(component.Component):
|
|||
prev_queue_position = queue_position
|
||||
|
||||
@export
|
||||
def queue_down(self, torrent_ids):
|
||||
def queue_down(self, torrent_ids: List[str]) -> None:
|
||||
log.debug('Attempting to queue %s to down', torrent_ids)
|
||||
torrents = (
|
||||
(self.torrentmanager.get_queue_position(torrent_id), torrent_id)
|
||||
|
@ -1190,7 +1193,7 @@ class Core(component.Component):
|
|||
prev_queue_position = queue_position
|
||||
|
||||
@export
|
||||
def queue_bottom(self, torrent_ids):
|
||||
def queue_bottom(self, torrent_ids: List[str]) -> None:
|
||||
log.debug('Attempting to queue %s to bottom', torrent_ids)
|
||||
# torrent_ids must be sorted before moving to preserve order
|
||||
for torrent_id in sorted(
|
||||
|
@ -1204,17 +1207,15 @@ class Core(component.Component):
|
|||
log.warning('torrent_id: %s does not exist in the queue', torrent_id)
|
||||
|
||||
@export
|
||||
def glob(self, path):
|
||||
def glob(self, path: str) -> List[str]:
|
||||
return glob.glob(path)
|
||||
|
||||
@export
|
||||
def test_listen_port(self):
|
||||
"""
|
||||
Checks if the active port is open
|
||||
|
||||
:returns: True if the port is open, False if not
|
||||
:rtype: bool
|
||||
def test_listen_port(self) -> 'defer.Deferred[Optional[bool]]':
|
||||
"""Checks if the active port is open
|
||||
|
||||
Returns:
|
||||
True if the port is open, False if not
|
||||
"""
|
||||
port = self.get_listen_port()
|
||||
url = 'https://deluge-torrent.org/test_port.php?port=%s' % port
|
||||
|
@ -1233,18 +1234,17 @@ class Core(component.Component):
|
|||
return d
|
||||
|
||||
@export
|
||||
def get_free_space(self, path=None):
|
||||
"""
|
||||
Returns the number of free bytes at path
|
||||
def get_free_space(self, path: str = None) -> int:
|
||||
"""Returns the number of free bytes at path
|
||||
|
||||
:param path: the path to check free space at, if None, use the default download location
|
||||
:type path: string
|
||||
Args:
|
||||
path: the path to check free space at, if None, use the default download location
|
||||
|
||||
:returns: the number of free bytes at path
|
||||
:rtype: int
|
||||
|
||||
:raises InvalidPathError: if the path is invalid
|
||||
Returns:
|
||||
the number of free bytes at path
|
||||
|
||||
Raises:
|
||||
InvalidPathError: if the path is invalid
|
||||
"""
|
||||
if not path:
|
||||
path = self.config['download_location']
|
||||
|
@ -1257,46 +1257,40 @@ class Core(component.Component):
|
|||
self.external_ip = external_ip
|
||||
|
||||
@export
|
||||
def get_external_ip(self):
|
||||
"""
|
||||
Returns the external IP address received from libtorrent.
|
||||
"""
|
||||
def get_external_ip(self) -> str:
|
||||
"""Returns the external IP address received from libtorrent."""
|
||||
return self.external_ip
|
||||
|
||||
@export
|
||||
def get_libtorrent_version(self):
|
||||
"""
|
||||
Returns the libtorrent version.
|
||||
|
||||
:returns: the version
|
||||
:rtype: string
|
||||
def get_libtorrent_version(self) -> str:
|
||||
"""Returns the libtorrent version.
|
||||
|
||||
Returns:
|
||||
the version
|
||||
"""
|
||||
return LT_VERSION
|
||||
|
||||
@export
|
||||
def get_completion_paths(self, args):
|
||||
"""
|
||||
Returns the available path completions for the input value.
|
||||
"""
|
||||
def get_completion_paths(self, args: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Returns the available path completions for the input value."""
|
||||
return path_chooser_common.get_completion_paths(args)
|
||||
|
||||
@export(AUTH_LEVEL_ADMIN)
|
||||
def get_known_accounts(self):
|
||||
def get_known_accounts(self) -> List[Dict[str, Any]]:
|
||||
return self.authmanager.get_known_accounts()
|
||||
|
||||
@export(AUTH_LEVEL_NONE)
|
||||
def get_auth_levels_mappings(self):
|
||||
def get_auth_levels_mappings(self) -> Tuple[Dict[str, int], Dict[int, str]]:
|
||||
return (AUTH_LEVELS_MAPPING, AUTH_LEVELS_MAPPING_REVERSE)
|
||||
|
||||
@export(AUTH_LEVEL_ADMIN)
|
||||
def create_account(self, username, password, authlevel):
|
||||
def create_account(self, username: str, password: str, authlevel: str) -> bool:
|
||||
return self.authmanager.create_account(username, password, authlevel)
|
||||
|
||||
@export(AUTH_LEVEL_ADMIN)
|
||||
def update_account(self, username, password, authlevel):
|
||||
def update_account(self, username: str, password: str, authlevel: str) -> bool:
|
||||
return self.authmanager.update_account(username, password, authlevel)
|
||||
|
||||
@export(AUTH_LEVEL_ADMIN)
|
||||
def remove_account(self, username):
|
||||
def remove_account(self, username: str) -> bool:
|
||||
return self.authmanager.remove_account(username)
|
||||
|
|
|
@ -40,7 +40,7 @@ def get_completion_paths(args):
|
|||
:param args: options
|
||||
:type args: dict
|
||||
:returns: the args argument containing the available completions for the completion_text
|
||||
:rtype: list
|
||||
:rtype: dict
|
||||
|
||||
"""
|
||||
args['paths'] = []
|
||||
|
|
|
@ -2,3 +2,4 @@ sphinx==4.*
|
|||
myst-parser
|
||||
sphinx_rtd_theme
|
||||
sphinxcontrib-spelling==7.3.0
|
||||
sphinx-autodoc-typehints
|
||||
|
|
|
@ -48,6 +48,7 @@ extensions = [
|
|||
'sphinx.ext.coverage',
|
||||
'sphinxcontrib.spelling',
|
||||
'myst_parser',
|
||||
'sphinx_autodoc_typehints',
|
||||
]
|
||||
|
||||
napoleon_include_init_with_doc = True
|
||||
|
|
Loading…
Reference in New Issue