cr-connect-workflow/tests/scripts/test_task_logging.py
Dan 2fc4b44ef3 Create a path to directly download the spreadsheet file (and avoid the weird dance on the front end of making an API call to get file data.)
Fixing pagination. It seems the front end uses a page_index that is 0-based, whereas SQLAlchemy prefers to start at 1.
2022-03-12 16:19:07 -05:00

171 lines
7.9 KiB
Python

import json
from tests.base_test import BaseTest
from crc.models.user import UserModel
from crc import session
from crc.models.task_log import TaskLogModel, TaskLogQuery, TaskLogQuerySchema
from crc.models.study import StudyModel
from crc.scripts.log import TaskLog
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.task_logging_service import TaskLoggingService
class TestTaskLogging(BaseTest):
    """Tests for the task ``log`` script, the task logging service and the study log endpoint."""

    def add_log(self, log_data):
        """Run the 'logging_task' workflow with ``log_data`` and return the created log's id.

        The first form of the workflow is completed with ``log_data``; the id is
        read from the ``log_model`` entry in the data of the task that follows.
        """
        workflow = self.create_workflow('logging_task')
        workflow_api = self.get_workflow_api(workflow)
        task = workflow_api.next_task
        workflow_api = self.complete_form(workflow, task, log_data)
        task = workflow_api.next_task
        return task.data['log_model']['id']

    def _assert_spec_validates(self, spec_name):
        """Load ``spec_name`` and assert the validate endpoint returns an empty list for it."""
        self.load_test_spec('empty_workflow', master_spec=True)
        self.create_reference_document()
        spec_model = self.load_test_spec(spec_name)
        rv = self.app.get(f'/v1.0/workflow-specification/{spec_model.id}/validate',
                          headers=self.logged_in_headers())
        self.assertEqual([], rv.json)

    def test_logging_validation(self):
        """The 'logging_task' spec validates with no reported problems."""
        self._assert_spec_validates('logging_task')

    def test_add_log(self):
        """A log written by the workflow is stored with its code, level and task name."""
        log_data = {'level': 'info',
                    'code': 'test_code',
                    'message': 'You forgot to include the correct data.'}
        log_id = self.add_log(log_data)
        log_model = session.query(TaskLogModel).filter(TaskLogModel.id == log_id).first()
        self.assertEqual('test_code', log_model.code)
        self.assertEqual('info', log_model.level)
        self.assertEqual('Activity_LogEvent', log_model.task)

    def test_add_metrics_log(self):
        """A log submitted with the 'metrics' level is stored with that level."""
        log_data = {'level': 'metrics',
                    'code': 'test_code',
                    'message': 'You forgot to include the correct data.'}
        log_id = self.add_log(log_data)
        log_model = session.query(TaskLogModel).filter(TaskLogModel.id == log_id).first()
        self.assertEqual('metrics', log_model.level)

    def test_get_logging_validation(self):
        """The 'get_logging' spec validates with no reported problems."""
        self._assert_spec_validates('get_logging')

    def test_get_logs(self):
        """The 'get_logging' workflow exposes all logs as well as per-level subsets."""
        workflow = self.create_workflow('get_logging')
        workflow_api = self.get_workflow_api(workflow)
        data = workflow_api.next_task.data

        all_logs = data['logging_models_all_post']
        info_logs = data['logging_models_info_post']
        debug_logs = data['logging_models_debug_post']

        self.assertEqual(2, len(all_logs))
        self.assertEqual(1, len(info_logs))
        self.assertEqual(1, len(debug_logs))
        self.assertIn(info_logs[0], all_logs)
        self.assertIn(debug_logs[0], all_logs)
        self.assertEqual('test_code', info_logs[0]['code'])
        self.assertEqual('debug_test_code', debug_logs[0]['code'])

    def test_get_logs_for_study(self):
        """Workflow logs cover one workflow only; study logs span every workflow in the study."""
        self.add_studies()
        study = session.query(StudyModel).first()

        # Two logs are written directly against a first workflow in the study.
        workflow = self.create_workflow('hello_world', study=study)
        processor = WorkflowProcessor(workflow)
        task = processor.next_task()
        TaskLog().do_task(task, study.id, workflow.id,
                          level='critical',
                          code='critical_code',
                          message='This is my critical message.')
        TaskLog().do_task(task, study.id, workflow.id,
                          level='debug',
                          code='debug_code',
                          message='This is my debug message.')

        # This workflow adds 3 logs
        # some_text = 'variable'
        # log('info', 'some_code', 'Some longer message')
        # log('info', 'some_other_code', 'Another really long message')
        # log('debug', 'debug_code', f'This message has a { some_text }!')
        workflow = self.create_workflow('get_logging_for_study', study=study)
        workflow_api = self.get_workflow_api(workflow)
        task_api = workflow_api.next_task
        workflow_api = self.complete_form(workflow, task_api, {})
        task_api = workflow_api.next_task
        workflow_logs = task_api.data['workflow_logs']
        study_logs = task_api.data['study_logs']

        # 3 logs from the second workflow; 2 + 3 across the whole study.
        self.assertEqual(3, len(workflow_logs))
        self.assertEqual(5, len(study_logs))

    def test_logging_api(self):
        """PUT /v1.0/study/<study_id>/log returns the logs recorded for that study."""
        workflow = self.create_workflow('logging_task')
        workflow_api = self.get_workflow_api(workflow)
        task = workflow_api.next_task
        form_data = {'level': 'info',
                     'code': 'test_code',
                     'message': 'You forgot to include the correct data.'}
        # Completing the form is what writes the log; its return value is not needed here.
        self.complete_form(workflow, task, form_data)

        user = session.query(UserModel).filter_by(uid=self.test_uid).first()
        url = f'/v1.0/study/{workflow.study_id}/log'
        task_log_query = TaskLogQuery()
        rv = self.app.put(url, headers=self.logged_in_headers(user), content_type="application/json",
                          data=TaskLogQuerySchema().dump(task_log_query))
        self.assert_success(rv)
        log_query = json.loads(rv.get_data(as_text=True))
        logs = log_query['items']
        self.assertEqual(1, len(logs))
        self.assertEqual(workflow.id, logs[0]['workflow_id'])
        self.assertEqual(workflow.study_id, logs[0]['study_id'])
        self.assertEqual('info', logs[0]['level'])
        self.assertEqual(self.test_uid, logs[0]['user_uid'])
        self.assertEqual('You forgot to include the correct data.', logs[0]['message'])

    def test_logging_service_paginates_and_sorts(self):
        """The logging service pages study logs and can sort them by column, optionally reversed."""
        self.add_studies()
        study = session.query(StudyModel).first()
        workflow_model = self.create_workflow('hello_world', study=study)
        workflow_processor = WorkflowProcessor(workflow_model)
        task = workflow_processor.next_task()

        # 10 rounds x 4 levels = 40 logs, written in the same order every round.
        level_codes = (('critical', 'critical_code'),
                       ('debug', 'debug_code'),
                       ('error', 'debug_code'),
                       ('info', 'debug_code'))
        for i in range(10):
            for level, code in level_codes:
                TaskLog().do_task(task, study.id, workflow_model.id, level=level, code=code,
                                  message=f'This is my {level} message # {i}.')

        results = TaskLoggingService.get_logs_for_study_paginated(study.id, TaskLogQuery(per_page=100))
        self.assertEqual(40, len(results.items), "There should be 40 logs total")

        logs = TaskLoggingService.get_logs_for_study_paginated(study.id, TaskLogQuery(per_page=5))
        self.assertEqual(40, logs.total)
        self.assertEqual(5, len(logs.items), "I can limit results to 5")
        self.assertEqual(0, logs.page)  # the first page is reported as index 0
        self.assertEqual(8, logs.pages)
        self.assertEqual(5, logs.per_page)
        self.assertEqual(True, logs.has_next)
        self.assertEqual(False, logs.has_prev)

        logs = TaskLoggingService.get_logs_for_study_paginated(
            study.id, TaskLogQuery(per_page=5, sort_column="level"))
        self.assertEqual(5, len(logs.items))
        for log in logs.items:
            self.assertEqual('critical', log.level, "It is possible to sort on a column")

        logs = TaskLoggingService.get_logs_for_study_paginated(
            study.id, TaskLogQuery(per_page=5, sort_column="level", sort_reverse=True))
        self.assertEqual(5, len(logs.items))
        for log in logs.items:
            self.assertEqual('info', log.level, "It is possible to sort on a column")