Fix gettext setup in test_common and log in test_transfer so tests run standalone

Add extra tests to test_config
Run the test files through flake8 to tidy up code
This commit is contained in:
Calum Lind 2013-05-22 00:08:33 +01:00
parent 28d7c5d44a
commit e2e09200c4
18 changed files with 132 additions and 99 deletions

View File

@ -10,21 +10,20 @@ import deluge.log
deluge.log.setupLogger("none") deluge.log.setupLogger("none")
def set_tmp_config_dir(): def set_tmp_config_dir():
config_directory = tempfile.mkdtemp() config_directory = tempfile.mkdtemp()
deluge.configmanager.set_config_dir(config_directory) deluge.configmanager.set_config_dir(config_directory)
return config_directory return config_directory
def rpath(*args): def rpath(*args):
return os.path.join(os.path.dirname(__file__), *args) return os.path.join(os.path.dirname(__file__), *args)
import gettext
import locale
import pkg_resources
# Initialize gettext # Initialize gettext
deluge.common.setup_translations() deluge.common.setup_translations()
def start_core(): def start_core():
CWD = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) CWD = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
DAEMON_SCRIPT = """ DAEMON_SCRIPT = """

View File

@ -1,10 +1,8 @@
from twisted.trial import unittest from twisted.trial import unittest
import common
from deluge.core.alertmanager import AlertManager
from deluge.core.core import Core
import deluge.component as component import deluge.component as component
from deluge.core.core import Core
class AlertManagerTestCase(unittest.TestCase): class AlertManagerTestCase(unittest.TestCase):
def setUp(self): def setUp(self):

View File

@ -1,9 +1,8 @@
from twisted.trial import unittest from twisted.trial import unittest
import common
from deluge.core.authmanager import AuthManager, AUTH_LEVEL_ADMIN from deluge.core.authmanager import AuthManager, AUTH_LEVEL_ADMIN
class AuthManagerTestCase(unittest.TestCase): class AuthManagerTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.auth = AuthManager() self.auth = AuthManager()

View File

@ -1,9 +1,7 @@
import common
from twisted.internet import defer from twisted.internet import defer
from twisted.trial import unittest from twisted.trial import unittest
import deluge.tests.common as common
from deluge import error from deluge import error
from deluge.core.authmanager import AUTH_LEVEL_ADMIN from deluge.core.authmanager import AUTH_LEVEL_ADMIN
from deluge.ui.client import client, Client, DaemonSSLProxy from deluge.ui.client import client, Client, DaemonSSLProxy
@ -23,6 +21,7 @@ class NoVersionSendingDaemonSSLProxy(DaemonSSLProxy):
def __on_login_fail(self, result): def __on_login_fail(self, result):
self.login_deferred.errback(result) self.login_deferred.errback(result)
class NoVersionSendingClient(Client): class NoVersionSendingClient(Client):
def connect(self, host="127.0.0.1", port=58846, username="", password="", def connect(self, host="127.0.0.1", port=58846, username="", password="",
@ -61,6 +60,7 @@ class NoVersionSendingClient(Client):
if self.disconnect_callback: if self.disconnect_callback:
self.disconnect_callback() self.disconnect_callback()
class ClientTestCase(unittest.TestCase): class ClientTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
@ -100,7 +100,7 @@ class ClientTestCase(unittest.TestCase):
from deluge.ui import common from deluge.ui import common
username, password = common.get_localhost_auth() username, password = common.get_localhost_auth()
d = client.connect( d = client.connect(
"localhost", 58846, username=username, password=password+'1' "localhost", 58846, username=username, password=password + "1"
) )
def on_failure(failure): def on_failure(failure):

View File

@ -1,10 +1,12 @@
from twisted.trial import unittest
from deluge.common import *
import os import os
from twisted.trial import unittest
from deluge.common import (setup_translations, VersionSplit, fsize, fpcnt, fspeed, fpeer,
ftime, fdate, is_url, is_magnet, get_path_size, is_ip)
class CommonTestCase(unittest.TestCase): class CommonTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
pass setup_translations()
def tearDown(self): def tearDown(self):
pass pass

View File

@ -2,6 +2,7 @@ from twisted.trial import unittest
from twisted.internet import threads from twisted.internet import threads
import deluge.component as component import deluge.component as component
class testcomponent(component.Component): class testcomponent(component.Component):
def __init__(self, name, depend=None): def __init__(self, name, depend=None):
component.Component.__init__(self, name, depend=depend) component.Component.__init__(self, name, depend=depend)
@ -14,16 +15,19 @@ class testcomponent(component.Component):
def stop(self): def stop(self):
self.stop_count += 1 self.stop_count += 1
class testcomponent_delaystart(testcomponent): class testcomponent_delaystart(testcomponent):
def start(self): def start(self):
def do_sleep(): def do_sleep():
import time import time
time.sleep(1) time.sleep(1)
d = threads.deferToThread(do_sleep) d = threads.deferToThread(do_sleep)
def on_done(result): def on_done(result):
self.start_count += 1 self.start_count += 1
return d.addCallback(on_done) return d.addCallback(on_done)
class testcomponent_update(component.Component): class testcomponent_update(component.Component):
def __init__(self, name): def __init__(self, name):
component.Component.__init__(self, name) component.Component.__init__(self, name)
@ -37,6 +41,7 @@ class testcomponent_update(component.Component):
def stop(self): def stop(self):
self.stop_count += 1 self.stop_count += 1
class testcomponent_shutdown(component.Component): class testcomponent_shutdown(component.Component):
def __init__(self, name): def __init__(self, name):
component.Component.__init__(self, name) component.Component.__init__(self, name)
@ -49,6 +54,7 @@ class testcomponent_shutdown(component.Component):
def stop(self): def stop(self):
self.stop_count += 1 self.stop_count += 1
class ComponentTestClass(unittest.TestCase): class ComponentTestClass(unittest.TestCase):
def tearDown(self): def tearDown(self):
component.stop() component.stop()
@ -112,7 +118,7 @@ class ComponentTestClass(unittest.TestCase):
return ret[0] return ret[0]
def test_register_exception(self): def test_register_exception(self):
c1 = testcomponent("test_register_exception_c1") testcomponent("test_register_exception_c1")
self.assertRaises( self.assertRaises(
component.ComponentAlreadyRegistered, component.ComponentAlreadyRegistered,
testcomponent, testcomponent,

View File

@ -1,18 +1,16 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from twisted.trial import unittest
from twisted.python.failure import Failure
import common
import os import os
from twisted.trial import unittest
from deluge.tests.common import set_tmp_config_dir
from deluge.config import Config from deluge.config import Config
DEFAULTS = {"string": "foobar", "int": 1, "float": 0.435, "bool": True, "unicode": u"foobar"} DEFAULTS = {"string": "foobar", "int": 1, "float": 0.435, "bool": True, "unicode": u"foobar"}
class ConfigTestCase(unittest.TestCase): class ConfigTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.config_dir = common.set_tmp_config_dir() self.config_dir = set_tmp_config_dir()
def test_init(self): def test_init(self):
config = Config("test.conf", defaults=DEFAULTS, config_dir=self.config_dir) config = Config("test.conf", defaults=DEFAULTS, config_dir=self.config_dir)
@ -26,12 +24,19 @@ class ConfigTestCase(unittest.TestCase):
config["foo"] = 1 config["foo"] = 1
self.assertEquals(config["foo"], 1) self.assertEquals(config["foo"], 1)
self.assertRaises(ValueError, config.set_item, "foo", "bar") self.assertRaises(ValueError, config.set_item, "foo", "bar")
config["foo"] = 2 config["foo"] = 2
self.assertEquals(config.get_item("foo"), 2) self.assertEquals(config.get_item("foo"), 2)
config["foo"] = "3"
self.assertEquals(config.get_item("foo"), 3)
config["unicode"] = u"ВИДЕОФИЛЬМЫ" config["unicode"] = u"ВИДЕОФИЛЬМЫ"
self.assertEquals(config["unicode"], u"ВИДЕОФИЛЬМЫ") self.assertEquals(config["unicode"], u"ВИДЕОФИЛЬМЫ")
config["unicode"] = "foostring"
self.assertTrue(isinstance(config.get_item("unicode"), unicode))
config._save_timer.cancel() config._save_timer.cancel()
def test_get(self): def test_get(self):

View File

@ -1,3 +1,11 @@
import os
import warnings
try:
from hashlib import sha1 as sha
except ImportError:
from sha import sha
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import reactor from twisted.internet import reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
@ -6,16 +14,7 @@ from twisted.web.resource import Resource
from twisted.web.server import Site from twisted.web.server import Site
from twisted.web.static import File from twisted.web.static import File
try: import deluge.tests.common as common
from hashlib import sha1 as sha
except ImportError:
from sha import sha
import os
import common
import warnings
rpath = common.rpath
from deluge.core.rpcserver import RPCServer from deluge.core.rpcserver import RPCServer
from deluge.core.core import Core from deluge.core.core import Core
warnings.filterwarnings("ignore", category=RuntimeWarning) warnings.filterwarnings("ignore", category=RuntimeWarning)
@ -24,6 +23,9 @@ warnings.resetwarnings()
import deluge.component as component import deluge.component as component
import deluge.error import deluge.error
rpath = common.rpath
class TestCookieResource(Resource): class TestCookieResource(Resource):
def render(self, request): def render(self, request):
@ -34,6 +36,7 @@ class TestCookieResource(Resource):
request.setHeader("Content-Type", "application/x-bittorrent") request.setHeader("Content-Type", "application/x-bittorrent")
return open(rpath("ubuntu-9.04-desktop-i386.iso.torrent")).read() return open(rpath("ubuntu-9.04-desktop-i386.iso.torrent")).read()
class TestPartialDownload(Resource): class TestPartialDownload(Resource):
def render(self, request): def render(self, request):
@ -44,12 +47,14 @@ class TestPartialDownload(Resource):
return compress(data, request) return compress(data, request)
return data return data
class TestRedirectResource(Resource): class TestRedirectResource(Resource):
def render(self, request): def render(self, request):
request.redirect("/ubuntu-9.04-desktop-i386.iso.torrent") request.redirect("/ubuntu-9.04-desktop-i386.iso.torrent")
return "" return ""
class TopLevelResource(Resource): class TopLevelResource(Resource):
addSlash = True addSlash = True
@ -59,7 +64,9 @@ class TopLevelResource(Resource):
self.putChild("cookie", TestCookieResource()) self.putChild("cookie", TestCookieResource())
self.putChild("partial", TestPartialDownload()) self.putChild("partial", TestPartialDownload())
self.putChild("redirect", TestRedirectResource()) self.putChild("redirect", TestRedirectResource())
self.putChild("ubuntu-9.04-desktop-i386.iso.torrent", File(common.rpath("ubuntu-9.04-desktop-i386.iso.torrent"))) self.putChild("ubuntu-9.04-desktop-i386.iso.torrent",
File(common.rpath("ubuntu-9.04-desktop-i386.iso.torrent")))
class CoreTestCase(unittest.TestCase): class CoreTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
@ -107,7 +114,7 @@ class CoreTestCase(unittest.TestCase):
def test_add_torrent_url_with_cookie(self): def test_add_torrent_url_with_cookie(self):
url = "http://localhost:51242/cookie" url = "http://localhost:51242/cookie"
options = {} options = {}
headers = { "Cookie" : "password=deluge" } headers = {"Cookie": "password=deluge"}
info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00" info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00"
d = self.core.add_torrent_url(url, options) d = self.core.add_torrent_url(url, options)
@ -188,9 +195,9 @@ class CoreTestCase(unittest.TestCase):
def test_sanitize_filepath(self): def test_sanitize_filepath(self):
pathlist = { pathlist = {
'\\backslash\\path\\' : 'backslash/path', '\\backslash\\path\\': 'backslash/path',
' single_file ': 'single_file', ' single_file ': 'single_file',
'..' : '', '..': '',
'/../..../': '', '/../..../': '',
' Def ////ad./ / . . /b d /file': 'Def/ad./. ./b d/file', ' Def ////ad./ / . . /b d /file': 'Def/ad./. ./b d/file',
'/ test /\\.. /.file/': 'test/.file', '/ test /\\.. /.file/': 'test/.file',

View File

@ -2,6 +2,7 @@ from twisted.trial import unittest
from deluge.decorators import proxy from deluge.decorators import proxy
class DecoratorsTestCase(unittest.TestCase): class DecoratorsTestCase(unittest.TestCase):
def test_proxy_with_simple_functions(self): def test_proxy_with_simple_functions(self):
def negate(func, *args, **kwargs): def negate(func, *args, **kwargs):

View File

@ -1,15 +1,14 @@
import os
import warnings import warnings
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import reactor from twisted.internet import reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.web.http import FORBIDDEN, NOT_MODIFIED from twisted.web.http import NOT_MODIFIED
try: try:
from twisted.web.resource import Resource, ForbiddenResource from twisted.web.resource import Resource
except ImportError: except ImportError:
# twisted 8 # twisted 8
from twisted.web.error import Resource, ForbiddenResource from twisted.web.error import Resource
from twisted.web.server import Site from twisted.web.server import Site
from deluge.httpdownloader import download_file from deluge.httpdownloader import download_file
@ -21,14 +20,16 @@ warnings.resetwarnings()
from email.utils import formatdate from email.utils import formatdate
import common import deluge.tests.common as common
rpath = common.rpath rpath = common.rpath
class TestRedirectResource(Resource): class TestRedirectResource(Resource):
def render(self, request): def render(self, request):
request.redirect("http://localhost:51242/") request.redirect("http://localhost:51242/")
class TestRenameResource(Resource): class TestRenameResource(Resource):
def render(self, request): def render(self, request):
@ -38,6 +39,7 @@ class TestRenameResource(Resource):
filename) filename)
return "This file should be called " + filename return "This file should be called " + filename
class TestCookieResource(Resource): class TestCookieResource(Resource):
def render(self, request): def render(self, request):
@ -50,6 +52,7 @@ class TestCookieResource(Resource):
return request.getCookie("password") return request.getCookie("password")
class TestGzipResource(Resource): class TestGzipResource(Resource):
def render(self, request): def render(self, request):
@ -57,6 +60,7 @@ class TestGzipResource(Resource):
request.setHeader("Content-Type", "text/plain") request.setHeader("Content-Type", "text/plain")
return compress(message, request) return compress(message, request)
class TopLevelResource(Resource): class TopLevelResource(Resource):
addSlash = True addSlash = True
@ -79,6 +83,7 @@ class TopLevelResource(Resource):
request.setResponseCode(NOT_MODIFIED) request.setResponseCode(NOT_MODIFIED)
return "<h1>Deluge HTTP Downloader tests webserver here</h1>" return "<h1>Deluge HTTP Downloader tests webserver here</h1>"
class DownloadFileTestCase(unittest.TestCase): class DownloadFileTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
@ -123,7 +128,7 @@ class DownloadFileTestCase(unittest.TestCase):
def test_download_with_required_cookies(self): def test_download_with_required_cookies(self):
url = "http://localhost:51242/cookie" url = "http://localhost:51242/cookie"
cookie = { "cookie" : "password=deluge" } cookie = {"cookie": "password=deluge"}
d = download_file(url, "monster", headers=cookie) d = download_file(url, "monster", headers=cookie)
d.addCallback(self.assertEqual, "monster") d.addCallback(self.assertEqual, "monster")
d.addCallback(self.assertContains, "COOKIE MONSTER!") d.addCallback(self.assertContains, "COOKIE MONSTER!")
@ -184,7 +189,7 @@ class DownloadFileTestCase(unittest.TestCase):
return d return d
def test_page_not_modified(self): def test_page_not_modified(self):
headers = { 'If-Modified-Since' : formatdate(usegmt=True) } headers = {'If-Modified-Since': formatdate(usegmt=True)}
d = download_file("http://localhost:51242/", "index.html", headers=headers) d = download_file("http://localhost:51242/", "index.html", headers=headers)
d.addCallback(self.fail) d.addCallback(self.fail)
d.addErrback(self.assertIsInstance, Failure) d.addErrback(self.assertIsInstance, Failure)

View File

@ -3,6 +3,7 @@ from twisted.internet import defer
from twisted.trial import unittest from twisted.trial import unittest
from deluge.log import setupLogger from deluge.log import setupLogger
class LogTestCase(unittest.TestCase): class LogTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
setupLogger(logging.DEBUG) setupLogger(logging.DEBUG)

View File

@ -1,6 +1,4 @@
from twisted.trial import unittest from twisted.trial import unittest
from twisted.python.failure import Failure
import os import os
import tempfile import tempfile
@ -14,7 +12,8 @@ def check_torrent(filename):
# Test loading with our internal TorrentInfo class # Test loading with our internal TorrentInfo class
from deluge.ui.common import TorrentInfo from deluge.ui.common import TorrentInfo
ti = TorrentInfo(filename) TorrentInfo(filename)
class MakeTorrentTestCase(unittest.TestCase): class MakeTorrentTestCase(unittest.TestCase):
def test_save_multifile(self): def test_save_multifile(self):
@ -39,7 +38,7 @@ class MakeTorrentTestCase(unittest.TestCase):
def test_save_singlefile(self): def test_save_singlefile(self):
tmp_data = tempfile.mkstemp("testdata")[1] tmp_data = tempfile.mkstemp("testdata")[1]
open(tmp_data, "wb").write("a"*(2314*1024)) open(tmp_data, "wb").write("a" * (2314 * 1024))
t = maketorrent.TorrentMetadata() t = maketorrent.TorrentMetadata()
t.data_path = tmp_data t.data_path = tmp_data
tmp_file = tempfile.mkstemp(".torrent")[1] tmp_file = tempfile.mkstemp(".torrent")[1]

View File

@ -3,5 +3,5 @@ pm = deluge.pluginmanagerbase.PluginManagerBase("core.conf", "deluge.plugin.core
for p in pm.get_available_plugins(): for p in pm.get_available_plugins():
print "Plugin: %s" % (p) print "Plugin: %s" % (p)
for k,v in pm.get_plugin_info(p).items(): for k, v in pm.get_plugin_info(p).items():
print "%s: %s" % (k, v) print "%s: %s" % (k, v)

View File

@ -4,6 +4,7 @@ from twisted.internet.defer import maybeDeferred, succeed
import deluge.ui.sessionproxy import deluge.ui.sessionproxy
import deluge.component as component import deluge.component as component
class Core(object): class Core(object):
def __init__(self): def __init__(self):
self.reset() self.reset()
@ -70,18 +71,22 @@ class Core(object):
self.prev_status[torrent] = dict(self.torrents[torrent]) self.prev_status[torrent] = dict(self.torrents[torrent])
return succeed(ret) return succeed(ret)
class Client(object): class Client(object):
def __init__(self): def __init__(self):
self.core = Core() self.core = Core()
def __noop__(self, *args, **kwargs): def __noop__(self, *args, **kwargs):
return None return None
def __getattr__(self, *args, **kwargs): def __getattr__(self, *args, **kwargs):
return self.__noop__ return self.__noop__
client = Client() client = Client()
deluge.ui.sessionproxy.client = client deluge.ui.sessionproxy.client = client
class SessionProxyTestCase(unittest.TestCase): class SessionProxyTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.sp = deluge.ui.sessionproxy.SessionProxy() self.sp = deluge.ui.sessionproxy.SessionProxy()

View File

@ -3,7 +3,7 @@ import os
import deluge.core.torrent import deluge.core.torrent
import test_torrent import test_torrent
import common import deluge.tests.common as common
from deluge.core.rpcserver import RPCServer from deluge.core.rpcserver import RPCServer
from deluge.core.core import Core from deluge.core.core import Core
@ -16,6 +16,7 @@ config_setup = False
core = None core = None
rpcserver = None rpcserver = None
# This is called by torrent.py when calling component.get("...") # This is called by torrent.py when calling component.get("...")
def get(key): def get(key):
if key is "Core": if key is "Core":
@ -25,13 +26,15 @@ def get(key):
else: else:
return None return None
class TorrentTestCase(unittest.TestCase): class TorrentTestCase(unittest.TestCase):
def setup_config(self): def setup_config(self):
global config_setup global config_setup
config_setup = True config_setup = True
config_dir = common.set_tmp_config_dir() config_dir = common.set_tmp_config_dir()
core_config = deluge.config.Config("core.conf", defaults=deluge.core.preferencesmanager.DEFAULT_PREFS, config_dir=config_dir) core_config = deluge.config.Config("core.conf", defaults=deluge.core.preferencesmanager.DEFAULT_PREFS,
config_dir=config_dir)
core_config.save() core_config.save()
def setUp(self): def setUp(self):
@ -52,6 +55,7 @@ class TorrentTestCase(unittest.TestCase):
self.torrent.prev_status_cleanup_loop.stop() self.torrent.prev_status_cleanup_loop.stop()
deluge.core.torrent.component = self.original_component deluge.core.torrent.component = self.original_component
def on_shutdown(result): def on_shutdown(result):
component._ComponentRegistry.components = {} component._ComponentRegistry.components = {}
return component.shutdown().addCallback(on_shutdown) return component.shutdown().addCallback(on_shutdown)
@ -78,9 +82,9 @@ class TorrentTestCase(unittest.TestCase):
return atp return atp
def test_set_prioritize_first_last(self): def test_set_prioritize_first_last(self):
piece_indexes = [(0,1), (0,1), (0,1), (0,1), (0,2), (50,52), piece_indexes = [(0, 1), (0, 1), (0, 1), (0, 1), (0, 2), (50, 52),
(51,53), (110,112), (111,114), (200,203), (51, 53), (110, 112), (111, 114), (200, 203),
(202,203), (212,213), (212,218), (457,463)] (202, 203), (212, 213), (212, 218), (457, 463)]
self.run_test_set_prioritize_first_last("dir_with_6_files.torrent", piece_indexes) self.run_test_set_prioritize_first_last("dir_with_6_files.torrent", piece_indexes)
def run_test_set_prioritize_first_last(self, torrent_file, prioritized_piece_indexes): def run_test_set_prioritize_first_last(self, torrent_file, prioritized_piece_indexes):

View File

@ -1,12 +1,9 @@
import os import os
from twisted.trial import unittest from twisted.trial import unittest
from deluge.ui.tracker_icons import TrackerIcons, TrackerIcon from deluge.ui.tracker_icons import TrackerIcons, TrackerIcon
from deluge.tests.common import set_tmp_config_dir
import common set_tmp_config_dir()
common.set_tmp_config_dir()
icons = TrackerIcons() icons = TrackerIcons()
dirname = os.path.dirname(__file__) dirname = os.path.dirname(__file__)
@ -14,6 +11,7 @@ dirname = os.path.dirname(__file__)
import deluge.ui.tracker_icons import deluge.ui.tracker_icons
deluge.ui.tracker_icons.PIL_INSTALLED = False deluge.ui.tracker_icons.PIL_INSTALLED = False
class TrackerIconsTestCase(unittest.TestCase): class TrackerIconsTestCase(unittest.TestCase):
def test_get_deluge_png(self): def test_get_deluge_png(self):

View File

@ -33,13 +33,13 @@
# statement from all source files in the program, then also delete it here. # statement from all source files in the program, then also delete it here.
# #
from twisted.trial import unittest
from deluge.transfer import DelugeTransferProtocol
import base64 import base64
from twisted.trial import unittest
import deluge.rencode as rencode import deluge.rencode as rencode
from deluge.transfer import DelugeTransferProtocol
import deluge.log
deluge.log.setupLogger("none")
class TransferTestClass(DelugeTransferProtocol): class TransferTestClass(DelugeTransferProtocol):
@ -77,7 +77,6 @@ class TransferTestClass(DelugeTransferProtocol):
:param data: a zlib compressed string encoded with rencode. :param data: a zlib compressed string encoded with rencode.
""" """
from datetime import timedelta
import zlib import zlib
print "\n=== New Data Received ===\nBytes received:", len(data) print "\n=== New Data Received ===\nBytes received:", len(data)
@ -109,8 +108,8 @@ class TransferTestClass(DelugeTransferProtocol):
try: try:
request = rencode.loads(dobj.decompress(data)) request = rencode.loads(dobj.decompress(data))
print "Successfully loaded message", print "Successfully loaded message",
print " - Buffer length: %d, data length: %d, unused length: %d" % (len(data), \ print " - Buffer length: %d, data length: %d, unused length: %d" % \
len(data) - len(dobj.unused_data), len(dobj.unused_data)) (len(data), len(data) - len(dobj.unused_data), len(dobj.unused_data))
print "Packet count:", self.packet_count print "Packet count:", self.packet_count
except Exception, e: except Exception, e:
#log.debug("Received possible invalid message (%r): %s", data, e) #log.debug("Received possible invalid message (%r): %s", data, e)
@ -125,6 +124,7 @@ class TransferTestClass(DelugeTransferProtocol):
self.message_received(request) self.message_received(request)
class DelugeTransferProtocolTestCase(unittest.TestCase): class DelugeTransferProtocolTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
@ -191,7 +191,8 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
and lets DelugeTransferProtocol receive the data as one string. and lets DelugeTransferProtocol receive the data as one string.
""" """
two_concatenated = base64.b64decode(self.msg1_expected_compressed_base64) + base64.b64decode(self.msg2_expected_compressed_base64) two_concatenated = base64.b64decode(self.msg1_expected_compressed_base64) + \
base64.b64decode(self.msg2_expected_compressed_base64)
self.transfer.dataReceived(two_concatenated) self.transfer.dataReceived(two_concatenated)
# Get the data as sent by DelugeTransferProtocol # Get the data as sent by DelugeTransferProtocol
@ -212,8 +213,10 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
packet_size = 40 packet_size = 40
one_message_byte_count = len(base64.b64decode(self.msg1_expected_compressed_base64)) one_message_byte_count = len(base64.b64decode(self.msg1_expected_compressed_base64))
two_messages_byte_count = one_message_byte_count + len(base64.b64decode(self.msg2_expected_compressed_base64)) two_messages_byte_count = one_message_byte_count + \
three_messages_byte_count = two_messages_byte_count + len(base64.b64decode(self.msg1_expected_compressed_base64)) len(base64.b64decode(self.msg2_expected_compressed_base64))
three_messages_byte_count = two_messages_byte_count + \
len(base64.b64decode(self.msg1_expected_compressed_base64))
for d in self.receive_parts_helper(msg_bytes, packet_size): for d in self.receive_parts_helper(msg_bytes, packet_size):
bytes_received = self.transfer.get_bytes_recv() bytes_received = self.transfer.get_bytes_recv()
@ -237,7 +240,6 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
message3 = self.transfer.get_messages_in().pop(0) message3 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message3)) self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message3))
# Remove underscore to enable test, or run the test directly: # Remove underscore to enable test, or run the test directly:
# tests $ trial test_transfer.DelugeTransferProtocolTestCase._test_rencode_fail_protocol # tests $ trial test_transfer.DelugeTransferProtocolTestCase._test_rencode_fail_protocol
def _test_rencode_fail_protocol(self): def _test_rencode_fail_protocol(self):
@ -251,8 +253,10 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
packet_size = 149 packet_size = 149
one_message_byte_count = len(base64.b64decode(self.msg1_expected_compressed_base64)) one_message_byte_count = len(base64.b64decode(self.msg1_expected_compressed_base64))
two_messages_byte_count = one_message_byte_count + len(base64.b64decode(self.msg2_expected_compressed_base64)) two_messages_byte_count = one_message_byte_count + \
three_messages_byte_count = two_messages_byte_count + len(base64.b64decode(self.msg1_expected_compressed_base64)) len(base64.b64decode(self.msg2_expected_compressed_base64))
three_messages_byte_count = two_messages_byte_count + \
len(base64.b64decode(self.msg1_expected_compressed_base64))
print print
@ -277,8 +281,8 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
expected_msgs_received_count = 0 expected_msgs_received_count = 0
# Verify that the expected number of complete messages has arrived # Verify that the expected number of complete messages has arrived
if expected_msgs_received_count != len(self.transfer.get_messages_in()): if expected_msgs_received_count != len(self.transfer.get_messages_in()):
print "Expected number of messages received is %d, but %d have been received."\ print "Expected number of messages received is %d, but %d have been received." % \
% (expected_msgs_received_count, len(self.transfer.get_messages_in())) (expected_msgs_received_count, len(self.transfer.get_messages_in()))
# Get the data as received by DelugeTransferProtocol # Get the data as received by DelugeTransferProtocol
message1 = self.transfer.get_messages_in().pop(0) message1 = self.transfer.get_messages_in().pop(0)
@ -288,7 +292,6 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
message3 = self.transfer.get_messages_in().pop(0) message3 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message3)) self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message3))
def test_receive_middle_of_header(self): def test_receive_middle_of_header(self):
""" """
This test concatenates two messsages (as they're sent over the network), This test concatenates two messsages (as they're sent over the network),
@ -301,17 +304,18 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
to read and parse the size of the payload. to read and parse the size of the payload.
""" """
two_concatenated = base64.b64decode(self.msg1_expected_compressed_base64) + base64.b64decode(self.msg2_expected_compressed_base64) two_concatenated = base64.b64decode(self.msg1_expected_compressed_base64) + \
base64.b64decode(self.msg2_expected_compressed_base64)
first_len = len(base64.b64decode(self.msg1_expected_compressed_base64)) first_len = len(base64.b64decode(self.msg1_expected_compressed_base64))
# Now found the entire first message, and half the header of the next message (2 bytes into the header) # Now found the entire first message, and half the header of the next message (2 bytes into the header)
self.transfer.dataReceived(two_concatenated[:first_len+2]) self.transfer.dataReceived(two_concatenated[:first_len + 2])
# Should be 1 message in the list # Should be 1 message in the list
self.assertEquals(1, len(self.transfer.get_messages_in())) self.assertEquals(1, len(self.transfer.get_messages_in()))
# Send the rest # Send the rest
self.transfer.dataReceived(two_concatenated[first_len+2:]) self.transfer.dataReceived(two_concatenated[first_len + 2:])
# Should be 2 messages in the list # Should be 2 messages in the list
self.assertEquals(2, len(self.transfer.get_messages_in())) self.assertEquals(2, len(self.transfer.get_messages_in()))
@ -322,7 +326,6 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
message2 = self.transfer.get_messages_in().pop(0) message2 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2)) self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2))
# Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon # Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon
#def test_simulate_big_transfer(self): #def test_simulate_big_transfer(self):
# filename = "../deluge.torrentlist" # filename = "../deluge.torrentlist"

View File

@ -1,8 +1,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from twisted.trial import unittest from twisted.trial import unittest
import os import os.path
from deluge.ui.common import TorrentInfo from deluge.ui.common import TorrentInfo
class UICommonTestCase(unittest.TestCase): class UICommonTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
pass pass
@ -13,18 +14,18 @@ class UICommonTestCase(unittest.TestCase):
def test_utf8_encoded_paths(self): def test_utf8_encoded_paths(self):
filename = os.path.join(os.path.dirname(__file__), "test.torrent") filename = os.path.join(os.path.dirname(__file__), "test.torrent")
ti = TorrentInfo(filename) ti = TorrentInfo(filename)
self.assertTrue(ti.files_tree.has_key("azcvsupdater_2.6.2.jar")) self.assertTrue("azcvsupdater_2.6.2.jar" in ti.files_tree)
def test_utf8_encoded_paths2(self): def test_utf8_encoded_paths2(self):
filename = os.path.join(os.path.dirname(__file__), "unicode_filenames.torrent") filename = os.path.join(os.path.dirname(__file__), "unicode_filenames.torrent")
ti = TorrentInfo(filename) ti = TorrentInfo(filename)
files = ti.files_tree["unicode_filenames"] files = ti.files_tree["unicode_filenames"]
self.assertTrue(files.has_key("\xe3\x83\x86\xe3\x82\xaf\xe3\x82\xb9\xe3\x83\xbb\xe3\x83" self.assertTrue("\xe3\x83\x86\xe3\x82\xaf\xe3\x82\xb9\xe3\x83\xbb\xe3\x83"
"\x86\xe3\x82\xaf\xe3\x82\xb5\xe3\x83\xb3.mkv")) "\x86\xe3\x82\xaf\xe3\x82\xb5\xe3\x83\xb3.mkv" in files)
self.assertTrue(files.has_key("\xd0\x9c\xd0\xb8\xd1\x85\xd0\xb0\xd0\xb8\xd0\xbb \xd0\x93" self.assertTrue("\xd0\x9c\xd0\xb8\xd1\x85\xd0\xb0\xd0\xb8\xd0\xbb \xd0\x93"
"\xd0\xbe\xd1\x80\xd0\xb1\xd0\xb0\xd1\x87\xd1\x91\xd0\xb2.mkv")) "\xd0\xbe\xd1\x80\xd0\xb1\xd0\xb0\xd1\x87\xd1\x91\xd0\xb2.mkv" in files)
self.assertTrue(files.has_key("Alisher ibn G'iyosiddin Navoiy.mkv")) self.assertTrue("Alisher ibn G'iyosiddin Navoiy.mkv" in files)
self.assertTrue(files.has_key("Ascii title.mkv")) self.assertTrue("Ascii title.mkv" in files)
self.assertTrue(files.has_key("\xe0\xa6\xb8\xe0\xa7\x81\xe0\xa6\x95\xe0\xa7\x81\xe0\xa6" self.assertTrue("\xe0\xa6\xb8\xe0\xa7\x81\xe0\xa6\x95\xe0\xa7\x81\xe0\xa6"
"\xae\xe0\xa6\xbe\xe0\xa6\xb0 \xe0\xa6\xb0\xe0\xa6\xbe\xe0\xa7\x9f.mkv")) "\xae\xe0\xa6\xbe\xe0\xa6\xb0 \xe0\xa6\xb0\xe0\xa6\xbe\xe0\xa7\x9f.mkv" in files)