From 34092b6efc2d7bdf3e069784a0a1dc954f3b92de Mon Sep 17 00:00:00 2001 From: Roman Zajic Date: Fri, 8 May 2026 14:59:20 +0800 Subject: [PATCH] chore: Test waku fleet (#175) * chore: build logos delivery lib locally * test: soft attachment to waku.test * chore: node1 node2 bootstrap from test fleet nodes - selected test cases relay, store * fix: cleanup artifacts after liblogosdelivery build * chore: add fleet tests workflow * fix: trigger on push and test * fix: register markers * test: add light_push to fleet tests * test: add filter to fleet tests * fix: add more store tests to fleet tests * fix: add more relay to fleet tests * fix: wf efficiency * fix: wf syntax * test: join fleet with real cluster ID shards RLN on * fix: stop fleet tests when RLN registration fails * fix: refactor monkeypatch * fix: light_push tests * fix: scoped assertion for store test in fleet mode * fix: reduce comments * fix: different propagation delay for fleet test * fix: add fresh timestamp helper * fix: reduce comments * test: change to Waku v0.38.0 image temporarily * fix: reduce log message * fix: undo reduce log message * fix: add scheduled run at 2 am. * fix: fail fleet tests instead of skip when RLN is not working * fix: refactor get_sample_timestamps * fix: remove on push trigger for fleet wf - reset back to use the latest docker image --- .github/workflows/fleet_tests.yml | 37 +++++ .github/workflows/test_common.yml | 23 ++- pytest.ini | 2 + scripts/build_logos_delivery_lib.sh | 120 ++++++++++++++++ src/env_vars.py | 14 ++ src/node/docker_mananger.py | 1 + src/node/fleet_waku_node.py | 138 ++++++++++++++++++ src/node/waku_node.py | 9 +- src/steps/filter.py | 12 +- src/steps/light_push.py | 14 +- src/steps/relay.py | 12 +- src/test_config.py | 27 ++++ src/test_data.py | 63 +++++--- tests/conftest.py | 199 +++++++++++++++++++++++++- tests/filter/test_get_messages.py | 10 +- tests/filter/test_subscribe_create.py | 4 + tests/filter/test_subscribe_update.py | 2 + tests/filter/test_unsubscribe.py | 3 + tests/filter/test_unsubscribe_all.py | 2 + tests/light_push/test_publish.py | 19 ++- tests/relay/test_publish.py | 14 +- tests/store/test_get_messages.py | 25 +++- 22 files changed, 703 insertions(+), 47 deletions(-) create mode 100644 .github/workflows/fleet_tests.yml create mode 100755 scripts/build_logos_delivery_lib.sh create mode 100644 src/node/fleet_waku_node.py create mode 100644 src/test_config.py diff --git a/.github/workflows/fleet_tests.yml b/.github/workflows/fleet_tests.yml new file mode 100644 index 000000000..fda82f69e --- /dev/null +++ b/.github/workflows/fleet_tests.yml @@ -0,0 +1,37 @@ +name: Waku Fleet Tests + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: false + +on: + schedule: + - cron: '0 2 * * *' + workflow_dispatch: + inputs: + node1: + required: true + description: "Node that usually publishes messages. Used for all tests" + type: string + default: "wakuorg/nwaku:latest" + node2: + required: true + description: "Node that usually queries for published messages. Used for all tests" + type: string + default: "wakuorg/nwaku:latest" + additional_nodes: + required: false + description: "Additional optional nodes used in e2e tests, separated by ," + type: string + default: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest" + +jobs: + test-common: + uses: ./.github/workflows/test_common.yml + secrets: inherit + with: + node1: ${{ inputs.node1 || 'wakuorg/nwaku:latest' }} + node2: ${{ inputs.node2 || 'wakuorg/nwaku:latest' }} + additional_nodes: ${{ inputs.additional_nodes || 'wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest' }} + fleet_tests: true + caller: "fleet" \ No newline at end of file diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index 2b276404a..397d8fbf0 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -22,6 +22,11 @@ on: required: false description: "Workflow caller. Used in reporting" type: string + fleet_tests: + required: false + description: "Run fleet tests only" + type: boolean + default: false env: FORCE_COLOR: "1" @@ -37,7 +42,7 @@ jobs: strategy: fail-fast: false matrix: - shard: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] + shard: ${{ inputs.fleet_tests && fromJSON('[0]') || fromJSON('[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17]') }} # total number of shards =18 means tests will split into 18 thread and run in parallel to increase execution speed # command for sharding : # pytest --shard-id= --num-shards= @@ -142,6 +147,10 @@ jobs: echo "Built library:" ls -l "$BINDINGS_DIR/lib/liblogosdelivery.so" + + # Cleanup build artifacts + rm -rf .gh-pages/nim + - name: Verify wrapper library run: | @@ -170,7 +179,15 @@ jobs: export PATH="$HOME/.nimble/bin:$PATH" export PYTHONPATH="$(pwd)/vendor/logos-delivery-python-bindings/waku:$PYTHONPATH" - if [ "${{ matrix.shard }}" == "16" ]; then + if [ "${{ inputs.fleet_tests }}" == "true" ]; then + if [ "${{ matrix.shard }}" == "0" ]; then + pytest --fleet -m waku_test_fleet \ + --ignore=vendor/logos-delivery-python-bindings/tests \ + --alluredir=allure-results-${{ matrix.shard }} + else + echo "Skipping shard ${{ matrix.shard }}: fleet tests run on shard 0 only" + fi + elif [ "${{ matrix.shard }}" == "16" ]; then pytest tests/relay/test_rln.py \ --ignore=vendor/logos-delivery-python-bindings/tests \ --alluredir=allure-results-${{ matrix.shard }} @@ -328,7 +345,7 @@ jobs: for key in $(jq -r 'keys[]' results.json); do result=$(jq -r --arg key "$key" '.[$key]' results.json) echo "Key: $key, Value: $result" - if [ "$result" != "success" ]; then + if [ -n "$result" ] && [ "$result" != "success" ]; then echo "Value 'success' not found at key: $key" TESTS_RESULT="failure" break diff --git a/pytest.ini b/pytest.ini index 3a13b3b9c..f945bd8a0 100644 --- a/pytest.ini +++ b/pytest.ini @@ -12,3 +12,5 @@ log_file_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s timeout = 300 markers = smoke: marks tests as smoke test (deselect with '-m "not smoke"') + waku_test_fleet: marks tests that run against a live Waku test fleet + store2000: marks tests that use 2000 store messages diff --git a/scripts/build_logos_delivery_lib.sh b/scripts/build_logos_delivery_lib.sh new file mode 100755 index 000000000..be3c1d6b4 --- /dev/null +++ b/scripts/build_logos_delivery_lib.sh @@ -0,0 +1,120 @@ +#!/usr/bin/env bash +# Prerequisites: +# - Nim / nimble / choosenim installed (e.g. via https://nim-lang.org/choosenim) +# - make, gcc / g++ (or clang on macOS) +# - python3 available on PATH +# - git submodules initialised: +# git submodule update --init --recursive + +set -euo pipefail + +# ── Resolve repository root ─────────────────────────────────────────────────── +# The script lives in /scripts/, so go one level up. +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" + +# ── Resolve Python interpreter ──────────────────────────────────────────────── +# Prefer the project's own virtual environment (cffi + all requirements). +# Checks both the hidden (.venv) and non-hidden (venv) conventional names. +# Falls back to whatever python3 is on PATH (e.g. in CI after pip install). +if [ -x "$REPO_ROOT/.venv/bin/python" ]; then + PYTHON="$REPO_ROOT/.venv/bin/python" + echo "Using venv Python: $PYTHON" +elif [ -x "$REPO_ROOT/venv/bin/python" ]; then + PYTHON="$REPO_ROOT/venv/bin/python" + echo "Using venv Python: $PYTHON" +else + PYTHON="$(command -v python3 || command -v python)" + echo "No local venv found, falling back to: $PYTHON" +fi + +# ── 1. Build liblogosdelivery shared library for Python bindings ────────────── +echo "──────────────────────────────────────────────────────────────────────────" +echo "Step 1 – Build liblogosdelivery shared library for Python bindings" +echo "──────────────────────────────────────────────────────────────────────────" + +# Make sure nimble / choosenim binaries are on PATH (same as CI) +export PATH="$HOME/.nimble/bin:$PATH" + +BINDINGS_DIR="$REPO_ROOT/vendor/logos-delivery-python-bindings" +DELIVERY_DIR="$BINDINGS_DIR/vendor/logos-delivery" + +export "PYTHONPATH=$BINDINGS_DIR/waku${PYTHONPATH:+:$PYTHONPATH}" + +echo "--> Creating lib output directory: $BINDINGS_DIR/lib" +mkdir -p "$BINDINGS_DIR/lib" + +echo "--> Entering: $DELIVERY_DIR" +cd "$DELIVERY_DIR" + +echo "--> Creating waku.nims symlink (waku.nimble -> waku.nims)" +ln -sf waku.nimble waku.nims + +echo "--> Installing Nim dependencies (nimble install -y)" +nimble install -y + +echo "--> Running: make setup" +make setup + +echo "--> Running: make liblogosdelivery" +make liblogosdelivery + +# On Linux the library is .so; on macOS it may be .dylib +SO_PATH="$(find . -type f \( -name 'liblogosdelivery.so' -o -name 'liblogosdelivery.dylib' \) | head -n 1)" + +if [ -z "$SO_PATH" ]; then + echo "ERROR: liblogosdelivery shared library was not built (neither .so nor .dylib found)" + exit 1 +fi + +# Preserve the platform-native extension in the destination +case "$SO_PATH" in + *.dylib) DEST_LIB="$BINDINGS_DIR/lib/liblogosdelivery.dylib" ;; + *) DEST_LIB="$BINDINGS_DIR/lib/liblogosdelivery.so" ;; +esac + +cp "$SO_PATH" "$DEST_LIB" +echo "Built library:" +ls -l "$DEST_LIB" + +# ── 2. Verify wrapper library ───────────────────────────────────────────────── +echo "" +echo "──────────────────────────────────────────────────────────────────────────" +echo "Step 2 – Verify wrapper library" +echo "──────────────────────────────────────────────────────────────────────────" + +if test -f "$BINDINGS_DIR/lib/liblogosdelivery.so" \ + || test -f "$BINDINGS_DIR/lib/liblogosdelivery.dylib"; then + echo "OK: wrapper library is present in $BINDINGS_DIR/lib/" +else + echo "ERROR: wrapper library not found in $BINDINGS_DIR/lib/" + exit 1 +fi + +# ── 3. Debug Python import paths ────────────────────────────────────────────── +echo "" +echo "──────────────────────────────────────────────────────────────────────────" +echo "Step 3 – Debug Python import paths" +echo "──────────────────────────────────────────────────────────────────────────" + +cd "$REPO_ROOT" + +# Prepend the waku bindings directory to PYTHONPATH (mirrors the CI env step) +export PYTHONPATH="$REPO_ROOT/vendor/logos-delivery-python-bindings/waku${PYTHONPATH:+:$PYTHONPATH}" + +pwd +echo "PYTHONPATH=$PYTHONPATH" +find . -maxdepth 5 | grep wrapper || true + +"$PYTHON" - <<'PY' +import sys +print("sys.path:") +for p in sys.path: + print(p) +try: + import wrapper + print("wrapper import OK:", wrapper) +except Exception as e: + print("wrapper import failed:", e) + raise +PY diff --git a/src/env_vars.py b/src/env_vars.py index 32e96a7e1..4985a6f17 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -32,5 +32,19 @@ RLN_CREDENTIALS = get_env_var("RLN_CREDENTIALS") PG_USER = get_env_var("POSTGRES_USER", "postgres") PG_PASS = get_env_var("POSTGRES_PASSWORD", "test123") +FLEET_NODES = [ + # Amsterdam + "/dns4/node-01.do-ams3.waku.test.status.im/tcp/30303/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W", + # US Central + "/dns4/node-01.gc-us-central1-a.waku.test.status.im/tcp/30303/p2p/16Uiu2HAmDCp8XJ9z1ev18zuv8NHekAsjNyezAvmMfFEJkiharitG", + # Hong Kong + "/dns4/node-01.ac-cn-hongkong-c.waku.test.status.im/tcp/30303/p2p/16Uiu2HAkzHaTP5JsUwfR9NR8Rj9HC24puS6ocaU8wze4QrXr9iXp", +] +FLEET_PRIMARY_MULTIADDR = FLEET_NODES[0] +FLEET_DNS_DISCOVERY_URL = "enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im" + +FLEET_N1_MULTIADDR = FLEET_NODES[0] # node-01.do-ams3 – used by NODE1 in --fleet mode +FLEET_N2_MULTIADDR = FLEET_NODES[1] # node-01.gc-us-central1-a – used by NODE2 in --fleet mode + # example for .env file # RLN_CREDENTIALS = {"rln-relay-cred-password": "password", "rln-relay-eth-client-address": "https://rpc.sepolia.linea.build", "rln-relay-eth-contract-address": "0xB9cd878C90E49F797B4431fBF4fb333108CB90e6", "rln-relay-eth-private-key-1": "", "rln-relay-eth-private-key-2": "", "rln-relay-eth-private-key-3": "", "rln-relay-eth-private-key-4": "", "rln-relay-eth-private-key-5": ""} diff --git a/src/node/docker_mananger.py b/src/node/docker_mananger.py index 44329d098..d124b2981 100644 --- a/src/node/docker_mananger.py +++ b/src/node/docker_mananger.py @@ -46,6 +46,7 @@ class DockerManager: port_bindings = {f"{port}/tcp": ("", port) for port in ports} port_bindings_for_log = " ".join(f"-p {port}:{port}" for port in ports) cli_args_str_for_log = " ".join(cli_args) + cli_args_str_for_log = re.sub(r"(--rln-relay-eth-private-key=)\S+", r"\1REDACTED", cli_args_str_for_log) logger.debug(f"docker run -i -t {port_bindings_for_log} {image_name} {cli_args_str_for_log}") container = self._client.containers.run( image_name, command=cli_args, ports=port_bindings, detach=True, remove=remove_container, auto_remove=remove_container, volumes=volumes diff --git a/src/node/fleet_waku_node.py b/src/node/fleet_waku_node.py new file mode 100644 index 000000000..6a604340e --- /dev/null +++ b/src/node/fleet_waku_node.py @@ -0,0 +1,138 @@ +"""Fleet bootstrap configuration for WakuNode. + +When fleet bootstrap is active (``--fleet`` CLI flag or ``FLEET_BOOTSTRAP=true`` +env var), an instance of :class:`FleetBootstrapConfig` is assigned to +``WakuNode._pre_start_hook``. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from src.libs.custom_logger import get_custom_logger +from src.data_storage import DS +from src.env_vars import ( + FLEET_N1_MULTIADDR, + FLEET_N2_MULTIADDR, + FLEET_PRIMARY_MULTIADDR, + FLEET_DNS_DISCOVERY_URL, + RLN_CREDENTIALS, +) +from src.test_data import FLEET_CLUSTER_ID + +if TYPE_CHECKING: + from src.node.waku_node import WakuNode + +logger = get_custom_logger(__name__) + + +def _append_fleet_kwarg(kwargs: dict, key: str, value: Any) -> None: + """Append *value* to the kwargs entry *key*, creating a list when needed.""" + existing = kwargs.get(key) + if existing is None: + kwargs[key] = value + elif isinstance(existing, list): + if value not in existing: + kwargs[key] = existing + [value] + else: + if existing != value: + kwargs[key] = [existing, value] + + +@dataclass +class FleetBootstrapConfig: + """Holds fleet session state and implements the pre-start kwargs injection. + + One instance is created per pytest session when fleet bootstrap is active. + It is registered as ``WakuNode._pre_start_hook`` so that every call to + ``WakuNode.start()`` automatically receives fleet bootstrap arguments. + + Bootstrap assignment by node creation order (mirrors config-n*.toml files): + - NODE1 (1st started) → FLEET_N1_MULTIADDR (node-01.do-ams3) + - NODE2 (2nd started) → FLEET_N2_MULTIADDR (node-01.gc-us-central1-a) + - additional nodes → FLEET_PRIMARY_MULTIADDR (Amsterdam, same as NODE1) + """ + + fleet_rln_state: dict + + def prepare_start_kwargs(self, node: "WakuNode", kwargs: dict) -> dict: + """Inject fleet bootstrap arguments into *kwargs* before node start.""" + + logger.debug("FleetBootstrapConfig.prepare_start_kwargs: injecting waku.test bootstrap args") + + if kwargs.pop("skip_fleet_peering", False): + kwargs.pop("discv5_bootstrap_node", None) + logger.debug( + "FleetBootstrapConfig: skip_fleet_peering=True – " "bypassing fleet bootstrap for this node (no fleet staticnode / shard injected)" + ) + return kwargs + + # Determine which fleet peer to connect to based on node creation order + # within the current test (DS.waku_nodes is reset to [] before each test). + node_index = len(DS.waku_nodes) + if node_index == 0: + fleet_multiaddr = FLEET_N1_MULTIADDR + logger.debug( + "FleetBootstrapConfig: NODE1 – bootstrapping from config-n1.toml (%s)", + fleet_multiaddr, + ) + elif node_index == 1: + fleet_multiaddr = FLEET_N2_MULTIADDR + logger.debug( + "FleetBootstrapConfig: NODE2 – bootstrapping from config-n2.toml (%s)", + fleet_multiaddr, + ) + else: + fleet_multiaddr = FLEET_PRIMARY_MULTIADDR + logger.debug( + "FleetBootstrapConfig: additional node %d – bootstrapping from primary (%s)", + node_index, + fleet_multiaddr, + ) + + _append_fleet_kwarg(kwargs, "staticnode", fleet_multiaddr) + kwargs.setdefault("dns_discovery", "true") + kwargs.setdefault("dns_discovery_url", FLEET_DNS_DISCOVERY_URL) + kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID) + kwargs.setdefault("shard", list(range(8))) + + # Inject session-level RLN credentials into relay enabled nodes that + # don't already carry explicit RLN args. + rln_prefixes = self.fleet_rln_state.get("keystore_prefixes", []) + if rln_prefixes and kwargs.get("rln_creds_source") is None: + if str(kwargs.get("relay", "")).lower() == "true": + if node_index < len(rln_prefixes): + kwargs["rln_creds_source"] = RLN_CREDENTIALS + kwargs["rln_creds_id"] = str(node_index + 1) + kwargs["rln_keystore_prefix"] = rln_prefixes[node_index] + kwargs["rln_relay_membership_index"] = str(self.fleet_rln_state["rln_membership_indexes"][node_index]) + kwargs.setdefault("rln_relay_user_message_limit", "300") + logger.debug( + "FleetBootstrapConfig: injected session RLN for node %d – " "prefix=%s index=%s user_message_limit=300", + node_index, + kwargs["rln_keystore_prefix"], + kwargs["rln_relay_membership_index"], + ) + else: + logger.debug( + "FleetBootstrapConfig: skipping RLN injection for node %d – " "relay not enabled (relay=%r)", + node_index, + kwargs.get("relay"), + ) + + # Strip any local-node discv5 bootstrap ENR so that each node bootstraps + # independently from its assigned fleet peer rather than from another + # local container. The fleet DNS tree (dns_discovery_url) replaces it. + if "discv5_bootstrap_node" in kwargs: + logger.debug( + "FleetBootstrapConfig: dropping local discv5_bootstrap_node=%s " "(fleet DNS discovery replaces it)", + kwargs["discv5_bootstrap_node"], + ) + del kwargs["discv5_bootstrap_node"] + + logger.debug( + "FleetBootstrapConfig: staticnode=%s dns_discovery_url=%s", + kwargs.get("staticnode"), + kwargs.get("dns_discovery_url"), + ) + return kwargs diff --git a/src/node/waku_node.py b/src/node/waku_node.py index f2a5356fc..1fa71dd2e 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -10,7 +10,7 @@ import pytest import requests from src.libs.common import delay from src.libs.custom_logger import get_custom_logger -from tenacity import retry, stop_after_delay, wait_fixed, sleep +from tenacity import retry, stop_after_attempt, stop_after_delay, wait_fixed, sleep from docker.errors import NotFound as DockerNotFound from src.node.api_clients.rest import REST from src.node.docker_mananger import DockerManager @@ -96,6 +96,9 @@ def resolve_sharding_flags(kwargs): class WakuNode: + # Optional pre-start hook to allow modifications for fleet tests + _pre_start_hook = None + def __init__(self, docker_image, docker_log_prefix=""): self._image_name = docker_image self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") @@ -113,6 +116,8 @@ class WakuNode: @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs): + if WakuNode._pre_start_hook is not None: + kwargs = WakuNode._pre_start_hook(self, kwargs) logger.debug("Starting Node...") default_args, remove_container = self._prepare_start_context(**kwargs) @@ -280,7 +285,7 @@ class WakuNode: }, } - @retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True) + @retry(stop=stop_after_attempt(1), wait=wait_fixed(0.1), reraise=True) def register_rln(self, **kwargs): logger.debug("Registering RLN credentials...") self._docker_manager.create_network() diff --git a/src/steps/filter.py b/src/steps/filter.py index 8a5d90c41..39429770b 100644 --- a/src/steps/filter.py +++ b/src/steps/filter.py @@ -95,9 +95,15 @@ class StepsFilter(StepsCommon): for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 2}:{peer.image} can find the published message") get_messages_response = self.get_filter_messages(message["contentTopic"], pubsub_topic=pubsub_topic, node=peer) - assert get_messages_response, f"Peer NODE_{index + 2}:{peer.image} couldn't find any messages" - assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" - waku_message = WakuMessage(get_messages_response) + # get_filter_messages already scopes to the requested content topic; the + # additional filter guards against any residual or fleet messages that may + # have been queued under the same topic before this call. + test_messages = [m for m in get_messages_response if m.get("contentTopic") == message["contentTopic"]] + assert test_messages, f"Peer NODE_{index + 2}:{peer.image} couldn't find any messages" + assert len(test_messages) == 1, ( + f"Expected 1 test message but got {len(test_messages)} " f"(total messages returned: {len(get_messages_response)})" + ) + waku_message = WakuMessage(test_messages) waku_message.assert_received_message(message) @allure.step diff --git a/src/steps/light_push.py b/src/steps/light_push.py index fd7da42d0..3980513cf 100644 --- a/src/steps/light_push.py +++ b/src/steps/light_push.py @@ -22,6 +22,7 @@ class StepsLightPush(StepsCommon): test_content_topic = "/myapp/1/latest/proto" test_pubsub_topic = VALID_PUBSUB_TOPICS[0] test_payload = "Light push works!!" + default_message_propagation_delay = 0.1 @pytest.fixture(scope="function", autouse=True) def light_push_setup(self): @@ -109,7 +110,7 @@ class StepsLightPush(StepsCommon): @allure.step def check_light_pushed_message_reaches_receiving_peer( - self, pubsub_topic=None, message=None, message_propagation_delay=0.1, sender=None, peer_list=None + self, pubsub_topic=None, message=None, message_propagation_delay=None, sender=None, peer_list=None ): if pubsub_topic is None: pubsub_topic = self.test_pubsub_topic @@ -117,6 +118,8 @@ class StepsLightPush(StepsCommon): sender = self.light_push_node1 if not peer_list: peer_list = self.main_receiving_nodes + self.optional_nodes + if message_propagation_delay is None: + message_propagation_delay = self.default_message_propagation_delay payload = self.create_payload(pubsub_topic, message) logger.debug("Lightpushing message") sender.send_light_push_message(payload) @@ -124,9 +127,12 @@ class StepsLightPush(StepsCommon): for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the lightpushed message") get_messages_response = peer.get_relay_messages(pubsub_topic) - assert get_messages_response, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" - assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" - waku_message = WakuMessage(get_messages_response) + test_messages = [m for m in get_messages_response if m.get("contentTopic") == payload["message"]["contentTopic"]] + assert test_messages, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" + assert len(test_messages) == 1, ( + f"Expected 1 test message but got {len(test_messages)} " f"(total messages in cache: {len(get_messages_response)})" + ) + waku_message = WakuMessage(test_messages) waku_message.assert_received_message(payload["message"]) @allure.step diff --git a/src/steps/relay.py b/src/steps/relay.py index 840461bbe..b801f848b 100644 --- a/src/steps/relay.py +++ b/src/steps/relay.py @@ -122,9 +122,15 @@ class StepsRelay(StepsCommon): for index, peer in enumerate(peer_list): logger.debug(f"Checking that peer NODE_{index + 1}:{peer.image} can find the published message") get_messages_response = peer.get_relay_messages(pubsub_topic) - assert get_messages_response, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" - assert len(get_messages_response) == 1, f"Expected 1 message but got {len(get_messages_response)}" - waku_message = WakuMessage(get_messages_response) + # In fleet mode the relay cache may contain background messages from other + # fleet participants. Filter to only the message whose contentTopic matches + # what the test sent so that fleet noise does not break the count assertion. + test_messages = [m for m in get_messages_response if m.get("contentTopic") == message["contentTopic"]] + assert test_messages, f"Peer NODE_{index + 1}:{peer.image} couldn't find any messages" + assert len(test_messages) == 1, ( + f"Expected 1 test message but got {len(test_messages)} " f"(total messages in cache: {len(get_messages_response)})" + ) + waku_message = WakuMessage(test_messages) waku_message.assert_received_message(message) @allure.step diff --git a/src/test_config.py b/src/test_config.py new file mode 100644 index 000000000..11f098d48 --- /dev/null +++ b/src/test_config.py @@ -0,0 +1,27 @@ +"""Test session configuration objects. + +These dataclasses carry configuration that varies between fleet and +non-fleet test runs. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import List + + +@dataclass(frozen=True) +class PubsubConfig: + """Named pubsub-topic slots for a test session. + + A *default* instance uses ``VALID_PUBSUB_TOPICS`` / ``PUBSUB_TOPICS_RLN`` + (cluster-id 198). A *fleet* instance uses ``FLEET_PUBSUB_TOPICS`` + (cluster-id 1, shards 0-7). + """ + + relay_test_topic: str + filter_test_topic: str + filter_second_topic: str + lightpush_test_topic: str + store_test_topic: str + rln_test_topic: str + all_topics: List[str] diff --git a/src/test_data.py b/src/test_data.py index 659dd39ae..2c1c6946b 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -1,3 +1,4 @@ +import os from time import time from datetime import datetime, timedelta @@ -105,6 +106,9 @@ VALID_PUBSUB_TOPICS = [ f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1000", ] +FLEET_CLUSTER_ID = "1" +FLEET_PUBSUB_TOPICS = [f"/waku/2/rs/{FLEET_CLUSTER_ID}/{i}" for i in range(8)] + PUBSUB_TOPICS_STORE = [ f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0", f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1", @@ -147,26 +151,45 @@ PUBSUB_TOPICS_WRONG_FORMAT = [ {"description": "A bool", "value": True}, ] -SAMPLE_TIMESTAMPS = [ - {"description": "Now", "value": int(time() * 1e9), "valid_for": ["nwaku"]}, - { - "description": "Far future", - "value": int((NOW + timedelta(days=365 * 10)).timestamp() * 1e9), - "valid_for": ["nwaku"], - }, # 10 years from now - {"description": "Recent past", "value": int((NOW - timedelta(hours=1)).timestamp() * 1e9), "valid_for": ["nwaku"]}, # 1 hour ago - {"description": "Near future", "value": int((NOW + timedelta(hours=1)).timestamp() * 1e9), "valid_for": ["nwaku"]}, # 1 hour ahead - {"description": "Positive number", "value": 1, "valid_for": ["nwaku"]}, - {"description": "Negative number", "value": -1, "valid_for": ["nwaku"]}, - {"description": "DST change", "value": int(datetime(2020, 3, 8, 2, 0, 0).timestamp() * 1e9), "valid_for": ["nwaku"]}, # DST starts - {"description": "Timestamp as string number", "value": str(int(time() * 1e9)), "valid_for": []}, - {"description": "Invalid large number", "value": 2**63, "valid_for": []}, - {"description": "Float number", "value": float(time() * 1e9), "valid_for": []}, - {"description": "Array instead of timestamp", "value": [int(time() * 1e9)], "valid_for": []}, - {"description": "Object instead of timestamp", "value": {"time": int(time() * 1e9)}, "valid_for": []}, - {"description": "ISO 8601 timestamp", "value": "2023-12-26T10:58:51", "valid_for": []}, - {"description": "Missing", "value": None, "valid_for": []}, -] + +def get_sample_timestamps(): + """Return timestamp test-cases with values evaluated fresh at call time. + + This factory function MUST be called from inside each test (never at module + import time) so that the "Now" value reflects the actual time when the + message is published. + + Valid_for semantics: + ``["nwaku"]`` – accepted by nwaku in the current run mode. + ``[]`` – rejected by nwaku in the current run mode (either + structurally invalid type, or out-of-epoch in fleet/RLN + mode). + """ + now_ns = int(time() * 1e9) + now_dt = datetime.now() + fleet_mode = os.getenv("FLEET_BOOTSTRAP", "false").lower() == "true" + standalone_valid = [] if fleet_mode else ["nwaku"] + return [ + {"description": "Now", "value": now_ns, "valid_for": ["nwaku"]}, + # 10 years from now + {"description": "Far future", "value": int((now_dt + timedelta(days=365 * 10)).timestamp() * 1e9), "valid_for": standalone_valid}, + # 1 hour ago + {"description": "Recent past", "value": int((now_dt - timedelta(hours=1)).timestamp() * 1e9), "valid_for": standalone_valid}, + # 1 hour ahead + {"description": "Near future", "value": int((now_dt + timedelta(hours=1)).timestamp() * 1e9), "valid_for": standalone_valid}, + {"description": "Positive number", "value": 1, "valid_for": standalone_valid}, + {"description": "Negative number", "value": -1, "valid_for": standalone_valid}, + # DST starts + {"description": "DST change", "value": int(datetime(2020, 3, 8, 2, 0, 0).timestamp() * 1e9), "valid_for": standalone_valid}, + {"description": "Timestamp as string number", "value": str(now_ns), "valid_for": []}, + {"description": "Invalid large number", "value": 2**63, "valid_for": []}, + {"description": "Float number", "value": float(now_ns), "valid_for": []}, + {"description": "Array instead of timestamp", "value": [now_ns], "valid_for": []}, + {"description": "Object instead of timestamp", "value": {"time": now_ns}, "valid_for": []}, + {"description": "ISO 8601 timestamp", "value": "2023-12-26T10:58:51", "valid_for": []}, + {"description": "Missing", "value": None, "valid_for": []}, + ] + PUBSUB_TOPICS_RLN = [f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0"] diff --git a/tests/conftest.py b/tests/conftest.py index 258c81c66..28f87dd73 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,20 +1,217 @@ # -*- coding: utf-8 -*- import inspect import glob +import random +import string from src.libs.custom_logger import get_custom_logger import os import pytest from datetime import datetime from time import time from uuid import uuid4 -from src.libs.common import attach_allure_file +from src.libs.common import attach_allure_file, gen_step_id import src.env_vars as env_vars +from src.env_vars import FLEET_PRIMARY_MULTIADDR, FLEET_DNS_DISCOVERY_URL, FLEET_N1_MULTIADDR, FLEET_N2_MULTIADDR from src.data_storage import DS from src.postgres_setup import start_postgres, stop_postgres +from src.test_data import FLEET_CLUSTER_ID, FLEET_PUBSUB_TOPICS, PUBSUB_TOPICS_RLN, VALID_PUBSUB_TOPICS +from src.test_config import PubsubConfig logger = get_custom_logger(__name__) +def pytest_addoption(parser): + """Register the --fleet command-line option.""" + parser.addoption( + "--fleet", + action="store_true", + default=False, + help=( + "Bootstrap every local nwaku Docker node against the live waku.test " + "fleet (node-01.do-ams3 / gc-us-central1-a / ac-cn-hongkong-c). " + "Also activatable via FLEET_BOOTSTRAP=true env var." + ), + ) + + +def _fleet_bootstrap_enabled(config) -> bool: + """Return True when fleet bootstrap should be activated.""" + + if config.getoption("--fleet", default=False): + return True + return os.getenv("FLEET_BOOTSTRAP", "false").lower() == "true" + + +@pytest.fixture(scope="session") +def fleet_rln_state(request): + """Register 2 RLN memberships once per test session when ``--fleet`` is active.""" + if not _fleet_bootstrap_enabled(request.config): + yield {"keystore_prefixes": [], "rln_membership_indexes": []} + return + + from src.node.waku_node import WakuNode + from src.env_vars import RLN_CREDENTIALS, DEFAULT_NWAKU + + if not RLN_CREDENTIALS: + logger.info("Fleet RLN: RLN_CREDENTIALS not set – nodes will start without RLN") + yield {"keystore_prefixes": [], "rln_membership_indexes": []} + return + + state: dict = {"keystore_prefixes": [], "rln_membership_indexes": []} + try: + for i in range(2): + prefix = "".join(random.choices(string.ascii_lowercase, k=4)) + node = WakuNode(DEFAULT_NWAKU, f"rln_reg_{i + 1}_{gen_step_id()}") + membership_index = node.register_rln( + rln_keystore_prefix=prefix, + rln_creds_source=RLN_CREDENTIALS, + rln_creds_id=str(i + 1), + ) + state["keystore_prefixes"].append(prefix) + state["rln_membership_indexes"].append(membership_index) + logger.info( + "Fleet RLN: registered %d memberships – indexes=%s prefixes=%s", + len(state["rln_membership_indexes"]), + state["rln_membership_indexes"], + state["keystore_prefixes"], + ) + except BaseException as ex: + logger.error("Fleet RLN: registration failed – aborting test session: %s", ex) + pytest.exit(f"Fleet RLN registration failed – aborting session: {ex}", returncode=1) + + yield state + + +@pytest.fixture(scope="session") +def pubsub_cfg(request) -> PubsubConfig: + """Return the pubsub-topic configuration for the current session.""" + + if _fleet_bootstrap_enabled(request.config): + return PubsubConfig( + relay_test_topic=FLEET_PUBSUB_TOPICS[1], + filter_test_topic=FLEET_PUBSUB_TOPICS[1], + filter_second_topic=FLEET_PUBSUB_TOPICS[2], + lightpush_test_topic=FLEET_PUBSUB_TOPICS[0], + store_test_topic=FLEET_PUBSUB_TOPICS[0], + rln_test_topic=FLEET_PUBSUB_TOPICS[0], + all_topics=FLEET_PUBSUB_TOPICS, + ) + return PubsubConfig( + relay_test_topic=VALID_PUBSUB_TOPICS[1], + filter_test_topic=VALID_PUBSUB_TOPICS[1], + filter_second_topic=VALID_PUBSUB_TOPICS[2], + lightpush_test_topic=VALID_PUBSUB_TOPICS[0], + store_test_topic=VALID_PUBSUB_TOPICS[0], + rln_test_topic=PUBSUB_TOPICS_RLN[0], + all_topics=VALID_PUBSUB_TOPICS, + ) + + +@pytest.fixture(scope="session", autouse=True) +def configure_fleet_bootstrap(request, fleet_rln_state): + """Register ``FleetBootstrapConfig`` as ``WakuNode._pre_start_hook`` for the session.""" + + if not _fleet_bootstrap_enabled(request.config): + logger.info("Fleet bootstrap inactive – pass --fleet (or set FLEET_BOOTSTRAP=true) " "to connect local nodes to the waku.test fleet") + yield + return + + os.environ["FLEET_BOOTSTRAP"] = "true" + + from src.node.fleet_waku_node import FleetBootstrapConfig + from src.node.waku_node import WakuNode + + cfg = FleetBootstrapConfig(fleet_rln_state=fleet_rln_state) + WakuNode._pre_start_hook = cfg.prepare_start_kwargs + logger.info( + "Fleet bootstrap active – NODE1→%s NODE2→%s (additional nodes→%s) dns_discovery_url=%s", + FLEET_N1_MULTIADDR, + FLEET_N2_MULTIADDR, + FLEET_PRIMARY_MULTIADDR, + FLEET_DNS_DISCOVERY_URL, + ) + yield + WakuNode._pre_start_hook = None + + +@pytest.fixture(scope="function", autouse=True) +def skip_fleet_test_without_rln(request, fleet_rln_state): + """Skip tests marked @pytest.mark.waku_test_fleet when no RLN keystore is + available for the current session. + """ + if not _fleet_bootstrap_enabled(request.config): + return + if not request.node.get_closest_marker("waku_test_fleet"): + return + if not fleet_rln_state.get("keystore_prefixes"): + pytest.fail("Failing fleet tests: RLN keystore not available " "(RLN_CREDENTIALS not set or on-chain registration failed)") + + +@pytest.fixture(scope="session", autouse=True) +def configure_fleet_cluster(request, pubsub_cfg): + """Apply fleet cluster configuration to step classes when ``--fleet`` is active.""" + + if not _fleet_bootstrap_enabled(request.config): + yield + return + + from src.steps.relay import StepsRelay + from src.steps.filter import StepsFilter + from src.steps.light_push import StepsLightPush + from src.steps.store import StepsStore + from src.steps.rln import StepsRLN + import tests.relay.test_publish as _relay_publish_mod + + # Override step-class topic attributes with fleet cluster-1 topics. + StepsRelay.test_pubsub_topic = pubsub_cfg.relay_test_topic + StepsFilter.test_pubsub_topic = pubsub_cfg.filter_test_topic + StepsFilter.second_pubsub_topic = pubsub_cfg.filter_second_topic + StepsLightPush.test_pubsub_topic = pubsub_cfg.lightpush_test_topic + StepsStore.test_pubsub_topic = pubsub_cfg.store_test_topic + StepsRLN.test_pubsub_topic = pubsub_cfg.rln_test_topic + + # tests/relay/test_publish.py::test_publish_on_multiple_pubsub_topics iterates + # over the module-level VALID_PUBSUB_TOPICS import directly; rebind it. + _relay_publish_mod.VALID_PUBSUB_TOPICS = pubsub_cfg.all_topics + + def _fleet_setup_lightpush_node(self, image, node_index, **kwargs): + from src.node.waku_node import WakuNode + + node = WakuNode(image, f"lightpush_node{node_index}_{self.test_id}") + fleet_kwargs = dict(kwargs) + fleet_kwargs["relay"] = "false" + fleet_kwargs["lightpush"] = "false" + fleet_kwargs["skip_fleet_peering"] = True + fleet_kwargs.setdefault("cluster_id", FLEET_CLUSTER_ID) + fleet_kwargs.setdefault("shard", list(range(8))) + + lightpush_service_addr = self.multiaddr_list[0] + node.start(lightpushnode=lightpush_service_addr, **fleet_kwargs) + self.add_node_peer(node, self.multiaddr_list) + logger.debug( + "fleet _fleet_setup_lightpush_node: node %d started with relay=false, " "skip_fleet_peering=True, lightpushnode=%s", + node_index, + lightpush_service_addr, + ) + return node + + StepsLightPush.setup_lightpush_node = _fleet_setup_lightpush_node + StepsLightPush.default_message_propagation_delay = 0.5 + + logger.info( + "Fleet cluster config active – pubsub topics overridden to cluster-id=%s " + "(shards 0-7, e.g. relay_test_topic=%s rln_test_topic=%s); " + "StepsLightPush.setup_lightpush_node overridden to use receiving_node1 as " + "lightpush service (fleet-peered with RLN membership #1; messages relay " + "through fleet mesh to receiving_node2 peered with %s)", + FLEET_CLUSTER_ID, + pubsub_cfg.relay_test_topic, + pubsub_cfg.rln_test_topic, + FLEET_N2_MULTIADDR, + ) + yield + + # See https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures @pytest.hookimpl(hookwrapper=True, tryfirst=True) def pytest_runtest_makereport(item): diff --git a/tests/filter/test_get_messages.py b/tests/filter/test_get_messages.py index 4c86394be..89df2ff7e 100644 --- a/tests/filter/test_get_messages.py +++ b/tests/filter/test_get_messages.py @@ -2,7 +2,7 @@ import pytest from src.env_vars import NODE_1, NODE_2 from src.libs.common import delay, to_base64 from src.libs.custom_logger import get_custom_logger -from src.test_data import SAMPLE_INPUTS, SAMPLE_TIMESTAMPS +from src.test_data import SAMPLE_INPUTS, get_sample_timestamps from src.steps.filter import StepsFilter logger = get_custom_logger(__name__) @@ -12,6 +12,7 @@ logger = get_custom_logger(__name__) @pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node", "subscribe_main_nodes") class TestFilterGetMessages(StepsFilter): @pytest.mark.smoke + @pytest.mark.waku_test_fleet def test_filter_get_message_with_valid_payloads(self): failed_payloads = [] for payload in SAMPLE_INPUTS: @@ -24,9 +25,10 @@ class TestFilterGetMessages(StepsFilter): failed_payloads.append(payload["description"]) assert not failed_payloads, f"Payloads failed: {failed_payloads}" + @pytest.mark.waku_test_fleet def test_filter_get_message_with_valid_timestamps(self): failed_timestamps = [] - for timestamp in SAMPLE_TIMESTAMPS: + for timestamp in get_sample_timestamps(): if self.node1.type() in timestamp["valid_for"]: logger.debug(f'Running test with timestamp {timestamp["description"]}') message = self.create_message(timestamp=timestamp["value"]) @@ -37,12 +39,15 @@ class TestFilterGetMessages(StepsFilter): failed_timestamps.append(timestamp) assert not failed_timestamps, f"Timestamps failed: {failed_timestamps}" + @pytest.mark.waku_test_fleet def test_filter_get_message_with_version(self): self.check_published_message_reaches_filter_peer(self.create_message(version=10)) + @pytest.mark.waku_test_fleet def test_filter_get_message_with_meta(self): self.check_published_message_reaches_filter_peer(self.create_message(meta=to_base64(self.test_payload))) + @pytest.mark.waku_test_fleet def test_filter_get_message_with_ephemeral(self): failed_ephemeral = [] for ephemeral in [True, False]: @@ -54,6 +59,7 @@ class TestFilterGetMessages(StepsFilter): failed_ephemeral.append(ephemeral) assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" + @pytest.mark.waku_test_fleet def test_filter_get_message_with_extra_field(self): try: self.check_published_message_reaches_filter_peer(self.create_message(extraField="extraValue")) diff --git a/tests/filter/test_subscribe_create.py b/tests/filter/test_subscribe_create.py index 5a57b277d..4dcac4f2d 100644 --- a/tests/filter/test_subscribe_create.py +++ b/tests/filter/test_subscribe_create.py @@ -10,6 +10,7 @@ logger = get_custom_logger(__name__) @pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node") class TestFilterSubscribeCreate(StepsFilter): + @pytest.mark.waku_test_fleet def test_filter_subscribe_to_single_topics(self): self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) self.check_published_message_reaches_filter_peer() @@ -47,6 +48,7 @@ class TestFilterSubscribeCreate(StepsFilter): failed_pubsub_topics.append(pubsub_topic) assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}" + @pytest.mark.waku_test_fleet def test_filter_subscribe_to_100_content_topics_in_one_call(self): failed_content_topics = [] _100_content_topics = [str(i) for i in range(100)] @@ -98,6 +100,7 @@ class TestFilterSubscribeCreate(StepsFilter): except Exception as ex: assert "Bad Request" in str(ex) + @pytest.mark.waku_test_fleet def test_filter_subscribe_refresh(self): for _ in range(2): self.wait_for_subscriptions_on_main_nodes([self.test_content_topic]) @@ -159,6 +162,7 @@ class TestFilterSubscribeCreate(StepsFilter): except Exception as ex: assert "Bad Request" in str(ex) + @pytest.mark.waku_test_fleet def test_filter_subscribe_with_extra_field(self, subscribe_main_nodes): try: self.create_filter_subscription( diff --git a/tests/filter/test_subscribe_update.py b/tests/filter/test_subscribe_update.py index fed6aef9b..c5f48da8e 100644 --- a/tests/filter/test_subscribe_update.py +++ b/tests/filter/test_subscribe_update.py @@ -8,6 +8,7 @@ logger = get_custom_logger(__name__) @pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node") class TestFilterSubscribeUpdate(StepsFilter): + @pytest.mark.waku_test_fleet def test_filter_update_subscription_add_a_new_content_topic(self): self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic) self.update_filter_subscription({"requestId": "1", "contentFilters": [self.second_content_topic], "pubsubTopic": self.test_pubsub_topic}) @@ -39,6 +40,7 @@ class TestFilterSubscribeUpdate(StepsFilter): except Exception as ex: assert "Bad Request" in str(ex) + @pytest.mark.waku_test_fleet def test_filter_update_subscription_refresh_existing(self): self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], pubsub_topic=self.test_pubsub_topic) self.update_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) diff --git a/tests/filter/test_unsubscribe.py b/tests/filter/test_unsubscribe.py index ded2623e3..b4fc9ed93 100644 --- a/tests/filter/test_unsubscribe.py +++ b/tests/filter/test_unsubscribe.py @@ -5,11 +5,13 @@ from src.steps.filter import StepsFilter @pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node", "subscribe_main_nodes") class TestFilterUnSubscribe(StepsFilter): + @pytest.mark.waku_test_fleet def test_filter_unsubscribe_from_single_content_topic(self): self.check_published_message_reaches_filter_peer() self.delete_filter_subscription({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) self.check_publish_without_filter_subscription() + @pytest.mark.waku_test_fleet def test_filter_unsubscribe_from_all_subscribed_content_topics(self): content_topics = [input["value"] for input in SAMPLE_INPUTS[:2]] self.wait_for_subscriptions_on_main_nodes(content_topics) @@ -25,6 +27,7 @@ class TestFilterUnSubscribe(StepsFilter): self.check_published_message_reaches_filter_peer(self.create_message(contentTopic=content_topics[1])) self.check_publish_without_filter_subscription(self.create_message(contentTopic=content_topics[0])) + @pytest.mark.waku_test_fleet def test_filter_unsubscribe_from_pubsub_topics(self): self.wait_for_subscriptions_on_main_nodes([self.test_content_topic], self.test_pubsub_topic) self.wait_for_subscriptions_on_main_nodes([self.second_content_topic], self.second_pubsub_topic) diff --git a/tests/filter/test_unsubscribe_all.py b/tests/filter/test_unsubscribe_all.py index f80a10a33..c46b70960 100644 --- a/tests/filter/test_unsubscribe_all.py +++ b/tests/filter/test_unsubscribe_all.py @@ -6,6 +6,7 @@ from random import choice @pytest.mark.usefixtures("setup_main_relay_node", "setup_main_filter_node") class TestFilterUnSubscribeAll(StepsFilter): + @pytest.mark.waku_test_fleet def test_filter_unsubscribe_all_from_few_content_topics(self): content_topics = [input["value"] for input in SAMPLE_INPUTS[:5]] self.wait_for_subscriptions_on_main_nodes(content_topics) @@ -30,6 +31,7 @@ class TestFilterUnSubscribeAll(StepsFilter): self.check_publish_without_filter_subscription(self.create_message(contentTopic=choice(second_list))) self.check_publish_without_filter_subscription(self.create_message(contentTopic=choice(third_list))) + @pytest.mark.waku_test_fleet def test_filter_unsubscribe_all_from_multiple_pubsub_topics(self): for pubsub_topic in VALID_PUBSUB_TOPICS: content_topic = pubsub_topic diff --git a/tests/light_push/test_publish.py b/tests/light_push/test_publish.py index 1e39ac0bb..5e90b4edc 100644 --- a/tests/light_push/test_publish.py +++ b/tests/light_push/test_publish.py @@ -4,7 +4,14 @@ from src.libs.custom_logger import get_custom_logger from time import time from src.libs.common import delay, to_base64 from src.steps.light_push import StepsLightPush -from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, PUBSUB_TOPICS_WRONG_FORMAT, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS, VALID_PUBSUB_TOPICS +from src.test_data import ( + INVALID_CONTENT_TOPICS, + INVALID_PAYLOADS, + PUBSUB_TOPICS_WRONG_FORMAT, + SAMPLE_INPUTS, + VALID_PUBSUB_TOPICS, + get_sample_timestamps, +) logger = get_custom_logger(__name__) @@ -17,6 +24,7 @@ class TestLightPushPublish(StepsLightPush): self.setup_first_lightpush_node() self.subscribe_to_pubsub_topics_via_relay() + @pytest.mark.waku_test_fleet def test_light_push_with_valid_payloads(self): failed_payloads = [] for payload in SAMPLE_INPUTS: @@ -75,6 +83,7 @@ class TestLightPushPublish(StepsLightPush): except Exception as ex: assert "Message size exceeded maximum of 153600 bytes" in str(ex) + @pytest.mark.waku_test_fleet def test_light_push_with_valid_content_topics(self): failed_content_topics = [] for content_topic in SAMPLE_INPUTS: @@ -151,9 +160,10 @@ class TestLightPushPublish(StepsLightPush): except Exception as ex: assert "not_published_to_any_peer" in str(ex) or "timeout" in str(ex) + @pytest.mark.waku_test_fleet def test_light_push_with_valid_timestamps(self): failed_timestamps = [] - for timestamp in SAMPLE_TIMESTAMPS: + for timestamp in get_sample_timestamps(): if self.light_push_node1.type() in timestamp["valid_for"]: logger.debug(f'Running test with timestamp {timestamp["description"]}') message = self.create_message(timestamp=timestamp["value"]) @@ -166,7 +176,7 @@ class TestLightPushPublish(StepsLightPush): def test_light_push_with_invalid_timestamps(self): success_timestamps = [] - for timestamp in SAMPLE_TIMESTAMPS: + for timestamp in get_sample_timestamps(): if self.light_push_node1.type() not in timestamp["valid_for"]: logger.debug(f'Running test with timestamp {timestamp["description"]}') message = self.create_message(timestamp=timestamp["value"]) @@ -191,6 +201,7 @@ class TestLightPushPublish(StepsLightPush): except Exception as ex: assert "Bad Request" in str(ex) + @pytest.mark.waku_test_fleet def test_light_push_with_valid_meta(self): self.check_light_pushed_message_reaches_receiving_peer(message=self.create_message(meta=to_base64(self.test_payload))) @@ -208,6 +219,7 @@ class TestLightPushPublish(StepsLightPush): except Exception as ex: assert '(kind: InvalidLengthField, field: "meta")' in str(ex) or "invalid length for Meta field" in str(ex) + @pytest.mark.waku_test_fleet def test_light_push_with_ephemeral(self): failed_ephemeral = [] for ephemeral in [True, False]: @@ -219,6 +231,7 @@ class TestLightPushPublish(StepsLightPush): failed_ephemeral.append(ephemeral) assert not failed_ephemeral, f"Ephemeral that failed: {failed_ephemeral}" + @pytest.mark.waku_test_fleet def test_light_push_with_extra_field(self): try: self.check_light_pushed_message_reaches_receiving_peer(message=self.create_message(extraField="extraValue")) diff --git a/tests/relay/test_publish.py b/tests/relay/test_publish.py index 5e2879045..d61ea1dbd 100644 --- a/tests/relay/test_publish.py +++ b/tests/relay/test_publish.py @@ -4,7 +4,7 @@ from src.libs.custom_logger import get_custom_logger from time import time from src.libs.common import delay, to_base64 from src.steps.relay import StepsRelay -from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, SAMPLE_TIMESTAMPS, VALID_PUBSUB_TOPICS +from src.test_data import INVALID_CONTENT_TOPICS, INVALID_PAYLOADS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS, get_sample_timestamps from src.node.waku_message import WakuMessage logger = get_custom_logger(__name__) @@ -12,6 +12,7 @@ logger = get_custom_logger(__name__) @pytest.mark.usefixtures("setup_main_relay_nodes", "subscribe_main_relay_nodes", "relay_warm_up") class TestRelayPublish(StepsRelay): + @pytest.mark.waku_test_fleet def test_publish_with_valid_payloads(self): failed_payloads = [] for payload in SAMPLE_INPUTS: @@ -44,6 +45,7 @@ class TestRelayPublish(StepsRelay): except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + @pytest.mark.waku_test_fleet def test_publish_with_payload_less_than_150_kb(self): payload_length = 1024 * 100 # after encoding to base64 this will be close to 150KB logger.debug(f"Running test with payload length of {payload_length} bytes") @@ -60,6 +62,7 @@ class TestRelayPublish(StepsRelay): except Exception as ex: assert "couldn't find any messages" in str(ex) or "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + @pytest.mark.waku_test_fleet def test_publish_with_valid_content_topics(self): failed_content_topics = [] for content_topic in SAMPLE_INPUTS: @@ -92,6 +95,7 @@ class TestRelayPublish(StepsRelay): except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + @pytest.mark.waku_test_fleet def test_publish_on_multiple_pubsub_topics(self): self.ensure_relay_subscriptions_on_nodes(self.main_nodes, VALID_PUBSUB_TOPICS) failed_pubsub_topics = [] @@ -118,9 +122,10 @@ class TestRelayPublish(StepsRelay): except Exception as ex: assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + @pytest.mark.waku_test_fleet def test_publish_with_valid_timestamps(self): failed_timestamps = [] - for timestamp in SAMPLE_TIMESTAMPS: + for timestamp in get_sample_timestamps(): if self.node1.type() in timestamp["valid_for"]: logger.debug(f'Running test with timestamp {timestamp["description"]}') message = self.create_message(timestamp=timestamp["value"]) @@ -133,7 +138,7 @@ class TestRelayPublish(StepsRelay): def test_publish_with_invalid_timestamps(self): success_timestamps = [] - for timestamp in SAMPLE_TIMESTAMPS: + for timestamp in get_sample_timestamps(): if self.node1.type() not in timestamp["valid_for"]: logger.debug(f'Running test with timestamp {timestamp["description"]}') message = self.create_message(timestamp=timestamp["value"]) @@ -144,10 +149,12 @@ class TestRelayPublish(StepsRelay): pass assert not success_timestamps, f"Invalid Timestamps that didn't failed: {success_timestamps}" + @pytest.mark.waku_test_fleet def test_publish_with_no_timestamp(self): message = {"payload": to_base64(self.test_payload), "contentTopic": self.test_content_topic} self.check_published_message_reaches_relay_peer(message) + @pytest.mark.waku_test_fleet def test_publish_with_valid_version(self): self.check_published_message_reaches_relay_peer(self.create_message(version=10)) @@ -168,6 +175,7 @@ class TestRelayPublish(StepsRelay): except Exception as ex: assert "Bad Request" in str(ex) + @pytest.mark.waku_test_fleet def test_publish_with_ephemeral(self): failed_ephemeral = [] for ephemeral in [True, False]: diff --git a/tests/store/test_get_messages.py b/tests/store/test_get_messages.py index 1af7f85ab..70e7d715f 100644 --- a/tests/store/test_get_messages.py +++ b/tests/store/test_get_messages.py @@ -10,6 +10,7 @@ logger = get_custom_logger(__name__) @pytest.mark.usefixtures("node_setup") class TestGetMessages(StepsStore): + @pytest.mark.waku_test_fleet def test_get_store_messages_with_different_payloads(self): failed_payloads = [] for payload in SAMPLE_INPUTS: @@ -22,8 +23,19 @@ class TestGetMessages(StepsStore): logger.error(f'Payload {payload["description"]} failed: {str(e)}') failed_payloads.append(payload["description"]) assert not failed_payloads, f"Payloads failed: {failed_payloads}" - assert len(self.store_response.messages) == len(SAMPLE_INPUTS) + # Content-topic-scoped query to accommodate fleet tests + for node in self.store_nodes: + scoped = self.get_messages_from_store( + node, + page_size=len(SAMPLE_INPUTS) + 10, + content_topics=self.test_content_topic, + ascending="true", + ) + assert len(scoped.messages) == len(SAMPLE_INPUTS), ( + f"Expected {len(SAMPLE_INPUTS)} test messages on {node.image} " f"but found {len(scoped.messages)}" + ) + @pytest.mark.waku_test_fleet def test_get_store_messages_with_different_content_topics(self): failed_content_topics = [] for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: @@ -50,11 +62,13 @@ class TestGetMessages(StepsStore): failed_pubsub_topics.append(pubsub_topic) assert not failed_pubsub_topics, f"PubsubTopics failed: {failed_pubsub_topics}" + @pytest.mark.waku_test_fleet def test_get_store_message_with_meta(self): message = self.create_message(meta=to_base64(self.test_payload)) self.publish_message(message=message) self.check_published_message_is_stored(page_size=5, ascending="true") + @pytest.mark.waku_test_fleet def test_get_store_message_with_version(self): message = self.create_message(version=10) self.publish_message(message=message) @@ -68,6 +82,7 @@ class TestGetMessages(StepsStore): # only one message is stored assert len(self.store_response.messages) == 1 + @pytest.mark.waku_test_fleet def test_get_multiple_store_messages(self): message_hash_list = {"nwaku": []} for payload in SAMPLE_INPUTS: @@ -75,8 +90,12 @@ class TestGetMessages(StepsStore): self.publish_message(message=message) message_hash_list["nwaku"].append(self.compute_message_hash(self.test_pubsub_topic, message, hash_type="hex")) for node in self.store_nodes: - store_response = self.get_messages_from_store(node, page_size=50) - assert len(store_response.messages) == len(SAMPLE_INPUTS) + # Scope the store query to the test content topic so that background fleet + # messages archived on the same shard do not inflate the expected count. + store_response = self.get_messages_from_store(node, page_size=50, content_topics=self.test_content_topic) + assert len(store_response.messages) == len( + SAMPLE_INPUTS + ), f"Expected {len(SAMPLE_INPUTS)} messages but got {len(store_response.messages)}" for index in range(len(store_response.messages)): assert store_response.message_hash(index) == message_hash_list[node.type()][index], f"Message hash at index {index} doesn't match"