[Core] Refactor Alertmanager to retry unhandled alerts
We are currently creating a copy of each alert to avoid segfaults when the next pop_alerts invalidates the lt alert objects we are holding for handler callbacks. However these alert copies are not deep enough since need to also resolve the alert object methods e.g. message() therefore it is still possible to result in lt segfaults. The workaround is to check for any handlers not called, give them some more time and eventually discard if still not handled. Ref: https://github.com/arvidn/libtorrent/issues/6437
This commit is contained in:
parent
b5f8c5af2d
commit
54d6f50231
|
@ -18,7 +18,6 @@ import contextlib
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from types import SimpleNamespace
|
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable
|
||||||
|
|
||||||
from twisted.internet import reactor, threads
|
from twisted.internet import reactor, threads
|
||||||
|
@ -57,11 +56,13 @@ class AlertManager(component.Component):
|
||||||
|
|
||||||
# handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
|
# handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
|
||||||
self.handlers = defaultdict(list)
|
self.handlers = defaultdict(list)
|
||||||
|
self.handlers_retry_timeout = 0.3
|
||||||
|
self.handlers_retry_count = 6
|
||||||
self.delayed_calls = []
|
self.delayed_calls = []
|
||||||
self._event = threading.Event()
|
self._event = threading.Event()
|
||||||
|
|
||||||
def update(self):
|
def update(self):
|
||||||
self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()]
|
pass
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
thread = threading.Thread(
|
thread = threading.Thread(
|
||||||
|
@ -71,10 +72,7 @@ class AlertManager(component.Component):
|
||||||
self._event.set()
|
self._event.set()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
for delayed_call in self.delayed_calls:
|
self.cancel_delayed_calls()
|
||||||
if delayed_call.active():
|
|
||||||
delayed_call.cancel()
|
|
||||||
self.delayed_calls = []
|
|
||||||
|
|
||||||
def pause(self):
|
def pause(self):
|
||||||
self._event.clear()
|
self._event.clear()
|
||||||
|
@ -89,8 +87,40 @@ class AlertManager(component.Component):
|
||||||
if self._event.wait():
|
if self._event.wait():
|
||||||
threads.blockingCallFromThread(reactor, self.maybe_handle_alerts)
|
threads.blockingCallFromThread(reactor, self.maybe_handle_alerts)
|
||||||
|
|
||||||
def maybe_handle_alerts(self):
|
def cancel_delayed_calls(self):
|
||||||
if self._component_state == 'Started':
|
"""Cancel all delayed handlers."""
|
||||||
|
for delayed_call in self.delayed_calls:
|
||||||
|
if delayed_call.active():
|
||||||
|
delayed_call.cancel()
|
||||||
|
self.delayed_calls = []
|
||||||
|
|
||||||
|
def check_delayed_calls(self, retries: int = 0) -> bool:
|
||||||
|
"""Returns True if any handler calls are delayed (upto retry limit)."""
|
||||||
|
self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()]
|
||||||
|
if not self.delayed_calls:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if retries > self.handlers_retry_count:
|
||||||
|
log.warning(
|
||||||
|
'Alert handlers timeout reached, cancelling: %s', self.delayed_calls
|
||||||
|
)
|
||||||
|
self.cancel_delayed_calls()
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def maybe_handle_alerts(self, retries: int = 0) -> None:
|
||||||
|
if self._component_state != 'Started':
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.check_delayed_calls(retries):
|
||||||
|
log.debug('Waiting for delayed alerts: %s', self.delayed_calls)
|
||||||
|
retries += 1
|
||||||
|
reactor.callLater(
|
||||||
|
self.handlers_retry_timeout, self.maybe_handle_alerts, retries
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
self.handle_alerts()
|
self.handle_alerts()
|
||||||
|
|
||||||
def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None:
|
def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None:
|
||||||
|
@ -153,21 +183,7 @@ class AlertManager(component.Component):
|
||||||
if log.isEnabledFor(logging.DEBUG):
|
if log.isEnabledFor(logging.DEBUG):
|
||||||
log.debug('Handling alert: %s', alert_type)
|
log.debug('Handling alert: %s', alert_type)
|
||||||
|
|
||||||
alert_copy = self.create_alert_copy(alert)
|
self.delayed_calls.append(reactor.callLater(0, handler, alert))
|
||||||
self.delayed_calls.append(reactor.callLater(0, handler, alert_copy))
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create_alert_copy(alert):
|
|
||||||
"""Create a Python copy of libtorrent alert
|
|
||||||
|
|
||||||
Avoid segfault if an alert is handled after next pop_alert call"""
|
|
||||||
return SimpleNamespace(
|
|
||||||
**{
|
|
||||||
attr: getattr(alert, attr)
|
|
||||||
for attr in dir(alert)
|
|
||||||
if not attr.startswith('__')
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
def set_alert_queue_size(self, queue_size):
|
def set_alert_queue_size(self, queue_size):
|
||||||
"""Sets the maximum size of the libtorrent alert queue"""
|
"""Sets the maximum size of the libtorrent alert queue"""
|
||||||
|
|
|
@ -3,31 +3,19 @@
|
||||||
# the additional special exception to link portions of this program with the OpenSSL library.
|
# the additional special exception to link portions of this program with the OpenSSL library.
|
||||||
# See LICENSE for more details.
|
# See LICENSE for more details.
|
||||||
#
|
#
|
||||||
from types import SimpleNamespace
|
from dataclasses import dataclass
|
||||||
|
|
||||||
import pytest_twisted
|
import pytest
|
||||||
|
|
||||||
import deluge.component as component
|
|
||||||
from deluge.conftest import BaseTestCase
|
|
||||||
from deluge.core.core import Core
|
from deluge.core.core import Core
|
||||||
|
|
||||||
|
|
||||||
class DummyAlert1:
|
class LtSessionMock:
|
||||||
def __init__(self):
|
|
||||||
self.message = '1'
|
|
||||||
|
|
||||||
|
|
||||||
class DummyAlert2:
|
|
||||||
def __init__(self):
|
|
||||||
self.message = '2'
|
|
||||||
|
|
||||||
|
|
||||||
class SessionMock:
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.alerts = []
|
self.alerts = []
|
||||||
|
|
||||||
def set_alerts(self):
|
def push_alerts(self, alerts):
|
||||||
self.alerts = [DummyAlert1(), DummyAlert2()]
|
self.alerts = alerts
|
||||||
|
|
||||||
def wait_for_alert(self, timeout):
|
def wait_for_alert(self, timeout):
|
||||||
return self.alerts[0] if len(self.alerts) > 0 else None
|
return self.alerts[0] if len(self.alerts) > 0 else None
|
||||||
|
@ -38,16 +26,38 @@ class SessionMock:
|
||||||
return alerts
|
return alerts
|
||||||
|
|
||||||
|
|
||||||
class TestAlertManager(BaseTestCase):
|
@dataclass
|
||||||
def set_up(self):
|
class LtAlertMock:
|
||||||
|
type: int
|
||||||
|
name: str
|
||||||
|
message: str
|
||||||
|
|
||||||
|
def message(self):
|
||||||
|
return self.message
|
||||||
|
|
||||||
|
def what(self):
|
||||||
|
return self.name
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_alert1():
|
||||||
|
return LtAlertMock(type=1, name='mock_alert1', message='Alert 1')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_alert2():
|
||||||
|
return LtAlertMock(type=2, name='mock_alert2', message='Alert 2')
|
||||||
|
|
||||||
|
|
||||||
|
class TestAlertManager:
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def set_up(self, component):
|
||||||
self.core = Core()
|
self.core = Core()
|
||||||
self.core.config.config['lsd'] = False
|
self.core.config.config['lsd'] = False
|
||||||
self.am = component.get('AlertManager')
|
self.am = component.get('AlertManager')
|
||||||
self.am.session = SessionMock()
|
self.am.session = LtSessionMock()
|
||||||
return component.start(['AlertManager'])
|
|
||||||
|
|
||||||
def tear_down(self):
|
component.start(['AlertManager'])
|
||||||
return component.shutdown()
|
|
||||||
|
|
||||||
def test_register_handler(self):
|
def test_register_handler(self):
|
||||||
def handler(alert):
|
def handler(alert):
|
||||||
|
@ -58,22 +68,27 @@ class TestAlertManager(BaseTestCase):
|
||||||
assert self.am.handlers['dummy1'] == [handler]
|
assert self.am.handlers['dummy1'] == [handler]
|
||||||
assert self.am.handlers['dummy2'] == [handler]
|
assert self.am.handlers['dummy2'] == [handler]
|
||||||
|
|
||||||
@pytest_twisted.ensureDeferred
|
async def test_pop_alert(self, mock_callback, mock_alert1, mock_alert2):
|
||||||
async def test_pop_alert(self, mock_callback):
|
self.am.register_handler('mock_alert1', mock_callback)
|
||||||
mock_callback.reset_mock()
|
|
||||||
self.am.register_handler('DummyAlert1', mock_callback)
|
self.am.session.push_alerts([mock_alert1, mock_alert2])
|
||||||
self.am.session.set_alerts()
|
|
||||||
await mock_callback.deferred
|
|
||||||
mock_callback.assert_called_once_with(SimpleNamespace(message='1'))
|
|
||||||
|
|
||||||
@pytest_twisted.ensureDeferred
|
|
||||||
async def test_pause_not_pop_alert(self, mock_callback):
|
|
||||||
await component.pause(['AlertManager'])
|
|
||||||
self.am.register_handler('DummyAlert1', mock_callback)
|
|
||||||
self.am.session.set_alerts()
|
|
||||||
await mock_callback.deferred
|
await mock_callback.deferred
|
||||||
|
|
||||||
|
mock_callback.assert_called_once_with(mock_alert1)
|
||||||
|
|
||||||
|
async def test_pause_not_pop_alert(
|
||||||
|
self, component, mock_alert1, mock_alert2, mock_callback
|
||||||
|
):
|
||||||
|
await component.pause(['AlertManager'])
|
||||||
|
|
||||||
|
self.am.register_handler('mock_alert1', mock_callback)
|
||||||
|
self.am.session.push_alerts([mock_alert1, mock_alert2])
|
||||||
|
|
||||||
|
await mock_callback.deferred
|
||||||
|
|
||||||
mock_callback.assert_not_called()
|
mock_callback.assert_not_called()
|
||||||
assert not self.am._event.isSet()
|
assert not self.am._event.is_set()
|
||||||
assert len(self.am.session.alerts) == 2
|
assert len(self.am.session.alerts) == 2
|
||||||
|
|
||||||
def test_deregister_handler(self):
|
def test_deregister_handler(self):
|
||||||
|
|
Loading…
Reference in New Issue