cr-connect-workflow/tests/test_workflow_processor.py
Dan Funk be057e8758 Adding an "UpdateStudy" task that is able to update the data on the study model, useful for setting core data points on the model, such as setting the Primary Investigator, or altering the Study Title.
Fixing a bug where the validation of forms did not correctly process auto-complete fields.
Fixing a bug where the approvals script and the update study script could not process dot notation correctly.

Moved populate_random_data into the WorkflowService where it makes more sense.
2020-05-25 15:30:06 -04:00

420 lines
21 KiB
Python

import json
import logging
import os
from unittest.mock import patch
from tests.base_test import BaseTest
from SpiffWorkflow.bpmn.specs.EndEvent import EndEvent
from SpiffWorkflow.camunda.specs.UserTask import FormField
from crc import session, db, app
from crc.api.common import ApiError
from crc.models.file import FileModel, FileDataModel
from crc.models.protocol_builder import ProtocolBuilderStudySchema
from crc.services.protocol_builder import ProtocolBuilderService
from crc.models.study import StudyModel
from crc.models.workflow import WorkflowSpecModel, WorkflowStatus
from crc.services.file_service import FileService
from crc.services.study_service import StudyService
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.workflow_service import WorkflowService
class TestWorkflowProcessor(BaseTest):
    """Tests that drive WorkflowProcessor against the example workflow specifications."""
def _populate_form_with_random_data(self, task):
    """Fill the form of the given Spiff task with random data.

    The task is first converted to its API representation (with docs and
    forms included); both objects are then handed to
    WorkflowService.populate_form_with_random_data.
    """
    WorkflowService.populate_form_with_random_data(
        task,
        WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True),
    )
def get_processor(self, study_model, spec_model):
    """Build a workflow model for the study/spec pair and return a WorkflowProcessor for it."""
    return WorkflowProcessor(
        StudyService._create_workflow_model(study_model, spec_model)
    )
def test_create_and_complete_workflow(self):
    """Run the "random_fact" spec from its single user task through to completion."""
    self.load_example_data()
    spec = self.load_test_spec("random_fact")
    first_study = session.query(StudyModel).first()
    processor = self.get_processor(first_study, spec)

    # The workflow data carries the id of the study it was created for.
    self.assertEqual(
        first_study.id,
        processor.bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY],
    )
    self.assertIsNotNone(processor)
    self.assertEqual(WorkflowStatus.user_input_required, processor.get_status())

    pending = processor.next_user_tasks()
    self.assertEqual(1, len(pending))
    user_task = pending[0]
    self.assertEqual("Task_User_Select_Type", user_task.get_name())

    # Answer the form, creating the data dict first if the task has none.
    if user_task.data is None:
        user_task.data = {}
    user_task.data.update({"type": "buzzword"})
    processor.complete_task(user_task)
    self.assertEqual(WorkflowStatus.waiting, processor.get_status())

    # Running the engine should carry the workflow to completion.
    processor.do_engine_steps()
    self.assertEqual(WorkflowStatus.complete, processor.get_status())

    result = processor.get_data()
    self.assertIsNotNone(result)
    self.assertIn("FactService", result)
def test_workflow_with_dmn(self):
    """Run the "decision_table" spec and check the message left in the workflow data."""
    self.load_example_data()
    first_study = session.query(StudyModel).first()
    spec = self.load_test_spec("decision_table")

    # The spec is expected to be stored as exactly two files.
    spec_files = session.query(FileModel).filter_by(workflow_spec_id='decision_table').all()
    self.assertEqual(2, len(spec_files))

    processor = self.get_processor(first_study, spec)
    self.assertEqual(WorkflowStatus.user_input_required, processor.get_status())

    pending = processor.next_user_tasks()
    self.assertEqual(1, len(pending))
    user_task = pending[0]
    self.assertEqual("get_num_presents", user_task.get_name())

    # Answer the form, creating the data dict first if the task has none.
    if user_task.data is None:
        user_task.data = {}
    user_task.data.update({"num_presents": 1})
    processor.complete_task(user_task)
    processor.do_engine_steps()

    result = processor.get_data()
    self.assertIsNotNone(result)
    self.assertIn("message", result)
    self.assertEqual("Oh, Ginger.", result.get('message'))

    # The same message must also be present on the final ("End") task.
    final_task = processor.bpmn_workflow.last_task
    self.assertEqual("End", final_task.task_spec.name)
    self.assertEqual("Oh, Ginger.", processor.bpmn_workflow.last_task.data.get('message'))
def test_workflow_with_parallel_forms(self):
    """Complete every form of the "parallel_tasks" spec and verify the workflow finishes."""
    self.load_example_data()
    spec = self.load_test_spec("parallel_tasks")
    first_study = session.query(StudyModel).first()
    processor = self.get_processor(first_study, spec)
    self.assertEqual(WorkflowStatus.user_input_required, processor.get_status())

    # Complete the first steps of the 4 parallel tasks.
    ready = processor.next_user_tasks()
    self.assertEqual(4, len(ready))
    for user_task in ready:
        self._populate_form_with_random_data(user_task)
    for user_task in ready:
        processor.complete_task(user_task)

    # There are another 4 tasks to complete (each parallel task has a follow-up task).
    ready = processor.next_user_tasks()
    self.assertEqual(4, len(ready))
    for user_task in ready:
        self._populate_form_with_random_data(user_task)
    for user_task in ready:
        processor.complete_task(user_task)
    processor.do_engine_steps()

    # Should be one last step after the above are complete.
    remaining = processor.next_user_tasks()
    self.assertEqual(1, len(remaining))
    last_form = remaining[0]
    self._populate_form_with_random_data(last_form)
    processor.complete_task(last_form)
    processor.do_engine_steps()
    self.assertTrue(processor.bpmn_workflow.is_completed())
def test_workflow_processor_knows_the_text_task_even_when_parallel(self):
    """next_task() should follow the most recently completed task in a parallel spec."""
    self.load_example_data()
    first_study = session.query(StudyModel).first()
    spec = self.load_test_spec("parallel_tasks")
    processor = self.get_processor(first_study, spec)
    self.assertEqual(WorkflowStatus.user_input_required, processor.get_status())

    ready = processor.next_user_tasks()
    self.assertEqual(4, len(ready))
    self.assertEqual(ready[0], processor.next_task(), "First task in list of 4")

    # Complete the third open task, so do things out of order.
    # This should cause the system to recommend the first ready task
    # that is a child of the last completed task.
    third_task = ready[2]
    self._populate_form_with_random_data(third_task)
    processor.complete_task(third_task)

    ready = processor.next_user_tasks()
    self.assertEqual(processor.bpmn_workflow.last_task, third_task)
    self.assertEqual(4, len(ready))
    self.assertEqual(third_task.children[0], processor.next_task())
def test_workflow_processor_returns_next_task_as_end_task_if_complete(self):
    """After "random_fact" completes, next_task() should return the EndEvent task."""
    self.load_example_data()
    spec = self.load_test_spec("random_fact")
    first_study = session.query(StudyModel).first()
    processor = self.get_processor(first_study, spec)
    processor.do_engine_steps()

    # Answer the only form and let the engine run to the end.
    user_task = processor.next_task()
    user_task.data = {"type": "buzzword"}
    processor.complete_task(user_task)
    self.assertEqual(WorkflowStatus.waiting, processor.get_status())

    processor.do_engine_steps()
    self.assertEqual(WorkflowStatus.complete, processor.get_status())

    # With nothing left to do, the "next" task is the end event itself.
    end_task = processor.next_task()
    self.assertIsNotNone(end_task)
    self.assertIn("FactService", end_task.data)
    self.assertIsInstance(end_task.task_spec, EndEvent)
def test_workflow_validation_error_is_properly_raised(self):
    """Building a processor for the "invalid_spec" spec must raise a descriptive ApiError."""
    self.load_example_data()
    spec = self.load_test_spec("invalid_spec")
    first_study = session.query(StudyModel).first()

    with self.assertRaises(ApiError) as raised:
        self.get_processor(first_study, spec)

    error = raised.exception
    self.assertEqual("workflow_validation_error", error.code)
    self.assertTrue("bpmn:startEvent" in error.message)
def test_workflow_spec_key_error(self):
    """A soft reset over a structural spec change must raise a debuggable ApiError.

    'Key' errors show up frequently in the logs when a workflow references
    something that no longer exists in the midst of processing. This checks
    that the error surfaced to the front end is specific enough to debug.
    """
    # Start the two_forms workflow, and enter some data in the first form.
    self.load_example_data()
    first_study = session.query(StudyModel).first()
    spec = self.load_test_spec("two_forms")
    processor = self.get_processor(first_study, spec)
    self.assertEqual(processor.workflow_model.workflow_spec_id, spec.id)

    first_form = processor.next_task()
    first_form.data = {"color": "blue"}
    processor.complete_task(first_form)

    # Modify the specification, with a major change.
    structural_mod = os.path.join(
        app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn'
    )
    self.replace_file("two_forms.bpmn", structural_mod)

    # Attempting a soft update on a structural change should raise a sensible error.
    with self.assertRaises(ApiError) as raised:
        WorkflowProcessor(processor.workflow_model, soft_reset=True)
    self.assertEqual("unexpected_workflow_structure", raised.exception.code)
def test_workflow_with_bad_expression_raises_sensible_error(self):
    """Engine steps on the "invalid_expression" spec must fail with a "task_error" ApiError."""
    self.load_example_data()
    spec = self.load_test_spec("invalid_expression")
    first_study = session.query(StudyModel).first()
    processor = self.get_processor(first_study, spec)
    processor.do_engine_steps()

    pending = processor.next_user_tasks()
    self.assertEqual(1, len(pending))
    user_task = pending[0]
    self._populate_form_with_random_data(user_task)
    processor.complete_task(user_task)

    # The failure is expected when the engine moves past the completed form.
    with self.assertRaises(ApiError) as raised:
        processor.do_engine_steps()
    self.assertEqual("task_error", raised.exception.code)
def test_workflow_with_docx_template(self):
    """Run the "docx" spec and check that the engine step stores one non-empty file."""
    self.load_example_data()
    first_study = session.query(StudyModel).first()
    spec = self.load_test_spec("docx")

    # The spec is expected to be stored as exactly two files.
    spec_files = session.query(FileModel).filter_by(workflow_spec_id='docx').all()
    self.assertEqual(2, len(spec_files))

    spec = session.query(WorkflowSpecModel).filter_by(id="docx").first()
    processor = self.get_processor(first_study, spec)
    self.assertEqual(WorkflowStatus.user_input_required, processor.get_status())

    def workflow_files():
        # Files attached to this study and this particular workflow.
        return session.query(FileModel).filter_by(
            study_id=first_study.id, workflow_id=processor.get_workflow_id()
        ).all()

    pending = processor.next_user_tasks()
    self.assertEqual(1, len(pending))
    user_task = pending[0]
    self.assertEqual("task_gather_information", user_task.get_name())
    self._populate_form_with_random_data(user_task)
    processor.complete_task(user_task)

    # Nothing is written until the engine runs the step after the form.
    self.assertEqual(0, len(workflow_files()))
    processor.do_engine_steps()
    produced = workflow_files()
    self.assertEqual(1, len(produced), "The task should create a new file.")

    stored = session.query(FileDataModel).filter(FileDataModel.file_model_id == produced[0].id).first()
    self.assertIsNotNone(stored.data)
    self.assertTrue(len(stored.data) > 0)
    # Not going any farther here, assuming this is tested in libraries correctly.
def test_load_study_information(self):
    """Check that the "study_details" workflow pulls study details into the task data."""
    self.load_example_data()
    first_study = session.query(StudyModel).first()
    spec = self.load_test_spec("study_details")
    processor = self.get_processor(first_study, spec)
    processor.do_engine_steps()

    last_task = processor.bpmn_workflow.last_task
    self.assertIsNotNone(last_task.data)
    self.assertIn("StudyInfo", last_task.data)
    self.assertIn("info", last_task.data["StudyInfo"])
    # Spot-check a few of the fields expected under StudyInfo.info.
    for expected_key in ("title", "last_updated", "sponsor"):
        self.assertIn(expected_key, last_task.data["StudyInfo"]["info"])
def test_spec_versioning(self):
    """Check how the reported spec version string changes as spec files change.

    Three states of the "decision_table" spec are checked:
      * freshly loaded: the version starts with 'v1.1';
      * after adding an extra file: the version starts with 'v1.1.1';
      * after replacing the contents of decision_table.bpmn: the version
        starts with 'v2.1.1'.
    """
    self.load_example_data()
    study = session.query(StudyModel).first()
    workflow_spec_model = self.load_test_spec("decision_table")
    processor = self.get_processor(study, workflow_spec_model)
    self.assertTrue(processor.get_spec_version().startswith('v1.1'))

    # Add a brand-new file to the spec.
    file_service = FileService()
    file_service.add_workflow_spec_file(workflow_spec_model, "new_file.txt", "txt", b'blahblah')
    processor = self.get_processor(study, workflow_spec_model)
    self.assertTrue(processor.get_spec_version().startswith('v1.1.1'))

    # Overwrite the existing BPMN file with different content. The file is
    # read inside a context manager so the handle is always closed, even if
    # the read fails.
    file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'docx', 'docx.bpmn')
    with open(file_path, "rb") as bpmn_file:
        data = bpmn_file.read()
    file_model = db.session.query(FileModel).filter(FileModel.name == "decision_table.bpmn").first()
    file_service.update_file(file_model, data, "txt")
    processor = self.get_processor(study, workflow_spec_model)
    self.assertTrue(processor.get_spec_version().startswith('v2.1.1'))
def test_restart_workflow(self):
    """hard_reset() should return the processor to the first task and keep its data."""
    self.load_example_data()
    first_study = session.query(StudyModel).first()
    spec = self.load_test_spec("two_forms")
    processor = self.get_processor(first_study, spec)
    self.assertEqual(processor.workflow_model.workflow_spec_id, spec.id)

    # Complete the first form so the workflow moves on to another task.
    first_form = processor.next_task()
    first_form.data = {"key": "Value"}
    processor.complete_task(first_form)
    task_before_restart = processor.next_task()

    processor.hard_reset()
    task_after_restart = processor.next_task()

    # Before the reset we had moved on; afterwards we are back on the first
    # form, which still carries the data entered earlier.
    self.assertNotEqual(first_form.get_name(), task_before_restart.get_name())
    self.assertEqual(first_form.get_name(), task_after_restart.get_name())
    self.assertEqual(first_form.data, task_after_restart.data)
def test_soft_reset(self):
    """A text-only spec change is picked up only when soft_reset=True is passed."""
    self.load_example_data()

    # Start the two_forms workflow, and enter some data in the first form.
    first_study = session.query(StudyModel).first()
    spec = self.load_test_spec("two_forms")
    processor = self.get_processor(first_study, spec)
    self.assertEqual(processor.workflow_model.workflow_spec_id, spec.id)
    first_form = processor.next_task()
    first_form.data = {"color": "blue"}
    processor.complete_task(first_form)

    # Modify the specification, with a minor text change.
    text_mod = os.path.join(
        app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_text_mod.bpmn'
    )
    self.replace_file("two_forms.bpmn", text_mod)

    # Setting up another processor should not error out, but doesn't pick up the update.
    processor.workflow_model.bpmn_workflow_json = processor.serialize()
    plain_reload = WorkflowProcessor(processor.workflow_model)
    self.assertEqual("Step 1", plain_reload.bpmn_workflow.last_task.task_spec.description)
    self.assertNotEqual(
        "# This is some documentation I wanted to add.",
        plain_reload.bpmn_workflow.last_task.task_spec.documentation,
    )

    # You can do a soft update and get the right response.
    soft_reload = WorkflowProcessor(processor.workflow_model, soft_reset=True)
    self.assertEqual("Step 1", soft_reload.bpmn_workflow.last_task.task_spec.description)
    self.assertEqual(
        "# This is some documentation I wanted to add.",
        soft_reload.bpmn_workflow.last_task.task_spec.documentation,
    )
def test_hard_reset(self):
    """A structural spec change is applied by hard_reset=True while form data is retained."""
    self.load_example_data()

    # Start the two_forms workflow, and enter some data in the first form.
    first_study = session.query(StudyModel).first()
    spec = self.load_test_spec("two_forms")
    processor = self.get_processor(first_study, spec)
    self.assertEqual(processor.workflow_model.workflow_spec_id, spec.id)
    first_form = processor.next_task()
    first_form.data = {"color": "blue"}
    processor.complete_task(first_form)
    second_form = processor.next_task()
    self.assertEqual("Step 2", second_form.task_spec.description)

    # Modify the specification, with a major change that alters the flow
    # and can't be serialized effectively.
    structural_mod = os.path.join(
        app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn'
    )
    self.replace_file("two_forms.bpmn", structural_mod)

    # Assure that creating a new processor doesn't cause any issues, and
    # maintains the spec version.
    processor.workflow_model.bpmn_workflow_json = processor.serialize()
    plain_reload = WorkflowProcessor(processor.workflow_model)
    self.assertTrue(plain_reload.get_spec_version().startswith("v1 "))  # Still at version 1.

    # Do a hard reset, which should bring us back to the beginning, but retain the data.
    hard_reload = WorkflowProcessor(processor.workflow_model, hard_reset=True)
    self.assertEqual("Step 1", hard_reload.next_task().task_spec.description)
    self.assertEqual({"color": "blue"}, hard_reload.next_task().data)

    # Completing the first step now leads into the restructured flow.
    hard_reload.complete_task(hard_reload.next_task())
    self.assertEqual("New Step", hard_reload.next_task().task_spec.description)
    self.assertEqual("blue", hard_reload.next_task().data["color"])
def test_get_latest_spec_version(self):
    """The latest version string of a freshly loaded "two_forms" spec starts with "v1 "."""
    self.load_test_spec("two_forms")
    latest = WorkflowProcessor.get_latest_version_string("two_forms")
    self.assertTrue(latest.startswith("v1 "))
@patch('crc.services.protocol_builder.ProtocolBuilderService.get_studies')
@patch('crc.services.protocol_builder.ProtocolBuilderService.get_investigators')
@patch('crc.services.protocol_builder.ProtocolBuilderService.get_required_docs')
@patch('crc.services.protocol_builder.ProtocolBuilderService.get_study_details')
def test_master_bpmn_for_crc(self, mock_details, mock_required_docs, mock_investigators, mock_studies):
    """Run the "top_level_workflow" spec against mocked Protocol Builder data.

    The four Protocol Builder calls are patched to return canned JSON
    fixtures. The workflow must run to completion without user input and
    leave the expected status flags in the data of its last task.

    Note that the patch decorators are applied bottom-up, so the mock
    arguments arrive in the reverse order of the decorator list.
    """
    # Mock Protocol Builder response
    studies_response = self.protocol_builder_response('user_studies.json')
    mock_studies.return_value = ProtocolBuilderStudySchema(many=True).loads(studies_response)
    investigators_response = self.protocol_builder_response('investigators.json')
    mock_investigators.return_value = json.loads(investigators_response)
    required_docs_response = self.protocol_builder_response('required_docs.json')
    mock_required_docs.return_value = json.loads(required_docs_response)
    details_response = self.protocol_builder_response('study_details.json')
    mock_details.return_value = json.loads(details_response)

    self.load_example_data(use_crc_data=True)
    # NOTE(review): this flips a class-level flag and never restores it, so
    # the setting may leak into later tests - confirm BaseTest resets it.
    ProtocolBuilderService.ENABLED = True

    study = session.query(StudyModel).first()
    workflow_spec_model = db.session.query(WorkflowSpecModel).\
        filter(WorkflowSpecModel.name == "top_level_workflow").first()
    self.assertIsNotNone(workflow_spec_model)

    processor = self.get_processor(study, workflow_spec_model)
    processor.do_engine_steps()
    # The condition goes first and the message second; with the arguments
    # swapped the non-empty message string is always truthy and the
    # assertion can never fail.
    self.assertTrue(processor.bpmn_workflow.is_completed(), "Top level process is fully automatic.")
    data = processor.bpmn_workflow.last_task.data

    logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

    # It should mark Enter Core Data as required, because it is always required.
    self.assertIn("enter_core_info", data)
    self.assertEqual("required", data["enter_core_info"])

    # It should mark Personnel as required, because StudyInfo.investigators is not empty.
    self.assertIn("personnel", data)
    self.assertEqual("required", data["personnel"])

    # It should mark the sponsor funding source as disabled since the funding
    # required (12) is not included in the required docs.
    self.assertIn("sponsor_funding_source", data)
    self.assertEqual("disabled", data["sponsor_funding_source"])
def test_enum_with_no_choices_raises_api_error(self):
    """Populating a form that holds an enum field with no options must raise ApiError."""
    self.load_example_data()
    spec = self.load_test_spec("random_fact")
    first_study = session.query(StudyModel).first()
    processor = self.get_processor(first_study, spec)
    processor.do_engine_steps()
    user_task = processor.next_user_tasks()[0]

    # Attach an enum field that offers nothing to choose from.
    empty_enum = FormField()
    empty_enum.id = "test_enum_field"
    empty_enum.type = "enum"
    empty_enum.options = []
    user_task.task_spec.form.fields.append(empty_enum)

    with self.assertRaises(ApiError):
        self._populate_form_with_random_data(user_task)