151 lines
5.6 KiB
Python
151 lines
5.6 KiB
Python
import json
|
|
import os
|
|
import requests
|
|
import base64
|
|
from subprocess import check_output
|
|
|
|
# Directory holding the per-test artifacts this module reads and writes:
# `<test>.log` step logs, `<test>.png` screenshots and `<test>.json` reports.
# NOTE(review): the name suggests this path lives inside the test container - confirm.
TEST_REPORT_DIR_CONTAINER = '/var/log/takoe'
|
|
|
|
|
|
def get_test_report_file_path(test_name):
    """Return the path of the JSON report file for ``test_name``.

    The report directory (``TEST_REPORT_DIR_CONTAINER``) is created first if
    it does not exist yet.

    :param test_name: test name, used as the file stem (``<test_name>.json``).
    :return: path of the report file inside ``TEST_REPORT_DIR_CONTAINER``.
    """
    # exist_ok avoids the check-then-create race: if another process creates
    # the directory between an os.path.exists() check and os.makedirs(), the
    # latter would raise FileExistsError.
    os.makedirs(TEST_REPORT_DIR_CONTAINER, exist_ok=True)
    return os.path.join(TEST_REPORT_DIR_CONTAINER, "%s.json" % test_name)
|
|
|
|
|
|
def get_testrail_case_id(obj):
    """Return the first argument of the ``testrail_id`` marker on ``obj``.

    :param obj: object exposing ``keywords._markers``, a mapping from marker
        name to a marker object with an ``args`` sequence.
    :return: ``args[0]`` of the ``testrail_id`` marker, or ``None`` when the
        object carries no such marker.
    """
    if 'testrail_id' not in obj.keywords._markers:
        return None
    marker = obj.keywords._markers['testrail_id']
    return marker.args[0]
|
|
|
|
|
|
def save_test_result(test, report):
    """Write the JSON report file for one finished test.

    The report contains the TestRail case id, the test name, the lines of the
    test's ``<name>.log`` file as steps, the crash message (``None`` when the
    test did not fail) and the screenshot file name.

    :param test: test item; ``test.name`` is used for all file names and the
        item is passed to ``get_testrail_case_id``.
    :param report: test report; ``report.failed`` decides whether a screenshot
        is taken and ``report.longrepr.reprcrash.message`` is stored as error.
    :raises subprocess.CalledProcessError: if the screenshot command fails.
    :raises OSError: if the log file cannot be read or the report not written.
    """
    test_name = test.name
    file_path = get_test_report_file_path(test_name)

    error = None
    if report.failed:
        # Capture the whole screen with the external `import` command
        # (presumably ImageMagick's screen-capture tool - confirm).
        screenshot_path = '%s/%s.png' % (TEST_REPORT_DIR_CONTAINER, test_name)
        check_output(['import', '-window', 'root', screenshot_path])
        error = report.longrepr.reprcrash.message

    # Each log line (trailing newline included) becomes one step.
    with open('%s/%s.log' % (TEST_REPORT_DIR_CONTAINER, test_name), 'r') as log:
        steps = list(log)

    test_dict = {
        'testrail_case_id': get_testrail_case_id(test),
        'name': test_name,
        'steps': steps,
        'error': error,
        # The file name is recorded even when no screenshot was taken.
        'screenshot': test_name + '.png',
    }

    # Close the report file explicitly so the data is flushed to disk as soon
    # as this function returns, instead of whenever the handle is collected.
    with open(file_path, 'w') as report_file:
        json.dump(test_dict, report_file)
|
|
|
|
|
|
class SingleTestData:
    """Record for one test: its name, TestRail case id and list of runs."""

    class TestRunData:
        """Data of a single run: the recorded steps and the error value."""

        def __init__(self, steps, error):
            self.steps, self.error = steps, error

    def __init__(self, name, testruns, testrail_case_id):
        self.name = name
        self.testruns = testruns
        self.testrail_case_id = testrail_case_id
|
|
|
|
|
|
class BaseReportDesktop:
    """Loads the per-test JSON report files stored in a report directory."""

    def __init__(self, test_report_dir_master):
        """
        :param test_report_dir_master: directory that contains the JSON
            report files to be read by ``get_all_tests``.
        """
        self.test_reports_dir = test_report_dir_master

    def init_report(self):
        """Do nothing (deliberately)."""
        # overridden in order to prevent deletion of all actual report files, do not remove!
        pass

    def get_all_tests(self):
        """Build a ``SingleTestData`` for every report file in the directory.

        Every directory entry whose name ends with ``json`` is parsed; its
        ``steps`` and ``error`` values form the single ``TestRunData`` of the
        resulting ``SingleTestData``.

        :return: list of ``SingleTestData``, in ``os.listdir`` order.
        :raises KeyError: if a report lacks one of the keys ``steps``,
            ``error``, ``name`` or ``testrail_case_id``.
        """
        tests = []
        for file_name in os.listdir(self.test_reports_dir):
            if not file_name.endswith('json'):
                continue
            file_path = os.path.join(self.test_reports_dir, file_name)
            # Context manager closes each report file right after parsing
            # rather than leaking one open handle per report.
            with open(file_path) as report_file:
                test_data = json.load(report_file)
            testruns = [SingleTestData.TestRunData(
                steps=test_data['steps'], error=test_data['error'])]
            tests.append(SingleTestData(name=test_data['name'], testruns=testruns,
                                        testrail_case_id=test_data['testrail_case_id']))
        return tests
|
|
|
|
|
|
class TestrailReportDesktop(BaseReportDesktop):
    """Publishes the collected desktop test reports to TestRail."""

    def __init__(self, test_report_dir_master):
        """Set up credentials, HTTP headers and the TestRail identifiers.

        :param test_report_dir_master: directory with the JSON report files,
            forwarded to ``BaseReportDesktop``.
        """
        super().__init__(test_report_dir_master)

        # NOTE: when these variables are unset both values are None and the
        # Authorization header below encodes the literal text "None:None".
        self.password = os.environ.get('TESTRAIL_PASS')
        self.user = os.environ.get('TESTRAIL_USER')

        # status_id values sent to TestRail for a run without / with an error.
        self.outcomes = {
            'passed': 1,
            'undefined_fail': 10,
        }

        credentials = '%s:%s' % (self.user, self.password)
        token = base64.b64encode(credentials.encode('utf-8')).decode('ascii').strip()
        self.headers = {
            'Authorization': 'Basic %s' % token,
            'Content-Type': 'application/json',
        }

        self.url = 'https://ethstatus.testrail.net/index.php?/'
        self.api_url = self.url + 'api/v2/'

        # Default run id; replaced by add_run() once a new run is created.
        self.run_id = 1917
        self.suite_id = 52
        self.project_id = 16

    def add_run(self, run_name):
        """Create a TestRail run and remember its id in ``self.run_id``.

        :param run_name: name of the run; when it contains ``'PR-'`` only the
            cases of the critical section are included.
        """
        request_body = {
            'suite_id': self.suite_id,
            'name': run_name,
            'case_ids': self.get_regression_cases(is_pr='PR-' in run_name),
            'include_all': False,
        }
        run = self.post('add_run/%s' % self.project_id, request_body)
        self.run_id = run['id']

    def get(self, method):
        """GET ``self.api_url + method`` and return the decoded JSON body."""
        return requests.get(self.api_url + method, headers=self.headers).json()

    def post(self, method, data):
        """POST ``data`` as UTF-8 JSON to ``self.api_url + method``.

        :return: the decoded JSON body of the response.
        """
        payload = json.dumps(data).encode('utf-8')
        return requests.post(self.api_url + method, data=payload, headers=self.headers).json()

    def get_cases(self, section_id):
        """Return the cases of ``section_id`` in this project and suite."""
        # The filter must be spelled "&section_id"; the previous text held a
        # mis-encoded "§ion_id", which dropped the section filter's name.
        return self.get('get_cases/%s&suite_id=%s&section_id=%s' % (
            self.project_id, self.suite_id, section_id))

    def get_regression_cases(self, is_pr=False):
        """Collect the ids of the cases to include in a run.

        :param is_pr: when true, only the critical section is queried;
            otherwise the critical, high, medium and low sections are.
        :return: list of case ids, in section order.
        """
        # Priority name -> TestRail section id.
        sections = {
            'critical': 822,
            'high': 799,
            'medium': 823,
            'low': 801,
        }
        if is_pr:
            section_ids = [sections['critical']]
        else:
            section_ids = list(sections.values())

        case_ids = []
        for section_id in section_ids:
            case_ids.extend(case['id'] for case in self.get_cases(section_id))
        return case_ids

    def get_screenshot_url(self, build_num, test_name):
        """Return the Jenkins artifact URL of the screenshot for a test.

        :param build_num: build number inserted into the job path.
        :param test_name: test name; ``.png`` is appended to it.
        """
        jenkins_url = 'https://jenkins.status.im'
        job_name = 'desktop-sandbox'
        artifacts = '/job/end-to-end-tests/job/%s/%s/artifact/test/desktop_sikuli/report/' % (
            job_name, build_num)
        return jenkins_url + artifacts + test_name + '.png'

    def add_results(self, build_num):
        """Post the result of every collected test to the current run.

        Only the last run of each test is reported. A run with an error is
        sent with the ``undefined_fail`` status and a comment holding the
        error, the steps and a screenshot link; any other run is sent as
        ``passed`` with the steps only.

        :param build_num: build number used to build the screenshot URL.
        """
        for test in self.get_all_tests():
            method = 'add_result_for_case/%s/%s' % (self.run_id, test.testrail_case_id)
            last_testrun = test.testruns[-1]
            test_steps = "\n# Steps: \n" + "".join(
                step + "\n" for step in last_testrun.steps)

            if last_testrun.error:
                status_id = self.outcomes['undefined_fail']
                comment = '# Error: \n %s \n# Screenshot:\n "![](%s)"' % (
                    last_testrun.error + test_steps,
                    self.get_screenshot_url(build_num, test.name))
            else:
                status_id = self.outcomes['passed']
                comment = test_steps

            self.post(method, data={'status_id': status_id, 'comment': comment})
|