mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-04-05 03:13:22 +00:00
Merge b4bc2d83c4c315ffd95007213a29eec0f4be0416 into 155296c4d3a46f7e675b4822ac8d78aa2997a676
This commit is contained in:
commit
ba1d0f2333
2
.gitignore
vendored
2
.gitignore
vendored
@ -104,3 +104,5 @@ dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
third_party/logos-delivery-python-bindings
|
||||
|
||||
3
.gitmodules
vendored
Normal file
3
.gitmodules
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
[submodule "third_party/logos-delivery-python-bindings"]
|
||||
path = third_party/logos-delivery-python-bindings
|
||||
url = https://github.com/logos-messaging/logos-delivery-python-bindings.git
|
||||
@ -16,6 +16,7 @@ def get_env_var(var_name, default=None):
|
||||
# Configuration constants. Need to be upercase to appear in reports
|
||||
DEFAULT_NWAKU = "wakuorg/nwaku:latest"
|
||||
STRESS_ENABLED = False
|
||||
USE_WRAPPERS = True
|
||||
NODE_1 = get_env_var("NODE_1", DEFAULT_NWAKU)
|
||||
NODE_2 = get_env_var("NODE_2", DEFAULT_NWAKU)
|
||||
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NWAKU},{DEFAULT_NWAKU},{DEFAULT_NWAKU}")
|
||||
|
||||
@ -12,9 +12,10 @@ from src.libs.custom_logger import get_custom_logger
|
||||
from tenacity import retry, stop_after_delay, wait_fixed
|
||||
from src.node.api_clients.rest import REST
|
||||
from src.node.docker_mananger import DockerManager
|
||||
from src.env_vars import DOCKER_LOG_DIR
|
||||
from src.env_vars import DOCKER_LOG_DIR, USE_WRAPPERS
|
||||
from src.data_storage import DS
|
||||
from src.test_data import DEFAULT_CLUSTER_ID, LOG_ERROR_KEYWORDS, VALID_PUBSUB_TOPICS
|
||||
from src.node import wrappers_manager
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
@ -83,11 +84,21 @@ class WakuNode:
|
||||
self._docker_manager = DockerManager(self._image_name)
|
||||
self._container = None
|
||||
self.start_args = {}
|
||||
self._wrapper_node = None
|
||||
logger.debug(f"WakuNode instance initialized with log path {self._log_path}")
|
||||
|
||||
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
|
||||
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
|
||||
def start(self, wait_for_node_sec=20, **kwargs):
|
||||
logger.debug("Starting Node...")
|
||||
default_args, remove_container = self._prepare_start_context(**kwargs)
|
||||
|
||||
if USE_WRAPPERS:
|
||||
self._start_wrapper(default_args, wait_for_node_sec)
|
||||
else:
|
||||
self._start_docker(default_args, remove_container, wait_for_node_sec)
|
||||
|
||||
def _prepare_start_context(self, **kwargs):
|
||||
self._docker_manager.create_network()
|
||||
self._ext_ip = self._docker_manager.generate_random_ext_ip()
|
||||
self._ports = self._docker_manager.generate_ports()
|
||||
@ -175,8 +186,12 @@ class WakuNode:
|
||||
else:
|
||||
logger.info(f"RLN credentials not set or credential store not available, starting without RLN")
|
||||
|
||||
logger.debug(f"Using volumes {self._volumes}")
|
||||
self.start_args = dict(default_args)
|
||||
return default_args, remove_container
|
||||
|
||||
def _start_docker(self, default_args, remove_container, wait_for_node_sec):
|
||||
logger.debug(f"Using volumes {self._volumes}")
|
||||
|
||||
self._container = self._docker_manager.start_container(
|
||||
self._docker_manager.image,
|
||||
ports=self._ports,
|
||||
@ -186,16 +201,48 @@ class WakuNode:
|
||||
volumes=self._volumes,
|
||||
remove_container=remove_container,
|
||||
)
|
||||
|
||||
logger.debug(f"Started container from image {self._image_name}. REST: {self._rest_port}")
|
||||
DS.waku_nodes.append(self)
|
||||
delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly
|
||||
delay(1)
|
||||
try:
|
||||
self.ensure_ready(timeout_duration=wait_for_node_sec)
|
||||
except Exception as ex:
|
||||
logger.error(f"REST service did not become ready in time: {ex}")
|
||||
raise
|
||||
|
||||
def _start_wrapper(self, default_args, wait_for_node_sec):
|
||||
logger.debug("Starting node using wrappers")
|
||||
wrapper_config = self._default_args_to_wrapper_config(default_args)
|
||||
|
||||
self._wrapper_node = wrappers_manager.wrapper_create_and_start_node(
|
||||
config=wrapper_config,
|
||||
timeout_s=wait_for_node_sec,
|
||||
)
|
||||
logger.debug(f"Started wrapper node. REST: {self._rest_port}")
|
||||
DS.waku_nodes.append(self)
|
||||
delay(1)
|
||||
try:
|
||||
self.ensure_ready(timeout_duration=wait_for_node_sec)
|
||||
except Exception as ex:
|
||||
logger.error(f"REST service did not become ready in time: {ex}")
|
||||
raise
|
||||
|
||||
def _default_args_to_wrapper_config(self, default_args):
|
||||
config = {
|
||||
"logLevel": default_args.get("log-level", "DEBUG"),
|
||||
"mode": "Core",
|
||||
"networkingConfig": {
|
||||
"listenIpv4": default_args.get("listen-address", "0.0.0.0"),
|
||||
"p2pTcpPort": int(default_args["tcp-port"]),
|
||||
"discv5UdpPort": int(default_args["discv5-udp-port"]),
|
||||
},
|
||||
"protocolsConfig": {
|
||||
"clusterId": int(default_args.get("cluster-id", DEFAULT_CLUSTER_ID)),
|
||||
},
|
||||
}
|
||||
|
||||
return config
|
||||
|
||||
@retry(stop=stop_after_delay(250), wait=wait_fixed(0.1), reraise=True)
|
||||
def register_rln(self, **kwargs):
|
||||
logger.debug("Registering RLN credentials...")
|
||||
@ -230,6 +277,12 @@ class WakuNode:
|
||||
|
||||
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
|
||||
def stop(self):
|
||||
if USE_WRAPPERS:
|
||||
self._stop_wrapper()
|
||||
else:
|
||||
self._stop_docker()
|
||||
|
||||
def _stop_docker(self):
|
||||
if self._container:
|
||||
logger.debug(f"Stopping container with id {self._container.short_id}")
|
||||
self._container.stop()
|
||||
@ -240,6 +293,12 @@ class WakuNode:
|
||||
self._container = None
|
||||
logger.debug("Container stopped.")
|
||||
|
||||
def _stop_wrapper(self):
|
||||
logger.debug("Stopping wrapper node")
|
||||
wrappers_manager.wrapper_stop_and_destroy_node(self._wrapper_node)
|
||||
self._wrapper_node = None
|
||||
logger.debug("Wrapper node stopped and destroyed.")
|
||||
|
||||
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
|
||||
def kill(self):
|
||||
if self._container:
|
||||
|
||||
76
src/node/wrappers_manager.py
Normal file
76
src/node/wrappers_manager.py
Normal file
@ -0,0 +1,76 @@
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from result import Result, Ok, Err
|
||||
|
||||
_THIRD_PARTY = Path(__file__).resolve().parents[2] / "third_party" / "logos-delivery-python-bindings" / "waku"
|
||||
if str(_THIRD_PARTY) not in sys.path:
|
||||
sys.path.insert(0, str(_THIRD_PARTY))
|
||||
|
||||
from wrapper import NodeWrapper as _NodeWrapper # type: ignore[import]
|
||||
|
||||
|
||||
class WrapperManager:
|
||||
def __init__(self, node: _NodeWrapper):
|
||||
self._node = node
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
cls,
|
||||
config: dict,
|
||||
event_cb=None,
|
||||
*,
|
||||
timeout_s: float = 20.0,
|
||||
) -> Result["WrapperManager", str]:
|
||||
result = _NodeWrapper.create_node(config, event_cb, timeout_s=timeout_s)
|
||||
if result.is_err():
|
||||
return Err(result.err())
|
||||
return Ok(cls(result.ok_value))
|
||||
|
||||
@classmethod
|
||||
def create_and_start(
|
||||
cls,
|
||||
config: dict,
|
||||
event_cb=None,
|
||||
*,
|
||||
timeout_s: float = 20.0,
|
||||
) -> Result["WrapperManager", str]:
|
||||
result = _NodeWrapper.create_and_start(config, event_cb, timeout_s=timeout_s)
|
||||
if result.is_err():
|
||||
return Err(result.err())
|
||||
return Ok(cls(result.ok_value))
|
||||
|
||||
def __enter__(self) -> "WrapperManager":
|
||||
return self
|
||||
|
||||
def __exit__(self, *_) -> None:
|
||||
self.stop_and_destroy()
|
||||
|
||||
def start_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||
return self._node.start_node(timeout_s=timeout_s)
|
||||
|
||||
def stop_node(self, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||
return self._node.stop_node(timeout_s=timeout_s)
|
||||
|
||||
def destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||
return self._node.destroy(timeout_s=timeout_s)
|
||||
|
||||
def stop_and_destroy(self, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||
return self._node.stop_and_destroy(timeout_s=timeout_s)
|
||||
|
||||
def subscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||
return self._node.subscribe_content_topic(content_topic, timeout_s=timeout_s)
|
||||
|
||||
def unsubscribe_content_topic(self, content_topic: str, *, timeout_s: float = 20.0) -> Result[int, str]:
|
||||
return self._node.unsubscribe_content_topic(content_topic, timeout_s=timeout_s)
|
||||
|
||||
def send_message(self, message: dict, *, timeout_s: float = 20.0) -> Result[str, str]:
|
||||
return self._node.send_message(message, timeout_s=timeout_s)
|
||||
|
||||
def get_available_node_info_ids(self, *, timeout_s: float = 20.0) -> Result[list[str], str]:
|
||||
return self._node.get_available_node_info_ids(timeout_s=timeout_s)
|
||||
|
||||
def get_node_info(self, node_info_id: str, *, timeout_s: float = 20.0) -> Result[dict, str]:
|
||||
return self._node.get_node_info(node_info_id, timeout_s=timeout_s)
|
||||
|
||||
def get_available_configs(self, *, timeout_s: float = 20.0) -> Result[dict, str]:
|
||||
return self._node.get_available_configs(timeout_s=timeout_s)
|
||||
18
src/steps/wrappers_setup.py
Normal file
18
src/steps/wrappers_setup.py
Normal file
@ -0,0 +1,18 @@
|
||||
class NodeStub:
|
||||
def send_message(self, message, timeout_s=20.0):
|
||||
self.message = message
|
||||
self.timeout_s = timeout_s
|
||||
return Ok(1)
|
||||
|
||||
def get_available_node_info_ids(self, timeout_s=20.0):
|
||||
self.timeout_s = timeout_s
|
||||
return Ok(2)
|
||||
|
||||
def get_node_info(self, node_info_id, timeout_s=20.0):
|
||||
self.node_info_id = node_info_id
|
||||
self.timeout_s = timeout_s
|
||||
return Ok(3)
|
||||
|
||||
def get_available_configs(self, timeout_s=20.0):
|
||||
self.timeout_s = timeout_s
|
||||
return Ok(4)
|
||||
35
tests/wrappers_tests/unit_test.py
Normal file
35
tests/wrappers_tests/unit_test.py
Normal file
@ -0,0 +1,35 @@
|
||||
import inspect
|
||||
import pytest
|
||||
from result import Ok
|
||||
from src.libs.custom_logger import get_custom_logger
|
||||
from src.steps.wrappers_setup import NodeStub
|
||||
from src.node import wrappers_manager
|
||||
|
||||
# from wrapper_setup import NodeWrapper
|
||||
|
||||
logger = get_custom_logger(__name__)
|
||||
|
||||
|
||||
class TestWrappersManager:
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def wrapper_setup(self):
|
||||
logger.debug(f"Running setup")
|
||||
self.node = NodeForTest()
|
||||
|
||||
def test_wrapper_send_message(self):
|
||||
message = {
|
||||
"contentTopic": "/test/1/chat/proto",
|
||||
"payload": "SGVsbG8=",
|
||||
"ephemeral": False,
|
||||
}
|
||||
|
||||
result = wrappers_manager.wrapper_send_message(self.node, message, timeout_s=5.0)
|
||||
|
||||
assert result == Ok(1)
|
||||
assert self.node.last_message == message
|
||||
assert self.node.last_timeout == 5.0
|
||||
|
||||
def test_wrapper_get_available_node_info_ids(self):
|
||||
result = wrappers_manager.wrapper_get_available_node_info_ids(self.node, timeout_s=6.0)
|
||||
|
||||
assert result == Ok(2)
|
||||
Loading…
x
Reference in New Issue
Block a user