import sys from pathlib import Path from result import Result, Ok, Err _THIRD_PARTY = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku" if str(_THIRD_PARTY) not in sys.path: sys.path.insert(0, str(_THIRD_PARTY)) from wrapper import NodeWrapper as _NodeWrapper # type: ignore[import] """" thin manager/wrapper layer around NodeWrapper from the bindings. It simplifies create, start, and interaction with a Waku node while returning consistent Result objects (Ok / Err). """ class WrapperManager: def __init__(self, node: _NodeWrapper): self._node = node @classmethod def create( cls, config: dict, event_cb=None, *, timeout_s: float = 20.0, ) -> Result["WrapperManager", str]: result = _NodeWrapper.create_node(config, event_cb, timeout_s=timeout_s) if result.is_err(): return Err(result.err()) return Ok(cls(result.ok_value)) @classmethod def create_and_start( cls, config: dict, event_cb=None, *, timeout_s: float = 20.0, ) -> Result["WrapperManager", str]: result = _NodeWrapper.create_and_start(config, event_cb, timeout_s=timeout_s) if result.is_err(): return Err(result.err()) return Ok(cls(result.ok_value)) def __enter__(self) -> "WrapperManager": return self def __exit__(self, *_) -> None: self.stop_and_destroy() def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: return self._node.start_node(timeout_s=timeout_s) def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: return self._node.stop_node(timeout_s=timeout_s) def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: return self._node.destroy(timeout_s=timeout_s) def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: return self._node.stop_and_destroy(timeout_s=timeout_s) def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: return self._node.subscribe_content_topic(content_topic, timeout_s=timeout_s) def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: return self._node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s) def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]: return self._node.send_message(message, timeout_s=timeout_s) def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[list[str], str]: return self._node.get_available_node_info_ids(timeout_s=timeout_s) def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]: return self._node.get_node_info(node_info_id, timeout_s=timeout_s) def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]: return self._node.get_available_configs(timeout_s=timeout_s)