diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py index cd46d954e..d57a85c2b 100644 --- a/tests/wrappers_tests/test_send_e2e.py +++ b/tests/wrappers_tests/test_send_e2e.py @@ -818,3 +818,166 @@ class TestS15LightpushRetryableErrorRecovery(StepsCommon): assert error is None, f"Unexpected message_error after recovery: {error}" assert_event_invariants(sender_collector, request_id) + + +class TestRelayToLightpushFallback(StepsCommon): + """S08/S09 — Relay-to-lightpush fallback. + + Sender has relay enabled but zero gossipsub relay peers. + A lightpush peer is reachable via lightpushnode (no staticnodes). + Relay fails with NO_PEERS_TO_RELAY, lightpush fallback succeeds + in the same processing pass. + + Topology: + [Service] relay=True, lightpush=True + [RelayPeer] relay=True, staticnodes=[service] (gives service gossipsub mesh) + [Sender] relay=True, lightpush=True, lightpushnode=service + (no staticnodes → zero gossipsub relay peers → fallback) + """ + + def test_s08_relay_fallback_to_lightpush(self, node_config): + """S08: no store peer → Propagated only.""" + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "lightpush": True, + "store": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + service_result = WrapperManager.create_and_start(config=node_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_addr = get_node_multiaddr(service) + + relay_config = { + **node_config, + "lightpush": False, + "staticnodes": [service_addr], + "portsShift": 1, + } + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + sender_config = { + **node_config, + "lightpushnode": service_addr, + "portsShift": 2, + } + sender_result = WrapperManager.create_and_start( + config=sender_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings() + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + sent = wait_for_sent(sender_collector, request_id, timeout_s=0) + assert sent is None, f"Unexpected message_sent event (no store peer): {sent}" + + assert_event_invariants(sender_collector, request_id) + + def test_s09_relay_fallback_to_lightpush_with_store_validation(self, node_config): + """S09: S08 + store peer + reliability → Propagated, then Sent.""" + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "lightpush": True, + "store": True, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + service_result = WrapperManager.create_and_start(config=node_config) + assert service_result.is_ok(), f"Failed to start service: {service_result.err()}" + + with service_result.ok_value as service: + service_addr = get_node_multiaddr(service) + + relay_config = { + **node_config, + "lightpush": False, + "store": False, + "staticnodes": [service_addr], + "portsShift": 1, + } + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + + with relay_result.ok_value: + sender_config = { + **node_config, + "reliabilityEnabled": True, + "lightpushnode": service_addr, + "storenode": service_addr, + "portsShift": 2, + } + sender_result = WrapperManager.create_and_start( + config=sender_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + message = create_message_bindings() + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_AFTER_STORE_TIMEOUT_S, + ) + assert sent is not None, ( + f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " + f"after propagation. Collected events: {sender_collector.events}" + ) + assert sent["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id)