From 8ef05e36a981d94b7c9331f40319f301aa5d4125 Mon Sep 17 00:00:00 2001
From: gmega
Date: Tue, 14 Jan 2025 19:32:34 -0300
Subject: [PATCH] feat: add more robust retry policy to Deluge client

---
 benchmarks/deluge/deluge_node.py            | 32 +++++++++++++++++--
 benchmarks/deluge/tests/test_deluge_node.py | 34 ++++++++++++++++++++-
 2 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/benchmarks/deluge/deluge_node.py b/benchmarks/deluge/deluge_node.py
index 6c37013..6acaee2 100644
--- a/benchmarks/deluge/deluge_node.py
+++ b/benchmarks/deluge/deluge_node.py
@@ -9,8 +9,11 @@ from typing import List, Union, Optional, Self, Dict, Any
 
 import pathvalidate
 from deluge_client import DelugeRPCClient
-from tenacity import retry, wait_exponential
+from tenacity import retry, wait_exponential, stop_after_attempt
+from tenacity.stop import stop_base
+from tenacity.wait import wait_base
 from torrentool.torrent import Torrent
+from typing_extensions import Generic, TypeVar
 from urllib3.util import Url
 
 from benchmarks.core.experiments.experiments import ExperimentComponent
@@ -129,14 +132,20 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
         self.connect()
         return self._rpc
 
-    @retry(wait=wait_exponential(multiplier=1, min=4, max=16))
+    @retry(
+        stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=16)
+    )
     def connect(self) -> Self:
         return self._raw_connect()
 
     def _raw_connect(self):
         client = DelugeRPCClient(**self.daemon_args)
         client.connect()
-        self._rpc = client
+        self._rpc = ResilientCallWrapper(
+            client,
+            wait_policy=wait_exponential(multiplier=1, min=4, max=16),
+            stop_policy=stop_after_attempt(5),
+        )
         return self
 
     def is_ready(self) -> bool:
@@ -159,6 +168,23 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
         return f"DelugeNode({self.name}, {self.daemon_args['host']}:{self.daemon_args['port']})"
 
 
+T = TypeVar("T")
+
+
+class ResilientCallWrapper(Generic[T]):
+    def __init__(self, client: T, wait_policy: wait_base, stop_policy: stop_base):
+        self.client = client
+        self.wait_policy = wait_policy
+        self.stop_policy = stop_policy
+
+    def __getattr__(self, item):
+        @retry(wait=self.wait_policy, stop=self.stop_policy)
+        def _resilient_wrapper(*args, **kwargs):
+            return getattr(self.client, item)(*args, **kwargs)
+
+        return _resilient_wrapper
+
+
 class DelugeDownloadHandle(DownloadHandle):
     def __init__(self, torrent: Torrent, node: DelugeNode) -> None:
         self.node = node
diff --git a/benchmarks/deluge/tests/test_deluge_node.py b/benchmarks/deluge/tests/test_deluge_node.py
index 061d009..d8058e5 100644
--- a/benchmarks/deluge/tests/test_deluge_node.py
+++ b/benchmarks/deluge/tests/test_deluge_node.py
@@ -1,9 +1,10 @@
 from pathlib import Path
 
 import pytest
+from tenacity import wait_incrementing, stop_after_attempt, RetryError
 
 from benchmarks.core.utils import megabytes, await_predicate
-from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta
+from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta, ResilientCallWrapper
 from benchmarks.deluge.tracker import Tracker
 
 
@@ -74,3 +75,34 @@ def test_should_remove_files(
     deluge_node1.remove(torrent)
 
     assert not deluge_node1.torrent_info(name="dataset1")
+
+
+class FlakyClient:
+    def __init__(self):
+        self.count = 0
+
+    def flaky(self):
+        self.count += 1
+        if self.count == 1:
+            raise IOError("Connection refused")
+        return 1
+
+
+def test_should_retry_operations_when_they_fail():
+    wrapper = ResilientCallWrapper(
+        FlakyClient(),
+        wait_policy=wait_incrementing(start=0, increment=0),
+        stop_policy=stop_after_attempt(2),
+    )
+    assert wrapper.flaky() == 1
+
+
+def test_should_give_up_on_operations_when_stop_policy_is_met():
+    wrapper = ResilientCallWrapper(
+        FlakyClient(),
+        wait_policy=wait_incrementing(start=0, increment=0),
+        stop_policy=stop_after_attempt(1),
+    )
+
+    with pytest.raises(RetryError):
+        wrapper.flaky()