From 2b448e2f0db7aa5d3f36d53db798331c505a9698 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Mon, 30 Mar 2026 11:51:34 +0200 Subject: [PATCH] clean version of wrappers --- waku/wrapper.py | 154 +++++++++++++++++++----------------------------- 1 file changed, 60 insertions(+), 94 deletions(-) diff --git a/waku/wrapper.py b/waku/wrapper.py index 9f9dd50..b53fd17 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -1,14 +1,10 @@ import json import threading -import logging -from time import sleep from cffi import FFI from pathlib import Path from result import Result, Ok, Err -logger = logging.getLogger(__name__) - ffi = FFI() ffi.cdef( @@ -97,34 +93,31 @@ def _new_cb_state(): return { "done": threading.Event(), "ret": None, - "msg": None, + "msg": b"", } def _wait_cb_raw(state, op_name: str, timeout_s: float = 20.0): - finished = state["done"].wait(timeout_s) - if not finished: - return Err(f"{op_name}: timeout waiting for callback after {timeout_s}s") + ok = state["done"].wait(timeout_s) + if not ok: + return Err(f"{op_name}: timeout after {timeout_s}s") - ret = state["ret"] - msg = state["msg"] or b"" + if state["ret"] is None: + return Err(f"{op_name}: callback ret is None") - if ret is None: - return Err(f"{op_name}: callback fired but ret is None") - - return Ok((ret, msg)) + return Ok((state["ret"], state["msg"])) -def _wait_cb_status(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: +def _wait_cb_ok(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: wait_result = _wait_cb_raw(state, op_name, timeout_s) if wait_result.is_err(): return Err(wait_result.err()) - ret, msg = wait_result.ok_value - if ret != 0: - return Err(f"{op_name}: failed (ret={ret}) msg={msg!r}") + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"{op_name}: callback failed (ret={cb_ret}) msg={cb_msg!r}") - return Ok(ret) + return Ok(cb_ret) class NodeWrapper: @@ -165,18 +158,18 @@ class NodeWrapper: config_buffer = ffi.new("char[]", config_json.encode("utf-8")) state = _new_cb_state() - create_cb = cls._make_waiting_cb(state) + cb = cls._make_waiting_cb(state) ctx = lib.logosdelivery_create_node( config_buffer, - create_cb, + cb, ffi.NULL, ) if ctx == ffi.NULL: return Err("create_node: ctx is NULL") - wait_result = _wait_cb_status(state, "create_node", timeout_s) + wait_result = _wait_cb_ok(state, "create_node", timeout_s) if wait_result.is_err(): return Err(wait_result.err()) @@ -208,6 +201,7 @@ class NodeWrapper: return Err(node_result.err()) node = node_result.ok_value + start_result = node.start_node(timeout_s=timeout_s) if start_result.is_err(): return Err(start_result.err()) @@ -222,7 +216,7 @@ class NodeWrapper: if rc != 0: return Err(f"start_node: immediate call failed (ret={rc})") - return _wait_cb_status(state, "start_node", timeout_s) + return _wait_cb_ok(state, "start_node", timeout_s) def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() @@ -232,7 +226,7 @@ class NodeWrapper: if rc != 0: return Err(f"stop_node: immediate call failed (ret={rc})") - return _wait_cb_status(state, "stop_node", timeout_s) + return _wait_cb_ok(state, "stop_node", timeout_s) def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() @@ -242,7 +236,12 @@ class NodeWrapper: if rc != 0: return Err(f"destroy: immediate call failed (ret={rc})") - return _wait_cb_status(state, "destroy", timeout_s) + wait_result = _wait_cb_ok(state, "destroy", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + self.ctx = ffi.NULL + return wait_result def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: stop_result = self.stop_node(timeout_s=timeout_s) @@ -264,7 +263,7 @@ class NodeWrapper: if rc != 0: return Err(f"subscribe_content_topic: immediate call failed (ret={rc})") - return _wait_cb_status(state, f"subscribe({content_topic})", timeout_s) + return _wait_cb_ok(state, f"subscribe({content_topic})", timeout_s) def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() @@ -279,7 +278,7 @@ class NodeWrapper: if rc != 0: return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})") - return _wait_cb_status(state, f"unsubscribe({content_topic})", timeout_s) + return _wait_cb_ok(state, f"unsubscribe({content_topic})", timeout_s) def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]: state = _new_cb_state() @@ -293,8 +292,7 @@ class NodeWrapper: ffi.NULL, message_json.encode("utf-8"), ) - - if rc < 0: + if rc != 0: return Err(f"send_message: immediate call failed (ret={rc})") wait_result = _wait_cb_raw(state, "send_message", timeout_s) @@ -303,7 +301,7 @@ class NodeWrapper: cb_ret, cb_msg = wait_result.ok_value if cb_ret != 0: - return Err(f"send_message: failed (ret={cb_ret}) msg={cb_msg!r}") + return Err(f"send_message: callback failed (ret={cb_ret}) msg={cb_msg!r}") request_id = cb_msg.decode("utf-8") if cb_msg else "" return Ok(request_id) @@ -312,20 +310,23 @@ class NodeWrapper: state = _new_cb_state() cb = self._make_waiting_cb(state) - node_info_id = lib.logosdelivery_get_available_node_info_ids( + rc = lib.logosdelivery_get_available_node_info_ids( self.ctx, cb, ffi.NULL, ) - - if node_info_id < 0: - return Err(f"get_available_node_info_ids: immediate call failed (ret={node_info_id})") + if rc < 0: + return Err(f"get_available_node_info_ids: immediate call failed (ret={rc})") wait_result = _wait_cb_raw(state, "get_available_node_info_ids", timeout_s) if wait_result.is_err(): return Err(wait_result.err()) - return Ok(node_info_id) + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"get_available_node_info_ids: callback failed (ret={cb_ret}) msg={cb_msg!r}") + + return Ok(rc) def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]: state = _new_cb_state() @@ -337,31 +338,34 @@ class NodeWrapper: ffi.NULL, node_info_id.encode("utf-8"), ) + if rc != 0: + return Err(f"get_node_info: immediate call failed (ret={rc})") - if rc < 0: - return Err(f"call failed rc={rc}") + wait_result = _wait_cb_raw(state, "get_node_info", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) - wait = _wait_cb_raw(state, "get_node_info", timeout_s) - if wait.is_err(): - return Err(wait.err()) + cb_ret, cb_msg = wait_result.ok_value + if cb_ret != 0: + return Err(f"get_node_info: callback failed (ret={cb_ret}) msg={cb_msg!r}") - cb_ret, cb_msg = wait.ok_value - if cb_ret != 0 or not cb_msg: - return Err(f"callback failed ret={cb_ret}") + if not cb_msg: + return Err("get_node_info: empty response") try: - return Ok(json.loads(cb_msg.decode())) - except Exception: + result = json.loads(cb_msg.decode("utf-8")) + except Exception as e: + return Err(f"get_node_info: invalid json: {e}") - return Err("invalid json") + return Ok(result) - def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[bytes, str]: + def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) rc = lib.logosdelivery_get_available_configs(self.ctx, cb, ffi.NULL) if rc != 0: - return Err(f"get_available_configs failed: {rc}") + return Err(f"get_available_configs: immediate call failed (ret={rc})") wait_result = _wait_cb_raw(state, "get_available_configs", timeout_s) if wait_result.is_err(): @@ -369,52 +373,14 @@ class NodeWrapper: cb_ret, cb_msg = wait_result.ok_value if cb_ret != 0: - return Err(f"get_available_configs failed: {cb_ret}") + return Err(f"get_available_configs: callback failed (ret={cb_ret}) msg={cb_msg!r}") - return Ok(cb_msg) + if not cb_msg: + return Err("get_available_configs: empty response") -def main(): - config = { - "logLevel": "DEBUG", - "mode": "Core", - "protocolsConfig": { - "entryNodes": [ - "/dns4/node-01.do-ams3.misc.logos-chat.status.im/tcp/30303/p2p/16Uiu2HAkxoqUTud5LUPQBRmkeL2xP4iKx2kaABYXomQRgmLUgf78" - ], - "clusterId": 3, - "autoShardingConfig": {"numShardsInCluster": 8}, - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000, - }, - } - - topic = "/test/1/chat/proto" - - message = { - "contentTopic": topic, - "payload": "SGVsbG8=", - "ephemeral": False, - } - - node_result = NodeWrapper.create_and_start(config) - sleep(20) - if node_result.is_err(): - print(node_result.err()) - return - - node = node_result.ok_value - - print(node.subscribe_content_topic(topic)) - print(node.send_message(message)) - #print(node.get_available_node_info_ids()) - #print(node.debug_get_available_configs()) - print(node.stop_node()) - print(node.destroy()) - - -if __name__ == "__main__": - main() + try: + result = json.loads(cb_msg.decode("utf-8")) + except Exception as e: + return Err(f"get_available_configs: invalid json: {e}") + return Ok(result) \ No newline at end of file