e2e part3 (#181)

* add test s17

* Add temp changes

* Add s17 positive / negative scenarios

* add S19

* Add S06 relay-only test and fix wrapper helpers (#173)

* - Add S06 relay-only test case for testing message propagation without a store.
- Update `wrapper_helpers` for clearer event type handling and type annotations (`Optional[...]` usage).
- Simplify `get_node_multiaddr` to retrieve addresses via `get_node_info_raw`.
- Refactor `wrappers_manager` to adjust bindings path to `vendor` directory and add `get_node_info_raw` method.
- Update `.gitignore` to exclude `store.sqlite3*`.

* Refactor S06 relay-only test: replace try-finally blocks with context managers for clarity and conciseness.

* Migrate S06 relay-only test to `test_send_e2e.py` and refactor with `StepsCommon` for reusability.

---------

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>

* Modify S19 test

* Adding S21

* Fix review comments

* Adding S22/S23

* Adding S24

* Add S26

* Add S30

* Add S31

* Improve `wait_for_event` loop logic and add `assert_event_invariants` helper (#178)

- Refactored the `wait_for_event` function for clarity and to ensure proper deadline handling within the loop.
- Introduced `assert_event_invariants` to validate per-request event properties, enforcing invariants like correct `requestId`, no duplicate terminal events, and proper timing between `Propagated` and `Sent`.
- Added tests for `assert_event_invariants` enforcement in `S14` and `S15` lightpush scenarios.
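
The invariants listed above can be illustrated with a minimal sketch. It is not the helper from `wrapper_helpers`; the `eventType` field name, the choice of `message_sent` and `message_error` as the terminal events, and the function name `check_event_invariants` are all assumptions made for illustration.

```python
from typing import Iterable, List

# Assumption: these two event names are the "terminal" ones for a request.
TERMINAL_EVENTS = ("message_sent", "message_error")


def check_event_invariants(events: Iterable[dict], request_id: str) -> List[str]:
    """Return the violated invariants for one request (empty list means OK)."""
    # Only events carrying this requestId are considered (hypothetical field names).
    names = [e.get("eventType") for e in events if e.get("requestId") == request_id]
    problems = []

    # No terminal event may be reported more than once for the same request.
    for terminal in TERMINAL_EVENTS:
        if names.count(terminal) > 1:
            problems.append(f"duplicate terminal event: {terminal}")

    # Sent must never be observed before Propagated.
    if "message_sent" in names and "message_propagated" in names:
        if names.index("message_sent") < names.index("message_propagated"):
            problems.append("message_sent arrived before message_propagated")

    return problems
```

A caller would typically assert that the returned list is empty and print it on failure, so that one failing request reports every broken invariant at once.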

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>

* Add S07 and S10 send API tests with event invariants helper (#176)

* Add `assert_event_invariants` to enforce per-request event constraints and integrate into relevant tests

* Integrate `assert_event_invariants` into edge and store tests

* Remove redundant comments from `test_send_e2e.py`

---------

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>

* Fix some tests

* Add S02/S12 send API tests and PR CI pipeline (#174)

* Add tests for auto-subscribe on first send and isolated sender with no peers

* Add PR CI workflow with tiered test strategy

- pr_tests.yml: build job with cache, wrapper-tests, smoke-tests,
  and label-triggered full-suite
- test_common.yml: add deploy_allure/send_discord inputs so PR runs
  skip reporting side effects
- Add docker_required marker to S19 (needs Docker, excluded from
  wrapper-only CI job)
- Register docker_required marker in pytest.ini

* Document PR CI test workflows in README

* Refine PR CI test strategy:
- Exclude `docker_required` tests from smoke set in `pr_tests.yml`.
- Add `wait_for_connected` helper for connection state checks.
- Update S19 test to dynamically create and clean up the store node setup.
- General simplifications and improved test stability.

* Add `wait_for_connected` assertion to ensure sender connection state before propagation test

* Refine tests and CI workflows:
- Replace `ERROR_TIMEOUT_S` with `ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S` in `test_send_e2e.py`.
- Adjust timeout assertion for better clarity and accuracy.
- Update `pr_tests.yml` to add retries (`--reruns`) and ignore wrapper tests in smoke tests.
- Change `test_common.yml` default Discord reporting to `false`.

* Normalize `portsshift` to `portsShift` in `test_send_e2e.py` configuration definitions.

---------

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>

* Add relay-to-lightpush fallback integration tests (S08/S09) (#180)

Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>

* Ignore S19

* fix s26

* Ignore s20 / s31 for errors

* Change image name

* fix xfail syntax error

* rename test file

* Fix flaky tests

* comment the skipped tests

* Fix review comments

* revert tag in yml in latest

* commenting lightpush

* Modify the PR

* Fix the ports conflict

* Modify S20

* fix portsshift option

* remove the /true from yml to allow errors to exist

* Modify the yml to continue on error

* First set of review comments

* adding xfail mark for failed tests

* address review comments about xfail

* cleanup unused lines

* event collector fix

* Address review comment about delay constant

* fix the timeout review comment

* Add assert_event_invariants

* enhance comment on S26 test

* mark the waku tests as docker_required

* Add S01

* add S01 second scenario

* Add S03

* Add S04

* Adding S11

* modify s11 scenario to pass

* Adding test S05

* Adding the new tests in part3 file

* Fix the yml file error

* Add the new test file to the PR job

* bump logos-delivery-python-bindings to include destroy_keep_ctx

* modify the S01 test

* mark S01 with xfail

* mark the second S01 test as xfail too

* use skip instead of xfail

* comment the skip line to try S01 again

* restore the xfail mark again

* remove the wrapped text code from test file

* Changing the test file names

* skip S01 again

* removed extra comments

* Update logos-delivery-python-bindings submodule

---------

Co-authored-by: Egor Rachkovskii <32649334+at0m1x19@users.noreply.github.com>
Co-authored-by: Egor Rachkovskii <egorrachkovskii@status.im>
AYAHASSAN287 2026-05-14 15:48:14 +03:00 committed by GitHub
parent 11197db624
commit 4ba30a8aff
GPG Key ID: B5690EEEBB952194
10 changed files with 1437 additions and 833 deletions

@@ -159,25 +159,45 @@ jobs:
             --reruns 2 \
             --junit-xml=wrapper-results-basic.xml

-      - name: Run wrapper tests - send e2e part 1
+      - name: Run wrapper tests - send handle and subscription
         continue-on-error: true
         env:
           PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
         run: |
-          pytest tests/wrappers_tests/test_send_e2e_part1.py \
+          pytest tests/wrappers_tests/test_send_handle_and_subscription.py \
             -m "not docker_required" \
             --reruns 2 \
-            --junit-xml=wrapper-results-send-part1.xml
+            --junit-xml=wrapper-results-send-handle-and-subscription.xml

-      - name: Run wrapper tests - send e2e part 2
+      - name: Run wrapper tests - send relay propagation
         continue-on-error: true
         env:
           PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
         run: |
-          pytest tests/wrappers_tests/test_send_e2e_part2.py \
+          pytest tests/wrappers_tests/test_send_relay_propagation.py \
             -m "not docker_required" \
             --reruns 2 \
-            --junit-xml=wrapper-results-send-part2.xml
+            --junit-xml=wrapper-results-send-relay-propagation.xml
+
+      - name: Run wrapper tests - send lightpush and edge
+        continue-on-error: true
+        env:
+          PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
+        run: |
+          pytest tests/wrappers_tests/test_send_lightpush_and_edge.py \
+            -m "not docker_required" \
+            --reruns 2 \
+            --junit-xml=wrapper-results-send-lightpush-and-edge.xml
+
+      - name: Run wrapper tests - send errors and concurrency
+        continue-on-error: true
+        env:
+          PYTHONPATH: ${{ github.workspace }}/vendor/logos-delivery-python-bindings/waku
+        run: |
+          pytest tests/wrappers_tests/test_send_errors_and_concurrency.py \
+            -m "not docker_required" \
+            --reruns 2 \
+            --junit-xml=wrapper-results-send-errors-and-concurrency.xml

       - name: Test Report
         if: always()

@@ -52,7 +52,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
         shard: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17]
         # total number of shards =18 means tests will split into 18 thread and run in parallel to increase execution speed
         # command for sharding :
         # pytest --shard-id=<shard_number> --num-shards=<total_shards>
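
The sharding comment above describes splitting the suite into 18 disjoint groups. A minimal sketch of how such a split can be made deterministic follows; it hashes each test id and takes the result modulo the shard count. This is an illustration of the idea only: the function names are hypothetical, and the actual pytest sharding plugin used by the workflow may assign tests differently.

```python
import hashlib
from typing import List


def shard_for(test_id: str, num_shards: int) -> int:
    """Map a test id to a shard index in a stable, process-independent way."""
    digest = hashlib.sha256(test_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def select_shard(test_ids: List[str], shard_id: int, num_shards: int) -> List[str]:
    """Keep only the tests that belong to the given shard."""
    return [t for t in test_ids if shard_for(t, num_shards) == shard_id]


if __name__ == "__main__":
    # Hypothetical test ids, used only to show the split.
    tests = [f"tests/test_demo.py::test_case_{i}" for i in range(100)]
    for shard in range(18):
        print(shard, len(select_shard(tests, shard, 18)))
```

Because the assignment depends only on the test id, every CI job computes the same partition independently, and each test runs in exactly one shard.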

@@ -229,52 +229,6 @@ class WakuNode:
             logger.error(f"REST service did not become ready in time: {ex}")
             raise

-    def _start_wrapper(self, default_args, wait_for_node_sec):
-        logger.debug("Starting node using wrappers")
-        wrapper_config = self._default_args_to_wrapper_config(default_args)
-        result = WrapperManager.create_and_start(config=wrapper_config, timeout_s=wait_for_node_sec)
-        if result.is_err():
-            raise RuntimeError(f"Failed to start wrapper node: {result.err()}")
-        self._wrapper_node = result.ok_value
-        logger.debug(f"Started wrapper node. REST: {self._rest_port}")
-        DS.waku_nodes.append(self)
-        delay(1)
-        try:
-            self.ensure_ready(timeout_duration=wait_for_node_sec)
-        except Exception as ex:
-            logger.error(f"REST service did not become ready in time: {ex}")
-            raise
-
-    def _default_args_to_wrapper_config(self, default_args):
-        def _bool(key, default="false"):
-            return default_args.get(key, default).lower() == "true"
-
-        bootstrap = default_args.get("discv5-bootstrap-node")
-        return {
-            "logLevel": default_args.get("log-level", "DEBUG"),
-            "mode": "Core",
-            "networkingConfig": {
-                "listenIpv4": default_args.get("listen-address", "0.0.0.0"),
-                "p2pTcpPort": int(default_args["tcp-port"]),
-                "discv5UdpPort": int(default_args["discv5-udp-port"]),
-                "restPort": int(default_args["rest-port"]),
-                "restAddress": default_args.get("rest-address", "0.0.0.0"),
-            },
-            "protocolsConfig": {
-                "clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)),
-                "relay": _bool("relay"),
-                "store": _bool("store"),
-                "filter": _bool("filter"),
-                "lightpush": _bool("lightpush"),
-                "peerExchange": _bool("peer-exchange"),
-                "discv5Discovery": _bool("discv5-discovery", "true"),
-                "discv5BootstrapNodes": [bootstrap] if bootstrap else [],
-            },
-        }
-
     @retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True)
     def register_rln(self, **kwargs):
         logger.debug("Registering RLN credentials...")

@@ -62,6 +62,10 @@ class WrapperManager:
     def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
         return self._node.stop_and_destroy(timeout_s=timeout_s)

+    def destroy_keep_ctx(self, *, timeout_s: float = 20.0) -> Result[int, str]:
+        """Pass-through for NodeWrapper.destroy_keep_ctx — see that method."""
+        return self._node.destroy_keep_ctx(timeout_s=timeout_s)
+
     def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
         return self._node.subscribe_content_topic(content_topic, timeout_s=timeout_s)

@@ -0,0 +1,148 @@
"""S01 helpers: invoke send() against an invalid handle in an isolated process.

Run via:
    python -m tests.wrappers_tests.helpers.send_on_invalid_handle nil <marker>
    python -m tests.wrappers_tests.helpers.send_on_invalid_handle destroyed <marker>

Prints a single line to stdout starting with <marker>, followed by a JSON
payload describing the outcome. Runs in its own process so that a missing
C-ABI guard (which can SIGSEGV) fails the parent test cleanly instead of
taking the pytest runner down.

Cases:
  - nil:       send() on a wrapper built with ctx=ffi.NULL.
  - destroyed: send() after destroy_keep_ctx(); self.ctx stays non-nil, so the
               call reaches the C side with the original (now-stale) pointer.
"""

import json
import sys
from pathlib import Path


def _ensure_bindings_on_path() -> None:
    # The wrapper module lives outside the project tree, under vendor/.
    # Helper file is at <root>/tests/wrappers_tests/helpers/<this>.py.
    project_root = Path(__file__).resolve().parents[3]
    bindings_path = project_root / "vendor" / "logos-delivery-python-bindings" / "waku"
    if str(bindings_path) not in sys.path:
        sys.path.insert(0, str(bindings_path))
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))


def _emit(marker: str, payload: dict) -> None:
    print(marker + json.dumps(payload))


def _run_nil_handle(marker: str) -> None:
    from wrapper import NodeWrapper, ffi  # type: ignore[import-not-found]
    from src.node.wrappers_manager import WrapperManager
    from src.node.wrapper_helpers import create_message_bindings

    sender = WrapperManager(NodeWrapper(ctx=ffi.NULL, config_buffer=None, event_cb_handler=None))
    send_result = sender.send_message(message=create_message_bindings())
    _emit(
        marker,
        {
            "is_ok": send_result.is_ok(),
            "ok": send_result.ok_value if send_result.is_ok() else None,
            "err": send_result.err() if send_result.is_err() else None,
        },
    )


def _run_destroyed_handle(marker: str) -> None:
    from src.node.wrappers_manager import WrapperManager
    from src.node.wrapper_helpers import EventCollector, create_message_bindings
    from tests.wrappers_tests.conftest import build_node_config

    collector = EventCollector()
    create_result = WrapperManager.create_and_start(
        config=build_node_config(),
        event_cb=collector.event_callback,
    )
    if create_result.is_err():
        _emit(
            marker,
            {
                "stage": "create_and_start",
                "is_ok": False,
                "ok": None,
                "err": create_result.err(),
                "events_after_send": [],
            },
        )
        return

    sender = create_result.ok_value
    stop_result = sender.stop_node()
    if stop_result.is_err():
        _emit(
            marker,
            {
                "stage": "stop_node",
                "is_ok": False,
                "ok": None,
                "err": stop_result.err(),
                "events_after_send": [],
            },
        )
        return

    destroy_result = sender.destroy_keep_ctx()
    if destroy_result.is_err():
        _emit(
            marker,
            {
                "stage": "destroy_keep_ctx",
                "is_ok": False,
                "ok": None,
                "err": destroy_result.err(),
                "events_after_send": [],
            },
        )
        return

    events_before_send = len(collector.events)
    send_result = sender.send_message(message=create_message_bindings())
    new_events = collector.events[events_before_send:]
    _emit(
        marker,
        {
            "stage": "send_message",
            "is_ok": send_result.is_ok(),
            "ok": send_result.ok_value if send_result.is_ok() else None,
            "err": send_result.err() if send_result.is_err() else None,
            "events_after_send": [str(e) for e in new_events],
        },
    )


CASES = {
    "nil": _run_nil_handle,
    "destroyed": _run_destroyed_handle,
}


def main() -> int:
    if len(sys.argv) != 3 or sys.argv[1] not in CASES:
        cases = "|".join(CASES)
        print(f"usage: send_on_invalid_handle <{cases}> <result_marker>", file=sys.stderr)
        return 2
    case, marker = sys.argv[1], sys.argv[2]
    _ensure_bindings_on_path()
    CASES[case](marker)
    return 0


if __name__ == "__main__":
    sys.exit(main())
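
The helper above reports its outcome as one marker-prefixed JSON line, so the parent has to pick that line out of whatever else the child prints. A minimal, self-contained sketch of the parent side follows. It does not use the real bindings: the child here is a small inline script, and `MARKER`, `CHILD_CODE`, and `run_isolated` are hypothetical names chosen for illustration.

```python
import json
import subprocess
import sys

MARKER = "__DEMO_RESULT__"  # hypothetical marker, any unlikely prefix works

# Stand-in child: prints an unrelated log line, then the marker plus a JSON payload.
CHILD_CODE = (
    "import json, sys\n"
    "print('unrelated log line')\n"
    "print(sys.argv[1] + json.dumps({'is_ok': False, 'err': 'not initialized'}))\n"
)


def run_isolated(marker: str, timeout_s: float = 30.0) -> dict:
    """Run the child in its own process and return the payload after the marker."""
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_CODE, marker],
        capture_output=True,
        text=True,
        timeout=timeout_s,
    )
    # A crash in the child (for example a segfault) shows up as a non-zero
    # return code here instead of killing the parent process.
    if completed.returncode != 0:
        raise RuntimeError(f"child failed with returncode={completed.returncode}")
    for line in completed.stdout.splitlines():
        if line.startswith(marker):
            return json.loads(line[len(marker):])
    raise RuntimeError("missing result marker in child stdout")


if __name__ == "__main__":
    print(run_isolated(MARKER))
```

Prefixing the payload with a marker keeps the protocol robust against log output that native libraries may write to stdout before or after the result line.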

@@ -0,0 +1,445 @@
from concurrent.futures import ThreadPoolExecutor

import pytest

from src.env_vars import NODE_2
from src.steps.common import StepsCommon
from src.steps.store import StepsStore
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode
from src.node.wrappers_manager import WrapperManager
from src.node.wrapper_helpers import (
    EventCollector,
    assert_event_invariants,
    create_message_bindings,
    get_node_multiaddr,
    wait_for_propagated,
    wait_for_sent,
    wait_for_error,
)

logger = get_custom_logger(__name__)

PROPAGATED_TIMEOUT_S = 30.0
RECOVERY_TIMEOUT_S = 45.0

# MaxTimeInCache from send_service.nim.
MAX_TIME_IN_CACHE_S = 60.0
# Extra slack to cover the background retry loop tick after the window expires.
CACHE_EXPIRY_SLACK_S = 10.0
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S

RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"

# S30: concurrent sends on the same content topic during initial auto-subscribe.
S30_CONCURRENT_SENDS = 5
S30_CONTENT_TOPIC = "/test/1/s30-concurrent/proto"

# S31: concurrent sends across mixed topics during peer churn.
S31_BURST_SIZE = 8
S31_CONTENT_TOPICS = [
    "/test/1/s31-topic-a/proto",
    "/test/1/s31-topic-b/proto",
    "/test/1/s31-topic-c/proto",
    "/test/1/s31-topic-d/proto",
    "/test/1/s31-topic-e/proto",
    "/test/1/s31-topic-f/proto",
    "/test/1/s31-topic-g/proto",
    "/test/1/s31-topic-h/proto",
]


class TestS12IsolatedSenderNoPeers(StepsCommon):
    """
    S12 Isolated sender, no peers.
    Sender has relay enabled but zero relay peers and zero lightpush peers.
    Expected: send() returns Ok(RequestId), but eventually a message_error
    event arrives (no route to propagate).
    """

    def test_s12_send_with_no_peers_produces_error(self, node_config):
        sender_collector = EventCollector()

        node_config.update(
            {
                "relay": True,
                "store": False,
                "lightpush": False,
                "filter": False,
                "discv5Discovery": False,
                "numShardsInNetwork": 1,
            }
        )

        sender_result = WrapperManager.create_and_start(
            config=node_config,
            event_cb=sender_collector.event_callback,
        )
        assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

        with sender_result.ok_value as sender:
            message = create_message_bindings(
                payload=to_base64("S12 isolated sender payload"),
                contentTopic="/test/1/s12-isolated/proto",
            )

            send_result = sender.send_message(message=message)
            assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}"

            request_id = send_result.ok_value
            assert request_id, "send() returned an empty RequestId"

            error = wait_for_error(
                collector=sender_collector,
                request_id=request_id,
                timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S,
            )
            assert error is not None, (
                f"No message_error event within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s "
                f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack) for isolated sender. "
                f"Collected events: {sender_collector.events}"
            )
            assert error["requestId"] == request_id

            propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
            assert propagated is None, f"Unexpected message_propagated event for isolated sender: {propagated}"
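
The test above calls the `wait_for_*` helpers both with a long timeout and with `timeout_s=0` as an immediate "has this already happened?" check. A minimal sketch of a deadline-based polling loop that supports both uses is shown below. It is an illustration under assumptions, not the `wait_for_event` implementation from `wrapper_helpers`; the name `wait_for` and its parameters are hypothetical.

```python
import time
from typing import Callable, Optional


def wait_for(
    predicate: Callable[[], Optional[dict]],
    timeout_s: float,
    poll_interval_s: float = 0.05,
) -> Optional[dict]:
    """Poll predicate until it returns a value or the deadline passes.

    The predicate is always evaluated at least once, so timeout_s=0 acts
    as a single non-blocking check.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        found = predicate()
        if found is not None:
            return found
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining))
```

Computing the deadline once and sleeping for at most the remaining time keeps the total wait bounded by `timeout_s`, regardless of how long each predicate call takes to return.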


class TestS21ErrorWhenRetryWindowExpires(StepsCommon):
    """
    S21: delivery retry window expires before any valid path recovers.
    """

    def test_s21_error_when_retry_window_expires(self, node_config):
        sender_collector = EventCollector()

        node_config.update(
            {
                "relay": True,
                "store": False,
                "lightpush": False,
                "filter": False,
                "discv5Discovery": False,
                "numShardsInNetwork": 1,
            }
        )

        sender_result = WrapperManager.create_and_start(
            config=node_config,
            event_cb=sender_collector.event_callback,
        )
        assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

        with sender_result.ok_value as sender_node:
            message = create_message_bindings()
            send_result = sender_node.send_message(message=message)
            assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}"

            request_id = send_result.ok_value
            assert request_id, "send() returned an empty RequestId"

            # No peer
            error_event = wait_for_error(
                collector=sender_collector,
                request_id=request_id,
                timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S,
            )
            assert error_event is not None, (
                f"No MessageErrorEvent received within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s "
                f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack). "
                f"Collected events: {sender_collector.events}"
            )
            logger.info(f"S21 received error event: {error_event}")

            assert error_event.get("error") == RETRY_WINDOW_EXPIRED_MSG, (
                f"Unexpected error message in message_error event.\n"
                f"Expected: {RETRY_WINDOW_EXPIRED_MSG!r}\n"
                f"Got: {error_event.get('error')!r}\n"
                f"Full event: {error_event}"
            )

            assert_event_invariants(sender_collector, request_id)


class TestS30ConcurrentSendsDuringAutoSubscribe(StepsCommon):
    """
    S30: concurrent sends on the same content topic during initial auto-subscribe.
    - Sender starts unsubscribed to the target topic.
    - Several send() calls are issued at nearly the same time.
    - Each call must return Ok(RequestId) with a unique id.
    - Each request id must get its own propagated event,
      with no dropped or cross-associated events.
    """

    def test_s30_concurrent_sends_during_auto_subscribe(self, node_config):
        sender_collector = EventCollector()

        node_config.update(
            {
                "relay": True,
                "store": False,
                "discv5Discovery": False,
                "numShardsInNetwork": 1,
            }
        )

        sender_result = WrapperManager.create_and_start(
            config=node_config,
            event_cb=sender_collector.event_callback,
        )
        assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

        with sender_result.ok_value as sender_node:
            # Relay peer so the sender has a propagation path.
            relay_config = {
                **node_config,
                "staticnodes": [get_node_multiaddr(sender_node)],
                "portsShift": 1,
            }

            relay_result = WrapperManager.create_and_start(config=relay_config)
            assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"

            with relay_result.ok_value:
                # Build one message per send, with distinct payloads so we can
                # detect any cross-association between request ids and events.
                messages = [
                    create_message_bindings(
                        contentTopic=S30_CONTENT_TOPIC,
                        payload=to_base64(f"s30-concurrent-{i}"),
                    )
                    for i in range(S30_CONCURRENT_SENDS)
                ]

                # Fire all sends concurrently. The sender is not yet subscribed
                # to S30_CONTENT_TOPIC, so this exercises the auto-subscribe path
                # under contention.
                with ThreadPoolExecutor(max_workers=S30_CONCURRENT_SENDS) as pool:
                    send_results = list(pool.map(sender_node.send_message, messages))

                # Every send must return Ok(RequestId).
                request_ids = []
                for i, send_result in enumerate(send_results):
                    assert send_result.is_ok(), f"Concurrent send #{i} failed: {send_result.err()}"
                    request_id = send_result.ok_value
                    assert request_id, f"Concurrent send #{i} returned an empty RequestId"
                    request_ids.append(request_id)

                # Request ids must be unique across concurrent sends.
                assert len(set(request_ids)) == len(request_ids), f"Duplicate RequestIds returned by concurrent sends: {request_ids}"

                # Each request id must get its own propagated event and no error.
                for request_id in request_ids:
                    propagated_event = wait_for_propagated(
                        collector=sender_collector,
                        request_id=request_id,
                        timeout_s=PROPAGATED_TIMEOUT_S,
                    )
                    assert propagated_event is not None, (
                        f"No MessagePropagatedEvent for request_id={request_id} "
                        f"within {PROPAGATED_TIMEOUT_S}s. "
                        f"Collected events: {sender_collector.events}"
                    )

                    error_event = wait_for_error(
                        collector=sender_collector,
                        request_id=request_id,
                        timeout_s=0,
                    )
                    assert error_event is None, f"Unexpected message_error for request_id={request_id}: {error_event}"

                # Cross-association guard: every event with a requestId must
                # belong to exactly one of the request ids we issued.
                issued = set(request_ids)
                for event in sender_collector.snapshot():
                    event_request_id = event.get("requestId")
                    if event_request_id is None:
                        continue
                    assert event_request_id in issued, (
                        f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"
                    )

                # Per-request invariants apply to every concurrent send
                # (correct requestId, no duplicate terminal events,
                # Sent never before Propagated).
                for request_id in request_ids:
                    assert_event_invariants(sender_collector, request_id)


class TestS31ConcurrentSendsMixedTopicsDuringChurn(StepsStore):
    """
    S31: concurrent sends across mixed content topics during peer churn.
    """

    @pytest.mark.docker_required
    def test_s31_concurrent_sends_mixed_topics_during_churn(self, node_config):
        sender_collector = EventCollector()

        relay_peer = WakuNode(NODE_2, f"s31_relay_peer_{self.test_id}")
        relay_peer.start(relay="true", discv5_discovery="false")
        relay_peer.set_relay_subscriptions([self.test_pubsub_topic])

        lightpush_peer = WakuNode(NODE_2, f"s31_lightpush_peer_{self.test_id}")
        lightpush_peer.start(relay="true", lightpush="true", discv5_discovery="false")
        lightpush_peer.set_relay_subscriptions([self.test_pubsub_topic])

        store_peer = WakuNode(NODE_2, f"s31_store_peer_{self.test_id}")
        store_peer.start(relay="true", store="true", discv5_discovery="false")
        store_peer.set_relay_subscriptions([self.test_pubsub_topic])

        churn_peers = [relay_peer, lightpush_peer, store_peer]

        # Mesh docker peers so a lightpushed message can fan out to the store peer.
        peer_multiaddrs = [p.get_multiaddr_with_id() for p in churn_peers]
        for peer in churn_peers:
            others = [a for a in peer_multiaddrs if a != peer.get_multiaddr_with_id()]
            peer.add_peers(others)

        node_config.update(
            {
                "mode": "Edge",
                "relay": True,
                "lightpush": True,
                "store": False,
                "discv5Discovery": False,
                "numShardsInNetwork": 1,
                "lightpushnode": lightpush_peer.get_multiaddr_with_id(),
            }
        )

        sender_result = WrapperManager.create_and_start(
            config=node_config,
            event_cb=sender_collector.event_callback,
        )
        assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

        with sender_result.ok_value as sender_node:
            sender_multiaddr = get_node_multiaddr(sender_node)
            for peer in churn_peers:
                peer.add_peers([sender_multiaddr])
            delay(3)  # let docker peers connect to the sender

            all_request_ids: list[str] = []

            phase1_ids = self._s31_fire_burst(sender_node, phase_label="phase1")
            all_request_ids.extend(phase1_ids)

            for peer in churn_peers:
                peer.restart()
            delay(1)  # small window so the restart is actually in-flight

            phase2_ids = self._s31_fire_burst(sender_node, phase_label="phase2")
            all_request_ids.extend(phase2_ids)

            # Wait for all peers to be ready again and re-attach the sender.
            for peer in churn_peers:
                peer.ensure_ready(timeout_duration=20)
                peer.add_peers([sender_multiaddr])

            peer_multiaddrs = [p.get_multiaddr_with_id() for p in churn_peers]
            for peer in churn_peers:
                others = [a for a in peer_multiaddrs if a != peer.get_multiaddr_with_id()]
                peer.add_peers(others)
            delay(3)

            phase3_ids = self._s31_fire_burst(sender_node, phase_label="phase3")
            all_request_ids.extend(phase3_ids)

            assert len(set(all_request_ids)) == len(all_request_ids), f"Duplicate RequestIds across bursts: {all_request_ids}"

            # Phase 1 ran before any churn, so the mesh was stable — standard timeout.
            # Phase 3 ran right after restart + re-attach, so the mesh needed to
            # re-stabilize — use the recovery timeout to avoid CI flakiness.
            phase_timeouts = [
                (phase1_ids, PROPAGATED_TIMEOUT_S),
                (phase3_ids, RECOVERY_TIMEOUT_S),
            ]
            for request_ids, timeout_s in phase_timeouts:
                for request_id in request_ids:
                    propagated_event = wait_for_propagated(
                        collector=sender_collector,
                        request_id=request_id,
                        timeout_s=timeout_s,
                    )
                    assert propagated_event is not None, (
                        f"No MessagePropagatedEvent for stable-phase "
                        f"request_id={request_id} within {timeout_s}s. "
                        f"Collected events: {sender_collector.events}"
                    )

                    error_event = wait_for_error(
                        collector=sender_collector,
                        request_id=request_id,
                        timeout_s=0,
                    )
                    assert error_event is None, f"Unexpected message_error event for stable-phase " f"request_id={request_id}: {error_event}"

            for request_id in phase2_ids:
                error_event = wait_for_error(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=0,
                )
                assert error_event is None, f"Unexpected terminal message_error for phase-2 " f"request_id={request_id} after recovery: {error_event}"

            issued = set(all_request_ids)
            for event in sender_collector.snapshot():
                event_request_id = event.get("requestId")
                if event_request_id is None:
                    continue
                assert event_request_id in issued, (
                    f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"
                )

            # Use the hash the wrapper emitted on message_sent so the store
            # lookup matches the exact bytes that were actually published.
            phase3_hashes = []
            for request_id in phase3_ids:
                sent_event = wait_for_sent(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=RECOVERY_TIMEOUT_S,
                )
                assert sent_event is not None, (
                    f"No message_sent event for phase-3 request_id={request_id} "
                    f"within {RECOVERY_TIMEOUT_S}s. Collected events: {sender_collector.events}"
                )
                msg_hash = sent_event.get("messageHash")
                assert msg_hash, f"message_sent event missing messageHash: {sent_event}"
                phase3_hashes.append(msg_hash)

            # 3 phases × S31_BURST_SIZE messages, so the page must fit them all,
            # otherwise phase-3 hashes (which sort last in ascending order) get cut off.
            self.check_sent_message_is_stored(
                expected_hashes=phase3_hashes,
                store_node=store_peer,
                pubsub_topic=self.test_pubsub_topic,
                page_size=S31_BURST_SIZE * 3,
                ascending="true",
            )

            # Per-request invariants apply across all phases, including the
            # retry-path bursts (phase 2). If retries ever emit duplicate
            # Propagated events or reorder Sent before Propagated, this catches it.
            for request_id in all_request_ids:
                assert_event_invariants(sender_collector, request_id)

    def _s31_fire_burst(self, sender_node, *, phase_label: str) -> list[str]:
        """Fire S31_BURST_SIZE concurrent sends, one per topic in S31_CONTENT_TOPICS.
        Returns the list of RequestIds. Asserts every send returned Ok."""
        messages = [
            self.create_message(
                contentTopic=S31_CONTENT_TOPICS[i],
                payload=to_base64(f"s31-{phase_label}-{i}"),
            )
            for i in range(S31_BURST_SIZE)
        ]

        with ThreadPoolExecutor(max_workers=S31_BURST_SIZE) as pool:
            send_results = list(pool.map(sender_node.send_message, messages))

        request_ids = []
        for i, send_result in enumerate(send_results):
            assert send_result.is_ok(), f"{phase_label}: concurrent send #{i} failed: {send_result.err()}"
            request_id = send_result.ok_value
            assert request_id, f"{phase_label}: concurrent send #{i} returned an empty RequestId"
            request_ids.append(request_id)
        return request_ids
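
The burst helper above relies on `ThreadPoolExecutor.map` to fire all sends at once and return results in input order. A minimal sketch of the same pattern against a stand-in sender follows, so that it runs without a node. `FakeSender` and `fire_burst` are hypothetical names; the real `send_message` returns a Result object rather than a plain string.

```python
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List


class FakeSender:
    """Stand-in for a node wrapper: hands out one request id per send."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counter = itertools.count(1)

    def send_message(self, message: dict) -> str:
        # The lock keeps id allocation atomic when called from many threads.
        with self._lock:
            return f"req-{next(self._counter)}"


def fire_burst(sender: FakeSender, messages: List[dict]) -> List[str]:
    """Send all messages concurrently and check the ids are unique."""
    with ThreadPoolExecutor(max_workers=len(messages)) as pool:
        # map() yields results in the order of `messages`, not completion order.
        request_ids = list(pool.map(sender.send_message, messages))
    assert len(set(request_ids)) == len(request_ids), f"duplicate ids: {request_ids}"
    return request_ids


if __name__ == "__main__":
    burst = [{"payload": f"demo-{i}"} for i in range(8)]
    print(fire_burst(FakeSender(), burst))
```

Sizing the pool to the burst gives every send its own worker thread, which is what makes the calls overlap instead of queueing behind each other.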

@@ -0,0 +1,394 @@
import json
import subprocess
import sys

import pytest

from src.steps.common import StepsCommon
from src.libs.common import to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.wrappers_manager import WrapperManager
from src.node.wrapper_helpers import (
    EventCollector,
    assert_event_invariants,
    create_message_bindings,
    get_node_multiaddr,
    wait_for_connected,
    wait_for_propagated,
    wait_for_sent,
    wait_for_error,
)

logger = get_custom_logger(__name__)

PROPAGATED_TIMEOUT_S = 30.0

S01_EXPECTED_ERROR_FRAGMENT = "not initialized"
# Destroyed-handle path fails synchronously in the C layer (no callback),
# so the binding surfaces a different string than the nil-handle path.
S01_DESTROYED_HANDLE_ERROR_FRAGMENT = "immediate call failed"
S01_SUBPROCESS_TIMEOUT_S = 30
S01_RESULT_MARKER = "__S01_RESULT__"
SEND_AFTER_DESTROY_RESULT_MARKER = "__SEND_AFTER_DESTROY_RESULT__"
SEND_AFTER_DESTROY_SUBPROCESS_TIMEOUT_S = 60
S01_INVALID_HANDLE_HELPER = "tests.wrappers_tests.helpers.send_invalid_handle"

# S05: malformed content topics
S05_EXPECTED_ERROR_FRAGMENT = "Failed to auto-subscribe"
S05_MALFORMED_CONTENT_TOPICS = [
    # No leading slash — parser rejects with "must start with slash".
    ("s05-invalid-no-leading-slash", "no-leading-slash"),
    # Empty string — parser rejects empty content topic.
    ("", "empty"),
    # Only 3 segments — content topics need /app/version/name/encoding.
    ("/app/1/name", "missing-encoding-segment"),
    # Empty middle segment between slashes.
    ("/app//name/proto", "empty-middle-segment"),
]
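
The four malformed cases above each break the `/app/version/name/encoding` shape in a different way. A minimal sketch of a check that rejects exactly those cases is shown below. It is an illustration only: `is_well_formed_content_topic` is a hypothetical name, it assumes the four-segment form described in the comments, and the node's real parser may accept additional forms or apply stricter rules.

```python
def is_well_formed_content_topic(topic: str) -> bool:
    """Accept only /app/version/name/encoding with four non-empty segments."""
    # Rejects the empty string and anything without a leading slash.
    if not topic.startswith("/"):
        return False
    segments = topic[1:].split("/")
    # Exactly four segments, none of them empty.
    return len(segments) == 4 and all(segments)


if __name__ == "__main__":
    samples = [
        "s05-invalid-no-leading-slash",
        "",
        "/app/1/name",
        "/app//name/proto",
        "/test/1/s30-concurrent/proto",
    ]
    for sample in samples:
        print(repr(sample), is_well_formed_content_topic(sample))
```

Keeping the malformed inputs as a labelled list, as the test file does, makes it easy to parametrize one test over all of them and to report which shape failed.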
class TestS01NilOrUninitializedHandle(StepsCommon):
"""S01 — send() on a nil/destroyed handle must Err, no events, no crash."""
@pytest.mark.skip(reason="see https://github.com/logos-messaging/logos-delivery/issues/3873")
def test_s01_send_on_uninitialized_handle(self):
completed = subprocess.run(
[sys.executable, "-m", S01_INVALID_HANDLE_HELPER, "nil", S01_RESULT_MARKER],
capture_output=True,
text=True,
timeout=S01_SUBPROCESS_TIMEOUT_S,
)
assert completed.returncode == 0, (
f"send() crashed on a nil handle (returncode={completed.returncode}). " f"stdout={completed.stdout!r} stderr={completed.stderr!r}"
)
result_line = next(
(l for l in completed.stdout.splitlines() if l.startswith(S01_RESULT_MARKER)),
None,
)
assert result_line, f"missing result marker. stdout={completed.stdout!r} stderr={completed.stderr!r}"
result = json.loads(result_line[len(S01_RESULT_MARKER) :])
assert result["is_ok"] is False, f"expected Err, got Ok({result['ok']!r})"
assert S01_EXPECTED_ERROR_FRAGMENT in (
result["err"] or ""
), f"expected error to mention {S01_EXPECTED_ERROR_FRAGMENT!r}, got: {result['err']!r}"
@pytest.mark.skip(reason="see https://github.com/logos-messaging/logos-delivery/issues/3863")
def test_s01_send_on_destroyed_handle(self):
completed = subprocess.run(
[sys.executable, "-m", S01_INVALID_HANDLE_HELPER, "destroyed", SEND_AFTER_DESTROY_RESULT_MARKER],
capture_output=True,
text=True,
timeout=SEND_AFTER_DESTROY_SUBPROCESS_TIMEOUT_S,
)
assert completed.returncode == 0, (
f"send() crashed on a destroyed handle (returncode={completed.returncode}). " f"stdout={completed.stdout!r} stderr={completed.stderr!r}"
)
result_line = next(
(line for line in completed.stdout.splitlines() if line.startswith(SEND_AFTER_DESTROY_RESULT_MARKER)),
None,
)
assert result_line, f"missing result marker. stdout={completed.stdout!r} stderr={completed.stderr!r}"
result = json.loads(result_line[len(SEND_AFTER_DESTROY_RESULT_MARKER) :])
assert result["stage"] == "send_message", f"setup failed at stage {result['stage']!r}: {result['err']!r}"
assert result["is_ok"] is False, f"expected Err, got Ok({result['ok']!r})"
err_msg = result["err"] or ""
assert S01_EXPECTED_ERROR_FRAGMENT in err_msg or S01_DESTROYED_HANDLE_ERROR_FRAGMENT in err_msg, (
f"expected error to mention {S01_EXPECTED_ERROR_FRAGMENT!r} " f"or {S01_DESTROYED_HANDLE_ERROR_FRAGMENT!r}, got: {result['err']!r}"
)
assert result["events_after_send"] == [], f"expected no events after send(), got: {result['events_after_send']}"
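Both S01 tests rely on the same pattern: run the risky call in a child interpreter so a native crash cannot take down the pytest process, then recover the outcome from a marker-prefixed JSON line on stdout. The sketch below shows that pattern in isolation. `RESULT_MARKER`, `CHILD_SOURCE`, and `run_and_parse` are hypothetical names for illustration only; the real helper is the `send_invalid_handle` module referenced above, whose output fields may differ.

```python
import json
import subprocess
import sys

# Hypothetical marker, playing the role of S01_RESULT_MARKER.
RESULT_MARKER = "__DEMO_RESULT__"

# Stand-in for the real helper module: prints an unrelated log line plus
# exactly one marker-prefixed JSON line describing the outcome.
CHILD_SOURCE = (
    "import json, sys\n"
    "print('unrelated log line')\n"
    "print(sys.argv[1] + json.dumps({'is_ok': False, 'ok': None, 'err': 'not initialized'}))\n"
)


def run_and_parse(marker: str, timeout_s: float = 30.0) -> dict:
    """Run the child in a fresh interpreter and decode its marker-prefixed JSON line."""
    completed = subprocess.run(
        [sys.executable, "-c", CHILD_SOURCE, marker],
        capture_output=True,
        text=True,
        timeout=timeout_s,
    )
    # A crash in native code would show up here as a non-zero return code.
    if completed.returncode != 0:
        raise RuntimeError(f"child exited with {completed.returncode}: {completed.stderr!r}")
    result_line = next(
        (line for line in completed.stdout.splitlines() if line.startswith(marker)),
        None,
    )
    if result_line is None:
        raise RuntimeError(f"missing result marker in stdout: {completed.stdout!r}")
    # Strip the marker prefix; the remainder is the JSON payload.
    return json.loads(result_line[len(marker):])


result = run_and_parse(RESULT_MARKER)
```

Filtering on the marker lets the child log freely to stdout without breaking the parent's JSON parsing.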
class TestS02AutoSubscribeOnFirstSend(StepsCommon):
"""
S02 Auto-subscribe on first send.
Sender never calls subscribe_content_topic() before send().
The send API must auto-subscribe to the content topic used in the message.
Expected: send() returns Ok(RequestId), message_propagated arrives.
"""
def test_s02_send_without_explicit_subscribe(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
with peer_result.ok_value:
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
message = create_message_bindings(
payload=to_base64("S02 auto-subscribe test payload"),
contentTopic="/test/1/s02-auto-subscribe/proto",
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
class TestS03SendOnAlreadySubscribedTopic(StepsCommon):
"""
S03 Send on already-subscribed content topic.
Sender explicitly calls subscribe_content_topic() before send().
The send path must behave identically to the auto-subscribe case:
Propagated arrives, no Sent (store disabled), no Error.
Topology mirrors S06 (relay-only sender + relay peer, no store).
Purpose: proves the send path is identical when auto-subscription is skipped.
"""
def test_s03_send_on_already_subscribed_content_topic(self, node_config):
sender_collector = EventCollector()
content_topic = "/test/1/s03-already-subscribed/proto"
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"reliabilityEnabled": True,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
with peer_result.ok_value:
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
# Explicit subscribe before send — this is what S03 is about.
# The send path must still return Ok(RequestId) and emit the
# same events as the auto-subscribe topology in S06.
subscribe_result = sender.subscribe_content_topic(content_topic)
assert subscribe_result.is_ok(), f"subscribe_content_topic failed: {subscribe_result.err()}"
message = create_message_bindings(
payload=to_base64("S03 already-subscribed test payload"),
contentTopic=content_topic,
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"
assert_event_invariants(sender_collector, request_id)
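The commit message describes `assert_event_invariants` as checking per-request properties such as no duplicate terminal events and correct ordering between Propagated and Sent. A minimal sketch of checks in that spirit follows. This is not the helper from `wrapper_helpers`: the function name, the choice of `message_sent` and `message_error` as the terminal event types, and the assumption that these strings are the `eventType` values are all illustrative.

```python
from typing import Dict, List

# Assumption: Sent and Error are the terminal events for a request.
TERMINAL_EVENT_TYPES = {"message_sent", "message_error"}


def check_event_invariants(events: List[Dict], request_id: str) -> None:
    """Assert per-request ordering rules on a list of collected event dicts."""
    # Only events carrying this request id are relevant.
    types = [e.get("eventType") for e in events if e.get("requestId") == request_id]

    terminal = [t for t in types if t in TERMINAL_EVENT_TYPES]
    assert len(terminal) <= 1, f"duplicate terminal events for {request_id}: {terminal}"

    # If both events exist, Propagated must have been collected before Sent.
    if "message_propagated" in types and "message_sent" in types:
        assert types.index("message_propagated") < types.index("message_sent"), (
            f"message_sent arrived before message_propagated for {request_id}"
        )


good_events = [
    {"eventType": "connection_status_change"},
    {"eventType": "message_propagated", "requestId": "r1"},
    {"eventType": "message_sent", "requestId": "r1"},
]
check_event_invariants(good_events, "r1")
```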
class TestS04UnsubscribeThenSendSameTopic(StepsCommon):
"""
S04 Unsubscribe, then send the same content topic again.
Sender subscribes to topic A, unsubscribes from A, then sends on A.
The send path must re-establish topic interest and deliver normally.
Topology mirrors S06 (relay-only sender + relay peer, no store).
Expected: send() returns Ok(RequestId), Propagated arrives,
no Sent (store disabled), no Error.
Purpose: verifies send() re-establishes topic interest after local unsubscribe.
"""
def test_s04_unsubscribe_then_send_same_content_topic(self, node_config):
sender_collector = EventCollector()
content_topic = "/test/1/s04-unsubscribe-resend/proto"
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"reliabilityEnabled": True,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
with peer_result.ok_value:
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
# subscribe -> unsubscribe -> send: send() must re-establish
# topic interest internally.
subscribe_result = sender.subscribe_content_topic(content_topic)
assert subscribe_result.is_ok(), f"subscribe_content_topic failed: {subscribe_result.err()}"
unsubscribe_result = sender.unsubscribe_content_topic(content_topic)
assert unsubscribe_result.is_ok(), f"unsubscribe_content_topic failed: {unsubscribe_result.err()}"
message = create_message_bindings(
payload=to_base64("S04 unsubscribe-then-send test payload"),
contentTopic=content_topic,
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed after unsubscribe: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s "
f"after unsubscribe + send. Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"
assert_event_invariants(sender_collector, request_id)
class TestS05AutoSubscribeFailureBeforeTaskCreation(StepsCommon):
"""
S05 Auto-subscribe failure before task creation.
Sender is initialized but auto-subscription is forced to fail by using a
malformed content topic that breaks shard resolution inside
SubscriptionManager.subscribe().
Expected: send() returns Err with an "auto-subscribe" message, no events.
Purpose: covers the last synchronous error path before request ID creation,
across several distinct validator branches.
"""
@pytest.mark.parametrize(
"content_topic",
[topic for topic, _ in S05_MALFORMED_CONTENT_TOPICS],
ids=[case_id for _, case_id in S05_MALFORMED_CONTENT_TOPICS],
)
def test_s05_send_fails_when_auto_subscribe_fails(self, node_config, content_topic):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
# Malformed content topic — SubscriptionManager.subscribe() cannot
# resolve it to a shard, so auto-subscribe inside send() fails.
message = create_message_bindings(
payload=to_base64("S05 auto-subscribe failure payload"),
contentTopic=content_topic,
)
send_result = sender.send_message(message=message)
assert send_result.is_err(), (
f"send() must return Err when auto-subscribe fails for " f"content_topic={content_topic!r}, got Ok({send_result.ok_value!r})"
)
error_message = send_result.err() or ""
assert S05_EXPECTED_ERROR_FRAGMENT in error_message, (
f"expected error to mention {S05_EXPECTED_ERROR_FRAGMENT!r} " f"for content_topic={content_topic!r}, got: {error_message!r}"
)
# No request id was created, so no per-request events should be emitted;
# only connection_status_change events are tolerated.
assert sender_collector.events == [] or all(
event.get("eventType") == "connection_status_change" for event in sender_collector.events
), f"Unexpected events after a pre-task-creation failure: {sender_collector.events}"
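The four S05 cases each target a different rejection branch. The sketch below mirrors those branches in plain Python to make the expected shape explicit. It is only an illustration of the `/app/version/name/encoding` form described in the comments on `S05_MALFORMED_CONTENT_TOPICS`; the real parser lives in the node library and may accept or reject other forms. `content_topic_problem` is a hypothetical name.

```python
from typing import Optional


def content_topic_problem(content_topic: str) -> Optional[str]:
    """Return a short reason if the topic is malformed, else None (sketch only)."""
    if not content_topic:
        return "empty content topic"
    if not content_topic.startswith("/"):
        return "must start with slash"
    # Drop the leading slash, then expect exactly app/version/name/encoding.
    segments = content_topic[1:].split("/")
    if len(segments) != 4:
        return f"expected 4 segments, got {len(segments)}"
    if any(segment == "" for segment in segments):
        return "empty segment"
    return None


malformed = ["s05-invalid-no-leading-slash", "", "/app/1/name", "/app//name/proto"]
problems = {topic: content_topic_problem(topic) for topic in malformed}
```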


@ -1,253 +1,36 @@
import base64 import base64
import pytest import pytest
from src.env_vars import NODE_1
from src.steps.common import StepsCommon from src.steps.common import StepsCommon
from src.libs.common import delay, to_base64 from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode
from src.node.wrappers_manager import WrapperManager from src.node.wrappers_manager import WrapperManager
from src.node.wrapper_helpers import ( from src.node.wrapper_helpers import (
EventCollector, EventCollector,
assert_event_invariants, assert_event_invariants,
create_message_bindings, create_message_bindings,
get_node_multiaddr, get_node_multiaddr,
wait_for_connected,
wait_for_propagated, wait_for_propagated,
wait_for_sent, wait_for_sent,
wait_for_error, wait_for_error,
) )
from tests.wrappers_tests.conftest import build_node_config from src.test_data import DEFAULT_CLUSTER_ID
from tests.wrappers_tests.conftest import build_node_config, free_port
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
PROPAGATED_TIMEOUT_S = 30.0 PROPAGATED_TIMEOUT_S = 30.0
SENT_TIMEOUT_S = 10.0
NO_SENT_OBSERVATION_S = 5.0 NO_SENT_OBSERVATION_S = 5.0
SENT_AFTER_STORE_TIMEOUT_S = 60.0 SENT_AFTER_STORE_TIMEOUT_S = 60.0
OVERSIZED_PAYLOAD_BYTES = 200 * 1024 OVERSIZED_PAYLOAD_BYTES = 200 * 1024
RECOVERY_TIMEOUT_S = 45.0 RECOVERY_TIMEOUT_S = 45.0
SERVICE_DOWN_SETTLE_S = 3.0 SERVICE_DOWN_SETTLE_S = 3.0
# MaxTimeInCache from send_service.nim. # Default-cluster shard-0 pubsub topic; used to subscribe the S11 docker store
MAX_TIME_IN_CACHE_S = 60.0 # peer so it joins the same relay mesh as the wrapper nodes (wrapper config
# Extra slack to cover the background retry loop tick after the window expires. # uses numShardsInNetwork=1 => shard 0).
CACHE_EXPIRY_SLACK_S = 10.0 STORE_PEER_PUBSUB_TOPIC = f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0"
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S
RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
class TestS02AutoSubscribeOnFirstSend(StepsCommon):
"""
S02 Auto-subscribe on first send.
Sender never calls subscribe_content_topic() before send().
The send API must auto-subscribe to the content topic used in the message.
Expected: send() returns Ok(RequestId), message_propagated arrives.
"""
def test_s02_send_without_explicit_subscribe(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
with peer_result.ok_value:
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
message = create_message_bindings(
payload=to_base64("S02 auto-subscribe test payload"),
contentTopic="/test/1/s02-auto-subscribe/proto",
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
class TestS06CoreSenderRelayOnly(StepsCommon):
"""
S06 Core sender with relay peers only, no store.
Sender has local relay enabled and is connected to one relay peer.
Expected: send() returns Ok(RequestId), message_propagated event arrives,
no message_sent (store disabled), no message_error.
"""
def test_s06_relay_propagation_without_store(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"reliabilityEnabled": True,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
with peer_result.ok_value:
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
message = create_message_bindings(
payload=to_base64("S06 relay-only test payload"),
contentTopic="/test/1/s06-relay-only/proto",
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"
assert_event_invariants(sender_collector, request_id)
class TestS07CoreSenderRelayAndStore(StepsCommon):
"""
S07 Core sender with relay peers and store peer, reliability enabled.
Sender relays message to a store-capable peer; delivery service validates
the message reached the store via p2p reliability check.
Expected: Propagated, then Sent.
"""
def test_s07_relay_propagation_with_store_validation(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"reliabilityEnabled": True,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
"store": True,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start store peer: {peer_result.err()}"
with peer_result.ok_value:
message = create_message_bindings(
payload=to_base64("S07 relay+store test payload"),
contentTopic="/test/1/s07-relay-store/proto",
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
sent = wait_for_sent(
collector=sender_collector,
request_id=request_id,
timeout_s=SENT_TIMEOUT_S,
)
assert sent is not None, (
f"No message_sent event within {SENT_TIMEOUT_S}s after propagation. " f"Collected events: {sender_collector.events}"
)
assert sent["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
assert_event_invariants(sender_collector, request_id)
class TestRelayToLightpushFallback(StepsCommon): class TestRelayToLightpushFallback(StepsCommon):
@ -417,7 +200,7 @@ class TestS10EdgeSenderLightpushOnly(StepsCommon):
""" """
@pytest.mark.xfail(reason="lightpush peer discovery via staticnodes is broken, see https://github.com/logos-messaging/logos-delivery/issues/3847") @pytest.mark.xfail(reason="lightpush peer discovery via staticnodes is broken, see https://github.com/logos-messaging/logos-delivery/issues/3847")
def test_s10_edge_lightpush_propagation(self, node_config): def test_s10_edge_lightpush_propagation(self):
sender_collector = EventCollector() sender_collector = EventCollector()
common = { common = {
@ -488,60 +271,109 @@ class TestS10EdgeSenderLightpushOnly(StepsCommon):
assert_event_invariants(sender_collector, request_id) assert_event_invariants(sender_collector, request_id)
class TestS12IsolatedSenderNoPeers(StepsCommon): class TestS11EdgeSenderLightpushAndStore(StepsCommon):
""" """
S12 Isolated sender, no peers. S11 Edge sender with lightpush path and store validation.
Sender has relay enabled but zero relay peers and zero lightpush peers. Edge sender has no local relay; it publishes via a wrapper lightpush peer
Expected: send() returns Ok(RequestId), but eventually a message_error and validates delivery via a docker store peer. Reliability enabled.
event arrives (no route to propagate). Topology:
[LightpushPeer] wrapper, relay=True, lightpush=True, store=False
[StorePeer] docker WakuNode, relay=true, store=true,
dials the lightpush peer via add_peers and subscribes
to the same shard-0 pubsub topic so it joins the
relay mesh and archives propagated messages.
[Edge] wrapper, mode="Edge",
staticnodes=[lightpush_peer],
storenode=store_peer,
reliabilityEnabled=True
Expected: send() returns Ok(RequestId), Propagated arrives, then Sent
(store validation succeeds), no Error.
Purpose: edge-mode fully validated success path against a real docker
store node (cross-implementation check).
""" """
def test_s12_send_with_no_peers_produces_error(self, node_config): def test_s11_edge_lightpush_with_store_validation(self):
sender_collector = EventCollector() sender_collector = EventCollector()
node_config.update( common = {
{ "filter": False,
"relay": True, "discv5Discovery": True,
"store": False, "numShardsInNetwork": 1,
"lightpush": False, }
"filter": False,
"discv5Discovery": False, lightpush_config = build_node_config(
"numShardsInNetwork": 1, relay=True,
} lightpush=True,
store=False,
**common,
) )
sender_result = WrapperManager.create_and_start( lightpush_result = WrapperManager.create_and_start(config=lightpush_config)
config=node_config, assert lightpush_result.is_ok(), f"Failed to start lightpush peer: {lightpush_result.err()}"
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender: with lightpush_result.ok_value as lightpush_peer:
message = create_message_bindings( lightpush_multiaddr = get_node_multiaddr(lightpush_peer)
payload=to_base64("S12 isolated sender payload"),
contentTopic="/test/1/s12-isolated/proto", # Docker store peer — real nwaku node running as the store backend.
# Dial the lightpush peer via add_peers and subscribe to the same
# shard-0 pubsub topic so it joins the relay mesh and archives
# messages propagated by the lightpush peer.
store_peer = WakuNode(NODE_1, f"s11_store_{self.test_id}")
store_peer.start(relay="true", store="true")
self.add_node_peer(store_peer, [lightpush_multiaddr])
store_peer.set_relay_subscriptions([STORE_PEER_PUBSUB_TOPIC])
store_multiaddr = store_peer.get_multiaddr_with_id()
edge_config = build_node_config(
mode="Edge",
# assuming discv5 already happened
staticnodes=[lightpush_multiaddr, store_multiaddr],
reliabilityEnabled=True,
**common,
) )
send_result = sender.send_message(message=message) edge_result = WrapperManager.create_and_start(
assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}" config=edge_config,
event_cb=sender_collector.event_callback,
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
error = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S,
) )
assert error is not None, ( assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"
f"No message_error event within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s "
f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack) for isolated sender. "
f"Collected events: {sender_collector.events}"
)
assert error["requestId"] == request_id
propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0) with edge_result.ok_value as edge_sender:
assert propagated is None, f"Unexpected message_propagated event for isolated sender: {propagated}" message = create_message_bindings(
payload=to_base64("S11 edge lightpush + store test payload"),
contentTopic="/test/1/s11-edge-lightpush-store/proto",
)
send_result = edge_sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
sent = wait_for_sent(
collector=sender_collector,
request_id=request_id,
timeout_s=SENT_AFTER_STORE_TIMEOUT_S,
)
assert sent is not None, (
f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " f"after propagation. Collected events: {sender_collector.events}"
)
assert sent["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
assert_event_invariants(sender_collector, request_id)
class TestS14LightpushNonRetryableError(StepsCommon): class TestS14LightpushNonRetryableError(StepsCommon):
@ -698,3 +530,122 @@ class TestS15LightpushRetryableErrorRecovery(StepsCommon):
assert error is None, f"Unexpected message_error after recovery: {error}" assert error is None, f"Unexpected message_error after recovery: {error}"
assert_event_invariants(sender_collector, request_id) assert_event_invariants(sender_collector, request_id)
class TestS26LightpushPeerChurn(StepsCommon):
"""
S26: multiple lightpush peers, the selected one disappears,
an alternate remains.
Topology (3 peers + sender):
- peer1: relay + lightpush. The lightpush server initially selected
by the sender. Stopped mid-test to simulate churn.
- relay_peer: relay-only. Kept alive throughout the test as a
stable gossipsub mesh neighbour, so that after peer1 disappears
peer2 still has a relay path to propagate the message.
- peer2: relay + lightpush. The surviving lightpush server that
must take over once peer1 is gone.
- sender: edge node with peer1 and peer2 as static lightpush peers.
"""
def test_s26_lightpush_peer_churn_alternate_remains(self, node_config):
sender_collector = EventCollector()
peer1_config = {
**node_config,
"relay": True,
"lightpush": True,
"store": False,
"filter": False,
"discv5Discovery": True,
"numShardsInNetwork": 1,
"portsShift": 1,
"discv5UdpPort": free_port(),
}
peer1_result = WrapperManager.create_and_start(config=peer1_config)
assert peer1_result.is_ok(), f"Failed to start lightpush peer1: {peer1_result.err()}"
peer1 = peer1_result.ok_value
relay_config = {
**node_config,
"relay": True,
"lightpush": False,
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"portsShift": 4,
}
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value as relay_peer:
peer2_config = {
**peer1_config,
"staticnodes": [
get_node_multiaddr(peer1),
get_node_multiaddr(relay_peer),
],
"portsShift": 2,
"discv5UdpPort": free_port(),
}
peer2_result = WrapperManager.create_and_start(config=peer2_config)
assert peer2_result.is_ok(), f"Failed to start lightpush peer2: {peer2_result.err()}"
with peer2_result.ok_value as peer2:
sender_config = {
**node_config,
"mode": "Edge",
"relay": True,
"lightpush": True,
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"portsShift": 3,
"staticnodes": [
get_node_multiaddr(peer1),
get_node_multiaddr(peer2),
],
}
sender_result = WrapperManager.create_and_start(
config=sender_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
delay(2)
stop_result = peer1.stop_and_destroy()
assert stop_result.is_ok(), f"Failed to stop peer1: {stop_result.err()}"
delay(2)
message = create_message_bindings()
send_result = sender_node.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId) during peer churn, got: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
# Expect Propagated via the surviving lightpush peer (peer2).
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated_event is not None, (
f"No MessagePropagatedEvent within {PROPAGATED_TIMEOUT_S}s "
f"after the selected lightpush peer disappeared. "
f"Collected events: {sender_collector.events}"
)
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected message_error event during peer churn: {error_event}"
assert_event_invariants(sender_collector, request_id)
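These tests call the wait helpers in two ways: with a real timeout (`PROPAGATED_TIMEOUT_S`) to wait for an event, and with `timeout_s=0` to check that an event has not already arrived. A deadline-based polling loop supports both, provided it scans the collected events at least once before looking at the clock. The sketch below shows one way to write such a loop. It is not the `wait_for_event` implementation from `wrapper_helpers`; the function name, the `poll_interval_s` parameter, and the predicate-based signature are assumptions.

```python
import time
from typing import Callable, Dict, List, Optional


def wait_for_matching_event(
    events: List[Dict],
    predicate: Callable[[Dict], bool],
    timeout_s: float,
    poll_interval_s: float = 0.05,
) -> Optional[Dict]:
    """Return the first event matching predicate, or None once the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        # Scan a snapshot so a callback appending concurrently cannot break iteration.
        for event in list(events):
            if predicate(event):
                return event
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining))


collected = [{"eventType": "message_propagated", "requestId": "r1"}]
found = wait_for_matching_event(collected, lambda e: e.get("requestId") == "r1", timeout_s=0)
missing = wait_for_matching_event(
    collected, lambda e: e.get("eventType") == "message_error", timeout_s=0
)
```

Scanning before the deadline check is what makes `timeout_s=0` behave as a one-shot check rather than an immediate `None`.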


@ -1,8 +1,7 @@
from concurrent.futures import ThreadPoolExecutor
import pytest import pytest
from src.env_vars import NODE_2 from src.env_vars import NODE_2
from src.steps.common import StepsCommon from src.steps.common import StepsCommon
from src.steps.store import StepsStore
from src.libs.common import delay, to_base64 from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode from src.node.waku_node import WakuNode
@ -17,55 +16,179 @@ from src.node.wrapper_helpers import (
wait_for_sent, wait_for_sent,
wait_for_error, wait_for_error,
) )
from src.steps.store import StepsStore
from tests.wrappers_tests.conftest import free_port from tests.wrappers_tests.conftest import free_port
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
# Max time to wait after sending the message.
PROPAGATED_TIMEOUT_S = 30.0 PROPAGATED_TIMEOUT_S = 30.0
SENT_TIMEOUT_S = 10.0 SENT_TIMEOUT_S = 10.0
NO_SENT_OBSERVATION_S = 5.0 NO_SENT_OBSERVATION_S = 5.0
SENT_AFTER_STORE_TIMEOUT_S = 60.0 SENT_AFTER_STORE_TIMEOUT_S = 60.0
NO_STORE_OBSERVATION_S = 60.0 NO_STORE_OBSERVATION_S = 60.0
RECOVERY_TIMEOUT_S = 45.0
# S20 stabilization delays for gossipsub mesh formation. # S20 stabilization delays for gossipsub mesh formation.
MESH_STABILIZATION_S = 10 MESH_STABILIZATION_S = 10
STORE_JOIN_STABILIZATION_S = 10 STORE_JOIN_STABILIZATION_S = 10
# MaxTimeInCache from send_service.nim.
MAX_TIME_IN_CACHE_S = 60.0
# Extra slack to cover the background retry loop tick after the window expires.
CACHE_EXPIRY_SLACK_S = 10.0
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S
RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"
# S30: concurrent sends on the same content topic during initial auto-subscribe. class TestS06CoreSenderRelayOnly(StepsCommon):
S30_CONCURRENT_SENDS = 5 """
S30_CONTENT_TOPIC = "/test/1/s30-concurrent/proto" S06 Core sender with relay peers only, no store.
Sender has local relay enabled and is connected to one relay peer.
Expected: send() returns Ok(RequestId), message_propagated event arrives,
no message_sent (store disabled), no message_error.
"""
# S31: concurrent sends across mixed topics during peer churn. def test_s06_relay_propagation_without_store(self, node_config):
S31_BURST_SIZE = 8 sender_collector = EventCollector()
S31_CONTENT_TOPICS = [
"/test/1/s31-topic-a/proto", node_config.update(
"/test/1/s31-topic-b/proto", {
"/test/1/s31-topic-c/proto", "relay": True,
"/test/1/s31-topic-d/proto", "store": False,
"/test/1/s31-topic-e/proto", "lightpush": False,
"/test/1/s31-topic-f/proto", "filter": False,
"/test/1/s31-topic-g/proto", "discv5Discovery": False,
"/test/1/s31-topic-h/proto", "numShardsInNetwork": 1,
] "reliabilityEnabled": True,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}"
with peer_result.ok_value:
assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state"
message = create_message_bindings(
payload=to_base64("S06 relay-only test payload"),
contentTopic="/test/1/s06-relay-only/proto",
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"
assert_event_invariants(sender_collector, request_id)
class TestSendBeforeRelay(StepsStore):
class TestS07CoreSenderRelayAndStore(StepsCommon):
"""
S07 Core sender with relay peers and store peer, reliability enabled.
Sender relays message to a store-capable peer; delivery service validates
the message reached the store via p2p reliability check.
Expected: Propagated, then Sent.
"""
def test_s07_relay_propagation_with_store_validation(self, node_config):
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"reliabilityEnabled": True,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender:
peer_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender)],
"portsShift": 1,
"store": True,
}
peer_result = WrapperManager.create_and_start(config=peer_config)
assert peer_result.is_ok(), f"Failed to start store peer: {peer_result.err()}"
with peer_result.ok_value:
message = create_message_bindings(
payload=to_base64("S07 relay+store test payload"),
contentTopic="/test/1/s07-relay-store/proto",
)
send_result = sender.send_message(message=message)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id
sent = wait_for_sent(
collector=sender_collector,
request_id=request_id,
timeout_s=SENT_TIMEOUT_S,
)
assert sent is not None, (
f"No message_sent event within {SENT_TIMEOUT_S}s after propagation. " f"Collected events: {sender_collector.events}"
)
assert sent["requestId"] == request_id
error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error event: {error}"
assert_event_invariants(sender_collector, request_id)
class TestS17SendBeforeRelayPeersJoin(StepsCommon):
"""
S17: sender starts isolated, calls send()
- send() returns Ok(RequestId) immediately
- Propagated event eventually arrives after a relay peer joins
"""
def test_s17_send_before_relay_peers_joins(self, node_config):
"""
S17: sender starts isolated, calls send()
- send() returns Ok(RequestId) immediately
- Propagated event eventually arrives
"""
sender_collector = EventCollector()
node_config.update(
@@ -132,15 +255,18 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
class TestS19StorePeerAppearsAfterPropagation(StepsStore):
"""
S19: a store peer comes online later.
- send() returns Ok(RequestId) immediately
- Propagated arrives via the relay peer
- Sent when store peer is reachable
"""
@pytest.mark.docker_required
@pytest.mark.xfail(reason="fails to republish after store peer joins mesh see https://github.com/logos-messaging/logos-delivery/issues/3848")
def test_s19_store_peer_appears_after_propagation(self, node_config):
"""
S19: a store peer comes online later.
- send() returns Ok(RequestId) immediately
- Propagated --- relay peer
- Sent when store peer is reachable
"""
sender_collector = EventCollector()
node_config.update({"relay": True, "store": False, "discv5Discovery": False, "numShardsInNetwork": 1, "reliabilityEnabled": True})
@@ -228,20 +354,22 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
class TestS20StoreMissesInitiallyThenRetrySucceeds(StepsStore):
"""
S20: relay propagation succeeds, the first store query misses
(the store peer is reachable but does not yet have the message),
a later retry republishes through the relay mesh, and the store
peer then archives it.
Covers state flow:
SuccessfullyPropagated -> NextRoundRetry
-> SuccessfullyPropagated -> SuccessfullyValidated
"""
@pytest.mark.docker_required
@pytest.mark.skip(reason="Forcing the miss store round not possible")
def test_s20_store_misses_initially_then_retry_succeeds(self, node_config):
"""
S20: relay propagation succeeds, the first store query misses
(the store peer is reachable but does not yet have the message),
a later retry republishes through the relay mesh, and the store
peer then archives it.
Covers state flow:
SuccessfullyPropagated -> NextRoundRetry
-> SuccessfullyPropagated -> SuccessfullyValidated
"""
sender_collector = EventCollector()
store_node = WakuNode(NODE_2, f"s20_store_node_{self.test_id}")
store_node.start(
@@ -350,66 +478,16 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
def test_s21_error_when_retry_window_expires(self, node_config):
"""
S21: delivery retry window expires before any valid path recovers.
"""
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"lightpush": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
message = create_message_bindings()
send_result = sender_node.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
# No peer
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S,
)
assert error_event is not None, (
f"No MessageErrorEvent received within {ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S}s "
f"(MaxTimeInCache={MAX_TIME_IN_CACHE_S}s + slack). "
f"Collected events: {sender_collector.events}"
)
logger.info(f"S21 received error event: {error_event}")
assert error_event.get("error") == RETRY_WINDOW_EXPIRED_MSG, (
f"Unexpected error message in message_error event.\n"
f"Expected: {RETRY_WINDOW_EXPIRED_MSG!r}\n"
f"Got: {error_event.get('error')!r}\n"
f"Full event: {error_event}"
)
assert_event_invariants(sender_collector, request_id)
class TestS22NonEphemeralWithReliabilityDisabled(StepsCommon):
"""
S22: non-ephemeral message with reliabilityEnabled disabled.
- propagation path exists, reliabilityEnabled = false.
- Expected: Ok(RequestId), Propagated event only, no Sent event.
Note: S17 already covers the positive path of this test with reliabilityEnabled=True.
"""
def test_s22_non_ephemeral_message_with_reliability_disabled(self, node_config):
"""
S22: non-ephemeral message with reliabilityEnabled disabled.
- propagation path exists ,reliabilityEnabled = false.
- Expected: Ok(RequestId), Propagated event only, no Sent event.
Note: S17 already covers the positive path of this test with reliabilityEnabled=True.
"""
sender_collector = EventCollector()
node_config.update(
@@ -476,11 +554,14 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
class TestS23NoSentEventWhenRelayHasNoStore(StepsCommon):
"""
S23: non-ephemeral message, reliability enabled, no store peer ever reachable.
- Expected: Ok(RequestId), Propagated event only, no Sent and no terminal error.
"""
def test_s23_no_sent_event_when_relay_has_no_store(self, node_config):
"""
S23: non-ephemeral message, reliability enabled, no store peer ever reachable.
- Expected: Ok(RequestId), Propagated event only, no Sent and no terminal error.
"""
sender_collector = EventCollector()
node_config.update(
@@ -557,13 +638,15 @@ class TestSendBeforeRelay(StepsStore):
assert_event_invariants(sender_collector, request_id)
def test_s24_ephemeral_message_with_reachable_store(self, node_config):
"""
S24: ephemeral message, reliability enabled, reachable store peer.
- Setup: propagation path exists, relay peer has store=True (reachable),
- Expected: Ok(RequestId), Propagated event only, no Sent event.
"""
class TestS24EphemeralMessageWithReachableStore(StepsCommon):
"""
S24: ephemeral message, reliability enabled, reachable store peer.
- Setup: propagation path exists, relay peer has store=True (reachable).
- Expected: Ok(RequestId), Propagated event only, no Sent event.
"""
def test_s24_ephemeral_message_with_reachable_store(self, node_config):
sender_collector = EventCollector()
node_config.update(
@@ -623,398 +706,3 @@ class TestSendBeforeRelay(StepsStore):
)
assert_event_invariants(sender_collector, request_id)
def test_s26_lightpush_peer_churn_alternate_remains(self, node_config):
"""
S26: multiple lightpush peers, the selected one disappears,
an alternate remains.
Topology (3 peers + sender):
- peer1: relay + lightpush. The lightpush server initially selected
by the sender. Stopped mid-test to simulate churn.
- relay_peer: relay-only. Kept alive throughout the test as a
stable gossipsub mesh neighbour, so that after peer1 disappears
peer2 still has a relay path to propagate the message.
- peer2: relay + lightpush. The surviving lightpush server that
must take over once peer1 is gone.
- sender: edge node with peer1 and peer2 as static lightpush peers.
"""
sender_collector = EventCollector()
peer1_config = {
**node_config,
"relay": True,
"lightpush": True,
"store": False,
"filter": False,
"discv5Discovery": True,
"numShardsInNetwork": 1,
"portsShift": 1,
"discv5UdpPort": free_port(),
}
peer1_result = WrapperManager.create_and_start(config=peer1_config)
assert peer1_result.is_ok(), f"Failed to start lightpush peer1: {peer1_result.err()}"
peer1 = peer1_result.ok_value
relay_config = {
**node_config,
"relay": True,
"lightpush": False,
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"portsShift": 4,
}
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value as relay_peer:
peer2_config = {
**peer1_config,
"staticnodes": [
get_node_multiaddr(peer1),
get_node_multiaddr(relay_peer),
],
"portsShift": 2,
"discv5UdpPort": free_port(),
}
peer2_result = WrapperManager.create_and_start(config=peer2_config)
assert peer2_result.is_ok(), f"Failed to start lightpush peer2: {peer2_result.err()}"
with peer2_result.ok_value as peer2:
sender_config = {
**node_config,
"mode": "Edge",
"relay": True,
"lightpush": True,
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"portsShift": 3,
"staticnodes": [
get_node_multiaddr(peer1),
get_node_multiaddr(peer2),
],
}
sender_result = WrapperManager.create_and_start(
config=sender_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
delay(2)
stop_result = peer1.stop_and_destroy()
assert stop_result.is_ok(), f"Failed to stop peer1: {stop_result.err()}"
delay(2)
message = create_message_bindings()
send_result = sender_node.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId) during peer churn, got: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"
# Expect Propagated via the surviving lightpush peer (peer2).
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated_event is not None, (
f"No MessagePropagatedEvent within {PROPAGATED_TIMEOUT_S}s "
f"after the selected lightpush peer disappeared. "
f"Collected events: {sender_collector.events}"
)
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected message_error event during peer churn: {error_event}"
assert_event_invariants(sender_collector, request_id)
def test_s30_concurrent_sends_during_auto_subscribe(self, node_config):
"""
S30: concurrent sends on the same content topic during initial auto-subscribe.
- Sender starts unsubscribed to the target topic.
- Several send() calls are issued at nearly the same time.
- Each call must return Ok(RequestId) with a unique id.
- Each request id must get its own propagated event,
with no dropped or cross-associated events.
"""
sender_collector = EventCollector()
node_config.update(
{
"relay": True,
"store": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
# Relay peer so the sender has a propagation path.
relay_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender_node)],
"portsShift": 1,
}
relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
with relay_result.ok_value:
# Build one message per send, with distinct payloads so we can
# detect any cross-association between request ids and events.
messages = [
create_message_bindings(
contentTopic=S30_CONTENT_TOPIC,
payload=to_base64(f"s30-concurrent-{i}"),
)
for i in range(S30_CONCURRENT_SENDS)
]
# Fire all sends concurrently. The sender is not yet subscribed
# to S30_CONTENT_TOPIC, so this exercises the auto-subscribe path
# under contention.
with ThreadPoolExecutor(max_workers=S30_CONCURRENT_SENDS) as pool:
send_results = list(pool.map(sender_node.send_message, messages))
# Every send must return Ok(RequestId).
request_ids = []
for i, send_result in enumerate(send_results):
assert send_result.is_ok(), f"Concurrent send #{i} failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, f"Concurrent send #{i} returned an empty RequestId"
request_ids.append(request_id)
# Request ids must be unique across concurrent sends.
assert len(set(request_ids)) == len(request_ids), f"Duplicate RequestIds returned by concurrent sends: {request_ids}"
# Each request id must get its own propagated event and no error.
for request_id in request_ids:
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=PROPAGATED_TIMEOUT_S,
)
assert propagated_event is not None, (
f"No MessagePropagatedEvent for request_id={request_id} "
f"within {PROPAGATED_TIMEOUT_S}s. "
f"Collected events: {sender_collector.events}"
)
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected message_error for request_id={request_id}: {error_event}"
# Cross-association guard: every event with a requestId must
# belong to exactly one of the request ids we issued.
issued = set(request_ids)
for event in sender_collector.snapshot():
event_request_id = event.get("requestId")
if event_request_id is None:
continue
assert event_request_id in issued, (
f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"
)
# Per-request invariants apply to every concurrent send
# (correct requestId, no duplicate terminal events,
# Sent never before Propagated).
for request_id in request_ids:
assert_event_invariants(sender_collector, request_id)
@pytest.mark.docker_required
def test_s31_concurrent_sends_mixed_topics_during_churn(self, node_config):
"""
S31: concurrent sends across mixed content topics during peer churn.
"""
sender_collector = EventCollector()
relay_peer = WakuNode(NODE_2, f"s31_relay_peer_{self.test_id}")
relay_peer.start(relay="true", discv5_discovery="false")
relay_peer.set_relay_subscriptions([self.test_pubsub_topic])
lightpush_peer = WakuNode(NODE_2, f"s31_lightpush_peer_{self.test_id}")
lightpush_peer.start(relay="true", lightpush="true", discv5_discovery="false")
lightpush_peer.set_relay_subscriptions([self.test_pubsub_topic])
store_peer = WakuNode(NODE_2, f"s31_store_peer_{self.test_id}")
store_peer.start(relay="true", store="true", discv5_discovery="false")
store_peer.set_relay_subscriptions([self.test_pubsub_topic])
churn_peers = [relay_peer, lightpush_peer, store_peer]
# Mesh docker peers so a lightpushed message can fan out to the store peer.
peer_multiaddrs = [p.get_multiaddr_with_id() for p in churn_peers]
for peer in churn_peers:
others = [a for a in peer_multiaddrs if a != peer.get_multiaddr_with_id()]
peer.add_peers(others)
node_config.update(
{
"mode": "Edge",
"relay": True,
"lightpush": True,
"store": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
"lightpushnode": lightpush_peer.get_multiaddr_with_id(),
}
)
sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"
with sender_result.ok_value as sender_node:
sender_multiaddr = get_node_multiaddr(sender_node)
for peer in churn_peers:
peer.add_peers([sender_multiaddr])
delay(3) # let docker peers connect to the sender
all_request_ids: list[str] = []
phase1_ids = self._s31_fire_burst(sender_node, phase_label="phase1")
all_request_ids.extend(phase1_ids)
for peer in churn_peers:
peer.restart()
delay(1) # small window so the restart is actually in-flight
phase2_ids = self._s31_fire_burst(sender_node, phase_label="phase2")
all_request_ids.extend(phase2_ids)
# Wait for all peers to be ready again and re-attach the sender.
for peer in churn_peers:
peer.ensure_ready(timeout_duration=20)
peer.add_peers([sender_multiaddr])
peer_multiaddrs = [p.get_multiaddr_with_id() for p in churn_peers]
for peer in churn_peers:
others = [a for a in peer_multiaddrs if a != peer.get_multiaddr_with_id()]
peer.add_peers(others)
delay(3)
phase3_ids = self._s31_fire_burst(sender_node, phase_label="phase3")
all_request_ids.extend(phase3_ids)
assert len(set(all_request_ids)) == len(all_request_ids), f"Duplicate RequestIds across bursts: {all_request_ids}"
# Phase 1 ran before any churn, so the mesh was stable — standard timeout.
# Phase 3 ran right after restart + re-attach, so the mesh needed to
# re-stabilize — use the recovery timeout to avoid CI flakiness.
phase_timeouts = [
(phase1_ids, PROPAGATED_TIMEOUT_S),
(phase3_ids, RECOVERY_TIMEOUT_S),
]
for request_ids, timeout_s in phase_timeouts:
for request_id in request_ids:
propagated_event = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=timeout_s,
)
assert propagated_event is not None, (
f"No MessagePropagatedEvent for stable-phase "
f"request_id={request_id} within {timeout_s}s. "
f"Collected events: {sender_collector.events}"
)
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected message_error event for stable-phase " f"request_id={request_id}: {error_event}"
for request_id in phase2_ids:
error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=0,
)
assert error_event is None, f"Unexpected terminal message_error for phase-2 " f"request_id={request_id} after recovery: {error_event}"
issued = set(all_request_ids)
for event in sender_collector.snapshot():
event_request_id = event.get("requestId")
if event_request_id is None:
continue
assert event_request_id in issued, (
f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}"
)
# Use the hash the wrapper emitted on message_sent so the store
# lookup matches the exact bytes that were actually published.
phase3_hashes = []
for request_id in phase3_ids:
sent_event = wait_for_sent(
collector=sender_collector,
request_id=request_id,
timeout_s=RECOVERY_TIMEOUT_S,
)
assert sent_event is not None, (
f"No message_sent event for phase-3 request_id={request_id} "
f"within {RECOVERY_TIMEOUT_S}s. Collected events: {sender_collector.events}"
)
msg_hash = sent_event.get("messageHash")
assert msg_hash, f"message_sent event missing messageHash: {sent_event}"
phase3_hashes.append(msg_hash)
# 3 phases × S31_BURST_SIZE messages, so the page must fit them all,
# otherwise phase-3 hashes (which sort last in ascending order) get cut off.
self.check_sent_message_is_stored(
expected_hashes=phase3_hashes,
store_node=store_peer,
pubsub_topic=self.test_pubsub_topic,
page_size=S31_BURST_SIZE * 3,
ascending="true",
)
# Per-request invariants apply across all phases, including the
# retry-path bursts (phase 2). If retries ever emit duplicate
# Propagated events or reorder Sent before Propagated, this catches it.
for request_id in all_request_ids:
assert_event_invariants(sender_collector, request_id)
def _s31_fire_burst(self, sender_node, *, phase_label: str) -> list[str]:
"""Fire S31_BURST_SIZE concurrent sends, one per topic in S31_CONTENT_TOPICS.
Returns the list of RequestIds. Asserts every send returned Ok."""
messages = [
self.create_message(
contentTopic=S31_CONTENT_TOPICS[i],
payload=to_base64(f"s31-{phase_label}-{i}"),
)
for i in range(S31_BURST_SIZE)
]
with ThreadPoolExecutor(max_workers=S31_BURST_SIZE) as pool:
send_results = list(pool.map(sender_node.send_message, messages))
request_ids = []
for i, send_result in enumerate(send_results):
assert send_result.is_ok(), f"{phase_label}: concurrent send #{i} failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, f"{phase_label}: concurrent send #{i} returned an empty RequestId"
request_ids.append(request_id)
return request_ids

@@ -1 +1 @@
Subproject commit fd4d0a5a7efcd56b395024f558afa416d7e27a2b
Subproject commit 370693695f8843a26108aa34c915bf6bd9f9f2d5