From 221af4f84331211e4a62cdfee032aca377161437 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Mon, 2 Mar 2026 10:11:56 +0100 Subject: [PATCH] Make changes to allow callback assertion inside each API --- waku/wrapper.py | 216 +++++++++++++++++++++++++++++------------------- 1 file changed, 129 insertions(+), 87 deletions(-) diff --git a/waku/wrapper.py b/waku/wrapper.py index 8532c58..aaf5667 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -1,10 +1,15 @@ import json +import threading +import logging from cffi import FFI from pathlib import Path +logger = logging.getLogger(__name__) + ffi = FFI() -ffi.cdef(""" +ffi.cdef( + """ typedef void (*FFICallBack)(int callerRet, const char *msg, size_t len, void *userData); void *logosdelivery_create_node( @@ -50,129 +55,166 @@ int logosdelivery_unsubscribe( void *userData, const char *contentTopic ); -""") +""" +) _repo_root = Path(__file__).resolve().parents[1] lib = ffi.dlopen(str(_repo_root / "lib" / "liblogosdelivery.so")) CallbackType = ffi.callback("void(int, const char*, size_t, void*)") + +def _new_cb_state(): + return { + "done": threading.Event(), + "ret": None, + "msg": None, + } + + +def _wait_cb(state, op_name: str, timeout_s: float = 20.0): + finished = state["done"].wait(timeout_s) + if not finished: + raise TimeoutError(f"{op_name}: timeout waiting for callback after {timeout_s}s") + + ret = state["ret"] + msg = state["msg"] or b"" + + if ret is None: + raise RuntimeError(f"{op_name}: callback fired but ret is None") + + if ret != 0: + raise RuntimeError(f"{op_name}: failed (ret={ret}) msg={msg!r}") + + class NodeWrapper: - def __init__(self, ctx, config_buffer): + def __init__(self, ctx, config_buffer, event_cb_handler): self.ctx = ctx self._config_buffer = config_buffer - self._event_cb_handler = None + self._event_cb_handler = event_cb_handler @staticmethod - def _make_cb(py_callback): + def _make_cb(py_callback, state=None): def c_cb(ret, char_p, length, userData): msg = ffi.buffer(char_p, length)[:] - py_callback(ret, msg) + + if state is not None and not state["done"].is_set(): + state["ret"] = int(ret) + state["msg"] = msg + state["done"].set() + + if py_callback is not None: + py_callback(int(ret), msg) return CallbackType(c_cb) @classmethod - def create_node(cls, config: dict, py_callback): + def create_node( + cls, + config: dict, + create_cb=None, + event_cb=None, + *, + timeout_s: float = 20.0, + ): config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False) config_buffer = ffi.new("char[]", config_json.encode("utf-8")) - cb = cls._make_cb(py_callback) + state = _new_cb_state() + create_c_cb = cls._make_cb(create_cb, state) ctx = lib.logosdelivery_create_node( config_buffer, - cb, + create_c_cb, ffi.NULL, ) - return cls(ctx, config_buffer) + if ctx == ffi.NULL: + raise RuntimeError("create_node: ctx is NULL") - def start_node(self, py_callback): - cb = self._make_cb(py_callback) + _wait_cb(state, "create_node", timeout_s) - ret = lib.logosdelivery_start_node( - self.ctx, - cb, - ffi.NULL, - ) + event_cb_handler = None + if event_cb is not None: + event_cb_handler = cls._make_cb(event_cb, state=None) + lib.logosdelivery_set_event_callback( + ctx, + event_cb_handler, + ffi.NULL, + ) - return int(ret) + return cls(ctx, config_buffer, event_cb_handler) - @classmethod - def create_and_start(cls, config: dict, create_cb, start_cb): - node = cls.create_node(config, create_cb) - rc = node.start_node(start_cb) - return node, rc + def start_node(self, start_cb=None, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_cb(start_cb, state) - def stop_node(self, py_callback): - cb = self._make_cb(py_callback) - - ret = lib.logosdelivery_stop_node( - self.ctx, - cb, - ffi.NULL, - ) - - return int(ret) - - self._event_cb_handler = cb - - def destroy(self, py_callback): - cb = self._make_cb(py_callback) - - ret = lib.logosdelivery_destroy( - self.ctx, - cb, - ffi.NULL, - ) - - return int(ret) - - def stop_and_destroy(self, callback): - stop_rc = self.stop_node(callback) - if stop_rc != 0: - raise RuntimeError(f"Stop failed (ret={stop_rc})") - - destroy_rc = self.destroy(callback) - if destroy_rc != 0: - raise RuntimeError(f"Destroy failed (ret={destroy_rc})") + rc = int(lib.logosdelivery_start_node(self.ctx, cb, ffi.NULL)) + if rc != 0: + raise RuntimeError(f"start_node: immediate call failed (ret={rc})") + _wait_cb(state, "start_node", timeout_s) return 0 - - def subscribe_content_topic(self, content_topic: str, py_callback): - cb = self._make_cb(py_callback) - ret = lib.logosdelivery_subscribe( - self.ctx, - cb, - ffi.NULL, - content_topic.encode("utf-8"), + def stop_node(self, stop_cb=None, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_cb(stop_cb, state) + + rc = int(lib.logosdelivery_stop_node(self.ctx, cb, ffi.NULL)) + if rc != 0: + raise RuntimeError(f"stop_node: immediate call failed (ret={rc})") + + _wait_cb(state, "stop_node", timeout_s) + return 0 + + def destroy(self, destroy_cb=None, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_cb(destroy_cb, state) + + rc = int(lib.logosdelivery_destroy(self.ctx, cb, ffi.NULL)) + if rc != 0: + raise RuntimeError(f"destroy: immediate call failed (ret={rc})") + + _wait_cb(state, "destroy", timeout_s) + return 0 + + def stop_and_destroy(self, cb=None, *, timeout_s: float = 20.0): + self.stop_node(stop_cb=cb, timeout_s=timeout_s) + self.destroy(destroy_cb=cb, timeout_s=timeout_s) + return 0 + + def subscribe_content_topic(self, content_topic: str, subscribe_cb=None, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_cb(subscribe_cb, state) + + rc = int( + lib.logosdelivery_subscribe( + self.ctx, + cb, + ffi.NULL, + content_topic.encode("utf-8"), + ) ) + if rc != 0: + raise RuntimeError(f"subscribe_content_topic: immediate call failed (ret={rc})") - return int(ret) + _wait_cb(state, f"subscribe({content_topic})", timeout_s) + return 0 - def unsubscribe_content_topic(self, content_topic: str, py_callback): - cb = self._make_cb(py_callback) + def unsubscribe_content_topic(self, content_topic: str, unsubscribe_cb=None, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_cb(unsubscribe_cb, state) - ret = lib.logosdelivery_unsubscribe( - self.ctx, - cb, - ffi.NULL, - content_topic.encode("utf-8"), + rc = int( + lib.logosdelivery_unsubscribe( + self.ctx, + cb, + ffi.NULL, + content_topic.encode("utf-8"), + ) ) + if rc != 0: + raise RuntimeError(f"unsubscribe_content_topic: immediate call failed (ret={rc})") - return int(ret) - - def set_event_callback(self, py_callback): - def c_cb(ret, char_p, length, userData): - msg = ffi.buffer(char_p, length)[:] - py_callback(ret, msg) - - cb = CallbackType(c_cb) - - lib.logosdelivery_set_event_callback( - self.ctx, - cb, - ffi.NULL, - ) - - self._event_cb_handler = cb + _wait_cb(state, f"unsubscribe({content_topic})", timeout_s) + return 0 \ No newline at end of file