import base64

import pytest

from src.env_vars import NODE_1
from src.steps.common import StepsCommon
from src.libs.common import delay, to_base64
from src.libs.custom_logger import get_custom_logger
from src.node.waku_node import WakuNode
from src.node.wrappers_manager import WrapperManager
from src.node.wrapper_helpers import (
    EventCollector,
    assert_event_invariants,
    create_message_bindings,
    get_node_multiaddr,
    wait_for_propagated,
    wait_for_sent,
    wait_for_error,
)
from src.test_data import DEFAULT_CLUSTER_ID
from tests.wrappers_tests.conftest import build_node_config, free_port

logger = get_custom_logger(__name__)

# Upper bound for a message_propagated (and, in S14, message_error) event.
PROPAGATED_TIMEOUT_S = 30.0
# Window during which a message_sent event must NOT appear (no store peer).
NO_SENT_OBSERVATION_S = 5.0
# Upper bound for message_sent after propagation when store validation is on.
SENT_AFTER_STORE_TIMEOUT_S = 60.0
# Raw payload size for S14; larger than the 150KiB limit cited in S14's docstring.
OVERSIZED_PAYLOAD_BYTES = 200 * 1024
# Upper bound for propagation once the missing delivery peer becomes available.
RECOVERY_TIMEOUT_S = 45.0
# Pause used in S15/S16 around the "no delivery peer yet" phase.
SERVICE_DOWN_SETTLE_S = 3.0

# Default-cluster shard-0 pubsub topic; used to subscribe the S11 docker store
# peer so it joins the same relay mesh as the wrapper nodes (wrapper config
# uses numShardsInNetwork=1 => shard 0).
STORE_PEER_PUBSUB_TOPIC = f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0"


class TestRelayToLightpushFallback(StepsCommon):
    """S08/S09 — Relay-to-lightpush fallback.

    Sender has relay enabled but zero gossipsub relay peers. A lightpush
    peer is reachable via lightpushnode (no staticnodes). Relay fails with
    NO_PEERS_TO_RELAY, lightpush fallback succeeds in the same processing
    pass.

    Topology:
        [Service]   relay=True, lightpush=True
        [RelayPeer] relay=True, staticnodes=[service]
                    (gives service gossipsub mesh)
        [Sender]    relay=True, lightpush=True, lightpushnode=service
                    (no staticnodes → zero gossipsub relay peers → fallback)

    NOTE: neither test below currently sets ``lightpushnode`` on the sender.
    S08 leaves it disabled (see its xfail reason); S09 only points the sender
    at the service through ``storenode``.
    """

    @pytest.mark.xfail(reason="the test fail without lightpushnode, see https://github.com/logos-messaging/logos-delivery/issues/3847")
    def test_s08_relay_fallback_to_lightpush(self, node_config):
        """S08: no store peer → Propagated only."""
        sender_collector = EventCollector()
        node_config.update(
            {
                "relay": True,
                "lightpush": True,
                "store": False,
                "filter": False,
                "discv5Discovery": False,
                "numShardsInNetwork": 1,
            }
        )

        service_result = WrapperManager.create_and_start(config=node_config)
        assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"

        with service_result.ok_value as service:
            service_addr = get_node_multiaddr(service)

            relay_config = {
                **node_config,
                "lightpush": False,
                "staticnodes": [service_addr],
                "portsShift": 1,
            }
            relay_result = WrapperManager.create_and_start(config=relay_config)
            assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"

            with relay_result.ok_value:
                sender_config = {
                    **node_config,
                    # "lightpushnode": service_addr,
                    # NOTE: intentionally left disabled — per the original author,
                    # enabling it currently raises an issue (see the xfail above).
                    "portsShift": 2,
                    "discv5Discovery": True,
                }
                sender_result = WrapperManager.create_and_start(
                    config=sender_config,
                    event_cb=sender_collector.event_callback,
                )
                assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

                with sender_result.ok_value as sender:
                    message = create_message_bindings()
                    send_result = sender.send_message(message=message)
                    assert send_result.is_ok(), f"send() failed: {send_result.err()}"

                    request_id = send_result.ok_value
                    assert request_id, "send() returned an empty RequestId"

                    propagated = wait_for_propagated(
                        collector=sender_collector,
                        request_id=request_id,
                        timeout_s=PROPAGATED_TIMEOUT_S,
                    )
                    assert propagated is not None, (
                        f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. "
                        f"Collected events: {sender_collector.events}"
                    )
                    assert propagated["requestId"] == request_id

                    # No store peer: watch for a window (as S10 does) instead of an
                    # instantaneous check, so a late message_sent is not missed.
                    sent = wait_for_sent(sender_collector, request_id, timeout_s=NO_SENT_OBSERVATION_S)
                    assert sent is None, f"Unexpected message_sent event (no store peer): {sent}"

                    # Checked after the observation window so late errors are caught too.
                    error = wait_for_error(sender_collector, request_id, timeout_s=0)
                    assert error is None, f"Unexpected message_error event: {error}"

                    assert_event_invariants(sender_collector, request_id)

    def test_s09_relay_fallback_to_lightpush_with_store_validation(self, node_config):
        """S09: S08 + store peer + reliability → Propagated, then Sent."""
        sender_collector = EventCollector()
        node_config.update(
            {
                "relay": True,
                "lightpush": True,
                "store": True,
                "filter": False,
                "discv5Discovery": False,
                "numShardsInNetwork": 1,
            }
        )

        service_result = WrapperManager.create_and_start(config=node_config)
        assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"

        with service_result.ok_value as service:
            service_addr = get_node_multiaddr(service)

            relay_config = {
                **node_config,
                "lightpush": False,
                "store": False,
                "staticnodes": [service_addr],
                "portsShift": 1,
            }
            relay_result = WrapperManager.create_and_start(config=relay_config)
            assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"

            with relay_result.ok_value:
                # The service doubles as the store node used for reliability checks.
                sender_config = {
                    **node_config,
                    "reliabilityEnabled": True,
                    "storenode": service_addr,
                    "portsShift": 2,
                    "store": False,
                }
                sender_result = WrapperManager.create_and_start(
                    config=sender_config,
                    event_cb=sender_collector.event_callback,
                )
                assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

                with sender_result.ok_value as sender:
                    message = create_message_bindings()
                    send_result = sender.send_message(message=message)
                    assert send_result.is_ok(), f"send() failed: {send_result.err()}"

                    request_id = send_result.ok_value
                    assert request_id, "send() returned an empty RequestId"

                    propagated = wait_for_propagated(
                        collector=sender_collector,
                        request_id=request_id,
                        timeout_s=PROPAGATED_TIMEOUT_S,
                    )
                    assert propagated is not None, (
                        f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. "
                        f"Collected events: {sender_collector.events}"
                    )
                    assert propagated["requestId"] == request_id

                    sent = wait_for_sent(
                        collector=sender_collector,
                        request_id=request_id,
                        timeout_s=SENT_AFTER_STORE_TIMEOUT_S,
                    )
                    assert sent is not None, (
                        f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s "
                        f"after propagation. Collected events: {sender_collector.events}"
                    )
                    assert sent["requestId"] == request_id

                    error = wait_for_error(sender_collector, request_id, timeout_s=0)
                    assert error is None, f"Unexpected message_error event: {error}"

                    assert_event_invariants(sender_collector, request_id)


class TestS10EdgeSenderLightpushOnly(StepsCommon):
    """
    S10 — Edge sender with lightpush path only, no store peer.

    Edge sender has no local relay; it publishes via a lightpush service node.
    Expected: Propagated only (no Sent, no Error).
    """

    @pytest.mark.xfail(reason="lightpush peer discovery via staticnodes is broken, see https://github.com/logos-messaging/logos-delivery/issues/3847")
    def test_s10_edge_lightpush_propagation(self):
        sender_collector = EventCollector()
        common = {
            "store": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
        }

        service_config = build_node_config(relay=True, lightpush=True, **common)
        service_result = WrapperManager.create_and_start(config=service_config)
        assert service_result.is_ok(), f"Failed to start service node: {service_result.err()}"

        with service_result.ok_value as service_node:
            service_multiaddr = get_node_multiaddr(service_node)

            # Relay-only peer so the service has a gossipsub neighbour to publish to.
            relay_config = build_node_config(
                relay=True,
                staticnodes=[service_multiaddr],
                **common,
            )
            relay_result = WrapperManager.create_and_start(config=relay_config)
            assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"

            with relay_result.ok_value:
                edge_config = build_node_config(
                    mode="Edge",
                    staticnodes=[service_multiaddr],
                    **common,
                )
                edge_result = WrapperManager.create_and_start(
                    config=edge_config,
                    event_cb=sender_collector.event_callback,
                )
                assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"

                with edge_result.ok_value as edge_sender:
                    message = create_message_bindings(
                        payload=to_base64("S10 edge lightpush test payload"),
                        contentTopic="/test/1/s10-edge-lightpush/proto",
                    )
                    send_result = edge_sender.send_message(message=message)
                    assert send_result.is_ok(), f"send() failed: {send_result.err()}"

                    request_id = send_result.ok_value
                    assert request_id, "send() returned an empty RequestId"

                    propagated = wait_for_propagated(
                        collector=sender_collector,
                        request_id=request_id,
                        timeout_s=PROPAGATED_TIMEOUT_S,
                    )
                    assert propagated is not None, (
                        f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. "
                        f"Collected events: {sender_collector.events}"
                    )
                    assert propagated["requestId"] == request_id

                    sent = wait_for_sent(sender_collector, request_id, timeout_s=NO_SENT_OBSERVATION_S)
                    assert sent is None, f"Unexpected message_sent event (no store peer): {sent}"

                    error = wait_for_error(sender_collector, request_id, timeout_s=0)
                    assert error is None, f"Unexpected message_error event: {error}"

                    assert_event_invariants(sender_collector, request_id)


class TestS11EdgeSenderLightpushAndStore(StepsCommon):
    """
    S11 — Edge sender with lightpush path and store validation.

    Edge sender has no local relay; it publishes via a wrapper lightpush peer
    and validates delivery via a docker store peer. Reliability enabled.

    Topology:
        [LightpushPeer] wrapper, relay=True, lightpush=True, store=False
        [StorePeer]     docker WakuNode, relay=true, store=true, dials the
                        lightpush peer via add_peers and subscribes to the
                        same shard-0 pubsub topic so it joins the relay mesh
                        and archives propagated messages.
        [Edge]          wrapper, mode="Edge",
                        staticnodes=[lightpush_peer, store_peer],
                        reliabilityEnabled=True (no explicit storenode is set)

    Expected: send() returns Ok(RequestId), Propagated arrives, then Sent
    (store validation succeeds), no Error.

    Purpose: edge-mode fully validated success path against a real docker
    store node (cross-implementation check).
    """

    def test_s11_edge_lightpush_with_store_validation(self):
        sender_collector = EventCollector()
        common = {
            "filter": False,
            "discv5Discovery": True,
            "numShardsInNetwork": 1,
        }

        lightpush_config = build_node_config(
            relay=True,
            lightpush=True,
            store=False,
            **common,
        )
        lightpush_result = WrapperManager.create_and_start(config=lightpush_config)
        assert lightpush_result.is_ok(), f"Failed to start lightpush peer: {lightpush_result.err()}"

        with lightpush_result.ok_value as lightpush_peer:
            lightpush_multiaddr = get_node_multiaddr(lightpush_peer)

            # Docker store peer — real nwaku node running as the store backend.
            # Dial the lightpush peer via add_peers and subscribe to the same
            # shard-0 pubsub topic so it joins the relay mesh and archives
            # messages propagated by the lightpush peer.
            # NOTE(review): store_peer is not stopped in this test; presumably a
            # suite-level fixture tears docker nodes down — confirm.
            store_peer = WakuNode(NODE_1, f"s11_store_{self.test_id}")
            store_peer.start(relay="true", store="true")
            self.add_node_peer(store_peer, [lightpush_multiaddr])
            store_peer.set_relay_subscriptions([STORE_PEER_PUBSUB_TOPIC])
            store_multiaddr = store_peer.get_multiaddr_with_id()

            edge_config = build_node_config(
                mode="Edge",
                # assuming discv5 discovery already happened
                staticnodes=[lightpush_multiaddr, store_multiaddr],
                reliabilityEnabled=True,
                **common,
            )
            edge_result = WrapperManager.create_and_start(
                config=edge_config,
                event_cb=sender_collector.event_callback,
            )
            assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"

            with edge_result.ok_value as edge_sender:
                message = create_message_bindings(
                    payload=to_base64("S11 edge lightpush + store test payload"),
                    contentTopic="/test/1/s11-edge-lightpush-store/proto",
                )
                send_result = edge_sender.send_message(message=message)
                assert send_result.is_ok(), f"send() failed: {send_result.err()}"

                request_id = send_result.ok_value
                assert request_id, "send() returned an empty RequestId"

                propagated = wait_for_propagated(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert propagated is not None, (
                    f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. "
                    f"Collected events: {sender_collector.events}"
                )
                assert propagated["requestId"] == request_id

                sent = wait_for_sent(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=SENT_AFTER_STORE_TIMEOUT_S,
                )
                assert sent is not None, (
                    f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s "
                    f"after propagation. Collected events: {sender_collector.events}"
                )
                assert sent["requestId"] == request_id

                error = wait_for_error(sender_collector, request_id, timeout_s=0)
                assert error is None, f"Unexpected message_error event: {error}"

                assert_event_invariants(sender_collector, request_id)


class TestS14LightpushNonRetryableError(StepsCommon):
    """
    S14 — Lightpush non-retryable error via oversized message.

    Edge sender publishes a message exceeding DefaultMaxWakuMessageSize
    (150KiB) through a lightpush service node. The server validates
    message size and returns INVALID_MESSAGE (420), a non-retryable error.

    Expected: send() returns Ok(RequestId), then message_error event.
    """

    def test_s14_oversized_message_triggers_error(self):
        sender_collector = EventCollector()
        common = {
            "store": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
        }

        service_config = build_node_config(relay=True, lightpush=True, **common)
        service_result = WrapperManager.create_and_start(config=service_config)
        assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"

        with service_result.ok_value as service:
            service_multiaddr = get_node_multiaddr(service)

            edge_config = build_node_config(
                mode="Edge",
                staticnodes=[service_multiaddr],
                **common,
            )
            edge_result = WrapperManager.create_and_start(
                config=edge_config,
                event_cb=sender_collector.event_callback,
            )
            assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"

            with edge_result.ok_value as edge_sender:
                oversized_payload = base64.b64encode(b"x" * OVERSIZED_PAYLOAD_BYTES).decode()
                message = create_message_bindings(
                    payload=oversized_payload,
                    contentTopic="/test/1/s14-oversized/proto",
                )
                send_result = edge_sender.send_message(message=message)
                assert send_result.is_ok(), f"send() failed: {send_result.err()}"

                request_id = send_result.ok_value
                assert request_id, "send() returned an empty RequestId"

                error = wait_for_error(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=PROPAGATED_TIMEOUT_S,
                )
                assert error is not None, (
                    f"No message_error event within {PROPAGATED_TIMEOUT_S}s "
                    f"after sending oversized message. "
                    f"Collected events: {sender_collector.events}"
                )
                assert error["requestId"] == request_id
                logger.info(f"S14 received error event: {error}")

                # Tolerate a missing or null "error" field so the assertion below
                # reports the event instead of crashing with AttributeError.
                error_msg = str(error.get("error") or "").lower()
                assert "size exceeded" in error_msg, f"Error message doesn't indicate size violation: {error}"

                propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
                assert propagated is None, f"Unexpected message_propagated for an invalid message: {propagated}"

                assert_event_invariants(sender_collector, request_id)


class TestS15LightpushRetryableErrorRecovery(StepsCommon):
    """
    S15 — Lightpush retryable error + recovery.

    Edge sender publishes via a lightpush service node that has NO relay
    peers. The service accepts the lightpush request but returns
    NO_PEERS_TO_RELAY — a retryable error (explicitly listed in the S15
    spec). The message enters the retry loop. A relay peer then joins the
    service node, and the next retry succeeds.

    Expected: send() returns Ok(RequestId), then eventually Propagated.
    """

    @pytest.mark.xfail(reason="lightpush peer discovery via staticnodes is broken, see https://github.com/logos-messaging/logos-delivery/issues/3847")
    def test_s15_lightpush_retryable_error_then_recovery(self):
        sender_collector = EventCollector()
        common = {
            "store": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
        }

        service_config = build_node_config(relay=True, lightpush=True, **common)
        service_result = WrapperManager.create_and_start(config=service_config)
        assert service_result.is_ok(), f"Failed to start service: {service_result.err()}"

        with service_result.ok_value as service:
            service_multiaddr = get_node_multiaddr(service)

            edge_config = build_node_config(
                mode="Edge",
                staticnodes=[service_multiaddr],
                **common,
            )
            edge_result = WrapperManager.create_and_start(
                config=edge_config,
                event_cb=sender_collector.event_callback,
            )
            assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"

            with edge_result.ok_value as edge_sender:
                msg = create_message_bindings(
                    payload=to_base64("S15 retryable error recovery"),
                    contentTopic="/test/1/s15-recovery/proto",
                )
                send_result = edge_sender.send_message(message=msg)
                assert send_result.is_ok(), f"send() failed: {send_result.err()}"

                request_id = send_result.ok_value
                assert request_id, "send() returned an empty RequestId"

                # The service has no relay peers yet, so nothing may propagate.
                delay(SERVICE_DOWN_SETTLE_S)
                early_propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
                assert early_propagated is None, (
                    f"message_propagated arrived before relay peer joined — "
                    f"retryable error path was not exercised: {early_propagated}"
                )

                # Give the service a relay neighbour; a later retry should succeed.
                relay_config = build_node_config(
                    relay=True,
                    staticnodes=[service_multiaddr],
                    **common,
                )
                relay_result = WrapperManager.create_and_start(config=relay_config)
                assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"

                with relay_result.ok_value:
                    propagated = wait_for_propagated(
                        collector=sender_collector,
                        request_id=request_id,
                        timeout_s=RECOVERY_TIMEOUT_S,
                    )
                    assert propagated is not None, (
                        f"No message_propagated within {RECOVERY_TIMEOUT_S}s "
                        f"after relay peer joined. "
                        f"Collected events: {sender_collector.events}"
                    )
                    assert propagated["requestId"] == request_id

                    error = wait_for_error(sender_collector, request_id, timeout_s=0)
                    assert error is None, f"Unexpected message_error after recovery: {error}"

                    assert_event_invariants(sender_collector, request_id)


class TestS16LightpushPeerAppearsLater(StepsCommon):
    """
    S16 — No delivery peers at T0, lightpush peer appears later.

    The edge sender has the lightpush service in its staticnodes, but the
    service is stopped before the sender starts, so there is no reachable
    delivery peer at T0. send() is called while the service is down. The
    service is restarted during the retry window; the sender connects to it
    and a later retry delivers the message.

    Expected: send() returns Ok(RequestId), then eventually Propagated.
    """

    @pytest.mark.xfail(reason="binding cannot restart a node or add peers at runtime")
    def test_s16_lightpush_peer_appears_later(self):
        sender_collector = EventCollector()
        common = {
            "store": False,
            "filter": False,
            "discv5Discovery": False,
            "numShardsInNetwork": 1,
        }

        # Start the lightpush service once to obtain its multiaddr, then stop
        # it so the sender has no reachable peer at T0. The same node object
        # is restarted later, so the address stays valid.
        service_config = build_node_config(relay=True, lightpush=True, **common)
        service_result = WrapperManager.create_and_start(config=service_config)
        assert service_result.is_ok(), f"Failed to start lightpush peer: {service_result.err()}"

        with service_result.ok_value as service:
            service_multiaddr = get_node_multiaddr(service)

            stop_result = service.stop_node()
            assert stop_result.is_ok(), f"Failed to stop lightpush peer: {stop_result.err()}"
            delay(SERVICE_DOWN_SETTLE_S)

            # Edge sender is a lightpush client; its only peer is the service,
            # which is currently down.
            edge_config = build_node_config(
                mode="Edge",
                lightpush=True,
                staticnodes=[service_multiaddr],
                **common,
            )
            edge_result = WrapperManager.create_and_start(
                config=edge_config,
                event_cb=sender_collector.event_callback,
            )
            assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"

            with edge_result.ok_value as edge_sender:
                # send() is invoked while the service is down.
                msg = create_message_bindings(
                    payload=to_base64("S16 lightpush peer appears later"),
                    contentTopic="/test/1/s16-late-lightpush/proto",
                )
                send_result = edge_sender.send_message(message=msg)
                assert send_result.is_ok(), f"send() failed: {send_result.err()}"

                request_id = send_result.ok_value
                assert request_id, "send() returned an empty RequestId"

                delay(SERVICE_DOWN_SETTLE_S)
                early_propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
                assert early_propagated is None, f"message_propagated arrived before the lightpush peer was reachable: {early_propagated}"

                # The lightpush peer comes back during the retry window.
                restart_result = service.start_node()
                assert restart_result.is_ok(), f"Failed to restart lightpush peer: {restart_result.err()}"

                propagated = wait_for_propagated(
                    collector=sender_collector,
                    request_id=request_id,
                    timeout_s=RECOVERY_TIMEOUT_S,
                )
                assert propagated is not None, (
                    f"No message_propagated within {RECOVERY_TIMEOUT_S}s "
                    f"after the lightpush peer joined. "
                    f"Collected events: {sender_collector.events}"
                )
                assert propagated["requestId"] == request_id

                error = wait_for_error(sender_collector, request_id, timeout_s=0)
                assert error is None, f"Unexpected message_error after recovery: {error}"

                assert_event_invariants(sender_collector, request_id)


class TestS26LightpushPeerChurn(StepsCommon):
    """
    S26: multiple lightpush peers, the selected one disappears, an alternate remains.

    Topology (3 peers + sender):
      - peer1: relay + lightpush. The lightpush server initially selected by
        the sender. Stopped mid-test to simulate churn.
      - relay_peer: relay-only. Kept alive throughout the test as a stable
        gossipsub mesh neighbour, so that after peer1 disappears peer2 still
        has a relay path to propagate the message.
      - peer2: relay + lightpush. The surviving lightpush server that must
        take over once peer1 is gone.
      - sender: edge node with peer1 and peer2 as static lightpush peers.
    """

    def test_s26_lightpush_peer_churn_alternate_remains(self, node_config):
        sender_collector = EventCollector()

        peer1_config = {
            **node_config,
            "relay": True,
            "lightpush": True,
            "store": False,
            "filter": False,
            "discv5Discovery": True,
            "numShardsInNetwork": 1,
            "portsShift": 1,
            "discv5UdpPort": free_port(),
        }
        peer1_result = WrapperManager.create_and_start(config=peer1_config)
        assert peer1_result.is_ok(), f"Failed to start lightpush peer1: {peer1_result.err()}"

        # peer1 is destroyed explicitly mid-test (the churn), so it cannot live
        # in a `with` block like the other nodes. The try/finally guarantees it
        # is still torn down if the test fails before the churn step; the flag
        # prevents a second destroy once the churn step has run.
        peer1 = peer1_result.ok_value
        peer1_stop_attempted = False
        try:
            relay_config = {
                **node_config,
                "relay": True,
                "lightpush": False,
                "store": False,
                "filter": False,
                "discv5Discovery": False,
                "numShardsInNetwork": 1,
                "portsShift": 4,
            }
            relay_result = WrapperManager.create_and_start(config=relay_config)
            assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"

            with relay_result.ok_value as relay_peer:
                peer2_config = {
                    **peer1_config,
                    "staticnodes": [
                        get_node_multiaddr(peer1),
                        get_node_multiaddr(relay_peer),
                    ],
                    "portsShift": 2,
                    "discv5UdpPort": free_port(),
                }
                peer2_result = WrapperManager.create_and_start(config=peer2_config)
                assert peer2_result.is_ok(), f"Failed to start lightpush peer2: {peer2_result.err()}"

                with peer2_result.ok_value as peer2:
                    sender_config = {
                        **node_config,
                        "mode": "Edge",
                        "relay": True,
                        "lightpush": True,
                        "store": False,
                        "filter": False,
                        "discv5Discovery": False,
                        "numShardsInNetwork": 1,
                        "portsShift": 3,
                        "staticnodes": [
                            get_node_multiaddr(peer1),
                            get_node_multiaddr(peer2),
                        ],
                    }
                    sender_result = WrapperManager.create_and_start(
                        config=sender_config,
                        event_cb=sender_collector.event_callback,
                    )
                    assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

                    with sender_result.ok_value as sender_node:
                        # Presumably lets the sender connect to peer1/peer2 before churn.
                        delay(2)

                        # Set before the call so a raising stop is not retried in finally.
                        peer1_stop_attempted = True
                        stop_result = peer1.stop_and_destroy()
                        assert stop_result.is_ok(), f"Failed to stop peer1: {stop_result.err()}"

                        # Presumably lets the sender observe peer1's disappearance.
                        delay(2)

                        message = create_message_bindings()
                        send_result = sender_node.send_message(message=message)
                        assert send_result.is_ok(), f"send() must return Ok(RequestId) during peer churn, got: {send_result.err()}"

                        request_id = send_result.ok_value
                        assert request_id, "send() returned an empty RequestId"

                        # Expect Propagated via the surviving lightpush peer (peer2).
                        propagated_event = wait_for_propagated(
                            collector=sender_collector,
                            request_id=request_id,
                            timeout_s=PROPAGATED_TIMEOUT_S,
                        )
                        assert propagated_event is not None, (
                            f"No MessagePropagatedEvent within {PROPAGATED_TIMEOUT_S}s "
                            f"after the selected lightpush peer disappeared. "
                            f"Collected events: {sender_collector.events}"
                        )
                        assert propagated_event["requestId"] == request_id

                        error_event = wait_for_error(
                            collector=sender_collector,
                            request_id=request_id,
                            timeout_s=0,
                        )
                        assert error_event is None, f"Unexpected message_error event during peer churn: {error_event}"

                        assert_event_invariants(sender_collector, request_id)
        finally:
            if not peer1_stop_attempted:
                # Best-effort cleanup: log instead of raising so the original
                # test failure (if any) is not masked.
                cleanup_result = peer1.stop_and_destroy()
                if not cleanup_result.is_ok():
                    logger.warning(f"Failed to clean up peer1: {cleanup_result.err()}")