diff --git a/tests/e2e/test_e2e.py b/tests/e2e/test_e2e.py index 410be28bd..0aef1e738 100644 --- a/tests/e2e/test_e2e.py +++ b/tests/e2e/test_e2e.py @@ -503,54 +503,3 @@ class TestE2E(StepsFilter, StepsStore, StepsRelay, StepsLightPush): pubsub_topic=self.test_pubsub_topic, content_topics=self.test_content_topic, page_size=5, ascending="true" ) assert len(store_response["messages"]) == 1, "Can't find stored message!!" - - @pytest.mark.timeout(60 * 4) - def test_relay_2_nodes_bandwidth_high_vs_low(self): - self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") - self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") - - self.node1.start(relay="true") - self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) - - self.node1.set_relay_subscriptions([self.test_pubsub_topic]) - self.node2.set_relay_subscriptions([self.test_pubsub_topic]) - - self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) - - msg_count = 120 - window_s = 20 - - self.tc.add_bandwidth(self.node2, rate="10mbit") - - for _ in range(msg_count): - self.node1.send_relay_message( - self.create_message(), - self.test_pubsub_topic, - ) - - deadline = time() + window_s - high_bw_msgs = 0 - while time() < deadline: - msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] - high_bw_msgs = max(high_bw_msgs, len(msgs)) - sleep(0.5) - - self.tc.add_bandwidth(self.node2, rate="256kbit") - - for _ in range(msg_count): - self.node1.send_relay_message( - self.create_message(), - self.test_pubsub_topic, - ) - - deadline = time() + window_s - low_bw_msgs = 0 - while time() < deadline: - msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] - low_bw_msgs = max(low_bw_msgs, len(msgs)) - sleep(0.5) - - logger.info(f"high_bw_msgs={high_bw_msgs} low_bw_msgs={low_bw_msgs} " f"msg_count={msg_count} window={window_s}s") - - assert high_bw_msgs > 0 - assert high_bw_msgs >= low_bw_msgs diff --git a/tests/e2e/test_network_conditions.py b/tests/e2e/test_network_conditions.py index b94414b8f..7bffaabc2 100644 --- a/tests/e2e/test_network_conditions.py +++ b/tests/e2e/test_network_conditions.py @@ -1,6 +1,6 @@ import pytest import logging -from time import time +from time import time, sleep from src.libs.custom_logger import get_custom_logger from src.env_vars import NODE_1, NODE_2 from src.node.waku_node import WakuNode @@ -522,4 +522,97 @@ class TestNetworkConditions(StepsRelay): received = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) self.tc.clear(self.node1) - assert received > 0 + assert received > 0 @ pytest.mark.timeout(60 * 4) + + @pytest.mark.timeout(60 * 2) + def test_relay_2_nodes_low_bandwidth_reliability(self): + msg_count = 200 + max_wait = 50 + poll_sleep = 0.5 + + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start( + relay="true", + discv5_bootstrap_node=self.node1.get_enr_uri(), + rest_relay_cache_capacity="250", + ) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + self.tc.add_bandwidth(self.node1, rate="256kbit") + + for _ in range(msg_count): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + t0 = time() + received = 0 + while time() - t0 < max_wait: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + received = len(msgs) + if received >= msg_count: + break + sleep(poll_sleep) + + elapsed = time() - t0 + logger.info(f"low_bw_elapsed={elapsed:.2f}s received={received} msg_count={msg_count}") + + assert received >= msg_count + + @pytest.mark.timeout(60 * 8) + def test_relay_2_nodes_low_bandwidth_sending_over_time(self): + msg_count = 200 + cache_capacity = "250" + + send_interval_s = 0.05 + poll_interval_s = 0.5 + max_wait_s = 240 + min_received = 150 + + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start( + relay="true", + discv5_bootstrap_node=self.node1.get_enr_uri(), + rest_relay_cache_capacity=cache_capacity, + ) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + # mirror your suite's "low bandwidth" shaping values (rate + burst/limit handled by the tc wrapper defaults) + self.tc.add_bandwidth(self.node1, rate="256kbit") + + received = 0 + for _ in range(msg_count): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + sleep(send_interval_s) + + t0 = time() + while time() - t0 < max_wait_s and received < msg_count: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + if msgs: + received += len(msgs) + if received >= msg_count: + break + sleep(poll_interval_s) + + total_time = time() - t0 + + logger.info( + "low_bw_reliability " + f"rate=256kbit msg_count={msg_count} cache_capacity={cache_capacity} " + f"send_interval_s={send_interval_s} poll_interval_s={poll_interval_s} " + f"recv_duration={total_time:.2f}s" + ) + + assert received >= min_received, f"received {received}/{msg_count} under low bandwidth"