From a85223a79c9ec55dfc04b22d7ef0ab6fa380fe8f Mon Sep 17 00:00:00 2001 From: Egor Rachkovskii <32649334+at0m1x19@users.noreply.github.com> Date: Thu, 30 Apr 2026 16:15:50 +0100 Subject: [PATCH] Add S02/S12 send API tests and PR CI pipeline (#174) * Add tests for auto-subscribe on first send and isolated sender with no peers * Add PR CI workflow with tiered test strategy - pr_tests.yml: build job with cache, wrapper-tests, smoke-tests, and label-triggered full-suite - test_common.yml: add deploy_allure/send_discord inputs so PR runs skip reporting side effects - Add docker_required marker to S19 (needs Docker, excluded from wrapper-only CI job) - Register docker_required marker in pytest.ini * Document PR CI test workflows in README * Refine PR CI test strategy: - Exclude `docker_required` tests from smoke set in `pr_tests.yml`. - Add `wait_for_connected` helper for connection state checks. - Update S19 test to dynamically create and clean up the store node setup. - General simplifications and improved test stability. * Add `wait_for_connected` assertion to ensure sender connection state before propagation test * Refine tests and CI workflows: - Replace `ERROR_TIMEOUT_S` with `ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S` in `test_send_e2e.py`. - Adjust timeout assertion for better clarity and accuracy. - Update `pr_tests.yml` to add retries (`--reruns`) and ignore wrapper tests in smoke tests. - Change `test_common.yml` default Discord reporting to `false`. * Normalize `portsshift` to `portsShift` in `test_send_e2e.py` configuration definitions. --------- Co-authored-by: Egor Rachkovskii --- .github/workflows/pr_tests.yml | 235 ++++++++++++++++++++++++++ .github/workflows/test_common.yml | 14 +- README.md | 10 ++ pytest.ini | 1 + src/node/wrapper_helpers.py | 21 ++- tests/wrappers_tests/test_send_e2e.py | 206 +++++++++++++++++----- 6 files changed, 436 insertions(+), 51 deletions(-) create mode 100644 .github/workflows/pr_tests.yml diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml new file mode 100644 index 000000000..69bb5a399 --- /dev/null +++ b/.github/workflows/pr_tests.yml @@ -0,0 +1,235 @@ +name: PR Tests + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +on: + pull_request: + types: [opened, synchronize, reopened, labeled] + paths: + - "src/**" + - "tests/**" + - "vendor/**" + - "requirements.txt" + - "pytest.ini" + - ".github/workflows/pr_tests.yml" + push: + branches: [master] + paths: + - "vendor/**" + workflow_dispatch: + inputs: + run_full_suite: + description: "Run the full test suite (18 shards)" + required: false + default: false + type: boolean + +jobs: + build: + name: Build liblogosdelivery + runs-on: ubuntu-latest + timeout-minutes: 45 + if: >- + github.event.action != 'labeled' || + github.event.label.name == 'full-test' + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Compute cache key + id: cache-key + run: | + BINDINGS_HASH=$(git rev-parse HEAD:vendor/logos-delivery-python-bindings) + DELIVERY_HASH=$(git -C vendor/logos-delivery-python-bindings rev-parse HEAD:vendor/logos-delivery) + echo "key=liblogosdelivery-${{ runner.os }}-nim2.2.4-${BINDINGS_HASH}-${DELIVERY_HASH}" >> "$GITHUB_OUTPUT" + + - name: Cache liblogosdelivery.so + id: cache-lib + uses: actions/cache@v4 + with: + path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so + key: ${{ steps.cache-key.outputs.key }} + + - name: Remove unwanted software + if: steps.cache-lib.outputs.cache-hit != 'true' + uses: ./.github/actions/prune-vm + + - name: Install system deps + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + sudo apt-get update + sudo apt-get install -y \ + util-linux \ + iproute2 \ + sudo \ + ca-certificates \ + curl \ + make \ + gcc \ + g++ + + - name: Install Nim 2.2.4 + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + set -euo pipefail + curl https://nim-lang.org/choosenim/init.sh -sSf | sh -s -- -y + echo "$HOME/.nimble/bin" >> "$GITHUB_PATH" + export PATH="$HOME/.nimble/bin:$PATH" + choosenim 2.2.4 + nim --version + nimble --version + + - name: Build liblogosdelivery.so + if: steps.cache-lib.outputs.cache-hit != 'true' + run: | + set -euo pipefail + + export PATH="$HOME/.nimble/bin:$PATH" + + BINDINGS_DIR="$(pwd)/vendor/logos-delivery-python-bindings" + DELIVERY_DIR="$BINDINGS_DIR/vendor/logos-delivery" + + mkdir -p "$BINDINGS_DIR/lib" + + cd "$DELIVERY_DIR" + + ln -sf waku.nimble waku.nims + + nimble install -y + + make setup + + make liblogosdelivery + + SO_PATH="$(find . -type f -name 'liblogosdelivery.so' | head -n 1)" + + if [ -z "$SO_PATH" ]; then + echo "liblogosdelivery.so was not built" + exit 1 + fi + + cp "$SO_PATH" "$BINDINGS_DIR/lib/liblogosdelivery.so" + + echo "Built library:" + ls -l "$BINDINGS_DIR/lib/liblogosdelivery.so" + + - name: Upload library artifact + uses: actions/upload-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so + retention-days: 1 + + wrapper-tests: + name: Wrapper Tests + runs-on: ubuntu-latest + needs: [build] + timeout-minutes: 15 + if: >- + github.event_name != 'push' && + (github.event.action != 'labeled' || github.event.label.name == 'full-test') + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - uses: actions/setup-python@v4 + with: + python-version: "3.12" + cache: "pip" + + - run: pip install -r requirements.txt + + - name: Download liblogosdelivery.so + uses: actions/download-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/ + + - name: Run wrapper tests + env: + PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku + run: | + pytest tests/wrappers_tests/ \ + -m "not docker_required" \ + --reruns 2 \ + --junit-xml=wrapper-results.xml + + - name: Test Report + if: always() + uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2 + with: + name: Wrapper Test Results + path: wrapper-results.xml + reporter: java-junit + use-actions-summary: "true" + + smoke-tests: + name: Smoke Tests + runs-on: ubuntu-latest + needs: [build] + timeout-minutes: 30 + if: >- + github.event_name != 'push' && + (github.event.action != 'labeled' || github.event.label.name == 'full-test') + env: + NODE_1: "wakuorg/nwaku:latest" + NODE_2: "wakuorg/nwaku:latest" + ADDITIONAL_NODES: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest" + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - uses: actions/setup-python@v4 + with: + python-version: "3.12" + cache: "pip" + + - run: pip install -r requirements.txt + + - name: Download liblogosdelivery.so + uses: actions/download-artifact@v4 + with: + name: liblogosdelivery + path: vendor/logos-delivery-python-bindings/lib/ + + - name: Run smoke tests + env: + PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku + run: | + pytest -m "smoke and not docker_required" \ + --ignore=vendor/logos-delivery-python-bindings/tests \ + --ignore=tests/wrappers_tests \ + --reruns 1 \ + -n 4 \ + --dist=loadgroup \ + --junit-xml=smoke-results.xml + + - name: Test Report + if: always() + uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2 + with: + name: Smoke Test Results + path: smoke-results.xml + reporter: java-junit + use-actions-summary: "true" + + full-suite: + name: Full Suite + if: >- + github.event_name != 'push' && + (contains(github.event.pull_request.labels.*.name, 'full-test') || + github.event.inputs.run_full_suite == 'true') + uses: ./.github/workflows/test_common.yml + secrets: inherit + with: + node1: "wakuorg/nwaku:latest" + node2: "wakuorg/nwaku:latest" + additional_nodes: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest" + caller: "pr" + deploy_allure: false + send_discord: false diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index 2b276404a..be7b1f7b8 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -22,6 +22,16 @@ on: required: false description: "Workflow caller. Used in reporting" type: string + deploy_allure: + required: false + description: "Deploy allure report to gh-pages" + type: boolean + default: true + send_discord: + required: false + description: "Send test results to Discord" + type: boolean + default: false env: FORCE_COLOR: "1" @@ -206,7 +216,7 @@ jobs: aggregate-reports: runs-on: ubuntu-latest needs: [tests] - if: always() + if: always() && inputs.deploy_allure steps: - name: Download all allure results @@ -348,7 +358,7 @@ jobs: - name: Send report to Discord uses: rjstone/discord-webhook-notify@v1 - if: always() && env.CALLER != 'manual' + if: always() && env.CALLER != 'manual' && inputs.send_discord with: severity: ${{ env.TESTS_RESULT == 'success' && 'info' || 'error' }} username: ${{ github.workflow }} diff --git a/README.md b/README.md index d731ab6a1..1b6903fbf 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,16 @@ To launch it manually: 2. Click **► Run workflow**. 3. Pick the branch you want to test (defaults to `master`) and press **Run workflow**. +### PR tests + +Every push to a pull request triggers **pr\_tests.yml** which runs: + +1. **Build** — compiles `liblogosdelivery.so` (cached by submodule commit hash). +2. **Wrapper tests** — all tests under `tests/wrappers_tests/` that don't require Docker (~5 min). +3. **Smoke tests** — `pytest -m smoke` with Docker nodes (~10 min). + +To run the **full test suite** (18 shards, same as daily) on a PR, add the label **`full-test`** to the pull request. The full suite will start automatically. + ### On‑demand matrix against custom *logos-messaging-nim* versions Use **interop\_tests.yml** when you need to test a PR or a historical image: diff --git a/pytest.ini b/pytest.ini index 3a13b3b9c..2406bfb06 100644 --- a/pytest.ini +++ b/pytest.ini @@ -12,3 +12,4 @@ log_file_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s timeout = 300 markers = smoke: marks tests as smoke test (deselect with '-m "not smoke"') + docker_required: test requires Docker nodes (WakuNode) diff --git a/src/node/wrapper_helpers.py b/src/node/wrapper_helpers.py index 813684165..ecce2e50f 100644 --- a/src/node/wrapper_helpers.py +++ b/src/node/wrapper_helpers.py @@ -12,10 +12,6 @@ EVENT_PROPAGATED = "message_propagated" EVENT_SENT = "message_sent" EVENT_ERROR = "message_error" -# --------------------------------------------------------------------------- -# Event collection -# --------------------------------------------------------------------------- - class EventCollector: """Thread-safe collector for async node events. @@ -88,6 +84,22 @@ def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float) return wait_for_event(collector, request_id, is_error_event, timeout_s) +def wait_for_connected( + collector: EventCollector, + timeout_s: float = 10.0, + poll_interval_s: float = 0.3, +) -> Optional[dict]: + """Wait until a connection_status_change event with PartiallyConnected or Connected arrives.""" + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + with collector._lock: + for event in collector.events: + if event.get("eventType") == "connection_status_change" and event.get("connectionStatus") in ("PartiallyConnected", "Connected"): + return event + time.sleep(poll_interval_s) + return None + + TERMINAL_EVENT_TYPES = {EVENT_PROPAGATED, EVENT_SENT, EVENT_ERROR} @@ -136,7 +148,6 @@ def get_node_multiaddr(node) -> str: return addr -# This API for creating messages for send.API not the REST calls def create_message_bindings(**overrides) -> dict: envelope = { "contentTopic": DEFAULT_CONTENT_TOPIC, diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py index d64d3db15..cd46d954e 100644 --- a/tests/wrappers_tests/test_send_e2e.py +++ b/tests/wrappers_tests/test_send_e2e.py @@ -1,17 +1,16 @@ import base64 import pytest -from src.env_vars import NODE_2 from src.steps.common import StepsCommon from src.libs.common import delay, to_base64 from src.libs.custom_logger import get_custom_logger -from src.node.waku_node import WakuNode from src.node.wrappers_manager import WrapperManager from src.node.wrapper_helpers import ( EventCollector, assert_event_invariants, create_message_bindings, get_node_multiaddr, + wait_for_connected, wait_for_propagated, wait_for_sent, wait_for_error, @@ -38,7 +37,7 @@ RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window" @pytest.mark.smoke -class TestSendBeforeRelay(StepsStore): +class TestSendBeforeRelay(StepsCommon): def test_s17_send_before_relay_peers_joins(self, node_config): """ S17: sender starts isolated, calls send() @@ -74,7 +73,7 @@ class TestSendBeforeRelay(StepsStore): relay_config = { **node_config, "staticnodes": [get_node_multiaddr(sender_node)], - "portsshift": 1, + "portsShift": 1, "store": True, } @@ -135,7 +134,7 @@ class TestSendBeforeRelay(StepsStore): relay_config = { **node_config, "staticnodes": [get_node_multiaddr(sender_node)], - "portsshift": 1, + "portsShift": 1, "store": False, } @@ -143,6 +142,8 @@ class TestSendBeforeRelay(StepsStore): assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" with relay_result.ok_value: + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + propagated_event = wait_for_propagated( collector=sender_collector, request_id=request_id, @@ -168,12 +169,11 @@ class TestSendBeforeRelay(StepsStore): def test_s19_store_peer_appears_after_propagation(self, node_config): """ - S19: a store peer comes online later. - question for Zoltan , is reliability = true mandatory for the store peer ? - what is the effect of the reliability here ? - - send() returns Ok(RequestId) immediately - - Propagated --- relay peer - - Sent when store peer is reachable + S19: store peer comes online after relay propagation succeeds. + - send() returns Ok(RequestId) + - Propagated arrives via relay peer + - No Sent while store peer is absent + - Sent arrives after store peer joins and archives the message """ sender_collector = EventCollector() @@ -194,20 +194,19 @@ class TestSendBeforeRelay(StepsStore): assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" with sender_result.ok_value as sender_node: - # relay peer relay_config = { **node_config, "staticnodes": [get_node_multiaddr(sender_node)], - "portsshift": 1, + "portsShift": 1, "store": False, - # "p2preliability": False, # commented as the option not supported } relay_result = WrapperManager.create_and_start(config=relay_config) assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" with relay_result.ok_value as relay_peer: - # send(). Must return Ok(RequestId) immediately. + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + message = create_message_bindings() send_result = sender_node.send_message(message=message) assert send_result.is_ok(), f"send() must return Ok(RequestId), got: {send_result.err()}" @@ -215,7 +214,6 @@ class TestSendBeforeRelay(StepsStore): request_id = send_result.ok_value assert request_id, "send() returned an empty RequestId" - # Propagated should arrive via the relay peer. propagated_event = wait_for_propagated( collector=sender_collector, request_id=request_id, @@ -232,33 +230,29 @@ class TestSendBeforeRelay(StepsStore): ) assert early_sent_event is None, f"MessageSentEvent arrived before any store peer was reachable. " f"Event: {early_sent_event}" - # Store peer - store_node = WakuNode(NODE_2, f"store_node") - store_node.start(relay="true", store="true", discv5_discovery="false") - store_node.set_relay_subscriptions([self.test_pubsub_topic]) - relay_multiaddr = get_node_multiaddr(relay_peer) - sender_multiaddr = get_node_multiaddr(sender_node) - store_node.add_peers([relay_multiaddr, sender_multiaddr]) - delay(3) + store_config = { + **node_config, + "staticnodes": [ + get_node_multiaddr(sender_node), + get_node_multiaddr(relay_peer), + ], + "portsShift": 2, + "store": True, + } - sent_event = wait_for_sent( - collector=sender_collector, - request_id=request_id, - timeout_s=SENT_AFTER_STORE_TIMEOUT_S, - ) + store_result = WrapperManager.create_and_start(config=store_config) + assert store_result.is_ok(), f"Failed to start store peer: {store_result.err()}" - assert sent_event is not None, ( - f"No MessageSentEvent received within {SENT_AFTER_STORE_TIMEOUT_S}s " - f"after store peer joined. Collected events: {sender_collector.events}" - ) - - self.check_published_message_is_stored( - store_node=store_node, - pubsub_topic=self.test_pubsub_topic, - messages_to_check=[message], - page_size=5, - ascending="true", - ) + with store_result.ok_value: + sent_event = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_AFTER_STORE_TIMEOUT_S, + ) + assert sent_event is not None, ( + f"No MessageSentEvent received within {SENT_AFTER_STORE_TIMEOUT_S}s " + f"after store peer joined. Collected events: {sender_collector.events}" + ) assert_event_invariants(sender_collector, request_id) @@ -349,7 +343,7 @@ class TestS07CoreSenderRelayAndStore(StepsCommon): peer_config = { **node_config, "staticnodes": [get_node_multiaddr(sender)], - "portsshift": 1, + "portsShift": 1, "store": True, } @@ -506,14 +500,16 @@ class TestS06CoreSenderRelayOnly(StepsCommon): peer_config = { **node_config, "staticnodes": [get_node_multiaddr(sender)], - "portsshift": 1, + "portsShift": 1, } peer_result = WrapperManager.create_and_start(config=peer_config) assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" with peer_result.ok_value: - message = self.create_message( + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + + message = create_message_bindings( payload=to_base64("S06 relay-only test payload"), contentTopic="/test/1/s06-relay-only/proto", ) @@ -543,6 +539,128 @@ class TestS06CoreSenderRelayOnly(StepsCommon): assert_event_invariants(sender_collector, request_id) +class TestS02AutoSubscribeOnFirstSend(StepsCommon): + """ + S02 — Auto-subscribe on first send. + Sender never calls subscribe_content_topic() before send(). + The send API must auto-subscribe to the content topic used in the message. + Expected: send() returns Ok(RequestId), message_propagated arrives. + """ + + def test_s02_send_without_explicit_subscribe(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsShift": 1, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" + + with peer_result.ok_value: + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + + message = create_message_bindings( + payload=to_base64("S02 auto-subscribe test payload"), + contentTopic="/test/1/s02-auto-subscribe/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + +class TestS12IsolatedSenderNoPeers(StepsCommon): + """ + S12 — Isolated sender, no peers. + Sender has relay enabled but zero relay peers and zero lightpush peers. + Expected: send() returns Ok(RequestId), but eventually a message_error + event arrives (no route to propagate). + """ + + def test_s12_send_with_no_peers_produces_error(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings( + payload=to_base64("S12 isolated sender payload"), + contentTopic="/test/1/s12-isolated/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + error = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S, + ) + assert error is not None, ( + f"No message_error event within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s " + f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack) for isolated sender. " + f"Collected events: {sender_collector.events}" + ) + assert error["requestId"] == request_id + + propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) + assert propagated is None, f"Unexpected message_propagated event for isolated sender: {propagated}" + + class TestS14LightpushNonRetryableError(StepsCommon): """ S14 — Lightpush non-retryable error via oversized message.