Refactor resume data saving to not read and write fastresume unnecessarily.

Refactor saving resume data on shutdown to use deferreds.
Make bulk resume data saves write to disk only once.
This commit is contained in:
Chase Sterling 2012-12-06 01:20:19 -05:00
parent ce76c278ed
commit ce99b5f688
2 changed files with 56 additions and 96 deletions

View File

@ -125,9 +125,6 @@ class Torrent(object):
# Set the torrent_id for this torrent # Set the torrent_id for this torrent
self.torrent_id = str(handle.info_hash()) self.torrent_id = str(handle.info_hash())
# Let's us know if we're waiting on a lt alert
self.waiting_on_resume_data = False
# Keep a list of file indexes we're waiting for file_rename alerts on # Keep a list of file indexes we're waiting for file_rename alerts on
# This also includes the old_folder and new_folder to know what signal to send # This also includes the old_folder and new_folder to know what signal to send
# This is so we can send one folder_renamed signal instead of multiple # This is so we can send one folder_renamed signal instead of multiple
@ -913,7 +910,6 @@ class Torrent(object):
"""Signals libtorrent to build resume data for this torrent, it gets """Signals libtorrent to build resume data for this torrent, it gets
returned in a libtorrent alert""" returned in a libtorrent alert"""
self.handle.save_resume_data() self.handle.save_resume_data()
self.waiting_on_resume_data = True
def write_torrentfile(self): def write_torrentfile(self):
"""Writes the torrent file""" """Writes the torrent file"""

View File

@ -45,6 +45,7 @@ import logging
import re import re
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from twisted.internet.defer import Deferred, DeferredList
from deluge._libtorrent import lt from deluge._libtorrent import lt
@ -151,15 +152,11 @@ class TorrentManager(component.Component):
self.last_seen_complete_loop = None self.last_seen_complete_loop = None
self.queued_torrents = set() self.queued_torrents = set()
# This is a list of torrent_id when we shutdown the torrentmanager. # This is a map of torrent_ids to Deferreds used to track needed resume data.
# We use this list to determine if all active torrents have been paused # The Deferreds will be completed when resume data has been saved.
# and that their resume data has been written. self.waiting_on_resume_data = {}
self.shutdown_torrent_pause_list = []
# self.num_resume_data used to save resume_data in bulk # Keeps track of resume data
self.num_resume_data = 0
# Keeps track of resume data that needs to be saved to disk
self.resume_data = {} self.resume_data = {}
# Register set functions # Register set functions
@ -216,11 +213,14 @@ class TorrentManager(component.Component):
# Try to load the state from file # Try to load the state from file
self.load_state() self.load_state()
# Save the state every 5 minutes # Save the state periodically
self.save_state_timer = LoopingCall(self.save_state) self.save_state_timer = LoopingCall(self.save_state)
self.save_state_timer.start(200, False) self.save_state_timer.start(200, False)
self.save_resume_data_timer = LoopingCall(self.save_resume_data) self.save_resume_data_timer = LoopingCall(self.save_resume_data)
self.save_resume_data_timer.start(190) self.save_resume_data_timer.start(190, False)
# Force update for all resume data a bit less frequently
self.save_all_resume_data_timer = LoopingCall(self.save_resume_data, self.torrents.keys())
self.save_all_resume_data_timer.start(900, False)
if self.last_seen_complete_loop: if self.last_seen_complete_loop:
self.last_seen_complete_loop.start(60) self.last_seen_complete_loop.start(60)
@ -233,45 +233,28 @@ class TorrentManager(component.Component):
if self.save_resume_data_timer.running: if self.save_resume_data_timer.running:
self.save_resume_data_timer.stop() self.save_resume_data_timer.stop()
if self.save_all_resume_data_timer.running:
self.save_all_resume_data_timer.stop()
if self.last_seen_complete_loop: if self.last_seen_complete_loop:
self.last_seen_complete_loop.stop() self.last_seen_complete_loop.stop()
# Save state on shutdown # Save state on shutdown
self.save_state() self.save_state()
# Make another list just to make sure all paused torrents will be self.session.pause()
# passed to self.save_resume_data(). With
# self.shutdown_torrent_pause_list it is possible to have a case when
# torrent_id is removed from it in self.on_alert_torrent_paused()
# before we call self.save_resume_data() here.
save_resume_data_list = []
for key in self.torrents: for key in self.torrents:
# Stop the status cleanup LoopingCall here # Stop the status cleanup LoopingCall here
self.torrents[key].prev_status_cleanup_loop.stop() self.torrents[key].prev_status_cleanup_loop.stop()
if not self.torrents[key].handle.is_paused():
# We set auto_managed false to prevent lt from resuming the torrent
self.torrents[key].handle.auto_managed(False)
self.torrents[key].handle.pause()
self.shutdown_torrent_pause_list.append(key)
save_resume_data_list.append(key)
self.save_resume_data(save_resume_data_list) # Save all resume data and wait until alerts have finished
lc = LoopingCall(lambda: self.alerts.handle_alerts(True))
# We have to wait for all torrents to pause and write their resume data def on_save_resume_finished(result, lc):
wait = True lc.stop()
while wait:
if self.shutdown_torrent_pause_list:
wait = True
else:
wait = False
for torrent in self.torrents.values():
if torrent.waiting_on_resume_data:
wait = True
break
time.sleep(0.01) self.save_resume_data(self.torrents.keys()).addBoth(on_save_resume_finished, lc)
# Wait for all alerts return lc.start(0.01)
self.alerts.handle_alerts(True)
def update(self): def update(self):
for torrent_id, torrent in self.torrents.items(): for torrent_id, torrent in self.torrents.items():
@ -613,9 +596,7 @@ class TorrentManager(component.Component):
return False return False
# Remove fastresume data if it is exists # Remove fastresume data if it is exists
resume_data = self.load_resume_data_file() self.resume_data.pop(torrent_id, None)
resume_data.pop(torrent_id, None)
self.save_resume_data_file(resume_data)
# Remove the .torrent file in the state # Remove the .torrent file in the state
self.torrents[torrent_id].delete_torrentfile() self.torrents[torrent_id].delete_torrentfile()
@ -777,17 +758,34 @@ class TorrentManager(component.Component):
def save_resume_data(self, torrent_ids=None): def save_resume_data(self, torrent_ids=None):
""" """
Saves resume data for list of torrent_ids or for all torrents if Saves resume data for list of torrent_ids or for all torrents
torrent_ids is None needing resume data updated if torrent_ids is None
:returns: A Deferred whose callback will be invoked when save is complete
:rtype: twisted.internet.defer.Deferred
""" """
if torrent_ids is None: if torrent_ids is None:
torrent_ids = self.torrents.keys() torrent_ids = (t[0] for t in self.torrents.iteritems() if t[1].handle.need_save_resume_data())
if self.waiting_on_resume_data:
# If we are still waiting on resume data from last call, force write and clear the queue
self.save_resume_data_file()
self.waiting_on_resume_data = {}
def on_torrent_resume_save(result, torrent_id):
self.waiting_on_resume_data.pop(torrent_id, None)
for torrent_id in torrent_ids: for torrent_id in torrent_ids:
self.waiting_on_resume_data[torrent_id] = Deferred()
self.waiting_on_resume_data[torrent_id].addBoth(on_torrent_resume_save, torrent_id)
self.torrents[torrent_id].save_resume_data() self.torrents[torrent_id].save_resume_data()
self.num_resume_data = len(torrent_ids) def on_all_resume_data_finished(result):
if result:
self.save_resume_data_file()
return DeferredList(self.waiting_on_resume_data.values()).addBoth(on_all_resume_data_finished)
def load_resume_data_file(self): def load_resume_data_file(self):
resume_data = {} resume_data = {}
@ -807,34 +805,16 @@ class TorrentManager(component.Component):
return resume_data return resume_data
def save_resume_data_file(self, resume_data=None): def save_resume_data_file(self):
""" """
Saves the resume data file with the contents of self.resume_data. If Saves the resume data file with the contents of self.resume_data.
`resume_data` is None, then we grab the resume_data from the file on
disk, else, we update `resume_data` with self.resume_data and save
that to disk.
:param resume_data: the current resume_data, this will be loaded from disk if not provided
:type resume_data: dict
""" """
# Check to see if we're waiting on more resume data
if self.num_resume_data or not self.resume_data:
return
path = os.path.join(get_config_dir(), "state", "torrents.fastresume") path = os.path.join(get_config_dir(), "state", "torrents.fastresume")
# First step is to load the existing file and update the dictionary
if resume_data is None:
resume_data = self.load_resume_data_file()
resume_data.update(self.resume_data)
self.resume_data = {}
try: try:
log.debug("Saving fastresume file: %s", path) log.debug("Saving fastresume file: %s", path)
fastresume_file = open(path, "wb") fastresume_file = open(path, "wb")
fastresume_file.write(lt.bencode(resume_data)) fastresume_file.write(lt.bencode(self.resume_data))
fastresume_file.flush() fastresume_file.flush()
os.fsync(fastresume_file.fileno()) os.fsync(fastresume_file.fileno())
fastresume_file.close() fastresume_file.close()
@ -988,15 +968,10 @@ class TorrentManager(component.Component):
if torrent.state != old_state: if torrent.state != old_state:
component.get("EventManager").emit(TorrentStateChangedEvent(torrent_id, torrent.state)) component.get("EventManager").emit(TorrentStateChangedEvent(torrent_id, torrent.state))
# Don't save resume data for each torrent after self.stop() was called. # Write the fastresume file if we are not waiting on a bulk write
# We save resume data in bulk in self.stop() in this case. if not self.waiting_on_resume_data:
if self.save_resume_data_timer.running:
# Write the fastresume file
self.save_resume_data((torrent_id, )) self.save_resume_data((torrent_id, ))
if torrent_id in self.shutdown_torrent_pause_list:
self.shutdown_torrent_pause_list.remove(torrent_id)
def on_alert_torrent_checked(self, alert): def on_alert_torrent_checked(self, alert):
log.debug("on_alert_torrent_checked") log.debug("on_alert_torrent_checked")
try: try:
@ -1105,32 +1080,21 @@ class TorrentManager(component.Component):
def on_alert_save_resume_data(self, alert): def on_alert_save_resume_data(self, alert):
log.debug("on_alert_save_resume_data") log.debug("on_alert_save_resume_data")
try:
torrent_id = str(alert.handle.info_hash()) torrent_id = str(alert.handle.info_hash())
torrent = self.torrents[torrent_id]
except:
return
if torrent_id in self.torrents:
# Libtorrent in add_torrent() expects resume_data to be bencoded # Libtorrent in add_torrent() expects resume_data to be bencoded
self.resume_data[torrent_id] = lt.bencode(alert.resume_data) self.resume_data[torrent_id] = lt.bencode(alert.resume_data)
self.num_resume_data -= 1
torrent.waiting_on_resume_data = False if torrent_id in self.waiting_on_resume_data:
self.waiting_on_resume_data[torrent_id].callback(None)
self.save_resume_data_file()
def on_alert_save_resume_data_failed(self, alert): def on_alert_save_resume_data_failed(self, alert):
log.debug("on_alert_save_resume_data_failed: %s", alert.message()) log.debug("on_alert_save_resume_data_failed: %s", alert.message())
try: torrent_id = alert.handle.info_hash()
torrent = self.torrents[str(alert.handle.info_hash())]
except:
return
self.num_resume_data -= 1
torrent.waiting_on_resume_data = False
self.save_resume_data_file()
if torrent_id in self.waiting_on_resume_data:
self.waiting_on_resume_data[torrent_id].errback(Exception(alert.message()))
def on_alert_file_renamed(self, alert): def on_alert_file_renamed(self, alert):
log.debug("on_alert_file_renamed") log.debug("on_alert_file_renamed")