From d6a23f95c29d14f5af4a3339a015b542f9cf2956 Mon Sep 17 00:00:00 2001 From: Roman Date: Wed, 29 Apr 2026 15:28:58 +0800 Subject: [PATCH] fix: refactor monkeypatch --- src/node/fleet_waku_node.py | 178 ++++++++++++++++++++++++ src/node/waku_node.py | 8 ++ src/test_config.py | 40 ++++++ tests/conftest.py | 263 +++++++++++++----------------------- 4 files changed, 317 insertions(+), 172 deletions(-) create mode 100644 src/node/fleet_waku_node.py create mode 100644 src/test_config.py diff --git a/src/node/fleet_waku_node.py b/src/node/fleet_waku_node.py new file mode 100644 index 000000000..650c159b3 --- /dev/null +++ b/src/node/fleet_waku_node.py @@ -0,0 +1,178 @@ +"""Fleet bootstrap configuration for WakuNode. + +When fleet bootstrap is active (``--fleet`` CLI flag or ``FLEET_BOOTSTRAP=true`` +env var), an instance of :class:`FleetBootstrapConfig` is assigned to +``WakuNode._pre_start_hook``. ``WakuNode.start()`` calls this hook before +processing any start arguments, injecting fleet-specific bootstrap parameters +into every local Docker node. + +This module encapsulates all fleet injection logic that was previously an +ad-hoc closure inside ``tests/conftest.py``, making it independently testable +and reusable. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from src.libs.custom_logger import get_custom_logger +from src.data_storage import DS +from src.env_vars import ( + FLEET_N1_MULTIADDR, + FLEET_N2_MULTIADDR, + FLEET_PRIMARY_MULTIADDR, + FLEET_DNS_DISCOVERY_URL, + RLN_CREDENTIALS, +) +from src.test_data import FLEET_CLUSTER_ID + +if TYPE_CHECKING: + from src.node.waku_node import WakuNode + +logger = get_custom_logger(__name__) + + +def _append_fleet_kwarg(kwargs: dict, key: str, value: Any) -> None: + """Append *value* to the kwargs entry *key*, creating a list when needed.""" + existing = kwargs.get(key) + if existing is None: + kwargs[key] = value + elif isinstance(existing, list): + if value not in existing: + kwargs[key] = existing + [value] + else: + if existing != value: + kwargs[key] = [existing, value] + + +@dataclass +class FleetBootstrapConfig: + """Holds fleet session state and implements the pre-start kwargs injection. + + One instance is created per pytest session when fleet bootstrap is active. + It is registered as ``WakuNode._pre_start_hook`` so that every call to + ``WakuNode.start()`` automatically receives fleet bootstrap arguments. + + Bootstrap assignment by node creation order (mirrors config-n*.toml files): + - NODE1 (1st started) → FLEET_N1_MULTIADDR (node-01.do-ams3) + - NODE2 (2nd started) → FLEET_N2_MULTIADDR (node-01.gc-us-central1-a) + - additional nodes → FLEET_PRIMARY_MULTIADDR (Amsterdam, same as NODE1) + + Direct bootstrap coupling between NODE1 and NODE2 is suppressed: any + ``discv5_bootstrap_node`` kwarg pointing to a local node ENR is removed; + fleet DNS discovery (``dns_discovery_url``) replaces it. + + Attributes: + fleet_rln_state: Dict with keys ``keystore_prefixes`` and + ``rln_membership_indexes`` populated by the ``fleet_rln_state`` + session fixture. + """ + + fleet_rln_state: dict + + def prepare_start_kwargs(self, node: "WakuNode", kwargs: dict) -> dict: + """Inject fleet bootstrap arguments into *kwargs* before node start. + + Transparent to callers: + + - Existing ``staticnode`` entries are preserved; the fleet multiaddr is + *appended* as an additional entry. + - ``dns_discovery`` / ``dns_discovery_url`` use ``setdefault`` semantics. + - RLN credentials are injected only when not already present and relay + is enabled. + - ``discv5_bootstrap_node`` pointing to a local ENR is removed so each + node bootstraps independently from its assigned fleet peer. + + The ``skip_fleet_peering`` kwarg (not a real nwaku flag) acts as an + escape-hatch: when ``True``, only ``discv5_bootstrap_node`` removal + happens; all other fleet injection is skipped. + + Returns: + The (possibly modified) *kwargs* dict. + """ + logger.debug("FleetBootstrapConfig.prepare_start_kwargs: injecting waku.test bootstrap args") + + if kwargs.pop("skip_fleet_peering", False): + kwargs.pop("discv5_bootstrap_node", None) + logger.debug( + "FleetBootstrapConfig: skip_fleet_peering=True – " "bypassing fleet bootstrap for this node (no fleet staticnode / shard injected)" + ) + return kwargs + + # Determine which fleet peer to connect to based on node creation order + # within the current test (DS.waku_nodes is reset to [] before each test). + # The append to DS.waku_nodes happens inside _start_docker/_start_wrapper, + # *after* start() returns, so len() here reflects the count of nodes + # already fully started. + node_index = len(DS.waku_nodes) + if node_index == 0: + fleet_multiaddr = FLEET_N1_MULTIADDR + logger.debug( + "FleetBootstrapConfig: NODE1 – bootstrapping from config-n1.toml (%s)", + fleet_multiaddr, + ) + elif node_index == 1: + fleet_multiaddr = FLEET_N2_MULTIADDR + logger.debug( + "FleetBootstrapConfig: NODE2 – bootstrapping from config-n2.toml (%s)", + fleet_multiaddr, + ) + else: + fleet_multiaddr = FLEET_PRIMARY_MULTIADDR + logger.debug( + "FleetBootstrapConfig: additional node %d – bootstrapping from primary (%s)", + node_index, + fleet_multiaddr, + ) + + _append_fleet_kwarg(kwargs, "staticnode", fleet_multiaddr) + kwargs.setdefault("dns_discovery", "true") + kwargs.setdefault("dns_discovery_url", FLEET_DNS_DISCOVERY_URL) + # Align local node cluster and shards with the fleet node configs + # (config-n1.toml / config-n2.toml both use cluster-id=1, shards 0-7). + kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID) + kwargs.setdefault("shard", list(range(8))) + + # Inject session-level RLN credentials into nodes that don't already + # carry explicit RLN args. Uses the same node_index as the fleet + # multiaddr assignment so NODE1 gets creds-id=1 and NODE2 gets creds-id=2. + # Only inject into nodes with relay enabled – filter/lightpush/store service + # nodes run without relay and nwaku rejects rln-relay when WakuRelay is not mounted. + rln_prefixes = self.fleet_rln_state.get("keystore_prefixes", []) + if rln_prefixes and kwargs.get("rln_creds_source") is None: + if str(kwargs.get("relay", "")).lower() == "true": + if node_index < len(rln_prefixes): + kwargs["rln_creds_source"] = RLN_CREDENTIALS + kwargs["rln_creds_id"] = str(node_index + 1) + kwargs["rln_keystore_prefix"] = rln_prefixes[node_index] + kwargs["rln_relay_membership_index"] = str(self.fleet_rln_state["rln_membership_indexes"][node_index]) + kwargs.setdefault("rln_relay_user_message_limit", "300") + logger.debug( + "FleetBootstrapConfig: injected session RLN for node %d – " "prefix=%s index=%s user_message_limit=300", + node_index, + kwargs["rln_keystore_prefix"], + kwargs["rln_relay_membership_index"], + ) + else: + logger.debug( + "FleetBootstrapConfig: skipping RLN injection for node %d – " "relay not enabled (relay=%r)", + node_index, + kwargs.get("relay"), + ) + + # Strip any local-node discv5 bootstrap ENR so that each node bootstraps + # independently from its assigned fleet peer rather than from another + # local container. The fleet DNS tree (dns_discovery_url) replaces it. + if "discv5_bootstrap_node" in kwargs: + logger.debug( + "FleetBootstrapConfig: dropping local discv5_bootstrap_node=%s " "(fleet DNS discovery replaces it)", + kwargs["discv5_bootstrap_node"], + ) + del kwargs["discv5_bootstrap_node"] + + logger.debug( + "FleetBootstrapConfig: staticnode=%s dns_discovery_url=%s", + kwargs.get("staticnode"), + kwargs.get("dns_discovery_url"), + ) + return kwargs diff --git a/src/node/waku_node.py b/src/node/waku_node.py index c53c75dbb..bb085e8ce 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -96,6 +96,12 @@ def resolve_sharding_flags(kwargs): class WakuNode: + # Optional pre-start hook: when set to a callable, it is invoked at the + # beginning of every start() call with (self, kwargs_dict) and must return + # the (possibly modified) kwargs dict. Set by tests/conftest.py when + # fleet bootstrap is active; None by default so all normal tests are unaffected. + _pre_start_hook = None + def __init__(self, docker_image, docker_log_prefix=""): self._image_name = docker_image self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") @@ -113,6 +119,8 @@ class WakuNode: @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs): + if WakuNode._pre_start_hook is not None: + kwargs = WakuNode._pre_start_hook(self, kwargs) logger.debug("Starting Node...") default_args, remove_container = self._prepare_start_context(**kwargs) diff --git a/src/test_config.py b/src/test_config.py new file mode 100644 index 000000000..121c59e1e --- /dev/null +++ b/src/test_config.py @@ -0,0 +1,40 @@ +"""Test session configuration objects. + +These dataclasses carry configuration that varies between fleet and +non-fleet test runs. Fixtures in ``tests/conftest.py`` build and +provide the appropriate instance; step classes and tests consume them +via fixture injection rather than through hardcoded class attributes or +``monkeypatch``. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import List + + +@dataclass(frozen=True) +class PubsubConfig: + """Named pubsub-topic slots for a test session. + + A *default* instance uses ``VALID_PUBSUB_TOPICS`` / ``PUBSUB_TOPICS_RLN`` + (cluster-id 198). A *fleet* instance uses ``FLEET_PUBSUB_TOPICS`` + (cluster-id 1, shards 0-7). + + Attributes: + relay_test_topic: Primary topic used by ``StepsRelay`` tests. + filter_test_topic: Primary topic used by ``StepsFilter`` tests. + filter_second_topic: Secondary topic used by multi-topic filter tests. + lightpush_test_topic: Topic used by ``StepsLightPush`` tests. + store_test_topic: Topic used by ``StepsStore`` tests. + rln_test_topic: Topic used by ``StepsRLN`` tests. + all_topics: Full ordered topic list (used where a module-level + ``VALID_PUBSUB_TOPICS`` reference is iterated). + """ + + relay_test_topic: str + filter_test_topic: str + filter_second_topic: str + lightpush_test_topic: str + store_test_topic: str + rln_test_topic: str + all_topics: List[str] diff --git a/tests/conftest.py b/tests/conftest.py index 5d6143859..da199fb23 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,7 +4,7 @@ Root pytest configuration for the logos-delivery interop test suite. Fleet bootstrap – hybrid local+fleet -------------------------------------------------- -Every local Docker node spawned by a test is patched at start() time so +Every local Docker node spawned by a test is configured at start() time so that it connects to a live waku.test fleet peer as a static peer, and discovers all remaining fleet peers through the published ENR DNS tree. @@ -47,7 +47,8 @@ import src.env_vars as env_vars from src.env_vars import FLEET_PRIMARY_MULTIADDR, FLEET_DNS_DISCOVERY_URL, FLEET_N1_MULTIADDR, FLEET_N2_MULTIADDR from src.data_storage import DS from src.postgres_setup import start_postgres, stop_postgres -from src.test_data import FLEET_CLUSTER_ID, FLEET_PUBSUB_TOPICS, PUBSUB_TOPICS_RLN +from src.test_data import FLEET_CLUSTER_ID, FLEET_PUBSUB_TOPICS, PUBSUB_TOPICS_RLN, VALID_PUBSUB_TOPICS +from src.test_config import PubsubConfig logger = get_custom_logger(__name__) @@ -78,19 +79,6 @@ def _fleet_bootstrap_enabled(config) -> bool: return os.getenv("FLEET_BOOTSTRAP", "false").lower() == "true" -def _append_fleet_kwarg(kwargs: dict, key: str, value: str) -> None: - """Add *value* to the kwargs entry *key*, creating a list when needed.""" - existing = kwargs.get(key) - if existing is None: - kwargs[key] = value - elif isinstance(existing, list): - if value not in existing: - kwargs[key] = existing + [value] - else: - if existing != value: - kwargs[key] = [existing, value] - - @pytest.fixture(scope="session") def fleet_rln_state(request): """Register 2 RLN memberships once per test session when ``--fleet`` is active. @@ -142,129 +130,72 @@ def fleet_rln_state(request): yield state -@pytest.fixture(autouse=True) -def patch_waku_node_start(request, monkeypatch, fleet_rln_state): - """Monkey-patch WakuNode.start() to bootstrap every local node from the - waku.test fleet before the test body runs. +@pytest.fixture(scope="session") +def pubsub_cfg(request) -> PubsubConfig: + """Return the pubsub-topic configuration for the current session. - Active only when ``--fleet`` is passed to pytest or ``FLEET_BOOTSTRAP=true`` - is set. By default the patch is a no-op so all existing tests are unaffected. + Fleet mode (``--fleet`` / ``FLEET_BOOTSTRAP=true``) → cluster-id 1, shards 0-7. + Default mode → cluster-id 198 (``VALID_PUBSUB_TOPICS`` / ``PUBSUB_TOPICS_RLN``). + """ + if _fleet_bootstrap_enabled(request.config): + return PubsubConfig( + relay_test_topic=FLEET_PUBSUB_TOPICS[1], + filter_test_topic=FLEET_PUBSUB_TOPICS[1], + filter_second_topic=FLEET_PUBSUB_TOPICS[2], + lightpush_test_topic=FLEET_PUBSUB_TOPICS[0], + store_test_topic=FLEET_PUBSUB_TOPICS[0], + rln_test_topic=FLEET_PUBSUB_TOPICS[0], + all_topics=FLEET_PUBSUB_TOPICS, + ) + return PubsubConfig( + relay_test_topic=VALID_PUBSUB_TOPICS[1], + filter_test_topic=VALID_PUBSUB_TOPICS[1], + filter_second_topic=VALID_PUBSUB_TOPICS[2], + lightpush_test_topic=VALID_PUBSUB_TOPICS[0], + store_test_topic=VALID_PUBSUB_TOPICS[0], + rln_test_topic=PUBSUB_TOPICS_RLN[0], + all_topics=VALID_PUBSUB_TOPICS, + ) - The patch is transparent to existing tests: - - Any ``staticnode`` kwarg already provided is preserved; the fleet - multiaddr is *appended* as an additional entry. - - ``dns_discovery`` and ``dns_discovery_url`` are set only if the caller - has not already supplied them (``setdefault`` semantics). - - When ``fleet_rln_state`` contains registered credentials AND the caller - has not provided explicit RLN args, RLN credentials are injected - automatically so that the node can participate in the fleet's - RLN-protected relay. + +@pytest.fixture(scope="session", autouse=True) +def configure_fleet_bootstrap(request, fleet_rln_state): + """Register ``FleetBootstrapConfig`` as ``WakuNode._pre_start_hook`` for the session. + + Active only when ``--fleet`` is passed or ``FLEET_BOOTSTRAP=true`` is set. + The hook is cleared at session teardown so it does not leak across test + collection runs. + + Replaces the former per-test ``monkeypatch``-based ``patch_waku_node_start`` + fixture. Benefits of this approach: + + - **Session-scoped** – the hook is registered once, not reinstalled before + every test function. + - **Encapsulated** – all fleet injection logic lives in + :class:`src.node.fleet_waku_node.FleetBootstrapConfig`, making it + independently testable. + - **No closure over ``original_start``** – ``WakuNode.start`` is not + replaced; the hook is called from within it via a class variable. """ if not _fleet_bootstrap_enabled(request.config): logger.info("Fleet bootstrap inactive – pass --fleet (or set FLEET_BOOTSTRAP=true) " "to connect local nodes to the waku.test fleet") yield return + from src.node.fleet_waku_node import FleetBootstrapConfig from src.node.waku_node import WakuNode - from src.env_vars import RLN_CREDENTIALS - original_start = WakuNode.start - - def fleet_joined_start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs): - logger.debug("fleet_joined_start: injecting waku.test bootstrap args into WakuNode.start()") - - # Escape-hatch: callers that set skip_fleet_peering=True (e.g. lightpush client - # nodes that have no RLN membership) bypass all fleet bootstrap injection so - # they do NOT join the fleet relay mesh. discv5_bootstrap_node is still stripped - # to prevent accidental local-node coupling. All other kwargs are forwarded as-is. - if kwargs.pop("skip_fleet_peering", False): - if "discv5_bootstrap_node" in kwargs: - del kwargs["discv5_bootstrap_node"] - logger.debug( - "fleet_joined_start: skip_fleet_peering=True – bypassing fleet bootstrap " "for this node (no fleet staticnode / shard injected)" - ) - return original_start(self, wait_for_node_sec=wait_for_node_sec, use_wrapper=use_wrapper, **kwargs) - - # Determine which fleet peer to connect to based on node creation order - # within the current test (DS.waku_nodes is reset to [] before each test). - # The append to DS.waku_nodes happens inside _start_docker/_start_wrapper, - # *after* this function returns, so len() here reflects the count of nodes - # already fully started. - node_index = len(DS.waku_nodes) - if node_index == 0: - # First node started → NODE1 → mirrors config-n1.toml - fleet_multiaddr = FLEET_N1_MULTIADDR # node-01.do-ams3.waku.test.status.im - logger.debug("fleet_joined_start: NODE1 – bootstrapping from config-n1.toml (%s)", fleet_multiaddr) - elif node_index == 1: - # Second node started → NODE2 → mirrors config-n2.toml - fleet_multiaddr = FLEET_N2_MULTIADDR # node-01.gc-us-central1-a.waku.test.status.im - logger.debug("fleet_joined_start: NODE2 – bootstrapping from config-n2.toml (%s)", fleet_multiaddr) - else: - # Additional nodes fall back to the primary (Amsterdam) fleet peer - fleet_multiaddr = FLEET_PRIMARY_MULTIADDR - logger.debug("fleet_joined_start: additional node %d – bootstrapping from primary (%s)", node_index, fleet_multiaddr) - - _append_fleet_kwarg(kwargs, "staticnode", fleet_multiaddr) - kwargs.setdefault("dns_discovery", "true") - kwargs.setdefault("dns_discovery_url", FLEET_DNS_DISCOVERY_URL) - # Align local node cluster and shards with the fleet node configs - # (config-n1.toml / config-n2.toml both use cluster-id=1, shards 0-7). - kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID) - kwargs.setdefault("shard", list(range(8))) - - # Inject session-level RLN credentials into nodes that don't already - # carry explicit RLN args (i.e. the regular waku_test_fleet tests that - # are not themselves RLN tests). Uses the same node_index as the fleet - # multiaddr assignment so NODE1 gets creds-id=1 and NODE2 gets creds-id=2. - # Only inject into nodes with relay enabled – filter/lightpush/store service - # nodes run without relay and nwaku rejects rln-relay when WakuRelay is not mounted. - if fleet_rln_state["keystore_prefixes"] and kwargs.get("rln_creds_source") is None: - if str(kwargs.get("relay", "")).lower() == "true": - if node_index < len(fleet_rln_state["keystore_prefixes"]): - kwargs["rln_creds_source"] = RLN_CREDENTIALS - kwargs["rln_creds_id"] = str(node_index + 1) - kwargs["rln_keystore_prefix"] = fleet_rln_state["keystore_prefixes"][node_index] - kwargs["rln_relay_membership_index"] = str(fleet_rln_state["rln_membership_indexes"][node_index]) - kwargs.setdefault("rln_relay_user_message_limit", "300") - logger.debug( - "fleet_joined_start: injected session RLN for node %d – prefix=%s index=%s user_message_limit=300", - node_index, - kwargs["rln_keystore_prefix"], - kwargs["rln_relay_membership_index"], - ) - else: - logger.debug( - "fleet_joined_start: skipping RLN injection for node %d – relay not enabled (relay=%r)", - node_index, - kwargs.get("relay"), - ) - - # Strip any local-node discv5 bootstrap ENR so that each node bootstraps - # independently from its assigned fleet peer rather than from another local - # container. The fleet DNS tree (dns_discovery_url) replaces it. - if "discv5_bootstrap_node" in kwargs: - logger.debug( - "fleet_joined_start: dropping local discv5_bootstrap_node=%s " "(fleet DNS discovery replaces it)", - kwargs["discv5_bootstrap_node"], - ) - del kwargs["discv5_bootstrap_node"] - - logger.debug( - "fleet_joined_start: staticnode=%s dns_discovery_url=%s", - kwargs.get("staticnode"), - kwargs.get("dns_discovery_url"), - ) - return original_start(self, wait_for_node_sec=wait_for_node_sec, use_wrapper=use_wrapper, **kwargs) - - monkeypatch.setattr(WakuNode, "start", fleet_joined_start) + cfg = FleetBootstrapConfig(fleet_rln_state=fleet_rln_state) + WakuNode._pre_start_hook = cfg.prepare_start_kwargs logger.info( - "Fleet bootstrap patch active – NODE1→%s NODE2→%s (additional nodes→%s) dns_discovery_url=%s", + "Fleet bootstrap active – NODE1→%s NODE2→%s (additional nodes→%s) dns_discovery_url=%s", FLEET_N1_MULTIADDR, FLEET_N2_MULTIADDR, FLEET_PRIMARY_MULTIADDR, FLEET_DNS_DISCOVERY_URL, ) yield + WakuNode._pre_start_hook = None @pytest.fixture(scope="function", autouse=True) @@ -285,16 +216,24 @@ def skip_fleet_test_without_rln(request, fleet_rln_state): pytest.skip("Skipping fleet test: RLN keystore not available " "(RLN_CREDENTIALS not set or on-chain registration failed)") -@pytest.fixture(autouse=True) -def patch_fleet_cluster_config(request, monkeypatch): - """When --fleet is active, override every step-class pubsub-topic attribute and - any module-level VALID_PUBSUB_TOPICS reference used in @pytest.mark.waku_test_fleet - tests so that local Docker nodes use cluster-id=1, shards 0-7 – matching the - fleet node configs (config-n1.toml / config-n2.toml). +@pytest.fixture(scope="session", autouse=True) +def configure_fleet_cluster(request, pubsub_cfg): + """Apply fleet cluster configuration to step classes when ``--fleet`` is active. - Without this patch the step classes default to VALID_PUBSUB_TOPICS (cluster-id - 198) which the fleet nodes do not subscribe to, so no cross-fleet data flow - would actually be exercised. + Sets step-class pubsub-topic attributes and overrides + ``StepsLightPush.setup_lightpush_node`` **once** at session start from the + ``pubsub_cfg`` configuration object. + + Replaces the former per-test ``monkeypatch``-based ``patch_fleet_cluster_config`` + fixture. Benefits: + + - **Session-scoped** – class attributes are set once, not re-patched on + every test function. + - **Config-object driven** – topic values come from :class:`PubsubConfig` + rather than scattered inline constants; changing the mapping requires + editing one place. + - **No ``monkeypatch``** – direct class-attribute assignment; restoring + original values is unnecessary because the entire session uses fleet topics. """ if not _fleet_bootstrap_enabled(request.config): yield @@ -307,63 +246,43 @@ def patch_fleet_cluster_config(request, monkeypatch): from src.steps.rln import StepsRLN import tests.relay.test_publish as _relay_publish_mod - # Step-class attributes: each class has test_pubsub_topic set to a cluster-198 - # topic. Override them to the matching cluster-1 shard. - # StepsRelay → VALID_PUBSUB_TOPICS[1] → FLEET_PUBSUB_TOPICS[1] (/waku/2/rs/1/1) - # StepsFilter → VALID_PUBSUB_TOPICS[1] → FLEET_PUBSUB_TOPICS[1] - # second_pubsub_topic is VALID_PUBSUB_TOPICS[2] → FLEET_PUBSUB_TOPICS[2] - # StepsLightPush → VALID_PUBSUB_TOPICS[0] → FLEET_PUBSUB_TOPICS[0] (/waku/2/rs/1/0) - # StepsStore → VALID_PUBSUB_TOPICS[0] → FLEET_PUBSUB_TOPICS[0] - # StepsRLN → PUBSUB_TOPICS_RLN[0] (/waku/2/rs/198/0) → FLEET_PUBSUB_TOPICS[0] (/waku/2/rs/1/0) - monkeypatch.setattr(StepsRelay, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[1]) - monkeypatch.setattr(StepsFilter, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[1]) - monkeypatch.setattr(StepsFilter, "second_pubsub_topic", FLEET_PUBSUB_TOPICS[2]) - monkeypatch.setattr(StepsLightPush, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0]) - monkeypatch.setattr(StepsStore, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0]) - monkeypatch.setattr(StepsRLN, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0]) + # Override step-class topic attributes with fleet cluster-1 topics. + StepsRelay.test_pubsub_topic = pubsub_cfg.relay_test_topic + StepsFilter.test_pubsub_topic = pubsub_cfg.filter_test_topic + StepsFilter.second_pubsub_topic = pubsub_cfg.filter_second_topic + StepsLightPush.test_pubsub_topic = pubsub_cfg.lightpush_test_topic + StepsStore.test_pubsub_topic = pubsub_cfg.store_test_topic + StepsRLN.test_pubsub_topic = pubsub_cfg.rln_test_topic - # tests/relay/test_publish.py::test_publish_on_multiple_pubsub_topics has a - # @pytest.mark.waku_test_fleet marker and iterates over the module-level - # VALID_PUBSUB_TOPICS import directly. Patch that binding so the test uses - # cluster-1 topics instead. - monkeypatch.setattr(_relay_publish_mod, "VALID_PUBSUB_TOPICS", FLEET_PUBSUB_TOPICS) + # tests/relay/test_publish.py::test_publish_on_multiple_pubsub_topics iterates + # over the module-level VALID_PUBSUB_TOPICS import directly; rebind it. + _relay_publish_mod.VALID_PUBSUB_TOPICS = pubsub_cfg.all_topics # ── Light-push client topology fix ────────────────────────────────────────── # In fleet mode the 3rd node started by light-push tests (light_push_node1, # node_index=2) has relay=true but NO RLN membership (only 2 memberships are # registered in fleet_rln_state). Connecting to a fleet peer that enforces - # rln-relay with no credentials causes nwaku to crash, producing the - # ConnectionResetError(54) seen in the ERROR lines. + # rln-relay with no credentials causes nwaku to crash. # # Fix: replace setup_lightpush_node with a fleet-aware version that - # 1. routes lightpush requests to FLEET_N1_MULTIADDR (Amsterdam fleet node, - # confirmed lightpush=true in config-n1.toml) so the fleet relay network - # delivers messages to fleet-connected receiving_node1/node2. + # 1. routes lightpush requests to FLEET_N1_MULTIADDR so the fleet relay + # network delivers messages to fleet-connected receiving nodes. # 2. starts the client with relay=false so no RLN membership is needed. - # 3. does NOT add the client to main_receiving_nodes (relay=false); the - # assertion peers remain receiving_node1 and receiving_node2 only. + # 3. does NOT add the client to main_receiving_nodes; assertion peers + # remain receiving_node1 and receiving_node2 only. def _fleet_setup_lightpush_node(self, image, node_index, **kwargs): from src.node.waku_node import WakuNode node = WakuNode(image, f"lightpush_node{node_index}_{self.test_id}") fleet_kwargs = dict(kwargs) - # Force relay=false so the node runs as a pure lightpush *client* (caller). - # This means: - # - No relay protocol is mounted → no RLN membership required. - # - The node dials FLEET_N1_MULTIADDR via the lightpush protocol to publish - # messages; the fleet relay network then delivers them to the fleet-connected - # receiving_node1 / receiving_node2. - # skip_fleet_peering prevents the bootstrap patch from also injecting a fleet + # Force relay=false – pure lightpush client, no RLN membership required. + # skip_fleet_peering prevents the bootstrap hook from injecting a fleet # staticnode + RLN creds (which would fail for the same reason). - # Pre-set cluster_id and shard so the pubsub topic subscriptions issued by the - # test body match FLEET_PUBSUB_TOPICS. fleet_kwargs["relay"] = "false" fleet_kwargs["skip_fleet_peering"] = True fleet_kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID) fleet_kwargs.setdefault("shard", list(range(8))) node.start(lightpushnode=FLEET_N1_MULTIADDR, **fleet_kwargs) - # relay=false → do NOT add to main_receiving_nodes; assertion peers are - # receiving_node1 and receiving_node2 only. self.add_node_peer(node, self.multiaddr_list) logger.debug( "fleet _fleet_setup_lightpush_node: node %d started with relay=false, " "skip_fleet_peering=True, lightpushnode=%s", @@ -372,15 +291,15 @@ def patch_fleet_cluster_config(request, monkeypatch): ) return node - monkeypatch.setattr(StepsLightPush, "setup_lightpush_node", _fleet_setup_lightpush_node) + StepsLightPush.setup_lightpush_node = _fleet_setup_lightpush_node logger.info( - "Fleet cluster patch active – pubsub topics overridden to cluster-id=%s " - "(shards 0-7, e.g. test_pubsub_topic=%s rln_test_pubsub_topic=%s); " + "Fleet cluster config active – pubsub topics overridden to cluster-id=%s " + "(shards 0-7, e.g. relay_test_topic=%s rln_test_topic=%s); " "StepsLightPush.setup_lightpush_node overridden to use fleet relay %s", FLEET_CLUSTER_ID, - FLEET_PUBSUB_TOPICS[1], - FLEET_PUBSUB_TOPICS[0], + pubsub_cfg.relay_test_topic, + pubsub_cfg.rln_test_topic, FLEET_N1_MULTIADDR, ) yield