import logging
import os
import random
import threading

import docker
from docker.errors import NotFound
from docker.types import IPAMConfig, IPAMPool

from src.env_vars import NETWORK_NAME, SUBNET, IP_RANGE, GATEWAY

logger = logging.getLogger(__name__)


class DockerManager:
    """Thin helper around the Docker SDK client for networks and containers.

    The instance keeps the image name it was constructed with (exposed via
    the ``image`` property) and a client obtained from ``docker.from_env()``.
    """

    def __init__(self, image):
        """Store *image* and create a Docker client from the environment."""
        self._image = image
        self._client = docker.from_env()
        logger.debug(f"Docker client initialized with image {self._image}")

    def create_network(self, network_name=NETWORK_NAME):
        """Return the network called *network_name*, creating it if needed.

        When no network with that name is listed, a bridge network is created
        with an IPAM pool built from ``SUBNET``, ``IP_RANGE`` and ``GATEWAY``.

        Args:
            network_name: Name of the network to look up or create.

        Returns:
            The first matching existing network, or the newly created one.
        """
        logger.debug(f"Attempting to create or retrieve network '{network_name}'.")
        networks = self._client.networks.list(names=[network_name])
        if networks:
            logger.debug(f"Network '{network_name}' already exists.")
            return networks[0]

        ipam_pool = IPAMPool(subnet=SUBNET, iprange=IP_RANGE, gateway=GATEWAY)
        network = self._client.networks.create(
            network_name,
            driver="bridge",
            ipam=IPAMConfig(driver="default", pool_configs=[ipam_pool]),
        )
        logger.debug(f"Network '{network_name}' created.")
        return network

    def start_container(self, image_name, ports, args, log_path, container_ip):
        """Run a detached container, attach it to the network and stream its logs.

        Note that the image used is *image_name*, not the image stored on the
        instance.

        Args:
            image_name: Image to run.
            ports: Iterable of port numbers; each is published as
                ``"<port>/tcp"`` bound to the same host port.
            args: Mapping turned into ``--key=value`` command-line arguments.
            log_path: File that receives the container's log stream.
            container_ip: IPv4 address requested when connecting the container
                to the ``NETWORK_NAME`` network.

        Returns:
            The container object returned by the Docker client.
        """
        command = [f"--{key}={value}" for key, value in args.items()]
        port_bindings = {f"{port}/tcp": ("", port) for port in ports}

        logger.debug(f"Starting container with image '{image_name}'.")
        container = self._client.containers.run(
            image_name,
            command=command,
            ports=port_bindings,
            detach=True,
            auto_remove=True,
        )

        # NOTE(review): if the lookup or connect below raises, the exception
        # propagates and the already-started container is not stopped here.
        network = self._client.networks.get(NETWORK_NAME)
        network.connect(container, ipv4_address=container_ip)

        logger.debug(
            f"Container started with ID {container.short_id}. "
            f"Setting up logs at '{log_path}'."
        )

        # Daemon thread so that log streaming never blocks interpreter exit.
        log_thread = threading.Thread(
            target=self._log_container_output,
            args=(container, log_path),
        )
        log_thread.daemon = True
        log_thread.start()

        return container

    def _log_container_output(self, container, log_path):
        """Write the container's streamed log output to *log_path*.

        The parent directory is created when *log_path* has one. The file is
        opened in ``"wb+"`` mode, so any existing content is truncated.
        """
        log_dir = os.path.dirname(log_path)
        # A bare filename has an empty dirname, and os.makedirs("") raises
        # FileNotFoundError; only create the directory when there is one.
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        with open(log_path, "wb+") as log_file:
            for chunk in container.logs(stream=True):
                log_file.write(chunk)

    def generate_ports(self, base_port=None, count=4):
        """Return *count* consecutive port numbers starting at *base_port*.

        Args:
            base_port: First port of the run. When ``None`` a random base in
                ``[1024, 65535 - count]`` is picked.
            count: Number of consecutive ports to return.

        Returns:
            A list ``[base_port, base_port + 1, ...]`` of length *count*.
        """
        if base_port is None:
            base_port = random.randint(1024, 65535 - count)
        ports = [base_port + i for i in range(count)]
        logger.debug(f"Generated ports: {ports}")
        return ports

    @staticmethod
    def generate_random_ext_ip():
        """Return a random dotted IPv4 string of the form ``172.18.X.Y``.

        ``X`` and ``Y`` are each drawn uniformly from 0..255.
        """
        # NOTE(review): the "172.18" prefix is hard-coded; presumably it is
        # meant to match SUBNET -- confirm against src.env_vars.
        base_ip_fragments = ["172", "18"]
        ext_ip = ".".join(base_ip_fragments + [str(random.randint(0, 255)) for _ in range(2)])
        logger.debug(f"Generated random external IP: {ext_ip}")
        return ext_ip

    def is_container_running(self, container):
        """Return ``True`` if a fresh lookup of *container* reports "running".

        The container is re-fetched by ID so the status is current. If the
        lookup raises ``NotFound`` an error is logged and ``False`` is returned.
        """
        try:
            refreshed_container = self._client.containers.get(container.id)
            return refreshed_container.status == "running"
        except NotFound:
            logger.error(f"Container with ID {container.id} not found.")
            return False

    @property
    def image(self):
        """The image name this manager was constructed with."""
        return self._image