188 lines
5.4 KiB
Python
188 lines
5.4 KiB
Python
import logging
|
|
import typing
|
|
from collections import namedtuple
|
|
|
|
import pytest
|
|
import requests
|
|
from testrail_api import TestRailAPI
|
|
|
|
import configs
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
global testrail_api
|
|
test_run_id = None
|
|
|
|
PASS = 1
|
|
FAIL = 5
|
|
SKIPPED = 11
|
|
UNTESTED = 12
|
|
|
|
test_case = namedtuple('TestCase', ['id', 'skipped'])
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def init_testrail_api(request):
|
|
if not configs.testrail.RUN_NAME:
|
|
LOG.info('TestRail report skipped')
|
|
return
|
|
|
|
LOG.info('TestRail API initializing')
|
|
global testrail_api
|
|
testrail_api = TestRailAPI(
|
|
configs.testrail.URL,
|
|
configs.testrail.USR,
|
|
configs.testrail.PSW
|
|
)
|
|
|
|
response = requests.get(
|
|
configs.testrail.URL,
|
|
auth=(configs.testrail.USR, configs.testrail.PSW),
|
|
)
|
|
|
|
if response.status_code in [400, 403, 429, 500]:
|
|
LOG.info(f'Could not login to test rail, got the following response {response}')
|
|
return
|
|
|
|
test_cases = get_test_cases_in_session(request)
|
|
test_run = get_test_run(configs.testrail.RUN_NAME)
|
|
if not test_run:
|
|
test_case_ids = list(set([tc_id.id for tc_id in test_cases]))
|
|
LOG.debug(f'test_case_ids = {test_case_ids}')
|
|
test_run = create_test_run(configs.testrail.RUN_NAME, test_case_ids)
|
|
|
|
global test_run_id
|
|
test_run_id = test_run['id']
|
|
|
|
for test_case in test_cases:
|
|
if not is_test_case_in_run(test_case.id):
|
|
LOG.info('Report result for test case: %s skipped, not in test run: %s',
|
|
test_case.id, configs.testrail.RUN_NAME)
|
|
continue
|
|
|
|
if test_case.skipped:
|
|
_update_result(test_case.id, SKIPPED, test_case.skipped)
|
|
LOG.info(f'Test: "{test_case.id}" marked as "Skipped"')
|
|
else:
|
|
if _get_test_case_status(test_case.id) != UNTESTED:
|
|
_update_result(test_case.id, UNTESTED)
|
|
LOG.info(f'Test: "{test_case.id}" marked as "Untested"')
|
|
|
|
|
|
@pytest.fixture
|
|
def check_result(request):
|
|
yield
|
|
if configs.testrail.RUN_NAME:
|
|
item = request.node
|
|
test_case_ids = _find_test_case_id_markers(request)
|
|
for test_case_id in test_case_ids:
|
|
if is_test_case_in_run(test_case_id):
|
|
current_test_status = _get_test_case_status(test_case_id)
|
|
if item.rep_call.failed:
|
|
_update_result(test_case_id, FAIL, item.rep_call.longreprtext)
|
|
else:
|
|
if current_test_status != FAIL:
|
|
_update_result(test_case_id, PASS, f"{request.node.name} SUCCESS")
|
|
else:
|
|
_update_comment(test_case_id, f"{request.node.name} SUCCESS")
|
|
|
|
|
|
def _update_result(test_case_id: int, result: int, comment: str = None):
|
|
testrail_api.results.add_result_for_case(
|
|
run_id=test_run_id,
|
|
case_id=test_case_id,
|
|
status_id=result,
|
|
comment=comment or ""
|
|
)
|
|
|
|
|
|
def _update_comment(test_case_id: int, comment: str):
|
|
testrail_api.results.add_result_for_case(
|
|
run_id=test_run_id,
|
|
case_id=test_case_id,
|
|
comment=comment
|
|
)
|
|
|
|
|
|
def _find_test_case_id_markers(request) -> typing.List[int]:
|
|
for marker in request.node.own_markers:
|
|
if marker.name == 'case':
|
|
test_case_ids = marker.args
|
|
return test_case_ids
|
|
return []
|
|
|
|
|
|
def _get_test_case_status(test_case_id: int) -> int:
|
|
test_case_results = testrail_api.results.get_results_for_case(test_run_id, test_case_id)
|
|
try:
|
|
result = 0
|
|
while True:
|
|
last_test_case_status = test_case_results['results'][result]['status_id']
|
|
if last_test_case_status is None:
|
|
result += 1
|
|
else:
|
|
return last_test_case_status
|
|
except:
|
|
return SKIPPED
|
|
|
|
|
|
def is_test_case_in_run(test_case_id: int) -> bool:
|
|
try:
|
|
testrail_api.results.get_results_for_case(test_run_id, test_case_id)
|
|
except Exception as err:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
|
|
def _get_test_cases():
|
|
results = []
|
|
limit = 250
|
|
chunk = 0
|
|
while True:
|
|
tests = testrail_api.tests.get_tests(test_run_id, offset=chunk)['tests']
|
|
results.extend(tests)
|
|
if len(tests) == limit:
|
|
chunk += limit
|
|
else:
|
|
return results
|
|
|
|
|
|
def get_test_cases_in_session(request) -> typing.List[test_case]:
|
|
tests = request.session.items
|
|
test_cases = []
|
|
for test in tests:
|
|
tc_ids = []
|
|
skipped = ''
|
|
for marker in getattr(test, 'own_markers', []):
|
|
match getattr(marker, 'name', ''):
|
|
case 'case':
|
|
tc_ids = list(marker.args)
|
|
case 'skip':
|
|
skipped = f'Reason: {marker.kwargs.get("reason", "")}'
|
|
for tc_id in tc_ids:
|
|
test_cases.append(test_case(tc_id, skipped))
|
|
return test_cases
|
|
|
|
|
|
def create_test_run(name: str, ids: list) -> dict:
|
|
test_run = testrail_api.runs.add_run(
|
|
project_id=configs.testrail.PROJECT_ID,
|
|
name=name,
|
|
description=f'Jenkins: {configs.testrail.CI_BUILD_URL}',
|
|
include_all=False if list else True,
|
|
case_ids=ids or None
|
|
)
|
|
return test_run
|
|
|
|
|
|
def get_test_run(name: str) -> typing.Optional[dict]:
|
|
test_runs = testrail_api.runs.get_runs(
|
|
project_id=configs.testrail.PROJECT_ID,
|
|
is_completed=False
|
|
)
|
|
|
|
for test_run in test_runs['runs']:
|
|
if test_run['name'] == name:
|
|
return test_run
|