Adding wrapper manager modifications

This commit is contained in:
Aya Hassan 2026-03-08 20:40:53 +01:00
parent 66a9f8e162
commit b5ef345ea6
3 changed files with 113 additions and 4 deletions

View File

@ -16,6 +16,7 @@ def get_env_var(var_name, default=None):
# Configuration constants. Need to be upercase to appear in reports
DEFAULT_NWAKU = "wakuorg/nwaku:latest"
STRESS_ENABLED = False
USE_WRAPPERS = True
NODE_1 = get_env_var("NODE_1", DEFAULT_NWAKU)
NODE_2 = get_env_var("NODE_2", DEFAULT_NWAKU)
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NWAKU},{DEFAULT_NWAKU},{DEFAULT_NWAKU}")

View File

@ -12,9 +12,10 @@ from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed
from src.node.api_clients.rest import REST
from src.node.docker_mananger import DockerManager
from src.env_vars import DOCKER_LOG_DIR
from src.env_vars import DOCKER_LOG_DIR, USE_WRAPPERS
from src.data_storage import DS
from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS
from src.node import wrappers_manager
logger = get_custom_logger(__name__)
@ -83,11 +84,21 @@ class WakuNode:
self._docker_manager = DockerManager(self._image_name)
self._container = None
self.start_args = {}
self._wrapper_node = None
logger.debug(f"WakuNode instance initialized with log path {self._log_path}")
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
def start(self, wait_for_node_sec=20, **kwargs):
logger.debug("Starting Node...")
default_args, remove_container = self._prepare_start_context(**kwargs)
if USE_WRAPPERS:
self._start_wrapper(default_args, wait_for_node_sec)
else:
self._start_docker(default_args, remove_container, wait_for_node_sec)
def _prepare_start_context(self, **kwargs):
self._docker_manager.create_network()
self._ext_ip = self._docker_manager.generate_random_ext_ip()
self._ports = self._docker_manager.generate_ports()
@ -175,8 +186,12 @@ class WakuNode:
else:
logger.info(f"RLN credentials not set or credential store not available, starting without RLN")
logger.debug(f"Using volumes {self._volumes}")
self.start_args = dict(default_args)
return default_args, remove_container
def _start_docker(self, default_args, remove_container, wait_for_node_sec):
logger.debug(f"Using volumes {self._volumes}")
self._container = self._docker_manager.start_container(
self._docker_manager.image,
ports=self._ports,
@ -186,16 +201,48 @@ class WakuNode:
volumes=self._volumes,
remove_container=remove_container,
)
logger.debug(f"Started container from image {self._image_name}. REST: {self._rest_port}")
DS.waku_nodes.append(self)
delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly
delay(1)
try:
self.ensure_ready(timeout_duration=wait_for_node_sec)
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
def _start_wrapper(self, default_args, wait_for_node_sec):
logger.debug("Starting node using wrappers")
wrapper_config = self._default_args_to_wrapper_config(default_args)
self._wrapper_node = wrappers_manager.wrapper_create_and_start_node(
config=wrapper_config,
timeout_s=wait_for_node_sec,
)
logger.debug(f"Started wrapper node. REST: {self._rest_port}")
DS.waku_nodes.append(self)
delay(1)
try:
self.ensure_ready(timeout_duration=wait_for_node_sec)
except Exception as ex:
logger.error(f"REST service did not become ready in time: {ex}")
raise
def _default_args_to_wrapper_config(self, default_args):
config = {
"logLevel": default_args.get("log-level", "DEBUG"),
"mode": "Core",
"networkingConfig": {
"listenIpv4": default_args.get("listen-address", "0.0.0.0"),
"p2pTcpPort": int(default_args["tcp-port"]),
"discv5UdpPort": int(default_args["discv5-udp-port"]),
},
"protocolsConfig": {
"clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)),
},
}
return config
@retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True)
def register_rln(self, **kwargs):
logger.debug("Registering RLN credentials...")
@ -230,6 +277,12 @@ class WakuNode:
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def stop(self):
if USE_WRAPPERS:
self._stop_wrapper()
else:
self._stop_docker()
def _stop_docker(self):
if self._container:
logger.debug(f"Stopping container with id {self._container.short_id}")
self._container.stop()
@ -240,6 +293,12 @@ class WakuNode:
self._container = None
logger.debug("Container stopped.")
def _stop_wrapper(self):
logger.debug("Stopping wrapper node")
wrappers_manager.wrapper_stop_and_destroy_node(self._wrapper_node)
self._wrapper_node = None
logger.debug("Wrapper node stopped and destroyed.")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def kill(self):
if self._container:

View File

@ -0,0 +1,49 @@
from pathlib import Path
import sys
WRAPPER_DIR = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku"
sys.path.insert(0, str(WRAPPER_DIR))
# from wrapper import NodeWrapper
def wrapper_create_node(config, event_cb=None, timeout_s=20.0):
return NodeWrapper.create_node(
config=config,
event_cb=event_cb,
timeout_s=timeout_s,
)
def wrapper_create_and_start_node(config, event_cb=None, timeout_s=20.0):
return NodeWrapper.create_and_start(
config=config,
event_cb=event_cb,
timeout_s=timeout_s,
)
def wrapper_stop_node(node, timeout_s=20.0):
return node.stop_node(timeout_s=timeout_s)
def wrapper_destroy_node(node, timeout_s=20.0):
return node.destroy(timeout_s=timeout_s)
def wrapper_stop_and_destroy_node(node, timeout_s=20.0):
return node.stop_and_destroy(timeout_s=timeout_s)
def wrapper_subscribe(node, content_topic, timeout_s=20.0):
return node.subscribe_content_topic(
content_topic=content_topic,
timeout_s=timeout_s,
)
def wrapper_unsubscribe(node, content_topic, timeout_s=20.0):
return node.unsubscribe_content_topic(
content_topic=content_topic,
timeout_s=timeout_s,
)