From bf1731805b5d340114eb0cc875c801b9fc44987c Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Thu, 7 May 2026 12:13:23 +0200 Subject: [PATCH] address review comments about xfail --- tests/wrappers_tests/test_send_e2e_part1.py | 105 ++++++++------------ 1 file changed, 40 insertions(+), 65 deletions(-) diff --git a/tests/wrappers_tests/test_send_e2e_part1.py b/tests/wrappers_tests/test_send_e2e_part1.py index 851359e43..7f6a1d8f2 100644 --- a/tests/wrappers_tests/test_send_e2e_part1.py +++ b/tests/wrappers_tests/test_send_e2e_part1.py @@ -222,17 +222,28 @@ class TestSendBeforeRelay(StepsStore): ascending="true", ) - # @pytest.mark.xfail(reason="error:NoPeersToPublish") + @pytest.mark.skip(reason="Forcing the miss store round not possible") def test_s20_store_misses_initially_then_retry_succeeds(self, node_config): """ - S20: relay propagation succeeds, initial store query misses, - a retry republishes, and a store peer eventually archives the message. + S20: relay propagation succeeds, the first store query misses + (the store peer is reachable but does not yet have the message), + a later retry republishes through the relay mesh, and the store + peer then archives it. Covers state flow: SuccessfullyPropagated -> NextRoundRetry -> SuccessfullyPropagated -> SuccessfullyValidated + """ - sender_collector = EventCollector() + store_node = WakuNode(NODE_2, f"s20_store_node_{self.test_id}") + store_node.start( + relay="true", + store="true", + discv5_discovery="false", + cluster_id=node_config["clusterId"], + shard=0, + ) + store_multiaddr = store_node.get_multiaddr_with_id() node_config.update( { @@ -241,6 +252,7 @@ class TestSendBeforeRelay(StepsStore): "discv5Discovery": False, "numShardsInNetwork": 1, "reliabilityEnabled": True, + "storenode": store_multiaddr, } ) @@ -251,43 +263,22 @@ class TestSendBeforeRelay(StepsStore): assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" with sender_result.ok_value as sender_node: - # Three relay peers so the sender keeps mesh peers during store-join churn. - relay_a_config = { + relay_config = { **node_config, "staticnodes": [get_node_multiaddr(sender_node)], "portsShift": 1, "store": False, - "reliabilityEnabled": True, - } - relay_b_config = { - **node_config, - "staticnodes": [get_node_multiaddr(sender_node)], - "portsShift": 2, - "store": False, - "reliabilityEnabled": True, - } - relay_c_config = { - **node_config, - "staticnodes": [get_node_multiaddr(sender_node)], - "portsShift": 3, - "store": False, } + relay_result = WrapperManager.create_and_start(config=relay_config) + assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}" - relay_a_result = WrapperManager.create_and_start(config=relay_a_config) - assert relay_a_result.is_ok(), f"Failed to start relay peer A: {relay_a_result.err()}" + with relay_result.ok_value as relay_peer: + # Wait for the sender to see the relay peer before publishing. + assert wait_for_connected(sender_collector) is not None, ( + f"Sender did not reach Connected/PartiallyConnected. " f"Collected events: {sender_collector.events}" + ) - relay_b_result = WrapperManager.create_and_start(config=relay_b_config) - assert relay_b_result.is_ok(), f"Failed to start relay peer B: {relay_b_result.err()}" - - relay_c_result = WrapperManager.create_and_start(config=relay_c_config) - assert relay_c_result.is_ok(), f"Failed to start relay peer C: {relay_c_result.err()}" - - with ( - relay_a_result.ok_value as relay_peer_a, - relay_b_result.ok_value as relay_peer_b, - relay_c_result.ok_value as relay_peer_c, - ): - # Let the gossipsub mesh form before publishing. + # Let the gossipsub mesh form between sender and relay peer. delay(MESH_STABILIZATION_S) message = create_message_bindings(ephemeral=False) @@ -297,62 +288,47 @@ class TestSendBeforeRelay(StepsStore): request_id = send_result.ok_value assert request_id, "send() returned an empty RequestId" - # First round: propagation succeeds. + # Round 1: propagation succeeds via the relay peer. propagated_event = wait_for_propagated( collector=sender_collector, request_id=request_id, timeout_s=PROPAGATED_TIMEOUT_S, ) assert propagated_event is not None, ( - f"No MessagePropagatedEvent received within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + f"No MessagePropagatedEvent within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" ) + # The store peer is reachable for queries but never received + # the message via gossipsub, so the first store query must + # miss and Sent must NOT arrive yet. early_sent_event = wait_for_sent( collector=sender_collector, request_id=request_id, timeout_s=NO_SENT_OBSERVATION_S, ) assert early_sent_event is None, ( - f"MessageSentEvent arrived before any store peer was reachable. " - f"Initial store validation should have missed and triggered a retry. " - f"Event: {early_sent_event}" + f"MessageSentEvent arrived before the store could have the message. " + f"Initial store query should have missed. Event: {early_sent_event}" ) - store_node = WakuNode(NODE_2, f"s20_store_node_{self.test_id}") - store_node.start( - relay="true", - store="true", - discv5_discovery="false", - cluster_id=node_config["clusterId"], - shard=0, - ) + # Now subscribe the store to the test topic and wire it into + # the relay mesh so the next retry round's republish reaches + # the store via gossipsub. store_node.set_relay_subscriptions([self.test_pubsub_topic]) - - # Connect the relay path first and let the mesh settle, then - # connect the sender so its mesh churn doesn't drop the store - # from the topic peer set. - sender_multiaddr = get_node_multiaddr(sender_node) - relay_a_multiaddr = get_node_multiaddr(relay_peer_a) - relay_b_multiaddr = get_node_multiaddr(relay_peer_b) - relay_c_multiaddr = get_node_multiaddr(relay_peer_c) - - store_node.add_peers([relay_a_multiaddr, relay_b_multiaddr, relay_c_multiaddr]) + store_node.add_peers([get_node_multiaddr(sender_node), get_node_multiaddr(relay_peer)]) self.wait_for_autoconnection([store_node], hard_wait=10) delay(STORE_JOIN_STABILIZATION_S) - store_node.add_peers([sender_multiaddr]) - delay(STORE_JOIN_STABILIZATION_S) - - # Retry round: republish reaches the store. + # Round 2: retry republishes, store archives, next query hits. sent_event = wait_for_sent( collector=sender_collector, request_id=request_id, timeout_s=SENT_AFTER_STORE_TIMEOUT_S, ) assert sent_event is not None, ( - f"No MessageSentEvent received within {SENT_AFTER_STORE_TIMEOUT_S}s " - f"after the store peer joined. The retry round should have " - f"republished the message and the store peer should have archived it. " + f"No MessageSentEvent within {SENT_AFTER_STORE_TIMEOUT_S}s " + f"after the store joined the relay mesh. The retry round " + f"should have republished and the store should have archived. " f"Collected events: {sender_collector.events}" ) @@ -838,7 +814,6 @@ class TestSendBeforeRelay(StepsStore): f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}" ) - # @pytest.mark.xfail(reason="S31 exposes nwaku crash in json_serialization writer") def test_s31_concurrent_sends_mixed_topics_during_churn(self, node_config): """ S31: concurrent sends across mixed content topics during peer churn.