From 456b3d273d2adad910fd8d77ef35822bda89d7aa Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Thu, 19 Feb 2026 02:13:55 +0100 Subject: [PATCH 01/11] Add wrapper for first function --- waku/wrapper.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 waku/wrapper.py diff --git a/waku/wrapper.py b/waku/wrapper.py new file mode 100644 index 0000000..e69de29 From 0f401533274f3248c07ccb29bac6346c7826eadd Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Thu, 19 Feb 2026 13:22:44 +0100 Subject: [PATCH 02/11] Add first function wrapper --- waku/wrapper.py | 52 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/waku/wrapper.py b/waku/wrapper.py index e69de29..f2cba26 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -0,0 +1,52 @@ +from cffi import FFI +from pathlib import Path +import json + + +config = { + "relay": True, + "discv5Discovery": True, + "peerExchange": True, + "clusterId": 3, + "shard": 0, + "rlnRelay": False +} +config_json1 = json.dumps(config) +ffi = FFI() + +_repo_root = Path(__file__).resolve().parents[1] +lib = ffi.dlopen(str(_repo_root / "lib" / "liblogosdelivery.so")) + +ffi.cdef(""" + typedef void (*FFICallBack)(int callerRet, const char *msg, size_t len, void *userData); + + void *logosdelivery_create_node( + const char *configJson, + FFICallBack callback, + void *userData + ); +""") + +def process_callback(ret, char_p, length, callback): + byte_string = ffi.buffer(char_p, length)[:] if char_p != ffi.NULL and length else b"" + callback(ret, byte_string) + +CallbackType = ffi.callback("void(int, const char*, size_t, void*)") + +def logosdelivery_create_node(config_json, callback): + def cb(ret, char_p, length, userData): + process_callback(ret, char_p, length, callback) + + return lib.logosdelivery_create_node( + config_json.encode("utf-8"), + CallbackType(cb), + ffi.cast("void*", 0), + ) + +if __name__ == "__main__": + def cb(ret, msg): + print("ret:", ret) + print("msg:", msg) + + ctx = logosdelivery_create_node(config_json1, cb) + print("ctx:", ctx) \ No newline at end of file From cab26c889fc3abb82487a520ec9a143be52a5c93 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Thu, 19 Feb 2026 14:47:08 +0100 Subject: [PATCH 03/11] test create node API and add start node API --- waku/wrapper.py | 91 +++++++++++++++++++++++++++++++------------------ 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/waku/wrapper.py b/waku/wrapper.py index f2cba26..3403270 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -1,52 +1,75 @@ +import json from cffi import FFI from pathlib import Path -import json - -config = { - "relay": True, - "discv5Discovery": True, - "peerExchange": True, - "clusterId": 3, - "shard": 0, - "rlnRelay": False -} -config_json1 = json.dumps(config) ffi = FFI() +ffi.cdef(""" +typedef void (*FFICallBack)(int callerRet, const char *msg, size_t len, void *userData); + +void *logosdelivery_create_node( + const char *configJson, + FFICallBack callback, + void *userData +); + +int logosdelivery_start_node( + void *ctx, + FFICallBack callback, + void *userData +); +""") + _repo_root = Path(__file__).resolve().parents[1] lib = ffi.dlopen(str(_repo_root / "lib" / "liblogosdelivery.so")) -ffi.cdef(""" - typedef void (*FFICallBack)(int callerRet, const char *msg, size_t len, void *userData); - - void *logosdelivery_create_node( - const char *configJson, - FFICallBack callback, - void *userData - ); -""") - -def process_callback(ret, char_p, length, callback): - byte_string = ffi.buffer(char_p, length)[:] if char_p != ffi.NULL and length else b"" - callback(ret, byte_string) - CallbackType = ffi.callback("void(int, const char*, size_t, void*)") -def logosdelivery_create_node(config_json, callback): - def cb(ret, char_p, length, userData): - process_callback(ret, char_p, length, callback) +class NodeHandle: + def __init__(self, ctx, cb_handle): + self.ctx = ctx + self._cb_handle = cb_handle # keep callback alive - return lib.logosdelivery_create_node( - config_json.encode("utf-8"), - CallbackType(cb), - ffi.cast("void*", 0), +def logosdelivery_create_node(config: dict, py_callback): + config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False) + cnfig_bytes = config_json.encode("utf-8") + + def c_cb(ret, char_p, length, userData): + if char_p != ffi.NULL and length : + msg = ffi.buffer(char_p, length)[:] + else : + msg = b"" + py_callback(ret, msg) + + cb_handle = CallbackType(c_cb) + ctx = lib.logosdelivery_create_node( + cnfig_bytes, + cb_handle, + ffi.NULL, ) + return NodeHandle(ctx, cb_handle) if __name__ == "__main__": + config = { + "logLevel": "DEBUG", + "mode": "Core", + "protocolsConfig": { + "entryNodes": [ + "/dns4/node-01.do-ams3.misc.logos-chat.status.im/tcp/30303/p2p/16Uiu2HAkxoqUTud5LUPQBRmkeL2xP4iKx2kaABYXomQRgmLUgf78" + ], + "clusterId": 3, + "autoShardingConfig": {"numShardsInCluster": 8}, + }, + "networkingConfig": { + "listenIpv4": "0.0.0.0", + "p2pTcpPort": 60000, + "discv5UdpPort": 9000, + }, + } + def cb(ret, msg): print("ret:", ret) print("msg:", msg) - ctx = logosdelivery_create_node(config_json1, cb) - print("ctx:", ctx) \ No newline at end of file + h = logosdelivery_create_node(config, cb) + print("ctx:", h.ctx) From 36a8b2945d3592d3dd77fd240623a1116efcc92c Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Sun, 22 Feb 2026 19:55:07 +0100 Subject: [PATCH 04/11] Add rest of wrappers --- waku/wrapper.py | 176 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 152 insertions(+), 24 deletions(-) diff --git a/waku/wrapper.py b/waku/wrapper.py index 3403270..f338942 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -18,6 +18,38 @@ int logosdelivery_start_node( FFICallBack callback, void *userData ); + +int logosdelivery_stop_node( + void *ctx, + FFICallBack callback, + void *userData +); + +void logosdelivery_set_event_callback( + void *ctx, + FFICallBack callback, + void *userData +); + +int logosdelivery_destroy( + void *ctx, + FFICallBack callback, + void *userData +); + +int logosdelivery_subscribe( + void *ctx, + FFICallBack callback, + void *userData, + const char *contentTopic +); + +int logosdelivery_unsubscribe( + void *ctx, + FFICallBack callback, + void *userData, + const char *contentTopic +); """) _repo_root = Path(__file__).resolve().parents[1] @@ -25,29 +57,112 @@ lib = ffi.dlopen(str(_repo_root / "lib" / "liblogosdelivery.so")) CallbackType = ffi.callback("void(int, const char*, size_t, void*)") -class NodeHandle: - def __init__(self, ctx, cb_handle): + +class NodeWrapper: + def __init__(self, ctx, config_buffer): self.ctx = ctx - self._cb_handle = cb_handle # keep callback alive + self._config_buffer = config_buffer + self._event_cb_handler = None -def logosdelivery_create_node(config: dict, py_callback): - config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False) - cnfig_bytes = config_json.encode("utf-8") - - def c_cb(ret, char_p, length, userData): - if char_p != ffi.NULL and length : + @staticmethod + def _make_cb(py_callback): + def c_cb(ret, char_p, length, userData): msg = ffi.buffer(char_p, length)[:] - else : - msg = b"" - py_callback(ret, msg) + py_callback(ret, msg) + + return CallbackType(c_cb) + + @classmethod + def create_node(cls, config: dict, py_callback): + config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False) + config_buffer = ffi.new("char[]", config_json.encode("utf-8")) + + cb = cls._make_cb(py_callback) + + ctx = lib.logosdelivery_create_node( + config_buffer, + cb, + ffi.NULL, + ) + + return cls(ctx, config_buffer) + + def start_node(self, py_callback): + cb = self._make_cb(py_callback) + + ret = lib.logosdelivery_start_node( + self.ctx, + cb, + ffi.NULL, + ) + + return int(ret) + + @classmethod + def create_and_start(cls, config: dict, create_cb, start_cb): + node = cls.create_node(config, create_cb) + rc = node.start_node(start_cb) + return node, rc + + def stop_node(self, py_callback): + cb = self._make_cb(py_callback) + + ret = lib.logosdelivery_stop_node( + self.ctx, + cb, + ffi.NULL, + ) + + return int(ret) + + self._event_cb_handler = cb + + def destroy(self, py_callback): + cb = self._make_cb(py_callback) + + ret = lib.logosdelivery_destroy( + self.ctx, + cb, + ffi.NULL, + ) + + return int(ret) + + def stop_and_destroy(self, callback): + stop_rc = self.stop_node(callback) + if stop_rc != 0: + raise RuntimeError(f"Stop failed (ret={stop_rc})") + + destroy_rc = self.destroy(callback) + if destroy_rc != 0: + raise RuntimeError(f"Destroy failed (ret={destroy_rc})") + + return 0 + + def subscribe_content_topic(self, content_topic: str, py_callback): + cb = self._make_cb(py_callback) + + ret = lib.logosdelivery_subscribe( + self.ctx, + cb, + ffi.NULL, + content_topic.encode("utf-8"), + ) + + return int(ret) + + def unsubscribe_content_topic(self, content_topic: str, py_callback): + cb = self._make_cb(py_callback) + + ret = lib.logosdelivery_unsubscribe( + self.ctx, + cb, + ffi.NULL, + content_topic.encode("utf-8"), + ) + + return int(ret) - cb_handle = CallbackType(c_cb) - ctx = lib.logosdelivery_create_node( - cnfig_bytes, - cb_handle, - ffi.NULL, - ) - return NodeHandle(ctx, cb_handle) if __name__ == "__main__": config = { @@ -57,7 +172,7 @@ if __name__ == "__main__": "entryNodes": [ "/dns4/node-01.do-ams3.misc.logos-chat.status.im/tcp/30303/p2p/16Uiu2HAkxoqUTud5LUPQBRmkeL2xP4iKx2kaABYXomQRgmLUgf78" ], - "clusterId": 3, + "clusterId": 42, "autoShardingConfig": {"numShardsInCluster": 8}, }, "networkingConfig": { @@ -68,8 +183,21 @@ if __name__ == "__main__": } def cb(ret, msg): - print("ret:", ret) - print("msg:", msg) + print("ret:", ret, "msg:", msg) - h = logosdelivery_create_node(config, cb) - print("ctx:", h.ctx) + node = NodeWrapper.create_node(config, cb) + rc = node.start_node(cb) + print("start rc:", rc) + + topic = "/myapp/1/chat/proto" + rc = node.subscribe(topic, cb) + print("subscribe rc:", rc) + + rc = node.unsubscribe(topic, cb) + print("unsubscribe rc:", rc) + + rc = node.stop_node(cb) + print("stop rc:", rc) + + rc = node.destroy(cb) + print("destroy rc:", rc) \ No newline at end of file From 1f67f2ee5b93ae3a8b61e73ac9ba7edb2bd8f2f6 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Tue, 24 Feb 2026 06:44:14 +0100 Subject: [PATCH 05/11] Finalize wrappers --- waku/wrapper.py | 46 +++++++++++----------------------------------- 1 file changed, 11 insertions(+), 35 deletions(-) diff --git a/waku/wrapper.py b/waku/wrapper.py index f338942..a354320 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -163,41 +163,17 @@ class NodeWrapper: return int(ret) + def set_event_callback(self, py_callback): + def c_cb(ret, char_p, length, userData): + msg = ffi.buffer(char_p, length)[:] + py_callback(ret, msg) -if __name__ == "__main__": - config = { - "logLevel": "DEBUG", - "mode": "Core", - "protocolsConfig": { - "entryNodes": [ - "/dns4/node-01.do-ams3.misc.logos-chat.status.im/tcp/30303/p2p/16Uiu2HAkxoqUTud5LUPQBRmkeL2xP4iKx2kaABYXomQRgmLUgf78" - ], - "clusterId": 42, - "autoShardingConfig": {"numShardsInCluster": 8}, - }, - "networkingConfig": { - "listenIpv4": "0.0.0.0", - "p2pTcpPort": 60000, - "discv5UdpPort": 9000, - }, - } + cb = CallbackType(c_cb) - def cb(ret, msg): - print("ret:", ret, "msg:", msg) + lib.logosdelivery_set_event_callback( + self.ctx, + cb, + ffi.NULL, + ) - node = NodeWrapper.create_node(config, cb) - rc = node.start_node(cb) - print("start rc:", rc) - - topic = "/myapp/1/chat/proto" - rc = node.subscribe(topic, cb) - print("subscribe rc:", rc) - - rc = node.unsubscribe(topic, cb) - print("unsubscribe rc:", rc) - - rc = node.stop_node(cb) - print("stop rc:", rc) - - rc = node.destroy(cb) - print("destroy rc:", rc) \ No newline at end of file + self._event_cb_handler = cb From a42e4b7885f045735499d8d8caea1f891b30b66f Mon Sep 17 00:00:00 2001 From: AYAHASSAN287 <49167455+AYAHASSAN287@users.noreply.github.com> Date: Sat, 28 Feb 2026 21:20:50 +0100 Subject: [PATCH 06/11] Update waku/wrapper.py Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --- waku/wrapper.py | 1 - 1 file changed, 1 deletion(-) diff --git a/waku/wrapper.py b/waku/wrapper.py index a354320..8532c58 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -57,7 +57,6 @@ lib = ffi.dlopen(str(_repo_root / "lib" / "liblogosdelivery.so")) CallbackType = ffi.callback("void(int, const char*, size_t, void*)") - class NodeWrapper: def __init__(self, ctx, config_buffer): self.ctx = ctx From 1eba34701d988960a0db339c3262d0aa4906e749 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Sun, 1 Mar 2026 17:14:22 +0100 Subject: [PATCH 07/11] replace submodule in vendor --- .gitmodules | 6 +++--- vendor/logos-delivery | 1 + vendor/nwaku | 1 - 3 files changed, 4 insertions(+), 4 deletions(-) create mode 160000 vendor/logos-delivery delete mode 160000 vendor/nwaku diff --git a/.gitmodules b/.gitmodules index a520c21..56513bd 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ -[submodule "vendor/nwaku"] - path = vendor/nwaku - url = git@github.com:waku-org/nwaku.git +[submodule "vendor/logos-delivery"] + path = vendor/logos-delivery + url = git@github.com:logos-messaging/logos-delivery.git diff --git a/vendor/logos-delivery b/vendor/logos-delivery new file mode 160000 index 0000000..ba85873 --- /dev/null +++ b/vendor/logos-delivery @@ -0,0 +1 @@ +Subproject commit ba85873f03a1da6ab04287949849815fd97b7bfd diff --git a/vendor/nwaku b/vendor/nwaku deleted file mode 160000 index f1d14e9..0000000 --- a/vendor/nwaku +++ /dev/null @@ -1 +0,0 @@ -Subproject commit f1d14e9942fc44168d5755cd5a7f0123da8c5649 From 263e015bfc60145fb7952baa0faa63e77d3f91ce Mon Sep 17 00:00:00 2001 From: AYAHASSAN287 <49167455+AYAHASSAN287@users.noreply.github.com> Date: Mon, 2 Mar 2026 10:02:07 +0100 Subject: [PATCH 08/11] Update README.md --- README.md | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 003c2ca..baede2a 100644 --- a/README.md +++ b/README.md @@ -82,17 +82,15 @@ Then, follow the following steps from the root folder to rebuild the `libwaku.so` library: ```bash -cd vendor/nwaku +cd vendor/liblogosdelivery ``` ```bash -make libwaku -j8 +make liblogosdelivery -j8 ``` ```bash cd ../../ ``` ```bash -cp vendor/nwaku/build/libwaku.so lib/ +cp vendor/liblogosdelivery/build/liblogosdelivery.so lib/ ``` -Notice that the `libwaku.so` library is also distributed within -the `Py-Waku` package. From 221af4f84331211e4a62cdfee032aca377161437 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Mon, 2 Mar 2026 10:11:56 +0100 Subject: [PATCH 09/11] Make changes to allow callback assertion inside each API --- waku/wrapper.py | 216 +++++++++++++++++++++++++++++------------------- 1 file changed, 129 insertions(+), 87 deletions(-) diff --git a/waku/wrapper.py b/waku/wrapper.py index 8532c58..aaf5667 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -1,10 +1,15 @@ import json +import threading +import logging from cffi import FFI from pathlib import Path +logger = logging.getLogger(__name__) + ffi = FFI() -ffi.cdef(""" +ffi.cdef( + """ typedef void (*FFICallBack)(int callerRet, const char *msg, size_t len, void *userData); void *logosdelivery_create_node( @@ -50,129 +55,166 @@ int logosdelivery_unsubscribe( void *userData, const char *contentTopic ); -""") +""" +) _repo_root = Path(__file__).resolve().parents[1] lib = ffi.dlopen(str(_repo_root / "lib" / "liblogosdelivery.so")) CallbackType = ffi.callback("void(int, const char*, size_t, void*)") + +def _new_cb_state(): + return { + "done": threading.Event(), + "ret": None, + "msg": None, + } + + +def _wait_cb(state, op_name: str, timeout_s: float = 20.0): + finished = state["done"].wait(timeout_s) + if not finished: + raise TimeoutError(f"{op_name}: timeout waiting for callback after {timeout_s}s") + + ret = state["ret"] + msg = state["msg"] or b"" + + if ret is None: + raise RuntimeError(f"{op_name}: callback fired but ret is None") + + if ret != 0: + raise RuntimeError(f"{op_name}: failed (ret={ret}) msg={msg!r}") + + class NodeWrapper: - def __init__(self, ctx, config_buffer): + def __init__(self, ctx, config_buffer, event_cb_handler): self.ctx = ctx self._config_buffer = config_buffer - self._event_cb_handler = None + self._event_cb_handler = event_cb_handler @staticmethod - def _make_cb(py_callback): + def _make_cb(py_callback, state=None): def c_cb(ret, char_p, length, userData): msg = ffi.buffer(char_p, length)[:] - py_callback(ret, msg) + + if state is not None and not state["done"].is_set(): + state["ret"] = int(ret) + state["msg"] = msg + state["done"].set() + + if py_callback is not None: + py_callback(int(ret), msg) return CallbackType(c_cb) @classmethod - def create_node(cls, config: dict, py_callback): + def create_node( + cls, + config: dict, + create_cb=None, + event_cb=None, + *, + timeout_s: float = 20.0, + ): config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False) config_buffer = ffi.new("char[]", config_json.encode("utf-8")) - cb = cls._make_cb(py_callback) + state = _new_cb_state() + create_c_cb = cls._make_cb(create_cb, state) ctx = lib.logosdelivery_create_node( config_buffer, - cb, + create_c_cb, ffi.NULL, ) - return cls(ctx, config_buffer) + if ctx == ffi.NULL: + raise RuntimeError("create_node: ctx is NULL") - def start_node(self, py_callback): - cb = self._make_cb(py_callback) + _wait_cb(state, "create_node", timeout_s) - ret = lib.logosdelivery_start_node( - self.ctx, - cb, - ffi.NULL, - ) + event_cb_handler = None + if event_cb is not None: + event_cb_handler = cls._make_cb(event_cb, state=None) + lib.logosdelivery_set_event_callback( + ctx, + event_cb_handler, + ffi.NULL, + ) - return int(ret) + return cls(ctx, config_buffer, event_cb_handler) - @classmethod - def create_and_start(cls, config: dict, create_cb, start_cb): - node = cls.create_node(config, create_cb) - rc = node.start_node(start_cb) - return node, rc + def start_node(self, start_cb=None, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_cb(start_cb, state) - def stop_node(self, py_callback): - cb = self._make_cb(py_callback) - - ret = lib.logosdelivery_stop_node( - self.ctx, - cb, - ffi.NULL, - ) - - return int(ret) - - self._event_cb_handler = cb - - def destroy(self, py_callback): - cb = self._make_cb(py_callback) - - ret = lib.logosdelivery_destroy( - self.ctx, - cb, - ffi.NULL, - ) - - return int(ret) - - def stop_and_destroy(self, callback): - stop_rc = self.stop_node(callback) - if stop_rc != 0: - raise RuntimeError(f"Stop failed (ret={stop_rc})") - - destroy_rc = self.destroy(callback) - if destroy_rc != 0: - raise RuntimeError(f"Destroy failed (ret={destroy_rc})") + rc = int(lib.logosdelivery_start_node(self.ctx, cb, ffi.NULL)) + if rc != 0: + raise RuntimeError(f"start_node: immediate call failed (ret={rc})") + _wait_cb(state, "start_node", timeout_s) return 0 - - def subscribe_content_topic(self, content_topic: str, py_callback): - cb = self._make_cb(py_callback) - ret = lib.logosdelivery_subscribe( - self.ctx, - cb, - ffi.NULL, - content_topic.encode("utf-8"), + def stop_node(self, stop_cb=None, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_cb(stop_cb, state) + + rc = int(lib.logosdelivery_stop_node(self.ctx, cb, ffi.NULL)) + if rc != 0: + raise RuntimeError(f"stop_node: immediate call failed (ret={rc})") + + _wait_cb(state, "stop_node", timeout_s) + return 0 + + def destroy(self, destroy_cb=None, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_cb(destroy_cb, state) + + rc = int(lib.logosdelivery_destroy(self.ctx, cb, ffi.NULL)) + if rc != 0: + raise RuntimeError(f"destroy: immediate call failed (ret={rc})") + + _wait_cb(state, "destroy", timeout_s) + return 0 + + def stop_and_destroy(self, cb=None, *, timeout_s: float = 20.0): + self.stop_node(stop_cb=cb, timeout_s=timeout_s) + self.destroy(destroy_cb=cb, timeout_s=timeout_s) + return 0 + + def subscribe_content_topic(self, content_topic: str, subscribe_cb=None, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_cb(subscribe_cb, state) + + rc = int( + lib.logosdelivery_subscribe( + self.ctx, + cb, + ffi.NULL, + content_topic.encode("utf-8"), + ) ) + if rc != 0: + raise RuntimeError(f"subscribe_content_topic: immediate call failed (ret={rc})") - return int(ret) + _wait_cb(state, f"subscribe({content_topic})", timeout_s) + return 0 - def unsubscribe_content_topic(self, content_topic: str, py_callback): - cb = self._make_cb(py_callback) + def unsubscribe_content_topic(self, content_topic: str, unsubscribe_cb=None, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_cb(unsubscribe_cb, state) - ret = lib.logosdelivery_unsubscribe( - self.ctx, - cb, - ffi.NULL, - content_topic.encode("utf-8"), + rc = int( + lib.logosdelivery_unsubscribe( + self.ctx, + cb, + ffi.NULL, + content_topic.encode("utf-8"), + ) ) + if rc != 0: + raise RuntimeError(f"unsubscribe_content_topic: immediate call failed (ret={rc})") - return int(ret) - - def set_event_callback(self, py_callback): - def c_cb(ret, char_p, length, userData): - msg = ffi.buffer(char_p, length)[:] - py_callback(ret, msg) - - cb = CallbackType(c_cb) - - lib.logosdelivery_set_event_callback( - self.ctx, - cb, - ffi.NULL, - ) - - self._event_cb_handler = cb + _wait_cb(state, f"unsubscribe({content_topic})", timeout_s) + return 0 \ No newline at end of file From 2ab1500aad3697dcbd5dabc602cbe89dc5c9bb53 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Thu, 5 Mar 2026 20:34:12 +0100 Subject: [PATCH 10/11] Update wrapper with more short version --- waku/wrapper.py | 99 +++++++++++++++++++++++++++++-------------------- 1 file changed, 58 insertions(+), 41 deletions(-) diff --git a/waku/wrapper.py b/waku/wrapper.py index aaf5667..fb3e8b6 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -94,17 +94,22 @@ class NodeWrapper: self._event_cb_handler = event_cb_handler @staticmethod - def _make_cb(py_callback, state=None): + def _make_waiting_cb(state): def c_cb(ret, char_p, length, userData): - msg = ffi.buffer(char_p, length)[:] + msg = ffi.buffer(char_p, length)[:] if char_p != ffi.NULL else b"" - if state is not None and not state["done"].is_set(): + if not state["done"].is_set(): state["ret"] = int(ret) state["msg"] = msg state["done"].set() - if py_callback is not None: - py_callback(int(ret), msg) + return CallbackType(c_cb) + + @staticmethod + def _make_event_cb(py_callback): + def c_cb(ret, char_p, length, userData): + msg = ffi.buffer(char_p, length)[:] if char_p != ffi.NULL else b"" + py_callback(int(ret), msg) return CallbackType(c_cb) @@ -112,7 +117,6 @@ class NodeWrapper: def create_node( cls, config: dict, - create_cb=None, event_cb=None, *, timeout_s: float = 20.0, @@ -121,11 +125,11 @@ class NodeWrapper: config_buffer = ffi.new("char[]", config_json.encode("utf-8")) state = _new_cb_state() - create_c_cb = cls._make_cb(create_cb, state) + create_cb = cls._make_waiting_cb(state) ctx = lib.logosdelivery_create_node( config_buffer, - create_c_cb, + create_cb, ffi.NULL, ) @@ -136,7 +140,7 @@ class NodeWrapper: event_cb_handler = None if event_cb is not None: - event_cb_handler = cls._make_cb(event_cb, state=None) + event_cb_handler = cls._make_event_cb(event_cb) lib.logosdelivery_set_event_callback( ctx, event_cb_handler, @@ -145,55 +149,69 @@ class NodeWrapper: return cls(ctx, config_buffer, event_cb_handler) - def start_node(self, start_cb=None, *, timeout_s: float = 20.0): - state = _new_cb_state() - cb = self._make_cb(start_cb, state) + @classmethod + def create_and_start( + cls, + config: dict, + event_cb=None, + *, + timeout_s: float = 20.0, + ): + node = cls.create_node( + config=config, + event_cb=event_cb, + timeout_s=timeout_s, + ) + node.start_node(timeout_s=timeout_s) + return node - rc = int(lib.logosdelivery_start_node(self.ctx, cb, ffi.NULL)) + def start_node(self, *, timeout_s: float = 20.0): + state = _new_cb_state() + cb = self._make_waiting_cb(state) + + rc = lib.logosdelivery_start_node(self.ctx, cb, ffi.NULL) if rc != 0: raise RuntimeError(f"start_node: immediate call failed (ret={rc})") _wait_cb(state, "start_node", timeout_s) return 0 - def stop_node(self, stop_cb=None, *, timeout_s: float = 20.0): + def stop_node(self, *, timeout_s: float = 20.0): state = _new_cb_state() - cb = self._make_cb(stop_cb, state) + cb = self._make_waiting_cb(state) - rc = int(lib.logosdelivery_stop_node(self.ctx, cb, ffi.NULL)) + rc = lib.logosdelivery_stop_node(self.ctx, cb, ffi.NULL) if rc != 0: raise RuntimeError(f"stop_node: immediate call failed (ret={rc})") _wait_cb(state, "stop_node", timeout_s) return 0 - def destroy(self, destroy_cb=None, *, timeout_s: float = 20.0): + def destroy(self, *, timeout_s: float = 20.0): state = _new_cb_state() - cb = self._make_cb(destroy_cb, state) + cb = self._make_waiting_cb(state) - rc = int(lib.logosdelivery_destroy(self.ctx, cb, ffi.NULL)) + rc = lib.logosdelivery_destroy(self.ctx, cb, ffi.NULL) if rc != 0: raise RuntimeError(f"destroy: immediate call failed (ret={rc})") _wait_cb(state, "destroy", timeout_s) return 0 - def stop_and_destroy(self, cb=None, *, timeout_s: float = 20.0): - self.stop_node(stop_cb=cb, timeout_s=timeout_s) - self.destroy(destroy_cb=cb, timeout_s=timeout_s) + def stop_and_destroy(self, *, timeout_s: float = 20.0): + self.stop_node(timeout_s=timeout_s) + self.destroy(timeout_s=timeout_s) return 0 - def subscribe_content_topic(self, content_topic: str, subscribe_cb=None, *, timeout_s: float = 20.0): + def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): state = _new_cb_state() - cb = self._make_cb(subscribe_cb, state) + cb = self._make_waiting_cb(state) - rc = int( - lib.logosdelivery_subscribe( - self.ctx, - cb, - ffi.NULL, - content_topic.encode("utf-8"), - ) + rc = lib.logosdelivery_subscribe( + self.ctx, + cb, + ffi.NULL, + content_topic.encode("utf-8"), ) if rc != 0: raise RuntimeError(f"subscribe_content_topic: immediate call failed (ret={rc})") @@ -201,20 +219,19 @@ class NodeWrapper: _wait_cb(state, f"subscribe({content_topic})", timeout_s) return 0 - def unsubscribe_content_topic(self, content_topic: str, unsubscribe_cb=None, *, timeout_s: float = 20.0): + def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): state = _new_cb_state() - cb = self._make_cb(unsubscribe_cb, state) + cb = self._make_waiting_cb(state) - rc = int( - lib.logosdelivery_unsubscribe( - self.ctx, - cb, - ffi.NULL, - content_topic.encode("utf-8"), - ) + rc = lib.logosdelivery_unsubscribe( + self.ctx, + cb, + ffi.NULL, + content_topic.encode("utf-8"), ) if rc != 0: raise RuntimeError(f"unsubscribe_content_topic: immediate call failed (ret={rc})") _wait_cb(state, f"unsubscribe({content_topic})", timeout_s) - return 0 \ No newline at end of file + + return 0 From cb62f65f43214c7f70212e0a1a29532a0009ce60 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Mon, 9 Mar 2026 18:35:59 +0100 Subject: [PATCH 11/11] add resullt , ok , err --- waku/wrapper.py | 82 +++++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/waku/wrapper.py b/waku/wrapper.py index fb3e8b6..55bf93e 100644 --- a/waku/wrapper.py +++ b/waku/wrapper.py @@ -3,6 +3,7 @@ import threading import logging from cffi import FFI from pathlib import Path +from result import Result, Ok, Err logger = logging.getLogger(__name__) @@ -72,19 +73,21 @@ def _new_cb_state(): } -def _wait_cb(state, op_name: str, timeout_s: float = 20.0): +def _wait_cb(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: finished = state["done"].wait(timeout_s) if not finished: - raise TimeoutError(f"{op_name}: timeout waiting for callback after {timeout_s}s") + return Err(f"{op_name}: timeout waiting for callback after {timeout_s}s") ret = state["ret"] msg = state["msg"] or b"" if ret is None: - raise RuntimeError(f"{op_name}: callback fired but ret is None") + return Err(f"{op_name}: callback fired but ret is None") if ret != 0: - raise RuntimeError(f"{op_name}: failed (ret={ret}) msg={msg!r}") + return Err(f"{op_name}: failed (ret={ret}) msg={msg!r}") + + return Ok(ret) class NodeWrapper: @@ -120,7 +123,7 @@ class NodeWrapper: event_cb=None, *, timeout_s: float = 20.0, - ): + ) -> Result["NodeWrapper", str]: config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False) config_buffer = ffi.new("char[]", config_json.encode("utf-8")) @@ -134,9 +137,11 @@ class NodeWrapper: ) if ctx == ffi.NULL: - raise RuntimeError("create_node: ctx is NULL") + return Err("create_node: ctx is NULL") - _wait_cb(state, "create_node", timeout_s) + wait_result = _wait_cb(state, "create_node", timeout_s) + if wait_result.is_err(): + return Err(wait_result.err()) event_cb_handler = None if event_cb is not None: @@ -147,7 +152,7 @@ class NodeWrapper: ffi.NULL, ) - return cls(ctx, config_buffer, event_cb_handler) + return Ok(cls(ctx, config_buffer, event_cb_handler)) @classmethod def create_and_start( @@ -156,54 +161,60 @@ class NodeWrapper: event_cb=None, *, timeout_s: float = 20.0, - ): - node = cls.create_node( + ) -> Result["NodeWrapper", str]: + node_result = cls.create_node( config=config, event_cb=event_cb, timeout_s=timeout_s, ) - node.start_node(timeout_s=timeout_s) - return node + if node_result.is_err(): + return Err(node_result.err()) - def start_node(self, *, timeout_s: float = 20.0): + node = node_result.ok_value + start_result = node.start_node(timeout_s=timeout_s) + if start_result.is_err(): + return Err(start_result.err()) + + return Ok(node) + + def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) rc = lib.logosdelivery_start_node(self.ctx, cb, ffi.NULL) if rc != 0: - raise RuntimeError(f"start_node: immediate call failed (ret={rc})") + return Err(f"start_node: immediate call failed (ret={rc})") - _wait_cb(state, "start_node", timeout_s) - return 0 + return _wait_cb(state, "start_node", timeout_s) - def stop_node(self, *, timeout_s: float = 20.0): + def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) rc = lib.logosdelivery_stop_node(self.ctx, cb, ffi.NULL) if rc != 0: - raise RuntimeError(f"stop_node: immediate call failed (ret={rc})") + return Err(f"stop_node: immediate call failed (ret={rc})") - _wait_cb(state, "stop_node", timeout_s) - return 0 + return _wait_cb(state, "stop_node", timeout_s) - def destroy(self, *, timeout_s: float = 20.0): + def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) rc = lib.logosdelivery_destroy(self.ctx, cb, ffi.NULL) if rc != 0: - raise RuntimeError(f"destroy: immediate call failed (ret={rc})") + return Err(f"destroy: immediate call failed (ret={rc})") - _wait_cb(state, "destroy", timeout_s) - return 0 + return _wait_cb(state, "destroy", timeout_s) - def stop_and_destroy(self, *, timeout_s: float = 20.0): - self.stop_node(timeout_s=timeout_s) - self.destroy(timeout_s=timeout_s) - return 0 + def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: + stop_result = self.stop_node(timeout_s=timeout_s) + if stop_result.is_err(): + return Err(stop_result.err()) - def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): + return self.destroy(timeout_s=timeout_s) + + def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) @@ -214,12 +225,11 @@ class NodeWrapper: content_topic.encode("utf-8"), ) if rc != 0: - raise RuntimeError(f"subscribe_content_topic: immediate call failed (ret={rc})") + return Err(f"subscribe_content_topic: immediate call failed (ret={rc})") - _wait_cb(state, f"subscribe({content_topic})", timeout_s) - return 0 + return _wait_cb(state, f"subscribe({content_topic})", timeout_s) - def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): + def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: state = _new_cb_state() cb = self._make_waiting_cb(state) @@ -230,8 +240,6 @@ class NodeWrapper: content_topic.encode("utf-8"), ) if rc != 0: - raise RuntimeError(f"unsubscribe_content_topic: immediate call failed (ret={rc})") + return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})") - _wait_cb(state, f"unsubscribe({content_topic})", timeout_s) - - return 0 + return _wait_cb(state, f"unsubscribe({content_topic})", timeout_s) \ No newline at end of file