diff --git a/src/steps/network_conditions.py b/src/steps/network_conditions.py index 218424f61..cb88fa7ba 100644 --- a/src/steps/network_conditions.py +++ b/src/steps/network_conditions.py @@ -58,7 +58,6 @@ class TrafficController: def add_packet_loss(self, node, percent: float, iface: str = "eth0"): self.clear(node, iface=iface) - # Apply loss self._exec( node, ["qdisc", "add", "dev", iface, "root", "netem", "loss", f"{percent}%"], @@ -66,7 +65,6 @@ class TrafficController: ) try: stats = self._exec(node, ["-s", "qdisc", "show", "dev", iface], iface=iface) - # _exec might return bytes/str/None depending on your implementation. if stats is not None: if isinstance(stats, (bytes, bytearray)): stats = stats.decode(errors="replace") diff --git a/tests/e2e/test_network_Conditions.py b/tests/e2e/test_network_Conditions.py index d3a001399..9ce633adc 100644 --- a/tests/e2e/test_network_Conditions.py +++ b/tests/e2e/test_network_Conditions.py @@ -322,7 +322,7 @@ class TestNetworkConditions(StepsRelay): self.tc.clear(self.node1) - def test_relay_4_nodes_sender_packet_loss_delivery_ratio_simple(self): + def test_relay_4_nodes_sender_packet_loss_50(self): self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") @@ -337,39 +337,184 @@ class TestNetworkConditions(StepsRelay): self.wait_for_autoconnection(nodes, hard_wait=20) - loss_levels = [30.0, 40.0] + loss = 50.0 total_msgs = 30 - window_s = 10.0 - received = {} - - for loss in loss_levels: - self.tc.clear(self.node1) - self.tc.add_packet_loss(self.node1, percent=loss) - - _ = self.node4.get_relay_messages(self.test_pubsub_topic) - - batch_tag = f"loss={loss}-{self.test_id}" - batch_tag_b64 = to_base64(batch_tag) - - for i in range(total_msgs): - payload_b64 = to_base64(f"{batch_tag}-msg-{i}") - msg = self.create_message(payload=payload_b64, meta=batch_tag_b64) - self.node1.send_relay_message(msg, self.test_pubsub_topic) - - delay(window_s) - - msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] - received_count = sum(1 for m in msgs if m.get("meta") == batch_tag_b64) - - received[loss] = received_count - logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received_count} window={window_s}s") + window_s = 15.0 self.tc.clear(self.node1) - logger.debug(f"at 50 {received[30.0]}") - logger.debug(f"at 70 {received[40.0]}") - # assert received[50.0] > 25, f"Expected >25 msgs at 50% loss, got {received[50.0]}" + self.tc.add_packet_loss(self.node1, percent=loss) - # assert received[70.0] > 25, f"Expected >25 msgs at 70% loss, got {received[70.0]}" - # assert received[50.0] >= received[70.0], ( - # f"Expected 50% loss >= 70% loss (50%={received[50.0]}, 70%={received[70.0]})" - # ) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + batch_tag = f"loss={loss}-{self.test_id}" + batch_tag_b64 = to_base64(batch_tag) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.log_tc_stats(self.node1) + delay(window_s) + + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + received = len(msgs) + + logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received} window={window_s}s") + + self.tc.clear(self.node1) + + assert received > int(total_msgs * 0.8), "No messages received under 50% packet loss" + + @pytest.mark.timeout(60 * 10) + @pytest.mark.parametrize("loss", [50.0, 60.0, 70], ids=["loss50", "loss60", "loss70"]) + def test_relay_4_nodes_sender_packet_loss_delivery_ratio_simple(self, loss): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + total_msgs = 30 + window_s = 30.0 + + self.tc.clear(self.node1) + self.tc.add_packet_loss(self.node1, percent=loss) + + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.log_tc_stats(self.node1) + delay(window_s) + + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + received = len(msgs) + + logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received} window={window_s}s") + + self.tc.clear(self.node1) + + assert received > int(total_msgs * 0.8), f"No messages received at {loss}% packet loss" + + @pytest.mark.timeout(60 * 10) + def test_relay_packet_loss_correlated_vs_uncorrelated(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + total_msgs = 30 + window_s = 30.0 + + self.tc.add_packet_loss(self.node1, percent=30.0) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + uncorrelated = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node1) + + self.tc.add_packet_loss_correlated(self.node1, percent=30.0, correlation=75.0) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + correlated = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node1) + + assert uncorrelated >= correlated + + @pytest.mark.timeout(60 * 10) + def test_relay_packet_loss_sender_vs_receiver_egress(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + total_msgs = 30 + window_s = 30.0 + + self.tc.add_packet_loss_egress(self.node1, percent=50.0) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + sender_loss = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node1) + + self.tc.add_packet_loss_egress(self.node4, percent=50.0) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + receiver_loss = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node4) + + assert sender_loss <= receiver_loss + + @pytest.mark.timeout(60 * 10) + def test_relay_packet_loss_applied_mid_stream(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + window_s = 30.0 + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(10): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.add_packet_loss(self.node1, percent=70.0) + + for _ in range(20): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + received = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node1) + + assert received > 0