# status-desktop/test/e2e/scripts/utils/local_system.py
#
# File listing metadata: 167 lines, 5.7 KiB, Python.

import logging
import os
import signal
import subprocess
import time
from collections import namedtuple
from datetime import datetime
import allure
import psutil
import configs
from configs.system import IS_WIN
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Lightweight record describing a located process: its PID, its name and its
# creation time.
process_info = namedtuple('RunInfo', ('pid', 'name', 'create_time'))
@allure.step('Find process by name')
def find_process_by_name(process_name: str):
    """Collect every running process whose name matches ``process_name``.

    Names are compared case-insensitively and only up to the first dot, so
    ``"App.exe"`` and ``"app"`` are considered the same name.

    :param process_name: name of the process to look for.
    :return: list of ``process_info`` records (possibly empty); the
        ``create_time`` field is a local-time string in ``%H:%M:%S.%f`` format.
    """
    matches = []
    for candidate in psutil.process_iter():
        try:
            wanted_stem = process_name.lower().split('.')[0]
            actual_stem = candidate.name().lower().split('.')[0]
            if wanted_stem != actual_stem:
                continue
            found_pid = candidate.pid
            found_name = candidate.name()
            started = datetime.fromtimestamp(candidate.create_time()).strftime("%H:%M:%S.%f")
            matches.append(process_info(found_pid, found_name, started))
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # The process vanished or cannot be inspected: skip it.
            continue
    return matches
@allure.step('Find process by pid')
def find_process_by_pid(pid):
    """Look up a running process by its PID.

    :param pid: process identifier to search for.
    :return: a ``process_info`` record for the first process whose PID equals
        ``pid``, or ``None`` when no such process could be read.
    """
    for candidate in psutil.process_iter():
        try:
            if candidate.pid != pid:
                continue
            found_pid = candidate.pid
            found_name = candidate.name()
            started = datetime.fromtimestamp(candidate.create_time()).strftime("%H:%M:%S.%f")
            return process_info(found_pid, found_name, started)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # The process vanished or cannot be inspected: keep scanning.
            continue
    return None
@allure.step('Find process by port')
def find_process_by_port(port: int):
    """Find the first process that has an inet socket bound to local ``port``.

    :param port: local port number to look for.
    :return: a ``process_info`` record for the owning process, or ``None``
        when no readable process uses the port.
    """
    for proc in psutil.process_iter():
        try:
            for conn in proc.connections(kind='inet'):
                # NOTE(review): psutil may report a socket that is not bound
                # yet with an empty ``laddr`` tuple; dereferencing ``.port`` on
                # it would raise an uncaught AttributeError and abort the whole
                # scan, so such entries are skipped.
                if not conn.laddr or conn.laddr.port != port:
                    continue
                return process_info(
                    proc.pid,
                    proc.name(),
                    datetime.fromtimestamp(proc.create_time()).strftime("%H:%M:%S.%f")
                )
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # The process vanished or cannot be inspected: keep scanning.
            continue
    return None
@allure.step('Kill process by name')
def kill_process_by_name(process_name: str, verify: bool = True, timeout_sec: int = 10):
    """Forcefully terminate every process whose name matches ``process_name``.

    A process that cannot be killed (no permission) or that has already exited
    on its own is logged and skipped, so the remaining matches are still
    processed.

    :param process_name: name of the process(es) to kill.
    :param verify: when true and at least one process was found, wait until no
        process with that name is left.
    :param timeout_sec: how long to wait for the processes to disappear.
    """
    _logger.info(f'Closing process: {process_name}')
    processes = find_process_by_name(process_name)
    # signal.SIGKILL is not defined on Windows; there os.kill() terminates the
    # process unconditionally for any signal other than the CTRL_* events.
    kill_signal = signal.SIGILL if IS_WIN else signal.SIGKILL
    for process in processes:
        try:
            os.kill(process.pid, kill_signal)
        except (PermissionError, ProcessLookupError) as err:
            # ProcessLookupError: the process exited between the lookup and
            # the kill, which is fine because the goal is for it to be gone.
            _logger.info(f'Close "{process}" error: {err}')
    if verify and processes:
        wait_for_close(process_name, timeout_sec)
@allure.step('Kill process by PID')
def kill_process_by_pid(pid, verify: bool = True, timeout_sec: int = 10):
    """Forcefully terminate the process with the given PID.

    :param pid: identifier of the process to kill.
    :param verify: when true, wait until the process is no longer found.
    :param timeout_sec: how long to wait for the process to disappear.
    """
    if IS_WIN:
        kill_signal = signal.SIGILL
    else:
        kill_signal = signal.SIGKILL
    os.kill(pid, kill_signal)
    if not verify:
        return
    wait_for_close(pid=pid, timeout_sec=timeout_sec)
@allure.step('Kill process by port')
def kill_process_by_port(port: int):
    """Kill the process that owns local ``port``, if there is one.

    Does nothing when no process is found for the port or when the found
    record carries a falsy PID.

    :param port: local port number whose owning process should be killed.
    """
    owner = find_process_by_port(port)
    if owner is None or not owner.pid:
        return
    kill_process_by_pid(owner.pid)
@allure.step('Wait for process start')
def wait_for_started(process_name: str = None, timeout_sec: int = configs.timeouts.PROCESS_TIMEOUT_SEC):
    """Poll once per second until a process named ``process_name`` is running.

    :param process_name: name of the process to wait for; required despite the
        ``None`` default.
    :param timeout_sec: maximum time to wait, in seconds.
    :return: the ``process_info`` record of the first matching process.
    :raises RuntimeError: if ``process_name`` is not provided.
    :raises AssertionError: if no matching process appears within
        ``timeout_sec`` seconds.
    """
    if process_name is None:
        # Without this guard the lookup fails with an opaque AttributeError
        # (``None.lower()``) deep inside the process scan.
        raise RuntimeError('Set process name to find process')
    started_at = time.monotonic()
    while True:
        process = find_process_by_name(process_name)
        if process:
            _logger.info(f'Process started: {process_name}, start time: {process[0].create_time}')
            return process[0]
        time.sleep(1)
        _logger.debug(f'Waiting time: {int(time.monotonic() - started_at)} seconds')
        assert time.monotonic() - started_at < timeout_sec, f'Start process error: {process_name}'
@allure.step('Wait for process close')
def wait_for_close(process_name: str = None, timeout_sec: int = configs.timeouts.PROCESS_TIMEOUT_SEC, pid=None):
    """Poll once per second until the target process is no longer running.

    The target is identified by ``process_name`` or, when no name is given,
    by ``pid``. If both are supplied the name takes precedence.

    :param process_name: name of the process to wait for.
    :param timeout_sec: maximum time to wait, in seconds.
    :param pid: PID of the process to wait for (used when no name is given).
    :raises RuntimeError: if neither ``process_name`` nor ``pid`` is provided.
    :raises AssertionError: if the process is still running after
        ``timeout_sec`` seconds.
    """
    if process_name is not None:
        target = process_name
    elif pid is not None:
        # Name the PID in messages; otherwise they would just say "None".
        target = f'PID {pid}'
    else:
        raise RuntimeError('Set process name or PID to find process')

    started_at = time.monotonic()
    while True:
        if process_name is not None:
            still_running = bool(find_process_by_name(process_name))
        else:
            still_running = find_process_by_pid(pid) is not None
        if not still_running:
            break
        time.sleep(1)
        assert time.monotonic() - started_at < timeout_sec, f'Close process error: {target}'
    _logger.info(f'Process closed: {target}')
@allure.step('System execute command')
def execute(
        command: list,
        shell=True,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        check=False
):
    """Start a command as a child process and return its PID.

    The items of ``command`` are converted to strings and joined with spaces
    into a single command line.

    With ``check=False`` (default) the function returns right after the
    process has been started, without waiting for it and without reading its
    output. With ``check=True`` it waits for the process to finish and fails
    if the exit code is non-zero.

    NOTE: with ``shell=True`` the joined string is interpreted by the system
    shell, so only trusted values must be passed in ``command``.

    :param command: command and its arguments.
    :param shell: passed through to ``subprocess.Popen``.
    :param stderr: passed through to ``subprocess.Popen``.
    :param stdout: passed through to ``subprocess.Popen``.
    :param check: wait for completion and raise on a non-zero exit code.
    :return: PID of the started process.
    :raises RuntimeError: when ``check`` is true and the command exits with a
        non-zero code; the exception carries the captured stderr.
    """
    command = " ".join(str(atr) for atr in command)
    _logger.info(f'Execute: {command}')
    process = subprocess.Popen(command, shell=shell, stderr=stderr, stdout=stdout)
    if check:
        # ``returncode`` is None until the process has been waited for, so it
        # must not be inspected right after Popen(). communicate() waits for
        # the exit while draining the pipes, which also avoids the deadlock
        # that polling can run into when the child fills a pipe buffer.
        _, error_output = process.communicate()
        if process.returncode != 0:
            raise RuntimeError(error_output)
    return process.pid
@allure.step('System run command')
def run(
        command: list,
        shell=True,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        timeout_sec=configs.timeouts.PROCESS_TIMEOUT_SEC,
        check=True
):
    """Run a command to completion and optionally verify its exit code.

    The items of ``command`` are converted to strings and joined with spaces
    into a single command line, which is handed to ``subprocess.run``.

    :param command: command and its arguments.
    :param shell: passed through to ``subprocess.run``.
    :param stderr: passed through to ``subprocess.run``.
    :param stdout: passed through to ``subprocess.run``.
    :param timeout_sec: passed to ``subprocess.run`` as ``timeout``.
    :param check: raise when the command exits with a non-zero code.
    :raises subprocess.CalledProcessError: when ``check`` is true and the exit
        code is non-zero.
    """
    command_line = " ".join(map(str, command))
    _logger.info(f'Execute: {command_line}')
    completed = subprocess.run(command_line, shell=shell, stderr=stderr, stdout=stdout, timeout=timeout_sec)
    exit_code = completed.returncode
    if check and exit_code != 0:
        raise subprocess.CalledProcessError(exit_code, command_line, completed.stdout, completed.stderr)
    _logger.debug(f'stdout: {completed.stdout}')