[Tests] Fixed tests so that if the tcp port is used, other ports will be tested

This commit is contained in:
bendikro 2014-03-05 18:44:52 +01:00 committed by Calum Lind
parent 973e2d2ef8
commit 6c74e2d19c
4 changed files with 78 additions and 33 deletions

View File

@ -7,6 +7,7 @@ from subprocess import Popen, PIPE
import deluge.common import deluge.common
import deluge.configmanager import deluge.configmanager
import deluge.log import deluge.log
from twisted.internet.error import CannotListenError
deluge.log.setupLogger("none") deluge.log.setupLogger("none")
@ -24,29 +25,29 @@ def rpath(*args):
deluge.common.setup_translations() deluge.common.setup_translations()
def start_core(): def start_core(listen_port=58846):
CWD = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) CWD = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
DAEMON_SCRIPT = """ DAEMON_SCRIPT = """
import sys import sys
import deluge.main import deluge.main
sys.argv.extend(['-d', '-c', '%s', '-L', 'info']) sys.argv.extend(['-d', '-c', '%s', '-L', 'info', '-p', '%d'])
deluge.main.start_daemon() deluge.main.start_daemon()
""" """
config_directory = set_tmp_config_dir() config_directory = set_tmp_config_dir()
fp = tempfile.TemporaryFile() fp = tempfile.TemporaryFile()
fp.write(DAEMON_SCRIPT % config_directory) fp.write(DAEMON_SCRIPT % (config_directory, listen_port))
fp.seek(0) fp.seek(0)
core = Popen([sys.executable], cwd=CWD, stdin=fp, stdout=PIPE, stderr=PIPE) core = Popen([sys.executable], cwd=CWD, stdin=fp, stdout=PIPE, stderr=PIPE)
while True: while True:
line = core.stderr.readline() line = core.stderr.readline()
if "starting on 58846" in line: if ("starting on %d" % listen_port) in line:
time.sleep(0.3) # Slight pause just incase time.sleep(0.3) # Slight pause just incase
break break
elif "Couldn't listen on localhost:58846" in line: elif ("Couldn't listen on localhost:%d" % listen_port) in line:
raise SystemExit("Could not start deluge test client. %s" % line) raise CannotListenError("localhost", listen_port, "Could not start deluge test client: %s" % line)
elif 'Traceback' in line: elif 'Traceback' in line:
raise SystemExit( raise SystemExit(
"Failed to start core daemon. Do \"\"\" %s \"\"\" to see what's " "Failed to start core daemon. Do \"\"\" %s \"\"\" to see what's "

View File

@ -2,6 +2,7 @@ from twisted.internet import defer
from twisted.trial import unittest from twisted.trial import unittest
import deluge.tests.common as common import deluge.tests.common as common
from twisted.internet.error import CannotListenError
from deluge import error from deluge import error
from deluge.core.authmanager import AUTH_LEVEL_ADMIN from deluge.core.authmanager import AUTH_LEVEL_ADMIN
from deluge.ui.client import client, Client, DaemonSSLProxy from deluge.ui.client import client, Client, DaemonSSLProxy
@ -64,15 +65,29 @@ class NoVersionSendingClient(Client):
class ClientTestCase(unittest.TestCase): class ClientTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.core = common.start_core() self.listen_port = 58846
tries = 10
error = None
while tries > 0:
try:
self.core = common.start_core(listen_port=self.listen_port)
except CannotListenError, e:
error = e
self.listen_port += 1
tries -= 1
else:
error = None
break
if error:
raise error
def tearDown(self): def tearDown(self):
self.core.terminate() self.core.terminate()
def test_connect_no_credentials(self): def test_connect_no_credentials(self):
d = client.connect(
d = client.connect() "localhost", self.listen_port, username="", password=""
)
def on_connect(result): def on_connect(result):
self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN) self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN)
self.addCleanup(client.disconnect) self.addCleanup(client.disconnect)
@ -85,7 +100,7 @@ class ClientTestCase(unittest.TestCase):
from deluge.ui import common from deluge.ui import common
username, password = common.get_localhost_auth() username, password = common.get_localhost_auth()
d = client.connect( d = client.connect(
"localhost", 58846, username=username, password=password "localhost", self.listen_port, username=username, password=password
) )
def on_connect(result): def on_connect(result):
@ -100,7 +115,7 @@ class ClientTestCase(unittest.TestCase):
from deluge.ui import common from deluge.ui import common
username, password = common.get_localhost_auth() username, password = common.get_localhost_auth()
d = client.connect( d = client.connect(
"localhost", 58846, username=username, password=password + "1" "localhost", self.listen_port, username=username, password=password + "1"
) )
def on_failure(failure): def on_failure(failure):
@ -117,7 +132,7 @@ class ClientTestCase(unittest.TestCase):
from deluge.ui import common from deluge.ui import common
username, password = common.get_localhost_auth() username, password = common.get_localhost_auth()
d = client.connect( d = client.connect(
"localhost", 58846, username=username "localhost", self.listen_port, username=username
) )
def on_failure(failure): def on_failure(failure):
@ -136,7 +151,7 @@ class ClientTestCase(unittest.TestCase):
username, password = common.get_localhost_auth() username, password = common.get_localhost_auth()
no_version_sending_client = NoVersionSendingClient() no_version_sending_client = NoVersionSendingClient()
d = no_version_sending_client.connect( d = no_version_sending_client.connect(
"localhost", 58846, username=username, password=password "localhost", self.listen_port, username=username, password=password
) )
def on_failure(failure): def on_failure(failure):

View File

@ -13,6 +13,7 @@ from twisted.web.http import FORBIDDEN
from twisted.web.resource import Resource from twisted.web.resource import Resource
from twisted.web.server import Site from twisted.web.server import Site
from twisted.web.static import File from twisted.web.static import File
from twisted.internet.error import CannotListenError
import deluge.tests.common as common import deluge.tests.common as common
from deluge.core.rpcserver import RPCServer from deluge.core.rpcserver import RPCServer
@ -67,17 +68,30 @@ class TopLevelResource(Resource):
self.putChild("ubuntu-9.04-desktop-i386.iso.torrent", self.putChild("ubuntu-9.04-desktop-i386.iso.torrent",
File(common.rpath("ubuntu-9.04-desktop-i386.iso.torrent"))) File(common.rpath("ubuntu-9.04-desktop-i386.iso.torrent")))
class CoreTestCase(unittest.TestCase): class CoreTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
common.set_tmp_config_dir() common.set_tmp_config_dir()
self.rpcserver = RPCServer(listen=False) self.rpcserver = RPCServer(listen=False)
self.core = Core() self.core = Core()
self.listen_port = 51242
return component.start().addCallback(self.startWebserver) return component.start().addCallback(self.startWebserver)
def startWebserver(self, result): def startWebserver(self, result):
self.website = Site(TopLevelResource()) self.website = Site(TopLevelResource())
self.webserver = reactor.listenTCP(51242, self.website) tries = 10
error = None
while tries > 0:
try:
self.webserver = reactor.listenTCP(self.listen_port, self.website)
except CannotListenError, e:
error = e
self.listen_port += 1
tries -= 1
else:
error = None
break
if error:
raise error
return result return result
def tearDown(self): def tearDown(self):
@ -103,7 +117,7 @@ class CoreTestCase(unittest.TestCase):
self.assertEquals(torrent_id, info_hash) self.assertEquals(torrent_id, info_hash)
def test_add_torrent_url(self): def test_add_torrent_url(self):
url = "http://localhost:51242/ubuntu-9.04-desktop-i386.iso.torrent" url = "http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent" % self.listen_port
options = {} options = {}
info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00" info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00"
@ -112,7 +126,7 @@ class CoreTestCase(unittest.TestCase):
return d return d
def test_add_torrent_url_with_cookie(self): def test_add_torrent_url_with_cookie(self):
url = "http://localhost:51242/cookie" url = "http://localhost:%d/cookie" % self.listen_port
options = {} options = {}
headers = {"Cookie": "password=deluge"} headers = {"Cookie": "password=deluge"}
info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00" info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00"
@ -126,7 +140,7 @@ class CoreTestCase(unittest.TestCase):
return d return d
def test_add_torrent_url_with_redirect(self): def test_add_torrent_url_with_redirect(self):
url = "http://localhost:51242/redirect" url = "http://localhost:%d/redirect" % self.listen_port
options = {} options = {}
info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00" info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00"
@ -136,7 +150,7 @@ class CoreTestCase(unittest.TestCase):
return d return d
def test_add_torrent_url_with_partial_download(self): def test_add_torrent_url_with_partial_download(self):
url = "http://localhost:51242/partial" url = "http://localhost:%d/partial" % self.listen_port
options = {} options = {}
info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00" info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00"

View File

@ -3,6 +3,7 @@ import warnings
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import reactor from twisted.internet import reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.internet.error import CannotListenError
from twisted.web.http import NOT_MODIFIED from twisted.web.http import NOT_MODIFIED
try: try:
from twisted.web.resource import Resource from twisted.web.resource import Resource
@ -89,7 +90,21 @@ class DownloadFileTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
setupLogger("warning", "log_file") setupLogger("warning", "log_file")
self.website = Site(TopLevelResource()) self.website = Site(TopLevelResource())
self.webserver = reactor.listenTCP(51242, self.website) self.listen_port = 51242
tries = 10
error = None
while tries > 0:
try:
self.webserver = reactor.listenTCP(self.listen_port, self.website)
except CannotListenError, e:
error = e
self.listen_port += 1
tries -= 1
else:
error = None
break
if error:
raise error
def tearDown(self): def tearDown(self):
return self.webserver.stopListening() return self.webserver.stopListening()
@ -115,19 +130,19 @@ class DownloadFileTestCase(unittest.TestCase):
return filename return filename
def test_download(self): def test_download(self):
d = download_file("http://localhost:51242/", "index.html") d = download_file("http://localhost:%d/" % self.listen_port, "index.html")
d.addCallback(self.assertEqual, "index.html") d.addCallback(self.assertEqual, "index.html")
return d return d
def test_download_without_required_cookies(self): def test_download_without_required_cookies(self):
url = "http://localhost:51242/cookie" url = "http://localhost:%d/cookie" % self.listen_port
d = download_file(url, "none") d = download_file(url, "none")
d.addCallback(self.fail) d.addCallback(self.fail)
d.addErrback(self.assertIsInstance, Failure) d.addErrback(self.assertIsInstance, Failure)
return d return d
def test_download_with_required_cookies(self): def test_download_with_required_cookies(self):
url = "http://localhost:51242/cookie" url = "http://localhost:%d/cookie" % self.listen_port
cookie = {"cookie": "password=deluge"} cookie = {"cookie": "password=deluge"}
d = download_file(url, "monster", headers=cookie) d = download_file(url, "monster", headers=cookie)
d.addCallback(self.assertEqual, "monster") d.addCallback(self.assertEqual, "monster")
@ -135,7 +150,7 @@ class DownloadFileTestCase(unittest.TestCase):
return d return d
def test_download_with_rename(self): def test_download_with_rename(self):
url = "http://localhost:51242/rename?filename=renamed" url = "http://localhost:%d/rename?filename=renamed" % self.listen_port
d = download_file(url, "original") d = download_file(url, "original")
d.addCallback(self.assertEqual, "renamed") d.addCallback(self.assertEqual, "renamed")
d.addCallback(self.assertContains, "This file should be called renamed") d.addCallback(self.assertContains, "This file should be called renamed")
@ -143,54 +158,54 @@ class DownloadFileTestCase(unittest.TestCase):
def test_download_with_rename_exists(self): def test_download_with_rename_exists(self):
open('renamed', 'w').close() open('renamed', 'w').close()
url = "http://localhost:51242/rename?filename=renamed" url = "http://localhost:%d/rename?filename=renamed" % self.listen_port
d = download_file(url, "original") d = download_file(url, "original")
d.addCallback(self.assertEqual, "renamed-1") d.addCallback(self.assertEqual, "renamed-1")
d.addCallback(self.assertContains, "This file should be called renamed") d.addCallback(self.assertContains, "This file should be called renamed")
return d return d
def test_download_with_rename_sanitised(self): def test_download_with_rename_sanitised(self):
url = "http://localhost:51242/rename?filename=/etc/passwd" url = "http://localhost:%d/rename?filename=/etc/passwd" % self.listen_port
d = download_file(url, "original") d = download_file(url, "original")
d.addCallback(self.assertEqual, "passwd") d.addCallback(self.assertEqual, "passwd")
d.addCallback(self.assertContains, "This file should be called /etc/passwd") d.addCallback(self.assertContains, "This file should be called /etc/passwd")
return d return d
def test_download_with_rename_prevented(self): def test_download_with_rename_prevented(self):
url = "http://localhost:51242/rename?filename=spam" url = "http://localhost:%d/rename?filename=spam" % self.listen_port
d = download_file(url, "forced", force_filename=True) d = download_file(url, "forced", force_filename=True)
d.addCallback(self.assertEqual, "forced") d.addCallback(self.assertEqual, "forced")
d.addCallback(self.assertContains, "This file should be called spam") d.addCallback(self.assertContains, "This file should be called spam")
return d return d
def test_download_with_gzip_encoding(self): def test_download_with_gzip_encoding(self):
url = "http://localhost:51242/gzip?msg=success" url = "http://localhost:%d/gzip?msg=success" % self.listen_port
d = download_file(url, "gzip_encoded") d = download_file(url, "gzip_encoded")
d.addCallback(self.assertContains, "success") d.addCallback(self.assertContains, "success")
return d return d
def test_download_with_gzip_encoding_disabled(self): def test_download_with_gzip_encoding_disabled(self):
url = "http://localhost:51242/gzip?msg=fail" url = "http://localhost:%d/gzip?msg=fail" % self.listen_port
d = download_file(url, "gzip_encoded", allow_compression=False) d = download_file(url, "gzip_encoded", allow_compression=False)
d.addCallback(self.failIfContains, "fail") d.addCallback(self.failIfContains, "fail")
return d return d
def test_page_redirect(self): def test_page_redirect(self):
url = 'http://localhost:51242/redirect' url = 'http://localhost:%d/redirect' % self.listen_port
d = download_file(url, "none") d = download_file(url, "none")
d.addCallback(self.fail) d.addCallback(self.fail)
d.addErrback(self.assertIsInstance, Failure) d.addErrback(self.assertIsInstance, Failure)
return d return d
def test_page_not_found(self): def test_page_not_found(self):
d = download_file("http://localhost:51242/page/not/found", "none") d = download_file("http://localhost:%d/page/not/found" % self.listen_port, "none")
d.addCallback(self.fail) d.addCallback(self.fail)
d.addErrback(self.assertIsInstance, Failure) d.addErrback(self.assertIsInstance, Failure)
return d return d
def test_page_not_modified(self): def test_page_not_modified(self):
headers = {'If-Modified-Since': formatdate(usegmt=True)} headers = {'If-Modified-Since': formatdate(usegmt=True)}
d = download_file("http://localhost:51242/", "index.html", headers=headers) d = download_file("http://localhost:%d/" % self.listen_port, "index.html", headers=headers)
d.addCallback(self.fail) d.addCallback(self.fail)
d.addErrback(self.assertIsInstance, Failure) d.addErrback(self.assertIsInstance, Failure)
return d return d