diff --git a/src/node/wrapper_helpers.py b/src/node/wrapper_helpers.py
index c53cb038e..813684165 100644
--- a/src/node/wrapper_helpers.py
+++ b/src/node/wrapper_helpers.py
@@ -67,14 +67,14 @@ def wait_for_event(
     """
 
     deadline = time.monotonic() + timeout_s
-    while time.monotonic() < deadline:
+    while True:
         for event in collector.get_events_for_request(request_id):
             if predicate(event):
                 return event
+        if time.monotonic() >= deadline:
+            return None
         time.sleep(poll_interval_s)
 
-    return None
-
 
 def wait_for_propagated(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]:
     return wait_for_event(collector, request_id, is_propagated_event, timeout_s)
@@ -88,6 +88,41 @@ def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float)
     return wait_for_event(collector, request_id, is_error_event, timeout_s)
 
 
+TERMINAL_EVENT_TYPES = {EVENT_PROPAGATED, EVENT_SENT, EVENT_ERROR}
+
+
+def assert_event_invariants(collector: EventCollector, request_id: str) -> None:
+    """Check per-request event invariants (issue #163):
+    - All events carry the correct requestId.
+    - No duplicate terminal events (Propagated, Sent, Error).
+    - Sent never appears before Propagated.
+    """
+    events = collector.get_events_for_request(request_id)
+    assert events, f"No events found for request {request_id}"
+
+    counts: dict[str, int] = {}
+    first_index: dict[str, int] = {}
+    for i, event in enumerate(events):
+        assert event.get("requestId") == request_id, (
+            f"Event at index {i} has wrong requestId: " f"expected {request_id!r}, got {event.get('requestId')!r}"
+        )
+        event_type = event.get("eventType", "")
+        if event_type in TERMINAL_EVENT_TYPES:
+            counts[event_type] = counts.get(event_type, 0) + 1
+            if event_type not in first_index:
+                first_index[event_type] = i
+
+    for event_type, count in counts.items():
+        assert count == 1, f"Duplicate {event_type} events for request {request_id}: " f"got {count}, expected 1. Events: {events}"
+
+    if EVENT_SENT in first_index and EVENT_PROPAGATED in first_index:
+        assert first_index[EVENT_PROPAGATED] < first_index[EVENT_SENT], (
+            f"message_sent (index {first_index[EVENT_SENT]}) arrived before "
+            f"message_propagated (index {first_index[EVENT_PROPAGATED]}) "
+            f"for request {request_id}. Events: {events}"
+        )
+
+
 def get_node_multiaddr(node) -> str:
     """Return the first TCP multiaddr (with peer-id) from a WrapperManager node."""
     result = node.get_node_info_raw("MyMultiaddresses")
diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py
index 18a8da5aa..147caa367 100644
--- a/tests/wrappers_tests/test_send_e2e.py
+++ b/tests/wrappers_tests/test_send_e2e.py
@@ -1,4 +1,4 @@
-from time import time_ns
+import base64
 
 import pytest
 from src.env_vars import NODE_2
@@ -9,6 +9,7 @@ from src.node.waku_node import WakuNode
 from src.node.wrappers_manager import WrapperManager
 from src.node.wrapper_helpers import (
     EventCollector,
+    assert_event_invariants,
     create_message_bindings,
     get_node_multiaddr,
     wait_for_propagated,
@@ -16,14 +17,17 @@ from src.node.wrapper_helpers import (
     wait_for_error,
 )
 from src.steps.store import StepsStore
+from tests.wrappers_tests.conftest import build_node_config
 
 logger = get_custom_logger(__name__)
-
 
 PROPAGATED_TIMEOUT_S = 30.0
 SENT_TIMEOUT_S = 10.0
 NO_SENT_OBSERVATION_S = 5.0
 SENT_AFTER_STORE_TIMEOUT_S = 60.0
+OVERSIZED_PAYLOAD_BYTES = 200 * 1024
+RECOVERY_TIMEOUT_S = 45.0
+SERVICE_DOWN_SETTLE_S = 3.0
 
 # MaxTimeInCache from send_service.nim.
 MAX_TIME_IN_CACHE_S = 60.0
@@ -371,3 +375,162 @@ class TestS06CoreSenderRelayOnly(StepsCommon):
 
             sent = wait_for_sent(sender_collector, request_id, timeout_s=0)
             assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"
+
+
+class TestS14LightpushNonRetryableError(StepsCommon):
+    """
+    S14 — Lightpush non-retryable error via oversized message.
+    Edge sender publishes a message exceeding DefaultMaxWakuMessageSize (150KiB)
+    through a lightpush service node. The server validates message size and
+    returns INVALID_MESSAGE (420), a non-retryable error.
+    Expected: send() returns Ok(RequestId), then message_error event.
+    """
+
+    def test_s14_oversized_message_triggers_error(self):
+        sender_collector = EventCollector()
+
+        common = {
+            "store": False,
+            "filter": False,
+            "discv5Discovery": False,
+            "numShardsInNetwork": 1,
+        }
+
+        service_config = build_node_config(relay=True, lightpush=True, **common)
+        service_result = WrapperManager.create_and_start(config=service_config)
+        assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"
+
+        with service_result.ok_value as service:
+            service_multiaddr = get_node_multiaddr(service)
+
+            edge_config = build_node_config(
+                mode="Edge",
+                relay=False,
+                lightpushnode=service_multiaddr,
+                staticnodes=[service_multiaddr],
+                **common,
+            )
+            edge_result = WrapperManager.create_and_start(
+                config=edge_config,
+                event_cb=sender_collector.event_callback,
+            )
+            assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"
+
+            with edge_result.ok_value as edge_sender:
+                oversized_payload = base64.b64encode(b"x" * OVERSIZED_PAYLOAD_BYTES).decode()
+                message = create_message_bindings(
+                    payload=oversized_payload,
+                    contentTopic="/test/1/s14-oversized/proto",
+                )
+
+                send_result = edge_sender.send_message(message=message)
+                assert send_result.is_ok(), f"send() failed: {send_result.err()}"
+
+                request_id = send_result.ok_value
+                assert request_id, "send() returned an empty RequestId"
+
+                error = wait_for_error(
+                    collector=sender_collector,
+                    request_id=request_id,
+                    timeout_s=PROPAGATED_TIMEOUT_S,
+                )
+                assert error is not None, (
+                    f"No message_error event within {PROPAGATED_TIMEOUT_S}s "
+                    f"after sending oversized message. "
+                    f"Collected events: {sender_collector.events}"
+                )
+                assert error["requestId"] == request_id
+                logger.info(f"S14 received error event: {error}")
+
+                error_msg = error.get("error", "").lower()
+                assert "size exceeded" in error_msg, f"Error message doesn't indicate size violation: {error}"
+
+                propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
+                assert propagated is None, f"Unexpected message_propagated for an invalid message: {propagated}"
+
+                assert_event_invariants(sender_collector, request_id)
+
+
+class TestS15LightpushRetryableErrorRecovery(StepsCommon):
+    """
+    S15 — Lightpush retryable error + recovery.
+    Edge sender publishes via a lightpush service node that has NO relay peers.
+    The service accepts the lightpush request but returns NO_PEERS_TO_RELAY —
+    a retryable error (explicitly listed in the S15 spec). The message enters
+    the retry loop. A relay peer then joins the service node, and the next
+    retry succeeds.
+    Expected: send() returns Ok(RequestId), then eventually Propagated.
+    """
+
+    def test_s15_lightpush_retryable_error_then_recovery(self):
+        sender_collector = EventCollector()
+
+        common = {
+            "store": False,
+            "filter": False,
+            "discv5Discovery": False,
+            "numShardsInNetwork": 1,
+        }
+
+        service_config = build_node_config(relay=True, lightpush=True, **common)
+        service_result = WrapperManager.create_and_start(config=service_config)
+        assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"
+
+        with service_result.ok_value as service:
+            service_multiaddr = get_node_multiaddr(service)
+
+            edge_config = build_node_config(
+                mode="Edge",
+                relay=False,
+                lightpushnode=service_multiaddr,
+                staticnodes=[service_multiaddr],
+                **common,
+            )
+            edge_result = WrapperManager.create_and_start(
+                config=edge_config,
+                event_cb=sender_collector.event_callback,
+            )
+            assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"
+
+            with edge_result.ok_value as edge_sender:
+                msg = create_message_bindings(
+                    payload=to_base64("S15 retryable error recovery"),
+                    contentTopic="/test/1/s15-recovery/proto",
+                )
+                send_result = edge_sender.send_message(message=msg)
+                assert send_result.is_ok(), f"send() failed: {send_result.err()}"
+                request_id = send_result.ok_value
+                assert request_id, "send() returned an empty RequestId"
+
+                delay(SERVICE_DOWN_SETTLE_S)
+
+                early_propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
+                assert early_propagated is None, (
+                    f"message_propagated arrived before relay peer joined — " f"retryable error path was not exercised: {early_propagated}"
+                )
+
+                relay_config = build_node_config(
+                    relay=True,
+                    staticnodes=[service_multiaddr],
+                    **common,
+                )
+                relay_result = WrapperManager.create_and_start(config=relay_config)
+                assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"
+
+                with relay_result.ok_value:
+                    propagated = wait_for_propagated(
+                        collector=sender_collector,
+                        request_id=request_id,
+                        timeout_s=RECOVERY_TIMEOUT_S,
+                    )
+                    assert propagated is not None, (
+                        f"No message_propagated within {RECOVERY_TIMEOUT_S}s "
+                        f"after relay peer joined. "
+                        f"Collected events: {sender_collector.events}"
+                    )
+                    assert propagated["requestId"] == request_id
+
+                    error = wait_for_error(sender_collector, request_id, timeout_s=0)
+                    assert error is None, f"Unexpected message_error after recovery: {error}"
+
+                    assert_event_invariants(sender_collector, request_id)