fix: remove windows carrige returns from files

Drives me crazy.

Signed-off-by: Jakub Sokołowski <jakub@status.im>
This commit is contained in:
Jakub Sokołowski 2023-11-30 23:32:06 +01:00 committed by Anastasiya
parent 2a9d8a4c29
commit 05ad2ebadb
11 changed files with 599 additions and 599 deletions

View File

@ -1,12 +1,12 @@
import os
import platform
IS_LIN = True if platform.system() == 'Linux' else False
IS_MAC = True if platform.system() == 'Darwin' else False
IS_WIN = True if platform.system() == 'Windows' else False
OS_ID = 'lin' if IS_LIN else 'mac' if IS_MAC else 'win'
DISPLAY = os.getenv('DISPLAY', ':0')
TEST_MODE = os.getenv('STATUS_RUNTIME_TEST_MODE')
import os
import platform
IS_LIN = True if platform.system() == 'Linux' else False
IS_MAC = True if platform.system() == 'Darwin' else False
IS_WIN = True if platform.system() == 'Windows' else False
OS_ID = 'lin' if IS_LIN else 'mac' if IS_MAC else 'win'
DISPLAY = os.getenv('DISPLAY', ':0')
TEST_MODE = os.getenv('STATUS_RUNTIME_TEST_MODE')

View File

@ -1,13 +1,13 @@
import configs.system
# Buttons
BACKSPACE = 'Backspace'
COMMAND = 'Command'
CTRL = 'Ctrl'
ESCAPE = 'Escape'
RETURN = 'Return'
SHIFT = 'Shift'
# Combinations
SELECT_ALL = f'{CTRL if configs.system.IS_WIN else COMMAND}+A'
OPEN_GOTO = f'{COMMAND}+{SHIFT}+G'
import configs.system
# Buttons
BACKSPACE = 'Backspace'
COMMAND = 'Command'
CTRL = 'Ctrl'
ESCAPE = 'Escape'
RETURN = 'Return'
SHIFT = 'Shift'
# Combinations
SELECT_ALL = f'{CTRL if configs.system.IS_WIN else COMMAND}+A'
OPEN_GOTO = f'{COMMAND}+{SHIFT}+G'

View File

@ -1,44 +1,44 @@
import time
import squish
def move(obj: object, x: int, y: int, dx: int, dy: int, step: int, sleep: float = 0):
while True:
if x > dx:
x -= step
if x < x:
x = dx
elif x < dx:
x += step
if x > dx:
x = dx
if y > dy:
y -= step
if y < dy:
y = dy
elif y < dy:
y += step
if y > dy:
y = dy
squish.mouseMove(obj, x, y)
time.sleep(sleep)
if x == dx and y == dy:
break
def press_and_move(
obj,
x: int,
y: int,
dx: int,
dy: int,
mouse: int = squish.MouseButton.LeftButton,
step: int = 1,
sleep: float = 0
):
squish.mouseMove(obj, x, y)
squish.mousePress(obj, x, y, mouse)
move(obj, x, y, dx, dy, step, sleep)
squish.mouseRelease(mouse)
time.sleep(1)
import time
import squish
def move(obj: object, x: int, y: int, dx: int, dy: int, step: int, sleep: float = 0):
while True:
if x > dx:
x -= step
if x < x:
x = dx
elif x < dx:
x += step
if x > dx:
x = dx
if y > dy:
y -= step
if y < dy:
y = dy
elif y < dy:
y += step
if y > dy:
y = dy
squish.mouseMove(obj, x, y)
time.sleep(sleep)
if x == dx and y == dy:
break
def press_and_move(
obj,
x: int,
y: int,
dx: int,
dy: int,
mouse: int = squish.MouseButton.LeftButton,
step: int = 1,
sleep: float = 0
):
squish.mouseMove(obj, x, y)
squish.mousePress(obj, x, y, mouse)
move(obj, x, y, dx, dy, step, sleep)
squish.mouseRelease(mouse)
time.sleep(1)

View File

@ -1,20 +1,20 @@
import logging
import allure
import configs
import driver
from gui.elements.object import QObject
_logger = logging.getLogger(__name__)
class CheckBox(QObject):
@allure.step("Set {0} value: {1}")
def set(self, value: bool, x: int = None, y: int = None):
if self.is_checked is not value:
self.click(x, y)
assert driver.waitFor(
lambda: self.is_checked is value, configs.timeouts.UI_LOAD_TIMEOUT_MSEC), 'Value not changed'
_logger.info(f'{self}: value changed to "{value}"')
import logging
import allure
import configs
import driver
from gui.elements.object import QObject
_logger = logging.getLogger(__name__)
class CheckBox(QObject):
@allure.step("Set {0} value: {1}")
def set(self, value: bool, x: int = None, y: int = None):
if self.is_checked is not value:
self.click(x, y)
assert driver.waitFor(
lambda: self.is_checked is value, configs.timeouts.UI_LOAD_TIMEOUT_MSEC), 'Value not changed'
_logger.info(f'{self}: value changed to "{value}"')

View File

@ -1,48 +1,48 @@
import logging
import time
import typing
import allure
import configs
import driver
from gui.elements.object import QObject
_logger = logging.getLogger(__name__)
class List(QObject):
@property
@allure.step('Get list items {0}')
def items(self):
return [self.object.itemAtIndex(index) for index in range(self.object.count)]
@allure.step('Get values of list items {0}')
def get_values(self, attr_name: str) -> typing.List[str]:
values = []
for index in range(self.object.count):
value = str(getattr(self.object.itemAtIndex(index), attr_name, ''))
if value:
values.append(value)
return values
@allure.step('Select item {1} in {0}')
def select(self, value: str, attr_name: str):
driver.mouseClick(self.wait_for_item(value, attr_name))
_logger.info(f'{self}: {value} selected')
@allure.step('Wait for item {1} in {0} with attribute {2}')
def wait_for_item(self, value: str, attr_name: str, timeout_sec: int = configs.timeouts.UI_LOAD_TIMEOUT_SEC):
started_at = time.monotonic()
values = []
while True:
for index in range(self.object.count):
cur_value = str(getattr(self.object.itemAtIndex(index), attr_name, ''))
if cur_value == value:
_logger.info(f'{self}: "{value}" for attribute "{attr_name}" appeared')
return self.object.itemAtIndex(index)
values.append(cur_value)
time.sleep(1)
if time.monotonic() - started_at > timeout_sec:
raise RuntimeError(f'value not found in list: {values}')
import logging
import time
import typing
import allure
import configs
import driver
from gui.elements.object import QObject
_logger = logging.getLogger(__name__)
class List(QObject):
@property
@allure.step('Get list items {0}')
def items(self):
return [self.object.itemAtIndex(index) for index in range(self.object.count)]
@allure.step('Get values of list items {0}')
def get_values(self, attr_name: str) -> typing.List[str]:
values = []
for index in range(self.object.count):
value = str(getattr(self.object.itemAtIndex(index), attr_name, ''))
if value:
values.append(value)
return values
@allure.step('Select item {1} in {0}')
def select(self, value: str, attr_name: str):
driver.mouseClick(self.wait_for_item(value, attr_name))
_logger.info(f'{self}: {value} selected')
@allure.step('Wait for item {1} in {0} with attribute {2}')
def wait_for_item(self, value: str, attr_name: str, timeout_sec: int = configs.timeouts.UI_LOAD_TIMEOUT_SEC):
started_at = time.monotonic()
values = []
while True:
for index in range(self.object.count):
cur_value = str(getattr(self.object.itemAtIndex(index), attr_name, ''))
if cur_value == value:
_logger.info(f'{self}: "{value}" for attribute "{attr_name}" appeared')
return self.object.itemAtIndex(index)
values.append(cur_value)
time.sleep(1)
if time.monotonic() - started_at > timeout_sec:
raise RuntimeError(f'value not found in list: {values}')

View File

@ -1,37 +1,37 @@
import logging
import allure
from gui.elements.object import QObject
_logger = logging.getLogger(__name__)
class Slider(QObject):
@property
@allure.step('Get minimal value {0}')
def min(self) -> int:
return int(getattr(self.object, 'from'))
@property
@allure.step('Get maximal value {0}')
def max(self) -> max:
return int(getattr(self.object, 'to'))
@property
@allure.step('Get value {0}')
def value(self) -> int:
return int(self.object.value)
@value.setter
@allure.step('Set value {1} in {0}')
def value(self, value: int):
if value != self.value:
if self.min <= value <= self.max:
if self.value < value:
while self.value < value:
self.object.increase()
if self.value > value:
while self.value > value:
self.object.decrease()
_logger.info(f'{self}: value changed to "{value}"')
import logging
import allure
from gui.elements.object import QObject
_logger = logging.getLogger(__name__)
class Slider(QObject):
@property
@allure.step('Get minimal value {0}')
def min(self) -> int:
return int(getattr(self.object, 'from'))
@property
@allure.step('Get maximal value {0}')
def max(self) -> max:
return int(getattr(self.object, 'to'))
@property
@allure.step('Get value {0}')
def value(self) -> int:
return int(self.object.value)
@value.setter
@allure.step('Set value {1} in {0}')
def value(self, value: int):
if value != self.value:
if self.min <= value <= self.max:
if self.value < value:
while self.value < value:
self.object.increase()
if self.value > value:
while self.value > value:
self.object.decrease()
_logger.info(f'{self}: value changed to "{value}"')

View File

@ -1,40 +1,40 @@
import logging
import allure
import configs
import driver
from gui.elements.object import QObject
_logger = logging.getLogger(__name__)
class TextEdit(QObject):
@property
@allure.step('Get current text {0}')
def text(self) -> str:
return str(self.object.text)
@text.setter
@allure.step('Type text {1} {0}')
def text(self, value: str):
self.clear()
self.type_text(value)
assert driver.waitFor(lambda: self.text == value, configs.timeouts.UI_LOAD_TIMEOUT_MSEC), \
f'Type text failed, value in field: "{self.text}", expected: {value}'
@allure.step('Type: {1} in {0}')
def type_text(self, value: str):
driver.type(self.object, value)
_logger.info(f'{self}: value changed to "{value}"')
return self
@allure.step('Clear {0}')
def clear(self, verify: bool = True):
self.object.clear()
if verify:
assert driver.waitFor(lambda: not self.text), \
f'Clear text field failed, value in field: "{self.text}"'
_logger.info(f'{self}: cleared')
return self
import logging
import allure
import configs
import driver
from gui.elements.object import QObject
_logger = logging.getLogger(__name__)
class TextEdit(QObject):
@property
@allure.step('Get current text {0}')
def text(self) -> str:
return str(self.object.text)
@text.setter
@allure.step('Type text {1} {0}')
def text(self, value: str):
self.clear()
self.type_text(value)
assert driver.waitFor(lambda: self.text == value, configs.timeouts.UI_LOAD_TIMEOUT_MSEC), \
f'Type text failed, value in field: "{self.text}", expected: {value}'
@allure.step('Type: {1} in {0}')
def type_text(self, value: str):
driver.type(self.object, value)
_logger.info(f'{self}: value changed to "{value}"')
return self
@allure.step('Clear {0}')
def clear(self, verify: bool = True):
self.object.clear()
if verify:
assert driver.waitFor(lambda: not self.text), \
f'Clear text field failed, value in field: "{self.text}"'
_logger.info(f'{self}: cleared')
return self

View File

@ -1,15 +1,15 @@
import logging
import allure
from gui.elements.object import QObject
_logger = logging.getLogger(__name__)
class TextLabel(QObject):
@property
@allure.step('Get text {0}')
def text(self) -> str:
return str(self.object.text)
import logging
import allure
from gui.elements.object import QObject
_logger = logging.getLogger(__name__)
class TextLabel(QObject):
@property
@allure.step('Get text {0}')
def text(self) -> str:
return str(self.object.text)

View File

@ -1,234 +1,234 @@
import logging
import time
import typing
from datetime import datetime
import allure
import cv2
import numpy as np
import pytesseract
from PIL import ImageGrab
import configs
import constants
import driver
from configs.system import IS_LIN
from scripts.tools.ocv import Ocv
from scripts.utils.system_path import SystemPath
_logger = logging.getLogger(__name__)
class Image:
def __init__(self, object_name: dict):
self.object_name = object_name
self._view = None
@property
@allure.step('Get image view')
def view(self) -> np.ndarray:
return self._view
@property
@allure.step('Get image height')
def height(self) -> int:
return self.view.shape[0]
@property
@allure.step('Get image width')
def width(self) -> int:
return self.view.shape[1]
@property
@allure.step('Get image is grayscale')
def is_grayscale(self) -> bool:
return self.view.ndim == 2
@allure.step('Set image in grayscale')
def set_grayscale(self) -> 'Image':
if not self.is_grayscale:
self._view = cv2.cvtColor(self.view, cv2.COLOR_BGR2GRAY)
return self
@allure.step('Grab image view from object')
def update_view(self):
_logger.debug(f'Image view was grab from: {self.object_name}')
rect = driver.object.globalBounds(driver.waitForObject(self.object_name))
img = ImageGrab.grab(
bbox=(rect.x, rect.y, rect.x + rect.width, rect.y + rect.height),
xdisplay=configs.system.DISPLAY if IS_LIN else None
)
self._view = cv2.cvtColor(np.array(img), cv2.COLOR_BGR2RGB)
@allure.step('Save image')
def save(self, path: SystemPath, force: bool = False):
path.parent.mkdir(parents=True, exist_ok=True)
if path.exists() and not force:
raise FileExistsError(path)
if self.view is None:
self.update_view()
cv2.imwrite(str(path), self.view)
@allure.step('Compare images')
def compare(
self, expected: np.ndarray, threshold: float = 0.99) -> bool:
if self.view is None:
self.update_view()
correlation = Ocv.compare_images(self.view, expected)
result = correlation >= threshold
_logger.info(f'Images equals on: {abs(round(correlation, 4) * 100)}%')
if result:
_logger.info(f'Screenshot comparison passed')
else:
configs.testpath.TEST_ARTIFACTS.mkdir(parents=True, exist_ok=True)
diff = Ocv.draw_contours(self.view, expected)
actual_fp = configs.testpath.TEST_ARTIFACTS / f'actual_image.png'
expected_fp = configs.testpath.TEST_ARTIFACTS / f'expected_image.png'
diff_fp = configs.testpath.TEST_ARTIFACTS / f'diff_image.png'
self.save(actual_fp, force=True)
cv2.imwrite(str(expected_fp), expected)
cv2.imwrite(str(diff_fp), diff)
allure.attach(name='actual', body=actual_fp.read_bytes(), attachment_type=allure.attachment_type.PNG)
allure.attach(name='expected', body=expected_fp.read_bytes(), attachment_type=allure.attachment_type.PNG)
allure.attach(name='diff', body=diff_fp.read_bytes(), attachment_type=allure.attachment_type.PNG)
_logger.info(
f"Screenshot comparison failed.\n"
f"Actual, Diff and Expected screenshots are saved:\n"
f"{configs.testpath.TEST_ARTIFACTS.relative_to(configs.testpath.ROOT)}.")
return result
@allure.step('Crop image')
def crop(self, rect: driver.UiTypes.ScreenRectangle):
assert rect.x + rect.width < self.width
assert rect.y + rect.height < self.height
self._view = self.view[rect.y: (rect.y + rect.height), rect.x: (rect.x + rect.width)]
@allure.step('Parse text on image')
def to_string(self, custom_config: str):
text: str = pytesseract.image_to_string(self.view, config=custom_config)
_logger.debug(f'Text on image: {text}')
return text
@allure.step('Verify: Image contains text: {1}')
def has_text(self, text: str, criteria: str, crop: driver.UiTypes.ScreenRectangle = None) -> bool:
self.update_view()
if crop:
self.crop(crop)
# Search text on image converted in gray color
self.set_grayscale()
fp_gray = configs.testpath.TEST_ARTIFACTS / f'search_region_in_gray_color.png'
self.save(fp_gray, force=True)
if text.lower() in self.to_string(criteria).lower():
allure.attach(name='search_region', body=fp_gray.read_bytes(), attachment_type=allure.attachment_type.PNG)
return True
# Search text on image with inverted color
self._view = cv2.bitwise_not(self.view)
fp_invert = configs.testpath.TEST_ARTIFACTS / f'search_region_in_inverted_color.png'
self.save(fp_invert, force=True)
if text.lower() in self.to_string(criteria).lower():
allure.attach(name='search_region', body=fp_invert.read_bytes(), attachment_type=allure.attachment_type.PNG)
return True
return False
@allure.step('Search color on image')
def has_color(self, color: constants.Color, denoise: int = 10, crop: driver.UiTypes.ScreenRectangle = None) -> bool:
self.update_view()
if crop:
self.crop(crop)
initial_view = configs.testpath.TEST_ARTIFACTS / f'{color.name}.png'
self.save(initial_view)
allure.attach(name='search_region', body=initial_view.read_bytes(), attachment_type=allure.attachment_type.PNG)
contours = self._get_color_contours(color, denoise, apply=True)
mask_view = configs.testpath.TEST_ARTIFACTS / f'{color.name}_mask.png'
self.save(mask_view)
allure.attach(name='contours', body=mask_view.read_bytes(), attachment_type=allure.attachment_type.PNG)
self._view = None
return len(contours) >= 1
@allure.step('Apply contours with found color on image')
def _get_color_contours(
self,
color: constants.Color,
denoise: int = 10,
apply: bool = False
) -> typing.List[driver.UiTypes.ScreenRectangle]:
if not self.is_grayscale:
view = cv2.cvtColor(self.view, cv2.COLOR_BGR2HSV)
else:
view = self.view
boundaries = constants.boundaries[color]
if color == constants.Color.RED:
mask = None
for bond in boundaries:
lower_range = np.array(bond[0])
upper_range = np.array(bond[1])
_mask = cv2.inRange(view, lower_range, upper_range)
mask = _mask if mask is None else mask + _mask
else:
lower_range = np.array(boundaries[0])
upper_range = np.array(boundaries[1])
mask = cv2.inRange(view, lower_range, upper_range)
contours = []
_contours = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
_contours = _contours[0] if len(_contours) == 2 else _contours[1]
for _contour in _contours:
x, y, w, h = cv2.boundingRect(_contour)
# To remove small artifacts, less than denoise variable value
if w * h < denoise:
continue
contours.append(driver.UiTypes.ScreenRectangle(x, y, w, h))
if apply:
self._view = cv2.bitwise_and(self.view, self.view, mask=mask)
for contour in contours:
cv2.rectangle(
self.view,
(contour.x, contour.y),
(contour.x + contour.width, contour.y + contour.height),
(36, 255, 12), 2)
return contours
@allure.step('Compare images')
def compare(actual: Image,
expected: typing.Union[str, SystemPath, Image],
threshold: float = 0.99,
timout_sec: int = 1
):
expected_fp = None
if isinstance(expected, str):
expected_fp = configs.testpath.TEST_VP / configs.system.OS_ID / expected
if not expected_fp.exists():
expected_fp = configs.testpath.TEST_VP / expected
expected = expected_fp
if isinstance(expected, SystemPath):
assert expected.exists(), f'File: {expected} not found'
expected = cv2.imread(str(expected))
else:
expected = expected.view
start = datetime.now()
while not actual.compare(expected, threshold):
time.sleep(1)
if (datetime.now() - start).seconds > timout_sec:
if configs.UPDATE_VP_ON_FAIL and expected_fp is not None:
actual.save(expected_fp, force=True)
_logger.warning(f'VP file updated: {expected_fp}')
break
else:
raise AssertionError('Images comparison failed')
_logger.info(f'Screenshot comparison passed')
import logging
import time
import typing
from datetime import datetime
import allure
import cv2
import numpy as np
import pytesseract
from PIL import ImageGrab
import configs
import constants
import driver
from configs.system import IS_LIN
from scripts.tools.ocv import Ocv
from scripts.utils.system_path import SystemPath
_logger = logging.getLogger(__name__)
class Image:
def __init__(self, object_name: dict):
self.object_name = object_name
self._view = None
@property
@allure.step('Get image view')
def view(self) -> np.ndarray:
return self._view
@property
@allure.step('Get image height')
def height(self) -> int:
return self.view.shape[0]
@property
@allure.step('Get image width')
def width(self) -> int:
return self.view.shape[1]
@property
@allure.step('Get image is grayscale')
def is_grayscale(self) -> bool:
return self.view.ndim == 2
@allure.step('Set image in grayscale')
def set_grayscale(self) -> 'Image':
if not self.is_grayscale:
self._view = cv2.cvtColor(self.view, cv2.COLOR_BGR2GRAY)
return self
@allure.step('Grab image view from object')
def update_view(self):
_logger.debug(f'Image view was grab from: {self.object_name}')
rect = driver.object.globalBounds(driver.waitForObject(self.object_name))
img = ImageGrab.grab(
bbox=(rect.x, rect.y, rect.x + rect.width, rect.y + rect.height),
xdisplay=configs.system.DISPLAY if IS_LIN else None
)
self._view = cv2.cvtColor(np.array(img), cv2.COLOR_BGR2RGB)
@allure.step('Save image')
def save(self, path: SystemPath, force: bool = False):
path.parent.mkdir(parents=True, exist_ok=True)
if path.exists() and not force:
raise FileExistsError(path)
if self.view is None:
self.update_view()
cv2.imwrite(str(path), self.view)
@allure.step('Compare images')
def compare(
self, expected: np.ndarray, threshold: float = 0.99) -> bool:
if self.view is None:
self.update_view()
correlation = Ocv.compare_images(self.view, expected)
result = correlation >= threshold
_logger.info(f'Images equals on: {abs(round(correlation, 4) * 100)}%')
if result:
_logger.info(f'Screenshot comparison passed')
else:
configs.testpath.TEST_ARTIFACTS.mkdir(parents=True, exist_ok=True)
diff = Ocv.draw_contours(self.view, expected)
actual_fp = configs.testpath.TEST_ARTIFACTS / f'actual_image.png'
expected_fp = configs.testpath.TEST_ARTIFACTS / f'expected_image.png'
diff_fp = configs.testpath.TEST_ARTIFACTS / f'diff_image.png'
self.save(actual_fp, force=True)
cv2.imwrite(str(expected_fp), expected)
cv2.imwrite(str(diff_fp), diff)
allure.attach(name='actual', body=actual_fp.read_bytes(), attachment_type=allure.attachment_type.PNG)
allure.attach(name='expected', body=expected_fp.read_bytes(), attachment_type=allure.attachment_type.PNG)
allure.attach(name='diff', body=diff_fp.read_bytes(), attachment_type=allure.attachment_type.PNG)
_logger.info(
f"Screenshot comparison failed.\n"
f"Actual, Diff and Expected screenshots are saved:\n"
f"{configs.testpath.TEST_ARTIFACTS.relative_to(configs.testpath.ROOT)}.")
return result
@allure.step('Crop image')
def crop(self, rect: driver.UiTypes.ScreenRectangle):
assert rect.x + rect.width < self.width
assert rect.y + rect.height < self.height
self._view = self.view[rect.y: (rect.y + rect.height), rect.x: (rect.x + rect.width)]
@allure.step('Parse text on image')
def to_string(self, custom_config: str):
text: str = pytesseract.image_to_string(self.view, config=custom_config)
_logger.debug(f'Text on image: {text}')
return text
@allure.step('Verify: Image contains text: {1}')
def has_text(self, text: str, criteria: str, crop: driver.UiTypes.ScreenRectangle = None) -> bool:
self.update_view()
if crop:
self.crop(crop)
# Search text on image converted in gray color
self.set_grayscale()
fp_gray = configs.testpath.TEST_ARTIFACTS / f'search_region_in_gray_color.png'
self.save(fp_gray, force=True)
if text.lower() in self.to_string(criteria).lower():
allure.attach(name='search_region', body=fp_gray.read_bytes(), attachment_type=allure.attachment_type.PNG)
return True
# Search text on image with inverted color
self._view = cv2.bitwise_not(self.view)
fp_invert = configs.testpath.TEST_ARTIFACTS / f'search_region_in_inverted_color.png'
self.save(fp_invert, force=True)
if text.lower() in self.to_string(criteria).lower():
allure.attach(name='search_region', body=fp_invert.read_bytes(), attachment_type=allure.attachment_type.PNG)
return True
return False
@allure.step('Search color on image')
def has_color(self, color: constants.Color, denoise: int = 10, crop: driver.UiTypes.ScreenRectangle = None) -> bool:
self.update_view()
if crop:
self.crop(crop)
initial_view = configs.testpath.TEST_ARTIFACTS / f'{color.name}.png'
self.save(initial_view)
allure.attach(name='search_region', body=initial_view.read_bytes(), attachment_type=allure.attachment_type.PNG)
contours = self._get_color_contours(color, denoise, apply=True)
mask_view = configs.testpath.TEST_ARTIFACTS / f'{color.name}_mask.png'
self.save(mask_view)
allure.attach(name='contours', body=mask_view.read_bytes(), attachment_type=allure.attachment_type.PNG)
self._view = None
return len(contours) >= 1
@allure.step('Apply contours with found color on image')
def _get_color_contours(
self,
color: constants.Color,
denoise: int = 10,
apply: bool = False
) -> typing.List[driver.UiTypes.ScreenRectangle]:
if not self.is_grayscale:
view = cv2.cvtColor(self.view, cv2.COLOR_BGR2HSV)
else:
view = self.view
boundaries = constants.boundaries[color]
if color == constants.Color.RED:
mask = None
for bond in boundaries:
lower_range = np.array(bond[0])
upper_range = np.array(bond[1])
_mask = cv2.inRange(view, lower_range, upper_range)
mask = _mask if mask is None else mask + _mask
else:
lower_range = np.array(boundaries[0])
upper_range = np.array(boundaries[1])
mask = cv2.inRange(view, lower_range, upper_range)
contours = []
_contours = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
_contours = _contours[0] if len(_contours) == 2 else _contours[1]
for _contour in _contours:
x, y, w, h = cv2.boundingRect(_contour)
# To remove small artifacts, less than denoise variable value
if w * h < denoise:
continue
contours.append(driver.UiTypes.ScreenRectangle(x, y, w, h))
if apply:
self._view = cv2.bitwise_and(self.view, self.view, mask=mask)
for contour in contours:
cv2.rectangle(
self.view,
(contour.x, contour.y),
(contour.x + contour.width, contour.y + contour.height),
(36, 255, 12), 2)
return contours
@allure.step('Compare images')
def compare(actual: Image,
expected: typing.Union[str, SystemPath, Image],
threshold: float = 0.99,
timout_sec: int = 1
):
expected_fp = None
if isinstance(expected, str):
expected_fp = configs.testpath.TEST_VP / configs.system.OS_ID / expected
if not expected_fp.exists():
expected_fp = configs.testpath.TEST_VP / expected
expected = expected_fp
if isinstance(expected, SystemPath):
assert expected.exists(), f'File: {expected} not found'
expected = cv2.imread(str(expected))
else:
expected = expected.view
start = datetime.now()
while not actual.compare(expected, threshold):
time.sleep(1)
if (datetime.now() - start).seconds > timout_sec:
if configs.UPDATE_VP_ON_FAIL and expected_fp is not None:
actual.save(expected_fp, force=True)
_logger.warning(f'VP file updated: {expected_fp}')
break
else:
raise AssertionError('Images comparison failed')
_logger.info(f'Screenshot comparison passed')

View File

@ -1,27 +1,27 @@
import cv2
import numpy as np
class Ocv:
@classmethod
def compare_images(cls, lhd: np.ndarray, rhd: np.ndarray) -> float:
res = cv2.matchTemplate(lhd, rhd, cv2.TM_CCOEFF_NORMED)
_, correlation, _, _ = cv2.minMaxLoc(res)
return correlation
@classmethod
def draw_contours(cls, lhd: np.ndarray, rhd: np.ndarray) -> np.ndarray:
view = rhd.copy()
lhd = cv2.cvtColor(lhd, cv2.COLOR_BGRA2GRAY)
_, thresh = cv2.threshold(lhd, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)
contours, hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
cv2.drawContours(view, contours, -1, (0, 0, 255), 1)
rhd = cv2.cvtColor(rhd, cv2.COLOR_BGRA2GRAY)
_, thresh = cv2.threshold(rhd, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)
contours, hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
cv2.drawContours(view, contours, -1, (0, 255, 0), 1)
return view
import cv2
import numpy as np
class Ocv:
@classmethod
def compare_images(cls, lhd: np.ndarray, rhd: np.ndarray) -> float:
res = cv2.matchTemplate(lhd, rhd, cv2.TM_CCOEFF_NORMED)
_, correlation, _, _ = cv2.minMaxLoc(res)
return correlation
@classmethod
def draw_contours(cls, lhd: np.ndarray, rhd: np.ndarray) -> np.ndarray:
view = rhd.copy()
lhd = cv2.cvtColor(lhd, cv2.COLOR_BGRA2GRAY)
_, thresh = cv2.threshold(lhd, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)
contours, hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
cv2.drawContours(view, contours, -1, (0, 0, 255), 1)
rhd = cv2.cvtColor(rhd, cv2.COLOR_BGRA2GRAY)
_, thresh = cv2.threshold(rhd, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)
contours, hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
cv2.drawContours(view, contours, -1, (0, 255, 0), 1)
return view

View File

@ -1,109 +1,109 @@
import logging
import os
import signal
import subprocess
import time
import typing
import allure
import psutil
import configs
from configs.system import IS_WIN
_logger = logging.getLogger(__name__)
def find_process_by_port(port: int) -> typing.List[int]:
pid_list = []
for proc in psutil.process_iter():
try:
for conns in proc.connections(kind='inet'):
if conns.laddr.port == port:
pid_list.append(proc.pid)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return pid_list
def find_free_port(start: int, step: int):
while find_process_by_port(start):
start+=step
return start
def wait_for_close(pid: int, timeout_sec: int = configs.timeouts.PROCESS_TIMEOUT_SEC):
started_at = time.monotonic()
while True:
for proc in psutil.process_iter():
try:
if proc.pid == pid:
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
time.sleep(1)
if time.monotonic() - started_at > timeout_sec:
raise RuntimeError(f'Process with PID: {pid} not closed')
else:
break
@allure.step('Kill process')
def kill_process(pid, verify: bool = False, timeout_sec: int = configs.timeouts.PROCESS_TIMEOUT_SEC, attempt: int = 2):
try:
os.kill(pid, signal.SIGILL if IS_WIN else signal.SIGKILL)
except ProcessLookupError as err:
_logger.debug(err)
if verify:
try:
wait_for_close(pid, timeout_sec)
except RuntimeError as err:
if attempt:
kill_process(pid, verify, timeout_sec, attempt - 1)
else:
raise err
@allure.step('System execute command')
def execute(
command: list,
shell=False if IS_WIN else True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
check=False
):
def _is_process_exists(_process) -> bool:
return _process.poll() is None
def _wait_for_execution(_process):
while _is_process_exists(_process):
time.sleep(1)
def _get_output(_process):
_wait_for_execution(_process)
return _process.communicate()
command = " ".join(str(atr) for atr in command)
_logger.info(f'Execute: {command}')
process = subprocess.Popen(command, shell=shell, stderr=stderr, stdout=stdout)
if check and process.returncode != 0:
stdout, stderr = _get_output(process)
raise RuntimeError(stderr)
return process.pid
@allure.step('System run command')
def run(
command: list,
shell=False if IS_WIN else True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
timeout_sec=configs.timeouts.PROCESS_TIMEOUT_SEC,
check=True
):
command = " ".join(str(atr) for atr in command)
_logger.info(f'Execute: {command}')
process = subprocess.run(command, shell=shell, stderr=stderr, stdout=stdout, timeout=timeout_sec)
if check and process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode, command, process.stdout, process.stderr)
_logger.debug(f'stdout: {process.stdout}')
import logging
import os
import signal
import subprocess
import time
import typing
import allure
import psutil
import configs
from configs.system import IS_WIN
_logger = logging.getLogger(__name__)
def find_process_by_port(port: int) -> typing.List[int]:
pid_list = []
for proc in psutil.process_iter():
try:
for conns in proc.connections(kind='inet'):
if conns.laddr.port == port:
pid_list.append(proc.pid)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return pid_list
def find_free_port(start: int, step: int):
while find_process_by_port(start):
start+=step
return start
def wait_for_close(pid: int, timeout_sec: int = configs.timeouts.PROCESS_TIMEOUT_SEC):
started_at = time.monotonic()
while True:
for proc in psutil.process_iter():
try:
if proc.pid == pid:
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
time.sleep(1)
if time.monotonic() - started_at > timeout_sec:
raise RuntimeError(f'Process with PID: {pid} not closed')
else:
break
@allure.step('Kill process')
def kill_process(pid, verify: bool = False, timeout_sec: int = configs.timeouts.PROCESS_TIMEOUT_SEC, attempt: int = 2):
try:
os.kill(pid, signal.SIGILL if IS_WIN else signal.SIGKILL)
except ProcessLookupError as err:
_logger.debug(err)
if verify:
try:
wait_for_close(pid, timeout_sec)
except RuntimeError as err:
if attempt:
kill_process(pid, verify, timeout_sec, attempt - 1)
else:
raise err
@allure.step('System execute command')
def execute(
command: list,
shell=False if IS_WIN else True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
check=False
):
def _is_process_exists(_process) -> bool:
return _process.poll() is None
def _wait_for_execution(_process):
while _is_process_exists(_process):
time.sleep(1)
def _get_output(_process):
_wait_for_execution(_process)
return _process.communicate()
command = " ".join(str(atr) for atr in command)
_logger.info(f'Execute: {command}')
process = subprocess.Popen(command, shell=shell, stderr=stderr, stdout=stdout)
if check and process.returncode != 0:
stdout, stderr = _get_output(process)
raise RuntimeError(stderr)
return process.pid
@allure.step('System run command')
def run(
command: list,
shell=False if IS_WIN else True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
timeout_sec=configs.timeouts.PROCESS_TIMEOUT_SEC,
check=True
):
command = " ".join(str(atr) for atr in command)
_logger.info(f'Execute: {command}')
process = subprocess.run(command, shell=shell, stderr=stderr, stdout=stdout, timeout=timeout_sec)
if check and process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode, command, process.stdout, process.stderr)
_logger.debug(f'stdout: {process.stdout}')