[Core] Implement async_add_torrent in torrentmanager

This commit is contained in:
bendikro 2015-11-22 15:01:40 +01:00 committed by Calum Lind
parent 73220b5116
commit 5d1aff157e
8 changed files with 141 additions and 87 deletions

View File

@ -50,11 +50,10 @@ class AlertManager(component.Component):
# handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
self.handlers = {}
self.delayed_calls = []
self.wait_on_handler = False
def update(self):
self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()]
self.handle_alerts(wait=self.wait_on_handler)
self.handle_alerts()
def stop(self):
for delayed_call in self.delayed_calls:
@ -92,12 +91,9 @@ class AlertManager(component.Component):
# Handler is in this alert type list
value.remove(handler)
def handle_alerts(self, wait=False):
"""Pops all libtorrent alerts in the session queue and handles them appropriately.
Args:
wait (bool): If True the handler functions will be run straight away and
waited to return before processing the next alert.
def handle_alerts(self):
"""
Pops all libtorrent alerts in the session queue and handles them appropriately.
"""
alerts = self.session.pop_alerts()
if not alerts:
@ -118,10 +114,7 @@ class AlertManager(component.Component):
# Call any handlers for this alert type
if alert_type in self.handlers:
for handler in self.handlers[alert_type]:
if not wait:
self.delayed_calls.append(reactor.callLater(0, handler, alert))
else:
handler(alert)
self.delayed_calls.append(reactor.callLater(0, handler, alert))
def set_alert_queue_size(self, queue_size):
"""Sets the maximum size of the libtorrent alert queue"""

View File

@ -16,7 +16,7 @@ import shutil
import tempfile
import threading
from twisted.internet import reactor, task
from twisted.internet import defer, reactor, task
from twisted.web.client import getPage
import deluge.common
@ -33,7 +33,7 @@ from deluge.core.pluginmanager import PluginManager
from deluge.core.preferencesmanager import PreferencesManager
from deluge.core.rpcserver import export
from deluge.core.torrentmanager import TorrentManager
from deluge.error import DelugeError, InvalidPathError, InvalidTorrentError
from deluge.error import AddTorrentError, DelugeError, InvalidPathError, InvalidTorrentError
from deluge.event import NewVersionAvailableEvent, SessionPausedEvent, SessionResumedEvent, TorrentQueueChangedEvent
from deluge.httpdownloader import download_file
@ -211,13 +211,14 @@ class Core(component.Component):
log.error("There was an error decoding the filedump string: %s", ex)
try:
torrent_id = self.torrentmanager.add(
d = self.torrentmanager.add(
filedump=filedump, options=options, filename=filename, save_state=save_state
)
except RuntimeError as ex:
log.error("There was an error adding the torrent file %s: %s", filename, ex)
torrent_id = None
return torrent_id
raise
else:
return d
# Exported Methods
@export
@ -246,14 +247,18 @@ class Core(component.Component):
Deferred
"""
@defer.inlineCallbacks
def add_torrents():
torrent_ids = []
count = len(torrent_files)
errors = []
last_index = len(torrent_files) - 1
for idx, torrent in enumerate(torrent_files):
torrent_id = self._add_torrent_file(torrent[0], torrent[1],
torrent[2], save_state=idx == (count - 1))
torrent_ids.append(torrent_id)
return torrent_ids
try:
yield self._add_torrent_file(torrent[0], torrent[1],
torrent[2], save_state=idx == last_index)
except AddTorrentError as ex:
log.warn("Error when adding torrent: '%s'", ex)
errors.append(ex)
defer.returnValue(errors)
return task.deferLater(reactor, 0, add_torrents)
@export

View File

@ -10,6 +10,7 @@
"""TorrentManager handles Torrent objects"""
import cPickle
import datetime
import logging
import operator
import os
@ -26,7 +27,7 @@ from deluge.common import decode_string, get_magnet_info, utf8_encoded
from deluge.configmanager import ConfigManager, get_config_dir
from deluge.core.authmanager import AUTH_LEVEL_ADMIN
from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath
from deluge.error import InvalidTorrentError
from deluge.error import AddTorrentError, InvalidTorrentError
from deluge.event import (PreTorrentRemovedEvent, SessionStartedEvent, TorrentAddedEvent, TorrentFileCompletedEvent,
TorrentFileRenamedEvent, TorrentFinishedEvent, TorrentRemovedEvent, TorrentResumedEvent)
@ -105,6 +106,7 @@ class TorrentManager(component.Component):
self.torrents = {}
self.queued_torrents = set()
self.is_saving_state = False
self.torrents_loading = {}
# This is a map of torrent_ids to Deferreds used to track needed resume data.
# The Deferreds will be completed when resume data has been saved.
@ -152,6 +154,7 @@ class TorrentManager(component.Component):
self.alerts.register_handler("external_ip_alert", self.on_alert_external_ip)
self.alerts.register_handler("performance_alert", self.on_alert_performance)
self.alerts.register_handler("fastresume_rejected_alert", self.on_alert_fastresume_rejected)
self.alerts.register_handler("add_torrent_alert", self.on_add_torrent_alert)
# Define timers
self.save_state_timer = LoopingCall(self.save_state)
@ -163,7 +166,6 @@ class TorrentManager(component.Component):
if os.path.isfile(self.temp_file):
def archive_file(filename):
"""Archives the file in 'archive' sub-directory with timestamp appended"""
import datetime
filepath = os.path.join(self.state_dir, filename)
filepath_bak = filepath + ".bak"
archive_dir = os.path.join(get_config_dir(), "archive")
@ -302,16 +304,14 @@ class TorrentManager(component.Component):
TorrentAddedEvent: Torrent with torrent_id added to session.
"""
if torrent_info is None and filedump is None and magnet is None:
log.error("You must specify a valid torrent_info, torrent state or magnet.")
return
if not torrent_info and not filedump and not magnet:
raise AddTorrentError("You must specify a valid torrent_info, torrent state or magnet.")
if filedump:
try:
torrent_info = lt.torrent_info(lt.bdecode(filedump))
except RuntimeError as ex:
log.error("Unable to add torrent, decoding filedump failed: %s", ex)
return
raise AddTorrentError("Unable to add torrent, decoding filedump failed: %s" % ex)
add_torrent_params = {}
if torrent_info:
@ -329,15 +329,15 @@ class TorrentManager(component.Component):
add_torrent_params["name"] = magnet_info["name"]
torrent_id = magnet_info["info_hash"]
else:
log.error("Unable to add magnet, invalid magnet info: %s", magnet)
return
raise AddTorrentError("Unable to add magnet, invalid magnet info: %s" % magnet)
# Check for existing torrent in session.
if torrent_id in self.get_torrent_list():
log.warning("Unable to add torrent (%s), already in session", torrent_id)
# Attempt merge trackers before returning.
self.torrents[torrent_id].merge_trackers(torrent_info)
return
raise AddTorrentError("Torrent already in session (%s)." % torrent_id)
elif torrent_id in self.torrents_loading:
raise AddTorrentError("Torrent already being added (%s)." % torrent_id)
# Load default options and update if needed.
_options = TorrentOptions()
@ -385,24 +385,31 @@ class TorrentManager(component.Component):
if options["seed_mode"]:
add_torrent_params["flags"] |= lt.add_torrent_params_flags_t.flag_seed_mode
# We need to pause the AlertManager momentarily to prevent alerts
# for this torrent being generated before a Torrent object is created.
component.pause("AlertManager")
d = Deferred()
try:
handle = self.session.add_torrent(add_torrent_params)
if not handle.is_valid():
raise InvalidTorrentError("Torrent handle is invalid!")
except (RuntimeError, InvalidTorrentError) as ex:
log.error("Unable to add torrent to session: %s", ex)
component.resume("AlertManager")
self.torrents_loading[torrent_id] = (d, options, state, filename, magnet, resume_data, filedump, save_state)
self.session.async_add_torrent(add_torrent_params)
except RuntimeError as ex:
raise AddTorrentError("Unable to add torrent to session: %s" % ex)
return d
def on_add_torrent_alert(self, alert):
"""Alert handler for libtorrent add_torrent_alert"""
if not alert.handle.is_valid():
log.warn("Torrent handle is invalid!")
return
# Create a Torrent object and add to the dictionary.
torrent = Torrent(handle, options, state, filename, magnet)
self.torrents[torrent.torrent_id] = torrent
try:
torrent_id = str(alert.handle.info_hash())
except RuntimeError as ex:
log.warn("Failed to get torrent id from handle: %s", ex)
return
component.resume("AlertManager")
d, options, state, filename, magnet, resume_data, filedump, save_state = self.torrents_loading.pop(torrent_id)
# Create a Torrent object and add to the dictionary.
torrent = Torrent(alert.handle, options, state, filename, magnet)
self.torrents[torrent.torrent_id] = torrent
# Store the orignal resume_data, in case of errors.
if resume_data:
@ -422,7 +429,7 @@ class TorrentManager(component.Component):
component.get("EventManager").emit(TorrentAddedEvent(torrent.torrent_id, from_state))
if log.isEnabledFor(logging.DEBUG):
log.debug("Torrent added: %s", str(handle.info_hash()))
log.debug("Torrent added: %s", str(alert.handle.info_hash()))
if log.isEnabledFor(logging.INFO):
name_and_owner = torrent.get_status(["name", "owner"])
log.info("Torrent %s from user \"%s\" %s",
@ -438,7 +445,7 @@ class TorrentManager(component.Component):
if save_state:
self.save_state()
return torrent.torrent_id
d.callback(torrent.torrent_id)
def remove(self, torrent_id, remove_data=False, save_state=True):
"""Remove a torrent from the session.
@ -556,6 +563,7 @@ class TorrentManager(component.Component):
SessionStartedEvent: Emitted after all torrents are added to the session.
"""
start = datetime.datetime.now()
state = self.open_state()
state = self.fixup_state(state)
@ -563,10 +571,7 @@ class TorrentManager(component.Component):
state.torrents.sort(key=operator.attrgetter("queue"), reverse=self.config["queue_new_to_top"])
resume_data = self.load_resume_data_file()
# Tell alertmanager to wait for the handlers while adding torrents.
# This speeds up startup loading the torrents by quite a lot for some reason (~40%)
self.alerts.wait_on_handler = True
deferreds = []
for t_state in state.torrents:
# Populate the options dict from state
options = TorrentOptions()
@ -587,12 +592,16 @@ class TorrentManager(component.Component):
if torrent_info:
magnet = None
self.add(torrent_info=torrent_info, state=t_state, options=options, save_state=False,
magnet=magnet, resume_data=resume_data.get(t_state.torrent_id))
d = self.add(torrent_info=torrent_info, state=t_state, options=options, save_state=False,
magnet=magnet, resume_data=resume_data.get(t_state.torrent_id))
deferreds.append(d)
self.alerts.wait_on_handler = False
log.info("Finished loading %d torrents.", len(state.torrents))
component.get("EventManager").emit(SessionStartedEvent())
deferred_list = DeferredList(deferreds, consumeErrors=False)
def on_complete(result):
log.info("Finished loading %d torrents in %s", len(state.torrents), str(datetime.datetime.now() - start))
component.get("EventManager").emit(SessionStartedEvent())
deferred_list.addCallback(on_complete)
def create_state(self):
"""Create a state of all the torrents in TorrentManager.

View File

@ -32,6 +32,10 @@ class InvalidTorrentError(DelugeError):
pass
class AddTorrentError(DelugeError):
pass
class InvalidPathError(DelugeError):
pass

View File

@ -3,7 +3,7 @@ import os
from hashlib import sha1 as sha
import pytest
from twisted.internet import reactor
from twisted.internet import defer, reactor
from twisted.internet.error import CannotListenError
from twisted.python.failure import Failure
from twisted.web.http import FORBIDDEN
@ -16,7 +16,7 @@ import deluge.component as component
import deluge.core.torrent
from deluge.core.core import Core
from deluge.core.rpcserver import RPCServer
from deluge.error import InvalidTorrentError
from deluge.error import AddTorrentError, InvalidTorrentError
from deluge.ui.web.common import compress
from . import common
@ -102,25 +102,55 @@ class CoreTestCase(BaseTestCase):
return component.shutdown().addCallback(on_shutdown)
@defer.inlineCallbacks
def test_add_torrent_files(self):
options = {}
filenames = ["test.torrent", "test_torrent.file.torrent"]
files_to_add = []
for f in filenames:
filename = os.path.join(os.path.dirname(__file__), f)
filedump = base64.encodestring(open(filename).read())
files_to_add.append((filename, filedump, options))
errors = yield self.core.add_torrent_files(files_to_add)
self.assertEquals(len(errors), 0)
@defer.inlineCallbacks
def test_add_torrent_files_error_duplicate(self):
options = {}
filenames = ["test.torrent", "test.torrent"]
files_to_add = []
for f in filenames:
filename = os.path.join(os.path.dirname(__file__), f)
filedump = base64.encodestring(open(filename).read())
files_to_add.append((filename, filedump, options))
errors = yield self.core.add_torrent_files(files_to_add)
self.assertEquals(len(errors), 1)
self.assertTrue(str(errors[0]).startswith("Torrent already in session"))
@defer.inlineCallbacks
def test_add_torrent_file(self):
options = {}
filename = os.path.join(os.path.dirname(__file__), "test.torrent")
torrent_id = self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
torrent_id = yield self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
# Get the info hash from the test.torrent
from deluge.bencode import bdecode, bencode
info_hash = sha(bencode(bdecode(open(filename).read())["info"])).hexdigest()
self.assertEquals(torrent_id, info_hash)
def test_add_torrent_file_invalid_filedump(self):
options = {}
filename = os.path.join(os.path.dirname(__file__), "test.torrent")
self.assertRaises(AddTorrentError, self.core.add_torrent_file, filename, False, options)
@defer.inlineCallbacks
def test_add_torrent_url(self):
url = "http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent" % self.listen_port
options = {}
info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00"
d = self.core.add_torrent_url(url, options)
d.addCallback(self.assertEquals, info_hash)
return d
torrent_id = yield self.core.add_torrent_url(url, options)
self.assertEquals(torrent_id, info_hash)
def test_add_torrent_url_with_cookie(self):
url = "http://localhost:%d/cookie" % self.listen_port
@ -143,7 +173,6 @@ class CoreTestCase(BaseTestCase):
d = self.core.add_torrent_url(url, options)
d.addCallback(self.assertEquals, info_hash)
return d
def test_add_torrent_url_with_partial_download(self):
@ -153,21 +182,21 @@ class CoreTestCase(BaseTestCase):
d = self.core.add_torrent_url(url, options)
d.addCallback(self.assertEquals, info_hash)
return d
def test_add_magnet(self):
@defer.inlineCallbacks
def test_add_torrent_magnet(self):
info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00"
uri = deluge.common.create_magnet_uri(info_hash)
options = {}
torrent_id = self.core.add_torrent_magnet(uri, options)
torrent_id = yield self.core.add_torrent_magnet(uri, options)
self.assertEquals(torrent_id, info_hash)
@defer.inlineCallbacks
def test_remove_torrent(self):
options = {}
filename = os.path.join(os.path.dirname(__file__), "test.torrent")
torrent_id = self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
torrent_id = yield self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
removed = self.core.remove_torrent(torrent_id, True)
self.assertTrue(removed)
self.assertEquals(len(self.core.get_session_state()), 0)
@ -182,12 +211,13 @@ class CoreTestCase(BaseTestCase):
d.addCallback(test_true)
return d
@defer.inlineCallbacks
def test_remove_torrents(self):
options = {}
filename = os.path.join(os.path.dirname(__file__), "test.torrent")
torrent_id = self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
torrent_id = yield self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
filename2 = os.path.join(os.path.dirname(__file__), "unicode_filenames.torrent")
torrent_id2 = self.core.add_torrent_file(filename2, base64.encodestring(open(filename2).read()), options)
torrent_id2 = yield self.core.add_torrent_file(filename2, base64.encodestring(open(filename2).read()), options)
d = self.core.remove_torrents([torrent_id, torrent_id2], True)
def test_ret(val):
@ -197,12 +227,13 @@ class CoreTestCase(BaseTestCase):
def test_session_state(val):
self.assertEquals(len(self.core.get_session_state()), 0)
d.addCallback(test_session_state)
return d
yield d
@defer.inlineCallbacks
def test_remove_torrents_invalid(self):
options = {}
filename = os.path.join(os.path.dirname(__file__), "test.torrent")
torrent_id = self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
torrent_id = yield self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
d = self.core.remove_torrents(["invalidid1", "invalidid2", torrent_id], False)
def test_ret(val):
@ -212,7 +243,7 @@ class CoreTestCase(BaseTestCase):
self.assertTrue(val[1][0] == "invalidid2")
self.assertTrue(isinstance(val[1][1], InvalidTorrentError))
d.addCallback(test_ret)
return d
yield d
def test_get_session_status(self):
status = self.core.get_session_status(["upload_rate", "download_rate"])

View File

@ -5,7 +5,7 @@ import os
import sys
import time
from twisted.internet import reactor
from twisted.internet import defer, reactor
from twisted.internet.task import deferLater
from twisted.trial import unittest
@ -144,10 +144,11 @@ class TorrentTestCase(unittest.TestCase):
# self.print_priority_list(priorities)
@defer.inlineCallbacks
def test_torrent_error_data_missing(self):
options = {"seed_mode": True}
filename = os.path.join(os.path.dirname(__file__), "test_torrent.file.torrent")
torrent_id = core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
torrent_id = yield core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
torrent = core.torrentmanager.torrents[torrent_id]
self.assert_state(torrent, "Seeding")
@ -157,10 +158,11 @@ class TorrentTestCase(unittest.TestCase):
time.sleep(0.2) # Delay to wait for alert from lt
self.assert_state(torrent, "Error")
@defer.inlineCallbacks
def test_torrent_error_resume_original_state(self):
options = {"seed_mode": True, "add_paused": True}
filename = os.path.join(os.path.dirname(__file__), "test_torrent.file.torrent")
torrent_id = core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
torrent_id = yield core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
torrent = core.torrentmanager.torrents[torrent_id]
orig_state = "Paused"
@ -173,8 +175,10 @@ class TorrentTestCase(unittest.TestCase):
# Clear error and verify returned to original state
torrent.force_recheck()
return deferLater(reactor, 0.1, self.assert_state, torrent, orig_state)
yield deferLater(reactor, 0.1, self.assert_state, torrent, orig_state)
return
@defer.inlineCallbacks
def test_torrent_error_resume_data_unaltered(self):
resume_data = {'active_time': 13399, 'num_incomplete': 16777215, 'announce_to_lsd': 1, 'seed_mode': 0,
'pieces': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', 'paused': 0,
@ -202,8 +206,8 @@ class TorrentTestCase(unittest.TestCase):
filename = os.path.join(os.path.dirname(__file__), "test_torrent.file.torrent")
filedump = open(filename).read()
torrent_id = core.torrentmanager.add(state=torrent_state, filedump=filedump,
resume_data=lt.bencode(resume_data))
torrent_id = yield core.torrentmanager.add(state=torrent_state, filedump=filedump,
resume_data=lt.bencode(resume_data))
torrent = core.torrentmanager.torrents[torrent_id]
def assert_resume_data():
@ -211,4 +215,5 @@ class TorrentTestCase(unittest.TestCase):
tm_resume_data = lt.bdecode(core.torrentmanager.resume_data[torrent.torrent_id])
self.assertEquals(tm_resume_data, resume_data)
return deferLater(reactor, 0.5, assert_resume_data)
yield deferLater(reactor, 0.5, assert_resume_data)
return

View File

@ -2,6 +2,7 @@ import base64
import os
import warnings
from twisted.internet import defer
from twisted.trial import unittest
from deluge import component
@ -32,9 +33,10 @@ class TorrentmanagerTestCase(unittest.TestCase):
del self.torrentManager
return component.shutdown().addCallback(on_shutdown)
@defer.inlineCallbacks
def test_remove_torrent(self):
filename = os.path.join(os.path.dirname(__file__), "test.torrent")
torrent_id = self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), {})
torrent_id = yield self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), {})
self.assertTrue(self.torrentManager.remove(torrent_id, False))
def test_remove_torrent_false(self):

View File

@ -738,8 +738,13 @@ class AddTorrentDialog(component.Component):
options))
row = self.torrent_liststore.iter_next(row)
def on_torrents_added(torrent_ids):
log.info("Added %d torrents", len(torrent_ids))
def on_torrents_added(errors):
if errors:
log.info("Failed to add %d out of %d torrents.", len(errors), len(torrents_to_add))
for e in errors:
log.info("Torrent add failed: %s", e)
else:
log.info("Successfully added %d torrents.", len(torrents_to_add))
client.core.add_torrent_files(torrents_to_add).addCallback(on_torrents_added)
def _on_button_apply_clicked(self, widget):