mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-04-09 21:33:24 +00:00
Add needed helpers
This commit is contained in:
parent
b4bc2d83c4
commit
8687bf4107
@ -12,10 +12,10 @@ from src.libs.custom_logger import get_custom_logger
|
||||
from tenacity import retry, stop_after_delay, wait_fixed
|
||||
from src.node.api_clients.rest import REST
|
||||
from src.node.docker_mananger import DockerManager
|
||||
from src.env_vars import DOCKER_LOG_DIR, USE_WRAPPERS
|
||||
from src.env_vars import DOCKER_LOG_DIR
|
||||
from src.data_storage import DS
|
||||
from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS
|
||||
from src.node import wrappers_manager
|
||||
from src.node.wrappers_manager import WrapperManager
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
@ -87,13 +87,17 @@ class WakuNode:
|
||||
self._wrapper_node = None
|
||||
logger.debug(f"WakuNode instance initialized with log path {self._log_path}")
|
||||
|
||||
@property
|
||||
def _is_wrapper(self) -> bool:
|
||||
return self._wrapper_node is not None
|
||||
|
||||
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
|
||||
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
|
||||
def start(self, wait_for_node_sec=20, **kwargs):
|
||||
def start(self, wait_for_node_sec=20, use_wrapper=False, **kwargs):
|
||||
logger.debug("Starting Node...")
|
||||
default_args, remove_container = self._prepare_start_context(**kwargs)
|
||||
|
||||
if USE_WRAPPERS:
|
||||
if use_wrapper:
|
||||
self._start_wrapper(default_args, wait_for_node_sec)
|
||||
else:
|
||||
self._start_docker(default_args, remove_container, wait_for_node_sec)
|
||||
@ -214,10 +218,11 @@ class WakuNode:
|
||||
logger.debug("Starting node using wrappers")
|
||||
wrapper_config = self._default_args_to_wrapper_config(default_args)
|
||||
|
||||
self._wrapper_node = wrappers_manager.wrapper_create_and_start_node(
|
||||
config=wrapper_config,
|
||||
timeout_s=wait_for_node_sec,
|
||||
)
|
||||
result = WrapperManager.create_and_start(config=wrapper_config, timeout_s=wait_for_node_sec)
|
||||
if result.is_err():
|
||||
raise RuntimeError(f"Failed to start wrapper node: {result.err()}")
|
||||
self._wrapper_node = result.ok_value
|
||||
|
||||
logger.debug(f"Started wrapper node. REST: {self._rest_port}")
|
||||
DS.waku_nodes.append(self)
|
||||
delay(1)
|
||||
@ -228,21 +233,33 @@ class WakuNode:
|
||||
raise
|
||||
|
||||
def _default_args_to_wrapper_config(self, default_args):
|
||||
config = {
|
||||
def _bool(key, default="false"):
|
||||
return default_args.get(key, default).lower() == "true"
|
||||
|
||||
bootstrap = default_args.get("discv5-bootstrap-node")
|
||||
|
||||
return {
|
||||
"logLevel": default_args.get("log-level", "DEBUG"),
|
||||
"mode": "Core",
|
||||
"networkingConfig": {
|
||||
"listenIpv4": default_args.get("listen-address", "0.0.0.0"),
|
||||
"p2pTcpPort": int(default_args["tcp-port"]),
|
||||
"discv5UdpPort": int(default_args["discv5-udp-port"]),
|
||||
"restPort": int(default_args["rest-port"]),
|
||||
"restAddress": default_args.get("rest-address", "0.0.0.0"),
|
||||
},
|
||||
"protocolsConfig": {
|
||||
"clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)),
|
||||
"relay": _bool("relay"),
|
||||
"store": _bool("store"),
|
||||
"filter": _bool("filter"),
|
||||
"lightpush": _bool("lightpush"),
|
||||
"peerExchange": _bool("peer-exchange"),
|
||||
"discv5Discovery": _bool("discv5-discovery", "true"),
|
||||
"discv5BootstrapNodes": [bootstrap] if bootstrap else [],
|
||||
},
|
||||
}
|
||||
|
||||
return config
|
||||
|
||||
@retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True)
|
||||
def register_rln(self, **kwargs):
|
||||
logger.debug("Registering RLN credentials...")
|
||||
@ -277,7 +294,7 @@ class WakuNode:
|
||||
|
||||
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
|
||||
def stop(self):
|
||||
if USE_WRAPPERS:
|
||||
if self._is_wrapper:
|
||||
self._stop_wrapper()
|
||||
else:
|
||||
self._stop_docker()
|
||||
@ -295,7 +312,9 @@ class WakuNode:
|
||||
|
||||
def _stop_wrapper(self):
|
||||
logger.debug("Stopping wrapper node")
|
||||
wrappers_manager.wrapper_stop_and_destroy_node(self._wrapper_node)
|
||||
result = self._wrapper_node.stop_and_destroy()
|
||||
if result.is_err():
|
||||
logger.error(f"Failed to stop wrapper node: {result.err()}")
|
||||
self._wrapper_node = None
|
||||
logger.debug("Wrapper node stopped and destroyed.")
|
||||
|
||||
@ -379,6 +398,33 @@ class WakuNode:
|
||||
def get_tcp_address(self):
|
||||
return f"/ip4/{self._ext_ip}/tcp/{self._tcp_port}"
|
||||
|
||||
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0):
|
||||
if self._is_wrapper:
|
||||
result = self._wrapper_node.subscribe_content_topic(content_topic, timeout_s=timeout_s)
|
||||
if result.is_err():
|
||||
raise RuntimeError(f"subscribe_content_topic failed: {result.err()}")
|
||||
return result.ok_value
|
||||
else:
|
||||
return self._api.set_relay_auto_subscriptions([content_topic])
|
||||
|
||||
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0):
|
||||
if self._is_wrapper:
|
||||
result = self._wrapper_node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s)
|
||||
if result.is_err():
|
||||
raise RuntimeError(f"unsubscribe_content_topic failed: {result.err()}")
|
||||
return result.ok_value
|
||||
else:
|
||||
return self._api.delete_relay_auto_subscriptions([content_topic])
|
||||
|
||||
def send_message(self, message: dict, *, timeout_s: float = 20.0):
|
||||
if self._is_wrapper:
|
||||
result = self._wrapper_node.send_message(message, timeout_s=timeout_s)
|
||||
if result.is_err():
|
||||
raise RuntimeError(f"send_message failed: {result.err()}")
|
||||
return result.ok_value
|
||||
else:
|
||||
return self._api.send_relay_auto_message(message)
|
||||
|
||||
def info(self):
|
||||
return self._api.info()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user