Make sure dependent components are stopped before their dependencies.

Make sure deferreds are not lost in new resume saving code.
This commit is contained in:
Chase Sterling 2012-12-07 19:00:43 -05:00
parent ce99b5f688
commit 58cb9e1c22
3 changed files with 37 additions and 27 deletions

View File

@ -34,6 +34,7 @@
# #
import logging import logging
from collections import defaultdict
from twisted.internet.defer import maybeDeferred, succeed, DeferredList, fail from twisted.internet.defer import maybeDeferred, succeed, DeferredList, fail
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
@ -225,6 +226,8 @@ class ComponentRegistry(object):
""" """
def __init__(self): def __init__(self):
self.components = {} self.components = {}
# Stores all of the components that are dependent on a particular component
self.dependents = defaultdict(list)
def register(self, obj): def register(self, obj):
""" """
@ -243,6 +246,9 @@ class ComponentRegistry(object):
"Component already registered with name %s" % name) "Component already registered with name %s" % name)
self.components[obj._component_name] = obj self.components[obj._component_name] = obj
if obj._component_depend:
for depend in obj._component_depend:
self.dependents[depend].append(name)
def deregister(self, obj): def deregister(self, obj):
""" """
@ -317,10 +323,22 @@ class ComponentRegistry(object):
elif isinstance(names, str): elif isinstance(names, str):
names = [names] names = [names]
def on_dependents_stopped(result, name):
return self.components[name]._component_stop()
stopped_in_deferred = set()
deferreds = [] deferreds = []
for name in names: for name in names:
if name in stopped_in_deferred:
continue
if name in self.components: if name in self.components:
if name in self.dependents:
# If other components depend on this component, stop them first
d = self.stop(self.dependents[name]).addCallback(on_dependents_stopped, name)
deferreds.append(d)
stopped_in_deferred.update(self.dependents[name])
else:
deferreds.append(self.components[name]._component_stop()) deferreds.append(self.components[name]._component_stop())
return DeferredList(deferreds) return DeferredList(deferreds)
@ -360,7 +378,7 @@ class ComponentRegistry(object):
:param names: a list of Components to resume :param names: a list of Components to resume
:type names: list :type names: list
:returns: a Deferred object that will fire once all Components have been sucessfully resumed :returns: a Deferred object that will fire once all Components have been successfully resumed
:rtype: twisted.internet.defer.Deferred :rtype: twisted.internet.defer.Deferred
""" """
@ -384,16 +402,14 @@ class ComponentRegistry(object):
be called when the program is exiting to ensure all Components have a be called when the program is exiting to ensure all Components have a
chance to properly shutdown. chance to properly shutdown.
:returns: a Deferred object that will fire once all Components have been sucessfully resumed :returns: a Deferred object that will fire once all Components have been successfully shut down
:rtype: twisted.internet.defer.Deferred :rtype: twisted.internet.defer.Deferred
""" """
deferreds = [] def on_stopped(result):
return DeferredList(map(lambda c: c._component_shutdown(), self.components.values()))
for component in self.components.values(): return self.stop(self.components.keys()).addCallback(on_stopped)
deferreds.append(component._component_shutdown())
return DeferredList(deferreds)
def update(self): def update(self):
""" """

View File

@ -75,6 +75,7 @@ class AlertManager(component.Component):
def stop(self): def stop(self):
for dc in self.delayed_calls: for dc in self.delayed_calls:
if dc.active():
dc.cancel() dc.cancel()
self.delayed_calls = [] self.delayed_calls = []

View File

@ -43,6 +43,7 @@ import shutil
import operator import operator
import logging import logging
import re import re
from collections import defaultdict
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from twisted.internet.defer import Deferred, DeferredList from twisted.internet.defer import Deferred, DeferredList
@ -134,7 +135,7 @@ class TorrentManager(component.Component):
def __init__(self): def __init__(self):
component.Component.__init__(self, "TorrentManager", interval=5, component.Component.__init__(self, "TorrentManager", interval=5,
depend=["CorePluginManager"]) depend=["CorePluginManager", "AlertManager"])
log.debug("TorrentManager init..") log.debug("TorrentManager init..")
# Set the libtorrent session # Set the libtorrent session
self.session = component.get("Core").session self.session = component.get("Core").session
@ -154,7 +155,7 @@ class TorrentManager(component.Component):
# This is a map of torrent_ids to Deferreds used to track needed resume data. # This is a map of torrent_ids to Deferreds used to track needed resume data.
# The Deferreds will be completed when resume data has been saved. # The Deferreds will be completed when resume data has been saved.
self.waiting_on_resume_data = {} self.waiting_on_resume_data = defaultdict(list)
# Keeps track of resume data # Keeps track of resume data
self.resume_data = {} self.resume_data = {}
@ -247,14 +248,7 @@ class TorrentManager(component.Component):
# Stop the status cleanup LoopingCall here # Stop the status cleanup LoopingCall here
self.torrents[key].prev_status_cleanup_loop.stop() self.torrents[key].prev_status_cleanup_loop.stop()
# Save all resume data and wait until alerts have finished return self.save_resume_data(self.torrents.keys())
lc = LoopingCall(lambda: self.alerts.handle_alerts(True))
def on_save_resume_finished(result, lc):
lc.stop()
self.save_resume_data(self.torrents.keys()).addBoth(on_save_resume_finished, lc)
return lc.start(0.01)
def update(self): def update(self):
for torrent_id, torrent in self.torrents.items(): for torrent_id, torrent in self.torrents.items():
@ -768,24 +762,22 @@ class TorrentManager(component.Component):
if torrent_ids is None: if torrent_ids is None:
torrent_ids = (t[0] for t in self.torrents.iteritems() if t[1].handle.need_save_resume_data()) torrent_ids = (t[0] for t in self.torrents.iteritems() if t[1].handle.need_save_resume_data())
if self.waiting_on_resume_data: deferreds = []
# If we are still waiting on resume data from last call, force write and clear the queue
self.save_resume_data_file()
self.waiting_on_resume_data = {}
def on_torrent_resume_save(result, torrent_id): def on_torrent_resume_save(result, torrent_id):
self.waiting_on_resume_data.pop(torrent_id, None) self.waiting_on_resume_data.pop(torrent_id, None)
for torrent_id in torrent_ids: for torrent_id in torrent_ids:
self.waiting_on_resume_data[torrent_id] = Deferred() d = Deferred().addBoth(on_torrent_resume_save, torrent_id)
self.waiting_on_resume_data[torrent_id].addBoth(on_torrent_resume_save, torrent_id) self.waiting_on_resume_data[torrent_id].append(d)
deferreds.append(d)
self.torrents[torrent_id].save_resume_data() self.torrents[torrent_id].save_resume_data()
def on_all_resume_data_finished(result): def on_all_resume_data_finished(result):
if result: if result:
self.save_resume_data_file() self.save_resume_data_file()
return DeferredList(self.waiting_on_resume_data.values()).addBoth(on_all_resume_data_finished) return DeferredList(deferreds).addBoth(on_all_resume_data_finished)
def load_resume_data_file(self): def load_resume_data_file(self):
resume_data = {} resume_data = {}
@ -1087,7 +1079,8 @@ class TorrentManager(component.Component):
self.resume_data[torrent_id] = lt.bencode(alert.resume_data) self.resume_data[torrent_id] = lt.bencode(alert.resume_data)
if torrent_id in self.waiting_on_resume_data: if torrent_id in self.waiting_on_resume_data:
self.waiting_on_resume_data[torrent_id].callback(None) for d in self.waiting_on_resume_data[torrent_id]:
d.callback(None)
def on_alert_save_resume_data_failed(self, alert): def on_alert_save_resume_data_failed(self, alert):
log.debug("on_alert_save_resume_data_failed: %s", alert.message()) log.debug("on_alert_save_resume_data_failed: %s", alert.message())