diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py index 893074bde..9104bda11 100644 --- a/tests/wrappers_tests/test_send_e2e.py +++ b/tests/wrappers_tests/test_send_e2e.py @@ -27,6 +27,10 @@ NO_SENT_OBSERVATION_S = 5.0 SENT_AFTER_STORE_TIMEOUT_S = 60.0 NO_STORE_OBSERVATION_S = 60.0 +# S20 stabilization delays for gossipsub mesh formation. +MESH_STABILIZATION_S = 5 +STORE_JOIN_STABILIZATION_S = 10 + # MaxTimeInCache from send_service.nim. MAX_TIME_IN_CACHE_S = 60.0 # Extra slack to cover the background retry loop tick after the window expires. @@ -280,6 +284,7 @@ class TestSendBeforeRelay(StepsStore): ascending="true", ) + @pytest.mark.xfail("error:NoPeersToPublish") def test_s20_store_misses_initially_then_retry_succeeds(self, node_config): """ S20: relay propagation succeeds, initial store query misses, @@ -308,18 +313,45 @@ class TestSendBeforeRelay(StepsStore): assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" with sender_result.ok_value as sender_node: - # Relay-only peer: gives the sender a propagation path but no store. - relay_config = { + # Three relay peers so the sender keeps mesh peers during store-join churn. + relay_a_config = { **node_config, "staticnodes": [get_node_multiaddr(sender_node)], "portsshift": 1, "store": False, + "reliabilityEnabled": True, + } + relay_b_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender_node)], + "portsshift": 2, + "store": False, + "reliabilityEnabled": True, + } + relay_c_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender_node)], + "portsshift": 3, + "store": False, } - relay_result = WrapperManager.create_and_start(config=relay_config) - assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" + relay_a_result = WrapperManager.create_and_start(config=relay_a_config) + assert relay_a_result.is_ok(), f"Failed to start relay peer A: {relay_a_result.err()}" + + relay_b_result = WrapperManager.create_and_start(config=relay_b_config) + assert relay_b_result.is_ok(), f"Failed to start relay peer B: {relay_b_result.err()}" + + relay_c_result = WrapperManager.create_and_start(config=relay_c_config) + assert relay_c_result.is_ok(), f"Failed to start relay peer C: {relay_c_result.err()}" + + with ( + relay_a_result.ok_value as relay_peer_a, + relay_b_result.ok_value as relay_peer_b, + relay_c_result.ok_value as relay_peer_c, + ): + # Let the gossipsub mesh form before publishing. + delay(MESH_STABILIZATION_S) - with relay_result.ok_value as relay_peer: message = create_message_bindings(ephemeral=False) send_result = sender_node.send_message(message=message) assert send_result.is_ok(), f"send() must return Ok(RequestId), got: {send_result.err()}" @@ -327,7 +359,7 @@ class TestSendBeforeRelay(StepsStore): request_id = send_result.ok_value assert request_id, "send() returned an empty RequestId" - # First round: propagation succeeds via the relay peer. + # First round: propagation succeeds. propagated_event = wait_for_propagated( collector=sender_collector, request_id=request_id, @@ -352,12 +384,18 @@ class TestSendBeforeRelay(StepsStore): store_node.start(relay="true", store="true", discv5_discovery="false") store_node.set_relay_subscriptions([self.test_pubsub_topic]) - relay_multiaddr = get_node_multiaddr(relay_peer) + # Connect relay peers first, then the sender, so mesh churn on + # the sender doesn't disrupt the store-archival path. sender_multiaddr = get_node_multiaddr(sender_node) - store_node.add_peers([relay_multiaddr, sender_multiaddr]) - delay(3) + relay_a_multiaddr = get_node_multiaddr(relay_peer_a) + relay_b_multiaddr = get_node_multiaddr(relay_peer_b) + relay_c_multiaddr = get_node_multiaddr(relay_peer_c) + store_node.add_peers([relay_a_multiaddr, relay_b_multiaddr, relay_c_multiaddr]) + delay(STORE_JOIN_STABILIZATION_S) + store_node.add_peers([sender_multiaddr]) + delay(STORE_JOIN_STABILIZATION_S) - # Retry round: republish reaches the store peer, validation passes. + # Retry round: republish reaches the store. sent_event = wait_for_sent( collector=sender_collector, request_id=request_id, @@ -602,7 +640,7 @@ class TestSendBeforeRelay(StepsStore): **peer1_config, "staticnodes": [ get_node_multiaddr(peer1), - # get_node_multiaddr(relay_peer), + get_node_multiaddr(relay_peer), ], "portsshift": 2, } @@ -761,7 +799,7 @@ class TestSendBeforeRelay(StepsStore): f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}" ) - @pytest.mark.note("S31 exposes nwaku crash in json_serialization writer") + @pytest.mark.xfail("S31 exposes nwaku crash in json_serialization writer") def test_s31_concurrent_sends_mixed_topics_during_churn(self, node_config): """ S31: concurrent sends across mixed content topics during peer churn.