cr-connect-workflow/tests/workflow/test_workflow_service.py
Dan Funk d85ca1ce51 Whenever a workflow is loaded or updated, add events to the TaskEvent table with assignments for the next set of ready tasks, along with who should complete those tasks.
Add the lane information to the Task model.
Drop the foreign key constraint on the user_uid in the task log, as we might create tasks for users before they ever log into the system.
Add a new endpoint to the API called task events.  It should be possible to query this and get a list of all tasks that need a users attention.
The task events returned include detailed information about the workflow and study as sub-models
Rename all the actions in event log to things that are easier to pass over the api as arguments, make this backwards compatible, updating existing names in the database via the migration.
Throughly test the navigation and task details as control of the workflow is passed between two lanes.
2020-07-14 22:16:44 -04:00

134 lines
5.3 KiB
Python

import json
from tests.base_test import BaseTest
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.workflow_service import WorkflowService
from SpiffWorkflow import Task as SpiffTask, WorkflowException
from example_data import ExampleDataLoader
from crc import db
from crc.models.task_event import TaskEventModel
from crc.models.api_models import Task
class TestWorkflowService(BaseTest):
def test_documentation_processing_handles_replacements(self):
self.load_example_data()
workflow = self.create_workflow('random_fact')
processor = WorkflowProcessor(workflow)
processor.do_engine_steps()
task = processor.next_task()
task.task_spec.documentation = "Some simple docs"
docs = WorkflowService._process_documentation(task)
self.assertEqual("Some simple docs", docs)
task.data = {"replace_me": "new_thing"}
task.task_spec.documentation = "{{replace_me}}"
docs = WorkflowService._process_documentation(task)
self.assertEqual("new_thing", docs)
documentation = """
# Bigger Test
* bullet one
* bullet two has {{replace_me}}
# other stuff.
"""
expected = """
# Bigger Test
* bullet one
* bullet two has new_thing
# other stuff.
"""
task.task_spec.documentation = documentation
result = WorkflowService._process_documentation(task)
self.assertEqual(expected, result)
def test_documentation_processing_handles_conditionals(self):
self.load_example_data()
workflow = self.create_workflow('random_fact')
processor = WorkflowProcessor(workflow)
processor.do_engine_steps()
task = processor.next_task()
task.task_spec.documentation = "This test {% if works == 'yes' %}works{% endif %}"
docs = WorkflowService._process_documentation(task)
self.assertEqual("This test ", docs)
task.data = {"works": 'yes'}
docs = WorkflowService._process_documentation(task)
self.assertEqual("This test works", docs)
def test_enum_options_from_file(self):
self.load_example_data()
workflow = self.create_workflow('enum_options_from_file')
processor = WorkflowProcessor(workflow)
processor.do_engine_steps()
task = processor.next_task()
WorkflowService.process_options(task, task.task_spec.form.fields[0])
options = task.task_spec.form.fields[0].options
self.assertEqual(28, len(options))
self.assertEqual('1000', options[0]['id'])
self.assertEqual("UVA - INTERNAL - GM USE ONLY", options[0]['name'])
def test_random_data_populate_form_on_auto_complete(self):
self.load_example_data()
workflow = self.create_workflow('enum_options_with_search')
processor = WorkflowProcessor(workflow)
processor.do_engine_steps()
task = processor.next_task()
task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
WorkflowService.populate_form_with_random_data(task, task_api, required_only=False)
self.assertTrue(isinstance(task.data["sponsor"], dict))
def test_fix_legacy_data_model_for_rrt(self):
ExampleDataLoader().load_rrt() # Make sure the research_rampup is loaded, as it's not a test spec.
workflow = self.create_workflow('research_rampup')
processor = WorkflowProcessor(workflow, validate_only=True)
# Use the test spec code to complete the workflow of research rampup.
while not processor.bpmn_workflow.is_completed():
processor.bpmn_workflow.do_engine_steps()
tasks = processor.bpmn_workflow.get_tasks(SpiffTask.READY)
for task in tasks:
task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
WorkflowService.populate_form_with_random_data(task, task_api, False)
task.complete()
# create the task events
WorkflowService.log_task_action('dhf8r', processor, task,
WorkflowService.TASK_ACTION_COMPLETE)
processor.save()
db.session.commit()
WorkflowService.fix_legacy_data_model_for_rrt()
# All tasks should now have data associated with them.
task_logs = db.session.query(TaskEventModel) \
.filter(TaskEventModel.workflow_id == workflow.id) \
.filter(TaskEventModel.action == WorkflowService.TASK_ACTION_COMPLETE) \
.order_by(TaskEventModel.date).all() # Get them back in order.
self.assertEqual(17, len(task_logs))
for log in task_logs:
task = processor.bpmn_workflow.get_tasks_from_spec_name(log.task_name)[0]
self.assertIsNotNone(log.form_data)
# Each task should have the data in the form for that task in the task event.
if hasattr(task.task_spec, 'form'):
for field in task.task_spec.form.fields:
if field.has_property(Task.PROP_OPTIONS_REPEAT):
self.assertIn(field.get_property(Task.PROP_OPTIONS_REPEAT), log.form_data)
else:
self.assertIn(field.id, log.form_data)
# Some spot checks:
# The first task should be empty, with all the data removed.
self.assertEqual({}, task_logs[0].form_data)