e2e_part2 (#179)

* add test s17

* Add temp changes

* Add s17 positive / negative scenarios

* add S19

* Add S06 relay-only test and fix wrapper helpers (#173)

* - Add S06 relay-only test case for testing message propagation without a store.
- Update `wrapper_helpers` for clearer event type handling and type annotations (`Optional[...]` usage).
- Simplify `get_node_multiaddr` to retrieve addresses via `get_node_info_raw`.
- Refactor `wrappers_manager` to adjust bindings path to `vendor` directory and add `get_node_info_raw` method.
- Update `.gitignore` to exclude `store.sqlite3*`.

* Refactor S06 relay-only test: replace try-finally blocks with context managers for clarity and conciseness.

* Migrate S06 relay-only test to `test_send_e2e.py` and refactor with `StepsCommon` for reusability.

---------

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>

* Modify S19 test

* Adding S21

* Fix review comments

* Adding S22/S23

* Adding S24

* Add S26

* Add S30

* Add S31

* Improve `wait_for_event` loop logic and add `assert_event_invariants` helper (#178)

- Refactored the `wait_for_event` function for clarity and to ensure proper deadline handling within the loop.
- Introduced `assert_event_invariants` to validate per-request event properties, enforcing invariants like correct `requestId`, no duplicate terminal events, and proper timing between `Propagated` and `Sent`.
- Added tests for `assert_event_invariants` enforcement in `S14` and `S15` lightpush scenarios.

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>

* Add S07 and S10 send API tests with event invariants helper  (#176)

* Add `assert_event_invariants` to enforce per-request event constraints and integrate into relevant tests

* Integrate `assert_event_invariants` into edge and store tests

* Remove redundant comments from `test_send_e2e.py`

---------

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>

* Fix some tests

* Add S02/S12 send API tests and PR CI pipeline (#174)

* Add tests for auto-subscribe on first send and isolated sender with no peers

* Add PR CI workflow with tiered test strategy

- pr_tests.yml: build job with cache, wrapper-tests, smoke-tests,
  and label-triggered full-suite
- test_common.yml: add deploy_allure/send_discord inputs so PR runs
  skip reporting side effects
- Add docker_required marker to S19 (needs Docker, excluded from
  wrapper-only CI job)
- Register docker_required marker in pytest.ini

* Document PR CI test workflows in README

* Refine PR CI test strategy:
- Exclude `docker_required` tests from smoke set in `pr_tests.yml`.
- Add `wait_for_connected` helper for connection state checks.
- Update S19 test to dynamically create and clean up the store node setup.
- General simplifications and improved test stability.

* Add `wait_for_connected` assertion to ensure sender connection state before propagation test

* Refine tests and CI workflows:
- Replace `ERROR_TIMEOUT_S` with `ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S` in `test_send_e2e.py`.
- Adjust timeout assertion for better clarity and accuracy.
- Update `pr_tests.yml` to add retries (`--reruns`) and ignore wrapper tests in smoke tests.
- Change `test_common.yml` default Discord reporting to `false`.

* Normalize `portsshift` to `portsShift` in `test_send_e2e.py` configuration definitions.

---------

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>

* Add relay-to-lightpush fallback integration tests (S08/S09) (#180)

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>

* Ignore S19

* fix s26

* Ignore s20 / s31 for errors

* Change image name

* fix xfail syntax error

* rename test file

* FIx flaky tests

* comment the skipped tests

* Fix review comments

* revert tag in yml in latest

* commenting lightpush

* Modify the PR

* Fix the ports conflict

* Modify S20

* fix portsshift option

* remove the /true from yml to allow errors to exist

* Modify the yml to continue on error

* First set of review comments

* adding xfail mark for failed tests

* address review comments about xfail

* cleanup unused lines

* event collector fix

* Address review comment about delay constant

* fix the timeout review comment

* Add assert_event_invariants

* enhance comment on S26 test

* mark the waku tests as docker_required

* Mark `test_s10_edge_lightpush_propagation` as xfail due to broken lightpush peer discovery.

* Mark `test_s15_lightpush_retryable_error_then_recovery` as xfail due to broken lightpush peer discovery.

---------

Co-authored-by: Egor Rachkovskii <32649334+at0m1x19@users.noreply.github.com>
Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>
This commit is contained in:
AYAHASSAN287 2026-05-11 16:53:18 +03:00 committed by GitHub
parent 34092b6efc
commit 11197db624
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 2309 additions and 61 deletions

256
.github/workflows/pr_tests.yml vendored Normal file
View File

@ -0,0 +1,256 @@
name: PR Tests
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
on:
pull_request:
types: [opened, synchronize, reopened, labeled]
paths:
- "src/**"
- "tests/**"
- "vendor/**"
- "requirements.txt"
- "pytest.ini"
- ".github/workflows/pr_tests.yml"
push:
branches: [master]
paths:
- "vendor/**"
workflow_dispatch:
inputs:
run_full_suite:
description: "Run the full test suite (18 shards)"
required: false
default: false
type: boolean
jobs:
build:
name: Build liblogosdelivery
runs-on: ubuntu-latest
timeout-minutes: 45
if: >-
github.event.action != 'labeled' ||
github.event.label.name == 'full-test'
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- name: Compute cache key
id: cache-key
run: |
BINDINGS_HASH=$(git rev-parse HEAD:vendor/logos-delivery-python-bindings)
DELIVERY_HASH=$(git -C vendor/logos-delivery-python-bindings rev-parse HEAD:vendor/logos-delivery)
echo "key=liblogosdelivery-${{ runner.os }}-nim2.2.4-${BINDINGS_HASH}-${DELIVERY_HASH}" >> "$GITHUB_OUTPUT"
- name: Cache liblogosdelivery.so
id: cache-lib
uses: actions/cache@v4
with:
path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so
key: ${{ steps.cache-key.outputs.key }}
- name: Remove unwanted software
if: steps.cache-lib.outputs.cache-hit != 'true'
uses: ./.github/actions/prune-vm
- name: Install system deps
if: steps.cache-lib.outputs.cache-hit != 'true'
run: |
sudo apt-get update
sudo apt-get install -y \
util-linux \
iproute2 \
sudo \
ca-certificates \
curl \
make \
gcc \
g++
- name: Install Nim 2.2.4
if: steps.cache-lib.outputs.cache-hit != 'true'
run: |
set -euo pipefail
curl https://nim-lang.org/choosenim/init.sh -sSf | sh -s -- -y
echo "$HOME/.nimble/bin" >> "$GITHUB_PATH"
export PATH="$HOME/.nimble/bin:$PATH"
choosenim 2.2.4
nim --version
nimble --version
- name: Build liblogosdelivery.so
if: steps.cache-lib.outputs.cache-hit != 'true'
run: |
set -euo pipefail
export PATH="$HOME/.nimble/bin:$PATH"
BINDINGS_DIR="$(pwd)/vendor/logos-delivery-python-bindings"
DELIVERY_DIR="$BINDINGS_DIR/vendor/logos-delivery"
mkdir -p "$BINDINGS_DIR/lib"
cd "$DELIVERY_DIR"
ln -sf waku.nimble waku.nims
nimble install -y
make setup
make liblogosdelivery
SO_PATH="$(find . -type f -name 'liblogosdelivery.so' | head -n 1)"
if [ -z "$SO_PATH" ]; then
echo "liblogosdelivery.so was not built"
exit 1
fi
cp "$SO_PATH" "$BINDINGS_DIR/lib/liblogosdelivery.so"
echo "Built library:"
ls -l "$BINDINGS_DIR/lib/liblogosdelivery.so"
- name: Upload library artifact
uses: actions/upload-artifact@v4
with:
name: liblogosdelivery
path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so
retention-days: 1
wrapper-tests:
name: Wrapper Tests
runs-on: ubuntu-latest
needs: [build]
timeout-minutes: 45
if: >-
github.event_name != 'push' &&
(github.event.action != 'labeled' || github.event.label.name == 'full-test')
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- uses: actions/setup-python@v4
with:
python-version: "3.12"
cache: "pip"
- run: pip install -r requirements.txt
- name: Download liblogosdelivery.so
uses: actions/download-artifact@v4
with:
name: liblogosdelivery
path: vendor/logos-delivery-python-bindings/lib/
- name: Run wrapper tests - basic life cycle
continue-on-error: true
env:
PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
run: |
pytest tests/wrappers_tests/test_basic_life_cycle.py \
-m "not docker_required" \
--reruns 2 \
--junit-xml=wrapper-results-basic.xml
- name: Run wrapper tests - send e2e part 1
continue-on-error: true
env:
PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
run: |
pytest tests/wrappers_tests/test_send_e2e_part1.py \
-m "not docker_required" \
--reruns 2 \
--junit-xml=wrapper-results-send-part1.xml
- name: Run wrapper tests - send e2e part 2
continue-on-error: true
env:
PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
run: |
pytest tests/wrappers_tests/test_send_e2e_part2.py \
-m "not docker_required" \
--reruns 2 \
--junit-xml=wrapper-results-send-part2.xml
- name: Test Report
if: always()
uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2
with:
name: Wrapper Test Results
path: wrapper-results-*.xml
reporter: java-junit
use-actions-summary: "true"
smoke-tests:
name: Smoke Tests
runs-on: ubuntu-latest
needs: [build]
timeout-minutes: 30
if: >-
github.event_name != 'push' &&
(github.event.action != 'labeled' || github.event.label.name == 'full-test')
env:
NODE_1: "wakuorg/nwaku:latest"
NODE_2: "wakuorg/nwaku:latest"
ADDITIONAL_NODES: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest"
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- uses: actions/setup-python@v4
with:
python-version: "3.12"
cache: "pip"
- run: pip install -r requirements.txt
- name: Download liblogosdelivery.so
uses: actions/download-artifact@v4
with:
name: liblogosdelivery
path: vendor/logos-delivery-python-bindings/lib/
- name: Run smoke tests
env:
PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
run: |
pytest -m "smoke and not docker_required" \
--ignore=vendor/logos-delivery-python-bindings/tests \
--ignore=tests/wrappers_tests \
--reruns 1 \
-n 4 \
--dist=loadgroup \
--junit-xml=smoke-results.xml
- name: Test Report
if: always()
uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2
with:
name: Smoke Test Results
path: smoke-results.xml
reporter: java-junit
use-actions-summary: "true"
full-suite:
name: Full Suite
if: >-
github.event_name != 'push' &&
(contains(github.event.pull_request.labels.*.name, 'full-test') ||
github.event.inputs.run_full_suite == 'true')
uses: ./.github/workflows/test_common.yml
secrets: inherit
with:
node1: "wakuorg/nwaku:latest"
node2: "wakuorg/nwaku:latest"
additional_nodes: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest"
caller: "pr"
deploy_allure: false
send_discord: false

View File

@ -27,6 +27,16 @@ on:
description: "Run fleet tests only"
type: boolean
default: false
deploy_allure:
required: false
description: "Deploy allure report to gh-pages"
type: boolean
default: true
send_discord:
required: false
description: "Send test results to Discord"
type: boolean
default: false
env:
FORCE_COLOR: "1"
@ -42,7 +52,7 @@ jobs:
strategy:
fail-fast: false
matrix:
shard: ${{ inputs.fleet_tests && fromJSON('[0]') || fromJSON('[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17]') }}
shard: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17]
# total number of shards =18 means tests will split into 18 thread and run in parallel to increase execution speed
# command for sharding :
# pytest --shard-id=<shard_number> --num-shards=<total_shards>
@ -223,7 +233,7 @@ jobs:
aggregate-reports:
runs-on: ubuntu-latest
needs: [tests]
if: always()
if: always() && inputs.deploy_allure
steps:
- name: Download all allure results
@ -365,7 +375,7 @@ jobs:
- name: Send report to Discord
uses: rjstone/discord-webhook-notify@v1
if: always() && env.CALLER != 'manual'
if: always() && env.CALLER != 'manual' && inputs.send_discord
with:
severity: ${{ env.TESTS_RESULT == 'success' && 'info' || 'error' }}
username: ${{ github.workflow }}

3
.gitignore vendored
View File

@ -105,4 +105,5 @@ dmypy.json
# Pyre type checker
.pyre/
third_party/logos-delivery-python-bindings
# Waku node runtime artifacts
store.sqlite3*

View File

@ -52,6 +52,16 @@ To launch it manually:
2. Click **Run workflow**.
3. Pick the branch you want to test (defaults to `master`) and press **Run workflow**.
### PR tests
Every push to a pull request triggers **pr\_tests.yml** which runs:
1. **Build** — compiles `liblogosdelivery.so` (cached by submodule commit hash).
2. **Wrapper tests** — all tests under `tests/wrappers_tests/` that don't require Docker (~5 min).
3. **Smoke tests**`pytest -m smoke` with Docker nodes (~10 min).
To run the **full test suite** (18 shards, same as daily) on a PR, add the label **`full-test`** to the pull request. The full suite will start automatically.
### Ondemand matrix against custom *logos-messaging-nim* versions
Use **interop\_tests.yml** when you need to test a PR or a historical image:

View File

@ -12,5 +12,6 @@ log_file_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s
timeout = 300
markers =
smoke: marks tests as smoke test (deselect with '-m "not smoke"')
docker_required: test requires Docker nodes (WakuNode)
waku_test_fleet: marks tests that run against a live Waku test fleet
store2000: marks tests that use 2000 store messages

View File

@ -17,7 +17,6 @@ from src.node.docker_mananger import DockerManager
from src.env_vars import DOCKER_LOG_DIR
from src.data_storage import DS
from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS
from src.node.wrappers_manager import WrapperManager
logger = get_custom_logger(__name__)
@ -106,25 +105,16 @@ class WakuNode:
self._container = None
self.rln_membership_index = None
self.start_args = {}
self._wrapper_node = None
self._rln_creds_set = False
logger.debug(f"WakuNode instance initialized with log path {self._log_path}")
@property
def _is_wrapper(self) -> bool:
return self._wrapper_node is not None
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs):
if WakuNode._pre_start_hook is not None:
kwargs = WakuNode._pre_start_hook(self, kwargs)
logger.debug("Starting Node...")
default_args, remove_container = self._prepare_start_context(**kwargs)
if use_wrapper:
self._start_wrapper(default_args, wait_for_node_sec)
else:
self._start_docker(default_args, remove_container, wait_for_node_sec)
self._start_docker(default_args, remove_container, wait_for_node_sec)
def _prepare_start_context(self, **kwargs):
self._docker_manager.create_network()
@ -285,7 +275,7 @@ class WakuNode:
},
}
@retry(stop=stop_after_attempt(1), wait=wait_fixed(0.1), reraise=True)
@retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True)
def register_rln(self, **kwargs):
logger.debug("Registering RLN credentials...")
self._docker_manager.create_network()
@ -338,12 +328,6 @@ class WakuNode:
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def stop(self):
if self._is_wrapper:
self._stop_wrapper()
else:
self._stop_docker()
def _stop_docker(self):
if self._container:
logger.debug(f"Stopping container with id {self._container.short_id}")
try:
@ -359,14 +343,6 @@ class WakuNode:
self._container = None
logger.debug("Container stopped.")
def _stop_wrapper(self):
logger.debug("Stopping wrapper node")
result = self._wrapper_node.stop_and_destroy()
if result.is_err():
logger.error(f"Failed to stop wrapper node: {result.err()}")
self._wrapper_node = None
logger.debug("Wrapper node stopped and destroyed.")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def kill(self):
if self._container:
@ -450,32 +426,14 @@ class WakuNode:
def get_tcp_address(self):
return f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}"
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0):
if self._is_wrapper:
result = self._wrapper_node.subscribe_content_topic(content_topic, timeout_s=timeout_s)
if result.is_err():
raise RuntimeError(f"subscribe_content_topic failed: {result.err()}")
return result.ok_value
else:
return self._api.set_relay_auto_subscriptions([content_topic])
def subscribe_content_topic(self, content_topic: str):
return self._api.set_relay_auto_subscriptions([content_topic])
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0):
if self._is_wrapper:
result = self._wrapper_node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s)
if result.is_err():
raise RuntimeError(f"unsubscribe_content_topic failed: {result.err()}")
return result.ok_value
else:
return self._api.delete_relay_auto_subscriptions([content_topic])
def unsubscribe_content_topic(self, content_topic: str):
return self._api.delete_relay_auto_subscriptions([content_topic])
def send_message(self, message: dict, *, timeout_s: float = 20.0):
if self._is_wrapper:
result = self._wrapper_node.send_message(message, timeout_s=timeout_s)
if result.is_err():
raise RuntimeError(f"send_message failed: {result.err()}")
return result.ok_value
else:
return self._api.send_relay_auto_message(message)
def send_message(self, message: dict):
return self._api.send_relay_auto_message(message)
def info(self):
return self._api.info()

218
src/node/wrapper_helpers.py Normal file
View File

@ -0,0 +1,218 @@
from __future__ import annotations
import json
import threading
import time
from typing import Optional
from src.libs.common import to_base64
DEFAULT_CONTENT_TOPIC = "/test/1/default/proto"
DEFAULT_PAYLOAD = to_base64("test payload")
EVENT_PROPAGATED = "message_propagated"
EVENT_SENT = "message_sent"
EVENT_ERROR = "message_error"
# MaxTimeInCache from send_service.nim.
MAX_TIME_IN_CACHE_S = 60.0
# Extra slack to cover the background retry loop tick after the window expires.
CACHE_EXPIRY_SLACK_S = 10.0
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S
RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
class EventCollector:
"""Thread-safe collector for async node events.
Pass `collector.event_callback` as the `event_cb` argument to
WrapperManager.create_and_start(). Every event fired by the library
is decoded from JSON and appended to `self.events`.
"""
def __init__(self):
self._lock = threading.Lock()
self.events: list[dict] = []
def event_callback(self, ret: int, raw: bytes) -> None:
try:
payload = json.loads(raw.decode("utf-8"))
except Exception:
payload = {"_raw": raw.decode("utf-8", errors="replace"), "_ret": ret}
with self._lock:
self.events.append(payload)
def get_events_for_request(self, request_id: str) -> list[dict]:
with self._lock:
return [e for e in self.events if e.get("requestId") == request_id]
def snapshot(self) -> list[dict]:
"""Return a thread-safe copy of all collected events.
Use this whenever you need to iterate over every event (rather than
events for a single request_id). Iterating `self.events` directly is
unsafe because `event_callback` appends from the wrapper's event
thread.
"""
with self._lock:
return list(self.events)
def is_propagated_event(event: dict) -> bool:
return event.get("eventType") == EVENT_PROPAGATED
def is_sent_event(event: dict) -> bool:
return event.get("eventType") == EVENT_SENT
def is_error_event(event: dict) -> bool:
return event.get("eventType") == EVENT_ERROR
def wait_for_event(
collector: EventCollector,
request_id: str,
predicate,
timeout_s: float,
poll_interval_s: float = 0.5,
) -> Optional[dict]:
"""Poll until an event matching `predicate` arrives for `request_id`,
or until `timeout_s` elapses. Returns the matching event or None.
"""
deadline = time.monotonic() + timeout_s
while True:
for event in collector.get_events_for_request(request_id):
if predicate(event):
return event
if time.monotonic() >= deadline:
return None
time.sleep(poll_interval_s)
def wait_for_propagated(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]:
return wait_for_event(collector, request_id, is_propagated_event, timeout_s)
def wait_for_sent(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]:
return wait_for_event(collector, request_id, is_sent_event, timeout_s)
def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]:
return wait_for_event(collector, request_id, is_error_event, timeout_s)
def assert_no_error(collector: EventCollector, request_id: str, context: str = "") -> None:
"""Assert that no message_error event is currently buffered for `request_id`."""
event = wait_for_error(collector, request_id, timeout_s=0)
suffix = f" ({context})" if context else ""
assert event is None, f"Unexpected message_error event{suffix}: {event}"
def assert_no_sent(collector: EventCollector, request_id: str, context: str = "") -> None:
"""Assert that no message_sent event is currently buffered for `request_id`."""
event = wait_for_sent(collector, request_id, timeout_s=0)
suffix = f" ({context})" if context else ""
assert event is None, f"Unexpected message_sent event{suffix}: {event}"
def assert_no_propagated(collector: EventCollector, request_id: str, context: str = "") -> None:
"""Assert that no message_propagated event is currently buffered for `request_id`."""
event = wait_for_propagated(collector, request_id, timeout_s=0)
suffix = f" ({context})" if context else ""
assert event is None, f"Unexpected message_propagated event{suffix}: {event}"
def wait_for_connected(
collector: EventCollector,
timeout_s: float = 10.0,
poll_interval_s: float = 0.3,
) -> Optional[dict]:
"""Wait until a connection_status_change event with PartiallyConnected or Connected arrives."""
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
for event in collector.snapshot():
if event.get("eventType") == "connection_status_change" and event.get("connectionStatus") in ("PartiallyConnected", "Connected"):
return event
time.sleep(poll_interval_s)
return None
TERMINAL_EVENT_TYPES = {EVENT_PROPAGATED, EVENT_SENT, EVENT_ERROR}
def assert_event_invariants(collector: EventCollector, request_id: str) -> None:
"""Check per-request event invariants (issue #163):
- All events carry the correct requestId.
- No duplicate terminal events (Propagated, Sent, Error).
- Sent never appears before Propagated.
"""
events = collector.get_events_for_request(request_id)
assert events, f"No events found for request {request_id}"
counts: dict[str, int] = {}
first_index: dict[str, int] = {}
for i, event in enumerate(events):
assert event.get("requestId") == request_id, (
f"Event at index {i} has wrong requestId: " f"expected {request_id!r}, got {event.get('requestId')!r}"
)
event_type = event.get("eventType", "")
if event_type in TERMINAL_EVENT_TYPES:
counts[event_type] = counts.get(event_type, 0) + 1
if event_type not in first_index:
first_index[event_type] = i
for event_type, count in counts.items():
assert count == 1, f"Duplicate {event_type} events for request {request_id}: " f"got {count}, expected 1. Events: {events}"
if EVENT_SENT in first_index and EVENT_PROPAGATED in first_index:
assert first_index[EVENT_PROPAGATED] < first_index[EVENT_SENT], (
f"message_sent (index {first_index[EVENT_SENT]}) arrived before "
f"message_propagated (index {first_index[EVENT_PROPAGATED]}) "
f"for request {request_id}. Events: {events}"
)
def get_node_multiaddr(node) -> str:
"""Return the TCP multiaddr (with peer-id) from a WrapperManager node.
Asserts that the wrapper returned exactly one address. If the wrapper ever
starts returning multiple addresses (newline/comma-separated or a JSON
list), this fails loudly instead of silently passing a malformed string
downstream to staticnodes / add_peers.
"""
result = node.get_node_info_raw("MyMultiaddresses")
if result.is_err():
raise RuntimeError(f"get_node_info_raw failed: {result.err()}")
addr = result.ok_value.strip()
if not addr or not addr.startswith("/"):
raise RuntimeError(f"Unexpected multiaddr format: {addr!r}")
if "\n" in addr or "," in addr or addr.startswith("["):
raise AssertionError(f"Expected a single multiaddr from MyMultiaddresses, got multiple: {addr!r}")
return addr
def create_message_bindings(**overrides) -> dict:
envelope = {
"contentTopic": DEFAULT_CONTENT_TOPIC,
"payload": DEFAULT_PAYLOAD,
"ephemeral": False,
}
envelope.update(overrides)
return envelope
def assert_no_unknown_request_ids(collector: EventCollector, issued_request_ids) -> None:
"""Cross-association guard: every event carrying a requestId must belong
to one of the request ids we issued. Catches events that get attached to
the wrong request id under concurrency.
"""
issued = set(issued_request_ids)
for event in collector.snapshot():
event_request_id = event.get("requestId")
if event_request_id is None:
continue
assert event_request_id in issued, f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"

View File

@ -2,9 +2,9 @@ import sys
from pathlib import Path
from result import Result, Ok, Err
_THIRD_PARTY = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku"
if str(_THIRD_PARTY) not in sys.path:
sys.path.insert(0, str(_THIRD_PARTY))
_BINDINGS_PATH = Path(__file__).resolve().parents[2] / "vendor" / "logos-delivery-python-bindings" / "waku"
if str(_BINDINGS_PATH) not in sys.path:
sys.path.insert(0, str(_BINDINGS_PATH))
from wrapper import NodeWrapper as _NodeWrapper # type: ignore[import]
@ -77,5 +77,22 @@ class WrapperManager:
def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]:
return self._node.get_node_info(node_info_id, timeout_s=timeout_s)
def get_node_info_raw(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[str, str]:
"""Like get_node_info but returns the raw string without JSON parsing."""
from wrapper import lib, ffi, _new_cb_state, _wait_cb_raw # type: ignore[import]
state = _new_cb_state()
cb = self._node._make_waiting_cb(state)
rc = lib.logosdelivery_get_node_info(self._node.ctx, cb, ffi.NULL, node_info_id.encode("utf-8"))
if rc != 0:
return Err(f"get_node_info_raw: immediate call failed (ret={rc})")
wait_result = _wait_cb_raw(state, "get_node_info_raw", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
return Err(f"get_node_info_raw: callback failed (ret={cb_ret})")
return Ok(cb_msg.decode("utf-8") if cb_msg else "")
def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]:
return self._node.get_available_configs(timeout_s=timeout_s)

View File

@ -250,6 +250,62 @@ class StepsStore(StepsCommon):
expected_hash == actual_hash
), f"Message hash at index {idx} returned by store doesn't match the computed message hash {expected_hash}. Actual hash: {actual_hash}"
@allure.step
def check_sent_message_is_stored(
self,
expected_hashes,
store_node=None,
peer_addr=None,
include_data=None,
pubsub_topic=None,
content_topics=None,
start_time=None,
end_time=None,
hashes=None,
cursor=None,
page_size=None,
ascending=None,
store_v="v3",
**kwargs,
):
"""Verify that messages with the given hashes are present in the store."""
if pubsub_topic is None:
pubsub_topic = self.test_pubsub_topic
if isinstance(expected_hashes, str):
expected_hashes = [expected_hashes]
if store_node is None:
store_node = self.store_nodes
elif not isinstance(store_node, list):
store_node = [store_node]
for node in store_node:
logger.debug(f"Checking that peer {node.image} can find the stored messages by hash")
self.store_response = self.get_messages_from_store(
node=node,
peer_addr=peer_addr,
include_data=include_data,
pubsub_topic=pubsub_topic,
content_topics=content_topics,
start_time=start_time,
end_time=end_time,
hashes=hashes,
cursor=cursor,
page_size=page_size,
ascending=ascending,
store_v=store_v,
**kwargs,
)
logger.debug(f"messages length is {len(self.store_response.messages)}")
assert self.store_response.messages, f"Peer {node.image} couldn't find any messages. " f"Actual response: {self.store_response.resp_json}"
assert len(self.store_response.messages) >= len(expected_hashes), (
f"Expected at least {len(expected_hashes)} messages " f"but got {len(self.store_response.messages)}"
)
actual_hashes = [self.store_response.message_hash(i) for i in range(len(self.store_response.messages))]
for expected_hash in expected_hashes:
assert expected_hash in actual_hashes, f"Expected hash {expected_hash} not found in store. " f"Actual hashes: {actual_hashes}"
@allure.step
def check_store_returns_empty_response(self, pubsub_topic=None):
if not pubsub_topic:

View File

@ -3,7 +3,8 @@ import pytest
from src.test_data import DEFAULT_CLUSTER_ID
def _free_port():
def free_port():
"""Return a currently-unbound TCP/UDP port from the OS."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
return s.getsockname()[1]
@ -13,9 +14,9 @@ def build_node_config(**overrides):
config = {
"logLevel": "DEBUG",
"listenAddress": "0.0.0.0",
"tcpPort": _free_port(),
"discv5UdpPort": _free_port(),
"restPort": _free_port(),
"tcpPort": free_port(),
"discv5UdpPort": free_port(),
"restPort": free_port(),
"restAddress": "0.0.0.0",
"clusterId": DEFAULT_CLUSTER_ID,
"relay": True,

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,700 @@
import base64
import pytest
from src.steps.common import StepsCommon
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.wrappers_manager import WrapperManager
from src.node.wrapper_helpers import (
EventCollector,
assert_event_invariants,
create_message_bindings,
get_node_multiaddr,
wait_for_connected,
wait_for_propagated,
wait_for_sent,
wait_for_error,
)
from tests.wrappers_tests.conftest import build_node_config
logger = get_custom_logger(__name__)
PROPAGATED_TIMEOUT_S = 30.0
SENT_TIMEOUT_S = 10.0
NO_SENT_OBSERVATION_S = 5.0
SENT_AFTER_STORE_TIMEOUT_S = 60.0
OVERSIZED_PAYLOAD_BYTES = 200 * 1024
RECOVERY_TIMEOUT_S = 45.0
SERVICE_DOWN_SETTLE_S = 3.0
# MaxTimeInCache from send_service.nim.
MAX_TIME_IN_CACHE_S = 60.0
# Extra slack to cover the background retry loop tick after the window expires.
CACHE_EXPIRY_SLACK_S = 10.0
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S
RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
class TestS02AutoSubscribeOnFirstSend(StepsCommon):
"""
S02 Auto-subscribe on first send.
Sender never calls subscribe_content_topic() before send().
The send API must auto-subscribe to the content topic used in the message.
Expected: send() returns Ok(RequestId), message_propagated arrives.
"""
def test_s02_send_without_explicit_subscribe(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
with peer_result.ok_value:
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
message = create_message_bindings(
payload=to_base64("S02 auto-subscribe test payload"),
contentTopic="/test/1/s02-auto-subscribe/proto",
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
class TestS06CoreSenderRelayOnly(StepsCommon):
"""
S06 Core sender with relay peers only, no store.
Sender has local relay enabled and is connected to one relay peer.
Expected: send() returns Ok(RequestId), message_propagated event arrives,
no message_sent (store disabled), no message_error.
"""
def test_s06_relay_propagation_without_store(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"reliabilityEnabled": True,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
with peer_result.ok_value:
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
message = create_message_bindings(
payload=to_base64("S06 relay-only test payload"),
contentTopic="/test/1/s06-relay-only/proto",
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"
assert_event_invariants(sender_collector, request_id)
class TestS07CoreSenderRelayAndStore(StepsCommon):
"""
S07 Core sender with relay peers and store peer, reliability enabled.
Sender relays message to a store-capable peer; delivery service validates
the message reached the store via p2p reliability check.
Expected: Propagated, then Sent.
"""
def test_s07_relay_propagation_with_store_validation(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"reliabilityEnabled": True,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
"store": True,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start store peer: {peer_result.err()}"
with peer_result.ok_value:
message = create_message_bindings(
payload=to_base64("S07 relay+store test payload"),
contentTopic="/test/1/s07-relay-store/proto",
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
sent = wait_for_sent(
collector=sender_collector,
request_id=request_id,
timeout_s=SENT_TIMEOUT_S,
)
assert sent is not None, (
f"No message_sent event within {SENT_TIMEOUT_S}s after propagation. " f"Collected events: {sender_collector.events}"
)
assert sent["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
assert_event_invariants(sender_collector, request_id)
class TestRelayToLightpushFallback(StepsCommon):
"""S08/S09 — Relay-to-lightpush fallback.
Sender has relay enabled but zero gossipsub relay peers.
A lightpush peer is reachable via lightpushnode (no staticnodes).
Relay fails with NO_PEERS_TO_RELAY, lightpush fallback succeeds
in the same processing pass.
Topology:
[Service] relay=True, lightpush=True
[RelayPeer] relay=True, staticnodes=[service] (gives service gossipsub mesh)
[Sender] relay=True, lightpush=True, lightpushnode=service
(no staticnodes zero gossipsub relay peers fallback)
"""
@pytest.mark.xfail(reason="the test fail without lightpushnode, see https://github.com/logos-messaging/logos-delivery/issues/3847")
def test_s08_relay_fallback_to_lightpush(self, node_config):
"""S08: no store peer → Propagated only."""
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"lightpush": True,
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
service_result = WrapperManager.create_and_start(config=node_config)
assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"
with service_result.ok_value as service:
service_addr = get_node_multiaddr(service)
relay_config = {
**node_config,
"lightpush": False,
"staticnodes": [service_addr],
"portsShift": 1,
}
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value:
sender_config = {
**node_config,
# "lightpushnode": service_addr, #this comment currently raise issue
"portsShift": 2,
"discv5Discovery": True,
}
sender_result = WrapperManager.create_and_start(
config=sender_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
message = create_message_bindings()
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
assert sent is None, f"Unexpected message_sent event (no store peer): {sent}"
assert_event_invariants(sender_collector, request_id)
def test_s09_relay_fallback_to_lightpush_with_store_validation(self, node_config):
"""S09: S08 + store peer + reliability → Propagated, then Sent."""
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"lightpush": True,
"store": True,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
service_result = WrapperManager.create_and_start(config=node_config)
assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"
with service_result.ok_value as service:
service_addr = get_node_multiaddr(service)
relay_config = {
**node_config,
"lightpush": False,
"store": False,
"staticnodes": [service_addr],
"portsShift": 1,
}
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value:
sender_config = {**node_config, "reliabilityEnabled": True, "storenode": service_addr, "portsShift": 2, "store": False}
sender_result = WrapperManager.create_and_start(
config=sender_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
message = create_message_bindings()
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
sent = wait_for_sent(
collector=sender_collector,
request_id=request_id,
timeout_s=SENT_AFTER_STORE_TIMEOUT_S,
)
assert sent is not None, (
f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s "
f"after propagation. Collected events: {sender_collector.events}"
)
assert sent["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
assert_event_invariants(sender_collector, request_id)
class TestS10EdgeSenderLightpushOnly(StepsCommon):
"""
S10 Edge sender with lightpush path only, no store peer.
Edge sender has no local relay; it publishes via a lightpush service node.
Expected: Propagated only (no Sent, no Error).
"""
@pytest.mark.xfail(reason="lightpush peer discovery via staticnodes is broken, see https://github.com/logos-messaging/logos-delivery/issues/3847")
def test_s10_edge_lightpush_propagation(self, node_config):
sender_collector = EventCollector()
common = {
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
service_config = build_node_config(relay=True, lightpush=True, **common)
service_result = WrapperManager.create_and_start(config=service_config)
assert service_result.is_ok(), f"Failed to start service node: {service_result.err()}"
with service_result.ok_value as service_node:
service_multiaddr = get_node_multiaddr(service_node)
relay_config = build_node_config(
relay=True,
staticnodes=[service_multiaddr],
**common,
)
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value:
edge_config = build_node_config(
mode="Edge",
staticnodes=[service_multiaddr],
**common,
)
edge_result = WrapperManager.create_and_start(
config=edge_config,
event_cb=sender_collector.event_callback,
)
assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"
with edge_result.ok_value as edge_sender:
message = create_message_bindings(
payload=to_base64("S10 edge lightpush test payload"),
contentTopic="/test/1/s10-edge-lightpush/proto",
)
send_result = edge_sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
sent = wait_for_sent(sender_collector, request_id, timeout_s=NO_SENT_OBSERVATION_S)
assert sent is None, f"Unexpected message_sent event (no store peer): {sent}"
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
assert_event_invariants(sender_collector, request_id)
class TestS12IsolatedSenderNoPeers(StepsCommon):
"""
S12 Isolated sender, no peers.
Sender has relay enabled but zero relay peers and zero lightpush peers.
Expected: send() returns Ok(RequestId), but eventually a message_error
event arrives (no route to propagate).
"""
def test_s12_send_with_no_peers_produces_error(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
message = create_message_bindings(
payload=to_base64("S12 isolated sender payload"),
contentTopic="/test/1/s12-isolated/proto",
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
error = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S,
)
assert error is not None, (
f"No message_error event within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s "
f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack) for isolated sender. "
f"Collected events: {sender_collector.events}"
)
assert error["requestId"] == request_id
propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
assert propagated is None, f"Unexpected message_propagated event for isolated sender: {propagated}"
class TestS14LightpushNonRetryableError(StepsCommon):
"""
S14 Lightpush non-retryable error via oversized message.
Edge sender publishes a message exceeding DefaultMaxWakuMessageSize (150KiB)
through a lightpush service node. The server validates message size and
returns INVALID_MESSAGE (420), a non-retryable error.
Expected: send() returns Ok(RequestId), then message_error event.
"""
def test_s14_oversized_message_triggers_error(self):
sender_collector = EventCollector()
common = {
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
service_config = build_node_config(relay=True, lightpush=True, **common)
service_result = WrapperManager.create_and_start(config=service_config)
assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"
with service_result.ok_value as service:
service_multiaddr = get_node_multiaddr(service)
edge_config = build_node_config(
mode="Edge",
staticnodes=[service_multiaddr],
**common,
)
edge_result = WrapperManager.create_and_start(
config=edge_config,
event_cb=sender_collector.event_callback,
)
assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"
with edge_result.ok_value as edge_sender:
oversized_payload = base64.b64encode(b"x" * OVERSIZED_PAYLOAD_BYTES).decode()
message = create_message_bindings(
payload=oversized_payload,
contentTopic="/test/1/s14-oversized/proto",
)
send_result = edge_sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
error = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert error is not None, (
f"No message_error event within {PROPAGATED_TIMEOUT_S}s "
f"after sending oversized message. "
f"Collected events: {sender_collector.events}"
)
assert error["requestId"] == request_id
logger.info(f"S14 received error event: {error}")
error_msg = error.get("error", "").lower()
assert "size exceeded" in error_msg, f"Error message doesn't indicate size violation: {error}"
propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
assert propagated is None, f"Unexpected message_propagated for an invalid message: {propagated}"
assert_event_invariants(sender_collector, request_id)
class TestS15LightpushRetryableErrorRecovery(StepsCommon):
"""
S15 Lightpush retryable error + recovery.
Edge sender publishes via a lightpush service node that has NO relay peers.
The service accepts the lightpush request but returns NO_PEERS_TO_RELAY
a retryable error (explicitly listed in the S15 spec). The message enters
the retry loop. A relay peer then joins the service node, and the next
retry succeeds.
Expected: send() returns Ok(RequestId), then eventually Propagated.
"""
@pytest.mark.xfail(reason="lightpush peer discovery via staticnodes is broken, see https://github.com/logos-messaging/logos-delivery/issues/3847")
def test_s15_lightpush_retryable_error_then_recovery(self):
sender_collector = EventCollector()
common = {
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
service_config = build_node_config(relay=True, lightpush=True, **common)
service_result = WrapperManager.create_and_start(config=service_config)
assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"
with service_result.ok_value as service:
service_multiaddr = get_node_multiaddr(service)
edge_config = build_node_config(
mode="Edge",
staticnodes=[service_multiaddr],
**common,
)
edge_result = WrapperManager.create_and_start(
config=edge_config,
event_cb=sender_collector.event_callback,
)
assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"
with edge_result.ok_value as edge_sender:
msg = create_message_bindings(
payload=to_base64("S15 retryable error recovery"),
contentTopic="/test/1/s15-recovery/proto",
)
send_result = edge_sender.send_message(message=msg)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
delay(SERVICE_DOWN_SETTLE_S)
early_propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
assert early_propagated is None, (
f"message_propagated arrived before relay peer joined — " f"retryable error path was not exercised: {early_propagated}"
)
relay_config = build_node_config(
relay=True,
staticnodes=[service_multiaddr],
**common,
)
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value:
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=RECOVERY_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated within {RECOVERY_TIMEOUT_S}s "
f"after relay peer joined. "
f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error after recovery: {error}"
assert_event_invariants(sender_collector, request_id)