diff --git a/src/steps/network_conditions.py b/src/steps/network_conditions.py index 8bbf6f301..bfe9ffa39 100644 --- a/src/steps/network_conditions.py +++ b/src/steps/network_conditions.py @@ -133,3 +133,194 @@ class TrafficController: ], iface=iface, ) + + def _setup_prio_root(self, node, iface: str = "eth0"): + self.clear(node, iface=iface) + self._exec( + node, + [ + "qdisc", + "add", + "dev", + iface, + "root", + "handle", + "1:", + "prio", + "bands", + "3", + "priomap", + "1", + "1", + "1", + "1", + "1", + "1", + "1", + "1", + "1", + "1", + "1", + "1", + "1", + "1", + "1", + "1", + ], + iface=iface, + ) + + def _attach_netem_default_band(self, node, netem_args: list[str], iface: str = "eth0"): + self._exec( + node, + ["qdisc", "replace", "dev", iface, "parent", "1:1", "handle", "10:", "netem"] + netem_args, + iface=iface, + ) + + def _exempt_tcp_sport(self, node, port: int, band: int = 2, iface: str = "eth0"): + self._exec( + node, + [ + "filter", + "add", + "dev", + iface, + "protocol", + "ip", + "parent", + "1:", + "prio", + "1", + "u32", + "match", + "ip", + "protocol", + "6", + "0xff", + "match", + "ip", + "sport", + str(int(port)), + "0xffff", + "flowid", + f"1:{int(band)}", + ], + iface=iface, + ) + + def _apply_netem_except_control_plane( + self, + node, + netem_args: list[str], + rest_port: int, + metrics_port: int | None = None, + iface: str = "eth0", + ): + self._setup_prio_root(node, iface=iface) + self._attach_netem_default_band(node, netem_args, iface=iface) + self._exempt_tcp_sport(node, rest_port, band=2, iface=iface) + if metrics_port is not None: + self._exempt_tcp_sport(node, metrics_port, band=2, iface=iface) + self.log_tc_stats(node, iface=iface) + + def add_latency_except_rest( + self, + node, + ms: int, + rest_port: int, + metrics_port: int | None = None, + iface: str = "eth0", + ): + self._apply_netem_except_control_plane( + node, + netem_args=["delay", f"{int(ms)}ms"], + rest_port=rest_port, + metrics_port=metrics_port, + iface=iface, + ) + + def add_packet_loss_except_rest( + self, + node, + percent: float, + rest_port: int, + metrics_port: int | None = None, + iface: str = "eth0", + ): + self._apply_netem_except_control_plane( + node, + netem_args=["loss", f"{float(percent)}%"], + rest_port=rest_port, + metrics_port=metrics_port, + iface=iface, + ) + + def add_packet_loss_correlated_except_rest( + self, + node, + percent: float, + correlation: float, + rest_port: int, + metrics_port: int | None = None, + iface: str = "eth0", + ): + self._apply_netem_except_control_plane( + node, + netem_args=["loss", f"{float(percent)}%", f"{float(correlation)}%"], + rest_port=rest_port, + metrics_port=metrics_port, + iface=iface, + ) + + def add_packet_reordering_except_rest( + self, + node, + rest_port: int, + metrics_port: int | None = None, + percent: int = 25, + correlation: int = 50, + delay_ms: int = 10, + iface: str = "eth0", + ): + self._apply_netem_except_control_plane( + node, + netem_args=["delay", f"{int(delay_ms)}ms", "reorder", f"{int(percent)}%", f"{int(correlation)}%"], + rest_port=rest_port, + metrics_port=metrics_port, + iface=iface, + ) + + def add_bandwidth_except_rest( + self, + node, + rate: str, + rest_port: int, + metrics_port: int | None = None, + iface: str = "eth0", + ): + self._setup_prio_root(node, iface=iface) + self._exec( + node, + [ + "qdisc", + "replace", + "dev", + iface, + "parent", + "1:1", + "handle", + "20:", + "tbf", + "rate", + rate, + "burst", + "32kbit", + "limit", + "12500", + ], + iface=iface, + ) + self._exempt_tcp_sport(node, rest_port, band=2, iface=iface) + if metrics_port is not None: + self._exempt_tcp_sport(node, metrics_port, band=2, iface=iface) + self.log_tc_stats(node, iface=iface) diff --git a/tests/e2e/test_network_conditions.py b/tests/e2e/test_network_conditions.py index 54c909a64..a52af6f58 100644 --- a/tests/e2e/test_network_conditions.py +++ b/tests/e2e/test_network_conditions.py @@ -770,3 +770,51 @@ class TestNetworkConditions(StepsRelay): logger.info(f"{len(msgs)} messages were delivered") self.tc.clear(self.node1) self.tc.clear(self.node2) + + @pytest.mark.timeout(60) + def test_relay_with_latency_except_rest(self): + logger.info("Starting node1 and node2 with relay enabled") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + + logger.info("Subscribing both nodes to relay topic") + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + logger.info("Waiting for autoconnection") + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + latency_ms = 9000 + + logger.info(f"Applying latency to node1 except REST") + self.tc.add_latency_except_rest( + self.node1, + ms=latency_ms, + rest_port=self.node1.start_args["rest-port"], + metrics_port=self.node1.start_args.get("metrics-server-port"), + ) + + _ = self.node2.get_relay_messages(self.test_pubsub_topic) + + logger.info("Publishing message from node1") + t0 = time() + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + self.tc.log_tc_stats(self.node1) + deadline = t0 + 4.0 + msg_received = False + arrival_time = 0 + while time() < deadline: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + if msgs: + arrival_time = time() - t0 + msg_received = True + break + delay(0.2) + + assert msg_received, "node2 did not receive message under latency" + + expected_s = latency_ms / 1000.0 + assert arrival_time >= expected_s - 0.5, f"Expected >= {expected_s - 0.5}s, got {arrival_time}s" + logger.info(f" arrival delay: {arrival_time:.2f}s") + + self.tc.clear(self.node2)