Merge pull request #4 from logos-messaging/wrappers_part2

Adding wrapper for new APIs
This commit is contained in:
AYAHASSAN287 2026-04-07 17:16:07 +02:00 committed by GitHub
commit 1cae276b4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 172 additions and 24 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@ venv/
dist/
waku.egg-info/
waku/__pycache__/

@ -1 +0,0 @@
Subproject commit ba85873f03a1da6ab04287949849815fd97b7bfd

1
vendor/nwaku vendored Submodule

@ -0,0 +1 @@
Subproject commit f1d14e9942fc44168d5755cd5a7f0123da8c5649

View File

@ -1,12 +1,10 @@
import json
import threading
import logging
from cffi import FFI
from pathlib import Path
from result import Result, Ok, Err
logger = logging.getLogger(__name__)
ffi = FFI()
ffi.cdef(
@ -56,6 +54,32 @@ int logosdelivery_unsubscribe(
void *userData,
const char *contentTopic
);
int logosdelivery_send(
void *ctx,
FFICallBack callback,
void *userData,
const char *messageJson
);
int logosdelivery_get_available_node_info_ids(
void *ctx,
FFICallBack callback,
void *userData
);
int logosdelivery_get_node_info(
void *ctx,
FFICallBack callback,
void *userData,
const char *nodeInfoId
);
int logosdelivery_get_available_configs(
void *ctx,
FFICallBack callback,
void *userData
);
"""
)
@ -69,25 +93,35 @@ def _new_cb_state():
return {
"done": threading.Event(),
"ret": None,
"msg": None,
"msg": b"",
}
def _wait_cb(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]:
finished = state["done"].wait(timeout_s)
if not finished:
return Err(f"{op_name}: timeout waiting for callback after {timeout_s}s")
def _wait_cb_raw(
state,
op_name: str,
timeout_s: float = 20.0,
) -> Result[tuple[int, bytes], str]:
ok = state["done"].wait(timeout_s)
if not ok:
return Err(f"{op_name}: timeout after {timeout_s}s")
ret = state["ret"]
msg = state["msg"] or b""
if state["ret"] is None:
return Err(f"{op_name}: callback ret is None")
if ret is None:
return Err(f"{op_name}: callback fired but ret is None")
return Ok((state["ret"], state["msg"]))
if ret != 0:
return Err(f"{op_name}: failed (ret={ret}) msg={msg!r}")
return Ok(ret)
def _wait_cb_ok(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]:
wait_result = _wait_cb_raw(state, op_name, timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
return Err(f"callback failed in _wait_cb_ok: {op_name} (ret={cb_ret}) msg={cb_msg!r}")
return Ok(cb_ret)
class NodeWrapper:
@ -128,18 +162,18 @@ class NodeWrapper:
config_buffer = ffi.new("char[]", config_json.encode("utf-8"))
state = _new_cb_state()
create_cb = cls._make_waiting_cb(state)
cb = cls._make_waiting_cb(state)
ctx = lib.logosdelivery_create_node(
config_buffer,
create_cb,
cb,
ffi.NULL,
)
if ctx == ffi.NULL:
return Err("create_node: ctx is NULL")
wait_result = _wait_cb(state, "create_node", timeout_s)
wait_result = _wait_cb_ok(state, "create_node", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
@ -171,6 +205,7 @@ class NodeWrapper:
return Err(node_result.err())
node = node_result.ok_value
start_result = node.start_node(timeout_s=timeout_s)
if start_result.is_err():
return Err(start_result.err())
@ -185,7 +220,7 @@ class NodeWrapper:
if rc != 0:
return Err(f"start_node: immediate call failed (ret={rc})")
return _wait_cb(state, "start_node", timeout_s)
return _wait_cb_ok(state, "start_node", timeout_s)
def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state()
@ -195,7 +230,7 @@ class NodeWrapper:
if rc != 0:
return Err(f"stop_node: immediate call failed (ret={rc})")
return _wait_cb(state, "stop_node", timeout_s)
return _wait_cb_ok(state, "stop_node", timeout_s)
def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state()
@ -205,7 +240,12 @@ class NodeWrapper:
if rc != 0:
return Err(f"destroy: immediate call failed (ret={rc})")
return _wait_cb(state, "destroy", timeout_s)
wait_result = _wait_cb_ok(state, "destroy", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
self.ctx = ffi.NULL
return wait_result
def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
stop_result = self.stop_node(timeout_s=timeout_s)
@ -227,7 +267,7 @@ class NodeWrapper:
if rc != 0:
return Err(f"subscribe_content_topic: immediate call failed (ret={rc})")
return _wait_cb(state, f"subscribe({content_topic})", timeout_s)
return _wait_cb_ok(state, f"subscribe({content_topic})", timeout_s)
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state()
@ -242,4 +282,111 @@ class NodeWrapper:
if rc != 0:
return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})")
return _wait_cb(state, f"unsubscribe({content_topic})", timeout_s)
return _wait_cb_ok(state, f"unsubscribe({content_topic})", timeout_s)
def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]:
state = _new_cb_state()
cb = self._make_waiting_cb(state)
message_json = json.dumps(message, separators=(",", ":"), ensure_ascii=False)
rc = lib.logosdelivery_send(
self.ctx,
cb,
ffi.NULL,
message_json.encode("utf-8"),
)
if rc != 0:
return Err(f"send_message: immediate call failed (ret={rc})")
wait_result = _wait_cb_raw(state, "send_message", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
return Err(f"send_message: callback failed (ret={cb_ret}) msg={cb_msg!r}")
request_id = cb_msg.decode("utf-8") if cb_msg else ""
return Ok(request_id)
def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[list[str], str]:
state = _new_cb_state()
cb = self._make_waiting_cb(state)
rc = lib.logosdelivery_get_available_node_info_ids(self.ctx, cb, ffi.NULL)
if rc != 0:
return Err(f"get_available_node_info_ids: immediate call failed (ret={rc})")
wait_result = _wait_cb_raw(state, "get_available_node_info_ids", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
return Err(f"get_available_node_info_ids: callback failed (ret={cb_ret})")
if not cb_msg:
return Err("get_available_node_info_ids: empty response")
try:
return Ok(json.loads(cb_msg.decode("utf-8").strip().lstrip("@")))
except Exception as e:
return Err(f"get_available_node_info_ids: invalid response: {e}")
def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]:
state = _new_cb_state()
cb = self._make_waiting_cb(state)
rc = lib.logosdelivery_get_node_info(
self.ctx,
cb,
ffi.NULL,
node_info_id.encode("utf-8"),
)
if rc != 0:
return Err(f"get_node_info: immediate call failed (ret={rc})")
wait_result = _wait_cb_raw(state, "get_node_info", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
return Err(f"get_node_info: callback failed (ret={cb_ret}) msg={cb_msg!r}")
if not cb_msg:
return Err("get_node_info: empty response")
try:
result = json.loads(cb_msg.decode("utf-8"))
except Exception as e:
return Err(f"get_node_info: invalid json: {e}")
return Ok(result)
def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]:
state = _new_cb_state()
cb = self._make_waiting_cb(state)
rc = lib.logosdelivery_get_available_configs(self.ctx, cb, ffi.NULL)
if rc != 0:
return Err(f"get_available_configs: immediate call failed (ret={rc})")
wait_result = _wait_cb_raw(state, "get_available_configs", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
return Err(f"get_available_configs: callback failed (ret={cb_ret}) msg={cb_msg!r}")
if not cb_msg:
return Err("get_available_configs: empty response")
try:
result = json.loads(cb_msg.decode("utf-8"))
except Exception as e:
return Err(f"get_available_configs: invalid json: {e}")
return Ok(result)