diff --git a/.gitignore b/.gitignore index eac0912..cb7b31c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ venv/ dist/ waku.egg-info/ waku/__pycache__/ + diff --git a/vendor/logos-delivery b/vendor/logos-delivery deleted file mode 160000 index ba85873..0000000 --- a/vendor/logos-delivery +++ /dev/null @@ -1 +0,0 @@ -Subproject commit ba85873f03a1da6ab04287949849815fd97b7bfd diff --git a/vendor/nwaku b/vendor/nwaku new file mode 160000 index 0000000..f1d14e9 --- /dev/null +++ b/vendor/nwaku @@ -0,0 +1 @@ +Subproject commit f1d14e9942fc44168d5755cd5a7f0123da8c5649 diff --git a/waku/wrapper.py b/waku/wrapper.py index 55bf93e..d232388 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -1,12 +1,10 @@ import json import threading -import logging + from cffi import FFI from pathlib import Path from result import Result, Ok, Err -logger = logging.getLogger(__name__) - ffi = FFI() ffi.cdef( @@ -56,6 +54,32 @@ int logosdelivery_unsubscribe( void *userData, const char *contentTopic ); + +int logosdelivery_send( + void *ctx, + FFICallBack callback, + void *userData, + const char *messageJson +); + +int logosdelivery_get_available_node_info_ids( + void *ctx, + FFICallBack callback, + void *userData +); + +int logosdelivery_get_node_info( + void *ctx, + FFICallBack callback, + void *userData, + const char *nodeInfoId +); + +int logosdelivery_get_available_configs( + void *ctx, + FFICallBack callback, + void *userData +); """ ) @@ -69,25 +93,35 @@ def _new_cb_state(): return { "done": threading.Event(), "ret": None, - "msg": None, + "msg": b"", } -def _wait_cb(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: - finished = state["done"].wait(timeout_s) - if not finished: - return Err(f"{op_name}: timeout waiting for callback after {timeout_s}s") +def _wait_cb_raw( + state, + op_name: str, + timeout_s: float = 20.0, +) -> Result[tuple[int, bytes], str]: + ok = state["done"].wait(timeout_s) + if not ok: + return Err(f"{op_name}: timeout after {timeout_s}s") - ret = state["ret"] - msg = state["msg"] or b"" + if state["ret"] is None: + return Err(f"{op_name}: callback ret is None") - if ret is None: - return Err(f"{op_name}: callback fired but ret is None") + return Ok((state["ret"], state["msg"])) - if ret != 0: - return Err(f"{op_name}: failed (ret={ret}) msg={msg!r}") - return Ok(ret) +def _wait_cb_ok(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: + wait_result = _wait_cb_raw(state, op_name, timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"callback failed in _wait_cb_ok: {op_name} (ret={cb_ret}) msg={cb_msg!r}") + + return Ok(cb_ret) class NodeWrapper: @@ -128,18 +162,18 @@ class NodeWrapper: config_buffer = ffi.new("char[]", config_json.encode("utf-8")) state = _new_cb_state() - create_cb = cls._make_waiting_cb(state) + cb = cls._make_waiting_cb(state) ctx = lib.logosdelivery_create_node( config_buffer, - create_cb, + cb, ffi.NULL, ) if ctx == ffi.NULL: return Err("create_node: ctx is NULL") - wait_result = _wait_cb(state, "create_node", timeout_s) + wait_result = _wait_cb_ok(state, "create_node", timeout_s) if wait_result.is_err(): return Err(wait_result.err()) @@ -171,6 +205,7 @@ class NodeWrapper: return Err(node_result.err()) node = node_result.ok_value + start_result = node.start_node(timeout_s=timeout_s) if start_result.is_err(): return Err(start_result.err()) @@ -185,7 +220,7 @@ class NodeWrapper: if rc != 0: return Err(f"start_node: immediate call failed (ret={rc})") - return _wait_cb(state, "start_node", timeout_s) + return _wait_cb_ok(state, "start_node", timeout_s) def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() @@ -195,7 +230,7 @@ class NodeWrapper: if rc != 0: return Err(f"stop_node: immediate call failed (ret={rc})") - return _wait_cb(state, "stop_node", timeout_s) + return _wait_cb_ok(state, "stop_node", timeout_s) def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() @@ -205,7 +240,12 @@ class NodeWrapper: if rc != 0: return Err(f"destroy: immediate call failed (ret={rc})") - return _wait_cb(state, "destroy", timeout_s) + wait_result = _wait_cb_ok(state, "destroy", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + self.ctx = ffi.NULL + return wait_result def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: stop_result = self.stop_node(timeout_s=timeout_s) @@ -227,7 +267,7 @@ class NodeWrapper: if rc != 0: return Err(f"subscribe_content_topic: immediate call failed (ret={rc})") - return _wait_cb(state, f"subscribe({content_topic})", timeout_s) + return _wait_cb_ok(state, f"subscribe({content_topic})", timeout_s) def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() @@ -242,4 +282,111 @@ class NodeWrapper: if rc != 0: return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})") - return _wait_cb(state, f"unsubscribe({content_topic})", timeout_s) \ No newline at end of file + return _wait_cb_ok(state, f"unsubscribe({content_topic})", timeout_s) + + def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + message_json = json.dumps(message, separators=(",", ":"), ensure_ascii=False) + + rc = lib.logosdelivery_send( + self.ctx, + cb, + ffi.NULL, + message_json.encode("utf-8"), + ) + if rc != 0: + return Err(f"send_message: immediate call failed (ret={rc})") + + wait_result = _wait_cb_raw(state, "send_message", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"send_message: callback failed (ret={cb_ret}) msg={cb_msg!r}") + + request_id = cb_msg.decode("utf-8") if cb_msg else "" + return Ok(request_id) + + def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[list[str], str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_get_available_node_info_ids(self.ctx, cb, ffi.NULL) + if rc != 0: + return Err(f"get_available_node_info_ids: immediate call failed (ret={rc})") + + wait_result = _wait_cb_raw(state, "get_available_node_info_ids", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"get_available_node_info_ids: callback failed (ret={cb_ret})") + if not cb_msg: + return Err("get_available_node_info_ids: empty response") + + try: + return Ok(json.loads(cb_msg.decode("utf-8").strip().lstrip("@"))) + except Exception as e: + return Err(f"get_available_node_info_ids: invalid response: {e}") + + def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_get_node_info( + self.ctx, + cb, + ffi.NULL, + node_info_id.encode("utf-8"), + ) + if rc != 0: + return Err(f"get_node_info: immediate call failed (ret={rc})") + + wait_result = _wait_cb_raw(state, "get_node_info", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"get_node_info: callback failed (ret={cb_ret}) msg={cb_msg!r}") + + if not cb_msg: + return Err("get_node_info: empty response") + + try: + result = json.loads(cb_msg.decode("utf-8")) + except Exception as e: + return Err(f"get_node_info: invalid json: {e}") + + return Ok(result) + + def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_get_available_configs(self.ctx, cb, ffi.NULL) + if rc != 0: + return Err(f"get_available_configs: immediate call failed (ret={rc})") + + wait_result = _wait_cb_raw(state, "get_available_configs", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"get_available_configs: callback failed (ret={cb_ret}) msg={cb_msg!r}") + + if not cb_msg: + return Err("get_available_configs: empty response") + + try: + result = json.loads(cb_msg.decode("utf-8")) + except Exception as e: + return Err(f"get_available_configs: invalid json: {e}") + + return Ok(result) +