clean version of wrappers

This commit is contained in:
Aya Hassan 2026-03-30 11:51:34 +02:00
parent 17e526c4a1
commit 2b448e2f0d

View File

@ -1,14 +1,10 @@
import json import json
import threading import threading
import logging
from time import sleep
from cffi import FFI from cffi import FFI
from pathlib import Path from pathlib import Path
from result import Result, Ok, Err from result import Result, Ok, Err
logger = logging.getLogger(__name__)
ffi = FFI() ffi = FFI()
ffi.cdef( ffi.cdef(
@ -97,34 +93,31 @@ def _new_cb_state():
return { return {
"done": threading.Event(), "done": threading.Event(),
"ret": None, "ret": None,
"msg": None, "msg": b"",
} }
def _wait_cb_raw(state, op_name: str, timeout_s: float = 20.0): def _wait_cb_raw(state, op_name: str, timeout_s: float = 20.0):
finished = state["done"].wait(timeout_s) ok = state["done"].wait(timeout_s)
if not finished: if not ok:
return Err(f"{op_name}: timeout waiting for callback after {timeout_s}s") return Err(f"{op_name}: timeout after {timeout_s}s")
ret = state["ret"] if state["ret"] is None:
msg = state["msg"] or b"" return Err(f"{op_name}: callback ret is None")
if ret is None: return Ok((state["ret"], state["msg"]))
return Err(f"{op_name}: callback fired but ret is None")
return Ok((ret, msg))
def _wait_cb_status(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]: def _wait_cb_ok(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]:
wait_result = _wait_cb_raw(state, op_name, timeout_s) wait_result = _wait_cb_raw(state, op_name, timeout_s)
if wait_result.is_err(): if wait_result.is_err():
return Err(wait_result.err()) return Err(wait_result.err())
ret, msg = wait_result.ok_value cb_ret, cb_msg = wait_result.ok_value
if ret != 0: if cb_ret != 0:
return Err(f"{op_name}: failed (ret={ret}) msg={msg!r}") return Err(f"{op_name}: callback failed (ret={cb_ret}) msg={cb_msg!r}")
return Ok(ret) return Ok(cb_ret)
class NodeWrapper: class NodeWrapper:
@ -165,18 +158,18 @@ class NodeWrapper:
config_buffer = ffi.new("char[]", config_json.encode("utf-8")) config_buffer = ffi.new("char[]", config_json.encode("utf-8"))
state = _new_cb_state() state = _new_cb_state()
create_cb = cls._make_waiting_cb(state) cb = cls._make_waiting_cb(state)
ctx = lib.logosdelivery_create_node( ctx = lib.logosdelivery_create_node(
config_buffer, config_buffer,
create_cb, cb,
ffi.NULL, ffi.NULL,
) )
if ctx == ffi.NULL: if ctx == ffi.NULL:
return Err("create_node: ctx is NULL") return Err("create_node: ctx is NULL")
wait_result = _wait_cb_status(state, "create_node", timeout_s) wait_result = _wait_cb_ok(state, "create_node", timeout_s)
if wait_result.is_err(): if wait_result.is_err():
return Err(wait_result.err()) return Err(wait_result.err())
@ -208,6 +201,7 @@ class NodeWrapper:
return Err(node_result.err()) return Err(node_result.err())
node = node_result.ok_value node = node_result.ok_value
start_result = node.start_node(timeout_s=timeout_s) start_result = node.start_node(timeout_s=timeout_s)
if start_result.is_err(): if start_result.is_err():
return Err(start_result.err()) return Err(start_result.err())
@ -222,7 +216,7 @@ class NodeWrapper:
if rc != 0: if rc != 0:
return Err(f"start_node: immediate call failed (ret={rc})") return Err(f"start_node: immediate call failed (ret={rc})")
return _wait_cb_status(state, "start_node", timeout_s) return _wait_cb_ok(state, "start_node", timeout_s)
def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state() state = _new_cb_state()
@ -232,7 +226,7 @@ class NodeWrapper:
if rc != 0: if rc != 0:
return Err(f"stop_node: immediate call failed (ret={rc})") return Err(f"stop_node: immediate call failed (ret={rc})")
return _wait_cb_status(state, "stop_node", timeout_s) return _wait_cb_ok(state, "stop_node", timeout_s)
def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state() state = _new_cb_state()
@ -242,7 +236,12 @@ class NodeWrapper:
if rc != 0: if rc != 0:
return Err(f"destroy: immediate call failed (ret={rc})") return Err(f"destroy: immediate call failed (ret={rc})")
return _wait_cb_status(state, "destroy", timeout_s) wait_result = _wait_cb_ok(state, "destroy", timeout_s)
if wait_result.is_err():
return Err(wait_result.err())
self.ctx = ffi.NULL
return wait_result
def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
stop_result = self.stop_node(timeout_s=timeout_s) stop_result = self.stop_node(timeout_s=timeout_s)
@ -264,7 +263,7 @@ class NodeWrapper:
if rc != 0: if rc != 0:
return Err(f"subscribe_content_topic: immediate call failed (ret={rc})") return Err(f"subscribe_content_topic: immediate call failed (ret={rc})")
return _wait_cb_status(state, f"subscribe({content_topic})", timeout_s) return _wait_cb_ok(state, f"subscribe({content_topic})", timeout_s)
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
state = _new_cb_state() state = _new_cb_state()
@ -279,7 +278,7 @@ class NodeWrapper:
if rc != 0: if rc != 0:
return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})") return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})")
return _wait_cb_status(state, f"unsubscribe({content_topic})", timeout_s) return _wait_cb_ok(state, f"unsubscribe({content_topic})", timeout_s)
def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]: def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]:
state = _new_cb_state() state = _new_cb_state()
@ -293,8 +292,7 @@ class NodeWrapper:
ffi.NULL, ffi.NULL,
message_json.encode("utf-8"), message_json.encode("utf-8"),
) )
if rc != 0:
if rc < 0:
return Err(f"send_message: immediate call failed (ret={rc})") return Err(f"send_message: immediate call failed (ret={rc})")
wait_result = _wait_cb_raw(state, "send_message", timeout_s) wait_result = _wait_cb_raw(state, "send_message", timeout_s)
@ -303,7 +301,7 @@ class NodeWrapper:
cb_ret, cb_msg = wait_result.ok_value cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0: if cb_ret != 0:
return Err(f"send_message: failed (ret={cb_ret}) msg={cb_msg!r}") return Err(f"send_message: callback failed (ret={cb_ret}) msg={cb_msg!r}")
request_id = cb_msg.decode("utf-8") if cb_msg else "" request_id = cb_msg.decode("utf-8") if cb_msg else ""
return Ok(request_id) return Ok(request_id)
@ -312,20 +310,23 @@ class NodeWrapper:
state = _new_cb_state() state = _new_cb_state()
cb = self._make_waiting_cb(state) cb = self._make_waiting_cb(state)
node_info_id = lib.logosdelivery_get_available_node_info_ids( rc = lib.logosdelivery_get_available_node_info_ids(
self.ctx, self.ctx,
cb, cb,
ffi.NULL, ffi.NULL,
) )
if rc < 0:
if node_info_id < 0: return Err(f"get_available_node_info_ids: immediate call failed (ret={rc})")
return Err(f"get_available_node_info_ids: immediate call failed (ret={node_info_id})")
wait_result = _wait_cb_raw(state, "get_available_node_info_ids", timeout_s) wait_result = _wait_cb_raw(state, "get_available_node_info_ids", timeout_s)
if wait_result.is_err(): if wait_result.is_err():
return Err(wait_result.err()) return Err(wait_result.err())
return Ok(node_info_id) cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0:
return Err(f"get_available_node_info_ids: callback failed (ret={cb_ret}) msg={cb_msg!r}")
return Ok(rc)
def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]: def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]:
state = _new_cb_state() state = _new_cb_state()
@ -337,31 +338,34 @@ class NodeWrapper:
ffi.NULL, ffi.NULL,
node_info_id.encode("utf-8"), node_info_id.encode("utf-8"),
) )
if rc != 0:
return Err(f"get_node_info: immediate call failed (ret={rc})")
if rc < 0: wait_result = _wait_cb_raw(state, "get_node_info", timeout_s)
return Err(f"call failed rc={rc}") if wait_result.is_err():
return Err(wait_result.err())
wait = _wait_cb_raw(state, "get_node_info", timeout_s) cb_ret, cb_msg = wait_result.ok_value
if wait.is_err(): if cb_ret != 0:
return Err(wait.err()) return Err(f"get_node_info: callback failed (ret={cb_ret}) msg={cb_msg!r}")
cb_ret, cb_msg = wait.ok_value if not cb_msg:
if cb_ret != 0 or not cb_msg: return Err("get_node_info: empty response")
return Err(f"callback failed ret={cb_ret}")
try: try:
return Ok(json.loads(cb_msg.decode())) result = json.loads(cb_msg.decode("utf-8"))
except Exception: except Exception as e:
return Err(f"get_node_info: invalid json: {e}")
return Err("invalid json") return Ok(result)
def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[bytes, str]: def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]:
state = _new_cb_state() state = _new_cb_state()
cb = self._make_waiting_cb(state) cb = self._make_waiting_cb(state)
rc = lib.logosdelivery_get_available_configs(self.ctx, cb, ffi.NULL) rc = lib.logosdelivery_get_available_configs(self.ctx, cb, ffi.NULL)
if rc != 0: if rc != 0:
return Err(f"get_available_configs failed: {rc}") return Err(f"get_available_configs: immediate call failed (ret={rc})")
wait_result = _wait_cb_raw(state, "get_available_configs", timeout_s) wait_result = _wait_cb_raw(state, "get_available_configs", timeout_s)
if wait_result.is_err(): if wait_result.is_err():
@ -369,52 +373,14 @@ class NodeWrapper:
cb_ret, cb_msg = wait_result.ok_value cb_ret, cb_msg = wait_result.ok_value
if cb_ret != 0: if cb_ret != 0:
return Err(f"get_available_configs failed: {cb_ret}") return Err(f"get_available_configs: callback failed (ret={cb_ret}) msg={cb_msg!r}")
return Ok(cb_msg) if not cb_msg:
return Err("get_available_configs: empty response")
def main(): try:
config = { result = json.loads(cb_msg.decode("utf-8"))
"logLevel": "DEBUG", except Exception as e:
"mode": "Core", return Err(f"get_available_configs: invalid json: {e}")
"protocolsConfig": {
"entryNodes": [
"/dns4/node-01.do-ams3.misc.logos-chat.status.im/tcp/30303/p2p/16Uiu2HAkxoqUTud5LUPQBRmkeL2xP4iKx2kaABYXomQRgmLUgf78"
],
"clusterId": 3,
"autoShardingConfig": {"numShardsInCluster": 8},
},
"networkingConfig": {
"listenIpv4": "0.0.0.0",
"p2pTcpPort": 60000,
"discv5UdpPort": 9000,
},
}
topic = "/test/1/chat/proto"
message = {
"contentTopic": topic,
"payload": "SGVsbG8=",
"ephemeral": False,
}
node_result = NodeWrapper.create_and_start(config)
sleep(20)
if node_result.is_err():
print(node_result.err())
return
node = node_result.ok_value
print(node.subscribe_content_topic(topic))
print(node.send_message(message))
#print(node.get_available_node_info_ids())
#print(node.debug_get_available_configs())
print(node.stop_node())
print(node.destroy())
if __name__ == "__main__":
main()
return Ok(result)