From 4d4bd535c4664ac5c3cb00d656476b09fdb434f0 Mon Sep 17 00:00:00 2001 From: Egor Rachkovskii <32649334+at0m1x19@users.noreply.github.com> Date: Tue, 28 Apr 2026 17:44:27 +0100 Subject: [PATCH 1/4] Improve `wait_for_event` loop logic and add `assert_event_invariants` helper (#178) - Refactored the `wait_for_event` function for clarity and to ensure proper deadline handling within the loop. - Introduced `assert_event_invariants` to validate per-request event properties, enforcing invariants like correct `requestId`, no duplicate terminal events, and proper timing between `Propagated` and `Sent`. - Added tests for `assert_event_invariants` enforcement in `S14` and `S15` lightpush scenarios. Co-authored-by: Egor Rachkovskii --- src/node/wrapper_helpers.py | 41 ++++++- tests/wrappers_tests/test_send_e2e.py | 167 +++++++++++++++++++++++++- 2 files changed, 203 insertions(+), 5 deletions(-) diff --git a/src/node/wrapper_helpers.py b/src/node/wrapper_helpers.py index c53cb038e..813684165 100644 --- a/src/node/wrapper_helpers.py +++ b/src/node/wrapper_helpers.py @@ -67,14 +67,14 @@ def wait_for_event( """ deadline = time.monotonic() + timeout_s - while time.monotonic() < deadline: + while True: for event in collector.get_events_for_request(request_id): if predicate(event): return event + if time.monotonic() >= deadline: + return None time.sleep(poll_interval_s) - return None - def wait_for_propagated(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]: return wait_for_event(collector, request_id, is_propagated_event, timeout_s) @@ -88,6 +88,41 @@ def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float) return wait_for_event(collector, request_id, is_error_event, timeout_s) +TERMINAL_EVENT_TYPES = {EVENT_PROPAGATED, EVENT_SENT, EVENT_ERROR} + + +def assert_event_invariants(collector: EventCollector, request_id: str) -> None: + """Check per-request event invariants (issue #163): + - All events carry the correct requestId. + - No duplicate terminal events (Propagated, Sent, Error). + - Sent never appears before Propagated. + """ + events = collector.get_events_for_request(request_id) + assert events, f"No events found for request {request_id}" + + counts: dict[str, int] = {} + first_index: dict[str, int] = {} + for i, event in enumerate(events): + assert event.get("requestId") == request_id, ( + f"Event at index {i} has wrong requestId: " f"expected {request_id!r}, got {event.get('requestId')!r}" + ) + event_type = event.get("eventType", "") + if event_type in TERMINAL_EVENT_TYPES: + counts[event_type] = counts.get(event_type, 0) + 1 + if event_type not in first_index: + first_index[event_type] = i + + for event_type, count in counts.items(): + assert count == 1, f"Duplicate {event_type} events for request {request_id}: " f"got {count}, expected 1. Events: {events}" + + if EVENT_SENT in first_index and EVENT_PROPAGATED in first_index: + assert first_index[EVENT_PROPAGATED] < first_index[EVENT_SENT], ( + f"message_sent (index {first_index[EVENT_SENT]}) arrived before " + f"message_propagated (index {first_index[EVENT_PROPAGATED]}) " + f"for request {request_id}. Events: {events}" + ) + + def get_node_multiaddr(node) -> str: """Return the first TCP multiaddr (with peer-id) from a WrapperManager node.""" result = node.get_node_info_raw("MyMultiaddresses") diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py index 18a8da5aa..147caa367 100644 --- a/tests/wrappers_tests/test_send_e2e.py +++ b/tests/wrappers_tests/test_send_e2e.py @@ -1,4 +1,4 @@ -from time import time_ns +import base64 import pytest from src.env_vars import NODE_2 @@ -9,6 +9,7 @@ from src.node.waku_node import WakuNode from src.node.wrappers_manager import WrapperManager from src.node.wrapper_helpers import ( EventCollector, + assert_event_invariants, create_message_bindings, get_node_multiaddr, wait_for_propagated, @@ -16,14 +17,17 @@ from src.node.wrapper_helpers import ( wait_for_error, ) from src.steps.store import StepsStore +from tests.wrappers_tests.conftest import build_node_config logger = get_custom_logger(__name__) - PROPAGATED_TIMEOUT_S = 30.0 SENT_TIMEOUT_S = 10.0 NO_SENT_OBSERVATION_S = 5.0 SENT_AFTER_STORE_TIMEOUT_S = 60.0 +OVERSIZED_PAYLOAD_BYTES = 200 * 1024 +RECOVERY_TIMEOUT_S = 45.0 +SERVICE_DOWN_SETTLE_S = 3.0 # MaxTimeInCache from send_service.nim. MAX_TIME_IN_CACHE_S = 60.0 @@ -371,3 +375,162 @@ class TestS06CoreSenderRelayOnly(StepsCommon): sent = wait_for_sent(sender_collector, request_id, timeout_s=0) assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}" + + +class TestS14LightpushNonRetryableError(StepsCommon): + """ + S14 — Lightpush non-retryable error via oversized message. + Edge sender publishes a message exceeding DefaultMaxWakuMessageSize (150KiB) + through a lightpush service node. The server validates message size and + returns INVALID_MESSAGE (420), a non-retryable error. + Expected: send() returns Ok(RequestId), then message_error event. + """ + + def test_s14_oversized_message_triggers_error(self): + sender_collector = EventCollector() + + common = { + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + + service_config = build_node_config(relay=True, lightpush=True, **common) + service_result = WrapperManager.create_and_start(config=service_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_multiaddr = get_node_multiaddr(service) + + edge_config = build_node_config( + mode="Edge", + relay=False, + lightpushnode=service_multiaddr, + staticnodes=[service_multiaddr], + **common, + ) + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + + with edge_result.ok_value as edge_sender: + oversized_payload = base64.b64encode(b"x" * OVERSIZED_PAYLOAD_BYTES).decode() + message = create_message_bindings( + payload=oversized_payload, + contentTopic="/test/1/s14-oversized/proto", + ) + + send_result = edge_sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + error = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert error is not None, ( + f"No message_error event within {PROPAGATED_TIMEOUT_S}s " + f"after sending oversized message. " + f"Collected events: {sender_collector.events}" + ) + assert error["requestId"] == request_id + logger.info(f"S14 received error event: {error}") + + error_msg = error.get("error", "").lower() + assert "size exceeded" in error_msg, f"Error message doesn't indicate size violation: {error}" + + propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) + assert propagated is None, f"Unexpected message_propagated for an invalid message: {propagated}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS15LightpushRetryableErrorRecovery(StepsCommon): + """ + S15 — Lightpush retryable error + recovery. + Edge sender publishes via a lightpush service node that has NO relay peers. + The service accepts the lightpush request but returns NO_PEERS_TO_RELAY — + a retryable error (explicitly listed in the S15 spec). The message enters + the retry loop. A relay peer then joins the service node, and the next + retry succeeds. + Expected: send() returns Ok(RequestId), then eventually Propagated. + """ + + def test_s15_lightpush_retryable_error_then_recovery(self): + sender_collector = EventCollector() + + common = { + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + + service_config = build_node_config(relay=True, lightpush=True, **common) + service_result = WrapperManager.create_and_start(config=service_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_multiaddr = get_node_multiaddr(service) + + edge_config = build_node_config( + mode="Edge", + relay=False, + lightpushnode=service_multiaddr, + staticnodes=[service_multiaddr], + **common, + ) + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + + with edge_result.ok_value as edge_sender: + msg = create_message_bindings( + payload=to_base64("S15 retryable error recovery"), + contentTopic="/test/1/s15-recovery/proto", + ) + send_result = edge_sender.send_message(message=msg) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + delay(SERVICE_DOWN_SETTLE_S) + + early_propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) + assert early_propagated is None, ( + f"message_propagated arrived before relay peer joined — " f"retryable error path was not exercised: {early_propagated}" + ) + + relay_config = build_node_config( + relay=True, + staticnodes=[service_multiaddr], + **common, + ) + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=RECOVERY_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated within {RECOVERY_TIMEOUT_S}s " + f"after relay peer joined. " + f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error after recovery: {error}" + + assert_event_invariants(sender_collector, request_id) From df073119ba8a30a20e4c3a15bee94272cd03f6fe Mon Sep 17 00:00:00 2001 From: Egor Rachkovskii <32649334+at0m1x19@users.noreply.github.com> Date: Wed, 29 Apr 2026 13:21:56 +0100 Subject: [PATCH 2/4] Add S07 and S10 send API tests with event invariants helper (#176) * Add `assert_event_invariants` to enforce per-request event constraints and integrate into relevant tests * Integrate `assert_event_invariants` into edge and store tests * Remove redundant comments from `test_send_e2e.py` --------- Co-authored-by: Egor Rachkovskii --- tests/wrappers_tests/test_send_e2e.py | 170 +++++++++++++++++++++++++- 1 file changed, 168 insertions(+), 2 deletions(-) diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py index 147caa367..d64d3db15 100644 --- a/tests/wrappers_tests/test_send_e2e.py +++ b/tests/wrappers_tests/test_send_e2e.py @@ -101,6 +101,8 @@ class TestSendBeforeRelay(StepsStore): f"from a store-enabled relay peer. Collected events: {sender_collector.events}" ) + assert_event_invariants(sender_collector, request_id) + def test_s17_no_sent_event_when_relay_has_no_store(self, node_config): """ S17 negative: relay peerstore=false, there shouldn't be a Sent event,. @@ -162,6 +164,8 @@ class TestSendBeforeRelay(StepsStore): f"Collected events: {sender_collector.events}" ) + assert_event_invariants(sender_collector, request_id) + def test_s19_store_peer_appears_after_propagation(self, node_config): """ S19: a store peer comes online later. @@ -173,8 +177,6 @@ class TestSendBeforeRelay(StepsStore): """ sender_collector = EventCollector() - sender_collector = EventCollector() - node_config.update( { "relay": True, @@ -258,6 +260,8 @@ class TestSendBeforeRelay(StepsStore): ascending="true", ) + assert_event_invariants(sender_collector, request_id) + def test_s21_error_when_retry_window_expires(self, node_config): """ S21: delivery retry window expires before any valid path recovers. @@ -309,6 +313,166 @@ class TestSendBeforeRelay(StepsStore): f"Full event: {error_event}" ) + assert_event_invariants(sender_collector, request_id) + + +class TestS07CoreSenderRelayAndStore(StepsCommon): + """ + S07 — Core sender with relay peers and store peer, reliability enabled. + Sender relays message to a store-capable peer; delivery service validates + the message reached the store via p2p reliability check. + Expected: Propagated, then Sent. + """ + + def test_s07_relay_propagation_with_store_validation(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "reliabilityEnabled": True, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsshift": 1, + "store": True, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start store peer: {peer_result.err()}" + + with peer_result.ok_value: + message = create_message_bindings( + payload=to_base64("S07 relay+store test payload"), + contentTopic="/test/1/s07-relay-store/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_TIMEOUT_S, + ) + assert sent is not None, ( + f"No message_sent event within {SENT_TIMEOUT_S}s after propagation. " f"Collected events: {sender_collector.events}" + ) + assert sent["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS10EdgeSenderLightpushOnly(StepsCommon): + """ + S10 — Edge sender with lightpush path only, no store peer. + Edge sender has no local relay; it publishes via a lightpush service node. + Expected: Propagated only (no Sent, no Error). + """ + + def test_s10_edge_lightpush_propagation(self, node_config): + sender_collector = EventCollector() + + common = { + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + + service_config = build_node_config(relay=True, lightpush=True, **common) + + service_result = WrapperManager.create_and_start(config=service_config) + assert service_result.is_ok(), f"Failed to start service node: {service_result.err()}" + + with service_result.ok_value as service_node: + service_multiaddr = get_node_multiaddr(service_node) + + relay_config = build_node_config( + relay=True, + staticnodes=[service_multiaddr], + **common, + ) + + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + edge_config = build_node_config( + mode="Edge", + relay=False, + lightpushnode=service_multiaddr, + staticnodes=[service_multiaddr], + **common, + ) + + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + + with edge_result.ok_value as edge_sender: + message = create_message_bindings( + payload=to_base64("S10 edge lightpush test payload"), + contentTopic="/test/1/s10-edge-lightpush/proto", + ) + + send_result = edge_sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent(sender_collector, request_id, timeout_s=NO_SENT_OBSERVATION_S) + assert sent is None, f"Unexpected message_sent event (no store peer): {sent}" + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id) + class TestS06CoreSenderRelayOnly(StepsCommon): """ @@ -376,6 +540,8 @@ class TestS06CoreSenderRelayOnly(StepsCommon): sent = wait_for_sent(sender_collector, request_id, timeout_s=0) assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}" + assert_event_invariants(sender_collector, request_id) + class TestS14LightpushNonRetryableError(StepsCommon): """ From a85223a79c9ec55dfc04b22d7ef0ab6fa380fe8f Mon Sep 17 00:00:00 2001 From: Egor Rachkovskii <32649334+at0m1x19@users.noreply.github.com> Date: Thu, 30 Apr 2026 16:15:50 +0100 Subject: [PATCH 3/4] Add S02/S12 send API tests and PR CI pipeline (#174) * Add tests for auto-subscribe on first send and isolated sender with no peers * Add PR CI workflow with tiered test strategy - pr_tests.yml: build job with cache, wrapper-tests, smoke-tests, and label-triggered full-suite - test_common.yml: add deploy_allure/send_discord inputs so PR runs skip reporting side effects - Add docker_required marker to S19 (needs Docker, excluded from wrapper-only CI job) - Register docker_required marker in pytest.ini * Document PR CI test workflows in README * Refine PR CI test strategy: - Exclude `docker_required` tests from smoke set in `pr_tests.yml`. - Add `wait_for_connected` helper for connection state checks. - Update S19 test to dynamically create and clean up the store node setup. - General simplifications and improved test stability. * Add `wait_for_connected` assertion to ensure sender connection state before propagation test * Refine tests and CI workflows: - Replace `ERROR_TIMEOUT_S` with `ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S` in `test_send_e2e.py`. - Adjust timeout assertion for better clarity and accuracy. - Update `pr_tests.yml` to add retries (`--reruns`) and ignore wrapper tests in smoke tests. - Change `test_common.yml` default Discord reporting to `false`. * Normalize `portsshift` to `portsShift` in `test_send_e2e.py` configuration definitions. --------- Co-authored-by: Egor Rachkovskii --- .github/workflows/pr_tests.yml | 235 ++++++++++++++++++++++++++ .github/workflows/test_common.yml | 14 +- README.md | 10 ++ pytest.ini | 1 + src/node/wrapper_helpers.py | 21 ++- tests/wrappers_tests/test_send_e2e.py | 206 +++++++++++++++++----- 6 files changed, 436 insertions(+), 51 deletions(-) create mode 100644 .github/workflows/pr_tests.yml diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml new file mode 100644 index 000000000..69bb5a399 --- /dev/null +++ b/.github/workflows/pr_tests.yml @@ -0,0 +1,235 @@ +name: PR Tests + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +on: + pull_request: + types: [opened, synchronize, reopened, labeled] + paths: + - "src/**" + - "tests/**" + - "vendor/**" + - "requirements.txt" + - "pytest.ini" + - ".github/workflows/pr_tests.yml" + push: + branches: [master] + paths: + - "vendor/**" + workflow_dispatch: + inputs: + run_full_suite: + description: "Run the full test suite (18 shards)" + required: false + default: false + type: boolean + +jobs: + build: + name: Build liblogosdelivery + runs-on: ubuntu-latest + timeout-minutes: 45 + if: >- + github.event.action != 'labeled' || + github.event.label.name == 'full-test' + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Compute cache key + id: cache-key + run: | + BINDINGS_HASH=$(git rev-parse HEAD:vendor/logos-delivery-python-bindings) + DELIVERY_HASH=$(git -C vendor/logos-delivery-python-bindings rev-parse HEAD:vendor/logos-delivery) + echo "key=liblogosdelivery-${{ runner.os }}-nim2.2.4-${BINDINGS_HASH}-${DELIVERY_HASH}" >> "$GITHUB_OUTPUT" + + - name: Cache liblogosdelivery.so + id: cache-lib + uses: actions/cache@v4 + with: + path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so + key: ${{ steps.cache-key.outputs.key }} + + - name: Remove unwanted software + if: steps.cache-lib.outputs.cache-hit != 'true' + uses: ./.github/actions/prune-vm + + - name: Install system deps + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + sudo apt-get update + sudo apt-get install -y \ + util-linux \ + iproute2 \ + sudo \ + ca-certificates \ + curl \ + make \ + gcc \ + g++ + + - name: Install Nim 2.2.4 + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + set -euo pipefail + curl https://nim-lang.org/choosenim/init.sh -sSf | sh -s -- -y + echo "$HOME/.nimble/bin" >> "$GITHUB_PATH" + export PATH="$HOME/.nimble/bin:$PATH" + choosenim 2.2.4 + nim --version + nimble --version + + - name: Build liblogosdelivery.so + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + set -euo pipefail + + export PATH="$HOME/.nimble/bin:$PATH" + + BINDINGS_DIR="$(pwd)/vendor/logos-delivery-python-bindings" + DELIVERY_DIR="$BINDINGS_DIR/vendor/logos-delivery" + + mkdir -p "$BINDINGS_DIR/lib" + + cd "$DELIVERY_DIR" + + ln -sf waku.nimble waku.nims + + nimble install -y + + make setup + + make liblogosdelivery + + SO_PATH="$(find . -type f -name 'liblogosdelivery.so' | head -n 1)" + + if [ -z "$SO_PATH" ]; then + echo "liblogosdelivery.so was not built" + exit 1 + fi + + cp "$SO_PATH" "$BINDINGS_DIR/lib/liblogosdelivery.so" + + echo "Built library:" + ls -l "$BINDINGS_DIR/lib/liblogosdelivery.so" + + - name: Upload library artifact + uses: actions/upload-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so + retention-days: 1 + + wrapper-tests: + name: Wrapper Tests + runs-on: ubuntu-latest + needs: [build] + timeout-minutes: 15 + if: >- + github.event_name != 'push' && + (github.event.action != 'labeled' || github.event.label.name == 'full-test') + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - uses: actions/setup-python@v4 + with: + python-version: "3.12" + cache: "pip" + + - run: pip install -r requirements.txt + + - name: Download liblogosdelivery.so + uses: actions/download-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/ + + - name: Run wrapper tests + env: + PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku + run: | + pytest tests/wrappers_tests/ \ + -m "not docker_required" \ + --reruns 2 \ + --junit-xml=wrapper-results.xml + + - name: Test Report + if: always() + uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2 + with: + name: Wrapper Test Results + path: wrapper-results.xml + reporter: java-junit + use-actions-summary: "true" + + smoke-tests: + name: Smoke Tests + runs-on: ubuntu-latest + needs: [build] + timeout-minutes: 30 + if: >- + github.event_name != 'push' && + (github.event.action != 'labeled' || github.event.label.name == 'full-test') + env: + NODE_1: "wakuorg/nwaku:latest" + NODE_2: "wakuorg/nwaku:latest" + ADDITIONAL_NODES: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest" + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - uses: actions/setup-python@v4 + with: + python-version: "3.12" + cache: "pip" + + - run: pip install -r requirements.txt + + - name: Download liblogosdelivery.so + uses: actions/download-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/ + + - name: Run smoke tests + env: + PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku + run: | + pytest -m "smoke and not docker_required" \ + --ignore=vendor/logos-delivery-python-bindings/tests \ + --ignore=tests/wrappers_tests \ + --reruns 1 \ + -n 4 \ + --dist=loadgroup \ + --junit-xml=smoke-results.xml + + - name: Test Report + if: always() + uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2 + with: + name: Smoke Test Results + path: smoke-results.xml + reporter: java-junit + use-actions-summary: "true" + + full-suite: + name: Full Suite + if: >- + github.event_name != 'push' && + (contains(github.event.pull_request.labels.*.name, 'full-test') || + github.event.inputs.run_full_suite == 'true') + uses: ./.github/workflows/test_common.yml + secrets: inherit + with: + node1: "wakuorg/nwaku:latest" + node2: "wakuorg/nwaku:latest" + additional_nodes: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest" + caller: "pr" + deploy_allure: false + send_discord: false diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index 2b276404a..be7b1f7b8 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -22,6 +22,16 @@ on: required: false description: "Workflow caller. Used in reporting" type: string + deploy_allure: + required: false + description: "Deploy allure report to gh-pages" + type: boolean + default: true + send_discord: + required: false + description: "Send test results to Discord" + type: boolean + default: false env: FORCE_COLOR: "1" @@ -206,7 +216,7 @@ jobs: aggregate-reports: runs-on: ubuntu-latest needs: [tests] - if: always() + if: always() && inputs.deploy_allure steps: - name: Download all allure results @@ -348,7 +358,7 @@ jobs: - name: Send report to Discord uses: rjstone/discord-webhook-notify@v1 - if: always() && env.CALLER != 'manual' + if: always() && env.CALLER != 'manual' && inputs.send_discord with: severity: ${{ env.TESTS_RESULT == 'success' && 'info' || 'error' }} username: ${{ github.workflow }} diff --git a/README.md b/README.md index d731ab6a1..1b6903fbf 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,16 @@ To launch it manually: 2. Click **► Run workflow**. 3. Pick the branch you want to test (defaults to `master`) and press **Run workflow**. +### PR tests + +Every push to a pull request triggers **pr\_tests.yml** which runs: + +1. **Build** — compiles `liblogosdelivery.so` (cached by submodule commit hash). +2. **Wrapper tests** — all tests under `tests/wrappers_tests/` that don't require Docker (~5 min). +3. **Smoke tests** — `pytest -m smoke` with Docker nodes (~10 min). + +To run the **full test suite** (18 shards, same as daily) on a PR, add the label **`full-test`** to the pull request. The full suite will start automatically. + ### On‑demand matrix against custom *logos-messaging-nim* versions Use **interop\_tests.yml** when you need to test a PR or a historical image: diff --git a/pytest.ini b/pytest.ini index 3a13b3b9c..2406bfb06 100644 --- a/pytest.ini +++ b/pytest.ini @@ -12,3 +12,4 @@ log_file_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s timeout = 300 markers = smoke: marks tests as smoke test (deselect with '-m "not smoke"') + docker_required: test requires Docker nodes (WakuNode) diff --git a/src/node/wrapper_helpers.py b/src/node/wrapper_helpers.py index 813684165..ecce2e50f 100644 --- a/src/node/wrapper_helpers.py +++ b/src/node/wrapper_helpers.py @@ -12,10 +12,6 @@ EVENT_PROPAGATED = "message_propagated" EVENT_SENT = "message_sent" EVENT_ERROR = "message_error" -# --------------------------------------------------------------------------- -# Event collection -# --------------------------------------------------------------------------- - class EventCollector: """Thread-safe collector for async node events. @@ -88,6 +84,22 @@ def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float) return wait_for_event(collector, request_id, is_error_event, timeout_s) +def wait_for_connected( + collector: EventCollector, + timeout_s: float = 10.0, + poll_interval_s: float = 0.3, +) -> Optional[dict]: + """Wait until a connection_status_change event with PartiallyConnected or Connected arrives.""" + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + with collector._lock: + for event in collector.events: + if event.get("eventType") == "connection_status_change" and event.get("connectionStatus") in ("PartiallyConnected", "Connected"): + return event + time.sleep(poll_interval_s) + return None + + TERMINAL_EVENT_TYPES = {EVENT_PROPAGATED, EVENT_SENT, EVENT_ERROR} @@ -136,7 +148,6 @@ def get_node_multiaddr(node) -> str: return addr -# This API for creating messages for send.API not the REST calls def create_message_bindings(**overrides) -> dict: envelope = { "contentTopic": DEFAULT_CONTENT_TOPIC, diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py index d64d3db15..cd46d954e 100644 --- a/tests/wrappers_tests/test_send_e2e.py +++ b/tests/wrappers_tests/test_send_e2e.py @@ -1,17 +1,16 @@ import base64 import pytest -from src.env_vars import NODE_2 from src.steps.common import StepsCommon from src.libs.common import delay, to_base64 from src.libs.custom_logger import get_custom_logger -from src.node.waku_node import WakuNode from src.node.wrappers_manager import WrapperManager from src.node.wrapper_helpers import ( EventCollector, assert_event_invariants, create_message_bindings, get_node_multiaddr, + wait_for_connected, wait_for_propagated, wait_for_sent, wait_for_error, @@ -38,7 +37,7 @@ RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window" @pytest.mark.smoke -class TestSendBeforeRelay(StepsStore): +class TestSendBeforeRelay(StepsCommon): def test_s17_send_before_relay_peers_joins(self, node_config): """ S17: sender starts isolated, calls send() @@ -74,7 +73,7 @@ class TestSendBeforeRelay(StepsStore): relay_config = { **node_config, "staticnodes": [get_node_multiaddr(sender_node)], - "portsshift": 1, + "portsShift": 1, "store": True, } @@ -135,7 +134,7 @@ class TestSendBeforeRelay(StepsStore): relay_config = { **node_config, "staticnodes": [get_node_multiaddr(sender_node)], - "portsshift": 1, + "portsShift": 1, "store": False, } @@ -143,6 +142,8 @@ class TestSendBeforeRelay(StepsStore): assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" with relay_result.ok_value: + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + propagated_event = wait_for_propagated( collector=sender_collector, request_id=request_id, @@ -168,12 +169,11 @@ class TestSendBeforeRelay(StepsStore): def test_s19_store_peer_appears_after_propagation(self, node_config): """ - S19: a store peer comes online later. - question for Zoltan , is reliability = true mandatory for the store peer ? - what is the effect of the reliability here ? - - send() returns Ok(RequestId) immediately - - Propagated --- relay peer - - Sent when store peer is reachable + S19: store peer comes online after relay propagation succeeds. + - send() returns Ok(RequestId) + - Propagated arrives via relay peer + - No Sent while store peer is absent + - Sent arrives after store peer joins and archives the message """ sender_collector = EventCollector() @@ -194,20 +194,19 @@ class TestSendBeforeRelay(StepsStore): assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" with sender_result.ok_value as sender_node: - # relay peer relay_config = { **node_config, "staticnodes": [get_node_multiaddr(sender_node)], - "portsshift": 1, + "portsShift": 1, "store": False, - # "p2preliability": False, # commented as the option not supported } relay_result = WrapperManager.create_and_start(config=relay_config) assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" with relay_result.ok_value as relay_peer: - # send(). Must return Ok(RequestId) immediately. + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + message = create_message_bindings() send_result = sender_node.send_message(message=message) assert send_result.is_ok(), f"send() must return Ok(RequestId), got: {send_result.err()}" @@ -215,7 +214,6 @@ class TestSendBeforeRelay(StepsStore): request_id = send_result.ok_value assert request_id, "send() returned an empty RequestId" - # Propagated should arrive via the relay peer. propagated_event = wait_for_propagated( collector=sender_collector, request_id=request_id, @@ -232,33 +230,29 @@ class TestSendBeforeRelay(StepsStore): ) assert early_sent_event is None, f"MessageSentEvent arrived before any store peer was reachable. " f"Event: {early_sent_event}" - # Store peer - store_node = WakuNode(NODE_2, f"store_node") - store_node.start(relay="true", store="true", discv5_discovery="false") - store_node.set_relay_subscriptions([self.test_pubsub_topic]) - relay_multiaddr = get_node_multiaddr(relay_peer) - sender_multiaddr = get_node_multiaddr(sender_node) - store_node.add_peers([relay_multiaddr, sender_multiaddr]) - delay(3) + store_config = { + **node_config, + "staticnodes": [ + get_node_multiaddr(sender_node), + get_node_multiaddr(relay_peer), + ], + "portsShift": 2, + "store": True, + } - sent_event = wait_for_sent( - collector=sender_collector, - request_id=request_id, - timeout_s=SENT_AFTER_STORE_TIMEOUT_S, - ) + store_result = WrapperManager.create_and_start(config=store_config) + assert store_result.is_ok(), f"Failed to start store peer: {store_result.err()}" - assert sent_event is not None, ( - f"No MessageSentEvent received within {SENT_AFTER_STORE_TIMEOUT_S}s " - f"after store peer joined. Collected events: {sender_collector.events}" - ) - - self.check_published_message_is_stored( - store_node=store_node, - pubsub_topic=self.test_pubsub_topic, - messages_to_check=[message], - page_size=5, - ascending="true", - ) + with store_result.ok_value: + sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_AFTER_STORE_TIMEOUT_S, + ) + assert sent_event is not None, ( + f"No MessageSentEvent received within {SENT_AFTER_STORE_TIMEOUT_S}s " + f"after store peer joined. Collected events: {sender_collector.events}" + ) assert_event_invariants(sender_collector, request_id) @@ -349,7 +343,7 @@ class TestS07CoreSenderRelayAndStore(StepsCommon): peer_config = { **node_config, "staticnodes": [get_node_multiaddr(sender)], - "portsshift": 1, + "portsShift": 1, "store": True, } @@ -506,14 +500,16 @@ class TestS06CoreSenderRelayOnly(StepsCommon): peer_config = { **node_config, "staticnodes": [get_node_multiaddr(sender)], - "portsshift": 1, + "portsShift": 1, } peer_result = WrapperManager.create_and_start(config=peer_config) assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" with peer_result.ok_value: - message = self.create_message( + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + + message = create_message_bindings( payload=to_base64("S06 relay-only test payload"), contentTopic="/test/1/s06-relay-only/proto", ) @@ -543,6 +539,128 @@ class TestS06CoreSenderRelayOnly(StepsCommon): assert_event_invariants(sender_collector, request_id) +class TestS02AutoSubscribeOnFirstSend(StepsCommon): + """ + S02 — Auto-subscribe on first send. + Sender never calls subscribe_content_topic() before send(). + The send API must auto-subscribe to the content topic used in the message. + Expected: send() returns Ok(RequestId), message_propagated arrives. + """ + + def test_s02_send_without_explicit_subscribe(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsShift": 1, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" + + with peer_result.ok_value: + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + + message = create_message_bindings( + payload=to_base64("S02 auto-subscribe test payload"), + contentTopic="/test/1/s02-auto-subscribe/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + +class TestS12IsolatedSenderNoPeers(StepsCommon): + """ + S12 — Isolated sender, no peers. + Sender has relay enabled but zero relay peers and zero lightpush peers. + Expected: send() returns Ok(RequestId), but eventually a message_error + event arrives (no route to propagate). + """ + + def test_s12_send_with_no_peers_produces_error(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings( + payload=to_base64("S12 isolated sender payload"), + contentTopic="/test/1/s12-isolated/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + error = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S, + ) + assert error is not None, ( + f"No message_error event within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s " + f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack) for isolated sender. " + f"Collected events: {sender_collector.events}" + ) + assert error["requestId"] == request_id + + propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) + assert propagated is None, f"Unexpected message_propagated event for isolated sender: {propagated}" + + class TestS14LightpushNonRetryableError(StepsCommon): """ S14 — Lightpush non-retryable error via oversized message. From f219c807d5587d70794d8fde937574d9de2a8172 Mon Sep 17 00:00:00 2001 From: Egor Rachkovskii <32649334+at0m1x19@users.noreply.github.com> Date: Thu, 30 Apr 2026 17:05:06 +0100 Subject: [PATCH 4/4] Add relay-to-lightpush fallback integration tests (S08/S09) (#180) Co-authored-by: Egor Rachkovskii --- tests/wrappers_tests/test_send_e2e.py | 163 ++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py index cd46d954e..d57a85c2b 100644 --- a/tests/wrappers_tests/test_send_e2e.py +++ b/tests/wrappers_tests/test_send_e2e.py @@ -818,3 +818,166 @@ class TestS15LightpushRetryableErrorRecovery(StepsCommon): assert error is None, f"Unexpected message_error after recovery: {error}" assert_event_invariants(sender_collector, request_id) + + +class TestRelayToLightpushFallback(StepsCommon): + """S08/S09 — Relay-to-lightpush fallback. + + Sender has relay enabled but zero gossipsub relay peers. + A lightpush peer is reachable via lightpushnode (no staticnodes). + Relay fails with NO_PEERS_TO_RELAY, lightpush fallback succeeds + in the same processing pass. + + Topology: + [Service] relay=True, lightpush=True + [RelayPeer] relay=True, staticnodes=[service] (gives service gossipsub mesh) + [Sender] relay=True, lightpush=True, lightpushnode=service + (no staticnodes → zero gossipsub relay peers → fallback) + """ + + def test_s08_relay_fallback_to_lightpush(self, node_config): + """S08: no store peer → Propagated only.""" + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "lightpush": True, + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + service_result = WrapperManager.create_and_start(config=node_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_addr = get_node_multiaddr(service) + + relay_config = { + **node_config, + "lightpush": False, + "staticnodes": [service_addr], + "portsShift": 1, + } + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + sender_config = { + **node_config, + "lightpushnode": service_addr, + "portsShift": 2, + } + sender_result = WrapperManager.create_and_start( + config=sender_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings() + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + sent = wait_for_sent(sender_collector, request_id, timeout_s=0) + assert sent is None, f"Unexpected message_sent event (no store peer): {sent}" + + assert_event_invariants(sender_collector, request_id) + + def test_s09_relay_fallback_to_lightpush_with_store_validation(self, node_config): + """S09: S08 + store peer + reliability → Propagated, then Sent.""" + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "lightpush": True, + "store": True, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + service_result = WrapperManager.create_and_start(config=node_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_addr = get_node_multiaddr(service) + + relay_config = { + **node_config, + "lightpush": False, + "store": False, + "staticnodes": [service_addr], + "portsShift": 1, + } + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + sender_config = { + **node_config, + "reliabilityEnabled": True, + "lightpushnode": service_addr, + "storenode": service_addr, + "portsShift": 2, + } + sender_result = WrapperManager.create_and_start( + config=sender_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings() + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_AFTER_STORE_TIMEOUT_S, + ) + assert sent is not None, ( + f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " + f"after propagation. Collected events: {sender_collector.events}" + ) + assert sent["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id)