From c92416f242c7c23f069a995e249c0a74987eb8cd Mon Sep 17 00:00:00 2001
From: Aya Hassan
Date: Wed, 7 Jan 2026 21:34:36 +0100
Subject: [PATCH] Adding more latency tests

---
 src/steps/network_conditions.py      |  85 ++++++--------
 tests/e2e/test_network_Conditions.py | 170 ++++++++++++++++++++++++---
 2 files changed, 194 insertions(+), 61 deletions(-)

diff --git a/src/steps/network_conditions.py b/src/steps/network_conditions.py
index 3c5db8846..0a8a7bc1b 100644
--- a/src/steps/network_conditions.py
+++ b/src/steps/network_conditions.py
@@ -1,58 +1,51 @@
-import json
-from typing import Any
+import subprocess
 
 from src.libs.custom_logger import get_custom_logger
-from src.node.api_clients.base_client import BaseClient
 
 logger = get_custom_logger(__name__)
 
-class TrafficController(BaseClient):
-    def __init__(self, host: str = "127.0.0.1", port: int = 8080):
-        self._host = host
-        self._port = port
-
-    def _post(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
-        url = f"http://{self._host}:{self._port}/{path.lstrip('/')}"
-        headers = {"Content-Type": "application/json"}
-
-        logger.info(f"TC request POST {url} payload={payload}")
-        resp = self.make_request("post", url, headers=headers, data=json.dumps(payload))
-        logger.info(f"TC response status={getattr(resp, 'status_code', None)}")
-
-        return resp.json()
-
-    def apply(self, *, node: str, command: str, value: Any = None) -> dict[str, Any]:
-        return self._post("tc/apply", {"node": node, "command": command, "value": value})
-
-    def add_latency(self, *, container_id: str, ms: int, jitter_ms: int | None = None) -> dict[str, Any]:
-        value: dict[str, Any] = {"ms": ms}
-        if jitter_ms is not None:
-            value["jitter_ms"] = jitter_ms
-
-        return self.apply(
-            node=container_id,
-            command="latency",
-            value=value,
-        )
-
-    def add_packet_loss(self, *, container_id: str, percent: float) -> dict[str, Any]:
-        return self.apply(
-            node=container_id,
-            command="loss",
-            value={"percent": percent},
-        )
-
-    def add_bandwidth(self, *, container_id: str, rate: str) -> dict[str, Any]:
-        return self.apply(
-            node=container_id,
-            command="bandwidth",
-            value={"rate": rate},
-        )
-
-    def clear(self, *, container_id: str) -> dict[str, Any]:
-        return self.apply(
-            node=container_id,
-            command="clear",
-            value=None,
-        )
+class TrafficController:
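+    """Shape traffic for a dockerized node by running `tc` inside the
+    container's network namespace (via nsenter), instead of going through
+    the old REST traffic-control service."""
+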
+    def _pid(self, node) -> int:
+        if not node.container:
+            raise RuntimeError("Node container not started yet")
+
+        node.container.reload()
+        pid = node.container.attrs.get("State", {}).get("Pid")
+        if not pid or pid == 0:
+            raise RuntimeError("Container PID not available (container not running?)")
+        return int(pid)
+
+    def _exec(self, node, tc_args: list[str], iface: str = "eth0"):
+        pid = self._pid(node)
+
+        # Enter the container's network namespace and run tc there;
+        # sudo -n fails fast if passwordless sudo is not configured.
+        cmd = ["sudo", "-n", "nsenter", "-t", str(pid), "-n", "tc"] + tc_args
+        logger.info(f"TC exec: {cmd}")
+
+        res = subprocess.run(cmd, capture_output=True, text=True)
+        if res.returncode != 0:
+            raise RuntimeError(f"TC failed: {' '.join(cmd)}\n" f"stdout: {res.stdout}\n" f"stderr: {res.stderr}")
+
+    def clear(self, node, iface: str = "eth0"):
+        try:
+            self._exec(node, ["qdisc", "del", "dev", iface, "root"], iface=iface)
+        except RuntimeError as e:
+            # Deleting a qdisc that was never added is not an error
+            msg = str(e)
+            if "Cannot delete qdisc with handle of zero" in msg or "No such file or directory" in msg:
+                return
+            raise
+
+    def add_latency(self, node, ms: int, iface: str = "eth0"):
+        self.clear(node, iface=iface)
+        self._exec(node, ["qdisc", "add", "dev", iface, "root", "netem", "delay", f"{ms}ms"], iface=iface)
+
+    def add_packet_loss(self, node, percent: float, iface: str = "eth0"):
+        self.clear(node, iface=iface)
+        self._exec(node, ["qdisc", "add", "dev", iface, "root", "netem", "loss", f"{percent}%"], iface=iface)
+
+    def add_bandwidth(self, node, rate: str, iface: str = "eth0"):
+        self.clear(node, iface=iface)
+        self._exec(
+            node,
+            ["qdisc", "add", "dev", iface, "root", "tbf", "rate", rate, "burst", "32kbit", "limit", "12500"],
+            iface=iface,
+        )
diff --git a/tests/e2e/test_network_Conditions.py b/tests/e2e/test_network_Conditions.py
index db264ec71..bdf0ca733 100644
--- a/tests/e2e/test_network_Conditions.py
+++ b/tests/e2e/test_network_Conditions.py
@@ -17,7 +17,7 @@ class TestNetworkConditions(StepsRelay):
     def setup_nodes(self, request):
         self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
         self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
-        self.tc = TrafficController(host="127.0.0.1", port=8080)
+        self.tc = TrafficController()
 
     def test_relay_with_latency_between_two_nodes(self):
         logger.info("Starting node1 and node2 with relay enabled")
@@ -31,23 +31,163 @@ class TestNetworkConditions(StepsRelay):
         logger.info("Waiting for autoconnection")
         self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
 
-        logger.info("Applying 500ms latency to node2")
-        self.tc.add_latency(container_id=self.node2.container_id, ms=500)
-
+        logger.debug("Applying 5s latency to node2")
+        self.tc.add_latency(self.node2, ms=5000)
         message = self.create_message()
-        logger.info("Publishing message from node1")
-        start = time()
+        logger.debug("Publishing message from node1")
         self.node1.send_relay_message(message, self.test_pubsub_topic)
-
-        delay(1)
-
-        logger.info("Fetching relay messages on node2")
+        logger.debug("Fetching relay messages on node2")
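+        # netem shapes egress only, so it is node2's HTTP response (not the
+        # inbound GET request) that carries the ~5s delay asserted below.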
+        t0 = time()
         messages = self.node2.get_relay_messages(self.test_pubsub_topic)
-        end = time()
+        dt = time() - t0
+        assert messages, "Message was not received under latency"
+        assert dt >= 4.5, f"Expected slow GET due to latency, got {dt:.2f}s"
 
-        logger.info("Clearing network conditions on node2")
-        self.tc.clear(container_id=self.node2.container_id)
+        self.tc.clear(self.node2)
 
-        assert messages, "Message was not received under latency"
-        assert (end - start) >= 0.5
+    @pytest.mark.timeout(60 * 8)
+    def test_relay_7_nodes_3sec_latency(self):
+        self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
+        self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
+        self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
+        self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
+        self.node5 = WakuNode(NODE_2, f"node5_{self.test_id}")
+        self.node6 = WakuNode(NODE_2, f"node6_{self.test_id}")
+        self.node7 = WakuNode(NODE_2, f"node7_{self.test_id}")
+
+        nodes = [self.node1, self.node2, self.node3, self.node4, self.node5, self.node6, self.node7]
+
+        logger.info("Starting nodes with chain bootstrap")
+        self.node1.start(relay="true")
+        self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
+        self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
+        self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
+        self.node5.start(relay="true", discv5_bootstrap_node=self.node4.get_enr_uri())
+        self.node6.start(relay="true", discv5_bootstrap_node=self.node5.get_enr_uri())
+        self.node7.start(relay="true", discv5_bootstrap_node=self.node6.get_enr_uri())
+
+        logger.info("Subscribing all nodes to relay topic")
+        for node in nodes:
+            node.set_relay_subscriptions([self.test_pubsub_topic])
+
+        logger.info("Waiting for autoconnection")
+        self.wait_for_autoconnection(nodes, hard_wait=60)
+
+        logger.info("Applying 3s latency to node3")
+        self.tc.add_latency(self.node3, ms=3000)
+
+        # Only the REST round-trip is timed here; node3's delayed egress
+        # slows its HTTP response regardless of relay traffic.
+        t_start = time()
+        _ = self.node3.get_relay_messages(self.test_pubsub_topic)
+        elapsed = time() - t_start
+
+        logger.info(f"Observed GET latency on node3: {elapsed:.2f}s")
+        assert elapsed >= 2.8, f"Expected ~3s latency on node3 GET, got {elapsed:.2f}s"
+
+        self.tc.clear(self.node3)
+
+    @pytest.mark.timeout(60 * 6)
+    def test_relay_4_nodes_sender_latency(self):
+        self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
+        self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
+
+        logger.info("Starting 4 nodes with relay enabled (bootstrap chain)")
+        self.node1.start(relay="true")
+        self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
+        self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
+        self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
+
+        nodes = [self.node1, self.node2, self.node3, self.node4]
+
+        for n in nodes:
+            n.set_relay_subscriptions([self.test_pubsub_topic])
+
+        self.wait_for_autoconnection(nodes, hard_wait=60)
+
+        latency_ms = 3000
+        logger.info(f"Applying {latency_ms}ms latency on sender node1")
+        self.tc.add_latency(self.node1, ms=latency_ms)
+
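+        # The publish goes through node1's REST API; with egress netem on
+        # node1, the POST response should absorb roughly the full delay.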
+        t_pub0 = time()
+        self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
+        publish_dt = time() - t_pub0
+
+        assert publish_dt > (latency_ms / 1000.0) - 0.4, f"Expected publish to be slowed by sender latency, got {publish_dt:.2f}s"
+        assert publish_dt <= (latency_ms / 1000.0) + 0.4, f"Publish took too long: {publish_dt:.2f}s"
+
+        # Poll the receiver until the message arrives or the deadline passes
+        deadline = t_pub0 + 5.0
+        received = False
+
+        while time() < deadline:
+            msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
+            if msgs:
+                received = True
+                break
+            delay(0.2)
+
+        assert received, "node4 did not receive any relay message within 5s of publish"
+
+        self.tc.clear(self.node1)
+
+    @pytest.mark.timeout(60 * 8)
+    def test_relay_4_nodes_two_publishers_compare_latency(self):
+        self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
+        self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
+
+        logger.info("Starting 4 nodes with relay enabled (bootstrap chain)")
+        self.node1.start(relay="true")
+        self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
+        self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
+        self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
+
+        nodes = [self.node1, self.node2, self.node3, self.node4]
+
+        for n in nodes:
+            n.set_relay_subscriptions([self.test_pubsub_topic])
+
+        self.wait_for_autoconnection(nodes, hard_wait=60)
+
+        latency_ms = 3000
+        logger.info(f"Applying {latency_ms}ms latency on node1 only")
+        self.tc.add_latency(self.node1, ms=latency_ms)
+
+        node1_dts = []
+        node2_dts = []
+        # Drain any cached messages on node4 before measuring
+        _ = self.node4.get_relay_messages(self.test_pubsub_topic)
+
+        for i in range(5):
+            t0 = time()
+            self.node1.send_relay_message(
+                self.create_message(payload=to_base64(f"n1_{self.test_id}_{i}")),
+                self.test_pubsub_topic,
+            )
+            node1_dts.append(time() - t0)
+
+            t0 = time()
+            self.node2.send_relay_message(
+                self.create_message(payload=to_base64(f"n2_{self.test_id}_{i}")),
+                self.test_pubsub_topic,
+            )
+            node2_dts.append(time() - t0)
+
+            delay(0.2)
+
+        for dt in node1_dts:
+            assert dt > (latency_ms / 1000.0) - 0.4, f"Expected node1 publish to be slowed by latency, got {dt:.2f}s"
+            assert dt <= (latency_ms / 1000.0) + 0.4, f"node1 publish took too long: {dt:.2f}s"
+
+        for dt in node2_dts:
+            assert dt < 1.0, f"Expected node2 publish to be fast (baseline), got {dt:.2f}s"
+
+        deadline = time() + 10.0
+        received = False
+        while time() < deadline:
+            msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
+            if msgs:
+                received = True
+                break
+            delay(0.2)
+
+        assert received, "node4 did not receive any relay message within 10s"
+
+        self.tc.clear(self.node1)