From c89ebe653c2f5f35b7ebab45d71503dbb4e148f2 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Tue, 12 May 2026 17:27:06 +0200 Subject: [PATCH] Adding the new tests in part3 file --- tests/wrappers_tests/test_send_e2e_part2.py | 533 ------------------- tests/wrappers_tests/test_send_e2e_part3.py | 552 ++++++++++++++++++++ 2 files changed, 552 insertions(+), 533 deletions(-) create mode 100644 tests/wrappers_tests/test_send_e2e_part3.py diff --git a/tests/wrappers_tests/test_send_e2e_part2.py b/tests/wrappers_tests/test_send_e2e_part2.py index 6a2800433..60a1d8861 100644 --- a/tests/wrappers_tests/test_send_e2e_part2.py +++ b/tests/wrappers_tests/test_send_e2e_part2.py @@ -1,15 +1,9 @@ import base64 -import json -import subprocess -import sys -import textwrap import pytest from src.steps.common import StepsCommon from src.libs.common import delay, to_base64 from src.libs.custom_logger import get_custom_logger -from src.env_vars import NODE_1 -from src.node.waku_node import WakuNode from src.node.wrappers_manager import WrapperManager from src.node.wrapper_helpers import ( EventCollector, @@ -21,7 +15,6 @@ from src.node.wrapper_helpers import ( wait_for_sent, wait_for_error, ) -from src.test_data import DEFAULT_CLUSTER_ID from tests.wrappers_tests.conftest import build_node_config logger = get_custom_logger(__name__) @@ -41,204 +34,6 @@ CACHE_EXPIRY_SLACK_S = 10.0 ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window" -# Default-cluster shard-0 pubsub topic; used to subscribe the S11 docker store -# peer so it joins the same relay mesh as the wrapper nodes (wrapper config -# uses numShardsInNetwork=1 => shard 0). -STORE_PEER_PUBSUB_TOPIC = f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0" - -S01_EXPECTED_ERROR_FRAGMENT = "not initialized" -S01_SUBPROCESS_TIMEOUT_S = 30 -S01_RESULT_MARKER = "__S01_RESULT__" -SEND_AFTER_DESTROY_RESULT_MARKER = "__SEND_AFTER_DESTROY_RESULT__" -SEND_AFTER_DESTROY_SUBPROCESS_TIMEOUT_S = 60 - -# S05: malformed content topics break shard resolution inside -# SubscriptionManager.subscribe(), forcing the auto-subscribe step to fail. -# Each case targets a distinct validator branch. -# Fragment match: the bindings return Err("Failed to auto-subscribe before -# sending: ..."). We match on the capitalized verb phrase "Failed to -# auto-subscribe" because the bare word "auto-subscribe" also appears in -# success-path log strings (e.g. "Auto-subscribing to topic on send") and -# would produce false positives if we matched on that. -S05_EXPECTED_ERROR_FRAGMENT = "Failed to auto-subscribe" -S05_MALFORMED_CONTENT_TOPICS = [ - # No leading slash — parser rejects with "must start with slash". - ("s05-invalid-no-leading-slash", "no-leading-slash"), - # Empty string — parser rejects empty content topic. - ("", "empty"), - # Only 3 segments — content topics need /app/version/name/encoding. - ("/app/1/name", "missing-encoding-segment"), - # Empty middle segment between slashes. - ("/app//name/proto", "empty-middle-segment"), -] - - -# Run send() in a subprocess so a missing C-ABI guard (which can SIGSEGV) -# fails the test cleanly instead of taking the runner down. -_S01_SUBPROCESS_SCRIPT = textwrap.dedent( - f""" - import json - import sys - from pathlib import Path - - _project_root = Path({repr(__file__)}).resolve().parents[2] - _bindings_path = _project_root / "vendor" / "logos-delivery-python-bindings" / "waku" - if str(_bindings_path) not in sys.path: - sys.path.insert(0, str(_bindings_path)) - if str(_project_root) not in sys.path: - sys.path.insert(0, str(_project_root)) - - from wrapper import NodeWrapper, ffi - from src.node.wrappers_manager import WrapperManager - from src.node.wrapper_helpers import create_message_bindings - - sender = WrapperManager(NodeWrapper(ctx=ffi.NULL, config_buffer=None, event_cb_handler=None)) - send_result = sender.send_message(message=create_message_bindings()) - - print({repr(S01_RESULT_MARKER)} + json.dumps({{ - "is_ok": send_result.is_ok(), - "ok": send_result.ok_value if send_result.is_ok() else None, - "err": send_result.err() if send_result.is_err() else None, - }})) - sys.exit(0) - """ -).strip() - - -# Uses destroy_keep_ctx() so self.ctx stays non-nil after destroy — forces -# the send call to reach the C side with the original (now-stale) pointer. -_SEND_AFTER_DESTROY_SUBPROCESS_SCRIPT = textwrap.dedent( - f""" - import json - import sys - from pathlib import Path - - _project_root = Path({repr(__file__)}).resolve().parents[2] - _bindings_path = _project_root / "vendor" / "logos-delivery-python-bindings" / "waku" - if str(_bindings_path) not in sys.path: - sys.path.insert(0, str(_bindings_path)) - if str(_project_root) not in sys.path: - sys.path.insert(0, str(_project_root)) - - from src.node.wrappers_manager import WrapperManager - from src.node.wrapper_helpers import EventCollector, create_message_bindings - from tests.wrappers_tests.conftest import build_node_config - - collector = EventCollector() - - create_result = WrapperManager.create_and_start( - config=build_node_config(), - event_cb=collector.event_callback, - ) - if create_result.is_err(): - print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps({{ - "stage": "create_and_start", - "is_ok": False, - "ok": None, - "err": create_result.err(), - "events_after_send": [], - }})) - sys.exit(0) - - sender = create_result.ok_value - - stop_result = sender.stop_node() - if stop_result.is_err(): - print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps({{ - "stage": "stop_node", - "is_ok": False, - "ok": None, - "err": stop_result.err(), - "events_after_send": [], - }})) - sys.exit(0) - - destroy_result = sender.destroy_keep_ctx() - if destroy_result.is_err(): - print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps({{ - "stage": "destroy_keep_ctx", - "is_ok": False, - "ok": None, - "err": destroy_result.err(), - "events_after_send": [], - }})) - sys.exit(0) - - events_before_send = len(collector.events) - - envelope = create_message_bindings() - send_result = sender.send_message(message=envelope) - - new_events = collector.events[events_before_send:] - - payload = {{ - "stage": "send_message", - "is_ok": send_result.is_ok(), - "ok": send_result.ok_value if send_result.is_ok() else None, - "err": send_result.err() if send_result.is_err() else None, - "events_after_send": [str(e) for e in new_events], - }} - print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps(payload)) - sys.exit(0) - """ -).strip() - - -class TestS01NilOrUninitializedHandle(StepsCommon): - """S01 — send() on a nil/destroyed handle must Err, no events, no crash.""" - - def test_s01_send_on_uninitialized_handle(self): - completed = subprocess.run( - [sys.executable, "-c", _S01_SUBPROCESS_SCRIPT], - capture_output=True, - text=True, - timeout=S01_SUBPROCESS_TIMEOUT_S, - ) - - assert completed.returncode == 0, ( - f"send() crashed on a nil handle (returncode={completed.returncode}). " f"stdout={completed.stdout!r} stderr={completed.stderr!r}" - ) - - result_line = next( - (l for l in completed.stdout.splitlines() if l.startswith(S01_RESULT_MARKER)), - None, - ) - assert result_line, f"missing result marker. stdout={completed.stdout!r} stderr={completed.stderr!r}" - - result = json.loads(result_line[len(S01_RESULT_MARKER) :]) - - assert result["is_ok"] is False, f"expected Err, got Ok({result['ok']!r})" - assert S01_EXPECTED_ERROR_FRAGMENT in ( - result["err"] or "" - ), f"expected error to mention {S01_EXPECTED_ERROR_FRAGMENT!r}, got: {result['err']!r}" - - def test_s01_send_on_destroyed_handle(self): - completed = subprocess.run( - [sys.executable, "-c", _SEND_AFTER_DESTROY_SUBPROCESS_SCRIPT], - capture_output=True, - text=True, - timeout=SEND_AFTER_DESTROY_SUBPROCESS_TIMEOUT_S, - ) - - assert completed.returncode == 0, ( - f"send() crashed on a destroyed handle (returncode={completed.returncode}). " f"stdout={completed.stdout!r} stderr={completed.stderr!r}" - ) - - result_line = next( - (l for l in completed.stdout.splitlines() if l.startswith(SEND_AFTER_DESTROY_RESULT_MARKER)), - None, - ) - assert result_line, f"missing result marker. stdout={completed.stdout!r} stderr={completed.stderr!r}" - - result = json.loads(result_line[len(SEND_AFTER_DESTROY_RESULT_MARKER) :]) - - assert result["stage"] == "send_message", f"setup failed at stage {result['stage']!r}: {result['err']!r}" - assert result["is_ok"] is False, f"expected Err, got Ok({result['ok']!r})" - assert S01_EXPECTED_ERROR_FRAGMENT in ( - result["err"] or "" - ), f"expected error to mention {S01_EXPECTED_ERROR_FRAGMENT!r}, got: {result['err']!r}" - assert result["events_after_send"] == [], f"expected no events after send(), got: {result['events_after_send']}" - class TestS02AutoSubscribeOnFirstSend(StepsCommon): """ @@ -306,229 +101,6 @@ class TestS02AutoSubscribeOnFirstSend(StepsCommon): assert error is None, f"Unexpected message_error event: {error}" -class TestS03SendOnAlreadySubscribedTopic(StepsCommon): - """ - S03 — Send on already-subscribed content topic. - Sender explicitly calls subscribe_content_topic() before send(). - The send path must behave identically to the auto-subscribe case: - Propagated arrives, no Sent (store disabled), no Error. - Topology mirrors S06 (relay-only sender + relay peer, no store). - Purpose: proves the send path is identical when auto-subscription is skipped. - """ - - def test_s03_send_on_already_subscribed_content_topic(self, node_config): - sender_collector = EventCollector() - content_topic = "/test/1/s03-already-subscribed/proto" - - node_config.update( - { - "relay": True, - "store": False, - "lightpush": False, - "filter": False, - "discv5Discovery": False, - "numShardsInNetwork": 1, - "reliabilityEnabled": True, - } - ) - - sender_result = WrapperManager.create_and_start( - config=node_config, - event_cb=sender_collector.event_callback, - ) - assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" - - with sender_result.ok_value as sender: - peer_config = { - **node_config, - "staticnodes": [get_node_multiaddr(sender)], - "portsShift": 1, - } - - peer_result = WrapperManager.create_and_start(config=peer_config) - assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" - - with peer_result.ok_value: - assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" - - # Explicit subscribe before send — this is what S03 is about. - # The send path must still return Ok(RequestId) and emit the - # same events as the auto-subscribe topology in S06. - subscribe_result = sender.subscribe_content_topic(content_topic) - assert subscribe_result.is_ok(), f"subscribe_content_topic failed: {subscribe_result.err()}" - - message = create_message_bindings( - payload=to_base64("S03 already-subscribed test payload"), - contentTopic=content_topic, - ) - - send_result = sender.send_message(message=message) - assert send_result.is_ok(), f"send() failed: {send_result.err()}" - - request_id = send_result.ok_value - assert request_id, "send() returned an empty RequestId" - - propagated = wait_for_propagated( - collector=sender_collector, - request_id=request_id, - timeout_s=PROPAGATED_TIMEOUT_S, - ) - assert propagated is not None, ( - f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" - ) - assert propagated["requestId"] == request_id - - error = wait_for_error(sender_collector, request_id, timeout_s=0) - assert error is None, f"Unexpected message_error event: {error}" - - sent = wait_for_sent(sender_collector, request_id, timeout_s=0) - assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}" - - assert_event_invariants(sender_collector, request_id) - - -class TestS04UnsubscribeThenSendSameTopic(StepsCommon): - """ - S04 — Unsubscribe, then send the same content topic again. - Sender subscribes to topic A, unsubscribes from A, then sends on A. - The send path must re-establish topic interest and deliver normally. - Topology mirrors S06 (relay-only sender + relay peer, no store). - Expected: send() returns Ok(RequestId), Propagated arrives, - no Sent (store disabled), no Error. - Purpose: verifies send() re-establishes topic interest after local unsubscribe. - """ - - def test_s04_unsubscribe_then_send_same_content_topic(self, node_config): - sender_collector = EventCollector() - content_topic = "/test/1/s04-unsubscribe-resend/proto" - - node_config.update( - { - "relay": True, - "store": False, - "lightpush": False, - "filter": False, - "discv5Discovery": False, - "numShardsInNetwork": 1, - "reliabilityEnabled": True, - } - ) - - sender_result = WrapperManager.create_and_start( - config=node_config, - event_cb=sender_collector.event_callback, - ) - assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" - - with sender_result.ok_value as sender: - peer_config = { - **node_config, - "staticnodes": [get_node_multiaddr(sender)], - "portsShift": 1, - } - - peer_result = WrapperManager.create_and_start(config=peer_config) - assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" - - with peer_result.ok_value: - assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" - - # subscribe -> unsubscribe -> send: send() must re-establish - # topic interest internally. - subscribe_result = sender.subscribe_content_topic(content_topic) - assert subscribe_result.is_ok(), f"subscribe_content_topic failed: {subscribe_result.err()}" - - unsubscribe_result = sender.unsubscribe_content_topic(content_topic) - assert unsubscribe_result.is_ok(), f"unsubscribe_content_topic failed: {unsubscribe_result.err()}" - - message = create_message_bindings( - payload=to_base64("S04 unsubscribe-then-send test payload"), - contentTopic=content_topic, - ) - - send_result = sender.send_message(message=message) - assert send_result.is_ok(), f"send() failed after unsubscribe: {send_result.err()}" - - request_id = send_result.ok_value - assert request_id, "send() returned an empty RequestId" - - propagated = wait_for_propagated( - collector=sender_collector, - request_id=request_id, - timeout_s=PROPAGATED_TIMEOUT_S, - ) - assert propagated is not None, ( - f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s " - f"after unsubscribe + send. Collected events: {sender_collector.events}" - ) - assert propagated["requestId"] == request_id - - error = wait_for_error(sender_collector, request_id, timeout_s=0) - assert error is None, f"Unexpected message_error event: {error}" - - sent = wait_for_sent(sender_collector, request_id, timeout_s=0) - assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}" - - assert_event_invariants(sender_collector, request_id) - - -class TestS05AutoSubscribeFailureBeforeTaskCreation(StepsCommon): - """ - S05 — Auto-subscribe failure before task creation. - Sender is initialized but auto-subscription is forced to fail by using a - malformed content topic that breaks shard resolution inside - SubscriptionManager.subscribe(). - Expected: send() returns Err with an "auto-subscribe" message, no events. - Purpose: covers the last synchronous error path before request ID creation, - across several distinct validator branches. - """ - - @pytest.mark.parametrize( - "content_topic", - [topic for topic, _ in S05_MALFORMED_CONTENT_TOPICS], - ids=[case_id for _, case_id in S05_MALFORMED_CONTENT_TOPICS], - ) - def test_s05_send_fails_when_auto_subscribe_fails(self, node_config, content_topic): - sender_collector = EventCollector() - - node_config.update( - { - "relay": True, - "numShardsInNetwork": 1, - } - ) - - sender_result = WrapperManager.create_and_start( - config=node_config, - event_cb=sender_collector.event_callback, - ) - assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" - - with sender_result.ok_value as sender: - # Malformed content topic — SubscriptionManager.subscribe() cannot - # resolve it to a shard, so auto-subscribe inside send() fails. - message = create_message_bindings( - payload=to_base64("S05 auto-subscribe failure payload"), - contentTopic=content_topic, - ) - - send_result = sender.send_message(message=message) - - assert send_result.is_err(), ( - f"send() must return Err when auto-subscribe fails for " f"content_topic={content_topic!r}, got Ok({send_result.ok_value!r})" - ) - - error_message = send_result.err() or "" - assert S05_EXPECTED_ERROR_FRAGMENT in error_message, ( - f"expected error to mention {S05_EXPECTED_ERROR_FRAGMENT!r} " f"for content_topic={content_topic!r}, got: {error_message!r}" - ) - - # No request id was created, so no events should be emitted. - assert sender_collector.events == [] or all( - event.get("eventType") == "connection_status_change" for event in sender_collector.events - ), f"Unexpected events after a pre-task-creation failure: {sender_collector.events}" - - class TestS06CoreSenderRelayOnly(StepsCommon): """ S06 — Core sender with relay peers only, no store. @@ -916,111 +488,6 @@ class TestS10EdgeSenderLightpushOnly(StepsCommon): assert_event_invariants(sender_collector, request_id) -class TestS11EdgeSenderLightpushAndStore(StepsCommon): - """ - S11 — Edge sender with lightpush path and store validation. - Edge sender has no local relay; it publishes via a wrapper lightpush peer - and validates delivery via a docker store peer. Reliability enabled. - Topology: - [LightpushPeer] wrapper, relay=True, lightpush=True, store=False - [StorePeer] docker WakuNode, relay=true, store=true, - dials the lightpush peer via add_peers and subscribes - to the same shard-0 pubsub topic so it joins the - relay mesh and archives propagated messages. - [Edge] wrapper, mode="Edge", - staticnodes=[lightpush_peer], - storenode=store_peer, - reliabilityEnabled=True - Expected: send() returns Ok(RequestId), Propagated arrives, then Sent - (store validation succeeds), no Error. - Purpose: edge-mode fully validated success path against a real docker - store node (cross-implementation check). - """ - - def test_s11_edge_lightpush_with_store_validation(self, node_config): - sender_collector = EventCollector() - - common = { - "filter": False, - "discv5Discovery": True, - "numShardsInNetwork": 1, - } - - lightpush_config = build_node_config( - relay=True, - lightpush=True, - store=False, - **common, - ) - - lightpush_result = WrapperManager.create_and_start(config=lightpush_config) - assert lightpush_result.is_ok(), f"Failed to start lightpush peer: {lightpush_result.err()}" - - with lightpush_result.ok_value as lightpush_peer: - lightpush_multiaddr = get_node_multiaddr(lightpush_peer) - - # Docker store peer — real nwaku node running as the store backend. - # Dial the lightpush peer via add_peers and subscribe to the same - # shard-0 pubsub topic so it joins the relay mesh and archives - # messages propagated by the lightpush peer. - store_peer = WakuNode(NODE_1, f"s11_store_{self.test_id}") - store_peer.start(relay="true", store="true") - self.add_node_peer(store_peer, [lightpush_multiaddr]) - store_peer.set_relay_subscriptions([STORE_PEER_PUBSUB_TOPIC]) - store_multiaddr = store_peer.get_multiaddr_with_id() - - edge_config = build_node_config( - mode="Edge", - # assuming disc5v already happened - staticnodes=[lightpush_multiaddr, store_multiaddr], - reliabilityEnabled=True, - **common, - ) - - edge_result = WrapperManager.create_and_start( - config=edge_config, - event_cb=sender_collector.event_callback, - ) - assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" - - with edge_result.ok_value as edge_sender: - message = create_message_bindings( - payload=to_base64("S11 edge lightpush + store test payload"), - contentTopic="/test/1/s11-edge-lightpush-store/proto", - ) - - send_result = edge_sender.send_message(message=message) - assert send_result.is_ok(), f"send() failed: {send_result.err()}" - - request_id = send_result.ok_value - assert request_id, "send() returned an empty RequestId" - - propagated = wait_for_propagated( - collector=sender_collector, - request_id=request_id, - timeout_s=PROPAGATED_TIMEOUT_S, - ) - assert propagated is not None, ( - f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" - ) - assert propagated["requestId"] == request_id - - sent = wait_for_sent( - collector=sender_collector, - request_id=request_id, - timeout_s=SENT_AFTER_STORE_TIMEOUT_S, - ) - assert sent is not None, ( - f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " f"after propagation. Collected events: {sender_collector.events}" - ) - assert sent["requestId"] == request_id - - error = wait_for_error(sender_collector, request_id, timeout_s=0) - assert error is None, f"Unexpected message_error event: {error}" - - assert_event_invariants(sender_collector, request_id) - - class TestS12IsolatedSenderNoPeers(StepsCommon): """ S12 — Isolated sender, no peers. diff --git a/tests/wrappers_tests/test_send_e2e_part3.py b/tests/wrappers_tests/test_send_e2e_part3.py new file mode 100644 index 000000000..9c31aaa7f --- /dev/null +++ b/tests/wrappers_tests/test_send_e2e_part3.py @@ -0,0 +1,552 @@ +import json +import subprocess +import sys +import textwrap + +import pytest +from src.steps.common import StepsCommon +from src.libs.common import to_base64 +from src.env_vars import NODE_1 +from src.node.waku_node import WakuNode +from src.node.wrappers_manager import WrapperManager +from src.node.wrapper_helpers import ( + EventCollector, + assert_event_invariants, + create_message_bindings, + get_node_multiaddr, + wait_for_connected, + wait_for_propagated, + wait_for_sent, + wait_for_error, +) +from src.test_data import DEFAULT_CLUSTER_ID +from tests.wrappers_tests.conftest import build_node_config + +PROPAGATED_TIMEOUT_S = 30.0 +SENT_AFTER_STORE_TIMEOUT_S = 60.0 + +# Default-cluster shard-0 pubsub topic; used to subscribe the S11 docker store +# peer so it joins the same relay mesh as the wrapper nodes (wrapper config +# uses numShardsInNetwork=1 => shard 0). +STORE_PEER_PUBSUB_TOPIC = f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0" + +S01_EXPECTED_ERROR_FRAGMENT = "not initialized" +S01_SUBPROCESS_TIMEOUT_S = 30 +S01_RESULT_MARKER = "__S01_RESULT__" +SEND_AFTER_DESTROY_RESULT_MARKER = "__SEND_AFTER_DESTROY_RESULT__" +SEND_AFTER_DESTROY_SUBPROCESS_TIMEOUT_S = 60 + +# S05: malformed content topics break shard resolution inside +# SubscriptionManager.subscribe(), forcing the auto-subscribe step to fail. +# Each case targets a distinct validator branch. +# Fragment match: the bindings return Err("Failed to auto-subscribe before +# sending: ..."). We match on the capitalized verb phrase "Failed to +# auto-subscribe" because the bare word "auto-subscribe" also appears in +# success-path log strings (e.g. "Auto-subscribing to topic on send") and +# would produce false positives if we matched on that. +S05_EXPECTED_ERROR_FRAGMENT = "Failed to auto-subscribe" +S05_MALFORMED_CONTENT_TOPICS = [ + # No leading slash — parser rejects with "must start with slash". + ("s05-invalid-no-leading-slash", "no-leading-slash"), + # Empty string — parser rejects empty content topic. + ("", "empty"), + # Only 3 segments — content topics need /app/version/name/encoding. + ("/app/1/name", "missing-encoding-segment"), + # Empty middle segment between slashes. + ("/app//name/proto", "empty-middle-segment"), +] + + +# Run send() in a subprocess so a missing C-ABI guard (which can SIGSEGV) +# fails the test cleanly instead of taking the runner down. +_S01_SUBPROCESS_SCRIPT = textwrap.dedent( + f""" + import json + import sys + from pathlib import Path + + _project_root = Path({repr(__file__)}).resolve().parents[2] + _bindings_path = _project_root / "vendor" / "logos-delivery-python-bindings" / "waku" + if str(_bindings_path) not in sys.path: + sys.path.insert(0, str(_bindings_path)) + if str(_project_root) not in sys.path: + sys.path.insert(0, str(_project_root)) + + from wrapper import NodeWrapper, ffi + from src.node.wrappers_manager import WrapperManager + from src.node.wrapper_helpers import create_message_bindings + + sender = WrapperManager(NodeWrapper(ctx=ffi.NULL, config_buffer=None, event_cb_handler=None)) + send_result = sender.send_message(message=create_message_bindings()) + + print({repr(S01_RESULT_MARKER)} + json.dumps({{ + "is_ok": send_result.is_ok(), + "ok": send_result.ok_value if send_result.is_ok() else None, + "err": send_result.err() if send_result.is_err() else None, + }})) + sys.exit(0) + """ +).strip() + + +# Uses destroy_keep_ctx() so self.ctx stays non-nil after destroy — forces +# the send call to reach the C side with the original (now-stale) pointer. +_SEND_AFTER_DESTROY_SUBPROCESS_SCRIPT = textwrap.dedent( + f""" + import json + import sys + from pathlib import Path + + _project_root = Path({repr(__file__)}).resolve().parents[2] + _bindings_path = _project_root / "vendor" / "logos-delivery-python-bindings" / "waku" + if str(_bindings_path) not in sys.path: + sys.path.insert(0, str(_bindings_path)) + if str(_project_root) not in sys.path: + sys.path.insert(0, str(_project_root)) + + from src.node.wrappers_manager import WrapperManager + from src.node.wrapper_helpers import EventCollector, create_message_bindings + from tests.wrappers_tests.conftest import build_node_config + + collector = EventCollector() + + create_result = WrapperManager.create_and_start( + config=build_node_config(), + event_cb=collector.event_callback, + ) + if create_result.is_err(): + print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps({{ + "stage": "create_and_start", + "is_ok": False, + "ok": None, + "err": create_result.err(), + "events_after_send": [], + }})) + sys.exit(0) + + sender = create_result.ok_value + + stop_result = sender.stop_node() + if stop_result.is_err(): + print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps({{ + "stage": "stop_node", + "is_ok": False, + "ok": None, + "err": stop_result.err(), + "events_after_send": [], + }})) + sys.exit(0) + + destroy_result = sender.destroy_keep_ctx() + if destroy_result.is_err(): + print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps({{ + "stage": "destroy_keep_ctx", + "is_ok": False, + "ok": None, + "err": destroy_result.err(), + "events_after_send": [], + }})) + sys.exit(0) + + events_before_send = len(collector.events) + + envelope = create_message_bindings() + send_result = sender.send_message(message=envelope) + + new_events = collector.events[events_before_send:] + + payload = {{ + "stage": "send_message", + "is_ok": send_result.is_ok(), + "ok": send_result.ok_value if send_result.is_ok() else None, + "err": send_result.err() if send_result.is_err() else None, + "events_after_send": [str(e) for e in new_events], + }} + print({repr(SEND_AFTER_DESTROY_RESULT_MARKER)} + json.dumps(payload)) + sys.exit(0) + """ +).strip() + + +class TestS01NilOrUninitializedHandle(StepsCommon): + """S01 — send() on a nil/destroyed handle must Err, no events, no crash.""" + + def test_s01_send_on_uninitialized_handle(self): + completed = subprocess.run( + [sys.executable, "-c", _S01_SUBPROCESS_SCRIPT], + capture_output=True, + text=True, + timeout=S01_SUBPROCESS_TIMEOUT_S, + ) + + assert completed.returncode == 0, ( + f"send() crashed on a nil handle (returncode={completed.returncode}). " f"stdout={completed.stdout!r} stderr={completed.stderr!r}" + ) + + result_line = next( + (l for l in completed.stdout.splitlines() if l.startswith(S01_RESULT_MARKER)), + None, + ) + assert result_line, f"missing result marker. stdout={completed.stdout!r} stderr={completed.stderr!r}" + + result = json.loads(result_line[len(S01_RESULT_MARKER) :]) + + assert result["is_ok"] is False, f"expected Err, got Ok({result['ok']!r})" + assert S01_EXPECTED_ERROR_FRAGMENT in ( + result["err"] or "" + ), f"expected error to mention {S01_EXPECTED_ERROR_FRAGMENT!r}, got: {result['err']!r}" + + def test_s01_send_on_destroyed_handle(self): + completed = subprocess.run( + [sys.executable, "-c", _SEND_AFTER_DESTROY_SUBPROCESS_SCRIPT], + capture_output=True, + text=True, + timeout=SEND_AFTER_DESTROY_SUBPROCESS_TIMEOUT_S, + ) + + assert completed.returncode == 0, ( + f"send() crashed on a destroyed handle (returncode={completed.returncode}). " f"stdout={completed.stdout!r} stderr={completed.stderr!r}" + ) + + result_line = next( + (l for l in completed.stdout.splitlines() if l.startswith(SEND_AFTER_DESTROY_RESULT_MARKER)), + None, + ) + assert result_line, f"missing result marker. stdout={completed.stdout!r} stderr={completed.stderr!r}" + + result = json.loads(result_line[len(SEND_AFTER_DESTROY_RESULT_MARKER) :]) + + assert result["stage"] == "send_message", f"setup failed at stage {result['stage']!r}: {result['err']!r}" + assert result["is_ok"] is False, f"expected Err, got Ok({result['ok']!r})" + assert S01_EXPECTED_ERROR_FRAGMENT in ( + result["err"] or "" + ), f"expected error to mention {S01_EXPECTED_ERROR_FRAGMENT!r}, got: {result['err']!r}" + assert result["events_after_send"] == [], f"expected no events after send(), got: {result['events_after_send']}" + + +class TestS03SendOnAlreadySubscribedTopic(StepsCommon): + """ + S03 — Send on already-subscribed content topic. + Sender explicitly calls subscribe_content_topic() before send(). + The send path must behave identically to the auto-subscribe case: + Propagated arrives, no Sent (store disabled), no Error. + Topology mirrors S06 (relay-only sender + relay peer, no store). + Purpose: proves the send path is identical when auto-subscription is skipped. + """ + + def test_s03_send_on_already_subscribed_content_topic(self, node_config): + sender_collector = EventCollector() + content_topic = "/test/1/s03-already-subscribed/proto" + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "reliabilityEnabled": True, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsShift": 1, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" + + with peer_result.ok_value: + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + + # Explicit subscribe before send — this is what S03 is about. + # The send path must still return Ok(RequestId) and emit the + # same events as the auto-subscribe topology in S06. + subscribe_result = sender.subscribe_content_topic(content_topic) + assert subscribe_result.is_ok(), f"subscribe_content_topic failed: {subscribe_result.err()}" + + message = create_message_bindings( + payload=to_base64("S03 already-subscribed test payload"), + contentTopic=content_topic, + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + sent = wait_for_sent(sender_collector, request_id, timeout_s=0) + assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS04UnsubscribeThenSendSameTopic(StepsCommon): + """ + S04 — Unsubscribe, then send the same content topic again. + Sender subscribes to topic A, unsubscribes from A, then sends on A. + The send path must re-establish topic interest and deliver normally. + Topology mirrors S06 (relay-only sender + relay peer, no store). + Expected: send() returns Ok(RequestId), Propagated arrives, + no Sent (store disabled), no Error. + Purpose: verifies send() re-establishes topic interest after local unsubscribe. + """ + + def test_s04_unsubscribe_then_send_same_content_topic(self, node_config): + sender_collector = EventCollector() + content_topic = "/test/1/s04-unsubscribe-resend/proto" + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + "reliabilityEnabled": True, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsShift": 1, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" + + with peer_result.ok_value: + assert wait_for_connected(sender_collector) is not None, "Sender did not reach Connected/PartiallyConnected state" + + # subscribe -> unsubscribe -> send: send() must re-establish + # topic interest internally. + subscribe_result = sender.subscribe_content_topic(content_topic) + assert subscribe_result.is_ok(), f"subscribe_content_topic failed: {subscribe_result.err()}" + + unsubscribe_result = sender.unsubscribe_content_topic(content_topic) + assert unsubscribe_result.is_ok(), f"unsubscribe_content_topic failed: {unsubscribe_result.err()}" + + message = create_message_bindings( + payload=to_base64("S04 unsubscribe-then-send test payload"), + contentTopic=content_topic, + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed after unsubscribe: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s " + f"after unsubscribe + send. Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + sent = wait_for_sent(sender_collector, request_id, timeout_s=0) + assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}" + + assert_event_invariants(sender_collector, request_id) + + +class TestS05AutoSubscribeFailureBeforeTaskCreation(StepsCommon): + """ + S05 — Auto-subscribe failure before task creation. + Sender is initialized but auto-subscription is forced to fail by using a + malformed content topic that breaks shard resolution inside + SubscriptionManager.subscribe(). + Expected: send() returns Err with an "auto-subscribe" message, no events. + Purpose: covers the last synchronous error path before request ID creation, + across several distinct validator branches. + """ + + @pytest.mark.parametrize( + "content_topic", + [topic for topic, _ in S05_MALFORMED_CONTENT_TOPICS], + ids=[case_id for _, case_id in S05_MALFORMED_CONTENT_TOPICS], + ) + def test_s05_send_fails_when_auto_subscribe_fails(self, node_config, content_topic): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + # Malformed content topic — SubscriptionManager.subscribe() cannot + # resolve it to a shard, so auto-subscribe inside send() fails. + message = create_message_bindings( + payload=to_base64("S05 auto-subscribe failure payload"), + contentTopic=content_topic, + ) + + send_result = sender.send_message(message=message) + + assert send_result.is_err(), ( + f"send() must return Err when auto-subscribe fails for " f"content_topic={content_topic!r}, got Ok({send_result.ok_value!r})" + ) + + error_message = send_result.err() or "" + assert S05_EXPECTED_ERROR_FRAGMENT in error_message, ( + f"expected error to mention {S05_EXPECTED_ERROR_FRAGMENT!r} " f"for content_topic={content_topic!r}, got: {error_message!r}" + ) + + # No request id was created, so no events should be emitted. + assert sender_collector.events == [] or all( + event.get("eventType") == "connection_status_change" for event in sender_collector.events + ), f"Unexpected events after a pre-task-creation failure: {sender_collector.events}" + + +class TestS11EdgeSenderLightpushAndStore(StepsCommon): + """ + S11 — Edge sender with lightpush path and store validation. + Edge sender has no local relay; it publishes via a wrapper lightpush peer + and validates delivery via a docker store peer. Reliability enabled. + Topology: + [LightpushPeer] wrapper, relay=True, lightpush=True, store=False + [StorePeer] docker WakuNode, relay=true, store=true, + dials the lightpush peer via add_peers and subscribes + to the same shard-0 pubsub topic so it joins the + relay mesh and archives propagated messages. + [Edge] wrapper, mode="Edge", + staticnodes=[lightpush_peer], + storenode=store_peer, + reliabilityEnabled=True + Expected: send() returns Ok(RequestId), Propagated arrives, then Sent + (store validation succeeds), no Error. + Purpose: edge-mode fully validated success path against a real docker + store node (cross-implementation check). + """ + + def test_s11_edge_lightpush_with_store_validation(self, node_config): + sender_collector = EventCollector() + + common = { + "filter": False, + "discv5Discovery": True, + "numShardsInNetwork": 1, + } + + lightpush_config = build_node_config( + relay=True, + lightpush=True, + store=False, + **common, + ) + + lightpush_result = WrapperManager.create_and_start(config=lightpush_config) + assert lightpush_result.is_ok(), f"Failed to start lightpush peer: {lightpush_result.err()}" + + with lightpush_result.ok_value as lightpush_peer: + lightpush_multiaddr = get_node_multiaddr(lightpush_peer) + + # Docker store peer — real nwaku node running as the store backend. + # Dial the lightpush peer via add_peers and subscribe to the same + # shard-0 pubsub topic so it joins the relay mesh and archives + # messages propagated by the lightpush peer. + store_peer = WakuNode(NODE_1, f"s11_store_{self.test_id}") + store_peer.start(relay="true", store="true") + self.add_node_peer(store_peer, [lightpush_multiaddr]) + store_peer.set_relay_subscriptions([STORE_PEER_PUBSUB_TOPIC]) + store_multiaddr = store_peer.get_multiaddr_with_id() + + edge_config = build_node_config( + mode="Edge", + # assuming disc5v already happened + staticnodes=[lightpush_multiaddr, store_multiaddr], + reliabilityEnabled=True, + **common, + ) + + edge_result = WrapperManager.create_and_start( + config=edge_config, + event_cb=sender_collector.event_callback, + ) + assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}" + + with edge_result.ok_value as edge_sender: + message = create_message_bindings( + payload=to_base64("S11 edge lightpush + store test payload"), + contentTopic="/test/1/s11-edge-lightpush-store/proto", + ) + + send_result = edge_sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + sent = wait_for_sent( + collector=sender_collector, + request_id=request_id, + timeout_s=SENT_AFTER_STORE_TIMEOUT_S, + ) + assert sent is not None, ( + f"No message_sent event within {SENT_AFTER_STORE_TIMEOUT_S}s " f"after propagation. Collected events: {sender_collector.events}" + ) + assert sent["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + assert_event_invariants(sender_collector, request_id)