From 9e303b58a09c4c5cc9fb5484838404f9ccb58d71 Mon Sep 17 00:00:00 2001 From: Calum Lind Date: Tue, 8 Nov 2016 18:00:07 +0000 Subject: [PATCH] Refactor common magnet funcs --- deluge/common.py | 131 ++++++++++++++++++++++++----------------------- 1 file changed, 68 insertions(+), 63 deletions(-) diff --git a/deluge/common.py b/deluge/common.py index 0980a6dd6..5d9b879d6 100644 --- a/deluge/common.py +++ b/deluge/common.py @@ -577,6 +577,12 @@ def is_infohash(infohash): return len(infohash) == 40 and infohash.isalnum() +MAGNET_SCHEME = 'magnet:?' +XT_BTIH_PARAM = 'xt=urn:btih:' +DN_PARAM = 'dn=' +TR_PARAM = 'tr=' + + def is_magnet(uri): """ A check to determine if a uri is a valid bittorrent magnet uri @@ -592,73 +598,72 @@ def is_magnet(uri): True """ - magnet_scheme = 'magnet:?' - xt_param = 'xt=urn:btih:' - if uri.startswith(magnet_scheme) and xt_param in uri: + + if uri.startswith(MAGNET_SCHEME) and XT_BTIH_PARAM in uri: return True return False def get_magnet_info(uri): + """Parse torrent information from magnet link. + + Args: + uri (str): The magnet link. + + Returns: + dict: Information about the magnet link. + + Format of the magnet dict:: + + { + "name": the torrent name, + "info_hash": the torrents info_hash, + "files_tree": empty value for magnet links + } + """ - Return information about a magnet link. - :param uri: the magnet link - :type uri: string + tr0_param = 'tr.' + tr0_param_regex = re.compile('^tr.(\d+)=(\S+)') + if not uri.startswith(MAGNET_SCHEME): + return {} - :returns: information about the magnet link: - - :: - - { - "name": the torrent name, - "info_hash": the torrents info_hash, - "files_tree": empty value for magnet links - } - - :rtype: dictionary - """ - magnet_scheme = 'magnet:?' - xt_param = 'xt=urn:btih:' - dn_param = 'dn=' - tr_param = 'tr=' - tr0_param = re.compile('^tr.(\d+)=(\S+)') - if uri.startswith(magnet_scheme): - name = None - info_hash = None - trackers = {} - tier = 0 - for param in uri[len(magnet_scheme):].split('&'): - if param.startswith(xt_param): - xt_hash = param[len(xt_param):] - if len(xt_hash) == 32: - try: - info_hash = base64.b32decode(xt_hash.upper()).encode('hex') - except TypeError as ex: - log.debug('Invalid base32 magnet hash: %s, %s', xt_hash, ex) - break - elif is_infohash(xt_hash): - info_hash = xt_hash.lower() - else: - break - elif param.startswith(dn_param): - name = unquote_plus(param[len(dn_param):]) - elif param.startswith(tr_param): - tracker = unquote_plus(param[len(tr_param):]) - trackers[tracker] = tier - tier += 1 - elif param.startswith('tr.'): + name = None + info_hash = None + trackers = {} + tier = 0 + for param in uri[len(MAGNET_SCHEME):].split('&'): + if param.startswith(XT_BTIH_PARAM): + xt_hash = param[len(XT_BTIH_PARAM):] + if len(xt_hash) == 32: try: - tier, tracker = re.match(tr0_param, param).groups() - trackers[tracker] = tier - except AttributeError: - pass + info_hash = base64.b32decode(xt_hash.upper()).encode('hex') + except TypeError as ex: + log.debug('Invalid base32 magnet hash: %s, %s', xt_hash, ex) + break + elif is_infohash(xt_hash): + info_hash = xt_hash.lower() + else: + break + elif param.startswith(DN_PARAM): + name = unquote_plus(param[len(DN_PARAM):]) + elif param.startswith(TR_PARAM): + tracker = unquote_plus(param[len(TR_PARAM):]) + trackers[tracker] = tier + tier += 1 + elif param.startswith(tr0_param): + try: + tier, tracker = re.match(tr0_param_regex, param).groups() + trackers[tracker] = tier + except AttributeError: + pass - if info_hash: - if not name: - name = info_hash - return {'name': name, 'info_hash': info_hash, 'files_tree': '', 'trackers': trackers} - return False + if info_hash: + if not name: + name = info_hash + return {'name': name, 'info_hash': info_hash, 'files_tree': '', 'trackers': trackers} + else: + return {} def create_magnet_uri(infohash, name=None, trackers=None): @@ -667,25 +672,25 @@ def create_magnet_uri(infohash, name=None, trackers=None): Args: infohash (str): The info-hash of the torrent. name (str, optional): The name of the torrent. - trackers (dict, optional): The trackers to announce to. + trackers (list or dict, optional): A list of trackers or dict or {tracker: tier} pairs. Returns: str: A magnet uri string. """ - uri = 'magnet:?xt=urn:btih:' + base64.b32encode(infohash.decode('hex')) + uri = [MAGNET_SCHEME, XT_BTIH_PARAM, base64.b32encode(infohash.decode('hex'))] if name: - uri = uri + '&dn=' + name + uri.extend(['&', DN_PARAM, name]) if trackers: try: for tracker in sorted(trackers, key=trackers.__getitem__): - uri = ''.join([uri, '&tr.%d=' % trackers[tracker], tracker]) + uri.extend(['&', 'tr.%d=' % trackers[tracker], tracker]) except TypeError: for tracker in trackers: - uri = ''.join([uri, '&tr=', tracker]) + uri.extend(['&', TR_PARAM, tracker]) - return uri + return ''.join(uri) def get_path_size(path):