Basic project structure

This commit is contained in:
Roman 2024-12-13 15:58:25 +08:00
parent b8d896aa16
commit 460e2c3a7a
No known key found for this signature in database
GPG Key ID: B8FE070B54E11B75
16 changed files with 343 additions and 2 deletions

View File

@ -1,11 +1,11 @@
repos:
- repo: https://github.com/psf/black
rev: 23.7.0
rev: 24.10.0
hooks:
- id: black
args: [--line-length=150]
- repo: https://github.com/RobertCraigie/pyright-python
rev: v1.1.326
rev: v1.1.390
hooks:
- id: pyright

0
src/__init__.py Normal file
View File

29
src/env_vars.py Normal file
View File

@ -0,0 +1,29 @@
import os
from dotenv import load_dotenv
load_dotenv() # This will load environment variables from a .env file if it exists
def get_env_var(var_name, default=None):
env_var = os.getenv(var_name, default)
if env_var in [None, ""]:
print(f"{var_name} is not set; using default value: {default}")
env_var = default
print(f"{var_name}: {env_var}")
return env_var
# Configuration constants. Need to be upercase to appear in reports
DEFAULT_NOMOS = "nomos:latest"
NODE_1 = get_env_var("NODE_1", DEFAULT_NOMOS)
NODE_2 = get_env_var("NODE_2", DEFAULT_NOMOS)
ADDITIONAL_NODES = get_env_var("ADDITIONAL_NODES", f"{DEFAULT_NOMOS},{DEFAULT_NOMOS}")
# more nodes need to follow the NODE_X pattern
DOCKER_LOG_DIR = get_env_var("DOCKER_LOG_DIR", "./log/docker")
NETWORK_NAME = get_env_var("NETWORK_NAME", "nomos")
SUBNET = get_env_var("SUBNET", "172.19.0.0/16")
IP_RANGE = get_env_var("IP_RANGE", "172.19.0.0/24")
GATEWAY = get_env_var("GATEWAY", "172.19.0.1")
RUNNING_IN_CI = get_env_var("CI")
API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 20)

0
src/libs/__init__.py Normal file
View File

22
src/libs/common.py Normal file
View File

@ -0,0 +1,22 @@
import uuid
from datetime import datetime
from time import sleep
from src.libs.custom_logger import get_custom_logger
import os
import allure
logger = get_custom_logger(__name__)
def attach_allure_file(file):
logger.debug(f"Attaching file {file}")
allure.attach.file(file, name=os.path.basename(file), attachment_type=allure.attachment_type.TEXT)
def delay(num_seconds):
logger.debug(f"Sleeping for {num_seconds} seconds")
sleep(num_seconds)
def gen_step_id():
return f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}__{str(uuid.uuid4())}"

24
src/libs/custom_logger.py Normal file
View File

@ -0,0 +1,24 @@
import logging
max_log_line_length = 5000
def log_length_filter(max_length):
class logLengthFilter(logging.Filter):
def filter(self, record):
if len(record.getMessage()) > max_length:
logging.getLogger(record.name).log(
record.levelno, f"Log line was discarded because it's longer than max_log_line_length={max_log_line_length}"
)
return False
return True
return logLengthFilter()
def get_custom_logger(name):
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("docker").setLevel(logging.WARNING)
logger = logging.getLogger(name)
logger.addFilter(log_length_filter(max_log_line_length))
return logger

0
src/node/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,37 @@
import json
import requests
from src.env_vars import API_REQUEST_TIMEOUT
from src.libs.custom_logger import get_custom_logger
logger = get_custom_logger(__name__)
class BaseClient:
def make_request(self, method, url, headers=None, data=None):
self.log_request_as_curl(method, url, headers, data)
response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT)
try:
response.raise_for_status()
except requests.HTTPError as http_err:
logger.error(f"HTTP error occurred: {http_err}. Response content: {response.content}")
raise Exception(f"Error: {http_err} with response: {response.content}")
except Exception as err:
logger.error(f"An error occurred: {err}. Response content: {response.content}")
raise Exception(f"Error: {err} with response: {response.content}")
else:
logger.info(f"Response status code: {response.status_code}. Response content: {response.content}")
return response
def log_request_as_curl(self, method, url, headers, data):
if data:
try:
data_dict = json.loads(data)
if "timestamp" in data_dict:
data_dict["timestamp"] = "TIMESTAMP_PLACEHOLDER"
data = json.dumps(data_dict)
data = data.replace('"TIMESTAMP_PLACEHOLDER"', "'$(date +%s%N)'")
except json.JSONDecodeError:
logger.error("Invalid JSON data provided")
headers_str_for_log = " ".join([f'-H "{key}: {value}"' for key, value in headers.items()]) if headers else ""
curl_cmd = f"curl -v -X {method.upper()} \"{url}\" {headers_str_for_log} -d '{data}'"
logger.info(curl_cmd)

View File

@ -0,0 +1,25 @@
from src.libs.custom_logger import get_custom_logger
import json
from urllib.parse import quote
from src.node.api_clients.base_client import BaseClient
logger = get_custom_logger(__name__)
class REST(BaseClient):
def __init__(self, rest_port):
self._rest_port = rest_port
def rest_call(self, method, endpoint, payload=None):
url = f"http://127.0.0.1:{self._rest_port}/{endpoint}"
headers = {"Content-Type": "application/json"}
return self.make_request(method, url, headers=headers, data=payload)
def rest_call_text(self, method, endpoint, payload=None):
url = f"http://127.0.0.1:{self._rest_port}/{endpoint}"
headers = {"accept": "text/plain"}
return self.make_request(method, url, headers=headers, data=payload)
def status(self):
info_response = self.rest_call("get", "cl/status")
return info_response.json()

147
src/node/docker_mananger.py Normal file
View File

@ -0,0 +1,147 @@
import os
import re
import time
import docker
from src.libs.custom_logger import get_custom_logger
import random
import threading
from src.env_vars import NETWORK_NAME, SUBNET, IP_RANGE, GATEWAY
from docker.types import IPAMConfig, IPAMPool
from docker.errors import NotFound, APIError
logger = get_custom_logger(__name__)
class DockerManager:
def __init__(self, image):
self._image = image
self._client = docker.from_env()
logger.debug(f"Docker client initialized with image {self._image}")
def create_network(self, network_name=NETWORK_NAME):
logger.debug(f"Attempting to create or retrieve network {network_name}")
networks = self._client.networks.list(names=[network_name])
if networks:
logger.debug(f"Network {network_name} already exists")
return networks[0]
network = self._client.networks.create(
network_name,
driver="bridge",
ipam=IPAMConfig(driver="default", pool_configs=[IPAMPool(subnet=SUBNET, iprange=IP_RANGE, gateway=GATEWAY)]),
)
logger.debug(f"Network {network_name} created")
return network
def start_container(self, image_name, ports, args, log_path, container_ip, volumes, remove_container=True):
cli_args = []
for key, value in args.items():
if isinstance(value, list): # Check if value is a list
cli_args.extend([f"--{key}={item}" for item in value]) # Add a command for each item in the list
elif value is None:
cli_args.append(f"{key}") # Add simple command as it is passed in the key
else:
cli_args.append(f"--{key}={value}") # Add a single command
port_bindings = {f"{port}/tcp": ("", port) for port in ports}
port_bindings_for_log = " ".join(f"-p {port}:{port}" for port in ports)
cli_args_str_for_log = " ".join(cli_args)
logger.debug(f"docker run -i -t {port_bindings_for_log} {image_name} {cli_args_str_for_log}")
container = self._client.containers.run(
image_name, command=cli_args, ports=port_bindings, detach=True, remove=remove_container, auto_remove=remove_container, volumes=volumes
)
network = self._client.networks.get(NETWORK_NAME)
logger.debug(f"docker network connect --ip {container_ip} {NETWORK_NAME} {container.id}")
network.connect(container, ipv4_address=container_ip)
logger.debug(f"Container started with ID {container.short_id}. Setting up logs at {log_path}")
log_thread = threading.Thread(target=self._log_container_output, args=(container, log_path))
log_thread.daemon = True
log_thread.start()
return container
def _log_container_output(self, container, log_path):
os.makedirs(os.path.dirname(log_path), exist_ok=True)
retry_count = 0
start_time = time.time()
try:
with open(log_path, "wb+") as log_file:
while True:
if container.status in ["exited", "dead"]:
logger.info(f"Container {container.short_id} has stopped. Exiting log stream.")
return
try:
for chunk in container.logs(stream=True):
if chunk:
log_file.write(chunk)
log_file.flush()
start_time = time.time()
retry_count = 0
else:
if time.time() - start_time > 5:
logger.warning(f"Log stream timeout for container {container.short_id}")
return
except (APIError, IOError) as e:
retry_count += 1
if retry_count >= 5:
logger.error(f"Max retries reached for container {container.short_id}. Exiting log stream.")
return
time.sleep(0.2)
except Exception as e:
return
except Exception as e:
logger.error(f"Failed to set up logging for container {container.short_id}: {e}")
def generate_ports(self, base_port=None, count=5):
if base_port is None:
base_port = random.randint(1024, 65535 - count)
ports = [str(base_port + i) for i in range(count)]
logger.debug(f"Generated ports {ports}")
return ports
@staticmethod
def generate_random_ext_ip():
base_ip_fragments = ["172", "18"]
ext_ip = ".".join(base_ip_fragments + [str(random.randint(0, 255)) for _ in range(2)])
logger.debug(f"Generated random external IP {ext_ip}")
return ext_ip
def is_container_running(self, container):
try:
refreshed_container = self._client.containers.get(container.id)
return refreshed_container.status == "running"
except NotFound:
logger.error(f"Container with ID {container.id} not found")
return False
@property
def image(self):
return self._image
def search_log_for_keywords(self, log_path, keywords, use_regex=False):
matches = {keyword: [] for keyword in keywords}
# Open the log file and search line by line
with open(log_path, "r") as log_file:
for line in log_file:
for keyword in keywords:
if use_regex:
if re.search(keyword, line, re.IGNORECASE):
matches[keyword].append(line.strip())
else:
if keyword.lower() in line.lower():
matches[keyword].append(line.strip())
# Check if there were any matches
if any(matches[keyword] for keyword in keywords):
for keyword, lines in matches.items():
if lines:
logger.debug(f"Found matches for keyword '{keyword}': {lines}")
return matches
else:
logger.debug("No errors found in the nomos logs.")
return None

31
src/node/nomos_node.py Normal file
View File

@ -0,0 +1,31 @@
import os
from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_delay, wait_fixed
from src.node.api_clients.rest import REST
from src.node.docker_mananger import DockerManager
from src.env_vars import DOCKER_LOG_DIR
logger = get_custom_logger(__name__)
def sanitize_docker_flags(input_flags):
output_flags = {}
for key, value in input_flags.items():
key = key.replace("_", "-")
output_flags[key] = value
return output_flags
class NomosNode:
def __init__(self, docker_image, docker_log_prefix=""):
self._image_name = docker_image
self._log_path = os.path.join(DOCKER_LOG_DIR, f"{docker_log_prefix}__{self._image_name.replace('/', '_')}.log")
self._docker_manager = DockerManager(self._image_name)
self._container = None
logger.debug(f"NomosNode instance initialized with log path {self._log_path}")
@retry(stop=stop_after_delay(60), wait=wait_fixed(0.1), reraise=True)
def start(self, wait_for_node_sec=20, **kwargs):
logger.debug("Starting Node...")

0
src/steps/__init__.py Normal file
View File

20
src/steps/common.py Normal file
View File

@ -0,0 +1,20 @@
import inspect
import pytest
from src.env_vars import NODE_1, NODE_2
from src.libs.custom_logger import get_custom_logger
from src.node.nomos_node import NomosNode
logger = get_custom_logger(__name__)
class StepsCommon:
@pytest.fixture(scope="function")
def setup_main_nodes(self, request):
logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
self.node1 = NomosNode(NODE_1, f"node1_{request.cls.test_id}")
self.node1.start()
self.node2 = NomosNode(NODE_2, f"node2_{request.cls.test_id}")
self.node2.start()
self.main_nodes.extend([self.node1, self.node2])

0
tests/__init__.py Normal file
View File

View File

@ -0,0 +1,6 @@
class TestDataIntegrity:
main_nodes = []
def test_cluster_start(self):
for node in self.main_nodes:
print(node)