From f3f4cf8a6470af3f6de0f26381d2090c34d7c0ab Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Tue, 14 Apr 2026 19:43:20 +0200 Subject: [PATCH] add test s17 --- src/node/wrappers_helpers.py | 102 +++++++++++++++++++++++ src/node/wrappers_manager.py | 76 +++++++++++++++++ tests/wrappers_tests/test_send_e2e.py | 113 ++++++++++++++++++++++++++ 3 files changed, 291 insertions(+) create mode 100644 src/node/wrappers_helpers.py create mode 100644 src/node/wrappers_manager.py create mode 100644 tests/wrappers_tests/test_send_e2e.py diff --git a/src/node/wrappers_helpers.py b/src/node/wrappers_helpers.py new file mode 100644 index 000000000..31aaaee67 --- /dev/null +++ b/src/node/wrappers_helpers.py @@ -0,0 +1,102 @@ +import json +import threading +import time + + +class EventCollector: + """Thread-safe collector for async node events. + + Pass `collector.event_callback` as the `event_cb` argument to + WrapperManager.create_and_start(). Every event fired by the library + is decoded from JSON and appended to `self.events`. + """ + + def __init__(self): + self._lock = threading.Lock() + self.events: list[dict] = [] + + def event_callback(self, ret: int, raw: bytes) -> None: + try: + payload = json.loads(raw.decode("utf-8")) + except Exception: + payload = {"_raw": raw.decode("utf-8", errors="replace"), "_ret": ret} + + with self._lock: + self.events.append(payload) + + def get_events_for_request(self, request_id: str) -> list[dict]: + with self._lock: + return [e for e in self.events if e.get("requestId") == request_id] + + +def is_propagated_event(event: dict) -> bool: + return "Propagated" in event.get("eventType", "") + + +def is_sent_event(event: dict) -> bool: + return "Sent" in event.get("eventType", "") + + +def is_error_event(event: dict) -> bool: + return "Error" in event.get("eventType", "") + + +def wait_for_event( + collector: EventCollector, + request_id: str, + predicate, + timeout_s: float, + poll_interval_s: float = 0.5, +) -> dict | None: + """Poll until an event matching `predicate` arrives for `request_id`, + or until `timeout_s` elapses. Returns the matching event or None. + """ + deadline = time.monotonic() + timeout_s + + while time.monotonic() < deadline: + for event in collector.get_events_for_request(request_id): + if predicate(event): + return event + time.sleep(poll_interval_s) + + return None + + +def wait_for_propagated(collector: EventCollector, request_id: str, timeout_s: float) -> dict | None: + return wait_for_event(collector, request_id, is_propagated_event, timeout_s) + + +def wait_for_sent(collector: EventCollector, request_id: str, timeout_s: float) -> dict | None: + return wait_for_event(collector, request_id, is_sent_event, timeout_s) + + +def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float) -> dict | None: + return wait_for_event(collector, request_id, is_error_event, timeout_s) + + +def get_node_multiaddr(node) -> str: + """Return the first TCP multiaddr (with peer-id) from a WrapperManager node. + + Uses the existing get_available_node_info_ids / get_node_info APIs on + WrapperManager. Raises RuntimeError if the address cannot be resolved. + """ + ids_result = node.get_available_node_info_ids() + if ids_result.is_err(): + raise RuntimeError(f"get_available_node_info_ids failed: {ids_result.err()}") + + ids = ids_result.ok_value + if not ids: + raise RuntimeError("No node-info IDs returned") + + info_result = node.get_node_info(ids[0]) + if info_result.is_err(): + raise RuntimeError(f"get_node_info failed: {info_result.err()}") + + info = info_result.ok_value + + for key in ("listenAddresses", "multiaddrs", "addresses"): + addresses = info.get(key) + if addresses: + return addresses[0] + + raise RuntimeError(f"Could not find a listen address in node info: {info}") diff --git a/src/node/wrappers_manager.py b/src/node/wrappers_manager.py new file mode 100644 index 000000000..680bff9b2 --- /dev/null +++ b/src/node/wrappers_manager.py @@ -0,0 +1,76 @@ +import sys +from pathlib import Path +from result import Result, Ok, Err + +_THIRD_PARTY = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku" +if str(_THIRD_PARTY) not in sys.path: + sys.path.insert(0, str(_THIRD_PARTY)) + +from wrapper import NodeWrapper as _NodeWrapper # type: ignore[import] + + +class WrapperManager: + def __init__(self, node: _NodeWrapper): + self._node = node + + @classmethod + def create( + cls, + config: dict, + event_cb=None, + *, + timeout_s: float = 20.0, + ) -> Result["WrapperManager", str]: + result = _NodeWrapper.create_node(config, event_cb, timeout_s=timeout_s) + if result.is_err(): + return Err(result.err()) + return Ok(cls(result.ok_value)) + + @classmethod + def create_and_start( + cls, + config: dict, + event_cb=None, + *, + timeout_s: float = 20.0, + ) -> Result["WrapperManager", str]: + result = _NodeWrapper.create_and_start(config, event_cb, timeout_s=timeout_s) + if result.is_err(): + return Err(result.err()) + return Ok(cls(result.ok_value)) + + def __enter__(self) -> "WrapperManager": + return self + + def __exit__(self, *_) -> None: + self.stop_and_destroy() + + def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.start_node(timeout_s=timeout_s) + + def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.stop_node(timeout_s=timeout_s) + + def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.destroy(timeout_s=timeout_s) + + def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.stop_and_destroy(timeout_s=timeout_s) + + def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.subscribe_content_topic(content_topic, timeout_s=timeout_s) + + def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s) + + def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]: + return self._node.send_message(message, timeout_s=timeout_s) + + def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[list[str], str]: + return self._node.get_available_node_info_ids(timeout_s=timeout_s) + + def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]: + return self._node.get_node_info(node_info_id, timeout_s=timeout_s) + + def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]: + return self._node.get_available_configs(timeout_s=timeout_s) diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py new file mode 100644 index 000000000..ede0e04a6 --- /dev/null +++ b/tests/wrappers_tests/test_send_e2e.py @@ -0,0 +1,113 @@ +""" +S17 – No delivery peers at T0, relay peers appear later. + +Setup: Sender starts isolated (no peers). +Action: send() is called immediately. +Then: A relay peer is started with the sender's multiaddr as a static peer, + causing both nodes to connect and the sender's retry loop to deliver. +Expected: + - send() returns Ok(RequestId) synchronously even with no peers. + - A MessagePropagatedEvent with the same RequestId arrives once the + relay peer joins. + - No MessageErrorEvent arrives before the Propagated event. + +Reference: issue #163, scenario S17. +""" + +import pytest + +from src.node.wrappers_manager import WrapperManager +from src.node.wrappers_helpers import ( + EventCollector, + get_node_multiaddr, + wait_for_propagated, + wait_for_error, +) + +CONTENT_TOPIC = "/test/1/s17-relay-late-join/proto" +PROPAGATED_TIMEOUT_S = 30.0 + + +class TestS17RelayPeersAppearLater: + @pytest.fixture + def sender_collector(self): + return EventCollector() + + @pytest.fixture + def sender_node(self, node_config, sender_collector): + # node_config is provided by the conftest.py fixture; + # override only what differs from the default for an isolated sender. + node_config.update({"relay": True, "store": False, "discv5Discovery": False}) + + result = WrapperManager.create_and_start(config=node_config, event_cb=sender_collector.event_callback) + assert result.is_ok(), f"Failed to start sender: {result.err()}" + + node = result.ok_value + yield node + node.stop_and_destroy() + + def test_send_before_relay_peers_exist_then_peer_joins(self, sender_node, sender_collector, node_config): + """ + S17: send() is called while the sender has no peers. + A relay peer is then brought online with the sender's address as a + static peer, causing the sender's retry loop to deliver the message. + """ + + # Step 1: send while isolated — must return Ok(RequestId) immediately + message = { + "contentTopic": CONTENT_TOPIC, + "payload": "UzE3IHJlbGF5IGxhdGUgam9pbg==", # base64("S17 relay late join") + } + + send_result = sender_node.send_message(message) + + assert send_result.is_ok(), f"send() must return Ok(RequestId) even with no peers, got: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + # Step 2: get sender's multiaddr so the relay peer can dial back to it + sender_multiaddr = get_node_multiaddr(sender_node) + + # Step 3: start the relay peer with the sender listed as a static peer. + # node_config produces a fresh config with its own free ports each call. + node_config.update({"relay": True, "store": False, "discv5Discovery": False}) + node_config["staticPeers"] = [sender_multiaddr] + + relay_peer_result = WrapperManager.create_and_start(config=node_config) + assert relay_peer_result.is_ok(), f"Failed to start relay peer: {relay_peer_result.err()}" + relay_peer = relay_peer_result.ok_value + + try: + # Step 4: wait for a Propagated event — the sender's retry loop should + # deliver the message now that a relay peer is reachable + propagated_event = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + + # Step 5: check no Error arrived before Propagated + error_event = wait_for_error( + collector=sender_collector, + request_id=request_id, + timeout_s=0, + ) + + # All events for this request must carry the same requestId + for event in sender_collector.get_events_for_request(request_id): + assert event.get("requestId") == request_id, f"Event carries wrong requestId: {event}" + + assert propagated_event is not None, ( + f"No MessagePropagatedEvent received within {PROPAGATED_TIMEOUT_S}s " + f"after relay peer joined. Collected events: {sender_collector.events}" + ) + + assert error_event is None, ( + f"MessageErrorEvent arrived before Propagated — violates S17 expectations.\n" + f"Error : {error_event}\n" + f"Propagated: {propagated_event}" + ) + + finally: + relay_peer.stop_and_destroy()