From 58cb9e1c22540ff32fe0de267f60476d0bc3f623 Mon Sep 17 00:00:00 2001 From: Chase Sterling Date: Fri, 7 Dec 2012 19:00:43 -0500 Subject: [PATCH] Make sure dependent components are stopped before their dependencies. Make sure deferreds are not lost in new resume saving code. --- deluge/component.py | 32 ++++++++++++++++++++++++-------- deluge/core/alertmanager.py | 3 ++- deluge/core/torrentmanager.py | 29 +++++++++++------------------ 3 files changed, 37 insertions(+), 27 deletions(-) diff --git a/deluge/component.py b/deluge/component.py index 538d2f9f6..6085db277 100644 --- a/deluge/component.py +++ b/deluge/component.py @@ -34,6 +34,7 @@ # import logging +from collections import defaultdict from twisted.internet.defer import maybeDeferred, succeed, DeferredList, fail from twisted.internet.task import LoopingCall @@ -225,6 +226,8 @@ class ComponentRegistry(object): """ def __init__(self): self.components = {} + # Stores all of the components that are dependent on a particular component + self.dependents = defaultdict(list) def register(self, obj): """ @@ -243,6 +246,9 @@ class ComponentRegistry(object): "Component already registered with name %s" % name) self.components[obj._component_name] = obj + if obj._component_depend: + for depend in obj._component_depend: + self.dependents[depend].append(name) def deregister(self, obj): """ @@ -317,11 +323,23 @@ class ComponentRegistry(object): elif isinstance(names, str): names = [names] + def on_dependents_stopped(result, name): + return self.components[name]._component_stop() + + stopped_in_deferred = set() deferreds = [] for name in names: + if name in stopped_in_deferred: + continue if name in self.components: - deferreds.append(self.components[name]._component_stop()) + if name in self.dependents: + # If other components depend on this component, stop them first + d = self.stop(self.dependents[name]).addCallback(on_dependents_stopped, name) + deferreds.append(d) + stopped_in_deferred.update(self.dependents[name]) + else: + deferreds.append(self.components[name]._component_stop()) return DeferredList(deferreds) @@ -360,7 +378,7 @@ class ComponentRegistry(object): :param names: a list of Components to resume :type names: list - :returns: a Deferred object that will fire once all Components have been sucessfully resumed + :returns: a Deferred object that will fire once all Components have been successfully resumed :rtype: twisted.internet.defer.Deferred """ @@ -384,16 +402,14 @@ class ComponentRegistry(object): be called when the program is exiting to ensure all Components have a chance to properly shutdown. - :returns: a Deferred object that will fire once all Components have been sucessfully resumed + :returns: a Deferred object that will fire once all Components have been successfully shut down :rtype: twisted.internet.defer.Deferred """ - deferreds = [] + def on_stopped(result): + return DeferredList(map(lambda c: c._component_shutdown(), self.components.values())) - for component in self.components.values(): - deferreds.append(component._component_shutdown()) - - return DeferredList(deferreds) + return self.stop(self.components.keys()).addCallback(on_stopped) def update(self): """ diff --git a/deluge/core/alertmanager.py b/deluge/core/alertmanager.py index e2ff6c865..063bda723 100644 --- a/deluge/core/alertmanager.py +++ b/deluge/core/alertmanager.py @@ -75,7 +75,8 @@ class AlertManager(component.Component): def stop(self): for dc in self.delayed_calls: - dc.cancel() + if dc.active(): + dc.cancel() self.delayed_calls = [] def register_handler(self, alert_type, handler): diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py index d2e9bff13..621b3c181 100644 --- a/deluge/core/torrentmanager.py +++ b/deluge/core/torrentmanager.py @@ -43,6 +43,7 @@ import shutil import operator import logging import re +from collections import defaultdict from twisted.internet.task import LoopingCall from twisted.internet.defer import Deferred, DeferredList @@ -134,7 +135,7 @@ class TorrentManager(component.Component): def __init__(self): component.Component.__init__(self, "TorrentManager", interval=5, - depend=["CorePluginManager"]) + depend=["CorePluginManager", "AlertManager"]) log.debug("TorrentManager init..") # Set the libtorrent session self.session = component.get("Core").session @@ -154,7 +155,7 @@ class TorrentManager(component.Component): # This is a map of torrent_ids to Deferreds used to track needed resume data. # The Deferreds will be completed when resume data has been saved. - self.waiting_on_resume_data = {} + self.waiting_on_resume_data = defaultdict(list) # Keeps track of resume data self.resume_data = {} @@ -247,14 +248,7 @@ class TorrentManager(component.Component): # Stop the status cleanup LoopingCall here self.torrents[key].prev_status_cleanup_loop.stop() - # Save all resume data and wait until alerts have finished - lc = LoopingCall(lambda: self.alerts.handle_alerts(True)) - - def on_save_resume_finished(result, lc): - lc.stop() - - self.save_resume_data(self.torrents.keys()).addBoth(on_save_resume_finished, lc) - return lc.start(0.01) + return self.save_resume_data(self.torrents.keys()) def update(self): for torrent_id, torrent in self.torrents.items(): @@ -768,24 +762,22 @@ class TorrentManager(component.Component): if torrent_ids is None: torrent_ids = (t[0] for t in self.torrents.iteritems() if t[1].handle.need_save_resume_data()) - if self.waiting_on_resume_data: - # If we are still waiting on resume data from last call, force write and clear the queue - self.save_resume_data_file() - self.waiting_on_resume_data = {} + deferreds = [] def on_torrent_resume_save(result, torrent_id): self.waiting_on_resume_data.pop(torrent_id, None) for torrent_id in torrent_ids: - self.waiting_on_resume_data[torrent_id] = Deferred() - self.waiting_on_resume_data[torrent_id].addBoth(on_torrent_resume_save, torrent_id) + d = Deferred().addBoth(on_torrent_resume_save, torrent_id) + self.waiting_on_resume_data[torrent_id].append(d) + deferreds.append(d) self.torrents[torrent_id].save_resume_data() def on_all_resume_data_finished(result): if result: self.save_resume_data_file() - return DeferredList(self.waiting_on_resume_data.values()).addBoth(on_all_resume_data_finished) + return DeferredList(deferreds).addBoth(on_all_resume_data_finished) def load_resume_data_file(self): resume_data = {} @@ -1087,7 +1079,8 @@ class TorrentManager(component.Component): self.resume_data[torrent_id] = lt.bencode(alert.resume_data) if torrent_id in self.waiting_on_resume_data: - self.waiting_on_resume_data[torrent_id].callback(None) + for d in self.waiting_on_resume_data[torrent_id]: + d.callback(None) def on_alert_save_resume_data_failed(self, alert): log.debug("on_alert_save_resume_data_failed: %s", alert.message())