diff --git a/src/node/waku_node.py b/src/node/waku_node.py index c6bfe7f41..86bc85ee5 100644 --- a/src/node/waku_node.py +++ b/src/node/waku_node.py @@ -15,7 +15,6 @@ from src.node.docker_mananger import DockerManager from src.env_vars import DOCKER_LOG_DIR from src.data_storage import DS from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS -from src.node.wrappers_manager import WrapperManager logger = get_custom_logger(__name__) @@ -84,22 +83,13 @@ class WakuNode: self._docker_manager = DockerManager(self._image_name) self._container = None self.start_args = {} - self._wrapper_node = None logger.debug(f"WakuNode instance initialized with log path {self._log_path}") - @property - def _is_wrapper(self) -> bool: - return self._wrapper_node is not None - @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) - def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs): + def start(self, wait_for_node_sec=20, **kwargs): logger.debug("Starting Node...") default_args, remove_container = self._prepare_start_context(**kwargs) - - if use_wrapper: - self._start_wrapper(default_args, wait_for_node_sec) - else: - self._start_docker(default_args, remove_container, wait_for_node_sec) + self._start_docker(default_args, remove_container, wait_for_node_sec) def _prepare_start_context(self, **kwargs): self._docker_manager.create_network() @@ -213,52 +203,6 @@ class WakuNode: logger.error(f"REST service did not become ready in time: {ex}") raise - def _start_wrapper(self, default_args, wait_for_node_sec): - logger.debug("Starting node using wrappers") - wrapper_config = self._default_args_to_wrapper_config(default_args) - - result = WrapperManager.create_and_start(config=wrapper_config, timeout_s=wait_for_node_sec) - if result.is_err(): - raise RuntimeError(f"Failed to start wrapper node: {result.err()}") - self._wrapper_node = result.ok_value - - logger.debug(f"Started wrapper node. REST: {self._rest_port}") - DS.waku_nodes.append(self) - delay(1) - try: - self.ensure_ready(timeout_duration=wait_for_node_sec) - except Exception as ex: - logger.error(f"REST service did not become ready in time: {ex}") - raise - - def _default_args_to_wrapper_config(self, default_args): - def _bool(key, default="false"): - return default_args.get(key, default).lower() == "true" - - bootstrap = default_args.get("discv5-bootstrap-node") - - return { - "logLevel": default_args.get("log-level", "DEBUG"), - "mode": "Core", - "networkingConfig": { - "listenIpv4": default_args.get("listen-address", "0.0.0.0"), - "p2pTcpPort": int(default_args["tcp-port"]), - "discv5UdpPort": int(default_args["discv5-udp-port"]), - "restPort": int(default_args["rest-port"]), - "restAddress": default_args.get("rest-address", "0.0.0.0"), - }, - "protocolsConfig": { - "clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)), - "relay": _bool("relay"), - "store": _bool("store"), - "filter": _bool("filter"), - "lightpush": _bool("lightpush"), - "peerExchange": _bool("peer-exchange"), - "discv5Discovery": _bool("discv5-discovery", "true"), - "discv5BootstrapNodes": [bootstrap] if bootstrap else [], - }, - } - @retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True) def register_rln(self, **kwargs): logger.debug("Registering RLN credentials...") @@ -293,12 +237,6 @@ class WakuNode: @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def stop(self): - if self._is_wrapper: - self._stop_wrapper() - else: - self._stop_docker() - - def _stop_docker(self): if self._container: logger.debug(f"Stopping container with id {self._container.short_id}") self._container.stop() @@ -309,14 +247,6 @@ class WakuNode: self._container = None logger.debug("Container stopped.") - def _stop_wrapper(self): - logger.debug("Stopping wrapper node") - result = self._wrapper_node.stop_and_destroy() - if result.is_err(): - logger.error(f"Failed to stop wrapper node: {result.err()}") - self._wrapper_node = None - logger.debug("Wrapper node stopped and destroyed.") - @retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True) def kill(self): if self._container: @@ -397,32 +327,14 @@ class WakuNode: def get_tcp_address(self): return f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}" - def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): - if self._is_wrapper: - result = self._wrapper_node.subscribe_content_topic(content_topic, timeout_s=timeout_s) - if result.is_err(): - raise RuntimeError(f"subscribe_content_topic failed: {result.err()}") - return result.ok_value - else: - return self._api.set_relay_auto_subscriptions([content_topic]) + def subscribe_content_topic(self, content_topic: str): + return self._api.set_relay_auto_subscriptions([content_topic]) - def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0): - if self._is_wrapper: - result = self._wrapper_node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s) - if result.is_err(): - raise RuntimeError(f"unsubscribe_content_topic failed: {result.err()}") - return result.ok_value - else: - return self._api.delete_relay_auto_subscriptions([content_topic]) + def unsubscribe_content_topic(self, content_topic: str): + return self._api.delete_relay_auto_subscriptions([content_topic]) - def send_message(self, message: dict, *, timeout_s: float = 20.0): - if self._is_wrapper: - result = self._wrapper_node.send_message(message, timeout_s=timeout_s) - if result.is_err(): - raise RuntimeError(f"send_message failed: {result.err()}") - return result.ok_value - else: - return self._api.send_relay_auto_message(message) + def send_message(self, message: dict): + return self._api.send_relay_auto_message(message) def info(self): return self._api.info()