diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index f9a73a9d..8a33932d 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -77,6 +77,16 @@ jobs: python-version: '3.12' cache: 'pip' + - name: Install system deps for tc / nsenter + run: | + sudo apt-get update + sudo apt-get install -y \ + util-linux \ + iproute2 \ + sudo \ + ca-certificates \ + curl + - run: pip install -r requirements.txt - name: Run tests diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 9a8d2e35..1a96d061 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -577,3 +577,9 @@ class WakuNode: def get_peer_info(self, peer_id: str): return self._api.get_peer(peer_id) + + @property + def container_id(self) -> str: + if not self._container: + raise RuntimeError("Node container not started yet") + return self._container.id diff --git a/src/steps/network_conditions.py b/src/steps/network_conditions.py new file mode 100644 index 00000000..49ba3608 --- /dev/null +++ b/src/steps/network_conditions.py @@ -0,0 +1,107 @@ +import subprocess +from src.libs.custom_logger import get_custom_logger + +logger = get_custom_logger(__name__) + + +class TrafficController: + def _pid(self, node) -> int: + if not node.container: + raise RuntimeError("Node container not started yet") + + node.container.reload() + pid = node.container.attrs.get("State", {}).get("Pid") + if not pid or pid == 0: + raise RuntimeError("Container PID not available (container not running?)") + return int(pid) + + def _exec(self, node, tc_args: list[str], iface: str = "eth0"): + pid = self._pid(node) + + cmd = ["sudo", "-n", "nsenter", "-t", str(pid), "-n", "tc"] + tc_args + logger.info(f"TC exec: {cmd}") + + res = subprocess.run(cmd, capture_output=True, text=True) + if res.returncode != 0: + raise RuntimeError(f"TC failed: {' '.join(cmd)}\n" f"stdout: {res.stdout}\n" f"stderr: {res.stderr}") + + return res.stdout + + def log_tc_stats(self, node, iface: str = "eth0"): + """ + Log tc statistics for an interface (best-effort). + Useful to confirm netem loss/delay counters (sent/dropped/etc.). + """ + try: + out = self._exec(node, ["-s", "qdisc", "show", "dev", iface], iface=iface) + out = (out or "").strip() + if out: + logger.debug(f"tc -s qdisc show dev {iface}:\n{out}") + else: + logger.debug(f"tc -s qdisc show dev {iface}: (no output)") + except Exception as e: + logger.debug(f"Failed to read tc stats for {iface}: {e}") + + def clear(self, node, iface: str = "eth0"): + try: + self._exec(node, ["qdisc", "del", "dev", iface, "root"], iface=iface) + except RuntimeError as e: + msg = str(e) + if "Cannot delete qdisc with handle of zero" in msg or "No such file or directory" in msg: + return + raise + + def add_latency(self, node, ms: int, iface: str = "eth0"): + self.clear(node, iface=iface) + self._exec(node, ["qdisc", "add", "dev", iface, "root", "netem", "delay", f"{ms}ms"], iface=iface) + + def add_packet_loss(self, node, percent: float, iface: str = "eth0"): + self.clear(node, iface=iface) + + self._exec( + node, + ["qdisc", "add", "dev", iface, "root", "netem", "loss", f"{percent}%"], + iface=iface, + ) + try: + stats = self._exec(node, ["-s", "qdisc", "show", "dev", iface], iface=iface) + if stats is not None: + if isinstance(stats, (bytes, bytearray)): + stats = stats.decode(errors="replace") + logger.debug(f"tc -s qdisc show dev {iface}:\n{stats}") + else: + logger.debug(f"Executed: tc -s qdisc show dev {iface} (no output returned by _exec)") + except Exception as e: + logger.debug(f"Failed to read tc stats for {iface}: {e}") + + def add_bandwidth(self, node, rate: str, iface: str = "eth0"): + self.clear(node, iface=iface) + self._exec( + node, + ["qdisc", "add", "dev", iface, "root", "tbf", "rate", rate, "burst", "32kbit", "limit", "12500"], + iface=iface, + ) + + def add_packet_loss_correlated( + self, + node, + percent: float, + correlation: float, + iface: str = "eth0", + ): + self.clear(node, iface=iface) + self._exec( + node, + [ + "qdisc", + "add", + "dev", + iface, + "root", + "netem", + "loss", + f"{percent}%", + f"{correlation}%", + ], + iface=iface, + ) diff --git a/tests/e2e/test_network_conditions.py b/tests/e2e/test_network_conditions.py new file mode 100644 index 00000000..b94414b8 --- /dev/null +++ b/tests/e2e/test_network_conditions.py @@ -0,0 +1,525 @@ +import pytest +import logging +from time import time +from src.libs.custom_logger import get_custom_logger +from src.env_vars import NODE_1, NODE_2 +from src.node.waku_node import WakuNode +from src.steps.relay import StepsRelay +from src.libs.common import delay +from src.steps.common import StepsCommon +from src.steps.network_conditions import TrafficController +from src.libs.common import to_base64 + +logger = get_custom_logger(__name__) + + +class TestNetworkConditions(StepsRelay): + @pytest.fixture(scope="function", autouse=True) + def setup_nodes(self, request): + self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") + self.tc = TrafficController() + + def test_relay_with_latency_between_two_nodes(self): + logger.info("Starting node1 and node2 with relay enabled") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + + logger.info("Subscribing both nodes to relay topic") + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + logger.info("Waiting for autoconnection") + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + logger.debug("Applying 5000ms latency to node2") + self.tc.add_latency(self.node2, ms=5000) + message = self.create_message() + + logger.debug("Publishing message from node1") + + self.node1.send_relay_message(message, self.test_pubsub_topic) + logger.debug("Fetching relay messages on node2") + t0 = time() + messages = self.node2.get_relay_messages(self.test_pubsub_topic) + dt = time() - t0 + assert messages, "Messages aren't arrive" + assert dt >= 4.5, f"Expected slow GET due to latency, got {dt}" + assert dt <= 10.5, "msg took too long" + self.tc.clear(self.node2) + + @pytest.mark.timeout(60 * 8) + def test_relay_7_nodes_3sec_latency(self): + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + self.node5 = WakuNode(NODE_2, f"node5_{self.test_id}") + self.node6 = WakuNode(NODE_2, f"node6_{self.test_id}") + self.node7 = WakuNode(NODE_2, f"node7_{self.test_id}") + + nodes = [self.node1, self.node2, self.node3, self.node4, self.node5, self.node6, self.node7] + + logger.info("Starting nodes with chain bootstrap") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + self.node5.start(relay="true", discv5_bootstrap_node=self.node4.get_enr_uri()) + self.node6.start(relay="true", discv5_bootstrap_node=self.node5.get_enr_uri()) + self.node7.start(relay="true", discv5_bootstrap_node=self.node6.get_enr_uri()) + + logger.info("Subscribing all nodes to relay topic") + for node in nodes: + node.set_relay_subscriptions([self.test_pubsub_topic]) + + logger.info("Waiting for autoconnection") + self.wait_for_autoconnection(nodes, hard_wait=60) + + logger.info("Applying 3s latency to node3") + self.tc.add_latency(self.node3, ms=3000) + + t_start = time() + _ = self.node3.get_relay_messages(self.test_pubsub_topic) + elapsed = time() - t_start + + logger.info(f"Observed GET latency on node3: {elapsed:.2f}s") + assert elapsed >= 2.8, f"Expected ~3s latency on node3 GET, got {elapsed:.2f}s" + + @pytest.mark.timeout(60 * 6) + def test_relay_4_nodes_sender_latency(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + logger.info("Starting 4 nodes with relay enabled (bootstrap chain)") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=40) + + latency_ms = 3000 + logger.info(f"Applying {latency_ms}ms latency on sender node1") + self.tc.add_latency(self.node1, ms=latency_ms) + + t_pub0 = time() + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + publish_dt = time() - t_pub0 + + assert publish_dt > ((latency_ms * 2) / 1000.0) - 0.4, f"Expected publish call to be slowed by sender latency. " + assert publish_dt <= ((latency_ms * 2) / 1000.0) + 0.4, f"Publish call took too long" + # latency is doubled as request + response both will have latency + + deadline = t_pub0 + 10.0 + received = False + + while time() < deadline: + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + if msgs: + received = True + break + delay(0.2) + + assert received, f"node4 did not receive any relay message within {deadline}" + + self.tc.clear(self.node1) + + @pytest.mark.timeout(60 * 8) + def test_relay_4_nodes_two_publishers_compare_latency(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + logger.info("Starting 4 nodes ") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=60) + + latency_ms = 3000 + logger.debug(f"Applying {latency_ms}ms latency on node1 only") + self.tc.add_latency(self.node1, ms=latency_ms) + + node1_dts = [] + node2_dts = [] + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for i in range(5): + t0 = time() + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + node1_dts.append(time() - t0) + + t0 = time() + self.node2.send_relay_message( + self.create_message(), + self.test_pubsub_topic, + ) + node2_dts.append(time() - t0) + + delay(0.2) + + for dt in node1_dts: + assert dt <= ((latency_ms * 2) / 1000.0) + 0.4, "node1 publish took too long" + + for dt in node2_dts: + assert dt < 1.0, f"Expected node2 publish to be fast" + + deadline = time() + 10.0 + received = False + while time() < deadline: + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + if msgs: + received = True + break + delay(0.2) + + assert received, f"node4 did not receive any relay message within time" + + self.tc.clear(self.node1) + + @pytest.mark.timeout(60 * 6) + @pytest.mark.parametrize( + "latency_ms", + [ + 200, + 500, + 1000, + 5000, + 7000, + ], + ) + def test_relay_different_latency_between_two_nodes(self, latency_ms): + logger.info("Starting node1 and node2 with relay enabled") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + + logger.info("Subscribing both nodes to relay topic") + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + logger.info("Waiting for autoconnection") + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + logger.info(f"Applying {latency_ms}ms latency to node2") + self.tc.clear(self.node2) + if latency_ms > 0: + self.tc.add_latency(self.node2, ms=latency_ms) + + message = self.create_message() + self.node1.send_relay_message(message, self.test_pubsub_topic) + t0 = time() + messages = self.node2.get_relay_messages(self.test_pubsub_topic) + dt = time() - t0 + + assert messages, "No relay messages returned (publish/relay may have failed)" + expected_s = (latency_ms / 1000.0) * 2 + tolerance_s = 0.5 + assert dt >= expected_s - tolerance_s, f"Expected >= {expected_s - tolerance_s}s, got {dt}s" + assert dt <= expected_s + tolerance_s, f"Expected <= {expected_s + tolerance_s}s, got {dt}s" + self.tc.clear(self.node2) + + @pytest.mark.timeout(60 * 10) + def test_latency_with_load_sender_side(self): + latency_ms = 3000 + total_messages = 50 + wait_time = 40.0 + acceptable_msgs = total_messages / 2 + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + logger.info("Starting 4 nodes with relay enabled") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + + logger.info("Subscribing all nodes to relay topic") + for n in [self.node1, self.node2, self.node3, self.node4]: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection([self.node1, self.node2, self.node3, self.node4], hard_wait=30) + + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + logger.info(f"Applying {latency_ms}ms latency on sender node1") + self.tc.clear(self.node1) + self.tc.add_latency(self.node1, ms=latency_ms) + + logger.info(f"Sending {total_messages} messages from node1") + for i in range(total_messages): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + received_count = 0 + last_count = 0 + ticks = 0 + + deadline = time() + wait_time + while time() < deadline: + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + if len(msgs) > last_count: + ticks += 1 + last_count = len(msgs) + received_count = max(received_count, len(msgs)) + if received_count >= acceptable_msgs: + break + delay(1) + + logger.info(f"Node4 received {received_count} messages " f"(min_expected={acceptable_msgs}, total_sent={total_messages})") + + assert received_count >= acceptable_msgs, "relay stalled or dropped all traffic; " + self.tc.clear(self.node1) + + def test_relay_4_nodes_sender_packet_loss(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + logger.info("Starting 4 nodes with relay enabled (bootstrap chain)") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + loss_percent = 30.0 + logger.info(f"Applying {loss_percent}% packet loss on sender node1") + self.tc.add_packet_loss(self.node1, percent=loss_percent) + + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.log_tc_stats(self.node1) + deadline = time() + 10.0 + received = False + cnt = 0 + + while time() < deadline: + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + if msgs: + received = True + break + delay(0.2) + cnt = cnt + 1 + + assert received, f"Node4 did not receive any relay message" + logger.info(f"Node4 received messages from node1 after " f"{cnt} trails") + + self.tc.clear(self.node1) + + def test_relay_4_nodes_sender_packet_loss_50_15sec_timeout(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + loss = 50.0 + total_msgs = 30 + window_s = 15.0 + + self.tc.clear(self.node1) + self.tc.add_packet_loss(self.node1, percent=loss) + + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + batch_tag = f"loss={loss}-{self.test_id}" + batch_tag_b64 = to_base64(batch_tag) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.log_tc_stats(self.node1) + delay(window_s) + + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + received = len(msgs) + + logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received} window={window_s}s") + + self.tc.clear(self.node1) + + assert received > int(total_msgs * 0.8), "No messages received under 50% packet loss" + + @pytest.mark.timeout(60 * 10) + @pytest.mark.parametrize("loss", [40.0, 60.0], ids=["loss40", "loss60"]) + def test_relay_4_nodes_sender_packet_loss_delivery_ratio_simple(self, loss): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + total_msgs = 30 + window_s = 40.0 + + self.tc.clear(self.node1) + self.tc.add_packet_loss(self.node1, percent=loss) + + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.log_tc_stats(self.node1) + delay(window_s) + + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + received = len(msgs) + + logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received} window={window_s}s") + + self.tc.clear(self.node1) + + assert received > int(total_msgs * 0.8), f"No messages received at {loss}% packet loss" + + @pytest.mark.timeout(60 * 10) + def test_relay_packet_loss_correlated_vs_uncorrelated(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + total_msgs = 5 + window_s = 70.0 + loss = 40.0 + + self.tc.add_packet_loss(self.node1, percent=loss) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + uncorrelated = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node1) + + self.tc.add_packet_loss_correlated(self.node1, percent=loss, correlation=75.0) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + correlated = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node1) + + assert uncorrelated >= correlated + assert correlated > 0 + + @pytest.mark.timeout(60 * 10) + def test_relay_packet_loss_sender_vs_receiver(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + total_msgs = 30 + window_s = 30.0 + loss = 50.0 + + self.tc.add_packet_loss(self.node1, percent=loss) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + sender_loss = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node1) + + self.tc.add_packet_loss(self.node4, percent=loss) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + receiver_loss = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + logger.debug(f"sender_loss={sender_loss} receiver_loss={receiver_loss}") + self.tc.clear(self.node4) + + assert sender_loss > 0 + assert receiver_loss > 0 + + @pytest.mark.timeout(60 * 10) + def test_relay_packet_loss_applied_mid_way(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + window_s = 30.0 + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(10): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.add_packet_loss(self.node1, percent=50.0) + + for _ in range(20): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + received = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node1) + + assert received > 0