diff --git a/deluge/core/torrent.py b/deluge/core/torrent.py index 44bea6c5c..3664c1e2d 100644 --- a/deluge/core/torrent.py +++ b/deluge/core/torrent.py @@ -125,9 +125,6 @@ class Torrent(object): # Set the torrent_id for this torrent self.torrent_id = str(handle.info_hash()) - # Let's us know if we're waiting on a lt alert - self.waiting_on_resume_data = False - # Keep a list of file indexes we're waiting for file_rename alerts on # This also includes the old_folder and new_folder to know what signal to send # This is so we can send one folder_renamed signal instead of multiple @@ -913,7 +910,6 @@ class Torrent(object): """Signals libtorrent to build resume data for this torrent, it gets returned in a libtorrent alert""" self.handle.save_resume_data() - self.waiting_on_resume_data = True def write_torrentfile(self): """Writes the torrent file""" diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py index a985d531c..d2e9bff13 100644 --- a/deluge/core/torrentmanager.py +++ b/deluge/core/torrentmanager.py @@ -45,6 +45,7 @@ import logging import re from twisted.internet.task import LoopingCall +from twisted.internet.defer import Deferred, DeferredList from deluge._libtorrent import lt @@ -151,15 +152,11 @@ class TorrentManager(component.Component): self.last_seen_complete_loop = None self.queued_torrents = set() - # This is a list of torrent_id when we shutdown the torrentmanager. - # We use this list to determine if all active torrents have been paused - # and that their resume data has been written. - self.shutdown_torrent_pause_list = [] + # This is a map of torrent_ids to Deferreds used to track needed resume data. + # The Deferreds will be completed when resume data has been saved. + self.waiting_on_resume_data = {} - # self.num_resume_data used to save resume_data in bulk - self.num_resume_data = 0 - - # Keeps track of resume data that needs to be saved to disk + # Keeps track of resume data self.resume_data = {} # Register set functions @@ -216,11 +213,14 @@ class TorrentManager(component.Component): # Try to load the state from file self.load_state() - # Save the state every 5 minutes + # Save the state periodically self.save_state_timer = LoopingCall(self.save_state) self.save_state_timer.start(200, False) self.save_resume_data_timer = LoopingCall(self.save_resume_data) - self.save_resume_data_timer.start(190) + self.save_resume_data_timer.start(190, False) + # Force update for all resume data a bit less frequently + self.save_all_resume_data_timer = LoopingCall(self.save_resume_data, self.torrents.keys()) + self.save_all_resume_data_timer.start(900, False) if self.last_seen_complete_loop: self.last_seen_complete_loop.start(60) @@ -233,45 +233,28 @@ class TorrentManager(component.Component): if self.save_resume_data_timer.running: self.save_resume_data_timer.stop() + if self.save_all_resume_data_timer.running: + self.save_all_resume_data_timer.stop() + if self.last_seen_complete_loop: self.last_seen_complete_loop.stop() # Save state on shutdown self.save_state() - # Make another list just to make sure all paused torrents will be - # passed to self.save_resume_data(). With - # self.shutdown_torrent_pause_list it is possible to have a case when - # torrent_id is removed from it in self.on_alert_torrent_paused() - # before we call self.save_resume_data() here. - save_resume_data_list = [] + self.session.pause() for key in self.torrents: # Stop the status cleanup LoopingCall here self.torrents[key].prev_status_cleanup_loop.stop() - if not self.torrents[key].handle.is_paused(): - # We set auto_managed false to prevent lt from resuming the torrent - self.torrents[key].handle.auto_managed(False) - self.torrents[key].handle.pause() - self.shutdown_torrent_pause_list.append(key) - save_resume_data_list.append(key) - self.save_resume_data(save_resume_data_list) + # Save all resume data and wait until alerts have finished + lc = LoopingCall(lambda: self.alerts.handle_alerts(True)) - # We have to wait for all torrents to pause and write their resume data - wait = True - while wait: - if self.shutdown_torrent_pause_list: - wait = True - else: - wait = False - for torrent in self.torrents.values(): - if torrent.waiting_on_resume_data: - wait = True - break + def on_save_resume_finished(result, lc): + lc.stop() - time.sleep(0.01) - # Wait for all alerts - self.alerts.handle_alerts(True) + self.save_resume_data(self.torrents.keys()).addBoth(on_save_resume_finished, lc) + return lc.start(0.01) def update(self): for torrent_id, torrent in self.torrents.items(): @@ -613,9 +596,7 @@ class TorrentManager(component.Component): return False # Remove fastresume data if it is exists - resume_data = self.load_resume_data_file() - resume_data.pop(torrent_id, None) - self.save_resume_data_file(resume_data) + self.resume_data.pop(torrent_id, None) # Remove the .torrent file in the state self.torrents[torrent_id].delete_torrentfile() @@ -777,17 +758,34 @@ class TorrentManager(component.Component): def save_resume_data(self, torrent_ids=None): """ - Saves resume data for list of torrent_ids or for all torrents if - torrent_ids is None + Saves resume data for list of torrent_ids or for all torrents + needing resume data updated if torrent_ids is None + + :returns: A Deferred whose callback will be invoked when save is complete + :rtype: twisted.internet.defer.Deferred """ if torrent_ids is None: - torrent_ids = self.torrents.keys() + torrent_ids = (t[0] for t in self.torrents.iteritems() if t[1].handle.need_save_resume_data()) + + if self.waiting_on_resume_data: + # If we are still waiting on resume data from last call, force write and clear the queue + self.save_resume_data_file() + self.waiting_on_resume_data = {} + + def on_torrent_resume_save(result, torrent_id): + self.waiting_on_resume_data.pop(torrent_id, None) for torrent_id in torrent_ids: + self.waiting_on_resume_data[torrent_id] = Deferred() + self.waiting_on_resume_data[torrent_id].addBoth(on_torrent_resume_save, torrent_id) self.torrents[torrent_id].save_resume_data() - self.num_resume_data = len(torrent_ids) + def on_all_resume_data_finished(result): + if result: + self.save_resume_data_file() + + return DeferredList(self.waiting_on_resume_data.values()).addBoth(on_all_resume_data_finished) def load_resume_data_file(self): resume_data = {} @@ -807,34 +805,16 @@ class TorrentManager(component.Component): return resume_data - def save_resume_data_file(self, resume_data=None): + def save_resume_data_file(self): """ - Saves the resume data file with the contents of self.resume_data. If - `resume_data` is None, then we grab the resume_data from the file on - disk, else, we update `resume_data` with self.resume_data and save - that to disk. - - :param resume_data: the current resume_data, this will be loaded from disk if not provided - :type resume_data: dict - + Saves the resume data file with the contents of self.resume_data. """ - # Check to see if we're waiting on more resume data - if self.num_resume_data or not self.resume_data: - return - path = os.path.join(get_config_dir(), "state", "torrents.fastresume") - # First step is to load the existing file and update the dictionary - if resume_data is None: - resume_data = self.load_resume_data_file() - - resume_data.update(self.resume_data) - self.resume_data = {} - try: log.debug("Saving fastresume file: %s", path) fastresume_file = open(path, "wb") - fastresume_file.write(lt.bencode(resume_data)) + fastresume_file.write(lt.bencode(self.resume_data)) fastresume_file.flush() os.fsync(fastresume_file.fileno()) fastresume_file.close() @@ -988,15 +968,10 @@ class TorrentManager(component.Component): if torrent.state != old_state: component.get("EventManager").emit(TorrentStateChangedEvent(torrent_id, torrent.state)) - # Don't save resume data for each torrent after self.stop() was called. - # We save resume data in bulk in self.stop() in this case. - if self.save_resume_data_timer.running: - # Write the fastresume file + # Write the fastresume file if we are not waiting on a bulk write + if not self.waiting_on_resume_data: self.save_resume_data((torrent_id, )) - if torrent_id in self.shutdown_torrent_pause_list: - self.shutdown_torrent_pause_list.remove(torrent_id) - def on_alert_torrent_checked(self, alert): log.debug("on_alert_torrent_checked") try: @@ -1105,32 +1080,21 @@ class TorrentManager(component.Component): def on_alert_save_resume_data(self, alert): log.debug("on_alert_save_resume_data") - try: - torrent_id = str(alert.handle.info_hash()) - torrent = self.torrents[torrent_id] - except: - return + torrent_id = str(alert.handle.info_hash()) - # Libtorrent in add_torrent() expects resume_data to be bencoded - self.resume_data[torrent_id] = lt.bencode(alert.resume_data) - self.num_resume_data -= 1 + if torrent_id in self.torrents: + # Libtorrent in add_torrent() expects resume_data to be bencoded + self.resume_data[torrent_id] = lt.bencode(alert.resume_data) - torrent.waiting_on_resume_data = False - - self.save_resume_data_file() + if torrent_id in self.waiting_on_resume_data: + self.waiting_on_resume_data[torrent_id].callback(None) def on_alert_save_resume_data_failed(self, alert): log.debug("on_alert_save_resume_data_failed: %s", alert.message()) - try: - torrent = self.torrents[str(alert.handle.info_hash())] - except: - return - - self.num_resume_data -= 1 - torrent.waiting_on_resume_data = False - - self.save_resume_data_file() + torrent_id = alert.handle.info_hash() + if torrent_id in self.waiting_on_resume_data: + self.waiting_on_resume_data[torrent_id].errback(Exception(alert.message())) def on_alert_file_renamed(self, alert): log.debug("on_alert_file_renamed")