diff --git a/waku/wrapper.py b/waku/wrapper.py index 79ade59..dd684d4 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -6,7 +6,7 @@ from time import sleep from cffi import FFI from pathlib import Path from result import Result, Ok, Err -import time + logger = logging.getLogger(__name__) ffi = FFI() @@ -65,6 +65,19 @@ int logosdelivery_send( void *userData, const char *messageJson ); + +int logosdelivery_get_available_node_info_ids( + void *ctx, + FFICallBack callback, + void *userData +); + +int logosdelivery_get_node_info( + void *ctx, + FFICallBack callback, + void *userData, + const char *nodeInfoId +); """ ) @@ -82,7 +95,7 @@ def _new_cb_state(): } -def _wait_cb(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: +def _wait_cb_raw(state, op_name: str, timeout_s: float = 20.0): finished = state["done"].wait(timeout_s) if not finished: return Err(f"{op_name}: timeout waiting for callback after {timeout_s}s") @@ -93,6 +106,15 @@ def _wait_cb(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: if ret is None: return Err(f"{op_name}: callback fired but ret is None") + return Ok((ret, msg)) + + +def _wait_cb_status(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: + wait_result = _wait_cb_raw(state, op_name, timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + ret, msg = wait_result.ok_value if ret != 0: return Err(f"{op_name}: failed (ret={ret}) msg={msg!r}") @@ -148,7 +170,7 @@ class NodeWrapper: if ctx == ffi.NULL: return Err("create_node: ctx is NULL") - wait_result = _wait_cb(state, "create_node", timeout_s) + wait_result = _wait_cb_status(state, "create_node", timeout_s) if wait_result.is_err(): return Err(wait_result.err()) @@ -194,7 +216,7 @@ class NodeWrapper: if rc != 0: return Err(f"start_node: immediate call failed (ret={rc})") - return _wait_cb(state, "start_node", timeout_s) + return _wait_cb_status(state, "start_node", timeout_s) def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() @@ -204,7 +226,7 @@ class NodeWrapper: if rc != 0: return Err(f"stop_node: immediate call failed (ret={rc})") - return _wait_cb(state, "stop_node", timeout_s) + return _wait_cb_status(state, "stop_node", timeout_s) def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() @@ -214,7 +236,7 @@ class NodeWrapper: if rc != 0: return Err(f"destroy: immediate call failed (ret={rc})") - return _wait_cb(state, "destroy", timeout_s) + return _wait_cb_status(state, "destroy", timeout_s) def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: stop_result = self.stop_node(timeout_s=timeout_s) @@ -236,7 +258,7 @@ class NodeWrapper: if rc != 0: return Err(f"subscribe_content_topic: immediate call failed (ret={rc})") - return _wait_cb(state, f"subscribe({content_topic})", timeout_s) + return _wait_cb_status(state, f"subscribe({content_topic})", timeout_s) def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() @@ -251,30 +273,108 @@ class NodeWrapper: if rc != 0: return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})") - return _wait_cb(state, f"unsubscribe({content_topic})", timeout_s) + return _wait_cb_status(state, f"unsubscribe({content_topic})", timeout_s) - def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[int, str]: + def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) message_json = json.dumps(message, separators=(",", ":"), ensure_ascii=False) - request_id = lib.logosdelivery_send( + rc = lib.logosdelivery_send( self.ctx, cb, ffi.NULL, message_json.encode("utf-8"), ) - if request_id < 0: - return Err(f"send_message: immediate call failed (ret={request_id})") + if rc < 0: + return Err(f"send_message: immediate call failed (ret={rc})") - wait_result = _wait_cb(state, "send_message", timeout_s) + wait_result = _wait_cb_raw(state, "send_message", timeout_s) if wait_result.is_err(): return Err(wait_result.err()) + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"send_message: failed (ret={cb_ret}) msg={cb_msg!r}") + + request_id = cb_msg.decode("utf-8") if cb_msg else "" return Ok(request_id) + def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[int, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + node_info_id = lib.logosdelivery_get_available_node_info_ids( + self.ctx, + cb, + ffi.NULL, + ) + + if node_info_id < 0: + return Err(f"get_available_node_info_ids: immediate call failed (ret={node_info_id})") + + wait_result = _wait_cb_raw(state, "get_available_node_info_ids", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + return Ok(node_info_id) + + def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_get_node_info( + self.ctx, + cb, + ffi.NULL, + node_info_id.encode("utf-8"), + ) + + if rc < 0: + return Err(f"get_node_info: immediate call failed (ret={rc})") + + wait_result = _wait_cb_raw(state, "get_node_info", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"get_node_info: failed (ret={cb_ret}) msg={cb_msg!r}") + + if not cb_msg: + return Err("get_node_info: empty response") + + try: + result = json.loads(cb_msg.decode("utf-8")) + except Exception as e: + return Err(f"get_node_info: failed to parse response: {e}") + + return Ok(result) + + def debug_get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[tuple, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_get_node_info( + self.ctx, + cb, + ffi.NULL, + node_info_id.encode("utf-8"), + ) + + print(f"[DEBUG] get_node_info immediate rc={rc}") + + wait_result = _wait_cb_raw(state, "get_node_info", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + cb_ret, cb_msg = wait_result.ok_value + print(f"[DEBUG] get_node_info callback ret={cb_ret}, msg={cb_msg}") + + return Ok((rc, cb_ret, cb_msg)) + def main(): config = { "logLevel": "DEBUG", @@ -310,11 +410,12 @@ def main(): node = node_result.ok_value print(node.subscribe_content_topic(topic)) - #print(node.send_message(message)) - + print(node.send_message(message)) + #print(node.get_available_node_info_ids()) print(node.stop_node()) print(node.destroy()) if __name__ == "__main__": - main() \ No newline at end of file + main() +