Added some tests for rpcserver

This commit is contained in:
bendikro 2013-05-26 22:05:46 +02:00 committed by Calum Lind
parent affe47a11c
commit 19234d6565
5 changed files with 149 additions and 13 deletions

View File

@ -72,10 +72,11 @@ import logging
import shutil import shutil
import os import os
from twisted.internet.reactor import callLater
import deluge.common import deluge.common
json = deluge.common.json json = deluge.common.json
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def prop(func): def prop(func):
@ -219,23 +220,22 @@ what is currently in the config and it could not convert the value
self.__config[key] = value self.__config[key] = value
# Run the set_function for this key if any # Run the set_function for this key if any
from twisted.internet import reactor
try: try:
for func in self.__set_functions[key]: for func in self.__set_functions[key]:
reactor.callLater(0, func, key, value) callLater(0, func, key, value)
except KeyError: except KeyError:
pass pass
try: try:
def do_change_callbacks(key, value): def do_change_callbacks(key, value):
for func in self.__change_callbacks: for func in self.__change_callbacks:
func(key, value) func(key, value)
reactor.callLater(0, do_change_callbacks, key, value) callLater(0, do_change_callbacks, key, value)
except: except:
pass pass
# We set the save_timer for 5 seconds if not already set # We set the save_timer for 5 seconds if not already set
if not self._save_timer or not self._save_timer.active(): if not self._save_timer or not self._save_timer.active():
self._save_timer = reactor.callLater(5, self.save) self._save_timer = callLater(5, self.save)
def __getitem__(self, key): def __getitem__(self, key):
""" """
@ -314,9 +314,8 @@ what is currently in the config and it could not convert the value
""" """
del self.__config[key] del self.__config[key]
# We set the save_timer for 5 seconds if not already set # We set the save_timer for 5 seconds if not already set
from twisted.internet import reactor
if not self._save_timer or not self._save_timer.active(): if not self._save_timer or not self._save_timer.active():
self._save_timer = reactor.callLater(5, self.save) self._save_timer = callLater(5, self.save)
def register_change_callback(self, callback): def register_change_callback(self, callback):

View File

@ -228,7 +228,7 @@ class DelugeRPCProtocol(DelugeTransferProtocol):
Sends an error response with the contents of the exception that was raised. Sends an error response with the contents of the exception that was raised.
""" """
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info() exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
formated_tb = "".join(traceback.format_tb(exceptionTraceback)) formated_tb = traceback.format_exc()
try: try:
self.sendData(( self.sendData((
RPC_ERROR, RPC_ERROR,

View File

@ -2,8 +2,10 @@
import os import os
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import task
from deluge.tests.common import set_tmp_config_dir from deluge.tests.common import set_tmp_config_dir
from deluge.config import Config from deluge.config import Config
import deluge.config
DEFAULTS = {"string": "foobar", "int": 1, "float": 0.435, "bool": True, "unicode": u"foobar"} DEFAULTS = {"string": "foobar", "int": 1, "float": 0.435, "bool": True, "unicode": u"foobar"}
@ -106,11 +108,17 @@ class ConfigTestCase(unittest.TestCase):
self.assertEquals(config["int"], 2) self.assertEquals(config["int"], 2)
def test_save_timer(self): def test_save_timer(self):
self.clock = task.Clock()
deluge.config.callLater = self.clock.callLater
config = Config("test.conf", defaults=DEFAULTS, config_dir=self.config_dir) config = Config("test.conf", defaults=DEFAULTS, config_dir=self.config_dir)
config["string"] = "baz" config["string"] = "baz"
config["int"] = 2 config["int"] = 2
self.assertTrue(config._save_timer.active()) self.assertTrue(config._save_timer.active())
# Timeout set for 5 seconds in config, so lets move clock by 5 seconds
self.clock.advance(5)
def check_config(config): def check_config(config):
self.assertTrue(not config._save_timer.active()) self.assertTrue(not config._save_timer.active())
del config del config
@ -118,10 +126,7 @@ class ConfigTestCase(unittest.TestCase):
self.assertEquals(config["string"], "baz") self.assertEquals(config["string"], "baz")
self.assertEquals(config["int"], 2) self.assertEquals(config["int"], 2)
from twisted.internet.task import deferLater check_config(config)
from twisted.internet import reactor
d = deferLater(reactor, 7, check_config, config)
return d
def test_find_json_objects(self): def test_find_json_objects(self):
s = """{ s = """{

View File

@ -9,7 +9,7 @@ class LogTestCase(unittest.TestCase):
setupLogger(logging.DEBUG) setupLogger(logging.DEBUG)
def tearDown(self): def tearDown(self):
pass setupLogger("none")
def test_old_LOG_deprecation_warning(self): def test_old_LOG_deprecation_warning(self):
import warnings import warnings

View File

@ -0,0 +1,132 @@
#
# test_rpcserver.py
#
# Copyright (C) 2013 Bro <bro.development@gmail.com>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
#
#
import os
from twisted.trial import unittest
from twisted.python import log
import deluge.component as component
import deluge.error
from deluge.core import rpcserver
from deluge.ui.common import get_localhost_auth
from deluge.core.authmanager import AuthManager
from deluge.core.rpcserver import RPCServer, DelugeRPCProtocol
from deluge.log import setupLogger
setupLogger("none")
class DelugeRPCProtocolTester(DelugeRPCProtocol):
messages = []
def transfer_message(self, data):
import traceback
self.messages.append(data)
class RPCServerTestCase(unittest.TestCase):
def setUp(self):
self.rpcserver = RPCServer(listen=False)
self.rpcserver.factory.protocol = DelugeRPCProtocolTester
self.factory = self.rpcserver.factory
self.session_id = "0"
self.request_id = 11
self.protocol = self.rpcserver.factory.protocol()
self.protocol.factory = self.factory
self.protocol.transport = self.protocol
self.factory.session_protocols[self.session_id] = self.protocol
self.factory.authorized_sessions[self.session_id] = None
self.factory.interested_events[self.session_id] = ["TorrentFolderRenamedEvent"]
self.protocol.sessionno = self.session_id
return component.start()
def tearDown(self):
def on_shutdown(result):
component._ComponentRegistry.components = {}
del self.rpcserver
return component.shutdown().addCallback(on_shutdown)
def test_emit_event_for_session_id(self):
torrent_id = "12"
from deluge.event import TorrentFolderRenamedEvent
data = [torrent_id, "new name", "old name"]
e = TorrentFolderRenamedEvent(*data)
self.rpcserver.emit_event_for_session_id(self.session_id, e)
msg = self.protocol.messages.pop()
self.assertEquals(msg[0], rpcserver.RPC_EVENT, str(msg))
self.assertEquals(msg[1], "TorrentFolderRenamedEvent", str(msg))
self.assertEquals(msg[2], data, str(msg))
def test_invalid_client_login(self):
ret = self.protocol.dispatch(self.request_id, "daemon.login", [1], {})
msg = self.protocol.messages.pop()
self.assertEquals(msg[0], rpcserver.RPC_ERROR)
self.assertEquals(msg[1], self.request_id)
def test_invalid_client_login(self):
ret = self.protocol.dispatch(self.request_id, "daemon.login", [1], {})
msg = self.protocol.messages.pop()
self.assertEquals(msg[0], rpcserver.RPC_ERROR)
self.assertEquals(msg[1], self.request_id)
def test_valid_client_login(self):
self.authmanager = AuthManager()
auth = get_localhost_auth()
ret = self.protocol.dispatch(self.request_id, "daemon.login", auth, {"client_version": "Test"})
msg = self.protocol.messages.pop()
self.assertEquals(msg[0], rpcserver.RPC_RESPONSE, str(msg))
self.assertEquals(msg[1], self.request_id, str(msg))
self.assertEquals(msg[2], rpcserver.AUTH_LEVEL_ADMIN, str(msg))
def test_client_login_error(self):
# This test causes error log prints while running the test...
self.protocol.transport = None # This should causes AttributeError
self.authmanager = AuthManager()
auth = get_localhost_auth()
self.protocol.dispatch(self.request_id, "daemon.login", auth, {"client_version": "Test"})
msg = self.protocol.messages.pop()
self.assertEquals(msg[0], rpcserver.RPC_ERROR)
self.assertEquals(msg[1], self.request_id)
self.assertEquals(msg[2], "WrappedException")
self.assertEquals(msg[3][1], "AttributeError")
def test_daemon_info(self):
ret = self.protocol.dispatch(self.request_id, "daemon.info", [], {})
msg = self.protocol.messages.pop()
self.assertEquals(msg[0], rpcserver.RPC_RESPONSE, str(msg))
self.assertEquals(msg[1], self.request_id, str(msg))
self.assertEquals(msg[2], deluge.common.get_version(), str(msg))