561 lines
13 KiB
Python
561 lines
13 KiB
Python
#
|
|
# common.py
|
|
#
|
|
# Copyright (C) 2007, 2008 Andrew Resch <andrewresch@gmail.com>
|
|
#
|
|
# Deluge is free software.
|
|
#
|
|
# You may redistribute it and/or modify it under the terms of the
|
|
# GNU General Public License, as published by the Free Software
|
|
# Foundation; either version 3 of the License, or (at your option)
|
|
# any later version.
|
|
#
|
|
# deluge is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
|
# See the GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with deluge. If not, write to:
|
|
# The Free Software Foundation, Inc.,
|
|
# 51 Franklin Street, Fifth Floor
|
|
# Boston, MA 02110-1301, USA.
|
|
#
|
|
# In addition, as a special exception, the copyright holders give
|
|
# permission to link the code of portions of this program with the OpenSSL
|
|
# library.
|
|
# You must obey the GNU General Public License in all respects for all of
|
|
# the code used other than OpenSSL. If you modify file(s) with this
|
|
# exception, you may extend this exception to your version of the file(s),
|
|
# but you are not obligated to do so. If you do not wish to do so, delete
|
|
# this exception statement from your version. If you delete this exception
|
|
# statement from all source files in the program, then also delete it here.
|
|
#
|
|
#
|
|
|
|
|
|
"""Common functions for various parts of Deluge to use."""
|
|
|
|
import os
|
|
import time
|
|
import subprocess
|
|
import platform
|
|
import sys
|
|
|
|
try:
|
|
import json
|
|
except ImportError:
|
|
import simplejson as json
|
|
|
|
# Do a little hack here just in case the user has json-py installed since it
|
|
# has a different api
|
|
if not hasattr(json, "dumps"):
|
|
json.dumps = json.write
|
|
json.loads = json.read
|
|
|
|
def dump(obj, fp, **kw):
|
|
fp.write(json.dumps(obj))
|
|
|
|
def load(fp, **kw):
|
|
return json.loads(fp.read())
|
|
|
|
json.dump = dump
|
|
json.load = load
|
|
|
|
import pkg_resources
|
|
import xdg, xdg.BaseDirectory
|
|
|
|
from deluge.error import *
|
|
|
|
LT_TORRENT_STATE = {
|
|
"Queued": 0,
|
|
"Checking": 1,
|
|
"Downloading Metadata": 2,
|
|
"Downloading": 3,
|
|
"Finished": 4,
|
|
"Seeding": 5,
|
|
"Allocating": 6,
|
|
"Checking Resume Data": 7,
|
|
0: "Queued",
|
|
1: "Checking",
|
|
2: "Downloading Metadata",
|
|
3: "Downloading",
|
|
4: "Finished",
|
|
5: "Seeding",
|
|
6: "Allocating",
|
|
7: "Checking Resume Data"
|
|
}
|
|
|
|
|
|
TORRENT_STATE = [
|
|
"Allocating",
|
|
"Checking",
|
|
"Downloading",
|
|
"Seeding",
|
|
"Paused",
|
|
"Error",
|
|
"Queued"
|
|
]
|
|
|
|
FILE_PRIORITY = {
|
|
0: "Do Not Download",
|
|
1: "Normal Priority",
|
|
2: "High Priority",
|
|
5: "Highest Priority",
|
|
"Do Not Download": 0,
|
|
"Normal Priority": 1,
|
|
"High Priority": 2,
|
|
"Highest Priority": 5
|
|
}
|
|
|
|
def get_version():
|
|
"""
|
|
Returns the program version from the egg metadata
|
|
|
|
:returns: the version of Deluge
|
|
:rtype: string
|
|
|
|
"""
|
|
return pkg_resources.require("Deluge")[0].version
|
|
|
|
def get_default_config_dir(filename=None):
|
|
"""
|
|
:param filename: if None, only the config path is returned, if provided, a path including the filename will be returned
|
|
:type filename: string
|
|
:returns: a file path to the config directory and optional filename
|
|
:rtype: string
|
|
|
|
"""
|
|
if windows_check():
|
|
if filename:
|
|
return os.path.join(os.environ.get("APPDATA"), "deluge", filename)
|
|
else:
|
|
return os.path.join(os.environ.get("APPDATA"), "deluge")
|
|
else:
|
|
if filename:
|
|
return os.path.join(xdg.BaseDirectory.save_config_path("deluge"), filename)
|
|
else:
|
|
return xdg.BaseDirectory.save_config_path("deluge")
|
|
|
|
def get_default_download_dir():
|
|
"""
|
|
:returns: the default download directory
|
|
:rtype: string
|
|
|
|
"""
|
|
if windows_check():
|
|
return os.path.expanduser("~")
|
|
else:
|
|
return os.environ.get("HOME")
|
|
|
|
def windows_check():
|
|
"""
|
|
Checks if the current platform is Windows
|
|
|
|
:returns: True or False
|
|
:rtype: bool
|
|
|
|
"""
|
|
return platform.system() in ('Windows', 'Microsoft')
|
|
|
|
def vista_check():
|
|
"""
|
|
Checks if the current platform is Windows Vista
|
|
|
|
:returns: True or False
|
|
:rtype: bool
|
|
|
|
"""
|
|
return platform.release() == "Vista"
|
|
|
|
def osx_check():
|
|
"""
|
|
Checks if the current platform is Mac OS X
|
|
|
|
:returns: True or False
|
|
:rtype: bool
|
|
|
|
"""
|
|
return platform.system() == "Darwin"
|
|
|
|
def get_pixmap(fname):
|
|
"""
|
|
Provides easy access to files in the deluge/data/pixmaps folder within the Deluge egg
|
|
|
|
:param fname: the filename to look for
|
|
:type fname: string
|
|
:returns: a path to a pixmap file included with Deluge
|
|
:rtype: string
|
|
|
|
"""
|
|
return pkg_resources.resource_filename("deluge", os.path.join("data", \
|
|
"pixmaps", fname))
|
|
|
|
def open_file(path):
|
|
"""
|
|
Opens a file or folder using the system configured program
|
|
|
|
:param path: the path to the file or folder to open
|
|
:type path: string
|
|
|
|
"""
|
|
if windows_check():
|
|
os.startfile("%s" % path)
|
|
else:
|
|
subprocess.Popen(["xdg-open", "%s" % path])
|
|
|
|
def open_url_in_browser(url):
|
|
"""
|
|
Opens a url in the desktop's default browser
|
|
|
|
:param url: the url to open
|
|
:type url: string
|
|
|
|
"""
|
|
import webbrowser
|
|
webbrowser.open(url)
|
|
|
|
## Formatting text functions
|
|
|
|
def fsize(fsize_b):
|
|
"""
|
|
Formats the bytes value into a string with KiB, MiB or GiB units
|
|
|
|
:param fsize_b: the filesize in bytes
|
|
:type fsize_b: int
|
|
:returns: formatted string in KiB, MiB or GiB units
|
|
:rtype: string
|
|
|
|
**Usage**
|
|
|
|
>>> fsize(112245)
|
|
'109.6 KiB'
|
|
|
|
"""
|
|
fsize_kb = fsize_b / 1024.0
|
|
if fsize_kb < 1024:
|
|
return "%.1f KiB" % fsize_kb
|
|
fsize_mb = fsize_kb / 1024.0
|
|
if fsize_mb < 1024:
|
|
return "%.1f MiB" % fsize_mb
|
|
fsize_gb = fsize_mb / 1024.0
|
|
return "%.1f GiB" % fsize_gb
|
|
|
|
def fpcnt(dec):
|
|
"""
|
|
Formats a string to display a percentage with two decimal places
|
|
|
|
:param dec: the ratio in the range [0.0, 1.0]
|
|
:type dec: float
|
|
:returns: a formatted string representing a percentage
|
|
:rtype: string
|
|
|
|
**Usage**
|
|
|
|
>>> fpcnt(0.9311)
|
|
'93.11%'
|
|
|
|
"""
|
|
return '%.2f%%' % (dec * 100)
|
|
|
|
def fspeed(bps):
|
|
"""
|
|
Formats a string to display a transfer speed utilizing :func:`fsize`
|
|
|
|
:param bps: bytes per second
|
|
:type bps: int
|
|
:returns: a formatted string representing transfer speed
|
|
:rtype: string
|
|
|
|
**Usage**
|
|
|
|
>>> fspeed(43134)
|
|
'42.1 KiB/s'
|
|
|
|
"""
|
|
return '%s/s' % (fsize(bps))
|
|
|
|
def fpeer(num_peers, total_peers):
|
|
"""
|
|
Formats a string to show 'num_peers' ('total_peers')
|
|
|
|
:param num_peers: the number of connected peers
|
|
:type num_peers: int
|
|
:param total_peers: the total number of peers
|
|
:type total_peers: int
|
|
:returns: a formatted string: num_peers (total_peers), if total_peers < 0, then it will not be shown
|
|
:rtype: string
|
|
|
|
**Usage**
|
|
|
|
>>> fpeer(10, 20)
|
|
'10 (20)'
|
|
>>> fpeer(10, -1)
|
|
'10'
|
|
|
|
"""
|
|
if total_peers > -1:
|
|
return "%d (%d)" % (num_peers, total_peers)
|
|
else:
|
|
return "%d" % num_peers
|
|
|
|
def ftime(seconds):
|
|
"""
|
|
Formats a string to show time in a human readable form
|
|
|
|
:param seconds: the number of seconds
|
|
:type seconds: int
|
|
:returns: a formatted time string, will return '' if seconds == 0
|
|
:rtype: string
|
|
|
|
**Usage**
|
|
|
|
>>> ftime(23011)
|
|
'6h 23m'
|
|
|
|
"""
|
|
if seconds == 0:
|
|
return ""
|
|
if seconds < 60:
|
|
return '%ds' % (seconds)
|
|
minutes = seconds / 60
|
|
if minutes < 60:
|
|
seconds = seconds % 60
|
|
return '%dm %ds' % (minutes, seconds)
|
|
hours = minutes / 60
|
|
if hours < 24:
|
|
minutes = minutes % 60
|
|
return '%dh %dm' % (hours, minutes)
|
|
days = hours / 24
|
|
if days < 7:
|
|
hours = hours % 24
|
|
return '%dd %dh' % (days, hours)
|
|
weeks = days / 7
|
|
if weeks < 52:
|
|
days = days % 7
|
|
return '%dw %dd' % (weeks, days)
|
|
years = weeks / 52
|
|
weeks = weeks % 52
|
|
return '%dy %dw' % (years, weeks)
|
|
|
|
def fdate(seconds):
|
|
"""
|
|
Formats a date string in the locale's date representation based on the systems timezone
|
|
|
|
:param seconds: time in seconds since the Epoch
|
|
:type seconds: float
|
|
:returns: a string in the locale's date representation or "" if seconds < 0
|
|
:rtype: string
|
|
|
|
"""
|
|
if seconds < 0:
|
|
return ""
|
|
return time.strftime("%x", time.localtime(seconds))
|
|
|
|
def is_url(url):
|
|
"""
|
|
A simple regex test to check if the URL is valid
|
|
|
|
:param url: the url to test
|
|
:type url: string
|
|
:returns: True or False
|
|
:rtype: bool
|
|
|
|
**Usage**
|
|
|
|
>>> is_url("http://deluge-torrent.org")
|
|
True
|
|
|
|
"""
|
|
import re
|
|
return bool(re.search('^(https?|ftp|udp)://', url))
|
|
|
|
def is_magnet(uri):
|
|
"""
|
|
A check to determine if a uri is a valid bittorrent magnet uri
|
|
|
|
:param uri: the uri to check
|
|
:type uri: string
|
|
:returns: True or False
|
|
:rtype: bool
|
|
|
|
**Usage**
|
|
|
|
>>> is_magnet("magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN")
|
|
True
|
|
|
|
"""
|
|
if uri[:20] == "magnet:?xt=urn:btih:":
|
|
return True
|
|
return False
|
|
|
|
def fetch_url(url):
|
|
"""
|
|
Downloads a torrent file from a given URL and checks the file's validity
|
|
|
|
:param url: the url of the .torrent file to fetch
|
|
:type url: string
|
|
:returns: the filepath to the downloaded file
|
|
:rtype: string
|
|
|
|
"""
|
|
import urllib
|
|
from deluge.log import LOG as log
|
|
try:
|
|
filename, headers = urllib.urlretrieve(url)
|
|
except IOError:
|
|
log.debug("Network error while trying to fetch torrent from %s", url)
|
|
else:
|
|
if filename.endswith(".torrent") or headers["content-type"] ==\
|
|
"application/x-bittorrent":
|
|
return filename
|
|
else:
|
|
log.debug("URL doesn't appear to be a valid torrent file: %s", url)
|
|
return None
|
|
|
|
def create_magnet_uri(infohash, name=None, trackers=[]):
|
|
"""
|
|
Creates a magnet uri
|
|
|
|
:param infohash: the info-hash of the torrent
|
|
:type infohash: string
|
|
:param name: the name of the torrent (optional)
|
|
:type name: string
|
|
:param trackers: the trackers to announce to (optional)
|
|
:type trackers: list of strings
|
|
|
|
:returns: a magnet uri string
|
|
:rtype: string
|
|
|
|
"""
|
|
from base64 import b32encode
|
|
uri = "magnet:?xt=urn:btih:" + b32encode(infohash.decode("hex"))
|
|
if name:
|
|
uri = uri + "&dn=" + name
|
|
if trackers:
|
|
for t in trackers:
|
|
uri = uri + "&tr=" + t
|
|
|
|
return uri
|
|
|
|
def get_path_size(path):
|
|
"""
|
|
Gets the size in bytes of 'path'
|
|
|
|
:param path: the path to check for size
|
|
:type path: string
|
|
:returns: the size in bytes of the path or -1 if the path does not exist
|
|
:rtype: int
|
|
|
|
"""
|
|
if not os.path.exists(path):
|
|
return -1
|
|
|
|
if os.path.isfile(path):
|
|
return os.path.getsize(path)
|
|
|
|
dir_size = 0
|
|
for (p, dirs, files) in os.walk(path):
|
|
for file in files:
|
|
filename = os.path.join(p, file)
|
|
dir_size += os.path.getsize(filename)
|
|
return dir_size
|
|
|
|
def free_space(path):
|
|
"""
|
|
Gets the free space available at 'path'
|
|
|
|
:param path: the path to check
|
|
:type path: string
|
|
:returns: the free space at path in bytes
|
|
:rtype: int
|
|
|
|
:raises InvalidPathError: if the path is not valid
|
|
|
|
"""
|
|
if not os.path.exists(path):
|
|
raise InvalidPathError("%s is not a valid path" % path)
|
|
|
|
if windows_check():
|
|
import win32file
|
|
sectors, bytes, free, total = map(long, win32file.GetDiskFreeSpace(path))
|
|
return (free * sectors * bytes)
|
|
else:
|
|
disk_data = os.statvfs(path)
|
|
block_size = disk_data.f_bsize
|
|
return disk_data.f_bavail * block_size
|
|
|
|
def is_ip(ip):
|
|
"""
|
|
A simple test to see if 'ip' is valid
|
|
|
|
:param ip: the ip to check
|
|
:type ip: string
|
|
:returns: True or False
|
|
:rtype: bool
|
|
|
|
** Usage **
|
|
|
|
>>> is_ip("127.0.0.1")
|
|
True
|
|
|
|
"""
|
|
import socket
|
|
#first we test ipv4
|
|
try:
|
|
if socket.inet_pton(socket.AF_INET, "%s" % (ip)):
|
|
return True
|
|
except socket.error:
|
|
if not socket.has_ipv6:
|
|
return False
|
|
#now test ipv6
|
|
try:
|
|
if socket.inet_pton(socket.AF_INET6, "%s" % (ip)):
|
|
return True
|
|
except socket.error:
|
|
return False
|
|
|
|
class VersionSplit(object):
|
|
"""
|
|
Used for comparing version numbers.
|
|
|
|
:param ver: the version
|
|
:type ver: string
|
|
|
|
"""
|
|
def __init__(self, ver):
|
|
ver = ver.lower()
|
|
vs = ver.split("_") if "_" in ver else ver.split("-")
|
|
self.version = vs[0]
|
|
self.suffix = None
|
|
if len(vs) > 1:
|
|
for s in ("rc", "alpha", "beta", "dev"):
|
|
if s in vs[1][:len(s)]:
|
|
self.suffix = vs[1]
|
|
|
|
def __cmp__(self, ver):
|
|
"""
|
|
The comparison method.
|
|
|
|
:param ver: the version to compare with
|
|
:type ver: VersionSplit
|
|
|
|
"""
|
|
|
|
if self.version > ver.version or (self.suffix and self.suffix[:3] == "dev"):
|
|
return 1
|
|
if self.version < ver.version:
|
|
return -1
|
|
|
|
if self.version == ver.version:
|
|
if self.suffix == ver.suffix:
|
|
return 0
|
|
if self.suffix is None:
|
|
return 1
|
|
if ver.suffix is None:
|
|
return -1
|
|
if self.suffix < ver.suffix:
|
|
return -1
|
|
if self.suffix > ver.suffix:
|
|
return 1
|