diff --git a/src/steps/network_conditions.py b/src/steps/network_conditions.py
index 49ba36083..8bbf6f301 100644
--- a/src/steps/network_conditions.py
+++ b/src/steps/network_conditions.py
@@ -105,3 +105,31 @@ class TrafficController:
             ],
             iface=iface,
         )
+
+    def add_packet_reordering(
+        self,
+        node,
+        percent: int = 25,
+        correlation: int = 50,
+        delay_ms: int = 10,
+        iface: str = "eth0",
+    ):
+        self.clear(node, iface=iface)
+
+        self._exec(
+            node,
+            [
+                "qdisc",
+                "add",
+                "dev",
+                iface,
+                "root",
+                "netem",
+                "delay",
+                f"{delay_ms}ms",
+                "reorder",
+                f"{percent}%",
+                f"{correlation}%",
+            ],
+            iface=iface,
+        )
diff --git a/tests/e2e/test_network_conditions.py b/tests/e2e/test_network_conditions.py
index 6ee69e3ce..54c909a64 100644
--- a/tests/e2e/test_network_conditions.py
+++ b/tests/e2e/test_network_conditions.py
@@ -1,6 +1,6 @@
 import pytest
 import logging
-from time import time
+from time import time, sleep
 from src.libs.custom_logger import get_custom_logger
 from src.env_vars import NODE_1, NODE_2
 from src.node.waku_node import WakuNode
@@ -322,7 +322,7 @@ class TestNetworkConditions(StepsRelay):
 
         self.tc.clear(self.node1)
 
-    @pytest.mark.xfail(reason="Fails under high packet loss percentage 60")
+    @pytest.mark.xfail(reason="Fails under high packet loss percentage 50")
     def test_relay_4_nodes_sender_packet_loss_50_15sec_timeout(self):
         self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
         self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
@@ -423,8 +423,8 @@ class TestNetworkConditions(StepsRelay):
         self.wait_for_autoconnection(nodes, hard_wait=20)
 
         total_msgs = 5
-        window_s = 70.0
-        loss = 40.0
+        window_s = 30.0
+        loss = 50.0
 
         self.tc.add_packet_loss(self.node1, percent=loss)
         _ = self.node4.get_relay_messages(self.test_pubsub_topic)
@@ -446,6 +446,7 @@ class TestNetworkConditions(StepsRelay):
         correlated = len(self.node4.get_relay_messages(self.test_pubsub_topic) or [])
 
         self.tc.clear(self.node1)
+        logger.debug(f"uncorrelated={uncorrelated} correlated={correlated}")
 
         assert uncorrelated >= correlated
         assert correlated > 0
@@ -525,3 +526,247 @@ class TestNetworkConditions(StepsRelay):
         self.tc.clear(self.node1)
 
         assert received > 0
+
+    @pytest.mark.timeout(60 * 2)
+    def test_relay_2_nodes_low_bandwidth_reliability(self):
+        msg_count = 200
+        max_wait = 50
+        poll_sleep = 0.5
+
+        self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
+        self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
+
+        self.node1.start(relay="true")
+        self.node2.start(
+            relay="true",
+            discv5_bootstrap_node=self.node1.get_enr_uri(),
+            rest_relay_cache_capacity="250",
+        )
+
+        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
+        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
+
+        self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
+
+        self.tc.add_bandwidth(self.node1, rate="256kbit")
+
+        for _ in range(msg_count):
+            self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
+
+        t0 = time()
+        received = 0
+        while time() - t0 < max_wait:
+            msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
+            received += len(msgs)
+            if received >= msg_count:
+                break
+            sleep(poll_sleep)
+
+        elapsed = time() - t0
+        logger.info(f"low_bw_elapsed={elapsed:.2f}s received={received} msg_count={msg_count}")
+
+        assert received >= msg_count
+
+    @pytest.mark.timeout(60 * 8)
+    def test_relay_2_nodes_low_bandwidth_sending_over_time(self):
+        msg_count = 200
+        cache_capacity = "250"
+
+        send_interval_s = 0.05
+        poll_interval_s = 0.5
+        max_wait_s = 240
+        min_received = 150
+
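+        # 200 messages sent 0.05 s apart take ~10 s to publish; under the 256kbit
+        # shaping applied below, at least min_received (150/200, i.e. 75%) must be
+        # drained from node2 within the max_wait_s polling window.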
+        self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
+        self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
+
+        self.node1.start(relay="true")
+        self.node2.start(
+            relay="true",
+            discv5_bootstrap_node=self.node1.get_enr_uri(),
+            rest_relay_cache_capacity=cache_capacity,
+        )
+
+        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
+        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
+
+        self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
+
+        # mirror the suite's low-bandwidth shaping: rate is set here, burst/limit come from the tc wrapper defaults
+        self.tc.add_bandwidth(self.node1, rate="256kbit")
+
+        received = 0
+        for _ in range(msg_count):
+            self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
+            sleep(send_interval_s)
+
+        t0 = time()
+        while time() - t0 < max_wait_s and received < msg_count:
+            msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
+            if msgs:
+                received += len(msgs)
+            if received >= msg_count:
+                break
+            sleep(poll_interval_s)
+
+        total_time = time() - t0
+
+        logger.info(
+            "low_bw_reliability "
+            f"rate=256kbit msg_count={msg_count} cache_capacity={cache_capacity} "
+            f"send_interval_s={send_interval_s} poll_interval_s={poll_interval_s} "
+            f"recv_duration={total_time:.2f}s"
+        )
+
+        assert received >= min_received, f"received {received}/{msg_count} under low bandwidth"
+
+    @pytest.mark.timeout(60 * 8)
+    def test_relay_2_nodes_bandwidth_low_vs_high_drain_time(self):
+        msg_count = 200
+        cache = "250"
+        poll_sleep = 0.5
+        max_wait = 200
+
+        self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
+        self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
+
+        self.node1.start(relay="true", rest_relay_cache_capacity=cache)
+        self.node2.start(
+            relay="true",
+            discv5_bootstrap_node=self.node1.get_enr_uri(),
+            rest_relay_cache_capacity=cache,
+        )
+
+        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
+        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
+
+        self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
+
+        self.tc.add_bandwidth(self.node1, rate="256kbit")
+
+        _ = self.node2.get_relay_messages(self.test_pubsub_topic)
+
+        for _ in range(msg_count):
+            self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
+
+        total_low_msgs = 0
+        t0 = time()
+        while total_low_msgs < msg_count and (time() - t0) < max_wait:
+            msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
+            total_low_msgs += len(msgs)
+            sleep(poll_sleep)
+        low_rate_t = time() - t0
+
+        self.tc.add_bandwidth(self.node1, rate="10mbit")
+
+        _ = self.node2.get_relay_messages(self.test_pubsub_topic)
+
+        for _ in range(msg_count):
+            self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
+
+        total_high_msgs = 0
+        t1 = time()
+        while total_high_msgs < msg_count and (time() - t1) < max_wait:
+            msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
+            total_high_msgs += len(msgs)
+            sleep(poll_sleep)
+        high_rate_t = time() - t1
+
+        logger.info(
+            f"low_rate_t={low_rate_t:.2f}s high_rate_t={high_rate_t:.2f}s "
+            f"total_low_msgs={total_low_msgs} total_high_msgs={total_high_msgs} "
+            f"msg_count={msg_count} cache={cache}"
+        )
+
+        assert total_low_msgs >= msg_count
+        assert total_high_msgs >= msg_count
+        assert high_rate_t < low_rate_t
+
+    @pytest.mark.timeout(60 * 6)
+    def test_relay_2_nodes_packet_reordering(self):
+        msg_count = 200
+        cache_capacity = "200"
+        poll_sleep = 0.5
+        max_wait = 120
+
+        self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
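+        # The new add_packet_reordering helper is applied to the receiving side
+        # (node2) below; messages may arrive late or out of order, but all
+        # msg_count of them are still expected within max_wait.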
+        self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
+
+        self.node1.start(relay="true")
+        self.node2.start(
+            relay="true",
+            discv5_bootstrap_node=self.node1.get_enr_uri(),
+            rest_relay_cache_capacity=cache_capacity,
+        )
+
+        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
+        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
+
+        self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
+
+        self.tc.add_packet_reordering(self.node2, percent=25, correlation=50)
+
+        for _ in range(msg_count):
+            self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
+
+        received = 0
+        t0 = time()
+        while received < msg_count and time() - t0 < max_wait:
+            msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
+            received += len(msgs)
+            sleep(poll_sleep)
+
+        elapsed = time() - t0
+
+        logger.info(
+            "packet_reordering reorder=25% corr=50% "
+            f"msg_count={msg_count} received={received} elapsed={elapsed:.2f}s"
+        )
+
+        assert received >= msg_count
+
+    @pytest.mark.timeout(60 * 6)
+    def test_relay_2_nodes_temporary_blackout_recovers_no_helpers(self):
+        msgs_count = 100
+        self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
+        self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
+        self.tc = TrafficController()
+
+        logger.info("Starting node1 and node2 with relay enabled")
+        self.node1.start(relay="true")
+        self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
+
+        logger.info("Subscribing both nodes to relay topic")
+        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
+        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
+
+        logger.info("Waiting for autoconnection")
+        self.wait_for_autoconnection([self.node1, self.node2], hard_wait=15)
+        logger.info("Applying 100% packet loss on both nodes")
+        self.tc.clear(self.node1)
+        self.tc.clear(self.node2)
+        self.tc.add_packet_loss(self.node1, percent=100.0)
+        self.tc.add_packet_loss(self.node2, percent=100.0)
+
+        sleep(5)
+        logger.info("Clearing tc rules (restore connectivity)")
+        self.tc.clear(self.node1)
+        self.tc.clear(self.node2)
+
+        logger.info("Waiting for peer list recovery on both nodes")
+        peers1 = []
+        peers2 = []
+        recovery_deadline = time() + 30.0
+        while time() < recovery_deadline:
+            peers1 = self.node1.get_peers() or []
+            peers2 = self.node2.get_peers() or []
+            if len(peers1) > 0 and len(peers2) > 0:
+                break
+            sleep(0.5)
+
+        assert len(peers1) > 0 and len(peers2) > 0, "Peers did not recover after blackout (would require restart)"
+
+        logger.info("Publishing after recovery")
+        for _ in range(msgs_count):
+            self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
+        sleep(5)
+        msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
+        assert len(msgs) >= msgs_count - 10, "Post-recovery messages were not delivered"
+        logger.info(f"{len(msgs)} messages were delivered")
+        self.tc.clear(self.node1)
+        self.tc.clear(self.node2)
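Note (not part of the diff): assuming TrafficController._exec shells out to tc with the given argument list, the add_packet_reordering defaults translate to `tc qdisc add dev eth0 root netem delay 10ms reorder 25% 50%`, i.e. 25% of packets (with 50% correlation) are forwarded immediately while the rest are held back for 10 ms. The new tests also repeat the same poll-until-count receive loop; a minimal sketch of a shared helper for a follow-up, reusing the module's time/sleep imports (the name drain_until is illustrative, not something this diff adds):

    def drain_until(node, topic, expected, max_wait_s, poll_s=0.5):
        # Poll the relay cache, accumulating each drained batch until the expected
        # count is reached or the wait budget runs out; returns (received, elapsed).
        received = 0
        t0 = time()
        while received < expected and time() - t0 < max_wait_s:
            received += len(node.get_relay_messages(topic) or [])
            sleep(poll_s)
        return received, time() - t0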