Merge branch 'master' into shards_fix

This commit is contained in:
Aya Hassan 2026-06-07 18:39:28 +02:00
commit 321a2cec69
15 changed files with 2150 additions and 812 deletions

View File

@ -6,32 +6,32 @@ concurrency:
on:
schedule:
- cron: '0 2 * * *'
- cron: '0 2 * * 0'
workflow_dispatch:
inputs:
node1:
required: true
description: "Node that usually publishes messages. Used for all tests"
type: string
default: "wakuorg/nwaku:latest"
default: "wakuorg/nwaku:v0.38.1"
node2:
required: true
description: "Node that usually queries for published messages. Used for all tests"
type: string
default: "wakuorg/nwaku:latest"
default: "wakuorg/nwaku:v0.38.1"
additional_nodes:
required: false
description: "Additional optional nodes used in e2e tests, separated by ,"
type: string
default: "wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest"
default: "wakuorg/nwaku:v0.38.1,wakuorg/nwaku:v0.38.1,wakuorg/nwaku:v0.38.1"
jobs:
test-common:
uses: ./.github/workflows/test_common.yml
secrets: inherit
with:
node1: ${{ inputs.node1 || 'wakuorg/nwaku:latest' }}
node2: ${{ inputs.node2 || 'wakuorg/nwaku:latest' }}
additional_nodes: ${{ inputs.additional_nodes || 'wakuorg/nwaku:latest,wakuorg/nwaku:latest,wakuorg/nwaku:latest' }}
node1: ${{ inputs.node1 || 'wakuorg/nwaku:v0.38.1' }}
node2: ${{ inputs.node2 || 'wakuorg/nwaku:v0.38.1' }}
additional_nodes: ${{ inputs.additional_nodes || 'wakuorg/nwaku:v0.38.1,wakuorg/nwaku:v0.38.1,wakuorg/nwaku:v0.38.1' }}
fleet_tests: true
caller: "fleet"

View File

@ -71,7 +71,7 @@ jobs:
gcc \
g++
- name: Install Nim 2.2.4
- name: Install Nim 2.2.4 and Nimble 0.22.3
if: steps.cache-lib.outputs.cache-hit != 'true'
run: |
set -euo pipefail
@ -79,6 +79,8 @@ jobs:
echo "$HOME/.nimble/bin" >> "$GITHUB_PATH"
export PATH="$HOME/.nimble/bin:$PATH"
choosenim 2.2.4
# Pin the nimble that generated nimble.lock; newer ones fail the checksum.
(cd /tmp && nimble install "nimble@0.22.3" -y)
nim --version
nimble --version
@ -98,11 +100,9 @@ jobs:
ln -sf waku.nimble waku.nims
nimble install -y
make setup
make liblogosdelivery
# Portable build (no -march=native) so the cached .so doesn't SIGILL
# on runners with older CPUs; via env so the Makefile's NIM_PARAMS apply.
NIM_PARAMS="-d:disableMarchNative -d:marchOptimized" make liblogosdelivery
SO_PATH="$(find . -type f -name 'liblogosdelivery.so' | head -n 1)"
@ -159,25 +159,55 @@ jobs:
--reruns 2 \
--junit-xml=wrapper-results-basic.xml
- name: Run wrapper tests - send e2e part 1
- name: Run wrapper tests - send handle and subscription
continue-on-error: true
env:
PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
run: |
pytest tests/wrappers_tests/test_send_e2e_part1.py \
pytest tests/wrappers_tests/test_send_handle_and_subscription.py \
-m "not docker_required" \
--reruns 2 \
--junit-xml=wrapper-results-send-part1.xml
--junit-xml=wrapper-results-send-handle-and-subscription.xml
- name: Run wrapper tests - send e2e part 2
- name: Run wrapper tests - send relay propagation
continue-on-error: true
env:
PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
run: |
pytest tests/wrappers_tests/test_send_e2e_part2.py \
pytest tests/wrappers_tests/test_send_relay_propagation.py \
-m "not docker_required" \
--reruns 2 \
--junit-xml=wrapper-results-send-part2.xml
--junit-xml=wrapper-results-send-relay-propagation.xml
- name: Run wrapper tests - send lightpush and edge
continue-on-error: true
env:
PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
run: |
pytest tests/wrappers_tests/test_send_lightpush_and_edge.py \
-m "not docker_required" \
--reruns 2 \
--junit-xml=wrapper-results-send-lightpush-and-edge.xml
- name: Run wrapper tests - send errors and concurrency
continue-on-error: true
env:
PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
run: |
pytest tests/wrappers_tests/test_send_errors_and_concurrency.py \
-m "not docker_required" \
--reruns 2 \
--junit-xml=wrapper-results-send-errors-and-concurrency.xml
- name: Run wrapper tests - corner cases
continue-on-error: true
env:
PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
run: |
pytest tests/wrappers_tests/test_wrapper_corner_cases.py \
-m "not docker_required" \
--reruns 2 \
--junit-xml=wrapper-results-corner-cases.xml
- name: Test Report
if: always()

View File

@ -52,7 +52,7 @@ jobs:
strategy:
fail-fast: false
matrix:
shard: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17]
shard: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17]
# Total number of shards = 18: the tests are split into 18 shards that run in parallel to speed up execution.
# Command for sharding:
# pytest --shard-id=<shard_number> --num-shards=<total_shards>

View File

@ -230,52 +230,6 @@ class WakuNode:
logger.error(f"REST service did not become ready in time: {ex}")
raise
def _start_wrapper(self, default_args, wait_for_node_sec):
logger.debug("Starting node using wrappers")
wrapper_config = self._default_args_to_wrapper_config(default_args)
result = WrapperManager.create_and_start(config=wrapper_config, timeout_s=wait_for_node_sec)
if result.is_err():
raise RuntimeError(f"Failed to start wrapper node: {result.err()}")
self._wrapper_node = result.ok_value
logger.debug(f"Started wrapper node. REST: {self._rest_port}")
DS.waku_nodes.append(self)
delay(1)
try:
self.ensure_ready(timeout_duration=wait_for_node_sec)
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
def _default_args_to_wrapper_config(self, default_args):
def _bool(key, default="false"):
return default_args.get(key, default).lower() == "true"
bootstrap = default_args.get("discv5-bootstrap-node")
return {
"logLevel": default_args.get("log-level", "DEBUG"),
"mode": "Core",
"networkingConfig": {
"listenIpv4": default_args.get("listen-address", "0.0.0.0"),
"p2pTcpPort": int(default_args["tcp-port"]),
"discv5UdpPort": int(default_args["discv5-udp-port"]),
"restPort": int(default_args["rest-port"]),
"restAddress": default_args.get("rest-address", "0.0.0.0"),
},
"protocolsConfig": {
"clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)),
"relay": _bool("relay"),
"store": _bool("store"),
"filter": _bool("filter"),
"lightpush": _bool("lightpush"),
"peerExchange": _bool("peer-exchange"),
"discv5Discovery": _bool("discv5-discovery", "true"),
"discv5BootstrapNodes": [bootstrap] if bootstrap else [],
},
}
@retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True)
def register_rln(self, **kwargs):
logger.debug("Registering RLN credentials...")

View File

@ -1,6 +1,8 @@
from __future__ import annotations
import base64
import json
import re
import threading
import time
from typing import Optional
@ -195,6 +197,53 @@ def get_node_multiaddr(node) -> str:
return addr
# Matches the /tcp/<port> segment in a libp2p multiaddr, whether another
# component (e.g. /p2p/<peer-id>) follows it or it ends the address.
TCP_PORT_RE = re.compile(r"/tcp/(\d+)(?:/|$)")


def get_node_tcp_port(node) -> int:
    """Return the TCP port the node advertises in its multiaddr.

    Args:
        node: Node handle accepted by ``get_node_multiaddr``.

    Returns:
        The decimal port that follows ``/tcp/`` in the node's multiaddr.

    Raises:
        RuntimeError: If the multiaddr contains no ``/tcp/<port>`` segment.
    """
    multiaddr = get_node_multiaddr(node)
    match = TCP_PORT_RE.search(multiaddr)
    if match is None:
        raise RuntimeError(f"multiaddr missing /tcp/<port>/ segment: {multiaddr!r}")
    return int(match.group(1))
def get_node_bound_ports(node) -> dict:
    """Fetch and decode the node's ``MyBoundPorts`` debug info.

    The decoded mapping is keyed by tcp, webSocket, rest, discv5Udp and
    metrics; a value of 0 means that service is disabled or did not bind.

    Raises:
        RuntimeError: If the ``MyBoundPorts`` query returns an error result.
    """
    response = node.get_node_info_raw("MyBoundPorts")
    if not response.is_err():
        return json.loads(response.ok_value)
    raise RuntimeError(f"MyBoundPorts query failed: {response.err()}")
def enr_udp_port(enr_uri: str) -> int:
    """Extract the advertised udp port from a text-encoded ENR.

    An ENR is "enr:" + base64url(RLP). Instead of pulling in a full RLP
    decoder, find the "udp" key in the raw bytes and read the value after it.

    Args:
        enr_uri: Text form of the record, starting with ``enr:``.

    Returns:
        The integer stored right after the record's ``udp`` key.

    Raises:
        RuntimeError: If the input is not an ENR URI, has no ``udp`` entry, or
            the value after the key is truncated or is not a single byte /
            short string of at most two bytes (the width of a port number).
    """
    if not enr_uri.startswith("enr:"):
        raise RuntimeError(f"not an ENR URI: {enr_uri!r}")
    b64 = enr_uri[len("enr:") :]
    # Pad to a multiple of four characters so unpadded input still decodes.
    payload = base64.urlsafe_b64decode(b64 + "=" * (-len(b64) % 4))
    udp_key = b"\x83udp"  # "udp" encoded as a 3-byte RLP string
    key = payload.find(udp_key)
    if key == -1:
        raise RuntimeError(f"ENR has no udp entry: {enr_uri!r}")
    value_at = key + len(udp_key)
    if value_at >= len(payload):
        # The record ends right after the key; indexing would raise IndexError.
        raise RuntimeError(f"ENR udp entry is truncated: {enr_uri!r}")
    prefix = payload[value_at]
    if prefix < 0x80:  # values < 128 are encoded as a single byte
        return prefix
    size = prefix - 0x80  # short string: prefix is 0x80 + length
    if size > 2:
        # A port needs at most two bytes; a larger prefix is not a port value.
        raise RuntimeError(f"ENR udp value is not a port (RLP prefix {prefix:#04x}): {enr_uri!r}")
    value = payload[value_at + 1 : value_at + 1 + size]
    if len(value) != size:
        # Slicing past the end would silently yield a shorter, wrong number.
        raise RuntimeError(f"ENR udp entry is truncated: {enr_uri!r}")
    return int.from_bytes(value, "big")
def create_message_bindings(**overrides) -> dict:
envelope = {
"contentTopic": DEFAULT_CONTENT_TOPIC,

View File

@ -62,6 +62,10 @@ class WrapperManager:
def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.stop_and_destroy(timeout_s=timeout_s)
def destroy_keep_ctx(self, *, timeout_s: float = 20.0) -> Result[int, str]:
"""Pass-through for NodeWrapper.destroy_keep_ctx — see that method.

Forwards ``timeout_s`` unchanged to the wrapped node and returns its result.
"""
return self._node.destroy_keep_ctx(timeout_s=timeout_s)
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.subscribe_content_topic(content_topic, timeout_s=timeout_s)

View File

@ -8,7 +8,7 @@ import allure
from src.steps.common import StepsCommon
from src.test_data import PUBSUB_TOPICS_RLN
from src.env_vars import DEFAULT_NWAKU, RLN_CREDENTIALS, NODE_1, NODE_2, ADDITIONAL_NODES
from src.env_vars import RLN_CREDENTIALS, NODE_1, NODE_2, ADDITIONAL_NODES
from src.libs.common import gen_step_id, delay
from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode, rln_credential_store_ready
@ -135,7 +135,7 @@ class StepsRLN(StepsCommon):
@allure.step
def register_rln_single_node(self, prefix="", **kwargs):
logger.debug("Registering RLN credentials for single node")
self.node = WakuNode(DEFAULT_NWAKU, f"node_{gen_step_id()}")
self.node = WakuNode(NODE_1, f"node_{gen_step_id()}")
return self.node.register_rln(rln_keystore_prefix=prefix, rln_creds_source=kwargs["rln_creds_source"], rln_creds_id=kwargs["rln_creds_id"])
@allure.step

View File

@ -50,7 +50,7 @@ def fleet_rln_state(request):
return
from src.node.waku_node import WakuNode
from src.env_vars import RLN_CREDENTIALS, DEFAULT_NWAKU
from src.env_vars import RLN_CREDENTIALS, NODE_1
if not RLN_CREDENTIALS:
logger.info("Fleet RLN: RLN_CREDENTIALS not set nodes will start without RLN")
@ -61,7 +61,7 @@ def fleet_rln_state(request):
try:
for i in range(2):
prefix = "".join(random.choices(string.ascii_lowercase, k=4))
node = WakuNode(DEFAULT_NWAKU, f"rln_reg_{i + 1}_{gen_step_id()}")
node = WakuNode(NODE_1, f"rln_reg_{i + 1}_{gen_step_id()}")
membership_index = node.register_rln(
rln_keystore_prefix=prefix,
rln_creds_source=RLN_CREDENTIALS,

View File

@ -0,0 +1,148 @@
"""S01 helpers: invoke send() against an invalid handle in an isolated process.
Run via:
python -m tests.wrappers_tests.helpers.send_on_invalid_handle nil <marker>
python -m tests.wrappers_tests.helpers.send_on_invalid_handle destroyed <marker>
Prints a single line to stdout starting with <marker>, followed by a JSON
payload describing the outcome. Runs in its own process so that a missing
C-ABI guard (which can SIGSEGV) fails the parent test cleanly instead of
taking the pytest runner down.
Cases:
- nil: send() on a wrapper built with ctx=ffi.NULL.
- destroyed: send() after destroy_keep_ctx(); self.ctx stays non-nil, so the
call reaches the C side with the original (now-stale) pointer.
"""
import json
import sys
from pathlib import Path
def _ensure_bindings_on_path() -> None:
    """Put the vendored bindings directory and the project root on ``sys.path``.

    Entries already present are left where they are; missing ones are
    inserted at the front.
    """
    # This helper sits at <root>/tests/wrappers_tests/helpers/<this>.py, and the
    # wrapper module lives outside the project tree, under vendor/.
    root = Path(__file__).resolve().parents[3]
    bindings = root / "vendor" / "logos-delivery-python-bindings" / "waku"
    # Bindings first, root second: each insert goes to index 0, so the root
    # ends up ahead of the bindings when both are new.
    for entry in (str(bindings), str(root)):
        if entry not in sys.path:
            sys.path.insert(0, entry)
def _emit(marker: str, payload: dict) -> None:
    """Print one result line: ``marker`` immediately followed by ``payload`` as JSON."""
    result_line = marker + json.dumps(payload)
    print(result_line)
def _run_nil_handle(marker: str) -> None:
    """Call send() on a wrapper built around a NULL ctx and emit the outcome."""
    # Imported here rather than at module level: main() only puts the bindings
    # on sys.path right before it dispatches to a case.
    from wrapper import NodeWrapper, ffi  # type: ignore[import-not-found]
    from src.node.wrappers_manager import WrapperManager
    from src.node.wrapper_helpers import create_message_bindings

    null_node = NodeWrapper(ctx=ffi.NULL, config_buffer=None, event_cb_handler=None)
    sender = WrapperManager(null_node)
    outcome = sender.send_message(message=create_message_bindings())

    report = {
        "is_ok": outcome.is_ok(),
        "ok": outcome.ok_value if outcome.is_ok() else None,
        "err": outcome.err() if outcome.is_err() else None,
    }
    _emit(marker, report)
def _report_stage_failure(marker: str, stage: str, err) -> None:
    """Emit the result line for a setup stage that failed before send() ran."""
    _emit(
        marker,
        {
            "stage": stage,
            "is_ok": False,
            "ok": None,
            "err": err,
            "events_after_send": [],
        },
    )


def _run_destroyed_handle(marker: str) -> None:
    """Start a node, stop it, destroy it while keeping ctx, then call send().

    Every outcome is reported as a single ``_emit`` line whose ``stage`` names
    the step that produced it; a failing setup step ends the run early.
    """
    from src.node.wrappers_manager import WrapperManager
    from src.node.wrapper_helpers import EventCollector, create_message_bindings
    from tests.wrappers_tests.conftest import build_node_config

    collector = EventCollector()
    started = WrapperManager.create_and_start(
        config=build_node_config(),
        event_cb=collector.event_callback,
    )
    if started.is_err():
        _report_stage_failure(marker, "create_and_start", started.err())
        return

    sender = started.ok_value

    stopped = sender.stop_node()
    if stopped.is_err():
        _report_stage_failure(marker, "stop_node", stopped.err())
        return

    destroyed = sender.destroy_keep_ctx()
    if destroyed.is_err():
        _report_stage_failure(marker, "destroy_keep_ctx", destroyed.err())
        return

    # Remember how many events exist now so only those recorded after the
    # send are reported.
    baseline = len(collector.events)
    outcome = sender.send_message(message=create_message_bindings())
    late_events = collector.events[baseline:]

    _emit(
        marker,
        {
            "stage": "send_message",
            "is_ok": outcome.is_ok(),
            "ok": outcome.ok_value if outcome.is_ok() else None,
            "err": outcome.err() if outcome.is_err() else None,
            "events_after_send": [str(e) for e in late_events],
        },
    )
# Maps the CLI case name to the function that exercises it.
CASES = {
    "nil": _run_nil_handle,
    "destroyed": _run_destroyed_handle,
}


def main() -> int:
    """Dispatch ``argv[1]`` to its case runner, passing ``argv[2]`` as the marker.

    Returns:
        2 after printing usage to stderr when the arguments are not exactly a
        known case name plus a marker; 0 once the selected case has run.
    """
    cli_args = sys.argv[1:]
    if len(cli_args) != 2 or cli_args[0] not in CASES:
        cases = "|".join(CASES)
        print(f"usage: send_on_invalid_handle <{cases}> <result_marker>", file=sys.stderr)
        return 2
    selected, marker = cli_args
    # Path setup must precede the case: the runners import the bindings lazily.
    _ensure_bindings_on_path()
    CASES[selected](marker)
    return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,526 @@
import base64
from concurrent.futures import ThreadPoolExecutor
import pytest
from src.env_vars import NODE_2
from src.steps.common import StepsCommon
from src.steps.store import StepsStore
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode
from src.node.wrappers_manager import WrapperManager
from src.node.wrapper_helpers import (
EventCollector,
assert_event_invariants,
create_message_bindings,
get_node_multiaddr,
wait_for_connected,
wait_for_propagated,
wait_for_sent,
wait_for_error,
)
logger = get_custom_logger(__name__)
# Wait (seconds) for a message_propagated event when the mesh is stable.
PROPAGATED_TIMEOUT_S = 30.0
# Longer wait (seconds) used by S31 for sends issued right after the peers restarted.
RECOVERY_TIMEOUT_S = 45.0
# MaxTimeInCache from send_service.nim.
MAX_TIME_IN_CACHE_S = 60.0
# Extra slack to cover the background retry loop tick after the window expires.
CACHE_EXPIRY_SLACK_S = 10.0
# Total wait (seconds) for the message_error in the no-peer tests (S12, S21).
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S
# Exact error text S21 compares against the message_error event.
RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
# Payload above DefaultMaxWakuMessageSize (150KiB), so the relay publish
# rejects it instead of failing with NO_PEERS_TO_RELAY.
OVERSIZED_PAYLOAD_BYTES = 200 * 1024
# Wait (seconds) for the message_error of the oversized publish in S13.
ERROR_TIMEOUT_S = 30.0
# Substring S13 expects inside the error text of that event.
MESSAGE_SIZE_EXCEEDED_MSG = "Message size exceeded"
# S30: concurrent sends on the same content topic during initial auto-subscribe.
S30_CONCURRENT_SENDS = 5
S30_CONTENT_TOPIC = "/test/1/s30-concurrent/proto"
# S31: concurrent sends across mixed topics during peer churn.
S31_BURST_SIZE = 8
# _s31_fire_burst reads S31_CONTENT_TOPICS[i] for i in range(S31_BURST_SIZE),
# so this list must hold at least S31_BURST_SIZE entries.
S31_CONTENT_TOPICS = [
"/test/1/s31-topic-a/proto",
"/test/1/s31-topic-b/proto",
"/test/1/s31-topic-c/proto",
"/test/1/s31-topic-d/proto",
"/test/1/s31-topic-e/proto",
"/test/1/s31-topic-f/proto",
"/test/1/s31-topic-g/proto",
"/test/1/s31-topic-h/proto",
]
class TestS12IsolatedSenderNoPeers(StepsCommon):
"""
S12: isolated sender, no peers.
Sender has relay enabled but zero relay peers and zero lightpush peers.
Expected: send() returns Ok(RequestId), but eventually a message_error
event arrives (no route to propagate).
"""
def test_s12_send_with_no_peers_produces_error(self, node_config):
sender_collector = EventCollector()
# Relay is the only protocol left enabled and discv5 discovery is off;
# the test itself never adds a peer.
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
message = create_message_bindings(
payload=to_base64("S12 isolated sender payload"),
contentTopic="/test/1/s12-isolated/proto",
)
# The call itself must be accepted; the failure is reported later as an event.
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
# Wait for the whole cache window plus slack before giving up on the error.
error = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S,
)
assert error is not None, (
f"No message_error event within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s "
f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack) for isolated sender. "
f"Collected events: {sender_collector.events}"
)
assert error["requestId"] == request_id
# timeout_s=0: presumably only checks events already collected — confirm
# against wait_for_propagated.
propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
assert propagated is None, f"Unexpected message_propagated event for isolated sender: {propagated}"
class TestS21ErrorWhenRetryWindowExpires(StepsCommon):
"""
S21: delivery retry window expires before any valid path recovers.
Expected: send() returns Ok(RequestId), then a message_error event whose
error text equals RETRY_WINDOW_EXPIRED_MSG.
"""
def test_s21_error_when_retry_window_expires(self, node_config):
sender_collector = EventCollector()
# Same isolated setup as S12: relay only, discovery off, no peer added.
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
message = create_message_bindings()
send_result = sender_node.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
# No peer is ever added, so the test waits the full cache window plus
# slack for the error event.
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S,
)
assert error_event is not None, (
f"No MessageErrorEvent received within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s "
f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack). "
f"Collected events: {sender_collector.events}"
)
logger.info(f"S21 received error event: {error_event}")
# Unlike S12, the error text itself is pinned to the retry-window message.
assert error_event.get("error") == RETRY_WINDOW_EXPIRED_MSG, (
f"Unexpected error message in message_error event.\n"
f"Expected: {RETRY_WINDOW_EXPIRED_MSG!r}\n"
f"Got: {error_event.get('error')!r}\n"
f"Full event: {error_event}"
)
assert_event_invariants(sender_collector, request_id)
class TestS13RelayHardFailureWithoutFallback(StepsCommon):
"""
S13: relay path is reachable (a relay peer is connected, so the publish
gets past NO_PEERS_TO_RELAY), but the relay publish fails for another
reason. An oversized payload is used so the relay processor rejects the
message immediately. No lightpush fallback is configured.
- Expected: Ok(RequestId), then a message_error event.
"""
def test_s13_relay_hard_failure_without_fallback(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
# Second node reuses the sender's config and points at the sender as a
# static node. portsShift presumably offsets its ports so the two do
# not collide — confirm against the wrapper config schema.
relay_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender_node)],
"portsShift": 1,
}
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value:
# A connected relay peer means the publish gets past
# NO_PEERS_TO_RELAY and actually reaches the relay processor.
assert wait_for_connected(sender_collector) is not None, (
f"Sender did not reach Connected/PartiallyConnected. " f"Collected events: {sender_collector.events}"
)
# Raw payload of OVERSIZED_PAYLOAD_BYTES bytes, base64-encoded for the message.
oversized_payload = base64.b64encode(b"x" * OVERSIZED_PAYLOAD_BYTES).decode()
message = create_message_bindings(
payload=oversized_payload,
contentTopic="/test/1/s13-relay-hard-failure/proto",
)
send_result = sender_node.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId), got: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=ERROR_TIMEOUT_S,
)
assert error_event is not None, (
f"No message_error event within {ERROR_TIMEOUT_S}s from the " f"relay processor. Collected events: {sender_collector.events}"
)
assert error_event["requestId"] == request_id
# `or ""` keeps the substring check from raising when the event has no error text.
assert MESSAGE_SIZE_EXCEEDED_MSG in (error_event.get("error") or ""), (
f"Expected error to contain {MESSAGE_SIZE_EXCEEDED_MSG!r}.\n" f"Got: {error_event.get('error')!r}\n" f"Full event: {error_event}"
)
propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
assert propagated is None, f"Unexpected message_propagated event for a failed relay publish: {propagated}"
assert_event_invariants(sender_collector, request_id)
class TestS30ConcurrentSendsDuringAutoSubscribe(StepsCommon):
"""
S30: concurrent sends on the same content topic during initial auto-subscribe.
- Sender starts unsubscribed to the target topic.
- Several send() calls are issued at nearly the same time.
- Each call must return Ok(RequestId) with a unique id.
- Each request id must get its own propagated event,
with no dropped or cross-associated events.
"""
def test_s30_concurrent_sends_during_auto_subscribe(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
# Relay peer so the sender has a propagation path.
relay_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender_node)],
"portsShift": 1,
}
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value:
# Build one message per send, with distinct payloads so we can
# detect any cross-association between request ids and events.
messages = [
create_message_bindings(
contentTopic=S30_CONTENT_TOPIC,
payload=to_base64(f"s30-concurrent-{i}"),
)
for i in range(S30_CONCURRENT_SENDS)
]
# Fire all sends concurrently. The sender is not yet subscribed
# to S30_CONTENT_TOPIC, so this exercises the auto-subscribe path
# under contention.
# One worker per message; pool.map returns results in input order, so
# send_results[i] belongs to messages[i].
with ThreadPoolExecutor(max_workers=S30_CONCURRENT_SENDS) as pool:
send_results = list(pool.map(sender_node.send_message, messages))
# Every send must return Ok(RequestId).
request_ids = []
for i, send_result in enumerate(send_results):
assert send_result.is_ok(), f"Concurrent send #{i} failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, f"Concurrent send #{i} returned an empty RequestId"
request_ids.append(request_id)
# Request ids must be unique across concurrent sends.
assert len(set(request_ids)) == len(request_ids), f"Duplicate RequestIds returned by concurrent sends: {request_ids}"
# Each request id must get its own propagated event and no error.
for request_id in request_ids:
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated_event is not None, (
f"No MessagePropagatedEvent for request_id={request_id} "
f"within {PROPAGATED_TIMEOUT_S}s. "
f"Collected events: {sender_collector.events}"
)
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected message_error for request_id={request_id}: {error_event}"
# Cross-association guard: every event with a requestId must
# belong to exactly one of the request ids we issued.
issued = set(request_ids)
for event in sender_collector.snapshot():
event_request_id = event.get("requestId")
# Events without a requestId are not tied to a send and are ignored here.
if event_request_id is None:
continue
assert event_request_id in issued, (
f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"
)
# Per-request invariants apply to every concurrent send
# (correct requestId, no duplicate terminal events,
# Sent never before Propagated).
for request_id in request_ids:
assert_event_invariants(sender_collector, request_id)
class TestS31ConcurrentSendsMixedTopicsDuringChurn(StepsStore):
"""
S31: concurrent sends across mixed content topics during peer churn.
Three bursts are fired: before the peers restart, right after the restarts
are issued, and after the peers are ready and re-attached.
"""
@pytest.mark.docker_required
def test_s31_concurrent_sends_mixed_topics_during_churn(self, node_config):
sender_collector = EventCollector()
# Three docker peers on the same pubsub topic: relay only, relay + lightpush,
# and relay + store.
relay_peer = WakuNode(NODE_2, f"s31_relay_peer_{self.test_id}")
relay_peer.start(relay="true", discv5_discovery="false")
relay_peer.set_relay_subscriptions([self.test_pubsub_topic])
lightpush_peer = WakuNode(NODE_2, f"s31_lightpush_peer_{self.test_id}")
lightpush_peer.start(relay="true", lightpush="true", discv5_discovery="false")
lightpush_peer.set_relay_subscriptions([self.test_pubsub_topic])
store_peer = WakuNode(NODE_2, f"s31_store_peer_{self.test_id}")
store_peer.start(relay="true", store="true", discv5_discovery="false")
store_peer.set_relay_subscriptions([self.test_pubsub_topic])
churn_peers = [relay_peer, lightpush_peer, store_peer]
# Mesh docker peers so a lightpushed message can fan out to the store peer.
peer_multiaddrs = [p.get_multiaddr_with_id() for p in churn_peers]
for peer in churn_peers:
others = [a for a in peer_multiaddrs if a != peer.get_multiaddr_with_id()]
peer.add_peers(others)
# The sender is configured in "Edge" mode with the lightpush peer as its
# lightpush node.
node_config.update(
{
"mode": "Edge",
"relay": True,
"lightpush": True,
"store": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"lightpushnode": lightpush_peer.get_multiaddr_with_id(),
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
sender_multiaddr = get_node_multiaddr(sender_node)
for peer in churn_peers:
peer.add_peers([sender_multiaddr])
delay(3) # let docker peers connect to the sender
all_request_ids: list[str] = []
# Phase 1: burst before any peer is restarted.
phase1_ids = self._s31_fire_burst(sender_node, phase_label="phase1")
all_request_ids.extend(phase1_ids)
# Phase 2: restart every peer, then burst again after a one-second pause.
for peer in churn_peers:
peer.restart()
delay(1) # small window so the restart is actually in-flight
phase2_ids = self._s31_fire_burst(sender_node, phase_label="phase2")
all_request_ids.extend(phase2_ids)
# Wait for all peers to be ready again and re-attach the sender.
for peer in churn_peers:
peer.ensure_ready(timeout_duration=20)
peer.add_peers([sender_multiaddr])
# Re-read the multiaddrs after the restart and rebuild the peer mesh.
peer_multiaddrs = [p.get_multiaddr_with_id() for p in churn_peers]
for peer in churn_peers:
others = [a for a in peer_multiaddrs if a != peer.get_multiaddr_with_id()]
peer.add_peers(others)
delay(3)
# Phase 3: burst once the peers are ready and re-attached.
phase3_ids = self._s31_fire_burst(sender_node, phase_label="phase3")
all_request_ids.extend(phase3_ids)
assert len(set(all_request_ids)) == len(all_request_ids), f"Duplicate RequestIds across bursts: {all_request_ids}"
# Phase 1 ran before any churn, so the mesh was stable — standard timeout.
# Phase 3 ran right after restart + re-attach, so the mesh needed to
# re-stabilize — use the recovery timeout to avoid CI flakiness.
phase_timeouts = [
(phase1_ids, PROPAGATED_TIMEOUT_S),
(phase3_ids, RECOVERY_TIMEOUT_S),
]
for request_ids, timeout_s in phase_timeouts:
for request_id in request_ids:
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=timeout_s,
)
assert propagated_event is not None, (
f"No MessagePropagatedEvent for stable-phase "
f"request_id={request_id} within {timeout_s}s. "
f"Collected events: {sender_collector.events}"
)
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected message_error event for stable-phase " f"request_id={request_id}: {error_event}"
# Phase-2 sends are not required to have propagated; they are only checked
# for the absence of a message_error at this point.
for request_id in phase2_ids:
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected terminal message_error for phase-2 " f"request_id={request_id} after recovery: {error_event}"
# Cross-association guard: every event that carries a requestId must
# belong to one of the ids issued across the three bursts.
issued = set(all_request_ids)
for event in sender_collector.snapshot():
event_request_id = event.get("requestId")
if event_request_id is None:
continue
assert event_request_id in issued, (
f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"
)
# Use the hash the wrapper emitted on message_sent so the store
# lookup matches the exact bytes that were actually published.
phase3_hashes = []
for request_id in phase3_ids:
sent_event = wait_for_sent(
collector=sender_collector,
request_id=request_id,
timeout_s=RECOVERY_TIMEOUT_S,
)
assert sent_event is not None, (
f"No message_sent event for phase-3 request_id={request_id} "
f"within {RECOVERY_TIMEOUT_S}s. Collected events: {sender_collector.events}"
)
msg_hash = sent_event.get("messageHash")
assert msg_hash, f"message_sent event missing messageHash: {sent_event}"
phase3_hashes.append(msg_hash)
# 3 phases × S31_BURST_SIZE messages, so the page must fit them all,
# otherwise phase-3 hashes (which sort last in ascending order) get cut off.
self.check_sent_message_is_stored(
expected_hashes=phase3_hashes,
store_node=store_peer,
pubsub_topic=self.test_pubsub_topic,
page_size=S31_BURST_SIZE * 3,
ascending="true",
)
# Per-request invariants apply across all phases, including the
# retry-path bursts (phase 2). If retries ever emit duplicate
# Propagated events or reorder Sent before Propagated, this catches it.
for request_id in all_request_ids:
assert_event_invariants(sender_collector, request_id)
def _s31_fire_burst(self, sender_node, *, phase_label: str) -> list[str]:
"""Fire S31_BURST_SIZE concurrent sends, one per topic in S31_CONTENT_TOPICS.
Returns the list of RequestIds. Asserts every send returned Ok."""
# NOTE(review): messages are built with self.create_message (not defined in
# this file) while S30 uses create_message_bindings — confirm both produce
# the shape send_message expects.
# Indexing S31_CONTENT_TOPICS[i] needs at least S31_BURST_SIZE topics.
messages = [
self.create_message(
contentTopic=S31_CONTENT_TOPICS[i],
payload=to_base64(f"s31-{phase_label}-{i}"),
)
for i in range(S31_BURST_SIZE)
]
# pool.map returns results in input order, so send_results[i] matches messages[i].
with ThreadPoolExecutor(max_workers=S31_BURST_SIZE) as pool:
send_results = list(pool.map(sender_node.send_message, messages))
request_ids = []
for i, send_result in enumerate(send_results):
assert send_result.is_ok(), f"{phase_label}: concurrent send #{i} failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, f"{phase_label}: concurrent send #{i} returned an empty RequestId"
request_ids.append(request_id)
return request_ids

View File

@ -0,0 +1,394 @@
import json
import subprocess
import sys
import pytest
from src.steps.common import StepsCommon
from src.libs.common import to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.wrappers_manager import WrapperManager
from src.node.wrapper_helpers import (
EventCollector,
assert_event_invariants,
create_message_bindings,
get_node_multiaddr,
wait_for_connected,
wait_for_propagated,
wait_for_sent,
wait_for_error,
)
logger = get_custom_logger(__name__)
# How long a test waits for a message_propagated event.
PROPAGATED_TIMEOUT_S = 30.0

# --- S01: send() on a nil / destroyed handle ---------------------------------
S01_EXPECTED_ERROR_FRAGMENT = "not initialized"
# The destroyed-handle path fails synchronously in the C layer (no callback),
# so the binding reports a different string than the nil-handle path does.
S01_DESTROYED_HANDLE_ERROR_FRAGMENT = "immediate call failed"
S01_SUBPROCESS_TIMEOUT_S = 30
S01_RESULT_MARKER = "__S01_RESULT__"
SEND_AFTER_DESTROY_RESULT_MARKER = "__SEND_AFTER_DESTROY_RESULT__"
SEND_AFTER_DESTROY_SUBPROCESS_TIMEOUT_S = 60
S01_INVALID_HANDLE_HELPER = "tests.wrappers_tests.helpers.send_invalid_handle"

# --- S05: malformed content topics -------------------------------------------
S05_EXPECTED_ERROR_FRAGMENT = "Failed to auto-subscribe"
# Each entry is (content_topic, pytest case id).
S05_MALFORMED_CONTENT_TOPICS = [
    # Missing leading slash: the parser rejects it with "must start with slash".
    ("s05-invalid-no-leading-slash", "no-leading-slash"),
    # Empty string: the parser rejects an empty content topic.
    ("", "empty"),
    # Three segments only: a content topic needs /app/version/name/encoding.
    ("/app/1/name", "missing-encoding-segment"),
    # Two consecutive slashes leave the middle segment empty.
    ("/app//name/proto", "empty-middle-segment"),
]
class TestS01NilOrUninitializedHandle(StepsCommon):
    """S01 — send() on a nil/destroyed handle must Err, no events, no crash.

    Both cases run the S01_INVALID_HANDLE_HELPER module in a child Python
    process. A non-zero exit code is reported as a crash of send(); the
    helper's verdict is read from the stdout line that starts with the
    result marker, followed by a JSON document.
    """

    @staticmethod
    def _run_invalid_handle_helper(mode, marker, timeout_s):
        """Run the invalid-handle helper in a subprocess and decode its result.

        Args:
            mode: handle scenario passed to the helper ("nil" or "destroyed");
                also used verbatim in the crash message.
            marker: prefix of the stdout line that carries the JSON result.
            timeout_s: subprocess timeout in seconds.

        Returns:
            The JSON payload that follows ``marker``, decoded by json.loads.

        Raises:
            AssertionError: if the child exits non-zero or prints no marker line.
            subprocess.TimeoutExpired: if the child outlives ``timeout_s``.
        """
        completed = subprocess.run(
            [sys.executable, "-m", S01_INVALID_HANDLE_HELPER, mode, marker],
            capture_output=True,
            text=True,
            timeout=timeout_s,
        )
        assert completed.returncode == 0, (
            f"send() crashed on a {mode} handle (returncode={completed.returncode}). "
            f"stdout={completed.stdout!r} stderr={completed.stderr!r}"
        )
        result_line = next(
            (line for line in completed.stdout.splitlines() if line.startswith(marker)),
            None,
        )
        assert result_line, f"missing result marker. stdout={completed.stdout!r} stderr={completed.stderr!r}"
        # Everything after the marker prefix is the JSON-encoded result.
        return json.loads(result_line[len(marker) :])

    @pytest.mark.skip(reason="see https://github.com/logos-messaging/logos-delivery/issues/3873")
    def test_s01_send_on_uninitialized_handle(self):
        """send() on a nil handle returns Err mentioning S01_EXPECTED_ERROR_FRAGMENT."""
        result = self._run_invalid_handle_helper(
            "nil",
            S01_RESULT_MARKER,
            S01_SUBPROCESS_TIMEOUT_S,
        )
        assert result["is_ok"] is False, f"expected Err, got Ok({result['ok']!r})"
        assert S01_EXPECTED_ERROR_FRAGMENT in (
            result["err"] or ""
        ), f"expected error to mention {S01_EXPECTED_ERROR_FRAGMENT!r}, got: {result['err']!r}"

    @pytest.mark.skip(reason="see https://github.com/logos-messaging/logos-delivery/issues/3863")
    def test_s01_send_on_destroyed_handle(self):
        """send() on a destroyed handle returns Err and emits no events."""
        result = self._run_invalid_handle_helper(
            "destroyed",
            SEND_AFTER_DESTROY_RESULT_MARKER,
            SEND_AFTER_DESTROY_SUBPROCESS_TIMEOUT_S,
        )
        # Any other stage means the helper failed before it reached send().
        assert result["stage"] == "send_message", f"setup failed at stage {result['stage']!r}: {result['err']!r}"
        assert result["is_ok"] is False, f"expected Err, got Ok({result['ok']!r})"
        err_msg = result["err"] or ""
        # Either the nil-handle wording or the synchronous C-layer wording is accepted.
        assert S01_EXPECTED_ERROR_FRAGMENT in err_msg or S01_DESTROYED_HANDLE_ERROR_FRAGMENT in err_msg, (
            f"expected error to mention {S01_EXPECTED_ERROR_FRAGMENT!r} "
            f"or {S01_DESTROYED_HANDLE_ERROR_FRAGMENT!r}, got: {result['err']!r}"
        )
        assert result["events_after_send"] == [], f"expected no events after send(), got: {result['events_after_send']}"
class TestS02AutoSubscribeOnFirstSend(StepsCommon):
    """
    S02: the first send() subscribes implicitly.

    The sender never calls subscribe_content_topic() before send(); the send
    API itself has to subscribe to the content topic carried by the message.
    Expected outcome: send() returns Ok(RequestId) and a message_propagated
    event follows.
    """

    def test_s02_send_without_explicit_subscribe(self, node_config):
        events = EventCollector()
        relay_only = {
            "relay": True,
            "store": False,
            "lightpush": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
        }
        node_config.update(relay_only)

        sender_start = WrapperManager.create_and_start(
            config=node_config,
            event_cb=events.event_callback,
        )
        assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

        with sender_start.ok_value as sender:
            # The peer copies the sender's settings, uses portsShift=1 and
            # lists the sender as its only static node.
            peer_start = WrapperManager.create_and_start(
                config={
                    **node_config,
                    "staticnodes": [get_node_multiaddr(sender)],
                    "portsShift": 1,
                }
            )
            assert peer_start.is_ok(), f"Failed to start relay peer: {peer_start.err()}"

            with peer_start.ok_value:
                assert wait_for_connected(events) is not None, "Sender did not reach Connected/PartiallyConnected state"

                outgoing = create_message_bindings(
                    payload=to_base64("S02 auto-subscribe test payload"),
                    contentTopic="/test/1/s02-auto-subscribe/proto",
                )
                outcome = sender.send_message(message=outgoing)
                assert outcome.is_ok(), f"send() failed: {outcome.err()}"
                rid = outcome.ok_value
                assert rid, "send() returned an empty RequestId"

                propagated_evt = wait_for_propagated(
                    collector=events,
                    request_id=rid,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert propagated_evt is not None, (
                    f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. "
                    f"Collected events: {events.events}"
                )
                assert propagated_evt["requestId"] == rid

                # Zero timeout: only inspect what has already been collected.
                error_evt = wait_for_error(events, rid, timeout_s=0)
                assert error_evt is None, f"Unexpected message_error event: {error_evt}"
class TestS03SendOnAlreadySubscribedTopic(StepsCommon):
    """
    S03: send on a content topic the sender already subscribed to.

    The sender calls subscribe_content_topic() explicitly and only then
    send(). The outcome must match the auto-subscribe case: Propagated
    arrives, no Sent (store is disabled), no Error.
    Topology mirrors S06 (relay-only sender + relay peer, no store).
    Purpose: show the send path is the same when auto-subscription is skipped.
    """

    def test_s03_send_on_already_subscribed_content_topic(self, node_config):
        events = EventCollector()
        topic = "/test/1/s03-already-subscribed/proto"
        relay_only = {
            "relay": True,
            "store": False,
            "lightpush": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
            "reliabilityEnabled": True,
        }
        node_config.update(relay_only)

        sender_start = WrapperManager.create_and_start(
            config=node_config,
            event_cb=events.event_callback,
        )
        assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

        with sender_start.ok_value as sender:
            peer_start = WrapperManager.create_and_start(
                config={
                    **node_config,
                    "staticnodes": [get_node_multiaddr(sender)],
                    "portsShift": 1,
                }
            )
            assert peer_start.is_ok(), f"Failed to start relay peer: {peer_start.err()}"

            with peer_start.ok_value:
                assert wait_for_connected(events) is not None, "Sender did not reach Connected/PartiallyConnected state"

                # The explicit subscription is the point of S03: send() must
                # still return Ok(RequestId) and produce the same events as
                # the auto-subscribe topology in S06.
                subscription = sender.subscribe_content_topic(topic)
                assert subscription.is_ok(), f"subscribe_content_topic failed: {subscription.err()}"

                outgoing = create_message_bindings(
                    payload=to_base64("S03 already-subscribed test payload"),
                    contentTopic=topic,
                )
                outcome = sender.send_message(message=outgoing)
                assert outcome.is_ok(), f"send() failed: {outcome.err()}"
                rid = outcome.ok_value
                assert rid, "send() returned an empty RequestId"

                propagated_evt = wait_for_propagated(
                    collector=events,
                    request_id=rid,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert propagated_evt is not None, (
                    f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. "
                    f"Collected events: {events.events}"
                )
                assert propagated_evt["requestId"] == rid

                # Zero timeouts: only inspect what has already been collected.
                error_evt = wait_for_error(events, rid, timeout_s=0)
                assert error_evt is None, f"Unexpected message_error event: {error_evt}"
                sent_evt = wait_for_sent(events, rid, timeout_s=0)
                assert sent_evt is None, f"Unexpected message_sent event (store is disabled): {sent_evt}"

                assert_event_invariants(events, rid)
class TestS04UnsubscribeThenSendSameTopic(StepsCommon):
    """
    S04: unsubscribe, then send on the same content topic.

    The sender subscribes to topic A, unsubscribes from A and then sends on A.
    send() has to re-establish topic interest and deliver normally.
    Topology mirrors S06 (relay-only sender + relay peer, no store).
    Expected outcome: send() returns Ok(RequestId), Propagated arrives,
    no Sent (store is disabled), no Error.
    Purpose: verify send() restores topic interest after a local unsubscribe.
    """

    def test_s04_unsubscribe_then_send_same_content_topic(self, node_config):
        events = EventCollector()
        topic = "/test/1/s04-unsubscribe-resend/proto"
        relay_only = {
            "relay": True,
            "store": False,
            "lightpush": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
            "reliabilityEnabled": True,
        }
        node_config.update(relay_only)

        sender_start = WrapperManager.create_and_start(
            config=node_config,
            event_cb=events.event_callback,
        )
        assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

        with sender_start.ok_value as sender:
            peer_start = WrapperManager.create_and_start(
                config={
                    **node_config,
                    "staticnodes": [get_node_multiaddr(sender)],
                    "portsShift": 1,
                }
            )
            assert peer_start.is_ok(), f"Failed to start relay peer: {peer_start.err()}"

            with peer_start.ok_value:
                assert wait_for_connected(events) is not None, "Sender did not reach Connected/PartiallyConnected state"

                # Sequence under test: subscribe, unsubscribe, send. The send
                # itself has to bring the topic interest back.
                subscription = sender.subscribe_content_topic(topic)
                assert subscription.is_ok(), f"subscribe_content_topic failed: {subscription.err()}"
                unsubscription = sender.unsubscribe_content_topic(topic)
                assert unsubscription.is_ok(), f"unsubscribe_content_topic failed: {unsubscription.err()}"

                outgoing = create_message_bindings(
                    payload=to_base64("S04 unsubscribe-then-send test payload"),
                    contentTopic=topic,
                )
                outcome = sender.send_message(message=outgoing)
                assert outcome.is_ok(), f"send() failed after unsubscribe: {outcome.err()}"
                rid = outcome.ok_value
                assert rid, "send() returned an empty RequestId"

                propagated_evt = wait_for_propagated(
                    collector=events,
                    request_id=rid,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert propagated_evt is not None, (
                    f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s "
                    f"after unsubscribe + send. Collected events: {events.events}"
                )
                assert propagated_evt["requestId"] == rid

                # Zero timeouts: only inspect what has already been collected.
                error_evt = wait_for_error(events, rid, timeout_s=0)
                assert error_evt is None, f"Unexpected message_error event: {error_evt}"
                sent_evt = wait_for_sent(events, rid, timeout_s=0)
                assert sent_evt is None, f"Unexpected message_sent event (store is disabled): {sent_evt}"

                assert_event_invariants(events, rid)
class TestS05AutoSubscribeFailureBeforeTaskCreation(StepsCommon):
    """
    S05: auto-subscribe fails before a send task is created.

    The sender is initialized, but auto-subscription is made to fail by using
    a malformed content topic that breaks shard resolution inside
    SubscriptionManager.subscribe().
    Expected outcome: send() returns Err with an "auto-subscribe" message and
    no events tied to a request.
    Purpose: cover the last synchronous error path before request ID creation,
    across several distinct validator branches.
    """

    @pytest.mark.parametrize(
        "content_topic",
        [pytest.param(topic, id=case_id) for topic, case_id in S05_MALFORMED_CONTENT_TOPICS],
    )
    def test_s05_send_fails_when_auto_subscribe_fails(self, node_config, content_topic):
        events = EventCollector()
        node_config.update({"relay": True, "numShardsInNetwork": 1})

        sender_start = WrapperManager.create_and_start(
            config=node_config,
            event_cb=events.event_callback,
        )
        assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

        with sender_start.ok_value as sender:
            # SubscriptionManager.subscribe() cannot map the malformed topic
            # to a shard, so the auto-subscribe step inside send() fails.
            outgoing = create_message_bindings(
                payload=to_base64("S05 auto-subscribe failure payload"),
                contentTopic=content_topic,
            )
            outcome = sender.send_message(message=outgoing)
            assert outcome.is_err(), (
                f"send() must return Err when auto-subscribe fails for "
                f"content_topic={content_topic!r}, got Ok({outcome.ok_value!r})"
            )

            failure_text = outcome.err() or ""
            assert S05_EXPECTED_ERROR_FRAGMENT in failure_text, (
                f"expected error to mention {S05_EXPECTED_ERROR_FRAGMENT!r} "
                f"for content_topic={content_topic!r}, got: {failure_text!r}"
            )

            # No request id exists, so the only events tolerated are
            # connection status changes.
            stray = [evt for evt in events.events if evt.get("eventType") != "connection_status_change"]
            assert not stray, f"Unexpected events after a pre-task-creation failure: {events.events}"

View File

@ -1,253 +1,36 @@
import base64
import pytest
from src.env_vars import NODE_1
from src.steps.common import StepsCommon
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode
from src.node.wrappers_manager import WrapperManager
from src.node.wrapper_helpers import (
EventCollector,
assert_event_invariants,
create_message_bindings,
get_node_multiaddr,
wait_for_connected,
wait_for_propagated,
wait_for_sent,
wait_for_error,
)
from tests.wrappers_tests.conftest import build_node_config
from src.test_data import DEFAULT_CLUSTER_ID
from tests.wrappers_tests.conftest import build_node_config, free_port
logger = get_custom_logger(__name__)
# Upper bound, in seconds, on waiting for a message_propagated event.
PROPAGATED_TIMEOUT_S = 30.0
# Upper bound, in seconds, on waiting for message_sent once propagation was seen.
SENT_TIMEOUT_S = 10.0
# NOTE(review): presumably how long a test watches to confirm that no
# message_sent shows up; its usage is not visible in this part of the file.
NO_SENT_OBSERVATION_S = 5.0
# Longer message_sent wait used when delivery is validated against a store peer.
SENT_AFTER_STORE_TIMEOUT_S = 60.0
# 200 KiB. NOTE(review): presumably sized to exceed the accepted message
# size; the limit itself is not visible here, confirm against the node config.
OVERSIZED_PAYLOAD_BYTES = 200 * 1024
# Upper bound, in seconds, on waiting for an event after a service comes back.
RECOVERY_TIMEOUT_S = 45.0
# Pause, in seconds, after taking a service down before the test continues.
SERVICE_DOWN_SETTLE_S = 3.0
# MaxTimeInCache from send_service.nim.
MAX_TIME_IN_CACHE_S = 60.0
# Extra slack to cover the background retry loop tick after the window expires.
CACHE_EXPIRY_SLACK_S = 10.0
# Total wait for a message_error event: cache window plus the slack above.
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S
# NOTE(review): presumably the error text expected once the retry window
# expires; confirm against the sender implementation.
RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
class TestS02AutoSubscribeOnFirstSend(StepsCommon):
    """
    S02: the first send() subscribes implicitly.

    The sender never calls subscribe_content_topic() before send(); the send
    API itself has to subscribe to the content topic carried by the message.
    Expected outcome: send() returns Ok(RequestId) and a message_propagated
    event follows.
    """

    def test_s02_send_without_explicit_subscribe(self, node_config):
        events = EventCollector()
        relay_only = {
            "relay": True,
            "store": False,
            "lightpush": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
        }
        node_config.update(relay_only)

        sender_start = WrapperManager.create_and_start(
            config=node_config,
            event_cb=events.event_callback,
        )
        assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

        with sender_start.ok_value as sender:
            # The peer copies the sender's settings, uses portsShift=1 and
            # lists the sender as its only static node.
            peer_start = WrapperManager.create_and_start(
                config={
                    **node_config,
                    "staticnodes": [get_node_multiaddr(sender)],
                    "portsShift": 1,
                }
            )
            assert peer_start.is_ok(), f"Failed to start relay peer: {peer_start.err()}"

            with peer_start.ok_value:
                assert wait_for_connected(events) is not None, "Sender did not reach Connected/PartiallyConnected state"

                outgoing = create_message_bindings(
                    payload=to_base64("S02 auto-subscribe test payload"),
                    contentTopic="/test/1/s02-auto-subscribe/proto",
                )
                outcome = sender.send_message(message=outgoing)
                assert outcome.is_ok(), f"send() failed: {outcome.err()}"
                rid = outcome.ok_value
                assert rid, "send() returned an empty RequestId"

                propagated_evt = wait_for_propagated(
                    collector=events,
                    request_id=rid,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert propagated_evt is not None, (
                    f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. "
                    f"Collected events: {events.events}"
                )
                assert propagated_evt["requestId"] == rid

                # Zero timeout: only inspect what has already been collected.
                error_evt = wait_for_error(events, rid, timeout_s=0)
                assert error_evt is None, f"Unexpected message_error event: {error_evt}"
class TestS06CoreSenderRelayOnly(StepsCommon):
    """
    S06: core sender with relay peers only, no store.

    The sender runs local relay and is connected to a single relay peer.
    Expected outcome: send() returns Ok(RequestId), message_propagated
    arrives, no message_sent (store is disabled) and no message_error.
    """

    def test_s06_relay_propagation_without_store(self, node_config):
        events = EventCollector()
        relay_only = {
            "relay": True,
            "store": False,
            "lightpush": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
            "reliabilityEnabled": True,
        }
        node_config.update(relay_only)

        sender_start = WrapperManager.create_and_start(
            config=node_config,
            event_cb=events.event_callback,
        )
        assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

        with sender_start.ok_value as sender:
            peer_start = WrapperManager.create_and_start(
                config={
                    **node_config,
                    "staticnodes": [get_node_multiaddr(sender)],
                    "portsShift": 1,
                }
            )
            assert peer_start.is_ok(), f"Failed to start relay peer: {peer_start.err()}"

            with peer_start.ok_value:
                assert wait_for_connected(events) is not None, "Sender did not reach Connected/PartiallyConnected state"

                outgoing = create_message_bindings(
                    payload=to_base64("S06 relay-only test payload"),
                    contentTopic="/test/1/s06-relay-only/proto",
                )
                outcome = sender.send_message(message=outgoing)
                assert outcome.is_ok(), f"send() failed: {outcome.err()}"
                rid = outcome.ok_value
                assert rid, "send() returned an empty RequestId"

                propagated_evt = wait_for_propagated(
                    collector=events,
                    request_id=rid,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert propagated_evt is not None, (
                    f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. "
                    f"Collected events: {events.events}"
                )
                assert propagated_evt["requestId"] == rid

                # Zero timeouts: only inspect what has already been collected.
                error_evt = wait_for_error(events, rid, timeout_s=0)
                assert error_evt is None, f"Unexpected message_error event: {error_evt}"
                sent_evt = wait_for_sent(events, rid, timeout_s=0)
                assert sent_evt is None, f"Unexpected message_sent event (store is disabled): {sent_evt}"

                assert_event_invariants(events, rid)
class TestS07CoreSenderRelayAndStore(StepsCommon):
    """
    S07: core sender with relay peers and a store peer, reliability enabled.

    The sender relays the message to a store-capable peer, and the delivery
    service confirms through the p2p reliability check that the message
    reached the store.
    Expected outcome: Propagated first, then Sent.
    """

    def test_s07_relay_propagation_with_store_validation(self, node_config):
        events = EventCollector()
        sender_settings = {
            "relay": True,
            "store": False,
            "lightpush": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
            "reliabilityEnabled": True,
        }
        node_config.update(sender_settings)

        sender_start = WrapperManager.create_and_start(
            config=node_config,
            event_cb=events.event_callback,
        )
        assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

        with sender_start.ok_value as sender:
            # Same settings as the sender, except that the peer enables store.
            peer_start = WrapperManager.create_and_start(
                config={
                    **node_config,
                    "staticnodes": [get_node_multiaddr(sender)],
                    "portsShift": 1,
                    "store": True,
                }
            )
            assert peer_start.is_ok(), f"Failed to start store peer: {peer_start.err()}"

            with peer_start.ok_value:
                outgoing = create_message_bindings(
                    payload=to_base64("S07 relay+store test payload"),
                    contentTopic="/test/1/s07-relay-store/proto",
                )
                outcome = sender.send_message(message=outgoing)
                assert outcome.is_ok(), f"send() failed: {outcome.err()}"
                rid = outcome.ok_value
                assert rid, "send() returned an empty RequestId"

                propagated_evt = wait_for_propagated(
                    collector=events,
                    request_id=rid,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert propagated_evt is not None, (
                    f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. "
                    f"Collected events: {events.events}"
                )
                assert propagated_evt["requestId"] == rid

                sent_evt = wait_for_sent(
                    collector=events,
                    request_id=rid,
                    timeout_s=SENT_TIMEOUT_S,
                )
                assert sent_evt is not None, (
                    f"No message_sent event within {SENT_TIMEOUT_S}s after propagation. "
                    f"Collected events: {events.events}"
                )
                assert sent_evt["requestId"] == rid

                # Zero timeout: only inspect what has already been collected.
                error_evt = wait_for_error(events, rid, timeout_s=0)
                assert error_evt is None, f"Unexpected message_error event: {error_evt}"

                assert_event_invariants(events, rid)
# Default-cluster shard-0 pubsub topic; used to subscribe the S11 docker store
# peer so it joins the same relay mesh as the wrapper nodes (wrapper config
# uses numShardsInNetwork=1 => shard 0).
STORE_PEER_PUBSUB_TOPIC = f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0"
class TestRelayToLightpushFallback(StepsCommon):
@ -417,7 +200,7 @@ class TestS10EdgeSenderLightpushOnly(StepsCommon):
"""
@pytest.mark.xfail(reason="lightpush peer discovery via staticnodes is broken, see https://github.com/logos-messaging/logos-delivery/issues/3847")
def test_s10_edge_lightpush_propagation(self, node_config):
def test_s10_edge_lightpush_propagation(self):
sender_collector = EventCollector()
common = {
@ -488,60 +271,110 @@ class TestS10EdgeSenderLightpushOnly(StepsCommon):
assert_event_invariants(sender_collector, request_id)
class TestS12IsolatedSenderNoPeers(StepsCommon):
class TestS11EdgeSenderLightpushAndStore(StepsCommon):
"""
S12 Isolated sender, no peers.
Sender has relay enabled but zero relay peers and zero lightpush peers.
Expected: send() returns Ok(RequestId), but eventually a message_error
event arrives (no route to propagate).
S11 Edge sender with lightpush path and store validation.
Edge sender has no local relay; it publishes via a wrapper lightpush peer
and validates delivery via a docker store peer. Reliability enabled.
Topology:
[LightpushPeer] wrapper, relay=True, lightpush=True, store=False
[StorePeer] docker WakuNode, relay=true, store=true,
dials the lightpush peer via add_peers and subscribes
to the same shard-0 pubsub topic so it joins the
relay mesh and archives propagated messages.
[Edge] wrapper, mode="Edge",
staticnodes=[lightpush_peer],
storenode=store_peer,
reliabilityEnabled=True
Expected: send() returns Ok(RequestId), Propagated arrives, then Sent
(store validation succeeds), no Error.
Purpose: edge-mode fully validated success path against a real docker
store node (cross-implementation check).
"""
def test_s12_send_with_no_peers_produces_error(self, node_config):
@pytest.mark.docker_required
def test_s11_edge_lightpush_with_store_validation(self):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
common = {
"filter": False,
"discv5Discovery": True,
"numShardsInNetwork": 1,
}
lightpush_config = build_node_config(
relay=True,
lightpush=True,
store=False,
**common,
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
lightpush_result = WrapperManager.create_and_start(config=lightpush_config)
assert lightpush_result.is_ok(), f"Failed to start lightpush peer: {lightpush_result.err()}"
with sender_result.ok_value as sender:
message = create_message_bindings(
payload=to_base64("S12 isolated sender payload"),
contentTopic="/test/1/s12-isolated/proto",
with lightpush_result.ok_value as lightpush_peer:
lightpush_multiaddr = get_node_multiaddr(lightpush_peer)
# Docker store peer — real nwaku node running as the store backend.
# Dial the lightpush peer via add_peers and subscribe to the same
# shard-0 pubsub topic so it joins the relay mesh and archives
# messages propagated by the lightpush peer.
store_peer = WakuNode(NODE_1, f"s11_store_{self.test_id}")
store_peer.start(relay="true", store="true")
self.add_node_peer(store_peer, [lightpush_multiaddr])
store_peer.set_relay_subscriptions([STORE_PEER_PUBSUB_TOPIC])
store_multiaddr = store_peer.get_multiaddr_with_id()
edge_config = build_node_config(
mode="Edge",
# assuming discv5 already happened
staticnodes=[lightpush_multiaddr, store_multiaddr],
reliabilityEnabled=True,
**common,
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
error = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S,
edge_result = WrapperManager.create_and_start(
config=edge_config,
event_cb=sender_collector.event_callback,
)
assert error is not None, (
f"No message_error event within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s "
f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack) for isolated sender. "
f"Collected events: {sender_collector.events}"
)
assert error["requestId"] == request_id
assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"
propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
assert propagated is None, f"Unexpected message_propagated event for isolated sender: {propagated}"
with edge_result.ok_value as edge_sender:
message = create_message_bindings(
payload=to_base64("S11 edge lightpush + store test payload"),
contentTopic="/test/1/s11-edge-lightpush-store/proto",
)
send_result = edge_sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
sent = wait_for_sent(
collector=sender_collector,
request_id=request_id,
timeout_s=SENT_AFTER_STORE_TIMEOUT_S,
)
assert sent is not None, (
f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " f"after propagation. Collected events: {sender_collector.events}"
)
assert sent["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
assert_event_invariants(sender_collector, request_id)
class TestS14LightpushNonRetryableError(StepsCommon):
@ -698,3 +531,483 @@ class TestS15LightpushRetryableErrorRecovery(StepsCommon):
assert error is None, f"Unexpected message_error after recovery: {error}"
assert_event_invariants(sender_collector, request_id)
class TestS16LightpushPeerAppearsLater(StepsCommon):
    """
    S16: no delivery peer at T0, the lightpush peer shows up later.

    The edge sender lists the lightpush service in its staticnodes, but the
    service is stopped before the sender starts, so no delivery peer is
    reachable at T0. send() is called while the service is down. The service
    is started again during the retry window; the sender connects to it and
    a later retry delivers the message.
    Expected outcome: send() returns Ok(RequestId), then eventually Propagated.
    """

    @pytest.mark.xfail(reason="binding cannot restart a node or add peers at runtime")
    def test_s16_lightpush_peer_appears_later(self):
        events = EventCollector()
        shared = {
            "store": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
        }

        # Bring the lightpush service up once to learn its multiaddr, then
        # stop it so nothing is reachable at T0. The same node object is
        # started again later, so the address remains valid.
        service_start = WrapperManager.create_and_start(
            config=build_node_config(relay=True, lightpush=True, **shared)
        )
        assert service_start.is_ok(), f"Failed to start lightpush peer: {service_start.err()}"

        with service_start.ok_value as service:
            service_addr = get_node_multiaddr(service)
            stopped = service.stop_node()
            assert stopped.is_ok(), f"Failed to stop lightpush peer: {stopped.err()}"
            delay(SERVICE_DOWN_SETTLE_S)

            # The edge sender is a lightpush client whose only peer is the
            # service that is currently down.
            edge_start = WrapperManager.create_and_start(
                config=build_node_config(
                    mode="Edge",
                    lightpush=True,
                    staticnodes=[service_addr],
                    **shared,
                ),
                event_cb=events.event_callback,
            )
            assert edge_start.is_ok(), f"Failed to start edge sender: {edge_start.err()}"

            with edge_start.ok_value as edge_sender:
                # send() happens while the service is still down.
                outgoing = create_message_bindings(
                    payload=to_base64("S16 lightpush peer appears later"),
                    contentTopic="/test/1/s16-late-lightpush/proto",
                )
                outcome = edge_sender.send_message(message=outgoing)
                assert outcome.is_ok(), f"send() failed: {outcome.err()}"
                rid = outcome.ok_value
                assert rid, "send() returned an empty RequestId"

                delay(SERVICE_DOWN_SETTLE_S)
                premature = wait_for_propagated(events, rid, timeout_s=0)
                assert premature is None, f"message_propagated arrived before the lightpush peer was reachable: {premature}"

                # The lightpush peer returns within the retry window.
                restarted = service.start_node()
                assert restarted.is_ok(), f"Failed to restart lightpush peer: {restarted.err()}"

                propagated_evt = wait_for_propagated(
                    collector=events,
                    request_id=rid,
                    timeout_s=RECOVERY_TIMEOUT_S,
                )
                assert propagated_evt is not None, (
                    f"No message_propagated within {RECOVERY_TIMEOUT_S}s "
                    f"after the lightpush peer joined. "
                    f"Collected events: {events.events}"
                )
                assert propagated_evt["requestId"] == rid

                # Zero timeout: only inspect what has already been collected.
                error_evt = wait_for_error(events, rid, timeout_s=0)
                assert error_evt is None, f"Unexpected message_error after recovery: {error_evt}"

                assert_event_invariants(events, rid)
class TestS26LightpushPeerChurn(StepsCommon):
    """
    S26: multiple lightpush peers, the selected one disappears,
    an alternate remains.

    Topology (3 peers + sender):
    - peer1: relay + lightpush. The lightpush server initially selected
      by the sender. Stopped mid-test to simulate churn.
    - relay_peer: relay-only. Kept alive throughout the test as a
      stable gossipsub mesh neighbour, so that after peer1 disappears
      peer2 still has a relay path to propagate the message.
    - peer2: relay + lightpush. The surviving lightpush server that
      must take over once peer1 is gone.
    - sender: edge node with peer1 and peer2 as static lightpush peers.
    """

    def test_s26_lightpush_peer_churn_alternate_remains(self, node_config):
        """
        Stop peer1 after the sender has started, then send one message and
        expect a propagated event (and no error event) for its request id.
        """
        sender_collector = EventCollector()

        peer1_config = {
            **node_config,
            "relay": True,
            "lightpush": True,
            "store": False,
            "filter": False,
            "discv5Discovery": True,
            "numShardsInNetwork": 1,
            "portsShift": 1,
            "discv5UdpPort": free_port(),
        }
        peer1_result = WrapperManager.create_and_start(config=peer1_config)
        assert peer1_result.is_ok(), f"Failed to start lightpush peer1: {peer1_result.err()}"
        peer1 = peer1_result.ok_value

        # peer1 is stopped by hand mid-test, so it is not managed by a `with`
        # block. The flag makes sure it is still torn down if anything fails
        # before the churn step, without ever stopping it a second time.
        peer1_stop_attempted = False
        try:
            relay_config = {
                **node_config,
                "relay": True,
                "lightpush": False,
                "store": False,
                "filter": False,
                "discv5Discovery": False,
                "numShardsInNetwork": 1,
                "portsShift": 4,
            }
            relay_result = WrapperManager.create_and_start(config=relay_config)
            assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"

            with relay_result.ok_value as relay_peer:
                # peer2 reuses peer1's settings but dials peer1 and the relay peer.
                peer2_config = {
                    **peer1_config,
                    "staticnodes": [
                        get_node_multiaddr(peer1),
                        get_node_multiaddr(relay_peer),
                    ],
                    "portsShift": 2,
                    "discv5UdpPort": free_port(),
                }
                peer2_result = WrapperManager.create_and_start(config=peer2_config)
                assert peer2_result.is_ok(), f"Failed to start lightpush peer2: {peer2_result.err()}"

                with peer2_result.ok_value as peer2:
                    sender_config = {
                        **node_config,
                        "mode": "Edge",
                        "relay": True,
                        "lightpush": True,
                        "store": False,
                        "filter": False,
                        "discv5Discovery": False,
                        "numShardsInNetwork": 1,
                        "portsShift": 3,
                        "staticnodes": [
                            get_node_multiaddr(peer1),
                            get_node_multiaddr(peer2),
                        ],
                    }
                    sender_result = WrapperManager.create_and_start(
                        config=sender_config,
                        event_cb=sender_collector.event_callback,
                    )
                    assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

                    with sender_result.ok_value as sender_node:
                        delay(2)

                        # Churn: take peer1 away before the message is sent.
                        peer1_stop_attempted = True
                        stop_result = peer1.stop_and_destroy()
                        assert stop_result.is_ok(), f"Failed to stop peer1: {stop_result.err()}"

                        delay(2)

                        message = create_message_bindings()
                        send_result = sender_node.send_message(message=message)
                        assert send_result.is_ok(), f"send() must return Ok(RequestId) during peer churn, got: {send_result.err()}"

                        request_id = send_result.ok_value
                        assert request_id, "send() returned an empty RequestId"

                        # Expect Propagated via the surviving lightpush peer (peer2).
                        propagated_event = wait_for_propagated(
                            collector=sender_collector,
                            request_id=request_id,
                            timeout_s=PROPAGATED_TIMEOUT_S,
                        )
                        assert propagated_event is not None, (
                            f"No MessagePropagatedEvent within {PROPAGATED_TIMEOUT_S}s "
                            f"after the selected lightpush peer disappeared. "
                            f"Collected events: {sender_collector.events}"
                        )

                        # timeout_s=0: only inspect events that were already collected.
                        error_event = wait_for_error(
                            collector=sender_collector,
                            request_id=request_id,
                            timeout_s=0,
                        )
                        assert error_event is None, f"Unexpected message_error event during peer churn: {error_event}"

                        assert_event_invariants(sender_collector, request_id)
        finally:
            if not peer1_stop_attempted:
                # Best-effort cleanup on an early failure; the result is
                # deliberately ignored so the original failure is reported.
                peer1.stop_and_destroy()
class TestS18StagedTopologyReformation(StepsCommon):
    """
    S18: relay absent at T0, lightpush absent at T0, both appear in stages.

    The sender has relay and lightpush enabled but starts fully isolated:
    no relay peers and no lightpush peers. send() is called before any
    peer exists. Peers then appear one stage at a time, and the message
    must end up propagated once they are present.

    Two orderings are covered, one per test method:
    - lightpush appears first, then relay.
    - relay appears first, then lightpush.

    Both prove the send service recovers from arbitrary staged topology
    reformation.

    Common topology per stage:
        [Sender]        relay=True, lightpush=True, isolated at T0.
        [LightpushPeer] relay=True, lightpush=True, staticnodes=[sender].
        [RelayPeer]     relay=True, staticnodes=[sender].
    """

    # Common config shared by the sender and both staged peers.
    _COMMON = {
        "relay": True,
        "lightpush": True,
        "store": False,
        "filter": False,
        "discv5Discovery": False,
        "numShardsInNetwork": 1,
    }

    def _run_staged_recovery(self, node_config, payload, content_topic, first_stage, second_stage):
        """
        Shared body of both S18 orderings.

        Args:
            node_config: the fixture config; updated in place with _COMMON
                and used for the sender and as the base of both peers.
            payload: plain-text payload, base64-encoded before sending.
            content_topic: content topic of the single test message.
            first_stage: ``(label, overrides)`` for the peer started first.
                ``label`` is used in the start-up failure message and
                ``overrides`` is layered on top of ``node_config``.
            second_stage: same shape, for the peer started second.
        """
        sender_collector = EventCollector()
        node_config.update(self._COMMON)

        sender_result = WrapperManager.create_and_start(
            config=node_config,
            event_cb=sender_collector.event_callback,
        )
        assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

        with sender_result.ok_value as sender:

            def start_peer(label, overrides, ports_shift):
                """Start one staged peer that dials the sender; return its node."""
                peer_config = {
                    **node_config,
                    **overrides,
                    "staticnodes": [get_node_multiaddr(sender)],
                    "portsShift": ports_shift,
                }
                peer_result = WrapperManager.create_and_start(config=peer_config)
                assert peer_result.is_ok(), f"Failed to start {label} peer: {peer_result.err()}"
                return peer_result.ok_value

            # send() before any peer exists must still return Ok(RequestId).
            message = create_message_bindings(
                payload=to_base64(payload),
                contentTopic=content_topic,
            )
            send_result = sender.send_message(message=message)
            assert send_result.is_ok(), f"send() must return Ok(RequestId) while isolated, got: {send_result.err()}"

            request_id = send_result.ok_value
            assert request_id, "send() returned an empty RequestId"

            # No peers yet: the message must not propagate.
            delay(SERVICE_DOWN_SETTLE_S)
            early_propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
            assert early_propagated is None, f"message_propagated arrived before any peer joined: {early_propagated}"

            first_label, first_overrides = first_stage
            second_label, second_overrides = second_stage

            # Stage 1 peer appears, then stage 2 peer; both stay up while waiting.
            with start_peer(first_label, first_overrides, 1):
                with start_peer(second_label, second_overrides, 2):
                    propagated = wait_for_propagated(
                        collector=sender_collector,
                        request_id=request_id,
                        timeout_s=RECOVERY_TIMEOUT_S,
                    )
                    assert propagated is not None, (
                        f"No message_propagated within {RECOVERY_TIMEOUT_S}s "
                        f"after peers appeared in stages. "
                        f"Collected events: {sender_collector.events}"
                    )
                    assert propagated["requestId"] == request_id

                    # timeout_s=0: only inspect events that were already collected.
                    error = wait_for_error(sender_collector, request_id, timeout_s=0)
                    assert error is None, f"Unexpected message_error after staged recovery: {error}"

                    assert_event_invariants(sender_collector, request_id)

    def test_s18_lightpush_first_then_relay(self, node_config):
        """Sender isolated at T0; lightpush peer appears, then relay peer."""
        self._run_staged_recovery(
            node_config,
            payload="S18 lightpush-first staged recovery",
            content_topic="/test/1/s18-lightpush-first/proto",
            first_stage=("lightpush", {}),
            second_stage=("relay", {"lightpush": False}),
        )

    def test_s18_relay_first_then_lightpush(self, node_config):
        """Sender isolated at T0; relay peer appears, then lightpush peer."""
        self._run_staged_recovery(
            node_config,
            payload="S18 relay-first staged recovery",
            content_topic="/test/1/s18-relay-first/proto",
            first_stage=("relay", {"lightpush": False}),
            second_stage=("lightpush", {}),
        )
class TestS25EphemeralLightpushWithStore(StepsCommon):
    """
    S25: ephemeral message over lightpush with a reachable store peer.

    The sender is an Edge node, so it has no local relay and publishes only
    via lightpush. A docker store peer is reachable and joined to the
    lightpush peer's relay mesh, so the message does reach a store.

    Because ephemeral messages are never store-validated, the expected result
    is Propagated only, with no Sent, even though the store peer is reachable.
    This is the lightpush-transport counterpart of S24 and proves the
    ephemeral rule is transport-independent.

    Topology mirrors S11:
        [LightpushPeer] wrapper, relay=True, lightpush=True, store=False
        [StorePeer]     docker WakuNode, relay=true, store=true, joined to
                        the lightpush peer's shard-0 relay mesh
        [Edge]          wrapper, mode="Edge", reliabilityEnabled=True,
                        staticnodes=[lightpush_peer, store_peer]
    """

    @pytest.mark.docker_required
    def test_s25_ephemeral_lightpush_with_store(self):
        """Send one ephemeral message from the edge node; expect Propagated, never Sent."""
        sender_collector = EventCollector()
        shared_settings = {"numShardsInNetwork": 1}

        lightpush_start = WrapperManager.create_and_start(
            config=build_node_config(relay=True, lightpush=True, **shared_settings),
        )
        assert lightpush_start.is_ok(), f"Failed to start lightpush peer: {lightpush_start.err()}"

        with lightpush_start.ok_value as lightpush_peer:
            lightpush_multiaddr = get_node_multiaddr(lightpush_peer)

            # Docker store peer joined to the lightpush peer's shard-0 relay
            # mesh, so messages propagated by the lightpush peer are archived.
            store_peer = WakuNode(NODE_1, f"s25_store_{self.test_id}")
            store_peer.start(relay="true", store="true")
            self.add_node_peer(store_peer, [lightpush_multiaddr])
            store_peer.set_relay_subscriptions([STORE_PEER_PUBSUB_TOPIC])
            store_multiaddr = store_peer.get_multiaddr_with_id()

            edge_start = WrapperManager.create_and_start(
                config=build_node_config(
                    mode="Edge",
                    staticnodes=[lightpush_multiaddr, store_multiaddr],
                    **shared_settings,
                ),
                event_cb=sender_collector.event_callback,
            )
            assert edge_start.is_ok(), f"Failed to start edge sender: {edge_start.err()}"

            with edge_start.ok_value as edge_sender:
                ephemeral_message = create_message_bindings(
                    payload=to_base64("S25 ephemeral lightpush + store payload"),
                    contentTopic="/test/1/s25-ephemeral-lightpush/proto",
                    ephemeral=True,
                )
                send_outcome = edge_sender.send_message(message=ephemeral_message)
                assert send_outcome.is_ok(), f"send() failed: {send_outcome.err()}"

                request_id = send_outcome.ok_value
                assert request_id, "send() returned an empty RequestId"

                propagated = wait_for_propagated(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert propagated is not None, (
                    f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
                )
                assert propagated["requestId"] == request_id

                # Ephemeral messages are never store-validated, so no Sent
                # event must arrive even though the store peer is reachable.
                sent = wait_for_sent(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=NO_SENT_OBSERVATION_S,
                )
                assert sent is None, (
                    f"Unexpected message_sent event for an ephemeral message. "
                    f"Ephemeral messages must never be store-validated.\n"
                    f"Sent event: {sent}\n"
                    f"Collected events: {sender_collector.events}"
                )

                # timeout_s=0: only inspect events that were already collected.
                error = wait_for_error(sender_collector, request_id, timeout_s=0)
                assert error is None, f"Unexpected message_error event: {error}"

                assert_event_invariants(sender_collector, request_id)

View File

@ -1,8 +1,7 @@
from concurrent.futures import ThreadPoolExecutor
import pytest
from src.env_vars import NODE_2
from src.steps.common import StepsCommon
from src.steps.store import StepsStore
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode
@ -17,55 +16,180 @@ from src.node.wrapper_helpers import (
wait_for_sent,
wait_for_error,
)
from src.steps.store import StepsStore
from src.test_data import CONTENT_TOPICS_DIFFERENT_SHARDS
from tests.wrappers_tests.conftest import free_port
logger = get_custom_logger(__name__)
# Timeouts, in seconds, passed as timeout_s to the wait_for_* event helpers.
# Maximum time to wait for a message_propagated event after send().
PROPAGATED_TIMEOUT_S = 30.0
# Maximum time to wait for a message_sent event once propagation was observed.
SENT_TIMEOUT_S = 10.0
# NOTE(review): presumably the window in which a message_sent event must NOT
# show up; no use is visible in this part of the file, so confirm at call sites.
NO_SENT_OBSERVATION_S = 5.0
# NOTE(review): the three values below are not used in the visible part of the
# file; their meaning is inferred from the names only, so confirm at call sites.
SENT_AFTER_STORE_TIMEOUT_S = 60.0
NO_STORE_OBSERVATION_S = 60.0
RECOVERY_TIMEOUT_S = 45.0
# S20 stabilization delays for gossipsub mesh formation.
MESH_STABILIZATION_S = 10
STORE_JOIN_STABILIZATION_S = 10
# MaxTimeInCache from send_service.nim.
MAX_TIME_IN_CACHE_S = 60.0
# Extra slack to cover the background retry loop tick after the window expires.
CACHE_EXPIRY_SLACK_S = 10.0
# Upper bound used by S21 when waiting for the message_error event.
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S
# Exact "error" text S21 expects in the message_error event.
RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
# S30: concurrent sends on the same content topic during initial auto-subscribe.
# Number of send() calls fired at once, and the content topic they all share.
S30_CONCURRENT_SENDS = 5
S30_CONTENT_TOPIC = "/test/1/s30-concurrent/proto"
class TestS06CoreSenderRelayOnly(StepsCommon):
"""
S06 Core sender with relay peers only, no store.
Sender has local relay enabled and is connected to one relay peer.
Expected: send() returns Ok(RequestId), message_propagated event arrives,
no message_sent (store disabled), no message_error.
"""
# S31: concurrent sends across mixed topics during peer churn.
S31_BURST_SIZE = 8
S31_CONTENT_TOPICS = [
"/test/1/s31-topic-a/proto",
"/test/1/s31-topic-b/proto",
"/test/1/s31-topic-c/proto",
"/test/1/s31-topic-d/proto",
"/test/1/s31-topic-e/proto",
"/test/1/s31-topic-f/proto",
"/test/1/s31-topic-g/proto",
"/test/1/s31-topic-h/proto",
]
def test_s06_relay_propagation_without_store(self, node_config):
    """
    Start a relay-only sender and one relay peer that dials it, send a
    message, and expect a propagated event with no sent and no error event.
    """
    sender_collector = EventCollector()

    # Relay-only sender: no store, lightpush, filter or discovery.
    node_config.update(
        relay=True,
        store=False,
        lightpush=False,
        filter=False,
        discv5Discovery=False,
        numShardsInNetwork=1,
        reliabilityEnabled=True,
    )

    sender_start = WrapperManager.create_and_start(
        config=node_config,
        event_cb=sender_collector.event_callback,
    )
    assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

    with sender_start.ok_value as sender:
        # The relay peer reuses the sender's config and dials the sender.
        relay_peer_settings = dict(node_config)
        relay_peer_settings["staticnodes"] = [get_node_multiaddr(sender)]
        relay_peer_settings["portsShift"] = 1

        peer_start = WrapperManager.create_and_start(config=relay_peer_settings)
        assert peer_start.is_ok(), f"Failed to start relay peer: {peer_start.err()}"

        with peer_start.ok_value:
            connected = wait_for_connected(sender_collector)
            assert connected is not None, "Sender did not reach Connected/PartiallyConnected state"

            outgoing = create_message_bindings(
                payload=to_base64("S06 relay-only test payload"),
                contentTopic="/test/1/s06-relay-only/proto",
            )
            send_outcome = sender.send_message(message=outgoing)
            assert send_outcome.is_ok(), f"send() failed: {send_outcome.err()}"

            request_id = send_outcome.ok_value
            assert request_id, "send() returned an empty RequestId"

            propagated = wait_for_propagated(
                collector=sender_collector,
                request_id=request_id,
                timeout_s=PROPAGATED_TIMEOUT_S,
            )
            assert propagated is not None, (
                f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
            )
            assert propagated["requestId"] == request_id

            # timeout_s=0: only inspect events that were already collected.
            error = wait_for_error(sender_collector, request_id, timeout_s=0)
            assert error is None, f"Unexpected message_error event: {error}"

            sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
            assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"

            assert_event_invariants(sender_collector, request_id)
class TestSendBeforeRelay(StepsStore):
class TestS07CoreSenderRelayAndStore(StepsCommon):
    """
    S07: core sender with relay peers and a store peer, reliability enabled.

    The sender relays a message to a store-capable peer; the delivery service
    validates that the message reached the store via the p2p reliability check.

    Expected: Propagated, then Sent.
    """

    def test_s07_relay_propagation_with_store_validation(self, node_config):
        """Send one message and expect a propagated event followed by a sent event."""
        sender_collector = EventCollector()

        # The sender itself runs relay only; the store lives on the peer.
        node_config.update(
            relay=True,
            store=False,
            lightpush=False,
            filter=False,
            discv5Discovery=False,
            numShardsInNetwork=1,
            reliabilityEnabled=True,
        )

        sender_start = WrapperManager.create_and_start(
            config=node_config,
            event_cb=sender_collector.event_callback,
        )
        assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

        with sender_start.ok_value as sender:
            # Same config as the sender, but store-enabled and dialing the sender.
            store_peer_settings = dict(node_config)
            store_peer_settings["staticnodes"] = [get_node_multiaddr(sender)]
            store_peer_settings["portsShift"] = 1
            store_peer_settings["store"] = True

            peer_start = WrapperManager.create_and_start(config=store_peer_settings)
            assert peer_start.is_ok(), f"Failed to start store peer: {peer_start.err()}"

            with peer_start.ok_value:
                outgoing = create_message_bindings(
                    payload=to_base64("S07 relay+store test payload"),
                    contentTopic="/test/1/s07-relay-store/proto",
                )
                send_outcome = sender.send_message(message=outgoing)
                assert send_outcome.is_ok(), f"send() failed: {send_outcome.err()}"

                request_id = send_outcome.ok_value
                assert request_id, "send() returned an empty RequestId"

                propagated = wait_for_propagated(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert propagated is not None, (
                    f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
                )
                assert propagated["requestId"] == request_id

                sent = wait_for_sent(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=SENT_TIMEOUT_S,
                )
                assert sent is not None, (
                    f"No message_sent event within {SENT_TIMEOUT_S}s after propagation. " f"Collected events: {sender_collector.events}"
                )
                assert sent["requestId"] == request_id

                # timeout_s=0: only inspect events that were already collected.
                error = wait_for_error(sender_collector, request_id, timeout_s=0)
                assert error is None, f"Unexpected message_error event: {error}"

                assert_event_invariants(sender_collector, request_id)
class TestS17SendBeforeRelayPeersJoin(StepsCommon):
"""
S17: sender starts isolated, calls send()
- send() returns Ok(RequestId) immediately
- Propagated event eventually arrives after a relay peer joins
"""
def test_s17_send_before_relay_peers_joins(self, node_config):
"""
S17: sender starts isolated, calls send()
- send() returns Ok(RequestId) immediately
- Propagated event eventually arrives
"""
sender_collector = EventCollector()
node_config.update(
@ -132,15 +256,18 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
class TestS19StorePeerAppearsAfterPropagation(StepsStore):
"""
S19: a store peer comes online later.
- send() returns Ok(RequestId) immediately
- Propagated --- relay peer
- Sent when store peer is reachable
"""
@pytest.mark.docker_required
@pytest.mark.xfail(reason="fails to republish after store peer joins mesh see https://github.com/logos-messaging/logos-delivery/issues/3848")
def test_s19_store_peer_appears_after_propagation(self, node_config):
"""
S19: a store peer comes online later.
- send() returns Ok(RequestId) immediately
- Propagated --- relay peer
- Sent when store peer is reachable
"""
sender_collector = EventCollector()
node_config.update({"relay": True, "store": False, "discv5Discovery": False, "numShardsInNetwork": 1, "reliabilityEnabled": True})
@ -228,20 +355,22 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
class TestS20StoreMissesInitiallyThenRetrySucceeds(StepsStore):
"""
S20: relay propagation succeeds, the first store query misses
(the store peer is reachable but does not yet have the message),
a later retry republishes through the relay mesh, and the store
peer then archives it.
Covers state flow:
SuccessfullyPropagated -> NextRoundRetry
-> SuccessfullyPropagated -> SuccessfullyValidated
"""
@pytest.mark.docker_required
@pytest.mark.skip(reason="Forcing the miss store round not possible")
def test_s20_store_misses_initially_then_retry_succeeds(self, node_config):
"""
S20: relay propagation succeeds, the first store query misses
(the store peer is reachable but does not yet have the message),
a later retry republishes through the relay mesh, and the store
peer then archives it.
Covers state flow:
SuccessfullyPropagated -> NextRoundRetry
-> SuccessfullyPropagated -> SuccessfullyValidated
"""
sender_collector = EventCollector()
store_node = WakuNode(NODE_2, f"s20_store_node_{self.test_id}")
store_node.start(
@ -350,66 +479,16 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
def test_s21_error_when_retry_window_expires(self, node_config):
    """
    S21: delivery retry window expires before any valid path recovers.

    The sender is started with no peers at all, so the only acceptable
    outcome is a message_error event carrying RETRY_WINDOW_EXPIRED_MSG.
    """
    sender_collector = EventCollector()

    node_config.update(
        relay=True,
        store=False,
        lightpush=False,
        filter=False,
        discv5Discovery=False,
        numShardsInNetwork=1,
    )

    sender_start = WrapperManager.create_and_start(
        config=node_config,
        event_cb=sender_collector.event_callback,
    )
    assert sender_start.is_ok(), f"Failed to start sender: {sender_start.err()}"

    with sender_start.ok_value as sender_node:
        send_outcome = sender_node.send_message(message=create_message_bindings())
        assert send_outcome.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_outcome.err()}"

        request_id = send_outcome.ok_value
        assert request_id, "send() returned an empty RequestId"

        # No peer ever joins: wait out the cache window plus slack.
        error_event = wait_for_error(
            collector=sender_collector,
            request_id=request_id,
            timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S,
        )
        assert error_event is not None, (
            f"No MessageErrorEvent received within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s "
            f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack). "
            f"Collected events: {sender_collector.events}"
        )
        logger.info(f"S21 received error event: {error_event}")

        reported_error = error_event.get("error")
        assert reported_error == RETRY_WINDOW_EXPIRED_MSG, (
            f"Unexpected error message in message_error event.\n"
            f"Expected: {RETRY_WINDOW_EXPIRED_MSG!r}\n"
            f"Got: {error_event.get('error')!r}\n"
            f"Full event: {error_event}"
        )

        assert_event_invariants(sender_collector, request_id)
class TestS22NonEphemeralWithReliabilityDisabled(StepsCommon):
"""
S22: non-ephemeral message with reliabilityEnabled disabled.
- propagation path exists ,reliabilityEnabled = false.
- Expected: Ok(RequestId), Propagated event only, no Sent event.
Note: S17 already covers the positive path of this test with reliabilityEnabled=True.
"""
def test_s22_non_ephemeral_message_with_reliability_disabled(self, node_config):
"""
S22: non-ephemeral message with reliabilityEnabled disabled.
- propagation path exists ,reliabilityEnabled = false.
- Expected: Ok(RequestId), Propagated event only, no Sent event.
Note: S17 already covers the positive path of this test with reliabilityEnabled=True.
"""
sender_collector = EventCollector()
node_config.update(
@ -476,11 +555,14 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
class TestS23NoSentEventWhenRelayHasNoStore(StepsCommon):
"""
S23: non-ephemeral message, reliability enabled, no store peer ever reachable.
- Expected: Ok(RequestId), Propagated event only, no Sent and no terminal error.
"""
def test_s23_no_sent_event_when_relay_has_no_store(self, node_config):
"""
S23: non-ephemeral message, reliability enabled, no store peer ever reachable.
- Expected: Ok(RequestId), Propagated event only, no Sent and no terminal error.
"""
sender_collector = EventCollector()
node_config.update(
@ -557,13 +639,15 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
def test_s24_ephemeral_message_with_reachable_store(self, node_config):
"""
S24: ephemeral message, reliability enabled, reachable store peer.
- Setup: propagation path exists, relay peer has store=True (reachable),
- Expected: Ok(RequestId), Propagated event only, no Sent event.
"""
class TestS24EphemeralMessageWithReachableStore(StepsCommon):
"""
S24: ephemeral message, reliability enabled, reachable store peer.
- Setup: propagation path exists, relay peer has store=True (reachable),
- Expected: Ok(RequestId), Propagated event only, no Sent event.
"""
def test_s24_ephemeral_message_with_reachable_store(self, node_config):
sender_collector = EventCollector()
node_config.update(
@ -624,139 +708,35 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
def test_s26_lightpush_peer_churn_alternate_remains(self, node_config):
"""
S26: multiple lightpush peers, the selected one disappears,
an alternate remains.
Topology (3 peers + sender):
- peer1: relay + lightpush. The lightpush server initially selected
by the sender. Stopped mid-test to simulate churn.
- relay_peer: relay-only. Kept alive throughout the test as a
stable gossipsub mesh neighbour, so that after peer1 disappears
peer2 still has a relay path to propagate the message.
- peer2: relay + lightpush. The surviving lightpush server that
must take over once peer1 is gone.
- sender: edge node with peer1 and peer2 as static lightpush peers.
"""
sender_collector = EventCollector()
peer1_config = {
**node_config,
"relay": True,
"lightpush": True,
"store": False,
"filter": False,
"discv5Discovery": True,
"numShardsInNetwork": 1,
"portsShift": 1,
"discv5UdpPort": free_port(),
}
peer1_result = WrapperManager.create_and_start(config=peer1_config)
assert peer1_result.is_ok(), f"Failed to start lightpush peer1: {peer1_result.err()}"
peer1 = peer1_result.ok_value
class TestS29SendOnTopicsMappingToDifferentShards(StepsCommon):
"""
S29 Send on two different content topics that map to different shards.
Sender has a relay peer reachable on shard X and shard Y; topic A maps to
shard X and topic B maps to shard Y. Two independent sends, one per topic.
Expected: both sends return Ok(RequestId), and each request gets its own
message_propagated event following the availability of its own shard.
Purpose: ensures shard derivation and delivery behavior are topic-specific.
"""
relay_config = {
**node_config,
"relay": True,
"lightpush": False,
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"portsShift": 4,
}
# Topic A -> shard 0, Topic B -> shard 1 (per CONTENT_TOPICS_DIFFERENT_SHARDS).
TOPIC_A = CONTENT_TOPICS_DIFFERENT_SHARDS[0]
TOPIC_B = CONTENT_TOPICS_DIFFERENT_SHARDS[1]
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value as relay_peer:
peer2_config = {
**peer1_config,
"staticnodes": [
get_node_multiaddr(peer1),
get_node_multiaddr(relay_peer),
],
"portsShift": 2,
"discv5UdpPort": free_port(),
}
peer2_result = WrapperManager.create_and_start(config=peer2_config)
assert peer2_result.is_ok(), f"Failed to start lightpush peer2: {peer2_result.err()}"
with peer2_result.ok_value as peer2:
sender_config = {
**node_config,
"mode": "Edge",
"relay": True,
"lightpush": True,
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"portsShift": 3,
"staticnodes": [
get_node_multiaddr(peer1),
get_node_multiaddr(peer2),
],
}
sender_result = WrapperManager.create_and_start(
config=sender_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
delay(2)
stop_result = peer1.stop_and_destroy()
assert stop_result.is_ok(), f"Failed to stop peer1: {stop_result.err()}"
delay(2)
message = create_message_bindings()
send_result = sender_node.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId) during peer churn, got: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
# Expect Propagated via the surviving lightpush peer (peer2).
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated_event is not None, (
f"No MessagePropagatedEvent within {PROPAGATED_TIMEOUT_S}s "
f"after the selected lightpush peer disappeared. "
f"Collected events: {sender_collector.events}"
)
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected message_error event during peer churn: {error_event}"
assert_event_invariants(sender_collector, request_id)
def test_s30_concurrent_sends_during_auto_subscribe(self, node_config):
"""
S30: concurrent sends on the same content topic during initial auto-subscribe.
- Sender starts unsubscribed to the target topic.
- Several send() calls are issued at nearly the same time.
- Each call must return Ok(RequestId) with a unique id.
- Each request id must get its own propagated event,
with no dropped or cross-associated events.
"""
def test_s29_send_on_topics_mapping_to_different_shards(self, node_config):
sender_collector = EventCollector()
# numShardsInNetwork=8 so the two topics resolve to distinct shards
# (shard 0 and shard 1) instead of being collapsed onto shard 0.
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"numShardsInNetwork": 8,
"reliabilityEnabled": True,
}
)
@ -766,255 +746,65 @@ class TestSendBeforeRelay(StepsStore):
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
# Relay peer so the sender has a propagation path.
relay_config = {
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender_node)],
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
}
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
with relay_result.ok_value:
# Build one message per send, with distinct payloads so we can
# detect any cross-association between request ids and events.
messages = [
create_message_bindings(
contentTopic=S30_CONTENT_TOPIC,
payload=to_base64(f"s30-concurrent-{i}"),
)
for i in range(S30_CONCURRENT_SENDS)
]
with peer_result.ok_value:
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
# Fire all sends concurrently. The sender is not yet subscribed
# to S30_CONTENT_TOPIC, so this exercises the auto-subscribe path
# under contention.
with ThreadPoolExecutor(max_workers=S30_CONCURRENT_SENDS) as pool:
send_results = list(pool.map(sender_node.send_message, messages))
message_a = create_message_bindings(
payload=to_base64("S29 shard X payload"),
contentTopic=self.TOPIC_A,
)
send_a = sender.send_message(message=message_a)
assert send_a.is_ok(), f"send() on TOPIC_A failed: {send_a.err()}"
request_id_a = send_a.ok_value
assert request_id_a, "send() on TOPIC_A returned an empty RequestId"
# Every send must return Ok(RequestId).
request_ids = []
for i, send_result in enumerate(send_results):
assert send_result.is_ok(), f"Concurrent send #{i} failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, f"Concurrent send #{i} returned an empty RequestId"
request_ids.append(request_id)
# Send on topic B (shard Y).
message_b = create_message_bindings(
payload=to_base64("S29 shard Y payload"),
contentTopic=self.TOPIC_B,
)
send_b = sender.send_message(message=message_b)
assert send_b.is_ok(), f"send() on TOPIC_B failed: {send_b.err()}"
request_id_b = send_b.ok_value
assert request_id_b, "send() on TOPIC_B returned an empty RequestId"
# Request ids must be unique across concurrent sends.
assert len(set(request_ids)) == len(request_ids), f"Duplicate RequestIds returned by concurrent sends: {request_ids}"
assert request_id_a != request_id_b, "Each send must produce a distinct RequestId"
# Each request id must get its own propagated event and no error.
for request_id in request_ids:
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated_event is not None, (
f"No MessagePropagatedEvent for request_id={request_id} "
f"within {PROPAGATED_TIMEOUT_S}s. "
f"Collected events: {sender_collector.events}"
)
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected message_error for request_id={request_id}: {error_event}"
# Cross-association guard: every event with a requestId must
# belong to exactly one of the request ids we issued.
issued = set(request_ids)
for event in sender_collector.snapshot():
event_request_id = event.get("requestId")
if event_request_id is None:
continue
assert event_request_id in issued, (
f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"
)
# Per-request invariants apply to every concurrent send
# (correct requestId, no duplicate terminal events,
# Sent never before Propagated).
for request_id in request_ids:
assert_event_invariants(sender_collector, request_id)
@pytest.mark.docker_required
def test_s31_concurrent_sends_mixed_topics_during_churn(self, node_config):
"""
S31: concurrent sends across mixed content topics during peer churn.

Flow exercised below:
- start three docker peers (relay, lightpush, store) and add each one's
  multiaddr to the others
- start a sender configured with mode "Edge" whose lightpushnode is the
  docker lightpush peer
- phase 1: fire a burst before any peer is restarted
- phase 2: restart every peer and fire a burst shortly afterwards
- phase 3: wait for the peers to be ready, re-attach them, fire a last burst
- phases 1 and 3 must each produce a propagated event and no error event;
  phase 2 is only checked for the absence of an error event
- message hashes taken from the phase-3 message_sent events are looked up
  on the store peer
"""
sender_collector = EventCollector()
relay_peer = WakuNode(NODE_2, f"s31_relay_peer_{self.test_id}")
relay_peer.start(relay="true", discv5_discovery="false")
relay_peer.set_relay_subscriptions([self.test_pubsub_topic])
lightpush_peer = WakuNode(NODE_2, f"s31_lightpush_peer_{self.test_id}")
lightpush_peer.start(relay="true", lightpush="true", discv5_discovery="false")
lightpush_peer.set_relay_subscriptions([self.test_pubsub_topic])
store_peer = WakuNode(NODE_2, f"s31_store_peer_{self.test_id}")
store_peer.start(relay="true", store="true", discv5_discovery="false")
store_peer.set_relay_subscriptions([self.test_pubsub_topic])
churn_peers = [relay_peer, lightpush_peer, store_peer]
# Mesh docker peers so a lightpushed message can fan out to the store peer.
peer_multiaddrs = [p.get_multiaddr_with_id() for p in churn_peers]
for peer in churn_peers:
others = [a for a in peer_multiaddrs if a != peer.get_multiaddr_with_id()]
peer.add_peers(others)
# Sender configuration: discovery and store are off, and lightpush is
# pointed at the docker lightpush peer started above.
node_config.update(
{
"mode": "Edge",
"relay": True,
"lightpush": True,
"store": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"lightpushnode": lightpush_peer.get_multiaddr_with_id(),
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
sender_multiaddr = get_node_multiaddr(sender_node)
for peer in churn_peers:
peer.add_peers([sender_multiaddr])
delay(3) # let docker peers connect to the sender
all_request_ids: list[str] = []
# Phase 1: burst before any peer has been restarted.
phase1_ids = self._s31_fire_burst(sender_node, phase_label="phase1")
all_request_ids.extend(phase1_ids)
# Phase 2: restart every peer, then fire a burst shortly afterwards.
for peer in churn_peers:
peer.restart()
delay(1) # small window so the restart is actually in-flight
phase2_ids = self._s31_fire_burst(sender_node, phase_label="phase2")
all_request_ids.extend(phase2_ids)
# Wait for all peers to be ready again and re-attach the sender.
for peer in churn_peers:
peer.ensure_ready(timeout_duration=20)
peer.add_peers([sender_multiaddr])
# Multiaddrs are re-read after the restart before re-meshing the peers.
peer_multiaddrs = [p.get_multiaddr_with_id() for p in churn_peers]
for peer in churn_peers:
others = [a for a in peer_multiaddrs if a != peer.get_multiaddr_with_id()]
peer.add_peers(others)
delay(3)
# Phase 3: burst after the peers are ready and re-attached.
phase3_ids = self._s31_fire_burst(sender_node, phase_label="phase3")
all_request_ids.extend(phase3_ids)
assert len(set(all_request_ids)) == len(all_request_ids), f"Duplicate RequestIds across bursts: {all_request_ids}"
# Phase 1 ran before any churn, so the mesh was stable — standard timeout.
# Phase 3 ran right after restart + re-attach, so the mesh needed to
# re-stabilize — use the recovery timeout to avoid CI flakiness.
phase_timeouts = [
(phase1_ids, PROPAGATED_TIMEOUT_S),
(phase3_ids, RECOVERY_TIMEOUT_S),
]
for request_ids, timeout_s in phase_timeouts:
for request_id in request_ids:
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=timeout_s,
)
assert propagated_event is not None, (
f"No MessagePropagatedEvent for stable-phase "
f"request_id={request_id} within {timeout_s}s. "
f"Collected events: {sender_collector.events}"
)
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected message_error event for stable-phase " f"request_id={request_id}: {error_event}"
# NOTE(review): from here to the end of this test, statements that refer to
# request_id_a / request_id_b / propagated_a / propagated_b are interleaved
# with the S31 statements. Those names are never assigned in this test, and
# several calls below are left unclosed or repeat keyword arguments. This
# looks like old and new diff lines rendered together — confirm against the
# real file before relying on this section.
for request_id in phase2_ids:
error_event = wait_for_error(
# Each request propagates over its own shard's mesh independently.
propagated_a = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
request_id=request_id_a,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert error_event is None, f"Unexpected terminal message_error for phase-2 " f"request_id={request_id} after recovery: {error_event}"
issued = set(all_request_ids)
for event in sender_collector.snapshot():
event_request_id = event.get("requestId")
if event_request_id is None:
continue
assert event_request_id in issued, (
f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"
assert propagated_a is not None, (
f"No message_propagated event for TOPIC_A within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated_a["requestId"] == request_id_a
# Use the hash the wrapper emitted on message_sent so the store
# lookup matches the exact bytes that were actually published.
phase3_hashes = []
for request_id in phase3_ids:
sent_event = wait_for_sent(
propagated_b = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=RECOVERY_TIMEOUT_S,
request_id=request_id_b,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert sent_event is not None, (
f"No message_sent event for phase-3 request_id={request_id} "
f"within {RECOVERY_TIMEOUT_S}s. Collected events: {sender_collector.events}"
assert propagated_b is not None, (
f"No message_propagated event for TOPIC_B within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
msg_hash = sent_event.get("messageHash")
assert msg_hash, f"message_sent event missing messageHash: {sent_event}"
phase3_hashes.append(msg_hash)
assert propagated_b["requestId"] == request_id_b
# 3 phases × S31_BURST_SIZE messages, so the page must fit them all,
# otherwise phase-3 hashes (which sort last in ascending order) get cut off.
self.check_sent_message_is_stored(
expected_hashes=phase3_hashes,
store_node=store_peer,
pubsub_topic=self.test_pubsub_topic,
page_size=S31_BURST_SIZE * 3,
ascending="true",
)
# No cross-talk: neither request should produce an error.
for request_id in (request_id_a, request_id_b):
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event for {request_id}: {error}"
# Per-request invariants apply across all phases, including the
# retry-path bursts (phase 2). If retries ever emit duplicate
# Propagated events or reorder Sent before Propagated, this catches it.
for request_id in all_request_ids:
assert_event_invariants(sender_collector, request_id)
def _s31_fire_burst(self, sender_node, *, phase_label: str) -> list[str]:
    """Send one burst of S31_BURST_SIZE messages concurrently and collect the RequestIds.

    Message ``i`` is published on ``S31_CONTENT_TOPICS[i]`` with a payload that
    embeds ``phase_label`` and ``i``.

    Args:
        sender_node: object whose ``send_message`` is called once per message.
        phase_label: tag embedded in each payload and in assertion messages.

    Returns:
        The RequestIds, in the same order the messages were built.

    Raises:
        AssertionError: if any send is not Ok or yields an empty RequestId.
    """
    burst = []
    for i in range(S31_BURST_SIZE):
        burst.append(
            self.create_message(
                contentTopic=S31_CONTENT_TOPICS[i],
                payload=to_base64(f"s31-{phase_label}-{i}"),
            )
        )

    # One worker per message so the whole burst can be in flight together;
    # map() hands results back in submission order.
    with ThreadPoolExecutor(max_workers=S31_BURST_SIZE) as executor:
        outcomes = list(executor.map(sender_node.send_message, burst))

    collected = []
    for i, outcome in enumerate(outcomes):
        assert outcome.is_ok(), f"{phase_label}: concurrent send #{i} failed: {outcome.err()}"
        issued_id = outcome.ok_value
        assert issued_id, f"{phase_label}: concurrent send #{i} returned an empty RequestId"
        collected.append(issued_id)
    return collected
assert_event_invariants(sender_collector, request_id_a)
assert_event_invariants(sender_collector, request_id_b)

View File

@ -0,0 +1,130 @@
import pytest
from src.steps.common import StepsCommon
from src.libs.custom_logger import get_custom_logger
from src.node.wrappers_manager import WrapperManager
from src.node.wrapper_helpers import (
EventCollector,
create_message_bindings,
enr_udp_port,
get_node_bound_ports,
get_node_multiaddr,
get_node_tcp_port,
wait_for_propagated,
)
from tests.wrappers_tests.conftest import build_node_config, free_port
# Module-level logger, named after this module.
logger = get_custom_logger(__name__)
# Timeout handed to wait_for_propagated in the end-to-end propagation test
# (presumably seconds, going by the _S suffix — confirm against the helper).
PROPAGATED_TIMEOUT_S = 30.0
# The five service ports covered by logos-messaging/logos-delivery#3828:
# (config field, key in MyBoundPorts, config to enable the service, default port)
# These tuples parametrize test_auto_port_per_field and are also iterated by
# test_concrete_ports_are_respected. The empty dict on the tcp row means no
# extra config is passed to enable it.
SERVICE_PORTS = [
("tcpPort", "tcp", {}, 60000),
("discv5UdpPort", "discv5Udp", {"discv5Discovery": True}, 9000),
("websocketPort", "webSocket", {"websocketSupport": True}, 8000),
("restPort", "rest", {"rest": True}, 8645),
("metricsServerPort", "metrics", {"metricsServer": True}, 8008),
]
class TestWrapperAutoPortAllocation(StepsCommon):
    """Auto-port allocation: a service port configured as 0 is picked at bind time.

    Tracks logos-messaging/logos-delivery#3828:
    - a service port of 0 receives a free port when the service binds
    - default ports stay concrete; auto-port only happens on an explicit 0
    - the bound values are readable through MyBoundPorts (0 = service disabled)
    - the ENR is rebuilt once discv5 has started, so it carries the UDP port
      that was actually bound and not the configured 0
    """

    def test_auto_port_starts_node_with_tcp_and_discv5_zero(self):
        # Node must start with both ports set to 0 and report a real TCP port.
        start_result = WrapperManager.create_and_start(config=build_node_config(tcpPort=0, discv5UdpPort=0))
        assert start_result.is_ok(), f"create_and_start failed: {start_result.err()}"

        with start_result.ok_value as node:
            advertised_tcp = get_node_tcp_port(node)
            assert advertised_tcp != 0, "multiaddr still reports port 0; auto-port did not happen"
            reported_tcp = get_node_bound_ports(node)["tcp"]
            assert reported_tcp == advertised_tcp, "MyBoundPorts disagrees with the multiaddr port"

    def test_auto_port_node_can_propagate_message(self):
        # Two auto-port nodes talk to each other end to end.
        # The send API needs autosharding, which numShardsInNetwork=1 turns on.
        events = EventCollector()
        sender_start = WrapperManager.create_and_start(
            config=build_node_config(tcpPort=0, discv5UdpPort=0, numShardsInNetwork=1),
            event_cb=events.event_callback,
        )
        assert sender_start.is_ok(), f"sender start failed: {sender_start.err()}"

        with sender_start.ok_value as sender:
            # The peer dials the sender through the multiaddr it actually bound.
            sender_address = get_node_multiaddr(sender)
            peer_start = WrapperManager.create_and_start(
                config=build_node_config(
                    tcpPort=0,
                    discv5UdpPort=0,
                    numShardsInNetwork=1,
                    staticnodes=[sender_address],
                )
            )
            assert peer_start.is_ok(), f"peer start failed: {peer_start.err()}"

            with peer_start.ok_value:
                outcome = sender.send_message(message=create_message_bindings())
                assert outcome.is_ok(), f"send failed: {outcome.err()}"

                issued_id = outcome.ok_value
                assert issued_id, "send returned empty RequestId"

                propagated_event = wait_for_propagated(events, issued_id, PROPAGATED_TIMEOUT_S)
                assert propagated_event is not None, f"no message_propagated event. Events: {events.events}"

    @pytest.mark.parametrize("port_field, bound_key, enable_service, default_port", SERVICE_PORTS)
    def test_auto_port_per_field(self, port_field, bound_key, enable_service, default_port):
        # One service at a time: its port is 0 and the service itself is enabled.
        start_result = WrapperManager.create_and_start(config=build_node_config(**{port_field: 0}, **enable_service))
        assert start_result.is_ok(), f"create_and_start failed with {port_field}=0: {start_result.err()}"

        with start_result.ok_value as node:
            bound_port = get_node_bound_ports(node)[bound_key]
            assert bound_port != 0, f"{port_field}=0 but nothing bound; auto-port did not happen"
            assert bound_port != default_port, f"{port_field}=0 bound the default {default_port}; expected an ephemeral port"

    def test_concrete_ports_are_respected(self):
        # Non-zero ports are not auto-allocated: each must bind exactly as asked.
        wanted = {}
        service_flags = {}
        for field, _, _, _ in SERVICE_PORTS:
            wanted[field] = free_port()
        for _, _, enable, _ in SERVICE_PORTS:
            service_flags.update(enable)

        start_result = WrapperManager.create_and_start(config=build_node_config(**wanted, **service_flags))
        assert start_result.is_ok(), f"create_and_start failed: {start_result.err()}"

        with start_result.ok_value as node:
            actual = get_node_bound_ports(node)
            for field, bound_key, _, _ in SERVICE_PORTS:
                assert actual[bound_key] == wanted[field], f"{field}: requested {wanted[field]}, bound {actual[bound_key]}"

    def test_bound_ports_zero_for_disabled_services(self):
        # With the stock build_node_config, websocket, REST, metrics and discv5 stay off.
        start_result = WrapperManager.create_and_start(config=build_node_config())
        assert start_result.is_ok(), f"create_and_start failed: {start_result.err()}"

        with start_result.ok_value as node:
            actual = get_node_bound_ports(node)
            assert actual["tcp"] != 0, "tcp is enabled but reports port 0"
            for key in ("webSocket", "rest", "discv5Udp", "metrics"):
                assert actual[key] == 0, f"{key} is disabled but reports port {actual[key]}"

    def test_enr_advertises_bound_discv5_port(self):
        # Per #3828 the ENR is rebuilt after discv5 starts, so its UDP port must
        # equal the bound port and not the configured 0.
        start_result = WrapperManager.create_and_start(config=build_node_config(discv5UdpPort=0, discv5Discovery=True))
        assert start_result.is_ok(), f"create_and_start failed: {start_result.err()}"

        with start_result.ok_value as node:
            bound_udp = get_node_bound_ports(node)["discv5Udp"]
            assert bound_udp != 0, "discv5 enabled with port 0 but nothing bound"

            enr_query = node.get_node_info_raw("MyENR")
            assert enr_query.is_ok(), f"MyENR query failed: {enr_query.err()}"

            enr_text = enr_query.ok_value.strip()
            assert enr_udp_port(enr_text) == bound_udp, "ENR was not rebuilt after discv5 startup"

@ -1 +1 @@
Subproject commit fd4d0a5a7efcd56b395024f558afa416d7e27a2b
Subproject commit 36041dae76a53fff91fe53253933d8eb2ab385f9