Merge pull request #1 from logos-co/two-node-cl-start

Two node cluster start automation
This commit is contained in:
Roman Zajic 2025-01-16 09:07:34 +08:00 committed by GitHub
commit 78f9e2bfe0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 376 additions and 24 deletions

View File

@ -19,10 +19,10 @@ pytest
Licensed and distributed under either of Licensed and distributed under either of
- MIT license: [LICENSE-MIT](https://github.com/waku-org/js-waku/blob/master/LICENSE-MIT) or http://opensource.org/licenses/MIT - MIT license: [LICENSE-MIT](http://opensource.org/licenses/MIT)
or or
- Apache License, Version 2.0, ([LICENSE-APACHE-v2](https://github.com/waku-org/js-waku/blob/master/LICENSE-APACHE-v2) or http://www.apache.org/licenses/LICENSE-2.0) - Apache License, Version 2.0, [LICENSE-APACHE-v2](http://www.apache.org/licenses/LICENSE-2.0)
at your option. These files may not be copied, modified, or distributed except according to those terms. at your option. These files may not be copied, modified, or distributed except according to those terms.

View File

@ -0,0 +1,31 @@
# cfgsync server settings: nodes poll this service on `port` until
# `n_hosts` hosts have registered or `timeout` seconds elapse.
port: 4400
n_hosts: 2
timeout: 30
# ConsensusConfig related parameters
security_param: 10
active_slot_coeff: 0.9
# DaConfig related parameters
subnetwork_size: 2
dispersal_factor: 2
num_samples: 1
num_subnets: 2
old_blobs_check_interval_secs: 5
blobs_validity_duration_secs: 60
# Container-absolute path; presumably provided by the kzgrs volume mount — confirm.
global_params_path: "/kzgrs_test_params"
# Tracing
tracing_settings:
  logger: Stdout
  # Traces exported over OTLP to the tempo container; half of them sampled.
  tracing: !Otlp
    endpoint: http://tempo:4317/
    sample_ratio: 0.5
    service_name: node
  filter: !EnvFilter
    filters:
      nomos: debug
  # Metrics pushed to Prometheus' OTLP ingestion endpoint.
  metrics: !Otlp
    endpoint: http://prometheus:9090/api/v1/otlp/v1/metrics
    host_identifier: node
  level: INFO

View File

@ -0,0 +1,5 @@
#!/bin/sh
# Entrypoint for the cfgsync container: serve the cluster configuration
# in /etc/nomos/cfgsync.yaml to the other nodes.
set -e
# exec replaces the shell so cfgsync-server becomes PID 1 and receives signals.
exec /usr/bin/cfgsync-server /etc/nomos/cfgsync.yaml

View File

@ -0,0 +1,14 @@
#!/bin/sh
# Entrypoint for the executor container: fetch /config.yaml from the
# cfgsync service, then launch nomos-executor with it.
set -e
# Environment presumably consumed by cfgsync-client (confirm against its docs):
# where to write the config, where the server lives, and how this host
# identifies itself. RISC0_DEV_MODE=true: dev mode, likely skips real proving.
export CFG_FILE_PATH="/config.yaml" \
       CFG_SERVER_ADDR="http://cfgsync:4400" \
       CFG_HOST_IP=$(hostname -i) \
       CFG_HOST_KIND="executor" \
       CFG_HOST_IDENTIFIER="executor-$(hostname -i)" \
       LOG_LEVEL="INFO" \
       RISC0_DEV_MODE=true
# Only start the executor if the client fetched a config successfully;
# exec hands PID 1 to nomos-executor.
/usr/bin/cfgsync-client && \
exec /usr/bin/nomos-executor /config.yaml

View File

@ -0,0 +1,13 @@
#!/bin/sh
# Entrypoint for a validator container: fetch /config.yaml from the
# cfgsync service, then launch nomos-node with it.
# NOTE(review): unlike run_nomos_executor.sh this sets no CFG_HOST_KIND;
# presumably the server defaults to the validator kind — confirm.
set -e
export CFG_FILE_PATH="/config.yaml" \
       CFG_SERVER_ADDR="http://cfgsync:4400" \
       CFG_HOST_IP=$(hostname -i) \
       CFG_HOST_IDENTIFIER="validator-$(hostname -i)" \
       LOG_LEVEL="INFO" \
       RISC0_DEV_MODE=true
# Start the node only if the client fetched a config successfully.
/usr/bin/cfgsync-client && \
exec /usr/bin/nomos-node /config.yaml

View File

@ -31,7 +31,7 @@ pytest-xdist==3.5.0
python-dotenv==1.0.1 python-dotenv==1.0.1
pytest-dependency==0.6.0 pytest-dependency==0.6.0
PyYAML==6.0.1 PyYAML==6.0.1
requests==2.32.0 requests==2.31.0
setuptools==70.0.0 setuptools==70.0.0
tenacity==8.2.3 tenacity==8.2.3
typeguard==4.1.5 typeguard==4.1.5

3
src/data_storage.py Normal file
View File

@ -0,0 +1,3 @@
# We use this class for global variables
class DS:
    """Process-wide data store shared between tests and fixtures."""

    # Nodes started during the current test; conftest teardown fixtures
    # iterate this list to stop containers and scan their logs.
    nomos_nodes: list = []

View File

@ -15,10 +15,15 @@ def get_env_var(var_name, default=None):
# Configuration constants. Need to be upercase to appear in reports # Configuration constants. Need to be upercase to appear in reports
DEFAULT_NOMOS = "nomos:latest" NOMOS = "nomos"
NODE_1 = get_env_var("NODE_1", DEFAULT_NOMOS) NOMOS_EXECUTOR = "nomos_executor"
NODE_2 = get_env_var("NODE_2", DEFAULT_NOMOS) CFGSYNC = "cfgsync"
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NOMOS},{DEFAULT_NOMOS}")
NODE_1 = get_env_var("NODE_1", NOMOS)
NODE_2 = get_env_var("NODE_2", NOMOS_EXECUTOR)
NODE_3 = get_env_var("NODE_3", CFGSYNC)
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{NOMOS},{NOMOS}")
# more nodes need to follow the NODE_X pattern # more nodes need to follow the NODE_X pattern
DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker") DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker")
NETWORK_NAME = get_env_var("NETWORK_NAME", "nomos") NETWORK_NAME = get_env_var("NETWORK_NAME", "nomos")

View File

@ -20,6 +20,6 @@ class REST(BaseClient):
headers = {"accept": "text/plain"} headers = {"accept": "text/plain"}
return self.make_request(method, url, headers=headers, data=payload) return self.make_request(method, url, headers=headers, data=payload)
def status(self): def info(self):
info_response = self.rest_call("get", "cl/status") status_response = self.rest_call("get", "cryptarchia/info")
return info_response.json() return status_response.json()

View File

@ -35,7 +35,7 @@ class DockerManager:
logger.debug(f"Network {network_name} created") logger.debug(f"Network {network_name} created")
return network return network
def start_container(self, image_name, ports, args, log_path, container_ip, volumes, remove_container=True): def start_container(self, image_name, port_bindings, args, log_path, volumes, entrypoint, remove_container=True, name=None):
cli_args = [] cli_args = []
for key, value in args.items(): for key, value in args.items():
if isinstance(value, list): # Check if value is a list if isinstance(value, list): # Check if value is a list
@ -45,18 +45,21 @@ class DockerManager:
else: else:
cli_args.append(f"--{key}={value}") # Add a single command cli_args.append(f"--{key}={value}") # Add a single command
port_bindings = {f"{port}/tcp": ("", port) for port in ports}
port_bindings_for_log = " ".join(f"-p {port}:{port}" for port in ports)
cli_args_str_for_log = " ".join(cli_args) cli_args_str_for_log = " ".join(cli_args)
logger.debug(f"docker run -i -t {port_bindings_for_log} {image_name} {cli_args_str_for_log}") logger.debug(f"docker run -i -t {port_bindings} {image_name} {cli_args_str_for_log}")
container = self._client.containers.run( container = self._client.containers.run(
image_name, command=cli_args, ports=port_bindings, detach=True, remove=remove_container, auto_remove=remove_container, volumes=volumes image_name,
command=cli_args,
ports=port_bindings,
detach=True,
remove=remove_container,
auto_remove=remove_container,
volumes=volumes,
entrypoint=entrypoint,
name=name,
network=NETWORK_NAME,
) )
network = self._client.networks.get(NETWORK_NAME)
logger.debug(f"docker network connect --ip {container_ip} {NETWORK_NAME} {container.id}")
network.connect(container, ipv4_address=container_ip)
logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}") logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}")
log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path)) log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path))
log_thread.daemon = True log_thread.daemon = True
@ -105,7 +108,7 @@ class DockerManager:
@staticmethod @staticmethod
def generate_random_ext_ip(): def generate_random_ext_ip():
base_ip_fragments = ["172", "18"] base_ip_fragments = ["172", "19"]
ext_ip = ".".join(base_ip_fragments + [str(random.randint(0, 255)) for _ in range(2)]) ext_ip = ".".join(base_ip_fragments + [str(random.randint(0, 255)) for _ in range(2)])
logger.debug(f"Generated random external IP {ext_ip}") logger.debug(f"Generated random external IP {ext_ip}")
return ext_ip return ext_ip

20
src/node/node_vars.py Normal file
View File

@ -0,0 +1,20 @@
# Static launch profiles for each container type the harness can start.
# Keys are the node-type names passed to NomosNode(node_type, ...); each
# entry provides the Docker image, the volumes to bind-mount (relative
# paths — NomosNode.__init__ prefixes them with the cwd), the internal
# ports to publish, and the entrypoint script shipped in the
# cluster_config volume.
nomos_nodes = {
    "nomos": {
        "image": "nomos:latest",
        "volumes": ["cluster_config:/etc/nomos", "./kzgrs/kzgrs_test_params:/kzgrs_test_params:z"],
        # presumably 3000/udp = p2p transport, 18080/tcp = REST API — confirm
        "ports": ["3000/udp", "18080/tcp"],
        "entrypoint": "/etc/nomos/scripts/run_nomos_node.sh",
    },
    "nomos_executor": {
        "image": "nomos:latest",
        "volumes": ["cluster_config:/etc/nomos", "./kzgrs/kzgrs_test_params:/kzgrs_test_params:z"],
        "ports": ["3000/udp", "18080/tcp"],
        "entrypoint": "/etc/nomos/scripts/run_nomos_executor.sh",
    },
    "cfgsync": {
        "image": "nomos:latest",
        "volumes": ["cluster_config:/etc/nomos"],
        # Fixed: was "" (a str) while every other entry uses a list; an
        # empty list keeps "ports" uniformly typed (len()/iteration are
        # unchanged: both are empty).
        "ports": [],
        "entrypoint": "/etc/nomos/scripts/run_cfgsync.sh",
    },
}

View File

@ -1,10 +1,14 @@
import os import os
from src.data_storage import DS
from src.libs.custom_logger import get_custom_logger from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed from tenacity import retry, stop_after_delay, wait_fixed
from src.node.api_clients.rest import REST from src.node.api_clients.rest import REST
from src.node.docker_mananger import DockerManager from src.node.docker_mananger import DockerManager
from src.env_vars import DOCKER_LOG_DIR from src.env_vars import DOCKER_LOG_DIR
from src.node.node_vars import nomos_nodes
from src.test_data import LOG_ERROR_KEYWORDS
logger = get_custom_logger(__name__) logger = get_custom_logger(__name__)
@ -19,13 +23,125 @@ def sanitize_docker_flags(input_flags):
class NomosNode: class NomosNode:
def __init__(self, docker_image, docker_log_prefix=""): def __init__(self, node_type, container_name=""):
self._image_name = docker_image logger.debug(f"Node is going to be initialized with this config {nomos_nodes[node_type]}")
self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") self._image_name = nomos_nodes[node_type]["image"]
self._internal_ports = nomos_nodes[node_type]["ports"]
self._volumes = nomos_nodes[node_type]["volumes"]
self._entrypoint = nomos_nodes[node_type]["entrypoint"]
self._log_path = os.path.join(DOCKER_LOG_DIR, f"{container_name}__{self._image_name.replace('/', '_')}.log")
self._docker_manager = DockerManager(self._image_name) self._docker_manager = DockerManager(self._image_name)
self._container_name = container_name
self._container = None self._container = None
cwd = os.getcwd()
for i, volume in enumerate(self._volumes):
self._volumes[i] = cwd + "/" + volume
logger.debug(f"NomosNode instance initialized with log path {self._log_path}") logger.debug(f"NomosNode instance initialized with log path {self._log_path}")
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
def start(self, wait_for_node_sec=20, **kwargs): def start(self, wait_for_node_sec=120, **kwargs):
logger.debug("Starting Node...") logger.debug("Starting Node...")
self._docker_manager.create_network()
self._ext_ip = self._docker_manager.generate_random_ext_ip()
number_of_ports = len(self._internal_ports)
self._port_map = {}
if number_of_ports > 0:
self._external_ports = self._docker_manager.generate_ports(count=number_of_ports)
self._udp_port = self._external_ports[0]
self._tcp_port = self._external_ports[1]
self._api = REST(self._tcp_port)
logger.debug(f"Internal ports {self._internal_ports}")
for i, port in enumerate(self._internal_ports):
self._port_map[port] = int(self._external_ports[i])
default_args = {}
logger.debug(f"Using volumes {self._volumes}")
logger.debug(f"Port map {self._port_map}")
self._container = self._docker_manager.start_container(
self._docker_manager.image,
port_bindings=self._port_map,
args=default_args,
log_path=self._log_path,
volumes=self._volumes,
entrypoint=self._entrypoint,
remove_container=True,
name=self._container_name,
)
logger.debug(f"Started container from image {self._image_name}. " f"REST: {getattr(self, '_tcp_port', 'N/A')}")
DS.nomos_nodes.append(self)
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def stop(self):
    """Stop this node's container (if any) and best-effort remove it.

    Retried by tenacity for up to 5s; safe to call when no container
    is running.
    """
    if self._container:
        logger.debug(f"Stopping container with id {self._container.short_id}")
        self._container.stop()
        try:
            self._container.remove()
        except Exception:
            # remove() can race with the container's auto-remove; an
            # already-gone container is fine. Narrowed from a bare
            # `except:` so KeyboardInterrupt/SystemExit propagate.
            pass
        self._container = None
        logger.debug("Container stopped.")
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
def kill(self):
    """Force-kill this node's container (if any) and best-effort remove it.

    Retried by tenacity for up to 5s; safe to call when no container
    is running.
    """
    if self._container:
        logger.debug(f"Killing container with id {self._container.short_id}")
        self._container.kill()
        try:
            self._container.remove()
        except Exception:
            # remove() can race with the container's auto-remove; an
            # already-gone container is fine. Narrowed from a bare
            # `except:` so KeyboardInterrupt/SystemExit propagate.
            pass
        self._container = None
        logger.debug("Container killed.")
def restart(self):
    """Restart the underlying container, if one is running."""
    if not self._container:
        return
    logger.debug(f"Restarting container with id {self._container.short_id}")
    self._container.restart()
def pause(self):
    """Pause the underlying container, if one is running."""
    if not self._container:
        return
    logger.debug(f"Pausing container with id {self._container.short_id}")
    self._container.pause()
def unpause(self):
    """Unpause the underlying container, if one is running."""
    if not self._container:
        return
    logger.debug(f"Unpause container with id {self._container.short_id}")
    self._container.unpause()
def ensure_ready(self, timeout_duration=10):
    """Block until this node's REST API answers, or raise after timeout.

    Polls the info endpoint every 0.1s for up to `timeout_duration`
    seconds (tenacity re-raises the last error on expiry). No-op for
    containers that are not nomos nodes (e.g. cfgsync, which exposes
    no REST API).
    """
    # Bail out early so we don't even build the retry closure for
    # non-nomos containers.
    if not self.is_nomos():
        return

    @retry(stop=stop_after_delay(timeout_duration), wait=wait_fixed(0.1), reraise=True)
    def check_ready():
        # Close over self instead of smuggling it in via a default
        # argument (`node=self`), which was an idiom violation.
        self.info_response = self.info()
        logger.info("REST service is ready !!")

    check_ready()
def is_nomos(self):
    """Return True when this container runs a nomos node.

    Detection is by substring: container names like "nomos_node_0"
    qualify, while e.g. "cfgsync" does not.
    """
    name = self._container_name
    return "nomos" in name
def info(self):
    """Return the node's info response (parsed JSON) from the REST client."""
    return self._api.info()
def check_nomos_log_errors(self, whitelist=None):
    """Assert this node's docker log contains none of the known error keywords.

    Keywords listed in `whitelist` are exempt from the check.
    """
    excluded = set(whitelist) if whitelist else set()
    keywords = [kw for kw in LOG_ERROR_KEYWORDS if kw not in excluded]
    matches = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False)
    assert not matches, f"Found errors {matches}"

26
src/test_data.py Normal file
View File

@ -0,0 +1,26 @@
from time import time
from datetime import datetime, timedelta
# Substrings that indicate a crash or runtime error when found in a node's
# docker log; NomosNode.check_nomos_log_errors fails the test on any match.
# Matching is substring-based, so e.g. "nil pointer" also covers
# "nil pointer dereference".
LOG_ERROR_KEYWORDS = [
    "crash",
    "fatal",
    "panic",
    "abort",
    "segfault",
    "corrupt",
    "terminated",
    "unhandled",
    "stacktrace",
    "deadlock",
    "SIGSEGV",
    "SIGABRT",
    "stack overflow",
    "index out of bounds",
    "nil pointer dereference",
    "goroutine exit",
    "nil pointer",
    "runtime error",
    "goexit",
    "race condition",
    "double free",
]

92
tests/conftest.py Normal file
View File

@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
import inspect
import glob
from src.libs.custom_logger import get_custom_logger
import os
import pytest
from datetime import datetime
from time import time
from uuid import uuid4
from src.libs.common import attach_allure_file
import src.env_vars as env_vars
from src.data_storage import DS
logger = get_custom_logger(__name__)
# See https://docs.pytest.org/en/latest/example/simple.html#making-test-result-information-available-in-fixtures
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(item):
    """Expose each test's call-phase report as `item.rep_call`.

    Teardown fixtures read this attribute to detect failures. With
    hookwrapper=True, the code after `yield` runs once the report for
    the current phase has been produced.
    """
    outcome = yield
    rep = outcome.get_result()
    if rep.when == "call":
        # Stash the call-phase report on the item for later inspection.
        setattr(item, "rep_call", rep)
        return rep
    # NOTE(review): return values from a hookwrapper generator are ignored
    # by pytest; these returns are harmless but have no effect — confirm.
    return None
@pytest.fixture(scope="session", autouse=True)
def set_allure_env_variables():
    """At session end, write the UPPERCASE env_vars into allure's
    environment.properties (only if the results dir exists and the file
    does not)."""
    yield
    results_dir = "allure-results"
    env_file = os.path.join(results_dir, "environment.properties")
    if not os.path.isdir(results_dir) or os.path.isfile(env_file):
        return
    logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
    lines = [f"{name}={getattr(env_vars, name)}\n" for name in dir(env_vars) if name.isupper()]
    with open(env_file, "w") as outfile:
        outfile.writelines(lines)
@pytest.fixture(scope="function", autouse=True)
def test_id(request):
    # setting up an unique test id to be used where needed
    logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
    # Timestamp + uuid4, stored on the test class so other fixtures (e.g.
    # log attachment on failure) can match files belonging to this test.
    request.cls.test_id = f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}__{str(uuid4())}"
@pytest.fixture(scope="function", autouse=True)
def test_setup(request, test_id):
    """Log the test start, and on teardown prune docker log files older
    than one hour from DOCKER_LOG_DIR."""
    logger.debug(f"Running test: {request.node.name} with id: {request.cls.test_id}")
    yield
    logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
    # Hoist the cutoff out of the loop so every file is judged against
    # the same instant.
    cutoff = time() - 3600
    for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*")):
        if os.path.getmtime(file) < cutoff:
            logger.debug(f"Deleting old log file: {file}")
            try:
                os.remove(file)
            except OSError as ex:
                # Narrowed from a bare `except:` (which also swallowed
                # KeyboardInterrupt); keep the best-effort semantics but
                # say which file failed and why.
                logger.error(f"Could not delete file {file}: {ex}")
@pytest.fixture(scope="function", autouse=True)
def attach_logs_on_fail(request):
    # Teardown-only: in CI, after a failed test, attach its docker log
    # files to the allure report.
    yield
    # rep_call is set by the pytest_runtest_makereport hook above; log files
    # are matched by the per-test id embedded in their names.
    if env_vars.RUNNING_IN_CI and hasattr(request.node, "rep_call") and request.node.rep_call.failed:
        logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
        logger.debug("Test failed, attempting to attach logs to the allure reports")
        for file in glob.glob(os.path.join(env_vars.DOCKER_LOG_DIR, "*" + request.cls.test_id + "*")):
            attach_allure_file(file)
@pytest.fixture(scope="function", autouse=True)
def close_open_nodes(attach_logs_on_fail):
    """Reset the global node registry before each test and stop every
    registered node afterwards.

    Depends on attach_logs_on_fail so failure logs are attached before
    the containers are torn down. Fails the test if any container had
    already crashed (stop() reports "No such container").
    """
    DS.nomos_nodes = []
    yield
    logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
    crashed_containers = []
    for node in DS.nomos_nodes:
        try:
            node.stop()
        except Exception as ex:
            if "No such container" in str(ex):
                # Fixed: NomosNode exposes no `image` attribute (it stores
                # `_image_name`), so `node.image` raised AttributeError on
                # this path and masked the real crash report.
                crashed_containers.append(node._image_name)
            logger.error(f"Failed to stop container because of error {ex}")
    assert not crashed_containers, f"Containers {crashed_containers} crashed during the test!!!"
@pytest.fixture(scope="function", autouse=True)
def check_nomos_log_errors():
    # Teardown-only: scan every registered node's docker log for the known
    # error keywords; the node method asserts (fails the test) on a match.
    yield
    logger.debug(f"Running fixture teardown: {inspect.currentframe().f_code.co_name}")
    for node in DS.nomos_nodes:
        node.check_nomos_log_errors()

View File

0
tests/e2e/__init__.py Normal file
View File

View File

@ -0,0 +1,24 @@
from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR
from src.libs.custom_logger import get_custom_logger
from src.node.nomos_node import NomosNode
logger = get_custom_logger(__name__)
class Test2NodeClAlive:
    """Smoke test: a cfgsync service plus one validator and one executor
    start up and the nomos nodes answer on their REST APIs."""

    def test_cluster_start(self):
        # Config server first, then the two nomos nodes.
        self.node1 = NomosNode(CFGSYNC, "cfgsync")
        self.node2 = NomosNode(NOMOS, "nomos_node_0")
        self.node3 = NomosNode(NOMOS_EXECUTOR, "nomos_node_1")

        for node in (self.node1, self.node2, self.node3):
            node.start()

        try:
            # cfgsync exposes no REST API, so only the nomos nodes are probed.
            self.node2.ensure_ready()
            self.node3.ensure_ready()
        except Exception as ex:
            logger.error(f"REST service did not become ready in time: {ex}")
            raise