393 lines
11 KiB
Python
Raw Normal View History

import json
import threading
2026-03-29 15:50:12 +02:00
2026-02-19 13:22:44 +01:00
from cffi import FFI
from pathlib import Path
2026-03-09 18:35:59 +01:00
from result import Result, Ok, Err
2026-03-29 23:55:40 +02:00
2026-02-19 13:22:44 +01:00
ffi = FFI()
ffi.cdef(
"""
typedef void (*FFICallBack)(int callerRet, const char *msg, size_t len, void *userData);
2026-02-19 13:22:44 +01:00
void *logosdelivery_create_node(
const char *configJson,
FFICallBack callback,
void *userData
);
int logosdelivery_start_node(
void *ctx,
FFICallBack callback,
void *userData
);
2026-02-22 19:55:07 +01:00
int logosdelivery_stop_node(
void *ctx,
FFICallBack callback,
void *userData
);
void logosdelivery_set_event_callback(
void *ctx,
FFICallBack callback,
void *userData
);
int logosdelivery_destroy(
void *ctx,
FFICallBack callback,
void *userData
);
int logosdelivery_subscribe(
void *ctx,
FFICallBack callback,
void *userData,
const char *contentTopic
);
int logosdelivery_unsubscribe(
void *ctx,
FFICallBack callback,
void *userData,
const char *contentTopic
);
2026-03-09 18:48:25 +01:00
int logosdelivery_send(
void *ctx,
FFICallBack callback,
void *userData,
const char *messageJson
);
2026-03-29 23:55:40 +02:00
int logosdelivery_get_available_node_info_ids(
void *ctx,
FFICallBack callback,
void *userData
);
int logosdelivery_get_node_info(
void *ctx,
FFICallBack callback,
void *userData,
const char *nodeInfoId
);
int logosdelivery_get_available_configs(
void *ctx,
FFICallBack callback,
void *userData
);
"""
)
2026-02-19 13:22:44 +01:00
_repo_root = Path(__file__).resolve().parents[1]
lib = ffi.dlopen(str(_repo_root / "lib" / "liblogosdelivery.so"))
2026-02-19 13:22:44 +01:00
CallbackType = ffi.callback("void(int, const char*, size_t, void*)")
def _new_cb_state():
return {
"done": threading.Event(),
"ret": None,
2026-03-30 11:51:34 +02:00
"msg": b"",
}
2026-04-07 17:03:35 +02:00
def _wait_cb_raw(
state,
op_name: str,
timeout_s: float = 20.0,
) -> Result[tuple[int, bytes], str]:
2026-03-30 11:51:34 +02:00
ok = state["done"].wait(timeout_s)
if not ok:
return Err(f"{op_name}: timeout after {timeout_s}s")
2026-03-30 11:51:34 +02:00
if state["ret"] is None:
return Err(f"{op_name}: callback ret is None")
2026-03-30 11:51:34 +02:00
return Ok((state["ret"], state["msg"]))
2026-03-29 23:55:40 +02:00
2026-03-30 11:51:34 +02:00
def _wait_cb_ok(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]:
2026-03-29 23:55:40 +02:00
wait_result = _wait_cb_raw(state, op_name, timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
2026-03-30 11:51:34 +02:00
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
return Err(f"callback failed in _wait_cb_ok: {op_name} (ret={cb_ret}) msg={cb_msg!r}")
2026-03-09 18:35:59 +01:00
2026-03-30 11:51:34 +02:00
return Ok(cb_ret)
2026-02-22 19:55:07 +01:00
class NodeWrapper:
def __init__(self, ctx, config_buffer, event_cb_handler):
2026-02-22 19:55:07 +01:00
self.ctx = ctx
self._config_buffer = config_buffer
self._event_cb_handler = event_cb_handler
2026-02-22 19:55:07 +01:00
@staticmethod
2026-03-05 20:34:12 +01:00
def _make_waiting_cb(state):
2026-02-22 19:55:07 +01:00
def c_cb(ret, char_p, length, userData):
2026-03-05 20:34:12 +01:00
msg = ffi.buffer(char_p, length)[:] if char_p != ffi.NULL else b""
2026-03-05 20:34:12 +01:00
if not state["done"].is_set():
state["ret"] = int(ret)
state["msg"] = msg
state["done"].set()
2026-03-05 20:34:12 +01:00
return CallbackType(c_cb)
@staticmethod
def _make_event_cb(py_callback):
def c_cb(ret, char_p, length, userData):
msg = ffi.buffer(char_p, length)[:] if char_p != ffi.NULL else b""
py_callback(int(ret), msg)
2026-02-22 19:55:07 +01:00
return CallbackType(c_cb)
@classmethod
def create_node(
cls,
config: dict,
event_cb=None,
*,
timeout_s: float = 20.0,
2026-03-09 18:35:59 +01:00
) -> Result["NodeWrapper", str]:
2026-02-22 19:55:07 +01:00
config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False)
config_buffer = ffi.new("char[]", config_json.encode("utf-8"))
state = _new_cb_state()
2026-03-30 11:51:34 +02:00
cb = cls._make_waiting_cb(state)
2026-02-22 19:55:07 +01:00
ctx = lib.logosdelivery_create_node(
config_buffer,
2026-03-30 11:51:34 +02:00
cb,
2026-02-22 19:55:07 +01:00
ffi.NULL,
)
if ctx == ffi.NULL:
2026-03-09 18:35:59 +01:00
return Err("create_node: ctx is NULL")
2026-02-22 19:55:07 +01:00
2026-03-30 11:51:34 +02:00
wait_result = _wait_cb_ok(state, "create_node", timeout_s)
2026-03-09 18:35:59 +01:00
if wait_result.is_err():
return Err(wait_result.err())
2026-02-22 19:55:07 +01:00
event_cb_handler = None
if event_cb is not None:
2026-03-05 20:34:12 +01:00
event_cb_handler = cls._make_event_cb(event_cb)
lib.logosdelivery_set_event_callback(
ctx,
event_cb_handler,
ffi.NULL,
)
2026-02-22 19:55:07 +01:00
2026-03-09 18:35:59 +01:00
return Ok(cls(ctx, config_buffer, event_cb_handler))
2026-02-22 19:55:07 +01:00
2026-03-05 20:34:12 +01:00
@classmethod
def create_and_start(
cls,
config: dict,
event_cb=None,
*,
timeout_s: float = 20.0,
2026-03-09 18:35:59 +01:00
) -> Result["NodeWrapper", str]:
node_result = cls.create_node(
2026-03-05 20:34:12 +01:00
config=config,
event_cb=event_cb,
timeout_s=timeout_s,
)
2026-03-09 18:35:59 +01:00
if node_result.is_err():
return Err(node_result.err())
node = node_result.ok_value
2026-03-30 11:51:34 +02:00
2026-03-09 18:35:59 +01:00
start_result = node.start_node(timeout_s=timeout_s)
if start_result.is_err():
return Err(start_result.err())
return Ok(node)
2026-03-05 20:34:12 +01:00
2026-03-09 18:35:59 +01:00
def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state()
2026-03-05 20:34:12 +01:00
cb = self._make_waiting_cb(state)
2026-02-22 19:55:07 +01:00
2026-03-05 20:34:12 +01:00
rc = lib.logosdelivery_start_node(self.ctx, cb, ffi.NULL)
if rc != 0:
2026-03-09 18:35:59 +01:00
return Err(f"start_node: immediate call failed (ret={rc})")
2026-02-22 19:55:07 +01:00
2026-03-30 11:51:34 +02:00
return _wait_cb_ok(state, "start_node", timeout_s)
2026-02-22 19:55:07 +01:00
2026-03-09 18:35:59 +01:00
def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state()
2026-03-05 20:34:12 +01:00
cb = self._make_waiting_cb(state)
2026-02-22 19:55:07 +01:00
2026-03-05 20:34:12 +01:00
rc = lib.logosdelivery_stop_node(self.ctx, cb, ffi.NULL)
if rc != 0:
2026-03-09 18:35:59 +01:00
return Err(f"stop_node: immediate call failed (ret={rc})")
2026-02-22 19:55:07 +01:00
2026-03-30 11:51:34 +02:00
return _wait_cb_ok(state, "stop_node", timeout_s)
2026-02-22 19:55:07 +01:00
2026-03-09 18:35:59 +01:00
def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state()
2026-03-05 20:34:12 +01:00
cb = self._make_waiting_cb(state)
2026-02-22 19:55:07 +01:00
2026-03-05 20:34:12 +01:00
rc = lib.logosdelivery_destroy(self.ctx, cb, ffi.NULL)
if rc != 0:
2026-03-09 18:35:59 +01:00
return Err(f"destroy: immediate call failed (ret={rc})")
2026-02-22 19:55:07 +01:00
2026-03-30 11:51:34 +02:00
wait_result = _wait_cb_ok(state, "destroy", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
self.ctx = ffi.NULL
return wait_result
2026-02-22 19:55:07 +01:00
2026-03-09 18:35:59 +01:00
def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
stop_result = self.stop_node(timeout_s=timeout_s)
if stop_result.is_err():
return Err(stop_result.err())
2026-02-22 19:55:07 +01:00
2026-03-09 18:35:59 +01:00
return self.destroy(timeout_s=timeout_s)
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state()
2026-03-05 20:34:12 +01:00
cb = self._make_waiting_cb(state)
2026-03-05 20:34:12 +01:00
rc = lib.logosdelivery_subscribe(
self.ctx,
cb,
ffi.NULL,
content_topic.encode("utf-8"),
2026-02-22 19:55:07 +01:00
)
if rc != 0:
2026-03-09 18:35:59 +01:00
return Err(f"subscribe_content_topic: immediate call failed (ret={rc})")
2026-02-22 19:55:07 +01:00
2026-03-30 11:51:34 +02:00
return _wait_cb_ok(state, f"subscribe({content_topic})", timeout_s)
2026-02-24 06:44:14 +01:00
2026-03-09 18:35:59 +01:00
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state()
2026-03-05 20:34:12 +01:00
cb = self._make_waiting_cb(state)
2026-03-05 20:34:12 +01:00
rc = lib.logosdelivery_unsubscribe(
self.ctx,
cb,
ffi.NULL,
content_topic.encode("utf-8"),
2026-02-24 06:44:14 +01:00
)
if rc != 0:
2026-03-09 18:35:59 +01:00
return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})")
2026-03-05 20:34:12 +01:00
2026-03-30 11:51:34 +02:00
return _wait_cb_ok(state, f"unsubscribe({content_topic})", timeout_s)
2026-03-09 18:48:25 +01:00
2026-03-29 23:55:40 +02:00
def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]:
2026-03-09 18:48:25 +01:00
state = _new_cb_state()
cb = self._make_waiting_cb(state)
message_json = json.dumps(message, separators=(",", ":"), ensure_ascii=False)
2026-03-29 23:55:40 +02:00
rc = lib.logosdelivery_send(
2026-03-09 18:48:25 +01:00
self.ctx,
cb,
ffi.NULL,
message_json.encode("utf-8"),
)
2026-03-30 11:51:34 +02:00
if rc != 0:
2026-03-29 23:55:40 +02:00
return Err(f"send_message: immediate call failed (ret={rc})")
2026-03-29 15:50:12 +02:00
2026-03-29 23:55:40 +02:00
wait_result = _wait_cb_raw(state, "send_message", timeout_s)
2026-03-29 15:50:12 +02:00
if wait_result.is_err():
return Err(wait_result.err())
2026-03-29 23:55:40 +02:00
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
2026-03-30 11:51:34 +02:00
return Err(f"send_message: callback failed (ret={cb_ret}) msg={cb_msg!r}")
2026-03-29 23:55:40 +02:00
request_id = cb_msg.decode("utf-8") if cb_msg else ""
2026-03-29 15:50:12 +02:00
return Ok(request_id)
2026-04-02 16:58:51 +02:00
def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[list[str], str]:
2026-03-29 23:55:40 +02:00
state = _new_cb_state()
cb = self._make_waiting_cb(state)
2026-04-02 17:04:18 +02:00
rc = lib.logosdelivery_get_available_node_info_ids(self.ctx, cb, ffi.NULL)
2026-04-02 16:58:51 +02:00
if rc != 0:
2026-04-02 17:08:47 +02:00
return Err(f"get_available_node_info_ids: immediate call failed (ret={rc})")
2026-03-29 23:55:40 +02:00
wait_result = _wait_cb_raw(state, "get_available_node_info_ids", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
2026-03-30 11:51:34 +02:00
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
2026-04-02 17:08:47 +02:00
return Err(f"get_available_node_info_ids: callback failed (ret={cb_ret})")
if not cb_msg:
return Err("get_available_node_info_ids: empty response")
2026-03-30 11:51:34 +02:00
2026-04-02 17:08:47 +02:00
try:
return Ok(json.loads(cb_msg.decode("utf-8").strip().lstrip("@")))
except Exception as e:
return Err(f"get_available_node_info_ids: invalid response: {e}")
2026-03-29 23:55:40 +02:00
def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]:
state = _new_cb_state()
cb = self._make_waiting_cb(state)
rc = lib.logosdelivery_get_node_info(
self.ctx,
cb,
ffi.NULL,
node_info_id.encode("utf-8"),
)
2026-03-30 11:51:34 +02:00
if rc != 0:
return Err(f"get_node_info: immediate call failed (ret={rc})")
2026-03-29 23:55:40 +02:00
2026-03-30 11:51:34 +02:00
wait_result = _wait_cb_raw(state, "get_node_info", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
2026-03-29 23:55:40 +02:00
2026-03-30 11:51:34 +02:00
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
return Err(f"get_node_info: callback failed (ret={cb_ret}) msg={cb_msg!r}")
2026-03-29 23:55:40 +02:00
2026-03-30 11:51:34 +02:00
if not cb_msg:
return Err("get_node_info: empty response")
2026-03-29 23:55:40 +02:00
try:
2026-03-30 11:51:34 +02:00
result = json.loads(cb_msg.decode("utf-8"))
except Exception as e:
return Err(f"get_node_info: invalid json: {e}")
2026-03-29 23:55:40 +02:00
2026-03-30 11:51:34 +02:00
return Ok(result)
2026-03-29 23:55:40 +02:00
2026-03-30 11:51:34 +02:00
def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]:
2026-03-29 23:55:40 +02:00
state = _new_cb_state()
cb = self._make_waiting_cb(state)
rc = lib.logosdelivery_get_available_configs(self.ctx, cb, ffi.NULL)
if rc != 0:
2026-03-30 11:51:34 +02:00
return Err(f"get_available_configs: immediate call failed (ret={rc})")
2026-03-29 23:55:40 +02:00
wait_result = _wait_cb_raw(state, "get_available_configs", timeout_s)
2026-03-29 23:55:40 +02:00
if wait_result.is_err():
return Err(wait_result.err())
cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
2026-03-30 11:51:34 +02:00
return Err(f"get_available_configs: callback failed (ret={cb_ret}) msg={cb_msg!r}")
2026-03-29 15:50:12 +02:00
2026-03-30 11:51:34 +02:00
if not cb_msg:
return Err("get_available_configs: empty response")
2026-03-29 15:50:12 +02:00
2026-03-30 11:51:34 +02:00
try:
result = json.loads(cb_msg.decode("utf-8"))
except Exception as e:
return Err(f"get_available_configs: invalid json: {e}")
2026-03-29 23:55:40 +02:00
2026-04-02 16:58:51 +02:00
return Ok(result)