From 6d52d1cf84f2222fdad6ab1edcf530e0d7c4f2be Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Tue, 24 Feb 2026 07:11:38 +0100 Subject: [PATCH 1/7] Add logos-delivery-python-bindings submodule --- .gitmodules | 3 +++ third_party/logos-delivery-python-bindings | 1 + 2 files changed, 4 insertions(+) create mode 100644 .gitmodules create mode 160000 third_party/logos-delivery-python-bindings diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..caa1987ee --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "third_party/logos-delivery-python-bindings"] + path = third_party/logos-delivery-python-bindings + url = https://github.com/logos-messaging/logos-delivery-python-bindings.git diff --git a/third_party/logos-delivery-python-bindings b/third_party/logos-delivery-python-bindings new file mode 160000 index 000000000..c29108b45 --- /dev/null +++ b/third_party/logos-delivery-python-bindings @@ -0,0 +1 @@ +Subproject commit c29108b45be52adf141df24720b1d020b488523c From b5ef345ea64ea1d8ed5f9346eef786bffb33f04b Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Sun, 8 Mar 2026 20:40:53 +0100 Subject: [PATCH 2/7] Adding wrapper manager modifications --- src/env_vars.py | 1 + src/node/waku_node.py | 67 +++++++++++++++++++++++++++++++++--- src/node/wrappers_manager.py | 49 ++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 4 deletions(-) create mode 100644 src/node/wrappers_manager.py diff --git a/src/env_vars.py b/src/env_vars.py index 1fc8f47a7..8a8ad4726 100644 --- a/src/env_vars.py +++ b/src/env_vars.py @@ -16,6 +16,7 @@ def get_env_var(var_name, default=None): # Configuration constants. Need to be upercase to appear in reports DEFAULT_NWAKU = "wakuorg/nwaku:latest" STRESS_ENABLED = False +USE_WRAPPERS = True NODE_1 = get_env_var("NODE_1", DEFAULT_NWAKU) NODE_2 = get_env_var("NODE_2", DEFAULT_NWAKU) ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NWAKU},{DEFAULT_NWAKU},{DEFAULT_NWAKU}") diff --git a/src/node/waku_node.py b/src/node/waku_node.py index 1a96d0617..afefe051a 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -12,9 +12,10 @@ from src.libs.custom_logger import get_custom_logger from tenacity import retry, stop_after_delay, wait_fixed from src.node.api_clients.rest import REST from src.node.docker_mananger import DockerManager -from src.env_vars import DOCKER_LOG_DIR +from src.env_vars import DOCKER_LOG_DIR, USE_WRAPPERS from src.data_storage import DS from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS +from src.node import wrappers_manager logger = get_custom_logger(__name__) @@ -83,11 +84,21 @@ class WakuNode: self._docker_manager = DockerManager(self._image_name) self._container = None self.start_args = {} + self._wrapper_node = None logger.debug(f"WakuNode instance initialized with log path {self._log_path}") + @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) def start(self, wait_for_node_sec=20, **kwargs): logger.debug("Starting Node...") + default_args, remove_container = self._prepare_start_context(**kwargs) + + if USE_WRAPPERS: + self._start_wrapper(default_args, wait_for_node_sec) + else: + self._start_docker(default_args, remove_container, wait_for_node_sec) + + def _prepare_start_context(self, **kwargs): self._docker_manager.create_network() self._ext_ip = self._docker_manager.generate_random_ext_ip() self._ports = self._docker_manager.generate_ports() @@ -175,8 +186,12 @@ class WakuNode: else: logger.info(f"RLN credentials not set or credential store not available, starting without RLN") - logger.debug(f"Using volumes {self._volumes}") self.start_args = dict(default_args) + return default_args, remove_container + + def _start_docker(self, default_args, remove_container, wait_for_node_sec): + logger.debug(f"Using volumes {self._volumes}") + self._container = self._docker_manager.start_container( self._docker_manager.image, ports=self._ports, @@ -186,16 +201,48 @@ class WakuNode: volumes=self._volumes, remove_container=remove_container, ) - logger.debug(f"Started container from image {self._image_name}. REST: {self._rest_port}") DS.waku_nodes.append(self) - delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly + delay(1) try: self.ensure_ready(timeout_duration=wait_for_node_sec) except Exception as ex: logger.error(f"REST service did not become ready in time: {ex}") raise + def _start_wrapper(self, default_args, wait_for_node_sec): + logger.debug("Starting node using wrappers") + wrapper_config = self._default_args_to_wrapper_config(default_args) + + self._wrapper_node = wrappers_manager.wrapper_create_and_start_node( + config=wrapper_config, + timeout_s=wait_for_node_sec, + ) + logger.debug(f"Started wrapper node. REST: {self._rest_port}") + DS.waku_nodes.append(self) + delay(1) + try: + self.ensure_ready(timeout_duration=wait_for_node_sec) + except Exception as ex: + logger.error(f"REST service did not become ready in time: {ex}") + raise + + def _default_args_to_wrapper_config(self, default_args): + config = { + "logLevel": default_args.get("log-level", "DEBUG"), + "mode": "Core", + "networkingConfig": { + "listenIpv4": default_args.get("listen-address", "0.0.0.0"), + "p2pTcpPort": int(default_args["tcp-port"]), + "discv5UdpPort": int(default_args["discv5-udp-port"]), + }, + "protocolsConfig": { + "clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)), + }, + } + + return config + @retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True) def register_rln(self, **kwargs): logger.debug("Registering RLN credentials...") @@ -230,6 +277,12 @@ class WakuNode: @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def stop(self): + if USE_WRAPPERS: + self._stop_wrapper() + else: + self._stop_docker() + + def _stop_docker(self): if self._container: logger.debug(f"Stopping container with id {self._container.short_id}") self._container.stop() @@ -240,6 +293,12 @@ class WakuNode: self._container = None logger.debug("Container stopped.") + def _stop_wrapper(self): + logger.debug("Stopping wrapper node") + wrappers_manager.wrapper_stop_and_destroy_node(self._wrapper_node) + self._wrapper_node = None + logger.debug("Wrapper node stopped and destroyed.") + @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def kill(self): if self._container: diff --git a/src/node/wrappers_manager.py b/src/node/wrappers_manager.py new file mode 100644 index 000000000..2cba9161a --- /dev/null +++ b/src/node/wrappers_manager.py @@ -0,0 +1,49 @@ +from pathlib import Path +import sys + +WRAPPER_DIR = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku" +sys.path.insert(0, str(WRAPPER_DIR)) + +# from wrapper import NodeWrapper + + +def wrapper_create_node(config, event_cb=None, timeout_s=20.0): + return NodeWrapper.create_node( + config=config, + event_cb=event_cb, + timeout_s=timeout_s, + ) + + +def wrapper_create_and_start_node(config, event_cb=None, timeout_s=20.0): + return NodeWrapper.create_and_start( + config=config, + event_cb=event_cb, + timeout_s=timeout_s, + ) + + +def wrapper_stop_node(node, timeout_s=20.0): + return node.stop_node(timeout_s=timeout_s) + + +def wrapper_destroy_node(node, timeout_s=20.0): + return node.destroy(timeout_s=timeout_s) + + +def wrapper_stop_and_destroy_node(node, timeout_s=20.0): + return node.stop_and_destroy(timeout_s=timeout_s) + + +def wrapper_subscribe(node, content_topic, timeout_s=20.0): + return node.subscribe_content_topic( + content_topic=content_topic, + timeout_s=timeout_s, + ) + + +def wrapper_unsubscribe(node, content_topic, timeout_s=20.0): + return node.unsubscribe_content_topic( + content_topic=content_topic, + timeout_s=timeout_s, + ) From 437b7fc9c76b222e083b6ea08649166caf01dd21 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Tue, 10 Mar 2026 18:59:54 +0100 Subject: [PATCH 3/7] Add unit test for wrappers --- src/node/wrappers_manager.py | 26 +++++++++++++++++++++++ src/steps/wrappers_setup.py | 18 ++++++++++++++++ tests/wrappers_tests/unit_test.py | 35 +++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+) create mode 100644 src/steps/wrappers_setup.py create mode 100644 tests/wrappers_tests/unit_test.py diff --git a/src/node/wrappers_manager.py b/src/node/wrappers_manager.py index 2cba9161a..63ee02ef7 100644 --- a/src/node/wrappers_manager.py +++ b/src/node/wrappers_manager.py @@ -47,3 +47,29 @@ def wrapper_unsubscribe(node, content_topic, timeout_s=20.0): content_topic=content_topic, timeout_s=timeout_s, ) + + +def wrapper_send_message(node, message, timeout_s=20.0): + return node.send_message( + message=message, + timeout_s=timeout_s, + ) + + +def wrapper_get_available_node_info_ids(node, timeout_s=20.0): + return node.get_available_node_info_ids( + timeout_s=timeout_s, + ) + + +def wrapper_get_node_info(node, node_info_id, timeout_s=20.0): + return node.get_node_info( + node_info_id=node_info_id, + timeout_s=timeout_s, + ) + + +def wrapper_get_available_configs(node, timeout_s=20.0): + return node.get_available_configs( + timeout_s=timeout_s, + ) diff --git a/src/steps/wrappers_setup.py b/src/steps/wrappers_setup.py new file mode 100644 index 000000000..981b24261 --- /dev/null +++ b/src/steps/wrappers_setup.py @@ -0,0 +1,18 @@ +class NodeStub: + def send_message(self, message, timeout_s=20.0): + self.message = message + self.timeout_s = timeout_s + return Ok(1) + + def get_available_node_info_ids(self, timeout_s=20.0): + self.timeout_s = timeout_s + return Ok(2) + + def get_node_info(self, node_info_id, timeout_s=20.0): + self.node_info_id = node_info_id + self.timeout_s = timeout_s + return Ok(3) + + def get_available_configs(self, timeout_s=20.0): + self.timeout_s = timeout_s + return Ok(4) diff --git a/tests/wrappers_tests/unit_test.py b/tests/wrappers_tests/unit_test.py new file mode 100644 index 000000000..475d9b86c --- /dev/null +++ b/tests/wrappers_tests/unit_test.py @@ -0,0 +1,35 @@ +import inspect +import pytest +from result import Ok +from src.libs.custom_logger import get_custom_logger +from src.steps.wrappers_setup import NodeStub +from src.node import wrappers_manager + +# from wrapper_setup import NodeWrapper + +logger = get_custom_logger(__name__) + + +class TestWrappersManager: + @pytest.fixture(scope="function", autouse=True) + def wrapper_setup(self): + logger.debug(f"Running setup") + self.node = NodeForTest() + + def test_wrapper_send_message(self): + message = { + "contentTopic": "/test/1/chat/proto", + "payload": "SGVsbG8=", + "ephemeral": False, + } + + result = wrappers_manager.wrapper_send_message(self.node, message, timeout_s=5.0) + + assert result == Ok(1) + assert self.node.last_message == message + assert self.node.last_timeout == 5.0 + + def test_wrapper_get_available_node_info_ids(self): + result = wrappers_manager.wrapper_get_available_node_info_ids(self.node, timeout_s=6.0) + + assert result == Ok(2) From e43226b9637a91cc79a73c717d5f3df2bdd36a3c Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Wed, 11 Mar 2026 16:00:45 +0100 Subject: [PATCH 4/7] ignore third paty --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 16267b58b..e64e42ff9 100644 --- a/.gitignore +++ b/.gitignore @@ -104,3 +104,5 @@ dmypy.json # Pyre type checker .pyre/ + +third_party/logos-delivery-python-bindings From 8537de27d37caa860251a3727a5417752b050ad1 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Wed, 11 Mar 2026 16:03:31 +0100 Subject: [PATCH 5/7] Ignore third party --- third_party/logos-delivery-python-bindings | 1 - 1 file changed, 1 deletion(-) delete mode 160000 third_party/logos-delivery-python-bindings diff --git a/third_party/logos-delivery-python-bindings b/third_party/logos-delivery-python-bindings deleted file mode 160000 index c29108b45..000000000 --- a/third_party/logos-delivery-python-bindings +++ /dev/null @@ -1 +0,0 @@ -Subproject commit c29108b45be52adf141df24720b1d020b488523c From 4cfa132d0391c6f5118eb2ffe99df824060aba20 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Thu, 2 Apr 2026 17:43:49 +0200 Subject: [PATCH 6/7] Add thin wrapper layer to be used in tests --- src/node/wrappers_manager.py | 112 +++++++++++++++++------------------ 1 file changed, 55 insertions(+), 57 deletions(-) diff --git a/src/node/wrappers_manager.py b/src/node/wrappers_manager.py index 63ee02ef7..44be94e7f 100644 --- a/src/node/wrappers_manager.py +++ b/src/node/wrappers_manager.py @@ -1,75 +1,73 @@ -from pathlib import Path import sys +from pathlib import Path +from result import Result, Ok, Err -WRAPPER_DIR = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku" -sys.path.insert(0, str(WRAPPER_DIR)) +_THIRD_PARTY = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku" +if str(_THIRD_PARTY) not in sys.path: + sys.path.insert(0, str(_THIRD_PARTY)) -# from wrapper import NodeWrapper +from wrapper import NodeWrapper as _NodeWrapper # type: ignore[import] -def wrapper_create_node(config, event_cb=None, timeout_s=20.0): - return NodeWrapper.create_node( - config=config, - event_cb=event_cb, - timeout_s=timeout_s, - ) +class WrapperManager: + def __init__(self, node: _NodeWrapper): + self._node = node + @classmethod + def create( + cls, + config: dict, + event_cb=None, + *, + timeout_s: float = 20.0, + ) -> Result["WrapperManager", str]: + result = _NodeWrapper.create_node(config, event_cb, timeout_s=timeout_s) + if result.is_err(): + return Err(result.err()) + return Ok(cls(result.ok_value)) -def wrapper_create_and_start_node(config, event_cb=None, timeout_s=20.0): - return NodeWrapper.create_and_start( - config=config, - event_cb=event_cb, - timeout_s=timeout_s, - ) + @classmethod + def create_and_start( + cls, + config: dict, + event_cb=None, + *, + timeout_s: float = 20.0, + ) -> Result["WrapperManager", str]: + result = _NodeWrapper.create_and_start(config, event_cb, timeout_s=timeout_s) + if result.is_err(): + return Err(result.err()) + return Ok(cls(result.ok_value)) + def __enter__(self) -> "WrapperManager": + return self -def wrapper_stop_node(node, timeout_s=20.0): - return node.stop_node(timeout_s=timeout_s) + def __exit__(self, *_) -> None: + self.stop_and_destroy() + def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.start_node(timeout_s=timeout_s) -def wrapper_destroy_node(node, timeout_s=20.0): - return node.destroy(timeout_s=timeout_s) + def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.stop_node(timeout_s=timeout_s) + def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.destroy(timeout_s=timeout_s) -def wrapper_stop_and_destroy_node(node, timeout_s=20.0): - return node.stop_and_destroy(timeout_s=timeout_s) + def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.stop_and_destroy(timeout_s=timeout_s) + def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.subscribe_content_topic(content_topic, timeout_s=timeout_s) -def wrapper_subscribe(node, content_topic, timeout_s=20.0): - return node.subscribe_content_topic( - content_topic=content_topic, - timeout_s=timeout_s, - ) + def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]: + return self._node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s) + def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]: + return self._node.send_message(message, timeout_s=timeout_s) -def wrapper_unsubscribe(node, content_topic, timeout_s=20.0): - return node.unsubscribe_content_topic( - content_topic=content_topic, - timeout_s=timeout_s, - ) + def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[list[str], str]: + return self._node.get_available_node_info_ids(timeout_s=timeout_s) - -def wrapper_send_message(node, message, timeout_s=20.0): - return node.send_message( - message=message, - timeout_s=timeout_s, - ) - - -def wrapper_get_available_node_info_ids(node, timeout_s=20.0): - return node.get_available_node_info_ids( - timeout_s=timeout_s, - ) - - -def wrapper_get_node_info(node, node_info_id, timeout_s=20.0): - return node.get_node_info( - node_info_id=node_info_id, - timeout_s=timeout_s, - ) - - -def wrapper_get_available_configs(node, timeout_s=20.0): - return node.get_available_configs( - timeout_s=timeout_s, - ) + def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]: + return self._node.get_node_info(node_info_id, timeout_s=timeout_s) From b4bc2d83c4c315ffd95007213a29eec0f4be0416 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Thu, 2 Apr 2026 19:55:58 +0200 Subject: [PATCH 7/7] Add last API --- src/node/wrappers_manager.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/node/wrappers_manager.py b/src/node/wrappers_manager.py index 44be94e7f..680bff9b2 100644 --- a/src/node/wrappers_manager.py +++ b/src/node/wrappers_manager.py @@ -71,3 +71,6 @@ class WrapperManager: def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]: return self._node.get_node_info(node_info_id, timeout_s=timeout_s) + + def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]: + return self._node.get_available_configs(timeout_s=timeout_s)