[Py3] A large set of fixes for tests to pass under Python 3
The usual minor fixes for unicode/bytes for library calls. The minimum Twisted version is now 16 for Python 3 support so remove old code and start replacing deprecated methods. Raised the minimum TLS version to 1.2 for the web server.
This commit is contained in:
parent
200e8f552b
commit
c3a2c67b98
|
@ -11,6 +11,7 @@
|
|||
from __future__ import division, print_function, unicode_literals
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import datetime
|
||||
import functools
|
||||
import glob
|
||||
|
@ -702,10 +703,11 @@ def get_magnet_info(uri):
|
|||
xt_hash = param[len(XT_BTIH_PARAM):]
|
||||
if len(xt_hash) == 32:
|
||||
try:
|
||||
info_hash = base64.b32decode(xt_hash.upper()).encode('hex')
|
||||
infohash_str = base64.b32decode(xt_hash.upper())
|
||||
except TypeError as ex:
|
||||
log.debug('Invalid base32 magnet hash: %s, %s', xt_hash, ex)
|
||||
break
|
||||
info_hash = binascii.hexlify(infohash_str)
|
||||
elif is_infohash(xt_hash):
|
||||
info_hash = xt_hash.lower()
|
||||
else:
|
||||
|
@ -744,11 +746,15 @@ def create_magnet_uri(infohash, name=None, trackers=None):
|
|||
|
||||
"""
|
||||
try:
|
||||
infohash = infohash.decode('hex')
|
||||
except AttributeError:
|
||||
pass
|
||||
infohash = binascii.unhexlify(infohash)
|
||||
except TypeError:
|
||||
infohash.encode('utf-8')
|
||||
|
||||
uri = [MAGNET_SCHEME, XT_BTIH_PARAM, base64.b32encode(infohash)]
|
||||
uri = [
|
||||
MAGNET_SCHEME,
|
||||
XT_BTIH_PARAM,
|
||||
base64.b32encode(infohash).decode('utf-8'),
|
||||
]
|
||||
if name:
|
||||
uri.extend(['&', DN_PARAM, name])
|
||||
if trackers:
|
||||
|
|
|
@ -188,12 +188,14 @@ class Config(object):
|
|||
if self.__config[key] == value:
|
||||
return
|
||||
|
||||
# Do not allow the type to change unless it is None
|
||||
if value is not None and not isinstance(
|
||||
self.__config[key], type(None),
|
||||
) and not isinstance(self.__config[key], type(value)):
|
||||
# Change the value type if it is not None and does not match.
|
||||
type_match = isinstance(self.__config[key], (type(None), type(value)))
|
||||
if value is not None and not type_match:
|
||||
try:
|
||||
oldtype = type(self.__config[key])
|
||||
# Don't convert to bytes as requires encoding and value will
|
||||
# be decoded anyway.
|
||||
if oldtype is not bytes:
|
||||
value = oldtype(value)
|
||||
except ValueError:
|
||||
log.warning('Value Type "%s" invalid for key: %s', type(value), key)
|
||||
|
|
|
@ -19,7 +19,7 @@ import threading
|
|||
from base64 import b64decode, b64encode
|
||||
|
||||
from twisted.internet import defer, reactor, task
|
||||
from twisted.web.client import getPage
|
||||
from twisted.web.client import Agent, readBody
|
||||
|
||||
import deluge.common
|
||||
import deluge.component as component
|
||||
|
@ -1116,13 +1116,21 @@ class Core(component.Component):
|
|||
:rtype: bool
|
||||
|
||||
"""
|
||||
d = getPage(
|
||||
b'http://deluge-torrent.org/test_port.php?port=%s' %
|
||||
self.get_listen_port(), timeout=30,
|
||||
port = self.get_listen_port()
|
||||
url = 'https://deluge-torrent.org/test_port.php?port=%s' % port
|
||||
agent = Agent(reactor, connectTimeout=30)
|
||||
d = agent.request(
|
||||
b'GET',
|
||||
url.encode('utf-8'),
|
||||
)
|
||||
|
||||
def on_get_page(result):
|
||||
return bool(int(result))
|
||||
def on_get_page(response):
|
||||
d = readBody(response)
|
||||
d.addCallback(on_read_body)
|
||||
return d
|
||||
|
||||
def on_read_body(body):
|
||||
return bool(int(body))
|
||||
|
||||
def on_error(failure):
|
||||
log.warning('Error testing listen port: %s', failure)
|
||||
|
|
|
@ -17,6 +17,7 @@ import zlib
|
|||
from twisted.internet import reactor
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.web import client, http
|
||||
from twisted.web.client import URI
|
||||
from twisted.web.error import PageRedirect
|
||||
|
||||
from deluge.common import get_version, utf8_encode_structure
|
||||
|
@ -60,29 +61,29 @@ class HTTPDownloader(client.HTTPDownloader):
|
|||
self.force_filename = force_filename
|
||||
self.allow_compression = allow_compression
|
||||
self.code = None
|
||||
agent = b'Deluge/%s (http://deluge-torrent.org)' % get_version().encode('utf8')
|
||||
|
||||
client.HTTPDownloader.__init__(self, url, filename, headers=headers, agent=agent)
|
||||
|
||||
def gotStatus(self, version, status, message): # NOQA: N802
|
||||
self.code = int(status)
|
||||
client.HTTPDownloader.gotStatus(self, version, status, message)
|
||||
agent = 'Deluge/%s (http://deluge-torrent.org)' % get_version()
|
||||
client.HTTPDownloader.__init__(
|
||||
self, url, filename, headers=headers, agent=agent.encode('utf-8'))
|
||||
|
||||
def gotHeaders(self, headers): # NOQA: N802
|
||||
self.code = int(self.status)
|
||||
if self.code == http.OK:
|
||||
if 'content-length' in headers:
|
||||
self.total_length = int(headers['content-length'][0])
|
||||
if b'content-length' in headers:
|
||||
self.total_length = int(headers[b'content-length'][0])
|
||||
else:
|
||||
self.total_length = 0
|
||||
|
||||
if self.allow_compression and 'content-encoding' in headers and \
|
||||
headers['content-encoding'][0] in ('gzip', 'x-gzip', 'deflate'):
|
||||
encodings_accepted = [b'gzip', b'x-gzip', b'deflate']
|
||||
if (
|
||||
self.allow_compression and b'content-encoding' in headers
|
||||
and headers[b'content-encoding'][0] in encodings_accepted
|
||||
):
|
||||
# Adding 32 to the wbits enables gzip & zlib decoding (with automatic header detection)
|
||||
# Adding 16 just enables gzip decoding (no zlib)
|
||||
self.decoder = zlib.decompressobj(zlib.MAX_WBITS + 32)
|
||||
|
||||
if 'content-disposition' in headers and not self.force_filename:
|
||||
content_disp = str(headers['content-disposition'][0])
|
||||
if b'content-disposition' in headers and not self.force_filename:
|
||||
content_disp = headers[b'content-disposition'][0].decode('utf-8')
|
||||
content_disp_params = cgi.parse_header(content_disp)[1]
|
||||
if 'filename' in content_disp_params:
|
||||
new_file_name = content_disp_params['filename']
|
||||
|
@ -100,8 +101,13 @@ class HTTPDownloader(client.HTTPDownloader):
|
|||
self.fileName = new_file_name
|
||||
self.value = new_file_name
|
||||
|
||||
elif self.code in (http.MOVED_PERMANENTLY, http.FOUND, http.SEE_OTHER, http.TEMPORARY_REDIRECT):
|
||||
location = headers['location'][0]
|
||||
elif self.code in (
|
||||
http.MOVED_PERMANENTLY,
|
||||
http.FOUND,
|
||||
http.SEE_OTHER,
|
||||
http.TEMPORARY_REDIRECT,
|
||||
):
|
||||
location = headers[b'location'][0]
|
||||
error = PageRedirect(self.code, location=location)
|
||||
self.noPage(Failure(error))
|
||||
|
||||
|
@ -185,26 +191,14 @@ def _download_file(url, filename, callback=None, headers=None, force_filename=Fa
|
|||
headers['accept-encoding'] = 'deflate, gzip, x-gzip'
|
||||
|
||||
url = url.encode('utf8')
|
||||
filename = filename.encode('utf8')
|
||||
headers = utf8_encode_structure(headers) if headers else headers
|
||||
factory = HTTPDownloader(url, filename, callback, headers, force_filename, allow_compression)
|
||||
|
||||
# In Twisted 13.1.0 _parse() function replaced by _URI class.
|
||||
# In Twisted 15.0.0 _URI class renamed to URI.
|
||||
if hasattr(client, '_parse'):
|
||||
scheme, host, port, dummy_path = client._parse(url)
|
||||
else:
|
||||
try:
|
||||
from twisted.web.client import _URI as URI
|
||||
except ImportError:
|
||||
from twisted.web.client import URI
|
||||
finally:
|
||||
uri = URI.fromBytes(url)
|
||||
scheme = uri.scheme
|
||||
host = uri.host
|
||||
port = uri.port
|
||||
|
||||
if scheme == 'https':
|
||||
if uri.scheme == b'https':
|
||||
from twisted.internet import ssl
|
||||
# ClientTLSOptions in Twisted >= 14, see ticket #2765 for details on this addition.
|
||||
try:
|
||||
|
|
|
@ -19,7 +19,7 @@ from deluge.config import Config
|
|||
|
||||
from .common import set_tmp_config_dir
|
||||
|
||||
DEFAULTS = {'string': b'foobar', 'int': 1, 'float': 0.435, 'bool': True, 'unicode': 'foobar'}
|
||||
DEFAULTS = {'string': 'foobar', 'int': 1, 'float': 0.435, 'bool': True, 'unicode': 'foobar'}
|
||||
|
||||
|
||||
class ConfigTestCase(unittest.TestCase):
|
||||
|
@ -100,8 +100,8 @@ class ConfigTestCase(unittest.TestCase):
|
|||
# Test opening a previous 1.2 config file of having the format versions
|
||||
# as ints
|
||||
with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file:
|
||||
_file.write(str(1) + '\n')
|
||||
_file.write(str(1) + '\n')
|
||||
_file.write(bytes(1) + b'\n')
|
||||
_file.write(bytes(1) + b'\n')
|
||||
json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT)
|
||||
|
||||
check_config()
|
||||
|
|
|
@ -42,7 +42,7 @@ class RedirectResource(Resource):
|
|||
class RenameResource(Resource):
|
||||
|
||||
def render(self, request):
|
||||
filename = request.args.get('filename', ['renamed_file'])[0]
|
||||
filename = request.args.get(b'filename', [b'renamed_file'])[0]
|
||||
request.setHeader(b'Content-Type', b'text/plain')
|
||||
request.setHeader(
|
||||
b'Content-Disposition', b'attachment; filename=' +
|
||||
|
@ -63,10 +63,10 @@ class CookieResource(Resource):
|
|||
|
||||
def render(self, request):
|
||||
request.setHeader(b'Content-Type', b'text/plain')
|
||||
if request.getCookie('password') is None:
|
||||
if request.getCookie(b'password') is None:
|
||||
return b'Password cookie not set!'
|
||||
|
||||
if request.getCookie('password') == 'deluge':
|
||||
if request.getCookie(b'password') == b'deluge':
|
||||
return b'COOKIE MONSTER!'
|
||||
|
||||
return request.getCookie('password')
|
||||
|
@ -75,7 +75,7 @@ class CookieResource(Resource):
|
|||
class GzipResource(Resource):
|
||||
|
||||
def render(self, request):
|
||||
message = request.args.get('msg', ['EFFICIENCY!'])[0]
|
||||
message = request.args.get(b'msg', [b'EFFICIENCY!'])[0]
|
||||
request.setHeader(b'Content-Type', b'text/plain')
|
||||
return compress(message, request)
|
||||
|
||||
|
@ -105,16 +105,16 @@ class TopLevelResource(Resource):
|
|||
|
||||
def __init__(self):
|
||||
Resource.__init__(self)
|
||||
self.putChild('cookie', CookieResource())
|
||||
self.putChild('gzip', GzipResource())
|
||||
self.putChild(b'cookie', CookieResource())
|
||||
self.putChild(b'gzip', GzipResource())
|
||||
self.redirect_rsrc = RedirectResource()
|
||||
self.putChild('redirect', self.redirect_rsrc)
|
||||
self.putChild('rename', RenameResource())
|
||||
self.putChild('attachment', AttachmentResource())
|
||||
self.putChild('partial', PartialDownloadResource())
|
||||
self.putChild(b'redirect', self.redirect_rsrc)
|
||||
self.putChild(b'rename', RenameResource())
|
||||
self.putChild(b'attachment', AttachmentResource())
|
||||
self.putChild(b'partial', PartialDownloadResource())
|
||||
|
||||
def getChild(self, path, request): # NOQA: N802
|
||||
if path == '':
|
||||
if not path:
|
||||
return self
|
||||
else:
|
||||
return Resource.getChild(self, path, request)
|
||||
|
@ -157,8 +157,8 @@ class DownloadFileTestCase(unittest.TestCase):
|
|||
self.fail(ex)
|
||||
return filename
|
||||
|
||||
def assertNotContains(self, filename, contents): # NOQA
|
||||
with open(filename) as _file:
|
||||
def assertNotContains(self, filename, contents, file_mode=''): # NOQA
|
||||
with open(filename, file_mode) as _file:
|
||||
try:
|
||||
self.assertNotEqual(_file.read(), contents)
|
||||
except Exception as ex:
|
||||
|
@ -236,13 +236,13 @@ class DownloadFileTestCase(unittest.TestCase):
|
|||
def test_download_with_gzip_encoding(self):
|
||||
url = self.get_url('gzip?msg=success')
|
||||
d = download_file(url, fname('gzip_encoded'))
|
||||
d.addCallback(self.assertContains, b'success')
|
||||
d.addCallback(self.assertContains, 'success')
|
||||
return d
|
||||
|
||||
def test_download_with_gzip_encoding_disabled(self):
|
||||
url = self.get_url('gzip?msg=fail')
|
||||
d = download_file(url, fname('gzip_encoded'), allow_compression=False)
|
||||
d.addCallback(self.assertNotContains, b'fail')
|
||||
d.addCallback(self.assertNotContains, 'fail', file_mode='rb')
|
||||
return d
|
||||
|
||||
def test_page_redirect_unhandled(self):
|
||||
|
|
|
@ -152,8 +152,8 @@ Please use commands from the command line, e.g.:\n
|
|||
|
||||
# We use the curses.wrapper function to prevent the console from getting
|
||||
# messed up if an uncaught exception is experienced.
|
||||
import curses.wrapper
|
||||
curses.wrapper(self.run)
|
||||
from curses import wrapper
|
||||
wrapper(self.run)
|
||||
|
||||
def quit(self):
|
||||
if client.connected():
|
||||
|
|
|
@ -100,6 +100,8 @@ class BaseWindow(object):
|
|||
self._height, self._width = rows, cols
|
||||
|
||||
def move_window(self, posy, posx):
|
||||
posy = int(posy)
|
||||
posx = int(posx)
|
||||
self.outer_screen.mvwin(posy, posx)
|
||||
self.posy = posy
|
||||
self.posx = posx
|
||||
|
|
|
@ -19,7 +19,7 @@ import tempfile
|
|||
from OpenSSL.crypto import FILETYPE_PEM
|
||||
from twisted.application import internet, service
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.internet.ssl import SSL, Certificate, CertificateOptions, KeyPair
|
||||
from twisted.internet.ssl import SSL, Certificate, CertificateOptions, KeyPair, TLSVersion
|
||||
from twisted.web import http, resource, server, static
|
||||
|
||||
from deluge import common, component, configmanager
|
||||
|
@ -668,7 +668,11 @@ class DelugeWeb(component.Component):
|
|||
certificate = Certificate.loadPEM(cert.read()).original
|
||||
with open(configmanager.get_config_dir(self.pkey)) as pkey:
|
||||
private_key = KeyPair.load(pkey.read(), FILETYPE_PEM).original
|
||||
options = CertificateOptions(privateKey=private_key, certificate=certificate, method=SSL.SSLv23_METHOD)
|
||||
options = CertificateOptions(
|
||||
privateKey=private_key,
|
||||
certificate=certificate,
|
||||
raiseMinimumTo=TLSVersion.TLSv1_2,
|
||||
)
|
||||
ctx = options.getContext()
|
||||
ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
|
||||
ctx.use_certificate_chain_file(configmanager.get_config_dir(self.cert))
|
||||
|
|
Loading…
Reference in New Issue