Refactor common magnet funcs

This commit is contained in:
Calum Lind 2016-11-08 18:00:07 +00:00
parent 112a872bc1
commit 9e303b58a0
1 changed files with 68 additions and 63 deletions

View File

@ -577,6 +577,12 @@ def is_infohash(infohash):
return len(infohash) == 40 and infohash.isalnum() return len(infohash) == 40 and infohash.isalnum()
MAGNET_SCHEME = 'magnet:?'
XT_BTIH_PARAM = 'xt=urn:btih:'
DN_PARAM = 'dn='
TR_PARAM = 'tr='
def is_magnet(uri): def is_magnet(uri):
""" """
A check to determine if a uri is a valid bittorrent magnet uri A check to determine if a uri is a valid bittorrent magnet uri
@ -592,73 +598,72 @@ def is_magnet(uri):
True True
""" """
magnet_scheme = 'magnet:?'
xt_param = 'xt=urn:btih:' if uri.startswith(MAGNET_SCHEME) and XT_BTIH_PARAM in uri:
if uri.startswith(magnet_scheme) and xt_param in uri:
return True return True
return False return False
def get_magnet_info(uri): def get_magnet_info(uri):
"""Parse torrent information from magnet link.
Args:
uri (str): The magnet link.
Returns:
dict: Information about the magnet link.
Format of the magnet dict::
{
"name": the torrent name,
"info_hash": the torrents info_hash,
"files_tree": empty value for magnet links
}
""" """
Return information about a magnet link.
:param uri: the magnet link tr0_param = 'tr.'
:type uri: string tr0_param_regex = re.compile('^tr.(\d+)=(\S+)')
if not uri.startswith(MAGNET_SCHEME):
return {}
:returns: information about the magnet link: name = None
info_hash = None
:: trackers = {}
tier = 0
{ for param in uri[len(MAGNET_SCHEME):].split('&'):
"name": the torrent name, if param.startswith(XT_BTIH_PARAM):
"info_hash": the torrents info_hash, xt_hash = param[len(XT_BTIH_PARAM):]
"files_tree": empty value for magnet links if len(xt_hash) == 32:
}
:rtype: dictionary
"""
magnet_scheme = 'magnet:?'
xt_param = 'xt=urn:btih:'
dn_param = 'dn='
tr_param = 'tr='
tr0_param = re.compile('^tr.(\d+)=(\S+)')
if uri.startswith(magnet_scheme):
name = None
info_hash = None
trackers = {}
tier = 0
for param in uri[len(magnet_scheme):].split('&'):
if param.startswith(xt_param):
xt_hash = param[len(xt_param):]
if len(xt_hash) == 32:
try:
info_hash = base64.b32decode(xt_hash.upper()).encode('hex')
except TypeError as ex:
log.debug('Invalid base32 magnet hash: %s, %s', xt_hash, ex)
break
elif is_infohash(xt_hash):
info_hash = xt_hash.lower()
else:
break
elif param.startswith(dn_param):
name = unquote_plus(param[len(dn_param):])
elif param.startswith(tr_param):
tracker = unquote_plus(param[len(tr_param):])
trackers[tracker] = tier
tier += 1
elif param.startswith('tr.'):
try: try:
tier, tracker = re.match(tr0_param, param).groups() info_hash = base64.b32decode(xt_hash.upper()).encode('hex')
trackers[tracker] = tier except TypeError as ex:
except AttributeError: log.debug('Invalid base32 magnet hash: %s, %s', xt_hash, ex)
pass break
elif is_infohash(xt_hash):
info_hash = xt_hash.lower()
else:
break
elif param.startswith(DN_PARAM):
name = unquote_plus(param[len(DN_PARAM):])
elif param.startswith(TR_PARAM):
tracker = unquote_plus(param[len(TR_PARAM):])
trackers[tracker] = tier
tier += 1
elif param.startswith(tr0_param):
try:
tier, tracker = re.match(tr0_param_regex, param).groups()
trackers[tracker] = tier
except AttributeError:
pass
if info_hash: if info_hash:
if not name: if not name:
name = info_hash name = info_hash
return {'name': name, 'info_hash': info_hash, 'files_tree': '', 'trackers': trackers} return {'name': name, 'info_hash': info_hash, 'files_tree': '', 'trackers': trackers}
return False else:
return {}
def create_magnet_uri(infohash, name=None, trackers=None): def create_magnet_uri(infohash, name=None, trackers=None):
@ -667,25 +672,25 @@ def create_magnet_uri(infohash, name=None, trackers=None):
Args: Args:
infohash (str): The info-hash of the torrent. infohash (str): The info-hash of the torrent.
name (str, optional): The name of the torrent. name (str, optional): The name of the torrent.
trackers (dict, optional): The trackers to announce to. trackers (list or dict, optional): A list of trackers or dict or {tracker: tier} pairs.
Returns: Returns:
str: A magnet uri string. str: A magnet uri string.
""" """
uri = 'magnet:?xt=urn:btih:' + base64.b32encode(infohash.decode('hex')) uri = [MAGNET_SCHEME, XT_BTIH_PARAM, base64.b32encode(infohash.decode('hex'))]
if name: if name:
uri = uri + '&dn=' + name uri.extend(['&', DN_PARAM, name])
if trackers: if trackers:
try: try:
for tracker in sorted(trackers, key=trackers.__getitem__): for tracker in sorted(trackers, key=trackers.__getitem__):
uri = ''.join([uri, '&tr.%d=' % trackers[tracker], tracker]) uri.extend(['&', 'tr.%d=' % trackers[tracker], tracker])
except TypeError: except TypeError:
for tracker in trackers: for tracker in trackers:
uri = ''.join([uri, '&tr=', tracker]) uri.extend(['&', TR_PARAM, tracker])
return uri return ''.join(uri)
def get_path_size(path): def get_path_size(path):