From 2267b49254f7c9ff7a5d6093d6d8d770e9dac85c Mon Sep 17 00:00:00 2001 From: AYAHASSAN287 <49167455+AYAHASSAN287@users.noreply.github.com> Date: Thu, 11 Jun 2026 22:02:51 +0300 Subject: [PATCH] Add the number of shards fix suggested to address CI issue (#182) * Add the number of shards fix suggested to address CI issue * Fix failed auto-shard tests * add the auto-shard option for the failed tests * fix the .so build issue * remove commented line * fix admin flags tests --- tests/rest_flags/test_admin_flags.py | 6 +- tests/sharding/test_multiple_nodes.py | 16 ++++-- tests/sharding/test_relay_auto_sharding.py | 4 +- .../test_running_nodes_auto_sharding.py | 57 +++++++++++++++---- 4 files changed, 66 insertions(+), 17 deletions(-) diff --git a/tests/rest_flags/test_admin_flags.py b/tests/rest_flags/test_admin_flags.py index 83b62991e..1b10da508 100644 --- a/tests/rest_flags/test_admin_flags.py +++ b/tests/rest_flags/test_admin_flags.py @@ -24,10 +24,15 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): LEVEL_RE = re.compile(r'"lvl"\s*:\s*"(TRC|DBG|INF|NTC|WRN|ERR|FTL)"|\b(TRC|DBG|INF|NTC|WRN|ERR|FTL)\b') + # Newer nwaku images color log output; strip ANSI codes so the level tags + # aren't wrapped (e.g. "\x1b[33mWRN\x1b[0m") and LEVEL_RE can match them. + ANSI_RE = re.compile(r"\x1b\[[0-9;]*m") + def _read_tail_counts(self, path: str, start_size: int) -> dict: with open(path, "rb") as f: f.seek(start_size) text = f.read().decode(errors="ignore") + text = self.ANSI_RE.sub("", text) counts = {t: 0 for t in self.TAGS} for a, b in self.LEVEL_RE.findall(text): counts[(a or b)] += 1 @@ -213,7 +218,6 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): break time.sleep(0.05) - # assert self.node1.set_log_level("DEBUG").status_code == 200 assert self.node1.set_log_level("INFO").status_code == 200 start = os.path.getsize(path) diff --git a/tests/sharding/test_multiple_nodes.py b/tests/sharding/test_multiple_nodes.py index 067db10d1..1e85a45d0 100644 --- a/tests/sharding/test_multiple_nodes.py +++ b/tests/sharding/test_multiple_nodes.py @@ -23,15 +23,23 @@ class TestMultipleNodes(StepsSharding): self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic) def test_auto_shard_relay(self): - self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) - self.setup_optional_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.setup_main_relay_nodes( + cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=self.num_shards_in_network + ) + self.setup_optional_relay_nodes( + cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=self.num_shards_in_network + ) self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic]) self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic) def test_auto_shard_relay_10_nwaku_nodes(self): - self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic) - self.setup_nwaku_relay_nodes(num_nodes=8, cluster_id=self.auto_cluster, content_topic=self.test_content_topic) + self.setup_first_relay_node_with_filter( + cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=self.num_shards_in_network + ) + self.setup_nwaku_relay_nodes( + num_nodes=8, cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=self.num_shards_in_network + ) self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic]) self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic]) self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic) diff --git a/tests/sharding/test_relay_auto_sharding.py b/tests/sharding/test_relay_auto_sharding.py index 9553c33ab..3a4e45df6 100644 --- a/tests/sharding/test_relay_auto_sharding.py +++ b/tests/sharding/test_relay_auto_sharding.py @@ -64,7 +64,9 @@ class TestRelayAutosharding(StepsSharding): @pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS]) def test_subscribe_via_api_to_new_content_topics(self, content_topic_list): - self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic_list[:1]) + self.setup_main_relay_nodes( + cluster_id=self.auto_cluster, num_shards_in_network=self.num_shards_in_network, content_topic=content_topic_list[:1] + ) self.subscribe_main_relay_nodes(content_topics=content_topic_list[1:]) for content_topic in content_topic_list[1:]: self.check_published_message_reaches_relay_peer(content_topic=content_topic) diff --git a/tests/sharding/test_running_nodes_auto_sharding.py b/tests/sharding/test_running_nodes_auto_sharding.py index 23c5bbcef..27551ae65 100644 --- a/tests/sharding/test_running_nodes_auto_sharding.py +++ b/tests/sharding/test_running_nodes_auto_sharding.py @@ -12,39 +12,66 @@ logger = get_custom_logger(__name__) class TestRunningNodesAutosharding(StepsSharding): @pytest.mark.parametrize("content_topic", CONTENT_TOPICS_DIFFERENT_SHARDS) def test_single_content_topic(self, content_topic): - self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic) + self.setup_main_relay_nodes(cluster_id=self.auto_cluster, num_shards_in_network=self.num_shards_in_network, content_topic=content_topic) self.subscribe_first_relay_node(content_topics=[content_topic]) self.subscribe_second_relay_node(content_topics=[content_topic]) self.check_published_message_reaches_relay_peer(content_topic=content_topic) @pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_SHARD_7]) def test_multiple_content_topics_same_shard(self, content_topic_list): - self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic_list) + self.setup_main_relay_nodes( + cluster_id=self.auto_cluster, + num_shards_in_network=self.num_shards_in_network, + content_topic=content_topic_list, + ) self.subscribe_first_relay_node(content_topics=content_topic_list) self.subscribe_second_relay_node(content_topics=content_topic_list) for content_topic in content_topic_list: self.check_published_message_reaches_relay_peer(content_topic=content_topic) def test_multiple_content_topics_different_shard(self): - self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS) + self.setup_main_relay_nodes( + cluster_id=self.auto_cluster, num_shards_in_network=self.num_shards_in_network, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS + ) self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) self.subscribe_second_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: self.check_published_message_reaches_relay_peer(content_topic=content_topic) def test_2_nodes_different_content_topic_same_shard(self): - self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/weekly/protobuf") - self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/alerts/xml") + self.setup_first_relay_node_with_filter( + cluster_id=self.auto_cluster, content_topic="/newsService/1.0/weekly/protobuf", num_shards_in_network=self.num_shards_in_network + ) + self.setup_second_relay_node( + cluster_id=self.auto_cluster, content_topic="/newsService/1.0/alerts/xml", num_shards_in_network=self.num_shards_in_network + ) self.subscribe_first_relay_node(content_topics=["/newsService/1.0/weekly/protobuf"]) self.subscribe_second_relay_node(content_topics=["/newsService/1.0/alerts/xml"]) - self.check_published_message_reaches_relay_peer(content_topic="/newsService/1.0/weekly/protobuf") + # relay; node2's REST cache only serves its own subscribed content topic (404 for node1's), + # so the arrival is asserted via the node log instead + self.check_published_message_reaches_relay_peer(content_topic="/newsService/1.0/weekly/protobuf", peer_list=[self.node1]) + assert self.node2.search_waku_log_for_string( + r"received relay message.*contentTopic.*/newsService/1.0/weekly/protobuf", use_regex=True + ), "Message on the shared shard did not reach node2's relay" def test_2_nodes_different_content_topic_different_shard(self): - self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic="/myapp/1/latest/proto") - self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/waku/2/content/test.js") + self.setup_first_relay_node_with_filter( + cluster_id=self.auto_cluster, content_topic="/myapp/1/latest/proto", num_shards_in_network=self.num_shards_in_network + ) + # shard=1 overrides the default --shard=0 so node2 only relays its own content topic's shard + self.setup_second_relay_node( + cluster_id=self.auto_cluster, + content_topic="/waku/2/content/test.js", + shard="1", + num_shards_in_network=self.num_shards_in_network, + ) self.subscribe_first_relay_node(content_topics=["/myapp/1/latest/proto"]) self.subscribe_second_relay_node(content_topics=["/waku/2/content/test.js"]) - self.check_published_message_reaches_relay_peer(content_topic="/myapp/1/latest/proto") + try: + self.check_published_message_reaches_relay_peer(content_topic="/myapp/1/latest/proto") + raise AssertionError("Message on shard 0 unexpectedly reached node2 on shard 1") + except Exception as ex: + assert "404 Client Error" in str(ex), f"Expected node2 to not have the message, got: {ex}" def test_content_topic_not_in_docker_flags(self): self.setup_main_relay_nodes(cluster_id=2, pubsub_topic=self.test_pubsub_topic) @@ -60,12 +87,18 @@ class TestRunningNodesAutosharding(StepsSharding): assert f"Peer NODE_2:{NODE_2} couldn't find any messages" in str(ex) def test_multiple_content_topics_and_multiple_pubsub_topics(self): + # nwaku drops the pubsub-topic CLI arg, so the pubsub topics are subscribed via REST instead self.setup_main_relay_nodes( - cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER + cluster_id=self.auto_cluster, + content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, + num_shards_in_network=self.num_shards_in_network, ) self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: self.check_published_message_reaches_relay_peer(content_topic=content_topic) + for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: + self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic) def test_node_uses_both_auto_and_regular_apis(self): self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) @@ -85,7 +118,9 @@ class TestRunningNodesAutosharding(StepsSharding): assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" def test_sender_uses_regular_api_receiver_uses_auto_api(self): - self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) + self.setup_main_relay_nodes( + cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic, num_shards_in_network=self.num_shards_in_network + ) self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic]) self.subscribe_second_relay_node(content_topics=[self.test_content_topic]) self.relay_message(self.node1, self.create_message(contentTopic=self.test_content_topic), pubsub_topic=self.test_pubsub_topic)