diff --git a/.gitmodules b/.gitmodules index a520c21..56513bd 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ -[submodule "vendor/nwaku"] - path = vendor/nwaku - url = git@github.com:waku-org/nwaku.git +[submodule "vendor/logos-delivery"] + path = vendor/logos-delivery + url = git@github.com:logos-messaging/logos-delivery.git diff --git a/README.md b/README.md index 003c2ca..baede2a 100644 --- a/README.md +++ b/README.md @@ -82,17 +82,15 @@ Then, follow the following steps from the root folder to rebuild the `libwaku.so` library: ```bash -cd vendor/nwaku +cd vendor/liblogosdelivery ``` ```bash -make libwaku -j8 +make liblogosdelivery -j8 ``` ```bash cd ../../ ``` ```bash -cp vendor/nwaku/build/libwaku.so lib/ +cp vendor/liblogosdelivery/build/liblogosdelivery.so lib/ ``` -Notice that the `libwaku.so` library is also distributed within -the `Py-Waku` package. diff --git a/vendor/logos-delivery b/vendor/logos-delivery new file mode 160000 index 0000000..ba85873 --- /dev/null +++ b/vendor/logos-delivery @@ -0,0 +1 @@ +Subproject commit ba85873f03a1da6ab04287949849815fd97b7bfd diff --git a/vendor/nwaku b/vendor/nwaku deleted file mode 160000 index f1d14e9..0000000 --- a/vendor/nwaku +++ /dev/null @@ -1 +0,0 @@ -Subproject commit f1d14e9942fc44168d5755cd5a7f0123da8c5649 diff --git a/waku/wrapper.py b/waku/wrapper.py new file mode 100644 index 0000000..55bf93e --- /dev/null +++ b/waku/wrapper.py @@ -0,0 +1,245 @@ +import json +import threading +import logging +from cffi import FFI +from pathlib import Path +from result import Result, Ok, Err + +logger = logging.getLogger(__name__) + +ffi = FFI() + +ffi.cdef( + """ +typedef void (*FFICallBack)(int callerRet, const char *msg, size_t len, void *userData); + +void *logosdelivery_create_node( + const char *configJson, + FFICallBack callback, + void *userData +); + +int logosdelivery_start_node( + void *ctx, + FFICallBack callback, + void *userData +); + +int logosdelivery_stop_node( + void *ctx, + FFICallBack callback, + void *userData +); + +void logosdelivery_set_event_callback( + void *ctx, + FFICallBack callback, + void *userData +); + +int logosdelivery_destroy( + void *ctx, + FFICallBack callback, + void *userData +); + +int logosdelivery_subscribe( + void *ctx, + FFICallBack callback, + void *userData, + const char *contentTopic +); + +int logosdelivery_unsubscribe( + void *ctx, + FFICallBack callback, + void *userData, + const char *contentTopic +); +""" +) + +_repo_root = Path(__file__).resolve().parents[1] +lib = ffi.dlopen(str(_repo_root / "lib" / "liblogosdelivery.so")) + +CallbackType = ffi.callback("void(int, const char*, size_t, void*)") + + +def _new_cb_state(): + return { + "done": threading.Event(), + "ret": None, + "msg": None, + } + + +def _wait_cb(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: + finished = state["done"].wait(timeout_s) + if not finished: + return Err(f"{op_name}: timeout waiting for callback after {timeout_s}s") + + ret = state["ret"] + msg = state["msg"] or b"" + + if ret is None: + return Err(f"{op_name}: callback fired but ret is None") + + if ret != 0: + return Err(f"{op_name}: failed (ret={ret}) msg={msg!r}") + + return Ok(ret) + + +class NodeWrapper: + def __init__(self, ctx, config_buffer, event_cb_handler): + self.ctx = ctx + self._config_buffer = config_buffer + self._event_cb_handler = event_cb_handler + + @staticmethod + def _make_waiting_cb(state): + def c_cb(ret, char_p, length, userData): + msg = ffi.buffer(char_p, length)[:] if char_p != ffi.NULL else b"" + + if not state["done"].is_set(): + state["ret"] = int(ret) + state["msg"] = msg + state["done"].set() + + return CallbackType(c_cb) + + @staticmethod + def _make_event_cb(py_callback): + def c_cb(ret, char_p, length, userData): + msg = ffi.buffer(char_p, length)[:] if char_p != ffi.NULL else b"" + py_callback(int(ret), msg) + + return CallbackType(c_cb) + + @classmethod + def create_node( + cls, + config: dict, + event_cb=None, + *, + timeout_s: float = 20.0, + ) -> Result["NodeWrapper", str]: + config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False) + config_buffer = ffi.new("char[]", config_json.encode("utf-8")) + + state = _new_cb_state() + create_cb = cls._make_waiting_cb(state) + + ctx = lib.logosdelivery_create_node( + config_buffer, + create_cb, + ffi.NULL, + ) + + if ctx == ffi.NULL: + return Err("create_node: ctx is NULL") + + wait_result = _wait_cb(state, "create_node", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) + + event_cb_handler = None + if event_cb is not None: + event_cb_handler = cls._make_event_cb(event_cb) + lib.logosdelivery_set_event_callback( + ctx, + event_cb_handler, + ffi.NULL, + ) + + return Ok(cls(ctx, config_buffer, event_cb_handler)) + + @classmethod + def create_and_start( + cls, + config: dict, + event_cb=None, + *, + timeout_s: float = 20.0, + ) -> Result["NodeWrapper", str]: + node_result = cls.create_node( + config=config, + event_cb=event_cb, + timeout_s=timeout_s, + ) + if node_result.is_err(): + return Err(node_result.err()) + + node = node_result.ok_value + start_result = node.start_node(timeout_s=timeout_s) + if start_result.is_err(): + return Err(start_result.err()) + + return Ok(node) + + def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_start_node(self.ctx, cb, ffi.NULL) + if rc != 0: + return Err(f"start_node: immediate call failed (ret={rc})") + + return _wait_cb(state, "start_node", timeout_s) + + def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_stop_node(self.ctx, cb, ffi.NULL) + if rc != 0: + return Err(f"stop_node: immediate call failed (ret={rc})") + + return _wait_cb(state, "stop_node", timeout_s) + + def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_destroy(self.ctx, cb, ffi.NULL) + if rc != 0: + return Err(f"destroy: immediate call failed (ret={rc})") + + return _wait_cb(state, "destroy", timeout_s) + + def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: + stop_result = self.stop_node(timeout_s=timeout_s) + if stop_result.is_err(): + return Err(stop_result.err()) + + return self.destroy(timeout_s=timeout_s) + + def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_subscribe( + self.ctx, + cb, + ffi.NULL, + content_topic.encode("utf-8"), + ) + if rc != 0: + return Err(f"subscribe_content_topic: immediate call failed (ret={rc})") + + return _wait_cb(state, f"subscribe({content_topic})", timeout_s) + + def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_unsubscribe( + self.ctx, + cb, + ffi.NULL, + content_topic.encode("utf-8"), + ) + if rc != 0: + return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})") + + return _wait_cb(state, f"unsubscribe({content_topic})", timeout_s) \ No newline at end of file