From 155296c4d3a46f7e675b4822ac8d78aa2997a676 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 31 Mar 2026 04:44:37 +0200 Subject: [PATCH] Fix failing test cases on logos-delivery/v0.38 (#165) * Fix auto and static sharding subscribe/unsubscribe tests - use a safely unused cluster-id (cluster id 2 now defaults to logos.dev with its settings), also adapted static sharding unsubscribe to PR#3732 * Adjust cluster_id to pubsub_topics * Fix uncertain rate limit hit of filter subscribes - this is a planned behavior of current rate limiting, as we are trying our best to serve requests within reasonable flexibility, thus we mint new tokens over time, so it can be seen as we are able to serve more requests than configured; those are not hard limits. * Fix test_relay_2_nodes_bandwidth_low_vs_high_drain_time flaky result; eliminate jitter and localhost test optimizations that can appear on Docker networking. --- src/steps/sharding.py | 4 ++-- src/test_data.py | 18 +++++++-------- tests/e2e/test_network_conditions.py | 23 +++++++++++++++----- tests/filter/test_subscribe_create.py | 23 +++++++++++++++----- tests/sharding/test_filter.py | 8 +++++++ tests/sharding/test_relay_static_sharding.py | 9 +------- 6 files changed, 54 insertions(+), 31 deletions(-) diff --git a/src/steps/sharding.py b/src/steps/sharding.py index 5086a57f0..7c8aabc25 100644 --- a/src/steps/sharding.py +++ b/src/steps/sharding.py @@ -20,9 +20,9 @@ logger = get_custom_logger(__name__) class StepsSharding(StepsRelay): test_content_topic = "/myapp/1/latest/proto" - test_pubsub_topic = "/waku/2/rs/2/0" + test_pubsub_topic = "/waku/2/rs/199/0" test_payload = "Sharding works!!" 
- auto_cluster = 2 + auto_cluster = 199 num_shards_in_network = 8 @pytest.fixture(scope="function", autouse=True) diff --git a/src/test_data.py b/src/test_data.py index 02ad8c2fc..b5e425201 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -96,7 +96,7 @@ CONTENT_TOPICS_SHARD_7 = [ "/newsService/4.0/updates/yaml", ] -DEFAULT_CLUSTER_ID = "3" +DEFAULT_CLUSTER_ID = "198" VALID_PUBSUB_TOPICS = [ f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0", f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1", @@ -130,14 +130,14 @@ PUBSUB_TOPICS_DIFFERENT_CLUSTERS = [ ] PUBSUB_TOPICS_SAME_CLUSTER = [ - "/waku/2/rs/2/0", - "/waku/2/rs/2/1", - "/waku/2/rs/2/2", - "/waku/2/rs/2/3", - "/waku/2/rs/2/4", - "/waku/2/rs/2/5", - "/waku/2/rs/2/6", - "/waku/2/rs/2/7", + "/waku/2/rs/199/0", + "/waku/2/rs/199/1", + "/waku/2/rs/199/2", + "/waku/2/rs/199/3", + "/waku/2/rs/199/4", + "/waku/2/rs/199/5", + "/waku/2/rs/199/6", + "/waku/2/rs/199/7", ] PUBSUB_TOPICS_WRONG_FORMAT = [ diff --git a/tests/e2e/test_network_conditions.py b/tests/e2e/test_network_conditions.py index 54c909a64..4ec46654b 100644 --- a/tests/e2e/test_network_conditions.py +++ b/tests/e2e/test_network_conditions.py @@ -622,9 +622,13 @@ class TestNetworkConditions(StepsRelay): @pytest.mark.timeout(60 * 8) def test_relay_2_nodes_bandwidth_low_vs_high_drain_time(self): - msg_count = 200 - cache = "250" - poll_sleep = 0.5 + # large payload (~16KB) so 50 msgs = ~800KB total, + # making 256kbit meaningfully slower than 10mbit on loopback + msg_count = 50 + large_payload = to_base64("x" * 16_000) + cache = "100" + # fine-grained poll so sub-second differences are measurable + poll_sleep = 0.05 max_wait = 200 self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") @@ -642,12 +646,14 @@ class TestNetworkConditions(StepsRelay): self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + # apply tc to both nodes so intra-host loopback fast-path is throttled self.tc.add_bandwidth(self.node1, rate="256kbit") + self.tc.add_bandwidth(self.node2, 
rate="256kbit") _ = self.node2.get_relay_messages(self.test_pubsub_topic) for _ in range(msg_count): - self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic) total_low_msgs = 0 t0 = time() @@ -657,12 +663,15 @@ class TestNetworkConditions(StepsRelay): sleep(poll_sleep) low_rate_t = time() - t0 + # upgrade both nodes to high bandwidth self.tc.add_bandwidth(self.node1, rate="10mbit") + self.tc.add_bandwidth(self.node2, rate="10mbit") _ = self.node2.get_relay_messages(self.test_pubsub_topic) + sleep(1) # let the phase-1 shaper queue fully drain before phase 2 for _ in range(msg_count): - self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic) total_high_msgs = 0 t1 = time() @@ -680,7 +689,9 @@ class TestNetworkConditions(StepsRelay): assert total_low_msgs >= msg_count assert total_high_msgs >= msg_count - assert high_rate_t < low_rate_t + # Assert high bandwidth was meaningfully faster, not just marginally so, + # to absorb scheduling jitter on localhost Docker + assert high_rate_t < low_rate_t / 2 @pytest.mark.timeout(60 * 6) def test_relay_2_nodes_packet_reordering(self): diff --git a/tests/filter/test_subscribe_create.py b/tests/filter/test_subscribe_create.py index ca8d0221e..5a57b277d 100644 --- a/tests/filter/test_subscribe_create.py +++ b/tests/filter/test_subscribe_create.py @@ -1,4 +1,5 @@ import pytest +from uuid import uuid4 from src.env_vars import NODE_1, NODE_2 from src.libs.custom_logger import get_custom_logger from src.test_data import INVALID_CONTENT_TOPICS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS @@ -60,9 +61,10 @@ class TestFilterSubscribeCreate(StepsFilter): assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" def test_filter_subscribe_to_29_content_topics_in_separate_calls(self, 
subscribe_main_nodes): + # subscribe_main_nodes already consumed 1 slot; make 29 more = 30 total, all must succeed _29_content_topics = [str(i) for i in range(29)] for content_topic in _29_content_topics: - self.create_filter_subscription({"requestId": "1", "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic}) + self.create_filter_subscription({"requestId": str(uuid4()), "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic}) failed_content_topics = [] for content_topic in _29_content_topics: logger.debug(f"Running test with content topic {content_topic}") @@ -73,11 +75,20 @@ class TestFilterSubscribeCreate(StepsFilter): logger.error(f"ContentTopic {content_topic} failed: {str(ex)}") failed_content_topics.append(content_topic) assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" - try: - self.create_filter_subscription({"requestId": "1", "contentFilters": ["rate limited"], "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError("The 30th subscribe call was not rate limited!!!") - except Exception as ex: - assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex) + # Rate limit is a token bucket (30/min); keep calling beyond 30 until one is denied + rate_limited = False + for extra in range(1, 20): + try: + self.create_filter_subscription( + {"requestId": str(uuid4()), "contentFilters": [f"extra_{extra}"], "pubsubTopic": self.test_pubsub_topic} + ) + logger.debug(f"Extra subscribe call #{extra} succeeded") + except Exception as ex: + assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex), f"Unexpected error on extra call #{extra}: {ex}" + logger.info(f"Rate limit hit on extra call #{extra}: {ex}") + rate_limited = True + break + assert rate_limited, "Rate limit was not triggered on any call beyond 30" def test_filter_subscribe_to_101_content_topics(self, subscribe_main_nodes): try: diff --git a/tests/sharding/test_filter.py 
b/tests/sharding/test_filter.py index aba678bad..4b526b516 100644 --- a/tests/sharding/test_filter.py +++ b/tests/sharding/test_filter.py @@ -32,8 +32,10 @@ class TestFilterAutoSharding(StepsSharding): self.setup_first_relay_node_with_filter( cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, num_shards_in_network=1 ) + self.setup_third_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=1) self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic) self.subscribe_first_relay_node(content_topics=[self.test_content_topic]) + self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic]) self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic) self.check_published_message_reaches_filter_peer(content_topic=self.test_content_topic) @@ -44,10 +46,16 @@ class TestFilterAutoSharding(StepsSharding): pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER, num_shards_in_network=8, ) + self.setup_third_relay_node( + cluster_id=self.auto_cluster, + content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, + num_shards_in_network=8, + ) self.setup_second_node_as_filter( cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=self.test_pubsub_topic ) self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + self.subscribe_optional_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER): self.subscribe_filter_node(self.node2, content_topics=[content_topic], pubsub_topic=pubsub_topic) for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: diff --git a/tests/sharding/test_relay_static_sharding.py b/tests/sharding/test_relay_static_sharding.py index 07e632d72..aef848819 100644 --- 
a/tests/sharding/test_relay_static_sharding.py +++ b/tests/sharding/test_relay_static_sharding.py @@ -96,14 +96,7 @@ class TestRelayStaticSharding(StepsSharding): def test_unsubscribe_from_non_subscribed_pubsub_topics(self): self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) - try: - self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) - if self.node1.is_nwaku(): - pass - else: - raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!") - except Exception as ex: - assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic)