[Tests] Improved common.start_core

* Replace Popen with reactor.spawnProcess and read process
  output with twisted.internet.protocol.ProcessProtocol
* Implement support for running custom script code
* Now logs to stdout instead of stderr when not logging to file
This commit is contained in:
bendikro 2015-12-15 18:26:53 +01:00
parent 533951afea
commit bcc1db12e5
8 changed files with 260 additions and 67 deletions

View File

@ -83,10 +83,11 @@ class Daemon(object):
read_only_config_keys (list of str, optional): A list of config keys that will not be
altered by core.set_config() RPC method.
"""
self.classic = classic
self.port = port
self.pid_file = get_config_dir("deluged.pid")
log.info("Deluge daemon %s", get_version())
pid_file = get_config_dir("deluged.pid")
check_running_daemon(pid_file)
check_running_daemon(self.pid_file)
# Twisted catches signals to terminate, so just have it call the shutdown method.
reactor.addSystemEventTrigger("before", "shutdown", self._shutdown)
@ -121,6 +122,7 @@ class Daemon(object):
log.debug("Listening to UI on: %s:%s and bittorrent on: %s", interface, port, listen_interface)
def start(self):
# Register the daemon and the core RPCs
self.rpcserver.register_object(self.core)
self.rpcserver.register_object(self)
@ -128,22 +130,21 @@ class Daemon(object):
# Make sure we start the PreferencesManager first
component.start("PreferencesManager")
if not classic:
if not self.classic:
log.info("Deluge daemon starting...")
# Create pid file to track if deluged is running, also includes the port number.
pid = os.getpid()
log.debug("Storing pid %s & port %s in: %s", pid, port, pid_file)
with open(pid_file, "wb") as _file:
_file.write("%s;%s\n" % (pid, port))
log.debug("Storing pid %s & port %s in: %s", pid, self.port, self.pid_file)
with open(self.pid_file, "wb") as _file:
_file.write("%s;%s\n" % (pid, self.port))
component.start()
try:
reactor.run()
finally:
log.debug("Remove pid file: %s", pid_file)
os.remove(pid_file)
log.debug("Remove pid file: %s", self.pid_file)
os.remove(self.pid_file)
log.info("Deluge daemon shutdown successfully")
@export()

View File

@ -373,9 +373,9 @@ class RPCServer(component.Component):
try:
reactor.listenSSL(port, self.factory, ServerContextFactory(), interface=hostname)
except Exception as ex:
log.info("Daemon already running or port not available..")
log.info("Daemon already running or port not available.")
log.error(ex)
sys.exit(0)
raise
def register_object(self, obj, name=None):
"""

View File

@ -14,6 +14,7 @@ import inspect
import logging
import logging.handlers
import os
import sys
from twisted.internet import defer
from twisted.python.log import PythonLoggingObserver
@ -138,7 +139,7 @@ def setup_logger(level="error", filename=None, filemode="w"):
filename, filemode, "utf-8", delay=0
)
else:
handler = logging.StreamHandler()
handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(level)

View File

@ -119,8 +119,17 @@ def start_ui():
UI(options, args, options.args)
def start_daemon():
"""Entry point for daemon script"""
def start_daemon(skip_start=False):
"""
Entry point for daemon script
Args:
skip_start (bool): If starting daemon should be skipped.
Returns:
deluge.core.daemon.Daemon: A new daemon object
"""
deluge.common.setup_translations()
if 'dev' not in deluge.common.get_version():
@ -228,12 +237,15 @@ def start_daemon():
os.setuid(options.group)
def run_daemon(options):
from deluge.core.daemon import Daemon
try:
Daemon(listen_interface=options.listen_interface,
interface=options.ui_interface,
port=options.port,
read_only_config_keys=options.read_only_config_keys.split(","))
from deluge.core.daemon import Daemon
daemon = Daemon(listen_interface=options.listen_interface,
interface=options.ui_interface,
port=options.port,
read_only_config_keys=options.read_only_config_keys.split(","))
if not skip_start:
daemon.start()
return daemon
except Exception as ex:
log.exception(ex)
sys.exit(1)
@ -256,4 +268,4 @@ def start_daemon():
print("Running with profiler...")
profiler.runcall(run_daemon, options)
else:
run_daemon(options)
return run_daemon(options)

View File

@ -1,15 +1,16 @@
import os
import sys
import tempfile
import time
from subprocess import PIPE, Popen
from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import Deferred
from twisted.internet.error import CannotListenError
import deluge.common
import deluge.configmanager
import deluge.core.preferencesmanager
import deluge.log
from deluge.error import DelugeError
deluge.log.setup_logger("none")
@ -24,6 +25,21 @@ def set_tmp_config_dir():
return config_directory
def add_watchdog(deferred, timeout=0.05, message=None):
def callback(value):
if not watchdog.called:
watchdog.cancel()
if not deferred.called:
if message:
print message
deferred.cancel()
return value
deferred.addBoth(callback)
watchdog = reactor.callLater(timeout, defer.timeout, deferred)
def rpath(*args):
return os.path.join(os.path.dirname(__file__), *args)
@ -31,37 +47,157 @@ def rpath(*args):
deluge.common.setup_translations()
def start_core(listen_port=58846):
cwd = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
class ProcessOutputHandler(protocol.ProcessProtocol):
def __init__(self, callbacks, script, logfile=None, print_stderr=True):
self.callbacks = callbacks
self.script = script
self.log_output = ""
self.stderr_out = ""
self.logfile = logfile
self.print_stderr = print_stderr
self.quit_d = None
self.killed = False
def connectionMade(self): # NOQA
self.transport.write(self.script)
self.transport.closeStdin()
def outConnectionLost(self): # NOQA
if not self.logfile:
return
with open(self.logfile, 'w') as f:
f.write(self.log_output)
def kill(self):
if self.killed:
return
self.killed = True
self.quit_d = Deferred()
self.transport.signalProcess('INT')
return self.quit_d
def processEnded(self, status): # NOQA
self.transport.loseConnection()
if self.quit_d is None:
return
if status.value.exitCode == 0:
self.quit_d.callback(True)
else:
self.quit_d.errback(status)
def check_callbacks(self, data, type="stdout"):
ret = False
for c in self.callbacks:
if type not in c["types"] or c["deferred"].called:
continue
for trigger in c["triggers"]:
if trigger["expr"] in data:
ret = True
if "cb" in trigger:
trigger["cb"](self, c["deferred"], data, self.log_output)
elif "value" not in trigger:
raise Exception("Trigger must specify either 'cb' or 'value'")
else:
val = trigger["value"](self, data, self.log_output)
if trigger.get("type", "callback") == "errback":
c["deferred"].errback(val)
else:
c["deferred"].callback(val)
return ret
def outReceived(self, data): # NOQA
"""Process output from stdout"""
self.log_output += data
if self.check_callbacks(data):
pass
elif '[ERROR' in data:
print data,
def errReceived(self, data): # NOQA
"""Process output from stderr"""
self.log_output += data
self.stderr_out += data
self.check_callbacks(data, type="stderr")
if not self.print_stderr:
return
data = "\n%s" % data.strip()
prefixed = data.replace("\n", "\nSTDERR: ")
print "\n%s" % prefixed
def start_core(listen_port=58846, logfile=None, timeout=10, timeout_msg=None,
custom_script="", print_stderr=True, extra_callbacks=None):
config_directory = set_tmp_config_dir()
daemon_script = """
import sys
import deluge.main
sys.argv.extend(['-d', '-c', '%s', '-L', 'info', '-p', '%d'])
deluge.main.start_daemon()
"""
config_directory = set_tmp_config_dir()
fp = tempfile.TemporaryFile()
fp.write(daemon_script % (config_directory, listen_port))
fp.seek(0)
try:
daemon = deluge.main.start_daemon(skip_start=True)
%s
daemon.start()
except:
import traceback
sys.stderr.write("Exception raised:\\n %%s" %% traceback.format_exc())
""" % (config_directory, listen_port, custom_script)
callbacks = []
default_core_cb = {"deferred": Deferred(), "types": "stdout"}
if timeout:
default_core_cb["timeout"] = timeout
core = Popen([sys.executable], cwd=cwd, stdin=fp, stdout=PIPE, stderr=PIPE)
while True:
line = core.stderr.readline()
if "starting on %d" % listen_port in line:
time.sleep(0.3) # Slight pause just incase
break
elif "Couldn't listen on localhost:%d" % listen_port in line:
raise CannotListenError("localhost", listen_port, "Could not start deluge test client: %s" % line)
elif 'Traceback' in line:
raise SystemExit(
"Failed to start core daemon. Do \"\"\" %s \"\"\" to see what's "
"happening" %
"python -c \"import sys; import tempfile; import deluge.main; "
"import deluge.configmanager; config_directory = tempfile.mkdtemp(); "
"deluge.configmanager.set_config_dir(config_directory); "
"sys.argv.extend(['-d', '-c', config_directory, '-L', 'info']); "
"deluge.main.start_daemon()\""
)
return core
# Specify the triggers for daemon log output
default_core_cb["triggers"] = [
{"expr": "Finished loading ", "value": lambda reader, data, data_all: reader},
{"expr": "Couldn't listen on localhost:%d" % (listen_port), "type": "errback", # Error from libtorrent
"value": lambda reader, data, data_all: CannotListenError("localhost", listen_port,
"Could not start deluge test client!\n%s" % data)},
{"expr": "Traceback", "type": "errback",
"value": lambda reader, data, data_all: DelugeError("Traceback found when starting daemon:\n%s" % data)}
]
callbacks.append(default_core_cb)
if extra_callbacks:
callbacks.extend(extra_callbacks)
process_protocol = start_process(daemon_script, callbacks, logfile, print_stderr)
return default_core_cb["deferred"], process_protocol
def start_process(script, callbacks, logfile=None, print_stderr=True):
"""
Starts an external python process which executes the given script.
Args:
script (str): The content of the script to execute
callbacks (list): list of dictionaries specifying callbacks
logfile (str): Optional logfile to write the output from the process
print_stderr (bool): If the output from the process' stderr should be printed to stdout
Returns:
ProcessOutputHandler: The handler for the process's output
Each entry in the callbacks list is a dictionary with the following keys:
* "deferred": The deferred to be called when matched
* "types": The output this callback should be matched against.
Possible values: ["stdout", "stderr"]
* "timeout" (optional): A timeout in seconds for the deferred
* "triggers": A list of dictionaries, each specifying specifying a trigger:
* "expr": A string to match against the log output
* "value": A function to produce the result to be passed to the callback
* "type" (optional): A string that specifies wether to trigger a regular callback or errback.
"""
cwd = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
process_protocol = ProcessOutputHandler(callbacks, script, logfile, print_stderr)
# Add timeouts to deferreds
for c in callbacks:
if "timeout" in c:
add_watchdog(c["deferred"], timeout=c["timeout"], message=c.get("timeout_msg", None))
reactor.spawnProcess(process_protocol, sys.executable, args=[sys.executable], path=cwd)
return process_protocol

View File

@ -0,0 +1,47 @@
from twisted.internet import defer
from twisted.internet.error import CannotListenError
import deluge.component as component
from . import common
class DaemonBase(object):
def common_set_up(self):
common.set_tmp_config_dir()
self.listen_port = 58900
self.core = None
return component.start()
def terminate_core(self, *args):
if args[0] is not None:
if hasattr(args[0], "getTraceback"):
print "terminate_core: Errback Exception: %s" % args[0].getTraceback()
if not self.core.killed:
d = self.core.kill()
return d
@defer.inlineCallbacks
def start_core(self, arg, custom_script="", logfile="", print_stderr=True, timeout=5,
port_range=10, extra_callbacks=None):
if logfile == "":
logfile = "daemon_%s.log" % self.id()
for dummy in range(port_range):
try:
d, self.core = common.start_core(listen_port=self.listen_port, logfile=logfile,
timeout=timeout, timeout_msg="Timeout!",
custom_script=custom_script,
print_stderr=print_stderr,
extra_callbacks=extra_callbacks)
yield d
except CannotListenError as ex:
exception_error = ex
self.listen_port += 1
except (KeyboardInterrupt, SystemExit):
raise
else:
return
raise exception_error

View File

@ -1,5 +1,4 @@
from twisted.internet import defer
from twisted.internet.error import CannotListenError
import deluge.component as component
import deluge.ui.common
@ -7,8 +6,8 @@ from deluge import error
from deluge.core.authmanager import AUTH_LEVEL_ADMIN
from deluge.ui.client import Client, DaemonSSLProxy, client
from . import common
from .basetest import BaseTestCase
from .daemon_base import DaemonBase
class NoVersionSendingDaemonSSLProxy(DaemonSSLProxy):
@ -65,24 +64,18 @@ class NoVersionSendingClient(Client):
self.disconnect_callback()
class ClientTestCase(BaseTestCase):
class ClientTestCase(BaseTestCase, DaemonBase):
def set_up(self):
self.listen_port = 58846
for dummy in range(10):
try:
self.core = common.start_core(listen_port=self.listen_port)
except CannotListenError as ex:
exception_error = ex
self.listen_port += 1
else:
break
else:
raise exception_error
d = self.common_set_up()
d.addCallback(self.start_core)
d.addErrback(self.terminate_core)
return d
def tear_down(self):
self.core.terminate()
return component.shutdown()
d = component.shutdown()
d.addCallback(self.terminate_core)
return d
def test_connect_no_credentials(self):
d = client.connect(

View File

@ -208,8 +208,10 @@ class DelugeRPCClientFactory(ClientFactory):
self.daemon.port = None
self.daemon.username = None
self.daemon.connected = False
if self.daemon.disconnect_deferred:
if self.daemon.disconnect_deferred and not self.daemon.disconnect_deferred.called:
self.daemon.disconnect_deferred.callback(reason.value)
self.daemon.disconnect_deferred = None
if self.daemon.disconnect_callback:
self.daemon.disconnect_callback()
@ -428,6 +430,7 @@ class DaemonClassicProxy(DaemonProxy):
event_handlers = {}
from deluge.core import daemon
self.__daemon = daemon.Daemon(classic=True)
self.__daemon.start()
log.debug("daemon created!")
self.connected = True
self.host = "localhost"