From 8687bf4107ead80f11fe2fb374880e14dad1d608 Mon Sep 17 00:00:00 2001 From: Aya Hassan Date: Sat, 4 Apr 2026 17:14:30 +0200 Subject: [PATCH] Add needed helpers --- src/node/waku_node.py | 72 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 13 deletions(-) diff --git a/src/node/waku_node.py b/src/node/waku_node.py index afefe051a..32da1e2f9 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -12,10 +12,10 @@ from src.libs.custom_logger import get_custom_logger from tenacity import retry, stop_after_delay, wait_fixed from src.node.api_clients.rest import REST from src.node.docker_mananger import DockerManager -from src.env_vars import DOCKER_LOG_DIR, USE_WRAPPERS +from src.env_vars import DOCKER_LOG_DIR from src.data_storage import DS from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS -from src.node import wrappers_manager +from src.node.wrappers_manager import WrapperManager logger = get_custom_logger(__name__) @@ -87,13 +87,17 @@ class WakuNode: self._wrapper_node = None logger.debug(f"WakuNode instance initialized with log path {self._log_path}") + @property + def _is_wrapper(self) -> bool: + return self._wrapper_node is not None + @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) - def start(self, wait_for_node_sec=20, **kwargs): + def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs): logger.debug("Starting Node...") default_args, remove_container = self._prepare_start_context(**kwargs) - if USE_WRAPPERS: + if use_wrapper: self._start_wrapper(default_args, wait_for_node_sec) else: self._start_docker(default_args, remove_container, wait_for_node_sec) @@ -214,10 +218,11 @@ class WakuNode: logger.debug("Starting node using wrappers") wrapper_config = self._default_args_to_wrapper_config(default_args) - self._wrapper_node = wrappers_manager.wrapper_create_and_start_node( - config=wrapper_config, - timeout_s=wait_for_node_sec, - ) + result = WrapperManager.create_and_start(config=wrapper_config, timeout_s=wait_for_node_sec) + if result.is_err(): + raise RuntimeError(f"Failed to start wrapper node: {result.err()}") + self._wrapper_node = result.ok_value + logger.debug(f"Started wrapper node. REST: {self._rest_port}") DS.waku_nodes.append(self) delay(1) @@ -228,21 +233,33 @@ class WakuNode: raise def _default_args_to_wrapper_config(self, default_args): - config = { + def _bool(key, default="false"): + return default_args.get(key, default).lower() == "true" + + bootstrap = default_args.get("discv5-bootstrap-node") + + return { "logLevel": default_args.get("log-level", "DEBUG"), "mode": "Core", "networkingConfig": { "listenIpv4": default_args.get("listen-address", "0.0.0.0"), "p2pTcpPort": int(default_args["tcp-port"]), "discv5UdpPort": int(default_args["discv5-udp-port"]), + "restPort": int(default_args["rest-port"]), + "restAddress": default_args.get("rest-address", "0.0.0.0"), }, "protocolsConfig": { "clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)), + "relay": _bool("relay"), + "store": _bool("store"), + "filter": _bool("filter"), + "lightpush": _bool("lightpush"), + "peerExchange": _bool("peer-exchange"), + "discv5Discovery": _bool("discv5-discovery", "true"), + "discv5BootstrapNodes": [bootstrap] if bootstrap else [], }, } - return config - @retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True) def register_rln(self, **kwargs): logger.debug("Registering RLN credentials...") @@ -277,7 +294,7 @@ class WakuNode: @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def stop(self): - if USE_WRAPPERS: + if self._is_wrapper: self._stop_wrapper() else: self._stop_docker() @@ -295,7 +312,9 @@ class WakuNode: def _stop_wrapper(self): logger.debug("Stopping wrapper node") - wrappers_manager.wrapper_stop_and_destroy_node(self._wrapper_node) + result = self._wrapper_node.stop_and_destroy() + if result.is_err(): + logger.error(f"Failed to stop wrapper node: {result.err()}") self._wrapper_node = None logger.debug("Wrapper node stopped and destroyed.") @@ -379,6 +398,33 @@ class WakuNode: def get_tcp_address(self): return f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}" + def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): + if self._is_wrapper: + result = self._wrapper_node.subscribe_content_topic(content_topic, timeout_s=timeout_s) + if result.is_err(): + raise RuntimeError(f"subscribe_content_topic failed: {result.err()}") + return result.ok_value + else: + return self._api.set_relay_auto_subscriptions([content_topic]) + + def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): + if self._is_wrapper: + result = self._wrapper_node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s) + if result.is_err(): + raise RuntimeError(f"unsubscribe_content_topic failed: {result.err()}") + return result.ok_value + else: + return self._api.delete_relay_auto_subscriptions([content_topic]) + + def send_message(self, message: dict, *, timeout_s: float = 20.0): + if self._is_wrapper: + result = self._wrapper_node.send_message(message, timeout_s=timeout_s) + if result.is_err(): + raise RuntimeError(f"send_message failed: {result.err()}") + return result.ok_value + else: + return self._api.send_relay_auto_message(message) + def info(self): return self._api.info()