mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-05-24 11:19:43 +00:00
fix: refactor monkeypatch
This commit is contained in:
parent
5901a6f848
commit
d6a23f95c2
178
src/node/fleet_waku_node.py
Normal file
178
src/node/fleet_waku_node.py
Normal file
@ -0,0 +1,178 @@
|
||||
"""Fleet bootstrap configuration for WakuNode.
|
||||
|
||||
When fleet bootstrap is active (``--fleet`` CLI flag or ``FLEET_BOOTSTRAP=true``
|
||||
env var), an instance of :class:`FleetBootstrapConfig` is assigned to
|
||||
``WakuNode._pre_start_hook``. ``WakuNode.start()`` calls this hook before
|
||||
processing any start arguments, injecting fleet-specific bootstrap parameters
|
||||
into every local Docker node.
|
||||
|
||||
This module encapsulates all fleet injection logic that was previously an
|
||||
ad-hoc closure inside ``tests/conftest.py``, making it independently testable
|
||||
and reusable.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.data_storage import DS
|
||||
from src.env_vars import (
|
||||
FLEET_N1_MULTIADDR,
|
||||
FLEET_N2_MULTIADDR,
|
||||
FLEET_PRIMARY_MULTIADDR,
|
||||
FLEET_DNS_DISCOVERY_URL,
|
||||
RLN_CREDENTIALS,
|
||||
)
|
||||
from src.test_data import FLEET_CLUSTER_ID
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from src.node.waku_node import WakuNode
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
def _append_fleet_kwarg(kwargs: dict, key: str, value: Any) -> None:
|
||||
"""Append *value* to the kwargs entry *key*, creating a list when needed."""
|
||||
existing = kwargs.get(key)
|
||||
if existing is None:
|
||||
kwargs[key] = value
|
||||
elif isinstance(existing, list):
|
||||
if value not in existing:
|
||||
kwargs[key] = existing + [value]
|
||||
else:
|
||||
if existing != value:
|
||||
kwargs[key] = [existing, value]
|
||||
|
||||
|
||||
@dataclass
|
||||
class FleetBootstrapConfig:
|
||||
"""Holds fleet session state and implements the pre-start kwargs injection.
|
||||
|
||||
One instance is created per pytest session when fleet bootstrap is active.
|
||||
It is registered as ``WakuNode._pre_start_hook`` so that every call to
|
||||
``WakuNode.start()`` automatically receives fleet bootstrap arguments.
|
||||
|
||||
Bootstrap assignment by node creation order (mirrors config-n*.toml files):
|
||||
- NODE1 (1st started) → FLEET_N1_MULTIADDR (node-01.do-ams3)
|
||||
- NODE2 (2nd started) → FLEET_N2_MULTIADDR (node-01.gc-us-central1-a)
|
||||
- additional nodes → FLEET_PRIMARY_MULTIADDR (Amsterdam, same as NODE1)
|
||||
|
||||
Direct bootstrap coupling between NODE1 and NODE2 is suppressed: any
|
||||
``discv5_bootstrap_node`` kwarg pointing to a local node ENR is removed;
|
||||
fleet DNS discovery (``dns_discovery_url``) replaces it.
|
||||
|
||||
Attributes:
|
||||
fleet_rln_state: Dict with keys ``keystore_prefixes`` and
|
||||
``rln_membership_indexes`` populated by the ``fleet_rln_state``
|
||||
session fixture.
|
||||
"""
|
||||
|
||||
fleet_rln_state: dict
|
||||
|
||||
def prepare_start_kwargs(self, node: "WakuNode", kwargs: dict) -> dict:
|
||||
"""Inject fleet bootstrap arguments into *kwargs* before node start.
|
||||
|
||||
Transparent to callers:
|
||||
|
||||
- Existing ``staticnode`` entries are preserved; the fleet multiaddr is
|
||||
*appended* as an additional entry.
|
||||
- ``dns_discovery`` / ``dns_discovery_url`` use ``setdefault`` semantics.
|
||||
- RLN credentials are injected only when not already present and relay
|
||||
is enabled.
|
||||
- ``discv5_bootstrap_node`` pointing to a local ENR is removed so each
|
||||
node bootstraps independently from its assigned fleet peer.
|
||||
|
||||
The ``skip_fleet_peering`` kwarg (not a real nwaku flag) acts as an
|
||||
escape-hatch: when ``True``, only ``discv5_bootstrap_node`` removal
|
||||
happens; all other fleet injection is skipped.
|
||||
|
||||
Returns:
|
||||
The (possibly modified) *kwargs* dict.
|
||||
"""
|
||||
logger.debug("FleetBootstrapConfig.prepare_start_kwargs: injecting waku.test bootstrap args")
|
||||
|
||||
if kwargs.pop("skip_fleet_peering", False):
|
||||
kwargs.pop("discv5_bootstrap_node", None)
|
||||
logger.debug(
|
||||
"FleetBootstrapConfig: skip_fleet_peering=True – " "bypassing fleet bootstrap for this node (no fleet staticnode / shard injected)"
|
||||
)
|
||||
return kwargs
|
||||
|
||||
# Determine which fleet peer to connect to based on node creation order
|
||||
# within the current test (DS.waku_nodes is reset to [] before each test).
|
||||
# The append to DS.waku_nodes happens inside _start_docker/_start_wrapper,
|
||||
# *after* start() returns, so len() here reflects the count of nodes
|
||||
# already fully started.
|
||||
node_index = len(DS.waku_nodes)
|
||||
if node_index == 0:
|
||||
fleet_multiaddr = FLEET_N1_MULTIADDR
|
||||
logger.debug(
|
||||
"FleetBootstrapConfig: NODE1 – bootstrapping from config-n1.toml (%s)",
|
||||
fleet_multiaddr,
|
||||
)
|
||||
elif node_index == 1:
|
||||
fleet_multiaddr = FLEET_N2_MULTIADDR
|
||||
logger.debug(
|
||||
"FleetBootstrapConfig: NODE2 – bootstrapping from config-n2.toml (%s)",
|
||||
fleet_multiaddr,
|
||||
)
|
||||
else:
|
||||
fleet_multiaddr = FLEET_PRIMARY_MULTIADDR
|
||||
logger.debug(
|
||||
"FleetBootstrapConfig: additional node %d – bootstrapping from primary (%s)",
|
||||
node_index,
|
||||
fleet_multiaddr,
|
||||
)
|
||||
|
||||
_append_fleet_kwarg(kwargs, "staticnode", fleet_multiaddr)
|
||||
kwargs.setdefault("dns_discovery", "true")
|
||||
kwargs.setdefault("dns_discovery_url", FLEET_DNS_DISCOVERY_URL)
|
||||
# Align local node cluster and shards with the fleet node configs
|
||||
# (config-n1.toml / config-n2.toml both use cluster-id=1, shards 0-7).
|
||||
kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID)
|
||||
kwargs.setdefault("shard", list(range(8)))
|
||||
|
||||
# Inject session-level RLN credentials into nodes that don't already
|
||||
# carry explicit RLN args. Uses the same node_index as the fleet
|
||||
# multiaddr assignment so NODE1 gets creds-id=1 and NODE2 gets creds-id=2.
|
||||
# Only inject into nodes with relay enabled – filter/lightpush/store service
|
||||
# nodes run without relay and nwaku rejects rln-relay when WakuRelay is not mounted.
|
||||
rln_prefixes = self.fleet_rln_state.get("keystore_prefixes", [])
|
||||
if rln_prefixes and kwargs.get("rln_creds_source") is None:
|
||||
if str(kwargs.get("relay", "")).lower() == "true":
|
||||
if node_index < len(rln_prefixes):
|
||||
kwargs["rln_creds_source"] = RLN_CREDENTIALS
|
||||
kwargs["rln_creds_id"] = str(node_index + 1)
|
||||
kwargs["rln_keystore_prefix"] = rln_prefixes[node_index]
|
||||
kwargs["rln_relay_membership_index"] = str(self.fleet_rln_state["rln_membership_indexes"][node_index])
|
||||
kwargs.setdefault("rln_relay_user_message_limit", "300")
|
||||
logger.debug(
|
||||
"FleetBootstrapConfig: injected session RLN for node %d – " "prefix=%s index=%s user_message_limit=300",
|
||||
node_index,
|
||||
kwargs["rln_keystore_prefix"],
|
||||
kwargs["rln_relay_membership_index"],
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
"FleetBootstrapConfig: skipping RLN injection for node %d – " "relay not enabled (relay=%r)",
|
||||
node_index,
|
||||
kwargs.get("relay"),
|
||||
)
|
||||
|
||||
# Strip any local-node discv5 bootstrap ENR so that each node bootstraps
|
||||
# independently from its assigned fleet peer rather than from another
|
||||
# local container. The fleet DNS tree (dns_discovery_url) replaces it.
|
||||
if "discv5_bootstrap_node" in kwargs:
|
||||
logger.debug(
|
||||
"FleetBootstrapConfig: dropping local discv5_bootstrap_node=%s " "(fleet DNS discovery replaces it)",
|
||||
kwargs["discv5_bootstrap_node"],
|
||||
)
|
||||
del kwargs["discv5_bootstrap_node"]
|
||||
|
||||
logger.debug(
|
||||
"FleetBootstrapConfig: staticnode=%s dns_discovery_url=%s",
|
||||
kwargs.get("staticnode"),
|
||||
kwargs.get("dns_discovery_url"),
|
||||
)
|
||||
return kwargs
|
||||
@ -96,6 +96,12 @@ def resolve_sharding_flags(kwargs):
|
||||
|
||||
|
||||
class WakuNode:
|
||||
# Optional pre-start hook: when set to a callable, it is invoked at the
|
||||
# beginning of every start() call with (self, kwargs_dict) and must return
|
||||
# the (possibly modified) kwargs dict. Set by tests/conftest.py when
|
||||
# fleet bootstrap is active; None by default so all normal tests are unaffected.
|
||||
_pre_start_hook = None
|
||||
|
||||
def __init__(self, docker_image, docker_log_prefix=""):
|
||||
self._image_name = docker_image
|
||||
self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log")
|
||||
@ -113,6 +119,8 @@ class WakuNode:
|
||||
|
||||
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
|
||||
def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs):
|
||||
if WakuNode._pre_start_hook is not None:
|
||||
kwargs = WakuNode._pre_start_hook(self, kwargs)
|
||||
logger.debug("Starting Node...")
|
||||
default_args, remove_container = self._prepare_start_context(**kwargs)
|
||||
|
||||
|
||||
40
src/test_config.py
Normal file
40
src/test_config.py
Normal file
@ -0,0 +1,40 @@
|
||||
"""Test session configuration objects.
|
||||
|
||||
These dataclasses carry configuration that varies between fleet and
|
||||
non-fleet test runs. Fixtures in ``tests/conftest.py`` build and
|
||||
provide the appropriate instance; step classes and tests consume them
|
||||
via fixture injection rather than through hardcoded class attributes or
|
||||
``monkeypatch``.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import List
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PubsubConfig:
|
||||
"""Named pubsub-topic slots for a test session.
|
||||
|
||||
A *default* instance uses ``VALID_PUBSUB_TOPICS`` / ``PUBSUB_TOPICS_RLN``
|
||||
(cluster-id 198). A *fleet* instance uses ``FLEET_PUBSUB_TOPICS``
|
||||
(cluster-id 1, shards 0-7).
|
||||
|
||||
Attributes:
|
||||
relay_test_topic: Primary topic used by ``StepsRelay`` tests.
|
||||
filter_test_topic: Primary topic used by ``StepsFilter`` tests.
|
||||
filter_second_topic: Secondary topic used by multi-topic filter tests.
|
||||
lightpush_test_topic: Topic used by ``StepsLightPush`` tests.
|
||||
store_test_topic: Topic used by ``StepsStore`` tests.
|
||||
rln_test_topic: Topic used by ``StepsRLN`` tests.
|
||||
all_topics: Full ordered topic list (used where a module-level
|
||||
``VALID_PUBSUB_TOPICS`` reference is iterated).
|
||||
"""
|
||||
|
||||
relay_test_topic: str
|
||||
filter_test_topic: str
|
||||
filter_second_topic: str
|
||||
lightpush_test_topic: str
|
||||
store_test_topic: str
|
||||
rln_test_topic: str
|
||||
all_topics: List[str]
|
||||
@ -4,7 +4,7 @@ Root pytest configuration for the logos-delivery interop test suite.
|
||||
|
||||
Fleet bootstrap – hybrid local+fleet
|
||||
--------------------------------------------------
|
||||
Every local Docker node spawned by a test is patched at start() time so
|
||||
Every local Docker node spawned by a test is configured at start() time so
|
||||
that it connects to a live waku.test fleet peer as a static peer, and
|
||||
discovers all remaining fleet peers through the published ENR DNS tree.
|
||||
|
||||
@ -47,7 +47,8 @@ import src.env_vars as env_vars
|
||||
from src.env_vars import FLEET_PRIMARY_MULTIADDR, FLEET_DNS_DISCOVERY_URL, FLEET_N1_MULTIADDR, FLEET_N2_MULTIADDR
|
||||
from src.data_storage import DS
|
||||
from src.postgres_setup import start_postgres, stop_postgres
|
||||
from src.test_data import FLEET_CLUSTER_ID, FLEET_PUBSUB_TOPICS, PUBSUB_TOPICS_RLN
|
||||
from src.test_data import FLEET_CLUSTER_ID, FLEET_PUBSUB_TOPICS, PUBSUB_TOPICS_RLN, VALID_PUBSUB_TOPICS
|
||||
from src.test_config import PubsubConfig
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
@ -78,19 +79,6 @@ def _fleet_bootstrap_enabled(config) -> bool:
|
||||
return os.getenv("FLEET_BOOTSTRAP", "false").lower() == "true"
|
||||
|
||||
|
||||
def _append_fleet_kwarg(kwargs: dict, key: str, value: str) -> None:
|
||||
"""Add *value* to the kwargs entry *key*, creating a list when needed."""
|
||||
existing = kwargs.get(key)
|
||||
if existing is None:
|
||||
kwargs[key] = value
|
||||
elif isinstance(existing, list):
|
||||
if value not in existing:
|
||||
kwargs[key] = existing + [value]
|
||||
else:
|
||||
if existing != value:
|
||||
kwargs[key] = [existing, value]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def fleet_rln_state(request):
|
||||
"""Register 2 RLN memberships once per test session when ``--fleet`` is active.
|
||||
@ -142,129 +130,72 @@ def fleet_rln_state(request):
|
||||
yield state
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_waku_node_start(request, monkeypatch, fleet_rln_state):
|
||||
"""Monkey-patch WakuNode.start() to bootstrap every local node from the
|
||||
waku.test fleet before the test body runs.
|
||||
@pytest.fixture(scope="session")
|
||||
def pubsub_cfg(request) -> PubsubConfig:
|
||||
"""Return the pubsub-topic configuration for the current session.
|
||||
|
||||
Active only when ``--fleet`` is passed to pytest or ``FLEET_BOOTSTRAP=true``
|
||||
is set. By default the patch is a no-op so all existing tests are unaffected.
|
||||
Fleet mode (``--fleet`` / ``FLEET_BOOTSTRAP=true``) → cluster-id 1, shards 0-7.
|
||||
Default mode → cluster-id 198 (``VALID_PUBSUB_TOPICS`` / ``PUBSUB_TOPICS_RLN``).
|
||||
"""
|
||||
if _fleet_bootstrap_enabled(request.config):
|
||||
return PubsubConfig(
|
||||
relay_test_topic=FLEET_PUBSUB_TOPICS[1],
|
||||
filter_test_topic=FLEET_PUBSUB_TOPICS[1],
|
||||
filter_second_topic=FLEET_PUBSUB_TOPICS[2],
|
||||
lightpush_test_topic=FLEET_PUBSUB_TOPICS[0],
|
||||
store_test_topic=FLEET_PUBSUB_TOPICS[0],
|
||||
rln_test_topic=FLEET_PUBSUB_TOPICS[0],
|
||||
all_topics=FLEET_PUBSUB_TOPICS,
|
||||
)
|
||||
return PubsubConfig(
|
||||
relay_test_topic=VALID_PUBSUB_TOPICS[1],
|
||||
filter_test_topic=VALID_PUBSUB_TOPICS[1],
|
||||
filter_second_topic=VALID_PUBSUB_TOPICS[2],
|
||||
lightpush_test_topic=VALID_PUBSUB_TOPICS[0],
|
||||
store_test_topic=VALID_PUBSUB_TOPICS[0],
|
||||
rln_test_topic=PUBSUB_TOPICS_RLN[0],
|
||||
all_topics=VALID_PUBSUB_TOPICS,
|
||||
)
|
||||
|
||||
The patch is transparent to existing tests:
|
||||
- Any ``staticnode`` kwarg already provided is preserved; the fleet
|
||||
multiaddr is *appended* as an additional entry.
|
||||
- ``dns_discovery`` and ``dns_discovery_url`` are set only if the caller
|
||||
has not already supplied them (``setdefault`` semantics).
|
||||
- When ``fleet_rln_state`` contains registered credentials AND the caller
|
||||
has not provided explicit RLN args, RLN credentials are injected
|
||||
automatically so that the node can participate in the fleet's
|
||||
RLN-protected relay.
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def configure_fleet_bootstrap(request, fleet_rln_state):
|
||||
"""Register ``FleetBootstrapConfig`` as ``WakuNode._pre_start_hook`` for the session.
|
||||
|
||||
Active only when ``--fleet`` is passed or ``FLEET_BOOTSTRAP=true`` is set.
|
||||
The hook is cleared at session teardown so it does not leak across test
|
||||
collection runs.
|
||||
|
||||
Replaces the former per-test ``monkeypatch``-based ``patch_waku_node_start``
|
||||
fixture. Benefits of this approach:
|
||||
|
||||
- **Session-scoped** – the hook is registered once, not reinstalled before
|
||||
every test function.
|
||||
- **Encapsulated** – all fleet injection logic lives in
|
||||
:class:`src.node.fleet_waku_node.FleetBootstrapConfig`, making it
|
||||
independently testable.
|
||||
- **No closure over ``original_start``** – ``WakuNode.start`` is not
|
||||
replaced; the hook is called from within it via a class variable.
|
||||
"""
|
||||
if not _fleet_bootstrap_enabled(request.config):
|
||||
logger.info("Fleet bootstrap inactive – pass --fleet (or set FLEET_BOOTSTRAP=true) " "to connect local nodes to the waku.test fleet")
|
||||
yield
|
||||
return
|
||||
|
||||
from src.node.fleet_waku_node import FleetBootstrapConfig
|
||||
from src.node.waku_node import WakuNode
|
||||
from src.env_vars import RLN_CREDENTIALS
|
||||
|
||||
original_start = WakuNode.start
|
||||
|
||||
def fleet_joined_start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs):
|
||||
logger.debug("fleet_joined_start: injecting waku.test bootstrap args into WakuNode.start()")
|
||||
|
||||
# Escape-hatch: callers that set skip_fleet_peering=True (e.g. lightpush client
|
||||
# nodes that have no RLN membership) bypass all fleet bootstrap injection so
|
||||
# they do NOT join the fleet relay mesh. discv5_bootstrap_node is still stripped
|
||||
# to prevent accidental local-node coupling. All other kwargs are forwarded as-is.
|
||||
if kwargs.pop("skip_fleet_peering", False):
|
||||
if "discv5_bootstrap_node" in kwargs:
|
||||
del kwargs["discv5_bootstrap_node"]
|
||||
logger.debug(
|
||||
"fleet_joined_start: skip_fleet_peering=True – bypassing fleet bootstrap " "for this node (no fleet staticnode / shard injected)"
|
||||
)
|
||||
return original_start(self, wait_for_node_sec=wait_for_node_sec, use_wrapper=use_wrapper, **kwargs)
|
||||
|
||||
# Determine which fleet peer to connect to based on node creation order
|
||||
# within the current test (DS.waku_nodes is reset to [] before each test).
|
||||
# The append to DS.waku_nodes happens inside _start_docker/_start_wrapper,
|
||||
# *after* this function returns, so len() here reflects the count of nodes
|
||||
# already fully started.
|
||||
node_index = len(DS.waku_nodes)
|
||||
if node_index == 0:
|
||||
# First node started → NODE1 → mirrors config-n1.toml
|
||||
fleet_multiaddr = FLEET_N1_MULTIADDR # node-01.do-ams3.waku.test.status.im
|
||||
logger.debug("fleet_joined_start: NODE1 – bootstrapping from config-n1.toml (%s)", fleet_multiaddr)
|
||||
elif node_index == 1:
|
||||
# Second node started → NODE2 → mirrors config-n2.toml
|
||||
fleet_multiaddr = FLEET_N2_MULTIADDR # node-01.gc-us-central1-a.waku.test.status.im
|
||||
logger.debug("fleet_joined_start: NODE2 – bootstrapping from config-n2.toml (%s)", fleet_multiaddr)
|
||||
else:
|
||||
# Additional nodes fall back to the primary (Amsterdam) fleet peer
|
||||
fleet_multiaddr = FLEET_PRIMARY_MULTIADDR
|
||||
logger.debug("fleet_joined_start: additional node %d – bootstrapping from primary (%s)", node_index, fleet_multiaddr)
|
||||
|
||||
_append_fleet_kwarg(kwargs, "staticnode", fleet_multiaddr)
|
||||
kwargs.setdefault("dns_discovery", "true")
|
||||
kwargs.setdefault("dns_discovery_url", FLEET_DNS_DISCOVERY_URL)
|
||||
# Align local node cluster and shards with the fleet node configs
|
||||
# (config-n1.toml / config-n2.toml both use cluster-id=1, shards 0-7).
|
||||
kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID)
|
||||
kwargs.setdefault("shard", list(range(8)))
|
||||
|
||||
# Inject session-level RLN credentials into nodes that don't already
|
||||
# carry explicit RLN args (i.e. the regular waku_test_fleet tests that
|
||||
# are not themselves RLN tests). Uses the same node_index as the fleet
|
||||
# multiaddr assignment so NODE1 gets creds-id=1 and NODE2 gets creds-id=2.
|
||||
# Only inject into nodes with relay enabled – filter/lightpush/store service
|
||||
# nodes run without relay and nwaku rejects rln-relay when WakuRelay is not mounted.
|
||||
if fleet_rln_state["keystore_prefixes"] and kwargs.get("rln_creds_source") is None:
|
||||
if str(kwargs.get("relay", "")).lower() == "true":
|
||||
if node_index < len(fleet_rln_state["keystore_prefixes"]):
|
||||
kwargs["rln_creds_source"] = RLN_CREDENTIALS
|
||||
kwargs["rln_creds_id"] = str(node_index + 1)
|
||||
kwargs["rln_keystore_prefix"] = fleet_rln_state["keystore_prefixes"][node_index]
|
||||
kwargs["rln_relay_membership_index"] = str(fleet_rln_state["rln_membership_indexes"][node_index])
|
||||
kwargs.setdefault("rln_relay_user_message_limit", "300")
|
||||
logger.debug(
|
||||
"fleet_joined_start: injected session RLN for node %d – prefix=%s index=%s user_message_limit=300",
|
||||
node_index,
|
||||
kwargs["rln_keystore_prefix"],
|
||||
kwargs["rln_relay_membership_index"],
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
"fleet_joined_start: skipping RLN injection for node %d – relay not enabled (relay=%r)",
|
||||
node_index,
|
||||
kwargs.get("relay"),
|
||||
)
|
||||
|
||||
# Strip any local-node discv5 bootstrap ENR so that each node bootstraps
|
||||
# independently from its assigned fleet peer rather than from another local
|
||||
# container. The fleet DNS tree (dns_discovery_url) replaces it.
|
||||
if "discv5_bootstrap_node" in kwargs:
|
||||
logger.debug(
|
||||
"fleet_joined_start: dropping local discv5_bootstrap_node=%s " "(fleet DNS discovery replaces it)",
|
||||
kwargs["discv5_bootstrap_node"],
|
||||
)
|
||||
del kwargs["discv5_bootstrap_node"]
|
||||
|
||||
logger.debug(
|
||||
"fleet_joined_start: staticnode=%s dns_discovery_url=%s",
|
||||
kwargs.get("staticnode"),
|
||||
kwargs.get("dns_discovery_url"),
|
||||
)
|
||||
return original_start(self, wait_for_node_sec=wait_for_node_sec, use_wrapper=use_wrapper, **kwargs)
|
||||
|
||||
monkeypatch.setattr(WakuNode, "start", fleet_joined_start)
|
||||
cfg = FleetBootstrapConfig(fleet_rln_state=fleet_rln_state)
|
||||
WakuNode._pre_start_hook = cfg.prepare_start_kwargs
|
||||
logger.info(
|
||||
"Fleet bootstrap patch active – NODE1→%s NODE2→%s (additional nodes→%s) dns_discovery_url=%s",
|
||||
"Fleet bootstrap active – NODE1→%s NODE2→%s (additional nodes→%s) dns_discovery_url=%s",
|
||||
FLEET_N1_MULTIADDR,
|
||||
FLEET_N2_MULTIADDR,
|
||||
FLEET_PRIMARY_MULTIADDR,
|
||||
FLEET_DNS_DISCOVERY_URL,
|
||||
)
|
||||
yield
|
||||
WakuNode._pre_start_hook = None
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
@ -285,16 +216,24 @@ def skip_fleet_test_without_rln(request, fleet_rln_state):
|
||||
pytest.skip("Skipping fleet test: RLN keystore not available " "(RLN_CREDENTIALS not set or on-chain registration failed)")
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_fleet_cluster_config(request, monkeypatch):
|
||||
"""When --fleet is active, override every step-class pubsub-topic attribute and
|
||||
any module-level VALID_PUBSUB_TOPICS reference used in @pytest.mark.waku_test_fleet
|
||||
tests so that local Docker nodes use cluster-id=1, shards 0-7 – matching the
|
||||
fleet node configs (config-n1.toml / config-n2.toml).
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def configure_fleet_cluster(request, pubsub_cfg):
|
||||
"""Apply fleet cluster configuration to step classes when ``--fleet`` is active.
|
||||
|
||||
Without this patch the step classes default to VALID_PUBSUB_TOPICS (cluster-id
|
||||
198) which the fleet nodes do not subscribe to, so no cross-fleet data flow
|
||||
would actually be exercised.
|
||||
Sets step-class pubsub-topic attributes and overrides
|
||||
``StepsLightPush.setup_lightpush_node`` **once** at session start from the
|
||||
``pubsub_cfg`` configuration object.
|
||||
|
||||
Replaces the former per-test ``monkeypatch``-based ``patch_fleet_cluster_config``
|
||||
fixture. Benefits:
|
||||
|
||||
- **Session-scoped** – class attributes are set once, not re-patched on
|
||||
every test function.
|
||||
- **Config-object driven** – topic values come from :class:`PubsubConfig`
|
||||
rather than scattered inline constants; changing the mapping requires
|
||||
editing one place.
|
||||
- **No ``monkeypatch``** – direct class-attribute assignment; restoring
|
||||
original values is unnecessary because the entire session uses fleet topics.
|
||||
"""
|
||||
if not _fleet_bootstrap_enabled(request.config):
|
||||
yield
|
||||
@ -307,63 +246,43 @@ def patch_fleet_cluster_config(request, monkeypatch):
|
||||
from src.steps.rln import StepsRLN
|
||||
import tests.relay.test_publish as _relay_publish_mod
|
||||
|
||||
# Step-class attributes: each class has test_pubsub_topic set to a cluster-198
|
||||
# topic. Override them to the matching cluster-1 shard.
|
||||
# StepsRelay → VALID_PUBSUB_TOPICS[1] → FLEET_PUBSUB_TOPICS[1] (/waku/2/rs/1/1)
|
||||
# StepsFilter → VALID_PUBSUB_TOPICS[1] → FLEET_PUBSUB_TOPICS[1]
|
||||
# second_pubsub_topic is VALID_PUBSUB_TOPICS[2] → FLEET_PUBSUB_TOPICS[2]
|
||||
# StepsLightPush → VALID_PUBSUB_TOPICS[0] → FLEET_PUBSUB_TOPICS[0] (/waku/2/rs/1/0)
|
||||
# StepsStore → VALID_PUBSUB_TOPICS[0] → FLEET_PUBSUB_TOPICS[0]
|
||||
# StepsRLN → PUBSUB_TOPICS_RLN[0] (/waku/2/rs/198/0) → FLEET_PUBSUB_TOPICS[0] (/waku/2/rs/1/0)
|
||||
monkeypatch.setattr(StepsRelay, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[1])
|
||||
monkeypatch.setattr(StepsFilter, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[1])
|
||||
monkeypatch.setattr(StepsFilter, "second_pubsub_topic", FLEET_PUBSUB_TOPICS[2])
|
||||
monkeypatch.setattr(StepsLightPush, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0])
|
||||
monkeypatch.setattr(StepsStore, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0])
|
||||
monkeypatch.setattr(StepsRLN, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0])
|
||||
# Override step-class topic attributes with fleet cluster-1 topics.
|
||||
StepsRelay.test_pubsub_topic = pubsub_cfg.relay_test_topic
|
||||
StepsFilter.test_pubsub_topic = pubsub_cfg.filter_test_topic
|
||||
StepsFilter.second_pubsub_topic = pubsub_cfg.filter_second_topic
|
||||
StepsLightPush.test_pubsub_topic = pubsub_cfg.lightpush_test_topic
|
||||
StepsStore.test_pubsub_topic = pubsub_cfg.store_test_topic
|
||||
StepsRLN.test_pubsub_topic = pubsub_cfg.rln_test_topic
|
||||
|
||||
# tests/relay/test_publish.py::test_publish_on_multiple_pubsub_topics has a
|
||||
# @pytest.mark.waku_test_fleet marker and iterates over the module-level
|
||||
# VALID_PUBSUB_TOPICS import directly. Patch that binding so the test uses
|
||||
# cluster-1 topics instead.
|
||||
monkeypatch.setattr(_relay_publish_mod, "VALID_PUBSUB_TOPICS", FLEET_PUBSUB_TOPICS)
|
||||
# tests/relay/test_publish.py::test_publish_on_multiple_pubsub_topics iterates
|
||||
# over the module-level VALID_PUBSUB_TOPICS import directly; rebind it.
|
||||
_relay_publish_mod.VALID_PUBSUB_TOPICS = pubsub_cfg.all_topics
|
||||
|
||||
# ── Light-push client topology fix ──────────────────────────────────────────
|
||||
# In fleet mode the 3rd node started by light-push tests (light_push_node1,
|
||||
# node_index=2) has relay=true but NO RLN membership (only 2 memberships are
|
||||
# registered in fleet_rln_state). Connecting to a fleet peer that enforces
|
||||
# rln-relay with no credentials causes nwaku to crash, producing the
|
||||
# ConnectionResetError(54) seen in the ERROR lines.
|
||||
# rln-relay with no credentials causes nwaku to crash.
|
||||
#
|
||||
# Fix: replace setup_lightpush_node with a fleet-aware version that
|
||||
# 1. routes lightpush requests to FLEET_N1_MULTIADDR (Amsterdam fleet node,
|
||||
# confirmed lightpush=true in config-n1.toml) so the fleet relay network
|
||||
# delivers messages to fleet-connected receiving_node1/node2.
|
||||
# 1. routes lightpush requests to FLEET_N1_MULTIADDR so the fleet relay
|
||||
# network delivers messages to fleet-connected receiving nodes.
|
||||
# 2. starts the client with relay=false so no RLN membership is needed.
|
||||
# 3. does NOT add the client to main_receiving_nodes (relay=false); the
|
||||
# assertion peers remain receiving_node1 and receiving_node2 only.
|
||||
# 3. does NOT add the client to main_receiving_nodes; assertion peers
|
||||
# remain receiving_node1 and receiving_node2 only.
|
||||
def _fleet_setup_lightpush_node(self, image, node_index, **kwargs):
|
||||
from src.node.waku_node import WakuNode
|
||||
|
||||
node = WakuNode(image, f"lightpush_node{node_index}_{self.test_id}")
|
||||
fleet_kwargs = dict(kwargs)
|
||||
# Force relay=false so the node runs as a pure lightpush *client* (caller).
|
||||
# This means:
|
||||
# - No relay protocol is mounted → no RLN membership required.
|
||||
# - The node dials FLEET_N1_MULTIADDR via the lightpush protocol to publish
|
||||
# messages; the fleet relay network then delivers them to the fleet-connected
|
||||
# receiving_node1 / receiving_node2.
|
||||
# skip_fleet_peering prevents the bootstrap patch from also injecting a fleet
|
||||
# Force relay=false – pure lightpush client, no RLN membership required.
|
||||
# skip_fleet_peering prevents the bootstrap hook from injecting a fleet
|
||||
# staticnode + RLN creds (which would fail for the same reason).
|
||||
# Pre-set cluster_id and shard so the pubsub topic subscriptions issued by the
|
||||
# test body match FLEET_PUBSUB_TOPICS.
|
||||
fleet_kwargs["relay"] = "false"
|
||||
fleet_kwargs["skip_fleet_peering"] = True
|
||||
fleet_kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID)
|
||||
fleet_kwargs.setdefault("shard", list(range(8)))
|
||||
node.start(lightpushnode=FLEET_N1_MULTIADDR, **fleet_kwargs)
|
||||
# relay=false → do NOT add to main_receiving_nodes; assertion peers are
|
||||
# receiving_node1 and receiving_node2 only.
|
||||
self.add_node_peer(node, self.multiaddr_list)
|
||||
logger.debug(
|
||||
"fleet _fleet_setup_lightpush_node: node %d started with relay=false, " "skip_fleet_peering=True, lightpushnode=%s",
|
||||
@ -372,15 +291,15 @@ def patch_fleet_cluster_config(request, monkeypatch):
|
||||
)
|
||||
return node
|
||||
|
||||
monkeypatch.setattr(StepsLightPush, "setup_lightpush_node", _fleet_setup_lightpush_node)
|
||||
StepsLightPush.setup_lightpush_node = _fleet_setup_lightpush_node
|
||||
|
||||
logger.info(
|
||||
"Fleet cluster patch active – pubsub topics overridden to cluster-id=%s "
|
||||
"(shards 0-7, e.g. test_pubsub_topic=%s rln_test_pubsub_topic=%s); "
|
||||
"Fleet cluster config active – pubsub topics overridden to cluster-id=%s "
|
||||
"(shards 0-7, e.g. relay_test_topic=%s rln_test_topic=%s); "
|
||||
"StepsLightPush.setup_lightpush_node overridden to use fleet relay %s",
|
||||
FLEET_CLUSTER_ID,
|
||||
FLEET_PUBSUB_TOPICS[1],
|
||||
FLEET_PUBSUB_TOPICS[0],
|
||||
pubsub_cfg.relay_test_topic,
|
||||
pubsub_cfg.rln_test_topic,
|
||||
FLEET_N1_MULTIADDR,
|
||||
)
|
||||
yield
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user