mirror of
https://github.com/status-im/status-desktop.git
synced 2025-01-09 22:06:25 +00:00
126 lines
3.7 KiB
Python
126 lines
3.7 KiB
Python
import logging
|
|
import typing
|
|
|
|
import pytest
|
|
from testrail_api import TestRailAPI
|
|
|
|
import configs
|
|
|
|
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)

# TestRail client shared by the helpers in this module. It stays ``None``
# until the ``init_testrail_api`` session fixture creates it, which only
# happens when a TestRail run id is configured.
testrail_api: typing.Optional['TestRailAPI'] = None

# Status ids sent as ``status_id`` when a result is reported; the values
# match TestRail's built-in statuses (1 = Passed, 4 = Retest, 5 = Failed).
PASS = 1
FAIL = 5
RETEST = 4
|
|
|
|
|
|
@pytest.fixture(scope='session')
def init_testrail_api(request):
    """Create the shared TestRail client and reset this session's cases.

    When ``configs.testrail.TESTRAIL_RUN_ID`` is falsy the fixture only logs
    that reporting is skipped. Otherwise it builds the module-level
    ``testrail_api`` client and marks every ``case`` id collected for the
    session as "Retest", provided the case belongs to the configured run.
    """
    global testrail_api

    # Guard clause: nothing to set up without a configured run.
    if not configs.testrail.TESTRAIL_RUN_ID:
        _logger.info('TestRail report skipped')
        return

    _logger.info('TestRail API initializing')
    testrail_api = TestRailAPI(
        configs.testrail.TESTRAIL_URL,
        configs.testrail.TESTRAIL_USR,
        configs.testrail.TESTRAIL_PSW
    )

    for test_case_id in get_test_ids_in_session(request):
        if not is_test_case_in_run(test_case_id):
            _logger.info(f'Report result for test case: {test_case_id} skipped, not in test run')
            continue
        _update_result(test_case_id, RETEST)
        _logger.info(f'Test: "{test_case_id}" marked as "Retest"')
|
|
|
|
|
|
@pytest.fixture
def check_result(request):
    """Report the outcome of the finished test to TestRail.

    Runs after the test body (code following ``yield``). For every ``case``
    id on the test that belongs to the configured run:

    * the status is set to ``FAIL`` or ``PASS`` unless the case is already
      marked ``FAIL`` (a failure recorded earlier is never overwritten);
    * a comment naming the test and its outcome is added.

    Nothing is reported when no run id is configured, when the test has no
    ``case`` ids, or when the test never reached its call phase.
    """
    yield
    if not configs.testrail.TESTRAIL_RUN_ID:
        return

    item = request.node
    test_case_ids = _find_test_case_id_markers(request)
    if not test_case_ids:
        return

    # ``rep_call`` is not set in this module; presumably a
    # ``pytest_runtest_makereport`` hook attaches it (verify in conftest).
    # It is absent when setup failed or the test was skipped, so reading it
    # unconditionally would raise AttributeError during teardown.
    call_report = getattr(item, 'rep_call', None)
    if call_report is None:
        _logger.info(f'Test "{item.name}" has no call report, TestRail result not updated')
        return

    if call_report.failed:
        new_status, outcome = FAIL, 'FAILED'
    else:
        new_status, outcome = PASS, 'SUCCESS'

    for test_case_id in test_case_ids:
        if not is_test_case_in_run(test_case_id):
            continue
        if _get_test_case_status(test_case_id) != FAIL:
            _update_result(test_case_id, new_status)
        # NOTE(review): the comment is recorded regardless of the status
        # check above - confirm against the upstream file's indentation.
        _update_comment(test_case_id, f"{item.name} {outcome}")
|
|
|
|
|
|
def _update_result(test_case_id: int, result: int):
    """Add a result with status id ``result`` for a case in the configured run.

    :param test_case_id: TestRail case id to report against.
    :param result: status id to record (e.g. ``PASS``, ``FAIL``, ``RETEST``).
    """
    add_result = testrail_api.results.add_result_for_case
    add_result(run_id=configs.testrail.TESTRAIL_RUN_ID, case_id=test_case_id, status_id=result)
|
|
|
|
|
|
def _update_comment(test_case_id: int, comment: str):
    """Add a comment-only result (no status id) for a case in the configured run.

    :param test_case_id: TestRail case id to comment on.
    :param comment: text stored with the new result entry.
    """
    add_result = testrail_api.results.add_result_for_case
    add_result(run_id=configs.testrail.TESTRAIL_RUN_ID, case_id=test_case_id, comment=comment)
|
|
|
|
|
|
def _find_test_case_id_markers(request) -> typing.List[int]:
|
|
for marker in request.node.own_markers:
|
|
if marker.name == 'case':
|
|
test_case_ids = marker.args
|
|
return test_case_ids
|
|
return []
|
|
|
|
|
|
def _get_test_case_status(test_case_id: int) -> int:
    """Return the first non-empty status id recorded for a case in the run.

    Entries under the ``'results'`` key of the API response are scanned in
    the order returned (presumably newest first - verify against the
    TestRail API); entries whose ``status_id`` is ``None`` are skipped.

    :param test_case_id: TestRail case id to look up.
    :return: the first non-``None`` status id, or ``RETEST`` when there is
        none or the response does not have the expected shape.
    """
    response = testrail_api.results.get_results_for_case(configs.testrail.TESTRAIL_RUN_ID, test_case_id)
    try:
        for entry in response['results']:
            status_id = entry['status_id']
            if status_id is not None:
                return status_id
    except (LookupError, TypeError):
        # Malformed response (missing keys, wrong container type): fall back
        # to RETEST as before, but without a bare ``except`` that would also
        # swallow KeyboardInterrupt / SystemExit.
        pass
    return RETEST
|
|
|
|
|
|
def is_test_case_in_run(test_case_id: int) -> bool:
    """Tell whether results for the case can be fetched from the configured run.

    Membership is probed by requesting the case's results; any exception
    from that request is treated as "not in run".

    :param test_case_id: TestRail case id to probe.
    :return: ``True`` when the request succeeds, ``False`` otherwise.
    """
    try:
        testrail_api.results.get_results_for_case(configs.testrail.TESTRAIL_RUN_ID, test_case_id)
    except Exception as err:
        # Keep the best-effort contract, but record why the lookup failed so
        # that auth or network problems are not mistaken for a missing case.
        _logger.debug('Results lookup for test case %s failed: %r', test_case_id, err)
        return False
    return True
|
|
|
|
|
|
def _get_test_cases():
    """Return every test of the configured run, fetched page by page.

    Pages of ``page_size`` tests are requested with an increasing offset
    until a page comes back with a different length. The page size is sent
    with each request so the stop condition does not depend on the server's
    default limit.

    :return: list with the ``'tests'`` entries of all pages, in order.
    """
    page_size = 250
    collected = []
    offset = 0
    while True:
        page = testrail_api.tests.get_tests(
            configs.testrail.TESTRAIL_RUN_ID,
            limit=page_size,
            offset=offset,
        )['tests']
        collected.extend(page)
        if len(page) != page_size:
            return collected
        offset += page_size
|
|
|
|
|
|
def get_test_ids_in_session(request) -> typing.Set[int]:
    """Collect the TestRail case ids of all tests in the session.

    Every item in ``request.session.items`` is inspected; the positional
    arguments of each of its own ``case`` markers are gathered. Items
    without ``own_markers`` and markers without ``name`` are ignored.

    :param request: pytest ``request`` object giving access to the session.
    :return: set of unique case ids.
    """
    return {
        case_id
        for test in request.session.items
        for marker in getattr(test, 'own_markers', [])
        if getattr(marker, 'name', '') == 'case'
        for case_id in marker.args
    }
|