mirror of
https://github.com/logos-messaging/logos-delivery-python-bindings.git
synced 2026-05-11 08:29:26 +00:00
add resullt , ok , err
This commit is contained in:
parent
2ab1500aad
commit
cb62f65f43
@ -3,6 +3,7 @@ import threading
|
|||||||
import logging
|
import logging
|
||||||
from cffi import FFI
|
from cffi import FFI
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from result import Result, Ok, Err
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -72,19 +73,21 @@ def _new_cb_state():
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def _wait_cb(state, op_name: str, timeout_s: float = 20.0):
|
def _wait_cb(state, op_name: str, timeout_s: float = 20.0) -> Result[int, str]:
|
||||||
finished = state["done"].wait(timeout_s)
|
finished = state["done"].wait(timeout_s)
|
||||||
if not finished:
|
if not finished:
|
||||||
raise TimeoutError(f"{op_name}: timeout waiting for callback after {timeout_s}s")
|
return Err(f"{op_name}: timeout waiting for callback after {timeout_s}s")
|
||||||
|
|
||||||
ret = state["ret"]
|
ret = state["ret"]
|
||||||
msg = state["msg"] or b""
|
msg = state["msg"] or b""
|
||||||
|
|
||||||
if ret is None:
|
if ret is None:
|
||||||
raise RuntimeError(f"{op_name}: callback fired but ret is None")
|
return Err(f"{op_name}: callback fired but ret is None")
|
||||||
|
|
||||||
if ret != 0:
|
if ret != 0:
|
||||||
raise RuntimeError(f"{op_name}: failed (ret={ret}) msg={msg!r}")
|
return Err(f"{op_name}: failed (ret={ret}) msg={msg!r}")
|
||||||
|
|
||||||
|
return Ok(ret)
|
||||||
|
|
||||||
|
|
||||||
class NodeWrapper:
|
class NodeWrapper:
|
||||||
@ -120,7 +123,7 @@ class NodeWrapper:
|
|||||||
event_cb=None,
|
event_cb=None,
|
||||||
*,
|
*,
|
||||||
timeout_s: float = 20.0,
|
timeout_s: float = 20.0,
|
||||||
):
|
) -> Result["NodeWrapper", str]:
|
||||||
config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False)
|
config_json = json.dumps(config, separators=(",", ":"), ensure_ascii=False)
|
||||||
config_buffer = ffi.new("char[]", config_json.encode("utf-8"))
|
config_buffer = ffi.new("char[]", config_json.encode("utf-8"))
|
||||||
|
|
||||||
@ -134,9 +137,11 @@ class NodeWrapper:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if ctx == ffi.NULL:
|
if ctx == ffi.NULL:
|
||||||
raise RuntimeError("create_node: ctx is NULL")
|
return Err("create_node: ctx is NULL")
|
||||||
|
|
||||||
_wait_cb(state, "create_node", timeout_s)
|
wait_result = _wait_cb(state, "create_node", timeout_s)
|
||||||
|
if wait_result.is_err():
|
||||||
|
return Err(wait_result.err())
|
||||||
|
|
||||||
event_cb_handler = None
|
event_cb_handler = None
|
||||||
if event_cb is not None:
|
if event_cb is not None:
|
||||||
@ -147,7 +152,7 @@ class NodeWrapper:
|
|||||||
ffi.NULL,
|
ffi.NULL,
|
||||||
)
|
)
|
||||||
|
|
||||||
return cls(ctx, config_buffer, event_cb_handler)
|
return Ok(cls(ctx, config_buffer, event_cb_handler))
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create_and_start(
|
def create_and_start(
|
||||||
@ -156,54 +161,60 @@ class NodeWrapper:
|
|||||||
event_cb=None,
|
event_cb=None,
|
||||||
*,
|
*,
|
||||||
timeout_s: float = 20.0,
|
timeout_s: float = 20.0,
|
||||||
):
|
) -> Result["NodeWrapper", str]:
|
||||||
node = cls.create_node(
|
node_result = cls.create_node(
|
||||||
config=config,
|
config=config,
|
||||||
event_cb=event_cb,
|
event_cb=event_cb,
|
||||||
timeout_s=timeout_s,
|
timeout_s=timeout_s,
|
||||||
)
|
)
|
||||||
node.start_node(timeout_s=timeout_s)
|
if node_result.is_err():
|
||||||
return node
|
return Err(node_result.err())
|
||||||
|
|
||||||
def start_node(self, *, timeout_s: float = 20.0):
|
node = node_result.ok_value
|
||||||
|
start_result = node.start_node(timeout_s=timeout_s)
|
||||||
|
if start_result.is_err():
|
||||||
|
return Err(start_result.err())
|
||||||
|
|
||||||
|
return Ok(node)
|
||||||
|
|
||||||
|
def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||||
state = _new_cb_state()
|
state = _new_cb_state()
|
||||||
cb = self._make_waiting_cb(state)
|
cb = self._make_waiting_cb(state)
|
||||||
|
|
||||||
rc = lib.logosdelivery_start_node(self.ctx, cb, ffi.NULL)
|
rc = lib.logosdelivery_start_node(self.ctx, cb, ffi.NULL)
|
||||||
if rc != 0:
|
if rc != 0:
|
||||||
raise RuntimeError(f"start_node: immediate call failed (ret={rc})")
|
return Err(f"start_node: immediate call failed (ret={rc})")
|
||||||
|
|
||||||
_wait_cb(state, "start_node", timeout_s)
|
return _wait_cb(state, "start_node", timeout_s)
|
||||||
return 0
|
|
||||||
|
|
||||||
def stop_node(self, *, timeout_s: float = 20.0):
|
def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||||
state = _new_cb_state()
|
state = _new_cb_state()
|
||||||
cb = self._make_waiting_cb(state)
|
cb = self._make_waiting_cb(state)
|
||||||
|
|
||||||
rc = lib.logosdelivery_stop_node(self.ctx, cb, ffi.NULL)
|
rc = lib.logosdelivery_stop_node(self.ctx, cb, ffi.NULL)
|
||||||
if rc != 0:
|
if rc != 0:
|
||||||
raise RuntimeError(f"stop_node: immediate call failed (ret={rc})")
|
return Err(f"stop_node: immediate call failed (ret={rc})")
|
||||||
|
|
||||||
_wait_cb(state, "stop_node", timeout_s)
|
return _wait_cb(state, "stop_node", timeout_s)
|
||||||
return 0
|
|
||||||
|
|
||||||
def destroy(self, *, timeout_s: float = 20.0):
|
def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||||
state = _new_cb_state()
|
state = _new_cb_state()
|
||||||
cb = self._make_waiting_cb(state)
|
cb = self._make_waiting_cb(state)
|
||||||
|
|
||||||
rc = lib.logosdelivery_destroy(self.ctx, cb, ffi.NULL)
|
rc = lib.logosdelivery_destroy(self.ctx, cb, ffi.NULL)
|
||||||
if rc != 0:
|
if rc != 0:
|
||||||
raise RuntimeError(f"destroy: immediate call failed (ret={rc})")
|
return Err(f"destroy: immediate call failed (ret={rc})")
|
||||||
|
|
||||||
_wait_cb(state, "destroy", timeout_s)
|
return _wait_cb(state, "destroy", timeout_s)
|
||||||
return 0
|
|
||||||
|
|
||||||
def stop_and_destroy(self, *, timeout_s: float = 20.0):
|
def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||||
self.stop_node(timeout_s=timeout_s)
|
stop_result = self.stop_node(timeout_s=timeout_s)
|
||||||
self.destroy(timeout_s=timeout_s)
|
if stop_result.is_err():
|
||||||
return 0
|
return Err(stop_result.err())
|
||||||
|
|
||||||
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0):
|
return self.destroy(timeout_s=timeout_s)
|
||||||
|
|
||||||
|
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||||
state = _new_cb_state()
|
state = _new_cb_state()
|
||||||
cb = self._make_waiting_cb(state)
|
cb = self._make_waiting_cb(state)
|
||||||
|
|
||||||
@ -214,12 +225,11 @@ class NodeWrapper:
|
|||||||
content_topic.encode("utf-8"),
|
content_topic.encode("utf-8"),
|
||||||
)
|
)
|
||||||
if rc != 0:
|
if rc != 0:
|
||||||
raise RuntimeError(f"subscribe_content_topic: immediate call failed (ret={rc})")
|
return Err(f"subscribe_content_topic: immediate call failed (ret={rc})")
|
||||||
|
|
||||||
_wait_cb(state, f"subscribe({content_topic})", timeout_s)
|
return _wait_cb(state, f"subscribe({content_topic})", timeout_s)
|
||||||
return 0
|
|
||||||
|
|
||||||
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0):
|
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||||
state = _new_cb_state()
|
state = _new_cb_state()
|
||||||
cb = self._make_waiting_cb(state)
|
cb = self._make_waiting_cb(state)
|
||||||
|
|
||||||
@ -230,8 +240,6 @@ class NodeWrapper:
|
|||||||
content_topic.encode("utf-8"),
|
content_topic.encode("utf-8"),
|
||||||
)
|
)
|
||||||
if rc != 0:
|
if rc != 0:
|
||||||
raise RuntimeError(f"unsubscribe_content_topic: immediate call failed (ret={rc})")
|
return Err(f"unsubscribe_content_topic: immediate call failed (ret={rc})")
|
||||||
|
|
||||||
_wait_cb(state, f"unsubscribe({content_topic})", timeout_s)
|
return _wait_cb(state, f"unsubscribe({content_topic})", timeout_s)
|
||||||
|
|
||||||
return 0
|
|
||||||
Loading…
x
Reference in New Issue
Block a user