diff --git a/deluge/httpdownloader.py b/deluge/httpdownloader.py index 9dd5dbbba..6fc004ea3 100644 --- a/deluge/httpdownloader.py +++ b/deluge/httpdownloader.py @@ -36,13 +36,15 @@ from twisted.web import client, http from twisted.web.error import PageRedirect from twisted.python.failure import Failure from twisted.internet import reactor +from deluge.log import setupLogger, LOG as log from common import get_version +import os.path class HTTPDownloader(client.HTTPDownloader): """ Factory class for downloading files and keeping track of progress. """ - def __init__(self, url, filename, part_callback=None, headers=None): + def __init__(self, url, filename, part_callback=None, headers=None, force_filename=False): """ :param url: the url to download from :type url: string @@ -57,6 +59,7 @@ class HTTPDownloader(client.HTTPDownloader): self.__part_callback = part_callback self.current_length = 0 self.value = filename + self.force_filename = force_filename agent = "Deluge/%s (http://deluge-torrent.org)" % get_version() client.HTTPDownloader.__init__(self, url, filename, headers=headers, agent=agent) @@ -70,6 +73,18 @@ class HTTPDownloader(client.HTTPDownloader): self.total_length = int(headers["content-length"][0]) else: self.total_length = 0 + + if "content-disposition" in headers and not self.force_filename: + try: + new_file_name = str(headers["content-disposition"][0]).split(";")[1].split("=")[1] + new_file_name = sanitise_filename(new_file_name) + new_file_name = os.path.join(os.path.split(self.fileName)[0], new_file_name) + except Exception, e: + log.exception(e) + else: + self.fileName = new_file_name + self.value = new_file_name + elif self.code in (http.TEMPORARY_REDIRECT, http.MOVED_PERMANENTLY): location = headers["location"][0] error = PageRedirect(self.code, location=location) @@ -85,7 +100,39 @@ class HTTPDownloader(client.HTTPDownloader): return client.HTTPDownloader.pagePart(self, data) -def download_file(url, filename, callback=None, headers=None): +def sanitise_filename(filename): + """ + Sanitises a filename to use as a download destination file. + Logs any filenames that could be considered malicious. + + :param filename: the filename to sanitise + :type filename: string + :returns: the sanitised filename + :rtype: string + + :raises IOError: when the filename exists + """ + + # Remove any quotes + filename = filename.strip("'\"") + + if os.path.basename(filename) != filename: + # Dodgy server, log it + log.warning("Potentially malicious server: trying to write to file '%s'" % filename) + # Only use the basename + filename = os.path.basename(filename) + + filename = filename.strip() + if filename.startswith(".") or ";" in filename or "|" in filename: + # Dodgy server, log it + log.warning("Potentially malicious server: trying to write to file '%s'" % filename) + + if os.path.exists(filename): + raise IOError, "File '%s' already exists!" % filename + + return filename + +def download_file(url, filename, callback=None, headers=None, force_filename=False): """ Downloads a file from a specific URL and returns a Deferred. You can also specify a callback function to be called as parts are received. @@ -99,6 +146,9 @@ def download_file(url, filename, callback=None, headers=None): :type callback: function :param headers: any optional headers to send :type headers: dictionary + :param force_filename: force us to use the filename specified rather than + one the server may suggest + :type force_filename: boolean :returns: the filename of the downloaded file :rtype: Deferred @@ -114,7 +164,7 @@ def download_file(url, filename, callback=None, headers=None): headers[str(key)] = str(value) scheme, host, port, path = client._parse(url) - factory = HTTPDownloader(url, filename, callback, headers) + factory = HTTPDownloader(url, filename, callback, headers, force_filename) if scheme == "https": from twisted.internet import ssl reactor.connectSSL(host, port, factory, ssl.ClientContextFactory()) diff --git a/tests/test_httpdownloader.py b/tests/test_httpdownloader.py index e2dda2fd9..0659bd091 100644 --- a/tests/test_httpdownloader.py +++ b/tests/test_httpdownloader.py @@ -2,24 +2,80 @@ from twisted.trial import unittest from twisted.python.failure import Failure from deluge.httpdownloader import download_file +from deluge.log import setupLogger class DownloadFileTestCase(unittest.TestCase): + def setUp(self): + setupLogger("warning", "log_file") + + def tearDown(self): + pass + + def assertContains(self, filename, contents): + with open(filename) as f: + self.assertEqual(f.read(), contents) + return filename + def test_download(self): d = download_file("http://deluge-torrent.org", "index.html") d.addCallback(self.assertEqual, "index.html") return d - def test_download_with_cookies(self): - pass + def test_download_without_required_cookies(self): + url = "http://deluge-torrent.org/httpdownloader.php?test=cookie" + d = download_file(url, "none") + d.addCallback(self.fail) + d.addErrback(self.assertIsInstance, Failure) + return d - def test_page_moved(self): - pass + def test_download_with_required_cookies(self): + url = "http://deluge-torrent.org/httpdownloader.php?test=cookie" + cookie = { "cookie" : "password=deluge" } + d = download_file(url, "monster", headers=cookie) + d.addCallback(self.assertEqual, "monster") + d.addCallback(self.assertContains, "COOKIE MONSTER!") + return d - def test_page_moved_permanently(self): - pass + def test_download_with_rename(self): + url = "http://deluge-torrent.org/httpdownloader.php?test=rename&filename=renamed" + d = download_file(url, "original") + d.addCallback(self.assertEqual, "renamed") + d.addCallback(self.assertContains, "This file should be called renamed") + return d - def test_page_not_modified(self): - pass + def test_download_with_rename_fail(self): + url = "http://deluge-torrent.org/httpdownloader.php?test=rename&filename=renamed" + d = download_file(url, "original") + d.addCallback(self.assertEqual, "original") + d.addCallback(self.assertContains, "This file should be called renamed") + return d + + def test_download_with_rename_sanitised(self): + url = "http://deluge-torrent.org/httpdownloader.php?test=rename&filename=/etc/passwd" + d = download_file(url, "original") + d.addCallback(self.assertEqual, "passwd") + d.addCallback(self.assertContains, "This file should be called /etc/passwd") + return d + + def test_download_with_rename_prevented(self): + url = "http://deluge-torrent.org/httpdownloader.php?test=rename&filename=spam" + d = download_file(url, "forced", force_filename=True) + d.addCallback(self.assertEqual, "forced") + d.addCallback(self.assertContains, "This file should be called spam") + return d + + def test_download_with_gzip_encoding(self): + url = "http://deluge-torrent.org/httpdownloader.php?test=gzip&msg=success" + d = download_file(url, "gzip_encoded") + d.addCallback(self.assertContains, "success") + return d + + def test_page_redirect(self): + url = "http://deluge-torrent.org/httpdownloader.php?test=redirect" + d = download_file(url, "none") + d.addCallback(self.fail) + d.addErrback(self.assertIsInstance, Failure) + return d def test_page_not_found(self): d = download_file("http://does.not.exist", "none")