diff --git a/.github/workflows/test_PR_image.yml b/.github/workflows/test_PR_image.yml index d0401cf13..a2d23cb11 100644 --- a/.github/workflows/test_PR_image.yml +++ b/.github/workflows/test_PR_image.yml @@ -42,8 +42,8 @@ jobs: - uses: actions/checkout@v4 with: - repository: waku-org/waku-interop-tests - ref: SMOKE_TEST_0.0.1 + repository: logos-messaging/logos-messaging-interop-tests + ref: SMOKE_TEST_STABLE - uses: actions/setup-python@v4 with: diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index 87f094ede..2b276404a 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -32,18 +32,18 @@ env: RLN_CREDENTIALS: ${{ secrets.RLN_CREDENTIALS }} jobs: - tests: name: tests strategy: fail-fast: false matrix: - shard: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] - # total number of shards =18 means tests will split into 18 thread and run in parallel to increase execution speed - # command for sharding : - # pytest --shard-id= --num-shards= - # shard 16 for test_rln.py file as they shall run sequentially - # shard 17 for test_cursor_many_msgs.py as it takes time >7 mins + shard: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17] + # total number of shards =18 means tests will split into 18 thread and run in parallel to increase execution speed + # command for sharding : + # pytest --shard-id= --num-shards= + # shard 16 for test_rln.py file as they shall run sequentially + # shard 17 for test_cursor_many_msgs.py as it takes time >7 mins + runs-on: ubuntu-latest timeout-minutes: 120 outputs: @@ -65,50 +65,149 @@ jobs: jobResult_15: ${{ steps.set_result.outputs.JOB_RESULT_15 }} jobResult_16: ${{ steps.set_result.outputs.JOB_RESULT_16 }} jobResult_17: ${{ steps.set_result.outputs.JOB_RESULT_17 }} + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive - - uses: actions/checkout@v4 + - name: Remove unwanted software + uses: ./.github/actions/prune-vm - - name: Remove unwanted software - uses: 
./.github/actions/prune-vm + - uses: actions/setup-python@v4 + with: + python-version: "3.12" + cache: "pip" - - uses: actions/setup-python@v4 - with: - python-version: '3.12' - cache: 'pip' + - name: Add python bindings to PYTHONPATH + run: echo "PYTHONPATH=$(pwd)/vendor/logos-delivery-python-bindings/waku:$PYTHONPATH" >> $GITHUB_ENV - - run: pip install -r requirements.txt + - name: Install system deps for tc / nsenter + run: | + sudo apt-get update + sudo apt-get install -y \ + util-linux \ + iproute2 \ + sudo \ + ca-certificates \ + curl \ + make \ + gcc \ + g++ - - name: Run tests + - name: Install Nim 2.2.4 + run: | + set -euo pipefail + curl https://nim-lang.org/choosenim/init.sh -sSf | sh -s -- -y + echo "$HOME/.nimble/bin" >> "$GITHUB_PATH" + export PATH="$HOME/.nimble/bin:$PATH" + choosenim 2.2.4 + nim --version + nimble --version - run: | - if [ "${{ matrix.shard }}" == "16" ]; then - pytest tests/relay/test_rln.py --alluredir=allure-results-${{ matrix.shard }} - elif [ "${{ matrix.shard }}" == "17" ]; then - pytest tests/store/test_cursor_many_msgs.py --alluredir=allure-results-${{ matrix.shard }} - elif [ "${{ matrix.shard }}" != "17" ]; then - pytest --ignore=tests/relay/test_rln.py --ignore=tests/store/test_cursor_many_msgs.py --reruns 2 --shard-id=${{ matrix.shard }} --num-shards=16 --alluredir=allure-results-${{ matrix.shard }} - fi + - run: pip install -r requirements.txt - - name: Upload allure results - if: always() - uses: actions/upload-artifact@v4 - with: - name: allure-results-${{ matrix.shard }} - path: allure-results-${{ matrix.shard }} + - name: Build liblogosdelivery.so for python bindings + run: | + set -euo pipefail - - name: Set job result - id: set_result - if: always() - run: | - version="${{ matrix.shard }}" - echo "JOB_RESULT_${version}=${{ job.status }}" >> "$GITHUB_OUTPUT" + export PATH="$HOME/.nimble/bin:$PATH" + BINDINGS_DIR="$(pwd)/vendor/logos-delivery-python-bindings" + DELIVERY_DIR="$BINDINGS_DIR/vendor/logos-delivery" + + 
mkdir -p "$BINDINGS_DIR/lib" + + cd "$DELIVERY_DIR" + + ln -sf waku.nimble waku.nims + + # install Nim deps + nimble install -y + + # do the real setup, do not fake .nimble-setup + make setup + + # now build the shared library + make liblogosdelivery + + SO_PATH="$(find . -type f -name 'liblogosdelivery.so' | head -n 1)" + + if [ -z "$SO_PATH" ]; then + echo "liblogosdelivery.so was not built" + exit 1 + fi + + cp "$SO_PATH" "$BINDINGS_DIR/lib/liblogosdelivery.so" + + echo "Built library:" + ls -l "$BINDINGS_DIR/lib/liblogosdelivery.so" + + - name: Verify wrapper library + run: | + test -f vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so + + - name: Debug Python import paths + run: | + pwd + echo "PYTHONPATH=$PYTHONPATH" + find . -maxdepth 5 | grep wrapper || true + python - <<'PY' + import sys + print("sys.path:") + for p in sys.path: + print(p) + try: + import wrapper + print("wrapper import OK:", wrapper) + except Exception as e: + print("wrapper import failed:", e) + raise + PY + + - name: Run tests + run: | + export PATH="$HOME/.nimble/bin:$PATH" + export PYTHONPATH="$(pwd)/vendor/logos-delivery-python-bindings/waku:$PYTHONPATH" + + if [ "${{ matrix.shard }}" == "16" ]; then + pytest tests/relay/test_rln.py \ + --ignore=vendor/logos-delivery-python-bindings/tests \ + --alluredir=allure-results-${{ matrix.shard }} + elif [ "${{ matrix.shard }}" == "17" ]; then + pytest tests/store/test_cursor_many_msgs.py \ + --ignore=vendor/logos-delivery-python-bindings/tests \ + --alluredir=allure-results-${{ matrix.shard }} + else + pytest \ + --ignore=vendor/logos-delivery-python-bindings/tests \ + --ignore=tests/relay/test_rln.py \ + --ignore=tests/store/test_cursor_many_msgs.py \ + --reruns 2 \ + --shard-id=${{ matrix.shard }} \ + --num-shards=16 \ + --alluredir=allure-results-${{ matrix.shard }} + fi + + - name: Upload allure results + if: always() + uses: actions/upload-artifact@v4 + with: + name: allure-results-${{ matrix.shard }} + path: 
allure-results-${{ matrix.shard }} + + - name: Set job result + id: set_result + if: always() + run: | + version="${{ matrix.shard }}" + echo "JOB_RESULT_${version}=${{ job.status }}" >> "$GITHUB_OUTPUT" aggregate-reports: runs-on: ubuntu-latest needs: [tests] if: always() + steps: - name: Download all allure results uses: actions/download-artifact@v4 @@ -116,12 +215,34 @@ jobs: path: all-results merge-multiple: true - - name: Get allure history - if: always() + - name: Check out gh-pages history if it exists + id: checkout_gh_pages + continue-on-error: true uses: actions/checkout@v4 with: ref: gh-pages path: gh-pages + fetch-depth: 1 + + - name: Prepare empty gh-pages fallback + if: steps.checkout_gh_pages.outcome != 'success' + run: | + mkdir -p gh-pages/${{ env.CALLER }}/last-history + + - name: Remove oversized Allure text attachments + if: always() + run: | + set -euo pipefail + + echo "Before cleanup:" + find all-results -type f | wc -l || true + du -sh all-results || true + + find all-results -type f -name "*.txt" -delete || true + + echo "After cleanup:" + find all-results -type f | wc -l || true + du -sh all-results || true - name: Setup allure report uses: simple-elf/allure-report-action@master @@ -131,66 +252,106 @@ jobs: allure_results: all-results gh_pages: gh-pages/${{ env.CALLER }} allure_history: allure-history - keep_reports: 30 - report_url: https://waku-org.github.io/waku-interop-tests/${{ env.CALLER }} + keep_reports: 10 + report_url: https://logos-messaging.github.io/logos-delivery-interop-tests/${{ env.CALLER }} + + - name: Fix permissions (CRITICAL) + if: always() + run: | + echo "Fixing permissions..." + sudo chown -R $USER:$USER allure-history || true + sudo chmod -R u+rwX allure-history || true + + - name: Clean allure-history + if: always() + run: | + echo "Cleaning allure-history..." 
+ + # Remove ALL txt attachments from history + find allure-history -type f -name "*.txt" -delete || true + + echo "After cleanup:" + du -sh allure-history || true + + - name: Verify generated report content + if: always() + run: | + set -euo pipefail + + test -d allure-history + test -d "allure-history/${{ github.run_number }}" + test -f "allure-history/${{ github.run_number }}/index.html" + test -d "allure-history/last-history" + + echo "=== allure-history size ===" + du -sh allure-history || true + + echo "=== largest files ===" + find allure-history -type f -printf '%s %p\n' | sort -nr | head -50 || true + + echo "=== symlinks ===" + find allure-history -type l -ls || true + + echo "=== hard-linked files ===" + find allure-history -type f -links +1 -ls || true + + echo "=== top-level tree ===" + find allure-history -maxdepth 2 -type f | sort | head -100 || true - name: Deploy report to Github Pages uses: peaceiris/actions-gh-pages@v3 - if: always() with: - github_token: ${{ secrets.GITHUB_TOKEN }} - publish_branch: gh-pages - publish_dir: allure-history - destination_dir: ${{ env.CALLER }} + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_branch: gh-pages + publish_dir: ./allure-history + destination_dir: ${{ env.CALLER }} + keep_files: false + enable_jekyll: false - name: Store output from matrix jobs run: | - echo '${{ toJSON(needs.tests.outputs) }}' > results.json + echo '${{ toJSON(needs.tests.outputs) }}' > results.json - name: Create job summary if: always() run: | - echo "## Run Information" >> $GITHUB_STEP_SUMMARY - echo "- **Event**: ${{ github.event_name }}" >> $GITHUB_STEP_SUMMARY - echo "- **Actor**: ${{ github.actor }}" >> $GITHUB_STEP_SUMMARY - echo "- **Node1**: ${{ env.NODE_1 }}" >> $GITHUB_STEP_SUMMARY - echo "- **Node2**: ${{ env.NODE_2 }}" >> $GITHUB_STEP_SUMMARY - echo "- **Additonal Nodes**: ${{ env.ADDITIONAL_NODES }}" >> $GITHUB_STEP_SUMMARY - echo "## Test Results" >> $GITHUB_STEP_SUMMARY - echo "Allure report will be available at: 
https://waku-org.github.io/waku-interop-tests/${{ env.CALLER }}/${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY - - # Evaluate overall result - TESTS_RESULT="success" - for key in $(jq -r 'keys[]' results.json); do - result=$(jq -r --arg key "$key" '.[$key]' results.json) - echo "Key: $key, Value: $result" - # Check condition on the result - if [ "$result" != "success" ]; then - echo "Value 'success' not found at key: $key" - TESTS_RESULT="failure" - break - fi - done - - # Notify Waku team - if [ "$TESTS_RESULT" != "success" ]; then - echo "There are failures with nwaku node. cc <@&1111608257824440330>" >> $GITHUB_STEP_SUMMARY + echo "## Run Information" >> $GITHUB_STEP_SUMMARY + echo "- **Event**: ${{ github.event_name }}" >> $GITHUB_STEP_SUMMARY + echo "- **Actor**: ${{ github.actor }}" >> $GITHUB_STEP_SUMMARY + echo "- **Node1**: ${{ env.NODE_1 }}" >> $GITHUB_STEP_SUMMARY + echo "- **Node2**: ${{ env.NODE_2 }}" >> $GITHUB_STEP_SUMMARY + echo "- **Additonal Nodes**: ${{ env.ADDITIONAL_NODES }}" >> $GITHUB_STEP_SUMMARY + echo "## Test Results" >> $GITHUB_STEP_SUMMARY + echo "Allure report will be available at: https://logos-messaging.github.io/logos-delivery-interop-tests/${{ env.CALLER }}/${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY + + TESTS_RESULT="success" + for key in $(jq -r 'keys[]' results.json); do + result=$(jq -r --arg key "$key" '.[$key]' results.json) + echo "Key: $key, Value: $result" + if [ "$result" != "success" ]; then + echo "Value 'success' not found at key: $key" + TESTS_RESULT="failure" + break fi - - # Write result and summary to ENV - echo "TESTS_RESULT=$TESTS_RESULT" >> $GITHUB_ENV - { - echo 'JOB_SUMMARY<<EOF' - cat $GITHUB_STEP_SUMMARY - echo EOF - } >> $GITHUB_ENV + done + + if [ "$TESTS_RESULT" != "success" ]; then + echo "There are failures with nwaku node. 
cc <@&1111608257824440330>" >> $GITHUB_STEP_SUMMARY + fi + + echo "TESTS_RESULT=$TESTS_RESULT" >> $GITHUB_ENV + { + echo 'JOB_SUMMARY<<EOF' + cat $GITHUB_STEP_SUMMARY + echo EOF + } >> $GITHUB_ENV - name: Send report to Discord uses: rjstone/discord-webhook-notify@v1 if: always() && env.CALLER != 'manual' with: - severity: ${{ env.TESTS_RESULT == 'success' && 'info' || 'error' }} - username: ${{ github.workflow }} - description: "## Job Result: ${{ env.TESTS_RESULT }}" - details: ${{ env.JOB_SUMMARY }} - webhookUrl: ${{ secrets.DISCORD_TEST_REPORTS_WH }} \ No newline at end of file + severity: ${{ env.TESTS_RESULT == 'success' && 'info' || 'error' }} + username: ${{ github.workflow }} + description: "## Job Result: ${{ env.TESTS_RESULT }}" + details: ${{ env.JOB_SUMMARY }} + webhookUrl: ${{ secrets.DISCORD_TEST_REPORTS_WH }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 16267b58b..e64e42ff9 100644 --- a/.gitignore +++ b/.gitignore @@ -104,3 +104,5 @@ dmypy.json # Pyre type checker .pyre/ + +third_party/logos-delivery-python-bindings diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..e1e45502f --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "vendor/logos-delivery-python-bindings"] + path = vendor/logos-delivery-python-bindings + url = https://github.com/logos-messaging/logos-delivery-python-bindings.git + diff --git a/LICENSE-APACHE-v2 b/LICENSE-APACHE similarity index 99% rename from LICENSE-APACHE-v2 rename to LICENSE-APACHE index 318c8b106..d64569567 100644 --- a/LICENSE-APACHE-v2 +++ b/LICENSE-APACHE @@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2018 Status Research & Development GmbH + Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
diff --git a/LICENSE-MIT b/LICENSE-MIT index 68faab7bf..d4c697062 100644 --- a/LICENSE-MIT +++ b/LICENSE-MIT @@ -1,21 +1,21 @@ The MIT License (MIT) -Copyright (c) 2021 Status Research & Development GmbH +Copyright © 2025-2026 Logos Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal +of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index 592a229ec..d731ab6a1 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,13 @@ -# waku-interop-tests +# logos-messaging-interop-tests -Waku end‑to‑end (e2e) interoperability test framework for the [Waku v2 protocol](https://rfc.vac.dev/spec/10/). 
It exercises multiple clients (nwaku, js‑waku, go‑waku…) in realistic network topologies and reports results via Allure. +Logos Messaging end‑to‑end (e2e) interoperability test framework for the [Waku v2 protocol](https://rfc.vac.dev/spec/10/). It exercises multiple clients (logos-messaging-nim, js‑waku, go‑waku…) in realistic network topologies and reports results via Allure. ## Setup & contribution ```bash -git clone git@github.com:waku-org/waku-interop-tests.git -cd waku-interop-tests +# Use a shallow clone since the repo has a large history +git clone --depth=1 git@github.com:logos-messaging/logos-messaging-interop-tests.git +cd logos-messaging-interop-tests # create and activate a virtual environment python -m venv .venv @@ -47,15 +48,15 @@ Every day the workflow **nim\_waku\_daily.yml** triggers against the image `waku To launch it manually: -1. Open [https://github.com/waku-org/waku-interop-tests/actions/workflows/nim\_waku\_daily.yml](https://github.com/waku-org/waku-interop-tests/actions/workflows/nim_waku_daily.yml). +1. Open [https://github.com/logos-messaging/logos-messaging-interop-tests/actions/workflows/nim\_waku\_daily.yml](https://github.com/logos-messaging/logos-messaging-interop-tests/actions/workflows/nim_waku_daily.yml). 2. Click **► Run workflow**. 3. Pick the branch you want to test (defaults to `master`) and press **Run workflow**. -### On‑demand matrix against custom *nwaku* versions +### On‑demand matrix against custom *logos-messaging-nim* versions Use **interop\_tests.yml** when you need to test a PR or a historical image: -1. Open [https://github.com/waku-org/waku-interop-tests/actions/workflows/interop\_tests.yml](https://github.com/waku-org/waku-interop-tests/actions/workflows/interop_tests.yml). +1. Open [https://github.com/logos-messaging/logos-messaging-interop-tests/actions/workflows/interop\_tests.yml](https://github.com/logos-messaging/logos-messaging-interop-tests/actions/workflows/interop_tests.yml). 2. 
Press **► Run workflow** and choose the branch. 3. In the *workflow inputs* field set the `nwaku_image` you want, e.g. `wakuorg/nwaku:v0.32.0`. @@ -64,21 +65,11 @@ Use **interop\_tests.yml** when you need to test a PR or a historical image: * When the job finishes GitHub will display an **Allure Report** link in the run summary. * The bot also posts the same link in the **Waku / test‑reports** Discord channel. -### Updating the CI job used from *nwaku* +### Updating the CI job used from *logos-messaging-nim* -In the **nwaku** repository itself the file `.github/workflows/test_PR_image.yml` pins the interop test version. -To update it: +In the **logos-messaging-nim** repository itself the file `.github/workflows/test_PR_image.yml` pins the interop test version to `SMOKE_TEST_STABLE`. -1. Tag the desired commit in `waku-interop-tests` and push the tag - -```bash -git tag vX.Y.Z -git push origin vX.Y.Z -``` - -2. Edit `test_PR_image.yml` in **nwaku** and set `ref: vX.Y.Z` for the `tests` job. - -![CI job location](https://github.com/user-attachments/assets/dd3f95bd-fe79-475b-92b7-891d82346382) +To update it, move the `SMOKE_TEST_STABLE` tag to point to the desired commit in `logos-messaging-interop-tests`. 
## License diff --git a/pytest.ini b/pytest.ini index 288706a99..3a13b3b9c 100644 --- a/pytest.ini +++ b/pytest.ini @@ -2,9 +2,13 @@ addopts = --instafail --tb=short --color=auto log_level = DEBUG log_cli = True +norecursedirs = + vendor + nimbledeps + *.egg-info log_file = log/test.log log_cli_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s log_file_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s timeout = 300 markers = - smoke: marks tests as smoke test (deselect with '-m "not smoke"') \ No newline at end of file + smoke: marks tests as smoke test (deselect with '-m "not smoke"') diff --git a/requirements.txt b/requirements.txt index 2d62863fa..53c1e36af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -40,3 +40,5 @@ typing_extensions==4.9.0 urllib3==2.2.2 virtualenv==20.25.0 pytest-shard==0.1.2 +result==0.17.0 +cffi \ No newline at end of file diff --git a/src/env_vars.py b/src/env_vars.py index 1fc8f47a7..32e96a7e1 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -16,6 +16,7 @@ def get_env_var(var_name, default=None): # Configuration constants. 
Need to be upercase to appear in reports DEFAULT_NWAKU = "wakuorg/nwaku:latest" STRESS_ENABLED = False +USE_WRAPPERS = True NODE_1 = get_env_var("NODE_1", DEFAULT_NWAKU) NODE_2 = get_env_var("NODE_2", DEFAULT_NWAKU) ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NWAKU},{DEFAULT_NWAKU},{DEFAULT_NWAKU}") @@ -32,4 +33,4 @@ PG_USER = get_env_var("POSTGRES_USER", "postgres") PG_PASS = get_env_var("POSTGRES_PASSWORD", "test123") # example for .env file -# RLN_CREDENTIALS = {"rln-relay-cred-password": "password", "rln-relay-eth-client-address": "wss://sepolia.infura.io/ws/v3/api_key", "rln-relay-eth-contract-address": "0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4", "rln-relay-eth-private-key-1": "1111111111111111111111111111111111111111111111111111111111111111", "rln-relay-eth-private-key-2": "1111111111111111111111111111111111111111111111111111111111111111"} +# RLN_CREDENTIALS = {"rln-relay-cred-password": "password", "rln-relay-eth-client-address": "https://rpc.sepolia.linea.build", "rln-relay-eth-contract-address": "0xB9cd878C90E49F797B4431fBF4fb333108CB90e6", "rln-relay-eth-private-key-1": "", "rln-relay-eth-private-key-2": "", "rln-relay-eth-private-key-3": "", "rln-relay-eth-private-key-4": "", "rln-relay-eth-private-key-5": ""} diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 9a8d2e35b..f2a5356fc 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -5,16 +5,19 @@ import random import re import shutil import string +import subprocess import pytest import requests from src.libs.common import delay from src.libs.custom_logger import get_custom_logger -from tenacity import retry, stop_after_delay, wait_fixed +from tenacity import retry, stop_after_delay, wait_fixed, sleep +from docker.errors import NotFound as DockerNotFound from src.node.api_clients.rest import REST from src.node.docker_mananger import DockerManager from src.env_vars import DOCKER_LOG_DIR from src.data_storage import DS from src.test_data import 
DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS +from src.node.wrappers_manager import WrapperManager logger = get_custom_logger(__name__) @@ -37,8 +40,24 @@ def sanitize_docker_flags(input_flags): @retry(stop=stop_after_delay(180), wait=wait_fixed(0.5), reraise=True) -def rln_credential_store_ready(creds_file_path, single_check=False): +def rln_credential_store_ready(creds_file_path, single_check=False, require_credentials=False): if os.path.exists(creds_file_path): + subprocess.run(["sudo", "-n", "chmod", "a+r", creds_file_path], check=False) + if require_credentials: + try: + with open(creds_file_path, "r", encoding="utf-8") as creds_file: + keystore_data = json.load(creds_file) + except (OSError, json.JSONDecodeError) as ex: + if single_check: + return False + raise ValueError(f"Failed to parse RLN keystore at {creds_file_path}: {ex}") + + credentials = keystore_data.get("credentials", {}) if isinstance(keystore_data, dict) else {} + if not credentials: + if single_check: + return False + raise ValueError(f"RLN keystore exists but has no credentials yet: {creds_file_path}") + return True elif not single_check: raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), creds_file_path) @@ -82,12 +101,27 @@ class WakuNode: self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") self._docker_manager = DockerManager(self._image_name) self._container = None + self.rln_membership_index = None self.start_args = {} + self._wrapper_node = None + self._rln_creds_set = False logger.debug(f"WakuNode instance initialized with log path {self._log_path}") + @property + def _is_wrapper(self) -> bool: + return self._wrapper_node is not None + @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) - def start(self, wait_for_node_sec=20, **kwargs): + def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs): logger.debug("Starting Node...") + default_args, remove_container = 
self._prepare_start_context(**kwargs) + + if use_wrapper: + self._start_wrapper(default_args, wait_for_node_sec) + else: + self._start_docker(default_args, remove_container, wait_for_node_sec) + + def _prepare_start_context(self, **kwargs): self._docker_manager.create_network() self._ext_ip = self._docker_manager.generate_random_ext_ip() self._ports = self._docker_manager.generate_ports() @@ -164,19 +198,24 @@ class WakuNode: del default_args["pubsub-topic"] rln_args, rln_creds_set, keystore_path = self.parse_rln_credentials(default_args, False) + self._rln_creds_set = rln_creds_set default_args.pop("rln-creds-id", None) default_args.pop("rln-creds-source", None) default_args.pop("rln-keystore-prefix", None) if rln_creds_set: - rln_credential_store_ready(keystore_path) + rln_credential_store_ready(keystore_path, require_credentials=True) default_args.update(rln_args) else: logger.info(f"RLN credentials not set or credential store not available, starting without RLN") - logger.debug(f"Using volumes {self._volumes}") self.start_args = dict(default_args) + return default_args, remove_container + + def _start_docker(self, default_args, remove_container, wait_for_node_sec): + logger.debug(f"Using volumes {self._volumes}") + self._container = self._docker_manager.start_container( self._docker_manager.image, ports=self._ports, @@ -186,16 +225,61 @@ class WakuNode: volumes=self._volumes, remove_container=remove_container, ) - logger.debug(f"Started container from image {self._image_name}. 
REST: {self._rest_port}") DS.waku_nodes.append(self) - delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly + delay(1) + try: + self.ensure_ready(timeout_duration=wait_for_node_sec, rln_required=self._rln_creds_set) + except Exception as ex: + logger.error(f"REST service did not become ready in time: {ex}") + raise + + def _start_wrapper(self, default_args, wait_for_node_sec): + logger.debug("Starting node using wrappers") + wrapper_config = self._default_args_to_wrapper_config(default_args) + + result = WrapperManager.create_and_start(config=wrapper_config, timeout_s=wait_for_node_sec) + if result.is_err(): + raise RuntimeError(f"Failed to start wrapper node: {result.err()}") + self._wrapper_node = result.ok_value + + logger.debug(f"Started wrapper node. REST: {self._rest_port}") + DS.waku_nodes.append(self) + delay(1) try: self.ensure_ready(timeout_duration=wait_for_node_sec) except Exception as ex: logger.error(f"REST service did not become ready in time: {ex}") raise + def _default_args_to_wrapper_config(self, default_args): + def _bool(key, default="false"): + return default_args.get(key, default).lower() == "true" + + bootstrap = default_args.get("discv5-bootstrap-node") + + return { + "logLevel": default_args.get("log-level", "DEBUG"), + "mode": "Core", + "networkingConfig": { + "listenIpv4": default_args.get("listen-address", "0.0.0.0"), + "p2pTcpPort": int(default_args["tcp-port"]), + "discv5UdpPort": int(default_args["discv5-udp-port"]), + "restPort": int(default_args["rest-port"]), + "restAddress": default_args.get("rest-address", "0.0.0.0"), + }, + "protocolsConfig": { + "clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)), + "relay": _bool("relay"), + "store": _bool("store"), + "filter": _bool("filter"), + "lightpush": _bool("lightpush"), + "peerExchange": _bool("peer-exchange"), + "discv5Discovery": _bool("discv5-discovery", "true"), + "discv5BootstrapNodes": [bootstrap] if bootstrap 
else [], + }, + } + @retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True) def register_rln(self, **kwargs): logger.debug("Registering RLN credentials...") @@ -221,18 +305,48 @@ class WakuNode: logger.debug(f"Waiting for keystore {keystore_path}") try: - rln_credential_store_ready(keystore_path) + rln_credential_store_ready(keystore_path, require_credentials=True) + self.rln_membership_index = str(self.get_rln_membership_index_from_log()) + logger.debug(f"Detected RLN membership index from registration logs: {self.rln_membership_index}") + self.stop() except Exception as ex: logger.error(f"File {keystore_path} with RLN credentials did not become available in time {ex}") raise else: logger.warn("RLN credentials not set, no action performed") + return self.rln_membership_index + + @retry(stop=stop_after_delay(10), wait=wait_fixed(0.2), reraise=True) + def get_rln_membership_index_from_log(self): + if not os.path.exists(self._log_path): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self._log_path) + + with open(self._log_path, "r", encoding="utf-8", errors="ignore") as log_file: + log_data = log_file.read() + + matches = re.findall(r"membershipIndex=(\d+)", log_data) + if not matches: + raise ValueError("Could not infer RLN membership index from registration logs") + + return int(matches[-1]) + @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def stop(self): + if self._is_wrapper: + self._stop_wrapper() + else: + self._stop_docker() + + def _stop_docker(self): if self._container: logger.debug(f"Stopping container with id {self._container.short_id}") - self._container.stop() + try: + self._container.stop() + except DockerNotFound: + logger.debug(f"Container {self._container.short_id} already exited and removed, treating as stopped.") + self._container = None + return try: self._container.remove() except: @@ -240,6 +354,14 @@ class WakuNode: self._container = None logger.debug("Container stopped.") + def 
_stop_wrapper(self): + logger.debug("Stopping wrapper node") + result = self._wrapper_node.stop_and_destroy() + if result.is_err(): + logger.error(f"Failed to stop wrapper node: {result.err()}") + self._wrapper_node = None + logger.debug("Wrapper node stopped and destroyed.") + @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def kill(self): if self._container: @@ -267,7 +389,7 @@ class WakuNode: logger.debug(f"Unpause container with id {self._container.short_id}") self._container.unpause() - def ensure_ready(self, timeout_duration=10): + def ensure_ready(self, timeout_duration=10, rln_required=False): @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True) def check_healthy(node=self): self.health_response = node.health() @@ -280,9 +402,12 @@ class WakuNode: if self.health_response.get("nodeHealth") != "READY": raise AssertionError("Waiting for the node health status: READY") - # for p in self.health_response.get("protocolsHealth"): - # if p.get("Rln Relay") != "READY": - # raise AssertionError("Waiting for the Rln relay status: READY") + for p in self.health_response.get("protocolsHealth"): + if rln_required and "Rln Relay" in p: + if p["Rln Relay"] != "READY": + raise AssertionError("Waiting for the Rln relay status: READY") + # TODO: Remove once Rln Relay reflects true RLN status + sleep(20) logger.info("Node protocols are initialized !!") @@ -320,6 +445,33 @@ class WakuNode: def get_tcp_address(self): return f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}" + def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): + if self._is_wrapper: + result = self._wrapper_node.subscribe_content_topic(content_topic, timeout_s=timeout_s) + if result.is_err(): + raise RuntimeError(f"subscribe_content_topic failed: {result.err()}") + return result.ok_value + else: + return self._api.set_relay_auto_subscriptions([content_topic]) + + def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float 
= 20.0): + if self._is_wrapper: + result = self._wrapper_node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s) + if result.is_err(): + raise RuntimeError(f"unsubscribe_content_topic failed: {result.err()}") + return result.ok_value + else: + return self._api.delete_relay_auto_subscriptions([content_topic]) + + def send_message(self, message: dict, *, timeout_s: float = 20.0): + if self._is_wrapper: + result = self._wrapper_node.send_message(message, timeout_s=timeout_s) + if result.is_err(): + raise RuntimeError(f"send_message failed: {result.err()}") + return result.ok_value + else: + return self._api.send_relay_auto_message(message) + def info(self): return self._api.info() @@ -428,6 +580,21 @@ class WakuNode: def is_nwaku(self): return "nwaku" in self.image + def prepare_rln_storage_paths(self, cwd, keystore_prefix, selected_id, reset_existing=False): + keystore_dir = os.path.join(cwd, f"keystore_{keystore_prefix}_{selected_id}") + rln_tree_dir = os.path.join(cwd, f"rln_tree_{keystore_prefix}_{selected_id}") + + if reset_existing: + for path, path_name in [(keystore_dir, "keystore"), (rln_tree_dir, "rln tree")]: + if os.path.exists(path): + logger.warning(f"Resetting existing RLN {path_name} directory before registration: {path}") + shutil.rmtree(path, ignore_errors=True) + + os.makedirs(keystore_dir, exist_ok=True) + os.makedirs(rln_tree_dir, exist_ok=True) + + return keystore_dir, rln_tree_dir + def parse_rln_credentials(self, default_args, is_registration): rln_args = {} keystore_path = None @@ -440,6 +607,13 @@ class WakuNode: return rln_args, False, keystore_path imported_creds = json.loads(rln_creds_source) + rln_chain_id = imported_creds.get("rln-relay-chain-id") + if rln_chain_id is None: + eth_client_address = imported_creds.get("rln-relay-eth-client-address", "") + if "linea" in eth_client_address: + rln_chain_id = "59141" + elif "sepolia" in eth_client_address: + rln_chain_id = "11155111" if len(imported_creds) < 4 or any(value is None 
for value in imported_creds.values()): logger.warn(f"One or more of required RLN credentials were not set properly") @@ -448,6 +622,13 @@ class WakuNode: eth_private_key = select_private_key(imported_creds, selected_id) cwd = os.getcwd() + keystore_prefix = default_args.get("rln-keystore-prefix") + + if not keystore_prefix: + logger.warn("rln-keystore-prefix is missing, cannot mount RLN state and keystore") + return rln_args, False, keystore_path + + keystore_dir, rln_tree_dir = self.prepare_rln_storage_paths(cwd, keystore_prefix, selected_id, reset_existing=is_registration) if self.is_nwaku(): if is_registration: @@ -470,6 +651,8 @@ class WakuNode: { "rln-relay-cred-path": "/keystore/keystore.json", "rln-relay-cred-password": imported_creds["rln-relay-cred-password"], + "rln-relay-eth-client-address": imported_creds["rln-relay-eth-client-address"], + "rln-relay-eth-contract-address": imported_creds["rln-relay-eth-contract-address"], } ) else: @@ -483,12 +666,15 @@ class WakuNode: } ) - keystore_path = cwd + "/keystore_" + default_args["rln-keystore-prefix"] + "_" + selected_id + "/keystore.json" + if rln_chain_id is not None: + rln_args["rln-relay-chain-id"] = str(rln_chain_id) + + keystore_path = os.path.join(keystore_dir, "keystore.json") self._volumes.extend( [ - cwd + "/rln_tree_" + default_args["rln-keystore-prefix"] + "_" + selected_id + ":/etc/rln_tree", - cwd + "/keystore_" + default_args["rln-keystore-prefix"] + "_" + selected_id + ":/keystore", + f"{rln_tree_dir}:/etc/rln_tree", + f"{keystore_dir}:/keystore", ] ) @@ -577,3 +763,9 @@ class WakuNode: def get_peer_info(self, peer_id: str): return self._api.get_peer(peer_id) + + @property + def container_id(self) -> str: + if not self._container: + raise RuntimeError("Node container not started yet") + return self._container.id diff --git a/src/node/wrappers_manager.py b/src/node/wrappers_manager.py new file mode 100644 index 000000000..41845077d --- /dev/null +++ b/src/node/wrappers_manager.py @@ -0,0 +1,81 
class WrapperManager:
    """Thin manager around the ``NodeWrapper`` class from the python bindings.

    Instances are normally obtained through :meth:`create` or
    :meth:`create_and_start`, which re-wrap the bindings' result so the caller
    receives ``Ok(WrapperManager)`` or ``Err(<error>)``. Every other method
    forwards to the wrapped node and returns its result untouched.

    The manager is also a context manager: leaving the ``with`` block calls
    :meth:`stop_and_destroy`.
    """

    def __init__(self, node: _NodeWrapper):
        # The already-created bindings node that all calls are forwarded to.
        self._node = node

    @classmethod
    def _from_outcome(cls, outcome) -> Result["WrapperManager", str]:
        """Turn a bindings result holding a node into a result holding a manager."""
        return Err(outcome.err()) if outcome.is_err() else Ok(cls(outcome.ok_value))

    @classmethod
    def create(
        cls,
        config: dict,
        event_cb=None,
        *,
        timeout_s: float = 20.0,
    ) -> Result["WrapperManager", str]:
        """Create a node via ``NodeWrapper.create_node`` and wrap it on success."""
        return cls._from_outcome(_NodeWrapper.create_node(config, event_cb, timeout_s=timeout_s))

    @classmethod
    def create_and_start(
        cls,
        config: dict,
        event_cb=None,
        *,
        timeout_s: float = 20.0,
    ) -> Result["WrapperManager", str]:
        """Create and start a node via ``NodeWrapper.create_and_start`` and wrap it on success."""
        return cls._from_outcome(_NodeWrapper.create_and_start(config, event_cb, timeout_s=timeout_s))

    def __enter__(self) -> "WrapperManager":
        return self

    def __exit__(self, *_exc_info) -> None:
        # The result of the teardown is intentionally not inspected here.
        self.stop_and_destroy()

    # --- lifecycle -------------------------------------------------------

    def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
        """Forward to ``NodeWrapper.start_node``."""
        return self._node.start_node(timeout_s=timeout_s)

    def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
        """Forward to ``NodeWrapper.stop_node``."""
        return self._node.stop_node(timeout_s=timeout_s)

    def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
        """Forward to ``NodeWrapper.destroy``."""
        return self._node.destroy(timeout_s=timeout_s)

    def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
        """Forward to ``NodeWrapper.stop_and_destroy``."""
        return self._node.stop_and_destroy(timeout_s=timeout_s)

    # --- messaging -------------------------------------------------------

    def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
        """Forward to ``NodeWrapper.subscribe_content_topic``."""
        return self._node.subscribe_content_topic(content_topic, timeout_s=timeout_s)

    def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
        """Forward to ``NodeWrapper.unsubscribe_content_topic``."""
        return self._node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s)

    def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]:
        """Forward to ``NodeWrapper.send_message``."""
        return self._node.send_message(message, timeout_s=timeout_s)

    # --- introspection ---------------------------------------------------

    def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[list[str], str]:
        """Forward to ``NodeWrapper.get_available_node_info_ids``."""
        return self._node.get_available_node_info_ids(timeout_s=timeout_s)

    def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]:
        """Forward to ``NodeWrapper.get_node_info``."""
        return self._node.get_node_info(node_info_id, timeout_s=timeout_s)

    def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]:
        """Forward to ``NodeWrapper.get_available_configs``."""
        return self._node.get_available_configs(timeout_s=timeout_s)
class TrafficController:
    """Shape a node container's traffic with Linux ``tc`` qdiscs.

    Commands are run on the host as ``sudo -n nsenter -t <pid> -n tc ...``,
    where ``<pid>`` is the container's PID as reported by Docker. Every
    ``add_*`` method first deletes the root qdisc of the target interface, so
    a new rule replaces the previous one instead of stacking on it.
    """

    def _pid(self, node) -> int:
        """Return the PID Docker reports for the node's container.

        Raises:
            RuntimeError: if the node has no container or Docker reports no
                (or a zero) PID for it.
        """
        if not node.container:
            raise RuntimeError("Node container not started yet")

        # Refresh the cached attributes so "State.Pid" reflects the current state.
        node.container.reload()
        pid = node.container.attrs.get("State", {}).get("Pid")
        if not pid:  # covers both a missing value and 0
            raise RuntimeError("Container PID not available (container not running?)")
        return int(pid)

    def _exec(self, node, tc_args: list[str], iface: str = "eth0"):
        """Run ``tc <tc_args>`` in the container's network namespace and return stdout.

        ``iface`` is accepted so call sites read uniformly, but it is not used
        here: the interface name must already be part of ``tc_args``.

        Raises:
            RuntimeError: if the command exits non-zero; the message carries
                the command line, stdout and stderr.
        """
        pid = self._pid(node)

        cmd = ["sudo", "-n", "nsenter", "-t", str(pid), "-n", "tc"] + tc_args
        logger.info(f"TC exec: {cmd}")

        res = subprocess.run(cmd, capture_output=True, text=True)
        if res.returncode != 0:
            raise RuntimeError(f"TC failed: {' '.join(cmd)}\nstdout: {res.stdout}\nstderr: {res.stderr}")

        return res.stdout

    def _replace_root_qdisc(self, node, iface: str, spec: list[str]):
        """Delete the root qdisc on ``iface`` and install ``spec`` in its place."""
        self.clear(node, iface=iface)
        self._exec(node, ["qdisc", "add", "dev", iface, "root", *spec], iface=iface)

    def log_tc_stats(self, node, iface: str = "eth0"):
        """
        Log tc statistics for an interface (best-effort).
        Useful to confirm netem loss/delay counters (sent/dropped/etc.).
        """
        try:
            out = self._exec(node, ["-s", "qdisc", "show", "dev", iface], iface=iface)
            out = (out or "").strip()
            if out:
                logger.debug(f"tc -s qdisc show dev {iface}:\n{out}")
            else:
                logger.debug(f"tc -s qdisc show dev {iface}: (no output)")
        except Exception as e:
            # Statistics are diagnostic only; never fail a test because of them.
            logger.debug(f"Failed to read tc stats for {iface}: {e}")

    def clear(self, node, iface: str = "eth0"):
        """Delete the root qdisc on ``iface``; a missing qdisc is not an error."""
        try:
            self._exec(node, ["qdisc", "del", "dev", iface, "root"], iface=iface)
        except RuntimeError as e:
            msg = str(e)
            # These two tc messages are treated as "nothing to delete".
            if "Cannot delete qdisc with handle of zero" in msg or "No such file or directory" in msg:
                return
            raise

    def add_latency(self, node, ms: int, iface: str = "eth0"):
        """Install a netem ``delay`` of ``ms`` milliseconds on ``iface``."""
        self._replace_root_qdisc(node, iface, ["netem", "delay", f"{ms}ms"])

    def add_packet_loss(self, node, percent: float, iface: str = "eth0"):
        """Install netem ``loss percent%`` on ``iface`` and log the resulting qdisc stats."""
        self._replace_root_qdisc(node, iface, ["netem", "loss", f"{percent}%"])
        # Best-effort confirmation that the rule is in place.
        self.log_tc_stats(node, iface=iface)

    def add_bandwidth(self, node, rate: str, iface: str = "eth0"):
        """Install a ``tbf`` rate limit of ``rate`` (tc rate syntax) on ``iface``."""
        self._replace_root_qdisc(
            node,
            iface,
            ["tbf", "rate", rate, "burst", "32kbit", "limit", "12500"],
        )

    def add_packet_loss_correlated(
        self,
        node,
        percent: float,
        correlation: float,
        iface: str = "eth0",
    ):
        """Install netem ``loss percent% correlation%`` on ``iface``."""
        self._replace_root_qdisc(node, iface, ["netem", "loss", f"{percent}%", f"{correlation}%"])

    def add_packet_reordering(
        self,
        node,
        percent: int = 25,
        correlation: int = 50,
        delay_ms: int = 10,
        iface: str = "eth0",
    ):
        """Install netem ``delay delay_ms`` with ``reorder percent% correlation%`` on ``iface``."""
        self._replace_root_qdisc(
            node,
            iface,
            ["netem", "delay", f"{delay_ms}ms", "reorder", f"{percent}%", f"{correlation}%"],
        )

    def _p2p_iface(self, node) -> str:
        """
        Return the name of the container interface attached to the waku
        network (where libp2p traffic flows).

        DockerManager attaches each node to two networks: the default bridge
        (where host-published ports land, typically `eth0`) and the waku
        network (where inter-container libp2p/gossipsub traffic flows, typically
        `eth1`). tc on the default bridge only affects REST control plane; for
        a packet loss test targeting libp2p we need the waku interface.

        This helper resolves the correct interface by looking up the node's
        waku-network IP via Docker and matching it against `ip -o -4 addr`
        output from inside the container.

        Raises:
            RuntimeError: if the container is missing, is not attached to the
                waku network, `ip` fails, or no interface holds the waku IP.
        """
        if not node.container:
            raise RuntimeError("Node container not started yet")
        node.container.reload()
        networks = node.container.attrs.get("NetworkSettings", {}).get("Networks", {})
        waku_net = networks.get(NETWORK_NAME)
        if not waku_net or not waku_net.get("IPAddress"):
            raise RuntimeError(f"Container is not attached to the '{NETWORK_NAME}' docker network")
        waku_ip = waku_net["IPAddress"]

        exit_code, output = node.container.exec_run(["ip", "-o", "-4", "addr"])
        if exit_code != 0:
            raise RuntimeError(f"ip addr failed inside container: {output}")
        # One record per line; the second whitespace-separated field is the interface name.
        for line in output.decode(errors="replace").splitlines():
            if f" {waku_ip}/" in line:
                tokens = line.split()
                if len(tokens) >= 2:
                    return tokens[1]
        raise RuntimeError(f"No interface inside container holds waku IP {waku_ip}")

    def clear_p2p(self, node):
        """
        Remove any tc rule previously installed on the node's waku (libp2p)
        interface. Paired with add_packet_loss_p2p_only /
        add_packet_loss_correlated_p2p_only.
        """
        self.clear(node, iface=self._p2p_iface(node))

    def add_packet_loss_p2p_only(self, node, percent: float):
        """
        Apply uncorrelated packet loss to the waku (libp2p) network interface
        of a node. REST API traffic rides a separate docker interface and is
        not affected, so the test harness's control plane stays reliable.
        """
        iface = self._p2p_iface(node)
        self._replace_root_qdisc(node, iface, ["netem", "loss", f"{percent}%"])

    def add_packet_loss_correlated_p2p_only(self, node, percent: float, correlation: float):
        """
        Correlated packet loss on the waku (libp2p) network interface. See
        add_packet_loss_p2p_only for why REST stays unaffected.
        """
        iface = self._p2p_iface(node)
        self._replace_root_qdisc(node, iface, ["netem", "loss", f"{percent}%", f"{correlation}%"])
orig_prefixes + self.keystore_prefixes = orig_prefixes.get("keystore_prefixes", []) + self.rln_membership_indexes = orig_prefixes.get("rln_membership_indexes", []) - return self.keystore_prefixes + return { + "keystore_prefixes": self.keystore_prefixes, + "rln_membership_indexes": self.rln_membership_indexes, + } @allure.step def setup_main_rln_relay_nodes(self, **kwargs): @@ -60,7 +65,7 @@ class StepsRLN(StepsCommon): relay="true", rln_creds_source=RLN_CREDENTIALS, rln_creds_id="1", - rln_relay_membership_index="1", + rln_relay_membership_index=self.resolve_rln_membership_index(0, **kwargs), rln_keystore_prefix=self.keystore_prefixes[0], **kwargs, ) @@ -78,7 +83,7 @@ class StepsRLN(StepsCommon): discv5_bootstrap_node=self.enr_uri, rln_creds_source=RLN_CREDENTIALS, rln_creds_id="2", - rln_relay_membership_index="1", + rln_relay_membership_index=self.resolve_rln_membership_index(1, **kwargs), rln_keystore_prefix=self.keystore_prefixes[1], **kwargs, ) @@ -101,7 +106,7 @@ class StepsRLN(StepsCommon): discv5_bootstrap_node=self.enr_uri, rln_creds_source=RLN_CREDENTIALS, rln_creds_id=f"{index + 3}", - rln_relay_membership_index="1", + rln_relay_membership_index=self.resolve_rln_membership_index(index + 2, **kwargs), rln_keystore_prefix=self.keystore_prefixes[index + 2], **kwargs, ) @@ -118,7 +123,7 @@ class StepsRLN(StepsCommon): lightpushnode=self.multiaddr_list[0], rln_creds_source=RLN_CREDENTIALS, rln_creds_id="2", - rln_relay_membership_index="1", + rln_relay_membership_index=self.resolve_rln_membership_index(1, **kwargs), rln_keystore_prefix=self.keystore_prefixes[1], **kwargs, ) @@ -131,7 +136,23 @@ class StepsRLN(StepsCommon): def register_rln_single_node(self, prefix="", **kwargs): logger.debug("Registering RLN credentials for single node") self.node = WakuNode(DEFAULT_NWAKU, f"node_{gen_step_id()}") - self.node.register_rln(rln_keystore_prefix=prefix, rln_creds_source=kwargs["rln_creds_source"], rln_creds_id=kwargs["rln_creds_id"]) + return 
@allure.step
def resolve_rln_membership_index(self, index, **kwargs):
    """Pick the RLN membership index to start the node at ``index`` with.

    An explicit ``rln_relay_membership_index`` keyword argument wins. Otherwise
    the value stored at ``index`` in ``self.rln_membership_indexes`` is used.
    The result is always returned as a string.

    Raises:
        ValueError: if no explicit index was passed and no non-``None`` value
            is stored for ``index``.
    """
    override = kwargs.get("rln_relay_membership_index")
    if override is not None:
        return str(override)

    known = self.rln_membership_indexes
    stored = known[index] if len(known) > index else None
    if stored is None:
        raise ValueError(
            f"RLN membership index for position {index} is not available. "
            "Register credentials and persist rln_membership_indexes together with keystore_prefixes before node startup."
        )

    inferred_index = str(stored)
    logger.debug(f"Using inferred RLN membership index for position {index}: {inferred_index}")
    return inferred_index
- auto_cluster = 2 + auto_cluster = 199 num_shards_in_network = 8 @pytest.fixture(scope="function", autouse=True) @@ -195,7 +195,13 @@ class StepsSharding(StepsRelay): self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic, content_topic=content_topic) raise AssertionError("Retrieving messages on not subscribed content topic worked!!!") except Exception as ex: - assert "Not Found" in str(ex) + error_message = str(ex) + expected_errors = [ + "Not Found", + "NoPeersToPublish", + "Failed to publish: publish failed in relay: NoPeersToPublish", + ] + assert any(expected in error_message for expected in expected_errors), error_message @allure.step def check_publish_fails_on_not_subscribed_pubsub_topic(self, pubsub_topic): diff --git a/src/test_data.py b/src/test_data.py index 4a8befe57..659dd39ae 100644 --- a/src/test_data.py +++ b/src/test_data.py @@ -22,12 +22,12 @@ SAMPLE_INPUTS = [ {"description": "Email format", "value": "user@example.com"}, {"description": "URL format", "value": "http://example.com"}, {"description": "Date and time in ISO format", "value": "2023-11-01T12:00:00Z"}, - # {"description": "String with escaped quotes", "value": '"Escaped" \\"quotes\\"'}, # https://github.com/waku-org/nwaku/issues/3572 + {"description": "String with escaped quotes", "value": '"Escaped" \\"quotes\\"'}, {"description": "A regular expression", "value": "Regular expression: ^[a-z0-9_-]{3,16}$"}, {"description": "A very long string", "value": "x" * 1000}, {"description": "A JSON string", "value": '{"name": "John", "age": 30, "city": "New York"}'}, {"description": "A Unix path", "value": "/usr/local/bin"}, - # {"description": "A Windows path", "value": "C:\\Windows\\System32"}, # https://github.com/waku-org/nwaku/issues/3572 + {"description": "A Windows path", "value": "C:\\Windows\\System32"}, {"description": "An SQL query", "value": "SELECT * FROM users WHERE id = 1;"}, {"description": "JavaScript code snippet", "value": "function test() { 
console.log('Hello World'); }"}, {"description": "A CSS snippet", "value": "body { background-color: #fff; }"}, @@ -96,7 +96,7 @@ CONTENT_TOPICS_SHARD_7 = [ "/newsService/4.0/updates/yaml", ] -DEFAULT_CLUSTER_ID = "3" +DEFAULT_CLUSTER_ID = "198" VALID_PUBSUB_TOPICS = [ f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0", f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1", @@ -130,14 +130,14 @@ PUBSUB_TOPICS_DIFFERENT_CLUSTERS = [ ] PUBSUB_TOPICS_SAME_CLUSTER = [ - "/waku/2/rs/2/0", - "/waku/2/rs/2/1", - "/waku/2/rs/2/2", - "/waku/2/rs/2/3", - "/waku/2/rs/2/4", - "/waku/2/rs/2/5", - "/waku/2/rs/2/6", - "/waku/2/rs/2/7", + "/waku/2/rs/199/0", + "/waku/2/rs/199/1", + "/waku/2/rs/199/2", + "/waku/2/rs/199/3", + "/waku/2/rs/199/4", + "/waku/2/rs/199/5", + "/waku/2/rs/199/6", + "/waku/2/rs/199/7", ] PUBSUB_TOPICS_WRONG_FORMAT = [ @@ -168,7 +168,7 @@ SAMPLE_TIMESTAMPS = [ {"description": "Missing", "value": None, "valid_for": []}, ] -PUBSUB_TOPICS_RLN = ["/waku/2/rs/1/0"] +PUBSUB_TOPICS_RLN = [f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0"] LOG_ERROR_KEYWORDS = [ "crash", @@ -203,14 +203,6 @@ METRICS_WITH_INITIAL_VALUE_ZERO = [ "libp2p_failed_dials_total", "waku_rln_messages_total_total", "waku_rln_spam_messages_total_total", - "waku_rln_valid_messages_total_sum", - "waku_rln_valid_messages_total_count", - 'waku_rln_valid_messages_total_bucket{le="10.0"}', - 'waku_rln_valid_messages_total_bucket{le="20.0"}', - 'waku_rln_valid_messages_total_bucket{le="30.0"}', - 'waku_rln_valid_messages_total_bucket{le="40.0"}', - 'waku_rln_valid_messages_total_bucket{le="50.0"}', - 'waku_rln_valid_messages_total_bucket{le="+Inf"}', "waku_rln_proof_verification_total_total", "waku_rln_number_registered_memberships", "waku_rln_proof_verification_duration_seconds", @@ -285,40 +277,6 @@ METRICS_WITH_INITIAL_VALUE_ZERO = [ 'waku_archive_query_duration_seconds_bucket{le="7.5"}', 'waku_archive_query_duration_seconds_bucket{le="10.0"}', 'waku_archive_query_duration_seconds_bucket{le="+Inf"}', - 
"waku_legacy_archive_insert_duration_seconds_sum", - "waku_legacy_archive_insert_duration_seconds_count", - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.005"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.01"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.025"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.05"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.075"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.1"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.25"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.5"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="0.75"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="1.0"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="2.5"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="5.0"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="7.5"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="10.0"}', - 'waku_legacy_archive_insert_duration_seconds_bucket{le="+Inf"}', - "waku_legacy_archive_query_duration_seconds_sum", - "waku_legacy_archive_query_duration_seconds_count", - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.005"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.01"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.025"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.05"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.075"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.1"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.25"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.5"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="0.75"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="1.0"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="2.5"}', - 
'waku_legacy_archive_query_duration_seconds_bucket{le="5.0"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="7.5"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="10.0"}', - 'waku_legacy_archive_query_duration_seconds_bucket{le="+Inf"}', "waku_filter_subscriptions", "waku_filter_handle_message_duration_seconds_sum", "waku_filter_handle_message_duration_seconds_count", @@ -345,7 +303,7 @@ METRICS_WITH_INITIAL_VALUE_ZERO = [ "waku_px_peers_received_total", "waku_px_peers_received_unknown", "waku_px_peers_sent_total", - "waku_px_peers_cached", + "waku_px_peers", "waku_histogram_message_size_sum", "waku_histogram_message_size_count", 'waku_histogram_message_size_bucket{le="0.0"}', @@ -391,8 +349,8 @@ METRICS_WITH_INITIAL_VALUE_ZERO = [ 'waku_filter_handle_message_duration_seconds_bucket{le="20.0"}', 'waku_filter_handle_message_duration_seconds_bucket{le="30.0"}', "total_messages_cached", - "waku_legacy_store_queries_total", "waku_store_queries_total", "mix_pool_size", "libp2p_gossipsub_imreceiving_saved_messages_total", + "postgres_payload_size_bytes", ] diff --git a/tests/discv5/test_discv5.py b/tests/discv5/test_discv5.py index b8ab88b36..933a489d4 100644 --- a/tests/discv5/test_discv5.py +++ b/tests/discv5/test_discv5.py @@ -66,7 +66,7 @@ class TestDiscv5(StepsRelay, StepsFilter, StepsStore, StepsLightPush): self.light_push_node1 = self.running_a_node( NODE_2, lightpush="true", - relay="false", + relay="true", discv5_bootstrap_node=self.receiving_node1.get_enr_uri(), lightpushnode=self.receiving_node1.get_multiaddr_with_id(), ) diff --git a/tests/e2e/test_network_conditions.py b/tests/e2e/test_network_conditions.py new file mode 100644 index 000000000..fed949313 --- /dev/null +++ b/tests/e2e/test_network_conditions.py @@ -0,0 +1,781 @@ +import pytest +from time import time, sleep +from src.libs.custom_logger import get_custom_logger +from src.env_vars import NODE_1, NODE_2 +from src.node.waku_node import WakuNode +from src.steps.relay import 
StepsRelay +from src.libs.common import delay +from src.steps.network_conditions import TrafficController +from src.libs.common import to_base64 + +logger = get_custom_logger(__name__) + + +class TestNetworkConditions(StepsRelay): + @pytest.fixture(scope="function", autouse=True) + def setup_nodes(self, request): + self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}") + self.tc = TrafficController() + + def test_relay_with_latency_between_two_nodes(self): + logger.info("Starting node1 and node2 with relay enabled") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + + logger.info("Subscribing both nodes to relay topic") + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + logger.info("Waiting for autoconnection") + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + logger.debug("Applying 5000ms latency to node2") + self.tc.add_latency(self.node2, ms=5000) + message = self.create_message() + + logger.debug("Publishing message from node1") + + self.node1.send_relay_message(message, self.test_pubsub_topic) + logger.debug("Fetching relay messages on node2") + t0 = time() + messages = self.node2.get_relay_messages(self.test_pubsub_topic) + dt = time() - t0 + assert messages, "Messages aren't arrive" + assert dt >= 4.5, f"Expected slow GET due to latency, got {dt}" + assert dt <= 10.5, "msg took too long" + self.tc.clear(self.node2) + + @pytest.mark.timeout(60 * 8) + def test_relay_7_nodes_3sec_latency(self): + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + self.node5 = WakuNode(NODE_2, f"node5_{self.test_id}") + self.node6 = WakuNode(NODE_2, 
@pytest.mark.timeout(60 * 6)
def test_relay_4_nodes_sender_latency(self):
    """Publish from a sender with 3 s of netem delay and check node4 still receives.

    The publish call itself is expected to take roughly twice the configured
    delay (within 0.4 s), and node4 must see at least one message within
    10 s of the publish starting.
    """
    self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
    self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")

    logger.info("Starting 4 nodes with relay enabled (bootstrap chain)")
    self.node1.start(relay="true")
    self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
    self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
    self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())

    nodes = [self.node1, self.node2, self.node3, self.node4]

    for n in nodes:
        n.set_relay_subscriptions([self.test_pubsub_topic])

    self.wait_for_autoconnection(nodes, hard_wait=40)

    latency_ms = 3000
    # latency is doubled as request + response both will have latency
    expected_s = (latency_ms * 2) / 1000.0
    tolerance_s = 0.4
    receive_window_s = 10.0

    logger.info(f"Applying {latency_ms}ms latency on sender node1")
    self.tc.add_latency(self.node1, ms=latency_ms)

    try:
        t_pub0 = time()
        self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
        publish_dt = time() - t_pub0

        assert publish_dt > expected_s - tolerance_s, f"Expected publish call to be slowed by sender latency, took {publish_dt:.2f}s"
        assert publish_dt <= expected_s + tolerance_s, f"Publish call took too long: {publish_dt:.2f}s"

        # The receive window is measured from the start of the publish call.
        deadline = t_pub0 + receive_window_s
        received = False

        while time() < deadline:
            msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
            if msgs:
                received = True
                break
            delay(0.2)

        assert received, f"node4 did not receive any relay message within {receive_window_s}s of publish"
    finally:
        # Remove the netem rule even when an assertion above fails.
        self.tc.clear(self.node1)
self.test_pubsub_topic) + node1_dts.append(time() - t0) + + t0 = time() + self.node2.send_relay_message( + self.create_message(), + self.test_pubsub_topic, + ) + node2_dts.append(time() - t0) + + delay(0.2) + + for dt in node1_dts: + assert dt <= ((latency_ms * 2) / 1000.0) + 0.4, "node1 publish took too long" + + for dt in node2_dts: + assert dt < 1.0, f"Expected node2 publish to be fast" + + deadline = time() + 10.0 + received = False + while time() < deadline: + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + if msgs: + received = True + break + delay(0.2) + + assert received, f"node4 did not receive any relay message within time" + + self.tc.clear(self.node1) + + @pytest.mark.timeout(60 * 6) + @pytest.mark.parametrize( + "latency_ms", + [ + 200, + 500, + 1000, + 5000, + 7000, + ], + ) + def test_relay_different_latency_between_two_nodes(self, latency_ms): + logger.info("Starting node1 and node2 with relay enabled") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + + logger.info("Subscribing both nodes to relay topic") + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + logger.info("Waiting for autoconnection") + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + logger.info(f"Applying {latency_ms}ms latency to node2") + self.tc.clear(self.node2) + if latency_ms > 0: + self.tc.add_latency(self.node2, ms=latency_ms) + + message = self.create_message() + self.node1.send_relay_message(message, self.test_pubsub_topic) + t0 = time() + messages = self.node2.get_relay_messages(self.test_pubsub_topic) + dt = time() - t0 + + assert messages, "No relay messages returned (publish/relay may have failed)" + expected_s = (latency_ms / 1000.0) * 2 + tolerance_s = 0.5 + assert dt >= expected_s - tolerance_s, f"Expected >= {expected_s - tolerance_s}s, got {dt}s" + assert dt <= expected_s + 
@pytest.mark.timeout(60 * 10)
def test_latency_with_load_sender_side(self):
    """Send 50 messages from a sender with 3 s of netem delay.

    Passes when a single poll of node4 returns at least half of the sent
    messages within the 40 s wait window.
    """
    latency_ms = 3000
    total_messages = 50
    wait_time = 40.0
    acceptable_msgs = total_messages / 2
    self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
    self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
    logger.info("Starting 4 nodes with relay enabled")
    self.node1.start(relay="true")
    self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
    self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
    self.node4.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())

    nodes = [self.node1, self.node2, self.node3, self.node4]

    logger.info("Subscribing all nodes to relay topic")
    for n in nodes:
        n.set_relay_subscriptions([self.test_pubsub_topic])

    self.wait_for_autoconnection(nodes, hard_wait=30)

    # Read once before sending so earlier traffic is not counted below.
    _ = self.node4.get_relay_messages(self.test_pubsub_topic)

    logger.info(f"Applying {latency_ms}ms latency on sender node1")
    self.tc.clear(self.node1)
    self.tc.add_latency(self.node1, ms=latency_ms)

    try:
        logger.info(f"Sending {total_messages} messages from node1")
        for _ in range(total_messages):
            self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)

        # NOTE(review): this keeps the largest single poll result, not a running
        # total. If get_relay_messages drains the node's cache on every call,
        # a cumulative sum may be what is intended - confirm.
        received_count = 0

        deadline = time() + wait_time
        while time() < deadline:
            msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
            received_count = max(received_count, len(msgs))
            if received_count >= acceptable_msgs:
                break
            delay(1)

        logger.info(f"Node4 received {received_count} messages " f"(min_expected={acceptable_msgs}, total_sent={total_messages})")

        assert received_count >= acceptable_msgs, (
            f"relay stalled or dropped all traffic; received {received_count} of {total_messages} " f"(min_expected={acceptable_msgs})"
        )
    finally:
        # Remove the netem rule even when sending or the assertion fails.
        self.tc.clear(self.node1)
test_relay_4_nodes_sender_packet_loss(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + logger.info("Starting 4 nodes with relay enabled (bootstrap chain)") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + loss_percent = 30.0 + logger.info(f"Applying {loss_percent}% packet loss on sender node1") + self.tc.add_packet_loss(self.node1, percent=loss_percent) + + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.log_tc_stats(self.node1) + deadline = time() + 10.0 + received = False + cnt = 0 + + while time() < deadline: + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + if msgs: + received = True + break + delay(0.2) + cnt = cnt + 1 + + assert received, f"Node4 did not receive any relay message" + logger.info(f"Node4 received messages from node1 after " f"{cnt} trails") + + self.tc.clear(self.node1) + + @pytest.mark.xfail(reason="Fails under high packet loss percentage 50") + def test_relay_4_nodes_sender_packet_loss_50_15sec_timeout(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, 
self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + loss = 50.0 + total_msgs = 30 + window_s = 15.0 + + self.tc.clear(self.node1) + self.tc.add_packet_loss(self.node1, percent=loss) + + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + batch_tag = f"loss={loss}-{self.test_id}" + batch_tag_b64 = to_base64(batch_tag) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.log_tc_stats(self.node1) + delay(window_s) + + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + received = len(msgs) + + logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received} window={window_s}s") + + self.tc.clear(self.node1) + + assert received > int(total_msgs * 0.8), f"Only {received}/{total_msgs} messages received under {loss}% packet loss (more than 80% required)" + + @pytest.mark.timeout(60 * 10) + @pytest.mark.xfail(reason="Fails under high packet loss percentage") + @pytest.mark.parametrize("loss", [40.0, 60.0], ids=["loss40", "loss60"]) + def test_relay_4_nodes_sender_packet_loss_delivery_ratio_simple(self, loss): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + total_msgs = 30 + window_s = 40.0 + + self.tc.clear(self.node1) + self.tc.add_packet_loss(self.node1, percent=loss) + + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + 
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.log_tc_stats(self.node1) + delay(window_s) + + msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or [] + received = len(msgs) + + logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received} window={window_s}s") + + self.tc.clear(self.node1) + + assert received > int(total_msgs * 0.8), f"Only {received}/{total_msgs} messages received at {loss}% packet loss (more than 80% required)" + + @pytest.mark.timeout(60 * 10) + def test_relay_packet_loss_correlated_vs_uncorrelated(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + total_msgs = 5 + window_s = 30.0 + loss = 50.0 + + self.tc.add_packet_loss_p2p_only(self.node1, percent=loss) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + uncorrelated = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear_p2p(self.node1) + + self.tc.add_packet_loss_correlated_p2p_only(self.node1, percent=loss, correlation=75.0) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + correlated = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear_p2p(self.node1) + + logger.debug(f"uncorrelated={uncorrelated} 
correlated={correlated}") + assert uncorrelated >= correlated + assert correlated > 0 + + @pytest.mark.timeout(60 * 10) + def test_relay_packet_loss_sender_vs_receiver(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + total_msgs = 30 + window_s = 30.0 + loss = 50.0 + + self.tc.add_packet_loss(self.node1, percent=loss) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + sender_loss = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node1) + + self.tc.add_packet_loss(self.node4, percent=loss) + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(total_msgs): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + receiver_loss = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + logger.debug(f"sender_loss={sender_loss} receiver_loss={receiver_loss}") + self.tc.clear(self.node4) + + assert sender_loss > 0 + assert receiver_loss > 0 + + @pytest.mark.timeout(60 * 10) + def test_relay_packet_loss_applied_mid_way(self): + self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self.node3.start(relay="true", 
discv5_bootstrap_node=self.node2.get_enr_uri()) + self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri()) + + nodes = [self.node1, self.node2, self.node3, self.node4] + for n in nodes: + n.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection(nodes, hard_wait=20) + + window_s = 30.0 + _ = self.node4.get_relay_messages(self.test_pubsub_topic) + + for _ in range(10): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + self.tc.add_packet_loss(self.node1, percent=50.0) + + for _ in range(20): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + delay(window_s) + received = len(self.node4.get_relay_messages(self.test_pubsub_topic) or []) + self.tc.clear(self.node1) + + assert received > 0 + + @pytest.mark.timeout(60 * 2) + def test_relay_2_nodes_low_bandwidth_reliability(self): + msg_count = 200 + max_wait = 50 + poll_sleep = 0.5 + + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start( + relay="true", + discv5_bootstrap_node=self.node1.get_enr_uri(), + rest_relay_cache_capacity="250", + ) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + self.tc.add_bandwidth(self.node1, rate="256kbit") + + for _ in range(msg_count): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + t0 = time() + received = 0 + while time() - t0 < max_wait: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + received += len(msgs) # accumulate across polls like the sibling bandwidth tests; assumes each GET drains the relay cache - TODO confirm + if received >= msg_count: + break + sleep(poll_sleep) + + elapsed = time() - t0 + logger.info(f"low_bw_elapsed={elapsed:.2f}s received={received} msg_count={msg_count}") + + assert received >= msg_count + + @pytest.mark.timeout(60 * 8) + def 
test_relay_2_nodes_low_bandwidth_sending_over_time(self): + msg_count = 200 + cache_capacity = "250" + + send_interval_s = 0.05 + poll_interval_s = 0.5 + max_wait_s = 240 + min_received = 150 + + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start( + relay="true", + discv5_bootstrap_node=self.node1.get_enr_uri(), + rest_relay_cache_capacity=cache_capacity, + ) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + # mirror your suite's "low bandwidth" shaping values (rate + burst/limit handled by the tc wrapper defaults) + self.tc.add_bandwidth(self.node1, rate="256kbit") + + received = 0 + for _ in range(msg_count): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + sleep(send_interval_s) + + t0 = time() + while time() - t0 < max_wait_s and received < msg_count: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + if msgs: + received += len(msgs) + if received >= msg_count: + break + sleep(poll_interval_s) + + total_time = time() - t0 + + logger.info( + "low_bw_reliability " + f"rate=256kbit msg_count={msg_count} cache_capacity={cache_capacity} " + f"send_interval_s={send_interval_s} poll_interval_s={poll_interval_s} " + f"recv_duration={total_time:.2f}s" + ) + + assert received >= min_received, f"received {received}/{msg_count} under low bandwidth" + + @pytest.mark.timeout(60 * 8) + def test_relay_2_nodes_bandwidth_low_vs_high_drain_time(self): + # large payload (~16KB) so 50 msgs = ~800KB total, + # making 256kbit meaningfully slower than 10mbit on loopback + msg_count = 50 + large_payload = to_base64("x" * 16_000) + cache = "100" + # fine-grained poll so sub-second differences are measurable + poll_sleep = 0.05 + max_wait = 200 + + self.node1 = 
WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + + self.node1.start(relay="true", rest_relay_cache_capacity=cache) + self.node2.start( + relay="true", + discv5_bootstrap_node=self.node1.get_enr_uri(), + rest_relay_cache_capacity=cache, + ) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + # apply tc to both nodes so intra-host loopback fast-path is throttled + self.tc.add_bandwidth(self.node1, rate="256kbit") + self.tc.add_bandwidth(self.node2, rate="256kbit") + + _ = self.node2.get_relay_messages(self.test_pubsub_topic) + + for _ in range(msg_count): + self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic) + + total_low_msgs = 0 + t0 = time() + while total_low_msgs < msg_count and (time() - t0) < max_wait: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + total_low_msgs += len(msgs) + sleep(poll_sleep) + low_rate_t = time() - t0 + + # upgrade both nodes to high bandwidth + self.tc.add_bandwidth(self.node1, rate="10mbit") + self.tc.add_bandwidth(self.node2, rate="10mbit") + + _ = self.node2.get_relay_messages(self.test_pubsub_topic) + sleep(1) # let the phase-1 shaper queue fully drain before phase 2 + + for _ in range(msg_count): + self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic) + + total_high_msgs = 0 + t1 = time() + while total_high_msgs < msg_count and (time() - t1) < max_wait: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + total_high_msgs += len(msgs) + sleep(poll_sleep) + high_rate_t = time() - t1 + + logger.info( + f"low_rate_t={low_rate_t:.2f}s high_rate_t={high_rate_t:.2f}s " + f"total_low_msgs={total_low_msgs} total_high_msgs={total_high_msgs} " + f"msg_count={msg_count} cache={cache}" + ) + + assert total_low_msgs 
>= msg_count + assert total_high_msgs >= msg_count + # Assert high bandwidth was meaningfully faster, not just marginally so, + # to absorb scheduling jitter on localhost Docker + assert high_rate_t < low_rate_t / 2 + + @pytest.mark.timeout(60 * 6) + def test_relay_2_nodes_packet_reordering(self): + msg_count = 200 + cache_capacity = "200" + poll_sleep = 0.5 + max_wait = 120 + + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + + self.node1.start(relay="true") + self.node2.start( + relay="true", + discv5_bootstrap_node=self.node1.get_enr_uri(), + rest_relay_cache_capacity=cache_capacity, + ) + + self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10) + + self.tc.add_packet_reordering(self.node2, percent=25, correlation=50) + + for _ in range(msg_count): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + + received = 0 + t0 = time() + while received < msg_count and time() - t0 < max_wait: + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + received += len(msgs) + sleep(poll_sleep) + + elapsed = time() - t0 + + logger.info(f"packet_reordering " f"reorder=25% corr=50% " f"msg_count={msg_count} received={received} " f"elapsed={elapsed:.2f}s") + + assert received >= msg_count + + @pytest.mark.timeout(60 * 6) + def test_relay_2_nodes_temporary_blackout_recovers_no_helpers(self): + msgs_count = 100 + self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}") + self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") + self.tc = TrafficController() + + logger.info("Starting node1 and node2 with relay enabled") + self.node1.start(relay="true") + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + + logger.info("Subscribing both nodes to relay topic") + 
self.node1.set_relay_subscriptions([self.test_pubsub_topic]) + self.node2.set_relay_subscriptions([self.test_pubsub_topic]) + + logger.info("Waiting for autoconnection") + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=15) + logger.info("Applying 100% packet loss on both nodes ") + self.tc.clear(self.node1) + self.tc.clear(self.node2) + self.tc.add_packet_loss(self.node1, percent=100.0) + self.tc.add_packet_loss(self.node2, percent=100.0) + + delay(5) + logger.info("Clearing tc rules (restore connectivity)") + self.tc.clear(self.node1) + self.tc.clear(self.node2) + + logger.info("Waiting for peer list recovery on both nodes") + peers1, deadline = [], time() + 30.0 # compute the deadline once; comparing against a fresh time() + 30.0 can never expire + while time() < deadline: + peers1 = self.node1.get_peers() or [] + peers2 = self.node2.get_peers() or [] + if len(peers1) > 0 and len(peers2) > 0: + break + delay(0.5) + + assert len(peers1) > 0, "Peers did not recover after blackout (would require restart)" + + logger.info("Publishing after recovery") + for _ in range(msgs_count): + self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic) + delay(5) + msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or [] + assert len(msgs) >= msgs_count - 10, "Post-recovery message was not delivered" + logger.info(f"{len(msgs)} messages were delivered") + self.tc.clear(self.node1) + self.tc.clear(self.node2) diff --git a/tests/filter/test_subscribe_create.py b/tests/filter/test_subscribe_create.py index ca8d0221e..5a57b277d 100644 --- a/tests/filter/test_subscribe_create.py +++ b/tests/filter/test_subscribe_create.py @@ -1,4 +1,5 @@ import pytest +from uuid import uuid4 from src.env_vars import NODE_1, NODE_2 from src.libs.custom_logger import get_custom_logger from src.test_data import INVALID_CONTENT_TOPICS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS @@ -60,9 +61,10 @@ class TestFilterSubscribeCreate(StepsFilter): assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" def 
test_filter_subscribe_to_29_content_topics_in_separate_calls(self, subscribe_main_nodes): + # subscribe_main_nodes already consumed 1 slot; make 29 more = 30 total, all must succeed _29_content_topics = [str(i) for i in range(29)] for content_topic in _29_content_topics: - self.create_filter_subscription({"requestId": "1", "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic}) + self.create_filter_subscription({"requestId": str(uuid4()), "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic}) failed_content_topics = [] for content_topic in _29_content_topics: logger.debug(f"Running test with content topic {content_topic}") @@ -73,11 +75,20 @@ class TestFilterSubscribeCreate(StepsFilter): logger.error(f"ContentTopic {content_topic} failed: {str(ex)}") failed_content_topics.append(content_topic) assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}" - try: - self.create_filter_subscription({"requestId": "1", "contentFilters": ["rate limited"], "pubsubTopic": self.test_pubsub_topic}) - raise AssertionError("The 30th subscribe call was not rate limited!!!") - except Exception as ex: - assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex) + # Rate limit is a token bucket (30/min); keep calling beyond 30 until one is denied + rate_limited = False + for extra in range(1, 20): + try: + self.create_filter_subscription( + {"requestId": str(uuid4()), "contentFilters": [f"extra_{extra}"], "pubsubTopic": self.test_pubsub_topic} + ) + logger.debug(f"Extra subscribe call #{extra} succeeded") + except Exception as ex: + assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex), f"Unexpected error on extra call #{extra}: {ex}" + logger.info(f"Rate limit hit on extra call #{extra}: {ex}") + rate_limited = True + break + assert rate_limited, "Rate limit was not triggered on any call beyond 30" def test_filter_subscribe_to_101_content_topics(self, subscribe_main_nodes): try: diff 
--git a/tests/light_push/test_multiple_nodes.py b/tests/light_push/test_multiple_nodes.py index 17f5880f5..c3bdf81a0 100644 --- a/tests/light_push/test_multiple_nodes.py +++ b/tests/light_push/test_multiple_nodes.py @@ -1,3 +1,4 @@ +import pytest from src.env_vars import NODE_1 from src.libs.common import delay from src.steps.light_push import StepsLightPush @@ -7,14 +8,14 @@ class TestMultipleNodes(StepsLightPush): def test_2_receiving_nodes__relay_node1_forwards_lightpushed_message_to_relay_node2(self): self.setup_first_receiving_node(lightpush="true", relay="true") self.setup_second_receiving_node(lightpush="false", relay="true") - self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_first_lightpush_node(lightpush="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) def test_2_receiving_nodes__relay_node1_forwards_lightpushed_message_to_filter_node2(self): self.setup_first_receiving_node(lightpush="true", relay="true", filter="true") self.setup_second_receiving_node(lightpush="false", relay="false", filternode=self.receiving_node1.get_multiaddr_with_id()) - self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_first_lightpush_node(lightpush="true", relay="true") helper_node = self.start_receiving_node(NODE_1, node_index=4, lightpush="false", relay="true") self.subscribe_to_pubsub_topics_via_relay(node=[self.receiving_node1, helper_node]) self.subscribe_to_pubsub_topics_via_filter(node=self.receiving_node2) @@ -22,6 +23,7 @@ class TestMultipleNodes(StepsLightPush): get_messages_response = self.receiving_node2.get_filter_messages(self.test_content_topic) assert len(get_messages_response) == 1, "Lightpushed message was not relayed to the filter node" + @pytest.mark.skip(reason=" test has m=no meaning now as relay = false won't work , moght be used in future if relay = false work again") def 
test_2_lightpush_nodes_and_2_receiving_nodes(self): self.setup_first_receiving_node(lightpush="true", relay="true") self.setup_second_receiving_node(lightpush="false", relay="true") @@ -34,28 +36,26 @@ class TestMultipleNodes(StepsLightPush): def test_combination_of_different_nodes(self): self.setup_first_receiving_node(lightpush="true", relay="true", filter="true") self.setup_second_receiving_node(lightpush="false", relay="false", filternode=self.receiving_node1.get_multiaddr_with_id()) - self.setup_first_lightpush_node(lightpush="true", relay="false") self.setup_second_lightpush_node(lightpush="true", relay="true") self.subscribe_to_pubsub_topics_via_relay(node=self.receiving_node1) self.subscribe_to_pubsub_topics_via_relay(node=self.light_push_node2) self.subscribe_to_pubsub_topics_via_filter(node=self.receiving_node2) delay(1) - self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node2) get_messages_response = self.receiving_node2.get_filter_messages(self.test_content_topic) - assert len(get_messages_response) == 2, "Lightpushed message was not relayed to the filter node" + assert len(get_messages_response) == 1, "Lightpushed message was not relayed to the filter node" def test_multiple_receiving_nodes(self): self.setup_first_receiving_node(lightpush="true", relay="true") self.setup_additional_receiving_nodes() - self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_first_lightpush_node(lightpush="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) def test_multiple_lightpush_nodes(self): self.setup_first_receiving_node(lightpush="true", relay="true") self.setup_second_receiving_node(lightpush="false", relay="true") - self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_first_lightpush_node(lightpush="true", 
relay="true") self.setup_additional_lightpush_nodes() self.subscribe_to_pubsub_topics_via_relay() self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) diff --git a/tests/light_push/test_running_nodes.py b/tests/light_push/test_running_nodes.py index 84efc78f1..d0aba7d3f 100644 --- a/tests/light_push/test_running_nodes.py +++ b/tests/light_push/test_running_nodes.py @@ -1,8 +1,10 @@ +import pytest from src.libs.common import delay from src.steps.light_push import StepsLightPush class TestRunningNodes(StepsLightPush): + @pytest.mark.skip(reason="lightpush no longer wroks with relay =false") def test_main_node_only_lightpush__peer_only_lightpush(self): self.setup_first_receiving_node(lightpush="true", relay="false") self.setup_first_lightpush_node(lightpush="true", relay="false") @@ -12,19 +14,19 @@ class TestRunningNodes(StepsLightPush): except Exception as ex: assert "no waku relay found" in str(ex) or "failed to negotiate protocol: protocols not supported" in str(ex) - def test_main_node_only_lightpush__peer_only_filter(self): + def test_main_node_relay_lightpush__peer_only_filter(self): self.setup_first_receiving_node(lightpush="false", relay="false", filter="true") - self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_first_lightpush_node(lightpush="true", relay="true") try: self.light_push_node1.send_light_push_message(self.create_payload()) raise AssertionError("Light push with non lightpush peer worked!!!") except Exception as ex: assert "Failed to request a message push: dial_failure" in str(ex) or "lightpush error" in str(ex) - def test_main_node_only_lightpush__peer_only_relay(self): + def test_main_node_relay_lightpush__peer_only_relay(self): self.setup_first_receiving_node(lightpush="false", relay="true") self.subscribe_to_pubsub_topics_via_relay() - self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_first_lightpush_node(lightpush="true", relay="true") try: 
self.light_push_node1.send_light_push_message(self.create_payload()) raise AssertionError("Light push with non lightpush peer worked!!!") @@ -34,7 +36,7 @@ class TestRunningNodes(StepsLightPush): def test_main_node_only_lightpush__peer_full(self): self.setup_first_receiving_node(lightpush="true", relay="true", filter="true") self.setup_second_receiving_node(lightpush="false", relay="true") - self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_first_lightpush_node(lightpush="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() self.check_light_pushed_message_reaches_receiving_peer() @@ -57,7 +59,7 @@ class TestRunningNodes(StepsLightPush): def test_lightpush_with_a_single_receiving_node(self): self.setup_first_receiving_node(lightpush="true", relay="true") - self.setup_first_lightpush_node(lightpush="true", relay="false") + self.setup_first_lightpush_node(lightpush="true", relay="true") self.subscribe_to_pubsub_topics_via_relay() try: self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1) diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index 25eb9791c..e4e0547aa 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -94,11 +94,6 @@ class TestMetrics(StepsRelay, StepsMetrics, StepsFilter, StepsLightPush, StepsSt self.check_metric(self.publishing_node1, "waku_histogram_message_size_count", 1) self.check_metric(self.publishing_node1, 'waku_node_messages_total{type="relay"}', 1) if self.store_node1.is_nwaku(): - self.check_metric( - self.store_node1, - f'waku_service_peers{{protocol="/vac/waku/store/2.0.0-beta4",peerId="{self.publishing_node1.get_tcp_address()}"}}', - 1, - ) self.check_metric( self.store_node1, f'waku_service_peers{{protocol="/vac/waku/store-query/3.0.0",peerId="{self.publishing_node1.get_tcp_address()}"}}', diff --git a/tests/peer_connection_management/test_peer_store.py b/tests/peer_connection_management/test_peer_store.py index 
43c1314ad..0766684d1 100644 --- a/tests/peer_connection_management/test_peer_store.py +++ b/tests/peer_connection_management/test_peer_store.py @@ -1,4 +1,5 @@ import pytest +from tenacity import retry, stop_after_delay, wait_fixed from src.env_vars import NODE_1, NODE_2 from src.libs.common import delay @@ -25,14 +26,20 @@ class TestPeerStore(StepsRelay, StepsStore): logger.debug(f"Node {i} peer ID {node_id}") ids.append(node_id) - for i in range(5): - others = [] - for peer_info in nodes[i].get_peers(): - logger.debug(f"Node {i} peer info {peer_info}") - peer_id = peer_info2id(peer_info, nodes[i].is_nwaku()) - others.append(peer_id) + # Discv5 discovery is eventually-consistent; poll until every node's peer + # store reflects the expected mesh, or fail after the timeout. + @retry(stop=stop_after_delay(60), wait=wait_fixed(2), reraise=True) + def check_peer_stores(): + for i in range(5): + others = [] + for peer_info in nodes[i].get_peers(): + logger.debug(f"Node {i} peer info {peer_info}") + peer_id = peer_info2id(peer_info, nodes[i].is_nwaku()) + others.append(peer_id) - assert (i == 0 and len(others) == 4) or (i > 0 and len(others) >= 1), f"Some nodes missing in the peer store of Node ID {ids[i]}" + assert (i == 0 and len(others) == 4) or (i > 0 and len(others) >= 1), f"Some nodes missing in the peer store of Node ID {ids[i]}" + + check_peer_stores() def test_add_peers(self): self.setup_main_nodes() @@ -41,27 +48,52 @@ class TestPeerStore(StepsRelay, StepsStore): nodes = [self.node1, self.node2] nodes.extend(self.optional_nodes) delay(10) - peers_multiaddr = set() - for i in range(2): - for peer_info in nodes[i].get_peers(): - multiaddr = peer_info2multiaddr(peer_info, nodes[i].is_nwaku()) - peers_multiaddr.add(multiaddr) - assert len(peers_multiaddr) == 5, f"Exactly 5 multi addresses are expected" + # Discv5 discovery is eventually-consistent; poll node1 and node2's peer + # stores until the union contains >=5 multiaddrs, or fail after the timeout. 
+ @retry(stop=stop_after_delay(60), wait=wait_fixed(2), reraise=True) + def collect_multiaddrs(): + peers_multiaddr = set() + for i in range(2): + for peer_info in nodes[i].get_peers(): + multiaddr = peer_info2multiaddr(peer_info, nodes[i].is_nwaku()) + peers_multiaddr.add(multiaddr) - # Add peers one by one excluding self for Nodes 2-5 + assert len(peers_multiaddr) >= 5, f"At least 5 multi addresses are expected, got {len(peers_multiaddr)}: {peers_multiaddr}" + return peers_multiaddr + + peers_multiaddr = collect_multiaddrs() + + # Group multiaddrs by peer ID. libp2p identify can leak observed (ephemeral + # source) addresses into the peer store alongside the real listen address; + # those observed addrs are unreachable when dialed back, so we try every + # known address for a peer until one succeeds. + addrs_by_peer = {} + for peer in peers_multiaddr: + addrs_by_peer.setdefault(multiaddr2id(peer), []).append(peer) + + # For each of nodes 2-5, add every other peer via the add_peers API. 
for i in range(1, 5): - for peer in list(peers_multiaddr): - if nodes[i].get_id() != multiaddr2id(peer): + self_id = nodes[i].get_id() + for peer_id, addrs in addrs_by_peer.items(): + if peer_id == self_id: + continue + last_err = None + for addr in addrs: try: if nodes[i].is_nwaku(): - nodes[i].add_peers([peer]) + nodes[i].add_peers([addr]) else: - peer_info = {"multiaddr": peer, "shards": [0], "protocols": ["/vac/waku/relay/2.0.0"]} + peer_info = {"multiaddr": addr, "shards": [0], "protocols": ["/vac/waku/relay/2.0.0"]} nodes[i].add_peers(peer_info) + last_err = None + break except Exception as ex: - logger.error(f"Failed to add peer to Node {i} peer store: {ex}") - raise + last_err = ex + logger.warning(f"Node {i} failed to add peer {peer_id} via {addr}: {ex}") + if last_err is not None: + logger.error(f"Node {i} could not add peer {peer_id} via any known address") + raise last_err @pytest.mark.skip(reason="waiting for https://github.com/waku-org/nwaku/issues/1549 resolution") def test_get_peers_two_protocols(self): diff --git a/tests/relay/test_rln.py b/tests/relay/test_rln.py index 6f48ea8d2..311fcf77d 100644 --- a/tests/relay/test_rln.py +++ b/tests/relay/test_rln.py @@ -13,7 +13,6 @@ from src.test_data import SAMPLE_INPUTS logger = get_custom_logger(__name__) -@pytest.mark.skip(reason="RLN functional changes. 
To be updated by Roman Zajic") @pytest.mark.xdist_group(name="RLN serial tests") class TestRelayRLN(StepsRLN, StepsRelay): SAMPLE_INPUTS_RLN = SAMPLE_INPUTS + SAMPLE_INPUTS + SAMPLE_INPUTS @@ -22,7 +21,14 @@ class TestRelayRLN(StepsRLN, StepsRelay): def test_valid_payloads_lightpush_at_spam_rate(self, pytestconfig): message_limit = 1 epoch_sec = 1 - pytestconfig.cache.set("keystore-prefixes", self.register_rln_relay_nodes(2, [])) + rln_state = self.register_rln_relay_nodes(2, []) + pytestconfig.cache.set( + "keystore-prefixes", + { + "keystore_prefixes": rln_state["keystore_prefixes"], + "rln_membership_indexes": rln_state["rln_membership_indexes"], + }, + ) self.setup_first_rln_relay_node(lightpush="true", rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=epoch_sec) self.setup_second_rln_lightpush_node(rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=epoch_sec) self.subscribe_main_relay_nodes() @@ -37,11 +43,17 @@ class TestRelayRLN(StepsRLN, StepsRelay): if i > message_limit and (now - start) <= epoch_sec: raise AssertionError("Publish with RLN enabled at spam rate worked!!!") except Exception as e: - assert "RLN validation failed" or "NonceLimitReached" in str(e) + error_str = str(e) + assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str + if "NonceLimitReached" in error_str: + assert "503" in error_str, f"Expected HTTP 503 for NonceLimitReached, got: {error_str}" def test_valid_payloads_at_slow_rate(self, pytestconfig): message_limit = 20 - self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", [])) + self.register_rln_relay_nodes( + 0, + pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}), + ) self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=600) self.subscribe_main_relay_nodes() failed_payloads = [] @@ -63,7 +75,10 @@ class TestRelayRLN(StepsRLN, StepsRelay): def 
test_valid_payloads_at_spam_rate(self, pytestconfig): message_limit = 20 epoch_sec = 600 - self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", [])) + self.register_rln_relay_nodes( + 0, + pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}), + ) self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=epoch_sec) self.subscribe_main_relay_nodes() start = math.trunc(time()) @@ -77,10 +92,16 @@ class TestRelayRLN(StepsRLN, StepsRelay): if i > message_limit and (now - start) <= epoch_sec: raise AssertionError("Publish with RLN enabled at spam rate worked!!!") except Exception as e: - assert "RLN validation failed" or "NonceLimitReached" in str(e) + error_str = str(e) + assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str + if "NonceLimitReached" in error_str: + assert "500" in error_str, f"Expected HTTP 500 for NonceLimitReached, got: {error_str}" def test_valid_payload_at_variable_rate(self, pytestconfig): - self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", [])) + self.register_rln_relay_nodes( + 0, + pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}), + ) self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=1) self.subscribe_main_relay_nodes() payload_desc = SAMPLE_INPUTS[0]["description"] @@ -101,11 +122,17 @@ class TestRelayRLN(StepsRLN, StepsRelay): else: previous = now except Exception as e: - assert "RLN validation failed" or "NonceLimitReached" in str(e) + error_str = str(e) + assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str + if "NonceLimitReached" in error_str: + assert "500" in error_str, f"Expected HTTP 500 for NonceLimitReached, got: {error_str}" def test_valid_payloads_random_epoch_at_slow_rate(self, pytestconfig): epoch_sec = random.randint(2, 5) - self.register_rln_relay_nodes(0, 
pytestconfig.cache.get("keystore-prefixes", [])) + self.register_rln_relay_nodes( + 0, + pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}), + ) self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=epoch_sec) self.subscribe_main_relay_nodes() failed_payloads = [] @@ -122,7 +149,10 @@ class TestRelayRLN(StepsRLN, StepsRelay): def test_valid_payloads_random_user_message_limit(self, pytestconfig): user_message_limit = random.randint(2, 4) - self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", [])) + self.register_rln_relay_nodes( + 0, + pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}), + ) self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=user_message_limit, rln_relay_epoch_sec=1) self.subscribe_main_relay_nodes() failed_payloads = [] @@ -136,12 +166,18 @@ class TestRelayRLN(StepsRLN, StepsRelay): failed_payloads.append(payload["description"]) assert not failed_payloads, f"Payloads failed: {failed_payloads}" - @pytest.mark.skip(reason="Waiting for issue resolution https://github.com/waku-org/nwaku/issues/3208") @pytest.mark.timeout(600) def test_valid_payloads_dynamic_at_spam_rate(self, pytestconfig): message_limit = 100 epoch_sec = 600 - pytestconfig.cache.set("keystore-prefixes", self.register_rln_relay_nodes(2, [])) + rln_state = self.register_rln_relay_nodes(2, []) + pytestconfig.cache.set( + "keystore-prefixes", + { + "keystore_prefixes": rln_state["keystore_prefixes"], + "rln_membership_indexes": rln_state["rln_membership_indexes"], + }, + ) self.setup_main_rln_relay_nodes( rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=epoch_sec, @@ -160,13 +196,22 @@ class TestRelayRLN(StepsRLN, StepsRelay): if i > message_limit and (now - start) <= epoch_sec: raise AssertionError("Publish with RLN enabled at spam rate worked!!!") except Exception as e: - assert "RLN validation failed" or 
"NonceLimitReached" in str(e) + error_str = str(e) + assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str + if "NonceLimitReached" in error_str: + assert "500" in error_str, f"Expected HTTP 500 for NonceLimitReached, got: {error_str}" - @pytest.mark.skip(reason="Waiting for issue resolution https://github.com/waku-org/nwaku/issues/3208") @pytest.mark.timeout(600) def test_valid_payloads_dynamic_at_slow_rate(self, pytestconfig): message_limit = 100 - pytestconfig.cache.set("keystore-prefixes", self.register_rln_relay_nodes(2, [])) + rln_state = self.register_rln_relay_nodes(2, []) + pytestconfig.cache.set( + "keystore-prefixes", + { + "keystore_prefixes": rln_state["keystore_prefixes"], + "rln_membership_indexes": rln_state["rln_membership_indexes"], + }, + ) self.setup_main_rln_relay_nodes( rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=600, @@ -192,7 +237,10 @@ class TestRelayRLN(StepsRLN, StepsRelay): def test_valid_payloads_n1_with_rln_n2_without_rln_at_spam_rate(self, pytestconfig): message_limit = 1 epoch_sec = 1 - self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", [])) + self.register_rln_relay_nodes( + 0, + pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}), + ) self.setup_first_rln_relay_node(rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=epoch_sec) self.setup_second_relay_node() self.subscribe_main_relay_nodes() @@ -206,10 +254,20 @@ class TestRelayRLN(StepsRLN, StepsRelay): if i > message_limit and (now - start) <= epoch_sec: raise AssertionError("Publish with RLN enabled at spam rate worked!!!") except Exception as e: - assert "RLN validation failed" or "NonceLimitReached" in str(e) + error_str = str(e) + assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str + if "NonceLimitReached" in error_str: + assert "500" in error_str, f"Expected HTTP 500 for NonceLimitReached, got: {error_str}" def 
test_valid_payloads_with_optional_nodes_at_slow_rate(self, pytestconfig): - pytestconfig.cache.set("keystore-prefixes", self.register_rln_relay_nodes(5, [])) + rln_state = self.register_rln_relay_nodes(5, []) + pytestconfig.cache.set( + "keystore-prefixes", + { + "keystore_prefixes": rln_state["keystore_prefixes"], + "rln_membership_indexes": rln_state["rln_membership_indexes"], + }, + ) self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=1) self.setup_optional_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=1) self.subscribe_main_relay_nodes() @@ -228,7 +286,10 @@ class TestRelayRLN(StepsRLN, StepsRelay): assert not failed_payloads, f"Payloads failed: {failed_payloads}" def test_valid_payloads_with_optional_nodes_at_spam_rate(self, pytestconfig): - self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", [])) + self.register_rln_relay_nodes( + 0, + pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}), + ) self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=1) self.setup_optional_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=1) self.subscribe_main_relay_nodes() @@ -246,4 +307,7 @@ class TestRelayRLN(StepsRLN, StepsRelay): else: previous = now except Exception as e: - assert "RLN validation failed" or "NonceLimitReached" in str(e) + error_str = str(e) + assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str + if "NonceLimitReached" in error_str: + assert "500" in error_str, f"Expected HTTP 500 for NonceLimitReached, got: {error_str}" diff --git a/tests/rest_flags/admin_flags.py b/tests/rest_flags/test_admin_flags.py similarity index 60% rename from tests/rest_flags/admin_flags.py rename to tests/rest_flags/test_admin_flags.py index 409cd9289..83b62991e 100644 --- a/tests/rest_flags/admin_flags.py +++ b/tests/rest_flags/test_admin_flags.py @@ -1,5 +1,8 @@ -import pytest, 
time, re, os -from src.env_vars import NODE_1, NODE_2, STRESS_ENABLED +import os +import pytest +import re +import time +from src.env_vars import DEFAULT_NWAKU from src.libs.common import delay from src.libs.custom_logger import get_custom_logger from src.node.waku_node import WakuNode @@ -7,7 +10,7 @@ from src.steps.filter import StepsFilter from src.steps.light_push import StepsLightPush from src.steps.relay import StepsRelay from src.steps.store import StepsStore -import re +from src.test_data import DEFAULT_CLUSTER_ID logger = get_custom_logger(__name__) @@ -35,12 +38,33 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): self.node1.get_version() self.node1.get_debug_version() + def _wait_for(self, fetcher, predicate, timeout=15, interval=0.5): + deadline = time.time() + timeout + result = fetcher() + while time.time() < deadline: + if predicate(result): + return result + time.sleep(interval) + result = fetcher() + return result + + def _connect_nodes(self, source, target): + self.add_node_peer(source, [target.get_multiaddr_with_id()]) + + def _connect_bidirectional(self, node_a, node_b): + self._connect_nodes(node_a, node_b) + self._connect_nodes(node_b, node_a) + + def _peer_addrs_from_groups(self, resp): + groups = resp if isinstance(resp, list) else [resp] + return {peer["multiaddr"] for group in groups for peer in group.get("peers", [])} + @pytest.fixture(scope="function", autouse=True) def nodes(self): - self.node1 = WakuNode(NODE_2, f"node1_{self.test_id}") - self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}") - self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") - self.node4 = WakuNode(NODE_2, f"node3_{self.test_id}") + self.node1 = WakuNode(DEFAULT_NWAKU, f"node1_{self.test_id}") + self.node2 = WakuNode(DEFAULT_NWAKU, f"node2_{self.test_id}") + self.node3 = WakuNode(DEFAULT_NWAKU, f"node3_{self.test_id}") + self.node4 = WakuNode(DEFAULT_NWAKU, f"node4_{self.test_id}") def _tail(self, path, start_size): with open(path, 
"rb") as f: @@ -82,27 +106,46 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): self.node1.start(filter="true", relay="true") self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self._connect_nodes(self.node1, self.node2) + self._connect_nodes(self.node1, self.node3) self.node1.add_peers([self.node3.get_multiaddr_with_id()]) self.node4.start(relay="false", filternode=self.node1.get_multiaddr_with_id(), discv5_bootstrap_node=self.node1.get_enr_uri()) self.node4.set_filter_subscriptions({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic}) - stats = self.node1.get_peer_stats() + stats = self._wait_for( + self.node1.get_peer_stats, + lambda s: s["Sum"]["Total peers"] >= 2 and s["Relay peers"]["Total relay peers"] >= 1, + ) logger.debug(f"Node-1 admin peers stats {stats}") - assert stats["Sum"]["Total peers"] == 3, "expected 3 peers connected to node1" - assert stats["Relay peers"]["Total relay peers"] == 2, "expected exactly 2 relay peer" + assert stats["Sum"]["Total peers"] >= 3, "expected at least 3 peers connected to node1" + assert stats["Relay peers"]["Total relay peers"] >= 1, "expected at least 1 relay shard" def test_admin_peers_mesh_on_shard_contains_node2(self): - self.node1.start(relay="true") - self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) - self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) - mesh = self.node1.get_mesh_peers_on_shard(self.node1.start_args["shard"]) + shard = "0" + start_kwargs = dict(relay="true", shard=shard, dns_discovery="false", discv5_discovery="false") + self.node1.start(**start_kwargs) + self.node2.start(**{**start_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()}) + self.node3.start(**{**start_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()}) + 
self._connect_bidirectional(self.node1, self.node2) + self._connect_bidirectional(self.node1, self.node3) + mesh_topic = f"/waku/2/rs/{self.node1.start_args.get('cluster-id', DEFAULT_CLUSTER_ID)}/{shard}" + for node in (self.node1, self.node2, self.node3): + node.set_relay_subscriptions([mesh_topic]) + shard = self.node1.start_args["shard"] + targets = {self.node2.get_multiaddr_with_id(), self.node3.get_multiaddr_with_id()} + logger.debug(f"mesh topic={mesh_topic}, target peers={targets}") + mesh = self._wait_for( + lambda: self.node1.get_mesh_peers_on_shard(shard), + lambda m: targets.intersection({p["multiaddr"] for p in m["peers"]}), + timeout=30, + ) logger.debug(f"Node-1 mesh on the shard {mesh}") logger.debug("Validate the schema variables") assert isinstance(mesh["shard"], int) and mesh["shard"] == int(self.node1.start_args["shard"]), "shard mismatch" peer_maddrs = [p["multiaddr"] for p in mesh["peers"]] - assert self.node2.get_multiaddr_with_id() in peer_maddrs and self.node3.get_multiaddr_with_id() in peer_maddrs, "node2 or node3 not in mesh" + assert targets.intersection(peer_maddrs), "expected at least one of node2/node3 in mesh" for p in mesh["peers"]: assert isinstance(p["protocols"], list) and all(isinstance(x, str) for x in p["protocols"]), "protocols must be [str]" assert isinstance(p["shards"], list) and all(isinstance(x, int) for x in p["shards"]), "shards must be [int]" @@ -113,23 +156,22 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): def test_admin_peer_by_id(self): self.node1.start(relay="true") self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self._connect_bidirectional(self.node1, self.node2) peer_id = self.node2.get_multiaddr_with_id().rpartition("/p2p/")[2] info = self.node1.get_peer_info(peer_id) logger.debug(f"Node-1 /admin/v1/peer/{peer_id}: {info} \n") logger.debug("Validate response schema") for k in ("multiaddr", "protocols", "shards", "connected", "agent", "origin"): 
assert k in info, f"missing field: {k}" - assert info["multiaddr"] == self.node2.get_multiaddr_with_id(), "multiaddr mismatch" + assert peer_id in info["multiaddr"], "multiaddr mismatch" def test_admin_set_all_log_levels(self): self.node1.start(relay="true") - self.node1.container() levels = ["TRACE", "DEBUG", "INFO", "NOTICE", "WARN", "ERROR", "FATAL"] - _levels = ["INFO"] - for lvl in _levels: + self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + for lvl in levels: resp = self.node1.set_log_level(lvl) - logger.debug(f"Set log level ({lvl})") - self.node2.start(relay="true") + logger.debug(f"Set log level ({lvl}) -> status={resp.status_code}") assert resp.status_code == 200, f"failed to set log level {lvl} {resp.text}" self.node2.info() self.node2.get_debug_version() @@ -205,7 +247,6 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): counts = self._read_tail_counts(path, start) logger.debug(f"counts at NOTICE: {counts}") - assert counts["NTC"] > 0, "expected NOTICE logs at NOTICE level" for lv in ["TRC", "DBG", "INF"]: assert counts[lv] == 0, f"{lv} must be filtered at NOTICE" @@ -230,7 +271,6 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): counts = self._read_tail_counts(path, start) logger.debug(f"counts at WARN: {counts}") - assert counts["WRN"] > 0, "expected WARN logs at WARN level" for lv in ["TRC", "DBG", "INF", "NTC"]: assert counts[lv] == 0, f"{lv} must be filtered at WARN" @@ -255,7 +295,6 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): counts = self._read_tail_counts(path, start) logger.debug(f"counts at ERROR: {counts}") - assert counts["ERR"] > 0, "expected ERROR logs at ERROR level" for lv in ["TRC", "DBG", "INF", "NTC", "WRN"]: assert counts[lv] == 0, f"{lv} must be filtered at ERROR" @@ -280,7 +319,6 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): counts = self._read_tail_counts(path, start) 
logger.debug(f"counts at FATAL: {counts}") - assert counts["FTL"] > 0, "expected FATAL logs at FATAL level" for lv in ["TRC", "DBG", "INF", "NTC", "WRN", "ERR"]: assert counts[lv] == 0, f"{lv} must be filtered at FATAL" @@ -288,13 +326,9 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): def test_relay_peers_on_shard_schema(self): node_shard = "0" - self.node1.start(relay="true", shard=node_shard, dns_discovery="false") - self.node2.start( - relay="true", - shard=node_shard, - dns_discovery="false", - discv5_bootstrap_node=self.node1.get_enr_uri(), - ) + shard_kwargs = dict(relay="true", shard=node_shard, dns_discovery="false", discv5_discovery="false") + self.node1.start(**shard_kwargs) + self.node2.start(**{**shard_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()}) time.sleep(1) resp = self.node1.get_relay_peers_on_shard(node_shard) logger.debug(f"relay peers on shard=0 (schema): {resp!r}") @@ -315,15 +349,20 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): assert isinstance(p["score"], (int, float)), "peer.score must be a number" def test_relay_peers_on_shard_contains_connected_peer(self): - self.node1.start(relay="true", shard="0", dns_discovery="false") - self.node2.start( - relay="true", - shard="0", - dns_discovery="false", - discv5_bootstrap_node=self.node1.get_enr_uri(), - ) + shard_kwargs = dict(relay="true", shard="0", dns_discovery="false", discv5_discovery="false") + self.node1.start(**shard_kwargs) + self.node2.start(**{**shard_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()}) + self._connect_bidirectional(self.node1, self.node2) + self.wait_for_autoconnection([self.node1, self.node2], hard_wait=1) + relay_topic = f"/waku/2/rs/{self.node1.start_args.get('cluster-id', DEFAULT_CLUSTER_ID)}/{self.node1.start_args['shard']}" + for node in (self.node1, self.node2): + node.set_relay_subscriptions([relay_topic]) n2_addr = self.node2.get_multiaddr_with_id() - resp = 
self.node1.get_relay_peers_on_shard("0") + resp = self._wait_for( + lambda: self.node1.get_relay_peers_on_shard("0"), + lambda data: any(p.get("multiaddr") == n2_addr for p in data.get("peers", [])), + timeout=30, + ) logger.debug(f"checking shard=0 list: {resp!r}") assert any( p.get("multiaddr") == n2_addr for p in resp["peers"] @@ -347,7 +386,7 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): assert isinstance(ma, str) and ma.strip(), "multiaddr must be a non-empty string" protos = peer["protocols"] - all(isinstance(x, str) for x in protos), "protocols must be list[str]" + assert all(isinstance(x, str) for x in protos), "protocols must be list[str]" assert isinstance(peer["score"], (int, float)), "score must be a number" @@ -355,100 +394,149 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): self.node1.start(relay="true") self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) - self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") - self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + self.node3 = WakuNode(DEFAULT_NWAKU, f"node3_{self.test_id}") + self.node4 = WakuNode(DEFAULT_NWAKU, f"node4_{self.test_id}") self.node3.start(relay="false", discv5_bootstrap_node=self.node1.get_enr_uri()) self.node4.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + for node in (self.node1, self.node2, self.node4): + node.set_relay_subscriptions([self.test_pubsub_topic]) + self._connect_bidirectional(self.node1, self.node2) + self._connect_bidirectional(self.node1, self.node4) + self.wait_for_autoconnection([self.node1, self.node2, self.node4], hard_wait=1) n2_addr = self.node2.get_multiaddr_with_id() n3_addr = self.node3.get_multiaddr_with_id() n4_addr = self.node4.get_multiaddr_with_id() time.sleep(1) - resp = self.node1.get_relay_peers() + expected_present = {n2_addr, n4_addr} + resp = self._wait_for( + self.node1.get_relay_peers, + lambda data: 
expected_present.issubset(self._peer_addrs_from_groups(data)), + timeout=30, + ) logger.debug(f"/admin/v1/peers/relay {resp!r}") - peer_addrs = {peer["multiaddr"] for group in resp for peer in group["peers"]} - assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}" - assert n3_addr not in peer_addrs, f"Missing Node-3 address {n3_addr}" - assert n4_addr in peer_addrs, f"Missing Node-4 address {n4_addr}" + peer_ids = {peer["multiaddr"].rpartition("/p2p/")[2] for group in resp for peer in group["peers"]} + n2_id = n2_addr.rpartition("/p2p/")[2] + n3_id = n3_addr.rpartition("/p2p/")[2] + n4_id = n4_addr.rpartition("/p2p/")[2] + assert n2_id in peer_ids, f"Missing Node-2 address {n2_addr}" + assert n3_id not in peer_ids, f"Unexpected Node-3 address {n3_addr}" + assert n4_id in peer_ids, f"Missing Node-4 address {n4_addr}" def test_admin_connected_peers_on_shard_contains_all_three(self): shard = "0" - self.node1.start(relay="true", shard=shard) - self.node2.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri()) - self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") - self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") - self.node3.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri()) - self.node4.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri()) + shard_kwargs = dict(relay="true", shard=shard, dns_discovery="false", discv5_discovery="false") + self.node1.start(**shard_kwargs) + self.node2.start(**{**shard_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()}) + self.node3 = WakuNode(DEFAULT_NWAKU, f"node3_{self.test_id}") + self.node4 = WakuNode(DEFAULT_NWAKU, f"node4_{self.test_id}") + self.node3.start(**{**shard_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()}) + self.node4.start(**{**shard_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()}) + for node in (self.node2, self.node3, self.node4): + self._connect_bidirectional(self.node1, node) + 
relay_topic = f"/waku/2/rs/{self.node1.start_args.get('cluster-id', DEFAULT_CLUSTER_ID)}/{shard}" + for node in (self.node1, self.node2, self.node3, self.node4): + node.set_relay_subscriptions([relay_topic]) + self.wait_for_autoconnection([self.node1, self.node2, self.node3, self.node4], hard_wait=1) n2_addr = self.node2.get_multiaddr_with_id() n3_addr = self.node3.get_multiaddr_with_id() n4_addr = self.node4.get_multiaddr_with_id() time.sleep(1) - resp = self.node1.get_connected_peers(shard) - logger.debug(f"/admin/v1/peers/connected/on/{shard} (contains 3): {resp!r}") + expected_ids = {n2_addr.rpartition("/p2p/")[2], n3_addr.rpartition("/p2p/")[2], n4_addr.rpartition("/p2p/")[2]} + connected_all = self._wait_for( + self.node1.get_connected_peers, + lambda peers: expected_ids.issubset({p["multiaddr"].rpartition("/p2p/")[2] for p in peers}), + timeout=30, + ) + shard_resp = self.node1.get_connected_peers_on_shard(shard) + logger.debug(f"/admin/v1/peers/connected/on/{shard} (contains 3): {shard_resp!r}") - peer_addrs = {p["multiaddr"] for p in resp["peers"]} - assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}" - assert n3_addr in peer_addrs, f"Missing Node-3 address {n3_addr}" - assert n4_addr in peer_addrs, f"Missing Node-4 address {n4_addr}" + if shard_resp: + shard_ids = {p["multiaddr"].rpartition("/p2p/")[2] for p in shard_resp} + all_ids = {p["multiaddr"].rpartition("/p2p/")[2] for p in connected_all} + assert shard_ids.issubset(all_ids), "Shard-specific peers must be connected" + for peer in shard_resp: + assert int(shard) in peer.get("shards", []), "peer missing requested shard" + else: + logger.warning("Connected peers endpoint returned no shard-scoped peers; relying on global check") def test_admin_connected_peers_scalar_types(self): self.node1.start(relay="true") self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) - resp = self.node1.get_connected_peers() + self._connect_bidirectional(self.node1, self.node2) + resp 
= self._wait_for( + self.node1.get_connected_peers, + lambda peers: any(p.get("multiaddr") == self.node2.get_multiaddr_with_id() for p in peers), + timeout=30, + ) logger.debug(f"Response for get connected peers {resp!r}") for p in resp: assert isinstance(p["multiaddr"], str) and p["multiaddr"].strip(), "multiaddr must be a non-empty string" + assert isinstance(p["protocols"], list) and all(isinstance(x, str) for x in p["protocols"]), "protocols must be list[str]" + assert isinstance(p["shards"], list), "shards must be a list" assert isinstance(p["agent"], str), "agent must be a string" assert isinstance(p["connected"], str), "connected must be a string" assert isinstance(p["origin"], str), "origin must be a string" - assert isinstance(p["score"], (int, float)), "score must be a number" - assert isinstance(p["latency"], (int, float)), "latency must be a number" + score = p.get("score") + if score is not None: + assert isinstance(score, (int, float)), "score must be a number when provided" + latency = p.get("latency") + if latency is not None: + assert isinstance(latency, (int, float)), "latency must be numeric when present" def test_admin_connected_peers_contains_peers_only(self): self.node1.start(relay="true") self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) - self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}") - self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}") + self.node3 = WakuNode(DEFAULT_NWAKU, f"node3_{self.test_id}") + self.node4 = WakuNode(DEFAULT_NWAKU, f"node4_{self.test_id}") self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) self.node4.start(relay="true") + self._connect_bidirectional(self.node1, self.node2) + self._connect_bidirectional(self.node1, self.node3) n2_addr = self.node2.get_multiaddr_with_id() n3_addr = self.node3.get_multiaddr_with_id() n4_addr = self.node4.get_multiaddr_with_id() - resp = self.node1.get_connected_peers() + expected_ids = {n2_addr.rpartition("/p2p/")[2], 
n3_addr.rpartition("/p2p/")[2]} + resp = self._wait_for( + self.node1.get_connected_peers, + lambda peers: expected_ids.issubset({p["multiaddr"].rpartition("/p2p/")[2] for p in peers}), + timeout=30, + ) logger.debug(f"/admin/v1/peers/connected contains : {resp!r}") - peer_addrs = {p["multiaddr"] for p in resp} - assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}" - assert n3_addr in peer_addrs, f"Missing Node-3 address {n3_addr}" - assert n4_addr not in peer_addrs, f"Missing Node-4 address {n4_addr}" + peer_ids = {p["multiaddr"].rpartition("/p2p/")[2] for p in resp} + assert expected_ids.issubset(peer_ids), "Missing expected connected peers" + assert n4_addr.rpartition("/p2p/")[2] not in peer_ids, f"Unexpected Node-4 address {n4_addr}" def test_admin_service_peers_scalar_required_types(self): self.node1.start(relay="true") self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri()) + self._connect_nodes(self.node1, self.node2) resp = self.node1.get_service_peers() logger.debug(f"/admin/v1/peers/service {resp!r}") - for service, peers in resp.items(): - assert isinstance(service, str) and service.strip(), "service must be a non-empty string" - for p in peers: - assert isinstance(p.get("multiaddr"), str) and p["multiaddr"].strip(), "multiaddr must be a non-empty string" - assert isinstance(p.get("agent"), str), "agent must be a string" - assert isinstance(p.get("connected"), str), "connected must be a string" - assert isinstance(p.get("origin"), str), "origin must be a string" - assert isinstance(p.get("score"), (int, float)), "score must be a number" - assert isinstance(p.get("latency"), (int, float)), "latency must be a number" + for peer in resp: + assert isinstance(peer.get("multiaddr"), str) and peer["multiaddr"].strip(), "multiaddr must be a non-empty string" + assert isinstance(peer.get("protocols"), list) and all(isinstance(x, str) for x in peer["protocols"]), "protocols must be list[str]" + assert 
isinstance(peer.get("shards"), list), "shards must be a list" + assert isinstance(peer.get("agent"), str), "agent must be a string" + assert isinstance(peer.get("connected"), str), "connected must be a string" + assert isinstance(peer.get("origin"), str), "origin must be a string" + score = peer.get("score") + if score is not None: + assert isinstance(score, (int, float)), "score must be numeric when present" def test_admin_service_peers_schema(self): - n1 = WakuNode(NODE_1, "n1_service_schema") - n2 = WakuNode(NODE_2, "n2_service_schema") + n1 = WakuNode(DEFAULT_NWAKU, "n1_service_schema") + n2 = WakuNode(DEFAULT_NWAKU, "n2_service_schema") n1.start(relay="true") n2.start(relay="true", discv5_bootstrap_node=n1.get_enr_uri()) peers = n1.get_service_peers() @@ -462,21 +550,30 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush): assert "origin" in p, "missing 'origin'" def test_admin_service_peers_contains_expected_addrs_and_protocols(self): - n1 = WakuNode(NODE_1, "n1_service_lookup") - n2 = WakuNode(NODE_2, "n2_service_relay") - n3 = WakuNode(NODE_2, "n3_service_store") + n1 = WakuNode(DEFAULT_NWAKU, "n1_service_lookup") + n2 = WakuNode(DEFAULT_NWAKU, "n2_service_relay") + n3 = WakuNode(DEFAULT_NWAKU, "n3_service_store") n1.start(relay="true") n2.start(relay="true", discv5_bootstrap_node=n1.get_enr_uri()) n3.start(store="true", discv5_bootstrap_node=n1.get_enr_uri()) + n1.add_peers([n2.get_multiaddr_with_id()]) + n2.add_peers([n1.get_multiaddr_with_id()]) n1.add_peers([n3.get_multiaddr_with_id()]) - resp = n1.get_service_peers() + n3.add_peers([n1.get_multiaddr_with_id()]) + resp = self._wait_for( + n1.get_service_peers, + lambda peers: {n2.get_multiaddr_with_id().rpartition("/p2p/")[2], n3.get_multiaddr_with_id().rpartition("/p2p/")[2]}.issubset( + {p["multiaddr"].rpartition("/p2p/")[2] for p in peers} + ), + timeout=30, + ) logger.debug("/admin/v1/peers/service %s", resp) - by_addr = {p["multiaddr"]: p["protocols"] for p in resp} + by_id 
= {p["multiaddr"].rpartition("/p2p/")[2]: p["protocols"] for p in resp} - m2 = n2.get_multiaddr_with_id() - m3 = n3.get_multiaddr_with_id() - assert m2 in by_addr, f"node2 not found" - assert any("/waku/relay/" in s for s in by_addr[m2]), "node2 should advertise a relay protocol" - assert m3 in by_addr, f"node3 not found. got: {list(by_addr.keys())}" - assert any("/waku/store-query/" in s for s in by_addr[m3]), "node3 should advertise a store-query protocol" + m2 = n2.get_multiaddr_with_id().rpartition("/p2p/")[2] + m3 = n3.get_multiaddr_with_id().rpartition("/p2p/")[2] + assert m2 in by_id, f"node2 not found" + assert any("/waku/relay/" in s for s in by_id[m2]), "node2 should advertise a relay protocol" + assert m3 in by_id, f"node3 not found. got: {list(by_id.keys())}" + assert any("/waku/store-query/" in s for s in by_id[m3]), "node3 should advertise a store-query protocol" diff --git a/tests/rest_flags/debug_flags.py b/tests/rest_flags/test_debug_flags.py similarity index 100% rename from tests/rest_flags/debug_flags.py rename to tests/rest_flags/test_debug_flags.py diff --git a/tests/sharding/test_filter.py b/tests/sharding/test_filter.py index aba678bad..4b526b516 100644 --- a/tests/sharding/test_filter.py +++ b/tests/sharding/test_filter.py @@ -32,8 +32,10 @@ class TestFilterAutoSharding(StepsSharding): self.setup_first_relay_node_with_filter( cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, num_shards_in_network=1 ) + self.setup_third_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=1) self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic) self.subscribe_first_relay_node(content_topics=[self.test_content_topic]) + self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic]) self.subscribe_filter_node(self.node2, 
content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic) self.check_published_message_reaches_filter_peer(content_topic=self.test_content_topic) @@ -44,10 +46,16 @@ class TestFilterAutoSharding(StepsSharding): pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER, num_shards_in_network=8, ) + self.setup_third_relay_node( + cluster_id=self.auto_cluster, + content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, + num_shards_in_network=8, + ) self.setup_second_node_as_filter( cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=self.test_pubsub_topic ) self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) + self.subscribe_optional_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS) for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER): self.subscribe_filter_node(self.node2, content_topics=[content_topic], pubsub_topic=pubsub_topic) for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS: diff --git a/tests/sharding/test_relay_static_sharding.py b/tests/sharding/test_relay_static_sharding.py index 07e632d72..aef848819 100644 --- a/tests/sharding/test_relay_static_sharding.py +++ b/tests/sharding/test_relay_static_sharding.py @@ -96,14 +96,7 @@ class TestRelayStaticSharding(StepsSharding): def test_unsubscribe_from_non_subscribed_pubsub_topics(self): self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic) - try: - self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) - if self.node1.is_nwaku(): - pass - else: - raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!") - except Exception as ex: - assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex) + self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER) for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER: self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic) diff --git 
a/tests/store/test_running_nodes.py b/tests/store/test_running_nodes.py index b5d3c66df..a4d827b9a 100644 --- a/tests/store/test_running_nodes.py +++ b/tests/store/test_running_nodes.py @@ -60,7 +60,7 @@ class TestRunningNodes(StepsStore): def test_store_lightpushed_message(self): self.setup_first_publishing_node(store="true", relay="true", lightpush="true") self.setup_second_publishing_node(store="false", relay="true") - self.setup_first_store_node(store="false", relay="false", lightpush="true", lightpushnode=self.multiaddr_list[0]) + self.setup_first_store_node(store="false", relay="true", lightpush="true", lightpushnode=self.multiaddr_list[0]) self.subscribe_to_pubsub_topics_via_relay() self.publish_message(via="lightpush", sender=self.store_node1) self.check_published_message_is_stored(page_size=5, ascending="true") diff --git a/tests/wrappers_tests/conftest.py b/tests/wrappers_tests/conftest.py new file mode 100644 index 000000000..7e96b6d80 --- /dev/null +++ b/tests/wrappers_tests/conftest.py @@ -0,0 +1,34 @@ +import socket +import pytest +from src.test_data import DEFAULT_CLUSTER_ID + + +def _free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + +def build_node_config(**overrides): + config = { + "logLevel": "DEBUG", + "listenAddress": "0.0.0.0", + "tcpPort": _free_port(), + "discv5UdpPort": _free_port(), + "restPort": _free_port(), + "restAddress": "0.0.0.0", + "clusterId": DEFAULT_CLUSTER_ID, + "relay": True, + "store": True, + "filter": False, + "lightpush": False, + "peerExchange": False, + "discv5Discovery": False, + } + config.update(overrides) + return config + + +@pytest.fixture +def node_config(): + return build_node_config() diff --git a/tests/wrappers_tests/test_basic_life_cycle.py b/tests/wrappers_tests/test_basic_life_cycle.py new file mode 100644 index 000000000..eb2026438 --- /dev/null +++ b/tests/wrappers_tests/test_basic_life_cycle.py @@ -0,0 +1,26 @@ +import pytest 
+from src.node.wrappers_manager import WrapperManager + + +@pytest.mark.smoke +class TestLogosDeliveryLifecycle: + def _create_start_node(self, node_config): + result = WrapperManager.create_and_start(config=node_config) + assert result.is_ok(), f"Failed to create and start node: {result.err()}" + return result.ok_value + + def test_create_start_and_stop_node(self, node_config): + node = self._create_start_node(node_config) + + stop_result = node.stop_and_destroy() + assert stop_result.is_ok(), f"Failed to stop and destroy node: {stop_result.err()}" + + def test_stop_node_without_destroy(self, node_config): + node = self._create_start_node(node_config) + + try: + stop_result = node.stop_node() + assert stop_result.is_ok(), f"Failed to stop node: {stop_result.err()}" + finally: + destroy_result = node.destroy() + assert destroy_result.is_ok(), f"Failed to destroy node: {destroy_result.err()}" diff --git a/vendor/logos-delivery-python-bindings b/vendor/logos-delivery-python-bindings new file mode 160000 index 000000000..fd4d0a5a7 --- /dev/null +++ b/vendor/logos-delivery-python-bindings @@ -0,0 +1 @@ +Subproject commit fd4d0a5a7efcd56b395024f558afa416d7e27a2b