Fix review comments

This commit is contained in:
Aya Hassan 2026-04-21 18:15:46 +02:00
parent e27d64a1a4
commit 060d50afbb

View File

@ -15,7 +15,6 @@ from src.node.docker_mananger import DockerManager
from src.env_vars import DOCKER_LOG_DIR
from src.data_storage import DS
from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS
from src.node.wrappers_manager import WrapperManager
logger = get_custom_logger(__name__)
@ -84,22 +83,13 @@ class WakuNode:
self._docker_manager = DockerManager(self._image_name)
self._container = None
self.start_args = {}
self._wrapper_node = None
logger.debug(f"WakuNode instance initialized with log path {self._log_path}")
@property
def _is_wrapper(self) -> bool:
return self._wrapper_node is not None
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs):
def start(self, wait_for_node_sec=20, **kwargs):
logger.debug("Starting Node...")
default_args, remove_container = self._prepare_start_context(**kwargs)
if use_wrapper:
self._start_wrapper(default_args, wait_for_node_sec)
else:
self._start_docker(default_args, remove_container, wait_for_node_sec)
self._start_docker(default_args, remove_container, wait_for_node_sec)
def _prepare_start_context(self, **kwargs):
self._docker_manager.create_network()
@ -213,52 +203,6 @@ class WakuNode:
logger.error(f"REST service did not become ready in time: {ex}")
raise
def _start_wrapper(self, default_args, wait_for_node_sec):
logger.debug("Starting node using wrappers")
wrapper_config = self._default_args_to_wrapper_config(default_args)
result = WrapperManager.create_and_start(config=wrapper_config, timeout_s=wait_for_node_sec)
if result.is_err():
raise RuntimeError(f"Failed to start wrapper node: {result.err()}")
self._wrapper_node = result.ok_value
logger.debug(f"Started wrapper node. REST: {self._rest_port}")
DS.waku_nodes.append(self)
delay(1)
try:
self.ensure_ready(timeout_duration=wait_for_node_sec)
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
def _default_args_to_wrapper_config(self, default_args):
def _bool(key, default="false"):
return default_args.get(key, default).lower() == "true"
bootstrap = default_args.get("discv5-bootstrap-node")
return {
"logLevel": default_args.get("log-level", "DEBUG"),
"mode": "Core",
"networkingConfig": {
"listenIpv4": default_args.get("listen-address", "0.0.0.0"),
"p2pTcpPort": int(default_args["tcp-port"]),
"discv5UdpPort": int(default_args["discv5-udp-port"]),
"restPort": int(default_args["rest-port"]),
"restAddress": default_args.get("rest-address", "0.0.0.0"),
},
"protocolsConfig": {
"clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)),
"relay": _bool("relay"),
"store": _bool("store"),
"filter": _bool("filter"),
"lightpush": _bool("lightpush"),
"peerExchange": _bool("peer-exchange"),
"discv5Discovery": _bool("discv5-discovery", "true"),
"discv5BootstrapNodes": [bootstrap] if bootstrap else [],
},
}
@retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True)
def register_rln(self, **kwargs):
logger.debug("Registering RLN credentials...")
@ -293,12 +237,6 @@ class WakuNode:
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def stop(self):
if self._is_wrapper:
self._stop_wrapper()
else:
self._stop_docker()
def _stop_docker(self):
if self._container:
logger.debug(f"Stopping container with id {self._container.short_id}")
self._container.stop()
@ -309,14 +247,6 @@ class WakuNode:
self._container = None
logger.debug("Container stopped.")
def _stop_wrapper(self):
logger.debug("Stopping wrapper node")
result = self._wrapper_node.stop_and_destroy()
if result.is_err():
logger.error(f"Failed to stop wrapper node: {result.err()}")
self._wrapper_node = None
logger.debug("Wrapper node stopped and destroyed.")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def kill(self):
if self._container:
@ -397,32 +327,14 @@ class WakuNode:
def get_tcp_address(self):
return f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}"
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0):
if self._is_wrapper:
result = self._wrapper_node.subscribe_content_topic(content_topic, timeout_s=timeout_s)
if result.is_err():
raise RuntimeError(f"subscribe_content_topic failed: {result.err()}")
return result.ok_value
else:
return self._api.set_relay_auto_subscriptions([content_topic])
def subscribe_content_topic(self, content_topic: str):
return self._api.set_relay_auto_subscriptions([content_topic])
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0):
if self._is_wrapper:
result = self._wrapper_node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s)
if result.is_err():
raise RuntimeError(f"unsubscribe_content_topic failed: {result.err()}")
return result.ok_value
else:
return self._api.delete_relay_auto_subscriptions([content_topic])
def unsubscribe_content_topic(self, content_topic: str):
return self._api.delete_relay_auto_subscriptions([content_topic])
def send_message(self, message: dict, *, timeout_s: float = 20.0):
if self._is_wrapper:
result = self._wrapper_node.send_message(message, timeout_s=timeout_s)
if result.is_err():
raise RuntimeError(f"send_message failed: {result.err()}")
return result.ok_value
else:
return self._api.send_relay_auto_message(message)
def send_message(self, message: dict):
return self._api.send_relay_auto_message(message)
def info(self):
return self._api.info()