Add thin wrapper layer to be used in tests

This commit is contained in:
Aya Hassan 2026-04-02 17:43:49 +02:00
parent 8537de27d3
commit 4cfa132d03

View File

@ -1,75 +1,73 @@
from pathlib import Path
import sys
from pathlib import Path
from result import Result, Ok, Err
WRAPPER_DIR = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku"
sys.path.insert(0, str(WRAPPER_DIR))
_THIRD_PARTY = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku"
if str(_THIRD_PARTY) not in sys.path:
sys.path.insert(0, str(_THIRD_PARTY))
# from wrapper import NodeWrapper
from wrapper import NodeWrapper as _NodeWrapper # type: ignore[import]
def wrapper_create_node(config, event_cb=None, timeout_s=20.0):
return NodeWrapper.create_node(
config=config,
event_cb=event_cb,
timeout_s=timeout_s,
)
class WrapperManager:
def __init__(self, node: _NodeWrapper):
self._node = node
@classmethod
def create(
cls,
config: dict,
event_cb=None,
*,
timeout_s: float = 20.0,
) -> Result["WrapperManager", str]:
result = _NodeWrapper.create_node(config, event_cb, timeout_s=timeout_s)
if result.is_err():
return Err(result.err())
return Ok(cls(result.ok_value))
def wrapper_create_and_start_node(config, event_cb=None, timeout_s=20.0):
return NodeWrapper.create_and_start(
config=config,
event_cb=event_cb,
timeout_s=timeout_s,
)
@classmethod
def create_and_start(
cls,
config: dict,
event_cb=None,
*,
timeout_s: float = 20.0,
) -> Result["WrapperManager", str]:
result = _NodeWrapper.create_and_start(config, event_cb, timeout_s=timeout_s)
if result.is_err():
return Err(result.err())
return Ok(cls(result.ok_value))
def __enter__(self) -> "WrapperManager":
return self
def wrapper_stop_node(node, timeout_s=20.0):
return node.stop_node(timeout_s=timeout_s)
def __exit__(self, *_) -> None:
self.stop_and_destroy()
def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.start_node(timeout_s=timeout_s)
def wrapper_destroy_node(node, timeout_s=20.0):
return node.destroy(timeout_s=timeout_s)
def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.stop_node(timeout_s=timeout_s)
def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.destroy(timeout_s=timeout_s)
def wrapper_stop_and_destroy_node(node, timeout_s=20.0):
return node.stop_and_destroy(timeout_s=timeout_s)
def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.stop_and_destroy(timeout_s=timeout_s)
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.subscribe_content_topic(content_topic, timeout_s=timeout_s)
def wrapper_subscribe(node, content_topic, timeout_s=20.0):
return node.subscribe_content_topic(
content_topic=content_topic,
timeout_s=timeout_s,
)
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
return self._node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s)
def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]:
return self._node.send_message(message, timeout_s=timeout_s)
def wrapper_unsubscribe(node, content_topic, timeout_s=20.0):
return node.unsubscribe_content_topic(
content_topic=content_topic,
timeout_s=timeout_s,
)
def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[list[str], str]:
return self._node.get_available_node_info_ids(timeout_s=timeout_s)
def wrapper_send_message(node, message, timeout_s=20.0):
return node.send_message(
message=message,
timeout_s=timeout_s,
)
def wrapper_get_available_node_info_ids(node, timeout_s=20.0):
return node.get_available_node_info_ids(
timeout_s=timeout_s,
)
def wrapper_get_node_info(node, node_info_id, timeout_s=20.0):
return node.get_node_info(
node_info_id=node_info_id,
timeout_s=timeout_s,
)
def wrapper_get_available_configs(node, timeout_s=20.0):
return node.get_available_configs(
timeout_s=timeout_s,
)
def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]:
return self._node.get_node_info(node_info_id, timeout_s=timeout_s)