Merge branch 'master' into Lite_protocol_part2

This commit is contained in:
AYAHASSAN287 2026-05-07 15:34:10 +03:00 committed by GitHub
commit 59225aa1f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 2038 additions and 370 deletions

View File

@ -42,8 +42,8 @@ jobs:
- uses: actions/checkout@v4
with:
repository: waku-org/waku-interop-tests
ref: SMOKE_TEST_0.0.1
repository: logos-messaging/logos-messaging-interop-tests
ref: SMOKE_TEST_STABLE
- uses: actions/setup-python@v4
with:

View File

@ -32,18 +32,18 @@ env:
RLN_CREDENTIALS: ${{ secrets.RLN_CREDENTIALS }}
jobs:
tests:
name: tests
strategy:
fail-fast: false
matrix:
shard: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17]
# total number of shards =18 means tests will split into 18 thread and run in parallel to increase execution speed
# command for sharding :
# pytest --shard-id=<shard_number> --num-shards=<total_shards>
# shard 16 for test_rln.py file as they shall run sequentially
# shard 17 for test_cursor_many_msgs.py as it takes time >7 mins
shard: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17]
# total number of shards =18 means tests will split into 18 thread and run in parallel to increase execution speed
# command for sharding :
# pytest --shard-id=<shard_number> --num-shards=<total_shards>
# shard 16 for test_rln.py file as they shall run sequentially
# shard 17 for test_cursor_many_msgs.py as it takes time >7 mins
runs-on: ubuntu-latest
timeout-minutes: 120
outputs:
@ -65,50 +65,149 @@ jobs:
jobResult_15: ${{ steps.set_result.outputs.JOB_RESULT_15 }}
jobResult_16: ${{ steps.set_result.outputs.JOB_RESULT_16 }}
jobResult_17: ${{ steps.set_result.outputs.JOB_RESULT_17 }}
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- uses: actions/checkout@v4
- name: Remove unwanted software
uses: ./.github/actions/prune-vm
- name: Remove unwanted software
uses: ./.github/actions/prune-vm
- uses: actions/setup-python@v4
with:
python-version: "3.12"
cache: "pip"
- uses: actions/setup-python@v4
with:
python-version: '3.12'
cache: 'pip'
- name: Add python bindings to PYTHONPATH
run: echo "PYTHONPATH=$(pwd)/vendor/logos-delivery-python-bindings/waku:$PYTHONPATH" >> $GITHUB_ENV
- run: pip install -r requirements.txt
- name: Install system deps for tc / nsenter
run: |
sudo apt-get update
sudo apt-get install -y \
util-linux \
iproute2 \
sudo \
ca-certificates \
curl \
make \
gcc \
g++
- name: Run tests
- name: Install Nim 2.2.4
run: |
set -euo pipefail
curl https://nim-lang.org/choosenim/init.sh -sSf | sh -s -- -y
echo "$HOME/.nimble/bin" >> "$GITHUB_PATH"
export PATH="$HOME/.nimble/bin:$PATH"
choosenim 2.2.4
nim --version
nimble --version
run: |
if [ "${{ matrix.shard }}" == "16" ]; then
pytest tests/relay/test_rln.py --alluredir=allure-results-${{ matrix.shard }}
elif [ "${{ matrix.shard }}" == "17" ]; then
pytest tests/store/test_cursor_many_msgs.py --alluredir=allure-results-${{ matrix.shard }}
elif [ "${{ matrix.shard }}" != "17" ]; then
pytest --ignore=tests/relay/test_rln.py --ignore=tests/store/test_cursor_many_msgs.py --reruns 2 --shard-id=${{ matrix.shard }} --num-shards=16 --alluredir=allure-results-${{ matrix.shard }}
fi
- run: pip install -r requirements.txt
- name: Upload allure results
if: always()
uses: actions/upload-artifact@v4
with:
name: allure-results-${{ matrix.shard }}
path: allure-results-${{ matrix.shard }}
- name: Build liblogosdelivery.so for python bindings
run: |
set -euo pipefail
- name: Set job result
id: set_result
if: always()
run: |
version="${{ matrix.shard }}"
echo "JOB_RESULT_${version}=${{ job.status }}" >> "$GITHUB_OUTPUT"
export PATH="$HOME/.nimble/bin:$PATH"
BINDINGS_DIR="$(pwd)/vendor/logos-delivery-python-bindings"
DELIVERY_DIR="$BINDINGS_DIR/vendor/logos-delivery"
mkdir -p "$BINDINGS_DIR/lib"
cd "$DELIVERY_DIR"
ln -sf waku.nimble waku.nims
# install Nim deps
nimble install -y
# do the real setup, do not fake .nimble-setup
make setup
# now build the shared library
make liblogosdelivery
SO_PATH="$(find . -type f -name 'liblogosdelivery.so' | head -n 1)"
if [ -z "$SO_PATH" ]; then
echo "liblogosdelivery.so was not built"
exit 1
fi
cp "$SO_PATH" "$BINDINGS_DIR/lib/liblogosdelivery.so"
echo "Built library:"
ls -l "$BINDINGS_DIR/lib/liblogosdelivery.so"
- name: Verify wrapper library
run: |
test -f vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so
- name: Debug Python import paths
run: |
pwd
echo "PYTHONPATH=$PYTHONPATH"
find . -maxdepth 5 | grep wrapper || true
python - <<'PY'
import sys
print("sys.path:")
for p in sys.path:
print(p)
try:
import wrapper
print("wrapper import OK:", wrapper)
except Exception as e:
print("wrapper import failed:", e)
raise
PY
- name: Run tests
run: |
export PATH="$HOME/.nimble/bin:$PATH"
export PYTHONPATH="$(pwd)/vendor/logos-delivery-python-bindings/waku:$PYTHONPATH"
if [ "${{ matrix.shard }}" == "16" ]; then
pytest tests/relay/test_rln.py \
--ignore=vendor/logos-delivery-python-bindings/tests \
--alluredir=allure-results-${{ matrix.shard }}
elif [ "${{ matrix.shard }}" == "17" ]; then
pytest tests/store/test_cursor_many_msgs.py \
--ignore=vendor/logos-delivery-python-bindings/tests \
--alluredir=allure-results-${{ matrix.shard }}
else
pytest \
--ignore=vendor/logos-delivery-python-bindings/tests \
--ignore=tests/relay/test_rln.py \
--ignore=tests/store/test_cursor_many_msgs.py \
--reruns 2 \
--shard-id=${{ matrix.shard }} \
--num-shards=16 \
--alluredir=allure-results-${{ matrix.shard }}
fi
- name: Upload allure results
if: always()
uses: actions/upload-artifact@v4
with:
name: allure-results-${{ matrix.shard }}
path: allure-results-${{ matrix.shard }}
- name: Set job result
id: set_result
if: always()
run: |
version="${{ matrix.shard }}"
echo "JOB_RESULT_${version}=${{ job.status }}" >> "$GITHUB_OUTPUT"
aggregate-reports:
runs-on: ubuntu-latest
needs: [tests]
if: always()
steps:
- name: Download all allure results
uses: actions/download-artifact@v4
@ -116,12 +215,34 @@ jobs:
path: all-results
merge-multiple: true
- name: Get allure history
if: always()
- name: Check out gh-pages history if it exists
id: checkout_gh_pages
continue-on-error: true
uses: actions/checkout@v4
with:
ref: gh-pages
path: gh-pages
fetch-depth: 1
- name: Prepare empty gh-pages fallback
if: steps.checkout_gh_pages.outcome != 'success'
run: |
mkdir -p gh-pages/${{ env.CALLER }}/last-history
- name: Remove oversized Allure text attachments
if: always()
run: |
set -euo pipefail
echo "Before cleanup:"
find all-results -type f | wc -l || true
du -sh all-results || true
find all-results -type f -name "*.txt" -delete || true
echo "After cleanup:"
find all-results -type f | wc -l || true
du -sh all-results || true
- name: Setup allure report
uses: simple-elf/allure-report-action@master
@ -131,66 +252,106 @@ jobs:
allure_results: all-results
gh_pages: gh-pages/${{ env.CALLER }}
allure_history: allure-history
keep_reports: 30
report_url: https://waku-org.github.io/waku-interop-tests/${{ env.CALLER }}
keep_reports: 10
report_url: https://logos-messaging.github.io/logos-delivery-interop-tests/${{ env.CALLER }}
- name: Fix permissions (CRITICAL)
if: always()
run: |
echo "Fixing permissions..."
sudo chown -R $USER:$USER allure-history || true
sudo chmod -R u+rwX allure-history || true
- name: Clean allure-history
if: always()
run: |
echo "Cleaning allure-history..."
# Remove ALL txt attachments from history
find allure-history -type f -name "*.txt" -delete || true
echo "After cleanup:"
du -sh allure-history || true
- name: Verify generated report content
if: always()
run: |
set -euo pipefail
test -d allure-history
test -d "allure-history/${{ github.run_number }}"
test -f "allure-history/${{ github.run_number }}/index.html"
test -d "allure-history/last-history"
echo "=== allure-history size ==="
du -sh allure-history || true
echo "=== largest files ==="
find allure-history -type f -printf '%s %p\n' | sort -nr | head -50 || true
echo "=== symlinks ==="
find allure-history -type l -ls || true
echo "=== hard-linked files ==="
find allure-history -type f -links +1 -ls || true
echo "=== top-level tree ==="
find allure-history -maxdepth 2 -type f | sort | head -100 || true
- name: Deploy report to Github Pages
uses: peaceiris/actions-gh-pages@v3
if: always()
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_branch: gh-pages
publish_dir: allure-history
destination_dir: ${{ env.CALLER }}
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_branch: gh-pages
publish_dir: ./allure-history
destination_dir: ${{ env.CALLER }}
keep_files: false
enable_jekyll: false
- name: Store output from matrix jobs
run: |
echo '${{ toJSON(needs.tests.outputs) }}' > results.json
echo '${{ toJSON(needs.tests.outputs) }}' > results.json
- name: Create job summary
if: always()
run: |
echo "## Run Information" >> $GITHUB_STEP_SUMMARY
echo "- **Event**: ${{ github.event_name }}" >> $GITHUB_STEP_SUMMARY
echo "- **Actor**: ${{ github.actor }}" >> $GITHUB_STEP_SUMMARY
echo "- **Node1**: ${{ env.NODE_1 }}" >> $GITHUB_STEP_SUMMARY
echo "- **Node2**: ${{ env.NODE_2 }}" >> $GITHUB_STEP_SUMMARY
echo "- **Additonal Nodes**: ${{ env.ADDITIONAL_NODES }}" >> $GITHUB_STEP_SUMMARY
echo "## Test Results" >> $GITHUB_STEP_SUMMARY
echo "Allure report will be available at: https://waku-org.github.io/waku-interop-tests/${{ env.CALLER }}/${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY
# Evaluate overall result
TESTS_RESULT="success"
for key in $(jq -r 'keys[]' results.json); do
result=$(jq -r --arg key "$key" '.[$key]' results.json)
echo "Key: $key, Value: $result"
# Check condition on the result
if [ "$result" != "success" ]; then
echo "Value 'success' not found at key: $key"
TESTS_RESULT="failure"
break
fi
done
# Notify Waku team
if [ "$TESTS_RESULT" != "success" ]; then
echo "There are failures with nwaku node. cc <@&1111608257824440330>" >> $GITHUB_STEP_SUMMARY
echo "## Run Information" >> $GITHUB_STEP_SUMMARY
echo "- **Event**: ${{ github.event_name }}" >> $GITHUB_STEP_SUMMARY
echo "- **Actor**: ${{ github.actor }}" >> $GITHUB_STEP_SUMMARY
echo "- **Node1**: ${{ env.NODE_1 }}" >> $GITHUB_STEP_SUMMARY
echo "- **Node2**: ${{ env.NODE_2 }}" >> $GITHUB_STEP_SUMMARY
echo "- **Additonal Nodes**: ${{ env.ADDITIONAL_NODES }}" >> $GITHUB_STEP_SUMMARY
echo "## Test Results" >> $GITHUB_STEP_SUMMARY
echo "Allure report will be available at: https://logos-messaging.github.io/logos-delivery-interop-tests/${{ env.CALLER }}/${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY
TESTS_RESULT="success"
for key in $(jq -r 'keys[]' results.json); do
result=$(jq -r --arg key "$key" '.[$key]' results.json)
echo "Key: $key, Value: $result"
if [ "$result" != "success" ]; then
echo "Value 'success' not found at key: $key"
TESTS_RESULT="failure"
break
fi
# Write result and summary to ENV
echo "TESTS_RESULT=$TESTS_RESULT" >> $GITHUB_ENV
{
echo 'JOB_SUMMARY<<EOF'
cat $GITHUB_STEP_SUMMARY
echo EOF
} >> $GITHUB_ENV
done
if [ "$TESTS_RESULT" != "success" ]; then
echo "There are failures with nwaku node. cc <@&1111608257824440330>" >> $GITHUB_STEP_SUMMARY
fi
echo "TESTS_RESULT=$TESTS_RESULT" >> $GITHUB_ENV
{
echo 'JOB_SUMMARY<<EOF'
cat $GITHUB_STEP_SUMMARY
echo EOF
} >> $GITHUB_ENV
- name: Send report to Discord
uses: rjstone/discord-webhook-notify@v1
if: always() && env.CALLER != 'manual'
with:
severity: ${{ env.TESTS_RESULT == 'success' && 'info' || 'error' }}
username: ${{ github.workflow }}
description: "## Job Result: ${{ env.TESTS_RESULT }}"
details: ${{ env.JOB_SUMMARY }}
webhookUrl: ${{ secrets.DISCORD_TEST_REPORTS_WH }}
severity: ${{ env.TESTS_RESULT == 'success' && 'info' || 'error' }}
username: ${{ github.workflow }}
description: "## Job Result: ${{ env.TESTS_RESULT }}"
details: ${{ env.JOB_SUMMARY }}
webhookUrl: ${{ secrets.DISCORD_TEST_REPORTS_WH }}

2
.gitignore vendored
View File

@ -104,3 +104,5 @@ dmypy.json
# Pyre type checker
.pyre/
third_party/logos-delivery-python-bindings

4
.gitmodules vendored Normal file
View File

@ -0,0 +1,4 @@
[submodule "vendor/logos-delivery-python-bindings"]
path = vendor/logos-delivery-python-bindings
url = https://github.com/logos-messaging/logos-delivery-python-bindings.git

View File

@ -187,7 +187,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2018 Status Research & Development GmbH
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@ -1,21 +1,21 @@
The MIT License (MIT)
Copyright (c) 2021 Status Research & Development GmbH
Copyright © 2025-2026 Logos
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
of this software and associated documentation files (the “Software”), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -1,12 +1,13 @@
# waku-interop-tests
# logos-messaging-interop-tests
Waku endtoend (e2e) interoperability test framework for the [Waku v2 protocol](https://rfc.vac.dev/spec/10/). It exercises multiple clients (nwaku, jswaku, gowaku…) in realistic network topologies and reports results via Allure.
Logos Messaging endtoend (e2e) interoperability test framework for the [Waku v2 protocol](https://rfc.vac.dev/spec/10/). It exercises multiple clients (logos-messaging-nim, jswaku, gowaku…) in realistic network topologies and reports results via Allure.
## Setup & contribution
```bash
git clone git@github.com:waku-org/waku-interop-tests.git
cd waku-interop-tests
# Use sparse checkout since the repo has large history
git clone --depth=1 git@github.com:logos-messaging/logos-messaging-interop-tests.git
cd logos-messaging-interop-tests
# create and activate a virtual environment
python -m venv .venv
@ -47,15 +48,15 @@ Every day the workflow **nim\_waku\_daily.yml** triggers against the image `waku
To launch it manually:
1. Open [https://github.com/waku-org/waku-interop-tests/actions/workflows/nim\_waku\_daily.yml](https://github.com/waku-org/waku-interop-tests/actions/workflows/nim_waku_daily.yml).
1. Open [https://github.com/logos-messaging/logos-messaging-interop-tests/actions/workflows/nim\_waku\_daily.yml](https://github.com/logos-messaging/logos-messaging-interop-tests/actions/workflows/nim_waku_daily.yml).
2. Click **Run workflow**.
3. Pick the branch you want to test (defaults to `master`) and press **Run workflow**.
### Ondemand matrix against custom *nwaku* versions
### Ondemand matrix against custom *logos-messaging-nim* versions
Use **interop\_tests.yml** when you need to test a PR or a historical image:
1. Open [https://github.com/waku-org/waku-interop-tests/actions/workflows/interop\_tests.yml](https://github.com/waku-org/waku-interop-tests/actions/workflows/interop_tests.yml).
1. Open [https://github.com/logos-messaging/logos-messaging-interop-tests/actions/workflows/interop\_tests.yml](https://github.com/logos-messaging/logos-messaging-interop-tests/actions/workflows/interop_tests.yml).
2. Press **Run workflow** and choose the branch.
3. In the *workflow inputs* field set the `nwaku_image` you want, e.g. `wakuorg/nwaku:v0.32.0`.
@ -64,21 +65,11 @@ Use **interop\_tests.yml** when you need to test a PR or a historical image:
* When the job finishes GitHub will display an **Allure Report** link in the run summary.
* The bot also posts the same link in the **Waku / testreports** Discord channel.
### Updating the CI job used from *nwaku*
### Updating the CI job used from *logos-messaging-nim*
In the **nwaku** repository itself the file `.github/workflows/test_PR_image.yml` pins the interop test version.
To update it:
In the **logos-messaging-nim** repository itself the file `.github/workflows/test_PR_image.yml` pins the interop test version to `SMOKE_TEST_STABLE`.
1. Tag the desired commit in `waku-interop-tests` and push the tag
```bash
git tag vX.Y.Z
git push origin vX.Y.Z
```
2. Edit `test_PR_image.yml` in **nwaku** and set `ref: vX.Y.Z` for the `tests` job.
![CI job location](https://github.com/user-attachments/assets/dd3f95bd-fe79-475b-92b7-891d82346382)
To update it, move the `SMOKE_TEST_STABLE` tag to point to the desired commit in `waku-interop-tests`.
## License

View File

@ -2,9 +2,13 @@
addopts = --instafail --tb=short --color=auto
log_level = DEBUG
log_cli = True
norecursedirs =
vendor
nimbledeps
*.egg-info
log_file = log/test.log
log_cli_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s
log_file_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s
timeout = 300
markers =
smoke: marks tests as smoke test (deselect with '-m "not smoke"')
smoke: marks tests as smoke test (deselect with '-m "not smoke"')

View File

@ -40,3 +40,5 @@ typing_extensions==4.9.0
urllib3==2.2.2
virtualenv==20.25.0
pytest-shard==0.1.2
result==0.17.0
cffi

View File

@ -16,6 +16,7 @@ def get_env_var(var_name, default=None):
# Configuration constants. Need to be upercase to appear in reports
DEFAULT_NWAKU = "wakuorg/nwaku:latest"
STRESS_ENABLED = False
USE_WRAPPERS = True
NODE_1 = get_env_var("NODE_1", DEFAULT_NWAKU)
NODE_2 = get_env_var("NODE_2", DEFAULT_NWAKU)
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NWAKU},{DEFAULT_NWAKU},{DEFAULT_NWAKU}")
@ -32,4 +33,4 @@ PG_USER = get_env_var("POSTGRES_USER", "postgres")
PG_PASS = get_env_var("POSTGRES_PASSWORD", "test123")
# example for .env file
# RLN_CREDENTIALS = {"rln-relay-cred-password": "password", "rln-relay-eth-client-address": "wss://sepolia.infura.io/ws/v3/api_key", "rln-relay-eth-contract-address": "0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4", "rln-relay-eth-private-key-1": "1111111111111111111111111111111111111111111111111111111111111111", "rln-relay-eth-private-key-2": "1111111111111111111111111111111111111111111111111111111111111111"}
# RLN_CREDENTIALS = {"rln-relay-cred-password": "password", "rln-relay-eth-client-address": "https://rpc.sepolia.linea.build", "rln-relay-eth-contract-address": "0xB9cd878C90E49F797B4431fBF4fb333108CB90e6", "rln-relay-eth-private-key-1": "", "rln-relay-eth-private-key-2": "", "rln-relay-eth-private-key-3": "", "rln-relay-eth-private-key-4": "", "rln-relay-eth-private-key-5": ""}

View File

@ -5,16 +5,19 @@ import random
import re
import shutil
import string
import subprocess
import pytest
import requests
from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed
from tenacity import retry, stop_after_delay, wait_fixed, sleep
from docker.errors import NotFound as DockerNotFound
from src.node.api_clients.rest import REST
from src.node.docker_mananger import DockerManager
from src.env_vars import DOCKER_LOG_DIR
from src.data_storage import DS
from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS
from src.node.wrappers_manager import WrapperManager
logger = get_custom_logger(__name__)
@ -37,8 +40,24 @@ def sanitize_docker_flags(input_flags):
@retry(stop=stop_after_delay(180), wait=wait_fixed(0.5), reraise=True)
def rln_credential_store_ready(creds_file_path, single_check=False):
def rln_credential_store_ready(creds_file_path, single_check=False, require_credentials=False):
if os.path.exists(creds_file_path):
subprocess.run(["sudo", "-n", "chmod", "a+r", creds_file_path], check=False)
if require_credentials:
try:
with open(creds_file_path, "r", encoding="utf-8") as creds_file:
keystore_data = json.load(creds_file)
except (OSError, json.JSONDecodeError) as ex:
if single_check:
return False
raise ValueError(f"Failed to parse RLN keystore at {creds_file_path}: {ex}")
credentials = keystore_data.get("credentials", {}) if isinstance(keystore_data, dict) else {}
if not credentials:
if single_check:
return False
raise ValueError(f"RLN keystore exists but has no credentials yet: {creds_file_path}")
return True
elif not single_check:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), creds_file_path)
@ -82,12 +101,27 @@ class WakuNode:
self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log")
self._docker_manager = DockerManager(self._image_name)
self._container = None
self.rln_membership_index = None
self.start_args = {}
self._wrapper_node = None
self._rln_creds_set = False
logger.debug(f"WakuNode instance initialized with log path {self._log_path}")
@property
def _is_wrapper(self) -> bool:
return self._wrapper_node is not None
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
def start(self, wait_for_node_sec=20, **kwargs):
def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs):
logger.debug("Starting Node...")
default_args, remove_container = self._prepare_start_context(**kwargs)
if use_wrapper:
self._start_wrapper(default_args, wait_for_node_sec)
else:
self._start_docker(default_args, remove_container, wait_for_node_sec)
def _prepare_start_context(self, **kwargs):
self._docker_manager.create_network()
self._ext_ip = self._docker_manager.generate_random_ext_ip()
self._ports = self._docker_manager.generate_ports()
@ -164,19 +198,24 @@ class WakuNode:
del default_args["pubsub-topic"]
rln_args, rln_creds_set, keystore_path = self.parse_rln_credentials(default_args, False)
self._rln_creds_set = rln_creds_set
default_args.pop("rln-creds-id", None)
default_args.pop("rln-creds-source", None)
default_args.pop("rln-keystore-prefix", None)
if rln_creds_set:
rln_credential_store_ready(keystore_path)
rln_credential_store_ready(keystore_path, require_credentials=True)
default_args.update(rln_args)
else:
logger.info(f"RLN credentials not set or credential store not available, starting without RLN")
logger.debug(f"Using volumes {self._volumes}")
self.start_args = dict(default_args)
return default_args, remove_container
def _start_docker(self, default_args, remove_container, wait_for_node_sec):
logger.debug(f"Using volumes {self._volumes}")
self._container = self._docker_manager.start_container(
self._docker_manager.image,
ports=self._ports,
@ -186,16 +225,61 @@ class WakuNode:
volumes=self._volumes,
remove_container=remove_container,
)
logger.debug(f"Started container from image {self._image_name}. REST: {self._rest_port}")
DS.waku_nodes.append(self)
delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly
delay(1)
try:
self.ensure_ready(timeout_duration=wait_for_node_sec, rln_required=self._rln_creds_set)
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
def _start_wrapper(self, default_args, wait_for_node_sec):
logger.debug("Starting node using wrappers")
wrapper_config = self._default_args_to_wrapper_config(default_args)
result = WrapperManager.create_and_start(config=wrapper_config, timeout_s=wait_for_node_sec)
if result.is_err():
raise RuntimeError(f"Failed to start wrapper node: {result.err()}")
self._wrapper_node = result.ok_value
logger.debug(f"Started wrapper node. REST: {self._rest_port}")
DS.waku_nodes.append(self)
delay(1)
try:
self.ensure_ready(timeout_duration=wait_for_node_sec)
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
def _default_args_to_wrapper_config(self, default_args):
def _bool(key, default="false"):
return default_args.get(key, default).lower() == "true"
bootstrap = default_args.get("discv5-bootstrap-node")
return {
"logLevel": default_args.get("log-level", "DEBUG"),
"mode": "Core",
"networkingConfig": {
"listenIpv4": default_args.get("listen-address", "0.0.0.0"),
"p2pTcpPort": int(default_args["tcp-port"]),
"discv5UdpPort": int(default_args["discv5-udp-port"]),
"restPort": int(default_args["rest-port"]),
"restAddress": default_args.get("rest-address", "0.0.0.0"),
},
"protocolsConfig": {
"clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)),
"relay": _bool("relay"),
"store": _bool("store"),
"filter": _bool("filter"),
"lightpush": _bool("lightpush"),
"peerExchange": _bool("peer-exchange"),
"discv5Discovery": _bool("discv5-discovery", "true"),
"discv5BootstrapNodes": [bootstrap] if bootstrap else [],
},
}
@retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True)
def register_rln(self, **kwargs):
logger.debug("Registering RLN credentials...")
@ -221,18 +305,48 @@ class WakuNode:
logger.debug(f"Waiting for keystore {keystore_path}")
try:
rln_credential_store_ready(keystore_path)
rln_credential_store_ready(keystore_path, require_credentials=True)
self.rln_membership_index = str(self.get_rln_membership_index_from_log())
logger.debug(f"Detected RLN membership index from registration logs: {self.rln_membership_index}")
self.stop()
except Exception as ex:
logger.error(f"File {keystore_path} with RLN credentials did not become available in time {ex}")
raise
else:
logger.warn("RLN credentials not set, no action performed")
return self.rln_membership_index
@retry(stop=stop_after_delay(10), wait=wait_fixed(0.2), reraise=True)
def get_rln_membership_index_from_log(self):
if not os.path.exists(self._log_path):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), self._log_path)
with open(self._log_path, "r", encoding="utf-8", errors="ignore") as log_file:
log_data = log_file.read()
matches = re.findall(r"membershipIndex=(\d+)", log_data)
if not matches:
raise ValueError("Could not infer RLN membership index from registration logs")
return int(matches[-1])
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def stop(self):
if self._is_wrapper:
self._stop_wrapper()
else:
self._stop_docker()
def _stop_docker(self):
if self._container:
logger.debug(f"Stopping container with id {self._container.short_id}")
self._container.stop()
try:
self._container.stop()
except DockerNotFound:
logger.debug(f"Container {self._container.short_id} already exited and removed, treating as stopped.")
self._container = None
return
try:
self._container.remove()
except:
@ -240,6 +354,14 @@ class WakuNode:
self._container = None
logger.debug("Container stopped.")
def _stop_wrapper(self):
logger.debug("Stopping wrapper node")
result = self._wrapper_node.stop_and_destroy()
if result.is_err():
logger.error(f"Failed to stop wrapper node: {result.err()}")
self._wrapper_node = None
logger.debug("Wrapper node stopped and destroyed.")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def kill(self):
if self._container:
@ -267,7 +389,7 @@ class WakuNode:
logger.debug(f"Unpause container with id {self._container.short_id}")
self._container.unpause()
def ensure_ready(self, timeout_duration=10):
def ensure_ready(self, timeout_duration=10, rln_required=False):
@retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True)
def check_healthy(node=self):
self.health_response = node.health()
@ -280,9 +402,12 @@ class WakuNode:
if self.health_response.get("nodeHealth") != "READY":
raise AssertionError("Waiting for the node health status: READY")
# for p in self.health_response.get("protocolsHealth"):
# if p.get("Rln Relay") != "READY":
# raise AssertionError("Waiting for the Rln relay status: READY")
for p in self.health_response.get("protocolsHealth"):
if rln_required and "Rln Relay" in p:
if p["Rln Relay"] != "READY":
raise AssertionError("Waiting for the Rln relay status: READY")
# TODO: Remove once Rln Relay reflects true RLN status
sleep(20)
logger.info("Node protocols are initialized !!")
@ -320,6 +445,33 @@ class WakuNode:
def get_tcp_address(self):
return f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}"
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0):
if self._is_wrapper:
result = self._wrapper_node.subscribe_content_topic(content_topic, timeout_s=timeout_s)
if result.is_err():
raise RuntimeError(f"subscribe_content_topic failed: {result.err()}")
return result.ok_value
else:
return self._api.set_relay_auto_subscriptions([content_topic])
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0):
if self._is_wrapper:
result = self._wrapper_node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s)
if result.is_err():
raise RuntimeError(f"unsubscribe_content_topic failed: {result.err()}")
return result.ok_value
else:
return self._api.delete_relay_auto_subscriptions([content_topic])
def send_message(self, message: dict, *, timeout_s: float = 20.0):
if self._is_wrapper:
result = self._wrapper_node.send_message(message, timeout_s=timeout_s)
if result.is_err():
raise RuntimeError(f"send_message failed: {result.err()}")
return result.ok_value
else:
return self._api.send_relay_auto_message(message)
def info(self):
return self._api.info()
@ -428,6 +580,21 @@ class WakuNode:
def is_nwaku(self):
return "nwaku" in self.image
def prepare_rln_storage_paths(self, cwd, keystore_prefix, selected_id, reset_existing=False):
keystore_dir = os.path.join(cwd, f"keystore_{keystore_prefix}_{selected_id}")
rln_tree_dir = os.path.join(cwd, f"rln_tree_{keystore_prefix}_{selected_id}")
if reset_existing:
for path, path_name in [(keystore_dir, "keystore"), (rln_tree_dir, "rln tree")]:
if os.path.exists(path):
logger.warning(f"Resetting existing RLN {path_name} directory before registration: {path}")
shutil.rmtree(path, ignore_errors=True)
os.makedirs(keystore_dir, exist_ok=True)
os.makedirs(rln_tree_dir, exist_ok=True)
return keystore_dir, rln_tree_dir
def parse_rln_credentials(self, default_args, is_registration):
rln_args = {}
keystore_path = None
@ -440,6 +607,13 @@ class WakuNode:
return rln_args, False, keystore_path
imported_creds = json.loads(rln_creds_source)
rln_chain_id = imported_creds.get("rln-relay-chain-id")
if rln_chain_id is None:
eth_client_address = imported_creds.get("rln-relay-eth-client-address", "")
if "linea" in eth_client_address:
rln_chain_id = "59141"
elif "sepolia" in eth_client_address:
rln_chain_id = "11155111"
if len(imported_creds) < 4 or any(value is None for value in imported_creds.values()):
logger.warn(f"One or more of required RLN credentials were not set properly")
@ -448,6 +622,13 @@ class WakuNode:
eth_private_key = select_private_key(imported_creds, selected_id)
cwd = os.getcwd()
keystore_prefix = default_args.get("rln-keystore-prefix")
if not keystore_prefix:
logger.warn("rln-keystore-prefix is missing, cannot mount RLN state and keystore")
return rln_args, False, keystore_path
keystore_dir, rln_tree_dir = self.prepare_rln_storage_paths(cwd, keystore_prefix, selected_id, reset_existing=is_registration)
if self.is_nwaku():
if is_registration:
@ -470,6 +651,8 @@ class WakuNode:
{
"rln-relay-cred-path": "/keystore/keystore.json",
"rln-relay-cred-password": imported_creds["rln-relay-cred-password"],
"rln-relay-eth-client-address": imported_creds["rln-relay-eth-client-address"],
"rln-relay-eth-contract-address": imported_creds["rln-relay-eth-contract-address"],
}
)
else:
@ -483,12 +666,15 @@ class WakuNode:
}
)
keystore_path = cwd + "/keystore_" + default_args["rln-keystore-prefix"] + "_" + selected_id + "/keystore.json"
if rln_chain_id is not None:
rln_args["rln-relay-chain-id"] = str(rln_chain_id)
keystore_path = os.path.join(keystore_dir, "keystore.json")
self._volumes.extend(
[
cwd + "/rln_tree_" + default_args["rln-keystore-prefix"] + "_" + selected_id + ":/etc/rln_tree",
cwd + "/keystore_" + default_args["rln-keystore-prefix"] + "_" + selected_id + ":/keystore",
f"{rln_tree_dir}:/etc/rln_tree",
f"{keystore_dir}:/keystore",
]
)
@ -577,3 +763,9 @@ class WakuNode:
def get_peer_info(self, peer_id: str):
return self._api.get_peer(peer_id)
@property
def container_id(self) -> str:
if not self._container:
raise RuntimeError("Node container not started yet")
return self._container.id

View File

@ -0,0 +1,81 @@
import sys
from pathlib import Path
from result import Result, Ok, Err
_THIRD_PARTY = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku"
if str(_THIRD_PARTY) not in sys.path:
sys.path.insert(0, str(_THIRD_PARTY))
from wrapper import NodeWrapper as _NodeWrapper # type: ignore[import]
""""
thin manager/wrapper layer around NodeWrapper from the bindings.
It simplifies create, start, and interaction with a Waku node while returning consistent Result objects (Ok / Err).
"""
class WrapperManager:
def __init__(self, node: _NodeWrapper):
self._node = node
@classmethod
def create(
cls,
config: dict,
event_cb=None,
*,
timeout_s: float = 20.0,
) -> Result["WrapperManager", str]:
result = _NodeWrapper.create_node(config, event_cb, timeout_s=timeout_s)
if result.is_err():
return Err(result.err())
return Ok(cls(result.ok_value))
@classmethod
def create_and_start(
cls,
config: dict,
event_cb=None,
*,
timeout_s: float = 20.0,
) -> Result["WrapperManager", str]:
result = _NodeWrapper.create_and_start(config, event_cb, timeout_s=timeout_s)
if result.is_err():
return Err(result.err())
return Ok(cls(result.ok_value))
def __enter__(self) -> "WrapperManager":
return self
def __exit__(self, *_) -> None:
self.stop_and_destroy()
def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.start_node(timeout_s=timeout_s)
def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.stop_node(timeout_s=timeout_s)
def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.destroy(timeout_s=timeout_s)
def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.stop_and_destroy(timeout_s=timeout_s)
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.subscribe_content_topic(content_topic, timeout_s=timeout_s)
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s)
def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]:
return self._node.send_message(message, timeout_s=timeout_s)
def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[list[str], str]:
return self._node.get_available_node_info_ids(timeout_s=timeout_s)
def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]:
return self._node.get_node_info(node_info_id, timeout_s=timeout_s)
def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]:
return self._node.get_available_configs(timeout_s=timeout_s)

View File

@ -68,11 +68,11 @@ class StepsLightPush(StepsCommon):
self.start_receiving_node(node, node_index=index + 2, lightpush="true", relay="true", pubsub_topic=self.test_pubsub_topic, **kwargs)
@allure.step
def setup_first_lightpush_node(self, lightpush="true", relay="false", **kwargs):
def setup_first_lightpush_node(self, lightpush="true", relay="true", **kwargs):
self.light_push_node1 = self.setup_lightpush_node(NODE_2, node_index=1, lightpush=lightpush, relay=relay, **kwargs)
@allure.step
def setup_second_lightpush_node(self, lightpush="true", relay="false", **kwargs):
def setup_second_lightpush_node(self, lightpush="true", relay="true", **kwargs):
self.light_push_node2 = self.setup_lightpush_node(NODE_2, node_index=2, lightpush=lightpush, relay=relay, **kwargs)
@allure.step
@ -83,7 +83,7 @@ class StepsLightPush(StepsCommon):
pytest.skip("ADDITIONAL_NODES/node_list is empty, cannot run test")
self.additional_lightpush_nodes = []
for index, node in enumerate(nodes):
node = self.setup_lightpush_node(node, node_index=index + 2, lightpush="true", relay="false", **kwargs)
node = self.setup_lightpush_node(node, node_index=index + 2, lightpush="true", relay="true", **kwargs)
self.additional_lightpush_nodes.append(node)
@allure.step

View File

@ -0,0 +1,201 @@
import subprocess
from src.env_vars import NETWORK_NAME
from src.libs.custom_logger import get_custom_logger
logger = get_custom_logger(__name__)
class TrafficController:
def _pid(self, node) -> int:
if not node.container:
raise RuntimeError("Node container not started yet")
node.container.reload()
pid = node.container.attrs.get("State", {}).get("Pid")
if not pid or pid == 0:
raise RuntimeError("Container PID not available (container not running?)")
return int(pid)
def _exec(self, node, tc_args: list[str], iface: str = "eth0"):
pid = self._pid(node)
cmd = ["sudo", "-n", "nsenter", "-t", str(pid), "-n", "tc"] + tc_args
logger.info(f"TC exec: {cmd}")
res = subprocess.run(cmd, capture_output=True, text=True)
if res.returncode != 0:
raise RuntimeError(f"TC failed: {' '.join(cmd)}\n" f"stdout: {res.stdout}\n" f"stderr: {res.stderr}")
return res.stdout
def log_tc_stats(self, node, iface: str = "eth0"):
"""
Log tc statistics for an interface (best-effort).
Useful to confirm netem loss/delay counters (sent/dropped/etc.).
"""
try:
out = self._exec(node, ["-s", "qdisc", "show", "dev", iface], iface=iface)
out = (out or "").strip()
if out:
logger.debug(f"tc -s qdisc show dev {iface}:\n{out}")
else:
logger.debug(f"tc -s qdisc show dev {iface}: (no output)")
except Exception as e:
logger.debug(f"Failed to read tc stats for {iface}: {e}")
def clear(self, node, iface: str = "eth0"):
try:
self._exec(node, ["qdisc", "del", "dev", iface, "root"], iface=iface)
except RuntimeError as e:
msg = str(e)
if "Cannot delete qdisc with handle of zero" in msg or "No such file or directory" in msg:
return
raise
def add_latency(self, node, ms: int, iface: str = "eth0"):
self.clear(node, iface=iface)
self._exec(node, ["qdisc", "add", "dev", iface, "root", "netem", "delay", f"{ms}ms"], iface=iface)
def add_packet_loss(self, node, percent: float, iface: str = "eth0"):
self.clear(node, iface=iface)
self._exec(
node,
["qdisc", "add", "dev", iface, "root", "netem", "loss", f"{percent}%"],
iface=iface,
)
try:
stats = self._exec(node, ["-s", "qdisc", "show", "dev", iface], iface=iface)
if stats is not None:
if isinstance(stats, (bytes, bytearray)):
stats = stats.decode(errors="replace")
logger.debug(f"tc -s qdisc show dev {iface}:\n{stats}")
else:
logger.debug(f"Executed: tc -s qdisc show dev {iface} (no output returned by _exec)")
except Exception as e:
logger.debug(f"Failed to read tc stats for {iface}: {e}")
def add_bandwidth(self, node, rate: str, iface: str = "eth0"):
self.clear(node, iface=iface)
self._exec(
node,
["qdisc", "add", "dev", iface, "root", "tbf", "rate", rate, "burst", "32kbit", "limit", "12500"],
iface=iface,
)
def add_packet_loss_correlated(
self,
node,
percent: float,
correlation: float,
iface: str = "eth0",
):
self.clear(node, iface=iface)
self._exec(
node,
[
"qdisc",
"add",
"dev",
iface,
"root",
"netem",
"loss",
f"{percent}%",
f"{correlation}%",
],
iface=iface,
)
def add_packet_reordering(
self,
node,
percent: int = 25,
correlation: int = 50,
delay_ms: int = 10,
iface: str = "eth0",
):
self.clear(node, iface=iface)
self._exec(
node,
[
"qdisc",
"add",
"dev",
iface,
"root",
"netem",
"delay",
f"{delay_ms}ms",
"reorder",
f"{percent}%",
f"{correlation}%",
],
iface=iface,
)
def _p2p_iface(self, node) -> str:
"""
Return the name of the container interface attached to the waku
network (where libp2p traffic flows).
DockerManager attaches each node to two networks: the default bridge
(where host-published ports land, typically `eth0`) and the waku
network (where inter-container libp2p/gossipsub traffic flows, typically
`eth1`). tc on the default bridge only affects REST control plane; for
a packet loss test targeting libp2p we need the waku interface.
This helper resolves the correct interface by looking up the node's
waku-network IP via Docker and matching it against `ip -o -4 addr`
output from inside the container.
"""
if not node.container:
raise RuntimeError("Node container not started yet")
node.container.reload()
networks = node.container.attrs.get("NetworkSettings", {}).get("Networks", {})
waku_net = networks.get(NETWORK_NAME)
if not waku_net or not waku_net.get("IPAddress"):
raise RuntimeError(f"Container is not attached to the '{NETWORK_NAME}' docker network")
waku_ip = waku_net["IPAddress"]
exit_code, output = node.container.exec_run(["ip", "-o", "-4", "addr"])
if exit_code != 0:
raise RuntimeError(f"ip addr failed inside container: {output}")
for line in output.decode().splitlines():
if f" {waku_ip}/" in line:
tokens = line.split()
if len(tokens) >= 2:
return tokens[1]
raise RuntimeError(f"No interface inside container holds waku IP {waku_ip}")
def clear_p2p(self, node):
"""
Remove any tc rule previously installed on the node's waku (libp2p)
interface. Paired with add_packet_loss_p2p_only /
add_packet_loss_correlated_p2p_only.
"""
self.clear(node, iface=self._p2p_iface(node))
def add_packet_loss_p2p_only(self, node, percent: float):
"""
Apply uncorrelated packet loss to the waku (libp2p) network interface
of a node. REST API traffic rides a separate docker interface and is
not affected, so the test harness's control plane stays reliable.
"""
iface = self._p2p_iface(node)
self.clear(node, iface=iface)
self._exec(node, f"qdisc add dev {iface} root netem loss {percent}%".split(), iface=iface)
def add_packet_loss_correlated_p2p_only(self, node, percent: float, correlation: float):
"""
Correlated packet loss on the waku (libp2p) network interface. See
add_packet_loss_p2p_only for why REST stays unaffected.
"""
iface = self._p2p_iface(node)
self.clear(node, iface=iface)
self._exec(
node,
f"qdisc add dev {iface} root netem loss {percent}% {correlation}%".split(),
iface=iface,
)

View File

@ -26,6 +26,7 @@ class StepsRLN(StepsCommon):
multiaddr_list = []
lightpush_nodes = []
keystore_prefixes = []
rln_membership_indexes = []
@allure.step
def generate_keystore_prefixes(self, count=2):
@ -38,15 +39,19 @@ class StepsRLN(StepsCommon):
@allure.step
def register_rln_relay_nodes(self, count, orig_prefixes):
if count > 0:
logger.debug(111111111111111)
self.keystore_prefixes = self.generate_keystore_prefixes(count)
self.rln_membership_indexes = []
for i, prefix in enumerate(self.keystore_prefixes):
logger.debug(000000000000000000000)
self.register_rln_single_node(prefix=prefix, rln_creds_source=RLN_CREDENTIALS, rln_creds_id=f"{i+1}")
membership_index = self.register_rln_single_node(prefix=prefix, rln_creds_source=RLN_CREDENTIALS, rln_creds_id=f"{i+1}")
self.rln_membership_indexes.append(membership_index)
else:
self.keystore_prefixes = orig_prefixes
self.keystore_prefixes = orig_prefixes.get("keystore_prefixes", [])
self.rln_membership_indexes = orig_prefixes.get("rln_membership_indexes", [])
return self.keystore_prefixes
return {
"keystore_prefixes": self.keystore_prefixes,
"rln_membership_indexes": self.rln_membership_indexes,
}
@allure.step
def setup_main_rln_relay_nodes(self, **kwargs):
@ -60,7 +65,7 @@ class StepsRLN(StepsCommon):
relay="true",
rln_creds_source=RLN_CREDENTIALS,
rln_creds_id="1",
rln_relay_membership_index="1",
rln_relay_membership_index=self.resolve_rln_membership_index(0, **kwargs),
rln_keystore_prefix=self.keystore_prefixes[0],
**kwargs,
)
@ -78,7 +83,7 @@ class StepsRLN(StepsCommon):
discv5_bootstrap_node=self.enr_uri,
rln_creds_source=RLN_CREDENTIALS,
rln_creds_id="2",
rln_relay_membership_index="1",
rln_relay_membership_index=self.resolve_rln_membership_index(1, **kwargs),
rln_keystore_prefix=self.keystore_prefixes[1],
**kwargs,
)
@ -101,7 +106,7 @@ class StepsRLN(StepsCommon):
discv5_bootstrap_node=self.enr_uri,
rln_creds_source=RLN_CREDENTIALS,
rln_creds_id=f"{index + 3}",
rln_relay_membership_index="1",
rln_relay_membership_index=self.resolve_rln_membership_index(index + 2, **kwargs),
rln_keystore_prefix=self.keystore_prefixes[index + 2],
**kwargs,
)
@ -118,7 +123,7 @@ class StepsRLN(StepsCommon):
lightpushnode=self.multiaddr_list[0],
rln_creds_source=RLN_CREDENTIALS,
rln_creds_id="2",
rln_relay_membership_index="1",
rln_relay_membership_index=self.resolve_rln_membership_index(1, **kwargs),
rln_keystore_prefix=self.keystore_prefixes[1],
**kwargs,
)
@ -131,7 +136,23 @@ class StepsRLN(StepsCommon):
def register_rln_single_node(self, prefix="", **kwargs):
logger.debug("Registering RLN credentials for single node")
self.node = WakuNode(DEFAULT_NWAKU, f"node_{gen_step_id()}")
self.node.register_rln(rln_keystore_prefix=prefix, rln_creds_source=kwargs["rln_creds_source"], rln_creds_id=kwargs["rln_creds_id"])
return self.node.register_rln(rln_keystore_prefix=prefix, rln_creds_source=kwargs["rln_creds_source"], rln_creds_id=kwargs["rln_creds_id"])
@allure.step
def resolve_rln_membership_index(self, index, **kwargs):
explicit_index = kwargs.get("rln_relay_membership_index")
if explicit_index is not None:
return str(explicit_index)
if len(self.rln_membership_indexes) > index and self.rln_membership_indexes[index] is not None:
inferred_index = str(self.rln_membership_indexes[index])
logger.debug(f"Using inferred RLN membership index for position {index}: {inferred_index}")
return inferred_index
raise ValueError(
f"RLN membership index for position {index} is not available. "
"Register credentials and persist rln_membership_indexes together with keystore_prefixes before node startup."
)
@allure.step
def check_rln_registration(self, prefix, key_id):

View File

@ -20,9 +20,9 @@ logger = get_custom_logger(__name__)
class StepsSharding(StepsRelay):
test_content_topic = "/myapp/1/latest/proto"
test_pubsub_topic = "/waku/2/rs/2/0"
test_pubsub_topic = "/waku/2/rs/199/0"
test_payload = "Sharding works!!"
auto_cluster = 2
auto_cluster = 199
num_shards_in_network = 8
@pytest.fixture(scope="function", autouse=True)
@ -195,7 +195,13 @@ class StepsSharding(StepsRelay):
self.check_published_message_reaches_relay_peer(pubsub_topic=pubsub_topic, content_topic=content_topic)
raise AssertionError("Retrieving messages on not subscribed content topic worked!!!")
except Exception as ex:
assert "Not Found" in str(ex)
error_message = str(ex)
expected_errors = [
"Not Found",
"NoPeersToPublish",
"Failed to publish: publish failed in relay: NoPeersToPublish",
]
assert any(expected in error_message for expected in expected_errors), error_message
@allure.step
def check_publish_fails_on_not_subscribed_pubsub_topic(self, pubsub_topic):

View File

@ -22,12 +22,12 @@ SAMPLE_INPUTS = [
{"description": "Email format", "value": "user@example.com"},
{"description": "URL format", "value": "http://example.com"},
{"description": "Date and time in ISO format", "value": "2023-11-01T12:00:00Z"},
# {"description": "String with escaped quotes", "value": '"Escaped" \\"quotes\\"'}, # https://github.com/waku-org/nwaku/issues/3572
{"description": "String with escaped quotes", "value": '"Escaped" \\"quotes\\"'},
{"description": "A regular expression", "value": "Regular expression: ^[a-z0-9_-]{3,16}$"},
{"description": "A very long string", "value": "x" * 1000},
{"description": "A JSON string", "value": '{"name": "John", "age": 30, "city": "New York"}'},
{"description": "A Unix path", "value": "/usr/local/bin"},
# {"description": "A Windows path", "value": "C:\\Windows\\System32"}, # https://github.com/waku-org/nwaku/issues/3572
{"description": "A Windows path", "value": "C:\\Windows\\System32"},
{"description": "An SQL query", "value": "SELECT * FROM users WHERE id = 1;"},
{"description": "JavaScript code snippet", "value": "function test() { console.log('Hello World'); }"},
{"description": "A CSS snippet", "value": "body { background-color: #fff; }"},
@ -96,7 +96,7 @@ CONTENT_TOPICS_SHARD_7 = [
"/newsService/4.0/updates/yaml",
]
DEFAULT_CLUSTER_ID = "3"
DEFAULT_CLUSTER_ID = "198"
VALID_PUBSUB_TOPICS = [
f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0",
f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1",
@ -130,14 +130,14 @@ PUBSUB_TOPICS_DIFFERENT_CLUSTERS = [
]
PUBSUB_TOPICS_SAME_CLUSTER = [
"/waku/2/rs/2/0",
"/waku/2/rs/2/1",
"/waku/2/rs/2/2",
"/waku/2/rs/2/3",
"/waku/2/rs/2/4",
"/waku/2/rs/2/5",
"/waku/2/rs/2/6",
"/waku/2/rs/2/7",
"/waku/2/rs/199/0",
"/waku/2/rs/199/1",
"/waku/2/rs/199/2",
"/waku/2/rs/199/3",
"/waku/2/rs/199/4",
"/waku/2/rs/199/5",
"/waku/2/rs/199/6",
"/waku/2/rs/199/7",
]
PUBSUB_TOPICS_WRONG_FORMAT = [
@ -168,7 +168,7 @@ SAMPLE_TIMESTAMPS = [
{"description": "Missing", "value": None, "valid_for": []},
]
PUBSUB_TOPICS_RLN = ["/waku/2/rs/1/0"]
PUBSUB_TOPICS_RLN = [f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0"]
LOG_ERROR_KEYWORDS = [
"crash",
@ -203,14 +203,6 @@ METRICS_WITH_INITIAL_VALUE_ZERO = [
"libp2p_failed_dials_total",
"waku_rln_messages_total_total",
"waku_rln_spam_messages_total_total",
"waku_rln_valid_messages_total_sum",
"waku_rln_valid_messages_total_count",
'waku_rln_valid_messages_total_bucket{le="10.0"}',
'waku_rln_valid_messages_total_bucket{le="20.0"}',
'waku_rln_valid_messages_total_bucket{le="30.0"}',
'waku_rln_valid_messages_total_bucket{le="40.0"}',
'waku_rln_valid_messages_total_bucket{le="50.0"}',
'waku_rln_valid_messages_total_bucket{le="+Inf"}',
"waku_rln_proof_verification_total_total",
"waku_rln_number_registered_memberships",
"waku_rln_proof_verification_duration_seconds",
@ -285,40 +277,6 @@ METRICS_WITH_INITIAL_VALUE_ZERO = [
'waku_archive_query_duration_seconds_bucket{le="7.5"}',
'waku_archive_query_duration_seconds_bucket{le="10.0"}',
'waku_archive_query_duration_seconds_bucket{le="+Inf"}',
"waku_legacy_archive_insert_duration_seconds_sum",
"waku_legacy_archive_insert_duration_seconds_count",
'waku_legacy_archive_insert_duration_seconds_bucket{le="0.005"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="0.01"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="0.025"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="0.05"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="0.075"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="0.1"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="0.25"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="0.5"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="0.75"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="1.0"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="2.5"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="5.0"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="7.5"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="10.0"}',
'waku_legacy_archive_insert_duration_seconds_bucket{le="+Inf"}',
"waku_legacy_archive_query_duration_seconds_sum",
"waku_legacy_archive_query_duration_seconds_count",
'waku_legacy_archive_query_duration_seconds_bucket{le="0.005"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="0.01"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="0.025"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="0.05"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="0.075"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="0.1"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="0.25"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="0.5"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="0.75"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="1.0"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="2.5"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="5.0"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="7.5"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="10.0"}',
'waku_legacy_archive_query_duration_seconds_bucket{le="+Inf"}',
"waku_filter_subscriptions",
"waku_filter_handle_message_duration_seconds_sum",
"waku_filter_handle_message_duration_seconds_count",
@ -345,7 +303,7 @@ METRICS_WITH_INITIAL_VALUE_ZERO = [
"waku_px_peers_received_total",
"waku_px_peers_received_unknown",
"waku_px_peers_sent_total",
"waku_px_peers_cached",
"waku_px_peers",
"waku_histogram_message_size_sum",
"waku_histogram_message_size_count",
'waku_histogram_message_size_bucket{le="0.0"}',
@ -391,8 +349,8 @@ METRICS_WITH_INITIAL_VALUE_ZERO = [
'waku_filter_handle_message_duration_seconds_bucket{le="20.0"}',
'waku_filter_handle_message_duration_seconds_bucket{le="30.0"}',
"total_messages_cached",
"waku_legacy_store_queries_total",
"waku_store_queries_total",
"mix_pool_size",
"libp2p_gossipsub_imreceiving_saved_messages_total",
"postgres_payload_size_bytes",
]

View File

@ -66,7 +66,7 @@ class TestDiscv5(StepsRelay, StepsFilter, StepsStore, StepsLightPush):
self.light_push_node1 = self.running_a_node(
NODE_2,
lightpush="true",
relay="false",
relay="true",
discv5_bootstrap_node=self.receiving_node1.get_enr_uri(),
lightpushnode=self.receiving_node1.get_multiaddr_with_id(),
)

View File

@ -0,0 +1,781 @@
import pytest
from time import time, sleep
from src.libs.custom_logger import get_custom_logger
from src.env_vars import NODE_1, NODE_2
from src.node.waku_node import WakuNode
from src.steps.relay import StepsRelay
from src.libs.common import delay
from src.steps.network_conditions import TrafficController
from src.libs.common import to_base64
logger = get_custom_logger(__name__)
class TestNetworkConditions(StepsRelay):
@pytest.fixture(scope="function", autouse=True)
def setup_nodes(self, request):
self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
self.tc = TrafficController()
def test_relay_with_latency_between_two_nodes(self):
logger.info("Starting node1 and node2 with relay enabled")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
logger.info("Subscribing both nodes to relay topic")
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
logger.info("Waiting for autoconnection")
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
logger.debug("Applying 5000ms latency to node2")
self.tc.add_latency(self.node2, ms=5000)
message = self.create_message()
logger.debug("Publishing message from node1")
self.node1.send_relay_message(message, self.test_pubsub_topic)
logger.debug("Fetching relay messages on node2")
t0 = time()
messages = self.node2.get_relay_messages(self.test_pubsub_topic)
dt = time() - t0
assert messages, "Messages aren't arrive"
assert dt >= 4.5, f"Expected slow GET due to latency, got {dt}"
assert dt <= 10.5, "msg took too long"
self.tc.clear(self.node2)
@pytest.mark.timeout(60 * 8)
def test_relay_7_nodes_3sec_latency(self):
self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node5 = WakuNode(NODE_2, f"node5_{self.test_id}")
self.node6 = WakuNode(NODE_2, f"node6_{self.test_id}")
self.node7 = WakuNode(NODE_2, f"node7_{self.test_id}")
nodes = [self.node1, self.node2, self.node3, self.node4, self.node5, self.node6, self.node7]
logger.info("Starting nodes with chain bootstrap")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
self.node5.start(relay="true", discv5_bootstrap_node=self.node4.get_enr_uri())
self.node6.start(relay="true", discv5_bootstrap_node=self.node5.get_enr_uri())
self.node7.start(relay="true", discv5_bootstrap_node=self.node6.get_enr_uri())
logger.info("Subscribing all nodes to relay topic")
for node in nodes:
node.set_relay_subscriptions([self.test_pubsub_topic])
logger.info("Waiting for autoconnection")
self.wait_for_autoconnection(nodes, hard_wait=60)
logger.info("Applying 3s latency to node3")
self.tc.add_latency(self.node3, ms=3000)
t_start = time()
_ = self.node3.get_relay_messages(self.test_pubsub_topic)
elapsed = time() - t_start
logger.info(f"Observed GET latency on node3: {elapsed:.2f}s")
assert elapsed >= 2.8, f"Expected ~3s latency on node3 GET, got {elapsed:.2f}s"
@pytest.mark.timeout(60 * 6)
def test_relay_4_nodes_sender_latency(self):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
logger.info("Starting 4 nodes with relay enabled (bootstrap chain)")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
nodes = [self.node1, self.node2, self.node3, self.node4]
for n in nodes:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection(nodes, hard_wait=40)
latency_ms = 3000
logger.info(f"Applying {latency_ms}ms latency on sender node1")
self.tc.add_latency(self.node1, ms=latency_ms)
t_pub0 = time()
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
publish_dt = time() - t_pub0
assert publish_dt > ((latency_ms * 2) / 1000.0) - 0.4, f"Expected publish call to be slowed by sender latency. "
assert publish_dt <= ((latency_ms * 2) / 1000.0) + 0.4, f"Publish call took too long"
# latency is doubled as request + response both will have latency
deadline = t_pub0 + 10.0
received = False
while time() < deadline:
msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
if msgs:
received = True
break
delay(0.2)
assert received, f"node4 did not receive any relay message within {deadline}"
self.tc.clear(self.node1)
@pytest.mark.timeout(60 * 8)
def test_relay_4_nodes_two_publishers_compare_latency(self):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
logger.info("Starting 4 nodes ")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
nodes = [self.node1, self.node2, self.node3, self.node4]
for n in nodes:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection(nodes, hard_wait=60)
latency_ms = 3000
logger.debug(f"Applying {latency_ms}ms latency on node1 only")
self.tc.add_latency(self.node1, ms=latency_ms)
node1_dts = []
node2_dts = []
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
for i in range(5):
t0 = time()
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
node1_dts.append(time() - t0)
t0 = time()
self.node2.send_relay_message(
self.create_message(),
self.test_pubsub_topic,
)
node2_dts.append(time() - t0)
delay(0.2)
for dt in node1_dts:
assert dt <= ((latency_ms * 2) / 1000.0) + 0.4, "node1 publish took too long"
for dt in node2_dts:
assert dt < 1.0, f"Expected node2 publish to be fast"
deadline = time() + 10.0
received = False
while time() < deadline:
msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
if msgs:
received = True
break
delay(0.2)
assert received, f"node4 did not receive any relay message within time"
self.tc.clear(self.node1)
@pytest.mark.timeout(60 * 6)
@pytest.mark.parametrize(
"latency_ms",
[
200,
500,
1000,
5000,
7000,
],
)
def test_relay_different_latency_between_two_nodes(self, latency_ms):
logger.info("Starting node1 and node2 with relay enabled")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
logger.info("Subscribing both nodes to relay topic")
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
logger.info("Waiting for autoconnection")
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
logger.info(f"Applying {latency_ms}ms latency to node2")
self.tc.clear(self.node2)
if latency_ms > 0:
self.tc.add_latency(self.node2, ms=latency_ms)
message = self.create_message()
self.node1.send_relay_message(message, self.test_pubsub_topic)
t0 = time()
messages = self.node2.get_relay_messages(self.test_pubsub_topic)
dt = time() - t0
assert messages, "No relay messages returned (publish/relay may have failed)"
expected_s = (latency_ms / 1000.0) * 2
tolerance_s = 0.5
assert dt >= expected_s - tolerance_s, f"Expected >= {expected_s - tolerance_s}s, got {dt}s"
assert dt <= expected_s + tolerance_s, f"Expected <= {expected_s + tolerance_s}s, got {dt}s"
self.tc.clear(self.node2)
@pytest.mark.timeout(60 * 10)
def test_latency_with_load_sender_side(self):
latency_ms = 3000
total_messages = 50
wait_time = 40.0
acceptable_msgs = total_messages / 2
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
logger.info("Starting 4 nodes with relay enabled")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
logger.info("Subscribing all nodes to relay topic")
for n in [self.node1, self.node2, self.node3, self.node4]:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection([self.node1, self.node2, self.node3, self.node4], hard_wait=30)
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
logger.info(f"Applying {latency_ms}ms latency on sender node1")
self.tc.clear(self.node1)
self.tc.add_latency(self.node1, ms=latency_ms)
logger.info(f"Sending {total_messages} messages from node1")
for i in range(total_messages):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
received_count = 0
last_count = 0
ticks = 0
deadline = time() + wait_time
while time() < deadline:
msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
if len(msgs) > last_count:
ticks += 1
last_count = len(msgs)
received_count = max(received_count, len(msgs))
if received_count >= acceptable_msgs:
break
delay(1)
logger.info(f"Node4 received {received_count} messages " f"(min_expected={acceptable_msgs}, total_sent={total_messages})")
assert received_count >= acceptable_msgs, "relay stalled or dropped all traffic; "
self.tc.clear(self.node1)
def test_relay_4_nodes_sender_packet_loss(self):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
logger.info("Starting 4 nodes with relay enabled (bootstrap chain)")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
nodes = [self.node1, self.node2, self.node3, self.node4]
for n in nodes:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection(nodes, hard_wait=20)
loss_percent = 30.0
logger.info(f"Applying {loss_percent}% packet loss on sender node1")
self.tc.add_packet_loss(self.node1, percent=loss_percent)
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
self.tc.log_tc_stats(self.node1)
deadline = time() + 10.0
received = False
cnt = 0
while time() < deadline:
msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
if msgs:
received = True
break
delay(0.2)
cnt = cnt + 1
assert received, f"Node4 did not receive any relay message"
logger.info(f"Node4 received messages from node1 after " f"{cnt} trails")
self.tc.clear(self.node1)
@pytest.mark.xfail(reason="Fails under high packet loss percentage 50")
def test_relay_4_nodes_sender_packet_loss_50_15sec_timeout(self):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
nodes = [self.node1, self.node2, self.node3, self.node4]
for n in nodes:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection(nodes, hard_wait=20)
loss = 50.0
total_msgs = 30
window_s = 15.0
self.tc.clear(self.node1)
self.tc.add_packet_loss(self.node1, percent=loss)
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
batch_tag = f"loss={loss}-{self.test_id}"
batch_tag_b64 = to_base64(batch_tag)
for _ in range(total_msgs):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
self.tc.log_tc_stats(self.node1)
delay(window_s)
msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
received = len(msgs)
logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received} window={window_s}s")
self.tc.clear(self.node1)
assert received > int(total_msgs * 0.8), "No messages received under 50% packet loss"
@pytest.mark.timeout(60 * 10)
@pytest.mark.xfail(reason="Fails under high packet loss percentage")
@pytest.mark.parametrize("loss", [40.0, 60.0], ids=["loss40", "loss60"])
def test_relay_4_nodes_sender_packet_loss_delivery_ratio_simple(self, loss):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
nodes = [self.node1, self.node2, self.node3, self.node4]
for n in nodes:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection(nodes, hard_wait=20)
total_msgs = 30
window_s = 40.0
self.tc.clear(self.node1)
self.tc.add_packet_loss(self.node1, percent=loss)
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
for _ in range(total_msgs):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
self.tc.log_tc_stats(self.node1)
delay(window_s)
msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
received = len(msgs)
logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received} window={window_s}s")
self.tc.clear(self.node1)
assert received > int(total_msgs * 0.8), f"No messages received at {loss}% packet loss"
@pytest.mark.timeout(60 * 10)
def test_relay_packet_loss_correlated_vs_uncorrelated(self):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
nodes = [self.node1, self.node2, self.node3, self.node4]
for n in nodes:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection(nodes, hard_wait=20)
total_msgs = 5
window_s = 30.0
loss = 50.0
self.tc.add_packet_loss_p2p_only(self.node1, percent=loss)
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
for _ in range(total_msgs):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
delay(window_s)
uncorrelated = len(self.node4.get_relay_messages(self.test_pubsub_topic) or [])
self.tc.clear_p2p(self.node1)
self.tc.add_packet_loss_correlated_p2p_only(self.node1, percent=loss, correlation=75.0)
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
for _ in range(total_msgs):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
delay(window_s)
correlated = len(self.node4.get_relay_messages(self.test_pubsub_topic) or [])
self.tc.clear_p2p(self.node1)
logger.debug(f"uncorrelated={uncorrelated} correlated={correlated}")
assert uncorrelated >= correlated
assert correlated > 0
@pytest.mark.timeout(60 * 10)
def test_relay_packet_loss_sender_vs_receiver(self):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
nodes = [self.node1, self.node2, self.node3, self.node4]
for n in nodes:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection(nodes, hard_wait=20)
total_msgs = 30
window_s = 30.0
loss = 50.0
self.tc.add_packet_loss(self.node1, percent=loss)
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
for _ in range(total_msgs):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
delay(window_s)
sender_loss = len(self.node4.get_relay_messages(self.test_pubsub_topic) or [])
self.tc.clear(self.node1)
self.tc.add_packet_loss(self.node4, percent=loss)
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
for _ in range(total_msgs):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
delay(window_s)
receiver_loss = len(self.node4.get_relay_messages(self.test_pubsub_topic) or [])
logger.debug(f"sender_loss={sender_loss} receiver_loss={receiver_loss}")
self.tc.clear(self.node4)
assert sender_loss > 0
assert receiver_loss > 0
@pytest.mark.timeout(60 * 10)
def test_relay_packet_loss_applied_mid_way(self):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
nodes = [self.node1, self.node2, self.node3, self.node4]
for n in nodes:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection(nodes, hard_wait=20)
window_s = 30.0
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
for _ in range(10):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
self.tc.add_packet_loss(self.node1, percent=50.0)
for _ in range(20):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
delay(window_s)
received = len(self.node4.get_relay_messages(self.test_pubsub_topic) or [])
self.tc.clear(self.node1)
assert received > 0
@pytest.mark.timeout(60 * 2)
def test_relay_2_nodes_low_bandwidth_reliability(self):
msg_count = 200
max_wait = 50
poll_sleep = 0.5
self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
self.node1.start(relay="true")
self.node2.start(
relay="true",
discv5_bootstrap_node=self.node1.get_enr_uri(),
rest_relay_cache_capacity="250",
)
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
self.tc.add_bandwidth(self.node1, rate="256kbit")
for _ in range(msg_count):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
t0 = time()
received = 0
while time() - t0 < max_wait:
msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
received = len(msgs)
if received >= msg_count:
break
sleep(poll_sleep)
elapsed = time() - t0
logger.info(f"low_bw_elapsed={elapsed:.2f}s received={received} msg_count={msg_count}")
assert received >= msg_count
@pytest.mark.timeout(60 * 8)
def test_relay_2_nodes_low_bandwidth_sending_over_time(self):
msg_count = 200
cache_capacity = "250"
send_interval_s = 0.05
poll_interval_s = 0.5
max_wait_s = 240
min_received = 150
self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
self.node1.start(relay="true")
self.node2.start(
relay="true",
discv5_bootstrap_node=self.node1.get_enr_uri(),
rest_relay_cache_capacity=cache_capacity,
)
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
# mirror your suite's "low bandwidth" shaping values (rate + burst/limit handled by the tc wrapper defaults)
self.tc.add_bandwidth(self.node1, rate="256kbit")
received = 0
for _ in range(msg_count):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
sleep(send_interval_s)
t0 = time()
while time() - t0 < max_wait_s and received < msg_count:
msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
if msgs:
received += len(msgs)
if received >= msg_count:
break
sleep(poll_interval_s)
total_time = time() - t0
logger.info(
"low_bw_reliability "
f"rate=256kbit msg_count={msg_count} cache_capacity={cache_capacity} "
f"send_interval_s={send_interval_s} poll_interval_s={poll_interval_s} "
f"recv_duration={total_time:.2f}s"
)
assert received >= min_received, f"received {received}/{msg_count} under low bandwidth"
@pytest.mark.timeout(60 * 8)
def test_relay_2_nodes_bandwidth_low_vs_high_drain_time(self):
# large payload (~16KB) so 50 msgs = ~800KB total,
# making 256kbit meaningfully slower than 10mbit on loopback
msg_count = 50
large_payload = to_base64("x" * 16_000)
cache = "100"
# fine-grained poll so sub-second differences are measurable
poll_sleep = 0.05
max_wait = 200
self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
self.node1.start(relay="true", rest_relay_cache_capacity=cache)
self.node2.start(
relay="true",
discv5_bootstrap_node=self.node1.get_enr_uri(),
rest_relay_cache_capacity=cache,
)
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
# apply tc to both nodes so intra-host loopback fast-path is throttled
self.tc.add_bandwidth(self.node1, rate="256kbit")
self.tc.add_bandwidth(self.node2, rate="256kbit")
_ = self.node2.get_relay_messages(self.test_pubsub_topic)
for _ in range(msg_count):
self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic)
total_low_msgs = 0
t0 = time()
while total_low_msgs < msg_count and (time() - t0) < max_wait:
msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
total_low_msgs += len(msgs)
sleep(poll_sleep)
low_rate_t = time() - t0
# upgrade both nodes to high bandwidth
self.tc.add_bandwidth(self.node1, rate="10mbit")
self.tc.add_bandwidth(self.node2, rate="10mbit")
_ = self.node2.get_relay_messages(self.test_pubsub_topic)
sleep(1) # let the phase-1 shaper queue fully drain before phase 2
for _ in range(msg_count):
self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic)
total_high_msgs = 0
t1 = time()
while total_high_msgs < msg_count and (time() - t1) < max_wait:
msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
total_high_msgs += len(msgs)
sleep(poll_sleep)
high_rate_t = time() - t1
logger.info(
f"low_rate_t={low_rate_t:.2f}s high_rate_t={high_rate_t:.2f}s "
f"total_low_msgs={total_low_msgs} total_high_msgs={total_high_msgs} "
f"msg_count={msg_count} cache={cache}"
)
assert total_low_msgs >= msg_count
assert total_high_msgs >= msg_count
# Assert high bandwidth was meaningfully faster, not just marginally so,
# to absorb scheduling jitter on localhost Docker
assert high_rate_t < low_rate_t / 2
@pytest.mark.timeout(60 * 6)
def test_relay_2_nodes_packet_reordering(self):
msg_count = 200
cache_capacity = "200"
poll_sleep = 0.5
max_wait = 120
self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
self.node1.start(relay="true")
self.node2.start(
relay="true",
discv5_bootstrap_node=self.node1.get_enr_uri(),
rest_relay_cache_capacity=cache_capacity,
)
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
self.tc.add_packet_reordering(self.node2, percent=25, correlation=50)
for _ in range(msg_count):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
received = 0
t0 = time()
while received < msg_count and time() - t0 < max_wait:
msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
received += len(msgs)
sleep(poll_sleep)
elapsed = time() - t0
logger.info(f"packet_reordering " f"reorder=25% corr=50% " f"msg_count={msg_count} received={received} " f"elapsed={elapsed:.2f}s")
assert received >= msg_count
@pytest.mark.timeout(60 * 6)
def test_relay_2_nodes_temporary_blackout_recovers_no_helpers(self):
msgs_count = 100
self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
self.tc = TrafficController()
logger.info("Starting node1 and node2 with relay enabled")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
logger.info("Subscribing both nodes to relay topic")
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
logger.info("Waiting for autoconnection")
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=15)
logger.info(f"Applying 100%% packet loss on both nodes ")
self.tc.clear(self.node1)
self.tc.clear(self.node2)
self.tc.add_packet_loss(self.node1, percent=100.0)
self.tc.add_packet_loss(self.node2, percent=100.0)
delay(5)
logger.info("Clearing tc rules (restore connectivity)")
self.tc.clear(self.node1)
self.tc.clear(self.node2)
logger.info("Waiting for peer list recovery on both nodes")
peers1 = 0
while time() < time() + 30.0:
peers1 = self.node1.get_peers() or []
peers2 = self.node2.get_peers() or []
if len(peers1) > 0 and len(peers2) > 0:
break
delay(0.5)
assert len(peers1) > 0, "Peers did not recover after blackout (would require restart)"
logger.info("Publishing after recovery")
for _ in range(msgs_count):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
delay(5)
msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
assert len(msgs) >= msgs_count - 10, "Post-recovery message was not delivered"
logger.info(f"{len(msgs)} messages were delivered")
self.tc.clear(self.node1)
self.tc.clear(self.node2)

View File

@ -1,4 +1,5 @@
import pytest
from uuid import uuid4
from src.env_vars import NODE_1, NODE_2
from src.libs.custom_logger import get_custom_logger
from src.test_data import INVALID_CONTENT_TOPICS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS
@ -60,9 +61,10 @@ class TestFilterSubscribeCreate(StepsFilter):
assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
def test_filter_subscribe_to_29_content_topics_in_separate_calls(self, subscribe_main_nodes):
# subscribe_main_nodes already consumed 1 slot; make 29 more = 30 total, all must succeed
_29_content_topics = [str(i) for i in range(29)]
for content_topic in _29_content_topics:
self.create_filter_subscription({"requestId": "1", "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic})
self.create_filter_subscription({"requestId": str(uuid4()), "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic})
failed_content_topics = []
for content_topic in _29_content_topics:
logger.debug(f"Running test with content topic {content_topic}")
@ -73,11 +75,20 @@ class TestFilterSubscribeCreate(StepsFilter):
logger.error(f"ContentTopic {content_topic} failed: {str(ex)}")
failed_content_topics.append(content_topic)
assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
try:
self.create_filter_subscription({"requestId": "1", "contentFilters": ["rate limited"], "pubsubTopic": self.test_pubsub_topic})
raise AssertionError("The 30th subscribe call was not rate limited!!!")
except Exception as ex:
assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex)
# Rate limit is a token bucket (30/min); keep calling beyond 30 until one is denied
rate_limited = False
for extra in range(1, 20):
try:
self.create_filter_subscription(
{"requestId": str(uuid4()), "contentFilters": [f"extra_{extra}"], "pubsubTopic": self.test_pubsub_topic}
)
logger.debug(f"Extra subscribe call #{extra} succeeded")
except Exception as ex:
assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex), f"Unexpected error on extra call #{extra}: {ex}"
logger.info(f"Rate limit hit on extra call #{extra}: {ex}")
rate_limited = True
break
assert rate_limited, "Rate limit was not triggered on any call beyond 30"
def test_filter_subscribe_to_101_content_topics(self, subscribe_main_nodes):
try:

View File

@ -1,3 +1,4 @@
import pytest
from src.env_vars import NODE_1
from src.libs.common import delay
from src.steps.light_push import StepsLightPush
@ -7,14 +8,14 @@ class TestMultipleNodes(StepsLightPush):
def test_2_receiving_nodes__relay_node1_forwards_lightpushed_message_to_relay_node2(self):
self.setup_first_receiving_node(lightpush="true", relay="true")
self.setup_second_receiving_node(lightpush="false", relay="true")
self.setup_first_lightpush_node(lightpush="true", relay="false")
self.setup_first_lightpush_node(lightpush="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1)
def test_2_receiving_nodes__relay_node1_forwards_lightpushed_message_to_filter_node2(self):
self.setup_first_receiving_node(lightpush="true", relay="true", filter="true")
self.setup_second_receiving_node(lightpush="false", relay="false", filternode=self.receiving_node1.get_multiaddr_with_id())
self.setup_first_lightpush_node(lightpush="true", relay="false")
self.setup_first_lightpush_node(lightpush="true", relay="true")
helper_node = self.start_receiving_node(NODE_1, node_index=4, lightpush="false", relay="true")
self.subscribe_to_pubsub_topics_via_relay(node=[self.receiving_node1, helper_node])
self.subscribe_to_pubsub_topics_via_filter(node=self.receiving_node2)
@ -22,6 +23,7 @@ class TestMultipleNodes(StepsLightPush):
get_messages_response = self.receiving_node2.get_filter_messages(self.test_content_topic)
assert len(get_messages_response) == 1, "Lightpushed message was not relayed to the filter node"
@pytest.mark.skip(reason=" test has m=no meaning now as relay = false won't work , moght be used in future if relay = false work again")
def test_2_lightpush_nodes_and_2_receiving_nodes(self):
self.setup_first_receiving_node(lightpush="true", relay="true")
self.setup_second_receiving_node(lightpush="false", relay="true")
@ -34,28 +36,26 @@ class TestMultipleNodes(StepsLightPush):
def test_combination_of_different_nodes(self):
self.setup_first_receiving_node(lightpush="true", relay="true", filter="true")
self.setup_second_receiving_node(lightpush="false", relay="false", filternode=self.receiving_node1.get_multiaddr_with_id())
self.setup_first_lightpush_node(lightpush="true", relay="false")
self.setup_second_lightpush_node(lightpush="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay(node=self.receiving_node1)
self.subscribe_to_pubsub_topics_via_relay(node=self.light_push_node2)
self.subscribe_to_pubsub_topics_via_filter(node=self.receiving_node2)
delay(1)
self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1)
self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node2)
get_messages_response = self.receiving_node2.get_filter_messages(self.test_content_topic)
assert len(get_messages_response) == 2, "Lightpushed message was not relayed to the filter node"
assert len(get_messages_response) == 1, "Lightpushed message was not relayed to the filter node"
def test_multiple_receiving_nodes(self):
self.setup_first_receiving_node(lightpush="true", relay="true")
self.setup_additional_receiving_nodes()
self.setup_first_lightpush_node(lightpush="true", relay="false")
self.setup_first_lightpush_node(lightpush="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1)
def test_multiple_lightpush_nodes(self):
self.setup_first_receiving_node(lightpush="true", relay="true")
self.setup_second_receiving_node(lightpush="false", relay="true")
self.setup_first_lightpush_node(lightpush="true", relay="false")
self.setup_first_lightpush_node(lightpush="true", relay="true")
self.setup_additional_lightpush_nodes()
self.subscribe_to_pubsub_topics_via_relay()
self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1)

View File

@ -1,8 +1,10 @@
import pytest
from src.libs.common import delay
from src.steps.light_push import StepsLightPush
class TestRunningNodes(StepsLightPush):
@pytest.mark.skip(reason="lightpush no longer wroks with relay =false")
def test_main_node_only_lightpush__peer_only_lightpush(self):
self.setup_first_receiving_node(lightpush="true", relay="false")
self.setup_first_lightpush_node(lightpush="true", relay="false")
@ -12,19 +14,19 @@ class TestRunningNodes(StepsLightPush):
except Exception as ex:
assert "no waku relay found" in str(ex) or "failed to negotiate protocol: protocols not supported" in str(ex)
def test_main_node_only_lightpush__peer_only_filter(self):
def test_main_node_relay_lightpush__peer_only_filter(self):
self.setup_first_receiving_node(lightpush="false", relay="false", filter="true")
self.setup_first_lightpush_node(lightpush="true", relay="false")
self.setup_first_lightpush_node(lightpush="true", relay="true")
try:
self.light_push_node1.send_light_push_message(self.create_payload())
raise AssertionError("Light push with non lightpush peer worked!!!")
except Exception as ex:
assert "Failed to request a message push: dial_failure" in str(ex) or "lightpush error" in str(ex)
def test_main_node_only_lightpush__peer_only_relay(self):
def test_main_node_relay_lightpush__peer_only_relay(self):
self.setup_first_receiving_node(lightpush="false", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
self.setup_first_lightpush_node(lightpush="true", relay="false")
self.setup_first_lightpush_node(lightpush="true", relay="true")
try:
self.light_push_node1.send_light_push_message(self.create_payload())
raise AssertionError("Light push with non lightpush peer worked!!!")
@ -34,7 +36,7 @@ class TestRunningNodes(StepsLightPush):
def test_main_node_only_lightpush__peer_full(self):
self.setup_first_receiving_node(lightpush="true", relay="true", filter="true")
self.setup_second_receiving_node(lightpush="false", relay="true")
self.setup_first_lightpush_node(lightpush="true", relay="false")
self.setup_first_lightpush_node(lightpush="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
self.check_light_pushed_message_reaches_receiving_peer()
@ -57,7 +59,7 @@ class TestRunningNodes(StepsLightPush):
def test_lightpush_with_a_single_receiving_node(self):
self.setup_first_receiving_node(lightpush="true", relay="true")
self.setup_first_lightpush_node(lightpush="true", relay="false")
self.setup_first_lightpush_node(lightpush="true", relay="true")
self.subscribe_to_pubsub_topics_via_relay()
try:
self.check_light_pushed_message_reaches_receiving_peer(sender=self.light_push_node1)

View File

@ -94,11 +94,6 @@ class TestMetrics(StepsRelay, StepsMetrics, StepsFilter, StepsLightPush, StepsSt
self.check_metric(self.publishing_node1, "waku_histogram_message_size_count", 1)
self.check_metric(self.publishing_node1, 'waku_node_messages_total{type="relay"}', 1)
if self.store_node1.is_nwaku():
self.check_metric(
self.store_node1,
f'waku_service_peers{{protocol="/vac/waku/store/2.0.0-beta4",peerId="{self.publishing_node1.get_tcp_address()}"}}',
1,
)
self.check_metric(
self.store_node1,
f'waku_service_peers{{protocol="/vac/waku/store-query/3.0.0",peerId="{self.publishing_node1.get_tcp_address()}"}}',

View File

@ -1,4 +1,5 @@
import pytest
from tenacity import retry, stop_after_delay, wait_fixed
from src.env_vars import NODE_1, NODE_2
from src.libs.common import delay
@ -25,14 +26,20 @@ class TestPeerStore(StepsRelay, StepsStore):
logger.debug(f"Node {i} peer ID {node_id}")
ids.append(node_id)
for i in range(5):
others = []
for peer_info in nodes[i].get_peers():
logger.debug(f"Node {i} peer info {peer_info}")
peer_id = peer_info2id(peer_info, nodes[i].is_nwaku())
others.append(peer_id)
# Discv5 discovery is eventually-consistent; poll until every node's peer
# store reflects the expected mesh, or fail after the timeout.
@retry(stop=stop_after_delay(60), wait=wait_fixed(2), reraise=True)
def check_peer_stores():
for i in range(5):
others = []
for peer_info in nodes[i].get_peers():
logger.debug(f"Node {i} peer info {peer_info}")
peer_id = peer_info2id(peer_info, nodes[i].is_nwaku())
others.append(peer_id)
assert (i == 0 and len(others) == 4) or (i > 0 and len(others) >= 1), f"Some nodes missing in the peer store of Node ID {ids[i]}"
assert (i == 0 and len(others) == 4) or (i > 0 and len(others) >= 1), f"Some nodes missing in the peer store of Node ID {ids[i]}"
check_peer_stores()
def test_add_peers(self):
self.setup_main_nodes()
@ -41,27 +48,52 @@ class TestPeerStore(StepsRelay, StepsStore):
nodes = [self.node1, self.node2]
nodes.extend(self.optional_nodes)
delay(10)
peers_multiaddr = set()
for i in range(2):
for peer_info in nodes[i].get_peers():
multiaddr = peer_info2multiaddr(peer_info, nodes[i].is_nwaku())
peers_multiaddr.add(multiaddr)
assert len(peers_multiaddr) == 5, f"Exactly 5 multi addresses are expected"
# Discv5 discovery is eventually-consistent; poll node1 and node2's peer
# stores until the union contains >=5 multiaddrs, or fail after the timeout.
@retry(stop=stop_after_delay(60), wait=wait_fixed(2), reraise=True)
def collect_multiaddrs():
peers_multiaddr = set()
for i in range(2):
for peer_info in nodes[i].get_peers():
multiaddr = peer_info2multiaddr(peer_info, nodes[i].is_nwaku())
peers_multiaddr.add(multiaddr)
# Add peers one by one excluding self for Nodes 2-5
assert len(peers_multiaddr) >= 5, f"At least 5 multi addresses are expected, got {len(peers_multiaddr)}: {peers_multiaddr}"
return peers_multiaddr
peers_multiaddr = collect_multiaddrs()
# Group multiaddrs by peer ID. libp2p identify can leak observed (ephemeral
# source) addresses into the peer store alongside the real listen address;
# those observed addrs are unreachable when dialed back, so we try every
# known address for a peer until one succeeds.
addrs_by_peer = {}
for peer in peers_multiaddr:
addrs_by_peer.setdefault(multiaddr2id(peer), []).append(peer)
# For each of nodes 2-5, add every other peer via the add_peers API.
for i in range(1, 5):
for peer in list(peers_multiaddr):
if nodes[i].get_id() != multiaddr2id(peer):
self_id = nodes[i].get_id()
for peer_id, addrs in addrs_by_peer.items():
if peer_id == self_id:
continue
last_err = None
for addr in addrs:
try:
if nodes[i].is_nwaku():
nodes[i].add_peers([peer])
nodes[i].add_peers([addr])
else:
peer_info = {"multiaddr": peer, "shards": [0], "protocols": ["/vac/waku/relay/2.0.0"]}
peer_info = {"multiaddr": addr, "shards": [0], "protocols": ["/vac/waku/relay/2.0.0"]}
nodes[i].add_peers(peer_info)
last_err = None
break
except Exception as ex:
logger.error(f"Failed to add peer to Node {i} peer store: {ex}")
raise
last_err = ex
logger.warning(f"Node {i} failed to add peer {peer_id} via {addr}: {ex}")
if last_err is not None:
logger.error(f"Node {i} could not add peer {peer_id} via any known address")
raise last_err
@pytest.mark.skip(reason="waiting for https://github.com/waku-org/nwaku/issues/1549 resolution")
def test_get_peers_two_protocols(self):

View File

@ -13,7 +13,6 @@ from src.test_data import SAMPLE_INPUTS
logger = get_custom_logger(__name__)
@pytest.mark.skip(reason="RLN functional changes. To be updated by Roman Zajic")
@pytest.mark.xdist_group(name="RLN serial tests")
class TestRelayRLN(StepsRLN, StepsRelay):
SAMPLE_INPUTS_RLN = SAMPLE_INPUTS + SAMPLE_INPUTS + SAMPLE_INPUTS
@ -22,7 +21,14 @@ class TestRelayRLN(StepsRLN, StepsRelay):
def test_valid_payloads_lightpush_at_spam_rate(self, pytestconfig):
message_limit = 1
epoch_sec = 1
pytestconfig.cache.set("keystore-prefixes", self.register_rln_relay_nodes(2, []))
rln_state = self.register_rln_relay_nodes(2, [])
pytestconfig.cache.set(
"keystore-prefixes",
{
"keystore_prefixes": rln_state["keystore_prefixes"],
"rln_membership_indexes": rln_state["rln_membership_indexes"],
},
)
self.setup_first_rln_relay_node(lightpush="true", rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=epoch_sec)
self.setup_second_rln_lightpush_node(rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=epoch_sec)
self.subscribe_main_relay_nodes()
@ -37,11 +43,17 @@ class TestRelayRLN(StepsRLN, StepsRelay):
if i > message_limit and (now - start) <= epoch_sec:
raise AssertionError("Publish with RLN enabled at spam rate worked!!!")
except Exception as e:
assert "RLN validation failed" or "NonceLimitReached" in str(e)
error_str = str(e)
assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str
if "NonceLimitReached" in error_str:
assert "503" in error_str, f"Expected HTTP 503 for NonceLimitReached, got: {error_str}"
def test_valid_payloads_at_slow_rate(self, pytestconfig):
message_limit = 20
self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", []))
self.register_rln_relay_nodes(
0,
pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}),
)
self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=600)
self.subscribe_main_relay_nodes()
failed_payloads = []
@ -63,7 +75,10 @@ class TestRelayRLN(StepsRLN, StepsRelay):
def test_valid_payloads_at_spam_rate(self, pytestconfig):
message_limit = 20
epoch_sec = 600
self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", []))
self.register_rln_relay_nodes(
0,
pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}),
)
self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=epoch_sec)
self.subscribe_main_relay_nodes()
start = math.trunc(time())
@ -77,10 +92,16 @@ class TestRelayRLN(StepsRLN, StepsRelay):
if i > message_limit and (now - start) <= epoch_sec:
raise AssertionError("Publish with RLN enabled at spam rate worked!!!")
except Exception as e:
assert "RLN validation failed" or "NonceLimitReached" in str(e)
error_str = str(e)
assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str
if "NonceLimitReached" in error_str:
assert "500" in error_str, f"Expected HTTP 500 for NonceLimitReached, got: {error_str}"
def test_valid_payload_at_variable_rate(self, pytestconfig):
self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", []))
self.register_rln_relay_nodes(
0,
pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}),
)
self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=1)
self.subscribe_main_relay_nodes()
payload_desc = SAMPLE_INPUTS[0]["description"]
@ -101,11 +122,17 @@ class TestRelayRLN(StepsRLN, StepsRelay):
else:
previous = now
except Exception as e:
assert "RLN validation failed" or "NonceLimitReached" in str(e)
error_str = str(e)
assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str
if "NonceLimitReached" in error_str:
assert "500" in error_str, f"Expected HTTP 500 for NonceLimitReached, got: {error_str}"
def test_valid_payloads_random_epoch_at_slow_rate(self, pytestconfig):
epoch_sec = random.randint(2, 5)
self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", []))
self.register_rln_relay_nodes(
0,
pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}),
)
self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=epoch_sec)
self.subscribe_main_relay_nodes()
failed_payloads = []
@ -122,7 +149,10 @@ class TestRelayRLN(StepsRLN, StepsRelay):
def test_valid_payloads_random_user_message_limit(self, pytestconfig):
user_message_limit = random.randint(2, 4)
self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", []))
self.register_rln_relay_nodes(
0,
pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}),
)
self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=user_message_limit, rln_relay_epoch_sec=1)
self.subscribe_main_relay_nodes()
failed_payloads = []
@ -136,12 +166,18 @@ class TestRelayRLN(StepsRLN, StepsRelay):
failed_payloads.append(payload["description"])
assert not failed_payloads, f"Payloads failed: {failed_payloads}"
@pytest.mark.skip(reason="Waiting for issue resolution https://github.com/waku-org/nwaku/issues/3208")
@pytest.mark.timeout(600)
def test_valid_payloads_dynamic_at_spam_rate(self, pytestconfig):
message_limit = 100
epoch_sec = 600
pytestconfig.cache.set("keystore-prefixes", self.register_rln_relay_nodes(2, []))
rln_state = self.register_rln_relay_nodes(2, [])
pytestconfig.cache.set(
"keystore-prefixes",
{
"keystore_prefixes": rln_state["keystore_prefixes"],
"rln_membership_indexes": rln_state["rln_membership_indexes"],
},
)
self.setup_main_rln_relay_nodes(
rln_relay_user_message_limit=message_limit,
rln_relay_epoch_sec=epoch_sec,
@ -160,13 +196,22 @@ class TestRelayRLN(StepsRLN, StepsRelay):
if i > message_limit and (now - start) <= epoch_sec:
raise AssertionError("Publish with RLN enabled at spam rate worked!!!")
except Exception as e:
assert "RLN validation failed" or "NonceLimitReached" in str(e)
error_str = str(e)
assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str
if "NonceLimitReached" in error_str:
assert "500" in error_str, f"Expected HTTP 500 for NonceLimitReached, got: {error_str}"
@pytest.mark.skip(reason="Waiting for issue resolution https://github.com/waku-org/nwaku/issues/3208")
@pytest.mark.timeout(600)
def test_valid_payloads_dynamic_at_slow_rate(self, pytestconfig):
message_limit = 100
pytestconfig.cache.set("keystore-prefixes", self.register_rln_relay_nodes(2, []))
rln_state = self.register_rln_relay_nodes(2, [])
pytestconfig.cache.set(
"keystore-prefixes",
{
"keystore_prefixes": rln_state["keystore_prefixes"],
"rln_membership_indexes": rln_state["rln_membership_indexes"],
},
)
self.setup_main_rln_relay_nodes(
rln_relay_user_message_limit=message_limit,
rln_relay_epoch_sec=600,
@ -192,7 +237,10 @@ class TestRelayRLN(StepsRLN, StepsRelay):
def test_valid_payloads_n1_with_rln_n2_without_rln_at_spam_rate(self, pytestconfig):
message_limit = 1
epoch_sec = 1
self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", []))
self.register_rln_relay_nodes(
0,
pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}),
)
self.setup_first_rln_relay_node(rln_relay_user_message_limit=message_limit, rln_relay_epoch_sec=epoch_sec)
self.setup_second_relay_node()
self.subscribe_main_relay_nodes()
@ -206,10 +254,20 @@ class TestRelayRLN(StepsRLN, StepsRelay):
if i > message_limit and (now - start) <= epoch_sec:
raise AssertionError("Publish with RLN enabled at spam rate worked!!!")
except Exception as e:
assert "RLN validation failed" or "NonceLimitReached" in str(e)
error_str = str(e)
assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str
if "NonceLimitReached" in error_str:
assert "500" in error_str, f"Expected HTTP 500 for NonceLimitReached, got: {error_str}"
def test_valid_payloads_with_optional_nodes_at_slow_rate(self, pytestconfig):
pytestconfig.cache.set("keystore-prefixes", self.register_rln_relay_nodes(5, []))
rln_state = self.register_rln_relay_nodes(5, [])
pytestconfig.cache.set(
"keystore-prefixes",
{
"keystore_prefixes": rln_state["keystore_prefixes"],
"rln_membership_indexes": rln_state["rln_membership_indexes"],
},
)
self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=1)
self.setup_optional_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=1)
self.subscribe_main_relay_nodes()
@ -228,7 +286,10 @@ class TestRelayRLN(StepsRLN, StepsRelay):
assert not failed_payloads, f"Payloads failed: {failed_payloads}"
def test_valid_payloads_with_optional_nodes_at_spam_rate(self, pytestconfig):
self.register_rln_relay_nodes(0, pytestconfig.cache.get("keystore-prefixes", []))
self.register_rln_relay_nodes(
0,
pytestconfig.cache.get("keystore-prefixes", {"keystore_prefixes": [], "rln_membership_indexes": []}),
)
self.setup_main_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=1)
self.setup_optional_rln_relay_nodes(rln_relay_user_message_limit=1, rln_relay_epoch_sec=1)
self.subscribe_main_relay_nodes()
@ -246,4 +307,7 @@ class TestRelayRLN(StepsRLN, StepsRelay):
else:
previous = now
except Exception as e:
assert "RLN validation failed" or "NonceLimitReached" in str(e)
error_str = str(e)
assert "RLN validation failed" in error_str or "NonceLimitReached" in error_str
if "NonceLimitReached" in error_str:
assert "500" in error_str, f"Expected HTTP 500 for NonceLimitReached, got: {error_str}"

View File

@ -1,5 +1,8 @@
import pytest, time, re, os
from src.env_vars import NODE_1, NODE_2, STRESS_ENABLED
import os
import pytest
import re
import time
from src.env_vars import DEFAULT_NWAKU
from src.libs.common import delay
from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode
@ -7,7 +10,7 @@ from src.steps.filter import StepsFilter
from src.steps.light_push import StepsLightPush
from src.steps.relay import StepsRelay
from src.steps.store import StepsStore
import re
from src.test_data import DEFAULT_CLUSTER_ID
logger = get_custom_logger(__name__)
@ -35,12 +38,33 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
self.node1.get_version()
self.node1.get_debug_version()
def _wait_for(self, fetcher, predicate, timeout=15, interval=0.5):
deadline = time.time() + timeout
result = fetcher()
while time.time() < deadline:
if predicate(result):
return result
time.sleep(interval)
result = fetcher()
return result
def _connect_nodes(self, source, target):
self.add_node_peer(source, [target.get_multiaddr_with_id()])
def _connect_bidirectional(self, node_a, node_b):
self._connect_nodes(node_a, node_b)
self._connect_nodes(node_b, node_a)
def _peer_addrs_from_groups(self, resp):
groups = resp if isinstance(resp, list) else [resp]
return {peer["multiaddr"] for group in groups for peer in group.get("peers", [])}
@pytest.fixture(scope="function", autouse=True)
def nodes(self):
self.node1 = WakuNode(NODE_2, f"node1_{self.test_id}")
self.node2 = WakuNode(NODE_2, f"node2_{self.test_id}")
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node1 = WakuNode(DEFAULT_NWAKU, f"node1_{self.test_id}")
self.node2 = WakuNode(DEFAULT_NWAKU, f"node2_{self.test_id}")
self.node3 = WakuNode(DEFAULT_NWAKU, f"node3_{self.test_id}")
self.node4 = WakuNode(DEFAULT_NWAKU, f"node4_{self.test_id}")
def _tail(self, path, start_size):
with open(path, "rb") as f:
@ -82,27 +106,46 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
self.node1.start(filter="true", relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self._connect_nodes(self.node1, self.node2)
self._connect_nodes(self.node1, self.node3)
self.node1.add_peers([self.node3.get_multiaddr_with_id()])
self.node4.start(relay="false", filternode=self.node1.get_multiaddr_with_id(), discv5_bootstrap_node=self.node1.get_enr_uri())
self.node4.set_filter_subscriptions({"requestId": "1", "contentFilters": [self.test_content_topic], "pubsubTopic": self.test_pubsub_topic})
stats = self.node1.get_peer_stats()
stats = self._wait_for(
self.node1.get_peer_stats,
lambda s: s["Sum"]["Total peers"] >= 2 and s["Relay peers"]["Total relay peers"] >= 1,
)
logger.debug(f"Node-1 admin peers stats {stats}")
assert stats["Sum"]["Total peers"] == 3, "expected 3 peers connected to node1"
assert stats["Relay peers"]["Total relay peers"] == 2, "expected exactly 2 relay peer"
assert stats["Sum"]["Total peers"] >= 3, "expected at least 3 peers connected to node1"
assert stats["Relay peers"]["Total relay peers"] >= 1, "expected at least 1 relay shard"
def test_admin_peers_mesh_on_shard_contains_node2(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
mesh = self.node1.get_mesh_peers_on_shard(self.node1.start_args["shard"])
shard = "0"
start_kwargs = dict(relay="true", shard=shard, dns_discovery="false", discv5_discovery="false")
self.node1.start(**start_kwargs)
self.node2.start(**{**start_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()})
self.node3.start(**{**start_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()})
self._connect_bidirectional(self.node1, self.node2)
self._connect_bidirectional(self.node1, self.node3)
mesh_topic = f"/waku/2/rs/{self.node1.start_args.get('cluster-id', DEFAULT_CLUSTER_ID)}/{shard}"
for node in (self.node1, self.node2, self.node3):
node.set_relay_subscriptions([mesh_topic])
shard = self.node1.start_args["shard"]
targets = {self.node2.get_multiaddr_with_id(), self.node3.get_multiaddr_with_id()}
logger.debug(f"mesh topic={mesh_topic}, target peers={targets}")
mesh = self._wait_for(
lambda: self.node1.get_mesh_peers_on_shard(shard),
lambda m: targets.intersection({p["multiaddr"] for p in m["peers"]}),
timeout=30,
)
logger.debug(f"Node-1 mesh on the shard {mesh}")
logger.debug("Validate the schema variables")
assert isinstance(mesh["shard"], int) and mesh["shard"] == int(self.node1.start_args["shard"]), "shard mismatch"
peer_maddrs = [p["multiaddr"] for p in mesh["peers"]]
assert self.node2.get_multiaddr_with_id() in peer_maddrs and self.node3.get_multiaddr_with_id() in peer_maddrs, "node2 or node3 not in mesh"
assert targets.intersection(peer_maddrs), "expected at least one of node2/node3 in mesh"
for p in mesh["peers"]:
assert isinstance(p["protocols"], list) and all(isinstance(x, str) for x in p["protocols"]), "protocols must be [str]"
assert isinstance(p["shards"], list) and all(isinstance(x, int) for x in p["shards"]), "shards must be [int]"
@ -113,23 +156,22 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
def test_admin_peer_by_id(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self._connect_bidirectional(self.node1, self.node2)
peer_id = self.node2.get_multiaddr_with_id().rpartition("/p2p/")[2]
info = self.node1.get_peer_info(peer_id)
logger.debug(f"Node-1 /admin/v1/peer/{peer_id}: {info} \n")
logger.debug("Validate response schema")
for k in ("multiaddr", "protocols", "shards", "connected", "agent", "origin"):
assert k in info, f"missing field: {k}"
assert info["multiaddr"] == self.node2.get_multiaddr_with_id(), "multiaddr mismatch"
assert peer_id in info["multiaddr"], "multiaddr mismatch"
def test_admin_set_all_log_levels(self):
self.node1.start(relay="true")
self.node1.container()
levels = ["TRACE", "DEBUG", "INFO", "NOTICE", "WARN", "ERROR", "FATAL"]
_levels = ["INFO"]
for lvl in _levels:
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
for lvl in levels:
resp = self.node1.set_log_level(lvl)
logger.debug(f"Set log level ({lvl})")
self.node2.start(relay="true")
logger.debug(f"Set log level ({lvl}) -> status={resp.status_code}")
assert resp.status_code == 200, f"failed to set log level {lvl} {resp.text}"
self.node2.info()
self.node2.get_debug_version()
@ -205,7 +247,6 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
counts = self._read_tail_counts(path, start)
logger.debug(f"counts at NOTICE: {counts}")
assert counts["NTC"] > 0, "expected NOTICE logs at NOTICE level"
for lv in ["TRC", "DBG", "INF"]:
assert counts[lv] == 0, f"{lv} must be filtered at NOTICE"
@ -230,7 +271,6 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
counts = self._read_tail_counts(path, start)
logger.debug(f"counts at WARN: {counts}")
assert counts["WRN"] > 0, "expected WARN logs at WARN level"
for lv in ["TRC", "DBG", "INF", "NTC"]:
assert counts[lv] == 0, f"{lv} must be filtered at WARN"
@ -255,7 +295,6 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
counts = self._read_tail_counts(path, start)
logger.debug(f"counts at ERROR: {counts}")
assert counts["ERR"] > 0, "expected ERROR logs at ERROR level"
for lv in ["TRC", "DBG", "INF", "NTC", "WRN"]:
assert counts[lv] == 0, f"{lv} must be filtered at ERROR"
@ -280,7 +319,6 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
counts = self._read_tail_counts(path, start)
logger.debug(f"counts at FATAL: {counts}")
assert counts["FTL"] > 0, "expected FATAL logs at FATAL level"
for lv in ["TRC", "DBG", "INF", "NTC", "WRN", "ERR"]:
assert counts[lv] == 0, f"{lv} must be filtered at FATAL"
@ -288,13 +326,9 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
def test_relay_peers_on_shard_schema(self):
node_shard = "0"
self.node1.start(relay="true", shard=node_shard, dns_discovery="false")
self.node2.start(
relay="true",
shard=node_shard,
dns_discovery="false",
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
shard_kwargs = dict(relay="true", shard=node_shard, dns_discovery="false", discv5_discovery="false")
self.node1.start(**shard_kwargs)
self.node2.start(**{**shard_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()})
time.sleep(1)
resp = self.node1.get_relay_peers_on_shard(node_shard)
logger.debug(f"relay peers on shard=0 (schema): {resp!r}")
@ -315,15 +349,20 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
assert isinstance(p["score"], (int, float)), "peer.score must be a number"
def test_relay_peers_on_shard_contains_connected_peer(self):
self.node1.start(relay="true", shard="0", dns_discovery="false")
self.node2.start(
relay="true",
shard="0",
dns_discovery="false",
discv5_bootstrap_node=self.node1.get_enr_uri(),
)
shard_kwargs = dict(relay="true", shard="0", dns_discovery="false", discv5_discovery="false")
self.node1.start(**shard_kwargs)
self.node2.start(**{**shard_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()})
self._connect_bidirectional(self.node1, self.node2)
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=1)
relay_topic = f"/waku/2/rs/{self.node1.start_args.get('cluster-id', DEFAULT_CLUSTER_ID)}/{self.node1.start_args['shard']}"
for node in (self.node1, self.node2):
node.set_relay_subscriptions([relay_topic])
n2_addr = self.node2.get_multiaddr_with_id()
resp = self.node1.get_relay_peers_on_shard("0")
resp = self._wait_for(
lambda: self.node1.get_relay_peers_on_shard("0"),
lambda data: any(p.get("multiaddr") == n2_addr for p in data.get("peers", [])),
timeout=30,
)
logger.debug(f"checking shard=0 list: {resp!r}")
assert any(
p.get("multiaddr") == n2_addr for p in resp["peers"]
@ -347,7 +386,7 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
assert isinstance(ma, str) and ma.strip(), "multiaddr must be a non-empty string"
protos = peer["protocols"]
all(isinstance(x, str) for x in protos), "protocols must be list[str]"
assert all(isinstance(x, str) for x in protos), "protocols must be list[str]"
assert isinstance(peer["score"], (int, float)), "score must be a number"
@ -355,100 +394,149 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node3 = WakuNode(DEFAULT_NWAKU, f"node3_{self.test_id}")
self.node4 = WakuNode(DEFAULT_NWAKU, f"node4_{self.test_id}")
self.node3.start(relay="false", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
for node in (self.node1, self.node2, self.node4):
node.set_relay_subscriptions([self.test_pubsub_topic])
self._connect_bidirectional(self.node1, self.node2)
self._connect_bidirectional(self.node1, self.node4)
self.wait_for_autoconnection([self.node1, self.node2, self.node4], hard_wait=1)
n2_addr = self.node2.get_multiaddr_with_id()
n3_addr = self.node3.get_multiaddr_with_id()
n4_addr = self.node4.get_multiaddr_with_id()
time.sleep(1)
resp = self.node1.get_relay_peers()
expected_present = {n2_addr, n4_addr}
resp = self._wait_for(
self.node1.get_relay_peers,
lambda data: expected_present.issubset(self._peer_addrs_from_groups(data)),
timeout=30,
)
logger.debug(f"/admin/v1/peers/relay {resp!r}")
peer_addrs = {peer["multiaddr"] for group in resp for peer in group["peers"]}
assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}"
assert n3_addr not in peer_addrs, f"Missing Node-3 address {n3_addr}"
assert n4_addr in peer_addrs, f"Missing Node-4 address {n4_addr}"
peer_ids = {peer["multiaddr"].rpartition("/p2p/")[2] for group in resp for peer in group["peers"]}
n2_id = n2_addr.rpartition("/p2p/")[2]
n3_id = n3_addr.rpartition("/p2p/")[2]
n4_id = n4_addr.rpartition("/p2p/")[2]
assert n2_id in peer_ids, f"Missing Node-2 address {n2_addr}"
assert n3_id not in peer_ids, f"Unexpected Node-3 address {n3_addr}"
assert n4_id in peer_ids, f"Missing Node-4 address {n4_addr}"
def test_admin_connected_peers_on_shard_contains_all_three(self):
shard = "0"
self.node1.start(relay="true", shard=shard)
self.node2.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node3.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri())
self.node4.start(relay="true", shard=shard, discv5_bootstrap_node=self.node1.get_enr_uri())
shard_kwargs = dict(relay="true", shard=shard, dns_discovery="false", discv5_discovery="false")
self.node1.start(**shard_kwargs)
self.node2.start(**{**shard_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()})
self.node3 = WakuNode(DEFAULT_NWAKU, f"node3_{self.test_id}")
self.node4 = WakuNode(DEFAULT_NWAKU, f"node4_{self.test_id}")
self.node3.start(**{**shard_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()})
self.node4.start(**{**shard_kwargs, "discv5_bootstrap_node": self.node1.get_enr_uri()})
for node in (self.node2, self.node3, self.node4):
self._connect_bidirectional(self.node1, node)
relay_topic = f"/waku/2/rs/{self.node1.start_args.get('cluster-id', DEFAULT_CLUSTER_ID)}/{shard}"
for node in (self.node1, self.node2, self.node3, self.node4):
node.set_relay_subscriptions([relay_topic])
self.wait_for_autoconnection([self.node1, self.node2, self.node3, self.node4], hard_wait=1)
n2_addr = self.node2.get_multiaddr_with_id()
n3_addr = self.node3.get_multiaddr_with_id()
n4_addr = self.node4.get_multiaddr_with_id()
time.sleep(1)
resp = self.node1.get_connected_peers(shard)
logger.debug(f"/admin/v1/peers/connected/on/{shard} (contains 3): {resp!r}")
expected_ids = {n2_addr.rpartition("/p2p/")[2], n3_addr.rpartition("/p2p/")[2], n4_addr.rpartition("/p2p/")[2]}
connected_all = self._wait_for(
self.node1.get_connected_peers,
lambda peers: expected_ids.issubset({p["multiaddr"].rpartition("/p2p/")[2] for p in peers}),
timeout=30,
)
shard_resp = self.node1.get_connected_peers_on_shard(shard)
logger.debug(f"/admin/v1/peers/connected/on/{shard} (contains 3): {shard_resp!r}")
peer_addrs = {p["multiaddr"] for p in resp["peers"]}
assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}"
assert n3_addr in peer_addrs, f"Missing Node-3 address {n3_addr}"
assert n4_addr in peer_addrs, f"Missing Node-4 address {n4_addr}"
if shard_resp:
shard_ids = {p["multiaddr"].rpartition("/p2p/")[2] for p in shard_resp}
all_ids = {p["multiaddr"].rpartition("/p2p/")[2] for p in connected_all}
assert shard_ids.issubset(all_ids), "Shard-specific peers must be connected"
for peer in shard_resp:
assert int(shard) in peer.get("shards", []), "peer missing requested shard"
else:
logger.warning("Connected peers endpoint returned no shard-scoped peers; relying on global check")
def test_admin_connected_peers_scalar_types(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
resp = self.node1.get_connected_peers()
self._connect_bidirectional(self.node1, self.node2)
resp = self._wait_for(
self.node1.get_connected_peers,
lambda peers: any(p.get("multiaddr") == self.node2.get_multiaddr_with_id() for p in peers),
timeout=30,
)
logger.debug(f"Response for get connected peers {resp!r}")
for p in resp:
assert isinstance(p["multiaddr"], str) and p["multiaddr"].strip(), "multiaddr must be a non-empty string"
assert isinstance(p["protocols"], list) and all(isinstance(x, str) for x in p["protocols"]), "protocols must be list[str]"
assert isinstance(p["shards"], list), "shards must be a list"
assert isinstance(p["agent"], str), "agent must be a string"
assert isinstance(p["connected"], str), "connected must be a string"
assert isinstance(p["origin"], str), "origin must be a string"
assert isinstance(p["score"], (int, float)), "score must be a number"
assert isinstance(p["latency"], (int, float)), "latency must be a number"
score = p.get("score")
if score is not None:
assert isinstance(score, (int, float)), "score must be a number when provided"
latency = p.get("latency")
if latency is not None:
assert isinstance(latency, (int, float)), "latency must be numeric when present"
def test_admin_connected_peers_contains_peers_only(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node3 = WakuNode(DEFAULT_NWAKU, f"node3_{self.test_id}")
self.node4 = WakuNode(DEFAULT_NWAKU, f"node4_{self.test_id}")
self.node3.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node4.start(relay="true")
self._connect_bidirectional(self.node1, self.node2)
self._connect_bidirectional(self.node1, self.node3)
n2_addr = self.node2.get_multiaddr_with_id()
n3_addr = self.node3.get_multiaddr_with_id()
n4_addr = self.node4.get_multiaddr_with_id()
resp = self.node1.get_connected_peers()
expected_ids = {n2_addr.rpartition("/p2p/")[2], n3_addr.rpartition("/p2p/")[2]}
resp = self._wait_for(
self.node1.get_connected_peers,
lambda peers: expected_ids.issubset({p["multiaddr"].rpartition("/p2p/")[2] for p in peers}),
timeout=30,
)
logger.debug(f"/admin/v1/peers/connected contains : {resp!r}")
peer_addrs = {p["multiaddr"] for p in resp}
assert n2_addr in peer_addrs, f"Missing Node-2 address {n2_addr}"
assert n3_addr in peer_addrs, f"Missing Node-3 address {n3_addr}"
assert n4_addr not in peer_addrs, f"Missing Node-4 address {n4_addr}"
peer_ids = {p["multiaddr"].rpartition("/p2p/")[2] for p in resp}
assert expected_ids.issubset(peer_ids), "Missing expected connected peers"
assert n4_addr.rpartition("/p2p/")[2] not in peer_ids, f"Unexpected Node-4 address {n4_addr}"
def test_admin_service_peers_scalar_required_types(self):
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self._connect_nodes(self.node1, self.node2)
resp = self.node1.get_service_peers()
logger.debug(f"/admin/v1/peers/service {resp!r}")
for service, peers in resp.items():
assert isinstance(service, str) and service.strip(), "service must be a non-empty string"
for p in peers:
assert isinstance(p.get("multiaddr"), str) and p["multiaddr"].strip(), "multiaddr must be a non-empty string"
assert isinstance(p.get("agent"), str), "agent must be a string"
assert isinstance(p.get("connected"), str), "connected must be a string"
assert isinstance(p.get("origin"), str), "origin must be a string"
assert isinstance(p.get("score"), (int, float)), "score must be a number"
assert isinstance(p.get("latency"), (int, float)), "latency must be a number"
for peer in resp:
assert isinstance(peer.get("multiaddr"), str) and peer["multiaddr"].strip(), "multiaddr must be a non-empty string"
assert isinstance(peer.get("protocols"), list) and all(isinstance(x, str) for x in peer["protocols"]), "protocols must be list[str]"
assert isinstance(peer.get("shards"), list), "shards must be a list"
assert isinstance(peer.get("agent"), str), "agent must be a string"
assert isinstance(peer.get("connected"), str), "connected must be a string"
assert isinstance(peer.get("origin"), str), "origin must be a string"
score = peer.get("score")
if score is not None:
assert isinstance(score, (int, float)), "score must be numeric when present"
def test_admin_service_peers_schema(self):
n1 = WakuNode(NODE_1, "n1_service_schema")
n2 = WakuNode(NODE_2, "n2_service_schema")
n1 = WakuNode(DEFAULT_NWAKU, "n1_service_schema")
n2 = WakuNode(DEFAULT_NWAKU, "n2_service_schema")
n1.start(relay="true")
n2.start(relay="true", discv5_bootstrap_node=n1.get_enr_uri())
peers = n1.get_service_peers()
@ -462,21 +550,30 @@ class TestAdminFlags(StepsFilter, StepsStore, StepsRelay, StepsLightPush):
assert "origin" in p, "missing 'origin'"
def test_admin_service_peers_contains_expected_addrs_and_protocols(self):
n1 = WakuNode(NODE_1, "n1_service_lookup")
n2 = WakuNode(NODE_2, "n2_service_relay")
n3 = WakuNode(NODE_2, "n3_service_store")
n1 = WakuNode(DEFAULT_NWAKU, "n1_service_lookup")
n2 = WakuNode(DEFAULT_NWAKU, "n2_service_relay")
n3 = WakuNode(DEFAULT_NWAKU, "n3_service_store")
n1.start(relay="true")
n2.start(relay="true", discv5_bootstrap_node=n1.get_enr_uri())
n3.start(store="true", discv5_bootstrap_node=n1.get_enr_uri())
n1.add_peers([n2.get_multiaddr_with_id()])
n2.add_peers([n1.get_multiaddr_with_id()])
n1.add_peers([n3.get_multiaddr_with_id()])
resp = n1.get_service_peers()
n3.add_peers([n1.get_multiaddr_with_id()])
resp = self._wait_for(
n1.get_service_peers,
lambda peers: {n2.get_multiaddr_with_id().rpartition("/p2p/")[2], n3.get_multiaddr_with_id().rpartition("/p2p/")[2]}.issubset(
{p["multiaddr"].rpartition("/p2p/")[2] for p in peers}
),
timeout=30,
)
logger.debug("/admin/v1/peers/service %s", resp)
by_addr = {p["multiaddr"]: p["protocols"] for p in resp}
by_id = {p["multiaddr"].rpartition("/p2p/")[2]: p["protocols"] for p in resp}
m2 = n2.get_multiaddr_with_id()
m3 = n3.get_multiaddr_with_id()
assert m2 in by_addr, f"node2 not found"
assert any("/waku/relay/" in s for s in by_addr[m2]), "node2 should advertise a relay protocol"
assert m3 in by_addr, f"node3 not found. got: {list(by_addr.keys())}"
assert any("/waku/store-query/" in s for s in by_addr[m3]), "node3 should advertise a store-query protocol"
m2 = n2.get_multiaddr_with_id().rpartition("/p2p/")[2]
m3 = n3.get_multiaddr_with_id().rpartition("/p2p/")[2]
assert m2 in by_id, f"node2 not found"
assert any("/waku/relay/" in s for s in by_id[m2]), "node2 should advertise a relay protocol"
assert m3 in by_id, f"node3 not found. got: {list(by_id.keys())}"
assert any("/waku/store-query/" in s for s in by_id[m3]), "node3 should advertise a store-query protocol"

View File

@ -32,8 +32,10 @@ class TestFilterAutoSharding(StepsSharding):
self.setup_first_relay_node_with_filter(
cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, num_shards_in_network=1
)
self.setup_third_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=1)
self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic)
self.subscribe_first_relay_node(content_topics=[self.test_content_topic])
self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic])
self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
self.check_published_message_reaches_filter_peer(content_topic=self.test_content_topic)
@ -44,10 +46,16 @@ class TestFilterAutoSharding(StepsSharding):
pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER,
num_shards_in_network=8,
)
self.setup_third_relay_node(
cluster_id=self.auto_cluster,
content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS,
num_shards_in_network=8,
)
self.setup_second_node_as_filter(
cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=self.test_pubsub_topic
)
self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
self.subscribe_optional_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER):
self.subscribe_filter_node(self.node2, content_topics=[content_topic], pubsub_topic=pubsub_topic)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:

View File

@ -96,14 +96,7 @@ class TestRelayStaticSharding(StepsSharding):
def test_unsubscribe_from_non_subscribed_pubsub_topics(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
try:
self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
if self.node1.is_nwaku():
pass
else:
raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic)

View File

@ -60,7 +60,7 @@ class TestRunningNodes(StepsStore):
def test_store_lightpushed_message(self):
self.setup_first_publishing_node(store="true", relay="true", lightpush="true")
self.setup_second_publishing_node(store="false", relay="true")
self.setup_first_store_node(store="false", relay="false", lightpush="true", lightpushnode=self.multiaddr_list[0])
self.setup_first_store_node(store="false", relay="true", lightpush="true", lightpushnode=self.multiaddr_list[0])
self.subscribe_to_pubsub_topics_via_relay()
self.publish_message(via="lightpush", sender=self.store_node1)
self.check_published_message_is_stored(page_size=5, ascending="true")

View File

@ -0,0 +1,34 @@
import socket
import pytest
from src.test_data import DEFAULT_CLUSTER_ID
def _free_port():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
return s.getsockname()[1]
def build_node_config(**overrides):
config = {
"logLevel": "DEBUG",
"listenAddress": "0.0.0.0",
"tcpPort": _free_port(),
"discv5UdpPort": _free_port(),
"restPort": _free_port(),
"restAddress": "0.0.0.0",
"clusterId": DEFAULT_CLUSTER_ID,
"relay": True,
"store": True,
"filter": False,
"lightpush": False,
"peerExchange": False,
"discv5Discovery": False,
}
config.update(overrides)
return config
@pytest.fixture
def node_config():
return build_node_config()

View File

@ -0,0 +1,26 @@
import pytest
from src.node.wrappers_manager import WrapperManager
@pytest.mark.smoke
class TestLogosDeliveryLifecycle:
def _create_start_node(self, node_config):
result = WrapperManager.create_and_start(config=node_config)
assert result.is_ok(), f"Failed to create and start node: {result.err()}"
return result.ok_value
def test_create_start_and_stop_node(self, node_config):
node = self._create_start_node(node_config)
stop_result = node.stop_and_destroy()
assert stop_result.is_ok(), f"Failed to stop and destroy node: {stop_result.err()}"
def test_stop_node_without_destroy(self, node_config):
node = self._create_start_node(node_config)
try:
stop_result = node.stop_node()
assert stop_result.is_ok(), f"Failed to stop node: {stop_result.err()}"
finally:
destroy_result = node.destroy()
assert destroy_result.is_ok(), f"Failed to destroy node: {destroy_result.err()}"

@ -0,0 +1 @@
Subproject commit fd4d0a5a7efcd56b395024f558afa416d7e27a2b