diff --git a/src/steps/network_conditions.py b/src/steps/network_conditions.py index be3094f3d..218424f61 100644 --- a/src/steps/network_conditions.py +++ b/src/steps/network_conditions.py @@ -25,6 +25,23 @@ class TrafficController: if res.returncode != 0: raise RuntimeError(f"TC failed: {' '.join(cmd)}\n" f"stdout: {res.stdout}\n" f"stderr: {res.stderr}") + return res.stdout + + def log_tc_stats(self, node, iface: str = "eth0"): + """ + Log tc statistics for an interface (best-effort). + Useful to confirm netem loss/delay counters (sent/dropped/etc.). + """ + try: + out = self._exec(node, ["-s", "qdisc", "show", "dev", iface], iface=iface) + out = (out or "").strip() + if out: + logger.debug(f"tc -s qdisc show dev {iface}:\n{out}") + else: + logger.debug(f"tc -s qdisc show dev {iface}: (no output)") + except Exception as e: + logger.debug(f"Failed to read tc stats for {iface}: {e}") + def clear(self, node, iface: str = "eth0"): try: self._exec(node, ["qdisc", "del", "dev", iface, "root"], iface=iface) @@ -40,7 +57,24 @@ class TrafficController: def add_packet_loss(self, node, percent: float, iface: str = "eth0"): self.clear(node, iface=iface) - self._exec(node, ["qdisc", "add", "dev", iface, "root", "netem", "loss", f"{percent}%"], iface=iface) + + # Apply loss + self._exec( + node, + ["qdisc", "add", "dev", iface, "root", "netem", "loss", f"{percent}%"], + iface=iface, + ) + try: + stats = self._exec(node, ["-s", "qdisc", "show", "dev", iface], iface=iface) + # _exec might return bytes/str/None depending on your implementation. + if stats is not None: + if isinstance(stats, (bytes, bytearray)): + stats = stats.decode(errors="replace") + logger.debug(f"tc -s qdisc show dev {iface}:\n{stats}") + else: + logger.debug(f"Executed: tc -s qdisc show dev {iface} (no output returned by _exec)") + except Exception as e: + logger.debug(f"Failed to read tc stats for {iface}: {e}") def add_bandwidth(self, node, rate: str, iface: str = "eth0"): self.clear(node, iface=iface) diff --git a/tests/e2e/test_network_Conditions.py b/tests/e2e/test_network_Conditions.py index d1dc601a8..d3a001399 100644 --- a/tests/e2e/test_network_Conditions.py +++ b/tests/e2e/test_network_Conditions.py @@ -8,6 +8,7 @@ from src.steps.relay import StepsRelay from src.libs.common import delay from src.steps.common import StepsCommon from src.steps.network_conditions import TrafficController +from src.libs.common import to_base64 logger = get_custom_logger(__name__) @@ -230,7 +231,7 @@ class TestNetworkConditions(StepsRelay): self.tc.clear(self.node2) @pytest.mark.timeout(60 * 10) - def test_relay_sustained_latency_under_load_sender_burst(self): + def test_latency_with_load_sender_side(self): latency_ms = 3000 total_messages = 50 wait_time = 40.0 @@ -279,7 +280,7 @@ class TestNetworkConditions(StepsRelay): assert received_count >= acceptable_msgs, "relay stalled or dropped all traffic; " self.tc.clear(self.node1) - def test_relay_4_nodes_sender_packet_loss_simple(self): + def test_relay_4_nodes_sender_packet_loss(self): self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") @@ -293,7 +294,7 @@ class TestNetworkConditions(StepsRelay): for n in nodes: n.set_relay_subscriptions([self.test_pubsub_topic]) - self.wait_for_autoconnection(nodes, hard_wait=60) + self.wait_for_autoconnection(nodes, hard_wait=20) loss_percent = 30.0 logger.info(f"Applying {loss_percent}% packet loss on sender node1") @@ -303,8 +304,10 @@ class TestNetworkConditions(StepsRelay): self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + self.tc.log_tc_stats(self.node1) deadline = time() + 10.0 received = False + cnt = 0 while time() < deadline: msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] @@ -312,7 +315,61 @@ class TestNetworkConditions(StepsRelay): received = True break delay(0.2) + cnt = cnt + 1 - assert received, f"node4 did not receive any relay message" + assert received, f"Node4 did not receive any relay message" + logger.info(f"Node4 received messages from node1 after " f"{cnt} trails") self.tc.clear(self.node1) + + def test_relay_4_nodes_sender_packet_loss_delivery_ratio_simple(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + loss_levels = [30.0, 40.0] + total_msgs = 30 + window_s = 10.0 + received = {} + + for loss in loss_levels: + self.tc.clear(self.node1) + self.tc.add_packet_loss(self.node1, percent=loss) + + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + batch_tag = f"loss={loss}-{self.test_id}" + batch_tag_b64 = to_base64(batch_tag) + + for i in range(total_msgs): + payload_b64 = to_base64(f"{batch_tag}-msg-{i}") + msg = self.create_message(payload=payload_b64, meta=batch_tag_b64) + self.node1.send_relay_message(msg, self.test_pubsub_topic) + + delay(window_s) + + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + received_count = sum(1 for m in msgs if m.get("meta") == batch_tag_b64) + + received[loss] = received_count + logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received_count} window={window_s}s") + + self.tc.clear(self.node1) + logger.debug(f"at 50 {received[30.0]}") + logger.debug(f"at 70 {received[40.0]}") + # assert received[50.0] > 25, f"Expected >25 msgs at 50% loss, got {received[50.0]}" + + # assert received[70.0] > 25, f"Expected >25 msgs at 70% loss, got {received[70.0]}" + # assert received[50.0] >= received[70.0], ( + # f"Expected 50% loss >= 70% loss (50%={received[50.0]}, 70%={received[70.0]})" + # )