parent
bfa7866cee
commit
8b4c31fe63
|
@ -2,7 +2,6 @@ import logging
|
|||
import os
|
||||
|
||||
LOG_LEVEL = logging.DEBUG
|
||||
LOCAL_RUN = True
|
||||
DEV_BUILD = False
|
||||
|
||||
APP_DIR = os.getenv('APP_DIR')
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
import logging
|
||||
|
||||
LOG_LEVEL = logging.DEBUG
|
||||
LOCAL_RUN = True
|
||||
DEV_BUILD = False
|
||||
|
||||
APP_DIR = None
|
||||
|
|
|
@ -62,6 +62,5 @@ def pytest_exception_interact(node):
|
|||
body=screenshot.read_bytes(),
|
||||
attachment_type=allure.attachment_type.PNG)
|
||||
driver.context.detach()
|
||||
AUT().stop()
|
||||
except Exception as ex:
|
||||
_logger.debug(ex)
|
||||
|
|
|
@ -4,6 +4,7 @@ from copy import deepcopy
|
|||
import configs.timeouts
|
||||
|
||||
if configs.system.IS_MAC:
|
||||
from atomacos._a11y import _running_apps_with_bundle_id
|
||||
import atomacos
|
||||
|
||||
BUNDLE_ID = 'im.Status.NimStatusClient'
|
||||
|
@ -13,7 +14,19 @@ BUNDLE_ID = 'im.Status.NimStatusClient'
|
|||
|
||||
|
||||
def attach_atomac(timeout_sec: int = configs.timeouts.UI_LOAD_TIMEOUT_SEC):
|
||||
atomator = atomacos.getAppRefByBundleId(BUNDLE_ID)
|
||||
def from_bundle_id(bundle_id):
|
||||
"""
|
||||
Get the top level element for the application with the specified
|
||||
bundle ID, such as com.vmware.fusion.
|
||||
"""
|
||||
apps = _running_apps_with_bundle_id(bundle_id)
|
||||
if not apps:
|
||||
raise ValueError(
|
||||
"Specified bundle ID not found in " "running apps: %s" % bundle_id
|
||||
)
|
||||
return atomacos.NativeUIElement.from_pid(apps[-1].processIdentifier())
|
||||
|
||||
atomator = from_bundle_id(BUNDLE_ID)
|
||||
started_at = time.monotonic()
|
||||
while not hasattr(atomator, 'AXMainWindow'):
|
||||
time.sleep(1)
|
||||
|
|
|
@ -3,7 +3,7 @@ import squish
|
|||
|
||||
import configs
|
||||
import driver
|
||||
from configs.system import IS_WIN, IS_LIN
|
||||
from configs.system import IS_LIN
|
||||
from driver import context
|
||||
from driver.server import SquishServer
|
||||
from scripts.utils import system_path, local_system
|
||||
|
@ -23,7 +23,6 @@ class AUT:
|
|||
self.ctx = None
|
||||
self.pid = None
|
||||
self.aut_id = self.path.name if IS_LIN else self.path.stem
|
||||
self.process_name = 'Status' if IS_WIN else 'nim_status_client'
|
||||
driver.testSettings.setWrappersForApplication(self.aut_id, ['Qt'])
|
||||
|
||||
def __str__(self):
|
||||
|
@ -49,12 +48,9 @@ class AUT:
|
|||
self.ctx = None
|
||||
return self
|
||||
|
||||
@allure.step('Close application by process name')
|
||||
@allure.step('Close application')
|
||||
def stop(self):
|
||||
if configs.LOCAL_RUN:
|
||||
local_system.kill_process_by_pid(self.pid)
|
||||
else:
|
||||
local_system.kill_process_by_name(self.process_name)
|
||||
local_system.kill_process(self.pid)
|
||||
|
||||
@allure.step("Start application")
|
||||
def launch(self, *args) -> 'AUT':
|
||||
|
@ -67,10 +63,6 @@ class AUT:
|
|||
f'"{self.path}"'
|
||||
] + list(args)
|
||||
local_system.execute(command)
|
||||
try:
|
||||
local_system.wait_for_started(self.process_name)
|
||||
except AssertionError:
|
||||
local_system.execute(command, check=True)
|
||||
else:
|
||||
SquishServer().add_executable_aut(self.aut_id, self.path.parent)
|
||||
command = [self.aut_id] + list(args)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import logging
|
||||
import typing
|
||||
from subprocess import CalledProcessError
|
||||
|
||||
import configs.testpath
|
||||
from scripts.utils import local_system
|
||||
|
@ -20,6 +21,7 @@ class SquishServer:
|
|||
self.config = configs.testpath.ROOT / 'squish_server.ini'
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.pid = None
|
||||
|
||||
def start(self):
|
||||
cmd = [
|
||||
|
@ -28,19 +30,12 @@ class SquishServer:
|
|||
f'--host={self.host}',
|
||||
f'--port={self.port}',
|
||||
]
|
||||
local_system.execute(cmd)
|
||||
try:
|
||||
local_system.wait_for_started(_PROCESS_NAME)
|
||||
except AssertionError as err:
|
||||
_logger.info(err)
|
||||
local_system.execute(cmd, check=True)
|
||||
self.pid = local_system.execute(cmd)
|
||||
|
||||
def stop(self):
|
||||
local_system.kill_process_by_name(_PROCESS_NAME, verify=False)
|
||||
try:
|
||||
local_system.wait_for_close(_PROCESS_NAME, 2)
|
||||
except AssertionError as err:
|
||||
_logger.debug(err)
|
||||
if self.pid is not None:
|
||||
local_system.kill_process(self.pid)
|
||||
self.pid = None
|
||||
|
||||
# https://doc-snapshots.qt.io/squish/cli-squishserver.html
|
||||
def configuring(self, action: str, options: typing.Union[int, str, list]):
|
||||
|
|
|
@ -3,128 +3,24 @@ import os
|
|||
import signal
|
||||
import subprocess
|
||||
import time
|
||||
from collections import namedtuple
|
||||
from datetime import datetime
|
||||
|
||||
import allure
|
||||
import psutil
|
||||
|
||||
import configs
|
||||
from configs.system import IS_WIN
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
process_info = namedtuple('RunInfo', ['pid', 'name', 'create_time'])
|
||||
|
||||
|
||||
@allure.step('Find process by name')
|
||||
def find_process_by_name(process_name: str):
|
||||
processes = []
|
||||
for proc in psutil.process_iter():
|
||||
try:
|
||||
if process_name.lower().split('.')[0] == proc.name().lower().split('.')[0]:
|
||||
processes.append(process_info(
|
||||
proc.pid,
|
||||
proc.name(),
|
||||
datetime.fromtimestamp(proc.create_time()).strftime("%H:%M:%S.%f"))
|
||||
)
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
pass
|
||||
return processes
|
||||
|
||||
|
||||
@allure.step('Find process by pid')
|
||||
def find_process_by_pid(pid):
|
||||
for proc in psutil.process_iter():
|
||||
try:
|
||||
if proc.pid == pid:
|
||||
return process_info(
|
||||
proc.pid,
|
||||
proc.name(),
|
||||
datetime.fromtimestamp(proc.create_time()).strftime("%H:%M:%S.%f")
|
||||
)
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
pass
|
||||
|
||||
|
||||
@allure.step('Find process by port')
|
||||
def find_process_by_port(port: int):
|
||||
for proc in psutil.process_iter():
|
||||
try:
|
||||
for conns in proc.connections(kind='inet'):
|
||||
if conns.laddr.port == port:
|
||||
return process_info(
|
||||
proc.pid,
|
||||
proc.name(),
|
||||
datetime.fromtimestamp(proc.create_time()).strftime("%H:%M:%S.%f")
|
||||
)
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
pass
|
||||
|
||||
|
||||
@allure.step('Kill process by name')
|
||||
def kill_process_by_name(process_name: str, verify: bool = True, timeout_sec: int = 10):
|
||||
_logger.info(f'Closing process: {process_name}')
|
||||
processes = find_process_by_name(process_name)
|
||||
for process in processes:
|
||||
try:
|
||||
os.kill(process.pid, signal.SIGILL if IS_WIN else signal.SIGKILL)
|
||||
except PermissionError as err:
|
||||
_logger.info(f'Close "{process}" error: {err}')
|
||||
if verify and processes:
|
||||
wait_for_close(process_name, timeout_sec)
|
||||
|
||||
|
||||
@allure.step('Kill process by PID')
|
||||
def kill_process_by_pid(pid, verify: bool = True, timeout_sec: int = 10):
|
||||
@allure.step('Kill process')
|
||||
def kill_process(pid):
|
||||
os.kill(pid, signal.SIGILL if IS_WIN else signal.SIGKILL)
|
||||
if verify:
|
||||
wait_for_close(pid=pid, timeout_sec=timeout_sec)
|
||||
|
||||
|
||||
@allure.step('Kill process by port')
|
||||
def kill_process_by_port(port: int):
|
||||
proc = find_process_by_port(port)
|
||||
if proc is not None and proc.pid:
|
||||
kill_process_by_pid(proc.pid)
|
||||
|
||||
|
||||
@allure.step('Wait for process start')
|
||||
def wait_for_started(process_name: str = None, timeout_sec: int = configs.timeouts.PROCESS_TIMEOUT_SEC):
|
||||
started_at = time.monotonic()
|
||||
while True:
|
||||
process = find_process_by_name(process_name)
|
||||
if process:
|
||||
_logger.info(f'Process started: {process_name}, start time: {process[0].create_time}')
|
||||
return process[0]
|
||||
time.sleep(1)
|
||||
_logger.debug(f'Waiting time: {int(time.monotonic() - started_at)} seconds')
|
||||
assert time.monotonic() - started_at < timeout_sec, f'Start process error: {process_name}'
|
||||
|
||||
|
||||
@allure.step('Wait for process close')
|
||||
def wait_for_close(process_name: str = None, timeout_sec: int = configs.timeouts.PROCESS_TIMEOUT_SEC, pid=None):
|
||||
started_at = time.monotonic()
|
||||
while True:
|
||||
if process_name is not None:
|
||||
process = find_process_by_name(process_name)
|
||||
if not process:
|
||||
break
|
||||
elif pid is not None:
|
||||
process = find_process_by_pid(pid)
|
||||
if process is None:
|
||||
break
|
||||
else:
|
||||
raise RuntimeError('Set process name or PID to find process')
|
||||
time.sleep(1)
|
||||
assert time.monotonic() - started_at < timeout_sec, f'Close process error: {process_name or pid}'
|
||||
_logger.info(f'Process closed: {process_name}')
|
||||
|
||||
|
||||
@allure.step('System execute command')
|
||||
def execute(
|
||||
command: list,
|
||||
shell=True,
|
||||
shell=False if IS_WIN else True,
|
||||
stderr=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
check=False
|
||||
|
@ -152,7 +48,7 @@ def execute(
|
|||
@allure.step('System run command')
|
||||
def run(
|
||||
command: list,
|
||||
shell=True,
|
||||
shell=False if IS_WIN else True,
|
||||
stderr=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
timeout_sec=configs.timeouts.PROCESS_TIMEOUT_SEC,
|
||||
|
|
Loading…
Reference in New Issue