[Alerts] Fix alert handler segfault on lt.pop_alerts

We cannot handle an alert after calling lt.pop_alerts for a subsequent
time since the alert objects are invalidated and with cause a segfault.

To resolve this issue add a timeout to the handler calls and wait in the
alert thread for either the handlers to be called or eventually be
cancelled before getting more alerts.

This is still not an ideal solution and might leave to backlog of alerts
but this is better than crashing the application. Perhaps the timeout
could be tweaked to be shorter for certain alert types such as stats.

Related: https://github.com/arvidn/libtorrent/issues/6437
This commit is contained in:
Calum Lind 2023-12-03 19:47:11 +00:00
parent fa8d19335e
commit 7046824115
No known key found for this signature in database
GPG Key ID: 90597A687B836BA3
1 changed files with 29 additions and 30 deletions

View File

@ -17,10 +17,12 @@ This should typically only be used by the Core. Plugins should utilize the
import contextlib import contextlib
import logging import logging
import threading import threading
import time
from collections import defaultdict from collections import defaultdict
from functools import partial
from typing import Any, Callable from typing import Any, Callable
from twisted.internet import reactor, threads from twisted.internet import reactor, task, threads
import deluge.component as component import deluge.component as component
from deluge._libtorrent import lt from deluge._libtorrent import lt
@ -56,8 +58,7 @@ class AlertManager(component.Component):
# handlers is a dictionary of lists {"alert_type": [handler1,h2,..]} # handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
self.handlers = defaultdict(list) self.handlers = defaultdict(list)
self.handlers_retry_timeout = 0.3 self.handlers_timeout_secs = 2
self.handlers_retry_count = 6
self.delayed_calls = [] self.delayed_calls = []
self._event = threading.Event() self._event = threading.Event()
@ -82,45 +83,33 @@ class AlertManager(component.Component):
def wait_for_alert_in_thread(self): def wait_for_alert_in_thread(self):
while self._component_state not in ('Stopping', 'Stopped'): while self._component_state not in ('Stopping', 'Stopped'):
if self.check_delayed_calls():
time.sleep(0.05)
continue
if self.session.wait_for_alert(1000) is None: if self.session.wait_for_alert(1000) is None:
continue continue
if self._event.wait(): if self._event.wait():
threads.blockingCallFromThread(reactor, self.maybe_handle_alerts) threads.blockingCallFromThread(reactor, self.maybe_handle_alerts)
def on_delayed_call_timeout(self, result, timeout, **kwargs):
log.warning('Alert handler was timed-out before being called %s', kwargs)
def cancel_delayed_calls(self): def cancel_delayed_calls(self):
"""Cancel all delayed handlers.""" """Cancel all delayed handlers."""
for delayed_call in self.delayed_calls: for delayed_call in self.delayed_calls:
if delayed_call.active(): delayed_call.cancel()
delayed_call.cancel()
self.delayed_calls = [] self.delayed_calls = []
def check_delayed_calls(self, retries: int = 0) -> bool: def check_delayed_calls(self) -> bool:
"""Returns True if any handler calls are delayed (upto retry limit).""" """Returns True if any handler calls are delayed."""
self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()] self.delayed_calls = [dc for dc in self.delayed_calls if not dc.called]
if not self.delayed_calls: return len(self.delayed_calls) > 0
return False
if retries > self.handlers_retry_count: def maybe_handle_alerts(self) -> None:
log.warning(
'Alert handlers timeout reached, cancelling: %s', self.delayed_calls
)
self.cancel_delayed_calls()
return False
return True
def maybe_handle_alerts(self, retries: int = 0) -> None:
if self._component_state != 'Started': if self._component_state != 'Started':
return return
if self.check_delayed_calls(retries):
log.debug('Waiting for delayed alerts: %s', self.delayed_calls)
retries += 1
reactor.callLater(
self.handlers_retry_timeout, self.maybe_handle_alerts, retries
)
return
self.handle_alerts() self.handle_alerts()
def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None: def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None:
@ -182,8 +171,18 @@ class AlertManager(component.Component):
for handler in self.handlers[alert_type]: for handler in self.handlers[alert_type]:
if log.isEnabledFor(logging.DEBUG): if log.isEnabledFor(logging.DEBUG):
log.debug('Handling alert: %s', alert_type) log.debug('Handling alert: %s', alert_type)
d = task.deferLater(reactor, 0, handler, alert)
self.delayed_calls.append(reactor.callLater(0, handler, alert)) on_handler_timeout = partial(
self.on_delayed_call_timeout,
handler=handler.__qualname__,
alert_type=alert_type,
)
d.addTimeout(
self.handlers_timeout_secs,
reactor,
onTimeoutCancel=on_handler_timeout,
)
self.delayed_calls.append(d)
def set_alert_queue_size(self, queue_size): def set_alert_queue_size(self, queue_size):
"""Sets the maximum size of the libtorrent alert queue""" """Sets the maximum size of the libtorrent alert queue"""