[Core] Refactor prefetch_metadata for more clarity

Just trying to clean up some of the more complicated callback logic.

Notable changes:

* The test was awaiting a DeferredList. By default that will eat
exceptions and just add them to the result list (including test
assertion exceptions.) Added fireOnOneErrback=True to make sure that
wasn't happening.  * Moved the logic for multiple calls to await the
same response into torrentmanager from core, so no matter where the
prefetch is called from it will wait for the original call.
* Implemented the multiple calls with an explicit queue of waiting
callbacks, rather than a callback callback chain.  * Moved to one inline
async function rather than split into a main and callback after alert
function.
* Added some more type hints to the stuff I changed.

Adjusted test since we are using prefetch as an async function now
we have to schedule the alert to come after we start awaiting the
prefetch call.

Closes: https://github.com/deluge-torrent/deluge/pull/368
This commit is contained in:
Chase Sterling 2022-02-07 19:40:05 -05:00 committed by Calum Lind
parent cd63efd935
commit 47e548fdb5
No known key found for this signature in database
GPG Key ID: 90597A687B836BA3
5 changed files with 64 additions and 57 deletions

View File

@ -432,9 +432,10 @@ class Core(component.Component):
return d return d
@export @export
def prefetch_magnet_metadata( @maybe_coroutine
async def prefetch_magnet_metadata(
self, magnet: str, timeout: int = 30 self, magnet: str, timeout: int = 30
) -> 'defer.Deferred[Tuple[str, bytes]]': ) -> Tuple[str, bytes]:
"""Download magnet metadata without adding to Deluge session. """Download magnet metadata without adding to Deluge session.
Used by UIs to get magnet files for selection before adding to session. Used by UIs to get magnet files for selection before adding to session.
@ -446,19 +447,10 @@ class Core(component.Component):
timeout: Number of seconds to wait before canceling request. timeout: Number of seconds to wait before canceling request.
Returns: Returns:
A tuple of (torrent_id (str), metadata (str)) for the magnet. A tuple of (torrent_id, metadata) for the magnet.
""" """
return await self.torrentmanager.prefetch_metadata(magnet, timeout)
def on_metadata(result, result_d):
"""Return result of torrent_id and metadata"""
result_d.callback(result)
return result
d = self.torrentmanager.prefetch_metadata(magnet, timeout)
# Use a separate callback chain to handle existing prefetching magnet.
result_d = defer.Deferred()
d.addBoth(on_metadata, result_d)
return result_d
@export @export
def add_torrent_file( def add_torrent_file(

View File

@ -13,6 +13,7 @@ import sys
import traceback import traceback
from collections import namedtuple from collections import namedtuple
from types import FunctionType from types import FunctionType
from typing import Callable, TypeVar, overload
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.internet.protocol import Factory, connectionDone from twisted.internet.protocol import Factory, connectionDone
@ -41,6 +42,18 @@ RPC_EVENT = 3
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
TCallable = TypeVar('TCallable', bound=Callable)
@overload
def export(func: TCallable) -> TCallable:
...
@overload
def export(auth_level: int) -> Callable[[TCallable], TCallable]:
...
def export(auth_level=AUTH_LEVEL_DEFAULT): def export(auth_level=AUTH_LEVEL_DEFAULT):
""" """

View File

@ -14,10 +14,10 @@ import os
import pickle import pickle
import time import time
from base64 import b64encode from base64 import b64encode
from collections import namedtuple
from tempfile import gettempdir from tempfile import gettempdir
from typing import Dict, List, NamedTuple, Tuple
from twisted.internet import defer, error, reactor, threads from twisted.internet import defer, reactor, threads
from twisted.internet.defer import Deferred, DeferredList from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
@ -57,6 +57,11 @@ LT_DEFAULT_ADD_TORRENT_FLAGS = (
) )
class PrefetchQueueItem(NamedTuple):
alert_deferred: Deferred
result_queue: List[Deferred]
class TorrentState: # pylint: disable=old-style-class class TorrentState: # pylint: disable=old-style-class
"""Create a torrent state. """Create a torrent state.
@ -134,7 +139,8 @@ class TorrentManager(component.Component):
""" """
callLater = reactor.callLater # noqa: N815 # This is used in the test to mock out timeouts
clock = reactor
def __init__(self): def __init__(self):
component.Component.__init__( component.Component.__init__(
@ -163,7 +169,7 @@ class TorrentManager(component.Component):
self.is_saving_state = False self.is_saving_state = False
self.save_resume_data_file_lock = defer.DeferredLock() self.save_resume_data_file_lock = defer.DeferredLock()
self.torrents_loading = {} self.torrents_loading = {}
self.prefetching_metadata = {} self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {}
# This is a map of torrent_ids to Deferreds used to track needed resume data. # This is a map of torrent_ids to Deferreds used to track needed resume data.
# The Deferreds will be completed when resume data has been saved. # The Deferreds will be completed when resume data has been saved.
@ -339,21 +345,24 @@ class TorrentManager(component.Component):
else: else:
return torrent_info return torrent_info
def prefetch_metadata(self, magnet, timeout): @maybe_coroutine
async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]:
"""Download the metadata for a magnet URI. """Download the metadata for a magnet URI.
Args: Args:
magnet (str): A magnet URI to download the metadata for. magnet: A magnet URI to download the metadata for.
timeout (int): Number of seconds to wait before canceling. timeout: Number of seconds to wait before canceling.
Returns: Returns:
Deferred: A tuple of (torrent_id (str), metadata (dict)) A tuple of (torrent_id, metadata)
""" """
torrent_id = get_magnet_info(magnet)['info_hash'] torrent_id = get_magnet_info(magnet)['info_hash']
if torrent_id in self.prefetching_metadata: if torrent_id in self.prefetching_metadata:
return self.prefetching_metadata[torrent_id].defer d = Deferred()
self.prefetching_metadata[torrent_id].result_queue.append(d)
return await d
add_torrent_params = lt.parse_magnet_uri(magnet) add_torrent_params = lt.parse_magnet_uri(magnet)
add_torrent_params.save_path = gettempdir() add_torrent_params.save_path = gettempdir()
@ -371,36 +380,29 @@ class TorrentManager(component.Component):
d = Deferred() d = Deferred()
# Cancel the defer if timeout reached. # Cancel the defer if timeout reached.
defer_timeout = self.callLater(timeout, d.cancel) d.addTimeout(timeout, self.clock)
d.addBoth(self.on_prefetch_metadata, torrent_id, defer_timeout) self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, [])
Prefetch = namedtuple('Prefetch', 'defer handle')
self.prefetching_metadata[torrent_id] = Prefetch(defer=d, handle=torrent_handle)
return d
def on_prefetch_metadata(self, torrent_info, torrent_id, defer_timeout):
# Cancel reactor.callLater.
try: try:
defer_timeout.cancel() torrent_info = await d
except error.AlreadyCalled: except (defer.TimeoutError, defer.CancelledError):
pass log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.')
log.debug('remove prefetch magnet from session')
try:
torrent_handle = self.prefetching_metadata.pop(torrent_id).handle
except KeyError:
pass
else:
self.session.remove_torrent(torrent_handle, 1)
metadata = b'' metadata = b''
if isinstance(torrent_info, lt.torrent_info): else:
log.debug('prefetch metadata received') log.debug('prefetch metadata received')
if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'): if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'):
metadata = torrent_info.metadata() metadata = torrent_info.metadata()
else: else:
metadata = torrent_info.info_section() metadata = torrent_info.info_section()
return torrent_id, b64encode(metadata) log.debug('remove prefetch magnet from session')
result_queue = self.prefetching_metadata.pop(torrent_id).result_queue
self.session.remove_torrent(torrent_handle, 1)
result = torrent_id, b64encode(metadata)
for d in result_queue:
d.callback(result)
return result
def _build_torrent_options(self, options): def _build_torrent_options(self, options):
"""Load default options and update if needed.""" """Load default options and update if needed."""
@ -451,7 +453,7 @@ class TorrentManager(component.Component):
raise AddTorrentError('Torrent already being added (%s).' % torrent_id) raise AddTorrentError('Torrent already being added (%s).' % torrent_id)
elif torrent_id in self.prefetching_metadata: elif torrent_id in self.prefetching_metadata:
# Cancel and remove metadata fetching torrent. # Cancel and remove metadata fetching torrent.
self.prefetching_metadata[torrent_id].defer.cancel() self.prefetching_metadata[torrent_id].alert_deferred.cancel()
# Check for renamed files and if so, rename them in the torrent_info before adding. # Check for renamed files and if so, rename them in the torrent_info before adding.
if options['mapped_files'] and torrent_info: if options['mapped_files'] and torrent_info:
@ -1559,7 +1561,7 @@ class TorrentManager(component.Component):
# Try callback to prefetch_metadata method. # Try callback to prefetch_metadata method.
try: try:
d = self.prefetching_metadata[torrent_id].defer d = self.prefetching_metadata[torrent_id].alert_deferred
except KeyError: except KeyError:
pass pass
else: else:

View File

@ -80,10 +80,10 @@ class TopLevelResource(Resource):
class TestCore(BaseTestCase): class TestCore(BaseTestCase):
def set_up(self): def set_up(self):
self.rpcserver = RPCServer(listen=False) self.rpcserver = RPCServer(listen=False)
self.core = Core() self.core: Core = Core()
self.core.config.config['lsd'] = False self.core.config.config['lsd'] = False
self.clock = task.Clock() self.clock = task.Clock()
self.core.torrentmanager.callLater = self.clock.callLater self.core.torrentmanager.clock = self.clock
self.listen_port = 51242 self.listen_port = 51242
return component.start().addCallback(self.start_web_server) return component.start().addCallback(self.start_web_server)
@ -313,20 +313,18 @@ class TestCore(BaseTestCase):
r2 = self.core.get_torrent_status(tid2, ['paused']) r2 = self.core.get_torrent_status(tid2, ['paused'])
assert r2['paused'] assert r2['paused']
@pytest_twisted.inlineCallbacks
def test_prefetch_metadata_existing(self): def test_prefetch_metadata_existing(self):
"""Check another call with same magnet returns existing deferred.""" """Check another call with same magnet returns existing deferred."""
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'') expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'')
def on_result(result): d1 = self.core.prefetch_magnet_metadata(magnet)
assert result == expected
d = self.core.prefetch_magnet_metadata(magnet)
d.addCallback(on_result)
d2 = self.core.prefetch_magnet_metadata(magnet) d2 = self.core.prefetch_magnet_metadata(magnet)
d2.addCallback(on_result) dg = defer.gatherResults([d1, d2], consumeErrors=True)
self.clock.advance(30) self.clock.advance(30)
return defer.DeferredList([d, d2]) result = yield dg
assert result == [expected] * 2
@pytest_twisted.inlineCallbacks @pytest_twisted.inlineCallbacks
def test_remove_torrent(self): def test_remove_torrent(self):

View File

@ -12,7 +12,7 @@ from unittest import mock
import pytest import pytest
import pytest_twisted import pytest_twisted
from twisted.internet import task from twisted.internet import reactor, task
from deluge import component from deluge import component
from deluge.bencode import bencode from deluge.bencode import bencode
@ -78,7 +78,9 @@ class TestTorrentmanager(BaseTestCase):
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
d = self.tm.prefetch_metadata(magnet, 30) d = self.tm.prefetch_metadata(magnet, 30)
self.tm.on_alert_metadata_received(mock_alert) # Make sure to use calllater, because the above prefetch call won't
# actually start running until we await it.
reactor.callLater(0, self.tm.on_alert_metadata_received, mock_alert)
expected = ( expected = (
'ab570cdd5a17ea1b61e970bb72047de141bce173', 'ab570cdd5a17ea1b61e970bb72047de141bce173',