From 959bda526ef36d85af5d7f3cc46ffccefcec3834 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Mon, 11 May 2026 16:44:23 +0200 Subject: [PATCH] modify s11 scenario to pass --- tests/wrappers_tests/test_send_e2e_part2.py | 146 ++++++++++---------- 1 file changed, 74 insertions(+), 72 deletions(-) diff --git a/tests/wrappers_tests/test_send_e2e_part2.py b/tests/wrappers_tests/test_send_e2e_part2.py index 76ab76a53..2a096fbaa 100644 --- a/tests/wrappers_tests/test_send_e2e_part2.py +++ b/tests/wrappers_tests/test_send_e2e_part2.py @@ -8,6 +8,8 @@ import pytest from src.steps.common import StepsCommon from src.libs.common import delay, to_base64 from src.libs.custom_logger import get_custom_logger +from src.env_vars import NODE_1 +from src.node.waku_node import WakuNode from src.node.wrappers_manager import WrapperManager from src.node.wrapper_helpers import ( EventCollector, @@ -19,6 +21,7 @@ from src.node.wrapper_helpers import ( wait_for_sent, wait_for_error, ) +from src.test_data import DEFAULT_CLUSTER_ID from tests.wrappers_tests.conftest import build_node_config logger = get_custom_logger(__name__) @@ -38,6 +41,11 @@ CACHE_EXPIRY_SLACK_S = 10.0 ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window" +# Default-cluster shard-0 pubsub topic; used to subscribe the S11 docker store +# peer so it joins the same relay mesh as the wrapper nodes (wrapper config +# uses numShardsInNetwork=1 => shard 0). +STORE_PEER_PUBSUB_TOPIC = f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0" + S01_EXPECTED_ERROR_FRAGMENT = "not initialized" S01_SUBPROCESS_TIMEOUT_S = 30 S01_RESULT_MARKER = "__S01_RESULT__" @@ -834,21 +842,22 @@ class TestS10EdgeSenderLightpushOnly(StepsCommon): class TestS11EdgeSenderLightpushAndStore(StepsCommon): """ S11 — Edge sender with lightpush path and store validation. - Edge sender has no local relay; it publishes via a dedicated lightpush - peer and validates delivery via a dedicated store peer. Reliability - enabled. + Edge sender has no local relay; it publishes via a wrapper lightpush peer + and validates delivery via a docker store peer. Reliability enabled. Topology: - [LightpushPeer] relay=True, lightpush=True, store=False - [StorePeer] relay=True, store=True, lightpush=False, - staticnodes=[lightpush_peer] (joins the relay mesh - so it archives propagated messages) - [Edge] mode="Edge", + [LightpushPeer] wrapper, relay=True, lightpush=True, store=False + [StorePeer] docker WakuNode, relay=true, store=true, + dials the lightpush peer via add_peers and subscribes + to the same shard-0 pubsub topic so it joins the + relay mesh and archives propagated messages. + [Edge] wrapper, mode="Edge", staticnodes=[lightpush_peer], storenode=store_peer, reliabilityEnabled=True Expected: send() returns Ok(RequestId), Propagated arrives, then Sent (store validation succeeds), no Error. - Purpose: edge-mode fully validated success path. + Purpose: edge-mode fully validated success path against a real docker + store node (cross-implementation check). """ def test_s11_edge_lightpush_with_store_validation(self, node_config): @@ -873,79 +882,72 @@ class TestS11EdgeSenderLightpushAndStore(StepsCommon): with lightpush_result.ok_value as lightpush_peer: lightpush_multiaddr = get_node_multiaddr(lightpush_peer) - # Store peer joins the lightpush peer's relay mesh so propagated - # messages are archived and become visible to store queries. - store_config = build_node_config( - relay=True, - lightpush=False, - store=True, - staticnodes=[lightpush_multiaddr], + # Docker store peer — real nwaku node running as the store backend. + # Dial the lightpush peer via add_peers and subscribe to the same + # shard-0 pubsub topic so it joins the relay mesh and archives + # messages propagated by the lightpush peer. + store_peer = WakuNode(NODE_1, f"s11_store_{self.test_id}") + store_peer.start(relay="true", store="true") + self.add_node_peer(store_peer, [lightpush_multiaddr]) + store_peer.set_relay_subscriptions([STORE_PEER_PUBSUB_TOPIC]) + store_multiaddr = store_peer.get_multiaddr_with_id() + + # Edge sender must dial BOTH peers: the lightpush peer for the + # publish path, and the store peer so the store-query channel + # is actually connected when reliability validation runs. + # storenode= alone registers it in service slots but does not + # dial it. + edge_config = build_node_config( + mode="Edge", + # assuming disc5v already happened + staticnodes=[lightpush_multiaddr, store_multiaddr], + storenode=store_multiaddr, + reliabilityEnabled=True, **common, ) - store_result = WrapperManager.create_and_start(config=store_config) - assert store_result.is_ok(), f"Failed to start store peer: {store_result.err()}" + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" - with store_result.ok_value as store_peer: - store_multiaddr = get_node_multiaddr(store_peer) - - # Edge sender must dial BOTH peers: the lightpush peer for the - # publish path, and the store peer so the store-query channel - # is actually connected when reliability validation runs. - # storenode= alone registers it in service slots but does not - # dial it. - edge_config = build_node_config( - mode="Edge", - staticnodes=[lightpush_multiaddr], - lightpushnode=lightpush_multiaddr, - storenode=store_multiaddr, - reliabilityEnabled=True, - **common, + with edge_result.ok_value as edge_sender: + message = create_message_bindings( + payload=to_base64("S11 edge lightpush + store test payload"), + contentTopic="/test/1/s11-edge-lightpush-store/proto", ) - edge_result = WrapperManager.create_and_start( - config=edge_config, - event_cb=sender_collector.event_callback, + send_result = edge_sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, ) - assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id - with edge_result.ok_value as edge_sender: - message = create_message_bindings( - payload=to_base64("S11 edge lightpush + store test payload"), - contentTopic="/test/1/s11-edge-lightpush-store/proto", - ) + sent = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_AFTER_STORE_TIMEOUT_S, + ) + assert sent is not None, ( + f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " f"after propagation. Collected events: {sender_collector.events}" + ) + assert sent["requestId"] == request_id - send_result = edge_sender.send_message(message=message) - assert send_result.is_ok(), f"send() failed: {send_result.err()}" + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" - request_id = send_result.ok_value - assert request_id, "send() returned an empty RequestId" - - propagated = wait_for_propagated( - collector=sender_collector, - request_id=request_id, - timeout_s=PROPAGATED_TIMEOUT_S, - ) - assert propagated is not None, ( - f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" - ) - assert propagated["requestId"] == request_id - - sent = wait_for_sent( - collector=sender_collector, - request_id=request_id, - timeout_s=SENT_AFTER_STORE_TIMEOUT_S, - ) - assert sent is not None, ( - f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " - f"after propagation. Collected events: {sender_collector.events}" - ) - assert sent["requestId"] == request_id - - error = wait_for_error(sender_collector, request_id, timeout_s=0) - assert error is None, f"Unexpected message_error event: {error}" - - assert_event_invariants(sender_collector, request_id) + assert_event_invariants(sender_collector, request_id) class TestS12IsolatedSenderNoPeers(StepsCommon):