From b0714f625f50c4c0037213626565917e3e57704b Mon Sep 17 00:00:00 2001 From: Andrew Resch Date: Sat, 20 Mar 2010 19:11:32 -0700 Subject: [PATCH] Rewrite component module to handle deferreds properly Add test_component Fix test_core to work with new component module --- deluge/component.py | 461 +++++++++++++++++++++++++--------------- tests/test_component.py | 162 ++++++++++++++ tests/test_core.py | 14 +- 3 files changed, 465 insertions(+), 172 deletions(-) create mode 100644 tests/test_component.py diff --git a/deluge/component.py b/deluge/component.py index f433ad6e0..d229883bb 100644 --- a/deluge/component.py +++ b/deluge/component.py @@ -1,7 +1,7 @@ # # component.py # -# Copyright (C) 2007, 2008 Andrew Resch +# Copyright (C) 2007-2010 Andrew Resch # # Deluge is free software. # @@ -33,213 +33,340 @@ # # +from twisted.internet.defer import maybeDeferred, succeed, DeferredList, fail from twisted.internet.task import LoopingCall from deluge.log import LOG as log -COMPONENT_STATE = [ - "Stopped", - "Started", - "Paused" -] +class ComponentAlreadyRegistered(Exception): + pass class Component(object): + """ + Component objects are singletons managed by the `:class:ComponentRegistry`. + When a new Component object is instantiated, it will be automatically + registered with the `:class:ComponentRegistry`. + + The ComponentRegistry has the ability to start, stop, pause and shutdown the + components registered with it. + + Events: + + start() - This method is called when the client has connected to a + Deluge core. + + stop() - This method is called when the client has disconnected from a + Deluge core. + + update() - This method is called every 1 second by default while the + Componented is in a "Started" state. The interval can be + specified during instantiation. The update() timer can be + paused by instructing the `:class:ComponentRegistry` to pause + this Component. + + shutdown() - This method is called when the client is exiting. If the + Component is in a "Started" state when this is called, a + call to stop() will be issued prior to shutdown(). + + """ def __init__(self, name, interval=1, depend=None): - # Register with the ComponentRegistry - register(name, self, depend) - self._interval = interval - self._timer = None - self._state = COMPONENT_STATE.index("Stopped") - self._name = name + self._component_name = name + self._component_interval = interval + self._component_depend = depend + self._component_state = "Stopped" + self._component_timer = None + _ComponentRegistry.register(self) - def get_state(self): - return self._state - - def get_component_name(self): - return self._name - - def start(self): - pass - - def _start(self): - self._state = COMPONENT_STATE.index("Started") + def _component_start_timer(self): if hasattr(self, "update"): - self._timer = LoopingCall(self.update) - self._timer.start(self._interval) + self._component_timer = LoopingCall(self.update) + self._component_timer.start(self._component_interval) - def stop(self): - pass + def _component_start(self): + def on_start(result): + self._component_state = "Started" + self._component_start_timer() + return True - def _stop(self): - self._state = COMPONENT_STATE.index("Stopped") - try: - self._timer.stop() - except: - pass + if self._component_state == "Stopped": + if hasattr(self, "start"): + d = maybeDeferred(self.start) + d.addCallback(on_start) + else: + d = maybeDeferred(on_start, None) + elif self._component_state == "Started": + d = succeed(True) + else: + d = fail("Cannot start a component not in a Stopped state!") - def _pause(self): - self._state = COMPONENT_STATE.index("Paused") - try: - self._timer.stop() - except: - pass + return d - def _resume(self): - self._start() + def _component_stop(self): + def on_stop(result): + self._component_state = "Stopped" + if self._component_timer and self._component_timer.running: + self._component_timer.stop() + return True - def shutdown(self): - pass + if self._component_state != "Stopped": + if hasattr(self, "stop"): + d = maybeDeferred(self.stop) + d.addCallback(on_stop) + else: + d = maybeDeferred(on_stop, None) -class ComponentRegistry: + return succeed(None) + + def _component_pause(self): + def on_pause(result): + self._component_state = "Paused" + + if self._component_state == "Started": + if self._component_timer and self._component_timer.running: + d = maybeDeferred(self._component_timer.stop) + d.addCallback(on_pause) + else: + d = succeed(None) + elif self._component_state == "Paused": + d = succeed(None) + else: + d = fail("Cannot pause a component in a non-Started state!") + + return d + + def _component_resume(self): + def on_resume(result): + self._component_state = "Started" + + if self._component_state == "Paused": + d = maybeDeferred(self._component_start_timer) + d.addCallback(on_resume) + else: + d = fail("Component cannot be resumed from a non-Paused state!") + + return d + + def _component_shutdown(self): + def on_stop(result): + if hasattr(self, "shutdown"): + return maybeDeferred(self.shutdown) + return succeed(None) + + d = self._component_stop() + d.addCallback(on_stop) + return d + +class ComponentRegistry(object): + """ + The ComponentRegistry holds a list of currently registered + `:class:Component` objects. It is used to manage the Components by + starting, stopping, pausing and shutting them down. + """ def __init__(self): self.components = {} - self.depend = {} - def register(self, name, obj, depend): - """Registers a component.. depend must be list or None""" - log.debug("Registered %s with ComponentRegistry..", name) - self.components[name] = obj - if depend != None: - self.depend[name] = depend + def register(self, obj): + """ + Registers a component object with the registry. This is done + automatically when a Component object is instantiated. + + :param obj: the Component object + :type obj: object + + :raises ComponentAlreadyRegistered: if a component with the same name + is already registered. + + """ + name = obj._component_name + if name in self.components: + raise ComponentAlreadyRegistered( + "Component already registered with name %s" % name) + + self.components[obj._component_name] = obj def deregister(self, name): - """Deregisters a component""" + """ + Deregisters a component from the registry. A stop will be + issued to the component prior to deregistering it. + + :param name: the name of the component + :type name: string + + """ + if name in self.components: log.debug("Deregistering Component: %s", name) - self.stop_component(name) - del self.components[name] + d = self.stop([name]) + def on_stop(result, name): + del self.components[name] + return d.addCallback(on_stop, name) + else: + return succeed(None) - def get(self, name): - """Returns a reference to the component 'name'""" - return self.components[name] + def start(self, names=[]): + """ + Starts Components that are currently in a Stopped state and their + dependencies. If `:param:names` is specified, will only start those + Components and their dependencies and if not it will start all + registered components. - def start(self): - """Starts all components""" - for component in self.components.keys(): - self.start_component(component) + :param names: a list of Components to start + :type names: list - def start_component(self, name): - """Starts a component""" - # Check to see if this component has any dependencies - if self.depend.has_key(name): - for depend in self.depend[name]: - self.start_component(depend) + :returns: a Deferred object that will fire once all Components have been + sucessfully started + :rtype: twisted.internet.defer.Deferred - # Only start if the component is stopped. - if self.components[name].get_state() == \ - COMPONENT_STATE.index("Stopped"): - log.debug("Starting component %s..", name) - self.components[name].start() - self.components[name]._start() + """ + # Start all the components if names is empty + if not names: + names = self.components.keys() + elif isinstance(names, str): + names = [names] - def stop(self): - """Stops all components""" - # We create a separate list of the keys and do an additional check to - # make sure the key still exists in the components dict. - # This is because components could be deregistered during a stop and - # the dictionary would get modified while iterating through it. - components = self.components.keys() - for component in components: - if component in self.components: - self.stop_component(component) + def on_depends_started(result, name): + return self.components[name]._component_start() - def stop_component(self, component): - if self.components[component].get_state() != \ - COMPONENT_STATE.index("Stopped"): - log.debug("Stopping component %s..", component) - self.components[component].stop() - self.components[component]._stop() + deferreds = [] - def pause(self): - """Pauses all components. Stops calling update()""" - for component in self.components.keys(): - self.pause_component(component) + for name in names: + if self.components[name]._component_depend: + # This component has depends, so we need to start them first. + d = self.start(self.components[name]._component_depend) + d.addCallback(on_depends_started, name) + deferreds.append(d) + else: + deferreds.append(self.components[name]._component_start()) - def pause_component(self, component): - if self.components[component].get_state() not in \ - [COMPONENT_STATE.index("Paused"), COMPONENT_STATE.index("Stopped")]: - log.debug("Pausing component %s..", component) - self.components[component]._pause() + return DeferredList(deferreds) - def resume(self): - """Resumes all components. Starts calling update()""" - for component in self.components.keys(): - self.resume_component(component) + def stop(self, names=[]): + """ + Stops Components that are currently not in a Stopped state. If + `:param:names` is specified, then it will only stop those Components, + and if not it will stop all the registered Components. - def resume_component(self, component): - if self.components[component].get_state() == COMPONENT_STATE.index("Paused"): - log.debug("Resuming component %s..", component) - self.components[component]._resume() + :param names: a list of Components to start + :type names: list - def update(self): - """Updates all components""" - for component in self.components.keys(): - # Only update the component if it's started - if self.components[component].get_state() == \ - COMPONENT_STATE.index("Started"): - self.components[component].update() + :returns: a Deferred object that will fire once all Components have been + sucessfully stopped + :rtype: twisted.internet.defer.Deferred - return True + """ + if not names: + names = self.components.keys() + elif isinstance(names, str): + names = [names] + + deferreds = [] + + for name in names: + deferreds.append(self.components[name]._component_stop()) + + return DeferredList(deferreds) + + def pause(self, names=[]): + """ + Pauses Components that are currently in a Started state. If + `:param:names` is specified, then it will only pause those Components, + and if not it will pause all the registered Components. + + :param names: a list of Components to pause + :type names: list + + :returns: a Deferred object that will fire once all Components have been + sucessfully paused + :rtype: twisted.internet.defer.Deferred + + """ + if not names: + names = self.components.keys() + elif isinstance(names, str): + names = [names] + + deferreds = [] + + for name in names: + if self.components[name]._component_state == "Started": + deferreds.append(self.components[name]._component_pause()) + + return DeferredList(deferreds) + + def resume(self, names=[]): + """ + Resumes Components that are currently in a Paused state. If + `:param:names` is specified, then it will only resume those Components, + and if not it will resume all the registered Components. + + :param names: a list of Components to resume + :type names: list + + :returns: a Deferred object that will fire once all Components have been + sucessfully resumed + :rtype: twisted.internet.defer.Deferred + + """ + if not names: + names = self.components.keys() + elif isinstance(names, str): + names = [names] + + deferreds = [] + + for name in names: + if self.components[name]._component_state == "Paused": + deferreds.append(self.components[name]._component_resume()) + + return DeferredList(deferreds) def shutdown(self): - """Shuts down all components. This should be called when the program - exits so that components can do any necessary clean-up.""" - # Stop all components first - self.stop() - for component in self.components.keys(): - log.debug("Shutting down component %s..", component) - try: - self.components[component].shutdown() - except Exception, e: - log.debug("Unable to call shutdown()") - log.exception(e) + """ + Shutdowns all Components regardless of state. This will call + `:meth:stop` on call the components prior to shutting down. This should + be called when the program is exiting to ensure all Components have a + chance to properly shutdown. + :returns: a Deferred object that will fire once all Components have been + sucessfully resumed + :rtype: twisted.internet.defer.Deferred + + """ + deferreds = [] + + for component in self.components.values(): + deferreds.append(component._component_shutdown()) + + return DeferredList(deferreds) + + def update(self): + """ + Updates all Components that are in a Started state. + + """ + for component in self.components.items(): + component.update() _ComponentRegistry = ComponentRegistry() -def register(name, obj, depend=None): - """Registers a component with the registry""" - _ComponentRegistry.register(name, obj, depend) +deregister = _ComponentRegistry.deregister +start = _ComponentRegistry.start +stop = _ComponentRegistry.stop +pause = _ComponentRegistry.pause +resume = _ComponentRegistry.resume +update = _ComponentRegistry.update +shutdown = _ComponentRegistry.shutdown -def deregister(name): - """Deregisters a component""" - _ComponentRegistry.deregister(name) +def get(name): + """ + Return a reference to a component. -def start(component=None): - """Starts all components""" - if component == None: - _ComponentRegistry.start() - else: - _ComponentRegistry.start_component(component) + :param name: the Component name to get + :type name: string -def stop(component=None): - """Stops all or specified components""" - if component == None: - _ComponentRegistry.stop() - else: - _ComponentRegistry.stop_component(component) + :returns: the Component object + :rtype: object -def pause(component=None): - """Pauses all or specificed components""" - if component == None: - _ComponentRegistry.pause() - else: - _ComponentRegistry.pause_component(component) + :raises KeyError: if the Component does not exist -def resume(component=None): - """Resumes all or specificed components""" - if component == None: - _ComponentRegistry.resume() - else: - _ComponentRegistry.resume_component(component) - -def update(): - """Updates all components""" - _ComponentRegistry.update() - -def shutdown(): - """Shutdowns all components""" - _ComponentRegistry.shutdown() - -def get(component): - """Return a reference to the component""" - return _ComponentRegistry.get(component) + """ + return _ComponentRegistry.components[name] diff --git a/tests/test_component.py b/tests/test_component.py new file mode 100644 index 000000000..c2aafde50 --- /dev/null +++ b/tests/test_component.py @@ -0,0 +1,162 @@ +from twisted.trial import unittest +import deluge.component as component + +class testcomponent(component.Component): + def __init__(self, name): + component.Component.__init__(self, name) + +class testcomponent_delaystart(component.Component): + def __init__(self, name): + component.Component.__init__(self, name) + + def start(self): + import time + time.sleep(1) + +class testcomponent_update(component.Component): + def __init__(self, name): + component.Component.__init__(self, name) + self.counter = 0 + + def update(self): + self.counter += 1 + +class testcomponent_shutdown(component.Component): + def __init__(self, name): + component.Component.__init__(self, name) + self.shutdowned = False + + def shutdown(self): + self.shutdowned = True + +class ComponentTestClass(unittest.TestCase): + def tearDown(self): + component._ComponentRegistry.components = {} + + def test_start_component(self): + def on_start(result, c): + self.assertEquals(c._component_state, "Started") + + c = testcomponent("test_start_c1") + d = component.start(["test_start_c1"]) + d.addCallback(on_start, c) + + def test_start_depends(self): + def on_start(result, c1, c2): + self.assertEquals(c1._component_state, "Started") + self.assertEquals(c2._component_state, "Started") + + c1 = testcomponent("test_start_depends_c1") + c2 = testcomponent("test_start_depends_c2") + c2._component_depend = ["test_start_depends_c1"] + + d = component.start(["test_start_depends_c2"]) + d.addCallback(on_start, c1, c2) + + + def start_with_depends(self): + c1 = testcomponent_delaystart("test_start_all_c1") + c2 = testcomponent("test_start_all_c2") + c3 = testcomponent_delaystart("test_start_all_c3") + c4 = testcomponent("test_start_all_c4") + c5 = testcomponent("test_start_all_c5") + + c3._component_depend = ["test_start_all_c5", "test_start_all_c1"] + c4._component_depend = ["test_start_all_c3"] + c2._component_depend = ["test_start_all_c4"] + + d = component.start() + return (d, c1, c2, c3, c4, c5) + + def finish_start_with_depends(self, *args): + for c in args[1:]: + component.deregister(c._component_name) + + def test_start_all(self): + def on_start(*args): + for c in args[1:]: + self.assertEquals(c._component_state, "Started") + + ret = self.start_with_depends() + ret[0].addCallback(on_start, *ret[1:]) + ret[0].addCallback(self.finish_start_with_depends, *ret[1:]) + + def test_register_exception(self): + c1 = testcomponent("test_register_exception_c1") + self.assertRaises( + component.ComponentAlreadyRegistered, + testcomponent, + "test_register_exception_c1") + + def test_stop_component(self): + def on_stop(result, c): + self.assertEquals(c._component_state, "Stopped") + self.assertFalse(c._component_timer.running) + + def on_start(result, c): + self.assertEquals(c._component_state, "Started") + return component.stop(["test_stop_component_c1"]).addCallback(on_stop, c) + + c = testcomponent_update("test_stop_component_c1") + d = component.start(["test_stop_component_c1"]) + d.addCallback(on_start, c) + + def test_stop_all(self): + def on_stop(*args): + for c in args[1:]: + self.assertEquals(c._component_state, "Stopped") + + def on_start(*args): + for c in args[1:]: + self.assertEquals(c._component_state, "Started") + return component.stop().addCallback(on_stop, *args[1:]) + + ret = self.start_with_depends() + ret[0].addCallback(on_start, *ret[1:]) + ret[0].addCallback(self.finish_start_with_depends, *ret[1:]) + + def test_update(self): + def on_start(result, c1, counter): + self.assertTrue(c1._component_timer) + self.assertTrue(c1._component_timer.running) + self.assertNotEqual(c1.counter, counter) + return component.stop() + + c1 = testcomponent_update("test_update_c1") + cnt = int(c1.counter) + d = component.start(["test_update_c1"]) + + d.addCallback(on_start, c1, cnt) + + def test_pause(self): + def on_pause(result, c1, counter): + self.assertEqual(c1._component_state, "Paused") + self.assertNotEqual(c1.counter, counter) + self.assertFalse(c1._component_timer.running) + + def on_start(result, c1, counter): + self.assertTrue(c1._component_timer) + self.assertNotEqual(c1.counter, counter) + d = component.pause(["test_pause_c1"]) + d.addCallback(on_pause, c1, counter) + return d + + c1 = testcomponent_update("test_pause_c1") + cnt = int(c1.counter) + d = component.start(["test_pause_c1"]) + + d.addCallback(on_start, c1, cnt) + + def test_shutdown(self): + def on_shutdown(result, c1): + self.assertTrue(c1.shutdowned) + self.assertEquals(c1._component_state, "Stopped") + + def on_start(result, c1): + d = component.shutdown() + d.addCallback(on_shutdown, c1) + return d + + c1 = testcomponent_shutdown("test_shutdown_c1") + d = component.start(["test_shutdown_c1"]) + d.addCallback(on_start, c1) diff --git a/tests/test_core.py b/tests/test_core.py index ed5986e42..41fca5efb 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -18,13 +18,17 @@ class CoreTestCase(unittest.TestCase): common.set_tmp_config_dir() self.rpcserver = RPCServer(listen=False) self.core = Core() - component.start() + d = component.start() + return d def tearDown(self): - component.stop() - component.shutdown() - del self.rpcserver - del self.core + + def on_shutdown(result): + component._ComponentRegistry.components = {} + del self.rpcserver + del self.core + + return component.shutdown().addCallback(on_shutdown) def test_add_torrent_file(self): options = {}