Mirror of https://github.com/logos-storage/bittorrent-benchmarks.git (synced 2026-01-07 15:33:10 +00:00)
feat: add more robust retry policy to Deluge client
parent 700753cf3d
commit 8ef05e36a9
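
For context: the commit hardens the Deluge client in two ways. The connect() method now gives up after five attempts instead of retrying indefinitely, and the raw DelugeRPCClient is wrapped so that every RPC call made through the node inherits the same retry policy. A minimal sketch of the decorator-level policy, assuming tenacity's documented behavior (the function body is a placeholder, not the real implementation):

from tenacity import retry, stop_after_attempt, wait_exponential


# Waits back off exponentially, clamped to the 4-16 second range, and the
# call is abandoned after five attempts.
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=16))
def connect():
    ...  # the real method calls DelugeRPCClient.connect(), which may fail transiently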
@@ -9,8 +9,11 @@ from typing import List, Union, Optional, Self, Dict, Any
 import pathvalidate
 from deluge_client import DelugeRPCClient
-from tenacity import retry, wait_exponential
+from tenacity import retry, wait_exponential, stop_after_attempt
+from tenacity.stop import stop_base
+from tenacity.wait import wait_base
 from torrentool.torrent import Torrent
+from typing_extensions import Generic, TypeVar
 from urllib3.util import Url
 
 from benchmarks.core.experiments.experiments import ExperimentComponent
@@ -129,14 +132,20 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
             self.connect()
         return self._rpc
 
-    @retry(wait=wait_exponential(multiplier=1, min=4, max=16))
+    @retry(
+        stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=16)
+    )
     def connect(self) -> Self:
         return self._raw_connect()
 
     def _raw_connect(self):
         client = DelugeRPCClient(**self.daemon_args)
         client.connect()
-        self._rpc = client
+        self._rpc = ResilientCallWrapper(
+            client,
+            wait_policy=wait_exponential(multiplier=1, min=4, max=16),
+            stop_policy=stop_after_attempt(5),
+        )
         return self
 
     def is_ready(self) -> bool:
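
Because the wrapped client is what gets stored in self._rpc, call sites that go through the node's RPC accessor keep working unchanged while transient failures are retried transparently. A rough usage sketch, assuming the node exposes the wrapped client through an rpc property as the hunk above suggests, and using deluge-client's call() interface (the RPC method name is illustrative):

# Hypothetical call site: looking up `call` on the wrapper returns a
# retry-wrapped closure, so a transient connection error here is retried
# under the exponential-backoff policy before the caller ever sees it.
node.connect()
free_space = node.rpc.call("core.get_free_space")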
@@ -159,6 +168,23 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
         return f"DelugeNode({self.name}, {self.daemon_args['host']}:{self.daemon_args['port']})"
 
 
+T = TypeVar("T")
+
+
+class ResilientCallWrapper(Generic[T]):
+    def __init__(self, client: T, wait_policy: wait_base, stop_policy: stop_base):
+        self.client = client
+        self.wait_policy = wait_policy
+        self.stop_policy = stop_policy
+
+    def __getattr__(self, item):
+        @retry(wait=self.wait_policy, stop=self.stop_policy)
+        def _resilient_wrapper(*args, **kwargs):
+            return getattr(self.client, item)(*args, **kwargs)
+
+        return _resilient_wrapper
+
+
 class DelugeDownloadHandle(DownloadHandle):
     def __init__(self, torrent: Torrent, node: DelugeNode) -> None:
         self.node = node
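
ResilientCallWrapper relies on __getattr__, which Python only invokes for names not found on the wrapper itself, so ordinary RPC methods are proxied while dunder methods (and the wrapper's own attributes such as client) bypass it. Each lookup also builds a fresh retry-wrapped closure, so retry state is not shared across calls. A small self-contained sketch of the proxying behavior, using a toy client and zero waits (assuming the wrapper is importable from benchmarks.deluge.deluge_node):

from tenacity import stop_after_attempt, wait_fixed

from benchmarks.deluge.deluge_node import ResilientCallWrapper


class ToyClient:
    def ping(self) -> str:
        return "pong"


# Attribute lookup on the wrapper returns a retrying closure around ToyClient.ping.
wrapped = ResilientCallWrapper(
    ToyClient(), wait_policy=wait_fixed(0), stop_policy=stop_after_attempt(3)
)
assert wrapped.ping() == "pong"

The remaining hunks update the accompanying test module.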
@@ -1,9 +1,10 @@
 from pathlib import Path
 
 import pytest
+from tenacity import wait_incrementing, stop_after_attempt, RetryError
 
 from benchmarks.core.utils import megabytes, await_predicate
-from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta
+from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta, ResilientCallWrapper
 from benchmarks.deluge.tracker import Tracker
 
 
@@ -74,3 +75,34 @@ def test_should_remove_files(
 
     deluge_node1.remove(torrent)
     assert not deluge_node1.torrent_info(name="dataset1")
+
+
+class FlakyClient:
+    def __init__(self):
+        self.count = 0
+
+    def flaky(self):
+        self.count += 1
+        if self.count == 1:
+            raise IOError("Connection refused")
+        return 1
+
+
+def test_should_retry_operations_when_they_fail():
+    wrapper = ResilientCallWrapper(
+        FlakyClient(),
+        wait_policy=wait_incrementing(start=0, increment=0),
+        stop_policy=stop_after_attempt(2),
+    )
+    assert wrapper.flaky() == 1
+
+
+def test_should_give_up_on_operations_when_stop_policy_is_met():
+    wrapper = ResilientCallWrapper(
+        FlakyClient(),
+        wait_policy=wait_incrementing(start=0, increment=0),
+        stop_policy=stop_after_attempt(1),
+    )
+
+    with pytest.raises(RetryError):
+        wrapper.flaky()
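
The second test leans on tenacity's default failure mode: once the stop policy is exhausted, the decorated call raises RetryError wrapping the last attempt rather than the original exception, which is why the test asserts RetryError and not IOError. If call sites preferred the underlying exception, the inner decorator could pass reraise=True; a hedged alternative sketch of the wrapper's __getattr__ (not what this commit does):

def __getattr__(self, item):
    # Alternative: surface the underlying IOError instead of RetryError once
    # retries are exhausted, by asking tenacity to re-raise the last exception.
    @retry(wait=self.wait_policy, stop=self.stop_policy, reraise=True)
    def _resilient_wrapper(*args, **kwargs):
        return getattr(self.client, item)(*args, **kwargs)

    return _resilient_wrapper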