Use filename suggested by content-disposition header.

Closes #1040.
This commit is contained in:
John Garland 2009-10-29 06:02:20 +00:00
parent 675d1219cd
commit 5991abcec5
2 changed files with 117 additions and 11 deletions

View File

@ -36,13 +36,15 @@ from twisted.web import client, http
from twisted.web.error import PageRedirect
from twisted.python.failure import Failure
from twisted.internet import reactor
from deluge.log import setupLogger, LOG as log
from common import get_version
import os.path
class HTTPDownloader(client.HTTPDownloader):
"""
Factory class for downloading files and keeping track of progress.
"""
def __init__(self, url, filename, part_callback=None, headers=None):
def __init__(self, url, filename, part_callback=None, headers=None, force_filename=False):
"""
:param url: the url to download from
:type url: string
@ -57,6 +59,7 @@ class HTTPDownloader(client.HTTPDownloader):
self.__part_callback = part_callback
self.current_length = 0
self.value = filename
self.force_filename = force_filename
agent = "Deluge/%s (http://deluge-torrent.org)" % get_version()
client.HTTPDownloader.__init__(self, url, filename, headers=headers, agent=agent)
@ -70,6 +73,18 @@ class HTTPDownloader(client.HTTPDownloader):
self.total_length = int(headers["content-length"][0])
else:
self.total_length = 0
if "content-disposition" in headers and not self.force_filename:
try:
new_file_name = str(headers["content-disposition"][0]).split(";")[1].split("=")[1]
new_file_name = sanitise_filename(new_file_name)
new_file_name = os.path.join(os.path.split(self.fileName)[0], new_file_name)
except Exception, e:
log.exception(e)
else:
self.fileName = new_file_name
self.value = new_file_name
elif self.code in (http.TEMPORARY_REDIRECT, http.MOVED_PERMANENTLY):
location = headers["location"][0]
error = PageRedirect(self.code, location=location)
@ -85,7 +100,39 @@ class HTTPDownloader(client.HTTPDownloader):
return client.HTTPDownloader.pagePart(self, data)
def download_file(url, filename, callback=None, headers=None):
def sanitise_filename(filename):
"""
Sanitises a filename to use as a download destination file.
Logs any filenames that could be considered malicious.
:param filename: the filename to sanitise
:type filename: string
:returns: the sanitised filename
:rtype: string
:raises IOError: when the filename exists
"""
# Remove any quotes
filename = filename.strip("'\"")
if os.path.basename(filename) != filename:
# Dodgy server, log it
log.warning("Potentially malicious server: trying to write to file '%s'" % filename)
# Only use the basename
filename = os.path.basename(filename)
filename = filename.strip()
if filename.startswith(".") or ";" in filename or "|" in filename:
# Dodgy server, log it
log.warning("Potentially malicious server: trying to write to file '%s'" % filename)
if os.path.exists(filename):
raise IOError, "File '%s' already exists!" % filename
return filename
def download_file(url, filename, callback=None, headers=None, force_filename=False):
"""
Downloads a file from a specific URL and returns a Deferred. You can also
specify a callback function to be called as parts are received.
@ -99,6 +146,9 @@ def download_file(url, filename, callback=None, headers=None):
:type callback: function
:param headers: any optional headers to send
:type headers: dictionary
:param force_filename: force us to use the filename specified rather than
one the server may suggest
:type force_filename: boolean
:returns: the filename of the downloaded file
:rtype: Deferred
@ -114,7 +164,7 @@ def download_file(url, filename, callback=None, headers=None):
headers[str(key)] = str(value)
scheme, host, port, path = client._parse(url)
factory = HTTPDownloader(url, filename, callback, headers)
factory = HTTPDownloader(url, filename, callback, headers, force_filename)
if scheme == "https":
from twisted.internet import ssl
reactor.connectSSL(host, port, factory, ssl.ClientContextFactory())

View File

@ -2,24 +2,80 @@ from twisted.trial import unittest
from twisted.python.failure import Failure
from deluge.httpdownloader import download_file
from deluge.log import setupLogger
class DownloadFileTestCase(unittest.TestCase):
def setUp(self):
setupLogger("warning", "log_file")
def tearDown(self):
pass
def assertContains(self, filename, contents):
with open(filename) as f:
self.assertEqual(f.read(), contents)
return filename
def test_download(self):
d = download_file("http://deluge-torrent.org", "index.html")
d.addCallback(self.assertEqual, "index.html")
return d
def test_download_with_cookies(self):
pass
def test_download_without_required_cookies(self):
url = "http://deluge-torrent.org/httpdownloader.php?test=cookie"
d = download_file(url, "none")
d.addCallback(self.fail)
d.addErrback(self.assertIsInstance, Failure)
return d
def test_page_moved(self):
pass
def test_download_with_required_cookies(self):
url = "http://deluge-torrent.org/httpdownloader.php?test=cookie"
cookie = { "cookie" : "password=deluge" }
d = download_file(url, "monster", headers=cookie)
d.addCallback(self.assertEqual, "monster")
d.addCallback(self.assertContains, "COOKIE MONSTER!")
return d
def test_page_moved_permanently(self):
pass
def test_download_with_rename(self):
url = "http://deluge-torrent.org/httpdownloader.php?test=rename&filename=renamed"
d = download_file(url, "original")
d.addCallback(self.assertEqual, "renamed")
d.addCallback(self.assertContains, "This file should be called renamed")
return d
def test_page_not_modified(self):
pass
def test_download_with_rename_fail(self):
url = "http://deluge-torrent.org/httpdownloader.php?test=rename&filename=renamed"
d = download_file(url, "original")
d.addCallback(self.assertEqual, "original")
d.addCallback(self.assertContains, "This file should be called renamed")
return d
def test_download_with_rename_sanitised(self):
url = "http://deluge-torrent.org/httpdownloader.php?test=rename&filename=/etc/passwd"
d = download_file(url, "original")
d.addCallback(self.assertEqual, "passwd")
d.addCallback(self.assertContains, "This file should be called /etc/passwd")
return d
def test_download_with_rename_prevented(self):
url = "http://deluge-torrent.org/httpdownloader.php?test=rename&filename=spam"
d = download_file(url, "forced", force_filename=True)
d.addCallback(self.assertEqual, "forced")
d.addCallback(self.assertContains, "This file should be called spam")
return d
def test_download_with_gzip_encoding(self):
url = "http://deluge-torrent.org/httpdownloader.php?test=gzip&msg=success"
d = download_file(url, "gzip_encoded")
d.addCallback(self.assertContains, "success")
return d
def test_page_redirect(self):
url = "http://deluge-torrent.org/httpdownloader.php?test=redirect"
d = download_file(url, "none")
d.addCallback(self.fail)
d.addErrback(self.assertIsInstance, Failure)
return d
def test_page_not_found(self):
d = download_file("http://does.not.exist", "none")