diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a525861..9f50b4f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,11 +1,11 @@ repos: - repo: https://github.com/psf/black - rev: 23.7.0 + rev: 24.10.0 hooks: - id: black args: [--line-length=150] - repo: https://github.com/RobertCraigie/pyright-python - rev: v1.1.326 + rev: v1.1.390 hooks: - id: pyright diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/env_vars.py b/src/env_vars.py new file mode 100644 index 0000000..bf4e97a --- /dev/null +++ b/src/env_vars.py @@ -0,0 +1,29 @@ +import os + +from dotenv import load_dotenv + +load_dotenv() # This will load environment variables from a .env file if it exists + + +def get_env_var(var_name, default=None): + env_var = os.getenv(var_name, default) + if env_var in [None, ""]: + print(f"{var_name} is not set; using default value: {default}") + env_var = default + print(f"{var_name}: {env_var}") + return env_var + + +# Configuration constants. Need to be upercase to appear in reports +DEFAULT_NOMOS = "nomos:latest" +NODE_1 = get_env_var("NODE_1", DEFAULT_NOMOS) +NODE_2 = get_env_var("NODE_2", DEFAULT_NOMOS) +ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NOMOS},{DEFAULT_NOMOS}") +# more nodes need to follow the NODE_X pattern +DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker") +NETWORK_NAME = get_env_var("NETWORK_NAME", "nomos") +SUBNET = get_env_var("SUBNET", "172.19.0.0/16") +IP_RANGE = get_env_var("IP_RANGE", "172.19.0.0/24") +GATEWAY = get_env_var("GATEWAY", "172.19.0.1") +RUNNING_IN_CI = get_env_var("CI") +API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 20) diff --git a/src/libs/__init__.py b/src/libs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/libs/common.py b/src/libs/common.py new file mode 100644 index 0000000..939a66c --- /dev/null +++ b/src/libs/common.py @@ -0,0 +1,22 @@ +import uuid +from datetime import datetime +from time import sleep +from src.libs.custom_logger import get_custom_logger +import os +import allure + +logger = get_custom_logger(__name__) + + +def attach_allure_file(file): + logger.debug(f"Attaching file {file}") + allure.attach.file(file, name=os.path.basename(file), attachment_type=allure.attachment_type.TEXT) + + +def delay(num_seconds): + logger.debug(f"Sleeping for {num_seconds} seconds") + sleep(num_seconds) + + +def gen_step_id(): + return f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}__{str(uuid.uuid4())}" diff --git a/src/libs/custom_logger.py b/src/libs/custom_logger.py new file mode 100644 index 0000000..989548c --- /dev/null +++ b/src/libs/custom_logger.py @@ -0,0 +1,24 @@ +import logging + +max_log_line_length = 5000 + + +def log_length_filter(max_length): + class logLengthFilter(logging.Filter): + def filter(self, record): + if len(record.getMessage()) > max_length: + logging.getLogger(record.name).log( + record.levelno, f"Log line was discarded because it's longer than max_log_line_length={max_log_line_length}" + ) + return False + return True + + return logLengthFilter() + + +def get_custom_logger(name): + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("docker").setLevel(logging.WARNING) + logger = logging.getLogger(name) + logger.addFilter(log_length_filter(max_log_line_length)) + return logger diff --git a/src/node/__init__.py b/src/node/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/node/api_clients/__init__.py b/src/node/api_clients/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/node/api_clients/base_client.py b/src/node/api_clients/base_client.py new file mode 100644 index 0000000..b731e0f --- /dev/null +++ b/src/node/api_clients/base_client.py @@ -0,0 +1,37 @@ +import json +import requests +from src.env_vars import API_REQUEST_TIMEOUT +from src.libs.custom_logger import get_custom_logger + +logger = get_custom_logger(__name__) + + +class BaseClient: + def make_request(self, method, url, headers=None, data=None): + self.log_request_as_curl(method, url, headers, data) + response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT) + try: + response.raise_for_status() + except requests.HTTPError as http_err: + logger.error(f"HTTP error occurred: {http_err}. Response content: {response.content}") + raise Exception(f"Error: {http_err} with response: {response.content}") + except Exception as err: + logger.error(f"An error occurred: {err}. Response content: {response.content}") + raise Exception(f"Error: {err} with response: {response.content}") + else: + logger.info(f"Response status code: {response.status_code}. Response content: {response.content}") + return response + + def log_request_as_curl(self, method, url, headers, data): + if data: + try: + data_dict = json.loads(data) + if "timestamp" in data_dict: + data_dict["timestamp"] = "TIMESTAMP_PLACEHOLDER" + data = json.dumps(data_dict) + data = data.replace('"TIMESTAMP_PLACEHOLDER"', "'$(date +%s%N)'") + except json.JSONDecodeError: + logger.error("Invalid JSON data provided") + headers_str_for_log = " ".join([f'-H "{key}: {value}"' for key, value in headers.items()]) if headers else "" + curl_cmd = f"curl -v -X {method.upper()} \"{url}\" {headers_str_for_log} -d '{data}'" + logger.info(curl_cmd) diff --git a/src/node/api_clients/rest.py b/src/node/api_clients/rest.py new file mode 100644 index 0000000..c2197da --- /dev/null +++ b/src/node/api_clients/rest.py @@ -0,0 +1,25 @@ +from src.libs.custom_logger import get_custom_logger +import json +from urllib.parse import quote +from src.node.api_clients.base_client import BaseClient + +logger = get_custom_logger(__name__) + + +class REST(BaseClient): + def __init__(self, rest_port): + self._rest_port = rest_port + + def rest_call(self, method, endpoint, payload=None): + url = f"http://127.0.0.1:{self._rest_port}/{endpoint}" + headers = {"Content-Type": "application/json"} + return self.make_request(method, url, headers=headers, data=payload) + + def rest_call_text(self, method, endpoint, payload=None): + url = f"http://127.0.0.1:{self._rest_port}/{endpoint}" + headers = {"accept": "text/plain"} + return self.make_request(method, url, headers=headers, data=payload) + + def status(self): + info_response = self.rest_call("get", "cl/status") + return info_response.json() diff --git a/src/node/docker_mananger.py b/src/node/docker_mananger.py new file mode 100644 index 0000000..6318e8c --- /dev/null +++ b/src/node/docker_mananger.py @@ -0,0 +1,147 @@ +import os +import re +import time + +import docker + +from src.libs.custom_logger import get_custom_logger +import random +import threading +from src.env_vars import NETWORK_NAME, SUBNET, IP_RANGE, GATEWAY +from docker.types import IPAMConfig, IPAMPool +from docker.errors import NotFound, APIError + +logger = get_custom_logger(__name__) + + +class DockerManager: + def __init__(self, image): + self._image = image + self._client = docker.from_env() + logger.debug(f"Docker client initialized with image {self._image}") + + def create_network(self, network_name=NETWORK_NAME): + logger.debug(f"Attempting to create or retrieve network {network_name}") + networks = self._client.networks.list(names=[network_name]) + if networks: + logger.debug(f"Network {network_name} already exists") + return networks[0] + + network = self._client.networks.create( + network_name, + driver="bridge", + ipam=IPAMConfig(driver="default", pool_configs=[IPAMPool(subnet=SUBNET, iprange=IP_RANGE, gateway=GATEWAY)]), + ) + logger.debug(f"Network {network_name} created") + return network + + def start_container(self, image_name, ports, args, log_path, container_ip, volumes, remove_container=True): + cli_args = [] + for key, value in args.items(): + if isinstance(value, list): # Check if value is a list + cli_args.extend([f"--{key}={item}" for item in value]) # Add a command for each item in the list + elif value is None: + cli_args.append(f"{key}") # Add simple command as it is passed in the key + else: + cli_args.append(f"--{key}={value}") # Add a single command + + port_bindings = {f"{port}/tcp": ("", port) for port in ports} + port_bindings_for_log = " ".join(f"-p {port}:{port}" for port in ports) + cli_args_str_for_log = " ".join(cli_args) + logger.debug(f"docker run -i -t {port_bindings_for_log} {image_name} {cli_args_str_for_log}") + container = self._client.containers.run( + image_name, command=cli_args, ports=port_bindings, detach=True, remove=remove_container, auto_remove=remove_container, volumes=volumes + ) + + network = self._client.networks.get(NETWORK_NAME) + logger.debug(f"docker network connect --ip {container_ip} {NETWORK_NAME} {container.id}") + network.connect(container, ipv4_address=container_ip) + + logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}") + log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path)) + log_thread.daemon = True + log_thread.start() + + return container + + def _log_container_output(self, container, log_path): + os.makedirs(os.path.dirname(log_path), exist_ok=True) + retry_count = 0 + start_time = time.time() + try: + with open(log_path, "wb+") as log_file: + while True: + if container.status in ["exited", "dead"]: + logger.info(f"Container {container.short_id} has stopped. Exiting log stream.") + return + try: + for chunk in container.logs(stream=True): + if chunk: + log_file.write(chunk) + log_file.flush() + start_time = time.time() + retry_count = 0 + else: + if time.time() - start_time > 5: + logger.warning(f"Log stream timeout for container {container.short_id}") + return + except (APIError, IOError) as e: + retry_count += 1 + if retry_count >= 5: + logger.error(f"Max retries reached for container {container.short_id}. Exiting log stream.") + return + time.sleep(0.2) + except Exception as e: + return + except Exception as e: + logger.error(f"Failed to set up logging for container {container.short_id}: {e}") + + def generate_ports(self, base_port=None, count=5): + if base_port is None: + base_port = random.randint(1024, 65535 - count) + ports = [str(base_port + i) for i in range(count)] + logger.debug(f"Generated ports {ports}") + return ports + + @staticmethod + def generate_random_ext_ip(): + base_ip_fragments = ["172", "18"] + ext_ip = ".".join(base_ip_fragments + [str(random.randint(0, 255)) for _ in range(2)]) + logger.debug(f"Generated random external IP {ext_ip}") + return ext_ip + + def is_container_running(self, container): + try: + refreshed_container = self._client.containers.get(container.id) + return refreshed_container.status == "running" + except NotFound: + logger.error(f"Container with ID {container.id} not found") + return False + + @property + def image(self): + return self._image + + def search_log_for_keywords(self, log_path, keywords, use_regex=False): + matches = {keyword: [] for keyword in keywords} + + # Open the log file and search line by line + with open(log_path, "r") as log_file: + for line in log_file: + for keyword in keywords: + if use_regex: + if re.search(keyword, line, re.IGNORECASE): + matches[keyword].append(line.strip()) + else: + if keyword.lower() in line.lower(): + matches[keyword].append(line.strip()) + + # Check if there were any matches + if any(matches[keyword] for keyword in keywords): + for keyword, lines in matches.items(): + if lines: + logger.debug(f"Found matches for keyword '{keyword}': {lines}") + return matches + else: + logger.debug("No errors found in the nomos logs.") + return None diff --git a/src/node/nomos_node.py b/src/node/nomos_node.py new file mode 100644 index 0000000..246106d --- /dev/null +++ b/src/node/nomos_node.py @@ -0,0 +1,31 @@ +import os + +from src.libs.custom_logger import get_custom_logger +from tenacity import retry, stop_after_delay, wait_fixed +from src.node.api_clients.rest import REST +from src.node.docker_mananger import DockerManager +from src.env_vars import DOCKER_LOG_DIR + +logger = get_custom_logger(__name__) + + +def sanitize_docker_flags(input_flags): + output_flags = {} + for key, value in input_flags.items(): + key = key.replace("_", "-") + output_flags[key] = value + + return output_flags + + +class NomosNode: + def __init__(self, docker_image, docker_log_prefix=""): + self._image_name = docker_image + self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log") + self._docker_manager = DockerManager(self._image_name) + self._container = None + logger.debug(f"NomosNode instance initialized with log path {self._log_path}") + + @retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True) + def start(self, wait_for_node_sec=20, **kwargs): + logger.debug("Starting Node...") diff --git a/src/steps/__init__.py b/src/steps/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/steps/common.py b/src/steps/common.py new file mode 100644 index 0000000..91fdea5 --- /dev/null +++ b/src/steps/common.py @@ -0,0 +1,20 @@ +import inspect + +import pytest + +from src.env_vars import NODE_1, NODE_2 +from src.libs.custom_logger import get_custom_logger +from src.node.nomos_node import NomosNode + +logger = get_custom_logger(__name__) + + +class StepsCommon: + @pytest.fixture(scope="function") + def setup_main_nodes(self, request): + logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}") + self.node1 = NomosNode(NODE_1, f"node1_{request.cls.test_id}") + self.node1.start() + self.node2 = NomosNode(NODE_2, f"node2_{request.cls.test_id}") + self.node2.start() + self.main_nodes.extend([self.node1, self.node2]) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/data_integrity/test_data_integrity.py b/tests/data_integrity/test_data_integrity.py new file mode 100644 index 0000000..635559e --- /dev/null +++ b/tests/data_integrity/test_data_integrity.py @@ -0,0 +1,6 @@ +class TestDataIntegrity: + main_nodes = [] + + def test_cluster_start(self): + for node in self.main_nodes: + print(node)