diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml new file mode 100644 index 000000000..69bb5a399 --- /dev/null +++ b/.github/workflows/pr_tests.yml @@ -0,0 +1,235 @@ +name: PR Tests + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +on: + pull_request: + types: [opened, synchronize, reopened, labeled] + paths: + - "src/**" + - "tests/**" + - "vendor/**" + - "requirements.txt" + - "pytest.ini" + - ".github/workflows/pr_tests.yml" + push: + branches: [master] + paths: + - "vendor/**" + workflow_dispatch: + inputs: + run_full_suite: + description: "Run the full test suite (18 shards)" + required: false + default: false + type: boolean + +jobs: + build: + name: Build liblogosdelivery + runs-on: ubuntu-latest + timeout-minutes: 45 + if: >- + github.event.action != 'labeled' || + github.event.label.name == 'full-test' + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Compute cache key + id: cache-key + run: | + BINDINGS_HASH=$(git rev-parse HEAD:vendor/logos-delivery-python-bindings) + DELIVERY_HASH=$(git -C vendor/logos-delivery-python-bindings rev-parse HEAD:vendor/logos-delivery) + echo "key=liblogosdelivery-${{ runner.os }}-nim2.2.4-${BINDINGS_HASH}-${DELIVERY_HASH}" >> "$GITHUB_OUTPUT" + + - name: Cache liblogosdelivery.so + id: cache-lib + uses: actions/cache@v4 + with: + path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so + key: ${{ steps.cache-key.outputs.key }} + + - name: Remove unwanted software + if: steps.cache-lib.outputs.cache-hit != 'true' + uses: ./.github/actions/prune-vm + + - name: Install system deps + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + sudo apt-get update + sudo apt-get install -y \ + util-linux \ + iproute2 \ + sudo \ + ca-certificates \ + curl \ + make \ + gcc \ + g++ + + - name: Install Nim 2.2.4 + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + set -euo pipefail + curl https://nim-lang.org/choosenim/init.sh -sSf | sh -s -- -y + echo "$HOME/.nimble/bin" >> "$GITHUB_PATH" + export PATH="$HOME/.nimble/bin:$PATH" + choosenim 2.2.4 + nim --version + nimble --version + + - name: Build liblogosdelivery.so + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + set -euo pipefail + + export PATH="$HOME/.nimble/bin:$PATH" + + BINDINGS_DIR="$(pwd)/vendor/logos-delivery-python-bindings" + DELIVERY_DIR="$BINDINGS_DIR/vendor/logos-delivery" + + mkdir -p "$BINDINGS_DIR/lib" + + cd "$DELIVERY_DIR" + + ln -sf waku.nimble waku.nims + + nimble install -y + + make setup + + make liblogosdelivery + + SO_PATH="$(find . -type f -name 'liblogosdelivery.so' | head -n 1)" + + if [ -z "$SO_PATH" ]; then + echo "liblogosdelivery.so was not built" + exit 1 + fi + + cp "$SO_PATH" "$BINDINGS_DIR/lib/liblogosdelivery.so" + + echo "Built library:" + ls -l "$BINDINGS_DIR/lib/liblogosdelivery.so" + + - name: Upload library artifact + uses: actions/upload-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so + retention-days: 1 + + wrapper-tests: + name: Wrapper Tests + runs-on: ubuntu-latest + needs: [build] + timeout-minutes: 15 + if: >- + github.event_name != 'push' && + (github.event.action != 'labeled' || github.event.label.name == 'full-test') + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - uses: actions/setup-python@v4 + with: + python-version: "3.12" + cache: "pip" + + - run: pip install -r requirements.txt + + - name: Download liblogosdelivery.so + uses: actions/download-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/ + + - name: Run wrapper tests + env: + PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku + run: | + pytest tests/wrappers_tests/ \ + -m "not docker_required" \ + --reruns 2 \ + --junit-xml=wrapper-results.xml + + - name: Test Report + if: always() + uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2 + with: + name: Wrapper Test Results + path: wrapper-results.xml + reporter: java-junit + use-actions-summary: "true" + + smoke-tests: + name: Smoke Tests + runs-on: ubuntu-latest + needs: [build] + timeout-minutes: 30 + if: >- + github.event_name != 'push' && + (github.event.action != 'labeled' || github.event.label.name == 'full-test') + env: + NODE_1: "wakuorg/nwaku:latest" + NODE_2: "wakuorg/nwaku:latest" + ADDITIONAL_NODES: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest" + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - uses: actions/setup-python@v4 + with: + python-version: "3.12" + cache: "pip" + + - run: pip install -r requirements.txt + + - name: Download liblogosdelivery.so + uses: actions/download-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/ + + - name: Run smoke tests + env: + PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku + run: | + pytest -m "smoke and not docker_required" \ + --ignore=vendor/logos-delivery-python-bindings/tests \ + --ignore=tests/wrappers_tests \ + --reruns 1 \ + -n 4 \ + --dist=loadgroup \ + --junit-xml=smoke-results.xml + + - name: Test Report + if: always() + uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2 + with: + name: Smoke Test Results + path: smoke-results.xml + reporter: java-junit + use-actions-summary: "true" + + full-suite: + name: Full Suite + if: >- + github.event_name != 'push' && + (contains(github.event.pull_request.labels.*.name, 'full-test') || + github.event.inputs.run_full_suite == 'true') + uses: ./.github/workflows/test_common.yml + secrets: inherit + with: + node1: "wakuorg/nwaku:latest" + node2: "wakuorg/nwaku:latest" + additional_nodes: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest" + caller: "pr" + deploy_allure: false + send_discord: false diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index d72e310eb..af2703479 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -22,6 +22,16 @@ on: required: false description: "Workflow caller. Used in reporting" type: string + deploy_allure: + required: false + description: "Deploy allure report to gh-pages" + type: boolean + default: true + send_discord: + required: false + description: "Send test results to Discord" + type: boolean + default: false env: FORCE_COLOR: "1" @@ -206,7 +216,7 @@ jobs: aggregate-reports: runs-on: ubuntu-latest needs: [tests] - if: always() + if: always() && inputs.deploy_allure steps: - name: Download all allure results @@ -348,7 +358,7 @@ jobs: - name: Send report to Discord uses: rjstone/discord-webhook-notify@v1 - if: always() && env.CALLER != 'manual' + if: always() && env.CALLER != 'manual' && inputs.send_discord with: severity: ${{ env.TESTS_RESULT == 'success' && 'info' || 'error' }} username: ${{ github.workflow }} diff --git a/README.md b/README.md index d731ab6a1..1b6903fbf 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,16 @@ To launch it manually: 2. Click **► Run workflow**. 3. Pick the branch you want to test (defaults to `master`) and press **Run workflow**. +### PR tests + +Every push to a pull request triggers **pr\_tests.yml** which runs: + +1. **Build** — compiles `liblogosdelivery.so` (cached by submodule commit hash). +2. **Wrapper tests** — all tests under `tests/wrappers_tests/` that don't require Docker (~5 min). +3. **Smoke tests** — `pytest -m smoke` with Docker nodes (~10 min). + +To run the **full test suite** (18 shards, same as daily) on a PR, add the label **`full-test`** to the pull request. The full suite will start automatically. + ### On‑demand matrix against custom *logos-messaging-nim* versions Use **interop\_tests.yml** when you need to test a PR or a historical image: diff --git a/pytest.ini b/pytest.ini index 3a13b3b9c..2406bfb06 100644 --- a/pytest.ini +++ b/pytest.ini @@ -12,3 +12,4 @@ log_file_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s timeout = 300 markers = smoke: marks tests as smoke test (deselect with '-m "not smoke"') + docker_required: test requires Docker nodes (WakuNode) diff --git a/src/node/wrapper_helpers.py b/src/node/wrapper_helpers.py index c53cb038e..ecce2e50f 100644 --- a/src/node/wrapper_helpers.py +++ b/src/node/wrapper_helpers.py @@ -12,10 +12,6 @@ EVENT_PROPAGATED = "message_propagated" EVENT_SENT = "message_sent" EVENT_ERROR = "message_error" -# --------------------------------------------------------------------------- -# Event collection -# --------------------------------------------------------------------------- - class EventCollector: """Thread-safe collector for async node events. @@ -67,14 +63,14 @@ def wait_for_event( """ deadline = time.monotonic() + timeout_s - while time.monotonic() < deadline: + while True: for event in collector.get_events_for_request(request_id): if predicate(event): return event + if time.monotonic() >= deadline: + return None time.sleep(poll_interval_s) - return None - def wait_for_propagated(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]: return wait_for_event(collector, request_id, is_propagated_event, timeout_s) @@ -88,6 +84,57 @@ def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float) return wait_for_event(collector, request_id, is_error_event, timeout_s) +def wait_for_connected( + collector: EventCollector, + timeout_s: float = 10.0, + poll_interval_s: float = 0.3, +) -> Optional[dict]: + """Wait until a connection_status_change event with PartiallyConnected or Connected arrives.""" + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + with collector._lock: + for event in collector.events: + if event.get("eventType") == "connection_status_change" and event.get("connectionStatus") in ("PartiallyConnected", "Connected"): + return event + time.sleep(poll_interval_s) + return None + + +TERMINAL_EVENT_TYPES = {EVENT_PROPAGATED, EVENT_SENT, EVENT_ERROR} + + +def assert_event_invariants(collector: EventCollector, request_id: str) -> None: + """Check per-request event invariants (issue #163): + - All events carry the correct requestId. + - No duplicate terminal events (Propagated, Sent, Error). + - Sent never appears before Propagated. + """ + events = collector.get_events_for_request(request_id) + assert events, f"No events found for request {request_id}" + + counts: dict[str, int] = {} + first_index: dict[str, int] = {} + for i, event in enumerate(events): + assert event.get("requestId") == request_id, ( + f"Event at index {i} has wrong requestId: " f"expected {request_id!r}, got {event.get('requestId')!r}" + ) + event_type = event.get("eventType", "") + if event_type in TERMINAL_EVENT_TYPES: + counts[event_type] = counts.get(event_type, 0) + 1 + if event_type not in first_index: + first_index[event_type] = i + + for event_type, count in counts.items(): + assert count == 1, f"Duplicate {event_type} events for request {request_id}: " f"got {count}, expected 1. Events: {events}" + + if EVENT_SENT in first_index and EVENT_PROPAGATED in first_index: + assert first_index[EVENT_PROPAGATED] < first_index[EVENT_SENT], ( + f"message_sent (index {first_index[EVENT_SENT]}) arrived before " + f"message_propagated (index {first_index[EVENT_PROPAGATED]}) " + f"for request {request_id}. Events: {events}" + ) + + def get_node_multiaddr(node) -> str: """Return the first TCP multiaddr (with peer-id) from a WrapperManager node.""" result = node.get_node_info_raw("MyMultiaddresses") @@ -101,7 +148,6 @@ def get_node_multiaddr(node) -> str: return addr -# This API for creating messages for send.API not the REST calls def create_message_bindings(**overrides) -> dict: envelope = { "contentTopic": DEFAULT_CONTENT_TOPIC, diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/wrappers_tests/test_send_e2e_part1.py b/tests/wrappers_tests/test_send_e2e_part1.py index 5886be168..994e7d170 100644 --- a/tests/wrappers_tests/test_send_e2e_part1.py +++ b/tests/wrappers_tests/test_send_e2e_part1.py @@ -961,70 +961,3 @@ class TestSendBeforeRelay(StepsStore): request_ids.append(request_id) return request_ids - - -class TestS06CoreSenderRelayOnly(StepsCommon): - """ - S06 — Core sender with relay peers only, no store. - Sender has local relay enabled and is connected to one relay peer. - Expected: send() returns Ok(RequestId), message_propagated event arrives, - no message_sent (store disabled), no message_error. - """ - - def test_s06_relay_propagation_without_store(self, node_config): - sender_collector = EventCollector() - - node_config.update( - { - "relay": True, - "store": False, - "lightpush": False, - "filter": False, - "discv5Discovery": False, - "numShardsInNetwork": 1, - } - ) - - sender_result = WrapperManager.create_and_start( - config=node_config, - event_cb=sender_collector.event_callback, - ) - assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" - - with sender_result.ok_value as sender: - peer_config = { - **node_config, - "staticnodes": [get_node_multiaddr(sender)], - "portsshift": 1, - } - - peer_result = WrapperManager.create_and_start(config=peer_config) - assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" - - with peer_result.ok_value: - message = self.create_message( - payload=to_base64("S06 relay-only test payload"), - contentTopic="/test/1/s06-relay-only/proto", - ) - - send_result = sender.send_message(message=message) - assert send_result.is_ok(), f"send() failed: {send_result.err()}" - - request_id = send_result.ok_value - assert request_id, "send() returned an empty RequestId" - - propagated = wait_for_propagated( - collector=sender_collector, - request_id=request_id, - timeout_s=PROPAGATED_TIMEOUT_S, - ) - assert propagated is not None, ( - f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" - ) - assert propagated["requestId"] == request_id - - error = wait_for_error(sender_collector, request_id, timeout_s=0) - assert error is None, f"Unexpected message_error event: {error}" - - sent = wait_for_sent(sender_collector, request_id, timeout_s=0) - assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}" diff --git a/tests/wrappers_tests/test_send_e2e_part2.py b/tests/wrappers_tests/test_send_e2e_part2.py new file mode 100644 index 000000000..2e36a53a1 --- /dev/null +++ b/tests/wrappers_tests/test_send_e2e_part2.py @@ -0,0 +1,709 @@ +import base64 + +import pytest +from src.steps.common import StepsCommon +from src.libs.common import delay, to_base64 +from src.libs.custom_logger import get_custom_logger +from src.node.wrappers_manager import WrapperManager +from src.node.wrapper_helpers import ( + EventCollector, + assert_event_invariants, + create_message_bindings, + get_node_multiaddr, + wait_for_connected, + wait_for_propagated, + wait_for_sent, + wait_for_error, +) +from src.steps.store import StepsStore +from tests.wrappers_tests.conftest import build_node_config + +logger = get_custom_logger(__name__) + +PROPAGATED_TIMEOUT_S = 30.0 +SENT_TIMEOUT_S = 10.0 +NO_SENT_OBSERVATION_S = 5.0 +SENT_AFTER_STORE_TIMEOUT_S = 60.0 +OVERSIZED_PAYLOAD_BYTES = 200 * 1024 +RECOVERY_TIMEOUT_S = 45.0 +SERVICE_DOWN_SETTLE_S = 3.0 + +# MaxTimeInCache from send_service.nim. +MAX_TIME_IN_CACHE_S = 60.0 +# Extra slack to cover the background retry loop tick after the window expires. +CACHE_EXPIRY_SLACK_S = 10.0 +ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S +RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window" + + +class TestS07CoreSenderRelayAndStore(StepsCommon): + """ + S07 — Core sender with relay peers and store peer, reliability enabled. + Sender relays message to a store-capable peer; delivery service validates + the message reached the store via p2p reliability check. + Expected: Propagated, then Sent. + """ + + def test_s07_relay_propagation_with_store_validation(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "reliabilityEnabled": True, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsShift": 1, + "store": True, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start store peer: {peer_result.err()}" + + with peer_result.ok_value: + message = create_message_bindings( + payload=to_base64("S07 relay+store test payload"), + contentTopic="/test/1/s07-relay-store/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_TIMEOUT_S, + ) + assert sent is not None, ( + f"No message_sent event within {SENT_TIMEOUT_S}s after propagation. " f"Collected events: {sender_collector.events}" + ) + assert sent["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS10EdgeSenderLightpushOnly(StepsCommon): + """ + S10 — Edge sender with lightpush path only, no store peer. + Edge sender has no local relay; it publishes via a lightpush service node. + Expected: Propagated only (no Sent, no Error). + """ + + def test_s10_edge_lightpush_propagation(self, node_config): + sender_collector = EventCollector() + + common = { + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + + service_config = build_node_config(relay=True, lightpush=True, **common) + + service_result = WrapperManager.create_and_start(config=service_config) + assert service_result.is_ok(), f"Failed to start service node: {service_result.err()}" + + with service_result.ok_value as service_node: + service_multiaddr = get_node_multiaddr(service_node) + + relay_config = build_node_config( + relay=True, + staticnodes=[service_multiaddr], + **common, + ) + + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + edge_config = build_node_config( + mode="Edge", + relay=False, + lightpushnode=service_multiaddr, + staticnodes=[service_multiaddr], + **common, + ) + + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + + with edge_result.ok_value as edge_sender: + message = create_message_bindings( + payload=to_base64("S10 edge lightpush test payload"), + contentTopic="/test/1/s10-edge-lightpush/proto", + ) + + send_result = edge_sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent(sender_collector, request_id, timeout_s=NO_SENT_OBSERVATION_S) + assert sent is None, f"Unexpected message_sent event (no store peer): {sent}" + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS06CoreSenderRelayOnly(StepsCommon): + """ + S06 — Core sender with relay peers only, no store. + Sender has local relay enabled and is connected to one relay peer. + Expected: send() returns Ok(RequestId), message_propagated event arrives, + no message_sent (store disabled), no message_error. + """ + + def test_s06_relay_propagation_without_store(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsShift": 1, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" + + with peer_result.ok_value: + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + + message = create_message_bindings( + payload=to_base64("S06 relay-only test payload"), + contentTopic="/test/1/s06-relay-only/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + sent = wait_for_sent(sender_collector, request_id, timeout_s=0) + assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS02AutoSubscribeOnFirstSend(StepsCommon): + """ + S02 — Auto-subscribe on first send. + Sender never calls subscribe_content_topic() before send(). + The send API must auto-subscribe to the content topic used in the message. + Expected: send() returns Ok(RequestId), message_propagated arrives. + """ + + def test_s02_send_without_explicit_subscribe(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsShift": 1, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" + + with peer_result.ok_value: + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + + message = create_message_bindings( + payload=to_base64("S02 auto-subscribe test payload"), + contentTopic="/test/1/s02-auto-subscribe/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + +class TestS12IsolatedSenderNoPeers(StepsCommon): + """ + S12 — Isolated sender, no peers. + Sender has relay enabled but zero relay peers and zero lightpush peers. + Expected: send() returns Ok(RequestId), but eventually a message_error + event arrives (no route to propagate). + """ + + def test_s12_send_with_no_peers_produces_error(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings( + payload=to_base64("S12 isolated sender payload"), + contentTopic="/test/1/s12-isolated/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + error = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S, + ) + assert error is not None, ( + f"No message_error event within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s " + f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack) for isolated sender. " + f"Collected events: {sender_collector.events}" + ) + assert error["requestId"] == request_id + + propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) + assert propagated is None, f"Unexpected message_propagated event for isolated sender: {propagated}" + + +class TestS14LightpushNonRetryableError(StepsCommon): + """ + S14 — Lightpush non-retryable error via oversized message. + Edge sender publishes a message exceeding DefaultMaxWakuMessageSize (150KiB) + through a lightpush service node. The server validates message size and + returns INVALID_MESSAGE (420), a non-retryable error. + Expected: send() returns Ok(RequestId), then message_error event. + """ + + def test_s14_oversized_message_triggers_error(self): + sender_collector = EventCollector() + + common = { + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + + service_config = build_node_config(relay=True, lightpush=True, **common) + service_result = WrapperManager.create_and_start(config=service_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_multiaddr = get_node_multiaddr(service) + + edge_config = build_node_config( + mode="Edge", + relay=False, + lightpushnode=service_multiaddr, + staticnodes=[service_multiaddr], + **common, + ) + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + + with edge_result.ok_value as edge_sender: + oversized_payload = base64.b64encode(b"x" * OVERSIZED_PAYLOAD_BYTES).decode() + message = create_message_bindings( + payload=oversized_payload, + contentTopic="/test/1/s14-oversized/proto", + ) + + send_result = edge_sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + error = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert error is not None, ( + f"No message_error event within {PROPAGATED_TIMEOUT_S}s " + f"after sending oversized message. " + f"Collected events: {sender_collector.events}" + ) + assert error["requestId"] == request_id + logger.info(f"S14 received error event: {error}") + + error_msg = error.get("error", "").lower() + assert "size exceeded" in error_msg, f"Error message doesn't indicate size violation: {error}" + + propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) + assert propagated is None, f"Unexpected message_propagated for an invalid message: {propagated}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS15LightpushRetryableErrorRecovery(StepsCommon): + """ + S15 — Lightpush retryable error + recovery. + Edge sender publishes via a lightpush service node that has NO relay peers. + The service accepts the lightpush request but returns NO_PEERS_TO_RELAY — + a retryable error (explicitly listed in the S15 spec). The message enters + the retry loop. A relay peer then joins the service node, and the next + retry succeeds. + Expected: send() returns Ok(RequestId), then eventually Propagated. + """ + + def test_s15_lightpush_retryable_error_then_recovery(self): + sender_collector = EventCollector() + + common = { + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + + service_config = build_node_config(relay=True, lightpush=True, **common) + service_result = WrapperManager.create_and_start(config=service_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_multiaddr = get_node_multiaddr(service) + + edge_config = build_node_config( + mode="Edge", + relay=False, + lightpushnode=service_multiaddr, + staticnodes=[service_multiaddr], + **common, + ) + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + + with edge_result.ok_value as edge_sender: + msg = create_message_bindings( + payload=to_base64("S15 retryable error recovery"), + contentTopic="/test/1/s15-recovery/proto", + ) + send_result = edge_sender.send_message(message=msg) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + delay(SERVICE_DOWN_SETTLE_S) + + early_propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) + assert early_propagated is None, ( + f"message_propagated arrived before relay peer joined — " f"retryable error path was not exercised: {early_propagated}" + ) + + relay_config = build_node_config( + relay=True, + staticnodes=[service_multiaddr], + **common, + ) + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=RECOVERY_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated within {RECOVERY_TIMEOUT_S}s " + f"after relay peer joined. " + f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error after recovery: {error}" + + assert_event_invariants(sender_collector, request_id) + + +class TestRelayToLightpushFallback(StepsCommon): + """S08/S09 — Relay-to-lightpush fallback. + + Sender has relay enabled but zero gossipsub relay peers. + A lightpush peer is reachable via lightpushnode (no staticnodes). + Relay fails with NO_PEERS_TO_RELAY, lightpush fallback succeeds + in the same processing pass. + + Topology: + [Service] relay=True, lightpush=True + [RelayPeer] relay=True, staticnodes=[service] (gives service gossipsub mesh) + [Sender] relay=True, lightpush=True, lightpushnode=service + (no staticnodes → zero gossipsub relay peers → fallback) + """ + + def test_s08_relay_fallback_to_lightpush(self, node_config): + """S08: no store peer → Propagated only.""" + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "lightpush": True, + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + service_result = WrapperManager.create_and_start(config=node_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_addr = get_node_multiaddr(service) + + relay_config = { + **node_config, + "lightpush": False, + "staticnodes": [service_addr], + "portsShift": 1, + } + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + sender_config = { + **node_config, + "lightpushnode": service_addr, + "portsShift": 2, + } + sender_result = WrapperManager.create_and_start( + config=sender_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings() + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + sent = wait_for_sent(sender_collector, request_id, timeout_s=0) + assert sent is None, f"Unexpected message_sent event (no store peer): {sent}" + + assert_event_invariants(sender_collector, request_id) + + def test_s09_relay_fallback_to_lightpush_with_store_validation(self, node_config): + """S09: S08 + store peer + reliability → Propagated, then Sent.""" + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "lightpush": True, + "store": True, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + service_result = WrapperManager.create_and_start(config=node_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_addr = get_node_multiaddr(service) + + relay_config = { + **node_config, + "lightpush": False, + "store": False, + "staticnodes": [service_addr], + "portsShift": 1, + } + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + sender_config = { + **node_config, + "reliabilityEnabled": True, + "lightpushnode": service_addr, + "storenode": service_addr, + "portsShift": 2, + } + sender_result = WrapperManager.create_and_start( + config=sender_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings() + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_AFTER_STORE_TIMEOUT_S, + ) + assert sent is not None, ( + f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " + f"after propagation. Collected events: {sender_collector.events}" + ) + assert sent["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id)