diff --git a/deluge/core/alertmanager.py b/deluge/core/alertmanager.py index 71045b0a3..cf541f015 100644 --- a/deluge/core/alertmanager.py +++ b/deluge/core/alertmanager.py @@ -17,10 +17,12 @@ This should typically only be used by the Core. Plugins should utilize the import contextlib import logging import threading +import time from collections import defaultdict +from functools import partial from typing import Any, Callable -from twisted.internet import reactor, threads +from twisted.internet import reactor, task, threads import deluge.component as component from deluge._libtorrent import lt @@ -56,8 +58,7 @@ class AlertManager(component.Component): # handlers is a dictionary of lists {"alert_type": [handler1,h2,..]} self.handlers = defaultdict(list) - self.handlers_retry_timeout = 0.3 - self.handlers_retry_count = 6 + self.handlers_timeout_secs = 2 self.delayed_calls = [] self._event = threading.Event() @@ -82,45 +83,33 @@ class AlertManager(component.Component): def wait_for_alert_in_thread(self): while self._component_state not in ('Stopping', 'Stopped'): + if self.check_delayed_calls(): + time.sleep(0.05) + continue + if self.session.wait_for_alert(1000) is None: continue if self._event.wait(): threads.blockingCallFromThread(reactor, self.maybe_handle_alerts) + def on_delayed_call_timeout(self, result, timeout, **kwargs): + log.warning('Alert handler was timed-out before being called %s', kwargs) + def cancel_delayed_calls(self): """Cancel all delayed handlers.""" for delayed_call in self.delayed_calls: - if delayed_call.active(): - delayed_call.cancel() + delayed_call.cancel() self.delayed_calls = [] - def check_delayed_calls(self, retries: int = 0) -> bool: - """Returns True if any handler calls are delayed (upto retry limit).""" - self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()] - if not self.delayed_calls: - return False + def check_delayed_calls(self) -> bool: + """Returns True if any handler calls are delayed.""" + self.delayed_calls = [dc for dc in self.delayed_calls if not dc.called] + return len(self.delayed_calls) > 0 - if retries > self.handlers_retry_count: - log.warning( - 'Alert handlers timeout reached, cancelling: %s', self.delayed_calls - ) - self.cancel_delayed_calls() - return False - - return True - - def maybe_handle_alerts(self, retries: int = 0) -> None: + def maybe_handle_alerts(self) -> None: if self._component_state != 'Started': return - if self.check_delayed_calls(retries): - log.debug('Waiting for delayed alerts: %s', self.delayed_calls) - retries += 1 - reactor.callLater( - self.handlers_retry_timeout, self.maybe_handle_alerts, retries - ) - return - self.handle_alerts() def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None: @@ -182,8 +171,18 @@ class AlertManager(component.Component): for handler in self.handlers[alert_type]: if log.isEnabledFor(logging.DEBUG): log.debug('Handling alert: %s', alert_type) - - self.delayed_calls.append(reactor.callLater(0, handler, alert)) + d = task.deferLater(reactor, 0, handler, alert) + on_handler_timeout = partial( + self.on_delayed_call_timeout, + handler=handler.__qualname__, + alert_type=alert_type, + ) + d.addTimeout( + self.handlers_timeout_secs, + reactor, + onTimeoutCancel=on_handler_timeout, + ) + self.delayed_calls.append(d) def set_alert_queue_size(self, queue_size): """Sets the maximum size of the libtorrent alert queue"""