diff --git a/waku/wrapper.py b/waku/wrapper.py index fb3e8b6..55bf93e 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -3,6 +3,7 @@ import threading import logging from cffi import FFI from pathlib import Path +from result import Result, Ok, Err logger = logging.getLogger(__name__) @@ -72,19 +73,21 @@ def _new_cb_state(): } -def _wait_cb(state, op_name: str, timeout_s: float = 20.0): +def _wait_cb(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: finished = state["done"].wait(timeout_s) if not finished: - raise TimeoutError(f"{op_name}: timeout waiting for callback after {timeout_s}s") + return Err(f"{op_name}: timeout waiting for callback after {timeout_s}s") ret = state["ret"] msg = state["msg"] or b"" if ret is None: - raise RuntimeError(f"{op_name}: callback fired but ret is None") + return Err(f"{op_name}: callback fired but ret is None") if ret != 0: - raise RuntimeError(f"{op_name}: failed (ret={ret}) msg={msg!r}") + return Err(f"{op_name}: failed (ret={ret}) msg={msg!r}") + + return Ok(ret) class NodeWrapper: @@ -120,7 +123,7 @@ class NodeWrapper: event_cb=None, *, timeout_s: float = 20.0, - ): + ) -> Result["NodeWrapper", str]: config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False) config_buffer = ffi.new("char[]", config_json.encode("utf-8")) @@ -134,9 +137,11 @@ class NodeWrapper: ) if ctx == ffi.NULL: - raise RuntimeError("create_node: ctx is NULL") + return Err("create_node: ctx is NULL") - _wait_cb(state, "create_node", timeout_s) + wait_result = _wait_cb(state, "create_node", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) event_cb_handler = None if event_cb is not None: @@ -147,7 +152,7 @@ class NodeWrapper: ffi.NULL, ) - return cls(ctx, config_buffer, event_cb_handler) + return Ok(cls(ctx, config_buffer, event_cb_handler)) @classmethod def create_and_start( @@ -156,54 +161,60 @@ class NodeWrapper: event_cb=None, *, timeout_s: float = 20.0, - ): - node = cls.create_node( + ) -> Result["NodeWrapper", str]: + node_result = cls.create_node( config=config, event_cb=event_cb, timeout_s=timeout_s, ) - node.start_node(timeout_s=timeout_s) - return node + if node_result.is_err(): + return Err(node_result.err()) - def start_node(self, *, timeout_s: float = 20.0): + node = node_result.ok_value + start_result = node.start_node(timeout_s=timeout_s) + if start_result.is_err(): + return Err(start_result.err()) + + return Ok(node) + + def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) rc = lib.logosdelivery_start_node(self.ctx, cb, ffi.NULL) if rc != 0: - raise RuntimeError(f"start_node: immediate call failed (ret={rc})") + return Err(f"start_node: immediate call failed (ret={rc})") - _wait_cb(state, "start_node", timeout_s) - return 0 + return _wait_cb(state, "start_node", timeout_s) - def stop_node(self, *, timeout_s: float = 20.0): + def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) rc = lib.logosdelivery_stop_node(self.ctx, cb, ffi.NULL) if rc != 0: - raise RuntimeError(f"stop_node: immediate call failed (ret={rc})") + return Err(f"stop_node: immediate call failed (ret={rc})") - _wait_cb(state, "stop_node", timeout_s) - return 0 + return _wait_cb(state, "stop_node", timeout_s) - def destroy(self, *, timeout_s: float = 20.0): + def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) rc = lib.logosdelivery_destroy(self.ctx, cb, ffi.NULL) if rc != 0: - raise RuntimeError(f"destroy: immediate call failed (ret={rc})") + return Err(f"destroy: immediate call failed (ret={rc})") - _wait_cb(state, "destroy", timeout_s) - return 0 + return _wait_cb(state, "destroy", timeout_s) - def stop_and_destroy(self, *, timeout_s: float = 20.0): - self.stop_node(timeout_s=timeout_s) - self.destroy(timeout_s=timeout_s) - return 0 + def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: + stop_result = self.stop_node(timeout_s=timeout_s) + if stop_result.is_err(): + return Err(stop_result.err()) - def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): + return self.destroy(timeout_s=timeout_s) + + def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) @@ -214,12 +225,11 @@ class NodeWrapper: content_topic.encode("utf-8"), ) if rc != 0: - raise RuntimeError(f"subscribe_content_topic: immediate call failed (ret={rc})") + return Err(f"subscribe_content_topic: immediate call failed (ret={rc})") - _wait_cb(state, f"subscribe({content_topic})", timeout_s) - return 0 + return _wait_cb(state, f"subscribe({content_topic})", timeout_s) - def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): + def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) @@ -230,8 +240,6 @@ class NodeWrapper: content_topic.encode("utf-8"), ) if rc != 0: - raise RuntimeError(f"unsubscribe_content_topic: immediate call failed (ret={rc})") + return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})") - _wait_cb(state, f"unsubscribe({content_topic})", timeout_s) - - return 0 + return _wait_cb(state, f"unsubscribe({content_topic})", timeout_s) \ No newline at end of file