# Added by a patch hunk (@@ -532,6 +532,85 @@) to
# tests/wrappers_tests/test_send_lightpush_and_edge.py; in that file the class
# sits after TestS15LightpushRetryableErrorRecovery and before
# TestS26LightpushPeerChurn.
class TestS16LightpushPeerAppearsLater(StepsCommon):
    """
    S16: the sender has no delivery peer at send time; a lightpush peer
    becomes available afterwards.

    Flow exercised by the test:
      1. Start a relay+lightpush service node, record its multiaddr, then
         stop it again.
      2. Start an Edge-mode sender whose only static node is that stopped
         service, and call send() right away.
      3. Check that no message_propagated event has been collected yet.
      4. Start the service node again and wait for the pending request to
         be reported as propagated.

    Expected: send() returns Ok(RequestId), then eventually Propagated,
    and no message_error is recorded for the request.
    """

    @pytest.mark.xfail(reason="lightpush peer discovery via staticnodes is broken, see https://github.com/logos-messaging/logos-delivery/issues/3847")
    def test_s16_lightpush_peer_appears_later(self):
        """Send while the only known lightpush peer is down, then bring it up."""
        events = EventCollector()

        # Settings applied to both the service node and the edge sender.
        shared_settings = {
            "store": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
        }

        service_start = WrapperManager.create_and_start(
            config=build_node_config(relay=True, lightpush=True, **shared_settings),
        )
        assert service_start.is_ok(), f"Failed to start lightpush peer: {service_start.err()}"

        with service_start.ok_value as lightpush_node:
            # The address is captured while the node is up so the edge sender
            # can be pointed at it after it has been stopped.
            lightpush_addr = get_node_multiaddr(lightpush_node)

            stopped = lightpush_node.stop_node()
            assert stopped.is_ok(), f"Failed to stop lightpush peer: {stopped.err()}"
            delay(SERVICE_DOWN_SETTLE_S)

            edge_start = WrapperManager.create_and_start(
                config=build_node_config(
                    mode="Edge",
                    staticnodes=[lightpush_addr],
                    **shared_settings,
                ),
                event_cb=events.event_callback,
            )
            assert edge_start.is_ok(), f"Failed to start edge sender: {edge_start.err()}"

            with edge_start.ok_value as edge_node:
                # The service node is still stopped here, so send() runs
                # without any peer available to the sender.
                outgoing = create_message_bindings(
                    payload=to_base64("S16 lightpush peer appears later"),
                    contentTopic="/test/1/s16-late-lightpush/proto",
                )
                sent = edge_node.send_message(message=outgoing)
                assert sent.is_ok(), f"send() failed: {sent.err()}"
                req_id = sent.ok_value
                assert req_id, "send() returned an empty RequestId"

                delay(SERVICE_DOWN_SETTLE_S)

                # Zero-timeout check: nothing may have been propagated while
                # the lightpush peer was down.
                premature = wait_for_propagated(events, req_id, timeout_s=0)
                assert premature is None, f"message_propagated arrived before any lightpush peer existed: {premature}"

                # Bring the lightpush peer back while the request is pending.
                restarted = lightpush_node.start_node()
                assert restarted.is_ok(), f"Failed to restart lightpush peer: {restarted.err()}"

                delivered = wait_for_propagated(
                    collector=events,
                    request_id=req_id,
                    timeout_s=RECOVERY_TIMEOUT_S,
                )
                assert delivered is not None, (
                    f"No message_propagated within {RECOVERY_TIMEOUT_S}s "
                    "after the lightpush peer joined. "
                    f"Collected events: {events.events}"
                )
                assert delivered["requestId"] == req_id

                # Zero-timeout check: no error event may exist for the request.
                failure = wait_for_error(events, req_id, timeout_s=0)
                assert failure is None, f"Unexpected message_error after recovery: {failure}"

                assert_event_invariants(events, req_id)