diff --git a/src/steps/network_conditions.py b/src/steps/network_conditions.py index 49ba36083..63bba9e05 100644 --- a/src/steps/network_conditions.py +++ b/src/steps/network_conditions.py @@ -105,3 +105,29 @@ class TrafficController: ], iface=iface, ) + + def add_packet_reordering(self, node, percent: int = 25, correlation: int = 50, delay_ms: int = 10): + self._exec( + ["sudo", "-n", "nsenter", "-t", "-n", "tc", "qdisc", "del", "dev", "eth0", "root"], + ) + self._exec( + [ + "sudo", + "-n", + "nsenter", + "-t", + "-n", + "tc", + "qdisc", + "add", + "dev", + "eth0", + "root", + "netem", + "delay", + f"{delay_ms}ms", + "reorder", + f"{percent}%", + f"{correlation}%", + ] + ) diff --git a/tests/e2e/test_network_conditions.py b/tests/e2e/test_network_conditions.py index 0d620ba2f..e622807ce 100644 --- a/tests/e2e/test_network_conditions.py +++ b/tests/e2e/test_network_conditions.py @@ -618,3 +618,105 @@ class TestNetworkConditions(StepsRelay): ) assert received >= min_received, f"received {received}/{msg_count} under low bandwidth" + + @pytest.mark.timeout(60 * 8) + def test_relay_2_nodes_bandwidth_low_vs_high_drain_time(self): + msg_count = 200 + cache = "250" + poll_sleep = 0.5 + max_wait = 200 + + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + + self.node1.start(relay="true", rest_relay_cache_capacity=cache) + self.node2.start( + relay="true", + discv5_bootstrap_node=self.node1.get_enr_uri(), + rest_relay_cache_capacity=cache, + ) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + self.tc.add_bandwidth(self.node1, rate="256kbit") + + _ = self.node2.get_relay_messages(self.test_pubsub_topic) + + for _ in range(msg_count): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + total_low_msgs = 0 + t0 = time() + while total_low_msgs < msg_count and (time() - t0) < max_wait: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + total_low_msgs += len(msgs) + sleep(poll_sleep) + low_rate_t = time() - t0 + + self.tc.add_bandwidth(self.node1, rate="10mbit") + + _ = self.node2.get_relay_messages(self.test_pubsub_topic) + + for _ in range(msg_count): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + total_high_msgs = 0 + t1 = time() + while total_high_msgs < msg_count and (time() - t1) < max_wait: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + total_high_msgs += len(msgs) + sleep(poll_sleep) + high_rate_t = time() - t1 + + logger.info( + f"low_rate_t={low_rate_t:.2f}s high_rate_t={high_rate_t:.2f}s " + f"total_low_msgs={total_low_msgs} total_high_msgs={total_high_msgs} " + f"msg_count={msg_count} cache={cache}" + ) + + assert total_low_msgs >= msg_count + assert total_high_msgs >= msg_count + assert high_rate_t < low_rate_t + + @pytest.mark.timeout(60 * 6) + def test_relay_2_nodes_packet_reordering(self): + msg_count = 100 + cache_capacity = "200" + poll_sleep = 0.5 + max_wait = 120 + + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start( + relay="true", + discv5_bootstrap_node=self.node1.get_enr_uri(), + rest_relay_cache_capacity=cache_capacity, + ) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + self.tc.add_packet_reordering(self.node2, percent=25, correlation=50) + + for _ in range(msg_count): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + received = 0 + t0 = time() + while received < msg_count and time() - t0 < max_wait: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + received += len(msgs) + sleep(poll_sleep) + + elapsed = time() - t0 + + logger.info(f"packet_reordering " f"reorder=25% corr=50% " f"msg_count={msg_count} received={received} " f"elapsed={elapsed:.2f}s") + + assert received >= msg_count