diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 9a8d2e35b..1a96d0617 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -577,3 +577,9 @@ class WakuNode: def get_peer_info(self, peer_id: str): return self._api.get_peer(peer_id) + + @property + def container_id(self) -> str: + if not self._container: + raise RuntimeError("Node container not started yet") + return self._container.id diff --git a/src/steps/network_conditions.py b/src/steps/network_conditions.py new file mode 100644 index 000000000..3c5db8846 --- /dev/null +++ b/src/steps/network_conditions.py @@ -0,0 +1,58 @@ +import json +from typing import Any +from src.libs.custom_logger import get_custom_logger + +from src.node.api_clients.base_client import BaseClient + +logger = get_custom_logger(__name__) + + +class TrafficController(BaseClient): + def __init__(self, host: str = "127.0.0.1", port: int = 8080): + self._host = host + self._port = port + + def _post(self, path: str, payload: dict[str, Any]) -> dict[str, Any]: + url = f"http://{self._host}:{self._port}/{path.lstrip('/')}" + headers = {"Content-Type": "application/json"} + + logger.info(f"TC request POST {url} payload={payload}") + resp = self.make_request("post", url, headers=headers, data=json.dumps(payload)) + logger.info(f"TC response status={getattr(resp, 'status_code', None)}") + + return resp.json() + + def apply(self, *, node: str, command: str, value: Any = None) -> dict[str, Any]: + return self._post("tc/apply", {"node": node, "command": command, "value": value}) + + def add_latency(self, *, container_id: str, ms: int, jitter_ms: int | None = None) -> dict[str, Any]: + value: dict[str, Any] = {"ms": ms} + if jitter_ms is not None: + value["jitter_ms"] = jitter_ms + + return self.apply( + node=container_id, + command="latency", + value=value, + ) + + def add_packet_loss(self, *, container_id: str, percent: float) -> dict[str, Any]: + return self.apply( + node=container_id, + command="loss", + value={"percent": percent}, + ) + + def add_bandwidth(self, *, container_id: str, rate: str) -> dict[str, Any]: + return self.apply( + node=container_id, + command="bandwidth", + value={"rate": rate}, + ) + + def clear(self, *, container_id: str) -> dict[str, Any]: + return self.apply( + node=container_id, + command="clear", + value=None, + ) diff --git a/tests/e2e/test_network_Conditions.py b/tests/e2e/test_network_Conditions.py new file mode 100644 index 000000000..db264ec71 --- /dev/null +++ b/tests/e2e/test_network_Conditions.py @@ -0,0 +1,53 @@ +import pytest +import logging +from time import time +from src.libs.custom_logger import get_custom_logger +from src.env_vars import NODE_1, NODE_2 +from src.node.waku_node import WakuNode +from src.steps.relay import StepsRelay +from src.libs.common import delay +from src.steps.common import StepsCommon +from src.steps.network_conditions import TrafficController + +logger = get_custom_logger(__name__) + + +class TestNetworkConditions(StepsRelay): + @pytest.fixture(scope="function", autouse=True) + def setup_nodes(self, request): + self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") + self.tc = TrafficController(host="127.0.0.1", port=8080) + + def test_relay_with_latency_between_two_nodes(self): + logger.info("Starting node1 and node2 with relay enabled") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + + logger.info("Subscribing both nodes to relay topic") + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + logger.info("Waiting for autoconnection") + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + logger.info("Applying 500ms latency to node2") + self.tc.add_latency(container_id=self.node2.container_id, ms=500) + + message = self.create_message() + + logger.info("Publishing message from node1") + start = time() + self.node1.send_relay_message(message, self.test_pubsub_topic) + + delay(1) + + logger.info("Fetching relay messages on node2") + messages = self.node2.get_relay_messages(self.test_pubsub_topic) + end = time() + + logger.info("Clearing network conditions on node2") + self.tc.clear(container_id=self.node2.container_id) + + assert messages, "Message was not received under latency" + assert (end - start) >= 0.5