From df69919a7bcdbd53bba2ea019418b574d1e157c3 Mon Sep 17 00:00:00 2001 From: Egor Rachkovskii <32649334+at0m1x19@users.noreply.github.com> Date: Mon, 20 Apr 2026 12:33:42 +0100 Subject: [PATCH] Add S06 relay-only test and fix wrapper helpers (#173) * - Add S06 relay-only test case for testing message propagation without a store. - Update `wrapper_helpers` for clearer event type handling and type annotations (`Optional[...]` usage). - Simplify `get_node_multiaddr` to retrieve addresses via `get_node_info_raw`. - Refactor `wrappers_manager` to adjust bindings path to `vendor` directory and add `get_node_info_raw` method. - Update `.gitignore` to exclude `store.sqlite3*`. * Refactor S06 relay-only test: replace try-finally blocks with context managers for clarity and conciseness. * Migrate S06 relay-only test to `test_send_e2e.py` and refactor with `StepsCommon` for reusability. --------- Co-authored-by: Egor Rachkovskii --- .gitignore | 3 +- src/node/wrapper_helpers.py | 54 ++++++++++----------- src/node/wrappers_manager.py | 23 +++++++-- tests/wrappers_tests/test_send_e2e.py | 68 +++++++++++++++++++++++++++ 4 files changed, 114 insertions(+), 34 deletions(-) diff --git a/.gitignore b/.gitignore index e64e42ff9..7d7cfc8b6 100644 --- a/.gitignore +++ b/.gitignore @@ -105,4 +105,5 @@ dmypy.json # Pyre type checker .pyre/ -third_party/logos-delivery-python-bindings +# Waku node runtime artifacts +store.sqlite3* diff --git a/src/node/wrapper_helpers.py b/src/node/wrapper_helpers.py index 31aaaee67..b18a734ca 100644 --- a/src/node/wrapper_helpers.py +++ b/src/node/wrapper_helpers.py @@ -1,6 +1,9 @@ +from __future__ import annotations + import json import threading import time +from typing import Optional class EventCollector: @@ -29,16 +32,22 @@ class EventCollector: return [e for e in self.events if e.get("requestId") == request_id] +# eventType values emitted by liblogosdelivery (node_api.nim:106–124) +EVENT_PROPAGATED = "message_propagated" +EVENT_SENT = "message_sent" +EVENT_ERROR = "message_error" + + def is_propagated_event(event: dict) -> bool: - return "Propagated" in event.get("eventType", "") + return event.get("eventType") == EVENT_PROPAGATED def is_sent_event(event: dict) -> bool: - return "Sent" in event.get("eventType", "") + return event.get("eventType") == EVENT_SENT def is_error_event(event: dict) -> bool: - return "Error" in event.get("eventType", "") + return event.get("eventType") == EVENT_ERROR def wait_for_event( @@ -47,7 +56,7 @@ def wait_for_event( predicate, timeout_s: float, poll_interval_s: float = 0.5, -) -> dict | None: +) -> Optional[dict]: """Poll until an event matching `predicate` arrives for `request_id`, or until `timeout_s` elapses. Returns the matching event or None. """ @@ -62,41 +71,26 @@ def wait_for_event( return None -def wait_for_propagated(collector: EventCollector, request_id: str, timeout_s: float) -> dict | None: +def wait_for_propagated(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]: return wait_for_event(collector, request_id, is_propagated_event, timeout_s) -def wait_for_sent(collector: EventCollector, request_id: str, timeout_s: float) -> dict | None: +def wait_for_sent(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]: return wait_for_event(collector, request_id, is_sent_event, timeout_s) -def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float) -> dict | None: +def wait_for_error(collector: EventCollector, request_id: str, timeout_s: float) -> Optional[dict]: return wait_for_event(collector, request_id, is_error_event, timeout_s) def get_node_multiaddr(node) -> str: - """Return the first TCP multiaddr (with peer-id) from a WrapperManager node. + """Return the first TCP multiaddr (with peer-id) from a WrapperManager node.""" + result = node.get_node_info_raw("MyMultiaddresses") + if result.is_err(): + raise RuntimeError(f"get_node_info_raw failed: {result.err()}") - Uses the existing get_available_node_info_ids / get_node_info APIs on - WrapperManager. Raises RuntimeError if the address cannot be resolved. - """ - ids_result = node.get_available_node_info_ids() - if ids_result.is_err(): - raise RuntimeError(f"get_available_node_info_ids failed: {ids_result.err()}") + addr = result.ok_value.strip() + if not addr or not addr.startswith("/"): + raise RuntimeError(f"Unexpected multiaddr format: {addr!r}") - ids = ids_result.ok_value - if not ids: - raise RuntimeError("No node-info IDs returned") - - info_result = node.get_node_info(ids[0]) - if info_result.is_err(): - raise RuntimeError(f"get_node_info failed: {info_result.err()}") - - info = info_result.ok_value - - for key in ("listenAddresses", "multiaddrs", "addresses"): - addresses = info.get(key) - if addresses: - return addresses[0] - - raise RuntimeError(f"Could not find a listen address in node info: {info}") + return addr diff --git a/src/node/wrappers_manager.py b/src/node/wrappers_manager.py index 41845077d..a0d20e0d6 100644 --- a/src/node/wrappers_manager.py +++ b/src/node/wrappers_manager.py @@ -2,9 +2,9 @@ import sys from pathlib import Path from result import Result, Ok, Err -_THIRD_PARTY = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku" -if str(_THIRD_PARTY) not in sys.path: - sys.path.insert(0, str(_THIRD_PARTY)) +_BINDINGS_PATH = Path(__file__).resolve().parents[2] / "vendor" / "logos-delivery-python-bindings" / "waku" +if str(_BINDINGS_PATH) not in sys.path: + sys.path.insert(0, str(_BINDINGS_PATH)) from wrapper import NodeWrapper as _NodeWrapper # type: ignore[import] @@ -77,5 +77,22 @@ class WrapperManager: def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]: return self._node.get_node_info(node_info_id, timeout_s=timeout_s) + def get_node_info_raw(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[str, str]: + """Like get_node_info but returns the raw string without JSON parsing.""" + from wrapper import lib, ffi, _new_cb_state, _wait_cb_raw # type: ignore[import] + + state = _new_cb_state() + cb = self._node._make_waiting_cb(state) + rc = lib.logosdelivery_get_node_info(self._node.ctx, cb, ffi.NULL, node_info_id.encode("utf-8")) + if rc != 0: + return Err(f"get_node_info_raw: immediate call failed (ret={rc})") + wait_result = _wait_cb_raw(state, "get_node_info_raw", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"get_node_info_raw: callback failed (ret={cb_ret})") + return Ok(cb_msg.decode("utf-8") if cb_msg else "") + def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]: return self._node.get_available_configs(timeout_s=timeout_s) diff --git a/tests/wrappers_tests/test_send_e2e.py b/tests/wrappers_tests/test_send_e2e.py index fb794ab6e..fb80d15f2 100644 --- a/tests/wrappers_tests/test_send_e2e.py +++ b/tests/wrappers_tests/test_send_e2e.py @@ -12,6 +12,7 @@ from src.node.wrapper_helpers import ( get_node_multiaddr, wait_for_propagated, wait_for_sent, + wait_for_error, ) from src.steps.store import StepsStore @@ -248,3 +249,70 @@ class TestSendBeforeRelay(StepsStore): page_size=5, ascending="true", ) + + +class TestS06CoreSenderRelayOnly(StepsCommon): + """ + S06 — Core sender with relay peers only, no store. + Sender has local relay enabled and is connected to one relay peer. + Expected: send() returns Ok(RequestId), message_propagated event arrives, + no message_sent (store disabled), no message_error. + """ + + def test_s06_relay_propagation_without_store(self, node_config): + sender_collector = EventCollector() + + node_config.update( + { + "relay": True, + "store": False, + "lightpush": False, + "filter": False, + "discv5Discovery": False, + "numShardsInNetwork": 1, + } + ) + + sender_result = WrapperManager.create_and_start( + config=node_config, + event_cb=sender_collector.event_callback, + ) + assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}" + + with sender_result.ok_value as sender: + peer_config = { + **node_config, + "staticnodes": [get_node_multiaddr(sender)], + "portsshift": 1, + } + + peer_result = WrapperManager.create_and_start(config=peer_config) + assert peer_result.is_ok(), f"Failed to start relay peer: {peer_result.err()}" + + with peer_result.ok_value: + message = self.create_message( + payload=to_base64("S06 relay-only test payload"), + contentTopic="/test/1/s06-relay-only/proto", + ) + + send_result = sender.send_message(message=message) + assert send_result.is_ok(), f"send() failed: {send_result.err()}" + + request_id = send_result.ok_value + assert request_id, "send() returned an empty RequestId" + + propagated = wait_for_propagated( + collector=sender_collector, + request_id=request_id, + timeout_s=PROPAGATED_TIMEOUT_S, + ) + assert propagated is not None, ( + f"No message_propagated event within {PROPAGATED_TIMEOUT_S}s. " f"Collected events: {sender_collector.events}" + ) + assert propagated["requestId"] == request_id + + error = wait_for_error(sender_collector, request_id, timeout_s=0) + assert error is None, f"Unexpected message_error event: {error}" + + sent = wait_for_sent(sender_collector, request_id, timeout_s=0) + assert sent is None, f"Unexpected message_sent event (store is disabled): {sent}"