Rewrite component module to handle deferreds properly

Add test_component
Fix test_core to work with new component module
This commit is contained in:
Andrew Resch 2010-03-20 19:11:32 -07:00
parent 670ad51de1
commit b0714f625f
3 changed files with 465 additions and 172 deletions

View File

@ -1,7 +1,7 @@
# #
# component.py # component.py
# #
# Copyright (C) 2007, 2008 Andrew Resch <andrewresch@gmail.com> # Copyright (C) 2007-2010 Andrew Resch <andrewresch@gmail.com>
# #
# Deluge is free software. # Deluge is free software.
# #
@ -33,213 +33,340 @@
# #
# #
from twisted.internet.defer import maybeDeferred, succeed, DeferredList, fail
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from deluge.log import LOG as log from deluge.log import LOG as log
COMPONENT_STATE = [ class ComponentAlreadyRegistered(Exception):
"Stopped", pass
"Started",
"Paused"
]
class Component(object): class Component(object):
"""
Component objects are singletons managed by the `:class:ComponentRegistry`.
When a new Component object is instantiated, it will be automatically
registered with the `:class:ComponentRegistry`.
The ComponentRegistry has the ability to start, stop, pause and shutdown the
components registered with it.
Events:
start() - This method is called when the client has connected to a
Deluge core.
stop() - This method is called when the client has disconnected from a
Deluge core.
update() - This method is called every 1 second by default while the
Componented is in a "Started" state. The interval can be
specified during instantiation. The update() timer can be
paused by instructing the `:class:ComponentRegistry` to pause
this Component.
shutdown() - This method is called when the client is exiting. If the
Component is in a "Started" state when this is called, a
call to stop() will be issued prior to shutdown().
"""
def __init__(self, name, interval=1, depend=None): def __init__(self, name, interval=1, depend=None):
# Register with the ComponentRegistry self._component_name = name
register(name, self, depend) self._component_interval = interval
self._interval = interval self._component_depend = depend
self._timer = None self._component_state = "Stopped"
self._state = COMPONENT_STATE.index("Stopped") self._component_timer = None
self._name = name _ComponentRegistry.register(self)
def get_state(self): def _component_start_timer(self):
return self._state
def get_component_name(self):
return self._name
def start(self):
pass
def _start(self):
self._state = COMPONENT_STATE.index("Started")
if hasattr(self, "update"): if hasattr(self, "update"):
self._timer = LoopingCall(self.update) self._component_timer = LoopingCall(self.update)
self._timer.start(self._interval) self._component_timer.start(self._component_interval)
def stop(self): def _component_start(self):
pass def on_start(result):
self._component_state = "Started"
self._component_start_timer()
return True
def _stop(self): if self._component_state == "Stopped":
self._state = COMPONENT_STATE.index("Stopped") if hasattr(self, "start"):
try: d = maybeDeferred(self.start)
self._timer.stop() d.addCallback(on_start)
except: else:
pass d = maybeDeferred(on_start, None)
elif self._component_state == "Started":
d = succeed(True)
else:
d = fail("Cannot start a component not in a Stopped state!")
def _pause(self): return d
self._state = COMPONENT_STATE.index("Paused")
try:
self._timer.stop()
except:
pass
def _resume(self): def _component_stop(self):
self._start() def on_stop(result):
self._component_state = "Stopped"
if self._component_timer and self._component_timer.running:
self._component_timer.stop()
return True
def shutdown(self): if self._component_state != "Stopped":
pass if hasattr(self, "stop"):
d = maybeDeferred(self.stop)
d.addCallback(on_stop)
else:
d = maybeDeferred(on_stop, None)
class ComponentRegistry: return succeed(None)
def _component_pause(self):
def on_pause(result):
self._component_state = "Paused"
if self._component_state == "Started":
if self._component_timer and self._component_timer.running:
d = maybeDeferred(self._component_timer.stop)
d.addCallback(on_pause)
else:
d = succeed(None)
elif self._component_state == "Paused":
d = succeed(None)
else:
d = fail("Cannot pause a component in a non-Started state!")
return d
def _component_resume(self):
def on_resume(result):
self._component_state = "Started"
if self._component_state == "Paused":
d = maybeDeferred(self._component_start_timer)
d.addCallback(on_resume)
else:
d = fail("Component cannot be resumed from a non-Paused state!")
return d
def _component_shutdown(self):
def on_stop(result):
if hasattr(self, "shutdown"):
return maybeDeferred(self.shutdown)
return succeed(None)
d = self._component_stop()
d.addCallback(on_stop)
return d
class ComponentRegistry(object):
"""
The ComponentRegistry holds a list of currently registered
`:class:Component` objects. It is used to manage the Components by
starting, stopping, pausing and shutting them down.
"""
def __init__(self): def __init__(self):
self.components = {} self.components = {}
self.depend = {}
def register(self, name, obj, depend): def register(self, obj):
"""Registers a component.. depend must be list or None""" """
log.debug("Registered %s with ComponentRegistry..", name) Registers a component object with the registry. This is done
self.components[name] = obj automatically when a Component object is instantiated.
if depend != None:
self.depend[name] = depend :param obj: the Component object
:type obj: object
:raises ComponentAlreadyRegistered: if a component with the same name
is already registered.
"""
name = obj._component_name
if name in self.components:
raise ComponentAlreadyRegistered(
"Component already registered with name %s" % name)
self.components[obj._component_name] = obj
def deregister(self, name): def deregister(self, name):
"""Deregisters a component""" """
Deregisters a component from the registry. A stop will be
issued to the component prior to deregistering it.
:param name: the name of the component
:type name: string
"""
if name in self.components: if name in self.components:
log.debug("Deregistering Component: %s", name) log.debug("Deregistering Component: %s", name)
self.stop_component(name) d = self.stop([name])
del self.components[name] def on_stop(result, name):
del self.components[name]
return d.addCallback(on_stop, name)
else:
return succeed(None)
def get(self, name): def start(self, names=[]):
"""Returns a reference to the component 'name'""" """
return self.components[name] Starts Components that are currently in a Stopped state and their
dependencies. If `:param:names` is specified, will only start those
Components and their dependencies and if not it will start all
registered components.
def start(self): :param names: a list of Components to start
"""Starts all components""" :type names: list
for component in self.components.keys():
self.start_component(component)
def start_component(self, name): :returns: a Deferred object that will fire once all Components have been
"""Starts a component""" sucessfully started
# Check to see if this component has any dependencies :rtype: twisted.internet.defer.Deferred
if self.depend.has_key(name):
for depend in self.depend[name]:
self.start_component(depend)
# Only start if the component is stopped. """
if self.components[name].get_state() == \ # Start all the components if names is empty
COMPONENT_STATE.index("Stopped"): if not names:
log.debug("Starting component %s..", name) names = self.components.keys()
self.components[name].start() elif isinstance(names, str):
self.components[name]._start() names = [names]
def stop(self): def on_depends_started(result, name):
"""Stops all components""" return self.components[name]._component_start()
# We create a separate list of the keys and do an additional check to
# make sure the key still exists in the components dict.
# This is because components could be deregistered during a stop and
# the dictionary would get modified while iterating through it.
components = self.components.keys()
for component in components:
if component in self.components:
self.stop_component(component)
def stop_component(self, component): deferreds = []
if self.components[component].get_state() != \
COMPONENT_STATE.index("Stopped"):
log.debug("Stopping component %s..", component)
self.components[component].stop()
self.components[component]._stop()
def pause(self): for name in names:
"""Pauses all components. Stops calling update()""" if self.components[name]._component_depend:
for component in self.components.keys(): # This component has depends, so we need to start them first.
self.pause_component(component) d = self.start(self.components[name]._component_depend)
d.addCallback(on_depends_started, name)
deferreds.append(d)
else:
deferreds.append(self.components[name]._component_start())
def pause_component(self, component): return DeferredList(deferreds)
if self.components[component].get_state() not in \
[COMPONENT_STATE.index("Paused"), COMPONENT_STATE.index("Stopped")]:
log.debug("Pausing component %s..", component)
self.components[component]._pause()
def resume(self): def stop(self, names=[]):
"""Resumes all components. Starts calling update()""" """
for component in self.components.keys(): Stops Components that are currently not in a Stopped state. If
self.resume_component(component) `:param:names` is specified, then it will only stop those Components,
and if not it will stop all the registered Components.
def resume_component(self, component): :param names: a list of Components to start
if self.components[component].get_state() == COMPONENT_STATE.index("Paused"): :type names: list
log.debug("Resuming component %s..", component)
self.components[component]._resume()
def update(self): :returns: a Deferred object that will fire once all Components have been
"""Updates all components""" sucessfully stopped
for component in self.components.keys(): :rtype: twisted.internet.defer.Deferred
# Only update the component if it's started
if self.components[component].get_state() == \
COMPONENT_STATE.index("Started"):
self.components[component].update()
return True """
if not names:
names = self.components.keys()
elif isinstance(names, str):
names = [names]
deferreds = []
for name in names:
deferreds.append(self.components[name]._component_stop())
return DeferredList(deferreds)
def pause(self, names=[]):
"""
Pauses Components that are currently in a Started state. If
`:param:names` is specified, then it will only pause those Components,
and if not it will pause all the registered Components.
:param names: a list of Components to pause
:type names: list
:returns: a Deferred object that will fire once all Components have been
sucessfully paused
:rtype: twisted.internet.defer.Deferred
"""
if not names:
names = self.components.keys()
elif isinstance(names, str):
names = [names]
deferreds = []
for name in names:
if self.components[name]._component_state == "Started":
deferreds.append(self.components[name]._component_pause())
return DeferredList(deferreds)
def resume(self, names=[]):
"""
Resumes Components that are currently in a Paused state. If
`:param:names` is specified, then it will only resume those Components,
and if not it will resume all the registered Components.
:param names: a list of Components to resume
:type names: list
:returns: a Deferred object that will fire once all Components have been
sucessfully resumed
:rtype: twisted.internet.defer.Deferred
"""
if not names:
names = self.components.keys()
elif isinstance(names, str):
names = [names]
deferreds = []
for name in names:
if self.components[name]._component_state == "Paused":
deferreds.append(self.components[name]._component_resume())
return DeferredList(deferreds)
def shutdown(self): def shutdown(self):
"""Shuts down all components. This should be called when the program """
exits so that components can do any necessary clean-up.""" Shutdowns all Components regardless of state. This will call
# Stop all components first `:meth:stop` on call the components prior to shutting down. This should
self.stop() be called when the program is exiting to ensure all Components have a
for component in self.components.keys(): chance to properly shutdown.
log.debug("Shutting down component %s..", component)
try:
self.components[component].shutdown()
except Exception, e:
log.debug("Unable to call shutdown()")
log.exception(e)
:returns: a Deferred object that will fire once all Components have been
sucessfully resumed
:rtype: twisted.internet.defer.Deferred
"""
deferreds = []
for component in self.components.values():
deferreds.append(component._component_shutdown())
return DeferredList(deferreds)
def update(self):
"""
Updates all Components that are in a Started state.
"""
for component in self.components.items():
component.update()
_ComponentRegistry = ComponentRegistry() _ComponentRegistry = ComponentRegistry()
def register(name, obj, depend=None): deregister = _ComponentRegistry.deregister
"""Registers a component with the registry""" start = _ComponentRegistry.start
_ComponentRegistry.register(name, obj, depend) stop = _ComponentRegistry.stop
pause = _ComponentRegistry.pause
resume = _ComponentRegistry.resume
update = _ComponentRegistry.update
shutdown = _ComponentRegistry.shutdown
def deregister(name): def get(name):
"""Deregisters a component""" """
_ComponentRegistry.deregister(name) Return a reference to a component.
def start(component=None): :param name: the Component name to get
"""Starts all components""" :type name: string
if component == None:
_ComponentRegistry.start()
else:
_ComponentRegistry.start_component(component)
def stop(component=None): :returns: the Component object
"""Stops all or specified components""" :rtype: object
if component == None:
_ComponentRegistry.stop()
else:
_ComponentRegistry.stop_component(component)
def pause(component=None): :raises KeyError: if the Component does not exist
"""Pauses all or specificed components"""
if component == None:
_ComponentRegistry.pause()
else:
_ComponentRegistry.pause_component(component)
def resume(component=None): """
"""Resumes all or specificed components""" return _ComponentRegistry.components[name]
if component == None:
_ComponentRegistry.resume()
else:
_ComponentRegistry.resume_component(component)
def update():
"""Updates all components"""
_ComponentRegistry.update()
def shutdown():
"""Shutdowns all components"""
_ComponentRegistry.shutdown()
def get(component):
"""Return a reference to the component"""
return _ComponentRegistry.get(component)

162
tests/test_component.py Normal file
View File

@ -0,0 +1,162 @@
from twisted.trial import unittest
import deluge.component as component
class testcomponent(component.Component):
def __init__(self, name):
component.Component.__init__(self, name)
class testcomponent_delaystart(component.Component):
def __init__(self, name):
component.Component.__init__(self, name)
def start(self):
import time
time.sleep(1)
class testcomponent_update(component.Component):
def __init__(self, name):
component.Component.__init__(self, name)
self.counter = 0
def update(self):
self.counter += 1
class testcomponent_shutdown(component.Component):
def __init__(self, name):
component.Component.__init__(self, name)
self.shutdowned = False
def shutdown(self):
self.shutdowned = True
class ComponentTestClass(unittest.TestCase):
def tearDown(self):
component._ComponentRegistry.components = {}
def test_start_component(self):
def on_start(result, c):
self.assertEquals(c._component_state, "Started")
c = testcomponent("test_start_c1")
d = component.start(["test_start_c1"])
d.addCallback(on_start, c)
def test_start_depends(self):
def on_start(result, c1, c2):
self.assertEquals(c1._component_state, "Started")
self.assertEquals(c2._component_state, "Started")
c1 = testcomponent("test_start_depends_c1")
c2 = testcomponent("test_start_depends_c2")
c2._component_depend = ["test_start_depends_c1"]
d = component.start(["test_start_depends_c2"])
d.addCallback(on_start, c1, c2)
def start_with_depends(self):
c1 = testcomponent_delaystart("test_start_all_c1")
c2 = testcomponent("test_start_all_c2")
c3 = testcomponent_delaystart("test_start_all_c3")
c4 = testcomponent("test_start_all_c4")
c5 = testcomponent("test_start_all_c5")
c3._component_depend = ["test_start_all_c5", "test_start_all_c1"]
c4._component_depend = ["test_start_all_c3"]
c2._component_depend = ["test_start_all_c4"]
d = component.start()
return (d, c1, c2, c3, c4, c5)
def finish_start_with_depends(self, *args):
for c in args[1:]:
component.deregister(c._component_name)
def test_start_all(self):
def on_start(*args):
for c in args[1:]:
self.assertEquals(c._component_state, "Started")
ret = self.start_with_depends()
ret[0].addCallback(on_start, *ret[1:])
ret[0].addCallback(self.finish_start_with_depends, *ret[1:])
def test_register_exception(self):
c1 = testcomponent("test_register_exception_c1")
self.assertRaises(
component.ComponentAlreadyRegistered,
testcomponent,
"test_register_exception_c1")
def test_stop_component(self):
def on_stop(result, c):
self.assertEquals(c._component_state, "Stopped")
self.assertFalse(c._component_timer.running)
def on_start(result, c):
self.assertEquals(c._component_state, "Started")
return component.stop(["test_stop_component_c1"]).addCallback(on_stop, c)
c = testcomponent_update("test_stop_component_c1")
d = component.start(["test_stop_component_c1"])
d.addCallback(on_start, c)
def test_stop_all(self):
def on_stop(*args):
for c in args[1:]:
self.assertEquals(c._component_state, "Stopped")
def on_start(*args):
for c in args[1:]:
self.assertEquals(c._component_state, "Started")
return component.stop().addCallback(on_stop, *args[1:])
ret = self.start_with_depends()
ret[0].addCallback(on_start, *ret[1:])
ret[0].addCallback(self.finish_start_with_depends, *ret[1:])
def test_update(self):
def on_start(result, c1, counter):
self.assertTrue(c1._component_timer)
self.assertTrue(c1._component_timer.running)
self.assertNotEqual(c1.counter, counter)
return component.stop()
c1 = testcomponent_update("test_update_c1")
cnt = int(c1.counter)
d = component.start(["test_update_c1"])
d.addCallback(on_start, c1, cnt)
def test_pause(self):
def on_pause(result, c1, counter):
self.assertEqual(c1._component_state, "Paused")
self.assertNotEqual(c1.counter, counter)
self.assertFalse(c1._component_timer.running)
def on_start(result, c1, counter):
self.assertTrue(c1._component_timer)
self.assertNotEqual(c1.counter, counter)
d = component.pause(["test_pause_c1"])
d.addCallback(on_pause, c1, counter)
return d
c1 = testcomponent_update("test_pause_c1")
cnt = int(c1.counter)
d = component.start(["test_pause_c1"])
d.addCallback(on_start, c1, cnt)
def test_shutdown(self):
def on_shutdown(result, c1):
self.assertTrue(c1.shutdowned)
self.assertEquals(c1._component_state, "Stopped")
def on_start(result, c1):
d = component.shutdown()
d.addCallback(on_shutdown, c1)
return d
c1 = testcomponent_shutdown("test_shutdown_c1")
d = component.start(["test_shutdown_c1"])
d.addCallback(on_start, c1)

View File

@ -18,13 +18,17 @@ class CoreTestCase(unittest.TestCase):
common.set_tmp_config_dir() common.set_tmp_config_dir()
self.rpcserver = RPCServer(listen=False) self.rpcserver = RPCServer(listen=False)
self.core = Core() self.core = Core()
component.start() d = component.start()
return d
def tearDown(self): def tearDown(self):
component.stop()
component.shutdown() def on_shutdown(result):
del self.rpcserver component._ComponentRegistry.components = {}
del self.core del self.rpcserver
del self.core
return component.shutdown().addCallback(on_shutdown)
def test_add_torrent_file(self): def test_add_torrent_file(self):
options = {} options = {}