diff --git a/src/steps/filter.py b/src/steps/filter.py index 8a5d90c41..39429770b 100644 --- a/src/steps/filter.py +++ b/src/steps/filter.py @@ -95,9 +95,15 @@ class StepsFilter(StepsCommon): for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 2}:{peer.image} can find the published message") get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topic=pubsub_topic, node=peer) - assert get_messages_response, f"Peer NODE_{index + 2}:{peer.image} couldn't find any messages" - assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" - waku_message = WakuMessage(get_messages_response) + # get_filter_messages already scopes to the requested content topic; the + # additional filter guards against any residual or fleet messages that may + # have been queued under the same topic before this call. + test_messages = [m for m in get_messages_response if m.get("contentTopic") == message["contentTopic"]] + assert test_messages, f"Peer NODE_{index + 2}:{peer.image} couldn't find any messages" + assert len(test_messages) == 1, ( + f"Expected 1 test message but got {len(test_messages)} " f"(total messages returned: {len(get_messages_response)})" + ) + waku_message = WakuMessage(test_messages) waku_message.assert_received_message(message) @allure.step diff --git a/src/steps/light_push.py b/src/steps/light_push.py index fd7da42d0..94d2410b0 100644 --- a/src/steps/light_push.py +++ b/src/steps/light_push.py @@ -124,9 +124,15 @@ class StepsLightPush(StepsCommon): for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the lightpushed message") get_messages_response = peer.get_relay_messages(pubsub_topic) - assert get_messages_response, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" - assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" - waku_message = WakuMessage(get_messages_response) + # In fleet mode the relay cache may contain background messages from other + # fleet participants. Filter to only messages whose contentTopic matches + # the test message so that fleet noise does not break the count assertion. + test_messages = [m for m in get_messages_response if m.get("contentTopic") == payload["message"]["contentTopic"]] + assert test_messages, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" + assert len(test_messages) == 1, ( + f"Expected 1 test message but got {len(test_messages)} " f"(total messages in cache: {len(get_messages_response)})" + ) + waku_message = WakuMessage(test_messages) waku_message.assert_received_message(payload["message"]) @allure.step diff --git a/src/steps/relay.py b/src/steps/relay.py index 840461bbe..b801f848b 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -122,9 +122,15 @@ class StepsRelay(StepsCommon): for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") get_messages_response = peer.get_relay_messages(pubsub_topic) - assert get_messages_response, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" - assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" - waku_message = WakuMessage(get_messages_response) + # In fleet mode the relay cache may contain background messages from other + # fleet participants. Filter to only the message whose contentTopic matches + # what the test sent so that fleet noise does not break the count assertion. + test_messages = [m for m in get_messages_response if m.get("contentTopic") == message["contentTopic"]] + assert test_messages, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" + assert len(test_messages) == 1, ( + f"Expected 1 test message but got {len(test_messages)} " f"(total messages in cache: {len(get_messages_response)})" + ) + waku_message = WakuMessage(test_messages) waku_message.assert_received_message(message) @allure.step diff --git a/src/test_data.py b/src/test_data.py index 659dd39ae..3bb3f5476 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -105,6 +105,11 @@ VALID_PUBSUB_TOPICS = [ f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1000", ] +# Fleet cluster configuration – mirrors config-n1.toml / config-n2.toml +# Both fleet nodes run cluster-id=1 with shards 0-7. +FLEET_CLUSTER_ID = "1" +FLEET_PUBSUB_TOPICS = [f"/waku/2/rs/{FLEET_CLUSTER_ID}/{i}" for i in range(8)] + PUBSUB_TOPICS_STORE = [ f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0", f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1", diff --git a/tests/conftest.py b/tests/conftest.py index 5ae323d8e..1b1099886 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,17 +34,20 @@ Activation (opt-in, disabled by default): """ import inspect import glob +import random +import string from src.libs.custom_logger import get_custom_logger import os import pytest from datetime import datetime from time import time from uuid import uuid4 -from src.libs.common import attach_allure_file +from src.libs.common import attach_allure_file, gen_step_id import src.env_vars as env_vars from src.env_vars import FLEET_PRIMARY_MULTIADDR, FLEET_DNS_DISCOVERY_URL, FLEET_N1_MULTIADDR, FLEET_N2_MULTIADDR from src.data_storage import DS from src.postgres_setup import start_postgres, stop_postgres +from src.test_data import FLEET_CLUSTER_ID, FLEET_PUBSUB_TOPICS, PUBSUB_TOPICS_RLN logger = get_custom_logger(__name__) @@ -88,8 +91,59 @@ def _append_fleet_kwarg(kwargs: dict, key: str, value: str) -> None: kwargs[key] = [existing, value] +@pytest.fixture(scope="session") +def fleet_rln_state(request): + """Register 2 RLN memberships once per test session when ``--fleet`` is active. + + The on-disk keystore directories created here are reused by every test in + the session so that the expensive blockchain registration only happens once. + + Yields a dict with keys: + ``keystore_prefixes`` – list[str] of random 4-char directory prefixes + ``rln_membership_indexes`` – list[int | None] returned by register_rln() + + An empty dict (both lists empty) is yielded when fleet bootstrap is not + active or when ``RLN_CREDENTIALS`` is not set. + """ + if not _fleet_bootstrap_enabled(request.config): + yield {"keystore_prefixes": [], "rln_membership_indexes": []} + return + + from src.node.waku_node import WakuNode + from src.env_vars import RLN_CREDENTIALS, DEFAULT_NWAKU + + if not RLN_CREDENTIALS: + logger.info("Fleet RLN: RLN_CREDENTIALS not set – nodes will start without RLN") + yield {"keystore_prefixes": [], "rln_membership_indexes": []} + return + + state: dict = {"keystore_prefixes": [], "rln_membership_indexes": []} + try: + for i in range(2): + prefix = "".join(random.choices(string.ascii_lowercase, k=4)) + node = WakuNode(DEFAULT_NWAKU, f"rln_reg_{i + 1}_{gen_step_id()}") + membership_index = node.register_rln( + rln_keystore_prefix=prefix, + rln_creds_source=RLN_CREDENTIALS, + rln_creds_id=str(i + 1), + ) + state["keystore_prefixes"].append(prefix) + state["rln_membership_indexes"].append(membership_index) + logger.info( + "Fleet RLN: registered %d memberships – indexes=%s prefixes=%s", + len(state["rln_membership_indexes"]), + state["rln_membership_indexes"], + state["keystore_prefixes"], + ) + except Exception as ex: + logger.warning("Fleet RLN: registration failed – nodes will start without RLN: %s", ex) + state = {"keystore_prefixes": [], "rln_membership_indexes": []} + + yield state + + @pytest.fixture(autouse=True) -def patch_waku_node_start(request, monkeypatch): +def patch_waku_node_start(request, monkeypatch, fleet_rln_state): """Monkey-patch WakuNode.start() to bootstrap every local node from the waku.test fleet before the test body runs. @@ -101,6 +155,10 @@ def patch_waku_node_start(request, monkeypatch): multiaddr is *appended* as an additional entry. - ``dns_discovery`` and ``dns_discovery_url`` are set only if the caller has not already supplied them (``setdefault`` semantics). + - When ``fleet_rln_state`` contains registered credentials AND the caller + has not provided explicit RLN args, RLN credentials are injected + automatically so that the node can participate in the fleet's + RLN-protected relay. """ if not _fleet_bootstrap_enabled(request.config): logger.info("Fleet bootstrap inactive – pass --fleet (or set FLEET_BOOTSTRAP=true) " "to connect local nodes to the waku.test fleet") @@ -108,12 +166,25 @@ def patch_waku_node_start(request, monkeypatch): return from src.node.waku_node import WakuNode + from src.env_vars import RLN_CREDENTIALS original_start = WakuNode.start def fleet_joined_start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs): logger.debug("fleet_joined_start: injecting waku.test bootstrap args into WakuNode.start()") + # Escape-hatch: callers that set skip_fleet_peering=True (e.g. lightpush client + # nodes that have no RLN membership) bypass all fleet bootstrap injection so + # they do NOT join the fleet relay mesh. discv5_bootstrap_node is still stripped + # to prevent accidental local-node coupling. All other kwargs are forwarded as-is. + if kwargs.pop("skip_fleet_peering", False): + if "discv5_bootstrap_node" in kwargs: + del kwargs["discv5_bootstrap_node"] + logger.debug( + "fleet_joined_start: skip_fleet_peering=True – bypassing fleet bootstrap " "for this node (no fleet staticnode / shard injected)" + ) + return original_start(self, wait_for_node_sec=wait_for_node_sec, use_wrapper=use_wrapper, **kwargs) + # Determine which fleet peer to connect to based on node creation order # within the current test (DS.waku_nodes is reset to [] before each test). # The append to DS.waku_nodes happens inside _start_docker/_start_wrapper, @@ -136,6 +207,37 @@ def patch_waku_node_start(request, monkeypatch): _append_fleet_kwarg(kwargs, "staticnode", fleet_multiaddr) kwargs.setdefault("dns_discovery", "true") kwargs.setdefault("dns_discovery_url", FLEET_DNS_DISCOVERY_URL) + # Align local node cluster and shards with the fleet node configs + # (config-n1.toml / config-n2.toml both use cluster-id=1, shards 0-7). + kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID) + kwargs.setdefault("shard", list(range(8))) + + # Inject session-level RLN credentials into nodes that don't already + # carry explicit RLN args (i.e. the regular waku_test_fleet tests that + # are not themselves RLN tests). Uses the same node_index as the fleet + # multiaddr assignment so NODE1 gets creds-id=1 and NODE2 gets creds-id=2. + # Only inject into nodes with relay enabled – filter/lightpush/store service + # nodes run without relay and nwaku rejects rln-relay when WakuRelay is not mounted. + if fleet_rln_state["keystore_prefixes"] and kwargs.get("rln_creds_source") is None: + if str(kwargs.get("relay", "")).lower() == "true": + if node_index < len(fleet_rln_state["keystore_prefixes"]): + kwargs["rln_creds_source"] = RLN_CREDENTIALS + kwargs["rln_creds_id"] = str(node_index + 1) + kwargs["rln_keystore_prefix"] = fleet_rln_state["keystore_prefixes"][node_index] + kwargs["rln_relay_membership_index"] = str(fleet_rln_state["rln_membership_indexes"][node_index]) + kwargs.setdefault("rln_relay_user_message_limit", "300") + logger.debug( + "fleet_joined_start: injected session RLN for node %d – prefix=%s index=%s user_message_limit=300", + node_index, + kwargs["rln_keystore_prefix"], + kwargs["rln_relay_membership_index"], + ) + else: + logger.debug( + "fleet_joined_start: skipping RLN injection for node %d – relay not enabled (relay=%r)", + node_index, + kwargs.get("relay"), + ) # Strip any local-node discv5 bootstrap ENR so that each node bootstraps # independently from its assigned fleet peer rather than from another local @@ -165,6 +267,107 @@ def patch_waku_node_start(request, monkeypatch): yield +@pytest.fixture(autouse=True) +def patch_fleet_cluster_config(request, monkeypatch): + """When --fleet is active, override every step-class pubsub-topic attribute and + any module-level VALID_PUBSUB_TOPICS reference used in @pytest.mark.waku_test_fleet + tests so that local Docker nodes use cluster-id=1, shards 0-7 – matching the + fleet node configs (config-n1.toml / config-n2.toml). + + Without this patch the step classes default to VALID_PUBSUB_TOPICS (cluster-id + 198) which the fleet nodes do not subscribe to, so no cross-fleet data flow + would actually be exercised. + """ + if not _fleet_bootstrap_enabled(request.config): + yield + return + + from src.steps.relay import StepsRelay + from src.steps.filter import StepsFilter + from src.steps.light_push import StepsLightPush + from src.steps.store import StepsStore + from src.steps.rln import StepsRLN + import tests.relay.test_publish as _relay_publish_mod + + # Step-class attributes: each class has test_pubsub_topic set to a cluster-198 + # topic. Override them to the matching cluster-1 shard. + # StepsRelay → VALID_PUBSUB_TOPICS[1] → FLEET_PUBSUB_TOPICS[1] (/waku/2/rs/1/1) + # StepsFilter → VALID_PUBSUB_TOPICS[1] → FLEET_PUBSUB_TOPICS[1] + # second_pubsub_topic is VALID_PUBSUB_TOPICS[2] → FLEET_PUBSUB_TOPICS[2] + # StepsLightPush → VALID_PUBSUB_TOPICS[0] → FLEET_PUBSUB_TOPICS[0] (/waku/2/rs/1/0) + # StepsStore → VALID_PUBSUB_TOPICS[0] → FLEET_PUBSUB_TOPICS[0] + # StepsRLN → PUBSUB_TOPICS_RLN[0] (/waku/2/rs/198/0) → FLEET_PUBSUB_TOPICS[0] (/waku/2/rs/1/0) + monkeypatch.setattr(StepsRelay, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[1]) + monkeypatch.setattr(StepsFilter, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[1]) + monkeypatch.setattr(StepsFilter, "second_pubsub_topic", FLEET_PUBSUB_TOPICS[2]) + monkeypatch.setattr(StepsLightPush, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0]) + monkeypatch.setattr(StepsStore, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0]) + monkeypatch.setattr(StepsRLN, "test_pubsub_topic", FLEET_PUBSUB_TOPICS[0]) + + # tests/relay/test_publish.py::test_publish_on_multiple_pubsub_topics has a + # @pytest.mark.waku_test_fleet marker and iterates over the module-level + # VALID_PUBSUB_TOPICS import directly. Patch that binding so the test uses + # cluster-1 topics instead. + monkeypatch.setattr(_relay_publish_mod, "VALID_PUBSUB_TOPICS", FLEET_PUBSUB_TOPICS) + + # ── Light-push client topology fix ────────────────────────────────────────── + # In fleet mode the 3rd node started by light-push tests (light_push_node1, + # node_index=2) has relay=true but NO RLN membership (only 2 memberships are + # registered in fleet_rln_state). Connecting to a fleet peer that enforces + # rln-relay with no credentials causes nwaku to crash, producing the + # ConnectionResetError(54) seen in the ERROR lines. + # + # Fix: replace setup_lightpush_node with a fleet-aware version that + # 1. routes lightpush requests to FLEET_N1_MULTIADDR (Amsterdam fleet node, + # confirmed lightpush=true in config-n1.toml) so the fleet relay network + # delivers messages to fleet-connected receiving_node1/node2. + # 2. starts the client with relay=false so no RLN membership is needed. + # 3. does NOT add the client to main_receiving_nodes (relay=false); the + # assertion peers remain receiving_node1 and receiving_node2 only. + def _fleet_setup_lightpush_node(self, image, node_index, **kwargs): + from src.node.waku_node import WakuNode + + node = WakuNode(image, f"lightpush_node{node_index}_{self.test_id}") + fleet_kwargs = dict(kwargs) + # Force relay=false so the node runs as a pure lightpush *client* (caller). + # This means: + # - No relay protocol is mounted → no RLN membership required. + # - The node dials FLEET_N1_MULTIADDR via the lightpush protocol to publish + # messages; the fleet relay network then delivers them to the fleet-connected + # receiving_node1 / receiving_node2. + # skip_fleet_peering prevents the bootstrap patch from also injecting a fleet + # staticnode + RLN creds (which would fail for the same reason). + # Pre-set cluster_id and shard so the pubsub topic subscriptions issued by the + # test body match FLEET_PUBSUB_TOPICS. + fleet_kwargs["relay"] = "false" + fleet_kwargs["skip_fleet_peering"] = True + fleet_kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID) + fleet_kwargs.setdefault("shard", list(range(8))) + node.start(lightpushnode=FLEET_N1_MULTIADDR, **fleet_kwargs) + # relay=false → do NOT add to main_receiving_nodes; assertion peers are + # receiving_node1 and receiving_node2 only. + self.add_node_peer(node, self.multiaddr_list) + logger.debug( + "fleet _fleet_setup_lightpush_node: node %d started with relay=false, " "skip_fleet_peering=True, lightpushnode=%s", + node_index, + FLEET_N1_MULTIADDR, + ) + return node + + monkeypatch.setattr(StepsLightPush, "setup_lightpush_node", _fleet_setup_lightpush_node) + + logger.info( + "Fleet cluster patch active – pubsub topics overridden to cluster-id=%s " + "(shards 0-7, e.g. test_pubsub_topic=%s rln_test_pubsub_topic=%s); " + "StepsLightPush.setup_lightpush_node overridden to use fleet relay %s", + FLEET_CLUSTER_ID, + FLEET_PUBSUB_TOPICS[1], + FLEET_PUBSUB_TOPICS[0], + FLEET_N1_MULTIADDR, + ) + yield + + # See https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures @pytest.hookimpl(hookwrapper=True, tryfirst=True) def pytest_runtest_makereport(item): diff --git a/tests/store/test_get_messages.py b/tests/store/test_get_messages.py index 486bc21a2..9e461d4ab 100644 --- a/tests/store/test_get_messages.py +++ b/tests/store/test_get_messages.py @@ -23,7 +23,12 @@ class TestGetMessages(StepsStore): logger.error(f'Payload {payload["description"]} failed: {str(e)}') failed_payloads.append(payload["description"]) assert not failed_payloads, f"Payloads failed: {failed_payloads}" - assert len(self.store_response.messages) == len(SAMPLE_INPUTS) + # In fleet mode the archive may also store background fleet messages; count + # only the messages that belong to the test by matching the test content topic. + test_msgs = [m for m in self.store_response.messages if m.get("message", {}).get("contentTopic") == self.test_content_topic] + assert len(test_msgs) == len(SAMPLE_INPUTS), ( + f"Expected {len(SAMPLE_INPUTS)} test messages but found {len(test_msgs)} " f"(total in store: {len(self.store_response.messages)})" + ) @pytest.mark.waku_test_fleet def test_get_store_messages_with_different_content_topics(self): @@ -80,8 +85,12 @@ class TestGetMessages(StepsStore): self.publish_message(message=message) message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex")) for node in self.store_nodes: - store_response = self.get_messages_from_store(node, page_size=50) - assert len(store_response.messages) == len(SAMPLE_INPUTS) + # Scope the store query to the test content topic so that background fleet + # messages archived on the same shard do not inflate the expected count. + store_response = self.get_messages_from_store(node, page_size=50, content_topics=self.test_content_topic) + assert len(store_response.messages) == len( + SAMPLE_INPUTS + ), f"Expected {len(SAMPLE_INPUTS)} messages but got {len(store_response.messages)}" for index in range(len(store_response.messages)): assert store_response.message_hash(index) == message_hash_list[node.type()][index], f"Message hash at index {index} doesn't match"