test: join fleet with real cluster ID shards RLN on

This commit is contained in:
Roman 2026-04-23 14:50:16 +08:00
parent 5dd2acf7b6
commit 0442ef4fd3
No known key found for this signature in database
GPG Key ID: 583BDF43C238B83E
6 changed files with 249 additions and 14 deletions

View File

@ -95,9 +95,15 @@ class StepsFilter(StepsCommon):
for index, peer in enumerate(peer_list): for index, peer in enumerate(peer_list):
logger.debug(f"Checking that peer NODE_{index + 2}:{peer.image} can find the published message") logger.debug(f"Checking that peer NODE_{index + 2}:{peer.image} can find the published message")
get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topic=pubsub_topic, node=peer) get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topic=pubsub_topic, node=peer)
assert get_messages_response, f"Peer NODE_{index + 2}:{peer.image} couldn't find any messages" # get_filter_messages already scopes to the requested content topic; the
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" # additional filter guards against any residual or fleet messages that may
waku_message = WakuMessage(get_messages_response) # have been queued under the same topic before this call.
test_messages = [m for m in get_messages_response if m.get("contentTopic") == message["contentTopic"]]
assert test_messages, f"Peer NODE_{index + 2}:{peer.image} couldn't find any messages"
assert len(test_messages) == 1, (
f"Expected 1 test message but got {len(test_messages)} " f"(total messages returned: {len(get_messages_response)})"
)
waku_message = WakuMessage(test_messages)
waku_message.assert_received_message(message) waku_message.assert_received_message(message)
@allure.step @allure.step

View File

@ -124,9 +124,15 @@ class StepsLightPush(StepsCommon):
for index, peer in enumerate(peer_list): for index, peer in enumerate(peer_list):
logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the lightpushed message") logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the lightpushed message")
get_messages_response = peer.get_relay_messages(pubsub_topic) get_messages_response = peer.get_relay_messages(pubsub_topic)
assert get_messages_response, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" # In fleet mode the relay cache may contain background messages from other
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" # fleet participants. Filter to only messages whose contentTopic matches
waku_message = WakuMessage(get_messages_response) # the test message so that fleet noise does not break the count assertion.
test_messages = [m for m in get_messages_response if m.get("contentTopic") == payload["message"]["contentTopic"]]
assert test_messages, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages"
assert len(test_messages) == 1, (
f"Expected 1 test message but got {len(test_messages)} " f"(total messages in cache: {len(get_messages_response)})"
)
waku_message = WakuMessage(test_messages)
waku_message.assert_received_message(payload["message"]) waku_message.assert_received_message(payload["message"])
@allure.step @allure.step

View File

@ -122,9 +122,15 @@ class StepsRelay(StepsCommon):
for index, peer in enumerate(peer_list): for index, peer in enumerate(peer_list):
logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message")
get_messages_response = peer.get_relay_messages(pubsub_topic) get_messages_response = peer.get_relay_messages(pubsub_topic)
assert get_messages_response, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" # In fleet mode the relay cache may contain background messages from other
assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" # fleet participants. Filter to only the message whose contentTopic matches
waku_message = WakuMessage(get_messages_response) # what the test sent so that fleet noise does not break the count assertion.
test_messages = [m for m in get_messages_response if m.get("contentTopic") == message["contentTopic"]]
assert test_messages, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages"
assert len(test_messages) == 1, (
f"Expected 1 test message but got {len(test_messages)} " f"(total messages in cache: {len(get_messages_response)})"
)
waku_message = WakuMessage(test_messages)
waku_message.assert_received_message(message) waku_message.assert_received_message(message)
@allure.step @allure.step

View File

@ -105,6 +105,11 @@ VALID_PUBSUB_TOPICS = [
f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1000", f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1000",
] ]
# Fleet cluster settings, kept in step with config-n1.toml / config-n2.toml:
# both fleet nodes run cluster-id=1 and serve shards 0 through 7.
FLEET_CLUSTER_ID = "1"
# One static-sharding pubsub topic per fleet shard, in ascending shard order.
FLEET_PUBSUB_TOPICS = ["/waku/2/rs/" + FLEET_CLUSTER_ID + "/" + str(shard) for shard in range(8)]
PUBSUB_TOPICS_STORE = [ PUBSUB_TOPICS_STORE = [
f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0", f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0",
f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1", f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1",

View File

@ -34,17 +34,20 @@ Activation (opt-in, disabled by default):
""" """
import inspect import inspect
import glob import glob
import random
import string
from src.libs.custom_logger import get_custom_logger from src.libs.custom_logger import get_custom_logger
import os import os
import pytest import pytest
from datetime import datetime from datetime import datetime
from time import time from time import time
from uuid import uuid4 from uuid import uuid4
from src.libs.common import attach_allure_file from src.libs.common import attach_allure_file, gen_step_id
import src.env_vars as env_vars import src.env_vars as env_vars
from src.env_vars import FLEET_PRIMARY_MULTIADDR, FLEET_DNS_DISCOVERY_URL, FLEET_N1_MULTIADDR, FLEET_N2_MULTIADDR from src.env_vars import FLEET_PRIMARY_MULTIADDR, FLEET_DNS_DISCOVERY_URL, FLEET_N1_MULTIADDR, FLEET_N2_MULTIADDR
from src.data_storage import DS from src.data_storage import DS
from src.postgres_setup import start_postgres, stop_postgres from src.postgres_setup import start_postgres, stop_postgres
from src.test_data import FLEET_CLUSTER_ID, FLEET_PUBSUB_TOPICS, PUBSUB_TOPICS_RLN
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
@ -88,8 +91,59 @@ def _append_fleet_kwarg(kwargs: dict, key: str, value: str) -> None:
kwargs[key] = [existing, value] kwargs[key] = [existing, value]
@pytest.fixture(scope="session")
def fleet_rln_state(request):
    """Register two RLN memberships once per session when fleet bootstrap is on.

    Registration is attempted a single time for the whole session and the
    resulting keystore prefixes / membership indexes are shared with every
    test through the yielded dict.

    Yields:
        dict with two parallel lists:
            ``keystore_prefixes``       random 4-letter lowercase prefixes, one
                                        per registered membership
            ``rln_membership_indexes``  the values returned by ``register_rln()``
                                        for the matching prefix

        Both lists are empty when fleet bootstrap is inactive, when
        ``RLN_CREDENTIALS`` is falsy, or when any registration raised.
    """

    def _no_memberships():
        # A fresh dict on every call so callers never share the inner lists.
        return {"keystore_prefixes": [], "rln_membership_indexes": []}

    if not _fleet_bootstrap_enabled(request.config):
        yield _no_memberships()
        return

    from src.node.waku_node import WakuNode
    from src.env_vars import RLN_CREDENTIALS, DEFAULT_NWAKU

    if not RLN_CREDENTIALS:
        logger.info("Fleet RLN: RLN_CREDENTIALS not set nodes will start without RLN")
        yield _no_memberships()
        return

    prefixes = []
    indexes = []
    try:
        # Credential ids are 1-based: the first membership uses id "1", the second "2".
        for creds_id in (1, 2):
            keystore_prefix = "".join(random.choices(string.ascii_lowercase, k=4))
            registrar = WakuNode(DEFAULT_NWAKU, f"rln_reg_{creds_id}_{gen_step_id()}")
            index = registrar.register_rln(
                rln_keystore_prefix=keystore_prefix,
                rln_creds_source=RLN_CREDENTIALS,
                rln_creds_id=str(creds_id),
            )
            # Record the pair only after registration returned without raising.
            prefixes.append(keystore_prefix)
            indexes.append(index)
        logger.info(
            "Fleet RLN: registered %d memberships indexes=%s prefixes=%s",
            len(indexes),
            indexes,
            prefixes,
        )
        registered = {"keystore_prefixes": prefixes, "rln_membership_indexes": indexes}
    except Exception as ex:
        # Best effort: any failure discards partial results so that nodes
        # start without RLN rather than with an incomplete credential set.
        logger.warning("Fleet RLN: registration failed nodes will start without RLN: %s", ex)
        registered = _no_memberships()

    yield registered
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def patch_waku_node_start(request, monkeypatch): def patch_waku_node_start(request, monkeypatch, fleet_rln_state):
"""Monkey-patch WakuNode.start() to bootstrap every local node from the """Monkey-patch WakuNode.start() to bootstrap every local node from the
waku.test fleet before the test body runs. waku.test fleet before the test body runs.
@ -101,6 +155,10 @@ def patch_waku_node_start(request, monkeypatch):
multiaddr is *appended* as an additional entry. multiaddr is *appended* as an additional entry.
- ``dns_discovery`` and ``dns_discovery_url`` are set only if the caller - ``dns_discovery`` and ``dns_discovery_url`` are set only if the caller
has not already supplied them (``setdefault`` semantics). has not already supplied them (``setdefault`` semantics).
- When ``fleet_rln_state`` contains registered credentials AND the caller
has not provided explicit RLN args, RLN credentials are injected
automatically so that the node can participate in the fleet's
RLN-protected relay.
""" """
if not _fleet_bootstrap_enabled(request.config): if not _fleet_bootstrap_enabled(request.config):
logger.info("Fleet bootstrap inactive pass --fleet (or set FLEET_BOOTSTRAP=true) " "to connect local nodes to the waku.test fleet") logger.info("Fleet bootstrap inactive pass --fleet (or set FLEET_BOOTSTRAP=true) " "to connect local nodes to the waku.test fleet")
@ -108,12 +166,25 @@ def patch_waku_node_start(request, monkeypatch):
return return
from src.node.waku_node import WakuNode from src.node.waku_node import WakuNode
from src.env_vars import RLN_CREDENTIALS
original_start = WakuNode.start original_start = WakuNode.start
def fleet_joined_start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs): def fleet_joined_start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs):
logger.debug("fleet_joined_start: injecting waku.test bootstrap args into WakuNode.start()") logger.debug("fleet_joined_start: injecting waku.test bootstrap args into WakuNode.start()")
# Escape-hatch: callers that set skip_fleet_peering=True (e.g. lightpush client
# nodes that have no RLN membership) bypass all fleet bootstrap injection so
# they do NOT join the fleet relay mesh. discv5_bootstrap_node is still stripped
# to prevent accidental local-node coupling. All other kwargs are forwarded as-is.
if kwargs.pop("skip_fleet_peering", False):
if "discv5_bootstrap_node" in kwargs:
del kwargs["discv5_bootstrap_node"]
logger.debug(
"fleet_joined_start: skip_fleet_peering=True bypassing fleet bootstrap " "for this node (no fleet staticnode / shard injected)"
)
return original_start(self, wait_for_node_sec=wait_for_node_sec, use_wrapper=use_wrapper, **kwargs)
# Determine which fleet peer to connect to based on node creation order # Determine which fleet peer to connect to based on node creation order
# within the current test (DS.waku_nodes is reset to [] before each test). # within the current test (DS.waku_nodes is reset to [] before each test).
# The append to DS.waku_nodes happens inside _start_docker/_start_wrapper, # The append to DS.waku_nodes happens inside _start_docker/_start_wrapper,
@ -136,6 +207,37 @@ def patch_waku_node_start(request, monkeypatch):
_append_fleet_kwarg(kwargs, "staticnode", fleet_multiaddr) _append_fleet_kwarg(kwargs, "staticnode", fleet_multiaddr)
kwargs.setdefault("dns_discovery", "true") kwargs.setdefault("dns_discovery", "true")
kwargs.setdefault("dns_discovery_url", FLEET_DNS_DISCOVERY_URL) kwargs.setdefault("dns_discovery_url", FLEET_DNS_DISCOVERY_URL)
# Align local node cluster and shards with the fleet node configs
# (config-n1.toml / config-n2.toml both use cluster-id=1, shards 0-7).
kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID)
kwargs.setdefault("shard", list(range(8)))
# Inject session-level RLN credentials into nodes that don't already
# carry explicit RLN args (i.e. the regular waku_test_fleet tests that
# are not themselves RLN tests). Uses the same node_index as the fleet
# multiaddr assignment so NODE1 gets creds-id=1 and NODE2 gets creds-id=2.
# Only inject into nodes with relay enabled filter/lightpush/store service
# nodes run without relay and nwaku rejects rln-relay when WakuRelay is not mounted.
if fleet_rln_state["keystore_prefixes"] and kwargs.get("rln_creds_source") is None:
if str(kwargs.get("relay", "")).lower() == "true":
if node_index < len(fleet_rln_state["keystore_prefixes"]):
kwargs["rln_creds_source"] = RLN_CREDENTIALS
kwargs["rln_creds_id"] = str(node_index + 1)
kwargs["rln_keystore_prefix"] = fleet_rln_state["keystore_prefixes"][node_index]
kwargs["rln_relay_membership_index"] = str(fleet_rln_state["rln_membership_indexes"][node_index])
kwargs.setdefault("rln_relay_user_message_limit", "300")
logger.debug(
"fleet_joined_start: injected session RLN for node %d prefix=%s index=%s user_message_limit=300",
node_index,
kwargs["rln_keystore_prefix"],
kwargs["rln_relay_membership_index"],
)
else:
logger.debug(
"fleet_joined_start: skipping RLN injection for node %d relay not enabled (relay=%r)",
node_index,
kwargs.get("relay"),
)
# Strip any local-node discv5 bootstrap ENR so that each node bootstraps # Strip any local-node discv5 bootstrap ENR so that each node bootstraps
# independently from its assigned fleet peer rather than from another local # independently from its assigned fleet peer rather than from another local
@ -165,6 +267,107 @@ def patch_waku_node_start(request, monkeypatch):
yield yield
@pytest.fixture(autouse=True)
def patch_fleet_cluster_config(request, monkeypatch):
    """Retarget step classes and fleet-marked tests at the fleet cluster.

    Does nothing unless fleet bootstrap is enabled for this run. When it is:

    - the pubsub-topic class attributes of the step classes are replaced with
      the matching ``FLEET_PUBSUB_TOPICS`` entries (cluster-id
      ``FLEET_CLUSTER_ID``, shards 0-7, mirroring config-n1.toml /
      config-n2.toml);
    - the ``VALID_PUBSUB_TOPICS`` binding inside ``tests.relay.test_publish``
      is replaced, because that module iterates over its own imported copy;
    - ``StepsLightPush.setup_lightpush_node`` is replaced with a fleet-aware
      variant (see ``_fleet_setup_lightpush_node`` below).

    All replacements go through ``monkeypatch`` and are therefore undone
    automatically when the test finishes.
    """
    if not _fleet_bootstrap_enabled(request.config):
        yield
        return

    from src.steps.relay import StepsRelay
    from src.steps.filter import StepsFilter
    from src.steps.light_push import StepsLightPush
    from src.steps.store import StepsStore
    from src.steps.rln import StepsRLN
    import tests.relay.test_publish as _relay_publish_mod

    # (step class, attribute, fleet topic). The shard index of each entry
    # follows the index the class uses into its default topic list, per the
    # original author's mapping:
    #   StepsRelay / StepsFilter.test_pubsub_topic  -> shard 1
    #   StepsFilter.second_pubsub_topic             -> shard 2
    #   StepsLightPush / StepsStore / StepsRLN      -> shard 0
    topic_overrides = (
        (StepsRelay, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[1]),
        (StepsFilter, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[1]),
        (StepsFilter, "second_pubsub_topic", FLEET_PUBSUB_TOPICS[2]),
        (StepsLightPush, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0]),
        (StepsStore, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0]),
        (StepsRLN, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0]),
    )
    for step_class, attribute, fleet_topic in topic_overrides:
        monkeypatch.setattr(step_class, attribute, fleet_topic)

    # test_publish_on_multiple_pubsub_topics carries the waku_test_fleet marker
    # and loops over the module-level VALID_PUBSUB_TOPICS name directly, so the
    # binding inside that module has to be swapped as well.
    monkeypatch.setattr(_relay_publish_mod, "VALID_PUBSUB_TOPICS", FLEET_PUBSUB_TOPICS)

    def _fleet_setup_lightpush_node(self, image, node_index, **kwargs):
        """Start a light-push client that publishes through the fleet.

        Background (from the original author's notes): only two RLN
        memberships are registered per session, so the third node started by
        light-push tests has none; joining the RLN-enforcing fleet relay
        without credentials made the node fail to start.

        The client is therefore started with relay disabled, with fleet
        bootstrap injection skipped, and with ``FLEET_N1_MULTIADDR`` as its
        light-push service node. Caller-supplied ``relay`` and
        ``lightpushnode`` values are overridden; ``cluster_id`` and ``shard``
        are only defaulted.

        Returns the started node. The node is registered via
        ``add_node_peer`` but is not added to ``main_receiving_nodes``.
        """
        from src.node.waku_node import WakuNode

        node = WakuNode(image, f"lightpush_node{node_index}_{self.test_id}")

        # Copy first so the caller's kwargs dict is never mutated.
        fleet_kwargs = dict(kwargs)
        # relay=false: pure light-push client, so no RLN membership is needed.
        fleet_kwargs["relay"] = "false"
        # Consumed by the patched WakuNode.start(), which then skips the fleet
        # staticnode / shard / RLN injection for this node.
        fleet_kwargs["skip_fleet_peering"] = True
        # Set inside the dict rather than as a separate keyword: passing
        # lightpushnode= next to **fleet_kwargs would raise TypeError
        # ("multiple values for keyword argument") whenever the caller
        # forwarded its own lightpushnode.
        fleet_kwargs["lightpushnode"] = FLEET_N1_MULTIADDR
        # Match the fleet topics the test body will publish/subscribe on.
        fleet_kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID)
        fleet_kwargs.setdefault("shard", list(range(8)))

        node.start(**fleet_kwargs)

        self.add_node_peer(node, self.multiaddr_list)
        logger.debug(
            "fleet _fleet_setup_lightpush_node: node %d started with relay=false, " "skip_fleet_peering=True, lightpushnode=%s",
            node_index,
            FLEET_N1_MULTIADDR,
        )
        return node

    monkeypatch.setattr(StepsLightPush, "setup_lightpush_node", _fleet_setup_lightpush_node)

    logger.info(
        "Fleet cluster patch active pubsub topics overridden to cluster-id=%s "
        "(shards 0-7, e.g. test_pubsub_topic=%s rln_test_pubsub_topic=%s); "
        "StepsLightPush.setup_lightpush_node overridden to use fleet relay %s",
        FLEET_CLUSTER_ID,
        FLEET_PUBSUB_TOPICS[1],
        FLEET_PUBSUB_TOPICS[0],
        FLEET_N1_MULTIADDR,
    )
    yield
# See https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures # See https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures
@pytest.hookimpl(hookwrapper=True, tryfirst=True) @pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(item): def pytest_runtest_makereport(item):

View File

@ -23,7 +23,12 @@ class TestGetMessages(StepsStore):
logger.error(f'Payload {payload["description"]} failed: {str(e)}') logger.error(f'Payload {payload["description"]} failed: {str(e)}')
failed_payloads.append(payload["description"]) failed_payloads.append(payload["description"])
assert not failed_payloads, f"Payloads failed: {failed_payloads}" assert not failed_payloads, f"Payloads failed: {failed_payloads}"
assert len(self.store_response.messages) == len(SAMPLE_INPUTS) # In fleet mode the archive may also store background fleet messages; count
# only the messages that belong to the test by matching the test content topic.
test_msgs = [m for m in self.store_response.messages if m.get("message", {}).get("contentTopic") == self.test_content_topic]
assert len(test_msgs) == len(SAMPLE_INPUTS), (
f"Expected {len(SAMPLE_INPUTS)} test messages but found {len(test_msgs)} " f"(total in store: {len(self.store_response.messages)})"
)
@pytest.mark.waku_test_fleet @pytest.mark.waku_test_fleet
def test_get_store_messages_with_different_content_topics(self): def test_get_store_messages_with_different_content_topics(self):
@ -80,8 +85,12 @@ class TestGetMessages(StepsStore):
self.publish_message(message=message) self.publish_message(message=message)
message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex")) message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex"))
for node in self.store_nodes: for node in self.store_nodes:
store_response = self.get_messages_from_store(node, page_size=50) # Scope the store query to the test content topic so that background fleet
assert len(store_response.messages) == len(SAMPLE_INPUTS) # messages archived on the same shard do not inflate the expected count.
store_response = self.get_messages_from_store(node, page_size=50, content_topics=self.test_content_topic)
assert len(store_response.messages) == len(
SAMPLE_INPUTS
), f"Expected {len(SAMPLE_INPUTS)} messages but got {len(store_response.messages)}"
for index in range(len(store_response.messages)): for index in range(len(store_response.messages)):
assert store_response.message_hash(index) == message_hash_list[node.type()][index], f"Message hash at index {index} doesn't match" assert store_response.message_hash(index) == message_hash_list[node.type()][index], f"Message hash at index {index} doesn't match"