Add the number of shards fix suggested to address CI issue (#182)

* Add the number of shards fix suggested to address CI issue

* Fix failed auto-shard tests

* add the auto-shard option for the failed tests

* fix the .so build issue

* remove commented line

* fix admin flags tests
This commit is contained in:
AYAHASSAN287 2026-06-11 22:02:51 +03:00 committed by GitHub
parent 1a3e00d11e
commit 2267b49254
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 66 additions and 17 deletions

View File

@ -24,10 +24,15 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
LEVEL_RE = re.compile(r'"lvl"\s*:\s*"(TRC|DBG|INF|NTC|WRN|ERR|FTL)"|\b(TRC|DBG|INF|NTC|WRN|ERR|FTL)\b')
# Newer nwaku images color log output; strip ANSI codes so the level tags
# aren't wrapped (e.g. "\x1b[33mWRN\x1b[0m") and LEVEL_RE can match them.
ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
def _read_tail_counts(self, path: str, start_size: int) -> dict:
with open(path, "rb") as f:
f.seek(start_size)
text = f.read().decode(errors="ignore")
text = self.ANSI_RE.sub("", text)
counts = {t: 0 for t in self.TAGS}
for a, b in self.LEVEL_RE.findall(text):
counts[(a or b)] += 1
@ -213,7 +218,6 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
break
time.sleep(0.05)
# assert self.node1.set_log_level("DEBUG").status_code == 200
assert self.node1.set_log_level("INFO").status_code == 200
start = os.path.getsize(path)

View File

@ -23,15 +23,23 @@ class TestMultipleNodes(StepsSharding):
self.check_published_message_reaches_relay_peer(pubsub_topic=self.test_pubsub_topic)
def test_auto_shard_relay(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.setup_optional_relay_nodes(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.setup_main_relay_nodes(
cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=self.num_shards_in_network
)
self.setup_optional_relay_nodes(
cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=self.num_shards_in_network
)
self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic])
self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)
def test_auto_shard_relay_10_nwaku_nodes(self):
self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.setup_nwaku_relay_nodes(num_nodes=8, cluster_id=self.auto_cluster, content_topic=self.test_content_topic)
self.setup_first_relay_node_with_filter(
cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=self.num_shards_in_network
)
self.setup_nwaku_relay_nodes(
num_nodes=8, cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=self.num_shards_in_network
)
self.subscribe_main_relay_nodes(content_topics=[self.test_content_topic])
self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic])
self.check_published_message_reaches_relay_peer(content_topic=self.test_content_topic)

View File

@ -64,7 +64,9 @@ class TestRelayAutosharding(StepsSharding):
@pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_DIFFERENT_SHARDS])
def test_subscribe_via_api_to_new_content_topics(self, content_topic_list):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic_list[:1])
self.setup_main_relay_nodes(
cluster_id=self.auto_cluster, num_shards_in_network=self.num_shards_in_network, content_topic=content_topic_list[:1]
)
self.subscribe_main_relay_nodes(content_topics=content_topic_list[1:])
for content_topic in content_topic_list[1:]:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)

View File

@ -12,39 +12,66 @@ logger = get_custom_logger(__name__)
class TestRunningNodesAutosharding(StepsSharding):
@pytest.mark.parametrize("content_topic", CONTENT_TOPICS_DIFFERENT_SHARDS)
def test_single_content_topic(self, content_topic):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic)
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, num_shards_in_network=self.num_shards_in_network, content_topic=content_topic)
self.subscribe_first_relay_node(content_topics=[content_topic])
self.subscribe_second_relay_node(content_topics=[content_topic])
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
@pytest.mark.parametrize("content_topic_list", [CONTENT_TOPICS_SHARD_0, CONTENT_TOPICS_SHARD_7])
def test_multiple_content_topics_same_shard(self, content_topic_list):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=content_topic_list)
self.setup_main_relay_nodes(
cluster_id=self.auto_cluster,
num_shards_in_network=self.num_shards_in_network,
content_topic=content_topic_list,
)
self.subscribe_first_relay_node(content_topics=content_topic_list)
self.subscribe_second_relay_node(content_topics=content_topic_list)
for content_topic in content_topic_list:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
def test_multiple_content_topics_different_shard(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS)
self.setup_main_relay_nodes(
cluster_id=self.auto_cluster, num_shards_in_network=self.num_shards_in_network, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS
)
self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
self.subscribe_second_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
def test_2_nodes_different_content_topic_same_shard(self):
self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/weekly/protobuf")
self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/newsService/1.0/alerts/xml")
self.setup_first_relay_node_with_filter(
cluster_id=self.auto_cluster, content_topic="/newsService/1.0/weekly/protobuf", num_shards_in_network=self.num_shards_in_network
)
self.setup_second_relay_node(
cluster_id=self.auto_cluster, content_topic="/newsService/1.0/alerts/xml", num_shards_in_network=self.num_shards_in_network
)
self.subscribe_first_relay_node(content_topics=["/newsService/1.0/weekly/protobuf"])
self.subscribe_second_relay_node(content_topics=["/newsService/1.0/alerts/xml"])
self.check_published_message_reaches_relay_peer(content_topic="/newsService/1.0/weekly/protobuf")
# relay; node2's REST cache only serves its own subscribed content topic (404 for node1's),
# so the arrival is asserted via the node log instead
self.check_published_message_reaches_relay_peer(content_topic="/newsService/1.0/weekly/protobuf", peer_list=[self.node1])
assert self.node2.search_waku_log_for_string(
r"received relay message.*contentTopic.*/newsService/1.0/weekly/protobuf", use_regex=True
), "Message on the shared shard did not reach node2's relay"
def test_2_nodes_different_content_topic_different_shard(self):
self.setup_first_relay_node_with_filter(cluster_id=self.auto_cluster, content_topic="/myapp/1/latest/proto")
self.setup_second_relay_node(cluster_id=self.auto_cluster, content_topic="/waku/2/content/test.js")
self.setup_first_relay_node_with_filter(
cluster_id=self.auto_cluster, content_topic="/myapp/1/latest/proto", num_shards_in_network=self.num_shards_in_network
)
# shard=1 overrides the default --shard=0 so node2 only relays its own content topic's shard
self.setup_second_relay_node(
cluster_id=self.auto_cluster,
content_topic="/waku/2/content/test.js",
shard="1",
num_shards_in_network=self.num_shards_in_network,
)
self.subscribe_first_relay_node(content_topics=["/myapp/1/latest/proto"])
self.subscribe_second_relay_node(content_topics=["/waku/2/content/test.js"])
self.check_published_message_reaches_relay_peer(content_topic="/myapp/1/latest/proto")
try:
self.check_published_message_reaches_relay_peer(content_topic="/myapp/1/latest/proto")
raise AssertionError("Message on shard 0 unexpectedly reached node2 on shard 1")
except Exception as ex:
assert "404 Client Error" in str(ex), f"Expected node2 to not have the message, got: {ex}"
def test_content_topic_not_in_docker_flags(self):
self.setup_main_relay_nodes(cluster_id=2, pubsub_topic=self.test_pubsub_topic)
@ -60,12 +87,18 @@ class TestRunningNodesAutosharding(StepsSharding):
assert f"Peer NODE_2:{NODE_2} couldn't find any messages" in str(ex)
def test_multiple_content_topics_and_multiple_pubsub_topics(self):
# nwaku drops the pubsub-topic CLI arg, so the pubsub topics are subscribed via REST instead
self.setup_main_relay_nodes(
cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER
cluster_id=self.auto_cluster,
content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS,
num_shards_in_network=self.num_shards_in_network,
)
self.subscribe_main_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
self.subscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
self.check_published_message_reaches_relay_peer(content_topic=content_topic)
for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic)
def test_node_uses_both_auto_and_regular_apis(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
@ -85,7 +118,9 @@ class TestRunningNodesAutosharding(StepsSharding):
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}"
def test_sender_uses_regular_api_receiver_uses_auto_api(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
self.setup_main_relay_nodes(
cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic, num_shards_in_network=self.num_shards_in_network
)
self.subscribe_first_relay_node(pubsub_topics=[self.test_pubsub_topic])
self.subscribe_second_relay_node(content_topics=[self.test_content_topic])
self.relay_message(self.node1, self.create_message(contentTopic=self.test_content_topic), pubsub_topic=self.test_pubsub_topic)