diff --git a/src/steps/sharding.py b/src/steps/sharding.py index 5086a57f0..7c8aabc25 100644 --- a/src/steps/sharding.py +++ b/src/steps/sharding.py @@ -20,9 +20,9 @@ logger = get_custom_logger(__name__) class StepsSharding(StepsRelay): test_content_topic = "/myapp/1/latest/proto" - test_pubsub_topic = "/waku/2/rs/2/0" + test_pubsub_topic = "/waku/2/rs/199/0" test_payload = "Sharding works!!" - auto_cluster = 2 + auto_cluster = 199 num_shards_in_network = 8 @pytest.fixture(scope="function", autouse=True) diff --git a/src/test_data.py b/src/test_data.py index 02ad8c2fc..b5e425201 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -96,7 +96,7 @@ CONTENT_TOPICS_SHARD_7 = [ "/newsService/4.0/updates/yaml", ] -DEFAULT_CLUSTER_ID = "3" +DEFAULT_CLUSTER_ID = "198" VALID_PUBSUB_TOPICS = [ f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0", f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1", @@ -130,14 +130,14 @@ PUBSUB_TOPICS_DIFFERENT_CLUSTERS = [ ] PUBSUB_TOPICS_SAME_CLUSTER = [ - "/waku/2/rs/2/0", - "/waku/2/rs/2/1", - "/waku/2/rs/2/2", - "/waku/2/rs/2/3", - "/waku/2/rs/2/4", - "/waku/2/rs/2/5", - "/waku/2/rs/2/6", - "/waku/2/rs/2/7", + "/waku/2/rs/199/0", + "/waku/2/rs/199/1", + "/waku/2/rs/199/2", + "/waku/2/rs/199/3", + "/waku/2/rs/199/4", + "/waku/2/rs/199/5", + "/waku/2/rs/199/6", + "/waku/2/rs/199/7", ] PUBSUB_TOPICS_WRONG_FORMAT = [ diff --git a/tests/e2e/test_network_conditions.py b/tests/e2e/test_network_conditions.py index 54c909a64..4ec46654b 100644 --- a/tests/e2e/test_network_conditions.py +++ b/tests/e2e/test_network_conditions.py @@ -622,9 +622,13 @@ class TestNetworkConditions(StepsRelay): @pytest.mark.timeout(60 * 8) def test_relay_2_nodes_bandwidth_low_vs_high_drain_time(self): - msg_count = 200 - cache = "250" - poll_sleep = 0.5 + # large payload (~16KB) so 50 msgs = ~800KB total, + # making 256kbit meaningfully slower than 10mbit on loopback + msg_count = 50 + large_payload = to_base64("x" * 16_000) + cache = "100" + # fine-grained poll so sub-second differences are measurable + poll_sleep = 0.05 max_wait = 200 self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") @@ -642,12 +646,14 @@ class TestNetworkConditions(StepsRelay): self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + # apply tc to both nodes so intra-host loopback fast-path is throttled self.tc.add_bandwidth(self.node1, rate="256kbit") + self.tc.add_bandwidth(self.node2, rate="256kbit") _ = self.node2.get_relay_messages(self.test_pubsub_topic) for _ in range(msg_count): - self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic) total_low_msgs = 0 t0 = time() @@ -657,12 +663,15 @@ class TestNetworkConditions(StepsRelay): sleep(poll_sleep) low_rate_t = time() - t0 + # upgrade both nodes to high bandwidth self.tc.add_bandwidth(self.node1, rate="10mbit") + self.tc.add_bandwidth(self.node2, rate="10mbit") _ = self.node2.get_relay_messages(self.test_pubsub_topic) + sleep(1) # let the phase-1 shaper queue fully drain before phase 2 for _ in range(msg_count): - self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic) total_high_msgs = 0 t1 = time() @@ -680,7 +689,9 @@ class TestNetworkConditions(StepsRelay): assert total_low_msgs >= msg_count assert total_high_msgs >= msg_count - assert high_rate_t < low_rate_t + # Assert high bandwidth was meaningfully faster, not just marginally so, + # to absorb scheduling jitter on localhost Docker + assert high_rate_t < low_rate_t / 2 @pytest.mark.timeout(60 * 6) def test_relay_2_nodes_packet_reordering(self): diff --git a/tests/filter/test_subscribe_create.py b/tests/filter/test_subscribe_create.py index ca8d0221e..5a57b277d 100644 --- a/tests/filter/test_subscribe_create.py +++ b/tests/filter/test_subscribe_create.py @@ -1,4 +1,5 @@ import pytest +from uuid import uuid4 from src.env_vars import NODE_1, NODE_2 from src.libs.custom_logger import get_custom_logger from src.test_data import INVALID_CONTENT_TOPICS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS @@ -60,9 +61,10 @@ class TestFilterSubscribeCreate(StepsFilter): assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" def test_filter_subscribe_to_29_content_topics_in_separate_calls(self, subscribe_main_nodes): + # subscribe_main_nodes already consumed 1 slot; make 29 more = 30 total, all must succeed _29_content_topics = [str(i) for i in range(29)] for content_topic in _29_content_topics: - self.create_filter_subscription({"requestId": "1", "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic}) + self.create_filter_subscription({"requestId": str(uuid4()), "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic}) failed_content_topics = [] for content_topic in _29_content_topics: logger.debug(f"Running test with content topic {content_topic}") @@ -73,11 +75,20 @@ class TestFilterSubscribeCreate(StepsFilter): logger.error(f"ContentTopic {content_topic} failed: {str(ex)}") failed_content_topics.append(content_topic) assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" - try: - self.create_filter_subscription({"requestId": "1", "contentFilters": ["rate limited"], "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError("The 30th subscribe call was not rate limited!!!") - except Exception as ex: - assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex) + # Rate limit is a token bucket (30/min); keep calling beyond 30 until one is denied + rate_limited = False + for extra in range(1, 20): + try: + self.create_filter_subscription( + {"requestId": str(uuid4()), "contentFilters": [f"extra_{extra}"], "pubsubTopic": self.test_pubsub_topic} + ) + logger.debug(f"Extra subscribe call #{extra} succeeded") + except Exception as ex: + assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex), f"Unexpected error on extra call #{extra}: {ex}" + logger.info(f"Rate limit hit on extra call #{extra}: {ex}") + rate_limited = True + break + assert rate_limited, "Rate limit was not triggered on any call beyond 30" def test_filter_subscribe_to_101_content_topics(self, subscribe_main_nodes): try: diff --git a/tests/sharding/test_filter.py b/tests/sharding/test_filter.py index aba678bad..4b526b516 100644 --- a/tests/sharding/test_filter.py +++ b/tests/sharding/test_filter.py @@ -32,8 +32,10 @@ class TestFilterAutoSharding(StepsSharding): self.setup_first_relay_node_with_filter( cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, num_shards_in_network=1 ) + self.setup_third_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=1) self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic) self.subscribe_first_relay_node(content_topics=[self.test_content_topic]) + self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic]) self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic) self.check_published_message_reaches_filter_peer(content_topic=self.test_content_topic) @@ -44,10 +46,16 @@ class TestFilterAutoSharding(StepsSharding): pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER, num_shards_in_network=8, ) + self.setup_third_relay_node( + cluster_id=self.auto_cluster, + content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, + num_shards_in_network=8, + ) self.setup_second_node_as_filter( cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=self.test_pubsub_topic ) self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + self.subscribe_optional_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER): self.subscribe_filter_node(self.node2, content_topics=[content_topic], pubsub_topic=pubsub_topic) for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: diff --git a/tests/sharding/test_relay_static_sharding.py b/tests/sharding/test_relay_static_sharding.py index 07e632d72..aef848819 100644 --- a/tests/sharding/test_relay_static_sharding.py +++ b/tests/sharding/test_relay_static_sharding.py @@ -96,14 +96,7 @@ class TestRelayStaticSharding(StepsSharding): def test_unsubscribe_from_non_subscribed_pubsub_topics(self): self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) - try: - self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) - if self.node1.is_nwaku(): - pass - else: - raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!") - except Exception as ex: - assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic)