# # common.py # # Copyright (C) 2007, 2008 Andrew Resch # # Deluge is free software. # # You may redistribute it and/or modify it under the terms of the # GNU General Public License, as published by the Free Software # Foundation; either version 3 of the License, or (at your option) # any later version. # # deluge is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with deluge. If not, write to: # The Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor # Boston, MA 02110-1301, USA. # # In addition, as a special exception, the copyright holders give # permission to link the code of portions of this program with the OpenSSL # library. # You must obey the GNU General Public License in all respects for all of # the code used other than OpenSSL. If you modify file(s) with this # exception, you may extend this exception to your version of the file(s), # but you are not obligated to do so. If you do not wish to do so, delete # this exception statement from your version. If you delete this exception # statement from all source files in the program, then also delete it here. # # """Common functions for various parts of Deluge to use.""" import os import time import subprocess import platform import sys try: import json except ImportError: import simplejson as json # Do a little hack here just in case the user has json-py installed since it # has a different api if not hasattr(json, "dumps"): json.dumps = json.write json.loads = json.read import pkg_resources import xdg, xdg.BaseDirectory from deluge.error import * LT_TORRENT_STATE = { "Queued": 0, "Checking": 1, "Downloading Metadata": 2, "Downloading": 3, "Finished": 4, "Seeding": 5, "Allocating": 6, "Checking Resume Data": 7, 0: "Queued", 1: "Checking", 2: "Downloading Metadata", 3: "Downloading", 4: "Finished", 5: "Seeding", 6: "Allocating", 7: "Checking Resume Data" } TORRENT_STATE = [ "Allocating", "Checking", "Downloading", "Seeding", "Paused", "Error", "Queued" ] FILE_PRIORITY = { 0: "Do Not Download", 1: "Normal Priority", 2: "High Priority", 5: "Highest Priority", "Do Not Download": 0, "Normal Priority": 1, "High Priority": 2, "Highest Priority": 5 } def get_version(): """ Returns the program version from the egg metadata :returns: the version of Deluge :rtype: string """ return pkg_resources.require("Deluge")[0].version def get_default_config_dir(filename=None): """ :param filename: if None, only the config path is returned, if provided, a path including the filename will be returned :type filename: string :returns: a file path to the config directory and optional filename :rtype: string """ if windows_check(): if filename: return os.path.join(os.environ.get("APPDATA"), "deluge", filename) else: return os.path.join(os.environ.get("APPDATA"), "deluge") else: if filename: return os.path.join(xdg.BaseDirectory.save_config_path("deluge"), filename) else: return xdg.BaseDirectory.save_config_path("deluge") def get_default_download_dir(): """ :returns: the default download directory :rtype: string """ if windows_check(): return os.path.expanduser("~") else: return os.environ.get("HOME") def windows_check(): """ Checks if the current platform is Windows :returns: True or False :rtype: bool """ return platform.system() in ('Windows', 'Microsoft') def vista_check(): """ Checks if the current platform is Windows Vista :returns: True or False :rtype: bool """ return platform.release() == "Vista" def osx_check(): """ Checks if the current platform is Mac OS X :returns: True or False :rtype: bool """ return platform.system() == "Darwin" def get_pixmap(fname): """ Provides easy access to files in the deluge/data/pixmaps folder within the Deluge egg :param fname: the filename to look for :type fname: string :returns: a path to a pixmap file included with Deluge :rtype: string """ return pkg_resources.resource_filename("deluge", os.path.join("data", \ "pixmaps", fname)) def open_file(path): """ Opens a file or folder using the system configured program :param path: the path to the file or folder to open :type path: string """ if windows_check(): os.startfile("%s" % path) else: subprocess.Popen(["xdg-open", "%s" % path]) def open_url_in_browser(url): """ Opens a url in the desktop's default browser :param url: the url to open :type url: string """ import webbrowser webbrowser.open(url) ## Formatting text functions def fsize(fsize_b): """ Formats the bytes value into a string with KiB, MiB or GiB units :param fsize_b: the filesize in bytes :type fsize_b: int :returns: formatted string in KiB, MiB or GiB units :rtype: string **Usage** >>> fsize(112245) '109.6 KiB' """ fsize_kb = fsize_b / 1024.0 if fsize_kb < 1024: return "%.1f KiB" % fsize_kb fsize_mb = fsize_kb / 1024.0 if fsize_mb < 1024: return "%.1f MiB" % fsize_mb fsize_gb = fsize_mb / 1024.0 return "%.1f GiB" % fsize_gb def fpcnt(dec): """ Formats a string to display a percentage with two decimal places :param dec: the ratio in the range [0.0, 1.0] :type dec: float :returns: a formatted string representing a percentage :rtype: string **Usage** >>> fpcnt(0.9311) '93.11%' """ return '%.2f%%' % (dec * 100) def fspeed(bps): """ Formats a string to display a transfer speed utilizing :func:`fsize` :param bps: bytes per second :type bps: int :returns: a formatted string representing transfer speed :rtype: string **Usage** >>> fspeed(43134) '42.1 KiB/s' """ return '%s/s' % (fsize(bps)) def fpeer(num_peers, total_peers): """ Formats a string to show 'num_peers' ('total_peers') :param num_peers: the number of connected peers :type num_peers: int :param total_peers: the total number of peers :type total_peers: int :returns: a formatted string: num_peers (total_peers), if total_peers < 0, then it will not be shown :rtype: string **Usage** >>> fpeer(10, 20) '10 (20)' >>> fpeer(10, -1) '10' """ if total_peers > -1: return "%d (%d)" % (num_peers, total_peers) else: return "%d" % num_peers def ftime(seconds): """ Formats a string to show time in a human readable form :param seconds: the number of seconds :type seconds: int :returns: a formatted time string, will return '' if seconds == 0 :rtype: string **Usage** >>> ftime(23011) '6h 23m' """ if seconds == 0: return "" if seconds < 60: return '%ds' % (seconds) minutes = seconds / 60 if minutes < 60: seconds = seconds % 60 return '%dm %ds' % (minutes, seconds) hours = minutes / 60 if hours < 24: minutes = minutes % 60 return '%dh %dm' % (hours, minutes) days = hours / 24 if days < 7: hours = hours % 24 return '%dd %dh' % (days, hours) weeks = days / 7 if weeks < 52: days = days % 7 return '%dw %dd' % (weeks, days) years = weeks / 52 weeks = weeks % 52 return '%dy %dw' % (years, weeks) def fdate(seconds): """ Formats a date string in the locale's date representation based on the systems timezone :param seconds: time in seconds since the Epoch :type seconds: float :returns: a string in the locale's date representation or "" if seconds < 0 :rtype: string """ if seconds < 0: return "" return time.strftime("%x", time.localtime(seconds)) def is_url(url): """ A simple regex test to check if the URL is valid :param url: the url to test :type url: string :returns: True or False :rtype: bool **Usage** >>> is_url("http://deluge-torrent.org") True """ import re return bool(re.search('^(https?|ftp|udp)://', url)) def is_magnet(uri): """ A check to determine if a uri is a valid bittorrent magnet uri :param uri: the uri to check :type uri: string :returns: True or False :rtype: bool **Usage** >>> is_magnet("magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN") True """ if uri[:20] == "magnet:?xt=urn:btih:": return True return False def fetch_url(url): """ Downloads a torrent file from a given URL and checks the file's validity :param url: the url of the .torrent file to fetch :type url: string :returns: the filepath to the downloaded file :rtype: string """ import urllib from deluge.log import LOG as log try: filename, headers = urllib.urlretrieve(url) except IOError: log.debug("Network error while trying to fetch torrent from %s", url) else: if filename.endswith(".torrent") or headers["content-type"] ==\ "application/x-bittorrent": return filename else: log.debug("URL doesn't appear to be a valid torrent file: %s", url) return None def create_magnet_uri(infohash, name=None, trackers=[]): """ Creates a magnet uri :param infohash: the info-hash of the torrent :type infohash: string :param name: the name of the torrent (optional) :type name: string :param trackers: the trackers to announce to (optional) :type trackers: list of strings :returns: a magnet uri string :rtype: string """ from base64 import b32encode uri = "magnet:?xt=urn:btih:" + b32encode(infohash.decode("hex")) if name: uri = uri + "&dn=" + name if trackers: for t in trackers: uri = uri + "&tr=" + t return uri def get_path_size(path): """ Gets the size in bytes of 'path' :param path: the path to check for size :type path: string :returns: the size in bytes of the path or -1 if the path does not exist :rtype: int """ if not os.path.exists(path): return -1 if os.path.isfile(path): return os.path.getsize(path) dir_size = 0 for (p, dirs, files) in os.walk(path): for file in files: filename = os.path.join(p, file) dir_size += os.path.getsize(filename) return dir_size def free_space(path): """ Gets the free space available at 'path' :param path: the path to check :type path: string :returns: the free space at path in bytes :rtype: int :raises InvalidPathError: if the path is not valid """ if not os.path.exists(path): raise InvalidPathError("%s is not a valid path" % path) if windows_check(): import win32file sectors, bytes, free, total = map(long, win32file.GetDiskFreeSpace(path)) return (free * sectors * bytes) else: disk_data = os.statvfs(path) block_size = disk_data.f_bsize return disk_data.f_bavail * block_size def is_ip(ip): """ A simple test to see if 'ip' is valid :param ip: the ip to check :type ip: string :returns: True or False :rtype: bool ** Usage ** >>> is_ip("127.0.0.1") True """ import socket #first we test ipv4 try: if socket.inet_pton(socket.AF_INET, "%s" % (ip)): return True except socket.error: if not socket.has_ipv6: return False #now test ipv6 try: if socket.inet_pton(socket.AF_INET6, "%s" % (ip)): return True except socket.error: return False