Add S02/S12 send API tests and PR CI pipeline (#174)

* Add tests for auto-subscribe on first send and isolated sender with no peers

* Add PR CI workflow with tiered test strategy

- pr_tests.yml: build job with cache, wrapper-tests, smoke-tests,
  and label-triggered full-suite
- test_common.yml: add deploy_allure/send_discord inputs so PR runs
  skip reporting side effects
- Add docker_required marker to S19 (needs Docker, excluded from
  wrapper-only CI job)
- Register docker_required marker in pytest.ini

* Document PR CI test workflows in README

* Refine PR CI test strategy:
- Exclude `docker_required` tests from smoke set in `pr_tests.yml`.
- Add `wait_for_connected` helper for connection state checks.
- Update S19 test to dynamically create and clean up the store node setup.
- General simplifications and improved test stability.

* Add `wait_for_connected` assertion to ensure sender connection state before propagation test

* Refine tests and CI workflows:
- Replace `ERROR_TIMEOUT_S` with `ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S` in `test_send_e2e.py`.
- Adjust timeout assertion for better clarity and accuracy.
- Update `pr_tests.yml` to add retries (`--reruns`) and ignore wrapper tests in smoke tests.
- Change `test_common.yml` default Discord reporting to `false`.

* Normalize `portsshift` to `portsShift` in `test_send_e2e.py` configuration definitions.

---------

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>
This commit is contained in:
Egor Rachkovskii 2026-04-30 16:15:50 +01:00 committed by GitHub
parent df073119ba
commit a85223a79c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 436 additions and 51 deletions

235
.github/workflows/pr_tests.yml vendored Normal file
View File

@ -0,0 +1,235 @@
# PR Tests: tiered CI for pull requests.
#
#   build         - builds (or restores from cache) liblogosdelivery.so and
#                   publishes it as a short-lived artifact
#   wrapper-tests - tests/wrappers_tests/ that are not marked docker_required
#   smoke-tests   - tests marked `smoke` (and not docker_required), 4 workers
#   full-suite    - reusable test_common.yml; runs only when the PR carries the
#                   `full-test` label or a manual dispatch sets run_full_suite
#
# On `push` to master (vendor/** changes) only the build job runs; every other
# job is gated on `github.event_name != 'push'`.
name: PR Tests

concurrency:
  # One group per PR (or per ref for push / manual runs). A `labeled` event for
  # any label other than `full-test` gets a unique group (run_id): such a run
  # skips every job, so it must not cancel an in-progress test run for the
  # same PR.
  group: >-
    ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}-${{
    (github.event.action == 'labeled' && github.event.label.name != 'full-test')
    && github.run_id || 'tests' }}
  cancel-in-progress: true

on:
  pull_request:
    types: [opened, synchronize, reopened, labeled]
    paths:
      - "src/**"
      - "tests/**"
      - "vendor/**"
      - "requirements.txt"
      - "pytest.ini"
      - ".github/workflows/pr_tests.yml"
  push:
    branches: [master]
    paths:
      - "vendor/**"
  workflow_dispatch:
    inputs:
      run_full_suite:
        description: "Run the full test suite (18 shards)"
        required: false
        default: false
        type: boolean

jobs:
  build:
    name: Build liblogosdelivery
    runs-on: ubuntu-latest
    timeout-minutes: 45
    # Ignore label events unless the label is the one that requests the full suite.
    if: >-
      github.event.action != 'labeled' ||
      github.event.label.name == 'full-test'
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: recursive

      # The cache key is derived from the two nested submodule commits, so the
      # library is rebuilt only when either submodule pointer moves.
      - name: Compute cache key
        id: cache-key
        run: |
          BINDINGS_HASH=$(git rev-parse HEAD:vendor/logos-delivery-python-bindings)
          DELIVERY_HASH=$(git -C vendor/logos-delivery-python-bindings rev-parse HEAD:vendor/logos-delivery)
          echo "key=liblogosdelivery-${{ runner.os }}-nim2.2.4-${BINDINGS_HASH}-${DELIVERY_HASH}" >> "$GITHUB_OUTPUT"

      - name: Cache liblogosdelivery.so
        id: cache-lib
        uses: actions/cache@v4
        with:
          path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so
          key: ${{ steps.cache-key.outputs.key }}

      # Everything below up to the upload step is skipped on a cache hit.
      - name: Remove unwanted software
        if: steps.cache-lib.outputs.cache-hit != 'true'
        uses: ./.github/actions/prune-vm

      - name: Install system deps
        if: steps.cache-lib.outputs.cache-hit != 'true'
        run: |
          sudo apt-get update
          sudo apt-get install -y \
            util-linux \
            iproute2 \
            sudo \
            ca-certificates \
            curl \
            make \
            gcc \
            g++

      - name: Install Nim 2.2.4
        if: steps.cache-lib.outputs.cache-hit != 'true'
        run: |
          set -euo pipefail
          curl https://nim-lang.org/choosenim/init.sh -sSf | sh -s -- -y
          echo "$HOME/.nimble/bin" >> "$GITHUB_PATH"
          export PATH="$HOME/.nimble/bin:$PATH"
          choosenim 2.2.4
          nim --version
          nimble --version

      - name: Build liblogosdelivery.so
        if: steps.cache-lib.outputs.cache-hit != 'true'
        run: |
          set -euo pipefail
          export PATH="$HOME/.nimble/bin:$PATH"
          BINDINGS_DIR="$(pwd)/vendor/logos-delivery-python-bindings"
          DELIVERY_DIR="$BINDINGS_DIR/vendor/logos-delivery"
          mkdir -p "$BINDINGS_DIR/lib"
          cd "$DELIVERY_DIR"
          ln -sf waku.nimble waku.nims
          nimble install -y
          make setup
          make liblogosdelivery
          # -print -quit stops at the first match without a pipe, so pipefail
          # cannot trip on SIGPIPE the way `find | head -n 1` can.
          SO_PATH="$(find . -type f -name 'liblogosdelivery.so' -print -quit)"
          if [ -z "$SO_PATH" ]; then
            echo "liblogosdelivery.so was not built"
            exit 1
          fi
          cp "$SO_PATH" "$BINDINGS_DIR/lib/liblogosdelivery.so"
          echo "Built library:"
          ls -l "$BINDINGS_DIR/lib/liblogosdelivery.so"

      # Downstream jobs fetch the library from this artifact, not from the cache.
      - name: Upload library artifact
        uses: actions/upload-artifact@v4
        with:
          name: liblogosdelivery
          path: vendor/logos-delivery-python-bindings/lib/liblogosdelivery.so
          retention-days: 1

  wrapper-tests:
    name: Wrapper Tests
    runs-on: ubuntu-latest
    needs: [build]
    timeout-minutes: 15
    if: >-
      github.event_name != 'push' &&
      (github.event.action != 'labeled' || github.event.label.name == 'full-test')
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: recursive

      # v5 runs on the Node 20 runtime; v4 targets the deprecated Node 16 runtime.
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: "pip"

      - run: pip install -r requirements.txt

      - name: Download liblogosdelivery.so
        uses: actions/download-artifact@v4
        with:
          name: liblogosdelivery
          path: vendor/logos-delivery-python-bindings/lib/

      - name: Run wrapper tests
        env:
          PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
        run: |
          pytest tests/wrappers_tests/ \
            -m "not docker_required" \
            --reruns 2 \
            --junit-xml=wrapper-results.xml

      - name: Test Report
        if: always()
        uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2
        with:
          name: Wrapper Test Results
          path: wrapper-results.xml
          reporter: java-junit
          use-actions-summary: "true"

  smoke-tests:
    name: Smoke Tests
    runs-on: ubuntu-latest
    needs: [build]
    timeout-minutes: 30
    if: >-
      github.event_name != 'push' &&
      (github.event.action != 'labeled' || github.event.label.name == 'full-test')
    env:
      NODE_1: "wakuorg/nwaku:latest"
      NODE_2: "wakuorg/nwaku:latest"
      ADDITIONAL_NODES: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest"
    steps:
      - uses: actions/checkout@v4
        with:
          submodules: recursive

      # v5 runs on the Node 20 runtime; v4 targets the deprecated Node 16 runtime.
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: "pip"

      - run: pip install -r requirements.txt

      - name: Download liblogosdelivery.so
        uses: actions/download-artifact@v4
        with:
          name: liblogosdelivery
          path: vendor/logos-delivery-python-bindings/lib/

      # Wrapper tests are covered by the wrapper-tests job, so they are ignored
      # here together with the vendored bindings' own test directory.
      - name: Run smoke tests
        env:
          PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
        run: |
          pytest -m "smoke and not docker_required" \
            --ignore=vendor/logos-delivery-python-bindings/tests \
            --ignore=tests/wrappers_tests \
            --reruns 1 \
            -n 4 \
            --dist=loadgroup \
            --junit-xml=smoke-results.xml

      - name: Test Report
        if: always()
        uses: dorny/test-reporter@95058abb17504553158e70e2c058fe1fda4392c2
        with:
          name: Smoke Test Results
          path: smoke-results.xml
          reporter: java-junit
          use-actions-summary: "true"

  full-suite:
    name: Full Suite
    # Same label gate as the other jobs, so adding an unrelated label to a PR
    # that already carries `full-test` does not start the full suite again.
    if: >-
      github.event_name != 'push' &&
      (github.event.action != 'labeled' || github.event.label.name == 'full-test') &&
      (contains(github.event.pull_request.labels.*.name, 'full-test') ||
      github.event.inputs.run_full_suite == 'true')
    uses: ./.github/workflows/test_common.yml
    secrets: inherit
    with:
      node1: "wakuorg/nwaku:latest"
      node2: "wakuorg/nwaku:latest"
      additional_nodes: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest"
      caller: "pr"
      # PR runs skip the reporting side effects of the shared workflow.
      deploy_allure: false
      send_discord: false

View File

@ -22,6 +22,16 @@ on:
required: false
description: "Workflow caller. Used in reporting"
type: string
deploy_allure:
required: false
description: "Deploy allure report to gh-pages"
type: boolean
default: true
send_discord:
required: false
description: "Send test results to Discord"
type: boolean
default: false
env:
FORCE_COLOR: "1"
@ -206,7 +216,7 @@ jobs:
aggregate-reports:
runs-on: ubuntu-latest
needs: [tests]
if: always()
if: always() && inputs.deploy_allure
steps:
- name: Download all allure results
@ -348,7 +358,7 @@ jobs:
- name: Send report to Discord
uses: rjstone/discord-webhook-notify@v1
if: always() && env.CALLER != 'manual'
if: always() && env.CALLER != 'manual' && inputs.send_discord
with:
severity: ${{ env.TESTS_RESULT == 'success' && 'info' || 'error' }}
username: ${{ github.workflow }}

View File

@ -52,6 +52,16 @@ To launch it manually:
2. Click **Run workflow**.
3. Pick the branch you want to test (defaults to `master`) and press **Run workflow**.
### PR tests
Every push to a pull request triggers **pr\_tests.yml** which runs:
1. **Build** — compiles `liblogosdelivery.so` (cached by submodule commit hash).
2. **Wrapper tests** — all tests under `tests/wrappers_tests/` that don't require Docker (~5 min).
3. **Smoke tests** — `pytest -m smoke` with Docker nodes (~10 min).
To run the **full test suite** (18 shards, same as daily) on a PR, add the label **`full-test`** to the pull request. The full suite will start automatically.
### On-demand matrix against custom *logos-messaging-nim* versions
Use **interop\_tests.yml** when you need to test a PR or a historical image:

View File

@ -12,3 +12,4 @@ log_file_format = %(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s
timeout = 300
markers =
smoke: marks tests as smoke test (deselect with '-m "not smoke"')
docker_required: test requires Docker nodes (WakuNode)

View File

@ -12,10 +12,6 @@ EVENT_PROPAGATED = "message_propagated"
EVENT_SENT = "message_sent"
EVENT_ERROR = "message_error"
# ---------------------------------------------------------------------------
# Event collection
# ---------------------------------------------------------------------------
class EventCollector:
"""Thread-safe collector for async node events.
@ -88,6 +84,22 @@ def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float)
return wait_for_event(collector, request_id, is_error_event, timeout_s)
def wait_for_connected(
    collector: EventCollector,
    timeout_s: float = 10.0,
    poll_interval_s: float = 0.3,
) -> Optional[dict]:
    """Wait for a connection_status_change event reporting a connected state.

    Polls ``collector.events`` (while holding ``collector._lock``) for the first
    event whose ``eventType`` is ``"connection_status_change"`` and whose
    ``connectionStatus`` is ``"PartiallyConnected"`` or ``"Connected"``.

    The events are always inspected at least once, so ``timeout_s=0`` acts as an
    immediate, non-blocking check, and an event that arrives during the last
    sleep before the deadline is still picked up.

    Args:
        collector: Event collector attached to the node under test.
        timeout_s: Maximum time to wait, in seconds.
        poll_interval_s: Pause between two scans of the collected events, in seconds.

    Returns:
        The first matching event dict, or ``None`` if none was seen before the
        timeout elapsed.
    """
    connected_states = ("PartiallyConnected", "Connected")
    deadline = time.monotonic() + timeout_s

    while True:
        with collector._lock:
            for event in collector.events:
                if event.get("eventType") == "connection_status_change" and event.get("connectionStatus") in connected_states:
                    return event

        remaining_s = deadline - time.monotonic()
        if remaining_s <= 0:
            return None
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining_s))
TERMINAL_EVENT_TYPES = {EVENT_PROPAGATED, EVENT_SENT, EVENT_ERROR}
@ -136,7 +148,6 @@ def get_node_multiaddr(node) -> str:
return addr
# Builds messages for the send API (bindings), not for the REST calls.
def create_message_bindings(**overrides) -> dict:
envelope = {
"contentTopic": DEFAULT_CONTENT_TOPIC,

View File

@ -1,17 +1,16 @@
import base64
import pytest
from src.env_vars import NODE_2
from src.steps.common import StepsCommon
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode
from src.node.wrappers_manager import WrapperManager
from src.node.wrapper_helpers import (
EventCollector,
assert_event_invariants,
create_message_bindings,
get_node_multiaddr,
wait_for_connected,
wait_for_propagated,
wait_for_sent,
wait_for_error,
@ -38,7 +37,7 @@ RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
@pytest.mark.smoke
class TestSendBeforeRelay(StepsStore):
class TestSendBeforeRelay(StepsCommon):
def test_s17_send_before_relay_peers_joins(self, node_config):
"""
S17: sender starts isolated, calls send()
@ -74,7 +73,7 @@ class TestSendBeforeRelay(StepsStore):
relay_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender_node)],
"portsshift": 1,
"portsShift": 1,
"store": True,
}
@ -135,7 +134,7 @@ class TestSendBeforeRelay(StepsStore):
relay_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender_node)],
"portsshift": 1,
"portsShift": 1,
"store": False,
}
@ -143,6 +142,8 @@ class TestSendBeforeRelay(StepsStore):
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value:
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
@ -168,12 +169,11 @@ class TestSendBeforeRelay(StepsStore):
def test_s19_store_peer_appears_after_propagation(self, node_config):
"""
S19: a store peer comes online later.
question for Zoltan , is reliability = true mandatory for the store peer ?
what is the effect of the reliability here ?
- send() returns Ok(RequestId) immediately
- Propagated --- relay peer
- Sent when store peer is reachable
S19: store peer comes online after relay propagation succeeds.
- send() returns Ok(RequestId)
- Propagated arrives via relay peer
- No Sent while store peer is absent
- Sent arrives after store peer joins and archives the message
"""
sender_collector = EventCollector()
@ -194,20 +194,19 @@ class TestSendBeforeRelay(StepsStore):
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
# relay peer
relay_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender_node)],
"portsshift": 1,
"portsShift": 1,
"store": False,
# "p2preliability": False, # commented as the option not supported
}
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value as relay_peer:
# send(). Must return Ok(RequestId) immediately.
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
message = create_message_bindings()
send_result = sender_node.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId), got: {send_result.err()}"
@ -215,7 +214,6 @@ class TestSendBeforeRelay(StepsStore):
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
# Propagated should arrive via the relay peer.
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
@ -232,33 +230,29 @@ class TestSendBeforeRelay(StepsStore):
)
assert early_sent_event is None, f"MessageSentEvent arrived before any store peer was reachable. " f"Event: {early_sent_event}"
# Store peer
store_node = WakuNode(NODE_2, f"store_node")
store_node.start(relay="true", store="true", discv5_discovery="false")
store_node.set_relay_subscriptions([self.test_pubsub_topic])
relay_multiaddr = get_node_multiaddr(relay_peer)
sender_multiaddr = get_node_multiaddr(sender_node)
store_node.add_peers([relay_multiaddr, sender_multiaddr])
delay(3)
store_config = {
**node_config,
"staticnodes": [
get_node_multiaddr(sender_node),
get_node_multiaddr(relay_peer),
],
"portsShift": 2,
"store": True,
}
sent_event = wait_for_sent(
collector=sender_collector,
request_id=request_id,
timeout_s=SENT_AFTER_STORE_TIMEOUT_S,
)
store_result = WrapperManager.create_and_start(config=store_config)
assert store_result.is_ok(), f"Failed to start store peer: {store_result.err()}"
assert sent_event is not None, (
f"No MessageSentEvent received within {SENT_AFTER_STORE_TIMEOUT_S}s "
f"after store peer joined. Collected events: {sender_collector.events}"
)
self.check_published_message_is_stored(
store_node=store_node,
pubsub_topic=self.test_pubsub_topic,
messages_to_check=[message],
page_size=5,
ascending="true",
)
with store_result.ok_value:
sent_event = wait_for_sent(
collector=sender_collector,
request_id=request_id,
timeout_s=SENT_AFTER_STORE_TIMEOUT_S,
)
assert sent_event is not None, (
f"No MessageSentEvent received within {SENT_AFTER_STORE_TIMEOUT_S}s "
f"after store peer joined. Collected events: {sender_collector.events}"
)
assert_event_invariants(sender_collector, request_id)
@ -349,7 +343,7 @@ class TestS07CoreSenderRelayAndStore(StepsCommon):
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsshift": 1,
"portsShift": 1,
"store": True,
}
@ -506,14 +500,16 @@ class TestS06CoreSenderRelayOnly(StepsCommon):
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsshift": 1,
"portsShift": 1,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
with peer_result.ok_value:
message = self.create_message(
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
message = create_message_bindings(
payload=to_base64("S06 relay-only test payload"),
contentTopic="/test/1/s06-relay-only/proto",
)
@ -543,6 +539,128 @@ class TestS06CoreSenderRelayOnly(StepsCommon):
assert_event_invariants(sender_collector, request_id)
class TestS02AutoSubscribeOnFirstSend(StepsCommon):
    """
    S02 Auto-subscribe on first send.

    The sender goes straight to send() without ever calling
    subscribe_content_topic(); the send API is expected to subscribe to the
    message's content topic on its own.

    Expected: send() returns Ok(RequestId) and a message_propagated event
    arrives for that request.
    """

    def test_s02_send_without_explicit_subscribe(self, node_config):
        sender_collector = EventCollector()

        # Relay-only sender; the overrides are applied to the fixture dict in
        # place, so the peer config derived from it below inherits them.
        relay_only_overrides = {
            "relay": True,
            "store": False,
            "lightpush": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
        }
        node_config.update(relay_only_overrides)

        sender_result = WrapperManager.create_and_start(config=node_config, event_cb=sender_collector.event_callback)
        assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

        with sender_result.ok_value as sender:
            # Second node on shifted ports, statically peered with the sender.
            peer_config = dict(node_config, staticnodes=[get_node_multiaddr(sender)], portsShift=1)
            peer_result = WrapperManager.create_and_start(config=peer_config)
            assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"

            with peer_result.ok_value:
                connected_event = wait_for_connected(sender_collector)
                assert connected_event is not None, "Sender did not reach Connected/PartiallyConnected state"

                # No subscribe call precedes this send.
                message = create_message_bindings(
                    payload=to_base64("S02 auto-subscribe test payload"),
                    contentTopic="/test/1/s02-auto-subscribe/proto",
                )
                send_result = sender.send_message(message=message)
                assert send_result.is_ok(), f"send() failed: {send_result.err()}"

                request_id = send_result.ok_value
                assert request_id, "send() returned an empty RequestId"

                propagated = wait_for_propagated(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert propagated is not None, (
                    f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
                )
                assert propagated["requestId"] == request_id

                # Zero timeout: only look at what has been collected so far.
                error = wait_for_error(collector=sender_collector, request_id=request_id, timeout_s=0)
                assert error is None, f"Unexpected message_error event: {error}"
class TestS12IsolatedSenderNoPeers(StepsCommon):
    """
    S12 Isolated sender, no peers.

    The sender runs with relay enabled but has no relay peers and no lightpush
    peers at all.

    Expected: send() still returns Ok(RequestId); with no route to propagate
    the message, a message_error event eventually arrives for that request.
    """

    def test_s12_send_with_no_peers_produces_error(self, node_config):
        sender_collector = EventCollector()

        # Relay-only configuration; no other node is started in this test.
        isolated_overrides = {
            "relay": True,
            "store": False,
            "lightpush": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
        }
        node_config.update(isolated_overrides)

        sender_result = WrapperManager.create_and_start(config=node_config, event_cb=sender_collector.event_callback)
        assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

        with sender_result.ok_value as sender:
            message = create_message_bindings(
                payload=to_base64("S12 isolated sender payload"),
                contentTopic="/test/1/s12-isolated/proto",
            )
            send_result = sender.send_message(message=message)
            assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}"

            request_id = send_result.ok_value
            assert request_id, "send() returned an empty RequestId"

            error = wait_for_error(
                collector=sender_collector,
                request_id=request_id,
                timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S,
            )
            assert error is not None, (
                f"No message_error event within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s "
                f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack) for isolated sender. "
                f"Collected events: {sender_collector.events}"
            )
            assert error["requestId"] == request_id

            # Zero timeout: only look at what has been collected so far.
            propagated = wait_for_propagated(collector=sender_collector, request_id=request_id, timeout_s=0)
            assert propagated is None, f"Unexpected message_propagated event for isolated sender: {propagated}"
class TestS14LightpushNonRetryableError(StepsCommon):
"""
S14 Lightpush non-retryable error via oversized message.