From 8c7979ce678d9a1ee48bc8b7fe223d1bac4923b9 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Mon, 27 Apr 2026 10:46:47 +0200 Subject: [PATCH] Add S31 --- tests/wrappers_tests/test_send_e2e.py | 133 +++++++++++++++++++++++++- 1 file changed, 132 insertions(+), 1 deletion(-) diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py index df4fb0fe1..7cd505075 100644 --- a/tests/wrappers_tests/test_send_e2e.py +++ b/tests/wrappers_tests/test_send_e2e.py @@ -38,6 +38,16 @@ RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window" S30_CONCURRENT_SENDS = 5 S30_CONTENT_TOPIC = "/test/1/s30-concurrent/proto" +# S31: concurrent sends across mixed topics during peer churn. +S31_BURST_SIZE = 4 +S31_CONTENT_TOPICS = [ + "/test/1/s31-topic-a/proto", + "/test/1/s31-topic-b/proto", + "/test/1/s31-topic-c/proto", + "/test/1/s31-topic-d/proto", +] +S31_PROPAGATED_TIMEOUT_S = 30.0 + class TestSendBeforeRelay(StepsStore): def test_s17_send_before_relay_peers_joins(self, node_config): @@ -546,8 +556,11 @@ class TestSendBeforeRelay(StepsStore): def test_s30_concurrent_sends_during_auto_subscribe(self, node_config): """ S30: concurrent sends on the same content topic during initial auto-subscribe. + - Sender starts unsubscribed to the target topic. + - Several send() calls are issued at nearly the same time. - Each call must return Ok(RequestId) with a unique id. - + - Each request id must get its own propagated event, + with no dropped or cross-associated events. """ sender_collector = EventCollector() @@ -636,6 +649,124 @@ class TestSendBeforeRelay(StepsStore): f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}" ) + def test_s31_concurrent_sends_mixed_topics_during_churn(self, node_config): + """ + S31: concurrent sends across mixed content topics during peer churn. + """ + sender_collector = EventCollector() + + # Three docker peers, started first so the sender can discover them. + relay_peer = WakuNode(NODE_2, f"s31_relay_peer_{self.test_id}") + relay_peer.start(relay="true", discv5_discovery="false") + relay_peer.set_relay_subscriptions([self.test_pubsub_topic]) + + lightpush_peer = WakuNode(NODE_2, f"s31_lightpush_peer_{self.test_id}") + lightpush_peer.start(relay="true", lightpush="true", discv5_discovery="false") + lightpush_peer.set_relay_subscriptions([self.test_pubsub_topic]) + + store_peer = WakuNode(NODE_2, f"s31_store_peer_{self.test_id}") + store_peer.start(relay="true", store="true", discv5_discovery="false") + store_peer.set_relay_subscriptions([self.test_pubsub_topic]) + + churn_peers = [relay_peer, lightpush_peer, store_peer] + + # Sender: wrapper node with relay + lightpush enabled as a client. + node_config.update( + { + "relay": True, + "lightpush": True, + "store": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "lightpushnode": lightpush_peer.get_multiaddr_with_id(), + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender_node: + # Connect every docker peer to the sender. + sender_multiaddr = get_node_multiaddr(sender_node) + for peer in churn_peers: + peer.add_peers([sender_multiaddr]) + delay(3) # let docker peers connect to the sender + + all_request_ids: list[str] = [] + + # ---- Phase 1: burst BEFORE churn (full topology). ---- + phase1_ids = self._s31_fire_burst(sender_node, phase_label="phase1") + all_request_ids.extend(phase1_ids) + + # ---- Phase 2: restart all docker peers, burst DURING churn. ---- + for peer in churn_peers: + peer.restart() + phase2_ids = self._s31_fire_burst(sender_node, phase_label="phase2") + all_request_ids.extend(phase2_ids) + + # Wait for all peers to be ready again and re-attach the sender. + for peer in churn_peers: + peer.ensure_ready(timeout_duration=20) + peer.add_peers([sender_multiaddr]) + delay(3) + + # ---- Phase 3: burst AFTER churn (full topology restored). ---- + phase3_ids = self._s31_fire_burst(sender_node, phase_label="phase3") + all_request_ids.extend(phase3_ids) + + # All request ids across all phases must be globally unique. + assert len(set(all_request_ids)) == len(all_request_ids), f"Duplicate RequestIds across bursts: {all_request_ids}" + + # Stable-topology phases must each get a propagated event + # per request id. We do not assert this for phase 2. + for request_id in phase1_ids + phase3_ids: + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=S31_PROPAGATED_TIMEOUT_S, + ) + assert propagated_event is not None, ( + f"No MessagePropagatedEvent for stable-phase " + f"request_id={request_id} within {S31_PROPAGATED_TIMEOUT_S}s. " + f"Collected events: {sender_collector.events}" + ) + + issued = set(all_request_ids) + for event in sender_collector.events: + event_request_id = event.get("requestId") + if event_request_id is None: + continue + assert event_request_id in issued, ( + f"Event carries an unknown requestId={event_request_id!r}, " f"not in issued set {issued}. Event: {event}" + ) + + @staticmethod + def _s31_fire_burst(sender_node, *, phase_label: str) -> list[str]: + """Fire S31_BURST_SIZE concurrent sends, one per topic in S31_CONTENT_TOPICS. + Returns the list of request ids. Asserts every send returned Ok.""" + messages = [ + create_message_bindings( + contentTopic=S31_CONTENT_TOPICS[i], + payload=to_base64(f"s31-{phase_label}-{i}"), + ) + for i in range(S31_BURST_SIZE) + ] + + with ThreadPoolExecutor(max_workers=S31_BURST_SIZE) as pool: + send_results = list(pool.map(sender_node.send_message, messages)) + + request_ids = [] + for i, send_result in enumerate(send_results): + assert send_result.is_ok(), f"{phase_label}: concurrent send #{i} failed: {send_result.err()}" + request_id = send_result.ok_value + assert request_id, f"{phase_label}: concurrent send #{i} returned an empty RequestId" + request_ids.append(request_id) + + return request_ids + class TestS06CoreSenderRelayOnly(StepsCommon): """